in redash/query_runner/athena.py
def run_query(self, query, user):
    # Open a PyAthena cursor using the data source configuration plus any
    # per-user IAM credentials returned by _get_iam_credentials.
    cursor = pyathena.connect(
        s3_staging_dir=self.configuration["s3_staging_dir"],
        schema_name=self.configuration.get("schema", "default"),
        encryption_option=self.configuration.get("encryption_option", None),
        kms_key=self.configuration.get("kms_key", None),
        work_group=self.configuration.get("work_group", "primary"),
        formatter=SimpleFormatter(),
        **self._get_iam_credentials(user=user)
    ).cursor()

    try:
        cursor.execute(query)
        # Map Athena column types to Redash types; unknown types fall back to None.
        column_tuples = [
            (i[0], _TYPE_MAPPINGS.get(i[1], None)) for i in cursor.description
        ]
        columns = self.fetch_columns(column_tuples)
        rows = [
            dict(zip(([c["name"] for c in columns]), r))
            for i, r in enumerate(cursor.fetchall())
        ]

        # Query statistics are optional; the cursor may not expose them.
        qbytes = None
        athena_query_id = None
        try:
            qbytes = cursor.data_scanned_in_bytes
        except AttributeError as e:
            logger.debug("Athena Upstream can't get data_scanned_in_bytes: %s", e)
        try:
            athena_query_id = cursor.query_id
        except AttributeError as e:
            logger.debug("Athena Upstream can't get query_id: %s", e)

        price = self.configuration.get("cost_per_tb", 5)
        data = {
            "columns": columns,
            "rows": rows,
            "metadata": {
                "data_scanned": qbytes,
                "athena_query_id": athena_query_id,
                "query_cost": price * qbytes * 10e-12,
            },
        }
        json_data = json_dumps(data, ignore_nan=True)
        error = None
    except Exception:
        # Cancel the in-flight Athena query before propagating the error.
        if cursor.query_id:
            cursor.cancel()
        raise

    return json_data, error
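
For reference, the query_cost entry in the metadata above is plain arithmetic on the bytes scanned and the configured cost_per_tb. The sketch below reproduces that expression outside the runner so the units are easy to check; the helper name estimate_query_cost is hypothetical and not part of athena.py.

def estimate_query_cost(data_scanned_in_bytes, cost_per_tb=5):
    # Same expression as run_query: cost_per_tb * bytes * 10e-12.
    # Note that 10e-12 evaluates to 1e-11, so one "TB" unit here is 10**11 bytes;
    # under a 1 TB == 10**12 bytes convention the factor would be 1e-12 instead.
    return cost_per_tb * data_scanned_in_bytes * 10e-12


if __name__ == "__main__":
    # Scanning 10**12 bytes at the default price of 5 yields 50.0 with this formula.
    print(estimate_query_cost(10**12))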