Source code for sofia_redux.instruments.hawc.steps.stepscanmappol

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Scanning polarimetry image reconstruction pipeline step."""

from datetime import datetime
import os
import warnings

from astropy import log
from astropy.io import fits
from astropy.wcs import WCS
import numpy as np

from sofia_redux.instruments.hawc.datafits import DataFits
from sofia_redux.instruments.hawc.stepmoparent import StepMOParent
from sofia_redux.instruments.hawc.steps.stepscanmap import StepScanMap
from sofia_redux.scan.reduction.reduction import Reduction

__all__ = ['StepScanMapPol']


[docs] class StepScanMapPol(StepMOParent): """ Reconstruct an image from scanning polarimetry data. This step requires that scanning polarimetry data are taken with four HWP angles, one per input file. Sets of data are identified by the SCRIPTID keyword, and are passed to the scan map algorithm one group at a time. R0 and T0 output maps are created for each input file to. This step assembles them into a single file with R0 and T0 extensions (DATA, ERROR, and EXPOSURE) for each HWP angle. One output file is produced for each set of 4 HWP angles. """
[docs] def setup(self): """ Set parameters and metadata for the pipeline step. Output files have PRODTYPE = 'scanmappol', and are named with the step abbreviation 'SMP'. Parameters defined for this step are: vpa_tol : float If differences between telescope angles (VPA) are more than this value, this step will issue a warning. use_frames : str Frames to use from the reduction. Specify a particular range, as '400:-400', or '400:1000'. grid : float Output pixel scale. If not set, default values from scan map configuration will be used. deep : bool If set, faint point-like emission is prioritized. faint : bool If set, faint emission (point-like or extended) is prioritized. extended : bool If set, extended emission is prioritized. This may increase noise on large angular scales. options : str Additional options to pass to the scan map algorithm. """ # Name of the pipeline reduction step self.name = 'scanmappol' self.description = 'Construct Scan Map' # Shortcut for pipeline reduction step and identifier for # saved file names. self.procname = 'smp' # Clear Parameter list self.paramlist = [] # Append parameters self.paramlist.append(['save_intermediate', False, 'Save individual scanmap frames']) self.paramlist.append(['vpa_tol', 5.0, 'Tolerance for matching VPA angle']) self.paramlist.append(['use_frames', '', "Frames to use from reduction. " "Specify a particular range, as " "'400:-400', or '400:1000'."]) self.paramlist.append(['grid', '', "Output pixel scale, if not default. " "Specify in arcsec."]) self.paramlist.append(['deep', False, 'Attempt to recover faint point-like ' 'emission']) self.paramlist.append(['faint', False, 'Attempt to recover faint emission ' '(point-like or extended)']) self.paramlist.append(['extended', False, 'Attempt to recover extended emission ' '(may increase noise)']) self.paramlist.append(['options', '', 'Additional options for scan reconstruction'])
[docs] def assemble_scanpol(self, dataout, basehead): """ Assemble scan maps into a single file. HWP angles are determined from the HWPINIT key in the output file headers. If 4 unique angles with two subarray images each are not found, an error is raised. Parameters ---------- dataout : `list` of DataFits Output scan maps to assemble. basehead : fits.Header Baseline header to update for output file. Returns ------- DataFits The assembled data file. Raises ------ ValueError If output is missing keywords HWPINIT, SUBARRAY, or TELVPA, or if 4 unique HWP angles are not found, or if the wrong number of subarray images are found (should be one R and one T). """ errmsg = 'Unexpected scan map output for SCANPOL mode. ' \ 'Check SUBARRAY, HWPINIT, SCRIPTID, TELVPA.' # sort by HWP and subarray hwp = {} dates = [] actual_hwp = {} actual_vpa = {} for outfile in dataout: # get the date, for determining header order try: dateobs_str = outfile.getheadval('DATE-OBS') dateobs = datetime.strptime(dateobs_str.split('.')[0], "%Y-%m-%dT%H:%M:%S") dates.append(dateobs) except (KeyError, ValueError): dates.append(datetime.now()) try: hwpinit = float(outfile.getheadval('HWPINIT')) subarray = outfile.getheadval('SUBARRAY') telvpa = float(outfile.getheadval('TELVPA')) except KeyError: log.error('Scan map output missing required keywords: ' 'HWPINIT, SUBARRAY, or TELVPA') raise ValueError(errmsg) hwpround = int(np.round(hwpinit / 10, decimals=0) * 10) # telvpa + 180 for storage rotvpa = telvpa + 180. if rotvpa > 360: rotvpa = rotvpa - 360. if hwpround in hwp: if subarray in hwp[hwpround]: log.error('Too many subarray={} images for HWP ' 'angle={}.'.format(subarray, hwpround)) raise ValueError(errmsg) hwp[hwpround][subarray] = outfile actual_hwp[hwpround].append(hwpinit) actual_vpa[hwpround].append(rotvpa) else: hwp[hwpround] = {subarray: outfile} actual_hwp[hwpround] = [hwpinit] actual_vpa[hwpround] = [rotvpa] # check that there are exactly 4 angles # (NOTE: DRP supports multiples of 4, but we don't currently # use more than 4, so keep it simple here.) hwps = sorted(hwp.keys()) nhwp = len(hwps) if nhwp != 4: log.error('Must be exactly 4 HWP ' 'angles. Found: {}.'.format(hwps)) raise ValueError(errmsg) # now store R and T at each angle df = DataFits(config=self.config) # merge in all the input headers sort_order = np.argsort(dates) log.debug(f'Setting header from ' f'{dataout[sort_order[0]].filename}') df.setheader(dataout[sort_order[0]].header) exptime = df.getheadval('EXPTIME') ref_arr = df.getheadval('SUBARRAY') for idx in sort_order[1:]: outdf = dataout[idx] # for time accounting, sum exposure time only over one # simultaneously observed subarray if outdf.getheadval('SUBARRAY') == ref_arr: exptime += outdf.getheadval('EXPTIME') log.debug('Merging header for ' '{}'.format(outdf.filename)) df.mergehead(outdf) # set the summed exptime df.setheadval('EXPTIME', exptime, 'Total on-source exposure time [s]') # add a few keywords needed for polarimetry processing df.setheadval('NHWP', nhwp, 'Number of HWP Angles') df.setheadval('PIXSCAL', df.getheadval('CDELT2') * 3600., 'Pixel scale [arcsec]') wcs0 = {} for i, hwp_angle in enumerate(hwps): if 'R0' not in hwp[hwp_angle]: log.error('Missing R0 at HWP angle {}'.format(hwp_angle)) raise ValueError(errmsg) if 'T0' not in hwp[hwp_angle]: log.error('Missing T0 at HWP angle {}'.format(hwp_angle)) raise ValueError(errmsg) rfile = hwp[hwp_angle]['R0'] rdata = rfile.imageget('PRIMARY IMAGE') rerr = rfile.imageget('NOISE') rexp = rfile.imageget('EXPOSURE') rwcs = WCS(rfile.header) tfile = hwp[hwp_angle]['T0'] tdata = tfile.imageget('PRIMARY IMAGE') terr = tfile.imageget('NOISE') texp = tfile.imageget('EXPOSURE') twcs = WCS(tfile.header) # divide by 2 for normalization with # standard scan map calibration factors rdata /= 2.0 rerr /= 2.0 tdata /= 2.0 terr /= 2.0 # set image extensions, with appropriate WCS keys rd_ext = 'DATA R HWP%d' % i td_ext = 'DATA T HWP%d' % i re_ext = 'ERROR R HWP%d' % i te_ext = 'ERROR T HWP%d' % i rx_ext = 'EXPOSURE R HWP%d' % i tx_ext = 'EXPOSURE T HWP%d' % i if i == 0: # don't overwrite primary header df.imageset(rdata, rd_ext) wcs0 = rwcs.to_header() else: df.imageset(rdata, rd_ext, rwcs.to_header()) df.imageset(tdata, td_ext, twcs.to_header()) df.imageset(rerr, re_ext, rwcs.to_header()) df.imageset(terr, te_ext, twcs.to_header()) df.imageset(rexp, rx_ext, rwcs.to_header()) df.imageset(texp, tx_ext, twcs.to_header()) # set a few more header values as needed d_ext = [rd_ext, td_ext] e_ext = [re_ext, te_ext] x_ext = [rx_ext, tx_ext] r_ext = [rd_ext, re_ext, rx_ext] t_ext = [td_ext, te_ext, tx_ext] # set BUNIT in data and error extensions (usually counts) try: bunit = rfile.header['BUNIT'] except KeyError: log.warning('BUNIT not found in scan map output') bunit = 'UNKNOWN' for x in d_ext + e_ext: df.setheadval('BUNIT', bunit, comment='Data units', dataname=x) # set BUNIT in exposure extensions (seconds) for x in x_ext: df.setheadval('BUNIT', 's', comment='Data units', dataname=x) # set HWP angle in all extensions for x in r_ext + t_ext: hwpinit = float(np.mean(actual_hwp[hwp_angle])) df.setheadval('HWPINIT', hwpinit, comment='HWP angle [deg]', dataname=x) # set SUBARRAY=R0 in R extensions for x in r_ext: df.setheadval('SUBARRAY', 'R0', comment='Subarrays in image', dataname=x) # set SUBARRAY=T0 in T extensions for x in t_ext: df.setheadval('SUBARRAY', 'T0', comment='Subarrays in image', dataname=x) # set average VPA angle for x in r_ext + t_ext: vpos = float(np.mean(actual_vpa[hwp_angle])) df.setheadval('VPOS_ANG', vpos, comment='Average Array VPA [deg]', dataname=x) # Copy and save headers scnhead = df.header.copy() df.header = basehead.copy() # Merge scanmap headers StepScanMap.merge_scan_hdr(df, scnhead, self.datain) # Update primary header with proper WCS for key, value in wcs0.items(): df.setheadval(key, value) # Update SOFIA mandated keywords (since this is first pipe step) obsid = 'P_' + basehead['OBS_ID'] df.setheadval('OBS_ID', obsid) df.setheadval('PIPELINE', 'HAWC_DRP') return df
def _run_scanmap(self, argset): """ Run a scan reduction on a data set. Helper function to create a map from a group and assemble the output. Parameters ---------- argset : tuple This tuple should have 4 elements: a list of strings, a dict, a fits.Header, and a string. These are the input files, the keyword parameters, a baseline FITS header, and the group identification string. Returns ------- df : DataFits The output data object. group : str The group identifier. """ infiles, kwargs, basehead, group = argset # log input log.debug(f'All provided options: {kwargs}') log.debug(f'Input files: {infiles}') # run map reduction with warnings.catch_warnings(): warnings.simplefilter('ignore', RuntimeWarning) reduction = Reduction('hawc_plus') output_list = reduction.run(infiles, **kwargs) # read output file(s) if (output_list is None) or (None in output_list): log.error('No output created.') return None, group elif isinstance(output_list, fits.HDUList): log.error('Unexpected output for scan pol mode. ' 'Check INSTCFG.') return None, group dataout = [] for hdul in output_list: df = DataFits(config=self.config) df.load(hdul=hdul) dataout.append(df) try: df = self.assemble_scanpol(dataout, basehead) except ValueError as err: log.error('Encountered error in assembly:') log.error(err) df = None return df, group
[docs] def run(self): """ Run the data reduction algorithm. This step is run as a multi-in multi-out (MIMO) step: self.datain should be a list of DataFits, and output will also be a list of DataFits, stored in self.dataout. The process is: 1. Group input data by the header keyword SCRIPTID. 2. Assemble the scan map options from input parameters. 3. Call the iterative scan map reconstructor. Internally, the process will run multiple times, with appropriate arguments for each HWP angle. It will create multiple output maps: one for each of the R0 and T0 subarrays at each HWP angle. For the standard four-angle case, 8 maps are created. 4. Assemble the output maps into a single output file per group, with a separate extension for flux, error, and exposure at each HWP angle and subarray. 5. Merge header keywords from the input FITS files to generate an appropriate output header for each output file. """ vtol = self.getarg('vpa_tol') # group data by scriptid groups = {} vpa = {} fnum = {} for df in self.datain: try: scriptid = df.getheadval('SCRIPTID') except KeyError: scriptid = 'UNKNOWN' if scriptid in groups: groups[scriptid].append(df) vpa[scriptid].append(df.getheadval('TELVPA')) if df.filenum is not None: fnum[scriptid].append(df.filenum) else: fnum[scriptid].append('UNKNOWN') else: groups[scriptid] = [df] vpa[scriptid] = [df.getheadval('TELVPA')] if df.filenum is not None: fnum[scriptid] = [df.filenum] else: fnum[scriptid] = ['UNKNOWN'] # if scriptids are all different (old data), just run # the data together counts = np.array([len(groups[s]) for s in groups]) if len(self.datain) > 1 and not np.any(counts > 1): log.warning('No matching SCRIPTIDs. ' 'Running all data together.') groups = {'ALL': self.datain} vpa = {'ALL': [v[0] for v in vpa.values()]} fnum = {'ALL': [v[0] for v in fnum.values()]} # collect input options in dict kwargs = {} options = {} if not self.getarg('save_intermediate'): kwargs['write'] = {'source': False} # output path outpath = os.path.dirname(self.datain[0].filename) kwargs['outpath'] = outpath # add scanpol defaults kwargs['scanpol'] = True # add additional top-level parameters for arg in ['deep', 'faint', 'extended']: if self.getarg(arg): kwargs[arg] = True # add frame clipping if necessary use_frames = str(self.getarg('use_frames')).strip() use_frames = StepScanMap.check_use_frames(self.datain, use_frames) if use_frames != '': kwargs['frames'] = use_frames # set the output pixel scale if supplied try: grid = float(str(self.getarg('grid')).strip()) except (ValueError, TypeError): grid = None if grid is not None: kwargs['grid'] = grid # add additional options from parameters at end, # so they can override any defaults set by the above additional = str(self.getarg('options')).strip() if additional != '': all_val = additional.split() for val in all_val: try: k, v = val.split('=') except (IndexError, ValueError, TypeError): pass else: options[k] = v kwargs['options'] = options # loop over groups, reducing together self.dataout = [] for i, group in enumerate(groups): kw = kwargs.copy() log.info('Group {}/{}: ' 'SCRIPTID = {}'.format(i + 1, len(groups), group)) log.info(' Mean, std VPA: ' '{:.2f} +/- {:.2f}'.format(np.mean(vpa[group]), np.std(vpa[group]))) # warn if vpa is outside tolerance if np.any(np.array(vpa[group]) - np.min(vpa[group]) > vtol): log.warning('VPA is outside tolerance for ' 'group {}: {}'.format(i + 1, vpa[group])) log.warning('Should be within {} degrees'.format(vtol)) # output base name and header d1 = groups[group][0] basehead = d1.header outname = os.path.basename(d1.filenamebegin + self.procname.upper() + d1.filenameend) if not self.getarg('save_intermediate'): kw['name'] = outname # add input file names infiles = [] for datain in groups[group]: # get filename if os.path.exists(datain.filename): fname = datain.filename else: rawname = datain.rawname if os.path.exists(rawname): fname = rawname else: msg = 'File {} not found'.format(datain.filename) log.error(msg) raise ValueError(msg) infiles.append(fname) # argument set to run the reduction on the group argset = (infiles, kw, basehead, group) # run the group df, group = self._run_scanmap(argset) # check for errored reduction if df is None: msg = 'Problem in assembling scanpol ' \ 'data for group {}'.format(group) log.error(msg) log.warning('Dropping files from reduction:') for bad_df in groups[group]: log.warning(' {}'.format( os.path.basename(bad_df.filename))) continue # update header, filename with MISO-style handling # for file numbers df.filename = groups[group][0].filename self.filenum = fnum[group] self.iomode = 'MISO' self.updateheader(df) self.iomode = 'MIMO' self.dataout.append(df)