Source code for sofia_redux.toolkit.convolve.kernel

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import numpy as np
from scipy import ndimage

from sofia_redux.toolkit.convolve.base import ConvolveBase
from sofia_redux.toolkit.convolve.filter import savgol, savgol_windows

__all__ = ['apply_ndkernel', 'convolve', 'savitzky_golay', 'KernelConvolve',
           'BoxConvolve', 'SavgolConvolve']


[docs] def apply_ndkernel(data, kernel, axes=None, normalize=True, is_error=False, **kwargs): """ Apply a kernel over multiple features Parameters ---------- data : array_like (shape) kernel : array_like Must have 1 dimension or the same number of features as "data" axes : array_like of int (ndim) normalize : bool, optional If True, normalize the kernel is_error : bool, optional If True, assumes input data are error values and propagates accordingly kwargs : dict, optional Optional keywords to pass into scipy.ndimage.convolve1d or scipy.ndimage.convolve. Returns ------- convolved : numpy.ndarray (shape) "data" convolved with "kernel". """ data = np.asarray(data, dtype=float) kernel = np.asarray(kernel, dtype=float) by_axis = kernel.ndim == 1 or data.ndim == 1 if not by_axis and (data.ndim != kernel.ndim): raise ValueError("kernel must have 1 dimension or the same " "number of features as the input sample") if by_axis and axes is None: axes = np.arange(data.ndim) result = data ** 2 if is_error else data if normalize: ksum = np.abs(kernel).sum() k = kernel / ksum if ksum != 0 else kernel else: k = kernel if is_error: k = k ** 2 if not by_axis: result = ndimage.convolve(data, k, **kwargs) return np.sqrt(result) if is_error else result for axis in axes: result = ndimage.convolve1d(result, k, axis=axis, **kwargs) return np.sqrt(result) if is_error else result
[docs] class KernelConvolve(ConvolveBase): """Generic convolution with a kernel""" def __init__(self, *args, error=1, mask=None, stats=True, do_error=True, robust=0, eps=0.01, maxiter=100, normalize=True, ignorenans=True, **kwargs): self._kernel = np.asarray(args[-1]).astype(float) self._normalize = normalize super().__init__(*args, error=error, mask=mask, do_error=do_error, stats=stats, robust=robust, eps=eps, maxiter=maxiter, ignorenans=ignorenans, **kwargs) def _convolve(self, cleaned): self._result = apply_ndkernel( cleaned, self._kernel, axes=self._axes, normalize=self._normalize, **self._fit_kwargs).ravel() if self.do_error: self._interpolated_error = apply_ndkernel( self.error, self._kernel, axes=self._axes, is_error=True, normalize=self._normalize, **self._fit_kwargs).ravel()
[docs] class BoxConvolve(KernelConvolve): """Convolution with a box kernel (mean)""" def __init__(self, *args, error=1, mask=None, stats=True, do_error=True, robust=0, eps=0.01, maxiter=100, normalize=True, ignorenans=True, **kwargs): args = args[:-1] + (np.ones(args[-1]),) super().__init__(*args, error=error, mask=mask, do_error=do_error, stats=stats, robust=robust, eps=eps, maxiter=maxiter, normalize=normalize, ignorenans=ignorenans, **kwargs)
[docs] class SavgolConvolve(ConvolveBase): """Convolve using Savitzky-Golay filter""" def __init__(self, *args, error=1, mask=None, stats=True, do_error=True, robust=0, eps=0.01, maxiter=100, scale=False, order=2, ignorenans=True, **kwargs): self._scaled = scale self._order = order self._window = None super().__init__(*args, error=error, mask=mask, do_error=do_error, stats=stats, robust=robust, eps=eps, maxiter=maxiter, ignorenans=ignorenans, **kwargs) def _parse_model_args(self): z = self.reshape(self._samples[-1], copy=False) self._order, self._window = savgol_windows( self._order, self._model_args, *(tuple(self._samples[:-1]) + (z,)), scale=self._scaled) def _convolve(self, cleaned): self._result = savgol( cleaned, self._window, order=self._order, axes=self._axes, check=False, **self._fit_kwargs).ravel() if self.do_error: self._interpolated_error = savgol( self.error, self._window, order=self._order, is_error=True, axes=self._axes, check=False, **self._fit_kwargs).ravel()
[docs] def convolve(*args, error=None, mask=None, stats=False, do_error=None, robust=0, eps=0.01, maxiter=100, normalize=True, ignorenans=True, **kwargs): """ Convolve an N-dimensional array with a user defined kernel or fixed box. A wrapper function for the `KernelConvolve`. Parameters ---------- args : [samples], data, kernel "data" must beĀ an n-dimensional array of size (shape,). Optionally, coordinate values for each data element may be provided for each dimension ("samples"). If not provided, they will be determined to be regularly spaced at unit intervals. "samples" must be provided for each dimension (x, y, z, ... order), where each array matches the data (shape,). Finally, a kernel of the same dimensions as "data" must be provided following the "data" argument. If "kernel" is an integer, a box kernel will be applied with a width of "kernel" over each dimension. error : float or int or array, optional A scalar value containing the error for each data point or an array of values may be provided and propagated through the convolution and interpolation. If provided, and "do_error" is `None` or True, the returned result will be a tuple of the form (convolved_result, error). mask : array of bool, optional If provided must match data (shape,). `False` values indicate data locations that should be excluded from convolution calculations. `False` values will be replaced by linear interpolated values. stats : bool, optional If `True`, return statistics on the convolution/interpolation in the returned result. do_error : bool, optional If `True`, and `error` is not `None`, return the propagated error in addition to the result. robust : int or float, optional If non-zero, will perform iterations of the convolution, masking out data points if the abs(residual/error) is greater than "robust". Iteration will continue until the delta_sigma/sigma ratio is less than "eps". eps : float, optional The precision limit for use in the "robust" iteration process. maxiter : int, optional The maximum number of iterations to attempt in "robust" iteration. normalize : bool, optional If True, the kernel will be normalized such that sum(kernel) = 1 prior to convolution. ignorenans : bool, optional If True, NaNs will be masked out during convolution and interpolation. Otherwise, they will propagate through all calculations. kwargs : dict, optional Optional keyword arguments to pass into BoxConvolve or KernelConvolve parent class. Returns ------- convolved_result, [error], [statistics] : array or tuple of array The convolved result, and optional propagated error (if "error" is provided and "do_error" is not "False") and statistics (if "stats" is `True`. The output shape for the result and error will be the same as the input data. """ if hasattr(args[-1], '__len__'): convolver = KernelConvolve else: convolver = BoxConvolve if error is None: do_error = False elif do_error is None: do_error = error is not None c = convolver(*args, error=error, mask=mask, stats=stats, do_error=do_error, robust=robust, eps=eps, maxiter=maxiter, normalize=normalize, ignorenans=ignorenans, **kwargs) result = c.result if not do_error and not stats: return result result = result, if do_error: result = result + (c.error,) if stats: result = result + (c.stats,) return result
[docs] def savitzky_golay(*args, order=2, error=1, mask=None, do_error=False, robust=0, eps=0.01, maxiter=100, model=False, scale=False, ignorenans=True, **kwargs): """ Apply a least-squares (Savitzky-Golay) polynomial filter Convenience wrapper for toolkit.convolve.kernel.SavgolConvolve Any NaN or False "mask" values will be interpolated over before filtering. Error will be propagated correctly and will account for interpolation over masked/NaN values or outliers removed during "robust" iteration. Arguments are supplied in the following order: [x0, x1, ... xn], values, window, [optional parameters] where x are dimension coordinates supplied in the same order as the axes of values. Therefore, if you supplied arguments in the order x, y, z then you should expect the zeroth axis of z to relate to the x-coordinates and the first axis of z to relate to the y-coordinates. This is contrary to standard numpy (row (y), col (x)) ordering. For consistency with numpy, one would supply the arguments in the y, x, z order. The x0, x1,... independent values do not need to be supplied, and instead, the features of values will be used to generate a regular grid of coordinates. Be aware however that the independent variables are used to rescale window widths if "scale" is set to True, and also used for linear interpolation over masked, NaN, or identified outliers (if "robust"). Parameters ---------- args : (ndim + 2)-tuple or 2-tuple - args[-1] contains the filtering window width. This should be a positive integer greater than order and less than the size of the image. - args[-2] contains the dependent variables as an array of shape (shape). If independent variables were supplied, (shape) must match across args[:ndim + 1]. Otherwise, the dimension and shape is determined by args[-2]. - args[:ndim] contain the independent variables. These are used for interpolation purposes or "scale" and should be of size (shape). However, they do not need to be specified if interpolation is being carried out on a regular grid. In this case, the user should just supply the dependent variables as an array of the correct dimension and shape. order : int, optional Order of polynomial to use in designing the filter. Higher orders produce sharper features. error : float or array_like of float (shape), optional Dependent variable errors to propagate mask : array_like or bool (shape), optional Mask indicating good (True) and bad (False) samples. Bad values will not be included in the fit and will be replaced with linear interpolation prior to filtering. do_error : bool, optional If True, return the propagated error in addition to the filtered data. robust : int or float, optional If > 0, taken as the sigma threshold above which to identify outliers. Outliers are those identified as abs(x_i - x_med) / MAD > "robust" where x is the residual of (data - fit) x_med is the median, MAD is the median absolute deviation defined as 1.482 * median(abs(x_i - x_med)). The fit will be iterated upon until the set of identified outliers does not change, or any change in the relative RMS is less than "eps", or "maxiter" iterations have occured. eps : float, optional The limit of fractional check in RMS if "robust" is > 0 maxiter : int, optional The maximum number iterations if "robust" is > 0 model : bool, optional If set, return the fitting model with lots of nice little methods and statistics to play with. scale : bool, optional If True, scale "window" to the average spacing between samples over each dimension. Note that this replaces "width" in the old IDL version. This option should not be used if working in multiple non-orthogonal features, as average spacing per dimension is taken as the average separation between ordered dimensional coordinates. ignorenans : bool, optional If True (default and highly recommended), NaNs will be removed and interpolated over. kwargs : dict, optional. Optional keywords and values that would be passed into scipy.signal.savgol_filter. Returns ------- filtered_data : numpy.ndarray of numpy.float64 (shape) or Model The filtered data or an instance of toolkit.base.Model. Notes ----- Edge conditions are handled differently according to whether the data is one dimensional or not. If it is, then extrapolation is permitted. However, extrapolation is not permitted in features of > 2. In these cases, everything should work as expected unless the corner samples are NaN. Interpolation is not possible and NaNs will be applied to the filtered array according to width/2 of the window centered on the corner NaN. Also note, the IDL version "robustsg" was only 1-dimensional and savitzky_golay will perform exactly the same in this case. Examples -------- Generate coefficients for cubic-polynomial with a window of 5 >>> import numpy as np >>> x = np.arange(5) >>> y = [0, 0, 1, 0, 0] >>> filtered = savitzky_golay(x, y, 5, order=2) >>> assert np.allclose(filtered, [-3/35, 12/35, 17/35, 12/35, -3/35]) 2-dimensional example >>> (y, x), z = np.mgrid[:11, :11], np.zeros((11, 11)) >>> z[5, 5] = 1 >>> x = x / 2 Coordinates are inferred if not supplied >>> filtered1 = savitzky_golay(x, y, z, 5, order=2) >>> filtered2 = savitzky_golay(z, 5, order=2) >>> assert np.allclose(filtered1, filtered2) >>> filtered3 = savitzky_golay(x, y, z, 5, order=2, scale=True) >>> print(np.sum(filtered1 != 0)) # window = (5, 5) x, y pixels 25 >>> print(np.sum(filtered3 != 0)) # window = (11, 5) x, y, pixels 55 Note that in the last case, scale scaled the x window width to 11 pixels since the average spacing in x is 0.5. i.e. a window width of 1 is equal to 2 x-pixels. Therefore, the window is expanded to 2 * 5 = 10, and then an extra pixel is added on to ensure that the filter is centered (odd number of pixels). """ sg = SavgolConvolve(*args, order=order, error=error, mask=mask, do_error=do_error, robust=robust, eps=eps, maxiter=maxiter, stats=model, scale=scale, ignorenans=ignorenans, **kwargs) if model: return sg return (sg.result, sg.error) if do_error else sg.result