Source code for sofia_redux.scan.flags.flag_numba_functions

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import numba as nb
import numpy as np

from sofia_redux.toolkit.splines import spline_utils

nb.config.THREADING_LAYER = 'threadsafe'

__all__ = ['set_flags', 'unflag', 'flatten_nd_indices',
           'is_flagged', 'is_unflagged', 'get_mem_correction',
           'set_new_blank_value']


[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=True) def set_flags(flag_array, flag, indices=None): # pragma: no cover """ Add a flag indicator to an array of flags. Parameters ---------- flag_array : numpy.ndarray (int) An array of flag values to be updated in-place of shape (n,). flag : int The integer flag to set. indices : numpy.ndarray (int) or int, optional An array indicating which `flag_array` indices to flag with `flag`. The default is all indices. Returns ------- None """ flat_array = flag_array.flat if indices is None: for index in range(flag_array.size): flat_array[index] |= flag else: if flag_array.ndim > 1: flat_indices = flatten_nd_indices(indices, flag_array.shape).flat else: flat_indices = np.asarray(indices).flat for index in flat_indices: flat_array[index] |= flag
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=True) def unflag(flag_array, flag=None, indices=None): # pragma: no cover """ Remove a flag indicator from an array of flags. Parameters ---------- flag_array : numpy.ndarray (int) An array of flag values to be updated in-place of shape (n,). flag : int, optional The integer flag to remove. If not supplied, all flags are removed (set to zero). indices : numpy.ndarray (int) or int, optional An array indicating which `flag_array` indices to flag with `flag`. The default is all indices. Returns ------- None """ flat_array = flag_array.flat if indices is None: if flag is None: for index in range(flag_array.size): flat_array[index] = 0 else: for index in range(flag_array.size): flag_value = flat_array[index] if (flag_value & flag) == 0: continue flat_array[index] = flag_value ^ flag return if flag_array.ndim > 1: flat_indices = flatten_nd_indices(indices, flag_array.shape).flat else: flat_indices = np.asarray(indices).flat if flag is None: for index in flat_indices: flat_array[index] = 0 else: for index in flat_indices: flag_value = flat_array[index] if (flag_value & flag) == 0: continue flat_array[index] = flag_value ^ flag
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=True) def flatten_nd_indices(indices, array_shape): # pragma: no cover """ Converts ND array indices to flat indices. Parameters ---------- indices : tuple or numpy.ndarray (int) The ND indices in the form that would be returned by :func:`np.nonzero`. array_shape : tuple (int) or numpy.ndarray (int) The shape of the array for which to generate flat indices. Returns ------- array.flat, flat_indices : numpy.ndarray, numpy.ndarray (int) """ n_dimensions = len(indices) _, _, steps = spline_utils.flat_index_mapping(np.asarray(array_shape)) n = indices[0].size flat_indices = np.zeros(n, dtype=nb.int64) for dimension in range(n_dimensions): step = steps[dimension] index_line = indices[dimension] for i in range(n): flat_indices[i] += step * index_line[i] return flat_indices
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=True) def is_flagged(flag_array, flag=None, exact=False): # pragma: no cover """ Return whether a flag array is flagged with a given flag. Parameters ---------- flag_array : numpy.ndarray (int) An array of integer flags. flag : int, optional The flag to check. If not supplied, any non-zero flag will be returned as a `True` value. exact : bool, optional If `True`, a flagged result is one that exactly matches the flag. Otherwise, a flagged result is one which contains the flag. Returns ------- flagged : numpy.ndarray (bool) A mask the same shape as `flag_array` where `True` indicates an element is flagged. """ mask = np.empty(flag_array.shape, dtype=nb.b1) flat_mask = mask.flat flat_flag = flag_array.flat if flag is None: for i in range(flag_array.size): flat_mask[i] = flat_flag[i] != 0 elif flag == 0: for i in range(flag_array.size): flat_mask[i] = flat_flag[i] == 0 elif exact: for i in range(flag_array.size): flat_mask[i] = flat_flag[i] == flag else: for i in range(flag_array.size): flat_mask[i] = (flat_flag[i] & flag) != 0 return mask
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=True) def is_unflagged(flag_array, flag=None, exact=False): # pragma: no cover """ Return whether a flag array is unflagged by the given flag. Parameters ---------- flag_array : numpy.ndarray (int) An array of integer flags. flag : int, optional The flag to check. If not supplied, returns `True` if `flagged_array` is zero. exact : bool, optional If `True`, an unflagged result is one that does not exactly match the flag. Otherwise, an unflagged result is one which does not contain the flag. Returns ------- unflagged : numpy.ndarray (bool) A mask the same shape as `flag_array` where `True` indicates an element is unflagged. """ mask = np.empty(flag_array.shape, dtype=nb.b1) flat_mask = mask.flat flat_flag = flag_array.flat if flag is None: for i in range(flag_array.size): flat_mask[i] = flat_flag[i] == 0 elif flag == 0: for i in range(flag_array.size): flat_mask[i] = flat_flag[i] != 0 elif exact: for i in range(flag_array.size): flat_mask[i] = flat_flag[i] != flag else: for i in range(flag_array.size): flat_mask[i] = (flat_flag[i] & flag) == 0 return mask
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def get_mem_correction(data, noise, multiplier=0.1, valid=None, model=None ): # pragma: no cover """ Determine the Maximum-Entropy-Method (MEM) correction. The MEM correction is given by: dx = sign(x) * n * multiplier * log(sqrt(x^2 + n^2) / sqrt(m^2 + n^2)) Where x is a data value, n is the noise, and m is the model. Any invalid values (NaN, zero-divisions, marked invalid etc.) will result in zero valued MEM correction (dx) on output. Parameters ---------- data : numpy.ndarray (float) A data array of values with arbitrary shape (shape,). noise : numpy.ndarray (float) The noise array with shape (shape,). multiplier : float, optional The Lagrange multiplier. valid : numpy.ndarray (bool), optional A boolean mask of shape (shape,) where `False` excludes an element from the MEM correction. model : numpy.ndarray (float), optional An optional model of shape (shape,). By default, all model values are zero. Returns ------- mem_correction : numpy.ndarray (float) The MEM correction of shape (shape,). """ flat_data = data.flat flat_noise = noise.flat if valid is None: do_valid = False flat_valid = np.empty(0, dtype=nb.b1).ravel() else: do_valid = True flat_valid = valid.ravel() if model is None: do_model = False flat_model = np.empty(0, dtype=nb.float64).ravel() else: do_model = True flat_model = model.ravel() mem_values = np.empty_like(data) flat_mem = mem_values.flat for i in range(data.size): if do_valid and not flat_valid[i]: flat_mem[i] = 0.0 continue value = flat_data[i] if np.isnan(value): flat_mem[i] = 0.0 continue if do_model: target = flat_model[i] if np.isnan(target): flat_mem[i] = 0.0 continue else: target = 0.0 sigma = flat_noise[i] if np.isnan(sigma): flat_mem[i] = 0.0 continue d_target = np.hypot(target, sigma) if d_target == 0: flat_mem[i] = 0.0 continue d_value = np.hypot(value, sigma) mem_value = multiplier * sigma * np.log(d_value / d_target) if value < 0: mem_value *= -1 flat_mem[i] = mem_value return mem_values
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def set_new_blank_value(data, old_blank, new_blank): # pragma: no cover """ Set all old blanking levels in the data to the new blanking level. Parameters ---------- data : numpy.ndarray An arbitrarily shaped data array of arbitrary type. old_blank : int or float new_blank : int or float or None Returns ------- None """ if old_blank == new_blank or old_blank is new_blank or new_blank is None: return data = data.ravel() for i in range(data.size): if data[i] == old_blank or data[i] is old_blank: data[i] = new_blank