# Licensed under a 3-clause BSD style license - see LICENSE.rst
import warnings
from astropy import log
from astropy.io import fits
import numpy as np
from sofia_redux.toolkit.utilities.fits import add_history_wrap, hdinsert, kref
from sofia_redux.instruments.forcast.background import background
from sofia_redux.instruments.forcast.getpar import getpar
from sofia_redux.instruments.forcast.jbclean import jbclean
from sofia_redux.instruments.forcast.readmode import readmode
from sofia_redux.instruments.forcast.read_section import read_section
addhist = add_history_wrap('Stack')
__all__ = ['add_stacks', 'background_scale', 'stack_c2nc2',
'stack_map', 'stack_c3d', 'stack_cm',
'stack_stare', 'convert_to_electrons',
'subtract_background', 'stack']
[docs]
def add_stacks(data, header, variance=None):
"""
Add data frames together at the same stack (position)
The number of frames returned in the output will be equal
to floor(nframes / nstacks) if OTMODE is AD (All Destructive).
For SUR mode, the original data will be returned.
Parameters
----------
data : numpy.ndarray
3d data array to sum (nframes, nrow, ncol)
header : astropy.io.fits.header.Header
FITS header
variance : numpy.ndarray, optional
3d variance array to propagate (nframes, nrow, ncol)
Returns
-------
2-tuple
summed data array (npos, nrow, ncol)
propagate variance array (npos, nrow, ncol)
"""
if not isinstance(header, fits.header.Header):
msg = "stack failed (invalid header at add_stacks)"
log.error(msg)
return
if not isinstance(data, np.ndarray) or len(data.shape) != 3:
msg = "stack failed (invalid data at add_stacks)"
addhist(header, msg)
log.error(msg)
return
nframes = data.shape[0]
nstacks = getpar(header, 'OTSTACKS', dtype=int, default=None)
if nstacks is None:
msg = "stack failed (invalid OTSTACKS %s)" % nstacks
addhist(header, msg)
log.error(msg)
return
posdata = data.copy()
dovar = isinstance(variance, np.ndarray) and variance.shape == data.shape
posvar = variance.copy() if dovar else None
mode = getpar(header, 'OTMODE', dtype=str).upper().strip()
if mode == 'AD': # (All Destructive)
if nstacks <= 1:
return posdata, posvar
if nframes % nstacks > 0:
log.warning(
"sum_frames (AD) - nframes is not a multiple of nstacks")
log.warning("ignoring last frames")
npos = int(data.shape[0] / nstacks)
dout = np.full((npos, data.shape[-2], data.shape[-1]), np.nan)
vout = dout.copy() if dovar else None
for i in range(npos):
i0, i1 = i * nstacks, (i + 1) * nstacks
dout[i] = np.nansum(posdata[i0: i1], axis=0)
if dovar:
vout[i] = np.nansum(posvar[i0: i1], axis=0)
return dout, vout
else:
msg = 'stack failed (invalid OTMODE %s)' % mode
addhist(header, msg)
log.error(msg)
return
[docs]
def background_scale(data, header, mask=None):
"""
Return frame scale levels
Parameters
----------
data : numpy.ndarray
input data array (nframe, nrow, ncol)
header : astropy.io.fits.header.Header
mask : numpy.ndarray, optional
mask to pass into sofia_redux.instruments.forcast.background
(nrow, ncol). Defines pixels to include in background
calculation (True=include)
Returns
-------
numpy.array
scale factors for each frame or None if not applied
"""
bgscale = getpar(header, 'BGSCALE', default=False, dtype=bool,
comment='background scale')
if not bgscale:
return
addhist(header, 'All frames scaled to BG level of first chop position')
section = read_section(*data.shape[-2:])
return background(data, section, mask=mask)
[docs]
def stack_c2nc2(data, header, variance=None, bglevel=None, extra=None):
"""
Run the stacking algorithm on C2NC2 data
Calculates the chop-subtracted frames. For frame i (in steps of 2),
the resulting chop-subtracted frame would be:
chop = frame[i] - (frame[i+1] * bgscale[i]/bgscale[i+1])
chop frames are then summed
Parameters
----------
data : numpy.ndarray
(npos, nrow, ncol)
header : astropy.io.fits.header.Header
FITS header to update with HISTORY messages
variance : numpy.ndarray, optional
variance array to propagate (npos, nrow, ncol)
bglevel : array_like, optional
should be of length npos
extra : dict, optional
If set will be updated with:
chopsub -> numpy.ndarray (npos / 2, nrow, ncol)
chop-subtracted data
chopsub_var -> numpy.ndarray (npos / 2, nrow, ncol)
propagated chop-subtracted variance
Returns
-------
2-tuple
numpy.ndarray : The stacked data array (nrow, ncol)
numpy.ndarray : The propagated variance array (nrow, ncol)
"""
if not isinstance(data, np.ndarray) or len(data.shape) != 3:
msg = "stack failed (invalid data at stack_c2nc2)"
addhist(header, msg)
log.error(msg)
return
chopsub = data.copy()
dovar = isinstance(variance, np.ndarray) and variance.shape == data.shape
if not dovar and variance is not None:
msg = "variance not propagated (invalid variance at stack_c2nc2)"
addhist(header, msg)
log.warning(msg)
var = variance.copy() if dovar else None
on = np.array([i % 2 == 0 for i in range(data.shape[0])]).astype(bool)
scale = [1] * data.shape[0] if bglevel is None else bglevel
scale = np.array(scale)
if len(scale) != data.shape[0]:
msg = 'variance not propagated (invalid background at stack_c2nc2)'
addhist(header, msg)
log.error(msg)
return
chopscale = np.array([[scale[on] / scale[~on]]]).T
for val in chopscale:
addhist(header, 'Scaling for frame 2: %f' % val)
log.info('Scaling for frame 2: %f' % val)
chopsub = chopsub[on] - (chopsub[~on] * chopscale)
if dovar:
var = var[on] + (var[~on] * (chopscale ** 2))
if isinstance(extra, dict):
extra['chopsub'] = chopsub
return np.nansum(chopsub, axis=0), np.nansum(var, axis=0)
[docs]
def stack_map(data, header, variance=None, bglevel=None, extra=None):
"""
Run the stacking algorithm on MAP (Mapping mode) data
Calculates the chop and nod-subtracted frames. For each frame in a
set of 4 using scale s, the algorithm would be:
1. chop1 = frame1 - (frame2 * s1/s2)
2. chop2 = frame3 - (frame4 * s3/s4)
3. nod = chop1 - (chop2 * s1/s3)
4. result = sum(nods from each set of 4)
5. if NODBEAM = 'B', multiply by -1
If the number of frames is not divisible by four, frames at the
end will be clipped from any reductions. e.g., in the above
algorithm, if there were 10 frames, only frames 1-8 would be
included. i.e. steps 1-3 would be run on frames 1-4, then on
frames 5-8, and then the sum would be returned.
Parameters
----------
data : numpy.ndarray
(npos, nrow, ncol)
header : astropy.io.fits.header.Header
FITS header to update with HISTORY messages
variance : numpy.ndarray, optional
variance array to propagate (npos, nrow, ncol)
bglevel : array_like, optional
should be of length npos
extra : dict, optional
If set will be updated with:
chopsub -> numpy.ndarray (npos / 2, nrow, ncol)
chop-subtracted data
nodsub -> numpy.ndarray (npos / 4, nrow, ncol)
nod-subtracted data
Returns
-------
2-tuple
numpy.ndarray : The stacked data array (nrow, ncol)
numpy.ndarray : The propagated variance array (nrow, ncol)
"""
if not isinstance(data, np.ndarray) or len(data.shape) != 3 or \
data.shape[0] < 4:
msg = "stack failed (invalid data at stack_map)"
addhist(header, msg)
log.error(msg)
return
dovar = isinstance(variance, np.ndarray) and variance.shape == data.shape
if not dovar and variance is not None:
msg = "variance not propagated (invalid variance at stack_map)"
addhist(header, msg)
log.warning(msg)
var = variance.copy() if dovar else None
# Clip additional frames
d = data.copy()
additional_frames = data.shape[0] % 4
if additional_frames > 0:
msg = "invalid number of frames (ignoring last %s for stack_map)"
addhist(header, msg)
log.warning(msg)
d = d[:-additional_frames]
if dovar:
var = var[:-additional_frames]
if bglevel is not None:
bglevel = bglevel[:-additional_frames]
# Get ratio of chop scales
on = np.array([i % 2 == 0 for i in range(d.shape[0])]).astype(bool)
scale = [1] * d.shape[0] if bglevel is None else bglevel
scale = np.array(scale)
if len(scale) != d.shape[0]:
msg = 'stack failed (invalid background at stack_map)'
addhist(header, msg)
log.error(msg)
return
# Begin with the chops
chopscale = np.array([[scale[on] / scale[~on]]]).T
chopsub = d[on] - (d[~on] * chopscale)
chopsub_var = var[on] + (var[~on] * (chopscale ** 2)) if dovar else None
# Get ratio of nod scales
chopon_scale = scale[on]
mid = len(on) // 2
nodon = on[:mid]
# Apply nod scaling to chop-subtracted data data
nodscale = np.array([[chopon_scale[nodon] / chopon_scale[~nodon]]]).T
chopsub[~nodon] *= nodscale
# Get nod subtraction
nodsub = chopsub[nodon] - chopsub[~nodon]
if dovar:
chopsub_var[~nodon] *= nodscale ** 2
nodsub_var = chopsub_var[nodon] + chopsub_var[~nodon]
else:
nodsub_var = None
# Additional reporting to the user and header
for i in range(d.shape[0] // 4):
scales = chopscale[i * 2], chopscale[i * 2 + 1], nodscale[i]
if bglevel is not None:
addhist(header, 'Scaling factors for frames '
'2,3,4: %f,%f,%f' % scales)
log.info('Scaling factors for frames '
'2,3,4: %f,%f,%f' % scales)
if isinstance(extra, dict):
extra['chopsub'] = chopsub
extra['nodsub'] = nodsub
# Check whether data is A or B beam, multiply by -1 if B
sign = -1 if getpar(header, 'NODBEAM') == 'B' else 1
return np.nansum(nodsub, axis=0) * sign, np.nansum(nodsub_var, axis=0)
[docs]
def stack_c3d(data, header, variance=None, extra=None):
"""
Run the stacking algorithm on C3D data (3 position chop with dither)
result = frame1 - frame2 - frame3
Parameters
----------
data : numpy.ndarray
(3, nrow, ncol)
header : astropy.io.fits.header.Header
FITS header to update with HISTORY messages
variance : numpy.ndarray, optional
variance array to propagate (3, nrow, ncol)
extra : dict, optional
If set will be updated with:
chopsub -> numpy.ndarray (nrow, ncol)
chop-subtracted data (same as output data in this case)
Returns
-------
2-tuple
numpy.ndarray : The stacked data array (nrow, ncol)
numpy.ndarray : The propagated variance array (nrow, ncol)
"""
if not isinstance(data, np.ndarray) or len(data.shape) != 3 or \
data.shape[0] != 3:
msg = "stack failed (invalid data at stack_c3d)"
addhist(header, msg)
log.error(msg)
return
chopsub = data.copy()
dovar = isinstance(variance, np.ndarray) and variance.shape == data.shape
if not dovar and variance is not None:
msg = "variance not propagated (invalid variance at stack_c3d)"
addhist(header, msg)
log.warning(msg)
var = variance.copy() if dovar else None
chopsub[1:] *= -1
chopsub = np.nansum(chopsub, axis=0)
var = np.nansum(var, axis=0)
if isinstance(extra, dict):
extra['chopsub'] = chopsub.copy()
return chopsub, var
[docs]
def stack_cm(data, header, variance=None, extra=None):
"""
Run the stacking algorithm on CM data (multi-position chop)
result = frame1 - frame2 - frame3 - ... - frameN
Parameters
----------
data : numpy.ndarray
(nframe, nrow, ncol)
header : astropy.io.fits.header.Header
FITS header to update with HISTORY messages
variance : numpy.ndarray, optional
variance array to propagate (nframe, nrow, ncol)
extra : dict, optional
If set will be updated with:
chopsub -> numpy.ndarray (nrow, ncol)
chop-subtracted data (same as output data in this case)
Returns
-------
2-tuple
numpy.ndarray : The stacked data array (nrow, ncol)
numpy.ndarray : The propagated variance array (nrow, ncol)
"""
if not isinstance(data, np.ndarray) or len(data.shape) != 3 or \
len(data.shape) < 2:
msg = "stack failed (invalid data at stack_cm)"
addhist(header, msg)
log.error(msg)
return
chopsub = data.copy()
dovar = isinstance(variance, np.ndarray) and variance.shape == data.shape
if not dovar and variance is not None:
msg = "variance not propagated (invalid variance at stack_cm)"
addhist(header, msg)
log.warning(msg)
var = variance.copy() if dovar else None
# allow subtraction on all frames if not found in header
nchop = getpar(
header, 'CHPNPOS', dtype=int, default=data.shape[0], warn=True)
if nchop != data.shape[0]:
msg = "data shape mismatch with CHPNPOS (stack_cm) "
msg += "- ignoring last frames"
addhist(header, msg)
log.warning(msg)
chopsub = chopsub[:nchop]
if dovar:
var = var[:nchop]
chopsub[1:] *= -1
chopsub = np.nansum(chopsub, axis=0)
var = np.nansum(var, axis=0)
if isinstance(extra, dict):
extra['chopsub'] = chopsub.copy()
return chopsub, var
[docs]
def stack_stare(data, header, variance=None):
"""
Run the stacking algorithm on STARE data
result = median of all frames
The variance is approximated as (pi/2) times the variance of a mean
operation.
Parameters
----------
data : numpy.ndarray
(nframes, nrow, ncol)
header : astropy.io.fits.header.Header
FITS header to update with HISTORY messages
variance : numpy.ndarray, optional
variance array to propagate (nframes, nrow, ncol)
Returns
-------
2-tuple
numpy.ndarray : The stacked data array (nrow, ncol)
numpy.ndarray : The propagated variance array (nrow, ncol)
"""
if not isinstance(data, np.ndarray) or len(data.shape) != 3:
msg = "stack failed (invalid data at stack_stare)"
addhist(header, msg)
log.error(msg)
return
# if only one plane, return input
if data.shape[0] == 1:
return data, variance
result = data.copy()
dovar = isinstance(variance, np.ndarray) and variance.shape == data.shape
if not dovar and variance is not None:
msg = "variance not propagated (invalid variance at stack_stare)"
addhist(header, msg)
log.warning(msg)
var = variance.copy() if dovar else None
with warnings.catch_warnings():
warnings.simplefilter('ignore')
result = np.nanmedian(result, axis=0)
if dovar:
weight = np.sum(~np.isnan(var), axis=0)
nzi = weight != 0
var = np.nansum(var, axis=0)
var[nzi] *= 0.5 * np.pi / (weight[nzi] ** 2)
var[~nzi] = np.nan
return result, var
[docs]
def convert_to_electrons(data, header, variance=None):
"""
Convert data to mega-electrons per seconds
The following keywords must exist in the header
in order to calculate the conversion
factor:
- FRMRATE: frame rate (Hz)
- EPERADU: electrons/adu
Parameters
----------
data : numpy.ndarray
data to scale
header : astropy.io.fits.header.Header
FITS header
variance : numpy.ndarray, optional
variance to propagate
Returns
-------
2-tuple
numpy.ndarray : data in units of mega-electrons per second
numpy.ndarray : propagated variance
"""
vals = []
for key in ['FRMRATE', 'EPERADU']:
vals.append(getpar(header, key, dtype=float, default=None, warn=True))
if None in vals:
msg = "Could not convert to electrons (invalid FRMRATE/EPERADU)"
addhist(header, msg)
log.error(msg)
return
factor = 1e-6 * vals[0] * vals[1]
data *= factor
if variance is not None:
variance *= factor ** 2
addhist(header, 'Counts converted to millions of e/s using')
addhist(header, 'FRMRATE=%s and EPERADU=%s' % (vals[0], vals[1]))
return data, variance
[docs]
def subtract_background(data, header=None, mask=None, stat='mode'):
"""
Remove background from data
Parameters
----------
data : numpy.ndarray
input data (nrow, ncol)
header : astropy.io.fits.header.Header, optional
FITS header to update with HISTORY messages
mask : numpy.ndarray, optional
mask defining which pixels to use for background calculation.
True=Use.
stat : {'mode', 'median'}
Statistic to use in calculating the residual background.
Returns
-------
numpy.ndarray
background subtracted data (nrow, ncol)
"""
if header is None:
header = fits.header.Header()
if not isinstance(data, np.ndarray) or len(data.shape) not in [2, 3]:
msg = "background not subtracted (invalid data)"
addhist(header, msg)
log.warning(msg)
return
if stat not in ['mode', 'median']:
log.warning('Unrecognized background statistic; using stat=mode.')
stat = 'mode'
result = data.copy()
ndim = len(data.shape)
if ndim == 2:
result = np.array([result])
if not getpar(header, 'BGSUB', dtype=bool, default=False,
comment='residual background subtracted after stack'):
return
section = read_section(data.shape[-1], data.shape[-2])
bglevel = background(data, section, stat=stat, mask=mask)
if bglevel is None:
msg = "could not subtract background (invalid background)"
addhist(header, msg)
log.warning(msg)
return
for frame, sub in zip(result, bglevel):
msg = "Removing BG Level: %s" % sub
addhist(header, msg)
log.info(msg)
frame -= sub
if ndim == 2:
result = result[0]
return result
[docs]
def stack(data, header, variance=None, mask=None, extra=None, stat='mode'):
"""
Subtracts chop/nod pairs to remove astronomical/telescope background
This function uses sofia_redux.instruments.forcast.readmode
to read the chop/nod mode from the header; this determines how the
frames are subtracted. If the BGSUB keyword is set to True in the
configuration file, then any residual background level will be subtracted
from the stacked data. If the BGSCALE keyword is set to True in the
configuration file, the individual frames will be multiplicatively
scaled to the same level before subtraction. If the JBCLEAN keyword
is set to median, sofia_redux.instruments.forcast.jbclean will be
called on the stacked data. At the end of the function, the units
will be converted to mega-electrons per second, using FRMRATE and
EPERADU keywords in the header.
Parameters
----------
data : numpy.ndarray
Input data array (nimage, nrow, ncol)
header : astropy.io.fits.header.Header
Input FITS header; will be updated with HISTORY message
variance : numpy.ndarray, optional
Variance array (nimage, nrow, ncol) to update in parallel
with the data array.
mask : numpy.ndarray, optional
(col, row) Illumination mask to indicate regions of the image to
use in calculating the background (True = good)
extra : dict, optional
If provided, will be updated with:
posdata -> numpy.ndarray: summed stacks
chopsub -> numpy.ndarray: chop-subtracted data
nodsub -> numpy.ndarray: nod-subtracted data
stat : {'mode', 'median'}
Statistic to use in calculating the residual background.
Returns
-------
2-tuple
- numpy.ndarray (nrow, ncol) stacked data
- numpy.ndarray (nrow, ncol)
"""
if not isinstance(header, fits.header.Header):
header = fits.header.Header()
msg = "Invalid image header"
addhist(header, msg)
log.error(msg)
return
if not isinstance(data, np.ndarray) or len(data.shape) not in [2, 3]:
msg = 'Stack was not applied (Invalid data)'
addhist(header, msg)
log.error(msg)
return
dovar = isinstance(variance, np.ndarray) and variance.shape == data.shape
if variance is not None and not dovar:
msg = "Not propagating variance (invalid variance)"
addhist(header, msg)
log.warning(msg)
var = variance.copy() if dovar else None
if mask is not None:
if not isinstance(mask, np.ndarray) or mask.shape != data.shape[-2:]:
msg = "background mask invalid"
addhist(header, msg)
log.warning(msg)
mask = None
else:
mask = mask.astype(bool)
# Get everything 3-D (nframes, nrow, ncol)
ndim = len(data.shape)
if ndim == 2:
posdata = np.array([data.copy()])
if dovar:
var = np.array([var])
else:
posdata = data.copy()
# Add frames
result = add_stacks(posdata, header, variance=var)
if result is None:
log.error("stack failed")
return
posdata, var = result
if isinstance(extra, dict):
extra['posdata'] = posdata
bglevel = background_scale(posdata, header, mask=mask)
mode = readmode(header)
# run appropriate method
c2_modes = ['C2', 'C2NC2']
map_modes = ['NAS', 'NOS', 'C2NC4', 'NXCAC', 'C2N', 'NMC',
'NPC', 'NPCCAS', 'NPCNAS', 'C2ND', 'SLITSCAN',
'SLITSCAN_NMC', 'SLITSCAN_NXCAC', 'MAP']
if mode in c2_modes:
result = stack_c2nc2(posdata, header, variance=var,
bglevel=bglevel, extra=extra)
elif mode in map_modes:
result = stack_map(posdata, header, variance=var,
bglevel=bglevel, extra=extra)
elif mode == 'C3D':
result = stack_c3d(
posdata, header, variance=var, extra=extra)
elif mode == 'CM':
result = stack_cm(
posdata, header, variance=var, extra=extra)
elif mode == 'STARE':
result = stack_stare(posdata, header, variance=var)
else:
msg = "Stack failed (invalid instrument mode)"
addhist(header, msg)
log.error(msg)
return
if result is None:
msg = "Aborting stack - stacking failed"
addhist(header, msg)
log.error(msg)
return
else:
stacked, var = result
# add NaNs back in where result is identically zero
stacked[stacked == 0] = np.nan
if var is not None:
var[var == 0] = np.nan
if mode != 'STARE':
choptsa = getpar(header, 'CHOPTSAC', default=None, dtype=int,
comment="Chop convention. Should be -1 after OC1B")
if choptsa is None:
msg = "Value of chop tsa convention was " \
"not found. Using default: -1"
elif abs(choptsa) == 1:
msg = "Value of chop tsa convention is %i" % choptsa
stacked = -1 * choptsa * stacked
else:
msg = "Value of chop tsa convention (%i) " % choptsa
msg += "is different from -1, 1. "
msg += "Using default value: -1"
addhist(header, msg)
log.info(msg)
# Clean jailbars if JBCLEAN='MEDIAN'. If method = is 'FFT', cleaning
# is done in sofia_redux.instruments.forcast.clean instead.
jbmethod = getpar(header, 'JBCLEAN', dtype=str, default='x',
comment="Jail bar cleaning algorithm")
if jbmethod.strip().lower() == 'median':
result = jbclean(stacked, header, variance=var)
if result is None:
msg = "Jailbars not removed (jbclean MEDIAN method failed)"
addhist(header, msg)
log.warning(msg)
else:
log.info('Jailbars cleaned with MEDIAN method')
stacked, var = result
else:
log.info('Jailbars not removed')
# Convert to mega-electrons per second
result = convert_to_electrons(stacked, header, variance=var)
if result is None:
# Not a fatal error
msg = "Could not convert to Mega-electrons per second"
addhist(header, msg)
log.warning(msg)
else:
stacked, var = result
# Remove any residual background remaining
bgsub = subtract_background(stacked, header=header, mask=mask, stat=stat)
if isinstance(bgsub, np.ndarray):
addhist(header, "Residual background subtracted from final stack")
stacked = bgsub
# update the product type
hdinsert(header, 'PRODTYPE', 'STACKED', refkey=kref)
return stacked, var