dusty/processors/quality_gate/processor.py (97 lines of code) (raw):

#!/usr/bin/python3 # coding=utf-8 # Copyright 2020 getcarrier.io # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Processor: quality_gate """ from ruamel.yaml.comments import CommentedMap # pylint: disable=E0401 from dusty.tools import log from dusty.models.module import DependentModuleModel from dusty.models.processor import ProcessorModel from dusty.constants import SEVERITIES class Processor(DependentModuleModel, ProcessorModel): """ Process quality gate """ def __init__(self, context): """ Initialize processor instance """ super().__init__() self.context = context self.config = \ self.context.config["processing"][__name__.split(".")[-2]] def execute(self): """ Run the processor """ log.info("Checking quality gate status") thresholds = self.config.get("thresholds", dict()) # Count issues by severity results_by_severity = dict() for item in self.context.findings: if item.get_meta("information_finding", False) or \ item.get_meta("false_positive_finding", False) or \ item.get_meta("excluded_finding", False): continue severity = item.get_meta("severity", SEVERITIES[-1]) if severity not in results_by_severity: results_by_severity[severity] = 0 results_by_severity[severity] += 1 # Prepare stats data stats_data = dict() for severity in SEVERITIES: stats_data["total"] = "OK" stats_data[severity] = { "findings": results_by_severity.get(severity, "-"), "threshold": thresholds.get(severity, "-"), "status": "OK" } # Check quality gate for severity in results_by_severity: if severity not in thresholds: continue # severity_results = results_by_severity[severity] policy_threshold = thresholds[severity] # if severity_results > policy_threshold: log.warning( "Quality gate failed: %s -> %d > %d", severity, severity_results, policy_threshold ) self.context.set_meta("fail_quality_gate", True) stats_data[severity]["status"] = "FAIL" stats_data["total"] = "FAIL" # Prepare stats stats = list() stats.append("============= Quality gate stats ============") stats.append("Severity : {:<9} {:<5} {:<7} {:<4} {:<4}".format( *SEVERITIES )) stats.append("Findings : {:<9} {:<5} {:<7} {:<4} {:<4}".format( *[stats_data[severity]["findings"] for severity in SEVERITIES] )) stats.append("Threshold : {:<9} {:<5} {:<7} {:<4} {:<4}".format( *[stats_data[severity]["threshold"] for severity in SEVERITIES] )) stats.append("Status : {:<9} {:<5} {:<7} {:<4} {:<4}".format( *[stats_data[severity]["status"] for severity in SEVERITIES] )) stats.append("============= Quality gate: {:<4} ============".format(stats_data["total"])) self.context.set_meta("quality_gate_stats", stats) @staticmethod def fill_config(data_obj): """ Make sample config """ data_obj.insert( len(data_obj), "thresholds", CommentedMap(), comment="Quality gate thresholds by severity" ) mapping_obj = data_obj["thresholds"] mapping_obj.insert( len(mapping_obj), "Critical", 3 ) mapping_obj.insert( len(mapping_obj), "High", 5 ) mapping_obj.insert( len(mapping_obj), "Medium", 7 ) mapping_obj.insert( len(mapping_obj), "Low", 9 ) mapping_obj.insert( len(mapping_obj), "Info", 11 ) @staticmethod def run_after(): """ Return optional depencies """ return ["exclude_by_endpoint", "false_positive", "min_severity_filter"] @staticmethod def get_name(): """ Module name """ return "Quality gate" @staticmethod def get_description(): """ Module description """ return "Set and check quality gate for CI/CD process"