# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Base class for Redux reduction objects."""
from collections import OrderedDict
import datetime as dt
import os
from astropy import log
from astropy.io import fits
import sofia_redux.pipeline
from sofia_redux.pipeline.parameters import Parameters, ParameterSet
from sofia_redux.pipeline.viewer import Viewer
__all__ = ['Reduction']
class Reduction(object):
"""
Reduction steps and data.
Attributes
----------
name : str
Name of the reduction type.
instrument : str
Name of the instrument supported by this reduction object.
mode : str
Name of the instrument mode supported by this object.
pipe_name : str
Name of the data reduction software package.
pipe_version : str
Pipeline version.
param_file : str
Input parameter file, if provided.
parameters : Parameters
A Redux Parameters object. This will hold all current values
of reduction parameters, to be provided to the steps when they
are run.
recipe : `list` of str
List of processing steps to run. Normally, the step names
are method names implemented in the current reduction class.
processing_steps : dict
Dictionary associating pipeline step names with more
descriptive display names.
step_index : int
The current step in the recipe, to be run next.
error : `list` of str
A list of error messages generated by reduction steps,
for reporting back to the user.
data_keys : `list` of str
A list of keywords that describe the input data, for display
to the user. 'File Name' and 'File Size' keywords have
general definitions applicable to most input types.
Other keywords are assumed to be readable from the data structure's
.header attribute, as ``data.header[key]``.
data_id : `OrderedDict`
Holds the association between `data_keys` and values for the
currently loaded data.
allow_undo : bool
Flag to indicate whether an 'undo' operation should be allowed
in GUI context. The undo operation is based on a serialization
of this object: if the current object cannot be serialized, or
if it would be too memory-expensive to do so, this flag should
be set to False.
check_input : bool
Flag to indicate whether reduction steps should raise an
error when run with an empty input list.
raw_files : `list` of str
Input file paths.
out_files : `list` of str
Output file paths, for files generated by the current reduction.
input : `list`
Input data for the next processing step. List members may be
any type understood by the processing step.
display_data : dict
Data to be displayed by any viewers associated with the current
reduction. Keys should be the names of the viewers (`Viewer.name`);
values may be any data type understood by the viewer.
output_directory : str
Directory path to which any output files generated should be saved.
"""
def __init__(self):
    """Initialize the reduction."""
    # identifying information for this reduction type
    self.name = "Basic Reduction"
    self.instrument = "any"
    self.mode = "any"
    self.pipe_name = "Redux"
    self.pipe_version = sofia_redux.pipeline.__version__

    # parameter bookkeeping
    self.param_file = None
    self.parameters = Parameters()

    # ordered list of step names to run; each should normally
    # correspond to a method on this class
    self.recipe = ["log_input"]
    # display names for each step in the recipe
    self.processing_steps = {"log_input": "Log Input Files"}
    # index of the next step to run
    self.step_index = 0
    # error messages reported by pipeline steps
    self.error = []

    # keywords used to describe loaded data to the user
    self.data_keys = ['File Name', 'File Size']
    self.data_id = OrderedDict()

    # set False when the reduction cannot (or should not) be
    # serialized for GUI undo support -- e.g. internal data too
    # large to duplicate, or object cannot be pickled
    self.allow_undo = True
    # set True to make reduction steps raise an error when the
    # input list is empty
    self.check_input = False

    # data storage and tracking
    self.raw_files = []        # raw files as provided
    self.out_files = []        # output files as written
    self.input = []            # input to the next step
    self.display_data = {}     # data to display in viewers
    self.output_directory = None   # directory to save output to
@property
def description(self):
"""str: Description of the reduction (name, instrument, and mode)."""
if self.pipe_version:
version = ' v' + '.'.join(str(self.pipe_version).split('_'))
else:
version = ''
msg = f"{self.name}{version} for {self.instrument} " \
f"in {self.mode} mode"
return msg
def cleanup(self):
    """
    Perform any necessary cleanup tasks before shutting down.

    No-op in the base class; subclasses may override.
    """
