in dusty/scanners/performer.py [0:0]
def schedule_scanner(self, scanner_type, scanner_name, scanner_config):
    """Schedule a scanner run in the current context after the already configured scanners.

    The scanner class is loaded from
    ``dusty.scanners.<scanner_type>.<scanner_name>.scanner`` (its ``Scanner``
    attribute). If a scanner with the same ``get_name()`` is already present
    in ``self.context.scanners`` nothing is done.

    The effective scanner config is stored in
    ``self.context.config["scanners"][scanner_type][scanner_name]`` and is
    built in increasing priority from:

    1. ``self.context.config["settings"][scanner_type]`` (when present),
    2. the config already stored for this scanner (when it is a dict),
    3. ``scanner_config``.

    Args:
        scanner_type: Scanner category; used as a package name and config key.
        scanner_name: Scanner module name within the category; also a config key.
        scanner_config: Mapping of option overrides applied last.

    Returns:
        None. Failures derived from ``Exception`` are not raised: they are
        logged and appended to ``self.context.errors`` as an ``Error`` item.
        ``KeyboardInterrupt`` / ``SystemExit`` are allowed to propagate.
    """
    try:
        # Load scanner class
        scanner = importlib.import_module(
            f"dusty.scanners.{scanner_type}.{scanner_name}.scanner"
        ).Scanner
        if scanner.get_name() in self.context.scanners:
            log.debug("Scanner %s.%s already scheduled", scanner_type, scanner_name)
            return
        # Prepare config: make sure a dict exists for this type/name pair
        scanners_config = self.context.config["scanners"]
        type_config = scanners_config.setdefault(scanner_type, {})
        if not isinstance(type_config.get(scanner_name), dict):
            # Missing or non-dict entry (e.g. None) is replaced with an empty config
            type_config[scanner_name] = {}
        # Type-wide settings act as defaults under the scanner-specific values
        general_config = self.context.config.get("settings", {})
        if scanner_type in general_config:
            merged_config = general_config[scanner_type].copy()
            merged_config.update(type_config[scanner_name])
            type_config[scanner_name] = merged_config
        # Explicitly passed options take precedence over everything else
        type_config[scanner_name].update(scanner_config)
        # Validate config
        scanner.validate_config(type_config[scanner_name])
        # Add to context (from here on `scanner` is the instance, not the class)
        scanner = scanner(self.context)
        self.context.scanners[scanner.get_name()] = scanner
        # NOTE(review): if dependency resolution or prepare() below fails, the
        # instance stays registered in self.context.scanners - confirm intended.
        # Resolve dependencies
        dependency.resolve_depencies(self.context.scanners)
        # Prepare scanner
        scanner.prepare()
        # Done
        log.debug("Scheduled scanner %s.%s", scanner_type, scanner_name)
    except Exception:
        # Scheduling is best-effort: record the failure and keep going.
        # `Exception` (not a bare except) so Ctrl-C / sys.exit() still work.
        log.exception(
            "Failed to schedule %s scanner %s",
            scanner_type, scanner_name
        )
        error = Error(
            tool=f"{scanner_type}.{scanner_name}",
            error=f"Failed to schedule {scanner_type} scanner {scanner_name}",
            details=f"```\n{traceback.format_exc()}\n```"
        )
        self.context.errors.append(error)