Source code for anesthetic.boundary

"""Boundary correction utilities.

Implements local-linear boundary correction for Gaussian kernel density
estimates following:

M. C. Jones (1993),
"Simple boundary correction for kernel density estimation",
Statistics and Computing, 3, 135-146.
https://doi.org/10.1007/BF00147776

J. E. Chacón & T. Duong (2018),
"Multivariate Kernel Smoothing and Its Applications",
Chapman and Hall/CRC, Chapter 4.
https://doi.org/10.1201/9780429485572-4
https://www.researchgate.net/publication/345555871_Modified_density_estimation

"""

import numpy as np
from scipy.special import ndtr
from scipy.stats import norm
from scipy.stats._kde import _get_output_dtype
from scipy.stats._stats import gaussian_kernel_estimate

# Pre-compute 20-point Gauss-Legendre nodes and weights for _bvn_cdf.
_GL_NODES, _GL_WEIGHTS = np.polynomial.legendre.leggauss(20)


def _bvn_cdf(x, y, rho):
    r"""Vectorized bivariate standard-normal CDF.

    Computes `P(X < x, Y < y)` where `(X, Y)` follow a standard bivariate
    normal distribution with correlation `rho`, using the Drezner (1978)
    integral representation with 20-point Gauss-Legendre quadrature.

    Parameters
    ----------
    x, y : np.array
        Evaluation coordinates (arbitrary but matching shape).
    rho : float
        Correlation coefficient, ``-1 < rho < 1``.

    Returns
    -------
    cdf : np.array
        Bivariate normal CDF values, same shape as `x` and `y`.

    """
    cdf = np.empty_like(x)

    # infinities block
    fin = np.isfinite(x) & np.isfinite(y)
    cdf[~fin] = ndtr(x[~fin]) * ndtr(y[~fin])
    if not np.any(fin):
        return cdf

    # finite block
    xf = x[fin]
    yf = y[fin]
    theta = np.arcsin(rho)
    t = theta / 2 * (_GL_NODES + 1)
    w = theta / 2 * _GL_WEIGHTS
    sin_t = np.sin(t)
    cos2_t = np.cos(t)**2
    X = xf[:, None]
    Y = yf[:, None]
    arg = np.exp(-(X**2 + Y**2 - 2 * X * Y * sin_t) / (2 * cos2_t))
    cdf[fin] = (ndtr(xf) * ndtr(yf) + np.sum(arg * w, axis=-1) / (2 * np.pi))
    return cdf


