in redash/query_runner/trino.py [0:0]
def run_query(self, query, user):
    """Execute ``query`` against Trino and return ``(json_data, error)``.

    Basic authentication is used when a password is configured; otherwise
    the connection falls back to ``trino.constants.DEFAULT_AUTH``.

    Args:
        query: Query text handed unchanged to ``cursor.execute``.
        user: Not referenced by this implementation.

    Returns:
        A ``(json_data, error)`` tuple. On success ``json_data`` is the
        serialized ``{"columns": ..., "rows": ...}`` payload and ``error``
        is ``None``. When a ``DatabaseError`` is caught, ``json_data`` is
        ``None`` and ``error`` is the failure message.

    Raises:
        KeyboardInterrupt, InterruptException, JobTimeoutException:
            Re-raised after the running cursor has been cancelled.
    """
    username = self.configuration.get("username")
    password = self.configuration.get("password")
    if password:
        auth = trino.auth.BasicAuthentication(
            username=username,
            password=password,
        )
    else:
        auth = trino.constants.DEFAULT_AUTH

    connection = trino.dbapi.connect(
        http_scheme=self.configuration.get("protocol", "http"),
        host=self.configuration.get("host", ""),
        port=self.configuration.get("port", 8080),
        catalog=self.configuration.get("catalog", "hive"),
        schema=self.configuration.get("schema", "default"),
        user=username,
        auth=auth,
    )
    cursor = connection.cursor()

    try:
        cursor.execute(query)
        results = cursor.fetchall()
        description = cursor.description
        columns = self.fetch_columns(
            [(c[0], TRINO_TYPES_MAPPING.get(c[1], None)) for c in description]
        )
        # Column names are the same for every row, so build the list once
        # instead of once per result row.
        column_names = [c["name"] for c in columns]
        rows = [dict(zip(column_names, r)) for r in results]
        json_data = json_dumps({"columns": columns, "rows": rows})
        error = None
    except DatabaseError as db:
        json_data = None
        # Guard against an exception raised without arguments.
        details = db.args[0] if db.args else None
        message = None
        if isinstance(details, dict):
            # The fallback must be a dict: the previous ``{"message", None}``
            # was a set literal, so a missing "failureInfo" key raised
            # AttributeError here and hid the real database error.
            # ``or {}`` also covers a "failureInfo" key that is None.
            message = (details.get("failureInfo") or {}).get("message")
        if message is None:
            error = "Unspecified DatabaseError: {0}".format(str(db))
        else:
            error = message
    except (KeyboardInterrupt, InterruptException, JobTimeoutException):
        # Ask the server to stop the in-flight query before propagating.
        cursor.cancel()
        raise
    finally:
        # Close the connection on every exit path: success, database
        # error, or interruption.
        connection.close()

    return json_data, error