# Licensed under a 3-clause BSD style license - see LICENSE.rst
import math
import numba as nb
import numpy as np
import sys
from sofia_redux.toolkit.splines import spline_utils
nb.config.THREADING_LAYER = 'threadsafe'
__all__ = ['smart_median_1d', 'smart_median_2d', 'smart_median',
'roundup_ratio', 'level', 'smooth_1d', 'gaussian_kernel',
'mean', 'box_smooth_along_zero_axis', 'log2round', 'log2ceil',
'pow2round', 'pow2floor', 'pow2ceil', 'regular_kernel_convolve',
'regular_coarse_kernel_convolve', 'smooth_values_at',
'smooth_value_at', 'point_aligned_smooth', 'point_smooth',
'sequential_array_add', 'index_of_max', 'robust_mean']
_bad_int_value = -sys.maxsize - 1
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def roundup_ratio(a, b):  # pragma: no cover
    """
    Return the ratio of a to b, rounded up to the nearest integer.

    Equivalent to int((a + b - 1) / b).

    Parameters
    ----------
    a : int or float
    b : int or float

    Returns
    -------
    ratio : int
        The roundup ratio.
    """
    numerator = a + b - 1
    return int(numerator / b)
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def level(values, start=0, end=-1, resolution=1.0):  # pragma: no cover
    """
    Subtract the average value from all values and return the average.

    The average is calculated between the start and end indices of the
    supplied values, excluding NaNs.  Leveling is applied in-place on the
    values (between the start and end index), and the average value is
    returned to the caller.

    Parameters
    ----------
    values : numpy.ndarray (float)
        The array to level.
    start : int, optional
        The start index from which to start the leveling.  The default is
        the first index (0).
    end : int, optional
        The index at which to stop leveling.  The default is the last
        index (-1).  Negative values wrap to values.size + 1 + end.
    resolution : int or float, optional
        The resolution of the indices.  If not 1, the start and end index
        are modified to start // resolution, and ceil(end / resolution).

    Returns
    -------
    average : float
        The average value subtracted from all values (0.0 if no valid
        values were found in the range).
    """
    if end < 0:
        end += values.size + 1
    # Scale the index range by the supplied resolution.
    i0 = int(start // resolution)
    i1 = min(roundup_ratio(end, resolution), values.size)

    total = 0.0
    count = 0
    for index in range(i0, i1):
        datum = values[index]
        if np.isnan(datum):
            continue
        total += datum
        count += 1

    if count == 0:
        return 0.0

    average = total / count
    for index in range(i0, i1):
        values[index] -= average
    return average
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def smooth_1d(values, kernel):  # pragma: no cover
    """
    Apply 1-D kernel convolution in place.

    The smoothed value for point i is the convolution of the kernel with
    the values, with the kernel centered over point i.  The kernel center
    is defined as kernel.size // 2, so an odd-sized kernel is advisable to
    avoid an offset in the output.  At each point, the weighted sum is
    normalized by the sum of the absolute kernel weights that contributed.

    Any NaN values are ignored, resulting in zero-contribution to the
    smoothed values, so no NaNs remain after the smooth operation.

    Parameters
    ----------
    values : numpy.ndarray of float
        The array to convolve.  Will be updated in-place.
    kernel : numpy.ndarray of float
        The kernel by which to convolve.

    Returns
    -------
    None
    """
    n = values.size
    n_kernel = kernel.size
    half = n_kernel // 2
    result = np.empty(n, dtype=nb.float64)

    for i in range(n):
        weighted_sum = 0.0
        weight_sum = 0.0
        for k in range(n_kernel):
            # Value index for kernel element k when centered on i.
            j = i - half + k
            if j < 0 or j >= n:
                continue
            value = values[j]
            if np.isnan(value):
                continue
            weight = kernel[k]
            weighted_sum += value * weight
            weight_sum += np.abs(weight)
        if weight_sum > 0:
            result[i] = weighted_sum / weight_sum
        else:
            result[i] = 0.0

    # Copy the smoothed buffer back into the input array.
    values[:] = result
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def gaussian_kernel(n, sigma):  # pragma: no cover
    """
    Create a Gaussian convolution kernel.

    The Gaussian is centered on the `n // 2` element with a maximum value
    of 1 when n is odd.  The resultant kernel is of size n, created as:

        g_i = exp(-(i - n//2)^2 / (2 * sigma^2))

    for i in [0, n).

    Parameters
    ----------
    n : int
        Size of the kernel.
    sigma : float
        Gaussian standard deviation.

    Returns
    -------
    kernel : numpy.ndarray (float)
        The Gaussian kernel.
    """
    kernel = np.empty(n, dtype=nb.float64)
    center = n // 2
    scale = -0.5 / (sigma * sigma)
    for i in range(n):
        dx = i - center
        kernel[i] = math.exp(scale * dx * dx)
    return kernel
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def mean(values, weights=None, start=0, end=-1):  # pragma: no cover
    """
    Return the (optionally weighted) mean value of an array.

    NaN and zero-weighted values are not included in the calculation.  All
    arrays must be one-dimensional.

    Parameters
    ----------
    values : numpy.ndarray (float)
        An array of values with shape (n,).
    weights : numpy.ndarray (float), optional
        An array of weights with shape (n,).
    start : int, optional
        The inclusive start index of the values included in the mean
        calculation.  The default is the first value (0).
    end : int, optional
        The non-inclusive end index for the values included in the mean
        calculation.  Negative values are wrapped to values.size + 1 + end.
        The default is the last value (-1).

    Returns
    -------
    mean, weight : float, float
        The mean value and weight sum.  Both are zero when no valid values
        were present.
    """
    if end < 0:
        end += values.size + 1
    have_weights = weights is not None

    weighted_sum = 0.0
    weight_sum = 0.0
    for index in range(start, end):
        datum = values[index]
        if np.isnan(datum):
            continue
        if have_weights:
            weight = weights[index]
            if weight == 0:
                continue
        else:
            weight = 1.0
        weighted_sum += weight * datum
        weight_sum += weight

    if weight_sum == 0:
        return 0.0, 0.0
    return weighted_sum / weight_sum, weight_sum
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def box_smooth_along_zero_axis(values, bin_size, valid=None,
                               fill_value=np.nan):  # pragma: no cover
    """
    Smooth a 2-D array of values along the zeroth axis.

    Results will only be generated for those indices where the bin range is
    fully inside the array range.  All other values will be set to the
    `fill_value` (NaN by default).  A running (sliding window) sum is
    maintained so each output point costs O(dimensions) rather than
    O(dimensions * bin_size).

    Parameters
    ----------
    values : numpy.ndarray (float)
        The values to smooth as an array of shape (dimensions, N).
        Smoothing occurs along each dimension (axis=1).
    bin_size : int
        The smoothing bin size.
    valid : numpy.ndarray (int)
        A boolean masked array where `True` indicates a value that may be
        used.
    fill_value : float, optional
        The value with which to fill smoothed values that do not have
        enough data to calculate.

    Returns
    -------
    smoothed : numpy.ndarray (float)
        The smoothed values of shape (dimensions, N).
    """
    if bin_size <= 1:
        # Nothing to smooth over: return a float copy of the input.
        return values.astype(nb.float64)
    dimensions, size = values.shape
    result = np.full(values.shape, fill_value, dtype=nb.float64)
    # Running per-dimension window sums.
    sums = np.zeros(dimensions, dtype=nb.float64)
    # The window for output index i spans [i - bin_left, i + bin_right]
    # inclusive, containing bin_size samples.  For even bin sizes the
    # window extends one extra sample to the right of center.
    bin_left = bin_size // 2
    bin_right = bin_size - bin_left
    if (bin_size % 2) == 0:
        bin_left -= 1
    else:
        bin_right -= 1
    max_i = size - bin_right
    if valid is None:
        valid = np.full(size, True)
    # Looping over values at start of the array (not included in result):
    # pre-load the window with everything except its rightmost sample,
    # which is added on the first iteration of the main loop below.
    n_valid = 0
    for i in range(bin_size - 1):
        if valid[i]:
            n_valid += 1
            for dimension in range(dimensions):
                sums[dimension] += values[dimension, i]
    # Start populating results
    for i in range(bin_left, max_i):
        # Add the new rightmost sample entering the window.
        right_i = i + bin_right
        if valid[right_i]:
            n_valid += 1
            for dimension in range(dimensions):
                sums[dimension] += values[dimension, right_i]
        if n_valid > 0:
            for dimension in range(dimensions):
                result[dimension, i] = sums[dimension] / n_valid
        # Remove the leftmost sample before sliding the window right.
        left_i = i - bin_left
        if valid[left_i]:
            n_valid -= 1
            for dimension in range(dimensions):
                sums[dimension] -= values[dimension, left_i]
    return result
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def log2round(x):  # pragma: no cover
    """
    Return the rounded value of log_2(x).

    Parameters
    ----------
    x : int or float

    Returns
    -------
    int
    """
    log2_x = np.log2(x)
    return int(round_value(log2_x))
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def log2ceil(x):  # pragma: no cover
    """
    Return the ceiled value of log_2(x).

    Parameters
    ----------
    x : int or float

    Returns
    -------
    int
    """
    log2_x = np.log2(x)
    return int(np.ceil(log2_x))
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def pow2round(x):  # pragma: no cover
    """
    Return 2 to the power of round(log_2(x)).

    Rounds a number to the nearest (log_2 scale) power of 2.

    Parameters
    ----------
    x : int or float

    Returns
    -------
    int
    """
    exponent = log2round(x)
    return 1 << exponent
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def pow2floor(x):  # pragma: no cover
    """
    Return 2 to the power of floor(log_2(x)).

    Finds n such that 2^n <= x < 2^(n+1) and returns 2^n.  `x` must be
    positive.

    Parameters
    ----------
    x : int or float

    Returns
    -------
    int
    """
    # Use np.floor rather than bare int() truncation: int() truncates
    # toward zero, which gives the wrong exponent for 0 < x < 1 where
    # log2(x) is negative (e.g. x=0.3 previously yielded 0.5 rather than
    # 0.25).  For x >= 1 the two are identical, so callers in the
    # intended domain are unaffected.
    return 2 ** int(np.floor(np.log2(x)))
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def pow2ceil(x):  # pragma: no cover
    """
    Return 2 to the power of ceil(log_2(x)).

    Finds where 2^(n - 1) <= x < 2^n and returns 2^n.  If `x` is already
    an exact power of 2, it is returned unchanged.

    Parameters
    ----------
    x : int or float

    Returns
    -------
    int
    """
    floor_pow2 = pow2floor(x)
    if floor_pow2 == x:
        return floor_pow2
    return floor_pow2 * 2
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def regular_kernel_convolve(data, kernel, kernel_reference_index=None,
                            weight=None, valid=None):  # pragma: no cover
    """
    Direct convolution of ND data with a given kernel.

    Fast Numba implementation of direct kernel convolution for arbitrary
    dimensions.  The actual convolution is performed on a point-by-point
    basis with :func:`point_aligned_smooth`, while the majority of this
    function relates to reducing N-dimensions to a single dimension.  Note
    that this is not suitable for 1-dimensional convolution.

    Parameters
    ----------
    data : numpy.ndarray (float)
        The data to convolve.
    kernel : numpy.ndarray (float)
        The kernel to convolve the data with.  Should have the same
        dimensionality as `data`.
    kernel_reference_index : numpy.ndarray (int), optional
        The index marking the center of the kernel.  If not provided,
        defaults to ceil((kernel.shape - 1) / 2).
    weight : numpy.ndarray (float), optional
        An optional weighting array.
    valid : numpy.ndarray (bool), optional
        An optional validating array where `False` excludes a datum from
        inclusion in processing.  Effectively the same as setting
        weight[i] = 0 for index i.

    Returns
    -------
    convolved, smoothed_weights : numpy.ndarray (float), numpy.ndarray (float)
    """
    weighted = weight is not None
    validated = valid is not None
    if weighted:
        flat_weight = weight.ravel()
    else:  # For numba compilation
        flat_weight = np.empty(0, dtype=nb.float64)
    if validated:
        flat_valid = valid.ravel()
    else:  # For numba compilation
        flat_valid = np.empty(0, dtype=nb.b1)
    kernel_shape = np.atleast_1d(np.asarray(kernel.shape))
    if kernel_reference_index is None:
        # Default center: ceil((shape - 1) / 2) per dimension.  The
        # boolean `offset` bumps the truncated index up by one wherever
        # (shape - 1) is odd.
        reference_index = (kernel_shape - 1) / 2.0
        offset = (reference_index % 1) != 0
        reference_index = reference_index.astype(nb.int64) + offset
    else:
        reference_index = kernel_reference_index.astype(nb.int64)
    result = np.empty(data.shape, dtype=nb.float64)
    result_weight = np.empty(data.shape, dtype=nb.float64)
    # Flat views share memory with the N-D outputs above.
    flat_result = result.ravel()
    flat_result_weight = result_weight.ravel()
    data_shape = np.asarray(data.shape)
    # Mappings between flat and N-dimensional indices for kernel and data.
    kernel_indices, _, kernel_steps = spline_utils.flat_index_mapping(
        kernel_shape)
    data_indices, _, data_steps = spline_utils.flat_index_mapping(data_shape)
    n_data = data.size
    flat_kernel = kernel.ravel()
    flat_data = data.ravel()
    # Convolve point-by-point over the flattened data.
    for i in range(n_data):
        flat_result[i], flat_result_weight[i] = point_aligned_smooth(
            flat_data=flat_data,
            flat_kernel=flat_kernel,
            flat_weight=flat_weight,
            flat_valid=flat_valid,
            data_index=data_indices[:, i],
            kernel_indices=kernel_indices,
            kernel_reference_index=reference_index,
            data_shape=data_shape,
            data_steps=data_steps,
            validated=validated,
            weighted=weighted)
    return result, result_weight
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def regular_coarse_kernel_convolve(data, kernel, steps,
                                   kernel_reference_index=None,
                                   weight=None,
                                   valid=None):  # pragma: no cover
    """
    Direct convolution of ND data with a given kernel onto a coarse grid.

    This function is similar to :func:`regular_kernel_convolve` in that
    it results in an output that is the convolution of regular data with
    a regular grid, except that the output dimensions are altered.  The
    size of the output grid is::

        shape_out = ceil(data.shape / steps)

    and is generally intended to speed up processing time by reducing the
    number of points at which a solution is required.  Later, if necessary
    and mathematically sound, the solution can be interpolated back onto a
    fine grid.  Only downsampling is supported using integer step
    values > 1.

    Parameters
    ----------
    data : numpy.ndarray (float)
        The data to convolve.
    kernel : numpy.ndarray (float)
        The kernel to convolve the data with.  Should have the same
        dimensionality as `data`.
    steps : numpy.ndarray (int)
        The steps for the coarse convolution.
    kernel_reference_index : numpy.ndarray (int), optional
        The index marking the center of the kernel.  If not provided,
        defaults to ceil((kernel.shape - 1) / 2).
    weight : numpy.ndarray (float), optional
        An optional weighting array.
    valid : numpy.ndarray (bool), optional
        An optional validating array where `False` excludes a datum from
        inclusion in processing.  Effectively the same as setting
        weight[i] = 0 for index i.

    Returns
    -------
    convolved, weights, shape : array (float), array (float), array (int)
        The convolved result and weights on a coarse grid as 1-D arrays.
        The arrays may be reshaped using `shape` which contains the shape
        of the coarse grids (numba can't do arbitrary array shapes in N-D
        with Numba).
    """
    weighted = weight is not None
    validated = valid is not None
    data_shape = np.asarray(data.shape)
    n_dimensions = kernel.ndim
    # Output (coarse) grid shape: ceil(data.shape / steps) per dimension.
    ratio = np.empty(n_dimensions, dtype=nb.int64)
    for dimension in range(n_dimensions):
        ratio[dimension] = roundup_ratio(data_shape[dimension],
                                         steps[dimension])
    n_course = int(np.prod(ratio))
    course_signal = np.empty(n_course, dtype=nb.float64)
    course_weight = np.empty(n_course, dtype=nb.float64)
    if kernel_reference_index is None:
        # Default center: ceil((shape - 1) / 2) per dimension.
        reference_index = (np.asarray(kernel.shape) - 1) / 2.0
        offset = (reference_index % 1) != 0
        reference_index = reference_index.astype(nb.int64) + offset
    else:
        reference_index = kernel_reference_index.astype(nb.int64)
    if weighted:
        flat_weight = weight.ravel()
    else:
        # Placeholder for Numba type inference only.
        flat_weight = np.empty(0, dtype=nb.float64)
    if validated:
        flat_valid = valid.ravel()
    else:
        # Placeholder for Numba type inference only.
        flat_valid = np.empty(0, dtype=nb.b1)
    # Mappings between flat and N-D indices for the kernel, data, and
    # coarse output grids.
    kernel_indices, _, kernel_steps = spline_utils.flat_index_mapping(
        np.asarray(kernel.shape))
    data_indices, _, data_steps = spline_utils.flat_index_mapping(data_shape)
    course_indices, _, course_steps = spline_utils.flat_index_mapping(ratio)
    n_dimensions, n_kernel = kernel_indices.shape
    flat_kernel = kernel.ravel()
    flat_data = data.ravel()
    flat_result = course_signal.ravel()
    flat_result_weight = course_weight.ravel()
    # Kernel indices relative to the kernel reference (center) index.
    offset_indices = kernel_indices.copy()
    for dimension in range(n_dimensions):
        offset_indices[dimension] -= reference_index[dimension]
    for i in range(n_course):
        # Convert the flat coarse index to the flat data index of the
        # corresponding (stepped) position on the fine grid.
        flat_data_index = 0
        for dimension in range(n_dimensions):
            index = course_indices[dimension, i] * steps[dimension]
            flat_data_index += data_steps[dimension] * index
        data_index = data_indices[:, flat_data_index]
        w_sum = 0.0
        wd_sum = 0.0
        for j in range(n_kernel):
            k = flat_kernel[j]
            if k == 0:
                continue
            ref_data_index = 0
            offset_index = offset_indices[:, j]
            for dimension in range(n_dimensions):
                oi = offset_index[dimension] + data_index[dimension]
                if oi < 0:
                    break
                elif oi >= data_shape[dimension]:
                    break
                ref_data_index += oi * data_steps[dimension]
            else:
                # No break occurred: the kernel point lands inside the
                # data bounds, so accumulate its weighted contribution.
                if validated and not flat_valid[ref_data_index]:
                    continue
                if weighted:
                    w = flat_weight[ref_data_index]
                    if w == 0:
                        continue
                else:
                    w = 1.0
                kw = k * w
                wd_sum += kw * flat_data[ref_data_index]
                w_sum += kw
        if w_sum > 0:
            wd_sum /= w_sum
        else:
            w_sum = 0.0
            wd_sum = 0.0
        flat_result_weight[i] = w_sum
        flat_result[i] = wd_sum
    return course_signal, course_weight, ratio
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def smooth_values_at(data, kernel, indices, kernel_reference_index, knots,
                     coefficients, degrees, panel_mapping, panel_steps,
                     knot_steps, nk1, spline_mapping, weight=None, valid=None
                     ):  # pragma: no cover
    """
    Return the values of data, smoothed by a kernel at the given indices.

    A "smooth value" is the convolution of the data with a kernel centered
    over a specific point.  This function is a wrapper around
    :func:`smooth_value_at` that processes multiple points in a single
    pass.  It is a moderately low-level function requiring a previously
    defined spline representation of the kernel (see
    :class:`sofia_redux.toolkit.splines.spline.Spline` for details).

    If the indices align perfectly with existing data indices (i.e., the
    difference between `indices` and `kernel_reference_index` is exactly
    integer-valued), no spline representation is needed as direct
    convolution with the kernel is possible.

    Notes
    -----
    Spline parameters have their dimensionality expressed in
    (x, y, z, ...) order, while all other parameters (such as `indices`)
    use native Numpy ordering (..., z, y, x).  If the spline parameters
    were calculated via the spline class and other native `sofia_scan`
    functions, no conversion is necessary.

    Parameters
    ----------
    data : numpy.ndarray (float)
        The data from which to calculate smoothed values.  An arbitrarily
        shaped array of n_dimensions.
    kernel : numpy.ndarray (float)
        The kernel to smooth with, matching the dimensionality of `data`.
    indices : numpy.ndarray (int)
        The indices on `data` for which to calculate smoothed values, of
        shape (n_dimensions, n) in Numpy (y, x) dimensional ordering.
    kernel_reference_index : numpy.ndarray (int)
        The index specifying the center of the kernel, of shape
        (n_dimensions,) in Numpy (y, x) ordering.
    knots : numpy.ndarray (float)
        The spline knots for `kernel`, of shape (n_dimensions, max_knots)
        where max_knots is the maximum possible number of knots over all
        dimensions.  Dimensions ordered as (x, y).
    coefficients : numpy.ndarray (float)
        The spline coefficients of shape (n_coefficients,).
    degrees : numpy.ndarray (int)
        The spline degrees per dimension, of shape (n_dimensions,) in
        (x, y) order.
    panel_mapping : numpy.ndarray (int)
        The panel mapping translation for the spline fit, of shape
        (n_dimensions, n_panels) in (x, y) order.
    panel_steps : numpy.ndarray (int)
        The panel steps translation for the spline fit, of shape
        (n_dimensions,) in (x, y) order.
    knot_steps : numpy.ndarray (int)
        The knot steps translation for the spline fit, of shape
        (n_dimensions,) in (x, y) order.
    nk1 : numpy.ndarray (int)
        Spline mapping parameter equal to n_knots - degrees - 1, of shape
        (n_dimensions,) in (x, y) order.
    spline_mapping : numpy.ndarray (int)
        Spline mapping translation for the knots, of shape
        (n_dimensions, max_knots) in (x, y) order.
    weight : numpy.ndarray (float), optional
        Optional `data` weights with the same shape as `data`.
    valid : numpy.ndarray (bool), optional
        Optional mask where `True` marks a usable `data` value; same shape
        as `data`.

    Returns
    -------
    smooth_values, smooth_weights : numpy.ndarray, numpy.ndarray
        The smoothed values and weights determined by convolution of
        `data` with `kernel`, both of shape (n,) matching the number of
        supplied indices.
    """
    data_shape = np.asarray(data.shape)
    kernel_shape = np.asarray(kernel.shape)
    # Flat <-> N-D index translations for the kernel and data grids.
    kernel_indices, _, kernel_steps = spline_utils.flat_index_mapping(
        kernel_shape)
    data_indices, _, data_steps = spline_utils.flat_index_mapping(data_shape)

    # Empty placeholder arrays keep Numba's type inference happy when no
    # weight/valid arrays were supplied.
    weighted = weight is not None
    if weighted:
        flat_weight = weight.ravel()
    else:
        flat_weight = np.empty(0, dtype=nb.float64)
    validated = valid is not None
    if validated:
        flat_valid = valid.ravel()
    else:
        flat_valid = np.empty(0, dtype=nb.b1)

    n_points = indices.shape[1]
    smooth_values = np.empty(n_points, dtype=nb.float64)
    smooth_weights = np.empty(n_points, dtype=nb.float64)
    for point in range(n_points):
        smooth_values[point], smooth_weights[point] = smooth_value_at(
            data=data,
            kernel=kernel,
            index=indices[:, point],
            kernel_indices=kernel_indices,
            kernel_reference_index=kernel_reference_index,
            knots=knots,
            coefficients=coefficients,
            degrees=degrees,
            panel_mapping=panel_mapping,
            panel_steps=panel_steps,
            knot_steps=knot_steps,
            nk1=nk1,
            spline_mapping=spline_mapping,
            data_shape=data_shape,
            kernel_shape=kernel_shape,
            data_steps=data_steps,
            flat_weight=flat_weight,
            flat_valid=flat_valid,
            weighted=weighted,
            validated=validated)
    return smooth_values, smooth_weights
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def smooth_value_at(data, kernel, index, kernel_indices,
                    kernel_reference_index, knots, coefficients,
                    degrees, panel_mapping, panel_steps,
                    knot_steps, nk1, spline_mapping, data_shape,
                    kernel_shape, data_steps,
                    flat_weight, flat_valid, weighted, validated
                    ):  # pragma: no cover
    """
    Return the convolution of data with a kernel at a single point.

    This low-level function basically sorts a convolution of `data` with
    `kernel` at a single point (`index`) into processing by one of two
    algorithms.  If the index aligns perfectly with a point on the kernel,
    the direct convolution is calculated via :func:`point_aligned_smooth`.
    Otherwise, a slightly shifted version of the kernel is generated using
    spline interpolation so that the shifted kernel indices align with
    those of the data array, and direct convolution is possible.  In this
    case, convolution is performed using :func:`point_smooth`.

    Parameters
    ----------
    data : numpy.ndarray (float)
        The data from which to calculate smoothed values.  Must be an
        array of arbitrary shape, but have n_dimensions.
    kernel : numpy.ndarray (float)
        The kernel to smooth with.  Must be an array of arbitrary shape,
        but match the same number of dimensions as `data`.
    index : numpy.ndarray (int or float)
        The index on `data` at which to calculate the smooth value.  This
        should be an array of shape (n_dimensions,) using numpy (y, x)
        ordering.
    kernel_indices : numpy.ndarray (int)
        An array of shape (n_dimensions, kernel.size) as returned by
        :func:`spline_utils.flat_index_mapping`.  This gives the
        N-dimensional kernel index for a flat kernel.  I.e.,
        kernel.ravel()[i] is the same as kernel[kernel_indices[i]].  Note
        that dimensions are ordered using the Numpy (y, x) convention.
    kernel_reference_index : numpy.ndarray (int)
        The kernel reference index specifying the center of the kernel.
        Must be of shape (n_dimensions,) and use numpy dimensional
        ordering (y, x).
    knots : numpy.ndarray (float)
        The knots as calculated by the :class:`Spline` object on `kernel`.
        These should be of shape (n_dimensions, max_knots) where max_knots
        is the maximum possible number of knots for a spline
        representation of `kernel` over all dimensions.  Dimensions should
        be ordered as (x, y).
    coefficients : numpy.ndarray (float)
        The spline coefficients of shape (n_coefficients,).
    degrees : numpy.ndarray (int)
        The spline degrees for each dimension of shape (n_dimensions,).
        Dimensions should be ordered as (x, y).
    panel_mapping : numpy.ndarray (int)
        The panel mapping translation for the spline fit.  Should be of
        shape (n_dimensions, n_panels).  Dimensions should be ordered as
        (x, y).
    panel_steps : numpy.ndarray (int)
        The panel steps translation for the spline fit.  Should be of
        shape (n_dimensions,).  Dimensions should be ordered as (x, y).
    knot_steps : numpy.ndarray (int)
        The spline knot steps translation for the spline fit.  Should be
        of shape (n_dimensions,).  Dimensions should be ordered as (x, y).
    nk1 : numpy.ndarray (int)
        Another spline mapping parameter which is equal to
        n_knots - degrees - 1 for the spline.  Should be of shape
        (n_dimensions,).  Dimensions should be ordered as (x, y).
    spline_mapping : numpy.ndarray (int)
        A spline mapping translation for the spline knots.  Should be of
        shape (n_dimensions, max_knots).  Dimensions should be ordered as
        (x, y).
    data_shape : numpy.ndarray (int)
        The shape of `data` as an array of shape (n_dimensions,) in
        Numpy (y, x) order.
    kernel_shape : numpy.ndarray (int)
        The shape of `kernel` as an array of shape (n_dimensions,) in
        Numpy (y, x) order.
    data_steps : numpy.ndarray (int)
        An array of shape (n_dimensions,) in Numpy (y, x) order giving the
        number of elements one would need to jump on a flattened `data`
        for a single increment along a given dimension on the ND-array.
        Please see :func:`spline_utils.flat_index_mapping` for further
        details.
    flat_weight : numpy.ndarray (float)
        The associated weight values for data, flattened to a single
        dimension array of shape (data.size,).  I.e.,
        `flat_weight` = weight.ravel().  If no weighting is required, this
        should be an empty array of shape (0,).
    flat_valid : numpy.ndarray (bool)
        An array marking data values as valid (`True`) or invalid
        (`False`).  Any invalid data will not be included in the
        calculation.  This should be a flattened singular dimension array
        taken from one that was originally the same shape as `data` and be
        of shape (data.size,).  I.e., `flat_valid` = valid.ravel().  If no
        validity checking is required (assumes all data points are valid),
        set this to an empty array of shape (0,).
    weighted : bool
        If `True`, indicates that weighting is required and `flat_weight`
        should be of shape (data.size,).  Otherwise, no weighting is
        required, and `flat_weight` may be of shape (0,).
    validated : bool
        If `True`, indicates that validity checking is required and
        `flat_valid` should be of shape (data.size,).  Otherwise,
        `flat_valid` may be of shape (0,).

    Returns
    -------
    smooth_value, smooth_weight : float, float
        The derived smooth value and associated weight by convolving
        `data` with `kernel` at `index`.
    """
    n_dimensions = index.size
    # Offset of the requested index from the kernel center, per dimension.
    index_difference = index - kernel_reference_index
    for dimension in range(n_dimensions):
        if (index_difference[dimension]
                != np.floor(index_difference[dimension])):
            # Must shift kernel using spline interpolation
            return point_smooth(
                flat_data=data.ravel(),
                index=index,
                kernel_indices=kernel_indices,
                kernel_reference_index=kernel_reference_index,
                knots=knots,
                coefficients=coefficients,
                degrees=degrees,
                panel_mapping=panel_mapping,
                panel_steps=panel_steps,
                knot_steps=knot_steps,
                nk1=nk1,
                spline_mapping=spline_mapping,
                data_shape=data_shape,
                kernel_shape=kernel_shape,
                data_steps=data_steps,
                weighted=weighted,
                validated=validated,
                flat_weight=flat_weight,
                flat_valid=flat_valid)
    else:
        # Loop completed with every offset integer-valued: the kernel
        # aligns with the data grid, so perform direct convolution.
        return point_aligned_smooth(
            flat_data=data.ravel(),
            flat_kernel=kernel.ravel(),
            flat_weight=flat_weight,
            flat_valid=flat_valid,
            data_index=np.asarray(index, dtype=nb.int64),
            kernel_indices=kernel_indices,
            kernel_reference_index=kernel_reference_index,
            data_shape=data_shape,
            data_steps=data_steps,
            validated=validated,
            weighted=weighted)
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def point_aligned_smooth(flat_data, flat_kernel, flat_weight, flat_valid,
                         data_index, kernel_indices, kernel_reference_index,
                         data_shape, data_steps, validated, weighted
                         ):  # pragma: no cover
    """
    Perform direct kernel convolution at a single point.

    This low-level function is designed to return the convolved result and
    associated weight of data with a kernel at a given point, where all
    kernel points perfectly align with points in the data array.

    Parameters
    ----------
    flat_data : numpy.ndarray (float)
        A flattened version of N-dimensional data (data.ravel()) of shape
        (n,).
    flat_kernel : numpy.ndarray (float)
        A flattened version of an N-dimensions kernel (kernel.ravel()) of
        shape (m,).
    flat_weight : numpy.ndarray (float)
        A flattened version of N-dimensional weights (weight.ravel()).  If
        `weighted` is `True`, must be of shape (n,).
    flat_valid : numpy.ndarray (bool)
        A flattened version of an N-dimensional validity array
        (valid.ravel()).  If `validated` is `True`, must be of shape (n,).
    data_index : numpy.ndarray (int)
        The N-D index on the original data array at which to determine the
        smoothed value.  Must be of shape (n_dimensions,) and be in Numpy
        (y, x) order.
    kernel_indices : numpy.ndarray (int)
        An array of shape (n_dimensions, kernel.size) as returned by
        :func:`spline_utils.flat_index_mapping`.  This gives the
        N-dimensional kernel index for a flat kernel.  I.e.,
        kernel.ravel()[i] is the same as kernel[kernel_indices[i]].  Note
        that dimensions are ordered using the Numpy (y, x) convention.
    kernel_reference_index : numpy.ndarray (int)
        The kernel reference index specifying the center of the kernel.
        Must be of shape (n_dimensions,) and use numpy dimensional
        ordering (y, x).
    data_shape : numpy.ndarray (int)
        The shape of `data` as an array of shape (n_dimensions,) in
        Numpy (y, x) order.
    data_steps : numpy.ndarray (int)
        An array of shape (n_dimensions,) in Numpy (y, x) order giving the
        number of elements one would need to jump on a flattened `data`
        for a single increment along a given dimension on the ND-array.
        Please see :func:`spline_utils.flat_index_mapping` for further
        details.
    weighted : bool
        If `True`, indicates that weighting is required and `flat_weight`
        should be of shape (data.size,).  Otherwise, no weighting is
        required, and `flat_weight` may be of shape (0,).
    validated : bool
        If `True`, indicates that validity checking is required and
        `flat_valid` should be of shape (data.size,).  Otherwise,
        `flat_valid` may be of shape (0,).

    Returns
    -------
    smooth_value, smooth_weight : float, float
        The derived smooth value and associated weight by convolving data
        with kernel at `data_index`.
    """
    w_sum = 0.0
    wd_sum = 0.0
    n_dimensions = data_steps.size
    for j in range(flat_kernel.size):
        k = flat_kernel[j]
        if k == 0:
            # Zero kernel weight cannot contribute.
            continue
        ref_data_index = 0
        # Convert the kernel point (centered on data_index) to a flat
        # data index, breaking out if any dimension falls off the grid.
        for dimension in range(n_dimensions):
            oi = (kernel_indices[dimension, j]
                  - kernel_reference_index[dimension]
                  + data_index[dimension])
            if oi < 0:
                break
            elif oi >= data_shape[dimension]:
                break
            ref_data_index += int(oi) * data_steps[dimension]
        else:
            # No break occurred: the point is inside the data bounds, so
            # accumulate its weighted contribution.
            if validated and not flat_valid[ref_data_index]:
                continue
            if weighted:
                w = flat_weight[ref_data_index]
                if w == 0:
                    continue
            else:
                w = 1.0
            kw = k * w
            w_sum += kw
            wd_sum += kw * flat_data[ref_data_index]
    # Normalize by the summed kernel weights; zero both outputs when no
    # valid data contributed.
    if w_sum > 0:
        wd_sum /= w_sum
    else:
        w_sum = 0.0
        wd_sum = 0.0
    return wd_sum, w_sum
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def point_smooth(flat_data, index, kernel_indices, kernel_reference_index,
                 knots, coefficients, degrees, panel_mapping, panel_steps,
                 knot_steps, nk1, spline_mapping, data_shape, kernel_shape,
                 data_steps, weighted, validated, flat_weight, flat_valid
                 ):  # pragma: no cover
    """
    Convolution with a spline representation of a kernel at a single point.

    This is a more complex interpretation of :func:`point_aligned_smooth`
    intended for use when convolution occurs at a point on the kernel that does
    not have an exact value. I.e. we need to calculate the intermediate kernel
    value between two definite kernel values. For these purposes, spline
    interpolation is used to derive offset kernel values to be used during
    convolution. There are many different spline parameters which will need
    to have been previously defined. However, these can all be calculated
    easily by creating a :class:`sofia_redux.toolkit.splines.spline.Spline`
    representation of the kernel and parsing the various attributes into this
    function.

    Parameters
    ----------
    flat_data : numpy.ndarray (float)
        A flattened version of N-dimensional data (data.ravel()) of
        shape (n,).
    index : numpy.ndarray (int)
        The N-D index on the original data array at which to determine the
        smoothed value. Must be of shape (n_dimensions,) and by in Numpy
        (y, x) order.
    kernel_indices : numpy.ndarray (int)
        An array of shape (n_dimensions, kernel.size) as returned by
        :func:`spline_utils.flat_index_mapping`. This gives the N-dimensional
        kernel index for a flat kernel. I.e., kernel.ravel()[i] is the same
        as kernel[kernel_indices[i]]. Note that dimensions are ordered using
        the Numpy (y, x) convention.
    kernel_reference_index : numpy.ndarray (int)
        The kernel reference index specifying the center of the kernel.
        Must be of shape (n_dimensions,) and use numpy dimensional
        ordering (y, x).
    knots : numpy.ndarray (float)
        The knots as calculated by the :class:`Spline` object on `kernel`.
        These should be of shape (n_dimensions, max_knots) where max_knots is
        the maximum possible number of knots for a spline representation of
        `kernel` over all dimensions. Dimensions should be ordered as (x, y).
    coefficients : numpy.ndarray (float)
        The spline coefficients of shape (n_coefficients,).
    degrees : numpy.ndarray (int)
        The spline degrees for each dimension of shape (n_dimensions,).
        Dimensions should be ordered as (x, y).
    panel_mapping : numpy.ndarray (int)
        The panel mapping translation for the spline fit. Should be of shape
        (n_dimensions, n_panels). Dimensions should be ordered as (x, y).
    panel_steps : numpy.ndarray (int)
        The panel steps translation for the spline fit. Should be of shape
        (n_dimensions,). Dimensions should be ordered as (x, y).
    knot_steps : numpy.ndarray (int)
        The spline knot steps translation for the spline fit. Should be of
        shape (n_dimensions,). Dimensions should be ordered as (x, y).
    nk1 : numpy.ndarray (int)
        Another spline mapping parameter which is equal to
        n_knots - degrees - 1 for the spline. Should be of shape
        (n_dimensions,). Dimensions should be ordered as (x, y).
    spline_mapping : numpy.ndarray (int)
        A spline mapping translation for the spline knots. Should be of shape
        (n_dimensions, max_knots). Dimensions should be ordered as (x, y).
    data_shape : numpy.ndarray (int)
        The shape of `data` as an array of shape (n_dimensions,) in
        Numpy (y, x) order.
    kernel_shape : numpy.ndarray (int)
        The shape of `kernel` as an array of shape (n_dimensions,) in Numpy
        (y, x) order.
    data_steps : numpy.ndarray (int)
        An array of shape (n_dimensions,) in Numpy (y, x) order giving the
        number of elements one would need to jump on a flattened `data` for
        a single increment along a given dimension on the ND-array. Please
        see :func:`spline_utils.flat_index_mapping` for further details.
    weighted : bool
        If `True`, indicates that weighting is required and `flat_weight`
        should be of shape (data.size,). Otherwise, no weighting is required,
        and `flat_weight` may be of shape (0,).
    validated : bool
        If `True`, indicates that validity checking is required and
        `flat_valid` should be of shape (data.size,). Otherwise, `flat_valid`
        may be of shape (0,).
    flat_weight : numpy.ndarray (float)
        A flattened version of N-dimensional weights (weight.ravel()). If
        `weighted` is `True`, must be of shape (n,).
    flat_valid : numpy.ndarray (bool)
        A flattened version of an N-dimensional validity array (valid.ravel()).
        If `validated` is `True`, must be of shape (n,).

    Returns
    -------
    smooth_value, smooth_weight : float, float
        The derived smooth value and associated weight by convolving data with
        spline representation of the kernel at `index`.
    """
    n_dimensions, n_kernel = kernel_indices.shape
    # Workspace reused by spline_utils.single_fit for each kernel position.
    kernel_coordinate = np.empty(n_dimensions, dtype=nb.float64)
    k1 = degrees + 1
    # Number of spline pieces contributing to a single evaluation.
    n_spline = int(np.prod(k1))
    work_spline = np.empty((n_dimensions, np.max(k1)), dtype=nb.float64)
    # The spline fit is only valid on the kernel extent [0, shape - 1].
    lower_bounds = np.zeros(n_dimensions, dtype=nb.float64)
    upper_bounds = kernel_shape - 1
    # Data coordinate of the kernel origin (kernel reference aligned with
    # `index`).  NOTE(review): np.ceil below suggests this offset may be
    # non-integral in practice — confirm against callers.
    i0 = index - kernel_reference_index
    wd_sum = 0.0
    w_sum = 0.0
    nd1 = n_dimensions - 1
    for j in range(n_kernel):
        flat_data_index = 0
        for dimension in range(n_dimensions):
            coordinate = i0[dimension] + kernel_indices[dimension, j]
            data_index = int(np.ceil(coordinate))
            # Skip kernel positions that fall outside the data array.
            if data_index < 0:
                break
            elif data_index >= data_shape[dimension]:
                break
            # Need to convert numpy (y, x) ordering to
            # spline (x, y) ordering...
            kernel_coordinate[nd1 - dimension] = data_index - i0[dimension]
            flat_data_index += data_steps[dimension] * data_index
        else:
            # Only reached when all dimensions were in bounds (no break).
            if validated and not flat_valid[flat_data_index]:
                continue
            if weighted:
                w = flat_weight[flat_data_index]
                if w == 0:
                    continue
            else:
                w = 1.0
            # Interpolate the kernel value at the (possibly fractional)
            # kernel coordinate using the pre-computed spline.
            kernel_value = spline_utils.single_fit(
                coordinate=kernel_coordinate,
                knots=knots,
                coefficients=coefficients,
                degrees=degrees,
                panel_mapping=panel_mapping,
                panel_steps=panel_steps,
                knot_steps=knot_steps,
                nk1=nk1,
                spline_mapping=spline_mapping,
                k1=k1,
                n_spline=n_spline,
                work_spline=work_spline,
                lower_bounds=lower_bounds,
                upper_bounds=upper_bounds)
            if np.isnan(kernel_value):
                continue
            kw = w * kernel_value
            w_sum += kw
            wd_sum += kw * flat_data[flat_data_index]
    if w_sum == 0:
        return 0.0, 0.0
    return wd_sum / w_sum, w_sum
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def sequential_array_add(array, add_values, at_indices, valid_array=None,
                         valid_indices=None):  # pragma: no cover
    """
    Add one array of values to another.

    Unlike standard adding, values can be added from `add_values` to a single
    index of `array` multiple times.

    Examples
    --------
    >>> x = np.zeros(5)
    >>> y = np.ones(5)
    >>> indices = np.array([0, 0, 3, 3, 3])
    >>> added = sequential_array_add(x, y, indices)
    >>> print(added, x)
    [ True False False True False] [2. 0. 0. 3. 0.]

    Parameters
    ----------
    array : numpy.ndarray (float or int)
        The array to add to of shape (shape1). Addition will be performed
        in-place.
    add_values : numpy.ndarray (float or int)
        The values to add to the array of shape (shape2)
    at_indices : numpy.ndarray (int)
        The indices on `array` for which to add `add_values`. Must be of
        shape (n_dimensions, values.size or add_values.shape).
        One-dimensional arrays may be of shape (values.size,).
    valid_array : numpy.ndarray (bool), optional
        An array of shape (shape1) where `True` indicates that `array` may be
        added to at that index.
    valid_indices : numpy.ndarray (bool), optional
        An array of shape (shape2) where `False` excludes the same element of
        `add_values` from being added to `array`.

    Returns
    -------
    added : numpy.ndarray (bool)
        An array the same shape as `array` where `True` indicates that an
        element has been added to.
    """
    array_shape = np.asarray(array.shape)
    # Only the flat steps (elements to jump per dimension increment) are
    # needed here; the N-D and flat index maps are unused.
    _, _, array_steps = spline_utils.flat_index_mapping(array_shape)
    flat_array = array.ravel()
    added_to = np.full(array.shape, False)
    flat_add = add_values.ravel()
    # Normalize 1-D index arrays to shape (1, n) so dimensions can be
    # iterated uniformly.
    at_indices = np.atleast_2d(at_indices)
    n_dimensions = array.ndim
    flat_added_to = added_to.ravel()
    n_add = int(np.prod(np.array(at_indices.shape[1:])))
    valid = np.full(n_add, True)
    flat_indices = np.zeros(n_add, dtype=nb.int64)
    if valid_array is None:
        do_valid_array = False
        flat_valid_array = np.empty(0, dtype=nb.b1)
    else:
        do_valid_array = True
        flat_valid_array = valid_array.ravel()
    if valid_indices is None:
        do_valid_indices = False
        flat_valid_indices = np.empty(0, dtype=nb.b1)
    else:
        do_valid_indices = True
        flat_valid_indices = valid_indices.ravel()
    # Convert N-D indices to flat indices, invalidating out-of-bounds or
    # user-excluded entries along the way.
    for dimension in range(n_dimensions):
        step = array_steps[dimension]
        index_line = at_indices[dimension].ravel()
        max_index = array_shape[dimension]
        for i in range(n_add):
            if not valid[i]:
                continue
            if do_valid_indices and not flat_valid_indices[i]:
                valid[i] = False
                continue
            index = index_line[i]
            if index < 0 or index >= max_index:
                valid[i] = False
                continue
            flat_indices[i] += step * index
    # Accumulate sequentially: duplicate flat indices each contribute.
    for i in range(n_add):
        if not valid[i]:
            continue
        index = flat_indices[i]
        if do_valid_array and not flat_valid_array[index]:
            continue
        flat_array[index] += flat_add[i]
        flat_added_to[index] = True
    return added_to
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def index_of_max(array, valid=None, sign=1):  # pragma: no cover
    """
    Return the maximum value and index of that value.

    Parameters
    ----------
    array : numpy.ndarray (float)
        The array of data values to check.
    valid : numpy.ndarray (bool), optional
        Optionally exclude elements by setting the corresponding element in
        valid to `False`.
    sign : int or float, optional
        If sign < 0, return the minimum value/index. If sign > 0 (default),
        return the maximum value/index. If sign == 0, return the maximum
        absolute value/index.

    Returns
    -------
    value, index : 2-tuple (float, numpy.ndarray (int))
        The max/min/max-absolute value and corresponding index. If no valid
        finite value exists, (NaN, array of -1) is returned.
    """
    index = -1
    if valid is None:
        check_valid = False
        flat_valid = np.empty(0, dtype=nb.b1)
    else:
        check_valid = True
        flat_valid = valid.ravel()
    # Only the N-D index map is required (to convert the winning flat index
    # back to an N-D index); the flat map and steps are unused.
    flat_indices, _, _ = spline_utils.flat_index_mapping(
        np.asarray(array.shape))
    flat_array = array.ravel()
    positive = False
    absolute = False
    negative = False
    if sign > 0:
        check_value = -np.inf
        positive = True
    elif sign < 0:
        check_value = np.inf
        negative = True
    else:
        # Start strictly below any possible absolute value so an all-zero
        # array still selects its first valid element instead of returning
        # the "not found" sentinel.
        check_value = -1.0
        absolute = True
    for i in range(flat_array.size):
        value = flat_array[i]
        if np.isnan(value):
            continue
        if check_valid and not flat_valid[i]:
            continue
        if positive and value > check_value:
            check_value = value
            index = i
        elif negative and value < check_value:
            check_value = value
            index = i
        elif absolute:
            value = np.abs(value)
            if value > check_value:
                check_value = value
                index = i
    if index == -1:
        # No valid finite element was found.
        return np.nan, np.full(array.ndim, -1)
    else:
        return flat_array[index], flat_indices[:, index]
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def robust_mean(values, tails=None):  # pragma: no cover
    """
    Calculate the robust mean value excluding tails.

    Parameters
    ----------
    values : numpy.ndarray (float)
    tails : float, optional
        Calculate the mean between the sorted values within the tails of the
        distribution.

    Returns
    -------
    mean : float
    """
    if tails is None:
        return np.nanmean(values)
    ordered = np.sort(values)
    n_total = ordered.size
    # NaN values sort to the end of the array; locate the count of
    # non-NaN values by finding the first NaN.
    n_good = n_total
    for i in range(n_total):
        if np.isnan(ordered[i]):
            n_good = i
            break
    # Trim an equal fraction from each end of the good values.
    trim = round_value(tails * n_good)
    lo = trim
    hi = n_good - trim
    if lo >= hi:
        return np.nan
    total = 0.0
    for i in range(lo, hi):
        total += ordered[i]
    return total / (hi - lo)
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def round_value(x):  # pragma: no cover
    """
    A fix to :func:`np.round` so that values are always rounded as expected.

    In special cases, when `int(x)` is even, and its decimal value is exactly
    #.5, :func:`np.round` floors the value instead of performing a ceil.
    This function fixes that.

    Note that NaN/infinite values are rounded as -sys.maxsize - 1.

    Parameters
    ----------
    x : float or int
        The value to round.

    Returns
    -------
    rounded_x : int
    """
    if not np.isfinite(x):
        return _bad_int_value
    truncated = int(x)
    # np.round behaves as desired unless the value is an exact half with an
    # even integer part (banker's rounding would round toward zero there).
    if x % 1 != 0.5 or truncated % 2 != 0:
        return int(np.round(x))
    # Force the exact-half, even-integer case to round away from zero.
    return truncated - 1 if x < 0 else truncated + 1
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def round_array(x):  # pragma: no cover
    """
    Correctly round a given array.

    Please see :func:`round_value` for a description of the problem with
    :func:`np.round`.

    Parameters
    ----------
    x : numpy.ndarray
        The array to round.

    Returns
    -------
    rounded_x : numpy.ndarray (int)
    """
    rounded = np.empty(x.shape, dtype=nb.int64)
    # `rounded` is freshly allocated and contiguous, so ravel() is a view
    # and writing through it fills the output in place.
    out = rounded.ravel()
    for i, value in enumerate(x.flat):
        out[i] = round_value(value)
    return rounded