func flinkClusterPhaseInfo()

in pkg/flink/handler.go [280:300]


func flinkClusterPhaseInfo(ctx context.Context, app *flinkOp.FlinkCluster, occurredAt time.Time) (pluginsCore.PhaseInfo, error) {
	info, err := flinkClusterTaskInfo(ctx, app)
	if err != nil {
		return pluginsCore.PhaseInfoUndefined, err
	}

	jobStatus := app.Status.Components.Job

	logger.Infof(ctx, "cluster_state: %s", app.Status.State)

	switch app.Status.State {
	case flinkOp.ClusterStateCreating, flinkOp.ClusterStateReconciling, flinkOp.ClusterStateUpdating:
		return pluginsCore.PhaseInfoWaitingForResourcesInfo(occurredAt, pluginsCore.DefaultPhaseVersion, "cluster starting", info), nil
	case flinkOp.ClusterStateRunning:
		return flinkClusterJobPhaseInfo(ctx, jobStatus, occurredAt, info), nil
	case flinkOp.ClusterStateStopped, flinkOp.ClusterStateStopping, flinkOp.ClusterStatePartiallyStopped:
		return flinkClusterJobPhaseInfo(ctx, jobStatus, occurredAt, info), nil
	}

	return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil
}