zktraffic/fle/message.py (124 lines of code) (raw):
# ==================================================================================================
# Copyright 2015 Twitter, Inc.
# --------------------------------------------------------------------------------------------------
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this work except in compliance with the License.
# You may obtain a copy of the License in the LICENSE file, or at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================================
from datetime import datetime
from zktraffic.base.network import BadPacket
from zktraffic.base.util import read_long, read_number, read_string
class PeerState(object):
LOOKING = 0
FOLLOWING = 1
LEADING = 2
OBSERVING = 3
STATES = [LOOKING, FOLLOWING, LEADING, OBSERVING]
NAMES = [
"looking",
"following",
"leading",
"observing",
]
@classmethod
def invalid(cls, state):
return state not in cls.STATES
@classmethod
def to_str(cls, state):
return "" if cls.invalid(state) else cls.NAMES[state]
class Message(object):
PROTO_VER = -65536
OLD_LEN = 28
WITH_VERSION_LEN = 36
__slots__ = ()
@classmethod
def from_payload(cls, data, src, dst, timestamp):
if len(data) < 16:
raise BadPacket("Too small")
proto, offset = read_long(data, 0)
if proto == cls.PROTO_VER:
server_id, offset = read_long(data, offset)
election_addr, _ = read_string(data, offset)
return Initial(timestamp, src, dst, server_id, election_addr)
if len(data) >= cls.OLD_LEN:
state, offset = read_number(data, 0)
if PeerState.invalid(state):
raise BadPacket("Invalid state: %d" % state)
leader, offset = read_long(data, offset)
zxid, offset = read_long(data, offset)
election_epoch, offset = read_long(data, offset)
peer_epoch, offset = read_long(data, offset) if len(data) > cls.OLD_LEN else (-1, offset)
version = 0
config = ""
if len(data) > cls.WITH_VERSION_LEN:
version, offset = read_number(data, offset)
if version == 2:
config, _ = read_string(data, offset)
return Notification(
timestamp,
src,
dst,
state,
leader,
zxid,
election_epoch,
peer_epoch,
version,
config
)
raise BadPacket("Unknown unknown")
@property
def timestr(self):
return datetime.fromtimestamp(self.timestamp).strftime("%H:%M:%S:%f")
class Initial(Message):
__slots__ = ("timestamp", "src", "dst", "server_id", "election_addr")
def __init__(self, timestamp, src, dst, server_id, election_addr):
self.timestamp = timestamp
self.src = src
self.dst = dst
self.server_id = server_id
self.election_addr = election_addr
def __str__(self):
return "%s(\n%s=%s,\n%s=%s,\n%s=%s,\n%s=%s,\n%s=%s\n)\n" % (
"Initial",
" " * 5 + "timestamp", self.timestr,
" " * 5 + "src", self.src,
" " * 5 + "dst", self.dst,
" " * 5 + "server_id", self.server_id,
" " * 5 + "election_addr", self.election_addr
)
class Notification(Message):
__slots__ = (
"timestamp",
"src",
"dst",
"state",
"leader",
"zxid",
"election_epoch",
"peer_epoch",
"version",
"config"
)
def __init__(self, timestamp, src, dst, state, leader, zxid, election_epoch, peer_epoch, version, config):
self.timestamp = timestamp
self.src = src
self.dst = dst
self.state = state
self.leader = leader
self.zxid = zxid
self.election_epoch = election_epoch
self.peer_epoch = peer_epoch
self.version = version
self.config = config
@property
def state_literal(self):
return PeerState.to_str(self.state)
def __str__(self):
config = [" " * 10 + cline for cline in self.config.split("\n")]
return "%s(\n%s=%s,\n%s=%s,\n%s=%s,\n%s=%s,\n%s=%s,\n%s=%s,\n%s=%s,\n%s=%s,\n%s=%s,\n%s=\n%s\n)\n" % (
"Notification",
" " * 5 + "timestamp", self.timestr,
" " * 5 + "src", self.src,
" " * 5 + "dst", self.dst,
" " * 5 + "state", self.state_literal,
" " * 5 + "leader", self.leader,
" " * 5 + "zxid", self.zxid,
" " * 5 + "election_epoch", self.election_epoch,
" " * 5 + "peer_epoch", self.peer_epoch,
" " * 5 + "version", self.version,
" " * 5 + "config", "\n".join(config),
)