# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Polarization vector pipeline step."""
from astropy import log
from astropy import wcs as astwcs
from astropy.io import fits
import numpy as np
from sofia_redux.instruments.hawc.stepparent import StepParent
__all__ = ['StepPolVec']
[docs]
class StepPolVec(StepParent):
r"""
Calculate polarization vectors.
This step uses the Stokes parameter images and associated
errors and covariances to compute the percent polarization
and angle for each pixel. It also computes the debiased
polarization and the rotated angle (for B-field directions).
These values are all stored in a FITS table extension named
'Pol Data'.
The input for this step is a DataFits object with images:
STOKES I, ERROR I, STOKES Q, STOKES U, COVAR Q I, COVAR U I,
COVAR Q U. This step is typically run after
`sofia_redux.instruments.hawc.steps.StepMerge`.
Output for this step contains the same image frames as the input
files, plus a table with the polarization vectors for each
pixel (POL DATA). Images of these data are also added as separate
extensions. The added extensions are named: PERCENT POL,
DEBIASED PERCENT POL, ERROR PERCENT POL, POL ANGLE, ROTATED POL
ANGLE, ERROR POL ANGLE, POL FLUX, ERROR POL FLUX, DEBIASED POL FLUX.
Finally, the PROCSTAT header keyword will be updated to LEVEL\_4
after this step.
Notes
-----
The standard equations are used to convert from Stokes
parameters to polarization percentage and angle:
.. math:: \theta = \frac{90}{\pi} arctan\Big(\frac{U}{Q}\Big)
.. math:: \sigma_\theta = \frac{90}{\pi (Q^2 + U^2)}
\sqrt{(U\sigma_Q)^2 +
(Q\sigma_U)^2 - 2 Q U \sigma_{QU}}.
The percent polarization (:math:`p`) and its error are calculated as
.. math:: p = 100 \sqrt{\Big(\frac{Q}{I}\Big)^2
+ \Big(\frac{U}{I}\Big)^2}
.. math:: \sigma_p = \frac{100}{I} \sqrt{\frac{1}{(Q^2 + U^2)}
\Big[(Q \sigma_Q)^2
+ (U \sigma_U)^2 + 2 Q U \sigma_{QU}\Big]
+ \Big[\Big(\frac{Q}{I}\Big)^2
+ \Big(\frac{U}{I}\Big)^2\Big]
\sigma_I^2 - 2 \frac{Q}{I}\sigma_{QI}
- 2 \frac{U}{I} \sigma_{UI}}.
The debiased polarization percentage (:math:`p'`)is also calculated, as:
.. math:: p' = \sqrt{p^2 - \sigma_p^2}.
The polarization efficiency provided in the 'eff' parameter is
applied to the Q and U values (and their associated errors and
covariances) after calculating :math:`\theta`, but before calculating
percent polarization.
"""
[docs]
def setup(self):
"""
Set parameters and metadata for the pipeline step.
Output files have PRODTYPE = 'polvec', and are named with
the step abbreviation 'VEC'.
Parameters defined for this step are:
eff : list of float
Telescope + instrument polarization efficiency. One
value per HAWC waveband.
"""
# Name of the pipeline reduction step
self.name = 'polvec'
self.description = 'Compute Vectors'
# Shortcut for pipeline reduction step and identifier for
# saved file names.
self.procname = 'vec'
# Clear Parameter list
self.paramlist = []
# Append parameters
self.paramlist.append(['eff', [1.0, 1.0, 1.0, 1.0, 1.0],
'telescope + instrument polarization '
'efficiency for each waveband'])
[docs]
def read_eff(self):
"""
Read an efficiency value from the parameters.
The parameters are expected to be defined as a list, with
one entry for each HAWC band. The correct value for the
input data is selected from the list.
Returns
-------
float
The polarization efficiency.
"""
waveband = self.datain.getheadval('spectel1')
bands = ['A', 'B', 'C', 'D', 'E']
try:
idx = bands.index(waveband[-1])
except (ValueError, IndexError):
# waveband not in list
msg = 'Cannot parse waveband: %s' % waveband
log.error(msg)
raise ValueError(msg)
try:
eff = self.getarg('eff')[idx]
except IndexError:
msg = 'Need efficiency values for all wavebands'
log.error(msg)
raise IndexError(msg)
return eff
[docs]
def run(self):
"""
Run the data reduction algorithm.
Because this step is single-in, single-out (SISO),
self.datain must be a DataFits object. The output
is also a DataFits object, stored in self.dataout.
The process is:
1. Compute polarization vector magnitude and direction.
Propagate errors accordingly.
2. Store the data in tables and images.
"""
self.dataout = self.datain.copy()
nhwp = self.dataout.getheadval('nhwp')
# suppress warnings when I do stuff with NaN's or divide by zero
np.seterr(invalid='ignore', divide='ignore')
if nhwp == 1:
log.info('Only 1 HWP, so skipping step %s' % self.name)
else:
si = self.dataout.imageget("STOKES I")
sq = self.dataout.imageget("STOKES Q")
su = self.dataout.imageget("STOKES U")
di = self.dataout.imageget("ERROR I")
dq = self.dataout.imageget("ERROR Q")
du = self.dataout.imageget("ERROR U")
covqi = self.dataout.imageget("COVAR Q I")
covui = self.dataout.imageget("COVAR U I")
covqu = self.dataout.imageget("COVAR Q U")
# Compute parameters
# theta = 90/pi * atan(U/Q)
# Vtheta = (90/pi)^2 *
# (Q^2 + U^2)^-2 (U^2 VQ + Q^2VU - 2 Q U cov(Q,U))
# (arctan2 puts angle in right quadrant)
theta = (90. / np.pi) * np.arctan2(su, sq)
dtheta = (90. / (np.pi * (sq**2 + su**2))) * \
np.sqrt((su * dq)**2 + (sq * du)**2 - 2 * sq * su * covqu)
rot_theta = theta + 90
rot_theta[rot_theta > 90] -= 180
# apply efficiency correction to Q and U
# (this gets propagated to p)
eff = self.read_eff()
sq /= eff
su /= eff
dq /= eff
du /= eff
covqi /= eff
covui /= eff
covqu /= eff**2
# polarized flux
# f = (Q^2 + U^2) ^ (1/2)
# Vf = (1/f^2)(Q^2 VQ + U^2 Vu + 2 Q U cov(Q,U))
pflux = np.sqrt(sq**2 + su**2)
dpflux = np.sqrt((sq * dq)**2
+ (su * du)**2
+ 2 * sq * su * covqu) / pflux
debias_pflux = pflux**2 - dpflux**2 # ricean debiasing
mask = np.where(debias_pflux < 0) # catch negative square roots
debias_pflux[mask] = 0
debias_pflux = np.sqrt(debias_pflux)
# polarization fraction
# p = (1/I) (Q^2 + U^2)^(1/2)
# Vp = (1/(I^4 p^2)) (Q^2 VQ + U^2 VU + 2 Q U cov(Q,U))
# + 1/I^4 ((Q^2 + U^2) VI - 2 Q I cov(Q,I) - 2 U I cov(U,i))
p = np.sqrt((1 / si**2) * (sq**2 + su**2))
vp = (p**-2 * si**-4) * ((sq * dq)**2
+ (su * du)**2
+ 2 * sq * su * covqu) \
+ (si**-4) * ((sq**2 + su**2) * di**2
- 2 * sq * si * covqi
- 2 * su * si * covui)
dp = np.sqrt(vp)
# convert p to percentage
p = 100 * p
dp = 100 * dp
# debias data
debias_p = p**2 - dp**2 # ricean debiasing
mask = np.where(debias_p < 0) # catch negative square roots
debias_p[mask] = 0
debias_p = np.sqrt(debias_p)
# Note that here we set x equal to columns and y equal to rows
ny, nx = si.shape
y, x = np.mgrid[0:ny, 0:nx] + 1
x = x.flatten()
y = y.flatten()
# Convert pixels to wcs
wcs = astwcs.WCS(self.datain.header)
ra, dec = wcs.wcs_pix2world(x, y, 1) # zero-based input pixels
# create table columns
cols = [fits.Column(name="Pixel X", format='J', array=x),
fits.Column(name="Pixel Y", format='J', array=y),
fits.Column(name="Right Ascension", format='D',
array=ra, unit='deg'),
fits.Column(name="Declination", format='D',
array=dec, unit='deg'),
fits.Column(name="Percent Pol", format='D',
array=p.flatten()),
fits.Column(name="Debiased Percent Pol", format='D',
array=debias_p.flatten()),
fits.Column(name="Err. Percent Pol", format='D',
array=dp.flatten()),
fits.Column(name="Theta", format='D', unit='deg',
array=theta.flatten()),
fits.Column(name="Rotated Theta", format='D',
unit='deg', array=rot_theta.flatten()),
fits.Column(name="Err. Theta", format='D', unit='deg',
array=dtheta.flatten())]
c = fits.ColDefs(cols)
tbhdu = fits.BinTableHDU.from_columns(c)
self.dataout.tableset(tbhdu.data, "POL DATA",
tbhdu.header)
# creating header with wcs info to use in new HDU's
wcshead = fits.Header()
wcshead['CDELT1'] = self.datain.header['CDELT1']
wcshead['CDELT2'] = self.datain.header['CDELT2']
wcshead['CRVAL1'] = self.datain.header['CRVAL1']
wcshead['CRVAL2'] = self.datain.header['CRVAL2']
wcshead['CRPIX1'] = self.datain.header['CRPIX1']
wcshead['CRPIX2'] = self.datain.header['CRPIX2']
wcshead['CTYPE1'] = self.datain.header['CTYPE1']
wcshead['CTYPE2'] = self.datain.header['CTYPE2']
wcshead['EQUINOX'] = self.datain.header['EQUINOX']
wcshead['NHWP'] = self.datain.header['NHWP']
# now write out images of the relevant data so they can easily
# be viewed in something like ds9
# Q/U and its errors will overwrite the previous HDUs
# They now have pol. efficiency factor applied
# Also replace dI to make sure WCS is set
self.dataout.imageset(di, "Error I", wcshead)
self.dataout.imageset(sq, "Stokes Q", wcshead)
self.dataout.imageset(dq, "Error Q", wcshead)
self.dataout.imageset(su, "Stokes U", wcshead)
self.dataout.imageset(du, "Error U", wcshead)
self.dataout.imageset(p, "Percent Pol", wcshead)
self.dataout.imageset(debias_p, "Debiased Percent Pol", wcshead)
self.dataout.imageset(dp, "Error Percent Pol", wcshead)
self.dataout.imageset(theta, "Pol Angle", wcshead)
self.dataout.imageset(rot_theta, "Rotated Pol Angle", wcshead)
self.dataout.imageset(dtheta, "Error Pol Angle", wcshead)
self.dataout.imageset(pflux, "Pol Flux", wcshead)
self.dataout.imageset(dpflux, "Error Pol Flux", wcshead)
self.dataout.imageset(debias_pflux, "Debiased Pol Flux", wcshead)
# delete the covariance images -- they're no longer needed
self.dataout.imagedel("COVAR Q I")
self.dataout.imagedel("COVAR U I")
self.dataout.imagedel("COVAR Q U")
# set BUNIT appropriately for all extensions
for name in ['Stokes I', 'Error I',
'Stokes Q', 'Error Q',
'Stokes U', 'Error U',
'Pol Flux', 'Error Pol Flux',
'Debiased Pol Flux']:
self.dataout.setheadval('BUNIT', 'Jy/pixel',
comment='Data units',
dataname=name)
for name in ["Percent Pol", "Debiased Percent Pol",
"Error Percent Pol"]:
self.dataout.setheadval('BUNIT', 'percent',
comment='Data units',
dataname=name)
for name in ["Pol Angle", "Rotated Pol Angle",
"Error Pol Angle"]:
self.dataout.setheadval('BUNIT', 'deg',
comment='Data units',
dataname=name)
self.dataout.setheadval('PROCSTAT', 'LEVEL_4')