Source code for sofia_redux.scan.custom.sofia.info.info

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from abc import abstractmethod
from astropy import units, log
from astropy.coordinates import Angle
import os
import numpy as np

from sofia_redux.scan.info.weather_info import WeatherInfo
from sofia_redux.scan.custom.sofia.info.instrument import SofiaInstrumentInfo
from sofia_redux.scan.custom.sofia.info.astrometry import SofiaAstrometryInfo
from sofia_redux.scan.custom.sofia.info.aircraft import SofiaAircraftInfo
from sofia_redux.scan.custom.sofia.info.chopping import SofiaChoppingInfo
from sofia_redux.scan.custom.sofia.info.detector_array import (
    SofiaDetectorArrayInfo)
from sofia_redux.scan.custom.sofia.info.dithering import SofiaDitheringInfo
from sofia_redux.scan.custom.sofia.info.environment import SofiaEnvironmentInfo
from sofia_redux.scan.custom.sofia.info.mapping import SofiaMappingInfo
from sofia_redux.scan.custom.sofia.info.mission import SofiaMissionInfo
from sofia_redux.scan.custom.sofia.info.mode import SofiaModeInfo
from sofia_redux.scan.custom.sofia.info.nodding import SofiaNoddingInfo
from sofia_redux.scan.custom.sofia.info.observation import SofiaObservationInfo
from sofia_redux.scan.custom.sofia.info.origination import SofiaOriginationInfo
from sofia_redux.scan.custom.sofia.info.processing import SofiaProcessingInfo
from sofia_redux.scan.custom.sofia.info.scanning import SofiaScanningInfo
from sofia_redux.scan.custom.sofia.info.spectroscopy import (
    SofiaSpectroscopyInfo)
from sofia_redux.scan.custom.sofia.info.telescope import SofiaTelescopeInfo
from sofia_redux.scan.utilities import utils
from sofia_redux.scan.info.camera.info import CameraInfo

# Names exported by ``from <module> import *``; only the SofiaInfo class.
__all__ = ['SofiaInfo']


