redash/handlers/query_results.py (406 lines of code) (raw):
import logging
import time
import unicodedata
from flask import make_response, request
from flask_login import current_user
from flask_restful import abort
from werkzeug.urls import url_quote
from redash import models, settings
from redash.handlers.base import BaseResource, get_object_or_404, record_event
from redash.permissions import (
has_access,
not_view_only,
require_access,
require_permission,
require_any_of_permission,
view_only,
)
from redash.tasks import Job
from redash.tasks.queries import enqueue_query
from redash.utils import (
collect_parameters_from_request,
json_dumps,
utcnow,
to_filename,
)
from redash.models.parameterized_query import (
ParameterizedQuery,
InvalidParameterError,
QueryDetachedFromDataSourceError,
dropdown_values,
)
from redash.serializers import (
serialize_query_result,
serialize_query_result_to_dsv,
serialize_query_result_to_xlsx,
serialize_job,
)
from functools import partial
from sqlalchemy.orm.exc import NoResultFound
def error_response(message, http_status=400):
    """Build a job-shaped error payload paired with an HTTP status code.

    :param message: error text stored under ``job.error``.
    :param http_status: status code returned next to the payload (default 400).
    :return: ``(payload, http_status)`` where the payload is
        ``{"job": {"status": 4, "error": message}}``.
    """
    # Status 4 is presumably the "failed" job state -- confirm against the job model.
    job = {"status": 4, "error": message}
    return {"job": job}, http_status
# Pre-built (payload, status) error responses returned by the handlers in this
# module, keyed by a short identifier.
error_messages = dict(
    unsafe_when_shared=error_response(
        "This query contains potentially unsafe parameters and cannot be executed on a shared dashboard or an embedded visualization.",
        403,
    ),
    unsafe_on_view_only=error_response(
        "This query contains potentially unsafe parameters and cannot be executed with read-only access to this data source.",
        403,
    ),
    no_permission=error_response(
        "You do not have permission to run queries with this data source.",
        403,
    ),
    select_data_source=error_response(
        "Please select data source to run this query.",
        401,
    ),
)
def run_query(
    query, parameters, data_source, query_id, should_apply_auto_limit, max_age=0
):
    """Run a parameterized query, or return a cached result when allowed.

    :param query: parameterized query object; ``parameters`` and the current
        user's attributes are applied to it.
    :param parameters: parameter values to apply to the query.
    :param data_source: data source the query runs against.
    :param query_id: identifier stored in the recorded event and job metadata.
    :param should_apply_auto_limit: forwarded to the query runner's
        ``apply_auto_limit``.
    :param max_age: ``0`` skips the cached-result lookup; any other value is
        passed to ``QueryResult.get_latest``.
    :return: an ``error_response`` tuple when the data source is paused or
        parameters are missing; ``{"query_result": ...}`` when a cached result
        is found; otherwise the serialized job of the enqueued execution.
    """
    if data_source.paused:
        if not data_source.pause_reason:
            paused_message = "{} is paused. Please try later.".format(
                data_source.name
            )
        else:
            paused_message = "{} is paused ({}). Please try later.".format(
                data_source.name, data_source.pause_reason
            )
        return error_response(paused_message)

    try:
        query.apply(parameters)
        query.user_apply(current_user.attributes)
    except (InvalidParameterError, QueryDetachedFromDataSourceError) as exc:
        abort(400, message=str(exc))

    runner = data_source.query_runner
    query_text = runner.apply_auto_limit(query.text, should_apply_auto_limit)

    if query.missing_params:
        missing_names = ", ".join(query.missing_params)
        return error_response(
            "Missing parameter value for: {}".format(missing_names)
        )

    # max_age == 0 means "always execute": do not even look for a cached result.
    cached_result = None
    if max_age != 0:
        cached_result = models.QueryResult.get_latest(
            data_source, query_text, max_age
        )

    event = {
        "action": "execute_query",
        "cache": "hit" if cached_result else "miss",
        "object_id": data_source.id,
        "object_type": "data_source",
        "query": query_text,
        "query_id": query_id,
        "parameters": parameters,
    }
    record_event(current_user.org, current_user, event)

    if cached_result:
        serialized = serialize_query_result(
            cached_result, current_user.is_api_user()
        )
        return {"query_result": serialized}

    # API users have no email, so their repr is used as the job's username.
    if current_user.is_api_user():
        username = repr(current_user)
    else:
        username = current_user.email

    job = enqueue_query(
        query_text,
        data_source,
        current_user.id,
        current_user.is_api_user(),
        metadata={"Username": username, "query_id": query_id},
    )
    return serialize_job(job)
