def message_from_packet()

in zktraffic/network/sniffer.py [0:0]


  def message_from_packet(self, packet):
    """
    :returns: Returns an instance of Message
    :raises:
      :exc:`BadPacket` if the packet is of an unknown type
      :exc:`DeserializationError` if deserialization failed
      :exc:`struct.error` if deserialization failed
    """
    ip_p = get_ip_packet(packet.load, 0, self._port, self._is_loopback)
    if 0 == len(ip_p.data.data):
      return None

    if ip_p.data.sport != self._port and ip_p.data.dport != self._port:
      raise BadPacket("Wrong port")

    return self._msg_cls.from_payload(
      ip_p.data.data,
      intern("%s:%s" % (get_ip(ip_p, ip_p.src), ip_p.data.sport)),
      intern("%s:%s" % (get_ip(ip_p, ip_p.dst), ip_p.data.dport)),
      packet.time
    )