def perform()

in dusty/scanners/performer.py [0:0]


    def perform(self):
        """ Perform action """
        log.info("Starting scanning")
        reporting = self.context.performers.get("reporting", None)
        # Create executors
        executor = dict()
        settings = self.context.config["settings"]
        for scanner_type in self.context.config["scanners"]:
            max_workers = settings.get(scanner_type, dict()).get("max_concurrent_scanners", 1)
            executor[scanner_type] = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
            log.info("Made %s executor with %d workers", scanner_type.upper(), max_workers)
        # Starting scanning
        if reporting:
            reporting.on_start()
        # Submit scanners
        futures = list()
        future_map = dict()
        future_dep_map = dict()
        for item in self.context.scanners:
            scanner = self.context.scanners[item]
            scanner_type = scanner.__class__.__module__.split(".")[-3]
            scanner_module = scanner.__class__.__module__.split(".")[-2]
            depencies = list()
            for dep in scanner.depends_on() + scanner.run_after():
                if dep in future_dep_map:
                    depencies.append(future_dep_map[dep])
            future = executor[scanner_type].submit(self._execute_scanner, scanner, depencies)
            future_dep_map[scanner_module] = future
            future_map[future] = item
            futures.append(future)
        # Wait for executors to start and finish
        started = set()
        finished = set()
        while True:
            # Check for started executors
            for future in futures:
                if future not in started and (future.running() or future.done()):
                    item = future_map[future]
                    scanner = self.context.scanners[item]
                    if not scanner.get_meta("meta_scanner", False):
                        log.info(f"Started {item} ({scanner.get_description()})")
                        if reporting:
                            reporting.on_scanner_start(item)
                    # Add to started set
                    started.add(future)
            # Check for finished executors
            for future in futures:
                if future not in finished and future.done():
                    item = future_map[future]
                    try:
                        future.result()
                    except:
                        log.exception("Scanner %s failed", item)
                        error = Error(
                            tool=item,
                            error=f"Scanner {item} failed",
                            details=f"```\n{traceback.format_exc()}\n```"
                        )
                        self.context.errors.append(error)
                    # Collect scanner findings and errors
                    scanner = self.context.scanners[item]
                    scanner_type = scanner.__class__.__module__.split(".")[-3]
                    for result in scanner.get_findings():
                        result.set_meta("scanner_type", scanner_type)
                        self.context.findings.append(result)
                    for error in scanner.get_errors():
                        error.set_meta("scanner_type", scanner_type)
                        self.context.errors.append(error)
                    if not scanner.get_meta("meta_scanner", False):
                        if reporting:
                            reporting.on_scanner_finish(item)
                    # Add to finished set
                    finished.add(future)
            # Exit if all executors done
            if self._all_futures_done(futures):
                break
            # Sleep for some short time
            time.sleep(constants.EXECUTOR_STATUS_CHECK_INTERVAL)
        # All scanners completed
        if reporting:
            reporting.on_finish()