redash/handlers/queries.py (345 lines of code) (raw):
import sqlparse
from flask import jsonify, request, url_for
from flask_login import login_required
from flask_restful import abort
from sqlalchemy.orm.exc import StaleDataError
from funcy import partial
from redash import models, settings
from redash.authentication.org_resolving import current_org
from redash.handlers.base import (
BaseResource,
filter_by_tags,
get_object_or_404,
org_scoped_rule,
paginate,
routes,
order_results as _order_results,
)
from redash.handlers.query_results import run_query
from redash.permissions import (
can_modify,
not_view_only,
require_access,
require_admin_or_owner,
require_object_modify_permission,
require_permission,
view_only,
)
from redash.utils import collect_parameters_from_request
from redash.serializers import QuerySerializer
from redash.models.parameterized_query import ParameterizedQuery
# Ordering map for relationships
order_map = {
"name": "lowercase_name",
"-name": "-lowercase_name",
"created_at": "created_at",
"-created_at": "-created_at",
"schedule": "interval",
"-schedule": "-interval",
"runtime": "query_results-runtime",
"-runtime": "-query_results-runtime",
"executed_at": "query_results-retrieved_at",
"-executed_at": "-query_results-retrieved_at",
"created_by": "users-name",
"-created_by": "-users-name",
}
order_results = partial(
_order_results, default_order="-created_at", allowed_orders=order_map
)
@routes.route(org_scoped_rule("/api/queries/format"), methods=["POST"])
@login_required
def format_sql_query(org_slug=None):
"""
Formats an SQL query using the Python ``sqlparse`` formatter.
:<json string query: The SQL text to format
:>json string query: Formatted SQL text
"""
arguments = request.get_json(force=True)
query = arguments.get("query", "")
return jsonify(
{"query": sqlparse.format(query, **settings.SQLPARSE_FORMAT_OPTIONS)}
)
class QuerySearchResource(BaseResource):
@require_permission("view_query")
def get(self):
"""
Search query text, names, and descriptions.
:qparam string q: Search term
:qparam number include_drafts: Whether to include draft in results
Responds with a list of :ref:`query <query-response-label>` objects.
"""
term = request.args.get("q", "")
if not term:
return []
include_drafts = request.args.get("include_drafts") is not None
self.record_event({"action": "search", "object_type": "query", "term": term})
# this redirects to the new query list API that is aware of search
new_location = url_for(
"queries",
q=term,
org_slug=current_org.slug,
drafts="true" if include_drafts else "false",
)
return {}, 301, {"Location": new_location}
class QueryRecentResource(BaseResource):
@require_permission("view_query")
def get(self):
"""
Retrieve up to 10 queries recently modified by the user.
Responds with a list of :ref:`query <query-response-label>` objects.
"""
results = (
models.Query.by_user(self.current_user)
.order_by(models.Query.updated_at.desc())
.limit(10)
)
return QuerySerializer(
results, with_last_modified_by=False, with_user=False
).serialize()
class BaseQueryListResource(BaseResource):
def get_queries(self, search_term):
if search_term:
results = models.Query.search(
search_term,
self.current_user.group_ids,
self.current_user.id,
include_drafts=True,
multi_byte_search=current_org.get_setting("multi_byte_search_enabled"),
)
else:
results = models.Query.all_queries(
self.current_user.group_ids, self.current_user.id, include_drafts=True
)
return filter_by_tags(results, models.Query.tags)
@require_permission("view_query")
def get(self):
"""
Retrieve a list of queries.
:qparam number page_size: Number of queries to return per page
:qparam number page: Page number to retrieve
:qparam number order: Name of column to order by
:qparam number q: Full text search term
Responds with an array of :ref:`query <query-response-label>` objects.
"""
# See if we want to do full-text search or just regular queries
search_term = request.args.get("q", "")
queries = self.get_queries(search_term)
results = filter_by_tags(queries, models.Query.tags)
# order results according to passed order parameter,
# special-casing search queries where the database
# provides an order by search rank
ordered_results = order_results(results, fallback=not bool(search_term))
page = request.args.get("page", 1, type=int)
page_size = request.args.get("page_size", 25, type=int)
response = paginate(
ordered_results,
page=page,
page_size=page_size,
serializer=QuerySerializer,
with_stats=True,
with_last_modified_by=False,
)
if search_term:
self.record_event(
{"action": "search", "object_type": "query", "term": search_term}
)
else:
self.record_event({"action": "list", "object_type": "query"})
return response
def require_access_to_dropdown_queries(user, query_def):
parameters = query_def.get("options", {}).get("parameters", [])
dropdown_query_ids = set(
[str(p["queryId"]) for p in parameters if p["type"] == "query"]
)
if dropdown_query_ids:
groups = models.Query.all_groups_for_query_ids(dropdown_query_ids)
if len(groups) < len(dropdown_query_ids):
abort(
400,
message="You are trying to associate a dropdown query that does not have a matching group. "
"Please verify the dropdown query id you are trying to associate with this query.",
)
require_access(dict(groups), user, view_only)
class QueryListResource(BaseQueryListResource):
@require_permission("create_query")
def post(self):
"""
Create a new query.
:<json number data_source_id: The ID of the data source this query will run on
:<json string query: Query text
:<json string name:
:<json string description:
:<json string schedule: Schedule interval, in seconds, for repeated execution of this query
:<json object options: Query options
.. _query-response-label:
:>json number id: Query ID
:>json number latest_query_data_id: ID for latest output data from this query
:>json string name:
:>json string description:
:>json string query: Query text
:>json string query_hash: Hash of query text
:>json string schedule: Schedule interval, in seconds, for repeated execution of this query
:>json string api_key: Key for public access to this query's results.
:>json boolean is_archived: Whether this query is displayed in indexes and search results or not.
:>json boolean is_draft: Whether this query is a draft or not
:>json string updated_at: Time of last modification, in ISO format
:>json string created_at: Time of creation, in ISO format
:>json number data_source_id: ID of the data source this query will run on
:>json object options: Query options
:>json number version: Revision version (for update conflict avoidance)
:>json number user_id: ID of query creator
:>json number last_modified_by_id: ID of user who last modified this query
:>json string retrieved_at: Time when query results were last retrieved, in ISO format (may be null)
:>json number runtime: Runtime of last query execution, in seconds (may be null)
"""
query_def = request.get_json(force=True)
data_source = models.DataSource.get_by_id_and_org(
query_def.pop("data_source_id"), self.current_org
)
require_access(data_source, self.current_user, not_view_only)
require_access_to_dropdown_queries(self.current_user, query_def)
for field in [
"id",
"created_at",
"api_key",
"visualizations",
"latest_query_data",
"last_modified_by",
]:
query_def.pop(field, None)
query_def["query_text"] = query_def.pop("query")
query_def["user"] = self.current_user
query_def["data_source"] = data_source
query_def["org"] = self.current_org
query_def["is_draft"] = True
query = models.Query.create(**query_def)
models.db.session.add(query)
models.db.session.commit()
self.record_event(
{"action": "create", "object_id": query.id, "object_type": "query"}
)
return QuerySerializer(query, with_visualizations=True).serialize()
class QueryArchiveResource(BaseQueryListResource):
def get_queries(self, search_term):
if search_term:
return models.Query.search(
search_term,
self.current_user.group_ids,
self.current_user.id,
include_drafts=False,
include_archived=True,
multi_byte_search=current_org.get_setting("multi_byte_search_enabled"),
)
else:
return models.Query.all_queries(
self.current_user.group_ids,
self.current_user.id,
include_drafts=False,
include_archived=True,
)
class MyQueriesResource(BaseResource):
@require_permission("view_query")
def get(self):
"""
Retrieve a list of queries created by the current user.
:qparam number page_size: Number of queries to return per page
:qparam number page: Page number to retrieve
:qparam number order: Name of column to order by
:qparam number search: Full text search term
Responds with an array of :ref:`query <query-response-label>` objects.
"""
search_term = request.args.get("q", "")
if search_term:
results = models.Query.search_by_user(search_term, self.current_user)
else:
results = models.Query.by_user(self.current_user)
results = filter_by_tags(results, models.Query.tags)
# order results according to passed order parameter,
# special-casing search queries where the database
# provides an order by search rank
ordered_results = order_results(results, fallback=not bool(search_term))
page = request.args.get("page", 1, type=int)
page_size = request.args.get("page_size", 25, type=int)
return paginate(
ordered_results,
page,
page_size,
QuerySerializer,
with_stats=True,
with_last_modified_by=False,
)
class QueryResource(BaseResource):
@require_permission("edit_query")
def post(self, query_id):
"""
Modify a query.
:param query_id: ID of query to update
:<json number data_source_id: The ID of the data source this query will run on
:<json string query: Query text
:<json string name:
:<json string description:
:<json string schedule: Schedule interval, in seconds, for repeated execution of this query
:<json object options: Query options
Responds with the updated :ref:`query <query-response-label>` object.
"""
query = get_object_or_404(
models.Query.get_by_id_and_org, query_id, self.current_org
)
query_def = request.get_json(force=True)
require_object_modify_permission(query, self.current_user)
require_access_to_dropdown_queries(self.current_user, query_def)
for field in [
"id",
"created_at",
"api_key",
"visualizations",
"latest_query_data",
"user",
"last_modified_by",
"org",
]:
query_def.pop(field, None)
if "query" in query_def:
query_def["query_text"] = query_def.pop("query")
if "tags" in query_def:
query_def["tags"] = [tag for tag in query_def["tags"] if tag]
if "data_source_id" in query_def:
data_source = models.DataSource.get_by_id_and_org(
query_def["data_source_id"], self.current_org
)
require_access(data_source, self.current_user, not_view_only)
query_def["last_modified_by"] = self.current_user
query_def["changed_by"] = self.current_user
# SQLAlchemy handles the case where a concurrent transaction beats us
# to the update. But we still have to make sure that we're not starting
# out behind.
if "version" in query_def and query_def["version"] != query.version:
abort(409)
try:
self.update_model(query, query_def)
models.db.session.commit()
except StaleDataError:
abort(409)
return QuerySerializer(query, with_visualizations=True).serialize()
@require_permission("view_query")
def get(self, query_id):
"""
Retrieve a query.
:param query_id: ID of query to fetch
Responds with the :ref:`query <query-response-label>` contents.
"""
q = get_object_or_404(
models.Query.get_by_id_and_org, query_id, self.current_org
)
require_access(q, self.current_user, view_only)
result = QuerySerializer(q, with_visualizations=True).serialize()
result["can_edit"] = can_modify(q, self.current_user)
self.record_event(
{"action": "view", "object_id": query_id, "object_type": "query"}
)
return result
# TODO: move to resource of its own? (POST /queries/{id}/archive)
def delete(self, query_id):
"""
Archives a query.
:param query_id: ID of query to archive
"""
query = get_object_or_404(
models.Query.get_by_id_and_org, query_id, self.current_org
)
require_admin_or_owner(query.user_id)
query.archive(self.current_user)
models.db.session.commit()
class QueryRegenerateApiKeyResource(BaseResource):
@require_permission("edit_query")
def post(self, query_id):
query = get_object_or_404(
models.Query.get_by_id_and_org, query_id, self.current_org
)
require_admin_or_owner(query.user_id)
query.regenerate_api_key()
models.db.session.commit()
self.record_event(
{
"action": "regnerate_api_key",
"object_id": query_id,
"object_type": "query",
}
)
result = QuerySerializer(query).serialize()
return result
class QueryForkResource(BaseResource):
@require_permission("edit_query")
def post(self, query_id):
"""
Creates a new query, copying the query text from an existing one.
:param query_id: ID of query to fork
Responds with created :ref:`query <query-response-label>` object.
"""
query = get_object_or_404(
models.Query.get_by_id_and_org, query_id, self.current_org
)
require_access(query.data_source, self.current_user, not_view_only)
forked_query = query.fork(self.current_user)
models.db.session.commit()
self.record_event(
{"action": "fork", "object_id": query_id, "object_type": "query"}
)
return QuerySerializer(forked_query, with_visualizations=True).serialize()
class QueryRefreshResource(BaseResource):
def post(self, query_id):
"""
Execute a query, updating the query object with the results.
:param query_id: ID of query to execute
Responds with query task details.
"""
# TODO: this should actually check for permissions, but because currently you can only
# get here either with a user API key or a query one, we can just check whether it's
# an api key (meaning this is a query API key, which only grants read access).
if self.current_user.is_api_user():
abort(403, message="Please use a user API key.")
query = get_object_or_404(
models.Query.get_by_id_and_org, query_id, self.current_org
)
require_access(query, self.current_user, not_view_only)
parameter_values = collect_parameters_from_request(request.args)
parameterized_query = ParameterizedQuery(query.query_text, org=self.current_org)
should_apply_auto_limit = query.options.get("apply_auto_limit", False)
return run_query(
parameterized_query, parameter_values, query.data_source, query.id, should_apply_auto_limit
)
class QueryTagsResource(BaseResource):
def get(self):
"""
Returns all query tags including those for drafts.
"""
tags = models.Query.all_tags(self.current_user, include_drafts=True)
return {"tags": [{"name": name, "count": count} for name, count in tags]}
class QueryFavoriteListResource(BaseResource):
def get(self):
search_term = request.args.get("q")
if search_term:
base_query = models.Query.search(
search_term,
self.current_user.group_ids,
include_drafts=True,
limit=None,
)
favorites = models.Query.favorites(self.current_user, base_query=base_query)
else:
favorites = models.Query.favorites(self.current_user)
favorites = filter_by_tags(favorites, models.Query.tags)
# order results according to passed order parameter,
# special-casing search queries where the database
# provides an order by search rank
ordered_favorites = order_results(favorites, fallback=not bool(search_term))
page = request.args.get("page", 1, type=int)
page_size = request.args.get("page_size", 25, type=int)
response = paginate(
ordered_favorites,
page,
page_size,
QuerySerializer,
with_stats=True,
with_last_modified_by=False,
)
self.record_event(
{
"action": "load_favorites",
"object_type": "query",
"params": {
"q": search_term,
"tags": request.args.getlist("tags"),
"page": page,
},
}
)
return response