# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log, units
from astropy.io import fits
import numpy as np
import os
from sofia_redux.scan.custom.hawc_plus.info.astrometry import (
HawcPlusAstrometryInfo)
from sofia_redux.scan.custom.hawc_plus.info.chopping import (
HawcPlusChoppingInfo)
from sofia_redux.scan.custom.hawc_plus.info.detector_array import (
HawcPlusDetectorArrayInfo)
from sofia_redux.scan.custom.hawc_plus.info.instrument import (
HawcPlusInstrumentInfo)
from sofia_redux.scan.custom.hawc_plus.info.telescope import (
HawcPlusTelescopeInfo)
from sofia_redux.scan.custom.hawc_plus.info.observation import (
HawcPlusObservationInfo)
from sofia_redux.scan.custom.sofia.info.info import SofiaInfo
from sofia_redux.scan.custom.sofia.info.gyro_drifts import SofiaGyroDriftsInfo
from sofia_redux.scan.custom.sofia.info.extended_scanning import (
SofiaExtendedScanningInfo)
from sofia_redux.scan.utilities.utils import (
insert_info_in_header, to_header_float)
from sofia_redux.toolkit.utilities import multiprocessing
__all__ = ['HawcPlusInfo']
[docs]
class HawcPlusInfo(SofiaInfo):
def __init__(self, configuration_path=None):
"""
Initialize a HawcPlusInfo object.
The HAWC+ information contains metadata on various parts of an
observation that are specific to observations with the instrument.
Parameters
----------
configuration_path : str, optional
An alternate directory path to the configuration tree to be
used during the reduction. The default is
<package>/data/configurations.
"""
super().__init__(configuration_path=configuration_path)
self.name = 'hawc_plus'
self.astrometry = HawcPlusAstrometryInfo()
self.gyro_drifts = SofiaGyroDriftsInfo()
self.chopping = HawcPlusChoppingInfo()
self.detector_array = HawcPlusDetectorArrayInfo()
self.instrument = HawcPlusInstrumentInfo()
self.spectroscopy = None
self.telescope = HawcPlusTelescopeInfo()
self.observation = HawcPlusObservationInfo()
self.scanning = SofiaExtendedScanningInfo()
self.hwp_grouping_angle = 2 * units.Unit('degree')
[docs]
@classmethod
def get_file_id(cls):
"""
Return the file ID.
Returns
-------
str
"""
return 'HAW'
[docs]
def validate_scans(self, scans):
"""
Validate a list of scans specific to the instrument.
Parameters
----------
scans : list (HawcPlusScan)
A list of scans. Scans are culled in-place if they do not meet
certain criteria.
Returns
-------
None
"""
if scans is None or len(scans) < 2 or scans[0] is None:
super().validate_scans(scans)
return
n_scans = len(scans)
first_scan = scans[0]
wavelength = first_scan.info.instrument.wavelength
instrument_config = first_scan.info.instrument.instrument_config
keep_scans = np.full(n_scans, True)
for i in range(1, n_scans):
scan = scans[i]
if scan.info.instrument.wavelength != wavelength:
log.warning(f"Scan {scan.get_id()} in a different band. "
f"Removing from set.")
keep_scans[i] = False
elif scan.info.instrument.instrument_config != instrument_config:
log.warning(f"Scan {scan.get_id()} is in a different "
f"instrument configuration. Removing from set.")
keep_scans[i] = False
else:
limit = scan.configuration.get_float(
'gyrocorrect.max', default=np.nan) * units.Unit('arcsec')
if np.isnan(limit):
continue
if scan.info.gyro_drifts.get_max() > limit:
log.warning(f"Scan {scan.get_id()} has too large gyro "
f"drifts. Removing from set.")
keep_scans[i] = False
for i in range(n_scans - 1, 0, -1):
if not keep_scans[i]:
del scans[i]
super().validate_scans(scans)
[docs]
def max_pixels(self):
"""
Return the maximum number of pixels.
Returns
-------
count : int
"""
return self.instrument.n_store_channels
[docs]
def get_si_pixel_size(self):
"""
Get the science instrument pixel size.
Returns
-------
size : Coordinate2D
The (x, y) pixel sizes, each of which is a units.Quantity.
"""
return self.detector_array.pixel_sizes
[docs]
def group_files_by_hwp(self, filenames, jobs=1, force_threading=False):
"""
Group HAWC+ files by HWP angle.
Parameters
----------
filenames : list (str)
A list of HAWC+ FITS files to group.
jobs : int
The number of parallel jobs used to determine the grouping.
force_threading : bool
If `True`, force parallel processing using threads.
Returns
-------
file_groups : dict
The files grouped by HWP angle {angle : [files]}
"""
if isinstance(filenames, str):
filenames = [filenames]
n_files = len(filenames)
read_jobs = int(np.clip(n_files, 1, jobs))
msg = f"Grouping {n_files} HAWC_PLUS files by HWP angles"
if jobs > 1: # pragma: no cover
msg += f" using {read_jobs} parallel threads."
log.debug(msg)
file_groups = {}
da = self.hwp_grouping_angle
hwp_step = self.instrument.hwp_step
args = filenames, hwp_step
kwargs = None
hwp_angles = multiprocessing.multitask(
self.parallel_safe_determine_hwp_angle, range(n_files), args,
kwargs, jobs=read_jobs, max_nbytes=None,
force_threading=force_threading, logger=log)
for filename, hwp_angle in zip(filenames, hwp_angles):
if np.isnan(hwp_angle):
if None not in file_groups:
file_groups[None] = [filename]
else:
file_groups[None].append(filename)
continue
for angle, angle_files in file_groups.items():
if (angle - da) <= hwp_angle <= (angle + da):
angle_files.append(filename)
break
else:
file_groups[hwp_angle] = [filename]
log.info(f"{len(file_groups)} HWP groups will be reduced.")
return file_groups
[docs]
@classmethod
def parallel_safe_determine_hwp_angle(cls, args, file_index):
"""
Return the HWP (half-wave-plate) angle for a single file.
This function is safe for multiprocessing using
:func:`multiprocessing.multitask`.
Parameters
----------
args : 2-tuple
args[0] : list (str)
A list of FITS file names.
args[1] : units.Quantity
The HWP step used to convert HWP counts to a HWP angle.
file_index : int
The index of the file in args[0] for which to determine the HWP
angle.
Returns
-------
hwp_angle: units.Quantity
"""
filenames, hwp_step = args
filename = filenames[file_index]
return cls.determine_hwp_angle(filename, hwp_step)
[docs]
@classmethod
def determine_hwp_angle(cls, filename, hwp_step):
"""
Determine the mean HWP angle in the given file.
Parameters
----------
filename : str
The FITS file to search.
hwp_step : units.Quantity
The HWP step size for each HWP count.
Returns
-------
hwp_angle : units.Quantity
The average HWP angle in the file.
"""
if not os.path.isfile(filename):
log.error(f"Could not locate file: {filename}")
return np.nan * units.Unit('degree')
hdul = fits.open(filename)
hwp_counts = np.empty(0, dtype=int)
for hdu in hdul:
if isinstance(hdu, fits.BinTableHDU):
extname = hdu.header.get('EXTNAME')
if extname is None or extname.lower().strip() != 'timestream':
continue
if 'hwpCounts' not in hdu.columns.names:
continue
hwp_counts = np.concatenate(
(hwp_counts, hdu.data['hwpCounts'].ravel()))
hdul.close()
if hwp_counts.size == 0:
log.warning(f"Could not find HWP angles for file: {filename}")
return np.nan * units.Unit('degree')
return np.nanmean(hwp_counts) * hwp_step
[docs]
@staticmethod
def split_reduction(reduction, file_groups):
"""
Split the reduction based on files grouped by HWP angle.
Parameters
----------
reduction : Reduction
file_groups : dict
Returns
-------
None
"""
base_config = reduction.configuration
base_name = base_config.get_string('name', default=None)
base_subarray = base_config.get_string_list(
'subarray', delimiter=',', default=None)
reductions = []
for angle, file_group in file_groups.items():
if isinstance(angle, units.Quantity):
angle = angle.to('degree').value
angle_string = f'{angle:.2f}'
r0 = reduction.blank_copy()
t0 = reduction.blank_copy()
for subarray, configuration in zip(
['R0', 'T0'], [r0.configuration, t0.configuration]):
configuration.recall('name') # in case it was disabled
new_name = f'{angle_string}{subarray}.fits'
if base_name is not None:
new_name = f'{base_name}.{new_name}'
configuration.put('name', new_name)
configuration.recall('subarray')
if base_subarray is None:
configuration.put('subarray', subarray)
configuration.lock('subarray')
else:
subarrays = ','.join(np.unique(base_subarray + [subarray]))
configuration.put('subarray', subarrays)
configuration.recall('hwp')
configuration.put('hwp', angle_string)
reductions.append((r0, file_group))
reductions.append((t0, file_group))
reduction.sub_reductions = []
for i, (sub_reduction, file_group) in enumerate(reductions):
reduction.sub_reductions.append(sub_reduction)
sub_reduction.parent_reduction = reduction
sub_reduction.reduction_files = file_group
sub_reduction.reduction_number = i + 1