in redash/tasks/queries/execution.py [0:0]
def run(self):
signal.signal(signal.SIGINT, signal_handler)
started_at = time.time()
logger.debug("Executing query:\n%s", self.query)
self._log_progress("executing_query")
query_runner = self.data_source.query_runner
annotated_query = self._annotate_query(query_runner)
try:
data, error = query_runner.run_query(annotated_query, self.user)
except Exception as e:
if isinstance(e, JobTimeoutException):
error = TIMEOUT_MESSAGE
else:
error = str(e)
data = None
logger.warning("Unexpected error while running query:", exc_info=1)
run_time = time.time() - started_at
logger.info(
"job=execute_query query_hash=%s ds_id=%d data_length=%s error=[%s]",
self.query_hash,
self.data_source_id,
data and len(data),
error,
)
_unlock(self.query_hash, self.data_source.id)
if error is not None and data is None:
result = QueryExecutionError(error)
if self.is_scheduled_query:
self.query_model = models.db.session.merge(self.query_model, load=False)
track_failure(self.query_model, error)
raise result
else:
if self.query_model and self.query_model.schedule_failures > 0:
self.query_model = models.db.session.merge(self.query_model, load=False)
self.query_model.schedule_failures = 0
self.query_model.skip_updated_at = True
models.db.session.add(self.query_model)
query_result = models.QueryResult.store_result(
self.data_source.org_id,
self.data_source,
self.query_hash,
self.query,
data,
run_time,
utcnow(),
)
updated_query_ids = models.Query.update_latest_result(query_result)
models.db.session.commit() # make sure that alert sees the latest query result
self._log_progress("checking_alerts")
for query_id in updated_query_ids:
check_alerts_for_query.delay(query_id)
self._log_progress("finished")
result = query_result.id
models.db.session.commit()
return result