# Source code for anesthetic.weighted_pandas

"""Pandas DataFrame and Series with weighted samples."""

import warnings
from inspect import signature
import numpy as np
from pandas import Series, DataFrame, concat, MultiIndex
from pandas.core.groupby import GroupBy, SeriesGroupBy, DataFrameGroupBy, ops
from pandas._libs import lib
from pandas._libs.lib import no_default
from pandas.util._exceptions import find_stack_level
from pandas.util import hash_pandas_object
from numpy.ma import masked_array
from anesthetic.utils import (compress_weights, neff, quantile,
                              temporary_seed, adjust_docstrings)
from pandas.core.dtypes.missing import notna
from pandas.core.accessor import CachedAccessor
from anesthetic.plotting import PlotAccessor
import pandas as pd


def read_csv(filename, *args, **kwargs):
    """Read a CSV file into a ``WeightedDataFrame``.

    The on-disk layout is not known in advance, so several layouts are
    tried in order of decreasing structure: two index columns and/or two
    header rows (where the second level may hold weights).  The first
    layout that yields weights is returned; otherwise the file is read as
    a plain single-index, single-header table.
    """
    # (keyword layout, axes that must carry weights for this layout to win)
    layouts = [
        (dict(index_col=[0, 1], header=[0, 1]), (0, 1)),
        (dict(index_col=[0, 1]), (0,)),
        (dict(index_col=0, header=[0, 1]), (1,)),
    ]
    for layout, weighted_axes in layouts:
        wdf = WeightedDataFrame(pd.read_csv(filename, *args,
                                            **layout, **kwargs))
        if all(wdf.isweighted(axis) for axis in weighted_axes):
            if 1 in weighted_axes:
                # Column weights arrive as strings from the header row.
                wdf.set_weights(wdf.get_weights(axis=1).astype(float),
                                axis=1, inplace=True)
            return wdf
    # Fallback: unweighted table with one index column and one header row.
    return WeightedDataFrame(pd.read_csv(filename, index_col=0,
                                         *args, **kwargs))