[docs]
def edit_parameters(self, key, step_index=None,
value=None, options=None,
option_index=None, hidden=False):
"""
Edit the parameters for a reduction step.
Parameters
----------
key : str
Parameter key name.
step_index : int, optional
Index of the reduction step in the reduction recipe.
If not provided, the current step_index will be used.
value : str, float, int, bool, or list; optional
Parameter value.
options : list; optional
Enumerated parameter value options.
option_index : int, optional
Index of the selected option, for enumerated values.
"""
if step_index is None:
step_index = self.step_index
if 0 <= step_index < len(self.recipe):
param = self.parameters.current[step_index]
param.set_value(key, value=value,
options=options,
option_index=option_index,
hidden=hidden)
def get_key_value(self, header, key):
    """
    Retrieve a value from a metadata header.

    Parameters
    ----------
    header : dict-like
        Metadata header. May be any type for which the data is
        accessible as header[key].
    key : str
        Header keyword.

    Returns
    -------
    str
        String representation of the header value, stripped and
        upper-cased, or 'UNKNOWN' if the keyword was not found
        or could not be converted to a string.
    """
    try:
        return str(header[key]).strip().upper()
    except (KeyError, TypeError):
        return 'UNKNOWN'
[docs]
def get_parameter_set(self, step_index=None):
"""
Get the current parameter set for a step.
Parameters
----------
step_index : int, optional
Index of the step in the reduction recipe. If not provided,
the current step_index will be used.
Returns
-------
ParameterSet
The parameters for the specified step.
"""
if step_index is None:
step_index = self.step_index
if 0 <= step_index < len(self.recipe) and \
len(self.parameters.current) > 0:
param = self.parameters.current[step_index]
else:
param = ParameterSet()
return param
[docs]
def load(self, data):
"""
Load input data to make it usable to reduction steps.
This function only stores the data as is in the raw_files
attribute. This function should be overridden by subclasses
if more complex behavior is needed, e.g. to call the
`load_fits_files` method, or set the display data variables.
Parameters
----------
data : `list` of str or str
Input data file names to be loaded.
"""
self.step_index = 0
if type(data) is not list:
data = [data]
self.raw_files = data
self.input = []
self.out_files = []
self.display_data = {}
[docs]
def load_data_id(self):
"""
Load the data description.
Calls `read_data_key` for all keys in the `data_keys` attribute
on the currently loaded data. This method populates the
`data_id` attribute.
"""
data_id = OrderedDict()
for key in self.data_keys:
data_id[key] = []
for i, fname in enumerate(self.raw_files):
try:
# try to get header from first object in list
hdul = self.input[i]
header = hdul[0].header
except (IndexError, AttributeError, TypeError):
# try again, this time assume input object
# has a .header
try:
hdu = self.input[i]
header = hdu.header
except (IndexError, AttributeError, TypeError):
# try again, assume input *is* a header
try:
header = self.input[i]
except IndexError:
# give up
header = None
for key in self.data_keys:
data_id[key].append(
self.read_data_key(key, fname=fname, header=header))
self.data_id = data_id
[docs]
def load_fits_files(self, data):
"""
Load input FITS files into memory.
Input data files are stored as `astropy.io.fits.HDUList`
objects in the `input` attribute. Any input files that
cannot be read as FITS files will trigger a log warning,
but will otherwise be ignored.
Parameters
----------
data : `list` of str
Input FITS file paths.
"""
if type(data) is not list:
data = [data]
self.input = []
good_files = []
for datafile in data:
# skip anything that isn't a file
if not os.path.isfile(datafile):
log.warning("{} is not a file; skipping".format(datafile))
continue
try:
hdul = fits.open(datafile, mode='readonly',
ignore_missing_end=True)
hdul.verify('silentfix')
except (OSError, ValueError, fits.verify.VerifyError):
log.warning("Could not read {} as FITS; "
"skipping".format(datafile))
continue
# copy to a new hdul to disconnect from the file
# object -- otherwise, the data is not serializable
copy_hdul = fits.HDUList([hdu.copy() for hdu in hdul])
self.input.append(copy_hdul)
hdul.close()
good_files.append(datafile)
# revise the raw file list to include only the good ones,
# so that it matches the input list
self.raw_files = good_files
[docs]
def load_parameters(self):
"""
Set the parameter list for the current reduction recipe.
For each reduction step, this method calls
`Parameters.add_current_parameters` to set default values,
then checks for a method corresponding to the step name
in the Parameters object. If such a method is defined, it
is called. This allows custom logic related to parameter
setting to be defined for each reduction step.
If the `param_file` attribute is not null, loaded parameters
are overridden with any values defined in that file (via the
`Parameters.from_config` method).
"""
self.parameters.current = []
self.parameters.stepnames = []
for idx, step in enumerate(self.recipe):
# set the current set from the default
self.parameters.add_current_parameters(step)
# if the step has a corresponding function in the
# parameter class, run it. This allows custom
# behavior for parameters, based on currently
# loaded data.
if hasattr(self.parameters, step):
par_function = getattr(self.parameters, step)
par_function(idx)
# override any default parameters with a defined param file
if self.param_file is not None:
self.parameters.from_config(self.param_file)
[docs]
def update_parameters(self):
"""
Update parameter values for the current reduction.
For each reduction step, this method checks for a method
corresponding to each step name in the Parameters object.
If such a method is defined, it is called.
If the `param_file` attribute is not null, loaded parameters
are overridden with any values defined in that file (via the
`Parameters.from_config` method).
Unlike the `load_parameters` method, any previously edited parameters
not affected by either the step methods or the input file retain
their edited values. If parameters have not yet been loaded, this
function has no effect.
"""
if len(self.parameters.current) != len(self.recipe):
# unloaded parameters
return
for idx, step in enumerate(self.recipe):
# if the step has a corresponding function in the
# parameter class, run it
if hasattr(self.parameters, step):
par_function = getattr(self.parameters, step)
par_function(idx)
# override any default parameters with a defined param file
if self.param_file is not None:
self.parameters.from_config(self.param_file)
def log_reduce(self, end=False):
    """
    Log the pipeline name and version.

    This method is called at the beginning and end of the `reduce`
    function to describe the reduction that is about to run. It
    produces log messages at the INFO level.

    Parameters
    ----------
    end : bool
        If True, log a finishing message for the end of the reduction.
        Otherwise, log the start up message.
    """
    if end:
        msg = "Reduction complete."
        banner = '=' * len(msg)
        log.info('')
        log.info(banner)
        log.info(msg)
        log.info(banner)
        log.info('')
    else:
        msg = "Running {}".format(self.description)
        banner = '=' * len(msg)
        log.info('')
        log.info(banner)
        log.info(msg)
        log.info("Pipeline: {} v{}".format(self.pipe_name,
                                           self.pipe_version))
        log.info(banner)
        log.info('')
