in pythonflow/pfmq/task.py [0:0]
def run(self):
    """
    Send every request to `self.address` and queue the responses in request order.

    Each item of `self.requests` is numbered from zero, serialized with `self.dumps`, and sent
    over a REQ socket as `[identifier, payload]`, where the identifier is the request number
    encoded as little-endian bytes. Requests that have been waiting for longer than
    `self.timeout` seconds are sent again. Responses may arrive in any order; they are buffered
    until the next expected identifier is available so that results are queued in order.

    The following items are placed on `self.results`:

    * `(status, payload)` for each response, where `status` is looked up in `self.STATUS` and
      `payload` is `self.loads(response)`, or the identifier of the affected request if
      `status` is `'serialization_error'`,
    * `('timeout', TimeoutError(...))` if `self.max_retries` is truthy and that many
      consecutive polls time out,
    * `('end', None)` after the response to the last request has been queued, or immediately
      if `self.requests` is empty.

    The method returns without queueing anything further as soon as a message arrives on the
    socket connected to `self._cancel_address`.
    """
    context = zmq.Context.instance()
    num_retries = 0
    identity = uuid.uuid4().bytes
    # Requests awaiting a response, ordered from least to most recently sent
    pending = collections.OrderedDict()
    requests = iter(enumerate(self.requests, 0))
    # `current_identifier` is the identifier of the most recently started request;
    # `last_identifier` is set once the request iterator is exhausted
    current_identifier = last_identifier = None
    # Identifier of the next result to place on the queue
    next_identifier = 0
    # Responses that arrived before their turn, keyed by identifier
    cache = {}

    while True:
        # Each pass of this loop opens a fresh pair of sockets; the inner loop only breaks out
        # to here after a poll timed out (a REQ socket cannot send again before it has
        # received a reply, see the ZeroMQ socket documentation)
        with context.socket(zmq.PAIR) as cancel, context.socket(zmq.REQ) as socket:
            cancel.connect(self._cancel_address)
            socket.setsockopt(zmq.IDENTITY, identity)
            socket.connect(self.address)
            poller = zmq.Poller()
            poller.register(socket, zmq.POLLIN)
            poller.register(cancel, zmq.POLLIN)
            LOGGER.debug('connected %s to %s', identity, self.address)

            while True:
                LOGGER.debug("%d pending messages", len(pending))
                # See whether we can find a request that has timed out
                identifier = message = None
                for candidate, item in pending.items():
                    if time.time() - item['time'] > self.timeout:
                        identifier = candidate
                        LOGGER.info('request with identifier %d timed out', identifier)
                        message = item['message']
                    # We only need to check the first message because `pending` is kept in
                    # send order (see `move_to_end` below)
                    break

                # Get a new message
                if identifier is None:
                    try:
                        identifier, request = next(requests)
                    except StopIteration:
                        if current_identifier is None:
                            # There never was a request so no response will ever complete
                            # the sequence; finish immediately instead of waiting forever
                            LOGGER.debug('no requests to send')
                            self.results.put(('end', None))
                            return
                        if last_identifier is None:
                            last_identifier = current_identifier
                        identifier = None
                        # An empty frame is sent in place of a request; presumably this only
                        # asks the remote end for outstanding responses -- TODO confirm
                        message = [b'']
                        LOGGER.debug('no more requests; waiting for responses')
                    else:
                        num_bytes = max((identifier.bit_length() + 7) // 8, 1)
                        message = [
                            identifier.to_bytes(num_bytes, 'little'),
                            self.dumps(request),
                        ]
                        # Store the most recent identifier
                        current_identifier = identifier
                        LOGGER.debug('new request with identifier %d', identifier)

                socket.send_multipart(message)
                LOGGER.debug('sent REQUEST with identifier %s: %s', identifier, message)
                # Add to the list of pending requests
                if identifier is not None:
                    pending[identifier] = {
                        'message': message,
                        'time': time.time(),
                    }
                    # Assigning to an existing key keeps its position, so a request that was
                    # sent again has to be moved explicitly to keep the oldest entry first
                    pending.move_to_end(identifier)

                # Retrieve a response
                LOGGER.debug('polling...')
                sockets = dict(poller.poll(1000 * self.timeout))
                # Communication timed out
                if not sockets:
                    num_retries += 1
                    LOGGER.info('time out #%d for %s after %.3f seconds', num_retries,
                                self.address, self.timeout)
                    if self.max_retries and num_retries >= self.max_retries:
                        # The communication failed
                        message = "maximum number of retries (%d) for %s exceeded" % \
                            (self.max_retries, self.address)
                        LOGGER.error(message)
                        self.results.put(('timeout', TimeoutError(message)))
                        return
                    break  # pragma: no cover

                # Reset the retry counter
                num_retries = 0
                # Cancel the communication thread
                if sockets.get(cancel) == zmq.POLLIN:
                    LOGGER.debug('received CANCEL signal on %s', self._cancel_address)
                    return

                if sockets.get(socket) == zmq.POLLIN:
                    identifier, *response = socket.recv_multipart()
                    if not identifier:
                        LOGGER.debug('received dispatch notification')
                        continue
                    # Decode the identifier and remove the corresponding request from `pending`
                    identifier = int.from_bytes(identifier, 'little')
                    LOGGER.debug('received RESPONSE for identifier %d (next: %d, end: %s)',
                                 identifier, next_identifier, last_identifier)
                    pending.pop(identifier, None)
                    # Drop the message if it is outdated
                    if identifier < next_identifier:  # pragma: no cover
                        LOGGER.debug('dropped RESPONSE with identifier %d (next: %d)',
                                     identifier, next_identifier)
                        continue
                    # Add the message to the cache
                    cache[identifier] = response
                    # Emit all consecutive results that are available. Only a missing cache
                    # entry ends this loop; errors raised by the status lookup or by
                    # `self.loads` propagate instead of silently discarding the response.
                    while next_identifier in cache:
                        status, response = cache.pop(next_identifier)
                        status = self.STATUS[status]
                        if status == 'serialization_error':
                            # Report the request this result belongs to, which may differ
                            # from the identifier of the response that was just received
                            result = next_identifier
                        else:
                            result = self.loads(response)
                        self.results.put((status, result))
                        if next_identifier == last_identifier:
                            self.results.put(('end', None))
                            return
                        next_identifier += 1