"""Module that deals with the calculations of likelihoods."""
import numpy as np
from scipy import optimize, stats
from scipy.misc import derivative
# Use this function/object for parallelization where possible
mapper = map
# Full slice, used as a default index (selects everything)
_SLICE = slice(None)
class JeffreysPrior:
"""Universal non-informative prior for use in Bayesian MCMC analysis.
Parameters
----------
response_matrix : ndarray
Response matrix that translates truth into reco bins.
Can be an array of matrices.
translation_function : function
The function to translate a vector of parameters into a vector of truth
expectation values::
truth_vector = translation_function(parameter_vector)
It must support translating arrays of parameter vectors into arrays of
truth vectors::
[truth_vector, ...] = translation_function([parameter_vector, ...])
parameter_limits : iterable of tuple of floats
An iterable of lower and upper limits of the hypothesis' parameters.
        The number of limits determines the number of parameters. Limits
        can be `None`, which sets no limit in that direction.
::
[ (x1_min, x1_max), (x2_min, x2_max), ... ]
default_values : iterable of floats
The default values of the parameters
dx : array like
Array of step sizes to be used in numerical differentiation.
Default: ``numpy.full(len(parameter_limits), 1e-3)``
total_truth_limit : float
Maximum total number of truth events to consider in the prior. This can
be used to make priors proper in a consistent way, since the limit is
defined in the truth space, rather than the prior parameter space.
Notes
-----
The `JeffreysPrior` object can be called like a function. It will return
    the prior log-likelihood of the given set of parameters::
prior_likelihood = jeffreys_prior(parameter_vector)
If the prior was constructed with more than one response matrix,
the matrix to be used for the calculation can be chosen with the
`toy_index` argument::
jeffreys_prior(parameter_vector, toy_index=5)
By construction, the JeffreysPrior will return the log probability
`-inf`, a probability of 0, when the expected *reco* values do not
depend on one of the parameters. In this case the "useless" parameter
should be removed. It simply cannot be constrained with the given
detector response.
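    As a rough usage sketch (the response matrix, translation function, and
    limits below are made up for illustration)::
        response = np.array([[[0.9, 0.1],
                              [0.1, 0.8]]])  # one toy: 2 reco x 2 truth bins
        prior = JeffreysPrior(
            response_matrix=response,
            translation_function=lambda theta: theta,  # identity translation
            parameter_limits=[(0, None), (0, None)],
            default_values=[10.0, 10.0],
        )
        log_prior = prior([10.0, 20.0])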
Attributes
----------
response_matrix : ndarray
The response matrix used to calculate the Fisher matrix.
default_values : ndarray
The default values of the parameters.
lower_limits : ndarray
Lower limits of the parameters.
upper_limits : ndarray
Upper limits of the parameters.
total_truth_limit : float
The upper limit of total number of true events.
dx : ndarray
Array of step sizes to be used in numerical differentiation.
"""
def __init__(
self,
response_matrix,
translation_function,
parameter_limits,
default_values,
dx=None,
total_truth_limit=None,
):
old_shape = response_matrix.shape
new_shape = (int(np.prod(old_shape[:-2])), old_shape[-2], old_shape[-1])
self.response_matrix = response_matrix.reshape(new_shape)
self._translate = translation_function
limits = list(zip(*parameter_limits))
self.lower_limits = np.array(
[x if x is not None else -np.inf for x in limits[0]]
)
self.upper_limits = np.array(
[x if x is not None else np.inf for x in limits[1]]
)
self.total_truth_limit = total_truth_limit or np.inf
self.default_values = np.array(default_values)
self._npar = len(parameter_limits)
self._nreco = response_matrix.shape[-2]
self._i_diag = np.diag_indices(self._npar)
        self.dx = np.full(self._npar, 1e-3) if dx is None else np.asarray(dx)
    def translate(self, parameters):
"""Translate the parameter vector to a truth vector.
Parameters
----------
parameters : ndarray like
Vector of the hypothesis parameters.
Returns
-------
ndarray
Vector of the corresponding truth space expectation values.
"""
return self._translate(parameters)
    def fisher_matrix(self, parameters, toy_index=0):
"""Calculate the Fisher information matrix for the given parameters.
Parameters
----------
parameters : array like
The parameters of the translation function.
toy_index : int, optional
The index of the response matrix to be used for the calculation
Returns
-------
ndarray
The Fisher information matrix.
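        Notes
        -----
        For Poisson-distributed reco bins, the implemented quantity is::
            I_ij = sum_k (d mu_k / d theta_i) * (d mu_k / d theta_j) / mu_k
        where ``mu_k`` are the reco expectation values and the derivatives
        are evaluated numerically with step sizes `dx`.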
"""
resp = self.response_matrix[toy_index]
npar = self._npar
nreco = self._nreco
par_mat = np.array(np.broadcast_to(parameters, (npar, npar)))
i_diag = self._i_diag
# list of parameter sets -> list of reco values
def reco_expectation(theta):
ret = np.tensordot(self.translate(theta), resp, [[-1], [-1]])
return ret
# npar by npar matrix of reco values
expect = np.broadcast_to(reco_expectation(par_mat), (npar, npar, nreco))
# Diff expects the last axis to be the parameters, not the reco values!
# varied parameter set -> transposed list of reco values
def curried(x):
par_mat[i_diag] = x
return reco_expectation(par_mat).T
diff = derivative(curried, x0=parameters, dx=self.dx).T
diff_i = np.broadcast_to(diff, (npar, npar, nreco))
diff_j = np.swapaxes(diff_i, 0, 1)
# Nansum to ignore 0. * 0. / 0.
# Equivalent to ignoring those reco bins
fish = np.nansum(diff_i * diff_j / expect, axis=-1)
return fish
def __call__(self, value, toy_index=0):
"""Calculate the prior probability of the given parameter set."""
# Out of bounds?
if np.any(value < self.lower_limits) or np.any(value > self.upper_limits):
return -np.inf
# Out of total truth bound?
if np.sum(self.translate(value)) > self.total_truth_limit:
return -np.inf
fish = self.fisher_matrix(value, toy_index)
with np.errstate(under="ignore"):
sign, log_det = np.linalg.slogdet(fish)
return 0.5 * log_det
class DataModel:
"""Base class for representation of data statistical models.
The resulting object can be called like a function::
log_likelihood = likelihood_calculator(prediction)
Parameters
----------
data_vector : array_like
Shape: ``([a,b,...,]n_reco_bins,)``
Attributes
----------
data_vector : ndarray
The data vector ``k``.
"""
def __init__(self, data_vector):
self.data_vector = np.asarray(data_vector)
    def log_likelihood(self, reco_vector):
"""Calculate the likelihood of the provided expectation values.
The reco vector can have a shape ``([c,d,]n_reco_bins,)``. Assuming the
data is of shape ``([a,b,...,]n_reco_bins,)``, the output will be of
shape ``([a,b,...,][c,d,...,])``.
"""
raise NotImplementedError("Must be implemented in a subclass!")
    @classmethod
def generate_toy_data(cls, reco_vector, size=None):
"""Generate toy data according to the expectation values.
The reco vector can have a shape ``([c,d,]n_reco_bins,)``. Assuming the
requested size is ``(a,b,...,)``, the output will be of shape
``([a,b,...,][c,d,...,])``.
"""
raise NotImplementedError("Must be implemented in a subclass!")
    @classmethod
def generate_toy_data_model(cls, *args, **kwargs):
"""Generate toy data model according to the expectation values.
All arguments are passed to :meth:`generate_toy_data`.
"""
return cls(cls.generate_toy_data(*args, **kwargs))
def __call__(self, *args, **kwargs):
return self.log_likelihood(*args, **kwargs)
class PoissonData(DataModel):
"""Class for fast Poisson likelihood calculations.
The resulting object can be called like a function::
log_likelihood = likelihood_calculator(prediction)
Parameters
----------
data_vector : array_like int
Shape: ``([a,b,...,]n_reco_bins,)``
Notes
-----
Assumes that data does not change to avoid re-calculating things::
Poisson PMF:
p(k, mu) = (mu^k / k!) * exp(-mu)
ln(p(k, mu)) = k*ln(mu) - mu - ln(k!)
ln(k!) = -ln(p(k,mu)) + k*ln(mu) - mu = -ln(p(k,1.)) - 1.
Attributes
----------
data_vector : ndarray
The data vector ``k``.
k0 : ndarray
Mask where ``k == 0``.
ln_k_factorial : ndarray
The precomputed ``ln(k!)``.
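    A minimal usage sketch (numbers chosen arbitrarily)::
        data_model = PoissonData([1, 2, 3])
        log_l = data_model.log_likelihood([1.1, 1.9, 3.2])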
"""
def __init__(self, data_vector):
# Save constants for PMF calculation
with np.errstate(divide="ignore", invalid="ignore"):
self.data_vector = np.asarray(data_vector, dtype=int)
k = self.data_vector
self.k0 = k == 0
self.ln_k_factorial = -(
stats.poisson.logpmf(k, np.ones_like(k, dtype=float)) + 1.0
)
@staticmethod
def _poisson_logpmf(k, k0, ln_k_factorial, mu):
with np.errstate(divide="ignore", invalid="ignore"):
pmf = k * np.log(mu) - ln_k_factorial - mu
# Need to take care of special case mu=0: k=0 -> logpmf=0, k>=1 -> logpmf=-inf
mu0 = mu == 0
pmf[mu0 & k0] = 0
pmf[mu0 & ~k0] = -np.inf
pmf[~np.isfinite(pmf)] = -np.inf
pmf = np.sum(pmf, axis=-1)
return pmf
    def log_likelihood(self, reco_vector):
"""Calculate the likelihood of the provided expectation values.
The reco vector can have a shape ``([c,d,]n_reco_bins,)``. Assuming the
data is of shape ``([a,b,...,]n_reco_bins,)``, the output will be of
shape ``([a,b,...,][c,d,...,])``.
"""
reco_vector = np.asfarray(reco_vector)
data_shape = self.data_vector.shape # = ([a,b,...,]n_reco_bins,)
reco_shape = reco_vector.shape # = ([c,d,...,]n_reco_bins,)
# Cast vectors to the shape ([a,b,...,][c,d,...,]n_reco_bins,)
data_index = (
(_SLICE,) * (len(data_shape) - 1)
+ (np.newaxis,) * (len(reco_shape) - 1)
+ (_SLICE,)
)
reco_index = (
(np.newaxis,) * (len(data_shape) - 1)
+ (_SLICE,) * (len(reco_shape) - 1)
+ (_SLICE,)
)
# Calculate the log probabilities of shape ([a,b,...,][c,d,...,]).
return self._poisson_logpmf(
self.data_vector[data_index],
self.k0[data_index],
self.ln_k_factorial[data_index],
reco_vector[reco_index],
)
    @classmethod
def generate_toy_data(cls, reco_vector, size=None):
"""Generate toy data according to the expectation values.
The reco vector can have a shape ``([c,d,]n_reco_bins,)``. Assuming the
requested size is ``(a,b,...,)``, the output will be of shape
``([a,b,...,][c,d,...,])``.
"""
mu = reco_vector
if size is not None:
# Append truth vector shape to requested shape of data sets
try:
shape = list(size)
except TypeError:
shape = [size]
shape.extend(mu.shape)
size = shape
data = np.random.poisson(mu, size=size)
return data
class SystematicsConsumer:
"""Class that consumes the systematics axis on an array of log likelihoods."""
    @staticmethod
def consume_axis(log_likelihood, weights=None):
"""Collapse the systematic axes according to the systematics mode."""
raise NotImplementedError("Must be implemented in a subclass!")
def __call__(self, *args, **kwargs):
return self.consume_axis(*args, **kwargs)
class NoSystematics(SystematicsConsumer):
"""SystematicsConsumer that does nothing."""
    @staticmethod
def consume_axis(log_likelihood, weights=None):
return log_likelihood
class MarginalLikelihoodSystematics(SystematicsConsumer):
"""SystematicsConsumer that averages over the systematic axis.
Optionally applies weights.
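    With uniform weights the result is the logarithm of the mean probability
    over the systematics axis, e.g. (values made up)::
        ll = np.log([[0.5, 0.25], [0.1, 0.2]])
        MarginalLikelihoodSystematics.consume_axis(ll)
        # -> np.log([0.375, 0.15])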
"""
    @staticmethod
def consume_axis(log_likelihood, weights=None):
if weights is None:
weights = np.ones_like(log_likelihood)
log_weights = np.log(weights / np.sum(weights, axis=-1, keepdims=True))
weighted = log_likelihood + log_weights
# Avoid numerical problems by using this "trick"
max_weighted = np.max(weighted, axis=-1, keepdims=True)
i_inf = ~np.isfinite(max_weighted)
max_weighted[i_inf] = 0.0
weighted = weighted - max_weighted
with np.errstate(under="ignore"):
ret = max_weighted[..., 0] + np.logaddexp.reduce(weighted, axis=-1)
return ret
class ProfileLikelihoodSystematics(SystematicsConsumer):
"""SystematicsConsumer that maximises over the systematic axes."""
    @staticmethod
def consume_axis(log_likelihood, weights=None):
return np.max(log_likelihood, axis=-1)
class Predictor:
"""Base class for objects that turn sets of parameters to predictions.
Parameters
----------
bounds : ndarray
Lower and upper bounds for all parameters. Can be ``+/- np.inf``.
defaults : ndarray
"Reasonable" default values for the parameters. Used in optimisations.
Attributes
----------
bounds : ndarray
Lower and upper bounds for all parameters. Can be ``+/- np.inf``.
defaults : ndarray
"Reasonable" default values for the parameters. Used in optimisations.
"""
def __init__(self, bounds, defaults):
self.bounds = np.asarray(bounds)
self.defaults = np.asarray(defaults)
    def check_bounds(self, parameters):
"""Check that all parameters are within bounds."""
parameters = np.asfarray(parameters)
check = (parameters >= self.bounds[:, 0]) & (parameters <= self.bounds[:, 1])
return np.all(check, axis=-1)
    def prediction(self, parameters, systematics_index=_SLICE):
"""Turn a set of parameters into an ndarray of reco predictions.
Parameters
----------
parameters : ndarray
systematics_index : int, optional
Returns
-------
prediction, weights : ndarray
Notes
-----
The regular output must have at least two dimensions. The last
dimension corresponds to the number of predictions, e.g. reco bin
number. The second to last dimension corresponds to systematic
prediction uncertainties of a single parameter set.
Parameters can be arbitrarily shaped ndarrays. The method must support
        turning multiple sets of parameters into an array of predictions::
parameters.shape == ([c,d,...,]n_parameters,)
prediction.shape == ([c,d,...,]n_systematics,n_predictions,)
weights.shape == ([c,d,...,]n_systematics,)
If the `systematics_index` is specified, only the respective value on
the systematics axis should be returned::
prediction.shape == ([c,d,...,]n_predictions,)
weights.shape == ([c,d,...,],)
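        As a sketch, a conforming implementation with a single, trivial
        systematic variation could look like::
            def prediction(self, parameters, systematics_index=_SLICE):
                parameters = np.asfarray(parameters)
                prediction = parameters[..., np.newaxis, :]  # n_systematics = 1
                weights = np.ones(prediction.shape[:-1])
                return (
                    prediction[..., systematics_index, :],
                    weights[..., systematics_index],
                )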
"""
raise NotImplementedError("This function must be implemented in a subclass!")
    def fix_parameters(self, fix_values):
"""Return a new Predictor with fewer free parameters.
Parameters
----------
fix_values : iterable
List of the parameter values that the parameters should be fixed at.
The list must be of the same length as the number of parameters of `predictor`.
Any parameters that should remain unfixed should be specified with ``None`` or
``np.nan``.
See also
--------
FixedParameterPredictor
"""
return FixedParameterPredictor(self, fix_values)
    def compose(self, other):
"""Return a new Predictor that is a composition with `other`.
::
new_predictor(parameters) = self(other(parameters))
"""
return ComposedPredictor([self, other])
def __call__(self, *args, **kwargs):
"""See :meth:`prediction`."""
return self.prediction(*args, **kwargs)
class ComposedPredictor(Predictor):
"""Wrapper class that composes different Predictors into one.
Parameters
----------
predictors : list of Predictor
The Predictors will be composed in turn. The second must predict
parameters for the first, the third for the second, etc. The last
Predictor defines what parameters will be accepted by the resulting
Predictor.
combine_systematics : string, optional
The strategy how to combine the systematics of the Predictors.
Default: "cartesian"
Notes
-----
Systematics will be handled according to the `combine_systematics`
parameter. Possible values are:
``"cartesian"``
Combine systematics in a Cartesian product. There will be one
output prediction for each possible combination of intermediate
systematics.
``"same"``
        Assume that the systematics are the same and no combination is done.
The dimensions and weights for the systematics _must_ be identical
for all provided Predictors.
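    A usage sketch, assuming two existing Predictors where ``outer`` consumes
    the values predicted by ``inner``::
        composed = ComposedPredictor([outer, inner])
        prediction, weights = composed(inner.defaults)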
"""
def __init__(self, predictors, combine_systematics="cartesian"):
self.predictors = predictors
# TODO: Currently taking the bounds from the last predictor
# This might still run into the bounds of intermediate predictors
self.bounds = predictors[-1].bounds
self.defaults = predictors[-1].defaults
self.combine_systematics = combine_systematics
def _compose_cartesian(self, parameters, systematics_index):
"""Apply cartesian combination of systematics."""
parameters = np.asarray(parameters)
orig_shape = parameters.shape
orig_len = len(orig_shape)
weights = np.ones(parameters.shape[:-1])
for pred in reversed(self.predictors):
parameters, w = pred.prediction(parameters)
weights = weights[..., np.newaxis] * w
# Flatten systematics
# original_parameters.shape = ([c,d,...,]n_parameters)
# chained_output.shape = ([c,d,...,]systn,...,syst0,n_reco)
# reordered_output.shape = ([c,d,...,]syst0,...,systn,n_reco)
# desired_output.shape = ([c,d,...,]syst,n_reco)
shape = parameters.shape
shape_len = len(shape)
new_order = (
tuple(range(orig_len - 1)) # c,d,...
+ tuple(range(shape_len - 2, orig_len - 2, -1)) # syst0,syst1,...
+ (shape_len - 1,) # n_reco
)
parameters = np.transpose(parameters, new_order)
parameters = parameters.reshape(
orig_shape[:-1] + (np.prod(shape[orig_len - 1 : -1]),) + shape[-1:]
)
# c,d,... syst0,syst1,...
new_order = tuple(range(orig_len - 1)) + tuple(
range(shape_len - 2, orig_len - 2, -1)
)
weights = np.transpose(weights, new_order)
weights = weights.reshape(
orig_shape[:-1] + (np.prod(shape[orig_len - 1 : -1]),)
)
return parameters[..., systematics_index, :], weights[..., systematics_index]
def _compose_same(self, parameters, systematics_index):
"""Apply 'same' combination of systematics."""
parameters = np.asarray(parameters)
for i, pred in enumerate(reversed(self.predictors)):
if isinstance(systematics_index, int):
# Just use a single systematic index
parameters, w = pred.prediction(
parameters, systematics_index=systematics_index
)
continue
# Need to handle all systematics
if i == 0:
# First predictor
# Just do the regular thing
parameters, w = pred.prediction(
parameters, systematics_index=systematics_index
)
else:
# Second and on, need to make sure to apply the right systematics
# to the right parameters
temp_pars = []
for s in range(parameters.shape[-2]):
# Loop over all systematic indices
p, _ = pred.prediction(parameters[..., s, :], systematics_index=s)
temp_pars.append(p[..., np.newaxis, :]) # Insert systematics axis
parameters = np.concatenate(temp_pars, axis=-2)
weights = w
return parameters, weights
    def prediction(self, parameters, systematics_index=_SLICE):
"""Turn a set of parameters into an ndarray of predictions.
Parameters
----------
parameters : ndarray
systematics_index : int, optional
Returns
-------
prediction, weights : ndarray
Notes
-----
The optional argument `systematics_index` will be applied to the final
output of the composed predictions, e.g. the flattened Cartesian
product of the intermediate systematics if the combination strategy is
"cartesian".
"""
if self.combine_systematics == "cartesian":
return self._compose_cartesian(parameters, systematics_index)
elif self.combine_systematics == "same":
return self._compose_same(parameters, systematics_index)
else:
raise ValueError(
"%s is not a valid systematics combination strategy."
% (self.combine_systematics)
)
class SummedPredictor(Predictor):
"""Wrapper class that sums predictions of multiple Predictors.
Parameters
----------
predictors : list of Predictor
        The Predictors whose output will be added. The resulting Predictor will
accept the concatenated list of the separate original input parameters
in the same order as they appear in the list.
    scale_factors : iterable of float or arrays, optional
Provide a scaling factor for each predictor. The output will be
multiplied with the factor before summing up the predictions.
Each factor can also be an array of factors so that each systematic
variation gets a different scaling factor. But it needs to be applicable
by multiplication, i.e. ``scaled_out = factor * out``. This can be achieved
by giving each matrix the shape ``(n_systematics, 1)``.
combine_systematics : string, optional
The strategy how to combine the systematics of the Predictors.
Default: "cartesian"
Notes
-----
Systematics will be handled according to the `combine_systematics`
parameter. Possible values are:
``"cartesian"``
Combine systematics in a Cartesian product. There will be one
output prediction for each possible combination of intermediate
systematics.
``"same"``
        Assume that the systematics are the same and no combination is done.
The dimensions and weights for the systematics _must_ be identical
for all provided Predictors.
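    A usage sketch, assuming two existing Predictors ``pred_a`` and ``pred_b``,
    with the output of the second scaled by 0.5 before summing::
        summed = SummedPredictor([pred_a, pred_b], scale_factors=[1.0, 0.5])
        prediction, weights = summed(summed.defaults)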
See also
--------
ConcatenatedPredictor
"""
def __init__(self, predictors, scale_factors=None, combine_systematics="cartesian"):
self.predictors = predictors
self.bounds = np.concatenate([p.bounds for p in predictors], axis=0)
self.defaults = np.concatenate([p.defaults for p in predictors], axis=0)
self.scale_factors = scale_factors
self.combine_systematics = combine_systematics
    def prediction(self, parameters, systematics_index=_SLICE):
"""Turn a set of parameters into an ndarray of predictions.
Parameters
----------
parameters : ndarray
systematics_index : int, optional
Returns
-------
prediction, weights : ndarray
Notes
-----
The optional argument `systematics_index` will be applied to the final
output of the summed predictions, i.e. the flattened combination of the
original systematics.
"""
parameters = np.asarray(parameters)
orig_shape = parameters.shape
orig_len = len(orig_shape)
weights = np.ones(parameters.shape[:-1])
prediction = np.array([0.0])
i_par = 0
# Get the scale factors or generate ones
scale_factors = self.scale_factors
if scale_factors is None:
            scale_factors = np.ones(len(self.predictors))
if len(scale_factors) != len(self.predictors):
raise ValueError(
"Must provide same number of predictors and scale factors!"
)
for pred, factor in zip(self.predictors, scale_factors):
# Consume the given parameters in order.
j_par = i_par + len(pred.defaults)
par = parameters[..., i_par:j_par]
i_par = j_par
p, w = pred.prediction(par)
p = np.asarray(factor) * p
if self.combine_systematics == "cartesian":
# Sum up predictions and create new axis for systematics
# p.shape = ([c,d,...,]syst_n+1,n_reco)
# prediction.shape = ([c,d,...,]syst_0,...,syst_n-1,n_reco)
# new.shape = ([c,d,...,]syst_0,...,syst_n-1,syst_n,n_reco)
ndim_diff = prediction.ndim - p.ndim
slices = (
(Ellipsis,) + (np.newaxis,) * (ndim_diff + 1) + (slice(None),) * 2
)
p = p[slices]
prediction = prediction[..., np.newaxis, :] + p
slices = (Ellipsis,) + (np.newaxis,) * (ndim_diff + 1) + (slice(None),)
w = w[slices]
weights = weights[..., np.newaxis] * w
elif self.combine_systematics == "same":
# All weights and predictions have the same shape
# Weights have the same values
prediction = prediction + p
weights = w
else:
raise ValueError(
"%s is not a valid systematics combination strategy."
% (self.combine_systematics)
)
# Flatten output
# original_parameters.shape = ([c,d,...,]n_parameters)
# summed_output.shape = ([c,d,...,]syst0,...,systn,n_reco)
# desired_output.shape = ([c,d,...,]syst,n_reco)
shape = prediction.shape
prediction = prediction.reshape(
orig_shape[:-1] + (np.prod(shape[orig_len - 1 : -1]),) + shape[-1:]
)
weights = weights.reshape(
orig_shape[:-1] + (np.prod(shape[orig_len - 1 : -1]),)
)
return prediction[..., systematics_index, :], weights[..., systematics_index]
class ConcatenatedPredictor(Predictor):
"""Wrapper class that concatenates predictions of multiple Predictors.
Parameters
----------
predictors : list of Predictor
        The Predictors whose output will be concatenated.
share_parameters : bool, default=False
If ``True``, the `predictors` must accept the same number of
parameters. The resulting predictor will then accept the same number
and apply them to all. Otherwise the resulting Predictor will accept
the concatenated list of the separate original input parameters in the
same order as they appear in `predictors`.
combine_systematics : string, optional
The strategy how to combine the systematics of the Predictors.
Default: "cartesian"
Notes
-----
Systematics will be handled according to the `combine_systematics`
parameter. Possible values are:
``"cartesian"``
Combine systematics in a Cartesian product. There will be one
output prediction for each possible combination of intermediate
systematics.
``"same"``
        Assume that the systematics are the same and no combination is done.
The dimensions and weights for the systematics _must_ be identical
for all provided Predictors.
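    A usage sketch, assuming two existing Predictors ``pred_a`` and ``pred_b``
    that accept the same parameters and predict different reco bins::
        combined = ConcatenatedPredictor([pred_a, pred_b], share_parameters=True)
        prediction, weights = combined(pred_a.defaults)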
See also
--------
SummedPredictor
"""
def __init__(
self, predictors, share_parameters=False, combine_systematics="cartesian"
):
self.predictors = predictors
if share_parameters:
self.bounds = predictors[0].bounds
self.defaults = predictors[0].defaults
else:
self.bounds = np.concatenate([p.bounds for p in predictors], axis=0)
self.defaults = np.concatenate([p.defaults for p in predictors], axis=0)
self.share_parameters = share_parameters
self.combine_systematics = combine_systematics
def _concatenate_cartesian(self, parameters):
"""Calculate predictions using the "cartesian" systematics strategy."""
orig_shape = parameters.shape
weights = np.ones(parameters.shape[:-1])
predictions = []
i_par = 0
for pred in self.predictors:
if not self.share_parameters:
# Consume the given parameters in order.
j_par = i_par + len(pred.defaults)
par = parameters[..., i_par:j_par]
i_par = j_par
else:
par = parameters
p, w = pred.prediction(par)
predictions.append(p)
# Bring weights in right shape
# w.shape = ([c,d,...,]syst_n)
# weights.shape = ([c,d,...,]syst_0*...*syst_n-1)
# new.shape = ([c,d,...,]syst_0*...*syst_n-1*syst_n)
ndim_diff = weights.ndim - w.ndim
slices = (Ellipsis,) + (np.newaxis,) * (ndim_diff + 1) + (slice(None),)
w = w[slices]
weights = weights[..., np.newaxis] * w
weights = weights.reshape(orig_shape[:-1] + (-1,))
# Concatenate predictions with right systematic axes
# pred shapes = ([c,d,...,]syst_i,n_reco_i)
# desired output shape = ([c,d,...,]syst0*...*systn,n_reco)
prediction = predictions[0]
for i in range(1, len(predictions)):
# prediction.shape = ([c,d,...,]syst_X,n_reco_X)
# p1.shape = ([c,d,...,]syst_n,n_reco_n)
# intermediate.shape = ([c,d,...,]sys_X,syst_n,n_reco)
# final.shape = ([c,d,...,]sys_X*syst_n,n_reco)
p1 = predictions[i]
p0 = np.broadcast_to(
prediction[..., np.newaxis, :],
orig_shape[:-1]
+ (prediction.shape[-2], p1.shape[-2], prediction.shape[-1]),
)
p1 = np.broadcast_to(
p1[..., np.newaxis, :, :],
orig_shape[:-1] + (prediction.shape[-2], p1.shape[-2], p1.shape[-1]),
)
prediction = np.concatenate([p0, p1], axis=-1)
prediction = prediction.reshape(
orig_shape[:-1] + (-1, prediction.shape[-1])
)
return prediction, weights
def _concatenate_same(self, parameters):
"""Calculate predictions using the "same" systematics strategy."""
weights = np.ones(parameters.shape[:-1])
predictions = []
i_par = 0
for pred in self.predictors:
if not self.share_parameters:
# Consume the given parameters in order.
j_par = i_par + len(pred.defaults)
par = parameters[..., i_par:j_par]
i_par = j_par
else:
par = parameters
p, w = pred.prediction(par)
predictions.append(p)
weights = w
prediction = np.concatenate(predictions, axis=-1)
return prediction, weights
    def prediction(self, parameters, systematics_index=_SLICE):
"""Turn a set of parameters into an ndarray of predictions.
Parameters
----------
parameters : ndarray
systematics_index : int, optional
Returns
-------
prediction, weights : ndarray
Notes
-----
The optional argument `systematics_index` will be applied to the final
output of the summed predictions, i.e. the flattened combination of the
original systematics.
"""
parameters = np.asarray(parameters)
if self.combine_systematics == "cartesian":
prediction, weights = self._concatenate_cartesian(parameters)
elif self.combine_systematics == "same":
prediction, weights = self._concatenate_same(parameters)
else:
raise ValueError(
"%s is not a valid systematics combination strategy."
% (self.combine_systematics)
)
return prediction[..., systematics_index, :], weights[..., systematics_index]
class FixedParameterPredictor(Predictor):
"""Wrapper class that fixes parameters of another predictor.
The resulting `Predictor` will have fewer parameters, namely those that have
not been fixed.
    Parameters
    ----------
predictor : Predictor
The original predictor which will have some of its parameters fixed.
fix_values : iterable
List of the parameter values that the parameters should be fixed at.
The list must be of the same length as the number of parameters of `predictor`.
Any parameters that should remain unfixed should be specified with ``None`` or
``np.nan``.
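    For example, fixing the first of three parameters (a sketch, assuming an
    existing three-parameter ``predictor``)::
        fixed = FixedParameterPredictor(predictor, [1.5, None, None])
        fixed.insert_fixed_parameters([2.0, 3.0])
        # -> array([1.5, 2.0, 3.0])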
"""
def __init__(self, predictor, fix_values):
self.predictor = predictor
self.fix_values = np.array(fix_values, dtype=float)
insert_mask = ~np.isnan(self.fix_values)
insert_indices = np.argwhere(insert_mask).flatten()
self.insert_values = self.fix_values[insert_indices]
# Indices must be provided as indices on the array with the missing values
self.insert_indices = insert_indices - np.arange(insert_indices.size)
self.bounds = predictor.bounds[~insert_mask]
self.defaults = predictor.defaults[~insert_mask]
    def insert_fixed_parameters(self, parameters):
"""Insert the fixed parameters into a vector of free parameters."""
parameters = np.asarray(parameters)
return np.insert(parameters, self.insert_indices, self.insert_values, axis=-1)
    def prediction(self, parameters, systematics_index=_SLICE):
"""Turn a set of parameters into an ndarray of predictions.
Parameters
----------
parameters : ndarray
systematics_index : int, optional
Returns
-------
prediction, weights : ndarray
"""
parameters = self.insert_fixed_parameters(parameters)
return self.predictor(parameters, systematics_index)
class LinearEinsumPredictor(Predictor):
"""Predictor calcuating a linear combination of the paramters using `np.einsum`.
Paramters
---------
subscripts : str
The subscripts string provided to `np.einsum`.
coefficients : ndarray or tuple of
        The first operand to be supplied to `np.einsum`.
Shape: ``(n_systematics, [I, J, ...,] K)``
constants : ndarray, optional
Constants to be added to the result of `np.einsum` after flattening it.
Shape: ``([n_systematics,] n_output)``
weights : ndarray, optional
Shape: ``([n_systematics,])``
bounds : ndarray, optional
Lower and upper bounds for all parameters. Can be ``+/- np.inf``.
defaults : ndarray, optional
"Reasonable" default values for the parameters. Used in optimisation.
reshape_parameters : tuple of int, optional
Reshape the last axis of the parameters to the given shape before
the einsum contraction.
Notes
-----
At least one of `bounds`, `defaults`, or `reshape_parameters` must be
provided in order to calculate the expected number of parameters.
The prediction will be calculated using `np.einsum` using the provided
`subscripts` string. The two operands will be the provided `coefficients`
as well as the parameters of the prediction function call.
The parameters will have the shape ``[(A,B,...,)+]reshape_parameters``, if
`reshape_parameters` is set, or ``([A,B,...,]n_parameters)`` otherwise.
The output of the `np.einsum` _must_ have the shape
    ``([A,B,...,]n_systematics,[X,Y,...,]Z)``, no matter whether the dimensions
``A, B, ...`` actually exist.
Examples
--------
This predictor will calculate the product of the parameters with the
systematically varied matrices:
>>> M = [ [[1,0], [0,1]], [[0.9, 0], [0, 0.8]] ]
>>> p = LinearEinsumPredictor("ijk,...k->...ij", M, defaults=[1,1])
>>> p([1.0, 2.0])
(array([[1. , 2. ],
[0.9, 1.6]]),
array([1., 1.]))
Interpret parameters as matrix and do a matrix multiplication:
>>> M = [ [[1,0], [0,1]], [[0.9, 0], [0, 0.8]] ]
>>> p = LinearEinsumPredictor("ijk,...kl->...ijl", M, reshape_parameters=(2,2))
>>> p([1.0, 2.0, 3.0, 4.0])
(array([[1. , 2. , 3. , 4. ],
[0.9, 1.8, 2.4, 3.2]]),
array([1., 1.]))
Do an element-wise multiplication:
>>> M = [ [1, 2, 3, 4] ]
>>> p = LinearEinsumPredictor("ij,...j->...ij", M, reshape_parameters=(4,))
>>> p([1.0, 1.0, 1.0, 1.0])
(array([[1., 2., 3., 4.]]), array([1.]))
"""
def __init__(
self,
subscripts,
coefficients,
constants=0.0,
weights=1.0,
bounds=None,
defaults=None,
reshape_parameters=None,
):
self.subscripts = subscripts
self.coefficients = np.asfarray(coefficients)
self.constants = np.asfarray(constants)
while self.constants.ndim < 2:
self.constants = self.constants[np.newaxis, ...]
self.weights = np.asfarray(weights)
while self.weights.ndim < 1:
self.weights = self.weights[np.newaxis, ...]
if bounds is not None:
n_parameters = len(bounds)
elif defaults is not None:
n_parameters = len(defaults)
elif reshape_parameters is not None:
n_parameters = np.prod(reshape_parameters)
else:
raise RuntimeError(
"At least on of `bounds`, `defaults`, or `reshape_parameters` must be provided!"
)
if bounds is None:
bounds = np.array([(-np.inf, np.inf)] * n_parameters)
if defaults is None:
defaults = np.array([1.0] * n_parameters)
self.reshape_parameters = reshape_parameters
Predictor.__init__(self, bounds, defaults)
    def prediction(self, parameters, systematics_index=_SLICE):
"""Turn a set of parameters into a reco prediction.
Returns
-------
prediction : ndarray
weights : ndarray
"""
if isinstance(systematics_index, int):
if systematics_index >= len(self.coefficients):
raise IndexError("Systematic index is out of bounds.")
systematics_index = slice(systematics_index, systematics_index + 1)
remove_systematics = True
else:
remove_systematics = False
coefficients = self.coefficients[systematics_index]
weights = self.weights
if len(weights) > 1:
# If there is only one, it applies to all systematics
weights = weights[systematics_index]
constants = self.constants
if len(constants) > 1:
# If there is only one, it applies to all systematics
constants = constants[systematics_index]
# Get parameters as vector
parameters = np.asarray(parameters)
orig_shape = parameters.shape
# Reshape parameters if requested
if self.reshape_parameters is not None:
parameters = parameters.reshape(orig_shape[:-1] + self.reshape_parameters)
# Calculate einsum
prediction = np.einsum(self.subscripts, coefficients, parameters)
# Flatten output
shape = prediction.shape[: len(orig_shape)] + (-1,)
prediction = prediction.reshape(shape)
# Apply constants
prediction = prediction + constants
# Reshape weights
weights = np.broadcast_to(weights, prediction.shape[:-1])
if remove_systematics:
prediction.shape = prediction.shape[:-2] + prediction.shape[-1:]
weights.shape = weights.shape[:-1]
return prediction, weights
class MatrixPredictor(LinearEinsumPredictor):
"""Predictor that uses a matrix to fold parameters into reco space.
::
output = matrix X parameters [+ constants]
Parameters
----------
matrices : ndarray
Shape: ``([n_systematics,]n_reco_bins,n_parameters)``
constants : ndarray, optional
Shape: ``([n_systematics,]n_reco_bins)``
weights : ndarray, optional
Shape: ``(n_systematics)``
bounds : ndarray, optional
Lower and upper bounds for all parameters. Can be ``+/- np.inf``.
defaults : ndarray, optional
"Reasonable" default values for the parameters. Used in optimisation.
sparse_indices : list or array of int, optional
Used with sparse matrices that provide only the specified columns.
All other columns are assumed to be 0, i.e. the parameters corresponding
to these have no effect.
See also
--------
Predictor
Attributes
----------
bounds : ndarray
Lower and upper bounds for all parameters. Can be ``+/- np.inf``.
defaults : ndarray
"Reasonable" default values for the parameters. Used optimisation.
sparse_indices : list or array of int or slice
Used with sparse matrices that provide only the specified columns.
All other columns are assumed to be 0, i.e. the parameters corresponding
to these have no effect.
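    For example, a single 2x2 matrix without systematic variations (sketch)::
        pred = MatrixPredictor([[1.0, 0.0],
                                [0.0, 2.0]])
        pred([3.0, 4.0])
        # -> (array([[3., 8.]]), array([1.]))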
"""
def __init__(self, matrices, constants=0.0, *, sparse_indices=_SLICE, **kwargs):
self.matrices = np.asfarray(matrices)
while self.matrices.ndim < 3:
self.matrices = self.matrices[np.newaxis, ...]
self.sparse_indices = sparse_indices
LinearEinsumPredictor.__init__(
self,
"ijk,...k->...ij",
self.matrices,
constants=constants,
reshape_parameters=self.matrices.shape[-1:],
**kwargs,
)
    def prediction(self, parameters, *args, **kwargs):
"""Turn a set of parameters into a reco prediction.
Returns
-------
prediction : ndarray
weights : ndarray
"""
# Get parameters as vector
parameters = np.asarray(parameters)
        # Account for sparse matrices
sparse_parameters = parameters[..., self.sparse_indices]
pred, weight = LinearEinsumPredictor.prediction(
self, sparse_parameters, *args, **kwargs
)
return pred, weight
    def compose(self, other):
"""Return a new Predictor that is a composition with `other`.
::
new_predictor(parameters) = self(other(parameters))
"""
if isinstance(other, MatrixPredictor):
return ComposedMatrixPredictor([self, other])
else:
return Predictor.compose(self, other)
    def fix_parameters(self, fix_values):
"""Return a new Predictor with fewer free parameters.
Parameters
----------
fix_values : iterable
List of the parameter values that the parameters should be fixed at.
The list must be of the same length as the number of parameters of `predictor`.
Any parameters that should remain unfixed should be specified with ``None`` or
``np.nan``.
See also
--------
FixedParameterMatrixPredictor
"""
return FixedParameterMatrixPredictor(self, fix_values)
class ComposedMatrixPredictor(MatrixPredictor, ComposedPredictor):
"""Composition of MatrixPredictors.
Parameters
----------
predictors : list of Predictor
The Predictors will be composed in turn. The second must predict
parameters for the first, the third for the second, etc. The last
Predictor defines what parameters will be accepted by the resulting
Predictor.
evaluation_point : array_like float, optional
Shape: ``(n_parameters,)``
Anchor point for the linearisation.
evaluation_steps : array_like float, optional
Shape: ``(n_parameters,)``
Step away from the anchor point for the linearisation.
combine_systematics : string, optional
The strategy how to combine the systematics of the Predictors.
Default: "cartesian"
Notes
-----
    This :class:`MatrixPredictor` will calculate linear coefficients and
    offsets by evaluating the composition of the provided predictors at
`evaluation_point`. The gradient at that point will be calculated by adding
the `evaluation_steps` for each parameter separately.
For a composition of multiple :class:`MatrixPredictor`, this will still
lead to an exact representation (modulo variations from numerical
accuracy). For non-linear, generic :class:`Predictor`, this means we are
generating a linear approximation at `evaluation_point`.
If no `evaluation_point` or `evaluation_steps` is provided, they will be
generated from the last predictor's defaults and bounds.
.. warning:: The auto-generated steps should work for linear functions,
but might be completely unsuitable, i.e. too large, for linear
approximations of general functions.
Systematics will be handled according to the `combine_systematics`
parameter. Possible values are:
``"cartesian"``
Combine systematics in a Cartesian product. There will be one
output prediction for each possible combination of intermediate
systematics.
``"same"``
        Assume that the systematics are the same and no combination is done.
The dimensions and weights for the systematics _must_ be identical
for all provided Predictors.
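    For a composition of linear predictors the result is exact, e.g.
    (sketch)::
        inner = MatrixPredictor([[2.0, 0.0],
                                 [0.0, 3.0]])
        outer = MatrixPredictor([[1.0, 1.0]])
        composed = ComposedMatrixPredictor([outer, inner])
        composed([1.0, 1.0])
        # -> (array([[5.]]), array([1.]))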
See also
--------
MatrixPredictor
ComposedPredictor
"""
def __init__(
self, predictors, evaluation_point=None, evaluation_steps=None, **kwargs
):
# Init like ComposedPredictor
ComposedPredictor.__init__(self, predictors, **kwargs)
if evaluation_point is None:
evaluation_point = self.defaults
if evaluation_steps is None:
evaluation_steps = (self.bounds[:, 1] - self.bounds[:, 0]) / 100.0
# Use step size 1 if bounds are infinite
evaluation_steps[~np.isfinite(evaluation_steps)] = 1.0
# Flip steps that would go out of upper bound
i = evaluation_point + evaluation_steps > self.bounds[:, 1]
evaluation_steps[i] *= -1.0
# Use methods of regular ComposedPredictor to calculate everything
y0, weights = ComposedPredictor.prediction(self, evaluation_point)
columns = []
for i, dx in enumerate(evaluation_steps):
par = np.array(evaluation_point)
par[i] += dx
y1, _ = ComposedPredictor.prediction(self, par)
columns.append(((y1 - y0) / dx)[..., np.newaxis])
matrices = np.concatenate(columns, axis=-1)
constants = y0 - (matrices @ evaluation_point)
MatrixPredictor.__init__(
self,
matrices,
constants=constants,
weights=weights,
bounds=self.bounds,
defaults=self.defaults,
sparse_indices=_SLICE,
)
def LinearizedPredictor(predictor, evaluation_point=None, evaluation_steps=None):
"""Thin wrapper to create a linearized predictor.
Parameters
----------
predictor : Predictor
The predictor that should be linearized.
evaluation_point : array_like float, optional
Shape: ``(n_parameters,)``
Anchor point for the linearisation.
evaluation_steps : array_like float, optional
Shape: ``(n_parameters,)``
Step away from the anchor point for the linearisation.
Returns
-------
predictor : ComposedMatrixPredictor
Notes
-----
This is just a thin wrapper function to create a
:class:`ComposedMatrixPredictor`. Instead of multiple :class:`Predictor`,
it only accepts a single one.
See also
--------
ComposedMatrixPredictor
"""
return ComposedMatrixPredictor(
[
predictor,
],
evaluation_point=evaluation_point,
evaluation_steps=evaluation_steps,
)
class FixedParameterMatrixPredictor(MatrixPredictor, FixedParameterPredictor):
"""Wrapper class that fixes parameters of a linear predictor.
Speeds things up considerably compared to the universal `FixedParameterPredictor`,
but only works with `MatrixPredictor`.
    Parameters
    ----------
predictor : MatrixPredictor
The original predictor which will have some of its parameters fixed.
fix_values : iterable
List of the parameter values that the parameters should be fixed at.
The list must be of the same length as the number of parameters of `predictor`.
Any parameters that should remain unfixed should be specified with ``None`` or
``np.nan``.
See also
--------
FixedParameterPredictor
MatrixPredictor
"""
def __init__(self, predictor, fix_values):
FixedParameterPredictor.__init__(self, predictor, fix_values)
matrices = self.predictor.matrices[..., np.isnan(self.fix_values)]
        # Copy to avoid mutating `self.fix_values` in place
        const_par = self.fix_values.copy()
        const_par[np.isnan(self.fix_values)] = 0.0
constants, weights = self.predictor(const_par)
MatrixPredictor.__init__(
self,
matrices,
constants=constants,
weights=weights,
bounds=self.bounds,
defaults=self.defaults,
sparse_indices=_SLICE,
)
class ResponseMatrixPredictor(MatrixPredictor):
"""Event rate predictor from ResponseMatrix objects.
Parameters
----------
filename : str or file object
The exported information of a ResponseMatrix or
ResponseMatrixArrayBuilder.
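    A usage sketch, assuming ``"response.npz"`` was exported beforehand::
        pred = ResponseMatrixPredictor("response.npz")
        prediction, weights = pred(pred.defaults)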
"""
def __init__(self, filename):
data = np.load(filename)
matrices = data["matrices"]
constants = 0.0
weights = data.get("weights", 1.0)
if data.get("is_sparse", False):
sparse_indices = data["sparse_indices"]
else:
sparse_indices = _SLICE
        # Add epsilon so there is a very small allowed range for empty bins
        eps = np.finfo(float).eps
bounds = [(0.0, x + eps) for x in data["truth_entries"]]
defaults = data["truth_entries"] / 2.0
MatrixPredictor.__init__(
self,
matrices,
constants=constants,
weights=weights,
bounds=bounds,
defaults=defaults,
sparse_indices=sparse_indices,
)
class TemplatePredictor(MatrixPredictor):
"""MatrixPredictor from templates.
Parameters
----------
templates : array like
The templates to be combined together.
Each template gets its own weight parameter.
Shape: ``([n_systematics,]n_templates,len(template))``
constants : ndarray, optional
Shape: ``([n_systematics,]n_reco_bins)``
**kwargs : optional
Additional keyword arguments are passed to the MatrixPredictor.
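    For example, combining two templates over three reco bins (sketch)::
        templates = [[1.0, 2.0, 0.0],
                     [0.0, 1.0, 3.0]]
        pred = TemplatePredictor(templates)
        pred([1.0, 2.0])
        # -> (array([[1., 4., 6.]]), array([1.]))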
"""
def __init__(self, templates, constants=0, **kwargs):
matrices = np.asarray(templates)
matrices = np.array(np.swapaxes(matrices, -1, -2))
bounds = kwargs.pop("bounds", [(0.0, np.inf)] * matrices.shape[-1])
defaults = kwargs.pop("defaults", [1.0] * matrices.shape[-1])
MatrixPredictor.__init__(
self,
matrices,
constants=constants,
bounds=bounds,
defaults=defaults,
**kwargs,
)
class LikelihoodCalculator:
"""Class that calculates the likelihoods of parameter sets.
Parameters
----------
data_model : DataModel
Object that describes the statistical model of the data.
predictor : Predictor
The object that translates parameter sets into reco expectation values.
systematics : {'marginal', 'profile'} or SystematicsConsumer, optional
Specifies how to handle systematic prediction uncertainties, i.e. multiple
predictions from a single parameter set.
    Notes
    -----
    The resulting object can be called like a function::
        log_likelihood = likelihood_calculator(parameters)
"""
def __init__(self, data_model, predictor, systematics="marginal"):
self.data_model = data_model
self.predictor = predictor
if systematics == "marginal" or systematics == "average":
self.systematics = MarginalLikelihoodSystematics
elif systematics == "profile" or systematics == "maximum":
self.systematics = ProfileLikelihoodSystematics
else:
self.systematics = systematics
    def log_likelihood(self, *args, **kwargs):
"""Calculate the log likelihood of the given parameters.
Passes all arguments to the `predictor`.
"""
prediction, weights = self.predictor.prediction(*args, **kwargs)
log_likelihood = self.data_model.log_likelihood(prediction)
log_likelihood = self.systematics.consume_axis(log_likelihood, weights)
# Fix out of bounds to -inf
check = self.predictor.check_bounds(*args)
if check.ndim == 0:
if not check:
log_likelihood = -np.inf
else:
log_likelihood[..., ~check] = -np.inf
return log_likelihood
    def generate_toy_likelihood_calculators(self, parameters, N=1, **kwargs):
"""Generate LikelihoodCalculator objects with randomly varied data.
Accepts only single set of parameters.
Returns
-------
toy_calculators : list of LikelihoodCalculator
"""
parameters = np.asarray(parameters)
if parameters.ndim != 1:
raise ValueError("Parameters must be 1D array!")
predictor = self.predictor
systematics = self.systematics
prediction, weights = predictor(parameters, **kwargs)
weights = weights / np.sum(weights, axis=-1)
toys = []
for _i in range(N):
j = np.random.choice(len(weights), p=weights)
data_model = self.data_model.generate_toy_data_model(prediction[j])
toys.append(LikelihoodCalculator(data_model, predictor, systematics))
return toys
    def fix_parameters(self, fix_values):
"""Return a new LikelihoodCalculator with fewer free parameters.
Parameters
----------
fix_values
"""
data = self.data_model
pred = self.predictor.fix_parameters(fix_values)
syst = self.systematics
return LikelihoodCalculator(data, pred, syst)
    def compose(self, predictor):
"""Return a new LikelihoodCalculator with the composed Predictor.
Parameters
----------
predictor : Predictor
The predictor of the calculator will be composed with this.
"""
data = self.data_model
pred = self.predictor.compose(predictor)
syst = self.systematics
return LikelihoodCalculator(data, pred, syst)
def __call__(self, *args, **kwargs):
return self.log_likelihood(*args, **kwargs)
class LikelihoodMaximizer:
"""Class to maximise the likelihood over a parameter space."""
    def minimize(self, fun, x0, bounds, **kwargs):
"""General minimisation function."""
raise NotImplementedError("Must be implemented in a subclass!")
    def maximize_log_likelihood(self, likelihood_calculator, **kwargs):
"""Maximise the likelihood"""
def fun(x):
fun = -likelihood_calculator(x)
return fun
bounds = likelihood_calculator.predictor.bounds
x0 = likelihood_calculator.predictor.defaults
if len(x0) == 0:
# Nothing to optimise, return dummy result
opt = optimize.OptimizeResult()
            opt.x = np.zeros(0)
opt.fun = -likelihood_calculator(opt.x)
opt.log_likelihood = -opt.fun
else:
opt = self.minimize(fun, x0, bounds, **kwargs)
opt.log_likelihood = -opt.fun
return opt
def __call__(self, *args, **kwargs):
return self.maximize_log_likelihood(*args, **kwargs)
class BasinHoppingMaximizer(LikelihoodMaximizer):
"""Class to maximise the likelihood over a parameter space.
Uses SciPy's Basin Hopping algorithm.
Parameters
----------
**kwargs : optional
Arguments to be passed to the basin hopping function.
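    A usage sketch, assuming an existing LikelihoodCalculator ``calc``::
        maximizer = BasinHoppingMaximizer(niter=20)
        opt = maximizer(calc)
        best_fit, max_log_l = opt.x, opt.log_likelihood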
"""
def __init__(self, **kwargs):
self.kwargs = kwargs
    def minimize(self, fun, x0, bounds, **kwargs):
minimizer_kwargs = {
"bounds": bounds,
}
        # Expected log-likelihood variation is on the order of the number of degrees of freedom
args = {
"T": len(x0),
"niter": 10,
"minimizer_kwargs": minimizer_kwargs,
}
args.update(self.kwargs)
return optimize.basinhopping(fun, x0, **args)
class HypothesisTester:
"""Class for statistical tests of hypotheses."""
_default_maximizer = BasinHoppingMaximizer()
def __init__(self, likelihood_calculator, maximizer=_default_maximizer):
self.likelihood_calculator = likelihood_calculator
self.maximizer = maximizer
    def likelihood_p_value(self, parameters, N=2500, **kwargs):
"""Calculate the likelihood p-value of a set of parameters.
The likelihood p-value is the probability of hypothetical alternative
data yielding a lower likelihood than the actual data, given that the
simple hypothesis described by the parameters is true.
Parameters
----------
parameters : array like
            The evaluated hypothesis expressed as a vector of its parameters.
Can be a multidimensional array of vectors. The p-value for each
vector is calculated independently.
N : int, optional
The number of MC evaluations of the hypothesis.
**kwargs : optional
Additional keyword arguments will be passed to the likelihood
calculator.
Returns
-------
p : float or ndarray
The likelihood p-value.
Notes
-----
The p-value is estimated by creating ``N`` data samples according to
the given ``parameters``. The data is varied by both the statistical
and systematic uncertainties resulting from the prediction. The number
of data-sets that yield a likelihood as bad as, or worse than the
likelihood given the actual data, ``n``, are counted. The estimate for
``p`` is then::
p = n/N.
The variance of the estimator follows that of binomial statistics::
var(n) Np(1-p) 1
var(p) = ------ = ------- <= ---- .
N^2 N^2 4N
The expected uncertainty can thus be directly influenced by choosing an
appropriate number of toy data sets.
See also
--------
max_likelihood_p_value
max_likelihood_ratio_p_value
"""
parameters = np.array(parameters)
shape = parameters.shape[:-1]
parameters.shape = (np.prod(shape, dtype=int), parameters.shape[-1])
LC = self.likelihood_calculator # Calculator
p_values = []
for par in parameters:
L0 = LC(par, **kwargs) # Likelihood given data
toy_LC = LC.generate_toy_likelihood_calculators(par, N=N, **kwargs)
toy_L = list(
mapper(lambda C, par=par, kwargs=kwargs: C(par, **kwargs), toy_LC)
)
toy_L = np.array(toy_L)
p_values.append(np.sum(L0 >= toy_L, axis=-1) / N)
p_values = np.array(p_values, dtype=float)
p_values.shape = shape
return p_values
    def max_likelihood_p_value(self, fix_parameters=None, N=250):
"""Calculate the maximum-likelihood p-value.
The maximum-likelihood p-value is the probability of the data yielding
a lower maximum likelihood (over the possible parameter space of the
likelihood calculator) than the actual data, given that the best-fit
        parameter set is true.
Parameters
----------
fix_parameters : array like, optional
Optionally fix some or all of the paramters of the
:class:`LikelihoodCalculator`.
N : int, optional
The number of MC evaluations of the hypothesis.
Returns
-------
p : float or ndarray
The maximum-likelihood p-value.
Notes
-----
        When used to reject composite hypotheses, this p-value is sometimes
        called the "profile plug-in p-value", as one "plugs in" the maximum
        likelihood estimate of the hypothesis parameters to calculate it. Its
        coverage properties are not exact, so care has to be taken to make sure
        it performs as expected (e.g. by testing it with simulated data).
The p-value is estimated by randomly creating `N` data samples
        according to the best-fit parameters. The number of data-sets that
yield a likelihood ratio as bad as, or worse than the likelihood given
the actual data, `n`, are counted. The estimate for `p` is then::
p = n/N.
The variance of the estimator follows that of binomial statistics::
var(n) Np(1-p) 1
var(p) = ------ = ------- <= ---- .
N^2 N^2 4N
The expected uncertainty can thus be directly influenced by choosing an
appropriate number of evaluations.
See also
--------
likelihood_p_value
max_likelihood_ratio_p_value
LikelihoodCalculator.fix_parameters
"""
if fix_parameters is None:
LC = self.likelihood_calculator # Calculator
else:
LC = self.likelihood_calculator.fix_parameters(fix_parameters) # Calculator
maxer = self.maximizer # Maximiser
opt = maxer(LC)
opt_par = opt.x
L0 = opt.log_likelihood
toy_LC = LC.generate_toy_likelihood_calculators(opt_par, N=N)
toy_opt = list(mapper(lambda C, maxer=maxer: maxer(C), toy_LC))
toy_L = np.asfarray([topt.log_likelihood for topt in toy_opt])
p_value = np.sum(L0 >= toy_L, axis=-1) / N
return p_value
def _max_log_likelihood_ratio(
self, LC, fix_parameters, alternative_fix_parameters, return_parameters=False
):
# Calculator 0
LC0 = LC.fix_parameters(fix_parameters)
# Calculator 1
if alternative_fix_parameters is None:
LC1 = LC
else:
LC1 = LC.fix_parameters(alternative_fix_parameters)
maxer = self.maximizer # Maximiser
opt0 = maxer(LC0)
opt1 = maxer(LC1)
L0 = opt0.log_likelihood
L1 = opt1.log_likelihood
if return_parameters:
# "Unfix" the parameters and generate toy calculators
full_parameters = LC0.predictor.insert_fixed_parameters(opt0.x)
return L0 - L1, full_parameters
else:
return L0 - L1
    def max_likelihood_ratio_p_value(
self, fix_parameters, alternative_fix_parameters=None, N=250, **kwargs
):
"""Calculate the maximum-likelihood-ratio p-value.
The maximum-likelihood-ratio p-value is the probability of the data
yielding a lower ratio of maximum likelihoods (over the possible
parameter spaces of the composite hypotheses) than the actual data,
given that the best-fit parameter set of the tested hypothesis `H0` is
true.
Parameters
----------
fix_parameters : array like
Fix some or all of the paramters of the
:class:`LikelihoodCalculator`. This defines the tested hypothesis
`H0`.
alternative_fix_parameters : array like, optional
Optionally fix some of the paramters of the
:class:`LikelihoodCalculator` to define the alternative Hypothesis
`H1`. If this is not specified, `H1` is the fully unconstrained
calculator.
N : int, optional
The number of MC evaluations of the hypothesis.
**kwargs : optional
Additional keyword arguments will be passed to the maximiser.
Returns
-------
p : float or ndarray
The maximum-likelihood-ratio p-value.
Notes
-----
When used to reject composite hypotheses, this p-value is sometimes
called the "profile plug-in p-value", as one "plugs in" the maximum
        likelihood estimate of the hypothesis parameters to calculate it. Its
coverage properties are not exact, so care has to be taken to make sure
it performs as expected (e.g. by testing it with simulated data).
The p-value is estimated by randomly creating `N` data samples
        according to the best-fit parameters of `H0`. The number of data-sets that
yield a likelihood ratio as bad as, or worse than the likelihood given
the actual data, `n`, are counted. The estimate for `p` is then::
p = n/N.
The variance of the estimator follows that of binomial statistics::
var(n) Np(1-p) 1
var(p) = ------ = ------- <= ---- .
N^2 N^2 4N
The expected uncertainty can thus be directly influenced by choosing an
appropriate number of evaluations.
See also
--------
wilks_max_likelihood_ratio_p_value
likelihood_p_value
max_likelihood_p_value
"""
LC = self.likelihood_calculator
ratio0, parameters = self._max_log_likelihood_ratio(
LC, fix_parameters, alternative_fix_parameters, return_parameters=True
)
# Generate toy data
toy_LC = LC.generate_toy_likelihood_calculators(parameters, N=N)
# Calculate ratios for toys
def fun(
LC,
fix_parameters=fix_parameters,
alternative_fix_parameters=alternative_fix_parameters,
):
return self._max_log_likelihood_ratio(
LC, fix_parameters, alternative_fix_parameters
)
toy_ratios = np.array(list(mapper(fun, toy_LC)))
        # Calculate p-value
p_value = np.sum(ratio0 >= toy_ratios, axis=-1) / N
return p_value
    def wilks_max_likelihood_ratio_p_value(
self, fix_parameters, alternative_fix_parameters=None, **kwargs
):
"""Calculate the maximum-likelihood-ratio p-value using Wilk's theorem.
The maximum-likelihood-ratio p-value is the probability of the data
yielding a lower ratio of maximum likelihoods (over the possible
parameter spaces of the composite hypotheses) than the actual data,
given that the best-fit parameter set of the tested hypothesis `H0` is
        true. This method assumes that Wilks' theorem holds.
Parameters
----------
fix_parameters : array like
Fix some or all of the paramters of the
:class:`LikelihoodCalculator`. This defines the tested hypothesis
`H0`.
alternative_fix_parameters : array like, optional
Optionally fix some of the paramters of the
:class:`LikelihoodCalculator` to define the alternative Hypothesis
`H1`. If this is not specified, `H1` is the fully unconstrained
calculator.
**kwargs : optional
Additional keyword arguments will be passed to the maximiser.
Returns
-------
p : float
The maximum-likelihood-ratio p-value.
Notes
-----
This method assumes that Wilks' theorem holds, i.e. that the logarithm
        of the maximum likelihood ratio of the two hypotheses is distributed
like a chi-squared distribution::
ndof = number_of_parameters_of_H1 - number_of_parameters_of_H0
p_value = scipy.stats.chi2.sf(-2*log_likelihood_ratio, df=ndof)
See also
--------
max_likelihood_ratio_p_value
likelihood_p_value
max_likelihood_p_value
"""
LC = self.likelihood_calculator
ratio0 = self._max_log_likelihood_ratio(
LC, fix_parameters, alternative_fix_parameters
)
# Likelihood ratio should be distributed like a chi2 distribution
if alternative_fix_parameters is None:
# All parameters - unfixed parameters in H0
ndof = len(fix_parameters) - np.sum(
np.isnan(np.array(fix_parameters, dtype=float))
)
else:
# Unfixed parameters in H1 - unfixed parameters in H0
ndof = np.sum(
np.isnan(np.array(alternative_fix_parameters, dtype=float))
) - np.sum(np.isnan(np.array(fix_parameters, dtype=float)))
return stats.chi2.sf(-2.0 * ratio0, df=ndof)