# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log
import numpy as np
from sofia_redux.instruments.exes.readraw import readraw
from sofia_redux.toolkit.fitting.polynomial import polyfitnd
from sofia_redux.toolkit.utilities.fits import set_log_level
__all__ = ['derasterize']
[docs]
def derasterize(data, header, dark_data=None, dark_header=None, overlap=32,
fit_buffer=5):
"""
Read and recombine a rasterized flat file.
When background fluxes are high, raster flats are taken in subarray
chunks to avoid saturating the detector. These chunks are stored in
a custom format in the raw FITS file and need special handling to
be reassembled into a full-frame flat for use with the science data.
The procedure is:
1. Divide the input cube into detector stripes.
2. Call `readraw` for each stripe to coadd raw readouts.
3. Use overlap regions to fit and correct for gain and offset
differences between stripes.
4. Assemble corrected stripes into a full 1024 x 1024 array.
Parameters
----------
data : numpy.ndarray
3D data cube [nframe, nspec, nspat].
header : fits.Header
Header of FITS file. May be updated in place.
dark_data : numpy.ndarray, optional
3D raw raster dark data [nframe, nspec, nspat]. Must match the
input data if provided.
dark_header : fits.Header, optional
Header of dark FITS file.
overlap : int, optional
The size of the overlap region in each detector stripe.
Returns
-------
deraster, variance, mask : 3-tuple of numpy.ndarray
The derasterized and coadded data, variance, and good data mask.
The deraster and variance arrays have shape (nframes, ny, nx).
The mask has shape (ny, nx) and Boolean data type, where True
indicates a good pixel; False indicates a bad pixel.
"""
# check for dark data
if dark_data is not None:
if dark_data.shape != data.shape:
message = f'Dark data has wrong dimensions {dark_data.shape}'
raise RuntimeError(message)
if dark_header is None:
message = 'Dark header must be provided with dark data'
raise RuntimeError(message)
# nx is 1024 + 8 reference pix
# ny is subarray stripe + overlap pixels
# nz is number of stripes x nframes per pattern x npattern
nz, ny, nx = data.shape
full_size = 1024
# stripe size
ns = ny - overlap
nstripe = full_size // ns
nframe = nz // nstripe
if full_size % ns != 0:
message = (f'Specified overlap of {overlap} rows '
f'does not match data dimensions {data.shape}.')
raise RuntimeError(message)
if nz % nstripe != 0:
message = (f'Number of stripes ({nstripe}) does not '
f'match data dimensions {data.shape}.')
raise RuntimeError(message)
deraster = np.zeros((1, full_size, full_size))
variance = np.zeros_like(deraster)
lin_mask = np.full((full_size, full_size), True)
# some fudge values for subarray location and edge effects
# TODO: Clean up
# fit_buffer = 5
bottom_buffer = 2
offset_fudge = 2
last_overlap = None
for i in range(nstripe):
log.info('')
log.info(f'Stripe {i + 1}')
log.info('')
zstart = i * nframe
zstop = zstart + nframe
raw_frames = data[zstart:zstop]
# 2nd and subsequent subarray seem to be offset by a couple
# pixels in y, and the bottom row or two look bad
if i == nstripe - 1:
# last stripe: use offset, clip the bottom,
# don't go beyond top row
fudge = offset_fudge
bottom = bottom_buffer
top = 0
raw_ystart = full_size - ns - overlap - fudge
elif i > 0:
# middle stripes: use offset, clip the bottom,
# expand the top into the overlap
fudge = offset_fudge
bottom = bottom_buffer
top = bottom_buffer
raw_ystart = i * ns - fudge
else:
# first stripe: no offset, don't clip the bottom,
# expand the top into the overlap
fudge = 0
bottom = 0
top = bottom_buffer
raw_ystart = 0
# index into the full output array
ystart = i * ns - fudge + bottom
ystop = i * ns + ns - fudge + top
# set the ectpat to extract the correct region of the bad pixel mask
raw_header = header.copy()
raw_ystop = ns + overlap
ectpat = f'0 1 {raw_ystart // 2} {raw_ystop // 2} 0 {nx}'
raw_header['ECTPAT'] = ectpat
raw_header['FRAMETIM'] *= full_size / ny
# coadd frames with simple destructive mode, no linearity,
# tossing first 2 ints
log.info('Reading flat frames')
coadd, var, lin = readraw(raw_frames, raw_header, algorithm=0,
do_lincor=False, toss_nint=2)
coadd = np.squeeze(coadd)
var = np.squeeze(var)
# directly subtract dark if available
if dark_data is not None:
raw_dark = dark_data[zstart:zstop]
raw_header = dark_header.copy()
raw_header['ECTPAT'] = ectpat
raw_header['FRAMETIM'] *= full_size / ny
log.info('Subtracting dark frames')
with set_log_level('WARNING'):
coadd_dark, _, _ = readraw(raw_dark, raw_header, algorithm=0,
do_lincor=False, toss_nint=2)
coadd -= np.squeeze(coadd_dark)
if i == 0:
# no correction for first stripe
corrected_coadd = coadd
last_overlap = corrected_coadd[
ns + fit_buffer:ns + overlap - fit_buffer,
fit_buffer:full_size - fit_buffer]
else:
# overlap with previous is at bottom of stripe array
if i != nstripe - 1:
overlap_section = coadd[
fit_buffer:overlap - fit_buffer,
fit_buffer:full_size - fit_buffer]
else:
# doubled for last stripe
overlap_section = coadd[
fit_buffer + overlap: 2 * overlap - fit_buffer,
fit_buffer:full_size - fit_buffer]
# derive fit gain/offset from overlap
diff = last_overlap - overlap_section
coeff = polyfitnd(overlap_section.ravel(), diff.ravel(),
1, robust=6.0)
# correct new stripe to previous
correction = coadd * coeff[1] + coeff[0]
corrected_coadd = coadd + correction
# keep top section for overlap with next stripe
last_overlap = corrected_coadd[
ns + fit_buffer:ns + overlap - fit_buffer,
fit_buffer:full_size - fit_buffer]
# place stripe in output flat
if i < nstripe - 1:
# all but last stripe: overlap is on top
deraster[0, ystart:ystop] = corrected_coadd[bottom:ns + top]
variance[0, ystart:ystop] = var[bottom:ns + top]
lin_mask[ystart:ystop] = lin[bottom:ns + top]
else:
# last stripe: overlap is on bottom
deraster[0, ystart:ystop] = corrected_coadd[overlap + bottom:]
variance[0, ystart:ystop] = var[overlap + bottom:]
lin_mask[ystart:ystop] = lin[overlap + bottom:]
# scale factor for subarray
deraster *= full_size / ny
variance *= (full_size / ny) ** 2
# set header keywords for future steps
header['DATASRC'] = 'CALIBRATION'
header['OBSTYPE'] = 'FLAT'
header['DETSEC'] = '[1:1024,1:1024]'
header['NSPAT'] = full_size
header['NSPEC'] = full_size
return deraster, variance, lin_mask