def run_ParDo()

in sdks/python/apache_beam/runners/dataflow/dataflow_runner.py [0:0]


  def run_ParDo(self, transform_node, options):
    transform = transform_node.transform
    input_tag = transform_node.inputs[0].tag
    input_step = self._cache.get_pvalue(transform_node.inputs[0])
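    # The step that produces the main input was emitted earlier and cached;
    # its name and output are wired into this step as PARALLEL_INPUT below.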

    # Attach side inputs.
    si_dict = {}
    si_labels = {}
    full_label_counts = defaultdict(int)
    lookup_label = lambda side_pval: si_labels[side_pval]
    named_inputs = transform_node.named_inputs()
    label_renames = {}
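    # The loop below gives each side input a label that is unique across the
    # pipeline (its index plus the ParDo's full label). label_renames records
    # the mapping from the generic per-transform label to that unique label so
    # the portable protos can be patched to match further down.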
    for ix, side_pval in enumerate(transform_node.side_inputs):
      assert isinstance(side_pval, AsSideInput)
      step_name = 'SideInput-' + self._get_unique_step_name()
      si_label = ((SIDE_INPUT_PREFIX + '%d-%s') %
                  (ix, transform_node.full_label))
      old_label = (SIDE_INPUT_PREFIX + '%d') % ix

      label_renames[old_label] = si_label

      assert old_label in named_inputs
      pcollection_label = '%s.%s' % (
          side_pval.pvalue.producer.full_label.split('/')[-1],
          side_pval.pvalue.tag if side_pval.pvalue.tag else 'out')
      si_full_label = '%s/%s(%s.%s)' % (
          transform_node.full_label,
          side_pval.__class__.__name__,
          pcollection_label,
          full_label_counts[pcollection_label])

      # Count the number of times the same PCollection is a side input
      # to the same ParDo.
      full_label_counts[pcollection_label] += 1

      self._add_singleton_step(
          step_name,
          si_full_label,
          side_pval.pvalue.tag,
          self._cache.get_pvalue(side_pval.pvalue),
          side_pval.pvalue.windowing,
          side_pval._side_input_data().access_pattern)
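      # The call above materializes this side input as its own 'SideInput-*'
      # step with the requested access pattern. Record an OutputReference to
      # that step's single output; the full mapping is attached to the ParDo
      # step as NON_PARALLEL_INPUTS below.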
      si_dict[si_label] = {
          '@type': 'OutputReference',
          PropertyNames.STEP_NAME: step_name,
          PropertyNames.OUTPUT_NAME: PropertyNames.OUT
      }
      si_labels[side_pval] = si_label

    # Now create the step for the ParDo transform being handled.
    transform_name = transform_node.full_label.rsplit('/', 1)[-1]
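    # If the ParDo has side inputs, repeat the last label segment in the step
    # name so the leaf DO step's name stays distinct from the composite label
    # that the SideInput-* steps above are nested under.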
    step = self._add_step(
        TransformNames.DO,
        transform_node.full_label +
        ('/{}'.format(transform_name) if transform_node.side_inputs else ''),
        transform_node,
        transform_node.transform.output_tags)
    # Import here to avoid adding the dependency for local running scenarios.
    # pylint: disable=wrong-import-order, wrong-import-position
    from apache_beam.runners.dataflow.internal import apiclient
    transform_proto = self.proto_context.transforms.get_proto(transform_node)
    transform_id = self.proto_context.transforms.get_id(transform_node)
    use_fnapi = apiclient._use_fnapi(options)
    use_unified_worker = apiclient._use_unified_worker(options)
    # Patch side input ids to be unique across a given pipeline.
    if (label_renames and
        transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
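      # The runner API protos key side inputs by the generic per-transform
      # label; rewrite those keys, in both the PTransform's inputs map and the
      # ParDoPayload's side_inputs map, to the pipeline-unique labels chosen
      # above.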
      # Patch PTransform proto.
      for old, new in label_renames.items():
        transform_proto.inputs[new] = transform_proto.inputs[old]
        del transform_proto.inputs[old]

      # Patch ParDo proto.
      proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
      proto = proto_utils.parse_Bytes(transform_proto.spec.payload, proto_type)
      for old, new in label_renames.items():
        proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
        del proto.side_inputs[old]
      transform_proto.spec.payload = proto.SerializeToString()
      # We need to update the pipeline proto.
      del self.proto_pipeline.components.transforms[transform_id]
      (
          self.proto_pipeline.components.transforms[transform_id].CopyFrom(
              transform_proto))
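      # Replace the stored transform wholesale so the pipeline proto stays
      # consistent with the patched transform_proto.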
    # The data transmitted in SERIALIZED_FN is different depending on whether
    # this is a fnapi pipeline or not.
    if (use_fnapi and
        (transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or
         use_unified_worker)):
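      # The worker can look the DoFn up in the pipeline proto, so a reference
      # (the transform id) is sufficient.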
      serialized_data = transform_id
    else:
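      # Otherwise embed the pickled DoFn data (the fn, its args/kwargs,
      # side-input tags/types and the input windowing) directly in the step.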
      serialized_data = pickler.dumps(
          self._pardo_fn_data(transform_node, lookup_label))
    step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
    # TODO(BEAM-8882): Enable once dataflow service doesn't reject this.
    # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id)
    step.add_property(
        PropertyNames.PARALLEL_INPUT,
        {
            '@type': 'OutputReference',
            PropertyNames.STEP_NAME: input_step.proto.name,
            PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)
        })
    # Add side inputs if any.
    step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict)

    # Generate descriptions for the outputs. The output name is 'None' for the
    # main output and '<tag>' for a tagged output.
    outputs = []

    all_output_tags = list(transform_proto.outputs.keys())

    # Some external transforms require output tags to not be modified, so we
    # arbitrarily select one of the output tags as the main output and leave
    # the others as tagged side outputs. Transform execution should not change
    # depending on which output tag is chosen as the main output here.
    # Also, some SDKs do not work correctly if output tags are modified, so
    # for external transforms we leave the tags unmodified.
    #
    # Python SDK uses 'None' as the tag of the main output.
    main_output_tag = 'None'

    step.encoding = self._get_encoded_output_coder(
        transform_node, output_tag=main_output_tag)

    side_output_tags = set(all_output_tags).difference({main_output_tag})

    # Add the main output to the description.
    outputs.append({
        PropertyNames.USER_NAME: (
            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
        PropertyNames.ENCODING: step.encoding,
        PropertyNames.OUTPUT_NAME: main_output_tag
    })
    for side_tag in side_output_tags:
      # The assumption here is that all outputs will have the same typehint
      # and coder as the main output. This is certainly the case right now
      # but conceivably it could change in the future.
      encoding = self._get_encoded_output_coder(
          transform_node, output_tag=side_tag)
      outputs.append({
          PropertyNames.USER_NAME: (
              '%s.%s' % (transform_node.full_label, side_tag)),
          PropertyNames.ENCODING: encoding,
          PropertyNames.OUTPUT_NAME: side_tag
      })

    step.add_property(PropertyNames.OUTPUT_INFO, outputs)

    # Add the restriction encoding if this is a splittable DoFn
    # and we are using the Fn API on the unified worker.
    restriction_coder = transform.get_restriction_coder()
    if restriction_coder:
      step.add_property(
          PropertyNames.RESTRICTION_ENCODING,
          self._get_cloud_encoding(restriction_coder))

    if options.view_as(StandardOptions).streaming:
      is_stateful_dofn = (DoFnSignature(transform.dofn).is_stateful_dofn())
      if is_stateful_dofn:
        step.add_property(PropertyNames.USES_KEYED_STATE, 'true')

        # Also check whether the step allows shardable keyed state.
        # TODO(BEAM-11360): remove this when migrated to portable job
        #  submission since we only consider supporting the property in runner
        #  v2.
        for pcoll in transform_node.outputs.values():
          if pcoll._unique_name() in self.get_pcoll_with_auto_sharding():
            step.add_property(PropertyNames.ALLOWS_SHARDABLE_STATE, 'true')
            # Currently we only allow auto-sharding to be enabled through the
            # GroupIntoBatches transform. So we also add the following property
            # which GroupIntoBatchesDoFn has, to allow the backend to perform
            # graph optimization.
            step.add_property(PropertyNames.PRESERVES_KEYS, 'true')
            break
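
For orientation, here is a minimal sketch of the side-input and output structures this method attaches to the DO step, assuming a hypothetical ParDo labeled 'MyParDo' with one side input and one tagged output 'extra'. The names 'MyParDo', 'SideInput-s4' and 'extra' are made up, the 'side0-...' key assumes SIDE_INPUT_PREFIX resolves to 'side', and the encoding dict is a placeholder rather than a real cloud encoding; PropertyNames is assumed importable from apache_beam.runners.dataflow.internal.names, as dataflow_runner.py does.

from apache_beam.runners.dataflow.internal.names import PropertyNames

# Placeholder cloud encoding; run_ParDo() derives the real one from the
# output PCollection's coder via _get_encoded_output_coder().
encoding = {'@type': 'kind:windowed_value'}

# NON_PARALLEL_INPUTS: one OutputReference per side input, keyed by the
# pipeline-unique side-input label.
si_dict = {
    'side0-MyParDo': {
        '@type': 'OutputReference',
        PropertyNames.STEP_NAME: 'SideInput-s4',
        PropertyNames.OUTPUT_NAME: PropertyNames.OUT,
    },
}

# OUTPUT_INFO: the main output (tag 'None') plus one entry per tagged output.
output_info = [
    {
        PropertyNames.USER_NAME: 'MyParDo.' + PropertyNames.OUT,
        PropertyNames.ENCODING: encoding,
        PropertyNames.OUTPUT_NAME: 'None',
    },
    {
        PropertyNames.USER_NAME: 'MyParDo.extra',
        PropertyNames.ENCODING: encoding,
        PropertyNames.OUTPUT_NAME: 'extra',
    },
]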