Source code for sofia_redux.instruments.hawc.steps.stepbgsubtract

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Background subtraction pipeline step."""

from astropy import log
from astropy.stats import sigma_clip
import numpy as np
import psutil

from sofia_redux.instruments.hawc.steps.basemap import BaseMap
from sofia_redux.instruments.hawc.stepmoparent import StepMOParent
from sofia_redux.toolkit.resampling.resample import Resample
from sofia_redux.toolkit.utilities.func import stack

__all__ = ['StepBgSubtract']


[docs] class StepBgSubtract(StepMOParent, BaseMap): """ Subtract residual background across multiple input files. This step iteratively solves for additive offsets between the input files, then subtracts the offset from each flux image. The input data expected is a DataFits with STOKES and ERROR frames for I, Q and U each, as well as COVAR Q I, COVAR U I, and COVAR Q U images. For total intensity data, only STOKES I and ERROR I are expected. Input is typically produced by the `sofia_redux.instruments.hawc.steps.StepCalibrate` pipeline step. The output image from this step contains the same image frames as the input image. The STOKES frames have been background corrected. """ def __init__(self): # placeholder for calculated offsets self.offsets = [] super().__init__()
[docs] def setup(self): """ Set parameters and metadata for the pipeline step. Output files have PRODTYPE = 'bgsubtract', and are named with the step abbreviation 'BGS'. Parameters defined for this step are: cdelt : list of float Pixel size in arcseconds of output map. One value for each HAWC filter band. proj : str Projection of output map. sizelimit : int Upper limit on output map size (either axis, in pixels). fwhm : list of float FWHM of gaussian smoothing kernel in arcseconds (per band). radius : list of float Integration radius for local fits, in arcseconds (per band). errflag : bool If set, use uncertainties when computing averages. widowstokesi : bool Use widow pixels (flagged 1 or 2) to compute Stokes I map. edge_threshold : float Threshold to set edge pixels to NaN. Range is 0-1; 0 means keep all edge pixels. Higher values keep fewer pixels. fit_order : int Polynomial fit order for local regression. bgoffset : int Maximum number of iterations of background subtraction. chauvenet : bool If set, use Chauvenet's criterion (sigma clipping for outlier rejection) in background averages. fitflag : bool If set, use errors in intensity for weighting background averages. qubgsubtract : bool If set, apply background correction to Stokes Q and U images as well as Stokes I. """ # Name of the pipeline reduction step self.name = 'bgsubtract' self.description = 'Subtract Background' # Shortcut for pipeline reduction step and identifier for # saved file names. self.procname = 'bgs' # Clear Parameter list self.paramlist = [] # resampling parameters self.paramlist.append(['cdelt', [2.57, 4.02, 4.02, 6.93, 9.43], 'Pixel size in arcseconds of output map']) self.paramlist.append(['proj', 'TAN', 'Projection of output map']) self.paramlist.append(['sizelimit', 3000, "Upper limit on output map size " "(either axis, in pixels)."]) self.paramlist.append(['fwhm', [2.57, 4.02, 4.02, 6.93, 9.43], 'FWHM of gaussian smoothing kernel, ' 'in arcseconds (per band)']) self.paramlist.append(['radius', [7.71, 12.06, 12.06, 20.79, 28.29], 'Integration radius for smoothing, in ' 'arcseconds (per band)']) self.paramlist.append(['errflag', True, "Use uncertainties when computing averages"]) self.paramlist.append(['widowstokesi', True, "Use widow pixels (flagged 1 or 2) to " "compute Stokes I map"]) self.paramlist.append(['edge_threshold', 0.5, "Set edge pixels to NaN. Range 0-1; 0 means " "keep all edge pixels. Higher values " "keep fewer pixels."]) self.paramlist.append(['fit_order', 0, "Polynomial fit order for local regression."]) # background parameters self.paramlist.append(['bgoffset', 10, 'Number of iterations of background ' 'subtraction']) self.paramlist.append(['chauvenet', True, "Use Chauvenet's criterion in " "background subtraction"]) self.paramlist.append(['fitflag', False, "Use errors in intensity for offset " "calculation"]) self.paramlist.append(['qubgsubtract', True, 'Apply linear background correction ' 'to Stokes Q and U images'])
[docs] def read_fwhm_radius_cdelt(self): """ Read a fwhm, radius, and cdelt value from the parameters. The parameters are expected to be defined as a list, with one entry for each HAWC band. The correct value for the input data is selected from the list. Returns ------- fwhm : float FWHM value for the input data. radius : float, float, float Radius value for the input data. cdelt : float Pixel scale value for the input data. """ fwhm = self.getarg('fwhm') radius = self.getarg('radius') cdelt = self.getarg('cdelt') waveband = self.datain[0].getheadval('spectel1') bands = ['A', 'B', 'C', 'D', 'E'] try: idx = bands.index(waveband[-1]) except (ValueError, IndexError): # waveband not in list msg = 'Cannot parse waveband: %s' % waveband log.error(msg) raise ValueError(msg) try: fwhm = fwhm[idx] radius = radius[idx] cdelt = cdelt[idx] except IndexError: msg = 'Missing radius/fwhm values for all wavebands' log.error(msg) raise IndexError(msg) return fwhm, radius, cdelt
[docs] def resample_images(self, radius, fit_order, smoothing, edge, errflag, max_cores, separate=False): """ Resample input images into a common grid. Resampling is performed via a distance-weighted, low-order polynomial surface fit to the input data within a window around each output grid point. Parameters ---------- radius : float Fit window to consider for local fits. fit_order : int Polynomial surface order to fit. smoothing : float Smoothing radius for distance weighting, expressed as a fraction of the fit window. edge : float Threshold for setting edge pixels to NaN. Higher values block more pixels, a zero values allows all edge pixels through. errflag : bool If True, errors on the flux values will be used to weight the fits to the data. If False, only distance weights will be used. max_cores : int, or None If a number larger than 1, the data processing will proceed in parallel on max_cores CPUs. Multiprocessing requires that joblib is installed. separate : bool, optional If True, separate maps will be made for each input file (all on the same coordinate grid), and returned in a list of dictionaries. If False, a single map will be made, and stored in self.pmap Returns ------- list of dict The map(s) generated from the input. Keys for the dictionary are the Stokes values and associated errors: I, dI for all; and Q, dQ, U, dU if more than one HWP is present. """ if self.nhwp == 1: stokes_vals = ['I'] else: stokes_vals = ['I', 'Q', 'U'] if separate: nmap = self.nfiles else: nmap = 1 # loop over maps and stokes for fluxes and errors maps = [] for i in range(nmap): flxvals = [] errvals = [] for stokes in stokes_vals: if separate: flx = self.pdata[i][stokes].ravel() err = self.pdata[i]['d{}'.format(stokes)].ravel() else: flx = np.hstack([d[stokes].ravel() for d in self.pdata]) err = np.hstack([d['d{}'.format(stokes)].ravel() for d in self.pdata]) flxvals.append(flx) errvals.append(err) if separate: ra = self.pdata[i]['ra'].ravel() dec = self.pdata[i]['dec'].ravel() base_ra = self.pmap['base_ra'] base_dec = self.pmap['base_dec'] # offsets in arcsec, with RA reversed xs = -1 * (ra - base_ra) * \ np.cos(np.radians(base_dec)) * 3600. ys = (dec - base_dec) * 3600. coordvals = stack(xs, ys) pmap = {} else: coordvals = self.pmap['coordinates'] pmap = self.pmap resampler = Resample( coordvals, flxvals, error=errvals, window=radius, order=fit_order, robust=None, negthresh=None) flux, std = resampler( *self.pmap['grid'], smoothing=smoothing, fit_threshold=None, edge_threshold=edge, edge_algorithm='distribution', get_error=True, error_weighting=errflag, jobs=max_cores) for j, stokes in enumerate(stokes_vals): pmap[stokes] = flux[j] pmap['d{}'.format(stokes)] = std[j] maps.append(pmap) return maps
[docs] def run(self): """ Run the data reduction algorithm. Because this step is multi-in, multi-out (MIMO), self.datain must be a list of DataFits objects. The output is also a list of DataFits objects, stored in self.dataout. The process is: 1. Read in all good pixels from the input data. 2. Make a map out of all input data for reference. 3. Make a map out of each input data file to compare to the reference. 4. Compute and subtract the average offset for all corresponding pixels, from the individual map to the reference map. 5. Repeat steps 2-4 until convergence or the maximum number of iterations is reached. 6. Store offset-subtracted images in self.dataout. """ # make sure it is possible to combine input files # and set self.nhwp self.checkvalid() # self.datain must be a list/tuple self.nfiles = len(self.datain) self.dataout = [d.copy() for d in self.datain] # if only one input file, just return if self.nfiles < 2: log.debug("One file only. No background subtracted.") return fwhm, radius, cdelt = self.read_fwhm_radius_cdelt() errflag = self.getarg('errflag') widowstokesi = self.getarg('widowstokesi') edge = self.getarg('edge_threshold') fit_order = self.getarg('fit_order') bgoffset = self.getarg('bgoffset') chflag = int(self.getarg('chauvenet')) fitflag = int(self.getarg('fitflag')) qubgsubtract = self.getarg('qubgsubtract') if qubgsubtract and self.nhwp > 1: stokes = ['I', 'Q', 'U'] else: stokes = ['I'] # set up for parallel processing via joblib max_cores = psutil.cpu_count() // 2 # serial if only one core available max_cores = 1 if max_cores < 2 else max_cores sigma = fwhm / (2 * np.sqrt(2 * np.log(2))) # If widowstokesi = True, then widow pixels (flagged # as 1 or 2) will also be accounted in the smoothed # Stokes I image. Stokes Q and U will continue # using only strictly good pixels (flagged as 0) if widowstokesi: maxgoodpix = 2 else: maxgoodpix = 0 # read data, allocate space for maps self.read_resample_data(maxgoodpix) self.make_resample_map(cdelt) # iteratively remove offsets from maps self.offsets = {'I': np.zeros(self.nfiles), 'Q': np.zeros(self.nfiles), 'U': np.zeros(self.nfiles)} epsilon = 1e-3 last_offsets = None while bgoffset > 0: log.debug("iteration {} of bgoffset".format(bgoffset)) # smoothed map of all files together # output data is stored in self.pmap self.resample_images(radius, fit_order, sigma, edge, errflag, max_cores) # create new individual smoothed maps on the same grid new_maps = self.resample_images(radius, fit_order, sigma, edge, errflag, max_cores, separate=True) new_offsets = {'I': np.zeros(self.nfiles), 'Q': np.zeros(self.nfiles), 'U': np.zeros(self.nfiles)} for i, map_img in enumerate(new_maps): for s in stokes: # offset from combined map diff = map_img[s] - self.pmap[s] var = self.pmap['d{}'.format(s)] ** 2 # flatten and pull out nans idx = (~np.isnan(diff)) & (~np.isnan(var)) diff = diff[idx] var = var[idx] # weighted, sigma-clipped averaged offset if chflag: diff = sigma_clip(diff, sigma=3, masked=True, copy=False, cenfunc='mean') var = var[~diff.mask] diff = diff.compressed() if fitflag: offset = np.average(diff, weights=1 / var) else: offset = np.mean(diff) # subtract offset from data self.pdata[i][s] -= offset new_offsets[s][i] = offset # stop iterating if offsets are not changing all_offsets = np.hstack([np.abs(new_offsets[s]) for s in new_offsets]) if np.all(all_offsets < epsilon): # pragma: no cover bgoffset = 0 if last_offsets is not None: diff = np.abs(all_offsets - last_offsets) if np.all(diff < epsilon): # pragma: no cover bgoffset = 0 for s in self.offsets: self.offsets[s] += new_offsets[s] last_offsets = all_offsets bgoffset -= 1 # Now assign background subtracted images back to each pipedata object # Errors and covariances are untouched for i, f in enumerate(self.dataout): f.imageset(f.imageget('STOKES I') - self.offsets['I'][i], 'STOKES I') log.debug('Image {} Stokes I ' 'offset: {:.2f}'.format(i + 1, self.offsets['I'][i])) if self.nhwp > 1: # polarization f.imageset(f.imageget('STOKES Q') - self.offsets['Q'][i], 'STOKES Q') f.imageset(f.imageget('STOKES U') - self.offsets['U'][i], 'STOKES U') log.debug('Image {} Stokes Q ' 'offset: {:.2f}'.format(i + 1, self.offsets['Q'][i])) log.debug('Image {} Stokes U ' 'offset: {:.2f}'.format(i + 1, self.offsets['U'][i]))