[docs]
class Model(object):
"""
Base model Class for fitting N-dimensional data
Attributes
----------
stats : argparse.Namespace
Contains the following statistics:
samples : numpy.ndarray (ndim, nsamples)
The samples used to fit the polynomial. samples[:-1]
contain the independent variables for each dimension
and samples[-1] contains the dependent variables.
Note that all NaNs will have been stripped from the
original input samples.
ndata : int
The number of samples used to fit the polynomial
fit : numpy.array (nsamples,)
The fitted polynomial over the original sample points
residuals : numpy.ndarray (nsamples,)
The residual of fit(data) - data
sigma : numpy.ndarray (ncoeffs,)
The error of each polynomial coefficient (will only
be calculated if a covariance matrix exists)
dof : int
Degrees of Freedom of the fit
rms : float
Root Mean Square deviation of the fit
chi2 : float
Chi-Squared
rchi2 : float
Reduced Chi-Squared
q : float
Goodness of fit, or survival function. The probability (0->1)
that one of the `samples` is greater than `chi2` away from the
`fit`.
"""
def __init__(self, *args, error=1, mask=None, covar=True, stats=True,
robust=0, eps=0.01, maxiter=100, ignorenans=True,
fit_kwargs=None, eval_kwargs=None):
self._samples = None
self._error = None
self._interpolated_error = None
self._usermask = None
self._model_args = None
self._model_kwargs = None
self._initial_shape = None
self._ignorenans = ignorenans
self.termination = None
self._parse_args(error, mask, *args)
self._parse_model_args()
self._ndim, self._nsamples = self._samples.shape
self._ndim -= 1 # only include independent variables
self._nparam = None
self._fit_kwargs = fit_kwargs
self._eval_kwargs = eval_kwargs
self.success = False
self.covar = covar
self.covariance = None
self.mask = self._usermask
self.robust = robust
self.stats = None
self.dostats = stats or self.robust > 0
self.maxiter = maxiter
self.eps = eps
self.initial_fit()
self._iteration = 1
self._state = "initial fit"
if self.robust > 0: # pragma: no cover
if self.covar and self.covariance is not None:
covariance1 = self.covariance.copy()
else:
covariance1 = None
self._iterate()
if self._iteration == 1 and covar and self.success:
self.covariance = covariance1
else:
self.refit_mask(self.mask, covar=True)
@property
def state(self):
return self._state
@property
def error(self):
"""Don't create the error unless asked for or already present
This is for the errors of the samples only
"""
if self._interpolated_error is None:
if hasattr(self._error, '__len__'):
self._interpolated_error = self._error
else:
self._interpolated_error = np.full(
self.mask.shape, float(self._error))
return self.reshape(self._interpolated_error)
[docs]
def reshape(self, flattened_array, copy=True):
array = flattened_array.reshape(self._initial_shape)
return array.copy() if copy else array
def __repr__(self):
return "%s (%i features, %s parameters)" % (
self.__class__.__name__, self._ndim, self._nparam)
def __str__(self):
s = "Name: %s\n" % self.__class__.__name__
s += self._stats_string()
s += self._parameters_string()
return s
def _parameters_string(self):
"""Place holder for model parameters"""
return ''
def _stats_string(self):
s = ''
if self.stats is not None:
n_in = self.mask.size
n_nan = self._samples.shape[1] - n_in
n_outliers = np.sum(~self.mask) - n_nan
s += "\n Statistics"
s += "\n--------------------------------"
s += "\nNumber of original points : %i" % n_in
s += "\n Number of NaNs : %i" % n_nan
s += "\n Number of outliers : %i" % n_outliers
s += "\n Number of points fit : %i" % self.mask.sum()
s += "\n Degrees of freedom : %i" % self.stats.dof
s += "\n Chi-Squared : %f" % self.stats.chi2
s += "\n Reduced Chi-Squared : %f" % self.stats.rchi2
s += "\n Goodness-of-fit (Q) : %f" % self.stats.q
s += "\n RMS deviation of fit : %f" % self.stats.rms
if self.robust > 0:
s += '\n Outlier sigma threshold : %s' % self.robust
s += '\n eps (delta_sigma/sigma) : %s' % self.eps
s += '\n Iterations : %s' % self._iteration
s += '\n Iteration termination : %s' % self.termination
s += '\n'
return s
[docs]
def print_stats(self):
"""
Print statistical information on the fit to stdout.
Returns
-------
None
"""
print(self._stats_string())
[docs]
def print_params(self):
"""
Print parameters to stdout.
Returns
-------
None
"""
print(self._parameters_string())
[docs]
def __call__(self, *independent_values, dovar=False): # pragma: no cover
"""
Evaluate the model
Parameters
----------
samples : n-tuple of array_like (shape)
n-features of independent variables.
dovar : bool, optional
If True return the variance of the fit in addition to the
fit.
Returns
-------
fit, [variance] : numpy.ndarray (shape), [numpy.ndarray (shape)]
The output fit and optionally, the variance.
"""
if len(independent_values) != self._ndim:
raise ValueError(
"Require %i features of independent values" % self._ndim)
v = stack(*independent_values)
r = self.evaluate(v, dovar=dovar)
fit, var = (r[0], r[1]) if dovar else (r, None)
isarr = hasattr(independent_values[0], '__len__')
havevar = var is not None
test = np.asarray(independent_values[0])
if test.ndim > 1:
shape = test.shape
fit = fit.reshape(shape)
if havevar:
var = var.reshape(shape)
elif not isarr:
fit = fit[0]
if havevar:
var = var[0]
return (fit, var) if dovar else fit
@staticmethod
def _create_coordinates(*args):
nargs = len(args)
if nargs < 2:
raise ValueError(
"Require at least 2 arguments (f(x), model_args)")
if nargs >= 3: # have coorindates as args[0]
return args
shape = np.asarray(args[-2]).shape
c = np.meshgrid(*(np.arange(s, dtype=float) for s in shape),
indexing='ij')
return tuple(c) + args
def _parse_args(self, error, mask, *args):
args = self._create_coordinates(*args)
nargs = len(args)
self._ndim = nargs - 2
self._initial_shape = np.asarray(args[-2]).shape
self._samples = stack(*args[:-1])
self._model_args = args[-1]
nsamples = self._samples.shape[1]
if hasattr(error, '__len__'):
error = np.asarray(error).astype(float).ravel()
if error.size != nsamples:
raise ValueError("Error size does not match number of samples")
else:
try:
error = float(error)
except (ValueError, TypeError):
error = 1.0
self._error = error
if self._ignorenans:
nanmask = remove_sample_nans(self._samples, error, mask=True)
else:
nanmask = np.full(self._samples[-1].shape, True)
if hasattr(mask, '__len__'):
usermask = np.asarray(mask).astype(bool).ravel()
if mask.size != nsamples:
raise ValueError("Mask size does not match number of samples")
usermask &= nanmask
else:
usermask = nanmask # no copy
self._usermask = usermask
def _fit_statistics(self): # pragma: no cover
if not self.dostats:
return
self.stats = Namespace()
stats = self.stats
n0 = self.mask.size
mask = self.mask & self._usermask
stats.n = n = mask.sum()
if n > 1:
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
stats.dof = stats.n - self._nparam
stats.fit = self.evaluate(self._samples[:-1], dovar=False)
stats.residuals = self._samples[-1] - stats.fit
r = stats.residuals.compress(mask)
if self.covariance is not None:
stats.sigma = np.sqrt(np.diag(self.covariance))
else:
stats.sigma = None
stats.rms = bn.nanstd(r) * np.sqrt(n / (n - 1))
stats.chi2 = bn.nansum((r / self.error.ravel()[mask]) ** 2)
if stats.dof != 0:
stats.rchi2 = stats.chi2 / stats.dof
else:
stats.rchi2 = np.inf
stats.q = 1 - chi2.sf(stats.chi2, stats.dof)
else:
stats.fit = np.full(n0, np.nan)
stats.residuals = np.full(n0, np.nan)
stats.dof = 0
stats.rms = stats.chi2 = stats.rchi2 = stats.q = np.nan
def _iterate(self): # pragma: no cover
"""
Iterates to refine the polynomial fit
1. Identify outliers in the residuals of data - fit
2. Re-fit the polynomial excluding all outliers
3. Goto 1.
The iteration is terminated after a set number of iterations or
the relative delta between successive residual RMS values is less
than a set value.
Notes
-----
The covariance calculation is skipped which results in a speed
increase in most cases. However, if the covariance is required,
and only 1 iteration occurs, this results in a slight speed
decrease.
"""
self._iteration = 1
if self.robust <= 0:
return
self.termination = "initial"
last_rms = self.stats.rms
relative_delta = np.inf
min_valid_points = max([2, self._nparam])
for _ in range(self._iteration, self.maxiter):
if last_rms <= np.finfo(float).eps: # pragma: no cover
self.termination = ("solution found to within %s precision"
% float)
break
elif relative_delta < self.eps: # pragma: no cover
self.termination = "delta_rms/rms = %f" % relative_delta
break
last_rms = self.stats.rms
mask = find_outliers(self.stats.residuals, threshold=self.robust)
if mask.sum() < min_valid_points:
self.termination = "insufficient samples remain"
self.mask.fill(False)
self.success = False
break
elif np.allclose(self.mask, mask):
self.termination = "delta_rms = 0"
break
self.refit_mask(mask, covar=False)
self._iteration += 1
if not self.success: # pragma: no cover
self.termination = "fit failed"
break
delta = abs(self.stats.rms - last_rms)
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
relative_delta = delta / last_rms
else: # pragma: no cover
self.termination = "reached maximum iterations"
def _parse_model_args(self): # pragma: no cover
"""Place holder to perform operation on model arguments"""
pass
[docs]
def initial_fit(self): # pragma: no cover
"""Place holder"""
pass
[docs]
def refit_mask(self, mask, covar=False): # pragma: no cover
"""Place holder"""
pass
[docs]
def evaluate(self, samples, dovar=False): # pragma: no cover
"""Place holder"""
if True is not False:
raise ValueError("Create this method")
elif self.stats or samples or dovar:
return []