# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import log, units
import numpy as np
from sofia_redux.scan.source_models.astro_data_2d import AstroData2D
from sofia_redux.scan.source_models.maps.image_2d import Image2D
from sofia_redux.scan.source_models.maps.observation_2d import Observation2D
from sofia_redux.scan.utilities.range import Range
from sofia_redux.scan.source_models.beams.elliptical_source import (
EllipticalSource)
from sofia_redux.scan.source_models import source_numba_functions as snf
from sofia_redux.scan.coordinate_systems.index_2d import Index2D
from sofia_redux.scan.coordinate_systems.coordinate_2d import Coordinate2D
from sofia_redux.scan.coordinate_systems.projector.astro_projector import \
AstroProjector
from sofia_redux.scan.coordinate_systems.grid.spherical_grid import \
SphericalGrid
__all__ = ['AstroIntensityMap']
[docs]
class AstroIntensityMap(AstroData2D):
default_map_class = Observation2D
default_image_class = Image2D
def __init__(self, info, reduction=None):
"""
Initialize an astronomical intensity map.
The astronomical intensity map represents the source as an
:class:`Observation2D` map containing data, noise, and exposure values.
It also contains a base image containing the results of the previous
reduction iteration in order to calculate gain and coupling increments.
Parameters
----------
info : sofia_redux.scan.info.info.Info
The Info object which should belong to this source model.
reduction : sofia_redux.scan.reduction.reduction.Reduction, optional
The reduction for which this source model should be applied.
"""
self.map = self.default_map_class()
self.base = None # referenced, not copied
super().__init__(info, reduction=reduction)
self.create_map()
[docs]
def copy(self, with_contents=True):
"""
Return a copy of the source model.
Parameters
----------
with_contents : bool, optional
If `True`, return a true copy of the map. Otherwise, just return
a map with basic metadata.
Returns
-------
AstroIntensityMap
"""
new = super().copy(with_contents=with_contents)
if self.grid is not None and isinstance(new, AstroIntensityMap):
new.map.set_grid(self.grid)
return new
[docs]
def clear_all_memory(self):
"""
Clear all memory references prior to deletion.
Returns
-------
None
"""
super().clear_all_memory()
self.map = None
self.base = None
@property
def referenced_attributes(self):
"""
Return attributes that should be referenced during a copy.
Returns
-------
set (str)
"""
attributes = super().referenced_attributes
attributes.add('base')
return attributes
@property
def flagspace(self):
"""
Return the flagspace associated with the underlying map observation.
Returns
-------
sofia_redux.scan.flags.flags.Flag
"""
if self.map is None:
return super().flagspace
return self.map.flagspace
@property
def shape(self):
"""
Return the shape of the map.
Note that this is in numpy (y, x) order.
Returns
-------
tuple of int
"""
if self.map is None:
return ()
return self.map.shape
@shape.setter
def shape(self, new_shape):
"""
Set the shape of the map.
Parameters
----------
new_shape : tuple of int
The shape should be supplied in numpy format (y, x).
Returns
-------
None
"""
if self.map is None:
return
self.map.shape = new_shape
[docs]
def set_info(self, info):
"""
Set the Info object for the source model.
This sets the provided `info` as the primary Info object containing
the configuration and reduction information for the source model.
The source model will also take ownership of the `info` and set
various parameters from the contents.
Parameters
----------
info : sofia_redux.info.info.Info
Returns
-------
None
"""
super().set_info(info)
if self.map is not None and self.info is not None:
self.map.fits_properties.set_instrument_name(
self.info.instrument.name)
self.map.fits_properties.set_telescope_name(
self.info.telescope.telescope)
[docs]
def get_jansky_unit(self):
"""
Return the Jansky's per unit area.
Returns
-------
janskys : units.Quantity
"""
beam = self.map.underlying_beam
if beam is None:
beam_area = 0 * units.Unit('radian2')
else:
beam_area = beam.area
beam_factor = beam_area.to('radian2').value * units.Unit('beam')
jy_per_beam = self.info.instrument.jansky_per_beam()
jy = (beam_factor * jy_per_beam).to('Jy')
# - Old method (wrong units)
# jy = units.Unit('Jy')
# return beam_area * self.info.instrument.jansky_per_beam() * jy
return jy
[docs]
def add_model_data(self, source_model, weight=1.0):
"""
Add an increment source model data onto the current model.
Parameters
----------
source_model : AstroIntensityMap
The source model increment.
weight : float, optional
The weight of the source model increment.
Returns
-------
None
"""
self.map.accumulate(source_model.map, weight=weight)
[docs]
def merge_accumulate(self, other):
"""
Merge another source with this one.
Parameters
----------
other : AstroIntensityMap
Returns
-------
None
"""
self.map.merge_accumulate(other.map)
[docs]
def stand_alone(self):
"""
Create a stand alone base image.
Returns
-------
None
"""
self.base = self.default_image_class(
x_size=self.size_x, y_size=self.size_y, dtype=float)
[docs]
def create_map(self):
"""
Create the source model map.
Returns
-------
None
"""
self.map = self.default_map_class()
self.map.set_grid(self.grid)
self.map.set_validating_flags(~self.mask_flag)
self.map.add_local_unit(self.get_native_unit())
self.map.set_display_grid_unit(self.info.instrument.get_size_unit())
self.map.fits_properties.set_instrument_name(
self.info.instrument.name)
self.map.fits_properties.set_copyright(
self.map.fits_properties.default_copyright)
if self.reduction is not None:
self.map.set_parallel(self.reduction.max_jobs)
self.map.fits_properties.set_creator_name(
self.reduction.__class__.__name__)
[docs]
def create_from(self, scans, assign_scans=True):
"""
Initialize model from scans.
Sets the model scans to those provided, and the source model for each
scan as this. All integration gains are normalized to the first scan.
If the first scan is non-sidereal, the system will be forced to an
equatorial frame.
Parameters
----------
scans : list (Scan)
A list of scans from which to create the model.
assign_scans : bool, optional
If `True`, assign the scans to this source model. Otherwise,
there will be no hard link between the scans and source model.
Returns
-------
None
"""
self.create_map()
super().create_from(scans, assign_scans=assign_scans)
self.map.fits_properties.set_object_name(
self.get_first_scan().source_name)
self.map.set_underlying_beam(self.get_average_resolution())
self.map.add_local_unit(self.get_kelvin_unit())
self.map.add_local_unit(self.get_jansky_unit())
log.info("\n".join(self.map.get_info()))
self.base = self.default_image_class(
data=np.zeros(self.map.shape, dtype=float))
[docs]
def post_process_scan(self, scan):
"""
Perform post-processing steps on a scan.
At this stage, the map should have already been added to the main
reduction map and we are left with a throw-away map that can be used
to update settings in other objects. The intensity map may be used to
update the pointing for a given scan.
Parameters
----------
scan : Scan
Returns
-------
None
"""
super().post_process_scan(scan)
if self.is_empty():
return
if self.configuration.get_bool('pointing.suggest'):
optimal = self.configuration.get_float('smooth.optimal')
if np.isnan(optimal):
optimal = scan.info.instrument.get_point_size()
else:
optimal = optimal * self.info.instrument.get_size_unit()
self.map.smooth_to(optimal)
if self.has_option('pointing.exposureclip'):
exposure = self.map.get_exposures()
limit = self.configuration.get_float(
'pointing.exposureclip') * exposure.select(0.9)
valid_range = Range(min_val=limit)
exposure.restrict_range(valid_range)
# Robust weight before restricting to potentially tiny search area
self.map.reweight(robust=True)
if self.has_option('pointing.radius'):
# In case pointing.radius is None, use default of infinity
radius = self.configuration.get_float(
'pointing.radius', default=np.inf) * units.Unit('arcsec')
if np.isfinite(radius):
iy, ix = np.indices(self.map.shape)
map_indices = Coordinate2D([ix, iy])
distance = self.map.grid.index_to_offset(map_indices).length
self.map.discard(distance > radius)
spline_degree = self.configuration.get_int('pointing.degree',
default=3)
reduce_degrees = self.configuration.get_bool('pointing.reduce_degrees')
# Sets a GaussianSource or None
scan.pointing = self.get_peak_source(degree=spline_degree,
reduce_degrees=reduce_degrees)
[docs]
def get_peak_index(self):
"""
Return the peak index.
Returns
-------
index : Index2D
The peak (x, y) coordinate.
"""
sign = self.configuration.get_sign('source.sign')
s2n = self.get_significance()
if sign > 0:
y, x = np.unravel_index(np.nanargmax(s2n.data), s2n.shape)
elif sign < 0:
y, x = np.unravel_index(np.nanargmin(s2n.data), s2n.shape)
else:
y, x = np.unravel_index(np.nanargmax(np.abs(s2n.data)), s2n.shape)
return Index2D([x, y])
[docs]
def get_peak_coords(self):
"""
Return the coordinates of the peak value.
Returns
-------
peak_coordinates : Coordinate2D
The (x, y) peak coordinate.
"""
projector = AstroProjector(self.projection)
self.grid.get_offset(self.get_peak_index(), offset=projector.offset)
projector.deproject()
return projector.coordinates
[docs]
def get_peak_source(self, degree=3, reduce_degrees=False):
"""
Return the peak source model.
Parameters
----------
degree : int, optional
The spline degree used to fit the peak map value.
reduce_degrees : bool, optional
If `True`, allow the spline fit to reduce the number of degrees
in cases where there are not enough points available to perform
the spline fit of `degree`. If `False`, a ValueError will be
raised if such a fit fails.
Returns
-------
GaussianSource
"""
self.map.level(robust=True)
beam = self.map.get_image_beam()
peak_source = EllipticalSource(gaussian_model=beam)
peak_source.set_positioning_method(
self.configuration.get_string(
'pointing.method', default='centroid'))
peak_source.position = self.get_peak_coords()
if self.configuration.get_bool('pointing.lsq'):
log.debug("Fitting peak source using LSQ method.")
try:
peak_source.fit_map_least_squares(
self.map, degree=degree, reduce_degrees=reduce_degrees)
except Exception as err:
log.warning(f"Could not fit using LSQ method: {err}")
log.warning("Attempting standard fitting...")
try:
peak_source.fit_map(self.map, degree=degree,
reduce_degrees=reduce_degrees)
except Exception as err:
log.warning(f"Could not fit peak: {err}")
return None
else:
try:
peak_source.fit_map(self.map, degree=degree,
reduce_degrees=reduce_degrees)
except Exception as err:
log.warning(f"Could not fit peak: {err}")
return None
peak_source.deconvolve_with(self.map.smoothing_beam)
critical_s2n = self.configuration.get_float(
'pointing.significance', default=5.0)
if peak_source.peak_significance < critical_s2n:
return None
else:
return peak_source
[docs]
def update_mask(self, blanking_level=None, min_neighbors=2):
"""
Update the map mask based on significance levels and valid neighbors.
If a blanking level is supplied, significance values above or equal to
the blanking level will be masked.
Parameters
----------
blanking_level : float, optional
The significance level used to mark the map. If not supplied,
significance is irrelevant. See above for more details.
min_neighbors : int, optional
The minimum number of neighbors including the pixel itself.
Therefore, the default of 2 excludes single pixels as this would
require a single valid pixel and one valid neighbor.
Returns
-------
None
"""
if blanking_level is None or np.isnan(blanking_level):
blanking_range = np.array([-np.inf, np.inf])
check_blanking = False
else:
blanking_range = np.array([-blanking_level, blanking_level],
dtype=float)
sign = self.configuration.get_sign(self.source_option('sign'))
check_blanking = True
if sign < 0:
blanking_range[1] = np.inf
elif sign > 0:
blanking_range[0] = -np.inf
neighbors = self.map.get_neighbors()
mask = neighbors >= min_neighbors # neighbors includes center pix
mask &= self.map.valid
if check_blanking:
significance = self.map.significance_values()
blank = significance < blanking_range[0]
blank |= significance >= blanking_range[1]
mask &= blank
self.map.set_flags(self.mask_flag, mask)
self.map.unflag(self.mask_flag, ~mask)
[docs]
def merge_mask(self, other_map):
"""
Merge the mask from another map onto this one.
Parameters
----------
other_map : FlaggedArray
Returns
-------
None
"""
other_mask_flag = other_map.flagspace.convert_flag(self.FLAG_MASK)
self.map.set_flags(
self.mask_flag, indices=other_map.is_flagged(other_mask_flag))
[docs]
def is_masked(self):
"""
Return the map mask.
Returns
-------
numpy.ndarray (bool)
"""
return self.map.is_flagged(self.mask_flag)
[docs]
def add_points(self, frames, pixels, frame_gains, source_gains):
"""
Add points to the source model.
Accumulates the combined frame and channel data to the source map
for each frame/channel sample. If a given sample maps to a single map
pixel, that pixel is incremented by::
i = frame_data * weights * gains
w = weights * gains^2
weights = frame_weight / channel_variance
gains = frame_gain * channel_gain
where i is the weighted data increment, and w is the weight increment.
The exposure values are also added to by simply incrementing the time
at any pixel by the sampling interval multiplied by the number of
samples falling within that pixel.
Parameters
----------
frames : Frames
The frames to add to the source model.
pixels : ChannelGroup
The channels (pixels) to add to the source model.
frame_gains : numpy.ndarray (float)
The gain values for all frames of shape (n_frames,).
source_gains : numpy.ndarray (float)
The channel source gains for all channels of shape (all_channels,).
Returns
-------
mapping_frames : int
The number of valid mapping frames added for the model.
"""
dt = pixels.info.instrument.sampling_interval
if isinstance(dt, units.Quantity):
dt = dt.to('second').value
else:
dt = float(dt)
n, frame_data, sample_gains, sample_weights, sample_indices = (
self.get_sample_points(frames, pixels, frame_gains, source_gains))
self.map.accumulate_at(image=frame_data,
gains=sample_gains,
weights=sample_weights,
times=dt,
indices=sample_indices)
return n
[docs]
def mask_samples(self, flag='SAMPLE_SKIP'):
"""
Propagate masked source samples to integration sample flags.
Flags the integration samples using the given flag if they correspond
to a masked source sample.
Parameters
----------
flag : str or int or Enum, optional
The name, integer identifier, or actual flag by which to mask
samples. By default, any integration samples that correspond to
masked source samples will be flagged as 'SAMPLE_SKIP'.
Returns
-------
None
"""
if self.scans is None:
return
for scan in self.scans:
if scan.integrations is None:
continue
for integration in scan.integrations:
self.mask_integration_samples(integration, flag)
[docs]
def mask_integration_samples(self, integration, flag='SAMPLE_SKIP'):
"""
Set the sample flag in an integration for masked map elements.
Parameters
----------
integration : Integration
flag : str or int or Enum, optional
The name, integer identifier, or actual flag by which to flag
samples.
Returns
-------
None
"""
masked = self.is_masked()
if not masked.any():
return
pixels = integration.channels.get_mapping_pixels()
flag_value = integration.frames.flagspace.convert_flag(flag).value
if integration.frames.map_index is None or np.allclose(
integration.frames.map_index.x, -1):
self.create_lookup(integration)
xi, yi = integration.frames.map_index.coordinates[..., pixels.indices]
masked_samples = masked[yi, xi]
if not masked_samples.any():
return
sample_flags = integration.frames.sample_flag[..., pixels.indices]
sample_flags[masked_samples] |= flag_value
integration.frames.sample_flag[..., pixels.indices] = sample_flags
[docs]
def add_frames_from_integration(self, integration, pixels, source_gains,
signal_mode=None):
"""
Add frames from an integration to the source model.
Parameters
----------
integration : Integration
The integration to add.
pixels : ChannelGroup
The channels (pixels) to add to the source model.
source_gains : numpy.ndarray (float)
The source gains for the all channels (pixels). Should be of
shape (all_channels,).
signal_mode : FrameFlagTypes, optional
The signal mode flag, indicating which signal should be used to
extract the frame source gains. Typically, TOTAL_POWER.
Returns
-------
mapping_frames : int
The number of frames that contributed towards mapping.
"""
good_frames = super().add_frames_from_integration(
integration=integration, pixels=pixels, source_gains=source_gains,
signal_mode=signal_mode)
self.add_integration_time(
good_frames * integration.info.instrument.sampling_interval)
return good_frames
[docs]
def sync_source_gains(self, frames, pixels, frame_gains, source_gains,
sync_gains):
"""
Remove the map source from frame data.
In addition to source removal, samples are also flagged if the map
is masked at that location.
For a given sample at frame i and channel j, frame data d_{i,j} will be
decremented by dg where:
dg = fg * ( (gain(source) * map[index]) - (gain(sync) * base[index]) )
Here, fg is the frame gain and index is the index on the map of sample
(i,j).
Any masked map value will result in matching samples being flagged.
Parameters
----------
frames : Frames
The frames for which to remove the source gains.
pixels : Channels or ChannelGroup
The channels for which to remove the source gains.
frame_gains : numpy.ndarray (float)
An array of frame gains of shape (n_frames,).
source_gains : numpy.ndarray (float)
An array of channel source gains for all channels of shape
(all_channels,).
sync_gains : numpy.ndarray (float)
an array of channel sync gains for all channels of shape
(all_channels,). The sync gains should contain the prior source
gain.
Returns
-------
None
"""
snf.sync_map_samples(
frame_data=frames.data,
frame_valid=frames.valid,
frame_gains=frame_gains,
channel_indices=pixels.indices,
map_values=self.map.data,
map_valid=self.map.valid,
map_masked=self.is_masked(),
map_indices=frames.map_index.coordinates,
base_values=self.base.data,
base_valid=self.base.valid,
source_gains=source_gains,
sync_gains=sync_gains,
sample_flags=frames.sample_flag,
sample_blank_flag=frames.flagspace.convert_flag(
'SAMPLE_SOURCE_BLANK').value)
[docs]
def calculate_coupling(self, integration, pixels, source_gains,
sync_gains):
"""
Calculate the channel source coupling values.
Parameters
----------
integration : Integration
pixels : Channels or ChannelGroup
The pixels for which to calculate coupling.
source_gains : numpy.ndarray (float)
The source gains for all frames in the integration of shape
(n_frames,).
sync_gains : numpy.ndarray (float)
The sync gains for all channels in the integration of shape
(all_channels,).
Returns
-------
None
"""
if self.has_option('source.coupling.s2n'):
s2n_range = self.configuration.get_range('source.coupling.s2n')
else:
s2n_range = Range(min_val=5.0)
if integration.frames.map_index is None or np.allclose(
integration.frames.map_index.x, -1):
self.create_lookup(integration)
frame_gains = integration.gain * integration.frames.get_source_gain(
self.signal_mode)
coupling_increment = snf.calculate_coupling_increment(
map_indices=integration.frames.map_index.coordinates,
base_values=self.base.data,
map_values=self.map.data,
map_noise=self.map.get_noise().data,
sync_gains=sync_gains, # Frame space
source_gains=source_gains, # Channel space
frame_data=integration.frames.data,
frame_weight=integration.frames.relative_weight,
frame_gains=frame_gains,
frame_valid=integration.frames.valid,
sample_flags=integration.frames.sample_flag,
channel_indices=pixels.indices,
min_s2n=s2n_range.min,
max_s2n=s2n_range.max,
exclude_flag=self.exclude_samples.value)
coupling = integration.channels.data.coupling
new_coupling = coupling + (coupling * coupling_increment)
integration.channels.data.coupling = new_coupling
# Normalize the couplings to 1.0
integration.channels.modalities.get(
'coupling').modes[0].normalize_gains()
# If the coupling falls out of range, revert to the default of 1.0
if self.has_option('source.coupling.range'):
coupling_range = self.configuration.get_range(
'source.coupling.range')
snf.flag_out_of_range_coupling(
channel_indices=pixels.indices,
coupling_values=pixels.data.coupling,
min_coupling=coupling_range.min,
max_coupling=coupling_range.max,
flags=pixels.data.flag,
blind_flag=pixels.flagspace.convert_flag('BLIND').value)
[docs]
def process_final(self):
"""
Perform the final processing steps.
The additional steps performed for the AstroIntensityMap are
map leveling (if not extended or deep) and map re-weighting.
The map may also be resampled if re-griding is enabled.
Returns
-------
None
"""
super().process_final()
if not (self.configuration.get_bool('extended')
| self.configuration.get_bool('deep')):
if self.enable_level:
self.map.level(robust=True)
if self.enable_weighting:
self.map.reweight(robust=True)
if self.has_option('regrid'):
size_unit = self.info.instrument.get_size_unit()
resolution = self.configuration.get_float('regrid') * size_unit
self.map.resample(resolution)
[docs]
def get_table_entry(self, name):
"""
Return a parameter value for a given name.
Parameters
----------
name : str, optional
Returns
-------
value
"""
if name == 'system':
if not isinstance(self.grid, SphericalGrid):
return None
if self.grid.reference is None:
return None
return self.grid.reference.two_letter_code
if name.startswith('map.'):
return self.map.get_table_entry(name[4:])
return super().get_table_entry(name)
[docs]
def set_data_shape(self, shape):
"""
Set the shape of the map.
Parameters
----------
shape : tuple (int)
Returns
-------
None
"""
self.map.set_data_shape(shape)
[docs]
def set_base(self):
"""
Set the base to the map (copy of).
Returns
-------
None
"""
if self.base is None:
self.stand_alone()
self.base.paste(self.map, report=False)
[docs]
def reset_processing(self):
"""
Reset the source processing.
Returns
-------
None
"""
super().reset_processing()
self.map.reset_processing()
[docs]
def covariant_points(self):
"""
Return the number of points in the smoothing beam of the map.
Returns
-------
float
"""
return self.map.get_points_per_smoothing_beam()
[docs]
def get_map_2d(self):
"""
Return the 2D map.
Returns
-------
Map2D
"""
return self.map
[docs]
def get_source_name(self):
"""
Return the source name.
Returns
-------
str
"""
return self.map.fits_properties.object_name
[docs]
def get_unit(self):
"""
Return the map data unit.
Returns
-------
astropy.units.Quantity
"""
return self.map.unit
[docs]
def get_data(self):
"""
Return the map data.
Returns
-------
Observation2D
"""
return self.map
[docs]
def add_base(self):
"""
Add the base to the observation map.
Returns
-------
None
"""
if self.base is not None:
self.map.add(self.base)
[docs]
def smooth_to(self, fwhm):
"""
Smooth the map to a given FWHM.
Parameters
----------
fwhm : astropy.units.Quantity or Gaussian2D
Returns
-------
None
"""
self.map.smooth_to(fwhm)
[docs]
def filter_source(self, filter_fwhm, filter_blanking=None, use_fft=False):
"""
Apply source filtering.
Parameters
----------
filter_fwhm : astropy.units.Quantity or Coordinate
The filtering FWHM for the source.
filter_blanking : float, optional
Only apply filtering within the optional range -filter_blanking ->
+filter_blanking.
use_fft : bool, optional
If `True`, use FFTs to perform the filtering. Otherwise, do full
convolutions.
Returns
-------
None
"""
if filter_blanking is not None and np.isfinite(filter_blanking):
significance = self.map.get_significance()
valid = significance.valid
significance_values = significance.data
valid &= significance_values <= filter_blanking
valid &= significance_values >= -filter_blanking
else:
valid = None
if use_fft:
log.debug(f"Filtering source above {filter_fwhm} (FFT).")
self.map.fft_filter_above(filter_fwhm, valid=valid)
else:
log.debug(f"Filtering source above {filter_fwhm}.")
self.map.filter_above(filter_fwhm, valid=valid)
self.map.set_filter_blanking(filter_blanking)
[docs]
def set_filtering(self, fwhm):
"""
Set the map filtering FWHM.
Parameters
----------
fwhm : astropy.units.Quantity
Returns
-------
None
"""
self.map.update_filtering(fwhm)
[docs]
def reset_filtering(self):
"""
Reset the map filtering.
Returns
-------
None
"""
self.map.reset_filtering()
[docs]
def filter_beam_correct(self):
"""
Apply beam filter correction.
Returns
-------
None
"""
self.map.filter_beam_correct()
[docs]
def mem_correct(self, lg_multiplier):
"""
Apply maximum entropy correction to the map.
Parameters
----------
lg_multiplier : float
Returns
-------
None
"""
self.map.mem_correct_observation(None, lg_multiplier)
[docs]
def get_clean_local_copy(self, full=False):
"""
Get an unprocessed copy of the source model.
Parameters
----------
full : bool, optional
If True, copy additional parameters for stand-alone reductions
that would otherwise be referenced.
Returns
-------
SourceModel
"""
model = super().get_clean_local_copy(full=full)
if not full:
return model
model.base = model.base.copy()
model.base.clear()
return model