in dusty/reporters/jira/reporter.py [0:0]
def report(self):
    """ Report findings to Jira

    Builds a JiraWrapper from the reporter config, converts every reportable
    finding from self.context.findings into a ticket payload and submits the
    payloads one by one via wrapper.create_issue().

    Findings flagged in meta as "information_finding", "false_positive_finding"
    or "excluded_finding" are skipped, as are findings that are neither
    DastFinding nor SastFinding.

    Results are stored in reporter meta:
      - "new_tickets": metadata of tickets created by this run
      - "existing_tickets": metadata of already existing tickets whose status
        is listed in constants.JIRA_OPENED_STATUSES
      - "mapping": priority mapping that was actually applied

    A failure to submit a single finding is logged and appended to self.errors;
    the remaining findings are still processed.

    Raises:
        RuntimeError: if the wrapper reports the Jira configuration as invalid.
    """
    # Prepare wrapper
    log.info("Creating legacy wrapper instance")
    wrapper = JiraWrapper(
        self.config.get("url"),
        self.config.get("username"),
        self.config.get("password"),
        self.config.get("project"),
        self.config.get("fields")
    )
    if not wrapper.valid:
        # Save default mapping to meta as a fallback.
        # Work on a copy: updating constants.JIRA_SEVERITY_MAPPING in place would
        # leak custom priorities into every later lookup of the shared constant.
        default_mapping = dict(constants.JIRA_SEVERITY_MAPPING)
        default_mapping.update(self.config.get("custom_mapping") or dict())
        self.set_meta("mapping", default_mapping)
        # Report error
        log.error("Jira configuration is invalid. Skipping Jira reporting")
        raise RuntimeError("Jira configuration is invalid")
    log.debug("Legacy wrapper is valid")
    # Prepare findings
    # NOTE(review): prepare_jira_mapping() is evaluated even when "custom_mapping"
    # is configured; kept as-is because its side effects are not visible here.
    priority_mapping = self.config.get("custom_mapping", prepare_jira_mapping(wrapper))
    # "custom_mapping" may be present but empty/None, so guard the copy
    mapping_meta = dict(priority_mapping or dict())

    def resolve_priority(finding):
        """ Return (severity, priority) for a finding and record the pair in mapping_meta """
        severity = finding.get_meta("severity", SEVERITIES[-1])
        priority = constants.JIRA_SEVERITY_MAPPING[severity]
        if priority_mapping and priority in priority_mapping:
            priority = priority_mapping[priority]
        mapping_meta[severity] = priority  # Update meta mapping to reflect actual results
        return severity, priority

    def build_labels(finding, severity, default_testing_type):
        """ Return tool / testing type / severity labels with spaces replaced by underscores """
        return [
            label.replace(" ", "_") for label in [
                finding.get_meta("tool", "scanner"),
                self.context.get_meta("testing_type", default_testing_type),
                severity
            ]
        ]

    findings = list()
    for item in self.context.findings:
        if item.get_meta("information_finding", False) or \
                item.get_meta("false_positive_finding", False) or \
                item.get_meta("excluded_finding", False):
            continue
        if isinstance(item, DastFinding):
            severity, priority = resolve_priority(item)
            findings.append({
                "title": item.title,
                "priority": priority,
                "description": item.description.replace("\\.", "."),
                "issue_hash": item.get_meta("issue_hash", "<no_hash>"),
                "additional_labels": build_labels(item, severity, "DAST"),
                "raw": item
            })
        elif isinstance(item, SastFinding):
            severity, priority = resolve_priority(item)
            # The description is iterated chunk by chunk; HTML-like markup in
            # each chunk is converted to Jira wiki markup
            description_chunks = [
                chunk.replace(
                    "\\.", "."
                ).replace(
                    "<pre>", "{code:collapse=true}\n\n"
                ).replace(
                    "</pre>", "\n\n{code}"
                ).replace(
                    "<br />", "\n"
                ) for chunk in item.description
            ]
            full_description = "\n\n".join(description_chunks)
            comments = list()
            if len(full_description) > constants.JIRA_DESCRIPTION_MAX_SIZE:
                # Too long for one description: keep the first chunk as the
                # description and pack the remaining chunks into comments
                description = description_chunks[0]
                new_line_str = ' \n \n'
                for chunk in description_chunks[1:]:
                    if not comments or (len(comments[-1]) + len(new_line_str) + len(chunk)) >= \
                            constants.JIRA_COMMENT_MAX_SIZE:
                        comments.append(cut_jira_comment(chunk))
                    else:  # Last comment can handle one more chunk
                        comments[-1] += new_line_str + cut_jira_comment(chunk)
            else:
                description = full_description
            findings.append({
                "title": item.title,
                "priority": priority,
                "description": description,
                "issue_hash": item.get_meta("issue_hash", "<no_hash>"),
                "additional_labels": build_labels(item, severity, "SAST"),
                "comments": comments,
                "raw": item
            })
        else:
            log.warning("Unsupported finding type")
            continue  # raise ValueError("Unsupported item type")
    # Order tickets by severity index in SEVERITIES, then by tool, then by title
    findings.sort(key=lambda entry: (
        SEVERITIES.index(entry["raw"].get_meta("severity", SEVERITIES[-1])),
        entry["raw"].get_meta("tool", ""),
        entry["raw"].title
    ))
    # Submit issues
    wrapper.connect()
    new_tickets = list()
    existing_tickets = list()
    for finding in findings:
        try:
            issue, created = wrapper.create_issue(
                finding["title"],  # title
                finding["priority"],  # priority
                finding["description"],  # description
                finding["issue_hash"],  # issue_hash, self.get_hash_code()
                # attachments=None,
                # get_or_create=True,
                additional_labels=finding["additional_labels"]  # additional_labels
            )
            # Comments are only added to freshly created issues
            if created and "comments" in finding:
                for comment in finding["comments"]:
                    wrapper.add_comment_to_issue(issue, comment)
            # Reading the priority field may fail; fall back to a placeholder
            try:
                result_priority = issue.fields.priority
            except Exception:  # pylint: disable=W0703
                result_priority = "Default"
            ticket_meta = {
                "jira_id": issue.key,
                "jira_url": f"{self.config.get('url')}/browse/{issue.key}",
                "priority": result_priority,
                "status": issue.fields.status.name,
                "created": issue.fields.created,
                "open_date": datetime.strptime(
                    issue.fields.created, "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%d %b %Y %H:%M"),
                "description": issue.fields.summary,
                "assignee": issue.fields.assignee
            }
            if created:
                new_tickets.append(ticket_meta)
            elif issue.fields.status.name in constants.JIRA_OPENED_STATUSES:
                existing_tickets.append(ticket_meta)
        except Exception:  # pylint: disable=W0703
            # Best effort: record the failure and continue with the next finding.
            # KeyboardInterrupt / SystemExit are intentionally not swallowed.
            log.exception(f"Failed to create ticket for {finding['title']}")
            error = Error(
                tool=self.get_name(),
                error=f"Failed to create ticket for {finding['title']}",
                details=f"```\n{traceback.format_exc()}\n```"
            )
            self.errors.append(error)
    self.set_meta("new_tickets", new_tickets)
    self.set_meta("existing_tickets", existing_tickets)
    self.set_meta("mapping", mapping_meta)