# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Utility and convenience functions for common pipecal use cases."""
import warnings
from astropy import log
import numpy as np
from photutils.detection import find_peaks
from sofia_redux.calibration.pipecal_calfac import pipecal_calfac
from sofia_redux.calibration.pipecal_error import PipeCalError
from sofia_redux.calibration.pipecal_photometry import pipecal_photometry
from sofia_redux.calibration.pipecal_rratio import pipecal_rratio
from sofia_redux.toolkit.utilities.fits import hdinsert
__all__ = ['average_za', 'average_alt', 'average_pwv',
'guess_source_position', 'add_calfac_keys',
'add_phot_keys', 'get_fluxcal_factor',
'apply_fluxcal', 'get_tellcor_factor', 'apply_tellcor',
'run_photometry']
[docs]
def average_za(header):
"""
Robust average of zenith angle from FITS header.
Keys used are ZA_START and ZA_END. If both are good
(>= 0), they will be averaged. If only one is good,
it will be returned. If both are bad, a PipeCalError
will be raised.
Parameters
----------
header : `astropy.io.fits.header.Header`
FITS header containing ZA keys.
Returns
-------
float
The ZA value
Raises
------
PipeCalError
If no valid ZA is found.
"""
try:
zasta = float(header['ZA_START'])
except (ValueError, TypeError, KeyError):
zasta = -9999
try:
zaend = float(header['ZA_END'])
except (ValueError, TypeError, KeyError):
zaend = -9999
if zasta > 0 and zaend < 0:
za = zasta
elif zasta < 0 and zaend > 0:
za = zaend
elif zasta < 0 and zaend < 0:
msg = 'Bad ZA value in header'
log.error(msg)
raise PipeCalError(msg)
else:
za = (zasta + zaend) / 2
return za
[docs]
def average_alt(header):
"""
Robust average of altitude from FITS header.
Keys used are ALTI_STA and ALTI_END. If both are good
(>= 0), they will be averaged. If only one is good,
it will be returned. If both are bad, a PipeCalError
will be raised.
Input values are expected in feet; return value is in
kilo-feet, i.e. input values are divided by 1000 before
returning.
Parameters
----------
header : `astropy.io.fits.header.Header`
FITS header containing altitude keys.
Returns
-------
float
The altitude value, in kilo-feet
Raises
------
PipeCalError
If no valid altitude is found.
"""
try:
altsta = float(header['ALTI_STA'])
except (ValueError, TypeError, KeyError):
altsta = -9999
try:
altend = float(header['ALTI_END'])
except (ValueError, TypeError, KeyError):
altend = -9999
if altsta > 0 and altend < 0:
alt = altsta
elif altsta < 0 and altend > 0:
alt = altend
elif altsta < 0 and altend < 0:
msg = 'Bad altitude value in header'
log.error(msg)
raise PipeCalError(msg)
else:
alt = (altsta + altend) / 2
alt /= 1000
return alt
[docs]
def average_pwv(header):
"""
Robust average of precipitable water vapor from FITS header.
Header key used is WVZ_OBS only. WVZ_STA and WVZ_END may be present
in header, but are considered unreliable.
Parameters
----------
header : `astropy.io.fits.header.Header`
FITS header containing PWV keys.
Returns
-------
float
The PWV value
Raises
------
PipeCalError
If no valid PWV is found.
"""
try:
pwv = float(header['WVZ_OBS'])
except (ValueError, TypeError, KeyError):
pwv = -9999
if pwv < 0:
msg = 'Bad PWV value in header'
log.error(msg)
raise PipeCalError(msg)
return pwv
[docs]
def guess_source_position(header, image, srcpos=None):
"""
Estimate the position of a standard source in the image.
The following information sources are checked in order:
1. The srcpos parameter
2. SRCPOSX and SRCPOSY keywords in the header
3. The brightest peak found by photutils.find_peaks.
4. CRPIX1 and CRPIX2 keywords in the header.
If a successful value is found at any stage, no further
checks are done.
Parameters
----------
header : `astropy.io.fits.header.Header`
FITS header corresponding to the image.
image : array
2D image data to check for peaks.
srcpos : list or tuple, optional
If provided, should be the desired source
position, listed as (x,y), indexed from 0.
Returns
-------
list, or None
The estimated source position, as (x,y), zero-indexed.
None is returned if no valid source positions could be found.
"""
# Check the srcpos input. If it isn't provided, fill it
# with info from the header
if not srcpos:
if 'SRCPOSX' in header and 'SRCPOSY' in header:
if header['SRCPOSX'] == 0 and header['SRCPOSY'] == 0:
srcpos = None
else:
srcpos = [header['SRCPOSX'], header['SRCPOSY']]
log.debug('Found SRCPOS in header: {}'.format(srcpos))
# if still no srcpos, try find_peaks
if not srcpos:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
peak = find_peaks(image, np.nanmedian(image), npeaks=1)
try:
srcpos = [peak['x_peak'][0], peak['y_peak'][0]]
log.debug('SRCPOS from find_peaks: {}'.format(srcpos))
except (TypeError, IndexError):
srcpos = None
# if *still* no srcpos, try CRPIX
if not srcpos:
if 'CRPIX1' in header and 'CRPIX2' in header:
srcpos = [header['CRPIX1'], header['CRPIX2']]
log.debug('SRCPOS from CRPIX: {}'.format(srcpos))
else:
srcpos = None
log.debug('SRCPOS not found')
return srcpos
[docs]
def add_calfac_keys(header, config):
"""
Add calibration-related keywords to a header.
The following keys are added or updated:
PROCSTAT, BUNIT, CALFCTR, ERRCALF, LAMREF, LAMPIVOT,
COLRCORR, REFCALZA, REFCALAW, REFCALWV, and REFCALF3.
Parameters
----------
header : `astropy.io.fits.header.Header`
The FITS header to update.
config : dict-like
Calibration configuration values, as produced by
`pipecal.pipecal_config.pipecal_config`.
"""
# assume all necessary values are present in config
runits = config['runits']
f3 = config['refcal_file'].partition(config['caldata'])[-1]
try:
pstat = str(header['PROCSTAT']).strip().upper()
except KeyError:
pstat = 'UNKNOWN'
if pstat != 'LEVEL_4':
hdinsert(header, 'PROCSTAT', 'LEVEL_3', 'Processing status')
hdinsert(header, 'BUNIT', 'Jy/pixel', 'Data units')
hdinsert(header, 'CALFCTR', config['calfac'],
'Calibration factor ({}/Jy)'.format(runits))
hdinsert(header, 'ERRCALF', config['ecalfac'],
'Calibration factor uncertainty ({}/Jy)'.format(runits))
hdinsert(header, 'LAMREF', config['wref'],
'Reference wavelength (microns)')
hdinsert(header, 'LAMPIVOT', config['lpivot'],
'Pivot wavelength (microns)')
hdinsert(header, 'COLRCORR', config['color_corr'],
'Color correction factor')
hdinsert(header, 'REFCALZA', config['rfit_am']['zaref'],
'Reference calibration zenith angle')
hdinsert(header, 'REFCALAW', config['rfit_alt']['altwvref'],
'Reference calibration altitude')
hdinsert(header, 'REFCALWV', config['rfit_pwv']['altwvref'],
'Reference calibration PWV')
hdinsert(header, 'REFCALF3', f3,
'Calibration reference file')
[docs]
def add_phot_keys(header, phot, config=None, srcpos=None):
"""
Add photometry-related keywords to a header.
The primary input for this function (`phot`) should be the data
structure returned by the pipecal_photometry function.
All keys in this structure are added to the FITS header.
In addition, the following keys may be added or updated,
from the `config` or `srcpos` input:
SRCPOSX, SRCPOSY, MODLFLX, MODLFLXE, REFSTD1, REFSTD2,
REFSTD3, LAMREF, LAMPIVOT, COLRCORR, REFCALZA,
REFCALAW, REFCALWV, AVGCALFC, AVGCALER, and RUNITS.
Parameters
----------
header : `astropy.io.fits.header.Header`
The FITS header to update.
phot : list
Photometry values, as produced by
`pipecal.pipecal_photometry.pipecal_photometry`.
config : dict-like, optional
Calibration configuration values, as produced by
`pipecal.pipecal_config.pipecal_config`.
srcpos : list-like, optional
The starting position estimate for photometry, as
(x,y), zero-indexed. If provided, will be set in the
SRCPOSX and SRCPOSY keywords.
"""
# add the starting guess position
if srcpos is not None:
hdinsert(header, 'SRCPOSX', srcpos[0],
'Initial source position for photometry (x)')
hdinsert(header, 'SRCPOSY', srcpos[1],
'Initial source position for photometry (y)')
# add photometry keys
if phot is None:
phot = []
for i in range(len(phot)):
entry = phot[i]
key = entry['key']
val = entry['value']
com = entry['comment']
if isinstance(val, list):
hdinsert(header, key, val[0], com)
hdinsert(header, key + 'E', val[1], 'Error in {}'.format(com))
else:
hdinsert(header, key, val, com)
# add other standard values from config
if config is not None:
# try the model flux keys; pass if not present
try:
mflux = config['std_flux']
mfluxe = mflux * config['std_eflux'] / 100
hdinsert(header, 'MODLFLX', mflux, 'Model flux (Jy)')
hdinsert(header, 'MODLFLXE', mfluxe, 'Model flux error (Jy)')
except KeyError:
pass
# same for the standard reference files
try:
f1 = config['filterdef_file'].partition(config['caldata'])[-1]
f2 = config['stdflux_file'].partition(config['caldata'])[-1]
f3 = config['stdeflux_file'].partition(config['caldata'])[-1]
hdinsert(header, 'REFSTD1', f1, 'Standard reference file')
hdinsert(header, 'REFSTD2', f2, 'Standard reference file')
hdinsert(header, 'REFSTD3', f3, 'Standard reference file')
except KeyError:
pass
# assume basic reference values are present together
if 'wref' in config:
hdinsert(header, 'LAMREF', config['wref'],
'Reference wavelength (microns)')
hdinsert(header, 'LAMPIVOT', config['lpivot'],
'Pivot wavelength (microns)')
hdinsert(header, 'COLRCORR', config['color_corr'],
'Color correction factor')
hdinsert(header, 'REFCALZA', config['rfit_am']['zaref'],
'Reference calibration zenith angle')
hdinsert(header, 'REFCALAW', config['rfit_alt']['altwvref'],
'Reference calibration altitude')
hdinsert(header, 'REFCALWV', config['rfit_pwv']['altwvref'],
'Reference calibration PWV')
# check for an average cal factor
if 'avgcalfc' in config:
hdinsert(header, 'AVGCALFC', config['avgcalfc'],
'Average calibration factor')
hdinsert(header, 'AVGCALER', config['avgcaler'],
'Average cal factor error')
try:
f4 = config['avgcal_file'].partition(config['caldata'])[-1]
hdinsert(header, 'AVCLFILE', f4,
'Average calibration reference file')
except KeyError: # pragma: no cover
pass
# add the raw units for the instrument
if 'runits' in config:
hdinsert(header, 'RUNITS', config['runits'],
'Raw data units (before calibration)')
[docs]
def get_fluxcal_factor(header, config,
update=False, write_history=False):
"""
Retrieve a flux calibration factor from configuration.
The returned factor is intended to divide raw image data
in order to calibrate it to physical units (Jy/pixel).
That is::
calibrated = flux / cal_factor.
The input header may be updated with FITS keywords,
via `add_calfac_keys`, if desired. A history message
may also be added to the header, if desired.
Parameters
----------
header : `astropy.io.fits.header.Header`
The FITS header to update.
config : dict-like
Calibration configuration values, as produced by
`pipecal.pipecal_config.pipecal_config`.
update : bool, optional
If set, calibration keywords will be updated.
write_history : bool, optional
If set, and update is also True, a history message
will be added to the header.
Returns
-------
float, float
The calibration factor and its associated error.
"""
if not update:
write_history = False
if 'calfac' not in config:
if write_history:
msg1 = 'No reference flux calibration available'
try:
msg2 = 'for SPECTEL={}, ALTCFG1={}, ' \
'DATE={}. '.format(config['spectel'],
config['altcfg1'],
config['date'])
except KeyError:
msg2 = '. '
msg3 = 'Data is not calibrated.'
log.warning(msg1 + msg2 + msg3)
header['HISTORY'] = ' '
header['HISTORY'] = msg1
if len(msg2) > 2:
header['HISTORY'] = msg2
header['HISTORY'] = msg3
header['HISTORY'] = ' '
calfac, ecalfac = None, None
else:
f1 = config['refcal_file'].partition(config['caldata'])[-1]
calfac = config['calfac']
ecalfac = config['ecalfac']
# write relevant keys to header
if update:
add_calfac_keys(header, config)
if write_history:
header['HISTORY'] = ' '
header['HISTORY'] = 'Flux calibration information:'
header['HISTORY'] = ' Using reference file: {}'.format(f1)
header['HISTORY'] = ' Average reference calibration factor:'
header['HISTORY'] = ' {} +/- {}'.format(calfac, ecalfac)
header['HISTORY'] = 'Data has been divided by cal ' \
'factor to convert'
header['HISTORY'] = ' from {} to Jy'.format(config['runits'])
header['HISTORY'] = ' '
return calfac, ecalfac
[docs]
def apply_fluxcal(data, header, config,
variance=None, covariance=None, write_history=True):
"""
Apply a flux calibration factor to an image.
The image is calibrated as::
calibrated = flux / cal_factor
If provided, the variance and/or covariance are propagated
as well, as::
calibrated_variance = variance / cal_factor^2
calibrated_covariance = covariance / cal_factor^2
The provided header is also updated with calibration keys and,
optionally, a history message.
Parameters
----------
data : array
Image to calibrate.
header : `astropy.io.fits.header.Header`
FITS header to update.
config : dict-like
Calibration configuration values, as produced by
`pipecal.pipecal_config.pipecal_config`.
variance : array, optional
Variance image to propagate, if desired.
covariance : array, optional
Covariance image to propagate, if desired.
write_history : bool, optional
If set, a history message will be added to the header.
Returns
-------
array, or 2- or 3-length tuple
If only data is provided, it is returned as an array.
If variance is provided, the return value is (data, variance).
If covariance is provided, the return value is
(data, variance, covariance); if the variance was not provided,
it will be set to None.
"""
calfac, _ = get_fluxcal_factor(header, config, update=True,
write_history=write_history)
if calfac is None:
corrdata, corrvar, corrcovar = data, variance, covariance
else:
# apply factor to data
# do not propagate error on calfactor to variance
corrdata = data / calfac
if variance is not None:
corrvar = variance / calfac ** 2
else:
corrvar = None
if covariance is not None:
corrcovar = covariance / calfac ** 2
else:
corrcovar = None
if covariance is not None:
return corrdata, corrvar, corrcovar
elif variance is not None:
return corrdata, corrvar
else:
return corrdata
[docs]
def get_tellcor_factor(header, config, update=False, use_wv=False):
"""
Retrieve a telluric correction factor from configuration.
The correction factor is calculated from the ratio of the R
value at the reference ZA and altitude/PWV to the R value
at the observed ZA and altitude/PWV. R values are
calculated by `pipecal.pipecal_rratio.pipecal_rratio`.
The returned value is intended to be multiplied into the
data, in order to correct it to the reference values.
Observed values for ZA and altitude or PWV are read from the
FITS header.
Parameters
----------
header : `astropy.io.fits.header.Header`
The FITS header to update.
config : dict-like
Calibration configuration values, as produced by
`pipecal.pipecal_config.pipecal_config`.
update : bool, optional
If set, calibration keywords will be updated.
use_wv : bool, optional
If set, precipitable water vapor will be used as the
reference value, instead of altitude.
Returns
-------
float
The telluric correction factor.
"""
# get ALT, ZA, PWV from header
altwv = -9999.
if use_wv:
altwv = average_pwv(header)
if altwv < 0 or altwv > 50:
log.warning(f'Bad WV value: {altwv}')
log.warning('Using altitude instead.')
use_wv = False
else:
log.debug(f'Using WV value {altwv}')
if not use_wv:
altwv = average_alt(header)
log.debug(f'Using Alt value {altwv}')
za = average_za(header)
# reference files
f1 = config['rfitam_file'].partition(config['caldata'])[-1]
zaref = config['rfit_am']['zaref']
if use_wv:
# R ratio for precipitable water vapor (PWV) reference
altwv_str = 'PWV'
f2 = config['rfitpwv_file'].partition(config['caldata'])[-1]
altwvref = config['rfit_pwv']['altwvref']
rratio = pipecal_rratio(za, altwv, zaref, altwvref,
config['rfit_am']['coeff'],
config['rfit_pwv']['coeff'],
pwv=True)
ref_rratio = pipecal_rratio(zaref, altwvref, zaref, altwvref,
config['rfit_am']['coeff'],
config['rfit_pwv']['coeff'],
pwv=True)
else:
# R ratio for altitude reference
altwv_str = 'altitude'
f2 = config['rfitalt_file'].partition(config['caldata'])[-1]
altwvref = config['rfit_alt']['altwvref']
rratio = pipecal_rratio(za, altwv, zaref, altwvref,
config['rfit_am']['coeff'],
config['rfit_alt']['coeff'],
pwv=False)
ref_rratio = pipecal_rratio(zaref, altwvref, zaref, altwvref,
config['rfit_am']['coeff'],
config['rfit_alt']['coeff'],
pwv=False)
# correction factor to reference Alt/ZA
corr_fac = ref_rratio / rratio
# update header with keywords
if update:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
hdinsert(header, 'TELCORR', corr_fac,
'Telluric correction factor to ZA/{}'.format(altwv_str))
hdinsert(header, 'REFCALZA', zaref,
'Reference calibration zenith angle')
if use_wv:
hdinsert(header, 'REFCALWV', altwvref,
'Reference calibration PWV')
else:
hdinsert(header, 'REFCALAW', altwvref,
'Reference calibration altitude')
hdinsert(header, 'REFCALF1', f1,
'Calibration reference file')
hdinsert(header, 'REFCALF2', f2,
'Calibration reference file')
# update header with history
header['HISTORY'] = 'Using reference files:'
header['HISTORY'] = ' {}'.format(f1)
header['HISTORY'] = ' {}'.format(f2)
header['HISTORY'] = 'Reference ZA={}, ' \
'{}={}'.format(zaref, altwv_str, altwvref)
header['HISTORY'] = 'Telluric correction factor for ZA={:.2f}, ' \
'{}={}:'.format(za, altwv_str, altwv)
header['HISTORY'] = ' {}'.format(corr_fac)
return corr_fac
[docs]
def apply_tellcor(data, header, config, variance=None, covariance=None,
use_wv=False):
"""
Apply a telluric correction factor to an image.
The image is corrected as::
corrected = flux * tel_factor
If provided, the variance and/or covariance are propagated
as well, as::
corrected_variance = variance * tel_factor^2
corrected_covariance = covariance / tel_factor^2
The provided header is also updated with relevant keywords and
a history message.
Parameters
----------
data : array
Image to correct.
header : `astropy.io.fits.header.Header`
FITS header to update.
config : dict-like
Calibration configuration values, as produced by
`pipecal.pipecal_config.pipecal_config`.
variance : array, optional
Variance image to propagate, if desired.
covariance : array, optional
Covariance image to propagate, if desired.
use_wv : bool, optional
If set, precipitable water vapor will be used as the
reference value, instead of altitude.
Returns
-------
array, or 2- or 3-length tuple
If only data is provided, it is returned as an array.
If variance is provided, the return value is (data, variance).
If covariance is provided, the return value is
(data, variance, covariance); if the variance was not provided,
it will be set to None.
Raises
------
PipeCalError
If no valid telluric correction factor is found.
"""
# correction factor to reference Alt/ZA/WV
try:
corr_fac = get_tellcor_factor(header, config,
update=True, use_wv=use_wv)
except (KeyError, AttributeError, ValueError, TypeError) as err:
log.debug(str(err))
raise PipeCalError('Response data not found; cannot apply '
'telluric correction') from None
# correct data and variance
corrdata = data * corr_fac
if variance is not None:
corrvar = variance * corr_fac**2
else:
corrvar = None
if covariance is not None:
corrcovar = covariance * corr_fac**2
else:
corrcovar = None
if covariance is not None:
return corrdata, corrvar, corrcovar
elif variance is not None:
return corrdata, corrvar
else:
return corrdata
[docs]
def run_photometry(data, header, var, config, **kwargs):
"""
Run photometry on an image of a standard source.
Input images are assumed to be telluric-corrected
(e.g. by the apply_tellcor function).
If the image is uncalibrated, and a model flux is known for
the source, values from aperture photometry
will be used to attempt to calculate and record a reference
calibration factor. This value and its associated error
will be recorded in REFCALFC and REFCALER, respectively.
If the image is previously calibrated, and a model flux is
known, the values will be compared, and a log message will
display the percent difference from the model.
If a model flux is unknown, the source flux will be reported
and recorded in the FITS header, but no further calculations
will be performed.
Parameters
----------
data : array
Image to correct.
header : `astropy.io.fits.header.Header`
FITS header to update.
var : array
Variance image to propagate, if desired.
config : dict-like
Calibration configuration values, as produced by
`pipecal.pipecal_config.pipecal_config`.
kwargs : dict-like, optional
Extra parameters to provide to the photometry function
If not provided, but values are available in
`config`, the configuration values will be used as
the default. If parameters are still not present,
defaults defined by the pipecal_photometry function will
be used.
"""
# get provided parameters, or reasonable defaults
if 'srcpos' not in kwargs:
kwargs['srcpos'] = guess_source_position(header, data)
if 'aprad' not in kwargs:
if 'aprad' in config:
kwargs['aprad'] = config['aprad']
if 'skyrad' not in kwargs:
try:
kwargs['skyrad'] = [config['bgin'], config['bgout']]
except KeyError:
pass
if 'fwhm' not in kwargs:
if 'fwhm' in config:
kwargs['fwhm'] = config['fwhm']
if 'fitsize' not in kwargs:
if 'fitsize' in config:
kwargs['fitsize'] = config['fitsize']
if 'runits' not in kwargs:
if 'runits' in config:
kwargs['runits'] = config['runits']
phot = pipecal_photometry(data, var, **kwargs)
# update header
add_phot_keys(header, phot, config, kwargs['srcpos'])
if ('BUNIT' in header and 'jy' in
str(header['BUNIT']).lower().strip()):
calib = True
else:
calib = False
# log the position and flux
try:
log.info('Source: {}'.format(config['object']))
except KeyError:
pass
try:
log.info('Source Position (x,y): '
'{:.2f}, {:.2f}'.format(header['STCENTX'],
header['STCENTY']))
if calib:
log.info('Source Flux: '
'{:.2f} +/- {:.2f} Jy'.format(header['STAPFLX'],
header['STAPFLXE']))
else:
try:
runits = kwargs['runits']
except KeyError:
runits = 'counts'
log.info('Source Flux: '
'{:.2f} +/- {:.2f} {}'.format(header['STAPFLX'],
header['STAPFLXE'],
runits))
except KeyError:
log.warning('Photometry failed.')
return
# if model flux is available, calculate ref cal factor
if 'std_flux' not in config:
log.warning('No model flux available; not calculating '
'reference cal factor.')
else:
flux = header['STAPFLX']
flux_err = header['STAPFLXE']
# log the model flux
try:
modlflx = header['MODLFLX']
log.info('Model Flux: '
'{:.3f} +/- {:.3f} Jy'.format(modlflx,
header['MODLFLXE']))
if calib:
log.info('Percent difference: '
'{:.1f}%'.format(100 * (flux - modlflx) / modlflx))
except KeyError:
pass
try:
ref_calfac, ref_ecalfac = \
pipecal_calfac(flux, flux_err, config)
hdinsert(header, 'REFCALFC', ref_calfac,
'Reference calibration factor')
hdinsert(header, 'REFCALER', ref_ecalfac,
'Reference calibration factor error')
if not calib:
log.info('Calculated calibration factor: '
'{:.4f}'.format(ref_calfac))
except ValueError:
log.warning('Bad flux; not adding REFCALFC.')