def groupby()

in sdks/python/apache_beam/dataframe/frames.py [0:0]


  def groupby(self, by, level, axis, as_index, group_keys, **kwargs):
    """Build a deferred group-by over this frame.

    For row-wise grouping the frame is re-indexed so that the grouping keys
    become its index, and the grouping itself is then expressed as a
    ``groupby`` over every index level of that re-indexed frame, which
    requires the data to be partitioned by index.

    Args:
      by: A callable mapped over the index, a single column / index-level
        label, or a list of such labels. Only consulted for row-wise grouping
        when ``level`` is None.
      level: An index level (position or name), or a list/tuple of levels.
        When given, it takes precedence over ``by`` for row-wise grouping.
        Negative positions count from the last level.
      axis: ``1`` or ``'columns'`` groups along columns; any other value
        groups along rows.
      as_index: Must be truthy; ``as_index=False`` is not implemented.
      group_keys: Must be truthy; ``group_keys=False`` is not implemented.
      **kwargs: Forwarded to the underlying pandas ``groupby`` call.

    Returns:
      A ``_DeferredGroupByCols`` when grouping along columns, otherwise a
      ``DeferredGroupBy``.

    Raises:
      NotImplementedError: If ``as_index`` or ``group_keys`` is falsy, if
        ``by`` is a ``DeferredSeries``, or if ``by`` is of an unsupported kind.
      TypeError: If both ``by`` and ``level`` are None.
      KeyError: If a label in ``by`` is neither a column nor an index name.
      frame_base.WontImplementError: If ``by`` is a concrete ``np.ndarray``.
    """
    if not as_index:
      raise NotImplementedError('groupby(as_index=False)')
    if not group_keys:
      raise NotImplementedError('groupby(group_keys=False)')

    if axis in (1, 'columns'):
      # Column-wise grouping never moves rows, so any partitioning works and
      # is left untouched.
      return _DeferredGroupByCols(
          expressions.ComputedExpression(
              'groupbycols',
              lambda df: df.groupby(by, axis=axis, **kwargs), [self._expr],
              requires_partition_by=partitionings.Arbitrary(),
              preserves_partition_by=partitionings.Arbitrary()))

    if level is None and by is None:
      raise TypeError("You have to supply one of 'by' and 'level'")

    elif level is not None:
      if isinstance(level, (list, tuple)):
        requested_levels = level
      else:
        requested_levels = [level]

      grouping_columns = []

      index = self._expr.proxy().index
      nlevels = index.nlevels

      # Translate to non-negative level numbers only. Names are looked up in
      # the index; negative positions are counted from the last level so that
      # the membership test below recognizes them.
      grouping_indexes = []
      for l in requested_levels:
        if not isinstance(l, int):
          l = index.names.index(l)
        elif l < 0:
          l += nlevels
        grouping_indexes.append(l)

      if nlevels == 1:
        to_group_with_index = self._expr
        to_group = self._expr
      else:
        levels_to_drop = [
            i for i in range(nlevels) if i not in grouping_indexes
        ]

        # Reorder so the grouped indexes are first
        to_group_with_index = self.reorder_levels(
            grouping_indexes + levels_to_drop)

        # After reordering, the grouped levels occupy the leading positions
        # and everything after them is a level to drop.
        grouping_indexes = list(range(len(grouping_indexes)))
        levels_to_drop = list(range(len(grouping_indexes), nlevels))
        if levels_to_drop:
          to_group = to_group_with_index.droplevel(levels_to_drop)._expr
        else:
          to_group = to_group_with_index._expr
        to_group_with_index = to_group_with_index._expr

    elif callable(by):

      def map_index(df):
        # Copy first so the input frame's index is not modified.
        df = df.copy()
        df.index = df.index.map(by)
        return df

      to_group = expressions.ComputedExpression(
          'map_index',
          map_index, [self._expr],
          requires_partition_by=partitionings.Arbitrary(),
          preserves_partition_by=partitionings.Singleton())

      orig_nlevels = self._expr.proxy().index.nlevels
      to_group_with_index = expressions.ComputedExpression(
          'map_index_keep_orig',
          lambda df: df.set_index([df.index.map(by), df.index]),
          [self._expr],
          requires_partition_by=partitionings.Arbitrary(),
          # Partitioning by the original indexes is preserved
          preserves_partition_by=partitionings.Index(
              list(range(1, orig_nlevels + 1))))

      grouping_columns = []
      # The mapped index is placed first by set_index above, so the level to
      # group by is level 0.
      grouping_indexes = [0]

    elif isinstance(by, DeferredSeries):

      raise NotImplementedError(
          "grouping by a Series is not yet implemented. You can group by a "
          "DataFrame column by specifying its name.")

    elif isinstance(by, np.ndarray):
      raise frame_base.WontImplementError(
          "Grouping by a concrete ndarray is order sensitive.",
          reason="order-sensitive")

    elif isinstance(self, DeferredDataFrame):
      if not isinstance(by, list):
        by = [by]

      proxy = self._expr.proxy()
      column_names = proxy.columns
      index_names = proxy.index.names

      # De-duplicate the labels while keeping the caller's order. Building the
      # key lists from a bare set would make their order arbitrary (and, for
      # string labels, different from one process to the next).
      unique_labels = list(dict.fromkeys(by))

      # Find the columns that we need to move into the index so we can group by
      # them
      known_columns = set(column_names)
      grouping_columns = [
          label for label in unique_labels if label in known_columns
      ]
      for label in by:
        if label not in index_names and label not in column_names:
          raise KeyError(label)
      known_index_names = set(index_names)
      grouping_indexes = [
          label for label in unique_labels if label in known_index_names
      ]

      if grouping_indexes:
        if set(by) == set(index_names):
          # Grouping by exactly the existing index levels: nothing to move.
          to_group = self._expr
        elif set(by).issubset(index_names):
          # Grouping by some of the index levels: drop the others.
          to_group = self.droplevel(index_names.difference(by))._expr
        else:
          # Mix of index levels and columns: rebuild the index from `by`.
          to_group = self.reset_index(grouping_indexes).set_index(by)._expr
      else:
        to_group = self.set_index(by)._expr

      if grouping_columns:
        # TODO(BEAM-11711): It should be possible to do this without creating an
        # expression manually, by using DeferredDataFrame.set_index, i.e.:
        #   to_group_with_index = self.set_index([self.index] +
        #                                        grouping_columns)._expr
        to_group_with_index = expressions.ComputedExpression(
            'move_grouped_columns_to_index',
            lambda df: df.set_index([df.index] + grouping_columns),
            [self._expr],
            requires_partition_by=partitionings.Arbitrary(),
            # The original index levels stay in front, so partitioning by them
            # is preserved.
            preserves_partition_by=partitionings.Index(
                list(range(proxy.index.nlevels))))
      else:
        to_group_with_index = self._expr

    else:
      raise NotImplementedError(by)

    return DeferredGroupBy(
        expressions.ComputedExpression(
            'groupbyindex',
            # `to_group` already has the grouping keys as its entire index, so
            # group by every level.
            lambda df: df.groupby(
                level=list(range(df.index.nlevels)), **kwargs), [to_group],
            requires_partition_by=partitionings.Index(),
            preserves_partition_by=partitionings.Arbitrary()),
        kwargs,
        to_group,
        to_group_with_index,
        grouping_columns=grouping_columns,
        grouping_indexes=grouping_indexes)