Source code for sofia_redux.instruments.exes.tort

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from astropy import log
import numba as nb
import numpy as np

from sofia_redux.instruments.exes.tortcoord import tortcoord
from sofia_redux.toolkit.image.utilities import map_coordinates

__all__ = ['tort']


[docs] def tort(data, header, variance=None, skew=False, order=3, missing=np.nan, badfrac=0.5, get_illum=False, interpolation_method='spline'): """ Correct image for optical distortion. Calls `exes.tortcoord` to calculate undistorted coordinates from raw pixel coordinates and distortion parameters in the header. The image is then interpolated onto the undistorted coordinates. If the variance is provided, it is also interpolated onto undistorted coordinates. Parameters ---------- data : numpy.ndarray [nframe, nspec, nspat] cube or [nspec, nspat] image. header : fits.Header FITS header with distortion parameters set. variance : numpy.ndarray, optional Variance array matching `data` dimensions. skew : bool, optional If True, correct for echelon slit skewing. order : int, optional Order of interpolation (0-5). 0 is nearest value, 1 is linear interpolation, 3 is cubic interpolation. missing : float, optional Value to use in output frames for missing data. badfrac : float, optional The allowable fraction of NaN-valued points used to generate an interpolated value. If this fraction is greater than `badfrac`, it will be replaced by `missing`. Note that all NaNs are replaced by zero before interpolation. get_illum : bool, optional If set, an undistorted illumination mask will be returned. interpolation_method : {'spline', 'convolution'}, optional If 'convolution', a separable parametric cubic convolution algorithm is applied, replicating the original IDL interpolation method for the EXES pipeline. If 'spline', the interpolation is performed via piecewise splines of the specified order. Returns ------- tortdata, [tortvar], [tortillum] : numpy.ndarray or tuple of numpy.ndarray The corrected data and optional variance and illumination arrays (if supplied and/or requested). Will be of shape [nframe, nspec, nspat] if nframe > 1; [nspec, nspat] otherwise. """ # get original data dimensions ndim = data.ndim # make sure data has 3 dimensions and matches variance # also, make a flat illumination array data_ok, data, variance, illum = _check_data( data, header, variance=variance) if not data_ok: raise ValueError('Data cannot be corrected.') # convert raw coordinates to undistorted coordinates # (including echelon slit skewing if desired) u, v = tortcoord(header, skew=skew) tortdata = np.empty(data.shape, dtype=np.float64) dovar = variance is not None tortvar = np.empty_like(tortdata) if dovar else None tortillum = np.empty_like(tortdata) if get_illum else None if interpolation_method == 'spline': map_function = map_coordinates else: map_function = _cubic_convolution for i in range(data.shape[0]): tortdata[i] = map_function(data[i], np.array([v, u]), order=order, cval=missing, threshold=badfrac) if dovar: # same as data tortvar[i] = map_function(variance[i], np.array([v, u]), order=order, cval=missing, threshold=badfrac) if get_illum: # missing value should be -1, but it may need NaN handling # to match data if np.isnan(missing): cval = np.nan else: cval = -1 tortillum[i] = map_function(illum[i].astype(float), np.array([v, u]), order=order, cval=cval, threshold=badfrac, clip=False) tortillum[i][np.isnan(tortillum[i])] = -1 if ndim == 2: tortdata = tortdata[0] if dovar: tortvar = tortvar[0] if get_illum: tortillum = tortillum[0] if not dovar and not get_illum: return tortdata result = tortdata, if dovar: result = result + (tortvar,) if get_illum: result = result + (np.round(tortillum).astype(int),) return result
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def _cubic_convolution(data, coords, alpha=-0.5, cval=np.nan, threshold=0.5, order=3, clip=True): # pragma: no cover """ Separable 2D parametric cubic convolution interpolation. Directly replicates behavior of the IDL interpolate function, used in the original version of this pipeline. Following a formulation in: Reichenbach, Stephen E. and Geng, Frank, "Two-Dimensional Cubic Convolution" (2003). CSE Journal Articles. 15. data : numpy.ndarray The data array to map with n_dimensions dimensions with a size of N. coordinates : numpy.ndarray The coordinates at which `data` is evaluated. Must be of shape (n_dimensions, N). Dimensions are ordered using the Numpy (y, x) convention. order : int, optional Included for compatibility with map_coordinates inputs; not used in this function. cval : float, optional The value to set for missing data. clip : bool, optional Whether to clip the output to the range of values of the input image. This is enabled by default, since higher order interpolation may produce values outside the given input range. threshold : float, optional Points with total weight >= `threshold` are considered valid; others will be set to cval in the output. Returns ------- mapped_coordinates : numpy.ndarray The result of transforming the data. The shape of the output is derived from that of `coordinates` by dropping the first axis. """ ny, nx = data.shape result = np.full((ny, nx), cval, dtype=nb.float64) minval = np.nanmin(data) maxval = np.nanmax(data) v, u = coords.astype(nb.float64) knots = [-3, -2, -1, 0, 1, 2, 3] for i in range(ny): for j in range(nx): # check for extrapolated point if ((v[i, j] <= 0 or v[i, j] >= ny - 1) or (u[i, j] <= 0 or u[i, j] >= nx - 1)): continue cv = int(round(v[i, j])) cu = int(round(u[i, j])) r_s = 0 sum_weight = 0 for yk in knots: # check/pad boundaries yi = cv + yk if yi < -1 or yi >= ny + 1: continue if yi < 0: yi = 0 elif yi >= ny: yi = ny - 1 # kernel function in y y = abs(v[i, j] - (cv + yk)) if y <= 1: f0y = 2 * y ** 3 - 3 * y ** 2 + 1 f1y = y ** 3 - y ** 2 elif y <= 2: f0y = 0 f1y = y**3 - 5 * y**2 + 8 * y - 4 else: f0y = 0 f1y = 0 for xk in knots: # check/pad boundaries xi = cu + xk if xi < -1 or xi >= nx + 1: continue if xi < 0: xi = 0 elif xi >= nx: xi = nx - 1 # data at knot in input image p_m = data[yi, xi] if np.isnan(p_m): continue # kernel function in x x = abs(u[i, j] - (cu + xk)) if x <= 1: f0x = 2 * x**3 - 3 * x**2 + 1 f1x = x**3 - x**2 elif x <= 2: f0x = 0 f1x = x**3 - 5 * x**2 + 8 * x - 4 else: f0x = 0 f1x = 0 f_s = (f0x + alpha * f1x) * (f0y + alpha * f1y) r_s += p_m * f_s sum_weight += f_s if sum_weight > threshold: value = r_s * 1 / sum_weight if clip: if value < minval: value = minval elif value > maxval: value = maxval result[i, j] = value return result def _check_data(data, header, variance=None): """Check input data dimensions.""" hnx = header['NSPAT'] hny = header['NSPEC'] data = np.asarray(data, dtype=float) if data.ndim == 2: data = data[None] nframes, ny, nx = shape = data.shape illum = np.ones(shape, dtype=int) if (ny, nx) != (hny, hnx): log.error("Data (y, x) dimensions do not match header: " "%s != %s" % (repr((ny, nx)), repr((hny, hnx)))) return False, data, variance, illum if variance is not None: if variance.ndim == 2: variance = variance[None] variance = np.asarray(variance, dtype=float) if variance.shape != shape: log.error("Variance and data shape mismatch: " "%s != %s" % (repr(variance.shape), repr(shape))) return False, data, variance, illum return True, data, variance, illum