Source code for anesthetic.utils

"""Data-processing utility functions."""
import numpy as np
import pandas
from scipy import special
from scipy.interpolate import interp1d
from scipy.optimize import minimize_scalar
from scipy.stats import kstwobign, entropy
from matplotlib.tri import Triangulation
import contextlib
import inspect
import re


[docs] def logsumexp(a, axis=None, b=None, keepdims=False, return_sign=False): r"""Compute the log of the sum of exponentials of input elements. This function has the same call signature as :func:`scipy.special.logsumexp` and mirrors scipy's behaviour except for ``-np.inf`` input. If a and b are both ``-inf`` then scipy's function will output ``nan`` whereas here we use: .. math:: \lim_{x \to -\infty} x \exp(x) = 0 Thus, if ``a=-inf`` in ``log(sum(b * exp(a))`` then we can set ``b=0`` such that that term is ignored in the sum. """ if b is None: b = np.ones_like(a) b = np.where(a == -np.inf, 0, b) return special.logsumexp(a, axis=axis, b=b, keepdims=keepdims, return_sign=return_sign)
[docs] def neff(w, beta=1): r"""Calculate effective number of samples. Using the Huggins-Roy family of effective samples (https://aakinshin.net/posts/huggins-roy-ess/). Parameters ---------- beta : int, float, str, default = 1 The value of beta used to calculate the number of effective samples according to .. math:: N_{eff} &= \bigg(\sum_{i=0}^n w_i^\beta \bigg)^{\frac{1}{1-\beta}} w_i &= \frac{w_i}{\sum_j w_j} Beta can take any positive value. Larger beta corresponds to a greater compression such that: .. math:: \beta_1 < \beta_2 \Rightarrow N_{eff}(\beta_1) > N_{eff}(\beta_2) Alternatively, beta can take one of the following strings as input: * If 'inf' or 'equal' is supplied (equivalent to beta=inf), then the resulting number of samples is the number of samples when compressed to equal weights, and given by: .. math:: w_i &= \frac{w_i}{\sum_j w_j} N_{eff} &= \frac{1}{\max_i[w_i]} * If 'entropy' is supplied (equivalent to beta=1), then the estimate is determined via the entropy based calculation, also referred to as the channel capacity: .. math:: H &= -\sum_i p_i \ln p_i p_i &= \frac{w_i}{\sum_j w_j} N_{eff} &= e^{H} * If 'kish' is supplied (equivalent to beta=2), then a Kish estimate is computed (Kish, Leslie (1965). Survey Sampling. New York: John Wiley & Sons, Inc. ISBN 0-471-10949-5): .. math:: N_{eff} = \frac{(\sum_i w_i)^2}{\sum_i w_i^2} * str(float) input gets converted to the corresponding float value. """ w = w / np.sum(w) if beta == np.inf or beta == 'inf' or beta == 'equal': return 1 / np.max(w) elif beta == 'entropy' or beta != 'kish' and str(float(beta)) == '1.0': return np.exp(entropy(w)) else: if beta == 'kish': beta = 2 elif isinstance(beta, str): beta = float(beta) return np.sum(w**beta)**(1/(1-beta))
[docs] def compress_weights(w, u=None, ncompress=True): """Compresses weights to their approximate channel capacity.""" if u is None: u = np.random.rand(len(w)) if w is None: w = np.ones_like(u) if ncompress is True or isinstance(ncompress, str): if ncompress is True: ncompress = 'entropy' ncompress = neff(w, beta=ncompress) elif ncompress is False: return w # TODO: remove this in version >= 2.1 if ncompress < 0: raise ValueError( "ncompress<0 is anesthetic 1.0 behaviour. For equally weighted " "samples you should now use ncompress='inf' or ncompress='equal'." ) W = w * ncompress / w.sum() fraction, integer = np.modf(W) integer = integer.astype(int) if is_int(ncompress): remainder = ncompress - integer.sum() mask = fraction > 0 race_time = np.full_like(fraction, np.inf) # exp-race arrival time race_time[mask] = -np.log(u[mask])/fraction[mask] idx = np.argpartition(race_time, remainder-1)[:remainder] extra = np.bincount(idx, minlength=len(integer)) else: extra = (u < fraction).astype(int) return integer + extra
[docs] def quantile(a, q, w=None, interpolation='linear'): """Compute the weighted quantile for a one dimensional array.""" if w is None: w = np.ones_like(a) a = np.array(list(a)) # Necessary to convert pandas arrays w = np.array(list(w)) # Necessary to convert pandas arrays i = np.argsort(a) c = np.cumsum(w[i[1:]]+w[i[:-1]]) c = c / c[-1] c = np.concatenate(([0.], c)) icdf = interp1d(c, a[i], kind=interpolation) quant = icdf(q) if isinstance(q, float): quant = float(quant) return quant
[docs] def var_unbiased(a, w, axis=0, ddof=1): """Compute the unbiased variance from weighted samples. Uses the standard reliability-weight correction var = s2 / (v1 - v2/v1) (for ddof=1), and supports the frequency-weight case by using var = s2 / (v1 - ddof) when w is integer and v1 > 1. Parameters ---------- a : np.array Input samples. w : np.array Associated sample weights. Integer -> frequency weights; Float -> reliability weights. axis : int Axis along which to compute variance. ddof : int, default=1 Delta degrees of freedom. Returns ------- var : float, np.array Unbiased variance. """ mu = np.ma.filled(np.average(a, weights=w, axis=axis), np.nan) v1 = np.ma.filled(w.sum(axis=axis), 0.0) if np.issubdtype(w.dtype, np.integer): v2 = v1 else: # ---- reliability weights branch ---- v2 = np.ma.filled((w ** 2).sum(axis=axis), 0.0) s2 = np.ma.filled((w * (a - mu)**2).sum(axis=axis), 0.0) invalid = (v1 == 0) | (v1**2 - ddof*v2 == 0) | np.isnan(mu) | np.isnan(s2) nans = np.full_like(v1, np.nan, dtype=float) var = np.divide(v1 * s2, v1**2 - ddof*v2, out=nans.copy(), where=~invalid) return var if np.ndim(var) > 0 else np.float64(var)
[docs] def cov_unbiased(a, w, ddof=1, return_corr=False): """Compute the unbiased covariance from weighted samples. Parameters ---------- a : np.array, shape (n_samples, n_features) Input samples in rows, features/parameters in columns. w : np.array, shape (n_samples,) Associated sample weights. Integer -> frequency weights; Float -> reliability weights. ddof : int, default=1 Delta degrees of freedom. Returns ------- cov : ndarray, shape (n_features, n_features) Unbiased covariance matrix. """ a = np.atleast_2d(a) # If all weights are zero, mimic pandas' "all NaN" behaviour upstream. if w.sum() == 0: return np.full((a.shape[1], a.shape[1]), np.nan, dtype=float) # ---- fast path: no NaNs anywhere ---- if not return_corr: if np.isfinite(a).all() and a.shape[0] >= 2 and a.shape[1] >= 2: if np.issubdtype(w.dtype, np.integer): return np.cov(a, rowvar=False, fweights=w, ddof=ddof) else: return np.cov(a, rowvar=False, aweights=w, ddof=ddof) M = np.isfinite(a) # shape (n, p) A = np.where(M, a, 0.0) # shape (n, p) WA = (w[:, None] * A).T # shape (n, p) V1 = (w[:, None] * M).T @ M # shape (p, p) S1 = WA @ M # shape (p, p) S2 = WA @ A # shape (p, p) if np.issubdtype(w.dtype, np.integer): # ---- frequency weights branch ---- V2 = V1 else: # ---- reliability weights branch ---- V2 = ((w**2)[:, None] * M).T @ M # shape (p, p) invalid = (V1 == 0) | (V1**2 - ddof * V2 == 0) cov = np.full_like(S2, np.nan, dtype=float) cov = np.divide(V1 * S2 - S1.T * S1, V1**2 - ddof * V2, out=cov.copy(), where=~invalid) if not return_corr: return cov # pairwise variances on the SAME intersections S2i = (w[:, None] * A * A).T @ M nans = np.full_like(cov, np.nan, dtype=float) var_i = np.divide(V1 * S2i - S1**2, V1**2 - 1 * V2, out=nans.copy(), where=~invalid) invalid = invalid | (var_i * var_i.T <= 0) corr = np.divide(cov, np.sqrt(var_i * var_i.T), out=nans.copy(), where=~invalid) return corr
[docs] def skew_unbiased(a, w, axis=0): """Compute the unbiased skewness from weighted samples. Adapted from Lorenzo Rimoldini (2013): https://arxiv.org/pdf/1304.6564 Parameters ---------- a : np.array Input samples. w : np.array Associated sample weights. Integer -> frequency weights; Float -> reliability weights. axis : int Axis along which to compute kurtosis. Returns ------- skew : float, np.array Unbiased skewness (`G1`). """ mu = np.ma.filled(np.average(a, weights=w, axis=axis), np.nan) v1 = np.ma.filled(w.sum(axis=axis), 0.0) v2 = np.ma.filled((w**2).sum(axis=axis), 0.0) v3 = np.ma.filled((w**3).sum(axis=axis), 0.0) s2 = np.ma.filled((w * (a - mu)**2).sum(axis=axis), 0.0) s3 = np.ma.filled((w * (a - mu)**3).sum(axis=axis), 0.0) nans = np.full_like(v1, np.nan, dtype=float) # ---- frequency weights branch ---- if np.issubdtype(w.dtype, np.integer): n = v1 if np.all(n <= 2): return np.nan invalid = (n <= 2) | np.isnan(mu) | np.isnan(s2) | np.isnan(s3) # pandas convention: zero variance (s2==0) => skew = 0 degenerate = (~invalid) & (s2 == 0) skew = np.divide(n * (n-1)**0.5 * s3, (n-2) * s2**1.5, out=nans.copy(), where=(~invalid) & (~degenerate)) skew = np.where(degenerate, 0.0, skew) return skew if np.ndim(skew) > 0 else np.float64(skew) # ---- reliability weights branch (Rimoldini) ---- if a.shape[axis] <= 2: return np.nan invalid = ((v1 == 0) | (v1**2 - v2 == 0) | (v1**3 - 3*v1*v2 + 2*v3 == 0) | np.isnan(mu) | np.isnan(s2) | np.isnan(s3)) k2 = np.divide(v1 * s2, v1**2 - v2, out=nans.copy(), where=~invalid) k3 = np.divide(v1**2 * s3, v1**3 - 3*v1*v2 + 2*v3, out=nans.copy(), where=~invalid) # pandas convention: zero variance (k2==0) => skew = 0 degenerate = (~invalid) & (k2 == 0) skew = np.divide(k3, k2**1.5, out=nans.copy(), where=(~invalid) & (~degenerate)) skew = np.where(degenerate, 0.0, skew) return skew if np.ndim(skew) > 0 else np.float64(skew)
[docs] def kurt_unbiased(a, w, axis=0): """Compute the unbiased kurtosis from weighted samples. Adapted from Lorenzo Rimoldini (2013): https://arxiv.org/pdf/1304.6564 Parameters ---------- a : np.array Input samples. w : np.array Associated sample weights. Integer -> frequency weights; Float -> reliability weights. axis : int Axis along which to compute kurtosis. Returns ------- kurt : float, np.array Unbiased kurtosis (`G2`). """ mu = np.ma.filled(np.average(a, weights=w, axis=axis), np.nan) v1 = np.ma.filled(w.sum(axis=axis), 0.0) v2 = np.ma.filled((w**2).sum(axis=axis), 0.0) v3 = np.ma.filled((w**3).sum(axis=axis), 0.0) v4 = np.ma.filled((w**4).sum(axis=axis), 0.0) s2 = np.ma.filled((w * (a - mu)**2).sum(axis=axis), 0.0) s4 = np.ma.filled((w * (a - mu)**4).sum(axis=axis), 0.0) nans = np.full_like(v1, np.nan, dtype=float) # ---- frequency weights branch ---- if np.issubdtype(w.dtype, np.integer): n = v1 if np.all(n <= 3): return np.nan invalid = (n <= 3) | np.isnan(mu) | np.isnan(s2) | np.isnan(s4) # pandas convention: zero variance (s2==0) => kurt = 0 degenerate = (~invalid) & (s2 == 0) kurt = np.divide((n+1)*n*(n-1)*s4-3*(n-1)**2*s2**2, (n-2)*(n-3)*s2**2, out=nans.copy(), where=(~invalid) & (~degenerate)) kurt = np.where(degenerate, 0.0, kurt) return kurt if np.ndim(kurt) > 0 else np.float64(kurt) # ---- reliability weights branch (Rimoldini) ---- if a.shape[axis] <= 3: return np.nan den2 = v1**2-v2 den4 = (v1**2 - v2) * (v1**4 - 6*v1**2*v2 + 8*v1*v3 + 3*v2**2 - 6*v4) num41 = v1 * (v1**4 - 4*v1*v3 + 3*v2**2) num42 = 3 * (v1**4 - 2*v1**2*v2 + 4*v1*v3 - 3*v2**2) invalid = ((v1 == 0) | (den2 == 0) | (den4 == 0) | np.isnan(mu) | np.isnan(s2) | np.isnan(s4)) k2 = np.divide(v1 * s2, den2, out=nans.copy(), where=~invalid) k4 = np.divide(num41*s4-num42*s2**2, den4, out=nans.copy(), where=~invalid) # pandas convention: zero variance (k2==0) => kurt = 0 degenerate = (~invalid) & (k2 == 0) invalid = invalid | (k2 == 0) kurt = np.divide(k4, k2**2, out=nans.copy(), where=~invalid) kurt = np.where(degenerate, 0.0, kurt) return kurt if np.ndim(kurt) > 0 else np.float64(kurt)
[docs] def sample_cdf(samples, inverse=False, interpolation='linear'): """Sample the empirical cdf for a 1d array.""" samples = np.sort(samples) ngaps = len(samples)-1 gaps = np.random.dirichlet(np.ones(ngaps)) cdf = np.array([0, *np.cumsum(gaps)]) # The last element should be one (up to floating point errors) because # dirichlet sums to one. Set exactly to avoid interpolation errors. cdf[-1] = 1 if inverse: return interp1d(cdf, samples, kind=interpolation) else: return interp1d(samples, cdf, kind=interpolation, fill_value=(0, 1), bounds_error=False)
[docs] def credibility_interval(samples, weights=None, level=0.68, method="iso-pdf", return_covariance=False, nsamples=12): """Compute the credibility interval of weighted samples. Based on linear interpolation of the cumulative density function, thus expect discretisation errors on the scale of distances between samples. https://github.com/Stefan-Heimersheim/fastCI#readme Parameters ---------- samples : array Samples to compute the credibility interval of. weights : array, default=np.ones_like(samples) Weights corresponding to samples. level : float, default=0.68 Credibility level (probability, <1). method : str, default='iso-pdf' Which definition of interval to use: * ``'iso-pdf'``: Calculate iso probability density interval with the same probability density at each end. Also known as waterline-interval or highest average posterior density interval. This is only accurate if the distribution is sufficiently uni-modal. * ``'lower-limit'``/``'upper-limit'``: Lower/upper limit. One-sided limits for which ``level`` fraction of the (equally weighted) samples lie above/below the limit. * ``'equal-tailed'``: Equal-tailed interval with the same fraction of (equally weighted) samples below and above the interval region. return_covariance: bool, default=False Return the covariance of the sampled limits, in addition to the mean nsamples : int, default=12 Number of CDF samples to improve `mean` and `std` estimate. Returns ------- limit(s) : float, array, or tuple of floats or arrays Returns the credibility interval boundari(es). By default, returns the mean over ``nsamples`` samples, which is either two numbers (``method='iso-pdf'``/``'equal-tailed'``) or one number (``method='lower-limit'``/``'upper-limit'``). If ``return_covariance=True``, returns a tuple (mean(s), covariance) where covariance is the covariance over the sampled limits. """ if level >= 1: raise ValueError('level must be <1, got {0:.2f}'.format(level)) if len(np.shape(samples)) != 1: raise ValueError('Support only 1D arrays for samples') if weights is not None and np.shape(samples) != np.shape(weights): raise ValueError('Shape of samples and weights differs') # Convert to numpy to unify indexing samples = np.array(samples.copy()) if weights is None: weights = np.ones(len(samples)) else: weights = np.array(weights.copy()) # Convert samples to unit weight not the case if not np.all(np.logical_or(weights == 0, weights == 1)): # compress_weights with ncompress='equal' assures weights \in 0,1 # Note that this must be done, we cannot handle weights != 1 # see this discussion for details: # https://github.com/williamjameshandley/anesthetic/pull/188#issuecomment-1274980982 weights = compress_weights(weights, ncompress='equal') indices = np.where(weights)[0] x = samples[indices] # Sample the confidence interval multiple times # to get errorbars on confidence interval boundaries ci_samples = [] for i in range(nsamples): invCDF = sample_cdf(x, inverse=True) if method == 'iso-pdf': # Find smallest interval def distance(Y, level=level): return invCDF(Y+level)-invCDF(Y) res = minimize_scalar(distance, bounds=(0, 1-level), method="Bounded") ci_samples.append(np.array([invCDF(res.x), invCDF(res.x+level)])) elif method == 'lower-limit': # Get value from which we reach the desired level ci_samples.append(invCDF(1-level)) elif method == 'upper-limit': # Get value to which we reach the desired level ci_samples.append(invCDF(level)) elif method == 'equal-tailed': ci_samples.append(np.array([invCDF((1-level)/2), invCDF((1+level)/2)])) else: raise ValueError("Method '{0:}' unknown".format(method)) ci_samples = np.array(ci_samples) if np.shape(ci_samples) == (nsamples, ): if return_covariance: return np.mean(ci_samples), np.cov(ci_samples) else: return np.mean(ci_samples) else: if return_covariance: return np.mean(ci_samples, axis=0), \ np.cov(ci_samples, rowvar=False) else: return np.mean(ci_samples, axis=0)
[docs] def mirror_1d(d, xmin=None, xmax=None): """If necessary apply reflecting boundary conditions.""" if xmin is not None and xmax is not None: xmed = (xmin+xmax)/2 return np.concatenate((2*xmin-d[d < xmed], d, 2*xmax-d[d >= xmed])) elif xmin is not None: return np.concatenate((2*xmin-d, d)) elif xmax is not None: return np.concatenate((d, 2*xmax-d)) else: return d
[docs] def mirror_2d(d_x_, d_y_, xmin=None, xmax=None, ymin=None, ymax=None): """If necessary apply reflecting boundary conditions.""" d_x = d_x_.copy() d_y = d_y_.copy() if xmin is not None and xmax is not None: xmed = (xmin+xmax)/2 d_y = np.concatenate((d_y[d_x < xmed], d_y, d_y[d_x >= xmed])) d_x = np.concatenate((2*xmin-d_x[d_x < xmed], d_x, 2*xmax-d_x[d_x >= xmed])) elif xmin is not None: d_y = np.concatenate((d_y, d_y)) d_x = np.concatenate((2*xmin-d_x, d_x)) elif xmax is not None: d_y = np.concatenate((d_y, d_y)) d_x = np.concatenate((d_x, 2*xmax-d_x)) if ymin is not None and ymax is not None: ymed = (ymin+ymax)/2 d_x = np.concatenate((d_x[d_y < ymed], d_x, d_x[d_y >= ymed])) d_y = np.concatenate((2*ymin-d_y[d_y < ymed], d_y, 2*ymax-d_y[d_y >= ymed])) elif ymin is not None: d_x = np.concatenate((d_x, d_x)) d_y = np.concatenate((2*ymin-d_y, d_y)) elif ymax is not None: d_x = np.concatenate((d_x, d_x)) d_y = np.concatenate((d_y, 2*ymax-d_y)) return d_x, d_y
[docs] def nest_level(lst): """Calculate the nesting level of a list.""" if not isinstance(lst, list): return 0 if not lst: return 1 return max(nest_level(item) for item in lst) + 1
[docs] def histogram(a, **kwargs): """Produce a histogram for path-based plotting. This is a cheap histogram. Necessary if one wants to update the histogram dynamically, and redrawing and filling is very expensive. This has the same arguments and keywords as :func:`numpy.histogram`, but is normalised to 1. """ hist, bin_edges = np.histogram(a, **kwargs) xpath, ypath = np.empty((2, 4*len(hist))) ypath[0::4] = ypath[3::4] = 0 ypath[1::4] = ypath[2::4] = hist xpath[0::4] = xpath[1::4] = bin_edges[:-1] xpath[2::4] = xpath[3::4] = bin_edges[1:] mx = max(ypath) if mx: ypath /= max(ypath) return xpath, ypath
[docs] def histogram_bin_edges(samples, weights, bins='fd', range=None, beta='equal'): """Compute a good number of bins dynamically from weighted samples. Parameters ---------- samples : array_like Input data. weights : array-like Array of sample weights. bins : str, default='fd' String defining the rule used to automatically compute a good number of bins for the weighted samples: * 'fd' : Freedman--Diaconis rule (modified for weighted data) * 'scott' : Scott's rule (modified for weighted data) * 'sqrt' : Square root estimator (modified for weighted data) range : (float, float), optional The lower and upper range of the bins. If not provided, range is simply ``(a.min(), a.max())``. Values outside the range are ignored. The first element of the range must be less than or equal to the second. beta : float, default='equal' The value of beta>0 used to calculate the number of effective samples via :func:`neff`. Returns ------- bin_edges : array of dtype float The edges to pass to :func:`numpy.histogram`. """ if weights is None: weights = np.ones_like(samples) if range is None: minimum = np.min(samples) maximum = np.max(samples) data_range = maximum - minimum else: minimum = range[0] maximum = range[1] data_range = maximum - minimum data_mask = (samples >= minimum) & (samples <= maximum) samples = samples[data_mask] weights = weights[data_mask] w = weights / np.sum(weights) N_eff = neff(w=w, beta=beta) if bins == 'fd': # Freedman--Diaconis rule q1, q3 = quantile(samples, [1/4, 3/4], w=w) bin_width = 2 * (q3 - q1) * N_eff**(-1/3) elif bins == 'scott': # Scott's rule weighted_mean = np.average(samples, weights=w) weighted_var = np.average((samples - weighted_mean)**2, weights=w) weighted_std = np.sqrt(weighted_var) bin_width = (24 * np.pi**0.5 / N_eff)**(1/3) * weighted_std elif bins == 'sqrt': # Square root estimator samples_eff, _ = sample_compression_1d(samples, w=w, ncompress=N_eff) data_range_eff = np.max(samples_eff) - np.min(samples_eff) bin_width = data_range_eff / np.sqrt(N_eff) nbins = int(np.ceil(data_range / bin_width)) bin_edges = np.linspace(minimum, maximum, nbins+1) return bin_edges
[docs] def compute_nlive(death, birth): """Compute number of live points from birth and death contours. Parameters ---------- death, birth : array-like list of birth and death contours Returns ------- nlive : np.array number of live points at each contour """ b = pandas.DataFrame(np.array(birth), columns=['logL']) d = pandas.DataFrame(np.array(death), columns=['logL'], index=b.index + len(b)) b['n'] = +1 d['n'] = -1 t = pandas.concat([b, d]).sort_values(['logL', 'n'], na_position='first') n = t.n.cumsum() return (n[d.index]+1).to_numpy()
[docs] def compute_insertion_indexes(death, birth): """Compute the live point insertion index for each point. For more detail, see `Fowlie et al. (2020) <https://arxiv.org/abs/2006.03371>`_ Parameters ---------- death, birth : array-like list of birth and death contours Returns ------- indexes : np.array live point index at which each live point was inserted """ indexes = np.zeros_like(birth, dtype=int) for i, (b, d) in enumerate(zip(birth, death)): i_live = (death > b) & (birth <= b) live = death[i_live] live.sort() indexes[i] = np.searchsorted(live, d) return indexes
[docs] def unique(a): """Find unique elements, retaining order.""" b = [] for x in a: if x not in b: b.append(x) return b
[docs] def iso_probability_contours(pdf, contours=[0.95, 0.68]): """Compute the iso-probability contour values.""" if len(contours) > 1 and not np.all(contours[:-1] > contours[1:]): raise ValueError( "The kwargs `levels` and `contours` have to be ordered from " "outermost to innermost contour, i.e. in strictly descending " "order when referring to the enclosed probability mass, e.g. " "like the default [0.95, 0.68]. " "This breaking change in behaviour was introduced in version " "2.0.0-beta.10, in order to better match the ordering of other " "matplotlib kwargs." ) contours = [1-p for p in contours] p = np.sort(np.array(pdf).flatten()) m = np.cumsum(p) m /= m[-1] interp = interp1d([0]+list(m), [0]+list(p)) c = list(interp(contours)) return c
[docs] def iso_probability_contours_from_samples(pdf, contours=[0.95, 0.68], weights=None): """Compute the iso-probability contour values.""" if len(contours) > 1 and not np.all(contours[:-1] > contours[1:]): raise ValueError( "The kwargs `levels` and `contours` have to be ordered from " "outermost to innermost contour, i.e. in strictly descending " "order when referring to the enclosed probability mass, e.g. " "like the default [0.95, 0.68]. " "This breaking change in behaviour was introduced in version " "2.0.0-beta.10, in order to better match the ordering of other " "matplotlib kwargs." ) if weights is None: weights = np.ones_like(pdf) contours = [1-p for p in contours] i = np.argsort(pdf) m = np.cumsum(weights[i]) m /= m[-1] interp = interp1d([0]+list(m), [0]+list(pdf[i])) c = list(interp(contours)) return c
[docs] def scaled_triangulation(x, y, cov): """Triangulation scaled by a covariance matrix. Parameters ---------- x, y : array-like x and y coordinates of samples cov : array-like, 2d Covariance matrix for scaling Returns ------- :class:`matplotlib.tri.Triangulation` Triangulation with the appropriate scaling """ L = np.linalg.cholesky(cov) Linv = np.linalg.inv(L) x_, y_ = Linv.dot([x, y]) tri = Triangulation(x_, y_) return Triangulation(x, y, tri.triangles)
[docs] def triangular_sample_compression_2d(x, y, cov, w=None, n=1000): """Histogram a 2D set of weighted samples via triangulation. This defines bins via a triangulation of the subsamples and sums weights within triangles surrounding each point Parameters ---------- x, y : array-like x and y coordinates of samples for compressing cov : array-like, 2d Covariance matrix for scaling w : :class:`pandas.Series`, optional weights of samples n : int, default=1000 number of samples returned. Returns ------- tri : :class:`matplotlib.tri.Triangulation` with an appropriate scaling w : array-like Compressed samples and weights """ # Pre-process samples to not be affected by non-standard indexing # Details: https://github.com/handley-lab/anesthetic/issues/189 x = np.array(x) y = np.array(y) x = pandas.Series(x) if w is None: w = pandas.Series(index=x.index, data=np.ones_like(x)) if n is False: n = len(x) elif n is True or isinstance(n, str): if n is True: n = 'entropy' n = int(neff(w, beta=n)) # Select samples for triangulation if (w != 0).sum() <= n: i = x.index else: i = np.random.choice(x.index, size=n, replace=False, p=w/w.sum()) # Generate triangulation tri = scaled_triangulation(x[i], y[i], cov) # For each point find corresponding triangles trifinder = tri.get_trifinder() j = trifinder(x, y) k = tri.triangles[j[j != -1]] # Compute mass in each triangle, and add it to each corner w_ = np.zeros(len(i)) for i in range(3): np.add.at(w_, k[:, i], w[j != -1]/3) return tri, w_
[docs] def sample_compression_1d(x, w=None, ncompress=True): """Histogram a 1D set of weighted samples via subsampling. This compresses the number of samples, combining weights. Parameters ---------- x : array-like x coordinate of samples for compressing w : :class:`pandas.Series`, optional weights of samples ncompress : int, default=True Degree of compression. * If ``int``: number of samples returned. * If ``True``: compresses to the channel capacity (same as ``ncompress='entropy'``). * If ``False``: no compression. * If ``str``: determine number from the Huggins-Roy family of effective samples in :func:`neff` with ``beta=ncompress``. Returns ------- x, w: array-like Compressed samples and weights """ if ncompress is False: return x, w elif ncompress is True or isinstance(ncompress, str): if ncompress is True: ncompress = 'entropy' ncompress = neff(w, beta=ncompress) x = np.array(x) if w is None: w = np.ones_like(x) w = np.array(w) # Select inner samples for triangulation if len(x) > ncompress: x_ = np.random.choice(x, size=int(ncompress), replace=False) else: x_ = x.copy() x_.sort() # Compress mass onto these subsamples centers = (x_[1:] + x_[:-1])/2 j = np.digitize(x, centers) w_ = np.zeros_like(x_) np.add.at(w_, j, w) return x_, w_
[docs] def is_int(x): """Test whether x is an integer.""" return isinstance(x, int) or isinstance(x, np.integer)
[docs] def match_contour_to_contourf(contours, vmin, vmax): """Get needed `vmin, vmax` to match `contour` colors to `contourf` colors. `contourf` uses the arithmetic mean of contour levels to assign colors, whereas `contour` uses the contour level directly. To get the same colors for `contour` lines as for `contourf` faces, we need some fiddly algebra. """ if len(contours) <= 2: vmin = 2 * vmin - vmax return vmin, vmax else: c0 = contours[0] c1 = contours[1] ce = contours[-2] denom = vmax + ce - c1 - c0 vmin = +(c0 * vmax - c1 * ce + 2 * vmin * (ce - c0)) / denom vmax = -(c0 * vmax + c1 * ce - 2 * vmax * ce) / denom return vmin, vmax
[docs] def insertion_p_value(indexes, nlive, batch=0): """Compute the p-value from insertion indexes, assuming constant nlive. Note that this function doesn't use :func:`scipy.stats.kstest` as the latter assumes continuous distributions. For more detail, see `Fowlie et al. (2020) <https://arxiv.org/abs/2006.03371>`_ For a rolling test, you should provide the optional parameter ``batch!=0``. In this case the test computes the p-value on consecutive batches of size ``nlive * batch``, selects the smallest one and adjusts for multiple comparisons using a Bonferroni correction. Parameters ---------- indexes : array-like list of insertion indexes, sorted by death contour nlive : int number of live points batch : float batch size in units of nlive for a rolling p-value Returns ------- ks_result : dict Kolmogorov-Smirnov test results: * ``D``: Kolmogorov-Smirnov statistic * ``sample_size``: sample size * ``p-value``: p-value if ``batch != 0``: * ``iterations``: bounds of batch with minimum p-value * ``nbatches``: the number of batches in total * ``uncorrected p-value``: p-value without Bonferroni correction """ if batch == 0: bins = np.arange(-0.5, nlive + 0.5, 1.) empirical_pmf = np.histogram( np.array(indexes), bins=bins, density=True, )[0] empirical_cmf = np.cumsum(empirical_pmf) uniform_cmf = np.arange(1., nlive + 1., 1.) / nlive D = abs(empirical_cmf - uniform_cmf).max() sample_size = len(indexes) K = D * np.sqrt(sample_size) ks_result = {} ks_result["D"] = D ks_result["sample_size"] = sample_size ks_result["p-value"] = kstwobign.sf(K) return ks_result else: batch = int(batch * nlive) batches = [ np.array(indexes)[i:i + batch] for i in range(0, len(indexes), batch) ] ks_results = [insertion_p_value(c, nlive) for c in batches] ks_result = min(ks_results, key=lambda t: t["p-value"]) index = ks_results.index(ks_result) ks_result["iterations"] = (index * batch, (index + 1) * batch) ks_result["nbatches"] = n = len(batches) ks_result["uncorrected p-value"] = p = ks_result["p-value"] ks_result["p-value"] = 1. - (1. - p)**n if p != 1 else p * n return ks_result
[docs] @contextlib.contextmanager def temporary_seed(seed): """Context for temporarily setting a numpy seed.""" state = np.random.get_state() np.random.seed(seed) try: yield finally: np.random.set_state(state)
[docs] def adjust_docstrings(obj, pattern, repl, *args, **kwargs): """Adjust the docstrings of a class using regular expressions. After the first argument, the remaining arguments are identical to re.sub. Parameters ---------- cls : class class to adjust pattern : str regular expression pattern repl : str replacement string """ if inspect.isclass(obj): for key, val in obj.__dict__.items(): doc = inspect.getdoc(val) if doc is not None: i = doc.find("See Also") j = doc.find("Examples") if i != -1: doc = doc[:i] if j == -1 else doc[:i] + doc[j:] newdoc = re.sub(pattern, repl, doc, *args, **kwargs) try: obj.__dict__[key].__doc__ = newdoc except AttributeError: pass else: doc = inspect.getdoc(obj) if doc is not None: i = doc.find("See Also") j = doc.find("Examples") if i != -1: doc = doc[:i] if j == -1 else doc[:i] + doc[j:] newdoc = re.sub(pattern, repl, doc, *args, **kwargs) obj.__doc__ = newdoc