[docs]
class Filter(ABC):
def __init__(self, integration=None, data=None):
    """
    Initialize an integration filter.

    An abstract base holding the parameters common to all FFT-based
    integration filters.  Each concrete filter operates on an
    integration and defines a response at any given frequency.

    Parameters
    ----------
    integration : Integration, optional
        The integration on which the filter operates.
    data : numpy.ndarray (float), optional
        A filter data workspace.  Per `make_temp_data`, the expected
        shape is (n_channels, nt), where nt is the nearest power of 2
        at or above the number of integration frames (e.g. n_frames=5
        gives nt=8; n_frames=13 gives nt=16).
    """
    self.integration = None     # The integration to filter
    self.channels = None        # The filtering channel group
    self.parms = None           # The integration filter dependents
    self.frame_parms = None     # Frame dependents
    self.data = None            # Temporary or real filter data workspace
    self.points = None          # Frame relative weight sums per channel
    self.is_sub_filter = False  # Whether this is a sub-filter of main
    self.nt = 0                 # pow2ceil of integration.size
    self.nf = 0                 # The Nyquist frequency index
    self.df = 0.0               # The frequency spacing
    self.dft = False            # Whether to use the discrete Fourier transform
    self.pedantic = False       # If True, level filter data after application
    self.enabled = False        # Whether the filter is enabled in the config

    if data is not None:
        self.data = data
    if integration is not None:
        self.set_integration(integration)
[docs]
def copy(self):
    """
    Return a copy of the filter.

    Attributes named in `referenced_attributes` (the integration and
    channels) are shared by reference; every other attribute is copied
    by value.

    Returns
    -------
    Filter
    """
    new = self.__class__(integration=None, data=None)
    referenced = self.referenced_attributes
    for key, value in self.__dict__.items():
        if key in referenced:
            setattr(new, key, value)
        elif hasattr(value, 'copy'):
            setattr(new, key, value.copy())
        else:
            setattr(new, key, deepcopy(value))
    return new
@property
def referenced_attributes(self):
    """
    Return the attribute names shared by reference in `copy`.

    Returns
    -------
    set (str)
    """
    return {'integration', 'channels'}
@property
def channel_dependent_attributes(self):
    """
    Return the attribute names that map one-to-one onto channels.

    These must be re-sliced during a slim operation (removal of bad
    channels) on the filter integration; see `reindex`.

    Returns
    -------
    set (str)
    """
    return {'points'}
@property
def size(self):
    """
    Return the number of integration frames.

    Returns
    -------
    n_frames : int
        Zero when no integration (and therefore no frames) is set.
    """
    frames = self.frames
    return 0 if frames is None else frames.size
@property
def channel_size(self):
    """
    Return the number of channels in the filter.

    Returns
    -------
    n_channels : int
        Zero when no channel group has been set.
    """
    channels = self.channels
    return 0 if channels is None else channels.size
@property
def frames(self):
    """
    Return the frames of the filter integration.

    Returns
    -------
    Frames
        `None` when no integration is set.
    """
    integration = self.integration
    return None if integration is None else integration.frames
@property
def info(self):
    """
    Return the information object of the filter integration.

    Returns
    -------
    Info
        `None` when no integration is set.
    """
    integration = self.integration
    return None if integration is None else integration.info
@property
def configuration(self):
    """
    Return the configuration (taken from the integration).

    Returns
    -------
    Configuration
        `None` when no integration is set.
    """
    integration = self.integration
    return None if integration is None else integration.configuration
@property
def flagspace(self):
    """
    Return the flagspace of the integration frames.

    Returns
    -------
    Flags
        `None` when no integration is set.
    """
    integration = self.integration
    if integration is None:
        return None
    return integration.frames.flagspace
@property
def channel_flagspace(self):
    """
    Return the flagspace of the filter channels.

    Returns
    -------
    Flags
        `None` when no channel group has been set.
    """
    channels = self.channels
    return None if channels is None else channels.flagspace
