redash/query_runner/mysql.py (234 lines of code) (raw):
import logging
import os
import threading
from redash.query_runner import (
TYPE_FLOAT,
TYPE_INTEGER,
TYPE_DATETIME,
TYPE_STRING,
TYPE_DATE,
BaseSQLQueryRunner,
InterruptException,
JobTimeoutException,
register,
)
from redash.settings import parse_boolean
from redash.utils import json_dumps, json_loads
try:
import MySQLdb
enabled = True
except ImportError:
enabled = False
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Maps the type code found at index 1 of each cursor.description entry (see
# Mysql._run_query) to a Redash column type. Codes that are not listed here
# resolve to None through types_map.get(...).
# NOTE(review): the trailing names are the MySQL field-type constants these
# numbers appear to correspond to (MySQLdb.constants.FIELD_TYPE) -- confirm
# against the installed driver.
types_map = {
    0: TYPE_FLOAT,  # DECIMAL
    1: TYPE_INTEGER,  # TINY
    2: TYPE_INTEGER,  # SHORT
    3: TYPE_INTEGER,  # LONG
    4: TYPE_FLOAT,  # FLOAT
    5: TYPE_FLOAT,  # DOUBLE
    7: TYPE_DATETIME,  # TIMESTAMP
    8: TYPE_INTEGER,  # LONGLONG
    9: TYPE_INTEGER,  # INT24
    10: TYPE_DATE,  # DATE
    12: TYPE_DATETIME,  # DATETIME
    15: TYPE_STRING,  # VARCHAR
    16: TYPE_INTEGER,  # BIT
    246: TYPE_FLOAT,  # NEWDECIMAL
    253: TYPE_STRING,  # VAR_STRING
    254: TYPE_STRING,  # STRING
}
class Result(object):
    """Mutable holder used to hand a query outcome from the worker thread
    back to the thread that started it.

    Both attributes exist from construction so that reading them can never
    raise ``AttributeError``, even when the worker terminates before it had a
    chance to report anything.
    """

    def __init__(self):
        # Serialized result payload; remains None until a query succeeds.
        self.json_data = None
        # Error text. Starts as a failure marker so that a worker which dies
        # before reporting is not mistaken for a successful, empty query; the
        # worker overwrites it (with None on success).
        self.error = "Query did not produce a result."
class Mysql(BaseSQLQueryRunner):
    """Query runner that executes SQL against a MySQL server through MySQLdb.

    Every query gets its own connection and runs in a worker thread; the
    calling thread waits on an event so it can react to an interrupt or a job
    timeout by issuing ``KILL`` for the server-side thread of that connection.
    """

    noop_query = "SELECT 1"

    @classmethod
    def configuration_schema(cls):
        """Return the schema describing this runner's connection settings.

        The SSL-related properties are added unless the environment variable
        ``MYSQL_SHOW_SSL_SETTINGS`` is set to a value that ``parse_boolean``
        evaluates as false (it defaults to ``"true"``).
        """
        show_ssl_settings = parse_boolean(
            os.environ.get("MYSQL_SHOW_SSL_SETTINGS", "true")
        )

        schema = {
            "type": "object",
            "properties": {
                "host": {"type": "string", "default": "127.0.0.1"},
                "user": {"type": "string"},
                "passwd": {"type": "string", "title": "Password"},
                "db": {"type": "string", "title": "Database name"},
                "port": {"type": "number", "default": 3306},
            },
            "order": ["host", "port", "user", "passwd", "db"],
            "required": ["db"],
            "secret": ["passwd"],
        }

        if show_ssl_settings:
            schema["properties"].update(
                {
                    "use_ssl": {"type": "boolean", "title": "Use SSL"},
                    "ssl_cacert": {
                        "type": "string",
                        "title": "Path to CA certificate file to verify peer against (SSL)",
                    },
                    "ssl_cert": {
                        "type": "string",
                        "title": "Path to client certificate file (SSL)",
                    },
                    "ssl_key": {
                        "type": "string",
                        "title": "Path to private key file (SSL)",
                    },
                }
            )

        return schema

    @classmethod
    def name(cls):
        """Return the display name of this runner."""
        return "MySQL"

    @classmethod
    def enabled(cls):
        """Return True when the MySQLdb driver could be imported."""
        return enabled

    @staticmethod
    def _error_message(e):
        """Return a readable message for a MySQLdb error.

        The original code read ``e.args[1]`` unconditionally, which raises
        ``IndexError`` for errors that carry fewer than two arguments; fall
        back to ``str(e)`` in that case.
        """
        return e.args[1] if len(e.args) > 1 else str(e)

    def _connection(self):
        """Open and return a new MySQLdb connection built from the configuration.

        SSL options are passed only when ``_get_ssl_parameters`` returns a
        non-empty mapping.
        """
        params = dict(
            host=self.configuration.get("host", ""),
            user=self.configuration.get("user", ""),
            passwd=self.configuration.get("passwd", ""),
            db=self.configuration["db"],
            port=self.configuration.get("port", 3306),
            charset="utf8",
            use_unicode=True,
            connect_timeout=60,
        )

        ssl_options = self._get_ssl_parameters()
        if ssl_options:
            params["ssl"] = ssl_options

        return MySQLdb.connect(**params)

    def _get_tables(self, schema):
        """Populate ``schema`` with the columns of every non-system table.

        Tables in the configured database are keyed by their bare name; tables
        from other schemas are keyed as ``<schema>.<table>``.

        Returns:
            list: the values of ``schema`` after it has been filled in.

        Raises:
            Exception: if the information_schema query reports an error.
        """
        query = """
        SELECT col.table_schema as table_schema,
               col.table_name as table_name,
               col.column_name as column_name
        FROM `information_schema`.`columns` col
        WHERE col.table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
        """

        results, error = self.run_query(query, None)

        if error is not None:
            raise Exception("Failed getting schema.")

        results = json_loads(results)

        for row in results["rows"]:
            if row["table_schema"] != self.configuration["db"]:
                table_name = "{}.{}".format(row["table_schema"], row["table_name"])
            else:
                table_name = row["table_name"]

            if table_name not in schema:
                schema[table_name] = {"name": table_name, "columns": []}

            schema[table_name]["columns"].append(row["column_name"])

        return list(schema.values())

    def run_query(self, query, user):
        """Run ``query`` in a worker thread and return ``(json_data, error)``.

        The calling thread polls an event once per second so that
        ``KeyboardInterrupt``, ``InterruptException`` and
        ``JobTimeoutException`` can be delivered to it while the query runs.
        On such an interruption the server-side thread is killed, the worker
        is joined, and the exception is re-raised.
        """
        ev = threading.Event()
        thread_id = None
        connection = None
        r = Result()
        t = None

        try:
            connection = self._connection()
            thread_id = connection.thread_id()
            t = threading.Thread(
                target=self._run_query, args=(query, user, connection, r, ev)
            )
            t.start()
            while not ev.wait(1):
                pass
        except (KeyboardInterrupt, InterruptException, JobTimeoutException):
            # Thread.ident stays None until the thread has been started.
            if t is not None and t.ident is not None:
                self._cancel(thread_id)
                t.join()
            elif connection is not None:
                # Interrupted before the worker took ownership of the
                # connection: there is nothing to KILL or join, but the
                # connection would otherwise be leaked.
                try:
                    connection.close()
                except MySQLdb.Error:
                    logger.debug(
                        "Failed closing MySQL connection after interruption",
                        exc_info=True,
                    )
            raise

        return r.json_data, r.error

    def _run_query(self, query, user, connection, r, ev):
        """Worker-thread body: execute ``query`` and store the outcome on ``r``.

        ``r.json_data`` / ``r.error`` are always assigned before ``ev`` is set,
        including when an unexpected (non-MySQLdb) exception occurs, so the
        waiting thread never reads an unpopulated result. The connection is
        closed here because this thread owns it once started.
        """
        # Bound up front so the except clause can test it even when
        # connection.cursor() itself is what failed.
        cursor = None
        try:
            cursor = connection.cursor()
            logger.debug("MySQL running query: %s", query)
            cursor.execute(query)

            data = cursor.fetchall()
            desc = cursor.description

            # Keep the last result set that actually has a description.
            while cursor.nextset():
                if cursor.description is not None:
                    data = cursor.fetchall()
                    desc = cursor.description

            # TODO - very similar to pg.py
            if desc is not None:
                columns = self.fetch_columns(
                    [(i[0], types_map.get(i[1], None)) for i in desc]
                )
                column_names = [column["name"] for column in columns]
                rows = [dict(zip(column_names, row)) for row in data]

                data = {"columns": columns, "rows": rows}
                r.json_data = json_dumps(data)
                r.error = None
            else:
                r.json_data = None
                r.error = "No data was returned."

            cursor.close()
        except MySQLdb.Error as e:
            # Record the failure first so it survives a failing close().
            r.json_data = None
            r.error = self._error_message(e)
            if cursor:
                cursor.close()
        except Exception as e:
            # This is the top of the worker thread: anything escaping here
            # would only be printed by the threading machinery, leaving the
            # caller without an error to report.
            logger.exception("Unexpected error while running MySQL query")
            r.json_data = None
            r.error = str(e)
        finally:
            # Wake the waiting thread before closing so a failing close()
            # cannot leave it polling forever.
            ev.set()
            if connection:
                connection.close()

    def _get_ssl_parameters(self):
        """Return the SSL options for MySQLdb, or None when SSL is disabled.

        Only the certificate paths that are actually configured are included.
        Note that when ``use_ssl`` is set but no path is configured the result
        is an empty dict, which ``_connection`` treats the same as no SSL
        options.
        """
        if not self.configuration.get("use_ssl"):
            return None

        config_map = {"ssl_cacert": "ca", "ssl_cert": "cert", "ssl_key": "key"}
        ssl_params = {}
        for key, cfg in config_map.items():
            val = self.configuration.get(key)
            if val:
                ssl_params[cfg] = val

        return ssl_params

    def _cancel(self, thread_id):
        """Issue ``KILL <thread_id>`` over a fresh connection.

        Returns:
            The error message if the KILL failed with a MySQLdb error,
            otherwise None.
        """
        connection = None
        cursor = None
        error = None

        try:
            connection = self._connection()
            cursor = connection.cursor()
            query = "KILL %d" % (thread_id)
            logger.debug(query)
            cursor.execute(query)
        except MySQLdb.Error as e:
            error = self._error_message(e)
        finally:
            # Release the cursor on the success path as well as on failure.
            if cursor:
                cursor.close()
            if connection:
                connection.close()

        return error
class RDSMySQL(Mysql):
    """MySQL runner variant labelled for Amazon RDS.

    It exposes a reduced configuration schema and, when SSL is requested,
    points the driver at a CA bundle located relative to this module's file.
    """

    @classmethod
    def name(cls):
        """Return the display name of this runner."""
        return "MySQL (Amazon RDS)"

    @classmethod
    def type(cls):
        """Return the type identifier of this runner."""
        return "rds_mysql"

    @classmethod
    def configuration_schema(cls):
        """Return the schema describing the RDS connection settings."""
        properties = {
            "host": {"type": "string"},
            "user": {"type": "string"},
            "passwd": {"type": "string", "title": "Password"},
            "db": {"type": "string", "title": "Database name"},
            "port": {"type": "number", "default": 3306},
            "use_ssl": {"type": "boolean", "title": "Use SSL"},
        }
        field_order = ["host", "port", "user", "passwd", "db"]
        mandatory = ["db", "user", "passwd", "host"]

        return {
            "type": "object",
            "properties": properties,
            "order": field_order,
            "required": mandatory,
            "secret": ["passwd"],
        }

    def _get_ssl_parameters(self):
        """Return the CA option for the driver, or None when SSL is off."""
        if not self.configuration.get("use_ssl"):
            return None

        module_dir = os.path.dirname(__file__)
        bundle_path = os.path.join(module_dir, "./files/rds-combined-ca-bundle.pem")
        return {"ca": bundle_path}
# Hand both runner classes to register(), imported from redash.query_runner.
# NOTE(review): presumably this adds them to the set of available query
# runners -- confirm against redash.query_runner.register.
register(Mysql)
register(RDSMySQL)