def log_step(self, step_name, params=None):
    """
    Log the reduction step name, time, and parameters.

    This method is called at the beginning of the `step`
    method. Messages are logged at INFO level. Hidden
    parameters are not logged.

    Parameters
    ----------
    step_name : str
        Name of the reduction step.
    params : ParameterSet
        Parameters for the reduction step.
    """
    timestamp = str(dt.datetime.now())
    divider = '-' * len(timestamp)
    log.info('')
    log.info(divider)
    log.info(step_name)
    log.info('-' * len(step_name))
    log.info(timestamp)
    if params:
        log.info('')
        log.info('Parameters:')
        for name in params:
            # skip parameters marked hidden
            if params[name]['hidden']:
                continue
            log.info(" {} = {}".format(name,
                                       params.get_value(name)))
    log.info('')
    log.info(divider)
    log.info('')
[docs]
def read_data_key(self, key, fname=None, header=None):
"""
Read a data keyword from a metadata header.
Parameters
----------
key : str
Data keyword, not case sensitive. 'File Name' and
'File Size' have special values; all other keywords are
retrieved from the `header`.
fname : str, optional
Name of the file being described. Its basename is returned
if key is 'File Name'.
header : dict-like
Metadata header.
Returns
-------
str
A string representation of the metadata value.
"""
if key.lower() == 'file name':
return os.path.basename(str(fname))
elif key.lower() == 'file size':
try:
sz = os.path.getsize(fname)
unit = 'B'
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if abs(sz) < 1024.0:
break
sz /= 1024.0
return "{:.1f} {}".format(sz, unit)
except (OSError, TypeError):
return 'UNKNOWN'
else:
return self.get_key_value(header, key)
[docs]
def record_outfile(self, outfile):
"""
Store an output file name.
Output file names are recorded in the `out_files`
attribute, which is used to generate output manifests.
The output file name is also logged at the INFO level.
Parameters
----------
outfile : str
Path to the output file.
"""
if type(outfile) is not list:
outfile = [outfile]
for fname in outfile:
if fname in self.out_files:
continue
else:
self.out_files.append(fname)
log.info("Wrote output file {}".format(fname))
def register_viewers(self):
    """
    Instantiate viewers appropriate to the reduction.

    This method instantiates and returns any `Viewer` objects that may
    be used to display data from this reduction. Data for the viewers
    is stored in the `display_data` attribute, but no reference to
    the viewers is stored in the reduction object itself. Viewers
    should be controlled through an `Interface` object.

    In this implementation, the default Viewer is returned,
    which only prints a message at DEBUG level. This method should
    be overridden if more complex viewers are required.

    Returns
    -------
    list of Viewer
        All viewers supported by this reduction object.
    """
    # base class: a single default viewer that only logs at
    # DEBUG level
    return [Viewer()]
