dusty/scanners/dast/zap/parser.py (88 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011,E0401,W0702,W0703,R0902,R0912
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OWASP ZAP JSON parser
"""
import json
import html
from dusty.tools import log, markdown, url
from dusty.models.finding import DastFinding
from . import constants
def _instance_table_row(instance):
    """ Render one alert instance as a row of the markdown instances table """
    # Column order must match the table header emitted by parse_findings
    cells = [
        html.escape(markdown.markdown_table_escape(instance.get(key, "-")))
        for key in ("uri", "method", "param", "attack", "evidence")
    ]
    return "| {} |".format(" | ".join(cells))


def _collect_endpoints(instances):
    """ Parse instance URIs into a list of unique endpoints (first-seen order) """
    endpoints = []
    for instance in instances:
        uri = instance.get("uri", None)
        if not uri:
            # Instances without an URI cannot be mapped to an endpoint
            continue
        endpoint = url.parse_url(uri)
        # List membership (not a set): endpoint hashability is not known here
        if endpoint not in endpoints:
            endpoints.append(endpoint)
    return endpoints


def parse_findings(data, scanner):
    """
    Parse findings

    Decodes the ZAP JSON report passed in ``data`` and, for every alert of
    every site, appends one or more DastFinding objects to ``scanner.findings``.

    :param data: JSON text of the report (decoded with ``json.loads``);
        expected to contain a "site" list whose items have an "alerts" list
    :param scanner: object providing ``config.get(...)``, ``get_name()``
        and a ``findings`` list to append to

    When ``scanner.config`` has a truthy "split_by_endpoint" value, one finding
    is created per unique endpoint and its instances table lists only the
    instances whose URI equals that endpoint's ``raw`` value. Otherwise (or
    when no endpoint could be collected) a single finding covering all
    instances is created.

    Raises KeyError when a required report key ("site", "alerts", "name",
    "riskcode", "confidence") is absent or when the risk/confidence code is
    not present in the constants tables.
    """
    log.debug("Parsing findings")
    zap_json = json.loads(data)
    for site in zap_json["site"]:
        for alert in site["alerts"]:
            # A missing "instances" key is handled the same way as an empty list
            instances = alert.get("instances") or []
            description = []
            if "desc" in alert:
                description.append(markdown.html_to_text(alert["desc"]))
            if "solution" in alert:
                description.append(
                    f'\n**Solution:**\n {markdown.html_to_text(alert["solution"])}')
            if "reference" in alert:
                description.append(
                    f'\n**Reference:**\n {markdown.html_to_text(alert["reference"])}')
            if "otherinfo" in alert:
                description.append(
                    f'\n**Other information:**\n {markdown.html_to_text(alert["otherinfo"])}')
            if instances:
                description.append("\n**Instances:**\n")
                description.append("| URI | Method | Parameter | Attack | Evidence |")
                description.append("| --- | ------ | --------- | ------ | -------- |")
            # Both output modes need the same unique endpoint list
            endpoints = _collect_endpoints(instances)
            # Metadata shared by every finding created from this alert
            common_meta = {
                "tool": scanner.get_name(),
                "severity": constants.ZAP_SEVERITIES[alert["riskcode"]],
                "confidence": constants.ZAP_CONFIDENCES[alert["confidence"]],
            }
            # Prepare results
            finding_data = []
            if scanner.config.get("split_by_endpoint", False):
                for endpoint in endpoints:
                    # Only instances reported for exactly this endpoint URI
                    rows = [
                        _instance_table_row(item) for item in instances
                        if item.get("uri", None) == endpoint.raw
                    ]
                    finding_data.append({
                        "title": f'{alert["name"]} on {endpoint.raw}',
                        "description": "\n".join(description + rows),
                        "endpoints": [endpoint],
                        **common_meta,
                    })
            # Make one finding object if needed/requested
            if not finding_data:
                rows = [_instance_table_row(item) for item in instances]
                finding_data.append({
                    "title": alert["name"],
                    "description": "\n".join(description + rows),
                    # Endpoints (for backwards compatibility)
                    "endpoints": endpoints,
                    **common_meta,
                })
            # Make finding objects
            for object_data in finding_data:
                finding = DastFinding(
                    title=object_data["title"],
                    description=object_data["description"]
                )
                finding.set_meta("tool", object_data["tool"])
                finding.set_meta("severity", object_data["severity"])
                finding.set_meta("confidence", object_data["confidence"])
                finding.set_meta("endpoints", object_data["endpoints"])
                log.debug(f"Endpoints: {finding.get_meta('endpoints')}")
                scanner.findings.append(finding)