def process_resource_bottlenecks()

in performance_prediction/simple_predictor.py [0:0]


    def process_resource_bottlenecks(self, merged: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame):
        """Scale up resources of bottlenecked components and update expected service rates.

        For every component in ``merged`` the worst observed CPU load and GC time
        are expressed as a proportion of ``self.CPU_LOAD_THRESHOLD`` and
        ``self.GC_TIME_THRESHOLD``. A proportion above 1 is treated as a
        bottleneck: the matching rows of the plan get their ``"CPU"`` (load) or
        ``"RAM"`` (GC time) multiplied by that proportion and rounded up.

        The service rate of each component's tasks is then set to the current
        maximum ``"mean_service_rate"`` among those tasks, multiplied by the
        smallest proportion that exceeded 1 (a conservative estimate), or by 1
        when nothing exceeded its threshold.

        Args:
            merged: per-instance metrics with the columns ``"component"``,
                ``"av-cpu-load"`` and ``"av-gc-time"``. It is not modified.

        Returns:
            A ``(new_plan, expected_service_rate)`` pair. Both are modified
            copies of ``self.current_plan`` and ``self.queue.service_rate``;
            the originals are left untouched.
        """
        # Work on a copy so the current plan stays available to the caller.
        new_plan = self.current_plan.copy()

        # Proportion of the threshold each measurement reaches; > 1 means "over".
        pressure = merged.assign(**{
            "prop-load": merged["av-cpu-load"] / self.CPU_LOAD_THRESHOLD,
            "prop-time": merged["av-gc-time"] / self.GC_TIME_THRESHOLD,
        })

        # Worst proportion per component, indexed by component name. Only the two
        # derived columns are reduced so unrelated columns cannot break max().
        peaks = pressure.groupby("component")[["prop-load", "prop-time"]].max()

        # Multiply the provisioned resources by the proportion they were exceeded by.
        for component, peak in peaks.iterrows():
            plan_rows = new_plan["instance"] == component
            if not plan_rows.any():
                continue
            for resource, factor in (("CPU", peak["prop-load"]), ("RAM", peak["prop-time"])):
                if factor > 1:
                    # Round up element-wise: math.ceil cannot take a whole Series
                    # unless it happens to contain exactly one value.
                    new_plan.loc[plan_rows, resource] = [
                        math.ceil(value) for value in new_plan.loc[plan_rows, resource] * factor
                    ]

        # The physical plan is updated; the expected service rates still have to
        # reflect that the bottlenecks are expected to be resolved.
        expected_service_rate = self.queue.service_rate.copy()

        # Tasks whose service rate has already been handled.
        updated_tasks = set()
        for _, plan_row in new_plan.iterrows():
            tasks = plan_row["tasks"]  # list of tasks belonging to one component
            if set(tasks).issubset(updated_tasks):
                continue
            # Each task only needs to be processed once, whatever happens below.
            updated_tasks.update(tasks)

            task_rows = expected_service_rate["task"].isin(tasks)
            peak_rate = expected_service_rate.loc[task_rows, "mean_service_rate"].max()

            # The rate is NaN when the tasks have no service rate (e.g. spouts).
            # NaN must be detected with pd.isna: an identity test against np.nan
            # does not recognise the NaN objects pandas returns.
            if pd.isna(peak_rate):
                continue

            comp = plan_row["instance"]
            if comp in peaks.index:
                proportions = (peaks.at[comp, "prop-time"], peaks.at[comp, "prop-load"])
            else:
                # No metrics for this component: its resources were not changed
                # above, so no improvement is expected either.
                proportions = ()

            # Improvement is expected in proportion to the smallest increase that
            # was applied; with no increase the factor is 1.
            exceeded = [proportion for proportion in proportions if proportion > 1]
            scale = min(exceeded) if exceeded else 1

            expected_service_rate.loc[task_rows, "mean_service_rate"] = peak_rate * scale

        return new_plan, expected_service_rate