"""Pandas DataFrame and Series with weighted samples."""
import warnings
from inspect import signature
import numpy as np
from pandas import Series, DataFrame, concat, MultiIndex
from pandas.core.groupby import GroupBy, SeriesGroupBy, DataFrameGroupBy, ops
from pandas._libs import lib
from pandas._libs.lib import no_default
from pandas.util._exceptions import find_stack_level
from pandas.util import hash_pandas_object
from anesthetic.utils import (compress_weights, neff, quantile,
temporary_seed, adjust_docstrings,
var_unbiased, cov_unbiased, skew_unbiased,
kurt_unbiased, credibility_interval)
from pandas.core.accessor import CachedAccessor
from anesthetic.plotting import PlotAccessor
import pandas as pd
[docs]
def read_csv(filename, *args, **kwargs):
    """Read a CSV file into a ``WeightedDataFrame``.

    The layout of the file is not known in advance, so up to four parses are
    attempted and the first one whose result carries a ``'weights'`` level on
    the expected axis/axes is returned:

    1. two index columns and two header rows (weights on both axes),
    2. two index columns (weights on the rows),
    3. two header rows (weights on the columns),
    4. a single index column (returned unconditionally).

    Parameters
    ----------
    filename : str, path object or file-like object
        Forwarded to :func:`pandas.read_csv`. A seekable file-like object is
        rewound to the position it had on entry before every attempt, so that
        later attempts do not start from the end of the stream.

    *args, **kwargs
        Forwarded to :func:`pandas.read_csv`. ``index_col`` and ``header``
        are set by this function and must not be supplied.

    Returns
    -------
    WeightedDataFrame
    """
    # Remember where a seekable buffer starts; paths and non-seekable
    # objects are passed through untouched.
    rewind_to = None
    if hasattr(filename, 'seek') and hasattr(filename, 'tell'):
        try:
            rewind_to = filename.tell()
        except (OSError, ValueError):
            rewind_to = None

    def parse(**layout):
        """Parse the source with the given index/header layout."""
        if rewind_to is not None:
            filename.seek(rewind_to)
        return WeightedDataFrame(
            pd.read_csv(filename, *args, **layout, **kwargs)
        )

    def with_float_column_weights(wdf):
        """Cast the column weights to float, in place."""
        # NOTE(review): presumably header labels are parsed as strings,
        # hence the explicit cast -- confirm.
        wdf.set_weights(wdf.get_weights(axis=1).astype(float),
                        axis=1, inplace=True)
        return wdf

    wdf = parse(index_col=[0, 1], header=[0, 1])
    if wdf.isweighted(0) and wdf.isweighted(1):
        return with_float_column_weights(wdf)

    wdf = parse(index_col=[0, 1])
    if wdf.isweighted(0):
        return wdf

    wdf = parse(index_col=0, header=[0, 1])
    if wdf.isweighted(1):
        return with_float_column_weights(wdf)

    return parse(index_col=0)
[docs]
class WeightedGroupBy(GroupBy):
    """Weighted version of ``pandas.core.groupby.GroupBy``."""

    _grouper: ops.BaseGrouper

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _add_weights(self, name, *args, **kwargs):
        # Evaluate the statistic ``name`` group by group. Each group is first
        # rebuilt with the constructor of the grouped object, so that the
        # method looked up is the one defined on that (weighted) type.
        constructor = self.obj._constructor
        aggregated = self.agg(
            lambda group: getattr(constructor(group), name)(*args, **kwargs)
        )
        # Attach the per-group weights to the aggregated result.
        weighted = aggregated.set_weights(self.get_weights())
        return weighted.__finalize__(self.obj, method="groupby")

    # The statistics below have no docstring of their own on purpose
    # (``noqa: D102``); each one simply routes through ``_add_weights``.

    def mean(self, **kwargs):  # noqa: D102
        return self._add_weights("mean", **kwargs)

    def std(self, **kwargs):  # noqa: D102
        return self._add_weights("std", **kwargs)

    def var(self, **kwargs):  # noqa: D102
        return self._add_weights("var", **kwargs)

    def kurt(self, **kwargs):  # noqa: D102
        return self._add_weights("kurt", **kwargs)

    def kurtosis(self, **kwargs):  # noqa: D102
        return self._add_weights("kurtosis", **kwargs)

    def sem(self, **kwargs):  # noqa: D102
        return self._add_weights("sem", **kwargs)

    def skew(self, **kwargs):  # noqa: D102
        return self._add_weights("skew", **kwargs)

    def quantile(self, *args, **kwargs):  # noqa: D102
        return self._add_weights("quantile", *args, **kwargs)

    def get_weights(self):
        """Return the weights of the grouped samples."""
        # Sum of the sample weights inside each group.
        return self.agg(lambda group: group.get_weights().sum())

    def _op_via_apply(self, name, *args, **kwargs):
        applied = super()._op_via_apply(name, *args, **kwargs)
        try:
            # Look up the group weight matching every row of the result.
            group_labels = applied.index.get_level_values(self.keys)
            weights = self.get_weights()[group_labels]
        except KeyError:
            # No such level / label: fall back to the per-group weights.
            weights = self.get_weights()
        return applied.set_weights(weights, level=1)
