# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Scan image reconstruction pipeline step."""
import os
import warnings
from astropy import log
from astropy.io import fits
import numpy as np
from sofia_redux.instruments.hawc.datafits import DataFits
from sofia_redux.instruments.hawc.stepmiparent import StepMIParent
from sofia_redux.instruments.hawc.stepmoparent import StepMOParent
from sofia_redux.scan.reduction.reduction import Reduction
__all__ = ['StepScanMap']
[docs]
class StepScanMap(StepMOParent):
"""
Reconstruct an image from scanning data.
Input data for this step are raw HAWC data FITS files.
Output from this step is typically a single output DataFits,
with 4 image planes (HDUs): SIGNAL, EXPOSURE, NOISE and S/N.
Additionally, binary table HDUs (index 4+) are added for each input scan
(controlled by the write.scandata option), which contains original
header information for each scan, per-scan reduction summary data,
and a binary table of scan data (e.g. pixel gains, weights,
per-pixel spectral filter profile, pixel noise spectra etc.).
"""
[docs]
def setup(self):
"""
Set parameters and metadata for the pipeline step.
Output files have PRODTYPE = 'scanmap', and are named with
the step abbreviation 'SMP'.
Parameters defined for this step are:
noout : bool
If set, FITS output is not expected and will not be
loaded. This may be set for off-nominal reductions, like
skydip observations.
subarray : str
Specify the subarrays to use in the reduction. Default is
all available ('R0,T0,R1').
use_frames : str
Frames to use from the reduction. Specify a particular
range, as '400:-400' or '400:1000'
grid : float
Output pixel scale. If not set, default values from scan map
configuration will be used.
deep : bool
If set, faint point-like emission is prioritized.
faint : bool
If set, faint emission (point-like or extended) is prioritized.
extended : bool
If set, extended emission is prioritized.
This may increase noise on large angular scales.
options : str
Additional options to pass to the scan map algorithm.
"""
# Name of the pipeline reduction step
self.name = 'scanmap'
self.description = 'Construct Scan Map'
# Shortcut for pipeline reduction step and identifier for
# saved file names.
self.procname = 'smp'
# Clear Parameter list
self.paramlist = []
# Append parameters
self.paramlist.append(['noout', False,
'No FITS output is expected'])
self.paramlist.append(['subarray', 'R0,T0,R1',
"Subarrays to use in reduction "
"('' for default)"])
self.paramlist.append(['use_frames', '',
"Frames to use from reduction. "
"Specify a particular range, as "
"'400:-400', or '400:1000'."])
self.paramlist.append(['grid', '',
"Output pixel scale, if not default. "
"Specify in arcsec."])
self.paramlist.append(['deep', False,
'Attempt to recover faint point-like '
'emission'])
self.paramlist.append(['faint', False,
'Attempt to recover faint emission '
'(point-like or extended)'])
self.paramlist.append(['extended', False,
'Attempt to recover extended emission '
'(may increase noise)'])
self.paramlist.append(['options', '',
'Additional options for scan reconstruction'])
[docs]
@staticmethod
def merge_scan_hdr(df, scnhead, datain):
"""
Merge headers from scan output files.
The header in the `df` is updated in place.
Parameters
----------
df : DataFits
The output data to update.
scnhead : fits.Header
The header from the scan output file.
datain : list of DataFits
The list of input data to merge into the output data.
"""
# Get card lists and add scan keywords
othercards = scnhead.cards
for cardi, card in enumerate(othercards):
kwd = card.keyword
# skip comments
if kwd == 'COMMENT':
continue
# otherwise add key normally
try:
if len(card.comment) > 0:
df.setheadval(kwd, card.value, card.comment)
else:
df.setheadval(kwd, card.value)
except (KeyError, ValueError, TypeError):
log.warning("Unable to add FITS keyword %s=%s" %
(kwd, card.value))
# set ASSC_AOR and ASSC_MSN value in input header, before merging
try:
df.setheadval('ASSC_AOR',
df.getheadval('AOR_ID'),
'Associated AORs')
except KeyError:
pass
try:
df.setheadval('ASSC_MSN',
df.getheadval('MISSN-ID'),
'Associated Mission IDs')
except KeyError:
pass
# update header keywords from datain list
for i, df_in in enumerate(datain):
try:
df_in.setheadval('ASSC_AOR', df_in.getheadval('AOR_ID'),
'Associated AORs')
except KeyError:
pass
try:
df_in.setheadval('ASSC_MSN', df_in.getheadval('MISSN-ID'),
'Associated Mission IDs')
except KeyError:
pass
# remove any previous history for less clutter
df_in.delheadval('HISTORY')
df.mergehead(df_in)
# Remove an engineering keyword if necessary
df.delheadval('XPADDING')
[docs]
@staticmethod
def check_use_frames(datain, use_frames):
if use_frames == '':
return use_frames
try:
nframes = np.array([float(df.header.get('EXPTIME'))
* float(df.header.get('SMPLFREQ'))
for df in datain])
except (ValueError, TypeError):
log.warning('Could not read EXPTIME or SMPLFREQ from header.')
return use_frames
else:
try:
ranges = use_frames.split(',')
for r in ranges:
s = r.split(':')
if len(s) > 1:
if s[0].strip() not in ['', '*']:
nframes -= abs(int(s[0]))
if s[1].strip() not in ['', '*']:
nframes -= abs(int(s[1]))
elif s[0].strip() not in ['', '*']:
nframes -= abs(int(s[0]))
if np.any(nframes < 10):
log.warning('Frame range parameter is out of '
'bounds for this scan set. Turning off '
'frame clipping.')
use_frames = ''
except (ValueError, TypeError):
log.warning('Bad use_frames parameter. Turning off '
'frame clipping.')
use_frames = ''
return use_frames
[docs]
def run(self):
"""
Run the data reduction algorithm.
This step is run as a multi-in multi-out (MIMO) step:
self.datain should be a list of DataFits, and output
will also be a list of DataFits, stored in self.dataout.
The process is:
1. Assemble the scan map options from input parameters.
2. Call the iterative scan map reconstructor.
3. Retrieve output and update headers as necessary.
"""
# collect input options in dict
kwargs = {'write': {'source': False}}
options = {}
# output path
outpath = os.path.dirname(self.datain[0].filename)
kwargs['outpath'] = outpath
# output basename
outname = os.path.basename(self.datain[0].filenamebegin
+ self.procname.upper()
+ self.datain[0].filenameend)
kwargs['name'] = outname
# get options
subarr = str(self.getarg('subarray')).strip()
if subarr != '':
kwargs['subarray'] = subarr
kwargs['lock'] = 'subarray'
# add additional top-level parameters
for arg in ['deep', 'faint', 'extended']:
if self.getarg(arg):
kwargs[arg] = True
# add frame clipping if necessary
use_frames = str(self.getarg('use_frames')).strip()
use_frames = self.check_use_frames(self.datain, use_frames)
if use_frames != '':
kwargs['frames'] = use_frames
# set the output pixel scale if supplied
try:
grid = float(str(self.getarg('grid')).strip())
except (ValueError, TypeError):
grid = None
if grid is not None:
kwargs['grid'] = grid
# add additional options from parameters at end,
# so they can override any defaults set by the above
additional = str(self.getarg('options')).strip()
if additional != '':
all_val = additional.split()
for val in all_val:
try:
k, v = val.split('=')
except (IndexError, ValueError, TypeError):
pass
else:
options[k] = v
kwargs['options'] = options
# get input file names
infiles = []
for datain in self.datain:
if os.path.exists(datain.filename):
infiles.append(datain.filename)
else:
rawname = datain.rawname
if os.path.exists(rawname):
infiles.append(rawname)
# log input
log.debug(f'All provided options: {kwargs}')
log.debug(f'Input files: {infiles}')
# run the reduction
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
reduction = Reduction('hawc_plus')
output_hdul = reduction.run(infiles, **kwargs)
# If no output requested
if self.getarg('noout'):
log.debug('Not loading output - copy from input')
# copy input
self.dataout = self.datain
# SOFIA keywords are not updated since this output is not
# intended to be saved
return
# read output file(s)
if output_hdul is not None and isinstance(output_hdul, fits.HDUList):
# single output file
df = DataFits(config=self.config)
df.filename = self.datain[-1].filename
df.load(hdul=output_hdul)
# store the output in dataout
self.dataout = df
else:
if output_hdul is None:
log.error('No output created.')
raise ValueError('No output created.')
else:
log.error('Unexpected output for scan imaging mode. '
'Check INSTCFG.')
raise ValueError("Expected output not found.")
# Copy and save headers
scnhead = self.dataout.header.copy()
self.dataout.header = self.datain[0].header.copy()
self.merge_scan_hdr(self.dataout, scnhead, self.datain)
# Update SOFIA mandated keywords (since this is first pipe step)
obsid = 'P_' + self.datain[0].getheadval('OBS_ID')
self.dataout.setheadval('OBS_ID', obsid)
self.dataout.setheadval('PIPELINE', 'HAWC_DRP')
# Set BUNIT correctly in S/N and exposure extensions
if 'S/N' in self.dataout.imgnames and \
'EXPOSURE' in self.dataout.imgnames:
self.dataout.setheadval('BUNIT', '',
comment='Data units',
dataname='S/N')
self.dataout.setheadval('BUNIT', 's',
comment='Data units',
dataname='EXPOSURE')
# Set output list for MIMO compatibility
self.dataout = [self.dataout]
[docs]
def runend(self, data):
"""
Clean up after a pipeline step.
Override the default method to call MISO-style header
updating, in the case where a single file is returned.
Parameters
----------
data : list of DataFits or DataText
Output data to update.
"""
# if one output file, use the MISO update header;
# if multiple, use MIMO
if len(data) == 1:
log.debug('Updating headers for single-output format')
self.iomode = 'MISO'
StepMIParent.runend(self, data[0])
else:
log.debug('Updating headers for multiple-output format')
self.iomode = 'MIMO'
StepMOParent.runend(self, data)