def get_download_filename(query_result, query, filetype):
    """Build the download file name for a query result.

    The name is ``<base>_<YYYY_MM_DD>.<filetype>``, where the date comes from
    ``query_result.retrieved_at``. The base is the sanitized query name when a
    query with a usable name is given, the query id when the name is missing,
    and the query result id when there is no query at all.

    :param query_result: object exposing ``id`` and ``retrieved_at``.
    :param query: object exposing ``id`` and ``name``, or a falsy value.
    :param filetype: extension appended to the file name.
    :return: the file name as a string.
    """
    retrieved_at = query_result.retrieved_at.strftime("%Y_%m_%d")

    if query:
        # A name of None used to be handed to to_filename(); treat every falsy
        # name like the empty string and fall back to the query id.
        filename = to_filename(query.name) if query.name else ""
        # NOTE(review): to_filename() is not visible here; if it can strip a
        # name down to nothing, the id is a better base than an empty string.
        filename = filename or str(query.id)
    else:
        filename = str(query_result.id)

    return "{}_{}.{}".format(filename, retrieved_at, filetype)
def content_disposition_filenames(attachment_filename):
    """Return the ``filename`` options for a Content-Disposition header.

    :param attachment_filename: file name as ``str``, or UTF-8 encoded bytes.
    :return: ``{"filename": <ascii bytes>}`` for a pure-ASCII name. Otherwise
        ``filename`` holds an ASCII approximation (NFKD-normalized, non-ASCII
        characters dropped) and ``filename*`` holds the percent-encoded
        UTF-8 form of the full name.
    """
    name = attachment_filename
    if not isinstance(name, str):
        name = name.decode("utf-8")

    try:
        return {"filename": name.encode("ascii")}
    except UnicodeEncodeError:
        ascii_fallback = unicodedata.normalize("NFKD", name).encode(
            "ascii", "ignore"
        )
        return {
            "filename": ascii_fallback,
            "filename*": "UTF-8''%s" % url_quote(name, safe=b""),
        }
class QueryResultListResource(BaseResource):
    """Run ad-hoc query text against a data source chosen in the request."""

    @require_permission("execute_query")
    def post(self):
        """
        Execute a query (or retrieve recent results).

        :qparam string query: The query text to execute
        :qparam number query_id: The query object to update with the result (optional)
        :qparam number max_age: If query results less than `max_age` seconds old are available,
                                return them, otherwise execute the query; if omitted or -1, returns
                                any cached result, or executes if not available. Set to zero to
                                always execute.
        :qparam number data_source_id: ID of data source to query
        :qparam object parameters: A set of parameter values to apply to the query.
        """
        payload = request.get_json(force=True)
        query_text = payload["query"]

        # max_age may be sent explicitly as null, and int(None) would fail.
        requested_max_age = payload.get("max_age", -1)
        max_age = -1 if requested_max_age is None else int(requested_max_age)

        query_id = payload.get("query_id", "adhoc")
        # Fall back to the URL parameters when the body carries none.
        parameters = payload.get(
            "parameters", collect_parameters_from_request(request.args)
        )

        parameterized_query = ParameterizedQuery(query_text, org=self.current_org)
        should_apply_auto_limit = payload.get("apply_auto_limit", False)

        data_source_id = payload.get("data_source_id")
        if not data_source_id:
            return error_messages["select_data_source"]

        data_source = models.DataSource.get_by_id_and_org(
            data_source_id, self.current_org
        )

        if not has_access(data_source, self.current_user, not_view_only):
            return error_messages["no_permission"]

        return run_query(
            parameterized_query,
            parameters,
            data_source,
            query_id,
            should_apply_auto_limit,
            max_age,
        )
# Seconds in an average (365.25-day) year; used as the Cache-Control max-age
# for query results fetched by id.
ONE_YEAR = 365.25 * 24 * 60 * 60
class QueryResultDropdownResource(BaseResource):
    """Serve the dropdown values produced by a single query."""

    def get(self, query_id):
        """Return the dropdown values of the query identified by ``query_id``.

        View access to the query's data source is required. When no stored
        values are found and the request carries URL arguments, the query is
        executed with those arguments instead; without arguments the lookup
        error is re-raised. A query detached from its data source yields a 400.
        """
        org = self.current_org
        query = get_object_or_404(models.Query.get_by_id_and_org, query_id, org)
        require_access(query.data_source, current_user, view_only)

        try:
            return dropdown_values(query_id, org)
        except NoResultFound:
            if not request.args:
                raise
            return QueryDropdownsResource.parametrized_result(query_id, org)
        except QueryDetachedFromDataSourceError as exc:
            abort(400, message=str(exc))
