in exec/src/klio_exec/runners/evaluators.py [0:0]
def _read_from_pubsub(self, timestamp_attribute):
    """Pull at most one message from the subscription without acking it.

    Every received message is parsed, paired with a timestamp and
    registered with ``self.message_manager`` together with its ack ID.
    The client's transport channel is closed before returning, whether
    or not the pull succeeded.

    Args:
        timestamp_attribute: Name of the message attribute holding the
            element timestamp. When it is falsy, or the attribute is
            absent from the message, the message's publish time is used
            instead.

    Returns:
        A list of ``(timestamp, parsed_message)`` tuples (empty when the
        pull returned no messages), or ``None`` when the pull raised
        ``DeadlineExceeded``.

    Raises:
        ValueError: If the timestamp attribute value can be parsed
            neither as an integer nor by ``Timestamp.from_rfc3339``.
    """
    # Klio maintainer note: This code is the exact same logic as in
    # _PubSubReadEvaluator._read_from_pubsub with the
    # following changes:
    # 1. Import statements that were originally inside this method
    #    were moved to the top of this module.
    # 2. Import statements were adjusted to import modules and not
    #    objects, according to the Google style guide.
    # 3. The functionality we needed to override, which skips auto-acking
    #    consumed pubsub messages, and adds them to the MessageManager
    #    to handle deadline extension and acking once done.

    def _get_element(ack_id, message):
        """Build the ``(timestamp, parsed_message)`` pair for a message."""
        parsed_message = beam_pubsub.PubsubMessage._from_message(message)
        if (
            timestamp_attribute
            and timestamp_attribute in parsed_message.attributes
        ):
            rfc3339_or_milli = parsed_message.attributes[
                timestamp_attribute
            ]
            try:
                # An integer value is taken as milliseconds and scaled
                # to microseconds.
                timestamp = beam_timestamp.Timestamp(
                    micros=int(rfc3339_or_milli) * 1000
                )
            except ValueError:
                # Not an integer: fall back to RFC 3339 parsing.
                try:
                    timestamp = beam_timestamp.Timestamp.from_rfc3339(
                        rfc3339_or_milli
                    )
                except ValueError as e:
                    # Chain explicitly so the parse failure is recorded
                    # as the cause of the error we raise.
                    raise ValueError("Bad timestamp value: %s" % e) from e
        else:
            # No usable attribute: use the publish time, converting the
            # nanosecond part to microseconds.
            timestamp = beam_timestamp.Timestamp(
                message.publish_time.seconds,
                message.publish_time.nanos // 1000,
            )

        # Registered only once the timestamp was resolved; a message
        # with a bad timestamp raises above and is never added.
        self.message_manager.add(ack_id, parsed_message)
        return timestamp, parsed_message

    # Stays None if the pull fails with DeadlineExceeded.
    results = None
    try:
        response = self.sub_client.pull(
            self._sub_name, max_messages=1, return_immediately=True
        )
        results = [
            _get_element(rm.ack_id, rm.message)
            for rm in response.received_messages
        ]

    # only catching/ignoring this for now - if new exceptions raise, we'll
    # figure it out as they come on how to handle them
    except g_exceptions.DeadlineExceeded as e:
        # this seems mostly a benign error when there are 20+ seconds
        # between messages
        self.logger.debug(e)

    finally:
        self.sub_client.api.transport.channel.close()

    return results