Source code for sofia_redux.spectroscopy.binspec

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from astropy import log
import numpy as np
from sofia_redux.toolkit.resampling.resample_utils import fasttrapz
from scipy.interpolate import interp1d

__all__ = ['binspec']


[docs] def binspec(x, y, delta, xout=None, lmin=None, lmax=None, average=False): """ Bin a spectrum between lmin and lmax with bins delta wide Output wavelength bins are calculated from input lmin/lmax/dl. Flux values at edges of bins are interpolated, and values between the edges are summed. If desired, the summed flux is divided by the bin size. If either lmin or lmax is None, and xout is provided, the bins are assumed to be uneven and may have gaps. Parameters ---------- x : array_like of (int or float) Independent variable y : array_like of (int or float) Dependent variable delta : int or float or (array_like of (int or float)) Bin width. May be a scalar value or 1-D array (N_out,) matching the output data array size of the second dimension. xout : int or float or array_like of (int or float), optional Output locations lmin : int or float, optional Minimum value of independent variable lmax : int or float, optional Maximum value of independent variable average : bool, optional If True, average the y over the bin (to conserve flux) Returns ------- numpy.ndarray (2, N) where [0, :] = x out, and [1, :] = y out """ try: x = np.array(x).astype(float) except (ValueError, TypeError): log.error("Invalid input x data type") return try: y = np.array(y).astype(float) except (ValueError, TypeError): log.error("Invalid input y data type") return if x.size < 2: log.error("At least two input points are required") return if x.shape != y.shape: log.error("X and Y arrays have different dimensions") return if xout is not None: try: if not hasattr(xout, '__len__'): xout = [xout] xout = np.array(xout).astype(float) except (ValueError, TypeError): log.error("Invalid output x data type") return try: if not hasattr(delta, '__len__'): delta = [delta] delta = np.array(delta).astype(float) ndelta = delta.size except (ValueError, TypeError): log.error("Invalid delta data type") return if (lmin is None or lmax is None) and xout is not None: # Assume we have a set of output central wavelengths to match nxout = xout.size if ndelta != nxout: if hasattr(delta, '__len__'): delta = delta[0] delta = np.full(len(xout), delta) xledg = xout - delta / 2 xhedg = xout + delta / 2 elif ndelta == 1: # single value for delta (constant dl) lmin = x.min() if lmin is None else lmin lmax = x.max() if lmax is None else lmax nedgs = int((lmax - lmin) / delta) + 1 xledg = np.arange(nedgs) * delta + lmin xhedg = xledg.copy() + delta xout = xledg + delta / 2 nxout = xout.size delta = np.full(nxout, delta[0]) else: lmin = x.min() if lmin is None else lmin lmax = x.max() if lmax is None else lmax nbins = ndelta xlow = lmin idx = 0 xledg = [] xhedg = [] while idx < nbins: xupp = xlow + delta[idx] if xupp <= lmax: xledg.append(xlow) xhedg.append(xupp) else: # pragma: no cover # this clause is covered in tests, # but coverage doesn't count it break xlow = xupp idx += 1 xledg = np.array(xledg) xhedg = np.array(xhedg) xout = (xledg + xhedg) / 2 nxout = xout.size delta = xhedg - xledg nout = len(xout) yout = np.zeros(nxout) if nxout > nout else np.zeros(nout) # interpolate values at edges of bins xedgarr = list(xledg) xedgarr.append(xhedg[-1]) finterp = interp1d(x, y, fill_value='extrapolate') yedgarr = finterp(xedgarr) for i, (lower, upper) in enumerate(zip(xledg, xhedg)): idx = np.array((x > lower) & (x <= upper)) area = fasttrapz(y[idx], x[idx]) # add on edges if idx.sum() != 0: iidx = np.nonzero(idx)[0] i0, i1 = iidx[0], iidx[-1] x0, x1 = x[i0], x[i1] y0, y1 = y[i0], y[i1] if lower < x0: area += (x0 - lower) * (yedgarr[i] + y0) / 2.0 if upper > x1: area += (upper - x1) * (yedgarr[i + 1] + y1) / 2.0 yout[i] = area if average: yout /= delta return np.stack((xout, yout))