# Licensed under a 3-clause BSD style license - see LICENSE.rst
from configobj import ConfigObj
from copy import deepcopy
import os
from astropy import log
import fnmatch
import json
from sofia_redux import scan
from sofia_redux.scan.configuration.options import Options
from sofia_redux.scan.configuration.aliases import Aliases
from sofia_redux.scan.configuration.conditions import Conditions
from sofia_redux.scan.configuration.dates import DateRangeOptions
from sofia_redux.scan.configuration.fits import FitsOptions
from sofia_redux.scan.configuration.iterations import IterationOptions
from sofia_redux.scan.configuration.objects import ObjectOptions
from sofia_redux.scan.utilities.utils import (
dict_difference, get_range, insert_info_in_header)
__all__ = ['Configuration']
[docs]
class Configuration(Options):
r"""
A handler for configuration settings used during a SOFSCAN reduction.
The configuration for a SOFSCAN reduction is highly complex and can
involve hundreds of specific settings based on user intent or the
contents of the data being processed.
Notes
-----
*CONDITIONS*
How a reduction should proceed are handled by "conditionals". Conditions
are used to perform a set of actions or update the configuration when a
specific requirement is met. An example of such is::
[conditionals]
[[fits.SIBS_X=15.5]]
subarray = T0, R0
This means that if the SIBS_X keyword value in the FITS header for a
certain scan is equal to 15.5, then the value of subarray in the
configuration should be set to ['T0', 'R0']. The first level of
conditional keys should always be a requirement of the form
<key><operator><value> where <operator> may be replaced by any standard
Python conditional operator such as =, ==, !=, <, etc.
For SOFSCAN reductions, there are a few types of condition classes that
specifically relate to settings for a given SOFSCAN iteration, the date of
observation, the source being observed, and the values in the FITS
header. Please see :class:`DateRangeOptions`, :class:`FitsOptions`,
:class:`IterationOptions, and :class:`ObjectOptions` for further
details.
Note that all processing conditions is necessarily recursive since
triggering one condition may allow other condition requirements to be
met and so on. This is done during configuration validation, which
will indefinitely check and apply conditions until no further changes
are detected. Therefore, care should be taken when designing a
configuration such that no infinite loops are created.
*ALIASES*
A feature of the configuration is the ability to alias keys and
reference values through the :class:`Alias` class. A key may be aliased
using something like::
[aliases]
pols = correlated.polarrays
subs = correlated.subarrays
which means that the "pols" key in the configuration will always refer
to the polarrays branch of the correlated options. Values may be
referenced in place by using {?<key>} notation, where <key> refers to
any other existing key/value in the configuration. For example,
jumpdata = {?configpath}/hawc_plus/flux_jump_FS15_v3.fits.gz would
replace {?configpath} with the correct value when retrieved from the
configuration.
*COMMANDS*
There are certain keys in the configuration that are used to perform
specific actions or set attributes for certain key/values. These are
never passed into the set of key/values in the configuration and are
processed separately. There are:
- blacklist :
Never allow access to this key for any reason. It may not
be altered, retrieved, or made visible to the SOFSCAN reduction. A
blacklisted key should remain so for the entire reduction.
- whitelist :
Always allow access to this key and never allow any
modification to its value for the entire reduction.
- forget :
Temporarily disable access to this keys value by the SOFSCAN
reduction. Access may be granted by using the "recall" command.
- recall :
Allow access to a previously forgotten key.
- lock :
Do not allow any further modifications to they key values or
attributes.
- unlock :
Unlock a previously locked key.
- add :
Add a key to the configuration. This will set the value of this
key to "True" when retrieved by the reduction.
- rounds :
Set the maximum number of iterations for the SOFSCAN reduction.
The value must be an integer (or reference an integer).
- config :
Read and merge the contents of another configuration whose
file path is set as the value.
*SYNTAX*
Configuration files should be formatted according to
`configobj`. A shorthand format may be used to reference
multiple configuration levels using a dot separator. For example,
correlated.sky.gains.range = 0.3:3 is the same as setting::
[correlated]
[[sky]]
[[[gains]]]
range = 0.3:3
The "fits" key is special and is used to refer to key/values in the
FITS header. For example, fits.SPECTEL1 will refer to the SPECTEL1
value in the header, not the configuration.
"""
# The command keys are those found in the configuration contents that
# instruct the configuration to perform some task. All other keys/values
# are parsed as configuration options.
command_keys = ('blacklist', 'whitelist', 'forget', 'recall',
'lock', 'unlock', 'add', 'rounds', 'config')
# In certain circumstances, a configuration condition may be triggered that
# issues multiple commands to the main configuration object. The order
# that those commands are applied is set here, and is very important to
# fulfilling the intent of the user. For example, if an update to a locked
# value is required before it is locked again, it should be unlocked,
# updated, and then locked.
command_order = ('blacklist', 'whitelist', 'unlock', 'config', 'update',
'recall', 'rounds', 'add', 'forget', 'lock')
# These are the keys that mark all value contents as belonging to a
# particular attribute of the Configuration rather than the configuration
# body. For example, "aliases" should not be added as an option in the
# configuration, but instead be passed to the Aliases object to be used
# while processing all other settings.
section_keys = {'aliases', 'conditionals', 'date', 'iteration', 'object'}
handler_keys = section_keys.union(['fits'])
def __init__(self, configuration_path=None, allow_error=False,
verbose=False):
"""
Initialize a Configuration object for use with a sofia_scan Reduction.
Parameters
----------
configuration_path : str, optional
A path to the SOFSCAN configuration directory to be used in the
reduction. If not supplied, defaults to
<package_path>/data/configurations.
allow_error : bool, optional
If `True`, allow poorly formatted options to be skipped rather than
raising an error.
verbose : bool, optional
If `True`, issues a warning when a poorly specified option is
encountered.
"""
super().__init__(allow_error=allow_error, verbose=verbose)
self.instrument_name = None
self.enabled = False
self.config_files = []
self.aliases = Aliases()
self.conditions = Conditions()
self.dates = DateRangeOptions()
self.iterations = IterationOptions()
self.objects = ObjectOptions()
self.fits = FitsOptions()
self.current_source = None
self.current_date = None
self.locked = set()
self.disabled = set()
self.whitelisted = set()
self.forgotten = set()
self.applied_conditions = set()
self.work_path = None
self.set_error_handling(allow_error)
self.set_verbosity(verbose)
if configuration_path is None:
package_path = os.path.dirname(os.path.abspath(scan.__file__))
configuration_path = os.path.join(
package_path, 'data', 'configurations')
self.config_path = configuration_path
if not os.path.isdir(self.config_path):
self.handle_error(
f"Configuration directory not found ({self.config_path})",
error_class=NotADirectoryError)
self.options['configpath'] = None
return
self.options['configpath'] = configuration_path
[docs]
def copy(self):
"""
Return a copy of the configuration.
Returns
-------
Configuration
"""
return super().copy()
[docs]
def clear(self):
"""
Clear all options, handler options, and settings.
Returns
-------
None
"""
super().clear()
# Preserve the configuration path.
self.options['configpath'] = self.config_path
self.locked = set()
self.disabled = set()
self.whitelisted = set()
self.forgotten = set()
self.applied_conditions = set()
self.config_files = []
for handler in [self.aliases, self.conditions, self.dates,
self.iterations, self.objects, self.fits]:
handler.clear()
def __contains__(self, key):
"""
Check if a key is available in the configuration.
Note that disabled (forgotten, blacklisted) keys will be returned as
`False`.
Parameters
----------
key : str
The key to check.
Returns
-------
bool
"""
uk = self.aliases(key)
split_key = uk.split('.')
check_string = ''
for i, add_fork in enumerate(split_key):
if i == 0:
check_string = add_fork
else:
check_string += f'.{add_fork}'
if check_string in self.disabled:
return False
return self.exists(uk, options=self.options)
def __delitem__(self, key):
"""
Delete a key from the options.
Parameters
----------
key : str or object
Returns
-------
None
"""
if self.options is None:
raise KeyError("Options are not initialized.")
self.purge(key)
[docs]
def exists(self, key, options=None):
"""
Check if a given key exists in the configuration, even if disabled.
Parameters
----------
key : str
The key to check.
options : dict or ConfigObj, optional
The configuration options to check. If not supplied, uses the
current configuration options.
Returns
-------
bool
"""
if options is None:
options = self.options
if key in options:
return True
paths = self.aliases(key).split('.')
for path in paths:
if not isinstance(options, dict):
return False
options = options.get(path)
if options is None:
return False
return True
[docs]
def get(self, *args, default=None, unalias=True):
"""
Retrieve a value from the configuration.
Parameters
----------
args : tuple (str)
The configuration key levels such as
['correlated', 'sky', 'biasgains'].
default : object, optional
The value to return if not found in the configuration.
unalias : bool, optional
If `True`, unalias all keys provided in `args`.
Returns
-------
value : str or object
The returned string value if found in the configuration, or
`default` if not found.
"""
level = self.get_branch(*args, default=default, unalias=unalias)
if isinstance(level, dict):
result = level.get('value', default)
else:
result = level # is a final value
return result
[docs]
def get_branch(self, *args, default=None, unalias=True):
"""
Retrieve a configuration options branch.
Parameters
----------
args : tuple (str)
The configuration key levels such as ['correlated', 'sky'].
default : object, optional
The value to return if the branch was not found in the
configuration.
unalias : bool, optional
If `True`, unalias all keys provided in `args` and any retrieved
configuration values.
Returns
-------
branch : dict or str or object
The configuration branch if present or `default` if not.
"""
key = self.aliases('.'.join(args))
if self.is_disabled(key):
return default
branches = key.split('.')
level = self.options
for branch in branches:
if branch in level:
level = level[branch]
else:
if unalias:
return self.aliases.unalias_branch_values(self, default)
else:
return default
if unalias:
return self.aliases.unalias_branch_values(self, level)
else:
return level
[docs]
def set_error_handling(self, allow_error):
"""
Set the error handling for all section handlers.
Parameters
----------
allow_error : bool
`True` if errors are permitted during configuration parsing.
Returns
-------
None
"""
self.allow_error = allow_error
for handler in [self.aliases, self.conditions, self.dates,
self.iterations, self.objects, self.fits]:
handler.allow_error = allow_error
[docs]
def set_verbosity(self, verbose):
"""
Set verbose messages, and do the same for all section handlers.
Parameters
----------
verbose : bool
If `True`, will emit certain log messages that may otherwise be
slightly annoying.
Returns
-------
None
"""
self.verbose = verbose
for handler in [self.aliases, self.conditions, self.dates,
self.iterations, self.objects, self.fits]:
handler.verbose = verbose
[docs]
def get_section_handler(self, section_name):
"""
Return the configuration handler object for a specific section.
Note that the FitsOptions handler will not be returned since that
configuration processing is handled in a different manner.
Parameters
----------
section_name : str
The name of the configuration handler to retrieve
Returns
-------
handler : Options or None
Either a :class:`Aliases`, :class:`Conditions`,
:class:`DateRangeOptions`, :class:`IterationsOptions`, or None if
the section name does not reference a valid handler.
"""
section_name = str(section_name).lower().strip()
if section_name == 'aliases':
return self.aliases
elif section_name == 'conditionals':
return self.conditions
elif section_name == 'date':
return self.dates
elif section_name == 'iteration':
return self.iterations
elif section_name == 'object':
return self.objects
else:
return None
[docs]
def parse_to_section(self, key, value):
"""
Parse a key-value as a section update.
Returns `True` if the key maps to a valid section in the configuration.
Parameters
----------
key : str
The key to attempt to parse to a section of the configuration.
value : dict or str
The values to parse to the section.
Returns
-------
parsed_as_section : bool
`True` if the key-value successfully mapped to a section of the
configuration other than the main body, and `False` otherwise.
"""
section_handler = self.get_section_handler(key)
if section_handler is None:
return False
section_handler.update({key: value})
return True
[docs]
def set_instrument(self, instrument):
"""
Set an instrument for the configuration.
Parameters
----------
instrument : str or object
Should be the name of the instrument or an object with the
instrument name in the "name" attribute. This will have the result
of allowing access to configuration files in the <instrument>
directory in the configuration path.
Returns
-------
None
"""
if instrument is None:
return
if isinstance(instrument, str):
self.instrument_name = instrument.strip().lower()
elif hasattr(instrument, 'name'):
self.instrument_name = instrument.name
else:
self.handle_error(
f"Could not parse {type(instrument)} as an instrument.")
@property
def current_iteration(self):
"""
Return the current iteration of the SOFSCAN reduction
Returns
-------
int or None
An integer if the iteration has been set previously, and `None`
otherwise.
"""
return self.iterations.current_iteration
@current_iteration.setter
def current_iteration(self, iteration):
"""
Set the current iteration for the SOFSCAN reduction.
The configuration will be validated if this value is set.
Parameters
----------
iteration : int
The iteration to set.
Returns
-------
None
"""
self.set_iteration(iteration, validate=True)
@property
def max_iteration(self):
"""
Return the maximum number of iterations for the SOFSCAN reduction.
Returns
-------
int or None
An int if the maximum number of iterations has been set and `None`
otherwise.
"""
return self.iterations.max_iteration
@max_iteration.setter
def max_iteration(self, value):
"""
Set the maximum number of iterations for the SOFSCAN reduction.
Parameters
----------
value : int or float or str
The maximum number of iterations. Must be able to be parsed to an
int.
Returns
-------
None
"""
self.parse_key_value('rounds', value)
@property
def blacklisted(self):
"""
Return the current set of blacklisted keywords.
Blacklisted keywords should not be accessible or modifiable by the
reduction.
Returns
-------
set (str)
"""
return self.disabled & self.locked
@property
def preserved_cards(self):
"""
Return the current dictionary of preserved FITS header keyword values.
The preserved FITS header cards are those taken from the original FITS
file and preserved for later use without fear that they will be
modified.
Returns
-------
dict
A dictionary of the form {keyword: (value, comment)}.
"""
if self.fits is None:
return {}
return self.fits.preserved_cards
@property
def user_path(self):
"""
Return the path to the user configuration directory.
Returns
-------
directory : str
"""
return os.path.join(os.path.expanduser('~'), '.sofscan')
@property
def expected_path(self):
"""
Return the path to the expected configuration directory.
If the .sofscan2 directory is not found in the user home directory,
returns the default configuration directory.
Returns
-------
directory : str
"""
user_path = self.user_path
if not os.path.isdir(user_path):
return self.config_path
else: # pragma: no cover
return user_path
[docs]
def resolve_filepath(self, filename):
"""
Return a full file path to the file.
If the file name given is already a complete path, it will be returned.
Otherwise, it will be joined with the `expected_path` and returned.
Note that no check is performed to see if the file exists.
Parameters
----------
filename : str
The full or partial path to a given file.
Returns
-------
full_filepath : str
"""
f = self.aliases.unalias_value(self.options, filename)
if f.startswith(os.sep):
return f
else:
return os.path.join(self.expected_path, f)
[docs]
def get_configuration_filepath(self, key):
"""
Return a file path from the given options.
Note that no check is performed to see if the path is valid or exists.
Parameters
----------
key : str
The configuration key from which to extract the file path.
Returns
-------
filepath : str or None
A string if found in the configuration, and `None` otherwise.
"""
f = self.get_string(key)
if f is None:
return None
return self.resolve_filepath(f)
[docs]
def find_configuration_files(self, filename):
"""
Find all matching files in the SOFSCAN configuration directories.
Checks the base SOFSCAN configuration directory, the user directory,
and the instrument directories for any file matching that which is
given. Not that alias references may be passed in as filenames too.
All output files will exist and be full paths.
Parameters
----------
filename : str
The filename to find.
Returns
-------
files : list (str)
A list of all matching files in the configuration directories.
"""
f = self.aliases.unalias_value(self.options, filename)
user_path = self.expected_path
configuration_files = []
if f.startswith(os.sep) or f.startswith('/'):
# A full file path
if os.path.isfile(f):
configuration_files.append(os.path.abspath(f))
return configuration_files
else:
msg = f'File not found: {filename}.'
if self.verbose:
log.warning(msg)
else:
log.debug(msg)
return configuration_files
base_file = os.path.join(self.config_path, f)
if os.path.isfile(base_file):
configuration_files.append(base_file)
if user_path != self.config_path: # pragma: no cover
user_file = os.path.join(user_path, f)
if (os.path.isfile(user_file)
and user_file not in configuration_files):
configuration_files.append(user_file)
if self.instrument_name is not None:
base_instrument_file = os.path.join(
self.config_path, self.instrument_name, f)
if (os.path.isfile(base_instrument_file)
and base_instrument_file not in configuration_files):
configuration_files.append(base_instrument_file)
if user_path != self.config_path: # pragma: no cover
user_instrument_file = os.path.join(
user_path, self.instrument_name, f)
if (os.path.isfile(user_instrument_file)
and user_instrument_file not in configuration_files):
configuration_files.append(user_instrument_file)
# If not matching files found, check absolute path
if len(configuration_files) == 0:
if self.verbose:
log.warning(f"No matching configuration files for {filename}.")
else:
log.debug(f"No matching configuration files for {filename}.")
return configuration_files
[docs]
def priority_file(self, filename_or_key):
"""
Returns the highest priority matching filename from configuration.
The order of files from lowest to highest priority is:
base_configuration -> user_configuration -> base_instrument ->
user_instrument
The configuration options in higher priority files overrule those in
lower priorities. This returns the highest priority configuration file
available.
Parameters
----------
filename_or_key : str
The name of the file or configuration key. If a matching key is
found in the configuration, it takes priority over the filename.
Returns
-------
str or None
The highest priority matching file, or `None` if not found.
"""
if filename_or_key in self:
filename = self.get_string(filename_or_key)
else:
filename = filename_or_key
matching_files = self.find_configuration_files(filename)
if len(matching_files) == 0:
return None
return matching_files[-1]
[docs]
def update(self, configuration_options):
"""
Update the configuration options with another set of options..
Parameters
----------
configuration_options : dict or ConfigObj
The configuration options to read and parse.
Returns
-------
None
"""
self.read_configuration(configuration_options, validate=True)
[docs]
def read_configuration_file(self, filename, validate=True):
"""
Read and apply a given configuration file.
Parameters
----------
filename : str
The file path to the configuration file.
validate : bool, optional
If `True`, validate the configuration following the read.
Returns
-------
None
"""
if not os.path.isfile(filename):
self.handle_error(f"Not a file: {filename}")
return
if filename in self.config_files:
return
self.config_files.append(filename)
config = ConfigObj(filename)
log.info(f"Reading configuration from {filename}")
self.read_configuration(config, validate=validate)
[docs]
def read_configuration(self, config, validate=True):
"""
Read a given configuration and apply to this Configuration.
Parameters
----------
config : str or dict or ConfigObj or Configuration
The configuration to read and apply. A string value is assumed to
be the file path to a configuration file, and will be read using
:func:`Configuration.read_configuration_file`.
validate : bool, optional
If `True`, validate the configuration following the read. This
will examine and apply all conditionals.
Returns
-------
None
"""
if isinstance(config, str): # is a filename
filenames = self.find_configuration_files(config)
for filename in filenames:
self.read_configuration_file(filename, validate=validate)
return
elif isinstance(config, Configuration):
if not config.enabled:
return
config = config.options
elif not isinstance(config, dict):
msg = (f"Configuration must be of type {str}, {dict}, "
f"or {Configuration}. Received {type(config)}.")
self.handle_error(msg)
return
config = self.normalize_options(config)
self.update_sections(config, validate=False)
self.parse_configuration_body(config)
self.aliases.resolve_configuration(self)
if validate:
self.validate()
self.enabled = True
[docs]
def read_configurations(self, configuration_string, validate=True):
"""
Read and apply multiple configurations.
Parameters
----------
configuration_string : str
A list of comma separated file paths to configuration files.
validate : bool, optional
If `True`, validate the configuration following a read.
Returns
-------
None
"""
configuration_strings = [
s.strip() for s in configuration_string.split(',')]
for configuration_file in configuration_strings:
self.read_configuration(configuration_file, validate=validate)
[docs]
def validate(self):
"""
Apply all conditions to the configuration.
Returns
-------
None
"""
self.conditions.process_conditionals(self)
[docs]
def set_object(self, source_name, validate=True):
"""
Set the source object in the configuration.
Parameters
----------
source_name : str
The name of the astronomical source.
validate : bool, optional
If `True`, validate the configuration once the source has been set.
Returns
-------
None
"""
self.objects.set_object(self, source_name, validate=validate)
[docs]
def set_iteration(self, iteration, validate=True):
"""
Set the current iteration of the reduction in the configuration.
Parameters
----------
iteration : int
The iteration number.
validate : bool, optional
If `True`, validate the configuration once the iteration number has
been set.
Returns
-------
None
"""
self.iterations.set_iteration(self, iteration, validate=validate)
[docs]
def set_date(self, date, validate=True):
"""
Set the observation date in the configuration.
Parameters
----------
date : str or int or float
The observation date. If a string is used, it should be in ISOT
format in UTC scale. Integers and floats will be parsed as
MJD times in the UTC scale.
validate : bool, optional
If `True`, validate the configuration once the date has been set.
Returns
-------
None
"""
self.dates.set_date(self, date, validate=validate)
[docs]
def set_serial(self, serial, validate=True):
"""
Sets options based on a scan serial number.
Parameters
----------
serial : int
validate : bool, optional
If `True`, validate the whole configuration after setting the
serial options.
Returns
-------
None
"""
serial_branch = self.get_branch('serial')
if serial_branch is None:
return
for serial_key, serial_options in serial_branch.items():
serial_range = get_range(serial_key, is_positive=True)
if serial_range.in_range(serial):
self.apply_configuration_options(
serial_options, validate=validate)
[docs]
def apply_configuration_options(self, options, validate=True):
"""
Apply options to the configuration.
The dictionary keys in the options are first examined and separated
into commands, keyword=value settings, and configuration section
updates. Sections will be updated first, followed by single
keyword=value updates. Finally, configuration commands will be
processed using :func:`Configuration.apply_commands`.
Parameters
----------
options : dict or ConfigObj
The options to apply to the configuration.
validate : bool, optional
If `True`, validate the configuration once the options have been
applied.
Returns
-------
None
"""
if not isinstance(options, dict) or len(options) == 0:
return
commands = {}
other = {}
sections = {}
for command, action in options.items():
if command in self.command_keys:
if isinstance(action, list):
commands[command] = action
else:
commands[command] = [action]
elif command in self.section_keys:
sections[command] = action
else:
other[command] = action
self.update_sections(sections, validate=False)
for key, value in other.items():
self.parse_key_value(key, value)
self.apply_commands(commands)
if validate:
self.validate()
[docs]
def update_sections(self, options, validate=True):
"""
Update sections in the configuration.
The configuration aliases, conditions, dates, iterations, and objects
will be updated if present in `options`.
Parameters
----------
options : dict or ConfigObj.
A dictionary used to update the sections.
validate : bool, optional
If `True`, validate the configuration once all sections have been
updated.
Returns
-------
None
"""
for section in [
'aliases', 'date', 'iteration', 'object', 'conditionals']:
self.get_section_handler(section).update(options)
if validate:
self.validate()
[docs]
def parse_configuration_body(self, options):
"""
Parse configuration options and apply.
Will filter out any sectional keys, but set and apply any other
options. Note that configuration commands will always be processed
last.
Parameters
----------
options : dict or ConfigObj
The configuration options to parse.
Returns
-------
None
"""
commands = {}
for key, value in options.items():
if key in self.section_keys:
continue # Sections are handled separately
if key in self.command_keys:
commands[key] = value
continue # Command keys require special handling
self.merge_options({key: value})
self.apply_commands(commands)
[docs]
def apply_commands(self, commands, command_order=None):
"""
Apply configuration commands to the configuration.
Configuration commands are special keywords that instruct the
configuration to perform a certain action such as setting an attribute
for a certain key value like blacklisting it from any use in the
reduction, or locking it's value in place.
Parameters
----------
commands : dict or ConfigObj
The commands to apply in {command_name: actions} format, where
command_name is a string, and actions may be a dict, list or
comma-separated string of actions to apply for the given command
name.
command_order : list (str), optional
The order in which to apply commands. This can be very important
since one might wish to unlock a key, set it's value, and then
finally re-lock it's value in place. If the lock command is
issued first, no update may take place etc.
Returns
-------
None
"""
if command_order is None:
command_order = self.command_order
for command in command_order:
if command not in commands:
continue
keys = commands[command]
if command == 'update':
if isinstance(keys, dict):
for key, value in keys.items():
self.parse_key_value(key, value)
else:
for (key, value) in keys:
self.parse_key_value(key, value)
else:
if isinstance(keys, str):
keys = [s.strip() for s in keys.split(',')]
for key in keys:
self.parse_key_value(command, key)
[docs]
def put(self, *args, check=True):
"""
Set a value in the configuration.
Parameters
----------
args : tuple (str)
The last argument (args[-1]) should always contain the value that
is being set in the configuration. All values prior to that are
considered part of the configuration key. For example,
args=('correlated', 'sky', 'biasgains', '0.3:3') sets
{'correlated': {'sky': {'biasgains': '0.3:3'}}} in the
configuration.
check : bool, optional
If `True`, check that a key is not locked before attempting to set
a value. If it is locked, no changes will be made.
Returns
-------
None
"""
value = args[-1]
key = self.aliases('.'.join(args[:-1]))
if check:
if self.is_locked(key):
return
self.remove_disabled_key(key)
self.merge_options(self.key_value_to_dict(key, value))
[docs]
def remove_disabled_key(self, key):
"""
Enable all disabled branches upto and including key.
Parameters
----------
key : str
Returns
-------
None
"""
set_key = self.matching_set_dot_key(key, self.disabled)
if set_key is None:
return
self.disabled.remove(set_key)
[docs]
def read_fits(self, header_or_file, extension=0, validate=True):
"""
Read contents of a FITS file or header and apply to the configuration.
Parameters
----------
header_or_file : fits.Header or str
A FITS header or the file path to a FITS file to read.
extension : int, optional
If a file path was passed in as the argument, the extension of the
FITS file from which to take the header. The default is the
primary (0) extension.
validate : bool, optional
If `True`, fully validate the configuration once the FITS header
has been read.
Returns
-------
None
"""
self.fits.update_header(header_or_file, extension=extension)
self.merge_fits_options()
if validate:
self.validate()
self.preserve_header_keys()
self.fits.enabled = True
[docs]
def merge_fits_options(self):
"""
Merge the FITS handler options into the main configuration body.
Once a FITS header has been read by the fits handler, this method
creates a copy of those key/values in the main configuration body for
easy access to the values or advanced handling methods.
Returns
-------
None
"""
fits_config = ConfigObj()
fits_config['fits'] = deepcopy(self.fits.options)
self.merge_options(fits_config)
[docs]
@staticmethod
def key_value_to_dict(key, value):
"""
Convert a string key and a given value to a dictionary.
The configuration can use a dot (".") separator to mark dictionary
levels using a string. For example a.b.c = 1 represents [a][b][c] = 1.
Examples
--------
>>> print(Configuration.key_value_to_dict('a.b.c', 1))
{'a': {'b': {'c': 1}}}
Parameters
----------
key : str
The dot key string to convert.
value : str or object
The final assigned value.
Returns
-------
dict
"""
branches = key.split('.')
result = dict()
current = result
for branch in branches[:-1]:
current[branch] = dict()
current = current[branch]
current[branches[-1]] = value
return result
[docs]
@staticmethod
def matching_wildcard(string_array, wildcard_pattern):
"""
Find and return all strings in an array matching a given pattern.
Patterns will be parsed according to :func:`fnmatch.fnmatch`.
Parameters
----------
string_array : list (str)
A list of strings from which to find strings matching
`wildcard_pattern`.
wildcard_pattern : str
The pattern to match using :func:`fnmatch.fnmatch`.
Returns
-------
list (str)
A list of strings matching `wildcard_pattern` from `string_array`.
"""
return [s for s in string_array
if fnmatch.fnmatch(s, wildcard_pattern)]
[docs]
def matching_wildcard_keys(self, wildcard_key, flat_dictionary=None):
"""
Find all matching configuration keys for a given wildcard.
Wildcards are parsed according to :func:`fnmatch.fnmatch`. All
configuration keys available in the main configuration body will be
checked and returned if found.
Parameters
----------
wildcard_key : str
The string pattern to search for using :func:`fnmatch.fnmatch`.
flat_dictionary : dict or ConfigObj, optional
A flat dictionary to search through for matching keys. A flat
dictionary refers to a single level dictionary. If not supplied,
the configuration body is flattened (e.g. [key1][key2] = 1 is
converted to key1.key2 = 1).
Returns
-------
list (str)
A list of dot-separated keys that match `wildcard_key`.
"""
if flat_dictionary is None:
flat_dictionary = self.flatten(self.options)
return self.matching_wildcard(
list(flat_dictionary.keys()), wildcard_key)
[docs]
def flatten(self, options, base=None, basestring='', unalias=True,
keep_value=False):
"""
Transforms a nested dictionary to a single level representation.
A nested dictionary such as {'a': {'b': {'c': 1}}} will be converted
to a dot-separated single level version where dots (".") are used to
mark different dictionary levels ({'a.b.c': 1}).
Parameters
----------
options : dict or ConfigObj
The nested dictionary to flatten.
base : ConfigObj, optional
The base options to update during recursive calls to `flatten`.
This should not be provided by the user during standard use.
basestring : str, optional
The current dot-string branch representation of a dictionary level
during recursive calls to `flatten`. It should not be provided by
the user during standard use.
unalias : bool, optional
If `True`, unalias all keys.
keep_value : bool, optional
If `True`, keep .value settings in the configuration.
Returns
-------
flat_dictionary : ConfigObj
A flattened version of `options`.
"""
new = ConfigObj()
return_base = base is None
if return_base:
base = new
for k, v in options.items():
if unalias:
uk = self.aliases(basestring + k)
else:
uk = basestring + k
if isinstance(v, (dict, ConfigObj)):
if len(v) == 0:
base[uk] = {}
else:
self.flatten(v, base=base, basestring=uk + '.')
else:
if not keep_value:
if uk.endswith('.value'):
base[uk[:-6]] = v
else:
base[uk] = v
else:
base[uk] = v
if return_base:
return base
[docs]
@classmethod
def expand_options(cls, options):
"""
Convert a potentially dot-separated dictionary to multi-level.
Note that no aliasing, checking of values or any other configuration
options are performed.
Examples
--------
>>> options = {'write.source': False, 'foo.bar.baz': 1}
>>> print(Configuration.expand_options(options))
{'write': {'source': False}, 'foo': {'bar': {'baz': 1}}}
Parameters
----------
options : ConfigObj or dict
The options to expand.
Returns
-------
expanded_options : ConfigObj
The options expanded without any dot-separators.
"""
expanded = ConfigObj()
for key, value in options.items():
branch = cls.key_value_to_dict(key, value)
expanded.merge(branch)
return expanded
[docs]
def normalize_options(self, options):
"""
Convert a dictionary prior to merging into the configuration.
Converts options to a standard multi-level dictionary with all string
values. No aliasing is performed.
Parameters
----------
options : dict or ConfigObj
The options to normalize.
Returns
-------
normalized_options : ConfigObj
The normalized options.
"""
return self.expand_options(self.stringify(options))
[docs]
def merge_options(self, merge_options, options=None, chain=None,
keep_disabled=False):
"""
Merge a set of options into currently existing options.
Merges `merge_options` into `options` using rules and settings from the
:class:`Configuration`. Sectional keys such as 'iteration' or
'conditionals' will be parsed by *this* Configuration and validated.
Note that final validation on the configuration body is not performed
and should be done so manually.
FITS header options are *NOT* handled here, and will essentially be
skipped over. However, fits instructions that exist in the
configuration will be parsed. FITS header should instead be read in by
:func:`Configuration.read_fits` and added via
:func:`Configuration.merge_fits_options`.
Parameters
----------
merge_options : dict or ConfigObj
The new options that should be merged into `options`.
options : dict or ConfigObj, optional
The options which to merge `merge_options` into. Generally, this
should not be supplied by the user and defaults to the main
configuration body of options. When supplied, it is usually for
recursive calls.
chain : list (str), optional
A list of previously encountered dictionary levels for use during
recursive calls. This should generally not be supplied by the
user.
keep_disabled : bool, optional
If `True`, allow a configuration value to be changed, but never
remove it from the current set of disabled keys.
Returns
-------
None
"""
if options is None:
options = self.options
if chain is None:
chain = []
in_handler = False
else:
chain = chain.copy()
# keys can sometimes relate to handlers such as 'fits.OBJECT'
if len(chain) > 0:
in_handler = str(chain[0]).lower().strip() in self.handler_keys
else: # pragma: no cover
# Just in case
in_handler = False
barred_branches = self.locked | self.blacklisted
for key, value in merge_options.items():
new_chain = chain + [key]
dot_key = '.'.join(new_chain)
if self.dot_key_in_set(dot_key, barred_branches):
continue
if not keep_disabled:
self.remove_disabled_key(dot_key)
# Attempt to parse to a section of the options.
if not in_handler and self.parse_to_section(key, value):
continue
# Add in the key-value if not already present.
if key not in options:
options.merge({key: value})
continue
# Merge into the existing options.
options_value = options[key]
if isinstance(options_value, dict):
if isinstance(value, dict): # Recursive
self.merge_options(value, options=options_value,
chain=new_chain)
else:
options_value['value'] = value
else:
if isinstance(value, dict):
options[key] = {'value': options_value}
options[key].merge(value)
else:
options[key] = value
[docs]
def dot_key_in_set(self, key, test_set):
"""
Check if a dot-separated key exists in a given set of strings.
The `key` will be unaliased before any comparison is attempted.
Parameters
----------
key : str
The key to test.
test_set : iterable (str)
A list, set, tuple, etc. of strings. If the unaliased `key` exists
in the set, returns `True`.
Returns
-------
bool
"""
return self.matching_set_dot_key(key, test_set) is not None
[docs]
def matching_set_dot_key(self, key, test_set):
"""
Return the dotted key from a test set that relates to `key`.
Parameters
----------
key : str
The key to find.
test_set : iterable (str)
A list, set, tuple, etc. of strings. If the unaliased `key` exists
in the set, returns `True`.
Returns
-------
str or None
A string if found, and `None` otherwise.
"""
check_string = ''
uk = self.aliases(key)
for k in uk.split('.'):
if check_string == '':
check_string = k
else:
check_string += '.' + k
if check_string in test_set:
return check_string
return None
[docs]
def parse_key_value(self, key, value):
r"""
Parse and apply a key-value pair into the configuration.
This method will firstly unalias any given key. Note that wildcards
may be provided ('\*') to apply `value` to all matching keys found in
the configuration. For example, if both "key1" and "key2" exist in
the configuration, Configuration.parse_key_value("key\*", "abc") will
set the value of both to "abc".
If `key` relates to a configuration command it will be applied to any
value found in `value`. For example,
Configuration.parse_key_value('lock', ['key1', 'key2']) will lock
key1 and key2. Finally, if the `key` was not a command, it will be
processed using :func:`Configuration.put` ->
:func:`Configuration.merge` which will add the key-value to the
configuration body, or process it for a certain configuration section.
Parameters
----------
key : str
The configuration key for which `value` applies. This may be an
actual configuration key, configuration command, or section.
Wildcards ('\*') may be used to apply `value` to more than one
configuration key.
value : str or dict or iterable
The value to apply for `key` in the configuration.
Returns
-------
None
"""
key = self.aliases(key)
# Recursive call if wildcard used
flat_config = None
if '*' in key:
flat_config = self.flatten(self.options)
matching_keys = self.matching_wildcard_keys(
key, flat_dictionary=flat_config)
for k in matching_keys:
self.parse_key_value(k, value) # Recursive
elif key in self.command_keys:
command = key
if isinstance(value, list):
keys = value
else:
# The command applied to numerous keys
keys = str(value).split(',')
for applied_to_key in keys:
if '*' in applied_to_key:
matching_keys = self.matching_wildcard_keys(
applied_to_key, flat_dictionary=flat_config)
for k in matching_keys:
self.parse_key_value(command, k)
if key == 'config':
self.read_configurations(value, validate=True)
elif command == 'blacklist':
self.blacklist(applied_to_key)
elif command == 'whitelist':
self.whitelist(applied_to_key)
elif command == 'forget':
self.forget(applied_to_key)
elif command == 'recall':
self.recall(applied_to_key)
elif command == 'lock':
self.lock(applied_to_key)
elif command == 'unlock':
self.unlock(applied_to_key)
elif command == 'add':
self.add_new_branch(applied_to_key)
elif command == 'rounds':
self.iterations.max_iteration = int(applied_to_key)
else: # pragma: no cover
self.handle_error(f"Command ({key}) not implemented.")
else:
self.put(key, value)
[docs]
def blacklist(self, key):
"""
Blacklists a given key in the configuration.
Once a configuration key has been blacklisted, it will be unavailable
for retrieval or setting throughout the entire reduction via standard
configuration functions. This is a safe way to ensure certain settings
are never applied.
Parameters
----------
key : str
The configuration key to blacklist.
Returns
-------
None
"""
uk = self.aliases(key)
if uk in self.blacklisted:
return
if uk in self.locked:
if self.verbose:
log.warning(f"Cannot blacklist locked option: {key}")
else:
log.debug(f"Cannot blacklist locked option: {key}")
return
self.disabled.add(uk)
self.locked.add(uk)
[docs]
def whitelist(self, key):
"""
Whitelist a configuration key.
Whitelisting a configuration key removes it from the blacklist if
set and unlocks it. It will still be disabled however. Locked options
cannot be whitelisted.
Parameters
----------
key : str
The configuration key to whitelist.
Returns
-------
None
"""
uk = self.aliases(key)
if uk in self.locked:
if uk in self.blacklisted:
self.locked.remove(uk)
elif self.verbose:
log.warning(f"Cannot whitelist locked option: {key}")
else:
log.debug(f"Cannot whitelist locked option: {key}")
[docs]
def lock(self, key):
"""
Lock a given configuration key.
Once a configuration key has been locked, its current configuration
settings cannot be altered via standard configuration functions.
Parameters
----------
key : str
The configuration key to lock.
Returns
-------
None
"""
uk = self.aliases(key)
if uk in self.locked:
return
self.locked.add(uk)
[docs]
def unlock(self, key):
"""
Unlock a configuration key.
If a configuration key was previously locked, unlock that key and allow
subsequent changes to it's status or value in the configuration. Note
that blacklisted keys cannot be unlocked.
Parameters
----------
key : str
The configuration key to unlock.
Returns
-------
None
"""
uk = self.aliases(key)
if not self.is_blacklisted(uk):
try:
self.locked.remove(uk)
except KeyError:
pass
[docs]
def forget(self, key):
"""
Forget a key in the configuration.
Disables a given key in the configuration such that it's value or
status may not be altered or retrieved. If key is "blacklist" or
"conditions", all blacklisted keywords or conditions in the
configuration will be removed.
Parameters
----------
key : str
The configuration key to forget.
Returns
-------
None
"""
if self.is_locked(key) and not self.is_blacklisted(key):
if self.verbose:
log.warning(f"Cannot forget locked option: {key}")
else:
log.debug(f"Cannot forget locked option: {key}")
return
if key == 'blacklist':
for k in self.blacklisted:
self.whitelist(k)
return
if key == 'conditions':
self.conditions.clear()
return
self.disabled.add(self.aliases(key))
[docs]
def recall(self, key):
"""
Recall (remember) a given key in the configuration.
Performs a reverse `forget` operation on the given keyword so that it's
value may be retrieved from the configuration or changed if applicable.
Parameters
----------
key : str
The configuration key to recall.
Returns
-------
None
"""
if self.is_locked(key):
if self.verbose:
log.warning(f"Cannot recall locked option: {key}")
else:
log.debug(f"Cannot recall locked option: {key}")
else:
try:
self.disabled.remove(self.aliases(key))
except KeyError:
pass
[docs]
def has_option(self, key):
"""
Check if a key is available in the configuration.
Parameters
----------
key : str
The configuration key to check.
Returns
-------
available : bool
"""
return self.aliases(key) in self
[docs]
def set_option(self, key, value=True):
"""
Set an option in the configuration.
The option will always be added as a branch. I.e, a dictionary in the
configuration options as {`key`: {value: `value`}}. This is in
contrast to :func:`Configuration.put` which will place the option
as {`key`: `value`} in certain instances.
Parameters
----------
key : str
The configuration key to set.
value : object, optional
The value to set in the configuration.
Returns
-------
None
"""
self.add_new_branch(self.aliases(key), value)
[docs]
def get_options(self, *args, default=None, unalias=True):
"""
Retrieve the configuration options for a given key.
The options refer to a specific branch of the configuration and must
retrieve a dictionary like value. If the key refers to a singular
configuration value, the default will be returned instead.
Parameters
----------
args : tuple (str)
The configuration key levels such as ['correlated', 'sky'].
default : object, optional
The value to return if the options could not be retrieved from the
configuration.
unalias : bool, optional
If `True`, unalias all keys in `args` before attempting to retrieve
a value.
Returns
-------
options : dict or str or object
The configuration branch or value if the options were found, or
the `default` value otherwise. The returned options will be a copy
of the configuration values, so changing these will have no impact
on the actual configuration.
"""
level = self.get_branch(*args, default=default, unalias=unalias)
if not isinstance(level, dict):
return default
result = deepcopy(level)
if 'value' in result:
del result['value']
return result
[docs]
def is_locked(self, key):
"""
Return if a configuration key is currently locked.
Parameters
----------
key : str
The configuration key to check.
Returns
-------
locked : bool
"""
return (self.dot_key_in_set(key, self.locked)
or self.dot_key_in_set(f'{key}.value', self.locked))
[docs]
def is_disabled(self, key):
"""
Return if a configuration key is currently disabled.
Parameters
----------
key : str
The configuration key to check.
Returns
-------
disabled : bool
"""
return self.dot_key_in_set(key, self.disabled)
[docs]
def is_blacklisted(self, key):
"""
Return if a configuration key is blacklisted.
Parameters
----------
key : str
The configuration key to check.
Returns
-------
blacklisted : bool
"""
return self.dot_key_in_set(key, self.disabled & self.locked)
[docs]
def add_new_branch(self, key, value=True):
"""
Add a new branch to the configuration.
Parameters
----------
key : str
The path to the configuration branch to add. Branch levels should
be separated by a '.'. Note that if `key` is of the form
"my_key_to_set=my_value", "my_value" will be used in place of
`value`.
value : str or object
The value to set in the configuration.
Returns
-------
None
"""
if '=' in key:
key, set_value = [s.strip() for s in key.split('=')]
else:
set_value = value
branch_value_key = key + '.value'
self.put(branch_value_key, set_value)
[docs]
def get_keys(self, branch_name=None):
"""
Return a list of all keys in a given options branch.
Parameters
----------
branch_name : str, optional
The name of the options branch in the configuration. If not
supplied, return all first level active configuration branches.
Returns
-------
keys : list (str)
"""
if branch_name is None:
branch_keys = []
for key in self.options.keys():
keys = self.get_keys(key)
if keys is not None:
branch_keys.append(key)
return branch_keys
branch = self.get_branch(branch_name, default=None)
if not isinstance(branch, dict):
return None
return list(branch.keys())
[docs]
def get_filepath(self, key, default=None, get_all=False):
"""
Return the file path for a key value in the configuration.
Parameters
----------
key : str
The name of the configuration key in which the file is referenced.
default : str or object, optional
The default file to look for if not found in the configuration.
get_all : bool, optional
If `True`, return all matching file paths for a value found in the
configuration. If `False`, only return a single highest priority
file.
Returns
-------
str or list (str) or None
The highest priority file or a list of all found matching files.
`None` will be returned if no file can be found.
"""
value = self.get_string(key, default=default)
if value is None:
return None
value = self.find_configuration_files(value)
if len(value) == 0:
return None
if get_all:
return value # a list of all filename matches
else:
return value[-1] # highest priority file
[docs]
def purge(self, key):
"""
Completely remove a key from the configuration.
Parameters
----------
key : str
Returns
-------
None
"""
if not self.has_option(key):
return
key = self.aliases(key)
branches = key.split('.')
options = self.options
seen = []
for branch in branches:
seen.append(branch)
if '.'.join(seen) == key:
del options[branch]
break
options = options[branch]
[docs]
def get_flat_alphabetical(self, options=None, unalias=True,
keep_value=False):
"""
Return all configuration keys in a flattened form.
The output from this method is a single-level dictionary containing
flattened keys and values. A flattened key uses dot-separators to
distinguish dictionary levels. E.g., a.b.c:value means {a:b:c:value}.
Keys will be ordered alphabetically.
Parameters
----------
options : ConfigObj or dict, optional
The options to retrieve alphabetical keys. The default are the
configuration options.
unalias : bool, optional
If `True`, unalias all keys.
keep_value : bool, optional
If `True`, keep .value keys in the configuration.
Returns
-------
dict
"""
if options is None:
options = self.options
flat_options = self.flatten(options, unalias=unalias,
keep_value=keep_value)
keys = list(sorted(flat_options.keys()))
result = {}
for key in keys:
result[key] = flat_options[key]
return result
[docs]
def get_active_options(self, options=None):
"""
Return all options that are not disabled in the configuration.
Parameters
----------
options : ConfigObj or dict, optional
The options from which to prune disabled values/branches.
Returns
-------
enabled_options : ConfigObj
"""
if options is None:
options = deepcopy(self.options)
flat_options = self.get_flat_alphabetical(options, keep_value=True)
keep = {}
for key, value in flat_options.items():
if not self.is_disabled(key):
keep[key] = value
return self.expand_options(keep)
[docs]
def order_options(self, options=None, unalias=False):
"""
Order the keys in options alphabetically.
Parameters
----------
options : ConfigObj or dict, options
The options to order. If not supplied defaults to the
configuration options.
unalias : bool, optional
If `True`, unalias all keys.
Returns
-------
ordered_options : ConfigObj
"""
if options is None:
options = deepcopy(self.options)
return self.expand_options(
self.get_flat_alphabetical(
options, keep_value=True, unalias=unalias))
[docs]
def set_outpath(self):
"""
Set the output directory based on the configuration.
If the configuration path does not exist, it will be created if the
'outpath.create' option is set. Otherwise, an error will be raised.
Returns
-------
None
"""
self.work_path = self.get_configuration_filepath('outpath')
if self.work_path is None:
self.work_path = os.getcwd()
if not os.path.isdir(self.work_path):
log.warning(f"The specified output path does not exist: "
f"{self.work_path}")
if not self.get_bool('outpath.create'):
log.error("Change 'outpath' to an existing directory, or "
"set 'outpath.create' to create a path "
"automatically.")
raise ValueError(f"Specified output path does not exist: "
f"{self.work_path}")
log.info(f"Creating output directory: {self.work_path}")
os.makedirs(self.work_path)
[docs]
def configuration_difference(self, config):
"""
Return the difference between this configuration and another.
Parameters
----------
config : Configuration
Returns
-------
difference : Configuration
"""
difference = self.copy()
difference.options = difference.options.__class__()
flat = self.flatten(self.options)
for key in flat.keys():
if not self.is_configured(key):
continue
value1 = self[key]
if not config.is_configured(key):
difference.parse_key_value(key, value1)
continue
value2 = config[key]
if value1 != value2:
difference.parse_key_value(key, value1)
# Now the other sections
difference.fits.options = dict_difference(
self.fits.options, config.fits.options)
difference.conditions.options = dict_difference(
self.conditions.options, config.conditions.options)
difference.dates.options = dict_difference(
self.dates.options, config.dates.options)
difference.iterations.options = dict_difference(
self.iterations.options, config.iterations.options)
difference.objects.options = dict_difference(
self.objects.options, config.objects.options)
difference.aliases.options = dict_difference(
self.aliases.options, config.aliases.options)
return difference
[docs]
def lock_rounds(self, max_rounds=None):
"""
Lock the number of rounds in-place for the reduction.
Parameters
----------
max_rounds : int or str or float, optional
The new maximum number of rounds for the reduction.
Returns
-------
None
"""
self.iterations.lock_rounds(maximum_iterations=max_rounds)
[docs]
def check_trigger(self, trigger):
"""
Check to see if the requirement for a trigger has been fulfilled.
Parameters
----------
trigger : str
A trigger of the form <key> or <key><operator><value>. If a single
key is provided, the trigger is `True` so long as the bool value
for the key evaluates as such. Otherwise, the value for <key> will
be evaluated using <operator> (=, !=, <, <=, >, >=) agains <value>.
Returns
-------
bool
"""
return self.conditions.check_requirement(self, trigger)