Source code for sofia_redux.instruments.hawc.steps.steprotate

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Stokes Q and U rotation pipeline step."""

from astropy import log
import numpy as np

from sofia_redux.instruments.hawc.stepparent import StepParent

__all__ = ['StepRotate']


[docs] class StepRotate(StepParent): r""" Rotate Stokes Q and U from detector reference frame to sky. The rotation angle is derived from telescope data and the half-wave plate (HWP) position for the observation. The telescope vertical position angle (VPA) is read from the FITS header keyword VPOS_ANG. HWP zero angle and actual initial angle are read from the keywords HWPSTART and HWPINIT, respectively. Input for this step is a DataFits containing STOKES and ERROR frames for I, Q and U each, as well as COVAR Q I, COVAR U I, and COVAR Q U extensions. This step is typically run after the `sofia_redux.instruments.hawc.steps.StepIP` pipeline step. The output image contains the same image frames as the input image. STOKES and ERROR frames for Q and U have been rotated; the covariance frames have been propagated. Notes ----- The rotation angle is defined per the HAWC+ Geometric Reference (Kovács memo, Appendix H), and is applied to the Q and U images with a standard rotation matrix as follows: .. math:: Q' = cos(\alpha) Q + sin(\alpha) U .. math:: U' = sin(\alpha) Q - cos(\alpha) U. .. math:: \sigma_Q' = \sqrt{(cos(\alpha)\sigma_Q)^2 + (sin(\alpha) \sigma_U)^2 + 2 cos(\alpha) sin(\alpha) \sigma_{QU}} .. math:: \sigma_U' = \sqrt{(sin(\alpha)\sigma_Q)^2 + (cos(\alpha) \sigma_U)^2 - 2 cos(\alpha) sin(\alpha) \sigma_{QU}} .. math:: \sigma_{Q'I} = cos(\alpha) \sigma_{QI} + sin(\alpha) \sigma_{UI} .. math:: \sigma_{U'I} = sin(\alpha) \sigma_{QI} - cos(\alpha) \sigma_{UI} .. math:: \sigma_{Q'U'} = cos(\alpha)sin(\alpha)(\sigma_Q^2 - \sigma_U^2) + (sin^2(\alpha) - cos^2(\alpha)) \sigma_{QU}. """
[docs] def setup(self): """ Set parameters and metadata for the pipeline step. Output files have PRODTYPE = 'rotate', and are named with the step abbreviation 'ROT'. Parameters defined for this step are: gridangle : list of float Angle of the grid in degrees (one value for each waveband). hwpzero_tol : float Tolerance for the difference between commanded and actual initial HWP angles. hwpzero_option : str If set to 'commanded', then the HWPSTART keyword will be used if the difference between the initial HWP angles is > hwpzero_tol. If set to 'actual', the HWPINIT keyword will be used. Otherwise, an error will be raised. """ # Name of the pipeline reduction step self.name = 'rotate' self.description = 'Rotate Stokes' # Shortcut for pipeline reduction step and identifier for # saved file names. self.procname = 'rot' # Clear Parameter list self.paramlist = [] # Append parameters self.paramlist.append(['gridangle', [0.0, 0.0, 0.0, 0.0, 0.0], 'Angle of the grid in degrees ' '(for each waveband)']) self.paramlist.append(['hwpzero_tol', 3.0, 'Tolerance in the difference between ' 'commanded and actual initial HWP angles']) self.paramlist.append(['hwpzero_option', 'commanded', 'Option to use between "commanded" or ' '"actual" in case the difference between ' 'the initial HWP angles is > hwpzero_tol'])
[docs] def read_angle(self): """ Read a grid angle value from the parameters. The parameters are expected to be defined as a list, with one entry for each HAWC band. The correct value for the input data is selected from the list. Returns ------- float The grid angle. """ gridang = self.getarg('gridangle') waveband = self.datain.getheadval('spectel1') bands = ['A', 'B', 'C', 'D', 'E'] try: idx = bands.index(waveband[-1]) except (ValueError, IndexError): # waveband not in list msg = 'Cannot parse waveband: %s' % waveband log.error(msg) raise ValueError(msg) try: gridang = gridang[idx] except IndexError: msg = 'Need grid angle values for all wavebands' log.error(msg) raise IndexError(msg) return gridang
[docs] def run(self): """ Run the data reduction algorithm. Because this step is single-in, single-out (SISO), self.datain must be a DataFits object. The output is also a DataFits object, stored in self.dataout. The process is: 1. Compute the rotation angle from VPA, HWP zero, and the grid angle. 2. Rotate Stokes Q and U arrays. 3. Propagate errors and covariances. """ self.dataout = self.datain.copy() nhwp = self.dataout.getheadval('nhwp') if nhwp == 1: log.info('Only 1 HWP, so skipping step %s' % self.name) else: vpa = self.dataout.getheadval('VPOS_ANG') gridang = self.read_angle() hwpzero_tol = self.getarg('hwpzero_tol') hwpzero_option = self.getarg('hwpzero_option') # Test if commanded and actual initial # HWP angles are consistent (within tolerance) hwpzero = self.dataout.getheadval('hwpstart') hwpinit = self.dataout.getheadval('hwpinit') if hwpzero > 180.: hwpzero = hwpzero - 360. if hwpinit > 180.: hwpinit = hwpinit - 360. diffhwp = abs(hwpinit - hwpzero) if diffhwp > hwpzero_tol: if hwpzero_option == 'commanded': log.error('Initial HWP angle difference is above ' 'the tolerance. Will use the %s ' 'value (%s)' % (hwpzero_option, hwpzero)) elif hwpzero_option == 'actual': log.error('Initial HWP angle difference is above ' 'the tolerance. Will use the %s ' 'value (%s)' % (hwpzero_option, hwpinit)) hwpzero = hwpinit else: msg = 'Initial HWP angle difference is above the ' \ 'tolerance. hwpzero_option parameter value ' \ 'must be commanded or actual' log.error(msg) raise ValueError(msg) # The equations below are described at Appendix H of # Attila's memo on 'HAWC+ Geometric Reference' # propagation equations: # I' = I # Q' = c Q + s U # U' = s Q - c U # VI' = VI # VQ' = c^2 VQ + s^2 VU + 2 c s cov(Q,U) # VU' = s^2 VQ + c^2 VU - 2 c s cov(Q,U) # cov(Q',I') = c cov(Q,I) + s cov(U,I) # cov(U',I') = s cov(Q,I) - c cov(U,I) # cov(Q',U') = c s VQ - c s VU + (s^2 - c^2) cov(Q,U) theta = (vpa + 2. * (hwpzero - 5.) + gridang) * np.pi / 180. cos = np.cos(2. * theta) sin = np.sin(2. * theta) oldq = self.dataout.imageget('STOKES Q') oldu = self.dataout.imageget('STOKES U') q = cos * oldq + sin * oldu self.dataout.imageset(q, 'STOKES Q') u = sin * oldq - cos * oldu self.dataout.imageset(u, 'STOKES U') varq = (self.dataout.imageget('ERROR Q'))**2 varu = (self.dataout.imageget('ERROR U'))**2 covqi = self.dataout.imageget('COVAR Q I') covui = self.dataout.imageget('COVAR U I') covqu = self.dataout.imageget('COVAR Q U') varqprime = cos**2 * varq + \ sin**2 * varu + \ 2 * cos * sin * covqu varuprime = sin**2 * varq + \ cos**2 * varu - \ 2 * cos * sin * covqu covqiprime = cos * covqi + sin * covui covuiprime = sin * covqi - cos * covui covquprime = cos * sin * varq - \ cos * sin * varu + \ (sin**2 - cos**2) * covqu self.dataout.imageset(np.sqrt(varqprime), 'ERROR Q') self.dataout.imageset(np.sqrt(varuprime), 'ERROR U') self.dataout.imageset(covqiprime, 'COVAR Q I') self.dataout.imageset(covuiprime, 'COVAR U I') self.dataout.imageset(covquprime, 'COVAR Q U')