in controllers/flinkcluster/flinkcluster_observer.go [251:329]
// observeJob observes the Kubernetes resources backing the Flink job of a job
// cluster and records them in observed.
//
// The Kubernetes batch Job to inspect is the job manager Job in application
// mode and the job submitter Job otherwise. The Job and its pod are stored in
// observed.flinkJobSubmitter, together with the submitter log when it is
// relevant. Once the job manager is ready (always the case in application
// mode), the Flink job status is observed into observed.flinkJob.
//
// Lookup failures are logged rather than returned, and missing resources are
// recorded as nil. The function currently always returns nil.
func (observer *ClusterStateObserver) observeJob(
	ctx context.Context,
	observed *ObservedClusterState) error {
	// Either the cluster has been deleted or it is a session cluster.
	if observed.cluster == nil || observed.cluster.Spec.Job == nil {
		return nil
	}
	var log = logr.FromContextOrDiscard(ctx)
	var recordedJob = observed.cluster.Status.Components.Job
	var applicationMode = IsApplicationModeCluster(observed.cluster)
	var jobName string
	if applicationMode {
		jobName = getJobManagerJobName(observed.cluster.Name)
	} else {
		jobName = getSubmitterJobName(observed.cluster.Name)
	}

	// Job resource. NotFound is not logged; in every failure case the job is
	// recorded as nil.
	job := new(batchv1.Job)
	if err := observer.observeObject(ctx, jobName, job); err != nil {
		if client.IgnoreNotFound(err) != nil {
			log.Error(err, "job submitter batchv1.Job")
		}
		job = nil
	}

	// Pod of the job. As above, a failed lookup leaves jobPod nil, so every
	// later use of jobPod must be nil-guarded.
	jobPod := new(corev1.Pod)
	if err := observer.observeJobSubmitterPod(ctx, jobName, jobPod); err != nil {
		if client.IgnoreNotFound(err) != nil {
			log.Error(err, "job submitter corev1.Pod")
		}
		jobPod = nil
	}

	// Extract the submission result only while deployment is in progress or
	// after the submitter pod failed; reading the pod log on every
	// reconciliation is unnecessary.
	var submitterLog *SubmitterLog
	var jobDeployInProgress = recordedJob != nil && recordedJob.State == v1beta1.JobStateDeploying
	if jobPod != nil && (jobDeployInProgress || jobPod.Status.Phase == corev1.PodFailed) {
		var err error
		submitterLog, err = getFlinkJobSubmitLog(observer.k8sClientset, jobPod)
		if err != nil {
			// Pulling the log stream failed. This is not returned as an error:
			// the log is left unset and is fetched again in the next
			// reconciliation iteration.
			log.Info("Failed to get log stream from the job submitter pod. Will try again in the next iteration.", "error", err)
			submitterLog = nil
		}
	}
	observed.flinkJobSubmitter = FlinkJobSubmitter{
		job: job,
		pod: jobPod,
		log: submitterLog,
	}

	// Wait until the job manager is ready.
	jmReady := applicationMode ||
		(observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady)
	if !jmReady {
		return nil
	}

	// The pod may be absent (jobPod == nil), so read its label only when the
	// pod exists. Indexing a nil Labels map is safe and reports ok == false.
	var podJobID string
	var podHasJobID bool
	if jobPod != nil {
		podJobID, podHasJobID = jobPod.Labels[JobIdLabel]
	}

	// Resolve the Flink job ID, in order of preference: the pod label, the
	// ID parsed from the submitter log, then the ID recorded in the job status
	// by a previous iteration.
	var flinkJobID string
	switch {
	case podHasJobID:
		flinkJobID = podJobID
	case submitterLog != nil && submitterLog.jobID != "":
		flinkJobID = submitterLog.jobID
	case recordedJob != nil:
		flinkJobID = recordedJob.ID
	}
	observer.observeFlinkJobStatus(ctx, observed, flinkJobID, &observed.flinkJob)
	return nil
}