redash/models/__init__.py (1,227 lines of code) (raw):

import datetime import calendar import logging import time import numbers import pytz from sqlalchemy import distinct, or_, and_, UniqueConstraint, cast from sqlalchemy.dialects import postgresql from sqlalchemy.event import listens_for from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import backref, contains_eager, joinedload, subqueryload, load_only from sqlalchemy.orm.exc import NoResultFound # noqa: F401 from sqlalchemy import func from sqlalchemy_utils import generic_relationship from sqlalchemy_utils.types import TSVectorType from sqlalchemy_utils.models import generic_repr from sqlalchemy_utils.types.encrypted.encrypted_type import FernetEngine from redash import redis_connection, utils, settings from redash.destinations import ( get_configuration_schema_for_destination_type, get_destination, ) from redash.metrics import database # noqa: F401 from redash.query_runner import ( with_ssh_tunnel, get_configuration_schema_for_query_runner_type, get_query_runner, TYPE_BOOLEAN, TYPE_DATE, TYPE_DATETIME, BaseQueryRunner) from redash.utils import ( generate_token, json_dumps, json_loads, mustache_render, base_url, sentry, gen_query_hash) from redash.utils.configuration import ConfigurationContainer from redash.models.parameterized_query import ParameterizedQuery from .base import db, gfk_type, Column, GFKBase, SearchBaseQuery, key_type, primary_key from .changes import ChangeTrackingMixin, Change # noqa from .mixins import BelongsToOrgMixin, TimestampMixin from .organizations import Organization from .types import ( EncryptedConfiguration, Configuration, MutableDict, MutableList, PseudoJSON, pseudo_json_cast_property ) from .users import AccessPermission, AnonymousUser, ApiUser, Group, User # noqa logger = logging.getLogger(__name__) class ScheduledQueriesExecutions(object): KEY_NAME = "sq:executed_at" def __init__(self): self.executions = {} def refresh(self): self.executions = redis_connection.hgetall(self.KEY_NAME) def update(self, query_id): redis_connection.hmset(self.KEY_NAME, {query_id: time.time()}) def get(self, query_id): timestamp = self.executions.get(str(query_id)) if timestamp: timestamp = utils.dt_from_timestamp(timestamp) return timestamp scheduled_queries_executions = ScheduledQueriesExecutions() @generic_repr("id", "name", "type", "org_id", "created_at") class DataSource(BelongsToOrgMixin, db.Model): id = primary_key("DataSource") org_id = Column(key_type("Organization"), db.ForeignKey("organizations.id")) org = db.relationship(Organization, backref="data_sources") name = Column(db.String(255)) type = Column(db.String(255)) options = Column( "encrypted_options", ConfigurationContainer.as_mutable( EncryptedConfiguration( db.Text, settings.DATASOURCE_SECRET_KEY, FernetEngine ) ), ) queue_name = Column(db.String(255), default="queries") scheduled_queue_name = Column(db.String(255), default="scheduled_queries") created_at = Column(db.DateTime(True), default=db.func.now()) data_source_groups = db.relationship( "DataSourceGroup", back_populates="data_source", cascade="all" ) __tablename__ = "data_sources" __table_args__ = (db.Index("data_sources_org_id_name", "org_id", "name"),) def __eq__(self, other): return self.id == other.id def __hash__(self): return hash(self.id) def to_dict(self, all=False, with_permissions_for=None): d = { "id": self.id, "name": self.name, "type": self.type, "syntax": self.query_runner.syntax, "paused": self.paused, "pause_reason": self.pause_reason, "supports_auto_limit": self.query_runner.supports_auto_limit } if all: schema = get_configuration_schema_for_query_runner_type(self.type) self.options.set_schema(schema) d["options"] = self.options.to_dict(mask_secrets=True) d["queue_name"] = self.queue_name d["scheduled_queue_name"] = self.scheduled_queue_name d["groups"] = self.groups if with_permissions_for is not None: d["view_only"] = ( db.session.query(DataSourceGroup.view_only) .filter( DataSourceGroup.group == with_permissions_for, DataSourceGroup.data_source == self, ) .one()[0] ) return d def __str__(self): return str(self.name) @classmethod def create_with_group(cls, *args, **kwargs): data_source = cls(*args, **kwargs) data_source_group = DataSourceGroup( data_source=data_source, group=data_source.org.default_group ) db.session.add_all([data_source, data_source_group]) return data_source @classmethod def all(cls, org, group_ids=None): data_sources = cls.query.filter(cls.org == org).order_by(cls.id.asc()) if group_ids: data_sources = data_sources.join(DataSourceGroup).filter( DataSourceGroup.group_id.in_(group_ids) ) return data_sources.distinct() @classmethod def get_by_id(cls, _id): return cls.query.filter(cls.id == _id).one() def delete(self): Query.query.filter(Query.data_source == self).update( dict(data_source_id=None, latest_query_data_id=None) ) QueryResult.query.filter(QueryResult.data_source == self).delete() res = db.session.delete(self) db.session.commit() redis_connection.delete(self._schema_key) return res def get_cached_schema(self): cache = redis_connection.get(self._schema_key) return json_loads(cache) if cache else None def get_schema(self, refresh=False): out_schema = None if not refresh: out_schema = self.get_cached_schema() if out_schema is None: query_runner = self.query_runner schema = query_runner.get_schema(get_stats=refresh) try: out_schema = self._sort_schema(schema) except Exception: logging.exception( "Error sorting schema columns for data_source {}".format(self.id) ) out_schema = schema finally: redis_connection.set(self._schema_key, json_dumps(out_schema)) return out_schema def _sort_schema(self, schema): return [ {"name": i["name"], "columns": sorted(i["columns"], key=lambda x: x["name"] if isinstance(x, dict) else x)} for i in sorted(schema, key=lambda x: x["name"]) ] @property def _schema_key(self): return "data_source:schema:{}".format(self.id) @property def _pause_key(self): return "ds:{}:pause".format(self.id) @property def paused(self): return redis_connection.exists(self._pause_key) @property def pause_reason(self): return redis_connection.get(self._pause_key) def pause(self, reason=None): redis_connection.set(self._pause_key, reason or "") def resume(self): redis_connection.delete(self._pause_key) def add_group(self, group, view_only=False): dsg = DataSourceGroup(group=group, data_source=self, view_only=view_only) db.session.add(dsg) return dsg def remove_group(self, group): DataSourceGroup.query.filter( DataSourceGroup.group == group, DataSourceGroup.data_source == self ).delete() db.session.commit() def update_group_permission(self, group, view_only): dsg = DataSourceGroup.query.filter( DataSourceGroup.group == group, DataSourceGroup.data_source == self ).one() dsg.view_only = view_only db.session.add(dsg) return dsg @property def uses_ssh_tunnel(self): return "ssh_tunnel" in self.options @property def query_runner(self): query_runner = get_query_runner(self.type, self.options) if self.uses_ssh_tunnel: query_runner = with_ssh_tunnel(query_runner, self.options.get("ssh_tunnel")) return query_runner @classmethod def get_by_name(cls, name): return cls.query.filter(cls.name == name).one() # XXX examine call sites to see if a regular SQLA collection would work better @property def groups(self): groups = DataSourceGroup.query.filter(DataSourceGroup.data_source == self) return dict([(group.group_id, group.view_only) for group in groups]) @generic_repr("id", "data_source_id", "group_id", "view_only") class DataSourceGroup(db.Model): # XXX drop id, use datasource/group as PK id = primary_key("DataSourceGroup") data_source_id = Column(key_type("DataSource"), db.ForeignKey("data_sources.id")) data_source = db.relationship(DataSource, back_populates="data_source_groups") group_id = Column(key_type("Group"), db.ForeignKey("groups.id")) group = db.relationship(Group, back_populates="data_sources") view_only = Column(db.Boolean, default=False) __tablename__ = "data_source_groups" DESERIALIZED_DATA_ATTR = "_deserialized_data" class DBPersistence(object): @property def data(self): if self._data is None: return None if not hasattr(self, DESERIALIZED_DATA_ATTR): setattr(self, DESERIALIZED_DATA_ATTR, json_loads(self._data)) return self._deserialized_data @data.setter def data(self, data): if hasattr(self, DESERIALIZED_DATA_ATTR): delattr(self, DESERIALIZED_DATA_ATTR) self._data = data QueryResultPersistence = ( settings.dynamic_settings.QueryResultPersistence or DBPersistence ) @generic_repr("id", "org_id", "data_source_id", "query_hash", "runtime", "retrieved_at") class QueryResult(db.Model, QueryResultPersistence, BelongsToOrgMixin): id = primary_key("QueryResult") org_id = Column(key_type("Organization"), db.ForeignKey("organizations.id")) org = db.relationship(Organization) data_source_id = Column(key_type("DataSource"), db.ForeignKey("data_sources.id")) data_source = db.relationship(DataSource, backref=backref("query_results")) query_hash = Column(db.String(32), index=True) query_text = Column("query", db.Text) _data = Column("data", db.Text) runtime = Column(postgresql.DOUBLE_PRECISION) retrieved_at = Column(db.DateTime(True)) __tablename__ = "query_results" def __str__(self): return "%d | %s | %s" % (self.id, self.query_hash, self.retrieved_at) def to_dict(self): return { "id": self.id, "query_hash": self.query_hash, "query": self.query_text, "data": self.data, "data_source_id": self.data_source_id, "runtime": self.runtime, "retrieved_at": self.retrieved_at, } @classmethod def unused(cls, days=7): age_threshold = datetime.datetime.now() - datetime.timedelta(days=days) return ( cls.query.filter( Query.id.is_(None), cls.retrieved_at < age_threshold ).outerjoin(Query) ).options(load_only("id")) @classmethod def get_latest(cls, data_source, query, max_age=0): query_hash = gen_query_hash(query) if max_age == -1: query = cls.query.filter( cls.query_hash == query_hash, cls.data_source == data_source ) else: query = cls.query.filter( cls.query_hash == query_hash, cls.data_source == data_source, ( db.func.timezone("utc", cls.retrieved_at) + datetime.timedelta(seconds=max_age) >= db.func.timezone("utc", db.func.now()) ), ) return query.order_by(cls.retrieved_at.desc()).first() @classmethod def store_result( cls, org, data_source, query_hash, query, data, run_time, retrieved_at ): query_result = cls( org_id=org, query_hash=query_hash, query_text=query, runtime=run_time, data_source=data_source, retrieved_at=retrieved_at, data=data, ) db.session.add(query_result) logging.info("Inserted query (%s) data; id=%s", query_hash, query_result.id) return query_result @property def groups(self): return self.data_source.groups def should_schedule_next( previous_iteration, now, interval, time=None, day_of_week=None, failures=0 ): # if time exists then interval > 23 hours (82800s) # if day_of_week exists then interval > 6 days (518400s) if time is None: ttl = int(interval) next_iteration = previous_iteration + datetime.timedelta(seconds=ttl) else: hour, minute = time.split(":") hour, minute = int(hour), int(minute) # The following logic is needed for cases like the following: # - The query scheduled to run at 23:59. # - The scheduler wakes up at 00:01. # - Using naive implementation of comparing timestamps, it will skip the execution. normalized_previous_iteration = previous_iteration.replace( hour=hour, minute=minute ) if normalized_previous_iteration > previous_iteration: previous_iteration = normalized_previous_iteration - datetime.timedelta( days=1 ) days_delay = int(interval) / 60 / 60 / 24 days_to_add = 0 if day_of_week is not None: days_to_add = ( list(calendar.day_name).index(day_of_week) - normalized_previous_iteration.weekday() ) next_iteration = ( previous_iteration + datetime.timedelta(days=days_delay) + datetime.timedelta(days=days_to_add) ).replace(hour=hour, minute=minute) if failures: try: next_iteration += datetime.timedelta(minutes=2 ** failures) except OverflowError: return False return now > next_iteration @gfk_type @generic_repr( "id", "name", "query_hash", "version", "user_id", "org_id", "data_source_id", "query_hash", "last_modified_by_id", "is_archived", "is_draft", "schedule", "schedule_failures", ) class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model): id = primary_key("Query") version = Column(db.Integer, default=1) org_id = Column(key_type("Organization"), db.ForeignKey("organizations.id")) org = db.relationship(Organization, backref="queries") data_source_id = Column(key_type("DataSource"), db.ForeignKey("data_sources.id"), nullable=True) data_source = db.relationship(DataSource, backref="queries") latest_query_data_id = Column( key_type("QueryResult"), db.ForeignKey("query_results.id"), nullable=True ) latest_query_data = db.relationship(QueryResult) name = Column(db.String(255)) description = Column(db.String(4096), nullable=True) query_text = Column("query", db.Text) query_hash = Column(db.String(32)) api_key = Column(db.String(40), default=lambda: generate_token(40)) user_id = Column(key_type("User"), db.ForeignKey("users.id")) user = db.relationship(User, foreign_keys=[user_id]) last_modified_by_id = Column(key_type("User"), db.ForeignKey("users.id"), nullable=True) last_modified_by = db.relationship( User, backref="modified_queries", foreign_keys=[last_modified_by_id] ) is_archived = Column(db.Boolean, default=False, index=True) is_draft = Column(db.Boolean, default=True, index=True) schedule = Column(MutableDict.as_mutable(PseudoJSON), nullable=True) interval = pseudo_json_cast_property(db.Integer, "schedule", "interval", default=0) schedule_failures = Column(db.Integer, default=0) visualizations = db.relationship("Visualization", cascade="all, delete-orphan") options = Column(MutableDict.as_mutable(PseudoJSON), default={}) search_vector = Column( TSVectorType( "id", "name", "description", "query", weights={"name": "A", "id": "B", "description": "C", "query": "D"}, ), nullable=True, ) tags = Column( "tags", MutableList.as_mutable(postgresql.ARRAY(db.Unicode)), nullable=True ) query_class = SearchBaseQuery __tablename__ = "queries" __mapper_args__ = {"version_id_col": version, "version_id_generator": False} def __str__(self): return str(self.id) def archive(self, user=None): db.session.add(self) self.is_archived = True self.schedule = None for vis in self.visualizations: for w in vis.widgets: db.session.delete(w) for a in self.alerts: db.session.delete(a) if user: self.record_changes(user) def regenerate_api_key(self): self.api_key = generate_token(40) @classmethod def create(cls, **kwargs): query = cls(**kwargs) db.session.add( Visualization( query_rel=query, name="Table", description="", type="TABLE", options="{}", ) ) return query @classmethod def all_queries( cls, group_ids, user_id=None, include_drafts=False, include_archived=False ): query_ids = ( db.session.query(distinct(cls.id)) .join( DataSourceGroup, Query.data_source_id == DataSourceGroup.data_source_id ) .filter(Query.is_archived.is_(include_archived)) .filter(DataSourceGroup.group_id.in_(group_ids)) ) queries = ( cls.query.options( joinedload(Query.user), joinedload(Query.latest_query_data).load_only( "runtime", "retrieved_at" ), ) .filter(cls.id.in_(query_ids)) # Adding outer joins to be able to order by relationship .outerjoin(User, User.id == Query.user_id) .outerjoin(QueryResult, QueryResult.id == Query.latest_query_data_id) .options( contains_eager(Query.user), contains_eager(Query.latest_query_data) ) ) if not include_drafts: queries = queries.filter( or_(Query.is_draft.is_(False), Query.user_id == user_id) ) return queries @classmethod def favorites(cls, user, base_query=None): if base_query is None: base_query = cls.all_queries(user.group_ids, user.id, include_drafts=True) return base_query.join( ( Favorite, and_(Favorite.object_type == "Query", Favorite.object_id == Query.id), ) ).filter(Favorite.user_id == user.id) @classmethod def all_tags(cls, user, include_drafts=False): queries = cls.all_queries( group_ids=user.group_ids, user_id=user.id, include_drafts=include_drafts ) tag_column = func.unnest(cls.tags).label("tag") usage_count = func.count(1).label("usage_count") query = ( db.session.query(tag_column, usage_count) .group_by(tag_column) .filter(Query.id.in_(queries.options(load_only("id")))) .order_by(usage_count.desc()) ) return query @classmethod def by_user(cls, user): return cls.all_queries(user.group_ids, user.id).filter(Query.user == user) @classmethod def by_api_key(cls, api_key): return cls.query.filter(cls.api_key == api_key).one() @classmethod def past_scheduled_queries(cls): now = utils.utcnow() queries = Query.query.filter(Query.schedule.isnot(None)).order_by(Query.id) return [ query for query in queries if query.schedule["until"] is not None and pytz.utc.localize( datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d") ) <= now ] @classmethod def outdated_queries(cls): queries = ( Query.query.options( joinedload(Query.latest_query_data).load_only("retrieved_at") ) .filter(Query.schedule.isnot(None)) .order_by(Query.id) .all() ) now = utils.utcnow() outdated_queries = {} scheduled_queries_executions.refresh() for query in queries: try: if query.schedule.get("disabled"): continue if query.schedule["until"]: schedule_until = pytz.utc.localize( datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d") ) if schedule_until <= now: continue retrieved_at = scheduled_queries_executions.get(query.id) or ( query.latest_query_data and query.latest_query_data.retrieved_at ) if should_schedule_next( retrieved_at or now, now, query.schedule["interval"], query.schedule["time"], query.schedule["day_of_week"], query.schedule_failures, ): key = "{}:{}".format(query.query_hash, query.data_source_id) outdated_queries[key] = query except Exception as e: query.schedule["disabled"] = True db.session.commit() message = ( "Could not determine if query %d is outdated due to %s. The schedule for this query has been disabled." % (query.id, repr(e)) ) logging.info(message) sentry.capture_exception( type(e)(message).with_traceback(e.__traceback__) ) return list(outdated_queries.values()) @classmethod def search( cls, term, group_ids, user_id=None, include_drafts=False, limit=None, include_archived=False, multi_byte_search=False, ): all_queries = cls.all_queries( group_ids, user_id=user_id, include_drafts=include_drafts, include_archived=include_archived, ) if multi_byte_search: # Since tsvector doesn't work well with CJK languages, use `ilike` too pattern = "%{}%".format(term) return ( all_queries.filter( or_(cls.name.ilike(pattern), cls.description.ilike(pattern)) ) .order_by(Query.id) .limit(limit) ) # sort the result using the weight as defined in the search vector column return all_queries.search(term, sort=True).limit(limit) @classmethod def search_by_user(cls, term, user, limit=None): return cls.by_user(user).search(term, sort=True).limit(limit) @classmethod def recent(cls, group_ids, user_id=None, limit=20): query = ( cls.query.filter(Event.created_at > (db.func.current_date() - 7)) .join(Event, Query.id == Event.object_id.cast(db.Integer)) .join( DataSourceGroup, Query.data_source_id == DataSourceGroup.data_source_id ) .filter( Event.action.in_( ["edit", "execute", "edit_name", "edit_description", "view_source"] ), Event.object_id != None, Event.object_type == "query", DataSourceGroup.group_id.in_(group_ids), or_(Query.is_draft == False, Query.user_id == user_id), Query.is_archived == False, ) .group_by(Event.object_id, Query.id) .order_by(db.desc(db.func.count(0))) ) if user_id: query = query.filter(Event.user_id == user_id) query = query.limit(limit) return query @classmethod def get_by_id(cls, _id): return cls.query.filter(cls.id == _id).one() @classmethod def all_groups_for_query_ids(cls, query_ids): query = """SELECT group_id, view_only FROM queries JOIN data_source_groups ON queries.data_source_id = data_source_groups.data_source_id WHERE queries.id in :ids""" return db.session.execute(query, {"ids": tuple(query_ids)}).fetchall() @classmethod def update_latest_result(cls, query_result): # TODO: Investigate how big an impact this select-before-update makes. queries = Query.query.filter( Query.query_hash == query_result.query_hash, Query.data_source == query_result.data_source, ) for q in queries: q.latest_query_data = query_result # don't auto-update the updated_at timestamp q.skip_updated_at = True db.session.add(q) query_ids = [q.id for q in queries] logging.info( "Updated %s queries with result (%s).", len(query_ids), query_result.query_hash, ) return query_ids def fork(self, user): forked_list = [ "org", "data_source", "latest_query_data", "description", "query_text", "query_hash", "options", "tags", ] kwargs = {a: getattr(self, a) for a in forked_list} # Query.create will add default TABLE visualization, so use constructor to create bare copy of query forked_query = Query( name="Copy of (#{}) {}".format(self.id, self.name), user=user, **kwargs ) for v in sorted(self.visualizations, key=lambda v: v.id): forked_v = v.copy() forked_v["query_rel"] = forked_query fv = Visualization( **forked_v ) # it will magically add it to `forked_query.visualizations` db.session.add(fv) db.session.add(forked_query) return forked_query @property def runtime(self): return self.latest_query_data.runtime @property def retrieved_at(self): return self.latest_query_data.retrieved_at @property def groups(self): if self.data_source is None: return {} return self.data_source.groups @hybrid_property def lowercase_name(self): "Optional property useful for sorting purposes." return self.name.lower() @lowercase_name.expression def lowercase_name(cls): "The SQLAlchemy expression for the property above." return func.lower(cls.name) @property def parameters(self): return self.options.get("parameters", []) @property def parameterized(self): return ParameterizedQuery(self.query_text, self.parameters, self.org) @property def dashboard_api_keys(self): query = """SELECT api_keys.api_key FROM api_keys JOIN dashboards ON object_id = dashboards.id JOIN widgets ON dashboards.id = widgets.dashboard_id JOIN visualizations ON widgets.visualization_id = visualizations.id WHERE object_type='dashboards' AND active=true AND visualizations.query_id = :id""" api_keys = db.session.execute(query, {"id": self.id}).fetchall() return [api_key[0] for api_key in api_keys] def update_query_hash(self): should_apply_auto_limit = self.options.get("apply_auto_limit", False) if self.options else False query_runner = self.data_source.query_runner if self.data_source else BaseQueryRunner({}) self.query_hash = query_runner.gen_query_hash(self.query_text, should_apply_auto_limit) @listens_for(Query, "before_insert") @listens_for(Query, "before_update") def receive_before_insert_update(mapper, connection, target): target.update_query_hash() @listens_for(Query.user_id, "set") def query_last_modified_by(target, val, oldval, initiator): target.last_modified_by_id = val @generic_repr("id", "object_type", "object_id", "user_id", "org_id") class Favorite(TimestampMixin, db.Model): id = primary_key("Favorite") org_id = Column(key_type("Organization"), db.ForeignKey("organizations.id")) object_type = Column(db.Unicode(255)) object_id = Column(key_type("Favorite")) object = generic_relationship(object_type, object_id) user_id = Column(key_type("User"), db.ForeignKey("users.id")) user = db.relationship(User, backref="favorites") __tablename__ = "favorites" __table_args__ = ( UniqueConstraint("object_type", "object_id", "user_id", name="unique_favorite"), ) @classmethod def is_favorite(cls, user, object): return cls.query.filter(cls.object == object, cls.user_id == user).count() > 0 @classmethod def are_favorites(cls, user, objects): objects = list(objects) if not objects: return [] object_type = str(objects[0].__class__.__name__) return [ fav.object_id for fav in cls.query.filter( cls.object_id.in_([o.id for o in objects]), cls.object_type == object_type, cls.user_id == user, ) ] OPERATORS = { ">": lambda v, t: v > t, ">=": lambda v, t: v >= t, "<": lambda v, t: v < t, "<=": lambda v, t: v <= t, "==": lambda v, t: v == t, "!=": lambda v, t: v != t, # backward compatibility "greater than": lambda v, t: v > t, "less than": lambda v, t: v < t, "equals": lambda v, t: v == t, } def next_state(op, value, threshold): if isinstance(value, bool): # If it's a boolean cast to string and lower case, because upper cased # boolean value is Python specific and most likely will be confusing to # users. value = str(value).lower() else: try: value = float(value) value_is_number = True except ValueError: value_is_number = isinstance(value, numbers.Number) if value_is_number: try: threshold = float(threshold) except ValueError: return Alert.UNKNOWN_STATE else: value = str(value) if op(value, threshold): new_state = Alert.TRIGGERED_STATE else: new_state = Alert.OK_STATE return new_state @generic_repr( "id", "name", "query_id", "user_id", "state", "last_triggered_at", "rearm" ) class Alert(TimestampMixin, BelongsToOrgMixin, db.Model): UNKNOWN_STATE = "unknown" OK_STATE = "ok" TRIGGERED_STATE = "triggered" id = primary_key("Alert") name = Column(db.String(255)) query_id = Column(key_type("Query"), db.ForeignKey("queries.id")) query_rel = db.relationship(Query, backref=backref("alerts", cascade="all")) user_id = Column(key_type("User"), db.ForeignKey("users.id")) user = db.relationship(User, backref="alerts") options = Column(MutableDict.as_mutable(PseudoJSON)) state = Column(db.String(255), default=UNKNOWN_STATE) subscriptions = db.relationship("AlertSubscription", cascade="all, delete-orphan") last_triggered_at = Column(db.DateTime(True), nullable=True) rearm = Column(db.Integer, nullable=True) __tablename__ = "alerts" @classmethod def all(cls, group_ids): return ( cls.query.options(joinedload(Alert.user), joinedload(Alert.query_rel)) .join(Query) .join( DataSourceGroup, DataSourceGroup.data_source_id == Query.data_source_id ) .filter(DataSourceGroup.group_id.in_(group_ids)) ) @classmethod def get_by_id_and_org(cls, object_id, org): return super(Alert, cls).get_by_id_and_org(object_id, org, Query) def evaluate(self): data = self.query_rel.latest_query_data.data if data["rows"] and self.options["column"] in data["rows"][0]: op = OPERATORS.get(self.options["op"], lambda v, t: False) value = data["rows"][0][self.options["column"]] threshold = self.options["value"] new_state = next_state(op, value, threshold) else: new_state = self.UNKNOWN_STATE return new_state def subscribers(self): return User.query.join(AlertSubscription).filter( AlertSubscription.alert == self ) def render_template(self, template): if template is None: return "" data = self.query_rel.latest_query_data.data host = base_url(self.query_rel.org) col_name = self.options["column"] if data["rows"] and col_name in data["rows"][0]: result_value = data["rows"][0][col_name] else: result_value = None context = { "ALERT_NAME": self.name, "ALERT_URL": "{host}/alerts/{alert_id}".format(host=host, alert_id=self.id), "ALERT_STATUS": self.state.upper(), "ALERT_CONDITION": self.options["op"], "ALERT_THRESHOLD": self.options["value"], "QUERY_NAME": self.query_rel.name, "QUERY_URL": "{host}/queries/{query_id}".format( host=host, query_id=self.query_rel.id ), "QUERY_RESULT_VALUE": result_value, "QUERY_RESULT_ROWS": data["rows"], "QUERY_RESULT_COLS": data["columns"], } return mustache_render(template, context) @property def custom_body(self): template = self.options.get("custom_body", self.options.get("template")) return self.render_template(template) @property def custom_subject(self): template = self.options.get("custom_subject") return self.render_template(template) @property def groups(self): return self.query_rel.groups @property def muted(self): return self.options.get("muted", False) def generate_slug(ctx): slug = utils.slugify(ctx.current_parameters["name"]) tries = 1 while Dashboard.query.filter(Dashboard.slug == slug).first() is not None: slug = utils.slugify(ctx.current_parameters["name"]) + "_" + str(tries) tries += 1 return slug @gfk_type @generic_repr( "id", "name", "slug", "user_id", "org_id", "version", "is_archived", "is_draft" ) class Dashboard(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model): id = primary_key("Dashboard") version = Column(db.Integer) org_id = Column(key_type("Organization"), db.ForeignKey("organizations.id")) org = db.relationship(Organization, backref="dashboards") slug = Column(db.String(140), index=True, default=generate_slug) name = Column(db.String(100)) user_id = Column(key_type("User"), db.ForeignKey("users.id")) user = db.relationship(User) # layout is no longer used, but kept so we know how to render old dashboards. layout = Column(db.Text) dashboard_filters_enabled = Column(db.Boolean, default=False) is_archived = Column(db.Boolean, default=False, index=True) is_draft = Column(db.Boolean, default=True, index=True) widgets = db.relationship("Widget", backref="dashboard", lazy="dynamic") tags = Column( "tags", MutableList.as_mutable(postgresql.ARRAY(db.Unicode)), nullable=True ) options = Column( MutableDict.as_mutable(postgresql.JSON), server_default="{}", default={} ) __tablename__ = "dashboards" __mapper_args__ = {"version_id_col": version} def __str__(self): return "%s=%s" % (self.id, self.name) @property def name_as_slug(self): return utils.slugify(self.name) @classmethod def all(cls, org, group_ids, user_id): query = ( Dashboard.query.options( joinedload(Dashboard.user).load_only( "id", "name", "_profile_image_url", "email" ) ).distinct(Dashboard.created_at, Dashboard.slug) .outerjoin(Widget) .outerjoin(Visualization) .outerjoin(Query) .outerjoin( DataSourceGroup, Query.data_source_id == DataSourceGroup.data_source_id ) .filter( Dashboard.is_archived == False, ( DataSourceGroup.group_id.in_(group_ids) | (Dashboard.user_id == user_id) ), Dashboard.org == org, ) ) query = query.filter( or_(Dashboard.user_id == user_id, Dashboard.is_draft == False) ) return query @classmethod def search(cls, org, groups_ids, user_id, search_term): # TODO: switch to FTS return cls.all(org, groups_ids, user_id).filter( cls.name.ilike("%{}%".format(search_term)) ) @classmethod def search_by_user(cls, term, user, limit=None): return cls.by_user(user).filter(cls.name.ilike("%{}%".format(term))).limit(limit) @classmethod def all_tags(cls, org, user): dashboards = cls.all(org, user.group_ids, user.id) tag_column = func.unnest(cls.tags).label("tag") usage_count = func.count(1).label("usage_count") query = ( db.session.query(tag_column, usage_count) .group_by(tag_column) .filter(Dashboard.id.in_(dashboards.options(load_only("id")))) .order_by(usage_count.desc()) ) return query @classmethod def favorites(cls, user, base_query=None): if base_query is None: base_query = cls.all(user.org, user.group_ids, user.id) return base_query.join( ( Favorite, and_( Favorite.object_type == "Dashboard", Favorite.object_id == Dashboard.id, ), ) ).filter(Favorite.user_id == user.id) @classmethod def by_user(cls, user): return cls.all(user.org, user.group_ids, user.id).filter(Dashboard.user == user) @classmethod def get_by_slug_and_org(cls, slug, org): return cls.query.filter(cls.slug == slug, cls.org == org).one() @hybrid_property def lowercase_name(self): "Optional property useful for sorting purposes." return self.name.lower() @lowercase_name.expression def lowercase_name(cls): "The SQLAlchemy expression for the property above." return func.lower(cls.name) @generic_repr("id", "name", "type", "query_id") class Visualization(TimestampMixin, BelongsToOrgMixin, db.Model): id = primary_key("Visualization") type = Column(db.String(100)) query_id = Column(key_type("Query"), db.ForeignKey("queries.id")) # query_rel and not query, because db.Model already has query defined. query_rel = db.relationship(Query, back_populates="visualizations") name = Column(db.String(255)) description = Column(db.String(4096), nullable=True) options = Column(db.Text) __tablename__ = "visualizations" def __str__(self): return "%s %s" % (self.id, self.type) @classmethod def get_by_id_and_org(cls, object_id, org): return super(Visualization, cls).get_by_id_and_org(object_id, org, Query) def copy(self): return { "type": self.type, "name": self.name, "description": self.description, "options": self.options, } @generic_repr("id", "visualization_id", "dashboard_id") class Widget(TimestampMixin, BelongsToOrgMixin, db.Model): id = primary_key("Widget") visualization_id = Column( key_type("Visualization"), db.ForeignKey("visualizations.id"), nullable=True ) visualization = db.relationship( Visualization, backref=backref("widgets", cascade="delete") ) text = Column(db.Text, nullable=True) width = Column(db.Integer) options = Column(db.Text) dashboard_id = Column(key_type("Dashboard"), db.ForeignKey("dashboards.id"), index=True) __tablename__ = "widgets" def __str__(self): return "%s" % self.id @classmethod def get_by_id_and_org(cls, object_id, org): return super(Widget, cls).get_by_id_and_org(object_id, org, Dashboard) @generic_repr( "id", "object_type", "object_id", "action", "user_id", "org_id", "created_at" ) class Event(db.Model): id = primary_key("Event") org_id = Column(key_type("Organization"), db.ForeignKey("organizations.id")) org = db.relationship(Organization, back_populates="events") user_id = Column(key_type("User"), db.ForeignKey("users.id"), nullable=True) user = db.relationship(User, backref="events") action = Column(db.String(255)) object_type = Column(db.String(255)) object_id = Column(db.String(255), nullable=True) additional_properties = Column( MutableDict.as_mutable(PseudoJSON), nullable=True, default={} ) created_at = Column(db.DateTime(True), default=db.func.now()) __tablename__ = "events" def __str__(self): return "%s,%s,%s,%s" % ( self.user_id, self.action, self.object_type, self.object_id, ) def to_dict(self): return { "org_id": self.org_id, "user_id": self.user_id, "action": self.action, "object_type": self.object_type, "object_id": self.object_id, "additional_properties": self.additional_properties, "created_at": self.created_at.isoformat(), } @classmethod def record(cls, event): org_id = event.pop("org_id") user_id = event.pop("user_id", None) action = event.pop("action") object_type = event.pop("object_type") object_id = event.pop("object_id", None) created_at = datetime.datetime.utcfromtimestamp(event.pop("timestamp")) event = cls( org_id=org_id, user_id=user_id, action=action, object_type=object_type, object_id=object_id, additional_properties=event, created_at=created_at, ) db.session.add(event) return event @generic_repr("id", "created_by_id", "org_id", "active") class ApiKey(TimestampMixin, GFKBase, db.Model): id = primary_key("ApiKey") org_id = Column(key_type("Organization"), db.ForeignKey("organizations.id")) org = db.relationship(Organization) api_key = Column(db.String(255), index=True, default=lambda: generate_token(40)) active = Column(db.Boolean, default=True) # 'object' provided by GFKBase object_id = Column(key_type("ApiKey")) created_by_id = Column(key_type("User"), db.ForeignKey("users.id"), nullable=True) created_by = db.relationship(User) __tablename__ = "api_keys" __table_args__ = ( db.Index("api_keys_object_type_object_id", "object_type", "object_id"), ) @classmethod def get_by_api_key(cls, api_key): return cls.query.filter(cls.api_key == api_key, cls.active == True).one() @classmethod def get_by_object(cls, object): return cls.query.filter( cls.object_type == object.__class__.__tablename__, cls.object_id == object.id, cls.active == True, ).first() @classmethod def create_for_object(cls, object, user): k = cls(org=user.org, object=object, created_by=user) db.session.add(k) return k @generic_repr("id", "name", "type", "user_id", "org_id", "created_at") class NotificationDestination(BelongsToOrgMixin, db.Model): id = primary_key("NotificationDestination") org_id = Column(key_type("Organization"), db.ForeignKey("organizations.id")) org = db.relationship(Organization, backref="notification_destinations") user_id = Column(key_type("User"), db.ForeignKey("users.id")) user = db.relationship(User, backref="notification_destinations") name = Column(db.String(255)) type = Column(db.String(255)) options = Column( "encrypted_options", ConfigurationContainer.as_mutable( EncryptedConfiguration( db.Text, settings.DATASOURCE_SECRET_KEY, FernetEngine ) ), ) created_at = Column(db.DateTime(True), default=db.func.now()) __tablename__ = "notification_destinations" __table_args__ = ( db.Index( "notification_destinations_org_id_name", "org_id", "name", unique=True ), ) def __str__(self): return str(self.name) def to_dict(self, all=False): d = { "id": self.id, "name": self.name, "type": self.type, "icon": self.destination.icon(), } if all: schema = get_configuration_schema_for_destination_type(self.type) self.options.set_schema(schema) d["options"] = self.options.to_dict(mask_secrets=True) return d @property def destination(self): return get_destination(self.type, self.options) @classmethod def all(cls, org): notification_destinations = cls.query.filter(cls.org == org).order_by( cls.id.asc() ) return notification_destinations def notify(self, alert, query, user, new_state, app, host): schema = get_configuration_schema_for_destination_type(self.type) self.options.set_schema(schema) return self.destination.notify( alert, query, user, new_state, app, host, self.options ) @generic_repr("id", "user_id", "destination_id", "alert_id") class AlertSubscription(TimestampMixin, db.Model): id = primary_key("AlertSubscription") user_id = Column(key_type("User"), db.ForeignKey("users.id")) user = db.relationship(User) destination_id = Column( key_type("NotificationDestination"), db.ForeignKey("notification_destinations.id"), nullable=True ) destination = db.relationship(NotificationDestination) alert_id = Column(key_type("Alert"), db.ForeignKey("alerts.id")) alert = db.relationship(Alert, back_populates="subscriptions") __tablename__ = "alert_subscriptions" __table_args__ = ( db.Index( "alert_subscriptions_destination_id_alert_id", "destination_id", "alert_id", unique=True, ), ) def to_dict(self): d = {"id": self.id, "user": self.user.to_dict(), "alert_id": self.alert_id} if self.destination: d["destination"] = self.destination.to_dict() return d @classmethod def all(cls, alert_id): return AlertSubscription.query.join(User).filter( AlertSubscription.alert_id == alert_id ) def notify(self, alert, query, user, new_state, app, host): if self.destination: return self.destination.notify(alert, query, user, new_state, app, host) else: # User email subscription, so create an email destination object config = {"addresses": self.user.email} schema = get_configuration_schema_for_destination_type("email") options = ConfigurationContainer(config, schema) destination = get_destination("email", options) return destination.notify(alert, query, user, new_state, app, host, options) @generic_repr("id", "trigger", "user_id", "org_id") class QuerySnippet(TimestampMixin, db.Model, BelongsToOrgMixin): id = primary_key("QuerySnippet") org_id = Column(key_type("Organization"), db.ForeignKey("organizations.id")) org = db.relationship(Organization, backref="query_snippets") trigger = Column(db.String(255), unique=True) description = Column(db.Text) user_id = Column(key_type("User"), db.ForeignKey("users.id")) user = db.relationship(User, backref="query_snippets") snippet = Column(db.Text) __tablename__ = "query_snippets" @classmethod def all(cls, org): return cls.query.filter(cls.org == org) def to_dict(self): d = { "id": self.id, "trigger": self.trigger, "description": self.description, "snippet": self.snippet, "user": self.user.to_dict(), "updated_at": self.updated_at, "created_at": self.created_at, } return d def init_db(): default_org = Organization(name="Default", slug="default", settings={}) admin_group = Group( name="admin", permissions=["admin", "super_admin"], org=default_org, type=Group.BUILTIN_GROUP, ) default_group = Group( name="default", permissions=Group.DEFAULT_PERMISSIONS, org=default_org, type=Group.BUILTIN_GROUP, ) db.session.add_all([default_org, admin_group, default_group]) # XXX remove after fixing User.group_ids db.session.commit() return default_org, admin_group, default_group