[docs]
class Map2D(Overlay):
UNDERLYING_BEAM_FITS_ID = 'I'
SMOOTHING_BEAM_FITS_ID = 'S'
CORRECTED_BEAM_FITS_ID = 'C'
FILTER_BEAM_FITS_ID = 'X'
def __init__(self, data=None, blanking_value=np.nan, dtype=float,
shape=None, unit=None):
"""
Initialize a Map2D instance.
The 2D map is an extension of the :class:`Image` and :class:`Overlay`
classes. In addition to the standard image handling methods, a grid
is also included to allow for projections and deprojections of pixels
to/from real coordinates.
An underlying beam representing the observation PSF may also be
modeled as well as a beam to represent any smoothing operations that
may be applied.
Parameters
----------
data : numpy.ndarray, optional
Data to initialize the flagged array with. If supplied, sets the
shape of the array. Note that the data type will be set to that
defined by the `dtype` parameter.
blanking_value : int or float, optional
The blanking value defines invalid values in the data array. This
is the equivalent of defining a NaN value.
dtype : type, optional
The data type of the data array.
shape : tuple (int), optional
The shape of the data array. This will only be relevant if
`data` is not defined.
unit : str or units.Unit or units.Quantity, optional
The data unit.
"""
self.fits_properties = FitsProperties()
self.grid = FlatGrid2D()
self.display_grid_unit = None
self.underlying_beam = None # Gaussian2D
self.smoothing_beam = None # Gaussian2D
self.filter_fwhm = np.nan * units.Unit('deg')
self.correcting_fwhm = np.nan * units.Unit('deg')
self.filter_blanking = np.inf
self.reuse_index = Coordinate2D()
self.skip_model_edit_header = False
super().__init__(blanking_value=blanking_value, dtype=dtype,
data=data, shape=shape, unit=unit)
self.set_image(self.basis)
[docs]
def copy(self, with_contents=True):
"""
Return a copy of the map.
Returns
-------
Map2D
"""
return super().copy(with_contents=with_contents)
def __eq__(self, other):
"""
Check if this Map2D instance is functionally equivalent to another.
Parameters
----------
other : Map2D
Returns
-------
equal : bool
"""
if self is other:
return True
if other.__class__ != self.__class__:
return False
if self.fits_properties != other.fits_properties:
return False
if isinstance(self.correcting_fwhm, (Coordinate2D, Coordinate2D1)):
if self.correcting_fwhm != other.correcting_fwhm:
return False
elif not np.isclose(self.correcting_fwhm, other.correcting_fwhm,
equal_nan=True):
return False
if isinstance(self.filter_fwhm, (Coordinate2D, Coordinate2D1)):
if self.filter_fwhm != other.filter_fwhm:
return False
elif not np.isclose(self.filter_fwhm, other.filter_fwhm,
equal_nan=True):
return False
if self.grid != other.grid:
return False
if self.display_grid_unit != other.display_grid_unit:
return False
if self.underlying_beam != other.underlying_beam:
return False
if self.smoothing_beam != other.smoothing_beam:
return False
return super().__eq__(other)
[docs]
@classmethod
def default_beam(cls):
"""
Return the default beam for this map.
Returns
-------
Gaussian2D
"""
return Gaussian2D
[docs]
@classmethod
def numpy_to_fits(cls, coordinates):
"""
Convert numpy based (x, y) coordinates/indices to FITS coordinates.
Parameters
----------
coordinates : numpy.ndarray
Returns
-------
Coordinate2D
"""
coordinates = super().numpy_to_fits(coordinates)
return Coordinate2D(coordinates)
@property
def pixel_area(self):
"""
Return the pixel area of the grid.
Returns
-------
area : units.Quantity or float
"""
return self.grid.get_pixel_area()
@property
def reference(self):
"""
Return the reference position of the grid.
Returns
-------
Coordinate2D
"""
return self.grid.reference
@reference.setter
def reference(self, value):
"""
Set the reference position of the grid.
Parameters
----------
value : Coordinate2D
Returns
-------
None
"""
self.grid.set_reference(value)
@property
def reference_index(self):
"""
Return the reference index of the grid.
Returns
-------
index : Coordinate2D
"""
return self.grid.reference_index
@reference_index.setter
def reference_index(self, value):
"""
Set the reference index of the grid.
Parameters
----------
value : Coordinate2D
Returns
-------
None
"""
self.grid.set_reference_index(value)
@property
def projection(self):
"""
Return the grid projection.
The grid projection transforms coordinates to map offsets.
Returns
-------
Projection2D
"""
return self.grid.projection
@projection.setter
def projection(self, value):
"""
Set the grid projection.
Parameters
----------
value : Projection2D
Returns
-------
None
"""
self.grid.set_projection(value)
[docs]
def reset_processing(self):
"""
Reset the processing status.
Sets the smoothing beam to be consistent with the current grid, and
removes all filtering and correcting FWHM parameters.
Returns
-------
None
"""
if self.fits_properties is not None:
self.fits_properties.reset_processing()
self.reset_smoothing()
self.reset_filtering()
[docs]
def reset_smoothing(self):
"""
Reset the map smoothing.
Returns
-------
None
"""
self.set_pixel_smoothing()
[docs]
def reset_filtering(self):
"""
Reset the map filtering.
Returns
-------
None
"""
deg = units.Unit('deg')
self.set_filtering(np.nan * deg)
self.set_correcting_fwhm(np.nan * deg)
self.set_filter_blanking(np.nan)
[docs]
def renew(self):
"""
Renew the map by clearing all processing and data.
Returns
-------
None
"""
self.reset_processing()
self.clear()
[docs]
def add_proprietary_units(self):
"""
Add proprietary units to the local units.
Returns
-------
None
"""
self.add_local_unit(np.nan * units.Unit('beam'),
alternate_names=['beam', 'BEAM', 'Beam', 'bm',
'BM', 'Bm'])
self.add_local_unit(self.grid.get_pixel_area() * units.Unit('pixel'),
alternate_names=['pixel', 'PIXEL', 'Pixel',
'PIXELS', 'Pixels', 'pxl', 'PXL',
'Pxl'])
[docs]
def no_data(self):
"""
Discard all data.
Returns
-------
None
"""
self.discard()
[docs]
def is_filtered(self):
"""
Return whether the map has been filtered.
Returns
-------
filtered : bool
"""
return not np.isnan(self.filter_fwhm)
[docs]
def is_corrected(self):
"""
Return whether the map has been corrected.
Returns
-------
bool
"""
return not np.isnan(self.correcting_fwhm)
[docs]
def is_filter_blanked(self):
"""
Return whether the map is filter blanked.
Returns
-------
bool
"""
return np.isfinite(self.filter_blanking)
[docs]
def set_correcting_fwhm(self, fwhm):
"""
Set the correcting FWHM.
Parameters
----------
fwhm : units.Quantity
Returns
-------
None
"""
self.correcting_fwhm = fwhm
[docs]
def set_default_unit(self):
"""
Set the default unit for the map data.
Returns
-------
None
"""
self.add_proprietary_units()
super().set_default_unit()
[docs]
def set_unit(self, unit):
"""
Set the map data unit.
Parameters
----------
unit : astropy.units.Quantity or str or astropy.units.Unit
The unit to set as the map data unit. Should be a quantity
(value and unit type). If a string or Unit is supplied, the
map data unit will be set to the value located in the local_units
dictionary. If no such value exists, a KeyError will be raised.
Returns
-------
None
"""
super().set_unit(unit)
image = self.get_image()
if image is not None:
image.set_unit(unit)
[docs]
def set_grid(self, grid):
"""
Set the map grid.
Parameters
----------
grid : SphericalGrid
Returns
-------
None
"""
if self.smoothing_beam is not None and self.grid is not None:
# Undo prior pixel smoothing, if any
old_pixel_beam = self.get_pixel_smoothing().copy()
self.smoothing_beam = utils.deconvolve_beam(
self.smoothing_beam, old_pixel_beam)
self.grid = grid
# Apply new pixel smoothing
pixel_beam = self.get_pixel_smoothing().copy()
if self.smoothing_beam is None or self.smoothing_beam.area == 0:
self.smoothing_beam = pixel_beam
else:
self.smoothing_beam = utils.encompass_beam(
self.smoothing_beam, pixel_beam)
[docs]
def set_resolution(self, resolution, redo=False):
"""
Set the resolution of the grid.
Sets a new resolution for the map grid. If the smoothing beam has
already been determined and `redo` is `True`, it is first deconvolved
by the original grid resolution before being encompassed by a smoothing
beam determined from the new grid resolution.
Parameters
----------
resolution : float or numpy.ndarray or units.Quantity or Coordinate2D
An array of shape (2,) giving the (x, y) grid resolution.
redo : bool, optional
If `True` deconvolve with the smoothing beam, and convolve
after the resolution is set.
Returns
-------
None
"""
if self.smoothing_beam is not None and self.grid is not None and redo:
self.smoothing_beam = utils.deconvolve_beam(
self.smoothing_beam, self.get_pixel_smoothing())
self.grid.set_resolution(resolution)
if self.smoothing_beam is None:
self.smoothing_beam = self.get_pixel_smoothing()
else:
self.smoothing_beam.encompass(self.get_pixel_smoothing())
[docs]
def set_underlying_beam(self, psf):
"""
Set the underlying beam.
Parameters
----------
psf : Gaussian2D or units.Quantity
A Gaussian PSF model, or a FWHM from which to create the model.
Returns
-------
None
"""
if isinstance(psf, Gaussian2D):
self.underlying_beam = psf.copy()
else:
self.underlying_beam = Gaussian2D(x_fwhm=psf, y_fwhm=psf)
[docs]
def set_pixel_smoothing(self):
"""
Set the smoothing beam to the pixel smoothing beam.
Returns
-------
None
"""
self.smoothing_beam = self.get_pixel_smoothing().copy()
[docs]
def set_smoothing(self, psf):
"""
Set the smoothing beam.
Parameters
----------
psf : Gaussian2D or units.Quantity
A Gaussian PSF model, or a FWHM from which to create the model.
Returns
-------
None
"""
if isinstance(psf, Gaussian2D):
self.smoothing_beam = psf.copy()
else:
self.smoothing_beam = Gaussian2D(x_fwhm=psf, y_fwhm=psf)
[docs]
def set_filtering(self, fwhm):
"""
Set the filtering FWHM.
Parameters
----------
fwhm : .units.Quantity
Returns
-------
None
"""
self.filter_fwhm = fwhm
[docs]
def set_filter_blanking(self, value):
"""
Set the filter blanking.
Parameters
----------
value : float or None
The value to set for filter blanking. If `None` is supplied, the
filter blanking value is set to NaN (disabled).
Returns
-------
None
"""
if value is None:
self.filter_blanking = np.nan
else:
self.filter_blanking = float(value)
[docs]
def set_image(self, image):
"""
Set the basis image.
Parameters
----------
image : Image2D or numpy.ndarray
Returns
-------
None
"""
if isinstance(image, np.ndarray):
image = Image2D(data=image,
blanking_value=self.blanking_value,
dtype=self.dtype)
if image is not self.basis:
self.set_basis(image)
self.claim_image(image)
[docs]
def set_display_grid_unit(self, unit):
"""
Set the grid display unit.
The display grid unit defines the spatial units of the map.
Parameters
----------
unit : str or units.Unit or units.Quantity or None
Returns
-------
None
"""
if isinstance(unit, str):
self.display_grid_unit = units.Unit(unit) * 1.0
elif isinstance(unit, units.Unit):
self.display_grid_unit = unit * 1.0
elif isinstance(unit, units.Quantity):
self.display_grid_unit = unit
elif unit is None:
pass
else:
raise ValueError(f"Unit must be {str}, {units.Unit}, "
f"{units.Quantity}, or {None}.")
[docs]
def get_area(self):
"""
Return the total area of the map.
The total area is the number of pixels multiplied by the pixel area.
Returns
-------
area : units.Quantity or float
"""
return self.count_valid_points() * self.grid.get_pixel_area()
[docs]
def get_display_grid_unit(self):
"""
Return the display grid unit.
Returns
-------
units.Quantity
"""
if self.display_grid_unit is not None:
return self.display_grid_unit
return self.get_default_grid_unit()
[docs]
def get_default_grid_unit(self):
"""
Return the default grid unit.
Returns
-------
units.Quantity
"""
if self.grid is None:
return self.get_unit('pixel')
return self.grid.get_default_unit()
[docs]
def get_image(self, dtype=None, blanking_value=None):
"""
Return the basis image.
Parameters
----------
dtype : type, optional
The image data type.
blanking_value : int or float, optional
The new image blanking value.
Returns
-------
Image2D
"""
if dtype is not None or blanking_value is not None:
log.warning("Cannot change base image type or blanking value from "
"Map2D.")
return self.basis
[docs]
def get_image_beam(self):
"""
Return the image beam.
The image beam is the underlying beam convolved with the smoothing
beam. If one is not set, the other is returned. If neither is set,
`None` is returned.
Returns
-------
Gaussian2D or None
"""
if self.underlying_beam is None and self.smoothing_beam is None:
return None
elif self.underlying_beam is None and self.smoothing_beam is not None:
return self.smoothing_beam.copy()
elif self.smoothing_beam is None and self.underlying_beam is not None:
return self.underlying_beam.copy()
beam = self.underlying_beam.copy()
beam.convolve_with(self.smoothing_beam)
return beam
[docs]
def get_image_beam_area(self):
"""
Get the beam area of the image beam.
Returns
-------
float or units.Quantity
"""
if self.underlying_beam is None:
underlying_area = 0.0
else:
underlying_area = self.underlying_beam.area
if self.smoothing_beam is None:
smoothing_area = 0.0
else:
smoothing_area = self.smoothing_beam.area
return underlying_area + smoothing_area
[docs]
def get_filter_area(self):
"""
Get the filtering beam area.
Returns
-------
area : units.Quantity
"""
if self.filter_fwhm is None:
return 0.0 * units.Unit('degree') ** 2
return Gaussian2D.AREA_FACTOR * (self.filter_fwhm ** 2)
[docs]
def get_filter_correction_factor(self, underlying_fwhm=None):
"""
Return the filter correction factor.
The filtering correction factor is given as::
factor = 1 / (1 - ((a1 - a2) / (a1 + a3)))
where a1 is the underlying beam area, a2 is the smoothing beam area,
and a3 is the filtering beam area.
Parameters
----------
underlying_fwhm : astropy.units.Quantity or float, optional
The underlying FWHM. If not supplied, defaults to the map
underlying beam FWHM.
Returns
-------
correction_factor : float
"""
if np.isnan(self.filter_fwhm):
return 1.0
if underlying_fwhm is None:
underlying_beam = self.underlying_beam
else:
underlying_beam = Gaussian2D(x_fwhm=underlying_fwhm,
y_fwhm=underlying_fwhm)
# Get the appropriate unit for unavailable beam FWHMs
for beam in [underlying_beam, self.smoothing_beam]:
if beam is not None and isinstance(beam.x_fwhm, units.Quantity):
unit = beam.x_fwhm.unit
break
else:
if isinstance(self.filter_fwhm, units.Quantity):
unit = self.filter_fwhm.unit
else:
unit = units.dimensionless_unscaled
u2 = unit ** 2
if underlying_beam is None:
underlying_beam_area = 0.0 * u2
else:
underlying_beam_area = underlying_beam.area
if self.smoothing_beam is None:
smoothing_beam_area = 0.0 * u2
else:
smoothing_beam_area = self.smoothing_beam.area
denominator = underlying_beam_area + self.get_filter_area()
if denominator == 0:
return 1.0
factor = ((underlying_beam_area - smoothing_beam_area) / denominator
).decompose().value
return 1.0 / (1.0 - factor)
[docs]
def get_pixel_smoothing(self):
"""
Return a Gaussian model representing pixel smoothing.
Returns
-------
Gaussian2D
"""
factor = Gaussian2D.FWHM_TO_SIZE
return Gaussian2D(x_fwhm=self.grid.resolution.x / factor,
y_fwhm=self.grid.resolution.y / factor,
theta=0.0 * units.Unit('deg'))
[docs]
def get_resolution(self):
"""
Return the grid resolution in (x, y).
Returns
-------
resolution : Coordinate2D
"""
return self.grid.resolution
[docs]
def get_anti_aliasing_beam_for(self, map2d):
"""
Return the anti-aliasing beam for a given Map2D.
The anti-aliasing beam is the beam representing the pixel smoothing for
this map deconvolved by the input map smoothing beam. If the smoothing
beam of the input map encompasses the pixel smoothing of this map,
`None` will be returned.
Parameters
----------
map2d : Map2D
Returns
-------
anti_aliasing_beam : Gaussian2D or None
"""
map_smoothing = map2d.smoothing_beam
pixelization = self.get_pixel_smoothing()
if map_smoothing is None:
return pixelization
elif map_smoothing.is_encompassing(pixelization):
return None
antialias = pixelization.copy()
antialias.deconvolve_with(map_smoothing)
return antialias
[docs]
def get_anti_aliasing_beam_image_for(self, map2d):
"""
Return an antialiasing beam image.
The anti-aliasing beam is the beam representing the pixel smoothing for
this map deconvolved by the smoothing beam of the input map. A
representation of this beam is returned by projecting it onto the grid
of this map. If there is no valid anti-aliasing beam, `None` is
returned.
Parameters
----------
map2d : Map2D
Returns
-------
anti_aliasing_beam_image : numpy.ndarray (float) or None
"""
antialias = self.get_anti_aliasing_beam_for(map2d)
if antialias is None:
return None
return antialias.get_beam_map(self.grid)
[docs]
def get_table_entry(self, name):
"""
Return a parameter value for a given name.
Parameters
----------
name : str, optional
Returns
-------
value
"""
if name == 'beams':
return self.count_beams()
elif name == 'min':
return self.min() / self.unit.value
elif name == 'max':
return self.max() / self.unit.value
elif name == 'unit':
return str(self.unit.unit)
elif name == 'mean':
return self.mean()[0] / self.unit.value
elif name == 'median':
return self.median()[0] / self.unit.value
elif name == 'rms':
return self.rms(robust=True) / self.unit.value
else:
return self.fits_properties.get_table_entry(name)
[docs]
def get_info(self):
"""
Return strings describing the map.
Returns
-------
list of str
"""
size_unit = self.get_display_grid_unit()
pxy = self.grid.get_pixel_size().coordinates
if size_unit is not None:
if isinstance(size_unit, units.UnitBase):
size_value = 1.0
else:
size_value, size_unit = size_unit.value, size_unit.unit
px, py = (pxy.to(size_unit) / size_value).value
unit_str = f' {size_unit}'
elif (isinstance(pxy, units.Quantity)
and pxy.unit
!= units.dimensionless_unscaled): # pragma: no cover
unit_str = f' {pxy.unit}'
px, py = pxy.value
else: # pragma: no cover
px, py = pxy
unit_str = ''
u_beam = self.underlying_beam
u_fwhm = 0.0 if u_beam is None else u_beam.fwhm
i_beam = self.get_image_beam()
i_fwhm = 0.0 if i_beam is None else i_beam.fwhm
info = ["Map information:",
f"Image Size: {self.get_size_string()} pixels "
f"({px} x {py}{unit_str}).",
self.grid.to_string(),
f'Instrument PSF: {u_fwhm:.5f} '
f'(includes pixelization)',
f'Image resolution: {i_fwhm:.5f} '
f'(includes smoothing)']
return info
[docs]
def get_points_per_smoothing_beam(self):
"""
Get the number of pixels per smoothing beam.
Returns
-------
float
"""
if self.smoothing_beam is None:
return 1.0
points = self.smoothing_beam.area / self.grid.get_pixel_area()
points = points.decompose().value
return np.clip(points, 1.0, None)
[docs]
def copy_processing_from(self, other):
"""
Copy the processing from another map.
Processing includes the underlying and smoothing beams, the filtering
FWHM, filtering blanking value and correcting FWHM.
Parameters
----------
other : Map2D
Returns
-------
None
"""
self.underlying_beam = deepcopy(other.underlying_beam)
self.smoothing_beam = deepcopy(other.smoothing_beam)
self.filter_fwhm = other.filter_fwhm
self.filter_blanking = other.filter_blanking
self.correcting_fwhm = other.correcting_fwhm
[docs]
def copy_properties_from(self, other):
"""
Copy the properties from another map.
The properties copied include anything under
:func:`Map2D.copy_processing_from` along with the map grid and FITS
properties.
Parameters
----------
other : Map2D
Returns
-------
None
"""
if other.fits_properties is None:
self.fits_properties = None
else:
self.fits_properties = other.fits_properties.copy()
self.copy_processing_from(other)
self.filter_fwhm = other.filter_fwhm
self.correcting_fwhm = other.correcting_fwhm
self.filter_blanking = other.filter_blanking
if other.grid is not None:
self.grid = other.grid.copy()
self.display_grid_unit = deepcopy(other.display_grid_unit)
self.underlying_beam = deepcopy(other.underlying_beam)
self.smoothing_beam = deepcopy(other.smoothing_beam)
[docs]
def merge_properties_from(self, other):
"""
Merge properties from another map.
Merging the properties results in the encompassed smoothing beam of
this map by the other, and the minimum filtering FWHM of both maps.
Parameters
----------
other : Map2D
Returns
-------
None
"""
if other.smoothing_beam is not None:
if self.smoothing_beam is not None:
self.smoothing_beam = utils.encompass_beam(
self.smoothing_beam, other.smoothing_beam)
else:
self.smoothing_beam = deepcopy(other.smoothing_beam)
unit = self.filter_fwhm.unit
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
self.filter_fwhm = np.nanmin([self.filter_fwhm.to(unit).value,
other.filter_fwhm.to(unit).value]
) * unit
[docs]
def add_smoothing(self, psf):
"""
Convolve the smoothing beam with a PSF (Point Spread Function).
Parameters
----------
psf : Gaussian2D or astropy.units.Quantity
A Gaussian model or an FWHM from which to create the model.
Returns
-------
None
"""
if isinstance(psf, units.Quantity):
psf = Gaussian2D(x_fwhm=psf, y_fwhm=psf)
if self.smoothing_beam is None:
self.smoothing_beam = psf.copy()
else:
self.smoothing_beam.convolve_with(psf)
[docs]
def filter_beam_correct(self):
"""
Scale the map data by the filter correction factor.
The filter correction factor is determined by applying
:func:`Map2D.get_filter_correction_factor` to the underlying beam of
the map. All data are scaled by this factor.
Returns
-------
None
"""
beam = self.underlying_beam
if isinstance(beam, Gaussian2D):
fwhm = beam.get_circular_equivalent_fwhm()
else:
fwhm = 0.0 * units.Unit('degree')
self.filter_correct(fwhm)
[docs]
def filter_correct(self, underlying_fwhm, reference=None, valid=None):
"""
Apply a filtering correction factor to the map data.
The filtering correction factor is derived from the supplied
`underlying_fwhm` using :func:`Map2D.get_filter_correction_factor`, and
is only applied to valid data elements. The validity of the elements
may be supplied or determined from the map itself or a supplied
`reference`.
If a correction factor was previously applied, it will be removed
before applying the new factor.
Parameters
----------
underlying_fwhm : units.Quantity or Coordinate
The underlying FWHM of the beam used to derive the filtering
correction factor to apply to the data.
reference : FlaggedArray or numpy.ndarray, optional
If `valid` is not supplied, calculate the validity mask from this
reference array and whether those values are within the blanking
value range, and are themselves valid (if the reference is a
FlaggedArray). Only valid data will be scaled by the filtering
correction factor.
valid : numpy.ndarray (bool), optional
A boolean mask with the same shape as the map data. Elements
flagged as `False` will not be scaled by the filtering correction
factor. If not supplied, the validity mask will be derived from
this map, or a supplied `reference` array.
Returns
-------
None
"""
if valid is None:
if reference is None:
reference = self
if isinstance(reference, FlaggedArray):
ref_data = reference.data
ref_valid = reference.valid
else:
ref_data = reference
ref_valid = True
blanking_value = self.filter_blanking
valid = ref_data <= blanking_value
valid &= ref_data >= -blanking_value
valid &= ref_valid
if self.is_corrected():
if underlying_fwhm == self.correcting_fwhm:
return
self.undo_filter_correct(valid=valid)
self.data[valid] *= self.get_filter_correction_factor(underlying_fwhm)
self.set_correcting_fwhm(underlying_fwhm)
[docs]
def undo_filter_correct(self, reference=None, valid=None):
"""
Undo the last filter correction.
Performs the reverse of :func:`Map2D.filter_correct` with the last
used underlying FWHM.
Parameters
----------
reference : FlaggedArray or numpy.ndarray (float), optional
The data set to determine valid data within the blanking range.
Defaults to self.data.
valid : numpy.ndarray (bool), optional
`True` indicates a data element that may have the filter correction
factor un-applied.
Returns
-------
None
"""
if not self.is_corrected():
return
if valid is None:
if reference is None:
reference = self
if isinstance(reference, FlaggedArray):
ref_data = reference.data
ref_valid = reference.valid
else:
ref_data = reference
ref_valid = True
blanking_value = self.filter_blanking
valid = ref_data <= blanking_value
valid &= ref_data >= -blanking_value
valid &= ref_valid
last_correction_factor = self.get_filter_correction_factor(
self.correcting_fwhm)
self.data[valid] /= last_correction_factor
self.set_correcting_fwhm(np.nan * units.Unit('degree'))
[docs]
def update_filtering(self, fwhm):
"""
Update the filtering.
The filtering is only updated when it is NaN or the provided filtering
FWHM is less than that which is currently set.
Parameters
----------
fwhm : units.Quantity
Returns
-------
None
"""
if np.isnan(self.filter_fwhm):
self.filter_fwhm = fwhm
else:
self.filter_fwhm = min(self.filter_fwhm, fwhm)
[docs]
def parse_coordinate_info(self, header, alt=''):
"""
Parse and apply the WCS information from a FITS header.
This process sets the grid for the map based on the contents of a given
FITS header.
Parameters
----------
header : astropy.io.fits.header.Header
alt : str, optional
The alternate WCS transform to use. This replaces the "a" part of
the CTYPEia cards.
Returns
-------
None
"""
self.set_grid(Grid2D.from_header(header, alt=alt))
[docs]
def parse_corrected_beam(self, header):
"""
Parse the corrected beam from a FITS header.
The correcting beam FWHM is taken from the CBMAJ and CBMIN keywords,
or the CORRECTN keyword in the FITS header. If neither is found, the
default correcting FWHM defaults to NaN.
Parameters
----------
header : astropy.io.fits.header.Header
Returns
-------
None
"""
major_fwhm_key = f'{self.CORRECTED_BEAM_FITS_ID}BMAJ'
if major_fwhm_key in header:
beam = Gaussian2D()
beam.parse_header(
header=header,
size_unit=self.get_default_grid_unit(),
fits_id=self.CORRECTED_BEAM_FITS_ID)
self.correcting_fwhm = beam.fwhm
else:
unit = self.get_display_grid_unit()
if unit is None: # pragma: no cover
unit = self.get_default_grid_unit()
if unit is None:
unit = 'degree'
else:
if isinstance(unit, units.Quantity):
unit = unit.unit
self.correcting_fwhm = utils.get_header_quantity(
header, 'CORRECTN',
default=np.nan,
default_unit=unit).to(unit)
[docs]
def parse_smoothing_beam(self, header):
"""
Parse the smoothing beam from a FITS header.
The smoothing beam is taken from the SBMAJ and SBMIN keywords, or the
SMOOTH keyword in the FITS header. The no keywords are found, the
smoothing beam FWHM defaults the pixel smoothing.
Parameters
----------
header : astropy.io.fits.header.Header
Returns
-------
None
"""
major_fwhm_key = f'{self.SMOOTHING_BEAM_FITS_ID}BMAJ'
if major_fwhm_key in header:
self.smoothing_beam = Gaussian2D()
self.smoothing_beam.parse_header(
header=header,
size_unit=self.get_default_grid_unit(),
fits_id=self.SMOOTHING_BEAM_FITS_ID)
else:
unit = self.get_display_grid_unit()
if unit is None: # pragma: no cover
unit = self.get_default_grid_unit()
if unit is None:
unit = 'degree'
elif isinstance(unit, units.Quantity):
unit = unit.unit
fwhm = utils.get_header_quantity(
header, 'SMOOTH',
default=0.0,
default_unit=unit).to(unit)
self.smoothing_beam = Gaussian2D(x_fwhm=fwhm, y_fwhm=fwhm)
pixel_smoothing = np.sqrt(self.grid.get_pixel_area()
/ Gaussian2D.AREA_FACTOR)
if not isinstance(pixel_smoothing, units.Quantity):
pixel_smoothing = pixel_smoothing * self.smoothing_beam.x_fwhm.unit
self.smoothing_beam.encompass(pixel_smoothing)
[docs]
def parse_filter_beam(self, header):
"""
Parse the filtering beam from a FITS header.
The correcting beam FWHM is taken from the XBMAJ and XBMIN keywords,
or the EXTFLTR keyword in the FITS header. If neither is found, the
default filtering FWHM defaults to NaN.
Parameters
----------
header : astropy.io.fits.header.Header
Returns
-------
None
"""
major_fwhm_key = f'{self.FILTER_BEAM_FITS_ID}BMAJ'
if major_fwhm_key in header:
beam = Gaussian2D()
beam.parse_header(
header,
size_unit=self.get_default_grid_unit(),
fits_id=self.FILTER_BEAM_FITS_ID)
self.filter_fwhm = beam.fwhm
else:
unit = self.get_display_grid_unit()
if unit is None: # pragma: no cover
unit = self.get_default_grid_unit()
if unit is None:
unit = 'degree'
elif isinstance(unit, units.Quantity):
unit = unit.unit
self.filter_fwhm = utils.get_header_quantity(
header, 'EXTFLTR', default=np.nan,
default_unit=unit).to(unit)
[docs]
def parse_underlying_beam(self, header):
"""
Parse the underlying beam from a FITS header.
Uses IBMAJ/IBMIN if available, will then look to BMAJ/MIN, will then
look for BEAM, and finally the old RESOLUTN.
Parameters
----------
header : astropy.io.fits.header.Header
Returns
-------
None
"""
major_fwhm_key = f'{self.UNDERLYING_BEAM_FITS_ID}BMAJ'
self.underlying_beam = Gaussian2D()
display_unit = self.get_display_grid_unit()
fits_unit = self.get_default_grid_unit()
if display_unit is None: # pragma: no cover
display_unit = 'degree'
elif isinstance(display_unit, units.Quantity):
display_unit = display_unit.unit
if fits_unit is None: # pragma: no cover
fits_unit = 'degree'
elif isinstance(fits_unit, units.Quantity): # pragma: no cover
fits_unit = fits_unit.unit
if major_fwhm_key in header:
self.underlying_beam.parse_header(
header, size_unit=fits_unit,
fits_id=self.UNDERLYING_BEAM_FITS_ID)
elif 'BEAM' in header:
self.underlying_beam.fwhm = utils.get_header_quantity(
header, 'BEAM', default=np.nan,
default_unit=display_unit).to(display_unit)
elif 'BMAJ' in header:
self.underlying_beam.parse_header(
header, size_unit=fits_unit, fits_id='')
self.underlying_beam.deconvolve_with(self.smoothing_beam)
elif 'RESOLUTN' in header:
resolution = utils.get_header_quantity(
header, 'RESOLUTN', default=np.nan,
default_unit=display_unit).to(display_unit)
if self.smoothing_beam is None:
self.underlying_beam.fwhm = resolution
elif resolution > self.smoothing_beam.major_fwhm:
self.underlying_beam.fwhm = np.sqrt(
(resolution ** 2) - (self.smoothing_beam.x_fwhm
* self.smoothing_beam.y_fwhm))
else:
self.underlying_beam.fwhm = 0.0 * units.Unit(display_unit)
else:
self.underlying_beam.fwhm = 0.0 * units.Unit(display_unit)
[docs]
def edit_coordinate_info(self, header):
"""
Update a header with the current WCS information.
Parameters
----------
header : astropy.io.fits.header.Header
Returns
-------
None
"""
if self.grid is not None:
self.grid.edit_header(header)
[docs]
def claim_image(self, image):
"""
Claim an image.
Parameters
----------
image : Image2D
Returns
-------
None
"""
image.set_unit(self.unit)
image.set_parallel(self.parallelism)
image.set_executor(self.executor)
[docs]
def count_beams(self):
"""
Return the number of beams in the map.
Returns
-------
float
"""
return (self.get_area() / self.get_image_beam_area()).decompose().value
[docs]
def count_independent_points(self, area):
"""
Find the number of independent points in a given area.
In 1-D at least 3 points per beam are needed to separate a positioned
point source from an extended source. Similarly, 9 points per beam
are necessary for 2-D.
Parameters
----------
area : astropy.Quantity
The area to consider.
Returns
-------
points : int
"""
beam_area = self.get_image_beam_area()
if self.smoothing_beam is None:
smoothing_area = 0.0
else:
smoothing_area = self.smoothing_beam.area
if self.is_filtered():
area_factor = 2 * np.pi * (gaussian_fwhm_to_sigma ** 2)
filter_area = area_factor * (self.filter_fwhm ** 2)
eta = 1.0 - (smoothing_area / filter_area).decompose().value
else:
eta = 1.0
pixel_area = self.grid.get_pixel_area()
if smoothing_area == 0:
inverse_points_per_beam = 0.0
else:
inverse_points_per_beam = eta * min(
9, smoothing_area / pixel_area)
return int(np.ceil((1.0 + area / beam_area).decompose().value
* inverse_points_per_beam))
[docs]
def nearest_to_offset(self, offset):
"""
Return the nearest map index for a given offset.
Parameters
----------
offset : Coordinate2D or tuple or numpy.ndarray or list.
The spatial offset given as a coordinate or tuple of
(x, y) offsets.
Returns
-------
x_index, y_index : 2-tuple (int or numpy.ndarray)
The x and y map indices.
"""
if not isinstance(offset, Coordinate2D):
self.reuse_index.set(offset)
offset = self.reuse_index
ix, iy = utils.round_values(offset.coordinates)
return ix, iy
[docs]
def convert_range_value_to_index(self, ranges):
"""
Calculate the index range for a given value range.
Converts a range of dimensional offsets to an appropriate range of
map indices for use in cropping.
Parameters
----------
ranges : units.Quantity or Coordinate2D
An array of shape (n_dimensions, 2) containing the minimum and
maximum ranges in each dimension. Dimension ordering is FITS based
(x, y).
Returns
-------
index_range : numpy.ndarray (int)
The ranges as indices (integer values) on the grid.
"""
ranges = Coordinate2D(ranges, unit=self.get_display_grid_unit())
index_range = np.asarray(self.nearest_to_offset(
self.grid.offset_to_index(ranges)))
if self.verbose:
span = ranges.span
if (isinstance(span.unit, units.Unit)
and span.unit != units.dimensionless_unscaled):
unit_str = f' {span.unit}'
distance = span.coordinates.value
else: # pragma: no cover
unit_str = ''
distance = span.coordinates
distance_string = 'x'.join(str(x) for x in distance) + unit_str
log.debug(f"Will crop to {distance_string}.")
return index_range
[docs]
def crop(self, ranges):
"""
Crop the image data.
Parameters
----------
ranges : numpy.ndarray (int,) or units.Quantity
The ranges to set crop the data to. Should be of shape
(n_dimensions, 2) where ranges[0, 0] would give the minimum crop
limit for the first dimension and ranges[0, 1] would give the
maximum crop limit for the first dimension. In this case, the
'first' dimension is in FITS format. i.e., (x, y) for a 2-D image.
If a Quantity is supplied this should contain the min and max
grid values to clip to in each dimension.
Returns
-------
None
"""
if self.data is None or self.size == 0:
return
if isinstance(ranges, units.Quantity):
ranges = self.convert_range_value_to_index(ranges)
else:
ranges = np.asarray(self.nearest_to_offset(ranges))
self.get_image().crop(ranges)
self.grid.reference_index.subtract(Coordinate2D(ranges[:, 0]))
[docs]
def auto_crop(self):
"""
Auto crop the image data.
The data is cropped to the extent of valid data point indices.
Returns
-------
ranges : numpy.ndarray (int)
The cropping range for each dimension of shape (n_dimensions, 2)
where [..., 0] is the minimum range and [..., 1] is the maximum
crop value (inclusive).
"""
if self.data is None:
return
ranges = self.get_index_range() # This is a Coordinate2D (float)
ranges = ranges.coordinates.astype(int)
if len(ranges) != 2 or None in ranges: # pragma: no cover
return
if (np.allclose(ranges[:, 0], 0)
and np.allclose(ranges[:, 1], self.shape[::-1])):
return # No cropping required
if self.verbose:
distance = ranges[:, 1] - ranges[:, 0]
distance_string = 'x'.join([str(x) for x in distance])
log.debug(f"Auto-cropping: {distance_string}")
self.crop(ranges)
return ranges
[docs]
def smooth_to(self, psf):
"""
Smooth to a given PSF or FWHM.
Parameters
----------
psf : float or Gaussian2D or Quantity
Returns
-------
None
"""
if not isinstance(psf, Gaussian2D):
if not isinstance(psf, units.Quantity):
psf = psf * self.get_display_grid_unit()
psf = Gaussian2D(x_fwhm=psf, y_fwhm=psf)
if self.smoothing_beam is not None:
if self.smoothing_beam.is_encompassing(psf):
return
psf.deconvolve_with(self.smoothing_beam)
self.smooth_with_psf(psf)
[docs]
def smooth_with_psf(self, psf):
"""
Smooth with a given PSF or FWHM.
Parameters
----------
psf : float or Gaussian2D or Quantity
Returns
-------
None
"""
if not isinstance(psf, Gaussian2D):
if not isinstance(psf, units.Quantity):
psf = psf * self.get_display_grid_unit()
psf = Gaussian2D(x_fwhm=psf, y_fwhm=psf)
extent = psf.extent()
steps = Index2D(
extent.coordinates / (5 * self.grid.get_pixel_size().coordinates))
beam_map = psf.get_beam_map(self.grid)
self.fast_smooth(beam_map, steps)
[docs]
def smooth(self, beam_map, reference_index=None, weights=None):
"""
Smooth the data with a given beam map kernel.
Parameters
----------
beam_map : numpy.ndarray (float)
The beam map image kernel with which to smooth the map of shape
(ky, kx).
reference_index : Coordinate2D or numpy.ndarray
The reference pixel index of the kernel in (x, y).
weights : numpy.ndarray (float)
The map weights of shape (ny, nx).
Returns
-------
None
"""
super().smooth(beam_map, reference_index=reference_index,
weights=weights)
self.add_smoothing(
self.default_beam().get_equivalent(beam_map, self.grid.resolution))
[docs]
def fast_smooth(self, beam_map, steps, reference_index=None, weights=None):
"""
Smooth the data with a given beam map kernel using fast method.
Parameters
----------
beam_map : numpy.ndarray (float)
The beam map image kernel with which to smooth the map of shape
(ky, kx).
steps : Index2D
The fast step skips in (x, y).
reference_index : Coordinate2D, optional
The reference pixel index of the kernel in (x, y). The default is
the beam map center ((kx-1)/2, (ky-1)/2).
weights : numpy.ndarray (float), optional
The map weights of shape (ny, nx). The default is no weighting.
Returns
-------
None
"""
super().fast_smooth(beam_map, steps, reference_index=reference_index,
weights=weights)
self.add_smoothing(
self.default_beam().get_equivalent(beam_map, self.grid.resolution))
[docs]
def filter_above(self, fwhm, valid=None):
"""
Filter the image using the supplied filtering FWHM.
The image data is modified via::
filtered = data - convolve(data, Gaussian(fwhm))
Parameters
----------
fwhm : units.Quantity or Coordinate2D1
The FWHM of the Gaussian with which to filter the image.
valid : numpy.ndarray (bool), optional
An optional mask where `False` excludes a map element from
inclusion in the convolution and subtraction.
Returns
-------
None
"""
extended = self.copy()
if (hasattr(extended, 'is_zero_weight_valid') # pragma: no cover
and hasattr(extended, 'validate')): # pragma: no cover
# Make sure zero weights are flagged
extended.validate()
extended.is_zero_weight_valid = True
# Null out the points that are to be skipped over by the validator
if valid is not None:
valid = valid & self.valid
invalid = np.logical_not(valid)
extended.clear(indices=invalid)
extended.unflag(indices=invalid)
else:
valid = None
extended.smooth_to(fwhm)
image = self.get_image()
image.add(extended, indices=valid, factor=-1)
self.update_filtering(fwhm)
[docs]
def fft_filter_above(self, fwhm, valid=None, weight=None):
"""
Filter the image with the supplied FWHM using the FFT method.
Parameters
----------
fwhm : units.Quantity
The FWHM of the Gaussian with which to filter the image.
valid : numpy.ndarray (bool), optional
An optional mask where `False` excludes a map element from
inclusion in the convolution and subtraction.
weight : numpy.ndarray (float), optional
An optional weighting array with the same shape as the map data.
These should be the inverse variance values.
Returns
-------
None
"""
ny = numba_functions.pow2ceil(self.shape[0] * 2)
nx = numba_functions.pow2ceil(self.shape[1] * 2)
if valid is None:
valid = self.valid
else:
valid = valid & self.valid
if weight is None:
weight = np.ones(self.shape, dtype=float)
elif isinstance(weight, FlaggedArray):
weight = weight.data
weight = weight * valid
sum_weight = (weight[valid] ** 2).sum()
n = valid.sum()
rmsw = np.sqrt(sum_weight / n)
if rmsw <= 0:
return
values = np.zeros(valid.shape, dtype=float)
values[valid] = weight[valid] * self.data[valid]
transformer = np.zeros((ny, nx), dtype=float)
transformer[:valid.shape[0], :valid.shape[1]] = values
y, x = np.indices(transformer.shape)
x -= nx // 2 - 1
y -= ny // 2 - 1
sigma = fwhm * gaussian_fwhm_to_sigma
resolution = self.grid.resolution.coordinates
a = -0.5 * ((resolution / sigma) ** 2).decompose().value
g = np.exp(a[0] * x ** 2 + a[1] * y ** 2)
c = fftconvolve(transformer, g / g.sum(), mode='same')
c = c[:self.shape[0], :self.shape[1]]
image = self.get_image()
image.add(c, factor=-1 / rmsw) # Subtract re-weighted
self.update_filtering(fwhm)
[docs]
def resample_from_map(self, map2d, weights=None):
"""
Resample from one map to another.
Parameters
----------
map2d : Map2D
The map to resample from.
weights : numpy.ndarray (float), optional
Optional weights to include during the resampling. Weights should
be the same shape as the `map2d` and represent inverse variance.
Returns
-------
None
"""
beam = self.get_anti_aliasing_beam_image_for(map2d)
map_indices = self.get_index_transform_to(map2d)
if isinstance(map_indices, Coordinate2D1):
map_indices = map_indices.to_coordinate_3d()
self.resample_from(map2d, map_indices, kernel=beam, weights=weights)
self.copy_processing_from(map2d)
[docs]
def resample(self, resolution):
"""
Resample the map to a new resolution.
Parameters
----------
resolution : units.Quantity
A resolution for all dimensions if scalar, or each dimension if
supplied as an array of shape (n_dimensions,) using the (x, y)
FITS convention.
Returns
-------
None
"""
original = self.copy()
original.fits_properties = self.fits_properties.copy()
if resolution.shape == ():
resolution = np.full(self.ndim, resolution.value) * resolution.unit
scaling = (self.grid.resolution.coordinates / resolution
).decompose().value
new_shape = np.ceil(np.asarray(self.shape) * scaling).astype(int)
self.set_data_shape(new_shape)
self.set_grid(original.grid.for_resolution(resolution))
self.resample_from_map(original)