"""Binning/histogramming classes for scientific computing
YAML interface
==============
All classes defined in `binning` can be stored as and read from YAML files
using the ``binning.yaml`` module::
with open("filename.yml", 'w') as f:
binning.yaml.dump(some_binning, f)
with open("filename.yml", 'r') as f:
some_binning = binning.yaml.full_load(f)
"""
from copy import deepcopy
from tempfile import TemporaryFile
import numpy as np
import yaml
from numpy.lib.recfunctions import rename_fields
# Use this function/object for parallelization where possible
mapper = map
[docs]class PhaseSpace(yaml.YAMLObject):
"""A PhaseSpace defines the possible combinations of variables that characterize an event.
Parameters
----------
variables : iterable of strings
The set of variables that define the phase space.
Attributes
----------
variables : set of str
The set of variables that define the phase space.
Notes
-----
A PhaseSpace can be seen as the carthesian product of its `variables`::
>>> ps = PhaseSpace(variables=['a', 'b', 'c'])
>>> print ps
('a' X 'c' X 'b')
You can check whether a variable is part of a phase space::
>>> 'a' in ps
True
Phase spaces can be compared to one another.
Check whether two phase spaces are identical::
>>> PhaseSpace(['a','b']) == PhaseSpace(['b', 'a'])
True
>>> PhaseSpace(['a', 'b']) == PhaseSpace(['a', 'c'])
False
>>> PhaseSpace(['a', 'b']) != PhaseSpace(['a', 'c'])
True
Check whether one phase space is a sub-space of the other::
>>> PhaseSpace(['a', 'b','c')] > PhaseSpace(['a', 'b'])
True
>>> PhaseSpace(['a', 'c']) < PhaseSpace(['a', 'b','c'])
True
"""
def __init__(self, variables):
self.variables = set(variables)
def __contains__(self, var):
return var in self.variables
def __len__(self):
return len(self.variables)
def __eq__(self, phasespace):
return self.variables == phasespace.variables
def __ne__(self, phasespace):
return self.variables != phasespace.variables
def __le__(self, phasespace):
return self.variables <= phasespace.variables
def __ge__(self, phasespace):
return self.variables >= phasespace.variables
def __lt__(self, phasespace):
return (self.variables <= phasespace.variables) and not (
self.variables == phasespace.variables
)
def __gt__(self, phasespace):
return (self.variables >= phasespace.variables) and not (
self.variables == phasespace.variables
)
def __mul__(self, phasespace):
return PhaseSpace(variables=(self.variables | phasespace.variables))
def __div__(self, phasespace):
return PhaseSpace(variables=(self.variables - phasespace.variables))
def __truediv__(self, phasespace):
# Python 3 div operator
return self.__div__(phasespace)
def __str__(self):
return "('" + "' X '".join(self.variables) + "')"
def __repr__(self):
return f"{type(self).__name__}(variables={self.variables!r})"
[docs] def clone(self):
"""Return a copy of the object."""
return deepcopy(self)
[docs] @classmethod
def to_yaml(cls, dumper, obj):
return dumper.represent_sequence("!PhaseSpace", list(obj.variables))
[docs] @classmethod
def from_yaml(cls, loader, node):
seq = loader.construct_sequence(node)
return cls(variables=seq)
yaml_loader = yaml.FullLoader
yaml_tag = "!PhaseSpace"
[docs]class Bin(yaml.YAMLObject):
"""A Bin is a container for a value that is defined on a subset of an n-dimensional phase space.
Parameters
----------
phasespace : PhaseSpace
The :class:`PhaseSpace` the `Bin` resides in.
value : float, optional
The initialization value of the bin. Default: 0.0
entries : int, optional
The initialization value of the number of entries. Default: 0
sumw2 : float, optional
The initialization value of the sum of squared weights. Default: ``value**2``
value_array : slice of ndarray, optional
A slice of a numpy array, where the value of the bin will be stored.
Default: ``None``
entries_array : slice of ndarray, optional
A slice of a numpy array, where the number entries will be stored.
Default: ``None``
sumw2_array : slice of ndarray, optional
A slice of a numpy array, where the squared weights will be stored.
Default: ``None``
dummy : bool, optional
Do not create a any arrays to store the data.
Default: ``False``
Attributes
----------
value : float
The value of the bin.
entries : int
The number of entries in the bin.
sumw2 : float
The sum of squared weights in the bin.
phasespace : PhaseSpace
The :class:`PhaseSpace` the bin is defined on
"""
def __init__(self, **kwargs):
self.phasespace = kwargs.pop("phasespace", None)
if self.phasespace is None:
raise TypeError("Undefined phase space!")
if not kwargs.pop("dummy", False):
self.value_array = kwargs.pop("value_array", None)
if self.value_array is None:
self.value_array = np.array([kwargs.pop("value", 0.0)], dtype=float)
self.entries_array = kwargs.pop("entries_array", None)
if self.entries_array is None:
self.entries_array = np.array([kwargs.pop("entries", 0)], dtype=int)
self.sumw2_array = kwargs.pop("sumw2_array", None)
if self.sumw2_array is None:
self.sumw2_array = np.array(
[kwargs.pop("sumw2", self.value**2)], dtype=float
)
else:
for key in ["value_array", "entries_array", "sumw2_array"]:
if key in kwargs:
del kwargs[key]
if len(kwargs) > 0:
raise TypeError(f"Unknown kwargs: {kwargs}")
@property
def value(self):
"""(float) The value of the bin.
The sum of weights.
"""
return self.value_array[0]
@value.setter
def value(self, v):
self.value_array[0] = v
@property
def entries(self):
"""(int) The number of entries in the bin."""
return self.entries_array[0]
@entries.setter
def entries(self, v):
self.entries_array[0] = v
@property
def sumw2(self):
"""(float) The sum of squared weights in the bin."""
return self.sumw2_array[0]
@sumw2.setter
def sumw2(self, v):
self.sumw2_array[0] = v
[docs] def event_in_bin(self, event):
"""Check whether the variable combination falls within the bin.
Parameters
----------
event : dict like
A dictionary (or similar object) with one value of each variable
in the binning, e.g.::
{'x': 1.4, 'y': -7.47}
Returns
-------
bool
Whether or not the variable combination lies within the bin.
"""
raise NotImplementedError("This method must be defined in an inheriting class.")
[docs] def fill(self, weight=1.0):
"""Add the weight(s) to the bin.
Also increases the number of entries and sum of squared weights accordingly.
Parameters
----------
weight : float or iterable of floats, optional
Weight(s) to be added to the value of the bin.
"""
try:
# Does the weight have a length?
n = len(weight)
except TypeError:
# No
w = weight
w2 = w**2
n = 1
else:
# Yes
weight = np.asarray(weight)
w = np.sum(weight)
w2 = np.sum(weight**2)
self.value += w
self.entries += n
self.sumw2 += w2
[docs] def is_dummy(self):
"""Return `True` if there is no data array linked to this bin."""
try:
self.value_array
except AttributeError:
return True
else:
return False
def __contains__(self, event):
"""Return True if the event falls within the bin."""
return self.event_in_bin(event)
def __eq__(self, other):
"""Bins are equal if they are of the same type, defined on the same phase space."""
return type(self) is type(other) and self.phasespace == other.phasespace
def __ne__(self, other):
return not self == other
def __add__(self, other):
ret = deepcopy(self)
ret.value = self.value + other.value
ret.entries = self.entries + other.entries
ret.sumw2 = self.sumw2 + other.sumw2
return ret
def __sub__(self, other):
ret = deepcopy(self)
ret.value = self.value - other.value
return ret
def __mul__(self, other):
ret = deepcopy(self)
ret.value = self.value * other.value
return ret
def __div__(self, other):
ret = deepcopy(self)
ret.value = self.value / other.value
return ret
def __truediv__(self, other):
# Python 3 div operator
return self.__div__(other)
def __repr__(self):
return "{}({})".format(
type(self).__name__,
", ".join([f"{k}={v!r}" for k, v in self._get_clone_kwargs().items()]),
)
def _get_clone_kwargs(self, **kwargs):
"""Get the necessary arguments to clone this object."""
args = {
"phasespace": deepcopy(self.phasespace),
}
if self.is_dummy() or kwargs.get("dummy", False):
args["dummy"] = True
else:
args.update(
{
"value_array": deepcopy(self.value_array),
"entries_array": deepcopy(self.entries_array),
"sumw2_array": deepcopy(self.sumw2_array),
}
)
args.update(kwargs)
return args
[docs] def clone(self, **kwargs):
"""Create a functioning copy of the Bin.
Can specify additional kwargs for the initialisation of the new Binning.
"""
args = self._get_clone_kwargs(**kwargs)
return type(self)(**args)
[docs] @classmethod
def to_yaml(cls, dumper, obj):
dic = obj._get_clone_kwargs(dummy=True)
if not obj.is_dummy():
del dic["dummy"]
return dumper.represent_mapping(cls.yaml_tag, dic)
[docs] @classmethod
def from_yaml(cls, loader, node):
dic = loader.construct_mapping(node, deep=True)
return cls(**dic)
yaml_loader = yaml.FullLoader
yaml_tag = "!Bin"
[docs]class RectangularBin(Bin):
"""A Bin defined by min and max values in all variables.
Parameters
----------
variables : iterable of str
The variables with defined edges.
edges : iterable of (int, int)
lower and upper edges for all variables::
[[x_lower, x_upper], [y_lower, y_upper], ...]
include_lower : bool, optional
Does the bin include the lower edges?
include_upper : bool, optional
Does the bin include the upper edges?
**kwargs : optional
Additional keyword arguments are passed to :class:`Bin`.
Attributes
----------
value : float
The value of the bin.
entries : int
The number of entries in the bin.
sumw2 : float
The sum of squared weights in the bin.
phasespace : PhaseSpace
The :class:`PhaseSpace` the bin is defined on
variables : tuple of str
The variable names.
edges : tuple of (int, int)
The bin edges for each variable.
include_lower : bool
Does the bin include the lower edges?
include_upper : bool
Does the bin include the upper edges?
"""
def __init__(
self, variables, edges, include_lower=True, include_upper=False, **kwargs
):
self.variables = tuple(variables)
self.edges = tuple(tuple(x) for x in edges)
self.include_lower = bool(include_lower)
self.include_upper = bool(include_upper)
# Create PhaseSpace from edges if necessary
phasespace = kwargs.get("phasespace", None)
if phasespace is None:
kwargs["phasespace"] = PhaseSpace(self.variables)
# Handle default bin initialization
Bin.__init__(self, **kwargs)
# Check that all edges are valid tuples
for i, var in enumerate(self.variables):
if var not in self.phasespace:
raise ValueError(f"Variable not part of PhaseSpace: {var}")
mi, ma = self.edges[i]
if ma < mi:
raise ValueError(
f"Upper edge is smaller than lower edge for variable {var}."
)
[docs] def event_in_bin(self, event):
"""Check whether an event is within all bin edges.
Parameters
----------
event : dict like
A dictionary (or similar object) with one value of each variable
in the binning, e.g.::
{'x': 1.4, 'y': -7.47}
Returns
-------
bool
Whether or not the variable combination lies within the bin.
"""
inside = True
for i, var in enumerate(self.variables):
mi, ma = self.edges[i]
val = event[var]
if self.include_lower:
if val < mi:
inside = False
break
else:
if val <= mi:
inside = False
break
if self.include_upper:
if val > ma:
inside = False
break
else:
if val >= ma:
inside = False
break
return inside
[docs] def get_center(self):
"""Return the bin center coordinates.
Returns
-------
ndarray
The center coordinates for each variable.
"""
arr = np.asfarray(self.edges)
return arr.sum(axis=1) / 2.0
def __eq__(self, other):
"""RectangularBins are equal if they have the same edges."""
return (
Bin.__eq__(self, other)
and sorted(zip(self.variables, self.edges))
== sorted(zip(other.variables, other.edges))
and self.include_lower == other.include_lower
and self.include_upper == other.include_upper
)
def _get_clone_kwargs(self, **kwargs):
"""Get the necessary arguments to clone this object."""
args = {
"include_upper": self.include_upper,
"include_lower": self.include_lower,
"variables": list(self.variables),
"edges": np.asarray(self.edges).tolist(),
}
args.update(Bin._get_clone_kwargs(self, **kwargs))
return args
yaml_tag = "!RectangularBin"
[docs]class CartesianProductBin(Bin):
"""A Bin that is part of a CartesianProductBinning.
An event is part of a bin, if it has the right data indices in the
constituent binnings.
Parameters
----------
binnings : iterable of Binning
data_indices : iterable of int
Specifies the constituent binnings and the respective data indices.
**kwargs : optional
Additional keyword arguments are passed to :class:`Bin`.
Attributes
----------
value : float
The value of the bin.
entries : int
The number of entries in the bin.
sumw2 : float
The sum of squared weights in the bin.
phasespace : PhaseSpace
The :class:`PhaseSpace` the bin is defined on
binnings : tuple of Binning
data_indices : tuple of int
Specifies the constituent binnings and the respective data indices.
"""
def __init__(self, binnings, data_indices, **kwargs):
self.binnings = tuple(binnings)
self.data_indices = tuple(data_indices)
# Create PhaseSpace from binnings if necessary
if "phasespace" not in kwargs:
kwargs["phasespace"] = PhaseSpace([])
for binning in self.binnings:
kwargs["phasespace"] *= binning.phasespace
Bin.__init__(self, **kwargs)
[docs] def event_in_bin(self, event):
"""Check whether an event is within the bin.
Parameters
----------
event : dict like
A dictionary (or similar object) with one value of each variable
in the binning, e.g.::
{'x': 1.4, 'y': -7.47}
Returns
-------
bool
Whether or not the variable combination lies within the bin.
"""
# Check that the event is at the right data position in all binnings
for binning, i in zip(self.binnings, self.data_indices):
if binning.get_event_data_index(event) != i:
return False
else:
return True
[docs] def get_marginal_bins(self):
"""Return the corresponding bins on the input binnings of the cartesian product.
Returns
-------
(bin_1, bin_2[, bin_3 ...])
See also
--------
get_marginal_subbins
"""
bins = ()
for b, d in zip(self.binnings, self.data_indices):
bins = bins + (b.bins[b.get_data_bin_index(d)],)
return bins
[docs] def get_marginal_subbins(self):
"""Return the corresponding subbins on the input binnings of the cartesian product.
This will return a tuple of tuples of subbins. One tuple for each input binning.
Returns
-------
((bin_1, [subbin_1a ...]), (bin_2, [subbin_2a ...]) [, (bin_3, [subbin_3a ...]) ...])
See also
--------
get_marginal_bins
Binning.get_subbins
"""
bins = ()
for b, d in zip(self.binnings, self.data_indices):
bins = bins + (b.get_subbins(b.get_data_bin_index(d)),)
return bins
def __eq__(self, other):
"""CartesianProductBins are equal, if the binnings and indices are equal."""
try:
if len(self.binnings) != len(other.binnings):
return False
# Try both combinations of self and other
for A, B in [(self, other), (other, self)]:
for self_binning, i in zip(A.binnings, A.data_indices):
# For each binning and index in self...
for other_binning, j in zip(B.binnings, B.data_indices):
# ... check that there is a matchin binning and index in other
if self_binning == other_binning and i == j:
break
else:
# Otherwise return `False`
return False
# Found a match for all elements
return Bin.__eq__(self, other)
except AttributeError:
return False
def _get_clone_kwargs(self, **kwargs):
"""Get the necessary arguments to clone this object."""
args = {
"binnings": [binning.clone(dummy=True) for binning in self.binnings],
"data_indices": list(self.data_indices),
}
args.update(Bin._get_clone_kwargs(self, **kwargs))
return args
yaml_tag = "!CartesianProductBin"
[docs]class Binning(yaml.YAMLObject):
"""A Binning is a set of disjunct Bins.
Parameters
----------
bins : list of Bin
The list of disjoint bins.
subbinnings : dict of {bin_index: Binning}, optional
Subbinnings to replace certain bins.
value_array : slice of ndarray, optional
A slice of a numpy array, where the values of the bins will be stored.
entries_array : slice of ndarray, optional
A slice of a numpy array, where the number of entries will be stored.
sumw2_array : slice of ndarray, optional
A slice of a numpy array, where the squared weights will be stored.
phasespace : PhaseSpace, optional
The :class:`PhaseSpace` the binning resides in.
dummy : bool, optional
Do not create any arrays to store the data.
Attributes
----------
bins : tuple of Bin
The list of disjoint bins on the PhaseSpace.
nbins : int
The number of bins in the binning.
data_size : int
The number of elements in the data arrays.
Might differ from ``nbins`` due to subbinnings.
subbinnings : dict of {bin_index: Binning}, optional
Subbinnings to replace certain bins.
value_array : slice of ndarray
A slice of a numpy array, where the values of the bins are stored.
entries_array : slice of ndarray
A slice of a numpy array, where the number of entries are stored.
sumw2_array : slice of ndarray
A slice of a numpy array, where the squared weights are stored.
phasespace : PhaseSpace
The :class:`PhaseSpace` the binning resides in.
Notes
-----
Subbinnings are used to get a finer binning within a given bin. The bin to
be replaced by the finer binning is specified using the *native* bin
index, i.e. the number it would have before the sub binnings are assigned.
Subbinnings are inserted into the numpy arrays at the position of the
original bins. This changes the *effective* bin number of all later bins.
The data itself is stored in Numpy arrays (or views of such) that are
managed by the :class:`Binning`. The arrays are linked to the contained
:class:`Bin` objects and subbinnings by setting their respective storage
arrays to sliced views of the data arrays. The original arrays in the bins
and subbinnings will always be replaced.
"""
def __init__(
self,
bins,
subbinnings=None,
value_array=None,
entries_array=None,
sumw2_array=None,
phasespace=None,
dummy=False,
):
if isinstance(bins, _BinProxy):
self.bins = bins
else:
self.bins = tuple(bins)
if subbinnings is None:
self.subbinnings = {}
else:
self.subbinnings = dict(subbinnings)
self.phasespace = phasespace
if self.phasespace is None:
self.phasespace = self._get_phasespace()
self.nbins = len(self.bins)
self.data_size = self.nbins
for binning in self.subbinnings.values():
self.data_size += (
binning.data_size - 1
) # Minus one, since one bin gets replaced
if not dummy:
self.value_array = value_array
if self.value_array is None:
self.value_array = np.zeros(self.data_size, dtype=float)
if self.value_array.shape != (self.data_size,):
raise TypeError("Value array shape is not same as (data_size,)!")
self.entries_array = entries_array
if self.entries_array is None:
self.entries_array = np.zeros(self.data_size, dtype=int)
if self.entries_array.shape != (self.data_size,):
raise TypeError("Entries array shape is not same as (data_size,)!")
self.sumw2_array = sumw2_array
if self.sumw2_array is None:
self.sumw2_array = np.zeros(self.data_size, dtype=float)
if self.sumw2_array.shape != (self.data_size,):
raise TypeError("Sumw2 array shape is not same as (data_size,)!")
self.link_arrays()
else:
self.value_array = None
self.entries_array = None
self.sumw2_array = None
def _get_phasespace(self):
"""Get PhaseSpace from Bins and subbinnings."""
ps = PhaseSpace([])
for bin in self.bins:
ps *= bin.phasespace
for binning in self.subbinnings.values():
ps *= binning.phasespace
return ps
[docs] def link_arrays(self):
"""Link the data storage arrays into the bins and sub_binnings."""
self._link_bins()
self._link_subbinnings()
def _link_bins(self):
for i, bin in enumerate(self.bins):
j = self.get_bin_data_index(i)
bin.value_array = self.value_array[j : j + 1]
bin.entries_array = self.entries_array[j : j + 1]
bin.sumw2_array = self.sumw2_array[j : j + 1]
def _link_subbinnings(self):
for i, binning in self.subbinnings.items():
j = self.get_bin_data_index(i)
n = binning.data_size
binning.value_array = self.value_array[j : j + n]
binning.entries_array = self.entries_array[j : j + n]
binning.sumw2_array = self.sumw2_array[j : j + n]
# Also make the subbinnings link the new arrays
binning.link_arrays()
[docs] def get_event_data_index(self, event):
"""Get the data array index of the given event.
Returns `None` if the event does not belong to any bin.
Parameters
----------
event : dict like
A dictionary (or similar object) with one value of each variable
in the binning, e.g.::
{'x': 1.4, 'y': -7.47}
Returns
-------
int or None
The bin number
See also
--------
get_event_bin_index
"""
bin_i = self.get_event_bin_index(event)
data_i = self.get_bin_data_index(bin_i)
if bin_i in self.subbinnings:
data_i += self.subbinnings[bin_i].get_event_data_index(event)
return data_i
[docs] def get_event_bin_index(self, event):
"""Get the bin number of the given event.
Returns `None` if the event does not belong to any bin.
Parameters
----------
event : dict like
A dictionary (or similar object) with one value of each variable
in the binning, e.g.::
{'x': 1.4, 'y': -7.47}
Returns
-------
int or None
The bin number
Notes
-----
The bin number can be used to access the corresponding :class:`Bin`,
or the subbinning in that bin (if it exists)::
i = binning.get_event_bin_index(event)
binning.bins[i]
binning.subbinnings[i]
This is *not* the same as the corresponding index in the data array if
there are any subbinnings present.
This is a dumb method that just loops over all bins until it finds a
fitting one. It should be replaced with something smarter for more
specifig binning classes.
See also
--------
get_event_data_index
get_event_bin
"""
for i in range(len(self.bins)):
if event in self.bins[i]:
return i
return None
[docs] def get_bin_data_index(self, bin_i):
"""Calculate the data array index from the bin number."""
if bin_i is None:
return None
data_i = bin_i
for i, binning in self.subbinnings.items():
if i < bin_i:
data_i = data_i + (
binning.data_size - 1
) # Minus one, because the original bin is replaced
return data_i
[docs] def get_data_bin_index(self, data_i):
"""Calculate the bin number from the data array index.
All data indices inside a subbinning will return the bin index of that
subbinning.
"""
if data_i is None:
return None
bin_i = data_i
for i in sorted(self.subbinnings.keys()):
if i > bin_i:
return bin_i
if i + self.subbinnings[i].data_size > bin_i:
return i
bin_i -= self.subbinnings[i].data_size - 1
return bin_i
[docs] def get_event_bin(self, event):
"""Get the bin of the event.
Returns `None` if the event does not fit in any bin.
Parameters
----------
event : dict like
A dictionary (or similar object) with one value of each variable
in the binning, e.g.::
{'x': 1.4, 'y': -7.47}
Returns
-------
Bin or None
The :class:`Bin` object the event fits into.
"""
i = self.get_event_bin_index(event)
if i is not None:
return self.bins[i]
else:
return None
[docs] def get_subbins(self, data_index):
"""Return a tuple of the bin and subbins corresponding to the data_index.
Paramteters
-----------
data_index : int
Returns
-------
(bin[, subbin[, subbin ...]])
"""
i = self.get_data_bin_index(data_index)
if i in self.subbinnings:
d = self.get_bin_data_index(i) # Offset for data index in subbinning
bins = self.subbinnings[i].get_subbins(data_index - d)
return (self.bins[i],) + bins
else:
return (self.bins[i],)
[docs] def get_event_subbins(self, event):
"""Get the tuple of subbins of the event.
Returns `None` if the event does not fit in any bin.
Parameters
----------
event : dict like
A dictionary (or similar object) with one value of each variable
in the binning, e.g.::
{'x': 1.4, 'y': -7.47}
Returns
-------
([bin[, subbin[, subbin ...]]) or None
"""
i = self.get_event_data_index(event)
if i is not None:
return self.get_subbins(i)
else:
return None
[docs] def get_adjacent_bin_indices(self):
"""Return a list of adjacent bin indices.
Returns
-------
adjacent_indices : list of ndarray
The adjacent indices of each bin
"""
# The general case is that we just don't know which bin is adjacent to
# which. Return a list of empty lists.
return [np.array([], dtype=int)] * self.nbins
[docs] def get_adjacent_data_indices(self):
"""Return a list of adjacent data indices.
Returns
-------
adjacent_indices : list of ndarray
The adjacent indices of each data index
Notes
-----
Data indices inside a subbinning will only ever be adjacent to other
indices inside the same subbinning. There is no information available
about which bins in a subbinning are adjacent to which bins in the
parent binning.
"""
# Start with adjacent bins
i_bin = self.get_adjacent_bin_indices()
# Replace bin indices with data indices
# and remove references to subbinnings
i_data = []
for i, adj in enumerate(i_bin):
if i not in self.subbinnings:
# Regular bin
# Add neighbouring bins translated to data indices
i_data.append([])
for j in adj:
if j not in self.subbinnings:
i_data[-1].append(self.get_bin_data_index(j))
i_data[-1] = np.array(i_data[-1], dtype=int)
else:
# Subbinning
# Add its adjacent data indices offset to correct position
offset = self.get_bin_data_index(i)
for adj in self.subbinnings[i].get_adjacent_data_indices():
i_data.append(adj + offset)
return i_data
[docs] def fill(self, event, weight=1, raise_error=False, rename=None):
"""Fill the events into their respective bins.
Parameters
----------
event : [iterable of] dict like or Numpy structured array or Pandas DataFrame
The event(s) to be filled into the binning.
weight : float or iterable of floats, optional
The weight of the event(s).
Can be either a scalar which is then used for all events
or an iterable of weights for the single events.
Default: 1.
raise_error : bool, optional
Raise a ValueError if an event is not in the binning.
Otherwise ignore the event.
Default: False
rename : dict, optional
Dict for translating event variable names to binning variable names.
Default: `{}`, i.e. no translation
"""
try:
if len(event) == 0:
# Empty iterable? Stop right here
return
except TypeError:
# Not an iterable
event = [event]
if rename is None:
rename = {}
if len(rename) > 0:
try:
# Numpy array?
event = rename_fields(event, rename)
except AttributeError:
try:
# Pandas DataFrame?
event = event.rename(index=str, columns=rename)
except AttributeError:
# Dict?
for e in event:
for name in rename:
e[rename[name]] = e[name]
ibins = None
if ibins is None:
try:
# Try to get bin numbers from a pandas DataFrame
ibins = list(
mapper(
lambda irow: self.get_event_data_index(irow._asdict()),
event.itertuples(),
)
)
except AttributeError:
# Seems like this is not a DataFrame
pass
if ibins is None:
try:
# Try to get bin numbers from structured numpy array
ibins = list(mapper(self.get_event_data_index, np.nditer(event)))
except TypeError:
# Seems like this is not a structured numpy array
pass
if ibins is None:
try:
# Try to get bin numbers from any iterable of events
ibins = list(mapper(self.get_event_data_index, event))
except TypeError:
# We probably only have a single event
ibins = [self.get_event_data_index(event)]
if raise_error and None in ibins:
raise ValueError("Event not part of binning!")
# Compare len of weight list and event list
try:
if len(ibins) != len(weight):
raise ValueError("Different length of event and weight lists!")
except TypeError:
weight = [weight] * len(ibins)
for i, w in zip(ibins, weight):
if i is not None:
self.fill_data_index(i, w)
[docs] def fill_data_index(self, i, weight=1.0):
"""Add the weight(s) to the given data position.
Also increases the number of entries and sum of squared weights accordingly.
Parameters
----------
i : int
The index of the data arrays to be filled.
weight : float or iterable of floats, optional
Weight(s) to be added to the value of the bin.
"""
try:
# Does the weight have a length?
n = len(weight)
except TypeError:
# No
w = weight
w2 = w**2
n = 1
else:
# Yes
weight = np.asarray(weight)
w = np.sum(weight)
w2 = np.sum(weight**2)
self.value_array[i] += w
self.entries_array[i] += n
self.sumw2_array[i] += w2
@staticmethod
def _genfromtxt(filename, delimiter=",", names=True, chunksize=10000):
"""Replacement for numpy's genfromtxt, that should need less memory."""
with open(filename) as f:
if names:
namelist = f.readline().split(delimiter)
dtype = [(name.strip(), float) for name in namelist]
else:
namelist = None
dtype = float
arr = np.array([], dtype=dtype)
rows = []
for line in f:
if len(rows) >= chunksize:
arr = np.concatenate((arr, np.array(rows, dtype=dtype)), axis=0)
rows = []
rows.append(tuple(map(float, line.split(delimiter))))
arr = np.concatenate((arr, np.array(rows, dtype=dtype)), axis=0)
return arr
_csv_buffer = {}
@classmethod
def _load_csv_file_buffered(cls, filename, chunksize):
"""Load a CSV file and save the resulting array in a temporary file.
If the same file is loaded a second time, the buffer is loaded instead
of re-parsing the CSV file.
"""
if filename in cls._csv_buffer:
# File has been loaded before
f = cls._csv_buffer[filename]
f.seek(0)
arr = np.load(f)
else:
# New file
f = TemporaryFile()
arr = cls._genfromtxt(
filename, delimiter=",", names=True, chunksize=chunksize
)
np.save(f, arr)
cls._csv_buffer[filename] = f
return arr
[docs] @classmethod
def fill_multiple_from_csv_file(
cls,
binnings,
filename,
weightfield=None,
weight=1.0,
rename=None,
cut_function=lambda x: x,
buffer_csv_files=False,
chunksize=10000,
**kwargs,
):
"""Fill multiple Binnings from the same csv file(s).
This method saves time, because the numpy array only has to be
generated once. Other than the list of binnings to be filled, the
(keyword) arguments are identical to the ones used by the instance
method :meth:`fill_from_csv_file`.
"""
if rename is None:
rename = {}
# Handle lists recursively
if isinstance(filename, list):
try:
for item, w in zip(filename, weight):
cls.fill_multiple_from_csv_file(
binnings,
item,
weightfield=weightfield,
weight=w,
rename=rename,
cut_function=cut_function,
buffer_csv_files=buffer_csv_files,
**kwargs,
)
except TypeError:
for item in filename:
cls.fill_multiple_from_csv_file(
binnings,
item,
weightfield=weightfield,
weight=weight,
rename=rename,
cut_function=cut_function,
buffer_csv_files=buffer_csv_files,
**kwargs,
)
return
if buffer_csv_files:
data = cls._load_csv_file_buffered(filename, chunksize=chunksize)
else:
data = cls._genfromtxt(
filename, delimiter=",", names=True, chunksize=chunksize
)
data = rename_fields(data, rename)
data = cut_function(data)
if weightfield is not None:
weight = data[weightfield] * weight
for binning in binnings:
binning.fill(data, weight=weight, **kwargs)
[docs] def fill_from_csv_file(self, *args, **kwargs):
"""Fill the binning with events from a CSV file.
Parameters
----------
filename : string or list of strings
The csv file with the data. Can be a list of filenames.
weightfield : string, optional
The column with the event weights.
weight : float or iterable of floats, optional
A single weight that will be applied to all events in the file.
Can be an iterable with one weight for each file if `filename` is a list.
rename : dict, optional
A dict with columns that should be renamed before filling::
{'csv_name': 'binning_name'}
cut_function : function, optional
A function that modifies the loaded data before filling into the binning,
e.g.::
cut_function(data) = data[ data['binning_name'] > some_threshold ]
This is done *after* the optional renaming.
buffer_csv_files : bool, optional
Save the results of loading CSV files in temporary files
that can be recovered if the same CSV file is loaded again. This
speeds up filling multiple Binnings with the same CSV-files considerably!
Default: False
chunksize : int, optional
Load csv file in chunks of <chunksize> rows. This reduces the memory
footprint of the loading operation, but can slow it down.
Default: 10000
Notes
-----
The file must be formated like this::
first_varname,second_varname,...
<first_value>,<second_value>,...
<first_value>,<second_value>,...
<first_value>,<second_value>,...
...
For example::
x,y,z
1.0,2.1,3.2
4.1,2.0,2.9
3,2,1
All values are interpreted as floats. If `weightfield` is given, that
field will be used as weigts for the event. Other keyword arguments
are passed on to the Binning's :meth:`fill` method. If filename is a list,
all elemets are handled recursively.
"""
# Actual filling is handled by static method
Binning.fill_multiple_from_csv_file([self], *args, **kwargs)
[docs] def reset(self, value=0.0, entries=0, sumw2=0.0):
"""Reset all bin values to 0.
Parameters
----------
value : float, optional
Set the bin values to this value.
entries : int, optional
Set the number of entries in each bin to this value.
sumw2 : float, optional
Set the sum of squared weights in each bin to this value.
"""
self.value_array.fill(value)
self.entries_array.fill(entries)
self.sumw2_array.fill(sumw2)
[docs] def get_values_as_ndarray(self, shape=None, indices=None):
"""Return the bin values as ndarray.
Parameters
----------
shape: tuple of ints
Shape of the resulting array.
Default: ``(len(bins),)``
indices: list of ints
Only return the given bins.
Default: Return all bins.
Returns
-------
ndarray
An ndarray with the values of the bins.
"""
if indices is None:
indices = slice(None, None, None)
ret = np.array(self.value_array[indices])
if shape is not None:
ret = ret.reshape(shape, order="C")
else:
ret = ret.reshape((ret.size,), order="C")
return ret
[docs] def set_values_from_ndarray(self, arr):
"""Set the bin values to the values of the ndarray."""
self.value_array.flat[:] = np.asarray(arr).flat
[docs] def get_entries_as_ndarray(self, shape=None, indices=None):
"""Return the number of entries in the bins as ndarray.
Parameters
----------
shape: tuple of ints
Shape of the resulting array.
Default: ``(len(bins),)``
indices: list of ints
Only return the given bins.
Default: Return all bins.
Returns
-------
ndarray
An ndarray with the numbers of entries of the bins.
"""
if indices is None:
indices = slice(None, None, None)
ret = np.array(self.entries_array[indices])
if shape is not None:
ret = ret.reshape(shape, order="C")
else:
ret = ret.reshape((ret.size,), order="C")
return ret
[docs] def set_entries_from_ndarray(self, arr):
"""Set the number of bin entries to the values of the ndarray."""
self.entries_array.flat[:] = np.asarray(arr).flat
[docs] def get_sumw2_as_ndarray(self, shape=None, indices=None):
"""Return the sum of squared weights in the bins as ndarray.
Parameters
----------
shape: tuple of ints
Shape of the resulting array.
Default: ``(len(bins),)``
indices: list of ints
Only return the given bins.
Default: Return all bins.
Returns
-------
ndarray
An ndarray with the sum of squared weights of the bins.
"""
if indices is None:
indices = slice(None, None, None)
ret = np.copy(self.sumw2_array[indices])
if shape is not None:
ret = ret.reshape(shape, order="C")
else:
ret = ret.reshape((ret.size,), order="C")
return ret
[docs] def set_sumw2_from_ndarray(self, arr):
"""Set the sums of squared weights to the values of the ndarray."""
self.sumw2_array.flat[:] = np.asarray(arr).flat
[docs] def event_in_binning(self, event):
"""Check whether an event fits into any of the bins."""
i = self.get_event_data_index(event)
if i is None:
return False
else:
return True
[docs] def iter_subbins(self):
"""Iterate over all bins and subbins.
Will yield a tuple of the bins in this Binning and all subbinnings in
the order they correspond to the data indices.
Yields
------
(bin[, subbin[, subbin ...]])
"""
for i, b in enumerate(self.bins):
if i in self.subbinnings:
for sb in self.subbinnings[i].iter_subbins():
yield (b,) + sb
else:
yield (b,)
[docs] def is_dummy(self):
"""Return `True` if there is no data array linked to this binning."""
if self.value_array is None:
return True
else:
return False
def __contains__(self, event):
return self.event_in_binning(event)
def __eq__(self, other):
"""Binnings are equal if all bins and the phase space are equal."""
return (
self.bins == other.bins
and self.phasespace == other.phasespace
and self.subbinnings == other.subbinnings
)
def __ne__(self, other):
return not self == other
[docs] def marginalize_subbinnings_on_ndarray(self, array, bin_indices=None):
"""Marginalize out the bins corresponding to the subbinnings.
Parameters
----------
array : ndarray
The data to work on.
bin_indices : list of int, optional
The bin indices of the subbinnings to be marginalized.
If no indices are specified, all subbinnings are marginalized.
Returns
-------
new_array : ndarray
"""
if bin_indices is None:
bin_indices = self.subbinnings.keys()
# Create working copy of input array
new_array = np.array(array)
# Determine indices to be removed and set new values
remove_i = []
for i in bin_indices:
if i in self.subbinnings:
binning = self.subbinnings[i]
else:
raise ValueError("No subbinning at bin index %d!" % (i,))
i_data = self.get_bin_data_index(i)
n_data = binning.data_size
remove_i.extend(
range(i_data + 1, i_data + n_data)
) # Skip first one, since we substitute a single bin
# Set marginalized value
new_array[i_data] = np.sum(new_array[i_data : i_data + n_data])
# Remove marginalized elements
remove_i = np.array(sorted(remove_i), dtype=int)
new_array = np.delete(new_array, remove_i, axis=0)
return new_array
[docs] def marginalize_subbinnings(self, bin_indices=None):
"""Return a clone of the Binning with subbinnings removed.
Parameters
----------
bin_indices : list of int, optional
The bin indices of the subbinnings to be marginalized.
If no indices are specified, all subbinnings are marginalized.
Returns
-------
new_binning : Binning
"""
if bin_indices is None:
bin_indices = self.subbinnings.keys()
# Clone the subbinnings that will remain in the binning
subbinnings = {}
for i in self.subbinnings:
if i not in bin_indices:
binning = self.subbinnings[i].clone(dummy=True)
subbinnings[i] = binning
kwargs = {"subbinnings": subbinnings}
if self.is_dummy():
pass
else:
kwargs.update(
{
"value_array": self.marginalize_subbinnings_on_ndarray(
self.value_array, bin_indices
),
"entries_array": self.marginalize_subbinnings_on_ndarray(
self.entries_array, bin_indices
),
"sumw2_array": self.marginalize_subbinnings_on_ndarray(
self.sumw2_array, bin_indices
),
}
)
return self.clone(**kwargs)
[docs] def insert_subbinning_on_ndarray(self, array, bin_index, insert_array):
"""Insert values of a new subbinning into the array.
Parameters
----------
array : ndarray
The data to work on.
bin_index : int
The bin to be replaced with the subbinning.
insert_array : ndarrau
The array to be inserted.
Returns
-------
new_array : ndarray
The modified array.
"""
i_data = self.get_bin_data_index(bin_index)
new_array = np.insert(
array, i_data + 1, insert_array[1:], axis=0
) # Do not insert the first element
new_array[i_data] = insert_array[
0
] # Instead set overwrite the values of the bin
return new_array
[docs] def insert_subbinning(self, bin_index, binning):
"""Insert a new subbinning into the binning.
Parameters
----------
bin_index : int
The bin to be replaced with the subbinning.
binning : Binning
The new subbinning
Returns
-------
new_binning : Binning
A copy of this binning with the new subbinning.
Warnings
--------
This will replace the content of the bin with the content of the new
subbinning!
"""
if bin_index in self.subbinnings:
raise ValueError("Bin %d already has a subbinning!" % (bin_index,))
subbinnings = {}
for i, b in self.subbinnings.items():
subbinnings[i] = b.clone(dummy=True)
subbinnings[bin_index] = binning
kwargs = {
"subbinnings": subbinnings,
"value_array": self.insert_subbinning_on_ndarray(
self.value_array, bin_index, binning.value_array
),
"entries_array": self.insert_subbinning_on_ndarray(
self.entries_array, bin_index, binning.entries_array
),
"sumw2_array": self.insert_subbinning_on_ndarray(
self.sumw2_array, bin_index, binning.sumw2_array
),
}
return self.clone(**kwargs)
def __add__(self, other):
ret = self.clone()
ret.set_values_from_ndarray(
self.get_values_as_ndarray() + other.get_values_as_ndarray()
)
ret.set_entries_from_ndarray(
self.get_entries_as_ndarray() + other.get_entries_as_ndarray()
)
ret.set_sumw2_from_ndarray(
self.get_sumw2_as_ndarray() + other.get_sumw2_as_ndarray()
)
return ret
def _get_clone_kwargs(self, **kwargs):
"""Get the necessary arguments to clone this object."""
args = {
"subbinnings": {
i: binning.clone(dummy=True) for i, binning in self.subbinnings.items()
},
"phasespace": deepcopy(self.phasespace),
}
if "bins" in kwargs:
# Overwrite bins and do not re-create them one by one
args["bins"] = kwargs["bins"]
else:
# Re-create the bins one by one
args["bins"] = [bin.clone(dummy=True) for bin in self.bins]
if self.is_dummy() or kwargs.get("dummy", False):
args["dummy"] = True
else:
args.update(
{
"value_array": deepcopy(self.value_array),
"entries_array": deepcopy(self.entries_array),
"sumw2_array": deepcopy(self.sumw2_array),
}
)
args.update(kwargs)
return args
[docs] def clone(self, **kwargs):
"""Create a functioning copy of the Binning.
Can specify additional kwargs for the initialisation of the new Binning.
"""
args = self._get_clone_kwargs(**kwargs)
return type(self)(**args)
def __repr__(self):
return "{}({})".format(
type(self).__name__,
", ".join([f"{k}={v!r}" for k, v in self._get_clone_kwargs().items()]),
)
[docs] @classmethod
def to_yaml(cls, dumper, obj):
dic = obj._get_clone_kwargs(dummy=True)
if not obj.is_dummy():
del dic["dummy"]
return dumper.represent_mapping(cls.yaml_tag, dic)
[docs] @classmethod
def from_yaml(cls, loader, node):
dic = loader.construct_mapping(node, deep=True)
return cls(**dic)
yaml_loader = yaml.FullLoader
yaml_tag = "!Binning"
[docs]class RectangularBinning(Binning):
"""Binning that contains only :class:`RectangularBin`
Parameters
----------
variables : list of str
The variables the binning is defined on.
bin_edges : list of ((float, float), (float, float), ...)
The list of bin edges defining the bins. The tuples contain the lower
and upper edges of all `variables`, e.g.::
[
((x_low, x_high), (y_low, y_high)),
((x_low, x_high), (y_low, y_high)),
...
]
**kwargs : optional
Additional keyword arguments will be passed to :class:`Binning`.
Attributes
----------
variables : tuple of str
The variables corresponding to the bin edges.
include_upper : bool
Include the upper rather than the lower bin edges.
bins : tuple of Bin
The tuple of RectangularBins.
nbins : int
The number of bins in the binning.
data_size : int
The number of elements in the data arrays.
Might differ from ``nbins`` due to subbinnings.
subbinnings : dict of {bin_index: Binning}
Subbinnings to replace certain bins.
value_array : slice of ndarray
A slice of a numpy array, where the values of the bins are stored.
entries_array : slice of ndarray
A slice of a numpy array, where the number of entries are stored.
sumw2_array : slice of ndarray
A slice of a numpy array, where the squared weights are stored.
phasespace : PhaseSpace
The :class:`PhaseSpace` the binning resides in.
"""
def __init__(self, variables, bin_edges, include_upper=False, **kwargs):
self.variables = tuple(variables)
self.include_upper = bool(include_upper)
bins = []
for edges in bin_edges:
bins.append(
RectangularBin(
variables=variables,
edges=edges,
include_upper=self.include_upper,
include_lower=not self.include_upper,
dummy=True,
)
)
Binning.__init__(self, bins=bins, **kwargs)
def _get_clone_kwargs(self, **kwargs):
"""Get the necessary arguments to clone this object."""
variables = list(self.variables)
bin_edges = [] # Turn all tuples into lists
for bn in self.bins:
bin_edges.append([list(x) for x in bn.edges])
args = {
"variables": list(variables),
"bin_edges": bin_edges,
"include_upper": self.include_upper,
}
args.update(Binning._get_clone_kwargs(self, bins=None, **kwargs))
del args["bins"]
return args
yaml_tag = "!RectangularBinning"
class _BinProxy:
"""Base class for all bin proxies."""
def __init__(self, binning):
self.binning = binning
def __len__(self):
return self.binning.nbins
def __iter__(self):
for i in range(len(self)):
yield self[i]
def __eq__(self, other):
return self.binning == other.binning
def __ne__(self, other):
return not self == other
class _CartesianProductBinProxy(_BinProxy):
"""Indexable class that creates bins on the fly."""
def __getitem__(self, index):
"""Dynamically build an CartesianProductBin when requested."""
tup = self.binning.get_bin_index_tuple(index)
index = self.binning.get_bin_data_index(index)
val_slice = self.binning.value_array[index : index + 1]
ent_slice = self.binning.entries_array[index : index + 1]
sumw2_slice = self.binning.sumw2_array[index : index + 1]
binnings = []
data_indices = []
for i, j in enumerate(tup):
binnings.append(self.binning.binnings[i])
data_indices.append(j)
bin = CartesianProductBin(
binnings,
data_indices,
value_array=val_slice,
entries_array=ent_slice,
sumw2_array=sumw2_slice,
)
return bin
[docs]class CartesianProductBinning(Binning):
"""A Binning that is the cartesian product of two or more Binnings
Parameters
----------
binnings : list of Binning
The Binning objects to be multiplied.
Attributes
----------
binnings : tuple of Binning
The :class:`Binning` objects that make up the Cartesian product.
bins : proxy for Bins
Proxy that will generate :class:`CartesianProductBin` instances,
when accessed.
nbins : int
The number of bins in the binning.
bins_shape : tuple of int
The sizes of the constituent binnings.
data_size : int
The number of elements in the data arrays.
Might differ from ``nbins`` due to subbinnings.
subbinnings : dict of {bin_index: Binning}
Subbinnings to replace certain bins.
value_array : slice of ndarray
A slice of a numpy array, where the values of the bins are stored.
entries_array : slice of ndarray
A slice of a numpy array, where the number of entries are stored.
sumw2_array : slice of ndarray
A slice of a numpy array, where the squared weights are stored.
phasespace : PhaseSpace
The :class:`PhaseSpace` the binning resides in.
Notes
-----
This creates a Binning with as many bins as the product of the number of
bins in the iput binnings.
"""
def __init__(self, binnings, **kwargs):
self.binnings = tuple(binnings)
self.bins_shape = tuple(binning.data_size for binning in self.binnings)
self._stepsize = [1]
# Calculate the step size (or stride) for each binning index.
# We use a row-major ordering (C-style).
# The index of the later binnings varies faster than the ones before:
#
# (0,0) <-> 0
# (0,1) <-> 1
# (0,2) <-> 2
# (1,0) <-> 3
# ...
#
# _stepsize is 1 longer than binnings and bins_shape!
for n in reversed(self.bins_shape):
self._stepsize.insert(0, self._stepsize[0] * n)
self._stepsize = tuple(self._stepsize)
self.nbins = self._stepsize[0]
phasespace = kwargs.get("phasespace", None)
if phasespace is None:
# Create phasespace from binnings
phasespace = PhaseSpace([])
for binning in self.binnings:
phasespace *= binning.phasespace
kwargs["phasespace"] = phasespace
bins = kwargs.pop("bins", None)
if bins is not None:
raise TypeError(
"Cannot define bins of CartesianProductBinning! Define binnings instead."
)
else:
# Create bin proxy
bins = _CartesianProductBinProxy(self)
kwargs["bins"] = bins
Binning.__init__(self, **kwargs)
def _link_bins(self):
# We do not need to link each bin separately,
# the bin proxy takes care of this
pass
[docs] def get_tuple_bin_index(self, tup):
"""Translate a tuple of binning specific bin indices to the linear bin index of the event.
Turns this::
(i_x, i_y, i_z)
into this::
i_bin
The order of the indices in the tuple must conform to the order of
`binnings`. The bins are ordered row-major (C-style), i.e. increasing
the bin number of the last binning by one increases the overall bin
number also by one. The increments of the other variables depend on the
number of bins in each variable.
"""
if None in tup:
return None
i_bin = 0
for i, s in zip(tup, self._stepsize[1:]):
i_bin += s * i
return i_bin
[docs] def get_bin_index_tuple(self, i_bin):
"""Translate the linear bin index of the event to a tuple of single binning bin indices.
Turns this::
i_bin
into this::
(i_x, i_y, i_z)
The order of the indices in the tuple conforms to the order of
`binnings`. The bins are ordered row-major (C-style), i.e. increasing
the bin number of the last variable by one increases the overall bin
number also by one. The increments of the other variables depend on the
number of bins in each variable.
"""
if i_bin is None or i_bin < 0 or i_bin >= self.nbins:
return tuple([None] * len(self.binnings))
tup = tuple(
(i_bin % t) // s for t, s in zip(self._stepsize[:-1], self._stepsize[1:])
)
return tup
[docs] def get_event_tuple(self, event):
"""Get the variable index tuple for a given event."""
tup = []
for binning in self.binnings:
i = binning.get_event_data_index(event)
tup.append(i)
return tuple(tup)
[docs] def get_event_bin_index(self, event):
"""Get the bin index for a given event."""
tup = self.get_event_tuple(event)
return self.get_tuple_bin_index(tup)
[docs] def get_adjacent_bin_indices(self):
"""Return a list of adjacent bin indices.
Returns
-------
adjacent_indices : list of ndarray
The adjacent indices of each bin
"""
# Adjacent bins are based on the adjacent data indices of the
# constituting binnings
adj_tuple = tuple(b.get_adjacent_data_indices() for b in self.binnings)
adj = []
# For all bins
for i_bin in range(self.nbins):
i_adj = []
# Get the tuple of binning data indices
tup = self.get_bin_index_tuple(i_bin)
for i_binning in range(len(tup)):
variations = adj_tuple[i_binning][tup[i_binning]]
var_tup = list(tup)
for k in variations:
var_tup[i_binning] = k
i_adj.append(self.get_tuple_bin_index(var_tup))
adj.append(np.array(sorted(i_adj), dtype=int))
return adj
[docs] def marginalize(self, binning_i, reduction_function=np.sum):
"""Marginalize out the given binnings and return a new CartesianProductBinning.
Parameters
----------
binning_i : iterable of int
Iterable of index of binning to be marginalized.
reduction_function : function
Use this function to marginalize out the entries over the specified variables.
Must support the `axis` keyword argument.
Default: numpy.sum
"""
try:
len(binning_i)
except TypeError:
binning_i = [binning_i]
# Create new binning
new_binnings = [binning.clone(dummy=True) for binning in self.binnings]
for i in sorted(binning_i, reverse=True):
del new_binnings[i]
new_binning = CartesianProductBinning(new_binnings)
# Copy and project values, from binning without subbinnings
axes = tuple(sorted(binning_i))
temp_binning = self.marginalize_subbinnings()
new_values = reduction_function(
temp_binning.get_values_as_ndarray(shape=temp_binning.bins_shape), axis=axes
)
new_entries = reduction_function(
temp_binning.get_entries_as_ndarray(shape=temp_binning.bins_shape),
axis=axes,
)
new_sumw2 = reduction_function(
temp_binning.get_sumw2_as_ndarray(shape=temp_binning.bins_shape), axis=axes
)
new_binning.set_values_from_ndarray(new_values)
new_binning.set_entries_from_ndarray(new_entries)
new_binning.set_sumw2_from_ndarray(new_sumw2)
return new_binning
def _unpack(self):
"""Return the unpacked last remaining binning."""
if len(self.binnings) != 1:
raise RuntimeError("Unpacking only works if there is exactly one binning.")
if len(self.subbinnings) != 0:
raise RuntimeError(
"Unpacking only works if there is exactly zero subbinnings."
)
kwargs = {
"value_array": self.value_array,
"entries_array": self.entries_array,
"sumw2_array": self.sumw2_array,
"dummy": False,
}
return self.binnings[0].clone(**kwargs)
[docs] def project(self, binning_i, **kwargs):
"""Project the binning onto the given binnings and return a new CartesianProductBinning.
The order of the original binnings is preserved. If a single ``int`` is
provided, the returned Binning is of the same type as the respective
binning.
Parameters
----------
binning_i : iterable of int, or int
Iterable of index of binning to be marginalized.
kwargs : optional
Additional keyword arguments are passed on to :meth:`marginalize`.
Returns
-------
CartesianProductBinning or type(self.binnings[binning_i])
"""
try:
i = list(binning_i)
except TypeError:
i = [binning_i]
# Which variables to remove
rm_i = list(range(len(self.binnings)))
list(map(rm_i.remove, i))
ret = self.marginalize(rm_i, **kwargs)
if isinstance(binning_i, int):
return ret._unpack()
else:
return ret
def __eq__(self, other):
"""CartesianProductBinnings are equal if the included Binnings match."""
return (
type(self) is type(other)
and self.binnings == other.binnings
and self.subbinnings == other.subbinnings
)
def _get_clone_kwargs(self, **kwargs):
"""Get the necessary arguments to clone this object."""
args = {
"binnings": list(binning.clone(dummy=True) for binning in self.binnings),
}
args.update(Binning._get_clone_kwargs(self, bins=None, **kwargs))
del args["bins"]
return args
yaml_tag = "!CartesianProductBinning"
class _LinearBinProxy(_BinProxy):
"""Indexable class that creates bins on the fly."""
def __getitem__(self, index):
"""Dynamically build a RectangularBin when requested."""
variable = self.binning.variable
lower = self.binning.bin_edges[index]
upper = self.binning.bin_edges[index + 1]
data_index = self.binning.get_bin_data_index(index)
args = {
"variables": [variable],
"edges": [(lower, upper)],
"include_lower": not self.binning.include_upper,
"include_upper": self.binning.include_upper,
}
if not self.binning.is_dummy():
args.update(
{
"value_array": self.binning.value_array[
data_index : data_index + 1
],
"entries_array": self.binning.entries_array[
data_index : data_index + 1
],
"sumw2_array": self.binning.sumw2_array[
data_index : data_index + 1
],
}
)
rbin = RectangularBin(**args)
return rbin
[docs]class LinearBinning(Binning):
"""A simple binning, defined by bin edges on a single variable.
Parameters
----------
variable : str
The name of te defining variable.
bin_edges : list of float
The bin edges defining the bins.
include_upper : bool, optional
Include the upper edge of bins instead of the default lower edge.
**kwargs : optional
Additional keyword arguments are handed to :class:`Binning`.
Attributes
----------
variable : str
The variable on which the bin edges are defined.
bin_edges : ndarray
The bin edges.
include_upper : bool
Are the upper edges included in each bin?
bins : proxy for Bins
Proxy that will generate :class:`RectangularBin` instances,
when accessed.
nbins : int
The number of bins in the binning.
data_size : int
The number of elements in the data arrays.
Might differ from ``nbins`` due to subbinnings.
subbinnings : dict of {bin_index: Binning}, optional
Subbinnings to replace certain bins.
value_array : slice of ndarray
A slice of a numpy array, where the values of the bins are stored.
entries_array : slice of ndarray
A slice of a numpy array, where the number of entries are stored.
sumw2_array : slice of ndarray
A slice of a numpy array, where the squared weights are stored.
phasespace : PhaseSpace
The :class:`PhaseSpace` the binning resides in.
"""
def __init__(self, variable, bin_edges, include_upper=False, **kwargs):
self.variable = variable
self.bin_edges = np.asfarray(bin_edges)
self.include_upper = bool(include_upper)
self.nbins = self.bin_edges.size - 1
phasespace = kwargs.get("phasespace", None)
if phasespace is None:
# Create phasespace from variable
phasespace = PhaseSpace([variable])
kwargs["phasespace"] = phasespace
bins = kwargs.pop("bins", None)
if bins is not None:
raise TypeError(
"Cannot define bins of LinearBinning! Define bin edges instead."
)
else:
# Create bin proxy
bins = _LinearBinProxy(self)
kwargs["bins"] = bins
Binning.__init__(self, **kwargs)
def _link_bins(self):
# We do not need to link each bin separately,
# the bin proxy takes care of this
pass
[docs] def get_event_bin_index(self, event):
"""Get the bin index for a given event."""
i = int(
np.digitize(event[self.variable], self.bin_edges, right=self.include_upper)
)
# Deal with Numpy's way of handling over- and underflows
if i > 0 and i < len(self.bin_edges):
i -= 1
else:
i = None
return i
[docs] def get_adjacent_bin_indices(self):
"""Return a list of adjacent bin indices.
Returns
-------
adjacent_indices : list of ndarray
The adjacent indices of each bin
"""
# Adjacent bins are the ones before and after
i_bin = np.arange(self.nbins)
i_bin_m = i_bin - 1
i_bin_p = i_bin + 1
adj = list(zip(i_bin_m, i_bin_p))
adj = list(map(np.array, adj))
# Remove out of range elements
adj[0] = np.array([adj[0][1]])
adj[-1] = np.array([adj[-1][0]])
return adj
[docs] def slice(self, start, stop, step=1):
"""Return a new LinearBinning containing the given variable slice
Parameters
----------
start : int
end : int
step : int, optional
The start and stop positions as used with Python slice objects.
Returns
-------
sliced_binning : LinearBinning
A :class:`LinearBinning` consisting of the specified slice.
Notes
-----
This will remove any ``subbinnings`` the linear binning might have.
"""
bin_slice = slice(start, stop, step)
# Create new binning
lower = self.bin_edges[:-1][bin_slice]
upper = self.bin_edges[1:][bin_slice]
new_bin_edges = list(lower) + [upper[-1]]
new_binning = LinearBinning(
variable=self.variable,
bin_edges=new_bin_edges,
include_upper=self.include_upper,
)
# Copy and slice values
temp_binning = self.marginalize_subbinnings()
new_values = temp_binning.get_values_as_ndarray()[bin_slice]
new_entries = temp_binning.get_entries_as_ndarray()[bin_slice]
new_sumw2 = temp_binning.get_sumw2_as_ndarray()[bin_slice]
new_binning.set_values_from_ndarray(new_values)
new_binning.set_entries_from_ndarray(new_entries)
new_binning.set_sumw2_from_ndarray(new_sumw2)
return new_binning
[docs] def remove_bin_edges(self, bin_edge_indices):
"""Return a new LinearBinning with the given bin edges removed.
The values of the bins adjacent to the removed bin edges will be
summed up in the resulting larger bin. Please note that bin values
are lost if the first or last binedge of a variable are removed.
Parameters
----------
bin_edge_indices : lists of integers
A list specifying the bin edge indices that should be removed.
Notes
-----
This will remove any ``subbinnings`` the linear binning might have.
"""
# Create new binning
new_bin_edges = list(self.bin_edges)
for i in sorted(bin_edge_indices, reverse=True):
del new_bin_edges[i]
new_binning = LinearBinning(
variable=self.variable,
bin_edges=new_bin_edges,
include_upper=self.include_upper,
)
# Copy and slice values
temp_binning = self.marginalize_subbinnings()
new_values = temp_binning.get_values_as_ndarray()
new_entries = temp_binning.get_entries_as_ndarray()
new_sumw2 = temp_binning.get_sumw2_as_ndarray()
for i in sorted(bin_edge_indices, reverse=True):
if i > 0 and i < new_values.size:
new_values[i - 1] += new_values[i]
new_entries[i - 1] += new_entries[i]
new_sumw2[i - 1] += new_sumw2[i]
if i < new_values.size:
new_values = np.delete(new_values, i)
new_entries = np.delete(new_entries, i)
new_sumw2 = np.delete(new_sumw2, i)
else:
new_values = np.delete(new_values, -1)
new_entries = np.delete(new_entries, -1)
new_sumw2 = np.delete(new_sumw2, -1)
new_binning.set_values_from_ndarray(new_values)
new_binning.set_entries_from_ndarray(new_entries)
new_binning.set_sumw2_from_ndarray(new_sumw2)
return new_binning
def _get_clone_kwargs(self, **kwargs):
"""Get the necessary arguments to clone this object."""
args = {
"variable": self.variable,
"bin_edges": self.bin_edges.tolist(),
"include_upper": self.include_upper,
}
args.update(Binning._get_clone_kwargs(self, bins=None, **kwargs))
del args["bins"]
return args
def __eq__(self, other):
"""Linear binnings are equal if the variable and edges match."""
return (
type(self) is type(other)
and self.variable == other.variable
and np.all(self.bin_edges == other.bin_edges)
and self.include_upper == other.include_upper
and self.subbinnings == other.subbinnings
)
yaml_tag = "!LinearBinning"
class _RectilinearBinProxy(_BinProxy):
"""Indexable class that creates bins on the fly."""
def __getitem__(self, index):
"""Dynamically build a RectangularBin when requested."""
tup = self.binning.get_bin_index_tuple(index)
edges = tuple(
(edg[j], edg[j + 1]) for edg, j in zip(self.binning.bin_edges, tup)
)
data_index = self.binning.get_bin_data_index(index)
args = {
"variables": self.binning.variables,
"edges": edges,
"include_lower": not self.binning.include_upper,
"include_upper": self.binning.include_upper,
}
if not self.binning.is_dummy():
args.update(
{
"value_array": self.binning.value_array[
data_index : data_index + 1
],
"entries_array": self.binning.entries_array[
data_index : data_index + 1
],
"sumw2_array": self.binning.sumw2_array[
data_index : data_index + 1
],
}
)
rbin = RectangularBin(**args)
return rbin
[docs]class RectilinearBinning(CartesianProductBinning):
"""Special case of :class:`CartesianProductBinning` only consisting of :class:`LinearBinning`
Parameters
----------
variables : iterable of str
bin_edges : iterable of iterable of float
The variable names and bin edges for the LinearBinnings.
include_upper : bool, optional
Make bins include upper edges instead of lower edges.
**kwargs : optional
Additional keyword arguments will be passed to :class:`CartesianProductBinning`.
Attributes
----------
variables : tuple of str
The variables on which the bin edges are defined.
bin_edges : tuple of ndarray
The bin edges defining the :class:`LinearBinning` objects.
include_upper : bool
Are the upper edges included in each bin?
binnings : list of LinearBinning
The :class:`LinearBinning` objects that make up the Cartesian product.
bins : list of Bin
The :class:`RectangularBin` instances.
nbins : int
The number of bins in the binning.
bins_shape : tuple of int
The sizes of the constituent binnings.
data_size : int
The number of elements in the data arrays.
Might differ from ``nbins`` due to subbinnings.
subbinnings : dict of {bin_index: Binning}, optional
Subbinnings to replace certain bins.
value_array : slice of ndarray
A slice of a numpy array, where the values of the bins are stored.
entries_array : slice of ndarray
A slice of a numpy array, where the number of entries are stored.
sumw2_array : slice of ndarray
A slice of a numpy array, where the squared weights are stored.
phasespace : PhaseSpace
The :class:`PhaseSpace` the binning resides in.
"""
def __init__(self, variables, bin_edges, include_upper=False, **kwargs):
self.variables = tuple(variables)
self.bin_edges = tuple(np.array(edg) for edg in bin_edges)
self.include_upper = bool(include_upper)
binnings = []
for var, edges in zip(self.variables, self.bin_edges):
binnings.append(
LinearBinning(var, edges, include_upper=include_upper, dummy=True)
)
kwargs["binnings"] = binnings
bins = kwargs.pop("bins", None)
if bins is not None:
raise TypeError(
"Cannot define bins of RectilinearBinning! Define bin edges instead."
)
else:
# Create bin proxy
bins = _RectilinearBinProxy(self)
CartesianProductBinning.__init__(self, **kwargs)
# Replace cartesian proxy with one returning rectangular bins
self.bins = bins
[docs] def get_variable_index(self, variable):
"""Return the index of the binning corresponding to this variable."""
if isinstance(variable, int):
return variable
else:
return self.variables.index(variable)
[docs] def marginalize(self, binning_i, reduction_function=np.sum):
"""Marginalize out the given binnings and return a new RectilinearBinning.
Parameters
----------
binning_i : iterable of int/str
Iterable of index/variable of binning to be marginalized.
reduction_function : function
Use this function to marginalize out the entries over the specified variables.
Must support the `axis` keyword argument.
"""
try:
len(binning_i)
except TypeError:
binning_i = [binning_i]
binning_i = [self.get_variable_index(i) for i in binning_i]
# Create new binning
new_variables = list(self.variables)
new_bin_edges = list(deepcopy(self.bin_edges))
for i in sorted(binning_i, reverse=True):
del new_bin_edges[i]
del new_variables[i]
new_binning = RectilinearBinning(
variables=new_variables,
bin_edges=new_bin_edges,
include_upper=self.include_upper,
)
# Copy and project values, from binning without subbinnings
axes = tuple(sorted(binning_i))
temp_binning = self.marginalize_subbinnings()
new_values = reduction_function(
temp_binning.get_values_as_ndarray(shape=temp_binning.bins_shape), axis=axes
)
new_entries = reduction_function(
temp_binning.get_entries_as_ndarray(shape=temp_binning.bins_shape),
axis=axes,
)
new_sumw2 = reduction_function(
temp_binning.get_sumw2_as_ndarray(shape=temp_binning.bins_shape), axis=axes
)
new_binning.set_values_from_ndarray(new_values)
new_binning.set_entries_from_ndarray(new_entries)
new_binning.set_sumw2_from_ndarray(new_sumw2)
return new_binning
[docs] def project(self, binning_i, **kwargs):
"""Project the binning onto the given binnings and return a new RectilinearBinning.
The order of the original binnings is preserved. If a single ``int`` is
provided, the returned Binning is of the same type as the respective
binning.
Parameters
----------
binning_i : iterable of int/str, or int/str
Iterable of index of binning to be marginalized.
**kwargs : optional
Additional keyword arguments are passed on to :meth:`marginalize`.
Returns
-------
RectilinearBinning or type(self.binnings[binning_i])
"""
try:
i = list(binning_i)
except TypeError:
i = [binning_i]
i = [self.get_variable_index(var) for var in binning_i]
# Which variables to remove
rm_i = list(range(len(self.binnings)))
list(map(rm_i.remove, i))
ret = self.marginalize(rm_i, **kwargs)
if isinstance(binning_i, int) or isinstance(binning_i, str):
return ret._unpack()
else:
return ret
[docs] def slice(self, slices):
"""Return a new RectilinearBinning containing the given variable slice
Parameters
----------
slices : dict of (variable, (start, stop[, step]))
The start and stop positions for the slices of all variables that
should be sliced.
Returns
-------
sliced_binning : RectilinearBinning
A :class:`RectilinearBinning` consisting of the specified slices.
Notes
-----
This will remove any ``subbinnings`` the binning might have.
"""
# Create new binning edges and slice tuple
new_bin_edges = list(deepcopy(self.bin_edges))
all_slices = []
for i, (var, edges) in enumerate(zip(self.variables, self.bin_edges)):
if var in slices:
bin_slice = slice(*slices[var])
lower = edges[:-1][bin_slice]
upper = edges[1:][bin_slice]
new_bin_edges[i] = list(lower) + [upper[-1]]
all_slices.append(bin_slice)
else:
# This variable does not have to be sliced
all_slices.append(slice(None))
all_slices = tuple(all_slices)
# Create new binning
new_binning = RectilinearBinning(
variables=self.variables,
bin_edges=new_bin_edges,
include_upper=self.include_upper,
)
# Copy and slice values
temp_binning = self.marginalize_subbinnings()
new_values = temp_binning.get_values_as_ndarray(shape=temp_binning.bins_shape)[
all_slices
]
new_entries = temp_binning.get_entries_as_ndarray(
shape=temp_binning.bins_shape
)[all_slices]
new_sumw2 = temp_binning.get_sumw2_as_ndarray(shape=temp_binning.bins_shape)[
all_slices
]
new_binning.set_values_from_ndarray(new_values)
new_binning.set_entries_from_ndarray(new_entries)
new_binning.set_sumw2_from_ndarray(new_sumw2)
return new_binning
[docs] def remove_bin_edges(self, bin_edge_indices):
"""Return a new RectilinearBinning with the given bin edges removed.
The values of the bins adjacent to the removed bin edges will be
summed up in the resulting larger bin. Please note that bin values
are lost if the first or last binedge of a variable are removed.
Parameters
----------
bin_edge_indices : dict of (variable: list of int)
Lists specifying the bin edge indices that should be removed.
Notes
-----
This will remove any ``subbinnings`` the rectilinear binning might have.
"""
# Create new binning
new_bin_edges = []
for var, edg in zip(self.variables, self.bin_edges):
new_edg = list(edg)
if var in bin_edge_indices:
for i in sorted(bin_edge_indices[var], reverse=True):
del new_edg[i]
new_bin_edges.append(new_edg)
new_binning = RectilinearBinning(
variables=self.variables,
bin_edges=new_bin_edges,
include_upper=self.include_upper,
)
# Copy and slice values
temp_binning = self.marginalize_subbinnings()
new_values = temp_binning.get_values_as_ndarray(shape=temp_binning.bins_shape)
new_entries = temp_binning.get_entries_as_ndarray(shape=temp_binning.bins_shape)
new_sumw2 = temp_binning.get_sumw2_as_ndarray(shape=temp_binning.bins_shape)
for j, var in enumerate(self.variables):
if var in bin_edge_indices:
for i in sorted(bin_edge_indices[var], reverse=True):
if i > 0 and i < new_values.shape[j]:
lower_tuple = (slice(None),) * j + (i - 1,) + (Ellipsis,)
upper_tuple = (slice(None),) * j + (i,) + (Ellipsis,)
new_values[lower_tuple] += new_values[upper_tuple]
new_entries[lower_tuple] += new_entries[upper_tuple]
new_sumw2[lower_tuple] += new_sumw2[upper_tuple]
if i < new_values.shape[j]:
new_values = np.delete(new_values, i, axis=j)
new_entries = np.delete(new_entries, i, axis=j)
new_sumw2 = np.delete(new_sumw2, i, axis=j)
else:
new_values = np.delete(new_values, -1, axis=j)
new_entries = np.delete(new_entries, -1, axis=j)
new_sumw2 = np.delete(new_sumw2, -1, axis=j)
new_binning.set_values_from_ndarray(new_values)
new_binning.set_entries_from_ndarray(new_entries)
new_binning.set_sumw2_from_ndarray(new_sumw2)
return new_binning
def __eq__(self, other):
"""RectilinearBinnings are equal if the bin edges and variables match."""
return (
type(self) is type(other)
and self.variables == other.variables
and all(
tuple(
np.array_equal(self.bin_edges[i], other.bin_edges[i])
for i in range(len(self.variables))
)
)
and self.include_upper == other.include_upper
and self.subbinnings == other.subbinnings
)
def _get_clone_kwargs(self, **kwargs):
"""Get the necessary arguments to clone this object."""
args = {
"variables": list(self.variables),
"bin_edges": [edg.tolist() for edg in self.bin_edges],
"include_upper": self.include_upper,
}
args.update(Binning._get_clone_kwargs(self, bins=None, **kwargs))
del args["bins"]
return args
yaml_tag = "!RectilinearBinning"