in sdks/python/apache_beam/runners/dataflow/dataflow_runner.py [0:0]
def run_ParDo(self, transform_node, options):
  """Translates a ParDo application into Dataflow steps.

  One singleton step is added per side input, followed by the DO step for
  the ParDo itself. The DO step then receives its serialized fn, its main
  (parallel) input, its side (non-parallel) inputs, the description of its
  outputs, an optional restriction encoding and, for stateful DoFns in
  streaming mode, keyed-state related properties.

  As a side effect, side-input ids in the transform's runner API proto are
  renamed to include the transform's full label (so that they are unique
  across the pipeline) and the patched proto is written back into
  ``self.proto_pipeline``.

  Args:
    transform_node: The applied ParDo being translated (presumably an
      ``AppliedPTransform``; confirm against callers).
    options: Pipeline options. Used to decide whether the Fn API / unified
      worker is in use and whether the pipeline is streaming.
  """
  transform = transform_node.transform
  input_tag = transform_node.inputs[0].tag
  input_step = self._cache.get_pvalue(transform_node.inputs[0])

  # Attach side inputs.
  si_dict = {}
  si_labels = {}
  # Number of times each PCollection has been seen so far as a side input of
  # this ParDo; keeps the side-input step labels distinct.
  full_label_counts = defaultdict(int)

  def lookup_label(side_pval):
    """Returns the unique label assigned to ``side_pval`` in the loop below."""
    return si_labels[side_pval]

  named_inputs = transform_node.named_inputs()
  # Maps each original side-input id (SIDE_INPUT_PREFIX + index) to its
  # replacement that also embeds the transform's full label.
  label_renames = {}
  for ix, side_pval in enumerate(transform_node.side_inputs):
    assert isinstance(side_pval, AsSideInput)
    step_name = 'SideInput-' + self._get_unique_step_name()
    si_label = ((SIDE_INPUT_PREFIX + '%d-%s') %
                (ix, transform_node.full_label))
    old_label = (SIDE_INPUT_PREFIX + '%d') % ix
    label_renames[old_label] = si_label

    assert old_label in named_inputs
    pcollection_label = '%s.%s' % (
        side_pval.pvalue.producer.full_label.split('/')[-1],
        side_pval.pvalue.tag if side_pval.pvalue.tag else 'out')
    si_full_label = '%s/%s(%s.%s)' % (
        transform_node.full_label,
        side_pval.__class__.__name__,
        pcollection_label,
        full_label_counts[pcollection_label])

    # Count the number of times the same PCollection is a side input
    # to the same ParDo.
    full_label_counts[pcollection_label] += 1

    self._add_singleton_step(
        step_name,
        si_full_label,
        side_pval.pvalue.tag,
        self._cache.get_pvalue(side_pval.pvalue),
        side_pval.pvalue.windowing,
        side_pval._side_input_data().access_pattern)
    si_dict[si_label] = {
        '@type': 'OutputReference',
        PropertyNames.STEP_NAME: step_name,
        PropertyNames.OUTPUT_NAME: PropertyNames.OUT
    }
    si_labels[side_pval] = si_label

  # Now create the step for the ParDo transform being handled. When there are
  # side inputs, the last component of the label is appended again
  # (presumably so the DO step nests under the same prefix as the side-input
  # steps created above — confirm).
  transform_name = transform_node.full_label.rsplit('/', 1)[-1]
  step = self._add_step(
      TransformNames.DO,
      transform_node.full_label +
      ('/{}'.format(transform_name) if transform_node.side_inputs else ''),
      transform_node,
      transform.output_tags)

  # Import here to avoid adding the dependency for local running scenarios.
  # pylint: disable=wrong-import-order, wrong-import-position
  from apache_beam.runners.dataflow.internal import apiclient
  transform_proto = self.proto_context.transforms.get_proto(transform_node)
  transform_id = self.proto_context.transforms.get_id(transform_node)
  use_fnapi = apiclient._use_fnapi(options)
  use_unified_worker = apiclient._use_unified_worker(options)

  # Patch side input ids to be unique across a given pipeline.
  if (label_renames and
      transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
    # Patch PTransform proto.
    for old, new in label_renames.items():
      transform_proto.inputs[new] = transform_proto.inputs[old]
      del transform_proto.inputs[old]

    # Patch ParDo proto.
    proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
    proto = proto_utils.parse_Bytes(transform_proto.spec.payload, proto_type)
    for old, new in label_renames.items():
      proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
      del proto.side_inputs[old]
    transform_proto.spec.payload = proto.SerializeToString()

    # We need to update the pipeline proto: drop this transform's existing
    # entry and re-create it from the patched copy.
    pipeline_transforms = self.proto_pipeline.components.transforms
    del pipeline_transforms[transform_id]
    pipeline_transforms[transform_id].CopyFrom(transform_proto)

  # The data transmitted in SERIALIZED_FN is different depending on whether
  # this is a fnapi pipeline or not: on the Fn API path only the transform id
  # is sent, otherwise the pickled fn data is.
  if (use_fnapi and
      (transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or
       use_unified_worker)):
    serialized_data = transform_id
  else:
    serialized_data = pickler.dumps(
        self._pardo_fn_data(transform_node, lookup_label))
  step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
  # TODO(BEAM-8882): Enable once dataflow service doesn't reject this.
  # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id)
  step.add_property(
      PropertyNames.PARALLEL_INPUT,
      {
          '@type': 'OutputReference',
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
      })
  # Add side inputs if any.
  step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict)

  # Generate description for the outputs. The output names
  # will be 'None' for main output and '<tag>' for a tagged output.
  #
  # The Python SDK uses 'None' as the tag of the main output. Every other tag
  # found in the transform proto's outputs is described as a side output
  # under its original, unmodified tag (some SDKs / external transforms do
  # not work correctly if output tags are modified). Side tags are sorted so
  # the generated step does not depend on per-process string hash order.
  main_output_tag = 'None'
  side_output_tags = sorted(
      tag for tag in transform_proto.outputs if tag != main_output_tag)
  step.encoding = self._get_encoded_output_coder(
      transform_node, output_tag=main_output_tag)

  # The main output is always described first.
  outputs = [{
      PropertyNames.USER_NAME: (
          '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
      PropertyNames.ENCODING: step.encoding,
      PropertyNames.OUTPUT_NAME: main_output_tag
  }]
  for side_tag in side_output_tags:
    # Each tagged output is described with the coder computed for its own tag.
    encoding = self._get_encoded_output_coder(
        transform_node, output_tag=side_tag)
    outputs.append({
        PropertyNames.USER_NAME: (
            '%s.%s' % (transform_node.full_label, side_tag)),
        PropertyNames.ENCODING: encoding,
        PropertyNames.OUTPUT_NAME: side_tag
    })
  step.add_property(PropertyNames.OUTPUT_INFO, outputs)

  # Add the restriction encoding whenever the transform reports a restriction
  # coder (presumably only splittable DoFns do — confirm).
  restriction_coder = transform.get_restriction_coder()
  if restriction_coder:
    step.add_property(
        PropertyNames.RESTRICTION_ENCODING,
        self._get_cloud_encoding(restriction_coder))

  if options.view_as(StandardOptions).streaming:
    is_stateful_dofn = DoFnSignature(transform.dofn).is_stateful_dofn()
    if is_stateful_dofn:
      step.add_property(PropertyNames.USES_KEYED_STATE, 'true')

      # Also checks whether the step allows shardable keyed states.
      # TODO(BEAM-11360): remove this when migrated to portable job
      # submission since we only consider supporting the property in runner
      # v2.
      auto_sharded_pcolls = self.get_pcoll_with_auto_sharding()
      for pcoll in transform_node.outputs.values():
        if pcoll._unique_name() in auto_sharded_pcolls:
          step.add_property(PropertyNames.ALLOWS_SHARDABLE_STATE, 'true')
          # Currently we only allow auto-sharding to be enabled through the
          # GroupIntoBatches transform. So we also add the following property
          # which GroupIntoBatchesDoFn has, to allow the backend to perform
          # graph optimization.
          step.add_property(PropertyNames.PRESERVES_KEYS, 'true')
          break