def run_as_kubernetes_job()

in tfx/orchestration/experimental/kubernetes/kubernetes_remote_runner.py [0:0]


def run_as_kubernetes_job(pipeline: tfx_pipeline.Pipeline,
                          tfx_image: str) -> None:
  """Submits and runs a TFX pipeline from outside the cluster.

  The pipeline is serialized and passed as command-line arguments to a
  transient orchestrator Kubernetes Job created in the 'default' namespace.
  This call blocks: it waits for the Job's pod to appear and leave the
  pending state, forwards the pod's log output line by line to
  `logging.info`, and finally waits for the pod to reach a done state.

  Args:
    pipeline: Logical pipeline containing pipeline args and components.
    tfx_image: Container image URI for the TFX container.

  Raises:
    RuntimeError: When the Job cannot be submitted, when listing its pods
      fails with a non-404 error, when the Job has not launched exactly one
      pod within JOB_CREATION_TIMEOUT seconds, when the pod logs cannot be
      streamed, or when the pod finishes in the FAILED phase.
  """
  namespace = 'default'

  # TODO(ccy): Look for alternative serialization schemes once available.
  serialized_pipeline = _serialize_pipeline(pipeline)
  arguments = [
      '--serialized_pipeline',
      serialized_pipeline,
      '--tfx_image',
      tfx_image,
  ]
  batch_api = kube_utils.make_batch_v1_api()
  job_name = 'Job_' + pipeline.pipeline_info.run_id
  # The pod is labelled with the sanitized job name so that it can be found
  # with a label selector once the Job has launched it.
  pod_label = kube_utils.sanitize_pod_name(job_name)
  container_name = 'pipeline-orchestrator'
  job = kube_utils.make_job_object(
      name=job_name,
      container_image=tfx_image,
      command=_ORCHESTRATOR_COMMAND + arguments,
      container_name=container_name,
      pod_labels={
          'job-name': pod_label,
      },
      service_account_name=kube_utils.TFX_SERVICE_ACCOUNT,
  )
  try:
    batch_api.create_namespaced_job(namespace, job, pretty=True)
  except client.rest.ApiException as e:
    raise RuntimeError('Failed to submit job! \nReason: %s\nBody: %s' %
                       (e.reason, e.body)) from e

  core_api = kube_utils.make_core_v1_api()

  # Poll until the Job has launched a pod or JOB_CREATION_TIMEOUT seconds
  # have elapsed. A monotonic clock keeps wall-clock adjustments from
  # shortening or stretching the wait (and, unlike timedelta.seconds, the
  # elapsed time does not wrap around after one day).
  deadline = time.monotonic() + JOB_CREATION_TIMEOUT
  orchestrator_pods = []
  while True:
    try:
      orchestrator_pods = core_api.list_namespaced_pod(
          namespace=namespace,
          label_selector='job-name={}'.format(pod_label)).items
    except client.rest.ApiException as e:
      # A 404 is tolerated and retried; any other API error is fatal.
      if e.status != 404:
        raise RuntimeError('Unknown error! \nReason: %s\nBody: %s' %
                           (e.reason, e.body)) from e
    if orchestrator_pods or time.monotonic() >= deadline:
      break
    time.sleep(1)

  # Transient orchestrator should only have 1 pod.
  if len(orchestrator_pods) != 1:
    raise RuntimeError('Expected 1 pod launched by Kubernetes job, found %d' %
                       len(orchestrator_pods))
  pod_name = orchestrator_pods[0].metadata.name

  logging.info('Waiting for pod "%s:%s" to start.', namespace, pod_name)
  kube_utils.wait_pod(
      core_api,
      pod_name,
      namespace,
      exit_condition_lambda=kube_utils.pod_is_not_pending,
      condition_description='non-pending status')

  # Stream logs from orchestrator pod.
  logging.info('Start log streaming for pod "%s:%s".', namespace, pod_name)
  try:
    log_response = core_api.read_namespaced_pod_log(
        name=pod_name,
        namespace=namespace,
        container=container_name,
        follow=True,
        _preload_content=False)
  except client.rest.ApiException as e:
    raise RuntimeError(
        'Failed to stream the logs from the pod!\nReason: %s\nBody: %s' %
        (e.reason, e.body)) from e

  try:
    # Chunks from .stream() have arbitrary boundaries: one chunk may hold
    # several lines, part of a line, or half of a multi-byte UTF-8 character.
    # Buffer raw bytes and only decode complete lines; the '\n' byte never
    # occurs inside a multi-byte UTF-8 sequence, so splitting on it is safe.
    pending = b''
    for chunk in log_response.stream():
      pending += chunk
      *complete_lines, pending = pending.split(b'\n')
      for line in complete_lines:
        logging.info(line.decode('utf-8', errors='replace'))
    # Flush a final line that was not newline-terminated.
    if pending:
      logging.info(pending.decode('utf-8', errors='replace'))
  finally:
    # The response was requested with _preload_content=False, so its
    # connection must be released back to the pool explicitly.
    log_response.release_conn()

  resp = kube_utils.wait_pod(
      core_api,
      pod_name,
      namespace,
      exit_condition_lambda=kube_utils.pod_is_done,
      condition_description='done state',
      exponential_backoff=True)

  if resp.status.phase == kube_utils.PodPhase.FAILED.value:
    raise RuntimeError('Pod "%s:%s" failed with status "%s".' %
                       (namespace, pod_name, resp.status))