dusty/scanners/dast/qualys/scanner.py (300 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011,E0401,W0702,W0703,R0902,R0914,R0915
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Scanner: Qualys WAS
"""
import os
import string
import random
from time import sleep, time
from datetime import datetime
from ruamel.yaml.comments import CommentedSeq
from ruamel.yaml.comments import CommentedMap
from dusty.tools import log
from dusty.models.module import DependentModuleModel
from dusty.models.scanner import ScannerModel
from dusty.models.error import Error
from .helper import QualysHelper
from .parser import parse_findings
class Scanner(DependentModuleModel, ScannerModel):
""" Scanner class """
def __init__(self, context):
""" Initialize scanner instance """
super().__init__()
self.context = context
self.config = \
self.context.config["scanners"][__name__.split(".")[-3]][__name__.split(".")[-2]]
def execute(self): # pylint: disable=R0912
""" Run the scanner """
helper = QualysHelper(
self.context,
self.config.get("qualys_api_server"),
self.config.get("qualys_login"),
self.config.get("qualys_password"),
retries=self.config.get("retries", 10),
retry_delay=self.config.get("retry_delay", 30.0),
timeout=self.config.get("timeout", 120)
)
log.info("Qualys WAS version: %s", helper.get_version())
timestamp = datetime.utcfromtimestamp(int(time())).strftime("%Y-%m-%d %H:%M:%S")
sleep_interval = self.config.get("sleep_interval", 10.0)
status_check_interval = self.config.get("status_check_interval", 60.0)
# Create/get project
project_name = "{}_{}".format(
self.context.get_meta("project_name", "UnnamedProject"),
self.context.get_meta("project_description", "Undescribed Project")
)
if self.config.get("random_name", False):
project_name = f"{project_name}_{self.id_generator(8)}"
log.info("Searching for existing webapp")
webapp_id = helper.search_for_webapp(project_name)
if webapp_id is None:
log.info("Creating webapp")
webapp_id = helper.create_webapp(
project_name,
self.config.get("target"),
self.config.get("qualys_option_profile_id"),
excludes=self.config.get("exclude", None)
)
sleep(sleep_interval)
# Create auth record if needed
auth_id = None
if self.config.get("auth_script", None):
log.info("Creating auth record")
auth_name = f"{project_name} SeleniumAuthScript {timestamp}"
auth_data = self.render_selenium_script(
self.config.get("auth_script"),
self.config.get("auth_login", ""),
self.config.get("auth_password", ""),
self.config.get("target")
)
auth_id = helper.create_selenium_auth_record(
auth_name, auth_data,
self.config.get("logged_in_indicator", "selenium")
)
sleep(sleep_interval)
helper.add_auth_record_to_webapp(webapp_id, project_name, auth_id)
# Start scan
log.info("Starting scan")
scan_name = f"{project_name} WAS {timestamp}"
scan_auth = {"isDefault": True}
if auth_id is not None:
scan_auth = {"id": auth_id}
scan_scanner = {"type": "EXTERNAL"}
if self.config.get("qualys_scanner_type", "EXTERNAL") == "INTERNAL" and \
self.config.get("qualys_scanner_pool", None):
scanner_pool = self.config.get("qualys_scanner_pool")
if isinstance(scanner_pool, str):
scanner_pool = [item.strip() for item in scanner_pool.split(",")]
scan_scanner = {
"type": "INTERNAL",
"friendlyName": random.choice(scanner_pool)
}
scan_id = helper.start_scan(
scan_name, webapp_id,
self.config.get("qualys_option_profile_id"),
scan_scanner, scan_auth
)
sleep(sleep_interval)
# Wait for scan to finish
while helper.get_scan_status(scan_id) in ["SUBMITTED", "RUNNING"]:
log.info("Waiting for scan to finish")
sleep(status_check_interval)
# Wait for results to finish processing
if helper.get_scan_results_status(scan_id) == "UNKNOWN":
log.warning(
"Unable to find scan results status. Scan status: %s",
helper.get_scan_status(scan_id)
)
while helper.get_scan_results_status(scan_id) in ["TO_BE_PROCESSED", "PROCESSING"]:
log.info("Waiting for scan results to finish processing")
sleep(status_check_interval)
scan_result = helper.get_scan_results_status(scan_id)
if scan_result in ["NO_HOST_ALIVE", "NO_WEB_SERVICE"]:
error = Error(
tool=self.get_name(),
error=f"Qualys failed to access target",
details="Qualys failed to access target " \
"(e.g. connection failed or target is not accessible). " \
"Please check scanner type/pool and target URL."
)
self.errors.append(error)
if scan_result in ["SCAN_RESULTS_INVALID", "SERVICE_ERROR", "SCAN_INTERNAL_ERROR"]:
error = Error(
tool=self.get_name(),
error=f"Qualys internal error occured",
details="Qualys failed to perform scan (internal scan error occured). " \
"Please re-run the scan and check config if error persists."
)
self.errors.append(error)
# Request report
log.info("Requesting report")
report_name = f"{project_name} WAS {timestamp} FOR Scan {scan_id}"
report_id = helper.create_report(
report_name, webapp_id,
self.config.get("qualys_report_template_id")
)
sleep(sleep_interval)
# Wait for report to be created
while helper.get_report_status(report_id) in ["RUNNING"]:
log.info("Waiting for report to be created")
sleep(status_check_interval)
# Download report
log.info("Downloading report XML")
report_xml = helper.download_report(report_id)
# Delete assets
log.info("Deleting assets")
helper.delete_asset("report", report_id)
helper.delete_asset("wasscan", scan_id)
if auth_id is not None:
helper.delete_asset("webappauthrecord", auth_id)
helper.delete_asset("webapp", webapp_id)
# Parse findings
parse_findings(report_xml, self)
# Save intermediates
self.save_intermediates(report_xml)
def save_intermediates(self, report_xml):
""" Save scanner intermediates """
if self.config.get("save_intermediates_to", None):
log.info("Saving intermediates")
base = os.path.join(self.config.get("save_intermediates_to"), __name__.split(".")[-2])
try:
# Make directory for artifacts
os.makedirs(base, mode=0o755, exist_ok=True)
# Save report
with open(os.path.join(base, "report.xml"), "w") as report:
report.write(report_xml.decode("utf-8", errors="ignore"))
except:
log.exception("Failed to save intermediates")
@staticmethod
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
""" Generate random ID (legacy code) """
return ''.join(random.choice(chars) for _ in range(size))
@staticmethod
def render_selenium_script(auth_script, auth_login, auth_password, target):
""" Generate selenium script in HTML format """
# pylint: disable=C0301
result = \
f'<?xml version="1.0" encoding="UTF-8"?>' \
f'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">' \
f'<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">' \
f'<head profile="http://selenium-ide.openqa.org/profiles/test-case">' \
f'<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />' \
f'<link rel="selenium.base" href="https://community.qualys.com/" />' \
f'<title>seleniumScriptOK</title>' \
f'</head>' \
f'<body>' \
f'<table cellpadding="1" cellspacing="1" border="1">' \
f'<thead>' \
f'<tr><td rowspan="1" colspan="3">seleniumScriptOK</td></tr>' \
f'</thead><tbody>'
for item in auth_script:
item_command = item["command"]
item_target = item["target"]
item_target = item_target.replace("%Target%", target)
item_target = item_target.replace("%Username%", auth_login)
item_target = item_target.replace("%Password%", auth_password)
item_value = item["value"]
item_value = item_value.replace("%Target%", target)
item_value = item_value.replace("%Username%", auth_login)
item_value = item_value.replace("%Password%", auth_password)
result += \
f'<tr>' \
f'<td>{item_command}</td>' \
f'<td>{item_target}</td>' \
f'<td>{item_value}</td>' \
f'</tr>'
result += \
f'</tbody></table>' \
f'</body>' \
f'</html>'
return result
@staticmethod
def fill_config(data_obj):
""" Make sample config """
data_obj.insert(
len(data_obj), "qualys_api_server", "https://qualysapi.qualys.eu",
comment="Qualys API server URL"
)
data_obj.insert(
len(data_obj), "qualys_login", "some-user",
comment="Qualys user login"
)
data_obj.insert(
len(data_obj), "qualys_password", "S0m3P@ssw0rd",
comment="Qualys user password"
)
data_obj.insert(
len(data_obj), "qualys_option_profile_id", 12345,
comment="Qualys option profile ID"
)
data_obj.insert(
len(data_obj), "qualys_report_template_id", 12345,
comment="Qualys report template ID"
)
data_obj.insert(
len(data_obj), "qualys_scanner_type", "EXTERNAL",
comment="Qualys scanner type: EXTERNAL or INTERNAL"
)
data_obj.insert(
len(data_obj), "qualys_scanner_pool", CommentedSeq(),
comment="(INTERNAL only) Qualys scanner pool: list of scanner appliances to choose from"
)
pool_obj = data_obj["qualys_scanner_pool"]
pool_obj.append("MY_SCANNER_Name1")
pool_obj.append("MY_SCANNER_Name2")
pool_obj.append("MY_OTHERSCANNER_Name")
data_obj.insert(len(data_obj), "random_name", False, comment="Use random project name")
data_obj.insert(len(data_obj), "target", "http://app:8080", comment="scan target")
data_obj.insert(
len(data_obj), "exclude", ["http://app:8080/logout.*"],
comment="(optional) URLs regex to exclude from scan"
)
data_obj.insert(
len(data_obj), "auth_login", "user",
comment="(optional) User login for authenticated scan"
)
data_obj.insert(
len(data_obj), "auth_password", "P@ssw0rd",
comment="(optional) User password for authenticated scan"
)
data_obj.insert(
len(data_obj), "auth_script", CommentedSeq(),
comment="(optional) Selenium-like script for authenticated scan"
)
script_obj = data_obj["auth_script"]
for command in [
{"command": "open", "target": "%Target%/login", "value": ""},
{"command": "waitForElementPresent", "target": "id=login_login", "value": ""},
{"command": "waitForElementPresent", "target": "id=login_password", "value": ""},
{"command": "waitForElementPresent", "target": "id=login_0", "value": ""},
{"command": "type", "target": "id=login_login", "value": "%Username%"},
{"command": "type", "target": "id=login_password", "value": "%Password%"},
{"command": "clickAndWait", "target": "id=login_0", "value": ""}
]:
command_obj = CommentedMap()
command_obj.fa.set_flow_style()
for key in ["command", "target", "value"]:
command_obj.insert(len(command_obj), key, command[key])
script_obj.append(command_obj)
data_obj.insert(
len(data_obj), "logged_in_indicator", "Logout",
comment="(optional) Response regex that is always present for authenticated user"
)
data_obj.insert(
len(data_obj), "sleep_interval", 10,
comment="(optional) Seconds to sleep after creating new resource"
)
data_obj.insert(
len(data_obj), "status_check_interval", 60,
comment="(optional) Seconds to wait between scan/report status checks"
)
data_obj.insert(
len(data_obj), "retries", 10,
comment="(optional) API request retry count"
)
data_obj.insert(
len(data_obj), "retry_delay", 30,
comment="(optional) API request retry delay"
)
data_obj.insert(
len(data_obj), "timeout", 120,
comment="(optional) API request timeout"
)
data_obj.insert(
len(data_obj), "save_intermediates_to", "/data/intermediates/dast",
comment="(optional) Save scan intermediates (raw results, logs, ...)"
)
@staticmethod
def validate_config(config):
""" Validate config """
required = [
"qualys_api_server", "qualys_login", "qualys_password",
"qualys_option_profile_id", "qualys_report_template_id",
"qualys_scanner_type",
"target"
]
not_set = [item for item in required if item not in config]
if not_set:
error = f"Required configuration options not set: {', '.join(not_set)}"
log.error(error)
raise ValueError(error)
@staticmethod
def get_name():
""" Module name """
return "Qualys WAS"
@staticmethod
def get_description():
""" Module description or help message """
return "Qualys (R) Web Application Scanning"