def _read_from_pubsub()

in exec/src/klio_exec/runners/evaluators.py [0:0]


    def _read_from_pubsub(self, timestamp_attribute):
        """Pull at most one Pub/Sub message without acknowledging it.

        Each received message is parsed, given a timestamp, and registered
        with ``self.message_manager`` under its ack ID instead of being
        acknowledged here.

        Args:
            timestamp_attribute: Name of the message attribute carrying the
                element timestamp, either as an integer number of
                milliseconds or as an RFC 3339 string. When falsy, or when
                the attribute is missing from the message, the message's
                publish time is used instead.

        Returns:
            A list of ``(timestamp, parsed_message)`` tuples (empty when the
            pull returned no messages), or ``None`` when the pull raised
            ``DeadlineExceeded``.

        Raises:
            ValueError: If the timestamp attribute value is neither an
                integer nor a parseable RFC 3339 string.
        """
        # Klio maintainer note: This code is the exact same logic as in
        # _PubSubReadEvaluator._read_from_pubsub with the
        # following changes:
        # 1. Import statements that were originally inside this method
        #    were moved to the top of this module.
        # 2. Import statements adjusted to import module and not objects
        #    according to the google style guide.
        # 3. The functionality we needed to override, which skips
        #    auto-acking consumed pubsub messages, and adds them to the
        #    MessageManager to handle deadline extension and acking once
        #    done.

        def _get_element(ack_id, message):
            parsed_message = beam_pubsub.PubsubMessage._from_message(message)
            if (
                timestamp_attribute
                and timestamp_attribute in parsed_message.attributes
            ):
                rfc3339_or_milli = parsed_message.attributes[
                    timestamp_attribute
                ]
                try:
                    # Integer values are milliseconds; Timestamp wants micros.
                    timestamp = beam_timestamp.Timestamp(
                        micros=int(rfc3339_or_milli) * 1000
                    )
                except ValueError:
                    # Not an integer, so fall back to RFC 3339 parsing.
                    try:
                        timestamp = beam_timestamp.Timestamp.from_rfc3339(
                            rfc3339_or_milli
                        )
                    except ValueError as e:
                        # Chain explicitly so the parse failure is recorded
                        # as the cause of the error raised to the caller.
                        raise ValueError("Bad timestamp value: %s" % e) from e
            else:
                timestamp = beam_timestamp.Timestamp(
                    message.publish_time.seconds,
                    message.publish_time.nanos // 1000,
                )

            # Registered only after the timestamp was resolved: a message
            # with a bad timestamp raises above and is never added.
            self.message_manager.add(ack_id, parsed_message)

            return timestamp, parsed_message

        # Stays None if the pull below raises DeadlineExceeded.
        results = None
        try:
            response = self.sub_client.pull(
                self._sub_name, max_messages=1, return_immediately=True
            )
            results = [
                _get_element(rm.ack_id, rm.message)
                for rm in response.received_messages
            ]

        # only catching/ignoring this for now - if new exceptions raise, we'll
        # figure it out as they come on how to handle them
        except g_exceptions.DeadlineExceeded as e:
            # this seems mostly a benign error when there are 20+ seconds
            # between messages
            self.logger.debug(e)

        finally:
            # Runs on success, on the swallowed DeadlineExceeded, and on any
            # exception that propagates out of the pull or the parsing.
            self.sub_client.api.transport.channel.close()

        return results