# Licensed under a 3-clause BSD style license - see LICENSE.rst
import argparse
import datetime
import os
import pathlib
import logging
from typing import List, Tuple, Union, Optional, Dict
import astropy.io.fits as pf
from sofia_redux.visualization import log
from sofia_redux.visualization import signals, setup
from sofia_redux.visualization.display import view
from sofia_redux.visualization.models import model
from sofia_redux.visualization.utils.logger import StreamLogger
from sofia_redux.visualization.utils.eye_error import EyeError
try:
from PyQt6 import QtCore, QtGui, QtWidgets
except ImportError:
HAS_PYQT6 = False
QtCore, QtGui = None, None
# duck type parents to allow class definition
class QtWidgets:
class QWidget:
pass
else:
HAS_PYQT6 = True
__all__ = ['Eye']
[docs]
class Eye(object):
    """
    Run the Eye of SOFIA.

    This class provides the primary control interface to the display
    functions provided by the Eye viewer. It is intended to be
    instantiated from the standalone interface
    (`sofia_redux.visualization.controller`) or the pipeline
    interface (`sofia_redux.visualization.redux_viewer`), but may
    also be directly instantiated and controlled via the API interface.

    Parameters
    ----------
    args : argparse.Namespace, optional
        Command-line arguments to pass to the Eye interface. Expected
        attributes are 'filenames', containing a list of input files
        to load, 'log_level' specifying a terminal log level (e.g. 'DEBUG'),
        or 'system_logs' = True, to specify that logs should be written
        to disk.
    view_ : `view.View`, optional
        A previously instantiated View object, to register with the
        Eye controller. If not provided, a new View instance will be
        generated.

    Attributes
    ----------
    log_level : str
        Upper-cased terminal log level. Set to 'CRITICAL' when `args`
        has no 'log_level' attribute.
    models : dict
        Loaded models, keyed by model ID.
    model_index : int
        Index assigned to the next model that is added.
    signals : signals.Signals
        Signals shared between the controller and the view.
    view : view.View
        The view controlled by this Eye.

    Raises
    ------
    ImportError
        If the PyQt6 package could not be imported.
    """
def __init__(self, args=None, view_=None):
    """Configure logging, attach a view, and apply any arguments."""
    # the Eye cannot be constructed without the Qt bindings
    if not HAS_PYQT6:  # pragma: no cover
        raise ImportError('PyQt6 package is required for the Eye.')

    # terminal log level: taken from the arguments when they carry
    # one, otherwise only critical messages are shown
    self.log_level = str(getattr(args, 'log_level', 'CRITICAL')).upper()
    self._setup_log_terminal()

    # no data is loaded yet
    self.models = {}
    self.model_index = 0

    # reuse the signals of a provided view, or build a fresh pair
    if view_ is not None:
        self.signals = view_.signals
        self.view = view_
    else:
        self.signals = signals.Signals()
        self.view = view.View(self.signals)

    self.setup_eye()

    # the log file must be set up after setup_eye,
    # because that cleans the handlers
    if getattr(args, 'system_logs', False):
        self._setup_log_file()

    if args:
        log.debug('Applying command line arguments')
        self._apply_args(args)
@staticmethod
def _setup_log_file() -> None:
    """
    Set up the system log file handler.

    The overall log level is set to DEBUG and a file handler writing
    to a new, time-stamped file under ``~/.eye_of_sofia/event_logs``
    is attached to the package logger.

    Raises
    ------
    IOError
        If the log directory cannot be created.
    """
    # set overall log level to debug
    log.setLevel('DEBUG')

    # make hidden home directory if necessary
    base_loc = os.path.expanduser(os.path.join('~', '.eye_of_sofia',
                                               'event_logs'))
    try:
        os.makedirs(base_loc, exist_ok=True)
    except IOError as err:
        # keep the underlying OS error attached as the cause
        raise IOError(
            f'Unable to create log directory at {base_loc}') from err

    # Only the file name is a strftime template. Formatting the full
    # path would interpret any '%' in the home directory as a directive.
    stamp = datetime.datetime.now().strftime(
        'eos_event_%Y-%m-%d_%H-%M-%S.log')
    fname = os.path.join(base_loc, stamp)

    fhand = logging.FileHandler(fname, 'at')
    fhand.setLevel('DEBUG')
    fhand.setFormatter(logging.Formatter(
        "%(asctime)s - %(origin)s - %(levelname)s - %(message)s"))
    log.addHandler(fhand)
    log.debug(f'Event log initiated at: {fname}')