@property
def valid_filtering_frames(self):
    """
    Return a mask of frames usable for filtering.

    A frame is usable when it is valid and carries no modeling flags.

    Returns
    -------
    valid_frames : numpy.ndarray (bool)
        A boolean mask of shape (n_frames,) where `True` marks a frame
        to include in filtering operations.  Empty when no integration
        is set.
    """
    frames = self.frames
    if frames is None:
        return np.empty(0, dtype=bool)
    unflagged = frames.is_unflagged(self.flagspace.flags.MODELING_FLAGS)
    return unflagged & frames.valid
[docs]
def reindex(self):
    """Reindex the channel groups to be consistent with parent channels.

    When the channels in the filter integration change for some reason
    (such as a slim operation in which bad channels are removed), filter
    attributes that map to integration channels will also need to be
    updated to account for such a change.

    At a base level, the filter attributes that need to be updated
    include channel dependents, and the `points` (sum of frame weights)
    for each channel.  The `data` attribute is also set to `None`,
    since it is designed to be a temporary workspace that may or may
    not depend on the integration channels depending on the type of
    filter in question.
    """
    if self.channels is None:
        return
    # The workspace may be channel-shaped; drop it rather than remap it.
    self.data = None
    # Capture the new->old channel index mapping BEFORE reindexing,
    # since reindex() rewrites the group's indices.
    keep_indices = self.channels.new_indices_in_old()
    self.channels.reindex()
    if self.parms is not None:
        # Re-fetch the dependents so they are sized for the slimmed
        # channel set.
        self.parms = self.integration.get_dependents(
            self.get_config_name())
    # Slice every channel-mapped ndarray attribute (e.g. `points`)
    # down to the surviving channels.
    channel_attributes = self.channel_dependent_attributes
    for attribute, value in self.__dict__.items():
        if attribute not in channel_attributes:
            continue
        if not isinstance(value, np.ndarray):
            continue
        setattr(self, attribute, value[keep_indices])
[docs]
def has_option(self, key):
    """
    Return whether an option is set in the configuration for this filter.

    The configuration key checked is `<filter_config_name>.<key>`; e.g.
    the "level" key of a whitening filter maps to "filter.whiten.level".

    Parameters
    ----------
    key : str
        The name of the option for the filter.

    Returns
    -------
    is_configured : bool
        `False` when no integration (and hence no configuration) is set.
    """
    if self.integration is None:
        return False
    full_key = f'{self.get_config_name()}.{key}'
    return self.integration.has_option(full_key)
[docs]
def option(self, key):
    """
    Return the configuration value for a given filter option.

    The actual configuration key looked up is
    `<filter_config_name>.<key>`; e.g. the "level" key of a whitening
    filter maps to "filter.whiten.level".

    Parameters
    ----------
    key : str
        The name of the option for the filter.

    Returns
    -------
    str
        `None` when no integration is set.
    """
    if self.integration is None:
        return None
    full_key = f'{self.get_config_name()}.{key}'
    return self.configuration.get(full_key)
[docs]
def make_temp_data(self, channels=None):
    """
    Allocate the temporary `data` and `points` work arrays.

    `data` is created zeroed with shape (n_channels, nt), where nt is
    the power-of-2 padded frame count, and `points` zeroed with shape
    (n_channels,).  Any existing workspace is released first.

    Parameters
    ----------
    channels : ChannelGroup, optional
        The channels for which to create the temporary data.  If not
        supplied, defaults to the filtering channels.

    Returns
    -------
    None
    """
    if self.data is not None:
        self.data = None
        self.points = None
    if channels is None:
        channels = self.get_channels()
    nt = numba_functions.pow2ceil(self.integration.size)
    self.data = np.zeros((channels.size, nt), dtype=float)
    self.points = np.zeros(channels.size, dtype=float)
[docs]
def discard_temp_data(self):
    """
    Release the temporary `data` and `points` work arrays.

    Returns
    -------
    None
    """
    self.data = None
    self.points = None
[docs]
def is_enabled(self):
    """
    Return whether the filter is enabled in the configuration.

    Returns
    -------
    bool
    """
    return self.enabled
