Source code for sofia_redux.scan.frames.frames_numba_functions

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import numba as nb
import numpy as np

nb.config.THREADING_LAYER = 'threadsafe'

__all__ = ['add_dependents', 'validate_frames', 'downsample_data']


[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=True) def add_dependents(dependents, dp, frame_valid, start_frame=None, end_frame=None, subtract=False): # pragma: no cover """ Add increments to the frame dependents. Increments will only be added to (or subtracted from) frame dependents for valid frames. Parameters ---------- dependents : numpy.ndarray (float) The frame dependents to increment of shape (n_frames,). dp : numpy.ndarray (float) or float The dependent values to add. frame_valid : numpy.ndarray (bool) A boolean mask of shape (n_frames,) where `False` excludes that frame from any processing. start_frame : int, optional The starting frame. end_frame : int, optional The non-inclusive ending frame. subtract : bool, optional If `True`, remove the dependents rather than adding. Returns ------- None """ n_frames = frame_valid.size if start_frame is None: start_frame = 0 if end_frame is None: end_frame = n_frames add_values = np.asarray(dp).flat singular = len(add_values) == 1 if singular: singular_value = add_values[0] else: singular_value = 0.0 if subtract: for frame in range(start_frame, end_frame): if not frame_valid[frame]: continue if singular: dependents[frame] -= singular_value else: dependents[frame] -= add_values[frame] else: for frame in range(start_frame, end_frame): if not frame_valid[frame]: continue if singular: dependents[frame] += singular_value else: dependents[frame] += add_values[frame]
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def validate_frames(valid, cos_a, sin_a, native_sin_lat, native_cos_lat, validated, has_telescope_info, mount, left_nasmyth, right_nasmyth): # pragma: no cover """ Checks validity of frame data and updates the angles based on mount type. The cos(latitude) and sin(latitude) values will be updated at this point if frames contain telescope data (`has_telescope_info`=`True`) and the current values are non-finite. the sin(latitude), cos(latitude) angles will be updated based on the mount type from the native sin(latitude) and cos(latitude) angles:: mount sin(latitude) cos(latitude) ------------- ------------- ------------- Left Nasmyth -sin(latitude) cos(latitude) Right Nasmyth sin(latitude) cos(latitude) Other 0 1 Note that latitude is in native coordinates (no reversal by coordinate axis will be applied). Parameters ---------- valid : numpy.ndarray (bool) Set to `True` if a frame is valid and `False` otherwise. cos_a : numpy.ndarray (float) The cos(angle) values of shape (n_frames,). Updated in-place. sin_a : numpy.ndarray (float) The sin(angle) values of shape (n_frames,). Updated in-place. native_sin_lat : numpy.ndarray (float) The sin(latitude) values of the native coordinates with shape (n_frames,). native_cos_lat : numpy.ndarray (float) The cos(latitude) values of the native coordinates with shape (n_frames,). validated : numpy.ndarray (bool) Indicates if a frame has been checked. If `True`, no action is taken. Otherwise, the validated flag will be set to `True` and cos_a/sin_a values will be fixed if NaN. has_telescope_info : numpy.ndarray (bool) If `True`, indicates that telescope information is available. If so, the `sin_a` and `cos_a` values will be updated depending on the mount type. mount : int The mount flag. left_nasmyth : int The left Nasmyth mount flag. right_nasmyth : int The right Nasmyth mount flag. Returns ------- None """ n_frames = cos_a.size for frame in range(n_frames): if validated[frame]: continue if not valid[frame]: validated[frame] = True continue validated[frame] = True if not has_telescope_info[frame]: continue # Fix bad angles if np.isfinite(cos_a[frame]) and np.isfinite(sin_a[frame]): continue if mount == left_nasmyth: sin_a[frame] = -native_sin_lat[frame] cos_a[frame] = native_cos_lat[frame] continue elif mount == right_nasmyth: sin_a[frame] = native_sin_lat[frame] cos_a[frame] = native_cos_lat[frame] continue else: sin_a[frame] = 0.0 cos_a[frame] = 1.0
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def downsample_data(data, sample_flag, valid, window, start_indices ): # pragma: no cover """ Downsample data to a new resolution using a window kernel. Parameters ---------- data : numpy.ndarray (float) The high resolution data to downsample of shape (n_frames, n_channels). sample_flag : numpy.ndarray (int) The integer sample flags marking normal data (0) or some other type (nonzero). The low resolution data will be the result of an OR operation of all sample flags within the kernel window. valid : numpy.ndarray (float) The boolean mask validity array for the low-resolution output data of shape (low_resolution_frames,) where `False` indicates that the low resolution output frame is invalid and should not be populated. window : numpy.ndarray (float) The convolution kernel to apply for the downsampling of shape (n_windows,). This should be supplied in a normalized form such that sum(window) = 1 as no subsequent normalization occurs. start_indices : numpy.ndarray (int) For each low-resolution frame, the frame index on the high resolution data indicating where the first element of the `window` should be placed for the convolution. Should be of shape (low_resolution_frames,). Returns ------- low_resolution_data, low_resolution_flags : numpy.ndarray, numpy.ndarray The low resolution data and sample flags. """ n_window = window.size high_frames, n_channels = data.shape low_frames = valid.size new_data = np.empty((low_frames, n_channels), dtype=nb.float64) new_flag = np.empty((low_frames, n_channels), dtype=nb.int64) for channel in range(n_channels): for new_frame in range(low_frames): if not valid[new_frame]: new_data[new_frame, channel] = np.nan new_flag[new_frame, channel] = 0 continue start_index = start_indices[new_frame] d_value = 0.0 d_flag = 0 for window_index in range(n_window): scaling = window[window_index] if scaling == 0: continue frame_index = start_index + window_index value = scaling * data[frame_index, channel] flag = sample_flag[frame_index, channel] d_value += value d_flag |= flag new_data[new_frame, channel] = d_value new_flag[new_frame, channel] = d_flag return new_data, new_flag