def run_Read()

in sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
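
Translates a Read transform node into a Dataflow job step. Source-specific
properties are chosen from the source's format: a source without a format
attribute is treated as a custom source and serialized into the step, while
'text', 'bigquery', and 'pubsub' sources map to their respective step
properties. The step's output encoding is then derived from the output
PCollection's element type, wrapped as a windowed value.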


  def run_Read(self, transform_node, options):
    transform = transform_node.transform
    step = self._add_step(
        TransformNames.READ, transform_node.full_label, transform_node)
    # TODO(mairbek): refactor if-else tree to use registerable functions.
    # Initialize the source specific properties.

    standard_options = options.view_as(StandardOptions)
    if not hasattr(transform.source, 'format'):
      # If a format is not set, we assume the source to be a custom source.
      source_dict = {}

      source_dict['spec'] = {
          '@type': names.SOURCE_TYPE,
          names.SERIALIZED_SOURCE_KEY: pickler.dumps(transform.source)
      }

      try:
        source_dict['metadata'] = {
            'estimated_size_bytes': json_value.get_typed_value_descriptor(
                transform.source.estimate_size())
        }
      except error.RuntimeValueProviderError:
        # Size estimation is best effort; this error means the source depends
        # on a runtime value provider, so its size is not known until runtime.
        _LOGGER.info(
            'Could not estimate size of source %r due to '
            'RuntimeValueProviderError', transform.source)
      except Exception:  # pylint: disable=broad-except
        # Size estimation is best effort, so we log the error and continue.
        _LOGGER.info(
            'Could not estimate size of source %r due to an exception: %s',
            transform.source,
            traceback.format_exc())

      step.add_property(PropertyNames.SOURCE_STEP_INPUT, source_dict)
    elif transform.source.format == 'text':
      step.add_property(PropertyNames.FILE_PATTERN, transform.source.path)
    elif transform.source.format == 'bigquery':
      if standard_options.streaming:
        raise ValueError(
            'BigQuery source is not currently available for use '
            'in streaming pipelines.')
      debug_options = options.view_as(DebugOptions)
      use_fn_api = (
          debug_options.experiments and
          'beam_fn_api' in debug_options.experiments)
      if use_fn_api:
        raise ValueError(BQ_SOURCE_UW_ERROR)
      step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO')
      # TODO(silviuc): Add table validation if transform.source.validate.
      if transform.source.table_reference is not None:
        step.add_property(
            PropertyNames.BIGQUERY_DATASET,
            transform.source.table_reference.datasetId)
        step.add_property(
            PropertyNames.BIGQUERY_TABLE,
            transform.source.table_reference.tableId)
        # If the project owning the table was not specified, the project that
        # owns the workflow (the current project) will be used.
        if transform.source.table_reference.projectId is not None:
          step.add_property(
              PropertyNames.BIGQUERY_PROJECT,
              transform.source.table_reference.projectId)
      elif transform.source.query is not None:
        step.add_property(PropertyNames.BIGQUERY_QUERY, transform.source.query)
        step.add_property(
            PropertyNames.BIGQUERY_USE_LEGACY_SQL,
            transform.source.use_legacy_sql)
        step.add_property(
            PropertyNames.BIGQUERY_FLATTEN_RESULTS,
            transform.source.flatten_results)
      else:
        raise ValueError(
            'BigQuery source %r must specify either a table or'
            ' a query' % transform.source)
      if transform.source.kms_key is not None:
        step.add_property(
            PropertyNames.BIGQUERY_KMS_KEY, transform.source.kms_key)
    elif transform.source.format == 'pubsub':
      if not standard_options.streaming:
        raise ValueError(
            'Cloud Pub/Sub is currently available for use '
            'only in streaming pipelines.')
      # Only one of topic or subscription should be set.
      if transform.source.full_subscription:
        step.add_property(
            PropertyNames.PUBSUB_SUBSCRIPTION,
            transform.source.full_subscription)
      elif transform.source.full_topic:
        step.add_property(
            PropertyNames.PUBSUB_TOPIC, transform.source.full_topic)
      if transform.source.id_label:
        step.add_property(
            PropertyNames.PUBSUB_ID_LABEL, transform.source.id_label)
      if transform.source.with_attributes:
        # Setting this property signals the Dataflow runner to return full
        # PubsubMessages instead of just the data part of the payload.
        step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')

      if transform.source.timestamp_attribute is not None:
        step.add_property(
            PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
            transform.source.timestamp_attribute)
    else:
      raise ValueError(
          'Source %r has unexpected format %s.' %
          (transform.source, transform.source.format))

    if not hasattr(transform.source, 'format'):
      step.add_property(PropertyNames.FORMAT, names.SOURCE_FORMAT)
    else:
      step.add_property(PropertyNames.FORMAT, transform.source.format)

    # Wrap the coder in a WindowedValueCoder: this is necessary because the
    # encoding of a step should describe the type of value output by the step,
    # and Read steps automatically wrap output values in a WindowedValue
    # wrapper, if necessary. This is also necessary for proper encoding during
    # size estimation. GlobalWindowCoder is used as a placeholder instead of
    # the default PickleCoder because GlobalWindowCoder is a known coder.
    # TODO(robertwb): Query the collection for the windowfn to extract the
    # correct coder.
    coder = coders.WindowedValueCoder(
        coders.registry.get_coder(transform_node.outputs[None].element_type),
        coders.coders.GlobalWindowCoder())

    step.encoding = self._get_cloud_encoding(coder)
    step.add_property(
        PropertyNames.OUTPUT_INFO,
        [{
            PropertyNames.USER_NAME: (
                '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
            PropertyNames.ENCODING: step.encoding,
            PropertyNames.OUTPUT_NAME: PropertyNames.OUT
        }])
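
A minimal, standalone sketch of the coder wrapping performed at the end of the
method, assuming apache_beam is installed. The int element type is purely
illustrative; the runner resolves the element coder from the output
PCollection's element type instead:

  from apache_beam import coders

  # Resolve an element coder from the registry, then wrap it the same way
  # run_Read does: a WindowedValueCoder with GlobalWindowCoder standing in
  # for the real window coder.
  element_coder = coders.registry.get_coder(int)
  windowed_coder = coders.WindowedValueCoder(
      element_coder, coders.coders.GlobalWindowCoder())
  print(windowed_coder)

The TODO at the top of the method suggests replacing the if/else tree with
registerable functions. A hypothetical sketch of that shape, using plain dicts
in place of the runner's step machinery (all names below are illustrative, not
part of the runner):

  # Hypothetical registry-based dispatch; not part of the actual runner.
  _READ_TRANSLATORS = {}

  def register_read_translator(fmt):
    def decorator(fn):
      _READ_TRANSLATORS[fmt] = fn
      return fn
    return decorator

  @register_read_translator('text')
  def _translate_text(step_properties, source):
    # Mirrors the 'text' branch above: only the file pattern is recorded.
    step_properties['file_pattern'] = source.path

  def translate_read(step_properties, source):
    fmt = getattr(source, 'format', None)
    if fmt not in _READ_TRANSLATORS:
      raise ValueError(
          'Source %r has unexpected format %s.' % (source, fmt))
    _READ_TRANSLATORS[fmt](step_properties, source)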