redash/query_runner/rockset.py (94 lines of code) (raw):

import requests from redash.query_runner import * from redash.utils import json_dumps def _get_type(value): if isinstance(value, int): return TYPE_INTEGER elif isinstance(value, float): return TYPE_FLOAT elif isinstance(value, bool): return TYPE_BOOLEAN elif isinstance(value, str): return TYPE_STRING return TYPE_STRING # The following is here, because Rockset's PyPi package is Python 3 only. # Should be removed once we move to Python 3. class RocksetAPI(object): def __init__(self, api_key, api_server): self.api_key = api_key self.api_server = api_server def _request(self, endpoint, method='GET', body=None): headers = {'Authorization': 'ApiKey {}'.format(self.api_key), 'User-Agent': 'rest:redash/1.0'} url = '{}/v1/orgs/self/{}'.format(self.api_server, endpoint) if method == "GET": r = requests.get(url, headers=headers) return r.json() elif method == "POST": r = requests.post(url, headers=headers, json=body) return r.json() else: raise "Unknown method: {}".format(method) def list_workspaces(self): response = self._request('ws') return [x['name'] for x in response['data'] if x['collection_count'] > 0] def list_collections(self, workspace='commons'): response = self._request('ws/{}/collections'.format(workspace)) return [x['name'] for x in response['data']] def collection_columns(self, workspace, collection): response = self.query('DESCRIBE "{}"."{}" OPTION(max_field_depth=1)'.format(workspace, collection)) return sorted(set([x['field'][0] for x in response['results']])) def query(self, sql): return self._request("queries", "POST", {"sql": {"query": sql}}) class Rockset(BaseSQLQueryRunner): noop_query = "SELECT 1" @classmethod def configuration_schema(cls): return { "type": "object", "properties": { "api_server": { "type": "string", "title": "API Server", "default": "https://api.rs2.usw2.rockset.com", }, "api_key": {"title": "API Key", "type": "string"}, }, "order": ["api_key", "api_server"], "required": ["api_server", "api_key"], "secret": ["api_key"], } @classmethod def type(cls): return "rockset" def __init__(self, configuration): super(Rockset, self).__init__(configuration) self.api = RocksetAPI( self.configuration.get("api_key"), self.configuration.get("api_server", "https://api.rs2.usw2.rockset.com"), ) def _get_tables(self, schema): for workspace in self.api.list_workspaces(): for collection in self.api.list_collections(workspace): table_name = collection if workspace == 'commons' else '{}.{}'.format(workspace, collection) schema[table_name] = { 'name': table_name, 'columns': self.api.collection_columns(workspace, collection) } return sorted(schema.values(), key=lambda x: x['name']) def run_query(self, query, user): results = self.api.query(query) if "code" in results and results["code"] != 200: return None, "{}: {}".format(results["type"], results["message"]) if "results" not in results: message = results.get("message", "Unknown response from Rockset.") return None, message rows = results["results"] columns = [] if len(rows) > 0: columns = [] for k in rows[0]: columns.append( {"name": k, "friendly_name": k, "type": _get_type(rows[0][k])} ) data = json_dumps({"columns": columns, "rows": rows}) return data, None register(Rockset)