public void processElement()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java [582:672]


        public void processElement(ProcessContext c) throws IOException {
          Pubsub pubsubClient =
              Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class))
                  .build();

          String subscription;
          if (getSubscription() == null) {
            String topic = getTopic().asV1Beta2Path();
            String[] split = topic.split("/");
            subscription = "projects/" + split[1] + "/subscriptions/" + split[3]
                + "_dataflow_" + new Random().nextLong();
            Subscription subInfo = new Subscription()
                .setAckDeadlineSeconds(60)
                .setTopic(topic);
            try {
              pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
            } catch (Exception e) {
              throw new RuntimeException("Failed to create subscription: ", e);
            }
          } else {
             subscription = getSubscription().asV1Beta2Path();
          }

          Instant endTime = getMaxReadTime() == null
              ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());

          List<PubsubMessage> messages = new ArrayList<>();

          Throwable finallyBlockException = null;
          try {
            while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
                && Instant.now().isBefore(endTime)) {
              PullRequest pullRequest = new PullRequest().setReturnImmediately(false);
              if (getMaxNumRecords() > 0) {
                pullRequest.setMaxMessages(getMaxNumRecords() - messages.size());
              } else {
                pullRequest.setMaxMessages(DEFAULT_PULL_SIZE);
              }

              PullResponse pullResponse =
                  pubsubClient.projects().subscriptions().pull(subscription, pullRequest).execute();
              List<String> ackIds = new ArrayList<>();
              if (pullResponse.getReceivedMessages() != null) {
                for (ReceivedMessage received : pullResponse.getReceivedMessages()) {
                  messages.add(received.getMessage());
                  ackIds.add(received.getAckId());
                }
              }

              if (ackIds.size() != 0) {
                AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
                pubsubClient.projects()
                    .subscriptions()
                    .acknowledge(subscription, ackRequest)
                    .execute();
              }
            }
          } catch (IOException e) {
            throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
          } finally {
            if (getTopic() != null) {
              try {
                pubsubClient.projects().subscriptions().delete(subscription).execute();
              } catch (IOException e) {
                finallyBlockException = new RuntimeException("Failed to delete subscription: ", e);
                LOG.error("Failed to delete subscription: ", e);
              }
            }
          }
          if (finallyBlockException != null) {
            Throwables.propagate(finallyBlockException);
          }

          for (PubsubMessage message : messages) {
            Instant timestamp;
            if (getTimestampLabel() == null) {
              timestamp = Instant.now();
            } else {
              if (message.getAttributes() == null
                  || !message.getAttributes().containsKey(getTimestampLabel())) {
                throw new RuntimeException(
                    "Message from pubsub missing timestamp label: " + getTimestampLabel());
              }
              timestamp = new Instant(Long.parseLong(
                      message.getAttributes().get(getTimestampLabel())));
            }
            c.outputWithTimestamp(
                CoderUtils.decodeFromByteArray(getCoder(), message.decodeData()),
                timestamp);
          }
        }