[docs]
def get_temp_data(self):
    """
    Return the temporary filter data workspace.

    Returns
    -------
    numpy.ndarray (float)
    """
    return self.data
[docs]
def set_temp_data(self, data):
    """
    Set the temporary filter data workspace.

    Parameters
    ----------
    data : numpy.ndarray (float)

    Returns
    -------
    None
    """
    self.data = data
[docs]
def rejection_at(self, fch):
    """
    Return the filter rejection at the given frequency channel(s).

    The rejection is the complement of the filter response:
    rejection = 1 - response.

    Parameters
    ----------
    fch : int or numpy.ndarray (int or bool) or slice.

    Returns
    -------
    rejection : float or numpy.ndarray (float)
    """
    return 1.0 - self.response_at(fch)
[docs]
def count_parms(self):
    """
    Return the total dependent count of the filter.

    The count is the summed rejection (each value in [0, 1]) over
    frequency bins from the high-pass index up to, but excluding, the
    Nyquist index `nf` as given by `np.arange(min_freq, self.nf)`.
    NOTE(review): the previous docstring described the Nyquist bin as
    included -- confirm which behavior is intended.

    Returns
    -------
    rejected : float or numpy.ndarray (float)
        The sum of the rejected signal above the high pass frequency.
        If the rejection varies by channel, an array of shape
        (n_channels,) is returned; otherwise a single float.
    """
    fch = np.arange(self.get_high_pass_index(), self.nf)
    rejection = self.rejection_at(fch)
    if rejection.ndim >= 2:
        return np.sum(rejection, axis=1)
    return np.sum(rejection)
[docs]
def get_channels(self):
    """
    Return the channel group used by the integration filter.

    Returns
    -------
    ChannelGroup
    """
    return self.channels
[docs]
def set_channels(self, channels):
    """
    Set the filter channels.

    The `channels` attribute is always stored as a ChannelGroup: a
    Channels container is converted to a group named 'all', plain
    ChannelData is wrapped in the info's channel group class, and an
    existing ChannelGroup is copied.

    Parameters
    ----------
    channels : Channels or ChannelData or ChannelGroup

    Raises
    ------
    ValueError
        If `channels` is not a recognized channel container.

    Returns
    -------
    None
    """
    if channels is None:
        self.channels = None
    elif isinstance(channels, Channels):
        self.channels = channels.create_channel_group(name='all')
    elif isinstance(channels, ChannelData):
        if isinstance(channels, ChannelGroup):
            self.channels = channels.copy()
        else:
            group_class = self.info.get_channel_group_class()
            self.channels = group_class(
                channels, indices=np.arange(channels.size), name='all')
    else:
        raise ValueError(f"Channels must be {Channels} or {ChannelData}, "
                         f"not {channels}.")
[docs]
def set_integration(self, integration):
    """
    Set the filter integration.

    Stores the integration, computes the power-of-2 padded frame count
    (`nt`), the Nyquist frequency index (`nf`), and the frequency
    spacing (`df`) from the instrument sampling interval, then adopts
    the integration channels.

    Parameters
    ----------
    integration : Integration

    Returns
    -------
    None
    """
    self.integration = integration
    self.nt = numba_functions.pow2ceil(integration.size)
    self.nf = self.nt // 2
    sampling = self.info.instrument.sampling_interval.decompose().value
    self.df = 1.0 / (sampling * self.nt)
    self.set_channels(integration.channels)
[docs]
def update_config(self):
    """
    Read filter settings from the configuration.

    Sets `enabled` from the presence of the filter's configuration key
    and `pedantic` from the 'filter.mrproper' boolean option.  Does
    nothing when no integration is set.

    Returns
    -------
    None
    """
    if self.integration is None:
        return
    configuration = self.integration.configuration
    self.enabled = self.integration.has_option(self.get_config_name())
    self.pedantic = configuration.get_bool('filter.mrproper')
