# Source code for sofia_redux.instruments.hawc.steps.stepskycal

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Sky calibration pipeline step."""

import os

from astropy import log
from astropy.stats.sigma_clipping import sigma_clip
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure
import numpy as np
import pandas as pd

from sofia_redux.instruments.hawc.datafits import DataFits
from sofia_redux.instruments.hawc.steploadaux import StepLoadAux
from sofia_redux.instruments.hawc.stepmiparent import StepMIParent


__all__ = ['StepSkycal']


class StepSkycal(StepMIParent, StepLoadAux):
    """
    Generate a reference sky calibration file.

    This step divides scan mode sky flats by processed INT_CAL data,
    then combines and normalizes the sky flats to generate a master
    reference sky cal.
    """

    def setup(self):
        """
        Set parameters and metadata for the pipeline step.

        Output files have PRODTYPE = 'skycal', and are named with
        the step abbreviation 'SCL'.

        Parameters defined for this step are:

        normalize : bool
            If set, sky cals are divided by their median values to
            normalize them before sigma-clipping.
        sigma_lower : float
            Lower sigma value to use for clipping.
        sigma_upper : float
            Upper sigma value to use for clipping.
        ttor : float
            Scale factor for the T array to the R array.
        bins : str
            Binning method for comparison histogram.
        scalfitkeys : list of str
            Keys that need to match between data file and reference
            SCAL file, for comparison.
        pixfile : str
            File name pattern for pixeldata files.
        pixfitkeys : list of str
            Keys that need to match between pixeldata and data file.
        ref_pixpath : str
            Path to reference pixel files.
        ref_pixfile : list of str
            Reference pixel files, one per band.
        dclfile : str
            File name glob to match DCL files.  Default is
            'intcals/*DCL*.fits' to match DCL files in a folder named
            intcals, in the same directory as the input file.
        dclfitkeys : list of str
            Keys that need to match between DCL and data file.
        """
        # Name of the pipeline reduction step
        self.name = 'skycal'
        self.description = 'Make SkyCal'

        # Shortcut for pipeline reduction step and identifier for
        # saved file names.
        self.procname = 'scl'

        # Clear Parameter list
        self.paramlist = []
        self.paramlist.append(['normalize', False,
                               'Normalize skycal for each array by '
                               'its median value.'])
        self.paramlist.append(['sigma_lower', 3.0,
                               'Lower sigma value to use for clipping.'])
        self.paramlist.append(['sigma_upper', 3.0,
                               'Upper sigma value to use for clipping.'])
        self.paramlist.append(['ttor', 1.275,
                               'Scale factor for T/R flatfield'])
        self.paramlist.append(['bins', 'fd',
                               'Binning method for comparison histogram.'])
        self.paramlist.append(['scalfitkeys', ['SPECTEL1'],
                               'Match keys for SCAL comparison.'])
        self.paramlist.append(['pixfile', 'pixel*.dat',
                               'File name pattern for pixeldata files.'])
        self.paramlist.append(['pixfitkeys', [],
                               'Match keys for pixeldata files.'])
        self.paramlist.append(['ref_pixpath', '',
                               'Path to reference pixel files.'])
        self.paramlist.append(['ref_pixfile', [],
                               'Reference pixel files, one per band.'])

        # Get parameters for StepLoadAux, replace auxfile with dclfile
        self.loadauxsetup('dcl')

    def loadauxname(self, auxpar='', data=None, multi=False):
        """
        Search for files matching auxfile.

        Overrides the default function in order to make the flat path
        relative to the data output directory if necessary.

        Parameters
        ----------
        auxpar : str, optional
            A name for the aux file parameter to use. This allows
            loadauxfiles to be used multiple times in a given pipe step
            (for example for darks and flats). Default value is
            self.auxpar which is set by loadauxsetup().
        data : DataFits or DataText, optional
            A data object to match the auxiliary file to.  If no data
            is specified, self.datain is used (for Multi Input steps
            self.datain[0]).
        multi : bool, optional
            If set, a list of file names is returned instead of a
            single file name.

        Returns
        -------
        str or list of str
            The matching auxiliary file(s).
        """
        # Set auxpar
        if len(auxpar) == 0:
            auxpar = self.auxpar

        # Get parameters
        auxfile = os.path.expandvars(self.getarg(auxpar + 'file'))
        if not os.path.isabs(auxfile):
            # if input folder is not an absolute path, make it
            # relative to the data location
            auxfile = os.path.join(
                os.path.dirname(self.datain[0].filename), auxfile)

        return self._loadauxnamefile(auxfile, auxpar, data, multi,
                                     backup=False)

    def read_refpix(self):
        """
        Read a reference pixel file from the parameters.

        The parameter is expected to be defined as a list, with one
        entry for each HAWC band.  The correct value for the input
        data is selected from the list.

        Returns
        -------
        pixfile : str or None
            Path to the reference file, or None if no reference is
            configured for the band.

        Raises
        ------
        ValueError
            If the waveband cannot be parsed, or if a configured
            reference file does not exist on disk.
        """
        pixfiles = self.getarg('ref_pixfile')
        pixpath = os.path.expandvars(self.getarg('ref_pixpath'))
        waveband = self.datain[0].getheadval('spectel1')

        # band letter is the last character of SPECTEL1
        bands = ['A', 'B', 'C', 'D', 'E']
        try:
            idx = bands.index(waveband[-1])
        except (ValueError, IndexError):
            # waveband not in list
            msg = f'Cannot parse waveband: {waveband}'
            log.error(msg)
            raise ValueError(msg)

        try:
            pixfile = os.path.expandvars(pixfiles[idx])
            if pixfile == '':
                log.warning(f'No reference pixel file '
                            f'specified for {waveband}.')
                pixfile = None
            else:
                pixfile = os.path.join(pixpath, pixfile)
                if not os.path.isfile(pixfile):
                    msg = f'Pixel file {pixfile} not found'
                    log.error(msg)
                    raise ValueError(msg)
                else:
                    log.info(f'Found default pixel file: {pixfile}')
        except IndexError:
            # parameter list shorter than the band index
            log.warning(f'No reference pixel file specified for {waveband}.')
            pixfile = None
        return pixfile

    @staticmethod
    def _log_stats(rgain, tgain, rbad, tbad):
        """Log summary statistics for the R and T gain arrays."""
        log.info('')
        log.info(" R Array Stats:")
        log.info(" Min : {:4.2f}".format(np.nanmin(rgain)))
        log.info(" Max : {:4.2f}".format(np.nanmax(rgain)))
        log.info(" Median : {:4.2f} \u00B1 {:4.2f}".
                 format(np.nanmedian(rgain), np.nanstd(rgain)))
        log.info(" Bad Pixel : {:<6d}".format(np.sum(rbad == 1)))
        log.info(" T Array Stats:")
        log.info(" Min : {:4.2f}".format(np.nanmin(tgain)))
        log.info(" Max : {:4.2f}".format(np.nanmax(tgain)))
        log.info(" Median : {:4.2f} \u00B1 {:4.2f}".
                 format(np.nanmedian(tgain), np.nanstd(tgain)))
        log.info(" Bad Pixel : {:<6d}".format(np.sum(tbad == 2)))
        log.info(" T/R Ratio: {:5.3f}".format(
            np.nanmedian(tgain) / np.nanmedian(rgain)))
        log.info('')

    def _make_flat_plot(self, auxname, rgain, tgain, basename):
        """
        Plot the new flat against the default flat.

        Saves a 2x2 histogram comparison image to
        basename + '_comparison.png' and returns its path.
        """
        scal = DataFits(auxname)
        prgain = scal.imageget('R ARRAY GAIN')
        ptgain = scal.imageget('T ARRAY GAIN')
        try:
            prbad = scal.imageget('R BAD PIXEL MASK')
        except ValueError:  # pragma: no cover
            prbad = np.zeros_like(prgain)
        try:
            ptbad = scal.imageget('T BAD PIXEL MASK')
        except ValueError:  # pragma: no cover
            ptbad = np.zeros_like(ptgain)

        log.info('')
        log.info('Default flat statistics:')
        self._log_stats(prgain, ptgain, prbad, ptbad)

        fig = Figure()
        FigureCanvas(fig)
        bins = self.getarg('bins')

        # top row: default flat R and T histograms
        ax = fig.add_subplot(2, 2, 1)
        ax.hist(prgain.flatten(), bins=bins, label='R Gain')
        ax.set_xlim([0, 2.0])
        ax.legend()
        ax.set_title('Default %.2f $\\pm$ %.2f' %
                     (float(np.nanmedian(prgain)),
                      float(np.nanstd(prgain))))

        ax = fig.add_subplot(2, 2, 2)
        ax.hist(ptgain.flatten(), bins=bins, label='T Gain')
        ax.set_xlim([0.5, 2.5])
        ax.legend()
        ax.set_title('Default %.2f $\\pm$ %.2f' %
                     (float(np.nanmedian(ptgain)),
                      float(np.nanstd(ptgain))))

        # bottom row: new scan map flat R and T histograms
        ax = fig.add_subplot(2, 2, 3)
        try:
            ax.hist(rgain.flatten(), bins=bins, label='R Gain')
        except ValueError:  # pragma: no cover
            log.warning('Could not plot R histogram')
        ax.set_xlim([0, 2.0])
        ax.legend()
        ax.set_title('Scan map %.2f $\\pm$ %.2f' %
                     (float(np.nanmedian(rgain)),
                      float(np.nanstd(rgain))))

        ax = fig.add_subplot(2, 2, 4)
        try:
            ax.hist(tgain.flatten(), bins=bins, label='T Gain')
        except ValueError:  # pragma: no cover
            log.warning('Could not plot T histogram')
        ax.set_xlim([0.5, 2.5])
        ax.legend()
        ax.set_title('Scan map %.2f $\\pm$ %.2f' %
                     (float(np.nanmedian(tgain)),
                      float(np.nanstd(tgain))))

        fig.tight_layout()

        # output file name
        outfile = os.path.splitext(basename)[0] + '_comparison.png'

        # save figure
        fig.savefig(outfile, dpi=200)
        fig.clear()
        log.info('Saved result %s' % outfile)
        return outfile

    @staticmethod
    def _average_pixel_data(pixfiles):
        """
        Read and average a list of pixeldata tables.

        Gains, weights, and efficiencies are mean-combined per pixel;
        character flags are or-combined (union of flag letters).

        Returns
        -------
        f_tbl : dict of DataFrame
            Individual tables, keyed by scan id parsed from the
            file name.
        new_df : DataFrame
            Combined table with avg_gain, avg_weight, avg_eff,
            and merged flag columns.
        """
        f_tbl = {}
        f_cols = ['ch', 'gain', 'weight', 'flag', 'eff',
                  'Gmux1', 'Gmux2', 'idx', 'sub', 'row', 'col']
        # columns expected to be identical across files: merge keys
        a_cols = ['ch', 'Gmux1', 'Gmux2', 'idx', 'sub', 'row', 'col']
        new_df = {}
        for i in range(len(pixfiles)):
            # scan id: last '-'-separated token of the base file name
            f_id = os.path.splitext(
                os.path.basename(pixfiles[i]))[0].split('-')[-1]
            f_tbl[f_id] = pd.read_table(pixfiles[i], names=f_cols,
                                        comment='#')
            # per-file column names, so merged columns stay distinct
            m_cols = ['ch', f'gain_{i}', f'weight_{i}', f'flag_{i}',
                      f'eff_{i}', 'Gmux1', 'Gmux2', 'idx', 'sub',
                      'row', 'col']
            if i == 0:
                new_df = pd.read_table(pixfiles[i], names=m_cols,
                                       comment='#')
            else:
                tmp_df = pd.read_table(pixfiles[i], names=m_cols,
                                       comment='#')
                new_df = pd.merge(new_df, tmp_df, how='outer',
                                  left_on=a_cols,
                                  right_on=a_cols).sort_values(['idx'])

        # keep gain, weight, and eff for averaging;
        # directly or-combine character flags
        gain = []
        weight = []
        eff = []
        flags = None
        for key in new_df.keys():
            if key.find('gain') != -1:
                gain.append(new_df[key])
            elif key.find('weight') != -1:
                weight.append(new_df[key])
            elif key.find('eff') != -1:
                eff.append(new_df[key])
            elif key.find('flag') != -1:
                new_flags = list(new_df[key].fillna('-'))
                if flags is None:
                    flags = new_flags
                else:
                    # join unique characters
                    for i in range(len(flags)):
                        join_flags = set(flags[i]) | set(new_flags[i])
                        all_flags = ''.join(sorted(join_flags))
                        # should contain - only if there are no other flags
                        if all_flags != '-':
                            all_flags = all_flags.replace('-', '')
                        flags[i] = all_flags

        new_df['avg_gain'] = np.nanmean(gain, axis=0)
        new_df['avg_weight'] = np.nanmean(weight, axis=0)
        new_df['avg_eff'] = np.nanmean(eff, axis=0)
        new_df['flag'] = flags

        # keep only the combined output columns, in standard order
        n_cols = ['ch', 'avg_gain', 'avg_weight', 'flag', 'avg_eff',
                  'Gmux1', 'Gmux2', 'idx', 'sub', 'row', 'col']
        new_df = new_df[n_cols]
        return f_tbl, new_df

    @staticmethod
    def _get_pix_header(pixfiles):
        """
        Compile an output comment header from the input pixel files.

        Header lines come from the first file; scan numbers from the
        other files are concatenated onto the "Scan" line.
        """
        hdr = []
        scan_line = 3
        for i, pixfile in enumerate(pixfiles):
            with open(pixfile, 'r') as fh:
                for line in fh.readlines():
                    if line.startswith('#'):
                        if i == 0:
                            hdr.append(line)
                        if 'Scan' in line:
                            # keep the index for the "Scan" line
                            # from the first file
                            if i == 0:
                                scan_line = len(hdr) - 1
                            else:
                                # for all other files, concat in the
                                # scan number to this description
                                nbr = line.split('-')[-1]
                                hdr[scan_line] = hdr[scan_line].rstrip() \
                                    + ',' + nbr
        return hdr

    @staticmethod
    def _gain_stat(tag, df, idx=None, key='gain'):
        """
        Compute and log gain statistics for a pixeldata table.

        If idx is given, the sigma-clipped gain sample is restricted
        to matching pixel indices; the logged stats use the full
        column.
        """
        gain = df[key]
        x_med = np.median(gain)
        x_std = gain.std()
        x_max = gain.max()
        x_min = gain.min()
        if idx is not None:
            gain = df[df.idx.isin(idx)][key]
        x_gain = sigma_clip(gain, sigma=3.0)
        log.info(f" {tag} Median: {x_med:.3f} \u00B1 {x_std:.3f}, "
                 f"Max: {x_max:.3f}, Min: {x_min:.3f}")
        x_gs = {'ID': tag, 'Gain': x_gain, 'Median': x_med,
                'StdDev': x_std, 'Max': x_max, 'Min': x_min}
        return x_gs

    def _plot_gains(self, data, average, default, basename):
        """
        Plot gain histograms and pixel-to-pixel comparisons.

        Returns a tuple of the saved plot file paths (the second,
        pixel-to-pixel plot is skipped when no default is available).
        """
        # first plot: histograms
        fig = Figure(figsize=(12, 8))
        FigureCanvas(fig)
        fig.suptitle('Histogram of pixel*.dat file')

        # layout from number of files + average + default
        n = len(data) + 2
        ncol = int(np.ceil(np.sqrt(n)))
        nrow = int(np.ceil(float(n) / ncol))

        # put each data set in a new subplot
        ax0 = None
        bins = self.getarg('bins')
        for i, dataset in enumerate(data + [average, default]):
            # create an axis
            if i == 0:
                ax = fig.add_subplot(nrow, ncol, i + 1)
                ax0 = ax
            else:
                ax = fig.add_subplot(nrow, ncol, i + 1,
                                     sharex=ax0, sharey=ax0)

            # handle missing default
            if dataset is None:
                ax.text(0.5, 0.5, '(No default provided)',
                        horizontalalignment='center',
                        verticalalignment='center',
                        transform=ax.transAxes)
                continue

            # histogram the gain and label with stats
            ax.hist(dataset['Gain'], bins=bins,
                    label=f" {dataset['ID']}\n"
                          f"{dataset['Median']:.3f} \u00B1 "
                          f"{dataset['StdDev']:.3f}")
            ax.set(xlabel='Gain', ylabel='Counts')
            ax.legend()
        fig.tight_layout()

        # save figure
        outhist = os.path.splitext(basename)[0] + '_histogram.png'
        fig.savefig(outhist, dpi=200)
        log.info(f'Saved result {outhist}')

        # no second plot if default is None
        if default is None or len(default['Gain']) == 0:
            return outhist,

        # second plot: pixel-to-pixel comparison
        fig2 = Figure(figsize=(12, 8))
        FigureCanvas(fig2)
        fig2.suptitle('Pixel-to-pixel comparison of pixel*.dat file')

        # layout from number of files + average
        ncol = 1
        nrow = len(data) + 1
        ax0 = None
        minval = np.nanmin(default['Gain'])
        maxval = np.nanmax(default['Gain'])
        for i, dataset in enumerate(data + [average]):
            if i == 0:
                ax = fig2.add_subplot(nrow, ncol, i + 1)
                ax0 = ax
            else:
                ax = fig2.add_subplot(nrow, ncol, i + 1,
                                      sharex=ax0, sharey=ax0)

            # plot individual and average pixel gains
            # against the default
            ax.plot(default['Gain'], dataset['Gain'], '.')
            ax.set(xlabel=default['ID'], ylabel=dataset['ID'])

            # plot default line
            ax.plot([minval, maxval], [minval, maxval], '-r')
        fig2.tight_layout()

        # save figure
        outpix = os.path.splitext(basename)[0] + '_pix2pix.png'
        fig2.savefig(outpix, dpi=200)
        log.info(f'Saved result {outpix}')

        return outhist, outpix

    @staticmethod
    def _write_pixfile(outfile, header, av_df):
        """Write the combined pixeldata table with its comment header."""
        with open(outfile, 'w') as fh:
            for line in header:
                fh.write(line)

        # format data for text file
        av_df['avg_gain'] = av_df['avg_gain'].map('{:.3f}'.format)
        av_df['avg_weight'] = av_df['avg_weight'].map('{:.3E}'.format)
        av_df['avg_eff'] = av_df['avg_eff'].map('{:.3f}'.format)
        av_df['Gmux1'] = av_df['Gmux1'].map('{:.3f}'.format)
        av_df['Gmux2'] = av_df['Gmux2'].map('{:.3f}'.format)
        av_df.to_csv(outfile, sep='\t', index=None, header=0, mode='a')
        log.info(f'Saved result {outfile}')

    def run(self):
        """
        Run the data reduction algorithm.

        This step is run as a multiple-in single-out (MISO) step:
        self.datain should be a list of DataFits, and output will
        also be a single DataFits, stored in self.dataout.

        The process is to make a skyflat for the DRP pipeline:

        1. Divide R and T gains by the INTCAL DCL files closest in
           time, to derive master sky gains.
        2. Mean combine all sky gains and optionally normalize by
           the median value.
        3. Sigma clip the gains arrays to identify additional bad
           pixels.
        4. Scale the T array to the R array.
        5. Compare the produced skyflat with the current default,
           saving a comparison image to disk with the filename as
           basename + '_comparison.png'.

        Then, to make a pixeldata (flat/bad pixel table) for the
        scan mode pipeline:

        1. Read in pixeldata files.
        2. Mean-combine gains for each pixel.
        3. Or-combine flags for each pixel.
        4. Generate plots comparing the individual and averaged
           pixel data files to an existing default file.  Plot
           file names are basename + '_histogram.png' and
           basename + '_pix2pix.png'.
        5. Save the combined table to disk, with the filename as
           basename + '.dat'.
        """
        # loop over input data, dividing by intcal
        rgains = []
        tgains = []
        for data in self.datain:
            # get matching DCL, closest in time to the input file
            try:
                # this will log an error message if no DCL is found
                dcl = self.loadauxfile(data=data)
            except ValueError:
                dcl = None

            # get DCL images
            if dcl is not None:
                dcl_rgain = dcl.imageget('R ARRAY GAIN')
                dcl_tgain = dcl.imageget('T ARRAY GAIN')
                dcl_rbad = dcl.imageget('R BAD PIXEL MASK')
                dcl_tbad = dcl.imageget('T BAD PIXEL MASK')
                dcl_rgain[dcl_rbad != 0] = np.nan
                dcl_tgain[dcl_tbad != 0] = np.nan
            else:
                # allow flat to be saved, even if no DCL is available
                dcl_rgain = 1.0
                dcl_tgain = 1.0

            # get sky flat images
            sky_rgain = data.imageget('R ARRAY GAIN')
            sky_tgain = data.imageget('T ARRAY GAIN')
            sky_rbad = data.imageget('R BAD PIXEL MASK')
            sky_tbad = data.imageget('T BAD PIXEL MASK')

            # set NaNs from bad masks
            sky_rgain[sky_rbad != 0] = np.nan
            sky_tgain[sky_tbad != 0] = np.nan

            # divide sky flat by intcal
            rgains.append(sky_rgain / dcl_rgain)
            tgains.append(sky_tgain / dcl_tgain)

        # mean combine the flats, allowing NaNs to propagate
        rgain = np.mean(rgains, axis=0)
        tgain = np.mean(tgains, axis=0)

        # normalize if desired
        if self.getarg('normalize'):
            log.info('')
            log.info('Normalizing gain arrays by median value.')
            log.info(f' R median: {np.nanmedian(rgain)}')
            log.info(f' T median: {np.nanmedian(tgain)}')
            rgain /= np.nanmedian(rgain)
            tgain /= np.nanmedian(tgain)

        # make bad pixel masks from the new gains
        rbad = np.zeros(rgain.shape, dtype=int)
        rbad[np.isnan(rgain)] = 1
        tbad = np.zeros(tgain.shape, dtype=int)
        tbad[np.isnan(tgain)] = 2

        # log flat statistics
        log.info('')
        log.info('Flat statistics before sigma clip:')
        self._log_stats(rgain, tgain, rbad, tbad)

        # sigma clip both arrays for additional bad pixels
        sigma_lower = self.getarg('sigma_lower')
        sigma_upper = self.getarg('sigma_upper')
        rgain = sigma_clip(rgain, masked=True, sigma_upper=sigma_upper,
                           sigma_lower=sigma_lower, copy=False)
        rgain = rgain.filled(np.nan)
        tgain = sigma_clip(tgain, masked=True, sigma_upper=sigma_upper,
                           sigma_lower=sigma_lower, copy=False)
        tgain = tgain.filled(np.nan)

        # propagate to bad mask
        rbad[np.isnan(rgain)] = 1
        tbad[np.isnan(tgain)] = 2

        # scale T to R
        ttor = self.getarg('ttor')
        tgain *= ttor * np.nanmedian(rgain) / np.nanmedian(tgain)

        # log flat statistics again
        log.info('Flat statistics after sigma clip and T scaling:')
        self._log_stats(rgain, tgain, rbad, tbad)

        # make output data by copying first input
        self.dataout = self.datain[0].copy()
        for data in self.datain[1:]:
            self.dataout.mergehead(data)

        # update gain and mask images
        self.dataout.imageset(rgain, 'R ARRAY GAIN')
        self.dataout.imageset(tgain, 'T ARRAY GAIN')
        self.dataout.imageset(rbad, 'R BAD PIXEL MASK')
        self.dataout.imageset(tbad, 'T BAD PIXEL MASK')

        # get the basename for side effect output files
        tmp = self.dataout.copy()
        self.updateheader(tmp)
        basename = tmp.filename

        # compare to existing pipeline flat
        log.info('')
        log.info('Comparison to default flat:')
        try:
            auxfile = os.path.expandvars(
                self.dataout.config['mkflat']['scalfile'])
            auxname = self._loadauxnamefile(auxfile, 'scal', self.dataout,
                                            False, backup=False)
        except (KeyError, ValueError):
            auxname = None
            log.warning('No default pipeline flat found; '
                        'comparison plot will not be generated.')

        # make SCL comparison plot
        self.auxout = []
        if auxname is not None:
            flat_plot = self._make_flat_plot(auxname, rgain, tgain,
                                             basename)
            self.auxout.append(flat_plot)

        # get pixeldata files for combination: any files that match the
        # 'pixfile' glob parameter
        log.info('')
        log.info('Get pixel data files:')
        try:
            pixfiles = self.loadauxname(auxpar='pix', data=None,
                                        multi=True)
        except ValueError:
            # just return if no pixeldata found
            log.warning("No pixel data files found; skipping pixel file "
                        "combination.")
            return

        # also get a default pixel file, if available
        de_file = self.read_refpix()
        log.info('')

        # average all input pixel data files
        f_df, av_df = self._average_pixel_data(pixfiles)

        # compare to existing pixeldata
        if de_file is not None:
            col_names = ('ch', 'gain', 'weight', 'flag', 'eff',
                         'Gmux1', 'Gmux2', 'idx', 'sub', 'row', 'col')
            de_df = pd.read_table(de_file, names=col_names, comment='#')

            # get matching pixels for comparison purposes
            idx = set(de_df['idx'])
            for key in f_df:
                idx &= set(f_df[key]['idx'])
        else:
            de_df = None
            idx = None

        log.info('Pixel data stats:')
        log.info('')
        stats = []
        for key in f_df:
            stats.append(self._gain_stat(f'File {key}', f_df[key],
                                         idx=idx))
            log.info(' --')
        avg_stats = self._gain_stat('Average', av_df, idx=idx,
                                    key='avg_gain')
        if de_df is not None:
            default_stats = self._gain_stat('Default', de_df, idx=idx)
        else:
            default_stats = None
        log.info('')

        plots = self._plot_gains(stats, avg_stats, default_stats,
                                 basename)
        self.auxout.extend(list(plots))

        # compile output header from input pixel files
        header = self._get_pix_header(pixfiles)

        # write output file
        outfile = os.path.splitext(basename)[0] + '.dat'
        self._write_pixfile(outfile, header, av_df)
        self.auxout.append(outfile)
        log.info('')