[docs]
class AdaptiveFilter(VariedFilter):
def __init__(self, integration=None, data=None):
"""
Initialize an integration adaptive filter.
The adaptive filter is an abstract class where each channel has an
individual frequency response (see :class:`VariedFilter`).
Parameters
----------
integration : Integration, optional
data : numpy.ndarray (float), optional
An array of shape (nt, n_channels) where nt is the nearest power of
2 integer above the number of integration frames. i.e., if
n_frames=5, nt=8, or if n_frames=13, nt=16. If not provided will
be set to frame_data * frame_relative_weight.
"""
self.channel_profiles = None
self.profile = None
self.nF = 0
self.dF = np.nan
super().__init__(integration=integration, data=data)
@property
def channel_dependent_attributes(self):
"""
Return attributes that are dependent on the parent channels.
This is required during a slim operation (reducing number of channels).
Returns
-------
set (str)
"""
attributes = super().channel_dependent_attributes
attributes.add('channel_profiles')
attributes.add('profile')
return attributes
[docs]
def get_profile(self):
"""
Return the adaptive filter profile.
Returns
-------
profile : numpy.ndarray (float)
"""
return self.profile
[docs]
def set_integration(self, integration):
"""
Set the varied filter integration.
Sets the padding of the FFT filter, the number of frequencies, the
frequency spacing, and retrieves the channels from the integration if
necessary.
Parameters
----------
integration : Integration
Returns
-------
None
"""
super().set_integration(integration)
channels = self.get_channels()
self.channel_profiles = np.zeros((channels.size, 0), dtype=float)
[docs]
def set_size(self, nf):
"""
Set the number of frequencies in the adaptive filter.
Parameters
----------
nf : int
The number of frequencies in the adaptive filter.
Returns
-------
None
"""
if self.profile is None or self.profile.shape[1] != nf:
self.profile = np.zeros((self.channels.size, nf), dtype=float)
channels = self.get_channels()
dt = self.integration.info.sampling_interval.decompose().value
self.dF = 0.5 / (nf * dt)
self.update_source_profile()
if self.channel_profiles.size != 0:
old_profile = self.channel_profiles.copy()
self.channel_profiles = np.zeros((channels.size, nf), dtype=float)
self.resample(old_profile, self.channel_profiles)
[docs]
def resample(self, old_profile, new_profile):
"""
ResamplePolynomial the old profile to a new profile.
Parameters
----------
old_profile : numpy.ndarray (float)
An array of shape (n_channels, n1) containing the current profile.
new_profile : numpy.ndarray (float)
The new array of shape (n_channels, n2) to populate.
Returns
-------
None
"""
fnf.resample(old_profile, new_profile)
self.channel_profiles = new_profile
[docs]
def post_filter_channels(self, channels=None):
"""
Performs the post-filtering channels steps.
Parameters
----------
channels : ChannelGroup, optional
The channel group for which to perform the post-filtering step.
If not supplied, defaults to the filtering channels.
Returns
-------
None
"""
self.accumulate_profiles(channels=channels)
super().post_filter_channels(channels=channels)
[docs]
def accumulate_profiles(self, channels=None):
"""
Accumulates the channel profiles into a single profile.
Parameters
----------
channels : ChannelGroup, optional
The channel group for which to accumulate profiles.
The accumulated profile is the multiplication of each channel profile.
Returns
-------
None
"""
if channels is None:
channel_indices = np.arange(self.channels.size)
else:
channel_indices = self.channels.find_fixed_indices(
channels.fixed_index)
fnf.accumulate_profiles(
profiles=self.profile,
channel_profiles=self.channel_profiles,
channel_indices=channel_indices)
[docs]
def response_at(self, fch):
"""
Return the response at a given frequency channel(s).
Parameters
----------
fch : int or numpy.ndarray (int or bool) or slice
The frequency channel or channels in question.
Returns
-------
response : numpy.ndarray (float)
The response array of shape (n_channels,) or
(n_channels, fch.size).
"""
n_channels = self.channels.size
if self.profile is None:
if not isinstance(fch, np.ndarray) or fch.shape == ():
return np.full(n_channels, 1.0)
return np.full((n_channels, fch.size), 1.0)
indices = fch * self.profile.shape[1] // (self.nf + 1)
return self.profile[:, indices]
[docs]
def get_valid_profiles(self, channels=None):
"""
Return the valid channel profiles.
Parameters
----------
channels : ChannelGroup, optional
The channel group for which to get profiles. The default is
all filtering channels.
Returns
-------
channel_profiles : numpy.ndarray (float)
"""
if channels is None:
channel_indices = np.arange(self.channels.size)
else:
channel_indices = self.channels.find_fixed_indices(
channels.fixed_index)
if self.channel_profiles is None:
if self.profile is not None:
n_freq = self.profile.shape[1]
return np.full((self.channels.size, n_freq), 1.0)
else:
return None
return self.channel_profiles[channel_indices]
[docs]
def count_parms(self, channels=None):
"""
Return the rejection filter sum above the high pass frequency.
channels : ChannelGroup, optional
The channel group for which to determine dependents. The
default is all filtering channels.
Returns
-------
dependents : numpy.ndarray (float)
An array of shape (n_channels,).
"""
if channels is None:
channel_indices = np.arange(self.channels.size)
else:
channel_indices = self.channels.find_fixed_indices(
channels.fixed_index)
n_channels = channel_indices.size
if self.profile is None:
return np.zeros(n_channels, dtype=float)
dt = self.integration.filter_time_scale.decompose().value
high_pass_freq = 0.5 / dt
min_f = int(np.ceil(high_pass_freq / self.dF))
p = self.profile[channel_indices]
parms = np.sum(1.0 - (p[:, min_f:] ** 2), axis=1)
return parms
[docs]
def update_source_profile(self):
"""
Update the filter source profile.
Returns
-------
None
"""
if self.profile is None:
return
if self.source_profile is not None:
if self.source_profile.size == self.profile.shape[1]:
return
nf = self.profile.shape[1]
t = self.integration.get_point_crossing_time().decompose().value
sigma = gaussian_sigma_to_fwhm / (2 * np.pi * t * self.dF)
a = -0.5 / (sigma ** 2)
f = np.arange(nf)
self.source_profile = np.exp(a * (f ** 2))
self.source_norm = float(np.sum(self.source_profile))
[docs]
def calc_point_response(self, channels=None):
"""
Return the point response of the source profile.
channels : ChannelGroup, optional
The channel group for which to calculate the point response. The
default is all filtering channels.
The point response is given as:
sum(profile <below hi-pass>) + sum(profile * response <above hi-pass>)
divided by the `source_norm` attribute.
Returns
-------
response : numpy.ndarray (float)
The point response for each channel of shape (n_channels,).
"""
# Start from the 1/f filter cutoff
dt = self.integration.filter_time_scale.decompose().value
high_pass_freq = 0.5 / dt
min_fch = int(np.ceil(high_pass_freq / self.dF))
if channels is None or channels is self.channels:
channel_indices = np.arange(self.channels.size)
else:
channel_indices = self.channels.find_fixed_indices(
channels.fixed_index)
return fnf.calculate_channel_point_responses(
min_fch=min_fch,
source_profile=self.source_profile,
profiles=self.profile,
channel_indices=channel_indices,
source_norm=self.source_norm)
[docs]
@abstractmethod
def get_id(self): # pragma: no cover
"""
Return the filter ID.
Returns
-------
filter_id : str
"""
pass
[docs]
@abstractmethod
def get_config_name(self): # pragma: no cover
"""
Return the configuration name.
Returns
-------
config_name : str
"""
pass