in dusty/scanners/sast/gitleaks/scanner.py [0:0]
def execute(self):
    """ Run the scanner

    Invokes the ``gitleaks`` executable with ``--repo-path`` set to the
    configured ``code`` directory and ``--report`` set to a temporary
    ``.json`` file, then hands that file to ``parse_findings`` and
    ``self.save_intermediates``.

    Config keys read here: ``code``, ``squash_commits``, ``redact_offenders``,
    ``use_custom_rules`` and ``custom_rules_path``.

    When ``squash_commits`` is set, the existing ``.git`` directory is renamed
    to ``.git.old`` and a new repository with one commit of the current files
    is created for the scan. The original ``.git`` is put back afterwards,
    also when the squash or the scan raises; the exception still propagates.
    """
    code_path = self.config.get("code")
    squash_commits = self.config.get("squash_commits", None)
    if squash_commits:
        git_dir = os.path.join(code_path, ".git")
        git_backup_dir = os.path.join(code_path, ".git.old")
    # Track what this call did, so the cleanup never deletes a .git it does not own
    original_git_moved = False
    temporary_repo_created = False
    try:
        # Squash commits (if needed)
        if squash_commits:
            # Rename old .git (best effort: the code may not be a git repository)
            try:
                os.rename(git_dir, git_backup_dir)
                original_git_moved = True
            except OSError:
                log.debug("Failed to rename old .git: %s", traceback.format_exc())
            # Initialize new repo
            current_dir = os.getcwd()
            try:
                os.chdir(code_path)
                # Patch dulwich to work without valid UID/GID
                dulwich.repo.__original__get_default_identity = dulwich.repo._get_default_identity  # pylint: disable=W0212
                dulwich.repo._get_default_identity = git_clone._dulwich_repo_get_default_identity  # pylint: disable=W0212
                # Set USERNAME if needed (getuser() fails when no user name can be resolved)
                try:
                    getpass.getuser()
                except Exception:  # pylint: disable=W0703
                    os.environ["USERNAME"] = "git"
                # Add current code
                repository = dulwich.porcelain.init(code_path)
                temporary_repo_created = True
                # Keep the backed-up history out of the temporary commit
                repository._put_named_file(os.path.join("info", "exclude"), b"/.git.old/")  # pylint: disable=W0212
                dulwich.porcelain.add(repository)
                log.debug("Git repository status: %s", dulwich.porcelain.status(repository, True))
                dulwich.porcelain.commit(
                    repository,
                    b"Current project code", b"Carrier <dusty@localhost>"
                )
            finally:
                # Always return to the directory the caller was in
                os.chdir(current_dir)
        # Make temporary files
        output_file_fd, output_file = tempfile.mkstemp(".json")
        log.debug("Output file: %s", output_file)
        # Only the path is needed: gitleaks writes the report itself
        os.close(output_file_fd)
        additional_options = []
        if self.config.get("redact_offenders", None):
            additional_options.append("--redact")
        # Use custom rules
        if self.config.get("use_custom_rules", None):
            custom_rules_path = self.config.get("custom_rules_path", None)
            if custom_rules_path:
                config_path = custom_rules_path
            else:
                # Fall back to the gitleaks.toml shipped in this package's data directory
                config_path = pkg_resources.resource_filename(
                    "dusty",
                    f"{'/'.join(__name__.split('.')[1:-1])}/data/gitleaks.toml")
            additional_options.append("--config")
            additional_options.append(config_path)
            log.debug("Custom config path: %s", config_path)
        # Run task
        task = subprocess.run(
            [
                "gitleaks", "--repo-path", code_path, "--report", output_file
            ] + additional_options,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        log.log_subprocess_result(task)
        # Parse findings
        parse_findings(output_file, self)
        # Save intermediates
        self.save_intermediates(output_file, task)
    finally:
        # Revert commit squashing (if any), even when the scan failed
        if squash_commits:
            # Remove .git only when it is known to be the temporary repository:
            # either the original was moved away or init succeeded in this call
            if (original_git_moved or temporary_repo_created) and os.path.isdir(git_dir):
                shutil.rmtree(git_dir)
            try:
                os.rename(git_backup_dir, git_dir)
            except OSError:
                log.debug("Failed to revert .git: %s", traceback.format_exc())