redash/query_runner/couchbase.py (128 lines of code) (raw):

import datetime import logging from dateutil.parser import parse from redash.query_runner import * from redash.utils import JSONEncoder, json_dumps, json_loads, parse_human_time import json logger = logging.getLogger(__name__) try: import requests import httplib2 except ImportError as e: logger.error("Failed to import: " + str(e)) TYPES_MAP = { str: TYPE_STRING, bytes: TYPE_STRING, int: TYPE_INTEGER, float: TYPE_FLOAT, bool: TYPE_BOOLEAN, datetime.datetime: TYPE_DATETIME, datetime.datetime: TYPE_STRING, } def _get_column_by_name(columns, column_name): for c in columns: if "name" in c and c["name"] == column_name: return c return None def parse_results(results): rows = [] columns = [] for row in results: parsed_row = {} for key in row: if isinstance(row[key], dict): for inner_key in row[key]: column_name = "{}.{}".format(key, inner_key) if _get_column_by_name(columns, column_name) is None: columns.append( { "name": column_name, "friendly_name": column_name, "type": TYPES_MAP.get( type(row[key][inner_key]), TYPE_STRING ), } ) parsed_row[column_name] = row[key][inner_key] else: if _get_column_by_name(columns, key) is None: columns.append( { "name": key, "friendly_name": key, "type": TYPES_MAP.get(type(row[key]), TYPE_STRING), } ) parsed_row[key] = row[key] rows.append(parsed_row) return rows, columns class Couchbase(BaseQueryRunner): should_annotate_query = False noop_query = "Select 1" @classmethod def configuration_schema(cls): return { "type": "object", "properties": { "protocol": {"type": "string", "default": "http"}, "host": {"type": "string"}, "port": { "type": "string", "title": "Port (Defaults: 8095 - Analytics, 8093 - N1QL)", "default": "8095", }, "user": {"type": "string"}, "password": {"type": "string"}, }, "required": ["host", "user", "password"], "order": ["protocol", "host", "port", "user", "password"], "secret": ["password"], } def __init__(self, configuration): super(Couchbase, self).__init__(configuration) @classmethod def enabled(cls): return True def test_connection(self): result = self.call_service(self.noop_query, "") def get_buckets(self, query, name_param): defaultColumns = ["meta().id"] result = self.call_service(query, "").json()["results"] schema = {} for row in result: table_name = row.get(name_param) schema[table_name] = {"name": table_name, "columns": defaultColumns} return list(schema.values()) def get_schema(self, get_stats=False): try: # Try fetch from Analytics return self.get_buckets( "SELECT ds.GroupName as name FROM Metadata.`Dataset` ds where ds.DataverseName <> 'Metadata'", "name", ) except Exception: # Try fetch from N1QL return self.get_buckets("select name from system:keyspaces", "name") def call_service(self, query, user): try: user = self.configuration.get("user") password = self.configuration.get("password") protocol = self.configuration.get("protocol", "http") host = self.configuration.get("host") port = self.configuration.get("port", 8095) params = {"statement": query} url = "%s://%s:%s/query/service" % (protocol, host, port) r = requests.post(url, params=params, auth=(user, password)) r.raise_for_status() return r except requests.exceptions.HTTPError as err: if err.response.status_code == 401: raise Exception("Wrong username/password") raise Exception("Couchbase connection error") def run_query(self, query, user): result = self.call_service(query, user) rows, columns = parse_results(result.json()["results"]) data = {"columns": columns, "rows": rows} return json_dumps(data), None @classmethod def name(cls): return "Couchbase" register(Couchbase)