in zktraffic/omni/omni_sniffer.py [0:0]
def _setup_on_fle_notification(self, packet, message):
"""
parse config and setup self._sniffers on FLE.Notification
:param packet:
:param message:
"""
assert isinstance(message, FLE.Notification)
src, dst = self._parse_packet_src_dst(packet)
config = QuorumConfig(message.config)
servers = [x for x in config.entries if isinstance(x, QuorumConfig.Server)]
for server in servers:
zab_fle_ip = server.zab_fle_hostname
if zab_fle_ip == 'localhost':
assert src[0] == '127.0.0.1', 'foreign localhost(src %s != 127.0.0.1)' % src[0]
assert dst[0] == '127.0.0.1', 'foreign localhost(dst %s != 127.0.0.1)' % dst[0]
zab_fle_ip = '127.0.0.1'
zk_ip = server.zk_hostname
if zk_ip in ('localhost', '0.0.0.0'):
zk_ip = zab_fle_ip
try:
# check whether correct ip string
socket.inet_aton(zab_fle_ip)
socket.inet_aton(zk_ip)
except Exception as e:
raise BadPacket(e)
self._regist_sniffer(zab_fle_ip, server.fle_port, 'fle')
self._regist_sniffer(zab_fle_ip, server.zab_port, 'zab')
self._regist_sniffer(zk_ip, server.zk_port, 'zk')