redash/query_runner/sparql_endpoint.py (145 lines of code) (raw):
"""Provide the query runner for SPARQL Endpoints.
seeAlso: https://www.w3.org/TR/rdf-sparql-query/
"""
import logging
import json
from os import environ
import re
from redash.query_runner import BaseQueryRunner
from redash.utils import json_dumps, json_loads
from . import register
try:
import requests
from cmem.cmempy.queries import SparqlQuery
from rdflib.plugins.sparql import prepareQuery
enabled = True
except ImportError:
enabled = False
logger = logging.getLogger(__name__)
class SPARQLEndpointQueryRunner(BaseQueryRunner):
    """Use a plain SPARQL endpoint as redash data source.

    Queries are sent via HTTP GET with an ``Accept: application/json``
    header; results are expected in the SPARQL 1.1 JSON results format.
    """

    # These environment keys are used by cmempy
    KNOWN_CONFIG_KEYS = ("SPARQL_BASE_URI", "SSL_VERIFY")
    # These variables hold secret data and should NOT be logged
    KNOWN_SECRET_KEYS = ()
    # This allows for an easy connection test
    noop_query = "SELECT ?noop WHERE {BIND('noop' as ?noop)}"

    def __init__(self, configuration):
        """Init the class and remember the data source configuration."""
        super(SPARQLEndpointQueryRunner, self).__init__(configuration)
        self.configuration = configuration

    def _setup_environment(self):
        """Mirror the configuration into process environment variables.

        cmempy reads its settings from the environment, so every key in
        KNOWN_CONFIG_KEYS is first removed (to avoid stale values from a
        previously configured data source) and then re-set from the
        current configuration.
        """
        for key in self.KNOWN_CONFIG_KEYS:
            if key in environ:
                environ.pop(key)
            value = self.configuration.get(key, None)
            if value is not None:
                environ[key] = str(value)
                if key in self.KNOWN_SECRET_KEYS:
                    # never log secret values, only the fact they are set
                    logger.info("%s set by config", key)
                else:
                    logger.info("%s set by config to %s", key, environ[key])

    @staticmethod
    def _transform_sparql_results(results):
        """Transform a SPARQL query result into a redash query result.

        Source structure: SPARQL 1.1 Query Results JSON Format
        - seeAlso: https://www.w3.org/TR/sparql11-results-json/

        Target structure: redash result set; there is no good
        documentation available, so here an example as needed by redash:
            data = {
                "columns": [{"name": "name", "type": "string",
                             "friendly_name": "friendly name"}],
                "rows": [
                    {"name": "value 1"},
                    {"name": "value 2"},
                ]}

        FEATURE?: During the binding loop, we could check the data types
        of the values and, in case they are all the same, choose
        something better than just string.
        """
        logger.info("results are: %s", results)
        # json_loads/json_dumps from redash.utils are used (instead of
        # the plain json package) for consistency with the other query
        # runners
        sparql_results = json_loads(results)
        variables = sparql_results["head"]["vars"]
        # transform all bindings to redash rows;
        # unbound SPARQL variables are rendered as empty strings
        rows = [
            {var: binding.get(var, {}).get("value", "") for var in variables}
            for binding in sparql_results["results"]["bindings"]
        ]
        # transform all vars to redash columns
        columns = [
            {"name": var, "friendly_name": var, "type": "string"}
            for var in variables
        ]
        return json_dumps({"columns": columns, "rows": rows})

    @classmethod
    def name(cls):
        """Human readable data source name shown in the redash UI."""
        return "SPARQL Endpoint"

    @classmethod
    def enabled(cls):
        """Report whether all optional dependencies could be imported."""
        return enabled

    @classmethod
    def type(cls):
        """Internal data source type identifier."""
        return "sparql_endpoint"

    def remove_comments(self, string):
        """Strip the leading ``/* ... */`` comment redash adds to queries.

        Fix: queries without a closing comment marker (e.g. noop_query)
        used to raise ValueError via str.index; they are now returned
        just stripped.
        """
        position = string.find("*/")
        if position == -1:
            return string.strip()
        return string[position + 2 :].strip()

    def run_query(self, query, user):
        """Send a query to a sparql endpoint.

        :param query: the (possibly comment-prefixed) SPARQL query text
        :param user: the redash user, only used for logging
        :return: a (data, error) tuple - data is a json result string on
            success (error is None), error is a message string on
            failure (data is None)
        :raises ValueError: for query types other than SELECT
        """
        logger.info("about to execute query (user='{}'): {}".format(user, query))
        query_text = self.remove_comments(query)
        query = SparqlQuery(query_text)
        query_type = query.get_query_type()
        # None covers query types cmempy cannot detect (e.g. prefix-only)
        if query_type not in ["SELECT", None]:
            raise ValueError(
                "Queries of type {} can not be processed by redash.".format(query_type)
            )
        self._setup_environment()
        try:
            endpoint = self.configuration.get("SPARQL_BASE_URI")
            response = requests.get(
                endpoint,
                params=dict(query=query_text),
                headers=dict(Accept="application/json"),
            )
            # Fix: raise requests.HTTPError on 4xx/5xx responses so the
            # Problem Details handling below actually gets an exception
            # that carries a ``.response`` attribute
            response.raise_for_status()
            data = self._transform_sparql_results(response.text)
        except Exception as error:
            logger.info("Error: {}".format(error))
            try:
                # try to load Problem Details for HTTP API JSON (RFC 7807)
                details = json.loads(error.response.text)
                message = ""
                if "title" in details:
                    message += details["title"] + ": "
                if "detail" in details:
                    message += details["detail"]
                return None, message
            except Exception:
                # no structured error body available - fall through
                pass
            # Fix: return a message string (not the exception object)
            return None, str(error)
        return data, None

    @classmethod
    def configuration_schema(cls):
        """Provide the configuration of the data source as json schema."""
        return {
            "type": "object",
            "properties": {
                "SPARQL_BASE_URI": {"type": "string", "title": "Base URL"},
                "SSL_VERIFY": {
                    "type": "boolean",
                    "title": "Verify SSL certificates for API requests",
                    "default": True,
                },
            },
            "required": ["SPARQL_BASE_URI"],
            "secret": [],
            "extra_options": ["SSL_VERIFY"],
        }

    def get_schema(self, get_stats=False):
        """Get the schema structure (prefixes, graphs) for autocomplete."""
        schema = dict()
        schema["1"] = {
            "name": "-> Common Prefixes <-",
            "columns": self._get_common_prefixes_schema(),
        }
        schema["2"] = {"name": "-> Graphs <-", "columns": self._get_graphs_schema()}
        # schema.update(self._get_query_schema())
        logger.info(f"Getting Schema Values: {schema.values()}")
        return schema.values()

    def _get_graphs_schema(self):
        """Get a list of readable graph FROM clause strings."""
        self._setup_environment()
        endpoint = self.configuration.get("SPARQL_BASE_URI")
        query_text = "SELECT DISTINCT ?g WHERE {GRAPH ?g {?s ?p ?o}}"
        response = requests.get(
            endpoint,
            params=dict(query=query_text),
            headers=dict(Accept="application/json"),
        ).json()
        return [
            "FROM <{}>".format(binding.get("g").get("value"))
            for binding in response.get("results").get("bindings")
        ]

    @staticmethod
    def _get_common_prefixes_schema():
        """Get a list of SPARQL prefix declarations."""
        return [
            "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>",
            "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>",
            "PREFIX owl: <http://www.w3.org/2002/07/owl#>",
            "PREFIX schema: <http://schema.org/>",
            "PREFIX dct: <http://purl.org/dc/terms/>",
            "PREFIX skos: <http://www.w3.org/2004/02/skos/core#>",
        ]
register(SPARQLEndpointQueryRunner)