performance_prediction/util/util.py (86 lines of code) (raw):
# Copyright 2018 Twitter, Inc.
# Licensed under the Apache License, Version 2.0
# http://www.apache.org/licenses/LICENSE-2.0
""" This module contains helper functions associated with packing plans. """
import logging
from collections import defaultdict, namedtuple
from jsonschema import validate
from typing import Dict
LOG: logging.Logger = logging.getLogger(__name__)
InstanceInfo = namedtuple('InstanceInfo', ['parallelism', 'CPU', 'RAM', 'Disk', 'tasks'])
def validate_packing_plan(json_packing_plan) -> bool:
schema = {
"definitions": {
"resource": {
"type": "object",
"properties": {
"cpu": {"type": "number"},
"ram": {"type": "integer"},
"disk": {"type": "integer"}
},
"required": ["cpu", "ram", "disk"]
},
"instance": {
"properties": {
"instance_resources": {
"maxProperties": 1,
"minProperties": 1,
"$ref": "#/definitions/resource"},
"component_name": {"type": "string"},
"task_id": {"type": "integer"}
},
"required": ["instance_resources", "component_name", "task_id"]
},
"container_plan": {
"type": "object",
"properties": {
"scheduled_resources": {
"maxProperties": 1,
"minProperties": 0,
"$ref": "#/definitions/resource"
},
"instances": {"type": "array",
"items": {
"$ref": "#/definitions/instance"}
},
"required_resources": {
"maxProperties": 1,
"minProperties": 1,
"$ref": "#/definitions/resource"}
},
"required": ["required_resources", "instances"]
}
},
"type": "object",
"properties": {
"container_plans": {
"type": "array",
"items": {
"$ref": "#/definitions/container_plan"
}
}
},
"required": ["container_plans"]
}
validate(json_packing_plan, schema)
def summarize_packing_plans(packing_plan) -> Dict[int, tuple]:
# We assume that the resources of all instances are the same
# TODO: How do we factors in different container sizes + padding?
instance_count = {}
instance_cpu = {}
instance_ram = {}
instance_disk = {}
instance_task_ids = defaultdict(list)
result = {}
container_plans = packing_plan["container_plans"]
for container_plan in container_plans:
for instance in container_plan["instances"]:
comp_name = instance["component_name"]
count = instance_count.get(comp_name, 0) + 1
instance_count[comp_name] = count
if comp_name not in instance_cpu:
instance_cpu[comp_name] = instance["instance_resources"]["cpu"]
instance_ram[comp_name] = instance["instance_resources"]["ram"]
instance_disk[comp_name] = instance["instance_resources"]["disk"]
instance_task_ids[comp_name] = [instance["task_id"]]
else:
instance_task_ids[comp_name].append(instance["task_id"])
for comp_name in instance_count:
result[comp_name] = InstanceInfo(instance_count[comp_name], instance_cpu[comp_name],
instance_ram[comp_name], instance_disk[comp_name],
instance_task_ids[comp_name])
return result