# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""FORCAST Grism Calibration Reduction pipeline steps"""
import os
from astropy import log
from astropy.io import fits
import numpy as np
from scipy.ndimage import uniform_filter1d
from sofia_redux.pipeline.sofia.sofia_exception import SOFIAImportError
try:
import sofia_redux.instruments.forcast
assert sofia_redux.instruments.forcast
except ImportError:
raise SOFIAImportError('FORCAST modules not installed')
from sofia_redux.pipeline.gui.matplotlib_viewer import MatplotlibViewer
from sofia_redux.pipeline.gui.qad_viewer import QADViewer
from sofia_redux.pipeline.sofia.forcast_reduction import FORCASTReduction
from sofia_redux.pipeline.sofia.forcast_spectroscopy_reduction \
import FORCASTSpectroscopyReduction
from sofia_redux.pipeline.sofia.parameters.forcast_slitcorr_parameters \
import FORCASTSlitcorrParameters
from sofia_redux.pipeline.sofia.sofia_utilities import parse_apertures
from sofia_redux.spectroscopy.extspec import extspec
from sofia_redux.toolkit.utilities.fits import hdinsert
from sofia_redux.toolkit.fitting.polynomial import polyfitnd
from sofia_redux.toolkit.interpolate import tabinv
__all__ = ['FORCASTSlitcorrReduction']
def _err_func(var, weights):
    """
    Combine variances into an error on the mean.

    Samples with weight < 1 are excluded from the combination.

    Parameters
    ----------
    var : array-like
        Variance values.
    weights : array-like
        Weight values, matching `var` in shape.

    Returns
    -------
    float
        Square root of the summed included variance, divided by the
        number of included (non-NaN) samples; NaN if no samples remain.
    """
    # work on a float copy: the previous implementation masked
    # rejected samples in place, clobbering the caller's array
    var = np.array(var, dtype=float, copy=True)
    var[np.asarray(weights) < 1] = np.nan
    count = np.sum(~np.isnan(var))
    if count == 0:
        # avoid a 0/0 RuntimeWarning when all samples are rejected
        return np.nan
    return np.sqrt(np.nansum(var)) / count
def _med_func(flux, weights):
    """
    Median-combine flux values.

    Samples with weight < 1 are excluded from the median.

    Parameters
    ----------
    flux : array-like
        Flux values.
    weights : array-like
        Weight values, matching `flux` in shape.

    Returns
    -------
    float
        Median of the included samples, ignoring NaNs; NaN if no
        samples remain.
    """
    # work on a float copy: the previous implementation masked
    # rejected samples in place, clobbering the caller's array
    flux = np.array(flux, dtype=float, copy=True)
    flux[np.asarray(weights) < 1] = np.nan
    if np.all(np.isnan(flux)):
        # avoid an all-NaN RuntimeWarning from nanmedian
        return np.nan
    return np.nanmedian(flux)
