Source code for sofia_redux.visualization.utils.unit_conversion

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from typing import Dict, Any
import warnings
import numpy as np
import astropy.units as u
import astropy.modeling as am

from sofia_redux.visualization import log

__all__ = ['parse_unit', 'convert_flux', 'convert_wave']

Me = u.def_unit('Me', u.Mct)
sec = u.def_unit('sec', u.s)
cm1 = u.def_unit('cm-1', u.kayser)
u.add_enabled_units([Me, sec, cm1])


[docs] def parse_unit(unit_string): """ Parse a unit string into a unit object. Parameters ---------- unit_string : str Unit to be parsed. Generally, any unit understood by the `astropy.units` module is supported. Additionally, 'cm-1' is supported as an alias for the 'kayser' unit. Returns ------- astropy.units.Unit Raises ------ ValueError If the unit is not recognized. """ with warnings.catch_warnings(): warnings.filterwarnings('ignore', module='astropy') try: unit = u.Unit(unit_string, parse_strict='raise') except ValueError: new_string = unit_string.replace('(cm-1)', 'kayser') log.debug(f'\tValue Error, try {new_string}') unit = u.Unit(new_string, parse_strict='silent') log.debug(f'Parsed {unit_string} into {unit} ({type(unit)}).') return unit
[docs] def convert_flux(in_flux, start_unit, end_unit, wavelength, wave_unit): """ Convert flux data to new units. The wavelength array is required to convert from some flux units to others, via the `astropy.units.spectral_density` equivalency. Parameters ---------- in_flux : array-like Input flux data to convert. start_unit : astropy.units.Unit or str Starting unit for flux data, to convert from. end_unit : astropy.units.Unit or str Desired unit for flux data, to convert to. wavelength : array-like Wavelength data associated with the flux. wave_unit : astropy.units.Unit or str Current wavelength unit. Returns ------- out_flux : array-like Converted flux array. The wavelength array is unchanged. Raises ------ ValueError If the input and output units are incompatible. """ # check for same input values if start_unit == end_unit: log.debug(f'Start and end units are same: {start_unit}') return in_flux if isinstance(wave_unit, str): wave_unit = u.Unit(wave_unit) if isinstance(start_unit, str): start_unit = u.Unit(start_unit) if isinstance(end_unit, str): end_unit = u.Unit(end_unit) # check again in case new unit equalities were found if start_unit == end_unit: log.debug(f'Start and end units are same: {start_unit}') return in_flux log.debug(f'Converting {len(wavelength)} of {type(wavelength)} ' f'{type(wavelength[0])}') wave = wavelength * wave_unit in_flux = in_flux * start_unit try: out_flux = in_flux.to(end_unit, equivalencies=u.spectral_density(wave)) except u.core.UnitConversionError: log.debug(f'Units not convertible: {start_unit} -> {end_unit}') raise ValueError('Inconvertible units') from None return out_flux.value
[docs] def convert_wave(wavelength: np.array, start_unit: str, end_unit: str) -> np.array: """ Convert wavelength data to new units. Parameters ---------- wavelength : array-like Input wavelength data to convert. start_unit : str Starting unit for wavelength data, to convert from. end_unit : str Desired unit for wavelength data, to convert to. Returns ------- out_wave : array-like Converted wavelength array. Raises ------ ValueError If the input and output units are incompatible. """ start = u.Unit(start_unit) end = u.Unit(end_unit) if start == end: log.debug(f'Start and end units are same: {start_unit}') return wavelength log.debug(f'Converting wavelength from {start_unit} to {end_unit}') try: out_wave = (wavelength * start).to(end, equivalencies=u.spectral()).value except u.core.UnitConversionError: log.debug(f'Units not convertible: {start_unit} -> {end_unit}') raise ValueError('Inconvertible units') from None return out_wave
def convert_model_fit(fit: am.Model, start_units: Dict[str, Any], end_units: Dict[str, Any], wave: np.ndarray): """ Converts units of a fitted model. Parameters ---------- fit : astropy.modeling.Model Fitted model. start_units : dict Dictionary of starting units containing a key (axis) and a corresponding value (unit/number). end_units : dict Dictionary of ending units containing a key (axis) and a corresponding value (unit/number). wave : np.ndarray A wavelength array from the initial fit, to be used for unit conversion. Returns ------- fit : astropy.modeling.Model Updated actual fit to the data after unit conversion. changed : bool Boolean to denotes if a unit change in the fit is done or not. Raises: ------- ValueError If no input unit has been assigned. """ input_units = _confirm_quantity(start_units) output_units = _confirm_quantity(end_units) old_fit = fit.with_units_from_data(**input_units) new_fit = fit.with_units_from_data(**output_units) changed = False equivs = u.spectral_density(wave * input_units['x']) for name in fit.param_names: old_param = getattr(old_fit, name) new_param = getattr(new_fit, name) if new_param.unit is None or new_param.unit == old_param.unit: continue try: quantity = (old_param.value * old_param.unit).to( new_param.unit).value except u.core.UnitConversionError: try: quantity = (old_param.value * old_param.unit).to( new_param.unit, equivalencies=equivs).value except (u.core.UnitConversionError, IndexError): try: quantity = _convert_slope(old_param, new_param, equivs, input_units, output_units) except (u.core.UnitConversionError, IndexError): raise ValueError(f'Unable to convert units for ' f'{old_param} from {old_param.unit} ' f'to {new_param.unit}') from None setattr(fit, name, quantity) changed = True return fit, changed def _confirm_quantity(units): """ A method to determine quantity for each key in the units dictionary. Parameters ----------- units: dict A dictionary containing key and a corresponding value. Returns ------- quantities: dict A dictionary containing keys (axis) and their corresponding values which are obtained by determining if its a unit or a number. """ quantities = dict() for key, value in units.items(): if isinstance(value, str): quant = 1 * parse_unit(value) elif isinstance(value, u.UnitBase): quant = 1 * value elif isinstance(value, u.quantity.Quantity): quant = value else: quant = None key = key.split('_')[0] quantities[key] = quant return quantities def _convert_slope(old_param, new_param, equivs, input_units, output_units): """ Converts units of a slope. Parameters ---------- old_param : astropy.modelling.Model.name Old fitting parameters of a model. new_param : astropy.modelling.name New fitting parameters of a model. equivs : astropy.units.spectral_density Equivalence pairs that handle spectral density with reagrd to wavelength. input_units : dict A dictionary containing keys (axis) and their corresponding input units. output_units : dict A dictionary containing keys (axis) and their corresponding output units. Returns ------- quantity : float """ # remove denominator to get just flux old_top = old_param * input_units['x'] new_top = new_param * input_units['x'] try: converted_top = (old_top.value * old_top.unit).to( new_top.unit).value except u.core.UnitConversionError: converted_top = (old_top.value * old_top.unit).to( new_top.unit, equivalencies=equivs).value try: quantity = converted_top * input_units['x'].to(output_units['x']) except u.core.UnitConversionError: # pragma: no cover # no use case for this clause yet quantity = converted_top * input_units['x'].to(output_units['x'], equivalencies=equivs) try: return quantity.value except AttributeError: return quantity