# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log, units
import os
import numpy as np
from sofia_redux.scan.source_models.astro_model_2d import AstroModel2D
from sofia_redux.scan.source_models.astro_intensity_map import \
AstroIntensityMap
from sofia_redux.scan.coordinate_systems.index_2d import Index2D
from sofia_redux.scan.coordinate_systems.projector.astro_projector import \
AstroProjector
from sofia_redux.scan.source_models import source_numba_functions as snf
from sofia_redux.scan.utilities import numba_functions
from sofia_redux.scan.utilities.utils import round_values
from sofia_redux.toolkit.utilities import multiprocessing
__all__ = ['PixelMap']
[docs]
class PixelMap(AstroModel2D):
def __init__(self, info, reduction=None):
"""
Initialize a PixelMap source model.
The pixel map source model consists of a collection of
:class:`AstroIntensityMap` models, each containing a astronomical map
generated from a single pixel of the observing instrument. They are
primarily designed to measure pixel offsets from a known source.
Parameters
----------
info : sofia_redux.scan.info.info.Info
The Info object which should belong to this source model.
reduction : sofia_redux.scan.reduction.reduction.Reduction, optional
The reduction for which this source model should be applied.
"""
super().__init__(info, reduction=reduction)
self.pixel_maps = {}
self.template = AstroIntensityMap(info, reduction=reduction)
[docs]
def copy(self, with_contents=True):
"""
Return a copy of the pixel map.
Parameters
----------
with_contents : bool, optional
If `True`, include all pixel maps in the copy.
Returns
-------
PixelMap
"""
new = super().copy()
pixel_maps = {}
if with_contents:
for pixel, pixel_map in self.pixel_maps.items():
if isinstance(pixel_map, AstroIntensityMap):
pixel_maps[pixel] = pixel_map.copy()
new.pixel_maps = pixel_maps
return new
[docs]
def clear_all_memory(self):
"""
Clear all memory references prior to deletion.
Returns
-------
None
"""
super().clear_all_memory()
self.pixel_maps = {}
self.template = None
@property
def referenced_attributes(self):
"""
Return attributes that are referenced with a standard copy operation.
Returns
-------
set (str)
"""
attributes = super().referenced_attributes
attributes.add('pixel_maps')
return attributes
@property
def shape(self):
"""
Return the shape of the map.
Note that this is in numpy (y, x) order.
Returns
-------
tuple of int
"""
if self.template is None:
return ()
return self.template.shape
@shape.setter
def shape(self, new_shape):
"""
Set the shape of the map.
Parameters
----------
new_shape : tuple of int
The shape should be supplied in numpy format (y, x).
Returns
-------
None
"""
self.template.shape = new_shape
[docs]
def is_adding_to_master(self):
"""
Return whether this map is adding to a master map during accumulation.
Returns
-------
bool
"""
return True
[docs]
def clear_process_brief(self):
"""
Remove all process brief information.
Returns
-------
None
"""
super().clear_process_brief()
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
pixel_map.clear_process_brief()
[docs]
def create_from(self, scans, assign_scans=True):
"""
Initialize model from scans.
Sets the model scans to those provided, and the source model for each
scan as this. All integration gains are normalized to the first scan.
If the first scan is non-sidereal, the system will be forced to an
equatorial frame.
Parameters
----------
scans : list (Scan)
A list of scans from which to create the model.
assign_scans : bool, optional
If `True`, assign the scans to this source model. Otherwise,
there will be no hard link between the scans and source model.
Returns
-------
None
"""
log.info("Initializing pixel maps.")
# Clear all channel position data and set all channels as independent.
for scan in scans:
for integration in scan.integrations:
skip = integration.channels.flagspace.sourceless_flags()
pixels = integration.channels.get_mapping_pixels(
discard_flag=skip)
# This is a data group, so we need to overwrite references
position = pixels.position
position.zero()
pixels.position = position
independent = pixels.independent
independent[:] = True
pixels.independent = independent
self.template = AstroIntensityMap(self.info, reduction=self.reduction)
self.template.create_from(scans, assign_scans=False)
self.pixel_maps = {}
super().create_from(scans, assign_scans=assign_scans)
[docs]
def reset_processing(self):
"""
Reset the source processing.
Reset the source processing for the full source model and all
individual pixel maps.
Returns
-------
None
"""
super().reset_processing()
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
pixel_map.reset_processing()
[docs]
def clear_content(self):
"""
Clear the data in all pixel maps.
Returns
-------
None
"""
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
pixel_map.clear_content()
[docs]
def create_lookup(self, integration):
"""
Create the source indices for integration frames.
The source indices contain 1-D lookup values for the pixel indices
of a sample on the source model. The map indices are stored in the
integration frames as the 1-D `source_index` attribute, and the
2-D `map_index` attribute.
Unlike other types of source maps, the PixelMap does not take pixel
positions into account, so we are solely dealing with timestream
positions relative to a (0, 0) pixel position for all channels. This
means we only need to calculate positions for a single channel with
position (0, 0)
Parameters
----------
integration : Integration
The integration for which to create source indices.
Returns
-------
None
"""
log.debug("pixel map: lookup for single pixel at (0, 0)")
self.index_shift_x = numba_functions.log2ceil(self.size_y)
self.index_mask_y = (1 << self.index_shift_x) - 1
frames = integration.frames
if frames.source_index is None:
frames.source_index = np.full((frames.size, 1), -1)
else:
frames.source_index.fill(-1)
if frames.map_index is None:
frames.map_index = Index2D(
np.full((2, frames.size, 1), -1))
else:
frames.map_index.coordinates.fill(-1)
projector = AstroProjector(self.projection)
central_position = integration.channels.data.position[0:1].copy()
central_position.zero()
offsets = frames.project(central_position, projector)
map_indices = self.grid.offset_to_index(offsets)
map_indices.coordinates = round_values(map_indices.coordinates)
frames.map_index.coordinates = map_indices.coordinates
frames.source_index = self.pixel_index_to_source_index(
map_indices.coordinates)
bad_samples = snf.validate_pixel_indices(
indices=map_indices.coordinates,
x_size=self.size_x,
y_size=self.size_y,
valid_frame=frames.valid)
if bad_samples > 0:
log.warning(f"{bad_samples} frames have bad map indices")
[docs]
def add_model_data(self, other, weight=1.0):
"""
Add an increment source model data onto the current model.
Parameters
----------
other : PixelMap
The source model increment.
weight : float, optional
The weight of the source model increment.
Returns
-------
None
"""
if not isinstance(other, PixelMap):
raise ValueError(f"Cannot add {other} to {self}.")
for pixel, pixel_map in other.pixel_maps.items():
if not isinstance(pixel_map, AstroIntensityMap):
continue
this_map = self.pixel_maps.get(pixel)
if this_map is None:
self.pixel_maps[pixel] = pixel_map
elif not isinstance(this_map, AstroIntensityMap):
continue
else:
this_map.add_model_data(pixel_map, weight=weight)
[docs]
def add_points(self, frames, pixels, frame_gains, source_gains):
"""
Add points to the pixel maps.
This may be an extremely time consuming and memory intensive endeavour.
A map is created and stored for each mapping pixel in the detector
channels.
Parameters
----------
frames : Frames
The frames to add to the source model.
pixels : ChannelGroup
The channels (pixels) to add to the source model.
frame_gains : numpy.ndarray (float)
The gain values for all frames of shape (n_frames,).
source_gains : numpy.ndarray (float)
The channel source gains for all channels of shape (all_channels,).
Returns
-------
mapping_frames : int
The number of valid mapping frames added for the model.
"""
dt = pixels.info.instrument.sampling_interval
if isinstance(dt, units.Quantity):
dt = dt.to('second').value
else:
dt = float(dt)
n, frame_data, sample_gains, sample_weights, sample_indices = (
self.get_sample_points(frames, pixels, frame_gains, source_gains))
n_pixels = pixels.size
msg = f"Updating {n_pixels} pixel maps"
jobs = self.reduction.available_reduction_jobs
if jobs > 1: # pragma: no cover
msg += f" in parallel using {jobs} threads"
msg += ' (This may take a while).'
log.debug(msg)
args = (self.template, self.pixel_maps, pixels.fixed_index, frame_data,
sample_gains, sample_weights, dt, sample_indices)
kwargs = None
pixel_maps = multiprocessing.multitask(
self.parallel_safe_add_points, range(n_pixels), args, kwargs,
jobs=jobs, logger=log, force_threading=True)
for (fixed_index, pixel_map) in pixel_maps:
self.pixel_maps[fixed_index] = pixel_map
return n
[docs]
@classmethod
def parallel_safe_add_points(cls, args, pixel_number):
"""
Add points for a single pixel map.
This function is safe for :func:`multitask`.
Parameters
----------
args : 7-tuple
A tuple of arguments where:
args[0] - template (AstroIntensityMap)
args[1] - pixel_maps (dict)
args[2] - fixed_indices (array) (n_pixels,)
args[2] - frame_data (array) (n_frames, n_pixels)
args[3] - sample_gains (array) (n_frames, n_pixels)
args[4] - sample_weights (array) (n_frames, n_pixels)
args[5] - dt (float)
args[6] - sample_indices (array) (2, n_frames, n_pixels)
pixel_number : int
The pixel index for the pixel map.
Returns
-------
pixel_maps : 2-tuple
A tuple of the form (fixed_index, pixel_map).
"""
(template, pixel_maps, fixed_indices, frame_data, sample_gains,
sample_weights, dt, sample_indices) = args
i = pixel_number
fixed_index = fixed_indices[i]
pixel_map = pixel_maps.get(fixed_index)
if pixel_map is None:
pixel_map = template.copy(with_contents=False)
pixel_map.id = f'{fixed_index}'
pixel_map.set_data_shape(template.shape)
pixel_map.map.accumulate_at(
image=frame_data[:, i:i + 1],
gains=sample_gains[:, i:i + 1],
weights=sample_weights[:, i:i + 1],
times=dt,
indices=sample_indices[..., i:i + 1])
return fixed_index, pixel_map
[docs]
def calculate_coupling(self, integration, pixels, source_gains,
sync_gains):
"""Not implemented for pixel maps"""
pass
[docs]
def count_points(self):
"""
Return the number of points in the model.
Returns
-------
int
"""
points = 0
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
points += pixel_map.count_points()
return points
[docs]
def covariant_points(self):
"""
Return the number of points in the smoothing beam of all maps.
Returns
-------
float
"""
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
return pixel_map.covariant_points()
return 1.0
[docs]
def set_data_shape(self, shape):
"""
Set the shape of the map.
Parameters
----------
shape : tuple (int)
Returns
-------
None
"""
self.template.set_data_shape(shape)
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
pixel_map.set_data_shape(shape)
[docs]
def sync_source_gains(self, frames, pixels, frame_gains, source_gains,
sync_gains):
"""
Remove the map source from frame data for all pixel maps.
In addition to source removal, samples are also flagged if the map
is masked at that location.
For a given sample at frame i and channel j, frame data d_{i,j} will be
decremented by dg where:
dg = fg * ( (gain(source) * map[index]) - (gain(sync) * base[index]) )
Here, fg is the frame gain and index is the index on the map of sample
(i,j).
Any masked map value will result in matching samples being flagged.
Parameters
----------
frames : Frames
The frames for which to remove the source gains.
pixels : Channels or ChannelGroup
The channels for which to remove the source gains.
frame_gains : numpy.ndarray (float)
An array of frame gains of shape (n_frames,).
source_gains : numpy.ndarray (float)
An array of channel source gains for all channels of shape
(all_channels,).
sync_gains : numpy.ndarray (float)
an array of channel sync gains for all channels of shape
(all_channels,). The sync gains should contain the prior source
gain.
Returns
-------
None
"""
n_pixels = pixels.size
msg = f"Syncing gains in {n_pixels} pixel maps"
jobs = self.reduction.available_reduction_jobs
if jobs > 1: # pragma: no cover
msg += f" in parallel using {jobs} threads" # pragma: no cover
msg += ' (This may take a while).'
log.debug(msg)
args = (self.pixel_maps, frames, pixels, frame_gains, source_gains,
sync_gains)
kwargs = None
frame_data = multiprocessing.multitask(
self.parallel_safe_sync_source_gains, range(n_pixels),
args, kwargs, jobs=jobs, logger=log, force_threading=True)
for channel, data in enumerate(frame_data):
frames.data[:, channel] = data
[docs]
@classmethod
def parallel_safe_sync_source_gains(cls, args, pixel_number):
"""
Remove the source from all pixel maps.
This function is safe for :func:`multitask`.
Parameters
----------
args : 6-tuple
A tuple of arguments where:
args[0] - pixel_maps (dict)
args[1] - frames (Frames)
args[2] - pixels (ChannelGroup)
args[3] - frame_gains (array) (n_frames,)
args[4] - source_gains (array) (all_channels,)
args[5] - sync_gains (array) (all_channels,)
pixel_number : int
The pixel index for the pixel map.
Returns
-------
channel_frame_data : numpy.ndarray (float)
The synced frame data for `pixel_number` of shape (n_frames,).
"""
(pixel_maps, frames, pixels, frame_gains,
source_gains, sync_gains) = args
fixed_index = pixels.fixed_index[pixel_number]
pixel_map = pixel_maps[fixed_index]
channel_indices = np.full(1, pixel_number)
snf.sync_map_samples(
frame_data=frames.data,
frame_valid=frames.valid,
frame_gains=frame_gains,
channel_indices=channel_indices,
map_values=pixel_map.map.data,
map_valid=pixel_map.map.valid,
map_masked=pixel_map.is_masked(),
map_indices=frames.map_index.coordinates,
base_values=pixel_map.base.data,
base_valid=pixel_map.base.valid,
source_gains=source_gains,
sync_gains=sync_gains,
sample_flags=frames.sample_flag,
sample_blank_flag=frames.flagspace.convert_flag(
'SAMPLE_SOURCE_BLANK').value)
return frames.data[:, pixel_number]
[docs]
def set_base(self):
"""
Set the base to the map (copy of).
Returns
-------
None
"""
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
pixel_map.set_base()
[docs]
def process_scan(self, scan):
"""
Process a scan.
Parameters
----------
scan : Scan
Returns
-------
None
"""
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
pixel_map.process_scan(scan)
[docs]
def write(self, path):
"""
Write the source to file.
Parameters
----------
path : str
The directory in which to write the output products.
Returns
-------
None
"""
fits_properties = self.template.map.fits_properties
if self.has_option('pixelmap.writemaps'):
try:
pixels = self.configuration.get_int_list('pixelmap.writemaps')
except ValueError: # pragma: no cover
pixels = None
if pixels is None:
if self.configuration.get_bool('pixelmap.writemaps'):
pixels = list(self.pixel_maps.keys())
if pixels is not None:
for pixel in pixels:
pixel_map = self.pixel_maps.get(pixel)
if isinstance(pixel_map, AstroIntensityMap):
pixel_map.map.fits_properties = fits_properties
pixel_map.write(path)
self.calculate_pixel_data(smooth=False)
self.write_pixel_data(path=path)
[docs]
def write_fits(self, filename):
"""
Write the results to a FITS file.
Not implemented for the single PixelMap.
Parameters
----------
filename : str
Returns
-------
None
"""
return
[docs]
def process(self):
"""
Process the source model.
Processes each pixel map separately. If 'pixelmap.process' is True
in the configuration, each pixel map will be fully processed.
Otherwise, the results are only accumulated.
Returns
-------
None
"""
process = self.configuration.get_bool('pixelmap.process')
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
if process:
pixel_map.process()
else:
pixel_map.map.end_accumulation()
self.next_generation()
[docs]
def calculate_pixel_data(self, smooth=False):
"""
Calculate pixel position offsets and couplings.
Pixel positions and couplings will be stored in the channel data of the
first scan in the source model. Fits to the peak are calculated using
:func:`AstroIntensityMap.get_peak_source` to determine the x and y
position of the peak as well as the amplitude. The couplings are
determined as::
coupling = peak.amplitude / median(peak.amplitude)
taking the median over all available instrument pixels.
Parameters
----------
smooth : bool, optional
If `True`, smooth the map to the instrument peak resolution before
fitting.
Returns
-------
None
"""
mapping_pixels = self.get_first_scan().channels.get_mapping_pixels(
keep_flag=0)
n_pixels = len(self.pixel_maps)
msg = f"Fitting {n_pixels} pixel positions to source map"
jobs = self.reduction.available_reduction_jobs
if jobs > 1: # pragma: no cover
msg += f" in parallel using {jobs} threads"
msg += '.'
log.debug(msg)
point_size = self.info.instrument.get_point_size()
degree = self.configuration.get_int('pointing.degree', default=3)
reduce_degrees = self.configuration.get_bool('pointing.reduce_degrees')
args = self.pixel_maps, smooth, point_size, degree, reduce_degrees
kwargs = None
peaks = multiprocessing.multitask(
self.parallel_safe_calculate_pixel_data,
mapping_pixels.fixed_index, args, kwargs,
jobs=jobs, logger=log, force_threading=True)
peak_values = {}
pixel_positions = mapping_pixels.position
for pixel_index, fixed_index in enumerate(mapping_pixels.fixed_index):
peak = peaks[pixel_index]
if peak is None:
continue
position = self.projection.project(peak.coordinates)
position.invert()
pixel_positions[pixel_index] = position
peak_values[pixel_index] = peak.peak
if len(peak_values) == 0:
return
mapping_pixels.position = pixel_positions
mean_peak = np.median(list(peak_values.values()))
couplings = mapping_pixels.coupling
for pixel_index, peak in peak_values.items():
couplings[pixel_index] *= peak / mean_peak
mapping_pixels.coupling = couplings
[docs]
@classmethod
def parallel_safe_calculate_pixel_data(cls, args, pixel_number):
"""
Fit a peak to a single pixel.
This function is safe for :func:`multitask`. The peak for any single
pixel is determined using :func:`AstroIntensityMap.get_peak_source`.
Parameters
----------
args : 7-tuple
A tuple of arguments where:
args[0] - pixel_maps (dict)
args[1] - smooth (bool)
args[2] - point_size (units.Quantity)
args[3] - degree (int)
args[4] - reduce_degrees (bool)
pixel_number : int
The pixel fixed index for the pixel map.
Returns
-------
gaussian_model : EllipticalSource or None
"""
pixel_maps, smooth, point_size, degree, reduce_degrees = args
beam_map = pixel_maps.get(pixel_number)
if beam_map is None or not beam_map.is_valid():
return None
obs_map = beam_map.map
if smooth:
obs_map.smooth_to(point_size)
source = beam_map.get_peak_source(
degree=degree, reduce_degrees=reduce_degrees)
return source
[docs]
def write_pixel_data(self, path=None):
"""
Write the pixel data to file.
Parameters
----------
path : str, optional
The directory in which to write the pixel data. The default is
the reduction work path.
Returns
-------
None
"""
if path is None:
path = self.reduction.work_path
channels = self.get_first_scan().channels
source_gain = channels.get_source_gains(filter_corrected=False)
filename = os.path.join(path, f'{self.get_default_core_name()}.rcp')
channels.data.coupling = source_gain / channels.data.gain
hdr = self.get_first_scan().get_first_integration().get_ascii_header()
file_contents = channels.print_pixel_rcp(header=hdr)
with open(filename, 'w') as f:
f.write(file_contents)
log.info(f"Written RCP contents to {filename}.")
[docs]
def set_parallel(self, threads):
"""
Set the number of parallel operations for the pixel map.
Not applicable for the SkyDip model.
Parameters
----------
threads : int
Returns
-------
None
"""
for pixel_map in self.pixel_maps.values():
if pixel_map is not None:
pixel_map.set_parallel(threads)
[docs]
def no_parallel(self):
"""
Disable parallel processing for each pixel map.
Returns
-------
None
"""
for pixel_map in self.pixel_maps.values():
if pixel_map is not None:
pixel_map.no_parallel()
[docs]
def get_parallel(self):
"""
Return the number of parallel threads for the pixel maps.
Returns
-------
threads : int
"""
for pixel_map in self.pixel_maps.values():
if pixel_map is not None:
return pixel_map.get_parallel()
return 1
[docs]
def merge_accumulate(self, other):
"""
Merge another pixel map with this one.
Parameters
----------
other : PixelMap
Returns
-------
None
"""
if not isinstance(other, PixelMap):
raise ValueError(f"Cannot add {other} to {self}.")
args = self.pixel_maps, other.pixel_maps
kwargs = None
jobs = self.reduction.available_reduction_jobs
fixed_indices = list(self.pixel_maps.keys())
updated = multiprocessing.multitask(
self.parallel_safe_merge_accumulate,
fixed_indices, args, kwargs,
jobs=jobs, logger=log, force_threading=True)
for fixed_index, pixel_map in updated:
self.pixel_maps[fixed_index] = pixel_map
[docs]
@classmethod
def parallel_safe_merge_accumulate(cls, args, pixel_index):
"""
Merge a single pixel map onto another.
This function is safe for :func:`multitask`.
Parameters
----------
args : 2-tuple
A tuple of arguments where:
args[0] - pixel_maps (dict)
args[1] - other_maps (dict)
pixel_index : int
The pixel fixed index for the pixel map to merge
Returns
-------
fixed_index, pixel_map : int, AstroIntensityMap
The fixed pixel index and merged pixel map.
"""
pixel_maps, other_maps = args
pixel_map = pixel_maps.get(pixel_index)
other_map = other_maps.get(pixel_index)
if not isinstance(pixel_map, AstroIntensityMap) or not isinstance(
other_map, AstroIntensityMap):
return pixel_index, pixel_map
pixel_map.merge_accumulate(other_map)
return pixel_index, pixel_map
[docs]
def get_source_name(self):
"""
Return the source name for the pixel map.
Returns
-------
name : str
"""
return self.template.get_source_name()
[docs]
def get_unit(self):
"""
Return the map data unit.
Returns
-------
units.Quantity or units.Unit
If a Quantity is returned, the value represents a scaling factor.
"""
return self.template.get_unit()
[docs]
def is_empty(self):
"""
Return whether this pixel map has no data.
Returns
-------
bool
"""
if len(self.pixel_maps) == 0:
return True
for pixel_map in self.pixel_maps.values():
if isinstance(pixel_map, AstroIntensityMap):
return False
else:
return True
[docs]
def process_final(self):
"""
Perform the final processing steps for each pixel map.
Returns
-------
None
"""
updated = multiprocessing.multitask(
self.parallel_safe_process_final, list(self.pixel_maps.keys()),
(self.pixel_maps,), None,
jobs=self.reduction.available_reduction_jobs,
logger=log,
force_threading=True)
for fixed_index, pixel_map in updated:
self.pixel_maps[fixed_index] = pixel_map
[docs]
@classmethod
def parallel_safe_process_final(cls, args, pixel_index):
"""
Perform the final processing steps for a single pixel map.
Parameters
----------
args : 1-tuple
A tuple of arguments where:
args[0] - pixel_maps (dict)
pixel_index : int
The pixel fixed index for the pixel map to merge
Returns
-------
pixel_index, pixel_map : int, AstroIntensityMap
The pixel index and updated pixel map.
"""
pixel_maps = args[0]
pixel_map = pixel_maps.get(pixel_index)
if not isinstance(pixel_map, AstroIntensityMap):
return pixel_index, pixel_map
pixel_map.process_final()
return pixel_index, pixel_map
[docs]
def get_map_2d(self):
"""
Return the 2D map.
This is not valid for a pixel map.
Returns
-------
None
"""
return None