def run()

in pythonflow/pfmq/worker.py [0:0]


    def run(self):
        """
        Serve requests arriving on `self.address` until cancelled or out of retries.

        A REQ socket carrying a fixed identity is connected to `self.address` and signs up by
        sending a single empty frame. A PAIR socket connected to `self._cancel_address` is
        polled alongside it; any readable event on that socket makes this method return.

        Every request is expected to consist of the frames `client, <ignored>, identifier,
        *request`. The request frames are passed to `self.loads`, the result to `self.target`,
        and the outcome is serialised with `self.dumps`. The reply consists of the frames
        `client, b'', identifier, status, response`, where `status` is

        * `STATUS['ok']` if the target returned normally,
        * `STATUS['error']` if deserialisation or the target raised; the response is then the
          serialised tuple `(exception, formatted traceback)`,
        * `STATUS['serialization_error']` if `self.dumps` raised; the response is then empty.

        If nothing becomes readable within `self.timeout` seconds, both sockets are closed and
        the connection is set up again. After `self.max_retries` consecutive timeouts (never if
        `max_retries` is `None`) an error is logged and the method returns.
        """
        ctx = zmq.Context.instance()
        # Generated once, outside the reconnect loop, so every reconnect presents the same
        # identity.
        worker_identity = uuid.uuid4().bytes

        num_timeouts = 0
        while self.max_retries is None or num_timeouts < self.max_retries:
            # Leaving the `with` block closes both sockets before the next connection attempt.
            with ctx.socket(zmq.REQ) as socket, ctx.socket(zmq.PAIR) as cancel:
                cancel.connect(self._cancel_address)
                socket.setsockopt(zmq.IDENTITY, worker_identity)
                socket.connect(self.address)
                LOGGER.debug('connected to %s', self.address)

                poller = zmq.Poller()
                for endpoint in (cancel, socket):
                    poller.register(endpoint, zmq.POLLIN)

                # Announce this worker with a single empty frame
                socket.send_multipart([b''])
                LOGGER.debug('sent sign-up message')

                while True:
                    LOGGER.debug('polling...')
                    # `self.timeout` is in seconds whereas the poller expects milliseconds
                    ready = dict(poller.poll(1000 * self.timeout))

                    if ready:
                        # Any activity resets the count of consecutive timeouts
                        num_timeouts = 0
                    else:
                        num_timeouts += 1
                        LOGGER.info('time out #%d for %s after %.3f seconds', num_timeouts,
                                    self.address, self.timeout)
                        # Drop this connection and let the outer loop decide whether to retry
                        break

                    # The cancel signal takes precedence over any pending request
                    if ready.get(cancel) == zmq.POLLIN:
                        LOGGER.debug('received cancel signal on %s', self._cancel_address)
                        return

                    # Nothing to process unless the request socket is readable
                    if ready.get(socket) != zmq.POLLIN:
                        continue

                    frames = socket.recv_multipart()
                    client, _, identifier, *request = frames
                    # Human-readable forms of the identifiers, used for logging only
                    request_id = int.from_bytes(identifier, 'little')
                    client_hex = client.hex()
                    LOGGER.debug('received REQUEST with identifier %d from %s',
                                 request_id, client_hex)

                    # Evaluate the target; failures are reported to the client rather than
                    # terminating the worker
                    try:
                        response = self.target(self.loads(*request))
                        status = self.STATUS['ok']
                    except Exception as error:
                        response = error, traceback.format_exc()
                        status = self.STATUS['error']
                        LOGGER.exception("failed to process REQUEST with identifier %d from %s",
                                         request_id, client_hex)

                    # Serialise the result (or the error); fall back to an empty payload
                    try:
                        payload = self.dumps(response)
                    except Exception:
                        LOGGER.exception(
                            "failed to serialise RESPONSE with identifier %d for %s",
                            request_id, client_hex
                        )
                        payload = b""
                        status = self.STATUS['serialization_error']

                    socket.send_multipart([client, b'', identifier, status, payload])
                    # `STATUS` is indexed by the status value here to obtain a loggable form
                    LOGGER.debug(
                        'sent RESPONSE with identifier %s to %s with status %s',
                        request_id, client_hex, self.STATUS[status]
                    )

        LOGGER.error("maximum number of retries (%d) for %s exceeded", self.max_retries,
                     self.address)