# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Data storage parent class."""
from datetime import datetime
import os
import re
from astropy import log
import configobj
__all__ = ['DataParent']
[docs]
class DataParent(object):
"""
Pipeline data object.
This object stores a config file, header, and data.
"""
# Pipeline version
pipever = '3.2.0'
"""str : Pipeline version."""
def __init__(self, config=None):
"""
Initialize data object and variables.
Parameters
----------
config : `configobj.ConfigObj`, dict, str, or list of str, optional
If specified, the configuration will be loaded.
"""
# set up internal variables
self.filename = ''
self.rawname = ''
self.loaded = False
# Data Variable:
self.data = None
# Header: A dictionary. Lists for HISTORY and COMMENT entries.
self.header = {}
# retrieve config directory location from current file path:
# assumes config is in sofia_redux/instruments/hawc/data/config
# and this file is in sofia_redux/instruments/hawc
pipe_path = os.path.dirname(os.path.abspath(__file__))
self.data_path = os.path.join(pipe_path, 'data', '')
self.config_path = os.path.join(pipe_path, 'data', 'config', '')
# place holder for pipeline mode
self.mode = None
self.config = None
self.config_files = []
self.setconfig(config)
def __getattr__(self, name):
"""
Get attribute.
Allows access to these attributes:
- filenamebegin: filename start
- filenameend: filename end
- filenum: file number, extracted from file name
Each attribute is derived using the regex in the configuration
parameter of the same name (in the [data] section of the
configuration file). See config/pipeconf.cfg for an example.
Parameters
----------
name : str
The attribute to retrieve.
"""
# return filenamebegin if it's requested
if name == 'filenamebegin':
(fpath, fname) = os.path.split(self.filename)
# if filenamebegin is specified, use it
try:
filenamebegin = self.config['data']['filenamebegin']
match = re.search(filenamebegin, fname)
except (KeyError, TypeError):
match = False
filenamebegin = "None"
if match:
# found file name beginning -> return it
return os.path.join(fpath, match.group())
else:
# assume filename format is name.filestep.fits or ....fits.gz
msg = "Filename=%s doesn't match pattern=%s" % \
(fname, filenamebegin)
log.warning(msg)
extloc = fname.rfind('.f')
if (extloc < 0
or fname[extloc:]
not in ['.fts', '.fits', '.fits.gz']):
log.warning('Filename has non-fits extension')
extloc = fname.rfind('.')
if extloc < 0:
extloc = len(fname)
else:
# to add the '.'
extloc += 1
else:
# to add the '.'
extloc += 1
typeloc = fname[0:extloc - 1].rfind('.')
if typeloc < 0:
typeloc = extloc
else:
typeloc += 1
return os.path.join(fpath, fname[:typeloc])
# return filenameend if it's requested
if name == 'filenameend':
(fpath, fname) = os.path.split(self.filename)
# if filenameend is specified, use it
try:
filenameend = self.config['data']['filenameend']
match = re.search(filenameend, fname)
except (KeyError, TypeError):
match = False
if match:
# found file name end -> return it
return match.group()
else:
extloc = fname.rfind('.f')
if (extloc < 0
or fname[extloc:]
not in ['.fts', '.fits', '.fits.gz']):
log.warning('Filename has non-fits extension')
extloc = fname.rfind('.')
if extloc < 0:
extloc = len(fname)
return fname[extloc:]
# return file number if it's requested
if name == 'filenum':
(fpath, fname) = os.path.split(self.filename)
# if filenum is specified in pipeconf, use it
try:
filenum = self.config['data']['filenum']
match = re.search(filenum, fname)
except (KeyError, TypeError):
match = False
if match:
# found file num -> return first match
for m in match.groups():
if m is not None:
return m
# single match -- return it
return match.group()
else:
return None
# raise error if attribute is unknown
msg = "'%s' object has no attribute '%s'" % \
(type(self).__name__, name)
raise AttributeError(msg)
[docs]
def default_config(self):
"""
Set default configuration from config/pipeconf.cfg.
Returns
-------
`configobj.ConfigObj`
"""
# get master default config file
default = os.path.join(self.config_path, 'pipeconf.cfg')
config = configobj.ConfigObj(default)
self.config_files.append(default)
return config
[docs]
def date_override_config(self, date):
"""
Retrieve an override configuration for a particular date.
Parameters
----------
date : datetime
The date of the observation.
Returns
-------
`configobj.ConfigObj`
Any non-default parameters associated with the observation date.
"""
# get overrides for given date if provided
override_conf = None
if date is not None:
date_config = os.path.join(self.config_path, 'date_config.cfg')
dates = dict((dobj['datetime'], dobj['file'])
for dobj in configobj.ConfigObj(date_config).values())
override = None
for datekey in sorted(dates.keys()):
dateobj = datetime.strptime(datekey, "%Y-%m-%dT%H:%M:%S")
if date < dateobj:
override = dates[datekey]
break
if override is not None:
override = os.path.join(self.config_path, override)
override_conf = configobj.ConfigObj(override)
self.config_files.append(override)
log.debug('Override config file for date %s: %s' %
(date, override))
return override_conf
[docs]
def mode_override_config(self, mode):
r"""
Retrieve an override configuration for an observation mode.
Parameters
----------
mode : str
The pipeline mode to retrieve. Should be specified in the
config file with 'mode\_' prepended.
Returns
-------
`configobj.ConfigObj`
Any non-default parameters associated with the observation date.
"""
# get overrides for given date if provided
override_conf = None
if mode is not None and self.config is not None:
mode_key = "mode_%s" % mode
if mode_key in self.config:
override_conf = configobj.ConfigObj(self.config[mode_key])
return override_conf
[docs]
def setconfig(self, config=None, date=None):
"""
Set configuration for the pipe data.
The configuration object is returned. The config
parameter can be one of these:
- A ConfigObj object
- A path string containing the filename of a valid config file
- A list of path strings to valid config files. In this case,
each file is merged in order.
A default configuration will be loaded first, then the config
parameter will be merged into it.
Parameters
----------
config : `configobj.ConfigObj`, str, or list of str, optional
Configuration to merge into the default.
date : datetime, optional
If specified, additional override configurations will
be loaded for this date, if they exist, prior to loading
configurations from `config`.
Returns
-------
`configobj.ConfigObj`
The merged configuration.
"""
# first set default config.
# Anything else provided will be merged onto this config.
self.config = self.default_config()
self.mergeconfig(config=config, date=date)
return self.config
[docs]
def mergeconfig(self, config=None, date=None, mode=None):
r"""
Merge configuration into the existing configuration.
All values from the new configuration are used,
overwriting old values if they are already in the old
configuration.
The order is:
- load the default configuration (data/config/pipeconf.cfg)
- load any overrides for the observation date
- load any overrides for the pipeline mode
- load any user overrides from the config parameter
Parameters
----------
config : `configobj.ConfigObj`, dict, str or `list` of str, optional
Configuration to merge into the default.
date : datetime, optional
If specified, additional override configurations will
be loaded for this date, if they exist, prior to loading
configurations from `config`.
mode : str, optional
The pipeline mode to retrieve. Should be specified in the
config file with 'mode\_' prepended.
"""
# If there is no existing config, call setconfig first
if self.config is None:
self.setconfig(config, date)
return
# Then set any date-specific overrides
if date is not None:
override = self.date_override_config(date)
if override is not None:
self.config.merge(override)
else:
log.debug('No date config file for %s' % date)
# Then set any mode-specific overrides
if mode is not None:
override = self.mode_override_config(mode)
if override is not None:
self.config.merge(override)
# return if nothing else to do
if config is None:
return
if isinstance(config, configobj.ConfigObj) or isinstance(config, dict):
# if config is a ConfObj or dict, merge it
self.config.merge(config)
if hasattr(config, 'filename'):
self.config_files.append(config.filename)
log.debug('User config file: %s' % config.filename)
elif isinstance(config, str):
# if config is a string - check for file existence -> load it
if not os.path.isfile(config):
config = os.path.join(self.config_path, config)
if os.path.isfile(config):
config = os.path.abspath(config)
try:
user_config = configobj.ConfigObj(config)
self.config.merge(user_config)
self.config_files.append(config)
log.debug('User config file: %s' % config)
except configobj.ConfigObjError as error:
msg = 'Error while loading configuration file'
log.error('SetConfig: ' + msg)
raise error
else:
msg = '<%s> is invalid file name for configuration' % config
log.error('SetConfig: ' + msg)
raise IOError(msg)
elif isinstance(config, list):
# merge each one in the order provided
for conf in config:
self.mergeconfig(conf)
else:
raise TypeError('Unexpected type for new configuration file.')
[docs]
def get_pipe_mode(self):
"""
Get the pipeline mode.
Searches for an appropriate pipeline mode in the config file, given
the header values in the passed data. Tries to mach all key=value
pairs in the datakeys value of the mode entries in the config
file. Returns name of the first pipeline mode that matches
the data. Returns None if no matching pipeline mode found.
Returns
-------
str or None
The pipeline mode name, or None if not found.
"""
if self.config is None:
return None
for section in self.config.sections:
if section.startswith('mode_'):
# Get the datakeys and make list of lists with
# format [ [key, val], [key, val], [key,val] ]
try:
datakeys = self.config[section]['datakeys'].split('|')
except KeyError:
log.warning("In configuration, missing"
" datakeys for mode=%s" % section)
continue
datakeys = [dk.split('=') for dk in datakeys]
# Check all keywords in the file
check = True
for dk in datakeys:
try:
value = self.getheadval(dk[0].strip(), errmsg=False)
if str(value).upper().strip() != dk[1].upper().strip():
check = False
except KeyError:
check = False
if check:
log.debug('GetPipeMode: Found mode=%s' % section[5:])
# return mode name w/o 'mode_'
return section[5:]
return None
[docs]
def load(self, filename=''):
"""
Load the data from the file.
This function is not implemented for the parent class. It should
be overridden by child classes.
"""
# raise error -- this should not be called
raise NotImplementedError("No default load function for data parent.")
[docs]
def save(self, filename=''):
"""
Save the data in the object to the specified file.
This function is not implemented for the parent class. It should
be overridden by child classes.
"""
# raise error -- this should not be called
raise NotImplementedError("No default save function for data parent.")
[docs]
def copy(self):
"""
Return a copy of the current object.
Returns
-------
DataParent
"""
# create new object
out = DataParent(config=self.config)
# copy filename and header
out.filename = self.filename
out.rawname = self.rawname
out.loaded = self.loaded
out.header = self.header.copy()
# Copy data - backup if no copy() available
try:
out.data = self.data.copy()
except AttributeError:
out.data = self.data
# return message and new object
return out
[docs]
def mergehead(self, other):
"""
Merge a data object header into the current object's header.
Parameters
----------
other : DataParent
The other object.
"""
# get selfhist and otherhist lists
if 'HISTORY' in self.header:
selfhist = self.header['HISTORY']
else:
selfhist = []
if 'HISTORY' in other.header:
otherhist = other.header['HISTORY']
else:
otherhist = []
# add history keywords (no duplicates)
selfhist += [hist for hist in otherhist if hist not in selfhist]
# if there is something add write back to header
if len(selfhist):
self.header['HISTORY'] = selfhist
# get selfcomm and othercomm lists
if 'COMMENT' in self.header:
selfcomm = self.header['COMMENT']
else:
selfcomm = []
if 'COMMENT' in other.header:
othercomm = other.header['COMMENT']
else:
othercomm = []
# add comment keywords (no duplicates)
selfcomm += [comm for comm in othercomm
if comm not in selfcomm]
# if there is something add write back to header
if len(selfcomm):
self.header['COMMENT'] = selfcomm
# Go through keywords listed in headmerge: assume self is first
headmerge = self.config['headmerge']
for key in headmerge.keys():
if key in self.header and key in other.header:
selfval = self.header[key]
otherval = other.header[key]
operation = headmerge[key].upper()
if operation == 'LAST':
selfval = otherval
elif operation == 'MIN':
selfval = min(selfval, otherval)
elif operation == 'MAX':
selfval = max(selfval, otherval)
elif operation == 'SUM':
selfval += otherval
elif operation == 'OR':
selfval = selfval | otherval
elif operation == 'AND':
selfval = selfval & otherval
elif operation == 'CONCATENATE':
if ',' in str(selfval):
vlist = str(selfval).split(',')
else:
vlist = [str(selfval)]
if ',' in str(otherval):
olist = str(otherval).split(',')
else:
olist = [str(otherval)]
for oval in olist:
if oval not in vlist:
vlist.append(oval)
selfval = ','.join(sorted(vlist))
elif operation == 'DEFAULT':
if type(selfval) is str:
selfval = 'UNKNOWN'
elif type(selfval) is int:
selfval = -9999
elif type(selfval) is float:
selfval = -9999.0
self.header[key] = selfval
[docs]
def getheadval(self, key, errmsg=True):
"""
Get header value.
Returns the value of the requested key from the header.
If the key is present in the [header] section
of the configuration, that value is returned instead.
The following entries are possible in the configuration file:
- KEY = VALUE : VALUE is returned. The system checks
if value is an int or a float, else a string is returned.
- KEY = NEWKEY : The value under header[NEWKEY] is returned.
- KEY = ?_ALTKEY : If the keyword KEY is present, header[KEY] is
returned, else header[ALTKEY] is returned.
If the key can not be found in either the header or the configuration,
a KeyError is produced and a warning is issued.
Parameters
----------
key : str
The keyword value to return.
errmsg : bool, optional
Flag indicating if a log error message should be
issued if the keyword is not found. A KeyError will still
be raised if errmsg is False.
Returns
-------
int, float, or str
The header value.
Raises
------
KeyError
If the keyword is not found.
"""
val = None
# Look in the config
try:
# get the value
val = self.config['header'][key]
# Check if it's optional header replacement i.e. starts with '?_'
if val[:2] in ['?_', '? ', '?-']:
# if key is not in the header ->
# use key name under value instead
if key not in self.header:
key = val[2:].upper()
val = None
# Check if it's a Header replacement (but not T/F)
elif val[0].isalpha() and \
val[:2] not in ['T ', 'F '] and \
val not in ['T', 'F']:
log.info('Getheadval: Using %s value for %s' %
(val.upper(), key))
key = val.upper()
val = None
# Else: read value
else:
# Try as T / F
found = True
if val == 'T' or val[:2] == 'T ':
val = True
elif val == 'F' or val[:2] == 'F ':
val = False
else:
found = False
# Try as int
if not found:
try:
val = int(val)
found = True
except ValueError:
pass
# Try as float
if not found:
try:
val = float(val)
except ValueError:
pass
# If not found - just leave value as string
# update value in header
self.setheadval(key, val)
except KeyError:
# if key is not in config - continue
pass
except TypeError:
# if config is not yet loaded - issue message only
log.debug('GetHeadVal: Missing Configuration')
# Look in the header
if val is None:
# get value from header
try:
val = self.header[key]
except KeyError:
# if keyword is not found
msg = 'Missing %s keyword in header' % key
if errmsg:
log.error('GetHeadVal: %s' % msg)
raise KeyError(msg)
return val
[docs]
def setheadval(self, key, value, comment=''):
"""
Set a keyword value in the header.
Parameters
----------
key : str
The keyword to set.
value : str, int, float, or bool
The value to set.
comment : str, optional
If provided, will be set in the value of the COMMENT
keyword in the header.
"""
# If key is HISTORY or COMMENT: add to list
if key == 'HISTORY' or key == 'COMMENT':
if key in self.header:
self.header[key].append(value)
else:
self.header[key] = [value, ]
else:
# otherwise add as normal keyword
self.header[key] = value
if len(comment) > 0:
self.setheadval('COMMENT', '%s, %s' % (key, comment))
[docs]
def delheadval(self, key):
"""
Delete one or more keywords from the header.
Keywords are deleted from self.header, which defaults to the
first header in the data object.
If the keyword is HISTORY or COMMENT, then all HISTORY or COMMENT
entries will be removed.
Parameters
----------
key : str or list of str
The header keyword(s) to delete.
"""
# If key is a list, remove all entries
if isinstance(key, (list, tuple)):
for k in key:
self.delheadval(k)
# Else if it's a string delete the key - ignore any KeyError
else:
if key in self.header:
del self.header[key]