Source code for sofia_redux.scan.frames.frames

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from abc import abstractmethod
from astropy import units
import inspect
import numpy as np

from sofia_redux.scan.flags.frame_flags import FrameFlags
from sofia_redux.scan.flags.flagged_data import FlaggedData
from sofia_redux.scan.utilities.class_provider import \
    frames_instance_for
from sofia_redux.scan.frames import frames_numba_functions
from sofia_redux.scan.coordinate_systems.coordinate import Coordinate
from sofia_redux.scan.coordinate_systems.coordinate_2d import Coordinate2D
from sofia_redux.scan.coordinate_systems.equatorial_coordinates import \
    EquatorialCoordinates
from sofia_redux.scan.coordinate_systems.epoch.epoch import J2000
from sofia_redux.scan.flags.mounts import Mount
from sofia_redux.scan.coordinate_systems.index_2d import Index2D

__all__ = ['Frames']


[docs] class Frames(FlaggedData): flagspace = FrameFlags def __init__(self): """ The Frames class contains all time-dependent data in an integration. """ super().__init__() self.mjd = None self.lst = None self.sin_a = None self.cos_a = None self.dof = None self.dependents = None self.relative_weight = None self.sign = None self.transmission = None self.temp_c = None self.temp_wc = None self.temp_wc2 = None self.has_telescope_info = None self.valid = None self.validated = None # Special 2d data self.data = None self.sample_flag = None self.source_index = None self.map_index = None self.sample_equatorial = None # Vectors self.equatorial = None self.chopper_position = None # Special reference self.integration = None self.equatorial_system = None # channel indices self.frame_fixed_channels = None @property def referenced_attributes(self): """ Referenced attributes are those that are referenced during copy. Returns ------- set (str) """ refs = super().referenced_attributes refs.add('integration') return refs @property def readout_attributes(self): """ Returns attributes that will be operated on by the `shift` method. Returns ------- set (str) """ return {'data'} @property def default_field_types(self): """ Used to define the default values for data arrays. Returns a dictionary of structure {field: default_value}. The default values have the following effects: type - empty numpy array of the given type. value - full numpy array of the given value. astropy.units.Unit - empty numpy array (float) in the given unit. astropy.units.Quantity - full numpy array of the given quantity. If a tuple is provided, the array will have additional axes appended such that the first element gives the type as above, and any additional integers give additional axes dimensions, e.g. (0.0, 2, 3) would result in a numpy array filled with zeros of shape (self.size, 2, 3). Returns ------- fields : dict """ fields = super().default_field_types fields.update({ 'mjd': np.nan, 'lst': np.nan * units.Unit('hourangle'), 'sin_a': np.nan, 'cos_a': np.nan, 'dof': 1.0, 'dependents': 0.0, 'relative_weight': 1.0, 'sign': 1, 'transmission': 1.0, 'temp_c': float, 'temp_wc': float, 'temp_wc2': float, 'has_telescope_info': True, 'valid': True, 'validated': False, 'equatorial': (EquatorialCoordinates, 'degree'), 'chopper_position': (Coordinate2D, 'arcsec'), 'frame_fixed_channels': -1, }) return fields @property def default_channel_fields(self): """ Returns default frame/channel type default values. This framework is similar to `default_field_types`, but is used to populate frame/channel data of shape (n_frames, n_channels). Returns ------- fields : dict """ return {'data': float, 'sample_flag': 0, 'source_index': -1, 'map_index': (Index2D, -1), 'sample_equatorial': units.Unit('deg')} @property def internal_attributes(self): """ Returns attribute names that are internal to the data for get actions. These attributes should always be returned as-is regardless of indexing. Returns ------- set (str) """ attributes = super().internal_attributes attributes.add('frame_fixed_channels') return attributes @property def channel_size(self): """ The number of channels in the frame data. Returns ------- int """ if self.data is None: return 0 return self.data.shape[1] @property def scan(self): if self.integration is None: return None else: return self.integration.scan @property def info(self): """ Return the scan info object. Returns ------- Info """ if self.scan is None: return None return self.scan.info @property def astrometry(self): """ Returns the astrometry information from the parent scan. Returns ------- AstrometryInfo """ if self.info is None: return None return self.info.astrometry @property def scan_equatorial(self): """ Return the scan equatorial coordinates. Returns ------- equatorial : EquatorialCoordinates The equatorial coordinates (single RA/DEC) of the scan. """ if self.astrometry is None: return None return self.astrometry.equatorial @property def channels(self): if self.integration is None: return None else: return self.integration.channels @property def configuration(self): if self.info is None: return None else: return self.info.configuration @property def channel_fixed_index(self): if self.channels is None: return None else: return self.channels.data.fixed_index @property def special_fields(self): """ Return fields that do not comply with the shape of other data. This is of particular importance for `delete_indices`. Although all arrays must have shape[0] = self.size, special handling may be required in certain cases. Returns ------- fields : set (str) """ fields = super().special_fields fields.add('integration') return fields
[docs] @classmethod def instance_from_instrument_name(cls, name): """ Return a Frames instance for a given instrument. Parameters ---------- name : str The name of the instrument. Returns ------- Frames """ return frames_instance_for(name)
[docs] def insert_blanks(self, insert_indices): """ Inserts blank frame data. Actual indices should be passed in. To delete based on fixed index values, please convert first using `find_fixed_indices`. Blank data are set to 0 in whatever unit is applicable. Parameters ---------- insert_indices : numpy.ndarray of (int) The index locations to insert. Returns ------- None """ super().insert_blanks(insert_indices) new_invalid = insert_indices + np.arange(insert_indices.size) self.valid[new_invalid] = False
[docs] def initialize(self, integration, size): """ Initializes a frame object with default values. Parameters ---------- integration : Integration size : int The total number of frames. Returns ------- None """ self.set_frame_size(size) self.integration = integration self.set_channels(self.channels) # self.channels are from integration self.equatorial.set_epoch(self.integration.info.telescope.epoch)
[docs] def set_default_channel_values(self, channel_size): """ Populate channel data fields with default values. The default values are loaded from the `default_channel_fields` property which returns a dictionary of the form {field_name: value}. If the value is a type, the default values will be empty numpy arrays. Other valid values can be astropy quantities or standard python types such as int, float, str, etc. All fields will be set to numpy arrays of the same type and filled with the same value. All arrays will be of the shape (self.size, channel_size). Parameters ---------- channel_size : int The number of channels Returns ------- None """ shape = self.size, channel_size for key, value in self.default_channel_fields.items(): if isinstance(value, type): setattr(self, key, np.empty(shape, dtype=value)) elif isinstance(value, units.Quantity): setattr(self, key, np.full(shape, value.value) * value.unit) elif isinstance(value, units.UnitBase): setattr(self, key, np.empty(shape, dtype=float) * value) elif isinstance(value, tuple): # only handle coordinate tuples if (inspect.isclass(value[0]) and issubclass(value[0], Coordinate)): coordinate_class, fill_value = value[0], value[1] base = coordinate_class() base.set_shape(shape) if isinstance(fill_value, (units.UnitBase, str)): base.change_unit(fill_value) else: base.fill(fill_value) setattr(self, key, base) else: setattr(self, key, np.full(shape, value))
[docs] def set_channels(self, channels=None): """ Predominantly updates fixed channel indices. Will set default values if the channel type data has not already been defined. Otherwise, the channels provided must be a subset of the previously defined channels. In this case, channel type data is slimmed down to those that are still found in the new channels. Parameters ---------- channels : Channels, optional The channels from which to base channel type data. Defaults to integration channels. Returns ------- None """ if channels is None: channels = self.channels if channels is None or self.channels is None: raise ValueError("No channels supplied or available.") if self.data is None: self.set_default_channel_values(channels.size) self.frame_fixed_channels = self.channel_fixed_index.copy() return self.validate_channel_indices()
[docs] def find_channel_fixed_indices(self, fixed_indices, cull=True): """ Returns the actual indices given channel fixed indices. The fixed indices are those that are initially loaded. Returned indices are their locations in the data arrays. This operates on the channel fixed indices, important for some fields of the frame data (data, sample_flags, etc.). Parameters ---------- fixed_indices : int or np.ndarray (int) The fixed indices of the channels. cull : bool, optional If `True`, do not include fixed indices not found in the result. If `False`, missing indices will be replaced by -1. Returns ------- indices : ndarray of int The indices of `fixed_indices` in the data arrays. """ if self.channels is None: return np.empty(0, dtype=int) return self.channels.data.find_fixed_indices(fixed_indices, cull=cull)
[docs] def set_frame_size(self, n_frames): """ Set the number of frames in an integration and default values. Parameters ---------- n_frames : int Returns ------- None """ self.fixed_index = np.arange(n_frames) self.set_default_values()
[docs] def get_frame_count(self, keep_flag=None, discard_flag=None, match_flag=None): """ Return the number of frames in an integration. A number of flags may also be supplied to return the number of a certain type of frame. Parameters ---------- keep_flag : int or ChannelFlagTypes, optional Flag values to keep in the calculation. discard_flag : int or ChannelFlagTypes, optional Flag values to discard_flag from the calculation. match_flag : int or ChannelFlagTypes, optional Only matching flag values will be used in the calculation. Returns ------- n_frames : int The number of matching frames in the integration. """ if self.size == 0: return 0 elif keep_flag is None and discard_flag is None and match_flag is None: return self.valid.sum() else: valid = self.get_flagged_indices(keep_flag=keep_flag, discard_flag=discard_flag, match_flag=match_flag, indices=False) valid &= self.valid return valid.sum()
[docs] def set_transmission(self, transmission, indices=None): """ Set the frame transmission. Parameters ---------- transmission : float or numpy.ndarray (float) The transmission values to set. indices : int or slice or numpy.ndarray (int or bool) The frame indices to update. The default is all frames. Returns ------- None """ if indices is None: indices = slice(None) if self.is_singular: self.transmission = transmission else: self.transmission[indices] = transmission
[docs] def slim(self, channels=None): """ Slim channel type data to those channels still present. Parameters ---------- channels : Channels, optional The channels object. If not supplied, defaults to the integration channels. Returns ------- None """ if channels is None: self.validate_channel_indices() else: self.set_channels(channels) self.source_index = None self.map_index = None
[docs] def validate_channel_indices(self): """ Check that the frame channel indices are consistent. Returns ------- None """ for indices in [self.channel_fixed_index, self.frame_fixed_channels]: if not isinstance(indices, np.ndarray): raise ValueError("Channel fixed indices for frames not set.") if self.channel_fixed_index.shape == self.frame_fixed_channels.shape: if np.allclose(self.channel_fixed_index, self.frame_fixed_channels): return mask = self.channel_fixed_index[:, None] == ( self.frame_fixed_channels) present = np.any(mask, axis=0) indices = np.nonzero(present)[0] for channel_attribute in self.default_channel_fields.keys(): value = getattr(self, channel_attribute, None) if value is None: continue setattr(self, channel_attribute, value[:, indices]) self.frame_fixed_channels = self.channel_fixed_index.copy()
[docs] def jackknife(self): """ Randomly set signs for roughly half of the frames. Returns ------- None """ random = np.random.random(self.size) self.sign[random < 0.5] *= -1
[docs] def get_source_gain(self, mode_flag): """ Return the source gain. The basic frame class will only return a result for the TOTAL_POWER flag. All others will raise an error. The source gain here is defined as gain = transmission * sign. Parameters ---------- mode_flag : FrameFlagTypes or str or int The gain mode flag type. Returns ------- gain : numpy.ndarray (float) The source gains. """ mode_flag = self.flagspace.convert_flag(mode_flag) if mode_flag != self.flagspace.flags.TOTAL_POWER: raise ValueError(f"{self.__class__} does not define " f"{mode_flag} signal mode.") return self.sign * self.transmission
[docs] def validate(self): """ Validate frame data after read. Should set the `validated` (checked) attribute if necessary. Returns ------- None """ native_coordinates = self.get_absolute_native_coordinates() frames_numba_functions.validate_frames( valid=self.valid, cos_a=self.cos_a, sin_a=self.sin_a, native_sin_lat=native_coordinates.sin_lat, native_cos_lat=native_coordinates.cos_lat, validated=self.validated, has_telescope_info=self.has_telescope_info, mount=self.info.instrument.mount.value, left_nasmyth=Mount.LEFT_NASMYTH.value, right_nasmyth=Mount.RIGHT_NASMYTH.value)
[docs] def get_equatorial(self, offsets, indices=None, equatorial=None): """ Return equatorial coordinates given offsets from the base equatorial. The return result (lon, lat) is: lon = base_lon + (position.lon / cos(scan.lat)) lat = base_lat + position.lat Parameters ---------- offsets : Coordinate2D The (x, y) equatorial offsets of shape () or (shape,) indices : numpy.ndarray (int), optional The frame indices that apply. The default is all indices. equatorial : EquatorialCoordinates, optional The equatorial output frame. The default is the same as the frame equatorial frame. Returns ------- equatorial : EquatorialCoordinates """ indices, size = self.get_index_size(indices) if equatorial is None: equatorial = self.equatorial.empty_copy() if equatorial.epoch.singular: equatorial.epoch = self.equatorial.epoch.copy() else: # pragma: no cover # not commonly used equatorial.epoch = self.equatorial.epoch[indices].copy() native_offsets = self.get_native_xy(offsets, indices=indices) x = (self.equatorial.x[indices] / self.info.astrometry.equatorial.cos_lat) y = self.equatorial.y[indices] shaped = len(native_offsets.shape) > 1 if shaped: x, y = x[..., None], y[..., None] equatorial.set_native_longitude(x + native_offsets.x) equatorial.set_native_latitude(y + native_offsets.y) return equatorial
[docs] def get_equatorial_native_offset(self, position, indices=None, offset=None): """ Return the horizontal offsets of a position relative to scan center. The final return position is the sum of the position offsets, and the equatorial coordinates of the frame relative to the scan center. Parameters ---------- position : Coordinate2D The (x, y) horizontal offsets of shape () or (m,) giving the (lon, lat) offset positions. If not set, the default is to return the offsets relative to the scan equatorial position. indices : int or numpy.ndarray (int), optional The frame indices that apply of shape (n,) (if an array was used). The default is all indices. offset : Coordinate2D, optional An optional output array to store and return the coordinates. Returns ------- native_offsets : Coordinate2D An array containing the sum of the equatorial offsets of the frame data and the supplied positions. If multiple frame indices and multiple positions are supplied, the resulting coordinate shape will be (n, m). Otherwise, the result will be shaped as either (n,) or (m,) or () depending on if indices/position are singular. """ offset = self.get_base_equatorial_native_offset( indices=indices, offset=offset) native_position = self.get_native_xy(position, indices=indices) shaped = len(native_position.shape) > 1 x, y = offset.x, offset.y if shaped: x, y = x[..., None], y[..., None] offset.set_x(x + position.x, copy=False) offset.set_y(y + position.y, copy=False) return offset
[docs] def get_base_equatorial_native_offset(self, indices=None, offset=None): """ Return equatorial native offsets of the frames from the scan reference. Parameters ---------- indices : int or slice or numpy.ndarray (int or bool) The frame indices for which to extract the offsets. offset : Coordinate2D, optional An optional coordinate object to hold the results. Returns ------- equatorial_offsets : Coordinate2D """ if indices is None: indices = slice(None) if self.is_singular: equatorial = self.equatorial else: equatorial = self.equatorial[indices] return equatorial.get_native_offset_from(self.scan_equatorial, offset=offset)
[docs] def get_base_native_offset(self, indices=None, offset=None): """ Return equatorial native offsets of the frames from the scan reference. Parameters ---------- indices : int or slice or numpy.ndarray (int or bool) The frame indices for which to extract the offsets. offset : Coordinate2D, optional An optional coordinate object to hold the results. Returns ------- equatorial_offsets : Coordinate2D """ return self.get_base_equatorial_native_offset( indices=indices, offset=offset)
[docs] def get_native_offset(self, position, indices=None, offset=None): """ Get the equatorial offsets for the given position. Parameters ---------- position : Coordinate2D The (x, y) offsets. indices : numpy.ndarray (int), optional The frame indices that apply. The default is all indices. offset : Coordinate2D, optional The coordinate object on which to store the offsets (returned). Returns ------- native_offsets : Coordinate2D """ return self.get_equatorial_native_offset( position, indices=indices, offset=offset)
[docs] def get_focal_plane_offset(self, position, indices=None, offset=None): """ Return the offsets on the focal plane from given positional offsets. The positions are the rotated from telescope coordinates to pixel coordinates. Parameters ---------- position : Coordinate2D The native offsets to convert to a focal plane offset. indices : numpy.ndarray (int), optional The frame indices that apply. The default is all indices. offset : Coordinate2D, optional An optional container to store the offsets (returned). Returns ------- offsets : Coordinate2D The (x, y) offset coordinates on the focal plane. """ if indices is None: indices = slice(None) offset = self.get_base_native_offset(indices=indices, offset=offset) shaped = offset.shape != () and position.shape != () px, py = position.x, position.y if self.is_singular: cos_a, sin_a = self.cos_a, self.sin_a else: cos_a, sin_a = self.cos_a[indices], self.sin_a[indices] rx = (offset.x * cos_a) + (offset.y * sin_a) ry = (offset.y * cos_a) - (offset.x * sin_a) if shaped: px, py = px[None], py[None] rx, ry = rx[..., None], ry[..., None] offset.set_x(px + rx, copy=False) offset.set_y(py + ry, copy=False) return offset
[docs] def get_apparent_equatorial(self, indices=None, apparent=None): """ Precess equatorial coordinates to the Scan MJD. Parameters ---------- indices : int or slice or numpy.ndarray (int or bool) The frame indices for which to get the apparent equatorial coordinates. The default is all indices. apparent : EquatorialCoordinates, optional If supplied, will be updated with the equatorial coordinates and returned. Otherwise, a fresh coordinate frame will be created. Returns ------- apparent_equatorial : EquatorialCoordinates """ if indices is None: indices = slice(None) if self.is_singular: equatorial = self.equatorial else: equatorial = self.equatorial[indices] if apparent is None: apparent = equatorial.copy() else: apparent.copy_coordinates(equatorial) self.info.astrometry.to_apparent.precess(apparent) return apparent
[docs] @abstractmethod def get_absolute_native_coordinates(self): # pragma: no cover """ Get absolute spherical (including chopper) coords in telescope frame. This is named getNativeCoords() in CRUSH Returns ------- coordinates : SphericalCoordinates """ pass
[docs] @abstractmethod def get_absolute_native_offsets(self): # pragma: no cover """ Return absolute spherical offsets in telescope frame. This is named GetNativeOffset() in CRUSH Returns ------- offsets : Coordinate2D The (x, y) native offsets. """ pass
[docs] def pointing_at(self, offset, indices=None): """ Applies pointing correction to coordinates via subtraction. Parameters ---------- offset : astropy.units.Quantity (numpy.ndarray) An array of indices : numpy.ndarray (int), optional The frame indices that apply. The default is all indices. Returns ------- None """ native_offset = self.get_absolute_native_offsets() if native_offset is not None: if indices is not None: if self.is_singular: sub_offset = native_offset else: sub_offset = native_offset[indices] sub_offset.subtract(offset) if self.is_singular: native_offset = sub_offset else: native_offset[indices] = sub_offset else: native_offset.subtract(offset) coordinates = self.get_absolute_native_coordinates() if coordinates is not None: if indices is not None: if self.is_singular: sub_coord = coordinates else: sub_coord = coordinates[indices] sub_coord.subtract_offset(offset) if self.is_singular: coordinates = sub_coord else: coordinates[indices] = sub_coord else: coordinates.subtract_offset(offset)
[docs] def scale(self, factor, indices=None): """ Scale all data (in `data`) by `factor`. Parameters ---------- factor : int or float indices : int or slice or numpy.ndarray (int or bool) The frame indices to scale. The default is all. Returns ------- None """ if indices is None: indices = slice(None) if factor == 0: if self.is_singular: self.data = 0.0 else: self.data[indices] = 0.0 else: if self.is_singular: self.data *= factor else: self.data[indices] *= factor
[docs] def invert(self, indices=None): """ Multiply all data (in `data`) by -1. Parameters ---------- indices : int or slice or numpy.ndarray (int or bool) The frame indices to scale. The default is all. Returns ------- None """ self.scale(-1.0, indices=indices)
[docs] def get_rotation(self, indices=None): """ Returns the tan of sin(angle), cos(angle). Parameters ---------- indices : numpy.ndarray (int), optional The frame indices. The default is all indices. Returns ------- angle : astropy.units.Quantity (numpy.ndarray) An array of angles of size (N,) or (indices.size,). """ rad = units.Unit('radian') if indices is None: indices = slice(None) if self.is_singular: angle = np.arctan2(self.sin_a, self.cos_a) else: angle = np.arctan2(self.sin_a[indices], self.cos_a[indices]) * rad if isinstance(angle, units.Quantity): angle = angle.to(rad) else: angle *= rad return angle
[docs] def set_rotation(self, angle, indices=None): """ Set the `sin_a` and `cos_a` attributes from an angle. The `sin_a` and `cos_a` attributes define the rotation from the pixel coordinates to the telescope coordinates. Parameters ---------- angle : int or float or numpy.ndarray or Quantity The angle to set. If an array is supplied, must be the same size as `indices`. indices : numpy.ndarray (int), optional The frame indices to set. The default is all indices. Returns ------- None """ if indices is None: indices = slice(None) if self.is_singular: self.sin_a = np.sin(angle) self.cos_a = np.cos(angle) else: self.sin_a[indices] = np.sin(angle) self.cos_a[indices] = np.cos(angle)
[docs] def get_native_x(self, focal_plane_position, indices=None): """ Return the native x coordinates from a given focal plane position. Parameters ---------- focal_plane_position : Coordinates2D The (x, y) focal plane offsets. indices : int or numpy.ndarray or int, optional The frame indices for which to calculate x. The default is all frames of shape. If a slice or array is provided, will be of length n. Returns ------- native_x : astropy.units.Quantity (numpy.ndarray) An array of shape () or (n,) or (n, shape). """ indices, n = self.get_index_size(indices) single_index = n == 0 single_position = focal_plane_position.shape == () if self.is_singular: cos_a = self.cos_a sin_a = self.sin_a else: cos_a = self.cos_a[indices] sin_a = self.sin_a[indices] xp = focal_plane_position.x yp = focal_plane_position.y if single_index or single_position: x = cos_a * xp x -= sin_a * yp else: x = cos_a[..., None] * xp[None] x -= sin_a[..., None] * yp[None] return x
[docs] def get_native_y(self, focal_plane_position, indices=None): """ Return the native y coordinates from a given focal plane position. Parameters ---------- focal_plane_position : Coordinate2D The (x, y) focal plane offsets as a scalar or shape (shape,) indices : int or numpy.ndarray or int, optional The frame indices for which to calculate x. The default is all frames of shape. If a slice or array is provided, will be of length n. Returns ------- native_xy : astropy.units.Quantity (numpy.ndarray) An array of shape () or (n,) or (n, shape). """ indices, n = self.get_index_size(indices) single_index = n == 0 single_position = focal_plane_position.shape == () if self.is_singular: cos_a = self.cos_a sin_a = self.sin_a else: cos_a = self.cos_a[indices] sin_a = self.sin_a[indices] xp = focal_plane_position.x yp = focal_plane_position.y if single_index or single_position: y = sin_a * xp y += cos_a * yp else: y = sin_a[..., None] * xp[None] y += cos_a[..., None] * yp[None] return y
[docs] def get_native_xy(self, focal_plane_position, indices=None, coordinates=None): """ Return the native x/y coordinates from a given focal plane position. Rotates the focal plane positions by the stored cos(a) and sin(a) frame positions. Parameters ---------- focal_plane_position : Coordinates2D The (x, y) focal plane offsets as a scalar of shape (shape,). indices : int or numpy.ndarray or int, optional The frame indices for which to calculate x. The default is all frames of shape. If a slice or array is provided, will be of length n. coordinates : Coordinate2D, optional An optional output coordinate system to hold the result (returned). Returns ------- native_xy : Coordinates2D The native (x, y) coordinates. """ indices, n = self.get_index_size(indices) single_index = n == 0 single_position = focal_plane_position.shape == () if self.is_singular: cos_a = self.cos_a sin_a = self.sin_a else: cos_a = self.cos_a[indices] sin_a = self.sin_a[indices] xp = focal_plane_position.x yp = focal_plane_position.y if single_index or single_position: x = cos_a * xp x -= sin_a * yp y = sin_a * xp y += cos_a * yp else: x = cos_a[..., None] * xp[None] x -= sin_a[..., None] * yp[None] y = sin_a[..., None] * xp[None] y += cos_a[..., None] * yp[None] if coordinates is None: coordinates = Coordinate2D(unit=self.equatorial.offset_unit) coordinates.set_x(x, copy=False) coordinates.set_y(y, copy=False) return coordinates
[docs] def add_data_from(self, other_frames, scaling=1.0, indices=None): """ Add data from other frames to these. The other frames' data must match the shape of the indices specified, or the current frames' full data array if indices=None. Parameters ---------- other_frames : Frames scaling : float, optional indices : int or slice or numpy.ndarray (int or bool) The frame indices to add to. Returns ------- None """ if indices is None: indices = slice(None) if scaling != 1.0: if self.is_singular: self.data = self.data + (scaling * other_frames.data) else: self.data[indices] += scaling * other_frames.data else: if self.is_singular: self.data = self.data + other_frames.data else: self.data[indices] += other_frames.data if self.is_singular: self.sample_flag = self.sample_flag | other_frames.sample_flag else: self.sample_flag[indices] |= other_frames.sample_flag
[docs] def project(self, position, projector, indices=None): """ Project focal plane offsets. Parameters ---------- position : Coordinate2D The (x, y) position to project to offset. projector : AstroProjector The projector to store and determine the projected offsets. indices : int or slice or numpy.ndarray (int or bool) The frame indices to project. Returns ------- offsets : Coordinate2D The projector offsets. These will also be stored in the projector offsets attribute """ if projector.is_focal_plane(): projector.set_reference_coordinates() # Deproject SFL focal plane offsets focal_plane_offset = self.get_focal_plane_offset( position, indices=indices) projector.coordinates.add_native_offset(focal_plane_offset) projector.project() elif self.info.astrometry.is_nonsidereal: projector.set_reference_coordinates() # Deproject SFL native offsets equatorial_offset = self.get_equatorial_native_offset( position, indices=indices) projector.equatorial.add_native_offset(equatorial_offset) projector.project_from_equatorial() else: self.get_equatorial( position, indices=indices, equatorial=projector.equatorial) projector.project_from_equatorial() return projector.offset
[docs] def native_to_native_equatorial_offset( self, offset, indices=None, in_place=True): """ Convert native offsets to native equatorial offsets. Parameters ---------- offset : Coordinate2D The native (x, y) offsets. indices : int or slice or numpy.ndarray (int or bool) The frame indices for which to calculate offsets. The default is all frames (not used by equatorial frames). in_place : bool, optional If `True`, modify the coordinates in place. Otherwise, return a copy of the offsets. Returns ------- native_equatorial_offsets : Coordinate2D """ return offset if in_place else offset.copy()
[docs] def native_to_equatorial_offset(self, offset, indices=None, in_place=True): """ Convert native offsets to equatorial offsets. Parameters ---------- offset : Coordinate2D The native (x, y) offsets. indices : int or slice or numpy.ndarray (int or bool) The frame indices for which to calculate offsets. The default is all frames (not used by equatorial frames). in_place : bool, optional If `True`, modify the coordinates in place. Otherwise, return a copy of the offsets. Returns ------- equatorial_offsets : Coordinate2D """ if not in_place: offset = offset.copy() offset = self.native_to_native_equatorial_offset(offset, in_place=True) offset.scale_x(-1.0) return offset
[docs] def native_equatorial_to_native_offset( self, offset, indices=None, in_place=True): """ Convert native equatorial offsets to native offsets. Parameters ---------- offset : Coordinate2D The native (x, y) equatorial offsets. indices : int or slice or numpy.ndarray (int or bool) The frame indices for which to calculate offsets. The default is all frames (not used by equatorial frames). in_place : bool, optional If `True`, modify the coordinates in place. Otherwise, return a copy of the offsets. Returns ------- native_offsets : Coordinate2D """ return offset if in_place else offset.copy()
[docs] def equatorial_to_native_offset(self, offset, indices=None, in_place=True): """ Convert equatorial offsets to native offsets. Parameters ---------- offset : Coordinate2D The equatorial (x, y) offsets. indices : int or slice or numpy.ndarray (int or bool) The frame indices for which to calculate offsets. The default is all frames (not used by equatorial frames). in_place : bool, optional If `True`, modify the coordinates in place. Otherwise, return a copy of the offsets. Returns ------- native_offsets : Coordinate2D """ if not in_place: offset = offset.copy() offset.scale_x(-1.0) self.native_equatorial_to_native_offset(offset) return offset
[docs] def native_to_equatorial(self, native, indices=None, equatorial=None): """ Convert native coordinates to equatorial coordinates. Parameters ---------- native : EquatorialCoordinates The coordinates to convert. indices : int or slice or numpy.ndarray (int or bool) The frame indices for which to calculate offsets. The default is all frames (not used by equatorial frames). equatorial : EquatorialCoordinates, optional If not supplied, the returned coordinates will have a J2000 epoch. Otherwise, the equatorial coordinates provided will be populated. Returns ------- equatorial_coordinates : EquatorialCoordinates """ if equatorial is None: equatorial = EquatorialCoordinates(epoch=J2000) equatorial.copy_coordinates(native) return equatorial
[docs] def equatorial_to_native(self, equatorial, indices=None, native=None): """ Convert equatorial coordinates to native coordinates. Parameters ---------- equatorial : EquatorialCoordinates The equatorial coordinates to convert. indices : int or slice or numpy.ndarray (int or bool) The frame indices for which to calculate offsets. The default is all frames (not used by equatorial frames). native : SphericalCoordinates, optional The native coordinates to populate. Will default to spherical coordinates if not provided. Returns ------- native_coordinates : EquatorialCoordinates """ if native is None: native = EquatorialCoordinates(epoch=J2000) native.copy_coordinates(equatorial) return native
[docs] def get_native_offset_from(self, reference, indices=None): """ Return the native offset from a reference coordinate. Parameters ---------- reference : EquatorialCoordinates The reference position from which to derive the offsets of the frame positions. indices : numpy.ndarray (int), optional The frame indices to use. The default is all indices. Returns ------- Coordinate2D """ return self.get_equatorial_native_offset_from( reference, indices=indices)
[docs] def get_equatorial_native_offset_from(self, reference, indices=None): """ Find the native equatorial offset from a reference position. The result will be: dx = (x - ref(x)) / cos(ref(y)) dy = (y - ref(y)) Parameters ---------- reference : EquatorialCoordinates The reference equatorial coordinate(s). indices : int or numpy.ndarray (int), optional The frame indices for which to derive offsets. If an array is provided is should be of shape (n,). The default is all indices. Returns ------- offset : Coordinate2D The native equatorial offsets between the frame equatorial positions and a reference position. """ if indices is None: indices = slice(None) if self.is_singular: equatorial = self.equatorial else: equatorial = self.equatorial[indices] return equatorial.get_native_offset_from(reference)
[docs] def get_first_frame_index_from(self, index): """ Return the first valid frame index after and including a given index. Parameters ---------- index : int Returns ------- first_frame : int """ if index < 0: index = self.size + index return np.nonzero(self.valid[index:])[0][0] + index
[docs] def get_first_frame_index(self, reference=0): """ Return the first valid frame index of the integration. Returns ------- first_frame : int reference : int, optional If supplied, finds the first frame from `reference`, rather than the first index (0). May take negative values to indicate an index relative to the last. """ return self.get_first_frame_index_from(reference)
[docs] def get_last_frame_from(self, index): """ Return the last valid frame index before and including a given index. Parameters ---------- index : int Returns ------- last_frame : int """ if index < 0: index = self.size + index return np.nonzero(self.valid[:index])[0][-1]
[docs] def get_last_frame_index(self, reference=None): """ Return the last valid frame index of the integration. Returns ------- last_frame : int reference : int, optional reference : int, optional If supplied, finds the last frame before `reference`, rather than the last index (self.size). May take negative values to indicate an index relative to the last index. """ if reference is None: reference = self.size return self.get_last_frame_from(reference)
[docs] def get_first_frame_value(self, field): """ Return the first valid frame data for the given field. Parameters ---------- field : str Name of the frame data field. Returns ------- value """ index = self.get_first_frame_index() values = getattr(self, field, None) if values is None: raise ValueError(f"{self} does not contain {field} field.") return values[index]
[docs] def get_last_frame_value(self, field): """ Return the last valid frame data for the given field. Parameters ---------- field : str Name of the frame data field. Returns ------- value """ index = self.get_last_frame_index() values = getattr(self, field, None) if values is None: raise ValueError(f"{self} does not contain {field} field.") return values[index]
[docs] def get_first_frame_value_from(self, index, field): """ Return the first valid frame value before and including a given index. Parameters ---------- index : int field : str Name of the frame data field. Returns ------- value """ index = self.get_first_frame_index_from(index) values = getattr(self, field, None) if values is None: raise ValueError(f"{self} does not contain {field} field.") return values[index]
[docs] def get_last_frame_value_from(self, index, field): """ Return the last valid frame value before and including a given index. Parameters ---------- index : int field : str Name of the frame data field. Returns ------- value """ index = self.get_last_frame_from(index) values = getattr(self, field, None) if values is None: raise ValueError(f"{self} does not contain {field} field.") return values[index]
[docs] def get_first_frame(self, reference=0): """ Return the first valid frame. Parameters ---------- reference : int, optional The first actual frame index after which to return the first valid frame. The default is the first (0). Returns ------- Frames """ return self[self.get_first_frame_index(reference=reference)]
[docs] def get_last_frame(self, reference=None): """ Return the first valid frame. Parameters ---------- reference : int, optional The last actual frame index before which to return the last valid frame. The default is the last. Returns ------- Frames """ return self[self.get_last_frame_index(reference=reference)]
[docs] def add_dependents(self, dependents, start=None, end=None): """ Add dependents from frame data. Parameters ---------- dependents : numpy.ndarray (float) Must be the same size as self.size. start : int, optional The starting frame. end : int, optional The exclusive ending frame. Returns ------- None """ frames_numba_functions.add_dependents( dependents=self.dependents, dp=dependents, frame_valid=self.valid, start_frame=start, end_frame=end, subtract=False)
[docs] def remove_dependents(self, dependents, start=None, end=None): """ Remove dependents from frame data. Parameters ---------- dependents : numpy.ndarray (float) Must be the same size as self.size. start : int, optional The starting frame. end : int, optional The exclusive ending frame. Returns ------- None """ frames_numba_functions.add_dependents( dependents=self.dependents, dp=dependents, frame_valid=self.valid, start_frame=start, end_frame=end, subtract=True)
[docs] def shift_frames(self, n_frames): """ Shift the "readout" data by a number of frames in either direction. This will only shift data fields contained in the 'readout_attributes' frame property. For frame `i`, the result of shifting by `n_frames` will be: new[i + n_frames] = old[i] Frames shifted outside of an array will be set to invalid. Parameters ---------- n_frames : int Returns ------- None """ shift_valid = np.roll(self.valid, n_frames) if n_frames > 0: shift_valid[:n_frames] = False elif n_frames < 0: shift_valid[n_frames:] = False self.valid &= shift_valid invalid = np.logical_not(self.valid) for field in self.readout_attributes: value = getattr(self, field, None) if value is None: continue if isinstance(value, np.ndarray): saved_invalid = value[invalid] value = np.roll(value, n_frames, axis=0) value[invalid] = saved_invalid elif isinstance(value, Coordinate): value.shift(n_frames, fill_value=np.nan) setattr(self, field, value)
[docs] @staticmethod def correct_factor_dimensions(factor, array): """ Corrects the factor dimensionality prior to an array +-/* etc. Frame operations are frequently of the form result = factor op array where factor is of shape (n_frames,) and array is of shape (n_frames, ...). This procedure updates the factor shape so that array operations are possible. E.g., if factor is of shape (5,) and array is of shape (5, 10), then the output factor will be of shape (5, 1) and allow the two arrays to operate with each other. Parameters ---------- factor : int or float or numpy.ndarray The factor to check. array : numpy.ndarray The array to check against Returns ------- working_factor : numpy.ndarray """ return Coordinate2D.correct_factor_dimensions(factor, array)
[docs] def set_from_downsampled(self, frames, start_indices, valid, window): """ Set the data for these frames by downsampling higher-res frames. Parameters ---------- frames : Frames The frames at a higher resolution. start_indices : numpy.ndarray (int) The start indices containing the first index of the high resolution frame for each convolution with the window function. Should be of shape (self.size,). valid : numpy.ndarray (bool) A boolean mask indicating whether a downsampled frame could be determined from the higher resolution frames. Should be of shape (self.size,). window : numpy.ndarray (float) The window function used for convolution of shape (n_window,). Returns ------- None """ data, sample_flag = frames_numba_functions.downsample_data( data=frames.data, sample_flag=frames.sample_flag, valid=valid, window=window, start_indices=start_indices ) self.data = data self.sample_flag = sample_flag self.valid = valid.copy()