func()

in controllers/flinkcluster/flinkcluster_observer.go [336:387]


func (observer *ClusterStateObserver) observeFlinkJobStatus(ctx context.Context, observed *ObservedClusterState, flinkJobID string, flinkJob *FlinkJob) {
	var log = logr.FromContextOrDiscard(ctx)
	// Observe following
	var flinkJobStatus *flink.Job
	var flinkJobList *flink.JobsOverview
	var flinkJobsUnexpected []string

	// Get Flink job status list.
	flinkAPIBaseURL := getFlinkAPIBaseURL(observed.cluster)
	flinkJobList, err := observer.flinkClient.GetJobsOverview(flinkAPIBaseURL)
	if err != nil {
		// It is normal in many cases, not an error.
		log.Info("Failed to get Flink job status list.", "error", err)
		return
	}
	flinkJob.list = flinkJobList

	// Extract the current job status and unexpected jobs.
	for _, job := range flinkJobList.Jobs {
		if flinkJobID == job.Id {
			flinkJobStatus = new(flink.Job)
			*flinkJobStatus = job
		} else if getFlinkJobDeploymentState(job.State) == v1beta1.JobStateRunning {
			flinkJobsUnexpected = append(flinkJobsUnexpected, job.Id)
		}
	}

	flinkJob.status = flinkJobStatus
	flinkJob.unexpected = flinkJobsUnexpected

	log.Info("Observed Flink job",
		"submitted job status", flinkJob.status,
		"all job list", flinkJob.list,
		"unexpected job list", flinkJob.unexpected)
	if len(flinkJobsUnexpected) > 0 {
		log.Info("More than one unexpected Flink job were found!")
	}

	if flinkJobID == "" {
		log.Info("No flinkJobID given. Skipping get exceptions")
	} else {
		flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobID)
		if err != nil {
			// It is normal in many cases, not an error.
			log.Info("Failed to get Flink job exceptions.", "error", err)
		} else {
			log.Info("Observed Flink job exceptions", "jobs", flinkJobExceptions)
			flinkJob.exceptions = flinkJobExceptions
		}
	}

}