# Source code for sofia_redux.instruments.fifi_ls.combine_grating_scans

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import os

from astropy import log
from astropy.io import fits
import numpy as np

from sofia_redux.toolkit.utilities \
    import (gethdul, write_hdul, hdinsert, multitask)

# Public API of the grating-scan combination step.
__all__ = ['get_lambda_overlap', 'combine_extensions',
           'combine_grating_scans', 'wrap_combine_grating_scans']


def get_lambda_overlap(hdul):
    """
    Get overlapping wavelengths for all extensions.

    The overlap region is bounded below by the largest per-scan
    minimum wavelength and above by the smallest per-scan maximum,
    over all LAMBDA_G* extensions.

    Parameters
    ----------
    hdul : fits.HDUList

    Returns
    -------
    2-tuple
        overlap minimum, overlap maximum
    """
    n_scan = hdul[0].header.get('NGRATING', 1)
    waves = [hdul[f'LAMBDA_G{i}'].data for i in range(n_scan)]
    lower = max(w.min() for w in waves)
    upper = min(w.max() for w in waves)
    return lower, upper
def combine_extensions(hdul, correct_bias=False):
    """
    Combine all extensions into a single extension.

    For a single grating scan, the data are taken directly from the
    input, with the spatial coordinate arrays broadcast to the flux
    shape.  For multiple scans, flux, error, wavelength, and
    coordinate arrays are stacked along the wavelength axis (16 rows
    per scan, 25 spaxels), optionally bias-corrected over overlapping
    wavelength regions, then sorted by wavelength within each spaxel.

    Parameters
    ----------
    hdul : fits.HDUList
    correct_bias : bool, optional
        If True, additive offset between separate grating scans will
        be removed, using overlapping wavelength regions to determine
        the value of the bias.

    Returns
    -------
    fits.HDUList
    """
    # number of grating scans in the input
    n_scan = hdul[0].header.get('NGRATING', 1)

    # OTF-style (>2D) data may not carry multiple grating scans
    if hdul['FLUX_G0'].data.ndim > 2 and n_scan > 1:
        log.error('Grating scans are not supported in OTF mode.')
        return

    if n_scan == 1:
        # single scan: pass data through unchanged
        fluxes = hdul['FLUX_G0'].data
        stddevs = hdul['STDDEV_G0'].data
        lambdas = hdul['LAMBDA_G0'].data

        # broadcast x/y and RA/Dec coordinates up to the flux shape
        coords = {}
        for key in ('XS', 'YS', 'RA', 'DEC'):
            expanded = np.zeros_like(fluxes)
            expanded[..., :] = hdul[f'{key}_G0'].data[..., :]
            coords[key] = expanded
        xes, yes = coords['XS'], coords['YS']
        ras, decs = coords['RA'], coords['DEC']

        hdr = hdul['FLUX_G0'].header
        mean_bg = [hdr.get('BGLEVL_A', 0), hdr.get('BGLEVL_B', 0)]
    else:
        # stacked arrays: 16 wavelength rows per scan, 25 spaxels
        lambdas = np.zeros((16 * n_scan, 25))
        fluxes = np.zeros_like(lambdas)
        stddevs = np.zeros_like(lambdas)
        xes = np.zeros_like(lambdas)
        yes = np.zeros_like(lambdas)
        ras = np.zeros_like(lambdas)
        decs = np.zeros_like(lambdas)
        mval = np.full((n_scan, 25), np.nan)
        bgval = np.zeros((n_scan, 2))

        overlap_min, overlap_max = 0, 0
        if correct_bias:
            overlap_min, overlap_max = get_lambda_overlap(hdul)
            if overlap_min >= overlap_max:
                log.info("No overlapping wavelengths; "
                         "not correcting bias offset")
                correct_bias = False

        for scan in range(n_scan):
            name = f'FLUX_G{scan}'
            rows = np.arange(16 * scan, 16 * (scan + 1))
            data = hdul[name].data
            stddev = hdul[name.replace('FLUX', 'STDDEV')].data
            wave = hdul[name.replace('FLUX', 'LAMBDA')].data

            # raw background levels from the extension headers,
            # collected for averaging below
            hdr = hdul[name].header
            bgval[scan, :] = np.array(
                [hdr.get('BGLEVL_A', 0), hdr.get('BGLEVL_B', 0)])

            lambdas[rows, :] = wave
            fluxes[rows, :] = data
            stddevs[rows, :] = stddev
            xes[rows, :] = hdul[name.replace('FLUX', 'XS')].data
            yes[rows, :] = hdul[name.replace('FLUX', 'YS')].data
            ras[rows, :] = hdul[name.replace('FLUX', 'RA')].data
            decs[rows, :] = hdul[name.replace('FLUX', 'DEC')].data

            # mean flux over the overlap region for this scan,
            # used to estimate the bias offset per spaxel
            if correct_bias:
                in_overlap = ((wave[:, 0] >= overlap_min)
                              & (wave[:, 0] <= overlap_max))
                if in_overlap.sum() > 3:
                    mval[scan, :] = np.nanmean(data[in_overlap, :],
                                               axis=0)
                else:
                    mval[scan, :] = np.nan

        if correct_bias:
            # shift each grating position to the mean overlap level
            # at each spaxel
            mlevel = np.resize(np.nanmean(mval, axis=0), (n_scan, 25))
            diff = mlevel - mval
            diff[np.isnan(diff)] = 0
            for scan in range(n_scan):
                rows = np.arange(scan * 16, (scan + 1) * 16)
                fluxes[rows, :] = fluxes[rows, :] + diff[scan, :]

        # sort all arrays by wavelength, independently per spaxel column
        order = lambdas.argsort(axis=0)
        cols = np.indices(lambdas.shape)[1]
        lambdas = lambdas[order, cols]
        fluxes = fluxes[order, cols]
        stddevs = stddevs[order, cols]
        xes = xes[order, cols]
        yes = yes[order, cols]
        ras = ras[order, cols]
        decs = decs[order, cols]

        mean_bg = np.nanmean(bgval, axis=0)

    # build the output, updating the primary header with combined values
    result = fits.HDUList(fits.PrimaryHDU(header=hdul[0].header))
    primehead = result[0].header
    hdinsert(primehead, 'BGLEVL_A', mean_bg[0],
             comment='Background level in A nod (ADU/s/spaxel)')
    hdinsert(primehead, 'BGLEVL_B', mean_bg[1],
             comment='Background level in B nod (ADU/s/spaxel)')
    hdinsert(primehead, 'PRODTYPE', 'scan_combined')
    outname = os.path.basename(hdul[0].header.get('FILENAME', 'UNKNOWN'))
    outname = outname.replace('FLF', 'SCM').replace('XYC', 'SCM')
    hdinsert(primehead, 'FILENAME', outname)
    primehead['HISTORY'] = "Grating scans combined into single extension"

    # NOTE(review): exthdr is reused and mutated between appends; this
    # relies on ImageHDU copying the header at construction — confirm
    exthdr = fits.Header()
    exthdr['BUNIT'] = ('adu/(Hz s)', 'Data units')
    result.append(fits.ImageHDU(fluxes, header=exthdr, name='FLUX'))
    result.append(fits.ImageHDU(stddevs, header=exthdr, name='STDDEV'))
    exthdr['BUNIT'] = 'um'
    result.append(fits.ImageHDU(lambdas, header=exthdr, name='LAMBDA'))
    exthdr['BUNIT'] = 'arcsec'
    result.append(fits.ImageHDU(xes, header=exthdr, name='XS'))
    result.append(fits.ImageHDU(yes, header=exthdr, name='YS'))
    exthdr['BUNIT'] = 'hourangle'
    result.append(fits.ImageHDU(ras, header=exthdr, name='RA'))
    exthdr['BUNIT'] = 'degree'
    result.append(fits.ImageHDU(decs, header=exthdr, name='DEC'))
    return result
