def _build_container_spec()

in tfx/orchestration/kubeflow/v2/step_builder.py [0:0]


  def _build_container_spec(self) -> ContainerSpec:
    """Builds the container spec for a component.

    When the node carries a `platform_config`, it is first validated as a
    ResourceSpec proto and then copied into the `resources` field of the
    returned spec, regardless of which executor spec flavor is used.

    Returns:
      The PipelineContainerSpec represents the container execution of the
      component.

    Raises:
      NotImplementedError: When the executor class is neither ExecutorClassSpec
        nor TemplatedExecutorContainerSpec.
      AssertionError: When the node is not a BaseComponent, or when
        `platform_config` is set but is not a ResourceSpec proto, has a
        negative cpu/memory limit, or names an accelerator type without a
        positive accelerator count.
    """

    assert isinstance(self._node, base_component.BaseComponent)

    platform_config = self._node.platform_config
    if platform_config:
      logging.info(
          'ResourceSpec with container execution parameters has been passed via platform_config'
      )
      assert isinstance(
          platform_config, pipeline_pb2.PipelineDeploymentConfig
          .PipelineContainerSpec.ResourceSpec
      ), ('platform_config, if set by the user, must be a ResourceSpec proto '
          'specifying vCPU and vRAM requirements')
      cpu_limit = platform_config.cpu_limit
      memory_limit = platform_config.memory_limit
      if cpu_limit:
        assert (cpu_limit >= 0), ('vCPU must be non-negative')
      if memory_limit:
        assert (memory_limit >= 0), ('vRAM must be non-negative')

      if platform_config.accelerator.type:
        # A zero count alongside a type means the count was effectively left
        # unset, which is exactly what the message below complains about; the
        # previous `>= 0` comparison let that case through.
        assert (platform_config.accelerator.count >
                0), ('GPU type and count must be set')

    node_executor_spec = self._node.executor_spec
    if isinstance(node_executor_spec,
                  executor_specs.TemplatedExecutorContainerSpec):
      # Templated container specs bring their own image and command line.
      result = ContainerSpec(
          image=node_executor_spec.image,
          command=_resolve_command_line(
              container_spec=node_executor_spec,
              exec_properties=self._node.exec_properties,
          ))
    elif isinstance(node_executor_spec, executor_spec.ExecutorClassSpec):
      # The container entrypoint format below assumes ExecutorClassSpec.
      result = ContainerSpec()
      result.image = self._tfx_image
      if self._image_cmds:
        result.command.extend(self._image_cmds)
      executor_path = name_utils.get_full_name(
          node_executor_spec.executor_class)
      # Resolve container arguments.
      result.args.extend([
          '--executor_class_path',
          executor_path,
          '--json_serialized_invocation_args',
          '{{$}}',
      ])
      result.args.extend(self._beam_pipeline_args)
    else:
      # Report the offending spec type; the original message had a bare '%'
      # with no argument and so never named it.
      raise NotImplementedError(
          'Executor spec: %s is not supported in Kubeflow V2 yet. '
          'Currently only ExecutorClassSpec and '
          'TemplatedExecutorContainerSpec are supported.' %
          type(node_executor_spec).__name__)

    # Resource requirements apply to both executor spec flavors.
    if platform_config:
      result.resources.CopyFrom(platform_config)
    return result