def run_query(self, query, user)

in redash/query_runner/athena.py
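Executes the given query against Amazon Athena via pyathena, converts the result set into Redash's columns/rows structure, and attaches bytes-scanned, query-id, and estimated-cost metadata. On failure, any in-flight Athena query is cancelled and the exception is re-raised.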


    def run_query(self, query, user):
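        # Open a fresh pyathena cursor for this data source; _get_iam_credentials()
        # supplies the remaining AWS connection kwargs (credentials, etc.) from the configuration.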
        cursor = pyathena.connect(
            s3_staging_dir=self.configuration["s3_staging_dir"],
            schema_name=self.configuration.get("schema", "default"),
            encryption_option=self.configuration.get("encryption_option", None),
            kms_key=self.configuration.get("kms_key", None),
            work_group=self.configuration.get("work_group", "primary"),
            formatter=SimpleFormatter(),
            **self._get_iam_credentials(user=user)
        ).cursor()

        try:
            cursor.execute(query)
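            # Map Athena's column descriptors to Redash column dicts; Athena types
            # missing from _TYPE_MAPPINGS fall back to None.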
            column_tuples = [
                (i[0], _TYPE_MAPPINGS.get(i[1], None)) for i in cursor.description
            ]
            columns = self.fetch_columns(column_tuples)
            rows = [
                dict(zip([c["name"] for c in columns], r)) for r in cursor.fetchall()
            ]
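            # Not every pyathena version exposes these cursor attributes,
            # so the lookups are guarded with AttributeError handlers.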
            qbytes = None
            athena_query_id = None
            try:
                qbytes = cursor.data_scanned_in_bytes
            except AttributeError as e:
                logger.debug("Athena Upstream can't get data_scanned_in_bytes: %s", e)
            try:
                athena_query_id = cursor.query_id
            except AttributeError as e:
                logger.debug("Athena Upstream can't get query_id: %s", e)

            price = self.configuration.get("cost_per_tb", 5)
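            # Build the Redash result payload; query_cost is an estimate derived from
            # bytes scanned at the configured price per TB (default 5).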
            data = {
                "columns": columns,
                "rows": rows,
                "metadata": {
                    "data_scanned": qbytes,
                    "athena_query_id": athena_query_id,
                    "query_cost": price * qbytes * 10e-12,
                },
            }

            json_data = json_dumps(data, ignore_nan=True)
            error = None
        except Exception:
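            # Best-effort: cancel the in-flight Athena query before re-raising.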
            if cursor.query_id:
                cursor.cancel()
            raise

        return json_data, error
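
A minimal usage sketch (hypothetical configuration values; it assumes the enclosing class is this module's Athena query runner and that additional keys such as "region" and AWS credentials are read from the same configuration dict by _get_iam_credentials()):

    from redash.query_runner.athena import Athena

    runner = Athena(
        {
            # Keys below mirror the ones read in run_query(); "region" is an
            # assumed key consumed by _get_iam_credentials(), not shown above.
            "region": "us-east-1",
            "s3_staging_dir": "s3://example-bucket/athena-results/",
            "schema": "default",
            "work_group": "primary",
            "cost_per_tb": 5,
        }
    )

    # Returns the serialized columns/rows/metadata payload and an error value (None on success).
    json_data, error = runner.run_query("SELECT count(*) FROM example_table", user=None)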