# Source code for sofia_redux.pipeline.reduction

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Base class for Redux reduction objects."""

from collections import OrderedDict
import datetime as dt
import os

from astropy import log
from astropy.io import fits

import sofia_redux.pipeline
from sofia_redux.pipeline.parameters import Parameters, ParameterSet
from sofia_redux.pipeline.viewer import Viewer

__all__ = ['Reduction']


class Reduction:
    """
    Reduction steps and data.

    Attributes
    ----------
    name : str
        Name of the reduction type.
    instrument : str
        Name of the instrument supported by this reduction object.
    mode : str
        Name of the instrument mode supported by this object.
    pipe_name : str
        Name of the data reduction software package.
    pipe_version : str
        Pipeline version.
    param_file : str
        Input parameter file, if provided.
    parameters : Parameters
        A Redux Parameters object.  This will hold all current values
        of reduction parameters, to be provided to the steps when they
        are run.
    recipe : `list` of str
        List of processing steps to run.  Normally, the step names are
        method names implemented in the current reduction class.
    processing_steps : dict
        Dictionary associating pipeline step names with more
        descriptive display names.
    step_index : int
        The current step in the recipe, to be run next.
    error : `list` of str
        A list of error messages generated by reduction steps, for
        reporting back to the user.
    data_keys : `list` of str
        A list of keywords that describe the input data, for display
        to the user.  'File Name' and 'File Size' keywords have
        general definitions applicable to most input types.  Other
        keywords are assumed to be readable from the data structure's
        .header attribute, as ``data.header[key]``.
    data_id : `OrderedDict`
        Holds the association between `data_keys` and values for the
        currently loaded data.
    allow_undo : bool
        Flag to indicate whether an 'undo' operation should be allowed
        in GUI context.  The undo operation is based on a
        serialization of this object: if the current object cannot be
        serialized, or if it would be too memory-expensive to do so,
        this flag should be set to False.
    check_input : bool
        Flag to indicate whether reduction steps should raise an error
        if the input list is empty.
    raw_files : `list` of str
        Input file paths.
    out_files : `list` of str
        Output file paths, for files generated by the current
        reduction.
    input : `list`
        Input data for the next processing step.  List members may be
        any type understood by the processing step.
    display_data : dict
        Data to be displayed by any viewers associated with the
        current reduction.  Keys should be the names of the viewers
        (`Viewer.name`); values may be any data type understood by
        the viewer.
    output_directory : str
        Directory path to which any output files generated should be
        saved.
    """
    def __init__(self):
        """Initialize the reduction."""
        # descriptive attributes
        self.name = "Basic Reduction"
        self.instrument = "any"
        self.mode = "any"
        self.pipe_name = "Redux"
        self.pipe_version = sofia_redux.pipeline.__version__

        # associations
        self.param_file = None
        self.parameters = Parameters()

        # processing description:
        # order of steps
        self.recipe = ["log_input"]
        # step names and descriptions
        self.processing_steps = {"log_input": "Log Input Files"}
        # step status
        self.step_index = 0
        # error messages from pipeline steps
        self.error = []

        # data identification keys
        self.data_keys = ['File Name', 'File Size']
        self.data_id = OrderedDict()

        # specify whether undo should be allowed in
        # an interactive GUI (set to False if internal
        # data is too large to duplicate, or reduction
        # object cannot be pickled)
        self.allow_undo = True

        # specify whether reduction steps should raise an error
        # if the input list is empty
        self.check_input = False

        # data storage:
        # raw files as provided
        self.raw_files = []
        # output files as written
        self.out_files = []
        # input to the next step
        self.input = []
        # data to display
        self.display_data = {}
        # directory to save output to
        self.output_directory = None

    @property
    def description(self):
        """str: Description of the reduction (name, instrument, and mode)."""
        # underscore-delimited versions are displayed dot-delimited
        if self.pipe_version:
            version = ' v' + '.'.join(str(self.pipe_version).split('_'))
        else:
            version = ''
        msg = f"{self.name}{version} for {self.instrument} " \
              f"in {self.mode} mode"
        return msg
[docs] def cleanup(self): """Perform any necessary cleanup tasks before shutting down.""" pass
[docs] def edit_parameters(self, key, step_index=None, value=None, options=None, option_index=None, hidden=False): """ Edit the parameters for a reduction step. Parameters ---------- key : str Parameter key name. step_index : int, optional Index of the reduction step in the reduction recipe. If not provided, the current step_index will be used. value : str, float, int, bool, or list; optional Parameter value. options : list; optional Enumerated parameter value options. option_index : int, optional Index of the selected option, for enumerated values. """ if step_index is None: step_index = self.step_index if 0 <= step_index < len(self.recipe): param = self.parameters.current[step_index] param.set_value(key, value=value, options=options, option_index=option_index, hidden=hidden)
[docs] def get_key_value(self, header, key): """ Retrieve a value from a metadata header. Parameters ---------- header : dict-like Metadata header. May be any type for which the data is accessible as header[key]. key : str Header keyword. Returns ------- str String representation of the header value, or 'UNKNOWN' if the keyword was not found or could not be converted to a string. """ try: value = str(header[key]).strip().upper() except (KeyError, TypeError): value = 'UNKNOWN' return value
[docs] def get_parameter_set(self, step_index=None): """ Get the current parameter set for a step. Parameters ---------- step_index : int, optional Index of the step in the reduction recipe. If not provided, the current step_index will be used. Returns ------- ParameterSet The parameters for the specified step. """ if step_index is None: step_index = self.step_index if 0 <= step_index < len(self.recipe) and \ len(self.parameters.current) > 0: param = self.parameters.current[step_index] else: param = ParameterSet() return param
[docs] def load(self, data): """ Load input data to make it usable to reduction steps. This function only stores the data as is in the raw_files attribute. This function should be overridden by subclasses if more complex behavior is needed, e.g. to call the `load_fits_files` method, or set the display data variables. Parameters ---------- data : `list` of str or str Input data file names to be loaded. """ self.step_index = 0 if type(data) is not list: data = [data] self.raw_files = data self.input = [] self.out_files = [] self.display_data = {}
[docs] def load_data_id(self): """ Load the data description. Calls `read_data_key` for all keys in the `data_keys` attribute on the currently loaded data. This method populates the `data_id` attribute. """ data_id = OrderedDict() for key in self.data_keys: data_id[key] = [] for i, fname in enumerate(self.raw_files): try: # try to get header from first object in list hdul = self.input[i] header = hdul[0].header except (IndexError, AttributeError, TypeError): # try again, this time assume input object # has a .header try: hdu = self.input[i] header = hdu.header except (IndexError, AttributeError, TypeError): # try again, assume input *is* a header try: header = self.input[i] except IndexError: # give up header = None for key in self.data_keys: data_id[key].append( self.read_data_key(key, fname=fname, header=header)) self.data_id = data_id
[docs] def load_fits_files(self, data): """ Load input FITS files into memory. Input data files are stored as `astropy.io.fits.HDUList` objects in the `input` attribute. Any input files that cannot be read as FITS files will trigger a log warning, but will otherwise be ignored. Parameters ---------- data : `list` of str Input FITS file paths. """ if type(data) is not list: data = [data] self.input = [] good_files = [] for datafile in data: # skip anything that isn't a file if not os.path.isfile(datafile): log.warning("{} is not a file; skipping".format(datafile)) continue try: hdul = fits.open(datafile, mode='readonly', ignore_missing_end=True) hdul.verify('silentfix') except (OSError, ValueError, fits.verify.VerifyError): log.warning("Could not read {} as FITS; " "skipping".format(datafile)) continue # copy to a new hdul to disconnect from the file # object -- otherwise, the data is not serializable copy_hdul = fits.HDUList([hdu.copy() for hdu in hdul]) self.input.append(copy_hdul) hdul.close() good_files.append(datafile) # revise the raw file list to include only the good ones, # so that it matches the input list self.raw_files = good_files
[docs] def load_parameters(self): """ Set the parameter list for the current reduction recipe. For each reduction step, this method calls `Parameters.add_current_parameters` to set default values, then checks for a method corresponding to the step name in the Parameters object. If such a method is defined, it is called. This allows custom logic related to parameter setting to be defined for each reduction step. If the `param_file` attribute is not null, loaded parameters are overridden with any values defined in that file (via the `Parameters.from_config` method). """ self.parameters.current = [] self.parameters.stepnames = [] for idx, step in enumerate(self.recipe): # set the current set from the default self.parameters.add_current_parameters(step) # if the step has a corresponding function in the # parameter class, run it. This allows custom # behavior for parameters, based on currently # loaded data. if hasattr(self.parameters, step): par_function = getattr(self.parameters, step) par_function(idx) # override any default parameters with a defined param file if self.param_file is not None: self.parameters.from_config(self.param_file)
[docs] def update_parameters(self): """ Update parameter values for the current reduction. For each reduction step, this method checks for a method corresponding to each step name in the Parameters object. If such a method is defined, it is called. If the `param_file` attribute is not null, loaded parameters are overridden with any values defined in that file (via the `Parameters.from_config` method). Unlike the `load_parameters` method, any previously edited parameters not affected by either the step methods or the input file retain their edited values. If parameters have not yet been loaded, this function has no effect. """ if len(self.parameters.current) != len(self.recipe): # unloaded parameters return for idx, step in enumerate(self.recipe): # if the step has a corresponding function in the # parameter class, run it if hasattr(self.parameters, step): par_function = getattr(self.parameters, step) par_function(idx) # override any default parameters with a defined param file if self.param_file is not None: self.parameters.from_config(self.param_file)
[docs] def log_input(self): """Log input data at INFO level.""" # This is mostly just a dummy step for generic reductions. param = self.get_parameter_set() if 'message' in param: log.info(param.get_value('message')) if 'warning' in param: log.warning(param.get_value('warning')) if 'error' in param: log.error(param.get_value('error')) log.info("Input data files:") for datafile in self.raw_files: log.info(" {}".format(datafile))
[docs] def log_reduce(self, end=False): """ Log the pipeline name and version. This method is called at the beginning and end of the `reduce` function to describe the reduction that is about to run. It produces log messages at the INFO level. Parameters ---------- end : bool If True, log a finishing message for the end of the reduction. Otherwise, log the start up message. """ if not end: msg = "Running {}".format(self.description) log.info('') log.info('=' * len(msg)) log.info(msg) log.info("Pipeline: {} v{}".format(self.pipe_name, self.pipe_version)) log.info('=' * len(msg)) log.info('') else: msg = "Reduction complete." log.info('') log.info('=' * len(msg)) log.info(msg) log.info('=' * len(msg)) log.info('')
[docs] def log_step(self, step_name, params=None): """ Log the reduction step name, time, and parameters. This method is called at the beginning of the `step` method. Messages are logged at INFO level. Hidden parameters are not logged. Parameters ---------- step_name : str Name of the reduction step. params : ParameterSet Parameters for the reduction step. """ current_time = str(dt.datetime.now()) log.info('') log.info('-' * len(current_time)) log.info(step_name) log.info('-' * len(step_name)) log.info(current_time) if params: log.info('') log.info('Parameters:') for param in params: if params[param]['hidden']: # don't log hidden parameters continue log.info(" {} = {}".format(param, params.get_value(param))) log.info('') log.info('-' * len(current_time)) log.info('')
[docs] def read_data_key(self, key, fname=None, header=None): """ Read a data keyword from a metadata header. Parameters ---------- key : str Data keyword, not case sensitive. 'File Name' and 'File Size' have special values; all other keywords are retrieved from the `header`. fname : str, optional Name of the file being described. Its basename is returned if key is 'File Name'. header : dict-like Metadata header. Returns ------- str A string representation of the metadata value. """ if key.lower() == 'file name': return os.path.basename(str(fname)) elif key.lower() == 'file size': try: sz = os.path.getsize(fname) unit = 'B' for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if abs(sz) < 1024.0: break sz /= 1024.0 return "{:.1f} {}".format(sz, unit) except (OSError, TypeError): return 'UNKNOWN' else: return self.get_key_value(header, key)
[docs] def record_outfile(self, outfile): """ Store an output file name. Output file names are recorded in the `out_files` attribute, which is used to generate output manifests. The output file name is also logged at the INFO level. Parameters ---------- outfile : str Path to the output file. """ if type(outfile) is not list: outfile = [outfile] for fname in outfile: if fname in self.out_files: continue else: self.out_files.append(fname) log.info("Wrote output file {}".format(fname))
[docs] def register_viewers(self): """ Instantiate viewers appropriate to the reduction. This method instantiates and returns any `Viewer` objects that may be used to display data from this reduction. Data for the viewers is stored in the `display_data` attribute, but no reference to the viewers is stored in the reduction object itself. Viewers should be controlled through an `Interface` object. In this implementation, the default Viewer is returned, which only prints a message at DEBUG level. This method should be overridden if more complex viewers are required. Returns ------- list of Viewer All viewers supported by this reduction object. """ # default viewer: only prints a debug log message viewers = [Viewer()] return viewers
[docs] def reduce(self): """ Reduce the current data. All steps in the current reduction recipe is run. This function assumes that no steps have been run yet. """ self.log_reduce() for i in range(len(self.recipe)): status = self.step() if str(status).strip() != '': log.error(status) raise RuntimeError('Pipeline step error.') self.log_reduce(end=True)
[docs] def set_parameter_set(self, param, step_index=None): """ Set a parameter set for a reduction step. This function is intended for restoring parameters from a previously saved ParameterSet object. Use `edit_parameters` for an interface that does not require a ParameterSet object as input. Parameters ---------- param : ParameterSet A full set of parameters for the reduction step. step_index : int, optional The reduction step index in the current reduction recipe. If not provided, the current step_index will be used. """ if step_index is None: step_index = self.step_index if 0 <= step_index < len(self.recipe): self.parameters.current[step_index] = param
[docs] def step(self, alt_method=None): """ Run a reduction step. This method calls the reduction step specified in the `recipe` attribute, at the current `step_index`. If the step index is out of range for the recipe, this method will just return. Normally, reduction steps are defined as methods in the reduction object. Sometimes, a reduction package may have its own calling methods, rather than offering modular reduction steps. In this case, the `alt_method` parameter should be used to specify a reduction method that calls the external interface for each pipeline step, marshalling and passing data to the interface as required. Parameters ---------- alt_method : str, optional If provided, this method will be run instead of the method named by the reduction step name. This is used in cases where the reduction step is not implemented or described in the reduction object itself, but in an external package. Returns ------- str An error message if the reduction step produced any errors; an empty string otherwise. Raises ------ RuntimeError If there is no input to process, and self.check_input is True. """ # no-op if step index is out of range if not 0 <= self.step_index < len(self.recipe): return '' step_method = self.recipe[self.step_index] step_name = self.processing_steps[step_method] self.log_step(step_name, self.get_parameter_set()) # load/access steps if alt_method is not None: step_function = getattr(self, alt_method) else: step_function = getattr(self, step_method) # check for input if self.check_input and len(self.input) == 0: raise RuntimeError('No input to process.') # call the step step_function() # check for errors; return as string status = '' if self.error: if type(self.error) is not list: self.error = [self.error] status = '; '.join(self.error) self.error = [] self.step_index += 1 return status