Source code for sofia_redux.scan.channels.channel_numba_functions

# Licensed under a 3-clause BSD style license - see LICENSE.rst

import numba as nb
import numpy as np
from sofia_redux.scan.utilities import numba_functions

nb.config.THREADING_LAYER = 'threadsafe'

__all__ = ['flag_weights', 'get_typical_gain_magnitude',
           'get_one_over_f_stat', 'get_source_nefd']


[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def flag_weights(channel_gain, channel_weight, channel_dof, channel_flags, min_weight, max_weight, exclude_flag, dof_flag, sensitivity_flag, default_weight): # pragma: no cover """ Flag channels according to degrees-of-freedom and weight. Channels will be flagged with the DOF (degrees-of-freedom) flag if its degrees-of-freedom is <= 0, or unflagged otherwise. Channels will also be flagged for sensitivity if:: wg2 < m * min_weight wg2 > m * max_weight where wg2 = channel_weight * channel_gain^2, and:: m = exp(inner_80_percent_mean(ln(1 + wg2))) - 1 Channels that fall within the above range are likewise unflagged for sensitivity. Any channels that are fully or partially marked with the exclude flag will not be included in the mean calculation, but may have flags set. The sum of wg2 for all zero flagged channels will be returned. Parameters ---------- channel_gain : numpy.ndarray (float) The gains for each channel of shape (n_channels,). channel_weight : numpy.ndarray (float) The weights for each channel of shape (n_channels,). Weights are equivalent to 1/variance. channel_dof : numpy.ndarray (float) The degrees-of-freedom for each channel of shape (n_channels,). channel_flags : numpy.ndarray (int) The flags for each channel of shape (n_channels,). Flags will be updated in place. min_weight : float The minimum acceptable (1/variance) weight value. max_weight : float The maximum acceptable (1/variance) weight value. exclude_flag : int An integer flag marking channel flag types that should be excluded from the flagging, or overall weight/gain calculation. dof_flag : int The integer marking channels flagged as having insufficient degrees-of- freedom. sensitivity_flag : int The integer marking channels flagged as having unacceptable weight (variance) values based on whether it falls outside the `min_weight` to `max_weight` range. default_weight : float The default weight value for channels. Any channel with a weight value equivalent to the default value will not be flagged or included in any calculations, as channels with this weight value will not have been processed due to other factors. Returns ------- n_points, weight_sum, channel_flags : int, float, numpy.ndarray (int) `n_points` are the number of channels that have valid `dof` values, are not excluded by `exclude_flag`, and have positive non-default weight values. `weight_sum` is sum(weight * gain^2) for all zero-flagged channels (following unflagging), and `channel_flags` are the updated channel flags. """ n_channels = channel_gain.size weights = np.empty(n_channels, dtype=nb.float64) valid_points = 0 for i in range(n_channels): # Unflag/flag Degrees-of-freedom flag if channel_dof[i] > 0: if (channel_flags[i] & dof_flag) != 0: channel_flags[i] ^= dof_flag else: channel_flags[i] |= dof_flag continue if (channel_flags[i] & exclude_flag) != 0: continue gain = channel_gain[i] if gain == 0: continue weight = channel_weight[i] if (weight <= 0) or (weight == default_weight): continue weights[valid_points] = np.log(1.0 + (weight * (gain ** 2))) valid_points += 1 if valid_points == 0: return valid_points, 0.0, channel_flags if valid_points > 10: mean_log_weight = numba_functions.robust_mean( weights[:valid_points], tails=0.1) else: mean_log_weight = np.nanmedian(weights[:valid_points]) mean_weight = np.exp(mean_log_weight) - 1 lower_weight = min_weight * mean_weight upper_weight = max_weight * mean_weight sum_weight = 0.0 for i in range(n_channels): weight = channel_weight[i] wg2 = weight * (channel_gain[i] ** 2) if (wg2 < lower_weight) or (wg2 > upper_weight): channel_flags[i] |= sensitivity_flag elif weight == default_weight: channel_flags[i] |= sensitivity_flag else: if (channel_flags[i] & sensitivity_flag) != 0: channel_flags[i] ^= sensitivity_flag if channel_flags[i] == 0: sum_weight += wg2 return valid_points, sum_weight, channel_flags
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def get_typical_gain_magnitude(gains): # pragma: no cover """ Return the typical gain magnitude for a gain array. The typical gain magnitude (g_mag) for an array of gains (g) is given by:: g_mag = exp(inner_80_percent_mean(ln(1 + |g|))) - 1 Only finite values of g will be included in the mean calculation. If g_mag evaluates to < 0, then a result of 1 will be returned (which should not happen). Note that this mean will rarely ever evaluate to the true mean. Thus, removing this mean and re-evaluating (as is done with many functions for sofia_scan) will probably get you close to zero, but not actually hit the mark. Just be aware of this as it can cause confusion. Parameters ---------- gains : numpy.ndarray (float) Returns ------- gain_magnitude : float """ n = gains.size log_p1_gains = np.empty(n, dtype=nb.float64) valid = 0 for i in range(n): gain = gains[i] if not np.isfinite(gain): continue log_p1_gains[valid] = np.log(1.0 + abs(gain)) valid += 1 log_p1_mean = numba_functions.robust_mean(log_p1_gains[:valid], tails=0.1) if not np.isfinite(log_p1_mean): return 1.0 average_gain = np.exp(log_p1_mean) - 1.0 if average_gain > 0: return average_gain else: # pragma: no cover return 1.0
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def get_one_over_f_stat(weights, one_over_f_stats, flags): # pragma: no cover """ Return the total channel 1/f statistic. The 1/f statistic is given as: f1_stat = sqrt(sum(w * f^2) / sum(w)) where w are the channel weights and f are the channel 1/f statistics. Parameters ---------- weights : numpy.ndarray (float) The channel weights of shape (n_channels,). one_over_f_stats : numpy.ndarray (float) The channel 1/f statistics of shape (n_channels,). flags : numpy.ndarray (int) The channel flags of shape (n_channels,). Only zero-valued flags will be included in the calculations. Returns ------- f1_stat : float """ wd_sum = 0.0 w_sum = 0.0 for i in range(weights.size): if flags[i] != 0: continue w = weights[i] if w == 0: continue f = one_over_f_stats[i] if np.isnan(f): continue wd_sum += w * f * f w_sum += w if w_sum <= 0: return np.nan return np.sqrt(wd_sum / w_sum)
[docs] @nb.njit(cache=True, nogil=False, parallel=False, fastmath=False) def get_source_nefd(filtered_source_gains, weight, variance, flags, integration_time, integration_gain): # pragma: no cover """ Return the source Noise-Equivalent-Flux-Density (NEFD). The source NEFD is given as: nefd = sqrt(n * t / sum(g^2 / v)) / abs(integration_gain) where n are the number of mapping channels (unflagged and positive weights), t is the integration time, g is the source gain, and v is the channel variance. Parameters ---------- filtered_source_gains : numpy.ndarray (float) The filtered source gains of shape (n_channels,) which is the product of channel coupling, gain, and source filtering. weight : numpy.ndarray (float) The channel weights of shape (n_channels,). variance : numpy.ndarray (float) The channel variances of shape (n_channels,). flags : numpy.ndarray (int) The channel flags of shape (n_channels,) where non-zero values are not included in the calculation. integration_time : float The integration time in arbitrary units (numba). Remember to reconvert after. integration_gain : float The integration gain factor. Returns ------- nefd : float The source NEFD """ mapping_channels = 0 sum_pw = 0.0 n_channels = filtered_source_gains.size abs_gain = np.abs(integration_gain) for i in range(n_channels): if flags[i] != 0: continue if weight[i] <= 0: continue mapping_channels += 1 g = filtered_source_gains[i] if g == 0: continue # Variance should be non-zero since weight already checked sum_pw += g * g / variance[i] if sum_pw == 0: return np.inf else: return np.sqrt(integration_time * mapping_channels / sum_pw) / abs_gain