[docs]
class Signal(ABC):
referenced_attributes = ['mode', 'integration']
def __init__(self, integration, mode=None, values=None, is_floating=False):
"""
Initialize a Signal object.
The Signal object is used to extract a gain signal from an integration.
Parameters
----------
integration : sofia_redux.scan.integration.integration.Integration
The integration to which the signal belongs.
mode : sofia_redux.scan.channels.mode.mode.Mode, optional
The channel mode for which to create the signal.
values : numpy.ndarray (float), optional
Optional signal values to apply.
is_floating : bool, optional
If `True`, indicates that the signal values have some arbitrary DC
offset. This indicates that it would be advisable to "level"
(subtract this offset) from the signal values before any type of
application.
"""
self.mode = mode
self.integration = integration
self.value = values
self.weight = None
self.drifts = None
self.sync_gains = None
self.resolution = None
self.drift_n = None
self.is_floating = is_floating
if mode is not None:
self.sync_gains = np.zeros(mode.size, dtype=float)
integration.add_signal(self)
if values is not None:
values = np.atleast_1d(values)
self.resolution = utils.roundup_ratio(integration.size,
values.size)
self.value = values
self.drift_n = values.size
def __str__(self):
"""
Return a string representation of the signal.
Returns
-------
str
"""
int_id = self.integration.get_full_id(separator='|')
mode_name = self.mode.name
return f"Signal {int_id}.{mode_name}"
[docs]
def copy(self):
"""
Return a copy of the signal.
Returns
-------
Signal
"""
new = self.__class__(self.integration, self.mode, values=self.value,
is_floating=self.is_floating)
for key, value in self.__dict__.items():
if key in self.referenced_attributes:
setattr(new, key, value)
else:
setattr(new, key, deepcopy(value))
return new
@property
def size(self):
"""
Return the length of the signal.
Returns
-------
n_signal : int
"""
if self.value is None:
return 0
return self.value.size
@property
def info(self):
"""
Return the info object specific to the signal integration.
Returns
-------
Info
"""
return self.integration.info
@property
def configuration(self):
"""
Return the configuration applicable to the signal.
Returns
-------
Configuration
"""
if self.info is None:
return None
return self.info.configuration
[docs]
def get_resolution(self):
"""
Return the signal resolution.
The signal resolution is basically the number of frames for which a
single signal value may be applicable.
Returns
-------
resolution : int
"""
return self.resolution
[docs]
def value_at(self, frame):
"""
Return the signal value for a given integration frame index.
Parameters
----------
frame : int or numpy.ndarray (int)
Returns
-------
signal_value : float
"""
return self.value[frame // self.resolution]
[docs]
def weight_at(self, frame):
"""
Return the signal weight for a given integration frame index.
Note that the standard signal object does not have an
associated weight.
Parameters
----------
frame : int or numpy.ndarray (int)
Returns
-------
signal_weight : float
"""
return 1.0
[docs]
def scale(self, factor):
"""
Scale the signal, drifts, and sync_gains by a given factor.
Signal values and drifts are multiplied by the given factor.
Sync gains (previous gain values) are divided by the given factor.
Parameters
----------
factor : float
Returns
-------
None
"""
self.value *= factor
if self.drifts is not None:
self.drifts *= factor
self.sync_gains /= factor
[docs]
def add(self, value):
"""
Add a DC offset to all signal and drift values.
Parameters
----------
value : float
The value to add.
Returns
-------
None
"""
self.value += value
if self.drifts is not None:
self.drifts += value
[docs]
def subtract(self, value):
"""
Subtract a DC offset from all signal and drift values.
Parameters
----------
value : float
Returns
-------
None
"""
self.add(value * -1)
[docs]
def add_drifts(self):
"""
Add the drifts onto the signal value.
All drifts are erased following this operation.
Returns
-------
None
"""
if self.drifts is None:
return
snf.add_drifts(
signal_values=self.value, drifts=self.drifts,
drift_length=self.drift_n)
self.drifts = None
[docs]
def get_rms(self):
"""
Get the signal RMS.
Returns
-------
rms : float
"""
return np.sqrt(self.get_variance())
[docs]
def get_variance(self):
"""
Get the signal variance.
Returns
-------
variance : float
"""
return snf.get_signal_variance(self.value)
[docs]
def remove_drifts(self, n_frames=None, is_reconstructable=True,
robust=False):
"""
Remove drifts from the signal.
A drift is defined as the average signal for a given block of frames
(the length of which is given by `n_frames`. For each drift, the
average signal value is calculated and subtracted from all signal
values within that drift block. If the drifts are marked as
"reconstructable", they average signal values are stored in the
`drifts` attribute.
Parameters
----------
n_frames : int, optional
The number of frames in each drift. The default is the number
of frames in the signal's integration filter time scale.
is_reconstructable : bool, optional
If `True`, save the drift values for later use.
robust : bool, optional
If `True`, the robust (median) method will be used to determined
the average signal level for each drift.
Returns
-------
None
"""
if n_frames is None:
n_frames = self.integration.frames_for()
n = utils.roundup_ratio(n_frames, self.resolution)
if self.drifts is None or n != self.drift_n:
# Add and destroy the current drifts before calculating new ones.
self.add_drifts()
self.drift_n = n
drift_size = utils.roundup_ratio(self.value.size, n)
drifts = np.zeros(drift_size, dtype=float)
if is_reconstructable:
self.drifts = drifts
elif is_reconstructable:
drifts = self.drifts
else:
drift_size = utils.roundup_ratio(self.value.size, n)
drifts = np.zeros(drift_size, dtype=float)
# Remove the mean signal and store as drifts in n_frames chunks
snf.remove_drifts(
signal_values=self.value,
drifts=drifts,
n_frames=n_frames,
resolution=self.resolution,
integration_size=self.integration.size,
signal_weights=self.weight,
robust=robust)
[docs]
def get_mean(self):
"""
Return the mean signal value and weight.
Returns
-------
mean_value, mean_weight
"""
value, _ = numba_functions.mean(values=self.value, weights=self.weight)
weight = np.inf
return value, weight
[docs]
def differentiate(self):
"""
Differentiate the signal values in-place.
Note that the spacing between sample values is assumed to be in
seconds when calculating the gradient. The `is_floating` attribute
is set to `False` following this operation indicating that the
differentiated signal has no DC offset.
Returns
-------
None
"""
s = self.info.instrument.sampling_interval.decompose().value
snf.differentiate_signal(values=self.value, dt=s * self.resolution)
self.is_floating = False
[docs]
def integrate(self):
"""
Integrate the signal values in-place using the trapezoidal rule.
Note that the spacing between sample values is assumed to be in
seconds. The `is_floating` attribute is set to `True` following this
operation indicating that the integrated signal has an arbitrary
DC offset.
Returns
-------
None
"""
s = self.info.instrument.sampling_interval.decompose().value
snf.integrate_signal(values=self.value, dt=s * self.resolution)
self.is_floating = True
[docs]
def get_differential(self):
"""
Return a differentiated copy of the signal.
Returns
-------
Signal
"""
signal = self.copy()
signal.differentiate()
return signal
[docs]
def get_integral(self):
"""
Return an integrated copy of the signal.
Returns
-------
Signal
"""
signal = self.copy()
signal.integrate()
return signal
[docs]
def level(self, start_frame=None, end_frame=None, robust=False):
"""
Remove the mean value from the signal values.
The mean signal between a given start and end frame is calculated and
subtracted from the signal. This value is returned to the user.
Parameters
----------
start_frame : int, optional
The starting frame from which to level. The default is the first
frame (0).
end_frame : int, optional
The last frame from to level (non-inclusive). The default is the
total number of frames.
robust : bool, optional
If `True`, subtract the median value rather than the mean.
Returns
-------
average : float
The average signal value that was removed.
"""
if start_frame is None:
start_frame = 0
if end_frame is None:
end_frame = self.integration.size
value = self.value
if isinstance(value, units.Quantity):
value = value.value
return snf.level(
values=value,
start_frame=start_frame,
end_frame=end_frame,
resolution=self.resolution,
robust=robust)
[docs]
def smooth(self, fwhm):
"""
Smooth the signal to a given FWHM (given in frames).
Parameters
----------
fwhm : float or int
The FWHM in given in units of frames.
Returns
-------
None
"""
width = fwhm / self.resolution
sigma = gaussian_fwhm_to_sigma * width
n = 2 * (int(np.ceil(width))) + 1
self.smooth_with_kernel(numba_functions.gaussian_kernel(n, sigma))
[docs]
def smooth_with_kernel(self, kernel):
"""
Smooth the signal with a given kernel.
Kernel spacing should be in units of self.resolution
(number of frames).
Parameters
----------
kernel : numpy.ndarray (float)
The kernel to smooth the signal values by.
Returns
-------
None
"""
numba_functions.smooth_1d(self.value, kernel)
[docs]
def set_sync_gains(self, gains):
"""
Copy the gains to the sync_gains attribute.
Parameters
----------
gains : numpy.ndarray (float)
An array of gains to copy to sync_gains.
Returns
-------
None
"""
self.sync_gains = gains.astype(float)
[docs]
def get_gain_increment(self, robust=False):
"""
Return the gain increment and weight increment for each mode channel.
For the robust method, the gain increment value is determined as:
increment = median(frame_data / signal_value)
while the maximum-likelihood method gives the gain increment as:
increment = sum(frame_data * frame_weight * signal_value) / increment_w
Both methods are weighted by frame_weight * signal_value^2 and result
in an increment weight of:
increment_w = sum(frame_weight * signal_value^2).
Parameters
----------
robust : bool, optional
If `True`, use the robust method (median) to determine gain
increments. Otherwise, use the maximum-likelihood method (mean).
Returns
-------
increment, increment_w : numpy.ndarray, numpy.ndarray
The gain increment and increment weight, both of shape
(n_channels,).
"""
if self.configuration.get_bool('signal-response'):
self.integration.comments.append(
f'{{{self.get_covariance():.2f}}}')
log.debug(f"covariance = {self.get_covariance():.2f}")
# Precalculate the gain-weight products...
# frames.temp_c = signal
# frames.temp_wc = frame_weight * signal
# frames.temp_wc2 = frame_weight * signal^2
self.prepare_frame_temp_fields()
if robust:
return self.get_robust_gain_increment()
else:
return self.get_ml_gain_increment()
[docs]
def get_ml_gain_increment(self):
"""
Get the maximum-likelihood (ML) gain increment for the signal.
The maximum likelihood gain increment for a given channel (c) is:
dG = sum(frame_data_{c} * frame_weight * signal_value) / dW
dW = sum(frame_weight * signal_value^2)
No invalid frames of modeling frames will be included in the sum.
Additionally, no flagged samples will be included.
Returns
-------
gain_increments, increment_weights : numpy.ndarray, numpy.ndarray
The gain increments and increment weights for each channel in the
signal mode channel group. Both are arrays of shape (n_channels,).
"""
frames = self.integration.frames
valid_frames = frames.valid & frames.is_unflagged('MODELING_FLAGS')
return snf.get_ml_gain_increment(
frame_data=frames.data,
signal_wc=frames.temp_wc,
signal_wc2=frames.temp_wc2,
sample_flags=frames.sample_flag,
channel_indices=self.mode.channel_group.indices,
valid_frames=valid_frames)
[docs]
def get_robust_gain_increment(self):
"""
Get the robust (median derived) gain increment for the signal.
The gain increment for a given channel is:
increment = median(frame_data / signal_value)
increment_weight = sum(frame_weight * signal_value^2)
No invalid frames of modeling frames will be included in the sum.
Additionally, no flagged samples will be included.
Returns
-------
gain_increments, increment_weights : numpy.ndarray, numpy.ndarray
The gain increments and increment weights for each channel in the
signal mode channel group. Both are arrays of shape (n_channels,).
"""
frames = self.integration.frames
valid_frames = frames.valid & frames.is_unflagged('MODELING_FLAGS')
return snf.get_robust_gain_increment(
frame_data=frames.data,
signal_c=frames.temp_c,
signal_wc2=frames.temp_wc2,
sample_flags=frames.sample_flag,
channel_indices=self.mode.channel_group.indices,
valid_frames=valid_frames)
[docs]
def prepare_frame_temp_fields(self):
"""
Prepare precalculated values for subsequent processing.
The signal values are stored in the temp_* fields of the frame data.
These contain:
temp_c = signal_value
temp_wc = relative_weight * signal_value
temp_wc2 = relative_weight * signal_value^2
Invalid frames have zero values. Frames flagged as "MODELING" are
set to zero weight.
Returns
-------
None
"""
frames = self.integration.frames
signal_values = self.value_at(np.arange(frames.size))
snf.prepare_frame_temp_fields(
signal_values=signal_values,
frame_weights=frames.relative_weight,
frame_valid=frames.valid,
frame_modeling=frames.is_flagged('MODELING_FLAGS'),
frame_c=frames.temp_c,
frame_wc=frames.temp_wc,
frame_wc2=frames.temp_wc2)
[docs]
def get_covariance(self):
"""
Return the signal covariance over all samples (frames, channels).
The covariance only includes unflagged channels and unflagged samples.
The signal covariance is given as:
C = sum_{channels}(xs * xs) / sum_{channels}(ss * xx)
where xs = sum_{frames}(w * x * s), ss = sum_{frames}(w * s * s), and
xx = sum_{frames}(w * x * x). Here w is the channel weight, s is the
signal value for each frame, and x are the frame data values.
Returns
-------
covariance : float
"""
channels = self.mode.channel_group.create_data_group(match_flag=0)
frames = self.integration.frames
covariance = snf.get_covariance(
signal_values=self.value_at(np.arange(frames.size)),
frame_data=frames.data,
frame_valid=frames.valid,
channel_indices=channels.indices,
channel_weights=channels.weight,
sample_flags=frames.sample_flag)
return covariance
[docs]
def resync_gains(self):
"""
Resynchronize gains if any changes have occurred.
The delta gains are calculated for each channel as:
current_gain - last_gain
For any non-zero delta_gain, frame data (x) for that channel will be
updated by decrementing that value by:
x[frame, channel] -= signal[frame] * delta_gain[channel]
Returns
-------
None
"""
gains = self.mode.get_gains()
delta_gains = gains - self.sync_gains
if np.all(delta_gains == 0):
return # nothing to do
frames = self.integration.frames
snf.resync_gains(
frame_data=frames.data,
signal_values=self.value,
resolution=self.resolution,
delta_gains=delta_gains,
channel_indices=self.mode.channel_group.indices,
frame_valid=frames.valid)
self.set_sync_gains(gains)
[docs]
def synchronize_gains(self, sum_wc2=None, is_temp_ready=True):
"""
Synchronize gains in the signal.
If the gains have been updated, applies any changes to the data and
updates dependencies.
Parameters
----------
sum_wc2 : numpy.ndarray (float), optional
An array of gain weights of shape (n_channels,).
is_temp_ready : bool, optional
Indicates whether the frame temporary values have already been
calculated. These should contain::
temp_c = signal_value
temp_wc = relative_weight * signal_value
temp_wc2 = relative_weight * signal_value^2
Returns
-------
None
"""
if self.mode.fixed_gains:
raise ValueError("Cannot change gains for fixed gain modes.")
channel_group = self.mode.channel_group
parms = self.integration.get_dependents(f'gains-{self.mode.name}')
gains = self.mode.get_gains()
delta_gains = gains - self.sync_gains
if (delta_gains == 0).all():
# Do not need to resynchronize if there was no change
return
if not is_temp_ready:
self.prepare_frame_temp_fields()
if sum_wc2 is not None:
parms.clear(channel_group)
frames = self.integration.frames
snf.synchronize_gains(
frame_data=frames.data, # Updated
sample_flags=frames.sample_flag,
frame_valid=frames.valid,
modeling_frames=frames.is_flagged('MODELING_FLAGS'),
channel_indices=channel_group.indices,
delta_gains=delta_gains,
frame_wc2=frames.temp_wc2,
channel_wc2=sum_wc2,
signal_values=frames.temp_c,
frame_parms=parms.for_frame, # Updated
channel_parms=parms.for_channel) # Updated
# Apply the updated mode dependencies
parms.apply(channel_group)
# Register the gains as the ones used for the signal
self.set_sync_gains(gains)
[docs]
def write_signal_values(self, out_file):
"""
Write the signal information to a text file.
Note that in crush this is the "print" method.
Parameters
----------
out_file : str
The name of the file to write to.
Returns
-------
None
"""
n = 1 / (self.resolution * self.integration.info.integration_time)
n = n.decompose().value
with open(out_file, 'w') as f:
print(f'# {n}', file=f)
for value in self.value:
print(f'{value:.3e}', file=f)