redash/models/parameterized_query.py (164 lines of code) (raw):

import pystache from functools import partial from numbers import Number from redash.utils import mustache_render, json_loads, user_mustache_render from redash.permissions import require_access, view_only from funcy import distinct from dateutil.parser import parse def _pluck_name_and_value(default_column, row): row = {k.lower(): v for k, v in row.items()} name_column = "name" if "name" in row.keys() else default_column.lower() value_column = "value" if "value" in row.keys() else default_column.lower() return {"name": row[name_column], "value": str(row[value_column])} def _load_result(query_id, org): from redash import models query = models.Query.get_by_id_and_org(query_id, org) if not query.data_source: raise QueryDetachedFromDataSourceError(query_id) query_result = models.QueryResult.get_by_id_and_org( query.latest_query_data_id, org ) return query_result.data def dropdown_values(query_id, org): data = _load_result(query_id, org) first_column = data["columns"][0]["name"] pluck = partial(_pluck_name_and_value, first_column) return list(map(pluck, data["rows"])) def join_parameter_list_values(parameters, schema): updated_parameters = {} for (key, value) in parameters.items(): if isinstance(value, list): definition = next( (definition for definition in schema if definition["name"] == key), {} ) multi_values_options = definition.get("multiValuesOptions", {}) separator = str(multi_values_options.get("separator", ",")) prefix = str(multi_values_options.get("prefix", "")) suffix = str(multi_values_options.get("suffix", "")) updated_parameters[key] = separator.join( [prefix + v + suffix for v in value] ) else: updated_parameters[key] = value return updated_parameters def _collect_key_names(nodes): keys = [] for node in nodes._parse_tree: if isinstance(node, pystache.parser._EscapeNode): keys.append(node.key) elif isinstance(node, pystache.parser._SectionNode): keys.append(node.key) keys.extend(_collect_key_names(node.parsed)) return distinct(keys) def _collect_query_parameters(query): nodes = pystache.parse(query) keys = _collect_key_names(nodes) return keys def _parameter_names(parameter_values): names = [] for key, value in parameter_values.items(): if isinstance(value, dict): for inner_key in value.keys(): names.append("{}.{}".format(key, inner_key)) else: names.append(key) return names def _is_number(string): if isinstance(string, Number): return True else: try: float(string) return True except ValueError: return False def _is_date(string): try: parse(string) return True except (ValueError, TypeError): return False def _is_date_range(obj): try: return _is_date(obj["start"]) and _is_date(obj["end"]) except (KeyError, TypeError): return False def _is_value_within_options(value, dropdown_options, allow_list=False): if isinstance(value, list): return allow_list and set(map(str, value)).issubset(set(dropdown_options)) return str(value) in dropdown_options class ParameterizedQuery(object): def __init__(self, template, schema=None, org=None): self.schema = schema or [] self.org = org self.template = template self.query = template self.parameters = {} def user_apply(self, attributes): self.query = user_mustache_render(self.query, attributes) def apply(self, parameters): invalid_parameter_names = [ key for (key, value) in parameters.items() if not self._valid(key, value) ] if invalid_parameter_names: raise InvalidParameterError(invalid_parameter_names) else: self.parameters.update(parameters) self.query = mustache_render( self.template, join_parameter_list_values(parameters, self.schema) ) return self def _valid(self, name, value): if not self.schema: return True definition = next( (definition for definition in self.schema if definition["name"] == name), None, ) if not definition: return False enum_options = definition.get("enumOptions") query_id = definition.get("queryId") allow_multiple_values = isinstance(definition.get("multiValuesOptions"), dict) if isinstance(enum_options, str): enum_options = enum_options.split("\n") validators = { "text": lambda value: isinstance(value, str), "number": _is_number, "enum": lambda value: _is_value_within_options( value, enum_options, allow_multiple_values ), "query": lambda value: True, "date": _is_date, "datetime-local": _is_date, "datetime-with-seconds": _is_date, "date-range": _is_date_range, "datetime-range": _is_date_range, "datetime-range-with-seconds": _is_date_range, } validate = validators.get(definition["type"], lambda x: False) return validate(value) @property def is_safe(self): text_parameters = [param for param in self.schema if param["type"] == "text"] return not any(text_parameters) @property def missing_params(self): query_parameters = set(_collect_query_parameters(self.template)) return set(query_parameters) - set(_parameter_names(self.parameters)) @property def text(self): return self.query class InvalidParameterError(Exception): def __init__(self, parameters): parameter_names = ", ".join(parameters) message = "The following parameter values are incompatible with their definitions: {}".format( parameter_names ) super(InvalidParameterError, self).__init__(message) class QueryDetachedFromDataSourceError(Exception): def __init__(self, query_id): self.query_id = query_id super(QueryDetachedFromDataSourceError, self).__init__( "This query is detached from any data source. Please select a different query." )