Source code for sofia_redux.spectroscopy.mergespec

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import warnings

from astropy import log
import numba as nb
import numpy as np

from sofia_redux.toolkit.utilities.func import nantrim
from sofia_redux.toolkit.stats import meancomb
from sofia_redux.spectroscopy.combflagstack import combflagstack
from sofia_redux.spectroscopy.interpflagspec import interpflagspec
from sofia_redux.spectroscopy.interpspec import interpspec

__all__ = ['mergespec']


[docs] def mergespec(spec1, spec2, info=None, sum_flux=False, s2n_threshold=None, s2n_statistic='median', noise_test=False, local_noise=False, local_radius=3): """ Combine two spectra into a single spectrum The two arrays are combined along the independent (x) values (row 0 of both arrays). If the x-range of `spec2` overlaps with that of `spec1`, then the dependent (row 1), error (row 2), and bitmask (row 3) of `spec2` is interpolated onto `spec1` and combined appropriately:: |------------| array 1 |--------------| array 2 |-------+++++----------| array 2 merged with array 1 In the combined example above, + indicates a combined point and - indicates a original point from array 1 or 2. If the x-ranges of `spec1` and `spec2` do not overlap, then arrays will simply be joined together. Note that the final output array is sorted according to x (lowest -> highest). If the arrays do not overlap then the separation between the arrays are marked by two NaNs in all rows > 0: one at the last element of the lowest x-ranged array and one as the first element of the highest x-ranged array:: |----------o-| array 1 |-o----------| array 2 |----------oxxo----------| array 2 merged with array 1 In the separated example above, x indicates a NaN and - indicates an original point from array 1 or 2, and 'o' marks the penultimate or second element. Note that all NaNs will be trimmed from the beginning and end of the input arrays Parameters ---------- spec1 : array_like of float (2-4, N) array matching the shape of `spec2` where: spec1[0] = independent variable spec1[1] = dependent variable spec1[2] = error in dependent variable, optional spec1[3] = bitmask spec2 : array_like of float (2-4, N) array matching the shape of `spec1` where: spec2[0] = independent variable spec2[1] = dependent variable spec2[2] = error in dependent variable, optional spec2[3] = bitmask info : dict, optional If supplied will be updated with: overlap_range -> numpy.ndarray (2,) The (min, max) wavelength range over which arrays overlap. sum_flux : bool, optional If True, sum the flux instead of averaging. s2n_threshold : float, optional If set and the value is greater than 0, and errors are provided, data below this value times the reference signal-to-noise (S/N) value in the spectrum will be clipped before combination. s2n_statistic : {'median', 'mean', 'max'}, optional Statistic to use for computing the reference S/N value. Default is median. noise_test : bool, optional If set, only the noise is considered for thresholding spectra. The s2n_threshold is interpreted as a fraction of 1/noise. local_noise : bool, optional If set, noise for the spectrum is computed from the standard deviation in a sliding window with radius local_radius. local_radius : int, optional Sets the local window in pixels for computing noise if local_noise is set. Returns ------- numpy.ndarray of numpy.float64 (2-4, M) array where row index gives the following values: 0: combined independent variable 1: combined dependent variable 2: combined error 3: combined bit-mask """ spec1, spec2 = np.array(spec1), np.array(spec2) shape = spec1.shape if spec1.ndim != 2 or spec2.ndim != 2: raise ValueError("spec1 and spec2 must have 2 dimensions") elif shape[0] < 2: raise ValueError("spec1 and spec2 must have 2 or more rows") elif shape[0] != spec2.shape[0]: raise ValueError("spec1 and spec2 must have equal rows") if s2n_statistic == 'mean': func = np.nanmean elif s2n_statistic == 'max': func = np.nanmax else: func = np.nanmedian doerr = shape[0] >= 3 dobit = shape[0] >= 4 if doerr: # Check for zero-error data bad_error = ~np.isfinite(spec1[2]) | (spec1[2] == 0) spec1[1][bad_error] = np.nan spec1[2][bad_error] = np.nan bad_error = ~np.isfinite(spec2[2]) | (spec2[2] == 0) spec2[1][bad_error] = np.nan spec2[2][bad_error] = np.nan # Trim NaNs from the beginning and ends of the spectrum spec1 = spec1[:, nantrim(spec1[1], 2)] spec2 = spec2[:, nantrim(spec2[1], 2)] range1 = np.nanmin(spec1[0]), np.nanmax(spec1[0]) range2 = np.nanmin(spec2[0]), np.nanmax(spec2[0]) # get good S/N range if desired ok_s2n, s2n_1, s2n_2 = None, None, None if doerr and s2n_threshold is not None and s2n_threshold > 0: if local_noise: log.debug('Using sliding standard deviation for noise') noise_1 = _sliding_stddev(spec1[1], local_radius) noise_2 = _sliding_stddev(spec2[1], local_radius) else: log.debug('Using input error for noise') noise_1 = spec1[2] noise_2 = spec2[2] if noise_test: log.debug(f'Threshold statistic is {func} on 1/N') s2n_1 = 1 / noise_1 s2n_2 = 1 / noise_2 else: log.debug(f'Threshold statistic is {func} on S/N') s2n_1 = spec1[1] / noise_1 s2n_2 = spec2[1] / noise_2 # compute threshold from equal sized portions at ends # of spectra ok_s2n = [s2n_threshold * func(s2n_1), s2n_threshold * func(s2n_2)] log.debug(f'S/N threshold: {ok_s2n}') if not (ok_s2n[0] > 0 and ok_s2n[1] > 0): log.warning(f'Bad S/N; ignoring threshold ' f'values {ok_s2n}') ok_s2n = None overlapped, overlap_range = False, [np.nan, np.nan] overlap = np.empty((shape[0], 0)) overlap2 = (spec2[0] >= range1[0]) & (spec2[0] <= range1[1]) if overlap2.any(): # Interpolate dependent values (and error) from spec2 onto spec1 ie2 = spec2[2, overlap2] if doerr else None if2 = interpspec(spec2[0, overlap2], spec2[1, overlap2], spec1[0], error=ie2, leavenans=True, cval=np.nan) if2, ie2 = if2 if doerr else (if2, None) if ok_s2n is not None: i_s2n_2 = interpspec(spec2[0, overlap2], s2n_2[overlap2], spec1[0], leavenans=True, cval=np.nan) else: i_s2n_2 = None overlap1 = nantrim(if2, 2) overlapped = overlap1.any() if overlapped: overlap = np.full((shape[0], overlap1.sum()), np.nan) overlap[0] = spec1[0][overlap1] if2 = [spec1[1][overlap1], if2[overlap1]] if doerr: ie2 = np.array([spec1[2][overlap1], ie2[overlap1]]) ** 2 # combine dependent values (and error) if sum_flux: overlap[1] = np.sum(if2, axis=0) if doerr: overlap[2] = np.sqrt(np.sum(ie2, axis=0)) elif ok_s2n is not None: use_s1 = ((s2n_1[overlap1] > ok_s2n[0]) & (i_s2n_2[overlap1] <= ok_s2n[1])) use_s2 = ((s2n_1[overlap1] <= ok_s2n[0]) & (i_s2n_2[overlap1] > ok_s2n[1])) combined = meancomb(if2, variance=ie2, ignorenans=False, axis=0, returned=doerr) overlap[1] = combined[0] overlap[1][use_s1] = if2[0][use_s1] overlap[1][use_s2] = if2[1][use_s2] overlap[2] = np.sqrt(combined[1]) overlap[2][use_s1] = np.sqrt(ie2[0][use_s1]) overlap[2][use_s2] = np.sqrt(ie2[1][use_s2]) else: if2 = meancomb(if2, variance=ie2, ignorenans=False, axis=0, returned=doerr) if doerr: overlap[1] = if2[0] overlap[2] = np.sqrt(if2[1]) else: overlap[1] = if2 # interpolate bit-flags from spec2 to spec1 and then combine if dobit: ib2 = interpflagspec( spec2[0, overlap2], spec2[3, overlap2], spec1[0]) overlap[3] = combflagstack( [spec1[3, overlap1], ib2[overlap1]]) overlap_range = [np.nanmin(overlap[0]), np.nanmax(overlap[0])] sortw = np.argsort(overlap[0]) overlap = overlap[np.arange(shape[0])[:, None], sortw[None]] with warnings.catch_warnings(): warnings.simplefilter('ignore', RuntimeWarning) highcut = np.min([range1[1], range2[1]]) lowcut = np.max([range1[0], range2[0]]) if range1[0] < range2[0]: left = spec1[:, spec1[0] < lowcut] else: left = spec2[:, spec2[0] < lowcut] if range1[1] > range2[1]: right = spec1[:, spec1[0] > highcut] else: right = spec2[:, spec2[0] > highcut] if left.shape[1] > 1: sortw = np.argsort(left[0]) left = left[np.arange(shape[0])[:, None], sortw[None]] if not overlapped: left[1:, -1] = np.nan overlap = np.concatenate((left, overlap), axis=1) if right.shape[1] > 1: sortw = np.argsort(right[0]) right = right[np.arange(shape[0])[:, None], sortw[None]] if not overlapped: right[1:, 0] = np.nan overlap = np.concatenate((overlap, right), axis=1) if isinstance(info, dict): info['overlap_range'] = np.array(overlap_range) return overlap
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def _sliding_stddev(data, radius): # pragma: no cover n = data.size r = int(radius) result = np.empty(n, dtype=nb.float64) for i in range(n): start = i - r end = i + r if start < 0: start = 0 if end > n: end = n result[i] = np.nanstd(data[start:end]) return result