in zktraffic/cli/omni.py [0:0]
def main(_, options):
if options.version:
sys.stdout.write("%s\n" % __version__)
sys.exit(0)
printer = Printer(options.colors,
output=sys.stdout,
skip_print=None if options.include_pings else lambda msg: isinstance(msg, ZAB.Ping))
zk_printer = ZKDefaultPrinter(options.colors, loopback=False, output=sys.stdout)
zk_printer.start()
def fle_sniffer_factory(port):
return Sniffer(None, port, FLE.Message, printer.add, options.dump_bad_packet, start=False)
def zab_sniffer_factory(port):
return Sniffer(None, port, ZAB.QuorumPacket, printer.add, options.dump_bad_packet, start=False)
def zk_sniffer_factory(port):
config = ZKSnifferConfig(None)
config.track_replies = True
config.zookeeper_port = port
config.client_port = 0
if options.include_pings:
config.include_pings()
return ZKSniffer(
config,
zk_printer.request_handler,
zk_printer.reply_handler,
zk_printer.event_handler,
error_to_stderr=True
)
if not options.offline:
sniffer = OmniSniffer(
fle_sniffer_factory,
zab_sniffer_factory,
zk_sniffer_factory,
pfilter=options.packet_filter,
dump_bad_packet=options.dump_bad_packet)
else:
sniffer = OmniSniffer(
fle_sniffer_factory,
zab_sniffer_factory,
zk_sniffer_factory,
pfilter=options.packet_filter,
dump_bad_packet=options.dump_bad_packet,
start=False)
sniffer.run(offline=options.offline)
try:
while (printer.isAlive() or zk_printer.isAlive()) and not options.offline:
sniffer.join(1)
except (KeyboardInterrupt, SystemExit):
pass
# consume all messages
while not printer.empty or not zk_printer.empty:
time.sleep(0.0001)
# stop it
printer.stop()
zk_printer.stop()
while not printer.stopped or not zk_printer.stopped:
time.sleep(0.0001)