# Licensed under a 3-clause BSD style license - see LICENSE.rst
import warnings
from astropy import log
import bottleneck as bn
import numpy as np
from sofia_redux.instruments.exes.get_badpix import get_badpix
from sofia_redux.toolkit.utilities.fits import set_log_level
__all__ = ['clean']
[docs]
def clean(data, header, std, mask=None, radius=10, threshold=20.0,
propagate_nan=False):
"""
Correct bad pixels.
Bad pixels to correct may be indicated in an input mask, with good
values indicated with 1 or True and bad values indicated with 0 or False.
Bad pixels may also be identified from a reference bad pixel mask on disk.
The filename should be passed in as header['BPM']. In this FITS image,
pixels that are known to be bad are marked with a value of 0; good pixels
are marked with a value of 1.
Alternatively, bad pixels may be identified from their noise
characteristics: if the standard deviation associated with a pixel is
greater than a threshold value times the mean standard deviation
for the frame, then it is marked as a bad pixel. Bad pixels are
corrected by using neighboring good values to linearly interpolate over
the bad ones. The search for good pixels checks first in the
y-direction, then in the x-direction. If good pixels cannot be
identified within a 10-pixel radius, then the bad pixel will not be
corrected. If there is a different uncertainty frame for each input
data frame, this algorithm should be run in a loop on each frame
individually.
Parameters
----------
data : numpy.ndarray
Data cube of shape (nframe, nspec, nspat) or image (nspec, nspat).
header : fits.Header
Header associated with the input data. Will be updated in-place.
std : numpy.ndarray
2D or 3D uncertainty array (i.e. sqrt(variance)) of shape
(nspec, nspat) or (nframe, nspec, nspat). If a 2D array is
passed in, it will be applied to all frames.
mask : numpy.ndarray of int or bool, optional
Bad pixel array of shape (nspec, nspat) or (nframe, nspec, nspat)
indicating pixels to correct (good=1, bad=0). If not None, will be
updated with additional bad pixels found. It is permitted for a
2D mask to be applied over all frames.
radius : int or array-like of int, optional
The maximum distance over which to perform interpolation. If an
array is supplied, this is the maximum distance for each dimension
in numpy (row, col) order (y, x).
threshold : float, optional
Threshold for bad pixel identification, as a factor to multiply
by the standard deviation.
propagate_nan : bool, optional
If True, bad pixels in the data will be replaced by NaN rather
than interpolated over.
Returns
-------
data, std : numpy.ndarray, numpy.ndarray
The cleaned data and error.
"""
data, std, mask = _check_inputs(data, std, mask=mask)
mask = _apply_badpix_mask(mask, header)
if data.ndim == 3:
cleaned = np.empty_like(data)
cleaned_error = np.empty_like(data)
separate_mask = mask.ndim == 3
separate_std = std.ndim == 3
for frame in range(data.shape[0]):
m = mask[frame] if separate_mask else mask
e = std[frame] if separate_std else std
e, m = _check_noise(e, m, header, threshold)
cleaned[frame], cleaned_error[frame] = _clean_frame(
data[frame], e, m, radius=radius, propagate_nan=propagate_nan)
if not separate_std:
cleaned_error = cleaned_error[0]
result = cleaned, cleaned_error
else:
std, mask = _check_noise(std, mask, header, threshold)
result = _clean_frame(data, std, mask, radius=radius,
propagate_nan=propagate_nan)
return result
def _apply_badpix_mask(mask, header):
"""
Update mask with bad pixels from a reference file.
Parameters
----------
mask : numpy.ndarray of bool
Bad pixel mask to be updated (True = good, False = bad).
header : fits.Header
Header containing bad pixel mask filename, in the BPM keyword.
Returns
-------
mask : numpy.ndarray of bool
The updated mask.
"""
with set_log_level('WARNING'):
bpm = get_badpix(header, clip_reference=True, apply_detsec=True)
if bpm is None:
return mask
# update mask in place to track bad pixels
if mask.ndim == 3:
mask &= bpm[None]
else:
mask &= bpm
return mask
def _check_noise(std, mask, header, stdfac):
"""
Look for noisy or too quiet pixels.
Parameters
----------
std : numpy.ndarray
Error values to check.
mask : numpy.ndarray
Bad pixel mask values. Good pixel=True, bad pixel=False
header : fits.Header
FITS header associated with the data.
stdfac : float
Noisy pixel threshold, as a factor times the error.
Returns
-------
std, mask : numpy.ndarray, numpy.ndarray
The std and mask are updated in-place. Pixels with high error
values are added to the mask as a False value. Pixels with low
values are set to a new minimum.
"""
if header.get('SCAN') in ['MAP', 'FILEMAP']:
return std, mask
if stdfac <= 0:
return std, mask
stdmean = bn.nanmean(std[mask])
if stdmean == 0 or np.isnan(stdmean):
return std, mask
stdmax = stdmean * stdfac
stdmin = stdmean / 16
# todo: revisit statistics
# There may be better options for finding noisy pixels.
# Mark pixels as bad if std > stdmax
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
high_std = std > stdmax
nhigh = np.sum(high_std)
if nhigh != 0:
log.info(f"Found {nhigh} noisy pixels.")
mask[high_std] = False
# Set pixels with std < stdmin to stdmin
np.clip(std, stdmin, None, out=std)
return std, mask
def _clean_frame(data, std, mask, radius=10, propagate_nan=False):
"""
Clean a single frame.
Bad pixels are linearly interpolated over, using neighboring
data. Neighbors in the y-direction are preferentially used.
Parameters
----------
data : numpy.ndarray
(M, N) of float
std : numpy.ndarray
(M, N) of float
mask : numpy.ndarray
(M, N) of bool
radius : int or array_like of int, optional
The maximum distance over which to perform interpolation. If an
array is supplied, this is the maximum distance for each dimension
in numpy (row, col) order (y, x).
propagate_nan : bool, optional
If True, bad pixels in the data will be replaced by NaN rather
than interpolated over.
Returns
-------
cleaned_data, cleaned_std : numpy.ndarray, numpy.ndarray
The cleaned data and updated error both of shape (M, N)
"""
# mark any NaNs in the data if not already marked
mask &= ~np.isnan(data)
if mask.all():
return data, std
if propagate_nan:
data[~mask] = np.nan
std[~mask] = np.nan
return data, std
# todo: There may be better ways to do this interpolation
ny, nx = data.shape
nxny = nx * ny
result = data.copy().ravel()
error = std.copy().ravel()
missing = ~mask
y, x = np.mgrid[:ny, :nx]
w = np.asarray(radius, dtype=int)
if w.size == 1:
w = np.full(2, int(w))
missing_inds = (y * nx + x)[missing]
offsety_inds = (np.arange(w[0] - 1) + 1)
offsetx_inds = (np.arange(w[1] - 1) + 1)
flat_mask = mask.ravel()
x_missing = x[missing]
y_missing = y[missing]
y0inds = (y_missing * nx)[..., None]
x0inds = x_missing[..., None]
# search left of missing points
search = np.subtract.outer(x_missing, offsetx_inds)
left = search >= 0
inds = search + y0inds
inds[~left] = 0
left &= flat_mask[inds]
search[~left] = -1
left = np.max(search, axis=1)
# search right of missing points
search = np.add.outer(x_missing, offsetx_inds)
right = search < nx
inds = search + y0inds
inds[~right] = 0
right &= flat_mask[inds]
search[~right] = nxny
right = np.min(search, axis=1)
# search below missing points
search = np.subtract.outer(y_missing, offsety_inds)
bottom = search >= 0
inds = search * nx + x0inds
inds[~bottom] = 0
bottom &= flat_mask[inds]
search[~bottom] = -1
bottom = np.max(search, axis=1)
# search above missing points
search = np.add.outer(y_missing, offsety_inds)
top = search < ny
inds = search * nx + x0inds
inds[~top] = 0
top &= flat_mask[inds]
search[~top] = nxny
top = np.min(search, axis=1)
# Check if interpolation is possible and get necessary deltas
x_ok = (left >= 0) & (right < nx)
y_ok = (bottom >= 0) & (top < ny)
dx0 = x_missing - left
dy0 = y_missing - bottom
xdist = dx0 + right - x_missing
ydist = dy0 + top - y_missing
# Interpolate across 1 pixel in x if nothing else is closer
select = (x_ok & xdist == 2) & (~y_ok | (ydist > xdist))
y0 = y_missing[select]
d0 = data[y0, left[select]]
d1 = data[y0, right[select]]
e0 = std[y0, left[select]]
e1 = std[y0, right[select]]
replace = missing_inds[select]
result[replace] = d0 + (d1 - d0) * dx0[select] / xdist[select]
error[replace] = np.sqrt((e0 ** 2) + (e1 ** 2))
# Otherwise use y-interpolation
found = select.copy()
select = ~select & y_ok
x0 = x_missing[select]
d0 = data[bottom[select], x0]
d1 = data[top[select], x0]
e0 = std[bottom[select], x0]
e1 = std[top[select], x0]
replace = missing_inds[select]
result[replace] = d0 + (d1 - d0) * dy0[select] / ydist[select]
error[replace] = np.sqrt((e0 ** 2) + (e1 ** 2))
# Finally use x-interpolation if not possible any other way
found |= select
select = ~found & x_ok
y0 = y_missing[select]
d0 = data[y0, left[select]]
d1 = data[y0, right[select]]
e0 = std[y0, left[select]]
e1 = std[y0, right[select]]
replace = missing_inds[select]
result[replace] = d0 + (d1 - d0) * dx0[select] / xdist[select]
error[replace] = np.sqrt((e0 ** 2) + (e1 ** 2))
found |= select
if not found.all():
log.warning("%i pixels could not be cleaned" % np.sum(~found))
log.debug(f'Cleaned {(~mask).sum() - (~found).sum()} bad pixels.')
return result.reshape((ny, nx)), error.reshape((ny, nx))
def _check_inputs(data, std, mask=None):
"""
Check input parameters and standardize them to the required format.
Parameters
----------
data : numpy.ndarray
Data cube of shape (nframe, nspec, nspat) or image (nspec, nspat).
std : numpy.ndarray
2D or 3D uncertainty array (i.e. sqrt(variance)) of shape
(nspec, nspat) or (nframe, nspec, nspat).
mask : numpy.ndarray of int or bool, optional
Bad pixel array of shape (nspec, nspat) or (nframe, nspec, nspat)
indicating pixels to correct (good=1, bad=0).
Returns
-------
data, std, mask : 3-tuple of numpy.ndarray
Standardized `data`, `std`, and `mask` arrays.
"""
if mask is not None:
log.debug(f'Mask: {mask.shape}, {mask.ndim}')
data = np.asarray(data, dtype=float)
if data.ndim not in [2, 3]:
raise ValueError("Data must be a 2 or 3 dimensional array")
shape = data.shape
std = np.asarray(std, dtype=float)
if std.ndim not in [2, 3]:
raise ValueError("Std must be a 2 or 3 dimensional array")
elif std.shape[-2:] != shape[-2:]:
raise ValueError("Std and data shape mismatch")
if mask is None:
mask = np.ones(shape, dtype=bool)
else:
mask = np.asarray(mask, dtype=bool)
if mask.ndim not in [2, 3]:
raise ValueError("Mask must be a 2 or 3 dimensional array")
elif mask.shape[-2:] != shape[-2:]:
raise ValueError("Mask and data shape mismatch")
return data, std, mask