def _setup_log_terminal(self) -> None:
    """
    Apply the requested level to the terminal stream handlers.

    Only handlers that are `StreamLogger` instances are touched. A
    level name the handler rejects is silently ignored, leaving the
    handler at its existing level.
    """
    # terminal is typically CRITICAL only but may
    # be overridden in arguments
    level = self.log_level
    stream_handlers = [handler for handler in log.handlers
                       if isinstance(handler, StreamLogger)]
    for handler in stream_handlers:
        try:
            handler.setLevel(level)
        except ValueError:
            # invalid log level passed -> ignore
            continue
[docs]
def open_eye(self) -> None:
    """Log the request, then ask the view to open the GUI."""
    log.debug('Opening Eye')
    viewer = self.view
    viewer.open_eye()
[docs]
def close(self) -> None:
    """Close the GUI by closing the registered view."""
    viewer = self.view
    viewer.close()
[docs]
def reset(self) -> None:
    """
    Return the Eye to an empty state.

    All loaded data is removed first, then the view is reset.
    """
    # order matters: drop the data before resetting the view
    for step in (self.unload, self.view.reset):
        step()
[docs]
def deleteLater(self) -> None:
    """Forward the deletion request to the view."""
    viewer = self.view
    viewer.deleteLater()
[docs]
def setup_eye(self) -> None:
    """Run the full `setup.Setup` sequence for this controller."""
    setup.Setup(self).setup_all()
    log.debug('Setup Eye')
def _apply_args(self, args: argparse.Namespace) -> None:
    """
    Apply command line arguments to the Eye.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed arguments. If a non-empty 'filenames' attribute is
        present, those files are added to the Eye.
    """
    requested = getattr(args, 'filenames', None)
    if not requested:
        return
    log.info('Reading in files from command line')
    self.add_data(filenames=args.filenames)
[docs]
def add_data(self,
             filenames: Optional[Union[str, List[str]]] = None) -> None:
    """
    Add data.

    Parameters
    ----------
    filenames : str or list of str, optional
        Paths of FITS files to add to the Eye. A single path is
        treated as a one-element list. If not provided, prompt the
        user for the file name(s).
    """
    if not filenames:
        filenames = QtWidgets.QFileDialog.getOpenFileNames(
            self.view, caption="Select Data File(s)",
            filter="FITS files (*.fits *.txt);;"
                   "All files (*)")[0]
    elif isinstance(filenames, (str, os.PathLike)):
        # a bare path would otherwise be iterated character by character
        filenames = [filenames]

    # nothing to do if the dialog was cancelled
    if not filenames:
        return

    added = False
    for fname_i in filenames:
        fname = os.path.normpath(fname_i)
        log.debug(f'Adding data from {fname}')
        added_id = self._add_model(filename=fname)
        if added_id is not None:
            added = True
            self.view.add_filename(added_id, fname)

    # only signal a refresh if at least one model was actually added
    if added:
        self.signals.atrophy.emit()
