dusty/scanners/performer.py (211 lines of code) (raw):

#!/usr/bin/python3 # coding=utf-8 # pylint: disable=I0011,R0903,W0702,W0703,R0914,R0912,R0915 # Copyright 2019 getcarrier.io # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Scanning performer """ import importlib import traceback import pkgutil import time import concurrent.futures from ruamel.yaml.comments import CommentedMap from dusty.tools import log from dusty.tools import dependency from dusty.models.module import ModuleModel from dusty.models.performer import PerformerModel from dusty.models.error import Error from . import constants class ScanningPerformer(ModuleModel, PerformerModel): """ Runs scanners """ def __init__(self, context): """ Initialize instance """ super().__init__() self.context = context def prepare(self): """ Prepare for action """ log.debug("Preparing") config = self.context.config["scanners"] # Schedule scanners for scanner_type in list(config): for scanner_name in list(config[scanner_type]): if isinstance(config[scanner_type][scanner_name], bool) and \ not config[scanner_type][scanner_name]: continue try: self.schedule_scanner(scanner_type, scanner_name, dict()) except: log.exception( "Failed to prepare %s scanner %s", scanner_type, scanner_name ) error = Error( tool=f"{scanner_type}.{scanner_name}", error=f"Failed to prepare {scanner_type} scanner {scanner_name}", details=f"```\n{traceback.format_exc()}\n```" ) self.context.errors.append(error) # Resolve depencies once again dependency.resolve_depencies(self.context.scanners) def perform(self): """ Perform action """ log.info("Starting scanning") reporting = self.context.performers.get("reporting", None) # Create executors executor = dict() settings = self.context.config["settings"] for scanner_type in self.context.config["scanners"]: max_workers = settings.get(scanner_type, dict()).get("max_concurrent_scanners", 1) executor[scanner_type] = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) log.info("Made %s executor with %d workers", scanner_type.upper(), max_workers) # Starting scanning if reporting: reporting.on_start() # Submit scanners futures = list() future_map = dict() future_dep_map = dict() for item in self.context.scanners: scanner = self.context.scanners[item] scanner_type = scanner.__class__.__module__.split(".")[-3] scanner_module = scanner.__class__.__module__.split(".")[-2] depencies = list() for dep in scanner.depends_on() + scanner.run_after(): if dep in future_dep_map: depencies.append(future_dep_map[dep]) future = executor[scanner_type].submit(self._execute_scanner, scanner, depencies) future_dep_map[scanner_module] = future future_map[future] = item futures.append(future) # Wait for executors to start and finish started = set() finished = set() while True: # Check for started executors for future in futures: if future not in started and (future.running() or future.done()): item = future_map[future] scanner = self.context.scanners[item] if not scanner.get_meta("meta_scanner", False): log.info(f"Started {item} ({scanner.get_description()})") if reporting: reporting.on_scanner_start(item) # Add to started set started.add(future) # Check for finished executors for future in futures: if future not in finished and future.done(): item = future_map[future] try: future.result() except: log.exception("Scanner %s failed", item) error = Error( tool=item, error=f"Scanner {item} failed", details=f"```\n{traceback.format_exc()}\n```" ) self.context.errors.append(error) # Collect scanner findings and errors scanner = self.context.scanners[item] scanner_type = scanner.__class__.__module__.split(".")[-3] for result in scanner.get_findings(): result.set_meta("scanner_type", scanner_type) self.context.findings.append(result) for error in scanner.get_errors(): error.set_meta("scanner_type", scanner_type) self.context.errors.append(error) if not scanner.get_meta("meta_scanner", False): if reporting: reporting.on_scanner_finish(item) # Add to finished set finished.add(future) # Exit if all executors done if self._all_futures_done(futures): break # Sleep for some short time time.sleep(constants.EXECUTOR_STATUS_CHECK_INTERVAL) # All scanners completed if reporting: reporting.on_finish() @staticmethod def _execute_scanner(scanner, depencies): if depencies: concurrent.futures.wait(depencies) scanner.execute() @staticmethod def _all_futures_done(futures): for item in futures: if not item.done(): return False return True def get_module_meta(self, module, name, default=None): """ Get submodule meta value """ try: module_name = importlib.import_module( f"dusty.scanners.{module}.scanner" ).Scanner.get_name() if module_name in self.context.scanners: return self.context.scanners[module_name].get_meta(name, default) return default except: return default def set_module_meta(self, module, name, value): """ Set submodule meta value """ try: module_name = importlib.import_module( f"dusty.scanners.{module}.scanner" ).Scanner.get_name() if module_name in self.context.scanners: self.context.scanners[module_name].set_meta(name, value) except: pass def schedule_scanner(self, scanner_type, scanner_name, scanner_config): """ Schedule scanner run in current context after all already configured scanners """ try: # Init scanner instance scanner = importlib.import_module( f"dusty.scanners.{scanner_type}.{scanner_name}.scanner" ).Scanner if scanner.get_name() in self.context.scanners: log.debug("Scanner %s.%s already scheduled", scanner_type, scanner_name) return # Prepare config config = self.context.config["scanners"] if scanner_type not in config: config[scanner_type] = dict() if scanner_name not in config[scanner_type] or \ not isinstance(config[scanner_type][scanner_name], dict): config[scanner_type][scanner_name] = dict() general_config = dict() if "settings" in self.context.config: general_config = self.context.config["settings"] if scanner_type in general_config: merged_config = general_config[scanner_type].copy() merged_config.update(config[scanner_type][scanner_name]) config[scanner_type][scanner_name] = merged_config config[scanner_type][scanner_name].update(scanner_config) # Validate config scanner.validate_config(config[scanner_type][scanner_name]) # Add to context scanner = scanner(self.context) self.context.scanners[scanner.get_name()] = scanner # Resolve depencies dependency.resolve_depencies(self.context.scanners) # Prepare scanner scanner.prepare() # Done log.debug("Scheduled scanner %s.%s", scanner_type, scanner_name) except: log.exception( "Failed to schedule %s scanner %s", scanner_type, scanner_name ) error = Error( tool=f"{scanner_type}.{scanner_name}", error=f"Failed to schedule {scanner_type} scanner {scanner_name}", details=f"```\n{traceback.format_exc()}\n```" ) self.context.errors.append(error) @staticmethod def fill_config(data_obj): """ Make sample config """ data_obj.insert(len(data_obj), "scanners", CommentedMap(), comment="Scanners config") scanner_obj = data_obj["scanners"] scanners_module = importlib.import_module("dusty.scanners") for _, name, pkg in pkgutil.iter_modules(scanners_module.__path__): if not pkg: continue # general_scanner_obj = data_obj["settings"][name] # This can also be used scanner_type = importlib.import_module("dusty.scanners.{}".format(name)) scanner_obj.insert(len(scanner_obj), name, CommentedMap()) inner_obj = scanner_obj[name] for _, inner_name, inner_pkg in pkgutil.iter_modules(scanner_type.__path__): if not inner_pkg: continue try: scanner = importlib.import_module( "dusty.scanners.{}.{}.scanner".format(name, inner_name) ) inner_obj.insert( len(inner_obj), inner_name, CommentedMap(), comment=scanner.Scanner.get_description() ) scanner.Scanner.fill_config(inner_obj[inner_name]) except: pass # Skip scanner, it may be DAST scanner (in SAST image) or vice versa @staticmethod def validate_config(config): """ Validate config """ if "scanners" not in config: log.warning("No scanners defined in config") config["scanners"] = dict() @staticmethod def get_name(): """ Module name """ return "scanning" @staticmethod def get_description(): """ Module description or help message """ return "performs scanning"