Source code for anesthetic.labelled_pandas

"""Pandas DataFrame and Series with labelled columns."""
from pandas import Series, DataFrame, MultiIndex
from pandas.core.indexing import (_LocIndexer as _LocIndexer_,
                                  _AtIndexer as _AtIndexer_)
import numpy as np
from functools import cmp_to_key
from pandas.errors import IndexingError
import pandas as pd


def read_csv(filename, *args, **kwargs):
    """Read a CSV file into a ``LabelledDataFrame``.

    The file is parsed with progressively simpler layouts, and the first
    parse whose required axes all carry a labels level is returned.
    Extra positional and keyword arguments are forwarded to
    ``pandas.read_csv`` on every attempt.
    """
    # Each entry: (pandas.read_csv options, axes that must be labelled).
    layouts = (
        ({"index_col": [0, 1], "header": [0, 1]}, (0, 1)),
        ({"index_col": [0, 1]}, (0,)),
        ({"index_col": 0, "header": [0, 1]}, (1,)),
    )
    for options, required_axes in layouts:
        frame = LabelledDataFrame(
            pd.read_csv(filename, *args, **options, **kwargs))
        if all(frame.islabelled(axis) for axis in required_axes):
            return frame

    # No labelled layout matched: plain single-level index and header.
    plain = pd.read_csv(filename, *args, index_col=0, **kwargs)
    return LabelledDataFrame(plain)
def ac(funcs, *args):
    """Accessor function helper.

    Given a list `funcs` of ``(callable, labels_map)`` pairs and the
    arguments `*args`, call every callable with `*args`, recording the
    results and catching ``KeyError``, ``ValueError``, ``TypeError`` and
    ``IndexingError``. The successful results are sorted by their
    dimensionality, smallest first; results of equal dimensionality are
    ordered so that the one with the most index/column levels comes first.

    Parameters
    ----------
    funcs : iterable of (callable, mapping or None)
        Accessors to try, each paired with a mapping used to relabel the
        ``name`` of its result.
    *args
        Arguments passed unchanged to every accessor.

    Returns
    -------
    The first non-``None`` result in sorted order. If it has a ``name``
    and the paired mapping holds a truthy entry for that name, the name
    is replaced by that entry.

    Raises
    ------
    KeyError, ValueError, TypeError, IndexingError
        The last exception recorded, if no accessor gave a usable result.
    IndexError
        If there is no usable result and no exception was recorded
        (for example when `funcs` is empty).
    """
    results = []
    errors = []
    for func, labels in funcs:
        try:
            results.append((func(*args), labels))
        except (KeyError, ValueError, TypeError, IndexingError) as err:
            errors.append(err)

    def ndim_of(obj):
        # Plain Python scalars (str, float, None, ...) carry no ``ndim``
        # attribute; treat them as zero-dimensional instead of failing.
        return getattr(obj, "ndim", 0)

    def nlevels_of(obj):
        levels = 0
        dims = ndim_of(obj)
        if dims > 0:
            levels += obj.index.nlevels
        if dims > 1:
            levels += obj.columns.nlevels
        return levels

    def cmp(X, Y):
        x, _ = X
        y, _ = Y
        x_ndim = ndim_of(x)
        y_ndim = ndim_of(y)
        if x_ndim != y_ndim:
            return 1 if x_ndim > y_ndim else -1
        x_levels = nlevels_of(x)
        y_levels = nlevels_of(y)
        if x_levels != y_levels:
            # More levels sorts first.
            return 1 if x_levels < y_levels else -1
        return 0

    results.sort(key=cmp_to_key(cmp))

    for value, labels in results:
        if value is None:
            continue
        if hasattr(value, "name"):
            try:
                label = labels[value.name]
                if label:
                    value.name = label
            except (TypeError, KeyError):
                # No mapping, unhashable name, or name absent from the
                # mapping: leave the name untouched.
                pass
        return value

    if errors:
        raise errors[-1]
    # Same exception type as indexing the empty error list would give,
    # but with a message that explains what happened.
    raise IndexError("no accessor produced a result")
class _LocIndexer(_LocIndexer_):
    """``loc`` indexer that looks keys up with labels dropped per axis."""

    def __getitem__(self, key):
        candidates = []
        for axes in self.obj._all_axes():
            # View of the label-free copy that bypasses the labelled
            # overrides, so the stock pandas indexer does the lookup.
            unlabelled = super(_LabelledObject, self.obj.drop_labels(axes))
            getter = _LocIndexer_("loc", unlabelled).__getitem__
            candidates.append((getter, self.obj.get_labels_map(axes)))
        return ac(candidates, key)


class _AtIndexer(_AtIndexer_):
    """``at`` indexer that looks keys up with labels dropped per axis."""

    def __getitem__(self, key):
        candidates = []
        for axes in self.obj._all_axes():
            # View of the label-free copy that bypasses the labelled
            # overrides, so the stock pandas indexer does the lookup.
            unlabelled = super(_LabelledObject, self.obj.drop_labels(axes))
            getter = _AtIndexer_("at", unlabelled).__getitem__
            candidates.append((getter, self.obj.get_labels_map(axes)))
        return ac(candidates, key)
