in pkg/flink/handler.go [242:278]
func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus, occurredAt time.Time, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo {
logger.Infof(ctx, "job_state: %s", jobStatus.State)
msg := fmt.Sprintf("%s %s", jobStatus.ID, jobStatus.State)
switch jobStatus.State {
case flinkOp.JobStateCancelled:
return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info)
case flinkOp.JobStateFailed, flinkOp.JobStateDeployFailed, flinkOp.JobStateLost:
if isSubmitterExitCodeRetryable(ctx, jobStatus.SubmitterExitCode) {
reason := fmt.Sprintf("Flink Job Failed with Error: %v (retryable)", jobStatus.FailureReasons)
return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info)
}
reason := fmt.Sprintf("Flink Job Failed with Error: %v (non-retryable)", jobStatus.FailureReasons)
return pluginsCore.PhaseInfoFailure(nonRetryableFlyteCode, reason, info)
case flinkOp.JobStateRunning:
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info)
case flinkOp.JobStateUpdating, flinkOp.JobStatePending, flinkOp.JobStateDeploying, flinkOp.JobStateRestarting:
return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, msg, info)
case flinkOp.JobStateSucceeded:
if jobStatus.SubmitterExitCode < 0 {
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info)
}
if jobStatus.SubmitterExitCode == 0 {
return pluginsCore.PhaseInfoSuccess(info)
}
if isSubmitterExitCodeRetryable(ctx, jobStatus.SubmitterExitCode) {
reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (retryable)", jobStatus.FailureReasons)
return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info)
}
reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (non-retryable)", jobStatus.FailureReasons)
return pluginsCore.PhaseInfoFailure(nonRetryableFlyteCode, reason, info)
default:
msg := fmt.Sprintf("job id: %s with unknown state: %s", jobStatus.ID, jobStatus.State)
return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info)
}
}