# Licensed under a 3-clause BSD style license - see LICENSE.rst
import itertools
import math
import warnings
import numba as nb
import numpy as np
from astropy import log
from scipy import interpolate
from scipy.ndimage import shift
from scipy.spatial import Delaunay
from sofia_redux.toolkit.utilities.func import nantrim, stack, to_array_shape
__all__ = ['line_shift', 'interpolate_nans', 'spline', 'sincinterp',
'interp_1d_point', 'interp_1d_point_with_error',
'interp_error_1d', 'interp_error_nd', 'interp_error',
'Interpolate', 'tabinv', 'findidx']
[docs]
def line_shift(y, offset, order=3, missing=np.nan):
    """
    Shift an equally spaced array of data values by an offset.

    Required because no Python standard interpolation algorithms
    allow for NaN values in the input data without slowing down
    the processing by several orders of magnitude.

    Parameters
    ----------
    y : array_like
        Equally spaced input data.
    offset : int or float
        Offset by which to shift the data.  Units are the input data
        spacing.
    order : int, optional
        Spline interpolation order in the range 2-5.  An order of 0
        results in a simple integer sample shift with no interpolation.
    missing : int or float, optional
        numpy.nan values are treated as missing and will be ignored
        during the fit.  In the output array, any missing values
        will be replaced by `missing`.

    Returns
    -------
    numpy.ndarray
        The shifted data.  Will be of numpy.float32 type if `y` was of
        an integer type (so the `missing` fill value can be stored).
    """
    intype = type(y[0])
    if order == 0:
        # Pure sample shift; no spline fitting required
        offset = int(offset)
        result = shift(np.float64(y), offset, order=0, cval=missing)
        # NaN fill values cannot be cast back to an integer type
        if not (np.isnan(result).any() and np.issubdtype(intype, np.integer)):
            result = intype(result)
        return result
    mask = ~np.isnan(y)
    weights = np.float64(mask)
    result = np.empty_like(weights)
    result.fill(missing)
    # Reduce the spline order if there are too few valid samples
    ny = int(weights.sum())
    order = order if order < ny else int(ny - 1)
    if order <= 0:
        return result
    yind = np.arange(len(y))
    tck = interpolate.splrep(yind[mask], y[mask], s=0, k=order)
    xout = yind - offset
    # Only evaluate within the range spanned by valid (non-NaN) samples
    valid = (xout >= min(yind[mask])) & (xout <= max(yind[mask]))
    # `not` rather than the previous `~`: np.any returns a scalar, and
    # bitwise-not of a plain Python bool would invert the logic
    if not np.any(valid):
        return result
    yout = interpolate.splev(xout[valid], tck)
    result[yind[valid]] = yout
    if not np.issubdtype(intype, np.integer):
        return intype(result)
    else:
        # Integer input cannot represent the NaN `missing` fill value
        return np.float32(result)
[docs]
def interpolate_nans(x, y, xout, missing=np.nan, order=3, width=1, tck=False):
    """
    Interpolate values containing NaNs.

    Valid (non-NaN) samples are grouped into contiguous "boxes" that are
    bounded by valid data, and each box is fit with its own spline.
    Output locations outside every box keep the `missing` value.

    Parameters
    ----------
    x : np.array
        Independent variable.
    y : np.array
        Dependent variable.
    xout : np.array
        Output locations.
    missing : int or float, optional
        Value to fill when a value cannot be determined.
    order : int
        Spline order (must be between 1 and 5).
    width : int or float, optional
        Spacing between x values.  Used to determine the minimum order
        supportable by the data surrounding each point.
    tck : bool, optional
        If True, returns an array containing the vector limits of knots,
        B-spline coefficients, and the degree of the spline.

    Returns
    -------
    np.array or tuple
        `y` interpolated onto `xout`, or the (knots, coefficients,
        degree) lists when `tck` is True.
    """
    itype = np.asarray(y).dtype
    if not tck:
        result = np.full((len(xout),), missing, dtype=itype)
    else:
        # knots, coefficients, and spline degrees accumulated per box
        result = [], [], []
    if order < 1 or order > 5:
        log.error("order must be between 1 and 5")
        return result
    mask = ~np.isnan(y)
    nvalid = mask.sum()
    if not np.any(mask):
        return result
    # Reduce the order if there are too few valid points overall
    order = order if order < nvalid else nvalid - 1
    if order <= 0:
        return result
    idxvalid = np.arange(len(y))[mask]
    xvalid = x[idxvalid]
    yvalid = y[idxvalid]
    # Argsort of the valid x values (note: the comprehension variable `x`
    # is local to the comprehension and does not clobber the parameter)
    sortidx = np.array(
        [x for _, x in sorted(zip(xvalid, np.arange(nvalid)))])
    sortx = xvalid[sortidx]
    # Number of valid neighbors within `order * width` below/above each point
    lower = np.array(list(
        map(lambda z: ((xvalid < z) & (xvalid >= z - order * width)).sum(),
            sortx)))
    upper = np.array(list(
        map(lambda z: ((xvalid > z) & (xvalid <= z + order * width)).sum(),
            sortx)))
    # we do not want to fit areas that are not bounded by valid data
    # so define regions that can be fitted successfully using a determined
    # order. single values will just have to be plonked into a cell
    singles = np.where((lower == 0) & (upper == 0))[0]
    for loner in singles:
        lidx = sortidx[loner]
        if not tck:
            # NOTE(review): indexes `result` (length len(xout)) with an
            # index into y -- assumes xout aligns with x; verify callers
            result[idxvalid[lidx]] = yvalid[lidx]
        else:
            result[0].append(xvalid[lidx])
            result[1].append(yvalid[lidx])
    if len(lower) <= 1 or len(upper) <= 1:  # pragma: no cover
        return result
    # Box boundaries: a start has neighbors only above, a stop only below.
    # Consecutive duplicate starts/stops are collapsed to single edges.
    starts = (lower == 0) & (upper != 0)
    starts[:-1][starts[:-1] & starts[1:]] = False
    stops = (lower != 0) & (upper == 0)
    stops[1:][stops[1:] & stops[:-1]] = False
    box_lower = np.where(starts)[0]
    box_upper = np.where(stops)[0]
    for bl in box_lower:
        # Match each start with the nearest following stop
        bu = box_upper[box_upper > bl].min()
        inds = sortidx[bl: bu + 1]
        outidx = (xout >= xvalid[inds].min()) & (xout <= xvalid[inds].max())
        if not np.any(outidx):
            continue
        fx = xvalid[inds]
        fy = yvalid[inds]
        fidx = fx.argsort()
        fx = fx[fidx]
        fy = fy[fidx]
        # Per-box spline order limited by the local neighbor counts
        maxorder = (lower[bl: bu + 1] + upper[bl: bu + 1]).max()
        maxorder = order if maxorder > order else maxorder
        box_tck = interpolate.splrep(fx, fy, k=maxorder, s=0)
        if tck:
            result[0].extend(box_tck[0])
            result[1].extend(box_tck[1])
            result[2].append(box_tck[2])
        else:
            result[outidx] = interpolate.splev(xout[outidx], box_tck)
    return result