def _add_model(self, filename: str = '',
               hdul: Optional[pf.hdu.hdulist.HDUList] = None,
               return_filename: bool = False
               ) -> Optional[Union[str, Tuple[str, str],
                                   Tuple[None, str]]]:
    """
    Create a Model from provided data and add it to the Eye.

    The data can be provided by either a filename
    or the HDUL itself, but only one can be given.

    Parameters
    ----------
    filename : str, optional
        Name of the file to read.
    hdul : astropy.io.fits.HDUList, optional
        HDU list contents of a FITS file.
    return_filename : bool, optional
        If set, a (model ID, filename) tuple is returned instead of
        the model ID alone.

    Returns
    -------
    model_id : str or None
        ID of the new model, which is its key in `self.models`.
        None if no model was added: the file is missing, a model with
        the same filename is already loaded, or the data could not
        be read.
    filename : str
        Only returned if `return_filename` is set. The filename
        associated with the data; empty if `filename` was given but
        does not exist on disk.

    Raises
    ------
    RuntimeError
        Raised if none or both `filename` and `hdul` are
        provided.
    """
    if filename and hdul is not None:
        raise RuntimeError('Eye._add_model can only accept `filename` '
                           'or `hdul`, not both')

    if filename:
        if not os.path.isfile(filename):
            log.warning(f'No such file: {filename}')
            return (None, '') if return_filename else None
        args = {'filename': filename}
        log.debug(f'Adding model from filename {filename}')
    elif hdul is not None:
        args = {'hdul': hdul}
        log.debug('Adding model from hdul')
        try:
            filename = hdul.filename()
        except TypeError:
            filename = None
        # fall back on the primary header when the HDUList
        # is not associated with a file
        if not filename:
            filename = hdul[0].header.get('FILENAME', 'UNKNOWN')
        log.debug(f'Found filename: {filename}')
    else:
        raise RuntimeError('Need to provide either a filename or HDUL')

    # value returned whenever no model is added
    not_added = (None, filename) if return_filename else None

    # do not load the same file twice
    if any(m.filename == filename for m in self.models.values()):
        return not_added

    try:
        m = model.Model.add_model(**args)
    except FileNotFoundError:
        # must precede the OSError clause below, which would also match
        log.warning(f'No such file: {filename}')
        return not_added
    except (NotImplementedError, OSError,
            RuntimeError, KeyError) as err:
        log.debug(f'Error encountered: {str(err)}')
        log.warning('Input data is not supported.')
        return not_added
    except EyeError as err:
        log.warning(str(err))
        return not_added

    log.debug(f'Model index: {self.model_index}')
    m.index = self.model_index
    self.model_index += 1
    self.models[m.id] = m
    if return_filename:
        return m.id, filename
    return m.id
# API
[docs]
def set_parent(self, parent: QtWidgets.QWidget) -> None:
    """
    Record the parent widget on the view.

    Parameters
    ----------
    parent : QtWidgets.QWidget
        Widget stored as the view's `parent` attribute.
    """
    viewer = self.view
    viewer.parent = parent
[docs]
def load(self, data_list: list) -> None:
    """
    Load a list of data files into the Eye.

    Parameters
    ----------
    data_list : list of str, pathlib.Path, or astropy.io.fits.HDUList
        Data to load. A single item that is not a list is treated
        as a one-element list.

    Raises
    ------
    FileNotFoundError
        If a provided file name cannot be opened.
    TypeError
        If an item is neither a file name nor an HDUList.
    """
    # normalize first: a lone pathlib.Path has no len()
    if not isinstance(data_list, list):
        data_list = [data_list]
    log.debug(f'Loading file list ({len(data_list)} items)')

    for data in data_list:
        if isinstance(data, (str, pathlib.Path)):
            try:
                hdul = pf.open(data, memmap=False)
            except IOError as err:
                raise FileNotFoundError(
                    f'File {data} not found') from err
            log.debug(f'Loading from filename {data}')
        elif isinstance(data, pf.hdu.hdulist.HDUList):
            hdul = data
            log.debug(f'Loading HDUList directly {data}')
        else:
            message = (f'Eye.load can only accept filenames or '
                       f'HDUList objects. Provided {type(data)}')
            log.error(message)
            raise TypeError(message)

        # always release the HDUList, even if the model cannot be made
        try:
            added_id, filename = self._add_model(hdul=hdul,
                                                 return_filename=True)
        finally:
            hdul.close()

        if added_id is not None:
            self.view.add_filename(added_id, filename)

    self.signals.atrophy.emit()
[docs]
def unload(self) -> None:
    """Remove every currently loaded model."""
    # snapshot the keys: remove_data mutates self.models
    self.remove_data(model_ids=[model_id for model_id in self.models])
[docs]
def add_panes(self, layout='grid', n_panes=1, kind='spectrum') -> None:
    """
    Create blank panes in the figure.

    Parameters
    ----------
    layout : ['grid', 'rows', 'columns'], optional
        Layout strategy.
    n_panes : int, optional
        Number of panes to create.
    kind : ['spectrum', 'onedim', 'image', 'twodim'] or list, optional
        Type of pane to create. Can also be a list of length
        `n_panes` to create a mixture, or a list of length one,
        which is applied to all new panes.

    Raises
    ------
    ValueError
        If `kind` is None, is an unrecognized string, or is a list
        whose length is neither one nor `n_panes`.
    """
    if kind is None:
        raise ValueError('Must specify pane type with `kind` '
                         'keyword.')

    if isinstance(kind, list):
        if len(kind) == 1:
            # a single entry applies to every pane, as the
            # error message below has always promised
            kind = kind * n_panes
        elif len(kind) != n_panes:
            raise ValueError('Length of `kind` must be either one '
                             'or the number of panes being added.')
    elif isinstance(kind, str):
        if kind not in ('spectrum', 'onedim', 'image', 'twodim'):
            raise ValueError(f'Invalid kind: {kind}')
        kind = [kind] * n_panes

    self.view.add_panes(n_panes, kind=kind, layout=layout)