[docs]
def update_profile(self, channels=None):  # pragma: no cover
    """
    Update the filter profile for the given channel(s).

    A no-op hook for subclasses that maintain per-channel filter
    profiles.

    Parameters
    ----------
    channels : int or numpy.ndarray (int), optional
        The channel indices to update.

    Returns
    -------
    None
    """
    pass
[docs]
def apply(self, report=True):
    """
    Apply the filter to the integration.

    There are a few steps that are performed during filter application:

    1. Read the configuration to update any settings.
       - Stop if the filter is not enabled in the configuration.
    2. Pre-filter the channels.  This generally involves removing the
       filter frame and channel dependents from the integration, before
       zeroing them in the filter in order to be re-calculated.
    3. Apply the filter to the channels.
    4. Post-filter the channels.  This generally involves adding the
       filter frame and channel dependents back to the integration.

    Parameters
    ----------
    report : bool, optional
        If `True`, add messages to the integration comments on the
        filtering.

    Returns
    -------
    applied : bool
        `True` if filtering occurred, and `False` otherwise.
    """
    self.update_config()
    if not self.is_enabled():
        return False
    self.integration.comments.append(self.get_id())
    self.pre_filter()
    # Fresh per-frame dependent accumulator; it is populated during
    # apply_to_channels and folded into the dependents afterwards.
    self.frame_parms = np.zeros(self.integration.size, dtype=float)
    self.apply_to_channels(channels=self.get_channels())
    self.parms.add_for_frames(self.frame_parms)
    self.post_filter()
    if report:
        self.report()
    return True
[docs]
def apply_to_channels(self, channels=None):
    """
    Apply the filter to the integration for the given channel group.

    Parameters
    ----------
    channels : ChannelGroup, optional
        If not supplied, defaults to the filter channels.

    Returns
    -------
    None
    """
    # If no workspace exists yet, this call owns a temporary one and
    # discards it on exit; otherwise the caller's workspace is reused.
    local_temp = self.data is None
    if channels is None:
        channels = self.get_channels()
    if local_temp:
        self.make_temp_data(channels=channels)
    elif channels.size != self.data.shape[0]:
        # Existing workspace is sized for a different channel count;
        # rebuild it for this channel group.
        self.discard_temp_data()
        self.make_temp_data(channels=channels)
    self.load_time_streams(channels=channels)
    self.pre_filter_channels(channels=channels)
    # Either transform converts `data` into the rejected signal.
    if self.dft:
        self.dft_filter(channels=channels)
    else:
        self.fft_filter(channels=channels)
    if self.pedantic:
        # Remove any residual mean from the rejected signal.
        self.level_data_for_channels(channels=channels)
    self.post_filter_channels(channels=channels)
    # Subtract the rejected signal from the integration frame data.
    self.remove(channels=channels)
    if local_temp:
        self.discard_temp_data()
[docs]
def pre_filter(self):
    """
    Perform the pre-filtering steps.

    During the pre-filtering steps, dependents are retrieved from the
    filter integration and cleared (subtracted from integration
    dependents, integration channels, and zeroed).

    Returns
    -------
    None
    """
    if self.parms is None:
        # Lazily fetch the dependents named after this filter.
        self.parms = self.integration.get_dependents(
            self.get_config_name())
    # Sub filters should not directly change the dependents in the
    # integration since this is the job of the parent filter.
    if not self.is_sub_filter:
        self.parms.clear(self.get_channels(),
                         start=0,
                         end=self.integration.size)
    else:
        # Zero only the local accumulators; integration totals are
        # managed by the parent filter.
        self.parms.for_frame.fill(0.0)
        self.parms.for_channel.fill(0.0)
[docs]
def post_filter(self):
    """
    Perform the post-filtering steps.

    Adds the accumulated filter dependents back onto the integration
    and its channels.  Sub-filters skip this step, since updating the
    integration dependents is the job of the parent filter.

    Returns
    -------
    None
    """
    if self.is_sub_filter:
        return
    self.parms.apply(self.get_channels(),
                     start=0,
                     end=self.integration.size)
