redash/query_runner/json_ds.py (145 lines of code) (raw):

import logging import yaml import datetime from funcy import compact, project from redash.utils.requests_session import requests_or_advocate, UnacceptableAddressException from redash.utils import json_dumps from redash.query_runner import ( BaseHTTPQueryRunner, register, TYPE_BOOLEAN, TYPE_DATETIME, TYPE_FLOAT, TYPE_INTEGER, TYPE_STRING, ) class QueryParseError(Exception): pass def parse_query(query): # TODO: copy paste from Metrica query runner, we should extract this into a utility query = query.strip() if query == "": raise QueryParseError("Query is empty.") try: params = yaml.safe_load(query) return params except ValueError as e: logging.exception(e) error = str(e) raise QueryParseError(error) TYPES_MAP = { str: TYPE_STRING, bytes: TYPE_STRING, int: TYPE_INTEGER, float: TYPE_FLOAT, bool: TYPE_BOOLEAN, datetime.datetime: TYPE_DATETIME, } def _get_column_by_name(columns, column_name): for c in columns: if "name" in c and c["name"] == column_name: return c return None def _get_type(value): return TYPES_MAP.get(type(value), TYPE_STRING) def add_column(columns, column_name, column_type): if _get_column_by_name(columns, column_name) is None: columns.append( {"name": column_name, "friendly_name": column_name, "type": column_type} ) def _apply_path_search(response, path): if path is None: return response path_parts = path.split(".") path_parts.reverse() while len(path_parts) > 0: current_path = path_parts.pop() if current_path in response: response = response[current_path] else: raise Exception("Couldn't find path {} in response.".format(path)) return response def _normalize_json(data, path): data = _apply_path_search(data, path) if isinstance(data, dict): data = [data] return data def _sort_columns_with_fields(columns, fields): if fields: columns = compact([_get_column_by_name(columns, field) for field in fields]) return columns # TODO: merge the logic here with the one in MongoDB's queyr runner def parse_json(data, path, fields): data = _normalize_json(data, path) rows = [] columns = [] for row in data: parsed_row = {} for key in row: if isinstance(row[key], dict): for inner_key in row[key]: column_name = "{}.{}".format(key, inner_key) if fields and key not in fields and column_name not in fields: continue value = row[key][inner_key] add_column(columns, column_name, _get_type(value)) parsed_row[column_name] = value else: if fields and key not in fields: continue value = row[key] add_column(columns, key, _get_type(value)) parsed_row[key] = row[key] rows.append(parsed_row) columns = _sort_columns_with_fields(columns, fields) return {"rows": rows, "columns": columns} class JSON(BaseHTTPQueryRunner): requires_url = False @classmethod def configuration_schema(cls): return { "type": "object", "properties": { "username": {"type": "string", "title": cls.username_title}, "password": {"type": "string", "title": cls.password_title}, }, "secret": ["password"], "order": ["username", "password"], } def __init__(self, configuration): super(JSON, self).__init__(configuration) self.syntax = "yaml" def test_connection(self): pass def run_query(self, query, user): query = parse_query(query) if not isinstance(query, dict): raise QueryParseError( "Query should be a YAML object describing the URL to query." ) if "url" not in query: raise QueryParseError("Query must include 'url' option.") method = query.get("method", "get") request_options = project(query, ("params", "headers", "data", "auth", "json")) fields = query.get("fields") path = query.get("path") if isinstance(request_options.get("auth", None), list): request_options["auth"] = tuple(request_options["auth"]) elif self.configuration.get("username") or self.configuration.get("password"): request_options["auth"] = ( self.configuration.get("username"), self.configuration.get("password"), ) if method not in ("get", "post"): raise QueryParseError("Only GET or POST methods are allowed.") if fields and not isinstance(fields, list): raise QueryParseError("'fields' needs to be a list.") response, error = self.get_response( query["url"], http_method=method, **request_options ) if error is not None: return None, error data = json_dumps(parse_json(response.json(), path, fields)) if data: return data, None else: return None, "Got empty response from '{}'.".format(query["url"]) register(JSON)