Source code for anesthetic.utils

"""Data-processing utility functions."""
import numpy as np
import pandas
from scipy import special
from scipy.interpolate import interp1d
from scipy.stats import kstwobign, entropy
from matplotlib.tri import Triangulation
import contextlib
import inspect
import re


def logsumexp(a, axis=None, b=None, keepdims=False, return_sign=False):
    r"""Compute the log of the sum of exponentials of input elements.

    This function has the same call signature as
    :func:`scipy.special.logsumexp` and mirrors scipy's behaviour except for
    ``-np.inf`` input. If a and b are both ``-inf`` then scipy's function
    will output ``nan`` whereas here we use:

    .. math::

        \lim_{x \to -\infty} x \exp(x) = 0

    Thus, if ``a=-inf`` in ``log(sum(b * exp(a)))`` then we can set ``b=0``
    such that that term is ignored in the sum.
    """
    if b is None:
        b = np.ones_like(a)
    b = np.where(a == -np.inf, 0, b)
    return special.logsumexp(a, axis=axis, b=b, keepdims=keepdims,
                             return_sign=return_sign)


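# Usage sketch (illustrative, not part of the module): a term whose
# log-weight and coefficient are both -inf is dropped from the sum instead
# of propagating nan:
#
#     >>> import numpy as np
#     >>> from scipy import special
#     >>> from anesthetic.utils import logsumexp
#     >>> a, b = np.array([-np.inf, 0.]), np.array([-np.inf, 1.])
#     >>> logsumexp(a, b=b)           # -> 0.0, the -inf term carries no mass
#     >>> special.logsumexp(a, b=b)   # -> nan, since -inf * exp(-inf) -> nan

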
def neff(w, beta=1):
    r"""Calculate effective number of samples.

    Using the Huggins-Roy family of effective samples
    (https://aakinshin.net/posts/huggins-roy-ess/).

    Parameters
    ----------
    w : array-like
        weights of samples
    beta : int, float, str, default=1
        The value of beta used to calculate the number of effective samples
        according to

        .. math::

            N_{eff} &= \bigg(\sum_{i=0}^n w_i^\beta \bigg)^{\frac{1}{1-\beta}}

            w_i &= \frac{w_i}{\sum_j w_j}

        Beta can take any positive value. Larger beta corresponds to a
        greater compression such that:

        .. math::

            \beta_1 < \beta_2 \Rightarrow N_{eff}(\beta_1) > N_{eff}(\beta_2)

        Alternatively, beta can take one of the following strings as input:

        * If 'inf' or 'equal' is supplied (equivalent to beta=inf), then the
          resulting number of samples is the number of samples when
          compressed to equal weights, and given by:

          .. math::

              w_i &= \frac{w_i}{\sum_j w_j}

              N_{eff} &= \frac{1}{\max_i[w_i]}

        * If 'entropy' is supplied (equivalent to beta=1), then the estimate
          is determined via the entropy based calculation, also referred to
          as the channel capacity:

          .. math::

              H &= -\sum_i p_i \ln p_i

              p_i &= \frac{w_i}{\sum_j w_j}

              N_{eff} &= e^{H}

        * If 'kish' is supplied (equivalent to beta=2), then a Kish estimate
          is computed (Kish, Leslie (1965). Survey Sampling.
          New York: John Wiley & Sons, Inc. ISBN 0-471-10949-5):

          .. math::

              N_{eff} = \frac{(\sum_i w_i)^2}{\sum_i w_i^2}

        * str(float) input gets converted to the corresponding float value.
    """
    w = w / np.sum(w)
    if beta == np.inf or beta == 'inf' or beta == 'equal':
        return 1 / np.max(w)
    elif beta == 'entropy' or beta != 'kish' and str(float(beta)) == '1.0':
        return np.exp(entropy(w))
    else:
        if beta == 'kish':
            beta = 2
        elif isinstance(beta, str):
            beta = float(beta)
        return np.sum(w**beta)**(1/(1-beta))


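# Usage sketch (illustrative, not part of the module): for equally weighted
# samples every member of the Huggins-Roy family recovers the raw sample
# count:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import neff
#     >>> w = np.ones(100)
#     >>> neff(w)                 # beta=1 (entropy)         -> ~100
#     >>> neff(w, beta='kish')    # beta=2 (Kish estimate)   -> ~100
#     >>> neff(w, beta='inf')     # equal-weight compression -> ~100

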
def compress_weights(w, u=None, ncompress=True):
    """Compresses weights to their approximate channel capacity."""
    if u is None:
        u = np.random.rand(len(w))
    if w is None:
        w = np.ones_like(u)

    if ncompress is True or isinstance(ncompress, str):
        if ncompress is True:
            ncompress = 'entropy'
        ncompress = neff(w, beta=ncompress)
    elif ncompress is False:
        return w

    # TODO: remove this in version >= 2.1
    if ncompress < 0:
        raise ValueError(
            "ncompress<0 is anesthetic 1.0 behaviour. For equally weighted "
            "samples you should now use ncompress='inf' or ncompress='equal'."
        )

    W = w * ncompress / w.sum()
    fraction, integer = np.modf(W)
    extra = (u < fraction).astype(int)
    return (integer + extra).astype(int)


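# Usage sketch (illustrative, not part of the module): the returned integer
# multiplicities sum to roughly the channel capacity of the weights:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import compress_weights, neff
#     >>> w = np.random.rand(1000)
#     >>> counts = compress_weights(w)   # integer repetition count per sample
#     >>> counts.sum()                   # -> close to neff(w, beta='entropy')

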
def quantile(a, q, w=None, interpolation='linear'):
    """Compute the weighted quantile for a one dimensional array."""
    if w is None:
        w = np.ones_like(a)
    a = np.array(list(a))  # Necessary to convert pandas arrays
    w = np.array(list(w))  # Necessary to convert pandas arrays
    i = np.argsort(a)
    c = np.cumsum(w[i[1:]]+w[i[:-1]])
    c = c / c[-1]
    c = np.concatenate(([0.], c))
    icdf = interp1d(c, a[i], kind=interpolation)
    quant = icdf(q)
    if isinstance(q, float):
        quant = float(quant)
    return quant


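# Usage sketch (illustrative, not part of the module): with equal weights
# the weighted median of four points interpolates between the two central
# values:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import quantile
#     >>> quantile(np.array([1., 2., 3., 4.]), 0.5)   # -> 2.5

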
def mirror_1d(d, xmin=None, xmax=None):
    """If necessary apply reflecting boundary conditions."""
    if xmin is not None and xmax is not None:
        xmed = (xmin+xmax)/2
        return np.concatenate((2*xmin-d[d < xmed], d, 2*xmax-d[d >= xmed]))
    elif xmin is not None:
        return np.concatenate((2*xmin-d, d))
    elif xmax is not None:
        return np.concatenate((d, 2*xmax-d))
    else:
        return d


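# Usage sketch (illustrative, not part of the module): reflect samples about
# a hard lower boundary at zero (e.g. before a kernel density estimate),
# doubling the array:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import mirror_1d
#     >>> d = np.random.rand(100)
#     >>> mirror_1d(d, xmin=0).shape   # -> (200,)

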
def mirror_2d(d_x_, d_y_, xmin=None, xmax=None, ymin=None, ymax=None):
    """If necessary apply reflecting boundary conditions."""
    d_x = d_x_.copy()
    d_y = d_y_.copy()

    if xmin is not None and xmax is not None:
        xmed = (xmin+xmax)/2
        d_y = np.concatenate((d_y[d_x < xmed], d_y, d_y[d_x >= xmed]))
        d_x = np.concatenate((2*xmin-d_x[d_x < xmed], d_x,
                              2*xmax-d_x[d_x >= xmed]))
    elif xmin is not None:
        d_y = np.concatenate((d_y, d_y))
        d_x = np.concatenate((2*xmin-d_x, d_x))
    elif xmax is not None:
        d_y = np.concatenate((d_y, d_y))
        d_x = np.concatenate((d_x, 2*xmax-d_x))

    if ymin is not None and ymax is not None:
        ymed = (ymin+ymax)/2
        d_x = np.concatenate((d_x[d_y < ymed], d_x, d_x[d_y >= ymed]))
        d_y = np.concatenate((2*ymin-d_y[d_y < ymed], d_y,
                              2*ymax-d_y[d_y >= ymed]))
    elif ymin is not None:
        d_x = np.concatenate((d_x, d_x))
        d_y = np.concatenate((2*ymin-d_y, d_y))
    elif ymax is not None:
        d_x = np.concatenate((d_x, d_x))
        d_y = np.concatenate((d_y, 2*ymax-d_y))

    return d_x, d_y


def nest_level(lst):
    """Calculate the nesting level of a list."""
    if not isinstance(lst, list):
        return 0
    if not lst:
        return 1
    return max(nest_level(item) for item in lst) + 1


def histogram(a, **kwargs):
    """Produce a histogram for path-based plotting.

    This is a cheap histogram. Necessary if one wants to update the
    histogram dynamically, and redrawing and filling is very expensive.

    This has the same arguments and keywords as :func:`numpy.histogram`,
    but is normalised to 1.
    """
    hist, bin_edges = np.histogram(a, **kwargs)
    xpath, ypath = np.empty((2, 4*len(hist)))
    ypath[0::4] = ypath[3::4] = 0
    ypath[1::4] = ypath[2::4] = hist
    xpath[0::4] = xpath[1::4] = bin_edges[:-1]
    xpath[2::4] = xpath[3::4] = bin_edges[1:]
    mx = max(ypath)
    if mx:
        ypath /= max(ypath)
    return xpath, ypath


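# Usage sketch (illustrative, not part of the module): the returned paths
# draw as a single matplotlib line, which can later be updated in place
# rather than recreating the artist:
#
#     >>> import numpy as np
#     >>> import matplotlib.pyplot as plt
#     >>> from anesthetic.utils import histogram
#     >>> xpath, ypath = histogram(np.random.randn(1000), bins=30)
#     >>> line, = plt.plot(xpath, ypath)
#     >>> line.set_data(*histogram(np.random.randn(1000), bins=30))

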
def histogram_bin_edges(samples, weights, bins='fd', range=None,
                        beta='equal'):
    """Compute a good number of bins dynamically from weighted samples.

    Parameters
    ----------
    samples : array-like
        Input data.
    weights : array-like
        Array of sample weights.
    bins : str, default='fd'
        String defining the rule used to automatically compute a good number
        of bins for the weighted samples:

        * 'fd' : Freedman--Diaconis rule (modified for weighted data)
        * 'scott' : Scott's rule (modified for weighted data)
        * 'sqrt' : Square root estimator (modified for weighted data)

    range : (float, float), optional
        The lower and upper range of the bins. If not provided, range is
        simply ``(a.min(), a.max())``. Values outside the range are ignored.
        The first element of the range must be less than or equal to the
        second.
    beta : float, default='equal'
        The value of beta>0 used to calculate the number of effective
        samples via :func:`neff`.

    Returns
    -------
    bin_edges : array of dtype float
        The edges to pass to :func:`numpy.histogram`.
    """
    if weights is None:
        weights = np.ones_like(samples)
    if range is None:
        minimum = np.min(samples)
        maximum = np.max(samples)
        data_range = maximum - minimum
    else:
        minimum = range[0]
        maximum = range[1]
        data_range = maximum - minimum
        data_mask = (samples >= minimum) & (samples <= maximum)
        samples = samples[data_mask]
        weights = weights[data_mask]
    w = weights / np.sum(weights)
    N_eff = neff(w=w, beta=beta)
    if bins == 'fd':  # Freedman--Diaconis rule
        q1, q3 = quantile(samples, [1/4, 3/4], w=w)
        bin_width = 2 * (q3 - q1) * N_eff**(-1/3)
    elif bins == 'scott':  # Scott's rule
        weighted_mean = np.average(samples, weights=w)
        weighted_var = np.average((samples - weighted_mean)**2, weights=w)
        weighted_std = np.sqrt(weighted_var)
        bin_width = (24 * np.pi**0.5 / N_eff)**(1/3) * weighted_std
    elif bins == 'sqrt':  # Square root estimator
        samples_eff, _ = sample_compression_1d(samples, w=w, ncompress=N_eff)
        data_range_eff = np.max(samples_eff) - np.min(samples_eff)
        bin_width = data_range_eff / np.sqrt(N_eff)
    nbins = int(np.ceil(data_range / bin_width))
    bin_edges = np.linspace(minimum, maximum, nbins+1)
    return bin_edges


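# Usage sketch (illustrative, not part of the module): weighted
# Freedman--Diaconis edges passed straight to numpy.histogram:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import histogram_bin_edges
#     >>> samples = np.random.randn(1000)
#     >>> weights = np.random.rand(1000)
#     >>> edges = histogram_bin_edges(samples, weights, bins='fd')
#     >>> hist, _ = np.histogram(samples, bins=edges, weights=weights)

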
def compute_nlive(death, birth):
    """Compute number of live points from birth and death contours.

    Parameters
    ----------
    death, birth : array-like
        list of birth and death contours

    Returns
    -------
    nlive : np.array
        number of live points at each contour
    """
    b = pandas.DataFrame(np.array(birth), columns=['logL'])
    d = pandas.DataFrame(np.array(death), columns=['logL'],
                         index=b.index + len(b))
    b['n'] = +1
    d['n'] = -1
    t = pandas.concat([b, d]).sort_values(['logL', 'n'])
    n = t.n.cumsum()
    return (n[d.index]+1).to_numpy()


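# Usage sketch (illustrative, not part of the module): a toy run with two
# live points seeded at -inf, each replaced once, so the final death occurs
# with a single point live:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import compute_nlive
#     >>> birth = np.array([-np.inf, -np.inf, 1., 2.])
#     >>> death = np.array([1., 2., 3., 4.])
#     >>> compute_nlive(death, birth)   # -> array([2, 2, 2, 1])

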
def compute_insertion_indexes(death, birth):
    """Compute the live point insertion index for each point.

    For more detail, see `Fowlie et al. (2020)
    <https://arxiv.org/abs/2006.03371>`_

    Parameters
    ----------
    death, birth : array-like
        list of birth and death contours

    Returns
    -------
    indexes : np.array
        live point index at which each live point was inserted
    """
    indexes = np.zeros_like(birth, dtype=int)
    for i, (b, d) in enumerate(zip(birth, death)):
        i_live = (death > b) & (birth <= b)
        live = death[i_live]
        live.sort()
        indexes[i] = np.searchsorted(live, d)
    return indexes


def unique(a):
    """Find unique elements, retaining order."""
    b = []
    for x in a:
        if x not in b:
            b.append(x)
    return b


def iso_probability_contours(pdf, contours=[0.95, 0.68]):
    """Compute the iso-probability contour values."""
    if len(contours) > 1 and not np.all(contours[:-1] > contours[1:]):
        raise ValueError(
            "The kwargs `levels` and `contours` have to be ordered from "
            "outermost to innermost contour, i.e. in strictly descending "
            "order when referring to the enclosed probability mass, e.g. "
            "like the default [0.95, 0.68]. "
            "This breaking change in behaviour was introduced in version "
            "2.0.0-beta.10, in order to better match the ordering of other "
            "matplotlib kwargs."
        )
    contours = [1-p for p in contours]
    p = np.sort(np.array(pdf).flatten())
    m = np.cumsum(p)
    m /= m[-1]
    interp = interp1d([0]+list(m), [0]+list(p))
    c = list(interp(contours))+[max(p)]

    return c


def iso_probability_contours_from_samples(pdf, contours=[0.95, 0.68],
                                          weights=None):
    """Compute the iso-probability contour values."""
    if len(contours) > 1 and not np.all(contours[:-1] > contours[1:]):
        raise ValueError(
            "The kwargs `levels` and `contours` have to be ordered from "
            "outermost to innermost contour, i.e. in strictly descending "
            "order when referring to the enclosed probability mass, e.g. "
            "like the default [0.95, 0.68]. "
            "This breaking change in behaviour was introduced in version "
            "2.0.0-beta.10, in order to better match the ordering of other "
            "matplotlib kwargs."
        )
    if weights is None:
        weights = np.ones_like(pdf)
    contours = [1-p for p in contours]
    i = np.argsort(pdf)
    m = np.cumsum(weights[i])
    m /= m[-1]
    interp = interp1d([0]+list(m), [0]+list(pdf[i]))
    c = list(interp(contours))+[max(pdf)]

    return c


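# Usage sketch (illustrative, not part of the module): contour levels
# enclosing 95% and 68% of the weighted probability mass of a set of
# sampled density values:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import iso_probability_contours_from_samples
#     >>> pdf = np.random.rand(1000)
#     >>> weights = np.random.rand(1000)
#     >>> levels = iso_probability_contours_from_samples(pdf, weights=weights)
#     >>> len(levels)   # -> 3: [95% level, 68% level, max(pdf)]

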
def scaled_triangulation(x, y, cov):
    """Triangulation scaled by a covariance matrix.

    Parameters
    ----------
    x, y : array-like
        x and y coordinates of samples
    cov : array-like, 2d
        Covariance matrix for scaling

    Returns
    -------
    :class:`matplotlib.tri.Triangulation`
        Triangulation with the appropriate scaling
    """
    L = np.linalg.cholesky(cov)
    Linv = np.linalg.inv(L)
    x_, y_ = Linv.dot([x, y])
    tri = Triangulation(x_, y_)
    return Triangulation(x, y, tri.triangles)


def triangular_sample_compression_2d(x, y, cov, w=None, n=1000):
    """Histogram a 2D set of weighted samples via triangulation.

    This defines bins via a triangulation of the subsamples and sums
    weights within triangles surrounding each point

    Parameters
    ----------
    x, y : array-like
        x and y coordinates of samples for compressing
    cov : array-like, 2d
        Covariance matrix for scaling
    w : :class:`pandas.Series`, optional
        weights of samples
    n : int, default=1000
        number of samples returned.

    Returns
    -------
    tri :
        :class:`matplotlib.tri.Triangulation` with an appropriate scaling
    w : array-like
        Compressed samples and weights
    """
    # Pre-process samples to not be affected by non-standard indexing
    # Details: https://github.com/handley-lab/anesthetic/issues/189
    x = np.array(x)
    y = np.array(y)
    x = pandas.Series(x)
    if w is None:
        w = pandas.Series(index=x.index, data=np.ones_like(x))

    if n is False:
        n = len(x)
    elif n is True or isinstance(n, str):
        if n is True:
            n = 'entropy'
        n = int(neff(w, beta=n))

    # Select samples for triangulation
    if (w != 0).sum() < n:
        i = x.index
    else:
        i = np.random.choice(x.index, size=n, replace=False, p=w/w.sum())

    # Generate triangulation
    tri = scaled_triangulation(x[i], y[i], cov)

    # For each point find corresponding triangles
    trifinder = tri.get_trifinder()
    j = trifinder(x, y)
    k = tri.triangles[j[j != -1]]

    # Compute mass in each triangle, and add it to each corner
    w_ = np.zeros(len(i))
    for i in range(3):
        np.add.at(w_, k[:, i], w[j != -1]/3)

    return tri, w_


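# Usage sketch (illustrative, not part of the module): compress 5000
# correlated samples onto 500 triangulation nodes with accumulated weights:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import triangular_sample_compression_2d
#     >>> x = np.random.randn(5000)
#     >>> y = x + np.random.randn(5000)
#     >>> cov = np.cov(x, y)
#     >>> tri, w = triangular_sample_compression_2d(x, y, cov, n=500)
#     >>> len(w) == len(tri.x)   # -> True, one weight per triangulation node

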
def sample_compression_1d(x, w=None, ncompress=True):
    """Histogram a 1D set of weighted samples via subsampling.

    This compresses the number of samples, combining weights.

    Parameters
    ----------
    x : array-like
        x coordinate of samples for compressing
    w : :class:`pandas.Series`, optional
        weights of samples
    ncompress : int, default=True
        Degree of compression.

        * If ``int``: number of samples returned.
        * If ``True``: compresses to the channel capacity
          (same as ``ncompress='entropy'``).
        * If ``False``: no compression.
        * If ``str``: determine number from the Huggins-Roy family of
          effective samples in :func:`neff` with ``beta=ncompress``.

    Returns
    -------
    x, w : array-like
        Compressed samples and weights
    """
    if ncompress is False:
        return x, w
    elif ncompress is True or isinstance(ncompress, str):
        if ncompress is True:
            ncompress = 'entropy'
        ncompress = neff(w, beta=ncompress)
    x = np.array(x)
    if w is None:
        w = np.ones_like(x)
    w = np.array(w)

    # Select inner samples for triangulation
    if len(x) > ncompress:
        x_ = np.random.choice(x, size=int(ncompress), replace=False)
    else:
        x_ = x.copy()
    x_.sort()

    # Compress mass onto these subsamples
    centers = (x_[1:] + x_[:-1])/2
    j = np.digitize(x, centers)
    w_ = np.zeros_like(x_)
    np.add.at(w_, j, w)

    return x_, w_


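# Usage sketch (illustrative, not part of the module): compress 10000
# weighted samples onto roughly their channel capacity; the total weight is
# preserved:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import sample_compression_1d
#     >>> x, w = np.random.randn(10000), np.random.rand(10000)
#     >>> x_, w_ = sample_compression_1d(x, w)
#     >>> np.isclose(w_.sum(), w.sum())   # -> True

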
def is_int(x):
    """Test whether x is an integer."""
    return isinstance(x, int) or isinstance(x, np.integer)


def match_contour_to_contourf(contours, vmin, vmax):
    """Get needed `vmin, vmax` to match `contour` colors to `contourf` colors.

    `contourf` uses the arithmetic mean of contour levels to assign colors,
    whereas `contour` uses the contour level directly. To get the same
    colors for `contour` lines as for `contourf` faces, we need some fiddly
    algebra.
    """
    if len(contours) <= 2:
        vmin = 2 * vmin - vmax
        return vmin, vmax
    else:
        c0 = contours[0]
        c1 = contours[1]
        ce = contours[-2]
        denom = vmax + ce - c1 - c0
        vmin = +(c0 * vmax - c1 * ce + 2 * vmin * (ce - c0)) / denom
        vmax = -(c0 * vmax + c1 * ce - 2 * vmax * ce) / denom
        return vmin, vmax


def insertion_p_value(indexes, nlive, batch=0):
    """Compute the p-value from insertion indexes, assuming constant nlive.

    Note that this function doesn't use :func:`scipy.stats.kstest` as the
    latter assumes continuous distributions.

    For more detail, see `Fowlie et al. (2020)
    <https://arxiv.org/abs/2006.03371>`_

    For a rolling test, you should provide the optional parameter
    ``batch!=0``. In this case the test computes the p-value on consecutive
    batches of size ``nlive * batch``, selects the smallest one and adjusts
    for multiple comparisons using a Bonferroni correction.

    Parameters
    ----------
    indexes : array-like
        list of insertion indexes, sorted by death contour
    nlive : int
        number of live points
    batch : float
        batch size in units of nlive for a rolling p-value

    Returns
    -------
    ks_result : dict
        Kolmogorov-Smirnov test results:

        * ``D``: Kolmogorov-Smirnov statistic
        * ``sample_size``: sample size
        * ``p-value``: p-value

        if ``batch != 0``:

        * ``iterations``: bounds of batch with minimum p-value
        * ``nbatches``: the number of batches in total
        * ``uncorrected p-value``: p-value without Bonferroni correction
    """
    if batch == 0:
        bins = np.arange(-0.5, nlive + 0.5, 1.)
        empirical_pmf = np.histogram(
            np.array(indexes),
            bins=bins,
            density=True,
        )[0]
        empirical_cmf = np.cumsum(empirical_pmf)
        uniform_cmf = np.arange(1., nlive + 1., 1.) / nlive

        D = abs(empirical_cmf - uniform_cmf).max()
        sample_size = len(indexes)
        K = D * np.sqrt(sample_size)

        ks_result = {}
        ks_result["D"] = D
        ks_result["sample_size"] = sample_size
        ks_result["p-value"] = kstwobign.sf(K)
        return ks_result
    else:
        batch = int(batch * nlive)
        batches = [
            np.array(indexes)[i:i + batch]
            for i in range(0, len(indexes), batch)
        ]
        ks_results = [insertion_p_value(c, nlive) for c in batches]
        ks_result = min(ks_results, key=lambda t: t["p-value"])
        index = ks_results.index(ks_result)

        ks_result["iterations"] = (index * batch, (index + 1) * batch)
        ks_result["nbatches"] = n = len(batches)
        ks_result["uncorrected p-value"] = p = ks_result["p-value"]
        ks_result["p-value"] = 1. - (1. - p)**n if p != 1 else p * n

        return ks_result


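# Usage sketch (illustrative, not part of the module): uniformly distributed
# insertion indexes from a healthy run typically give a large p-value:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import insertion_p_value
#     >>> nlive = 500
#     >>> indexes = np.random.randint(0, nlive, size=10000)
#     >>> result = insertion_p_value(indexes, nlive)
#     >>> sorted(result)   # -> ['D', 'p-value', 'sample_size']
#     >>> rolling = insertion_p_value(indexes, nlive, batch=1)
#     >>> rolling['nbatches']   # -> 20, i.e. 10000 / (1 * nlive)

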
@contextlib.contextmanager
def temporary_seed(seed):
    """Context for temporarily setting a numpy seed."""
    state = np.random.get_state()
    np.random.seed(seed)
    try:
        yield
    finally:
        np.random.set_state(state)


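# Usage sketch (illustrative, not part of the module): draws inside the
# context are reproducible, while the surrounding global random state is
# restored afterwards:
#
#     >>> import numpy as np
#     >>> from anesthetic.utils import temporary_seed
#     >>> with temporary_seed(0):
#     ...     a = np.random.rand(3)
#     >>> with temporary_seed(0):
#     ...     b = np.random.rand(3)
#     >>> np.array_equal(a, b)   # -> True

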
def adjust_docstrings(obj, pattern, repl, *args, **kwargs):
    """Adjust the docstrings of a class or function using regular expressions.

    After the first argument, the remaining arguments are identical to
    re.sub.

    Parameters
    ----------
    obj : class or function
        object whose docstring(s) are to be adjusted
    pattern : str
        regular expression pattern
    repl : str
        replacement string
    """
    if inspect.isclass(obj):
        for key, val in obj.__dict__.items():
            doc = inspect.getdoc(val)
            if doc is not None:
                newdoc = re.sub(pattern, repl, doc, *args, **kwargs)
                try:
                    obj.__dict__[key].__doc__ = newdoc
                except AttributeError:
                    pass
    else:
        doc = inspect.getdoc(obj)
        if doc is not None:
            newdoc = re.sub(pattern, repl, doc, *args, **kwargs)
            obj.__doc__ = newdoc
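

# Usage sketch (illustrative, using a throwaway class): rewrite one word
# throughout the docstrings of a class's methods:
#
#     >>> from anesthetic.utils import adjust_docstrings
#     >>> class Demo:
#     ...     def method(self):
#     ...         """Do something with DataFrames."""
#     >>> adjust_docstrings(Demo, 'DataFrames', 'WeightedDataFrames')
#     >>> Demo.method.__doc__   # -> 'Do something with WeightedDataFrames.'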