network/benchmarks/netperf/launch.go (405 lines of code) (raw):
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
launch.go
Launch the netperf tests
1. Launch the netperf-orch service
2. Launch the worker pods
3. Wait for the output csv data to show up in orchestrator pod logs
*/
package main
import (
"flag"
"fmt"
"os"
"strings"
"time"
api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// Strings the launcher looks for in the orchestrator pod's log output, plus
// the default value of the -tag flag.
const (
	runUUID          = "latest"
	csvDataMarker    = "GENERATING CSV OUTPUT"
	csvEndDataMarker = "END CSV DATA"
)

// Port numbers used when building the services and the container port lists.
// They are left untyped so they can be used both as int32 service ports and
// as int target ports.
const (
	netperfPort      = 12865
	orchestratorPort = 5202
	iperf3Port       = 5201
	qperf19765       = 19765
	qperf19766       = 19766
)
// Command-line settings; each one is bound to a flag in init.
var (
	iterations     int
	hostnetworking bool
	tag            string
	kubeConfig     string
	testNamespace  string
	netperfImage   string
	cleanupOnly    bool
	testFrom       int
	testTo         int
)

// State shared between the launcher's functions.
var (
	// everythingSelector is an empty ListOptions value, passed to List calls
	// that should not filter their results.
	everythingSelector = metav1.ListOptions{}

	// primaryNode and secondaryNode are assigned in main from the first two
	// nodes returned by getMinionNodes.
	primaryNode   api.Node
	secondaryNode api.Node
)
// init registers the launcher's command-line flags on the default flag set.
// The flags are parsed later, when main calls flag.Parse.
func init() {
	flag.BoolVar(&hostnetworking, "hostnetworking", false,
		"(boolean) Enable Host Networking Mode for PODs")
	flag.IntVar(&iterations, "iterations", 1,
		"Number of iterations to run")
	flag.StringVar(&tag, "tag", runUUID, "CSV file suffix")
	flag.StringVar(&netperfImage, "image", "sirot/netperf-latest", "Docker image used to run the network tests")
	flag.StringVar(&testNamespace, "namespace", "netperf", "Test namespace to run netperf pods")

	// The default kubeconfig location is derived from $HOME at start-up.
	defaultKubeConfig := fmt.Sprintf("%s/.kube/config", os.Getenv("HOME"))
	flag.StringVar(&kubeConfig, "kubeConfig", defaultKubeConfig,
		"Location of the kube configuration file ($HOME/.kube/config)")

	flag.BoolVar(&cleanupOnly, "cleanup", false,
		"(boolean) Run the cleanup resources phase only (use this flag to clean up orphaned resources from a test run)")
	flag.IntVar(&testFrom, "testFrom", 0, "start from test number testFrom")
	flag.IntVar(&testTo, "testTo", 5, "end at test number testTo")
}
// setupClient builds a Kubernetes clientset from the kubeconfig file named by
// the kubeConfig flag. It panics if the configuration cannot be loaded or the
// clientset cannot be constructed.
func setupClient() *kubernetes.Clientset {
	// must aborts the program with the given error, if there is one.
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
	must(err)

	client, err := kubernetes.NewForConfig(restConfig)
	must(err)

	return client
}
// getMinionNodes lists the nodes whose spec.unschedulable field is false.
// When the list call fails it prints the error and returns nil.
func getMinionNodes(c *kubernetes.Clientset) *api.NodeList {
	schedulableOnly := metav1.ListOptions{FieldSelector: "spec.unschedulable=false"}

	nodeList, listErr := c.CoreV1().Nodes().List(schedulableOnly)
	if listErr == nil {
		return nodeList
	}

	fmt.Println("Failed to fetch nodes", listErr)
	return nil
}
// cleanup deletes every replication controller, pod and service found in the
// test namespace, in that order.
//
// Individual delete failures are printed and skipped. A failure to list one
// kind of resource is printed and ends the cleanup at that point.
func cleanup(c *kubernetes.Clientset) {
	// Cleanup existing rcs, pods and services in our namespace
	rcs, err := c.CoreV1().ReplicationControllers(testNamespace).List(everythingSelector)
	if err != nil {
		fmt.Println("Failed to get replication controllers", err)
		return
	}
	for i := range rcs.Items {
		name := rcs.Items[i].GetName()
		fmt.Println("Deleting rc", name)
		if err := c.CoreV1().ReplicationControllers(testNamespace).Delete(name, &metav1.DeleteOptions{}); err != nil {
			fmt.Println("Failed to delete rc", name, err)
		}
	}

	pods, err := c.CoreV1().Pods(testNamespace).List(everythingSelector)
	if err != nil {
		fmt.Println("Failed to get pods", err)
		return
	}
	for i := range pods.Items {
		name := pods.Items[i].GetName()
		fmt.Println("Deleting pod", name)
		// new(int64) points at zero: the pods are deleted with a grace period of 0 seconds.
		noGracePeriod := &metav1.DeleteOptions{GracePeriodSeconds: new(int64)}
		if err := c.CoreV1().Pods(testNamespace).Delete(name, noGracePeriod); err != nil {
			fmt.Println("Failed to delete pod", name, err)
		}
	}

	svcs, err := c.CoreV1().Services(testNamespace).List(everythingSelector)
	if err != nil {
		fmt.Println("Failed to get services", err)
		return
	}
	for i := range svcs.Items {
		name := svcs.Items[i].GetName()
		fmt.Println("Deleting svc", name)
		if err := c.CoreV1().Services(testNamespace).Delete(name, &metav1.DeleteOptions{}); err != nil {
			fmt.Println("Failed to delete svc", name, err)
		}
	}
}
// createServices makes sure the test namespace exists and then creates the two
// ClusterIP services used by the test: "netperf-orch", which selects pods
// labelled app=netperf-orch, and "netperf-w2", which selects pods labelled
// app=netperf-w2.
//
// It returns false as soon as one of the services cannot be created and true
// once both exist.
func createServices(c *kubernetes.Clientset) bool {
	// Create our namespace if not present. Any error from Get is treated as
	// "not present".
	if _, err := c.CoreV1().Namespaces().Get(testNamespace, metav1.GetOptions{}); err != nil {
		ns := &api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}
		if _, err := c.CoreV1().Namespaces().Create(ns); err != nil {
			// Deliberately not fatal: the service creation below reports
			// whether the namespace is actually usable.
			fmt.Println("Failed to create namespace", testNamespace, err)
		}
	}

	// servicePort describes a port that is forwarded to the same port number
	// on the selected pods.
	servicePort := func(name string, protocol api.Protocol, port int) api.ServicePort {
		return api.ServicePort{
			Name:       name,
			Protocol:   protocol,
			Port:       int32(port),
			TargetPort: intstr.FromInt(port),
		}
	}
	// clusterIPService builds a ClusterIP service called name that selects
	// pods labelled app=name.
	clusterIPService := func(name string, ports ...api.ServicePort) *api.Service {
		return &api.Service{
			ObjectMeta: metav1.ObjectMeta{Name: name},
			Spec: api.ServiceSpec{
				Selector: map[string]string{"app": name},
				Ports:    ports,
				Type:     api.ServiceTypeClusterIP,
			},
		}
	}

	// Create the orchestrator service that points to the coordinator pod
	orchService := clusterIPService("netperf-orch",
		servicePort("netperf-orch", api.ProtocolTCP, orchestratorPort),
	)
	if _, err := c.CoreV1().Services(testNamespace).Create(orchService); err != nil {
		fmt.Println("Failed to create orchestrator service", err)
		return false
	}
	fmt.Println("Created orchestrator service")

	// Create the netperf-w2 service that points a clusterIP at the worker 2 pod
	netperfW2Service := clusterIPService("netperf-w2",
		servicePort("netperf-w2", api.ProtocolTCP, iperf3Port),
		servicePort("netperf-w2-qperf19766", api.ProtocolTCP, qperf19766),
		servicePort("netperf-w2-qperf19765", api.ProtocolTCP, qperf19765),
		servicePort("netperf-w2-sctp", api.ProtocolSCTP, iperf3Port),
		servicePort("netperf-w2-udp", api.ProtocolUDP, iperf3Port),
		servicePort("netperf-w2-netperf", api.ProtocolTCP, netperfPort),
	)
	if _, err := c.CoreV1().Services(testNamespace).Create(netperfW2Service); err != nil {
		fmt.Println("Failed to create netperf-w2 service", err)
		return false
	}
	fmt.Println("Created netperf-w2 service")
	return true
}
// createRCs creates one single-replica replication controller for the
// orchestrator ("netperf-orch") and one for each of the three workers
// ("netperf-w1" .. "netperf-w3"). Every controller selects, and labels its
// pods with, app=<controller name>.
//
// Workers 1 and 2 are pinned to primaryNode and worker 3 to secondaryNode.
// The function sleeps three seconds before creating each worker controller.
// It returns false as soon as a controller cannot be created.
func createRCs(c *kubernetes.Clientset) bool {
	// newRC wraps podSpec in a one-replica replication controller called name.
	newRC := func(name string, podSpec api.PodSpec) *api.ReplicationController {
		replicas := int32(1)
		return &api.ReplicationController{
			ObjectMeta: metav1.ObjectMeta{Name: name},
			Spec: api.ReplicationControllerSpec{
				Replicas: &replicas,
				Selector: map[string]string{"app": name},
				Template: &api.PodTemplateSpec{
					ObjectMeta: metav1.ObjectMeta{
						Labels: map[string]string{"app": name},
					},
					Spec: podSpec,
				},
			},
		}
	}

	// Create the orchestrator RC
	name := "netperf-orch"
	fmt.Println("Creating replication controller", name)
	orchestratorSpec := api.PodSpec{
		Containers: []api.Container{
			{
				Name:  name,
				Image: netperfImage,
				Ports: []api.ContainerPort{{ContainerPort: orchestratorPort}},
				Args: []string{
					"--mode=orchestrator",
					fmt.Sprintf("--testFrom=%d", testFrom),
					fmt.Sprintf("--testTo=%d", testTo),
				},
				ImagePullPolicy: api.PullAlways,
			},
		},
		// new(int64) points at zero, i.e. a termination grace period of 0 seconds.
		TerminationGracePeriodSeconds: new(int64),
	}
	if _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(newRC(name, orchestratorSpec)); err != nil {
		fmt.Println("Error creating orchestrator replication controller", err)
		return false
	}
	fmt.Println("Created orchestrator replication controller")

	for i := 1; i <= 3; i++ {
		// Bring up pods slowly
		time.Sleep(3 * time.Second)

		kubeNode := primaryNode.GetName()
		if i == 3 {
			kubeNode = secondaryNode.GetName()
		}
		workerName := fmt.Sprintf("netperf-w%d", i)
		fmt.Println("Creating replication controller", workerName)

		// Worker W1 is a client-only pod - no ports are exposed
		portSpec := []api.ContainerPort{}
		if i > 1 {
			portSpec = append(portSpec,
				api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolTCP},
				api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolSCTP},
			)
		}

		workerSpec := api.PodSpec{
			NodeName: kubeNode,
			Containers: []api.Container{
				{
					Name:  workerName,
					Image: netperfImage,
					Ports: portSpec,
					Args:  []string{"--mode=worker"},
					Env: []api.EnvVar{
						{Name: "worker", Value: workerName},
						{Name: "kubeNode", Value: kubeNode},
						{Name: "podname", Value: workerName},
					},
					ImagePullPolicy: api.PullAlways,
				},
			},
			TerminationGracePeriodSeconds: new(int64),
		}
		if _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(newRC(workerName, workerSpec)); err != nil {
			fmt.Println("Error creating replication controller", workerName, ":", err)
			return false
		}
	}
	return true
}
// getOrchestratorPodName returns the name of the first pod in pods whose name
// contains "netperf-orch-", or the empty string when no pod matches.
func getOrchestratorPodName(pods *api.PodList) string {
	const orchestratorFragment = "netperf-orch-"

	for idx := range pods.Items {
		if podName := pods.Items[idx].GetName(); strings.Contains(podName, orchestratorFragment) {
			return podName
		}
	}
	return ""
}
// getCsvResultsFromPod fetches the logs of the named pod in the test namespace
// and returns the text found between csvDataMarker and csvEndDataMarker.
//
// It returns nil when the logs cannot be read (the error is printed) or when
// the logs do not yet contain a start marker followed by an end marker.
func getCsvResultsFromPod(c *kubernetes.Clientset, podName string) *string {
	body, err := c.CoreV1().Pods(testNamespace).GetLogs(podName, &api.PodLogOptions{Timestamps: false}).DoRaw()
	if err != nil {
		fmt.Printf("Error (%s) reading logs from pod %s\n", err, podName)
		return nil
	}

	csvData, found := extractCsvData(string(body))
	if !found {
		return nil
	}
	return &csvData
}

