# Source code for sofia_redux.instruments.hawc.steps.basemap

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Mix-in class for mapping utilities."""

from astropy import wcs as astwcs
from astropy import log
from astropy.io import fits
import numpy as np

from sofia_redux.toolkit.utilities.func import stack

__all__ = ['BaseMap']


class BaseMap(object):
    """
    Mapping utilities for pipeline steps.

    This class is designed to be used as mix-in for any pipeline
    step class that requires mapping functionality.  It is assumed
    that the pipeline step will handle data input and output, as
    well as appropriate parameters.
    """
    def __init__(self):
        # Attribute placeholders only: pipeline steps or the local
        # mapping methods assign real values later.
        self.datain = None     # input data, list of DataFits
        self.dataout = None    # output data
        self.allhead = None    # draft output header (make_resample_map)
        self.nhwp = 0          # number of HWP angles (checkvalid)
        self.pdata = []        # per-file pixel data (read_resample_data)
        self.pmap = {}         # map grid parameters (make_resample_map)
[docs] def getarg(self, parname): """ Get input parameter. This should be implemented by the pipeline step. """ raise NotImplementedError
[docs] def addgap(self, data, x, y): """ Adjust x and y pixel coordinates for the gap between subarrays. Header keywords ALNGAPX, ALNGAPY, and ALNROTA are used to determine the gap parameters. The x and y arrays are updated in place. Parameters ---------- data : DataFits Input data to adjust x : array-like X coordinates for the input image. Must be 2-dimensional (nrow x ncol). y : array-like Y coordinates for the input image. Must be 2-dimensional (nrow x ncol). """ try: gapx = data.getheadval("alngapx", errmsg=False) except KeyError: msg = 'Missing ALNGAPX, assuming equal to 0.0' log.debug(msg) gapx = 0.0 try: gapy = data.getheadval("alngapy", errmsg=False) except KeyError: msg = 'Missing ALNGAPY, assuming equal to 0.0' log.debug(msg) gapy = 0.0 try: gapangle = np.radians(data.getheadval("alnrota", errmsg=False)) except KeyError: msg = 'Missing ALNROTA, assuming equal to 0.0' log.debug(msg) gapangle = 0.0 if np.allclose([gapx, gapy, gapangle], 0): return # we assume that the sub-arrays are split across columns in the middle. nrow, ncol = x.shape ncol = ncol // 2 # pixel coordinates of sub-array, relative to center suby, subx = np.mgrid[0:nrow, 0:ncol] # center of data is (0,0) subx = subx - ncol / 2. + 0.5 suby = suby - nrow / 2. + 0.5 # rotate sub-array by gapangle counterclockwise tmp = np.cos(gapangle) * subx - np.sin(gapangle) * suby suby = np.sin(gapangle) * subx + np.cos(gapangle) * suby subx = tmp # shift by gapx and gapy suby += gapy subx += gapx # revert to 0,0 being bottom left subx = subx - 0.5 + ncol / 2. suby = suby - 0.5 + nrow / 2. # update coordinates of right half of x,y x[:, ncol:] = subx + ncol y[:, ncol:] = suby
[docs] def checkvalid(self): """ Check data consistency. All input data are checked to make sure that SPECTEL1 and the number of HWP angles are the same across all input files. The number of HWP angles found is set as self.nhwp. """ log.debug('Started checking for valid input files') nwave = len(set([d.getheadval('SPECTEL1') for d in self.datain])) if nwave != 1: msg = "SPECTEL1 not the same among all input files!" log.error(msg) raise ValueError(msg) nhwp = set([d.getheadval('nhwp') for d in self.datain]) if len(nhwp) > 1: msg = "Number of HWP angles not the same among all input files!" log.error(msg) raise ValueError(msg) self.nhwp = list(nhwp)[0] log.debug('Finished checking valid input files')
[docs] def sumexptime(self): """ Sum exposure time over input data. The EXPTIME keyword in self.dataout is set to the summed value as the total exposure time on-source. """ log.debug('Started summing exposure times for input files') totexptime = sum([d.getheadval('EXPTIME') for d in self.datain]) self.dataout.setheadval('EXPTIME', totexptime, 'Total on-source exposure time [s]') log.debug('Finished summing exposure times for input files')
[docs] def read_resample_data(self, maxgoodpix): """ Read data from input images. This function ingests the data from self.datain, storing in the self.pdata list of dictionaries. Keys for each data dictionary are as follows; values are flattened arrays corresponding to good pixels only. - mask: bad pixel mask values - ra: RA coordinates - dec: Dec coordinates - I: Stokes I flux values - dI: Stokes I error values The following additional keys are present only if self.nhwp > 1: - Q: Stokes Q flux values - dQ: Stokes Q error values - U: Stokes U flux values - dU: Stokes U error values - QIcov: Q-I covariance values - UIcov: U-I covariance values - QUcov: Q-U covariance values Parameters ---------- maxgoodpix : int The maximum mask value to be considered good. If maxgoodpix=0, only good pixels present in both R and T arrays are used for stokes I. If maxgoodpix=2, widow pixels are also used for stokes I. """ log.debug("starting read_resample_data()") # We create self.pdata. pdata will be a list of python # dictionaries to contain all the data. 
self.pdata = [] for i, f in enumerate(self.datain): temp = {} naxis1, naxis2 = f.getheadval('NAXIS1'), f.getheadval('NAXIS2') pix2, pix1 = np.mgrid[0:naxis2, 0:naxis1] self.addgap(f, pix1, pix2) tempwcs = astwcs.WCS(f.header) ra, dec = tempwcs.wcs_pix2world(pix1, pix2, 0) # convert the mask to a float so we can keep track of fractional # overlap based on gaussian smoothing if 'BAD PIXEL MASK' in f.imgnames: temp['mask'] = f.imageget('BAD PIXEL MASK').astype(np.float64) else: temp['mask'] = np.zeros((naxis2, naxis1), dtype=np.float64) ngood = np.where(temp['mask'] <= maxgoodpix) temp['mask'] = temp['mask'][ngood] temp['ra'] = ra[ngood] temp['dec'] = dec[ngood] if 'STOKES I' in f.imgnames: temp['I'] = f.imageget('STOKES I').astype(np.float64)[ngood] temp['dI'] = f.imageget('ERROR I').astype(np.float64)[ngood] else: temp['I'] = f.imageget('PRIMARY IMAGE').astype( np.float64)[ngood] temp['dI'] = f.imageget('NOISE').astype(np.float64)[ngood] if self.nhwp > 1: # polarization for var in ('Q', 'U'): temp[var] = f.imageget( 'STOKES %s' % var).astype(np.float64)[ngood] temp['d' + var] = f.imageget( 'ERROR %s' % var).astype(np.float64)[ngood] temp['QIcov'] = f.imageget( 'COVAR Q I').astype(np.float64)[ngood] temp['UIcov'] = f.imageget( 'COVAR U I').astype(np.float64)[ngood] temp['QUcov'] = f.imageget( 'COVAR Q U').astype(np.float64)[ngood] log.debug('storing {} good pixels ' '(out of {})'.format(ngood[0].size, naxis1 * naxis2)) self.pdata.append(temp) log.debug("finished read_resample_data()")
    def make_resample_map(self, cdelt):
        """
        Construct map parameters from input data.

        The coordinate grid for the map is determined from the range
        of the RA and Dec coordinates in the input data.  The grid and
        associated reference values are stored in self.pmap, for use
        by the resampling algorithm.  A draft header containing
        appropriate WCS information for the output map is stored in
        self.allhead.

        The 'proj' parameter is used to set the projection type in the
        output header.  The 'sizelimit' argument is tested against, to
        determine if the output map is too large.  This error case is
        most likely due to inappropriately grouped input files.  These
        parameters must be defined by the pipeline step inheriting
        from this class.

        Parameters
        ----------
        cdelt : float
            The pixel scale for the output map, in arcsec.

        Raises
        ------
        ValueError
            If either output map axis exceeds the 'sizelimit'
            parameter, in pixels.
        """
        log.debug("starting make_resample_map()")
        proj = self.getarg('proj')

        # gather coordinates from all input files (set by
        # read_resample_data)
        ra = np.hstack([d['ra'] for d in self.pdata])
        dec = np.hstack([d['dec'] for d in self.pdata])
        # reference position: mean of all input coordinates
        base_ra = np.mean(ra)
        base_dec = np.mean(dec)

        # offsets in arcsec, with RA reversed
        xs = -1 * (ra - base_ra) * np.cos(np.radians(base_dec)) * 3600.
        ys = (dec - base_dec) * 3600.
        coordinates = stack(xs, ys)
        xmin = np.nanmin(xs)
        xmax = np.nanmax(xs)
        ymin = np.nanmin(ys)
        ymax = np.nanmax(ys)
        xrange = xmax - xmin
        yrange = ymax - ymin

        # size of x and y axes in pixels
        naxis1 = int(np.ceil(xrange / cdelt))
        naxis2 = int(np.ceil(yrange / cdelt))

        # check for too large map
        limit = self.getarg('sizelimit')
        if naxis1 > limit or naxis2 > limit:
            msg = "Output map is too large (> %.0f pixels)" % limit
            log.error(msg)
            raise ValueError(msg)

        # output grid in arcsec offsets
        xout = np.arange(naxis1, dtype=float) * cdelt + xmin
        yout = np.arange(naxis2, dtype=float) * cdelt + ymin
        grid = xout, yout

        # update crval to match an actual output pixel; RA is
        # de-projected with the Dec of that reference pixel
        crval2 = base_dec + (yout[naxis2 // 2] / 3600.)
        crval1 = base_ra - (xout[naxis1 // 2] / (3600.
                                                 * np.cos(np.radians(crval2))))

        # draft an output header; CDELT1 is negative because RA
        # increases to the left, and CDELT values are in degrees
        allhead = fits.Header()
        allhead['SIMPLE'] = 'T'
        allhead['BITPIX'] = -64
        allhead['NAXIS'] = 2
        allhead['NAXIS1'] = naxis1
        allhead['NAXIS2'] = naxis2
        allhead['CDELT1'] = -1 * cdelt / 3600.
        allhead['CDELT2'] = cdelt / 3600.
        allhead['CRVAL1'] = crval1
        allhead['CRVAL2'] = crval2
        # FITS CRPIX is 1-based
        allhead['CRPIX1'] = naxis1 // 2 + 1
        allhead['CRPIX2'] = naxis2 // 2 + 1
        allhead['CTYPE1'] = 'RA---%s' % proj
        allhead['CTYPE2'] = 'DEC--%s' % proj
        allhead['EQUINOX'] = self.datain[0].getheadval('EQUINOX')
        allhead['NHWP'] = self.nhwp
        log.debug("make_resample_map: "
                  "naxis1,naxis2 = %d,%d" % (naxis1, naxis2))
        self.allhead = allhead

        # set the grid information in self.pmap
        self.pmap = {
            'base_ra': base_ra, 'base_dec': base_dec,
            'xmin': xmin, 'xmax': xmax,
            'ymin': ymin, 'ymax': ymax,
            'shape': (ra.size, naxis2, naxis1),
            'xout': xout, 'yout': yout,
            'xrange': xrange, 'yrange': yrange,
            'delta': (cdelt, cdelt),
            'coordinates': coordinates,
            'grid': grid}
        log.debug("finished make_resample_map()")