def run()

in pythonflow/pfmq/task.py [0:0]


    def run(self):
        """
        Send every request to `self.address` and forward the responses, in order, to
        `self.results`.

        Requests are drawn from `self.requests` and numbered consecutively from zero. Each one is
        sent over a ZeroMQ REQ socket as a two-frame message: the identifier encoded as
        little-endian bytes, followed by `self.dumps(request)`. Once the requests are exhausted,
        a single empty frame is sent on each iteration so that outstanding responses can still be
        collected.

        Responses may arrive in any order. They are buffered and delivered to `self.results`
        strictly by ascending identifier as `(status, payload)` tuples, where `status` is looked
        up in `self.STATUS` and `payload` is `self.loads(...)` of the response body, or the
        request identifier if the status is `'serialization_error'`. A response whose first frame
        is empty is logged as a dispatch notification and otherwise ignored.

        The method returns when one of the following happens:

        * the response for the last request has been delivered, or `self.requests` was empty;
          `('end', None)` is put on `self.results` first,
        * polling timed out `self.max_retries` times in a row (only if `self.max_retries` is
          truthy); `('timeout', TimeoutError(...))` is put on `self.results` first,
        * a message is received on the PAIR socket connected to `self._cancel_address`.

        Raises:
            KeyError: if a response carries a status that is not a key of `self.STATUS`.
        """
        context = zmq.Context.instance()
        num_retries = 0
        identity = uuid.uuid4().bytes
        # Requests that have been sent but not answered yet, keyed by identifier
        pending = collections.OrderedDict()
        requests = iter(enumerate(self.requests, 0))
        # Identifier of the most recently issued request and of the final request (once known)
        current_identifier = last_identifier = None
        # Identifier of the next response to hand over to `self.results`
        next_identifier = 0
        # Responses that arrived ahead of `next_identifier`
        cache = {}

        while True:
            # Fresh sockets are created on every pass of this loop; the inner loop is left on a
            # poll time-out so the sockets are closed and the connection is re-established.
            with context.socket(zmq.PAIR) as cancel, context.socket(zmq.REQ) as socket:
                cancel.connect(self._cancel_address)
                socket.setsockopt(zmq.IDENTITY, identity)
                socket.connect(self.address)

                poller = zmq.Poller()
                poller.register(socket, zmq.POLLIN)
                poller.register(cancel, zmq.POLLIN)

                LOGGER.debug('connected %s to %s', identity, self.address)

                while True:
                    LOGGER.debug("%d pending messages", len(pending))
                    # See whether the request at the front of `pending` has timed out; only the
                    # first entry is inspected.
                    identifier = message = None
                    if pending:
                        candidate, item = next(iter(pending.items()))
                        if time.time() - item['time'] > self.timeout:
                            identifier = candidate
                            LOGGER.info('request with identifier %d timed out', identifier)
                            message = item['message']

                    # Get a new message
                    if identifier is None:
                        try:
                            identifier, request = next(requests)
                        except StopIteration:
                            if current_identifier is None:
                                # `self.requests` did not yield anything, so no response will
                                # ever arrive; signal the end instead of polling forever.
                                LOGGER.debug('no requests to send')
                                self.results.put(('end', None))
                                return
                            if last_identifier is None:
                                last_identifier = current_identifier
                            identifier = None
                            message = [b'']
                            LOGGER.debug('no more requests; waiting for responses')
                        else:
                            # Use as many bytes as the identifier needs, but at least one
                            num_bytes = max((identifier.bit_length() + 7) // 8, 1)
                            message = [
                                identifier.to_bytes(num_bytes, 'little'),
                                self.dumps(request),
                            ]
                            # Store the most recent identifier
                            current_identifier = identifier
                            LOGGER.debug('new request with identifier %d', identifier)

                    socket.send_multipart(message)
                    LOGGER.debug('sent REQUEST with identifier %s: %s', identifier, message)

                    # Add to the list of pending requests
                    # NOTE(review): assigning to a key that already exists keeps its position in
                    # the OrderedDict, so a retransmitted request stays at the front with a fresh
                    # timestamp and later entries are not checked for time-outs until it has been
                    # answered. Consider `pending.move_to_end(identifier)` if that is unintended.
                    if identifier is not None:
                        pending[identifier] = {
                            'message': message,
                            'time': time.time(),
                        }

                    # Retrieve a response (`poll` expects milliseconds, `self.timeout` is seconds)
                    LOGGER.debug('polling...')
                    sockets = dict(poller.poll(1000 * self.timeout))

                    # Communication timed out
                    if not sockets:
                        num_retries += 1
                        LOGGER.info('time out #%d for %s after %.3f seconds', num_retries,
                                    self.address, self.timeout)
                        if self.max_retries and num_retries >= self.max_retries:
                            # The communication failed
                            error = "maximum number of retries (%d) for %s exceeded" % \
                                (self.max_retries, self.address)
                            LOGGER.error(error)
                            self.results.put(('timeout', TimeoutError(error)))
                            return
                        # Leave the inner loop to reconnect with new sockets
                        break  # pragma: no cover

                    # Reset the retry counter
                    num_retries = 0

                    # Cancel the communication thread
                    if sockets.get(cancel) == zmq.POLLIN:
                        LOGGER.debug('received CANCEL signal on %s', self._cancel_address)
                        return

                    if sockets.get(socket) == zmq.POLLIN:
                        frame, *response = socket.recv_multipart()
                        if not frame:
                            LOGGER.debug('received dispatch notification')
                            continue

                        # Decode the identifier and remove the corresponding request from `pending`
                        identifier = int.from_bytes(frame, 'little')
                        LOGGER.debug('received RESPONSE for identifier %d (next: %d, end: %s)',
                                     identifier, next_identifier, last_identifier)
                        pending.pop(identifier, None)

                        # Drop the message if it is outdated
                        if identifier < next_identifier:  # pragma: no cover
                            LOGGER.debug('dropped RESPONSE with identifier %d (next: %d)',
                                         identifier, next_identifier)
                            continue

                        # Add the message to the cache and deliver everything that is now in
                        # sequence. The membership test (rather than catching KeyError around the
                        # whole body) keeps lookup errors raised by `self.STATUS` or `self.loads`
                        # from being mistaken for a cache miss and silently dropping a result.
                        cache[identifier] = response
                        while next_identifier in cache:
                            status, payload = cache.pop(next_identifier)
                            status = self.STATUS[status]
                            if status == 'serialization_error':
                                # Report the request being delivered, which is not necessarily
                                # the one whose response was received just now.
                                result = next_identifier
                            else:
                                result = self.loads(payload)
                            self.results.put((status, result))

                            if next_identifier == last_identifier:
                                self.results.put(('end', None))
                                return
                            next_identifier += 1