def _get_connection()

in redash/query_runner/pg.py [0:0]


    def _get_connection(self):
        """Open a psycopg2 connection using temporary Redshift credentials.

        A boto3 ``redshift`` client is built according to the value returned
        by ``self._login_method_selection()``:

        * ``"KEYS"`` -- the access key pair stored in the configuration.
        * ``"ROLE"`` -- no explicit keys are passed to boto3.
        * anything else -- a role (``rolename`` in the configuration) is
          assumed through STS first; the STS client is created with the
          configured key pair when the method is ``"ASSUME_ROLE_KEYS"`` and
          without explicit keys otherwise.

        That client's ``get_cluster_credentials`` response supplies the
        database user and password used to connect.

        Returns:
            The connection returned by ``psycopg2.connect``. It is opened
            with ``async_=True``; per psycopg2's asynchronous support, the
            caller has to wait for it to become ready before using it.

        Exceptions raised by boto3 or psycopg2 are not caught here.
        """
        sslrootcert_path = os.path.join(
            os.path.dirname(__file__), "./files/redshift-ca-bundle.crt"
        )

        login_method = self._login_method_selection()

        config = self.configuration
        region = config.get("aws_region")

        if login_method == "KEYS":
            client = boto3.client(
                "redshift",
                region_name=region,
                aws_access_key_id=config.get("aws_access_key_id"),
                aws_secret_access_key=config.get("aws_secret_access_key"),
            )
        elif login_method == "ROLE":
            client = boto3.client("redshift", region_name=region)
        else:
            # Assume-role flows: obtain session credentials from STS, then
            # build the Redshift client from those credentials.
            if login_method == "ASSUME_ROLE_KEYS":
                sts_client = boto3.client(
                    "sts",
                    region_name=region,
                    aws_access_key_id=config.get("aws_access_key_id"),
                    aws_secret_access_key=config.get("aws_secret_access_key"),
                )
            else:
                sts_client = boto3.client("sts", region_name=region)

            # A fresh random suffix gives every assumed-role session its own name.
            role_session = f"redash_{uuid4().hex}"
            session_keys = sts_client.assume_role(
                RoleArn=config.get("rolename"), RoleSessionName=role_session
            )["Credentials"]
            client = boto3.client(
                "redshift",
                region_name=region,
                aws_access_key_id=session_keys["AccessKeyId"],
                aws_secret_access_key=session_keys["SecretAccessKey"],
                aws_session_token=session_keys["SessionToken"],
            )

        dbname = config.get("dbname")
        credentials = client.get_cluster_credentials(
            DbUser=config.get("user"),
            DbName=dbname,
            ClusterIdentifier=config.get("clusterid"),
        )

        # Connect with the user/password from the credentials response rather
        # than the raw configured user name.
        return psycopg2.connect(
            user=credentials["DbUser"],
            password=credentials["DbPassword"],
            host=config.get("host"),
            port=config.get("port"),
            dbname=dbname,
            sslmode=config.get("sslmode", "prefer"),
            sslrootcert=sslrootcert_path,
            async_=True,
        )