util-images/network/netperfbenchmark/pkg/worker/worker.go (260 lines of code) (raw):
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package worker implements the worker related activities like starting TCP/UDP/HTTP client/server
// and collecting the metric output to be returned to the controller when requested.
package worker
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)
// Worker holds data required for facilitating measurement.
type Worker struct {
resourceInterface dynamic.ResourceInterface
podName string
work work
stopCh chan struct{}
}
type work struct {
resourceName string
workType string
resourceSpec resourceProperties
arguments []string
}
type resourceProperties struct {
ServerPodIP string
ServerPodName string
Duration int64
NumberOfClients int64
Protocol string
ClientPodIP string
ClientPodName string
ClientStartTimestamp int64
}
type handlersMap map[string]func(http.ResponseWriter, *http.Request)
// http server listen port.
const (
httpPort = "5301"
namespace = "netperf"
)
var (
protocolCommandMap = map[string]string{
ProtocolHTTP: "siege",
ProtocolTCP: "iperf",
ProtocolUDP: "iperf",
}
// Command arguments for each protocol.This supports templates("{}"),
// the value in template will be replaced by value in http request.
// Iperf command args:
// -c <destinationIP> : connect to destinationIP(as client).
// -f K : report format KBytes/sec.
// -l 20 : read/write buffer size 20 bytes.
// -b 1M : bandwidth 1 Mbits/sec.
// -i 1 : report stats every 1 sec.
// -t duration : run <duration> seconds.
// -u : for udp measurement.
// -e : enhanced reports, gives more metrics for udp.
// -s : run in server mode.
// -P numOfClients: handle <numOfClients> number of clients before disconnecting.
udpClientArguments = []string{"-c", "{serverPodIP}", "-u", "-f", "K", "-e", "-i", "1", "-t", "{duration}"}
udpServerArguments = []string{"-s", "-f", "K", "-u", "-e", "-i", "{duration}", "-P", "{numberOfClients}"}
tcpServerArguments = []string{"-s", "-f", "K", "-i", "{duration}", "-P", "{numberOfClients}"}
tcpClientArguments = []string{"-c", "{serverPodIP}", "-f", "K", "-i", "1", "-t", "{duration}"}
// Siege command args:
// -d1 : random delay between 0 to 1 sec.
// -t<duration>S : run test for <duration> seconds.
// -c1 : one concurrent user.
httpClientArguments = []string{"http://" + "{serverPodIP}" + ":" + httpPort + "/test", "-d1", "-t" + "{duration}" + "S", "-c1"}
protocolArgumentMap = map[string]map[string][]string{
"client": {
ProtocolHTTP: httpClientArguments,
ProtocolTCP: tcpClientArguments,
ProtocolUDP: udpClientArguments,
},
"server": {
ProtocolTCP: tcpServerArguments,
ProtocolUDP: udpServerArguments,
},
}
)
var gvk = schema.GroupVersionKind{Group: "clusterloader.io", Kind: "NetworkTestRequest", Version: "v1alpha1"}
func NewWorker() *Worker {
return &Worker{}
}
// Start starts the worker.
func (w *Worker) Start(extraArguments map[string]*string) {
w.populatePodName()
w.initialize(extraArguments)
}
func (w *Worker) initialize(extraArguments map[string]*string) {
w.addExtraArguments(extraArguments)
gvr, _ := meta.UnsafeGuessKindToResource(gvk)
k8sClient, err := getDynamicClient()
if err != nil {
klog.Fatalf("Error getting dynamic client:%s", err)
}
w.resourceInterface = k8sClient.Resource(gvr).Namespace(namespace)
informer, err := getInformer(w.getCustomResourceLabelSelector(), namespace, k8sClient, gvr)
if err != nil {
klog.Fatalf("Error getting informer:%s", err)
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
w.handleCustomResource(obj)
},
})
w.stopCh = make(chan struct{})
informer.Run(w.stopCh)
}
func (w *Worker) addExtraArguments(extraAguments map[string]*string) {
for argumentName, argumentValue := range extraAguments {
if *argumentValue == "" {
continue
}
split := strings.Split(argumentName, "_")
if len(split) < 2 {
klog.Errorf("Extra argument name should be in format <type_protocol>: %s", argumentName)
return
}
protocolArgumentMap[split[0]][split[1]] = append(protocolArgumentMap[split[0]][split[1]], *argumentValue)
}
}
func (w *Worker) populatePodName() {
var isPodNameAvailable bool
w.podName, isPodNameAvailable = os.LookupEnv("POD_NAME")
if !isPodNameAvailable {
klog.Fatal(errors.New("pod name not set as environment variable"))
}
klog.Info("Pod Name set:", w.podName)
}
func (w *Worker) getCustomResourceLabelSelector() string {
return fmt.Sprintf("%s in (clientPodName,serverPodName)", w.podName)
}
func (w *Worker) handleCustomResource(obj interface{}) {
newRuntimeObj, ok := obj.(runtime.Object)
if obj != nil && !ok {
klog.Errorf("Error casting object: %s", obj)
return
}
err := w.populateResourceSpec(newRuntimeObj)
if err != nil {
w.handleError(fmt.Errorf("populating resource spec failed: %v", err))
return
}
klog.Info("Recevied add event for resource with spec:", w.work.resourceSpec)
switch w.podName {
case w.work.resourceSpec.ClientPodName:
w.work.workType = "client"
case w.work.resourceSpec.ServerPodName:
w.work.workType = "server"
default:
w.handleError(errors.New("pod name not set as client or server"))
return
}
w.startWork()
}
func (w *Worker) populateResourceSpec(object runtime.Object) error {
resourceContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object)
if err != nil {
return fmt.Errorf("error converting event Object to unstructured.Event object: %s", object)
}
metadata := resourceContent["metadata"].(map[string]interface{})
w.work.resourceName = metadata["name"].(string)
resourceSpecMap := resourceContent["spec"].(map[string]interface{})
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resourceSpecMap, &w.work.resourceSpec); err != nil {
return fmt.Errorf("error converting custom resource properties: %s", err)
}
return nil
}
func (w *Worker) startWork() {
if w.work.resourceSpec.Protocol == ProtocolHTTP && w.work.workType == "server" {
w.StartHTTPServer()
return
}
var err error
properties := extractParameters(w.work.resourceSpec)
w.work.arguments, err = populateTemplates(protocolArgumentMap[w.work.workType][w.work.resourceSpec.Protocol], properties)
if err != nil {
w.handleError(fmt.Errorf("populating template failed: %v", err))
return
}
klog.Infof("Populated templates: %s", w.work.arguments)
startTimestamp := w.getStartTimestamp()
w.schedule(startTimestamp)
}
func (w *Worker) getStartTimestamp() int64 {
if w.work.workType == "client" {
return w.work.resourceSpec.ClientStartTimestamp
}
return time.Now().Unix()
}
func (w *Worker) updateStatus(status map[string]interface{}) error {
resource, err := w.resourceInterface.Get(context.TODO(), w.work.resourceName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("updating status failed: %v", err)
}
resourceContent := resource.UnstructuredContent()
resourceContent["status"] = status
var unstructuredRes unstructured.Unstructured
unstructuredRes.SetUnstructuredContent(resourceContent)
// TODO(@VivekThrivikraman-est): add retries.
_, err = w.resourceInterface.Update(context.TODO(), &unstructuredRes, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating status: %v", err)
}
klog.Info("Status Updated")
return nil
}
func (w *Worker) handleError(err error) {
klog.Error(err)
if err := w.updateStatus(makeErrorStatus(err)); err != nil {
klog.Error(err)
}
}
func (w *Worker) schedule(startTimestamp int64) {
startTime := time.Unix(startTimestamp, 0)
klog.Infof("About to wait until %v, current time: %v", startTime, time.Now())
time.Sleep(startTime.Sub(time.Now()))
workerDelay := time.Now().Sub(startTime)
command := protocolCommandMap[w.work.resourceSpec.Protocol]
result, err := executeCommand(command, w.work.arguments)
if err != nil {
w.handleError(fmt.Errorf("error executing command %v %v: %v", command, w.work.arguments, err))
return
}
if !w.shouldParseResponse() {
return
}
durationInt := strconv.FormatInt(w.work.resourceSpec.Duration, 10)
parsedResult, err := parseResult(w.work.resourceSpec.Protocol, result, durationInt)
if err != nil {
w.handleError(fmt.Errorf("error parsing command response: %v", err))
return
}
klog.Info("Parsed Response:", parsedResult)
if err := w.updateStatus(makeSuccessStatus(parsedResult, workerDelay.Seconds())); err != nil {
klog.Error(err)
}
}
func (w *Worker) shouldParseResponse() bool {
return (w.work.resourceSpec.Protocol != ProtocolHTTP && w.work.workType == "server") ||
(w.work.resourceSpec.Protocol == ProtocolHTTP && w.work.workType == "client")
}
func (w *Worker) startListening(port string, handlers handlersMap) {
for urlPath, handler := range handlers {
http.HandleFunc(urlPath, handler)
}
if err := http.ListenAndServe(":"+port, nil); err != nil {
klog.Fatalf("Failed to start http server on port %v: %v", port, err)
}
}
func (w *Worker) sendResponse(rw http.ResponseWriter, statusCode int, response interface{}) {
rw.Header().Set("Content-Type", "application/json")
marshalledResponse, err := json.Marshal(response)
if err != nil {
klog.Errorf("Error marshalling to json: %v", err)
rw.WriteHeader(http.StatusInternalServerError)
return
}
klog.V(3).Infof("Marshalled Response: %v", response)
rw.WriteHeader(statusCode)
if _, err := rw.Write(marshalledResponse); err != nil {
klog.Errorf("Error writing response to ResponseWriter: %v", err)
}
}
// StartHTTPServer starts an http server for http measurements.
func (w *Worker) StartHTTPServer() {
klog.Info("Starting HTTP Server")
go w.startListening(httpPort, handlersMap{"/test": w.Handler})
}
// Handler handles http requests for http measurements.
func (w *Worker) Handler(rw http.ResponseWriter, request *http.Request) {
w.sendResponse(rw, http.StatusOK, "ok")
}