in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java [163:199]
/**
 * Returns the messages of the given job whose time is strictly after
 * {@code startTimestampMs}, sorted with {@code TimeStampComparator}.
 *
 * <p>The message listing is read page by page, following next-page tokens until the
 * response carries no further token, or until a response is {@code null} or has no
 * message list. Messages for which {@code fromCloudTime} yields {@code null} are skipped.
 *
 * @param jobId the job whose messages are listed
 * @param startTimestampMs exclusive lower bound on message time; passed to
 *     {@code new Instant(long)}
 * @return the matching messages, sorted on every exit path; possibly empty, never {@code null}
 * @throws IOException propagated from building or executing a list request
 */
public ArrayList<JobMessage> getJobMessages(
    String jobId, long startTimestampMs) throws IOException {
  // TODO: Allow filtering messages by importance
  Instant startTimestamp = new Instant(startTimestampMs);
  ArrayList<JobMessage> allMessages = new ArrayList<>();
  String pageToken = null;
  do {
    Messages.List listRequest = messagesClient.list(projectId, jobId);
    if (pageToken != null) {
      listRequest.setPageToken(pageToken);
    }
    ListJobMessagesResponse response = listRequest.execute();
    if (response == null || response.getJobMessages() == null) {
      // Stop paging, but fall through to the sort below: messages collected from
      // earlier pages must be returned in the same order as on the normal exit path.
      break;
    }
    for (JobMessage m : response.getJobMessages()) {
      @Nullable Instant timestamp = fromCloudTime(m.getTime());
      // Messages without a usable timestamp cannot be compared to the bound; drop them.
      if (timestamp != null && timestamp.isAfter(startTimestamp)) {
        allMessages.add(m);
      }
    }
    // A null token means there are no more pages.
    pageToken = response.getNextPageToken();
  } while (pageToken != null);
  Collections.sort(allMessages, new TimeStampComparator());
  return allMessages;
}