# Licensed under a 3-clause BSD style license - see LICENSE.rst
import configobj
import re
from sofia_redux.scan.configuration.options import Options
__all__ = ['Conditions']
[docs]
class Conditions(Options):
    """
    Conditional configuration actions keyed by requirement.

    Each stored entry maps a requirement string (typically of the form
    <key><operator><value>) to the options that should be applied to a
    configuration once that requirement is met.
    """
    # NOTE(review): this tuple is not referenced anywhere in the visible
    # code; presumably the Options base class treats these keys as ones whose
    # values are accumulated rather than overwritten when options are
    # merged -- confirm against Options.
    append_keys = ('blacklist', 'whitelist', 'forget', 'recall',
                   'lock', 'unlock', 'add', 'config')
def __init__(self, allow_error=False, verbose=True):
    """
    Initialize the configuration conditions.

    The configuration conditions are a set of requirements to check for in
    the configuration, and actions to perform if those conditions are met.
    Processing conditions is generally a recursive operation since applying
    actions may subsequently trigger other condition requirements.

    Parameters
    ----------
    allow_error : bool, optional
        If `True`, allow poorly formatted options to be skipped rather than
        raising an error.
    verbose : bool, optional
        If `True`, issues a warning when a poorly formatted option is
        encountered.
    """
    # Both flags are forwarded unchanged to the parent class initializer.
    super().__init__(allow_error=allow_error, verbose=verbose)
def __len__(self):
    """
    Count the stored conditions.

    Returns
    -------
    int
        The length of the underlying options container, or 0 when that
        container does not define a length (for example, when it is None).
    """
    stored = self.options
    return len(stored) if hasattr(stored, '__len__') else 0
def __setitem__(self, requirement, options):
    """
    Set the options for a given requirement.

    Parameters
    ----------
    requirement : str
        The requirement for the condition to be met.  Should be of the form
        <key><operator><value>.
    options : str or dict or configobj.ConfigObj
        The options to apply once the condition is met.  If a single string
        is provided, it should be of the form {<command_or_key>=<value>} or
        <value>.  If only <value> is provided, the associated command will
        be changed to add=<value>.

    Returns
    -------
    None
    """
    # Item assignment is a thin alias for `set`; all parsing and merging of
    # the supplied options happens there.
    self.set(requirement, options)
def __str__(self):
    """
    Summarize how many conditions are stored.

    Returns
    -------
    str
        A sentence of the form "Contains N condition(s).", pluralized
        unless exactly one condition is stored.
    """
    count = self.size
    plural = '' if count == 1 else 's'
    return f'Contains {count} condition{plural}.'
def __repr__(self):
    """
    Return the parent representation followed by the condition summary.

    Returns
    -------
    str
        The parent class `repr`, a single space, then `str(self)`.
    """
    parent_repr = super().__repr__()
    return f'{parent_repr} {self}'
@property
def size(self):
    """
    The number of conditions available.

    Returns
    -------
    int
        Same value as `len()` applied to this object.
    """
    return len(self)
[docs]
def copy(self):
    """
    Return a copy of the conditions.

    Returns
    -------
    Conditions
    """
    # Copying is delegated entirely to the parent class implementation.
    return super().copy()
[docs]
def set(self, requirement, options):
    """
    Store (or extend) the actions attached to a requirement.

    The supplied options are first normalized with `options_to_dict`.  If
    that fails (returns None), the problem is reported through
    `handle_error` and nothing is stored.  Otherwise the parsed options are
    merged into the entry for `requirement`, which is created as an empty
    ConfigObj the first time the requirement is seen.

    Parameters
    ----------
    requirement : str
        Typically a requirement of the form key=value.
    options : dict or ConfigObj
        The options to apply if the condition is met.

    Returns
    -------
    None
    """
    parsed = self.options_to_dict(options)
    if parsed is None:
        self.handle_error(f"Could not parse condition "
                          f"[{requirement}]: {options}")
        return

    store = self.options
    if requirement not in store:
        # First time this requirement is seen: start from an empty section.
        store[requirement] = configobj.ConfigObj()
    self.merge_options(store[requirement], parsed)
[docs]
def update(self, configuration_options):
    """
    Import conditions from the 'conditionals' entry of a configuration.

    Nothing happens when the supplied options have no 'conditionals' key.
    When the entry exists but is not a dict, the problem is reported via
    `handle_error`.  Otherwise every (requirement, actions) pair is passed
    to `set`.

    Parameters
    ----------
    configuration_options : dict or ConfigObj
        The configuration options to read and parse.

    Returns
    -------
    None
    """
    if 'conditionals' not in configuration_options:
        return

    conditionals = configuration_options['conditionals']
    if isinstance(conditionals, dict):
        for requirement, actions in conditionals.items():
            self.set(requirement, actions)
    else:
        self.handle_error(f"Supplied conditionals must be {dict} type.")
