[docs]
class AstroData2D(AstroModel2D):
FLAG_MASK = ArrayFlags.flags.MASK
def __init__(self, info, reduction=None):
    """
    Initialize an astronomical 2D model of the source data.

    AstroData2D is an abstract class extending :class:`AstroModel2D`
    so that it operates on the
    :class:`sofia_redux.scan.source_models.map.observation_2d.Observation2D`
    class.

    Parameters
    ----------
    info : sofia_redux.scan.info.info.Info
        The Info object that should belong to this source model.
    reduction : sofia_redux.scan.reduction.reduction.Reduction, optional
        The reduction to which this source model applies.
    """
    super().__init__(info=info, reduction=reduction)
[docs]
def create_from(self, scans, assign_scans=True):
    """
    Initialize this model from a set of scans.

    The model scans are set to those provided, and each scan's source
    model is set to this model.  All integration gains are normalized
    against the first scan; if that scan is non-sidereal, the system
    is forced to an equatorial frame.  If the 'unit' configuration
    option is present, it is applied to the model data.

    Parameters
    ----------
    scans : list (sofia_redux.scan.scan.scan.Scan)
        The scans from which to create the model.
    assign_scans : bool, optional
        If `True`, assign the scans to this source model.  Otherwise,
        no hard link is created between the scans and the model.

    Returns
    -------
    None
    """
    super().create_from(scans, assign_scans=assign_scans)
    if self.has_option('unit'):
        unit_name = self.configuration.get_string('unit')
        self.get_data().set_unit(unit_name)
@property
def flagspace(self):
    """
    Return the flag space used by this source model.

    Returns
    -------
    ArrayFlags
    """
    return ArrayFlags
@property
def mask_flag(self):
    """
    Return the flag used to mark masked data in this source model.

    Returns
    -------
    flag : enum.Enum
    """
    # Convert the class-level FLAG_MASK value into the flagspace enum.
    flag = self.flagspace.convert_flag(self.FLAG_MASK)
    return flag
[docs]
def get_weights(self):
    """
    Return a weight map view of the source model data.

    Returns
    -------
    weight_map : sofia_redux.scan.source_models.maps.weight_map.WeightMap
    """
    data = self.get_data()
    return data.get_weights()
[docs]
def get_noise(self):
    """
    Return a noise map view of the source model data.

    Returns
    -------
    noise_map : sofia_redux.scan.source_models.maps.noise_map.NoiseMap
    """
    data = self.get_data()
    return data.get_noise()
[docs]
def get_significance(self):
    """
    Return a significance (signal-to-noise) map view of the data.

    Returns
    -------
    sofia_redux.scan.source_models.maps.significance_map.SignificanceMap
    """
    data = self.get_data()
    return data.get_significance()
[docs]
def get_exposures(self):
    """
    Return an exposure map view of the source model data.

    Returns
    -------
    sofia_redux.scan.source_models.maps.exposure_map.ExposureMap
    """
    data = self.get_data()
    return data.get_exposures()
[docs]
def end_accumulation(self):
    """
    Finish map accumulation (typically scaling by inverse weights).

    Returns
    -------
    None
    """
    data = self.get_data()
    data.end_accumulation()
[docs]
def get_executor(self):
    """
    Return the parallel executor for the source map.

    The executor is not currently implemented in any way.

    Returns
    -------
    executor : object
    """
    data = self.get_data()
    return data.executor
[docs]
def set_executor(self, executor):
    """
    Set the parallel executor for the source map.

    The executor is not currently implemented in any way.

    Parameters
    ----------
    executor : object

    Returns
    -------
    None
    """
    data = self.get_data()
    data.set_executor(executor)
[docs]
def get_parallel(self):
    """
    Return the number of parallel operations for the source model.

    Returns
    -------
    threads : int
    """
    data = self.get_data()
    return data.parallelism
[docs]
def set_parallel(self, threads):
    """
    Set the number of parallel operations for the source model.

    Parameters
    ----------
    threads : int

    Returns
    -------
    None
    """
    data = self.get_data()
    data.set_parallel(threads)
[docs]
def no_parallel(self):
    """
    Disable parallel processing for the source model.

    Returns
    -------
    None
    """
    # Zero threads marks parallel processing as disabled.
    data = self.get_data()
    data.set_parallel(0)
