Source code for sofia_redux.instruments.hawc.steps.stepshift

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Array alignment pipeline step."""

from astropy import log
import numpy as np

from sofia_redux.instruments.hawc.stepparent import StepParent
from sofia_redux.toolkit.image import adjust

__all__ = ['StepShift']


[docs] class StepShift(StepParent): """ Align the R and T arrays. This step performs various array manipulations to trim and align the detector subarrays. First, the last row is removed, since it is unilluminated. Then, the R1 and T1 subarrays are rotated to align with R0 and T0, and all subarrays are flipped over the y-axis. The first and last chops in the demodulated data are removed. Finally, the R array is shifted to align with the T array. Only linear pixel shifts between R and T are performed here. Any additional misalignment (e.g. rotation) is handled in later mapping steps. The parameters passed here are set as header keywords, to be applied as needed by `sofia_redux.instruments.hawc.steps.StepMerge`. This step will also check for the TRCKSTAT keyword in the FITS header. If it has been set to BAD, this step will issue an error and halt processing. Input for this step is a flat-fielded file, where the R Array, T Array, and their variances are now images instead of columns in the data table. This step is typically run after `sofia_redux.instruments.hawc.steps.StepFlat`. Output from this step is the same as the input file, except that the R and T images, variances, and masks have been trimmed and shifted. Additionally, some unneeded columns are removed from the DEMODULATED DATA table, and header keywords are set to describe R and T relative geometry. These keywords are ALNANG1, ALNANG2, ALNMAGX, ALNMAGY, ALNGAPX, ALNGAPY, and ALNROTA. """
[docs] def setup(self): """ Set parameters and metadata for the pipeline step. Output files have PRODTYPE = 'shift', and are named with the step abbreviation 'SFT'. Parameters defined for this step are: angle1 : float Rotation angle of R0 relative to T0, in degrees counter-clockwise, to be stored in keyword ALNANG1. angle2 : float Rotation angle of R1 relative to T1, in degrees counterclockwise, to be stored in keyword ALNANG2. mag : list of float Magnification of R relative to T, in the x, y pixel direction, given as [mx, my]. This will be stored in keywords ALNMAGX, ALNMAGY. disp1 : list of float Pixel displacement of R0 relative to T0, in the x, y directions, given as [dx, dy]. This will be applied to the data with a linearly interpolated shift. disp2 : list of float Pixel displacement of R1 relative to T1, in the x, y directions, given as [dx, dy]. This will be applied to the data with a linearly interpolated shift. gapx : float Displacement in x pixels between T0 and T1. This will be stored in keyword ALNGAPX. gapy : float Displacement in y pixels between T0 and T1. This will be stored in keyword ALNGAPY. gapangle : float Rotation angle in degrees counter-clockwise between T0 and T1. This will be stored in keyword ALNROTA. """ # Name of the pipeline reduction step self.name = 'shift' self.description = 'Align Arrays' # Shortcut for pipeline reduction step and identifier for # saved file names. self.procname = 'sft' # Clear Parameter list self.paramlist = [] # Append parameters self.paramlist.append(['angle1', 0.0, 'Rotation angle of R0 relative to T0, ' 'in degrees counterclockwise']) self.paramlist.append(['angle2', 0.0, 'Rotation angle of R1 relative to T1, ' 'in degrees counterclockwise']) self.paramlist.append(['mag', [1.0, 1.0], 'Magnification of R relative to T, ' 'in the x, y pixel direction']) self.paramlist.append(['disp1', [0.0, 0.0], "Pixel displacement of R0 relative to T0, " "in the x, y directions"]) self.paramlist.append(['disp2', [0.0, 0.0], "Pixel displacement of R1 relative to T1, " "in the x, y directions"]) self.paramlist.append(['gapx', 0.0, 'Displacement in x pixels between T0 and T1']) self.paramlist.append(['gapy', 0.0, 'Displacement in y pixels between T0 and T1']) self.paramlist.append(['gapangle', 0.0, 'Rotation angle in degrees CCW between ' 'T0 and T1'])
[docs] def run(self): """ Run the data reduction algorithm. Because this step is single-in, single-out (SISO), self.datain must be a DataFits object. The output is also a DataFits object, stored in self.dataout. The process is: 1. Check for tracking status problems. 2. Remove unneeded data table columns. 3. Rotate, flip, and trim data arrays. 4. Store alignment keywords. 5. Shift the R array to match the T array. Raises ------ ValueError If TRCKSTAT is BAD. """ # Check if tracking issues were found in stepDemod if 'TRCKSTAT' in self.datain.header: trckstatus = self.datain.getheadval('TRCKSTAT') if "BAD" in trckstatus: msg = 'Bad file due to tracking issues - more ' \ 'than 1/3 of samples were removed in stepDemod' log.error(msg) raise ValueError(msg) else: log.warning('No TRCKSTAT keyword found in the header') originalheader = self.datain.header # Remove columns cols_to_del = ['Chop Offset Imag', 'SLOPES R', 'SLOPES T', 'RAWMED R', 'RAWMED T', 'INTERCEPTS R', 'INTERCEPTS T', 'R ARRAY GAIN', 'T ARRAY GAIN', 'R ARRAY IMAG', 'T ARRAY IMAG', 'R ARRAY IMAG VAR', 'T ARRAY IMAG VAR'] try: tabnames = self.datain.table.names except AttributeError: msg = 'No valid tables in input data' log.error(msg) raise ValueError(msg) for a in cols_to_del: for b in tabnames: if a == b: self.datain.tabledelcol(a) self.dataout = self.datain.copy() # Set the header equal to the input header self.dataout.setheader(originalheader) # Rotate R1, T1, Flip all arrays for datname in ['R array', 'T array', 'R array VAR', 'T array VAR', 'R BAD PIXEL MASK', 'T BAD PIXEL MASK']: data = self.dataout.imageget(datname) nrow, ncol = data.shape[-2:] # Cut by 1 row (if size longer than 40) and do flips and twists # (A flip and rotate is just a flip so # each half only needs one flip) if len(data.shape) > 2: if nrow == 41: data = data[:, :40, :] nrow -= 1 data[..., :ncol // 2] = \ data[:, range(nrow - 1, -1, -1), :ncol // 2] data[..., ncol // 2:ncol] = \ data[..., range(ncol - 1, ncol // 2 - 1, -1)] else: if nrow == 41: data = data[:40, :] nrow -= 1 data[:, :ncol // 2] = \ data[range(nrow - 1, -1, -1), :ncol // 2] data[:, ncol // 2:ncol] = \ data[:, range(ncol - 1, ncol // 2 - 1, -1)] self.dataout.imageset(data, datname) # Remove first and last chops from images and table # (usually produced by demodulation artifacts) for datname in ['R array', 'T array', 'R array VAR', 'T array VAR']: data = self.dataout.imageget(datname) data = data[1:-1, :, :] self.dataout.imageset(data, datname) table = self.dataout.tableget('demodulated data') table = table[1:-1] self.dataout.tableset(table, 'demodulated data') # Get values and arrays disp1 = self.getarg('disp1') disp2 = self.getarg('disp2') mag1 = self.getarg('mag') angle1 = self.getarg('angle1') angle2 = self.getarg('angle2') gapx = self.getarg('gapx') gapy = self.getarg('gapy') gaprot = self.getarg('gapangle') rmask = self.dataout.imageget('R BAD PIXEL MASK') demodr = self.dataout.imageget('R array') demodrv = self.dataout.imageget('R array VAR') # write to fits header alignment info self.dataout.setheadval("ALNANG1", angle1, 'Rotation angle in degrees of R1 wrt to T1') self.dataout.setheadval("ALNANG2", angle2, 'Rotation angle in degrees of R2 wrt to T2') self.dataout.setheadval("ALNMAGX", mag1[0], 'Mag. of R wrt to T in the X direction') self.dataout.setheadval("ALNMAGY", mag1[1], 'Mag. of R wrt to T in the Y direction') self.dataout.setheadval("ALNGAPX", gapx, 'displacement in x pixels between T1 and T2') self.dataout.setheadval("ALNGAPY", gapy, 'displacement in y pixels between T1 and T2') self.dataout.setheadval("ALNROTA", gaprot, 'Rotation angle in degrees CCW ' 'between T1 and T2') # Note, we assume the R and T arrays are the same size and that the # arrays are cleanly divisible by 2 in number of columns nz, nrow, ncol = demodr.shape ncol = ncol // 2 # Apply shifts to R1 relative to T1 ix1 = disp1[0] iy1 = disp1[1] datar1 = demodr[:, :, :ncol] datarv1 = demodrv[:, :, :ncol] maskr1 = rmask[:, :ncol] # must be tenth of a pixel or more to bother shifted = False if not np.allclose([ix1, iy1], 0, atol=1e-2): datar1 = adjust.shift(datar1, [0, iy1, ix1]) datarv1 = adjust.shift(datarv1, [0, iy1, ix1]) maskr1 = adjust.shift(maskr1, [iy1, ix1], mode='constant', missing=4, order=0) shifted = True # Apply shifts to R2 relative to T2 ix2 = disp2[0] iy2 = disp2[1] datar2 = demodr[:, :, ncol:] datarv2 = demodrv[:, :, ncol:] maskr2 = rmask[:, ncol:] if not np.allclose([ix2, iy2], 0, atol=1e-2): datar2 = adjust.shift(datar2, [0, iy2, ix2]) datarv2 = adjust.shift(datarv2, [0, iy2, ix2]) maskr2 = adjust.shift(maskr2, [iy2, ix2], mode='constant', missing=4, order=0) shifted = True if shifted: # some sort of shift applied log.debug('Shifted R array to match T array') # combine demodulated data newdata = np.concatenate((datar1, datar2), axis=2) # combine variance newvar = np.concatenate((datarv1, datarv2), axis=2) # combine r1 & r2 masks newmask = np.concatenate((maskr1, maskr2), axis=1) # write out updated data self.dataout.imageset(newdata, 'R Array') self.dataout.imageset(newvar, 'R Array VAR') self.dataout.imageset(newmask, 'R BAD PIXEL MASK')