[docs]
class WeightedSeriesGroupBy(WeightedGroupBy, SeriesGroupBy):
    """Weighted version of ``pandas.core.groupby.SeriesGroupBy``."""

    def sample(self, *args, **kwargs):  # noqa: D102
        # Draw samples in proportion to the weights of the grouped object.
        sample_weights = self.obj.get_weights()
        return super().sample(*args, weights=sample_weights, **kwargs)

    def cov(self, *args, **kwargs):  # noqa: D102
        # Routed through ``_op_via_apply`` so the result is re-weighted.
        return self._op_via_apply("cov", *args, **kwargs)
[docs]
class WeightedDataFrameGroupBy(WeightedGroupBy, DataFrameGroupBy):
    """Weighted version of ``pandas.core.groupby.DataFrameGroupBy``."""

    def get_weights(self):
        """Return the weights of the grouped samples."""
        # The parent aggregation is reduced across columns (axis=1), leaving
        # a single weight per group.
        return super().get_weights().min(axis=1)

    def _gotitem(self, key, ndim: int, subset=None):  # pragma: no cover
        # Build the weighted group-by object for a column selection:
        # ``ndim == 2`` keeps a frame, ``ndim == 1`` selects a single column.
        if ndim == 2:
            if subset is None:
                subset = self.obj
            return WeightedDataFrameGroupBy(
                subset,
                # Forward the original grouping keys (not the grouper
                # object): ``_op_via_apply`` looks levels up by ``self.keys``
                # and the already-built grouper is passed separately below.
                self.keys,
                level=self.level,
                grouper=self._grouper,
                exclusions=self.exclusions,
                selection=key,
                as_index=self.as_index,
                sort=self.sort,
                group_keys=self.group_keys,
                observed=self.observed,
                dropna=self.dropna,
            )
        elif ndim == 1:
            if subset is None:
                subset = self.obj[key]
            return WeightedSeriesGroupBy(
                subset,
                level=self.level,
                grouper=self._grouper,
                selection=key,
                sort=self.sort,
                group_keys=self.group_keys,
                observed=self.observed,
                dropna=self.dropna,
            )
        raise AssertionError("invalid ndim for _gotitem")

    def sample(self, *args, **kwargs):  # noqa: D102
        # Draw samples in proportion to the weights of the grouped object.
        return super().sample(weights=self.obj.get_weights(), *args, **kwargs)

    def cov(self, *args, **kwargs):  # noqa: D102
        # Routed through ``_op_via_apply`` so the result is re-weighted.
        return self._op_via_apply("cov", *args, **kwargs)
