Source code for sofia_redux.calibration.standard_model.calibration_io

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Handle file I/O for model generation"""

import os
import numpy as np
import pandas as pd
import astropy.io.fits as pf

from sofia_redux.calibration.standard_model import thermast
from sofia_redux.calibration.standard_model import isophotal_wavelength as iso
from sofia_redux.calibration.pipecal_error import PipeCalError


[docs] def model_spectrum(infile, txt=False, dataframe=False, alpha=None, temp=None, wmin=40., wmax=300., df_index=0): """ Read in or generate model spectrum. Parameters ---------- infile : str, pandas.DataFrame Defines how to generate the model. It can be the name of a file (if model is in an ASCII format or FITS format), set to "Blackbody" to generate a blackbody model, set to "PowerLaw" to generate a power law mdodel, or it could be a straight pandas DataFrame. txt : bool If set, `infile` is the name of an ASCII formatted file. dataframe : bool If set, `infile` is a pandas DataFrame. alpha : float The power law index to use if `infile` is set to "PowerLaw". temp : float The blackbody temperature to use if `infile` is set to "Blackbody". wmin : float, optional Minimum wavelength of the spectrum. Defaults to 40 microns. wmax : float, optional Maximum wavelength of the spectrum. Defaults to 300 microns. Returns ------- wavelength : numpy.array Wavelengths in stellar spectrum. flux : numpy.array Flux at each wavelength in `wavelength`. power_law : bool A flag to indicate the model is a generated power law model. blackbody : bool A flag to indicate the model is a generated blackbody model. """ if txt: power_law = False blackbody = False wstar, fstar = read_text(infile) elif dataframe: power_law = False blackbody = False try: filename = infile['model_file'].iloc[df_index] except KeyError: raise PipeCalError('Input dataframe is improperly formatted.') wstar, fstar = read_text(filename) elif infile.lower() == 'powerlaw': if not alpha: raise PipeCalError('Power Law model requires an index alpha') else: power_law = True blackbody = False wstar, fstar = generate_power_law(alpha, wmin=wmin, wmax=wmax) elif infile.lower() == 'blackbody': if not temp: raise PipeCalError('Blackbody model requires an input temperature') else: power_law = False blackbody = True wstar, fstar = generate_blackbody(temp, wmin=wmin, wmax=wmax) else: power_law = False blackbody = False wstar, fstar = read_fits(infile) return wstar, fstar, power_law, blackbody
[docs] def read_text(filename): """ Read spectrum input file if formatted in plain text. Parameters ---------- filename : str Name of file to read. Returns ------- wavelength : numpy.array Wavelengths in stellar spectrum. flux : numpy.array Flux at each wavelength in `wavelength`. """ with open(filename, 'r') as f: com = f.readline()[0] wavelength, flux = np.loadtxt(filename, unpack=True, usecols=(0, 1), comments=com) return wavelength, flux
[docs] def generate_power_law(alpha, wmin=40., wmax=300.): """ Generate a power law input spectrum. Parameters ---------- alpha : float Power law index. wmin : float, optional Minimum wavelength of the spectrum. Defaults to 40 microns. wmax : float, optional Maximum wavelength of the spectrum. Defaults to 300 microns. Returns ------- wavelength : numpy.array Wavelengths in stellar spectrum. flux : numpy.array Flux at each wavelength in `wavelength`. """ c = 2.99792e14 # cm/s # cm2mum = 1e4 # Convert cm to microns wref = 24.0 fref = 1.0 # nuref = c * cm2mum / wref nuref = c / wref dw = 0.005 nw = int((wmax - wmin) / dw) + 1 wstar = np.arange(nw) * dw + wmin nuarr = c / wstar fstar = fref * (nuarr / nuref) ** alpha return wstar, fstar
[docs] def generate_blackbody(temp, wmin=40., wmax=300.): """ Generate a simple blackbody spectrum. Parameters ---------- temp : float Temperature of the asteroid. wmin : float, optional Minimum wavelength of the spectrum. Defaults to 40 microns. wmax : float, optional Maximum wavelength of the spectrum. Defaults to 300 microns. Returns ------- wavelength : numpy.array Wavelengths in stellar spectrum. flux : numpy.array Flux at each wavelength in `wavelength`. """ c = 2.99792e14 # um/s Jy2W = 1e-26 # Convert Jy to W/m2/Hz dw = 0.005 nw = int((wmax - wmin) / dw) + 1 wstar = np.arange(nw) * dw + wmin fstar = np.pi * thermast.planck_function(wstar, temp) \ * wstar ** 2 / (Jy2W * c) return wstar, fstar
[docs] def read_fits(infile): """ Read spectrum from a FITS file. Parameters ---------- infile : str Name of FITS file to read. Returns ------- wavelength : numpy.array Wavelengths in stellar spectrum. flux : numpy.array Flux at each wavelength in `wavelength`. """ hdul = pf.open(infile) data = hdul[0].data[0] wstar = data[0, :] fstar = data[1, :] return wstar, fstar
[docs] def read_atran(atmofile, ws, no_atm=False, wmin=40., wmax=300.): """ Read in atmospheric transmission model. Parameters ---------- atmofile : str Name of the ATRAN file. ws : numpy.array Wavelengths of spectrum. no_atm : bool If set, do not read in a ATRAN file. wmin : float, optional Minimum wavelength of spectrum to use. Defaults to 40 microns. wmax : float, optional Maximum wavelength of spectrum to use. Defaults to 300 microns. Returns ------- wa : numpy.array Wavelengths of atmospheric transmission spectrum in microns. ta : numpy.array Transmission at each wavelength in `wa`. afile : str Full path of ATRAN file read in. """ atran_location = '/dps/calibrations/ATRAN/fits/' local_apath = './atranfiles/fits/' if no_atm: wa = ws ta = np.ones_like(wa) afile = None else: if not os.path.isfile(atmofile): # pragma: no cover if os.sep not in atmofile: afile = os.path.join(atran_location, atmofile) if not os.path.isfile(afile): print(f'Cannot find ATRAN file {atmofile} in default ' f'location {atran_location}.\nUsing local copy.') afile = os.path.join(local_apath, atmofile) else: afile = os.path.join(os.getcwd(), atmofile) else: afile = os.path.join(os.getcwd(), atmofile) hdul_a = pf.open(afile) watm = hdul_a[0].data[0] tatm = hdul_a[0].data[1] indx = (watm >= wmin) & (watm <= wmax) wa = watm[indx] ta = tatm[indx] return wa, ta, afile
[docs] def calibration_data_path(): """ Location of local calibration data. Returns ------- caldata : str Full path to local data. """ pkgpath = (os.path.dirname( os.path.dirname(os.path.realpath(__file__))) + os.path.sep) caldata = os.path.join(*[pkgpath, 'data', 'models']) return caldata
[docs] def open_outfile_and_header(outfile, no_atm=False, afile=None, infile=None, index=0): """ Open the outfile and print all headers. Parameters ---------- outfile : str, None Filename of ouptut file. If None, pull outfile from `infile` or use a default filename. no_atm : bool If set, no ATRAN file was read in. afile : str Name of ARAN file read in. Returns ------- outf : file Handler pointing to open output file. """ if no_atm: print('\nNo atmosphere') else: print(f'\nUsing ATRAN: {afile}') print(' lambda_ref lambda_mean lambda_1 lambda_pivot' ' lambda_eff lambda_eff_jv lambda_iso ' 'width Response F_mean Fnu_mean ColorTerm ' 'ColorTerm Source_Rate Source_Size Source_FWHM' 'Bkgd_Power NEP NEFD MDCF' ' Npix Filter') print(' microns microns microns microns ' 'microns microns microns microns' ' W/mJy W/m^2/mum Jy ' 'Watts pix arcsec Watts ' 'W/sqrt(Hz) Jy/sqrt(Hz) mJy') if outfile is None: if isinstance(infile, pd.DataFrame): outfile = infile['cal_file'][index] else: outfile = 'flux_values.out' outf = open(outfile, 'w') outf.write(f'{outfile}\n') if no_atm: outf.write('No atmosphere\n\n') else: outf.write(f'{os.path.basename(afile)}\n\n') outf.write('lambda_ref lambda_mean lambda_1 lambda_pivot ' 'lambda_eff lambda_eff_jv lambda_iso ' 'width Response F_mean Fnu_mean ' 'ColorTerm ColorTerm Source_Rate Source_Size ' 'Source_FWHM Bkgd_Power NEP NEFD ' 'MDCF Npix Filter\n') outf.write('microns microns microns microns ' 'microns microns microns ' 'microns W/mJy W/m^2/mum ' 'Jy Watts ' 'pix arcsec ' 'Watts W/sqrt(Hz) Jy/sqrt(Hz) mJy\n\n') return outf
[docs] def report_result(result, filter_name, outf): """ Format the result of the calibration and report it. Parameters ---------- result : dict Collection of calibration results. filter_name : str Name of the current filter. outf : IOStream Open output file to write to. Returns ------- None """ s = (f"{result['lambda_c']:.5e} {result['lambda_mean']:.5e} " f"{result['lambda_1']:.5e} " f"{result['lambda_pivot']:.5e} " f"{result['lambda_eff']:.5e} " f"{result['lambda_eff_jv']:.5e} {result['isophotal_wt']:.5e} " f"{result['width']:.5e} {result['response']:.5e} " f"{result['flux_mean']:.5e} {result['flux_nu_mean']:.5e} " f"{result['color_term_k0']:.5e} " f"{result['color_term_k1']:.5e} {result['source_rate']:.5e} " f"{result['source_size']:.5e} " f"{result['source_fwhm']:.5e} {result['background_power']:.5e} " f"{result['nep']:.5e} " f"{result['nefd']:.5e} {result['mdcf']:.5e} " f"{result['npix_mean']:.5e} {result['lambda_prime']:.5e} " f"{result['lamcorr']:.5e} " f"{os.path.basename(filter_name)}") print(s) outf.write(s + '\n')
[docs] def plot_spectrum(model_wave, model_flux, power_law, blackbody, isophotal_weight, calibration_results, outfile=None): """ Generate a plot of the final spectrum. Parameters ---------- ws : numpy.array Wavelength of spectrum. fs : numpy.array Flux at each wavelength in `ws`. power_law : bool If set, spectrum was generated from a power law. blackbody : bool If set, spectrumw as generated from an ideal blackbody. lam_iso_wt : numpy.array ISO weighted wavelength in each filter. Nf : int Number of filters. outfile : str Name of the output file, that the file the plots is saved to is based on. Returns ------- None """ from matplotlib.backends.backend_agg \ import FigureCanvasAgg as FigureCanvas from matplotlib.figure import Figure print('\nPlotting isophotal wavelengths') fig = Figure(figsize=(10, 10)) FigureCanvas(fig) ax = fig.add_subplot(1, 1, 1) fmin = np.min(model_flux) fmax = np.max(model_flux) f_mean = calibration_results['flux_mean'] isophotal_weight = calibration_results['isophotal_wt'] if power_law == 1 or blackbody == 1: ax.plot(model_wave, model_flux) else: ax.step(model_wave, model_flux, where='mid') ax.plot(isophotal_weight, f_mean, color='red', linestyle='--') print('Lambda_iso <F_lambda> F_lambda(lambda_iso)') for i in range(len(isophotal_weight)): ax.scatter(isophotal_weight.iloc[i], f_mean.iloc[i], marker='d', color='k') fiso = iso.interpol(model_flux, model_wave, isophotal_weight.iloc[i]) print(f'{isophotal_weight.iloc[i]:.5e}\t{f_mean.iloc[i]:.5e}\t{fiso:.5e}') ax.set_ylabel([fmin, fmax]) ax.set_yscale('log') ax.set_xlabel('Wavelength (micron)') ax.set_ylabel('Flux (W/m2/micron)') if outfile is None: plotname = 'spectrum.png' else: plotname = '.'.join(outfile.split('.')[:-1]) + '.png' fig.savefig(plotname, bbox_inches='tight', dpi=300) print(f'Plotting to {plotname}')
[docs] def unique_wavelengths(wavelengths, flux, wmin=40., wmax=300.): """ Select out a window of unique wavelengths. Parameters ---------- wavelengths : numpy.array Full list of wavelengths in microns. flux : numpy.array Flux at each wavelength in `wavelengths`. wmin : float, optional Minimum wavelength to allow. Defaults to 40 microns. wmax :float, optional Maximum wavelength to allow. Defaults to 300 microns. Returns ------- ws : numpy.array Unique values of `wavelengths` between `wmin` and `wmax`. fs : numpy.array Flux at each wavelength in `ws`. """ wsin, indicies = np.unique(wavelengths, return_index=True) fsin = flux[indicies] indx = (wsin >= wmin) & (wsin <= wmax) ws = wsin[indx] fs = fsin[indx] return ws, fs