def _truncated_moments(x, cov, x_limits):
    r"""Compute truncated Gaussian kernel moments.

    For a Gaussian kernel with covariance matrix ``H = D R D``, where
    ``D = diag(sqrt(diag(H)))`` and ``R`` is the corresponding correlation
    matrix, compute the truncated moments of the Gaussian kernel over the part
    of the kernel support that remains inside the box ``x_limits``.

    Parameters
    ----------
    x : np.array
        Evaluation coordinates, shape ``(M, d)``.
    cov : np.array
        Kernel covariance matrix, shape ``(d, d)``.
    x_limits : np.array
        Lower and upper prior bounds in physical coordinates, shape ``(2, d)``.
        Use ``-np.inf`` and ``np.inf`` for unbounded directions.

    Returns
    -------
    a0 : np.array
        Zeroth truncated kernel moment, shape ``(M,)``.
    a1 : np.array
        First bandwidth-scaled truncated kernel moment, shape ``(M, d)``.
    A2 : np.array
        Second bandwidth-scaled truncated kernel moment, shape ``(M, d, d)``.

    Notes
    -----
    The returned moments are the generic ``d``-dimensional objects:

    .. math::
        a_0(x) = \int_{\Omega_x} \phi_R(u) du,

        a_1(x) = \int_{\Omega_x} u \, \phi_R(u) du,

        A_2(x) = \int_{\Omega_x} u u^T \phi_R(u) du,

    where ``u = D^{-1}(x-t)`` and ``\Omega_x`` is the allowed region in these
    bandwidth-scaled coordinates. For ``d = 1`` these reduce exactly to the
    scalar moments used in the 1D Jones correction.

    """
    m, d = x.shape
    if d not in (1, 2):
        raise NotImplementedError(f"boundary correction currently supports "
                                  f"only 1D and 2D KDEs, got d={d}")
    if cov.shape != (d, d):
        raise ValueError(f"cov must have shape {(d, d)}, got {cov.shape}")
    if x_limits.shape != (2, d):
        raise ValueError(f"x_limits must have shape {(2, d)}, "
                         f"got {x_limits.shape}")

    eps = np.finfo(float).eps

    # Split covariance into per-axis scales and a correlation matrix.
    sigma = np.sqrt(np.diag(cov))  # (d,)
    corr = cov / np.outer(sigma, sigma)  # (d, d)
    gaussian = norm(loc=0, scale=1)

    # Lower/upper bounds in bandwidth-scaled coordinates.
    # (M, 2, d)
    u_limits = (x_limits[None, :, :] - x[:, None, :]) / sigma[None, None, :]

    # All 2^d rectangle corners in bandwidth-scaled coordinates.
    # (M, *(2,)*d, d)
    u_corner = np.stack(np.broadcast_arrays(*[
        u_limits[:, :, i].reshape((m,) + tuple(2 if j == i else 1
                                               for j in range(d)))
        for i in range(d)
    ]), axis=-1)

    # Inclusion-exclusion signs for the corner sums.
    signs = np.array([-1, +1])  # (2,)
    for _ in range(d - 1):
        signs = np.multiply.outer(signs, [-1, +1])  # *(2,)*d

    # Multivariate CDF and marginal PDF at each corner.
    if d == 1:
        cdf = gaussian.cdf(u_corner[..., 0])  # (M, 2)
    else:
        rho = np.clip(corr[0, 1], -1 + eps, 1 - eps)
        s = np.sqrt(1 - rho**2)
        cdf = _bvn_cdf(u_corner[..., 0], u_corner[..., 1], rho)  # (M, 2, 2)
    pdf = gaussian.pdf(u_corner)  # (M, *(2,)*d, d)

    # Corner contributions to the gradient and Hessian of a0.
    # grad_i = phi(u_i) * Phi(u_{-i} | u_i).
    grad = np.empty_like(u_corner)  # (M, *(2,)*d, d)
    hess = np.zeros(u_corner.shape + (d,))  # (M, *(2,)*d, d, d)
    for i in range(d):
        ui = u_corner[..., i]
        inf_i = np.isinf(ui)
        grad[..., i] = pdf[..., i]
        corr_hess = 0
        # For d=2, multiply by the conditional CDF Phi(u_j|u_i) and build the
        # off-diagonal Hessian from the conditional PDF. For d=1 this block is
        # skipped and grad_i = phi(u_i).
        if d == 2:
            j = 1 - i
            uj = u_corner[..., j]
            v = (uj - rho * ui) / s
            v = np.where(np.isposinf(uj), +np.inf, v)
            v = np.where(np.isneginf(uj), -np.inf, v)
            inf_i_fin_j = inf_i & np.isfinite(uj)
            if rho == 0:
                v = np.where(inf_i_fin_j, uj / s, v)
            else:
                v = np.where(inf_i_fin_j, -np.sign(rho) * ui, v)
            grad[..., i] *= gaussian.cdf(v)
            hess[..., i, j] = pdf[..., i] * gaussian.pdf(v) / s
            corr_hess += corr[i, j] * hess[..., i, j]
        hess[..., i, i] = np.where(inf_i, 0, -ui * grad[..., i]) - corr_hess

    # Signed corner sums give a0 and its derivatives.
    axes = tuple(range(1, d + 1))  # corner axes
    a0 = np.sum(cdf * signs, axis=axes)  # (M,)
    grad_a0 = -np.sum(grad * signs[..., None], axis=axes)  # (M, d)
    hess_a0 = np.sum(hess * signs[..., None, None], axis=axes)  # (M, d, d)

    # Recover the first and second truncated moments from derivatives of a0.
    a1 = -grad_a0 @ corr  # (M, d)
    A2 = a0[:, None, None] * corr[None, :, :]  # (M, d, d)
    A2 += np.einsum('ij,mjk,kl->mil', corr, hess_a0, corr)  # (M, d, d)

    return a0, a1, A2