[docs]
def number_panes(self) -> int:
    """
    Report how many panes are open.

    Returns
    -------
    int
        The pane count, as reported by the view.
    """
    count = self.view.pane_count()
    return count
[docs]
def get_pane_layout(self) -> Union[None, Tuple[int, int]]:
    """
    Report the current pane layout.

    Returns
    -------
    geometry : tuple of int, or None
        (nrow, ncol) if there is an active layout; otherwise None.
    """
    geometry = self.view.pane_layout()
    return geometry
[docs]
def assign_data(self, mode: str,
                indices: Optional[List[int]] = None) -> None:
    """
    Assign models to panes.

    Parameters
    ----------
    mode : ['split', 'first', 'last', 'assigned']
        How to assign the data. `split` will split
        the models equally between the panes, `first`
        will assign all the models to the first pane,
        `last` will assign all the models to the last
        pane, and `assigned` will set the models
        according to the values in `indices`.
    indices : list of int, optional
        List of pane indices to which models should be
        assigned. Only checked in `assigned` mode, where it must
        match the length of the current list of loaded models and
        contain valid pane index values.

    Raises
    ------
    ValueError
        If an invalid mode is provided, or indices do not match
        models or panes.
    """
    log.debug(f'Assigning data to panes using {mode}')
    possible_modes = ['split', 'first', 'last', 'assigned']
    if mode not in possible_modes:
        raise ValueError(f'Invalid data assignment mode {mode}. '
                         f'Valid modes: {possible_modes}')

    if mode == 'assigned':
        if not isinstance(indices, list):
            raise ValueError(f'Invalid format of `indices` '
                             f'{type(indices)}. Must be a list')
        if len(indices) != len(self.models):
            raise ValueError(f'Length of `indices` must match number of '
                             f'models ({len(indices)} !='
                             f' {len(self.models)})')

        # the pane count is the same for every index: look it up once
        n_panes = self.number_panes()
        try:
            in_range = all(0 <= i < n_panes for i in indices)
        except TypeError:
            # non-numeric entries cannot be pane indices
            in_range = False
        if not in_range:
            raise ValueError('Values in `indices` must be valid values '
                             'corresponding to panes')

    self.view.assign_models(mode, self.models, indices)
[docs]
def models_per_pane(self) -> List[int]:
    """
    Report the number of models in all active panes.

    Returns
    -------
    model_count : list of int
        The model count for each pane, as reported by the view.
    """
    model_count = self.view.models_per_pane()
    return model_count
[docs]
def set_current_pane(self, pane_id: int) -> None:
    """
    Make a pane the current one in the view.

    Parameters
    ----------
    pane_id : int
        Index of the pane to make current.
    """
    viewer = self.view
    viewer.set_current_pane(pane_id)
[docs]
def set_fields(self, x_field: Optional[str] = None,
               y_field: Optional[str] = None,
               z_field: Optional[str] = None,
               fields: Optional[Dict[str, Optional[str]]] = None,
               panes: Optional[Union[str, List[int]]] = 'all') -> None:
    """
    Set the axis fields to show for panes.

    Either `fields` or `x_field`, `y_field`, and `z_field`
    should be set. If `fields` is given, the individual field
    arguments are ignored.

    Parameters
    ----------
    x_field : str, optional
        The x field to set, if not provided in `fields`.
    y_field : str, optional
        The y field to set, if not provided in `fields`.
    z_field : str, optional
        The z field to set, if not provided in `fields`.
    fields : dict, optional
        Should contain keys 'x', 'y', and 'z', specifying
        the field strings as values.
    panes : str, None, or list of int, optional
        May be set to 'all' to apply to all panes, None
        to apply only to the current pane, or a list of
        pane indexes to modify.

    Raises
    ------
    TypeError
        If `fields` is not a dict, or any field value is neither
        a string nor None.
    """
    if fields is None:
        fields = {'x': x_field, 'y': y_field, 'z': z_field}
    elif not isinstance(fields, dict):
        # same contract as set_units, set_orders and set_scale
        raise TypeError('Provided fields must be dict')

    for field in fields.values():
        if field is not None and not isinstance(field, str):
            raise TypeError('Fields must be strings')

    self.view.set_fields(fields=fields, panes=panes)
