def __init__()

in sdks/python/apache_beam/runners/dataflow/internal/apiclient.py [0:0]


  def __init__(
      self,
      packages,
      options,
      environment_version,
      proto_pipeline_staged_url,
      proto_pipeline=None,
      _sdk_image_overrides=None):
    """Builds the dataflow.Environment proto describing a job's environment.

    The populated message is stored in ``self.proto``. It carries user agent
    and version properties, experiments, a single worker pool and the
    serialized SDK pipeline options.

    Note that this constructor mutates ``options``' DebugOptions view: for
    FNAPI job types ``experiments`` is normalized to a list and may get a
    ``runner_harness_container_image=...`` and/or a
    ``use_multiple_sdk_containers`` entry appended.

    Args:
      packages: Iterable of package names. Each one becomes a dataflow.Package
        whose location is '<staging location>/<package>'.
      options: Pipeline options object. It is read through its
        StandardOptions, GoogleCloudOptions, WorkerOptions and DebugOptions
        views, and through get_all_options().
      environment_version: Value emitted as the 'major' version property.
      proto_pipeline_staged_url: Stored as ``self.pipeline_url`` and written
        to the SDK pipeline options under the 'pipelineUrl' key.
      proto_pipeline: Stored as ``self._proto_pipeline``. Presumably consumed
        by _get_environments_from_tranforms(); confirm against that method.
      _sdk_image_overrides: Stored as ``self._sdk_image_overrides``; a falsy
        value is replaced by an empty dict.

    Raises:
      Exception: If the unified worker is in use and one of the environments
        returned by _get_environments_from_tranforms() is not a Docker
        environment.
    """
    self.standard_options = options.view_as(StandardOptions)
    self.google_cloud_options = options.view_as(GoogleCloudOptions)
    self.worker_options = options.view_as(WorkerOptions)
    self.debug_options = options.view_as(DebugOptions)
    self.pipeline_url = proto_pipeline_staged_url
    self.proto = dataflow.Environment()
    self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
    self.proto.dataset = '{}/cloud_dataflow'.format(
        GoogleCloudOptions.BIGQUERY_API_SERVICE)
    # Rewrite the 'gs:/' prefix of the temp location to the storage API
    # service name.
    self.proto.tempStoragePrefix = (
        self.google_cloud_options.temp_location.replace(
            'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE))
    if self.worker_options.worker_region:
      self.proto.workerRegion = self.worker_options.worker_region
    if self.worker_options.worker_zone:
      self.proto.workerZone = self.worker_options.worker_zone
    # User agent information.
    self.proto.userAgent = dataflow.Environment.UserAgentValue()
    # The environment is treated as local when the endpoint mentions localhost.
    self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
    self._proto_pipeline = proto_pipeline
    self._sdk_image_overrides = _sdk_image_overrides or {}

    if self.google_cloud_options.service_account_email:
      self.proto.serviceAccountEmail = (
          self.google_cloud_options.service_account_email)
    if self.google_cloud_options.dataflow_kms_key:
      self.proto.serviceKmsKeyName = self.google_cloud_options.dataflow_kms_key

    self.proto.userAgent.additionalProperties.extend([
        dataflow.Environment.UserAgentValue.AdditionalProperty(
            key='name', value=to_json_value(self._get_python_sdk_name())),
        dataflow.Environment.UserAgentValue.AdditionalProperty(
            key='version', value=to_json_value(beam_version.__version__))
    ])
    # Version information.
    self.proto.version = dataflow.Environment.VersionValue()
    _verify_interpreter_version_is_supported(options)
    # Streaming always maps to the FNAPI streaming job type; batch jobs pick
    # between FNAPI and the Python batch job type.
    if self.standard_options.streaming:
      job_type = 'FNAPI_STREAMING'
    elif _use_fnapi(options):
      job_type = 'FNAPI_BATCH'
    else:
      job_type = 'PYTHON_BATCH'
    self.proto.version.additionalProperties.extend([
        dataflow.Environment.VersionValue.AdditionalProperty(
            key='job_type', value=to_json_value(job_type)),
        dataflow.Environment.VersionValue.AdditionalProperty(
            key='major', value=to_json_value(environment_version))
    ])
    # TODO: Use enumerated type instead of strings for job types.
    if job_type.startswith('FNAPI_'):
      self.debug_options.experiments = self.debug_options.experiments or []

      # The default runner harness image is only injected when the user did
      # not provide one and the unified worker is not in use (in that case the
      # default runner harness image is selected by the service).
      if not (self.debug_options.lookup_experiment(
          'runner_harness_container_image') or _use_unified_worker(options)):
        runner_harness_override = get_runner_harness_container_image()
        if runner_harness_override:
          self.debug_options.add_experiment(
              'runner_harness_container_image=' + runner_harness_override)
      debug_options_experiments = self.debug_options.experiments
      # Add use_multiple_sdk_containers flag if it's not already present. Do not
      # add the flag if 'no_use_multiple_sdk_containers' is present.
      # TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
      # till version 2.4.
      if ('use_multiple_sdk_containers' not in debug_options_experiments and
          'no_use_multiple_sdk_containers' not in debug_options_experiments):
        debug_options_experiments.append('use_multiple_sdk_containers')
    # FlexRS
    if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
      self.proto.flexResourceSchedulingGoal = (
          dataflow.Environment.FlexResourceSchedulingGoalValueValuesEnum.
          FLEXRS_COST_OPTIMIZED)
    elif self.google_cloud_options.flexrs_goal == 'SPEED_OPTIMIZED':
      self.proto.flexResourceSchedulingGoal = (
          dataflow.Environment.FlexResourceSchedulingGoalValueValuesEnum.
          FLEXRS_SPEED_OPTIMIZED)
    # Experiments. This must run after the FNAPI block above so that any
    # experiments appended there are copied to the proto as well.
    if self.debug_options.experiments:
      for experiment in self.debug_options.experiments:
        self.proto.experiments.append(experiment)
    # Worker pool(s) information.
    # The staging location is deliberately evaluated per package so that an
    # empty `packages` never touches it.
    package_descriptors = [
        dataflow.Package(
            location='%s/%s' % (
                self.google_cloud_options.staging_location.replace(
                    'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
                package),
            name=package) for package in packages
    ]

    pool = dataflow.WorkerPool(
        kind='local' if self.local else 'harness',
        packages=package_descriptors,
        taskrunnerSettings=dataflow.TaskRunnerSettings(
            parallelWorkerSettings=dataflow.WorkerSettings(
                baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
                servicePath=self.google_cloud_options.dataflow_endpoint)))

    pool.autoscalingSettings = dataflow.AutoscalingSettings()
    # Set worker pool options received through command line.
    if self.worker_options.num_workers:
      pool.numWorkers = self.worker_options.num_workers
    if self.worker_options.max_num_workers:
      pool.autoscalingSettings.maxNumWorkers = (
          self.worker_options.max_num_workers)
    if self.worker_options.autoscaling_algorithm:
      values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum
      # Any value other than the two keys below yields None from .get().
      pool.autoscalingSettings.algorithm = {
          'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE,
          'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC,
      }.get(self.worker_options.autoscaling_algorithm)
    if self.worker_options.machine_type:
      pool.machineType = self.worker_options.machine_type
    if self.worker_options.disk_size_gb:
      pool.diskSizeGb = self.worker_options.disk_size_gb
    if self.worker_options.disk_type:
      pool.diskType = self.worker_options.disk_type
    if self.worker_options.zone:
      pool.zone = self.worker_options.zone
    if self.worker_options.network:
      pool.network = self.worker_options.network
    if self.worker_options.subnetwork:
      pool.subnetwork = self.worker_options.subnetwork
    pool.workerHarnessContainerImage = (
        get_container_image_from_options(options))

    # Setting worker pool sdk_harness_container_images option for supported
    # Dataflow workers.
    environments_to_use = self._get_environments_from_tranforms()
    if _use_unified_worker(options):
      python_sdk_container_image = get_container_image_from_options(options)

      # Adding container images for other SDKs that may be needed for
      # cross-language pipelines.
      for env_id, environment in environments_to_use:
        if environment.urn != common_urns.environments.DOCKER.urn:
          raise Exception(
              'Dataflow can only execute pipeline steps in Docker environments.'
              ' Received %r.' % environment)
        environment_payload = proto_utils.parse_Bytes(
            environment.payload, beam_runner_api_pb2.DockerPayload)
        container_image_url = environment_payload.container_image

        container_image = dataflow.SdkHarnessContainerImage()
        container_image.containerImage = container_image_url
        # Currently we only set following to True for Python SDK.
        # TODO: set this correctly for remote environments that might be Python.
        container_image.useSingleCorePerContainer = (
            container_image_url == python_sdk_container_image)
        container_image.environmentId = env_id
        pool.sdkHarnessContainerImages.append(container_image)

    if self.debug_options.number_of_worker_harness_threads:
      pool.numThreadsPerWorker = (
          self.debug_options.number_of_worker_harness_threads)
    # use_public_ips is tri-state: None leaves ipConfiguration unset.
    if self.worker_options.use_public_ips is not None:
      if self.worker_options.use_public_ips:
        pool.ipConfiguration = (
            dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
      else:
        pool.ipConfiguration = (
            dataflow.WorkerPool.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE
        )

    if self.standard_options.streaming:
      # Use separate data disk for streaming.
      disk = dataflow.Disk()
      if self.local:
        disk.diskType = 'local'
      # An explicitly configured disk type overrides the 'local' default.
      if self.worker_options.disk_type:
        disk.diskType = self.worker_options.disk_type
      pool.dataDisks.append(disk)
    self.proto.workerPools.append(pool)

    sdk_pipeline_options = options.get_all_options()
    if sdk_pipeline_options:
      self.proto.sdkPipelineOptions = (
          dataflow.Environment.SdkPipelineOptionsValue())

      # Options whose value is None are dropped before serialization.
      options_dict = {
          k: v
          for k, v in sdk_pipeline_options.items() if v is not None
      }
      options_dict["pipelineUrl"] = proto_pipeline_staged_url
      self.proto.sdkPipelineOptions.additionalProperties.append(
          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
              key='options', value=to_json_value(options_dict)))

      dd = DisplayData.create_from_options(options)
      items = [item.get_dict() for item in dd.items]
      self.proto.sdkPipelineOptions.additionalProperties.append(
          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
              key='display_data', value=to_json_value(items)))

    if self.google_cloud_options.dataflow_service_options:
      for option in self.google_cloud_options.dataflow_service_options:
        self.proto.serviceOptions.append(option)

    if self.google_cloud_options.enable_hot_key_logging:
      self.proto.debugOptions = dataflow.DebugOptions(enableHotKeyLogging=True)