[docs]
def spline(x, y, xout, sigma=1.0):
    """
    Perform cubic spline (tensioned) interpolation.

    Replicates the IDL SPLINE function.  There are no standard Python
    libraries that do this.  This function fits the input points
    exactly, allowing flexibility between the points where the
    "tension" of the fit is determined by `sigma`.

    Parameters
    ----------
    x : array_like of float (N,)
        Independent values.  Values MUST be monotonically increasing.
    y : array_like of float (N,)
        Dependent values.
    xout : float or array_like of float (M,)
        New independent values.
    sigma : float, optional
        The amount of "tension" that is applied to the curve.  The
        default value is 1.0.  If sigma is close to 0, (e.g., 0.01),
        then effectively there is a cubic spline fit.  If sigma is
        large, (e.g., greater than 10), then the fit will be like a
        polynomial interpolation.

    Returns
    -------
    yout : float or numpy.ndarray of float (M,)
        `y` interpolated at `xout`.

    Raises
    ------
    ValueError
        If `x`, `y`, or `xout` are not 1-dimensional, if `x` and `y`
        sizes mismatch, or if fewer than 3 points are supplied.

    Notes
    -----
    Author: Walter W. Jones, Naval Research Laboratory, Sept 26, 1976.
    Reviewer: Sidney Prahl, Texas Instruments.
    Adapted for IDL: DMS, March, 1983.
    CT, RSI, July 2003: Added double precision support and DOUBLE keyword,
    use vector math to speed up the loops.
    CT, RSI, August 2003: Must have at least 3 points.
    Adapted for Python: Dan Perera (USRA), April, 2019
    """
    # Clamp the tension to avoid numerical blow-up for tiny sigma
    if sigma < 0.001:
        sigma = 0.001
    x, y = np.array(x).astype(float), np.array(y).astype(float)
    n = x.size
    if x.ndim != 1:
        raise ValueError("x array must have 1 dimension")
    elif y.ndim != 1:
        raise ValueError("y array must have 1 dimension")
    elif n < 3:
        raise ValueError("require at least 3 elements in x and y")
    elif y.size != n:
        raise ValueError("x and y shape mismatch")
    # Scalar input yields scalar output
    isarr = hasattr(xout, '__len__')
    if not isarr:
        xout = [xout]
    xout = np.array(xout).astype(float)
    if xout.ndim != 1:
        raise ValueError("xout must have 1 dimension")
    yp = np.zeros(2 * n)  # storage
    delx1 = x[1] - x[0]
    right = delx1 < 0
    dx1 = (y[1] - y[0]) / delx1
    # Estimate the end-point slopes from 3-point one-sided differences
    delx2 = x[2] - x[1]
    delx12 = x[2] - x[0]
    c1 = -(delx12 + delx1) / delx12 / delx1
    c2 = delx12 / delx1 / delx2
    c3 = -delx1 / delx12 / delx2
    nm1 = n - 1
    slpp1 = c1 * y[0] + c2 * y[1] + c3 * y[2]
    deln = x[nm1] - x[nm1 - 1]
    delnm1 = x[nm1 - 1] - x[nm1 - 2]
    delnn = x[nm1] - x[nm1 - 2]
    c1 = (delnn + deln) / delnn / deln
    c2 = -delnn / deln / delnm1
    c3 = deln / delnn / delnm1
    slppn = c3 * y[nm1 - 2] + c2 * y[nm1 - 1] + c1 * y[nm1]
    # Normalized tension parameter over the full x range
    sigmap = sigma * nm1 / (x[nm1] - x[0])
    dels = sigmap * delx1
    exps = np.exp(dels)
    sinhs = 0.5 * (exps - (1.0 / exps))
    sinhin = 1.0 / (delx1 * sinhs)
    diag1 = sinhin * (dels * 0.5 * (exps + (1.0 / exps)) - sinhs)
    diagin = 1.0 / diag1
    yp[0] = diagin * (dx1 - slpp1)
    spdiag = sinhin * (sinhs - dels)
    # yp[n:] stores the super-diagonal terms for the back-substitution
    yp[n] = diagin * spdiag
    delx2 = np.diff(x)  # x[1:] - x[:-1]
    dx2 = np.diff(y) / delx2  # (y[1:] - y[:-1]) / delx2
    dels = sigmap * delx2
    exps = np.exp(dels)
    sinhs = 0.5 * (exps - (1.0 / exps))
    sinhin = 1.0 / (delx2 * sinhs)
    diag2 = sinhin * (dels * (0.5 * (exps + (1.0 / exps))) - sinhs)
    diag2[1:] += diag2[:-1]
    diag2[0] = 0
    dx2nm1 = dx2[nm1 - 1]  # need to save this to calculate yp[nm1]
    dx2[1:] -= dx2[:-1]
    dx2[0] = 0
    spdiag = sinhin * (sinhs - dels)
    # Need to do an iterative loop for this part (forward elimination of
    # the tridiagonal system; each row depends on the previous one)
    for i in range(1, nm1):
        diagin = 1.0 / (diag2[i] - spdiag[i - 1] * yp[i + n - 1])
        yp[i] = diagin * (dx2[i] - spdiag[i - 1] * yp[i - 1])
        yp[i + n] = diagin * spdiag[i]
    diagin = 1.0 / (diag1 - spdiag[nm1 - 1] * yp[n + nm1 - 1])
    yp[nm1] = diagin * (slppn - dx2nm1 - spdiag[nm1 - 1] * yp[nm1 - 1])
    # Back substitution
    for i in range(n - 2, -1, -1):
        yp[i] -= yp[i + n] * yp[i + 1]
    # find subscript where x[subs] > xout(j) > xx[subs-1]
    subs1 = np.digitize(xout, x[1:nm1], right=right)
    subs = subs1 + 1
    s = x[nm1] - x[0]
    sigmap = sigma * nm1 / s
    del1 = xout - x[subs1]
    del2 = x[subs] - xout
    dels = x[subs] - x[subs1]
    # Evaluate the tensioned spline using hyperbolic sine terms
    exps1 = np.exp(sigmap * del1)
    sinhd1 = 0.5 * (exps1 - (1.0 / exps1))
    exps = np.exp(sigmap * del2)
    sinhd2 = 0.5 * (exps - (1.0 / exps))
    exps *= exps1
    sinhs = 0.5 * (exps - (1.0 / exps))
    spl = (yp[subs] * sinhd1 + yp[subs1] * sinhd2) / sinhs
    spl += ((y[subs] - yp[subs]) * del1 + (y[subs1] - yp[subs1]) * del2) / dels
    return spl if isarr else spl[0]
