# Licensed under a 3-clause BSD style license - see LICENSE.rst
from astropy import units, log
from astropy.io import fits
from astropy.io.fits.verify import VerifyWarning
from astropy.time import Time
from astropy.coordinates import FK4, FK5, BaseCoordinateFrame
from astropy.coordinates import concatenate as astro_concat
from astropy.coordinates.angles import Angle
from astropy.modeling.functional_models import Gaussian2D
from astropy.stats import gaussian_fwhm_to_sigma
import bottleneck as bn
from copy import deepcopy
import math
import numpy as np
import random
import re
import string
import warnings
from sofia_redux import scan as sofia_scan_module
from sofia_redux.scan.utilities.range import Range
from sofia_redux.scan.utilities import numba_functions
from sofia_redux.toolkit.utilities.func import clear_numba_cache as \
toolkit_clear_numba_cache
__all__ = ['UNKNOWN_STRING_VALUE', 'UNKNOWN_FLOAT_VALUE',
'UNKNOWN_INT_VALUE', 'UNKNOWN_BOOL_VALUE', 'get_string',
'get_bool', 'get_int', 'get_float', 'get_range',
'get_list', 'get_string_list', 'get_float_list',
'get_int_list', 'to_int_list', 'parse_time',
'parse_angle', 'get_dms_angle', 'get_hms_time',
'get_sign', 'get_epoch', 'valid_value', 'robust_mean',
'robust_sigma_clip_mask', 'roundup_ratio', 'rotate',
'log2round', 'pow2round', 'pow2floor', 'pow2ceil',
'skycoord_insert_blanks', 'dict_intersection',
'dict_difference', 'to_header_quantity', 'to_header_float',
'combine_beams', 'convolve_beam', 'deconvolve_beam',
'encompass_beam', 'encompass_beam_fwhm', 'get_beam_area',
'get_header_quantity', 'ascii_file_to_frame_data',
'insert_into_header', 'insert_info_in_header', 'to_header_cards',
'clear_numba_cache', 'round_values', 'get_comment_unit',
'safe_sidereal_time']
UNKNOWN_STRING_VALUE = ''
UNKNOWN_FLOAT_VALUE = -9999.0
UNKNOWN_INT_VALUE = -9999
UNKNOWN_BOOL_VALUE = False
ieee_remainder = np.vectorize(math.remainder)
[docs]
def get_string(value, default=None):
"""
Return a string representation of the given value.
Parameters
----------
value : thing
The value to resolve.
default : str, optional
The default result to return if a conversion is not possible,
or `value` matches the UNKNOWN_STRING_VALUE.
Returns
-------
str
"""
if value is None or value == UNKNOWN_STRING_VALUE:
return default
try:
result = str(value)
except (ValueError, TypeError):
result = default
return result
[docs]
def get_bool(value, default=False):
"""
Return a bool representation of the given value.
Parameters
----------
value : thing
The value to resolve.
default : bool, optional
The default result to return if a conversion is not possible,
or `value` matches the UNKNOWN_BOOL_VALUE.
Returns
-------
bool
"""
if value is None:
return default
value = str(value).strip().lower()
if len(value) == 0:
return default
if value[0] in ['0', 'n', 'f']:
return False
return True
[docs]
def get_int(value, default=0):
"""
Return an integer representation of the given value.
Parameters
----------
value : thing
The value to resolve.
default : int, optional
The default result to return if a conversion is not possible,
or `value` matches the UNKNOWN_INT_VALUE.
Returns
-------
int
"""
if value is None:
return default
try:
value = int(value)
except (ValueError, TypeError):
return default
if value == UNKNOWN_INT_VALUE:
return default
return value
[docs]
def get_float(value, default=np.nan):
"""
Return a float representation of the given value.
Parameters
----------
value : thing
The value to resolve.
default : float, optional
The default result to return if a conversion is not possible,
or `value` matches the UNKNOWN_FLOAT_VALUE.
Returns
-------
int
"""
if value is None:
return default
try:
value = float(value)
except (ValueError, TypeError):
return default
if np.isclose(value, UNKNOWN_FLOAT_VALUE, equal_nan=True):
return default
return value
[docs]
def get_range(value, default=Range(), is_positive=False):
"""
Return a Range object generated from the given value.
Parameters
----------
value : thing
The thing to resolve.
default : Range, optional
The default result to return if a Range could not be resolved.
is_positive : bool, optional
If `True`, all values in the range are considered positive and any
'-' character in `spec` will be treated as a delimiter rather than
a minus sign.
Returns
-------
Range
"""
if value is None:
return default
elif isinstance(value, Range):
return value.copy()
elif not isinstance(value, str) and len(value) == 2:
try:
return Range(min_val=value[0], max_val=value[1])
except ValueError:
return default
try:
return Range.from_spec(value, is_positive=is_positive)
except ValueError:
return default
[docs]
def get_list(value):
"""
Resolve the given value into a list.
Parameters
----------
value : list or thing
Returns
-------
list
"""
if value is None:
return []
if isinstance(value, list):
return value
return [value]
[docs]
def get_string_list(value, delimiter=',', default=None):
"""
Return a list of strings from the given single string.
Parameters
----------
value : str
The string to convert into a list.
delimiter : str, optional
The string delimiter used to separate one element from the next.
default : list, optional
The result to return if the value passed in is `None`.
Returns
-------
list (str)
"""
if value is None:
return default
if isinstance(value, str):
value = [s.strip() for s in re.split(delimiter, value)]
return value
[docs]
def get_float_list(value, delimiter=',', default=None):
"""
Return a list of floats from a single string.
Parameters
----------
value : str
The string to convert into a list of floats.
delimiter : str, optional
The string delimiter used to separate one element from the next.
default : list, optional
The result to return if the value passed in is `None`.
Returns
-------
list (float)
"""
value = get_string_list(value, delimiter=delimiter, default=None)
if value is None:
return default
return [get_float(x) for x in value]
[docs]
def get_int_list(value, delimiter=',', default=None, is_positive=True):
"""
Return a list of ints from a single string.
Parameters
----------
value : str
The string to convert into a list of integers.
delimiter : str, optional
The string delimiter used to separate one element from the next.
default : list, optional
The result to return if the value passed in is `None`.
is_positive : bool, optional
If `True`, ranges may be specified using both ':' and '-' characters in
a string. Otherwise, the '-' character will imply a negative value.
Returns
-------
list (int)
"""
values = get_string_list(value, delimiter=delimiter, default=None)
if values is None:
return default
return to_int_list(values, is_positive=is_positive)
[docs]
def to_int_list(values, is_positive=True):
"""
Convert all elements in a list to integers.
String representations of integers may also be included as ranges.
The default behaviour is to treat the '-' character as range rather than
a minus sign. Ranges should generally be specified using the ':'
character. To allow negative values, set `is_positive=False`.
For example
>>> print(to_int_list(['1-3']))
[1, 2, 3]
>>> print(to_int_list(['-1:3'], is_positive=False))
[-1, 0, 1, 2, 3]
If is_positive is not set, the meaning of '-' is ambiguous and can result
in errors.
Parameters
----------
values : list
is_positive : bool, optional
If `True`, ranges may be specified using both ':' and '-' characters in
a string. Otherwise, the '-' character will imply a negative value.
Returns
-------
list (int)
"""
result = []
for value in values:
if isinstance(value, str):
splitter = r'[-:]' if is_positive else r'[:]'
if re.search(splitter, value) is not None:
start, stop = re.split(splitter, value)
result.extend(list(np.arange(int(start), int(stop) + 1)))
else:
result.append(int(value))
else:
result.append(int(value))
return result
[docs]
def parse_time(hms_string, angle=False):
"""
Returns H:M:S string as time Quantity.
Parameters
----------
hms_string : string or float
angle : bool, optional
If `True`, return an hour angle unit instead of hour unit.
Returns
-------
time : units.Quantity
An hour if angle is `False`, otherwise an hour angle.
"""
hms = re.split(r'[\t\n\r:hmsHMS]', str(hms_string))
hms = [x.strip() for x in hms]
hms = [x for x in hms if x != '']
unit = units.Unit('hourangle') if angle else units.Unit('hour')
if len(hms) != 3:
raise ValueError(f"Cannot parse {hms_string} as an hh:mm:ss string.")
try:
hms = [float(x) for x in hms]
except (ValueError, TypeError):
raise ValueError(f"Cannot parse {hms_string} as an hh:mm:ss string.")
return ((hms[0]) + (hms[1] / 60) + (hms[2] / 3600)) * unit
[docs]
def parse_angle(dms):
"""
Convert a degree:minute:second string into an angle.
Parameters
----------
dms : str
Returns
-------
angle : astropy.units.Quantity
"""
dms = dms.strip()
negative = dms.startswith('-')
if negative:
dms = dms[1:]
regex = r'[- \t\n\r:dmsDMS]'
s = re.split(regex, dms)
s = [x for x in s if s != '']
try:
angle = int(s[0]) + (int(s[1]) / 60) + (float(s[2]) / 3600)
except (ValueError, TypeError):
raise ValueError(f"Cannot parse {dms} as a dd:mm:ss string.")
if negative:
angle *= -1
return angle * units.Unit('deg')
[docs]
def get_dms_angle(value, default=np.nan):
"""
Return a degree angle quantity from a given value.
The provided value may be a Quantity, float, int, or string value. If a
string value is provided, it will be parsed according to
:func:`parse_angle`.
Parameters
----------
value : int or float or str or units.Quantity
default : int or float or units.Quantity, optional
The default angle to return in cases where the `value` cannot be
parsed correctly
Returns
-------
angle : units.Quantity
The resolved angle in degrees.
"""
degree = units.Unit('deg')
if not isinstance(default, units.Quantity):
try:
default = float(default) * degree
except (ValueError, TypeError):
raise ValueError(f"Cannot convert default ({default}) to degrees.")
else:
try:
default = default.to(degree)
except units.UnitConversionError:
raise ValueError(f"Cannot convert default ({default} to degrees.")
if isinstance(value, units.Quantity):
try:
return value.to(degree)
except units.UnitConversionError:
log.warning(f"Cannot convert {value.unit} to {degree}")
return default
if value is None:
return default
try:
value = float(value)
if np.isclose(value, UNKNOWN_FLOAT_VALUE, equal_nan=True):
return default
value *= degree
except (ValueError, TypeError):
try:
value = parse_angle(str(value))
except (ValueError, TypeError):
log.warning(f"Attempting to parse {value} as a dd:mm:ss string.")
return default
return value
[docs]
def get_hms_time(value, angle=False, default=np.nan):
"""
Return a time or hour angle quantity from a given value.
The provided value may be a Quantity, float, int, or string value. If a
string value is provided, it will be parsed according to
:func:`parse_angle`.
Parameters
----------
value : int or float or str or units.Quantity
angle : bool, optional
If `True`, return an hour angle unit instead of hour unit.
default : int or float or units.Quantity, optional
The default angle to return in cases where the `value` cannot be
parsed correctly
Returns
-------
time : units.Quantity
The resolved time in hours, or as an hour angle.
"""
unit = units.Unit('hourangle') if angle else units.Unit('hour')
if not isinstance(default, units.Quantity):
try:
default = float(default) * unit
except (ValueError, TypeError):
raise ValueError(f"Cannot convert default ({default}) to {unit}.")
else:
try:
default = default.to(unit)
except units.UnitConversionError:
raise ValueError(f"Cannot convert default ({default} to {unit}.")
if isinstance(value, units.Quantity):
try:
return value.to(unit)
except units.UnitConversionError:
log.warning(f"Cannot convert {value.unit} to {unit}")
return default
if value is None:
return default
try:
value = float(value)
if np.isclose(value, UNKNOWN_FLOAT_VALUE, equal_nan=True):
return default
value *= unit
except (ValueError, TypeError):
try:
value = parse_time(str(value), angle=angle)
except (ValueError, TypeError):
log.warning(f"Attempting to parse {value} as an hh:mm:ss string.")
return default
return value
[docs]
def get_sign(value, default=0):
"""
Return an integer representation of a sign.
Parameters
----------
value : int or float or str
The value to parse.
default : int, optional
The default sign to return.
Returns
-------
sign : int
1 for a positive sign, -1 for a negative sign, and 0 for no sign.
"""
if isinstance(value, (int, float)):
if value == 0 or np.isnan(value):
return 0
return -1 if value < 0 else 1
if value is None:
return default
str_value = str(value).strip().lower()
if str_value in ['+', 'pos', 'positive', 'plus']:
return 1
elif str_value in ['-', 'neg', 'negative', 'minus']:
return -1
elif str_value in ['*', 'any']:
return 0
else:
try:
value = float(value)
if value < 0:
return -1
elif value > 0:
return 1
else:
return 0
except ValueError:
return default
[docs]
def get_epoch(equinox):
"""
Return an astropy frame representing the equinox of an observation.
An FK4 frame will be returned in instances where `equinox` is a string and
the first letter is 'B' (B1950 for example), or the year is less than 1984.
In other instances a Julian FK5 frame will be returned.
Parameters
----------
equinox : BaseCoordinateFrame or Time or float or int or str
The equinox of the observation
Returns
-------
epoch : BaseCoordinateFrame
Typically and FK5 or FK5 frame unless another BaseCoordinateFrame was
passed in as `equinox`.
"""
if isinstance(equinox, BaseCoordinateFrame):
return equinox
elif isinstance(equinox, Time):
if equinox.datetime.year < 1984:
return FK4(equinox=equinox)
else:
return FK5(equinox=equinox)
equinox = str(equinox).strip().upper()
if equinox[0].isalpha():
prefix = equinox[0]
year = float(equinox[1:])
else:
year = float(equinox)
prefix = 'B' if year < 1984 else 'J'
equinox = prefix + str(year)
frame = FK4 if prefix == 'B' else FK5
return frame(equinox=Time(equinox))
[docs]
def valid_value(value):
"""
Return if a given value is valid in the context of unknown values.
The unknown values are `utils.UNKNOWN_INT_VALUE`,
`utils.UNKNOWN_STRING_VALUE`, `utils.UNKNOWN_FLOAT_VALUE`, and NaN.
Parameters
----------
value : bool or int or str or float
The value to check against the list of UNKNOWN values.
Returns
-------
valid : bool
`False` if the value and type matches the corresponding unknown
value or is not a bool, int, str, or float. NaNs also return `False`.
All other values will return `True`.
"""
if isinstance(value, bool):
return True
elif isinstance(value, int):
return value != UNKNOWN_INT_VALUE
elif isinstance(value, str):
return value != UNKNOWN_STRING_VALUE
elif isinstance(value, float):
return (value != UNKNOWN_FLOAT_VALUE) and not np.isnan(value)
else:
return False
[docs]
def robust_mean(array, tails=None):
"""
Performs a mean calculation with optional tails.
NaNs are automatically excluded from the mean calculation. The `tails`
parameter should be between 0 and 0.5, and will exclude
`tails` * `array.size` from both the beginning and end of the sorted
array values.
Parameters
----------
array : numpy.ndarray or Quantity
tails : float, optional
The fraction of tails to discount from the mean calculation. The
default (`None`), does not exclude any tails and considers the entire
population.
Returns
-------
mean : float or Quantity
The mean of the array.
"""
if isinstance(array, units.Quantity):
u = array.unit
else:
u = None
if tails is None:
result = bn.nanmean(array)
return result if u is None else result * u
array = np.asarray(array)
dn = round_values(array.size * tails)
result = bn.nanmean(np.sort(array)[dn:-dn])
return result if u is None else result * u
[docs]
def robust_sigma_clip_mask(values, weights=None, mask=None, sigma=5.0,
verbose=False, max_iterations=5):
"""
Return a masking array indicating sigma clipped elements of an input array.
Iteratively identifies outliers in a given input array based on the number
of standard deviations (`sigma`) of each datum from the median value of
the given `values`.
On each iteration, values that are more than `sigma` * std(`values`) away
from the median are ignored on subsequent iterations in which new medians
and standard deviations are calculated.
Note that while any multi-dimensional arrays may be provided, the
calculated median and standard deviation on each iteration will apply
to the entirety of the data set.
The iterations will desist once `max_iterations` iterations have occurred,
no additional elements are identified as outliers on an iteration, or all
elements have been masked.
Parameters
----------
values : units.Quantity or numpy.ndarray (float)
The values for which to calculated the sigma clipping masked array.
NaN values will be ignored in all calculations and appear as `False` in
the output mask array.
weights : units.Quantity or numpy.ndarray (float), optional
Optional weighting for the values. These should typically represent
the inverse variance of the provided data. If provided, the calculated
median will be weighted accordingly using
:func:`numba_functions.smart_median`, as will the derived standard
deviation.
mask : numpy.ndarray (bool), optional
An optional starting mask where `False` indicates an invalid or clipped
value. Will not be updated in-place.
sigma : float, optional
The number of standard deviations away from the median that will result
in a datum being clipped.
verbose : bool, optional
If `True`, will output log messages indicating the total number of
clipped values, the median, and the standard deviation on each
iteration.
max_iterations : int, optional
The maximum number of iterations that will be used to derive the
masking array.
Returns
-------
mask : np.ndarray (bool)
The sigma clipped masking array where `False` indicates a clipped value
and `True` indicates a valid value.
"""
if isinstance(values, units.Quantity):
unit = values.unit
values = values.value
else:
unit = None
if isinstance(weights, units.Quantity):
weights = weights.value
if mask is None:
mask = np.isfinite(values)
else:
mask = mask & np.isfinite(values)
for _ in range(max_iterations):
if verbose:
log.info(f"Valid values: {mask.sum()}")
current_values = values[mask]
current_weights = None if weights is None else weights[mask]
median_value = numba_functions.smart_median(
current_values, weights=current_weights)[0]
standard_deviation = np.sqrt(
np.cov(current_values, aweights=current_weights))
limit = standard_deviation * sigma
if verbose:
line = f"Median: {median_value} +/- {standard_deviation}"
if unit is not None:
line += f" {unit}"
log.info(line)
valid = np.abs(current_values - median_value) <= limit
if valid.all():
break
mask[mask] = valid
# This is probably redundant and can't be tested easily
if not valid.any(): # pragma: no cover
break
return mask
[docs]
def roundup_ratio(a, b):
"""
Returns int((a + b - 1) / b).
Parameters
----------
a : int or float or numpy.ndarray
b : int or float or numpy.ndarray
Returns
-------
ratio : int
The roundup ratio.
"""
if hasattr(a, '__len__') or hasattr(b, '__len__'):
a = np.asarray(a)
b = np.asarray(b)
return ((a + b - 1) / b).astype(int)
return numba_functions.roundup_ratio(a, b)
[docs]
def rotate(positions, angle):
"""
Rotate 2 dimensional coordinates by a given angle.
Parameters
----------
positions : numpy.ndarray
x, y positions of shape (N, 2)
angle : float or astropy.units.Quantity or numpy.ndarray (float)
The rotation angle (anti-clockwise) in radians (float) or as an
astropy unit. If supplied as an array, should be of shape (N,).
Returns
-------
numpy.ndarray
Rotated x, y positions
"""
sin_a = np.sin(angle)
cos_a = np.cos(angle)
result = np.empty_like(positions)
result[:, 0] = (positions[:, 0] * cos_a) - (positions[:, 1] * sin_a)
result[:, 1] = (positions[:, 0] * sin_a) + (positions[:, 1] * cos_a)
return result
[docs]
def log2round(x):
"""
Returns the rounded value of log_2(x).
Examples
--------
>>> print(log2round(1024))
10
>>> halfway = int(2 ** 9.5)
>>> print(log2round(halfway))
9
>>> print(log2round(halfway + 1))
10
Parameters
----------
x : int or float or numpy.ndarray
The input value(s).
Returns
-------
log2 : int or numpy.ndarray (int)
"""
return round_values(np.log2(x))
[docs]
def pow2round(x):
"""
Returns the value rounded to the closest 2^N integer.
Examples
--------
>>> print(pow2round(1023))
1024
>>> halfway = int(2 ** 9.5)
>>> print(pow2round(halfway))
512
>>> print(pow2round(halfway + 1))
1024
Parameters
----------
x : int or float or numpy.ndarray
The input value(s).
Returns
-------
result : int or numpy.ndarray (int)
"""
return 1 << log2round(x)
[docs]
def pow2floor(x):
"""
Returns the value floored to the closest 2^N integer.
Examples
--------
>>> print(pow2floor(1023))
512
>>> print(pow2floor(1024))
1024
>>> print(pow2floor(1025))
1024
Parameters
----------
x : int or float or numpy.ndarray
The input value(s).
Returns
-------
result : int or numpy.ndarray (int)
"""
l2 = np.log2(x)
if isinstance(l2, np.ndarray):
return 2 ** l2.astype(int)
return 2 ** int(np.log2(x))
[docs]
def pow2ceil(x):
"""
Returns the value ceiled to the closest 2^N integer.
Examples
--------
>>> print(pow2ceil(1023))
1024
>>> print(pow2ceil(1024))
1024
>>> print(pow2ceil(1025))
2048
Parameters
----------
x : int or float or numpy.ndarray
The input value(s).
Returns
-------
result : int or numpy.ndarray (int)
"""
value = pow2floor(x)
if not isinstance(value, np.ndarray):
return value if value == x else value * 2
value[value != np.asarray(x)] *= 2
return value
[docs]
def skycoord_insert_blanks(coordinates, insert_indices):
"""
Insert zeroed coordinates in an astropy coordinate frame.
Parameters
----------
coordinates : astropy.coordinates.BaseCoordinateFrame or SkyCoord
insert_indices : numpy.ndarray (float)
The indices at which to insert zero values.
Returns
-------
astropy.coordinates.BaseCoordinateFrame
"""
n_insert = insert_indices.size
new = astro_concat((coordinates, coordinates))
new.cache.clear()
new.data.lat[coordinates.size:] *= 0
new.data.lon[coordinates.size:] *= 0
old_indices = np.arange(coordinates.size)
good_indices = old_indices.copy()
for index in insert_indices:
good_indices[index:] += 1
bad_indices = np.arange(n_insert) + insert_indices
full_indices = np.zeros(coordinates.size + n_insert, dtype=int)
full_indices[good_indices] = old_indices
full_indices[bad_indices] = coordinates.size + np.arange(n_insert)
return new[full_indices].copy()
[docs]
def dict_intersection(options1, options2, initialized=False):
"""
Return the intersection of two dictionaries.
Parameters
----------
options1 : dict
options2 : dict
initialized : bool, optional
A switch for use on recursive calls. Don't mess.
Returns
-------
intersection : dict
"""
if initialized:
result = dict()
else:
result = options1.__class__()
for k in set(options1.keys()) & set(options2.keys()):
value1 = options1[k]
value2 = options2[k]
if isinstance(value1, dict) and isinstance(value2, dict):
result[k] = dict_intersection(value1, value2, initialized=True)
elif value1 == value2:
result[k] = value1
return result
[docs]
def dict_difference(options1, options2, initialized=False):
"""
Return the difference of two dictionaries.
Parameters
----------
options1 : dict
options2 : dict
initialized : bool, optional
A switch for use on recursive calls. Don't mess.
Returns
-------
dict
"""
if initialized:
result = dict()
else:
result = options1.__class__()
for key, value in options1.items():
if key not in options2:
result[key] = deepcopy(value)
continue
other_value = options2[key]
if isinstance(value, dict) and isinstance(other_value, dict):
diff = dict_difference(value, other_value, initialized=True)
if len(diff) > 0:
result[key] = diff
continue
if value != other_value:
result[key] = value
return result
[docs]
def combine_beams(beam1, beam2, deconvolve=False):
"""
Convolve or deconvolve one beam by another.
Parameters
----------
beam1 : beams.gaussian_2d.Gaussian2D or Gaussian2D
The original beam to modify.
beam2 : Gaussian2D or None or beams.gaussian_2d.Gaussian2D
The beam to modify `beam1` with.
deconvolve : bool, optional
If `True`, indicates a deconvolution rather than a convolution.
Returns
-------
beams.gaussian_2d.Gaussian2D or Gaussian2D
The combined beam.
"""
if beam2 is None:
return beam1.copy()
a2x = beam1.x_stddev ** 2
a2y = beam1.y_stddev ** 2
b2x = beam2.x_stddev ** 2
b2y = beam2.y_stddev ** 2
direction = -1 if deconvolve else 1
a = a2x - a2y
b = b2x - b2y
rad = units.Unit('radian')
theta1 = beam1.theta
theta2 = beam2.theta
if not isinstance(theta1, units.Quantity):
theta1 = theta1 * rad
if not isinstance(theta2, units.Quantity):
theta2 = theta2 * rad
delta = theta2 - theta1
delta_unit = delta.unit
delta = ((delta.to(rad).value % np.pi) * rad).to(delta_unit)
c2 = (a ** 2) + (b ** 2) + (2 * direction * a * b * np.cos(delta))
if hasattr(beam1.x_stddev, 'unit') and beam1.x_stddev.unit is not None:
unit = beam1.x_stddev.unit
else:
unit = None
new = beam1.copy()
if c2 < 0: # pragma: no cover
if unit is None:
new.x_stddev = 0.0
new.y_stddev = 0.0
else:
new.x_stddev = 0.0 * unit
new.y_stddev = 0.0 * unit
return new
c = np.sqrt(c2)
b2 = a2x + a2y + (direction * (b2x + b2y))
x2 = 0.5 * (b2 + c)
y2 = 0.5 * (b2 - c)
if x2 < 0: # pragma: no cover
x2 = 0.0 if unit is None else 0.0 * (unit ** 2) # pragma: no cover
if y2 < 0:
y2 = 0.0 if unit is None else 0.0 * (unit ** 2)
x_stddev = np.sqrt(x2)
y_stddev = np.sqrt(y2)
if unit is not None:
x_stddev = x_stddev.to(unit)
y_stddev = y_stddev.to(unit)
if c == 0 or np.isnan(c):
theta = 0.0 * delta_unit
else:
sin_beta = direction * np.sin(delta).value * b / c
if isinstance(sin_beta, units.Quantity):
sin_beta = sin_beta.decompose().value
theta = (theta1.value + (0.5 * np.arcsin(sin_beta))) * rad
theta = theta.to(delta_unit)
new.x_stddev = x_stddev
new.y_stddev = y_stddev
new.theta = theta
return new
[docs]
def convolve_beam(beam1, beam2):
"""
Convolve one beam by another.
Parameters
----------
beam1 : beams.gaussian_2d.Gaussian2D or Gaussian2D
The Gaussian beam to convolve.
beam2 : beams.gaussian_2d.Gaussian2D or Gaussian2D
The Gaussian beam to convolve `beam1` with.
Returns
-------
beam : beams.gaussian_2d.Gaussian2D or Gaussian2D
"""
return combine_beams(beam1, beam2, deconvolve=False)
[docs]
def deconvolve_beam(beam1, beam2):
"""
Deconvolve one beam by another.
Parameters
----------
beam1 : beams.gaussian_2d.Gaussian2D or Gaussian2D
The Gaussian beam to deconvolve.
beam2 : beams.gaussian_2d.Gaussian2D or Gaussian2D
The Gaussian beam to deconvolve `beam1` by.
Returns
-------
beam : beams.gaussian_2d.Gaussian2D or Gaussian2D
"""
return combine_beams(beam1, beam2, deconvolve=True)
[docs]
def encompass_beam(beam1, beam2):
"""
Encompass one beam by another.
Parameters
----------
beam1 : beams.gaussian_2d.Gaussian2D or Gaussian2D
The Gaussian beam to encompass.
beam2 : beams.gaussian_2d.Gaussian2D or Gaussian2D
The Gaussian beam to encompass `beam1` with.
Returns
-------
beam : beams.gaussian_2d.Gaussian2D or Gaussian2D
"""
delta_angle = beam2.theta - beam1.theta
cos_a = np.cos(delta_angle)
sin_a = np.sin(delta_angle)
if hasattr(beam1.x_stddev, 'unit'):
base_unit = beam1.x_stddev.unit
else:
base_unit = None
min_x = np.hypot(beam2.x_stddev * cos_a, beam2.y_stddev * sin_a)
min_y = np.hypot(beam2.x_stddev * sin_a, beam2.y_stddev * cos_a)
if base_unit is not None:
if base_unit != min_x.unit:
min_x = min_x.decompose().to(base_unit)
if base_unit != min_y.unit:
min_y = min_y.decompose().to(base_unit)
beam = beam1.copy()
if beam1.x_stddev < min_x:
beam.x_stddev = min_x
if beam1.y_stddev < min_y:
beam.y_stddev = min_y
return beam
[docs]
def encompass_beam_fwhm(beam, fwhm):
"""
Encompass one beam by a given fwhm.
Parameters
----------
beam : beams.gaussian_2d.Gaussian2D or Gaussian2D or None
The Gaussian beam to encompass.
fwhm : float or units.Quantity
The full-width-half-max to encompass `beam1` with.
Returns
-------
beams.gaussian_2d.Gaussian2D or Gaussian2D
"""
stddev = gaussian_fwhm_to_sigma * fwhm
if beam is None:
return Gaussian2D(x_stddev=stddev, y_stddev=stddev)
encompassed_beam = beam.copy()
if beam.x_stddev < stddev:
encompassed_beam.x_stddev = stddev
if beam.y_stddev < stddev:
encompassed_beam.y_stddev = stddev
return encompassed_beam
[docs]
def get_beam_area(psf):
"""
Return the area of a 2-dimensional Gaussian model.
Parameters
----------
psf : beams.gaussian_2d.Gaussian2D or Gaussian2D or None
Returns
-------
area : float or units.Quantity
"""
if psf is None:
return 0.0
if hasattr(psf.x_stddev, 'unit'):
unit = psf.x_stddev.unit
else:
unit = None
area = 2.0 * np.pi * psf.x_stddev * psf.y_stddev
if unit is not None:
area = area.to(unit ** 2)
return area
[docs]
def ascii_file_to_frame_data(ascii_file):
"""
Convert time stream data from a ascii file to a numpy array.
Parameters
----------
ascii_file : str
The filename for the ascii file. Output from SOFSCAN via the
"write.ascii" configuration option.
Returns
-------
frame_data : numpy.ndarray (float)
The frame data of shape (n_frames, n_channels).
"""
with open(ascii_file, 'r') as f:
contents = f.readlines()
contents = [
c.strip().replace('---', 'NaN').replace('\t\t', '\t').split('\t')
for c in contents[1:]]
n_frames = len(contents)
n_channels = len(contents[0])
frame_data = np.empty((n_frames, n_channels), dtype=float)
for frame in range(n_frames):
frame_data[frame] = contents[frame]
return frame_data
[docs]
def clear_numba_cache(): # pragma: no cover
"""
Aggressively remove the numba cache for the sofia_scan package.
This is necessary when numba does not pick up changes in currently cached
functions.
Returns
-------
None
"""
toolkit_clear_numba_cache(module=sofia_scan_module)
[docs]
def round_values(x):
"""
Round any given value to the expected result.
The function :func:`np.round` does not round values as one might expect.
If int(x) is equal to an odd number, #.5 numbers will be rounded down
rather than up (as expected). This function attempts to fix that.
Parameters
----------
x : int or float or numpy.ndarray or units.Quantity
The value to round.
Returns
-------
rounded : int or numpy.ndarray (int) or units.Quantity
"""
if isinstance(x, int):
return x
if isinstance(x, float):
return numba_functions.round_value(x)
is_quantity = isinstance(x, units.Quantity)
if not is_quantity:
x = np.asarray(x)
if x.dtype == int:
return x
if is_quantity:
unit, values = x.unit, x.value
else:
unit, values = None, x
if values.shape == ():
result = np.asarray(numba_functions.round_value(float(values)))
else:
result = numba_functions.round_array(values)
if unit is None:
return result
return result * unit
def calculate_position_angle(lon1, lat1, lon2, lat2):
"""
Position Angle (East of North) between two points on a sphere.
Parameters
----------
lon1 : float or units.Quantity
The longitude of position 1.
lat1: float or units.Quantity
The latitude of position 1.
lon2 : float or units.Quantity
The longitude of position 2.
lat2 : float or units.Quantity
The latitude of position 2.
Returns
-------
pa : units.Quantity
The (positive) position angle of the vector pointing from position
1 to position 2. If any of the angles are arrays, this will contain
an array following the appropriate `numpy` broadcasting rules.
"""
delta_lon = lon2 - lon1
cos_lat = np.cos(lat2)
x = (np.sin(lat2) * np.cos(lat1)
- cos_lat * np.sin(lat1) * np.cos(delta_lon))
y = np.sin(delta_lon) * cos_lat
pa = Angle(np.arctan2(y, x), units.Unit('radian'))
pa = pa.wrap_at(360 * units.Unit('degree')).to('degree').value
return pa * units.Unit('degree')
[docs]
def safe_sidereal_time(t, kind, longitude=None, model=None):
"""
Calculate sidereal time, even if offline.
This is a wrapper around :func:`Time.sidereal_time` in cases where the IERS
table cannot be downloaded.
Parameters
----------
t : Time
The time for which to calculate the sidereal time.
kind : str
May be one of {'mean', 'apparent'} i.e., accounting for precession
only, or also for nutation.
longitude : units.Quantity or float or numpy.ndarray, optional
The longitude on the Earth at which to compute the Earth rotation
angle. If a float is provided, it should be in degrees.
model : str, optional
The precession model to use.
Returns
-------
longitude : units.Quantity
The local sidereal time in hourangle units.
"""
try:
return t.sidereal_time(kind, longitude=longitude, model=model)
except Exception as err:
log.warning(f"There was an error calculating sidereal time: {err}")
from astropy.utils.iers import iers
with iers.conf.set_temp('auto_max_age', None): # An offline option
return t.sidereal_time(kind, longitude=longitude, model=model)