redash/tasks/queries/execution.py:

import signal
import time

import redis
from rq import get_current_job
from rq.exceptions import NoSuchJobError
from rq.job import JobStatus
from rq.timeouts import JobTimeoutException

from redash import models, redis_connection, settings
from redash.query_runner import InterruptException
from redash.tasks.alerts import check_alerts_for_query
from redash.tasks.failure_report import track_failure
from redash.tasks.worker import Job, Queue
from redash.utils import gen_query_hash, json_dumps, utcnow
from redash.worker import get_job_logger

logger = get_job_logger(__name__)

TIMEOUT_MESSAGE = "Query exceeded Redash query execution time limit."
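
# Each (data source id, query hash) pair maps to a Redis key holding the id of
# the job currently executing that query, which is how concurrent requests for
# the same query get deduplicated.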
def _job_lock_id(query_hash, data_source_id):
    return "query_hash_job:%s:%s" % (data_source_id, query_hash)


def _unlock(query_hash, data_source_id):
    redis_connection.delete(_job_lock_id(query_hash, data_source_id))

def enqueue_query(
    query, data_source, user_id, is_api_key=False, scheduled_query=None, metadata=None
):
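    """Enqueue a query for execution, deduplicating identical in-flight runs.

    A Redis lock keyed on (data source id, query hash) points at any job that
    is already running the same query. If that job is still alive it is
    reused; otherwise a new RQ job is enqueued and the lock is updated inside
    a WATCH/MULTI transaction, retrying up to five times on contention.
    """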
    if metadata is None:
        metadata = {}

    query_hash = gen_query_hash(query)
    logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
try_count = 0
job = None
while try_count < 5:
try_count += 1
pipe = redis_connection.pipeline()
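        # WATCH the lock key so the MULTI below fails with WatchError if
        # another worker touches the key first; the loop then retries.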
try:
pipe.watch(_job_lock_id(query_hash, data_source.id))
job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
if job_id:
logger.info("[%s] Found existing job: %s", query_hash, job_id)
job_complete = None
job_cancelled = None
try:
job = Job.fetch(job_id)
job_exists = True
status = job.get_status()
job_complete = status in [JobStatus.FINISHED, JobStatus.FAILED]
job_cancelled = job.is_cancelled
if job_complete:
message = "job found is complete (%s)" % status
elif job_cancelled:
message = "job found has ben cancelled"
except NoSuchJobError:
message = "job found has expired"
job_exists = False
lock_is_irrelevant = job_complete or job_cancelled or not job_exists
if lock_is_irrelevant:
logger.info("[%s] %s, removing lock", query_hash, message)
redis_connection.delete(_job_lock_id(query_hash, data_source.id))
job = None
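            # No live job holds the lock: enqueue a new one and take the lock
            # in a single MULTI transaction.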
if not job:
pipe.multi()
if scheduled_query:
queue_name = data_source.scheduled_queue_name
scheduled_query_id = scheduled_query.id
else:
queue_name = data_source.queue_name
scheduled_query_id = None
time_limit = settings.dynamic_settings.query_time_limit(
scheduled_query, user_id, data_source.org_id
)
metadata["Queue"] = queue_name
queue = Queue(queue_name)
enqueue_kwargs = {
"user_id": user_id,
"scheduled_query_id": scheduled_query_id,
"is_api_key": is_api_key,
"job_timeout": time_limit,
"failure_ttl": settings.JOB_DEFAULT_FAILURE_TTL,
"meta": {
"data_source_id": data_source.id,
"org_id": data_source.org_id,
"scheduled": scheduled_query_id is not None,
"query_id": metadata.get("query_id"),
"user_id": user_id,
},
}
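                # Only ad hoc runs get an explicit result TTL so their results
                # survive for JOB_EXPIRY_TIME; scheduled runs keep RQ's default.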
if not scheduled_query:
enqueue_kwargs["result_ttl"] = settings.JOB_EXPIRY_TIME
job = queue.enqueue(
execute_query, query, data_source.id, metadata, **enqueue_kwargs
)
logger.info("[%s] Created new job: %s", query_hash, job.id)
pipe.set(
_job_lock_id(query_hash, data_source.id),
job.id,
settings.JOB_EXPIRY_TIME,
)
pipe.execute()
break
except redis.WatchError:
continue
if not job:
logger.error("[Manager][%s] Failed adding job for query.", query_hash)
return job


def signal_handler(*args):
    raise InterruptException


class QueryExecutionError(Exception):
    pass


def _resolve_user(user_id, is_api_key, query_id):
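    """Resolve the executing principal: a User looked up by id, an ApiUser
    when invoked via API key (user_id then holds the key itself), or None
    when no user_id is given."""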
if user_id is not None:
if is_api_key:
api_key = user_id
if query_id is not None:
q = models.Query.get_by_id(query_id)
else:
q = models.Query.by_api_key(api_key)
return models.ApiUser(api_key, q.org, q.groups)
else:
return models.User.get_by_id(user_id)
else:
return None


class QueryExecutor(object):
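    """Runs a single query on the current RQ worker: loads the data source,
    user and query model, executes the query, and persists the result."""
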
def __init__(
self, query, data_source_id, user_id, is_api_key, metadata, is_scheduled_query
):
self.job = get_current_job()
self.query = query
self.data_source_id = data_source_id
self.metadata = metadata
self.data_source = self._load_data_source()
self.query_id = metadata.get("query_id")
self.user = _resolve_user(user_id, is_api_key, metadata.get("query_id"))
self.query_model = (
models.Query.query.get(self.query_id)
if self.query_id and self.query_id != "adhoc"
else None
)
# Close DB connection to prevent holding a connection for a long time while the query is executing.
models.db.session.close()
self.query_hash = gen_query_hash(self.query)
self.is_scheduled_query = is_scheduled_query
if self.is_scheduled_query:
            # Record this execution so the scheduler knows the query was just
            # run and does not immediately re-enqueue it:
models.scheduled_queries_executions.update(self.query_model.id)

    def run(self):
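        # Convert SIGINT (used to stop a cancelled job) into InterruptException
        # so the query runner can abort cleanly.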
signal.signal(signal.SIGINT, signal_handler)
started_at = time.time()
logger.debug("Executing query:\n%s", self.query)
self._log_progress("executing_query")
query_runner = self.data_source.query_runner
annotated_query = self._annotate_query(query_runner)
try:
data, error = query_runner.run_query(annotated_query, self.user)
except Exception as e:
if isinstance(e, JobTimeoutException):
error = TIMEOUT_MESSAGE
else:
error = str(e)
data = None
logger.warning("Unexpected error while running query:", exc_info=1)
run_time = time.time() - started_at
logger.info(
"job=execute_query query_hash=%s ds_id=%d data_length=%s error=[%s]",
self.query_hash,
self.data_source_id,
data and len(data),
error,
)
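        # Drop the dedup lock so a new job for this query can be enqueued.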
_unlock(self.query_hash, self.data_source.id)
if error is not None and data is None:
result = QueryExecutionError(error)
if self.is_scheduled_query:
self.query_model = models.db.session.merge(self.query_model, load=False)
track_failure(self.query_model, error)
raise result
else:
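            # Successful run: clear any consecutive scheduled-failure count.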
if self.query_model and self.query_model.schedule_failures > 0:
self.query_model = models.db.session.merge(self.query_model, load=False)
self.query_model.schedule_failures = 0
self.query_model.skip_updated_at = True
models.db.session.add(self.query_model)
query_result = models.QueryResult.store_result(
self.data_source.org_id,
self.data_source,
self.query_hash,
self.query,
data,
run_time,
utcnow(),
)
updated_query_ids = models.Query.update_latest_result(query_result)
models.db.session.commit() # make sure that alert sees the latest query result
self._log_progress("checking_alerts")
for query_id in updated_query_ids:
check_alerts_for_query.delay(query_id)
self._log_progress("finished")
result = query_result.id
models.db.session.commit()
return result

    def _annotate_query(self, query_runner):
self.metadata["Job ID"] = self.job.id
self.metadata["Query Hash"] = self.query_hash
self.metadata["Scheduled"] = self.is_scheduled_query
return query_runner.annotate_query(self.query, self.metadata)

    def _log_progress(self, state):
logger.info(
"job=execute_query state=%s query_hash=%s type=%s ds_id=%d "
"job_id=%s queue=%s query_id=%s username=%s",
state,
self.query_hash,
self.data_source.type,
self.data_source.id,
self.job.id,
self.metadata.get("Queue", "unknown"),
self.metadata.get("query_id", "unknown"),
self.metadata.get("Username", "unknown"),
)

    def _load_data_source(self):
logger.info("job=execute_query state=load_ds ds_id=%d", self.data_source_id)
return models.DataSource.query.get(self.data_source_id)


# user_id is added last as a keyword argument for backward compatibility -- to
# support jobs that were submitted before the upgrade to this version.
def execute_query(
query,
data_source_id,
metadata,
user_id=None,
scheduled_query_id=None,
is_api_key=False,
):
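    """RQ entry point for query execution.

    Returns the new query result id on success. On failure the session is
    rolled back and the QueryExecutionError is returned (rather than raised)
    so RQ stores it as the job's result.
    """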
try:
return QueryExecutor(
query,
data_source_id,
user_id,
is_api_key,
metadata,
scheduled_query_id is not None,
).run()
except QueryExecutionError as e:
models.db.session.rollback()
return e