[docs]
def pre_filter_channels(self, channels=None):  # pragma: no cover
    """
    Perform the per-channel pre-filtering steps.

    A no-op hook for subclasses requiring setup before the transform.

    Parameters
    ----------
    channels : ChannelGroup, optional
        The channel group for which to perform the pre-filtering step.
        If not supplied, defaults to the filtering channels.

    Returns
    -------
    None
    """
    pass
[docs]
def post_filter_channels(self, channels=None):  # pragma: no cover
    """
    Perform the per-channel post-filtering steps.

    A no-op hook for subclasses requiring work after the transform
    (e.g. removing the DC component via `level_data_for_channels`).

    Parameters
    ----------
    channels : ChannelGroup, optional
        The channel group for which to perform the post-filtering step.
        If not supplied, defaults to the filtering channels.

    Returns
    -------
    None
    """
    pass
[docs]
def remove(self, channels=None):
    """
    Subtract the rejected (filtered) signal from the frame data.

    Parameters
    ----------
    channels : ChannelGroup, optional
        The channel group for which to remove the signal.  If not
        supplied, defaults to the filtering channels.

    Returns
    -------
    None
    """
    if channels is None:
        channels = self.get_channels()
    self.remove_from_frames(self.data, self.integration.frames, channels)
[docs]
@staticmethod
def remove_from_frames(rejected_signal, frames, channels):
    """
    Remove the rejected signal from frame data in-place.

    Parameters
    ----------
    rejected_signal : numpy.ndarray (float)
        The rejected signal of shape (filtering_channels.size, n_frames).
    frames : Frames
        The frames from which to remove the rejected signal.
    channels : ChannelGroup
        The channels for which to subtract the signal.

    Returns
    -------
    None
    """
    fnf.remove_rejection_from_frames(frame_data=frames.data,
                                     frame_valid=frames.valid,
                                     channel_indices=channels.indices,
                                     rejected_signal=rejected_signal)
[docs]
def report(self):
    """
    Append a filter performance note to the integration comments.

    The comment holds the mean point response when mapping channels
    exist, and '(---)' otherwise.

    Returns
    -------
    None
    """
    if self.integration.channels.n_mapping_channels > 0:
        comment = f'({self.get_mean_point_response()})'
    else:
        comment = '(---)'
    self.integration.comments.append(comment)
[docs]
def load_time_streams(self, channels=None):
    """
    Load weighted, levelled time streams into the `data` workspace.

    Frame data are multiplied by their relative weights with the mean
    subtracted, and written into the temporary `data` array; invalid
    samples are set to zero.  The `points` array is filled with the
    sum of frame weights for each channel (zero where invalid).

    Parameters
    ----------
    channels : ChannelGroup, optional
        The channels for which to load time streams.  If not supplied,
        defaults to the filtering channels.

    Returns
    -------
    None
    """
    if channels is None:
        channels = self.get_channels()
    # (Re)allocate the workspace when missing or channel count changed.
    if self.data is None or self.data.shape[0] != channels.size:
        self.discard_temp_data()
        self.make_temp_data(channels=channels)
    frames = self.integration.frames
    fnf.load_timestream(
        frame_data=frames.data,
        frame_weights=frames.relative_weight,
        frame_valid=frames.valid,
        modeling_frames=frames.is_flagged('MODELING_FLAGS'),
        channel_indices=channels.indices,
        sample_flags=frames.sample_flag,
        timestream=self.data,
        points=self.points)
