# Licensed under a 3-clause BSD style license - see LICENSE.rst
import re
from astropy import log
from astropy.io import fits
import numpy as np
from numpy.polynomial.polynomial import polyval
from sofia_redux.toolkit.utilities.fits import add_history_wrap, hdinsert, kref
from sofia_redux.instruments.forcast.getdetchan import getdetchan
from sofia_redux.instruments.forcast.getpar import getpar
addhist = add_history_wrap('Image nonlin')
__all__ = ['get_siglev', 'get_camera_and_capacitance',
'get_reference_scale', 'get_coefficients',
'get_coeff_limits', 'imgnonlin']
[docs]
def get_siglev(header):
"""
Return the signal level from the header
Parameters
----------
header : astropy.io.fits.header.Header
Returns
-------
numpy.ndarray
"""
siglev = getpar(header, 'NLINSLEV', dtype=str, default='NONE', warn=True,
comment='Signal level of background')
if siglev == 'NONE':
return
siglev = [float(x) for x in re.sub(r'[\[\]]', '', siglev).split(',')]
return np.array(siglev)
[docs]
def get_camera_and_capacitance(header):
"""
Read header and determine camera
Parameters
----------
header : astropy.io.fits.header.Header
Returns
-------
str
camera + capacitance. All uppercase. For example, 'LWCLO'
"""
detchan = getdetchan(header)
camera = 'SWC' if detchan == 'SW' else 'LWC'
epadu = getpar(header, 'EPERADU', dtype=int, default=None, warn=True)
if epadu is None:
return
cap = {136: 'Lo', 1294: 'Hi'}.get(epadu)
if cap is None:
return
return (camera + cap).upper().strip()
[docs]
def get_reference_scale(header, camcap, update=False):
"""
Get non-linearity scale from the header based on camera and capacitance
Parameters
----------
header : astropy.io.fits.header.Header
camcap : str
camera + 2-letter-capacitance, e.g., SWCHI, LWCLO, etc.
update : bool, optional
If True, update the header with a HISTORY message stating the
scale
Returns
-------
tuple of float
reference, scale
"""
refsig = getpar(header, 'NLR' + camcap,
dtype=float, default=9000, warn=True,
comment='count reference for linearity correction')
scale = getpar(header, 'NLS' + camcap,
dtype=float, default=refsig, warn=True,
comment='count scale for linearity correction')
if update:
addhist(header, 'Scale is %f' % scale)
return refsig, scale
[docs]
def get_coefficients(header, camcap, update=False):
"""
Get non-linearity coefficients from the header based on camera and
capacitance
Parameters
----------
header : astropy.io.fits.header.Header
camcap : str
camera + 2-letter-capacitance, e.g., SWCHI, LWCLO, etc.
update : bool, optional
If True, update the header with the coefficients. keys will
be NLINC# where # represents a number. The NLC + camap key
will also be removed as it does not fit in the line. A
HISTORY message will also be appended stating the read
coefficients
Returns
-------
numpy.ndarray
float coefficient values from header
"""
# Get coeff depending on camera and cap
cread = getpar(header, 'NLC' + camcap, dtype=str, default='NONE',
warn=True, comment='linearity correction coefficients')
if cread == 'NONE':
return
coeffs = [float(x) for x in re.sub(r'[\[\]]', '', cread).split(',')]
if update:
addhist(header, 'Coeff=%s' % cread)
key = ('NLC' + camcap)[:8]
if key in header:
del header[key]
for idx, val in enumerate(coeffs):
hdinsert(header, 'NLINC%s' % idx, val, refkey=kref,
comment='linearity correction coeff #%s' % idx)
return np.array(coeffs)
[docs]
def get_coeff_limits(header, camcap, update=False):
"""
Get non-linearity coefficient limits from the header based on camera
and capacitance
Parameters
----------
header : astropy.io.fits.header.Header
camcap : str
camera + 2-letter-capacitance, e.g., SWCHI, LWCLO, etc.
update : bool, optional
If True, update the header with HISTORY messages
Returns
-------
numpy.ndarray
float coefficient limit values from header of length 2
"""
# Get lims depending on camera and cap
limread = getpar(header, 'LIM' + camcap, dtype=str, default='NONE',
comment='linearity correction limits', warn=True)
if limread == 'NONE':
return
if update:
addhist(header, 'level limits are %s' % limread)
lims = [float(x) for x in re.sub(r'[\[\]]', '', limread).split(',')]
if len(lims) != 2:
return
return np.array(lims)
[docs]
def imgnonlin(data, header, siglev=None, variance=None):
"""
Corrects for non-linearity in detector response due to general background.
The header must contain the information to determine the camera. If
siglev is not passed, the header must also contain a keyword
(NLINSLEV) to indicate the background level and size of the section
used to calculate the level. In practice, this means that
sofia_redux.instruments.forcast.background should be run first to
calculate the background level.
Parameters
----------
data : numpy.ndarray
Input data array (nimage, nrow, ncol)
header : astropy.io.fits.header.Header
Input FITS header. Will be updated with a HISTORY message
siglev : array_like, optional
Background level. There should be a single value for each input
frame
variance : numpy.ndarray, optional
Variance array (nimage, nrow, ncol) to update in parallel with the
data frame.
Returns
-------
numpy.ndarray, numpy.ndarray
The linearity corrected array (nimage, nrow, ncol) or (nrow, ncol)
The propagated variance array (nimage, nrow, ncol) or (nrow, ncol)
"""
if not isinstance(header, fits.header.Header):
log.error("invalid header")
return
if not isinstance(data, np.ndarray) or len(data.shape) not in [2, 3]:
addhist(header, 'not corrected (invalid data)')
log.error("invalid data")
return
ndim = len(data.shape)
d = data.copy()
if ndim == 2:
d = np.array([d])
dovar = variance is not None and variance.shape == data.shape
var = variance.copy() if dovar else None
if variance is not None and not dovar:
addhist(header, 'Not propagating variance (invalid variance)')
log.error('invalid variance')
if ndim == 2:
var = np.array([var])
elif not dovar:
var = [None] * 3
# Read background level in header
if siglev is None:
siglev = get_siglev(header)
else:
siglev = np.array(siglev)
if not isinstance(siglev, np.ndarray):
addhist(header, 'not corrected (invalid signal levels)')
log.error('invalid signal levels')
return
elif len(siglev) != d.shape[0]:
addhist(header, 'not corrected (mismatch data and signal levels)')
log.error('signal size does not match data shape')
return
# Get the camera and capacitance level
camcap = get_camera_and_capacitance(header)
if camcap is None:
addhist(header, 'not corrected (unknown capacitance)')
log.error('E/ADU is not correctly defined')
return
log.info("Using camera %s with %s capacitance" % (camcap[:3], camcap[3:]))
# get non-linearity reference and scale
refscale = get_reference_scale(header, camcap, update=True)
# get non-linearity coefficients
coeffs = get_coefficients(header, camcap, update=True)
if coeffs is None:
addhist(header, 'not corrected (invalid non-linearity coefficients)')
log.error('invalid non-linearity coefficients')
return
# get non-linearity coefficient limits
lims = get_coeff_limits(header, camcap, update=True)
if lims is None:
msg = 'invalid limits of non-linearity coefficients'
addhist(header, 'not corrected (%s)' % msg)
log.error(msg)
return
# Check limits
outside = np.where((siglev < lims[0]) | (siglev > lims[1]))[0]
if len(outside) > 0:
hdinsert(header, 'NLINFLAG', True, refkey=kref,
comment='flag outside range levels in lin. correction')
for idx in outside:
msg = 'Signal level %s for plane %i outside range' % (
siglev[idx], idx)
addhist(header, msg)
log.warning("level %f outside fit range: %s" % (siglev[idx], lims))
addhist(header, 'not corrected (level outside correction range)')
log.warning('level outside correction range')
return
# Apply correction
plane = 0
xval = (siglev - refscale[0]) / refscale[1]
corr = polyval(xval, coeffs)
for factor, frame, vframe in zip(corr, d, var):
msg = 'factor of plane %s is %s' % (plane, factor)
log.info(msg)
addhist(header, msg)
frame /= factor
if dovar:
vframe /= factor ** 2
plane += 1
if ndim == 2:
d, var = d[0], var[0]
elif not dovar:
var = None
return d, var