in redash/query_runner/qubole.py [0:0]
def run_query(self, query, user):
qbol.configure(
api_token=self.configuration.get("token"),
api_url="%s/api" % self.configuration.get("endpoint"),
)
try:
query_type = self.configuration.get("query_type", "hive")
if query_type == "quantum":
cmd = SqlCommand.create(query=query)
elif query_type == "hive":
cmd = HiveCommand.create(
query=query, label=self.configuration.get("cluster")
)
elif query_type == "presto":
cmd = PrestoCommand.create(
query=query, label=self.configuration.get("cluster")
)
else:
raise Exception(
"Invalid Query Type:%s.\
It must be : hive / presto / quantum."
% self.configuration.get("query_type")
)
logging.info(
"Qubole command created with Id: %s and Status: %s", cmd.id, cmd.status
)
while not Command.is_done(cmd.status):
time.sleep(qbol.poll_interval)
cmd = Command.find(cmd.id)
logging.info("Qubole command Id: %s and Status: %s", cmd.id, cmd.status)
rows = []
columns = []
error = None
if cmd.status == "done":
fp = StringIO()
cmd.get_results(
fp=fp,
inline=True,
delim="\t",
fetch=False,
qlog=None,
arguments=["true"],
)
results = fp.getvalue()
fp.close()
data = results.split("\r\n")
columns = self.fetch_columns(
[(i, TYPE_STRING) for i in data.pop(0).split("\t")]
)
rows = [
dict(zip((column["name"] for column in columns), row.split("\t")))
for row in data
]
json_data = json_dumps({"columns": columns, "rows": rows})
except (KeyboardInterrupt, JobTimeoutException):
logging.info("Sending KILL signal to Qubole Command Id: %s", cmd.id)
cmd.cancel()
raise
return json_data, error