def _check_packet()

in zktraffic/omni/omni_sniffer.py [0:0]


  def _check_packet(self, packet):
    """
    Reject packets whose TCP sequence number was already seen on their flow.

    NOTE: TX/RX duplicates happen on loopback interfaces.

    A RST drops the sequence number recorded for the packet's (src, dst)
    pair and is accepted as-is. Any other packet must carry a TCP payload
    and a sequence number that is ahead of the last one recorded for that
    pair; an accepted packet then becomes the new record.

    Sequence numbers are compared modulo 2**32 (serial number arithmetic),
    so a flow whose sequence number wraps past 0xFFFFFFFF keeps being
    accepted instead of being mistaken for a stream of duplicates.

    :param packet: captured packet; packet.load is decoded by get_ip_packet
    :return: None
    :raises BadPacket: if the packet has no TCP payload, or its sequence
                       number is not ahead of the last one recorded
    """
    src, dst = self._parse_packet_src_dst(packet)
    flow = (src, dst)
    tcp = get_ip_packet(packet.load).data

    if tcp.flags & dpkt.tcp.TH_RST:
      # the connection was reset: forget its sequence state
      if flow in self._last_tcp_seq:
        del self._last_tcp_seq[flow]
      return

    if not tcp.data:
      raise BadPacket("no payload")

    if flow in self._last_tcp_seq:
      last_seq = self._last_tcp_seq[flow]
      # Distance from tcp.seq up to last_seq in the 32-bit sequence space.
      # Less than half the space means tcp.seq is at or behind last_seq,
      # i.e. a duplicate or an older segment; anything else is new data,
      # including segments sent right after the counter wrapped around.
      if ((last_seq - tcp.seq) & 0xFFFFFFFF) < 0x80000000:
        # this exception eliminates dups
        raise BadPacket("This sequence(%d<=%d) seen before" % (tcp.seq, last_seq))

    self._last_tcp_seq[flow] = tcp.seq