func flinkClusterJobPhaseInfo()

in pkg/flink/handler.go [242:278]


func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, occurredAt time.Time, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo {
	logger.Infof(ctx, "job_state: %s", jobStatus.State)

	msg := fmt.Sprintf("%s %s", jobStatus.ID, jobStatus.State)

	switch jobStatus.State {
	case flinkOp.JobStateCancelled:
		return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info)
	case flinkOp.JobStateFailed, flinkOp.JobStateDeployFailed, flinkOp.JobStateLost:
		if isSubmitterExitCodeRetryable(ctx, jobStatus.SubmitterExitCode) {
			reason := fmt.Sprintf("Flink Job Failed with Error: %v (retryable)", jobStatus.FailureReasons)
			return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info)
		}
		reason := fmt.Sprintf("Flink Job Failed with Error: %v (non-retryable)", jobStatus.FailureReasons)
		return pluginsCore.PhaseInfoFailure(nonRetryableFlyteCode, reason, info)
	case flinkOp.JobStateRunning:
		return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info)
	case flinkOp.JobStateUpdating, flinkOp.JobStatePending, flinkOp.JobStateDeploying, flinkOp.JobStateRestarting:
		return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, msg, info)
	case flinkOp.JobStateSucceeded:
		if jobStatus.SubmitterExitCode < 0 {
			return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info)
		}
		if jobStatus.SubmitterExitCode == 0 {
			return pluginsCore.PhaseInfoSuccess(info)
		}
		if isSubmitterExitCodeRetryable(ctx, jobStatus.SubmitterExitCode) {
			reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (retryable)", jobStatus.FailureReasons)
			return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info)
		}
		reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (non-retryable)", jobStatus.FailureReasons)
		return pluginsCore.PhaseInfoFailure(nonRetryableFlyteCode, reason, info)
	default:
		msg := fmt.Sprintf("job id: %s with unknown state: %s", jobStatus.ID, jobStatus.State)
		return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info)
	}
}