[docs]
def check_requirement(self, configuration, requirement):
    r"""
    Check whether a requirement is met by the configuration.

    Requirements should be of the form <key><operator><required_value>,
    where operator is one of '=', '!=', '<', '<=', '>', '>=' ('==' is
    accepted as a synonym for '=').  Alternatively, a bare key may be
    supplied, in which case it is treated as a boolean switch and the
    result of `configuration.get_bool` is returned.

    Equality and inequality are evaluated numerically when both sides can
    be converted to float, and as stripped strings otherwise.  Ordering
    comparisons are only defined for numeric operands and are False when
    either side is not a number.

    Parameters
    ----------
    configuration : Configuration
        The configuration in which to check the requirement.
    requirement : str
        The requirement to check.

    Returns
    -------
    bool
        `True` if the requirement is met.  `False` if it is not met, if
        the key is not set in the configuration, or if the requirement is
        malformed (in which case `handle_error` is also called).
    """
    # Operators are detected in this fixed priority order (not by position
    # in the string).  The lookarounds keep '=' from matching inside
    # '<=', '>=' or '!=', and keep '<' / '>' from matching '<=' / '>='.
    detection_order = (
        ('=', r'(?<![<!>])='),
        ('<', r'<(?!=)'),
        ('>', r'>(?!=)'),
        ('!=', r'!='),
        ('<=', r'<='),
        ('>=', r'>='),
    )
    op = next((symbol for symbol, pattern in detection_order
               if re.search(pattern, requirement)), None)

    # If there is no operator, assume it is a boolean switch.
    if op is None:
        return configuration.get_bool(requirement.strip(), default=False)

    # Dropping empty fragments is what lets 'key==value' parse when it is
    # split on '=', and what rejects one-sided requirements like 'key='.
    parts = [part.strip() for part in requirement.split(op)]
    parts = [part for part in parts if part != '']
    if len(parts) != 2:
        self.handle_error(f"Bad conditional requirement: {requirement}")
        return False

    key, test_value = parts
    value = configuration.get(key, default=None)
    if value is None:
        return False
    value = str(value).strip()

    # Convert both sides together so the string operands are left intact
    # for the textual fallback when either conversion fails.
    try:
        numbers = (float(value), float(test_value))
    except (ValueError, TypeError):
        numbers = None

    if op == '=':
        if numbers is None:
            return value == test_value
        return numbers[0] == numbers[1]

    if op == '!=':
        if numbers is None:
            return value != test_value
        return numbers[0] != numbers[1]

    # Ordering comparisons require numeric operands.
    if numbers is None:
        return False

    # Compare the floats directly.  Building an expression string with '%f'
    # and passing it to eval() rounds both operands to six decimal places
    # and raises NameError for 'inf' / 'nan'.
    left, right = numbers
    if op == '<':
        return left < right
    if op == '<=':
        return left <= right
    if op == '>':
        return left > right
    return left >= right
[docs]
def get_met_conditions(self, configuration):
    """
    Collect the actions of every condition that is currently fulfilled.

    Entries of the form {requirement (str): actions (dict)} have their
    requirement tested against the configuration with `check_requirement`
    and are skipped when it is not met.  Entries whose actions are a plain
    string are never tested: the string is parsed as "<key>=<value>" and
    always included in the output.  A string that does not split into
    exactly two non-empty parts is reported through `handle_error` and
    skipped.

    Parameters
    ----------
    configuration : Configuration

    Returns
    -------
    actions : dict
        A dict of the form {requirement: [actions]}, where each list holds
        the action dicts to apply for that requirement.
    """
    met = {}
    for requirement, actions in self.options.items():
        if isinstance(actions, dict):
            if not self.check_requirement(configuration, requirement):
                continue
        else:
            # In rare instances there is no requirement to check: the
            # action string itself is parsed and always applied.
            pieces = [piece.strip() for piece in actions.split('=')]
            pieces = [piece for piece in pieces if piece != '']
            if len(pieces) != 2:
                self.handle_error(f"Bad condition: {actions}")
                continue
            action_key, action_value = pieces
            actions = {action_key: action_value}

        bucket = met.setdefault(requirement, [])
        if actions not in bucket:
            bucket.append(actions)

    return met
[docs]
def process_conditionals(self, configuration, seen=None):
    """
    Apply conditions repeatedly until the configuration stops changing.

    `update_configuration` is called over and over with the same `seen`
    set until it reports that no update was made.

    Parameters
    ----------
    configuration : Configuration
    seen : set, optional
        A set of previously applied conditions and actions.  Each member
        should be a tuple of the form (requirement, command, action).  A
        new empty set is used when omitted.

    Returns
    -------
    None
    """
    applied = set() if seen is None else seen
    while True:
        if not self.update_configuration(configuration, seen=applied):
            break
[docs]
def update_configuration(self, configuration, seen=None):
    """
    Apply the actions of all currently met conditions once.

    For every met requirement, the requirement is recorded in
    `configuration.applied_conditions`, and each (key, value) action that
    has not been applied before is grouped into a commands dict: keys found
    in `configuration.command_keys` collect their values in a list, while
    all other keys are gathered under the 'update' entry.  The commands for
    each requirement are then handed to `configuration.apply_commands`.

    Parameters
    ----------
    configuration : Configuration
    seen : set, optional
        A set of previously applied conditions and actions, as
        (requirement, key, str(value)) tuples.  It is updated in place
        with every newly applied action.

    Returns
    -------
    updated : bool
        `True` if at least one action not already in `seen` was applied,
        and `False` otherwise.
    """
    if seen is None:
        seen = set()

    met_conditions = self.get_met_conditions(configuration)
    if not met_conditions:
        return False

    updated = False
    for requirement, action_list in met_conditions.items():
        configuration.applied_conditions.add(requirement)
        commands = {}
        for action in action_list:
            for key, value in action.items():
                # Values are stringified so the signature is hashable and
                # comparable regardless of the value type.
                signature = (requirement, key, str(value))
                if signature in seen:
                    continue
                seen.add(signature)
                updated = True

                if key in configuration.command_keys:
                    commands.setdefault(key, []).append(value)
                else:
                    commands.setdefault('update', {})[key] = value

        # Applied once per requirement, even when nothing new was gathered.
        configuration.apply_commands(commands)

    return updated