def _kde_eval(kde, x):
    r"""Evaluate KDE and first bandwidth-scaled kernel moment in a single pass.

    For a KDE with kernel covariance ``H = D R D``, kernel ``K``, and samples
    ``s_i`` with weights ``w_i``, computes

    .. math::
        f(x) = m_0(x) &= \sum_i w_i K(x - s_i), \\
               m_1(x) &= \sum_i w_i D^{-1}(x-s_i) K(x-s_i)
                       = -R \, \nabla_{D^{-1}x} f(x).

    In 1D, this reduces to the usual bandwidth-scaled kernel moment
    ``m_1(x) = \sum_i w_i (x-s_i) K(x-s_i) / h``.

    Parameters
    ----------
    kde : scipy.stats.gaussian_kde
        Fitted KDE object (1D or 2D).
    x : np.array
        Evaluation coordinates, shape ``(M, d)``.

    Returns
    -------
    f : np.array
        KDE density, shape ``(M,)``.
        Equivalently the zeroth bandwidth-scaled kernel moment.
    moment1 : np.array
        First bandwidth-scaled kernel moment, shape ``(M, d)``.
        Equivalently, ``moment1 = -R ∇_{D^{-1}x} f`` in the
        bandwidth-scaled coordinates.

    """
    output_dtype, spec = _get_output_dtype(kde.covariance, x)

    # Pack the KDE weights and first raw moments into one Cython call.
    # (N, d+1)
    weighted_fields = np.empty((kde.n, kde.d + 1), dtype=output_dtype)
    weighted_fields[:, 0] = kde.weights  # (N,)
    weighted_fields[:, 1:] = kde.weights[:, None] * kde.dataset.T  # (N, d)

    estimate = gaussian_kernel_estimate[spec](kde.dataset.T,
                                              weighted_fields,
                                              x,
                                              kde.cho_cov,
                                              output_dtype)
    f = estimate[:, 0]  # (M,)

    # Convert first raw moments into bandwidth-scaled residual moments.
    residual = x * f[:, None] - estimate[:, 1:]  # (M, d)
    sigma = np.sqrt(np.diag(kde.covariance))  # (d,)
    moment1 = residual / sigma[None, :]  # (M, d)

    return f, moment1


def _corrected_density(f, m1, a0, a1, A2, order):
    """Compute boundary-corrected density from KDE and truncated moments.

    Implements renormalisation (order 0) and local-linear correction
    (order 1) from the generic vector/matrix moments ``a0``, ``a1``, ``A2``.

    For ``d = 1`` this reduces exactly to the scalar Jones (1993), Eq. (3.4).
    https://doi.org/10.1007/BF00147776

    For ``d = 2`` this follows Chacón & Duong (2018), Sec. 4.3.2, Eq. (4.4).
    https://doi.org/10.1201/9780429485572-4
    (Note, our notation matches Jones, not Chacón & Duong.)

    Parameters
    ----------
    f : np.array
        Uncorrected KDE density, shape ``(M,)``.
    m1 : np.array
        First bandwidth-scaled kernel moment, shape ``(M, d)``.
    a0 : np.array
        Zeroth truncated kernel moment, shape ``(M,)``.
    a1 : np.array
        First bandwidth-scaled truncated kernel moment, shape ``(M, d)``.
    A2 : np.array
        Second bandwidth-scaled truncated kernel moment, shape ``(M, d, d)``.
    order : int
        Boundary correction order (0 or 1).

    Returns
    -------
    p : np.array
        Corrected density, clamped to non-negative values. The linear
        correction can produce negative values near boundaries; when the local
        linear system is singular or ill-conditioned, the correction falls back
        to renormalisation.

    """
    p = np.zeros_like(f)  # (M,)

    # Order-0 correction: renormalize by the truncated kernel mass.
    mask = a0 > 0  # (M,)
    p[mask] = f[mask] / a0[mask]

    if order == 0:
        np.maximum(p, 0, out=p)
        return p
    if order != 1:
        raise ValueError(f"order must be 0 or 1, got {order}")

    # Order-1 correction: solve the local linear system built from a1 and A2.
    determinant = np.linalg.det(A2)  # (M,)
    mask &= np.isfinite(determinant) & (determinant > 0)

    if np.any(mask):
        # Solve A2 x = [m1, a1] in one batched call.
        rhs = np.stack([m1[mask], a1[mask]], axis=-1)  # (M', d, 2)
        sol = np.linalg.solve(A2[mask], rhs)  # (M', d, 2)
        A2m1 = np.zeros_like(m1)  # (M, d)
        A2a1 = np.zeros_like(a1)  # (M, d)
        A2m1[mask] = sol[..., 0]
        A2a1[mask] = sol[..., 1]

        # p = (f - a1^T A2^{-1} m1) / (a0 - a1^T A2^{-1} a1)
        # Reduces to (a2 f - a1 m1) / (a0 a2 - a1^2) in 1D.
        numerator = f - np.einsum('mi,mi->m', a1, A2m1)  # (M,)
        denominator = a0 - np.einsum('mi,mi->m', a1, A2a1)  # (M,)

        # Apply the linear correction only where the system stays well-defined.
        mask &= denominator > 0
        mask &= np.isfinite(numerator) & np.isfinite(denominator)
        p[mask] = numerator[mask] / denominator[mask]

    # Local-linear correction can go negative near boundaries; clamp to zero.
    np.maximum(p, 0, out=p)
    return p


