redash/destinations/__init__.py (61 lines of code) (raw):
import logging
logger = logging.getLogger(__name__)
__all__ = ["BaseDestination", "register", "get_destination", "import_destinations"]
class BaseDestination(object):
deprecated = False
def __init__(self, configuration):
self.configuration = configuration
@classmethod
def name(cls):
return cls.__name__
@classmethod
def type(cls):
return cls.__name__.lower()
@classmethod
def icon(cls):
return "fa-bullseye"
@classmethod
def enabled(cls):
return True
@classmethod
def configuration_schema(cls):
return {}
def notify(self, alert, query, user, new_state, app, host, options):
raise NotImplementedError()
@classmethod
def to_dict(cls):
return {
"name": cls.name(),
"type": cls.type(),
"icon": cls.icon(),
"configuration_schema": cls.configuration_schema(),
**({ "deprecated": True } if cls.deprecated else {})
}
destinations = {}
def register(destination_class):
global destinations
if destination_class.enabled():
logger.debug(
"Registering %s (%s) destinations.",
destination_class.name(),
destination_class.type(),
)
destinations[destination_class.type()] = destination_class
else:
logger.warning(
"%s destination enabled but not supported, not registering. Either disable or install missing dependencies.",
destination_class.name(),
)
def get_destination(destination_type, configuration):
destination_class = destinations.get(destination_type, None)
if destination_class is None:
return None
return destination_class(configuration)
def get_configuration_schema_for_destination_type(destination_type):
destination_class = destinations.get(destination_type, None)
if destination_class is None:
return None
return destination_class.configuration_schema()
def import_destinations(destination_imports):
for destination_import in destination_imports:
__import__(destination_import)