# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""FORCAST Imaging Reduction pipeline steps"""
import os
import warnings
import numpy as np
from astropy import log
from astropy.io import fits
from astropy.wcs import WCS
from sofia_redux.pipeline.sofia.sofia_exception import SOFIAImportError
try:
import sofia_redux.instruments.forcast
assert sofia_redux.instruments.forcast
except ImportError:
raise SOFIAImportError('FORCAST modules not installed')
from sofia_redux.calibration.pipecal_config import pipecal_config
from sofia_redux.calibration.pipecal_error import PipeCalError
import sofia_redux.instruments.forcast.configuration as dripconfig
from sofia_redux.instruments.forcast.getcalpath import getcalpath
from sofia_redux.instruments.forcast.getpar import getpar
from sofia_redux.pipeline.reduction import Reduction
from sofia_redux.pipeline.sofia.forcast_reduction import FORCASTReduction
from sofia_redux.pipeline.sofia.parameters.forcast_imaging_parameters \
import FORCASTImagingParameters
from sofia_redux.toolkit.utilities.fits import hdinsert, getheader
# these imports are not used here, but are needed to avoid
# a numba error on linux systems
from sofia_redux.toolkit.resampling import tree
assert tree
__all__ = ['FORCASTImagingReduction']
[docs]
class FORCASTImagingReduction(FORCASTReduction):
"""
FORCAST imaging reduction steps.
Primary image reduction algorithms are defined in the DRIP
package (`sofia_redux.instruments.forcast`). Calibration-related
algorithms are pulled from the `sofia_redux.calibration` package,
and some utilities come from the `sofia_redux.toolkit` package.
This reduction object requires that all three packages be installed.
This reduction object defines a method for each pipeline
step, that calls the appropriate algorithm from its source
packages.
Attributes
----------
prodtype_map : dict
Maps the pipeline step to a product type, to assign
to the PRODTYPE key. Keys are pipeline step function names.
prodnames : dict
3-letter file type code, to assign to the output of a pipeline
step. Keys are the product types (as defined in prodtype_map).
step_map : dict
Inverse of the prodtype_map, for looking up pipeline
step names from product types. Keys are the product types.
prodtypes : list
List of product types, corresponding to the currently
loaded recipe. This list is populated whenever the recipe
attribute is set.
slit_image_recipe : list
Alternate processing recipe to use when input data is
marked as a slit image (SLIT!=NONE). Calibration and
coaddition steps are skipped.
mosaic_recipe : list
Alternate processing recipe to use when input is telluric-
corrected or calibrated. Only calibration, registration,
and coaddition are applied.
basehead : `astropy.io.fits.Header`
Header for the first raw input file loaded, used for calibration
configuration.
calres : dict-like
Reduction mode and auxiliary file configuration mapping,
as returned from the sofia_redux.instruments.forcast `getcalpath`
function.
cal_conf : dict-like
Flux calibration and atmospheric correction configuration,
as returned from the sofia_redux.calibration `pipecal_config`
function.
"""
def __init__(self):
"""Initialize the reduction object."""
super().__init__()
# descriptive attributes specific to imaging
self.mode = 'Imaging'
# product type definitions for imaging steps
self.prodtype_map.update(
{'undistort': 'undistorted',
'merge': 'merged',
'register': 'registered',
'tellcor': 'telluric_corrected',
'coadd': 'coadded',
'fluxcal': 'calibrated',
'mosaic': 'mosaic',
'imgmap': 'imgmap'})
self.prodnames.update(
{'undistorted': 'UND',
'merged': 'MRG',
'registered': 'REG',
'telluric_corrected': 'TEL',
'coadded': 'COA',
'calibrated': 'CAL',
'mosaic': 'MOS',
'imgmap': 'IMP'})
# invert the map for quick lookup of step from type
self.step_map = {v: k for k, v in self.prodtype_map.items()}
# default recipe and step names
self.recipe = ['checkhead', 'clean', 'droop', 'nonlin',
'stack', 'undistort', 'merge', 'register',
'tellcor', 'coadd', 'fluxcal', 'imgmap']
self.processing_steps.update(
{'undistort': 'Correct Distortion',
'merge': 'Merge Chops/Nods',
'register': 'Register Images',
'tellcor': 'Telluric Correct',
'coadd': 'Coadd',
'fluxcal': 'Flux Calibrate',
'mosaic': 'Mosaic',
'imgmap': 'Make Image Map'})
# also define a couple alternate recipes for
# special situations
self.default_recipe = self.recipe.copy()
self.slit_image_recipe = ['checkhead', 'clean', 'droop', 'nonlin',
'stack', 'undistort', 'merge']
self.mosaic_recipe = ['fluxcal', 'register', 'coadd', 'imgmap']
# photometric flux calibration information
self.cal_conf = None
[docs]
def load(self, data, param_class=None):
"""
Load input data to make it available to reduction steps.
The process is:
- Call the parent load method to initialize data
reduction variables.
- Use the first loaded FITS header to determine and load
the DRIP configuration (`sofia_redux.instruments.forcast.getcalpath`,
`sofia_redux.instruments.forcast.configuration`).
- Use the loaded configuration and the product type in the
base header to determine the data processing recipe.
- Use the base header to load a calibration configuration
(`sofia_redux.calibration.pipecal_config`).
- Load parameters for all steps.
- Load the data immediately if starting from an intermediate
step; otherwise, just load the raw headers and defer loading
the data from the FITS files.
After this step, the input attribute is populated as required
for the first pipeline step in the recipe.
Parameters
----------
data : list of str or str
Input file paths to load.
param_class : class, optional
Parameters to instantiate, if not FORCASTImagingParameters.
Initialization arguments must match.
"""
# call the parent method to initialize
# reduction variables
Reduction.load(self, data)
# read and save the first FITS header
self.basehead = getheader(self.raw_files[0])
self.calres = getcalpath(self.basehead)
log.debug('Full DRIP cal configuration:')
for key, value in self.calres.items():
log.debug(' {}: {}'.format(key, value))
# load sofia_redux.instruments.forcast config
dripconfig.load(self.calres['conffile'])
# get product type to determine recipe
intermediate = False
prodtype = getpar(self.basehead, 'PRODTYPE', default='UNKNOWN')
# check for non-standard recipes first
if self.calres['slit'] not in ['UNKNOWN', 'NONE', '']:
# slit image
log.info('Slit image detected; using alternate recipe')
self.recipe = self.slit_image_recipe
elif prodtype in ['telluric_corrected', 'calibrated']:
# cal files to mosaic
log.info('TEL or CAL files detected; using mosaic recipe')
self.recipe = self.mosaic_recipe
else:
self.recipe = self.default_recipe
# get remaining recipe
if prodtype in self.prodtypes:
pidx = self.prodtypes.index(prodtype)
self.recipe = self.recipe[pidx + 1:]
if len(self.recipe) == 0:
raise ValueError("No steps to run for "
"prodtype '{}'.".format(prodtype))
intermediate = True
elif prodtype.upper() != 'UNKNOWN':
intermediate = True
# read in the pipecal configuration
self.cal_conf = pipecal_config(self.basehead)
log.debug('Full pipecal configuration:')
for key, value in self.cal_conf.items():
log.debug(' {}: {}'.format(key, value))
if param_class is None:
self.parameters = FORCASTImagingParameters(
drip_cal_config=self.calres,
drip_config=dripconfig.configuration,
pipecal_config=self.cal_conf)
else: # pragma: no cover
# this option is not currently used
self.parameters = param_class(
drip_cal_config=self.calres,
drip_config=dripconfig.configuration,
pipecal_config=self.cal_conf)
# if not starting from raw data, load the files in
# immediately
if intermediate:
self.load_fits(intermediate=True)
else:
# just load headers
self.input = []
for datafile in self.raw_files:
self.input.append(fits.getheader(datafile))
[docs]
def reorganize_c2nc2(self):
"""
Fix old-style C2NC2 files to newer data organization.
Early science flights took C2NC2 data in C2 mode,
with nods separated into separate files. The nod pattern
was: A B A A B A A B. Later flights used the same nod
pattern, but repackaged B nods with the A nods into 5
files: AB BA AB BA AB. This function performs the same
function for old-style data, so that all further steps
may be run in the same way as for the new-style data.
"""
from sofia_redux.instruments.forcast.hdmerge import hdmerge
log.info('Reorganizing old-style C2NC2 to new-style A/B files')
# assume pattern is ABAABAAB
last_hd = None
last_nod = None
last_idx = None
last_fn = None
new_filenum = []
new_input = []
for i, hdul in enumerate(self.input):
idx = int(hdul[0].header['DTHINDEX'])
fn = self.filenum[i]
if idx in [2, 5, 8]:
nodbeam = 'B'
else:
nodbeam = 'A'
if (last_nod is not None
and last_nod != nodbeam
and abs(idx - last_idx) == 1):
# neighbor A and B found
# keep file numbers
new_filenum.append([last_fn, fn])
# stack data
new_data = np.vstack([last_hd[0].data, hdul[0].data])
new_err = np.vstack([last_hd[1].data, hdul[1].data])
hdr_list = [last_hd[0].header, hdul[0].header]
if nodbeam == 'A':
ref_hdr = hdr_list[1]
else:
ref_hdr = hdr_list[0]
new_hdr = hdmerge(hdr_list, reference_header=ref_hdr)
new_hdr['NODBEAM'] = last_nod
log.debug(new_hdr['ASSC_OBS'])
new_hd = last_hd.copy()
new_hd[0].data = new_data
new_hd[1].data = new_err
new_hd[0].header = new_hdr
new_input.append(new_hd)
last_hd = hdul
last_nod = nodbeam
last_idx = idx
last_fn = fn
self.input = new_input
self.filenum = new_filenum
[docs]
def filter_shift(self):
"""
For early data, shift reference pixels for filter offsets.
Header keywords CRPIX1 and CRPIX2 for all files in
self.input are updated with specified offset values.
Pixel offsets by filter are listed in the
sofia_redux.instruments.forcast/data/filtershift.txt file.
After 2015, filter offsets were applied by the instrument
software, so this step is not required.
"""
log.info('Shifting CRPIX for filter offsets.')
px = self.calres['pixshiftx']
py = self.calres['pixshifty']
log.info('Pixel offsets (x,y): {}, {}'.format(px, py))
for hdul in self.input:
for hdu in hdul:
hdu.header['CRPIX1'] += px
hdu.header['CRPIX2'] += py
[docs]
def load_fits(self, intermediate=False):
"""
Load FITS data into memory.
Handles raw data, as well as intermediate data. Intermediate
data may have been produced by the current pipeline version
(multiple HDUs expected), or from the v1 IDL pipeline
(single HDU expected).
Loaded data are stored in the input attribute.
Parameters
----------
intermediate : bool
If False, the
sofia_redux.instruments.forcast.readfits.readfits` function will
be used to read in the data and calculate the
associated error images. If True, the data will
just be read in from disk.
"""
self.input = []
self.filenum = []
for i, datafile in enumerate(self.raw_files):
if not intermediate:
# load raw data and rearrange into appropriate
# extensions
hdul = self._load_raw_data(datafile)
else:
# otherwise, just read the data
input_hdul = fits.open(datafile, lazy_load_hdus=False,
memmap=False)
# touch the data to make sure it is in memory
# and store copies in a new HDUList
# (this avoids storing a file handle in the HDUList)
hdul = fits.HDUList()
for hdu in input_hdul:
assert hdu.data[0] is not None
hdul.append(hdu.copy())
input_hdul.close()
# for legacy data
if len(hdul) == 1:
data = hdul[0].data
header = hdul[0].header
hdinsert(header, 'EXTNAME', 'FLUX',
comment='extension name')
primary = fits.PrimaryHDU(data=data[0, :, :],
header=header)
# fix a couple known bad old WCS keywords
bad_keys = ['XPIXELSZ', 'YPIXELSZ']
for bad_key in bad_keys:
try:
del primary.header[bad_key]
except KeyError:
pass
# make a basic extension header from
# the WCS in the primary
try:
wcs = WCS(primary.header)
except (ValueError, IndexError,
KeyError, MemoryError): # pragma: no cover
log.warning('Bad WCS in input')
ehead = fits.Header()
else:
ehead = wcs.to_header(relax=True)
hdinsert(ehead, 'BUNIT',
header.get('BUNIT', 'UNKNOWN'),
comment='Data units')
with warnings.catch_warnings():
warnings.simplefilter('ignore')
err_data = np.sqrt(data[1, :, :])
err = fits.ImageHDU(data=err_data,
header=ehead, name='ERROR')
if data.shape[0] > 2:
# same for exposure map
hdinsert(ehead, 'BUNIT', 's', comment='Data units')
expmap = fits.ImageHDU(
data=data[2, :, :],
header=ehead, name='EXPOSURE')
hdul = fits.HDUList([primary, err, expmap])
else:
hdul = fits.HDUList([primary, err])
# get the file number from the filename
self.filenum.append(self.getfilenum(datafile))
# update required SOFIA keys if necessary
self.update_sofia_keys(hdul[0].header)
self.input.append(hdul)
# rearrange old-style C2NC2 data if necessary
if not intermediate and self.calres['cnmode'] == 'C2NC2':
self.reorganize_c2nc2()
# update CRPIX for filter shifts if necessary
if not intermediate \
and (self.calres.get('pixshiftx', 0) != 0
or self.calres.get('pixshifty', 0) != 0):
self.filter_shift()
# store the data in display variables
self.set_display_data(raw=True)
[docs]
def undistort(self):
"""
Correct for optical distortion.
Calls `sofia_redux.instruments.forcast.undistort.undistort`.
Detailed pinhole model parameters should be specified in the
DRIP config file.
"""
# import from sofia_redux.instruments.forcast
from sofia_redux.instruments.forcast.undistort import undistort
# get parameters
param = self.get_parameter_set()
# set pinfile from calconf
pinfile = param.get_value('pinfile')
if os.path.isfile(pinfile):
pinname = pinfile.split(self.calres['pathcal'])[-1]
log.info('Using pinhole file {}'.format(pinname))
log.info('')
else:
msg = 'No pinhole file provided.'
log.error(msg)
self.error = msg
return
# skip undistort
skip_data = param.get_value('skip_undistort')
if skip_data:
log.info('Skipping distortion correction')
return
outdata = []
if param.get_value('save'):
display_files = []
else:
display_files = None
for i, hdul in enumerate(self.input):
header = hdul[0].header
data = hdul['FLUX'].data
err = hdul['ERROR'].data
# correct for optical distortion
result = undistort(
data, header, variance=err**2,
extrapolate=param.get_value('extrapolate'),
pinhole=pinfile)
if result is None:
msg = "Problem in sofia_redux.instruments.forcast.undistort."
log.error(msg)
self.error = msg
return
else:
outimg, outvar = result
# add pinfile to header
hdinsert(header, 'PINFILE', pinname,
comment='Pinhole mask file')
# update hdul with result
hdul[0].header = header
hdul['FLUX'].data = outimg
hdul['ERROR'].data = np.sqrt(outvar)
outname = self.update_output(
hdul, self.filenum[i], self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
outname = self.write_output(hdul, outname)
display_files.append(outname)
outdata.append(hdul)
self.input = outdata
self.set_display_data(filenames=display_files)
[docs]
def merge(self):
"""
Merge on-array chops/nods.
Calls `sofia_redux.instruments.forcast.merge.merge`.
Merging algorithm may be specified in parameters.
"""
# import from sofia_redux.instruments.forcast
from sofia_redux.instruments.forcast.merge import merge
# get parameters
param = self.get_parameter_set()
cormerge_idx = param['cormerge']['option_index']
dripconfig.configuration['cormerge'] = \
self.parameters.merge_opt[cormerge_idx]
skip_rotation = param.get_value('skip_rotation')
outdata = []
if param.get_value('save'):
display_files = []
else:
display_files = None
for i, hdul in enumerate(self.input):
header = hdul[0].header
data = hdul['FLUX'].data
err = hdul['ERROR'].data
# merge chops and nods if desired
# also normalize fluxes for the number of on-source
# exposures, and rotate image to standard coordinates
# (North up, East left)
normmap = np.zeros_like(data)
if skip_rotation:
result = merge(data, header, variance=err**2,
skip_rotation=True, normmap=normmap,
strip_border=True)
log.info('Skipping rotation by sky angle')
else:
result = merge(data, header, variance=err**2,
normmap=normmap, strip_border=True)
if result is None:
msg = "Problem in sofia_redux.instruments.forcast.merge."
log.error(msg)
self.error = msg
return
else:
outimg, outvar = result
# update integration time
nexp = np.nanmax(normmap)
itime = header['DETITIME'] / 2.0
max_exptime = nexp * itime
hdinsert(header, 'EXPTIME', max_exptime,
comment='Nominal on-source integration time [s]')
hdinsert(header, 'NEXP', nexp,
comment='Approximate number of exposures')
# change normmap to an exposure time map
normmap *= itime
# update hdul with result
hdul[0].header = header
hdul['FLUX'].data = outimg
# update WCS for error extension
wcs = WCS(header)
ehead = wcs.to_header(relax=True)
hdinsert(ehead, 'BUNIT',
header.get('BUNIT', 'UNKNOWN'),
comment='Data units')
hdul[1] = fits.ImageHDU(data=np.sqrt(outvar), header=ehead,
name='ERROR')
# append normmap
nhead = ehead.copy()
nhead['BUNIT'] = 's'
nmap_hdu = fits.ImageHDU(data=normmap, header=nhead,
name='EXPOSURE')
hdul.append(nmap_hdu)
outname = self.update_output(
hdul, self.filenum[i], self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
outname = self.write_output(hdul, outname)
display_files.append(outname)
outdata.append(hdul)
self.input = outdata
self.set_display_data(filenames=display_files)
[docs]
def register(self):
"""
Register frames to a reference coordinate system.
Calls `sofia_redux.instruments.forcast.register_datasets.get_shifts`.
Registration algorithm may be specified in parameters.
"""
# import from sofia_redux.instruments.forcast
from sofia_redux.instruments.forcast.register_datasets \
import get_shifts, resize_datasets
# get parameters
param = self.get_parameter_set()
corcoadd_idx = param['corcoadd']['option_index']
corcoadd = self.parameters.merge_opt[corcoadd_idx]
# set a couple parameter values in config
dripconfig.configuration['xyshift'] = param.get_value('xyshift')
dripconfig.configuration['mfwhm'] = param.get_value('mfwhm')
offsets = param.get_value('offsets')
if str(offsets).strip().lower() not in ['', 'none']:
log.debug('Offsets: {}'.format(offsets))
overrides = list(offsets.split(';'))
if len(overrides) != len(self.input):
msg = "Number of offsets (separated by ';') " \
"does not match number of images."
log.error(msg)
self.error = msg
return
for i, ofs in enumerate(overrides):
try:
ofx, ofy = ofs.split(',')
overrides[i] = (-1 * float(ofx), -1 * float(ofy))
except (ValueError, IndexError):
msg = "Must provide valid x,y offsets for " \
"each image."
log.error(msg)
self.error = msg
return
corcoadd = 'OVERRIDE'
else:
overrides = None
log.debug('Overrides: {}'.format(overrides))
log.info('Algorithm: {}'.format(corcoadd))
if corcoadd == 'WCS':
shifts = [(0., 0.)] * len(self.input)
else:
# organize data into datasets for registration
datasets = []
dcrpix = []
for hdul in self.input:
header = hdul[0].header
data = hdul['FLUX'].data
err = hdul['ERROR'].data
nmap = hdul['EXPOSURE'].data
datasets.append((data, header, err ** 2, nmap))
dcrpix.append([header.get('DCRPIX1', 0.0),
header.get('DCRPIX2', 0.0)])
dcrpix = np.array(dcrpix)
# resize to the same size -- necessary for cross-correlation
if corcoadd == 'XCOR' or corcoadd == 'CENTROID':
datasets = resize_datasets(datasets)
with warnings.catch_warnings():
warnings.simplefilter('ignore')
shifts = get_shifts(datasets, user_shifts=overrides,
algorithm=corcoadd, do_wcs_shift=True)
# check for errors, correct for relative image changes
for idx, s in enumerate(shifts):
if s is None or not np.all(np.isfinite(s)):
log.warning("Failed to register dataset %i; "
"setting shift to 0." % idx)
shifts[idx] = (0., 0.)
elif corcoadd.lower() == 'header':
# DCRPIX1,2 records change to CRPIX values
# from undistort and merge -- if it has changed
# differently from the reference image, the change
# needs to be added into header shifts, which
# are calculated from dither values only
log.debug('Original shift: {}'.format(shifts[idx]))
log.debug('Delta CRPIX: {}'.format(dcrpix[idx]))
log.debug('Reference delta CRPIX: {}'.format(dcrpix[0]))
shifts[idx] += dcrpix[idx] - dcrpix[0]
log.debug('New shift: {}'.format(shifts[idx]))
outdata = []
disp_shifts = []
if param.get_value('save'):
display_files = []
else:
display_files = None
for i, hdul in enumerate(self.input):
shift = shifts[i]
shiftstr = '{:.2f},{:.2f}'.format(*shift)
disp_shifts.append(shiftstr)
# update the header with the shifts
for hdu in hdul:
hdu.header['CRPIX1'] += shift[0]
hdu.header['CRPIX2'] += shift[1]
# add some history messages
header = hdul[0].header
hdinsert(header, 'HISTORY',
'Register: Method: {}'.format(corcoadd))
hdinsert(header, 'HISTORY',
'Register: WCS shifts applied: {}'.format(shiftstr))
outname = self.update_output(
hdul, self.filenum[i], self.prodtypes[self.step_index])
outdata.append(hdul)
# save if desired
if param.get_value('save'):
outname = self.write_output(hdul, outname)
display_files.append(outname)
log.info('CRPIX shifts used:')
log.info(';'.join(disp_shifts))
self.input = outdata
self.set_display_data(filenames=display_files)
[docs]
def tellcor(self):
"""
Correct for atmospheric absorption.
Calls `sofia_redux.calibration.pipecal_config.pipecal_config`
and `sofia_redux.calibration.pipecal_util.apply_tellcor`.
For standards, photometry is performed with
`sofia_redux.calibration.pipecal_util.run_photometry`.
"""
from sofia_redux.calibration.pipecal_util \
import apply_tellcor, run_photometry
from sofia_redux.calibration.pipecal_config import pipecal_config
# get parameters
param = self.get_parameter_set()
use_wv = param.get_value('use_wv')
outdata = []
if param.get_value('save'):
display_files = []
else:
display_files = None
for i, hdul in enumerate(self.input):
header = hdul[0].header
data = hdul['FLUX'].data
err = hdul['ERROR'].data
# use pipecal to telluric-correct
# allow config to vary by file
# (in case data spans multiple flight series)
config = pipecal_config(header)
outimg, outvar = apply_tellcor(
data, header, config, variance=err**2,
use_wv=use_wv)
# run photometry for standards
if self.calres['obstype'] == 'STANDARD_FLUX':
log.info('')
try:
run_photometry(outimg, header, outvar, config,
allow_badfit=True)
except PipeCalError: # pragma: no cover
log.warning('Photometry failed.')
log.info('')
# update hdul with result
hdul[0].header = header
hdul['FLUX'].data = outimg
hdul['ERROR'].data = np.sqrt(outvar)
outname = self.update_output(
hdul, self.filenum[i], self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
outname = self.write_output(hdul, outname)
display_files.append(outname)
outdata.append(hdul)
self.input = outdata
self.set_display_data(filenames=display_files)
[docs]
def coadd(self):
"""
Combine registered images.
Calls `sofia_redux.toolkit.image.combine.combine_images` for
image coaddition. For standards, photometry is run on
the coadded image with
`sofia_redux.calibration.pipecal_util.run_photometry`.
Input headers are merged with
`sofia_redux.instruments.forcast.hdmerge.hdmerge`.
The combination method may be configured in parameters,
or coadd may be skipped entirely if desired. In this case,
a COA file is written to disk for each input file.
"""
from sofia_redux.calibration.pipecal_util import run_photometry
from sofia_redux.instruments.forcast.hdmerge import hdmerge
from sofia_redux.toolkit.image.coadd import coadd
# get parameters
param = self.get_parameter_set()
do_coadd = not param.get_value('skip_coadd')
if not do_coadd:
# just write COA files to disk for each input
if param.get_value('save'):
display_files = []
else:
display_files = None
for i, hdul in enumerate(self.input):
outname = self.update_output(
hdul, self.filenum[i],
self.prodtypes[self.step_index])
if param.get_value('save'):
outname = self.write_output(hdul, outname)
display_files.append(outname)
self.set_display_data(filenames=display_files)
return
reference = param.get_value('reference')
method = param.get_value('method')
weighted = param.get_value('weighted')
robust = param.get_value('robust')
sigma = param.get_value('threshold')
maxiters = param.get_value('maxiters')
smoothing = param.get_value('smoothing')
rotation = not param.get_value('skip_rotation')
if 'target' in str(reference).lower():
log.info('Correcting for target motion, if necessary.')
reference = 'target'
else:
log.info('Using first image as reference WCS.')
reference = 'first'
hdr_list = []
data_list = []
var_list = []
exp_list = []
for hdul in self.input:
hdr_list.append(hdul[0].header)
data_list.append(hdul['FLUX'].data)
var_list.append(hdul['ERROR'].data**2)
exp_list.append(hdul['EXPOSURE'].data)
outhdr, outdata, outvar, expmap = coadd(
hdr_list, data_list, var_list, exp_list,
method=method, weighted=weighted,
robust=robust, sigma=sigma, maxiters=maxiters, rotate=rotation,
smoothing=smoothing, reference=reference)
# output header
outhdr = hdmerge(hdr_list, reference_header=outhdr)
extwcs = WCS(outhdr).to_header(relax=True)
# add some history messages for combination parameters
hdinsert(outhdr, 'HISTORY',
'Coadd: Method: {}'.format(method))
if method == 'mean':
hdinsert(outhdr, 'HISTORY',
'Coadd: Weighted: {}'.format(weighted))
hdinsert(outhdr, 'HISTORY',
'Coadd: Robust: {}'.format(robust))
if robust:
hdinsert(outhdr, 'HISTORY',
'Coadd: Threshold: {}'.format(sigma))
hdinsert(outhdr, 'HISTORY',
'Coadd: Max. Iters: {}'.format(maxiters))
# update integration time from map
exptime = np.nanmax(expmap)
hdinsert(outhdr, 'EXPTIME', exptime,
comment='Nominal on-source integration time [s]')
# re-run photometry for standards
# use the basehead calibration config
if self.calres['obstype'] == 'STANDARD_FLUX':
log.info('')
try:
run_photometry(outdata, outhdr, outvar, self.cal_conf,
allow_badfit=True)
except PipeCalError: # pragma: no cover
log.warning('Photometry failed.')
log.info('')
# check if data should be marked as a level 4 mosaic
# (i.e. input is already calibrated)
procstat = getpar(outhdr, 'PROCSTAT',
default='UNKNOWN', dtype=str)
procstat = procstat.strip().upper()
if procstat == 'LEVEL_3':
hdinsert(outhdr, 'PROCSTAT', 'LEVEL_4',
comment='Processing status')
ptype = 'mosaic'
else:
ptype = self.prodtypes[self.step_index]
# store output data
hdul = fits.HDUList()
# copy only the expected extensions
expected = ['FLUX', 'ERROR', 'EXPOSURE']
for hdu in self.input[0]:
if hdu.header.get('EXTNAME', 'UNKNOWN').upper() in expected:
hdul.append(hdu)
hdul[0].header = outhdr
hdul['FLUX'].data = outdata
hdinsert(extwcs, 'BUNIT', outhdr.get('BUNIT', 'UNKNOWN'),
comment='Data units')
hdul['ERROR'] = fits.ImageHDU(data=np.sqrt(outvar), header=extwcs,
name='ERROR')
hdinsert(extwcs, 'BUNIT', 's', comment='Data units')
hdul['EXPOSURE'] = fits.ImageHDU(data=expmap, header=extwcs,
name='EXPOSURE')
outname = self.update_output(hdul, self.filenum, ptype)
self.input = [hdul]
self.filenum = [self._catfilenum(self.filenum)]
# save if desired
if param.get_value('save'):
outname = self.write_output(hdul, outname)
self.set_display_data(filenames=[outname])
else:
self.set_display_data()
[docs]
def fluxcal(self):
"""
Calibrate flux to physical units.
Calls `sofia_redux.calibration.pipecal_util.apply_fluxcal`. For
standards, photometry may optionally be re-run,
using `sofia_redux.calibration.pipecal_util.run_photometry`. The
pipecal config is determined individually for each
file, so that different calibration factors may be
applied to each file if necessary.
"""
from sofia_redux.calibration.pipecal_util \
import apply_fluxcal, run_photometry
from sofia_redux.calibration.pipecal_config import pipecal_config
# get parameters
param = self.get_parameter_set()
do_phot = param.get_value('rerun_phot')
srcpos = param.get_value('srcpos')
fitsize = param.get_value('fitsize')
fwhm = param.get_value('fwhm')
profile = param.get_value('profile')
# args for photometry
kwargs = {'fitsize': fitsize,
'fwhm': fwhm,
'profile': profile}
# read srcpos parameter
if do_phot and str(srcpos).strip().lower() not in ['', 'none']:
try:
cx, cy = [float(n) for n in srcpos.split(',')]
kwargs['srcpos'] = [cx, cy]
except (ValueError, IndexError):
msg = "Invalid source position specified."
log.error(msg)
self.error = msg
return
outdata = []
if param.get_value('save'):
display_files = []
else:
display_files = None
for i, hdul in enumerate(self.input):
header = hdul[0].header
data = hdul['FLUX'].data
err = hdul['ERROR'].data
variance = err**2
# allow config to vary
config = pipecal_config(header)
# re-run photometry first if desired
if do_phot and self.calres['obstype'] == 'STANDARD_FLUX':
run_photometry(data, header, variance, config,
**kwargs)
# use pipecal to calibrate
outimg, outvar = apply_fluxcal(
data, header, config, variance=variance,
write_history=True)
# set beam keywords in header
cdelt = np.abs(header.get('CDELT1', 0.768 / 3600))
beam = config.get('fwhm', 5.0) * cdelt
hdinsert(header, 'BMAJ', beam,
comment='Beam major axis (deg)')
hdinsert(header, 'BMIN', beam,
comment='Beam minor axis (deg)')
hdinsert(header, 'BPA', 0.0,
comment='Beam angle (deg)')
# if data is not calibrated (cal factor could not be found),
# then continue
if '3' not in header['PROCSTAT']:
outdata.append(hdul)
continue
# print source flux after calibration
if self.calres['obstype'] == 'STANDARD_FLUX':
try:
flux = header['STAPFLX'] / header['CALFCTR']
flux_err = header['STAPFLXE'] / header['CALFCTR']
log.info('')
log.info('After calibration:')
log.info('Source Flux: '
'{:.2f} +/- {:.2f} Jy'.format(flux, flux_err))
except (KeyError, ValueError):
pass
else:
try:
modlflx = header['MODLFLX']
modlflxe = header['MODLFLXE']
log.info(
'Model Flux: '
'{:.3f} +/- {:.3f} Jy'.format(modlflx, modlflxe))
log.info(
'Percent difference from model: '
'{:.1f}%'.format(100 * (flux - modlflx) / modlflx))
except KeyError:
pass
log.info('')
# update hdul with result
hdul[0].header = header
hdul['FLUX'].data = outimg
hdinsert(hdul[1].header, 'BUNIT',
header.get('BUNIT', 'UNKNOWN'),
comment='Data units')
hdul[1].data = np.sqrt(outvar)
outname = self.update_output(
hdul, self.filenum[i], self.prodtypes[self.step_index])
# save if desired
if param.get_value('save'):
outname = self.write_output(hdul, outname)
display_files.append(outname)
outdata.append(hdul)
self.input = outdata
self.set_display_data(filenames=display_files)
[docs]
def imgmap(self):
"""
Generate a quick-look image map.
Calls `sofia_redux.visualization.quicklook.make_image`.
The output from this step is identical to the input, so is
not saved. As a side effect, a PNG file is saved to disk to the
same base name as the input file, with a '.png' extension.
"""
from sofia_redux.visualization.quicklook import make_image
# get parameters
param = self.get_parameter_set()
colormap = param.get_value('colormap')
scale = param.get_value('scale')
n_contour = param.get_value('n_contour')
contour_color = param.get_value('contour_color')
fill_contours = param.get_value('fill_contours')
grid = param.get_value('grid')
beam = param.get_value('beam')
watermark = param.get_value('watermark')
crop_border = param.get_value('crop_border')
for i, hdul in enumerate(self.input):
header = hdul[0].header
# set text for title and subtitle
obj = header.get('OBJECT', 'UNKNOWN')
spectel = self.calres['spectel']
basename = os.path.basename(header.get('FILENAME', 'UNKNOWN'))
title = f'Object: {obj}, Filter: {spectel}'
subtitle = f'Filename: {basename}'
# add a default beam to the header if standard keywords
# are not present
if beam and 'BMAJ' not in header:
cdelt = np.abs(header.get('CDELT1', 0.768 / 3600))
beam = 5.0 * cdelt
hdinsert(header, 'BMAJ', beam,
comment='Beam major axis (deg)')
hdinsert(header, 'BMIN', beam,
comment='Beam minor axis (deg)')
hdinsert(header, 'BPA', 0.0,
comment='Beam angle (deg)')
# set crop to trim NaN border if needed
crop_region = None
if crop_border:
img = hdul['FLUX'].data
badmask = np.isnan(img)
badmask[img == 0] = True
strip_rows = np.all(badmask, axis=1)
strip_cols = np.all(badmask, axis=0)
xl = np.argmax(~strip_cols)
xu = np.argmax(np.flip(~strip_cols))
yl = np.argmax(~strip_rows)
yu = np.argmax(np.flip(~strip_rows))
xu, yu = len(strip_cols) - xu, len(strip_rows) - yu
crop_region = [(xu - xl) / 2 + xl, (yu - yl) / 2 + yl,
(xu - xl) / 2, (yu - yl) / 2]
# make the image figure
fig = make_image(hdul, colormap=colormap,
scale=scale, n_contour=n_contour,
contour_color=contour_color,
fill_contours=fill_contours, title=title,
subtitle=subtitle, grid=grid, beam=beam,
watermark=watermark, crop_region=crop_region,
crop_unit='pixel')
# output filename for image
fname = os.path.splitext(basename)[0] + '.png'
outname = os.path.join(self.output_directory, fname)
fig.savefig(outname, dpi=300)
log.info(f'Saved image to {outname}')