in pkg/flink/handler.go [280:300]
func flinkClusterPhaseInfo(ctx context.Context, app *flinkOp.FlinkCluster, occurredAt time.Time) (pluginsCore.PhaseInfo, error) {
info, err := flinkClusterTaskInfo(ctx, app)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
jobStatus := app.Status.Components.Job
logger.Infof(ctx, "cluster_state: %s", app.Status.State)
switch app.Status.State {
case flinkOp.ClusterStateCreating, flinkOp.ClusterStateReconciling, flinkOp.ClusterStateUpdating:
return pluginsCore.PhaseInfoWaitingForResourcesInfo(occurredAt, pluginsCore.DefaultPhaseVersion, "cluster starting", info), nil
case flinkOp.ClusterStateRunning:
return flinkClusterJobPhaseInfo(ctx, jobStatus, occurredAt, info), nil
case flinkOp.ClusterStateStopped, flinkOp.ClusterStateStopping, flinkOp.ClusterStatePartiallyStopped:
return flinkClusterJobPhaseInfo(ctx, jobStatus, occurredAt, info), nil
}
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil
}