def wrapper()

in sdks/python/apache_beam/dataframe/frame_base.py [0:0]


  def wrapper(*args, **kwargs):
    """Validate restricted arguments, then build a deferred call to ``func``.

    Each entry of ``restrictions`` maps a parameter name to one of:

    * a callable predicate, called with the supplied value;
    * a list of accepted values;
    * a single accepted value, compared with ``==``.

    The supplied value is looked up in ``kwargs`` first and otherwise
    positionally, using the parameter order from ``getfullargspec(func)``.
    Restrictions on parameters that were not supplied, or that are not named
    positional parameters of ``func``, are skipped.

    After validation, ``args`` and ``kwargs`` are split into deferred inputs
    (``DeferredBase``, ``_DeferredIndex`` and concrete pandas ``NDFrame``
    objects, each represented by an expression) and plain constants, and a
    ``ComputedExpression`` is built whose callable re-assembles the full
    argument list before invoking ``func``.

    Returns:
      ``DeferredFrame.wrap(result_expr)`` when ``inplace`` is false. When
      ``inplace`` is true, ``args[0]._expr`` is rebound to the new expression
      and ``None`` is returned.

    Raises:
      NotImplementedError: if a supplied argument fails its restriction.
    """
    for key, values in restrictions.items():
      if key in kwargs:
        value = kwargs[key]
      else:
        try:
          ix = getfullargspec(func).args.index(key)
        except ValueError:
          # TODO: fix for delegation?
          continue
        if len(args) <= ix:
          # The restricted parameter was not passed positionally either.
          continue
        value = args[ix]

      if callable(values):
        supported = values(value)
      elif isinstance(values, list):
        supported = value in values
      else:
        # Compare against the restriction itself. Comparing the argument with
        # itself (as an earlier revision did) silently accepts every value.
        supported = value == values

      if not supported:
        raise NotImplementedError(
            '%s=%s not supported for %s' % (key, value, name))

    # Positional arguments: remember which slots are deferred; every other
    # slot keeps its constant value (deferred slots stay None until apply()).
    deferred_arg_indices = []
    deferred_arg_exprs = []
    constant_args = [None] * len(args)
    from apache_beam.dataframe.frames import _DeferredIndex
    for ix, arg in enumerate(args):
      if isinstance(arg, DeferredBase):
        deferred_arg_indices.append(ix)
        deferred_arg_exprs.append(arg._expr)
      elif isinstance(arg, _DeferredIndex):
        # TODO(robertwb): Consider letting indices pass through as indices.
        # This would require updating the partitioning code, as indices don't
        # have indices.
        deferred_arg_indices.append(ix)
        deferred_arg_exprs.append(
            expressions.ComputedExpression(
                'index_as_series',
                lambda ix: ix.index.to_series(),  # yapf break
                [arg._frame._expr],
                preserves_partition_by=partitionings.Singleton(),
                requires_partition_by=partitionings.Arbitrary()))
      elif isinstance(arg, pd.core.generic.NDFrame):
        # Concrete pandas object: wrap it as a constant expression. The empty
        # slice arg[0:0] is presumably used as its proxy -- confirm against
        # expressions.ConstantExpression.
        deferred_arg_indices.append(ix)
        deferred_arg_exprs.append(expressions.ConstantExpression(arg, arg[0:0]))
      else:
        constant_args[ix] = arg

    # Keyword arguments: same split, keyed by name instead of position.
    deferred_kwarg_keys = []
    deferred_kwarg_exprs = []
    constant_kwargs = {key: None for key in kwargs}
    for key, arg in kwargs.items():
      if isinstance(arg, DeferredBase):
        deferred_kwarg_keys.append(key)
        deferred_kwarg_exprs.append(arg._expr)
      elif isinstance(arg, pd.core.generic.NDFrame):
        deferred_kwarg_keys.append(key)
        deferred_kwarg_exprs.append(
            expressions.ConstantExpression(arg, arg[0:0]))
      else:
        constant_kwargs[key] = arg

    # Order matters: apply() relies on positional expressions coming first.
    deferred_exprs = deferred_arg_exprs + deferred_kwarg_exprs

    if inplace:
      actual_func = _copy_and_mutate(func)
    else:
      actual_func = func

    def apply(*actual_args):
      """Call ``actual_func`` with deferred slots filled from ``actual_args``.

      ``actual_args`` is expected to hold one value per entry of
      ``deferred_exprs``, in the same order: positional values first, then
      keyword values.
      """
      actual_args, actual_kwargs = (actual_args[:len(deferred_arg_exprs)],
                                    actual_args[len(deferred_arg_exprs):])

      # Copy the constants so repeated invocations do not share state.
      full_args = list(constant_args)
      for ix, arg in zip(deferred_arg_indices, actual_args):
        full_args[ix] = arg

      full_kwargs = dict(constant_kwargs)
      for key, arg in zip(deferred_kwarg_keys, actual_kwargs):
        full_kwargs[key] = arg

      return actual_func(*full_args, **full_kwargs)

    if (requires_partition_by.is_subpartitioning_of(partitionings.Index()) and
        sum(isinstance(arg.proxy(), pd.core.generic.NDFrame)
            for arg in deferred_exprs) > 1):
      # Implicit join on index if there is more than one indexed input.
      actual_requires_partition_by = partitionings.Index()
    else:
      actual_requires_partition_by = requires_partition_by

    result_expr = expressions.ComputedExpression(
        name,
        apply,
        deferred_exprs,
        requires_partition_by=actual_requires_partition_by,
        preserves_partition_by=preserves_partition_by)
    if inplace:
      # In-place operations rebind the receiver's expression and return None.
      args[0]._expr = result_expr

    else:
      return DeferredFrame.wrap(result_expr)