def run()

in pythonflow/pfmq/broker.py [0:0]


    def run(self):
        context = zmq.Context.instance()
        workers = set()
        clients = set()
        cache = {}

        with context.socket(zmq.ROUTER) as frontend, context.socket(zmq.ROUTER) as backend, \
                context.socket(zmq.PAIR) as cancel:

            cancel.connect(self._cancel_address)
            frontend.bind(self.frontend_address)
            LOGGER.debug('bound frontend to %s', self.frontend_address)
            backend.bind(self.backend_address)
            LOGGER.debug('bound backend to %s', self.backend_address)

            backend_poller = zmq.Poller()
            backend_poller.register(backend, zmq.POLLIN)
            backend_poller.register(cancel, zmq.POLLIN)

            poller = zmq.Poller()
            poller.register(frontend, zmq.POLLIN)
            poller.register(backend, zmq.POLLIN)
            poller.register(cancel, zmq.POLLIN)

            while True:
                # Only listen to the backend if no workers are available
                if workers:
                    LOGGER.debug('polling frontend and backend...')
                    sockets = dict(poller.poll())
                else:
                    LOGGER.debug('polling backend...')
                    sockets = dict(backend_poller.poll())

                # Cancel the communication thread
                if sockets.get(cancel) == zmq.POLLIN:
                    LOGGER.debug('received CANCEL signal on %s', self._cancel_address)
                    break

                # Receive responses or sign-up messages from the backend
                if sockets.get(backend) == zmq.POLLIN:
                    worker, _, client, *message = backend.recv_multipart()
                    workers.add(worker)

                    if client:
                        _, identifier, status, response = message
                        LOGGER.debug(
                            'received RESPONSE with identifier %s from %s for %s with status %s',
                            int.from_bytes(identifier, 'little'), worker.hex(), client.hex(),
                            self.STATUS[status]
                        )
                        # Try to forward the message to a waiting client
                        try:
                            clients.remove(client)
                            self._forward_response(frontend, client, identifier, status, response)
                        # Add it to the cache otherwise
                        except KeyError:
                            cache.setdefault(client, []).append((identifier, status, response))
                    else:
                        LOGGER.debug('received SIGN-UP message from %s; now %d workers',
                                     worker.hex(), len(workers))

                # Receive requests from the frontend, forward to the workers, and return responses
                if sockets.get(frontend) == zmq.POLLIN:
                    client, _, identifier, *request = frontend.recv_multipart()
                    LOGGER.debug('received REQUEST with byte identifier %s from %s',
                                 identifier, client.hex())

                    if identifier:
                        worker = workers.pop()
                        backend.send_multipart([worker, _, client, _, identifier, *request])
                        LOGGER.debug('forwarded REQUEST with identifier %s from %s to %s',
                                     int.from_bytes(identifier, 'little'), client.hex(),
                                     worker.hex())

                    try:
                        self._forward_response(frontend, client, *cache[client].pop(0))
                    except (KeyError, IndexError):
                        # Send a dispatch notification if the task sent a new message
                        if identifier:
                            frontend.send_multipart([client, _, _])
                            LOGGER.debug('notified %s of REQUEST dispatch', client.hex())
                        # Add the task to the list of tasks waiting for responses otherwise
                        else:
                            clients.add(client)

        LOGGER.debug('exiting communication loop')