pkg/telemetry/telemetry.go (137 lines of code) (raw):

package telemetry import ( "context" "fmt" "time" "github.com/go-resty/resty/v2" corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" pipelineAPi "github.com/epam/edp-cd-pipeline-operator/v2/api/v1" codebaseApi "github.com/epam/edp-codebase-operator/v2/api/v1" "github.com/epam/edp-codebase-operator/v2/pkg/platform" ) type Collector struct { namespace string telemetryUrl string k8sClient client.Client } func NewCollector(namespace, telemetryUrl string, k8sClient client.Client) *Collector { return &Collector{namespace: namespace, telemetryUrl: telemetryUrl, k8sClient: k8sClient} } func (c *Collector) Start(ctx context.Context, delay, sendEvery time.Duration) { log := ctrl.Log.WithName("telemetry-collector") go func() { timeToSend := time.Now().Add(delay) ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): log.Info("Stop telemetry-metrics collector") return case now := <-ticker.C: if timeToSend.Before(now) { if err := c.sendTelemetry(ctx); err != nil { log.Error(err, "Failed to send telemetry-metrics") return } log.Info("Telemetry-metrics were sent") return } } } }() ticker := time.NewTicker(sendEvery) defer ticker.Stop() for { select { case <-ctx.Done(): log.Info("Stop telemetry-metrics collector") return case <-ticker.C: if err := c.sendTelemetry(ctx); err != nil { log.Error(err, "Failed to send telemetry-metrics") break } log.Info("Telemetry-metrics were sent") } } } func (c *Collector) sendTelemetry(ctx context.Context) error { edpConfig := &corev1.ConfigMap{} if err := c.k8sClient.Get(ctx, client.ObjectKey{ Namespace: c.namespace, Name: platform.EdpConfigMap, }, edpConfig); err != nil { return fmt.Errorf("failed to get edp config: %w", err) } telemetry := PlatformMetrics{} telemetry.RegistryType = edpConfig.Data["container_registry_type"] telemetry.Version = edpConfig.Data["edp_version"] codebases := &codebaseApi.CodebaseList{} if err := c.k8sClient.List(ctx, codebases, client.InNamespace(c.namespace)); err != nil { return fmt.Errorf("failed to get codebases: %w", err) } for i := 0; i < len(codebases.Items); i++ { telemetry.CodebaseMetrics = append(telemetry.CodebaseMetrics, CodebaseMetrics{ Lang: codebases.Items[i].Spec.Lang, Framework: codebases.Items[i].Spec.Framework, BuildTool: codebases.Items[i].Spec.BuildTool, Strategy: string(codebases.Items[i].Spec.Strategy), Type: codebases.Items[i].Spec.Type, Versioning: string(codebases.Items[i].Spec.Versioning.Type), }) } gitProviders := &codebaseApi.GitServerList{} if err := c.k8sClient.List(ctx, gitProviders, client.InNamespace(c.namespace)); err != nil { return fmt.Errorf("failed to get git providers: %w", err) } if len(gitProviders.Items) > 0 { telemetry.GitProviders = append(telemetry.GitProviders, gitProviders.Items[0].Spec.GitProvider) } stages := &pipelineAPi.StageList{} if err := c.k8sClient.List(ctx, stages, client.InNamespace(c.namespace)); err != nil { return fmt.Errorf("failed to get stages: %w", err) } deploymentType := map[string]string{} stagesCount := map[string]int{} for i := 0; i < len(stages.Items); i++ { stagesCount[stages.Items[i].Spec.CdPipeline]++ if stages.Items[i].Spec.TriggerType == "Auto" { deploymentType[stages.Items[i].Spec.CdPipeline] = "Auto" } } cdPipelines := &pipelineAPi.CDPipelineList{} if err := c.k8sClient.List(ctx, cdPipelines, client.InNamespace(c.namespace)); err != nil { return fmt.Errorf("failed to get cd pipelines: %w", err) } for i := 0; i < len(cdPipelines.Items); i++ { pipeDeployment := "Manual" if val, ok := deploymentType[cdPipelines.Items[i].Name]; ok { pipeDeployment = val } telemetry.CdPipelineMetrics = append(telemetry.CdPipelineMetrics, CdPipelineMetrics{ DeploymentType: pipeDeployment, NumberOfStages: stagesCount[cdPipelines.Items[i].Name], }) } jiraServers := &codebaseApi.JiraServerList{} if err := c.k8sClient.List(ctx, jiraServers, client.InNamespace(c.namespace)); err != nil { return fmt.Errorf("failed to get jira servers: %w", err) } telemetry.JiraEnabled = len(jiraServers.Items) > 0 resp, err := resty.New(). SetHostURL(c.telemetryUrl). R(). SetContext(ctx). SetBody(map[string]PlatformMetrics{"platformMetrics": telemetry}). Post("/v1/submit") if err != nil { return fmt.Errorf("failed to send telemetry: %w", err) } if resp.IsError() { return fmt.Errorf("failed to send telemetry: http status code: %s, body: %s", resp.Status(), resp.String()) } return nil }