[docs]
def get_fields(self, panes: Optional[Union[str, List[int]]] = 'all'
               ) -> List:
    """
    Retrieve the fields currently displayed in the panes.

    Parameters
    ----------
    panes : str, None, or list of int, optional
        May be set to 'all' to query all panes, None
        to query only the current pane, or a list of
        pane indexes to query.

    Returns
    -------
    fields : list of list of dict
        List of fields for each pane specified.
    """
    fields = self.view.get_fields(panes)
    return fields
[docs]
def set_units(self, units: Dict,
              panes: Optional[Union[str, List[int]]] = 'all') -> None:
    """
    Apply new units to the specified panes.

    Parameters
    ----------
    units : dict
        New units to apply. Should contain 'x', 'y' keys.
    panes : str, None, or list of int, optional
        May be set to 'all' to apply to all panes, None
        to apply only to the current pane, or a list of
        pane indexes to modify.

    Raises
    ------
    TypeError
        If `units` is not a dict.
    """
    if isinstance(units, dict):
        self.view.set_units(units=units, panes=panes)
    else:
        raise TypeError('Provided units must be dict')
[docs]
def get_model_backup(self):
    """
    Pass the loaded models to the view's model backup.

    Returns
    -------
    object
        Whatever `view.model_backup` returns for the currently
        loaded models.
        NOTE(review): the original summary promised "a copy of raw
        loaded data" -- confirm against `view.View.model_backup`.
    """
    # the result was previously discarded, so callers always got None
    return self.view.model_backup(self.models)
[docs]
def get_units(self, panes: Optional[Union[str, List[int]]] = 'all') -> \
        List:
    """
    Retrieve the current units for the panes.

    Parameters
    ----------
    panes : str, None, or list of int, optional
        May be set to 'all' to query all panes, None
        to query only the current pane, or a list of
        pane indexes to query.

    Returns
    -------
    units : list of list of dict
        List of units for each pane specified.
    """
    units = self.view.get_units(panes)
    return units
[docs]
def set_orders(self, orders: Dict) -> None:
    """
    Choose which orders are enabled.

    Parameters
    ----------
    orders : dict
        Keys are model IDs, values are lists of orders to enable.

    Raises
    ------
    TypeError
        If `orders` is not a dict.
    """
    if isinstance(orders, dict):
        self.view.set_orders(orders)
    else:
        raise TypeError('Provided orders must be dict')
[docs]
def get_orders(self, panes: Optional[Union[str, List[int]]] = 'all') -> \
        Dict:
    """
    Retrieve the enabled orders.

    Parameters
    ----------
    panes : str, None, or list of int, optional
        May be set to 'all' to query all panes, None
        to query only the current pane, or a list of
        pane indexes to query.

    Returns
    -------
    orders : dict
        Keys are model IDs, values are lists of enabled orders.
    """
    orders = self.view.get_orders(panes)
    return orders
[docs]
def set_scale(self, scales: Dict,
              panes: Optional[Union[str, List[int]]] = 'all') -> None:
    """
    Apply axis scale settings to the specified panes.

    Parameters
    ----------
    scales : dict
        Keys are 'x' and 'y', values are 'linear' or 'log'.
    panes : str, None, or list of int, optional
        May be set to 'all' to apply to all panes, None
        to apply only to the current pane, or a list of
        pane indexes to modify.

    Raises
    ------
    TypeError
        If `scales` is not a dict.
    """
    if isinstance(scales, dict):
        self.view.set_scales(scales=scales, panes=panes)
    else:
        raise TypeError('Provided scales must be dict')
[docs]
def get_scale(self,
              panes: Optional[Union[str, List[int]]] = 'all') -> List:
    """
    Retrieve the axis scale settings for the panes.

    Parameters
    ----------
    panes : str, None, or list of int, optional
        May be set to 'all' to query all panes, None
        to query only the current pane, or a list of
        pane indexes to query.

    Returns
    -------
    scales : list
        Scale settings as reported by the view; keys are 'x' and
        'y', values are 'linear' or 'log'.
    """
    scales = self.view.get_scales(panes=panes)
    return scales
