Source code for anesthetic.samples

"""Main classes for the anesthetic module.

- :class:`anesthetic.samples.Samples`
- :class:`anesthetic.samples.MCMCSamples`
- :class:`anesthetic.samples.NestedSamples`
"""
import numpy as np
import scipy
import pandas
import copy
import warnings
from pandas import MultiIndex, Series
from collections.abc import Sequence
from anesthetic.utils import (compute_nlive, compute_insertion_indexes,
                              is_int, logsumexp)
from anesthetic.gui.plot import RunPlotter
from anesthetic.weighted_labelled_pandas import WeightedLabelledDataFrame
from anesthetic.plot import (make_1d_axes, make_2d_axes,
                             AxesSeries, AxesDataFrame)
from anesthetic.utils import adjust_docstrings


class Samples(WeightedLabelledDataFrame):
    """Storage and plotting tools for general samples.

    Extends the :class:`pandas.DataFrame` by providing plotting methods and
    standardising sample storage.

    Example plotting commands include

    - ``samples.plot_1d(['paramA', 'paramB'])``
    - ``samples.plot_2d(['paramA', 'paramB'])``
    - ``samples.plot_2d([['paramA', 'paramB'], ['paramC', 'paramD']])``

    Parameters
    ----------
    data : np.array
        Coordinates of samples. shape = (nsamples, ndims).

    columns : list(str)
        reference names of parameters

    weights : np.array
        weights of samples.

    logL : np.array
        loglikelihoods of samples.

    labels : dict or array-like
        mapping from columns to plotting labels

    label : str
        Legend label

    logzero : float, default=-1e30
        The threshold for `log(0)` values assigned to rejected sample points.
        Anything equal or below this value is set to `-np.inf`.
    """

    _metadata = WeightedLabelledDataFrame._metadata + ['label']

    def __init__(self, *args, **kwargs):
        # TODO: remove this in version >= 2.1
        if 'root' in kwargs:
            root = kwargs.pop('root')
            name = self.__class__.__name__
            raise ValueError(
                "As of anesthetic 2.0, root is no longer a keyword "
                "argument.\n"
                "To update your code, replace \n\n"
                ">>> from anesthetic import %s\n"
                ">>> %s(root=%s)\n\nwith\n\n"
                ">>> from anesthetic import read_chains\n"
                ">>> read_chains(%s)" % (name, name, root, root)
            )
        logzero = kwargs.pop('logzero', -1e30)
        logL = kwargs.pop('logL', None)
        if logL is not None:
            logL = np.array(logL)
            logL = np.where(logL <= logzero, -np.inf, logL)
        self.label = kwargs.pop('label', None)
        super().__init__(*args, **kwargs)
        if logL is not None:
            self['logL'] = logL
            if self.islabelled(axis=1):
                self.set_label('logL', r'$\ln\mathcal{L}$')

    @property
    def _constructor(self):
        return Samples
    def plot_1d(self, axes=None, *args, **kwargs):
        """Create an array of 1D plots.

        Parameters
        ----------
        axes : plotting axes, optional
            Can be:

            * list(str) or str
            * :class:`pandas.Series` of :class:`matplotlib.axes.Axes`

            If a :class:`pandas.Series` is provided as an existing set of
            axes, then this is used for creating the plot. Otherwise, a new
            set of axes are created using the list or lists of strings.

            If not provided, then all parameters are plotted. This is
            intended for plotting a sliced array (e.g.
            `samples[['x0', 'x1']].plot_1d()`).

        kind : str, default='kde_1d'
            What kind of plots to produce. Alongside the usual pandas options
            {'hist', 'box', 'kde', 'density'}, anesthetic also provides

            * 'hist_1d': :func:`anesthetic.plot.hist_plot_1d`
            * 'kde_1d': :func:`anesthetic.plot.kde_plot_1d`
            * 'fastkde_1d': :func:`anesthetic.plot.fastkde_plot_1d`

            Warning -- while the other pandas plotting options {'line',
            'bar', 'barh', 'area', 'pie'} are also accessible, these can be
            hard to interpret/expensive for :class:`Samples`,
            :class:`MCMCSamples`, or :class:`NestedSamples`.

        logx : list(str), optional
            Which parameters/columns to plot on a log scale.
            Needs to match if plotting on top of a pre-existing axes.

        label : str, optional
            Legend label added to each axis.

        Returns
        -------
        axes : :class:`pandas.Series` of :class:`matplotlib.axes.Axes`
            Pandas array of axes objects
        """
        # TODO: remove this in version >= 2.1
        if 'plot_type' in kwargs:
            raise ValueError(
                "You are using the anesthetic 1.0 kwarg 'plot_type' instead "
                "of anesthetic 2.0 'kind'. Please update your code."
            )

        if axes is None:
            axes = self.drop_labels().columns

        if not isinstance(axes, AxesSeries):
            _, axes = make_1d_axes(axes, labels=self.get_labels_map(),
                                   logx=kwargs.pop('logx', None))
            logx = axes._logx
        else:
            logx = kwargs.pop('logx', axes._logx)
            if logx != axes._logx:
                raise ValueError(f"logx does not match the pre-existing "
                                 f"axes: logx={logx}, "
                                 f"axes._logx={axes._logx}")

        kwargs['kind'] = kwargs.get('kind', 'kde_1d')
        kwargs['label'] = kwargs.get('label', self.label)
        # TODO: remove this in version >= 2.1
        if kwargs['kind'] == 'kde':
            warnings.warn(
                "You are using 'kde' as a plot kind. "
                "'kde_1d' is the appropriate keyword for anesthetic. "
                "Your plots may look odd if you use this argument."
            )
        elif kwargs['kind'] == 'hist':
            warnings.warn(
                "You are using 'hist' as a plot kind. "
                "'hist_1d' is the appropriate keyword for anesthetic. "
                "Your plots may look odd if you use this argument."
            )

        for x, ax in axes.items():
            if x in self and kwargs['kind'] is not None:
                xlabel = self.get_label(x)
                if np.isinf(self[x]).any():
                    warnings.warn(f"column {x} has inf values.")
                selfx = self[x].replace([-np.inf, np.inf], np.nan)
                selfx.plot(ax=ax, xlabel=xlabel, logx=x in logx,
                           *args, **kwargs)
                ax.set_xlabel(xlabel)
            else:
                ax.plot([], [])

        return axes
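    # Usage sketch (not part of the library): the chain root and the column
    # names 'x0'/'x1' below are hypothetical stand-ins. Axes are created
    # implicitly from a list of names, and the returned axes can be reused
    # to overlay a second plot kind:
    #
    #     from anesthetic import read_chains
    #     samples = read_chains("chains/root")
    #     axes = samples.plot_1d(['x0', 'x1'])              # kde_1d default
    #     samples.plot_1d(axes, kind='hist_1d', alpha=0.5)  # overlay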
    def plot_2d(self, axes=None, *args, **kwargs):
        """Create an array of 2D plots.

        To avoid interfering with y-axis sharing, one-dimensional plots are
        created on a separate axis, which is monkey-patched onto the argument
        ax as the attribute ax.twin.

        Parameters
        ----------
        axes : plotting axes, optional
            Can be:

            - list(str) if the x and y axes are the same
            - [list(str), list(str)] if the x and y axes are different
            - :class:`pandas.DataFrame` of :class:`matplotlib.axes.Axes`

            If a :class:`pandas.DataFrame` is provided as an existing set of
            axes, then this is used for creating the plot. Otherwise, a new
            set of axes are created using the list or lists of strings.

            If not provided, then all parameters are plotted. This is
            intended for plotting a sliced array (e.g.
            `samples[['x0', 'x1']].plot_2d()`). It is not advisable to plot
            an entire frame, as it is computationally expensive, and liable
            to run into linear algebra errors for degenerate derived
            parameters.

        kind/kinds : dict, optional
            What kinds of plots to produce. Dictionary takes the keys
            'diagonal' for the 1D plots and 'lower' and 'upper' for the 2D
            plots. The options for 'diagonal' are:

            - 'kde_1d': :func:`anesthetic.plot.kde_plot_1d`
            - 'hist_1d': :func:`anesthetic.plot.hist_plot_1d`
            - 'fastkde_1d': :func:`anesthetic.plot.fastkde_plot_1d`
            - 'kde': :meth:`pandas.Series.plot.kde`
            - 'hist': :meth:`pandas.Series.plot.hist`
            - 'box': :meth:`pandas.Series.plot.box`
            - 'density': :meth:`pandas.Series.plot.density`

            The options for 'lower' and 'upper' are:

            - 'kde_2d': :func:`anesthetic.plot.kde_contour_plot_2d`
            - 'hist_2d': :func:`anesthetic.plot.hist_plot_2d`
            - 'scatter_2d': :func:`anesthetic.plot.scatter_plot_2d`
            - 'fastkde_2d': :func:`anesthetic.plot.fastkde_contour_plot_2d`
            - 'kde': :meth:`pandas.DataFrame.plot.kde`
            - 'scatter': :meth:`pandas.DataFrame.plot.scatter`
            - 'hexbin': :meth:`pandas.DataFrame.plot.hexbin`

            There are also a set of shortcuts provided in
            :attr:`plot_2d_default_kinds`:

            - 'kde_1d': 1d kde plots down the diagonal
            - 'kde_2d': 2d kde plots in lower triangle
            - 'kde': 1d & 2d kde plots in lower & diagonal
            - 'hist_1d': 1d histograms down the diagonal
            - 'hist_2d': 2d histograms in lower triangle
            - 'hist': 1d & 2d histograms in lower & diagonal
            - 'scatter_2d': 2d scatter in lower triangle
            - 'scatter': 1d histograms down diagonal
              & 2d scatter in lower triangle

            Feel free to add your own to this list!
            Default:
            {'diagonal': 'kde_1d', 'lower': 'kde_2d', 'upper': 'scatter_2d'}

        diagonal_kwargs, lower_kwargs, upper_kwargs : dict, optional
            kwargs for the diagonal (1D)/lower or upper (2D) plots. This is
            useful when there is a conflict of kwargs for different kinds of
            plots. Note that any kwargs directly passed to plot_2d will
            overwrite any kwarg with the same key passed to <sub>_kwargs.
            Default: {}

        logx, logy : list(str), optional
            Which parameters/columns to plot on a log scale for the x-axis
            and y-axis, respectively. Needs to match if plotting on top of a
            pre-existing axes.

        label : str, optional
            Legend label added to each axis.

        Returns
        -------
        axes : :class:`pandas.DataFrame` of :class:`matplotlib.axes.Axes`
            Pandas array of axes objects
        """
        # TODO: remove this in version >= 2.1
        if 'types' in kwargs:
            raise ValueError(
                "You are using the anesthetic 1.0 kwarg 'types' instead of "
                "anesthetic 2.0 'kind' or 'kinds' (synonyms). "
                "Please update your code."
            )
        kind = kwargs.pop('kind', 'default')
        kind = kwargs.pop('kinds', kind)

        if isinstance(kind, str) and kind in self.plot_2d_default_kinds:
            kind = self.plot_2d_default_kinds.get(kind)
        if (not isinstance(kind, dict) or
                not set(kind.keys()) <= {'lower', 'upper', 'diagonal'}):
            raise ValueError(f"{kind} is not a valid input. `kind`/`kinds` "
                             "must be a dict mapping "
                             "{'lower','diagonal','upper'} to an allowed plot "
                             "(see `help(NestedSamples.plot_2d)`), or one of "
                             "the following string shortcuts: "
                             f"{list(self.plot_2d_default_kinds.keys())}")

        if axes is None:
            axes = self.drop_labels().columns

        if not isinstance(axes, AxesDataFrame):
            _, axes = make_2d_axes(axes, labels=self.get_labels_map(),
                                   upper=('upper' in kind),
                                   lower=('lower' in kind),
                                   diagonal=('diagonal' in kind),
                                   logx=kwargs.pop('logx', None),
                                   logy=kwargs.pop('logy', None))
            logx = axes._logx
            logy = axes._logy
        else:
            logx = kwargs.pop('logx', axes._logx)
            logy = kwargs.pop('logy', axes._logy)
            if logx != axes._logx or logy != axes._logy:
                raise ValueError(f"logx or logy not matching existing axes: "
                                 f"logx={logx}, axes._logx={axes._logx}, "
                                 f"logy={logy}, axes._logy={axes._logy}")

        local_kwargs = {pos: kwargs.pop('%s_kwargs' % pos, {})
                        for pos in ['upper', 'lower', 'diagonal']}
        kwargs['label'] = kwargs.get('label', self.label)

        for pos in local_kwargs:
            local_kwargs[pos].update(kwargs)

        for y, row in axes.iterrows():
            for x, ax in row.items():
                if ax is not None:
                    pos = ax.position
                    lkwargs = local_kwargs.get(pos, {})
                    lkwargs['kind'] = kind.get(pos, None)
                    # TODO: remove this in version >= 2.1
                    if lkwargs['kind'] == 'kde':
                        warnings.warn(
                            "You are using 'kde' as a plot kind. "
                            "'kde_1d' and 'kde_2d' are the appropriate "
                            "keywords for anesthetic. Your plots may look "
                            "odd if you use this argument."
                        )
                    elif lkwargs['kind'] == 'hist':
                        warnings.warn(
                            "You are using 'hist' as a plot kind. "
                            "'hist_1d' and 'hist_2d' are the appropriate "
                            "keywords for anesthetic. Your plots may look "
                            "odd if you use this argument."
                        )
                    if x in self and y in self \
                            and lkwargs['kind'] is not None:
                        xlabel = self.get_label(x)
                        ylabel = self.get_label(y)
                        if np.isinf(self[x]).any():
                            warnings.warn(f"column {x} has inf values.")
                        if x == y:
                            selfx = self[x].replace([-np.inf, np.inf],
                                                    np.nan)
                            selfx.plot(ax=ax.twin, xlabel=xlabel,
                                       logx=x in logx, *args, **lkwargs)
                            ax.set_xlabel(xlabel)
                            ax.set_ylabel(ylabel)
                        else:
                            if np.isinf(self[y]).any():
                                warnings.warn(f"column {y} has inf values.")
                            selfxy = self[[x, y]]
                            selfxy = selfxy.replace([-np.inf, np.inf],
                                                    np.nan)
                            selfxy = selfxy.dropna(axis=0)
                            selfxy.plot(x, y, ax=ax, xlabel=xlabel,
                                        logx=x in logx, logy=y in logy,
                                        ylabel=ylabel, *args, **lkwargs)
                            ax.set_xlabel(xlabel)
                            ax.set_ylabel(ylabel)
                    else:
                        if x == y:
                            ax.twin.plot([], [])
                        else:
                            ax.plot([], [])

        axes._set_logticks()
        return axes
    plot_2d_default_kinds = {
        'default': {'diagonal': 'kde_1d',
                    'lower': 'kde_2d',
                    'upper': 'scatter_2d'},
        'kde': {'diagonal': 'kde_1d', 'lower': 'kde_2d'},
        'kde_1d': {'diagonal': 'kde_1d'},
        'kde_2d': {'lower': 'kde_2d'},
        'fastkde': {'diagonal': 'fastkde_1d', 'lower': 'fastkde_2d'},
        'hist': {'diagonal': 'hist_1d', 'lower': 'hist_2d'},
        'hist_1d': {'diagonal': 'hist_1d'},
        'hist_2d': {'lower': 'hist_2d'},
        'scatter': {'diagonal': 'hist_1d', 'lower': 'scatter_2d'},
        'scatter_2d': {'lower': 'scatter_2d'},
    }
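    # Usage sketch (hypothetical column names): string shortcuts from
    # `plot_2d_default_kinds` and per-position kwargs can be combined, and
    # the returned axes can be reused for overlays:
    #
    #     axes = samples.plot_2d(['x0', 'x1', 'x2'], kinds='default',
    #                            upper_kwargs=dict(alpha=0.3))
    #     samples.plot_2d(axes, kinds={'lower': 'hist_2d'})  # overlay lower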
    def importance_sample(self, logL_new, action='add', inplace=False):
        """Perform importance re-weighting on the log-likelihood.

        Parameters
        ----------
        logL_new : np.array
            New log-likelihood values. Should have the same shape as `logL`.

        action : str, default='add'
            Can be any of {'add', 'replace', 'mask'}.

            * add: Add the new `logL_new` to the current `logL`.
            * replace: Replace the current `logL` with the new `logL_new`.
            * mask: treat `logL_new` as a boolean mask and only keep the
              corresponding (True) samples.

        inplace : bool, default=False
            Indicates whether to modify the existing array, or return a new
            frame with importance sampling applied.

        Returns
        -------
        samples : :class:`Samples`/:class:`MCMCSamples`/:class:`NestedSamples`
            Importance re-weighted samples.
        """
        if inplace:
            samples = self
        else:
            samples = self.copy()

        if action == 'add':
            new_weights = samples.get_weights()
            new_weights *= np.exp(logL_new - logL_new.max())
            samples.set_weights(new_weights, inplace=True)
            samples.logL += logL_new
        elif action == 'replace':
            logL_new2 = logL_new - samples.logL
            new_weights = samples.get_weights()
            new_weights *= np.exp(logL_new2 - logL_new2.max())
            samples.set_weights(new_weights, inplace=True)
            samples.logL = logL_new
        elif action == 'mask':
            samples = samples[logL_new]
        else:
            raise NotImplementedError("`action` needs to be one of "
                                      "{'add', 'replace', 'mask'}, but '%s' "
                                      "was requested." % action)

        if inplace:
            self._update_inplace(samples)
        else:
            return samples.__finalize__(self, "importance_sample")
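    # Usage sketch: `dlogL` below is a hypothetical array of log-likelihood
    # corrections matching len(samples); a boolean array works with 'mask':
    #
    #     updated = samples.importance_sample(dlogL, action='add')
    #     cut = samples.importance_sample(samples['x0'] > 0, action='mask')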
    # TODO: remove this in version >= 2.1
    @property
    def tex(self):  # noqa: disable=D102
        raise NotImplementedError(
            "This is anesthetic 1.0 syntax. You need to update, e.g.\n"
            "samples.tex[label] = tex  # anesthetic 1.0\n"
            "samples.set_label(label, tex)  # anesthetic 2.0\n\n"
            "tex = samples.tex[label]  # anesthetic 1.0\n"
            "tex = samples.get_label(label)  # anesthetic 2.0"
        )
    def to_hdf(self, path_or_buf, key, *args, **kwargs):  # noqa: D102
        import anesthetic.read.hdf
        return anesthetic.read.hdf.to_hdf(path_or_buf, key, self,
                                          *args, **kwargs)
class MCMCSamples(Samples):
    """Storage and plotting tools for MCMC samples.

    Any new functionality specific to MCMC (e.g. convergence criteria etc.)
    should be put here.

    Parameters
    ----------
    data : np.array
        Coordinates of samples. shape = (nsamples, ndims).

    columns : array-like
        reference names of parameters

    weights : np.array
        weights of samples.

    logL : np.array
        loglikelihoods of samples.

    labels : dict or array-like
        mapping from columns to plotting labels

    label : str
        Legend label

    logzero : float, default=-1e30
        The threshold for `log(0)` values assigned to rejected sample points.
        Anything equal or below this value is set to `-np.inf`.
    """

    _metadata = Samples._metadata + ['root']

    @property
    def _constructor(self):
        return MCMCSamples
    def remove_burn_in(self, burn_in, reset_index=False, inplace=False):
        """Remove burn-in samples from each MCMC chain.

        Parameters
        ----------
        burn_in : int or float or array_like
            Fraction or number of samples to remove or keep:

            * ``if 0 < burn_in < 1``: remove first fraction of samples
            * ``elif 1 < burn_in``: remove first number of samples
            * ``elif -1 < burn_in < 0``: keep last fraction of samples
            * ``elif burn_in < -1``: keep last number of samples
            * ``elif type(burn_in)==list``: different burn-in for each chain

        reset_index : bool, default=False
            Whether to reset the index counter to start at zero or not.

        inplace : bool, default=False
            Indicates whether to modify the existing array or return a copy.
        """
        chains = self.groupby(('chain', '$n_\\mathrm{chain}$'), sort=False,
                              group_keys=False)
        nchains = chains.ngroups
        if isinstance(burn_in, (int, float)):
            ndrop = np.full(nchains, burn_in)
        elif isinstance(burn_in, (list, tuple, np.ndarray)) \
                and len(burn_in) == nchains:
            ndrop = np.array(burn_in)
        else:
            raise ValueError("`burn_in` has to be a scalar or an array of "
                             "length matching the number of chains "
                             "`nchains=%d`. However, you provided "
                             "`burn_in=%s`" % (nchains, burn_in))
        if np.all(np.abs(ndrop) < 1):
            nsamples = chains.count().iloc[:, 0].to_numpy()
            ndrop = ndrop * nsamples
        ndrop = ndrop.astype(int)
        data = self.drop(chains.apply(lambda g: g.head(ndrop[g.name-1]),
                                      include_groups=False).index,
                         inplace=inplace)
        if reset_index:
            data = data.reset_index(drop=True, inplace=inplace)
        return data
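    # Usage sketch: remove the first 30% of every chain, or a different
    # number of initial samples per chain (here assuming two chains):
    #
    #     trimmed = samples.remove_burn_in(0.3)
    #     trimmed = samples.remove_burn_in([500, 1000], reset_index=True)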
    def Gelman_Rubin(self, params=None, per_param=False):
        """Gelman--Rubin convergence statistic of multiple MCMC chains.

        Determine the Gelman--Rubin convergence statistic ``R-1`` by
        computing and comparing the within-chain variance and the
        between-chain variance. This follows the routine as outlined in
        `Lewis (2013), section IV.A.
        <https://arxiv.org/abs/1304.4473>`_

        Note that this requires more than one chain. To circumvent this, you
        could overwrite the ``'chain'`` column, splitting the samples into
        two or more sets.

        Parameters
        ----------
        params : list(str)
            List of column names (i.e. parameters) to be included in the
            convergence calculation.
            Default: all parameters (except those parameters that contain
            'prior', 'chi2', or 'logL' in their names)

        per_param : bool or str, default=False
            Whether to return the per-parameter convergence statistic
            ``R-1``.

            * If ``False``: returns only the total convergence statistic.
            * If ``True``: returns the total convergence statistic and the
              per-parameter convergence statistic.
            * If ``'par'``: returns only the per-parameter convergence
              statistic.
            * If ``'cov'``: returns only the per-parameter covariant
              convergence statistic.
            * If ``'all'``: returns the total convergence statistic and the
              per-parameter covariant convergence statistic.

        Returns
        -------
        Rminus1 : float
            Total Gelman--Rubin convergence statistic ``R-1``. The smaller,
            the better converged. Aiming for ``Rminus1~0.01`` should
            normally work well.

        Rminus1_par : :class:`pandas.DataFrame`
            Per-parameter Gelman--Rubin convergence statistic.

        Rminus1_cov : :class:`pandas.DataFrame`
            Per-parameter covariant Gelman--Rubin convergence statistic.
        """
        self.columns.set_names(['params', 'labels'], inplace=True)
        if params is None:
            params = [key for key in self.columns.get_level_values('params')
                      if 'prior' not in key
                      and 'chi2' not in key
                      and 'logL' not in key
                      and 'chain' not in key]
        chains = self[params+['chain']].groupby(
            ('chain', '$n_\\mathrm{chain}$'), sort=False,
        )
        nchains = chains.ngroups

        # Within chain variance ``W``
        # (average variance within each chain):
        W = chains.cov().groupby(level=('params', 'labels'),
                                 sort=False).mean()
        # Between-chain variance ``B``
        # (variance of the chain means):
        B = chains.mean().drop_weights().cov()
        # We don't weight `B` with the effective number of samples (sum of
        # the weights), here, because we want to notice outliers from
        # shorter chains.
        # In order to be conservative, we generally want to underestimate
        # `W` and overestimate `B`, since `W` goes in the denominator and
        # `B` in the numerator of the Gelman--Rubin statistic `Rminus1`.

        try:
            # note: scipy's cholesky returns U, not L
            invU = np.linalg.inv(scipy.linalg.cholesky(W))
        except np.linalg.LinAlgError as e:
            raise np.linalg.LinAlgError(
                "Make sure you do not have linearly dependent parameters, "
                "e.g. having both `As` and `A=1e9*As` causes trouble.") from e
        D = np.linalg.eigvalsh(invU.T @ ((nchains+1)/nchains * B) @ invU)
        # The factor of `(nchains+1)/nchains` accounts for the additional
        # uncertainty from using a finite number of chains.
        Rminus1_tot = np.max(np.abs(D))
        if per_param is False:
            return Rminus1_tot
        Rminus1 = (nchains + 1) / nchains * B / W.drop_weights()
        Rminus1_par = pandas.DataFrame(np.diag(Rminus1), index=B.columns,
                                       columns=['R-1'])
        if per_param is True:
            return Rminus1_tot, Rminus1_par
        if per_param == 'par':
            return Rminus1_par
        Rminus1_cov = pandas.DataFrame(Rminus1, index=B.columns,
                                       columns=W.columns)
        if per_param == 'cov':
            return Rminus1_cov
        return Rminus1_tot, Rminus1_cov
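    # Usage sketch (hypothetical parameter names): the scalar statistic
    # alone, or together with the per-parameter breakdown:
    #
    #     Rminus1 = samples.Gelman_Rubin()
    #     Rminus1, Rminus1_par = samples.Gelman_Rubin(['x0', 'x1'],
    #                                                 per_param=True)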
class NestedSamples(Samples):
    """Storage and plotting tools for Nested Sampling samples.

    We extend the :class:`Samples` class with the additional methods:

    * ``self.live_points(logL)``
    * ``self.set_beta(beta)``
    * ``self.prior()``
    * ``self.posterior_points(beta)``
    * ``self.prior_points()``
    * ``self.stats()``
    * ``self.logZ()``
    * ``self.D_KL()``
    * ``self.d()``
    * ``self.recompute()``
    * ``self.gui()``
    * ``self.importance_sample()``

    Parameters
    ----------
    data : np.array
        Coordinates of samples. shape = (nsamples, ndims).

    columns : list(str)
        reference names of parameters

    logL : np.array
        loglikelihoods of samples.

    logL_birth : np.array or int
        birth loglikelihoods, or number of live points.

    labels : dict
        optional mapping from column names to plot labels

    label : str
        Legend label
        default: basename of root

    beta : float
        thermodynamic inverse temperature
        default: 1.

    logzero : float
        The threshold for `log(0)` values assigned to rejected sample points.
        Anything equal or below this value is set to `-np.inf`.
        default: -1e30
    """

    _metadata = Samples._metadata + ['root', '_beta']

    def __init__(self, *args, **kwargs):
        logzero = kwargs.pop('logzero', -1e30)
        self._beta = kwargs.pop('beta', 1.)
        logL_birth = kwargs.pop('logL_birth', None)
        if not isinstance(logL_birth, int) and logL_birth is not None:
            logL_birth = np.array(logL_birth)
            logL_birth = np.where(logL_birth <= logzero, -np.inf,
                                  logL_birth)

        super().__init__(logzero=logzero, *args, **kwargs)
        if logL_birth is not None:
            self.recompute(logL_birth, inplace=True)

    @property
    def _constructor(self):
        return NestedSamples

    def _compute_insertion_indexes(self):
        logL = self.logL.to_numpy()
        logL_birth = self.logL_birth.to_numpy()
        self['insertion'] = compute_insertion_indexes(logL, logL_birth)

    @property
    def beta(self):
        """Thermodynamic inverse temperature."""
        return self._beta

    @beta.setter
    def beta(self, beta):
        self._beta = beta
        logw = self.logw(beta=beta)
        self.set_weights(np.exp(logw - logw.max()), inplace=True)
    def set_beta(self, beta, inplace=False):
        """Change the inverse temperature.

        Parameters
        ----------
        beta : float
            Inverse temperature to set. (``beta=0`` corresponds to the prior
            distribution.)

        inplace : bool, default=False
            Indicates whether to modify the existing array, or return a copy
            with the inverse temperature changed.
        """
        if inplace:
            self.beta = beta
        else:
            data = self.copy()
            data.beta = beta
            return data
    def prior(self, inplace=False):
        """Re-weight samples at infinite temperature to get prior samples."""
        return self.set_beta(beta=0, inplace=inplace)
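    # Usage sketch: `set_beta` returns a re-weighted copy by default, and
    # `prior()` is shorthand for `set_beta(0)`:
    #
    #     prior_samples = samples.prior()
    #     tempered = samples.set_beta(0.5)
    #     samples.set_beta(2, inplace=True)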
    # TODO: remove this in version >= 2.1
    def ns_output(self, *args, **kwargs):  # noqa: disable=D102
        raise NotImplementedError(
            "This is anesthetic 1.0 syntax. You need to update, e.g.\n"
            "samples.ns_output(1000)  # anesthetic 1.0\n"
            "samples.stats(1000)  # anesthetic 2.0\n\n"
            "Check out the new temperature functionality: "
            "help(samples.stats), as well as average loglikelihoods: "
            "help(samples.logL_P)"
        )
    def stats(self, nsamples=None, beta=None):
        r"""Compute Nested Sampling statistics.

        Using nested sampling we can compute:

        - ``logZ``: Bayesian evidence

          .. math::
              \log Z = \int L \pi d\theta

        - ``D_KL``: Kullback--Leibler divergence

          .. math::
              D_{KL} = \int P \log(P / \pi) d\theta

        - ``logL_P``: posterior averaged log-likelihood

          .. math::
              \langle\log L\rangle_P = \int P \log L d\theta

        - ``d_G``: Gaussian model dimensionality
          (or posterior variance of the log-likelihood)

          .. math::
              d_G/2 = \langle(\log L)^2\rangle_P - \langle\log L\rangle_P^2

          see `Handley and Lemos (2019)
          <https://arxiv.org/abs/1903.06682>`_ for more details on model
          dimensionalities.

        (Note that all of these are available as individual functions with
        the same signature.)

        In addition to point estimates nested sampling provides an error bar
        or more generally samples from a (correlated) distribution over the
        variables. Samples from this distribution can be computed by
        providing an integer nsamples.

        Nested sampling as an athermal algorithm is also capable of
        producing these as a function of inverse thermodynamic temperature
        beta. This is provided as a vectorised function. If nsamples is also
        provided a MultiIndex dataframe is generated.

        These obey Occam's razor equation:

        .. math::
            \log Z = \langle\log L\rangle_P - D_{KL},

        which splits a model's quality ``logZ`` into a goodness-of-fit
        ``logL_P`` and a complexity penalty ``D_KL``. See `Hergt et al.
        (2021) <https://arxiv.org/abs/2102.11511>`_ for more detail.

        Parameters
        ----------
        nsamples : int, optional
            - If nsamples is not supplied, calculate mean value
            - If nsamples is integer, draw nsamples from the distribution of
              values inferred by nested sampling

        beta : float, array-like, optional
            inverse temperature(s) beta=1/kT. Default self.beta

        Returns
        -------
        if beta is scalar and nsamples is None:
            Series, index ['logZ', 'd_G', 'D_KL', 'logL_P']
        elif beta is scalar and nsamples is int:
            :class:`Samples`, index range(nsamples),
            columns ['logZ', 'd_G', 'D_KL', 'logL_P']
        elif beta is array-like and nsamples is None:
            :class:`Samples`, index beta,
            columns ['logZ', 'd_G', 'D_KL', 'logL_P']
        elif beta is array-like and nsamples is int:
            :class:`Samples`, index :class:`pandas.MultiIndex` the product
            of beta and range(nsamples),
            columns ['logZ', 'd_G', 'D_KL', 'logL_P']
        """
        logw = self.logw(nsamples, beta)
        if nsamples is None and beta is None:
            samples = self._constructor_sliced(index=self.columns[:0],
                                               dtype=float)
        else:
            samples = Samples(index=logw.columns, columns=self.columns[:0])
        samples['logZ'] = self.logZ(logw)
        samples.set_label('logZ', r'$\ln\mathcal{Z}$')

        w = np.exp(logw-samples['logZ'])
        betalogL = self._betalogL(beta)
        S = (logw*0).add(betalogL, axis=0) - samples.logZ

        samples['D_KL'] = (S*w).sum()
        samples.set_label('D_KL', r'$\mathcal{D}_\mathrm{KL}$')

        samples['logL_P'] = samples['logZ'] + samples['D_KL']
        samples.set_label('logL_P',
                          r'$\langle\ln\mathcal{L}\rangle_\mathcal{P}$')

        samples['d_G'] = ((S-samples.D_KL)**2*w).sum()*2
        samples.set_label('d_G', r'$d_\mathrm{G}$')

        samples.label = self.label
        return samples
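    # Usage sketch: mean values, or 1000 draws from the inferred
    # distribution of (logZ, D_KL, logL_P, d_G), e.g. for error bars:
    #
    #     point = samples.stats()
    #     draws = samples.stats(nsamples=1000)
    #     logZ_err = draws.logZ.std()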
    def logX(self, nsamples=None):
        """Log-Volume.

        The log of the prior volume contained within each iso-likelihood
        contour.

        Parameters
        ----------
        nsamples : int, optional
            - If nsamples is not supplied, calculate mean value
            - If nsamples is integer, draw nsamples from the distribution of
              values inferred by nested sampling

        Returns
        -------
        if nsamples is None:
            WeightedSeries like self
        elif nsamples is int:
            WeightedDataFrame like self, columns range(nsamples)
        """
        if nsamples is None:
            t = np.log(self.nlive/(self.nlive+1))
        else:
            r = np.log(np.random.rand(len(self), nsamples))
            w = self.get_weights()
            r = self.nlive._constructor_expanddim(r, self.index, weights=w)
            t = r.divide(self.nlive, axis=0)
            t.columns.name = 'samples'
        logX = t.cumsum()
        logX.name = 'logX'
        return logX
    # TODO: remove this in version >= 2.1
    def dlogX(self, nsamples=None):  # noqa: disable=D102
        raise NotImplementedError(
            "This is anesthetic 1.0 syntax. You should instead use logdX."
        )
    def logdX(self, nsamples=None):
        """Compute volume of shell of loglikelihood.

        Parameters
        ----------
        nsamples : int, optional
            - If nsamples is not supplied, calculate mean value
            - If nsamples is integer, draw nsamples from the distribution of
              values inferred by nested sampling

        Returns
        -------
        if nsamples is None:
            WeightedSeries like self
        elif nsamples is int:
            WeightedDataFrame like self, columns range(nsamples)
        """
        logX = self.logX(nsamples)
        logXp = logX.shift(1, fill_value=0)
        logXm = logX.shift(-1, fill_value=-np.inf)
        logdX = np.log(1 - np.exp(logXm-logXp)) + logXp - np.log(2)
        logdX.name = 'logdX'
        return logdX
    def _betalogL(self, beta=None):
        """Log(L**beta) convenience function.

        Parameters
        ----------
        beta : scalar or array-like, optional
            inverse temperature(s) beta=1/kT. Default self.beta

        Returns
        -------
        if beta is scalar:
            WeightedSeries like self
        elif beta is array-like:
            WeightedDataFrame like self, columns of beta
        """
        if beta is None:
            beta = self.beta
        logL = self.logL
        if np.isscalar(beta):
            betalogL = beta * logL
            betalogL.name = 'betalogL'
        else:
            betalogL = logL._constructor_expanddim(np.outer(self.logL, beta),
                                                   self.index, columns=beta)
            betalogL.columns.name = 'beta'
        return betalogL
    def logw(self, nsamples=None, beta=None):
        """Log-nested sampling weight.

        The logarithm of the (unnormalised) sampling weight log(L**beta*dX).

        Parameters
        ----------
        nsamples : int, optional
            - If nsamples is not supplied, calculate mean value
            - If nsamples is integer, draw nsamples from the distribution of
              values inferred by nested sampling
            - If nsamples is array, nsamples is assumed to be logw and
              returned (implementation convenience functionality)

        beta : float, array-like, optional
            inverse temperature(s) beta=1/kT. Default self.beta

        Returns
        -------
        if nsamples is array-like:
            WeightedDataFrame equal to nsamples
        elif beta is scalar and nsamples is None:
            WeightedSeries like self
        elif beta is array-like and nsamples is None:
            WeightedDataFrame like self, columns of beta
        elif beta is scalar and nsamples is int:
            WeightedDataFrame like self, columns of range(nsamples)
        elif beta is array-like and nsamples is int:
            WeightedDataFrame like self, MultiIndex columns the product of
            beta and range(nsamples)
        """
        if np.ndim(nsamples) > 0:
            return nsamples

        logdX = self.logdX(nsamples)
        betalogL = self._betalogL(beta)

        if logdX.ndim == 1 and betalogL.ndim == 1:
            logw = logdX + betalogL
        elif logdX.ndim > 1 and betalogL.ndim == 1:
            logw = logdX.add(betalogL, axis=0)
        elif logdX.ndim == 1 and betalogL.ndim > 1:
            logw = betalogL.add(logdX, axis=0)
        else:
            cols = MultiIndex.from_product([betalogL.columns, logdX.columns])
            logdX = logdX.reindex(columns=cols, level='samples')
            betalogL = betalogL.reindex(columns=cols, level='beta')
            logw = betalogL+logdX
        return logw
    def logZ(self, nsamples=None, beta=None):
        """Log-Evidence.

        Parameters
        ----------
        nsamples : int, optional
            - If nsamples is not supplied, calculate mean value
            - If nsamples is integer, draw nsamples from the distribution of
              values inferred by nested sampling
            - If nsamples is array, nsamples is assumed to be logw

        beta : float, array-like, optional
            inverse temperature(s) beta=1/kT. Default self.beta

        Returns
        -------
        if nsamples is array-like:
            :class:`pandas.Series`, index nsamples.columns
        elif beta is scalar and nsamples is None:
            float
        elif beta is array-like and nsamples is None:
            :class:`pandas.Series`, index beta
        elif beta is scalar and nsamples is int:
            :class:`pandas.Series`, index range(nsamples)
        elif beta is array-like and nsamples is int:
            :class:`pandas.Series`, :class:`pandas.MultiIndex` columns the
            product of beta and range(nsamples)
        """
        logw = self.logw(nsamples, beta)
        logZ = logsumexp(logw, axis=0)
        if np.isscalar(logZ):
            return logZ
        else:
            return logw._constructor_sliced(logZ, name='logZ',
                                            index=logw.columns).squeeze()
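    # Usage sketch: a scalar mean, an error estimate from repeated draws,
    # or the evidence as a function of inverse temperature:
    #
    #     logZ = samples.logZ()
    #     logZ_err = samples.logZ(nsamples=100).std()
    #     logZ_beta = samples.logZ(beta=np.linspace(0, 1, 11))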
    _logZ_function_shape = '\n' + '\n'.join(logZ.__doc__.split('\n')[1:])

    # TODO: remove this in version >= 2.1
    def D(self, nsamples=None):  # noqa: disable=D102
        raise NotImplementedError(
            "This is anesthetic 1.0 syntax. You need to update, e.g.\n"
            "samples.D(1000)  # anesthetic 1.0\n"
            "samples.D_KL(1000)  # anesthetic 2.0\n\n"
            "Check out the new temperature functionality: "
            "help(samples.D_KL), as well as average loglikelihoods: "
            "help(samples.logL_P)"
        )
    def D_KL(self, nsamples=None, beta=None):
        """Kullback--Leibler divergence."""
        logw = self.logw(nsamples, beta)
        logZ = self.logZ(logw, beta)
        betalogL = self._betalogL(beta)
        S = (logw*0).add(betalogL, axis=0) - logZ
        w = np.exp(logw-logZ)
        D_KL = (S*w).sum()
        if np.isscalar(D_KL):
            return D_KL
        else:
            return self._constructor_sliced(D_KL, name='D_KL',
                                            index=logw.columns).squeeze()
    D_KL.__doc__ += _logZ_function_shape

    # TODO: remove this in version >= 2.1
    def d(self, nsamples=None):  # noqa: disable=D102
        raise NotImplementedError(
            "This is anesthetic 1.0 syntax. You need to update, e.g.\n"
            "samples.d(1000)  # anesthetic 1.0\n"
            "samples.d_G(1000)  # anesthetic 2.0\n\n"
            "Check out the new temperature functionality: "
            "help(samples.d_G), as well as average loglikelihoods: "
            "help(samples.logL_P)"
        )
    def d_G(self, nsamples=None, beta=None):
        """Bayesian model dimensionality."""
        logw = self.logw(nsamples, beta)
        logZ = self.logZ(logw, beta)
        betalogL = self._betalogL(beta)
        S = (logw*0).add(betalogL, axis=0) - logZ
        w = np.exp(logw-logZ)
        D_KL = (S*w).sum()
        d_G = ((S-D_KL)**2*w).sum()*2
        if np.isscalar(d_G):
            return d_G
        else:
            return self._constructor_sliced(d_G, name='d_G',
                                            index=logw.columns).squeeze()
    d_G.__doc__ += _logZ_function_shape
    def logL_P(self, nsamples=None, beta=None):
        """Posterior averaged loglikelihood."""
        logw = self.logw(nsamples, beta)
        logZ = self.logZ(logw, beta)
        betalogL = self._betalogL(beta)
        betalogL = (logw*0).add(betalogL, axis=0)
        w = np.exp(logw-logZ)
        logL_P = (betalogL*w).sum()
        if np.isscalar(logL_P):
            return logL_P
        else:
            return self._constructor_sliced(logL_P, name='logL_P',
                                            index=logw.columns).squeeze()
    logL_P.__doc__ += _logZ_function_shape
    def contour(self, logL=None):
        """Convert contour from (index or None) to a float loglikelihood.

        Convention is that live points are inclusive of the contour.

        Helper function for:

        - NestedSamples.live_points,
        - NestedSamples.dead_points,
        - NestedSamples.truncate.

        Parameters
        ----------
        logL : float or int, optional
            Loglikelihood or iteration number.
            If not provided, return the contour containing the last set of
            live points.

        Returns
        -------
        logL : float
            Loglikelihood of contour
        """
        if logL is None:
            logL = self.loc[self.logL > self.logL_birth.max()].logL.iloc[0]
        elif isinstance(logL, float):
            pass
        else:
            logL = float(self.logL[logL])
        return logL
    def live_points(self, logL=None):
        """Get the live points within a contour.

        Parameters
        ----------
        logL : float or int, optional
            Loglikelihood or iteration number to return live points.
            If not provided, return the last set of active live points.

        Returns
        -------
        live_points : Samples
            Live points at either:

            - contour logL (if input is float)
            - ith iteration (if input is integer)
            - last set of live points if no argument provided
        """
        logL = self.contour(logL)
        i = ((self.logL >= logL) & (self.logL_birth < logL)).to_numpy()
        return Samples(self[i]).set_weights(None)
    def dead_points(self, logL=None):
        """Get the dead points at a given contour.

        Convention is that dead points are exclusive of the contour.

        Parameters
        ----------
        logL : float or int, optional
            Loglikelihood or iteration number to return dead points.
            If not provided, return the last set of dead points.

        Returns
        -------
        dead_points : Samples
            Dead points at either:

            - contour logL (if input is float)
            - ith iteration (if input is integer)
            - last set of dead points if no argument provided
        """
        logL = self.contour(logL)
        i = ((self.logL < logL)).to_numpy()
        return Samples(self[i]).set_weights(None)
    def truncate(self, logL=None):
        """Truncate the run at a given contour.

        Returns the union of the live_points and dead_points.

        Parameters
        ----------
        logL : float or int, optional
            Loglikelihood or iteration number to truncate run.
            If not provided, truncate at the last set of dead points.

        Returns
        -------
        truncated_run : NestedSamples
            Run truncated at either:

            - contour logL (if input is float)
            - ith iteration (if input is integer)
            - last set of dead points if no argument provided
        """
        dead_points = self.dead_points(logL)
        live_points = self.live_points(logL)
        index = np.concatenate([dead_points.index, live_points.index])
        return self.loc[index].recompute()
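    # Usage sketch: the final live points, the dead points below an
    # (arbitrary, hypothetical) contour, and a truncated run:
    #
    #     final_live = samples.live_points()
    #     dead = samples.dead_points(-20.0)
    #     truncated = samples.truncate(-20.0)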
    def posterior_points(self, beta=1):
        """Get equally weighted posterior points at temperature beta."""
        return self.set_beta(beta).compress('equal')
    def prior_points(self, params=None):
        """Get equally weighted prior points."""
        return self.posterior_points(beta=0)
    def gui(self, params=None):
        """Construct a graphical user interface for viewing samples."""
        return RunPlotter(self, params)
    def importance_sample(self, logL_new, action='add', inplace=False):
        """Perform importance re-weighting on the log-likelihood.

        Parameters
        ----------
        logL_new : np.array
            New log-likelihood values. Should have the same shape as `logL`.

        action : str, default='add'
            Can be any of {'add', 'replace', 'mask'}.

            * add: Add the new `logL_new` to the current `logL`.
            * replace: Replace the current `logL` with the new `logL_new`.
            * mask: treat `logL_new` as a boolean mask and only keep the
              corresponding (True) samples.

        inplace : bool, optional
            Indicates whether to modify the existing array, or return a new
            frame with importance sampling applied.
            default: False

        Returns
        -------
        samples : :class:`NestedSamples`
            Importance re-weighted samples.
        """
        samples = super().importance_sample(logL_new, action=action)
        mask = (samples.logL > samples.logL_birth).to_numpy()
        samples = samples[mask].recompute()
        if inplace:
            self._update_inplace(samples)
        else:
            return samples.__finalize__(self, "importance_sample")
    def recompute(self, logL_birth=None, inplace=False):
        """Re-calculate the nested sampling contours and live points.

        Parameters
        ----------
        logL_birth : array-like or int, optional
            * array-like: the birth contours.
            * int: the number of live points.
            * default: use the existing birth contours to compute nlive

        inplace : bool, default=False
            Indicates whether to modify the existing array, or return a new
            frame with contours resorted and nlive recomputed
        """
        if inplace:
            samples = self
        else:
            samples = self.copy()

        nlive_label = r'$n_\mathrm{live}$'
        if is_int(logL_birth):
            nlive = logL_birth
            samples.sort_values('logL', inplace=True)
            samples.reset_index(drop=True, inplace=True)
            n = np.ones(len(self), int) * nlive
            n[-nlive:] = np.arange(nlive, 0, -1)
            samples['nlive', nlive_label] = n
        else:
            if logL_birth is not None:
                label = r'$\ln\mathcal{L}_\mathrm{birth}$'
                samples['logL_birth'] = logL_birth
                if self.islabelled():
                    samples.set_label('logL_birth', label)

            if 'logL_birth' not in samples:
                raise RuntimeError("Cannot recompute run without "
                                   "birth contours logL_birth.")

            invalid = (samples.logL <= samples.logL_birth).to_numpy()
            n_bad = invalid.sum()
            n_equal = (samples.logL == samples.logL_birth).sum()
            if n_bad:
                warnings.warn("%i out of %i samples have logL <= logL_birth,"
                              "\n%i of which have logL == logL_birth."
                              "\nThis may just indicate numerical rounding "
                              "errors at the peak of the likelihood, but "
                              "further investigation of the chains files is "
                              "recommended."
                              "\nDropping the invalid samples."
                              % (n_bad, len(samples), n_equal),
                              RuntimeWarning)
                samples = samples[~invalid].reset_index(drop=True)

            samples.sort_values('logL', inplace=True)
            samples.reset_index(drop=True, inplace=True)
            nlive = compute_nlive(samples.logL, samples.logL_birth)
            samples['nlive'] = nlive
            if self.islabelled():
                samples.set_label('nlive', nlive_label)

        samples.beta = samples._beta

        if np.any(pandas.isna(samples.logL)):
            warnings.warn("NaN encountered in logL. If this is unexpected, "
                          "you should investigate why your likelihood is "
                          "throwing NaNs. Dropping these samples at prior "
                          "level", RuntimeWarning)
            samples = samples[samples.logL.notna().to_numpy()].recompute()

        if inplace:
            self._update_inplace(samples)
        else:
            return samples.__finalize__(self, "recompute")
def merge_nested_samples(runs):
    """Merge one or more nested sampling runs.

    Parameters
    ----------
    runs : list(:class:`NestedSamples`)
        List or array-like of one or more nested sampling runs.
        If only a single run is provided, this recalculates the live points
        and as such can be used for masked runs.

    Returns
    -------
    samples : :class:`NestedSamples`
        Merged run.
    """
    merge = pandas.concat(runs, ignore_index=True)
    return merge.recompute()
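# Usage sketch (hypothetical chain roots): two runs of the same problem can
# be merged into a single statistically consistent run:
#
#     from anesthetic import read_chains
#     run1 = read_chains("chains/run1")
#     run2 = read_chains("chains/run2")
#     combined = merge_nested_samples([run1, run2])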
def merge_samples_weighted(samples, weights=None, label=None):
    r"""Merge sets of samples with weights.

    Combine two (or more) samples so the new PDF is
    P(x|new) = weight_A P(x|A) + weight_B P(x|B).
    The number of samples and internal weights do not affect the result.

    Parameters
    ----------
    samples : list(:class:`NestedSamples`) or list(:class:`MCMCSamples`)
        List or array-like of one or more MCMC or nested sampling runs.

    weights : list(double) or None
        Weight for each run in samples (normalized internally).
        Can be omitted if samples are :class:`NestedSamples`,
        then exp(logZ) is used as weight.

    label : str or None, default=None
        Label for the new samples.

    Returns
    -------
    new_samples : :class:`Samples`
        Merged (weighted) run.
    """
    if not (isinstance(samples, Sequence) or isinstance(samples, Series)):
        raise TypeError("samples must be a list of samples "
                        "(Sequence or pandas.Series)")

    mcmc_samples = copy.deepcopy([Samples(s) for s in samples])
    if weights is None:
        try:
            logZs = np.array(copy.deepcopy([s.logZ() for s in samples]))
        except AttributeError:
            raise ValueError("If samples includes MCMCSamples "
                             "then weights must be given.")
        # Subtract logsumexp to avoid numerical issues (similar to
        # subtracting max(logZs))
        logZs -= logsumexp(logZs)
        weights = np.exp(logZs)
    else:
        if len(weights) != len(samples):
            raise ValueError("samples and weights must have the same "
                             "length, each weight is for a whole sample. "
                             "Currently len(samples)=%d and len(weights)=%d."
                             % (len(samples), len(weights)))

    new_samples = []
    for s, w in zip(mcmc_samples, weights):
        # Normalize the given weights
        new_weights = s.get_weights() / s.get_weights().sum()
        new_weights *= w/np.sum(weights)
        s = Samples(s, weights=new_weights)
        new_samples.append(s)

    new_samples = pandas.concat(new_samples)

    new_weights = new_samples.get_weights()
    new_weights /= new_weights.max()
    new_samples.set_weights(new_weights, inplace=True)

    new_samples.label = label

    return new_samples
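# Usage sketch (`run1`, `run2`, `mcmc1`, `mcmc2` are hypothetical): with
# NestedSamples the weights default to the evidences exp(logZ); explicit
# weights are required when MCMCSamples are included:
#
#     combined = merge_samples_weighted([run1, run2])
#     combined = merge_samples_weighted([mcmc1, mcmc2],
#                                       weights=[0.7, 0.3], label="mix")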
adjust_docstrings(Samples.to_hdf, r'(pd|pandas)\.DataFrame', 'DataFrame')
adjust_docstrings(Samples.to_hdf, 'DataFrame', 'pandas.DataFrame')
adjust_docstrings(Samples.to_hdf, r'(pd|pandas)\.read_hdf', 'read_hdf')
adjust_docstrings(Samples.to_hdf, 'read_hdf', 'pandas.read_hdf')
adjust_docstrings(Samples.to_hdf, ':func:`open`', '`open`')