in tfx/orchestration/kubeflow/v2/step_builder.py [0:0]
def _build_container_spec(self) -> ContainerSpec:
"""Builds the container spec for a component.
Returns:
The PipelineContainerSpec represents the container execution of the
component.
Raises:
NotImplementedError: When the executor class is neither ExecutorClassSpec
nor TemplatedExecutorContainerSpec.
"""
  assert isinstance(self._node, base_component.BaseComponent)
  if self._node.platform_config:
    logging.info(
        'ResourceSpec with container execution parameters has been passed '
        'via platform_config')
    assert isinstance(
        self._node.platform_config, pipeline_pb2.PipelineDeploymentConfig
        .PipelineContainerSpec.ResourceSpec
    ), ('platform_config, if set by the user, must be a ResourceSpec proto '
        'specifying vCPU and vRAM requirements')
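    # Sanity-check the user-supplied resource limits before they are copied
    # into the container spec.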
    cpu_limit = self._node.platform_config.cpu_limit
    memory_limit = self._node.platform_config.memory_limit
    if cpu_limit:
      assert cpu_limit >= 0, 'vCPU must be non-negative'
    if memory_limit:
      assert memory_limit >= 0, 'vRAM must be non-negative'
    if self._node.platform_config.accelerator.type:
      assert self._node.platform_config.accelerator.count > 0, (
          'GPU count must be set when a GPU type is specified')
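  # A TemplatedExecutorContainerSpec carries its own image and command-line
  # template, so the container spec is built directly from it.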
  if isinstance(self._node.executor_spec,
                executor_specs.TemplatedExecutorContainerSpec):
    container_spec = self._node.executor_spec
    result = ContainerSpec(
        image=container_spec.image,
        command=_resolve_command_line(
            container_spec=container_spec,
            exec_properties=self._node.exec_properties,
        ))
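    # Attach the validated resource requirements, if any, to the container.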
    if self._node.platform_config:
      result.resources.CopyFrom(self._node.platform_config)
    return result
  # The container entrypoint format below assumes ExecutorClassSpec.
  if not isinstance(self._node.executor_spec,
                    executor_spec.ExecutorClassSpec):
    raise NotImplementedError(
        'Executor spec: %s is not supported in Kubeflow V2 yet. '
        'Currently only ExecutorClassSpec is supported.' %
        type(self._node.executor_spec))
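  # ExecutorClassSpec: run the TFX image and point its entrypoint at the
  # executor class.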
  result = ContainerSpec()
  result.image = self._tfx_image
  if self._image_cmds:
    for cmd in self._image_cmds:
      result.command.append(cmd)
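  # Fully-qualified import path of the executor class, consumed by the TFX
  # container entrypoint.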
  executor_path = name_utils.get_full_name(
      self._node.executor_spec.executor_class)
  # Resolve container arguments.
  result.args.append('--executor_class_path')
  result.args.append(executor_path)
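  # '{{$}}' is a runtime placeholder; Kubeflow V2 replaces it with the
  # JSON-serialized invocation arguments when the container starts.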
  result.args.append('--json_serialized_invocation_args')
  result.args.append('{{$}}')
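  # Forward any Beam pipeline args so the executor can configure its Beam
  # runner.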
  result.args.extend(self._beam_pipeline_args)
  if self._node.platform_config:
    result.resources.CopyFrom(self._node.platform_config)
  return result