class WeightedGroupBy(GroupBy):
    """Weighted version of ``pandas.core.groupby.GroupBy``."""

    # Type of the grouper pandas stores internally on groupby objects.
    _grouper: ops.BaseGrouper

    def __init__(self, *args, **kwargs):
        # No extra state: defer entirely to the pandas GroupBy machinery.
        super().__init__(*args, **kwargs)

    def _add_weights(self, name, *args, **kwargs):
        # Evaluate the named weighted statistic (e.g. "mean") group by
        # group: each sub-frame is rebuilt as a weighted object so its
        # weighted implementation is used, and the per-group summed
        # weights are then re-attached to the aggregated result.
        result = self.agg(lambda df: getattr(self.obj._constructor(df), name)
                          (*args, **kwargs)).set_weights(self.get_weights())
        return result.__finalize__(self.obj, method="groupby")
    # Each statistic below routes through _add_weights so that the
    # per-group computation uses the weighted implementations from
    # WeightedSeries/WeightedDataFrame rather than the pandas defaults.
    def mean(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("mean", *args, **kwargs)

    def std(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("std", *args, **kwargs)

    def median(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("median", *args, **kwargs)

    def var(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("var", *args, **kwargs)

    def kurt(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("kurt", *args, **kwargs)

    def kurtosis(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("kurtosis", *args, **kwargs)

    def sem(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("sem", *args, **kwargs)

    def skew(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("skew", *args, **kwargs)

    def quantile(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("quantile", *args, **kwargs)
    def get_weights(self):
        """Return the weights of the grouped samples."""
        # Total weight per group: sum the sample weights within each group.
        return self.agg(lambda df: df.get_weights().sum())
    def _op_via_apply(self, name, *args, **kwargs):
        # Run the pandas implementation, then restore weights on the result.
        result = super()._op_via_apply(name, *args, **kwargs)
        try:
            # If the result index exposes the grouping key as a level,
            # broadcast the per-group weights across it; otherwise fall
            # back to one weight per group.  NOTE(review): this assumes
            # self.keys names a single index level -- confirm behaviour
            # for multi-key groupbys.
            index = result.index.get_level_values(self.keys)
            weights = self.get_weights()[index]
        except KeyError:
            weights = self.get_weights()
        return result.set_weights(weights, level=1)
class WeightedSeriesGroupBy(WeightedGroupBy, SeriesGroupBy):
    """Weighted version of ``pandas.core.groupby.SeriesGroupBy``."""

    def sample(self, *args, **kwargs):  # noqa: D102
        # Sampling always draws proportionally to the stored weights.
        sample_weights = self.obj.get_weights()
        return super().sample(*args, weights=sample_weights, **kwargs)

    def cov(self, *args, **kwargs):  # noqa: D102
        # Route through apply so the weights survive the computation.
        return self._op_via_apply("cov", *args, **kwargs)
class WeightedDataFrameGroupBy(WeightedGroupBy, DataFrameGroupBy):
    """Weighted version of ``pandas.core.groupby.DataFrameGroupBy``."""

    def get_weights(self):
        """Return the weights of the grouped samples."""
        # The parent returns per-column weight sums; collapse them to one
        # weight per group by taking the minimum across the other axis.
        return super().get_weights().min(axis=1-self.axis)

    def _gotitem(self, key, ndim: int, subset=None):  # pragma: no cover
        # Re-route pandas' selection machinery so that sub-selections
        # (e.g. ``df.groupby(...)[key]``) return *weighted* groupby
        # objects instead of the plain pandas variants.
        if ndim == 2:
            if subset is None:
                subset = self.obj
            return WeightedDataFrameGroupBy(
                subset,
                self._grouper,
                axis=self.axis,
                level=self.level,
                grouper=self._grouper,
                exclusions=self.exclusions,
                selection=key,
                as_index=self.as_index,
                sort=self.sort,
                group_keys=self.group_keys,
                observed=self.observed,
                # NOTE(review): `mutated` was removed from newer pandas
                # groupby constructors -- confirm the supported version.
                mutated=self.mutated,
                dropna=self.dropna,
            )
        elif ndim == 1:
            if subset is None:
                subset = self.obj[key]
            return WeightedSeriesGroupBy(
                subset,
                level=self.level,
                grouper=self._grouper,
                selection=key,
                sort=self.sort,
                group_keys=self.group_keys,
                observed=self.observed,
                dropna=self.dropna,
            )
        raise AssertionError("invalid ndim for _gotitem")

    def sample(self, *args, **kwargs):  # noqa: D102
        # Weighted sampling of whole rows of the grouped frame.
        return super().sample(weights=self.obj.get_weights(), *args, **kwargs)

    def cov(self, *args, **kwargs):  # noqa: D102
        # Route through apply so the weights survive the computation.
        return self._op_via_apply("cov", *args, **kwargs)
class _WeightedObject(object):
    """Common methods for `WeightedSeries` and `WeightedDataFrame`.

    :meta public:
    """

    def __init__(self, *args, **kwargs):
        # Pop 'weights' before handing the remaining arguments to the
        # pandas constructor, then install them as an index level.
        weights = kwargs.pop('weights', None)
        super().__init__(*args, **kwargs)
        if weights is not None:
            self.set_weights(weights, inplace=True)

    plot = CachedAccessor("plot", PlotAccessor)
    """:meta private:"""
[docs] def isweighted(self, axis=0): """Determine if weights are actually present.""" return 'weights' in self._get_axis(axis).names
[docs] def get_weights(self, axis=0): """Retrieve sample weights from an axis.""" if self.isweighted(axis): return self._get_axis(axis).get_level_values('weights').to_numpy() else: return np.ones_like(self._get_axis(axis))
[docs] def drop_weights(self, axis=0): """Drop weights.""" if self.isweighted(axis): return self.droplevel('weights', axis) return self.copy().__finalize__(self, "drop_weights")
    def set_weights(self, weights, axis=0, inplace=False, level=None):
        """Set sample weights along an axis.

        Parameters
        ----------
        weights : 1d array-like
            The sample weights to put in an index.

        axis : int (0,1), default=0
            Whether to put weights in an index or column.

        inplace : bool, default=False
            Whether to operate inplace, or return a new array.

        level : int
            Which level in the index to insert before.
            Defaults to inserting at back

        """
        if inplace:
            result = self
        else:
            result = self.copy()

        if weights is None:
            # Passing None removes any existing weights.
            if result.isweighted(axis=axis):
                result = result.drop_weights(axis)
        else:
            # Rebuild the axis as a MultiIndex with the existing
            # (non-weight) levels plus the new 'weights' level.
            names = [n for n in result._get_axis(axis).names
                     if n != 'weights']
            index = [result._get_axis(axis).get_level_values(n)
                     for n in names]
            if level is None:
                # Replace an existing weights level in place, or append.
                if result.isweighted(axis):
                    level = result._get_axis(axis).names.index('weights')
                else:
                    level = len(index)
            index.insert(level, weights)
            names.insert(level, 'weights')

            index = MultiIndex.from_arrays(index, names=names)
            result = result.set_axis(index, axis=axis, copy=False)

        if inplace:
            self._update_inplace(result)
        else:
            return result.__finalize__(self, "set_weights")
def _rand(self, axis=0): """Random number for consistent compression.""" seed = hash_pandas_object(self._get_axis(axis)).sum() % 2**32 with temporary_seed(int(seed)): return np.random.rand(self.shape[axis])
    def reset_index(self, level=None, drop=False, inplace=False,
                    *args, **kwargs):
        """Reset the index, retaining weights."""
        # Capture the weights first: the pandas reset would discard the
        # 'weights' index level along with the rest.
        weights = self.get_weights()
        answer = super().reset_index(level=level, drop=drop,
                                     inplace=False, *args, **kwargs)
        answer.set_weights(weights, inplace=True)
        if inplace:
            self._update_inplace(answer)
        else:
            return answer.__finalize__(self, "reset_index")
[docs] def neff(self, axis=0, beta=1): """Effective number of samples.""" if self.isweighted(axis): return neff(self.get_weights(axis), beta=beta) else: return self.shape[axis]
class WeightedSeries(_WeightedObject, Series):
    """Weighted version of :class:`pandas.Series`.

    Statistics (mean, var, std, ...) are weight-aware, with weights read
    from the 'weights' index level managed by `_WeightedObject`.
    """
[docs] def mean(self, skipna=True): # noqa: D102 if self.get_weights().sum() == 0: return np.nan null = self.isnull() & skipna return np.average(masked_array(self, null), weights=self.get_weights())
    def std(self, *args, **kwargs):  # noqa: D102
        # Standard deviation as the root of the weighted variance.
        return np.sqrt(self.var(*args, **kwargs))

    def kurtosis(self, *args, **kwargs):  # noqa: D102
        # Alias for kurt, matching the pandas spelling.
        return self.kurt(*args, **kwargs)

    def median(self, *args, **kwargs):  # noqa: D102
        # Median is the default (q=0.5) weighted quantile.
        if self.get_weights().sum() == 0:
            return np.nan
        return self.quantile(*args, **kwargs)
[docs] def var(self, skipna=True): # noqa: D102 if self.get_weights().sum() == 0: return np.nan null = self.isnull() & skipna mean = self.mean(skipna=skipna) if np.isnan(mean): return mean return np.average(masked_array((self-mean)**2, null), weights=self.get_weights())
    def cov(self, other, *args, **kwargs):  # noqa: D102
        # Align on the shared index before computing the covariance.
        this, other = self.align(other, join="inner", copy=False)
        if len(this) == 0:
            return np.nan

        # Pull the weights out of the index and align them with `other`
        # so they track any rows dropped by the join above.
        weights = self.index.to_frame()['weights']
        weights, _ = weights.align(other, join="inner", copy=False)

        # Restrict to rows where both series are non-null.
        valid = notna(this) & notna(other)
        if not valid.all():
            this = this[valid]
            other = other[valid]
            weights = weights[valid]

        return np.cov(this, other, aweights=weights)[0, 1]
    def corr(self, other, *args, **kwargs):  # noqa: D102
        # NOTE(review): the normalisation uses each series' full std,
        # while cov() restricts to the aligned, jointly non-null rows --
        # confirm this is intended for misaligned or NaN-bearing series.
        norm = self.std(skipna=True)*other.std(skipna=True)
        return self.cov(other, *args, **kwargs)/norm
[docs] def kurt(self, skipna=True): # noqa: D102 if self.get_weights().sum() == 0: return np.nan null = self.isnull() & skipna mean = self.mean(skipna=skipna) std = self.std(skipna=skipna) if np.isnan(mean) or np.isnan(std): return np.nan return np.average(masked_array(((self-mean)/std)**4, null), weights=self.get_weights())
[docs] def skew(self, skipna=True): # noqa: D102 if self.get_weights().sum() == 0: return np.nan null = self.isnull() & skipna mean = self.mean(skipna=skipna) std = self.std(skipna=skipna) if np.isnan(mean) or np.isnan(std): return np.nan return np.average(masked_array(((self-mean)/std)**3, null), weights=self.get_weights())
[docs] def mad(self, skipna=True): # noqa: D102 if self.get_weights().sum() == 0: return np.nan null = self.isnull() & skipna mean = self.mean(skipna=skipna) if np.isnan(mean): return np.nan return np.average(masked_array(abs(self-mean), null), weights=self.get_weights())
[docs] def sem(self, skipna=True): # noqa: D102 return np.sqrt(self.var(skipna=skipna)/self.neff())
    def quantile(self, q=0.5, interpolation='linear'):  # noqa: D102
        if self.get_weights().sum() == 0:
            return np.nan
        # Weighted quantile via anesthetic's own helper.
        return quantile(self.to_numpy(), q, self.get_weights(), interpolation)
    def compress(self, ncompress=True):
        """Reduce the number of samples by discarding low-weights.

        Parameters
        ----------
        ncompress : int, str, default=True
            Degree of compression.

            * If ``True`` (default): reduce to the channel capacity
              (theoretical optimum compression), equivalent to
              ``ncompress='entropy'``.
            * If ``> 0``: desired number of samples after compression.
            * If ``<= 0``: compress so that all remaining weights are unity.
            * If ``str``: determine number from the Huggins-Roy family of
              effective samples in :func:`anesthetic.utils.neff`
              with ``beta=ncompress``.

        """
        # i[j] gives how many times to repeat sample j (0 discards it).
        i = compress_weights(self.get_weights(), self._rand(), ncompress)
        return self.repeat(i)
[docs] def sample(self, *args, **kwargs): # noqa: D102 return super().sample(weights=self.get_weights(), *args, **kwargs)
    @property
    def _constructor(self):
        # Slices/copies of a WeightedSeries stay WeightedSeries.
        return WeightedSeries

    @property
    def _constructor_expanddim(self):
        # Expanding to 2d yields a WeightedDataFrame.
        return WeightedDataFrame
    def groupby(
        self,
        by=None,
        axis=0,
        level=None,
        as_index=True,
        sort=True,
        group_keys=True,
        observed=False,
        dropna=True,
    ):  # pragma: no cover # noqa: D102
        # Mirror pandas' Series.groupby but construct the weighted variant.
        if level is None and by is None:
            raise TypeError("You have to supply one of 'by' and 'level'")
        if not as_index:
            raise TypeError("as_index=False only valid with DataFrame")
        axis = self._get_axis_number(axis)

        return WeightedSeriesGroupBy(
            obj=self,
            keys=by,
            axis=axis,
            level=level,
            as_index=as_index,
            sort=sort,
            group_keys=group_keys,
            observed=observed,
            dropna=dropna,
        )
class WeightedDataFrame(_WeightedObject, DataFrame):
    """Weighted version of :class:`pandas.DataFrame`.

    Statistics are weight-aware along the weighted axis, with weights
    read from the 'weights' index level managed by `_WeightedObject`.
    """
[docs] def mean(self, axis=0, skipna=True, *args, **kwargs): # noqa: D102 if self.isweighted(axis): if self.get_weights(axis).sum() == 0: return self._constructor_sliced(np.nan, index=self._get_axis(1-axis)) null = self.isnull() & skipna mean = np.average(masked_array(self, null), weights=self.get_weights(axis), axis=axis) return self._constructor_sliced(mean, index=self._get_axis(1-axis)) else: return super().mean(axis=axis, skipna=skipna, *args, **kwargs)
    def std(self, *args, **kwargs):  # noqa: D102
        # Standard deviation as the root of the weighted variance.
        return np.sqrt(self.var(*args, **kwargs))

    def kurtosis(self, *args, **kwargs):  # noqa: D102
        # Alias for kurt, matching the pandas spelling.
        return self.kurt(*args, **kwargs)

    def median(self, *args, **kwargs):  # noqa: D102
        # Median as the default (q=0.5) quantile.
        return self.quantile(*args, **kwargs)
[docs] def var(self, axis=0, skipna=True, *args, **kwargs): # noqa: D102 if self.isweighted(axis): if self.get_weights(axis).sum() == 0: return self._constructor_sliced(np.nan, index=self._get_axis(1-axis)) null = self.isnull() & skipna mean = self.mean(axis=axis, skipna=skipna) var = np.average(masked_array((self-mean)**2, null), weights=self.get_weights(axis), axis=axis) return self._constructor_sliced(var, index=self._get_axis(1-axis)) else: return super().var(axis=axis, skipna=skipna, *args, **kwargs)
[docs] def cov(self, *args, **kwargs): # noqa: D102 if self.isweighted(): null = self.isnull() mean = self.mean(skipna=True) x = masked_array(self - mean, null) cov = np.ma.dot(self.get_weights()*x.T, x) \ / self.get_weights().sum().T if kwargs: raise NotImplementedError("The keywords %s are not implemented" "for the calculation of the" "covariance with weighted samples." % kwargs) return self._constructor(cov, index=self.columns, columns=self.columns) else: return super().cov(*args, **kwargs)
[docs] def corr(self, method="pearson", skipna=True, *args, **kwargs): # noqa: D102 if self.isweighted(): cov = self.cov() diag = np.sqrt(np.diag(cov)) return cov.divide(diag, axis=1).divide(diag, axis=0) else: return super().corr(*args, **kwargs)
    def corrwith(self, other, axis=0, drop=False, method="pearson",
                 *args, **kwargs):  # noqa: D102
        axis = self._get_axis_number(axis)
        if not self.isweighted(axis):
            # Unweighted: defer to pandas.
            return super().corrwith(other, drop=drop, axis=axis,
                                    method=method, *args, **kwargs)
        else:
            if isinstance(other, Series):
                # Column-by-column correlation against a single series.
                answer = self.apply(lambda x: other.corr(x, method=method),
                                    axis=axis)
                return self._constructor_sliced(answer)

            # Align the two frames on their shared labels.
            left, right = self.align(other, join="inner", copy=False)

            if axis == 1:
                left = left.T
                right = right.T

            weights = left.index.to_frame()['weights']
            weights, _ = weights.align(right, join="inner", copy=False)

            # mask missing values: adding zero of the other propagates NaNs
            # into both frames so the sums below skip them consistently.
            left = left + right * 0
            right = right + left * 0

            # demeaned data
            ldem = left - left.mean()
            rdem = right - right.mean()

            num = (ldem * rdem * weights.to_numpy()[:, None]).sum()
            dom = weights.sum() * left.std() * right.std()

            correl = num / dom

            if not drop:
                # Find non-matching labels along the given axis and pad
                # the result with NaN for them.
                result_index = self._get_axis(1-axis).union(
                    other._get_axis(1-axis))
                idx_diff = result_index.difference(correl.index)

                if len(idx_diff) > 0:
                    correl = concat([correl, Series([np.nan] * len(idx_diff),
                                                    index=idx_diff)])

            return self._constructor_sliced(correl)
[docs] def kurt(self, axis=0, skipna=True, *args, **kwargs): # noqa: D102 if self.isweighted(axis): if self.get_weights(axis).sum() == 0: return self._constructor_sliced(np.nan, index=self._get_axis(1-axis)) null = self.isnull() & skipna mean = self.mean(axis=axis, skipna=skipna) std = self.std(axis=axis, skipna=skipna) kurt = np.average(masked_array(((self-mean)/std)**4, null), weights=self.get_weights(axis), axis=axis) return self._constructor_sliced(kurt, index=self._get_axis(1-axis)) else: return super().kurt(axis=axis, skipna=skipna, *args, **kwargs)
[docs] def skew(self, axis=0, skipna=True, *args, **kwargs): # noqa: D102 if self.isweighted(axis): if self.get_weights(axis).sum() == 0: return self._constructor_sliced(np.nan, index=self._get_axis(1-axis)) null = self.isnull() & skipna mean = self.mean(axis=axis, skipna=skipna) std = self.std(axis=axis, skipna=skipna) skew = np.average(masked_array(((self-mean)/std)**3, null), weights=self.get_weights(axis), axis=axis) return self._constructor_sliced(skew, index=self._get_axis(1-axis)) else: return super().skew(axis=axis, skipna=skipna, *args, **kwargs)
[docs] def mad(self, axis=0, skipna=True, *args, **kwargs): # noqa: D102 if self.isweighted(axis): if self.get_weights(axis).sum() == 0: return self._constructor_sliced(np.nan, index=self._get_axis(1-axis)) null = self.isnull() & skipna mean = self.mean(axis=axis, skipna=skipna) mad = np.average(masked_array(abs(self-mean), null), weights=self.get_weights(axis), axis=axis) return self._constructor_sliced(mad, index=self._get_axis(1-axis)) else: return super().var(axis=axis, skipna=skipna, *args, **kwargs)
[docs] def sem(self, axis=0, skipna=True): # noqa: D102 n = self.neff(axis) return np.sqrt(self.var(axis=axis, skipna=skipna)/n)
    def quantile(self, q=0.5, axis=0, numeric_only=None,
                 interpolation='linear', method=None):  # noqa: D102
        if self.isweighted(axis):
            if numeric_only is not None or method is not None:
                raise NotImplementedError(
                    "`numeric_only` and `method` kwargs not implemented for "
                    "`WeightedSeries` and `WeightedDataFrame`."
                )
            # Column-by-column weighted quantiles.
            data = np.array([c.quantile(q=q, interpolation=interpolation)
                             for _, c in self.items()])
            if np.isscalar(q):
                return self._constructor_sliced(data,
                                                index=self._get_axis(1-axis))
            else:
                return self._constructor(data.T, index=q,
                                         columns=self._get_axis(1-axis))
        else:
            # Recover the pandas defaults that `None` stands in for.
            if numeric_only is None:
                numeric_only = True
            if method is None:
                method = 'single'
            return super().quantile(q=q, axis=axis,
                                    numeric_only=numeric_only,
                                    interpolation=interpolation,
                                    method=method)
    def compress(self, ncompress=True, axis=0):
        """Reduce the number of samples by discarding low-weights.

        Parameters
        ----------
        ncompress : int, str, default=True
            Degree of compression.

            * If ``True`` (default): reduce to the channel capacity
              (theoretical optimum compression), equivalent to
              ``ncompress='entropy'``.
            * If ``> 0``: desired number of samples after compression.
            * If ``<= 0``: compress so that all remaining weights are unity.
            * If ``str``: determine number from the Huggins-Roy family of
              effective samples in :func:`anesthetic.utils.neff`
              with ``beta=ncompress``.

        """
        if self.isweighted(axis):
            # i[j] gives how many times to repeat sample j (0 discards it).
            i = compress_weights(self.get_weights(axis), self._rand(axis),
                                 ncompress)
            data = np.repeat(self.to_numpy(), i, axis=axis)
            # Repeat the (weight-stripped) axis labels to match the data.
            i = self.drop_weights(axis)._get_axis(axis).repeat(i)
            df = self._constructor(data=data)
            df = df.set_axis(i, axis=axis, copy=False)
            df = df.set_axis(self._get_axis(1-axis), axis=1-axis, copy=False)
            return df
        else:
            return self
[docs] def sample(self, *args, **kwargs): # noqa: D102 sig = signature(DataFrame.sample) axis = sig.bind(self, *args, **kwargs).arguments.get('axis', 0) if self.isweighted(axis): return super().sample(weights=self.get_weights(axis), *args, **kwargs) else: return super().sample(*args, **kwargs)
    @property
    def _constructor_sliced(self):
        # Slicing a WeightedDataFrame yields a WeightedSeries.
        return WeightedSeries

    @property
    def _constructor(self):
        # Copies/transforms of a WeightedDataFrame stay weighted.
        return WeightedDataFrame
    def groupby(
        self,
        by=None,
        axis=no_default,
        level=None,
        as_index: bool = True,
        sort: bool = True,
        group_keys: bool = True,
        observed: bool = False,
        dropna: bool = True,
    ):  # pragma: no cover # noqa: D102
        # Mirror pandas' DataFrame.groupby (including its axis deprecation
        # warnings) but construct the weighted groupby variant.
        if axis is not lib.no_default:
            axis = self._get_axis_number(axis)
            if axis == 1:
                warnings.warn(
                    "DataFrame.groupby with axis=1 is deprecated. Do "
                    "`frame.T.groupby(...)` without axis instead.",
                    FutureWarning,
                    stacklevel=find_stack_level(),
                )
            else:
                warnings.warn(
                    "The 'axis' keyword in DataFrame.groupby is deprecated "
                    "and will be removed in a future version.",
                    FutureWarning,
                    stacklevel=find_stack_level(),
                )
        else:
            axis = 0

        if level is None and by is None:
            raise TypeError("You have to supply one of 'by' and 'level'")

        return WeightedDataFrameGroupBy(
            obj=self,
            keys=by,
            axis=axis,
            level=level,
            as_index=as_index,
            sort=sort,
            group_keys=group_keys,
            observed=observed,
            dropna=dropna,
        )
# Rewrite inherited pandas docstrings so they advertise the weighted
# classes and fully-qualified pandas targets.
_docstring_substitutions = [
    (r'\bDataFrame\b', 'WeightedDataFrame'),
    (r'\bDataFrames\b', 'WeightedDataFrames'),
    (r'\bSeries\b', 'WeightedSeries'),
    ('core', 'pandas.core'),
    ('pandas.core.window.Rolling.quantile',
     'pandas.core.window.rolling.Rolling.quantile'),
    (r'\bDataFrameGroupBy\b', 'WeightedDataFrameGroupBy'),
    (r'\bSeriesGroupBy\b', 'WeightedSeriesGroupBy'),
]
for cls in [WeightedDataFrame, WeightedSeries, WeightedGroupBy,
            WeightedDataFrameGroupBy, WeightedSeriesGroupBy]:
    for _pattern, _replacement in _docstring_substitutions:
        adjust_docstrings(cls, _pattern, _replacement)
adjust_docstrings(WeightedDataFrame, 'resample', 'pandas.DataFrame.resample')
adjust_docstrings(WeightedSeries, 'resample', 'pandas.Series.resample')