[docs]
class SofiaIntegration(Integration):
def __init__(self, scan=None):
"""
Initialize a SOFIA integration.
Parameters
----------
scan : sofia_redux.scan.custom.sofia.scan.scan.SofiaScan, optional
The scan to which this integration belongs.
"""
super().__init__(scan=scan)
[docs]
def set_tau(self, spec=None, value=None):
"""
Set the tau values for the integration.
If a value is explicitly provided without a specification, will be used
to set the zenith tau if ground based, or transmission. If a
specification and value is provided, will set the zenith tau as:
((band_a / t_a) * (value - t_b)) + band_b
where band_a/b are retrieved from the configuration as tau.<spec>.a/b,
and t_a/b are retrieved from the configuration as tau.<instrument>.a/b.
Parameters
----------
spec : str, optional
The tau specification to read from the configuration. If not
supplied, will be read from the configuration 'tau' setting.
value : float, optional
The tau value to set. If not supplied, will be retrieved from the
configuration as tau.<spec>.
Returns
-------
None
"""
if spec is None:
if not self.configuration.is_configured('tau'):
return
if spec is None and value is None:
spec = self.configuration.get_string('tau').lower()
if spec == 'atran':
self.set_atran_tau()
elif spec == 'pwvmodel':
self.set_pwv_model_tau()
else:
super().set_tau(spec=spec, value=value)
[docs]
def set_atran_tau(self):
"""
Set tau based on the ATRAN model.
Returns
-------
None
"""
model = AtranModel(self.configuration.get_options('atran'))
altitude = self.info.aircraft.altitude.midpoint
elevation = self.get_mid_elevation()
c = model.get_relative_transmission(altitude, elevation)
log.debug(f"Applying ATRAN model atmospheric correction: {c:.3f}")
tau = -np.log(model.reference_transmission * c)
tau *= np.sin(elevation)
if isinstance(tau, units.Quantity):
tau = tau.value
self.set_tau_value(tau)
[docs]
def set_pwv_model_tau(self):
"""
Set tau based on the PWV (precipitable water vapor) model.
Returns
-------
None
"""
pwv = self.get_model_pwv().to('micrometer')
log.debug(f"Using PWV model to correct fluxes: PWV = {pwv:.1f}")
self.configuration.parse_key_value('tau.pwv', str(pwv.value))
self.set_tau(spec='pwv', value=pwv.to('micrometer').value)
[docs]
def get_modulation_frequency(self, signal_flag):
"""
Return the modulation frequency.
The modulation frequency is taken from the chopper frequency if
available, or set to 0 Hz otherwise.
Parameters
----------
signal_flag : FrameFlagTypes or str or int
The signal flag (not relevant for this implementation).
Returns
-------
frequency : astropy.units.Quantity.
The modulation frequency in Hz.
"""
if self.info.chopping.chopping:
return self.info.chopping.frequency
return super().get_modulation_frequency(signal_flag)
[docs]
def get_mean_pwv(self):
"""
Get the mean PWV of the integration.
Returns
-------
pwv : astropy.units.Quantity
The mean PWV.
"""
pwv = self.frames.pwv[self.frames.valid]
if pwv.size == 0 or np.isnan(pwv).all():
return np.nan * units.Unit('micrometer')
return np.nanmean(self.frames.pwv[self.frames.valid])
[docs]
def get_mid_elevation(self):
"""
Return the mid-point elevation of the integration.
The mid-point is defined as the average of the elevations of the
first and last valid integration frame.
Returns
-------
elevation : astropy.units.Quantity
"""
el0 = self.frames.horizontal.el[self.get_first_frame_index()]
el1 = self.frames.horizontal.el[self.get_last_frame_index()]
return 0.5 * (el0 + el1)
[docs]
def get_mean_chopper_position(self):
"""
Return the mean chopper position for the integration.
Returns
-------
position : Coordinate2D
The mean (x, y) chopper position.
"""
return self.frames.chopper_position[self.frames.valid].mean()
[docs]
def get_model_pwv(self):
"""
Estimate the PWV from the altitude of the observation.
Returns
-------
PWV : astropy.units.Quantity
The derived PWV.
"""
log.debug("Estimating PWV based on altitude...")
pwv41k = self.configuration.get_float(
'pwv41k', default=29.0) * units.Unit('micrometer')
b = 1.0 / self.configuration.get_float('pwvscale', default=5.0)
alt_kf = (self.info.aircraft.altitude.midpoint / 1000).value
return pwv41k * np.exp(-b * (alt_kf - 41.0))
[docs]
def validate(self):
"""
Validate the integration after a read.
Returns
-------
None
"""
self.validate_pwv()
super().validate()
[docs]
def validate_pwv(self):
"""
Validate the frame PWV set a new value in the configuration if
necessary.
Returns
-------
None
"""
pwv = self.get_mean_pwv()
if pwv == 0 or np.isnan(pwv):
log.debug("--> FIX: Using default PWV model...")
pwv = self.get_model_pwv()
log.debug(f"PWV: {pwv:.3f}")
self.configuration.parse_key_value('tau.pwv', str(pwv.value))