[docs]
def toggle_controls(self) -> None:
    """Show or hide the control panel."""
    viewer = self.view
    viewer.toggle_controls()
[docs]
def toggle_cursor(self) -> None:
    """Show or hide the cursor panel."""
    viewer = self.view
    viewer.toggle_cursor()
[docs]
def toggle_file_panel(self) -> None:
    """Show or hide the file panel."""
    viewer = self.view
    viewer.toggle_file_panel()
[docs]
def toggle_order_panel(self) -> None:
    """Show or hide the order panel."""
    viewer = self.view
    viewer.toggle_order_panel()
[docs]
def toggle_axis_panel(self) -> None:
    """Show or hide the axis panel."""
    viewer = self.view
    viewer.toggle_axis_panel()
[docs]
def toggle_plot_panel(self) -> None:
    """Show or hide the plot panel."""
    viewer = self.view
    viewer.toggle_plot_panel()
[docs]
def toggle_analysis_panel(self) -> None:
    """Show or hide the analysis panel."""
    viewer = self.view
    viewer.toggle_analysis_panel()
[docs]
def generate(self) -> None:
    """
    Initiate or refresh the view.

    The `atrophy_bg_full` signal is emitted first, then the view's
    refresh loop is run.
    """
    stale_signal = self.signals.atrophy_bg_full
    stale_signal.emit()
    self.view.refresh_loop()
[docs]
def save(self, filename: str, **kwargs) -> None:
    """
    Write the current view to an image file.

    Parameters
    ----------
    filename : str
        File path to save to.
    kwargs : dict
        Optional arguments, passed through to the view's `save`.
    """
    log.debug(f'Saving image to {filename}')
    write_image = self.view.save
    write_image(filename, **kwargs)
[docs]
def remove_data(self, model_ids: Optional[List[str]] = None) -> None:
    """
    Remove loaded data from the view.

    Parameters
    ----------
    model_ids : list of str, optional
        IDs of the models to remove. If not provided, the files
        currently selected in the file panel are removed. If
        nothing is selected either, nothing is done.
    """
    # fall back on the file panel selection
    targets = model_ids or self.view.current_files_selected()
    if not targets:
        return
    if not isinstance(targets, list):
        targets = [targets]

    for model_id in targets:
        if model_id not in self.models:
            log.warning(f'File {model_id} not found')
            continue
        del self.models[model_id]
        self.view.remove_data_from_all_panes(model_id)

    # reset model index if count has gone to zero
    if not self.models:
        self.model_index = 0

    for signal in (self.signals.refresh_file_table, self.signals.atrophy):
        signal.emit()
[docs]
def remove_panes(self, panes: Optional[Union[str, List[int]]] = 'all'
                 ) -> None:
    """
    Remove panes from the display.

    Parameters
    ----------
    panes : str, None, or list of int, optional
        May be set to 'all' to remove all panes, None
        to remove only the current pane, or a list of
        pane indexes to remove.
    """
    viewer = self.view
    viewer.remove_panes(panes)
[docs]
def display_selected_model(self) -> None:
    """
    Display the selected files in the current pane.

    The view's refresh timer, if any, is stopped while the models
    are displayed and is always restarted afterwards, even if a
    model cannot be located.

    Raises
    ------
    RuntimeError
        If a selected model ID is not among the loaded models.
    """
    model_ids = self.view.current_files_selected()
    if not model_ids:
        return

    if self.view.timer:
        self.view.timer.stop()
    try:
        errors = []
        for model_id in model_ids:
            try:
                self.view.display_model(self.models[model_id])
            except KeyError:
                raise RuntimeError(f'Cannot locate model {model_id}')
            except EyeError:
                # model is incompatible with the pane: report it below
                errors.append(self.models[model_id].filename)

        if errors:
            if len(errors) == 1:
                s = '1 file does not match pane'
            else:
                s = f'{len(errors)} files do not match pane.'
            log.warning(s)
        self.signals.atrophy.emit()
    finally:
        # never leave the refresh timer stopped
        if self.view.timer:
            self.view.timer.start()