# Licensed under a 3-clause BSD style license - see LICENSE.rst
import os.path
import re
import astropy.io.fits as pf
from typing import Optional
import numpy as np
import pandas as pd
from sofia_redux.visualization import log
from sofia_redux.visualization.models import high_model
__all__ = ['Model', 'parse_general']
[docs]
class Model(object):
"""
The starting point for Eye of SOFIA models.
Data read in from FITS or non-fits files are stored in a collection
of model object in this directory. The structure of a single
complete FITS file is as follows:
- High-Level Model
- High-level models hold the contents of an entire FITS file.
All interfaces should interact with the high level
model, which controls and manages the lower level models.
- Available models are:
- Grism : for files with multiple images and a single spectrum.
- MultiOrder: for files with multiple images and multiple spectra.
- Mid-Level Models
- Mid-level models hold all data structures that would need to
be included together to be considered valid.
- Available models are:
- Book: For holding multiple images of the same target.
- Order: For holding multiple spectra of the same target.
- Low-Level Models
- Low-level models hold the simplest data sets. For example, a
single `Order` is made of multiple `Spectrum` objects, one for the
wavelength data, one for the flux data, one for the error data, etc.
All operations are performed at this level, such as unit conversions,
but only by interacting through the higher-level models. The user
should not interact with low level models directly.
- Available models are:
- Image: For holding a single 2d data set.
- Spectrum: For holding a single 1d data set.
This module contains the interface controlling the initialization
of a high level model. Once the high level model is created, the viewer
operates on that object. The `Model` object here is not to be instantiated;
it is merely used to implement the `add_model` method.
"""
[docs]
@staticmethod
def add_model(filename: str = '',
hdul: Optional[pf.HDUList] = None) -> high_model.HighModel:
"""
Parse a FITS file into the appropriate high level model.
Either `filename` or `hdul` must be specified.
Parameters
----------
filename : str, optional
Absolute path to the FITS file to read and parse.
hdul : `astropy.io.fits.HDUList`, optional
An astropy HDUList to parse.
Returns
-------
model : model.high_model.HighModel
The high level model populated with the
contents of `filename`.
Raises
------
NotImplementedError
If the current instrument/data type is not supported.
RuntimeError
If invalid arguments are passed.
"""
if filename and hdul is not None:
raise RuntimeError('Model.add_model can only accept `filename` '
'or `hdul`, not both')
if hdul is None:
if filename:
if 'fits' not in filename:
hdul = parse_general(filename)
else:
hdul = pf.open(filename, memmap=False)
else:
raise RuntimeError('Need to provide an hdul or filename.')
header = hdul[0].header
instrument = str(header.get('INSTRUME')).lower()
if instrument in ['forcast', 'flitecam']:
model = high_model.Grism(hdul)
if model.num_orders == 0:
raise NotImplementedError('Image display is not supported.')
elif instrument == 'general':
model = high_model.MultiOrder(hdul, general=True)
elif instrument == 'exes':
model = high_model.MultiOrder(hdul)
elif instrument == 'none' or instrument == '':
# Assign the instrument to 'General' when no instrument
# information has been provided.
hdul[0].header['instrume'] = 'General'
hdul[0].header['prodtype'] = 'General'
if not header.get('XUNIT') and not header.get('XUNITS'):
hdul[0].header['XUNITS'] = 'um'
if not header.get('YUNIT') and not header.get('YUNITS'):
hdul[0].header['YUNITS'] = 'Jy'
model = high_model.MultiOrder(hdul, general=True)
else:
raise NotImplementedError('Instrument is not supported')
# if true filename was supplied, store it in the model
if filename:
model.filename = filename
log.debug(f'Created model with id: {model.id}')
hdul.close()
return model
[docs]
def parse_general(filename: str) -> pf.HDUList:
"""
Parse a text-based data file.
Data is converted into a FITS-style HDUList for further handling.
Based on the number of columns in this dataset, it is parsed
accordingly. If column labels have been provided, it uses them,
otherwise, it assumes a general order of [wavelength, flux, error,
transmission, response] for the columns.
Parameters
----------
filename : str
Absolute path to the non-FITS file to read and parse.
Returns
-------
hdul_read : 'astropy.io.fits.HDUList'
An astropy HDUList
Raises
------
RuntimeError
If invalid file is passed or invalid columns in the file.
"""
header = pf.Header()
header['XUNITS'] = 'um'
header['YUNITS'] = 'Jy'
header['INSTRUME'] = 'General'
header['PRODTYPE'] = 'General'
header['FILENAME'] = os.path.basename(filename)
# determining the delimiter in the file
with open(filename, 'r') as f:
skip_rows = 0
names = list()
for line in f:
if _is_number(line.strip()):
break
else:
names = line.replace('#', '').strip()
names = re.sub(' +', ' ', names)
delimiter = re.findall(r'[,|]|\s,', names)
if delimiter:
delimiter = delimiter[0]
else:
delimiter = ' '
names = names.split(delimiter)
skip_rows += 1
break
f.close()
# Attempting to read the file using pandas
try:
data = pd.read_csv(filename, sep=r'\,|\t+|\s+', skiprows=skip_rows,
names=names, engine='python')
except pd.errors.ParserError:
raise RuntimeError('Could not parse text file') from None
try:
n_columns = data.shape[1]
except IndexError: # pragma: no cover
n_columns = 1
# Data with single column is assumed to be flux and
# is plotted against pixels.
if n_columns == 1:
# assuming its flux
wavelength = np.arange(data.shape[0])
data.insert(0, "wavepos[pixel]", wavelength)
cols = data.columns
if not str(cols[1]).isdigit():
data_new, col_wave, col_flux, col_error, col_trans, col_response = \
None, None, None, None, None, None
try:
if n_columns >= 2:
col_flux = cols[cols.str.contains('flux', flags=re.I)]
col_wave = cols[cols.str.contains('wave', flags=re.I)]
data_new = data.loc[:, [col_wave[0], col_flux[0]]]
if n_columns >= 3:
col_error = cols[cols.str.contains('err', flags=re.I,
regex=False)]
data_new = data.loc[:, [col_wave[0], col_flux[0],
col_error[0]]]
if n_columns >= 4:
col_trans = cols[cols.str.contains('tran', flags=re.I,
regex=False)]
data_new = data.loc[:, [col_wave[0], col_flux[0], col_error[0],
col_trans[0]]]
if n_columns >= 5:
col_response = cols[cols.str.contains('response', flags=re.I,
regex=False)]
data_new = data.loc[:, [col_wave[0], col_flux[0], col_error[0],
col_trans[0], col_response[0]]]
except (IndexError, ValueError, TypeError):
raise RuntimeError('Unexpected columns in text file') from None
if data_new is not None:
data = data_new
cols = data.columns
# Parsing units
if '[' in str(cols[0]):
header['XUNITS'] = cols[0][cols[0].find('[') + 1:cols[
0].find(']')]
elif '(' in str(cols[0]):
header['XUNITS'] = cols[0][cols[0].find('(') + 1:cols[
0].find(')')]
if '[' in str(cols[1]):
header['YUNITS'] = cols[1][cols[1].find('[') + 1:cols[
1].find(']')]
elif '(' in str(cols[1]):
header['YUNITS'] = cols[1][cols[1].find('(') + 1:cols[
1].find(')')]
header['NAXIS1'] = data.shape[0]
header['NAXIS2'] = data.shape[1]
header['NAPS'] = 1
header['NORDERS'] = 1
hdu_read = pf.PrimaryHDU(data.T, header)
# Converting it to hdul
hdul_read = pf.HDUList(hdu_read)
return hdul_read
def _is_number(s) -> bool:
"""
Determines if a string is a number or not.
Returns
-------
bool
True if input can be converted to float; False otherwise
"""
try:
float(s)
except ValueError:
return False
else:
return True