# performance_prediction/simple_predictor.py
# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" This module models different queues and performs relevant calculations for it."""
import datetime as dt
import json
import math
from typing import Any, Tuple

import numpy as np
import pandas as pd

from caladrius.graph.gremlin.client import GremlinClient
from caladrius.metrics.client import MetricsClient
from caladrius.model.topology.heron.abs_queueing_models import QueueingModels
from caladrius.model.topology.heron.helpers import *
from caladrius.performance_prediction.predictor import Predictor
class SimplePredictor(Predictor):
    """ Predicts the performance of a proposed packing plan by comparing it
    against the currently running plan: components whose CPU load or GC time
    breach configured thresholds get proportionally more resources, their
    expected service rates are scaled up accordingly, and parallelism is
    conservatively increased where arrivals exceed service capacity."""

    def __init__(self, topology_id: str, cluster: str, environ: str,
                 start: dt.datetime, end: dt.datetime, tracker_url: str,
                 metrics_client: MetricsClient, graph_client: GremlinClient,
                 queue: QueueingModels, **kwargs: Any):
        super().__init__(topology_id, cluster, environ, start, end, tracker_url,
                         metrics_client, graph_client, queue, **kwargs)
        # GC time (ms) above which a component is considered memory bottle-necked
        self.GC_TIME_THRESHOLD = 500  # units --> ms
        # CPU load per core above which a component is considered CPU bottle-necked
        self.CPU_LOAD_THRESHOLD = 0.7  # load per core

    def create_new_plan(self) -> str:
        """ Predicts the performance of the new packing plan by 1) finding
        out the performance of the current plan, 2) finding out where the
        new plan is different 3) analysing how that might impact the new
        plan's performance.

        Returns:
            The updated packing plan serialised as a JSON string.
        """
        # average GC time per (component, task) over the query window
        gc_time: pd.DataFrame = self.metrics_client.get_gc_time(
            self.topology_id, self.cluster, self.environ,
            self.start, self.end, **self.kwargs)
        grouped_gc_time: pd.DataFrame = \
            gc_time.groupby(["component", "task"]).mean().reset_index()[
                ["component", "task", "gc-time"]]
        grouped_gc_time.rename(index=str, columns={"gc-time": "av-gc-time"},
                               inplace=True)

        # average CPU load per (component, task) over the query window
        cpu_load: pd.DataFrame = self.metrics_client.get_cpu_load(
            self.topology_id, self.cluster, self.environ,
            self.start, self.end, **self.kwargs)
        grouped_cpu_load: pd.DataFrame = \
            cpu_load.groupby(["component", "task"]).mean().reset_index()[
                ["component", "task", "cpu-load"]]
        grouped_cpu_load.rename(index=str, columns={"cpu-load": "av-cpu-load"},
                                inplace=True)

        merged: pd.DataFrame = grouped_cpu_load.merge(grouped_gc_time)
        new_plan, expected_service_rate = self.process_resource_bottlenecks(merged)
        # now check if parallelism has to be updated
        new_plan = self.process_parallelism(new_plan, expected_service_rate)
        return new_plan.to_json()

    def process_resource_bottlenecks(self, merged: pd.DataFrame) \
            -> Tuple[pd.DataFrame, pd.DataFrame]:
        """This function is used to determine whether resources should be
        increased for topology operators if they are bottle-necked. Then,
        expected service rates are updated accordingly.

        Arguments:
            merged: per (component, task) metric averages with "av-cpu-load"
                and "av-gc-time" columns.

        Returns:
            A tuple of (updated packing plan, updated expected service rates).
        """
        # making a copy of the current plan to modify
        new_plan = self.current_plan.copy()
        temp_merged = merged.copy()
        # proportion by which measured CPU load exceeds the per-core threshold
        temp_merged["prop-load"] = temp_merged["av-cpu-load"] / self.CPU_LOAD_THRESHOLD
        # processing memory: proportion by which GC time exceeds the threshold
        temp_merged["prop-time"] = temp_merged["av-gc-time"] / self.GC_TIME_THRESHOLD
        # we find the maximum proportion by which CPU and RAM need to be
        # increased per component
        maximum: pd.DataFrame = temp_merged.groupby("component").max().reset_index()
        # then, we multiply the resources already provisioned by the max
        # proportion they need to be increased by
        for _, row in maximum.iterrows():
            component_mask = new_plan["instance"] == row["component"]
            if row["prop-load"] > 1:
                new_plan.loc[component_mask, "CPU"] = \
                    math.ceil(new_plan.loc[component_mask]["CPU"] * row["prop-load"])
            if row["prop-time"] > 1:
                new_plan.loc[component_mask, "RAM"] = \
                    math.ceil(new_plan.loc[component_mask]["RAM"] * row["prop-time"])
        # given the above code, we have an updated physical plan but we still
        # need to update the expected service rate, as we expect bottlenecks
        # to be resolved
        expected_service_rate = self.queue.service_rate.copy()
        # tasks whose service rate has already been updated in this loop
        task_set = set()
        for _, row in new_plan.iterrows():
            tasks = row["tasks"]  # this is a list of tasks belonging to one component
            if set(tasks).issubset(task_set):
                # all of this component's tasks were already handled
                continue
            task_mask = expected_service_rate["task"].isin(tasks)
            # this is the max of the service rates of those tasks
            max_service_rates = \
                expected_service_rate.loc[task_mask]["mean_service_rate"].max()
            comp = row["instance"]
            # values can be NaN for spouts. NOTE: an "is np.nan" identity check
            # is unreliable here -- pandas reductions can return NaN floats
            # that are not the np.nan singleton -- so use pd.isna instead.
            if not pd.isna(max_service_rates):
                comp_rows = maximum.loc[maximum["component"] == comp]
                prop_time = comp_rows["prop-time"].iloc[0]
                prop_load = comp_rows["prop-load"].iloc[0]
                if prop_time > 1 and prop_load > 1:
                    # if we had to increase both CPU and memory resources, we
                    # expect a performance improvement in proportion to the
                    # minimum increase. this is a conservative estimate
                    min_prop = min(prop_time, prop_load)
                elif prop_load > 1:
                    min_prop = prop_load
                elif prop_time > 1:
                    min_prop = prop_time
                else:
                    min_prop = 1  # no bottleneck: keep the original rate
                expected_service_rate.loc[task_mask, "mean_service_rate"] = \
                    max_service_rates * min_prop
            # once we have updated all tasks belonging to a component, there
            # is no need to update any of those tasks again
            task_set.update(tasks)
        return new_plan, expected_service_rate

    def process_parallelism(self, new_plan: pd.DataFrame,
                            expected_service_rate: pd.DataFrame) -> pd.DataFrame:
        """This function takes in the updated packing plan of the topology
        and the expected service rate and uses it to determine if the
        parallelism level of operators needs to be changed. We can
        conservatively increase the parallelism level, but we do not
        decrease it."""
        # sum up arrival rate per component
        arrival_rate: pd.DataFrame = self.queue.arrival_rate.copy()
        for index, row in new_plan.iterrows():
            task_arrivals = arrival_rate.loc[arrival_rate["task"].isin(row["tasks"])]
            min_serviced = expected_service_rate.loc[
                expected_service_rate["task"].isin(row["tasks"])][
                "mean_service_rate"].min()
            # skip components with no recorded arrivals, and guard against a
            # missing or zero service rate (e.g. spouts), which would
            # otherwise make the division/ceil below blow up
            if not task_arrivals.empty and pd.notna(min_serviced) and min_serviced > 0:
                total_arrivals = task_arrivals["mean_arrival_rate"].sum()
                # we are assuming equal distribution here.
                parallelism = math.ceil(total_arrivals / min_serviced)
                if parallelism > row["parallelism"]:
                    new_plan.loc[index, "parallelism"] = parallelism
        return new_plan