def execute()

in dusty/scanners/sast/gitleaks/scanner.py [0:0]


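    def execute(self):
        """ Run the scanner

        Flow, as implemented below:

        1. If ``squash_commits`` is set, move the project's ``.git`` aside to
           ``.git.old`` and create a fresh single-commit repository with
           dulwich, so only the current tree is scanned, not its history.
        2. Run ``gitleaks`` on the ``code`` directory, writing a JSON report
           to a temporary file.
        3. Hand the report to ``parse_findings`` and ``save_intermediates``.
        4. If squashing was requested, delete the temporary ``.git`` and move
           ``.git.old`` back, even when the scan or parsing raised.

        Config keys read: ``code``, ``squash_commits``, ``redact_offenders``,
        ``use_custom_rules``, ``custom_rules_path``.
        """
        code_path = self.config.get("code")
        squash_commits = self.config.get("squash_commits", None)
        # Squash commits (if needed)
        if squash_commits:
            # Rename old .git
            try:
                os.rename(
                    os.path.join(code_path, ".git"),
                    os.path.join(code_path, ".git.old")
                )
            except Exception:  # pylint: disable=W0703
                # Best effort: the target may not be a git checkout at all.
                # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
                log.debug("Failed to rename old .git: %s", traceback.format_exc())
            # Initialize new repo
            current_dir = os.getcwd()
            try:
                os.chdir(code_path)
                # Patch dulwich to work without valid UID/GID
                # NOTE(review): this replaces a module-level dulwich function and is not
                # restored in this method, so the patch outlives the scan - confirm intended.
                dulwich.repo.__original__get_default_identity = dulwich.repo._get_default_identity  # pylint: disable=W0212
                dulwich.repo._get_default_identity = git_clone._dulwich_repo_get_default_identity  # pylint: disable=W0212
                # Set USERNAME if needed
                try:
                    getpass.getuser()
                except Exception:  # pylint: disable=W0703
                    os.environ["USERNAME"] = "git"
                # Add current code
                repository = dulwich.porcelain.init(code_path)
                # Keep the preserved original history out of the throw-away commit
                repository._put_named_file(os.path.join("info", "exclude"), b"/.git.old/")  # pylint: disable=W0212
                dulwich.porcelain.add(repository)
                log.debug("Git repository status: %s", dulwich.porcelain.status(repository, True))
                dulwich.porcelain.commit(
                    repository,
                    b"Current project code", b"Carrier <dusty@localhost>"
                )
            finally:
                os.chdir(current_dir)
        try:
            # Make temporary files
            # mkstemp creates the file readable/writable by the current user only, which
            # matters here: the report lists detected secrets.
            output_file_fd, output_file = tempfile.mkstemp(".json")
            log.debug("Output file: %s", output_file)
            os.close(output_file_fd)
            additional_options = []
            # Without --redact the report (and possibly the captured tool output) carries the
            # matched secret values; treat saved intermediates and debug logs as sensitive.
            if self.config.get("redact_offenders", None):
                additional_options.append("--redact")
            # Use custom rules
            if self.config.get("use_custom_rules", None):
                custom_rules_path = self.config.get("custom_rules_path", None)
                if custom_rules_path:
                    config_path = custom_rules_path
                else:
                    # Fall back to the rule set bundled with this scanner package
                    config_path = pkg_resources.resource_filename(
                        "dusty",
                        f"{'/'.join(__name__.split('.')[1:-1])}/data/gitleaks.toml")
                additional_options.append("--config")
                additional_options.append(config_path)
                log.debug("Custom config path: %s", config_path)
            # Run task
            # Arguments are passed as a list (no shell), so config values such as the code
            # path or rules path are never interpreted by a shell.
            task = subprocess.run(
                [
                    "gitleaks", "--repo-path", code_path, "--report", output_file
                ] + additional_options,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            log.log_subprocess_result(task)
            # Parse findings
            parse_findings(output_file, self)
            # Save intermediates
            self.save_intermediates(output_file, task)
        finally:
            # Revert commit squashing (if any)
            # Done in finally so a failed scan or parse does not leave the project with the
            # throw-away repository in .git and its real history stranded in .git.old.
            if squash_commits:
                shutil.rmtree(os.path.join(code_path, ".git"))
                try:
                    os.rename(
                        os.path.join(code_path, ".git.old"),
                        os.path.join(code_path, ".git")
                    )
                except Exception:  # pylint: disable=W0703
                    log.debug("Failed to revert .git: %s", traceback.format_exc())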