class SofiaInfo(WeatherInfo, CameraInfo):
    """
    Information container holding SOFIA specific observation metadata.

    Each aspect of the observation (instrument, astrometry, aircraft, ...)
    is stored in its own info object created in `__init__`.
    """

    def __init__(self, configuration_path=None):
        """
        Initialize a SofiaInfo object.

        The SOFIA information contains metadata on various parts of an
        observation that are specific to SOFIA.

        Parameters
        ----------
        configuration_path : str, optional
            An alternate directory path to the configuration tree to be
            used during the reduction.  The default is
            <package>/data/configurations.
        """
        super().__init__(configuration_path=configuration_path)
        self.name = 'sofia'
        # FITS HISTORY messages collected for later addition to a header.
        self.history = []
        # Configuration files registered via `register_config_file`.
        self.configuration_files = set()
        self.instrument = SofiaInstrumentInfo()
        self.astrometry = SofiaAstrometryInfo()
        self.aircraft = SofiaAircraftInfo()
        self.chopping = SofiaChoppingInfo()
        self.detector_array = SofiaDetectorArrayInfo()
        self.dithering = SofiaDitheringInfo()
        self.environment = SofiaEnvironmentInfo()
        self.mapping = SofiaMappingInfo()
        self.mission = SofiaMissionInfo()
        self.mode = SofiaModeInfo()
        self.nodding = SofiaNoddingInfo()
        self.observation = SofiaObservationInfo()
        self.origin = SofiaOriginationInfo()
        self.processing = SofiaProcessingInfo()
        self.scanning = SofiaScanningInfo()
        self.spectroscopy = SofiaSpectroscopyInfo()
        self.telescope = SofiaTelescopeInfo()

    def register_config_file(self, filename):
        """
        Register a configuration file in the history and for reference.

        Parameters
        ----------
        filename : str
            The configuration file to register.  `None` is passed to the
            parent implementation but is not recorded here.

        Returns
        -------
        None
        """
        super().register_config_file(filename)
        if filename is None:
            return
        self.configuration_files.add(filename)
        self.append_history_message(f'AUX: {filename}')

    def read_configuration(self, configuration_file='default.cfg',
                           validate=True):
        """
        Read and apply a configuration file.

        Every unique configuration file reported by the configuration is
        added to the history as an 'AUX: <file>' message.

        Parameters
        ----------
        configuration_file : str, optional
            Path to, or name of, a configuration file.
        validate : bool, optional
            If `True` (default), validate information read from the
            configuration file.

        Returns
        -------
        None
        """
        super().read_configuration(configuration_file=configuration_file,
                                   validate=validate)
        config_files = self.configuration.config_files
        if config_files is None:  # pragma: no cover
            # Very hard to hit this code
            return
        config_files = list(np.unique(config_files))
        for config_file in config_files:
            self.append_history_message(f'AUX: {config_file}')

    def get_name(self):
        """
        Return the name of the instrument.

        Falls back to the parent implementation when no instrument, or no
        instrument name, is available.

        Returns
        -------
        name : str
        """
        if self.instrument is None or self.instrument.name is None:
            return super().get_name()
        return self.instrument.name

    def apply_configuration(self):
        """
        Apply a configuration to the information.

        Logs a summary of the observation, optionally stores the initial
        HWP angle in the FITS header under 'HWPINIT', and parses the HISTORY
        entries of the configuration FITS header.

        Returns
        -------
        None
        """
        super().apply_configuration()
        log.info(f"[{self.observation.source_name}] "
                 f"of AOR {self.observation.aor_id}")
        log.info(f"Observed on {self.astrometry.date} "
                 f"at {self.astrometry.start_time} "
                 f"by {self.origin.observer}")

        if self.astrometry.equatorial is not None:
            log.info(f"Equatorial: {self.astrometry.equatorial}")

        if self.telescope.boresight_equatorial is not None:
            log.info(f"Boresight: {self.telescope.boresight_equatorial}")

        if self.astrometry.requested_equatorial is not None:
            log.info(f"Requested: {self.astrometry.requested_equatorial}")

        kft = (self.aircraft.altitude.midpoint.to(self.aircraft.kft)).value
        temp = self.environment.ambient_t.to(
            units.K, equivalencies=units.temperature()).value
        log.info(f"Altitude: {kft:.2f} kft, Tamb: {temp:.3f} K")

        if self.telescope.focus_t is not None:
            log.info(f"Focus: {self.telescope.focus_t}")

        hwp = self.configuration.get_float('hwp', default=np.nan)
        if not np.isnan(hwp):
            hwp_header = hwp, 'Actual value of the initial HWP angle (degree)'
            self.configuration.fits.preserved_cards['HWPINIT'] = hwp_header
            self.configuration.fits.header['HWPINIT'] = hwp_header
            # NOTE(review): the header was modified above, so the FITS
            # options are re-read and merged here.  Confirm these two calls
            # are meant to be conditional on the HWP option being set.
            self.configuration.fits.reread()
            self.configuration.merge_fits_options()

        self.parse_history(self.configuration.fits.header)

    def append_history_message(self, message):
        """
        Add a FITS history message for later addition to a FITS header.

        String messages already present in the history are not added again.
        Lists are processed element by element; any other type is ignored.

        Parameters
        ----------
        message : str or list (str)
            The history message(s) to add.

        Returns
        -------
        None
        """
        if message is None:
            return
        if self.history is None:
            self.history = []

        if isinstance(message, str):
            if message in self.history:
                return
            self.history.append(message)
        elif isinstance(message, list):
            for msg in message:
                self.append_history_message(msg)

    def edit_image_header(self, header, scans=None):
        """
        Edit an image header with available information.

        The earliest and latest scans (by MJD) provide the start/end values;
        environment, aircraft, telescope and scanning information is merged
        between those two scans before being written.

        Parameters
        ----------
        header : astropy.fits.Header
            The FITS header to apply.
        scans : list (Scan), optional
            A list of scans to use during editing.  If `None` or empty, only
            the parent class edits are applied.

        Returns
        -------
        None
        """
        super().edit_image_header(header, scans=scans)
        # An empty list has no first/last scan (np.argmin would raise), so
        # treat it the same as no scans at all.
        if scans is None or len(scans) == 0:
            return

        mjds = [scan.mjd for scan in scans]
        first_scan = scans[np.argmin(mjds)]
        last_scan = scans[np.argmax(mjds)]

        aors = [scan.info.observation.aor_id for scan in scans]
        aors = [aor for aor in aors if aor is not None]
        mission_ids = [scan.info.mission.mission_id for scan in scans]
        mission_ids = [mid for mid in mission_ids if mid is not None]
        freqs = [scan.info.instrument.frequency.decompose().value
                 for scan in scans]

        # SOFIA date and time keys
        header['DATE-OBS'] = first_scan.info.astrometry.time_stamp
        header.comments['DATE-OBS'] = 'Start of observation'

        utc_range = [first_scan.info.astrometry.utc.start,
                     last_scan.info.astrometry.utc.end]
        for i, utc in enumerate(utc_range):
            # Re-label hour quantities as hour angles so that Angle can
            # format them below.
            if isinstance(utc, units.Quantity) and utc.unit == 'hour':
                utc_range[i] = utc.value * units.Unit('hourangle')

        utc_str = ['00:00:NaN'] * 2
        if not np.isnan(utc_range[0]):
            utc_str[0] = Angle(utc_range[0]).to_string(
                sep=':', pad=True, precision=3)
        if not np.isnan(utc_range[1]):
            utc_str[1] = Angle(utc_range[1]).to_string(
                sep=':', pad=True, precision=3)

        header['UTCSTART'] = utc_str[0], 'UTC start of first scan'
        header['UTCEND'] = utc_str[1], 'UTC end of last scan'

        # SOFIA INSTRUMENT keys
        self.instrument.exposure_time = self.get_total_exposure_time(
            scans=scans)

        # SOFIA array keys
        if self.detector_array is not None and len(scans) == 1:
            self.detector_array.boresight_index = (
                first_scan.info.detector_array.boresight_index)

        # NOTE(review): assumed to run unconditionally (not only for a
        # single scan) - confirm against the upstream source.
        self.edit_header(header)

        # SOFIA observation keys
        first_scan.info.observation.edit_header(header)

        # SOFIA mission keys
        first_scan.info.mission.edit_header(header)

        # SOFIA origination keys
        origin = first_scan.info.origin.copy()
        if self.configuration.has_option('organization'):
            origin.organization = self.configuration.get_string(
                'organization')
        origin.creator = 'sofscan'
        origin.filename = None  # FILENAME fills automatically at writing.
        origin.edit_header(header)

        # SOFIA environmental keys
        environment = first_scan.info.environment.copy()
        environment.merge(last_scan.info.environment)
        environment.edit_header(header)

        # SOFIA aircraft keys
        aircraft = first_scan.info.aircraft.copy()
        aircraft.merge(last_scan.info.aircraft)
        aircraft.edit_header(header)

        # SOFIA telescope keys
        telescope = first_scan.info.telescope.copy()
        telescope.merge(last_scan.info.telescope)
        telescope.edit_header(header)

        # SOFIA collection keys
        first_scan.info.mode.edit_header(header)

        if first_scan.info.mode.is_chopping:
            first_scan.info.chopping.edit_header(header)

        if first_scan.info.mode.is_nodding:
            first_scan.info.nodding.edit_header(header)

        if first_scan.info.mode.is_dithering:
            dither = first_scan.info.dithering.copy()
            if len(scans) > 1:
                # The index is replaced by the unknown value when several
                # scans are combined.
                dither.index = utils.UNKNOWN_INT_VALUE
            dither.edit_header(header)

        if first_scan.info.mode.is_mapping:
            first_scan.info.mapping.edit_header(header)

        if first_scan.info.mode.is_scanning:
            scanning = first_scan.info.scanning.copy()
            scanning.merge(last_scan.info.scanning)
            scanning.edit_header(header)

        # SOFIA data processing keys
        processing = self.processing.get_processing(
            is_calibrated=self.configuration.has_option('calibrated'),
            dims=header.get('NAXIS', 0),
            quality_level=self.get_lowest_quality(scans))
        processing.associated_aors = aors
        processing.associated_mission_ids = mission_ids
        processing.associated_frequencies = freqs
        processing.edit_header(header)

        first_scan.info.configuration.add_preserved_header_keys(header)

    @staticmethod
    def has_tracking_error(scans):
        """
        Report whether any scan in a set contains a telescope tracking error.

        Parameters
        ----------
        scans : list (SofiaScan)
            A list of scans.

        Returns
        -------
        tracking_error : bool
            `True` if any scan contains a telescope tracking error.
            `False` otherwise, including when `scans` is `None` or empty.
        """
        if scans is None:
            return False
        # Check every scan rather than deciding on the first one, and always
        # return a bool (also for an empty list).
        return any(scan.info.telescope.has_tracking_error for scan in scans)

    def edit_header(self, header):
        """
        Edit a scan header with available information.

        Parameters
        ----------
        header : astropy.fits.Header
            The FITS header to apply.

        Returns
        -------
        None
        """
        self.observation.edit_header(header)
        self.mission.edit_header(header)
        self.origin.edit_header(header)
        self.environment.edit_header(header)
        self.aircraft.edit_header(header)
        self.telescope.edit_header(header)
        self.instrument.edit_header(header)
        self.mode.edit_header(header)

        # The collection-mode infos may be unset (None) and are then skipped.
        if self.chopping is not None:
            self.chopping.edit_header(header)
        if self.nodding is not None:
            self.nodding.edit_header(header)
        if self.dithering is not None:
            self.dithering.edit_header(header)
        if self.mapping is not None:
            self.mapping.edit_header(header)
        if self.scanning is not None:
            self.scanning.edit_header(header)

        self.processing.edit_header(header)

    @staticmethod
    def get_total_exposure_time(scans=None):
        """
        Return the total integration time for a list of scans.

        Parameters
        ----------
        scans : list (Scan), optional
            A list of scans from which to get the total integration time.

        Returns
        -------
        time : astropy.units.Quantity
            The summed instrument exposure time; zero seconds if no scans
            are supplied.
        """
        time = 0.0 * units.Unit('second')
        if scans is None:
            return time
        for scan in scans:
            time += scan.info.instrument.exposure_time
        return time

    @staticmethod
    def get_lowest_quality(scans):
        """
        Return the lowest quality processing flag from a list of scans.

        Parameters
        ----------
        scans : list (Scan)
            A list of scans from which to determine the lowest quality
            processing level.

        Returns
        -------
        QualityFlagTypes
            The quality level of the scan with the smallest flag value.
        """
        flag_values = [scan.info.processing.quality_level.value
                       for scan in scans]
        min_index = np.argmin(flag_values)
        return scans[min_index].info.processing.quality_level

    def add_history(self, header, scans=None):
        """
        Add HISTORY messages to a FITS header.

        The current working directory and the ID of each scan are appended
        to the stored history before all stored messages are written.

        Parameters
        ----------
        header : astropy.io.fits.header.Header
            The header to update with HISTORY messages.
        scans : list (Scan), optional
            A list of scans to add HISTORY messages from if necessary.  A
            single (non-list) scan is also accepted.

        Returns
        -------
        None
        """
        super().add_history(header, scans=scans)
        self.validate_configuration_registration()
        self.append_history_message(f'PWD: {os.getcwd()}')

        # Add obs-ID for all input scans
        if scans is not None:
            if not isinstance(scans, list):
                scans = [scans]
            for i, scan in enumerate(scans):
                self.append_history_message(
                    f' OBS-ID[{i + 1}]: {scan.get_id()}')

        if self.history is not None:
            for message in self.history:
                header['HISTORY'] = f' {message}'

    def parse_history(self, header):
        """
        Parse all history messages in the header to the scan info.

        Any previously stored history is replaced.

        Parameters
        ----------
        header : astropy.io.fits.Header

        Returns
        -------
        None
        """
        self.history = []
        if 'HISTORY' in header:
            history = header['HISTORY']
            if isinstance(history, str):
                self.history = [history]
            else:
                self.history = list(history)

        if len(self.history) > 0:
            log.debug(f"Processing History: "
                      f"{len(self.history)} entries found.")

    def get_ambient_kelvins(self):
        """
        Get the ambient temperature in Kelvins.

        Returns
        -------
        kelvins : units.Quantity
        """
        return self.environment.ambient_t.to(
            'Kelvin', equivalencies=units.temperature())

    def get_ambient_pressure(self):
        """
        Get the ambient pressure.

        Returns
        -------
        pressure : units.Quantity
            Always NaN Pascal in this implementation.
        """
        return np.nan * units.Unit('Pascal')

    def get_ambient_humidity(self):
        """
        Get the ambient humidity.

        Returns
        -------
        humidity : units.Quantity
            Always NaN gram/m3 in this implementation.
        """
        return np.nan * units.Unit('gram/m3')

    def get_wind_direction(self):
        """
        Return the wind direction.

        Returns the tail vs. head wind.

        Returns
        -------
        direction : units.Quantity
            -180 degrees if the ground speed exceeds the air speed, and
            0 degrees otherwise.
        """
        if self.aircraft.ground_speed > self.aircraft.air_speed:
            return -180.0 * units.Unit('degree')
        else:
            return 0.0 * units.Unit('degree')

    def get_wind_speed(self):
        """
        Return the wind speed.

        Returns
        -------
        speed : units.Quantity
            The absolute difference between the aircraft ground speed and
            air speed.
        """
        return np.abs(self.aircraft.ground_speed - self.aircraft.air_speed)

    def get_wind_peak(self):
        """
        Return the wind peak.

        Returns
        -------
        speed : units.Quantity
            Always NaN m/second in this implementation.
        """
        return np.nan * units.Unit('m/second')

    def validate_scans(self, scans):
        """
        Validate a list of scans specific to the instrument.

        For a single scan with an observing time of less than 3.3 minutes,
        `set_pointing` is called with that scan before the parent validation.

        Parameters
        ----------
        scans : list (SofiaScan)
            A list of scans.

        Returns
        -------
        None
        """
        if scans is None or len(scans) == 0 or scans[0] is None:
            super().validate_scans(scans)
            return

        if len(scans) == 1:
            first_scan = scans[0]
            if first_scan.get_observing_time() < 3.3 * units.Unit('minute'):
                self.set_pointing(first_scan)

        super().validate_scans(scans)

    @staticmethod
    def get_plate_scale(angular_size, physical_size):
        """
        Return plate scaling.

        The plate scaling is in radians/m for the focal plane projected
        through the telescope.  It is the square root of the ratio between
        the product of the angular sizes and the product of the physical
        sizes.

        Parameters
        ----------
        angular_size : list (astropy.units.Quantity or float)
            x, y angles representing the projected angular size on the sky.
            If float values are used, must be supplied in radians.
        physical_size : list (astropy.units.Quantity or float)
            x, y physical/geometric size on the focal plane unit.  If float
            values are used, must be in meters.

        Returns
        -------
        plate_scale : astropy.units.Quantity
        """
        rpm = units.Unit('radian/meter')
        result = angular_size[0] * angular_size[1]
        result /= physical_size[0] * physical_size[1]
        result = np.sqrt(result)
        if isinstance(result, units.Quantity):
            return result.to(rpm)
        else:
            # Plain floats are taken to already be in radians and meters.
            return result * rpm

    @abstractmethod
    def get_si_pixel_size(self):  # pragma: no cover
        """
        Get the science instrument pixel size.

        Returns
        -------
        size : Coordinate2D
            The (x, y) pixel sizes, each of which is a units.Quantity.
        """
        pass

    @abstractmethod
    def get_file_id(self):  # pragma: no cover
        """
        Return the file ID.

        Returns
        -------
        str
        """
        pass