[docs] class _LabelledObject(object): """Common methods for `LabelledSeries` and `LabelledDataFrame`. :meta public: """ def __init__(self, *args, **kwargs): if not hasattr(self, '_labels'): self._labels = ("labels", "labels") labels = kwargs.pop(self._labels[0], None) super().__init__(*args, **kwargs) if labels is not None: self.set_labels(labels, inplace=True)
[docs] def islabelled(self, axis=0): """Search for existence of labels.""" intersection = set(self._labels) & set(self._get_axis(axis).names) return min(intersection) if intersection else False
[docs] def get_labels(self, axis=0): """Retrieve labels from an axis.""" labs = self.islabelled(axis) if labs: return self._get_axis(axis).get_level_values(labs).to_numpy() else: return None
[docs] def get_labels_map(self, axis=0, fill=True): """Retrieve mapping from paramnames to labels from an axis.""" try: labs = self.islabelled(axis) index = self._get_axis(axis) if labs: labels_map = index.to_frame().droplevel(labs)[labs] if fill: replacement = labels_map.loc[labels_map == ''].index labels_map.loc[labels_map == ''] = replacement.astype( labels_map.loc[labels_map != ''].dtype) return labels_map else: return index.to_series() except (ValueError, TypeError): return None
[docs] def get_label(self, param, axis=0): """Retrieve mapping from paramnames to labels from an axis.""" return self.get_labels_map(axis)[param]
[docs] def set_label(self, param, value, axis=0, inplace=False): """Set a specific label to a specific value on an axis.""" labels = self.get_labels_map(axis, fill=False) labels[param] = value return self.set_labels(labels, axis=axis, inplace=inplace)
[docs] def drop_labels(self, axis=0): """Drop the labels from an axis if present.""" axes = np.atleast_1d(axis) result = self.copy() for axis in axes: if axis is not None and self.islabelled(axis): result = result.droplevel(self.islabelled(axis), axis) return result.__finalize__(self, "drop_labels")
def _all_axes(self): if isinstance(self, LabelledSeries): return [0, None] else: return [0, 1, [0, 1], None] @property def loc(self): return _LocIndexer("loc", self) @property def at(self): return _AtIndexer("at", self)
[docs] def xs(self, key, axis=0, level=None, drop_level=True): return ac([(super(_LabelledObject, self.drop_labels(i)).xs, self.get_labels_map(i)) for i in self._all_axes()], key, axis, level, drop_level)
def __getitem__(self, key): return ac([(super(_LabelledObject, self.drop_labels(i)).__getitem__, self.get_labels_map(i)) for i in self._all_axes()], key)
[docs] def set_labels(self, labels, axis=0, inplace=False, level=None): """Set labels along an axis.""" if inplace: result = self else: result = self.copy() labs = result.islabelled(axis) if labels is None: if labs: result = result.drop_labels(axis) else: names = [n for n in result._get_axis(axis).names if n != labs] index = [result._get_axis(axis).get_level_values(n) for n in names] if level is None: if labs: level = result._get_axis(axis).names.index(labs) names.insert(level, labs) else: level = len(index) names.insert(level, result._labels[axis]) index.insert(level, labels) index = MultiIndex.from_arrays(index, names=names) result = result.set_axis(index, axis=axis, copy=False) if inplace: self._update_inplace(result) else: return result.__finalize__(self, "set_labels")
[docs] def reset_index(self, level=None, drop=False, inplace=False, *args, **kwargs): labels = self.get_labels() answer = super().reset_index(level=level, drop=drop, inplace=False, *args, **kwargs) answer.set_labels(labels, inplace=True) if inplace: self._update_inplace(answer) else: return answer.__finalize__(self, "reset_index")
class LabelledSeries(_LabelledObject, Series):
    """Labelled version of :class:`pandas.Series`."""

    # Register ``_labels`` alongside the metadata names of Series.
    _metadata = [*Series._metadata, '_labels']

    @property
    def _constructor(self):
        # Results of the same dimensionality are built as this type.
        return LabelledSeries

    @property
    def _constructor_expanddim(self):
        # Results that gain a dimension are built as the labelled frame.
        return LabelledDataFrame
class LabelledDataFrame(_LabelledObject, DataFrame):
    """Labelled version of :class:`pandas.DataFrame`."""

    # Register ``_labels`` alongside the metadata names of DataFrame.
    _metadata = [*DataFrame._metadata, '_labels']

    @property
    def _constructor(self):
        # Results of the same dimensionality are built as this type.
        return LabelledDataFrame

    @property
    def _constructor_sliced(self):
        # Results that lose a dimension are built as the labelled series.
        return LabelledSeries

    def transpose(self, copy=False):  # noqa: D102
        flipped = super().transpose(copy=copy)
        # Index and columns swapped, so the per-axis label names swap too.
        flipped._labels = flipped._labels[::-1]
        return flipped

    T = property(fget=transpose, doc=DataFrame.transpose.__doc__)