[docs]
def sincinterp(x, y, xout, dampfac=3.25, ksize=21,
               skipsort=False, cval=np.nan):
    """
    Perform a sinc interpolation on a data set.

    Parameters
    ----------
    x : array_like of (int or float)
        (shape1,) The independent values of the data.
    y : array_like of (int or float)
        (shape1,) The dependent values of the data.
    xout : array_like of (int or float) or (int or float)
        (shape2,) The new independent values of the data.
    dampfac : int or float, optional
        Damping factor for sinc interpolation (width of the Gaussian
        envelope applied to the sinc kernel).
    ksize : float or int, optional
        Kernel size used for interpolation.  This should be a positive
        odd integer.  If an even value is passed, then the kernel size
        will be increased by one.  Float values will be floored.
    skipsort : bool, optional
        By default, x and y data are sorted by the x value.  This can
        be a fairly intensive operation, so if you know the data is
        already sorted, then set `skipsort` to skip sorting.  Note, this
        is dangerous as no error will be reported.  Be sure you have
        a sorted `x` before attempting.
    cval : float, optional
        Value to fill in requested interpolation points outside the
        data range.

    Returns
    -------
    numpy.ndarray of float
        (shape2,) The new dependent values of the data.
    """
    # Coerce the kernel size to a positive odd integer
    ksize = int(ksize)
    if ksize % 2 == 0:
        ksize += 1
    if ksize < 1:
        raise ValueError("ksize must be a positive (odd) integer")
    x = np.array(x).astype(float)
    y = np.array(y).astype(float)
    if x.shape != y.shape:
        raise ValueError("x and y array shape mismatch")
    # Scalar xout yields scalar output
    isarr = hasattr(xout, '__len__')
    if not isarr:
        xout = [xout]
    xout = np.array(xout).astype(float)
    # Only finite (x, y) pairs participate in the interpolation
    mask = np.isfinite(x) & np.isfinite(y)
    yout = np.full(xout.shape, cval)
    if mask.sum() < 2:  # cannot interpolate
        return yout
    x, y = x[mask], y[mask]
    if not skipsort:
        xsort = np.argsort(x)
        x, y = x[xsort], y[xsort]
    # Effective (fractional) index of each output point on the x grid
    xx = tabinv(x, xout)
    ix = xx.astype(int)
    # Values directly on points
    fx = xx - ix
    onpoint = fx == 0
    if onpoint.any():
        yout[onpoint] = np.take(y, ix[onpoint])
    # use sinc interpolation for the points having fractional values
    if not onpoint.all():
        dx = fx[~onpoint]
        # Kernel sample offsets centered on zero, one column per point
        kernel = np.arange(ksize) - (ksize // 2)
        kx = np.resize(kernel, (dx.size, kernel.size)).T
        lobe = kx.copy()
        kx = kx - dx
        # Gaussian-damped sinc weights
        sinc = np.exp(-(kx / dampfac) ** 2) * np.sinc(kx)
        # Clip kernel indices to the valid data range (edge replication)
        lobe = np.clip(lobe + ix[~onpoint], 0, x.size - 1)
        vals = np.take(y, lobe)
        yout[~onpoint] = np.sum(sinc * vals, axis=0)
    if not isarr:
        yout = yout[0]
    return yout
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def interp_1d_point(x, y, xout):  # pragma: no cover
    """
    Perform linear interpolation at a single point. `x` must be monotonically
    increasing or decreasing containing unique values.

    Superfast compared to anything else including `numpy.interp`.

    Parameters
    ----------
    x : array_like of float
        (N,) array of independent values.  Must be monotonically
        increasing or decreasing.
    y : array_like of float
        (N,) array of dependent values.
    xout : float
        New independent value.

    Returns
    -------
    yout : float
        The new dependent value at `xout`, or NaN if `xout` is outside
        the range of `x`.
    """
    # Reverse views (no copy) so x is monotonically increasing
    if x[0] > x[-1]:
        x = x[::-1]
        y = y[::-1]
    # Requests outside the data range cannot be interpolated
    if xout < x[0] or xout > x[-1]:
        return np.nan
    if xout == x[0]:
        return y[0]
    # Locate the first grid point at or beyond xout (guaranteed to
    # exist since xout <= x[-1])
    upper = 1
    while x[upper] < xout:
        upper += 1
    if x[upper] == xout:
        return y[upper]
    lower = upper - 1
    fraction = (xout - x[lower]) / (x[upper] - x[lower])
    return y[lower] + fraction * (y[upper] - y[lower])
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def interp_1d_point_with_error(x, y, error, xout):  # pragma: no cover
    """
    Perform linear interpolation at a single point with error propagation.

    `x` must be monotonically increasing or decreasing containing unique
    values.

    Superfast compared to anything else including `numpy.interp`.

    Parameters
    ----------
    x : array_like of float
        (N,) array of independent values.  Must be monotonically
        increasing or decreasing.
    y : array_like of float
        (N,) array of dependent values.
    error : array_like of float
        (N,) array of errors in dependent values.
    xout : float
        New independent value.

    Returns
    -------
    yout, eout : float, float
        The new dependent value and error at `xout`, or (NaN, NaN) if
        `xout` is outside the range of `x`.
    """
    # Reverse views (no copy) so x is monotonically increasing
    if x[0] > x[-1]:
        x = x[::-1]
        y = y[::-1]
        error = error[::-1]
    # Requests outside the data range cannot be interpolated
    if xout < x[0] or xout > x[-1]:
        return np.nan, np.nan
    if xout == x[0]:
        return y[0], error[0]
    # Locate the first grid point at or beyond xout (guaranteed to
    # exist since xout <= x[-1])
    upper = 1
    while x[upper] < xout:
        upper += 1
    if x[upper] == xout:
        return y[upper], error[upper]
    lower = upper - 1
    fraction = (xout - x[lower]) / (x[upper] - x[lower])
    yout = y[lower] + fraction * (y[upper] - y[lower])
    # Propagate variance relative to both bounding samples and keep
    # the smaller of the two estimates
    var_lo = error[lower] * error[lower]
    var_hi = error[upper] * error[upper]
    var_total = var_lo + var_hi
    var_lo += var_total * fraction * fraction
    complement = 1.0 - fraction
    var_hi += var_total * complement * complement
    if var_lo < var_hi:
        return yout, math.sqrt(var_lo)
    return yout, math.sqrt(var_hi)
[docs]
def interp_error_1d(x, error, xout, cval=np.nan):
    """
    Perform linear interpolation of errors.

    Parameters
    ----------
    x : array_like of float
        (N,) array of independent values.
    error : array_like of float
        (N,) array of errors in dependent values.
    xout : int or float or (array_like of (int or float))
        New independent values.  Either a scalar or array of shape (M,).
    cval : float, optional
        Value to fill in requested interpolation points outside the
        data range.

    Returns
    -------
    float or numpy.ndarray
        The new dependent error values at `xout`.  Will be either a
        float or array of shape (M,) and numpy.float64 type depending
        on the supplied `xout`.
    """
    # Scalar xout yields scalar output
    isarr = hasattr(xout, '__len__')
    if not isarr:
        xout = [xout]
    x, error, xout = np.array(x), np.array(error), np.array(xout)
    shape, oshape = x.shape, xout.shape
    if error.shape != shape:
        raise ValueError("x and error shape mismatch")
    # Propagation is done in variance space; `error` is reused as the
    # output array from here on
    var = error ** 2
    error = np.full(oshape, float(cval))
    # determine index of xout on x
    idx = findidx(x, xout, left=np.nan, right=np.nan)
    mask = np.isfinite(idx)
    if not mask.any():
        return error
    xout, eout = xout[mask], np.empty(mask.sum())
    left = np.floor(idx[mask]).astype(int)
    right = np.ceil(idx[mask]).astype(int)
    # where no interpolation is required, just copy the values over
    pmask = left == right
    eout[pmask] = var[left][pmask]
    # now look at the points where interpolation is required
    pmask = np.logical_not(pmask, out=pmask)
    if pmask.any():
        left, right, xout = left[pmask], right[pmask], xout[pmask]
        vsum = var[left] + var[right]
        weights = (xout - x[left]) / (x[right] - x[left])
        # Variance propagated relative to the left and right samples
        vleft = var[left] + ((weights ** 2) * vsum)
        vright = var[right] + (((1 - weights) ** 2) * vsum)
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', RuntimeWarning)
            # take the smallest (not closest) value
            # (clip(vleft, 0, vright) == min(vleft, vright) since
            # variances are non-negative)
            eout[pmask] = np.clip(vleft, 0, vright)
    # Convert variance back to error for all in-range points
    error[mask] = np.sqrt(eout)
    if not isarr:
        error = error[0]
    return error
[docs]
def interp_error_nd(points, error, xi, cval=np.nan):
    """Propagate errors using Delaunay triangulation in N-dimensions.

    Uses `interp_error_1d` to propagate errors in 1-dimension following
    linear interpolation, or propagates errors in N-dimensions when Delaunay
    triangulation has been used for linear interpolation.

    This is not accurate for any other type of interpolation other than the
    types mentioned above.

    Parameters
    ----------
    points : numpy.ndarray (nsamples, ndim) or Delaunay triangulation
        Coordinates of points or the pre-computed scipy.spatial.Delaunay
        triangulation.
    error : float or numpy.ndarray (nsamples,)
        Error at each point.
    xi : numpy.ndarray (npoints, ndim)
        Points at which to interpolate data.
    cval : float, optional
        Value to fill in requested interpolation points outside the
        data range.

    Returns
    -------
    numpy.ndarray (npoints,)
    """
    # Triangulate the samples unless a triangulation was supplied
    tri = points if isinstance(points, Delaunay) else Delaunay(points)
    ndim = tri.points.shape[1]
    # Simplex containing each requested point (-1 = outside convex hull)
    simplices = tri.find_simplex(xi)
    mask = simplices != -1
    result = np.full(xi.shape[0], cval)
    if not mask.any():
        return result
    vertices = tri.simplices[simplices[mask]]
    if not hasattr(error, '__len__'):
        # Scalar error: broadcast to all referenced vertices
        error = to_array_shape(float(error), np.max(vertices) + 1)
    # Find points that do not require interpolation
    vpoints = tri.points[vertices]
    xi_find = xi[mask]
    same_points = np.all(xi_find[:, None] == vpoints, axis=2)
    same = same_points.any(axis=1)
    if same.any():
        # Points landing exactly on a vertex copy that vertex's error
        values = result[mask]
        values[same] = error[vertices[same_points]]
        result[mask] = values
        # NOTE(review): `mask` has length npoints while `same` has length
        # mask.sum(); this boolean index assumes the two coincide -- verify
        # behaviour when some points lie outside the convex hull
        mask[same] = False
        if not mask.any():
            return result
        vinds = tri.simplices[simplices[mask]]
    else:
        vinds = vertices
    var = error ** 2
    var = var[vinds]
    # Barycentric coordinates of each point within its simplex
    transform = tri.transform[simplices[mask]]
    b = np.einsum('ijk,ik->ij',
                  transform[:, :ndim, :ndim],
                  xi[mask] - transform[:, ndim, :])
    weights = np.c_[b, 1 - b.sum(axis=1)]
    # Propagate variance toward each vertex and keep the smaller estimate
    v0 = var + (weights ** 2) * var
    v1 = var + ((1 - weights) ** 2) * var
    result[mask] = np.sqrt(np.min([v0, v1], axis=0).sum(axis=1))
    return result
[docs]
def interp_error(points, error, xi, cval=np.nan):
    """Propagate errors using linear interpolation in N-dimensions.

    Allows linear interpolation of errors over 1 or multiple dimensions
    using `interp_error_1d` or `interp_error_nd`.  Note, error propagation
    is only valid for linear interpolation in 1-dimension or linear
    interpolation through Delaunay triangulation in N-dimensions.

    Parameters
    ----------
    points : numpy.ndarray (nsamples, ndim) or Delaunay triangulation
        Coordinates of points or the pre-computed scipy.spatial.Delaunay
        triangulation.
    error : float or numpy.ndarray (nsamples,)
        Error at each point.
    xi : numpy.ndarray (npoints, ndim)
        Points at which to interpolate data.
    cval : float, optional
        Value to fill in requested interpolation points outside the
        data range.

    Returns
    -------
    numpy.ndarray (npoints,)
    """
    # A pre-computed triangulation always implies the N-D method
    if isinstance(points, Delaunay):
        return interp_error_nd(points, error, xi, cval=cval)

    points = np.asarray(points)
    if points.ndim == 1:
        return interp_error_1d(points, error, xi, cval=cval)
    if points.ndim != 2:
        raise ValueError(
            "points must be a 1 (npoints,) or 2 (npoints, ndim) array "
            "unless supplying the Delaunay triangulation")
    # A single-feature 2-D array reduces to the 1-D case
    if points.shape[1] == 1:
        return interp_error_1d(points[:, 0], error, xi, cval=cval)
    return interp_error_nd(points, error, xi, cval=cval)
[docs]
class Interpolate(object):
    """
    Fast interpolation on a regular grid.

    Much like scipy.interpolate.RegularGridInterpolator except better.
    Allows for cubic interpolation, omission of grid coordinates,
    and NaN handling.

    Parameters
    ----------
    args : array_like or tuple of array_like
        Either a single array whose coordinates will be determined
        by the features, or arrays of independent values followed
        by the dependent values.
    method : str, optional
        One of {'linear', 'cubic', 'nearest'}.
    cubic : float, optional
        Defines the value of "a" below for the fast approximation of
        the bicubic interpolator.  a = -0.5 produces third order
        convergence with respect to the sampling interval.  The
        convolution weights are defined as::

            w(x) = (a+2)|x|^3 - (a+3)|x|^2 + 1 for |x|<=1,
                   a|x|^3 - 5a|x|^2 + 8a|x| - 4a for 1<|x|<2,
                   0 otherwise

        If method is None, setting cubic to a float will result in
        cubic interpolation using the above weightings in each
        dimension.  The default value for `cubic` is -0.5.
    mode : str, optional
        One of {'nearest', 'reflect', 'mirror', 'wrap', 'constant'}
        which determines how edge conditions are handled when the
        interpolation kernel overlaps a border.  Valid values and
        behaviours are as follows:

        - 'nearest' (a a a a | a b c d | d d d d):
          The input is extended by replicating the last pixel
        - 'reflect' (d c b a | a b c d | d c b a):
          The input is extended by reflecting about the edge of
          the last pixel.
        - 'mirror' (d c b | a b c d | c b a):
          The input is extended by reflecting about the center of
          last pixel.
        - 'wrap' (a b c d | a b c d | a b c d):
          The input is extended by wrapping around the opposite
          edge.
        - 'constant' (k k k k | a b c d | k k k k):
          The input is extended by filling all values beyond the
          edge with the same constant value defined by the `cval`
          parameter.
    cval : float, optional
        The value used to fill values when `mode` is 'constant'.
    ignorenans : bool, optional
        If True, NaNs will be ignored in all calculations where possible.
    error : numpy.ndarray, optional
        Associated error values for the data.  When supplied, variances
        are stored and error propagation is enabled (see
        `set_values_and_error`).
    """

    def __init__(self, *args, method=None, cval=np.nan, cubic=None,
                 ignorenans=True, error=None, mode='constant'):
        self._valid_modes = [
            'nearest', 'reflect', 'mirror', 'wrap', 'constant']
        self._valid_methods = ['linear', 'cubic', 'nearest']
        # Supplying `cubic` implies the cubic method
        if method is None:
            method = 'linear' if cubic is None else 'cubic'
        if method not in self._valid_methods:
            raise ValueError("Method '%s' is not defined" % method)
        self.method = method
        if method == 'cubic' and cubic is None:
            cubic = -0.5
        self.cubic = cubic
        if mode not in self._valid_modes:
            raise ValueError("Edge mode '%s' is not available" % mode)
        self.mode = mode
        # NaN-aware reductions when ignorenans is set
        self._product = np.nanprod if ignorenans else np.prod
        self._sum = np.nansum if ignorenans else np.sum
        self._min = np.nanmin if ignorenans else np.min
        self.cval = cval
        self.values = None
        self.variance = None
        self.do_error = False
        self.ndim = None
        self.grid = ()
        self.dgrid = ()
        self.parse_arguments(*args, error=error)
[docs]
    def parse_arguments(self, *args, error=None):
        """
        Parse initialization arguments.

        Parameters
        ----------
        args : array_like or tuple of array_like
            Either a single array whose coordinates will be determined
            by the features, or arrays of independent values followed
            by the dependent values.
        error : numpy.ndarray, optional
            The associated error values for the data.

        Raises
        ------
        ValueError
            If the number of grids does not match the value dimensions,
            a grid is not strictly ascending or not 1-dimensional, grid
            and value sizes mismatch, or `cval` cannot be cast to the
            value type.

        Returns
        -------
        None
        """
        nargs = len(args)
        if nargs == 1:
            # Coordinates default to pixel indices; reversed so the grid
            # is stored in 'ij' order while args follow 'xy' order
            values = np.asarray(args[0])
            points = tuple(np.arange(side) for side in values.shape[::-1])
        else:
            values, points = np.asarray(args[-1]), args[:-1][::-1]
        if len(points) != values.ndim:
            raise ValueError(
                "There are %d grid arrays, but values has %d features"
                % (len(points), values.ndim))
        self.ndim = values.ndim
        # Ensure a floating type so NaN/cval fill values can be stored
        if hasattr(values, 'dtype') and hasattr(values, 'astype'):
            if not np.issubdtype(values.dtype, np.inexact):
                values = values.astype(float)
        fill_value_dtype = np.asarray(self.cval).dtype
        if (hasattr(values, 'dtype') and not
                np.can_cast(fill_value_dtype, values.dtype,
                            casting='same_kind')):
            raise ValueError("cval must be of a type compatible with values")
        self.grid = self.dgrid = ()
        for i, p in enumerate(points):
            if not np.all(np.diff(p) > 0.):
                raise ValueError("The points in dimension %d must be strictly "
                                 "ascending" % i)
            if not np.asarray(p).ndim == 1:
                raise ValueError("The points in dimension %d must be "
                                 "1-dimensional" % i)
            if not values.shape[i] == len(p):
                raise ValueError("There are %d grid points and %d values in "
                                 "dimension %d" % (len(p), values.shape[i], i))
            g = np.asarray(p)
            # Grid spacings; the final element repeats the previous spacing
            # (assumes each dimension has at least 2 points -- TODO confirm)
            d = np.empty(g.shape)
            d[:-1] = np.diff(g)
            d[-1] = d[-2]
            self.grid += g,
            self.dgrid += d,
        self.set_values_and_error(values, error=error)
[docs]
def set_values_and_error(self, values, error=None):
"""
Set new interpolating values and error.
Parameters
----------
values : numpy.ndarray (int or float)
The new values to set. Must be the same shape as the interpolation
grid.
error : numpy.ndarray (int or float), optional
Optional error values to set.
Returns
-------
None
"""
self.set_values(values)
if error is not None:
self.variance = np.full_like(self.values, np.nan)
self.variance[(slice(0, -1),) * values.ndim] = np.asarray(
error ** 2)
self.do_error = True
else:
self.variance = None
self.do_error = False
[docs]
def set_values(self, values):
"""
Reset the interpolation values only.
Parameters
----------
values : numpy.ndarray, optional
The new values to set. Must be the same shape as the interpolation
grid.
Returns
-------
None
"""
self.values = np.full([s + 1 for s in values.shape], self.cval,
dtype=values.dtype)
self.values[(slice(0, -1),) * values.ndim] = values
[docs]
    def __call__(self, *args, method=None, cubic=None, mode=None):
        """
        Interpolation at coordinates.

        Parameters
        ----------
        args : array_like or tuple of array_like
            The coordinates to sample the gridded data at.  The order of
            features follow the 'xy' rather than 'ij' convention.  For
            example, arguments for two-dimensional data should be supplied
            in the order (x, y).
        method : str, optional
            The method of interpolation to perform.  One of {'linear',
            'cubic', 'nearest'}.  Defaults to the method set at
            initialization, or 'cubic' if `cubic` is supplied here.
        mode : str, optional
            Edge handling mode; one of {'nearest', 'reflect', 'mirror',
            'wrap', 'constant'}.  Defaults to the mode set at
            initialization.

        Returns
        -------
        numpy.ndarray
            Interpolated values with the same shape as the first
            coordinate argument.
        """
        # Resolve the method; note the instance state is updated so
        # subsequent calls reuse these settings
        if method is None:
            method = self.method if cubic is None else 'cubic'
        if method not in self._valid_methods:
            raise ValueError("Method '%s' is not defined" % method)
        self.method = method
        if self.method == 'cubic':
            if cubic is not None:
                self.cubic = cubic
            elif self.cubic is None:
                self.cubic = -0.5
        if mode is None:
            mode = self.mode
        if mode not in self._valid_modes:
            raise ValueError("Edge mode %s is not available" % mode)
        self.mode = mode
        if len(args) != self.ndim:
            raise ValueError(
                "require %i input arguments for %i-D data" %
                (self.ndim, self.ndim))
        shape_in = np.asarray(args[0]).shape
        # Reverse to 'ij' ordering to match the stored grid
        xi = stack(*args, copy=False)[::-1]
        indices, norm_distances = self._find_indices(xi)
        result = None
        if method == 'linear':
            result = self._evaluate_linear(indices, norm_distances)
        elif method == 'nearest':
            result = self._evaluate_nearest(indices, norm_distances)
        elif method == 'cubic':
            result = self._evaluate_cubic(indices, norm_distances)
        return result.reshape(shape_in)
[docs]
@staticmethod
def cubic_weights(distances, a=-0.5):
d = np.abs(np.asarray(distances))
w = np.zeros(d.shape)
i = d <= 1
w[i] = ((a + 2) * d[i] ** 3) - ((a + 3) * d[i] ** 2) + 1
i = (d > 1) & (d < 2)
w[i] = (a * d[i] ** 3) - (5 * a * d[i] ** 2) + (8 * a * d[i]) - (4 * a)
return w
def _evaluate_cubic(self, indices, norm_distances, cubic=-0.5):
indset = itertools.product(*indices)
weights = itertools.product(*[self.cubic_weights(d, a=cubic)
for d in norm_distances])
s = indices[0].shape[0] ** self.ndim, indices[0].shape[1]
values = np.empty(s, dtype=self.values.dtype)
for vals, inds, wts in zip(values, indset, weights):
vals[:] = self.values[inds] * self._product(wts, axis=0)
return self._sum(values, axis=0)
def _evaluate_linear(self, indices, norm_distances):
indset = itertools.product(*indices)
weights = itertools.product(*[abs(1 - d) for d in norm_distances])
s = indices[0].shape[0] ** self.ndim, indices[0].shape[1]
values = np.empty(s, dtype=self.values.dtype)
for vals, inds, wts in zip(values, indset, weights):
vals[:] = self.values[inds] * self._product(wts, axis=0)
return self._sum(values, axis=0)
def _evaluate_nearest(self, indices, norm_distances):
idx_res = []
select_points = np.arange(indices[0].shape[1])
for i, yi in zip(indices, norm_distances):
idx = np.argmin(yi, axis=0)
idx_res.append(i[idx, select_points])
return self.values[tuple(idx_res)]
    def _find_indices(self, xi):
        """
        Find grid kernel indices and normalized distances for `xi`.

        Parameters
        ----------
        xi : numpy.ndarray
            Coordinates in 'ij' order, one row per dimension.

        Returns
        -------
        indices, norm_distances : list of numpy.ndarray
            Per-dimension kernel point indices, and the distances (in
            grid units) of each sample from those kernel points.
        """
        # find relevant interpolants for xi
        indices = []
        # compute distance to lower edge in unity units
        norm_distances = []
        # Cubic interpolation uses a 4-point kernel; linear/nearest 2-point
        dx = 2 if self.method == 'cubic' else 1
        offsets = np.arange(-dx, dx)
        # iterate through features
        for x, grid, dgrid in zip(xi, self.grid, self.dgrid):
            highlim = grid.size - 1
            i = np.searchsorted(grid, x)
            # Flags for points below / above the grid limits
            l0 = i == 0
            h0 = i == grid.size
            # Candidate kernel indices around each point
            i = i[:, None] + offsets[None]
            inds = np.clip(i, 0, highlim)
            # Fractional position within the bracketing grid cell
            d = (x[:, None] - grid[inds]) / dgrid[inds]
            d = abs(d[:, dx]) % 1
            d[l0] = 0
            d[h0] = 1
            d = abs(d[:, None] + offsets)
            low = i < 0
            high = i > highlim
            # Remap out-of-range kernel indices per the edge mode
            if self.mode == 'nearest':
                pass
            elif self.mode == 'reflect':
                inds[low] = abs(i[low])
                inds[high] = 2 * highlim - i[high]
            elif self.mode == 'mirror':
                inds[low] = abs(i[low] + 1)
                inds[high] = 2 * highlim - i[high] + 1
            elif self.mode == 'wrap':
                inds[low] = highlim + i[low] + 1
                inds[high] = i[high] - highlim - 1
            elif self.mode == 'constant':
                # Index -1 selects the cval padding added by set_values
                low |= l0[:, None]
                high |= h0[:, None]
                inds[low] = -1
                inds[high] = -1
            else:
                raise ValueError("unknown edge mode (%s)" % self.mode)
            indices.append(inds.T)
            norm_distances.append(d.T)
        return indices, norm_distances
[docs]
def tabinv(array, xvals, missing=None, fast=True):
    """
    Find the effective index of a function value in an ordered vector
    with NaN handling.

    Parameters
    ----------
    array : array_like
        The array to be searched.  Should be monotonic increasing or
        decreasing.
    xvals : int or float or array_like
        The function value(s) whose effective index is sought.
    missing : float or int, optional
        Value to return if outside the limits.  Default uses a constant
        value of the first or last value in the array.  Only pertinent
        if `fast` is True.
    fast : bool, optional
        If False, uses `findidx` to check that x is monotonic and does
        not allow extrapolation beyond the limits of `array`.  NaNs
        will break the monotonic check if not limited to padding on the
        edges of `array`.  If fast is True, np.interp is used without
        any form of error checking.

    Returns
    -------
    numpy.ndarray
        The effective index or indices of `array`.

    Examples
    --------
    >>> from sofia_redux.toolkit.interpolate.interpolate import tabinv
    >>> import numpy as np
    >>> import pytest
    >>> x = [np.nan, np.nan, 1, 2, np.nan, 3, np.nan, np.nan]
    >>> tabinv(x, 1.5)
    2.5
    >>> with pytest.raises(ValueError):
    ...     tabinv(x, 1.5, fast=False)
    ...
    >>> x = [np.nan, np.nan, 1, 2, 3, np.nan, np.nan]
    >>> tabinv(x, 1.5, fast=False)
    2.5
    """
    array = np.asarray(array, dtype=float)
    if array.ndim != 1:
        raise ValueError("Array must have 1-dimension")
    if fast:
        # No monotonicity checking; out-of-range values map to `missing`
        indices = np.arange(array.size)
        return np.interp(xvals, array, indices, left=missing, right=missing)
    # Trim NaN padding from the edges, then search the valid section and
    # offset the result back to the original index frame
    bounds = nantrim(array, 2, bounds=True)
    trimmed = array[bounds[0]: bounds[1]]
    return findidx(trimmed, xvals) + bounds[0]
[docs]
def findidx(ix, ox, left=0, right=None):
    """
    Finds the effective index of a function value in an ordered array.

    Formerly tabinv.  findidx will abort if `ix` is not monotonic.
    Equality of neighboring values in `ix` is allowed but results may
    not be unique.  This requirement may mean that `ix` padded with
    zeroes could cause findidx to abort.

    A binary search is used to find the values ix[i] and ix[i+1] where
    ix[i] < xo < ix[i+1].  Output (ieff) is then computed using linear
    interpolation between i and i+1::

        ieff = i + (xo - ix[i]) / (ix[i+1] - ix[i])

    Values below min(ix) return `left`; values above max(ix) return
    `right` (or len(ix) - 1 when `right` is None).

    Parameters
    ----------
    ix : array_like of float
        (N,) The array to be searched, must be monotonic increasing or
        decreasing.
    ox : (array_like of float) or float
        (M,) The function value(s) whose effective index is sought.
    left : float, optional
        Value to return for ox < min(ix).  Default is 0.
    right : float, optional
        Value to return for ox > max(ix).  Default is len(ix) - 1.

    Returns
    -------
    float or numpy.ndarray
        (M,) The effective index or indices of xo in xi.  Note that
        output type will be float and will need to changed to
        integer in order to be used for indexing another array.

    Raises
    ------
    ValueError
        If `ix` contains NaNs or is not monotonic.
    """
    ix = np.asarray(ix)
    n = ix.size
    if np.isnan(ix).any():
        # NaNs compare False with everything, breaking the monotonic
        # test below.  (Typo fixed in message: was "mononcity".)
        raise ValueError("NaNs detected: cannot test for monotonicity")
    # Remember whether a scalar was supplied so one can be returned
    if hasattr(ox, '__len__'):
        ox = np.array(ox)
        shape = ox.shape
        ox = ox.ravel()
    else:
        ox = np.array([ox])
        shape = None
    if n <= 1:
        return 0.0 if shape is None else np.zeros(shape)
    # Initialize binary search area and compute number of divisions needed
    ileft, iright = np.zeros((2, ox.size), dtype=int)
    ndivisions = int(np.log10(n) / np.log10(2) + 1)
    # Test for monotonicity
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', RuntimeWarning)
        i = (ix - np.roll(ix, 1))[1:]
        a = i >= 0
        if a.sum() == (n - 1):  # test for increasing array
            np.add(iright, n - 1, out=iright)
        else:  # test for decreasing array
            a = i <= 0
            if a.sum() == (n - 1):
                np.add(ileft, n - 1, out=ileft)
            else:
                raise ValueError("xi is not monotonic")
    # Perform binary search by successively dividing search
    # interval in half
    for i in range(ndivisions):
        idiv = (ileft + iright) // 2
        xval = ix[idiv]
        less, greater = ox <= xval, ox > xval
        np.multiply(ileft, less, out=ileft)
        np.add(ileft, idiv * greater, out=ileft)
        np.multiply(iright, greater, out=iright)
        np.add(iright, idiv * less, out=iright)
    # Interpolate between interval of width = 1
    # value on left and right sides.  `iszero` guards against division
    # by zero for repeated neighboring values.
    xleft, xright = ix[ileft], ix[iright]
    iszero = xleft == xright
    ieff = ((xright - ox) * ileft).astype(float)
    np.add(ieff, (ox - xleft) * iright, out=ieff)
    np.add(ieff, iszero * ileft, out=ieff)
    np.divide(ieff, (xright - xleft + iszero), out=ieff)
    # do not allow extrapolation beyond ends
    np.clip(ieff, 0, n - 1, out=ieff)
    # `past_edge` is reused for both the low- and high-side checks
    past_edge = ox < np.nanmin(ix)
    ieff[past_edge] = left
    past_edge[...] = ox > np.nanmax(ix)
    if right is None:
        ieff[past_edge] = n - 1
    else:
        ieff[past_edge] = right
    return ieff[0] if shape is None else np.reshape(ieff, shape)