in reader/dds.py [0:0]
def maybe_start_dataset_service():
if not env.has_readers():
return
if packaging.version.parse(tf.__version__) < packaging.version.parse("2.5"):
raise Exception(f"maybe_distribute_dataset requires TF >= 2.5; got {tf.__version__}")
if env.is_dispatcher():
logging.info(f"env.get_reader_port() = {env.get_reader_port()}")
logging.info(f"env.get_dds_journaling_dir() = {env.get_dds_journaling_dir()}")
work_dir = env.get_dds_journaling_dir()
server = tf.data.experimental.service.DispatchServer(
tf.data.experimental.service.DispatcherConfig(
port=env.get_reader_port(),
protocol="grpc",
work_dir=work_dir,
fault_tolerant_mode=bool(work_dir),
)
)
server.join()
elif env.is_reader():
logging.info(f"env.get_reader_port() = {env.get_reader_port()}")
logging.info(f"env.get_dds_dispatcher_address() = {env.get_dds_dispatcher_address()}")
logging.info(f"env.get_dds_worker_address() = {env.get_dds_worker_address()}")
server = tf.data.experimental.service.WorkerServer(
tf.data.experimental.service.WorkerConfig(
port=env.get_reader_port(),
dispatcher_address=env.get_dds_dispatcher_address(),
worker_address=env.get_dds_worker_address(),
protocol="grpc",
)
)
server.join()