class FORCASTSlitcorrReduction(FORCASTSpectroscopyReduction):
    r"""
    FORCAST spectroscopic slit correction reduction steps.

    This reduction object defines specialized reduction steps
    for generating slit correction calibration data from spectroscopic
    input files. It is selected by the SOFIA chooser only if a top-level
    configuration flag is supplied (slitcorr=True). The final
    output product from this reduction is a FITS file (\*SCR\*.fits)
    with PRODTYPE = 'slit_correction'. This file can be supplied to the
    standard spectroscopic pipeline, at the make_profiles step,
    to specify a new slit response correction.
    """
    def __init__(self):
        """Initialize the reduction object."""
        super().__init__()

        # descriptive attributes specific to calibration
        self.name = 'Slit correction'

        # product type definitions for spectral steps
        # NOTE(review): 'rectify' is mapped here but does not appear
        # in the recipe below -- presumably kept for chooser/product
        # bookkeeping; confirm against the parent class usage.
        self.prodtype_map.update(
            {'rectify': 'test_rectified_image',
             'make_profiles': 'rectified_image',
             'extract_median_spectra': 'median_spectra',
             'normalize': 'normalized_image',
             'make_slitcorr': 'slit_correction'})
        # short product codes used in output file names
        self.prodnames.update(
            {'rectified_image': 'RIM',
             'median_spectra': 'MSM',
             'normalized_image': 'NIM',
             'slit_correction': 'SCR'})

        # invert the map for quick lookup of step from type
        self.step_map = {v: k for k, v in self.prodtype_map.items()}

        # default recipe and step names
        self.recipe = ['checkhead', 'clean', 'droop', 'nonlin',
                       'stack', 'stack_dithers', 'make_profiles',
                       'locate_apertures', 'extract_median_spectra',
                       'normalize', 'make_slitcorr']
        # display names for the steps added/overridden above
        self.processing_steps.update(
            {'make_profiles': 'Make Profiles',
             'extract_median_spectra': 'Extract Median Spectra',
             'normalize': 'Normalize Response',
             'make_slitcorr': 'Make Slit Correction'})
    def load(self, data, param_class=None):
        """
        Call parent load, with slitcorr parameters.

        Parameters
        ----------
        data : list of str or str
            Input data to load; passed through to
            `FORCASTReduction.load`.
        param_class : class, optional
            Ignored: the slit correction parameter class is always
            used for this reduction.
        """
        # Call the base FORCASTReduction.load directly (bypassing the
        # spectroscopy parent's version) and force the slit correction
        # parameter set, regardless of the `param_class` argument.
        FORCASTReduction.load(self, data,
                              param_class=FORCASTSlitcorrParameters)
def register_viewers(self):
"""Return new viewers."""
prof = MatplotlibViewer()
prof.name = 'ProfileViewer'
prof.title = 'Spatial Profiles'
prof.layout = 'rows'
# using matplotlib viewer instead of Eye,
# for line overlay purposes
spec = MatplotlibViewer()
spec.name = 'SpectralViewer'
spec.title = 'Spectra'
spec.layout = 'rows'
viewers = [QADViewer(), prof, spec]
return viewers
    def set_display_data(self, raw=False, filenames=None, regions=None,
                         specviewer='matplotlib'):
        """
        Store display data for custom viewers.

        Parameters
        ----------
        raw : bool
            If True, display data is taken from self.rawfiles.
            If False, display data is taken from self.input
        filenames : list of str, optional
            If provided and `raw` is False, file names will be
            passed to QADViewer instead of self.input.
        regions : list of str, optional
            File names of DS9 region files to pass to QADViewer.
        specviewer : str, optional
            Name of the spectral viewer to use; passed through to the
            parent implementation.  Defaults to 'matplotlib', matching
            the viewers registered for this reduction.
        """
        # Pure delegation: only the default for `specviewer` differs
        # from the parent behavior.
        super().set_display_data(raw=raw, filenames=filenames,
                                 regions=regions, specviewer=specviewer)
