/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
nptest.go
Dual-mode program: runs as either the orchestrator or a worker node, depending on command-line flags
The RPC API is contained wholly within this file.
*/
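/*
Typical invocations (flag names are defined in init below; the binary name
"nptest" is illustrative):
  nptest -mode=orchestrator
  nptest -mode=worker -host=<orchestrator IP or service name> -port=5202
*/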
package main
// Imports only Go standard library packages
import (
"bytes"
"flag"
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
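// point holds one measured sample for a testcase: the MSS or message size
// used, the bandwidth string reported by the tool, and the testcase index.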
type point struct {
mss int
bandwidth string
index int
}
var (
mode string
port string
host string
worker string
kubenode string
podname string
testFrom, testTo int
workerStateMap map[string]*workerState
iperfTCPOutputRegexp *regexp.Regexp
iperfSCTPOutputRegexp *regexp.Regexp
iperfUDPOutputRegexp *regexp.Regexp
netperfOutputRegexp *regexp.Regexp
iperfCPUOutputRegexp *regexp.Regexp
dataPoints map[string][]point
dataPointKeys []string
datapointsFlushed bool
globalLock sync.Mutex
)
const (
workerMode = "worker"
orchestratorMode = "orchestrator"
iperf3Path = "/usr/local/bin/iperf3"
qperfPath = "/usr/local/bin/qperf"
netperfPath = "/usr/local/bin/netperf"
netperfServerPath = "/usr/local/bin/netserver"
outputCaptureFile = "/tmp/output.txt"
mssMin = 96
mssMax = 1460
mssStepSize = 64
msgSizeMax = 1 << 16
msgSizeMin = 1
parallelStreams = "8"
rpcServicePort = "5202"
localhostIPv4Address = "127.0.0.1"
)
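// Work item types; testcases and WorkItems carry one of these in their Type field.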
const (
iperfTCPTest = iota
qperfTCPTest
iperfUDPTest
iperfSctpTest
netperfTest
)
// NetPerfRPC is the RPC service that exposes RegisterClient and ReceiveOutput to clients
type NetPerfRPC int
// ClientRegistrationData stores data about a single client
type ClientRegistrationData struct {
Host string
KubeNode string
Worker string
IP string
}
// IperfClientWorkItem represents a single task for an Iperf client
type IperfClientWorkItem struct {
Host string
Port string
MSS int
MsgSize int
Type int
}
// IperfServerWorkItem represents a single task for an Iperf server
type IperfServerWorkItem struct {
ListenPort string
Timeout int
}
// WorkItem represents a single task for a worker
type WorkItem struct {
IsClientItem bool
IsServerItem bool
IsIdle bool
ClientItem IperfClientWorkItem
ServerItem IperfServerWorkItem
}
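// workerState tracks the orchestrator's view of one worker: whether its
// server work item has been sent, whether it is idle, and its pod IP.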
type workerState struct {
sentServerItem bool
idle bool
IP string
worker string
}
// WorkerOutput stores the results from a single worker
type WorkerOutput struct {
Output string
Code int
Worker string
Type int
}
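// testcase describes one benchmark run between two worker pods; for iperf and
// qperf tests the MSS / MsgSize fields are advanced after each run until the
// sweep completes and Finished is set.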
type testcase struct {
SourceNode string
DestinationNode string
Label string
ClusterIP bool
Finished bool
MSS int
MsgSize int
Type int
}
var testcases []*testcase
var currentJobIndex int
func init() {
flag.StringVar(&mode, "mode", "worker", "Mode for the daemon (worker | orchestrator)")
flag.StringVar(&port, "port", rpcServicePort, "Port to listen on (defaults to 5202)")
flag.StringVar(&host, "host", "", "Orchestrator address to connect to (defaults to the NETPERF_ORCH_SERVICE_HOST env var)")
flag.IntVar(&testFrom, "testFrom", 0, "start from test number testFrom")
flag.IntVar(&testTo, "testTo", 5, "end at test number testTo")
workerStateMap = make(map[string]*workerState)
testcases = []*testcase{
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "1 qperf TCP. Same VM using Pod IP", Type: qperfTCPTest, ClusterIP: false, MsgSize: msgSizeMin},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "2 qperf TCP. Same VM using Virtual IP", Type: qperfTCPTest, ClusterIP: true, MsgSize: msgSizeMin},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "3 qperf TCP. Remote VM using Pod IP", Type: qperfTCPTest, ClusterIP: false, MsgSize: msgSizeMin},
{SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "4 qperf TCP. Remote VM using Virtual IP", Type: qperfTCPTest, ClusterIP: true, MsgSize: msgSizeMin},
{SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "5 qperf TCP. Hairpin Pod to own Virtual IP", Type: qperfTCPTest, ClusterIP: true, MsgSize: msgSizeMin},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "1 iperf TCP. Same VM using Pod IP", Type: iperfTCPTest, ClusterIP: false, MSS: mssMin},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "2 iperf TCP. Same VM using Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "3 iperf TCP. Remote VM using Pod IP", Type: iperfTCPTest, ClusterIP: false, MSS: mssMin},
{SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "4 iperf TCP. Remote VM using Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin},
{SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "5 iperf TCP. Hairpin Pod to own Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "6 iperf SCTP. Same VM using Pod IP", Type: iperfSctpTest, ClusterIP: false, MSS: mssMin},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "7 iperf SCTP. Same VM using Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "8 iperf SCTP. Remote VM using Pod IP", Type: iperfSctpTest, ClusterIP: false, MSS: mssMin},
{SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "9 iperf SCTP. Remote VM using Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin},
{SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "10 iperf SCTP. Hairpin Pod to own Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "11 iperf UDP. Same VM using Pod IP", Type: iperfUDPTest, ClusterIP: false, MSS: mssMax},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "12 iperf UDP. Same VM using Virtual IP", Type: iperfUDPTest, ClusterIP: true, MSS: mssMax},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "13 iperf UDP. Remote VM using Pod IP", Type: iperfUDPTest, ClusterIP: false, MSS: mssMax},
{SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "14 iperf UDP. Remote VM using Virtual IP", Type: iperfUDPTest, ClusterIP: true, MSS: mssMax},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "15 netperf. Same VM using Pod IP", Type: netperfTest, ClusterIP: false},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "16 netperf. Same VM using Virtual IP", Type: netperfTest, ClusterIP: true},
{SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "17 netperf. Remote VM using Pod IP", Type: netperfTest, ClusterIP: false},
{SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "18 netperf. Remote VM using Virtual IP", Type: netperfTest, ClusterIP: true},
}
currentJobIndex = 0
// Regexes to parse the Mbits/sec out of iperf TCP, SCTP, UDP and netperf output
iperfTCPOutputRegexp = regexp.MustCompile(`SUM.*\s+(\d+)\sMbits/sec\s+receiver`)
iperfSCTPOutputRegexp = regexp.MustCompile(`SUM.*\s+(\d+)\sMbits/sec\s+receiver`)
iperfUDPOutputRegexp = regexp.MustCompile(`\s+(\S+)\sMbits/sec\s+\S+\s+ms\s+`)
netperfOutputRegexp = regexp.MustCompile(`\s+\d+\s+\d+\s+\d+\s+\S+\s+(\S+)\s+`)
iperfCPUOutputRegexp = regexp.MustCompile(`local/sender\s(\d+\.\d+)%\s\((\d+\.\d+)%\w/(\d+\.\d+)%\w\),\sremote/receiver\s(\d+\.\d+)%\s\((\d+\.\d+)%\w/(\d+\.\d+)%\w\)`)
dataPoints = make(map[string][]point)
}
func initializeOutputFiles() {
fd, err := os.OpenFile(outputCaptureFile, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
fmt.Println("Failed to open output capture file", err)
os.Exit(2)
}
fd.Close()
}
func main() {
initializeOutputFiles()
flag.Parse()
if !validateParams() {
fmt.Println("Failed to parse cmdline args - fatal error - bailing out")
os.Exit(1)
}
grabEnv()
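// Narrow the testcase list to the requested window; bounds are not
// validated, so out-of-range values will panic.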
testcases = testcases[testFrom:testTo]
fmt.Println("Running as", mode, "...")
if mode == orchestratorMode {
orchestrate()
} else {
startWork()
}
fmt.Println("Terminating npd")
}
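// grabEnv reads the worker identity from environment variables that are
// expected to be set in the pod spec.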
func grabEnv() {
worker = os.Getenv("worker")
kubenode = os.Getenv("kubenode")
podname = os.Getenv("HOSTNAME")
}
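// validateParams checks the mode and port flags and falls back to the
// orchestrator service environment variable when no host was given.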
func validateParams() (rv bool) {
rv = true
if mode != workerMode && mode != orchestratorMode {
fmt.Println("Invalid mode", mode)
return false
}
if len(port) == 0 {
fmt.Println("Invalid port", port)
return false
}
if len(host) == 0 {
// Fall back to the orchestrator service address provided by Kubernetes.
host = os.Getenv("NETPERF_ORCH_SERVICE_HOST")
}
return
}
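// allWorkersIdle reports whether every registered worker is currently idle.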
func allWorkersIdle() bool {
for _, v := range workerStateMap {
if !v.idle {
return false
}
}
return true
}
func getWorkerPodIP(worker string) string {
return workerStateMap[worker].IP
}
func allocateWorkToClient(workerState *workerState, workItem *WorkItem) {
if !allWorkersIdle() {
workItem.IsIdle = true
return
}
// System is all idle - pick up next work item to allocate to client
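// Note: every branch below returns, so only the first unfinished testcase
// is considered on each call; testcases therefore execute strictly in order.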
for n, v := range testcases {
if v.Finished {
continue
}
if v.SourceNode != workerState.worker {
workItem.IsIdle = true
return
}
if _, ok := workerStateMap[v.DestinationNode]; !ok {
workItem.IsIdle = true
return
}
fmt.Printf("Requesting jobrun '%s' from %s to %s for MSS %d for MsgSize %d\n", v.Label, v.SourceNode, v.DestinationNode, v.MSS, v.MsgSize)
workItem.ClientItem.Type = v.Type
workItem.IsClientItem = true
workerState.idle = false
currentJobIndex = n
if !v.ClusterIP {
workItem.ClientItem.Host = getWorkerPodIP(v.DestinationNode)
} else {
workItem.ClientItem.Host = os.Getenv("NETPERF_W2_SERVICE_HOST")
}
switch {
case v.Type == iperfTCPTest || v.Type == iperfUDPTest || v.Type == iperfSctpTest:
workItem.ClientItem.Port = "5201"
workItem.ClientItem.MSS = v.MSS
v.MSS = v.MSS + mssStepSize
if v.MSS > mssMax {
v.Finished = true
}
return
case v.Type == qperfTCPTest:
workItem.ClientItem.MsgSize = v.MsgSize
v.MsgSize <<= 1
if v.MsgSize > msgSizeMax {
v.Finished = true
}
return
case v.Type == netperfTest:
workItem.ClientItem.Port = "12865"
return
}
}
for _, v := range testcases {
if !v.Finished {
return
}
}
if !datapointsFlushed {
fmt.Println("ALL TESTCASES AND MSS RANGES COMPLETE - GENERATING CSV OUTPUT")
flushDataPointsToCsv()
datapointsFlushed = true
}
workItem.IsIdle = true
}
// RegisterClient registers a single client and assigns a work item to it
func (t *NetPerfRPC) RegisterClient(data *ClientRegistrationData, workItem *WorkItem) error {
globalLock.Lock()
defer globalLock.Unlock()
state, ok := workerStateMap[data.Worker]
if !ok {
// For new clients, ask them to start their benchmark servers immediately
state = &workerState{sentServerItem: true, idle: true, IP: data.IP, worker: data.Worker}
workerStateMap[data.Worker] = state
workItem.IsServerItem = true
workItem.ServerItem.ListenPort = "5201"
workItem.ServerItem.Timeout = 3600
return nil
}
// Worker defaults to idle unless the allocateWork routine below assigns an item
state.idle = true
// Give the worker a new work item or let it idle loop another 5 seconds
allocateWorkToClient(state, workItem)
return nil
}
func writeOutputFile(filename, data string) {
fd, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
fmt.Println("Failed to append to existing file", filename, err)
return
}
defer fd.Close()
if _, err = fd.WriteString(data); err != nil {
fmt.Println("Failed to append to existing file", filename, err)
}
}
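// registerDataPoint appends a bandwidth sample under the testcase label,
// remembering label insertion order for the CSV dump.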
func registerDataPoint(label string, mss int, value string, index int) {
if sl, ok := dataPoints[label]; !ok {
dataPoints[label] = []point{{mss: mss, bandwidth: value, index: index}}
dataPointKeys = append(dataPointKeys, label)
} else {
dataPoints[label] = append(sl, point{mss: mss, bandwidth: value, index: index})
}
}
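// flushDataPointsToCsv prints the collected datapoints as CSV: an MSS header
// row taken from any multi-point series, then one row per testcase with the
// maximum value followed by each individual sample.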
func flushDataPointsToCsv() {
var buffer string
// Write the MSS points for the X-axis before dumping all the testcase datapoints
for _, points := range dataPoints {
if len(points) == 1 {
continue
}
buffer = fmt.Sprintf("%-45s, Maximum,", "MSS")
for _, p := range points {
buffer = buffer + fmt.Sprintf(" %d,", p.mss)
}
break
}
fmt.Println(buffer)
for _, label := range dataPointKeys {
buffer = fmt.Sprintf("%-45s,", label)
points := dataPoints[label]
var max float64
for _, p := range points {
fv, _ := strconv.ParseFloat(p.bandwidth, 64)
if fv > max {
max = fv
}
}
buffer = buffer + fmt.Sprintf("%f,", max)
for _, p := range points {
buffer = buffer + fmt.Sprintf("%s,", p.bandwidth)
}
fmt.Println(buffer)
}
fmt.Println("END CSV DATA")
}
func parseIperfTCPBandwidth(output string) string {
// Parses iperf3 output and extracts the summed receiver bandwidth (Mbits/sec) capture group
match := iperfTCPOutputRegexp.FindStringSubmatch(output)
if match != nil && len(match) > 1 {
return match[1]
}
return "0"
}
func parseQperfTCPLatency(output string) string {
squeeze := func(s string) string {
return strings.Join(strings.Fields(s), " ")
}
var bw, lat string
lines := strings.Split(output, "\n")
for i, line := range lines {
line = strings.TrimSpace(line)
// Guard against a marker appearing on the last line, which would make
// the lines[i+1] lookup panic.
if i+1 >= len(lines) {
break
}
if line == "tcp_bw:" {
bw = squeeze(lines[i+1])
} else if line == "tcp_lat:" {
lat = squeeze(lines[i+1])
}
}
return fmt.Sprintf("(%s; %s)", bw, lat)
}
func parseIperfSctpBandwidth(output string) string {
// Parses iperf3 output and extracts the summed receiver bandwidth (Mbits/sec) capture group
match := iperfSCTPOutputRegexp.FindStringSubmatch(output)
if match != nil && len(match) > 1 {
return match[1]
}
return "0"
}
func parseIperfUDPBandwidth(output string) string {
// Parses the output of iperf3 (UDP mode) and grabs the Mbits/sec from the output
match := iperfUDPOutputRegexp.FindStringSubmatch(output)
if match != nil && len(match) > 1 {
return match[1]
}
return "0"
}
func parseIperfCPUUsage(output string) (string, string) {
// Parses the output of iperf and grabs the CPU usage on sender and receiver side from the output
match := iperfCPUOutputRegexp.FindStringSubmatch(output)
if match != nil && len(match) > 1 {
return match[1], match[4]
}
return "0", "0"
}
func parseNetperfBandwidth(output string) string {
// Parses the output of netperf and grabs the Mbits/sec from the output
match := netperfOutputRegexp.FindStringSubmatch(output)
if match != nil && len(match) > 1 {
return match[1]
}
return "0"
}
// ReceiveOutput processes the output received from a single client
func (t *NetPerfRPC) ReceiveOutput(data *WorkerOutput, reply *int) error {
globalLock.Lock()
defer globalLock.Unlock()
testcase := testcases[currentJobIndex]
var outputLog string
var bw string
var cpuSender string
var cpuReceiver string
switch data.Type {
case iperfTCPTest:
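// allocateWorkToClient already advanced MSS for the next run, so step back
// one increment to recover the value this result was measured with.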
mss := testcases[currentJobIndex].MSS - mssStepSize
outputLog = outputLog + fmt.Sprintln("Received TCP output from worker", data.Worker, "for test", testcase.Label,
"from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output
writeOutputFile(outputCaptureFile, outputLog)
bw = parseIperfTCPBandwidth(data.Output)
cpuSender, cpuReceiver = parseIperfCPUUsage(data.Output)
registerDataPoint(testcase.Label, mss, bw, currentJobIndex)
case qperfTCPTest:
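// MsgSize was doubled when the job was handed out; halve it to recover the
// size used for this run.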
msgSize := testcases[currentJobIndex].MsgSize / 2
outputLog = outputLog + fmt.Sprintln("Received TCP output from worker", data.Worker, "for test", testcase.Label,
"from", testcase.SourceNode, "to", testcase.DestinationNode, "MsgSize:", msgSize) + data.Output
writeOutputFile(outputCaptureFile, outputLog)
bw = parseQperfTCPLatency(data.Output)
cpuSender, cpuReceiver = "na", "na"
registerDataPoint(testcase.Label, msgSize, bw, currentJobIndex)
case iperfSctpTest:
mss := testcases[currentJobIndex].MSS - mssStepSize
outputLog = outputLog + fmt.Sprintln("Received SCTP output from worker", data.Worker, "for test", testcase.Label,
"from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output
writeOutputFile(outputCaptureFile, outputLog)
bw = parseIperfSctpBandwidth(data.Output)
cpuSender, cpuReceiver = parseIperfCPUUsage(data.Output)
registerDataPoint(testcase.Label, mss, bw, currentJobIndex)
case iperfUDPTest:
mss := testcases[currentJobIndex].MSS - mssStepSize
outputLog = outputLog + fmt.Sprintln("Received UDP output from worker", data.Worker, "for test", testcase.Label,
"from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output
writeOutputFile(outputCaptureFile, outputLog)
bw = parseIperfUDPBandwidth(data.Output)
registerDataPoint(testcase.Label, mss, bw, currentJobIndex)
case netperfTest:
outputLog = outputLog + fmt.Sprintln("Received netperf output from worker", data.Worker, "for test", testcase.Label,
"from", testcase.SourceNode, "to", testcase.DestinationNode) + data.Output
writeOutputFile(outputCaptureFile, outputLog)
bw = parseNetperfBandwidth(data.Output)
registerDataPoint(testcase.Label, 0, bw, currentJobIndex)
testcases[currentJobIndex].Finished = true
}
switch data.Type {
case iperfTCPTest, iperfSctpTest:
fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec. CPU usage sender was", cpuSender, "%. CPU usage receiver was", cpuReceiver, "%.")
case qperfTCPTest:
fmt.Println("Jobdone from worker QPERF", data.Worker, "Bandwidth, Latency was", bw, "CPU usage sender was", cpuSender, "%. CPU usage receiver was", cpuReceiver, "%.")
default:
fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec")
}
return nil
}
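// serveRPCRequests registers the NetPerfRPC service and blocks, serving
// RPC-over-HTTP requests on the given port.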
func serveRPCRequests(port string) {
baseObject := new(NetPerfRPC)
err := rpc.Register(baseObject)
if err != nil {
log.Fatal("failed to register rpc", err)
}
rpc.HandleHTTP()
listener, e := net.Listen("tcp", ":"+port)
if e != nil {
log.Fatal("listen error:", e)
}
err = http.Serve(listener, nil)
if err != nil {
log.Fatal("failed start server", err)
}
}
// Blocking RPC server start - only runs on the orchestrator
func orchestrate() {
serveRPCRequests(rpcServicePort)
}
// Walk the list of interfaces and find the first interface that has a valid IP
// Inside a container, there should be only one IP-enabled interface
func getMyIP() string {
ifaces, err := net.Interfaces()
if err != nil {
return localhostIPv4Address
}
for _, iface := range ifaces {
if iface.Flags&net.FlagLoopback == 0 {
addrs, _ := iface.Addrs()
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
// Skip address types that did not yield an IP rather than returning "<nil>".
if ip != nil {
return ip.String()
}
}
}
}
return "127.0.0.1"
}
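// handleClientWorkItem runs the requested client benchmark and reports the
// output back to the orchestrator via RPC.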
func handleClientWorkItem(client *rpc.Client, workItem *WorkItem) {
fmt.Println("Orchestrator requests worker run item Type:", workItem.ClientItem.Type)
var outputString string
switch workItem.ClientItem.Type {
case iperfTCPTest, iperfUDPTest, iperfSctpTest:
outputString = iperfClient(workItem.ClientItem.Host, workItem.ClientItem.Port, workItem.ClientItem.MSS, workItem.ClientItem.Type)
case qperfTCPTest:
outputString = qperfClient(workItem.ClientItem.Host, workItem.ClientItem.Type, workItem.ClientItem.MsgSize)
case netperfTest:
outputString = netperfClient(workItem.ClientItem.Host, workItem.ClientItem.Port, workItem.ClientItem.Type)
default:
// Unknown work item type: nothing to run or report back.
time.Sleep(10 * time.Second)
return
}
var reply int
if err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply); err != nil {
log.Fatal("failed to call client", err)
}
// Cooldown period before requesting the next work item, to let burst-allowance policers and similar rate limiters replenish
time.Sleep(10 * time.Second)
}
// isIPv6: Determines if an address is an IPv6 address
func isIPv6(address string) bool {
x := net.ParseIP(address)
return x != nil && x.To4() == nil && x.To16() != nil
}
// startWork : Entry point to the worker infinite loop
func startWork() {
for {
var client *rpc.Client
var err error
address := host
if isIPv6(address) {
address = "[" + address + "]"
}
retryInterval := 5 * time.Second
for {
fmt.Println("Attempting to connect to orchestrator at", host)
client, err = rpc.DialHTTP("tcp", address+":"+port)
if err == nil {
break
}
fmt.Println("RPC connection to ", host, " failed:", err)
time.Sleep(retryInterval)
}
for {
clientData := ClientRegistrationData{Host: podname, KubeNode: kubenode, Worker: worker, IP: getMyIP()}
var workItem WorkItem
if err := client.Call("NetPerfRPC.RegisterClient", clientData, &workItem); err != nil {
// RPC server has probably gone away - attempt to reconnect
fmt.Println("Error attempting RPC call", err)
break
}
switch {
case workItem.IsIdle:
time.Sleep(5 * time.Second)
continue
case workItem.IsServerItem:
fmt.Println("Orchestrator requests worker run iperf and netperf servers")
go iperfServer()
go qperfServer()
go netperfServer()
time.Sleep(1 * time.Second)
case workItem.IsClientItem:
handleClientWorkItem(client, &workItem)
}
}
}
}
// Invoke and indefinitely run an iperf server
func iperfServer() {
output, success := cmdExec(iperf3Path, []string{iperf3Path, "-s", host, "-J", "-i", "60"}, 15)
if success {
fmt.Println(output)
}
}
// Invoke and indefinitely run a qperf server
func qperfServer() {
output, success := cmdExec(qperfPath, []string{qperfPath}, 15)
if success {
fmt.Println(output)
}
}
// Invoke and indefinitely run a netperf server
func netperfServer() {
output, success := cmdExec(netperfServerPath, []string{netperfServerPath, "-D"}, 15)
if success {
fmt.Println(output)
}
}
// Invoke and run an iperf client and return the output if successful.
func iperfClient(serverHost, serverPort string, mss int, workItemType int) (rv string) {
switch workItemType {
case iperfTCPTest:
output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-V", "-N", "-i", "30", "-t", "10", "-f", "m", "-w", "512M", "-Z", "-P", parallelStreams, "-M", strconv.Itoa(mss)}, 15)
if success {
rv = output
}
case iperfSctpTest:
output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-V", "-N", "-i", "30", "-t", "10", "-f", "m", "-w", "512M", "-Z", "-P", parallelStreams, "-M", strconv.Itoa(mss), "--sctp"}, 15)
if success {
rv = output
}
case iperfUDPTest:
output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-i", "30", "-t", "10", "-f", "m", "-b", "0", "-u"}, 15)
if success {
rv = output
}
}
return
}
// Invoke and run a qperf client and return the output if successful.
func qperfClient(serverHost string, workItemType, msgSize int) (rv string) {
switch workItemType {
case qperfTCPTest:
output, success := cmdExec(qperfPath, []string{
qperfPath, "-ip", "19766", "-m", strconv.Itoa(msgSize), serverHost, "tcp_bw", "tcp_lat",
}, 15)
if success {
rv = output
}
default:
fmt.Println("unknown work item type:", workItemType)
}
return
}
// Invoke and run a netperf client and return the output if successful.
func netperfClient(serverHost, serverPort string, workItemType int) (rv string) {
output, success := cmdExec(netperfPath, []string{netperfPath, "-H", serverHost}, 15)
if success {
fmt.Println(output)
rv = output
} else {
fmt.Println("Error running netperf client", output)
}
return
}
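// cmdExec runs the given command synchronously, capturing stdout and stderr,
// and returns stdout plus a success flag. Note that the timeout argument is
// currently accepted but not enforced.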
func cmdExec(command string, args []string, timeout int32) (rv string, rc bool) {
cmd := exec.Cmd{Path: command, Args: args}
var stdoutput bytes.Buffer
var stderror bytes.Buffer
cmd.Stdout = &stdoutput
cmd.Stderr = &stderror
if err := cmd.Run(); err != nil {
outputstr := stdoutput.String()
errstr := stderror.String()
fmt.Println("Failed to run", outputstr, "error:", errstr, err)
return
}
rv = stdoutput.String()
rc = true
return
}