in controllers/flinkcluster/flinkcluster_observer.go [336:387]
// observeFlinkJobStatus queries the Flink REST API of the observed cluster and
// records what it finds on flinkJob:
//
//   - flinkJob.list: the jobs overview returned by the API.
//   - flinkJob.status: a copy of the overview entry whose ID equals
//     flinkJobID, or nil when no entry matches.
//   - flinkJob.unexpected: IDs of all other jobs whose deployment state is
//     v1beta1.JobStateRunning.
//   - flinkJob.exceptions: the exceptions reported for flinkJobID; only
//     fetched when flinkJobID is non-empty and only set when the call succeeds.
//
// API failures are logged at info level and are not returned to the caller.
// If the jobs overview cannot be fetched, the function returns early and
// flinkJob is left unmodified.
func (observer *ClusterStateObserver) observeFlinkJobStatus(ctx context.Context, observed *ObservedClusterState, flinkJobID string, flinkJob *FlinkJob) {
	log := logr.FromContextOrDiscard(ctx)

	// Get Flink job status list.
	flinkAPIBaseURL := getFlinkAPIBaseURL(observed.cluster)
	flinkJobList, err := observer.flinkClient.GetJobsOverview(flinkAPIBaseURL)
	if err != nil {
		// It is normal in many cases, not an error.
		log.Info("Failed to get Flink job status list.", "error", err)
		return
	}
	flinkJob.list = flinkJobList

	// Extract the current job status and unexpected jobs.
	var flinkJobStatus *flink.Job
	var flinkJobsUnexpected []string
	for _, job := range flinkJobList.Jobs {
		if flinkJobID == job.Id {
			// Store a copy so the stored pointer never aliases the loop variable.
			matched := job
			flinkJobStatus = &matched
		} else if getFlinkJobDeploymentState(job.State) == v1beta1.JobStateRunning {
			flinkJobsUnexpected = append(flinkJobsUnexpected, job.Id)
		}
	}
	flinkJob.status = flinkJobStatus
	flinkJob.unexpected = flinkJobsUnexpected

	log.Info("Observed Flink job",
		"submitted job status", flinkJob.status,
		"all job list", flinkJob.list,
		"unexpected job list", flinkJob.unexpected)
	if len(flinkJobsUnexpected) > 0 {
		// The guard fires for a single unexpected job as well, so the message
		// must not claim "more than one".
		log.Info("One or more unexpected Flink jobs were found!", "count", len(flinkJobsUnexpected))
	}

	// Without a job ID there is nothing to query exceptions for.
	if flinkJobID == "" {
		log.Info("No flinkJobID given. Skipping get exceptions")
		return
	}

	flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobID)
	if err != nil {
		// It is normal in many cases, not an error.
		log.Info("Failed to get Flink job exceptions.", "error", err)
		return
	}
	log.Info("Observed Flink job exceptions", "jobs", flinkJobExceptions)
	flinkJob.exceptions = flinkJobExceptions
}