def maybe_start_dataset_service()

in reader/dds.py [0:0]


def maybe_start_dataset_service():
  """Start the tf.data service server matching this process's role, if any.

  Behavior, in order:
    * If ``env.has_readers()`` is false, return immediately.
    * If the installed TensorFlow is older than 2.5, raise.
    * If ``env.is_dispatcher()``, create a ``DispatchServer`` on the reader
      port and block in ``server.join()``.
    * Else if ``env.is_reader()``, create a ``WorkerServer`` registered with
      the dispatcher and block in ``server.join()``.
    * Otherwise (neither role), return ``None`` without starting anything.

  Because of the ``join()`` calls, this function is not expected to return on
  dispatcher / reader processes while their server is running.

  Raises:
    RuntimeError: if the installed TensorFlow version is below 2.5.
  """
  if not env.has_readers():
    return

  if packaging.version.parse(tf.__version__) < packaging.version.parse("2.5"):
    # RuntimeError is a subclass of Exception, so existing handlers still match.
    raise RuntimeError(
      f"maybe_start_dataset_service requires TF >= 2.5; got {tf.__version__}"
    )

  if env.is_dispatcher():
    # Read each setting once so the logged value is exactly the one used.
    port = env.get_reader_port()
    logging.info("env.get_reader_port() = %s", port)
    work_dir = env.get_dds_journaling_dir()
    logging.info("env.get_dds_journaling_dir() = %s", work_dir)
    server = tf.data.experimental.service.DispatchServer(
      tf.data.experimental.service.DispatcherConfig(
        port=port,
        protocol="grpc",
        work_dir=work_dir,
        # Fault-tolerant mode is enabled only when a journaling directory
        # is configured (non-empty / non-None).
        fault_tolerant_mode=bool(work_dir),
      )
    )
    server.join()

  elif env.is_reader():
    # Read each setting once so the logged value is exactly the one used.
    port = env.get_reader_port()
    logging.info("env.get_reader_port() = %s", port)
    dispatcher_address = env.get_dds_dispatcher_address()
    logging.info("env.get_dds_dispatcher_address() = %s", dispatcher_address)
    worker_address = env.get_dds_worker_address()
    logging.info("env.get_dds_worker_address() = %s", worker_address)
    server = tf.data.experimental.service.WorkerServer(
      tf.data.experimental.service.WorkerConfig(
        port=port,
        dispatcher_address=dispatcher_address,
        worker_address=worker_address,
        protocol="grpc",
      )
    )
    server.join()