# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Raw data preparation pipeline step."""
from astropy import log
from astropy.io import fits
import numpy as np
from sofia_redux.instruments.hawc.stepparent import StepParent
__all__ = ['StepPrepare']
class StepPrepare(StepParent):
"""
Prepare input file for processing.
This step separates the flux column into separate columns
for R and T arrays. It may also fill in missing columns,
and reformat or rename existing columns to conventions expected
by subsequent pipeline steps.
Currently StepPrepare assumes the input array has dimensions 128 x
41, and just splits it into two 64 x 41 sub-arrays which are
named 'R Array' (columns 1-64) and 'T Array' (columns 65-128) in the
output file. If a column named 'Chop Offset' is not present in the
input data, it is created, replicating the column given in the
parameter 'chpoff'. If a column named 'HWP Angle' is not present,
then it is generated by multiplying the 'hwpcounts' column by a
conversion factor obtained from the parameter 'hwpconv'.
Input for this step is a raw FITS file, containing the primary
HDU (with only the header), the Configuration HDU, and a binary
table ('Timestream') containing the arrays, chop and HWP
signals, etc. The output for this step is the same as the input,
with the original flux array removed and 'R Array', 'T Array' columns
added in its place.
"""
def setup(self):
    """
    Set parameters and metadata for the pipeline step.

    Output files have PRODTYPE = 'prepare', and are named with
    the step abbreviation 'PRE'.

    Parameters defined for this step are:

    detcounts : str
        Name of the table column containing the detector flux
        values for R/T arrays. Usually set to 'SQ1 Feedback'.
    hwpcounts : str
        Name of the table column containing the HWP counts
        (only used if column 'HWP Angle' is not present).
    hwpconv : float
        Value to convert hwpcounts to HWP Angle
        (only used if column 'HWP Angle' is not present).
    labmode : bool
        If set, data is assumed to be missing header keywords
        and data columns for on-sky data. These columns are filled
        in with default values to allow processing to continue.
    replacenod : bool
        If set, the 'Nod Offset' column will be replaced by
        a calculation based on RA/DEC. If not set, the original
        column will be used. replacenod=True is recommended.
    chpoffsofiars : bool
        If set, the 'Chop Offset' column will be calculated from
        the SofiaChopR/S columns.
    colrename : str
        List of data columns to rename, as a '|' separated string.
        The format is: 'oldname1->newname1|oldname2->newname2|...'.
    coldelete : list of str
        List of data columns to delete.
    pixscalist : list of float
        List of PIXSCAL values, one for each HAWC waveband. Used
        to write a FITS header keyword.
    traceshift : int
        Number of samples to shift the data by. If non-zero, all
        columns except Timestamp, R Array, and T Array will by
        shifted by this amount, in order to align the instrument
        readouts with the metadata.
    removedropouts : bool
        If set, data dropouts (with RA = Dec = 0) will be removed
        from the file.
    """
    # Step name, human-readable description, and the abbreviation
    # used in output file names
    self.name = 'prepare'
    self.description = 'Prepare Data Arrays'
    self.procname = 'pre'
    # Declare all step parameters in one shot, as
    # [name, default, description] entries
    self.paramlist = [
        ['detcounts', 'SQ1 Feedback',
         'Name of the input fits binary table '
         'column containing the detector flux '
         'values for R/T arrays'],
        ['hwpcounts', 'hwpCounts',
         'Name of the input fits column containing '
         'the HWP counts (only used if column '
         '"HWP Angle" is not present)'],
        ['hwpconv', (360. / 1440.),
         'Value to convert hwpcounts to HWP '
         'Angles (only used if column "HWP Angle" '
         'is not present)'],
        ['labmode', False,
         'If TRUE (processing lab data), will '
         'fill in with zeros a few columns and '
         'keywords that are important for the DRP'],
        ['replacenod', True,
         'If TRUE will replace Nod Offset by '
         'calculation based on RA/DEC. If False, '
         'use original column (has problems)'],
        ['chpoffsofiars', True,
         'If TRUE will calculate Chop Offset '
         'based on SofiaChopR/S. If False, use '
         'colrename to specify column to use'],
        ['colrename', '',
         'List of data columns to rename, '
         'The format is: '
         'oldname1->newname1|oldname2->newname2|...'],
        ['coldelete', ["hwpA", "hwpB"],
         'List of data columns to delete. '
         'The format is: ["column1", "column2", ...]'],
        ['pixscalist', [2.6, 4.0, 4.0, 6.8, 9.0],
         'List for PIXSCAL values for each band'],
        ['traceshift', 0,
         'Number of samples to shift the data '
         '(default is 0=no shift)'],
        ['removedropouts', False,
         'Remove data dropouts (i.e. data with '
         'RA==Dec==0 - default is False)'],
    ]