[docs]
def clear_content(self):
    """
    Clear the source model data.

    Returns
    -------
    None
    """
    data = self.get_data()
    data.clear()
[docs]
def is_empty(self):
    """
    Return whether the source map contains no points.

    Returns
    -------
    bool
    """
    n_points = self.get_data().count_points()
    return n_points == 0
[docs]
def count_points(self):
    """
    Return the number of points in the source map.

    Returns
    -------
    points : int
    """
    data = self.get_data()
    return data.count_points()
[docs]
def get_chi2(self, robust=False):
    """
    Return the Chi-squared statistic of the significance map.

    Parameters
    ----------
    robust : bool, optional
        If `True`, determine the variance with the robust (median)
        method; otherwise use a weighted mean.

    Returns
    -------
    chi2 : float
    """
    significance = self.get_significance()
    return significance.variance(robust=robust)
[docs]
def smooth(self):
    """
    Smooth the source model to its configured smoothing scale.

    Returns
    -------
    None
    """
    self.smooth_to(self.smoothing)
[docs]
def filter(self, allow_blanking=False):
    """
    Apply large-scale structure filtering to the source.

    If the 'source.filter' option is disabled or no valid source size
    is available, the filtering parameters are simply reset.

    Parameters
    ----------
    allow_blanking : bool, optional
        If `True`, allow the blanking value to be determined from the
        'source.filter.blank' configuration value.  Otherwise, the
        filter blanking value will be set to NaN.  The filter blanking
        level defines the range of values prior to filter smoothing
        that are permitted during the convolution.  Any values
        > filter_blanking, or values < -filter_blanking will be marked
        as invalid and not included.

    Returns
    -------
    None
    """
    # No filtering requested or no measurable source scale: clear any
    # previously applied filtering state and bail out.
    if (not self.configuration.get_bool('source.filter')
            or self.get_source_size() <= 0):
        self.reset_filtering()
        return

    if allow_blanking:
        filter_blanking = self.configuration.get_float(
            'source.filter.blank', default=np.nan)
    else:
        filter_blanking = np.nan

    filter_fwhm = self.get_filter_scale()
    use_fft = self.configuration.get_string('source.filter.type') == 'fft'
    self.filter_source(
        filter_fwhm=filter_fwhm,
        filter_blanking=filter_blanking,
        use_fft=use_fft)
[docs]
def get_filter_scale(self):
    """
    Return the FWHM scale above which the source should be filtered.

    The 'source.filter.fwhm' option may be a number (interpreted in
    instrument size units) or 'auto', in which case five times the
    source size is used.

    Returns
    -------
    filter_fwhm : astropy.units.Quantity
        The FWHM of the filter scale.
    """
    directive = self.configuration.get_string('source.filter.fwhm',
                                              default='auto')
    try:
        scale = float(directive)
    except ValueError:
        # Non-numeric directive (e.g. 'auto'): derive from source size.
        return 5 * self.get_source_size()
    return scale * self.info.instrument.get_size_unit()
[docs]
def process_scan(self, scan):
    """
    Process a single scan against the source model.

    Ends accumulation, restores the base map, then optionally levels,
    despikes, and filters the data.  The scan weight is derived from
    the Chi-squared statistic when scan weighting is enabled, and an
    individual scan map may be written out when 'scanmaps' is set.

    Parameters
    ----------
    scan : sofia_redux.scan.scan.scan.Scan

    Returns
    -------
    None
    """
    source = self.get_data()
    self.end_accumulation()
    self.add_base()

    if self.enable_level:
        self.level(robust=True)

    if self.configuration.get_bool('source.despike'):
        despike_level = self.configuration.get_float(
            'source.despike.level', default=10)
        source.despike(despike_level)

    self.filter(allow_blanking=False)

    scan.weight = 1.0
    if self.configuration.get_bool('weighting.scans'):
        method = self.configuration.get_string(
            'weighting.scans.method', default='rms')
        weight = 1.0 / self.get_chi2(robust=(method == 'robust'))
        # Guard against NaN/inf weights from degenerate chi2 values.
        scan.weight = weight if np.isfinite(weight) else 0.0

    if self.configuration.get_bool('scanmaps'):
        scan_map_file = os.path.join(
            self.reduction.work_path,
            f'scan-{scan.mjd}-{scan.get_id()}.fits')
        self.write_fits(scan_map_file)