[docs] def boundary_correction_1d(kde, x, order=1, xmin=None, xmax=None): r"""Boundary correction for a 1D Gaussian KDE. Parameters ---------- kde : scipy.stats.gaussian_kde Fitted 1D KDE object. x : np.array Evaluation points. xmin, xmax : float, optional Lower/upper bounds. order : int, default=1 Boundary correction order. * < 0: no correction --- return raw KDE estimate. * 0: renormalisation --- O(h) bias. * 1: linear correction (Jones 1993, Eq. 3.4) --- O(h²) bias. Returns ------- p : np.array Boundary-corrected density values. """ if xmin is None and xmax is None or order < 0: return kde(x) # (2, d) x_limits = np.array([[-np.inf if xmin is None else xmin], [np.inf if xmax is None else xmax]], dtype=float) # Evaluate the raw KDE, truncated moments, and corrected density. f, m1 = _kde_eval(kde, x[:, None]) a0, a1, A2 = _truncated_moments(x[:, None], kde.covariance, x_limits) p = _corrected_density(f, m1, a0, a1, A2, order) if xmin is not None: p[x < xmin] = 0 if xmax is not None: p[x > xmax] = 0 return p
[docs] def boundary_correction_2d(kde, X, Y, order=1, xmin=None, xmax=None, ymin=None, ymax=None): r"""Boundary correction for a 2D Gaussian KDE. Applies renormalisation (order 0) or the full non-separable linear correction (order 1) using the full kernel covariance matrix. Parameters ---------- kde : scipy.stats.gaussian_kde Fitted 2D KDE object. X, Y : np.array 2D evaluation grids (from np.mgrid). xmin, xmax, ymin, ymax : float, optional Bounds per axis. order : int, default=1 Boundary correction order. * < 0: no correction --- return raw KDE estimate. * 0: renormalisation --- O(h) bias. * 1: linear correction --- O(h²) bias. Returns ------- p : np.array Boundary-corrected 2D density, same shape as X and Y. """ x = X.ravel() y = Y.ravel() has_x_bounds = xmin is not None or xmax is not None has_y_bounds = ymin is not None or ymax is not None if not has_x_bounds and not has_y_bounds or order < 0: return kde([x, y]).reshape(X.shape) # (2, d) x_limits = np.array([[-np.inf if xmin is None else xmin, -np.inf if ymin is None else ymin], [np.inf if xmax is None else xmax, np.inf if ymax is None else ymax]], dtype=float) # Evaluate the raw KDE, truncated moments, and corrected density. coords = np.column_stack([x, y]) # (M, d) f, m1 = _kde_eval(kde, coords) a0, a1, A2 = _truncated_moments(coords, kde.covariance, x_limits) p = _corrected_density(f, m1, a0, a1, A2, order) if xmin is not None: p[x < xmin] = 0 if xmax is not None: p[x > xmax] = 0 if ymin is not None: p[y < ymin] = 0 if ymax is not None: p[y > ymax] = 0 return p.reshape(X.shape)