#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011,W1401,E0401,R0914,R0915,R0912
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
w3af XML parser
"""
import base64
import hashlib
from urllib.parse import urlparse
from lxml import etree
from dusty.tools import log, url, markdown
from dusty.models.finding import DastFinding
from . import constants


def parse_findings(output_file, scanner):
""" Parse findings (code from dusty 1.0) """
log.debug("Parsing findings")
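    # resolve_entities=False prevents XML entity expansion (XXE), huge_tree=True
    # lifts lxml's tree size limits so large w3af reports can be parsed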
parser = etree.XMLParser(resolve_entities=False, huge_tree=True)
w3scan = etree.parse(output_file, parser)
root = w3scan.getroot()
dupes = dict()
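    # Findings are de-duplicated by name + trimmed URL + HTTP method + response class (see dupe_key below)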
for vulnerability in root.findall("vulnerability"):
name = vulnerability.attrib["name"]
severity = constants.W3AF_SEVERITIES[vulnerability.attrib["severity"]]
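        # Keep only the generic part of the description (text before the "are:" URL list);
        # the affected URLs are re-added per finding below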
description = "%s are:\n\n" % vulnerability.find("description").text.split("are:")[0]
transactions = vulnerability.find("http-transactions")
if transactions is not None:
transactions = transactions.findall("http-transaction")
for transaction in transactions:
request = transaction.find("http-request")
response = transaction.find("http-response")
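                # The request <status> element holds the request line ("<method> <url> ..."),
                # the response <status> element holds the status line ("<version> <code> <reason>")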
                request_line = request.find("status").text.split(" ")
                response_code = response.find("status").text.split(" ")[1]
                http_method = request_line[0]
                request_url = request_line[1]
data = ""
for part in [request, response]:
                    headers = [
                        f"{h.attrib['field']} -> {h.attrib['content']}"
                        for h in part.find("headers").findall("header")
                    ]
                    headers = "\n".join(headers)
                    body = part.find("body")
                    if body.attrib["content-encoding"] == "base64":
                        if body.text:
                            body = base64.b64decode(
                                body.text
                            ).decode("utf-8", errors="ignore")
                        else:
                            body = ""
                    else:
                        body = body.text if body.text else ""
                    if not data:
                        data = f"Request: {request_url} {http_method} {response_code} \n\n"
                    else:
                        data += "Response: \n"
                    data += f"Headers: {headers}\n\nBody:{body}\n\n"
dupe_url = urlparse(request_url)
# Creating dupe path: need to think on more intelligent implementation
dupe_path = dupe_url.path[:dupe_url.path.index("%")] \
if "%" in dupe_url.path else dupe_url.path
dupe_path = dupe_path[:dupe_path.index("+")] if "+" in dupe_path else dupe_path
dupe_path = dupe_path[:dupe_path.index(".")] if "." in dupe_path else dupe_path
dupe_path = dupe_path[:dupe_path.rindex("/")] if "/" in dupe_path else dupe_path
dupe_url = f"{dupe_url.scheme}://{dupe_url.netloc}{dupe_path}"
dupe_code = f"{str(response_code)[0]}xx"
dupe_key = hashlib.md5(
f"{name} {dupe_url} {http_method} {dupe_code}".encode("utf-8")
).hexdigest()
# Create finding data dictionary
if dupe_key not in dupes:
dupes[dupe_key] = {
"title": f"{name} {dupe_url} {dupe_code}",
"description": description,
"severity": severity,
"references": data,
"endpoints": list()
}
elif data not in dupes[dupe_key]["references"]:
dupes[dupe_key]["references"] += data
if request_url not in dupes[dupe_key]["endpoints"]:
dupes[dupe_key]["description"] += f"- {request_url}\n\n"
dupes[dupe_key]["endpoints"].append(request_url)
# Create finding objects
for item in dupes.values():
description = f"{markdown.markdown_escape(item['description'])}\n\n"
description += f"**References:**\n {markdown.markdown_escape(item['references'])}\n\n"
# Make finding object
finding = DastFinding(
title=item["title"],
description=description
)
finding.set_meta("tool", scanner.get_name())
finding.set_meta("severity", item["severity"])
# Endpoints (for backwards compatibility)
endpoints = list()
for entry in item["endpoints"]:
endpoint = url.parse_url(entry)
if endpoint in endpoints:
continue
endpoints.append(endpoint)
finding.set_meta("endpoints", endpoints)
log.debug(f"Endpoints: {finding.get_meta('endpoints')}")
# Done
scanner.findings.append(finding)