Source code for sofia_redux.scan.source_models.source_numba_functions

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import numba as nb
import numpy as np

from sofia_redux.toolkit.splines import spline_utils

nb.config.THREADING_LAYER = 'threadsafe'

__all__ = ['calculate_coupling_increment', 'get_sample_points',
           'blank_sample_values', 'flag_out_of_range_coupling',
           'sync_map_samples', 'get_delta_sync_parms', 'flag_outside',
           'validate_pixel_indices', 'add_skydip_frames',
           'get_source_signal']


[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def calculate_coupling_increment(map_indices, base_values, map_values, map_noise, sync_gains, source_gains, frame_data, frame_weight, frame_gains, frame_valid, sample_flags, channel_indices, min_s2n, max_s2n, exclude_flag ): # pragma: no cover """ Calculate and return the coupling increment factor. The coupling increment factor may be used to increment current coupling values and weights for each channel by:: increment = sum_{frames}(fw * residual * expected) / sum_{frames}(fw * expected^2) where fw are the frame weights, d are the frame data and:: expected = frame_gain * channel_source_gain * map_value prior = frame_gain * channel_sync_gain * base_value residual = d + prior - expected The channel sync gains and base values are the channel source gains and map values from the previous iteration. Parameters ---------- map_indices : numpy.ndarray (int) The indices of each frame/channel sample on the source map. Should be of shape (n_dimensions, n_frames, all_channels) with dimensions in (x, y) FITS order. base_values : numpy.ndarray (float) An image of arbitrary grid shape in n_dimensions. Contains the base map values (priors). map_values : numpy.ndarray (float) The current map image of arbitrary grid shape in n_dimensions. map_noise : numpy.ndarray (float) The current map noise values of arbitrary grid shape in n_dimensions. sync_gains : numpy.ndarray (float) The previous channel source gains (sync gains) of shape (all_channels,). source_gains : numpy.ndarray (float) The current channel source gains of shape (all_channels,). frame_data : numpy.ndarray (float) The integration frame data of shape (n_frames, all_channels,). frame_weight : numpy.ndarray (float) The relative frame weights of shape (n_frames,). frame_gains : numpy.ndarray (float) The frame gains of shape (n_frames,). frame_valid : numpy.ndarray (bool) A boolean mask of shape (n_frames,) where `False` excludes a frame from processing. sample_flags : numpy.ndarray (int) The integration sample flags of shape (n_frames, all_channels) where any sample flagged with `exclude_flag` will be excluded from processing. channel_indices : numpy.ndarray (int) The channel indices for which to calculate coupling increments of shape (n_channels,). min_s2n : float The minimum signal-to-noise ratio on the map for which to calculate coupling increments. max_s2n : float The maximum signal-to-noise ratio on the map for which to calculate coupling increments. exclude_flag : int An integer flag used to exclude samples from processing. Returns ------- coupling_increment : numpy.ndarray (float) The coupling increment factors of shape (all_channels,). """ # Frame gains are integration.gain * frame.source_gain n_frames, n_channels = frame_data.shape coupling_increment = np.zeros(n_channels, dtype=nb.float64) coupling_weight = np.zeros(n_channels, dtype=nb.float64) for frame in range(n_frames): if not frame_valid[frame]: continue weight = frame_weight[frame] if weight == 0: continue frame_gain = frame_gains[frame] if frame_gain == 0: continue # Remove source from all but the blind channels. for channel in channel_indices: if (sample_flags[frame, channel] & exclude_flag) != 0: continue x, y = map_indices[:, frame, channel] if x < 0 or y < 0: continue noise = map_noise[y, x] if noise <= 0: continue map_value = map_values[y, x] s2n = abs(map_value / noise) if s2n < min_s2n or s2n > max_s2n: continue base_value = base_values[y, x] prior = frame_gain * sync_gains[channel] * base_value expected = frame_gain * source_gains[channel] * map_value residual = frame_data[frame, channel] + prior - expected coupling_increment[channel] += weight * residual * expected coupling_weight[channel] += weight * expected * expected for channel in channel_indices: weight = coupling_weight[channel] if weight > 0: coupling_increment[channel] /= weight else: coupling_increment[channel] = 0.0 return coupling_increment
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def get_sample_points(frame_data, frame_gains, frame_weights, source_gains, channel_variance, valid_frames, map_indices, channel_indices, sample_flags, exclude_sample_flag ): # pragma: no cover """ Return the cross-product of frame and channel gains. Parameters ---------- frame_data : numpy.ndarray (float) The frame data of shape (n_frames, all_channels). frame_gains : numpy.ndarray (float) The frame gains of shape (n_frames,). frame_weights : numpy.ndarray (float) The frame relative weights of shape (n_frames,). source_gains : numpy.ndarray (float) The channel (pixel) gains of shape (all_channels,) channel_variance : numpy.ndarray (float) The channel (pixel) variances of shape (n_channels,) valid_frames : numpy.ndarray (bool) A boolean mask of shape (n_frames,) where `False` indicates a frame that should be excluded from all calculations. map_indices : numpy.ndarray (int) The map indices of shape (2, n_frames, all_channels) containing the (ix, iy) map indices. For pixel maps, all pixels have zero position, and therefore, map_indices depend solely of frame position. In these cases, the shape should be (2, n_frames, 1) to indicate that only a single channel position is relevant. channel_indices : numpy.ndarray (int) The channel indices of shape (n_channels,) mapping n_channels onto all_channels. sample_flags : numpy.ndarray (int) The sample flag array of shape (n_frames, all_channels). Any flag containing `exclude_sample_flag` will be ignored. exclude_sample_flag : int Indicates which sample flags should be ignored. Returns ------- n, data, gains, weights, indices : 5-tuple (int, array+) The total number of mapping frames and the cross product of frame_gains and source_gains as an array of shape (n_frames, n_channels). Any invalid frames/sample flags will result in a NaN gain value for the sample in question. """ n_frames = frame_gains.size n_channels = channel_indices.size n_dimensions = map_indices.shape[0] mapping_frames = 0 sample_data = np.empty((n_frames, n_channels), dtype=nb.float64) sample_gains = np.empty((n_frames, n_channels), dtype=nb.float64) sample_weights = np.empty((n_frames, n_channels), dtype=nb.float64) sample_indices = np.empty((n_dimensions, n_frames, n_channels), dtype=nb.int64) is_pixel_map = map_indices.shape[2] == 1 and n_channels > 1 for frame in range(n_frames): frame_gain = frame_gains[frame] frame_weight = frame_weights[frame] if (not valid_frames[frame] or frame_gain == 0 or frame_weight == 0 or np.isnan(frame_gain)): for i in range(n_channels): blank_sample_values(frame, i, n_dimensions, sample_data, sample_gains, sample_weights, sample_indices) continue mapping_frames += 1 for i, channel in enumerate(channel_indices): channel_gain = source_gains[channel] var = channel_variance[i] sample_flag = sample_flags[frame, channel] if (channel_gain == 0 or var == 0 or (sample_flag & exclude_sample_flag != 0)): blank_sample_values(frame, i, n_dimensions, sample_data, sample_gains, sample_weights, sample_indices) continue sample_gains[frame, i] = frame_gain * channel_gain sample_data[frame, i] = frame_data[frame, channel] sample_weights[frame, i] = frame_weight / var if is_pixel_map: channel_map_index = 0 else: channel_map_index = channel for dimension in range(n_dimensions): sample_indices[dimension, frame, i] = ( map_indices[dimension, frame, channel_map_index]) return (mapping_frames, sample_data, sample_gains, sample_weights, sample_indices)
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def blank_sample_values(frame, channel_index, n_dimensions, sample_data, sample_gains, sample_weights, sample_indices ): # pragma: no cover """ Set bad frame/channel indices to blank values. Sets frame data and sample gains to NaN values at the frame/channel_index. weights are set to zero and sample indices are set to -1. Parameters ---------- frame : int The bad frame index. channel_index : int The bad channel index. n_dimensions : int The number of dimensions in the data. sample_data : numpy.ndarray (float) The sample data of shape (n_frames, n_channels). sample_gains : numpy.ndarray (float) The sample gains of shape (n_frames, n_channels). sample_weights : numpy.ndarray (float) The sample weights of shape (n_frames, n_channels). sample_indices : numpy.ndarray (int) The sample map indices of shape (n_dimensions, n_frames, n_channels). Returns ------- None """ sample_gains[frame, channel_index] = np.nan sample_data[frame, channel_index] = np.nan sample_weights[frame, channel_index] = 0.0 for dimension in range(n_dimensions): sample_indices[dimension, frame, channel_index] = -1
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=True) def flag_out_of_range_coupling(channel_indices, coupling_values, min_coupling, max_coupling, flags, blind_flag ): # pragma: no cover """ Flags channels with an out-of-range coupling value as "BLIND" Note that only previously unflagged channels will be flagged. Parameters ---------- channel_indices : numpy.ndarray (int) The channel indices to check of shape (n_channels,). coupling_values : numpy.ndarray (float) The coupling values of shape (all_channels,). min_coupling : float The minimum allowable coupling value. max_coupling : float The maximum allowable coupling value. flags : numpy.ndarray (int) The channel flags of shape (all_channels,). Will be updated in-place. blind_flag : int The integer flag marking a "BLIND" channel. Returns ------- None """ for channel in channel_indices: if flags[channel] != 0: continue coupling_value = coupling_values[channel] if (coupling_value < min_coupling) or (coupling_value > max_coupling): flags[channel] |= blind_flag
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def sync_map_samples(frame_data, frame_valid, frame_gains, channel_indices, map_values, map_valid, map_masked, map_indices, base_values, base_valid, source_gains, sync_gains, sample_flags, sample_blank_flag): # pragma: no cover """ Remove map source gains from frame data and flag samples. For a given sample at frame i and channel j, frame data d_{i,j} will be decremented by dg where:: dg = fg * ( (gain(source) * map[index]) - (gain(sync) * base[index]) ) Here, fg is the frame gain and index is the index on the map of sample (i,j). Any masked map value will result in matching samples being flagged. Parameters ---------- frame_data : numpy.ndarray (float) The frame data of shape (n_frames, all_channels). Data will be updated in-place. frame_valid : numpy.ndarray (bool) A boolean mask of shape (n_frames,) where `False` excludes a frame from all processing. frame_gains : numpy.ndarray (float) An array of frame gains of shape (n_frames,). channel_indices : numpy.ndarray (int) The channel indices of shape (n_channels,) for which to sync map gains. map_values : numpy.ndarray (float) The current map supplied as a regular grid of arbitrary shape and dimensions (image_shape,). map_valid : numpy.ndarray (bool) A boolean mask of shape (image_shape,) where `False` excludes a map element from synchronization. map_masked : numpy.ndarray (bool) A boolean mask of shape (image_shape,) where `True` will result in any frame/channel sample being flagged with the `sample_blank_flag` flag. Similarly, `False` will unflag samples. map_indices : numpy.ndarray (int) The sample map indices of shape (n_dimensions, n_frames, all_channels) where dimensions are ordered in (x,y) FITS format. These contain map (pixel) indices for each sample. For pixel maps, all pixels will have zero position values, so the array should be of shape (n_dimensions, n_frames, 1) in this case. base_values : numpy.ndarray (float) The base map values (priors) of shape (image_shape,). base_valid : numpy.ndarray (bool) A boolean mask of shape (image_shape,) where `False` indicates that the corresponding base map value is invalid and will be set to zero during processing. source_gains : numpy.ndarray (float) The channel source gains of shape (all_channels,). sync_gains : numpy.ndarray (float) The prior channel source gains of shape (all_channels,). sample_flags : numpy.ndarray (int) The sample flags of shape (n_frames, all_channels,). Will be updated in-place. sample_blank_flag : int The integer flag with which to flag samples that contain a masked map value. Returns ------- None """ map_shape = np.asarray(map_values.shape) _, _, row_col_steps = spline_utils.flat_index_mapping(map_shape) fits_shape = map_shape[::-1] # Convert from (row, col) to (col, row) for FITS type data indexing steps = row_col_steps[::-1] n_dimensions = steps.size flat_map_values = map_values.flat flat_map_valid = map_valid.flat flat_map_masked = map_masked.flat flat_base_values = base_values.flat flat_base_valid = base_valid.flat unflag_blank = ~sample_blank_flag is_pixel_map = map_indices.shape[2] == 1 and channel_indices.size > 1 n_frames = frame_data.shape[0] for frame in range(n_frames): if not frame_valid[frame]: continue frame_gain = frame_gains[frame] for channel in channel_indices: if is_pixel_map: map_index = map_indices[:, frame, 0] else: map_index = map_indices[:, frame, channel] flat_index = 0 for dimension in range(n_dimensions): index = map_index[dimension] if index < 0 or index >= fits_shape[dimension]: flat_index = -1 break flat_index += steps[dimension] * index if flat_index < 0: continue # Invalid index if not flat_map_valid[flat_index]: continue # Remove from frame data # Do not check for flags to get a true difference image. if frame_gain > 0: if not flat_base_valid[flat_index]: base_value = 0.0 else: base_value = flat_base_values[flat_index] map_value = flat_map_values[flat_index] decrement = source_gains[channel] * map_value decrement -= sync_gains[channel] * base_value decrement *= frame_gain frame_data[frame, channel] -= decrement # Blank samples here. if flat_map_masked[flat_index]: sample_flags[frame, channel] |= sample_blank_flag else: sample_flags[frame, channel] &= unflag_blank
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def get_source_signal(frame_data, frame_valid, frame_gains, frame_weights, channel_flags, channel_variance, map_values, map_valid, map_indices, sync_gains ): # pragma: no cover """ Remove map source gains from frame data and flag samples. For a given sample at frame i and channel j, frame data d_{i,j} will be decremented by dg where:: dg = fg * ( (gain(source) * map[index]) - (gain(sync) * base[index]) ) Here, fg is the frame gain and index is the index on the map of sample (i,j). Any masked map value will result in matching samples being flagged. Parameters ---------- frame_data : numpy.ndarray (float) The frame data of shape (n_frames, all_channels). Data will be updated in-place. frame_valid : numpy.ndarray (bool) A boolean mask of shape (n_frames,) where `False` excludes a frame from all processing. frame_gains : numpy.ndarray (float) An array of frame gains of shape (n_frames,). frame_weights : numpy.ndarray (float) The frame weights of shape (n_frames,). channel_flags : numpy.ndarray (int) The channel flags of shape (n_channels,). Any nonzero value will channel_variance : numpy.ndarray (int) The channel variance of shape (n_channels,). map_values : numpy.ndarray (float) The current map supplied as a regular grid of arbitrary shape and dimensions (image_shape,). map_valid : numpy.ndarray (bool) A boolean mask of shape (image_shape,) where `False` excludes a map element from synchronization. map_indices : numpy.ndarray (int) The sample map indices of shape (n_dimensions, n_frames, all_channels) where dimensions are ordered in (x,y) FITS format. These contain map (pixel) indices for each sample. For pixel maps, all pixels will have zero position values, so the array should be of shape (n_dimensions, n_frames, 1) in this case. sync_gains : numpy.ndarray (float) The prior channel source gains of shape (all_channels,). Returns ------- source_signal, source_error : numpy.ndarray, numpy.ndarray """ map_shape = np.asarray(map_values.shape) _, _, row_col_steps = spline_utils.flat_index_mapping(map_shape) fits_shape = map_shape[::-1] # Convert from (row, col) to (col, row) for FITS type data indexing steps = row_col_steps[::-1] n_dimensions = steps.size flat_map_values = map_values.flat flat_map_valid = map_valid.flat n_frames = frame_data.shape[0] n_channels = channel_variance.size is_pixel_map = map_indices.shape[2] == 1 and n_channels > 1 source_signal = np.full(frame_data.shape, np.nan) source_error = np.zeros(frame_data.shape) for frame in range(n_frames): if not frame_valid[frame]: continue frame_gain = frame_gains[frame] if frame_gain <= 0: continue frame_weight = frame_weights[frame] for channel in range(n_channels): if channel_flags[channel] != 0: continue if is_pixel_map: map_index = map_indices[:, frame, 0] else: map_index = map_indices[:, frame, channel] flat_index = 0 for dimension in range(n_dimensions): index = map_index[dimension] if index < 0 or index >= fits_shape[dimension]: flat_index = -1 break flat_index += steps[dimension] * index if flat_index < 0: continue # Invalid index if not flat_map_valid[flat_index]: continue sample_gain = sync_gains[channel] * frame_gain var = channel_variance[channel] if var > 0: wt = sample_gain * sample_gain * frame_weight / var if wt > 0: source_error[frame, channel] = np.sqrt(1.0 / wt) map_value = flat_map_values[flat_index] increment = map_value * sample_gain source_signal[frame, channel] = increment return source_signal, source_error
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def get_delta_sync_parms(channel_source_gains, channel_indices, channel_flags, channel_variance, frame_weight, frame_source_gains, frame_valid, frame_flags, source_flag, n_points ): # pragma: no cover """ Calculate frame/channel dependent deltas following source synchronization. Source synchronization is the process of removing the source model from the integration frame data. Following this, dependents (scaled degrees-of- freedom) need to be updated to account for the operation. The dependent increments are calculated as follows:: channel_dp = n_p * channel_gain^2 / variance frame_dp = n_f * frame_weight * |frame_gain| n_p = n_points / sum_{channels}(channel_gain^2/variance) n_f = n_points / sum_{frames}(frame_weight * frame_gain) where n_points effective number of pixels in the source model (possibly accounting for smoothing). Parameters ---------- channel_source_gains : numpy.ndarray (float) The channel source gains of shape (all_channels,). channel_indices : numpy.ndarray (int) The indices of the channels to process of shape (n_channels,). channel_flags : numpy.ndarray (int) The integer flags marking various status types of shape (n_channels,). Only zero-values flags will be included in the calculation. channel_variance : numpy.ndarray (float) The variance associated with each channel of shape (n_channels,). frame_weight : numpy.ndarray (float) The relative weight of each frame of shape (n_frames,). frame_source_gains : numpy.ndarray (float) The frame source gains of shape (n_frames,). frame_valid : numpy.ndarray (bool) A boolean mask where `False` excludes a given frame from inclusion in the calculation and results in that same frame receiving a zero-valued dependent increment. Should be of shape (n_frames,). frame_flags : numpy.ndarray (int) The integer flags marking various status types for each frame of shape (n_frames,). Any frames marked with `source_flag` will be excluded from all calculations and receive a zero-valued dependent increment. source_flag : int The integer used to mark a frame with the SOURCE status. n_points : int or float The effective number of points in the source model. Returns ------- frame_dp, channel_dp : numpy.ndarray, numpy.ndarray The frame and channel dependent deltas of shape (n_frames,) and (n_channels,), both of float type. """ n_channels = channel_indices.size n_frames = frame_valid.size sum_pw = 0.0 for i, channel in enumerate(channel_indices): if channel_flags[i] != 0: continue gain = channel_source_gains[channel] sum_pw += gain * gain / channel_variance[i] sum_fw = 0.0 for frame in range(n_frames): if not frame_valid[frame]: continue if (frame_flags[frame] & source_flag) != 0: continue sum_fw += frame_weight[frame] * frame_source_gains[frame] n_p = n_points / sum_pw if sum_pw > 0 else 0.0 n_f = n_points / sum_fw if sum_fw > 0 else 0.0 frame_dp = np.empty(n_frames, dtype=nb.float64) channel_dp = np.empty(n_channels, dtype=nb.float64) for i, channel in enumerate(channel_indices): if channel_flags[i] != 0: channel_dp[i] = 0.0 continue gain = channel_source_gains[channel] channel_dp[i] = n_p * gain * gain / channel_variance[i] for frame in range(n_frames): if not frame_valid[frame] or (frame_flags[frame] & source_flag) != 0: frame_dp[frame] = 0.0 continue frame_dp[frame] = (n_f * frame_weight[frame] * np.abs(frame_source_gains[frame])) return frame_dp, channel_dp
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def flag_outside(sample_coordinates, valid_frames, channel_indices, sample_flags, skip_flag, map_range): # pragma: no cover """ Flag samples outside of the allowed mapping range. Frames with no channels inside the allowable range will be flagged as invalid. Parameters ---------- sample_coordinates : numpy.ndarray (float) An array of shape (2, n_frames, n_channels) containing the (x, y) coordinates of each sample measurement. valid_frames : numpy.ndarray (bool) A boolean mask of shape (n_frames,). Frames flagged as `False` will not be included in any calculations. channel_indices : numpy.ndarray (int) The indices mapping n_channels onto all_channels. sample_flags : numpy.ndarray (int) The sample flags to update. An array of shape (n_frames, all_channels). skip_flag : int The integer flag to mark a sample as SKIP (outside mapping range). map_range : numpy.ndarray (float) An array of shape (2, 2) containing the [[min(x), max(x)], [min(y), max(y)]] allowable map range. Returns ------- None """ min_x, max_x = map_range[0] min_y, max_y = map_range[1] for frame_index in range(sample_coordinates.shape[1]): valid = False if not valid_frames[frame_index]: continue for i, channel_index in enumerate(channel_indices): x = sample_coordinates[0, frame_index, i] y = sample_coordinates[1, frame_index, i] if min_x <= x <= max_x and min_y <= y <= max_y: valid = True continue sample_flags[frame_index, channel_index] |= skip_flag valid_frames[frame_index] = valid
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def flag_z_outside(z, min_z, max_z, valid_frames, channel_indices, sample_flags, skip_flag): # pragma: no cover """ Flag samples outside of the allowed mapping range. Frames with no channels inside the allowable range will be flagged as invalid. Parameters ---------- z : numpy.ndarray (float) An array of shape (n_channels,) containing the z coordinates of each channel. min_z : float The minimum allowable z-value. max_z : float The maximum allowable z-value. valid_frames : numpy.ndarray (bool) A boolean mask of shape (n_frames,). Frames flagged as `False` will not be included in any calculations. channel_indices : numpy.ndarray (int) The indices mapping n_channels onto all_channels. sample_flags : numpy.ndarray (int) The sample flags to update. An array of shape (n_frames, all_channels). skip_flag : int The integer flag to mark a sample as SKIP (outside mapping range). Returns ------- None """ n = sample_flags.shape[0] for i, channel_index in enumerate(channel_indices): if z[i] < min_z or z[i] > max_z: for frame_index in range(n): if not valid_frames[frame_index]: continue sample_flags[frame_index, channel_index] |= skip_flag
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=True) def validate_pixel_indices(indices, x_size, y_size, valid_frame=None ): # pragma: no cover """ Set pixel indices outside of the map range to (0, 0). Parameters ---------- indices : numpy.ndarray (int) Pixel indices of shape (2, n_frames, n_channels) containing the (x, y) pixel indices. Any invalid sample (frame, channel) will be updated to (-1, -1) in-place. x_size : int The size of the map in x. y_size : int The size of the map in y. valid_frame : numpy.ndarray (bool), optional An optional flag mask where `False` excludes the given frame from the bad sample count (return value). Returns ------- bad_samples : int The number of pixels that fall outside the range of the map extent. """ bad_samples = 0 n_dimensions, n_frames, n_channels = indices.shape if valid_frame is None: check_valid = False valid = np.empty(0, dtype=nb.b1) else: check_valid = True valid = valid_frame for frame_index in range(n_frames): if check_valid: frame_is_valid = valid[frame_index] else: frame_is_valid = True for i in range(n_channels): # the channel indices px = indices[0, frame_index, i] py = indices[1, frame_index, i] if not (0 <= px < x_size): indices[:, frame_index, i] = -1 if frame_is_valid: bad_samples += 1 continue if not (0 <= py < y_size): indices[:, frame_index, i] = -1 if frame_is_valid: bad_samples += 1 continue return bad_samples
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=True) def validate_voxel_indices(indices, x_size, y_size, z_size, valid_frame=None ): # pragma: no cover """ Set pixel indices outside of the map range to (0, 0). Parameters ---------- indices : numpy.ndarray (int) Pixel indices of shape (3, n_frames, n_channels) containing the (x, y, z) pixel indices. Any invalid sample (frame, channel) will be updated to (-1, -1, -1) in-place. x_size : int The size of the map in x. y_size : int The size of the map in y. z_size : int The size of the map in z. valid_frame : numpy.ndarray (bool), optional An optional flag mask where `False` excludes the given frame from the bad sample count (return value). Returns ------- bad_samples : int The number of pixels that fall outside the range of the map extent. """ bad_samples = 0 n_dimensions, n_frames, n_channels = indices.shape if valid_frame is None: check_valid = False valid = np.empty(0, dtype=nb.b1) else: check_valid = True valid = valid_frame for frame_index in range(n_frames): if check_valid: frame_is_valid = valid[frame_index] else: frame_is_valid = True for i in range(n_channels): # the channel indices px = indices[0, frame_index, i] py = indices[1, frame_index, i] pz = indices[2, frame_index, i] if not (0 <= px < x_size): indices[:, frame_index, i] = -1 if frame_is_valid: bad_samples += 1 continue if not (0 <= py < y_size): indices[:, frame_index, i] = -1 if frame_is_valid: bad_samples += 1 continue if not (0 <= pz < z_size): indices[:, frame_index, i] = -1 if frame_is_valid: bad_samples += 1 continue return bad_samples
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def add_skydip_frames(data, weight, signal_values, signal_weights, frame_weights, frame_valid, data_bins ): # pragma: no cover """ Add signal values to the sky dip model data. The skydip model data (`data`) and weights (`weight`) are added to using increments of:: data += frame_weights * signal_weights * signal_values weight += frame_weights * signal_weights Parameters ---------- data : numpy.ndarray (float) The skydip model data of shape (n_data,). weight : numpy.ndarray (float) The skydip model data weight of shape (n_data,). signal_values : numpy.ndarray (float) The signal values of shape (n_frames,) signal_weights : numpy.ndarray (float) The signal value weights of shape (n_frames,) frame_weights : numpy.ndarray (float) The frame relative weights of shape (n_frames,) frame_valid : numpy.ndarray (bool) A mask of shape (n_frames,) where `False` exclude a given frame from processing. data_bins : numpy.ndarray (int) A mapping array of shape (n_frames,) in which data_bins[i] maps frame i onto the `data` and `weight` array indices. Returns ------- None """ n_frames = frame_valid.size data_size = data.size for frame in range(n_frames): if not frame_valid[frame]: continue data_bin = data_bins[frame] if data_bin < 0 or data_bin >= data_size: continue w = frame_weights[frame] * signal_weights[frame] if w == 0: continue data[data_bin] += w * signal_values[frame] weight[data_bin] += w