def read_pixscal(self):
    """
    Read a pixel scale value from the parameters.

    The parameters are expected to be defined as a list, with
    one entry for each HAWC band. The correct value for the
    input data is selected from the list, keyed on the last
    character of the SPECTEL1 header value.

    Returns
    -------
    float
        The pixel scale.
    """
    scale_list = self.getarg('pixscalist')
    spectel = self.datain.getheadval('spectel1')
    # HAWC wavebands, in the same order as the pixscalist entries
    band_order = ['A', 'B', 'C', 'D', 'E']
    try:
        # the band letter is the final character of SPECTEL1
        band_index = band_order.index(spectel[-1])
    except (ValueError, IndexError):
        # waveband not in list
        msg = 'Cannot parse waveband=%s' % spectel
        log.error(msg)
        raise ValueError(msg)
    try:
        return scale_list[band_index]
    except IndexError:
        msg = 'Need pixscal values for all wavebands'
        log.error(msg)
        raise IndexError(msg)
def run(self):
    """
    Run the data reduction algorithm.

    Because this step is single-in, single-out (SISO),
    self.datain must be a DataFits object. The output
    is also a DataFits object, stored in self.dataout.

    The process is:

    1. Read the input data table.
    2. Split the flux array into R and T subarrays.
    3. Rename columns specified in the 'colrename' parameter.
    4. Remove any columns specified in the 'coldelete' parameter.
    5. Add any columns necessary from a housekeeping data file.
    6. Calculate chop and nod offsets and HWP angles as necessary.
    7. Shift metadata columns to match flux samples if necessary.
    8. Store the data. Input data is modified in place.

    Raises
    ------
    ValueError
        If a required table column (detector flux, HWP counts,
        a colrename source, RA/DEC, or SofiaChop*) is missing,
        or if the 'Chop Offset' column could not be created.
    """
    # Read input parameters
    tstream = self.getarg('detcounts')
    hwpcnts = self.getarg('hwpcounts')
    hwpconv = self.getarg('hwpconv')
    labmode = self.getarg('labmode')
    replacenodoffset = self.getarg('replacenod')
    chpoffsofiars = self.getarg('chpoffsofiars')
    colrename_str = self.getarg('colrename')
    coldelete = self.getarg('coldelete')
    traceshift = self.getarg('traceshift')
    removedropouts = self.getarg('removedropouts')
    # Convert colrename to a dictionary
    # colrename['oldname']=newname
    colrename = {}
    if len(colrename_str) > 0:
        for colrep in colrename_str.split('|'):
            colspl = colrep.split('->')
            # don't if '->' not found
            if len(colspl) < 2:
                log.warning('Invalid colrename item = <%s>' % colrep)
            else:
                colrename[colspl[0].strip()] = colspl[1].strip()
    # Names of the columns in the table input
    rawnames = self.datain.table.names
    # Handle Detector data
    # Get the timestream data into coltstream
    coltstream = None
    for i, c in enumerate(rawnames):
        if c == tstream:
            coltstream = self.datain.table.field(c)
    if tstream not in rawnames:
        msg = "Table %s not found in raw data" % tstream
        log.error(msg)
        raise ValueError(msg)
    # Splitting the input array into R an T
    # NOTE(review): assumes the last axis of the flux cube holds the
    # detector columns, with the first 64 belonging to the R array
    # and the remainder to the T array -- confirm against raw format
    rarray = coltstream[:, :, :64]
    tarray = coltstream[:, :, 64:]
    # array dimensions (R must match T)
    nsamp, nrow, ncol = rarray.shape
    # Define new columns to be written -> COL
    # Add R array and T array
    col = [fits.Column(name='R array', unit='counts', array=rarray,
                       format='%dD' % (nrow * ncol),
                       dim='(%d,%d)' % tuple([ncol, nrow])),
           fits.Column(name='T array', unit='counts', array=tarray,
                       format='%dD' % (nrow * ncol),
                       dim='(%d,%d)' % tuple([ncol, nrow]))]
    # Add ColReplace columns only if they are
    # not originally in the input file
    for oldcname in colrename.keys():
        newcname = colrename[oldcname]
        # Make sure it's not already in
        if newcname not in rawnames:
            # Error message if oldcname not in rawnames
            if oldcname not in rawnames:
                msg = "Column %s not found in raw data" % oldcname
                log.error(msg)
                raise ValueError(msg)
            # Search for column to replace
            colunit = None
            coldata = None
            for i in range(len(rawnames)):
                if rawnames[i] == oldcname:
                    coldata = self.datain.table.field(i)
                    if len(coldata.shape) > 1:
                        # raise error for multi-dim --
                        # needs special handling
                        msg = "Column %s is multidimensional; " \
                              "cannot rename" % oldcname
                        log.error(msg)
                        raise ValueError(msg)
                    colunit = self.datain.table.columns.units[i]
            col.append(fits.Column(name=newcname, unit=colunit,
                                   array=coldata,
                                   format='1E'))
            log.debug('Renaming column %s to %s' %
                      (oldcname, newcname))
            # mark column for deletion
            colrename[oldcname] = 'delete'
        else:
            log.warning("Column %s already in output data, "
                        "replace is ignored" % newcname)
    # Add HWP angle column only if it is not
    # originally in input file (convert to degrees)
    if 'HWP Angle' not in rawnames:
        if hwpcnts not in rawnames:
            msg = "Column %s not found in raw data" % hwpcnts
            log.error(msg)
            raise ValueError(msg)
        colhwpang = None
        for c in rawnames:
            if c == hwpcnts:
                colhwpang = self.datain.table.field(c) * hwpconv
        col.append(fits.Column(name='HWP Angle',
                               unit='degrees', array=colhwpang,
                               format='1E'))
    # Delete old Nod Offset column and make new
    # column to be replaced by Nod Offset based on RA and DEC
    if replacenodoffset:
        # Remove existing Nod Offset columns in original of new columns
        if 'Nod Offset' in rawnames:
            self.datain.table.columns.del_col('Nod Offset')
        # NOTE(review): removing items from 'col' while iterating it
        # can skip the element after a removal; safe only if at most
        # one 'Nod Offset' entry is present -- confirm
        for c in col:
            if c.name == 'Nod Offset':
                col.remove(c)
        # Append new nod offset column (placeholder zeros, filled in
        # below from RA/DEC after the output table is built)
        col.append(fits.Column(name='Nod Offset', unit='arcseconds',
                               array=np.zeros(nsamp),
                               format='1E'))
    else:
        log.warning('Using original Nod Offset column '
                    '(beware of problems with this column)')
    # Prepare a column for Chop Offset based on SofiaChopR and S signals
    # (placeholder zeros, computed below from the chop signals)
    if chpoffsofiars:
        col.append(fits.Column(name='Chop Offset', unit='arcseconds',
                               array=np.zeros(nsamp),
                               format='1E'))
    else:
        log.warning('Using chpoffsofiars = False. Use '
                    'colrename to define Chop Offset '
                    'with the correct column.')
    # Remove old columns
    # Removes the original array (SQ1 Feedback)
    # from the input data (will keep only "R Array" and "T Array")
    self.datain.table.columns.del_col(tstream)
    # Remove other columns from the input data
    for i in range(len(coldelete)):
        if coldelete[i] in rawnames:
            # Added B/C tabhdu = . . . below would crash
            self.datain.table.columns.del_col(coldelete[i])
    # Remove columns from colreplace
    for oldcname in colrename.keys():
        if colrename[oldcname] == 'delete':
            self.datain.table.columns.del_col(oldcname)
    # Replicates the input data to generate the output
    self.dataout = self.datain.copy()
    self.dataout.filename = self.datain.filename
    # Define the original columns to be written
    origcols = self.datain.table.columns
    # Make new table: new columns first, then the surviving originals
    newcols = fits.ColDefs(col)
    log.debug("New Cols = " + repr(newcols.names))
    log.debug("Orig Cols = " + repr(origcols.names))
    tbhdu = fits.BinTableHDU.from_columns(newcols + origcols)
    # assuming there is only one binary table
    tabdataname = self.datain.tabnames[0]
    self.dataout.tableset(tbhdu.data, tabdataname, tbhdu.header)
    # Modify data
    # Configuration for LAB MODE
    if labmode:
        log.warning('Running in LAB MODE. Will fill in '
                    'important columns and keywords '
                    'with random values')
        # Fill in certain columns with zeros if those are NaNs
        # This is important for stepdemod and stepwcs
        for f in ['Azimuth Error', 'Elevation Error',
                  'Azimuth', 'Elevation', 'Array VPA']:
            t = np.where(np.isnan(self.dataout.table.field(f)))
            self.dataout.table.field(f)[t] = np.float64(0.0)
            if len(t[0]) > 0:
                log.warning('%d values in %s column were '
                            'NaNs and were substituted '
                            'by zeros' % (len(t[0]), f))
        # Fill in important keywords
        self.dataout.setheadval("OBSRA", 1.0)
        self.dataout.setheadval("OBSDEC", 1.0)
    # Adapt PIXSCAL keywords
    pixscal = self.read_pixscal()
    self.dataout.setheadval("PIXSCAL", pixscal)
    # Remove an engineering keyword
    self.dataout.delheadval('XPADDING')
    # Replace Nod Offset column by a computation
    # of offsets based on RA and DEC
    if replacenodoffset:
        # Reading the reference position from the header
        try:
            # convert to degrees (TELRA is in hours)
            telra = self.datain.getheadval("TELRA") * 15.0
            teldec = self.datain.getheadval("TELDEC")
        except KeyError:
            log.warning('TELRA AND TELDEC NOT DEFINED IN '
                        'INPUT DATA. WILL USE OBSRA AND OBSDEC '
                        'INSTEAD (not recommended).')
            # convert to degrees
            telra = self.datain.getheadval("OBSRA") * 15.0
            teldec = self.datain.getheadval("OBSDEC")
        # Get the RA and DEC columns
        try:
            # convert to degrees (RA column is in hours)
            ra_col = self.dataout.table.field('RA') * 15.0
            dec_col = self.dataout.table.field('DEC')
        except KeyError:
            msg = 'RA and DEC columns not found in data'
            log.warning(msg)
            raise ValueError(msg)
        # Calculate new nod offset column: angular distance from the
        # telescope reference position (magnitude only, no sign)
        cos_dec = np.cos(teldec * (np.pi / 180.0))
        # convert to arcseconds
        offra = ((ra_col - telra) * cos_dec) * 3600.0
        offdec = (dec_col - teldec) * 3600.0
        self.dataout.table['Nod Offset'] = np.sqrt(offra**2. + offdec**2.)
    else:
        log.warning('Using original Nod Offset column '
                    '(beware of problems with this column)')
    # Computing Chop Offset based on SofiaChopR and S signals
    if chpoffsofiars:
        try:
            chpr = self.dataout.table.field('SofiaChopR')
            chps = self.dataout.table.field('SofiaChopS')
            chpsync = self.dataout.table.field('SofiaChopSync')
        except KeyError:
            msg = 'SofiaChopR and S columns not found in data'
            log.warning(msg)
            raise ValueError(msg)
        # Make sure all chop signals are centered at 0,
        # by subtracting the mean value for each one
        chpr_col = chpr - np.mean(chpr)
        chps_col = chps - np.mean(chps)
        chpsync_col = chpsync - np.mean(chpsync)
        log.debug('Mean Chop R: {}'.format(np.mean(chpr)))
        log.debug('Mean Chop S: {}'.format(np.mean(chps)))
        log.debug('Mean Chop Sync {}'.format(np.mean(chpsync)))
        # Normalize all signals by dividing them by
        # the amplitude (assumed to be (max - min)/2)
        # (5th/95th percentiles are used as robust min/max)
        low, high = np.percentile(chpr_col, [5, 95])
        chpr_col = chpr_col / ((high - low) / 2.)
        log.debug('Low, high Chop R: {}, {}'.format(low, high))
        low, high = np.percentile(chps_col, [5, 95])
        chps_col = chps_col / ((high - low) / 2.)
        log.debug('Low, high Chop S: {}, {}'.format(low, high))
        low, high = np.percentile(chpsync_col, [5, 95])
        chpsync_col = chpsync_col / ((high - low) / 2.)
        log.debug('Low, high Chop Sync: {} {}'.format(low, high))
        # check for missing chop sync
        if np.abs(low) < 1.0 and np.abs(high) < 1.0:
            log.warning('Chop Sync signal may be missing; '
                        'check chop offsets')
        # Add ChopR and ChopS with ChopSync, to test
        # which of those are in phase with ChopSync
        # Here we assume the chop signals have a
        # phase difference of either 0 or 180 deg
        addchpr_sync = chpr_col + chpsync_col
        addchps_sync = chps_col + chpsync_col
        # if a signal is in phase with sync, the sum has a larger
        # std.dev than the signal alone; out of phase, a smaller one
        stdval = [np.nanstd(chpr_col),
                  np.nanstd(addchpr_sync),
                  np.nanstd(chps_col),
                  np.nanstd(addchps_sync)]
        for i, v in enumerate(stdval):
            if not np.isfinite(v):
                stdval[i] = 1e6
        log.debug('Std.Dev for ChopR, ChopR+Sync; '
                  'ChopS, ChopS+Sync: '
                  '{}, {}; {}, {}'.format(*stdval))
        # Add chopr and chops to Chop Offset
        if (stdval[1] > stdval[0]) and \
                (stdval[3] > stdval[2]):
            self.dataout.table['Chop Offset'] = chpr + chps
            log.debug('Assigning Chop Offset to SofiaChopR + '
                      'SofiaChopS (both are in phase '
                      'with sofiaChopSync)')
        elif (stdval[1] > stdval[0]) and \
                (stdval[3] <= stdval[2]):
            self.dataout.table['Chop Offset'] = chpr - chps
            log.debug('Assigning Chop Offset to '
                      'SofiaChopR - SofiaChopS (ChopS is out '
                      'of phase with sofiaChopSync)')
        elif (stdval[1] <= stdval[0]) and \
                (stdval[3] > stdval[2]):
            self.dataout.table['Chop Offset'] = chps - chpr
            log.debug('Assigning Chop Offset to '
                      'SofiaChopS - SofiaChopR (ChopR is out '
                      'of phase with sofiaChopSync)')
        elif (stdval[1] <= stdval[0]) and \
                (stdval[3] <= stdval[2]):
            self.dataout.table['Chop Offset'] = - chpr - chps
            log.debug('Assigning Chop Offset to '
                      '-1*(SofiaChopR + SofiaChopS) '
                      '(both are out of phase with sofiaChopSync)')
    else:
        log.warning('Using chpoffsofiars = False. Use colrename '
                    'to define Chop Offset with the correct column.')
        if 'Chop Offset' not in self.dataout.table.names:
            msg = 'Problem with chop offset assignment'
            log.error(msg)
            raise ValueError(msg)
    # Shift all data relative to R/T Array: Loop through all columns
    # (nsamp is number of samples)
    # NOTE(review): the slicing below assumes traceshift > 0; a
    # negative shift would not behave symmetrically -- confirm
    if traceshift:
        for colnam in self.dataout.table.names:
            # Skip if it's R/T Array or
            # Timestamp (these are the correct ones)
            if colnam in ['Timestamp', 'R array', 'T array']:
                continue
            # Shift data
            self.dataout.table[colnam][traceshift:] = \
                self.dataout.table[colnam][:-traceshift]
        # Cut the table at the start
        self.dataout.table = self.dataout.table[traceshift:]
    # Remove data with auxiliary (SOFIA HK data) dropouts
    # NOTE(review): uses column name 'Dec' here, while 'DEC' is used
    # above for the nod offset -- verify both names exist in the table
    if removedropouts and not labmode:
        mask = np.where((self.dataout.table['RA'] != 0.0)
                        & (self.dataout.table['Dec'] != 0.0))
        ndiff = len(self.dataout.table) - len(mask[0])
        if ndiff:
            log.warning('Remove Dropouts: Removed %d points' % ndiff)
            log.warning('Dropouts: shorted data '
                        'from %d to %d points' %
                        (len(self.dataout.table), len(mask[0])))
            self.dataout.tabdata[0] = self.dataout.table[mask]