def _run()

in tfx/orchestration/experimental/centralized_kubernetes_orchestrator/main.py [0:0]


def _run() -> None:
  """Runs the main orchestration loop."""
  with contextlib.ExitStack() as stack:
    stack.enter_context(event_observer.init())

    mlmd_handle = stack.enter_context(_create_mlmd_connection())
    orchestrator_servicer = kubernetes_orchestrator_service.KubernetesOrchestratorServicer(
        mlmd_handle)

    server = _start_grpc_server(orchestrator_servicer)
    stack.callback(server.stop, grace=None)

    task_queue = tq.TaskQueue()

    service_job_manager = service_jobs.DummyServiceJobManager()
    task_manager = stack.enter_context(
        tm.TaskManager(
            mlmd_handle,
            task_queue,
            max_active_task_schedulers=_MAX_ACTIVE_TASK_SCHEDULERS_FLAG.value))
    last_active = time.time()

    iteration = 0
    while not _INACTIVITY_TTL_SECS_FLAG.value or time.time(
    ) - last_active <= _INACTIVITY_TTL_SECS_FLAG.value:
      try:
        iteration += 1
        logging.info('Orchestration loop: iteration #%d (since process start).',
                     iteration)
        event_observer.check_active()

        # Last pipeline state change time is useful to decide if wait period
        # between iterations can be short-circuited.
        last_state_change_time_secs = (
            pipeline_state.last_state_change_time_secs())

        if pipeline_ops.orchestrate(mlmd_handle, task_queue,
                                    service_job_manager):
          last_active = time.time()

        time_budget = _DEFAULT_POLLING_INTERVAL_SECS_FLAG.value
        logging.info(
            'Orchestration loop: waiting %s seconds before next iteration.',
            time_budget)
        while time_budget > 0.0:
          # Task manager should never be "done" unless there was an error.
          if task_manager.done():
            if task_manager.exception():
              raise task_manager.exception()
            else:
              raise RuntimeError(
                  'Task manager unexpectedly stalled due to an internal error.')

          # Short-circuit if state change is detected.
          if (pipeline_state.last_state_change_time_secs() >
              last_state_change_time_secs):
            last_state_change_time_secs = (
                pipeline_state.last_state_change_time_secs())
            logging.info(
                'Orchestration loop: detected state change, exiting wait period '
                'early (with %s of %s seconds remaining).', time_budget,
                _DEFAULT_POLLING_INTERVAL_SECS_FLAG.value)
            break

          time_budget = _sleep_tick_duration_secs(time_budget)
      except Exception:  # pylint: disable=broad-except
        logging.exception('Exception in main orchestration loop!')
        raise

    logging.info('Exiting due to no pipeline run in %s seconds',
                 _INACTIVITY_TTL_SECS_FLAG.value)