# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""EXES Reduction pipeline steps"""
import copy
import os
import re
import time
import warnings
from astropy import log
from astropy import constants as const
from astropy.io import fits
import numpy as np
from sofia_redux.pipeline.sofia.sofia_exception import SOFIAImportError
try:
import sofia_redux.instruments.exes as ex
except ImportError:
raise SOFIAImportError('EXES modules not installed')
from sofia_redux.instruments.exes.mergehdr import mergehdr
from sofia_redux.instruments.exes.utils import parse_central_wavenumber
from sofia_redux.pipeline.gui.qad_viewer import QADViewer
from sofia_redux.pipeline.gui.matplotlib_viewer import MatplotlibViewer
from sofia_redux.pipeline.reduction import Reduction
from sofia_redux.pipeline.sofia.parameters.exes_parameters \
import EXESParameters
from sofia_redux.pipeline.sofia.sofia_utilities import \
parse_bg, parse_apertures
from sofia_redux.toolkit.utilities.fits import \
hdinsert, getheader, set_log_level, gethdul
from sofia_redux.toolkit.image.adjust import rotate90, unrotate90
from sofia_redux.toolkit.image.combine import combine_images
from sofia_redux.visualization.redux_viewer import EyeViewer
__all__ = ['EXESReduction']
class EXESReduction(Reduction):
"""
EXES reduction steps.
Primary image reduction algorithms are defined in the EXES
package (`sofia_redux.instruments.exes`). Spectroscopy-related
algorithms are pulled from the `sofia_redux.spectroscopy` package,
and some utilities and display tools come from the `sofia_redux.toolkit`,
`sofia_redux.calibration`, and `sofia_redux.visualization` packages.
This reduction object defines a recipe for data reduction and a
method for each pipeline step that calls the appropriate algorithm
from its source packages.
Attributes
----------
prodtype_map : dict
Maps the pipeline step to a product type, to assign
to the PRODTYPE header key. Keys are pipeline step function
names.
prodnames : dict
Three-letter file type codes to assign to the output of each
pipeline step. Keys are the product types (as defined in
prodtype_map).
step_map : dict
Inverse of the prodtype_map, for looking up pipeline
step names from product types. Keys are the product types.
prodtypes : list
List of product types, corresponding to the currently
loaded recipe. This list is populated whenever the recipe
attribute is set.
default_recipe : list
Processing recipe used for standard science processing.
default_prodtype_map : dict
Product type map used for standard science processing.
default_prodnames : dict
Product names used for standard science processing.
default_step_map : dict
Step name map used for standard science processing.
sky_spectrum : bool
Flag to indicate that data should be reduced as a sky spectrum
extraction, instead of standard science processing.
sky_prodtype_map : dict
Alternate product types to use for sky spectrum extraction.
sky_prodnames : dict
Alternate product names to use for sky spectrum extraction.
sky_step_map : dict
Alternate step name map to use for sky spectrum extraction.
wcs_keys : list
List of header keywords used for tracking and propagating the
spectral world coordinate system.
spec1d_prodtype : list
List of product types currently and historically used to designate
1D spectral products, used for display purposes and
backwards-compatible handling.
"""
def __init__(self):
"""Initialize the reduction object."""
super().__init__()
# descriptive attributes
self.name = 'Redux'
self.instrument = 'EXES'
self.mode = 'Spectroscopy'
self.data_keys = ['File Name', 'OBJECT', 'OBSTYPE',
'AOR_ID', 'MISSN-ID', 'DATE-OBS',
'INSTCFG', 'INSTMODE',
'SPECTEL1', 'SPECTEL2', 'WAVENO0',
'NODBEAM', 'EXPTIME']
self.pipe_name = 'EXES_REDUX'
self.pipe_version = ex.__version__.replace('.', '_')
# parameters will be instantiated on load
self.parameters = None
# product type definitions for EXES steps
self.prodtype_map = {
'load_data': 'raw',
'coadd_readouts': 'readouts_coadded',
'make_flat': 'flat_appended',
'despike': 'despiked',
'debounce': 'debounced',
'subtract_nods': 'nods_subtracted',
'flat_correct': 'flat_corrected',
'clean_badpix': 'cleaned',
'undistort': 'undistorted',
'correct_calibration': 'calibration_corrected',
'coadd_pairs': 'coadded',
'convert_units': 'calibrated',
'make_profiles': 'rectified_image',
'locate_apertures': 'apertures_located',
'set_apertures': 'apertures_set',
'subtract_background': 'background_subtracted',
'extract_spectra': 'spectra',
'combine_spectra': 'coadded_spectrum',
'refine_wavecal': 'wavecal_refined',
'merge_orders': 'orders_merged',
'specmap': 'specmap'}
self.prodnames = {
'raw': 'RAW',
'readouts_coadded': 'RDC',
'flat_appended': 'FTA',
'despiked': 'DSP',
'debounced': 'DBC',
'nods_subtracted': 'NSB',
'flat_corrected': 'FTD',
'cleaned': 'CLN',
'undistorted': 'UND',
'calibration_corrected': 'CCR',
'coadded': 'COA',
'calibrated': 'CAL',
'rectified_image': 'RIM',
'apertures_located': 'LOC',
'apertures_set': 'APS',
'background_subtracted': 'BGS',
'spectra': 'SPM',
'coadded_spectrum': 'COM',
'wavecal_refined': 'WRF',
'orders_merged': 'MRM',
'specmap': 'SMP'}
# invert the map for quick lookup of step from type
self.step_map = {v: k for k, v in self.prodtype_map.items()}
# This will be populated when the recipe is set
self.prodtypes = list()
# Default recipe and step names
self.recipe = [
'load_data', 'coadd_readouts', 'make_flat', 'despike',
'debounce', 'subtract_nods', 'flat_correct',
'clean_badpix', 'undistort', 'correct_calibration',
'coadd_pairs', 'convert_units',
'make_profiles', 'locate_apertures',
'set_apertures', 'subtract_background',
'extract_spectra', 'combine_spectra',
'refine_wavecal', 'merge_orders', 'specmap']
self.processing_steps = {
'load_data': 'Load Data',
'coadd_readouts': 'Coadd Readouts',
'make_flat': 'Make Flat',
'despike': 'Despike',
'debounce': 'Debounce',
'subtract_nods': 'Subtract Nods',
'flat_correct': 'Flat Correct',
'clean_badpix': 'Clean Bad Pixels',
'undistort': 'Undistort',
'correct_calibration': 'Correct Calibration',
'coadd_pairs': 'Coadd Pairs',
'convert_units': 'Convert Units',
'make_profiles': 'Make Profiles',
'locate_apertures': 'Locate Apertures',
'set_apertures': 'Set Apertures',
'subtract_background': 'Subtract Background',
'extract_spectra': 'Extract Spectra',
'combine_spectra': 'Combine Spectra',
'refine_wavecal': 'Refine Wavecal',
'merge_orders': 'Merge Orders',
'specmap': 'Make Spectral Map'}
# reduction information
self.output_directory = os.getcwd()
self.filenum = list()
# keep a copy of the default recipe and products, for later restoration
self.default_recipe = self.recipe.copy()
self.default_prodtype_map = self.prodtype_map.copy()
self.default_prodnames = self.prodnames.copy()
self.default_step_map = self.step_map.copy()
# store some alternate product types and codes, for sky spectra
self.sky_spectrum = False
self.sky_prodtype_map = {
'subtract_nods': 'sky_nods_subtracted',
'flat_correct': 'sky_flat_corrected',
'clean_badpix': 'sky_cleaned',
'undistort': 'sky_undistorted',
'correct_calibration': 'sky_calibration_corrected',
'coadd_pairs': 'sky_coadded',
'convert_units': 'sky_calibrated',
'make_profiles': 'sky_rectified_image',
'locate_apertures': 'sky_apertures_located',
'set_apertures': 'sky_apertures_set',
'subtract_background': 'sky_background_subtracted',
'extract_spectra': 'sky_spectra',
'combine_spectra': 'sky_coadded_spectrum',
'refine_wavecal': 'sky_wavecal_refined',
'merge_orders': 'sky_orders_merged',
'specmap': 'sky_specmap'}
self.sky_prodnames = {
'sky_nods_subtracted': 'SNS',
'sky_flat_corrected': 'SFT',
'sky_cleaned': 'SCN',
'sky_undistorted': 'SUN',
'sky_calibration_corrected': 'SCR',
'sky_coadded': 'SCO',
'sky_calibrated': 'SCL',
'sky_rectified_image': 'SRM',
'sky_apertures_located': 'SLC',
'sky_apertures_set': 'SAP',
'sky_background_subtracted': 'SBG',
'sky_spectra': 'SSM',
'sky_coadded_spectrum': 'SCM',
'sky_wavecal_refined': 'SWR',
'sky_orders_merged': 'SMM',
'sky_specmap': 'SSS'}
self.sky_step_map = {v: k for k, v in self.sky_prodtype_map.items()}
# store some WCS keys used for tracking and propagating
# the 2D spectral WCS
self.wcs_keys = [
'CTYPE1', 'CTYPE2', 'CUNIT1', 'CUNIT2',
'CRPIX1', 'CRPIX2', 'CRVAL1', 'CRVAL2',
'CDELT1', 'CDELT2', 'CROTA2', 'SPECSYS']
# store some 1D spectrum types, for display and
# historical accommodation
self.spec1d_prodtype = [
'spectra_1d', 'merged_spectrum_1d', 'calibrated_spectrum_1d',
'combined_spectrum', 'combined_spectrum_1d',
'wavecal_refined_1d', 'orders_merged_1d',
'sky_spectra_1d', 'sky_merged_spectrum_1d',
'sky_calibrated_spectrum_1d', 'sky_combined_spectrum',
'sky_combined_spectrum_1d', 'sky_wavecal_refined_1d',
'sky_orders_merged_1d',
'spec', 'combspec', 'calspec', 'mrgspec', 'mrgordspec',
'sky_spec', 'sky_combspec', 'sky_mrgordspec']
def __setattr__(self, name, value):
"""Check if recipe is being set. If so, also set prodtypes."""
if name == 'recipe':
# set the prodtype list
try:
self.prodtypes = [self.prodtype_map[step] for step in value]
except AttributeError:
self.prodtypes = list()
super().__setattr__(name, value)
# input/output methods
@staticmethod
def get_filenum(filename):
"""
Get a file number from an EXES file name.
Formats expected are for raw EXES files (e.g. sirius.sci.10001.fits)
or for intermediate processed files (e.g.
F0001_EX_SPE_0101123_EXEELONEXEECHL_COA_10001-10002.fits).
In either case, the field just before the .fits is expected to
contain the file number.
If the input file references a range of files, e.g. '10001-10002',
the first and last file numbers are returned in a list. Otherwise,
a single file number is returned as a string. If neither pattern
is found, 'UNKNOWN' is returned.
Parameters
----------
filename : str
FITS file name, raw or intermediate.
Returns
-------
filenum : str or list of str
File number(s) referenced in the input file name, zero-padded
to at least four digits, or 'UNKNOWN' if no file number is found.
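Examples
--------
Illustrative calls with hypothetical file names, sketching the raw
and intermediate naming patterns described above:

>>> EXESReduction.get_filenum('sirius.sci.10001.fits')
'10001'
>>> fname = 'F0001_EX_SPE_0101123_EXEELONEXEECHL_COA_10001-10002.fits'
>>> EXESReduction.get_filenum(fname)
['10001', '10002']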
"""
split_filename = re.split('[_.]', os.path.basename(filename))
# check for single file with trailing serial number
# (for split coadds)
if len(split_filename) > 3:
if (re.match(r'^\d+$', split_filename[-2])
and re.match(r'^\d+$', split_filename[-3])):
filenum = '_'.join(split_filename[-3:-1])
return filenum
# otherwise check for ranges of raw file numbers
try:
num = split_filename[-2]
try:
filenum = f'{int(num):04d}'
except ValueError:
filenum = [f'{int(n):04d}' for n in num.split('-')]
except (ValueError, IndexError):
filenum = 'UNKNOWN'
return filenum
@staticmethod
def concatenate_filenum(filenum):
"""
Concatenate file numbers, first-last.
File numbers are sorted alphanumerically.
Parameters
----------
filenum : str or list
File numbers to concatenate.
Returns
-------
str
Concatenated file number.
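Examples
--------
Minimal illustrative calls (file numbers are hypothetical):

>>> EXESReduction.concatenate_filenum(['10002', '10001'])
'10001-10002'
>>> EXESReduction.concatenate_filenum('0005')
'0005'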
"""
if not isinstance(filenum, list):
filenum = [filenum]
filenums = []
for fn in filenum:
if isinstance(fn, list):
# one more layer of possible lists
for f in fn:
if isinstance(f, list):
filenums.extend(f)
else:
filenums.append(f)
else:
filenums.append(fn)
filenums = [str(fn) for fn in filenums]
if len(filenums) > 1:
# strip any trailing serial numbers
filenums = [f.split('_')[0] for f in filenums]
# keep the first and last, if different
filenums.sort()
if filenums[0] == filenums[-1]:
filenum_str = filenums[0]
else:
filenum_str = '-'.join([filenums[0], filenums[-1]])
else:
filenum_str = filenums[0]
return filenum_str
def get_filename(self, header, filenum=None, prodtype=None, update=True):
"""
Create an output filename from an input header.
Parameters
----------
header : astropy.io.fits.Header
Header to create filename from.
filenum : str or list, optional
List of file numbers to concatenate for filename.
prodtype : str, optional
Three letter product type designator.
update : bool, optional
If set, the FILENAME key will be added or updated
in the header.
Returns
-------
filename : str
The output name.
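Notes
-----
The output name has the form
{flight}_{instrument}_{aorid}_{spectel}_{prodtype}_{filenum}.fits,
e.g. F0001_EX_SPE_0101123_EXEELONEXEECHL_COA_10001-10002.fits.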
"""
# Get flight number
missn = header.get('MISSN-ID', 'UNKNOWN')
flight = missn.split('_')[-1].lstrip('F')
try:
flight = 'F{0:04d}'.format(int(flight))
except ValueError:
# check for line ops mission
flight = missn.split('_')[-1].lstrip('L')
try:
flight = 'L{0:04d}'.format(int(flight))
except ValueError:
flight = 'UNKNOWN'
# Get instrument
try:
data_type = header['DATATYPE']
except KeyError:
inst = 'EX_SPE'
else:
if data_type == 'IMAGE':
inst = 'EX_IMA'
else:
inst = 'EX_SPE'
# Get AOR-ID
try:
aorid = header['AOR_ID']
except KeyError:
aorid = 'UNKNOWN'
aorid = aorid.replace('_', '')
# Get SPECTEL
try:
spectel1 = header['SPECTEL1']
except KeyError:
spectel1 = 'UNKNOWN'
try:
spectel2 = header['SPECTEL2']
except KeyError:
spectel2 = 'UNKNOWN'
spectel = (spectel1.replace('_', '').strip()
+ spectel2.replace('_', '').strip())
# get file number string
if filenum is None:
fn = 'UNKNOWN'
else:
fn = self.concatenate_filenum(filenum)
# Get product type
if prodtype is None:
prodtype = 'UNKNOWN'
filename = f'{flight}_{inst}_{aorid}_{spectel}_{prodtype}_{fn}.fits'
if update:
hdinsert(header, 'FILENAME', filename, comment='File name')
return filename
def update_sofia_keys(self, header):
"""
Update required SOFIA header keywords.
Keywords added or updated are:
- PROCSTAT: set to LEVEL_2
- ASSC_AOR: copied from AOR_ID
- ASSC_OBS: copied from OBS_ID
- ASSC_MSN: copied from MISSN-ID
- OBS_ID: prepended with 'P\\_'
- PIPELINE: set to pipe_name
- PIPEVERS: set to pipe_version
Parameters
----------
header : `astropy.io.fits.Header`
Header to update.
"""
# update procstat if necessary
procstat = header.get('PROCSTAT', 'UNKNOWN').upper()
if procstat in ['UNKNOWN', 'LEVEL_0', 'LEVEL_1']:
hdinsert(header, 'PROCSTAT', 'LEVEL_2',
comment='Processing status')
# get AOR ID from header if science, base otherwise
aorid = header.get('AOR_ID', 'UNKNOWN').strip().upper()
obstype = header.get('OBSTYPE', 'UNKNOWN').strip().upper()
if (obstype in ['FLAT', 'DARK', 'SKY']
and self.parameters is not None
and self.parameters.base_header is not None):
base_aorid = self.parameters.base_header.get(
'AOR_ID', 'UNKNOWN').strip().upper()
if base_aorid != 'UNKNOWN' and base_aorid != aorid:
aorid = base_aorid
header['AOR_ID'] = aorid
# copy AOR_ID, OBS_ID, and MISSN-ID to ASSC* keys
# if not yet present
assc_aor = header.get('ASSC_AOR', 'UNKNOWN').upper()
if assc_aor == 'UNKNOWN':
hdinsert(header, 'ASSC_AOR', aorid,
comment='All input AOR-IDs')
obsid = header.get('OBS_ID', 'UNKNOWN').upper()
assc_obs = header.get('ASSC_OBS', 'UNKNOWN').upper()
if assc_obs == 'UNKNOWN':
hdinsert(header, 'ASSC_OBS', obsid,
comment='All input OBS-IDs')
msnid = header.get('MISSN-ID', 'UNKNOWN').upper()
assc_msn = header.get('ASSC_MSN', 'UNKNOWN').upper()
if assc_msn == 'UNKNOWN':
hdinsert(header, 'ASSC_MSN', msnid,
comment='All input MISSN-IDs')
# update OBS_ID if necessary
if obsid != 'UNKNOWN' and not obsid.startswith('P_'):
hdinsert(header, 'OBS_ID', 'P_' + obsid)
# always update pipeline and pipevers info
hdinsert(header, 'PIPELINE', self.pipe_name,
comment='Pipeline creating this file')
hdinsert(header, 'PIPEVERS', self.pipe_version,
comment='Pipeline version')
# and the current date/time
hdinsert(header, 'DATE', time.strftime('%Y-%m-%dT%H:%M:%S'),
comment='Date of file creation')
def update_output(self, hdul, filenum, prodtype):
"""
Update output FITS file after a pipeline step.
Sets the PRODTYPE key, adds a HISTORY message with
the pipeline step name, and updates the FILENAME
key with a new filename, appropriate to the prodtype.
The new filename is returned from the function.
Parameters
----------
hdul : astropy.io.fits.HDUList
Output FITS HDUList.
filenum : str or list
File number(s) to use in the filename.
prodtype : str
Product type for the completed step.
Returns
-------
str
File base name for the output product.
"""
hdinsert(hdul[0].header, 'PRODTYPE', prodtype,
comment='Product type')
# Add a history message with the step name
step_name = self.step_map[prodtype]
step_display_name = self.processing_steps[step_name]
message = f'-- Pipeline step: {step_display_name}'
hdinsert(hdul[0].header, 'HISTORY', message)
params = self.get_parameter_set()
for param in params:
if params[param]['hidden']:
continue
message = f' {param} = {params.get_value(param)}'
hdinsert(hdul[0].header, 'HISTORY', message)
hdinsert(hdul[0].header, 'HISTORY', '--')
hdinsert(hdul[0].header, 'HISTORY', '')
outname = self.get_filename(hdul[0].header, filenum,
self.prodnames[prodtype])
return outname
def write_output(self, hdul, outname):
"""
Write an output FITS file to disk.
Outname is joined to self.output_directory, before writing.
Parameters
----------
hdul : astropy.io.fits.HDUList
FITS HDUList to write.
outname : str
File basename to write.
Returns
-------
str
Full path to the output file.
"""
outname = os.path.join(self.output_directory, outname)
with warnings.catch_warnings():
warnings.simplefilter('ignore')
hdul.writeto(outname, overwrite=True)
self.record_outfile(outname)
return outname
def _set_sky_products(self, sky=True):
"""
Set alternate products for sky spectrum extraction.
If `sky` is set, product types and names are updated for
sky products. Parameters are also updated to set defaults
appropriate for sky reductions, if they have not already
been set for sky spectra.
If `sky` is not set, default product types and names are
restored. Parameters are also updated to restore defaults,
if they were previously set for sky reductions.
Parameters
----------
sky : bool, optional
If True, sky products are set. If False, products are reset
to default types.
"""
self.sky_spectrum = sky
self.prodtype_map = self.default_prodtype_map.copy()
self.prodnames = self.default_prodnames.copy()
self.step_map = self.default_step_map.copy()
if sky:
log.info('Setting sky product names.')
self.prodtype_map.update(self.sky_prodtype_map)
self.prodnames.update(self.sky_prodnames)
self.step_map.update(self.sky_step_map)
else:
log.debug('Setting default product names.')
# update product types if possible
try:
self.prodtypes = [self.prodtype_map[step] for step in self.recipe]
except AttributeError: # pragma: no cover
self.prodtypes = list()
# update parameters if available and if the setting has
# changed from initial load
if (self.parameters is not None
and self.parameters.base_header is not None):
hdinsert(self.parameters.base_header, 'SKYSPEC', sky,
'Sky spectrum product')
if self.parameters.sky_spec_set != sky:
log.info('Updating parameters for new sky setting.')
self.update_parameters()
self.parameters.sky_spec_set = sky
def load(self, data):
"""
Load input data to make it available to reduction steps.
The process is:
- Call the parent load method to initialize data
reduction variables.
- Use the first loaded FITS header that is not a flat, dark, or sky
file to set the observation configuration for the reduction set.
- Use the product type in the base header to determine the
data processing recipe.
- Load parameters for all steps.
- Load the data immediately if it is a 1D spectrum that needs
visualization only; otherwise, just load the raw headers and
defer loading the data from the FITS files.
After this step, the input attribute is populated as required
for the first pipeline step in the recipe.
Parameters
----------
data : list of str or str
Input file paths to load.
"""
Reduction.load(self, data)
# sort raw files by filename, for reduction consistency
self.raw_files.sort(key=os.path.basename)
# read the first non-flat FITS header
# (or the last flat/dark if no science found)
basehead = fits.Header()
for infile in self.raw_files:
basehead = getheader(infile)
if (str(basehead.get('OBSTYPE', 'UNKNOWN')).strip().upper()
not in ['FLAT', 'DARK', 'SKY']):
break
# get product type to determine recipe
prodtype = str(basehead.get('PRODTYPE', 'UNKNOWN')).lower().strip()
# override for special cases (final png image only)
one_d = False
if prodtype in self.spec1d_prodtype:
one_d = True
self.recipe = ['specmap']
else:
if prodtype in self.sky_prodnames.keys():
# sky spectrum intermediate products
sky_spec = True
else:
sky_spec = False
# make sure parameters and products are set accordingly
self._set_sky_products(sky=sky_spec)
hdinsert(basehead, 'SKYSPEC', sky_spec, 'Sky spectrum product')
# self.prodtypes is set from the prodtype map
# when self.recipe is set
self.recipe = self.default_recipe
if prodtype in self.prodtypes:
pidx = self.prodtypes.index(prodtype)
recipe = self.recipe[pidx + 1:]
if len(recipe) == 0:
msg = f"No steps to run for prodtype '{prodtype}'."
log.error(msg)
raise ValueError(msg)
# if the first step isn't load_data, add it in so that
# header overrides are always allowed
if recipe[0] != 'load_data':
recipe = ['load_data'] + recipe
self.recipe = recipe
elif prodtype.upper() != 'UNKNOWN':
msg = f"Unrecognized prodtype '{prodtype}'."
log.error(msg)
raise ValueError(msg)
self.parameters = EXESParameters(base_header=basehead)
if not one_d:
# just load headers, since load data is always called first
self.input = list()
for datafile in self.raw_files:
self.input.append(fits.getheader(datafile))
else:
# load fits directly, no header updates
self.load_fits()
# log all planned processing steps
log.info('Processing steps: {}'.format(self.recipe))
# pass data to QAD for viewing
self.set_display_data(raw=True)
def load_fits(self):
"""Load FITS data into the input attribute."""
self.input = list()
self.filenum = list()
for i, datafile in enumerate(self.raw_files):
input_hdul = fits.open(datafile, lazy_load_hdus=False,
memmap=False)
# touch the data to make sure it is in memory
# and store copies in a new HDUList
# (this avoids storing a file handle in the HDUList)
hdul = fits.HDUList()
for hdu in input_hdul:
assert hdu.data[0] is not None
hdul.append(hdu.copy())
input_hdul.close()
# keep file numbers
self.filenum.append(self.get_filenum(datafile))
self.input.append(hdul)
# display methods
def register_viewers(self):
"""Return a new QADViewer, ProfileViewer, and SpectralViewer."""
prof = MatplotlibViewer()
prof.name = 'ProfileViewer'
prof.title = 'Spatial Profiles'
prof.layout = 'rows'
prof.max_plot = 99
spec = EyeViewer()
spec.name = 'SpectralViewer'
spec.title = 'Spectra'
spec.layout = 'rows'
viewers = [QADViewer(), prof, spec]
return viewers
def _get_profiles(self, hdul):
"""
Assemble profile data for display in a plot viewer.
Parameters
----------
hdul : astropy.io.fits.HDUList
The data containing profile information.
Returns
-------
display_data : list of dict
Display data formatted for a `MatplotlibViewer`.
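Notes
-----
Each entry in the returned list is a dict with 'args', 'kwargs',
and 'plot_kwargs' items, plus an optional 'overplot' list marking
aperture positions, radii, and background regions.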
"""
disp_plot = []
header = hdul[0].header
title = header.get('FILENAME', '')
orders = [int(x) for x in header.get('ORDERS', '1').split(',')]
for order in orders:
ordnum = f'{order:02d}'
suffix = f'_ORDER_{ordnum}'
if f'SPATIAL_PROFILE{suffix}' in hdul:
slitpos = hdul[f'SLITPOS{suffix}'].data
profile = hdul[f'SPATIAL_PROFILE{suffix}'].data
yunit = hdul[f'SLITPOS{suffix}'].header.get('BUNIT', 'arcsec')
disp = {'args': [slitpos, profile],
'kwargs': {'title': f'{title} \nOrder {ordnum}',
'titlesize': 'medium',
'xlabel': f'Slit position ({yunit})',
'ylabel': 'Normalized median flux'},
'plot_kwargs': {'color': 'gray',
'drawstyle': 'steps-mid'}}
overplots = []
if f'APPOSO{ordnum}' in header:
aps = parse_apertures(
header[f'APPOSO{ordnum}'], 1)[0]
if f'APRADO{ordnum}' in header:
rads = parse_apertures(
header[f'APRADO{ordnum}'], 1)[0]
else:
rads = [None] * len(aps)
if f'PSFRAD{ordnum}' in header:
psfs = parse_apertures(
header[f'PSFRAD{ordnum}'], 1)[0]
else:
psfs = [None] * len(aps)
for a, r, p in zip(aps, rads, psfs):
overplots.append(
{'plot_type': 'vline',
'args': [a],
'kwargs': {'color': '#17becf',
'linestyle': '--'}})
if r is not None:
overplots.append(
{'plot_type': 'vline',
'args': [a - r],
'kwargs': {'color': '#2ca02c',
'linestyle': ':'}})
overplots.append(
{'plot_type': 'vline',
'args': [a + r],
'kwargs': {'color': '#2ca02c',
'linestyle': ':'}})
if p is not None:
overplots.append(
{'plot_type': 'vline',
'args': [a - p],
'kwargs': {'color': '#1f77b4',
'linestyle': '-.'}})
overplots.append(
{'plot_type': 'vline',
'args': [a + p],
'kwargs': {'color': '#1f77b4',
'linestyle': '-.'}})
if f'BGR_O{ordnum}' in header:
bgrs = parse_bg(header[f'BGR_O{ordnum}'], 1)[0]
for reg in bgrs:
if len(reg) == 2:
idx = (slitpos >= reg[0]) & (slitpos <= reg[1])
overplots.append(
{'args': [slitpos[idx],
profile[idx]],
'kwargs': {'color': '#d62728'}})
if overplots:
disp['overplot'] = overplots
disp_plot.append(disp)
return disp_plot
@staticmethod
def _is_spectral(hdul):
"""
Determine if data is displayable by the spectral viewer.
Currently only subsets of products are displayable: 1D
spectra and the orders_merged product.
Parameters
----------
hdul : astropy.io.fits.HDUList
The FITS structure containing spectral data.
Returns
-------
bool
True if input is displayable spectral data; False otherwise.
"""
# todo: update when Eye handles EXES intermediate data
header = hdul[0].header
ptype = str(header.get('PRODTYPE', 'UNKNOWN')).strip().lower()
allowed = ['orders_merged', 'sky_orders_merged',
'spectra_1d', 'combined_spectrum_1d',
'wavecal_refined_1d', 'orders_merged_1d',
'sky_spectra_1d', 'sky_combined_spectrum_1d',
'sky_wavecal_refined_1d', 'sky_orders_merged_1d']
return ptype in allowed
@staticmethod
def _is_1d(hdul):
"""
Determine if data contains a 1D spectrum.
The test is for a specific set of product types, produced
by this pipeline.
Parameters
----------
hdul : astropy.io.fits.HDUList
The FITS structure containing spectral data.
Returns
-------
bool
True if input is 1D spectral data; False otherwise.
"""
# todo: update when Eye handles EXES intermediate data
header = hdul[0].header
ptype = str(header.get('PRODTYPE', 'UNKNOWN')).strip().lower()
one_d = ['spectra_1d', 'combined_spectrum_1d', 'wavecal_refined_1d',
'orders_merged_1d',
'sky_spectra_1d', 'sky_combined_spectrum_1d',
'sky_wavecal_refined_1d', 'sky_orders_merged_1d']
return ptype in one_d
def set_display_data(self, raw=False, filenames=None):
"""
Store display data for viewer.
Parameters
----------
raw : bool
If True, display data is taken from self.raw_files.
If False, display data is taken from self.input.
filenames : list of str, optional
If provided and `raw` is False, file names will be
passed to the viewer instead of self.input.
"""
self.display_data = dict()
if raw:
data_list = self.raw_files
elif filenames is not None:
data_list = filenames
else:
data_list = self.input
# set profile plot and spectral data if necessary
if not raw:
disp_qad = []
disp_plot = []
disp_spec = []
for hdul in data_list:
# get data from disk if necessary
with set_log_level('CRITICAL'):
test_hdul = gethdul(hdul)
if test_hdul is None:
continue
disp_plot.extend(self._get_profiles(test_hdul))
if self._is_spectral(test_hdul):
disp_spec.append(hdul)
if not self._is_1d(test_hdul):
disp_qad.append(hdul)
self.display_data['ProfileViewer'] = disp_plot
if len(disp_spec) > 0:
self.display_data['SpectralViewer'] = disp_spec
if len(disp_qad) > 0:
self.display_data['QADViewer'] = disp_qad
else:
self.display_data['QADViewer'] = data_list
# exes module pipeline steps and helpers
@staticmethod
def _override_header_values(header, params):
"""
Override header values for distortion parameters with user input.
Input parameters may contain values for the following keys:
- cent_wave
- hrfl
- xdfl
- slit_rot
- det_rot
- hrr
- flattamb
- flatemis
Parameters
----------
header : astropy.io.fits.Header
FITS header to update.
params : ParameterSet
Parameter set containing override values.
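Notes
-----
Parameter values that cannot be parsed as floats, or sentinel
values of -9999, leave the corresponding header keyword unchanged;
for cent_wave, only values greater than zero are applied.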
"""
try:
wavenum = float(params.get_value('cent_wave'))
except (TypeError, ValueError):
pass
else:
if wavenum > 0:
log.info(f'Overriding central wavenumber with {wavenum}')
header['WNO0'] = wavenum
try:
hrfl = float(params.get_value('hrfl'))
except (TypeError, ValueError):
pass
else:
if hrfl != -9999:
log.info(f'Overriding HR focal length with {hrfl}')
header['HRFL0'] = hrfl
try:
xdfl = float(params.get_value('xdfl'))
except (TypeError, ValueError):
pass
else:
if xdfl != -9999:
log.info(f'Overriding XD focal length with {xdfl}')
header['XDFL0'] = xdfl
try:
slit_rot = float(params.get_value('slit_rot'))
except (TypeError, ValueError):
pass
else:
if slit_rot != -9999:
log.info(f'Overriding slit rotation angle with {slit_rot}')
header['SLITROT'] = slit_rot
try:
det_rot = float(params.get_value('det_rot'))
except (TypeError, ValueError):
pass
else:
if det_rot != -9999:
log.info(f'Overriding detector rotation angle with {det_rot}')
header['DETROT'] = det_rot
try:
hrr = float(params.get_value('hrr'))
except (TypeError, ValueError):
pass
else:
if hrr != -9999:
log.info(f'Overriding echelon R number with {hrr}')
header['HRR'] = hrr
try:
flattamb = float(params.get_value('flattamb'))
except (TypeError, ValueError):
pass
else:
if flattamb != -9999:
log.info(f'Overriding flat mirror ambient temperature '
f'with {flattamb}')
header['FLATTAMB'] = flattamb
try:
flatemis = float(params.get_value('flatemis'))
except (TypeError, ValueError):
pass
else:
if flatemis != -9999:
log.info(f'Overriding flat mirror emissivity with {flatemis}')
header['FLATEMIS'] = flatemis
def load_data(self):
"""
Load FITS data into memory and standardize headers.
Calls `sofia_redux.instruments.exes.readhdr` to standardize
headers and load expected default values.
If the sky_spec parameter is set, product types and names
are updated for sky spectrum extraction.
"""
from sofia_redux.instruments.exes.readhdr import readhdr
param = self.get_parameter_set()
abort = param.get_value('abort')
sky_spec = param.get_value('sky_spec')
# validate headers and update with defaults
all_valid = True
updated = list()
for header in self.input:
log.info(f"Input: {header['FILENAME']}")
self._override_header_values(header, param)
new_header, valid = readhdr(header, check_header=True)
all_valid &= valid
updated.append(new_header)
log.info('')
if not all_valid and abort:
msg = 'Invalid headers.'
log.error(msg)
self.error = msg
self.input = list()
return
# if all passed validation, read fits files
# into self.input for processing
self.load_fits()
# replace header values with updated values
for i, hdul in enumerate(self.input):
hdul[0].header = updated[i]
# always update sofia keys last
self.update_sofia_keys(hdul[0].header)
# update product types and parameters
# for sky spectrum defaults
self._set_sky_products(sky=sky_spec)
# show loaded data with updated headers
self.set_display_data()
def _check_raw_rasterized(self):
"""
Check raw data to determine if it is a rasterized flat.
The output dictionary has items:
- 'flag' : array-like of int
One entry per input file, where 1 indicates a
raster flat, -1 indicates a raster dark, and 0
indicates neither.
- 'dark' : array-like of float
Raw dark data cube, read from the first raster dark
encountered. None if not found.
- 'dark_header' : astropy.io.fits.Header
FITS header for the first raster dark encountered.
None if not found.
Returns
-------
raster : dict
Raster flat settings for input data.
"""
raster_flat = np.zeros(len(self.raw_files), dtype=int)
raster_dark = None
raster_header = None
for i, hdul in enumerate(self.input):
header = hdul[0].header
calsrc = str(header.get('CALSRC', 'UNKNOWN')).lower().strip()
instmode = str(header['INSTMODE']).lower().strip()
obstype = str(header['OBSTYPE']).lower().strip()
naxis = int(header['NAXIS2'])
if (calsrc == 'blackbody' and instmode == 'stare'
and obstype == 'sky' and naxis < 1024):
# flag flat to derasterize
raster_flat[i] = 1
elif obstype == 'dark' and naxis < 1024:
# keep the first raster dark
if raster_dark is None:
raster_dark = hdul[0].data
raster_header = header
raster_flat[i] = -1
raster = {'flag': raster_flat, 'dark': raster_dark,
'dark_header': raster_header}
return raster
def coadd_readouts(self):
"""
Coadd raw readouts.
Calls `sofia_redux.instruments.exes.derasterize` for raster
flats and darks and `sofia_redux.instruments.exes.readraw`
for all other data.
Optionally, if the 'fix_row_gains' parameter is set,
`exes.correct_row_gains` is called after readouts are coadded.
"""
from sofia_redux.instruments.exes.readraw import readraw
from sofia_redux.instruments.exes.derasterize import derasterize
from sofia_redux.instruments.exes.correct_row_gains \
import correct_row_gains
param = self.get_parameter_set()
save = param.get_value('save')
lin_corr = param.get_value('lin_corr')
fix_row_gains = param.get_value('fix_row_gains')
toss_int_sci = param.get_value('toss_int_sci')
toss_int_flat = param.get_value('toss_int_flat')
toss_int_dark = param.get_value('toss_int_dark')
copy_int = param.get_value('copy_integrations')
algorithm = str(param.get_value('algorithm')).lower().strip()
overlap = param.get_value('overlap')
fit_buffer = param.get_value('fit_buffer')
# parse algorithm from:
# options = ['Default for read mode',
# 'Last destructive only',
# 'First / last frame only',
# 'Second / penultimate frame only']
if algorithm.startswith('last'):
algorithm = 0
elif algorithm.startswith('first'):
algorithm = 2
elif algorithm.startswith('second'):
# beware: this option will not work with all
# readout patterns
algorithm = 3
else:
algorithm = None
raster = self._check_raw_rasterized()
results = list()
filenum = list()
for i, hdul in enumerate(self.input):
data = hdul[0].data
header = hdul[0].header
log.info(f"Input: {header['FILENAME']}")
# drop any raster darks --
# they will be handled with raster flats
if raster['flag'][i] == -1:
log.warning('Skipping direct handling for raster dark.')
log.info('')
continue
filenum.append(self.filenum[i])
# pass through any processed flats
if str(header.get('PRODTYPE',
'UNKNOWN')).upper().strip() == 'FLAT':
log.info('Passing processed flat, unmodified.')
log.info('')
results.append(hdul)
continue
# otherwise, assemble raw data
if raster['flag'][i] == 1:
# derasterize flat
log.info('Processing raster flat.')
data, variance, lin_mask = derasterize(
data, header, dark_data=raster['dark'],
dark_header=raster['dark_header'], overlap=overlap,
fit_buffer=fit_buffer)
log.info('')
else:
# get toss parameter by obstype
obstype = str(header['OBSTYPE']).upper()
if obstype == 'FLAT':
toss_nint = toss_int_flat
elif obstype == 'DARK':
toss_nint = toss_int_dark
else:
toss_nint = toss_int_sci
data, variance, lin_mask = readraw(
data, header, do_lincor=lin_corr, algorithm=algorithm,
toss_nint=toss_nint, copy_int=copy_int)
# if desired, fix odd/even row gains
if fix_row_gains:
data = correct_row_gains(data)
new_hdul = fits.HDUList()
new_hdul.append(fits.ImageHDU(data=data, header=header,
name='FLUX'))
# add BUNIT to extension headers
exthead = fits.Header()
hdinsert(exthead, 'BUNIT', 'ct', 'Data units')
new_hdul.append(fits.ImageHDU(data=np.sqrt(variance),
header=exthead,
name='ERROR'))
exthead['BUNIT'] = ''
new_hdul.append(fits.ImageHDU(data=(~lin_mask).astype(int),
header=exthead,
name='MASK'))
outname = self.update_output(new_hdul, self.filenum[i],
self.prodtypes[self.step_index])
if save:
self.write_output(new_hdul, outname)
results.append(new_hdul)
log.info('')
self.input = results
self.filenum = filenum
self.set_display_data()
@staticmethod
def _combine_flats(flats):
"""
Combine separate input flats.
Performs a mean on all input flat data, frame by frame,
weighted by the associated errors. Input flat data shapes
should match.
Parameters
----------
flats : list of astropy.io.fits.HDUList
List of input flat files. Extensions FLUX, ERROR, and MASK
are expected.
Returns
-------
combined_flat : astropy.io.fits.HDUList
Output file containing combined flat data.
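Notes
-----
Each frame is combined with `combine_images` using an error-weighted
mean. Assuming inverse-variance weights, this is approximately::

    mean = sum(flux_i / var_i) / sum(1 / var_i)
    var = 1 / sum(1 / var_i)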
"""
# start with the first input file
header = flats[0][0].header
prodtype = str(header.get('PRODTYPE', 'UNKNOWN')).upper().strip()
if prodtype == 'FLAT':
log.warning(f"Previously processed flats: using the first "
f"one only ({header.get('FILENAME', 'UNKNOWN')}).")
return flats[0]
# more than one raw flat: average each card
log.info('Mean-combining input flat frames')
new_hdul = copy.deepcopy(flats[0])
all_cards = []
all_var = []
all_mask = []
all_hdr = []
test_shape = None
for hdul in flats:
if test_shape is None:
test_shape = hdul['FLUX'].data.shape
elif hdul['FLUX'].data.shape != test_shape:
msg = 'Flat files do not match and cannot be combined.'
raise ValueError(msg)
all_cards.append(hdul['FLUX'].data)
all_var.append(hdul['ERROR'].data ** 2)
all_mask.append(hdul['MASK'].data)
all_hdr.append(hdul[0].header)
# mean combine each card separately
# (not robust because likely to be 2 or 3 input files at most)
all_cards = np.array(all_cards)
all_var = np.array(all_var)
for n in range(test_shape[0]):
mean, var = combine_images(all_cards[:, n, :, :],
all_var[:, n, :, :], method='mean',
weighted=True, robust=False,
returned=True)
new_hdul['FLUX'].data[n] = mean
new_hdul['ERROR'].data[n] = np.sqrt(var)
# directly 'or' the 2D mask
new_hdul['MASK'].data = np.any(all_mask, axis=0).astype(int)
# merge all headers
new_hdul[0].header = mergehdr(all_hdr)
return new_hdul
def _write_flat(self, flat, error, illum, filenum,
dark=None, save=True):
"""
Write a processed flat file.
The output file has PRODTYPE = flat, file code FLT, and extensions
FLAT, FLAT_ERROR, DARK (optional), TORT_FLAT, TORT_FLAT_ERROR,
ILLUMINATION, WAVECAL, SPATCAL, and ORDER_MASK.
The TORT_FLAT and TORT_FLAT_ERROR extensions are generated with
`exes.tort`, by undistorting the data and error in the FLAT
and FLAT_ERROR extensions. The WAVECAL, SPATCAL, and ORDER_MASK
extensions are generated with `exes.wavecal` from parameters in
the FLAT extension header.
If saved to disk, this file can be used in future data reductions
in place of raw flat and/or dark files.
Parameters
----------
flat : astropy.io.fits.ImageHDU
HDU with processed flat data and updated tort parameters
in header.
error : astropy.io.fits.ImageHDU
HDU containing error data associated with the flat array.
illum : numpy.ndarray
Illumination array for the undistorted flat.
filenum : str or list of str
Input file number(s) to use in the output flat name.
dark : astropy.io.fits.ImageHDU, optional
If provided, will be appended to the output flat file.
save : bool, optional
If set, the output file is written to disk.
Returns
-------
processed_flat : astropy.io.fits.HDUList
HDU list containing processed flat extensions.
"""
from sofia_redux.instruments.exes.wavecal import wavecal
from sofia_redux.instruments.exes.tort import tort
header = flat.header.copy()
hdul = fits.HDUList()
hdul.append(fits.PrimaryHDU(flat.data, header))
hdul.append(error)
# also append torted flat info
wave_header = header.copy()
wavemap = wavecal(wave_header)
rotation = wave_header.get('ROTATION', 0)
# remove old unit keywords if necessary:
for key in ['BUNIT1', 'BUNIT2', 'BUNIT3']:
if key in wave_header:
del wave_header[key]
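# propagate variance for the inverted flat: var(1/f) = var(f) / f**4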
iflat = 1 / flat.data
iflatvar = error.data ** 2 * iflat ** 4
tort_flat, tort_flat_var = tort(iflat, header, iflatvar, skew=True)
if dark is not None:
hdul.append(dark)
hdul.append(fits.ImageHDU(rotate90(tort_flat, rotation),
wave_header, name='TORT_FLAT'))
hdul.append(fits.ImageHDU(rotate90(np.sqrt(tort_flat_var), rotation),
wave_header, name='TORT_FLAT_ERROR'))
wave_header['BUNIT'] = ''
hdul.append(fits.ImageHDU(rotate90(illum, rotation),
wave_header, name='ILLUMINATION'))
wave_header['BUNIT'] = 'cm-1'
hdul.append(fits.ImageHDU(rotate90(wavemap[0], rotation),
wave_header, name='WAVECAL'))
wave_header['BUNIT'] = 'arcsec'
hdul.append(fits.ImageHDU(rotate90(wavemap[1], rotation),
wave_header, name='SPATCAL'))
wave_header['BUNIT'] = ''
hdul.append(fits.ImageHDU(rotate90(wavemap[2], rotation).astype(int),
wave_header, name='ORDER_MASK'))
outname = self.update_output(hdul, filenum,
self.prodtypes[self.step_index])
outname = outname.replace('FTA', 'FLT')
hdul[0].header['PRODTYPE'] = 'flat'
hdul[0].header['FILENAME'] = os.path.basename(outname)
# write to disk if desired
if save:
self.write_output(hdul, outname)
return hdul
def make_flat(self):
"""
Make a processed flat file.
If the input data is already a processed flat, it is passed
through without modification. Otherwise, `exes.makeflat` is
called to process the flat file.
If multiple unprocessed flat files are provided, they are
mean-combined before processing. If multiple processed flat
files are provided, only the first one is used.
The output data from this step includes only the science files
with new flat extensions attached. Any flat or dark files are
dropped from further processing.
"""
from sofia_redux.instruments.exes import makeflat as mf
param = self.get_parameter_set()
krot = param.get_value('start_rot')
spacing = param.get_value('predict_spacing')
threshold_factor = param.get_value('threshold')
method = str(param.get_value('edge_method')).lower().strip()
if method.startswith('sobel'):
edge_method = 'sobel'
elif method.startswith('square'):
edge_method = 'sqderiv'
else:
edge_method = 'deriv'
custom_wavemap = str(param.get_value('custom_wavemap')).strip()
opt_rot = param.get_value('opt_rot')
fix_tort = (not opt_rot)
# Determine number of flats loaded in input
flats = list()
flat_filenums = list()
darks = list()
dark_filenums = list()
sources = list()
source_filenums = list()
for i, hdul in enumerate(self.input):
header = hdul[0].header
obstype = str(header['OBSTYPE']).upper()
if obstype == 'FLAT':
flats.append(hdul)
flat_filenums.append(self.filenum[i])
elif obstype == 'DARK':
darks.append(hdul)
dark_filenums.append(self.filenum[i])
else:
sources.append(hdul)
source_filenums.append(self.filenum[i])
if len(flats) == 0:
log.info('No flat file loaded.')
return
elif len(flats) > 1:
# combine flats
flat_hdul = self._combine_flats(flats)
else:
# use the first flat provided
flat_hdul = flats[0]
flat_header = flat_hdul[0].header
flat_prodtype = str(flat_header.get('PRODTYPE',
'UNKNOWN')).upper().strip()
if flat_prodtype == 'FLAT':
log.info('Processed flat provided, using as is.')
flat_data = None
flat_var = None
else:
flat_data = flat_hdul['FLUX'].data
flat_var = flat_hdul['ERROR'].data ** 2
if len(darks) == 0:
dark_header = None
dark_data = None
dark_hdu = None
else:
# Only use the first dark provided
dark_hdul = darks[0]
dark_header = dark_hdul[0].header
dark_data = np.squeeze(dark_hdul['FLUX'].data)
dark_hdu = fits.ImageHDU(dark_data, dark_header, name='DARK')
dark_file = dark_header.get('FILENAME', 'UNKNOWN')
if len(darks) > 1:
log.warning(f"More than 1 dark loaded; using the first one "
f"({dark_file}).")
else:
log.info(f'Using slit dark {dark_file}')
if flat_data is not None:
if str(krot).strip() != '':
flat_header['KROT'] = krot
if str(spacing).strip() != '':
flat_header['SPACING'] = spacing
if str(threshold_factor).strip() != '':
flat_header['THRFAC'] = threshold_factor
central_waveno = parse_central_wavenumber(flat_header)
log.info('\nUsing tort parameters: ')
log.info(f'Central wavenumber: {central_waveno}')
labels = {'HR focal length': 'hrfl', 'XD focal length': 'xdfl',
'Slit rotation': 'slitrot', 'Det. rotation': 'detrot',
'HRR': 'hrr', 'Flat mirror T_amb': 'flattamb',
'Flat mirror emissivity': 'flatemis'}
for label, field in labels.items():
log.info(f'{label:>22s}: {flat_header[field.upper()]}')
log.info('')
kwargs = {'dark': dark_data,
'start_pixel': param.get_value('start_pix'),
'end_pixel': param.get_value('end_pix'),
'top_pixel': param.get_value('top_pix'),
'bottom_pixel': param.get_value('bottom_pix'),
'fix_tort': fix_tort,
'edge_method': edge_method,
'custom_wavemap': custom_wavemap}
for key, value in kwargs.items():
if str(value).strip() == '':
kwargs[key] = None
try:
flat_param = mf.makeflat(flat_data, flat_header, flat_var,
**kwargs)
except ValueError as err:
log.error('Error in makeflat:')
log.error(str(err))
raise ValueError('Error in makeflat') from None
log.info('')
# get updated flat header from returned params
flat_header = flat_param['header']
# store dark if used
if dark_data is not None:
hdinsert(flat_header, 'SLITDRKF',
dark_header.get('FILENAME', 'UNKNOWN'),
'Slit dark file')
hdinsert(flat_header, 'SLITDRKR',
dark_header.get('RAWFNAME', 'UNKNOWN'),
'Slit dark raw file name')
flat = fits.ImageHDU(flat_param['flat'], flat_header, name='FLAT')
flat_error = fits.ImageHDU(np.sqrt(flat_param['flat_variance']),
flat_header, name='FLAT_ERROR')
illum_header = flat_header.copy()
illum_header['BUNIT'] = ''
flat_mask = fits.ImageHDU(flat_param['illum'],
illum_header, name='FLAT_ILLUMINATION')
# write out untorted flat and wavecal map
uflat = self._write_flat(flat, flat_error, flat_param['illum'],
flat_filenums, dark=dark_hdu,
save=param.get_value('save_flat'))
else:
# pre-processed flat
uflat = flat_hdul
flat_header = uflat[0].header
flat = fits.ImageHDU(uflat['FLAT'].data, flat_header, name='FLAT')
flat_error = fits.ImageHDU(uflat['FLAT_ERROR'].data,
flat_header, name='FLAT_ERROR')
rotation = flat_header.get('ROTATION', 0)
illum_header = flat_header.copy()
illum_header['BUNIT'] = ''
flat_mask = fits.ImageHDU(
unrotate90(uflat['ILLUMINATION'].data, rotation),
illum_header, name='FLAT_ILLUMINATION')
# allow copying of dark from flat if present
# and not directly provided
if dark_hdu is None and 'DARK' in uflat:
dark_hdu = uflat['DARK']
# drop flats from input
self.input = sources
self.filenum = source_filenums
# append flat info to sources and save if desired
copy_keys = [
# tort and wavecal values
'DETROT', 'HRFL', 'HRFL0',
'HRR', 'KROT', 'XDFL', 'XDFL0', 'SLITROT',
'WNO0', 'WAVENO0',
# flat calibration values
'BB_TEMP', 'FLATTAMB', 'FLATEMIS', 'BNU_T',
# derived spectral values
'SPACING', 'NT', 'NORDERS',
'ORDERS', 'XORDER1', 'NBELOW',
'ORDR_B', 'ORDR_T', 'ORDR_S', 'ORDR_E',
'SLTH_ARC', 'SLTH_PIX',
'SLTW_ARC', 'SLTW_PIX', 'SLITWID',
'ROTATION', 'RP',
# dark file names
'SLITDRKF', 'SLITDRKR']
for i, hdul in enumerate(self.input):
hdul.append(copy.deepcopy(flat))
hdul.append(copy.deepcopy(flat_error))
hdul.append(copy.deepcopy(flat_mask))
if dark_hdu is not None:
hdul.append(copy.deepcopy(dark_hdu))
# update or add important keys from the flat values
for key in copy_keys:
if key in flat_header:
hdinsert(hdul[0].header, key, flat_header[key],
flat_header.comments[key])
# also add the name of the source flat
hdinsert(hdul[0].header, 'FLTFILE',
uflat[0].header.get('FILENAME', 'UNKNOWN'),
'Processed flat file')
hdinsert(hdul[0].header, 'FLTRFILE',
uflat[0].header.get('RAWFNAME', 'UNKNOWN'),
'Raw flat file name')
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
if param.get_value('save'):
self.write_output(hdul, outname)
# set only the flat in the display data
self.set_display_data(filenames=[uflat])
@staticmethod
def _get_beams(header, flag, n_frames):
"""
Get the frame indices for the specified beam.
For INSTMODE = NOD_ON_SLIT or NOD_OFF_SLIT, A and B beams
alternate. If NODBEAM = A, A beams are every other frame,
starting at 0. If NODBEAM = B, A beams start at 1.
For INSTMODE = MAP, B beams are the last three frames; A
beams are all others.
For any other INSTMODE, all frames are assumed to be A frames.
Parameters
----------
header : astropy.io.fits.Header
FITS header for the data, containing INSTMODE and NODBEAM.
flag : {'A', 'B'}
The beam to retrieve. A is on-source or position 1; B
is off-source or position 2.
n_frames : int
The number of frames in the input data.
Returns
-------
beams : array-like of int
Indices of frames in the specified beam. Size may range
from 0 to n_frames.
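Examples
--------
Illustrative beam selection for a hypothetical six-frame nodded
observation starting in the A beam:

>>> from astropy.io import fits
>>> hdr = fits.Header()
>>> hdr['INSTMODE'] = 'NOD_ON_SLIT'
>>> hdr['NODBEAM'] = 'A'
>>> EXESReduction._get_beams(hdr, 'A', 6)
array([0, 2, 4])
>>> EXESReduction._get_beams(hdr, 'B', 6)
array([1, 3, 5])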
"""
if n_frames == 0:
return np.empty(0)
obs_mode = header['INSTMODE'].upper()
nod_beam = header['NODBEAM'].upper()
if obs_mode in ['NOD_OFF_SLIT', 'NOD_ON_SLIT']:
if n_frames < 2:
if flag == 'A':
beams = np.array([0])
else:
beams = np.array([])
else:
if flag == 'A':
if nod_beam == 'B':
beams = np.arange(n_frames // 2) * 2 + 1
else:
beams = np.arange(n_frames // 2) * 2
else:
if nod_beam == 'B':
beams = np.arange(n_frames // 2) * 2
else:
beams = np.arange(n_frames // 2) * 2 + 1
elif obs_mode == 'MAP':
if n_frames < 5:
if flag == 'A':
beams = np.arange(n_frames)
else:
beams = np.empty(0)
else:
if flag == 'A':
beams = np.arange(n_frames - 3)
else:
beams = np.arange(n_frames - 3, n_frames)
else:
if flag == 'A':
beams = np.arange(n_frames)
else:
beams = np.empty(0)
return beams
def despike(self):
"""
Flag temporal outliers (spikes).
Calls `sofia_redux.instruments.exes.despike` on each input file.
Typically, A beams within a file are compared separately from B beams,
but if desired, the beam designation can be ignored and all
frames compared together.
It is also possible to combine all input files before running
despike. In this case, all input will be treated as a single
file in all subsequent steps.
"""
from sofia_redux.instruments.exes import despike as ed
log.info('Running despike')
param = self.get_parameter_set()
save = param.get_value('save')
mark_trashed = param.get_value('mark_trash')
spike_factor = param.get_value('spike_fac')
combine_all = param.get_value('combine_all')
ignore_beams = param.get_value('ignore_beams')
propagate_nan = param.get_value('propagate_nan')
# check for missing input, following flat step
if len(self.input) == 0:
err = 'No source files loaded'
log.error(err)
self.error = err
return
results = list()
if spike_factor == 0:
log.info('Spike factor is 0; despike not applied')
return
if combine_all:
self._combine_nod_pairs(mask2d=True)
# Despike each image
for i, hdul in enumerate(self.input):
data = hdul[0].data
var = hdul['ERROR'].data ** 2
mask = hdul['MASK'].data
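# the MASK extension marks bad pixels with 1; invert to pass
# good-pixel flags to the despike algorithm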
good_data = ~(mask.astype(bool))
header = hdul[0].header
log.info('')
log.info(f"Input: {header['FILENAME']}")
if data.ndim > 2:
n_frames = data.shape[0]
else:
n_frames = 1
if ignore_beams:
a_beams = np.arange(n_frames)
b_beams = np.empty(0)
else:
a_beams = self._get_beams(header, 'A', n_frames)
b_beams = self._get_beams(header, 'B', n_frames)
hdinsert(header, 'SPIKEFAC', spike_factor,
comment='Spike factor')
if not mark_trashed:
hdinsert(header, 'TRASH', 0.0)
try:
output = ed.despike(data, header, var,
a_beams, b_beams,
good_data,
propagate_nan=propagate_nan)
spike_data, spike_mask, good_frames = output
except (RuntimeError, ValueError) as err:
log.error('Error in despike algorithm:')
log.error(str(err))
raise ValueError('Error in despike') from None
hdul['FLUX'].data = spike_data
hdul['ERROR'].data = np.sqrt(var)
# mark trashed frames if desired
if mark_trashed:
if n_frames == 1:
spike_mask[:] = False
else:
for j in range(n_frames):
if j not in good_frames:
spike_mask[j][:] = False
if propagate_nan:
# mark spikes as bad
hdul['MASK'].data = (~spike_mask).astype(int)
hdul['FLUX'].data[~spike_mask] = np.nan
hdul['ERROR'].data[~spike_mask] = np.nan
else:
# just modify mask to match data frames
hdul['MASK'].data = np.zeros(hdul['FLUX'].data.shape,
dtype=int)
hdul['MASK'].data[:] = mask
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
if save:
self.write_output(hdul, outname)
# check for completely trashed file and drop from reduction
if np.all(hdul['MASK'].data):
log.info('')
log.warning(f"All data trashed for file "
f"{hdul[0].header['FILENAME']}; "
f"dropping from reduction.")
else:
results.append(hdul)
self.input = results
self.set_display_data()
def debounce(self):
"""
Correct for optical shifts (bounces).
Calls `sofia_redux.instruments.exes.debounce`.
"""
import sofia_redux.instruments.exes.debounce as ed
param = self.get_parameter_set()
save = param.get_value('save')
bounce_factor = param.get_value('bounce_fac')
spectral_bounce = param.get_value('spec_direction')
if bounce_factor == 0:
log.info('Bounce factor is 0; debounce not applied.')
return
results = list()
for i, hdul in enumerate(self.input):
data = hdul['FLUX'].data
variance = hdul['ERROR'].data ** 2
header = hdul[0].header
mask = ~(hdul['MASK'].data.astype(bool))
flat = hdul['FLAT'].data
illum = hdul['FLAT_ILLUMINATION'].data
# store the bounce factor in the header:
# the debounce algorithm will pick it up from there
header['BOUNCE'] = bounce_factor
# get beams
if data.ndim > 2:
n_frames = data.shape[0]
else:
n_frames = 1
a_beams = self._get_beams(header, 'A', n_frames)
b_beams = self._get_beams(header, 'B', n_frames)
# debounce
data = ed.debounce(data, header, a_beams, b_beams, flat,
mask, illum, variance, spectral=spectral_bounce)
# store data
hdul['FLUX'].data = data
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
if save:
self.write_output(hdul, outname)
results.append(hdul)
self.input = results
self.set_display_data()
def subtract_nods(self):
"""
Subtract B nods from A nods for background correction.
Calls `sofia_redux.instruments.exes.diff_arr` to do the array
subtraction.
If desired, for INSTMODE = NOD_OFF_SLIT, `exes.cirrus`
may additionally be called to correct for residual background.
"""
from sofia_redux.instruments.exes import cirrus as ec
from sofia_redux.instruments.exes import diff_arr as ed
param = self.get_parameter_set()
save = param.get_value('save')
skip_nod = param.get_value('skip_nod')
subtract_bg = param.get_value('subtract_sky')
subtract_dark = param.get_value('subtract_dark')
# Subtract nods for each image
results = list()
for i, hdul in enumerate(self.input):
header = hdul[0].header
data = hdul['FLUX'].data
variance = hdul['ERROR'].data ** 2
flat = hdul['FLAT'].data
mask = hdul['MASK'].data
if 'DARK' in hdul:
dark = hdul['DARK'].data
else:
dark = None
if data.ndim > 2:
n_frames = data.shape[0]
else:
n_frames = 1
a_beams = self._get_beams(header, 'A', n_frames)
b_beams = self._get_beams(header, 'B', n_frames)
no_dark = (subtract_dark and dark is None)
if no_dark:
log.warning('No dark frame available, skipping '
'nod subtraction instead.')
if skip_nod or no_dark:
if i == 0:
log.info('Skipping sky subtraction')
elif len(b_beams) == 0 and not subtract_dark:
log.info(f'No B beams identified. Not subtracting nods '
f'for {os.path.basename(header["FILENAME"])}')
else:
if subtract_bg:
if header['INSTMODE'] == 'NOD_OFF_SLIT':
log.info('Subtracting continuum signal')
data = ec.cirrus(data, header, a_beams, b_beams, flat)
if subtract_dark and dark is not None:
black_dark = True
if len(b_beams) == 0:
b_beams = a_beams.copy()
else:
black_dark = False
data, variance, mask = ed.diff_arr(
data, header, a_beams, b_beams,
variance, mask=mask,
dark=dark, black_dark=black_dark)
# Get output filenames
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
# remove first empty dimension if necessary
hdul['FLUX'].data = np.squeeze(data)
hdul['ERROR'].data = np.sqrt(np.squeeze(variance))
hdul['MASK'].data = np.squeeze(mask)
# remove dark if present - no longer needed
if 'DARK' in hdul:
del hdul['DARK']
if save:
self.write_output(hdul, outname)
results.append(hdul)
self.input = results
self.set_display_data()
def flat_correct(self):
"""
Calibrate and flat correct science data.
Input data must have FLAT extensions attached.
Calls `sofia_redux.instruments.exes.calibrate`.
"""
import sofia_redux.instruments.exes.calibrate as ec
param = self.get_parameter_set()
save = param.get_value('save')
skip_flat = param.get_value('skip_flat')
if skip_flat:
log.info('Skipping flat correction.')
return
results = list()
for i, hdul in enumerate(self.input):
data = hdul['FLUX'].data
variance = hdul['ERROR'].data ** 2
header = hdul[0].header
flat = hdul['FLAT'].data
flat_var = hdul['FLAT_ERROR'].data ** 2
data, variance = ec.calibrate(data, header, flat,
variance, flat_var)
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
hdul['FLUX'].data = data
hdul['ERROR'].data = np.sqrt(variance)
hdinsert(header, 'BUNIT', 'erg s-1 cm-2 sr-1 (cm-1)-1',
'Data units')
hdinsert(hdul[0].header, 'RAWUNITS', 'ct',
'Data units before calibration')
hdinsert(hdul['ERROR'].header, 'BUNIT',
'erg s-1 cm-2 sr-1 (cm-1)-1', 'Data units')
if save:
self.write_output(hdul, outname)
results.append(hdul)
self.input = results
self.set_display_data()
def clean_badpix(self):
"""
Clean or flag bad pixels.
Calls `sofia_redux.instruments.exes.clean`.
"""
import sofia_redux.instruments.exes.clean as ec
param = self.get_parameter_set()
save = param.get_value('save')
bp_threshold = param.get_value('bp_threshold')
propagate_nan = param.get_value('propagate_nan')
results = list()
for i, hdul in enumerate(self.input):
data = hdul['FLUX'].data
error = hdul['ERROR'].data
mask = ~(hdul['MASK'].data.astype(bool))
header = hdul[0].header
if data.ndim == 2:
data, error = ec.clean(data, header, error, mask,
threshold=bp_threshold,
propagate_nan=propagate_nan)
else:
nframe = data.shape[0]
for j in range(nframe):
data[j], error[j] = ec.clean(data[j], header,
error[j], mask[j],
threshold=bp_threshold,
propagate_nan=propagate_nan)
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
hdul['FLUX'].data = data
hdul['ERROR'].data = error
if save:
self.write_output(hdul, outname)
results.append(hdul)
self.input = results
self.set_display_data()
def undistort(self):
"""
Correct for optical distortion.
Calls `sofia_redux.instruments.exes.tort` to do the distortion
correction and `sofia_redux.instruments.exes.wavecal` to generate
wavenumber and spatial calibration images.
After this step, all spectral images are rotated as needed
to align the spectral axis with the x-axis.
"""
from sofia_redux.instruments.exes.tort import tort
from sofia_redux.instruments.exes.wavecal import wavecal
param = self.get_parameter_set()
save = param.get_value('save')
interpolation_method = param.get_value('interpolation_method')
spline_order = param.get_value('spline_order')
block = param.get_value('block_unilluminated')
results = list()
for i, hdul in enumerate(self.input):
header = hdul[0].header
data = hdul['FLUX'].data
variance = hdul['ERROR'].data ** 2
flat_header = hdul['FLAT'].header
flat = hdul['FLAT'].data
flat_var = hdul['FLAT_ERROR'].data ** 2
# set 0 values in flat to NaN so they don't get
# combined with real data in interpolation
flat[flat == 0] = np.nan
data, variance = tort(data, header, variance, skew=True,
interpolation_method=interpolation_method,
order=spline_order)
flat, flat_var = tort(flat, flat_header, flat_var, skew=True,
interpolation_method=interpolation_method,
order=spline_order)
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
# get wavecal map for undistorted pixels
wave_header = header.copy()
wavemap = wavecal(wave_header)
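# wavecal returns a cube whose planes are used below as the
# wavenumber calibration map, the spatial (arcsec) map, and the
# order mask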
rotation = wave_header.get('ROTATION', 0)
# remove old keywords if necessary:
del header['ROTATION']
for key in ['BUNIT1', 'BUNIT2', 'BUNIT3', 'ROTATION']:
if key in wave_header:
del wave_header[key]
# rotate data and block any unilluminated pixels
hdul['FLAT'].data = rotate90(flat, rotation)
hdul['FLAT_ERROR'].data = rotate90(np.sqrt(flat_var), rotation)
illum = rotate90(hdul['FLAT_ILLUMINATION'].data, rotation)
hdul['FLAT_ILLUMINATION'].data = illum
order_mask = rotate90(wavemap[2], rotation).astype(int)
out_of_range = (illum != 1) | (order_mask == 0)
if data.ndim == 2:
hdul['FLUX'].data = rotate90(data, rotation)
hdul['ERROR'].data = rotate90(np.sqrt(variance), rotation)
if block:
hdul['FLUX'].data[out_of_range] = np.nan
hdul['ERROR'].data[out_of_range] = np.nan
else:
for j in range(data.shape[0]):
hdul['FLUX'].data[j] = rotate90(data[j], rotation)
hdul['ERROR'].data[j] = rotate90(np.sqrt(variance[j]),
rotation)
if block:
hdul['FLUX'].data[j, out_of_range] = np.nan
hdul['ERROR'].data[j, out_of_range] = np.nan
hdul['MASK'].data = np.zeros(hdul['FLUX'].data.shape)
hdul['MASK'].data[np.isnan(hdul['FLUX'].data)] = 1
wave_header['BUNIT'] = 'cm-1'
hdul.append(fits.ImageHDU(
rotate90(wavemap[0], rotation),
wave_header, name='WAVECAL'))
wave_header['BUNIT'] = 'arcsec'
hdul.append(fits.ImageHDU(
rotate90(wavemap[1], rotation),
wave_header, name='SPATCAL'))
wave_header['BUNIT'] = ''
hdul.append(fits.ImageHDU(
rotate90(wavemap[2], rotation).astype(int),
wave_header, name='ORDER_MASK'))
if save:
self.write_output(hdul, outname)
results.append(hdul)
self.input = results
self.set_display_data()
def correct_calibration(self):
"""
Correct calibration for blackbody variation by wavenumber.
The `flat_correct` step calibrates the spectral flux to
physical units based on the blackbody function at the
central wavenumber in the flat. This step corrects the
calibration for the variation of the blackbody with respect
to wavenumber values at each pixel in the spectral image.
Calls `sofia_redux.instruments.exes.makeflat.bb_cal_factor` on the
wavenumber values in the WAVECAL extension to determine a correction
image for the FLUX. The correction image, normalized by the blackbody
factor at the central wavenumber, is multiplied into the FLUX and
ERROR arrays.
"""
from sofia_redux.instruments.exes.makeflat import bb_cal_factor
param = self.get_parameter_set()
save = param.get_value('save')
skip_correction = param.get_value('skip_correction')
if skip_correction:
log.info('Skipping calibration correction.')
return
results = list()
for i, hdul in enumerate(self.input):
header = hdul[0].header
data = hdul['FLUX'].data
error = hdul['ERROR'].data
wavecal = hdul['WAVECAL'].data
# use the wavecal map to correct the blackbody calibration
# by wavenumber value
wno0 = parse_central_wavenumber(header)
norm = bb_cal_factor(wno0, header['BB_TEMP'],
header['FLATTAMB'], header['FLATEMIS'])
bb_image = bb_cal_factor(wavecal, header['BB_TEMP'],
header['FLATTAMB'], header['FLATEMIS'])
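# The flat calibration assumed the blackbody factor at the central
# wavenumber (norm); rescaling by bb_image / norm applies the
# factor appropriate to each pixel's own wavenumber instead.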
hdul['FLUX'].data = bb_image * data / norm
hdul['ERROR'].data = bb_image * error / norm
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
if save:
self.write_output(hdul, outname)
results.append(hdul)
self.input = results
self.set_display_data()
def _blank_frames(self, bad_frames):
"""
Set bad frames in input data cubes to NaN.
Parameters
----------
bad_frames : list of int
Indices for bad frames.
"""
excluded = False
for i, hdul in enumerate(self.input):
data = hdul['FLUX'].data
error = hdul['ERROR'].data
filename = hdul[0].header.get('FILENAME', 'UNKNOWN')
if data.ndim > 2:
n_frames = data.shape[0]
else:
n_frames = 1
for bad_frame in bad_frames[i]:
n = int(bad_frame)
if 1 <= n <= n_frames:
log.info(f'Excluding frame {n} from file {filename}')
excluded = True
if n_frames == 1:
data[:] = np.nan
error[:] = np.nan
else:
data[n - 1] = np.nan
error[n - 1] = np.nan
if not excluded:
log.warning('No valid frame numbers passed; '
'no exclusions performed.')
log.info('')
def _split_nod_pairs(self):
"""Split each input file into new files, one per frame."""
results = []
filenums = []
for i, hdul in enumerate(self.input):
data = hdul['FLUX'].data
error = hdul['ERROR'].data
flat = hdul['FLAT'].data
illum = hdul['FLAT_ILLUMINATION'].data
base_filenum = self.filenum[i]
if data.ndim > 2:
n_frames = data.shape[0]
else:
n_frames = 1
if n_frames == 1:
results.append(hdul)
filenums.append(base_filenum)
else:
for j in range(n_frames):
new_hdul = copy.deepcopy(hdul)
new_hdul['FLUX'].data = data[j]
new_hdul['ERROR'].data = error[j]
new_hdul['FLAT'].data = flat.copy()
new_hdul['FLAT_ILLUMINATION'].data = illum.copy()
new_filenum = f'{base_filenum}_{j + 1}'
self.update_output(new_hdul, new_filenum,
new_hdul[0].header['PRODTYPE'])
results.append(new_hdul)
filenums.append(new_filenum)
self.input = results
self.filenum = filenums
def _combine_nod_pairs(self, mask2d=False):
"""Combine all input files into a single file, stacking frames."""
# nothing to do if less than 2 files
n_files = len(self.input)
if n_files < 2:
return
# start with the first input file
new_hdul = copy.deepcopy(self.input[0])
# loop through the rest, appending data and error values
new_data = []
new_error = []
new_mask = []
headers = []
for i, hdul in enumerate(self.input):
data = hdul['FLUX'].data
error = hdul['ERROR'].data
mask = hdul['MASK'].data
if data.ndim == 2:
data = np.array([data])
error = np.array([error])
if not mask2d:
mask = np.array([mask])
if i > 0:
if data.shape[-2:] != new_data[0].shape[-2:]:
raise ValueError(f"Data in {hdul[0].header['FILENAME']} "
f"does not match dimensions in "
f"{new_hdul[0].header['FILENAME']}; "
f"files cannot be coadded.")
new_data.append(data)
new_error.append(error)
new_mask.append(mask)
headers.append(hdul[0].header)
# stack the arrays along the first axis
new_hdul['FLUX'].data = np.concatenate(new_data, axis=0)
new_hdul['ERROR'].data = np.concatenate(new_error, axis=0)
if mask2d:
new_hdul['MASK'].data = np.any(new_mask, axis=0).astype(int)
else:
new_hdul['MASK'].data = np.concatenate(new_mask, axis=0)
# merge all headers
new_hdul[0].header = mergehdr(headers)
self.input = [new_hdul]
# all file numbers now used for single file
self.filenum = [self.filenum]
def coadd_pairs(self):
"""
Coadd nod pairs.
Calls `sofia_redux.instruments.exes.coadd` to mean combine input
frames in each file.
Optionally, all files may be combined into a single file
to be coadded together. Alternatively, all files may be split
into separate files, one per frame, so that no frames are coadded.
Prior to the coadd, it is possible to apply a small spatial shift
to align frames (`exes.spatial_shift`). It is also possible to
subtract the mean value at each column to correct residual background
levels, if INSTMODE = NOD_ON_SLIT.
To inspect the effect of these options, it is possible to save
intermediate pre-coadd files to disk. If saved, they will have
PRODTYPE = coadd_input and file code COI.
"""
from sofia_redux.instruments.exes.coadd import coadd
from sofia_redux.instruments.exes.spatial_shift import spatial_shift
from sofia_redux.instruments.exes.submean import submean
param = self.get_parameter_set()
save = param.get_value('save')
save_intermediate = param.get_value('save_intermediate')
subtract_sky = param.get_value('subtract_sky')
do_shift = param.get_value('shift')
shift_method = str(param.get_value('shift_method')).lower().strip()
skip_coadd = param.get_value('skip_coadd')
coadd_all = param.get_value('coadd_all_files')
exclude_pairs = param.get_value('exclude_pairs')
threshold = param.get_value('threshold')
weight_method = str(param.get_value('weight_method')).lower().strip()
override_weights = param.get_value('override_weights')
# parse weighting method from:
# options = ['Uniform weights', 'Weight by flat',
# 'Weight by variance']
if 'flat' in weight_method:
weight_mode = None
stdwt = False
elif 'variance' in weight_method:
weight_mode = None
stdwt = True
else:
weight_mode = 'unweighted'
stdwt = True
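# i.e. 'flat' -> flat-weighted coadd (std_wt False), 'variance' ->
# variance-weighted (std_wt True), anything else -> uniform weights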
# parse shift method from:
# options = ['Maximize signal-to-noise',
# 'Maximize signal (sharpen)']
if 'sharp' in shift_method:
sharpen = True
else:
sharpen = False
# set any bad frames to NaN
if str(exclude_pairs).strip() != '':
error_message = ['Could not read exclude pairs '
f"parameter: '{exclude_pairs}'",
'Values should be comma-separated integers, '
'starting with 1 for the first frame.',
'To specify different values for different '
'input files, provide a semi-colon separated '
'list matching the number of input files.']
bad_frames = parse_apertures(exclude_pairs, len(self.input),
error_message=error_message,
allow_empty=True)
self._blank_frames(bad_frames)
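# For example, exclude_pairs = '1,2;3' would drop frames 1 and 2
# from the first input file and frame 3 from the second, following
# the format described in the error message above.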
# parse override weights if provided
overrides = None
if str(override_weights).strip() != '':
error_message = [f'Could not read override weights '
f"parameter: '{override_weights}'",
'Values should be comma-separated numbers, '
'one for each frame.',
'To specify different values for different '
'input files, provide a semi-colon separated '
'list matching the number of input files.']
overrides = parse_apertures(override_weights, len(self.input),
error_message=error_message,
allow_empty=False)
# split or combine nod pairs across files if desired
if skip_coadd:
self._split_nod_pairs()
# weights are now irrelevant
overrides = None
elif coadd_all:
self._combine_nod_pairs()
# flatten weights list
if overrides is not None:
all_weights = []
for w in overrides:
all_weights.extend(w)
overrides = [all_weights]
# loop through all files, coadding nod pairs within each
results = list()
for i, hdul in enumerate(self.input):
log.info('')
header = hdul[0].header
data = hdul['FLUX'].data
variance = hdul['ERROR'].data ** 2
flat = hdul['FLAT'].data
illum = hdul['FLAT_ILLUMINATION'].data
order_mask = hdul['ORDER_MASK'].data
# get number of frames, reshape data if necessary
if data.ndim > 2:
n_frames = data.shape[0]
else:
n_frames = 1
data = np.array([data])
variance = np.array([variance])
# subtract background before coadd if desired
instmode = str(header.get('INSTMODE')).lower()
if subtract_sky and 'on_slit' in instmode:
data = submean(data, header, flat, illum, order_mask)
# get good frames
frame_mask = np.any(~np.isnan(data), axis=(1, 2))
good_frames = np.arange(n_frames)[frame_mask]
# shift data before coadd if desired
if do_shift and n_frames > 1:
data, variance = spatial_shift(
data, header, flat, variance,
illum=illum, good_frames=good_frames, sharpen=sharpen)
log.info('')
# save modified data prior to coadd, if desired
if save_intermediate:
log.info('Saving intermediate file:')
coi_hdul = copy.deepcopy(hdul)
outname = self.update_output(coi_hdul, self.filenum[i],
self.prodtypes[self.step_index])
coi_header = coi_hdul[0].header
coi_outname = outname.replace('COA', 'COI')
coi_header['FILENAME'] = os.path.basename(coi_outname)
coi_header['PRODTYPE'] = 'coadd_input'
coi_hdul[0].header = coi_header
coi_hdul['FLUX'].data = data.copy()
coi_hdul['ERROR'].data = np.sqrt(variance)
self.write_output(coi_hdul, coi_outname)
log.info('')
if overrides is not None:
weights = (np.array(overrides[i], dtype=float)
/ np.sum(overrides[i]))
mode = 'useweights'
else:
weights = None
mode = weight_mode
cdata, cvar = coadd(data, header, flat, variance,
threshold=threshold,
illum=illum, good_frames=good_frames,
std_wt=stdwt, weight_mode=mode,
weights=weights)
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
hdul['FLUX'].data = cdata
hdul['ERROR'].data = np.sqrt(cvar)
hdul['MASK'].data = np.zeros(hdul['FLUX'].data.shape)
hdul['MASK'].data[np.isnan(hdul['FLUX'].data)] = 1
if save:
self.write_output(hdul, outname)
results.append(hdul)
self.input = results
self.set_display_data()
def convert_units(self):
"""
Convert calibrated units to flux in Jy/pixel.
Uses the OMEGAP keyword to correct for the solid angle per pixel.
"""
param = self.get_parameter_set()
save = param.get_value('save')
skip_conversion = param.get_value('skip_conversion')
cal_factor = param.get_value('cal_factor')
zero_level = param.get_value('zero_level')
if skip_conversion:
log.info('Skipping unit conversion.')
return
results = list()
for i, hdul in enumerate(self.input):
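# Unit conversion sketch: the input BUNIT is
# erg s-1 cm-2 sr-1 (cm-1)-1. Multiplying by OMEGAP [sr/pixel]
# removes the solid angle, dividing by c [cm/s] converts
# per-wavenumber to per-Hz, and 1 Jy = 1e-23 erg s-1 cm-2 Hz-1,
# giving OMEGAP * 1e23 / c[cm/s] = OMEGAP * 1e21 / c[m/s].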
conversion = hdul[0].header['OMEGAP'] * 1e21 / const.c.value
for extname in ['FLUX', 'ERROR', 'FLAT', 'FLAT_ERROR']:
hdu = hdul[extname]
hdu.data *= conversion * cal_factor
if 'ERROR' not in extname:
hdu.data += zero_level
if 'FLAT' in extname:
hdu.header['BUNIT'] = 'Jy/(pixel ct)'
else:
hdu.header['BUNIT'] = 'Jy/pixel'
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
if save: # pragma: no cover
self.write_output(hdul, outname)
results.append(hdul)
self.input = results
self.set_display_data()
# spectroscopy module pipeline steps and helpers
def make_profiles(self):
"""
Produce spatial profile fits from rectified images.
The rectified images and profiles are stored in self.input
for continued processing.
Calls `sofia_redux.spectroscopy.mkspatprof` and
`sofia_redux.spectroscopy.rectify`.
"""
from sofia_redux.spectroscopy.mkspatprof import mkspatprof
from sofia_redux.spectroscopy.rectify import rectify
# get parameters
param = self.get_parameter_set()
fit_order = param.get_value('fit_order')
bg_sub = param.get_value('bg_sub')
# loop through input, making profiles
results = []
for i, hdul in enumerate(self.input):
header = hdul[0].header
data = hdul['FLUX'].data
var = hdul['ERROR'].data ** 2
omask = hdul['ORDER_MASK'].data
wavecal = hdul['WAVECAL'].data
spatcal = hdul['SPATCAL'].data
flat = hdul['FLAT'].data
log.info(f"Input: {header['FILENAME']}")
log.info(f"Profiling {header['NORDERS']} order(s)")
# extract rectified images for each order
units = fits.Header({'XUNITS': 'cm-1'})
rectimg = rectify(data, omask, wavecal, spatcal,
variance=var, poly_order=0, header=units)
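# rectify returns a dict keyed by order number; each entry supplies
# the rectified 'image', 'variance', 'mask', 'wave', 'spatial', and
# 'header' values used below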
# make the spatial profile from the rectified images
medprof, fitprof = \
mkspatprof(rectimg, return_fit_profile=True,
ndeg=fit_order, bgsub=bg_sub, smooth_sigma=None,
robust=3.0)
# extract matching rectified flat images for each order
rectflat = rectify(flat, omask, wavecal, spatcal, poly_order=0)
# keep the existing extensions, add rectified data for each order
for order in sorted(rectimg.keys()):
ordnum = f'{order:02d}'
rectified = rectimg[order]
rectified_flat = rectflat[order]
if rectified is None or rectified_flat is None:
raise ValueError('Problem in rectification.')
new_image = rectified['image']
new_error = np.sqrt(rectified['variance'])
new_flat = rectified_flat['image']
# make an extension header with spectral WCS
exthead = fits.Header()
for card in rectified['header'].cards:
hdinsert(exthead, card.keyword, card.value, card.comment)
# flux, error, flat
hdinsert(exthead, 'BUNIT',
header.get('BUNIT', 'UNKNOWN'),
'Data units')
hdul.append(fits.ImageHDU(new_image, exthead,
name=f'FLUX_ORDER_{ordnum}'))
hdul.append(fits.ImageHDU(new_error, exthead,
name=f'ERROR_ORDER_{ordnum}'))
hdinsert(exthead, 'BUNIT',
hdul['FLAT'].header.get('BUNIT', 'UNKNOWN'),
'Data units')
hdul.append(fits.ImageHDU(new_flat, exthead,
name=f'FLAT_ORDER_{ordnum}'))
# append extra information from rectification
boolmask = rectified['mask']
mask = np.zeros(boolmask.shape, dtype=int)
mask[~boolmask] = 1
hdinsert(exthead, 'BUNIT', '', 'Data units')
hdul.append(fits.ImageHDU(mask, exthead,
name=f'BADMASK_ORDER_{ordnum}'))
exthead = fits.Header()
xunit = 'cm-1'
hdinsert(exthead, 'BUNIT', xunit, 'Data units')
hdul.append(fits.ImageHDU(rectified['wave'], exthead,
name=f'WAVEPOS_ORDER_{ordnum}'))
yunit = 'arcsec'
hdinsert(exthead, 'BUNIT', yunit, 'Data units')
hdul.append(fits.ImageHDU(rectified['spatial'], exthead,
name=f'SLITPOS_ORDER_{ordnum}'))
# add profiles to HDUList
hdinsert(exthead, 'BUNIT', '', 'Data units')
hdul.append(
fits.ImageHDU(fitprof[order], exthead,
name=f'SPATIAL_MAP_ORDER_{ordnum}'))
hdul.append(
fits.ImageHDU(medprof[order], exthead,
name=f'SPATIAL_PROFILE_ORDER_{ordnum}'))
# remove flat error and illumination -
# no longer needed
if 'FLAT_ERROR' in hdul:
del hdul['FLAT_ERROR']
if 'FLAT_ILLUMINATION' in hdul:
del hdul['FLAT_ILLUMINATION']
# save if desired
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
if param.get_value('save'):
self.write_output(hdul, outname)
results.append(hdul)
log.info('')
self.input = results
self.set_display_data()
def locate_apertures(self):
"""
Automatically find aperture centers.
Calls `sofia_redux.spectroscopy.findapertures`.
"""
from sofia_redux.spectroscopy.findapertures import find_apertures
# get parameters
param = self.get_parameter_set()
method = param.get_value('method')
num_aps = param.get_value('num_aps')
input_position = param.get_value('input_position')
fwhm_par = param.get_value('fwhm')
exclude_orders = param.get_value('exclude_orders')
if str(method).strip().lower() == 'fix to center':
log.info('Fixing aperture to slit center.')
positions = None
fix_ap = True
num_aps = 1
elif str(method).strip().lower() == 'fix to input':
log.info('Fixing aperture to input positions.')
positions = parse_apertures(
input_position, len(self.input))
fix_ap = True
else:
log.info('Finding aperture positions from Gaussian fits.')
if str(input_position).strip() == '':
positions = None
else:
positions = parse_apertures(
input_position, len(self.input))
fix_ap = False
# parse orders, applying to all files
exclude = list()
if str(exclude_orders).strip() != '':
try:
exclude = [int(i) for i in exclude_orders.split(',')]
except (ValueError, TypeError, IndexError):
raise ValueError('Invalid order exclusion '
'parameter.') from None
log.info('')
log.info('Apertures:')
profile = dict()
if positions is not None:
guess = dict()
npeaks = len(positions[0])
else:
guess = None
npeaks = num_aps
results = []
fit_fwhm = []
yunit = 'arcsec'
for i, hdul in enumerate(self.input):
for j in range(hdul[0].header['NORDERS']):
if j + 1 in exclude:
log.info(f'Skipping order {j + 1:02d}')
continue
ordnum = f'{j + 1:02d}'
profile[j + 1] = [hdul[f'SLITPOS_ORDER_{ordnum}'].data,
hdul[f'SPATIAL_PROFILE_ORDER_{ordnum}'].data]
if positions is not None:
guess[j + 1] = positions[i]
apertures = find_apertures(profile, npeaks=npeaks, positions=guess,
fwhm=fwhm_par, fix=fix_ap,
box_width=('stddev', 3))
# update output name
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
log.info('')
log.info(f" {hdul[0].header['FILENAME']}")
for order in sorted(apertures.keys()):
ordnum = f'{order:02d}'
appos = []
apsign = []
apfwhm = []
fwhm_list = []
for ap in apertures[order]:
pos = ap['position']
sign = ap['sign']
fwhm = ap['fwhm']
appos.append(f'{pos:.3f}')
apsign.append(f'{sign:d}')
apfwhm.append(f'{fwhm:.3f}')
if fix_ap:
log.info(' Order {}: {:.3f} {} '
'(sign: {})'.format(ordnum, pos, yunit, sign))
else:
log.info(' Order {}: {:.3f} {} '
'(sign: {}, fit '
'FWHM: {:.3f})'.format(ordnum, pos, yunit,
sign, fwhm))
fwhm_list.append(fwhm)
fit_fwhm.extend(fwhm_list)
# add apertures to primary header
hdinsert(hdul[0].header, f'APPOSO{ordnum}', ','.join(appos),
comment=f'Aperture positions [{yunit}]')
hdinsert(hdul[0].header, f'APSGNO{ordnum}', ','.join(apsign),
comment='Aperture signs')
# add FWHM to primary header
if not fix_ap:
comment = f'Fit aperture FWHM [{yunit}]'
else:
comment = f'Assumed aperture FWHM [{yunit}]'
hdinsert(hdul[0].header,
f'APFWHM{ordnum}', ','.join(apfwhm),
comment=comment)
# also add to order flux extension header,
# labeled as order 1
exthead = hdul[f'FLUX_ORDER_{ordnum}'].header
hdinsert(exthead, 'APPOSO01', ','.join(appos),
comment=f'Aperture positions [{yunit}]')
hdinsert(exthead, 'APSGNO01', ','.join(apsign),
comment='Aperture signs')
hdinsert(exthead, 'APFWHM01', ','.join(apfwhm),
comment=comment)
log.info('')
# save if desired
if param.get_value('save'):
self.write_output(hdul, outname)
results.append(hdul)
if len(fit_fwhm) > 0:
log.info('')
log.info(f'Mean fit FWHM: '
f'{np.mean(fit_fwhm):.2f} '
f'+/- {np.std(fit_fwhm):.2f} {yunit}')
self.input = results
self.set_display_data()
def set_apertures(self):
"""
Set aperture radii.
Calls `sofia_redux.spectroscopy.getapertures` and
`sofia_redux.spectroscopy.mkapmask`.
"""
from sofia_redux.spectroscopy.getapertures import get_apertures
from sofia_redux.spectroscopy.mkapmask import mkapmask
# get parameters
param = self.get_parameter_set()
full_slit = param.get_value('full_slit')
refit = param.get_value('refit')
apsign_list = param.get_value('apsign')
aprad_list = param.get_value('aprad')
psfrad_list = param.get_value('psfrad')
bgr_list = param.get_value('bgr')
apstart_list = param.get_value('ap_start')
apend_list = param.get_value('ap_end')
manual_ap = (str(apstart_list).strip().lower() != ''
and str(apend_list).strip().lower() != '')
if manual_ap:
full_slit = False
fix_apsign, fix_aprad, fix_psfrad, fix_bgr, fix_ap = \
False, False, False, False, False
if not full_slit:
if str(apsign_list).strip().lower() != '':
apsign_list = parse_apertures(apsign_list,
len(self.input))
fix_apsign = True
if str(aprad_list).strip().lower() != '':
aprad_list = parse_apertures(aprad_list,
len(self.input))
fix_aprad = True
if str(psfrad_list).strip().lower() != '':
psfrad_list = parse_apertures(psfrad_list,
len(self.input))
fix_psfrad = True
if str(bgr_list).strip().lower() != '':
if str(bgr_list).strip().lower() == 'none':
bgr_list = []
fix_bgr = True
else:
bgr_list = parse_bg(bgr_list, len(self.input))
fix_bgr = True
# override apertures if start/end set
if manual_ap:
apstart_list = parse_apertures(apstart_list,
len(self.input))
apend_list = parse_apertures(apend_list,
len(self.input))
fix_aprad = False
fix_psfrad = False
fix_ap = True
results = []
for i, hdul in enumerate(self.input):
log.info('')
header = hdul[0].header
log.info(header['FILENAME'])
# loop over orders
for j in range(header['NORDERS']):
ordnum = f'{j + 1:02d}'
# check for excluded order
appos_key = f'APPOSO{ordnum}'
if appos_key not in header:
continue
log.info('')
log.info(f'Order {ordnum}:')
# retrieve data from input
space = hdul[f'SLITPOS_ORDER_{ordnum}'].data
wave = hdul[f'WAVEPOS_ORDER_{ordnum}'].data
profile = hdul[f'SPATIAL_PROFILE_ORDER_{ordnum}'].data
appos = parse_apertures(header[f'APPOSO{ordnum}'], 1)[0]
apfwhm = parse_apertures(header[f'APFWHM{ordnum}'], 1)[0]
if full_slit:
# fix to full aperture, assume positive, no background
half_slit = (space.max() - space.min()) / 2
ap = {'position': half_slit,
'fwhm': apfwhm[0],
'sign': 1,
'psf_radius': half_slit,
'aperture_radius': half_slit}
aplist = [ap]
aperture_regions = {'apertures': aplist,
'background': {'regions': []}}
else:
if fix_apsign:
apsign = apsign_list[i]
else:
apsign = parse_apertures(
header[f'APSGNO{ordnum}'], 1)[0]
aplist = []
for k, pos in enumerate(appos):
ap = {'position': pos,
'fwhm': apfwhm[k]}
if len(apsign) > k:
ap['sign'] = apsign[k]
else: # pragma: no cover
ap['sign'] = apsign[-1]
if fix_ap:
# start and end explicitly set:
# update position to center of range and set
# radius to half of range
if len(apstart_list[i]) > k:
ap_start = apstart_list[i][k]
else: # pragma: no cover
ap_start = apstart_list[i][-1]
if len(apend_list[i]) > k:
ap_end = apend_list[i][k]
else: # pragma: no cover
ap_end = apend_list[i][-1]
half_aprad = (ap_end - ap_start) / 2
ap['position'] = half_aprad + ap_start
ap['psf_radius'] = half_aprad
ap['aperture_radius'] = half_aprad
else:
if fix_aprad:
if len(aprad_list[i]) > k:
ap['aperture_radius'] = aprad_list[i][k]
else: # pragma: no cover
ap['aperture_radius'] = aprad_list[i][-1]
if fix_psfrad:
if len(psfrad_list[i]) > k:
ap['psf_radius'] = psfrad_list[i][k]
else: # pragma: no cover
ap['psf_radius'] = psfrad_list[i][-1]
aplist.append(ap)
apertures = {j: aplist}
profiles = {j: np.vstack([space, profile])}
if fix_bgr:
aperture_regions = get_apertures(profiles, apertures,
get_bg=False,
refit_fwhm=refit)[j]
if len(bgr_list) > i:
aperture_regions['background'] = {
'regions': bgr_list[i]}
else:
aperture_regions['background'] = {'regions': []}
else:
aperture_regions = get_apertures(profiles, apertures,
refit_fwhm=refit)[j]
# log aperture values
apsign, aprad, appsfrad, header_pos = [], [], [], []
for k, ap in enumerate(aperture_regions['apertures']):
pos = ap['position']
sign = '{:d}'.format(int(ap['sign']))
rad = '{:.3f}'.format(ap['aperture_radius'])
psfrad = '{:.3f}'.format(ap['psf_radius'])
apsign.append(sign)
aprad.append(rad)
appsfrad.append(psfrad)
header_pos.append(f'{pos:.3f}')
log.info(' Aperture {}:'.format(k))
log.info(' position: {:.3f}'.format(pos))
log.info(' sign: {}'.format(sign))
log.info(' PSF radius: {}'.format(psfrad))
log.info(' aperture radius: {}'.format(rad))
log.info('')
# also add the trace into the aperture, for use
# in making an aperture mask
# For EXES, this is always fixed to the aperture position
ap['trace'] = np.array([pos] * len(wave))
log.info('Background regions:')
bgr = []
for start, stop in aperture_regions['background']['regions']:
reg = '{:.3f}-{:.3f}'.format(start, stop)
bgr.append(reg)
log.info(' {}'.format(reg))
if len(bgr) == 0:
log.info(' (None)')
log.info('')
# add to header
hdinsert(header, f'APPOSO{ordnum}', ','.join(header_pos),
comment='Aperture positions [arcsec]')
hdinsert(header, f'APSGNO{ordnum}', ','.join(apsign),
comment='Aperture signs')
hdinsert(header, f'APRADO{ordnum}', ','.join(aprad),
comment='Aperture radii [arcsec]')
hdinsert(header, f'PSFRAD{ordnum}', ','.join(appsfrad),
comment='Aperture PSF radii [arcsec]')
hdinsert(header, f'BGR_O{ordnum}', ','.join(bgr),
comment='Aperture background regions [arcsec]')
# also add to order flux extension header,
# labeled as order 1
exthead = hdul[f'FLUX_ORDER_{ordnum}'].header
hdinsert(exthead, 'APPOSO01', ','.join(header_pos),
comment='Aperture positions [arcsec]')
hdinsert(exthead, 'APSGNO01', ','.join(apsign),
comment='Aperture signs')
hdinsert(exthead, 'APRADO01', ','.join(aprad),
comment='Aperture radii [arcsec]')
hdinsert(exthead, 'PSFRAD01', ','.join(appsfrad),
comment='Aperture PSF radii [arcsec]')
hdinsert(exthead, 'BGR_O01', ','.join(bgr),
comment='Aperture background regions [arcsec]')
# make aperture mask and append to hdul
if full_slit:
# use the whole slit
apmask = np.full((len(space), len(wave)), -1.0)
# but down weight the first and last row
apmask[0, :] = -0.5
apmask[-1, :] = -0.5
else:
apmask = mkapmask(
space, wave, aperture_regions['apertures'],
aperture_regions['background']['regions'])
exthead = hdul[f'BADMASK_ORDER_{ordnum}'].header.copy()
# insert aperture mask after spatial profile for order
idx = hdul.index_of(f'SPATIAL_PROFILE_ORDER_{ordnum}')
hdul.insert(idx + 1,
fits.ImageHDU(
data=np.array(apmask),
header=exthead,
name=f'APERTURE_MASK_ORDER_{ordnum}'))
# save if desired
# update output name
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
if param.get_value('save'):
self.write_output(hdul, outname)
results.append(hdul)
self.input = results
self.set_display_data()
def subtract_background(self):
"""
Subtract background along columns.
Calls `sofia_redux.spectroscopy.extspec.col_subbg`.
"""
from sofia_redux.spectroscopy.extspec import col_subbg
# get parameters
param = self.get_parameter_set()
skip_bg = param.get_value('skip_bg')
bg_fit_order = param.get_value('bg_fit_order')
threshold = param.get_value('threshold')
if skip_bg:
log.info('No background subtraction performed.')
return
results = []
for i, hdul in enumerate(self.input):
header = hdul[0].header
log.info('')
log.info(header['FILENAME'])
# loop over orders
for j in range(header['NORDERS']):
ordnum = f'{j + 1:02d}'
# check for excluded order
appos_key = f'APPOSO{ordnum}'
if appos_key not in header:
continue
log.info('')
log.info(f'Order {ordnum}')
# retrieve data from input
image = hdul[f'FLUX_ORDER_{ordnum}'].data
err = hdul[f'ERROR_ORDER_{ordnum}'].data
mask = (hdul[f'BADMASK_ORDER_{ordnum}'].data < 1)
space = hdul[f'SLITPOS_ORDER_{ordnum}'].data
apmask = hdul[f'APERTURE_MASK_ORDER_{ordnum}'].data
has_bg = np.any(np.isnan(apmask))
if not has_bg:
log.info('No background regions defined.')
else:
# correct each column for background identified in apmask
# image and error are corrected in place
nwave = image.shape[1]
for wavei in range(nwave):
with set_log_level('CRITICAL'):
bg_subtracted_col = col_subbg(
space, image[:, wavei], err[:, wavei]**2,
apmask[:, wavei], mask[:, wavei],
bg_fit_order, robust=threshold)
if bg_subtracted_col is not None:
# result is flux, variance, coefficients
image[:, wavei] = bg_subtracted_col[0]
err[:, wavei] = np.sqrt(bg_subtracted_col[1])
# update output name
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
# save if desired
log.info('')
if param.get_value('save'):
self.write_output(hdul, outname)
results.append(hdul)
log.info('')
self.input = results
self.set_display_data()
@staticmethod
def _make_1d(hdul):
"""
Make 1d spectrum file.
Parameters
----------
hdul : astropy.io.fits.HDUList
File containing 2D and 1D spectral extensions.
Returns
-------
spectrum : astropy.io.fits.HDUList
New HDU list in Spextool style, containing 1D spectra.
"""
header = hdul[0].header
spechdr = header.copy()
# add some spextool-required header keywords
bunit = hdul[0].header.get('BUNIT', 'UNKNOWN').replace('/pixel', '')
hdinsert(spechdr, 'XUNITS', 'cm-1', 'Spectral wavelength units')
hdinsert(spechdr, 'YUNITS', bunit, 'Spectral flux units')
try:
del spechdr['BUNIT']
except KeyError: # pragma: no cover
pass
# loop over orders
specset = []
first = True
max_length = 0
orders = []
for j in range(header['NORDERS']):
ordnum = f'{j + 1:02d}'
# check for excluded order
appos_key = f'APPOSO{ordnum}'
if appos_key not in header:
continue
# keep actually used orders
orders.append(j + 1)
suffix = f'_ORDER_{ordnum}'
if (header['NORDERS'] == 1
and f'SPECTRAL_FLUX{suffix}' not in hdul):
suffix = ''
if hdul[f'SPECTRAL_FLUX{suffix}'].data.ndim > 1:
naps = hdul[f'SPECTRAL_FLUX{suffix}'].data.shape[0]
else:
naps = 1
if first:
hdinsert(spechdr, 'NAPS', naps, 'Number of apertures')
first = False
wave = hdul[f'WAVEPOS{suffix}'].data
disp = np.nanmean(wave[1:] - wave[:-1])
hdinsert(spechdr, f'DISPO{ordnum}', disp,
'Dispersion [cm-1 pixel-1]')
if len(wave) > max_length:
max_length = len(wave)
# transmission rows include additional data for various
# molecular species -- use the first only,
# which contains total transmission.
trans = hdul[f'TRANSMISSION{suffix}'].data
if trans.ndim > 1:
trans = trans[0]
for n in range(naps):
if naps > 1:
speclist = [hdul[f'WAVEPOS{suffix}'].data,
hdul[f'SPECTRAL_FLUX{suffix}'].data[n],
hdul[f'SPECTRAL_ERROR{suffix}'].data[n],
trans,
hdul[f'RESPONSE{suffix}'].data[n]]
else:
speclist = [hdul[f'WAVEPOS{suffix}'].data,
hdul[f'SPECTRAL_FLUX{suffix}'].data,
hdul[f'SPECTRAL_ERROR{suffix}'].data,
trans,
hdul[f'RESPONSE{suffix}'].data]
specdata = np.vstack(speclist)
specset.append(specdata)
# update ORDERS and NORDERS in header to actually used values
spechdr['ORDERS'] = ','.join([str(n) for n in orders])
spechdr['NORDERS'] = len(orders)
nspec = len(specset)
spec_array = np.full((nspec, 5, max_length), np.nan)
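# Spextool-style layout: one plane per order/aperture with rows
# (wavepos, flux, error, transmission, response), NaN-padded to the
# longest order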
for i, data in enumerate(specset):
spec_array[i, :, :data.shape[1]] = data
# remove any empty dimensions
spec_array = np.squeeze(spec_array)
spec = fits.HDUList(fits.PrimaryHDU(data=spec_array,
header=spechdr))
return spec
@staticmethod
def _get_atran(header, atranfile, atrandir):
"""
Get transmission data with wavenumber units.
Calls `sofia_redux.instruments.exes.get_atran`.
Parameters
----------
header : astropy.io.fits.Header
Header containing altitude and ZA information, for matching
to model files.
atranfile : str or None
If provided, is used to directly specify the model file to
use, rather than determining a best match from the transmission
directory.
atrandir : str or None
If provided, is used as the source library for model files.
If not provided, the default directory used is
sofia_redux/instruments/exes/data/transmission.
Returns
-------
atran : numpy.ndarray
Array with shape (m, n), where n is the number of data
points in the input model file and m is the number of
rows containing transmission data. The first row is
wavenumber; the second is total fractional atmospheric
transmission at that wavenumber. Third and subsequent rows
are transmission for other specific molecular species.
"""
from sofia_redux.instruments.exes.get_atran import get_atran
resolution = header['RP']
atran = get_atran(header, resolution, filename=atranfile,
atran_dir=atrandir)
if atran is None:
if atrandir is None:
msg = 'No matching transmission files.'
log.warning(msg)
raise ValueError(msg)
else:
log.info('')
log.info(f'Model file not found in {atrandir}; '
f'trying default')
# if not found, try the default directory
atran = get_atran(header, resolution, filename=atranfile,
atran_dir=None)
if atran is None:
msg = 'No matching transmission files.'
log.warning(msg)
raise ValueError(msg)
return atran
def combine_spectra(self):
"""
Combine spectra.
Calls `sofia_redux.toolkit.image.combine.combine_images`
for coaddition. The combination method may be configured in
parameters.
"""
# get parameters
param = self.get_parameter_set()
method = param.get_value('method')
weighted = param.get_value('weighted')
robust = param.get_value('robust')
sigma = param.get_value('threshold')
maxiters = param.get_value('maxiters')
combine_aps = param.get_value('combine_aps')
hdr_list = list()
data_list = list()
var_list = list()
exp_list = list()
spectral_sets = dict()
test_wave = dict()
display_files = list()
for i, hdul in enumerate(self.input):
header = hdul[0].header
hdr_list.append(header)
data_list.append(hdul['FLUX'].data)
var_list.append(hdul['ERROR'].data ** 2)
exp_list.append(np.full_like(hdul['FLUX'].data,
hdul[0].header.get('EXPTIME', 0.0)))
for j in range(header['NORDERS']):
ordnum = f'{j + 1:02d}'
# check for excluded order
appos_key = f'APPOSO{ordnum}'
if appos_key not in header:
continue
# check for same wavelength solution
wavepos = hdul[f'WAVEPOS_ORDER_{ordnum}'].data
if i == 0:
test_wave[ordnum] = wavepos
# initialize lists for order
spectral_sets[ordnum] = {'flux': list(),
'var': list(),
'specflux': list(),
'specvar': list(),
'response': list()}
else:
if (ordnum not in test_wave
or not np.allclose(wavepos, test_wave[ordnum])):
msg = 'Mismatched wavenumbers or orders. ' \
'Spectra cannot be combined.'
log.error(msg)
raise ValueError(msg)
# append images to lists
spectral_sets[ordnum]['flux'].append(
hdul[f'FLUX_ORDER_{ordnum}'].data)
spectral_sets[ordnum]['var'].append(
hdul[f'ERROR_ORDER_{ordnum}'].data ** 2)
# gather spectra, accounting for possible multiple apertures
n_ap = 1
specflux = hdul[f'SPECTRAL_FLUX_ORDER_{ordnum}'].data
specvar = hdul[f'SPECTRAL_ERROR_ORDER_{ordnum}'].data ** 2
specresp = hdul[f'RESPONSE_ORDER_{ordnum}'].data
if specflux.ndim > 1:
n_ap = specflux.shape[0]
if n_ap == 1:
specflux = specflux[0]
specvar = specvar[0]
specresp = specresp[0]
if combine_aps and n_ap > 1:
for j in range(n_ap):
spectral_sets[ordnum]['specflux'].append(specflux[j])
spectral_sets[ordnum]['specvar'].append(specvar[j])
spectral_sets[ordnum]['response'].append(specresp[j])
else:
spectral_sets[ordnum]['specflux'].append(specflux)
spectral_sets[ordnum]['specvar'].append(specvar)
spectral_sets[ordnum]['response'].append(specresp)
# combine all the full data arrays
if len(data_list) > 1:
outdata, outvar = combine_images(
data_list, variance=var_list,
method=method, weighted=weighted,
robust=robust, sigma=sigma, maxiters=maxiters)
else:
outdata, outvar = data_list[0], var_list[0]
# merge all the headers
outhdr = mergehdr(hdr_list)
# make a header for the error image
errhead = fits.Header()
# store output data: final extensions are
# FLUX, ERROR, TRANSMISSION
# and by order:
# WAVEPOS, FLUX, ERROR,
# SPECTRAL_FLUX, SPECTRAL_ERROR, TRANSMISSION, RESPONSE
template = self.input[0]
primary = fits.PrimaryHDU(data=outdata, header=outhdr)
hdul = fits.HDUList(primary)
hdinsert(errhead, 'BUNIT', outhdr.get('BUNIT', 'UNKNOWN'),
'Data units')
hdul.append(fits.ImageHDU(data=np.sqrt(outvar),
header=errhead,
name='ERROR'))
hdul.append(template['TRANSMISSION'])
for ordnum in sorted(spectral_sets.keys()):
# combine all the 2D arrays
if len(spectral_sets[ordnum]['flux']) > 1:
outdata, outvar = combine_images(
spectral_sets[ordnum]['flux'],
variance=spectral_sets[ordnum]['var'],
method=method, weighted=weighted,
robust=robust, sigma=sigma, maxiters=maxiters)
else:
outdata = spectral_sets[ordnum]['flux'][0]
outvar = spectral_sets[ordnum]['var'][0]
hdul.append(fits.ImageHDU(
data=outdata,
header=template[f'FLUX_ORDER_{ordnum}'].header,
name=f'FLUX_ORDER_{ordnum}'))
hdul.append(fits.ImageHDU(
data=np.sqrt(outvar),
header=template[f'ERROR_ORDER_{ordnum}'].header,
name=f'ERROR_ORDER_{ordnum}'))
# combine all the 1D spectra
if len(spectral_sets[ordnum]['specflux']) > 1:
outspec, outspecvar = combine_images(
spectral_sets[ordnum]['specflux'],
variance=spectral_sets[ordnum]['specvar'],
method=method, weighted=weighted, robust=robust,
sigma=sigma, maxiters=maxiters)
outresp, _ = combine_images(spectral_sets[ordnum]['response'],
method=method, robust=False)
else:
outspec = spectral_sets[ordnum]['specflux'][0]
outspecvar = spectral_sets[ordnum]['specvar'][0]
outresp = spectral_sets[ordnum]['response'][0]
# wavepos, spectral flux and error, transmission, response
# Note: wavepos and transmission are just copied
# from the first file; the flux, error, and response
# are from combined data
hdul.append(fits.ImageHDU(
data=template[f'WAVEPOS_ORDER_{ordnum}'].data,
header=template[f'WAVEPOS_ORDER_{ordnum}'].header,
name=f'WAVEPOS_ORDER_{ordnum}'))
hdul.append(fits.ImageHDU(
data=outspec,
header=template[f'SPECTRAL_FLUX_ORDER_{ordnum}'].header,
name=f'SPECTRAL_FLUX_ORDER_{ordnum}'))
hdul.append(fits.ImageHDU(
data=np.sqrt(outspecvar),
header=template[f'SPECTRAL_ERROR_ORDER_{ordnum}'].header,
name=f'SPECTRAL_ERROR_ORDER_{ordnum}'))
hdul.append(fits.ImageHDU(
data=template[f'TRANSMISSION_ORDER_{ordnum}'].data,
header=template[f'TRANSMISSION_ORDER_{ordnum}'].header,
name=f'TRANSMISSION_ORDER_{ordnum}'))
hdul.append(fits.ImageHDU(
data=outresp,
header=template[f'RESPONSE_ORDER_{ordnum}'].header,
name=f'RESPONSE_ORDER_{ordnum}'))
outname = self.update_output(hdul, self.filenum,
self.prodtypes[self.step_index])
self.filenum = [self.get_filenum(outname)]
# save if desired
if param.get_value('save'):
log.info('')
# save 1D Spextool-style final products
log.info('1D spectra:')
spec = self._make_1d(hdul)
if self.sky_spectrum:
specname = outname.replace('_SCM_', '_SCS_')
spec[0].header['PRODTYPE'] = 'sky_combined_spectrum_1d'
else:
specname = outname.replace('_COM_', '_CMB_')
spec[0].header['PRODTYPE'] = 'combined_spectrum_1d'
spec[0].header['FILENAME'] = os.path.basename(specname)
self.write_output(spec, specname)
display_files.append(spec)
# also save full product
log.info('')
log.info('Full product (2D images and 1D spectra):')
self.write_output(hdul, outname)
self.input = [hdul]
self.set_display_data(filenames=display_files + self.input)
def refine_wavecal(self):
"""
Refine wavecal by setting a new central wavenumber.
User input should identify the order and pixel position of
a known spectral feature as well as the calibrated wavenumber
for that feature. This information is used to derive a new
central wavenumber.
Calls `sofia_redux.instruments.exes.wavecal` to update the
wavelength calibration from the central wavenumber.
"""
from sofia_redux.instruments.exes.wavecal import wavecal
# get parameters
param = self.get_parameter_set()
interactive = param.get_value('interactive')
identify_order = param.get_value('identify_order')
identify_line = param.get_value('identify_line')
identify_waveno = param.get_value('identify_waveno')
# todo - add interactive line selection with Eye plot
if interactive: # pragma: no cover
log.warning('Interactive wavecal not implemented')
# check inputs
test = [str(v).strip()
for v in [identify_order, identify_line, identify_waveno]]
if '' in test:
log.info('No line identified, not refining wavecal.')
return
try:
i_order = int(identify_order)
except (TypeError, ValueError):
i_order = -1
# derive central wavenumber from identified line
display_files = list()
for i, hdul in enumerate(self.input):
header = hdul[0].header
# here, NORDERS must refer to total number of orders for
# the data, not the number extracted
norders = header['NORDERS']
if i_order <= 0 or i_order > norders:
msg = f'Invalid order number {identify_order}'
log.error(msg)
raise ValueError(msg)
ordnum = f'{i_order:02d}'
try:
wavepos = hdul[f'WAVEPOS_ORDER_{ordnum}'].data
except (IndexError, KeyError):
msg = f'Invalid order number {identify_order}'
log.error(msg)
raise ValueError(msg)
# old wavenumber value at pixel position
old_wave = np.interp(identify_line, np.arange(wavepos.size),
wavepos)
# calculate new central wavenumber
central_waveno = parse_central_wavenumber(header)
hrr = header['HRR']
hrdgr = header['HRDGR']
instcfg = header['INSTCFG']
cross_dispersed = ((instcfg == 'HIGH_MED')
or (instcfg == 'HIGH_LOW'))
if cross_dispersed:
dw = 0.5 / (np.sqrt(hrr ** 2 / (1 + hrr ** 2)) * hrdgr)
else:
dw = 1.0
xorder = norders - i_order + 1
wnoi = central_waveno + dw * (xorder - (norders + 1) / 2.0)
wno0 = central_waveno + (identify_waveno
- old_wave) * wnoi / old_wave
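# wnoi estimates the wavenumber at the center of the identified
# order; scaling the offset between the known and measured line
# wavenumbers by wnoi / old_wave converts it to a correction on the
# echelle central wavenumber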
log.info(f'Old central wavenumber: {central_waveno}')
log.info(f'New central wavenumber: {wno0}')
# update wavecal in existing orders
header['WNO0'] = wno0
for j in range(norders):
ordnum = f'{j + 1:02d}'
appos_key = f'APPOSO{ordnum}'
if appos_key not in header:
continue
wave_header = header.copy()
new_wave = wavecal(wave_header, order=j + 1)
hdul[f'WAVEPOS_ORDER_{ordnum}'].data = new_wave
# update WCS in flux, error extensions
for ext in [f'FLUX_ORDER_{ordnum}', f'ERROR_ORDER_{ordnum}']:
mid_wave = new_wave[new_wave.size // 2]
wave_scale = np.mean(new_wave[1:] - new_wave[:-1])
hdul[ext].header['CRVAL1'] = mid_wave
hdul[ext].header['CDELT1'] = wave_scale
# update atran extension - old one no longer matches data
atran = hdul['TRANSMISSION'].data
tdata = []
for trans in atran[1:]:
outtrans = np.interp(hdul[f'WAVEPOS_ORDER_{ordnum}'].data,
atran[0], trans,
left=np.nan, right=np.nan)
tdata.append(outtrans)
hdul[f'TRANSMISSION_ORDER_{ordnum}'].data = np.array(tdata)
# update output name
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
log.info('')
# save 1D Spextool-style products
log.info('1D spectra:')
spec = self._make_1d(hdul)
if self.sky_spectrum: # pragma: no cover
specname = outname.replace('_WRF_', '_SWS_')
spec[0].header['PRODTYPE'] = 'sky_wavecal_refined_1d'
else:
specname = outname.replace('_WRF_', '_WRS_')
spec[0].header['PRODTYPE'] = 'wavecal_refined_1d'
spec[0].header['FILENAME'] = os.path.basename(specname)
self.write_output(spec, specname)
display_files.append(spec)
# also save full product
log.info('')
log.info('Full product (2D images and 1D spectra):')
self.write_output(hdul, outname)
# set override in load data parameters:
# if reduction is reset, it will be run with the new calibration
load_param = self.parameters.current[0]
load_param.set_value('cent_wave', wno0)
self.set_display_data(filenames=display_files + self.input)
@staticmethod
def _parse_regions(region_string):
"""
Parse wavenumber regions from an input string.
Wavenumber regions should be comma-separated ranges specified
as start-stop, prepended by the order number. Orders should be
separated by semicolons. For example,
"1:780-785,786-787;2:801.5-802.3"
specifies two regions in order 1 and 1 region in order 2.
Parameters
----------
region_string : str
Input parameter string.
Returns
-------
dict
Keys are integer order numbers, values are lists of floating point
region (start, stop) tuples.
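Examples
--------
A minimal sketch of the expected parsing:
>>> EXESReduction._parse_regions('1:780-785,786-787;2:801.5-802.3')
{1: [(780.0, 785.0), (786.0, 787.0)], 2: [(801.5, 802.3)]}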
"""
bad_msg = ['Could not read wavenumber region '
f"parameter: '{region_string}'",
'Wavenumber regions should be comma-separated '
'ranges as start-stop, prepended by the order number. ',
'Orders should be separated by semi-colons. ',
'For example, "1:780-785,786-787;2:801.5-802.3" '
'specifies two regions in order 1 and 1 region in order 2.']
regions = dict()
order_pos = list(str(region_string).split(';'))
for op in order_pos:
order_set = list(op.split(':'))
try:
order, rg_set = order_set
order = int(order)
except (ValueError, TypeError):
for msg in bad_msg:
log.error(msg)
raise ValueError('Invalid wavenumber '
'region parameter.') from None
rg_set = list(rg_set.split(','))
rg_list = list()
for reg in rg_set:
rg_range = reg.split('-')
try:
start, stop = rg_range
rg_list.append((float(start), float(stop)))
except (ValueError, TypeError):
for msg in bad_msg:
log.error(msg)
raise ValueError('Invalid wavenumber '
'region parameter.') from None
regions[order] = rg_list
return regions
@staticmethod
def _trim_data(hdul, ordnum, nan_regions):
"""
Set specified data to NaN.
2D images and 1D spectra for the specified order are set
to NaN within the specified wavenumber regions.
Parameters
----------
hdul : astropy.io.fits.HDUList
HDU list containing data to trim.
ordnum : str
Two character order number string, zero-padded.
nan_regions : list of list of float
List of regions to blank. Each element should be a
two-element list or tuple containing a start and end
value for a region, in wavenumber.
"""
# regions are specified as wavenumber ranges:
# use wavepos to get areas to blank in image and spectral_flux,
# in place
wavepos = hdul[f'WAVEPOS_ORDER_{ordnum}'].data
image = hdul[f'FLUX_ORDER_{ordnum}'].data
error = hdul[f'ERROR_ORDER_{ordnum}'].data
spec = hdul[f'SPECTRAL_FLUX_ORDER_{ordnum}'].data
spec_err = hdul[f'SPECTRAL_ERROR_ORDER_{ordnum}'].data
for region in nan_regions:
idx = (wavepos >= region[0]) & (wavepos <= region[1])
if np.any(idx):
image[:, idx] = np.nan
error[:, idx] = np.nan
spec[idx] = np.nan
spec_err[idx] = np.nan
def merge_orders(self):
"""
Merge all spectral orders.
Calls `sofia_redux.spectroscopy.mergespec` to merge the
1D spectra and `sofia_redux.toolkit.image.coadd` to combine
the 2D spectral images.
"""
from sofia_redux.spectroscopy.mergespec import mergespec
from sofia_redux.toolkit.image.coadd import coadd
# get parameters
param = self.get_parameter_set()
threshold = param.get_value('threshold')
statistic = param.get_value('statistic')
noise_test = param.get_value('noise_test')
local_noise = param.get_value('local_noise')
local_radius = param.get_value('local_radius')
trim = param.get_value('trim')
trim_regions = param.get_value('trim_regions')
# todo: allow interactive trimming
if trim: # pragma: no cover
log.warning('Interactive trimming not yet implemented.')
if str(trim_regions).strip().lower() != '':
nan_regions = self._parse_regions(trim_regions)
else:
nan_regions = None
results = []
for i, hdul in enumerate(self.input):
log.info('')
header = hdul[0].header
log.info(header['FILENAME'])
# get the atran data from the file
atran = hdul['TRANSMISSION'].data
# loop over orders, merging one at a time
result = None
for j in range(header['NORDERS']):
ordnum = f'{j + 1:02d}'
appos_key = f'APPOSO{ordnum}'
if appos_key not in header:
continue
# trim specified regions if necessary
if nan_regions is not None and j + 1 in nan_regions:
log.info(f'Trimming order {ordnum} regions: '
f'{nan_regions[j + 1]}')
self._trim_data(hdul, ordnum, nan_regions[j + 1])
# skip if spectrum is all nan
spec = hdul[f'SPECTRAL_FLUX_ORDER_{ordnum}'].data
if np.all(np.isnan(spec)):
log.warning(f'No good data in order {ordnum}')
continue
if result is None:
# initialize output from first order
image = hdul[f'FLUX_ORDER_{ordnum}'].data
primary = fits.PrimaryHDU(image, header)
primary.header['EXTNAME'] = 'FLUX'
primary.header['NORDERS'] = 1
flux_header = hdul[f'FLUX_ORDER_{ordnum}'].header
for key in self.wcs_keys:
hdinsert(primary.header, key,
flux_header[key], flux_header.comments[key])
result = fits.HDUList(primary)
for extname in ['ERROR', 'WAVEPOS',
'SPECTRAL_FLUX', 'SPECTRAL_ERROR',
'RESPONSE']:
hdu = hdul[f'{extname}_ORDER_{ordnum}']
hdu.header['EXTNAME'] = extname
result.append(hdu)
else:
# retrieve data from input
image = hdul[f'FLUX_ORDER_{ordnum}'].data
image_header = hdul[f'FLUX_ORDER_{ordnum}'].header
var = hdul[f'ERROR_ORDER_{ordnum}'].data ** 2
wave = hdul[f'WAVEPOS_ORDER_{ordnum}'].data
flux = hdul[f'SPECTRAL_FLUX_ORDER_{ordnum}'].data
err = hdul[f'SPECTRAL_ERROR_ORDER_{ordnum}'].data
response = hdul[f'RESPONSE_ORDER_{ordnum}'].data
# check for apertures
if result['SPECTRAL_FLUX'].data.ndim > 1:
all_wave = result['WAVEPOS'].data.copy()
all_flux = result['SPECTRAL_FLUX'].data.copy()
all_err = result['SPECTRAL_ERROR'].data.copy()
all_resp = result['RESPONSE'].data.copy()
n_ap = all_flux.shape[0]
for n in range(n_ap):
# merge current spectral order with new
merged = [all_wave, all_flux[n], all_err[n]]
incoming = [wave, flux[n], err[n]]
new_spec = mergespec(merged, incoming,
s2n_threshold=threshold,
s2n_statistic=statistic,
noise_test=noise_test,
local_noise=local_noise,
local_radius=local_radius)
# do the same for response but just do
# straight mean in overlaps
merged = [all_wave, all_resp[n]]
incoming = [wave, response[n]]
new_resp = mergespec(merged, incoming)
# make sure response matches flux shape
outresp = np.interp(new_spec[0],
new_resp[0], new_resp[1],
left=np.nan, right=np.nan)
# update output
if n == 0:
result['WAVEPOS'].data = new_spec[0]
result['SPECTRAL_FLUX'].data = new_spec[1]
result['SPECTRAL_ERROR'].data = new_spec[2]
result['RESPONSE'].data = outresp
else:
result['SPECTRAL_FLUX'].data = np.vstack(
[result['SPECTRAL_FLUX'].data,
new_spec[1]])
result['SPECTRAL_ERROR'].data = np.vstack(
[result['SPECTRAL_ERROR'].data,
new_spec[2]])
result['RESPONSE'].data = np.vstack(
[result['RESPONSE'].data,
outresp])
else:
# merge current spectral order with new
merged = [result['WAVEPOS'].data,
result['SPECTRAL_FLUX'].data,
result['SPECTRAL_ERROR'].data]
incoming = [wave, flux, err]
new_spec = mergespec(merged, incoming,
s2n_threshold=threshold,
s2n_statistic=statistic,
noise_test=noise_test,
local_noise=local_noise,
local_radius=local_radius)
# do the same for response but just do
# straight mean in overlaps
merged = [result['WAVEPOS'].data,
result['RESPONSE'].data]
incoming = [wave, response]
new_resp = mergespec(merged, incoming)
# make sure response matches flux shape
outresp = np.interp(new_spec[0],
new_resp[0], new_resp[1],
left=np.nan, right=np.nan)
# update output
result['WAVEPOS'].data = new_spec[0]
result['SPECTRAL_FLUX'].data = new_spec[1]
result['SPECTRAL_ERROR'].data = new_spec[2]
result['RESPONSE'].data = outresp
# coadd images in spectral WCS
with set_log_level('WARNING'):
outhdr, merged_image, merged_var, _ = \
coadd([result['FLUX'].header, image_header],
[result['FLUX'].data, image],
[result['ERROR'].data ** 2, var],
[np.ones_like(result['FLUX'].data),
np.ones_like(image)],
rotate=False, spectral=True)
result['FLUX'].header = outhdr
result['FLUX'].data = merged_image
result['ERROR'].data = np.sqrt(merged_var)
# attach an approximate transmission for the full wavelength range
tdata = []
for trans in atran[1:]:
outtrans = np.interp(result['WAVEPOS'].data,
atran[0], trans,
left=np.nan, right=np.nan)
tdata.append(outtrans)
# insert transmission before response
idx = result.index_of('RESPONSE')
exthead = result['SPECTRAL_FLUX'].header.copy()
result.insert(idx, fits.ImageHDU(np.array(tdata), exthead,
name='TRANSMISSION'))
hdinsert(result['TRANSMISSION'].header, 'BUNIT', '', 'Data units')
# set PROCSTAT to level 3, for final product only
hdinsert(result[0].header, 'PROCSTAT', 'LEVEL_3')
# update output name
outname = self.update_output(result, self.filenum[i],
self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
log.info('')
self.write_output(result, outname)
log.info('')
log.info('Saving 1D spectra:')
spec = self._make_1d(result)
if self.sky_spectrum:
specname = outname.replace('SMM', 'SMD')
spec[0].header['PRODTYPE'] = 'sky_orders_merged_1d'
else:
specname = outname.replace('MRM', 'MRD')
spec[0].header['PRODTYPE'] = 'orders_merged_1d'
spec[0].header['FILENAME'] = os.path.basename(specname)
self.write_output(spec, specname)
results.append(result)
log.info('')
self.input = results
self.set_display_data()
def specmap(self):
"""
Generate a quick-look spectral plot.
Calls `sofia_redux.visualization.quicklook.make_spectral_plot`.
The output from this step is identical to the input, so it is
not saved. As a side effect, a PNG file is written to disk, using
the same base name as the input file with a '.png' extension.
"""
from matplotlib.backends.backend_agg \
import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure
from sofia_redux.visualization.quicklook import make_spectral_plot
# get parameters
param = self.get_parameter_set()
normalize = param.get_value('normalize')
scale = param.get_value('scale')
ignore_outer = param.get_value('ignore_outer')
atran_plot = param.get_value('atran_plot')
error_plot = param.get_value('error_plot')
colormap = param.get_value('colormap')
overplot_color = param.get_value('overplot_color')
watermark = param.get_value('watermark')
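# a percentile range spanning the full (0, 100) interval disables
# flux scaling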
if scale[0] <= 0 and scale[1] >= 100:
scale = None
for i, hdul in enumerate(self.input):
header = hdul[0].header
labels = None
aplot = None
nspec = 1
try:
# merged order full product
wave = hdul['WAVEPOS'].data
xunit = hdul['WAVEPOS'].header.get('BUNIT', 'um')
spec_flux = hdul['SPECTRAL_FLUX'].data
spec_err = hdul['SPECTRAL_ERROR'].data
yunit = hdul['SPECTRAL_FLUX'].header.get('BUNIT',
'UNKNOWN')
# check for multiple rows in atran -- use the first only
atran = hdul['TRANSMISSION'].data
if atran.ndim > 1:
atran = atran[0]
# check for multiple apertures in new-style data
if spec_flux.ndim > 1 and wave.ndim == 1:
nspec = spec_flux.shape[0]
wave = np.tile(wave, (nspec, 1))
labels = [f'Spectrum {j + 1}' for j in range(nspec)]
except KeyError:
# 1D spectrum
xunit = hdul[0].header.get('XUNITS', 'UNKNOWN')
yunit = hdul[0].header.get('YUNITS', 'UNKNOWN')
# check for multiple orders/apertures
if hdul[0].data.ndim > 2:
# multi-order or multi-ap
wave = hdul[0].data[:, 0]
nspec = hdul[0].data.shape[0]
labels = [f'Spectrum {j + 1}' for j in range(nspec)]
spec_flux = hdul[0].data[:, 1]
spec_err = hdul[0].data[:, 2]
try:
atran = hdul[0].data[:, 3]
except IndexError: # pragma: no cover
# may be missing for old data
atran = np.full_like(spec_flux, np.nan)
else:
# single order/ap
wave = hdul[0].data[0]
spec_flux = hdul[0].data[1]
spec_err = hdul[0].data[2]
try:
atran = hdul[0].data[3]
except IndexError: # pragma: no cover
atran = np.full_like(spec_flux, np.nan)
if nspec > 1:
if normalize:
norm = np.nanmedian(spec_flux, axis=1)[:, None]
spec_flux = spec_flux / norm
spec_err = spec_err / norm
yunit = 'normalized'
if ignore_outer > 0:
# blank the outer fraction of wavelength bins in each spectrum
wstart = int(ignore_outer * wave.shape[1])
wend = int((1 - ignore_outer) * wave.shape[1])
spec_flux[:, :wstart] = np.nan
spec_flux[:, wend:] = np.nan
atran[:wstart] = np.nan
atran[wend:] = np.nan
log.debug(f'Plotting between w={wstart} and w={wend}')
else:
if normalize:
norm = np.nanmedian(spec_flux)
spec_flux = spec_flux / norm
spec_err = spec_err / norm
yunit = 'normalized'
if ignore_outer > 0:
wstart = int(ignore_outer * len(wave))
wend = int((1 - ignore_outer) * len(wave))
spec_flux[:wstart] = np.nan
spec_flux[wend:] = np.nan
atran[:wstart] = np.nan
atran[wend:] = np.nan
log.debug(f'Plotting between w={wstart} and w={wend}')
# set text for title in plot
obj = header.get('OBJECT', 'UNKNOWN')
mode = header.get('INSTCFG', 'UNKNOWN')
basename = os.path.basename(header.get('FILENAME', 'UNKNOWN'))
title = f'Object: {obj}, Mode: {mode}\nFilename: {basename}'
# make the figure for the spectral plot
fig = Figure(figsize=(8, 5))
FigureCanvas(fig)
ax = fig.add_subplot(1, 1, 1)
ax.set_title(title)
if atran_plot and not np.all(np.isnan(atran)):
if wave.ndim > 1:
aplot = [wave.T, atran.T]
else:
aplot = [wave, atran]
# plot spectral flux
spec_err = None if not error_plot else spec_err
make_spectral_plot(ax, wave, spec_flux, spectral_error=spec_err,
scale=scale, labels=labels, colormap=colormap,
xunit=xunit, yunit=yunit,
title=title, overplot=aplot,
overplot_label='Atmospheric Transmission',
overplot_color=overplot_color,
watermark=watermark)
# output filename for image
fname = os.path.splitext(basename)[0] + '.png'
outname = os.path.join(self.output_directory, fname)
fig.savefig(outname, dpi=300)
log.info(f'Saved image to {outname}')