in controllers/flinkcluster/flinkcluster_updater.go [573:599]
// getFlinkJobID returns a pointer to the Flink job ID taken from the first
// source that holds a non-empty value, or nil when no source has one.
//
// Sources are consulted in this order:
//  1. the observed Flink job status,
//  2. the JobIdLabel label on the observed job submitter pod,
//  3. the jobID held by the observed job submitter log,
//  4. the job ID recorded in the cluster status.
func (updater *ClusterStatusUpdater) getFlinkJobID() *string {
	// Observed from active job manager.
	if status := updater.observed.flinkJob.status; status != nil && status.Id != "" {
		return &status.Id
	}

	// Observed from job submitter (when Flink API is not ready): first the
	// pod label, then the submitter log.
	submitter := updater.observed.flinkJobSubmitter
	if submitter.pod != nil {
		// Indexing a missing key (or a nil label map) yields "", so a single
		// non-empty check covers both "label absent" and "label present but
		// empty". An empty label must not short-circuit the fallbacks below.
		if jobID := submitter.pod.Labels[JobIdLabel]; jobID != "" {
			return &jobID
		}
	}
	if submitter.log != nil && submitter.log.jobID != "" {
		return &submitter.log.jobID
	}

	// Recorded.
	if recorded := updater.observed.cluster.Status.Components.Job; recorded != nil && recorded.ID != "" {
		return &recorded.ID
	}
	return nil
}