[docs]
def fft_filter(self, channels=None):
    """
    Apply the FFT filter to the temporary data.

    Converts the `data` workspace (weighted time streams) into the
    rejected (un-levelled) signal in-place.

    Parameters
    ----------
    channels : ChannelGroup, optional
        The channels for which to apply the filter.  If not supplied,
        defaults to the stored filtering channels.

    Returns
    -------
    None
    """
    freq_channels = np.arange(self.nf + 1)
    rejection = self.rejection_at(freq_channels)
    if not rejection.any():
        # Nothing is rejected at any frequency; the filter is a no-op.
        return
    if channels is None:
        channels = self.get_channels()
    # Pad with zeros as necessary.
    zero_fill = False
    if self.data.shape[1] > self.integration.size:
        zero_fill = True
        self.data[:, self.integration.size:] = 0.0
    # Forward transform: data becomes complex spectra of shape
    # (n_channels, nf + 1).
    self.data = scipy.fft.rfft(self.data, axis=1)
    self.update_profile(channels=channels)
    # Remove the DC component.
    self.data[:, 0].real = 0.0
    if rejection.ndim == 1:
        # NOTE(review): scipy's rfft keeps the Nyquist term at index
        # nf and leaves bin 0's imaginary part zero, so scaling bin 0's
        # imaginary part by rejection[nf] looks like a holdover from a
        # packed real-FFT layout (Nyquist stored in bin 0) -- confirm.
        self.data[:, 0].imag *= rejection[self.nf]
        self.data[:, 1:] *= rejection[1:]
    else:
        # Channel-dependent rejection of shape (n_channels, nf + 1).
        self.data[:, 0].imag *= rejection[:, self.nf]
        self.data[:, 1:] *= rejection[:, 1:]
    # Inverse transform back to the time domain (length nt).
    self.data = scipy.fft.irfft(self.data, axis=1)
    if zero_fill:
        # Re-zero the padded region beyond the real frames.
        self.data[:, self.integration.size:] = 0.0
[docs]
def dft_filter(self, channels=None):
    """
    Apply the filter rejection using a discrete Fourier transform.

    The cost scales with the number of non-zero rejection values, so
    this can be faster than `fft_filter` when only a few frequencies
    are rejected.

    Parameters
    ----------
    channels : ChannelGroup, optional
        The channel group for which the filtering is applied.  By
        default, set to the filtering channels.

    Returns
    -------
    None
    """
    rejection = self.rejection_at(np.arange(self.nf + 1))
    if not rejection.any():
        return
    fnf.dft_filter_channels(frame_data=self.data,
                            rejection=rejection,
                            n_frames=self.integration.size)
[docs]
def calc_point_response(self):
    """
    Return the point response of a Gaussian source for the filter.

    It is assumed a Gaussian source profile is crossed during the
    integration crossing time such that

        sigma(t) = 1
        t/2.35 * 2pi * sigma(f) = 1
        sigma(f) = 2.35 / (2pi * t)
        df = 1 / (n dt)
        sigma(F) = sigma(f) / df = 2.35 * n * dt / (2pi * t)

    The filter response is not accounted for below the high-pass
    timescale.

    Returns
    -------
    point_response : float
    """
    t = self.integration.get_point_crossing_time()
    # Source width in frequency-index units (FWHM factor / (2 pi t df));
    # t and df carry units, hence the decompose().
    sigma = gaussian_sigma_to_fwhm / (2 * np.pi * t
                                      * self.df).decompose().value
    a = -0.5 / (sigma ** 2)
    # Center frequency index of the modulation (usually the chopper).
    f0 = self.integration.get_modulation_frequency(
        'TOTAL_POWER').decompose().value / self.df  # Usually the chopper
    # Calculate the x=0 component -- O(N)
    # Below the hi-pass time-scale, the filter has no effect, so count it
    # as such
    min_fi = self.get_high_pass_index()
    frequencies = np.arange(self.nf + 1)
    # Copy so the unity fill below cannot mutate a cached response.
    response = self.response_at(frequencies).copy()
    response[:min_fi] = 1.0
    # Calculate the true source filtering above the hi-pass timescale
    # Consider it a symmetric source profile, peaked at the origin --
    # Its peak is simply the sum of the cosine terms, which are the real
    # part of the amplitudes.  So, the filter correction is simply the
    # ratio of the sum of the filtered real amplitudes relative to the
    # sum of the original real amplitudes.
    source_response = np.exp(a * ((frequencies - f0) ** 2))
    source_response += np.exp(a * ((frequencies + f0) ** 2))
    return np.sum(source_response * response) / np.sum(source_response)
