in dusty/scanners/performer.py [0:0]
def perform(self):
""" Perform action """
log.info("Starting scanning")
reporting = self.context.performers.get("reporting", None)
# Create executors
executor = dict()
settings = self.context.config["settings"]
for scanner_type in self.context.config["scanners"]:
max_workers = settings.get(scanner_type, dict()).get("max_concurrent_scanners", 1)
executor[scanner_type] = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
log.info("Made %s executor with %d workers", scanner_type.upper(), max_workers)
# Starting scanning
if reporting:
reporting.on_start()
# Submit scanners
futures = list()
future_map = dict()
future_dep_map = dict()
for item in self.context.scanners:
scanner = self.context.scanners[item]
scanner_type = scanner.__class__.__module__.split(".")[-3]
scanner_module = scanner.__class__.__module__.split(".")[-2]
depencies = list()
for dep in scanner.depends_on() + scanner.run_after():
if dep in future_dep_map:
depencies.append(future_dep_map[dep])
future = executor[scanner_type].submit(self._execute_scanner, scanner, depencies)
future_dep_map[scanner_module] = future
future_map[future] = item
futures.append(future)
# Wait for executors to start and finish
started = set()
finished = set()
while True:
# Check for started executors
for future in futures:
if future not in started and (future.running() or future.done()):
item = future_map[future]
scanner = self.context.scanners[item]
if not scanner.get_meta("meta_scanner", False):
log.info(f"Started {item} ({scanner.get_description()})")
if reporting:
reporting.on_scanner_start(item)
# Add to started set
started.add(future)
# Check for finished executors
for future in futures:
if future not in finished and future.done():
item = future_map[future]
try:
future.result()
except:
log.exception("Scanner %s failed", item)
error = Error(
tool=item,
error=f"Scanner {item} failed",
details=f"```\n{traceback.format_exc()}\n```"
)
self.context.errors.append(error)
# Collect scanner findings and errors
scanner = self.context.scanners[item]
scanner_type = scanner.__class__.__module__.split(".")[-3]
for result in scanner.get_findings():
result.set_meta("scanner_type", scanner_type)
self.context.findings.append(result)
for error in scanner.get_errors():
error.set_meta("scanner_type", scanner_type)
self.context.errors.append(error)
if not scanner.get_meta("meta_scanner", False):
if reporting:
reporting.on_scanner_finish(item)
# Add to finished set
finished.add(future)
# Exit if all executors done
if self._all_futures_done(futures):
break
# Sleep for some short time
time.sleep(constants.EXECUTOR_STATUS_CHECK_INTERVAL)
# All scanners completed
if reporting:
reporting.on_finish()