# Source code for sofia_redux.instruments.hawc.steps.stepprepare

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Raw data preparation pipeline step."""

from astropy import log
from astropy.io import fits
import numpy as np

from sofia_redux.instruments.hawc.stepparent import StepParent

__all__ = ['StepPrepare']


class StepPrepare(StepParent):
    """
    Prepare input file for processing.

    This step separates the flux column into separate columns for R
    and T arrays.  It may also fill in missing columns, and reformat
    or rename existing columns to conventions expected by subsequent
    pipeline steps.

    Currently StepPrepare assumes the input array has dimensions
    64 x 41, and just splits it into two 32 x 41 sub-arrays which are
    named 'R Array' (columns 1-32) and 'T Array' (columns 33-64) in
    the output file.

    If a column named 'Chop Offset' is not present in the input data,
    it is created, replicating the column given in the parameter
    'chpoff'.  If a column named 'HWP Angle' is not present, then it
    is generated by multiplying the 'hwpcounts' column by a
    conversion factor obtained from the parameter 'hwpconv'.

    Input for this step is a raw FITS file, containing the primary
    HDU (with only the header), the Configuration HDU, and a binary
    table ('Timestream') containing the arrays, chop and HWP
    signals, etc.

    The output for this step is the same as the input, with the
    original flux array removed and 'R Array', 'T Array' columns
    added in its place.
    """
[docs] def setup(self): """ Set parameters and metadata for the pipeline step. Output files have PRODTYPE = 'prepare', and are named with the step abbreviation 'PRE'. Parameters defined for this step are: detcounts : str Name of the table column containing the detector flux values for R/T arrays. Usually set to 'SQ1 Feedback'. hwpcounts : str Name of the table column containing the HWP counts (only used if column 'HWP Angle' is not present). hwpconv : float Value to convert hwpcounts to HWP Angle (only used if column 'HWP Angle' is not present). labmode : bool If set, data is assumed to be missing header keywords and data columns for on-sky data. These columns are filled in with default values to allow processing to continue. replacenod : bool If set, the 'Nod Offset' column will be replaced by a calculation based on RA/DEC. If not set, the original column will be used. replacenod=True is recommended. chpoffsofiars : bool If set, the 'Chop Offset' column will be calculated from the SofiaChopR/S columns. colrename : str List of data columns to rename, as a '|' separated string. The format is: 'oldname1->newname1|oldname2->newname2|...'. coldelete : list of str List of data columns to delete. pixscalist : list of float List of PIXSCAL values, one for each HAWC waveband. Used to write a FITS header keyword. traceshift : int Number of samples to shift the data by. If non-zero, all columns except Timestamp, R Array, and T Array will by shifted by this amount, in order to align the instrument readouts with the metadata. removedropouts : bool If set, data dropouts (with RA = Dec = 0) will be removed from the file. """ # Name of the pipeline reduction step self.name = 'prepare' self.description = 'Prepare Data Arrays' # Shortcut for pipeline reduction step and identifier for # saved file names. 
self.procname = 'pre' # Clear Parameter list self.paramlist = [] # Append parameters self.paramlist.append(['detcounts', 'SQ1 Feedback', 'Name of the input fits binary table ' 'column containing the detector flux ' 'values for R/T arrays']) self.paramlist.append(['hwpcounts', 'hwpCounts', 'Name of the input fits column containing ' 'the HWP counts (only used if column ' '"HWP Angle" is not present)']) self.paramlist.append(['hwpconv', (360. / 1440.), 'Value to convert hwpcounts to HWP ' 'Angles (only used if column "HWP Angle" ' 'is not present)']) self.paramlist.append(['labmode', False, 'If TRUE (processing lab data), will ' 'fill in with zeros a few columns and ' 'keywords that are important for the DRP']) self.paramlist.append(['replacenod', True, 'If TRUE will replace Nod Offset by ' 'calculation based on RA/DEC. If False, ' 'use original column (has problems)']) self.paramlist.append(['chpoffsofiars', True, 'If TRUE will calculate Chop Offset ' 'based on SofiaChopR/S. If False, use ' 'colrename to specify column to use']) self.paramlist.append(['colrename', '', 'List of data columns to rename, ' 'The format is: ' 'oldname1->newname1|oldname2->newname2|...']) self.paramlist.append(['coldelete', ["hwpA", "hwpB"], 'List of data columns to delete. ' 'The format is: ["column1", "column2", ...]']) self.paramlist.append(['pixscalist', [2.6, 4.0, 4.0, 6.8, 9.0], 'List for PIXSCAL values for each band']) self.paramlist.append(['traceshift', 0, 'Number of samples to shift the data ' '(default is 0=no shift)']) self.paramlist.append(['removedropouts', False, 'Remove data dropouts (i.e. data with ' 'RA==Dec==0 - default is False)'])
[docs] def read_pixscal(self): """ Read a pixel scale value from the parameters. The parameters are expected to be defined as a list, with one entry for each HAWC band. The correct value for the input data is selected from the list. Returns ------- float The pixel scale. """ pixscalist = self.getarg('pixscalist') waveband = self.datain.getheadval('spectel1') bands = ['A', 'B', 'C', 'D', 'E'] try: idx = bands.index(waveband[-1]) except (ValueError, IndexError): # waveband not in list msg = 'Cannot parse waveband=%s' % waveband log.error(msg) raise ValueError(msg) try: pixscal = pixscalist[idx] except IndexError: msg = 'Need pixscal values for all wavebands' log.error(msg) raise IndexError(msg) return pixscal
    def run(self):
        """
        Run the data reduction algorithm.

        Because this step is single-in, single-out (SISO),
        self.datain must be a DataFits object.  The output is also a
        DataFits object, stored in self.dataout.

        The process is:

        1. Read the input data table.
        2. Split the flux array into R and T subarrays.
        3. Rename columns specified in the 'colrename' parameter.
        4. Remove any columns specified in the 'coldelete' parameter.
        5. Add any columns necessary from a housekeeping data file.
        6. Calculate chop and nod offsets and HWP angles as necessary.
        7. Shift metadata columns to match flux samples if necessary.
        8. Store the data.

        Input data is modified in place.
        """
        # Read input parameters
        tstream = self.getarg('detcounts')
        hwpcnts = self.getarg('hwpcounts')
        hwpconv = self.getarg('hwpconv')
        labmode = self.getarg('labmode')
        replacenodoffset = self.getarg('replacenod')
        chpoffsofiars = self.getarg('chpoffsofiars')
        colrename_str = self.getarg('colrename')
        coldelete = self.getarg('coldelete')
        traceshift = self.getarg('traceshift')
        removedropouts = self.getarg('removedropouts')

        # Convert colrename to a dictionary: colrename['oldname'] = newname
        colrename = {}
        if len(colrename_str) > 0:
            for colrep in colrename_str.split('|'):
                colspl = colrep.split('->')
                # skip (with a warning) if '->' not found
                if len(colspl) < 2:
                    log.warning('Invalid colrename item = <%s>' % colrep)
                else:
                    colrename[colspl[0].strip()] = colspl[1].strip()

        # Names of the columns in the input table
        rawnames = self.datain.table.names

        # Handle detector data:
        # get the timestream data into coltstream
        coltstream = None
        for i, c in enumerate(rawnames):
            if c == tstream:
                coltstream = self.datain.table.field(c)
        if tstream not in rawnames:
            msg = "Table %s not found in raw data" % tstream
            log.error(msg)
            raise ValueError(msg)

        # Split the input array into R and T sub-arrays.
        # NOTE(review): the class docstring describes a 64-column
        # array split into two 32-column halves, but the split index
        # here is 64 -- confirm the expected raw array width.
        rarray = coltstream[:, :, :64]
        tarray = coltstream[:, :, 64:]
        # array dimensions (R must match T)
        nsamp, nrow, ncol = rarray.shape

        # Define new columns to be written -> col
        # Add R array and T array
        col = [fits.Column(name='R array', unit='counts',
                           array=rarray, format='%dD' % (nrow * ncol),
                           dim='(%d,%d)' % tuple([ncol, nrow])),
               fits.Column(name='T array', unit='counts',
                           array=tarray, format='%dD' % (nrow * ncol),
                           dim='(%d,%d)' % tuple([ncol, nrow]))]

        # Add renamed columns, only if the new name is
        # not originally in the input file
        for oldcname in colrename.keys():
            newcname = colrename[oldcname]
            # Make sure it's not already in
            if newcname not in rawnames:
                # Error if the source column does not exist
                if oldcname not in rawnames:
                    msg = "Column %s not found in raw data" % oldcname
                    log.error(msg)
                    raise ValueError(msg)
                # Search for the column to replace
                colunit = None
                coldata = None
                for i in range(len(rawnames)):
                    if rawnames[i] == oldcname:
                        coldata = self.datain.table.field(i)
                        if len(coldata.shape) > 1:
                            # raise error for multi-dim --
                            # needs special handling
                            msg = "Column %s is multidimensional; " \
                                  "cannot rename" % oldcname
                            log.error(msg)
                            raise ValueError(msg)
                        colunit = self.datain.table.columns.units[i]
                col.append(fits.Column(name=newcname, unit=colunit,
                                       array=coldata, format='1E'))
                log.debug('Renaming column %s to %s' % (oldcname, newcname))
                # mark the original column for deletion below
                colrename[oldcname] = 'delete'
            else:
                log.warning("Column %s already in output data, "
                            "replace is ignored" % newcname)

        # Add HWP Angle column only if it is not originally in the
        # input file (converted from counts via hwpconv, to degrees)
        if 'HWP Angle' not in rawnames:
            if hwpcnts not in rawnames:
                msg = "Column %s not found in raw data" % hwpcnts
                log.error(msg)
                raise ValueError(msg)
            colhwpang = None
            for c in rawnames:
                if c == hwpcnts:
                    colhwpang = self.datain.table.field(c) * hwpconv
            col.append(fits.Column(name='HWP Angle', unit='degrees',
                                   array=colhwpang, format='1E'))

        # Delete old Nod Offset column and make a new (zero-filled)
        # one, to be replaced by a Nod Offset based on RA and DEC
        if replacenodoffset:
            # Remove existing Nod Offset columns from original and new columns
            if 'Nod Offset' in rawnames:
                self.datain.table.columns.del_col('Nod Offset')
            # NOTE(review): removing from a list while iterating it
            # skips the element after each removal; safe only if at
            # most one 'Nod Offset' column is present -- confirm
            for c in col:
                if c.name == 'Nod Offset':
                    col.remove(c)
            # Append new nod offset column
            col.append(fits.Column(name='Nod Offset', unit='arcseconds',
                                   array=np.zeros(nsamp), format='1E'))
        else:
            log.warning('Using original Nod Offset column '
                        '(beware of problems with this column)')

        # Prepare a zero-filled Chop Offset column, computed further
        # below from the SofiaChopR and SofiaChopS signals
        if chpoffsofiars:
            col.append(fits.Column(name='Chop Offset', unit='arcseconds',
                                   array=np.zeros(nsamp), format='1E'))
        else:
            log.warning('Using chpoffsofiars = False. Use '
                        'colrename to define Chop Offset '
                        'with the correct column.')

        # Remove old columns:
        # remove the original array (SQ1 Feedback) from the input
        # data (will keep only "R Array" and "T Array")
        self.datain.table.columns.del_col(tstream)
        # Remove other columns from the input data
        for i in range(len(coldelete)):
            if coldelete[i] in rawnames:
                # Added B/C tbhdu = ... below would crash
                self.datain.table.columns.del_col(coldelete[i])
        # Remove the renamed columns marked 'delete' above
        for oldcname in colrename.keys():
            if colrename[oldcname] == 'delete':
                self.datain.table.columns.del_col(oldcname)

        # Replicate the input data to generate the output
        self.dataout = self.datain.copy()
        self.dataout.filename = self.datain.filename
        # The original (remaining) columns to be written
        origcols = self.datain.table.columns
        # Make a new table from the new + original columns
        newcols = fits.ColDefs(col)
        log.debug("New Cols = " + repr(newcols.names))
        log.debug("Orig Cols = " + repr(origcols.names))
        tbhdu = fits.BinTableHDU.from_columns(newcols + origcols)
        # assuming there is only one binary table
        tabdataname = self.datain.tabnames[0]
        self.dataout.tableset(tbhdu.data, tabdataname, tbhdu.header)

        # Modify data: configuration for LAB MODE
        if labmode:
            log.warning('Running in LAB MODE. Will fill in '
                        'important columns and keywords '
                        'with random values')
            # Fill in certain columns with zeros where they are NaN;
            # this is important for stepdemod and stepwcs
            for f in ['Azimuth Error', 'Elevation Error',
                      'Azimuth', 'Elevation', 'Array VPA']:
                t = np.where(np.isnan(self.dataout.table.field(f)))
                self.dataout.table.field(f)[t] = np.float64(0.0)
                if len(t[0]) > 0:
                    log.warning('%d values in %s column were '
                                'NaNs and were substituted '
                                'by zeros' % (len(t[0]), f))
            # Fill in important keywords
            self.dataout.setheadval("OBSRA", 1.0)
            self.dataout.setheadval("OBSDEC", 1.0)

        # Adapt PIXSCAL keyword to the waveband of the input data
        pixscal = self.read_pixscal()
        self.dataout.setheadval("PIXSCAL", pixscal)
        # Remove an engineering keyword
        self.dataout.delheadval('XPADDING')

        # Replace the Nod Offset column by a computation
        # of offsets based on RA and DEC
        if replacenodoffset:
            # Read the reference position from the header
            try:
                # convert hours to degrees (hence the * 15)
                telra = self.datain.getheadval("TELRA") * 15.0
                teldec = self.datain.getheadval("TELDEC")
            except KeyError:
                log.warning('TELRA AND TELDEC NOT DEFINED IN '
                            'INPUT DATA. WILL USE OBSRA AND OBSDEC '
                            'INSTEAD (not recommended).')
                # convert hours to degrees
                telra = self.datain.getheadval("OBSRA") * 15.0
                teldec = self.datain.getheadval("OBSDEC")
            # Get the RA and DEC columns
            try:
                # convert hours to degrees
                ra_col = self.dataout.table.field('RA') * 15.0
                dec_col = self.dataout.table.field('DEC')
            except KeyError:
                msg = 'RA and DEC columns not found in data'
                log.warning(msg)
                raise ValueError(msg)
            # Calculate the new nod offset column: angular distance
            # from the telescope reference position
            cos_dec = np.cos(teldec * (np.pi / 180.0))
            # convert degrees to arcseconds
            offra = ((ra_col - telra) * cos_dec) * 3600.0
            offdec = (dec_col - teldec) * 3600.0
            self.dataout.table['Nod Offset'] = np.sqrt(offra**2. + offdec**2.)
        else:
            log.warning('Using original Nod Offset column '
                        '(beware of problems with this column)')

        # Compute Chop Offset from the SofiaChopR and SofiaChopS signals
        if chpoffsofiars:
            try:
                chpr = self.dataout.table.field('SofiaChopR')
                chps = self.dataout.table.field('SofiaChopS')
                chpsync = self.dataout.table.field('SofiaChopSync')
            except KeyError:
                msg = 'SofiaChopR and S columns not found in data'
                log.warning(msg)
                raise ValueError(msg)
            # Make sure all chop signals are centered at 0,
            # by subtracting the mean value for each one
            chpr_col = chpr - np.mean(chpr)
            chps_col = chps - np.mean(chps)
            chpsync_col = chpsync - np.mean(chpsync)
            log.debug('Mean Chop R: {}'.format(np.mean(chpr)))
            log.debug('Mean Chop S: {}'.format(np.mean(chps)))
            log.debug('Mean Chop Sync {}'.format(np.mean(chpsync)))
            # Normalize all signals by dividing them by the amplitude
            # (assumed to be (max - min)/2, using the robust
            # 5th/95th percentiles as min/max)
            low, high = np.percentile(chpr_col, [5, 95])
            chpr_col = chpr_col / ((high - low) / 2.)
            log.debug('Low, high Chop R: {}, {}'.format(low, high))
            low, high = np.percentile(chps_col, [5, 95])
            chps_col = chps_col / ((high - low) / 2.)
            log.debug('Low, high Chop S: {}, {}'.format(low, high))
            low, high = np.percentile(chpsync_col, [5, 95])
            chpsync_col = chpsync_col / ((high - low) / 2.)
            log.debug('Low, high Chop Sync: {} {}'.format(low, high))
            # check for missing chop sync
            if np.abs(low) < 1.0 and np.abs(high) < 1.0:
                log.warning('Chop Sync signal may be missing; '
                            'check chop offsets')
            # Add ChopR and ChopS with ChopSync, to test which of
            # those are in phase with ChopSync.  Here we assume the
            # chop signals have a phase difference of either 0 or
            # 180 deg: the sum has a larger std.dev. than the signal
            # alone only when the signal is in phase with the sync.
            addchpr_sync = chpr_col + chpsync_col
            addchps_sync = chps_col + chpsync_col
            stdval = [np.nanstd(chpr_col), np.nanstd(addchpr_sync),
                      np.nanstd(chps_col), np.nanstd(addchps_sync)]
            # Non-finite std.devs are replaced with a large value so
            # the comparisons below treat them as out of phase
            for i, v in enumerate(stdval):
                if not np.isfinite(v):
                    stdval[i] = 1e6
            log.debug('Std.Dev for ChopR, ChopR+Sync; '
                      'ChopS, ChopS+Sync: '
                      '{}, {}; {}, {}'.format(*stdval))
            # Combine chpr and chps into Chop Offset with signs
            # chosen so both terms are in phase with the sync signal
            if (stdval[1] > stdval[0]) and \
                    (stdval[3] > stdval[2]):
                self.dataout.table['Chop Offset'] = chpr + chps
                log.debug('Assigning Chop Offset to SofiaChopR + '
                          'SofiaChopS (both are in phase '
                          'with sofiaChopSync)')
            elif (stdval[1] > stdval[0]) and \
                    (stdval[3] <= stdval[2]):
                self.dataout.table['Chop Offset'] = chpr - chps
                log.debug('Assigning Chop Offset to '
                          'SofiaChopR - SofiaChopS (ChopS is out '
                          'of phase with sofiaChopSync)')
            elif (stdval[1] <= stdval[0]) and \
                    (stdval[3] > stdval[2]):
                self.dataout.table['Chop Offset'] = chps - chpr
                log.debug('Assigning Chop Offset to '
                          'SofiaChopS - SofiaChopR (ChopR is out '
                          'of phase with sofiaChopSync)')
            elif (stdval[1] <= stdval[0]) and \
                    (stdval[3] <= stdval[2]):
                self.dataout.table['Chop Offset'] = - chpr - chps
                log.debug('Assigning Chop Offset to '
                          '-1*(SofiaChopR + SofiaChopS) '
                          '(both are out of phase with sofiaChopSync)')
        else:
            log.warning('Using chpoffsofiars = False. Use colrename '
                        'to define Chop Offset with the correct column.')
            # In this case the user must have provided a Chop Offset
            # column via colrename
            if 'Chop Offset' not in self.dataout.table.names:
                msg = 'Problem with chop offset assignment'
                log.error(msg)
                raise ValueError(msg)

        # Shift all data relative to R/T Array: loop through all
        # columns (nsamp is the number of samples)
        if traceshift:
            for colnam in self.dataout.table.names:
                # Skip if it's R/T Array or Timestamp
                # (these are the correct/reference ones)
                if colnam in ['Timestamp', 'R array', 'T array']:
                    continue
                # Shift data forward by traceshift samples
                self.dataout.table[colnam][traceshift:] = \
                    self.dataout.table[colnam][:-traceshift]
            # Cut the table at the start
            self.dataout.table = self.dataout.table[traceshift:]

        # Remove data with auxiliary (SOFIA HK data) dropouts,
        # identified by RA == Dec == 0
        if removedropouts and not labmode:
            # NOTE(review): 'Dec' here vs 'DEC' used above -- FITS_rec
            # field lookup is case-insensitive, presumably relied on
            # here; confirm
            mask = np.where((self.dataout.table['RA'] != 0.0)
                            & (self.dataout.table['Dec'] != 0.0))
            ndiff = len(self.dataout.table) - len(mask[0])
            if ndiff:
                log.warning('Remove Dropouts: Removed %d points' % ndiff)
                log.warning('Dropouts: shorted data '
                            'from %d to %d points'
                            % (len(self.dataout.table), len(mask[0])))
            self.dataout.tabdata[0] = self.dataout.table[mask]