func()

in controllers/flinkcluster/flinkcluster_updater.go [573:599]


func (updater *ClusterStatusUpdater) getFlinkJobID() *string {
	// Observed from active job manager.
	var observedFlinkJob = updater.observed.flinkJob.status
	if observedFlinkJob != nil && len(observedFlinkJob.Id) > 0 {
		return &observedFlinkJob.Id
	}

	var observedJobSubmitter = updater.observed.flinkJobSubmitter
	if observedJobSubmitter.pod != nil {
		if jobId, ok := observedJobSubmitter.pod.Labels[JobIdLabel]; ok {
			return &jobId
		}
	}

	// Observed from job submitter (when Flink API is not ready).
	if observedJobSubmitter.log != nil && observedJobSubmitter.log.jobID != "" {
		return &observedJobSubmitter.log.jobID
	}

	// Recorded.
	var recordedJobStatus = updater.observed.cluster.Status.Components.Job
	if recordedJobStatus != nil && len(recordedJobStatus.ID) > 0 {
		return &recordedJobStatus.ID
	}

	return nil
}