Source code for sofia_redux.instruments.hawc.steps.stepfocus

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Focus analysis pipeline step."""

from astropy import log
from astropy import wcs as astwcs
from matplotlib.backends.backend_agg \
    import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure
from matplotlib.patches import Ellipse
import numpy as np
from scipy import ndimage, optimize

from sofia_redux.instruments.hawc.stepmoparent import StepMOParent

__all__ = ['StepFocus']


[docs] class StepFocus(StepMOParent): """ Calculate an optimal focus value from short calibration scans. This step fits and reports the best focus offset from a set of image with varying focus values. Input for this step is a set calibrated scan maps. This step is typically run after `sofia_redux.instruments.hawc.steps.StepStdPhotCal`. The output from this step is identical to the input. It is not typically saved. As a side effect, this step produces several PNG images of focus plots, written to the same directory and basename as the input file. """
[docs] def setup(self): """ Set parameters and metadata for the pipeline step. Output files have PRODTYPE = 'focus', and are named with the step abbreviation 'FCS'. Parameters defined for this step are: widowisgood : bool Include widow pixels in the analysis if set. medianaverage : bool If set, replace missing pixels with a local median value. boxaverage : int Size of the box used in medianaverage. autocrop : bool If set, the image will be automatcally be cropped to center the target. cropimage : bool If set, the image will be cropped, using 'xyboxcent' and 'boxsizecrop' parameters; 'autocrop' overrides this option if set. xyboxcent : list Central [x, y] pixel to crop around, if cropimage is set; 'autocrop' overrides this option if set. boxsizecrop : int Box size to crop to, if cropimage is set; 'autocrop' overrides this option if set. primaryimg : str Image extension name to use for the fit. If blank, the first image extension is used. """ # Name of the pipeline reduction step self.name = 'focus' self.description = 'Make Focus Plots' # Shortcut for pipeline reduction step and identifier for # saved file names. self.procname = 'fcs' # Clear Parameter list self.paramlist = [] # Append parameters self.paramlist.append(['widowisgood', True, 'Include widow pixels in the analysis (T) ' 'or only good, non-widow pixels (F)']) self.paramlist.append(['medianaverage', True, 'Run a median average box through the array ' 'to fill bad pixels (T) or not (F)']) self.paramlist.append(['boxaverage', 3, 'Size of the median average box ' '(if medianaverage is True) in pixels']) self.paramlist.append(['autocrop', True, 'Crop image automatically around the target ' '(w/ boxsize = 1/3 of image size)']) self.paramlist.append(['cropimage', False, 'Crop portion (box) of the image ' 'for analysis?']) self.paramlist.append(['xyboxcent', [32, 20], 'If cropimage = True, central X/Y pixel ' 'position of the box to be cropped']) self.paramlist.append(['boxsizecrop', 20, 'If cropimage = True, size of the box ' 'to be cropped (in pixels)']) self.paramlist.append(['primaryimg', '', 'Specifies which image will be used for ' 'the Gaussian fit. If left blank, the ' 'first image will be used.'])
[docs] def gaussian(self, height, center_x, center_y, width_x, width_y, bgoffset): """ Return a Gaussian function with the given parameters. Parameters ---------- height : float Gaussian amplitude. center_x : float Center x pixel. center_y : float Center y pixel. width_x : float Gaussian width, x-direction. width_y : float Gaussian width, y-direction. bgoffset : float Background level. Returns ------- function The Gaussian function. Arguments are x, y. """ width_x = float(width_x) width_y = float(width_y) def gauss(x, y): g = height * np.exp( -(((center_x - x) / width_x)**2 + ((center_y - y) / width_y)**2) / 2.) + bgoffset return g return gauss
[docs] def moments(self, data): """ Compute Gaussian parameters from moments. Parameters ---------- data : array-like The image to fit. Returns ------- tuple of float Elements are the Gaussian parameters for the 2D distribution: height, x, y, width_x, width_y, bgoffset. """ total = np.nansum(data) bgoffset = np.nanmedian(data) big_y, big_x = np.indices(data.shape) ysize, xsize = data.shape if abs(total) == 0: total = 1e-7 x = np.nansum(big_x * data) / total y = np.nansum(big_y * data) / total log.debug("Moments: x=%f y=%f tot=%f bgoff=%f" % (x, y, total, float(bgoffset))) # If the initial guess for x and y is outside the array, # will assume a guess in the center of the image # (this situation might happen in case there is no source # in the image. The assumption is used to avoid an index # problem in the col and row definitions below if x >= xsize or x <= 0 or np.isnan(x): x = xsize / 2. if y >= ysize or y <= 0 or np.isnan(y): y = ysize / 2. col = data[int(y), :] width_x = np.sqrt(np.nansum(abs((np.arange(col.size) - y)**2 * col)) / abs(np.nansum(col))) row = data[:, int(x)] width_y = np.sqrt(np.nansum(abs((np.arange(row.size) - x)**2 * row)) / abs(np.nansum(row))) height = np.nanmax(data) log.debug("Moments: returning h=%f x=%f y=%f wx=%f wy=%f bg=%f" % (float(height), x, y, width_x, width_y, float(bgoffset))) return height, x, y, width_x, width_y, bgoffset
[docs] def fitgaussian(self, data, nanpix, medianaverage): """ Fit a Gaussian function to an image. Parameters ---------- data : array-like The image to fit. nanpix : array-like A mask or index array indicating the positions of NaN pixels. medianaverage : bool If not set, NaN pixels will be replaced with model values. Returns ------- height, x, y, width_x, width_y : tuple of float Parameters for a Gaussian fit to the data. """ params = list(self.moments(data)) data_img = data.copy() # If medianaverage is False, # assume that for bad pixels (NaNs), the value # on the array is equal to the model # (so that the difference -- errorfunction -- will be zero) if not medianaverage: model = self.gaussian(*params)(*np.indices(data.shape)) data_img[nanpix] = model[nanpix] # function to fit def errorfunction(par): return np.ravel(self.gaussian(*par)(*np.indices(data_img.shape)) - data_img) result = optimize.leastsq(errorfunction, np.array(params)) amp, centy, centx, widy, widx, offset = result[0] success = result[1] log.debug("Fit Values: returning h=%f, x=%f y=%f " "wx=%f wy=%f bg=%f" % (amp, centx, centy, widx, widy, offset)) return amp, centy, centx, widy, widx, offset, success
[docs] def focusplot(self, focus, values, difftotfoc, label, lbl, sign, units=''): """ Find and plot the best fit focus value. The best focus value is at either the maximum or minimum of the `values`, as fit by a 2nd order polynomial. Plots showing the fit and best value are written to disk. Parameters ---------- focus : `list` of float Focus values (independent variable). values : list of float Fit values (dependent variable). difftotfoc : float Mean difference from the total focus offset. label : str Long label for the plot. lbl : str Short label for the plot. sign : {-1, 1} If -1, best fit is at a maximum. If 1, best fit is at a minimum. units : str, optional Units for the `values`. """ # Fit focus curve, find best focus # fit(x) = fit[0]*x**2. + fit[1]*x + fit[2] fit = np.polyfit(focus, values, 2) xax = np.asarray(np.linspace(min(focus), max(focus), 1000)) yax = fit[0] * xax**2. + fit[1] * xax + fit[2] bestfocx = -fit[1] / (2. * fit[0]) # Set up plot fig = Figure(figsize=(6, 5)) FigureCanvas(fig) ax = fig.add_subplot() ax.plot(xax, yax, 'r--', linewidth=3) ax.plot(focus, values, 'mo', markersize=10) ax.set_xlabel(r'Focus TOTAL offsets ($\mu m$)') ax.set_ylabel(label) # bestfocx is a local extremum, but could be a # maximum (ideally it would be a minimum). # Test to see if there are points in the fit below bestfocx # Create a polynomial using the coefficients from fit fittest = np.poly1d(fit) # Add DEBUG records to the log for the FWHM values log.debug(' %s at bestfoc: %.3f %s' % (lbl, fittest(bestfocx), units)) # Check if extremum is correct if fit[0] * sign < 0 or bestfocx < min(focus) or bestfocx > max(focus): # Find the (first) minimum fwhmx # This used to raise a warning in the log, for certain cases, # but we can ignore it valsign = [sign * v for v in values] try: # It's possible for # focus[valsign == min(valsign)] to return [] guessfocx = focus[valsign == min(valsign)][0] except (IndexError, TypeError): log.warning('Error in focus guess - ' 'just using first value') guessfocx = focus[0] if sign < 0: fig.suptitle('No local maximum found!', fontsize=12) log.warning('No local maximum found! Best focus ' 'may be out of analyzed range.') else: fig.suptitle('No local minimum found!', fontsize=12) log.warning('No local minimum found! Best focus ' 'may be out of analyzed range.') ax.set_title('Suggested best focus position ' r'(%s): %.1f $\mu m$' % (lbl, guessfocx)) else: # Do this if bestfocx is at the minimum ax.set_title(r'Best TOTAL Offset (%s): %.1f $\mu m$ ' r'(Absolute Position $\sim$ %.1f $\mu m$)' % (lbl, bestfocx, bestfocx + difftotfoc), fontsize=11) log.info('') log.info('Best focus position (%s): %.1f um ' % (lbl, bestfocx)) log.info(' (Look at images and graphs ' 'to make sure it is a valid minimum!)') log.info('') # Plot a black 'x' on the minimum (just for convenience) ax.plot(bestfocx, fittest(bestfocx), 'kx', markersize=10, mew=3) # Save image pngname = self.datain[-1].filename.replace( '.fits', '_autofocus_%s.png' % lbl.replace(' ', '_')) fig.savefig(pngname) self.auxout.append(pngname) log.info('Saved result %s' % pngname)
[docs] def run(self): """ Run the data reduction algorithm. Because this step is multi-in, multi-out (MIMO), self.datain must be a list of DataFits objects. The output is also a list of DataFits objects, stored in self.dataout. The process is: 1. Read in each file, extract an image stamp, and fit a Gaussian to it. 2. From the fit Gaussian parameters for all files, calculate the best focus value from: the minimum FWHM (x, y, and total), and the maximum Gaussian height. """ # Get parameters widowisgood = self.getarg('widowisgood') medianaverage = self.getarg('medianaverage') boxaverage = self.getarg('boxaverage') autocrop = self.getarg('autocrop') cropimage = self.getarg('cropimage') xyboxcent = list(self.getarg('xyboxcent')) boxsizecrop = self.getarg('boxsizecrop') primaryimg = self.getarg('primaryimg') # self.datain must be a list/tuple self.nfiles = len(self.datain) log.debug('Analysing %d files' % self.nfiles) # counter for the number of images nfigs = 1 # input is set to output, unmodified self.dataout = self.datain # list to hold output .png names self.auxout = [] amplitude = [] fwhmx = [] fwhmy = [] focus = [] focus_totoff = [] srcpeak = [] srcfwhm = [] image = None for i in range(self.nfiles): log.debug("ANALYZING FILE %d" % (i + 1)) # If the primary image HDU isn't specified, # use the first image in the file if primaryimg == '': try: image = self.datain[i].image.copy() except AttributeError: msg = 'No image data in file.' log.error(msg) raise ValueError(msg) log.debug("No HDU specified. Using first " "image: %s" % self.datain[i].imgnames[0]) else: # If the HDU is specified, use that image image = self.datain[i].imageget(primaryimg).copy() log.debug("Using specified image: %s" % primaryimg) # Determine what to use for the bad pixel mask if 'BAD PIXEL MASK' in self.datain[i].imgnames: # Read the bad pixel mask badpix = self.datain[i].imageget('BAD PIXEL MASK') log.debug("Bad Pixel Mask found") elif 'IMAGE MASK' in self.datain[i].imgnames: # Read the image mask imgmask = self.datain[i].imageget('IMAGE MASK') log.debug("Didn't find Bad Pixel Mask - " "Using Image Mask") # get bad pixel mask, as zero, set to 3 (bad) # wherever imgmask==Nan badpix = np.zeros_like(imgmask) badpix[np.isnan(imgmask)] = 3 else: # If there's no bad pixel or image mask, use a zero array badpix = np.zeros_like(image) log.debug("No Bad Pixel or Image Mask - " "Using zero array") # Autocrop if autocrop: header = self.datain[i].header tempwcs = astwcs.WCS(header) ratarget = self.datain[i].getheadval('CRVAL1') dectarget = self.datain[i].getheadval('CRVAL2') pix1, pix2 = tempwcs.wcs_world2pix(ratarget, dectarget, 1) cropimage = True xyboxcent[0] = pix1 xyboxcent[1] = pix2 boxsizecrop = np.mean(image.shape) / 3. # Option to crop part of the image around a central pixel if cropimage: x1 = int(xyboxcent[0] - boxsizecrop / 2.) x2 = int(xyboxcent[0] + boxsizecrop / 2.) y1 = int(xyboxcent[1] - boxsizecrop / 2.) y2 = int(xyboxcent[1] + boxsizecrop / 2.) if x1 < 0 or x1 > image.shape[1]: x1 = 0 log.warning('Crop box x1 invalid, was set to 0') if y1 < 0 or y1 > image.shape[0]: y1 = 0 log.warning('Crop box y1 invalid, was set to 0') if x2 <= x1 or x2 > image.shape[1]: x2 = image.shape[1] log.warning('Crop box x2 invalid, was set to width') if y2 <= y1 or y2 > image.shape[0]: y2 = image.shape[0] log.warning('Crop box y2 invalid, was set to height') image = image[y1:y2, x1:x2] badpix = badpix[y1:y2, x1:x2] # Select between use widow pixels or use only good pixels if widowisgood: nanpix = np.where(badpix > 2) else: nanpix = np.where(badpix != 0) # Choose between median average the image # (to get rid of bad pixels) # or keep the bad pixels as NaNs in the 2D gaussian fit if medianaverage: # Assign bad pixels to 0 to allow the box median averaging image[nanpix] = 0. # Added to make sure there's no NANs image[np.where(image != image)] = 0. image = ndimage.uniform_filter(image, size=boxaverage) else: image[nanpix] = np.nan # Fit 2d Gaussian (offset is not used in later code) amp, centy, centx, widy, widx, _offset, success = \ self.fitgaussian(image, nanpix, medianaverage) # Append values only if gaussian fit was successful if success in [1, 2, 3, 4]: fwhmx_img = 2.355 * np.abs(widx) fwhmy_img = 2.355 * np.abs(widy) amplitude.append(amp) fwhmx.append(fwhmx_img) fwhmy.append(fwhmy_img) # Compute average focus focval_st = self.datain[i].getheadval('FOCUS_ST') focval_en = self.datain[i].getheadval('FOCUS_EN') focval = np.mean([focval_st, focval_en]) focus.append(focval) if 'FCSTOFF' in self.datain[i].header: fcstoff = self.datain[i].getheadval('FCSTOFF') focus_totoff.append(fcstoff) else: # if no FoCus Total OFFset in the header, copy focval fcstoff = focval focus_totoff.append(focval) log.debug('FCSTOFF not found; using focus value') # Plotting figure and ellipse around object fig = Figure() FigureCanvas(fig) ax = fig.add_subplot() nfigs += 1 ymax, xmax = image.shape ax.imshow(image, cmap='gray', extent=[0, xmax - 1, 0, ymax - 1], interpolation='none') ax.plot(centx, ymax - 1 - centy, 'm+', markersize=15, mew=2) ellipse = Ellipse(xy=(centx, ymax - 1 - centy), width=2.355 * np.abs(widx), height=2.355 * np.abs(widy), edgecolor='b', fc='None', lw=2) ax.add_patch(ellipse) ax.annotate('Img %s; Focus: %.1f microns, ' 'Tot. Off: %.1f microns' % (nfigs - 1, float(focval), fcstoff), xy=(1, 0.95 * ymax), color='.5') ax.annotate('Gaussian FWHM X / Y: %.1f / %.1f pixels' % (fwhmx_img, fwhmy_img), xy=(1, 0.90 * ymax), color='.5') ax.annotate('Gaussian center X / Y: %.1f / %.1f pixels' % (centx, centy), xy=(1, 1), color='.5') ax.annotate('Gaussian height: %1f' % amp, xy=(1, 0.85 * ymax), color='.5') # Get scanmap estimates (from header of first table) dat = self.datain[i] try: srcfwhm.append(dat.getheadval('SRCFWHM', dat.tabnames[0], errmsg=False)) except (KeyError, IndexError): srcfwhm.append(0) try: srcpeak.append(dat.getheadval('SRCPEAK', dat.tabnames[0], errmsg=False)) except (KeyError, IndexError): srcpeak.append(0) else: # unsuccessful gaussian fit # Plotting figure and state that the fit was unsuccessful fig = Figure() FigureCanvas(fig) ax = fig.add_subplot() nfigs += 1 ymax, xmax = image.shape ax.imshow(image, cmap='gray', extent=[0, xmax - 1, 0, ymax - 1], interpolation='none') ax.annotate('Gaussian fit was unsuccessful', xy=(1, ymax - 3), color='k') log.debug('Gaussian fit was unsuccessful ' 'for image {}'.format(i + 1)) # Save the Plot pngname = self.datain[-1].filename.replace( '.fits', '_autofocus_image%s.png' % (nfigs - 1)) fig.savefig(pngname) self.auxout.append(pngname) log.info('Saved result %s' % pngname) # Calculate difference between focus and focus_totoff difftotfoc = float(np.mean(np.asarray(focus)[:] - np.asarray(focus_totoff)[:])) # At least 3 successful gaussian fits are required # to attempt a parabolic fit of FWHM x Focus # Also attempt a fit for Gaussian height if len(focus_totoff) >= 3: # FWHM x Focus for X self.focusplot(focus_totoff, fwhmx, difftotfoc, 'FWHM Along X Axis (pix)', 'FWHM X', 1, units='pix') # FWHM x Focus for Y self.focusplot(focus_totoff, fwhmy, difftotfoc, 'FWHM Along Y Axis (pix)', 'FWHM Y', 1, units='pix') # FWHM x Focus for XY self.focusplot(focus_totoff + focus_totoff, fwhmx + fwhmy, difftotfoc, 'FWHM Along X and Y Axis (pix)', 'FWHM XY', 1, units='pix') # Focus for Gaussian Height try: imgunit = self.datain[0].header['BUNIT'] except KeyError: imgunit = 'Img Units' self.focusplot(focus_totoff, amplitude, difftotfoc, 'Gaussian Height ({})'.format(imgunit), 'Amplitude', -1, units=imgunit) # Select only data with valid inputs # (i.e. srcpeak not zero) srcn = len(srcpeak) srcfoc = [focus_totoff[i] for i in range(srcn) if srcpeak[i] != 0] srcp = [srcpeak[i] for i in range(srcn) if srcpeak[i] != 0] srcselfwhm = [srcfwhm[i] for i in range(srcn) if srcpeak[i] != 0] if len(srcfoc) > 0: # Focus from Scanmap Peak/Int self.focusplot(srcfoc, srcp, difftotfoc, 'Peak from scan map fit ({})'.format(imgunit), 'Peak', -1, units=imgunit) # Focus from Scanmap FWHM self.focusplot(srcfoc, srcselfwhm, difftotfoc, 'FWHM from scan map fit (arcsec)', 'FWHM-C', 1, units='arcsec') else: log.debug('Scan map fit keys not found.') elif image is not None: # Plotting last figure again and state that # there are not enough points for parabolic fit fig = Figure() FigureCanvas(fig) ax = fig.add_subplot() ymax, xmax = image.shape ax.imshow(image, cmap='gray', extent=[0, xmax - 1, 0, ymax - 1], interpolation='none') ax.annotate('There are not enough successful', xy=(1, ymax - 5), color='w') ax.annotate('Gaussian fits to fit a parabola (minimum 3)', xy=(1, ymax - 9), color='w') log.info('There are not enough successful Gaussian ' 'fits to fit a parabola (minimum 3)') pngname = self.datain[-1].filename.replace( '.fits', '_autofocus_xyaxis.png') fig.savefig(pngname) self.auxout.append(pngname) log.info('Saved result %s' % pngname)