Source code for sofia_redux.instruments.hawc.steps.stepbinpixels

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Optional pixel binning pipeline step."""

from astropy import log
from astropy.wcs import WCS
import numpy as np

from sofia_redux.instruments.hawc.stepparent import StepParent

__all__ = ['StepBinPixels']


[docs] class StepBinPixels(StepParent): """ Bin pixels to increase signal-to-noise. Input for this image must contain STOKES, ERROR, and COVAR images, as produced by the `sofia_redux.instruments.hawc.steps.StepStokes` pipeline step. The output DataFits has the same extensions as the input. """
[docs] def setup(self): """ Set parameters and metadata for the pipeline step. Output files have PRODTYPE = 'binpixels', and are named with the step abbreviation 'BIN'. Parameters defined for this step are: block_size : {1, 2, 4, 8} Bin size, in pixels. Must divide the 40x64 array evenly into square blocks. If set to 1, no binning will be performed. """ # Name of the pipeline reduction step self.name = 'binpixels' self.description = 'Bin Pixels' # Shortcut for pipeline reduction step and identifier for # saved file names. self.procname = 'bin' # Clear Parameter list self.paramlist = [] # Append parameters self.paramlist.append(['block_size', 1, 'Bin size, in pixels. ' 'Must divide 40x64 array evenly.'])
[docs] def run(self): """ Run the data reduction algorithm. Because this step is single-in, single-out (SISO), self.datain must be a DataFits object. The output is also a DataFits object, stored in self.dataout. The process is: 1. Block sum detector pixels with specified binning. 2. Propagate errors, covariances, and WCS keys. """ # copy datain to dataout self.dataout = self.datain.copy() # get data shape from first image ny, nx = self.dataout.image.shape # block size parameter nw = self.getarg('block_size') if nw <= 1: log.info('No binning performed.') return elif (nx % nw != 0) or (ny % nw != 0): msg = f'Block size {nw} does not divide data shape ' \ f'{ny},{nx} evenly.' log.error(msg) raise ValueError(msg) else: log.info(f'Binning in {nw}x{nw} blocks.') # copy datain to dataout self.dataout = self.datain.copy() # new array shape, to use for block summing new_shape = (ny // nw, nw, nx // nw, nw) # slice the header WCS to get new CDELT and CRPIX header = self.dataout.header hwcs = WCS(header) bin_wcs = hwcs[::nw, ::nw] updates = {'CRPIX1': bin_wcs.wcs.crpix[0], 'CRPIX2': bin_wcs.wcs.crpix[1], 'CDELT1': bin_wcs.wcs.cdelt[0], 'CDELT2': bin_wcs.wcs.cdelt[1], 'PIXSCAL': header['PIXSCAL'] * nw} self.dataout.header.update(updates) # also add the pixel binning to the header self.dataout.setheadval('PIXELBIN', nw, 'Pixel binning block size') # correct each flux extension for stokes in ['I', 'Q', 'U']: fname = f'STOKES {stokes}' ename = f'ERROR {stokes}' # handle non-polarimetry case if fname not in self.dataout.imgnames: continue flux = self.dataout.imageget(fname) var = self.dataout.imageget(ename) ** 2 # shape flux into blocks and count valid pixels in each block_flux = flux.reshape(new_shape) pix_count = np.sum(~np.isnan(block_flux), axis=(1, 3)) # sum over blocks bin_flux = np.nansum(block_flux, axis=(1, 3)) # all-NaN blocks should be NaN bin_flux[pix_count == 0] = np.nan # scale the rest for missing pixels bin_flux[pix_count != 0] *= nw**2 / pix_count[pix_count != 0] # assume var NaNs match flux NaNs bin_var = np.nansum(var.reshape(new_shape), axis=(1, 3)) bin_var[pix_count == 0] = np.nan bin_var[pix_count != 0] *= (nw**2 / pix_count[pix_count != 0])**2 self.dataout.imageset(bin_flux, fname) self.dataout.imageset(np.sqrt(bin_var), ename) # do the same for the covariance extensions for covar in ['Q I', 'U I', 'Q U']: cname = f'COVAR {covar}' if cname not in self.dataout.imgnames: continue cvar = self.dataout.imageget(cname) block_cvar = cvar.reshape(new_shape) pix_count = np.sum(~np.isnan(block_cvar), axis=(1, 3)) bin_cvar = np.nansum(block_cvar, axis=(1, 3)) bin_cvar[pix_count == 0] = np.nan bin_cvar[pix_count != 0] *= (nw**2 / pix_count[pix_count != 0])**2 self.dataout.imageset(bin_cvar, cname) # for the bad pixel mask, just average and floor the # input values bname = 'BAD PIXEL MASK' mask = self.dataout.imageget(bname) bin_mask = np.sum(mask.reshape(new_shape), axis=(1, 3)) self.dataout.imageset(bin_mask // nw**2, bname)