[docs]
def level(self, robust=False):
    """
    Remove the mean or median level from the source model data.

    Parameters
    ----------
    robust : bool, optional
        If `True`, level using the robust (weighted median) method;
        otherwise use a weighted mean.

    Returns
    -------
    None
    """
    data = self.get_data()
    data.level(robust=robust)
[docs]
def process(self):
    """
    Process the source model.

    Applies, in order and as configured: end-of-accumulation scaling,
    map generation increment, resolution update, leveling/despiking/
    filtering briefs, scan weighting briefs, redundancy checking,
    smoothing, final filtering with beam correction, exposure and
    noise clipping, S/N clipping, MEM correction, optional
    intermediate FITS output, and a mask update coupled to blanking.

    Returns
    -------
    None
    """
    self.end_accumulation()
    self.next_generation()  # increment the map generation
    self.info.instrument.resolution = self.get_average_resolution()

    if self.enable_level:
        self.add_process_brief('{level} ')
    if self.configuration.get_bool('source.despike'):
        self.add_process_brief('{despike} ')
    if (self.configuration.get_bool('source.filter')
            and self.get_source_size() > 0):
        self.add_process_brief('{filter} ')

    if (self.enable_weighting
            and self.configuration.get_bool('weighting.scans')):
        # Report the effective chi2 (1/weight) for each scan.
        for scan in self.scans:
            if scan.weight != 0:
                message = f'{{{1.0 / scan.weight:.2f}x}}'
            else:
                message = "{inf}"
            self.add_process_brief(message)

    if self.has_option('source.redundancy'):
        self.add_process_brief('(check) ')
        redundancy = self.configuration.get_int('source.redundancy')
        min_integration_time = (
            self.info.instrument.integration_time * redundancy)
        exposures = self.get_exposures()
        min_integration_time = min_integration_time.to('second').value
        exposures.restrict_range(Range(min_val=min_integration_time))

    smooth = self.configuration.get_string(
        'smooth', default='None').lower().strip()
    if smooth != 'none' and not self.configuration.get_bool(
            'smooth.external'):
        self.add_process_brief('(smooth) ')
        self.smooth()

    # Apply the filtering to the final map, to reflect the correct
    # blanking level.
    if self.configuration.get_bool('source.filter'):
        self.add_process_brief('(filter) ')
        self.filter(allow_blanking=True)
        self.filter_beam_correct()

    # Noise and exposure clip after smoothing for evened-out coverage.
    if self.has_option('exposureclip'):
        self.add_process_brief('(exposureclip) ')
        clip_level = self.configuration.get_float('exposureclip')
        exposures = self.get_exposures()
        min_clip = clip_level * exposures.select(fraction=0.95)
        exposures.restrict_range(Range(min_val=min_clip))

    if self.has_option('noiseclip'):
        self.add_process_brief('(noiseclip) ')
        rms = self.get_noise()
        clip_level = self.configuration.get_float('noiseclip')
        max_clip = clip_level * rms.select(fraction=0.05)
        rms.restrict_range(Range(min_val=0, max_val=max_clip))

    if self.enable_bias and self.has_option('clip'):
        clip_level = self.configuration.get_float('clip')
        self.add_process_brief(f'(clip:{clip_level}) ')
        sign = self.configuration.get_sign('source.sign', default=0)
        # Discard |S/N| below the clip level; a known source sign
        # opens the rejection range on the opposite side.
        s2n_reject = Range(-clip_level, clip_level)
        if sign > 0:
            s2n_reject.min = -np.inf
        elif sign < 0:
            s2n_reject.max = np.inf
        self.get_significance().discard_range(s2n_reject)

    if self.configuration.get_bool('source.mem'):
        self.add_process_brief('(MEM) ')
        multiplier = self.configuration.get_float(
            'source.mem.lambda', default=0.1)
        self.mem_correct(lg_multiplier=multiplier)

    if self.configuration.get_bool('source.intermediates'):
        file_name = os.path.join(self.reduction.work_path,
                                 'intermediate.fits')
        self.write_fits(file_name)

    # Coupled with blanking
    if not self.configuration.get_bool('source.nosync'):
        if self.enable_bias and self.has_option('blank'):
            blanking_level = self.get_blanking_level()
            # Bug fix: the brief was missing its opening parenthesis
            # ('blank:...) '), inconsistent with all other '(...)'
            # briefs emitted by this method.
            self.add_process_brief(f'(blank:{blanking_level}) ')
            self.update_mask(blanking_level=blanking_level,
                             min_neighbors=2)
        else:
            self.update_mask(blanking_level=np.nan, min_neighbors=2)
