# Licensed under a 3-clause BSD style license - see LICENSE.rst
import glob
import os
import re
from astropy import log
from astropy.io import fits
import numpy as np
from sofia_redux.instruments import fifi_ls
from sofia_redux.instruments.fifi_ls.get_resolution import get_resolution
from sofia_redux.toolkit.utilities import goodfile, gethdul, hdinsert
from sofia_redux.spectroscopy.smoothres import smoothres
__all__ = ['clear_atran_cache', 'get_atran_from_cache',
'store_atran_in_cache', 'get_atran', 'get_atran_interpolated']
__atran_cache = {}
[docs]
def clear_atran_cache():
"""
Clear all data from the atran cache.
"""
global __atran_cache
__atran_cache = {}
[docs]
def get_atran_from_cache(atranfile, resolution):
"""
Retrieves atmospheric transmission data from the atran cache.
Checks to see if the file still exists, can be read, and has not
been altered since last time. If the file has changed, it will be
deleted from the cache so that it can be re-read.
Parameters
----------
atranfile : str
File path to the atran file
resolution : float
Spectral resolution used for smoothing.
Returns
-------
tuple
filename : str
Used to update ATRNFILE in FITS headers.
wave : numpy.ndarray
(nwave,) array of wavelengths.
unsmoothed : numpy.ndarray
(nwave,) array containing the atran data from file
smoothed : numpy.ndarray
(nwave,) array containing the smoothed atran data
"""
global __atran_cache
key = atranfile, int(resolution)
if key not in __atran_cache:
return
if not goodfile(atranfile):
try:
del __atran_cache[key]
except KeyError: # pragma: no cover
# could happen in race conditions, working in parallel
pass
return
modtime = str(os.path.getmtime(atranfile))
if modtime not in __atran_cache.get(key, {}):
return
log.debug(f'Retrieving ATRAN data from cache '
f'({key[0]}, resolution {key[1]})')
return __atran_cache.get(key, {}).get(modtime)
[docs]
def store_atran_in_cache(atranfile, resolution, filename, wave,
unsmoothed, smoothed):
"""
Store atran data in the atran cache.
Parameters
----------
atranfile : str
File path to the atran file
resolution : float
Spectral resolution used for smoothing.
filename : str
Used to update ATRNFILE in FITS headers.
wave : numpy.ndarray
(nwave,) array of wavelengths.
unsmoothed : numpy.ndarray
(nwave,) array containing the atran data from file
smoothed : numpy.ndarray
(nwave,) array containing the smoothed atran data
"""
global __atran_cache
key = atranfile, int(resolution)
log.debug(f'Storing ATRAN data in cache '
f'({key[0]}, resolution {key[1]})')
__atran_cache[key] = {}
modtime = str(os.path.getmtime(atranfile))
__atran_cache[key][modtime] = (
filename, wave, unsmoothed, smoothed)
[docs]
def get_atran(header, resolution=None, filename=None,
get_unsmoothed=False, use_wv=False,
atran_dir=None):
"""
Retrieve reference atmospheric transmission data.
ATRAN files in the data/atran_files directory should be named
according to the altitude, ZA, and wavelengths for which they
were generated, as:
atran_[alt]K_[za]deg_[wmin]-[wmax]mum.fits
For example, the file generated for altitude of 41,000 feet,
ZA of 45 degrees, and wavelengths between 40 and 300 microns
should be named:
atran_41K_45deg_40-300mum.fits
If use_wv is set, files named for the precipitable water vapor
values for which they were generated will be used instead, as:
atran_[alt]K_[za]deg_[wv]pwv_[wmin]-[wmax]mum.fits
The procedure is:
1. Identify ATRAN file by ZA, Altitude, and WV , unless override is
provided.
2. Read ATRAN data from file and smooth to expected spectral
resolution.
3. Return transmission array.
Parameters
----------
header : fits.Header
ATRNFILE keyword is written to the provided FITS header,
containing the name of the ATRAN file used.
resolution : float, optional
Spectral resolution to which ATRAN data should be smoothed.
filename : str, optional
Atmospheric transmission file to be used. If not provided,
a default file will be retrieved from the data/atran_files
directory. The file with the closest matching ZA and
Altitude to the input data will be used. If override file
is provided, it should be a FITS image file containing
wavelength and transmission data without smoothing.
get_unsmoothed : bool, optional
If True, return the unsmoothed atran data in addition to the
smoothed.
atran_dir : str, optional
Path to a directory containing ATRAN reference FITS files.
If not provided, the default set of files packaged with the
pipeline will be used.
use_wv : bool, optional
If set, water vapor values from the header will be used
to select the correct ATRAN file.
Returns
-------
numpy.ndarray
Filename of the atmospheric transmission file used and a
(2, nw) array containing wavelengths and (optionally
smoothed) transmission data.
"""
if filename is not None:
if not goodfile(filename, verbose=True):
log.warning(f'File {filename} not found; '
f'retrieving default')
filename = None
if not isinstance(header, fits.Header):
log.error('Invalid header')
return
if resolution is None:
log.warning('Getting default resolution from G_WAVE in header')
resolution = get_resolution(header)
if filename is None:
za_start = float(header.get('ZA_START', 0))
za_end = float(header.get('ZA_END', 0))
if za_start > 0 >= za_end:
za = za_start
elif za_end > 0 >= za_start:
za = za_end
else:
za = 0.5 * (za_start + za_end)
alt_start = float(header.get('ALTI_STA', 0))
alt_end = float(header.get('ALTI_END', 0))
if alt_start > 0 >= alt_end:
alt = alt_start
elif alt_end > 0 >= alt_start:
alt = alt_end
else:
alt = 0.5 * (alt_start + alt_end)
alt /= 1000
wv_obs = float(header.get('WVZ_OBS', 0))
if wv_obs > 0:
wv = wv_obs
else:
wv_start = float(header.get('WVZ_STA', 0))
wv_end = float(header.get('WVZ_END', 0))
if wv_start > 0 >= wv_end:
wv = wv_start
elif wv_end > 0 >= wv_start:
wv = wv_end
else:
wv = 0.5 * (wv_start + wv_end)
if use_wv and wv < 2:
log.warning(f'Bad WV value: {wv}')
log.warning('Using default ATRAN file.')
use_wv = False
log.debug(f'Alt, ZA, WV: {alt:.2f} {za:.2f} {wv:.2f}')
true_value = [alt, za, wv]
if atran_dir is not None:
if not os.path.isdir(str(atran_dir)):
log.warning(f'Cannot find ATRAN directory: {atran_dir}')
log.warning('Using default ATRAN set.')
atran_dir = None
if atran_dir is None:
atran_dir = os.path.join(os.path.dirname(fifi_ls.__file__),
'data', 'atran_files')
atran_files = glob.glob(os.path.join(atran_dir, 'atran*fits'))
regex1 = re.compile(r'^atran_([0-9]+)K_([0-9]+)deg_40-300mum\.fits$')
regex2 = re.compile(r'^atran_([0-9]+)K_([0-9]+)deg_'
r'([0-9]+)pwv_40-300mum\.fits$')
# set up some values for tracking best atran match
wv_overall_val = np.inf
wv_best_file = None
overall_val = np.inf
best_file = None
for f in atran_files:
# check for WV match
match = regex2.match(os.path.basename(f))
if use_wv and wv > 0 and match is not None:
match_val = 0
for i in range(3):
# file alt, za, or wv
file_val = float(match.group(i + 1))
# check difference from true value
d_val = abs(file_val - true_value[i]) / true_value[i]
match_val += d_val
if match_val < wv_overall_val:
wv_overall_val = match_val
wv_best_file = f
else:
# otherwise, check for non-WV match
match = regex1.match(os.path.basename(f))
if match is not None:
match_val = 0
for i in range(2):
# file alt or za
file_val = float(match.group(i + 1))
# check difference from true value
d_val = abs(file_val - true_value[i]) / true_value[i]
match_val += d_val
if match_val < overall_val:
overall_val = match_val
best_file = f
if use_wv and wv_best_file is not None:
log.debug('Using nearest Alt/ZA/WV')
filename = wv_best_file
else:
log.debug('Using nearest Alt/ZA')
filename = best_file
if filename is None:
log.error('No ATRAN file found')
return
# Read the atran data from cache if possible
log.debug(f'Using ATRAN file: {filename}')
atrandata = get_atran_from_cache(filename, resolution)
if atrandata is not None:
atranfile, wave, unsmoothed, smoothed = atrandata
else:
hdul = gethdul(filename, verbose=True)
if hdul is None or hdul[0].data is None:
log.error(f'Invalid data in ATRAN file {filename}')
return
data = hdul[0].data
atranfile = os.path.basename(filename)
wave = data[0]
unsmoothed = data[1]
smoothed = smoothres(data[0], data[1], resolution)
store_atran_in_cache(filename, resolution, atranfile,
data[0], data[1], smoothed)
hdinsert(header, 'ATRNFILE', atranfile)
if not get_unsmoothed:
return np.vstack((wave, smoothed))
else:
return (np.vstack((wave, smoothed)),
np.vstack((wave, unsmoothed)))
[docs]
def get_atran_interpolated(header, resolution=None,
get_unsmoothed=False, use_wv=False,
atran_dir=None):
if not isinstance(header, fits.Header):
log.error('Invalid header')
return
if resolution is None:
log.warning('Getting default resolution from G_WAVE in header')
resolution = get_resolution(header)
# get ZA
za_start = float(header.get('ZA_START', 0))
za_end = float(header.get('ZA_END', 0))
if za_start > 0 >= za_end:
za = za_start
elif za_end > 0 >= za_start:
za = za_end
else:
za = 0.5 * (za_start + za_end)
# get altitude in thousands feet
alt_start = float(header.get('ALTI_STA', 0))
alt_end = float(header.get('ALTI_END', 0))
if alt_start > 0 >= alt_end:
alt = alt_start
elif alt_end > 0 >= alt_start:
alt = alt_end
else:
alt = 0.5 * (alt_start + alt_end)
alt /= 1000
# get water vapor
wv_obs = float(header.get('WVZ_OBS', 0))
if wv_obs > 0:
wv = wv_obs
else:
wv_start = float(header.get('WVZ_STA', 0))
wv_end = float(header.get('WVZ_END', 0))
if wv_start > 0 >= wv_end:
wv = wv_start
elif wv_end > 0 >= wv_start:
wv = wv_end
else:
wv = 0.5 * (wv_start + wv_end)
if atran_dir is not None:
if not os.path.isdir(str(atran_dir)):
log.warning(f'Cannot find ATRAN directory: {atran_dir}')
log.warning('Using default ATRAN set.')
atran_dir = None
if atran_dir is None:
atran_dir = os.path.join(os.path.dirname(fifi_ls.__file__),
'data', 'atran_files')
za_values = list(range(30,75,5))
wv_values = [1, 2, 3, 4, 5, 6, 7,
8, 9, 10, 11, 12, 13,
14, 15, 16, 17, 18, 20,
22, 25, 27, 30, 32, 35,
37, 40, 45, 50]
# clip values to atran data range
if not za_values[-1] >= za >= za_values[0]:
log.warning('za={} outside of available ATRAN data.'.format(za))
za = np.clip(za, a_min=za_values[0], a_max=za_values[-1])
log.warning('Setting zenith angle to {} deg'.format(za))
za_high, za_low = za_values[-1], za_values[-2]
else:
za_high, za_low = np.inf, np.inf
if use_wv and not wv_values[-1] >= wv >= wv_values[0]:
log.warning('wv={} outside of available ATRAN data.'.format(wv))
wv = np.clip(wv, a_min=wv_values[0], a_max=wv_values[-1])
log.warning('Setting water vapor to {} um'.format(wv))
wv_high, wv_low = wv_values[-1], wv_values[-2]
else:
wv_high, wv_low = np.inf, np.inf
if not 45 >= alt >= 38:
log.warning('alt={} outside of available ATRAN data.'.format(alt))
alt = np.clip(alt, a_min=38, a_max=45)
log.warning('Setting altitude to {}K ft'.format(round(alt)))
log.debug(f'Alt, ZA, WV: {alt:.2f} {za:.2f} {wv:.2f}')
# find the higher and lower boundaries
for i, w in enumerate(wv_values):
if w > wv:
wv_high = w
wv_low = wv_values[i-1]
break
for i, z in enumerate(za_values):
if z > za:
za_high = z
za_low = za_values[i-1]
break
# build filenames
current_atran_filenames = {}
if use_wv:
current_atran_filenames["za1_wv1"] = \
("atran_{}K_{}deg_{}pwv_40-300mum.fits".format(
round(alt), za_low, wv_low), za_low, wv_low)
current_atran_filenames["za1_wv2"] = \
("atran_{}K_{}deg_{}pwv_40-300mum.fits".format(
round(alt), za_low, wv_high), za_low, wv_high)
current_atran_filenames["za2_wv1"] = \
("atran_{}K_{}deg_{}pwv_40-300mum.fits".format(
round(alt), za_high, wv_low), za_high, wv_low)
current_atran_filenames["za2_wv2"] = \
("atran_{}K_{}deg_{}pwv_40-300mum.fits".format(
round(alt), za_high, wv_high), za_high, wv_high)
else:
current_atran_filenames["za1"] = \
("atran_{}K_{}deg_40-300mum.fits".format(
round(alt), za_low,), za_low, None)
current_atran_filenames["za2"] = \
("atran_{}K_{}deg_40-300mum.fits".format(
round(alt), za_high), za_high, None)
# load all files
atran_data = {}
atrnfile_fits_keyword = ''
for key, value in current_atran_filenames.items():
filename = value[0]
_za = value[1] # the ZA value of this file
_wv = value[2] # the WV value of this file
atrandata = get_atran_from_cache(filename, resolution)
if atrandata is not None:
atran_data[key] = atrandata, _za, _wv
else:
hdul = gethdul(os.path.join(atran_dir, filename), verbose=True)
if hdul is None or hdul[0].data is None:
log.error(f'Invalid data in ATRAN file {filename}')
return
data = hdul[0].data
atranfile = os.path.basename(filename)
wave = data[0]
unsmoothed = data[1]
smoothed = smoothres(data[0], data[1], resolution)
atran_data[key] = (atranfile, wave, unsmoothed, smoothed), _za, _wv
atrnfile_fits_keyword += filename + ', '
store_atran_in_cache(os.path.join(atran_dir, filename), resolution,
atranfile, data[0], data[1], smoothed)
if use_wv:
# interpolate za for two pwv
za1_za2_wv1 = interpolate_two_atran_files(atran_data["za1_wv1"],
atran_data["za2_wv1"],
za, 0)
za1_za2_wv2 = interpolate_two_atran_files(atran_data["za1_wv2"],
atran_data["za2_wv2"],
za, 0)
# interpolate WV and hold ZA
interpolated = interpolate_two_atran_files(za1_za2_wv1,
za1_za2_wv2,
wv, 1)
else:
interpolated = interpolate_two_atran_files(atran_data["za1"],
atran_data["za2"],
za, 0)
wave = interpolated[0][1]
unsmoothed = interpolated[0][2]
smoothed = smoothres(wave, unsmoothed, resolution)
hdinsert(header, 'ATRNFILE', atrnfile_fits_keyword[:-2])
if not get_unsmoothed:
return np.vstack((wave, smoothed))
else:
return (np.vstack((wave, smoothed)),
np.vstack((wave, unsmoothed)))
def interpolate_two_atran_files(atran_data1, atran_data2, des_val, itype=0):
"""
Interpolate data from two atran files.
Parameters
----------
atran_data1, atran_data2 : tuple
((filename, wave, unsmoothed, smoothed), za, wv)
des_val : float
The value to be interpolated for. Can be zenith angle (ZA) or water
vapor (WV)
itype: int
0 for ZA
1 for WV
Returns
-------
interpolated_atran_data: tuple
same nested tuple format as input parameters
(("", wave, unsmoothed, None), za, wv)
"""
if not np.allclose(atran_data1[0][1], atran_data2[0][1]):
raise ValueError("Incompatible atran wavelengths to interpolate")
dy = atran_data2[0][2] - atran_data1[0][2]
if itype == 0:
# interpolating zenith angle
dx = atran_data2[1] - atran_data1[1]
t = dy/dx
data_int = atran_data1[0][2] + t * (des_val - atran_data1[1])
za = des_val
wv = atran_data1[2]
elif itype == 1:
# interpolating water vapor
dx = atran_data2[2] - atran_data1[2]
t = dy/dx
data_int = atran_data1[0][2] + t * (des_val - atran_data1[2])
za = atran_data1[1]
wv = des_val
else:
raise TypeError("itype has to be either 0 (ZA) or 1 (WV)")
interpolated_atran_data0 = ("", atran_data1[0][1], data_int, None)
return (interpolated_atran_data0, za, wv)