"""Data-processing utility functions."""
import numpy as np
import pandas
from scipy import special
from scipy.interpolate import interp1d
from scipy.optimize import minimize_scalar
from scipy.stats import kstwobign, entropy
from matplotlib.tri import Triangulation
import contextlib
import inspect
import re
[docs]
def logsumexp(a, axis=None, b=None, keepdims=False, return_sign=False):
r"""Compute the log of the sum of exponentials of input elements.
This function has the same call signature as
:func:`scipy.special.logsumexp` and mirrors scipy's behaviour except for
``-np.inf`` input. If a and b are both ``-inf`` then scipy's function will
output ``nan`` whereas here we use:
.. math::
\lim_{x \to -\infty} x \exp(x) = 0
Thus, if ``a=-inf`` in ``log(sum(b * exp(a))`` then we can set ``b=0``
such that that term is ignored in the sum.
"""
if b is None:
b = np.ones_like(a)
b = np.where(a == -np.inf, 0, b)
return special.logsumexp(a, axis=axis, b=b, keepdims=keepdims,
return_sign=return_sign)
[docs]
def neff(w, beta=1):
r"""Calculate effective number of samples.
Using the Huggins-Roy family of effective samples
(https://aakinshin.net/posts/huggins-roy-ess/).
Parameters
----------
beta : int, float, str, default = 1
The value of beta used to calculate the number of effective samples
according to
.. math::
N_{eff} &= \bigg(\sum_{i=0}^n w_i^\beta \bigg)^{\frac{1}{1-\beta}}
w_i &= \frac{w_i}{\sum_j w_j}
Beta can take any positive value. Larger beta corresponds to a greater
compression such that:
.. math::
\beta_1 < \beta_2 \Rightarrow N_{eff}(\beta_1) > N_{eff}(\beta_2)
Alternatively, beta can take one of the following strings as input:
* If 'inf' or 'equal' is supplied (equivalent to beta=inf), then the
resulting number of samples is the number of samples when compressed
to equal weights, and given by:
.. math::
w_i &= \frac{w_i}{\sum_j w_j}
N_{eff} &= \frac{1}{\max_i[w_i]}
* If 'entropy' is supplied (equivalent to beta=1), then the estimate
is determined via the entropy based calculation, also referred to as
the channel capacity:
.. math::
H &= -\sum_i p_i \ln p_i
p_i &= \frac{w_i}{\sum_j w_j}
N_{eff} &= e^{H}
* If 'kish' is supplied (equivalent to beta=2), then a Kish estimate
is computed (Kish, Leslie (1965). Survey Sampling.
New York: John Wiley & Sons, Inc. ISBN 0-471-10949-5):
.. math::
N_{eff} = \frac{(\sum_i w_i)^2}{\sum_i w_i^2}
* str(float) input gets converted to the corresponding float value.
"""
w = w / np.sum(w)
if beta == np.inf or beta == 'inf' or beta == 'equal':
return 1 / np.max(w)
elif beta == 'entropy' or beta != 'kish' and str(float(beta)) == '1.0':
return np.exp(entropy(w))
else:
if beta == 'kish':
beta = 2
elif isinstance(beta, str):
beta = float(beta)
return np.sum(w**beta)**(1/(1-beta))
[docs]
def compress_weights(w, u=None, ncompress=True):
"""Compresses weights to their approximate channel capacity."""
if u is None:
u = np.random.rand(len(w))
if w is None:
w = np.ones_like(u)
if ncompress is True or isinstance(ncompress, str):
if ncompress is True:
ncompress = 'entropy'
ncompress = neff(w, beta=ncompress)
elif ncompress is False:
return w
# TODO: remove this in version >= 2.1
if ncompress < 0:
raise ValueError(
"ncompress<0 is anesthetic 1.0 behaviour. For equally weighted "
"samples you should now use ncompress='inf' or ncompress='equal'."
)
W = w * ncompress / w.sum()
fraction, integer = np.modf(W)
integer = integer.astype(int)
if is_int(ncompress):
remainder = ncompress - integer.sum()
mask = fraction > 0
race_time = np.full_like(fraction, np.inf) # exp-race arrival time
race_time[mask] = -np.log(u[mask])/fraction[mask]
idx = np.argpartition(race_time, remainder-1)[:remainder]
extra = np.bincount(idx, minlength=len(integer))
else:
extra = (u < fraction).astype(int)
return integer + extra
[docs]
def quantile(a, q, w=None, interpolation='linear'):
"""Compute the weighted quantile for a one dimensional array."""
if w is None:
w = np.ones_like(a)
a = np.array(list(a)) # Necessary to convert pandas arrays
w = np.array(list(w)) # Necessary to convert pandas arrays
i = np.argsort(a)
c = np.cumsum(w[i[1:]]+w[i[:-1]])
c = c / c[-1]
c = np.concatenate(([0.], c))
icdf = interp1d(c, a[i], kind=interpolation)
quant = icdf(q)
if isinstance(q, float):
quant = float(quant)
return quant
[docs]
def var_unbiased(a, w, axis=0, ddof=1):
"""Compute the unbiased variance from weighted samples.
Uses the standard reliability-weight correction
var = s2 / (v1 - v2/v1) (for ddof=1),
and supports the frequency-weight case by using
var = s2 / (v1 - ddof) when w is integer and v1 > 1.
Parameters
----------
a : np.array
Input samples.
w : np.array
Associated sample weights.
Integer -> frequency weights;
Float -> reliability weights.
axis : int
Axis along which to compute variance.
ddof : int, default=1
Delta degrees of freedom.
Returns
-------
var : float, np.array
Unbiased variance.
"""
mu = np.ma.filled(np.average(a, weights=w, axis=axis), np.nan)
v1 = np.ma.filled(w.sum(axis=axis), 0.0)
if np.issubdtype(w.dtype, np.integer):
v2 = v1
else:
# ---- reliability weights branch ----
v2 = np.ma.filled((w ** 2).sum(axis=axis), 0.0)
s2 = np.ma.filled((w * (a - mu)**2).sum(axis=axis), 0.0)
invalid = (v1 == 0) | (v1**2 - ddof*v2 == 0) | np.isnan(mu) | np.isnan(s2)
nans = np.full_like(v1, np.nan, dtype=float)
var = np.divide(v1 * s2, v1**2 - ddof*v2, out=nans.copy(), where=~invalid)
return var if np.ndim(var) > 0 else np.float64(var)
[docs]
def cov_unbiased(a, w, ddof=1, return_corr=False):
"""Compute the unbiased covariance from weighted samples.
Parameters
----------
a : np.array, shape (n_samples, n_features)
Input samples in rows, features/parameters in columns.
w : np.array, shape (n_samples,)
Associated sample weights.
Integer -> frequency weights;
Float -> reliability weights.
ddof : int, default=1
Delta degrees of freedom.
Returns
-------
cov : ndarray, shape (n_features, n_features)
Unbiased covariance matrix.
"""
a = np.atleast_2d(a)
# If all weights are zero, mimic pandas' "all NaN" behaviour upstream.
if w.sum() == 0:
return np.full((a.shape[1], a.shape[1]), np.nan, dtype=float)
# ---- fast path: no NaNs anywhere ----
if not return_corr:
if np.isfinite(a).all() and a.shape[0] >= 2 and a.shape[1] >= 2:
if np.issubdtype(w.dtype, np.integer):
return np.cov(a, rowvar=False, fweights=w, ddof=ddof)
else:
return np.cov(a, rowvar=False, aweights=w, ddof=ddof)
M = np.isfinite(a) # shape (n, p)
A = np.where(M, a, 0.0) # shape (n, p)
WA = (w[:, None] * A).T # shape (n, p)
V1 = (w[:, None] * M).T @ M # shape (p, p)
S1 = WA @ M # shape (p, p)
S2 = WA @ A # shape (p, p)
if np.issubdtype(w.dtype, np.integer):
# ---- frequency weights branch ----
V2 = V1
else:
# ---- reliability weights branch ----
V2 = ((w**2)[:, None] * M).T @ M # shape (p, p)
invalid = (V1 == 0) | (V1**2 - ddof * V2 == 0)
cov = np.full_like(S2, np.nan, dtype=float)
cov = np.divide(V1 * S2 - S1.T * S1, V1**2 - ddof * V2,
out=cov.copy(), where=~invalid)
if not return_corr:
return cov
# pairwise variances on the SAME intersections
S2i = (w[:, None] * A * A).T @ M
nans = np.full_like(cov, np.nan, dtype=float)
var_i = np.divide(V1 * S2i - S1**2, V1**2 - 1 * V2,
out=nans.copy(), where=~invalid)
invalid = invalid | (var_i * var_i.T <= 0)
corr = np.divide(cov, np.sqrt(var_i * var_i.T),
out=nans.copy(), where=~invalid)
return corr
[docs]
def skew_unbiased(a, w, axis=0):
"""Compute the unbiased skewness from weighted samples.
Adapted from Lorenzo Rimoldini (2013):
https://arxiv.org/pdf/1304.6564
Parameters
----------
a : np.array
Input samples.
w : np.array
Associated sample weights.
Integer -> frequency weights;
Float -> reliability weights.
axis : int
Axis along which to compute kurtosis.
Returns
-------
skew : float, np.array
Unbiased skewness (`G1`).
"""
mu = np.ma.filled(np.average(a, weights=w, axis=axis), np.nan)
v1 = np.ma.filled(w.sum(axis=axis), 0.0)
v2 = np.ma.filled((w**2).sum(axis=axis), 0.0)
v3 = np.ma.filled((w**3).sum(axis=axis), 0.0)
s2 = np.ma.filled((w * (a - mu)**2).sum(axis=axis), 0.0)
s3 = np.ma.filled((w * (a - mu)**3).sum(axis=axis), 0.0)
nans = np.full_like(v1, np.nan, dtype=float)
# ---- frequency weights branch ----
if np.issubdtype(w.dtype, np.integer):
n = v1
if np.all(n <= 2):
return np.nan
invalid = (n <= 2) | np.isnan(mu) | np.isnan(s2) | np.isnan(s3)
# pandas convention: zero variance (s2==0) => skew = 0
degenerate = (~invalid) & (s2 == 0)
skew = np.divide(n * (n-1)**0.5 * s3, (n-2) * s2**1.5,
out=nans.copy(), where=(~invalid) & (~degenerate))
skew = np.where(degenerate, 0.0, skew)
return skew if np.ndim(skew) > 0 else np.float64(skew)
# ---- reliability weights branch (Rimoldini) ----
if a.shape[axis] <= 2:
return np.nan
invalid = ((v1 == 0) | (v1**2 - v2 == 0) | (v1**3 - 3*v1*v2 + 2*v3 == 0)
| np.isnan(mu) | np.isnan(s2) | np.isnan(s3))
k2 = np.divide(v1 * s2, v1**2 - v2, out=nans.copy(), where=~invalid)
k3 = np.divide(v1**2 * s3, v1**3 - 3*v1*v2 + 2*v3,
out=nans.copy(), where=~invalid)
# pandas convention: zero variance (k2==0) => skew = 0
degenerate = (~invalid) & (k2 == 0)
skew = np.divide(k3, k2**1.5, out=nans.copy(),
where=(~invalid) & (~degenerate))
skew = np.where(degenerate, 0.0, skew)
return skew if np.ndim(skew) > 0 else np.float64(skew)
[docs]
def kurt_unbiased(a, w, axis=0):
"""Compute the unbiased kurtosis from weighted samples.
Adapted from Lorenzo Rimoldini (2013):
https://arxiv.org/pdf/1304.6564
Parameters
----------
a : np.array
Input samples.
w : np.array
Associated sample weights.
Integer -> frequency weights;
Float -> reliability weights.
axis : int
Axis along which to compute kurtosis.
Returns
-------
kurt : float, np.array
Unbiased kurtosis (`G2`).
"""
mu = np.ma.filled(np.average(a, weights=w, axis=axis), np.nan)
v1 = np.ma.filled(w.sum(axis=axis), 0.0)
v2 = np.ma.filled((w**2).sum(axis=axis), 0.0)
v3 = np.ma.filled((w**3).sum(axis=axis), 0.0)
v4 = np.ma.filled((w**4).sum(axis=axis), 0.0)
s2 = np.ma.filled((w * (a - mu)**2).sum(axis=axis), 0.0)
s4 = np.ma.filled((w * (a - mu)**4).sum(axis=axis), 0.0)
nans = np.full_like(v1, np.nan, dtype=float)
# ---- frequency weights branch ----
if np.issubdtype(w.dtype, np.integer):
n = v1
if np.all(n <= 3):
return np.nan
invalid = (n <= 3) | np.isnan(mu) | np.isnan(s2) | np.isnan(s4)
# pandas convention: zero variance (s2==0) => kurt = 0
degenerate = (~invalid) & (s2 == 0)
kurt = np.divide((n+1)*n*(n-1)*s4-3*(n-1)**2*s2**2, (n-2)*(n-3)*s2**2,
out=nans.copy(), where=(~invalid) & (~degenerate))
kurt = np.where(degenerate, 0.0, kurt)
return kurt if np.ndim(kurt) > 0 else np.float64(kurt)
# ---- reliability weights branch (Rimoldini) ----
if a.shape[axis] <= 3:
return np.nan
den2 = v1**2-v2
den4 = (v1**2 - v2) * (v1**4 - 6*v1**2*v2 + 8*v1*v3 + 3*v2**2 - 6*v4)
num41 = v1 * (v1**4 - 4*v1*v3 + 3*v2**2)
num42 = 3 * (v1**4 - 2*v1**2*v2 + 4*v1*v3 - 3*v2**2)
invalid = ((v1 == 0) | (den2 == 0) | (den4 == 0)
| np.isnan(mu) | np.isnan(s2) | np.isnan(s4))
k2 = np.divide(v1 * s2, den2, out=nans.copy(), where=~invalid)
k4 = np.divide(num41*s4-num42*s2**2, den4, out=nans.copy(), where=~invalid)
# pandas convention: zero variance (k2==0) => kurt = 0
degenerate = (~invalid) & (k2 == 0)
invalid = invalid | (k2 == 0)
kurt = np.divide(k4, k2**2, out=nans.copy(), where=~invalid)
kurt = np.where(degenerate, 0.0, kurt)
return kurt if np.ndim(kurt) > 0 else np.float64(kurt)
[docs]
def sample_cdf(samples, inverse=False, interpolation='linear'):
"""Sample the empirical cdf for a 1d array."""
samples = np.sort(samples)
ngaps = len(samples)-1
gaps = np.random.dirichlet(np.ones(ngaps))
cdf = np.array([0, *np.cumsum(gaps)])
# The last element should be one (up to floating point errors) because
# dirichlet sums to one. Set exactly to avoid interpolation errors.
cdf[-1] = 1
if inverse:
return interp1d(cdf, samples, kind=interpolation)
else:
return interp1d(samples, cdf, kind=interpolation,
fill_value=(0, 1), bounds_error=False)
[docs]
def credibility_interval(samples, weights=None, level=0.68, method="iso-pdf",
return_covariance=False, nsamples=12):
"""Compute the credibility interval of weighted samples.
Based on linear interpolation of the cumulative density function, thus
expect discretisation errors on the scale of distances between samples.
https://github.com/Stefan-Heimersheim/fastCI#readme
Parameters
----------
samples : array
Samples to compute the credibility interval of.
weights : array, default=np.ones_like(samples)
Weights corresponding to samples.
level : float, default=0.68
Credibility level (probability, <1).
method : str, default='iso-pdf'
Which definition of interval to use:
* ``'iso-pdf'``: Calculate iso probability density interval with the
same probability density at each end. Also known as
waterline-interval or highest average posterior density interval.
This is only accurate if the distribution is sufficiently uni-modal.
* ``'lower-limit'``/``'upper-limit'``: Lower/upper limit. One-sided
limits for which ``level`` fraction of the (equally weighted) samples
lie above/below the limit.
* ``'equal-tailed'``: Equal-tailed interval with the same fraction of
(equally weighted) samples below and above the interval region.
return_covariance: bool, default=False
Return the covariance of the sampled limits, in addition to the mean
nsamples : int, default=12
Number of CDF samples to improve `mean` and `std` estimate.
Returns
-------
limit(s) : float, array, or tuple of floats or arrays
Returns the credibility interval boundari(es). By default,
returns the mean over ``nsamples`` samples, which is either
two numbers (``method='iso-pdf'``/``'equal-tailed'``) or one number
(``method='lower-limit'``/``'upper-limit'``). If
``return_covariance=True``, returns a tuple (mean(s), covariance)
where covariance is the covariance over the sampled limits.
"""
if level >= 1:
raise ValueError('level must be <1, got {0:.2f}'.format(level))
if len(np.shape(samples)) != 1:
raise ValueError('Support only 1D arrays for samples')
if weights is not None and np.shape(samples) != np.shape(weights):
raise ValueError('Shape of samples and weights differs')
# Convert to numpy to unify indexing
samples = np.array(samples.copy())
if weights is None:
weights = np.ones(len(samples))
else:
weights = np.array(weights.copy())
# Convert samples to unit weight not the case
if not np.all(np.logical_or(weights == 0, weights == 1)):
# compress_weights with ncompress='equal' assures weights \in 0,1
# Note that this must be done, we cannot handle weights != 1
# see this discussion for details:
# https://github.com/williamjameshandley/anesthetic/pull/188#issuecomment-1274980982
weights = compress_weights(weights, ncompress='equal')
indices = np.where(weights)[0]
x = samples[indices]
# Sample the confidence interval multiple times
# to get errorbars on confidence interval boundaries
ci_samples = []
for i in range(nsamples):
invCDF = sample_cdf(x, inverse=True)
if method == 'iso-pdf':
# Find smallest interval
def distance(Y, level=level):
return invCDF(Y+level)-invCDF(Y)
res = minimize_scalar(distance, bounds=(0, 1-level),
method="Bounded")
ci_samples.append(np.array([invCDF(res.x),
invCDF(res.x+level)]))
elif method == 'lower-limit':
# Get value from which we reach the desired level
ci_samples.append(invCDF(1-level))
elif method == 'upper-limit':
# Get value to which we reach the desired level
ci_samples.append(invCDF(level))
elif method == 'equal-tailed':
ci_samples.append(np.array([invCDF((1-level)/2),
invCDF((1+level)/2)]))
else:
raise ValueError("Method '{0:}' unknown".format(method))
ci_samples = np.array(ci_samples)
if np.shape(ci_samples) == (nsamples, ):
if return_covariance:
return np.mean(ci_samples), np.cov(ci_samples)
else:
return np.mean(ci_samples)
else:
if return_covariance:
return np.mean(ci_samples, axis=0), \
np.cov(ci_samples, rowvar=False)
else:
return np.mean(ci_samples, axis=0)
[docs]
def mirror_1d(d, xmin=None, xmax=None):
"""If necessary apply reflecting boundary conditions."""
if xmin is not None and xmax is not None:
xmed = (xmin+xmax)/2
return np.concatenate((2*xmin-d[d < xmed], d, 2*xmax-d[d >= xmed]))
elif xmin is not None:
return np.concatenate((2*xmin-d, d))
elif xmax is not None:
return np.concatenate((d, 2*xmax-d))
else:
return d
[docs]
def mirror_2d(d_x_, d_y_, xmin=None, xmax=None, ymin=None, ymax=None):
"""If necessary apply reflecting boundary conditions."""
d_x = d_x_.copy()
d_y = d_y_.copy()
if xmin is not None and xmax is not None:
xmed = (xmin+xmax)/2
d_y = np.concatenate((d_y[d_x < xmed], d_y, d_y[d_x >= xmed]))
d_x = np.concatenate((2*xmin-d_x[d_x < xmed], d_x,
2*xmax-d_x[d_x >= xmed]))
elif xmin is not None:
d_y = np.concatenate((d_y, d_y))
d_x = np.concatenate((2*xmin-d_x, d_x))
elif xmax is not None:
d_y = np.concatenate((d_y, d_y))
d_x = np.concatenate((d_x, 2*xmax-d_x))
if ymin is not None and ymax is not None:
ymed = (ymin+ymax)/2
d_x = np.concatenate((d_x[d_y < ymed], d_x, d_x[d_y >= ymed]))
d_y = np.concatenate((2*ymin-d_y[d_y < ymed], d_y,
2*ymax-d_y[d_y >= ymed]))
elif ymin is not None:
d_x = np.concatenate((d_x, d_x))
d_y = np.concatenate((2*ymin-d_y, d_y))
elif ymax is not None:
d_x = np.concatenate((d_x, d_x))
d_y = np.concatenate((d_y, 2*ymax-d_y))
return d_x, d_y
[docs]
def nest_level(lst):
"""Calculate the nesting level of a list."""
if not isinstance(lst, list):
return 0
if not lst:
return 1
return max(nest_level(item) for item in lst) + 1
[docs]
def histogram(a, **kwargs):
"""Produce a histogram for path-based plotting.
This is a cheap histogram. Necessary if one wants to update the histogram
dynamically, and redrawing and filling is very expensive.
This has the same arguments and keywords as :func:`numpy.histogram`, but is
normalised to 1.
"""
hist, bin_edges = np.histogram(a, **kwargs)
xpath, ypath = np.empty((2, 4*len(hist)))
ypath[0::4] = ypath[3::4] = 0
ypath[1::4] = ypath[2::4] = hist
xpath[0::4] = xpath[1::4] = bin_edges[:-1]
xpath[2::4] = xpath[3::4] = bin_edges[1:]
mx = max(ypath)
if mx:
ypath /= max(ypath)
return xpath, ypath
[docs]
def histogram_bin_edges(samples, weights, bins='fd', range=None, beta='equal'):
"""Compute a good number of bins dynamically from weighted samples.
Parameters
----------
samples : array_like
Input data.
weights : array-like
Array of sample weights.
bins : str, default='fd'
String defining the rule used to automatically compute a good number
of bins for the weighted samples:
* 'fd' : Freedman--Diaconis rule (modified for weighted data)
* 'scott' : Scott's rule (modified for weighted data)
* 'sqrt' : Square root estimator (modified for weighted data)
range : (float, float), optional
The lower and upper range of the bins. If not provided, range is simply
``(a.min(), a.max())``. Values outside the range are ignored. The first
element of the range must be less than or equal to the second.
beta : float, default='equal'
The value of beta>0 used to calculate the number of effective
samples via :func:`neff`.
Returns
-------
bin_edges : array of dtype float
The edges to pass to :func:`numpy.histogram`.
"""
if weights is None:
weights = np.ones_like(samples)
if range is None:
minimum = np.min(samples)
maximum = np.max(samples)
data_range = maximum - minimum
else:
minimum = range[0]
maximum = range[1]
data_range = maximum - minimum
data_mask = (samples >= minimum) & (samples <= maximum)
samples = samples[data_mask]
weights = weights[data_mask]
w = weights / np.sum(weights)
N_eff = neff(w=w, beta=beta)
if bins == 'fd': # Freedman--Diaconis rule
q1, q3 = quantile(samples, [1/4, 3/4], w=w)
bin_width = 2 * (q3 - q1) * N_eff**(-1/3)
elif bins == 'scott': # Scott's rule
weighted_mean = np.average(samples, weights=w)
weighted_var = np.average((samples - weighted_mean)**2, weights=w)
weighted_std = np.sqrt(weighted_var)
bin_width = (24 * np.pi**0.5 / N_eff)**(1/3) * weighted_std
elif bins == 'sqrt': # Square root estimator
samples_eff, _ = sample_compression_1d(samples, w=w, ncompress=N_eff)
data_range_eff = np.max(samples_eff) - np.min(samples_eff)
bin_width = data_range_eff / np.sqrt(N_eff)
nbins = int(np.ceil(data_range / bin_width))
bin_edges = np.linspace(minimum, maximum, nbins+1)
return bin_edges
[docs]
def compute_nlive(death, birth):
"""Compute number of live points from birth and death contours.
Parameters
----------
death, birth : array-like
list of birth and death contours
Returns
-------
nlive : np.array
number of live points at each contour
"""
b = pandas.DataFrame(np.array(birth), columns=['logL'])
d = pandas.DataFrame(np.array(death), columns=['logL'],
index=b.index + len(b))
b['n'] = +1
d['n'] = -1
t = pandas.concat([b, d]).sort_values(['logL', 'n'], na_position='first')
n = t.n.cumsum()
return (n[d.index]+1).to_numpy()
[docs]
def compute_insertion_indexes(death, birth):
"""Compute the live point insertion index for each point.
For more detail, see `Fowlie et al. (2020)
<https://arxiv.org/abs/2006.03371>`_
Parameters
----------
death, birth : array-like
list of birth and death contours
Returns
-------
indexes : np.array
live point index at which each live point was inserted
"""
indexes = np.zeros_like(birth, dtype=int)
for i, (b, d) in enumerate(zip(birth, death)):
i_live = (death > b) & (birth <= b)
live = death[i_live]
live.sort()
indexes[i] = np.searchsorted(live, d)
return indexes
[docs]
def unique(a):
"""Find unique elements, retaining order."""
b = []
for x in a:
if x not in b:
b.append(x)
return b
[docs]
def iso_probability_contours(pdf, contours=[0.95, 0.68]):
"""Compute the iso-probability contour values."""
if len(contours) > 1 and not np.all(contours[:-1] > contours[1:]):
raise ValueError(
"The kwargs `levels` and `contours` have to be ordered from "
"outermost to innermost contour, i.e. in strictly descending "
"order when referring to the enclosed probability mass, e.g. "
"like the default [0.95, 0.68]. "
"This breaking change in behaviour was introduced in version "
"2.0.0-beta.10, in order to better match the ordering of other "
"matplotlib kwargs."
)
contours = [1-p for p in contours]
p = np.sort(np.array(pdf).flatten())
m = np.cumsum(p)
m /= m[-1]
interp = interp1d([0]+list(m), [0]+list(p))
c = list(interp(contours))
return c
[docs]
def iso_probability_contours_from_samples(pdf, contours=[0.95, 0.68],
weights=None):
"""Compute the iso-probability contour values."""
if len(contours) > 1 and not np.all(contours[:-1] > contours[1:]):
raise ValueError(
"The kwargs `levels` and `contours` have to be ordered from "
"outermost to innermost contour, i.e. in strictly descending "
"order when referring to the enclosed probability mass, e.g. "
"like the default [0.95, 0.68]. "
"This breaking change in behaviour was introduced in version "
"2.0.0-beta.10, in order to better match the ordering of other "
"matplotlib kwargs."
)
if weights is None:
weights = np.ones_like(pdf)
contours = [1-p for p in contours]
i = np.argsort(pdf)
m = np.cumsum(weights[i])
m /= m[-1]
interp = interp1d([0]+list(m), [0]+list(pdf[i]))
c = list(interp(contours))
return c
[docs]
def scaled_triangulation(x, y, cov):
"""Triangulation scaled by a covariance matrix.
Parameters
----------
x, y : array-like
x and y coordinates of samples
cov : array-like, 2d
Covariance matrix for scaling
Returns
-------
:class:`matplotlib.tri.Triangulation`
Triangulation with the appropriate scaling
"""
L = np.linalg.cholesky(cov)
Linv = np.linalg.inv(L)
x_, y_ = Linv.dot([x, y])
tri = Triangulation(x_, y_)
return Triangulation(x, y, tri.triangles)
[docs]
def triangular_sample_compression_2d(x, y, cov, w=None, n=1000):
"""Histogram a 2D set of weighted samples via triangulation.
This defines bins via a triangulation of the subsamples and sums weights
within triangles surrounding each point
Parameters
----------
x, y : array-like
x and y coordinates of samples for compressing
cov : array-like, 2d
Covariance matrix for scaling
w : :class:`pandas.Series`, optional
weights of samples
n : int, default=1000
number of samples returned.
Returns
-------
tri :
:class:`matplotlib.tri.Triangulation` with an appropriate scaling
w : array-like
Compressed samples and weights
"""
# Pre-process samples to not be affected by non-standard indexing
# Details: https://github.com/handley-lab/anesthetic/issues/189
x = np.array(x)
y = np.array(y)
x = pandas.Series(x)
if w is None:
w = pandas.Series(index=x.index, data=np.ones_like(x))
if n is False:
n = len(x)
elif n is True or isinstance(n, str):
if n is True:
n = 'entropy'
n = int(neff(w, beta=n))
# Select samples for triangulation
if (w != 0).sum() <= n:
i = x.index
else:
i = np.random.choice(x.index, size=n, replace=False, p=w/w.sum())
# Generate triangulation
tri = scaled_triangulation(x[i], y[i], cov)
# For each point find corresponding triangles
trifinder = tri.get_trifinder()
j = trifinder(x, y)
k = tri.triangles[j[j != -1]]
# Compute mass in each triangle, and add it to each corner
w_ = np.zeros(len(i))
for i in range(3):
np.add.at(w_, k[:, i], w[j != -1]/3)
return tri, w_
[docs]
def sample_compression_1d(x, w=None, ncompress=True):
"""Histogram a 1D set of weighted samples via subsampling.
This compresses the number of samples, combining weights.
Parameters
----------
x : array-like
x coordinate of samples for compressing
w : :class:`pandas.Series`, optional
weights of samples
ncompress : int, default=True
Degree of compression.
* If ``int``: number of samples returned.
* If ``True``: compresses to the channel capacity
(same as ``ncompress='entropy'``).
* If ``False``: no compression.
* If ``str``: determine number from the Huggins-Roy family of effective
samples in :func:`neff` with ``beta=ncompress``.
Returns
-------
x, w: array-like
Compressed samples and weights
"""
if ncompress is False:
return x, w
elif ncompress is True or isinstance(ncompress, str):
if ncompress is True:
ncompress = 'entropy'
ncompress = neff(w, beta=ncompress)
x = np.array(x)
if w is None:
w = np.ones_like(x)
w = np.array(w)
# Select inner samples for triangulation
if len(x) > ncompress:
x_ = np.random.choice(x, size=int(ncompress), replace=False)
else:
x_ = x.copy()
x_.sort()
# Compress mass onto these subsamples
centers = (x_[1:] + x_[:-1])/2
j = np.digitize(x, centers)
w_ = np.zeros_like(x_)
np.add.at(w_, j, w)
return x_, w_
[docs]
def is_int(x):
"""Test whether x is an integer."""
return isinstance(x, int) or isinstance(x, np.integer)
[docs]
def match_contour_to_contourf(contours, vmin, vmax):
"""Get needed `vmin, vmax` to match `contour` colors to `contourf` colors.
`contourf` uses the arithmetic mean of contour levels to assign colors,
whereas `contour` uses the contour level directly. To get the same colors
for `contour` lines as for `contourf` faces, we need some fiddly algebra.
"""
if len(contours) <= 2:
vmin = 2 * vmin - vmax
return vmin, vmax
else:
c0 = contours[0]
c1 = contours[1]
ce = contours[-2]
denom = vmax + ce - c1 - c0
vmin = +(c0 * vmax - c1 * ce + 2 * vmin * (ce - c0)) / denom
vmax = -(c0 * vmax + c1 * ce - 2 * vmax * ce) / denom
return vmin, vmax
[docs]
def insertion_p_value(indexes, nlive, batch=0):
"""Compute the p-value from insertion indexes, assuming constant nlive.
Note that this function doesn't use :func:`scipy.stats.kstest` as the
latter assumes continuous distributions.
For more detail, see `Fowlie et al. (2020)
<https://arxiv.org/abs/2006.03371>`_
For a rolling test, you should provide the optional parameter ``batch!=0``.
In this case the test computes the p-value on consecutive batches of size
``nlive * batch``, selects the smallest one and adjusts for multiple
comparisons using a Bonferroni correction.
Parameters
----------
indexes : array-like
list of insertion indexes, sorted by death contour
nlive : int
number of live points
batch : float
batch size in units of nlive for a rolling p-value
Returns
-------
ks_result : dict
Kolmogorov-Smirnov test results:
* ``D``: Kolmogorov-Smirnov statistic
* ``sample_size``: sample size
* ``p-value``: p-value
if ``batch != 0``:
* ``iterations``: bounds of batch with minimum p-value
* ``nbatches``: the number of batches in total
* ``uncorrected p-value``: p-value without Bonferroni correction
"""
if batch == 0:
bins = np.arange(-0.5, nlive + 0.5, 1.)
empirical_pmf = np.histogram(
np.array(indexes),
bins=bins,
density=True,
)[0]
empirical_cmf = np.cumsum(empirical_pmf)
uniform_cmf = np.arange(1., nlive + 1., 1.) / nlive
D = abs(empirical_cmf - uniform_cmf).max()
sample_size = len(indexes)
K = D * np.sqrt(sample_size)
ks_result = {}
ks_result["D"] = D
ks_result["sample_size"] = sample_size
ks_result["p-value"] = kstwobign.sf(K)
return ks_result
else:
batch = int(batch * nlive)
batches = [
np.array(indexes)[i:i + batch]
for i in range(0, len(indexes), batch)
]
ks_results = [insertion_p_value(c, nlive) for c in batches]
ks_result = min(ks_results, key=lambda t: t["p-value"])
index = ks_results.index(ks_result)
ks_result["iterations"] = (index * batch, (index + 1) * batch)
ks_result["nbatches"] = n = len(batches)
ks_result["uncorrected p-value"] = p = ks_result["p-value"]
ks_result["p-value"] = 1. - (1. - p)**n if p != 1 else p * n
return ks_result
[docs]
@contextlib.contextmanager
def temporary_seed(seed):
"""Context for temporarily setting a numpy seed."""
state = np.random.get_state()
np.random.seed(seed)
try:
yield
finally:
np.random.set_state(state)
[docs]
def adjust_docstrings(obj, pattern, repl, *args, **kwargs):
"""Adjust the docstrings of a class using regular expressions.
After the first argument, the remaining arguments are identical to re.sub.
Parameters
----------
cls : class
class to adjust
pattern : str
regular expression pattern
repl : str
replacement string
"""
if inspect.isclass(obj):
for key, val in obj.__dict__.items():
doc = inspect.getdoc(val)
if doc is not None:
i = doc.find("See Also")
j = doc.find("Examples")
if i != -1:
doc = doc[:i] if j == -1 else doc[:i] + doc[j:]
newdoc = re.sub(pattern, repl, doc, *args, **kwargs)
try:
obj.__dict__[key].__doc__ = newdoc
except AttributeError:
pass
else:
doc = inspect.getdoc(obj)
if doc is not None:
i = doc.find("See Also")
j = doc.find("Examples")
if i != -1:
doc = doc[:i] if j == -1 else doc[:i] + doc[j:]
newdoc = re.sub(pattern, repl, doc, *args, **kwargs)
obj.__doc__ = newdoc