def combine_grating_scans(filename, correct_bias=False, write=False,
                          outdir=None):
    """
    Combine separate grating positions in FIFI-LS data.

    Combines separate grating positions in FIFI-LS data into a single
    extension, producing an unevenly spaced wavelength grid when
    multiple grating (inductosyn) extensions exist in the input file.

    The procedure is:

        1. Read the wavelength values for all grating scans to
           determine overlapping wavelength regions.
        2. Loop over the grating scan extensions, reading the flux
           from each.  Calculate the mean value of any overlapping
           wavelength regions.  Store all data in an unsorted cube.
        3. Loop over the spaxels in the cube, correcting the flux to
           the mean value of all overlap regions, then sorting the
           data by its associated wavelength value.
        4. Create FITS file and (optionally) write results to disk.

    The output FITS file contains FLUX, STDDEV, LAMBDA, XS, YS, RA,
    and DEC data arrays.  For most data, all arrays have dimension
    25 x nw, where nw is the combined number of wavelength samples
    across all input grating scans.  For OTF data, only one grating
    scan is expected or allowed, so nw is always 16.  Output cubes
    for FLUX, STDDEV, XS, YS, RA, and DEC are 25 x 16 x n_ramp.
    The LAMBDA array is 25 x 16.

    Parameters
    ----------
    filename : str
        Path to the file to be combined
    correct_bias : bool, optional
        If True, additive offset between separate grating scans will
        be removed, using overlapping wavelength regions to determine
        the value of the bias.
    outdir : str, optional
        Directory path to write output.  If None, output files will
        be written to the same directory as the input files.
    write : bool, optional
        If True, write to disk and return the path to the output
        file.  If False, return the HDUL.  The output filename is
        created from the input filename, with the suffix 'XYC' or
        'FLF' replaced with 'SCM'.

    Returns
    -------
    fits.HDUList or str
        Either the HDU (if write is False) or the filename of the
        output file (if write is True)
    """
    # validate the output directory before doing any work
    if isinstance(outdir, str) and not os.path.isdir(outdir):
        log.error("Output directory %s does not exist" % outdir)
        return

    hdul = gethdul(filename, verbose=True)
    if hdul is None:
        return

    # an HDUList may be passed directly; recover the file name
    # from its primary header in that case
    if not isinstance(filename, str):
        filename = hdul[0].header['FILENAME']
    if not isinstance(outdir, str):
        outdir = os.path.dirname(filename)

    result = combine_extensions(hdul, correct_bias=correct_bias)
    if result is None:
        log.error('Combination failed')
        return

    if write:
        return write_hdul(result, outdir=outdir, overwrite=True)
    return result
