Source code for sofia_redux.calibration.pipecal_config

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Calibration configuration."""

import re
import os

import numpy as np
from astropy import log

from sofia_redux.calibration.pipecal_error import PipeCalError

__all__ = ['pipecal_config', 'read_respfile']


class ParseConfigError(PipeCalError):
    """Standard exception for reading response files."""
    def __init__(self, message):
        message = f'ERROR: Badly formatted response file: {message}'
        super().__init__(message)


[docs] def read_respfile(fname, spectel): """ Read response files. Response files contain the coefficients of polynomial fits to standard atmospheric models, folded through instrumental response models for a particular instrument and mode, at a range of wavelength band-passes (instrument filters). The format is assumed to be: - two lines beginning with '#' that contain reference values used for the response fit, e.g.:: # ALTMIN=35.0 ALTMAX=45.0 ALTREF=41.0 # ZAMIN=30.0 ZAMAX=70.0 ZAREF=45.0 - one line for each filter containing: - the filter reference wavelength - the filter name - a reference response value - any number of polynomial coefficients, beginning with the constant term The filter name is matched to the provided `spectel`, and the corresponding fit coefficients are returned, along with the reference values. Parameters ---------- fname : string Full path name of response file. spectel : string Name of filter used. Returns ------- resp_config : dictionary A dictionary with the details of the filter's response. Keys are: respref, altwvref, altwvrange, zaref, zarange, coeff. Raises ------ PipeCalError If errors are found while reading or parsing the file. """ # Read file try: respfile = open(fname, 'r') except IOError: message = f'Cannot open {fname}' log.error(message) raise PipeCalError(message) # First two lines contain alt/za information: # ALTMIN=35.0 ALTMAX=45.0 ALTREF=41.0 # ZAMIN=30.0 ZAMAX=70.0 ZAREF=45.0 line = respfile.readline().strip() try: values = [float(i.split('=')[-1]) for i in line.split() if i != '#'] if len(values) == 3: altrange = [values[0], values[1]] altref = values[2] else: raise ValueError('missing values') line = respfile.readline().strip() values = [float(i.split('=')[-1]) for i in line.split() if i != '#'] if len(values) == 3: zarange = [values[0], values[1]] zaref = values[2] else: raise ValueError('missing values') except ValueError: message = 'Could not read reference values from response file header' log.error(message) raise PipeCalError(message) # Read the rest of the file, stopping after finding matching spectel coeff = [] respref = None for line in respfile: # If line is empty or starts with #, skip it if line.strip() != '' and not line.startswith('#'): # Split the line by whitespace. # If there are less than 4 fields, raise an error splitline = line.split() if len(splitline) < 4: raise ParseConfigError(fname) # Pull out the second field, this is the filter name (string) # Check if it matches spectel lam = splitline[1].upper() if lam == spectel: # If a match, pull out the third field, this is the # response reference (float). It if isn't a float, # raise an error try: respref = float(splitline[2]) except ValueError: raise ParseConfigError(fname) # The rest of the line are a list of coefficients try: coeff = [float(i) for i in splitline[3:]] except ValueError: raise ParseConfigError(fname) # Found the first matching spectel, so break break respfile.close() if coeff: resp_config = {'respref': respref, 'altwvref': altref, 'altwvrange': altrange, 'zaref': zaref, 'zarange': zarange, 'coeff': coeff} else: resp_config = {} return resp_config
def _get_cal_path(instrument): """Helper function to retrieve the caldata path.""" # Directory 2 steps up from where code is located pkgpath = (os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + os.path.sep) # Join in the instrument name caldata = os.path.join(pkgpath, 'calibration', 'data', instrument.lower(), '') return caldata
[docs] def pipecal_config(header): """ Parse all reference files and return appropriate configuration values. Parameters ---------- header : `astropy.io.fits.header.Header` or dict-like Header of FITS file. Returns ------- config : dictionary Key values pulled from the header and calibration files needed for photometry. Possible keys are: - *caldata* : Path to calibration package data directory - *date* : Observation date - *instrument* : Instrument used to take observation - *runits* : Raw (uncalibrated) units for the instrument - *spectel* : Filter name for observation - *altcfg1* : Alternate configuration (0, 1, 2, or 3). For FORCAST, altcfg1=1 means dual mode with Barr2 dichroic, 2 means dual with Barr3, and 0 means single mode. For FLITECAM, altcfg1=1 means FLIPO configuration, 0 means FLITECAM only. For HAWC+, altcfg1=0 is chop/nod with HWP, altcfg1=1 is chop/nod without HWP, altcfg1=2 is scan with HWP, altcfg1=3 is scan without HWP. - *object* : Object name - *filterdef_file* : File path to filter definition file - *wref* : Reference wavelength, as defined by filterdef_file - *lpivot* : Pivot wavelength, as defined by filterdef_file - *color_corr* : Color correction, as defined by filterdef_file - *aprad* : Aperture radius, as defined by filterdef_file - *bgin* : Background annulus inner radius, as defined by filterdef_file - *bgout* : Background annulus outer radius, as defined by filterdef_file - *fwhm* : FWHM, as defined by filterdef_file - *fitsize* : Subimage size for fits, as defined by filterdef_file - *refcal_file* : File path to reference cal factor file - *calfac* : Series average reference cal factor, as defined in refcal_file - *ecalfac* : Error on series average reference cal factor, as defined in refcal_file - *avgcal_file* : File path to average reference cal factor file - *avgcalfc* : Average reference cal factor, as defined in refcal_file - *avgcaler* : Error on average reference cal factor, as defined in refcal_file - *rfitam_file* : File path to response fit for airmass file - *rfit_am* : A dictionary containing airmass response fit coefficients and metadata, as read from rfitam_file by read_respfile - *rfitalt_file* : File path to response fit for altitude file - *rfit_alt* : A dictionary containing altitude response fit coefficients and metadata, as read from rfitalt_file by read_respfile - *rfitpwv_file* : File path to response fit for water vapor file - *rfit_pwv* : A dictionary containing water vapor response fit coefficients and metadata, as read from rfitam_file by read_respfile - *stdeflux_file* : File path to model error file - *std_eflux* : Percent error on the model of the standard observed, as read from stdeflux_file - *std_scale* : Scale factor to apply to the model of the standard observed, as read from stdeflux_file - *stdflux_file* : File path to standard model file - *std_flux* : Model flux of standard at observed wavelength Keywords are populated only if matching configuration data is found. In particular, standard model data is returned only if the observation object name matches a known standard, as defined in the stddefault.txt file in the data directory. """ # Config is the data structure that'll be filled and returned config = dict() # Read configuration from header try: instrument = header['INSTRUME'].strip().upper() spectel1 = header['SPECTEL1'].strip().upper() spectel2 = header['SPECTEL2'].strip().upper() instcfg = header['INSTCFG'].strip().upper() obj = header['OBJECT'].strip().upper().replace(' ', '') except KeyError: log.error('Missing required keywords for pipecal configuration.') return config # Set the caldata path by instrument caldata = _get_cal_path(instrument) # Store one directory up (the /data path) config['caldata'] = os.path.join( os.path.dirname(os.path.dirname(caldata)), '') # Read the observation date from the file date_str = header['DATE-OBS'].strip() date_arr = re.split(r'[-T]', date_str) dateobs = 99999999 if len(date_arr) > 2: date_join = ''.join(date_arr[:3]) try: dateobs = int(date_join) except ValueError: pass config['date'] = dateobs # Read the instrument configuration if instrument == 'FORCAST': config['runits'] = 'Me/s' detchan = str(header['DETCHAN']).strip().upper() dichroic = header['DICHROIC'].strip().upper() if detchan == '1' or detchan == 'LW': spectel = spectel2 else: spectel = spectel1 # Set ALTCFG1, based on INSTRCFG and DICRHOIC # 0 = single, 1 = Barr2, 2 = Barr3 if instcfg == 'IMAGING_DUAL': if re.match(r'.*BARR.*3.*', dichroic): altcfg1 = 2 else: altcfg1 = 1 else: altcfg1 = 0 elif instrument == 'HAWC_PLUS': config['runits'] = 'counts' spectel = spectel1 hwp = spectel2 instmode = str(header['INSTMODE']).strip().upper() # Set ALTCFG1, based on INSTMODE # 0 = chop/nod, 1 = scan if instmode == 'OTFMAP': if 'OPEN' in hwp or hwp == 'UNKNOWN': altcfg1 = 3 else: altcfg1 = 2 else: if 'OPEN' in hwp or hwp == 'UNKNOWN': altcfg1 = 1 else: altcfg1 = 0 elif instrument == 'FLITECAM': config['runits'] = 'ct/s' spectel = spectel1 # Read instrument config from MISSN-ID # altcfg1: 1 => FLIPO, 0 => just FLITECAM missnid = str(header['MISSN-ID']).strip().upper() if 'FP' in missnid: altcfg1 = 1 else: altcfg1 = 0 else: log.error('Unsupported instrument.') return config config['instrument'] = instrument config['spectel'] = spectel config['altcfg1'] = altcfg1 # Remove _ and - from object name obj = ''.join(re.split(r'[_-]', obj)).upper() config['object'] = obj # Read the calibration default table default_file = os.path.join(caldata, 'caldefault.txt') try: def_cal = np.genfromtxt(default_file, comments='#', dtype=np.str_, unpack=True) date, ac1, fdef, std_eflux, ref_calf, avg_calf, \ rfit_am, rfit_alt, rfit_pwv = def_cal except IOError: log.error('Calibration default file {} does ' 'not exist'.format(default_file)) return config except ValueError: log.error('Calibration default file {} is poorly ' 'formatted. Verify structure.'.format(default_file)) return config # set all wildcard values to current config date = date.astype(int) ac1[ac1 == '.'] = altcfg1 ac1 = ac1.astype(int) # Find the appropriate line in table idx = np.where((date >= dateobs) & (ac1 == altcfg1))[0] count = len(idx) if count == 0: log.error('No pipecal data found for date {}'.format(dateobs)) return config # Take the first applicable date idx = idx[0] # Read the filter definition file # Columns: spectel, altcfg1, lambda_mean, lamda_pivot, # color_correction, aperture radius, background # annulus inner radius, background annulus outer # radius # The color_correction, if not 1.0, is to correct for color leaks # in the filter fname = os.path.join(caldata, fdef[idx]) if fdef[idx] != '.' and os.path.isfile(fname): fname = os.path.abspath(fname) config['filterdef_file'] = fname with open(fname, 'r') as f: for line in f: if line.startswith('#'): continue lval = line.split() spec = lval[0] try: ac1 = int(lval[1]) except ValueError: ac1 = str(lval[1]).strip() if spec == spectel and (ac1 == '.' or ac1 == altcfg1): config['wref'] = float(lval[2]) config['lpivot'] = float(lval[3]) config['color_corr'] = float(lval[4]) config['aprad'] = float(lval[5]) config['bgin'] = float(lval[6]) config['bgout'] = float(lval[7]) config['fwhm'] = float(lval[8]) config['fitsize'] = int(lval[9]) break # Read reference cal factors fname = os.path.join(caldata, ref_calf[idx]) if ref_calf[idx] != '.' and os.path.isfile(fname): fname = os.path.abspath(fname) config['refcal_file'] = fname # Read the ref calfactor file # Columns: spectel, altcfg1, ref_calfac, ref_calfac_err try: spec, ac1, calf, ecalf = np.genfromtxt(fname, dtype=str, unpack=True) except (ValueError, TypeError): log.error('Reference calibration factor file {} is poorly ' 'formatted. Verify structure.'.format(fname)) return config spec = np.array([i.strip().upper() for i in spec]) ac1 = np.array([int(i) for i in ac1]) cidx = np.where((spec == spectel) & (ac1 == altcfg1))[0] if len(cidx) > 0: fields = 'calfac ecalfac'.split() cols = [calf, ecalf] for field, col in zip(fields, cols): try: config[field] = float(col[cidx[0]]) except ValueError: pass # Read average cal factors fname = os.path.join(caldata, avg_calf[idx]) if avg_calf[idx] != '.' and os.path.isfile(fname): fname = os.path.abspath(fname) config['avgcal_file'] = fname # Read the avg calfactor file # Columns: spectel, altcfg1, ref_calfac, ref_calfac_err try: spec, ac1, calf, ecalf = np.genfromtxt(fname, dtype=str, unpack=True) except (ValueError, TypeError): log.error('Reference calibration factor file {} is poorly ' 'formatted. Verify structure.'.format(fname)) return config spec = np.array([i.strip().upper() for i in spec]) ac1 = np.array([int(i) for i in ac1]) cidx = np.where((spec == spectel) & (ac1 == altcfg1))[0] if len(cidx) > 0: fields = ['avgcalfc', 'avgcaler'] cols = [calf, ecalf] for field, col in zip(fields, cols): try: config[field] = float(col[cidx[0]]) except ValueError: pass # Read response fit for airmass fname = os.path.join(caldata, rfit_am[idx]) if rfit_am[idx] != '.' and os.path.isfile(fname): fname = os.path.abspath(fname) config['rfitam_file'] = fname rfit_am = read_respfile(fname, spectel) if len(rfit_am) > 0: config['rfit_am'] = rfit_am # Read response fit for altitude fname = os.path.join(caldata, rfit_alt[idx]) if rfit_alt[idx] != '.' and os.path.isfile(fname): fname = os.path.abspath(fname) config['rfitalt_file'] = fname rfit_alt = read_respfile(fname, spectel) if len(rfit_alt) > 0: config['rfit_alt'] = rfit_alt # Read response fit for water vapor monitor fname = os.path.join(caldata, rfit_pwv[idx]) if rfit_pwv[idx] != '.' and os.path.isfile(fname): fname = os.path.abspath(fname) config['rfitpwv_file'] = fname rfit_pwv = read_respfile(fname, spectel) if len(rfit_pwv) > 0: config['rfit_pwv'] = rfit_pwv # Read standard flux error fname = os.path.join(caldata, std_eflux[idx]) if std_eflux[idx] != '.' and os.path.isfile(fname): fname = os.path.abspath(fname) config['stdeflux_file'] = fname # Read the model_error file # Columns: object_name, %model_error try: ob, merr, mscale = np.genfromtxt(fname, dtype=np.str_, unpack=True) except (ValueError, TypeError): log.error('Standard error file {} is poorly ' 'formatted. Verify structure.'.format(fname)) return config test = np.array([o.strip().upper() in obj for o in ob]) cidx = np.where(test)[0] if len(cidx) > 0: fields = 'std_eflux std_scale'.split() cols = [merr, mscale] for field, col in zip(fields, cols): try: config[field] = float(col[cidx[0]]) except ValueError: pass # Read the standard flux defaul table # Columns: date, altcfg1, object, flux_file default_file = os.path.join(caldata, 'stddefault.txt') try: date, ac1, ob, std_flux = np.genfromtxt( default_file, dtype=np.str_, unpack=True) except IOError: log.error('Standards default file {} does not ' 'exist'.format(default_file)) return config except ValueError: log.error('Standards default file {} is incorrectly ' 'formatted.'.format(default_file)) return config else: ac1[ac1 == '.'] = altcfg1 date = np.array([int(i) for i in date]) ac1 = np.array([int(i) for i in ac1]) # Find appropriate line in table test = np.array([o.strip().upper() in obj for o in ob]) idx = np.where((date >= dateobs) & (ac1 == altcfg1) & test)[0] if len(idx) == 0: return config # Take the first applicable date idx = idx[0] # Read the standard flux file fname = os.path.join(caldata, std_flux[idx]) if std_flux[idx] != '.' and os.path.isfile(fname) and \ 'wref' in config.keys(): fname = os.path.abspath(fname) config['stdflux_file'] = fname # Read the standard flux file # Columns: lambda_ref, lambda_mean, lambda_l, lambda_pivot, # lambda_eff, lambda_eff_ph, lambda_io, width, Response, # F_mean, Fnu_mean, Fnu_lammean, ColorTerm, ColorTerm, # Total Count Rate, Source Size, Source FWHM, Bkgd, # S/N, Fnu_limit, Filter # Only use mean wavelength and Fnu_mean # There are 6 rows of headers to skip try: lmean, fmean = np.genfromtxt(fname, usecols=(1, 10), unpack=True, skip_header=6) except ValueError: log.error('Standard flux file {} is poorly ' 'formatted. Verify structure.'.format(fname)) return config cidx = np.where(np.abs(lmean - config['wref']) < 1e-2)[0] if len(cidx) > 0: cidx = cidx[0] std_flux = float(fmean[cidx]) if np.isnan(std_flux): log.error('Bad standard flux value') else: if 'std_scale' in config.keys(): std_flux *= config['std_scale'] config['std_flux'] = std_flux else: log.warning('Standard flux not found for ' 'wavelength {}'.format(config['wref'])) return config