in tfx/orchestration/experimental/kubernetes/kubernetes_remote_runner.py [0:0]
def run_as_kubernetes_job(pipeline: tfx_pipeline.Pipeline,
                          tfx_image: str) -> None:
  """Submits and runs a TFX pipeline from outside the cluster.

  Args:
    pipeline: Logical pipeline containing pipeline args and components.
    tfx_image: Container image URI for the TFX container.

  Raises:
    RuntimeError: When an error is encountered running the Kubernetes Job.
  """
  # TODO(ccy): Look for alternative serialization schemes once available.
  serialized_pipeline = _serialize_pipeline(pipeline)
  arguments = [
      '--serialized_pipeline',
      serialized_pipeline,
      '--tfx_image',
      tfx_image,
  ]
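  # These flags are appended to _ORCHESTRATOR_COMMAND below, so they are parsed
  # by the in-cluster orchestrator entrypoint rather than by this process.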
  batch_api = kube_utils.make_batch_v1_api()
  job_name = 'Job_' + pipeline.pipeline_info.run_id
  pod_label = kube_utils.sanitize_pod_name(job_name)
  container_name = 'pipeline-orchestrator'
  job = kube_utils.make_job_object(
      name=job_name,
      container_image=tfx_image,
      command=_ORCHESTRATOR_COMMAND + arguments,
      container_name=container_name,
      pod_labels={
          'job-name': pod_label,
      },
      service_account_name=kube_utils.TFX_SERVICE_ACCOUNT,
  )
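  # Submit the Job to the cluster in the default namespace.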
  try:
    batch_api.create_namespaced_job('default', job, pretty=True)
  except client.rest.ApiException as e:
    raise RuntimeError('Failed to submit the job!\nReason: %s\nBody: %s' %
                       (e.reason, e.body))
  # Wait for the orchestrator pod to start.
  orchestrator_pods = []
  core_api = kube_utils.make_core_v1_api()
  start_time = datetime.datetime.utcnow()
  # Poll until the Kubernetes Job has launched a pod or the timeout expires.
  while not orchestrator_pods and (datetime.datetime.utcnow() -
                                   start_time).seconds < JOB_CREATION_TIMEOUT:
    try:
      orchestrator_pods = core_api.list_namespaced_pod(
          namespace='default',
          label_selector='job-name={}'.format(pod_label)).items
    except client.rest.ApiException as e:
      if e.status != 404:
        raise RuntimeError('Unknown error!\nReason: %s\nBody: %s' %
                           (e.reason, e.body))
    time.sleep(1)
  # The transient orchestrator Job should have launched exactly one pod.
  if len(orchestrator_pods) != 1:
    raise RuntimeError('Expected exactly 1 pod launched by the Kubernetes Job, '
                       'but found %d.' % len(orchestrator_pods))
  orchestrator_pod = orchestrator_pods.pop()
  pod_name = orchestrator_pod.metadata.name

  logging.info('Waiting for pod "default:%s" to start.', pod_name)
  kube_utils.wait_pod(
      core_api,
      pod_name,
      'default',
      exit_condition_lambda=kube_utils.pod_is_not_pending,
      condition_description='non-pending status')
  # Stream logs from the orchestrator pod.
  logging.info('Start log streaming for pod "default:%s".', pod_name)
  try:
    logs = core_api.read_namespaced_pod_log(
        name=pod_name,
        namespace='default',
        container=container_name,
        follow=True,
        _preload_content=False).stream()
  except client.rest.ApiException as e:
    raise RuntimeError(
        'Failed to stream the logs from the pod!\nReason: %s\nBody: %s' %
        (e.reason, e.body))
  for log in logs:
    logging.info(log.decode().rstrip('\n'))
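  # After the log stream ends, wait for the pod to reach a terminal phase
  # before checking whether the run succeeded.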
  resp = kube_utils.wait_pod(
      core_api,
      pod_name,
      'default',
      exit_condition_lambda=kube_utils.pod_is_done,
      condition_description='done state',
      exponential_backoff=True)
  if resp.status.phase == kube_utils.PodPhase.FAILED.value:
    raise RuntimeError('Pod "default:%s" failed with status "%s".' %
                       (pod_name, resp.status))
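
A minimal usage sketch, not part of the original module, assuming the caller has already populated pipeline_info.run_id (normally done by the Kubernetes DAG runner, which calls this helper when it runs outside the cluster). The pipeline name, bucket, run id, and image URI below are hypothetical:

# Hedged usage sketch; all names are illustrative, not taken from the source.
from tfx.orchestration import pipeline as tfx_pipeline

example_pipeline = tfx_pipeline.Pipeline(
    pipeline_name='example-pipeline',               # hypothetical name
    pipeline_root='gs://example-bucket/pipelines',  # hypothetical root
    components=[],                                  # a real pipeline lists its components here
    # A metadata_connection_config pointing at the in-cluster MLMD instance
    # would normally be set here as well.
)
# The run id is normally assigned by the calling runner before submission.
example_pipeline.pipeline_info.run_id = 'example-run-id'

run_as_kubernetes_job(
    pipeline=example_pipeline,
    tfx_image='gcr.io/example-project/example-tfx-image:latest',  # hypothetical image
)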