in zktraffic/omni/omni_sniffer.py [0:0]
def __init__(self,
fle_sniffer_factory,
zab_sniffer_factory,
zk_sniffer_factory,
pfilter="tcp",
dump_bad_packet=False,
start=True):
super(OmniSniffer, self).__init__()
self.setDaemon(True)
self.fle_sniffer_factory = fle_sniffer_factory
self.zab_sniffer_factory = zab_sniffer_factory
self.zk_sniffer_factory = zk_sniffer_factory
self._sniffers = {} # dict[(str,int), SnifferBase]
self._pfilter = pfilter
self._dump_bad_packet = dump_bad_packet
self._last_tcp_seq = {} # dict[((str,int),(str,int)), int]
if start: # pragma: no cover
self.start()