redash/query_runner/clickhouse.py (157 lines of code) (raw):
import logging
import re
from urllib.parse import urlparse
import requests
from redash.query_runner import *
from redash.utils import json_dumps, json_loads
logger = logging.getLogger(__name__)
class ClickHouse(BaseSQLQueryRunner):
    """Query runner that sends SQL to a ClickHouse server with HTTP POST requests.

    Connection settings are read from ``self.configuration``; the recognised
    keys are the ones listed by :meth:`configuration_schema`.
    """

    noop_query = "SELECT 1"

    # Endpoint used whenever the "url" option is absent from the configuration.
    _DEFAULT_URL = "http://127.0.0.1:8123"

    # Raw ClickHouse type names whose values are cast with int() after the
    # response is parsed (the database sends these values as strings).
    _INT64_TYPES = frozenset(
        ("Int64", "UInt64", "Nullable(Int64)", "Nullable(UInt64)")
    )

    @classmethod
    def configuration_schema(cls):
        """Return the JSON-schema style description of the runner's options."""
        return {
            "type": "object",
            "properties": {
                "url": {"type": "string", "default": cls._DEFAULT_URL},
                "user": {"type": "string", "default": "default"},
                "password": {"type": "string"},
                "dbname": {"type": "string", "title": "Database Name"},
                "timeout": {
                    "type": "number",
                    "title": "Request Timeout",
                    "default": 30,
                },
                "verify": {
                    "type": "boolean",
                    "title": "Verify SSL certificate",
                    "default": True,
                },
            },
            "order": ["url", "user", "password", "dbname"],
            "required": ["dbname"],
            "extra_options": ["timeout", "verify"],
            "secret": ["password"],
        }

    @classmethod
    def type(cls):
        """Return the identifier string of this runner."""
        return "clickhouse"

    @staticmethod
    def _build_netloc(host, port):
        """Return ``host:port``, or just ``host`` when *port* is ``None``.

        Without the ``None`` check a URL that has no explicit port would be
        rewritten to the unusable ``host:None``.
        """
        if port is None:
            return "{}".format(host)
        return "{}:{}".format(host, port)

    @property
    def _url(self):
        """The configured URL parsed with ``urlparse``.

        Falls back to the same default endpoint that ``_send_query`` uses, so
        both always agree on the server being addressed.
        """
        return urlparse(self.configuration.get("url", self._DEFAULT_URL))

    @_url.setter
    def _url(self, url):
        # ``url`` is a parsed URL; it is stored back as a plain string.
        self.configuration["url"] = url.geturl()

    @property
    def host(self):
        """Host name component of the configured URL."""
        return self._url.hostname

    @host.setter
    def host(self, host):
        # The network location is rebuilt from host and port only.
        current = self._url
        self._url = current._replace(netloc=self._build_netloc(host, current.port))

    @property
    def port(self):
        """Port component of the configured URL (``None`` when not given)."""
        return self._url.port

    @port.setter
    def port(self, port):
        # The network location is rebuilt from host and port only.
        current = self._url
        self._url = current._replace(
            netloc=self._build_netloc(current.hostname, port)
        )

    def _get_tables(self, schema):
        """Fill *schema* with the columns of every table outside ``system``.

        Args:
            schema: dict keyed by ``"database.table"``; it is updated in place.

        Returns:
            A list with the values of *schema*.

        Raises:
            Exception: if the query against ``system.columns`` reports an error.
        """
        query = "SELECT database, table, name FROM system.columns WHERE database NOT IN ('system')"

        results, error = self.run_query(query, None)

        if error is not None:
            raise Exception("Failed getting schema.")

        results = json_loads(results)

        for row in results["rows"]:
            table_name = "{}.{}".format(row["database"], row["table"])

            if table_name not in schema:
                schema[table_name] = {"name": table_name, "columns": []}

            schema[table_name]["columns"].append(row["name"])

        return list(schema.values())

    def _send_query(self, data, stream=False):
        """POST *data* to the configured URL and return the decoded JSON body.

        Args:
            data: query text; encoded as UTF-8 (unencodable characters are
                ignored) before being sent.
            stream: forwarded to ``requests.post``.

        Returns:
            The response body parsed as JSON.

        Raises:
            Exception: with the response text when the status code is not 200,
                or with a "Connection error" message when ``requests`` raises.
        """
        url = self.configuration.get("url", self._DEFAULT_URL)

        try:
            verify = self.configuration.get("verify", True)
            r = requests.post(
                url,
                data=data.encode("utf-8", "ignore"),
                stream=stream,
                timeout=self.configuration.get("timeout", 30),
                params={
                    "user": self.configuration.get("user", "default"),
                    "password": self.configuration.get("password", ""),
                    "database": self.configuration["dbname"],
                },
                verify=verify,
            )
            if r.status_code != 200:
                raise Exception(r.text)
            return r.json()
        except requests.RequestException as e:
            # A Response object is falsy for 4xx/5xx statuses, so compare with
            # None instead of relying on truthiness; otherwise the status code
            # would be dropped exactly when it is most useful.
            if e.response is not None:
                details = "({}, Status Code: {})".format(
                    e.__class__.__name__, e.response.status_code
                )
            else:
                details = "({})".format(e.__class__.__name__)
            # Chain the original error so its traceback is not lost.
            raise Exception(
                "Connection error to: {} {}.".format(url, details)
            ) from e

    @staticmethod
    def _define_column_type(column):
        """Map a ClickHouse type name to one of the ``TYPE_*`` constants.

        The comparison is case-insensitive and a ``Nullable(...)`` wrapper is
        removed first. Anything not recognised is reported as a string.
        """
        c = column.lower()
        f = re.search(r"^nullable\((.*)\)$", c)
        if f is not None:
            c = f.group(1)

        if c.startswith(("int", "uint")):
            return TYPE_INTEGER
        elif c.startswith("float"):
            return TYPE_FLOAT
        elif c == "datetime":
            return TYPE_DATETIME
        elif c == "date":
            return TYPE_DATE
        else:
            return TYPE_STRING

    def _clickhouse_query(self, query):
        """Run *query* with ``FORMAT JSON`` and reshape the response.

        Returns:
            A dict with ``"columns"`` (name, friendly_name and type for each
            entry of the response's ``meta``) and ``"rows"`` (the response's
            ``data``). Values of 64-bit integer columns are cast with
            ``int()``; a value that raises ``TypeError`` there becomes ``None``.

            When the response has a ``"totals"`` entry it is appended as the
            last row. In that row every column that is not a 64-bit integer is
            overwritten: ``"Total"`` for string-typed columns, ``None`` for the
            others. The 64-bit integer columns keep the value sent by the
            server.
        """
        query += "\nFORMAT JSON"
        result = self._send_query(query)

        columns = []
        columns_int64 = []  # db converts value to string if its type equals UInt64
        columns_totals = {}

        for r in result["meta"]:
            column_name = r["name"]
            column_type = self._define_column_type(r["type"])

            if r["type"] in self._INT64_TYPES:
                columns_int64.append(column_name)
            else:
                columns_totals[column_name] = (
                    "Total" if column_type == TYPE_STRING else None
                )

            columns.append(
                {"name": column_name, "friendly_name": column_name, "type": column_type}
            )

        rows = result["data"]
        for row in rows:
            for column in columns_int64:
                try:
                    row[column] = int(row[column])
                except TypeError:
                    row[column] = None

        if "totals" in result:
            totals = result["totals"]
            for column, value in columns_totals.items():
                totals[column] = value
            rows.append(totals)

        return {"columns": columns, "rows": rows}

    def run_query(self, query, user):
        """Execute *query* and return a ``(data, error)`` pair.

        Args:
            query: SQL text to run.
            user: not used by this method.

        Returns:
            ``(json_string, None)`` on success, or ``(None, message)`` when the
            query is empty or any exception is raised while running it.
        """
        logger.debug("Clickhouse is about to execute query: %s", query)

        if query == "":
            return None, "Query is empty"

        try:
            q = self._clickhouse_query(query)
            data = json_dumps(q)
            error = None
        except Exception as e:
            data = None
            # Log through the module logger, like the debug message above.
            logger.exception(e)
            error = str(e)

        return data, error
# Hand the runner class to register(); that function is not defined in this
# file, so it comes from the star import of redash.query_runner.
register(ClickHouse)