# redash/query_runner/elasticsearch.py
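"""Elasticsearch query runners for Redash.

Two runners are registered here:

* ``Kibana`` -- runs simple Lucene query-string searches, paging through
  hits client-side. A query is a JSON document such as (illustrative
  values)::

      {"index": "logs", "query": "level:error", "size": 500}

* ``ElasticSearch`` -- sends the query body as-is to ``<index>/_search``,
  after popping the optional ``index`` and ``result_fields`` keys.
"""
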
import logging
import sys
import urllib.request
import urllib.parse
import urllib.error
import requests
from requests.auth import HTTPBasicAuth
from redash.query_runner import *
from redash.utils import json_dumps, json_loads

# http.client is used to enable wire-level HTTP debug logging when
# DEBUG_ENABLED is set on a runner class. (The old Python 2 fallback was
# dead code: it re-imported the same Python 3 module.)
import http.client as http_client

logger = logging.getLogger(__name__)

ELASTICSEARCH_TYPES_MAPPING = {
"integer": TYPE_INTEGER,
"long": TYPE_INTEGER,
"float": TYPE_FLOAT,
"double": TYPE_FLOAT,
"boolean": TYPE_BOOLEAN,
"string": TYPE_STRING,
"date": TYPE_DATE,
"object": TYPE_STRING,
# "geo_point" TODO: Need to split to 2 fields somehow
}

ELASTICSEARCH_BUILTIN_FIELDS_MAPPING = {"_id": "Id", "_score": "Score"}

PYTHON_TYPES_MAPPING = {
str: TYPE_STRING,
bytes: TYPE_STRING,
bool: TYPE_BOOLEAN,
int: TYPE_INTEGER,
float: TYPE_FLOAT,
}


class BaseElasticSearch(BaseQueryRunner):
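    """Shared configuration, HTTP plumbing, and result parsing for the
    Elasticsearch-based query runners."""
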
should_annotate_query = False
DEBUG_ENABLED = False
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"server": {"type": "string", "title": "Base URL"},
"basic_auth_user": {"type": "string", "title": "Basic Auth User"},
"basic_auth_password": {
"type": "string",
"title": "Basic Auth Password",
},
},
"order": ["server", "basic_auth_user", "basic_auth_password"],
"secret": ["basic_auth_password"],
"required": ["server"],
}
@classmethod
def enabled(cls):
return False
def __init__(self, configuration):
super(BaseElasticSearch, self).__init__(configuration)
self.syntax = "json"
if self.DEBUG_ENABLED:
http_client.HTTPConnection.debuglevel = 1
            # Logging must be initialized, or requests' debug output is never shown.
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
logger.setLevel(logging.DEBUG)
        # Normalize the base URL so path fragments can be appended safely.
        self.server_url = self.configuration["server"].rstrip("/")
basic_auth_user = self.configuration.get("basic_auth_user", None)
basic_auth_password = self.configuration.get("basic_auth_password", None)
self.auth = None
if basic_auth_user and basic_auth_password:
self.auth = HTTPBasicAuth(basic_auth_user, basic_auth_password)
def _get_mappings(self, url):
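        # Returns (mappings_dict, None) on success or (None, error_message) on failure.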
mappings = {}
error = None
try:
r = requests.get(url, auth=self.auth)
r.raise_for_status()
mappings = r.json()
        except requests.HTTPError as e:
            logger.exception(e)
            error = "Failed to execute query. Return Code: {0} Reason: {1}".format(
                e.response.status_code, e.response.text
            )
            mappings = None
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"
mappings = None
return mappings, error
def _get_query_mappings(self, url):
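        # Flatten the index mappings into {field_name: redash_type}.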
mappings_data, error = self._get_mappings(url)
if error:
return mappings_data, error
        mappings = {}
        for index_mappings in mappings_data.values():
            for doc_type in index_mappings.get("mappings", {}).values():
                if "properties" not in doc_type:
                    continue
                for property_name, property_data in doc_type["properties"].items():
                    if property_name in mappings:
                        continue
                    property_type = property_data.get("type", None)
                    if property_type:
                        # Unknown Elasticsearch types fall back to TYPE_STRING.
                        mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING.get(
                            property_type, TYPE_STRING
                        )
        return mappings, error
def get_schema(self, *args, **kwargs):
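        # Builds one schema entry per index; nested fields become dotted paths.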
def parse_doc(doc, path=None):
"""Recursively parse a doc type dictionary
"""
path = path or []
result = []
for field, description in doc["properties"].items():
if "properties" in description:
result.extend(parse_doc(description, path + [field]))
else:
result.append(".".join(path + [field]))
return result
schema = {}
url = "{0}/_mappings".format(self.server_url)
mappings, error = self._get_mappings(url)
if mappings:
# make a schema for each index
# the index contains a mappings dict with documents
# in a hierarchical format
for name, index in mappings.items():
columns = []
schema[name] = {"name": name}
for doc, items in index["mappings"].items():
columns.extend(parse_doc(items))
                # De-duplicate column names and sort them alphabetically.
                schema[name]["columns"] = sorted(set(columns))
return list(schema.values())
def _parse_results(
self, mappings, result_fields, raw_result, result_columns, result_rows
):
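        # Translate an Elasticsearch response (hits or aggregations) into
        # Redash columns/rows, mutating result_columns and result_rows in place.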
def add_column_if_needed(
mappings, column_name, friendly_name, result_columns, result_columns_index
):
if friendly_name not in result_columns_index:
result_columns.append(
{
"name": friendly_name,
"friendly_name": friendly_name,
"type": mappings.get(column_name, "string"),
}
)
result_columns_index[friendly_name] = result_columns[-1]
def get_row(rows, row):
if row is None:
row = {}
rows.append(row)
return row
        def collect_value(mappings, row, key, value, value_type):
            # Skip fields the user did not explicitly request.
            if result_fields and key not in result_fields_index:
                return
            mappings[key] = value_type
            add_column_if_needed(
                mappings, key, key, result_columns, result_columns_index
            )
            row[key] = value
def collect_aggregations(
mappings, rows, parent_key, data, row, result_columns, result_columns_index
):
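            # Walk the aggregation tree depth-first. "buckets" levels keep the
            # parent key, so bucket keys become values of the parent's column.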
if isinstance(data, dict):
for key, value in data.items():
val = collect_aggregations(
mappings,
rows,
parent_key if key == "buckets" else key,
value,
row,
result_columns,
result_columns_index,
)
if val:
row = get_row(rows, row)
collect_value(mappings, row, key, val, "long")
for data_key in ["value", "doc_count"]:
if data_key not in data:
continue
if "key" in data and len(list(data.keys())) == 2:
key_is_string = "key_as_string" in data
collect_value(
mappings,
row,
data["key"] if not key_is_string else data["key_as_string"],
data[data_key],
"long" if not key_is_string else "string",
)
else:
return data[data_key]
elif isinstance(data, list):
for value in data:
result_row = get_row(rows, row)
collect_aggregations(
mappings,
rows,
parent_key,
value,
result_row,
result_columns,
result_columns_index,
)
if "doc_count" in value:
collect_value(
mappings,
result_row,
"doc_count",
value["doc_count"],
"integer",
)
if "key" in value:
if "key_as_string" in value:
collect_value(
mappings,
result_row,
parent_key,
value["key_as_string"],
"string",
)
else:
collect_value(
mappings, result_row, parent_key, value["key"], "string"
)
return None
result_columns_index = {c["name"]: c for c in result_columns}
        result_fields_index = dict.fromkeys(result_fields or [])
if "error" in raw_result:
error = raw_result["error"]
            if len(error) > 10240:
                error = error[:10240] + "... (truncated)"
raise Exception(error)
elif "aggregations" in raw_result:
if result_fields:
for field in result_fields:
add_column_if_needed(
mappings, field, field, result_columns, result_columns_index
)
for key, data in raw_result["aggregations"].items():
collect_aggregations(
mappings,
result_rows,
key,
data,
None,
result_columns,
result_columns_index,
)
logger.debug("result_rows %s", str(result_rows))
logger.debug("result_columns %s", str(result_columns))
elif "hits" in raw_result and "hits" in raw_result["hits"]:
if result_fields:
for field in result_fields:
add_column_if_needed(
mappings, field, field, result_columns, result_columns_index
)
for h in raw_result["hits"]["hits"]:
row = {}
column_name = "_source" if "_source" in h else "fields"
for column in h[column_name]:
if result_fields and column not in result_fields_index:
continue
add_column_if_needed(
mappings, column, column, result_columns, result_columns_index
)
value = h[column_name][column]
row[column] = (
value[0]
if isinstance(value, list) and len(value) == 1
else value
)
result_rows.append(row)
else:
raise Exception(
"Redash failed to parse the results it got from Elasticsearch."
)
def test_connection(self):
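        # Lightweight connectivity check against the cluster health endpoint.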
try:
r = requests.get(
"{0}/_cluster/health".format(self.server_url), auth=self.auth
)
r.raise_for_status()
        except requests.HTTPError as e:
            logger.exception(e)
            raise Exception(
                "Failed to execute query. Return Code: {0} Reason: {1}".format(
                    e.response.status_code, e.response.text
                )
            )
except requests.exceptions.RequestException as e:
logger.exception(e)
raise Exception("Connection refused")


class Kibana(BaseElasticSearch):
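    """Runs simple Lucene query-string searches, paging through hits client-side."""
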
@classmethod
def enabled(cls):
return True
def _execute_simple_query(
self, url, auth, _from, mappings, result_fields, result_columns, result_rows
):
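        # Fetch one page of hits starting at `_from`, append the parsed rows,
        # and return the total hit count reported by Elasticsearch.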
url += "&from={0}".format(_from)
        r = requests.get(url, auth=auth)
r.raise_for_status()
raw_result = r.json()
self._parse_results(
mappings, result_fields, raw_result, result_columns, result_rows
)
        total = raw_result["hits"]["total"]
        result_size = len(raw_result["hits"]["hits"])
        logger.debug("Result Size: {0} Total: {1}".format(result_size, total))
        return total
def run_query(self, query, user):
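        # A query is a JSON document with required "index" and "query" keys,
        # plus optional "size", "limit", "fields", and "sort".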
try:
error = None
logger.debug(query)
query_params = json_loads(query)
index_name = query_params["index"]
query_data = query_params["query"]
size = int(query_params.get("size", 500))
limit = int(query_params.get("limit", 500))
result_fields = query_params.get("fields", None)
sort = query_params.get("sort", None)
if not self.server_url:
error = "Missing configuration key 'server'"
return None, error
url = "{0}/{1}/_search?".format(self.server_url, index_name)
mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name)
mappings, error = self._get_query_mappings(mapping_url)
if error:
return None, error
if sort:
url += "&sort={0}".format(urllib.parse.quote_plus(sort))
url += "&q={0}".format(urllib.parse.quote_plus(query_data))
logger.debug("Using URL: {0}".format(url))
logger.debug("Using Query: {0}".format(query_data))
result_columns = []
result_rows = []
if isinstance(query_data, str):
_from = 0
while True:
query_size = size if limit >= (_from + size) else (limit - _from)
total = self._execute_simple_query(
url + "&size={0}".format(query_size),
self.auth,
_from,
mappings,
result_fields,
result_columns,
result_rows,
)
_from += size
if _from >= limit:
break
else:
# TODO: Handle complete ElasticSearch queries (JSON based sent over HTTP POST)
raise Exception("Advanced queries are not supported")
json_data = json_dumps({"columns": result_columns, "rows": result_rows})
        except requests.HTTPError as e:
            logger.exception(e)
            error = "Failed to execute query. Return Code: {0} Reason: {1}".format(
                e.response.status_code, e.response.text
            )
json_data = None
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"
json_data = None
return json_data, error


class ElasticSearch(BaseElasticSearch):
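    """Runs raw JSON search bodies against the ``<index>/_search`` endpoint."""
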
@classmethod
def enabled(cls):
return True
@classmethod
def name(cls):
return "Elasticsearch"
def run_query(self, query, user):
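        # The query is a raw Elasticsearch search body; the optional "index"
        # and "result_fields" keys are popped before the request is sent.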
try:
error = None
logger.debug(query)
query_dict = json_loads(query)
index_name = query_dict.pop("index", "")
result_fields = query_dict.pop("result_fields", None)
if not self.server_url:
error = "Missing configuration key 'server'"
return None, error
url = "{0}/{1}/_search".format(self.server_url, index_name)
mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name)
mappings, error = self._get_query_mappings(mapping_url)
if error:
return None, error
logger.debug("Using URL: %s", url)
logger.debug("Using query: %s", query_dict)
r = requests.get(url, json=query_dict, auth=self.auth)
r.raise_for_status()
logger.debug("Result: %s", r.json())
result_columns = []
result_rows = []
self._parse_results(
mappings, result_fields, r.json(), result_columns, result_rows
)
json_data = json_dumps({"columns": result_columns, "rows": result_rows})
        except (KeyboardInterrupt, JobTimeoutException):
            raise
        except requests.HTTPError as e:
            logger.exception(e)
            error = "Failed to execute query. Return Code: {0} Reason: {1}".format(
                e.response.status_code, e.response.text
            )
json_data = None
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"
json_data = None
return json_data, error


register(Kibana)
register(ElasticSearch)