# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log
import numpy as np
from sofia_redux.toolkit.image.adjust import shift
from sofia_redux.toolkit.fitting.polynomial import Polyfit
from sofia_redux.toolkit.interpolate import interpolate_nans
__all__ = ['get_wave_shift', 'fluxcal']
[docs]
def get_wave_shift(flux, correction, shift_limit, model_order,
subsample=10):
"""
Get pixel shift between flux and correction curve.
Flux is directly shifted then corrected to determine shift value with
lowest RMS, compared to a low order polynomial model.
The correction curve is typically atmospheric transmission * response.
Parameters
----------
flux : numpy.ndarray of float
Spectral flux (nw,).
correction : numpy.ndarray of float
Correction curve (nw,)
shift_limit : float
Maximum shift to consider.
model_order : int
Polynomial order of continuum model, for calculating optimum
residuals.
subsample : int, optional
Subsampling for sub-pixel shifts. Set to 1 to get whole pixel
shifts only.
Returns
-------
float
The wavelength shift in pixels.
"""
# subsample data to get sub-pixel lag info
xin = np.arange(flux.size, dtype=float)
if subsample > 1:
xout = np.arange(flux.size * subsample, dtype=float) / subsample
sflux = interpolate_nans(xin, flux, xout)
scorrect = interpolate_nans(xin, correction, xout)
else:
xout = xin
subsample = 1
sflux = flux.copy()
scorrect = correction.copy()
good = ~np.isnan(sflux) & ~np.isnan(scorrect)
if not np.any(good):
return np.nan
# lag array: distance from center wavelength
ns = sflux.size
lag = np.arange(ns, dtype=float) - ns / 2
lag /= subsample
fit_rchisq = []
fit_shift = []
fit_corr = []
for sval in lag[np.abs(lag) < shift_limit]:
test_flux = np.interp(xout, xout + sval, sflux,
left=np.nan, right=np.nan)
corr_data = test_flux / scorrect
poly = Polyfit(xout[good], corr_data[good], model_order)
fit_rchisq.append(poly.stats.rchi2)
fit_shift.append(sval)
fit_corr.append(corr_data)
# find the best shift by minimizing residuals
min_idx = np.argmin(fit_rchisq)
shiftval = fit_shift[min_idx]
"""
# plotting code for debugging
from matplotlib import pyplot as plt
plt.plot((xin - flux.size / 2.0),
flux / np.mean(flux), label='input flux')
plt.plot(lag, sflux / np.mean(sflux[good]), label='subsampled flux')
plt.plot(lag, scorrect / np.mean(scorrect[good]),
label='correction curve')
plt.plot(lag + shiftval, sflux / np.mean(sflux[good]),
label='shifted flux')
plt.plot(lag, fit_corr[min_idx] / np.nanmean(fit_corr[min_idx]),
label='corrected flux')
plt.legend()
plt.show()
plt.plot(fit_shift, fit_rchisq, label='reduced chi-squared')
plt.axvline(shiftval)
plt.title(f'Shift value: {shiftval}')
plt.legend()
plt.show()
#"""
return shiftval
[docs]
def fluxcal(spectra, atran, response=None,
auto_shift=False, shift_limit=5.0,
shift_subsample=10, model_order=1):
"""
Calibrate and telluric correct spectral flux.
The provided atmospheric transmission and response values are
interpolated onto the wavelength range for each order. The correction
curve is computed as transmission * response.
Optionally, an optimum wavelength shift for the 1D spectra, relative
to the correction curve, may be computed by minimizing the residuals
in the corrected spectrum. If the calculated shift is greater than
0.1 pixel and less than `shift_limit`, the spectrum is shifted by
this amount, via a linear interpolation, along the wavelength dimension
before correction.
The 1D spectral_flux and spectral error provided are divided by the
correction curve. For the 2D spectral flux and error, each
row is divided by the correction curve.
If multiple ATRAN data sets are provided, then the optimum correction
will be determined by fitting a low order polynomial to the corrected
1D spectrum. The correction curve that produces the lowest chi-squared
residuals on the fit is selected. The index of the ATRAN data set
chosen is returned in the output dictionary, with key 'atran_index'.
The calculated wavelength shift and the interpolated transmission,
response, and response error are stored in the output dictionary,
with keys 'wave_shift', 'transmission', 'response', and
'response_error', respectively.
Parameters
----------
spectra : dict
Spectra to calibrate.
Structure is:
order (int) -> list of dict
flux -> numpy.ndarray (ns, nw)
Rectified 2D spectral flux image.
error -> numpy.ndarray (ns, nw)
Rectified 2D spectral error image.
wave -> numpy.ndarray (nw,)
Wavelength coordinates.
spectral_flux -> numpy.ndarray (nw,)
1D spectral flux.
spectral_error -> numpy.ndarray (nw,)
1D spectral error.
wave_shift -> float
Manual shift to apply in the wavelength dimension
(pixels). If present and not None, will override
the `auto_shift` parameter.
atran : numpy.ndarray or list of numpy.ndarray
A (2, nt) array; first element is the wavelength coordinate, second
is the fractional transmission. If a list of arrays is provided,
the optimum one will be selected.
response : dict, optional
The instrumental response for the order. If not provided,
only the ATRAN correction will be applied.
Structure is:
order (int) -> dict
wave -> numpy.ndarray (nr,)
Wavelength coordinates. Need not match the spectral
flux coordinates.
response -> numpy.ndarray (nr,)
Instrument response, in raw units/Jy.
error -> numpy.ndarray (nr,)
Error on the response.
auto_shift : bool, optional
If set, the spectrum cross-correlated with the
response * transmission, and the calculated wavelength
shift will be applied to the flux and error images and spectra.
shift_limit : float, optional
Maximum wavelength shift to be applied by the `auto_shift`,
in pixels.
shift_subsample : int, optional
Subsampling for wavelength shifts. Set to 1 to get whole pixel
shifts only.
model_order : int, optional
Polynomial order for continuum model, used in optimizing wavelength
shifts and ATRAN optimization.
Returns
-------
spectra : dict
Calibrated spectra.
Structure is:
order (int) -> list of dict
flux -> numpy.ndarray (ns, nw)
Calibrated rectified 2D spectral flux image.
error -> numpy.ndarray (ns, nw)
Calibrated rectified 2D spectral error image.
wave -> numpy.ndarray (nw,)
Wavelength coordinates.
spectral_flux -> numpy.ndarray (nw,)
Calibrated 1D spectral flux.
spectral_error -> numpy.ndarray (nw,)
Calibrated 1D spectral error.
wave_shift -> float
Wavelength shift applied.
atran_index -> int
Index of the ATRAN data set selected.
transmission -> numpy.ndarray (nw,)
Transmission correction applied to flux.
response -> numpy.ndarray (nw,)
Response correction applied to flux.
response_error -> numpy.ndarray (nw,)
Error on the response.
"""
# use all orders defined in spectra
orders = np.unique(list(spectra.keys())).astype(int)
# if more than one atran provided, choose the best one
if not isinstance(atran, list):
if isinstance(atran, np.ndarray) and atran.ndim < 3:
atran = [atran]
if len(atran) > 1:
optimize = True
else:
optimize = False
# lower limit for wavelength shifts, in pixels --
# below this, it's not worth the interpolation
eps = 0.1
# Loop through each order
result = {}
for orderi, order in enumerate(orders):
result[order] = []
for speci, spectrum in enumerate(spectra[order]):
out_spectrum = {}
# bail if there's no good data
if np.all(np.isnan(spectrum['spectral_flux'])):
log.error(f'No good flux in order {order}, spectrum {speci}')
return
wave = spectrum['wave']
dw = np.mean(wave[1:] - wave[0:-1])
out_spectrum['wave'] = wave.copy()
# check for manual wave shift
if 'wave_shift' in spectrum and spectrum['wave_shift'] is not None:
check_auto = False
wave_shift = spectrum['wave_shift']
else:
check_auto = auto_shift
wave_shift = 0.0
# interpolate response onto the wavelength range
if response is not None:
rwave = response[order]['wave']
rdata = response[order]['response']
rerr = response[order]['error']
rmatch = np.interp(wave, rwave, rdata,
left=np.nan, right=np.nan)
ematch = np.interp(wave, rwave, rerr,
left=np.nan, right=np.nan)
else:
rmatch = np.ones_like(wave)
ematch = np.zeros_like(wave)
# for all atran files, interpolate onto the
# spectral wavelength range
all_tran = []
all_corr = []
all_shift = []
fit_chisq = []
fit_rchisq = []
for i, (awave, adata) in enumerate(atran):
# interpolate atran onto input wavelengths
amatch = np.interp(wave, awave, adata,
left=np.nan, right=np.nan)
# divide spectrum by transmission and response
flux = spectrum['spectral_flux'].copy()
correction = amatch * rmatch
corr_data = flux / correction
good = ~np.isnan(corr_data)
all_tran.append(amatch)
# auto shift if desired
shiftval = wave_shift
if check_auto:
shiftval = get_wave_shift(
flux, correction, shift_limit, model_order,
subsample=shift_subsample)
if np.isnan(shiftval):
log.warning('Could not calculate wave shift; '
'setting to 0.')
shiftval = 0.0
log.debug('Calculated wave shift: {}'.format(shiftval))
all_shift.append(shiftval)
if np.abs(shiftval) > eps:
if not check_auto or np.abs(shiftval) < shift_limit:
# interpolate spectrum from shifted wavelength onto
# standard ATRAN wavelength
flux = np.interp(wave - shiftval * dw, wave, flux,
left=np.nan, right=np.nan)
corr_data = flux / correction
good = ~np.isnan(corr_data)
all_corr.append(corr_data)
# if optimizing, fit a low order polynomial to the corrected
# spectrum, and calculate residuals and chi-squared error
if optimize:
robust = 6.0
poly = Polyfit(wave[good], corr_data[good], model_order,
robust=robust)
fit_chisq.append(poly.stats.chi2)
fit_rchisq.append(poly.stats.rchi2)
"""
# plotting code for debugging
print(f'ATRAN spectrum {i} rchi2: {poly.stats.rchi2}')
from matplotlib import pyplot as plt
plt.plot(wave[good], corr_data[good],
label='corrected flux')
plt.plot(wave, poly(wave), label='poly fit')
plt.legend()
plt.show()
#"""
# find the best atran correction by minimizing residuals
if optimize and len(fit_chisq) > 0:
atran_index = np.argmin(fit_rchisq)
out_spectrum['fit_chisq'] = fit_chisq
out_spectrum['fit_rchisq'] = fit_rchisq
out_spectrum['all_corrected'] = all_corr
else:
# or else just take the first one
atran_index = 0
best_tran = all_tran[atran_index]
best_shift = all_shift[atran_index]
# shift images and spectra if needed
dnames = ['spectral_flux', 'spectral_error',
'flux', 'error']
if check_auto and np.abs(best_shift) > shift_limit:
log.warning('Calculated shift of {:.2f} pixels is too large. '
'Not applying auto '
'shift.'.format(best_shift))
best_shift = 0.
elif 0 < np.abs(best_shift) < eps:
log.debug('Wave shift of {:.2f} pixels is very small. '
'Setting to zero.'.format(best_shift))
best_shift = 0.
elif np.abs(best_shift) > eps:
for name in dnames:
data = spectrum[name]
if data.ndim > 1:
offsets = [0., best_shift]
else:
offsets = [best_shift]
out_spectrum[name] = shift(data, offsets)
# copy over data if not already there
for name in dnames:
if name not in out_spectrum:
out_spectrum[name] = spectrum[name].copy()
# divide spectral flux and image by transmission
# (no error to propagate)
out_spectrum['spectral_flux'] /= best_tran
out_spectrum['spectral_error'] /= best_tran
# for the image: divide each row by the correction factor
out_spectrum['flux'] /= best_tran[None, :]
out_spectrum['error'] /= best_tran[None, :]
# divide by response as well, propagating statistical errors
out_spectrum['spectral_flux'] /= rmatch
out_spectrum['flux'] /= rmatch[None, :]
sflx = out_spectrum['spectral_flux']
serr = out_spectrum['spectral_error']
out_spectrum['spectral_error'] = \
np.sqrt(serr**2 + ematch**2 * sflx**2) / rmatch
sflx = out_spectrum['flux']
serr = out_spectrum['error']
out_spectrum['error'] = \
np.sqrt(serr**2
+ ematch[None, :]**2 * sflx**2) / rmatch[None, :]
# append the transmission and response for reference
out_spectrum['wave_shift'] = best_shift
out_spectrum['atran_index'] = atran_index
out_spectrum['transmission'] = best_tran
out_spectrum['response'] = rmatch
out_spectrum['response_error'] = ematch
result[order].append(out_spectrum)
return result