# Licensed under a 3-clause BSD style license - see LICENSE.rst
from datetime import datetime
import os
import warnings
from astropy import log
from astropy.io import fits
from astropy.stats import sigma_clipped_stats
from astropy.table import Table
from astropy.time import Time
import bottleneck as bn
import numba as nb
import numpy as np
from scipy.stats import linregress
from sofia_redux.instruments.fifi_ls.get_badpix \
import get_badpix, clear_badpix_cache
from sofia_redux.instruments.fifi_ls.get_resolution import get_resolution
from sofia_redux.instruments.fifi_ls.pointing_discard \
import get_timing, load_pointing_data, get_pointing_mask
from sofia_redux.toolkit.stats import meancomb
from sofia_redux.toolkit.utilities \
import (hdinsert, gethdul, write_hdul, multitask)
__all__ = ['get_readout_range', 'resize_data', 'fit_data',
'process_extension', 'fit_ramps', 'wrap_fit_ramps']
DEBUG = False
[docs]
def get_readout_range(header):
"""
Returns the readout range as extracted from header.
If the observations occurred between January 2014 and March 2015,
we ignore the first 3 readouts, otherwise ignore the first 2.
The last readout is also ignored.
Parameters
----------
header : fits.Header
Returns
-------
2-tuple:
int : first readout
int : ramplength
"""
channel = header.get('CHANNEL', 'UNKNOWN').strip().upper()
ramplength = header.get(
'RAMPLN_%s' % ('R' if channel == 'RED' else 'B'))
dateobs = header.get('DATE-OBS',
Time(datetime.utcnow(), format='datetime').isot)
dateobs = Time(dateobs, format='isot')
drange = [Time('2014-01-01T00:00:00', format='isot'),
Time('2015-03-01T00:00:00', format='isot')]
read_start = 3 if (drange[0] < dateobs < drange[1]) else 2
return read_start, ramplength
[docs]
def resize_data(data, readout_range, indpos, remove_first_ramps=True,
subtract_bias=True, indpos_sigma=3.0):
"""
Trim and reshape data to separate ramps.
Parameters
----------
data : numpy.ndarray
(1 [optional], nramps, nwave (18), nspaxel (26))
readout_range : array_like of int
(start readout, ramp length)
indpos : int
Expected inductosyn position for the grating
remove_first_ramps : bool, optional
Remove the first two ramps.
subtract_bias : bool, optional
Subtract the empty pixel value.
indpos_sigma : float, optional
If >0, will be used to flag ramps with mean grating
position that deviates from the indpos value by this
many sigma, or has a standard deviation greater
than this value. Flags are returned in the bad_ramps
array.
Returns
-------
data, bad_ramps : numpy.ndarray, numpy.ndarray
Resized data is a floating point array with dimensions
(ramps/spaxel, ramplength, nwave (16), nspaxel (25)).
The secondary array is a Boolean array with dimensions
(ramps/spaxel), containing bad ramp flags (True = bad)
for later propagation in ramp fits.
"""
if not isinstance(data, np.ndarray):
log.error("Invalid data")
return
elif (data.ndim != 3
or data.shape[-1] < 25
or data.shape[-2] != 18):
log.error("Invalid data shape")
return
elif not hasattr(readout_range, '__len__') or len(readout_range) != 2:
log.error("Invalid readout_range")
return
grating_values = None
if indpos_sigma > 0:
# use the values in the last spaxel to check for
# bad grating position
mask = 0b0000000000011111
grating_values = (
data[:, 2::4, 25].astype(np.uint16)
+ 2**16 * (data[:, 3::4, 25] & mask).astype(np.int32)
)
# average the 4 samples
grating_values = np.mean(grating_values, axis=1)
# check for appropriate values
gmean, gmed, gstd = sigma_clipped_stats(grating_values)
log.debug(f'Expected INDPOS: {indpos}; reported: {gmean} +/- {gstd}')
if np.allclose(gstd, 0) or np.allclose(gmean, 0) or \
np.abs(gmean - indpos) > 5 * gstd:
# mean is well off from expected value -- this likely
# means the data is older, and does not contain grating
# value measurements
log.debug('Grating measurements do not match expected '
'value. Turning off grating instability data '
'rejection.')
grating_values = None
if subtract_bias:
# subtract the first spexel (empty pixel) value,
# accounting for invalid int16 values
fdata = np.float32(data)
empty_pix = fdata[:, 0, :] + 2**15
all_pix = fdata + 5000.0
all_pix -= np.tile(np.expand_dims(empty_pix, 1), (1, 18, 1))
all_pix[all_pix < -32768] = -32768
all_pix[all_pix > 32767] = 32767
data = np.int16(all_pix)
# remove the last spaxel and the first and last spexel
data = data[:, 1:17, :25]
readout_start, ramplength = readout_range
nreadouts = data.size
nramps = nreadouts // ramplength
ramps_per_spaxel = nramps // (25 * 16)
if nreadouts != (25 * 16 * ramplength * ramps_per_spaxel):
log.error(f"Number of readouts in data shape {data.shape} "
"does not match header")
return
# Reshape the data into separate ramp dimensions
# (nramps, 16, 25) -> (ramps/spaxel, ramplength, 16, 25)
# i.e. (n_ramp, n_readout, n_wave, n_spaxel)
newshape = ramps_per_spaxel, ramplength, 16, 25
data = data.reshape(newshape)
log.debug(f'# ramps, # readouts/ramp: {ramps_per_spaxel}, {ramplength}')
if grating_values is not None:
grating_values = grating_values.reshape(ramps_per_spaxel, ramplength)
# remove the first ramps from each spaxel if required
if remove_first_ramps and ramps_per_spaxel > 2:
log.debug('Removing first 2 ramps')
data = data[2:]
if grating_values is not None:
grating_values = grating_values[2:]
# remove the first 2 or 3, and last readout if possible
if ramplength > (readout_start + 1):
data = data[:, readout_start:-1, :, :]
# convert data to float
data = np.asarray(data, dtype=float)
# make a bad sample mask from the grating values
bad_ramps = np.full(data.shape[0], False)
if grating_values is not None:
# average value over ramp
ramp_value = np.mean(grating_values, axis=1)
ramp_std = np.std(grating_values, axis=1)
# use full clipped standard deviation for flagging
gmean, gmed, gstd = sigma_clipped_stats(grating_values)
bad_mean = np.abs(ramp_value - indpos) > indpos_sigma * gstd
bad_std = np.abs(ramp_std / gstd) > indpos_sigma
bad_ramps = bad_mean | bad_std
log.debug(f'Found {np.sum(bad_ramps)} bad ramps '
f'(out of {np.size(bad_ramps)}).')
bad_ramp_idx = np.unique(np.where(bad_ramps)[0])
if len(bad_ramp_idx) > 0:
log.debug(f'Bad ramp index: {bad_ramp_idx}')
if DEBUG: # pragma: no cover
log.info(f'Bad ramp index: {bad_ramp_idx}')
from matplotlib import pyplot as plt
plt.plot(grating_values)
plt.plot(ramp_value, color='black', linestyle="-",
label='ramp-averaged value')
plt.axhline(indpos, color='lightgray', linestyle="--",
alpha=0.5, label='expected value')
plt.axhline(indpos - gstd, color='darkgray', linestyle="-.",
alpha=0.5)
plt.axhline(indpos + gstd, color='darkgray', linestyle="-.",
alpha=0.5, label='standard deviation')
plt.axhline(indpos - indpos_sigma * gstd, color='gray',
linestyle=":", alpha=0.5)
plt.axhline(indpos + indpos_sigma * gstd, color='gray',
linestyle=":", alpha=0.5,
label=f'{indpos_sigma} sigma limit')
for idx in bad_ramp_idx:
plt.axvline(idx, alpha=0.2, color='black')
plt.title(f'Rejected ramps: {bad_ramp_idx}')
plt.xlabel('Ramp number')
plt.ylabel('Inductosyn value')
plt.legend()
plt.tight_layout()
plt.show()
return data, bad_ramps
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def calculate_fit(data, maxidx, rmplngth): # pragma: no cover
ramps_per_spaxel, nramp, nwave, nspaxel = data.shape
mat = np.empty((nramp, 2), dtype=nb.int64)
ata = np.empty((nramp, 2, 2), dtype=nb.float64)
vfac = np.empty(nramp, dtype=nb.float64)
sum1 = 0
sum2 = 0
for i in range(nramp):
sum1 += i
sum2 += i * i
mat[i, 0] = 1
mat[i, 1] = i
ata[i, 0, 0] = i + 1
ata[i, 0, 1] = sum1
ata[i, 1, 0] = sum1
ata[i, 1, 1] = sum2
if i > 0:
vfac[i] = 1.0 / np.sum((mat[:i + 1, 1] - (i / 2)) ** 2)
else:
vfac[i] = 0.0
slopes = np.empty((ramps_per_spaxel, nwave, nspaxel), dtype=nb.float64)
var = np.empty((ramps_per_spaxel, nwave, nspaxel), dtype=nb.float64)
for i in nb.prange(ramps_per_spaxel):
for j in range(nwave):
for k in range(nspaxel):
idx = maxidx[i, j, k]
if idx == 0 or idx == (nramp - 1):
idx = nramp
# rmlength is 27 for "full ramps only" option, otherwise 2
elif idx <= rmplngth:
slopes[i, j, k] = np.nan
var[i, j, k] = np.nan
continue
line = data[i, :idx, j, k]
a = ata[idx - 1]
b = np.empty(2, dtype=nb.float64)
b[0] = 0.0
b[1] = 0.0
for ln in range(idx):
b[0] += line[ln]
b[1] += line[ln] * ln
c = np.linalg.solve(a, b)
slopes[i, j, k] = c[1]
sdevd = 0.0
for ln in range(idx):
diff = c[0] + (c[1] * ln) - line[ln]
sdevd += diff * diff
var[i, j, k] = vfac[idx - 1] * sdevd / (idx - 2)
return slopes, var
[docs]
def fit_data(data, rmplngth=2, s2n=10, threshold=5, allow_zero_variance=True,
average_ramps=True, bad_ramps=None, pointing_mask=None):
"""
Applies linear fit (y = ax + b) over the second dimension of a 4D array.
Highly optimized for this particular problem. The flux is
determined by the slope of the second dimension (ramps).
After collapsing the ramp dimension, multiple ramps are averaged for
each spaxel and spexel.
Parameters
----------
data : numpy.ndarray
(ramps/spaxel, ramplength, nwave, nspaxel)
rmplngth : int, optional
Minimum ramp length
s2n : float, optional
Signal-to-noise below which data will be considered questionable
and will be ignored. Set <= 0 to turn off signal-to-noise filtering.
threshold : float, optional
Robust rejection threshold (in sigma) for combining slopes of
individual ramps.
allow_zero_variance : bool, optional
If True, does not set data points with zero variance to NaN. This
option is here to replicate the behaviour of the previous IDL
version.
average_ramps : bool, optional
If True, all ramps in the extension are averaged together. This
is desirable for all modes except OTF scans.
bad_ramps : numpy.ndarray, optional
If provided, should be an array of bool, matching the number
of ramps/spaxel (data.shape[0]) where True indicates a bad ramp
(e.g. due to grating position instability).
pointing_mask : pandas.Series, optional
Boolean mask used to discard ramps with bad pointing.
Returns
-------
numpy.ndarray, numpy.ndarray
flux and standard deviation arrays of size (spexel, spaxel)
or (16, 25) for FIFI-LS chopped observations. For OTF, the size
is (ramps/spaxel, spexel, spaxel)
"""
maxidx = bn.nanargmax(data, axis=1)
if bad_ramps is not None:
maxidx[bad_ramps] = -1
slopes, var = calculate_fit(data, maxidx, rmplngth)
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
if isinstance(s2n, (float, int)) and s2n > 0:
setnan = np.full(slopes.shape, False)
nzi = var > 0
setnan[nzi] = (slopes[nzi] / np.sqrt(var[nzi])) < s2n
if allow_zero_variance:
setnan[var == 0] = False
slopes[setnan] = np.nan
# set ramps with bad pointing to NaN
if pointing_mask is not None:
slopes[~pointing_mask] = np.nan
var[~pointing_mask] = np.nan
info = {}
flux, mvar = meancomb(slopes, robust=threshold, variance=var,
axis=0, returned=True, info=info)
if average_ramps:
return flux, np.sqrt(mvar)
else:
# data.shape : (ramps/spaxel, ramplength, nwave(16), ns(slopespaxel(25))
#ramps_per_spaxel, nramp, nwave, nspaxel = data.shape
# flag outliers from meancomb before calculating the variance
slopes[~info['mask']] = np.nan
var[~info['mask']] = np.nan
# 1st order linear fit
nr, nspe, nspa = slopes.shape
# reshape into ramps, pixel for easier thinking
slopes_r = np.reshape(slopes,(nr,nspe*nspa))
# Create arrays to store the coefficients and the fitted
# data for each pixel
coefficients = np.zeros((nspe*nspa, 2)) # 2 for slope and intercept
fitted_data = np.zeros_like(slopes_r)
# Loop over each pixel and perform a linear fit over the ramps
for i in range(nspe*nspa):
# Create a mask for non-NaN values
valid_mask = ~np.isnan(slopes_r[:, i])
x = np.arange(nr)[valid_mask]
y = slopes_r[:, i][valid_mask]
# get rid of empty pixels
# Empty pixels, i.e., pixels with no valid data points
# (all NaNs), will have coefficients set to [0, 0] and
# fitted_data will remain all zeros. This is because
# when there are no valid data points to perform the
# linear fit, linregress will return slope and
# intercept values of zero. We handle this situation by
# checking if len(x) > 0, which ensures that there is
# at least one valid data point to perform the linear
# fit. Tested by JupyterNotebook snippet.
if len(x) > 0:
slope, intercept, _, _, _ = linregress(x, y)
coefficients[i] = [slope, intercept]
fitted_data[:, i] = slope * np.arange(nr) + intercept
# get the difference between the fitted data and original
# data while reshaping back to ramps, spexel, spaxel
diff = (np.reshape(slopes_r,(nr,nspe,nspa))
- np.reshape(fitted_data,(nr,nspe,nspa)))
# Calculate variance over slopes
svar = np.nanvar(diff,0)
# np.nan_to_num(svar, copy=False, nan=0.0)
for k in nb.prange(data.shape[0]): # prange: loop can be parallised
for i in range(data.shape[2]):
for j in range(data.shape[3]):
if not np.isnan(var[k, i, j]) and not np.isnan(svar[i, j]):
var[k, i, j] = np.sqrt((var[k, i, j] * var[k, i, j])
+ (svar[i, j] * svar[i, j]))
sqrt_var = np.where(np.isnan(var), np.nan, np.sqrt(var))
return slopes, sqrt_var
[docs]
def process_extension(hdu, readout_range, rmplngth=2, threshold=5,
s2n=10, remove_first=None,
subtract_bias=True, badmask=None,
average_ramps=True, posdata=None,
indpos_sigma=3.0, pointing_mask=None):
"""
Wrapper to process a single HDU extension.
Parameters
----------
hdu : ImageHDU
input HDU extension
readout_range : array_like of int
(start readout, ramplength)
rmplngth : int, optional
Minimum ramp length
threshold : float, optional
Robust rejection threshold (in sigma) for combining slopes of
individual ramps.
s2n : float, optional
Signal-to-noise below which data will be considered questionable
and will be ignored. Set <= 0 to turn off signal-to-noise filtering.
remove_first : bool, optional
if True, remove the first two ramps prior to fitting
subtract_bias : bool, optional
If True, subtract the value of the empty zeroth pixel for each
spaxel prior to fitting.
badmask : numpy.ndarray, optional
(npoints, (spexel, spaxel)) array specifying indices of bad pixels
average_ramps : bool, optional
If set, data is averaged over ramps (default). If not, all ramps
are returned, as appropriate for OTF mode data, for which each ramp
is at a different sky position.
posdata : fits.Table, optional
OTF scan position data table. If provided and `average_ramps` is
set to False, then the input DLAM_MAP and DBET_MAP for each
readout are averaged over each ramp; the FLAG key is and-ed over
the ramp. The UNIXTIME for the first and last retained ramps
are averaged and added to the header as RAMPSTRT and RAMPEND
keywords, respectively.
indpos_sigma : float, optional
If >0, will be used to discard samples with grating
position that deviates from the expected INDPOS value by this
many sigma.
pointing_mask : pandas.Series, optional
Boolean mask used to discard ramps with bad pointing.
Returns
-------
flux, stddev : ImageHDU, ImageHDU
Optionally, a third element may be returned: a BinTableHDU containing
OTF position data, propagated from the `posdata` argument.
"""
if hdu.data is None:
log.error("No data in HDU")
return
data = hdu.data.copy()
image_hdr = hdu.header.copy()
indpos = image_hdr['INDPOS']
result = resize_data(data, readout_range, indpos,
remove_first_ramps=remove_first,
subtract_bias=subtract_bias,
indpos_sigma=indpos_sigma)
if result is None:
return
flux, bad_ramps = result
nramp = flux.shape[0]
nread = readout_range[1]
if pointing_mask is not None and remove_first and len(pointing_mask)>nramp:
pointing_mask = pointing_mask.iloc[2:]
# fit slopes to ramps, average if desired
# Different dimensions for different OBS Modes!!!
flux, stddev = fit_data(flux, s2n=s2n, threshold=threshold,
average_ramps=average_ramps,
bad_ramps=bad_ramps, rmplngth=rmplngth,
pointing_mask=pointing_mask)
# apply bad mask to spaxels/spexels
if isinstance(badmask, np.ndarray):
if average_ramps:
flux[badmask[:, 0], badmask[:, 1]] = np.nan
stddev[badmask[:, 0], badmask[:, 1]] = np.nan
else:
flux[:, badmask[:, 0], badmask[:, 1]] = np.nan
stddev[:, badmask[:, 0], badmask[:, 1]] = np.nan
# average position data for each ramp if available
# (but don't average over ramps)
if posdata is not None:
if average_ramps:
log.warning('Incompatible arguments: posdata cannot '
'be propagated with average_ramps=True.')
log.warning('Dropping scan position data from further '
'reduction.')
posdata = None
else:
# reshape: nreadout -> n_ramp, n_readout
if len(posdata) != (nramp * nread):
log.error("Number of readouts does not match header")
return
newshape = (nramp, nread)
# logical-and of flags over each ramp
reshaped = posdata['FLAG'].reshape(newshape)
flag = np.all(reshaped, axis=1)
# get first and last averaged ramp times
reshaped = posdata['UNIXTIME'].reshape(newshape)
ftime = np.mean(reshaped, axis=1)[flag]
hdinsert(image_hdr, 'RAMPSTRT', ftime[0],
'UNIX time of first ramp [s]')
hdinsert(image_hdr, 'RAMPEND', ftime[-1],
'UNIX time of last ramp [s]')
# new table with only good ramp-averaged position data
# todo: consider averaging spatially local ramps as well
# -> Christian says don't do it
pdata = Table()
for name in ['DLAM_MAP', 'DBET_MAP']:
reshaped = posdata[name].reshape(newshape)
pdata[name] = np.mean(reshaped, axis=1)[flag]
posdata = pdata
# trim flux ramps to useful range as well
if not flag.all():
log.debug(f'Trimming {(~flag).sum()} '
f'ramps outside scan motion')
flux = flux[flag, :, :]
stddev = stddev[flag, :, :]
# add a bg level header keyword
mflux = 0.0 if np.isnan(flux).all() else np.nanmean(flux)
hdinsert(image_hdr, 'BGLEVL_A', mflux,
comment='BG level nod A (ADU/s)')
# add a bg level header keyword
mstddev = 0.0 if np.isnan(stddev).all() else np.nanmean(stddev)
hdinsert(image_hdr, 'MSTDDEV', mstddev,
comment='Mean STDDEV (ADU/s)')
name = image_hdr['EXTNAME']
image_hdr['BUNIT'] = 'adu/s'
hdu1 = fits.ImageHDU(flux, header=image_hdr, name=name)
hdu2 = fits.ImageHDU(stddev, header=image_hdr,
name=name.replace('FLUX', 'STDDEV'))
if posdata is not None:
hdu3 = fits.BinTableHDU(posdata, name=name.replace('FLUX', 'SCANPOS'))
return hdu1, hdu2, hdu3
else:
return hdu1, hdu2
[docs]
def fit_ramps(filename, rmplngth=2, s2n=10, threshold=5, badpix_file=None,
write=False, outdir=None, remove_first=True,
subtract_bias=True, indpos_sigma=3.0, pointing_discard=False,
pointing_directory=None, pointing_threshold_psf_frac=0.1):
"""
Fit straight lines to raw voltage ramps to calculate corresponding flux.
If writing to disk, the output filename is created from the input
filename, with the suffix 'CP0' or 'CP1' replaced with 'RP0' or
'RP1'. The resulting HDU contains n_scan * 2 image extensions, for
FLUX and STDDEV data for each grating scan, named with a grating scan
index (e.g. FLUX_G0, STDDEV_G0, FLUX_G1, STDDDEV_G1, etc.).
For all data except OTF mode A nods, the output image arrays have
dimensions 25 x 16. For OTF A nods, the ramps are not averaged to
produce a single 2D array but rather propagated as a 3D cube, along
with separate scanning position data. Dimensions for these data
are 25 x 16 x n_ramp, and an additional table called SCANPOS_G0
is attached to the output file, containing the scan position data,
averaged over each ramp. It is assumed that OTF data files contain
only one grating scan.
Input files for this step should have been generated by
fifi_ls.split_grating_and_chop.
The procedure is:
Loop through the extensions, fitting ramps in each:
1. Remove the 26th spaxel (chopper values) and the first and
last spexel.
2. Remove the first ramp from each spaxel and the first and last
readout in each ramp.
3. Loop over spaxels and spexels, fitting each ramp with a line.
Record the slope and the error on the slope. Combine the
slopes from all ramps with a robust weighted mean. Record
the error on the mean as the error on the flux.
4. (optional) Create FITS file and write results to disk.
Parameters
----------
filename : str
Name of the chop-split, grating-split file (including the path
if not in the current working directory)
rmplngth : int, optional
Minimum ramp length
s2n : float, optional
Signal-to-noise below which data will be considered questionable
and will be ignored. Set <= 0 to turn off signal-to-noise filtering.
threshold : float, optional
Robust rejection threshold (in sigma) for combining slopes of
individual ramps.
badpix_file : str, optional
badpix file to be used. If not provided, a default file will be
retrieved from the data/badpix_files directory, matching the date
and channel of the input header. If an override file is provided,
it should be a text file containing two columns, spaxel and
spexel coordinates. Spaxels are numbered from 1 to 25, spexels
from 1 to 16.
write : bool, optional
If True, write the output to disk and return the filename instead
of the HDU.
outdir : str, optional
If writing to disk, use to set the output directory. By default the
output directory will be the same as the input filename location.
remove_first : bool, optional
If True, remove the first two ramps prior to fitting.
subtract_bias : bool, optional
If True, subtract the value of the empty zeroth pixel prior to
fitting.
indpos_sigma : float, optional
If >0, will be used to discard samples with grating
position that deviates from the expected INDPOS value by this
many sigma.
pointing_discard : bool, optional
If true, discard ramps with bad pointing.
pointing_directory : str, optional
Path to the directory containing the pointing error files used if
pointing discard is set to true.
pointing_threshold_psf_frac : float, optional
The relative pointing threshold is specified as a fraction of the FWHM
of the mean PSF within the respective nod phase. The slopes of ramps
with a pointing error greater than the defined threshold are set to NaN.
Returns
-------
fits.HDUList or str
Either the HDU (if write is False) or the filename of the output file
(if write is True)
"""
if isinstance(outdir, str):
if not os.path.isdir(outdir):
log.error("Output directory %s does not exist" % outdir)
return
hdul = gethdul(filename, verbose=True)
if hdul is None:
return
readout_range = get_readout_range(hdul[0].header)
if (not hasattr(readout_range, '__len__')
or len(readout_range) != 2
or None in readout_range):
log.error("Invalid readout range")
return
if not isinstance(filename, str):
filename = hdul[0].header['FILENAME']
if not isinstance(outdir, str):
outdir = os.path.dirname(filename)
if DEBUG or pointing_discard: # pragma: no cover
log.info(f'Working on: {filename}')
else:
log.debug(f'Working on: {filename}')
outfile = os.path.basename(filename).replace('CP', 'RP')
primehead = hdul[0].header.copy()
hdinsert(primehead, 'FILENAME', outfile)
hdinsert(primehead, 'PRODTYPE', 'ramps_fit')
if pointing_discard:
timing_params = get_timing(primehead)
df_pointing_error = load_pointing_data(primehead, pointing_directory)
fwhm_psf = get_resolution(header=primehead, spatial=True)
pointing_threshold = pointing_threshold_psf_frac * fwhm_psf
badmask = get_badpix(primehead, filename=badpix_file)
hdul_new = fits.HDUList([fits.PrimaryHDU(header=primehead)])
ngrating = primehead.get('NGRATING', 1)
remove_first_passed = remove_first
for idx in range(ngrating):
hdu = hdul[f'FLUX_G{idx}']
# check for OTF mode data
if f'SCANPOS_G{idx}' in hdul:
average_ramps = False
remove_first = False
posdata = hdul[f'SCANPOS_G{idx}'].data
pointing_mask = None # no ramps are discard for OTF scans
else:
average_ramps = True
remove_first = remove_first_passed
posdata = None
if pointing_discard:
pointing_mask = get_pointing_mask(primehead, pointing_threshold,
idx, *timing_params,
df_pointing_error)
log.info('%i ramps discarded at grating position %i ' \
'based on a pointing threshold of %.2f arcsec' %
(np.sum(~pointing_mask), idx, pointing_threshold))
else:
pointing_mask = None
ext = process_extension(
hdu, readout_range, threshold=threshold,
s2n=s2n, remove_first=remove_first,
subtract_bias=subtract_bias, badmask=badmask,
average_ramps=average_ramps, posdata=posdata,
indpos_sigma=indpos_sigma, rmplngth=rmplngth,
pointing_mask=pointing_mask)
if ext is None:
log.error("Failed to process extension %i: %s" %
(idx + 1, filename))
return
# append flux and stddev extensions
hdul_new.append(ext[0])
hdul_new.append(ext[1])
# append posdata too if present
if len(ext) > 2:
hdul_new.append(ext[2])
hdul_new[0].header['HISTORY'] = "Ramps fit; bad pixels flagged with NaN"
if remove_first:
hdul_new[0].header['HISTORY'] = "Ramps fit; removed first 2 ramps"
hdul_new[0].header['HISTORY'] = \
"Ramps fit; fitting on readouts [%i: %i]" % (
readout_range[0] + 1, readout_range[1] - 1)
if write:
return write_hdul(hdul_new, outdir=outdir, overwrite=True)
else:
return hdul_new
def fit_ramps_wrap_helper(_, kwargs, filename):
return fit_ramps(filename, **kwargs)
[docs]
def wrap_fit_ramps(files, s2n=30, threshold=5, badpix_file=None,
outdir=None, remove_first=True, subtract_bias=True,
indpos_sigma=3.0, allow_errors=False,
write=False, jobs=None, rmplngth=2, pointing_discard=False,
pointing_directory=None, pointing_threshold_psf_frac=0.1):
"""
Wrapper for fit_ramps over multiple files.
See `fit_ramps` for full description of reduction on a single file.
Parameters
----------
files : array_like of str
s2n : float, optional
threshold : float, optional
badpix_file : str, optional
outdir : str, optional
remove_first : bool, optional
subtract_bias : bool, optional
indpos_sigma : float, optional
allow_errors : bool, optional
If True, return all created files on error. Otherwise, return None
write : bool, optional
If True, write the output to disk and return the filename instead
of the HDU.
jobs : int, optional
Specifies the maximum number of concurrently running jobs.
Values of 0 or 1 will result in serial processing. A negative
value sets jobs to `n_cpus + 1 + jobs` such that -1 would use
all cpus, and -2 would use all but one cpu.
rmplngth : int, optional
Minimum ramp length
pointing_discard : bool, optional
If true, discard ramps with bad pointing.
pointing_directory : str, optional
Path to the directory containing the pointing error files used if
pointing discard is set to true.
pointing_threshold_psf_frac : float, optional
The relative pointing threshold is specified as a fraction of the FWHM
of the mean PSF within the respective nod phase. The slopes of ramps
with a pointing error greater than the defined threshold are set to NaN.
Returns
-------
tuple of str
output filenames written to disk
"""
if isinstance(files, str):
files = [files]
if not hasattr(files, '__len__'):
log.error("Invalid input files type (%s)" % repr(files))
return
clear_badpix_cache()
kwargs = {
's2n': s2n, 'threshold': threshold, 'badpix_file': badpix_file,
'outdir': outdir, 'remove_first': remove_first,
'subtract_bias': subtract_bias, 'indpos_sigma': indpos_sigma,
'write': write, 'rmplngth': rmplngth,
'pointing_discard': pointing_discard,
'pointing_directory': pointing_directory,
'pointing_threshold_psf_frac': pointing_threshold_psf_frac}
if DEBUG: # pragma: no cover
jobs = 1
output = multitask(fit_ramps_wrap_helper, files, None, kwargs,
jobs=jobs)
failure = False
result = []
for x in output:
if x is None:
failure = True
else:
result.append(x)
if failure:
if len(result) > 0:
if not allow_errors:
log.error("Errors were encountered but the following "
"files were created:\n%s" % '\n'.join(result))
return
clear_badpix_cache()
return tuple(result)