in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java [90:123]
/**
 * Prints each displayable job message to {@code out} on its own line, formatted as
 * {@code <timestamp>: <importance label><message text>}, and flushes {@code out} once all
 * messages have been handled.
 *
 * <p>A message is skipped when its text is null or empty, or when its importance is null or
 * is not one of {@code JOB_MESSAGE_ERROR}, {@code JOB_MESSAGE_WARNING},
 * {@code JOB_MESSAGE_BASIC} or {@code JOB_MESSAGE_DETAILED}. When the message time cannot be
 * converted, {@code UNKNOWN TIMESTAMP: } is printed in place of the timestamp.
 *
 * @param messages the job messages to print; they are written in iteration order
 */
public void process(List<JobMessage> messages) {
  for (JobMessage message : messages) {
    String text = message.getMessageText();
    if (text == null || text.isEmpty()) {
      continue;
    }

    String importance = message.getMessageImportance();
    if (importance == null) {
      continue;
    }

    // Every branch below either assigns the label or skips the message, so the label is
    // always set by the time it is printed and needs no null check.
    final String importanceLabel;
    if (importance.equals("JOB_MESSAGE_ERROR")) {
      importanceLabel = "Error: ";
    } else if (importance.equals("JOB_MESSAGE_WARNING")) {
      importanceLabel = "Warning: ";
    } else if (importance.equals("JOB_MESSAGE_BASIC")) {
      importanceLabel = "Basic: ";
    } else if (importance.equals("JOB_MESSAGE_DETAILED")) {
      importanceLabel = "Detail: ";
    } else {
      // TODO: Remove filtering here once getJobMessages supports minimum
      // importance.
      continue;
    }

    @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
    if (time == null) {
      out.print("UNKNOWN TIMESTAMP: ");
    } else {
      out.print(time + ": ");
    }
    out.print(importanceLabel);
    out.println(text);
  }
  // Flush once per batch rather than once per message.
  out.flush();
}