# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Scanning mode Stokes parameters pipeline step."""

import re

import numpy as np
from astropy import log
from astropy.stats import sigma_clipped_stats
from astropy.wcs import WCS
from scipy.ndimage import generic_filter, median_filter

from sofia_redux.instruments.hawc.datafits import DataFits
from sofia_redux.instruments.hawc.stepparent import StepParent
from sofia_redux.toolkit.image import adjust

__all__ = ['StepScanStokes']


class StepScanStokes(StepParent):
    """
    Compute Stokes parameters for scanning polarimetry data.

    This step derives Stokes I, Q, and U images with associated
    uncertainties and covariances from R and T array images. Since
    the input data was produced by the scan map algorithm, it has
    already been resampled into sky coordinates. The R and T images
    for each HWP angle must be shifted into a common reference frame
    before addition and subtraction. Shift values are determined
    from the WCS values recorded in the extension headers, as output
    by `sofia_redux.instruments.hawc.steps.StepScanMapPol`. Shift
    interpolations are performed via
    `sofia_redux.toolkit.image.adjust.shift`.

    Optionally, a zero-level correction may be applied to the R and
    T images, using a mean- or median-filter to identify the lowest
    negative region in the image. Thereafter, R and T arrays are
    directly added and subtracted to produce Stokes parameter
    fluxes, as in the standard Stokes algorithm for chop-nod
    polarimetry data (see
    `sofia_redux.instruments.hawc.steps.StepStokes`).

    Input for this step must be a single DataFits that contains 3
    image planes (HDUs) for each subarray (R0 and T0), at each of 4
    HWP angles. The three images are: DATA, ERROR, and EXPOSURE.

    Output from this step is a DataFits with the following image
    extensions: STOKES I, ERROR I, STOKES Q, ERROR Q, STOKES U,
    ERROR U, COVAR Q I, COVAR U I, COVAR Q U.
    """

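    # For reference, a minimal sketch of the input HDU layout this step
    # expects (extension names as read in run() below, for NHWP = 4;
    # each extension is a 2D image):
    #
    #     DATA R HWP0,  ERROR R HWP0,  EXPOSURE R HWP0
    #     DATA T HWP0,  ERROR T HWP0,  EXPOSURE T HWP0
    #     ...
    #     DATA R HWP3,  ERROR R HWP3,  EXPOSURE R HWP3
    #     DATA T HWP3,  ERROR T HWP3,  EXPOSURE T HWP3
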
    def setup(self):
        """
        Set parameters and metadata for the pipeline step.

        Output files have PRODTYPE = 'scanstokes', and are named with
        the step abbreviation 'STK'.

        Parameters defined for this step are:

        hwp_tol : float
            Tolerance for difference from expected values for HWP
            angles. HWP angles for Stokes parameters must differ by
            no more than 45 +/- hwp_tol degrees.
        zero_level_method : {'mean', 'median', 'none'}
            Statistic for zero-level calculation. If 'none', the
            zero level will not be corrected. For the other options,
            either a mean or median statistic will be used to
            determine the zero-level value from the region set by
            the region parameters.
        zero_level_region : str
            If set to 'header', the zero-level region will be read
            from the ZERO_RA, ZERO_DEC, and ZERO_RAD keywords (RA
            center, Dec center, and radius, respectively). If set to
            'auto', a mean- or median-filter will be applied to the
            R and T images, with the radius specified by the
            zero_level_radius parameter. The lowest local average
            that is negative in both R and T for all HWP angles is
            taken to be the zero level. R and T corrections are
            applied separately, from the value of the average at the
            same pixel. Otherwise, a region may be directly provided
            as a list of [RA center, Dec center, radius], in
            degrees.
        zero_level_radius : list of float
            Filter radius for zero-level calculation, in arcseconds
            (one value per band). Used only for
            zero_level_region = 'auto'.
        zero_level_sigma : float
            Sigma value for statistics clipping. Ignored for
            zero_level_region = 'auto'.
        """
        # Name of the pipeline reduction step
        self.name = 'scanstokes'
        self.description = 'Compute Stokes'

        # Shortcut for pipeline reduction step and identifier for
        # saved file names.
        self.procname = 'stk'

        # Clear parameter list
        self.paramlist = []

        # Append parameters
        self.paramlist.append(['hwp_tol', 5.0,
                               'HWP angles for Stokes parameters must '
                               'differ by no more than 45+-hwp_tol '
                               'degrees'])
        self.paramlist.append(['zero_level_method', 'none',
                               'Statistic for zero-level calculation '
                               '(mean, median, none)'])
        self.paramlist.append(['zero_level_region', 'header',
                               'Zero level region method (header, auto, '
                               'or [RA, Dec, radius] in '
                               'degrees).'])
        self.paramlist.append(['zero_level_radius',
                               [4.84, 7.80, 7.80, 13.6, 18.2],
                               'Filter radius for zero-level calculation '
                               'in auto mode.'])
        self.paramlist.append(['zero_level_sigma', 5.0,
                               'Sigma value for statistics clipping in '
                               'non-auto mode.'])

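    # An illustrative configuration snippet for this step, in the
    # INI-style format used for pipeline step parameters (the section
    # name matches self.name; the values shown are the defaults
    # defined above, not recommendations):
    #
    #     [scanstokes]
    #         hwp_tol = 5.0
    #         zero_level_method = none
    #         zero_level_region = header
    #         zero_level_radius = 4.84, 7.80, 7.80, 13.6, 18.2
    #         zero_level_sigma = 5.0
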
    def read_radius(self):
        """
        Read a radius value from the parameters.

        The parameter is expected to be defined as a list, with one
        entry for each HAWC band. The correct value for the input
        data is selected from the list.

        Returns
        -------
        radius : float
            Radius value for the input data.
        """
        radius = self.getarg('zero_level_radius')
        waveband = self.datain.getheadval('spectel1')
        bands = ['A', 'B', 'C', 'D', 'E']
        try:
            idx = bands.index(waveband[-1])
        except (ValueError, IndexError):
            # waveband not in list
            msg = 'Cannot parse waveband: %s' % waveband
            log.error(msg)
            raise ValueError(msg)
        try:
            radius = radius[idx]
        except IndexError:
            msg = 'Missing radius values for all wavebands'
            log.error(msg)
            raise IndexError(msg)
        return radius

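    # For example, an input whose SPECTEL1 value ends in 'C' selects
    # band index 2, so the default zero_level_radius list
    # [4.84, 7.80, 7.80, 13.6, 18.2] yields a 7.80 arcsecond radius.
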
    def stokes(self, idx1, idx2, rmt_data, rmt_sigma, r_var, t_var):
        """
        Compute Stokes Q or U.

        The index parameters control which Stokes parameter image is
        computed.

        Parameters
        ----------
        idx1 : `list` of int
            Index for angle 1.
        idx2 : `list` of int
            Index for angle 2.
        rmt_data : array-like
            R - T flux data array. Should have three dimensions,
            where the first dimension indexes the HWP angle.
        rmt_sigma : array-like
            R - T error data array. Dimensions should match rmt_data.
        r_var : array-like
            Variance for the R array. Dimensions should match
            rmt_data.
        t_var : array-like
            Variance for the T array. Dimensions should match
            rmt_data.

        Returns
        -------
        stokes : array-like
            The Stokes Q or U flux image.
        dstokes : array-like
            The error on the Stokes Q or U flux.
        stokes_icov : array-like
            The covariance of the Stokes Q or U image with respect
            to the Stokes I image.
        """
        # propagation equations
        # (for the most common 4 HWP case):
        #   Q = (1/2) (R1 - R3 - T1 + T3)
        #   U = (1/2) (R2 - R4 - T2 + T4)
        #   VQ = (1/4) (VR1 + VR3 + VT1 + VT3)
        #   VU = (1/4) (VR2 + VR4 + VT2 + VT4)
        #   cov(Q, I) = (1/8) (VR1 - VR3 - VT1 + VT3)
        #   cov(U, I) = (1/8) (VR2 - VR4 - VT2 + VT4)
        #   cov(Q, U) = 0
        count = float(2 * len(idx1))
        stokes = (rmt_data[idx1].sum(axis=0)
                  - rmt_data[idx2].sum(axis=0)) / count
        dstokes = np.sqrt(np.sum(rmt_sigma[idx1 + idx2] ** 2,
                                 axis=0)) / count
        stokes_icov = np.sum(r_var[idx1] - r_var[idx2]
                             - t_var[idx1] + t_var[idx2],
                             axis=0) / (2 * count ** 2)
        return stokes, dstokes, stokes_icov

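    # A minimal numerical sketch of the propagation above (toy values,
    # single pixel, 4 HWP angles sorted by angle): with R - T
    # differences d = [2.0, 1.0, 0.5, -0.5], idx1 = [0] and idx2 = [2]
    # give count = 2 and Q = (d[0] - d[2]) / 2 = 0.75; idx1 = [1] and
    # idx2 = [3] give U = (d[1] - d[3]) / 2 = 0.75.
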
    def wcs_shift(self, header, refheader):
        """
        Calculate the WCS shift between two headers.

        Parameters
        ----------
        header : fits.Header
            The header for the data to shift.
        refheader : fits.Header
            The reference header to shift to.

        Returns
        -------
        shift : array-like
            The (x, y) shift to apply to the data associated with
            header (dx = shift[0], dy = shift[1]).
        """
        wcs_in = WCS(header)
        wcs_out = WCS(refheader)

        xy = [refheader['CRPIX1'], refheader['CRPIX2']]
        xyin = np.array([xy])
        wxy = wcs_in.wcs_pix2world(xyin, 1)
        xyin_on_ref = wcs_out.wcs_world2pix(wxy, 1)
        shift = (xyin_on_ref - xyin)[0]
        return shift

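    # A minimal sketch of wcs_shift with synthetic headers
    # (illustrative values only, not pipeline data):
    #
    #     from astropy.io import fits
    #     ref = fits.Header([('CTYPE1', 'RA---TAN'),
    #                        ('CTYPE2', 'DEC--TAN'),
    #                        ('CRPIX1', 10.0), ('CRPIX2', 10.0),
    #                        ('CRVAL1', 180.0), ('CRVAL2', 0.0),
    #                        ('CDELT1', -0.001), ('CDELT2', 0.001)])
    #     hdr = ref.copy()
    #     hdr['CRPIX1'] = 12.0   # same sky reference, offset pixel grid
    #     StepScanStokes().wcs_shift(hdr, ref)   # ~ array([-2., 0.])
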
    def _expand_array(self, data, shape):
        """
        Expand an image array to a new shape.

        Data dimensions must be smaller than the new shape. Only two
        dimensions are expected. Unfilled values are set to NaN.

        Parameters
        ----------
        data : array-like
            The data to expand.
        shape : tuple of int
            The shape to expand to.

        Returns
        -------
        result : array-like
            The expanded array.
        """
        result = np.full(shape, np.nan)
        s0 = data.shape
        result[0: s0[0], 0: s0[1]] = data.copy()
        return result

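    # For example, expanding a (2, 2) image into a (2, 3) frame pads
    # the new column with NaN:
    #
    #     StepScanStokes()._expand_array(np.ones((2, 2)), (2, 3))
    #     # -> [[ 1.,  1., nan],
    #     #     [ 1.,  1., nan]]
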
    def correct_zero_level_auto(self, r_data, t_data, method, radius):
        """
        Correct image zero level from automatically determined regions.

        Data arrays are updated in place.

        Parameters
        ----------
        r_data : array-like
            R data to correct.
        t_data : array-like
            T data corresponding to r_data.
        method : {'mean', 'median'}
            Filter method.
        radius : int
            Radius in pixels for the filter kernel.
        """
        # circular aperture kernel
        kernel = np.zeros((2 * radius + 1, 2 * radius + 1))
        y, x = np.ogrid[-radius:radius + 1, -radius:radius + 1]
        kernel[x ** 2 + y ** 2 <= radius ** 2] = 1

        r_filters = []
        t_filters = []
        for r, t in zip(r_data, t_data):
            # mean value within the aperture at each point,
            # ignoring any regions containing NaNs
            if 'mean' in method:
                r_filter = generic_filter(
                    r, np.mean, footprint=kernel,
                    mode='constant', cval=np.nan)
                t_filter = generic_filter(
                    t, np.mean, footprint=kernel,
                    mode='constant', cval=np.nan)
            else:
                # median filter is much faster than a generic filter
                # with a median function
                r_filter = median_filter(
                    r, footprint=kernel, mode='constant', cval=np.nan)
                t_filter = median_filter(
                    t, footprint=kernel, mode='constant', cval=np.nan)

                # account for NaNs that the median filter ignores
                def func(data):
                    return np.any(np.isnan(data))

                r_nan = generic_filter(
                    r, func, footprint=kernel, mode='constant', cval=1)
                t_nan = generic_filter(
                    t, func, footprint=kernel, mode='constant', cval=1)
                r_filter[r_nan > 0] = np.nan
                t_filter[t_nan > 0] = np.nan

            r_filters.append(r_filter)
            t_filters.append(t_filter)

        r_filters = np.array(r_filters)
        t_filters = np.array(t_filters)

        # check for negative regions in either R or T at all
        # HWP positions
        neg = np.all(r_filters < 0, axis=0) | np.all(t_filters < 0, axis=0)
        if np.any(neg):
            # block regions that aren't consistently negative
            block = np.tile(~neg, (r_filters.shape[0], 1, 1))
            r_filters[block] = np.nan
            t_filters[block] = np.nan

            # find the collectively lowest region in R and T
            # across all HWP angles
            zero_pix = np.nanargmin(np.sum(r_filters, axis=0)
                                    + np.sum(t_filters, axis=0))
            zero_pix = np.unravel_index(zero_pix, r_data[0].shape)
            log.info(f'Correcting zero level '
                     f'at pix x,y: {zero_pix[1]},{zero_pix[0]}')

            # subtract the zero level from individual images,
            # taken from the filter values at the determined pixel
            for r, t, rf, tf in zip(r_data, t_data, r_filters, t_filters):
                rzero = rf[zero_pix]
                log.info(f'  R level: {rzero}')
                tzero = tf[zero_pix]
                log.info(f'  T level: {tzero}')
                r -= rzero
                t -= tzero
        else:
            log.info('No negative zero level found; not '
                     'subtracting background.')

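    # The circular footprint built above, written out for radius = 2
    # (1 marks pixels inside the aperture):
    #
    #     0 0 1 0 0
    #     0 1 1 1 0
    #     1 1 1 1 1
    #     0 1 1 1 0
    #     0 0 1 0 0
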
    def correct_zero_level_region(self, r_data, t_data, method, reglist,
                                  refheader, robust=5.0):
        """
        Correct image zero level from specified circular regions.

        Data arrays are updated in place.

        Parameters
        ----------
        r_data : array-like
            R data to correct.
        t_data : array-like
            T data corresponding to r_data.
        method : {'mean', 'median'}
            Statistics method.
        reglist : list of list of float
            List of regions as [RA, Dec, radius] in degrees, matching
            the length of the r_data and t_data lists.
        refheader : fits.Header
            Reference header to use for the WCS.
        robust : float
            Sigma value to use for clipping statistics. Set to 0 to
            turn off clipping.

        Raises
        ------
        ValueError
            If any specified region is not on the array.
        """
        # reference WCS for identifying background pixels.
        # This should be run after shifting images, so that
        # all are in the same reference frame.
        ref_wcs = WCS(refheader)

        # coordinates for region check
        ny, nx = r_data[0].shape
        y, x = np.ogrid[0:ny, 0:nx]

        # iterate first to collect levels; correct later only
        # if all regions are appropriate
        r_levels = []
        t_levels = []
        for r, t, reg in zip(r_data, t_data, reglist):
            # region center and radius in pixels
            cx, cy = ref_wcs.wcs_world2pix(reg[0], reg[1], 0)
            cr = reg[2] / np.abs(ref_wcs.wcs.cdelt[0])

            # check that the region center is on the array
            if (not np.all(np.isfinite([cx, cy]))
                    or (cx < 0 or cx >= nx or cy < 0 or cy >= ny)):
                msg = f"Region {reg}, center {cx},{cy} is not " \
                      f"on array with size x,y={nx},{ny}"
                log.error(msg)
                raise ValueError(msg)

            test = ((x - cx) ** 2 + (y - cy) ** 2 <= cr ** 2)
            if robust > 0:
                # sigma clip data before taking stats.
                # sigma_clipped_stats returns (mean, median, stddev)
                rmean, rmed, rsig = sigma_clipped_stats(r[test],
                                                        sigma=robust)
                tmean, tmed, tsig = sigma_clipped_stats(t[test],
                                                        sigma=robust)
                if 'mean' in method:
                    rzero = rmean
                    tzero = tmean
                else:
                    rzero = rmed
                    tzero = tmed
            else:
                # just take stats
                if 'mean' in method:
                    rzero = np.nanmean(r[test])
                    tzero = np.nanmean(t[test])
                else:
                    rzero = np.nanmedian(r[test])
                    tzero = np.nanmedian(t[test])

            log.info(f'Correcting zero level at '
                     f'pix x,y: {cx:.2f},{cy:.2f}')
            log.info(f'  R level: {rzero}')
            log.info(f'  T level: {tzero}')
            r_levels.append(rzero)
            t_levels.append(tzero)

        # now subtract levels from images
        for r, t, rl, tl in zip(r_data, t_data, r_levels, t_levels):
            r -= rl
            t -= tl

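    # A worked sketch of the region-to-pixel conversion above (assumed
    # values): with a plate scale of |CDELT1| = 0.001 deg/pixel, a
    # region radius of 0.01 degrees becomes cr = 0.01 / 0.001 = 10
    # pixels, and the test mask selects all pixels within 10 pixels
    # of the region center (cx, cy).
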
    def run(self):
        """
        Run the data reduction algorithm.

        Because this step is single-in, single-out (SISO),
        self.datain must be a DataFits object. The output is also a
        DataFits object, stored in self.dataout.

        The process is:

        1. Check and gather all input data.
        2. Shift all data to a common reference frame.
        3. Compute Stokes I from R+T at all angles.
        4. Compute Stokes Q and U from R-T at angles separated
           by 45 degrees.
        5. Propagate errors and covariances.
        """
        self.dataout = DataFits(config=self.datain.config)
        self.dataout.filename = self.datain.filename
        self.dataout.setheader(self.datain.header)

        # input: R and T data at each HWP are rotated to north up,
        # and sampled onto the same scale, but are shifted relative
        # to each other, as recorded in the WCS in each extension
        # header. HWP angle value and subarray are recorded in
        # extension headers.

        # Number of HWP angles must be 4
        nhwp = self.datain.getheadval('nhwp')
        if nhwp != 4:
            msg = 'Unexpected number of HWP angles. Must be NHWP=4.'
            log.error(msg)
            raise ValueError(msg)

        # get expected tolerance and difference
        hwp_tol = abs(self.getarg('hwp_tol'))
        hwp_diff = 45.0

        # get zero-level parameters
        zl_method = str(self.getarg('zero_level_method')).lower()
        zl_sigma = self.getarg('zero_level_sigma')
        zl_radius = self.read_radius()
        zl_region = self.getarg('zero_level_region')
        if str(zl_region) not in ['header', 'auto']:
            try:
                if type(zl_region) is str:
                    region = [re.sub(r'[\'"\[\]]', '', v).strip()
                              for v in str(zl_region).split(',')]
                else:
                    region = zl_region
                assert len(region) == 3
                region = [float(r) for r in region]
            except (TypeError, ValueError, AssertionError):
                msg = f'Badly formatted zero_level_region: ' \
                      f'{zl_region}. ' \
                      f'Should be [RA, Dec, radius] in degrees.'
                log.error(msg)
                raise ValueError(msg) from None
            zl_region = region

        hwplist = []
        data_shape = [0, 0]
        r_data = []
        t_data = []
        r_sigma = []
        t_sigma = []
        r_exp = []
        t_exp = []
        r_head = []
        t_head = []
        df = self.datain
        for i in range(nhwp):
            rd_ext = 'DATA R HWP%d' % i
            td_ext = 'DATA T HWP%d' % i
            re_ext = 'ERROR R HWP%d' % i
            te_ext = 'ERROR T HWP%d' % i
            rx_ext = 'EXPOSURE R HWP%d' % i
            tx_ext = 'EXPOSURE T HWP%d' % i

            hwplist.append(df.getheadval('HWPINIT', dataname=rd_ext))

            rimg = df.imageget(rd_ext)
            data_shape[0] = max(data_shape[0], rimg.shape[0])
            data_shape[1] = max(data_shape[1], rimg.shape[1])
            timg = df.imageget(td_ext)
            data_shape[0] = max(data_shape[0], timg.shape[0])
            data_shape[1] = max(data_shape[1], timg.shape[1])

            r_data.append(rimg)
            t_data.append(timg)
            r_sigma.append(df.imageget(re_ext))
            t_sigma.append(df.imageget(te_ext))
            r_exp.append(df.imageget(rx_ext))
            t_exp.append(df.imageget(tx_ext))
            r_head.append(df.getheader(rd_ext))
            t_head.append(df.getheader(td_ext))

        # set HWPINIT for the file
        self.dataout.setheadval('HWPINIT', hwplist[0],
                                'Actual value of the initial HWP angle')

        # fix missing or bad HWPSTART (old scanpol files)
        try:
            hwpstart = self.dataout.getheadval('HWPSTART', errmsg=False)
        except KeyError:
            hwpstart = -9999
        if hwpstart == -9999:
            self.dataout.setheadval('HWPSTART', hwplist[0],
                                    'HWP initial angle [deg]')

        # check HWP angles
        sort_idx = np.argsort(hwplist)
        hwplist = np.array(hwplist)[sort_idx]
        diff1 = abs(hwplist[0] - hwplist[2])
        diff2 = abs(hwplist[1] - hwplist[3])
        if abs(diff1 - hwp_diff) > hwp_tol:
            log.warning('Stokes Q: HWP angles differ '
                        'by %.1f degrees '
                        '(should be %.1f)' % (diff1, hwp_diff))
        if abs(diff2 - hwp_diff) > hwp_tol:
            log.warning('Stokes U: HWP angles differ '
                        'by %.1f degrees '
                        '(should be %.1f)' % (diff2, hwp_diff))

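        # (For example, a scan-pol observation might take HWP angles
        # near 5, 27.5, 50, and 72.5 degrees -- representative values,
        # not read from data -- so sorted angles 0/2 pair up for Q and
        # 1/3 pair up for U, each pair separated by ~45 degrees.)
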
        # shift data to reference
        refheader = df.header
        zl_reg_list = []
        for i, header in enumerate(r_head):
            # first expand all images to max size
            d = self._expand_array(r_data[i], data_shape)
            s = self._expand_array(r_sigma[i], data_shape)
            e = self._expand_array(r_exp[i], data_shape)

            shift_val = self.wcs_shift(header, refheader)
            if np.allclose(shift_val, 0.0, atol=0.01):
                r_data[i] = d
                r_sigma[i] = s
                r_exp[i] = e
            else:
                log.info('Shifting R image {} by x,y='
                         '{:.1f},{:.1f}'.format(i + 1, shift_val[0],
                                                shift_val[1]))
                # reverse x and y for the shift function
                shift_val = [shift_val[1], shift_val[0]]
                r_data[i] = adjust.shift(d, shift_val)
                r_sigma[i] = adjust.shift(s, shift_val)
                r_exp[i] = adjust.shift(e, shift_val)

            # check for zero level keywords in the header:
            # will be used for both R and T
            if zl_method in ['mean', 'median']:
                if str(zl_region) == 'header':
                    try:
                        zra = header['ZERO_RA']
                        zdec = header['ZERO_DEC']
                        zrad = header['ZERO_RAD']
                        zl_reg_list.append([zra, zdec, zrad])
                    except KeyError:
                        # use the first specified region if possible
                        if len(zl_reg_list) >= 1:
                            log.debug('Using primary header region '
                                      'for zero level.')
                            zl_reg_list.append(zl_reg_list[0])
                        else:
                            log.warning('Missing zero-level region keys '
                                        '(ZERO_RA, ZERO_DEC, ZERO_RAD).')
                            log.warning('Falling back to auto region '
                                        'method.')
                            zl_region = 'auto'
                            zl_reg_list = []
                    else:
                        # catch missing values
                        if np.any(np.isclose(zl_reg_list, -9999)):
                            log.warning('Missing zero-level region keys '
                                        '(ZERO_RA, ZERO_DEC, ZERO_RAD).')
                            log.warning('Falling back to auto region '
                                        'method.')
                            zl_region = 'auto'
                            zl_reg_list = []
                elif type(zl_region) is list:
                    zl_reg_list.append(zl_region)

        for i, header in enumerate(t_head):
            # first expand all images to max size
            d = self._expand_array(t_data[i], data_shape)
            s = self._expand_array(t_sigma[i], data_shape)
            e = self._expand_array(t_exp[i], data_shape)

            shift_val = self.wcs_shift(header, refheader)
            if np.allclose(shift_val, 0.0, atol=0.01):
                t_data[i] = d
                t_sigma[i] = s
                t_exp[i] = e
            else:
                log.info('Shifting T image {} by x,y='
                         '{:.1f},{:.1f}'.format(i + 1, shift_val[0],
                                                shift_val[1]))
                # reverse x and y for the shift function
                shift_val = [shift_val[1], shift_val[0]]
                t_data[i] = adjust.shift(d, shift_val)
                t_sigma[i] = adjust.shift(s, shift_val)
                t_exp[i] = adjust.shift(e, shift_val)

        # Correct for zero level from a specified region, or from
        # the lowest average value within a filter window
        if zl_method in ['mean', 'median']:
            do_auto = True
            if len(zl_reg_list) == len(r_data):
                try:
                    self.correct_zero_level_region(r_data, t_data,
                                                   zl_method,
                                                   zl_reg_list,
                                                   refheader,
                                                   robust=zl_sigma)
                except ValueError:
                    log.warning('Not applying zero level correction '
                                'from specified regions; falling back '
                                'to auto method.')
                else:
                    do_auto = False
            if do_auto:
                cdelt = np.abs(3600. * df.getheadval('CDELT1'))
                radius = int(np.round(zl_radius / cdelt))
                self.correct_zero_level_auto(r_data, t_data,
                                             zl_method, radius)
        else:
            log.debug('No zero level correction attempted.')

        # Compute Stokes I.
        # Don't use nansum here, since data must overlap to be useful.
        all_data = np.array(r_data + t_data)
        all_sigma = np.array(r_sigma + t_sigma)
        stokes_i = np.sum(all_data, axis=0) / float(nhwp)
        err_i = np.sqrt(np.sum(all_sigma ** 2, axis=0)
                        / float(nhwp ** 2))

        # Compute Stokes Q and U,
        # using R-T pairs 1/3 and 2/4 of the sorted HWP angles
        rmt_data = np.array(r_data) - np.array(t_data)
        r_var = np.array(r_sigma) ** 2
        t_var = np.array(t_sigma) ** 2
        rmt_sigma = np.sqrt(r_var + t_var)

        idx1 = [sort_idx[0]]
        idx2 = [sort_idx[2]]
        stokes_q, err_q, cov_qi = self.stokes(
            idx1, idx2, rmt_data, rmt_sigma, r_var, t_var)

        idx1 = [sort_idx[1]]
        idx2 = [sort_idx[3]]
        stokes_u, err_u, cov_ui = self.stokes(
            idx1, idx2, rmt_data, rmt_sigma, r_var, t_var)

        # bad pixel mask: set NaNs to the bad value
        bpm = np.zeros_like(stokes_i).astype(int)
        bpm[np.isnan(stokes_i)
            | np.isnan(stokes_q)
            | np.isnan(stokes_u)] = 3

        # Write out images
        self.dataout.imageset(stokes_i, "STOKES I")
        self.dataout.imageset(err_i, "ERROR I")
        self.dataout.imageset(stokes_q, "STOKES Q")
        self.dataout.imageset(err_q, "ERROR Q")
        self.dataout.imageset(stokes_u, "STOKES U")
        self.dataout.imageset(err_u, "ERROR U")
        self.dataout.imageset(cov_qi, "COVAR Q I")
        self.dataout.imageset(cov_ui, "COVAR U I")
        self.dataout.imageset(np.zeros_like(cov_qi), "COVAR Q U")
        self.dataout.imageset(bpm, "BAD PIXEL MASK")

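# Illustrative usage sketch (hypothetical file name; assumes the step
# is invoked like other pipeline steps, by calling it with a loaded
# DataFits and optional parameter overrides):
#
#     from sofia_redux.instruments.hawc.datafits import DataFits
#     from sofia_redux.instruments.hawc.steps.stepscanstokes \
#         import StepScanStokes
#
#     df = DataFits()
#     df.load('scanpol_output.fits')   # output of StepScanMapPol
#     step = StepScanStokes()
#     stokes = step(df, zero_level_method='median',
#                   zero_level_region='auto')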