Source code for sofia_redux.scan.custom.fifi_ls.integration.integration

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from astropy import log, units
import numpy as np

from sofia_redux.scan.custom.fifi_ls.integration import (
    fifi_ls_integration_numba_functions)
from sofia_redux.scan.custom.sofia.integration.integration import (
    SofiaIntegration)

__all__ = ['FifiLsIntegration']


[docs] class FifiLsIntegration(SofiaIntegration): def __init__(self, scan=None): """ Initialize a FIFI-LS integration. Parameters ---------- scan : sofia_redux.scan.custom.hawc_plus.scan.scan.FifiLsScan The scan to which this integration belongs (optional). """ super().__init__(scan=scan) @property def scan_astrometry(self): """ Return the scan astrometry. Returns ------- FifiLsAstrometryInfo """ return super().scan_astrometry
[docs] def apply_configuration(self): """ Apply configuration options to an integration. Returns ------- None """ pass
[docs] def read(self, hdul): """ Read integration information from a FITS HDU List. Parameters ---------- hdul : astropy.io.fits.HDUList A list of data HDUs containing "timestream" data. Returns ------- None """ log.info("Processing scan data:") records = hdul['FLUX'].data.shape[0] log.debug(f"Reading {records} frames from HDU list.") sampling = (1.0 / self.info.instrument.integration_time).to( units.Unit('Hz')) minutes = (self.info.instrument.sampling_interval * records).to( units.Unit('min')) log.info(f"Sampling at {sampling:.3f} ---> {minutes:.2f}.") self.frames.initialize(self, records) self.frames.read_hdul(hdul)
[docs] def validate(self): """ Validate the integration after a read. Returns ------- None """ self.flag_zeroed_channels() super().validate()
[docs] def flag_zeroed_channels(self): """ Flags all channels with completely zeroed frame data as DISCARD/DEAD. Returns ------- None """ log.debug("Flagging zeroed channels.") fifi_ls_integration_numba_functions.flag_zeroed_channels( frame_data=self.frames.data, frame_valid=self.frames.valid, channel_indices=np.arange(self.channels.size), channel_flags=self.channels.data.flag, discard_flag=self.channel_flagspace.convert_flag('DISCARD').value) # Flag discarded channels as DEAD self.channels.data.set_flags( 'DEAD', indices=self.channels.data.is_flagged('DISCARD'))
[docs] def get_full_id(self, separator='|'): """ Return the full integration ID. Parameters ---------- separator : str, optional The separator character/phase between the scan and integration ID. Returns ------- str """ return self.scan.get_id()
[docs] def get_first_frame(self, reference=0): """ Return the first valid frame. Parameters ---------- reference : int, optional The first actual frame index after which to return the first valid frame. The default is the first (0). Returns ------- FifiLsFrames """ return super().get_first_frame(reference=reference)
[docs] def get_last_frame(self, reference=None): """ Return the first valid frame. Parameters ---------- reference : int, optional The last actual frame index before which to return the last valid frame. The default is the last. Returns ------- FifiLsFrames """ return super().get_last_frame(reference=reference)
[docs] def get_crossing_time(self, source_size=None): """ Return the crossing time for a given source size. Parameters ---------- source_size : Coordinate2D1, optional The size of the source. If not supplied, defaults to (in order of priority) the source size in the scan model, or the instrument source size. Returns ------- time : astropy.units.Quantity The crossing time in time units. """ if source_size is None: if self.scan.source_model is None: source_size = self.info.instrument.get_source_size() else: source_size = self.scan.source_model.get_source_size() return super().get_crossing_time(source_size=source_size.x)