# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log
import astropy.constants as const
import astropy.units as u
import numpy as np
from sofia_redux.instruments.exes.clean import clean
from sofia_redux.instruments.exes.derive_tort import derive_tort
from sofia_redux.instruments.exes.tortcoord import tortcoord
from sofia_redux.instruments.exes.utils import parse_central_wavenumber
from sofia_redux.toolkit.stats import meancomb
__all__ = ['makeflat', 'blackbody_pnu', 'bnu', 'bb_cal_factor']
[docs]
def makeflat(cards, header, variance, robust=4.0, radius=10,
black_frame=None, dark=None,
fix_tort=False, edge_method='deriv', custom_wavemap=None,
start_pixel=None, end_pixel=None,
top_pixel=None, bottom_pixel=None):
"""
Generate calibrated flat frame; set distortion parameters.
The procedure is:
1. Black, sky, and shiny frames are identified. Typically,
only black frames are present for EXES flats.
2. The black frame is used to set and test distortion parameters.
3. A difference frame is calculated (typically black-dark)
and normalized by the black-body function at the ambient
temperature (hdr['BBTEMP']).
4. The inverse frame (bb / (black - dark)) is calculated.
5. Unilluminated pixels are set to zero in the inverse frame.
Calibration is performed by multiplying science data by the output frame.
Parameters
----------
cards : numpy.ndarray
3D cube [nframe, nspec, nspat] or 2D image [nspec, nspat] containing
flat frames (black, sky, or shiny).
header : fits.Header
FITS header for the flat file.
variance : numpy.ndarray
Variance array, matching `cards` shape.
robust : float, optional
Threshold for outlier rejection in robust mean combination, specified
as a factor times the standard deviation.
radius : int, optional
Pixel radius to search for good pixels, used for interpolation over
bad pixels in the flat frames.
black_frame : int, optional
Index of the black frame in the input `cards` (typically 0). If
not provided, the black frame is set as the card with the highest
mean value.
dark : numpy.ndarray, optional
Slit dark frame image [nspec, nspat]. If provided, and the input
flat has a black card only, it will be subtracted from the black
frame to make the difference image.
fix_tort : bool, optional
If True, no attempt will be made to optimize distortion parameters
for cross-dispersed modes.
edge_method : {'deriv', 'sqderiv', 'sobel'}, optional
Sets the edge enhancement method for optimizing the cross-dispersed
distortion parameters. May be one derivative ('deriv'),
squared derivative ('sqderiv'), or Sobel ('sobel'). custom_wavemap
custom_wavemap : str or bool, optional
Filename for a text file containing explicit order edges for a
cross-dispersed mode, as whitespace-separated integers indicating
bottom, top, left, and right edges for each order (one per line).
If set, it is used to directly set the order mask for a
cross-dispersed mode. If set to a value other than a string or
None, a 'customWVM.dat' file is expected in the current directory.
top_pixel : int, optional
If provided, is used to directly set the top edge of the order.
Used for single-order modes only (medium, low); ignored for
cross-dispersed modes.
bottom_pixel : int, optional
If provided, is used to directly set the bottom edge of the order.
Used for single-order modes only (medium, low); ignored for
cross-dispersed modes.
start_pixel : int, optional
If provided, is used to directly set the left edge of all orders.
May be used for either single-order or cross-dispersed modes.
end_pixel : int, optional
If provided, is used to directly set the right edge of all orders.
May be used for either single-order or cross-dispersed modes.
Returns
-------
"""
params = _check_inputs(cards, header, variance,
robust=robust, dark=dark, radius=radius)
_set_black_frame(params, black_frame)
_set_shiny_and_sky_frames(params)
_set_process_type(params)
_check_saturation(params)
_process_cards(params)
_calculate_responsive_quantum_efficiency(params)
_undistort_flat(params, edge_method=edge_method,
fix_tort=fix_tort, custom_wavemap=custom_wavemap,
start_pixel=start_pixel,
end_pixel=end_pixel, top_pixel=top_pixel,
bottom_pixel=bottom_pixel)
# If no cards/flat desired, set them to 1, now that testtort is done
if params['cardmode'] in ['NONE', 'UNKNOWN']:
shape = params['ny'], params['nx']
params['cards'].fill(1.0)
params['flat_variance'] = np.zeros(shape, dtype=np.float64)
params['flat'] = np.ones(shape, dtype=np.float64)
params['illum'] = np.ones(shape, dtype=int)
return params
_create_flat(params, robust=robust)
return params
[docs]
def blackbody_pnu(wavenumber, temperature):
"""
Black-body photon function.
Accepts arrays for wavenumber and/or temperature, as long
as their shapes can be broadcast together.
Parameters
----------
wavenumber : float or array-like of float
Wavenumber values for computing the black-body function.
temperature : float or array-like of float
Temperature values for computing the black-body function.
Returns
-------
pnu : float or array-like of float
Photons at input values, in Hz/cm.
"""
t = u.Quantity(temperature, 'Kelvin')
v = u.Quantity(wavenumber, 'Kayser').to('Hz', equivalencies=u.spectral())
pnu = 2 * (v ** 2) / const.c
pnu /= np.exp((const.h * v) / (const.k_B * t)) - 1.0
return pnu.to(u.Hz / u.cm).value
[docs]
def bnu(wavenumber, temperature):
"""
Black-body intensity function.
Accepts arrays for wavenumber and/or temperature, as long
as their shapes can be broadcast together.
Parameters
----------
wavenumber : float or array-like of float
Wavenumber values for computing the black-body function.
temperature : float or array-like of float
Temperature values for computing the black-body function.
Returns
-------
bnu : float or array-like of float
Blackbody intensity in erg s-1 cm-2 (cm-1)-1.
"""
t = u.Quantity(temperature, 'Kelvin')
v = u.Quantity(wavenumber, 'Kayser').to('Hz', equivalencies=u.spectral())
bnu = 2 * const.h * (v ** 3) / const.c
bnu /= np.exp((const.h * v) / (const.k_B * t)) - 1.0
return bnu.value * 1e5
[docs]
def bb_cal_factor(wavenumber, bb_temp, flat_tamb, flat_emis):
"""
Calibration factor for EXES blackbody source + flat mirror.
The EXES blackbody plate is located outside of the dewar,
with one reflection off of a flat mirror before entering the
EXES window. This flat field system seems to produce photons
like the sum of two blackbodies:
B(BB_TEMP, lambda) * (1-emissivity) + emissivity * B(T_ambient),
where emissivity is (1-reflectance) of the flat mirror.
Expected values are emissivity = 0.1. (reflectance = 0.9) and
T_ambient = 290 K.
Parameters
----------
wavenumber : float or array-like of float
Wavenumber values for computing the black-body function.
bb_temp : float
Blackbody source temperature value.
flat_tamb : float
Ambient temperature for flat mirror.
flat_emis : float
Emissivity factor (1 - reflectance) for the flat mirror.
Returns
-------
"""
bnu_t = bnu(wavenumber, bb_temp)
bnu_t_amb = bnu(wavenumber, flat_tamb)
cal_factor = bnu_t * (1 - flat_emis) + bnu_t_amb * flat_emis
return cal_factor
def _check_inputs(cards, header, variance, robust=4.0, dark=None, radius=10):
"""Check input dimensions and modes."""
cards = np.asarray(cards, dtype=float)
variance = np.asarray(variance, dtype=float)
if cards.shape != variance.shape:
raise ValueError("Card shape does not match variance shape")
cardmode = str(header['CARDMODE']).strip().upper()
instcfg = str(header['INSTCFG']).strip().upper()
nx = int(header['NSPAT'])
ny = int(header['NSPEC'])
eperadu = float(header['EPERADU'])
slitval = float(header['SLITVAL'])
temp = float(header['BB_TEMP'])
flat_tamb = float(header['FLATTAMB'])
flat_emis = float(header['FLATEMIS'])
pixel_width = float(header['PIXELWD'])
frametime = float(header['FRAMETIM'])
gain = float(header['PAGAIN'])
satval = float(header['SATVAL'])
# Planned or updated central wavenumber
waveno0 = parse_central_wavenumber(header)
if dark is not None:
dark = np.asarray(dark, dtype=float)
if dark.shape != (ny, nx):
raise ValueError("Dark does not match expected shape from header")
# Check no cards or tort, or if tort is from object
if cardmode in ['NONE', 'UNKNOWN']:
raise ValueError("CARDMODE is unspecified")
# If camera, then use flatmode = shiny or sky
if instcfg == 'CAMERA':
if cardmode != 'SKY':
log.info("Setting flatmode = shiny for camera mode")
cardmode = 'SHINY'
# Get parameters for instrument configuration
if instcfg in ['HIGH_MED', 'HIGH_LOW']:
focal_length = float(header['HRFL0'])
r_number = float(header['HRR'])
else:
focal_length = float(header['XDFL0'])
r_number = float(header['XDR'])
frgain = frametime * gain
if frgain > 0:
maxval = satval / frgain
else:
maxval = satval
# Check cards
if cards.ndim == 2:
cards = cards[None]
ncards = cards.shape[0]
# Check the number of cards against mode
if ncards == 1:
cardmode = 'BLK'
elif ncards < 3 and cardmode in ['SHINY', 'BLKSHINY']:
log.warning("Shiny frame not read. Changing cardmode to BLKSKY")
cardmode = 'BLKSKY'
# Check the values in each card
means = np.empty(ncards, dtype=float)
for i in range(ncards):
means[i] = meancomb(cards[i].ravel(), robust=robust, returned=False)
bad_frames = np.isnan(means)
if bad_frames.any():
badidx = np.nonzero(bad_frames)[0]
for frame in badidx:
msg = f"Bad data in flat frame {frame}"
if frame < 2:
raise ValueError(f"Cannot proceed: {msg}")
elif cardmode in ['SHINY', 'BLKSHINY']:
log.warning(msg)
log.info("Changing mode to BLKSKY")
cardmode = 'BLKSKY'
else:
log.info(msg)
log.info(f"This is allowable for frame: {frame}")
return {
'cards': cards,
'variance': variance,
'header': header,
'dark': dark,
'cardmode': cardmode,
'instcfg': instcfg,
'ncards': ncards,
'nx': nx,
'ny': ny,
'eperadu': eperadu,
'slitval': slitval,
'temp': temp,
'flat_tamb': flat_tamb,
'flat_emis': flat_emis,
'waveno0': waveno0,
'focal_length': focal_length,
'r_number': r_number,
'maxval': maxval,
'pixel_width': pixel_width,
'card_means': means,
'radius': radius
}
def _set_black_frame(params, black_frame):
"""Set the black frame index."""
if black_frame is not None:
try:
black_frame = int(black_frame)
except (TypeError, ValueError):
black_frame = -1
if not (0 <= black_frame < params['ncards']):
raise ValueError(f"Cannot use black_frame={black_frame} "
f"for {params['ncards']} cards")
elif params['cardmode'] == 'SKY':
black_frame = 0
else:
black_frame = np.argmax(params['card_means'])
log.info(f"Flat frame {black_frame} is brightest")
params['black_frame'] = black_frame
def _set_shiny_and_sky_frames(params):
"""Set shiny and sky frame indices."""
# Note: this logic was copied from the original TEXES pipeline,
# but shiny has never been used for EXES
black_frame = params['black_frame']
nc = params['ncards']
means = params['card_means']
if black_frame == 0:
shiny_frame = 2
sky_frame = 3 if (nc == 4 and (means[3] < means[1])) else 1
elif black_frame == 1:
shiny_frame = 3
sky_frame = 2 if (nc == 4 and (means[2] < means[0])) else 0
else:
shiny_frame = black_frame - 2
sky_frame = black_frame - 1
if shiny_frame >= nc:
shiny_frame = black_frame
if sky_frame >= nc:
sky_frame = black_frame
if sky_frame >= 2:
sky_frame2 = sky_frame - 2
else:
sky_frame2 = sky_frame + 2
if sky_frame2 >= nc:
sky_frame2 = sky_frame
params['sky_frame'] = sky_frame
params['sky_frame2'] = sky_frame2
params['shiny_frame'] = shiny_frame
def _set_process_type(params):
"""Check cardmode is correct for current parameters."""
cardmode = params['cardmode']
black_frame = params['black_frame']
shiny_frame = params['shiny_frame']
if cardmode in ['BLK', 'NONE', 'UNKNOWN']:
process_type = 'BLK'
elif cardmode in ['SKY']:
process_type = 'SKY'
elif cardmode in ['SHINY'] and shiny_frame != black_frame:
process_type = 'SHINY'
elif cardmode in ['BLKSKY', 'OBJ', 'BLKOBJ']:
process_type = 'BLKSKY'
elif cardmode in ['BLKSHINY'] and shiny_frame != black_frame:
process_type = 'BLKSHINY'
else:
if shiny_frame != black_frame:
raise ValueError(f"Unrecognizable cardmode: {cardmode}")
else:
raise ValueError("Cardmode is unusable without shiny frame")
params['process_type'] = process_type
def _check_saturation(params, max_saturation=0.04):
"""Generate a saturation mask and check for too many bad pixels."""
process_type = params['process_type']
maxval = params['maxval']
if maxval <= 0:
mask = np.full((params['ny'], params['nx']), True)
elif process_type == 'SKY':
mask = params['cards'][params['sky_frame']] <= maxval
elif process_type == 'SHINY':
mask = params['cards'][params['shiny_frame']] <= maxval
else:
mask = params['cards'][params['black_frame']] <= maxval
saturated = np.sum(~mask)
if saturated > (max_saturation * params['nx']):
msg = f"{saturated} pixels saturated in black."
if process_type != 'SHINY':
msg += " Try using cardmode='SHINY'."
raise ValueError(msg)
params['mask'] = mask
def _process_cards(params):
"""Process flat cards according to mode."""
process_type = params['process_type']
if process_type == 'BLK':
_process_blk(params)
elif process_type == 'SKY':
_process_sky(params)
elif process_type == 'SHINY':
_process_shiny(params)
elif process_type == 'BLKSKY':
_process_blksky(params)
elif process_type == 'BLKSHINY':
_process_blkshiny(params)
else:
raise ValueError("Unknown process type")
# at this point, there should be a card1 and card2 in params.
# Replace the cards with this set
params['cards'] = np.array([params['card1'], params['card2']])
def _process_blk(params):
"""Make diff and stddev frames for BLK mode."""
# Accounts for change in the EXES observing pattern of only doing
# stares at the blackbody + slit darks
log.info("Processing BLK type cards:")
shape = params['ny'], params['nx']
cards = params['cards']
black_frame = params['black_frame']
if params['dark'] is None:
log.info('No slit dark available.')
# Mark saturated pixels
black_card = cards[black_frame]
card1 = black_card.copy()
diff = card1.copy()
card2 = np.zeros(shape, dtype=float)
if params['sky_frame'] != black_frame:
sky_card = cards[params['sky_frame']]
nzi = black_card != 0
card2[nzi] = (black_card[nzi] - sky_card[nzi]) / black_card[nzi]
params['card_variance'] = params['variance'][black_frame].copy()
else:
log.info('Subtracting slit dark.')
black_card = cards[0]
card1 = black_card.copy()
card2 = black_card - params['dark'].copy()
diff = card2.copy()
card2[black_card == 0] = 0
params['card_variance'] = params['variance'][0].copy()
params['card1'] = card1
params['card2'] = card2
params['diff'] = diff
params['stddev'] = np.sqrt(params['card_variance'])
def _process_sky(params):
"""Make diff and stddev frames for SKY mode."""
log.info("Processing SKY type cards")
sky_frame = params['sky_frame']
sky_card = params['cards'][sky_frame]
card1 = sky_card.copy()
card2 = sky_card.copy()
diff = sky_card.copy()
params['card1'] = card1
params['card2'] = card2
params['diff'] = diff
params['card_variance'] = params['variance'][sky_frame].copy()
params['stddev'] = np.sqrt(params['card_variance'])
def _process_shiny(params):
"""Make diff and stddev frames for SHINY mode."""
log.info("Processing SHINY type cards")
shiny_frame = params['shiny_frame']
shiny_card = params['cards'][shiny_frame]
sky_card = params['cards'][params['sky_frame']]
card1 = shiny_card.copy()
diff = shiny_card.copy()
card2 = np.zeros(shiny_card.shape, dtype=float)
if shiny_frame != params['sky_frame']:
nzi = shiny_card != 0
card2[nzi] = sky_card[nzi] / shiny_card[nzi]
params['card1'] = card1
params['card2'] = card2
params['diff'] = diff
params['card_variance'] = params['variance'][shiny_frame].copy()
params['stddev'] = np.sqrt(params['card_variance'])
def _process_blksky(params):
"""Make diff and stddev frames for BLKSKY mode."""
log.info("Processing BLKSKY type cards")
black_frame = params['black_frame']
shiny_frame = params['shiny_frame']
black_card = params['cards'][black_frame]
do_dark = params['dark'] is not None
if black_frame != shiny_frame:
card1 = black_card - params['cards'][shiny_frame]
else:
card1 = black_card.copy()
if do_dark:
log.info('Subtracting slit dark instead of sky.')
diff = black_card - params['dark']
else:
diff = black_card - params['cards'][params['sky_frame']]
card2 = np.zeros(black_card.shape, dtype=float)
nzi = black_card != 0
card2[nzi] = diff[nzi]
if not do_dark:
card2[nzi] /= black_card[nzi]
params['card1'] = card1
params['card2'] = card2
params['diff'] = diff
params['card_variance'] = params['variance'][black_frame].copy()
params['stddev'] = np.sqrt(params['card_variance'])
def _process_blkshiny(params):
"""Make diff and stddev frames for BLKSHINY mode."""
log.info("Processing BLKSHINY type cards")
black_frame = params['black_frame']
shiny_frame = params['shiny_frame']
black_card = params['cards'][black_frame]
shiny_card = params['cards'][shiny_frame]
sky_card = params['cards'][params['sky_frame']]
card1 = black_card - shiny_card
diff = card1.copy()
card2 = np.zeros(card1.shape, dtype=float)
nzi = black_card != 0
card2[nzi] = (black_card[nzi] - sky_card[nzi]) / black_card[nzi]
params['card1'] = card1
params['card2'] = card2
params['diff'] = diff
params['card_variance'] = params['variance'][black_frame].copy()
params['stddev'] = np.sqrt(params['card_variance'])
def _calculate_responsive_quantum_efficiency(params):
"""Calculate RQE for the black frame."""
dwno = params['waveno0'] * params['slitval']
dwno /= 2 * params['focal_length'] * params['r_number']
pnut = blackbody_pnu(params['waveno0'], params['temp'])
a_omega = (params['pixel_width'] ** 2) * np.pi / (4 * 36)
black_mean = params['card_means'][params['black_frame']]
rqe = black_mean * params['eperadu']
rqe /= pnut * a_omega * dwno
log.info(f"Mean RQE over black frame: {rqe}")
params['rqe'] = rqe
def _undistort_flat(params, edge_method='deriv', custom_wavemap=None,
fix_tort=False, start_pixel=None, end_pixel=None,
top_pixel=None, bottom_pixel=None):
"""Undistort the black frame and tune distortion parameters."""
header = params['header'].copy()
# clean card 0 before using
card0 = params['cards'][0].copy()
card0, _ = clean(card0, params['header'], params['stddev'].copy(),
mask=params['mask'], radius=params['radius'])
tortdata, tortillum = derive_tort(
card0, header, maxiter=5, fixed=fix_tort,
edge_method=edge_method, custom_wavemap=custom_wavemap,
top_pixel=top_pixel, bottom_pixel=bottom_pixel,
start_pixel=start_pixel, end_pixel=end_pixel)
# tortdata is not used later, but tortillum is
params['tortdata'] = tortdata
params['tortillum'] = tortillum
params['header'] = header
def _create_flat(params, robust=4.0):
"""Create the flat from the diff frame."""
# Clean the diff frame (normally black-sky or black-dark)
diff = params['diff']
header = params['header'].copy()
ny, nx = shape = params['ny'], params['nx']
illum = params['tortillum'].copy()
diff, stddev = clean(diff, header, params['stddev'],
mask=params['mask'], radius=params['radius'])
params['diff'] = diff
params['stddev'] = stddev
# Check the diff frame for overall negative value
mean_diff = meancomb(diff.ravel(), robust=robust, returned=False)
if mean_diff <= 0:
log.warning("Mean flat diff <= 0; Setting flat = 1")
params['flat'] = np.ones(shape, dtype=np.float64)
params['flat_variance'] = np.zeros(shape, dtype=np.float64)
params['illum'] = np.ones(shape, dtype=int)
return
# Set flat = 0 if diff is small to prevent huge flat values.
thrfac = float(header['THRFAC'])
if thrfac < 0.2:
dmin = 0.05 * mean_diff
elif thrfac > 1:
dmin = 0.25 * mean_diff
else:
dmin = 0.25 * thrfac * mean_diff
# Find pixels above threshold
idx = diff > dmin
if not idx.any():
log.warning("No pixels found above threshold; Setting flat = 1")
params['flat'] = np.ones(shape, dtype=np.float64)
params['flat_variance'] = np.zeros(shape, dtype=np.float64)
params['illum'] = np.ones(shape, dtype=int)
return
flat = np.zeros(shape, dtype=np.float64)
flat_variance = np.zeros(shape, dtype=np.float64)
cal_factor = bb_cal_factor(params['waveno0'], params['temp'],
params['flat_tamb'], params['flat_emis'])
flat[idx] = cal_factor / diff[idx]
flat_variance[idx] = ((params['stddev'][idx] ** 2)
* (cal_factor ** 2)
/ (diff[idx] ** 4))
header['BNU_T'] = cal_factor
header['BUNIT'] = 'erg s-1 cm-2 sr-1 (cm-1)-1 ct-1'
ux, uy = tortcoord(header, skew=True)
u1 = ux.astype(int)
v1 = uy.astype(int)
u2 = u1 + 1
v2 = v1 + 1
# Check for pixels out of bounds and mark with -1
cond1 = (u1 >= 0) & (u2 < nx) & (v1 >= 0) & (v2 < ny)
if not cond1.all():
idx = np.where(~cond1)
illum[idx] = -1
# Check the four nearest pixels for bad values and mark with 0
u1 = np.clip(u1, 0, nx - 1)
u2 = np.clip(u2, 0, nx - 1)
v1 = np.clip(v1, 0, ny - 1)
v2 = np.clip(v2, 0, ny - 1)
cond2 = diff[v1, u1] <= dmin
cond2 |= diff[v2, u1] <= dmin
cond2 |= diff[v1, u2] <= dmin
cond2 |= diff[v2, u2] <= dmin
idx = cond1 & cond2
if idx.any():
illum[np.where(idx)] = 0
# Update params
params['header'] = header
params['illum'] = illum
params['flat'] = flat
params['flat_variance'] = flat_variance