Source code for sofia_redux.scan.signal.correlated_signal

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from copy import deepcopy

from astropy import units
import numpy as np

from sofia_redux.scan.signal.signal import Signal
from sofia_redux.scan.integration.dependents.dependents import Dependents
from sofia_redux.scan.utilities import numba_functions
from sofia_redux.scan.signal import signal_numba_functions as snf

__all__ = ['CorrelatedSignal']


[docs] class CorrelatedSignal(Signal): def __init__(self, integration, mode): """ Initialize a correlated signal. The correlated signal is used to extract a signal from the integration defined by the supplied correlated mode. The mode defines a set of instrument channels, and how gain should be determined from them. Parameters ---------- integration : sofia_redux.scan.integration.integration.Integration The integration to which this signal belongs. mode : sofia_redux.scan.channels.mode.correlated_mode.CorrelatedMode The correlated modality mode for this signal. """ super().__init__(integration, mode=mode) self.dependents = Dependents(integration, mode.name) self.debug = False self.source_filtering = None # per channel self.generation = 0 self.resolution = mode.get_frame_resolution(integration) self.value = np.zeros(mode.signal_length(integration), dtype=float) self.weight = np.zeros(self.value.size, dtype=float) self.drift_n = self.value.size
[docs] def copy(self): """ Return a copy of the signal. Returns ------- CorrelatedSignal """ new = self.__class__(self.integration, self.mode) for key, value in self.__dict__.items(): if key in self.referenced_attributes: setattr(new, key, value) else: setattr(new, key, deepcopy(value)) return new
[docs] def weight_at(self, frame): """ Return the weight at a given frame index. Parameters ---------- frame : int Returns ------- weight : float """ return self.weight[frame // self.resolution]
[docs] def get_variance(self): """ Return the signal variance. The signal variance is returned as: v = sum(w * x^2) / sum(w) where x are the signal values and w are the signal weights. Returns ------- variance : float """ return snf.get_signal_variance(values=self.value, weights=self.weight)
[docs] def level(self, start_frame=None, end_frame=None, robust=False): """ Remove the mean value from the signal values. The mean signal between a given start and end frame is calculated and subtracted from the signal. This value is returned to the user. Parameters ---------- start_frame : int, optional The starting frame from which to level. The default is the first frame (0). end_frame : int, optional The last frame from to level (non-inclusive). The default is the total number of frames. robust : bool, optional If `True`, subtract the median value rather than the mean. Returns ------- average : float The average signal value that was removed. """ if start_frame is None: start_frame = 0 if end_frame is None: end_frame = self.integration.size value = self.value if isinstance(value, units.Quantity): value = value.value return snf.level( values=value, start_frame=start_frame, end_frame=end_frame, resolution=self.resolution, weights=self.weight, robust=robust)
[docs] def get_median(self): """ Return the median signal value and weight. Returns ------- median, median_weight : float, float """ return numba_functions.smart_median_1d( values=self.value, weights=self.weight, max_dependence=1.0)
[docs] def get_mean(self): """ Return the mean signal value and weight. Returns ------- mean, mean_weight """ return numba_functions.mean(values=self.value, weights=self.weight)
[docs] def differentiate(self): """ Differentiate the signal values and weights in-place. Note that the spacing between sample values is assumed to be in seconds when calculating the gradient. Returns ------- None """ s = self.info.instrument.sampling_interval.decompose().value snf.differentiate_weighted_signal( values=self.value, weights=self.weight, dt=self.resolution * s)
[docs] def integrate(self): """ Integrate the signal values and weights in-place. Note that the spacing between sample values is assumed to be in seconds. The `is_floating` attribute is set to `True` following this operation indicating that the integrated signal has an arbitrary DC offset. Returns ------- None """ s = self.info.instrument.sampling_interval.decompose().value snf.integrate_weighted_signal( values=self.value, weights=self.weight, dt=s * self.resolution) self.is_floating = True
[docs] def get_differential(self): """ Return a differentiated copy of the signal. Returns ------- CorrelatedSignal """ return super().get_differential()
[docs] def get_integral(self): """ Return an integrated copy of the signal. Returns ------- CorrelatedSignal """ return super().get_integral()
[docs] def get_parms(self): """ Return the degrees of freedom for the signal. The degrees of freedom are given as DOF = number_of(weights > 0) * (1 - 1/drift_n) Returns ------- float """ n = np.sum(self.weight > 0) return n * (1.0 - (1.0 / self.drift_n))
[docs] def update(self, robust=False): """ Update the frame data by the signal. The gain deltas are derived and subtracted from the frame data. Dependents are updated and new source filtering are derived. Parameters ---------- robust : bool, optional If `True` use the robust method (median) to derive means. Returns ------- None """ # Get correlated for all frames, even those that are no good, but # only use channels that are valid, and skip over flagged samples channel_group = self.mode.channel_group good_channels = self.mode.get_valid_channels() frames = self.integration.frames resolution = self.mode.get_frame_resolution(self.integration) gains = self.mode.get_gains() self.sync_gains = delta_gains = gains - self.sync_gains resync_gains = (delta_gains != 0).any() & (self.value != 0).any() channel_group.temp = np.zeros(channel_group.size, dtype=float) channel_group.temp_g = gains.copy() channel_group.temp_wg = channel_group.weight * channel_group.temp_g channel_group.temp_wg2 = channel_group.temp_wg * channel_group.temp_g # Remove channels with zero gain/weight from good channels # Pre-calculate the channel dependents good_channels.delete_indices(good_channels.temp_wg2 == 0) # Correct for lowered degrees of freedom due to prior filtering good_channels.temp = (good_channels.temp_wg2 * good_channels.get_filtering(self.integration)) # Clear the dependents in all mode channels self.dependents.clear(channel_group, start=0, end=self.integration.size) # Resync gains if necessary if resync_gains: snf.resync_gains( frame_data=frames.data, signal_values=self.value, resolution=resolution, delta_gains=delta_gains, channel_indices=channel_group.indices, frame_valid=frames.valid) modeling_frames = frames.is_flagged('MODELING_FLAGS') # Calculate the gain increments and weights if robust: dc, dc_weight = self.get_robust_correlated( good_channels, modeling_frames=modeling_frames) else: dc, dc_weight = self.get_ml_correlated( good_channels, modeling_frames=modeling_frames) snf.apply_gain_increments( frame_data=frames.data, # updated by -= channel_gain * dc frame_weight=frames.relative_weight, frame_valid=frames.valid, modeling_frames=modeling_frames, frame_dependents=self.dependents.for_frame, # updated channel_g=channel_group.temp_g, channel_fwg2=channel_group.temp, channel_indices=channel_group.indices, channel_dependents=self.dependents.for_channel, # updated sample_flags=frames.sample_flag, signal_values=self.value, # updated by += dc signal_weights=self.weight, # updated to dc_weight resolution=resolution, increment=dc, increment_weight=dc_weight) # Apply the mode dependents only to the channels that have contributed self.dependents.apply(channels=good_channels, start=0, end=frames.size) # Update the gain values used for signal extraction. self.set_sync_gains(gains) self.generation += 1 # Calculate the point-source_filtering by decorrelation. self.calc_filtering()
[docs] def get_frame_data_signal(self): """ Get the signal as it would appear translated to the frame data. Returns ------- frame_data_signal : numpy.ndarray (float) The signal contribution to the frame data of shape (n_frames, n_channels). """ channel_group = self.mode.channel_group frames = self.integration.frames resolution = self.mode.get_frame_resolution(self.integration) frame_data_signal = snf.get_frame_data_signal( frame_data=frames.data, signal_values=self.value, resolution=resolution, gains=self.mode.get_gains(), channel_indices=channel_group.indices, frame_valid=frames.valid) return frame_data_signal
[docs] def get_ml_correlated(self, channel_group, modeling_frames=None): """ Get the maximum-likelihood correlated gain increment and weight. The gain increments are given as: dC_s = sum_{f | s}{w_f * sum_{c} {w_c g_c d_{f,c}} / dW_s where dW_s is the gain increment weight given by dW_s = sum_{f | s}{w_f} sum_{c} {w_c g_c^2} Here {f | s} indicate the frames in each signal block, w_f is the frame weight, w_c is the channel_weight, g_c is the channel gain, and d_{f, c} is the frame data value for frame f and channel c. Parameters ---------- channel_group : ChannelGroup modeling_frames : numpy.ndarray (bool), optional A boolean mask where `True` indicates that a frame is used for modeling and should be excluded from the calculations. Returns ------- gain_increment, increment_weight : numpy.ndarray, numpy.ndarray The gain increments and associated weights both of shape (n_signal,) or (n_frames // self.resolution). """ frames = self.integration.frames if modeling_frames is None: modeling_frames = frames.is_flagged('MODELING_FLAGS') valid_frames = frames.valid & np.logical_not(modeling_frames) return snf.get_ml_correlated( frame_data=frames.data, frame_weights=frames.relative_weight, frame_valid=valid_frames, channel_indices=channel_group.indices, channel_wg=channel_group.temp_wg, channel_wg2=channel_group.temp_wg2, sample_flags=frames.sample_flag, resolution=self.resolution)
[docs] def get_robust_correlated(self, channel_group, modeling_frames=None, max_dependence=0.25): """ Get the median derived correlated gain increment and weight. Derived in a very similar way to the `get_ml_correlated` method except replacing the mean operation with a median. Please see :func:`snf.get_robust_correlated` for further details. Parameters ---------- channel_group : ChannelGroup modeling_frames : numpy.ndarray (bool), optional A boolean mask where `True` indicates that a frame is used for modeling and should be excluded from the calculations. max_dependence : float, optional Returns ------- gain_increment, increment_weight : numpy.ndarray, numpy.ndarray The gain increments and associated weights both of shape (n_signal,) or (n_frames // self.resolution). """ frames = self.integration.frames if modeling_frames is None: modeling_frames = frames.is_flagged('MODELING_FLAGS') valid_frames = frames.valid & np.logical_not(modeling_frames) return snf.get_robust_correlated( frame_data=frames.data, frame_weights=frames.relative_weight, frame_valid=valid_frames, channel_indices=channel_group.indices, channel_g=channel_group.temp_g, channel_wg2=channel_group.temp_wg2, sample_flags=frames.sample_flag, resolution=self.resolution, max_dependence=max_dependence)
[docs] def calc_filtering(self): """ Update the source filtering of the signal. Where phi is the channel dependents, they are updated by:: phi = mean(phi + phi * channel_overlaps) The signal source filtering is set to 1 - phi. The channel source filtering has the prior correction undone, and then updated as:: csf *= signal source filtering. Returns ------- None """ if self.source_filtering is None: self.source_filtering = np.ones(self.mode.size, dtype=float) channel_group = self.mode.channel_group cf, sf = snf.calculate_filtering( channel_indices=channel_group.indices, channel_dependents=self.dependents.for_channel, overlaps=channel_group.overlaps.toarray(), channel_valid=channel_group.is_unflagged(self.mode.skip_flags), n_parms=self.get_parms(), channel_source_filtering=channel_group.source_filtering, signal_source_filtering=self.source_filtering) channel_group.source_filtering = cf self.source_filtering = sf