def _verify_iam_roles()

in cli/src/klio_cli/commands/job/verify.py [0:0]


    def _verify_iam_roles(self):
        logging.info("Verifying IAM roles for the job's service account")

        service_account = (
            self.klio_config.pipeline_options.service_account_email
        )
        if not service_account:
            try:
                response = (
                    self.compute_client.projects()
                    .get(project=self.project)
                    .execute()
                )
            except google_errors.HttpError as e:
                logging.error(
                    "Error verifying IAM roles: could"
                    + " not get project information. "
                    "Skipping.",
                    exc_info=e,
                )
                return False
            service_account = response["defaultServiceAccount"]

        try:
            response = (
                self.iam_client.projects()
                .getIamPolicy(resource=self.project, body={})
                .execute()
            )
        except google_errors.HttpError as e:
            logging.error(
                "Error verifying IAM roles: could"
                + " not get IAM policies on job's "
                "service account. Skipping.",
                exc_info=e,
            )
            return False
        bindings = response.get("bindings", [])
        svc_account_string = "serviceAccount:{}".format(service_account)

        # Invert dictionary from role -> [svc account] to svc account -> [role]
        members_lookup = {}
        for el in bindings:
            role = el.get("role")
            for mem in el.get("members", []):
                members_lookup.setdefault(mem, set()).add(role)

        svc_account_roles = members_lookup.get(svc_account_string, set())
        if {"roles/editor", "roles/owner"}.intersection(svc_account_roles):
            logging.warning(
                "The default compute service account has "
                "unsafe project editor or owner permissions."
            )

        runner = self.klio_config.pipeline_options.runner
        if runner == var.KlioRunner.DIRECT_GKE_RUNNER:
            roles_to_check = DIRECT_GKE_ROLES_TO_CHECK
        if self.klio_config.pipeline_options.streaming:
            roles_to_check = STREAMING_ROLES_TO_CHECK
        else:
            roles_to_check = BASE_ROLES_TO_CHECK

        if roles_to_check.issubset(svc_account_roles):
            logging.info(
                "Verified that the service account has the required roles"
            )
            return True

        missing_roles = roles_to_check - svc_account_roles
        logging.warning(
            "The configured compute service account is missing the following "
            f"IAM role(s): {', '.join(missing_roles)}"
        )
        if self.create_resources:
            gcloud_cmd_base = (
                f"\tgcloud projects add-iam-policy-binding {self.project} \\\n"
                f"\t\t--member={svc_account_string} \\\n"
                "\t\t--role="
            )
            gcloud_cmds = []
            for role in missing_roles:
                cmd = gcloud_cmd_base + role
                gcloud_cmds.append(cmd)

            gcloud_cmds = "\n".join(gcloud_cmds)
            logging.warning(
                "Klio is unable to add the required role(s) to the service "
                "account at this time. Add them with the following gcloud "
                "command(s):\n"
                f"{gcloud_cmds}"
            )
        return False