# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""HAWC Reduction pipeline steps"""
import gc
import importlib
import os
import warnings
from copy import deepcopy
from astropy import log
from sofia_redux.pipeline.reduction import Reduction
from sofia_redux.pipeline.gui.qad_viewer import QADViewer
from sofia_redux.pipeline.sofia.parameters.hawc_parameters \
import HAWCParameters, STEPLISTS
from sofia_redux.pipeline.sofia.sofia_exception import SOFIAImportError
try:
from sofia_redux.instruments import hawc
except ImportError:
raise SOFIAImportError('HAWC modules not installed')
from sofia_redux.instruments.hawc.datafits import DataFits
from sofia_redux.instruments.hawc.stepparent import StepParent
from sofia_redux.instruments.hawc import steps
# this import is not used here but is needed to avoid
# a numba bug on Linux systems
from sofia_redux.instruments.hawc.steps import basehawc
assert basehawc
__all__ = ['HAWCReduction']
class HAWCReduction(Reduction):
"""
HAWC+ reduction steps.
Reduction algorithms are all defined in the
`sofia_redux.instruments.hawc` module.
This reduction object does not define a method for each pipeline
    step. Rather, it calls `run_drp_step` each time `step` is called,
    and uses the DRP infrastructure to determine which step to run.
Some reduction "steps" are defined in this class as super-steps,
combining several DRP steps together. This may be used for convenience
(e.g. the `make_flats` step, which saves the user from running a
separate pipeline to produce flat files in nod or nod-pol modes) or
for performance (e.g. the `demodulate` step, which loads one large
raw file at a time, so that all data is not in memory at once).
Attributes
----------
step_class : dict
All step classes available from the DRP. Keys are
pipeline step names.
step_name : dict
Names of all pipeline steps available from the DRP. Keys
are pipeline step class names.
param_lists : dict
DRP pipeline step parameter lists for all available steps.
Keys are pipeline step names.
extra_steps : dict
Locally defined pipeline steps. Keys are step names,
values are step descriptions.
override_steplist : dict
Pipeline step lists to override the ones defined in the
DRP configuration files. These lists should include any
locally defined steps that replace DRP steps.
Keys are pipeline modes, values are lists of steps to
run.
intermediate : bool
If True, the input data is an intermediate product, and
a subset of the full pipeline recipe is being run.
auxout : list of str
Auxiliary output files produced by the pipeline, intended
for display. These may include PNG images and DS9 region
files.
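
    Examples
    --------
    A minimal usage sketch, assuming a raw HAWC+ FITS file on disk
    (the file name here is hypothetical) and the run-all-steps
    `reduce` method provided by the parent `Reduction` class::

        >>> red = HAWCReduction()                   # doctest: +SKIP
        >>> red.load(['hawc_raw.fits'])             # doctest: +SKIP
        >>> red.output_directory = '/tmp/hawc_out'  # doctest: +SKIP
        >>> red.reduce()                            # doctest: +SKIP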
"""
def __init__(self):
"""Initialize the reduction object."""
super().__init__()
# descriptive attributes
self.name = "DRP"
self.instrument = "HAWC"
self.mode = "any"
self.data_keys = ['File Name', 'OBJECT',
'INSTCFG', 'INSTMODE', 'CALMODE',
'OBSTYPE', 'SPECTEL1', 'SPECTEL2',
'ALTI_STA', 'ZA_START', 'EXPTIME',
'PRODTYPE', 'FILEGPID', 'SCRIPTID']
self.pipe_name = "HAWC_DRP"
self.pipe_version = hawc.__version__.replace('.', '_')
# recipe and parameters will be defined later,
# when files are loaded
self.parameters = None
self.recipe = []
# set an environment variable used by the pipeline
# to determine relative paths to reference files
os.environ['DPS_HAWCPIPE'] = os.path.dirname(hawc.__file__)
# load steps from the hawc.steps module
self.step_class, self.step_name, \
self.processing_steps, self.param_lists = self.load_step_packs()
# describe a few extra locally-defined steps
self.extra_steps = {'make_flats': 'Make Flats',
'demodulate': 'Demodulate Chops',
'process_intcal': 'Process INTCAL'}
self.override_steplist = STEPLISTS
# don't keep a data copy in memory for initial steps
self.allow_undo = False
# raise an error if input list is empty
self.check_input = True
# reduction variables
self.override_mode = None
self.intermediate = False
self.output_directory = os.getcwd()
self.auxout = []
# ignore warnings from pipeline steps (numpy, scipy, astropy, etc.)
warnings.filterwarnings('ignore')
def load(self, data):
"""
Load input data to make it available to reduction steps.
Data files are loaded into new DRP DataFits objects. From
these objects, the pipeline mode and steps are determined.
The configuration file in the DRP package (plus any relevant
date-specific or user overrides) is used to determine the
default parameters for all pipeline steps.
After loading, headers are read from all input data and
stored in the display_data attribute for display in the QAD
header viewer. The raw data is not displayed.
Parameters
----------
data : `list` of str
Input data file names to be loaded.
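
        Examples
        --------
        A minimal sketch with a hypothetical raw file name; the mode
        and recipe shown are illustrative values for a nod-pol
        observation::

            >>> red = HAWCReduction()          # doctest: +SKIP
            >>> red.load(['hawc_raw.fits'])    # doctest: +SKIP
            >>> red.mode                       # doctest: +SKIP
            'nodpol'
            >>> red.recipe                     # doctest: +SKIP
            ['make_flats', 'demodulate', ...]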
"""
# call the parent method
super().load(data)
self.input = []
self.mode = None
msgs = []
for i, datafile in enumerate(data):
df = DataFits()
df.loadhead(datafile, dataname='all')
if df.mode is None and not self.override_mode:
raise ValueError("Pipeline mode not found "
"for {}".format(datafile))
# special case: mode=intcal is used to make flats,
# but is not the primary mode, unless only intcals are
# loaded
if self.mode is None and \
(df.mode != 'intcal' or i == len(data) - 1):
# override primary mode if necessary
if self.override_mode:
override_mode = str(self.override_mode)
mode_str = f'mode_{override_mode}'
if mode_str in df.config \
or mode_str in self.override_steplist:
df.mode = override_mode
df.mergeconfig(mode=override_mode)
else:
raise ValueError(f"Pipeline override mode "
f"{override_mode} not found")
# use the primary mode to define the recipe and parameters
msgs.append("Pipeline mode: {}".format(df.mode))
self.mode = df.mode
if self.mode in self.override_steplist:
stepnames = self.override_steplist[self.mode]
else:
stepnames = \
df.config['mode_{}'.format(df.mode)]['stepslist']
self.recipe = []
                for step in stepnames:
# if step is locally defined, use it
if hasattr(self, step):
self.recipe.append(step)
try:
self.processing_steps[step] = \
self.extra_steps[step]
except KeyError:
self.processing_steps[step] = step
elif step in self.step_name:
# otherwise, use the version in the DRP steps
self.recipe.append(self.step_name[step])
else:
# if not found, raise an error
raise ValueError("Pipeline step {} "
"not found".format(step))
# check for intermediate file: modify recipe if necessary
try:
prodtype = df.getheadval('PRODTYPE', errmsg=False)
except KeyError:
prodtype = 'unknown'
# check for intermediate step in recipe, with
# an exception for re-running the final image
prodtype = str(prodtype).strip().lower()
if prodtype in self.recipe \
and prodtype not in ['polmap', 'imgmap']:
idx = self.recipe.index(prodtype)
# recipe is all steps after the input one
self.recipe = self.recipe[idx + 1:]
if len(self.recipe) == 0:
raise ValueError("No steps to run for "
"prodtype {}.".format(prodtype))
# set a flag to mark the recipe as modified
self.intermediate = True
# compose a message for the steps and the config files used,
# for logging after all files have loaded
msgs.append("Processing steps: {}".format(self.recipe))
msgs.append("Config files:")
for cfile in df.config_files:
msgs.append(" {}".format(cfile))
# update redux parameters from new config
self.parameters = HAWCParameters(config=df.config,
param_lists=self.param_lists,
mode=self.override_mode)
self.load_parameters()
log.info("Input: {}".format(datafile))
self.input.append(df)
# log the mode and config files
for msg in msgs:
log.info(msg)
# pass headers to viewer
self.auxout = []
self.set_display_data()
def load_step_packs(self):
"""
Load DRP pipeline step modules.
Returns
-------
step_class : dict
All step classes available from the DRP. Keys are
pipeline step names.
step_name : dict
Names of all pipeline steps available from the DRP. Keys
are pipeline step class names.
processing_steps : dict
Display names for pipeline steps. Keys are pipeline step
names.
param_lists : dict
DRP pipeline step parameter lists for all available steps.
Keys are pipeline step names.
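
        Examples
        --------
        The returned dictionaries map between step names and step class
        names. Illustrative values, assuming the DRP `StepDemodulate`
        class is installed::

            >>> red = HAWCReduction()              # doctest: +SKIP
            >>> red.step_class['demodulate']       # doctest: +SKIP
            'StepDemodulate'
            >>> red.step_name['StepDemodulate']    # doctest: +SKIP
            'demodulate'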
"""
step_class = {}
step_name = {}
step_desc = {}
param_lists = {}
for class_name in steps.__all__:
cls = getattr(steps, class_name)
if issubclass(cls, StepParent) and \
'parent' not in class_name.lower():
step = cls()
class_name = cls.__name__
step_class[step.name] = class_name
step_name[class_name] = step.name
step_desc[step.name] = step.description
param_lists[step.name] = step.paramlist
return step_class, step_name, step_desc, param_lists
def register_viewers(self):
"""
Instantiate viewers appropriate to the reduction.
This method instantiates and returns a `QADViewer` object, used
to display data from this reduction in DS9. Data for the viewer
is stored in the `display_data` attribute.
Returns
-------
`list` of `Viewer`
All viewers supported by this reduction object.
"""
viewers = [QADViewer()]
return viewers
def run_drp_step(self, step_name=None, use_param=True):
"""
Run a DRP pipeline step.
        Pipeline steps are assumed to be implemented in a module
        named 'step' + `step_name`, in the
`sofia_redux.instruments.hawc.steps` package.
The step class name is stored in the `step_class` attribute.
The step class is imported, instantiated, then called on
the data in the input attribute. If the data have not yet
been loaded from disk, they are loaded at this time. If the
pipeline step is a single-input step, it is called in a loop,
once per input file. If it is a multi-input step, it is called
once on all the input data.
Whether the output data is saved to disk or displayed after the step
completes is controlled by a Redux parameter, applied to each
step via the `HAWCParameters` class.
At the end of this method, output data are stored in the input
attribute, so that they are available for the next processing
step.
Parameters
----------
step_name : str, optional
Name of the step to run. If not provided, the current
step_index will be used to retrieve the step from the
recipe.
use_param : bool, optional
If False, parameters from the Redux GUI will not
be passed to the pipeline step. This is used for some
sub-steps in the locally defined reduction steps.
Raises
------
ValueError
If the pipeline step is not found in the
sofia_redux.instruments.hawc.steps package, or it cannot
be imported.
IOError
If an input file cannot be read.
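
        Examples
        --------
        For ``step_name='demodulate'``, the
        `sofia_redux.instruments.hawc.steps.stepdemodulate` module is
        imported, its `StepDemodulate` class is instantiated, and the
        step is called on the loaded input data::

            >>> red.run_drp_step(step_name='demodulate')  # doctest: +SKIP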
"""
# check for valid input
if len(self.input) == 0:
raise ValueError("No input data to process.")
# get step name from recipe
try:
if step_name is None:
step_name = self.recipe[self.step_index]
step_mod = 'step' + step_name
step_class = self.step_class[step_name]
except (IndexError, KeyError):
raise ValueError("Pipeline step {} not found".format(step_name))
# import the step module and class
try:
redmod = importlib.import_module(
'sofia_redux.instruments.hawc.steps.{}'.format(step_mod))
redclass = getattr(redmod, step_class)
except (ImportError, AttributeError):
raise ValueError("Pipeline step {} not found".format(step_name))
# instantiate the class
step = redclass()
# get parameters and pass them to the step
parset = self.get_parameter_set()
pardict = {}
if use_param:
for pkey, pval in parset.items():
pardict[pkey] = pval['value']
# check whether output data should be saved, displayed, loaded
try:
save = parset['save']['value']
except KeyError:
save = False
try:
display = parset['display']['value']
except KeyError:
display = False
try:
load_data = parset['load']['value']
except KeyError:
load_data = True
try:
keep_auxout = parset['keep_auxout']['value']
except KeyError:
keep_auxout = False
# first load the data if necessary
for df in self.input:
if load_data:
if not df.loaded:
input_fname = df.filename
if os.path.isfile(df.filename):
fn = df.filename
elif os.path.isfile(df.rawname):
fn = df.rawname
else:
msg = 'Could not load ' \
'file {}'.format(df.filename)
log.error(msg)
raise IOError(msg)
log.debug('Loading {} into memory'.format(fn))
df.load(fn)
# reset filename to previous value -- DRP
# uses it to form output names for the next steps
df.filename = input_fname
# set filename path to output directory after load
# so that output goes to the right place
df.filename = os.path.join(
self.output_directory,
os.path.basename(df.filename))
# multi-input steps: require list input
if step.iomode.startswith('MI'):
log.info("Input: All files")
# run the step
newout = step(self.input, **pardict)
# record/save the output
if type(newout) is not list:
newout = [newout]
out = []
auxout = []
for df in newout:
if save:
df.filename = os.path.join(
self.output_directory,
os.path.basename(df.filename))
df.save()
self.record_outfile(df.filename)
out.append(df)
# check for auxiliary out files
if len(step.auxout) > 0:
auxout += step.auxout
# whitespace, for readability
log.info('')
else:
# single input steps: loop over input
out = []
auxout = []
for df in self.input:
log.info("Input: {}".format(os.path.basename(df.filename)))
# run the step
try:
out_df = step(df, **pardict)
except ValueError as err:
# if one file had an error, attempt
# to continue on with the rest
log.debug(err)
log.warning('Processing error: skipping file '
'{} in further reduction '
'steps'.format(
os.path.basename(df.filename)))
continue
# record/save the output
if save:
out_df.filename = os.path.join(
self.output_directory,
os.path.basename(out_df.filename))
out_df.save()
self.record_outfile(out_df.filename)
out.append(out_df)
# check for auxiliary out files
if len(step.auxout) > 0:
auxout += step.auxout
log.info('')
# keep the output data for the next step
self.input = out
# allow undo for the next step if possible
try:
next_parset = self.get_parameter_set(self.step_index + 1)
self.allow_undo = next_parset['undo']['value']
except (KeyError, IndexError):
# leave as is if not available
pass
# pass self.input data to viewer
if not keep_auxout:
self.auxout = []
self.set_display_data(display=display, auxout=auxout)
def set_display_data(self, display=False, auxout=None):
"""
Pass data in self.input to display.
Parameters
----------
        display : bool, optional
            If True, data should be displayed in DS9. If False,
            only the headers and `auxout` are displayed.
        auxout : `list` of str, optional
            If provided, will be passed to the QADViewer along with
            any headers or data from the input attribute.
"""
if auxout is None:
auxout = []
else:
auxout = auxout.copy()
auxout += self.auxout[:]
ddata = []
if display:
            # make an HDUList from each DataFits object,
            # with no tables attached
for df in self.input:
ddata.append(df.to_hdulist(save_tables=False))
else:
# always display headers
for df in self.input:
ddata.append(df.to_header_list())
# always display aux out (images, etc.)
if len(auxout) > 0:
ddata += auxout
self.auxout = auxout
self.display_data['QADViewer'] = ddata
def step(self, alt_method=None):
"""
Run a reduction step.
        This method calls the reduction step specified in the
`recipe` attribute, at the current `step_index`. If the
step index is out of range for the recipe, this method will
just return.
For the HAWC pipeline, `redux.Reduction.step` is called with
default parameters for any step that is defined locally. The
parent method is called with ``alt_method='run_drp_step'`` for
any step that is defined in the DRP package only.
Parameters
----------
alt_method : str, optional
This parameter is ignored for HAWC reductions.
Returns
-------
str
An error message if the reduction step produced any
errors; an empty string otherwise.
"""
if self.step_index >= len(self.recipe):
return 'No steps to run.'
step_name = self.recipe[self.step_index]
if hasattr(self, step_name):
return super().step()
else:
return super().step(alt_method='run_drp_step')
# define some super-steps: combinations of DRP steps
# to run as one step
def make_flats(self):
"""
Make nod or nod-pol flats from INT_CAL files.
This step is called before the main nod or nod-pol
steps, when intcal files (FITS keyword CALMODE = INT_CAL)
are loaded along with science files (CALMODE = UNKNOWN).
It produces flat files in a location known to the science
steps, for later use in the current reduction.
This super-step calls all steps defined for 'mode_intcal'
in the DRP configuration, on all input files with mode 'intcal'.
At the end of the step, the input attribute is set to
include all non-intcal files, for processing the normal
science pipeline steps.
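
        Notes
        -----
        Input files are partitioned by their DRP-assigned mode: files
        with mode 'intcal' are reduced through the intcal sub-recipe
        here, while all other files are set aside and restored to the
        input attribute afterward for the science recipe. Output data
        are saved only after the final sub-step.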
"""
flat_input = []
other = []
for df in self.input:
if str(df.mode).lower().strip() == 'intcal':
flat_input.append(df)
else:
other.append(df)
if len(flat_input) == 0:
# do nothing
log.warning("No flats found.")
return
# get flat steps from config file in first input file
df = flat_input[0]
flat_steps = df.config['mode_intcal']['stepslist']
# check for DMD file: modify recipe if necessary
# No other intermediate input types are allowed
# for intcal files.
try:
prodtype = df.getheadval('PRODTYPE', errmsg=False)
except KeyError:
prodtype = 'unknown'
prodtype = str(prodtype).strip().lower()
if prodtype == 'demodulate':
idx = flat_steps.index('StepDemodulate')
# recipe is all steps after the input one
flat_steps = flat_steps[idx + 1:]
if len(flat_steps) == 0:
raise ValueError("No steps to run for "
"prodtype {}.".format(prodtype))
# run the steps on the input flats
self.input = flat_input
parset = self.get_parameter_set()
save = parset['save']['value']
for i, step in enumerate(flat_steps):
try:
step_name = self.step_name[step]
except KeyError:
raise ValueError("Pipeline step {} "
"not found".format(step))
log.info("Sub-step: {}".format(self.processing_steps[step_name]))
log.info('')
# set save parameter for final step only
if i == len(flat_steps) - 1:
parset['save']['value'] = save
use_param = True
else:
parset['save']['value'] = False
use_param = False
# call the regular runner on the step
self.run_drp_step(step_name=step_name, use_param=use_param)
# flats are done;
# set aside the rest of the files for processing
self.input = other
def process_intcal(self):
"""
Process INT_CAL files for skycal generation.
This step is an alias for the `make_flats` step,
given a separate name in order to assign different default
parameters.
"""
self.make_flats()
def demodulate(self):
"""
Demodulate chop-nod data.
This super-step calls all steps up to and including the
StepDemodulate step defined for the current pipeline mode
        in the DRP configuration. The full set of steps is run on one
        file at a time, so that only one raw, un-demodulated data file
        is held in memory at once. This improves performance and
        memory usage for the early pipeline steps.
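
        Notes
        -----
        Conceptually, this super-step runs the following loop (sketch
        only; `demod_steps` is derived from the DRP configuration for
        the current mode)::

            for df in raw_files:
                self.input = [df]
                for step in demod_steps:
                    self.run_drp_step(step_name=step)

        User parameters from the Redux GUI are applied to the final
        demodulate sub-step only; earlier sub-steps run with default
        parameters.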
"""
if len(self.input) == 0:
raise ValueError('No input data.')
# get demod steps from config file in first input file
df = self.input[0]
# if intermediate, just run the DRP demodulate
if self.intermediate:
self.run_drp_step(step_name='demodulate')
return
try:
mode_steps = df.config['mode_{}'.format(df.mode)]['stepslist']
except KeyError:
# mode not found -- just run the DRP version of
# demodulate
self.run_drp_step(step_name='demodulate')
return
try:
dmd_idx = mode_steps.index('StepDemodulate')
except ValueError:
log.warning('No demodulate steps found.')
return
demod_steps = mode_steps[:dmd_idx + 1]
# get parameters for this step
# (copy because intermediate steps set current parameters
# to defaults)
parset = deepcopy(self.get_parameter_set())
# copy the input datafits (headers only) to a new list
input_data = self.input.copy()
self.input = []
# run all steps on each file in turn, so that only
# one raw file is in memory at a time
out = []
for df in input_data:
self.input = [df]
for i, step in enumerate(demod_steps):
try:
step_name = self.step_name[step]
except KeyError:
raise ValueError("Pipeline step {} "
"not found".format(step))
log.info("Sub-step: {}".format(
self.processing_steps[step_name]))
log.info('')
# use user parameters for final step only
if i == len(demod_steps) - 1:
log.debug('Setting user parameters '
'for {}'.format(step_name))
# deepcopy again because next file will again modify
# the current parameters on the early steps
self.set_parameter_set(deepcopy(parset))
else:
# set step defaults for Redux control parameters
try:
log.debug('Setting default parameters '
'for {}'.format(step_name))
par_function = getattr(self.parameters, step_name)
par_function(self.step_index)
except AttributeError:
log.warning('Parameters for step '
'{} not found'.format(step_name))
# call the regular runner on the step
self.run_drp_step(step_name=step_name)
# keep the final output file
out.append(self.input[0])
# trigger garbage collection after each file
# (doesn't seem to have much impact, but can't hurt)
gc.collect()
# store the demodulated data for the next step
self.input = out
self.set_display_data()