[docs]
def process_final(self):
    """
    Perform the final processing steps on the source model.

    Currently this only clears the accumulated data history.

    Returns
    -------
    None
    """
    data = self.get_data()
    data.clear_history()
[docs]
def write_fits(self, filename):
    """
    Write the source model to a FITS file.

    The HDU list is always created, headers are edited, scan HDUs are
    appended, and the result is stored as `self.hdul`.  The file is
    only written to disk when the 'write.source' configuration option
    is enabled.

    Parameters
    ----------
    filename : str
        The name of the output FITS file.

    Returns
    -------
    None
    """
    hdu_list = self.get_data().create_fits()
    for hdu in hdu_list:
        self.edit_header(hdu.header)
        hdu.header['FILENAME'] = filename, 'Name at creation'
    self.add_scan_hdus_to(hdu_list)
    self.hdul = hdu_list
    if not self.configuration.get_bool('write.source'):
        return
    # Bug fix: log the actual output path instead of an f-string with
    # no placeholder that printed a literal '(unknown)'.
    log.info(f'Writing to {filename}')
    hdu_list.writeto(filename, overwrite=True)
    hdu_list.close()
[docs]
@abstractmethod
def get_data(self):  # pragma: no cover
    """
    Return the underlying data for the source model.

    Returns
    -------
    data : sofia_redux.scan.source_models.maps.observation_2d.Observation2D
    """
    pass
[docs]
@abstractmethod
def add_base(self):  # pragma: no cover
    """
    Add a base map to the source model data.

    Returns
    -------
    None
    """
    pass
[docs]
@abstractmethod
def smooth_to(self, fwhm):  # pragma: no cover
    """
    Smooth the map with a Gaussian kernel of the given FWHM.

    Parameters
    ----------
    fwhm : astropy.units.Quantity

    Returns
    -------
    None
    """
    pass
[docs]
@abstractmethod
def filter_source(self, filter_fwhm, filter_blanking=None, use_fft=False
                  ):  # pragma: no cover
    """
    Filter (smooth away) source structure above a given FWHM scale.

    Parameters
    ----------
    filter_fwhm : astropy.units.Quantity
        The filter FWHM scale above which to filter.
    filter_blanking : float, optional
    use_fft : bool, optional
        If `True`, perform the filtering with an FFT.

    Returns
    -------
    None
    """
    pass
[docs]
@abstractmethod
def set_filtering(self, fwhm):  # pragma: no cover
    """
    Set the FWHM of the source filtering.

    Parameters
    ----------
    fwhm : astropy.units.Quantity

    Returns
    -------
    None
    """
    pass
[docs]
@abstractmethod
def reset_filtering(self):  # pragma: no cover
    """
    Reset the filtering parameters of the source.

    Returns
    -------
    None
    """
    pass
[docs]
@abstractmethod
def filter_beam_correct(self):  # pragma: no cover
    """
    Correct the map for the effect of the filtering beam.

    Returns
    -------
    None
    """
    pass
[docs]
@abstractmethod
def mem_correct(self, lg_multiplier):  # pragma: no cover
    """
    Apply a maximum-entropy-method correction to the map.

    Parameters
    ----------
    lg_multiplier : float
        The Lagrange multiplier (lambda).

    Returns
    -------
    None
    """
    pass
[docs]
@abstractmethod
def update_mask(self, blanking_level=np.nan, min_neighbors=None
                ):  # pragma: no cover
    """
    Update the map mask from significance levels and valid neighbors.

    When a blanking level is supplied, significance values at or above
    that level are masked.

    Parameters
    ----------
    blanking_level : float, optional
        The significance level used to mark the map.  If not supplied,
        significance is irrelevant.  See above for more details.
    min_neighbors : int, optional
        The minimum number of neighbors, including the pixel itself.
        A value of 2 therefore excludes isolated pixels, since it
        requires one valid pixel plus one valid neighbor.

    Returns
    -------
    None
    """
    pass