redash/utils/configuration.py (77 lines of code) (raw):
import jsonschema
import copy
from jsonschema import ValidationError
from sqlalchemy.ext.mutable import Mutable
from redash.utils import json_dumps, json_loads
# Stand-in value for secret fields: ConfigurationContainer.to_dict() writes it
# in place of secret values when masking, and ConfigurationContainer.update()
# treats it as "keep the currently stored value" for secret keys.
SECRET_PLACEHOLDER = "--------"
class ConfigurationContainer(Mutable):
    """Mutable wrapper around a configuration dict with an optional JSON schema.

    The container holds the raw configuration in ``_config`` and a processed
    copy of the schema in ``_schema``. Mutations made through ``update`` or
    item assignment call ``changed()`` (inherited from ``Mutable``) so the
    change is flagged.
    """

    @classmethod
    def coerce(cls, key, value):
        """Convert *value* to a ``ConfigurationContainer``.

        Existing containers are returned unchanged and plain dicts are
        wrapped. Anything else is delegated to ``Mutable.coerce``, which
        raises ``ValueError``.
        """
        if isinstance(value, ConfigurationContainer):
            return value

        if isinstance(value, dict):
            return ConfigurationContainer(value)

        # this call will raise ValueError
        return Mutable.coerce(key, value)

    def __init__(self, config, schema=None):
        """Store *config* (kept by reference) and process the optional *schema*."""
        self._config = config
        self.set_schema(schema)

    def set_schema(self, schema):
        """Store a deep copy of *schema*, translating ``extendedEnum`` entries.

        For every property carrying an ``extendedEnum`` list, an ``enum`` list
        is built from each option's ``"value"`` and the ``extendedEnum`` key is
        removed. The caller's schema object is never modified because the
        work is done on a deep copy.
        """
        configuration_schema = copy.deepcopy(schema)

        if isinstance(configuration_schema, dict):
            for prop in configuration_schema.get("properties", {}).values():
                if "extendedEnum" in prop:
                    prop["enum"] = [option["value"] for option in prop["extendedEnum"]]
                    del prop["extendedEnum"]

        self._schema = configuration_schema

    @property
    def schema(self):
        """Return the processed schema.

        Raises:
            RuntimeError: if no schema has been set.
        """
        if self._schema is None:
            raise RuntimeError("Schema missing.")

        return self._schema

    def is_valid(self):
        """Return ``True`` if the configuration validates, ``False`` otherwise.

        Only ``ValidationError`` and ``ValueError`` are treated as "invalid";
        any other exception raised by ``validate`` (for example the
        ``RuntimeError`` for a missing schema) propagates.
        """
        try:
            self.validate()
        except (ValidationError, ValueError):
            return False

        return True

    def validate(self):
        """Validate the stored configuration against the schema.

        The schema is read through the ``schema`` property so that a missing
        schema is reported as ``RuntimeError("Schema missing.")``, the same
        way ``update`` and ``to_dict`` report it, instead of handing ``None``
        to ``jsonschema``.

        Raises:
            RuntimeError: if no schema has been set.
            ValidationError: if the configuration does not match the schema.
        """
        jsonschema.validate(self._config, self.schema)

    def to_json(self):
        """Serialize the configuration with ``json_dumps`` using sorted keys."""
        return json_dumps(self._config, sort_keys=True)

    def iteritems(self):
        """Return the ``items()`` view of the underlying configuration."""
        return self._config.items()

    def to_dict(self, mask_secrets=False):
        """Return the configuration as a dict, optionally masking secrets.

        When *mask_secrets* is ``False`` (identity check, so only the literal
        ``False`` disables masking) or the schema has no ``"secret"`` entry,
        the underlying configuration object itself is returned, not a copy.
        Otherwise a shallow copy is returned in which every key listed in the
        schema's ``"secret"`` entry is replaced with ``SECRET_PLACEHOLDER``.

        Raises:
            RuntimeError: if masking is requested and no schema has been set.
        """
        if mask_secrets is False or "secret" not in self.schema:
            return self._config

        secret_keys = self.schema["secret"]
        config = self._config.copy()
        # Only existing keys are reassigned, so the dict size never changes
        # while it is being iterated.
        for key in config:
            if key in secret_keys:
                config[key] = SECRET_PLACEHOLDER

        return config

    def update(self, new_config):
        """Replace the configuration with *new_config* after validating it.

        Secret keys whose incoming value equals ``SECRET_PLACEHOLDER`` keep
        their currently stored value; every other key takes the incoming
        value. Keys absent from *new_config* are dropped because the stored
        configuration is replaced wholesale.

        Raises:
            RuntimeError: if no schema has been set.
            ValidationError: if *new_config* does not match the schema.
            KeyError: if the placeholder is sent for a secret key that has no
                currently stored value.
        """
        jsonschema.validate(new_config, self.schema)

        # Loop-invariant: look the secret key list up once.
        secret_keys = self.schema.get("secret", [])

        config = {}
        for k, v in new_config.items():
            if k in secret_keys and v == SECRET_PLACEHOLDER:
                # The client echoed the mask back; keep the real secret.
                config[k] = self[k]
            else:
                config[k] = v

        self._config = config
        self.changed()

    def get(self, *args, **kwargs):
        """Proxy to the underlying configuration's ``get``."""
        return self._config.get(*args, **kwargs)

    def __setitem__(self, key, value):
        """Set *key* to *value* and flag the container as changed."""
        self._config[key] = value
        self.changed()

    def __getitem__(self, item):
        """Return the value stored under *item*; raise ``KeyError`` if absent."""
        return self._config[item]

    def __contains__(self, item):
        """Return whether *item* is a key of the configuration."""
        return item in self._config

    @classmethod
    def from_json(cls, config_in_json):
        """Build a container from a JSON string.

        ``None`` yields a container wrapping an empty dict; any other value is
        parsed with ``json_loads``.
        """
        if config_in_json is None:
            return cls({})

        return cls(json_loads(config_in_json))