in zktraffic/omni/omni_sniffer.py [0:0]
def _check_packet(self, packet):
    """
    Reject packets whose TCP sequence number was already seen on their flow.

    NOTE: TX/RX duplicates happen on loopback interfaces.

    The last accepted sequence number is kept per (src, dst) pair in
    ``self._last_tcp_seq``. A RST segment discards the entry for its flow.
    Any other segment must carry a payload and must be ahead of the last
    accepted sequence number, otherwise it is rejected.

    Sequence numbers are compared modulo 2**32, so a flow keeps being
    accepted after its sequence number wraps around.

    :param packet: captured packet; ``packet.load`` is passed to
                   ``get_ip_packet`` to obtain the TCP segment
    :return: None
    :raises BadPacket: if a non-RST segment has no payload, or if its
                       sequence number is not ahead of the last accepted one
    """
    src, dst = self._parse_packet_src_dst(packet)
    tcp = get_ip_packet(packet.load).data
    flow = (src, dst)

    if tcp.flags & dpkt.tcp.TH_RST:
        # Connection reset: forget the flow so later traffic starts fresh.
        self._last_tcp_seq.pop(flow, None)
        return

    if not tcp.data:
        raise BadPacket("no payload")

    last_seq = self._last_tcp_seq.get(flow)
    if last_seq is not None:
        # Forward distance from the last accepted segment in 32-bit serial
        # arithmetic. 0 means the very same segment; a distance in the upper
        # half of the space means the segment is behind us (old/retransmit).
        # A plain `<=` would reject everything after a sequence wrap-around.
        ahead = (tcp.seq - last_seq) & 0xFFFFFFFF
        if ahead == 0 or ahead >= 0x80000000:
            # this exception eliminates dups
            raise BadPacket("This sequence(%d<=%d) seen before" % (tcp.seq, last_seq))

    self._last_tcp_seq[flow] = tcp.seq