[docs]
class _WeightedObject(object):
    """Common methods for `WeightedSeries` and `WeightedDataFrame`.

    :meta public:
    """

    def __init__(self, *args, **kwargs):
        # ``weights`` is consumed here; everything else goes to the next
        # constructor in the MRO.
        weights = kwargs.pop('weights', None)
        super().__init__(*args, **kwargs)
        if weights is not None:
            self.set_weights(weights, inplace=True)

    plot = CachedAccessor("plot", PlotAccessor)
    """:meta private:"""

    def isweighted(self, axis=0):
        """Determine if weights are actually present."""
        axis_names = self._get_axis(axis).names
        return 'weights' in axis_names

    def get_weights(self, axis=0):
        """Retrieve sample weights from an axis."""
        if not self.isweighted(axis):
            # Unweighted: every sample counts once.
            return np.ones_like(self._get_axis(axis), dtype=int)
        labels = self._get_axis(axis)
        return labels.get_level_values('weights').to_numpy()

    def drop_weights(self, axis=0):
        """Drop weights."""
        if not self.isweighted(axis):
            return self.copy().__finalize__(self, "drop_weights")
        return self.droplevel('weights', axis)

    def set_weights(self, weights, axis=0, inplace=False, level=None):
        """Set sample weights along an axis.

        Parameters
        ----------
        weights : 1d array-like
            The sample weights to put in an index.
            ``None`` removes any existing weights.

        axis : int (0,1), default=0
            Whether to put weights in an index or column.

        inplace : bool, default=False
            Whether to operate inplace, or return a new array.

        level : int
            Which level in the index to insert before.
            Defaults to the position of the existing weights, or to
            inserting at the back if there are none.
        """
        result = self if inplace else self.copy()

        if weights is None:
            if result.isweighted(axis=axis):
                result = result.drop_weights(axis)
        else:
            labels = result._get_axis(axis)
            # Every level except a pre-existing 'weights' one is kept.
            names = [name for name in labels.names if name != 'weights']
            arrays = [labels.get_level_values(name) for name in names]
            if level is None:
                if result.isweighted(axis):
                    level = labels.names.index('weights')
                else:
                    level = len(arrays)
            arrays.insert(level, weights)
            names.insert(level, 'weights')
            new_labels = MultiIndex.from_arrays(arrays, names=names)
            result = result.set_axis(new_labels, axis=axis)

        if not inplace:
            return result.__finalize__(self, "set_weights")
        self._update_inplace(result)

    def _rand(self, axis=0):
        """Random number for consistent compression."""
        # The seed is derived from the axis labels, so identical labels give
        # identical random numbers.
        axis_hash = hash_pandas_object(self._get_axis(axis)).sum()
        seed = int(axis_hash % 2**32)
        with temporary_seed(seed):
            return np.random.rand(self.shape[axis])

    def _weighted_stat(self, func, axis=0, skipna=True, **kwargs):
        """Compute weighted statistics using common pattern.

        ``func`` is called as
        ``func(self, na=..., w=..., axis=..., skipna=..., **kwargs)`` where
        ``na`` is the missing-value mask and ``w`` holds the weights broadcast
        to ``self.shape`` (masked where ``na`` is set if ``skipna``).
        Masked entries of its result are replaced by NaN.
        """
        if not self.isweighted(axis):
            # ``func`` is named after the statistic it implements; defer to
            # the parent-class method of that name when there is one.
            method_name = func.__name__
            if hasattr(super(), method_name):
                parent_method = getattr(super(), method_name)
                return parent_method(axis=axis, skipna=skipna, **kwargs)

        axis_weights = self.get_weights(axis)
        if axis_weights.sum() == 0:
            return self._constructor_sliced(np.nan,
                                            index=self._get_axis(1-axis))

        na = self.isna() & skipna
        # Add a length-1 dimension so the weights broadcast along ``axis``.
        if axis == 0:
            expanded = axis_weights[..., None]
        else:
            expanded = axis_weights[None, ...]
        weights = np.broadcast_to(expanded, self.shape)
        if skipna:
            weights = np.ma.array(weights, mask=na)

        statistic = func(self, na=na, w=weights, axis=axis, skipna=skipna,
                         **kwargs)
        result = np.ma.filled(statistic, np.nan)
        return self._constructor_sliced(result, index=self._get_axis(1-axis))

    def reset_index(self, level=None, drop=False, inplace=False,
                    *args, **kwargs):
        """Reset the index, retaining weights."""
        weights = self.get_weights()
        # Always reset on a copy; ``inplace`` is honoured afterwards.
        answer = super().reset_index(*args, level=level, drop=drop,
                                     inplace=False, **kwargs)
        answer.set_weights(weights, inplace=True)
        if not inplace:
            return answer.__finalize__(self, "reset_index")
        self._update_inplace(answer)

    def neff(self, axis=0, beta=1):
        """Effective number of samples."""
        if not self.isweighted(axis):
            return self.shape[axis]
        return neff(self.get_weights(axis), beta=beta)
