# Licensed under a 3-clause BSD style license - see LICENSE.rst
from datetime import datetime
import os
from astropy import log
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
import numpy as np
from sofia_redux.instruments.fifi_ls.make_header import make_header
from sofia_redux.toolkit.utilities \
import (hdinsert, write_hdul, gethdul, multitask, valid_num)
__all__ = ['get_channel', 'get_split_params', 'trim_data',
'name_output_files', 'separate_chops',
'split_grating_and_chop', 'wrap_split_grating_and_chop']
def get_channel(hdul, channel_index=3):
    """
    Returns RED or BLUE channel extracted from header and data.

    If the channel cannot be read from the prime header, it is read
    from the data header.

    Parameters
    ----------
    hdul : fits.HDUList
    channel_index : int, optional
        data header location determining channel

    Returns
    -------
    str
        RED, BLUE, or UNKNOWN
    """
    value = str(hdul[0].header.get('CHANNEL', 'UNKNOWN')).strip().upper()

    if valid_num(value):
        # numeric convention: '1' -> BLUE, '0' -> RED;
        # any other number is not recognized
        if value == '1':
            return 'BLUE'
        if value == '0':
            return 'RED'
        return 'UNKNOWN'

    if value == 'UNKNOWN':
        return 'UNKNOWN'

    if value in ('BLUE', 'RED'):
        return value

    # unrecognized string: fall back to the data header word --
    # bit 1 set indicates the BLUE channel
    is_blue = (hdul[1].data['HEADER'][0][channel_index] & 2) // 2 == 1
    return 'BLUE' if is_blue else 'RED'
