dusty/models/config.py (259 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011,R0903,R0201,E0401,W0702,C0411
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Config helper
"""
import os
import re
import sys
import yaml
import pkgutil
import importlib
from ruamel.yaml.comments import CommentedMap
from dusty.models.depot import SecretDepotModel, ObjectDepotModel, StateDepotModel
from dusty.tools.module import DataModuleLoader
from dusty.tools.dict import recursive_merge, recursive_merge_existing
from dusty.tools import log, depots, seeds
from dusty import constants
class ConfigModel:
    """
    Parses config

    Reads raw YAML config data, validates its base structure, merges the
    ``global`` section with the selected suite and with config objects read
    from depots, and stores the result in the context.
    """

    def __init__(self, context):
        """ Initialize context instance """
        super().__init__()
        self.context = context

    def load(self, config_seed, config_variable, config_file, suite):
        """
        Load and parse config

        Sets ``context.suite`` and ``context.config`` on success.
        Raises ValueError when base validation of the config fails.
        """
        config = self._load_config(config_seed, config_variable, config_file)
        if not self._validate_config_base(config, suite):
            raise ValueError("Invalid config")
        context_config = self._prepare_context_config(config, suite)
        self.context.suite = suite
        self.context.config = context_config
        log.debug("Resulting context config: %s", self.context.config)
        log.info("Loaded %s suite configuration", self.context.suite)

    def _load_config(self, config_seed, config_variable, config_file):
        """
        Read raw config data and parse it as YAML

        Sources are tried in order: seed, environment variable, file.
        The first one that yields non-empty data is used.
        """
        log.info("Loading config from seed")
        config_data = seeds.unseed(config_seed)
        if not config_data:
            log.info("Loading config from %s", config_variable)
            config_data = os.environ.get(config_variable, None)
        if not config_data:
            log.info("Loading config from %s", config_file)
            with open(config_file, "rb") as file_:
                config_data = file_.read()
        # NOTE(review): yaml.FullLoader is not meant for untrusted input.
        # Confirm that config data always comes from a trusted source.
        config = self._variable_substitution(
            yaml.load(
                os.path.expandvars(config_data),
                Loader=yaml.FullLoader
            )
        )
        return config

    @staticmethod
    def _rewrite_dict(obj, substitute):
        """
        Apply substitute() to every key and value of dict obj, in place

        All new pairs are computed before the dict is touched, so a substituted
        key that equals another (not yet visited) key can not clobber that entry
        half-way through, and no value is substituted twice. When two keys end up
        equal the later one wins. The same dict object is returned.
        """
        items = []
        for key, value in list(obj.items()):
            new_value = substitute(value)  # value first, then key (as with pop/assign)
            items.append((substitute(key), new_value))
        obj.clear()
        obj.update(items)
        return obj

    def _variable_substitution(self, obj):
        """
        Allows to use raw environmental variables inside YAML/JSON config

        A string of the form ``$!NAME`` (surrounding whitespace ignored) is
        replaced with the value of environment variable NAME when it is set.
        Dicts and lists are processed recursively and modified in place.
        """
        if isinstance(obj, dict):
            return self._rewrite_dict(obj, self._variable_substitution)
        if isinstance(obj, list):
            for index, item in enumerate(obj):
                obj[index] = self._variable_substitution(item)
            return obj
        if isinstance(obj, str):
            candidate = obj.strip()
            if re.match(r"^\$\![a-zA-Z_][a-zA-Z0-9_]*$", candidate) \
                    and candidate[2:] in os.environ:
                return os.environ[candidate[2:]]
        return obj

    def _prepare_context_config(self, config, suite):
        """ Prepare context configuration, e.g.: resolve secret variables, merge config objects """
        context_config = recursive_merge(config["global"], config["suites"].get(suite))
        # Allow to inherit from other suite
        if context_config.get("inherit_from", None) and \
                config["suites"].get(context_config.get("inherit_from"), None):
            context_config = recursive_merge(
                config["suites"].get(context_config.get("inherit_from")), context_config
            )
        # Process depots and load modules
        # Repeated so that modules/depots enabled in one pass can be used by the next one
        for _ in range(3):
            self._load_modules(context_config)
            context_config = self._process_depots(context_config)
        # Load config objects
        base_config = self._depot_read_config_object("__base__.yaml")
        project_config = self._depot_read_config_object(
            context_config["settings"].get("load_settings_from", None)
        )
        merge_config = self._depot_read_config_object("__merge__.yaml")
        override_config = self._depot_read_config_object("__override__.yaml")
        # Merge resulting config
        result = base_config
        result = recursive_merge(result, context_config)
        result = recursive_merge(result, project_config)
        result = recursive_merge_existing(result, merge_config)
        result = recursive_merge(result, override_config)
        # Process depots and load modules with resulting config
        for _ in range(3):
            self._load_modules(result)
            result = self._process_depots(result)
        return result

    def _load_modules(self, context_config):
        """
        Load extension modules listed in settings

        Every settings key starting with ``load_module_from`` or
        ``load_modules_from`` is used; its value may be a single name or a list.
        Module data is read from depots and registered in ``sys.meta_path``.
        """
        modules_to_load = []
        for key, value in context_config["settings"].items():
            if not key.startswith(("load_module_from", "load_modules_from")):
                continue
            if isinstance(value, list):
                modules_to_load.extend(value)
            else:
                modules_to_load.append(value)
        for module_name in modules_to_load:
            if module_name in self.context.modules:
                continue  # Module is already loaded during first iteration
            module_object = depots.get_object(self.context, module_name)
            if module_object is not None:
                sys.meta_path.insert(0, DataModuleLoader(module_object))
                self.context.modules.append(module_name)
                log.info("Loaded module from %s", module_name)

    def _process_depots(self, current_context_config):  # pylint: disable=R0912
        """
        Normalize depot settings, enable configured depots and resolve secrets

        The passed config is modified in place and returned.
        """
        context_config = current_context_config
        # Check depot config section sanity (missing or empty sections become empty dicts)
        if context_config["settings"].get("depots") is None:
            context_config["settings"]["depots"] = dict()
        depot_sections = ["secret", "object", "state"]
        for depot_section in depot_sections:
            if context_config["settings"]["depots"].get(depot_section) is None:
                context_config["settings"]["depots"][depot_section] = dict()
        # Support legacy depot configuration
        legacy_depot_names = [
            item for item in list(context_config["settings"]["depots"])
            if item not in depot_sections
        ]
        legacy_section_map = {
            SecretDepotModel: "secret",
            ObjectDepotModel: "object",
            StateDepotModel: "state"
        }
        for depot_name in legacy_depot_names:
            try:
                depot_class = importlib.import_module(
                    f"dusty.tools.depots.{depot_name}.depot"
                ).Depot
                # A depot class may match several types, so every section is checked
                for depot_type, depot_section in legacy_section_map.items():
                    if issubclass(depot_class, depot_type):
                        context_config["settings"]["depots"][depot_section][depot_name] = \
                            context_config["settings"]["depots"][depot_name]
                        log.info("Legacy depot %s added to section %s", depot_name, depot_section)
                context_config["settings"]["depots"].pop(depot_name)
            except Exception:  # pylint: disable=W0703
                log.exception("Failed to process legacy depot %s", depot_name)
        # Make depot instances
        for depot_section in depot_sections:
            for depot_name in list(context_config["settings"]["depots"][depot_section]):
                try:
                    depot_class = importlib.import_module(
                        f"dusty.tools.depots.{depot_name}.depot"
                    ).Depot
                    if depot_class.get_name() in self.context.depots[depot_section]:
                        continue  # Depot already enabled in first iteration
                    depot = depot_class(
                        self.context,
                        context_config["settings"]["depots"][depot_section][depot_name]
                    )
                    self.context.depots[depot_section][depot.get_name()] = depot
                    log.info("Enabled %s depot %s", depot_section, depot_name)
                    if isinstance(depot, SecretDepotModel):
                        # New secret depot: try to resolve secrets in the whole config again
                        self.context.set_meta("depots_resolved_secrets", 0)
                        context_config = self._depot_substitution(context_config)
                        log.info(
                            "Resolved %d secrets from depots",
                            self.context.get_meta("depots_resolved_secrets", 0)
                        )
                except ModuleNotFoundError:
                    log.warning(
                        "Depot %s is not enabled, probably needed module is not loaded yet",
                        depot_name
                    )
                except Exception:  # pylint: disable=W0703
                    log.exception("Failed to enable depot %s", depot_name)
        #
        return context_config

    def _depot_substitution(self, obj):
        """
        Allows to use depot secrets inside YAML/JSON config

        A string of the form ``$=KEY`` (surrounding whitespace ignored) is
        replaced with the secret returned by depots for KEY when it is not None.
        Every replacement increments the ``depots_resolved_secrets`` meta counter.
        Dicts and lists are processed recursively and modified in place.
        """
        if isinstance(obj, dict):
            return self._rewrite_dict(obj, self._depot_substitution)
        if isinstance(obj, list):
            for index, item in enumerate(obj):
                obj[index] = self._depot_substitution(item)
            return obj
        if isinstance(obj, str):
            candidate = obj.strip()
            if re.match(r"^\$\=\S*$", candidate):
                obj_key = candidate[2:]
                obj_value = depots.get_secret(self.context, obj_key)
                if obj_value is not None:
                    self.context.set_meta(
                        "depots_resolved_secrets",
                        self.context.get_meta("depots_resolved_secrets", 0) + 1
                    )
                    return obj_value
        return obj

    def _depot_read_config_object(self, obj):
        """
        Read config object from depots and parse it as YAML

        Best-effort: an empty dict is returned when obj is None, when the object
        is not found, when it can not be parsed, or when it is not a mapping.
        """
        result = dict()
        if obj is None:
            return result
        data = depots.get_object(self.context, obj)
        if data is None:
            return result
        try:
            self.context.set_meta("depots_resolved_secrets", 0)
            # NOTE(review): yaml.FullLoader is not meant for untrusted input.
            # Confirm that depot objects always come from a trusted source.
            loaded = self._depot_substitution(
                self._variable_substitution(
                    yaml.load(
                        os.path.expandvars(data),
                        Loader=yaml.FullLoader
                    )
                )
            )
            if not isinstance(loaded, dict):
                # E.g. empty document is parsed to None, which can not be merged
                log.warning("Ignoring %s from depots: content is not a mapping", obj)
                return result
            log.info("Loaded %s from depots", obj)
            log.debug(
                "Resolved %d object secrets from depots",
                self.context.get_meta("depots_resolved_secrets", 0)
            )
            return loaded
        except Exception:  # pylint: disable=W0703
            # Still best-effort, but the failure is no longer hidden
            log.exception("Failed to load %s from depots", obj)
            return result

    def _validate_config_base(self, config, suite):
        """
        Check base config structure for the requested suite

        Returns True when config is usable, False otherwise (reason is logged).
        Missing or empty ``global`` and suite ``settings`` sections are created.
        """
        if not isinstance(config, dict):
            log.error("Invalid config: mapping expected")
            return False
        if config.get(constants.CONFIG_VERSION_KEY, 0) != constants.CURRENT_CONFIG_VERSION:
            log.error("Invalid config version")
            return False
        if config.get("global") is None:
            config["global"] = dict()
        if not isinstance(config.get("suites"), dict):
            log.error("Suites are not defined")
            return False
        if not config["suites"].get(suite, None):
            log.error("Suite is not defined: %s", suite)
            log.info("Available suites: %s", ", ".join(str(item) for item in config["suites"]))
            return False
        if "settings" not in config["suites"][suite] or \
                config["suites"][suite]["settings"] is None:
            config["suites"][suite]["settings"] = dict()
        return True

    def list_suites(self, config_seed, config_variable, config_file):
        """ List available suites from config """
        config = self._load_config(config_seed, config_variable, config_file)
        suites = config.get("suites") if isinstance(config, dict) else None
        if suites is None:
            log.error("Suites are not defined")
            return list()
        return list(suites)

    @staticmethod
    def fill_config(data_obj):
        """
        Make sample config

        Adds config version, ``global`` and ``suites`` sections to data_obj and
        lets every depot package found in dusty.tools.depots describe its own
        settings.
        """
        data_obj.insert(
            len(data_obj), constants.CONFIG_VERSION_KEY, constants.CURRENT_CONFIG_VERSION
        )
        data_obj.insert(
            len(data_obj), "global", CommentedMap(), comment="Common settings for all suites"
        )
        data_obj.insert(len(data_obj), "suites", CommentedMap(), comment="Test suites")
        global_obj = data_obj["global"]
        global_obj.insert(
            len(global_obj), "settings", CommentedMap(), comment="General config"
        )
        settings_obj = global_obj["settings"]
        settings_obj.insert(
            len(settings_obj), "load_module_from", "module.zip",
            comment="(optional) Load extension module from depot. Can be a string or list"
        )
        settings_obj.insert(
            len(settings_obj), "depots", CommentedMap(), comment="Upstream setting providers config"
        )
        depots_obj = settings_obj["depots"]
        depots_module = importlib.import_module("dusty.tools.depots")
        for _, name, pkg in pkgutil.iter_modules(depots_module.__path__):
            if not pkg:
                continue  # Only packages are treated as depots
            depot = importlib.import_module(f"dusty.tools.depots.{name}.depot")
            depots_obj.insert(
                len(depots_obj), name, CommentedMap(),
                comment=depot.Depot.get_description()
            )
            depot.Depot.fill_config(depots_obj[name])