Source code for sofia_redux.instruments.exes.get_atran

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import glob
import os
import re

from astropy import log, units
from astropy.io import fits
import numpy as np

from sofia_redux.instruments import exes
from sofia_redux.toolkit.utilities.fits import goodfile, gethdul, hdinsert
from sofia_redux.spectroscopy.smoothres import smoothres

__all__ = ['clear_atran_cache', 'get_atran_from_cache',
           'store_atran_in_cache', 'get_atran']

__atran_cache = {}


[docs] def clear_atran_cache(): """ Clear all data from the atran cache. """ global __atran_cache __atran_cache = {}
[docs] def get_atran_from_cache(atranfile, resolution): """ Retrieves atmospheric transmission data from the atran cache. Checks to see if the file still exists, can be read, and has not been altered since last time. If the file has changed, it will be deleted from the cache so that it can be re-read. Parameters ---------- atranfile : str File path to the atran file resolution : float Spectral resolution used for smoothing. Returns ------- tuple filename : str Used to update ATRNFILE in FITS headers. wave : numpy.ndarray (nwave,) array of wavelengths. unsmoothed : numpy.ndarray 2D array containing the atran data from file smoothed : numpy.ndarray (nwave,) array containing the smoothed atran data """ global __atran_cache key = atranfile, int(resolution) if key not in __atran_cache: return if not goodfile(atranfile): try: del __atran_cache[key] except KeyError: # pragma: no cover # could happen in race conditions, working in parallel pass return modtime = str(os.path.getmtime(atranfile)) if modtime not in __atran_cache.get(key, {}): return log.debug(f'Retrieving atmospheric transmission data from cache ' f'({key[0]}, resolution {key[1]})') return __atran_cache.get(key, {}).get(modtime)
[docs] def store_atran_in_cache(atranfile, resolution, filename, unsmoothed, wave, smoothed): """ Store atmospheric transmission data in the atran cache. Parameters ---------- atranfile : str File path to the atran file resolution : float Spectral resolution used for smoothing. filename : str Used to update ATRNFILE in FITS headers. unsmoothed : numpy.ndarray 2D array containing the raw data from file wave : numpy.ndarray (nwave,) array of wavelengths. smoothed : numpy.ndarray (nwave,) array containing the smoothed atran data """ global __atran_cache key = atranfile, int(resolution) log.debug(f'Storing atmospheric transmission data in cache ' f'({key[0]}, resolution {key[1]})') __atran_cache[key] = {} modtime = str(os.path.getmtime(atranfile)) __atran_cache[key][modtime] = ( filename, unsmoothed, wave, smoothed)
[docs] def get_atran(header, resolution, filename=None, get_unsmoothed=False, atran_dir=None): """ Retrieve reference atmospheric transmission data. PSG model files in the data/transmission directory should be named according to the altitude, ZA, and wavelengths for which they were generated, as: psg_[alt]K_[za]deg_[wmin]-[wmax]um.fits For example, the file generated for altitude of 41,000 feet, ZA of 45 degrees, and wavelengths between 5 and 28 microns should be named: psg_41K_45deg_5-28um.fits Model files are expected to be FITS images containing two rows. The first is wavenumber values in cm-1; the second is the fractional atmospheric transmission expected at that wavenumber. The procedure is: 1. Identify model file by ZA and Altitude, unless override is provided. 2. Read transmission data from file and smooth to expected spectral resolution. 3. Return transmission array. Parameters ---------- header : astropy.io.fits.header.Header ATRNFILE keyword is written to the provided FITS header, containing the name of the model file used. resolution : float Spectral resolution to which transmission data should be smoothed. filename : str, optional Atmospheric transmission file to be used. If not provided, a default file will be retrieved from the data/transmission directory. The file with the closest matching ZA and Altitude to the input data will be used. If an override file is provided, it should be a FITS image file containing wavenumber and transmission data without smoothing. get_unsmoothed : bool, optional If True, return the unsmoothed atran data with original wavenumber array in addition to the smoothed array. atran_dir : str, optional Path to a directory containing model reference FITS files. If not provided, the default set of files packaged with the pipeline will be used. Returns ------- atran : numpy.ndarray A (2, nw) array containing wavenumber and transmission data. unsmoothed : numpy.ndarray, optional An array containing wavenumber, unsmoothed transmission data, and any additional raw data rows, returned only if get_unsmoothed is set. """ if filename is not None: if not goodfile(filename, verbose=True): log.warning(f'File {filename} not found; ' f'retrieving default') filename = None if filename is None: if not isinstance(header, fits.header.Header): log.error("Invalid header") return za_start = float(header.get('ZA_START', 0)) za_end = float(header.get('ZA_END', 0)) if za_start > 0 >= za_end: za = za_start elif za_end > 0 >= za_start: za = za_end else: za = 0.5 * (za_start + za_end) alt_start = float(header.get('ALTI_STA', 0)) alt_end = float(header.get('ALTI_END', 0)) if alt_start > 0 >= alt_end: alt = alt_start elif alt_end > 0 >= alt_start: alt = alt_end else: alt = 0.5 * (alt_start + alt_end) alt /= 1000 log.info(f'Alt, ZA: {alt:.2f} {za:.2f}') true_value = [alt, za] if atran_dir is not None: if not os.path.isdir(str(atran_dir)): log.warning(f'Cannot find transmission directory: {atran_dir}') log.warning('Using default model set.') atran_dir = None if atran_dir is None: atran_dir = os.path.join(os.path.dirname(exes.__file__), 'data', 'transmission') atran_files = glob.glob(os.path.join(atran_dir, 'psg*fits')) regex1 = re.compile(r'^psg_([0-9]+)K_([0-9]+)deg_5-28um\.fits$') # set up some values for tracking best match overall_val = np.inf best_file = None for f in atran_files: # check for filename match match = regex1.match(os.path.basename(f)) if match is not None: match_val = 0 for i in range(2): # file alt or za file_val = float(match.group(i + 1)) # check difference from true value if true_value[i] != 0: d_val = abs(file_val - true_value[i]) / true_value[i] else: d_val = abs(file_val - true_value[i]) match_val += d_val if match_val < overall_val: overall_val = match_val best_file = f log.info('Using nearest Alt/ZA') filename = best_file if filename is None: log.debug('No PSG file found') return # Read the model data from cache if possible log.info(f'Using PSG file: {filename}') atrandata = get_atran_from_cache(filename, resolution) if atrandata is not None: atranfile, unsmoothed, wave, smoothed = atrandata else: hdul = gethdul(filename, verbose=True) if hdul is None or hdul[0].data is None: log.error(f'Invalid data in model file {filename}') return unsmoothed = hdul[0].data hdul.close() atranfile = os.path.basename(filename) wave = unsmoothed[0] # convert wavenumber to wavelength awave = (unsmoothed[0] * units.kayser).to( units.um, equivalencies=units.spectral()).value # flip for monotonic increasing values awave = np.flip(awave) # smooth every row in the input model smoothed = unsmoothed.copy() for i in range(1, unsmoothed.shape[0]): trans = np.flip(unsmoothed[i]) # smooth to constant resolution smooth_trans = smoothres(awave, trans, resolution) # flip back to match wavenumber smoothed[i] = np.flip(smooth_trans) store_atran_in_cache(filename, resolution, atranfile, unsmoothed, wave, smoothed) hdinsert(header, 'ATRNFILE', atranfile) if not get_unsmoothed: return smoothed else: return smoothed, unsmoothed