in sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java [582:672]
/**
 * Pulls a bounded batch of messages from Pubsub and outputs each decoded payload with a
 * timestamp.
 *
 * <p>If no subscription is configured, a temporary subscription is created on the configured
 * topic before pulling and deleted again once pulling finishes (whether or not pulling
 * succeeded). A subscription that was supplied by configuration is never deleted here.
 *
 * <p>Pulling continues until {@code getMaxNumRecords()} messages have been collected (when that
 * value is non-zero) or until {@code getMaxReadTime()} has elapsed (when it is non-null). Each
 * pulled batch is acknowledged immediately after it is received, before any element is output.
 *
 * <p>Element timestamps are the current time when {@code getTimestampLabel()} is null; otherwise
 * they are parsed as a {@code long} from the message attribute with that label.
 *
 * @param c the process context used to obtain pipeline options and to emit output
 * @throws IOException declared by the signature; I/O failures from pull/acknowledge calls inside
 *     the read loop are rethrown wrapped in a {@link RuntimeException}
 * @throws RuntimeException if the temporary subscription cannot be created or deleted, if
 *     reading fails, or if a message lacks the configured timestamp label
 */
public void processElement(ProcessContext c) throws IOException {
  Pubsub pubsubClient =
      Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class))
          .build();

  String subscription;
  // Tracks whether this invocation created the subscription itself. Cleanup must be keyed on
  // this rather than on the topic being set, so a caller-supplied subscription is never deleted.
  boolean createdSubscription = false;
  if (getSubscription() == null) {
    String topic = getTopic().asV1Beta2Path();
    // NOTE(review): assumes the path has the form projects/<project>/topics/<topic>, so that
    // split[1] is the project and split[3] is the topic name -- confirm against asV1Beta2Path().
    String[] split = topic.split("/");
    subscription = "projects/" + split[1] + "/subscriptions/" + split[3]
        + "_dataflow_" + new Random().nextLong();
    Subscription subInfo = new Subscription()
        .setAckDeadlineSeconds(60)
        .setTopic(topic);
    try {
      pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
    } catch (Exception e) {
      throw new RuntimeException("Failed to create subscription: ", e);
    }
    createdSubscription = true;
  } else {
    subscription = getSubscription().asV1Beta2Path();
  }

  // With no configured read-time limit, use an end time that is never reached.
  Instant endTime = getMaxReadTime() == null
      ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());

  List<PubsubMessage> messages = new ArrayList<>();
  RuntimeException deleteFailure = null;
  try {
    // A max record count of 0 means "no limit on the number of records".
    while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
        && Instant.now().isBefore(endTime)) {
      PullRequest pullRequest = new PullRequest().setReturnImmediately(false);
      if (getMaxNumRecords() > 0) {
        // Ask only for as many messages as are still needed to reach the limit.
        pullRequest.setMaxMessages(getMaxNumRecords() - messages.size());
      } else {
        pullRequest.setMaxMessages(DEFAULT_PULL_SIZE);
      }

      PullResponse pullResponse =
          pubsubClient.projects().subscriptions().pull(subscription, pullRequest).execute();
      List<String> ackIds = new ArrayList<>();
      // The received-message list may be null, which is treated as an empty batch.
      if (pullResponse.getReceivedMessages() != null) {
        for (ReceivedMessage received : pullResponse.getReceivedMessages()) {
          messages.add(received.getMessage());
          ackIds.add(received.getAckId());
        }
      }

      // Skip the acknowledge call when nothing was received.
      if (!ackIds.isEmpty()) {
        AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
        pubsubClient.projects()
            .subscriptions()
            .acknowledge(subscription, ackRequest)
            .execute();
      }
    }
  } catch (IOException e) {
    throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
  } finally {
    if (createdSubscription) {
      try {
        pubsubClient.projects().subscriptions().delete(subscription).execute();
      } catch (IOException e) {
        // Do not throw from inside finally: that would mask an exception raised by the read
        // loop. Record the failure and rethrow it below only if reading itself succeeded.
        deleteFailure = new RuntimeException("Failed to delete subscription: ", e);
        LOG.error("Failed to delete subscription: ", e);
      }
    }
  }
  if (deleteFailure != null) {
    throw deleteFailure;
  }

  for (PubsubMessage message : messages) {
    Instant timestamp;
    if (getTimestampLabel() == null) {
      timestamp = Instant.now();
    } else {
      if (message.getAttributes() == null
          || !message.getAttributes().containsKey(getTimestampLabel())) {
        throw new RuntimeException(
            "Message from pubsub missing timestamp label: " + getTimestampLabel());
      }
      // The attribute value must be a decimal long; Long.parseLong throws
      // NumberFormatException otherwise.
      timestamp = new Instant(Long.parseLong(
          message.getAttributes().get(getTimestampLabel())));
    }
    c.outputWithTimestamp(
        CoderUtils.decodeFromByteArray(getCoder(), message.decodeData()),
        timestamp);
  }
}