internal/util/kube.go (69 lines of code) (raw):
package util
import (
"bytes"
"context"
"fmt"
"io"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
func GetPodLogs(clientset *kubernetes.Clientset, pod *corev1.Pod) (string, error) {
if pod == nil {
return "", fmt.Errorf("no job pod found, even though submission completed")
}
pods := clientset.CoreV1().Pods(pod.Namespace)
req := pods.GetLogs(pod.Name, &corev1.PodLogOptions{Container: "main"})
podLogs, err := req.Stream(context.TODO())
if err != nil {
return "", fmt.Errorf("failed to get logs for pod %s: %v", pod.Name, err)
}
defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return "", fmt.Errorf("error in copy information from pod logs to buf")
}
str := buf.String()
return str, nil
}
func GetNextRevisionNumber(revisions []*appsv1.ControllerRevision) int64 {
count := len(revisions)
if count <= 0 {
return 1
}
return revisions[count-1].Revision + 1
}
// Compose revision in FlinkClusterStatus with name and number of ControllerRevision
func GetRevisionWithNameNumber(cr *appsv1.ControllerRevision) string {
return fmt.Sprintf("%v-%v", cr.Name, cr.Revision)
}
func GetNonLiveHistory(revisions []*appsv1.ControllerRevision, historyLimit int) []*appsv1.ControllerRevision {
history := append([]*appsv1.ControllerRevision{}, revisions...)
nonLiveHistory := make([]*appsv1.ControllerRevision, 0)
historyLen := len(history)
if historyLen <= historyLimit {
return nonLiveHistory
}
nonLiveHistory = append(nonLiveHistory, history[:(historyLen-historyLimit)]...)
return nonLiveHistory
}
func UpperBoundedResourceList(resources corev1.ResourceRequirements) *corev1.ResourceList {
rl := corev1.ResourceList{}
if !resources.Limits.Cpu().IsZero() {
rl[corev1.ResourceCPU] = *resources.Limits.Cpu()
} else {
rl[corev1.ResourceCPU] = *resources.Requests.Cpu()
}
if !resources.Limits.Memory().IsZero() {
rl[corev1.ResourceMemory] = *resources.Limits.Memory()
} else {
rl[corev1.ResourceMemory] = *resources.Requests.Memory()
}
if !resources.Requests.Storage().IsZero() {
rl[corev1.ResourceStorage] = *resources.Requests.Storage()
}
if !resources.Requests.StorageEphemeral().IsZero() {
rl[corev1.ResourceEphemeralStorage] = *resources.Requests.StorageEphemeral()
}
return &rl
}