dusty/processors/performer.py

#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011,R0903,W0702,W0703

#   Copyright 2019 getcarrier.io
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

"""
    Processing performer
"""

import importlib
import traceback
import pkgutil

from ruamel.yaml.comments import CommentedMap  # pylint: disable=E0401

from dusty.tools import log
from dusty.tools import dependency
from dusty.models.module import ModuleModel
from dusty.models.performer import PerformerModel
from dusty.models.error import Error

from . import constants


class ProcessingPerformer(ModuleModel, PerformerModel):
    """ Process findings """

    def __init__(self, context):
        """ Initialize instance """
        super().__init__()
        self.context = context

    def prepare(self):
        """ Prepare for action """
        log.debug("Preparing")
        config = self.context.config.get("processing")
        config_items = [
            item for item in list(config)
            if not isinstance(config[item], bool) or config[item]
        ]
        disabled_items = [
            item for item in list(config)
            if isinstance(config[item], bool) and not config[item]
        ]
        # Schedule processors
        try:
            all_processors = dependency.resolve_name_order(
                config_items + [
                    item for item in constants.DEFAULT_PROCESSORS
                    if item not in disabled_items
                ], "dusty.processors.{}.processor", "Processor"
            )
        except:
            all_processors = [
                item for item in constants.DEFAULT_PROCESSORS
                if item not in disabled_items
            ] + config_items
        for processor_name in all_processors:
            try:
                self.schedule_processor(processor_name, dict())
            except:
                log.exception("Failed to prepare processor %s", processor_name)
                error = Error(
                    tool=processor_name,
                    error=f"Failed to prepare processor {processor_name}",
                    details=f"```\n{traceback.format_exc()}\n```"
                )
                self.context.errors.append(error)
        # Resolve dependencies once again
        dependency.resolve_depencies(self.context.processors)

    def perform(self):
        """ Perform action """
        log.info("Starting processing")
        # Run processors
        performed = set()
        perform_processing_iteration = True
        while perform_processing_iteration:
            perform_processing_iteration = False
            for processor_module_name in list(self.context.processors):
                if processor_module_name in performed:
                    continue
                performed.add(processor_module_name)
                perform_processing_iteration = True
                processor = self.context.processors[processor_module_name]
                try:
                    processor.execute()
                except:
                    log.exception("Processor %s failed", processor_module_name)
                    error = Error(
                        tool=processor_module_name,
                        error=f"Processor {processor_module_name} failed",
                        details=f"```\n{traceback.format_exc()}\n```"
                    )
                    self.context.errors.append(error)
                self.context.errors.extend(processor.get_errors())

    def get_module_meta(self, module, name, default=None):
        """ Get submodule meta value """
        try:
            module_name = importlib.import_module(
                f"dusty.processors.{module}.processor"
            ).Processor.get_name()
            if module_name in self.context.processors:
                return self.context.processors[module_name].get_meta(name, default)
            return default
        except:
            return default

    def set_module_meta(self, module, name, value):
        """ Set submodule meta value """
        try:
            module_name = importlib.import_module(
                f"dusty.processors.{module}.processor"
            ).Processor.get_name()
            if module_name in self.context.processors:
                self.context.processors[module_name].set_meta(name, value)
        except:
            pass

    def schedule_processor(self, processor_name, processor_config):
        """ Schedule processor run in current context after all already configured processors """
        try:
            # Init processor instance
            processor = importlib.import_module(
                f"dusty.processors.{processor_name}.processor"
            ).Processor
            if processor.get_name() in self.context.processors:
                log.debug("Processor %s already scheduled", processor_name)
                return
            # Prepare config
            config = self.context.config["processing"]
            if processor_name not in config or not isinstance(config[processor_name], dict):
                config[processor_name] = dict()
            if "processing" in self.context.config["settings"]:
                general_config = self.context.config["settings"]["processing"]
                merged_config = general_config.copy()
                merged_config.update(config[processor_name])
                config[processor_name] = merged_config
            config[processor_name].update(processor_config)
            # Validate config
            processor.validate_config(config[processor_name])
            # Add to context
            self.context.processors[processor.get_name()] = processor(self.context)
            # Resolve dependencies
            dependency.resolve_depencies(self.context.processors)
            # Done
            log.debug("Scheduled processor %s", processor_name)
        except:
            log.exception("Failed to schedule processor %s", processor_name)
            error = Error(
                tool=processor_name,
                error=f"Failed to schedule processor {processor_name}",
                details=f"```\n{traceback.format_exc()}\n```"
            )
            self.context.errors.append(error)

    @staticmethod
    def fill_config(data_obj):
        """ Make sample config """
        # general_obj = data_obj["settings"]["processing"]  # This can also be used
        data_obj.insert(
            len(data_obj), "processing", CommentedMap(), comment="Processing config"
        )
        processing_obj = data_obj["processing"]
        processing_module = importlib.import_module("dusty.processors")
        for _, name, pkg in pkgutil.iter_modules(processing_module.__path__):
            if not pkg:
                continue
            processor = importlib.import_module(
                "dusty.processors.{}.processor".format(name)
            )
            processing_obj.insert(
                len(processing_obj), name, CommentedMap(),
                comment=processor.Processor.get_description()
            )
            processor.Processor.fill_config(processing_obj[name])

    @staticmethod
    def validate_config(config):
        """ Validate config """
        if "processing" not in config:
            log.warning("No processing defined in config")
            config["processing"] = dict()

    @staticmethod
    def get_name():
        """ Module name """
        return "processing"

    @staticmethod
    def get_description():
        """ Module description or help message """
        return "performs result processing"