[docs]
class WeightedSeries(_WeightedObject, Series):
    """Weighted version of :class:`pandas.Series`."""

    def mean(self, skipna=True):  # noqa: D102
        # With ``skipna`` the missing entries are masked out of both the
        # data and the weights; without it the mask is empty.
        na = self.isna() & skipna
        weights = self.get_weights()
        if skipna:
            weights = np.ma.array(weights, mask=na)
        if weights.sum() == 0 or (skipna and na.all()):
            return np.nan
        return np.average(np.ma.array(self, mask=na), weights=weights)

    def std(self, skipna=True, **kwargs):  # noqa: D102
        return np.sqrt(self.var(skipna=skipna, **kwargs))

    def kurtosis(self, **kwargs):  # noqa: D102
        return self.kurt(**kwargs)

    def var(self, skipna=True, **kwargs):  # noqa: D102
        na = self.isna() & skipna
        w = np.ma.array(self.get_weights(), mask=na)
        # NaN when nothing is left, when missing values are not skipped, or
        # when the total weight vanishes.
        if (na.all() or (self.isna().any() and not skipna)
                or w.sum() == 0):
            return np.float64(np.nan)
        return var_unbiased(np.ma.array(self, mask=na), w, **kwargs)

    def cov(self, other, ddof=1, **kwargs):  # noqa: D102
        x, y = self.align(other, join="inner")
        if len(x) == 0:
            return np.nan
        # The weights have to follow the aligned samples: an inner join can
        # drop or reorder entries of ``self``, so they are read from the
        # aligned copy rather than from ``self``.
        if self.isweighted():
            w = x.get_weights()
        else:
            w = np.ones(len(x), dtype=int)
        # Only keep samples present in both series.
        valid = x.notna() & y.notna()
        x = x[valid]
        y = y[valid]
        w = w[valid.to_numpy()]
        if len(x) == 0 or w.sum() == 0:
            return np.nan
        X = np.column_stack((x.to_numpy(dtype=float), y.to_numpy(dtype=float)))
        return cov_unbiased(X, w, ddof=ddof)[0, 1]

    def corr(self, other, **kwargs):  # noqa: D102
        if not self.isweighted():
            return super().corr(other, **kwargs)
        if self.isna().all():
            return np.nan
        norm = (self.std(skipna=True, ddof=1) *
                other.std(skipna=True, ddof=1))
        if norm == 0:
            return np.float64(np.nan)
        return self.cov(other, ddof=1) / norm

    def kurt(self, skipna=True, **kwargs):  # noqa: D102
        if self.isna().all() or (self.isna().any() and not skipna):
            return np.nan if skipna or self.size == 1 else np.float64(np.nan)
        na = self.isna() & skipna
        w = np.ma.array(self.get_weights(), mask=na)
        return kurt_unbiased(np.ma.array(self, mask=na), w)

    def skew(self, skipna=True, **kwargs):  # noqa: D102
        if self.isna().all() or (self.isna().any() and not skipna):
            return np.nan if skipna or self.size == 1 else np.float64(np.nan)
        na = self.isna() & skipna
        w = np.ma.array(self.get_weights(), mask=na)
        return skew_unbiased(np.ma.array(self, mask=na), w)

    def sem(self, skipna=True, ddof=1, **kwargs):  # noqa: D102
        na = self.isna() & skipna
        w = np.ma.array(self.get_weights(), mask=na)
        V1 = w.sum()
        if np.issubdtype(w.dtype, np.integer) and V1 > 1:
            # frequency weights
            n = np.ma.filled(V1, np.nan)
        else:
            # reliability weights
            n = np.ma.filled(V1**2 / (w**2).sum(), np.nan)
        return np.sqrt(self.var(skipna=skipna, ddof=ddof, **kwargs) / n)

    def quantile(self, q=0.5, interpolation='linear'):  # noqa: D102
        if self.get_weights().sum() == 0:
            return np.nan
        return quantile(self.to_numpy(), q, self.get_weights(), interpolation)

    def compress(self, ncompress=True, weighted=False):
        """Reduce the number of samples by discarding low-weights.

        Parameters
        ----------
        ncompress : int, float, str, default=True
            Degree of compression.

            * If ``True`` (default): reduce to the channel capacity
              (theoretical optimum compression), equivalent to
              ``ncompress='entropy'``.
            * If ``> 0``: desired number of samples after compression.
            * If ``<= 0``: compress so that all remaining weights are unity.
            * If ``str``: determine number from the Huggins-Roy family of
              effective samples in :func:`anesthetic.utils.neff`
              with ``beta=ncompress``.

        weighted : bool, default=False
            If False (default), return an unweighted object with potentially
            repeated samples.
            If True, return a weighted object with non-zero compressed weights.
        """
        # Nothing to do for ``ncompress=False``, or for unweighted samples
        # when no explicit number of samples is requested.
        if (not self.isweighted() and isinstance(ncompress, (bool, str))
                or ncompress is False):
            return self
        w = compress_weights(self.get_weights(), self._rand(), ncompress)
        if weighted:
            mask = w > 0
            return self.drop_weights()[mask].set_weights(w[mask])
        else:
            return self.drop_weights().repeat(w)

    def sample(self, *args, **kwargs):  # noqa: D102
        return super().sample(weights=self.get_weights(), *args, **kwargs)

    def credibility_interval(self, level=0.68, method="iso-pdf",
                             return_covariance=False, nsamples=12):
        """Compute the credibility interval of the weighted samples.

        Based on linear interpolation of the cumulative density function, thus
        expect discretisation errors on the scale of distances between samples.

        https://github.com/Stefan-Heimersheim/fastCI#readme

        Parameters
        ----------
        level : float, default=0.68
            Credibility level (probability, <1).
        method : str, default='iso-pdf'
            Which definition of interval to use:

            * ``'iso-pdf'``: Calculate iso probability density interval with
              the same probability density at each end. Also known as
              waterline-interval or highest average posterior density interval.
              This is only accurate if the distribution is sufficiently
              uni-modal.
            * ``'lower-limit'``/``'upper-limit'``: Lower/upper limit. One-sided
              limits for which ``level`` fraction of the (equally weighted)
              samples lie above/below the limit.
            * ``'equal-tailed'``: Equal-tailed interval with the same fraction
              of (equally weighted) samples below and above the interval
              region.

        return_covariance: bool, default=False
            Return the covariance of the sampled limits, in addition to the
            mean
        nsamples : int, default=12
            Number of CDF samples to improve `mean` and `std` estimate.

        Returns
        -------
        limit(s) : float, array, or tuple of floats or arrays
            Returns the credibility interval boundaries of the Series.
            By default, returns the mean over ``nsamples`` samples, which is
            either two numbers (``method='iso-pdf'``/``'equal-tailed'``) or
            one number (``method='lower-limit'``/``'upper-limit'``). If
            ``return_covariance=True``, returns a tuple (mean(s), covariance)
            where covariance is the covariance over the sampled limits.
        """
        return credibility_interval(self, weights=self.get_weights(),
                                    level=level, method=method,
                                    return_covariance=return_covariance,
                                    nsamples=nsamples)

    @property
    def _constructor(self):
        return WeightedSeries

    @property
    def _constructor_expanddim(self):
        return WeightedDataFrame

    def groupby(
        self,
        by=None,
        axis=0,
        level=None,
        as_index=True,
        sort=True,
        group_keys=True,
        observed=False,
        dropna=True,
    ):  # pragma: no cover  # noqa: D102
        if level is None and by is None:
            raise TypeError("You have to supply one of 'by' and 'level'")
        if not as_index:
            raise TypeError("as_index=False only valid with DataFrame")
        # Only validates ``axis``; the value is not forwarded.
        axis = self._get_axis_number(axis)
        return WeightedSeriesGroupBy(
            obj=self,
            keys=by,
            level=level,
            as_index=as_index,
            sort=sort,
            group_keys=group_keys,
            observed=observed,
            dropna=dropna,
        )