[docs]
def get_high_pass_index(self):
    """
    Return the frequency index of the high-pass cutoff.

    The cutoff frequency is 0.5 / filter_time_scale (in Hz), converted
    to a bin index using the frequency spacing `df` and rounded up.

    Returns
    -------
    index : int
        The high-pass index, or 1 when the cutoff is undefined (NaN)
        or negative.
    """
    cutoff = (0.5 / self.integration.filter_time_scale).to(
        unit='Hz').value
    if np.isnan(cutoff) or cutoff < 0:
        return 1
    return int(np.ceil(cutoff / self.df))
[docs]
def level_data_for_channels(self, channels=None):
    """
    Remove the average level from the filter `data` workspace.

    Parameters
    ----------
    channels : ChannelGroup, optional
        The channels for which to level the data.  If not supplied,
        defaults to the filtering channels.

    Returns
    -------
    None
    """
    self.level_for_channels(self.data, channels=channels)
[docs]
def level_for_channels(self, signal, channels=None):
    """
    Remove the average level from a given signal in-place.

    Parameters
    ----------
    signal : numpy.ndarray (float)
        The data to level, of shape (channels.size, n_frames).
    channels : ChannelGroup, optional
        The channel group to which the signal applies.  By default set
        to the filtering channels.

    Returns
    -------
    None
    """
    # Map the supplied channel group onto rows of the filter channels;
    # the default (or identical) group maps one-to-one.
    if channels is None or channels is self.channels:
        channel_indices = np.arange(self.channels.size)
    else:
        channel_indices = self.channels.find_fixed_indices(
            channels.fixed_index)
    frames = self.integration.frames
    fnf.level_for_channels(
        signal=signal,
        valid_frame=frames.valid,
        modeling_frame=frames.is_flagged('MODELING_FLAGS'),
        sample_flag=frames.sample_flag,
        channel_indices=channel_indices)
[docs]
def level_data(self):
    """
    Remove the average level from the filter `data` workspace.

    Returns
    -------
    None
    """
    self.level(self.data)
[docs]
def level(self, signal):
    """
    Remove the per-channel average from the given signal in-place.

    Parameters
    ----------
    signal : numpy.ndarray (float)
        The signal data to level.  Leveling occurs for each channel
        over all frames.  The signal shape should be
        (n_channels, n_frames) or (n_frames,); arrays of any other
        dimensionality are left untouched.

    Returns
    -------
    None
    """
    if signal.ndim == 1:
        fnf.level_1d(data=signal, n_frames=self.integration.size)
    elif signal.ndim == 2:
        fnf.level(data=signal, n_frames=self.integration.size)
[docs]
def set_dft(self, value):
    """
    Select DFT (`True`) or FFT (`False`) filtering.

    Parameters
    ----------
    value : bool
        If `True`, filtering uses the discrete Fourier transform.

    Returns
    -------
    None
    """
    self.dft = value
[docs]
@abstractmethod
def get_id(self):  # pragma: no cover
    """
    Return the filter ID.

    Must be implemented by concrete filters.

    Returns
    -------
    filter_id : str
    """
    pass
[docs]
@abstractmethod
def get_config_name(self):  # pragma: no cover
    """
    Return the name of the filter in the configuration.

    Must be implemented by concrete filters.

    Returns
    -------
    config_name : str
    """
    pass
[docs]
@abstractmethod
def response_at(self, fch):  # pragma: no cover
    """
    Return the filter response at the given frequency channel(s).

    Must be implemented by concrete filters.

    Parameters
    ----------
    fch : int or numpy.ndarray (int or bool) or slice
        The frequency channel or channels in question.

    Returns
    -------
    response : float or numpy.ndarray (float)
    """
    pass
[docs]
@abstractmethod
def get_mean_point_response(self):  # pragma: no cover
    """
    Return the mean point response of the filter.

    Must be implemented by concrete filters.

    Returns
    -------
    response : float
    """
    pass