# redash/monitor.py — helpers that assemble status/monitoring data for the admin page.
from __future__ import absolute_import
import itertools
from funcy import flatten
from sqlalchemy import union_all
from redash import redis_connection, rq_redis_connection, __version__, settings
from redash.models import db, DataSource, Query, QueryResult, Dashboard, Widget
from redash.utils import json_loads
from rq import Queue, Worker
from rq.job import Job
from rq.registry import StartedJobRegistry
def get_redis_status():
    """Summarize Redis memory usage for the status payload."""
    redis_info = redis_connection.info()
    return {
        "redis_used_memory": redis_info["used_memory"],
        "redis_used_memory_human": redis_info["used_memory_human"],
    }
def get_object_counts():
    """Count the core model objects (queries, results, dashboards, widgets).

    The two QueryResult counts are gated behind
    ``FEATURE_SHOW_QUERY_RESULTS_COUNT`` because they can be expensive on
    large installations.
    """
    counts = {"queries_count": Query.query.count()}
    if settings.FEATURE_SHOW_QUERY_RESULTS_COUNT:
        counts["query_results_count"] = QueryResult.query.count()
        counts["unused_query_results_count"] = QueryResult.unused().count()
    counts["dashboards_count"] = Dashboard.query.count()
    counts["widgets_count"] = Widget.query.count()
    return counts
def get_queues_status():
    """Map each RQ queue name to its current length."""
    status = {}
    for queue in Queue.all(connection=rq_redis_connection):
        status[queue.name] = {"size": len(queue)}
    return status
def get_db_sizes():
    """Collect PostgreSQL size metrics as ``[label, size_in_bytes]`` pairs."""
    size_queries = (
        (
            "Query Results Size",
            "select pg_total_relation_size('query_results') as size from (select 1) as a",
        ),
        ("Redash DB Size", "select pg_database_size(current_database()) as size"),
    )
    metrics = []
    for label, sql in size_queries:
        row = db.session.execute(sql).first()
        metrics.append([label, row[0]])
    return metrics
def get_status():
    """Assemble the full status payload: version, Redis, object counts,
    manager/queue state, and database size metrics."""
    status = {
        "version": __version__,
        "workers": [],
    }
    status.update(get_redis_status())
    status.update(get_object_counts())
    manager = redis_connection.hgetall("redash:status")
    manager["queues"] = get_queues_status()
    status["manager"] = manager
    status["database_metrics"] = {"metrics": get_db_sizes()}
    return status
def rq_job_ids():
    """Return the ids of all RQ jobs that are currently started or queued.

    RQ state lives on the dedicated ``rq_redis_connection`` (see
    ``get_queues_status`` and ``fetch_jobs``), not the general-purpose
    ``redis_connection``. Using the wrong connection here silently returned
    no queues and therefore no job ids — that is the bug being fixed.
    """
    queues = Queue.all(connection=rq_redis_connection)
    # Jobs currently being executed (tracked per-queue by the registry)...
    started_jobs = [StartedJobRegistry(queue=q).get_job_ids() for q in queues]
    # ...plus jobs still waiting in each queue.
    queued_jobs = [q.job_ids for q in queues]
    return flatten(started_jobs + queued_jobs)
def fetch_jobs(job_ids):
    """Describe each still-existing RQ job among *job_ids* as a plain dict."""
    descriptions = []
    for job in Job.fetch_many(job_ids, connection=rq_redis_connection):
        # fetch_many yields None for ids whose job has since expired/been deleted.
        if job is None:
            continue
        descriptions.append(
            {
                "id": job.id,
                "name": job.func_name,
                "origin": job.origin,
                "enqueued_at": job.enqueued_at,
                "started_at": job.started_at,
                "meta": job.meta,
            }
        )
    return descriptions
def rq_queues():
    """Return a mapping of queue name -> started jobs and queued-job count.

    Passes ``connection=rq_redis_connection`` explicitly: the rest of this
    module (``get_queues_status``, ``fetch_jobs``) talks to the dedicated RQ
    Redis instance, and relying on RQ's implicit default connection here
    could read a different (empty) Redis database.
    """
    return {
        q.name: {
            "name": q.name,
            # Jobs currently executing, expanded into detail dicts.
            "started": fetch_jobs(StartedJobRegistry(queue=q).get_job_ids()),
            # Jobs still waiting — only the count is reported.
            "queued": len(q.job_ids),
        }
        for q in sorted(
            Queue.all(connection=rq_redis_connection), key=lambda q: q.name
        )
    }
def describe_job(job):
    """Return a short ``"<id> (<function>)"`` label for *job*, or None if absent."""
    if not job:
        return None
    # Only the final dotted component of the function path is shown.
    short_name = job.func_name.split(".")[-1]
    return "{} ({})".format(job.id, short_name)
def rq_workers():
    """Describe every registered RQ worker as a plain dict.

    Passes ``connection=rq_redis_connection`` explicitly so the worker list
    is read from the same Redis instance the rest of this module's RQ
    helpers use, instead of RQ's implicit default connection.
    """
    return [
        {
            "name": w.name,
            "hostname": w.hostname,
            "pid": w.pid,
            "queues": ", ".join([q.name for q in w.queues]),
            "state": w.state,
            "last_heartbeat": w.last_heartbeat,
            "birth_date": w.birth_date,
            "current_job": describe_job(w.get_current_job()),
            "successful_jobs": w.successful_job_count,
            "failed_jobs": w.failed_job_count,
            "total_working_time": w.total_working_time,
        }
        for w in Worker.all(connection=rq_redis_connection)
    ]
def rq_status():
    """Combined snapshot of RQ queues and workers for the admin page."""
    return {
        "queues": rq_queues(),
        "workers": rq_workers(),
    }