Source code for sofia_redux.instruments.fifi_ls.pointing_discard
import os
from astropy.time import Time
import numpy as np
import pandas as pd
[docs]
def get_timing(primehead):
'''
Set up the pointing-discard-feature by calculating the duration of one ramp
and grating position. The duration of the ramp and grating position is
calculated by extracting the corresponding keywords from the FITS header of
the currently loaded FITS file.
Parameters
----------
primehead : astropy.io.fits.Header
Primary header of the loaded FITS file.
Returns
-------
dt_ramp, dt_grating : float, float
Duration in seconds of one ramp and one grating position.
'''
# Calculate the number of ramps per grating position
ramps_per_grating = 2 * primehead['C_CHOPLN'] \
/ primehead['RAMPLN_{}'.format(primehead['CHANNEL'][0])] \
* primehead['C_CYC_{}'.format(primehead['CHANNEL'][0])]
# Calculate the time interval in seconds of one ramp
dt_ramp = primehead['RAMPLN_{}'.format(primehead['CHANNEL'][0])] / 250
# Calculate the time interval in seconds of one grating position
dt_grating = ramps_per_grating * dt_ramp
return dt_ramp, dt_grating
[docs]
def load_pointing_data(primehead, pointing_directory=None):
'''
Load the file listing the pointing errors. The file must be in the “.tsv”
format and named as the mission ID, which is extracted from the FITS header.
The default path to the pointing error file is “./data/pointing_error/” but
this can be adjusted with the “pointing_directory” parameter.
Parameters
----------
primehead : astropy.io.fits.Header
Primary header of the loaded FITS file.
pointing_directory : str, optional
Path to the directory containing the pointing error files.
Returns
-------
df_pointing_error : pandas.DataFrame
Dataframe listing the pointing errors at a given time.
'''
# Load pointing error information
if pointing_directory is None:
pointing_directory = os.path.join(os.path.dirname(__file__), 'data',
'pointing_error')
pointing_err_file = os.path.join(pointing_directory,
'{}.tsv'.format(primehead['missn-id']))
df_pointing_error = pd.read_csv(pointing_err_file,
sep='\t', index_col='timestamp')
return df_pointing_error
[docs]
def get_pointing_mask(primehead, pointing_threshold, grating_idx, dt_ramp,
dt_grating, df_pointing_error):
'''
Create a boolean mask to set ramp values to NaN if the pointing at the ramp
time is below the defined threshold.
Parameters
----------
primehead : astropy.io.fits.Header
Primary header of the loaded FITS file.
pointing_threshold : astropy.io.fits.Header
Absolute threshold of the pointing error in arcsec. The slopes of ramps
with a pointing error greater than the define threshold are set to NaN.
grating_idx : int
Index of the grating position.
dt_ramp : float
Duration of one ramp in seconds.
dt_grating : float
Duration of one grating position in seconds.
df_pointing_error : pandas.DataFrame
Dataframe listing the pointing errors at a given time.
Returns
-------
pointing_mask : pandas.Series
Boolean mask used to discard ramps with bad pointing.
'''
# Calculate timestamp of each ramp in the current grating position
ramp_times = np.arange(round(dt_grating/dt_ramp))*dt_ramp + dt_ramp/2 \
+ grating_idx * dt_grating + Time(primehead['DATE-OBS']).unix
# Calculate the combined pointing error of RA and Dec
df_pointing_error['error_combined'] = pd.Series(
np.sqrt(df_pointing_error.error_RA**2
+ df_pointing_error.error_Dec**2))
# Calculate the mean ramp time for each chop cycle
n_ramps_chop_cyc = int(2 * primehead['C_CHOPLN'] \
/ primehead['RAMPLN_{}'.format(primehead['CHANNEL'][0])])
chop_cycle_times = np.mean(ramp_times.reshape(-1,n_ramps_chop_cyc), axis=1)
# Create an empty dataframe with the calculated chop cycle times.
# The pointing error columns are filled with NaNs.
df_chop_cycle_times = pd.DataFrame(index=chop_cycle_times,
columns=df_pointing_error.columns,
).astype(df_pointing_error.dtypes)
# Merge the new empty dataframe with the loaded pointing error dataframe
df_merged = pd.concat([df_pointing_error, df_chop_cycle_times])
# Interpolate the pointing error at the mean chop cycle times
df_merged = df_merged.sort_index().interpolate('index', limit_area='inside')
# Remove all rows but the rows listing the chop cycle times
df_chop_cycle_pointing_errors = df_merged.loc[chop_cycle_times]
# Create boolean pointing mask
pointing_mask = df_chop_cycle_pointing_errors.error_combined \
< pointing_threshold
# Repeat the pointing mask to adjust shape
pointing_mask = np.repeat(pointing_mask, 2)
return pointing_mask