Source code for sofia_redux.scan.filters.whitening_filter

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from astropy import log
import numpy as np

from sofia_redux.scan.filters.adaptive_filter import AdaptiveFilter
from sofia_redux.scan.utilities.range import Range
from sofia_redux.scan.utilities import numba_functions
from sofia_redux.scan.filters import filters_numba_functions as fnf
from sofia_redux.scan.utilities import utils

__all__ = ['WhiteningFilter']


[docs] class WhiteningFilter(AdaptiveFilter): def __init__(self, integration=None, data=None): """ Initialize an integration whitening filter. Parameters ---------- integration : Integration, optional data : numpy.ndarray (float), optional An array of shape (nt, n_channels) where nt is the nearest power of 2 integer above the number of integration frames. i.e., if n_frames=5, nt=8, or if n_frames=13, nt=16. If not provided will be set to frame_data * frame_relative_weight. """ self.level = 1.2 self.significance = 2.0 self.windows = -1 # The number of stacked windows # The frequency channel indices of the white-level measuring range self.white_from = -1 self.white_to = -1 self.one_over_f_bin = -1 self.white_noise_bin = -1 self.amplitudes = None # The amplitude at reduced resolution self.amplitude_weights = None super().__init__(integration=integration, data=data) @property def channel_dependent_attributes(self): """ Return attributes that are dependent on the parent channels. This is required during a slim operation (reducing number of channels). Returns ------- set (str) """ attributes = super().channel_dependent_attributes attributes.add('amplitudes') attributes.add('amplitude_weights') return attributes
[docs] def update_config(self): """ Determine whether the filter is configuration and if it's pedantic. Loads additional settings for the whitening filter including size of the profiles and assigning noise bins. Returns ------- None """ super().update_config() self.set_size(self.integration.frames_for( self.integration.filter_time_scale)) if self.has_option('level'): level = np.sqrt(utils.get_float(self.option('level'))) self.level = max(1.0, level) window_size = numba_functions.pow2ceil(2 * self.nF) if self.nt < window_size: window_size = self.nt self.windows = self.nt // window_size if self.has_option('proberange'): probe_range_spec = self.option('proberange') if probe_range_spec.lower() == 'auto': # The frequency cut off (half-max) of the typical # point-source response f_point = (0.44 / self.integration.get_point_crossing_time() ).to(unit='Hz').value probe = Range(min_val=0.2 * f_point, max_val=1.14 * f_point) else: probe = Range.from_spec(probe_range_spec, is_positive=True) else: probe = Range(min_val=0.0, max_val=self.nF * self.dF) probe.intersect_with(0, self.nF) self.white_from = int(max(1, np.floor(probe.min / self.dF))) self.white_to = int(min(self.nF, np.ceil(probe.max / self.dF) + 1)) if self.has_option('1overf.freq'): f = utils.get_float(self.option('1overf.freq')) self.one_over_f_bin = int(np.clip(int(f / self.dF), 1, self.nF)) else: self.one_over_f_bin = 2 if self.has_option('1overf.ref'): f = utils.get_float(self.option('1overf.ref')) self.white_noise_bin = int(np.clip(int(f / self.dF), 1, self.nF)) else: self.white_noise_bin = self.nF // 2 # Make sure the probing range contains enough channels and that the # range is valid if self.has_option('minchannels'): min_probe_channels = utils.get_int(self.option('minchannels')) else: min_probe_channels = 16 max_white = self.white_to - min_probe_channels + 1 if self.white_from > max_white: self.white_from = max_white if self.white_from < 0: # pragma: no cover # This should never happen self.white_from = 0 if self.white_from > max_white: # In case min channels is too large self.white_to = min(min_probe_channels + 1, self.nF)
[docs] def set_size(self, n): """ Set the size of the profiles. Resampling will occur if the size of the profile changes. Parameters ---------- n : float or int The new size. Returns ------- None """ if int(n) == self.nF: return super().set_size(n) self.nF = int(n) shape = self.channels.size, self.nF self.amplitudes = np.empty(shape, dtype=float) self.amplitude_weights = np.empty(shape, dtype=np.float64)
[docs] def update_profile(self, channels=None): """ Update the profile for given channel(s). Parameters ---------- channels : ChannelGroup, optional The channels to update. By default, will use all filtering channels. Returns ------- None """ self.calc_mean_amplitudes(channels=channels) self.whiten_profile(channels=channels)
[docs] def calc_mean_amplitudes(self, channels=None): """ Calculates the mean amplitudes in the window of a spectrum. The amplitude of the FFT spectrum is calculated as the mean value within a given window (usually 1). The weight of the mean operation will also be stored in the `amplitude_weights` attribute. These are used later to calculate the channel profiles. Parameters ---------- channels : ChannelGroup, optional The channels to update. By default, will use all filtering channels. Returns ------- None """ if channels is None or channels is self.channels: channel_indices = np.arange(self.channels.size) else: channel_indices = self.channels.find_fixed_indices( channels.fixed_index) if self.channel_profiles is None or self.channel_profiles.size == 0: self.channel_profiles = np.ones((self.channels.size, self.nF), dtype=float) self.amplitudes.fill(0.0) self.amplitude_weights.fill(0.0) # Get coarse average spectrum (FFT is stored in the filter attribute) fnf.calc_mean_amplitudes( amplitudes=self.amplitudes, amplitude_weights=self.amplitude_weights, spectrum=self.data, windows=self.windows, channel_indices=channel_indices)
[docs] def whiten_profile(self, channels=None): """ Create the channel filtering profiles for whitening. Will also set channel 1/f noise statistics. Parameters ---------- channels : ChannelGroup, optional The channels to update. By default, will use all filtering channels. Returns ------- None """ log.debug("Whitening channel profile.") if channels is None or channels is self.channels: channel_indices = np.arange(self.channels.size) else: channel_indices = self.channels.find_fixed_indices( channels.fixed_index) one_over_f_stat = self.channels.one_over_f_stat one_over_f_stat[channel_indices] = fnf.whiten_profile( amplitudes=self.amplitudes, amplitude_weights=self.amplitude_weights, profiles=self.profile, channel_profiles=self.channel_profiles, white_from=self.white_from, white_to=self.white_to, filter_level=self.level, significance=self.significance, one_over_f_bin=self.one_over_f_bin, white_noise_bin=self.white_noise_bin, channel_indices=channel_indices) self.channels.one_over_f_stat = one_over_f_stat
[docs] def get_id(self): """ Return the filter ID. Returns ------- filter_id : str """ return 'wh'
[docs] def get_config_name(self): """ Return the configuration name. Returns ------- config_name : str """ return 'filter.whiten'
[docs] def dft_filter(self, channels=None): """ Return the filter rejection using a discrete FFT. UNSUPPORTED FOR THE WHITENING FILTER. Parameters ---------- channels : ChannelGroup, optional The channel group for which the filtering applied. By default, set to the filtering channels. Returns ------- None """ super().dft_filter(channels=channels)