# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Time series combination pipeline step."""
import numpy as np
from astropy import log
from astropy.io import fits
from sofia_redux.instruments.hawc.steps.basehawc import clipped_mean
from sofia_redux.instruments.hawc.stepparent import StepParent
__all__ = ['StepCombine']
[docs]
class StepCombine(StepParent):
"""
Combine time series data for R+T and R-T flux samples.
This step averages all chop-subtracted samples for each nod and
HWP setting, for the R+T and R-T images. Outliers are identified
via iterative sigma-clipping (Chauvenet's criterion). Errors
are propagated from the input variance images or, optionally,
reported as the standard deviation across the time samples.
After this step, R-T images are propagated for polarimetry data
only (NHWP > 1).
This step should be run after the
`sofia_redux.instruments.hawc.steps.StepSplit` pipeline step.
It requires the following extensions: for each
HWP angle *M* and Nod *N* there should be six images: DATA R-T
HWP M NOD N, DATA R+T HWP M NOD N, VAR R-T HWP M,
NOD N, VAR R+T HWP M NOD N, VAR R HWP M NOD N, VAR T
HWP M NOD N. In addition, there must be a table containing the
rows corresponding to a given HWP and Nod, named TABLE HWP M
NOD N. Finally, there should be a single bad pixel mask image.
The output extensions are the same as the input extensions,
except that VAR R+T and VAR R-T are replaced with ERROR R+T
and ERROR R-T extensions.
"""
[docs]
def setup(self):
"""
Set parameters and metadata for the pipeline step.
Output files have PRODTYPE = 'combine', and are named with
the step abbreviation 'CMB'.
Parameters defined for this step are:
sigma : float
Reject outliers more than this many sigma from the mean.
sum_sigma : float
Reject additional R+T outliers more than this many sigma
from the mean.
use_error : bool
Set to True to use the standard deviation across the time
samples as the output error, rather than propagating input
variances.
"""
# Name of the pipeline reduction step
self.name = 'combine'
self.description = 'Combine Time Series'
# Shortcut for pipeline reduction step and identifier for
# saved file names.
self.procname = 'cmb'
# Clear Parameter list
self.paramlist = []
# Append parameters
self.paramlist.append(['sigma', 3.0,
"Reject outliers more than this many "
"sigma from the mean"])
self.paramlist.append(['sum_sigma', 4.0,
"Reject additional R+T outliers more "
"than sum_sigma from the mean"])
self.paramlist.append(['use_error', False,
"Set to True to use Chauvenet output "
"errors rather than propagating input "
"variances"])
[docs]
def comb_table(self, table, newmask):
"""
Average all rows for a table.
Parameters
----------
table : fits.FITS_rec
The table to average.
newmask : array-like of bool
Table rows to combine.
Returns
-------
BinTableHDU
The averaged table.
"""
names = table.names
formats = table.columns.formats
dims = table.columns.dims
units = table.columns.units
cols = []
outrow = self.dataout.tablemergerows(table[newmask])
for n, f, d, u in zip(names, formats, dims, units):
cols.append(fits.Column(name=n, format=f, dim=d, unit=u,
array=[outrow[n]]))
tbhdu = fits.BinTableHDU.from_columns(fits.ColDefs(cols))
return tbhdu
[docs]
def run(self):
r"""
Run the data reduction algorithm.
Because this step is single-in, single-out (SISO),
self.datain must be a DataFits object. The output
is also a DataFits object, stored in self.dataout.
This step combines the chop cycles for each HWP angle
and Nod separately (and each pixel as well). This happens first for
the R-T data, as follows:
1. For each pixel in R-T, compute the mean value and standard
deviation.
2. Reject any chop cycles more than *sigma* away from the mean.
3. Repeat 1-2 until no more chop cycles are removed.
Any masked pixels from R-T deglitching are also masked in R+T.
Additional deglitching in R+T follows the same outlier rejection as
for R-T, with the sigma cutoff specified by the *sum\_sigma*
parameter.
"""
self.dataout = self.datain.copy()
nhwp = self.datain.getheadval('nhwp')
nodpatt = self.datain.getheadval("nodpatt")
nnod = len(set(nodpatt))
chauvenet = self.getarg('sigma')
sum_sigma = self.getarg('sum_sigma')
use_error = self.getarg('use_error')
for hwp in range(nhwp):
for nod in range(nnod):
log.debug('starting hwp %d nod %d' % (hwp, nod))
rmt_data = 'DATA R-T HWP%d NOD%d' % (hwp, nod)
rpt_data = 'DATA R+T HWP%d NOD%d' % (hwp, nod)
rmt_var = 'VAR R-T HWP%d NOD%d' % (hwp, nod)
rpt_var = 'VAR R+T HWP%d NOD%d' % (hwp, nod)
r_var = 'VAR R HWP%d NOD%d' % (hwp, nod)
t_var = 'VAR T HWP%d NOD%d' % (hwp, nod)
rmt_sigma = 'ERROR R-T HWP%d NOD%d' % (hwp, nod)
rpt_sigma = 'ERROR R+T HWP%d NOD%d' % (hwp, nod)
# make sure data is float64
rmt = self.datain.imageget(rmt_data).astype(np.float64)
rpt = self.datain.imageget(rpt_data).astype(np.float64)
rptv = self.datain.imageget(rpt_var).astype(np.float64)
rv = self.datain.imageget(r_var).astype(np.float64)
tv = self.datain.imageget(t_var).astype(np.float64)
table = self.datain.tableget('TABLE HWP%d NOD%d' % (hwp, nod))
nplane, nrow, ncol = rmt.shape
mask = np.zeros_like(rmt)
# run Chauvenet's criterion to reject outliers
mean, sigma = clipped_mean(rmt, mask, sigma=chauvenet)
maskvar = rptv.copy()
maskvar[mask == 1] = np.nan
count = np.sum(1 - mask, axis=0)
rmtv = np.nansum(maskvar, axis=0) / count ** 2
num = int(np.sum(mask))
log.info('R-T deglitching: masked %d of %d '
'samples in hwp %d, nod %d' %
(num, nplane * nrow * ncol, hwp, nod))
# Note: if nhwp = 1, then it doesn't really make sense to have
# R-T data. We'll keep it for this step for consistency,
# but it will be discarded after this step.
self.dataout.imageset(mean, rmt_data)
if use_error:
self.dataout.imageset(sigma, rmt_sigma)
else:
self.dataout.imageset(np.sqrt(rmtv), rmt_sigma)
self.dataout.imagedel(rmt_var)
# run Chauvenet's criterion to find additional outliers
mean, sigma = clipped_mean(rpt, mask, sigma=sum_sigma)
rptv[mask == 1] = np.nan
count = np.sum(1 - mask, axis=0)
rptv = np.nansum(rptv, axis=0) / count ** 2
# propagate R and T variance as well --
# needed for covariance calculations later
rv[mask == 1] = np.nan
tv[mask == 1] = np.nan
rv = np.nansum(rv, axis=0) / count ** 2
tv = np.nansum(tv, axis=0) / count ** 2
log.info('R+T deglitching: masked additional '
'%d of %d samples in hwp %d, nod %d' %
(int(np.sum(mask)) - num,
nplane * nrow * ncol, hwp, nod))
self.dataout.imageset(mean, rpt_data)
if use_error:
# keep the output errors from the Chauvenet algorithm
self.dataout.imageset(sigma, rpt_sigma)
# also set the R and V variances to zero; no covariances
# available for this error propagation method
log.warning('Covariances between initial '
'Stokes parameters are '
'not propagated with Chauvenet errors')
rv *= 0.0
tv *= 0.0
else:
# otherwise, keep the propagated variances
self.dataout.imageset(np.sqrt(rptv), rpt_sigma)
# delete the old variances, add the new ones onto the end
self.dataout.imagedel(rpt_var)
self.dataout.imagedel(r_var)
self.dataout.imagedel(t_var)
self.dataout.imageset(rv, r_var)
self.dataout.imageset(tv, t_var)
tmpmask = np.ones(len(table), dtype=bool)
tbhdu = self.comb_table(table, tmpmask)
self.dataout.tableset(tbhdu.data,
'TABLE HWP%d NOD%d' % (hwp, nod),
tbhdu.header)