Source code for sofia_redux.pipeline.sofia.forcast_spatcal_reduction

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""FORCAST Grism Calibration Reduction pipeline steps"""

import os
import warnings

from astropy import log
from astropy.io import fits
import numpy as np

from sofia_redux.pipeline.sofia.sofia_exception import SOFIAImportError
try:
    import sofia_redux.instruments.forcast
    assert sofia_redux.instruments.forcast
except ImportError:
    raise SOFIAImportError('FORCAST modules not installed')

from sofia_redux.instruments.forcast.hdmerge import hdmerge
from sofia_redux.pipeline.sofia.forcast_reduction import FORCASTReduction
from sofia_redux.pipeline.sofia.parameters.forcast_spatcal_parameters \
    import FORCASTSpatcalParameters
from sofia_redux.pipeline.sofia.forcast_wavecal_reduction \
    import FORCASTWavecalReduction
from sofia_redux.pipeline.sofia.sofia_utilities import parse_apertures
from sofia_redux.spectroscopy.readwavecal import readwavecal
from sofia_redux.toolkit.utilities.fits import hdinsert, getheader
from sofia_redux.toolkit.fitting.polynomial import polyfitnd
from sofia_redux.toolkit.image.adjust import unrotate90
from sofia_redux.toolkit.interpolate import tabinv

__all__ = ['FORCASTSpatcalReduction']


[docs] class FORCASTSpatcalReduction(FORCASTWavecalReduction): r""" FORCAST spectroscopic spatial calibration reduction steps. This reduction object defines specialized reduction steps for generating spatial calibration data from spectroscopic input files. It is selected by the SOFIA chooser only if a top-level configuration flag is supplied (spatcal=True). The final output product from this reduction is a FITS file (\*SCL\*.fits) with PRODTYPE = 'spatcal'. This file can be supplied to the standard spectroscopic pipeline, at the make_profiles step, to specify a new spatial calibration. """ def __init__(self): """Initialize the reduction object.""" super().__init__() # descriptive attributes specific to calibration self.name = 'Spatcal' # product type definitions for spectral steps self.prodtype_map.update( {'make_profiles': 'spatial_profile', 'fit_traces': 'traces_fit'}) self.prodnames.update( {'spatial_profile': 'PRF', 'traces_fit': 'TFT'}) # invert the map for quick lookup of step from type self.step_map = {v: k for k, v in self.prodtype_map.items()} # default recipe and step names self.recipe = ['checkhead', 'clean', 'droop', 'nonlin', 'stack', 'stack_dithers', 'make_profiles', 'locate_apertures', 'trace_continuum', 'fit_traces', 'rectify'] self.processing_steps.update({'fit_traces': 'Fit Trace Positions'})
[docs] def load(self, data, param_class=None): """Call parent load, with spatcal parameters.""" FORCASTReduction.load(self, data, param_class=FORCASTSpatcalParameters)
[docs] def fit_traces(self): """Fit a 2D surface to traced positions.""" param = self.get_parameter_set() x_order = param.get_value('x_fit_order') y_order = param.get_value('y_fit_order') weighted = param.get_value('weighted') wavefile = param.get_value('wavefile') rotation = param.get_value('rotation') xpos, ypos, expected, height = [], [], [], [] data_shape = None appos_arcs = [] hdr_list = [] for i, hdul in enumerate(self.input): if data_shape is None: data_shape = hdul['FLUX'].data.shape # traces from previous step header = hdul[0].header hdr_list.append(header) trace_x = hdul['APERTURE_XPOS'].data trace_y = hdul['APERTURE_YPOS'].data # expected spatial cal from order mask sim_spatcal = self._sim_spatcal(data_shape) arcsec = sim_spatcal[:, data_shape[1] // 2] # calibrated aperture positions appos = np.array(parse_apertures(header['APPOSO01'], 1)[0]) appos_arc = np.interp(appos, np.arange(arcsec.size), arcsec) appos_arcs.extend(appos_arc) # get profile heights from spatial map # and expected value from assumed slit height smap = hdul['SPATIAL_MAP'].data for x, y in zip(trace_x, trace_y): height.append(np.abs(smap[int(np.round(y)), int(np.round(x))])) # nearest aperture position gives expected spatial pos ap = appos_arc[np.argmin(np.abs(appos - y))] expected.append(ap) xpos.append(x) ypos.append(y) # 2D surface fit to all positions if weighted: log.info('Weighting fit by spatial profile height.') error = 1 / np.array(height) else: log.info('Fit is unweighted.') error = None log.info('') sfit_model = polyfitnd(ypos, xpos, expected, [y_order, x_order], error=error, robust=5.0, model=True) log.info(sfit_model) idx = np.arange(data_shape[0], dtype=float) space = np.tile(np.expand_dims(idx, 1), (1, data_shape[1])) idx = np.arange(data_shape[1], dtype=float) wave = np.tile(np.expand_dims(idx, 0), (data_shape[0], 1)) sfit_full = sfit_model(space, wave) sfit = sfit_full.copy() # wavelength calibration from input or pixel positions if os.path.isfile(wavefile): log.info(f'Using {wavefile} for wavelength calibration.') wavecal, _ = readwavecal(wavefile, rotate=rotation) wave_hdr = getheader(wavefile) else: log.info('Using pixel positions for wavelength calibration.') wavecal = wave.copy() wave_hdr = fits.Header() # apply the order mask from the first file mask = self.input[0]['BADMASK'].data wavecal[mask != 0] = np.nan # match spatcal nans to wavecal sfit[np.isnan(wavecal)] = np.nan results = [] regions = [] residuals = [] save_names = [] for i, hdul in enumerate(self.input): # record data del hdul[0].header['APPOSO01'] hdul.append(fits.ImageHDU(wavecal, name='WAVECAL')) hdul.append(fits.ImageHDU(sfit, name='SPATCAL')) # update output name outname = self.update_output(hdul, self.filenum[i], self.prodtypes[self.step_index]) # save if desired if param.get_value('save'): self.write_output(hdul, outname) results.append(hdul) # also write final spatcal file header = hdmerge(hdr_list, hdr_list[0]) # rotate if needed before saving rs = unrotate90(sfit.copy(), rotation) rw = unrotate90(wavecal.copy(), rotation) hdinsert(header, 'ROTATION', rotation, 'Rotate 90deg value') calfile = fits.HDUList( fits.PrimaryHDU(header=header, data=np.array([rw, rs]))) hdinsert(calfile[0].header, 'WCTYPE', '2D', comment='Wavelength calibration type') hdinsert(calfile[0].header, 'WXDEG', wave_hdr.get('WXDEG', 1), comment='X polynomial degree for 2D wavecal') hdinsert(calfile[0].header, 'WYDEG', wave_hdr.get('WYDEG', 0), comment='Y polynomial degree for 2D wavecal') hdinsert(calfile[0].header, 'WCOEFF', wave_hdr.get('WCOEFF', ''), comment='Wavelength fit coefficients') hdinsert(calfile[0].header, 'SXDEG', x_order, comment='X polynomial degree for 2D spatcal') hdinsert(calfile[0].header, 'SYDEG', y_order, comment='Y polynomial degree for 2D spatcal') hdinsert(calfile[0].header, 'SCOEFF', ','.join(str(c) for c in sfit_model.coefficients), comment='Spatial fit coefficients') hdinsert(calfile[0].header, 'NORDERS', 1, comment='Number of orders') hdinsert(calfile[0].header, 'ORDERS', '1', comment='Order numbers') outname = self.getfilename(header, update=True, prodtype='SCL', filenum=self.filenum) calfile[0].header['FILENAME'] = os.path.basename(outname) calfile[0].header['PRODTYPE'] = 'spatcal' self.write_output(calfile, outname) # make a region file to display with warnings.catch_warnings(): warnings.simplefilter('ignore', RuntimeWarning) min_line = np.nanmax(np.nanmin(sfit, axis=0)) max_line = np.nanmin(np.nanmax(sfit, axis=0)) slines = sorted([min_line, max_line] + appos_arcs) trace_fit = [] for xval in idx: aptab = [] for line in slines: aptab.append(tabinv(sfit_full[:, int(xval)], line, missing=np.nan)) trace_fit.append(aptab) trace_fit = np.array(trace_fit).T region = self._trace_region(header, self.filenum, 'TFT', xpos, ypos, idx, trace_fit, fit_direction='x') regions.append(region) # keep residuals for plotting ds = np.nanmean(sfit[1:, :] - sfit[:-1, :]) residuals_data = [xpos, ypos, sfit_model.stats.residuals / ds] residuals.append(residuals_data) pngname = outname.replace('SCL', 'RSD') pngname = os.path.join(self.output_directory, os.path.splitext(pngname)[0] + '.png') save_names.append(pngname) self.input = results self.set_display_data(regions=regions, residuals=residuals) # save residual plot to disk, after assembled in display data self._save_residual_plot(save_names) log.info('')