in sdks/python/apache_beam/runners/dataflow/dataflow_runner.py [0:0]
def run_Read(self, transform_node, options):
    """Translate a Read transform into a Dataflow read step.

    Adds a ``TransformNames.READ`` step for ``transform_node`` and populates
    it, in order, with the source-specific properties, the format, the cloud
    encoding and the output info.

    Args:
      transform_node: Applied transform whose ``transform.source`` describes
        what to read. A source without a ``format`` attribute is treated as
        a custom source and is serialized with ``pickler.dumps``; otherwise
        ``format`` must be ``'text'``, ``'bigquery'`` or ``'pubsub'``.
      options: Pipeline options; viewed as ``StandardOptions`` and, for
        BigQuery sources, as ``DebugOptions``.

    Raises:
      ValueError: If a BigQuery source is used in a streaming pipeline or
        together with the ``beam_fn_api`` experiment, or specifies neither a
        table nor a query; if a Pub/Sub source is used in a non-streaming
        pipeline; or if the source has an unrecognized format.
    """
    read_transform = transform_node.transform
    step = self._add_step(
        TransformNames.READ, transform_node.full_label, transform_node)
    add = step.add_property
    # TODO(mairbek): refactor if-else tree to use registerable functions.
    std_opts = options.view_as(StandardOptions)
    source = read_transform.source
    # A source that exposes no format is assumed to be a custom source.
    is_custom = not hasattr(source, 'format')

    # Source-specific properties.
    if is_custom:
        payload = {
            'spec': {
                '@type': names.SOURCE_TYPE,
                names.SERIALIZED_SOURCE_KEY: pickler.dumps(source),
            },
        }
        # Size estimation is best effort: any failure is logged and the
        # metadata entry is simply left out.
        try:
            size_descriptor = json_value.get_typed_value_descriptor(
                source.estimate_size())
            payload['metadata'] = {'estimated_size_bytes': size_descriptor}
        except error.RuntimeValueProviderError:
            # This error is raised by the value provider.
            _LOGGER.info(
                'Could not estimate size of source %r due to '
                'RuntimeValueProviderError',
                source)
        except Exception:  # pylint: disable=broad-except
            _LOGGER.info(
                'Could not estimate size of source %r due to an exception: %s',
                source,
                traceback.format_exc())
        add(PropertyNames.SOURCE_STEP_INPUT, payload)
    elif source.format == 'text':
        add(PropertyNames.FILE_PATTERN, source.path)
    elif source.format == 'bigquery':
        if std_opts.streaming:
            raise ValueError(
                'BigQuery source is not currently available for use '
                'in streaming pipelines.')
        experiments = options.view_as(DebugOptions).experiments
        if experiments and 'beam_fn_api' in experiments:
            raise ValueError(BQ_SOURCE_UW_ERROR)
        add(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO')
        # TODO(silviuc): Add table validation if transform.source.validate.
        table_ref = source.table_reference
        if table_ref is not None:
            add(PropertyNames.BIGQUERY_DATASET, table_ref.datasetId)
            add(PropertyNames.BIGQUERY_TABLE, table_ref.tableId)
            # If the project owning the table was not specified then the
            # project owning the workflow (current project) will be used.
            if table_ref.projectId is not None:
                add(PropertyNames.BIGQUERY_PROJECT, table_ref.projectId)
        elif source.query is not None:
            add(PropertyNames.BIGQUERY_QUERY, source.query)
            add(PropertyNames.BIGQUERY_USE_LEGACY_SQL, source.use_legacy_sql)
            add(PropertyNames.BIGQUERY_FLATTEN_RESULTS, source.flatten_results)
        else:
            raise ValueError(
                'BigQuery source %r must specify either a table or'
                ' a query' % source)
        kms_key = source.kms_key
        if kms_key is not None:
            add(PropertyNames.BIGQUERY_KMS_KEY, kms_key)
    elif source.format == 'pubsub':
        if not std_opts.streaming:
            raise ValueError(
                'Cloud Pub/Sub is currently available for use '
                'only in streaming pipelines.')
        # Only one of topic or subscription should be set; the subscription
        # takes precedence when both are truthy.
        if source.full_subscription:
            add(PropertyNames.PUBSUB_SUBSCRIPTION, source.full_subscription)
        elif source.full_topic:
            add(PropertyNames.PUBSUB_TOPIC, source.full_topic)
        if source.id_label:
            add(PropertyNames.PUBSUB_ID_LABEL, source.id_label)
        if source.with_attributes:
            # Setting this property signals Dataflow runner to return full
            # PubsubMessages instead of just the data part of the payload.
            add(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
        if source.timestamp_attribute is not None:
            add(
                PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
                source.timestamp_attribute)
    else:
        raise ValueError(
            'Source %r has unexpected format %s.' % (source, source.format))

    # Custom sources report the generic source format; every other source
    # reports its own.
    add(
        PropertyNames.FORMAT,
        names.SOURCE_FORMAT if is_custom else source.format)

    # Wrap coder in WindowedValueCoder: this is necessary as the encoding of
    # a step should be the type of value outputted by each step. Read steps
    # automatically wrap output values in a WindowedValue wrapper, if
    # necessary. This is also necessary for proper encoding for size
    # estimation. GlobalWindowCoder is used as a placeholder instead of the
    # default PickleCoder because it is a known coder.
    # TODO(robertwb): Query the collection for the windowfn to extract the
    # correct coder.
    element_coder = coders.registry.get_coder(
        transform_node.outputs[None].element_type)
    windowed_coder = coders.WindowedValueCoder(
        element_coder, coders.coders.GlobalWindowCoder())
    step.encoding = self._get_cloud_encoding(windowed_coder)

    output_user_name = '%s.%s' % (
        transform_node.full_label, PropertyNames.OUT)
    add(
        PropertyNames.OUTPUT_INFO,
        [{
            PropertyNames.USER_NAME: output_user_name,
            PropertyNames.ENCODING: step.encoding,
            PropertyNames.OUTPUT_NAME: PropertyNames.OUT,
        }])