def get_split_params(hdul, channel_index=3, sample_index=4,
                     ramp_index=5):
    """
    Check if a file can be split by basic header checks. If so,
    return the necessary parameters for splitting the file.

    Parameters
    ----------
    hdul : fits.HDUList
    channel_index : int, optional
        data header index indicating channel
    sample_index : int, optional
        data header index indicating number of samples in data
    ramp_index : int, optional
        data header index indicating number of ramps in data

    Returns
    -------
    dict
        G_STRT (int) -> grating/inductosyn start
        G_PSUP (int) -> number of grating positions going up
        G_SZUP (int) -> step size on the way up
        G_PSDN (int) -> number of grating positions coming down
        G_SZDN (int) -> step size on the way down
        G_CYC (int) -> number of grating sweeps (up-down)
        C_CYC (int) -> number of chop cycles per grating position
        P_STRT (int) -> primary array start
        P_PSUP (int) -> primary array number of positions going up
        P_SZUP (int) -> primary array step size on the way up
        RAMPLN (int) -> ramp length
        CHOPLN (int) -> chop length
        C_AMP (float) -> chop amplitude
        CHANNEL (str) -> channel 'RED' or 'BLUE'
        success (bool) -> True indicates all parameters are valid
    """
    x = {'success': True}
    header = hdul[0].header
    chop_scheme = str(header.get('C_SCHEME', 'UNKNOWN')).strip()
    fname = header.get('FILENAME', 'UNKNOWN')
    if chop_scheme != '2POINT':
        log.error("Invalid chopper scheme for file %s" % fname)
        x['success'] = False
    try:
        # number of chop phases, from the leading digit ('2POINT' -> 2).
        # IndexError guards an empty C_SCHEME value.
        x['C_SCHEME'] = int(chop_scheme[0])
    except (ValueError, IndexError):
        x['success'] = False
        x['C_SCHEME'] = None

    x['channel_index'] = channel_index
    x['sample_index'] = sample_index
    x['ramp_index'] = ramp_index
    x['CHANNEL'] = get_channel(hdul, channel_index=x['channel_index'])
    if x['CHANNEL'] not in ['RED', 'BLUE']:
        log.error("Invalid spectral channel for file %s" % fname)
        x['success'] = False
    hdinsert(header, 'CHANNEL', x['CHANNEL'], comment='Detector channel')

    x['CHOPLN'] = header.get('C_CHOPLN')
    x['C_AMP'] = header.get('C_AMP', 0)
    # grating keywords carry a channel-color suffix in the raw header
    for key in ['G_STRT', 'G_PSUP', 'G_SZUP', 'G_PSDN',
                'G_SZDN', 'G_CYC', 'C_CYC', 'RAMPLN']:
        color_key = key + '_%s' % ('R' if x['CHANNEL'] == 'RED' else 'B')
        x[key] = header.get(color_key)

    x['PRIMARAY'] = header.get('PRIMARAY', 'UNKNOWN').upper().strip()
    suffix = 'R' if x['PRIMARAY'] == 'RED' else 'B'
    for key in ['STRT', 'PSUP', 'SZUP']:
        x['P_' + key] = header.get('G_%s_%s' % (key, suffix))

    # validate all numeric parameters, casting valid values to their
    # expected type; invalid values keep their original (unusable) form
    for key, value in x.items():
        if key in ['CHANNEL', 'PRIMARAY', 'success']:
            continue
        if not valid_num(value) or float(value) == -9999:
            log.error("Invalid %s for file %s" % (key, fname))
            x['success'] = False
        else:
            dtype = float if key == 'C_AMP' else int
            x[key] = dtype(value)

    rl, cl = x['RAMPLN'], x['CHOPLN']
    # ramp length and chop length must be integer multiples of each other.
    # Only run the relational checks when both lengths are usable positive
    # integers: previously a missing length crashed with an uncaught
    # TypeError at the 'rl > cl' comparison, and a zero length crashed
    # with ZeroDivisionError in the multiple test.
    if not isinstance(rl, int) or not isinstance(cl, int) \
            or rl <= 0 or cl <= 0:
        log.error("Bad choplength or ramplength")
        x['success'] = False
    else:
        lengths = sorted([rl, cl])
        if int(lengths[1] / lengths[0]) * lengths[0] != lengths[1]:
            log.error(
                "Choplength is not an integer multiple/quotient "
                "of ramplength")
            x['success'] = False
        if rl == cl:
            log.error("Unexpected 2POINT chop, case 1: "
                      "ramplength = choplength for file %s" % fname)
            x['success'] = False
        if (rl > cl) and (rl // cl) % 2 == 0:
            log.error("Unexpected 2POINT chop, case 2: "
                      "ramplength = 2*n*choplength where "
                      "n = 1, 2, 3... for file %s" % fname)
            x['success'] = False

    # if c_amp is zero, the chop length is irrelevant - set to rampln
    if x['C_AMP'] == 0:
        x['CHOPLN'] = x['RAMPLN']
    return x
def trim_data(hdul, params):
    """
    Remove partial ramps and "unpaired" chop plateaus.

    Note that 4POINT chop is untested

    Assumptions:
        1. file starts with chop0
        2. chop0 and a following chop1 (2POINT) / chop2 (4POINT)
           makes a pair (chop1 and chop3 makes another pair for
           4POINT chop scheme).
        3. no missing ramps except at beginning or end of file

    Parameters
    ----------
    hdul : fits.HDUList
    params : dict
        Split parameters, as returned from get_split_params.

    Returns
    -------
    fits.HDUList
        The input HDUList with extension 1 (and SCANPOS, if present)
        trimmed in place.
    """
    # get scan position data if present
    if 'SCANPOS' in hdul:
        posdata = hdul['SCANPOS'].data
    else:
        posdata = None

    # remove partial ramps
    data = hdul[1].data
    # first readout whose sample counter is 0, i.e. the start of the
    # first complete ramp (fall back to 0 if none is found)
    head = np.argwhere(data['header'][:, params['sample_index']] == 0)
    head = 0 if len(head) == 0 else head[0, 0]
    # if the file does not end exactly on the last sample of a ramp,
    # cut just before the start of the final (partial) ramp
    last_frame = data['header'][-1][params['sample_index']]
    tail = len(data)
    if last_frame != (params['RAMPLN'] - 1):
        last_ramp = data['header'][-1][params['ramp_index']]
        tail = np.argwhere(data['header'][:, params['ramp_index']]
                           >= last_ramp)
        # only consider indices after the trimmed head
        tail = tail[tail > head]
        tail = tail[0] if len(tail) != 0 else len(data)
    data = data[head:tail]
    if posdata is not None:
        # keep scan positions aligned with the trimmed readouts
        posdata = posdata[head:tail]

    # Remove unpaired chops
    # number of ramps per chop phase or number of ramps to co-add in fit_ramps
    n = params['CHOPLN'] // params['RAMPLN']
    if n > 0:
        # chop phase of each remaining readout, derived from its
        # ramp counter (0..C_SCHEME-1)
        index = data['header'][:, params['ramp_index']] // n
        chop_phase = index % params['C_SCHEME']
        # start at the first chop-0 plateau and end after the last
        # readout belonging to the highest chop phase
        head = np.argwhere(chop_phase == 0)
        head = head[0, 0] if len(head) != 0 else 0
        tail = np.argwhere(chop_phase == np.nanmax(chop_phase))
        tail = (tail[-1, 0] + 1) if len(tail) != 0 else len(chop_phase)
        data = data[head:tail]
        if posdata is not None:
            posdata = posdata[head:tail]

    # replace extension 1 (and SCANPOS) with the trimmed tables
    hdu1 = fits.BinTableHDU(data, header=hdul[1].header)
    hdul[1] = hdu1
    if posdata is not None:
        hdul['SCANPOS'] = fits.BinTableHDU(posdata, name='SCANPOS')
    return hdul
def name_output_files(hdul):
    """
    Names the split files.

    The name is built from the flight number (MISSN-ID), AOR-ID,
    detector channel, chop number, and raw file number (FILENUM), as:
    <FLIGHT>_FI_IFS_<AOR-ID>_<CHANNEL>_CP<n>_<FILENUM>.fits

    Parameters
    ----------
    hdul : fits.HDUList

    Returns
    -------
    2-tuple
        str : chop0 filename
        str : chop1 filename
    """
    header = hdul[0].header
    filenum = header.get('FILENUM', 'UNKNOWN')
    # wrap header values in str() before using string methods, for
    # consistency with get_channel: keyword values are not guaranteed
    # to be strings
    aorid = str(header.get('AOR_ID', 'UNKNOWN')).strip().replace('_', '')
    if aorid == '':
        aorid = 'UNKNOWN'
    # MISSN-ID is e.g. '2018-12-12_FI_F552'; the flight number follows
    # the final underscore, after a leading letter
    flight = str(header.get('MISSN-ID', 'UNKNOWN')).split('_')[-1][1:]
    if valid_num(flight):
        flight = 'F%04i' % int(flight)
    else:
        flight = 'UNKNOWN'
    channel = get_channel(hdul)
    if channel in ['RED', 'BLUE']:
        # truncate to 3 characters: 'RED' or 'BLU'
        channel = channel[:3]
    else:
        channel = 'UN'
    result = tuple('%s_FI_IFS_%s_%s_CP%i_%s.fits' %
                   (flight, aorid, channel, i, filenum) for i in range(2))
    return result
def separate_chops(hdul, params):
    """
    Separate data into different files, by chop index.

    Parameters
    ----------
    hdul : fits.HDUList
    params : dict
        Split parameters, as returned from get_split_params.

    Returns
    -------
    list of fits.HDUList
        One HDUList per chop phase present in the data, or None on
        failure.
    """
    # ramplength > choplength and the chopped ramplength == choplength
    # configurations are not supported
    if params['RAMPLN'] > params['CHOPLN']:
        log.error("Case where ramplength > choplength not accounted for")
        return
    elif params['RAMPLN'] == params['CHOPLN'] and params['C_AMP'] != 0:
        log.error("Case where ramplength >= choplength not accounted for")
        return

    # Make arrays of inductosyn positions
    pos = params['G_STRT'] + np.arange(params['G_PSUP']) * params['G_SZUP']
    # For reference in spatial offset calculations, later
    prime = params['P_STRT'] + np.arange(params['P_PSUP']) * params['P_SZUP']
    # This shouldn't happen, but just in case: if the number of primary
    # positions is somehow less than the number of secondary positions,
    # extend the array and copy the last primary position into all
    # remaining elements
    if len(prime) < len(pos):
        prime = np.concatenate(
            (prime, np.full(len(pos) - len(prime), prime[-1])))

    n = params['CHOPLN'] // params['RAMPLN']  # ramps per chop phase
    data = hdul[1].data
    readouts = len(data)
    log.debug("number of readouts = %i" % readouts)
    log.debug("ramplength = %i" % params['RAMPLN'])
    log.debug("chop amplitude = %f" % params['C_AMP'])
    # number of readouts per grating/chop bin
    binsize = readouts // (n * (params['G_PSUP'] + params['G_PSDN']))
    log.debug("grating and chop binsize = %i" % binsize)

    # get scan position data if present
    if 'SCANPOS' in hdul:
        posdata = hdul['SCANPOS'].data
    else:
        posdata = None

    outfiles = name_output_files(hdul)
    if params['C_AMP'] != 0:
        # chop phase (0 or 1) of each readout, from its ramp counter
        data_header = hdul[1].data['HEADER']
        chop = ((data_header[:, params['ramp_index']] // n).astype(int)) % 2
    else:
        # total power (no-chop) mode: all readouts belong to chop 0
        chop = np.zeros(len(data), dtype=int)
    nodpos = hdul[0].header.get('NODPOS', 0)
    fname = hdul[0].header.get('FILENAME', 'UNKNOWN')

    result = []
    for idx, outfile in enumerate(outfiles):
        if not (chop == idx).any():
            # no readouts for this chop phase: skip the output file
            continue
        hdu0 = hdul[0].copy()
        primehead = hdu0.header
        # readout indices belonging to this chop phase
        chop_idx = np.argwhere(chop == idx).ravel()
        primehead['HISTORY'] = '----------------------'
        primehead['HISTORY'] = 'Data reduction history'
        primehead['HISTORY'] = '----------------------'
        primehead['HISTORY'] = 'Chops split into separate filenames'
        primehead['HISTORY'] = 'Grating scans split into separate extensions'
        hdinsert(primehead, 'NGRATING', params['G_PSUP'],
                 'Number of grating positions')
        hdinsert(primehead, 'CHOPNUM', idx, 'Chop number')
        hdinsert(primehead, 'PRODTYPE', 'grating_chop_split')
        hdinsert(primehead, 'FILENAME', outfile)
        newhdul = fits.HDUList([hdu0])

        # one FLUX extension (plus optional SCANPOS table) per upward
        # grating position
        for i in range(params['G_PSUP']):
            try:
                ext = data[chop_idx[(i * binsize): (i + 1) * binsize]]
            except (IndexError, TypeError):
                log.error("Array data does not match header "
                          "data for file %s" % fname)
                return
            image_data = ext['DATA']
            image_hdr = fits.ImageHDU(image_data).header
            # NOTE(review): datetime.utcnow() is deprecated in recent
            # Python versions; consider datetime.now(timezone.utc),
            # after confirming astropy Time accepts aware datetimes
            t = Time(datetime.utcnow(), format='datetime').isot.split('.')[0]
            image_hdr['DATE'] = t, 'Creation UTC data of FITS header'
            image_hdr['INDPOS'] = pos[i], 'Inductosyn position '
            image_hdr['INDPOS_P'] = (
                prime[i], 'Prime array inductosyn position')
            image_hdr['CHOPNUM'] = idx
            image_hdr['NODPOS'] = nodpos
            image_hdr['BUNIT'] = ('adu', 'Data units')
            newext = fits.ImageHDU(
                image_data, name=f'FLUX_G{i}',
                header=image_hdr)
            newhdul.append(newext)
            if posdata is not None:
                posd = posdata[chop_idx[(i * binsize): (i + 1) * binsize]]
                newhdul.append(
                    fits.BinTableHDU(posd, name=f'SCANPOS_G{i}'))
        result.append(newhdul)
    return result
def _derive_positions(hdul, params):
    """
    Derive position data for OTF scans.

    Computes per-readout sky offsets (DLAM_MAP, DBET_MAP), UNIX times,
    and a scanning-status flag from the scan timing keywords, and
    attaches them to the HDUList as a 'SCANPOS' binary table.

    Parameters
    ----------
    hdul : fits.HDUList
        Input OTF A nod data.
    params : dict
        Split parameters, as returned from get_split_params.

    Returns
    -------
    fits.HDUList
        Updated HDUList with 'SCANPOS' binary table attached.

    Raises
    ------
    KeyError
        If a required header keyword (UNIXSTRT, OTFSTART, ALPHA,
        TRK_DRTN, EXPTIME, OBSLAMV, OBSBETV, DLAM_MAP, DBET_MAP)
        is missing.
    """
    header = hdul[0].header
    # sample size
    data = hdul[1].data['DATA']
    nreadout = data.shape[0]

    # time keys
    # start of frame 1
    unixstart = header['UNIXSTRT']
    # start of scan motion
    otfstart = header['OTFSTART']
    # data rate (sec / frame)
    alpha = header['ALPHA']
    # scan duration
    duration = header['TRK_DRTN']

    # first valid frame/ramp after motion starts
    frame1 = int(np.ceil((otfstart - unixstart) / alpha))
    # round down to the start of the containing ramp
    ramp_start = frame1 - frame1 % params['RAMPLN']

    # check start and end values.
    # If bad, warn but allow it
    if ramp_start < 0 or frame1 < 0:
        log.warning(f"Bad OTF keywords for file "
                    f"{header.get('FILENAME', 'UNKNOWN')}: calculated "
                    f"scan start {ramp_start} < 0.")
        log.warning('Check UNIXSTRT, OTFSTART.')
        log.warning('Setting scan start to start of readouts.')
        ramp_start = 0
        frame1 = 0

    # last valid frame/ramp after motion ends
    frame2 = int(np.floor(frame1 + duration / alpha))
    # round up to the last frame of the containing ramp
    ramp_end = frame2 + params['RAMPLN'] - frame2 % params['RAMPLN'] - 1
    if ramp_end >= nreadout:
        log.warning(f"Bad OTF keywords for file "
                    f"{header.get('FILENAME', 'UNKNOWN')}: calculated scan "
                    f"end {ramp_end} > {nreadout} readouts")
        log.warning('Check UNIXSTRT, OTFSTART, TRK_DRTN.')
        log.warning('Setting scan end to end of readouts.')
        ramp_end = nreadout - 1

    # flag useful range
    log.debug(f'Useful OTF range: readout {ramp_start} to {ramp_end} '
              f'out of {nreadout}')
    flag = np.full(nreadout, True)
    flag[:ramp_start] = False
    flag[ramp_end + 1:] = False

    # compute UNIX time for center of ramp
    ftime = np.full(nreadout, unixstart, dtype=float)
    ftime += np.arange(nreadout, dtype=float) * alpha + alpha / 2.0

    # update exptime in header, subtracting time for unusable data
    header['EXPTIME'] -= (~flag).sum() * alpha

    # scan speed in RA/Dec directions, in arcsec/sec
    obslamv = header['OBSLAMV']
    obsbetv = header['OBSBETV']
    # check for zero speed in either direction
    if np.allclose(obslamv, 0):
        obslamv = 0.0
    if np.allclose(obsbetv, 0):
        obsbetv = 0.0

    # base position
    dlam_base = header['DLAM_MAP']
    dbet_base = header['DBET_MAP']

    # update positions for motion: allow extrapolation
    # within the first and last valid ramps
    motion_index = np.arange(nreadout) - frame1
    dlam = dlam_base + obslamv * alpha * motion_index
    dbet = dbet_base + obsbetv * alpha * motion_index
    # set to assumed position beyond ramp ends
    dlam[:ramp_start] = dlam[ramp_start]
    dbet[:ramp_start] = dbet[ramp_start]
    dlam[ramp_end + 1:] = dlam[ramp_end]
    dbet[ramp_end + 1:] = dbet[ramp_end]

    # assemble the SCANPOS table and attach it
    positions = Table()
    positions['DLAM_MAP'] = dlam
    positions['DBET_MAP'] = dbet
    positions['FLAG'] = flag
    positions['UNIXTIME'] = ftime
    hdul.append(fits.BinTableHDU(positions, name='SCANPOS'))
    return hdul
def split_grating_and_chop(filename, write=False, outdir=None):
    """
    Split FIFI-LS raw data file into separate FITS files.

    Files are split based on the chop cycle, with each of the grating
    positions in a separate FITS extension in each file.

    The procedure is:

        1. Read a raw data FITS file
        2. (optional) Check header for compliance with SOFIA requirements.
           Abort if failure.
        3. Reorganize data according to chopper phase and inductosyn
           position.
        4. (optional) Write output files to disk

    FITS files written to disk will each contain data from a single
    chopper phase. The typical case is a 2-point chop, for which two
    FITS files are written. No-chop mode is also supported, in which
    case one file is written to disk. The zeroth extension in the output
    file contains the prime header only. There are n_scan image
    extensions, containing a header (includes the keyword INDPOS,
    indicating the inductosyn positions of the scan) and a data array,
    with dimensions [26, 18] x (readout frames).

    For A nods taken in OTF (scanning) mode, an additional binary table is
    attached to the file, containing the on-sky position for each readout
    sample. The table has columns DLAM_MAP, DBET_MAP, FLAG, and UNIXTIME,
    where DLAM_MAP and DBET_MAP indicate the RA and Dec offset from the
    base position, respectively, and UNIXTIME indicates the time in UNIX
    seconds for the readout. The FLAG column holds a Boolean value
    indicating scanning status: if True, the telescope was in scanning
    motion for the readout; if False, scanning motion had either not
    begun, or had stopped prior to the readout. Readouts for which the
    FLAG is False may not have accurate position data.

    Only 2-point chopper schemes are allowed (C_SCHEME='2POINT'). If the
    chop amplitude (C_AMP) is zero, it will be treated as a no-chop
    ("total power") observation.

    The output file name is generated from the flight number, AOR-ID,
    detector channel, and raw file number, as::

        chop 0 = <FLIGHT>_FI_IFS_<AOR-ID>_<DETCHAN>_CP0_<FILENUM>.fits
        chop 1 = <FLIGHT>_FI_IFS_<AOR-ID>_<DETCHAN>_CP1_<FILENUM>.fits

    The prime header is also modified from the original, to conform to
    SOFIA standards (via fifi_ls.make_header).

    Parameters
    ----------
    filename : str
        File path to the raw FITS data file.
    write : bool, optional
        If True, write generated HDU Lists to file
    outdir : str, optional
        Name of the output path. By default new files will be saved
        to the same directory as `filename`.

    Returns
    -------
    tuple of fits.HDUList or tuple of str
        Contains HDULists if write is False, otherwise paths to output
        files
    """
    if isinstance(outdir, str):
        if not os.path.isdir(outdir):
            log.error("Output directory %s does not exist" % outdir)
            return
    hdul = gethdul(filename, verbose=True)
    if hdul is None:
        return
    if isinstance(filename, str):
        # input file is string: standardize header
        new_header = make_header(hdul[0].header)
        if new_header is None:
            log.error('Problem updating header')
        else:
            # apply the standardized header (previously the result was
            # computed but never attached to the HDUList)
            hdul[0].header = new_header
    else:
        # assume header is already standardized;
        # get filename from header
        filename = hdul[0].header['FILENAME']
    if not isinstance(outdir, str):
        outdir = os.path.dirname(filename)

    # validate extension 1: must be a BINTABLE named FIFILS_RAWDATA
    # with HEADER and DATA columns
    if len(hdul) < 2:
        log.error("HDUList missing extension 1 for file %s" % filename)
        return
    # supply a default for XTENSION so a missing keyword cannot raise
    # AttributeError on None.strip()
    if hdul[1].header.get('XTENSION', '').strip().upper() != 'BINTABLE':
        log.error("Expected BINTABLE extension in extension 1 for file %s" %
                  filename)
        return
    extname = hdul[1].header.get('EXTNAME', 'UNKNOWN').strip().upper()
    if extname != 'FIFILS_RAWDATA':
        log.error("Can only split FIFILS_rawdata: "
                  "extension 1 EXTNAME = %s for %s" % (extname, filename))
        return
    colnames = hdul[1].data.columns.names
    colnames = [x.strip().upper() for x in colnames]
    if 'HEADER' not in colnames or 'DATA' not in colnames:
        log.error('Missing expected DATA and HEADER columns in BINTABLE %s' %
                  filename)
        return

    params = get_split_params(hdul)
    if not params['success']:
        log.error('Problem getting split parameters')
        return

    # attach a position offset table for OTF mode scans
    instmode = str(hdul[0].header.get('INSTMODE', 'UNKNOWN')).upper()
    nodbeam = str(hdul[0].header.get('NODBEAM', 'B')).upper().strip()
    if 'OTF' in instmode and nodbeam == 'A':
        try:
            hdul = _derive_positions(hdul, params)
        except KeyError:
            log.error("Missing required keywords for OTF position data")
            hdul = None
        if hdul is None:
            log.error('Problem reading OTF data')
            return

    hdul = trim_data(hdul, params)
    if hdul is None:
        log.error('Problem trimming data')
        return
    hduls = separate_chops(hdul, params)
    if hduls is None:
        log.error('Problem separating chops')
        return
    if not write:
        return hduls
    else:
        result = []
        for split_hdul in hduls:
            result.append(write_hdul(split_hdul, outdir=outdir,
                                     overwrite=True))
        return result
def split_wrap_helper(_, kwargs, filename):
    """
    Helper for multitask: run split_grating_and_chop on a single file.

    Parameters
    ----------
    _ : object
        Unused; receives the positional payload that multitask passes
        to its worker function (None here).
    kwargs : dict
        Keyword arguments forwarded to split_grating_and_chop.
    filename : str
        Path to the raw FITS file to split.
    """
    return split_grating_and_chop(filename, **kwargs)
def wrap_split_grating_and_chop(files, outdir=None,
                                allow_errors=False, write=False,
                                jobs=None):
    """
    Wrapper for split_grating_and_chop over multiple files.

    Parameters
    ----------
    files : array_like of str
        list of filepaths to FIFI-LS FITS files
    outdir : str, optional
        name of the output path. If None, will be saved to the same
        directory as the input file used to generate the split files.
    allow_errors : bool, optional
        If True, return all created files on error. Otherwise, return None
    write : bool, optional
        If True, write the output to disk and return the filename instead
        of the HDU.
    jobs : int, optional
        Specifies the maximum number of concurrently running jobs.
        Values of 0 or 1 will result in serial processing. A negative
        value sets jobs to `n_cpus + 1 + jobs` such that -1 would use
        all cpus, and -2 would use all but one cpu.

    Returns
    -------
    tuple of str
        output filenames written to disk, or None on error when
        `allow_errors` is False
    """
    if isinstance(files, str):
        files = [files]
    if not hasattr(files, '__len__'):
        log.error("Invalid input files type (%s)" % repr(files))
        return

    kwargs = {'outdir': outdir, 'write': write}
    output = multitask(split_wrap_helper, files, None, kwargs,
                       jobs=jobs)

    # a None result from any file marks the whole run as failed;
    # successful results are flattened into a single list
    failure = False
    result = []
    for x in output:
        if x is None:
            failure = True
        else:
            result.extend(x)

    # on failure without allow_errors, always return None (previously
    # an empty tuple was returned when no files had been created,
    # contradicting the documented contract)
    if failure and not allow_errors:
        if len(result) > 0:
            log.error("Errors were encountered but the following "
                      "files were created:\n%s" % '\n'.join(result))
        return
    return tuple(result)