def reduce(self):
    """
    Reduce the current data.

    All steps in the current reduction recipe are run. This
    function assumes that no steps have been run yet. A
    RuntimeError is raised if any step reports an error.
    """
    self.log_reduce()
    for _ in self.recipe:
        status = self.step()
        # a non-blank status string indicates a step error
        if str(status).strip():
            log.error(status)
            raise RuntimeError('Pipeline step error.')
    self.log_reduce(end=True)
[docs]
def set_parameter_set(self, param, step_index=None):
"""
Set a parameter set for a reduction step.
This function is intended for restoring parameters
from a previously saved ParameterSet object. Use
`edit_parameters` for an interface that does not require
a ParameterSet object as input.
Parameters
----------
param : ParameterSet
A full set of parameters for the reduction step.
step_index : int, optional
The reduction step index in the current reduction
recipe. If not provided, the current step_index
will be used.
"""
if step_index is None:
step_index = self.step_index
if 0 <= step_index < len(self.recipe):
self.parameters.current[step_index] = param
[docs]
def step(self, alt_method=None):
"""
Run a reduction step.
This method calls the reduction step specified in the
`recipe` attribute, at the current `step_index`. If the
step index is out of range for the recipe, this method will
just return.
Normally, reduction steps are defined as methods in the
reduction object. Sometimes, a reduction package may have its
own calling methods, rather than offering modular reduction
steps. In this case, the `alt_method` parameter should be used
to specify a reduction method that calls the external interface
for each pipeline step, marshalling and passing data to the
interface as required.
Parameters
----------
alt_method : str, optional
If provided, this method will be run instead
of the method named by the reduction step name. This
is used in cases where the reduction step is not
implemented or described in the reduction object itself,
but in an external package.
Returns
-------
str
An error message if the reduction step produced any
errors; an empty string otherwise.
Raises
------
RuntimeError
If there is no input to process, and self.check_input is True.
"""
# no-op if step index is out of range
if not 0 <= self.step_index < len(self.recipe):
return ''
step_method = self.recipe[self.step_index]
step_name = self.processing_steps[step_method]
self.log_step(step_name, self.get_parameter_set())
# load/access steps
if alt_method is not None:
step_function = getattr(self, alt_method)
else:
step_function = getattr(self, step_method)
# check for input
if self.check_input and len(self.input) == 0:
raise RuntimeError('No input to process.')
# call the step
step_function()
# check for errors; return as string
status = ''
if self.error:
if type(self.error) is not list:
self.error = [self.error]
status = '; '.join(self.error)
self.error = []
self.step_index += 1
return status