Source code for sofia_redux.instruments.fifi_ls.lambda_calibrate

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import os

import astropy.constants as const
from astropy.io import fits
from astropy import log
from astropy import units
import numpy as np
import pandas as pd

from sofia_redux.instruments import fifi_ls
from sofia_redux.toolkit.utilities \
    import (gethdul, write_hdul, hdinsert, goodfile,
            multitask)

__all__ = ['clear_wavecal_cache', 'get_wavecal_from_cache',
           'store_wavecal_in_cache', 'read_wavecal', 'wave',
           'lambda_calibrate', 'wrap_lambda_calibrate']

__wavecal_cache = {}


[docs] def clear_wavecal_cache(): """ Clear all data from the wavecal cache. """ global __wavecal_cache __wavecal_cache = {}
[docs] def get_wavecal_from_cache(wavecalfile): """ Retrieves wavelength calibration data from the wavecal cache. Checks to see if the file still exists, can be read, and has not been altered since last time. If the file has changed, it will be deleted from the cache so that it can be re-read. Parameters ---------- wavecalfile : str File path to the wavecal file Returns ------- wavecal : pd.DataFrame Wavelength calibration table """ global __wavecal_cache if wavecalfile not in __wavecal_cache: return if not goodfile(wavecalfile): try: del __wavecal_cache[wavecalfile] except KeyError: # pragma: no cover pass return modtime = str(os.path.getmtime(wavecalfile)) if modtime not in __wavecal_cache.get(wavecalfile, {}): return log.debug("Retrieving data from wavecal file (%s) in cache" % wavecalfile) return __wavecal_cache.get(wavecalfile, {}).get(modtime)
[docs] def store_wavecal_in_cache(wavecalfile, wavecal): """ Store wavecal data in the wavecal cache. Parameters ---------- wavecalfile : str File path to the wavecal file wavecal : pd.DataFrame Wave calibration table """ global __wavecal_cache log.debug(f"Storing wavecal data ({wavecalfile}) in cache") __wavecal_cache[wavecalfile] = {} modtime = str(os.path.getmtime(wavecalfile)) __wavecal_cache[wavecalfile][modtime] = wavecal
[docs] def read_wavecal(calfile=None): """ Read and return the data from the wavecal file. Parameters ---------- calfile : str, optional Path to the calibration results text file. If not specified, it will be read from fifi_ls/data/wave_cal/FIFI_LS_WaveCal_Coeffs.txt. Returns ------- pd.DataFrame """ if calfile is None: calfile = os.path.join( os.path.dirname(fifi_ls.__file__), 'data', 'wave_cal', 'FIFI_LS_WaveCal_Coeffs.txt') wavecal = get_wavecal_from_cache(calfile) if wavecal is not None: return wavecal if not goodfile(calfile, verbose=True): raise ValueError( f"Cannot read wavelength calibration file: {calfile}") try: colnames = ['Date', 'ch', 'g0', 'NP', 'a', 'PS', 'QOFF', 'QS'] colnames += [f'ISOFF{i + 1}' for i in range(25)] df = pd.read_csv(calfile, comment='#', sep=r'\s+', names=colnames) except Exception as err: # pragma: no cover raise ValueError(f"Cannot parse {calfile} to dataframe: {str(err)}") df.calfile = calfile log.debug(f"Loaded wavelength calibration file: {calfile}") store_wavecal_in_cache(calfile, df) return df
[docs] def wave(ind, date, dichroic, blue=None, wavecal=None): """ Calculate wavelengths for each spectral pixel. Requires FIFI_LS_WaveCal_Coeffs.txt file, in fifi_ls/data/wave_cal/, as provided by FIFI-LS team. The procedure is: 1. Read wavelength calibration file. This file is stored in a text table format, provided by the FIFI-LS instrument team. The FIFI-LS team also provides the wavelength equation. 2. Loop over spaxels, calculating the wavelength and spectral width for each spexel, from the parameters specified in the calibration file. 3. Return an array of wavelengths. Parameters ---------- ind : int Inductosyn position date : array_like of int [Y, M, D] vector dichroic : int Dichroic value (105 or 130) blue : str, optional Either 'B1' to indicate first BLUE order of 'B2' to indicate second BLUE order. If not provided, data is assumed to be RED channel wavecal : pd.DataFrame, optional DataFrame containing wave calibration data. May be supplied in order to remove the overhead of reading the file every iteration of this function. Returns ------- dict Contains: - 'wavelength' (numpy.ndarray): a 2-D array (16, 25) containing the wavelength of each pixel for each module in microns. - 'width' (numpy.ndarray): a 2-D array (16, 25) containing the spectral width of each pixel for each spatial module in microns - 'calfile' (str): the name of the wavelength calibration file used """ if blue is None: channel = 'R' elif str(blue).upper() in ['B1', 'B2']: channel = str(blue).upper() else: log.error("BLUE must be B1 or B2") return try: ind = int(ind) except (TypeError, ValueError): log.error("Invalid IND") return if ind < 0: log.error("IND must be positive") return if len(date) != 3: log.error("DATE must be an array of length 3") return if not isinstance(wavecal, pd.DataFrame) or len(wavecal) == 0: wavecal = read_wavecal() # Get date in YYYYMMDD format obsdate = int(''.join([str(x).zfill(2) for x in date])) wavecal_dates = np.array(pd.unique(wavecal['Date'])) wavecal_dates = wavecal_dates[wavecal_dates <= obsdate] if len(wavecal_dates) == 0: log.error(f"No calibration data found for date {obsdate}") return last_date = wavecal_dates.max() if 'B' in channel: # no dichroic difference for blue config = channel else: config = f'{channel}{dichroic}' rows = wavecal[(wavecal['Date'] == last_date) & (wavecal['ch'] == config)] if len(rows) == 0: log.error(f'{last_date} {config} wavecal data not found') return cal = rows.iloc[0] # fixed parameter by channel if 'B' in config: gamma = 8.90080e-03 else: gamma = 1.67200e-02 # axis 0 pix = np.arange(1, 17) sign = 2 * (((pix - cal['QOFF']) > 0) - 0.5) delta = ((pix - 8.5) * cal['PS']) delta += sign * cal['QS'] * (pix - cal['QOFF']) ** 2 # axis 1 isf = 1.0 isoff = np.array([cal[f'ISOFF{i + 1}'] for i in range(25)]) modules = np.arange(25) slitpos = 25 - (6 * (modules // 5)) + (modules % 5) g = cal['g0'] g *= np.cos(np.arctan((slitpos - cal['NP']) / cal['a'])) phi = 2 * np.pi * isf * (ind + isoff) / (2 ** 24) # cross terms for wavelength w = np.sin(np.add.outer(delta + gamma, phi)) w += np.sin(phi - gamma)[None] w *= 1000 * g[None] # cross terms for pixel width p = np.cos(np.add.outer(delta + gamma, phi)) p *= (cal['PS'] + 2 * sign * cal['QS'] * (pix - cal['QOFF']))[..., None] p *= 1000 * g[None] if channel.lower() == 'b2': w /= 2 p /= 2 wavefile = wavecal.calfile if hasattr(wavecal, 'calfile') else None return {'wavelength': w, 'width': p, 'wavefile': wavefile}
[docs] def lambda_calibrate(filename, obsdate=None, outdir=None, write=False, wavecal=None): """ Apply spectral calibration. The procedure is: 1. Read input file. 2. Call fifi_ls.wave to calculate wavelength values and spectral widths in microns for each spexel. 3. Create FITS file and (optionally) write results to disk. The generated FITS file contains n_scan * 3 image extensions: DATA, STDDEV, and LAMBDA for each scan, named with a '_G{i}' suffix for each grating scan index i. The LAMBDA arrays are always 25 x 16. The DATA and STDDEV arrays are 25 x 16 x n_ramp for OTF mode A nods, and 25 x 16 otherwise. If a SCANPOS_G{i} table is present in the input, it will also be attached unmodified to the output. Parameters ---------- filename : str File to be wavelength-calibrated. Should have been generated by fifi_ls.combine_nods. obsdate : array_like of int, optional Date of observation. Intended for files that do not have the DATE-OBS keyword (and value) in the FITS primary header (early files do not). Format is [YYYY,MM,DD]. outdir : str, optional Directory path to write output. If None, output files will be written to the same directory as the input files. write : bool, optional If True, write to disk and return the path to the output file. If False, return the HDUList. The output directory is the same as the directory of the input file. wavecal : pd.DataFrame, optional DataFrame containing wave calibration data. May be supplied in order to remove the overhead of reading the FIFI_LS_WaveCal_Coeffs.txt file every iteration of this function. Returns ------- fits.HDUList or str Either the HDUList (if write is False) or the filename of the output file (if write is True). """ hdul = gethdul(filename, verbose=True) if hdul is None: return if not isinstance(filename, str): filename = hdul[0].header['FILENAME'] if isinstance(outdir, str): if not os.path.isdir(outdir): log.error("Output directory %s does not exist" % outdir) return if not isinstance(outdir, str): outdir = os.path.dirname(filename) if not isinstance(wavecal, pd.DataFrame) or len(wavecal) == 0: wavecal = read_wavecal() if wavecal is None: log.warning("Unable to read wave calibration data") return dichroic = hdul[0].header.get('DICHROIC') channel = hdul[0].header.get('CHANNEL') b_order = int(hdul[0].header.get('G_ORD_B', -1)) if channel == 'BLUE': if b_order in [1, 2]: blue = 'B%i' % b_order else: log.error("Invalid Blue grating order in file %s" % filename) return else: blue = None if obsdate is None: obsdate = hdul[0].header.get('DATE-OBS') try: obsdate = [int(x) for x in obsdate[:10].split('-')] except (ValueError, TypeError, IndexError): log.error('Invalid DATE-OBS in file %s' % filename) return outname = os.path.basename(filename) for replace in ['NCM', 'CSB', 'RP0', 'RP1']: outname = outname.replace(replace, 'WAV') result = fits.HDUList(fits.PrimaryHDU(header=hdul[0].header)) wavefile = None speed_of_light = const.c.to(units.um / units.s).value ngrating = hdul[0].header.get('NGRATING', 1) for idx in range(ngrating): name = f'FLUX_G{idx}' stdname = f'STDDEV_G{idx}' exthdr = hdul[name].header data = hdul[name].data stddev = hdul[stdname].data ind = exthdr.get('INDPOS') calibration = wave(ind, obsdate, dichroic, blue=blue, wavecal=wavecal) if calibration is None: log.error("Wavelength calibration failed") return if calibration['wavefile'] is not None: wavefile = calibration['wavefile'] wavelength = calibration['wavelength'] pixelwidth = calibration['width'] # transform the fluxes into flux densities by dividing the # flux by the frequency interval dnu = speed_of_light * pixelwidth / (wavelength ** 2) data /= dnu stddev /= dnu log.debug("Pixel width (dlambda) at 2,2: %s" % ', '.join([str(x) for x in pixelwidth[:, 12]])) log.debug("Pixel width (dnu) at 2,2: %s" % ', '.join([str(x) for x in dnu[:, 12]])) exthdr['BUNIT'] = ('adu/(Hz s)', 'Data units') result.append(fits.ImageHDU(data, header=exthdr, name=name)) result.append(fits.ImageHDU(stddev, header=exthdr, name=stdname)) exthdr['BUNIT'] = 'um' result.append(fits.ImageHDU(wavelength, header=exthdr, name=name.replace('FLUX', 'LAMBDA'))) # propagate scan positions forward if present pname = f'SCANPOS_G{idx}' if pname in hdul: result.append(hdul[pname].copy()) # update header result[0].header['HISTORY'] = 'Wavelength calibrated' result[0].header['PRODTYPE'] = 'wavelength_calibrated' result[0].header['FILENAME'] = outname if wavefile is not None: hdinsert(result[0].header, 'WAVEFILE', os.path.basename(wavefile), comment='Wavelength calibration file') if not write: return result else: return write_hdul(result, outdir=outdir, overwrite=True)
def lambda_calibrate_wrap_helper(_, kwargs, filename): return lambda_calibrate(filename, **kwargs)
[docs] def wrap_lambda_calibrate(files, outdir=None, obsdate=None, allow_errors=False, write=False, jobs=None): """ Wrapper for lambda_calibrate over multiple files. See `lambda_calibrate` for full description of reduction on a single file. Parameters ---------- files : array_like of str paths to files to be wavelength calibrated outdir : str, optional Directory path to write output. If None, output files will be written to the same directory as the input files. obsdate : array_like of int, optional Date of observation. Intended for files that do not have the DATE-OBS keyword (and value) in the FITS primary header (early files do not). Format is [YYYY,MM,DD]. allow_errors : bool, optional If True, return all created files on error. Otherwise, return None write : bool, optional If True, write the output to disk and return the filename instead of the HDU. jobs : int, optional Specifies the maximum number of concurrently running jobs. Values of 0 or 1 will result in serial processing. A negative value sets jobs to `n_cpus + 1 + jobs` such that -1 would use all cpus, and -2 would use all but one cpu. Returns ------- tuple of str output filenames written to disk """ if isinstance(files, str): files = [files] if not hasattr(files, '__len__'): log.error("Invalid input files type (%s)" % repr(files)) return clear_wavecal_cache() kwargs = {'outdir': outdir, 'obsdate': obsdate, 'write': write} output = multitask(lambda_calibrate_wrap_helper, files, None, kwargs, jobs=jobs) failure = False result = [] for x in output: if x is None: failure = True else: result.append(x) if failure: if len(result) > 0: if not allow_errors: log.error("Errors were encountered but the following " "files were created:\n%s" % '\n'.join(result)) return clear_wavecal_cache() return tuple(result)