class QueryDropdownsResource(BaseResource):
    """Serve dropdown values of a query used by another query's parameter."""

    # parametrized_result() polls an enqueued job this many times, sleeping
    # this many seconds before each poll.
    _POLL_ATTEMPTS = 10
    _POLL_INTERVAL_SECONDS = 1

    def get(self, query_id, dropdown_query_id):
        """Return the dropdown values of ``dropdown_query_id``.

        View access to the parent query is always required. When the dropdown
        query is not referenced by one of the parent's query-type parameters,
        view access to the dropdown query's data source is required as well.

        When no stored values are found and the request carries URL arguments,
        the dropdown query is executed with those arguments; without arguments
        the lookup error is re-raised. A query detached from its data source
        yields a 400, matching ``QueryResultDropdownResource.get``.
        """
        query = get_object_or_404(
            models.Query.get_by_id_and_org, query_id, self.current_org
        )
        require_access(query, current_user, view_only)

        related_queries_ids = [
            p["queryId"] for p in query.parameters if p["type"] == "query"
        ]

        if int(dropdown_query_id) not in related_queries_ids:
            dropdown_query = get_object_or_404(
                models.Query.get_by_id_and_org, dropdown_query_id, self.current_org
            )
            require_access(dropdown_query.data_source, current_user, view_only)

        try:
            return dropdown_values(dropdown_query_id, self.current_org)
        except NoResultFound:
            if not request.args:
                raise
            return self.parametrized_result(dropdown_query_id, self.current_org)
        except QueryDetachedFromDataSourceError as e:
            abort(400, message=str(e))

    @staticmethod
    def parametrized_result(dropdown_query_id, current_org):
        """Execute the dropdown query with the request's arguments.

        The query is always executed (``max_age=0``) and the job is polled
        until it reports a result.

        NOTE(review): the polling sleeps inside the request, so a slow query
        blocks this worker for up to ``_POLL_ATTEMPTS * _POLL_INTERVAL_SECONDS``
        seconds.

        :param dropdown_query_id: id of the query producing the dropdown values.
        :param current_org: organization used to look up the query and result.
        :return: list of ``{"name": ..., "value": ...}`` dicts, one per row.
        :raises HTTPException: via ``abort`` when the query cannot be run, fails,
            or does not finish in time.
        """
        dropdown_query = get_object_or_404(
            models.Query.get_by_id_and_org, dropdown_query_id, current_org
        )
        parameterized_query = dropdown_query.parameterized
        args = QueryDropdownsResource.filter_args_by_query(
            parameterized_query=parameterized_query, args=request.args
        )
        rq = run_query(
            parameterized_query,
            args,
            dropdown_query.data_source,
            dropdown_query_id,
            False,
            0,
        )

        # run_query() reports problems (paused data source, missing
        # parameters) as an error_response() tuple of (payload, status).
        if isinstance(rq, tuple):
            payload, http_status = rq
            abort(http_status, message=payload["job"]["error"])

        result_id = None
        if "job" in rq:
            result_id = QueryDropdownsResource._wait_for_job_result(rq["job"]["id"])
        elif "query_result" in rq:
            result_id = rq["query_result"]["id"]

        if result_id is None:
            abort(
                504,
                message="The dropdown query did not finish in time. Please try again.",
            )
        # Presumably a failed execution stores the raised error as the job
        # result -- verify against the query execution task. Either way an
        # exception is not a usable result id.
        if isinstance(result_id, Exception):
            abort(400, message=str(result_id))

        query_result = models.QueryResult.get_by_id_and_org(result_id, current_org)

        columns = query_result.data["columns"]
        if not columns:
            # Without columns there is nothing to build options from.
            return []

        first_column = columns[0]["name"]
        pluck = partial(QueryDropdownsResource._pluck_name_and_value, first_column)
        return list(map(pluck, query_result.data["rows"]))

    @staticmethod
    def _wait_for_job_result(job_id):
        """Poll the job until it has a result; return it, or None on timeout."""
        for _ in range(QueryDropdownsResource._POLL_ATTEMPTS):
            time.sleep(QueryDropdownsResource._POLL_INTERVAL_SECONDS)
            result = Job.fetch(job_id).result
            if result is not None:
                return result
        return None

    @staticmethod
    def filter_args_by_query(parameterized_query: ParameterizedQuery, args) -> dict:
        """Keep only the request arguments that match the query's parameters.

        A key ending in ``[]`` is treated as multi-valued: its value becomes the
        full list from ``args.getlist`` and the suffix is dropped from the name.
        When several arguments map to the same name, the first one wins.

        :param parameterized_query: object whose ``schema`` is an iterable of
            dicts with a ``name`` key.
        :param args: multi-dict of request arguments (needs ``items`` and
            ``getlist``).
        :return: dict of parameter name to value, in schema order.
        """
        values_by_name = {}
        for key, value in args.items():
            # Only a trailing "[]" marks a list; a "[]" elsewhere in the key
            # must not cause the last two characters to be stripped.
            if key.endswith("[]"):
                value = args.getlist(key)
                key = key[:-2]
            values_by_name.setdefault(key, value)

        return {
            sch["name"]: values_by_name[sch["name"]]
            for sch in parameterized_query.schema
            if sch["name"] in values_by_name
        }

    @staticmethod
    def _pluck_name_and_value(default_column, row):
        """Turn a result row into a ``{"name", "value"}`` dropdown option.

        Column names are matched case-insensitively. ``name`` and ``value``
        columns are used when present; otherwise ``default_column`` supplies
        the missing one. The value is always converted to ``str``.
        """
        row = {k.lower(): v for k, v in row.items()}
        fallback = default_column.lower()

        name_column = "name" if "name" in row else fallback
        value_column = "value" if "value" in row else fallback

        return {"name": row[name_column], "value": str(row[value_column])}
