dusty/scanners/dast/zap/scanner.py (434 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011,E0401,W0702,W0703,R0902
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Scanner: OWASP ZAP
"""
import os
import re
import sys
import json
import time
import shutil
import base64
import urllib
import tempfile
import traceback
import subprocess
import pkg_resources
from ruamel.yaml.comments import CommentedSeq
from ruamel.yaml.comments import CommentedMap
from zapv2 import ZAPv2
from dusty.tools import log, status, url
from dusty.models.module import DependentModuleModel
from dusty.models.scanner import ScannerModel
from dusty.models.error import Error
from . import constants
from .parser import parse_findings
class Scanner(DependentModuleModel, ScannerModel):
""" Scanner class """
def __init__(self, context):
""" Initialize scanner instance """
super().__init__()
self.context = context
self.config = \
self.context.config["scanners"][__name__.split(".")[-3]][__name__.split(".")[-2]]
self._zap_daemon = None
self._zap_api = None
self._zap_context = None
self._zap_context_name = None
self._zap_user = None
self._scan_policy_name = None
self._scan_policies = None
def execute(self):
""" Run the scanner """
try:
self._start_zap()
if not self._wait_for_zap_start():
log.error("ZAP failed to start")
error = Error(
tool=self.get_name(),
error="ZAP failed to start",
details="ZAP daemon failed to start"
)
self.errors.append(error)
return
log.info("Target: %s", self.config.get("target"))
self._prepare_context()
self._setup_scan_policy()
self._spider()
self._wait_for_passive_scan()
self._ajax_spider()
self._wait_for_passive_scan()
self._active_scan()
self._wait_for_passive_scan()
except:
log.exception("Exception during ZAP scanning")
error = Error(
tool=self.get_name(),
error=f"Exception during ZAP scanning",
details=f"```\n{traceback.format_exc()}\n```"
)
self.errors.append(error)
finally:
try:
# Get report
log.info("Getting ZAP report")
zap_report = self._zap_api.core.jsonreport()
# Parse JSON
log.info("Processing findings")
parse_findings(zap_report, self)
except:
log.exception("Exception during ZAP findings processing")
error = Error(
tool=self.get_name(),
error=f"Exception during ZAP findings processing",
details=f"```\n{traceback.format_exc()}\n```"
)
self.errors.append(error)
self._save_intermediates()
pkg_resources.cleanup_resources()
self._stop_zap()
def _start_zap(self):
""" Start ZAP daemon, create API client """
log.info("Starting ZAP daemon")
bind_host = "127.0.0.1"
if self.config.get("bind_all_interfaces", True):
bind_host = "0.0.0.0"
daemon_out = subprocess.DEVNULL
if self.config.get("daemon_debug", False):
daemon_out = sys.stdout
zap_home_dir = tempfile.mkdtemp()
log.debug("ZAP home directory: %s", zap_home_dir)
self._zap_daemon = subprocess.Popen([
"/usr/bin/java", self.config.get("java_options", "-Xmx499m"),
"-jar", constants.ZAP_PATH,
"-dir", zap_home_dir,
"-daemon", "-port", "8091", "-host", bind_host,
"-config", "api.key=dusty",
"-config", "api.addrs.addr.regex=true",
"-config", "api.addrs.addr.name=.*",
"-config", "ajaxSpider.browserId=htmlunit"
], stdout=daemon_out, stderr=daemon_out)
self._zap_api = ZAPv2(
apikey="dusty",
proxies={
"http": "http://127.0.0.1:8091",
"https": "http://127.0.0.1:8091"
}
)
def _wait_for_zap_start(self):
for _ in range(600):
try:
log.info("Started ZAP %s", self._zap_api.core.version)
return True
except IOError:
time.sleep(1)
return False
def _save_intermediates(self):
if self.config.get("save_intermediates_to", None):
log.info("Saving intermediates")
base = os.path.join(self.config.get("save_intermediates_to"), __name__.split(".")[-2])
try:
# Make directory for artifacts
os.makedirs(base, mode=0o755, exist_ok=True)
# Save session
self._zap_api.core.save_session(os.path.join(base, "zap.session"))
# Save context
self._zap_api.context.export_context(
self._zap_context_name, os.path.join(base, "zap.context")
)
# Copy log
shutil.copyfile(
os.path.join(self._zap_api.core.zap_home_path, "zap.log"),
os.path.join(base, "zap.log")
)
except:
log.exception("Failed to save intermediates")
def _stop_zap(self):
if self._zap_daemon:
log.info("Stopping ZAP daemon")
self._zap_daemon.kill()
self._zap_daemon.wait()
self._zap_daemon = None
def _wait_for_passive_scan(self):
status.wait_for_completion(
lambda: int(self._zap_api.pscan.records_to_scan) > 0,
lambda: int(self._zap_api.pscan.records_to_scan),
"Passive scan queue: %d items"
)
def _prepare_context(self): # pylint: disable=R0912
# Load or create context
if self.config.get("context_file", None):
log.info("Loading context")
# Load context from file
context_data = self._zap_api.context.import_context(self.config.get("context_file"))
self._zap_context_name = self._zap_api.context.context_list[int(context_data) - 1]
self._zap_context = context_data
else:
log.info("Preparing context")
# Create new context
self._zap_context_name = "dusty"
self._zap_context = self._zap_api.context.new_context(self._zap_context_name)
# Add hostname includsion for newly created context
self._zap_api.context.include_in_context(
self._zap_context_name,
f".*{re.escape(url.parse_url(self.config.get('target')).hostname)}.*"
)
# Setup context inclusions and exclusions
for include_regex in self.config.get("include", list()):
self._zap_api.context.include_in_context(self._zap_context_name, include_regex)
# - exclude from context
if self.config.get("exclude_from_context", True):
for exclude_regex in self.config.get("exclude", list()):
self._zap_api.context.exclude_from_context(self._zap_context_name, exclude_regex)
additional_excludes = self.config.get("exclude_from_context", list())
if isinstance(additional_excludes, list):
for exclude_regex in additional_excludes:
self._zap_api.context.exclude_from_context(
self._zap_context_name, exclude_regex
)
# - exclude from spider
if self.config.get("exclude_from_spider", True):
for exclude_regex in self.config.get("exclude", list()):
self._zap_api.spider.exclude_from_scan(exclude_regex)
additional_excludes = self.config.get("exclude_from_spider", list())
if isinstance(additional_excludes, list):
for exclude_regex in additional_excludes:
self._zap_api.spider.exclude_from_scan(exclude_regex)
# - exclude from ascan
if self.config.get("exclude_from_ascan", True):
for exclude_regex in self.config.get("exclude", list()):
self._zap_api.ascan.exclude_from_scan(exclude_regex)
additional_excludes = self.config.get("exclude_from_ascan", list())
if isinstance(additional_excludes, list):
for exclude_regex in additional_excludes:
self._zap_api.ascan.exclude_from_scan(exclude_regex)
# - exclude from proxy
if self.config.get("exclude_from_proxy", True):
for exclude_regex in self.config.get("exclude", list()):
self._zap_api.core.exclude_from_proxy(exclude_regex)
additional_excludes = self.config.get("exclude_from_proxy", list())
if isinstance(additional_excludes, list):
for exclude_regex in additional_excludes:
self._zap_api.core.exclude_from_proxy(exclude_regex)
# Auth script
if self.config.get("auth_script", None):
# Load our authentication script
self._zap_api.script.load(
scriptname="zap-selenium-login.js",
scripttype="authentication",
scriptengine="Oracle Nashorn",
filename=pkg_resources.resource_filename(
"dusty",
f"{'/'.join(__name__.split('.')[1:-1])}/data/zap-selenium-login.js"
),
scriptdescription="Login via selenium script"
)
# Enable use of loaded script with supplied selenium-like script
self._zap_api.authentication.set_authentication_method(
self._zap_context,
"scriptBasedAuthentication",
urllib.parse.urlencode({
"scriptName": "zap-selenium-login.js",
"Target": self.config.get("target"),
"Script": base64.b64encode(
json.dumps(
self.config.get("auth_script")
).encode("utf-8")
).decode("utf-8")
})
)
# Add user to context
self._zap_user = self._zap_api.users.new_user(self._zap_context, "dusty_user")
self._zap_api.users.set_authentication_credentials(
self._zap_context,
self._zap_user,
urllib.parse.urlencode({
"Username": self.config.get("auth_login", ""),
"Password": self.config.get("auth_password", ""),
"type": "UsernamePasswordAuthenticationCredentials"
})
)
# Enable added user
self._zap_api.users.set_user_enabled(self._zap_context, self._zap_user, True)
# Setup auth indicators
if self.config.get("logged_in_indicator", None):
self._zap_api.authentication.set_logged_in_indicator(
self._zap_context, self.config.get("logged_in_indicator")
)
if self.config.get("logged_out_indicator", None):
self._zap_api.authentication.set_logged_out_indicator(
self._zap_context, self.config.get("logged_out_indicator")
)
def _setup_scan_policy(self):
self._scan_policy_name = "Default Policy"
# Use user-provided policy (if any)
if self.config.get("scan_policy_data", None) or self.config.get("scan_policy_from", None):
log.info("Using user-provided scan policy")
# Write to temp file if needed
if self.config.get("scan_policy_data", None):
policy_file_fd, policy_file = tempfile.mkstemp()
os.close(policy_file_fd)
with open(policy_file, "w") as policy:
log.debug("Scan policy data: '%s'", self.config.get("scan_policy_data"))
policy.write(self.config.get("scan_policy_data"))
else:
policy_file = self.config.get("scan_policy_from")
# Load policy into ZAP
default_policies = self._zap_api.ascan.scan_policy_names
log.info("Importing scan policy from %s", policy_file)
self._zap_api.ascan.import_scan_policy(policy_file)
current_policies = self._zap_api.ascan.scan_policy_names
log.debug("Policies after load: %s", current_policies)
# Remove temporary file
if self.config.get("scan_policy_data", None):
os.remove(policy_file)
# Set name
loaded_policy_names = list(set(current_policies) - set(default_policies))
if loaded_policy_names:
self._scan_policy_name = loaded_policy_names[0]
log.info("Scan policy set to '%s'", self._scan_policy_name)
return
# Setup 'simple' scan policy
self._scan_policies = [
item.strip() for item in self.config.get("scan_types", "all").split(",")
]
# Disable globally blacklisted rules
for item in constants.ZAP_BLACKLISTED_RULES:
self._zap_api.ascan.set_scanner_alert_threshold(
id=item,
alertthreshold="OFF",
scanpolicyname=self._scan_policy_name
)
self._zap_api.pscan.set_scanner_alert_threshold(
id=item,
alertthreshold="OFF"
)
if "all" not in self._scan_policies:
# Disable all scanners first
for item in self._zap_api.ascan.scanners(self._scan_policy_name):
self._zap_api.ascan.set_scanner_alert_threshold(
id=item["id"],
alertthreshold="OFF",
scanpolicyname=self._scan_policy_name
)
# Enable scanners from suite
for policy in self._scan_policies:
for item in constants.ZAP_SCAN_POCILICES.get(policy, []):
self._zap_api.ascan.set_scanner_alert_threshold(
id=item,
alertthreshold="DEFAULT",
scanpolicyname=self._scan_policy_name)
def _spider(self):
log.info("Spidering target: %s", self.config.get("target"))
if self.config.get("auth_script", None):
scan_id = self._zap_api.spider.scan_as_user(
self._zap_context, self._zap_user, self.config.get("target"),
recurse=True, subtreeonly=True
)
else:
scan_id = self._zap_api.spider.scan(self.config.get("target"))
status.wait_for_completion(
lambda: int(self._zap_api.spider.status(scan_id)) < 100,
lambda: int(self._zap_api.spider.status(scan_id)),
"Spidering progress: %d%%"
)
def _ajax_spider(self):
log.info("Ajax spidering target: %s", self.config.get("target"))
if self.config.get("auth_script", None):
self._zap_api.ajaxSpider.scan_as_user(
self._zap_context_name, "dusty_user", self.config.get("target"), subtreeonly=True
)
else:
self._zap_api.ajaxSpider.scan(self.config.get("target"))
status.wait_for_completion(
lambda: self._zap_api.ajaxSpider.status == 'running',
lambda: int(self._zap_api.ajaxSpider.number_of_results),
"Ajax spider found: %d URLs"
)
def _active_scan(self):
log.info("Active scan against target %s", self.config.get("target"))
if self.config.get("auth_script", None):
scan_id = self._zap_api.ascan.scan_as_user(
self.config.get("target"), self._zap_context, self._zap_user, recurse=True,
scanpolicyname=self._scan_policy_name
)
else:
scan_id = self._zap_api.ascan.scan(
self.config.get("target"),
scanpolicyname=self._scan_policy_name
)
status.wait_for_completion(
lambda: int(self._zap_api.ascan.status(scan_id)) < 100,
lambda: int(self._zap_api.ascan.status(scan_id)),
"Active scan progress: %d%%"
)
@staticmethod
def fill_config(data_obj):
""" Make sample config """
data_obj.insert(
len(data_obj), "scan_types", "all",
comment="ZAP scan type, supported any combination of: 'all', 'xss', 'sqli'"
)
data_obj.insert(len(data_obj), "target", "http://app:8080", comment="scan target")
data_obj.insert(
len(data_obj), "context_file", "/path/to/zap_context",
comment="(optional) Path to ZAP context file"
)
data_obj.insert(
len(data_obj), "include", ["http://app:8080/path.*"],
comment="(optional) URLs regex to additionally include in scan"
)
data_obj.insert(
len(data_obj), "exclude", ["http://app:8080/logout.*"],
comment="(optional) URLs regex to exclude from scan"
)
data_obj.insert(
len(data_obj), "exclude_from_context", True,
comment="(optional) True/False to add data from exclude option." \
"Or URLs regex list to exclude from context"
)
data_obj.insert(
len(data_obj), "exclude_from_spider", True,
comment="(optional) True/False to add data from exclude option." \
"Or URLs regex list to exclude from spider"
)
data_obj.insert(
len(data_obj), "exclude_from_ascan", True,
comment="(optional) True/False to add data from exclude option." \
"Or URLs regex list to exclude from active scan"
)
data_obj.insert(
len(data_obj), "exclude_from_proxy", True,
comment="(optional) True/False to add data from exclude option." \
"Or URLs regex list to exclude from proxy"
)
data_obj.insert(
len(data_obj), "logged_in_indicator", "Logout",
comment="(optional) Response regex that is always present for authenticated user"
)
data_obj.insert(
len(data_obj), "logged_out_indicator", "Register a new account",
comment="(optional) Response regex that is present for unauthenticated user"
)
data_obj.insert(
len(data_obj), "auth_login", "user",
comment="(optional) User login for authenticated scan"
)
data_obj.insert(
len(data_obj), "auth_password", "P@ssw0rd",
comment="(optional) User password for authenticated scan"
)
data_obj.insert(
len(data_obj), "auth_script", CommentedSeq(),
comment="(optional) Selenium-like script for authenticated scan"
)
script_obj = data_obj["auth_script"]
for command in [
{"command": "open", "target": "%Target%/login", "value": ""},
{"command": "waitForElementPresent", "target": "id=login_login", "value": ""},
{"command": "waitForElementPresent", "target": "id=login_password", "value": ""},
{"command": "waitForElementPresent", "target": "id=login_0", "value": ""},
{"command": "type", "target": "id=login_login", "value": "%Username%"},
{"command": "type", "target": "id=login_password", "value": "%Password%"},
{"command": "clickAndWait", "target": "id=login_0", "value": ""}
]:
command_obj = CommentedMap()
command_obj.fa.set_flow_style()
for key in ["command", "target", "value"]:
command_obj.insert(len(command_obj), key, command[key])
script_obj.append(command_obj)
data_obj.insert(
len(data_obj), "bind_all_interfaces", True,
comment="(optional) Bind ZAP to all interfaces or only to localhost"
)
data_obj.insert(
len(data_obj), "daemon_debug", False,
comment="(optional) Send ZAP daemon output to stdout"
)
data_obj.insert(
len(data_obj), "java_options", "-Xmx499m",
comment="(optional) Java options for ZAP daemon"
)
data_obj.insert(
len(data_obj), "split_by_endpoint", False,
comment="(optional) Create separate findings for every endpoint"
)
data_obj.insert(
len(data_obj), "save_intermediates_to", "/data/intermediates/dast",
comment="(optional) Save scan intermediates (raw results, logs, ...)"
)
@staticmethod
def validate_config(config):
""" Validate config """
required = ["target"]
not_set = [item for item in required if item not in config]
if not_set:
error = f"Required configuration options not set: {', '.join(not_set)}"
log.error(error)
raise ValueError(error)
@staticmethod
def get_name():
""" Module name """
return "OWASP ZAP"
@staticmethod
def get_description():
""" Module description or help message """
return "OWASP Zed Attack Proxy (ZAP)"