Source code for remu.migration

"""Module handling the creation and use of migration matrices."""

from copy import deepcopy
from warnings import warn

import numpy as np

from .binning import Binning, CartesianProductBinning


[docs]class ResponseMatrix: """Matrix that describes the detector response to true events. Parameters ---------- reco_binning : RectangularBinning The Binning object describing the reco categorization. truth_binning : RectangularBinning The Binning object describing the truth categorization. nuisance_indices : list of ints, optional List of indices of nuisance truth bins. These are treated like their efficiency is exactly 1. impossible_indices :list of ints, optional List of indices of impossible reco bins. These are treated like their probability is exactly 0. response_binning : CartesianProductBinning, optional The Binning object describing the reco and truth categorization. Usually this will be generated from the truth and reco binning. Notes ----- The truth and reco binnings will be combined with their `cartesian_product` method. The truth bins corresonding to the `nuisance_indices` will be treated like they have a total efficiency of 1. The reco bins corresonding to the `impossible_indices` will be treated like they are filled with a probability of 0. Two response matrices can be combined by adding them ``new_resp = respA + respB``. This yields a new matrix that is equivalent to one that has been filled with the data in both ``respA`` and ``respB``. The truth and reco binnings in ``respA`` and ``respB`` must be identical for this to make sense. Attributes ---------- truth_binning : Binning The :class:`.Binning` object for the truth information of the events. reco_binning : Binning The :class:`.Binning` object for the reco information of the events. response_binning : CartesianProductBinning The :class:`.CartesianProductBinning` of reco and truth binning. nuisance_indices : list of int The truth data indices that will be handled as nuisance bins. impossible_indices : list of int The reco data indices that will be treated as impossible to occur. filled_truth_indices : list of int The data indices of truth bins that have at least one event in them. """ def __init__( self, reco_binning, truth_binning, nuisance_indices=None, impossible_indices=None, response_binning=None, ): if nuisance_indices is None: nuisance_indices = [] if impossible_indices is None: impossible_indices = [] self.truth_binning = truth_binning self.reco_binning = reco_binning if response_binning is None: self.response_binning = CartesianProductBinning( [reco_binning.clone(dummy=True), truth_binning.clone(dummy=True)] ) else: self.response_binning = response_binning self.nuisance_indices = nuisance_indices self.impossible_indices = impossible_indices self._update_filled_indices() def _update_filled_indices(self): """Update the list of filled truth indices.""" self.filled_truth_indices = np.argwhere( self.get_truth_entries_as_ndarray() > 0 ).flatten()
[docs] def fill(self, *args, **kwargs): """Fill events into the binnings.""" self.truth_binning.fill(*args, **kwargs) self.reco_binning.fill(*args, **kwargs) self.response_binning.fill(*args, **kwargs) self._update_filled_indices()
def _fix_rounding_errors(self): """Fix rounding errors that cause impossible matrices.""" resp = self.get_response_values_as_ndarray() truth = self.get_truth_values_as_ndarray() resp = resp.reshape((resp.size // truth.size, truth.size), order="C") resp = np.sum(resp, axis=0) diff = truth - resp if np.any(truth < 0): raise RuntimeError("Illegal response matrix: Negative true weight!") if np.any(resp < 0): raise RuntimeError( "Illegal response matrix: Negative total reconstructed weight!" ) if np.any(diff < -1e-9): # Allow rounding errors raise RuntimeError( "Illegal response matrix: Higher total reconstructed than true weight!" ) if np.any(diff < 0.0): # But make sure truth is >= reco fixed_truth = np.where(diff < 0, resp, truth) self.truth_binning.set_values_from_ndarray(fixed_truth)
[docs] def fill_from_csv_file(self, *args, **kwargs): """Fill binnings from csv file. See :meth:`Binning.fill_from_csv_file <remu.binning.Binning.fill_from_csv_file>` for a description of the parameters. See also -------- fill_up_truth_from_csv_file : Re-fill only truth bins from different file. """ Binning.fill_multiple_from_csv_file( [self.truth_binning, self.reco_binning, self.response_binning], *args, **kwargs, ) self._fix_rounding_errors() self._update_filled_indices()
[docs] def fill_up_truth_from_csv_file(self, *args, **kwargs): """Re-fill the truth bins with the given csv file. This can be used to get proper efficiencies if the true signal events are saved in a separate file from the reconstructed events. It takes the same parameters as :meth:`fill_from_csv_file`. Notes ----- A new truth binning is created and filled with the events from the provided file. Each bin is compared to the corresponding bin in the already present truth binning. The larger value of the two is taken as the new truth. This way, event types that are not present in the pure truth data, e.g. background, are not affected by this. It can only *increase* the value of the truth bins, lowering their efficiency. For each truth bin, one of the following *must* be true for this operation to make sense: * All events in the migration matrix are also present in the truth file. In this case, the additional truth events lower the efficiency of the truth bin. This is the case, for example, if not all true signal events are reconstructed. * All events in the truth file are also present in the migration matrix. In this case, the events in the truth file have no influence on the response matrix. This is the case, for example, if only a subset of the reconstructed background is saved in the truth file. If there are events in the response matrix that are not in the truth tree *and* there are events in the truth tree that are not in the response matrix, this method will lead to a *wrong* efficiency of the affected truth bin. """ new_truth_binning = deepcopy(self.truth_binning) new_truth_binning.reset() new_truth_binning.fill_from_csv_file(*args, **kwargs) return self._replace_smaller_truth(new_truth_binning)
[docs] def fill_up_truth(self, *args, **kwargs): """Re-fill the truth bins with the given events file. This can be used to get proper efficiencies if the true signal events are stored separate from the reconstructed events. It takes the same parameters as :meth:`fill`. Notes ----- A new truth binning is created and filled with the events from the provided events. Each bin is compared to the corresponding bin in the already present truth binning. The larger value of the two is taken as the new truth. This way, event types that are not present in the pure truth data, e.g. background, are not affected by this. It can only *increase* the value of the truth bins, lowering their efficiency. For each truth bin, one of the following *must* be true for this operation to make sense: * All events in the migration matrix are also present in the new truth events. In this case, the additional truth events lower the efficiency of the truth bin. This is the case, for example, if not all true signal events are reconstructed. * All events in the new truth events are also present in the migration matrix. In this case, the events in the new truth events have no influence on the response matrix. This is the case, for example, if only a subset of the reconstructed background is saved in the truth file. If there are events in the response matrix that are not in the new truth events *and* there are events in the new truth events that are not in the response matrix, this method will lead to a *wrong* efficiency of the affected truth bin. """ new_truth_binning = deepcopy(self.truth_binning) new_truth_binning.reset() new_truth_binning.fill(*args, **kwargs) return self._replace_smaller_truth(new_truth_binning)
def _replace_smaller_truth(self, new_truth_binning): new_values = new_truth_binning.get_values_as_ndarray() new_entries = new_truth_binning.get_entries_as_ndarray() new_sumw2 = new_truth_binning.get_sumw2_as_ndarray() old_values = self.truth_binning.get_values_as_ndarray() old_entries = self.truth_binning.get_entries_as_ndarray() old_sumw2 = self.truth_binning.get_sumw2_as_ndarray() if np.any(new_values < 0): i = np.argwhere(new_values < 0) raise RuntimeError( "Filled-up values are negative in %d bins." % (i.size,), stacklevel=3 ) where = new_values > 0 diff_v = new_values - old_values diff_e = new_entries - old_entries # Check for bins where the fill-up is less than the original if np.any(where & (diff_v < -1e-9)): i = np.argwhere(where & (diff_v < -1e-9)) warn( "Filled-up values are less than the original filling in %d bins. This should not happen!" % (i.size,), stacklevel=3, ) if np.any(where & (diff_e < 0)): i = np.argwhere(where & (diff_e < 0)) warn( "Filled-up entries are less than the original filling in %d bins. This should not happen!" % (i.size,), stacklevel=3, ) where = where & (diff_v >= 0) & (diff_e >= 0) self.truth_binning.set_values_from_ndarray( np.where(where, new_values, old_values) ) self.truth_binning.set_entries_from_ndarray( np.where(where, new_entries, old_entries) ) self.truth_binning.set_sumw2_from_ndarray(np.where(where, new_sumw2, old_sumw2)) self._fix_rounding_errors() self._update_filled_indices()
[docs] def reset(self): """Reset all binnings.""" self.truth_binning.reset() self.reco_binning.reset() self.response_binning.reset() self._update_filled_indices()
[docs] def set_truth_values_from_ndarray(self, *args, **kwargs): """Set the values of the truth binning as `ndarray`.""" self.truth_binning.set_values_from_ndarray(*args, **kwargs)
[docs] def set_truth_entries_from_ndarray(self, *args, **kwargs): """Set the number of entries in the truth binning as `ndarray`.""" self.truth_binning.set_entries_from_ndarray(*args, **kwargs) self._update_filled_indices()
[docs] def set_truth_sumw2_from_ndarray(self, *args, **kwargs): """Set the sum of squared weights in the truth binning as `ndarray`.""" self.truth_binning.set_sumw2_from_ndarray(*args, **kwargs)
[docs] def set_reco_values_from_ndarray(self, *args, **kwargs): """Set the values of the reco binning as `ndarray`.""" self.reco_binning.set_values_from_ndarray(*args, **kwargs)
[docs] def set_reco_entries_from_ndarray(self, *args, **kwargs): """Set the number of entries in the reco binning as `ndarray`.""" self.reco_binning.set_entries_from_ndarray(*args, **kwargs)
[docs] def set_reco_sumw2_from_ndarray(self, *args, **kwargs): """Set the sum of squared weights in the reco binning as `ndarray`.""" self.reco_binning.set_sumw2_from_ndarray(*args, **kwargs)
[docs] def set_response_values_from_ndarray(self, *args, **kwargs): """Set the values of the response binning as `ndarray`.""" self.response_binning.set_values_from_ndarray(*args, **kwargs)
[docs] def set_response_entries_from_ndarray(self, *args, **kwargs): """Set the number of entries in the response binning as `ndarray`.""" self.response_binning.set_entries_from_ndarray(*args, **kwargs)
[docs] def set_response_sumw2_from_ndarray(self, *args, **kwargs): """Set the sum of squared weights in the response binning as `ndarray`.""" self.response_binning.set_sumw2_from_ndarray(*args, **kwargs)
[docs] def get_truth_values_as_ndarray(self, *args, **kwargs): """Get the values of the truth binning as `ndarray`.""" return self.truth_binning.get_values_as_ndarray(*args, **kwargs)
[docs] def get_truth_entries_as_ndarray(self, *args, **kwargs): """Get the number of entries in the truth binning as `ndarray`.""" return self.truth_binning.get_entries_as_ndarray(*args, **kwargs)
[docs] def get_truth_sumw2_as_ndarray(self, *args, **kwargs): """Get the sum of squared weights in the truth binning as `ndarray`.""" return self.truth_binning.get_sumw2_as_ndarray(*args, **kwargs)
[docs] def get_reco_values_as_ndarray(self, *args, **kwargs): """Get the values of the reco binning as `ndarray`.""" return self.reco_binning.get_values_as_ndarray(*args, **kwargs)
[docs] def get_reco_entries_as_ndarray(self, *args, **kwargs): """Get the number of entries in the reco binning as `ndarray`.""" return self.reco_binning.get_entries_as_ndarray(*args, **kwargs)
[docs] def get_reco_sumw2_as_ndarray(self, *args, **kwargs): """Get the sum of squared weights in the reco binning as `ndarray`.""" return self.reco_binning.get_sumw2_as_ndarray(*args, **kwargs)
[docs] def get_response_values_as_ndarray(self, *args, **kwargs): """Get the values of the response binning as `ndarray`.""" return self.response_binning.get_values_as_ndarray(*args, **kwargs)
[docs] def get_response_entries_as_ndarray(self, *args, **kwargs): """Get the number of entries in the response binning as `ndarray`.""" return self.response_binning.get_entries_as_ndarray(*args, **kwargs)
[docs] def get_response_sumw2_as_ndarray(self, *args, **kwargs): """Get the sum of squared weights in the response binning as `ndarray`.""" return self.response_binning.get_sumw2_as_ndarray(*args, **kwargs)
@staticmethod def _normalize_matrix(M): """Make sure all efficiencies are less than or equal to 1.""" eff = np.sum(M, axis=-2) eff = np.where(eff < 1.0, 1.0, eff)[..., np.newaxis, :] return M / eff
[docs] def get_response_matrix_as_ndarray(self, shape=None, truth_indices=None): """Return the ResponseMatrix as a ndarray. Uses the information in the truth and response binnings to calculate the response matrix. Parameters ---------- shape : tuple of ints, optional The shape of the returned ndarray. Default: ``(#(reco bins), #(truth bins))`` truth_indices : list of ints, optional Only return the response of the given truth bins. Default: Return full matrix. Returns ------- ndarray Notes ----- If shape is `None`, it s set to ``(#(reco bins), #(truth bins))``. The expected response of a truth vector can then be calculated like this:: v_reco = response_matrix.dot(v_truth) If `truth_indices` are provided, a sliced matrix with only the given columns will be returned. See also -------- get_mean_response_matrix_as_ndarray """ if truth_indices is None: truth_indices = slice(None, None, None) original_shape = (self.reco_binning.data_size, self.truth_binning.data_size) # Get the bin response entries M = self.get_response_values_as_ndarray(original_shape)[:, truth_indices] # Normalize to number of simulated events N_t = self.get_truth_values_as_ndarray(indices=truth_indices) M /= np.where(N_t > 0.0, N_t, 1.0) # Deal with bins where N_reco > N_truth M = ResponseMatrix._normalize_matrix(M) if shape is not None: M = M.reshape(shape, order="C") return M
def _get_stat_error_parameters( self, expected_weight=1.0, nuisance_indices=None, impossible_indices=None, truth_indices=None, ): r"""Return $\beta^t_1j$, $\beta^t_2j$, $\alpha^t_{ij}$, $\hat{w}^t_{ij}$ and $\sigma(w^t_{ij})$. Used for calculations of statistical variance. If `truth_indices` are provided, a sliced matrix with only the given columns will be returned. """ if nuisance_indices is None: nuisance_indices = self.nuisance_indices if impossible_indices is None: impossible_indices = self.impossible_indices if truth_indices is None: truth_indices = slice(None, None, None) else: # Translate nuisance indices to sliced indices i = np.searchsorted(truth_indices, nuisance_indices) mask = i < len(truth_indices) i = i[mask] nuisance_indices = np.asarray(nuisance_indices)[mask] mask = nuisance_indices == np.asarray(truth_indices)[i] nuisance_indices = np.array(i[mask]) del mask del i N_reco = self.reco_binning.data_size N_truth = self.truth_binning.data_size orig_shape = (N_reco, N_truth) epsilon = 1e-50 resp_entries = self.get_response_entries_as_ndarray(orig_shape)[ :, truth_indices ] truth_entries = self.get_truth_entries_as_ndarray(indices=truth_indices) # Get parameters of Beta distribution characterizing the efficiency. # Assume a prior of Beta(1,1), i.e. flat in efficiency. beta1 = np.sum(resp_entries, axis=0) # "Waste bin" of not selected events waste_entries = truth_entries - beta1 if np.any(waste_entries < 0): raise RuntimeError( "Illegal response matrix: More reconstructed than true events!" ) beta1 = np.asarray(beta1 + 1, dtype=float) beta2 = np.asarray(waste_entries + 1, dtype=float) # Set efficiency of nuisance bins to 1, i.e. beta2 to (almost) zero. beta2[nuisance_indices] = epsilon # Get parameters of Dirichlet distribution characterizing the distribution within the reco bins. # Assume a prior where we expect most of the events to be clustered in a few reco bins. # Most events should end up divided into about 3 bins per reco variable: # the correct one and the two neighbouring ones. # Since the binning is orthogonal, we expect the number of bins to be roughly 3**N_variables. # This leads to prior parameters >1 for degenerate reco binnings with < 3 bins/variable. # We protect against that by setting the maximum prior value to 1. n_vars = len(self.reco_binning.phasespace) prior = min(1.0, 3.0**n_vars / (N_reco - len(impossible_indices))) alpha = np.asarray(resp_entries, dtype=float) + prior # Set efficiency of impossible bins to (almost) 0 alpha[impossible_indices] = epsilon # Estimate mean weight resp1 = self.get_response_values_as_ndarray(orig_shape)[:, truth_indices] resp2 = self.get_response_sumw2_as_ndarray(orig_shape)[:, truth_indices] truth1 = self.get_truth_values_as_ndarray(indices=truth_indices) truth2 = self.get_truth_sumw2_as_ndarray(indices=truth_indices) # Add truth bin of all events resp1 = np.append(resp1, truth1[np.newaxis, :], axis=0) resp2 = np.append(resp2, truth2[np.newaxis, :], axis=0) resp_entries = np.append(resp_entries, truth_entries[np.newaxis, :], axis=0) i = (resp_entries > 0) & (resp1 > 0) mu = resp1 / np.where(i, resp_entries, 1) mu[-1] = np.where( i[-1], mu[-1], expected_weight ) # Set empty truth bins to expected weight mu[:-1, :] = np.where( i[:-1], mu[:-1, :], mu[-1, :] ) # Set empty reco bins to average truth weight # Add pseudo observation for variance estimation resp1_p = resp1 + expected_weight resp2_p = resp2 + expected_weight**2 resp_entries_p = resp_entries + 1 # resp_entries_p2 = resp_entries_p**2 # Since `w_ij` is the mean weight, the variance is just the error of the mean. # # |---- sum of weights # v |---- sample variance # w_ij = W_ij / N_ij <---- number of entries v # var(w_ij) = var(W_ij) / (N_ij)**2 = var(W) / N_ij # = ( (V_ij / N_ij) - (W_ij / N_ij)**2 ) / N_ij # ^ # |----- sum of squared weights # var = ( (resp2_p / resp_entries_p) - (resp1_p / resp_entries_p) ** 2 ) / resp_entries_p sigma = np.sqrt(var) # Add an epsilon so sigma is always > 0 sigma += epsilon return beta1, beta2, alpha, mu, sigma
[docs] def get_mean_response_matrix_as_ndarray(self, shape=None, **kwargs): """Get the means of the posterior distributions of the response matrix elements. This is different from the "raw" matrix one gets from :meth:`get_response_matrix_as_ndarray`. The latter simply divides the sum of weights in the respective bins. Parameters ---------- shape : tuple of ints, optional The shape of the returned matrices. Defaults to ``(#(reco bins), #(truth bins))``. expected_weight : float, optional The expected average weight of the events. This is used int the calculation of the weight variance. Default: 1.0 nuisance_indices : list of ints, optional List of truth bin indices. These bins will be treated like their efficiency is exactly 1. Default: Use the `nuisance_indices` attribute of the ResponseMatrix. impossible_indices : list of ints, optional List of reco bin indices. These bins will be treated like their probability is exactly 0. Default: Use the `impossible_indices` attribute of the ResponseMatrix. truth_indices : list of ints, optional List of truth bin indices. Only return the response of the given truth bins. Default: Return full matrices. Returns ------- ndarray See also -------- get_response_matrix_as_ndarray get_statistical_variance_as_ndarray generate_random_response_matrices """ beta1, beta2, alpha, mu, sigma = self._get_stat_error_parameters(**kwargs) # Unweighted binomial reconstructed probability (efficiency) # Posterior mean estimate = beta1 / (beta1 + beta2) beta0 = beta1 + beta2 effj = beta1 / beta0 # Unweighted (multinomial) smearing probabilty # Posterior mean estimate = alpha / alpha0 alpha0 = np.sum(alpha, axis=0) pij = np.asarray(alpha, dtype=float) / alpha0 # Weight correction wij = mu[:-1] wj = mu[-1] mij = wij / wj # Combine the three MM = mij * pij * effj # Re-normalise after weight corrections MM = ResponseMatrix._normalize_matrix(MM) if shape is not None: MM = MM.reshape(shape, order="C") return MM
[docs] def get_statistical_variance_as_ndarray(self, shape=None, **kwargs): """Get the statistical variance of the single ResponseMatrix elements as ndarray. The variance is estimated from the actual bin contents in a Bayesian motivated way. Parameters ---------- shape : tuple of ints, optional The shape of the returned matrix. Defaults to ``(#(reco bins), #(truth bins))``. kwargs : optional See :meth:`get_mean_response_matrix_as_ndarray` for a description of more optional `kwargs`. Returns ------- ndarray Notes ----- The response matrix creation is modeled as a three step process: 1. Reconstruction efficiency according to a binomial process. 2. Distribution of truth events among the reco bins according to a multinomial distribution. 3. Correction of the categorical probabilities according to the mean weights of the events in each bin. So the response matrix element can be written like this:: R_ij = m_ij * p_ij * eff_j where ``eff_j`` is the total efficiency of events in truth bin ``j``, ``p_ij`` is the unweighted multinomial reconstruction probability in reco bin ``i`` and ``m_ij`` the weight correction. The variance of ``R_ij`` is estimated by estimating the variances of these values separately. The variance of ``eff_j`` is estimated by using the Bayesian conjugate prior for biinomial distributions: the Beta distribution. We assume a prior that is uniform in the reconstruction efficiency. We then update it with the simulated events. The variance of the posterior distribution is taken as the variance of the efficiency. The variance of ``p_ij`` is estimated by using the Bayesian conjugate prior for multinomial distributions: the Dirichlet distribution. We assume a prior that is uniform in the ignorant about reconstruction probabilities. We then update it with the simulated events. The variance of the posterior distribution is taken as the variance of the transition probability. If a list of `nuisance_indices` is provided, the probabilities of *not* reconstructing events in the respective truth categories will be fixed to 0. This is useful for background categories where one is not interested in the true number of events. If a list of `impossible_indices` is provided, the probabilities of reconstructing events in the respective reco categories will be fixed to 0. This is useful for bins that are impossible to have any events in them by their definition. The variances of m_ij is estimated from the errors of the average weights in the matrix elements as classical "standard error of the mean". To avoid problems with bins with 0 or 1 entries, we add a "prior expectation" point to the data. This ensures that all bins have at least 1 entry (no divisions by zero) and that variances can be estimated even for bins with only one (true) entry (from the difference to the expected value). This is just an estimate! The true variance of the randomly generated response matrices can deviate from the returned numbers. Also, these variances ignore the correlations between matrix elements. If no shape is specified, it will be set to `(N_reco, N_truth)`. If `truth_indices` are provided, a sliced matrix with only the given columns will be returned. See also -------- get_mean_response_matrix_as_ndarray generate_random_response_matrices get_in_bin_variation_as_ndarray """ beta1, beta2, alpha, mu, sigma = self._get_stat_error_parameters(**kwargs) # Unweighted binomial reconstruction probability (efficiency) # Posterior mean estimate = beta1 / (beta1 + beta2) beta0 = beta1 + beta2 effj = beta1 / beta0 # Posterior variance effj_var = beta1 * beta2 / (beta0**2 * (beta0 + 1)) # Unweighted (multinomial) smearing probabilty # Posterior mean estimate = alpha / alpha0 alpha0 = np.sum(alpha, axis=0) pij = np.asarray(alpha, dtype=float) / alpha0 # Posterior variance pij_var = np.asarray(alpha0 - alpha, dtype=float) pij_var *= alpha pij_var /= alpha0**2 * (alpha0 + 1) # Weight correction wij = mu[:-1] wj = mu[-1] mij = wij / wj # Standard error propagation # # var(m_ij) = var(w_ij) / w_j**2 + (w_ij/w_j**2)**2 * var(w_j) wj2 = wj**2 var = sigma**2 mij_var = var[:-1] / wj2 + (wij / wj2) ** 2 * var[-1] # Combine uncertainties effj2 = effj**2 pij2 = pij**2 mij2 = mij**2 MM = mij2 * pij2 * effj_var + mij2 * pij_var * effj2 + mij_var * pij2 * effj2 if shape is not None: MM = MM.reshape(shape, order="C") return MM
[docs] def get_in_bin_variation_as_ndarray( self, shape=None, truth_indices=None, normalize=True, **kwargs ): """Get an estimate for the variation of the response within a bin. The in-bin variation is estimated from the maximum difference to the surrounding truth bins. The differences can be normalized to the estimated statistical errors, so values close to one indicate a statistically dominated variation. Parameters ---------- shape : tuple of ints, optional The shape of the returned ndarray. Default: ``(#(reco bins), #(truth bins))`` truth_indices : list of ints, optional Return a sliced matrix with only the given columns. normalize : bool, optional Divide the variation by the statistical variance **kwargs : optional Additional keyword arguments are passed to :meth:`get_mean_response_matrix_as_ndarray` and :meth:`get_statistical_variance_as_ndarray`. Returns ------- ndarray See also -------- get_statistical_variance_as_ndarray """ response = self.get_mean_response_matrix_as_ndarray(**kwargs) if normalize: variance = self.get_statistical_variance_as_ndarray(**kwargs) adjacent = self.truth_binning.get_adjacent_data_indices() variation = np.zeros_like(response) for reco_index in range(response.shape[0]): for truth_index in range(response.shape[1]): diff = response[reco_index, truth_index] diff = diff - response[reco_index, adjacent[truth_index]] diff = diff**2 if normalize: var = variance[reco_index, truth_index] var = var + variance[reco_index, adjacent[truth_index]] diff = diff / var variation[reco_index, truth_index] = np.sqrt(np.max(diff)) if truth_indices is not None: variation = variation[:, truth_indices] if shape is not None: variation.shape = shape return variation
@staticmethod def _dirichlet(alpha, size=None): """Reimplements np.random.dirichlet. The original implementation is not suitable for very low alphas. """ params = np.asarray(alpha, dtype=float) if size is None: total_size = len(alpha) else: try: total_size = tuple(list(size) + [len(alpha)]) except TypeError: total_size = tuple(list([size]) + [len(alpha)]) if len(params) == 1: # Special case for response matrices with only one reco bin return np.ones(total_size) xs = np.zeros(total_size) xs[..., 0] = np.random.beta(params[0], np.sum(params[1:]), size=size) for j in range(1, len(params) - 1): phi = np.random.beta(params[j], sum(params[j + 1 :]), size=size) xs[..., j] = (1 - np.sum(xs, axis=-1)) * phi xs[..., -1] = 1 - np.sum(xs, axis=-1) # Fix rounding errors xs[xs < 0] = 0 return xs
[docs] def generate_random_response_matrices(self, size=None, shape=None, **kwargs): """Generate random response matrices according to the estimated variance. Parameters ---------- size : int or tuple of ints, optional How many random matrices should be generated. shape : tuple of ints, optional The shape of the returned matrices. Defaults to ``(#(reco bins), #(truth bins))``. kwargs : optional See :meth:`get_mean_response_matrix_as_ndarray` for a description of more optional `kwargs`. Returns ------- ndarray Notes ----- This is a three step process: 1. Draw the binomal efficiencies from Beta distributions 2. Draw the multinomial reconstruction probabilities from a Dirichlet distribution. 3. Draw weight corrections from normal distributions. If no shape is specified, it will be set to ``(#(reco bins, #(truth bins))``. If `truth_indices` are provided, a sliced matrix with only the given columns will be returned. See also -------- get_mean_response_matrix_as_ndarray get_statistical_variance_as_ndarray """ beta1, beta2, alpha, mu, sigma = self._get_stat_error_parameters(**kwargs) # Generate efficiencies if size is None: eff_size = beta1.shape else: try: eff_size = tuple(size) except TypeError: eff_size = (size,) eff_size = eff_size + beta1.shape effj = np.random.beta(beta1, beta2, eff_size) # Transpose so we have an array of dirichlet parameters alpha = alpha.T # Generate truth bin by truth bin pij = [] for j in range(alpha.shape[0]): pij.append(self._dirichlet(alpha[j], size=size)) pij = np.array(pij) # Reorganise axes pij = np.moveaxis(pij, 0, -1) # Append original shape to requested size of data sets if size is not None: try: size = list(size) except TypeError: size = [size] size.extend(mu.shape) # Generate random weights wij = np.abs(np.random.normal(mu, sigma, size=size)) wj = wij[..., -1, :] wij = wij[..., :-1, :] mij = wij / wj[..., np.newaxis, :] response = mij * pij * effj[..., np.newaxis, :] # Re-normalise after weight corrections response = ResponseMatrix._normalize_matrix(response) # Adjust shape if shape is None: truth_indices = kwargs.pop("truth_indices", None) if truth_indices is None: shape = (self.reco_binning.data_size, self.truth_binning.data_size) else: shape = (self.reco_binning.data_size, len(truth_indices)) response = response.reshape(list(response.shape[:-2]) + list(shape), order="C") return response
def __add__(self, other): """Add two matrices together, combining their data.""" ret = deepcopy(self) ret.truth_binning = self.truth_binning + other.truth_binning ret.reco_binning = self.reco_binning + other.reco_binning ret.response_binning = self.response_binning + other.response_binning ret._fix_rounding_errors() ret._update_filled_indices() return ret
[docs] def export(self, filename, compress=False, nstat=None, sparse=True): """Save all necessary information for using the response matrix. Saves all necessary information for using the response matrix` in a NumPy ``.npz`` archive. Parameters ---------- filename : str or file Where to store the arrays. compress : bool, optional Whether to use compression. nstat : int, optional How many random variations of the matrix to generate. Default: Export mean matrix, no random variation sparse : bool, optional Should a sparse version be exported, or the full matrix. See also -------- ResponseMatrixArrayBuilder.export """ if nstat is None: matrices = self.get_mean_response_matrix_as_ndarray()[np.newaxis, ...] else: matrices = self.generate_random_response_matrices(size=nstat) truth_entries = self.get_truth_entries_as_ndarray() if sparse: sparse_indices = np.flatnonzero(truth_entries) matrices = matrices[..., sparse_indices] data = { "matrices": matrices, "truth_entries": truth_entries, "sparse_indices": sparse_indices, "is_sparse": True, } else: data = { "matrices": matrices, "truth_entries": truth_entries, } if compress: np.savez_compressed(filename, **data) else: np.savez(filename, **data)
[docs] def clone(self): """Create a functioning copy of the response matrix.""" reco_binning = self.reco_binning.clone() response_binning = self.response_binning.clone() truth_binning = self.truth_binning.clone() nuisance_indices = deepcopy(self.nuisance_indices) impossible_indices = deepcopy(self.impossible_indices) return ResponseMatrix( reco_binning, truth_binning, nuisance_indices=nuisance_indices, impossible_indices=impossible_indices, response_binning=response_binning, )
[docs]class ResponseMatrixArrayBuilder: """Class that generates consistent ndarrays from multiple response matrix objects. Parameters ---------- nstat : int The number of random matrices to be generated for each ResponseMatrix. If `nstat` is 0, no random matrices are generated. Instead the output of :meth:`ResponseMatrix.get_response_matrix_as_ndarray` is used. Notes ----- This class is used to generate the random response matrices from multiple toy simulations covering the detector uncertainties. Each toy simulation yields one :class:`ResponseMatrix` object. These are then combined with the :class:`ResponseMatrixArrayBuilder`. The :class:`ResponseMatrixArrayBuilder` is designed to use as little memory as possible. It stores only the necessary information of the :class:`ResponseMatrix`, *not* the :class:`ResponseMatrix` objects themselves. It only stores the truth bins that were actually filled. The matrices must have been built using the same truth information! The filled bins may only differ in the nuisance bins. When creating the final :class:`ndarray`, missing nuisance columns that were filled in some but not all matrices are filled with default values (0). The relative efficiencies of the nuisance bins are guaranteed to be consistent between the different response matrices. The absolute value of the efficiencies is not conserved. In fact, the efficiency for nuisance bin ``j`` in :class:`ResponseMatrix` ``t`` will be:: eff_tj = N_tj / max_t( N_tj) where ``N_tj`` is the value of nuisance truth bin ``j`` in :class:`ResponseMatrix` ``t``. This means the efficiency of the nuisance bins can decrease with the number of added matrices. This has to be taken into account when creating truth templates for the nuisance bins. They should consist of the *max* of the selected nuisance events over the different toy simulations. This way, the template multiplied with the ndarray will re-create the number of selected nuisance events in each bin for all toy response matrices. Attributes ---------- nstat : int The number of statistical throws to generate for each added matrix. nmatrices : int The number of matrices that were added. """ def __init__(self, nstat): """ """ self.nstat = nstat self.reset()
[docs] def reset(self): """Reset everything to 0.""" self.nmatrices = 0 self._matrices = [] self._weights = [] self._mean_matrices = [] self._truth_values = [] self._truth_entries = None self._filled_indices = [] self._nuisance_indices = None
[docs] def add_matrix(self, response_matrix, weight=1.0): """Add a matrix to the collection. Parameters ---------- response_matrix : :class:`ResponseMatrix` weight : float, optional Notes ----- This immediately triggers the generation of `nstat` random variations of `response_matrix`. See also -------- ResponseMatrix.generate_random_response_matrices """ # Check that the nuisance indices are identical nuisance_indices = response_matrix.nuisance_indices if self._nuisance_indices is None: self._nuisance_indices = nuisance_indices elif set(self._nuisance_indices) != set(nuisance_indices): raise RuntimeError("Matrices have different nuisance indices!") filled_indices = response_matrix.filled_truth_indices if self.nstat > 0: matrix = response_matrix.generate_random_response_matrices( self.nstat, truth_indices=filled_indices ) else: matrix = response_matrix.get_response_matrix_as_ndarray( truth_indices=filled_indices )[np.newaxis, ...] mean_matrix = response_matrix.get_mean_response_matrix_as_ndarray( truth_indices=filled_indices ) truth_values = response_matrix.get_truth_values_as_ndarray( indices=filled_indices ) truth_entries = ( response_matrix.get_truth_entries_as_ndarray() ) # We need *all* entries self._filled_indices.append(filled_indices) self._matrices.append(matrix) self._weights.append(weight) self._mean_matrices.append(mean_matrix) self._truth_values.append(truth_values) if self._truth_entries is None: self._truth_entries = truth_entries else: self._truth_entries = np.maximum(self._truth_entries, truth_entries) self.nmatrices += 1
def _get_filled_truth_indices_set(self): """Return the set of filled truth indices.""" all_indices = set() for i in self._filled_indices: all_indices.update(i) return all_indices
[docs] def get_filled_truth_indices(self): """Return the list of filled truth indices. This list contains the indices of all truth bins that have been filled in at least one of the matrices that were added to the `ResponseMatrixArrayBuilder`. """ return sorted(self._get_filled_truth_indices_set())
[docs] def get_truth_entries_as_ndarray(self): """Return the array of maximum entries in the truth bins.""" return self._truth_entries
def _get_truth_value_scale(self, tv): """Get scale to make nuisance bins consistent. The nuisance bins must be scaled between the multiple matrices, because in each matrix their efficiency is 1 by definition. Ideally they all would be scaled to the true number of true events in each truth bin, but this information is not available for nuisance bins. Instead we use the sum of truth values over all matrices as denominator off the efficiency, e.g. the efficiency of nuisance truth bin j in matrix t: eff_tj = N_tj / max_t( N_tj) This means the efficiency of the nuisance bins can decrease with the number of added matrices. This has to be taken into account when creating truth templates for the nuisance bins. They should consist of the *max* of the selected nuisance events over the different toy simulations. This way, the template multiplied with the ndarray will re-create the number of selected nuisance events in each bin for all toy response matrices. """ all_indices = self._get_filled_truth_indices_set() nuisance_indices = set(self._nuisance_indices) filled_nuisance_indices = all_indices & nuisance_indices max_tv = np.max(tv, axis=0) max_tv = np.where(max_tv > 0, max_tv, 1.0) scale = np.ones_like(tv) # Start with scales = 1 for i in np.searchsorted(sorted(all_indices), sorted(filled_nuisance_indices)): scale[:, i] = tv[:, i] / max_tv[i] # Set scale of nuisance indices return scale
[docs] def get_random_response_matrices_as_ndarray(self): """Get the response matrices as consistent ndarray. Returns ------- M, weights : ndarray A big :class:`ndarray` containing `nstat` generated matrices for each :class:`ResponseMatrix` that has been added, and a vector of weights for corresponding the matrices. Notes ----- The returned matrices will only contain the columns, i.e. truth bins, that have been filled in at least one of the added :class:`ResponseMatrix` objects. The shape of the returned array will be:: (#(RepsonseMatrices) x nstat, #(reco bins), #(filled truth bins)) The indices of the returned (filled) truth bins can be requested with the :meth:`get_filled_truth_indices` method. See also -------- get_mean_response_matrices_as_ndarray """ M = [] tv = [] # Insert missing columns all_indices = self._get_filled_truth_indices_set() nuisance_indices = set(self._nuisance_indices) for indices, matrix, truth_values in zip( self._filled_indices, self._matrices, self._truth_values ): missing_indices = all_indices - set(indices) # Make sure only nuisance indices are missing if len(missing_indices - nuisance_indices) > 0: raise RuntimeError("Truth difference in non-nuisance index!") missing_indices = list(missing_indices) insert_positions = np.searchsorted(indices, missing_indices) extended_matrix = np.insert(matrix, insert_positions, 0.0, axis=-1) M.append(extended_matrix) extended_truth_values = np.insert( truth_values, insert_positions, 0.0, axis=-1 ) tv.append(extended_truth_values) M = np.array(M) tv = np.array(tv) # Scale (nuisance) truth bins so they are consistent scale = self._get_truth_value_scale(tv) M = M * scale[:, np.newaxis, np.newaxis, :] # Broadcast weights weights = np.broadcast_to(np.array(self._weights)[:, np.newaxis], M.shape[:2]) # Reshape to vector of matrices. M.shape = (max(self.nstat, 1) * self.nmatrices,) + M.shape[2:] weights = weights.flatten() return M, weights
[docs] def get_mean_response_matrices_as_ndarray(self): """Get the mean response matrices as ndarray. Returns ------- M, weights : ndarray A big :class:`ndarray` containing `nstat` generated matrices for each :class:`ResponseMatrix` that has been added, and a vector of weights for corresponding the matrices. Notes ----- The returned matrices will only contain the columns, i.e. truth bins, that have been filled in at least one of the added :class:`ResponseMatrix` objects. The shape of the returned array will be:: (#(RepsonseMatrices), #(reco bins), #(filled truth bins)) The indices of the returned (filled) truth bins can be requested with the :meth:`get_filled_truth_indices` method. See also -------- get_random_response_matrices_as_ndarray """ M = [] tv = [] # Insert missing columns all_indices = self._get_filled_truth_indices_set() nuisance_indices = set(self._nuisance_indices) for indices, matrix, truth_values in zip( self._filled_indices, self._mean_matrices, self._truth_values ): missing_indices = all_indices - set(indices) # Make sure only nuisance indices are missing if len(missing_indices - nuisance_indices) > 0: raise RuntimeError("Truth difference in non-nuisance index!") missing_indices = list(missing_indices) insert_positions = np.searchsorted(indices, missing_indices) extended_matrix = np.insert(matrix, insert_positions, 0.0, axis=-1) M.append(extended_matrix) extended_truth_values = np.insert( truth_values, insert_positions, 0.0, axis=-1 ) tv.append(extended_truth_values) M = np.array(M) tv = np.array(tv) # Scale (nuisance) truth bins so they are consistent scale = self._get_truth_value_scale(tv) M = M * scale[:, np.newaxis, :] weights = np.array(self._weights) return M, weights
[docs] def export(self, filename, compress=False): """Save all necessary information for using the response matrix. Saves all necessary information for using the response matrix in a NumPy ``.npz`` archive. Parameters ---------- filename : str or file Where to store the arrays compress : bool, optional Whether to use compression See also -------- ResponseMatrix.export """ matrices, weights = self.get_random_response_matrices_as_ndarray() truth_entries = self.get_truth_entries_as_ndarray() data = { "matrices": matrices, "weights": weights, "truth_entries": truth_entries, "sparse_indices": np.flatnonzero(truth_entries), "is_sparse": True, } if compress: np.savez_compressed(filename, **data) else: np.savez(filename, **data)