[docs]
class WeightedDataFrame(_WeightedObject, DataFrame):
    """Weighted version of :class:`pandas.DataFrame`."""

    # NOTE: the nested helpers in ``mean``, ``var``, ``kurt``, ``skew`` and
    # ``sem`` must keep the name of their enclosing method, because
    # ``_weighted_stat`` uses ``func.__name__`` to find the parent-class
    # method for unweighted data.

    def mean(self, axis=0, skipna=True, **kwargs):  # noqa: D102
        def mean(data, na, w, axis, skipna, **kwargs):
            if skipna:
                data = np.ma.array(data, mask=na)
            return np.average(data, weights=w, axis=axis)
        return self._weighted_stat(mean, axis, skipna, **kwargs)

    def std(self, axis=0, skipna=True, **kwargs):  # noqa: D102
        return np.sqrt(self.var(axis=axis, skipna=skipna, **kwargs))

    def kurtosis(self, **kwargs):  # noqa: D102
        return self.kurt(**kwargs)

    def var(self, axis=0, skipna=True, **kwargs):  # noqa: D102
        def var(data, na, w, axis, skipna, **kwargs):
            if skipna:
                data = np.ma.array(data, mask=na)
            return var_unbiased(data, w, axis=axis, ddof=kwargs.pop('ddof', 1))
        return self._weighted_stat(var, axis, skipna, **kwargs)

    def cov(self, ddof=1, **kwargs):  # noqa: D102
        if kwargs:
            raise TypeError(f"WeightedDataFrame.cov() got unexpected keyword "
                            f"arguments {kwargs}")
        if not self.isweighted():
            return super().cov(ddof=ddof)
        cov = cov_unbiased(self, self.get_weights(), ddof=ddof)
        return self._constructor(cov, index=self.columns, columns=self.columns)

    def corr(self, **kwargs):  # noqa: D102
        if not self.isweighted():
            return super().corr(**kwargs)
        corr = cov_unbiased(self, self.get_weights(), ddof=1, return_corr=True)
        return self._constructor(corr, index=self.columns,
                                 columns=self.columns)

    def corrwith(self, other, axis=0, drop=False, **kwargs):  # noqa: D102
        axis = self._get_axis_number(axis)
        if not self.isweighted(axis):
            return super().corrwith(other, drop=drop, axis=axis, **kwargs)
        else:
            if isinstance(other, Series):
                answer = self.apply(lambda x: other.corr(x, **kwargs),
                                    axis=axis)
                return self._constructor_sliced(answer)

            left, right = self.align(other, join="inner")

            # From here on the samples always run along the rows.
            if axis == 1:
                left = left.T
                right = right.T

            weights = left.index.to_frame()['weights']
            weights, _ = weights.align(right, join="inner")

            # mask missing values
            left = left + right * 0
            right = right + left * 0

            # demeaned data
            ldem = left - left.mean()
            rdem = right - right.mean()

            ddof = kwargs.pop('ddof', 0)
            num = (ldem * rdem * weights.to_numpy()[:, None]).sum()
            dom = weights.sum() * left.std(ddof=ddof) * right.std(ddof=ddof)

            correl = num / dom

            if not drop:
                # Find non-matching labels along the given axis
                result_index = self._get_axis(1-axis).union(
                    other._get_axis(1-axis)
                )
                idx_diff = result_index.difference(correl.index)

                if len(idx_diff) > 0:
                    correl = concat([
                        correl,
                        Series([np.nan] * len(idx_diff), index=idx_diff)
                    ])

            return self._constructor_sliced(correl)

    def kurt(self, axis=0, skipna=True, **kwargs):  # noqa: D102
        def kurt(data, na, w, axis, skipna, **kwargs):
            if skipna:
                data = np.ma.array(data, mask=na)
            return kurt_unbiased(data, w, axis=axis)
        return self._weighted_stat(kurt, axis, skipna, **kwargs)

    def skew(self, axis=0, skipna=True, **kwargs):  # noqa: D102
        def skew(data, na, w, axis, skipna, **kwargs):
            if skipna:
                data = np.ma.array(data, mask=na)
            return skew_unbiased(data, w, axis=axis)
        return self._weighted_stat(skew, axis, skipna, **kwargs)

    def sem(self, axis=0, skipna=True, **kwargs):  # noqa: D102
        def sem(data, na, w, axis, skipna, **kwargs):
            V1 = w.sum(axis=axis)
            if np.issubdtype(w.dtype, np.integer) and np.all(V1 > 1):
                # frequency weights
                n = V1
            else:
                # reliability weights
                n = V1**2 / (w**2).sum(axis=axis)
            return np.sqrt(self.var(axis=axis, skipna=skipna, **kwargs) / n)
        return self._weighted_stat(sem, axis, skipna, **kwargs)

    def quantile(self, q=0.5, axis=0, numeric_only=None,
                 interpolation='linear', method=None):  # noqa: D102
        # Accept axis names as well as numbers; ``1-axis`` below needs an int.
        axis = self._get_axis_number(axis)
        if self.isweighted(axis):
            if numeric_only is not None or method is not None:
                raise NotImplementedError(
                    "`numeric_only` and `method` kwargs not implemented for "
                    "`WeightedSeries` and `WeightedDataFrame`."
                )
            # ``items`` walks over columns, so for ``axis=1`` iterate the
            # transpose: each item is then one row carrying the column
            # weights, and the results line up with ``_get_axis(1-axis)``.
            samples = self if axis == 0 else self.T
            data = np.array([c.quantile(q=q, interpolation=interpolation)
                             for _, c in samples.items()])
            if np.isscalar(q):
                return self._constructor_sliced(data,
                                                index=self._get_axis(1-axis))
            else:
                return self._constructor(data.T, index=q,
                                         columns=self._get_axis(1-axis))
        else:
            if numeric_only is None:
                numeric_only = True
            if method is None:
                method = 'single'
            return super().quantile(q=q, axis=axis, numeric_only=numeric_only,
                                    interpolation=interpolation, method=method)

    def compress(self, ncompress=True, axis=0, weighted=False):
        """Reduce the number of samples by discarding low-weights.

        Parameters
        ----------
        ncompress : int, float, str, default=True
            Degree of compression.

            * If ``True`` (default): reduce to the channel capacity
              (theoretical optimum compression), equivalent to
              ``ncompress='entropy'``.
            * If ``> 0``: desired number of samples after compression.
            * If ``<= 0``: compress so that all remaining weights are unity.
            * If ``str``: determine number from the Huggins-Roy family of
              effective samples in :func:`anesthetic.utils.neff`
              with ``beta=ncompress``.

        axis : int (0,1), default=0
            Axis carrying the weights to compress along.

        weighted : bool, default=False
            If False (default), return an unweighted object with potentially
            repeated samples.
            If True, return a weighted object with non-zero compressed weights.
        """
        # Nothing to do for ``ncompress=False``, or for unweighted samples
        # when no explicit number of samples is requested.
        if (not self.isweighted(axis) and isinstance(ncompress, (bool, str))
                or ncompress is False):
            return self
        w = compress_weights(self.get_weights(axis), self._rand(axis),
                             ncompress)
        df = self.drop_weights(axis)
        if weighted:
            indices = np.flatnonzero(w > 0)
            df = df.take(indices, axis=axis)
            return df.set_weights(w[indices], axis=axis)
        else:
            # Repeat every position as many times as its compressed weight.
            indices = np.repeat(np.arange(df.shape[axis]), w)
            return df.take(indices, axis=axis)

    def sample(self, *args, **kwargs):  # noqa: D102
        # Recover ``axis`` however it was passed (positionally or by name).
        sig = signature(DataFrame.sample)
        axis = sig.bind(self, *args, **kwargs).arguments.get('axis', 0)
        if axis is None:
            # An explicit ``axis=None`` means the default axis.
            axis = 0
        if self.isweighted(axis):
            return super().sample(weights=self.get_weights(axis),
                                  *args, **kwargs)
        else:
            return super().sample(*args, **kwargs)

    def credibility_interval(self, level=0.68, method="iso-pdf",
                             return_covariance=False, nsamples=12):
        """Compute the credibility interval of the weighted samples.

        Based on linear interpolation of the cumulative density function, thus
        expect discretisation errors on the scale of distances between samples.

        https://github.com/Stefan-Heimersheim/fastCI#readme

        Parameters
        ----------
        level : float, default=0.68
            Credibility level (probability, <1).
        method : str, default='iso-pdf'
            Which definition of interval to use:

            * ``'iso-pdf'``: Calculate iso probability density interval with
              the same probability density at each end. Also known as
              waterline-interval or highest average posterior density interval.
              This is only accurate if the distribution is sufficiently
              uni-modal.
            * ``'lower-limit'``/``'upper-limit'``: Lower/upper limit. One-sided
              limits for which ``level`` fraction of the (equally weighted)
              samples lie above/below the limit.
            * ``'equal-tailed'``: Equal-tailed interval with the same fraction
              of (equally weighted) samples below and above the interval
              region.

        return_covariance: bool, default=False
            Return the covariance of the sampled limits, in addition to the
            mean
        nsamples : int, default=12
            Number of CDF samples to improve `mean` and `std` estimate.

        Returns
        -------
        limit(s) : float, array, or tuple of floats or arrays
            Returns the credibility interval boundaries for each column.
            By default, returns the mean over ``nsamples`` samples, which is
            either two numbers (``method='iso-pdf'``/``'equal-tailed'``) or
            one number (``method='lower-limit'``/``'upper-limit'``). If
            ``return_covariance=True``, returns a tuple (means, covariances)
            where covariances are the covariance over the sampled limits for
            each column.
        """
        # Row labels of the result: one-sided methods give a single limit.
        if 'lower' in method:
            limits = ['lower']
        elif 'upper' in method:
            limits = ['upper']
        else:
            limits = ['lower', 'upper']
        cis = [credibility_interval(self[col], weights=self.get_weights(),
                                    level=level, method=method,
                                    return_covariance=return_covariance,
                                    nsamples=nsamples) for col in self.columns]
        if return_covariance:
            # Each entry is a (limits, covariance) pair; split them and lay
            # the covariances out side by side, one block per column.
            cis, covs = zip(*cis)
            mulidx = MultiIndex.from_product([
                self.columns.get_level_values(level=0),
                limits
            ])
            ncol = len(self.columns)
            nlim = len(limits)
            covs = np.asarray(covs).reshape(nlim*ncol, nlim).T
            covs = DataFrame(covs, index=limits, columns=mulidx)
        cis = np.atleast_2d(cis) if 'limit' in method else np.asarray(cis).T
        cis = DataFrame(data=cis, index=limits, columns=self.columns)
        if return_covariance:
            return cis, covs
        else:
            return cis

    @property
    def _constructor_sliced(self):
        return WeightedSeries

    @property
    def _constructor(self):
        return WeightedDataFrame

    def groupby(
        self,
        by=None,
        axis=no_default,
        level=None,
        as_index: bool = True,
        sort: bool = True,
        group_keys: bool = True,
        observed: bool = False,
        dropna: bool = True,
    ):  # pragma: no cover  # noqa: D102
        # ``axis`` is only validated and warned about; it is not forwarded.
        if axis is not lib.no_default:
            axis = self._get_axis_number(axis)
            if axis == 1:
                warnings.warn(
                    "DataFrame.groupby with axis=1 is deprecated. Do "
                    "`frame.T.groupby(...)` without axis instead.",
                    FutureWarning,
                    stacklevel=find_stack_level(),
                )
            else:
                warnings.warn(
                    "The 'axis' keyword in DataFrame.groupby is deprecated "
                    "and will be removed in a future version.",
                    FutureWarning,
                    stacklevel=find_stack_level(),
                )
        else:
            axis = 0

        if level is None and by is None:
            raise TypeError("You have to supply one of 'by' and 'level'")

        return WeightedDataFrameGroupBy(
            obj=self,
            keys=by,
            level=level,
            as_index=as_index,
            sort=sort,
            group_keys=group_keys,
            observed=observed,
            dropna=dropna,
        )
# Rewrite the docstrings of the weighted classes so that they refer to the
# weighted types. The substitutions are applied to each class in the order
# listed.
for cls in (WeightedDataFrame, WeightedSeries, WeightedGroupBy,
            WeightedDataFrameGroupBy, WeightedSeriesGroupBy):
    for _pattern, _replacement in (
        (r'\bDataFrame\b', 'WeightedDataFrame'),
        (r'\bDataFrames\b', 'WeightedDataFrames'),
        (r'\bSeries\b', 'WeightedSeries'),
        (r'\bDataFrameGroupBy\b', 'WeightedDataFrameGroupBy'),
        (r'\bSeriesGroupBy\b', 'WeightedSeriesGroupBy'),
        ('core.window.ewm', 'pandas.api.typing'),
        ('core.window.expanding', 'pandas.api.typing'),
        ('core.window.rolling', 'pandas.api.typing'),
        ('core.window', 'pandas.api.typing'),
    ):
        adjust_docstrings(cls, _pattern, _replacement)

# Applied after the loop above, once per class.
adjust_docstrings(WeightedDataFrame, 'resample', 'pandas.DataFrame.resample')
adjust_docstrings(WeightedSeries, 'resample', 'pandas.Series.resample')