def _setup_on_fle_notification()

in zktraffic/omni/omni_sniffer.py [0:0]


  def _setup_on_fle_notification(self, packet, message):
    """
    Parse the quorum config carried by an FLE.Notification and register
    sniffers for every server entry it lists.

    For each QuorumConfig.Server entry three sniffers are registered through
    self._regist_sniffer: 'fle' and 'zab' on the server's zab/fle address and
    'zk' on the server's zk address.

    :param packet: captured packet; only the (src, dst) pair returned by
      self._parse_packet_src_dst(packet) is used
    :param message: the FLE.Notification whose ``config`` is parsed
    :raises AssertionError: if message is not an FLE.Notification (plain
      assert, i.e. a caller contract), or if a server entry says 'localhost'
      while src[0] or dst[0] of the packet is not '127.0.0.1'
    :raises BadPacket: if socket.inet_aton rejects a resolved address; servers
      processed earlier in the same config have already been registered
    """
    assert isinstance(message, FLE.Notification)
    src, dst = self._parse_packet_src_dst(packet)
    config = QuorumConfig(message.config)
    servers = [x for x in config.entries if isinstance(x, QuorumConfig.Server)]
    for server in servers:
      zab_fle_ip = server.zab_fle_hostname
      if zab_fle_ip == 'localhost':
        # 'localhost' can only be mapped to 127.0.0.1 when the notification
        # itself was seen between loopback endpoints. These checks depend on
        # packet data, so they are explicit raises rather than `assert`
        # statements (which disappear under `python -O`). AssertionError and
        # the messages are kept so callers observe the same failure as before.
        if src[0] != '127.0.0.1':
          raise AssertionError('foreign localhost(src %s != 127.0.0.1)' % src[0])
        if dst[0] != '127.0.0.1':
          raise AssertionError('foreign localhost(dst %s != 127.0.0.1)' % dst[0])
        zab_fle_ip = '127.0.0.1'

      # A zk hostname of 'localhost' or the wildcard '0.0.0.0' is replaced by
      # the (already resolved) zab/fle address of the same server.
      zk_ip = server.zk_hostname
      if zk_ip in ('localhost', '0.0.0.0'):
        zk_ip = zab_fle_ip

      try:
        # check whether correct ip string
        socket.inet_aton(zab_fle_ip)
        socket.inet_aton(zk_ip)
      except Exception as e:
        # Any failure to parse the address is reported as a bad packet.
        raise BadPacket(e)

      self._regist_sniffer(zab_fle_ip, server.fle_port, 'fle')
      self._regist_sniffer(zab_fle_ip, server.zab_port, 'zab')
      self._regist_sniffer(zk_ip, server.zk_port, 'zk')