# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""FLITECAM Imaging Reduction pipeline steps"""
import os
from astropy import log
import numpy as np
from sofia_redux.pipeline.sofia.sofia_exception import SOFIAImportError
try:
import sofia_redux.instruments.flitecam
assert sofia_redux.instruments.flitecam
except ImportError:
raise SOFIAImportError('FLITECAM modules not installed')
try:
import sofia_redux.instruments.forcast.configuration as dripconfig
except ImportError:
raise SOFIAImportError('FORCAST modules not installed')
from sofia_redux.pipeline.sofia.flitecam_reduction \
import FLITECAMReduction
from sofia_redux.pipeline.sofia.parameters.flitecam_imaging_parameters \
import FLITECAMImagingParameters
from sofia_redux.pipeline.sofia.forcast_imaging_reduction \
import FORCASTImagingReduction
from sofia_redux.toolkit.utilities.fits import hdinsert, gethdul
__all__ = ['FLITECAMImagingReduction']
[docs]
class FLITECAMImagingReduction(FLITECAMReduction, FORCASTImagingReduction):
"""
FLITECAM imaging reduction steps.
Primary image reduction algorithms are defined in the flitecam
package (`sofia_redux.instruments.flitecam`). Calibration-related
algorithms are pulled from the `sofia_redux.calibration` package, and
some utilities come from the `sofia_redux.toolkit` package. This
reduction object requires that all three packages be installed.
This reduction object defines a method for each pipeline
step, that calls the appropriate algorithm from its source
packages.
"""
def __init__(self):
"""Initialize the reduction object."""
FLITECAMReduction.__init__(self)
# descriptive attributes
self.mode = 'imaging'
# product type definitions for FLITECAM steps
self.prodtype_map.update({
'clip_image': 'clipped',
'make_flat': 'flat',
'correct_gain': 'gain_corrected',
'subtract_sky': 'background_subtracted',
'register': 'registered',
'tellcor': 'telluric_corrected',
'coadd': 'coadded',
'fluxcal': 'calibrated',
'imgmap': 'imgmap'
})
self.prodnames.update({
'clipped': 'CLP',
'flat': 'FLT',
'gain_corrected': 'GCR',
'background_subtracted': 'BGS',
'registered': 'REG',
'telluric_corrected': 'TEL',
'coadded': 'COA',
'calibrated': 'CAL',
'imgmap': 'IMP',
})
# invert the map for quick lookup of step from type
self.step_map = {v: k for k, v in self.prodtype_map.items()}
# default recipe and step names
self.recipe = ['check_header', 'correct_linearity',
'clip_image', 'make_flat', 'correct_gain',
'subtract_sky', 'register', 'tellcor',
'coadd', 'fluxcal', 'imgmap']
self.processing_steps.update({
'clip_image': 'Clip Image',
'make_flat': 'Make Flat',
'correct_gain': 'Correct Gain',
'subtract_sky': 'Subtract Sky',
'register': 'Register Images',
'tellcor': 'Telluric Correct',
'coadd': 'Combine Images',
'fluxcal': 'Flux Calibrate',
'imgmap': 'Make Image Map',
})
# load some default FORCAST configurations, for registration step
# Actual values will be provided in parameters.
dripconfig.load()
[docs]
def load(self, data, param_class=None):
"""Call parent load, with imaging parameters."""
FLITECAMReduction.load(self, data,
param_class=FLITECAMImagingParameters)
[docs]
def clip_image(self):
"""Clip image to useful portion of detector."""
from sofia_redux.instruments.flitecam.clipimg import clipimg
from sofia_redux.instruments.flitecam.expmap import expmap
from sofia_redux.instruments.flitecam.maskbp import maskbp
# get parameters
param = self.get_parameter_set()
datasec = param.get_value('datasec')
skip_clean = param.get_value('skip_clean')
outdata = []
for i, hdul in enumerate(self.input):
log.info('')
log.info(f"Input: {hdul[0].header['FILENAME']}")
# clip data
result = clipimg(hdul, datasec)
# mask bad pixels
if skip_clean:
log.info('Skipping bad pixel identification')
else:
result = maskbp(result, cval=np.nan)
# append exposure map
result = expmap(result)
outname = self.update_output(
result, self.filenum[i], self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
self.write_output(result, outname)
outdata.append(result)
self.input = outdata
# set display data to input
self.set_display_data()
[docs]
def make_flat(self):
"""
Make a flat field from input data.
The procedure is:
- Check for a previously made flat. If present, use it.
- If sky files are loaded, use those to make the flat.
Correct the sky files with others to verify flat.
- Otherwise, make a flat out of all input except the current
file. If less than 3 files are loaded, no flat will be
generated.
"""
from sofia_redux.instruments.flitecam.mkflat import mkflat
from sofia_redux.instruments.flitecam.split_input import split_input
# get parameters
param = self.get_parameter_set()
flatfile = param.get_value('flatfile')
skip_flat = param.get_value('skip_flat')
# check input options
if skip_flat:
log.info('Skipping flat generation. Data will not be '
'gain-corrected.')
return
# sort data, looking for sky files and data to correct
manifest = split_input(self.input)
flat = None
if os.path.isfile(flatfile):
flat = gethdul(flatfile)
if flat is None or 'FLAT' not in flat:
raise ValueError(f'Bad flat file: {flatfile}')
log.info(f'Using previously generated flat file {flatfile}.')
else:
if len(manifest['sky']) > 0:
# sky files present, make the flat from them
flat_input = manifest['sky']
log.info('Using sky files to make flat:')
for f in flat_input:
log.info(f" {f[0].header.get('FILENAME', 'UNKNOWN')}")
flat = mkflat(flat_input)
log.info('')
# warn if insufficient data to generate a flat
if flat is None and len(self.input) < 3:
log.warning('Too few files to generate flat. '
'Data will not be gain corrected.')
outdata = []
for i, hdul in enumerate(self.input):
# if not already generated, make the flat from all input
# except the current file
if flat is None:
# this procedure requires at least 2 files other
# than the current one
if len(self.input) < 3:
this_flat = None
else:
flat_input = self.input[:i] + self.input[i + 1:]
log.info('')
log.info(f"Using remaining source files to make flat for "
f"{hdul[0].header.get('FILENAME', 'UNKNOWN')}:")
for f in flat_input:
log.info(f" {f[0].header.get('FILENAME', 'UNKNOWN')}")
this_flat = mkflat(flat_input)
else:
this_flat = flat
# attach flat to hdul
if this_flat is not None:
for extname in ['FLAT', 'FLAT_ERROR', 'FLAT_BADMASK']:
# allow flat error and badmask not to exist,
# in externally provided flats
if extname in this_flat:
hdul.append(this_flat[extname])
outdata.append(hdul)
# loop through output again to update names and headers
# and save to disk
log.info('')
for i, hdul in enumerate(outdata):
outname = self.update_output(
hdul, self.filenum[i], self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
self.write_output(hdul, outname)
self.input = outdata
# set display data to input
self.set_display_data()
[docs]
def correct_gain(self):
"""Correct gain variations."""
from sofia_redux.instruments.flitecam.gaincor import gaincor
param = self.get_parameter_set()
outdata = []
for i, hdul in enumerate(self.input):
if 'FLAT' in hdul:
# correct data and propagate errors
hdul = gaincor(hdul)
else:
log.warning('No FLAT extension present; not correcting data.')
outname = self.update_output(
hdul, self.filenum[i], self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
self.write_output(hdul, outname)
outdata.append(hdul)
self.input = outdata
# set display data to input
self.set_display_data()
[docs]
def subtract_sky(self):
"""Subtract sky background level."""
from sofia_redux.instruments.flitecam.backsub import backsub
from sofia_redux.instruments.flitecam.split_input import split_input
from sofia_redux.instruments.forcast.peakfind import peakfind
param = self.get_parameter_set()
skyfile = param.get_value('skyfile')
skip_sky = param.get_value('skip_sky')
method = param.get_value('sky_method')
if 'median' in str(method).lower():
method = 'median'
else:
method = 'flatnorm'
# check input options
if skip_sky:
log.info('Skipping sky subtraction. Data will not be '
'background corrected.')
log.info('Sky files will be propagated to the following steps.')
return
sky = None
if os.path.isfile(skyfile):
sky = gethdul(skyfile)
if sky is None or 'FLUX' not in sky:
raise ValueError(f'Bad sky file: {skyfile}')
log.info(f'Using previously generated sky file {skyfile}.')
# get sky files to drop from input, if present
manifest = split_input(self.input)
propagate = manifest['object'] + manifest['standard']
# special case: only sky files were loaded. In that
# case go ahead and correct them, for a check of the flat.
if len(propagate) == 0 and len(manifest['sky']) > 0:
log.warning('Only sky files are present; propagating them '
'to following steps.')
propagate = manifest['sky']
elif len(manifest['sky']) > 0:
log.info('Dropping sky files from further propagation.')
outdata = []
outfilenum = []
for i, hdul in enumerate(self.input):
# skip the file if it's not a science file
if hdul not in propagate:
continue
# keep the file number for future steps
outfilenum.append(self.filenum[i])
# now correct the data and propagate errors
hdul = backsub(hdul, bgfile=sky, method=method)
# for standards, mark the likely source position
# near CRPIX1/2
if hdul in manifest['standard']:
sx = hdul[0].header['CRPIX1'] - 1
sy = hdul[0].header['CRPIX2'] - 1
try:
shape = hdul['FLUX'].data.shape
wdw = int(np.min([100, sx, sy,
shape[1] - sx,
shape[0] - sy]))
xmin, xmax = int(sx - wdw), int(sx + wdw)
ymin, ymax = int(sy - wdw), int(sy + wdw)
stamp = hdul['FLUX'].data[ymin:ymax, xmin:xmax]
peak = peakfind(stamp, npeaks=1, silent=True,
positive=True, coordinates=True,
fwhm=6.0, smooth=False)
if peak is not None and len(peak) == 1:
px = peak[0][0] + xmin
py = peak[0][1] + ymin
if (0 < px < shape[1]
and 0 < py < shape[0]):
sx, sy = px, py
except (IndexError, ValueError,
TypeError, RuntimeError) as err: # pragma: no cover
log.debug(f'Peakfind error: {str(err)}')
hdinsert(hdul[0].header, 'SRCPOSX', sx,
'Source x-position')
hdinsert(hdul[0].header, 'SRCPOSY', sy,
'Source y-position')
outname = self.update_output(
hdul, self.filenum[i], self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
self.write_output(hdul, outname)
outdata.append(hdul)
self.input = outdata
self.filenum = outfilenum
# set display data to input
self.set_display_data()