dusty/reporters/jira/reporter.py (240 lines of code) (raw):

#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011,E0401,R0914,R0912,R0915

#   Copyright 2019 getcarrier.io
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

"""
    Reporter: jira
"""

import traceback
from datetime import datetime

from ruamel.yaml.comments import CommentedSeq
from ruamel.yaml.comments import CommentedMap

from dusty.tools import log
from dusty.models.module import DependentModuleModel
from dusty.models.reporter import ReporterModel
from dusty.models.finding import DastFinding, SastFinding
from dusty.models.error import Error
from dusty.constants import SEVERITIES

from . import constants
from .legacy import JiraWrapper, prepare_jira_mapping, cut_jira_comment


class Reporter(DependentModuleModel, ReporterModel):
    """ Report findings from scanners """

    def __init__(self, context):
        """
        Initialize reporter instance

        The reporter config is read from context.config["reporters"] using
        the name of the package this module lives in as the key.
        """
        super().__init__()
        self.context = context
        self.config = \
            self.context.config["reporters"][__name__.split(".")[-2]]

    def report(self):
        """
        Report findings to Jira

        Builds a ticket payload for every DAST/SAST finding that is not
        marked as information, false positive or excluded, submits the
        payloads via the legacy JiraWrapper and stores the results in meta
        under "new_tickets", "existing_tickets" and "mapping".

        Per-ticket failures are logged and appended to self.errors; they do
        not abort the remaining submissions.

        Raises RuntimeError when the wrapper reports an invalid configuration.
        """
        # Prepare wrapper
        log.info("Creating legacy wrapper instance")
        wrapper = JiraWrapper(
            self.config.get("url"),
            self.config.get("username"),
            self.config.get("password"),
            self.config.get("project"),
            self.config.get("fields")
        )
        if not wrapper.valid:
            # Save default mapping to meta as a fallback.
            # Work on a copy: updating constants.JIRA_SEVERITY_MAPPING in place
            # would leak custom priorities into every later lookup.
            default_mapping = dict(constants.JIRA_SEVERITY_MAPPING)
            default_mapping.update(self.config.get("custom_mapping") or dict())
            self.set_meta("mapping", default_mapping)
            # Report error
            log.error("Jira configuration is invalid. Skipping Jira reporting")
            raise RuntimeError("Jira configuration is invalid")
        log.debug("Legacy wrapper is valid")
        # Prepare findings
        # NOTE: dict.get evaluates its default eagerly, so prepare_jira_mapping
        # is called even when custom_mapping is configured.
        priority_mapping = self.config.get("custom_mapping", prepare_jira_mapping(wrapper))
        # An empty/None mapping means "no overrides" instead of a TypeError
        mapping_meta = dict(priority_mapping) if priority_mapping else dict()
        findings = list()
        for item in self.context.findings:
            if item.get_meta("information_finding", False) or \
                    item.get_meta("false_positive_finding", False) or \
                    item.get_meta("excluded_finding", False):
                continue
            if isinstance(item, DastFinding):
                priority = self._resolve_priority(item, priority_mapping, mapping_meta)
                findings.append({
                    "title": item.title,
                    "priority": priority,
                    "description": item.description.replace("\\.", "."),
                    "issue_hash": item.get_meta("issue_hash", "<no_hash>"),
                    "additional_labels": self._make_labels(item, "DAST"),
                    "raw": item
                })
            elif isinstance(item, SastFinding):
                priority = self._resolve_priority(item, priority_mapping, mapping_meta)
                description, comments = self._split_sast_description(item.description)
                findings.append({
                    "title": item.title,
                    "priority": priority,
                    "description": description,
                    "issue_hash": item.get_meta("issue_hash", "<no_hash>"),
                    "additional_labels": self._make_labels(item, "SAST"),
                    "comments": comments,
                    "raw": item
                })
            else:
                log.warning("Unsupported finding type")
                continue
        # Order by severity (position in SEVERITIES), then tool, then title
        findings.sort(key=lambda entry: (
            SEVERITIES.index(entry["raw"].get_meta("severity", SEVERITIES[-1])),
            entry["raw"].get_meta("tool", ""),
            entry["raw"].title
        ))
        # Submit issues
        wrapper.connect()
        new_tickets = list()
        existing_tickets = list()
        for finding in findings:
            try:
                issue, created = wrapper.create_issue(
                    finding["title"],  # title
                    finding["priority"],  # priority
                    finding["description"],  # description
                    finding["issue_hash"],  # issue_hash
                    additional_labels=finding["additional_labels"]  # additional_labels
                )
                # Overflow chunks are only attached to freshly created tickets
                if created and "comments" in finding:
                    for comment in finding["comments"]:
                        wrapper.add_comment_to_issue(issue, comment)
                try:
                    result_priority = issue.fields.priority
                except Exception:  # pylint: disable=W0703
                    result_priority = "Default"
                ticket_meta = {
                    "jira_id": issue.key,
                    "jira_url": f"{self.config.get('url')}/browse/{issue.key}",
                    "priority": result_priority,
                    "status": issue.fields.status.name,
                    "created": issue.fields.created,
                    "open_date": datetime.strptime(
                        issue.fields.created, "%Y-%m-%dT%H:%M:%S.%f%z"
                    ).strftime("%d %b %Y %H:%M"),
                    "description": issue.fields.summary,
                    "assignee": issue.fields.assignee
                }
                if created:
                    new_tickets.append(ticket_meta)
                elif issue.fields.status.name in constants.JIRA_OPENED_STATUSES:
                    existing_tickets.append(ticket_meta)
            except Exception:  # pylint: disable=W0703
                # Catch Exception (not a bare except) so that KeyboardInterrupt
                # and SystemExit still stop the run instead of being logged
                # as a ticket failure.
                log.exception(f"Failed to create ticket for {finding['title']}")
                error = Error(
                    tool=self.get_name(),
                    error=f"Failed to create ticket for {finding['title']}",
                    details=f"```\n{traceback.format_exc()}\n```"
                )
                self.errors.append(error)
        self.set_meta("new_tickets", new_tickets)
        self.set_meta("existing_tickets", existing_tickets)
        self.set_meta("mapping", mapping_meta)

    @staticmethod
    def _resolve_priority(item, priority_mapping, mapping_meta):
        """
        Resolve Jira priority for a finding

        Looks up the default priority for the finding severity in
        constants.JIRA_SEVERITY_MAPPING, replaces it with the value from
        priority_mapping when that priority is a key there, and records the
        severity -> priority pair in mapping_meta so meta reflects the
        priorities actually used.
        """
        severity = item.get_meta("severity", SEVERITIES[-1])
        priority = constants.JIRA_SEVERITY_MAPPING[severity]
        if priority_mapping and priority in priority_mapping:
            priority = priority_mapping[priority]
        mapping_meta[severity] = priority
        return priority

    def _make_labels(self, item, default_testing_type):
        """
        Make additional ticket labels for a finding

        Labels are the tool name, the testing type from context meta (or
        default_testing_type when not set) and the severity, with spaces
        replaced by underscores.
        """
        raw_labels = [
            item.get_meta("tool", "scanner"),
            self.context.get_meta("testing_type", default_testing_type),
            item.get_meta("severity", SEVERITIES[-1])
        ]
        return [label.replace(" ", "_") for label in raw_labels]

    @staticmethod
    def _split_sast_description(raw_chunks):
        """
        Convert SAST description chunks to Jira markup

        Returns a (description, comments) tuple. When the joined text fits
        into constants.JIRA_DESCRIPTION_MAX_SIZE everything goes into the
        description and comments is empty. Otherwise the first chunk becomes
        the description and the remaining chunks are packed into comments
        limited by constants.JIRA_COMMENT_MAX_SIZE.
        """
        description_chunks = [
            chunk.replace(
                "\\.", "."
            ).replace(
                "<pre>", "{code:collapse=true}\n\n"
            ).replace(
                "</pre>", "\n\n{code}"
            ).replace(
                "<br />", "\n"
            ) for chunk in raw_chunks
        ]
        joined = "\n\n".join(description_chunks)
        if len(joined) <= constants.JIRA_DESCRIPTION_MAX_SIZE:
            return joined, list()
        comments = list()
        new_line_str = ' \n \n'
        for chunk in description_chunks[1:]:
            fits_last_comment = comments and (
                len(comments[-1]) + len(new_line_str) + len(chunk)
            ) < constants.JIRA_COMMENT_MAX_SIZE
            if fits_last_comment:
                # Last comment can handle one more chunk
                comments[-1] += new_line_str + cut_jira_comment(chunk)
            else:
                comments.append(cut_jira_comment(chunk))
        return description_chunks[0], comments

    @staticmethod
    def fill_config(data_obj):
        """
        Make sample config

        Appends sample connection options, ticket fields and a custom
        priority mapping to data_obj (keys are inserted at the end, in order).
        """
        for key, value, comment in [
                ("url", "https://jira.example.com", "Jira URL"),
                ("username", "some_username", "Jira login"),
                ("password", "SomeSecurePassword", "Jira password"),
                ("project", "SOME-PROJECT", "Jira project"),
        ]:
            data_obj.insert(len(data_obj), key, value, comment=comment)
        data_obj.insert(
            len(data_obj), "fields", CommentedMap(),
            comment="Fields for created tickets"
        )
        fields_obj = data_obj["fields"]
        for key, value, comment in [
                ("Issue Type", "Bug", "(field) Ticket type"),
                ("Assignee", "Ticket_Assignee", "(field) Assignee"),
                ("Epic Link", "SOMEPROJECT-1234", "(field) Epic"),
                ("Security Level", "SOME_LEVEL", "(field) Security level"),
        ]:
            fields_obj.insert(len(fields_obj), key, value, comment=comment)
        # NOTE(review): key is "Components/s" while the comment says
        # "Component/s" - confirm which name the Jira wrapper expects
        fields_obj.insert(
            len(fields_obj), "Components/s", CommentedSeq(),
            comment="(field) Component/s"
        )
        components_obj = fields_obj["Components/s"]
        component_obj = CommentedMap()
        component_obj.insert(len(component_obj), "name", "Component Name")
        components_obj.append(component_obj)
        data_obj.insert(
            len(data_obj), "custom_mapping", CommentedMap(),
            comment="Custom priority mapping"
        )
        mapping_obj = data_obj["custom_mapping"]
        for key, value in [
                ("Critical", "Very High"),
                ("Major", "High"),
                ("Medium", "Medium"),
                ("Minor", "Low"),
                ("Trivial", "Low"),
        ]:
            mapping_obj.insert(len(mapping_obj), key, value)

    @staticmethod
    def validate_config(config):
        """
        Validate config

        Raises ValueError listing every required option (url, username,
        password, project) that is missing from config.
        """
        required = ["url", "username", "password", "project"]
        not_set = [item for item in required if item not in config]
        if not_set:
            error = f"Required configuration options not set: {', '.join(not_set)}"
            log.error(error)
            raise ValueError(error)

    @staticmethod
    def get_name():
        """ Reporter name """
        return "Jira"

    @staticmethod
    def get_description():
        """ Reporter description """
        return "Jira reporter"