in sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
def __init__(
self,
packages,
options,
environment_version,
proto_pipeline_staged_url,
proto_pipeline=None,
_sdk_image_overrides=None):
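# Build the Dataflow Environment proto for this job from the staged
# packages and the supplied pipeline options.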
self.standard_options = options.view_as(StandardOptions)
self.google_cloud_options = options.view_as(GoogleCloudOptions)
self.worker_options = options.view_as(WorkerOptions)
self.debug_options = options.view_as(DebugOptions)
self.pipeline_url = proto_pipeline_staged_url
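# Point the Environment proto at the Compute, BigQuery, and Storage API
# services and record the worker region/zone when provided.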
self.proto = dataflow.Environment()
self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
self.proto.dataset = '{}/cloud_dataflow'.format(
GoogleCloudOptions.BIGQUERY_API_SERVICE)
self.proto.tempStoragePrefix = (
self.google_cloud_options.temp_location.replace(
'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE))
if self.worker_options.worker_region:
self.proto.workerRegion = self.worker_options.worker_region
if self.worker_options.worker_zone:
self.proto.workerZone = self.worker_options.worker_zone
# User agent information.
self.proto.userAgent = dataflow.Environment.UserAgentValue()
self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
self._proto_pipeline = proto_pipeline
self._sdk_image_overrides = _sdk_image_overrides or dict()
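# Optional service account and Cloud KMS key settings.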
if self.google_cloud_options.service_account_email:
self.proto.serviceAccountEmail = (
self.google_cloud_options.service_account_email)
if self.google_cloud_options.dataflow_kms_key:
self.proto.serviceKmsKeyName = self.google_cloud_options.dataflow_kms_key
self.proto.userAgent.additionalProperties.extend([
dataflow.Environment.UserAgentValue.AdditionalProperty(
key='name', value=to_json_value(self._get_python_sdk_name())),
dataflow.Environment.UserAgentValue.AdditionalProperty(
key='version', value=to_json_value(beam_version.__version__))
])
# Version information.
self.proto.version = dataflow.Environment.VersionValue()
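# Check that the running Python interpreter version is supported.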
_verify_interpreter_version_is_supported(options)
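# Select the job type: streaming jobs always run on the FnAPI; batch jobs
# use the FnAPI only when _use_fnapi(options) is true.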
if self.standard_options.streaming:
job_type = 'FNAPI_STREAMING'
else:
if _use_fnapi(options):
job_type = 'FNAPI_BATCH'
else:
job_type = 'PYTHON_BATCH'
self.proto.version.additionalProperties.extend([
dataflow.Environment.VersionValue.AdditionalProperty(
key='job_type', value=to_json_value(job_type)),
dataflow.Environment.VersionValue.AdditionalProperty(
key='major', value=to_json_value(environment_version))
])
# TODO: Use enumerated type instead of strings for job types.
if job_type.startswith('FNAPI_'):
self.debug_options.experiments = self.debug_options.experiments or []
if self.debug_options.lookup_experiment(
'runner_harness_container_image') or _use_unified_worker(options):
# The default image is not used if the user provides a runner harness
# image. For unified worker, the default runner harness image is selected
# by the service.
pass
else:
runner_harness_override = get_runner_harness_container_image()
if runner_harness_override:
self.debug_options.add_experiment(
'runner_harness_container_image=' + runner_harness_override)
debug_options_experiments = self.debug_options.experiments
# Add the use_multiple_sdk_containers flag if it is not already present.
# Do not add the flag if 'no_use_multiple_sdk_containers' is present.
# TODO: Clean up use_multiple_sdk_containers once Python SDK versions up
# to 2.4 are deprecated.
if ('use_multiple_sdk_containers' not in debug_options_experiments and
'no_use_multiple_sdk_containers' not in debug_options_experiments):
debug_options_experiments.append('use_multiple_sdk_containers')
# FlexRS
if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
self.proto.flexResourceSchedulingGoal = (
dataflow.Environment.FlexResourceSchedulingGoalValueValuesEnum.
FLEXRS_COST_OPTIMIZED)
elif self.google_cloud_options.flexrs_goal == 'SPEED_OPTIMIZED':
self.proto.flexResourceSchedulingGoal = (
dataflow.Environment.FlexResourceSchedulingGoalValueValuesEnum.
FLEXRS_SPEED_OPTIMIZED)
# Experiments
if self.debug_options.experiments:
for experiment in self.debug_options.experiments:
self.proto.experiments.append(experiment)
# Worker pool(s) information.
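# Describe each staged package by name and by its location under the
# staging directory.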
package_descriptors = []
for package in packages:
package_descriptors.append(
dataflow.Package(
location='%s/%s' % (
self.google_cloud_options.staging_location.replace(
'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
package),
name=package))
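# Create the single worker pool; a 'local' pool is used when the Dataflow
# endpoint points at localhost.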
pool = dataflow.WorkerPool(
kind='local' if self.local else 'harness',
packages=package_descriptors,
taskrunnerSettings=dataflow.TaskRunnerSettings(
parallelWorkerSettings=dataflow.WorkerSettings(
baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
servicePath=self.google_cloud_options.dataflow_endpoint)))
pool.autoscalingSettings = dataflow.AutoscalingSettings()
# Set worker pool options received through command line.
if self.worker_options.num_workers:
pool.numWorkers = self.worker_options.num_workers
if self.worker_options.max_num_workers:
pool.autoscalingSettings.maxNumWorkers = (
self.worker_options.max_num_workers)
if self.worker_options.autoscaling_algorithm:
values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum
pool.autoscalingSettings.algorithm = {
'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE,
'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC,
}.get(self.worker_options.autoscaling_algorithm)
if self.worker_options.machine_type:
pool.machineType = self.worker_options.machine_type
if self.worker_options.disk_size_gb:
pool.diskSizeGb = self.worker_options.disk_size_gb
if self.worker_options.disk_type:
pool.diskType = self.worker_options.disk_type
if self.worker_options.zone:
pool.zone = self.worker_options.zone
if self.worker_options.network:
pool.network = self.worker_options.network
if self.worker_options.subnetwork:
pool.subnetwork = self.worker_options.subnetwork
pool.workerHarnessContainerImage = (
get_container_image_from_options(options))
# Set the worker pool sdk_harness_container_images option for supported
# Dataflow workers.
environments_to_use = self._get_environments_from_tranforms()
if _use_unified_worker(options):
python_sdk_container_image = get_container_image_from_options(options)
# Add container images for other SDKs that may be needed for
# cross-language pipelines.
for env_id, environment in environments_to_use:
if environment.urn != common_urns.environments.DOCKER.urn:
raise Exception(
'Dataflow can only execute pipeline steps in Docker environments.'
' Received %r.' % environment)
environment_payload = proto_utils.parse_Bytes(
environment.payload, beam_runner_api_pb2.DockerPayload)
container_image_url = environment_payload.container_image
container_image = dataflow.SdkHarnessContainerImage()
container_image.containerImage = container_image_url
# Currently we only set the following to True for the Python SDK.
# TODO: set this correctly for remote environments that might be Python.
container_image.useSingleCorePerContainer = (
container_image_url == python_sdk_container_image)
container_image.environmentId = env_id
pool.sdkHarnessContainerImages.append(container_image)
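# Override the number of threads per worker when requested via debug
# options.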
if self.debug_options.number_of_worker_harness_threads:
pool.numThreadsPerWorker = (
self.debug_options.number_of_worker_harness_threads)
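# Configure public or private worker IP addresses when explicitly set.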
if self.worker_options.use_public_ips is not None:
if self.worker_options.use_public_ips:
pool.ipConfiguration = (
dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
else:
pool.ipConfiguration = (
dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE
)
if self.standard_options.streaming:
# Use separate data disk for streaming.
disk = dataflow.Disk()
if self.local:
disk.diskType = 'local'
if self.worker_options.disk_type:
disk.diskType = self.worker_options.disk_type
pool.dataDisks.append(disk)
self.proto.workerPools.append(pool)
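# Record the SDK pipeline options and their display data on the
# Environment proto.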
sdk_pipeline_options = options.get_all_options()
if sdk_pipeline_options:
self.proto.sdkPipelineOptions = (
dataflow.Environment.SdkPipelineOptionsValue())
options_dict = {
k: v
for k, v in sdk_pipeline_options.items() if v is not None
}
options_dict["pipelineUrl"] = proto_pipeline_staged_url
self.proto.sdkPipelineOptions.additionalProperties.append(
dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
key='options', value=to_json_value(options_dict)))
dd = DisplayData.create_from_options(options)
items = [item.get_dict() for item in dd.items]
self.proto.sdkPipelineOptions.additionalProperties.append(
dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
key='display_data', value=to_json_value(items)))
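# Forward any Dataflow service options and enable hot key logging if
# requested.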
if self.google_cloud_options.dataflow_service_options:
for option in self.google_cloud_options.dataflow_service_options:
self.proto.serviceOptions.append(option)
if self.google_cloud_options.enable_hot_key_logging:
self.proto.debugOptions = dataflow.DebugOptions(enableHotKeyLogging=True)