dusty/reporters/performer.py (211 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011,R0903,W0702,W0703
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Reporting performer
"""
import importlib
import traceback
import pkgutil
from ruamel.yaml.comments import CommentedMap # pylint: disable=E0401
from dusty.tools import log
from dusty.tools import dependency
from dusty.models.module import ModuleModel
from dusty.models.performer import PerformerModel
from dusty.models.reporter import ReporterModel
from dusty.models.error import Error
from . import constants
class ReportingPerformer(ModuleModel, PerformerModel, ReporterModel):
""" Perform reporting """
def __init__(self, context):
""" Initialize instance """
super().__init__()
self.context = context
def prepare(self):
""" Prepare for action """
log.debug("Preparing")
config = self.context.config["reporters"]
config_items = [
item for item in list(config) if not isinstance(config[item], bool) or config[item]
]
disabled_items = [
item for item in list(config) if isinstance(config[item], bool) and not config[item]
]
# Schedule reporters
try:
all_reporters = dependency.resolve_name_order(
config_items + [
item for item in constants.DEFAULT_REPORTERS if item not in disabled_items
], "dusty.reporters.{}.reporter", "Reporter"
)
except:
all_reporters = [
item for item in constants.DEFAULT_REPORTERS if item not in disabled_items
] + config_items
for reporter_name in all_reporters:
try:
self.schedule_reporter(reporter_name, dict())
except:
log.exception("Failed to prepare reporter %s", reporter_name)
error = Error(
tool=reporter_name,
error=f"Failed to prepare reporter {reporter_name}",
details=f"```\n{traceback.format_exc()}\n```"
)
self.context.errors.append(error)
# Resolve depencies once again
dependency.resolve_depencies(self.context.reporters)
def perform(self):
""" Perform action """
self.report()
def get_module_meta(self, module, name, default=None):
""" Get submodule meta value """
try:
module_name = importlib.import_module(
f"dusty.reporters.{module}.reporter"
).Reporter.get_name()
if module_name in self.context.reporters:
return self.context.reporters[module_name].get_meta(name, default)
return default
except:
return default
def set_module_meta(self, module, name, value):
""" Set submodule meta value """
try:
module_name = importlib.import_module(
f"dusty.reporters.{module}.reporter"
).Reporter.get_name()
if module_name in self.context.reporters:
self.context.reporters[module_name].set_meta(name, value)
except:
pass
def schedule_reporter(self, reporter_name, reporter_config):
""" Schedule reporter run in current context after all already configured reporters """
try:
# Init reporter instance
reporter = importlib.import_module(
f"dusty.reporters.{reporter_name}.reporter"
).Reporter
if reporter.get_name() in self.context.reporters:
log.debug("Reporter %s already scheduled", reporter_name)
return
# Prepare config
config = self.context.config["reporters"]
if reporter_name not in config or not isinstance(config[reporter_name], dict):
config[reporter_name] = dict()
if "reporters" in self.context.config["settings"]:
general_config = self.context.config["settings"]["reporters"]
merged_config = general_config.copy()
merged_config.update(config[reporter_name])
config[reporter_name] = merged_config
config[reporter_name].update(reporter_config)
# Validate config
reporter.validate_config(config[reporter_name])
# Add to context
self.context.reporters[reporter.get_name()] = reporter(self.context)
# Resolve depencies
dependency.resolve_depencies(self.context.reporters)
# Done
log.debug("Scheduled reporter %s", reporter_name)
except:
log.exception("Failed to schedule reporter %s", reporter_name)
error = Error(
tool=reporter_name,
error=f"Failed to schedule reporter {reporter_name}",
details=f"```\n{traceback.format_exc()}\n```"
)
self.context.errors.append(error)
def report(self):
""" Report """
log.info("Starting reporting")
# Run reporters
performed = set()
perform_report_iteration = True
while perform_report_iteration:
perform_report_iteration = False
for reporter_module_name in list(self.context.reporters):
if reporter_module_name in performed:
continue
performed.add(reporter_module_name)
perform_report_iteration = True
reporter = self.context.reporters[reporter_module_name]
try:
reporter.report()
except:
log.exception("Reporter %s failed", reporter_module_name)
error = Error(
tool=reporter_module_name,
error=f"Reporter {reporter_module_name} failed",
details=f"```\n{traceback.format_exc()}\n```"
)
self.context.errors.append(error)
self.context.errors.extend(reporter.get_errors())
def on_start(self):
""" Called when testing starts """
# Run reporters
for reporter_module_name in self.context.reporters:
reporter = self.context.reporters[reporter_module_name]
try:
reporter.on_start()
except:
log.exception("Reporter %s failed", reporter_module_name)
error = Error(
tool=reporter_module_name,
error=f"Reporter {reporter_module_name} failed",
details=f"```\n{traceback.format_exc()}\n```"
)
self.context.errors.append(error)
def on_finish(self):
""" Called when testing ends """
# Run reporters
for reporter_module_name in self.context.reporters:
reporter = self.context.reporters[reporter_module_name]
try:
reporter.on_finish()
except:
log.exception("Reporter %s failed", reporter_module_name)
error = Error(
tool=reporter_module_name,
error=f"Reporter {reporter_module_name} failed",
details=f"```\n{traceback.format_exc()}\n```"
)
self.context.errors.append(error)
def on_scanner_start(self, scanner):
""" Called when scanner starts """
# Run reporters
for reporter_module_name in self.context.reporters:
reporter = self.context.reporters[reporter_module_name]
try:
reporter.on_scanner_start(scanner)
except:
log.exception("Reporter %s failed", reporter_module_name)
error = Error(
tool=reporter_module_name,
error=f"Reporter {reporter_module_name} failed",
details=f"```\n{traceback.format_exc()}\n```"
)
self.context.errors.append(error)
def on_scanner_finish(self, scanner):
""" Called when scanner ends """
# Run reporters
for reporter_module_name in self.context.reporters:
reporter = self.context.reporters[reporter_module_name]
try:
reporter.on_scanner_finish(scanner)
except:
log.exception("Reporter %s failed", reporter_module_name)
error = Error(
tool=reporter_module_name,
error=f"Reporter {reporter_module_name} failed",
details=f"```\n{traceback.format_exc()}\n```"
)
self.context.errors.append(error)
def flush(self):
""" Flush """
# Run reporters
for reporter_module_name in self.context.reporters:
reporter = self.context.reporters[reporter_module_name]
try:
reporter.flush()
except:
log.exception("Reporter %s failed", reporter_module_name)
error = Error(
tool=reporter_module_name,
error=f"Reporter {reporter_module_name} failed",
details=f"```\n{traceback.format_exc()}\n```"
)
self.context.errors.append(error)
@staticmethod
def fill_config(data_obj):
""" Make sample config """
# general_obj = data_obj["settings"]["reporters"] # This can also be used
data_obj.insert(len(data_obj), "reporters", CommentedMap(), comment="Reporters config")
reporters_obj = data_obj["reporters"]
reporters_module = importlib.import_module("dusty.reporters")
for _, name, pkg in pkgutil.iter_modules(reporters_module.__path__):
if not pkg:
continue
reporter = importlib.import_module(
"dusty.reporters.{}.reporter".format(name)
)
reporters_obj.insert(
len(reporters_obj), name, CommentedMap(),
comment=reporter.Reporter.get_description()
)
reporter.Reporter.fill_config(reporters_obj[name])
@staticmethod
def validate_config(config):
""" Validate config """
if "reporters" not in config:
log.warning("No reporters defined in config")
config["reporters"] = dict()
@staticmethod
def get_name():
""" Module name """
return "reporting"
@staticmethod
def get_description():
""" Module description or help message """
return "performs result reporting"