// extractCsvData returns the text that sits between the first csvDataMarker in
// logData and the first csvEndDataMarker that follows it. The boolean is false
// when that pair of markers is not present.
func extractCsvData(logData string) (string, bool) {
	markerAt := strings.Index(logData, csvDataMarker)
	if markerAt == -1 {
		return "", false
	}
	begin := markerAt + len(csvDataMarker)

	// Only look for the end marker after the start marker, so an end marker
	// that appears earlier in the log can never produce an inverted slice.
	length := strings.Index(logData[begin:], csvEndDataMarker)
	if length == -1 {
		return "", false
	}

	payload := logData[begin : begin+length]
	// Drop the single byte that directly follows the start marker
	// (presumably the newline ending the marker line), when there is one.
	if len(payload) > 0 {
		payload = payload[1:]
	}
	return payload, true
}
// processCsvData writes the CSV text to
// results_<namespace>-<tag>/<namespace>-<tag>_<UTC timestamp>.csv, creating the
// results directory when needed.
//
// It returns true once the file has been written and closed successfully, and
// false (after printing the error) otherwise.
func processCsvData(csvData *string) bool {
	if csvData == nil {
		fmt.Println("ERROR writing output CSV datafile", "no CSV data supplied")
		return false
	}

	t := time.Now().UTC()
	outputFileDirectory := fmt.Sprintf("results_%s-%s", testNamespace, tag)
	outputFilePrefix := fmt.Sprintf("%s-%s_%s.", testNamespace, tag, t.Format("20060102150405"))
	outputFile := fmt.Sprintf("%s/%scsv", outputFileDirectory, outputFilePrefix)

	// MkdirAll is a no-op when the directory already exists. 0755 keeps the
	// directory traversable; a mode without execute bits would not be.
	if err := os.MkdirAll(outputFileDirectory, 0755); err != nil {
		fmt.Println("Error creating directory", err)
		return false
	}

	// O_TRUNC guarantees that no bytes of an older, longer file survive.
	fd, err := os.OpenFile(outputFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
	if err != nil {
		fmt.Println("ERROR writing output CSV datafile", err)
		return false
	}
	if _, err := fd.WriteString(*csvData); err != nil {
		// The write error is the one worth reporting; close only to release the handle.
		fd.Close()
		fmt.Println("Error writing string", err)
		return false
	}
	// A failing Close can mean the data never reached the disk.
	if err := fd.Close(); err != nil {
		fmt.Println("Error closing output CSV datafile", err)
		return false
	}

	fmt.Printf("Test concluded - CSV raw data written to %s/%scsv\n", outputFileDirectory, outputFilePrefix)
	return true
}
// executeTests runs the configured number of test iterations. Each iteration
// cleans the test namespace, creates the services and replication controllers,
// waits for the orchestrator pod to appear, and then polls that pod's logs
// until CSV results show up and have been written to disk.
//
// It returns false when the services or replication controllers cannot be
// created, and true once every iteration has completed. The waiting loops have
// no timeout.
func executeTests(c *kubernetes.Clientset) bool {
	for i := 0; i < iterations; i++ {
		cleanup(c)
		if !createServices(c) {
			fmt.Println("Failed to create services - aborting test")
			return false
		}
		time.Sleep(3 * time.Second)
		if !createRCs(c) {
			fmt.Println("Failed to create replication controllers - aborting test")
			return false
		}
		fmt.Println("Waiting for netperf pods to start up")

		var orchestratorPodName string
		for len(orchestratorPodName) == 0 {
			fmt.Println("Waiting for orchestrator pod creation")
			time.Sleep(60 * time.Second)
			pods, err := c.CoreV1().Pods(testNamespace).List(everythingSelector)
			if err != nil {
				fmt.Println("Failed to fetch pods - waiting for pod creation", err)
				continue
			}
			orchestratorPodName = getOrchestratorPodName(pods)
		}
		fmt.Println("Orchestrator Pod is", orchestratorPodName)

		// The pods orchestrate themselves, we just wait for the results file to show up in the orchestrator container
		for {
			// Monitor the orchestrator pod for the CSV results file
			csvdata := getCsvResultsFromPod(c, orchestratorPodName)
			if csvdata != nil && processCsvData(csvdata) {
				break
			}
			if csvdata == nil {
				fmt.Println("Scanned orchestrator pod filesystem - no results file found yet...waiting for orchestrator to write CSV file...")
			}
			// Pause before every retry, including after a failed write, so
			// the loop never spins against the API server.
			time.Sleep(60 * time.Second)
		}
		fmt.Printf("TEST RUN (Iteration %d) FINISHED - cleaning up services and pods\n", i)
	}
	return true
}
// main parses the flags, prints the run parameters and connects to the
// cluster. With -cleanup it only removes leftover resources. Otherwise it
// takes the first two schedulable nodes as primary and secondary, runs the
// tests and cleans up afterwards.
func main() {
	flag.Parse()

	fmt.Println("Network Performance Test")
	fmt.Println("Parameters :")
	settings := []struct {
		label string
		value interface{}
	}{
		{"Iterations : ", iterations},
		{"Host Networking : ", hostnetworking},
		{"Test Namespace : ", testNamespace},
		{"Docker image : ", netperfImage},
	}
	for _, setting := range settings {
		fmt.Println(setting.label, setting.value)
	}
	fmt.Println("------------------------------------------------------------")

	c := setupClient()
	if c == nil {
		fmt.Println("Failed to setup REST client to Kubernetes cluster")
		return
	}

	if cleanupOnly {
		cleanup(c)
		return
	}

	nodes := getMinionNodes(c)
	switch {
	case nodes == nil:
		// getMinionNodes has already printed the reason.
		return
	case len(nodes.Items) < 2:
		fmt.Println("Insufficient number of nodes for test (need minimum 2 nodes)")
		return
	}

	primaryNode, secondaryNode = nodes.Items[0], nodes.Items[1]
	fmt.Printf("Selected primary,secondary nodes = (%s, %s)\n", primaryNode.GetName(), secondaryNode.GetName())

	executeTests(c)
	cleanup(c)
}