in pythonflow/pfmq/worker.py [0:0]
def run(self):
context = zmq.Context.instance()
# Use a specific identity for the worker such that reconnects don't change it.
identity = uuid.uuid4().bytes
num_retries = 0
while self.max_retries is None or num_retries < self.max_retries:
with context.socket(zmq.REQ) as socket, context.socket(zmq.PAIR) as cancel:
cancel.connect(self._cancel_address)
socket.setsockopt(zmq.IDENTITY, identity)
socket.connect(self.address)
LOGGER.debug('connected to %s', self.address)
poller = zmq.Poller()
poller.register(cancel, zmq.POLLIN)
poller.register(socket, zmq.POLLIN)
socket.send_multipart([b''])
LOGGER.debug('sent sign-up message')
while True:
LOGGER.debug('polling...')
sockets = dict(poller.poll(1000 * self.timeout))
if not sockets:
num_retries += 1
LOGGER.info('time out #%d for %s after %.3f seconds', num_retries,
self.address, self.timeout)
break
num_retries = 0
# Cancel the communication thread
if sockets.get(cancel) == zmq.POLLIN:
LOGGER.debug('received cancel signal on %s', self._cancel_address)
return
# Process messages
if sockets.get(socket) == zmq.POLLIN:
client, _, identifier, *request = socket.recv_multipart()
LOGGER.debug('received REQUEST with identifier %d from %s',
int.from_bytes(identifier, 'little'), client.hex())
try:
response = self.target(self.loads(*request))
status = self.STATUS['ok']
except Exception:
etype, value, tb = sys.exc_info()
response = value, "".join(traceback.format_exception(etype, value, tb))
status = self.STATUS['error']
LOGGER.exception("failed to process REQUEST with identifier %d from %s",
int.from_bytes(identifier, 'little'), client.hex())
try:
response = self.dumps(response)
except Exception:
LOGGER.exception(
"failed to serialise RESPONSE with identifier %d for %s",
int.from_bytes(identifier, 'little'), client.hex()
)
response = b""
status = self.STATUS['serialization_error']
socket.send_multipart([client, b'', identifier, status, response])
LOGGER.debug(
'sent RESPONSE with identifier %s to %s with status %s',
int.from_bytes(identifier, 'little'), client.hex(), self.STATUS[status]
)
LOGGER.error("maximum number of retries (%d) for %s exceeded", self.max_retries,
self.address)