in performance_prediction/simple_predictor.py [0:0]
def process_resource_bottlenecks(self, merged: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame):
    """Increase resources of bottle-necked topology operators and update expected service rates.

    For each component in ``merged``, compute the proportions by which "av-cpu-load"
    exceeds ``self.CPU_LOAD_THRESHOLD`` and "av-gc-time" exceeds
    ``self.GC_TIME_THRESHOLD``. Keep the maximum proportion per component. Where a
    proportion is above 1, multiply the component's "CPU" (resp. "RAM") allocation in a
    copy of ``self.current_plan`` by it and round up.

    Then, for each plan row, every task listed in its "tasks" column gets its
    "mean_service_rate" set to the maximum current rate among those tasks. That maximum
    is multiplied by the smaller of the applied increase proportions, or by 1 when no
    resource was increased; this is a conservative estimate of the rate once the
    bottlenecks are resolved. Tasks whose rates are all NaN (e.g. spouts) are left
    unchanged. A component with no row in ``merged`` is treated as not bottle-necked.

    :param merged: per-component metrics with at least the columns "component",
        "av-cpu-load" and "av-gc-time".
    :return: ``(new_plan, expected_service_rate)``, which are modified copies of
        ``self.current_plan`` and ``self.queue.service_rate``. Neither of those, nor
        ``merged``, is mutated.
    """
    # making a copy of the current plan to modify
    new_plan = self.current_plan.copy()
    temp_merged = merged.copy()
    # proportion by which the CPU load exceeds its threshold
    temp_merged["prop-load"] = temp_merged["av-cpu-load"] / self.CPU_LOAD_THRESHOLD
    # processing memory: proportion by which the GC time exceeds its threshold
    temp_merged["prop-time"] = temp_merged["av-gc-time"] / self.GC_TIME_THRESHOLD
    # maximum proportion by which CPU and RAM need to be increased per component.
    # Only the proportion columns are reduced, so unrelated (possibly unorderable)
    # columns of ``merged`` cannot break the aggregation.
    maximum: pd.DataFrame = (
        temp_merged.groupby("component")[["prop-load", "prop-time"]].max().reset_index()
    )

    # Multiply the resources already provisioned by the max proportion they need to be
    # increased by. Rounding up is vectorised over the matching plan rows, so it also
    # works when a component has zero or several rows (math.ceil on a Series does not).
    for _, row in maximum.iterrows():
        in_component = new_plan["instance"] == row["component"]
        for resource, prop in (("CPU", row["prop-load"]), ("RAM", row["prop-time"])):
            if prop > 1:
                new_plan.loc[in_component, resource] = np.ceil(
                    new_plan.loc[in_component, resource] * prop
                ).astype("int64")

    # Given the updated physical plan, update the expected service rates as well,
    # since we expect the bottlenecks to be resolved.
    expected_service_rate = self.queue.service_rate.copy()
    # increase proportions indexed by component name, for direct lookup
    proportions = maximum.set_index("component")
    # tasks whose service rate has already been handled
    task_set = set()
    for _, row in new_plan.iterrows():
        tasks = row["tasks"]  # list of tasks belonging to one component
        if set(tasks).issubset(task_set):
            continue
        in_tasks = expected_service_rate["task"].isin(tasks)
        # the max of the service rates of those tasks
        max_service_rate = expected_service_rate.loc[in_tasks, "mean_service_rate"].max()
        # Values can be NaN for spouts. An identity check against np.nan does not
        # detect NaN numpy scalars, so use pd.isna.
        if not pd.isna(max_service_rate):
            comp = row["instance"]
            increases = []
            # a component without metrics has no evidence of a bottleneck
            if comp in proportions.index:
                increases = [
                    p
                    for p in (proportions.at[comp, "prop-time"], proportions.at[comp, "prop-load"])
                    if p > 1
                ]
            # If both CPU and memory were increased, expect an improvement in proportion
            # to the smaller increase (a conservative estimate). With no increase,
            # keep the max rate.
            min_prop = min(increases) if increases else 1
            expected_service_rate.loc[in_tasks, "mean_service_rate"] = max_service_rate * min_prop
        # once all tasks of a component have been handled, they need not be updated again
        task_set.update(tasks)
    return new_plan, expected_service_rate