def combine_scans_wrap_helper(_, kwargs, filename):
    """
    Multitask helper: combine grating scans for a single file.

    Parameters
    ----------
    _ : object
        Unused positional argument supplied by `multitask`.
    kwargs : dict
        Keyword arguments passed through to `combine_grating_scans`.
    filename : str
        Path to the file to be combined.

    Returns
    -------
    fits.HDUList, str, or None
        The result of `combine_grating_scans` for this file.
    """
    options = dict(kwargs)
    return combine_grating_scans(filename, **options)
def wrap_combine_grating_scans(files, outdir=None, correct_bias=True,
                               allow_errors=False, write=False,
                               jobs=None):
    """
    Wrapper for combine_grating_scans over multiple files.

    See `combine_grating_scans` for full description of reduction
    on a single file.

    Parameters
    ----------
    files : array_like of str
        paths to files to be scan-combined
    outdir : str, optional
        Directory path to write output.  If None, output files will
        be written to the same directory as the input files.
    correct_bias : bool, optional
        If True, additive offset between separate grating scans will
        be removed, using overlapping wavelength regions to determine
        the value of the bias.
    allow_errors : bool, optional
        If True, return all created files on error.  Otherwise,
        return None
    write : bool, optional
        If True, write the output to disk and return the filename
        instead of the HDU.
    jobs : int, optional
        Specifies the maximum number of concurrently running jobs.
        Values of 0 or 1 will result in serial processing.  A
        negative value sets jobs to `n_cpus + 1 + jobs` such that -1
        would use all cpus, and -2 would use all but one cpu.

    Returns
    -------
    tuple of str
        output filenames written to disk
    """
    # accept a single path for convenience
    if isinstance(files, str):
        files = [files]
    if not hasattr(files, '__len__'):
        log.error("Invalid input files type (%s)" % repr(files))
        return

    kwargs = {'outdir': outdir, 'write': write,
              'correct_bias': correct_bias}
    output = multitask(combine_scans_wrap_helper, files, None, kwargs,
                       jobs=jobs)

    # a None entry marks a file that failed to combine
    failure = False
    result = []
    for item in output:
        if item is None:
            failure = True
        else:
            result.append(item)

    # on failure with files created, bail out unless errors are allowed
    if failure and result and not allow_errors:
        log.error("Errors were encountered but the following "
                  "files were created:\n%s" % '\n'.join(result))
        return
    return tuple(result)