in sdks/python/apache_beam/dataframe/frame_base.py [0:0]
def wrapper(*args, **kwargs):
  """Build a deferred expression that applies ``func`` to ``args``/``kwargs``.

  Relies on names from the enclosing scope: ``func``, ``name``,
  ``restrictions``, ``inplace``, ``requires_partition_by`` and
  ``preserves_partition_by``.

  Steps:
    1. Enforce ``restrictions``. Each entry maps an argument name to a
       predicate, a list of allowed values, or a single allowed value. If
       the caller supplied that argument (by keyword or position) with a
       value that is not accepted, raise.
    2. Split the arguments into two groups:
       - deferred inputs (``DeferredBase``, ``_DeferredIndex`` and concrete
         pandas ``NDFrame`` objects), which become input expressions;
       - constants, which are captured as-is.
    3. Wrap ``func`` in a ``ComputedExpression`` over the deferred inputs.

  Returns:
    ``DeferredFrame.wrap`` of the new expression. When ``inplace`` is set,
    returns ``None`` and instead replaces ``args[0]._expr`` with the new
    expression.

  Raises:
    NotImplementedError: if a restricted argument has a value that its
      entry in ``restrictions`` does not accept.
  """
  # Positional parameter names of ``func``. Computed at most once, and only
  # when a restricted argument was not passed by keyword.
  arg_names = None
  for key, values in restrictions.items():
    if key in kwargs:
      value = kwargs[key]
    else:
      if arg_names is None:
        arg_names = getfullargspec(func).args
      try:
        arg_ix = arg_names.index(key)
      except ValueError:
        # TODO: fix for delegation?
        continue
      if len(args) <= arg_ix:
        # Not supplied positionally either, so there is nothing to check.
        continue
      value = args[arg_ix]
    if callable(values):
      check = values
    elif isinstance(values, list):
      check = lambda x, values=values: x in values
    else:
      # A single allowed value. Bind the restriction (``values``), not the
      # argument: binding ``value`` turned this into ``value == value``,
      # which is always true, so unsupported values were silently accepted.
      check = lambda x, values=values: x == values
    if not check(value):
      raise NotImplementedError(
          '%s=%s not supported for %s' % (key, value, name))

  # Positional arguments: deferred ones become expressions (remembering
  # their position); everything else is kept as a constant. The ``None``
  # placeholders at deferred positions are filled in by ``apply`` below.
  deferred_arg_indices = []
  deferred_arg_exprs = []
  constant_args = [None] * len(args)
  # Local import, presumably to avoid a circular import with frames.py.
  # TODO(review): confirm.
  from apache_beam.dataframe.frames import _DeferredIndex
  for ix, arg in enumerate(args):
    if isinstance(arg, DeferredBase):
      deferred_arg_indices.append(ix)
      deferred_arg_exprs.append(arg._expr)
    elif isinstance(arg, _DeferredIndex):
      # TODO(robertwb): Consider letting indices pass through as indices.
      # This would require updating the partitioning code, as indices don't
      # have indices.
      deferred_arg_indices.append(ix)
      deferred_arg_exprs.append(
          expressions.ComputedExpression(
              'index_as_series',
              lambda ix: ix.index.to_series(),  # yapf break
              [arg._frame._expr],
              preserves_partition_by=partitionings.Singleton(),
              requires_partition_by=partitionings.Arbitrary()))
    elif isinstance(arg, pd.core.generic.NDFrame):
      # A concrete pandas object is lifted into a constant expression.
      # ``arg[0:0]`` is an empty slice of it, presumably used as the proxy.
      # TODO(review): confirm.
      deferred_arg_indices.append(ix)
      deferred_arg_exprs.append(expressions.ConstantExpression(arg, arg[0:0]))
    else:
      constant_args[ix] = arg

  # Keyword arguments get the same deferred/constant split, keyed by name.
  deferred_kwarg_keys = []
  deferred_kwarg_exprs = []
  constant_kwargs = {key: None for key in kwargs}
  for key, arg in kwargs.items():
    if isinstance(arg, DeferredBase):
      deferred_kwarg_keys.append(key)
      deferred_kwarg_exprs.append(arg._expr)
    elif isinstance(arg, pd.core.generic.NDFrame):
      deferred_kwarg_keys.append(key)
      deferred_kwarg_exprs.append(
          expressions.ConstantExpression(arg, arg[0:0]))
    else:
      constant_kwargs[key] = arg

  # Positional inputs come first, then keyword inputs; ``apply`` relies on
  # this order to split its flat argument tuple.
  deferred_exprs = deferred_arg_exprs + deferred_kwarg_exprs

  if inplace:
    actual_func = _copy_and_mutate(func)
  else:
    actual_func = func

  def apply(*actual_args):
    """Rebuild the full call from evaluated deferred inputs and call func."""
    actual_args, actual_kwargs = (actual_args[:len(deferred_arg_exprs)],
                                  actual_args[len(deferred_arg_exprs):])
    full_args = list(constant_args)
    for ix, arg in zip(deferred_arg_indices, actual_args):
      full_args[ix] = arg
    full_kwargs = dict(constant_kwargs)
    for key, arg in zip(deferred_kwarg_keys, actual_kwargs):
      full_kwargs[key] = arg
    return actual_func(*full_args, **full_kwargs)

  if (requires_partition_by.is_subpartitioning_of(partitionings.Index()) and
      sum(isinstance(arg.proxy(), pd.core.generic.NDFrame)
          for arg in deferred_exprs) > 1):
    # Implicit join on index if there is more than one indexed input.
    actual_requires_partition_by = partitionings.Index()
  else:
    actual_requires_partition_by = requires_partition_by

  result_expr = expressions.ComputedExpression(
      name,
      apply,
      deferred_exprs,
      requires_partition_by=actual_requires_partition_by,
      preserves_partition_by=preserves_partition_by)
  if inplace:
    # Mutating variant: swap the receiver's expression and return None.
    args[0]._expr = result_expr
  else:
    return DeferredFrame.wrap(result_expr)