in tfx/orchestration/experimental/centralized_kubernetes_orchestrator/main.py [0:0]
def _run() -> None:
  """Runs the main orchestration loop.

  Acquires the event observer, the MLMD connection, the gRPC server and the
  task manager through a single `contextlib.ExitStack`, so that all of them are
  released when the loop ends, whether it ends normally or by an exception.

  Each iteration calls `pipeline_ops.orchestrate` and then waits for up to
  `_DEFAULT_POLLING_INTERVAL_SECS_FLAG` seconds. The wait is cut short as soon
  as a newer pipeline state change time is observed.

  The loop runs forever when `_INACTIVITY_TTL_SECS_FLAG` is unset (falsy).
  Otherwise it exits once more than that many seconds have passed since
  `pipeline_ops.orchestrate` last returned a truthy value (or since startup).

  Raises:
    RuntimeError: If the task manager reports that it is done without carrying
      an exception.
    Exception: Any exception raised inside an iteration (including the task
      manager's own exception) is logged and re-raised unchanged.
  """
  with contextlib.ExitStack() as stack:
    stack.enter_context(event_observer.init())

    mlmd_handle = stack.enter_context(_create_mlmd_connection())
    orchestrator_servicer = (
        kubernetes_orchestrator_service.KubernetesOrchestratorServicer(
            mlmd_handle))

    server = _start_grpc_server(orchestrator_servicer)
    # Stop the gRPC server when the stack unwinds.
    stack.callback(server.stop, grace=None)

    task_queue = tq.TaskQueue()
    service_job_manager = service_jobs.DummyServiceJobManager()
    task_manager = stack.enter_context(
        tm.TaskManager(
            mlmd_handle,
            task_queue,
            max_active_task_schedulers=_MAX_ACTIVE_TASK_SCHEDULERS_FLAG.value))

    # Inactivity is an elapsed-time measurement, so use the monotonic clock:
    # unlike time.time(), it cannot jump when the system clock is adjusted,
    # which would otherwise trigger a premature exit or delay the exit.
    last_active = time.monotonic()

    iteration = 0
    while (not _INACTIVITY_TTL_SECS_FLAG.value or
           time.monotonic() - last_active <= _INACTIVITY_TTL_SECS_FLAG.value):
      try:
        iteration += 1
        logging.info('Orchestration loop: iteration #%d (since process start).',
                     iteration)
        event_observer.check_active()

        # Last pipeline state change time is useful to decide if wait period
        # between iterations can be short-circuited.
        last_state_change_time_secs = (
            pipeline_state.last_state_change_time_secs())

        # A truthy result resets the inactivity timer.
        if pipeline_ops.orchestrate(mlmd_handle, task_queue,
                                    service_job_manager):
          last_active = time.monotonic()

        time_budget = _DEFAULT_POLLING_INTERVAL_SECS_FLAG.value
        logging.info(
            'Orchestration loop: waiting %s seconds before next iteration.',
            time_budget)
        while time_budget > 0.0:
          # Task manager should never be "done" unless there was an error.
          if task_manager.done():
            # Read the exception once so that the object we raise is exactly
            # the one we checked.
            task_manager_error = task_manager.exception()
            if task_manager_error:
              raise task_manager_error
            raise RuntimeError(
                'Task manager unexpectedly stalled due to an internal error.')

          # Short-circuit if state change is detected.
          if (pipeline_state.last_state_change_time_secs() >
              last_state_change_time_secs):
            # NOTE(review): this value is re-read at the top of the next
            # iteration, so this refresh looks redundant -- confirm before
            # removing.
            last_state_change_time_secs = (
                pipeline_state.last_state_change_time_secs())
            logging.info(
                'Orchestration loop: detected state change, exiting wait period '
                'early (with %s of %s seconds remaining).', time_budget,
                _DEFAULT_POLLING_INTERVAL_SECS_FLAG.value)
            break

          # The helper returns the remaining budget for the next check.
          time_budget = _sleep_tick_duration_secs(time_budget)
      except Exception:  # pylint: disable=broad-except
        # Log with traceback, then propagate so the ExitStack unwinds and the
        # process terminates instead of looping in a broken state.
        logging.exception('Exception in main orchestration loop!')
        raise

    logging.info('Exiting due to no pipeline run in %s seconds',
                 _INACTIVITY_TTL_SECS_FLAG.value)