def normalize(self):
"""Normalize by central median spectrum."""
param = self.get_parameter_set()
results = []
for i, hdul in enumerate(self.input):
log.info('')
log.info(hdul[0].header['FILENAME'])
# retrieve data from input
flux = hdul['FLUX'].data
err = hdul['ERROR'].data
spec_flux = hdul['SPECTRAL_FLUX'].data
spec_err = hdul['SPECTRAL_ERROR'].data
# normalize to center spectrum
nspec = spec_flux.shape[0]
reference = spec_flux[nspec // 2]
hdul['FLUX'].data = flux / reference
hdul['ERROR'].data = err / reference
hdul['SPECTRAL_FLUX'].data = spec_flux / reference
hdul['SPECTRAL_ERROR'].data = spec_err / reference
# update bunit
hdul['FLUX'].header['BUNIT'] = ''
hdul['ERROR'].header['BUNIT'] = ''
hdul['SPECTRAL_FLUX'].header['BUNIT'] = ''
hdul['SPECTRAL_ERROR'].header['BUNIT'] = ''
# update output name
outname = self.update_output(hdul, self.filenum[i],
self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
self.write_output(hdul, outname)
if param.get_value('save_1d'):
log.info('')
log.info('Saving 1D spectra:')
spec = self._make_1d(hdul)
specname = outname.replace('NIM', 'NMS')
spec[0].header['FILENAME'] = os.path.basename(specname)
spec[0].header['PRODTYPE'] = 'spectra_1d'
self.write_output(spec, specname)
results.append(hdul)
log.info('')
self.input = results
self.set_display_data()
    def make_slitcorr(self):
        """
        Fit and smooth normalized data.

        The normalized median spectra are fit with low-order
        polynomials, either column-by-column along the slit ('1d'
        method, with boxcar smoothing in the spectral direction) or
        as a single 2-D surface in (slit position, wavelength).  The
        evaluated fit over the full slit is stored as the
        slit_correction product.
        """
        param = self.get_parameter_set()
        method = param.get_value('method')
        weighted = param.get_value('weighted')
        x_order = param.get_value('x_fit_order')
        y_order = param.get_value('y_fit_order')
        y_order_1d = param.get_value('y_fit_order_1d')
        x_width = param.get_value('x_width')

        results = []
        for i, hdul in enumerate(self.input):
            log.info('')
            log.info(hdul[0].header['FILENAME'])

            # retrieve data from input
            header = hdul[0].header
            flux = hdul['FLUX'].data
            spec_flux = hdul['SPECTRAL_FLUX'].data
            spec_err = hdul['SPECTRAL_ERROR'].data
            # aperture (slit) positions of the extracted median spectra
            appos = np.array(parse_apertures(header['APPOSO01'], 1)[0])
            wave = hdul['WAVEPOS'].data
            space = hdul['SLITPOS'].data

            # fit to normalized median spectra
            # error stays None (unweighted fit) unless requested below
            error = None
            if str(method).lower() == '1d':
                # fit each wavelength column independently as a
                # polynomial in slit position, evaluated over the
                # full slit
                sfit = np.full_like(flux, np.nan)
                chisq = []
                for j, xval in enumerate(wave):
                    if weighted:
                        error = spec_err[:, j]
                    sfit_model = polyfitnd(appos, spec_flux[:, j],
                                           y_order_1d, error=error,
                                           robust=5.0, model=True)
                    sfit_col = sfit_model(space)
                    sfit[:, j] = sfit_col
                    chisq.append(sfit_model.stats.rchi2)

                # boxcar smooth in the x direction
                sfit = uniform_filter1d(sfit, x_width, axis=1)
                log.info(f'Mean reduced chi^2: {np.mean(chisq)}')
                log.info('')
            else:
                # single 2-D polynomial surface fit in
                # (slit position, wavelength)
                if weighted:
                    error = spec_err
                ypos = np.tile(np.expand_dims(appos, 1), (1, wave.size))
                xpos = np.tile(np.expand_dims(wave, 0), (appos.size, 1))
                sfit_model = polyfitnd(ypos, xpos, spec_flux,
                                       [y_order, x_order], error=error,
                                       robust=5.0, model=True)
                log.info(sfit_model)

                # expand to full array
                ypos = np.tile(np.expand_dims(space, 1), (1, wave.size))
                xpos = np.tile(np.expand_dims(wave, 0), (space.size, 1))
                sfit = sfit_model(ypos, xpos)

            # delete aperture info from header
            del header['APPOSO01']

            # save fit as slit correction image
            hdul = fits.HDUList(fits.PrimaryHDU(header=header, data=sfit))
            # NOTE(review): for the '1d' method, SCXDEG/SCYDEG record
            # the 2-D fit orders (not y_order_1d), and SCCOEFF records
            # the coefficients of the last column's fit only -- confirm
            # this is intended.
            hdinsert(hdul[0].header, 'SCXDEG', x_order,
                     comment='X polynomial degree for slit correction')
            hdinsert(hdul[0].header, 'SCYDEG', y_order,
                     comment='Y polynomial degree for slit correction')
            hdinsert(hdul[0].header, 'SCCOEFF',
                     ','.join(str(c) for c in sfit_model.coefficients),
                     comment='Slit correction fit coeff')

            # update output name
            outname = self.update_output(hdul, self.filenum[i],
                                         self.prodtypes[self.step_index])

            # save if desired
            if param.get_value('save'):
                self.write_output(hdul, outname)

            results.append(hdul)
            log.info('')

        self.input = results
        self.set_display_data()