class QueryResultResource(BaseResource):
    """Execute saved queries and serve query results in several file formats."""

    @staticmethod
    def add_cors_headers(headers):
        """Add CORS headers to ``headers`` when the request's Origin is allowed.

        The origin is allowed when it, or ``"*"``, is listed in
        ``settings.ACCESS_CONTROL_ALLOW_ORIGIN``. ``headers`` is updated in place.
        """
        if "Origin" in request.headers:
            origin = request.headers["Origin"]

            if set(["*", origin]) & settings.ACCESS_CONTROL_ALLOW_ORIGIN:
                headers["Access-Control-Allow-Origin"] = origin
                headers["Access-Control-Allow-Credentials"] = str(
                    settings.ACCESS_CONTROL_ALLOW_CREDENTIALS
                ).lower()

    @require_any_of_permission(("view_query", "execute_query"))
    def options(self, query_id=None, query_result_id=None, filetype="json"):
        """Answer a CORS preflight request with the configured headers."""
        headers = {}
        self.add_cors_headers(headers)

        if settings.ACCESS_CONTROL_REQUEST_METHOD:
            headers[
                "Access-Control-Request-Method"
            ] = settings.ACCESS_CONTROL_REQUEST_METHOD

        if settings.ACCESS_CONTROL_ALLOW_HEADERS:
            headers[
                "Access-Control-Allow-Headers"
            ] = settings.ACCESS_CONTROL_ALLOW_HEADERS

        return make_response("", 200, headers)

    @require_any_of_permission(("view_query", "execute_query"))
    def post(self, query_id):
        """
        Execute a saved query.

        :param number query_id: The ID of the query whose results should be fetched.
        :param object parameters: The parameter values to apply to the query.
        :qparam number max_age: If query results less than `max_age` seconds old are available,
                                return them, otherwise execute the query; if omitted or -1, returns
                                any cached result, or executes if not available. Set to zero to
                                always execute.
        """
        params = request.get_json(force=True, silent=True) or {}
        parameter_values = params.get("parameters", {})

        max_age = params.get("max_age", -1)
        # max_age might have the value of None, in which case calling int(None) will fail
        if max_age is None:
            max_age = -1
        max_age = int(max_age)

        query = get_object_or_404(
            models.Query.get_by_id_and_org, query_id, self.current_org
        )

        # Only queries whose parameters are all safe may run with view-only access.
        allow_executing_with_view_only_permissions = query.parameterized.is_safe
        should_apply_auto_limit = params.get("apply_auto_limit", False)

        if has_access(
            query, self.current_user, allow_executing_with_view_only_permissions
        ):
            return run_query(
                query.parameterized,
                parameter_values,
                query.data_source,
                query_id,
                should_apply_auto_limit,
                max_age,
            )

        # Access denied: pick the message that explains why.
        if query.parameterized.is_safe:
            return error_messages["no_permission"]
        if current_user.is_api_user():
            return error_messages["unsafe_when_shared"]
        return error_messages["unsafe_on_view_only"]

    @require_any_of_permission(("view_query", "execute_query"))
    def get(self, query_id=None, query_result_id=None, filetype="json"):
        """
        Retrieve query results.

        :param number query_id: The ID of the query whose results should be fetched
        :param number query_result_id: the ID of the query result to fetch
        :param string filetype: Format to return. One of 'json', 'xlsx', 'csv', or 'tsv'. Defaults to 'json'.

        :<json number id: Query result ID
        :<json string query: Query that produced this result
        :<json string query_hash: Hash code for query text
        :<json object data: Query output
        :<json number data_source_id: ID of data source that produced this result
        :<json number runtime: Length of execution time in seconds
        :<json string retrieved_at: Query retrieval date/time, in ISO format
        """
        # TODO:
        # This method handles two cases: retrieving result by id & retrieving result by query id.
        # They need to be split, as they have different logic (for example, retrieving by query id
        # should check for query parameters and shouldn't cache the result).
        should_cache = query_result_id is not None

        query_result = None
        query = None

        if query_result_id:
            query_result = get_object_or_404(
                models.QueryResult.get_by_id_and_org, query_result_id, self.current_org
            )

        if query_id is not None:
            query = get_object_or_404(
                models.Query.get_by_id_and_org, query_id, self.current_org
            )

            # No explicit result requested: fall back to the query's latest one.
            if (
                query_result is None
                and query is not None
                and query.latest_query_data_id is not None
            ):
                query_result = get_object_or_404(
                    models.QueryResult.get_by_id_and_org,
                    query.latest_query_data_id,
                    self.current_org,
                )

            # API users only get a result produced by the query's current text.
            if (
                query is not None
                and query_result is not None
                and self.current_user.is_api_user()
            ):
                if query.query_hash != query_result.query_hash:
                    abort(404, message="No cached result found for this query.")

        if not query_result:
            abort(404, message="No cached result found for this query.")

        require_access(query_result.data_source, self.current_user, view_only)

        if isinstance(self.current_user, models.ApiUser):
            event = {
                "user_id": None,
                "org_id": self.current_org.id,
                "action": "api_get",
                "api_key": self.current_user.name,
                "file_type": filetype,
                "user_agent": request.user_agent.string,
                "ip": request.remote_addr,
            }

            if query_id:
                event["object_type"] = "query"
                event["object_id"] = query_id
            else:
                event["object_type"] = "query_result"
                event["object_id"] = query_result_id

            self.record_event(event)

        response_builders = {
            "json": self.make_json_response,
            "xlsx": self.make_excel_response,
            "csv": self.make_csv_response,
            "tsv": self.make_tsv_response,
        }
        # filetype comes from the request; an unknown one must be a client
        # error rather than an unhandled KeyError.
        build_response = response_builders.get(filetype)
        if build_response is None:
            abort(400, message="Unsupported file type: {}".format(filetype))
        response = build_response(query_result)

        if len(settings.ACCESS_CONTROL_ALLOW_ORIGIN) > 0:
            self.add_cors_headers(response.headers)

        if should_cache:
            response.headers.add_header(
                "Cache-Control", "private,max-age=%d" % ONE_YEAR
            )

        filename = get_download_filename(query_result, query, filetype)
        filenames = content_disposition_filenames(filename)
        response.headers.add("Content-Disposition", "attachment", **filenames)

        return response

    @staticmethod
    def make_json_response(query_result):
        """Return the query result serialized as a JSON response."""
        data = json_dumps({"query_result": query_result.to_dict()})
        headers = {"Content-Type": "application/json"}
        return make_response(data, 200, headers)

    @staticmethod
    def make_csv_response(query_result):
        """Return the query result as a comma-separated response."""
        headers = {"Content-Type": "text/csv; charset=UTF-8"}
        return make_response(
            serialize_query_result_to_dsv(query_result, ","), 200, headers
        )

    @staticmethod
    def make_tsv_response(query_result):
        """Return the query result as a tab-separated response."""
        headers = {"Content-Type": "text/tab-separated-values; charset=UTF-8"}
        return make_response(
            serialize_query_result_to_dsv(query_result, "\t"), 200, headers
        )

    @staticmethod
    def make_excel_response(query_result):
        """Return the query result as an XLSX response."""
        headers = {
            "Content-Type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        }
        return make_response(serialize_query_result_to_xlsx(query_result), 200, headers)
class JobResource(BaseResource):
    """Inspect or cancel a query execution job."""

    def get(self, job_id, query_id=None):
        """
        Retrieve info about a running query job.
        """
        return serialize_job(Job.fetch(job_id))

    def delete(self, job_id):
        """
        Cancel a query job in progress.
        """
        Job.fetch(job_id).cancel()