Source code for sofia_redux.scan.custom.sofia.info.sofia_info_numba_functions

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import numba as nb
import numpy as np

nb.config.THREADING_LAYER = 'threadsafe'

__all__ = ['get_drift_corrections']


[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def get_drift_corrections(frame_utc, frame_valid, drift_utc_ranges, drift_deltas): # pragma: no cover """ Get the drift corrections by linear interpolation between drifts. The drift correction will be given as: delta_i * (frame_time - start_i) / (end_i - start_i) where the start and end are the UTC start and end times for a given drift where start < frame_time <= end. If the frame_time is greater than the maximum available drift end time, then extrapolation will be used. Parameters ---------- frame_utc : numpy.ndarray (float) An array of shape (n_frames,) containing the frame UTC times. frame_valid : numpy.ndarray (bool) A boolean mask of shape (n_frames,) where `False` skips the correction calculation for that frame. drift_utc_ranges : numpy.ndarray (float) The start and end times for the drifts of shape (n_drifts, 2) where index [0, 0] gives the start time of the first drift, and index [0, 1] gives the end time of the first drift. drift_deltas : numpy.ndarray (float) The drift offset deltas of shape (n_drifts, 2) where index [0, 0] gives the delta of the first drift in the x-direction, and index [0, 1] gives the delta of the first drift in the y-direction. Returns ------- drift_correction, extrapolation_frame : numpy.ndarray (float), int The drift correction is an array of shape (n_frames, 2) where index [0, 0] gives the correction for frame 0 in the x-direction and index [0, 1] gives the correction for frame 0 in the y-direction. The extrapolation frame will be set to a positive number indicating the frame from which extrapolation was required. If no extrapolation occurred, this value is -1. """ extrapolate_index = -1 drift_index = 0 n_frames = frame_utc.size n_drifts = drift_utc_ranges.shape[0] drift_correction = np.empty((n_frames, 2), dtype=nb.float64) if n_drifts == 0: for frame in range(n_frames): drift_correction[frame, 0] = 0.0 drift_correction[frame, 1] = 0.0 return drift_correction, extrapolate_index min_utc = drift_utc_ranges[0, 0] max_utc = drift_utc_ranges[0, 1] utc_span = max_utc - min_utc dx = drift_deltas[0, 0] dy = drift_deltas[0, 1] for frame in range(frame_utc.size): if not frame_valid[frame]: drift_correction[frame, 0] = 0.0 drift_correction[frame, 1] = 0.0 continue utc = frame_utc[frame] if extrapolate_index < 0 and (utc < min_utc or utc > max_utc): drift_index += 1 if drift_index >= n_drifts: extrapolate_index = frame drift_index = -1 min_utc = drift_utc_ranges[drift_index, 0] max_utc = drift_utc_ranges[drift_index, 1] utc_span = max_utc - min_utc dx = drift_deltas[drift_index, 0] dy = drift_deltas[drift_index, 1] dt = (utc - min_utc) / utc_span drift_correction[frame, 0] = dx * dt drift_correction[frame, 1] = dy * dt return drift_correction, extrapolate_index