in zktraffic/network/sniffer.py [0:0]
def message_from_packet(self, packet):
    """
    Build a message object from a captured packet.

    The IP packet is extracted from ``packet.load`` and its payload is
    handed to ``self._msg_cls.from_payload`` together with the source and
    destination endpoints (formatted as ``ip:port``) and ``packet.time``.

    :param packet: captured packet; its ``load`` and ``time`` attributes
                   are read
    :returns: whatever ``self._msg_cls.from_payload`` returns (an instance
              of Message), or ``None`` when the packet carries an empty
              payload
    :raises:
      :exc:`BadPacket` if neither the source nor the destination port
                       equals the configured port, or if the packet is of
                       an unknown type
      :exc:`DeserializationError` if deserialization failed
      :exc:`struct.error` if deserialization failed
    """
    ip_packet = get_ip_packet(packet.load, 0, self._port, self._is_loopback)
    segment = ip_packet.data
    payload = segment.data

    # Nothing to deserialize: empty payloads are skipped rather than
    # treated as errors.
    if len(payload) == 0:
        return None

    # At least one side of the conversation must be on the configured port.
    if self._port not in (segment.sport, segment.dport):
        raise BadPacket("Wrong port")

    # Endpoint strings are interned before being passed on.
    source = intern("%s:%s" % (get_ip(ip_packet, ip_packet.src), segment.sport))
    destination = intern("%s:%s" % (get_ip(ip_packet, ip_packet.dst), segment.dport))

    return self._msg_cls.from_payload(payload, source, destination, packet.time)