# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log
import numba as nb
import numpy as np
from sofia_redux.scan.custom.fifi_ls.info.astrometry import (
FifiLsAstrometryInfo)
from sofia_redux.scan.custom.fifi_ls.info.detector_array import (
FifiLsDetectorArrayInfo)
from sofia_redux.scan.custom.fifi_ls.info.instrument import (
FifiLsInstrumentInfo)
from sofia_redux.scan.custom.sofia.info.info import SofiaInfo
from sofia_redux.scan.custom.sofia.info.extended_scanning import (
SofiaExtendedScanningInfo)
from sofia_redux.scan.utilities.utils import (
insert_info_in_header, to_header_float)
from sofia_redux.scan.source_models.source_numba_functions import (
get_source_signal)
from sofia_redux.scan.coordinate_systems.coordinate_2d import Coordinate2D
from sofia_redux.scan.signal.correlated_signal import CorrelatedSignal
from sofia_redux.scan.signal import signal_numba_functions as snf
__all__ = ['FifiLsInfo', 'normalize_scan_coordinates', 'correct_for_gain']
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def normalize_scan_coordinates(ra, dec, x, y, z, data, error, valid, flags,
channel_valid): # pragma: no cover
"""
Convert (frame, pixel) indexed data to consistent flat arrays.
Parameters
----------
ra : numpy.ndarray (float)
The right-ascension values of shape (n_frames, n_pixels).
dec : numpy.ndarray (float)
The declination values of shape (n_frames, n_pixels).
x : numpy.ndarray (float)
The detector x-coordinates of shape (n_frames, n_pixels).
y : numpy.ndarray (float)
The detector y-coordinates of shape (n_frames, n_pixels).
z : numpy.ndarray (float)
The pixel wavelength values of shape (n_pixels,).
data : numpy.ndarray (float)
The detector data values of shape (n_frames, n_pixels).
error : numpy.ndarray (float)
The detector error values of shape (n_frames, n_pixels).
valid : numpy.ndarray (bool)
The valid frames (`True`) of shape (n_frames,).
flags : numpy.ndarray (int)
The sample flags of shape (n_frames, n_pixels). Only zero valued flags
will be included in the output.
channel_valid : numpy.ndarray (bool)
A boolean mask of shape (n_pixels,) where `False` indicates that a
given channel is bad and should not be included.
Returns
-------
flattened : 7-tuple (numpy.ndarray)
Seven arrays each of shape (n,) where n is the number of zero-valued
samples that are also valid frames. The order of the arrays are:
(RA, DEC, WAVE, X, Y, DATA, ERROR).
"""
n_frames, n_pixels = x.shape
n_max = data.size
ra_out = np.empty(n_max, dtype=nb.float64)
dec_out = np.empty(n_max, dtype=nb.float64)
x_out = np.empty(n_max, dtype=nb.float64)
y_out = np.empty(n_max, dtype=nb.float64)
z_out = np.empty(n_max, dtype=nb.float64)
d_out = np.full(n_max, np.nan)
e_out = np.zeros(n_max)
i = 0
for pixel in range(n_pixels):
if not channel_valid[pixel]:
continue
z_value = z[pixel]
for frame in range(n_frames):
if not valid[frame]:
continue
if flags[frame, pixel]:
continue
ra_out[i] = ra[frame, pixel]
dec_out[i] = dec[frame, pixel]
x_out[i] = x[frame, pixel]
y_out[i] = y[frame, pixel]
z_out[i] = z_value
d_out[i] = data[frame, pixel]
e_out[i] = error[frame, pixel]
i += 1
return (ra_out[:i], dec_out[:i], z_out[:i], x_out[:i], y_out[:i],
d_out[:i], e_out[:i])
[docs]
@nb.njit(cache=True, nogil=False, parallel=False, fastmath=False)
def correct_for_gain(data, frame_valid, frame_gains, sync_gains,
channel_flags, sample_flags, source_blank_flag
): # pragma: no cover
"""
Use the calculated gains from the scan reduction to normalize data.
Parameters
----------
data : numpy.ndarray (float)
The frame data to be corrected of shape (n_frames, n_channels).
Updated in-place.
frame_valid : numpy.ndarray (bool)
A boolean mask of shape (n_frames,) where `False` indicates that a
particular frame is invalid and should not be included.
frame_gains : numpy.ndarray (float)
The frame gains as calculated by the scan reduction of shape
(n_frames,).
sync_gains : numpy.ndarray (float)
The channel gains as calculated by the scan reduction of shape
(n_channels,).
channel_flags : numpy.ndarray (int)
An array of channel flags of shape (n_channels,) where any nonzero
value indicates that there is some problem with the calculated gain
value or channel properties, and that it should not be included in the
corrected data.
sample_flags : numpy.ndarray (int)
source_blank_flag
Returns
-------
"""
n_frames, n_channels = data.shape
for frame in range(n_frames):
if not frame_valid[frame]:
continue
fg = frame_gains[frame]
for channel in range(n_channels):
if channel_flags[channel] != 0:
data[frame, channel] = np.nan
continue
flag = sample_flags[frame, channel]
if flag != 0 and flag != source_blank_flag:
data[frame, channel] = np.nan
continue
gain = sync_gains[channel] * fg
if gain == 0 or np.isnan(gain):
data[frame, channel] = np.nan
else:
data[frame, channel] /= gain
[docs]
class FifiLsInfo(SofiaInfo):
def __init__(self, configuration_path=None):
"""
Initialize the FIFI-LS information.
The HAWC+ information contains metadata on various parts of an
observation that are specific to observations with the instrument.
Parameters
----------
configuration_path : str, optional
An alternate directory path to the configuration tree to be
used during the reduction. The default is
<package>/data/configurations.
"""
super().__init__(configuration_path=configuration_path)
self.name = 'fifi_ls'
self.astrometry = FifiLsAstrometryInfo()
self.detector_array = FifiLsDetectorArrayInfo()
self.instrument = FifiLsInstrumentInfo()
self.spectroscopy = None
self.scanning = SofiaExtendedScanningInfo()
[docs]
@classmethod
def get_file_id(cls):
"""
Return the file ID.
Returns
-------
str
"""
return 'FIFI'
[docs]
def validate_scans(self, scans):
"""
Validate a list of scans specific to the instrument.
Parameters
----------
scans : list (HawcPlusScan)
A list of scans. Scans are culled in-place if they do not meet
certain criteria.
Returns
-------
None
"""
if scans is None or len(scans) < 2 or scans[0] is None:
super().validate_scans(scans)
return
n_scans = len(scans)
first_scan = scans[0]
wavelength = first_scan.info.instrument.wavelength
instrument_config = first_scan.info.instrument.instrument_config
keep_scans = np.full(n_scans, True)
for i in range(1, n_scans):
scan = scans[i]
if scan.info.instrument.wavelength != wavelength:
log.warning(f"Scan {scan.get_id()} in a different band. "
f"Removing from set.")
keep_scans[i] = False
elif scan.info.instrument.instrument_config != instrument_config:
log.warning(f"Scan {scan.get_id()} is in a different "
f"instrument configuration. Removing from set.")
keep_scans[i] = False
for i in range(n_scans - 1, 0, -1):
if not keep_scans[i]:
del scans[i]
super().validate_scans(scans)
[docs]
def max_pixels(self):
"""
Return the maximum number of pixels.
Returns
-------
count : int
"""
return self.detector_array.pixels
[docs]
def get_si_pixel_size(self):
"""
Get the science instrument pixel size.
Returns
-------
size : Coordinate2D
The (x, y) pixel sizes, each of which is a units.Quantity.
"""
return self.detector_array.pixel_sizes
[docs]
@staticmethod
def combine_reduction_scans_for_resampler(reduction): # pragma: no cover
"""
Used to combine and extract the data for subsequent reduction.
Parameters
----------
reduction : Reduction
Returns
-------
combined_data : dict
"""
combined_info = {}
n_samples = 0
source = reduction.source
signal_mode = source.signal_mode
header_list = []
filenames = []
all_results = []
scan_samples = []
corner_coordinates = []
corner_xy_coordinates = []
for scan in reduction.scans:
# Need to get the corner positions for each scan
positions = scan.channels.data.position
min_x, min_y = positions.min.coordinates.to('arcsec').value
max_x, max_y = positions.max.coordinates.to('arcsec').value
corners = Coordinate2D(np.asarray([[min_x, min_x, max_x, max_x],
[min_y, max_y, max_y, min_y]]),
unit='arcsec')
for integration in scan.integrations:
frames = integration.frames
frame_gains = integration.gain * frames.get_source_gain(
signal_mode)
source_gains = integration.source_sync_gain
channel_data = integration.channels.data
source_signal, source_error = get_source_signal(
frame_data=frames.data,
frame_valid=frames.valid,
frame_gains=frame_gains,
frame_weights=frames.relative_weight,
channel_flags=channel_data.flag,
channel_variance=channel_data.variance,
map_values=source.map.data,
map_valid=source.map.valid,
map_indices=frames.map_index.coordinates,
sync_gains=source_gains)
total_data = frames.data.copy()
blank_flag = int(frames.flagspace.convert_flag(
'SAMPLE_SOURCE_BLANK').value)
# Re-normalize data and error
correct_for_gain(data=total_data,
frame_valid=frames.valid,
frame_gains=frame_gains,
sync_gains=source_gains,
channel_flags=channel_data.flag,
sample_flags=frames.sample_flag,
source_blank_flag=blank_flag)
equatorial = frames.get_equatorial(channel_data.position)
detector = frames.info.detector_array
xy = detector.equatorial_to_detector_coordinates(equatorial)
ra = equatorial.ra.to('hourangle').value
dec = equatorial.dec.to('degree').value
x = xy.x.to('arcsec').value
y = xy.y.to('arcsec').value
wave = channel_data.wavelength.to('um').value
valid = frames.valid & (frames.flag == 0)
flags = frames.sample_flag.copy()
flags[flags == blank_flag] = 0
corner_equatorial = frames.get_equatorial(corners)
corner_ra = corner_equatorial.ra.to('hourangle').value
corner_dec = corner_equatorial.dec.to('degree').value
corner_coordinates.append((corner_ra, corner_dec))
corner_xy = detector.equatorial_to_detector_coordinates(
corner_equatorial)
corner_xy_coordinates.append(
[corner_xy.x.to('arcsec').value,
corner_xy.y.to('arcsec').value])
channel_valid = channel_data.flag == 0
r = normalize_scan_coordinates(
ra, dec, x, y, wave, total_data, source_error, valid,
flags, channel_valid)
n_samples += r[0].size
scan_samples.append(r[0].size)
header_list.append(scan.configuration.fits.header.copy())
filenames.append(scan.info.origin.filename)
all_results.append(r)
coordinates = np.empty((3, n_samples), dtype=float)
xy_coordinates = np.empty((2, n_samples), dtype=float)
data = np.empty(n_samples, dtype=float)
error = np.empty(n_samples, dtype=float)
start = 0
i = 1
while all_results:
r = all_results.pop(0)
n = r[0].size
coordinates[:, start:start + n] = r[:3]
xy_coordinates[:, start:start + n] = r[3:5]
data[start:start + n] = r[5]
error[start:start + n] = r[6]
start += n
i += 1
combined_info['filenames'] = filenames
combined_info['headers'] = header_list
combined_info['coordinates'] = coordinates
combined_info['xy_coordinates'] = xy_coordinates
combined_info['flux'] = data
combined_info['error'] = error
combined_info['samples'] = np.asarray(scan_samples)
combined_info['corners'] = corner_coordinates
combined_info['xy_corners'] = corner_xy_coordinates
return combined_info