in sdks/python/apache_beam/dataframe/frames.py [0:0]
def groupby(self, by, level, axis, as_index, group_keys, **kwargs):
    """Build a deferred group-by over this frame.

    The grouping keys are normalised so that they always live in the index
    of an intermediate expression; the actual grouping is then expressed as
    a ``groupby`` over every level of that index.

    Args:
      by: Grouping key(s). Either a callable applied to the index values, or
        (when ``self`` is a ``DeferredDataFrame``) a label or list of labels
        naming columns and/or index levels. May be ``None`` if ``level`` is
        given.
      level: Index level, or list/tuple of index levels, to group by. Each
        entry is either an integer position or a level name. May be ``None``
        if ``by`` is given.
      axis: Axis to group along. ``1`` or ``'columns'`` groups columns;
        anything else groups rows.
      as_index: Must be truthy; ``as_index=False`` is not implemented.
      group_keys: Must be truthy; ``group_keys=False`` is not implemented.
      **kwargs: Forwarded unchanged to the underlying ``groupby`` call.

    Returns:
      A ``_DeferredGroupByCols`` when grouping along the column axis,
      otherwise a ``DeferredGroupBy``.

    Raises:
      NotImplementedError: for ``as_index=False``, ``group_keys=False``,
        grouping by a ``DeferredSeries``, or an unsupported ``by``.
      TypeError: if neither ``by`` nor ``level`` is supplied.
      KeyError: if a label in ``by`` is neither a column nor an index name.
      frame_base.WontImplementError: if ``by`` is a concrete ``np.ndarray``.
    """
    if not as_index:
        raise NotImplementedError('groupby(as_index=False)')
    if not group_keys:
        raise NotImplementedError('groupby(group_keys=False)')

    if axis in (1, 'columns'):
        # Forward ``level`` as well as ``by``; otherwise a request such as
        # groupby(level=0, axis=1) would silently lose its grouping key.
        return _DeferredGroupByCols(
            expressions.ComputedExpression(
                'groupbycols',
                lambda df: df.groupby(by, level=level, axis=axis, **kwargs),
                [self._expr],
                requires_partition_by=partitionings.Arbitrary(),
                preserves_partition_by=partitionings.Arbitrary()))

    if level is None and by is None:
        raise TypeError("You have to supply one of 'by' and 'level'")

    elif level is not None:
        if isinstance(level, (list, tuple)):
            grouping_indexes = level
        else:
            grouping_indexes = [level]

        grouping_columns = []

        index = self._expr.proxy().index

        # Translate to level numbers only
        grouping_indexes = [
            l if isinstance(l, int) else index.names.index(l)
            for l in grouping_indexes
        ]

        if index.nlevels == 1:
            # Nothing to reorder or drop: the single level is the key.
            to_group_with_index = self._expr
            to_group = self._expr
        else:
            levels_to_drop = [
                i for i in range(index.nlevels) if i not in grouping_indexes
            ]

            # Reorder so the grouped indexes are first
            to_group_with_index = self.reorder_levels(
                grouping_indexes + levels_to_drop)

            # After reordering, the grouped levels occupy the leading
            # positions and the remaining levels trail them.
            grouping_indexes = list(range(len(grouping_indexes)))
            levels_to_drop = list(
                range(len(grouping_indexes), index.nlevels))
            if levels_to_drop:
                to_group = to_group_with_index.droplevel(
                    levels_to_drop)._expr
            else:
                to_group = to_group_with_index._expr
            to_group_with_index = to_group_with_index._expr

    elif callable(by):

        def map_index(df):
            # Replace the index with the mapped keys on a copy so the input
            # frame is left untouched.
            df = df.copy()
            df.index = df.index.map(by)
            return df

        to_group = expressions.ComputedExpression(
            'map_index',
            map_index, [self._expr],
            requires_partition_by=partitionings.Arbitrary(),
            preserves_partition_by=partitionings.Singleton())

        orig_nlevels = self._expr.proxy().index.nlevels
        to_group_with_index = expressions.ComputedExpression(
            'map_index_keep_orig',
            lambda df: df.set_index([df.index.map(by), df.index]),
            [self._expr],
            requires_partition_by=partitionings.Arbitrary(),
            # Partitioning by the original indexes is preserved
            preserves_partition_by=partitionings.Index(
                list(range(1, orig_nlevels + 1))))

        grouping_columns = []
        # The mapped key is prepended by set_index above, so the index we
        # need to group by is the first one.
        grouping_indexes = [0]

    elif isinstance(by, DeferredSeries):
        raise NotImplementedError(
            "grouping by a Series is not yet implemented. You can group by a "
            "DataFrame column by specifying its name.")

    elif isinstance(by, np.ndarray):
        raise frame_base.WontImplementError(
            "Grouping by a concrete ndarray is order sensitive.",
            reason="order-sensitive")

    elif isinstance(self, DeferredDataFrame):
        if not isinstance(by, list):
            by = [by]

        proxy = self._expr.proxy()
        column_names = proxy.columns
        index_names = proxy.index.names

        # De-duplicate the labels while keeping the caller's order, so the
        # derived key lists are deterministic rather than hash-ordered.
        ordered_labels = list(dict.fromkeys(by))

        # Find the columns that we need to move into the index so we can
        # group by them
        column_label_set = set(column_names)
        grouping_columns = [
            label for label in ordered_labels if label in column_label_set
        ]

        for label in by:
            if label not in index_names and label not in column_names:
                raise KeyError(label)

        index_label_set = set(index_names)
        grouping_indexes = [
            label for label in ordered_labels if label in index_label_set
        ]

        requested = set(by)
        if grouping_indexes:
            if requested == index_label_set:
                # Grouping by exactly the existing index levels.
                to_group = self._expr
            elif requested.issubset(index_names):
                # Only index levels requested: drop the ones not grouped by.
                to_group = self.droplevel(index_names.difference(by))._expr
            else:
                # Mix of index levels and columns: turn the grouped levels
                # into columns, then index by all requested labels.
                to_group = self.reset_index(grouping_indexes).set_index(
                    by)._expr
        else:
            to_group = self.set_index(by)._expr

        if grouping_columns:
            # TODO(BEAM-11711): It should be possible to do this without
            # creating an expression manually, by using
            # DeferredDataFrame.set_index, i.e.:
            #   to_group_with_index = self.set_index([self.index] +
            #                                        grouping_columns)._expr
            to_group_with_index = expressions.ComputedExpression(
                'move_grouped_columns_to_index',
                lambda df: df.set_index([df.index] + grouping_columns),
                [self._expr],
                requires_partition_by=partitionings.Arbitrary(),
                preserves_partition_by=partitionings.Index(
                    list(range(proxy.index.nlevels))))
        else:
            to_group_with_index = self._expr

    else:
        raise NotImplementedError(by)

    return DeferredGroupBy(
        expressions.ComputedExpression(
            'groupbyindex',
            lambda df: df.groupby(
                level=list(range(df.index.nlevels)), **kwargs), [to_group],
            requires_partition_by=partitionings.Index(),
            preserves_partition_by=partitionings.Arbitrary()),
        kwargs,
        to_group,
        to_group_with_index,
        grouping_columns=grouping_columns,
        grouping_indexes=grouping_indexes)