dusty/scanners/dast/qualys/helper.py (291 lines of code) (raw):
#!/usr/bin/python3
# coding=utf-8
# pylint: disable=I0011,E0401,R0903,R0913,R0902
# Copyright 2019 getcarrier.io
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Qualys API helper
"""
import time
import requests
from dotted.utils import dot # pylint: disable=C0411
from dusty.tools import log
class QualysHelper:
""" Helps to query Qualys API """
def __init__(self, context, server, login, password, retries=10, retry_delay=30.0, timeout=120): # pylint: disable=R0913
self.context = context
self.server = server
self.login = login
self.password = password
self.retries = retries
self.retry_delay = retry_delay
self.timeout = timeout
self._connection_obj = None
@property
def _connection(self):
""" Prepare connection object """
if self._connection_obj is None:
self._connection_obj = requests.Session()
self._connection_obj.auth = (self.login, self.password)
self._connection_obj.headers.update({"Accept": "application/json"})
return self._connection_obj
def _destroy_connection(self):
""" Destroy connection object """
if self._connection_obj is not None:
self._connection_obj.close()
self._connection_obj = None
def _request(self, endpoint, json=None, validator=None):
""" Perform API request (with error handling) """
last_response_text = ""
for retry in range(self.retries):
try:
response = self._request_raw(endpoint, json)
if validator is not None and not validator(response):
last_response_text = response.text
raise ValueError(f"Invalid response: {response.text}")
return response
except: # pylint: disable=W0702
log.exception("Qualys API error [retry=%d]", retry)
self._destroy_connection()
time.sleep(self.retry_delay)
raise RuntimeError(
f"Qualys API request failed after {self.retries} retries. " \
f"Last response: {last_response_text}"
)
def _request_raw(self, endpoint, json=None):
""" Perform API request (directly) """
api = self._connection
if json is None:
response = api.get(f"{self.server}{endpoint}", timeout=self.timeout)
else:
response = api.post(f"{self.server}{endpoint}", json=json, timeout=self.timeout)
log.debug(
"API response: %d [%s] %s",
response.status_code, response.headers, response.text
)
return response
def get_version(self):
""" Get WAS version """
response = self._request(
"/qps/rest/portal/version",
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
return obj.ServiceResponse.data[0]["Portal-Version"]["WAS-VERSION"]
def search_for_webapp(self, webapp_name):
""" Search for existing WebApp and get ID """
response = self._request(
"/qps/rest/3.0/search/was/webapp",
json={
"ServiceRequest": {
"filters": {
"Criteria": [{
"field": "name",
"operator": "EQUALS",
"value": webapp_name
}]
}
}
},
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
if obj.ServiceResponse.count == 0:
return None
return obj.ServiceResponse.data[0].WebApp.id
def count_scans_in_webapp(self, webapp_id):
""" Count submitted/running scans in WebApp """
response = self._request(
"/qps/rest/3.0/count/was/wasscan",
json={
"ServiceRequest": {
"filters": {
"Criteria": [{
"field": "webApp.id",
"operator": "EQUALS",
"value": webapp_id
}, {
"field": "status",
"operator": "IN",
"value": "SUBMITTED,RUNNING"
}]
}
}
},
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
try:
return obj.ServiceResponse.count
except: # pylint: disable=W0702
return 0 # On error - allow to try to delete stale project
def create_webapp(self, name, application_url, option_profile, excludes=None):
""" Create WebApp record """
if excludes is None:
payload = {
"ServiceRequest": {
"data": {
"WebApp": {
"name": name,
"url": application_url,
"defaultProfile": {"id": int(option_profile)}
}
}
}
}
else:
payload = {
"ServiceRequest": {
"data": {
"WebApp": {
"name": name,
"url": application_url,
"defaultProfile": {"id": int(option_profile)},
"urlBlacklist": {"set": {"UrlEntry": [
{"value": item, "regex": "true"} for item in excludes
]}},
"postDataBlacklist": {"set": {"UrlEntry": [
{"value": item, "regex": "true"} for item in excludes
]}}
}
}
}
}
response = self._request(
"/qps/rest/3.0/create/was/webapp", json=payload,
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
return obj.ServiceResponse.data[0].WebApp.id
def create_selenium_auth_record(self, name, script, regex):
""" Create selenium auth record """
response = self._request(
"/qps/rest/3.0/create/was/webappauthrecord",
json={
"ServiceRequest": {
"data": {
"WebAppAuthRecord": {
"name": name,
"formRecord": {
"type": "SELENIUM",
"seleniumScript": {
"name": "seleniumScriptOK",
"data": script,
"regex": regex
}
}
}
}
}
},
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
return obj.ServiceResponse.data[0].WebAppAuthRecord.id
def add_auth_record_to_webapp(self, webapp_id, webapp_name, auth_record_id):
""" Add auth record to WebApp """
response = self._request(
f"/qps/rest/3.0/update/was/webapp/{webapp_id}",
json={
"ServiceRequest": {
"data": {
"WebApp": {
"name": webapp_name,
"authRecords": {
"add": {
"WebAppAuthRecord": [{
"id": auth_record_id
}]
}
}
}
}
}
},
validator=lambda r: r.ok and dot(r.json()).ServiceResponse.responseCode
)
obj = dot(response.json())
return obj.ServiceResponse.responseCode == "SUCCESS"
def delete_asset(self, asset_type, asset_id):
""" Delete asset """
response = self._request(
f"/qps/rest/3.0/delete/was/{asset_type}/{asset_id}",
json={}, validator=lambda r: r.ok and dot(r.json()).ServiceResponse.responseCode
)
obj = dot(response.json())
return obj.ServiceResponse.responseCode == "SUCCESS"
def start_scan(self, name, webapp_id, option_profile, scanner_appliance, auth_record):
""" Start scan """
response = self._request(
"/qps/rest/3.0/launch/was/wasscan/",
json={
"ServiceRequest": {
"data": {
"WasScan": {
"name": name,
"type": "VULNERABILITY",
"target": {
"webApp": {"id": webapp_id},
"webAppAuthRecord": auth_record,
"scannerAppliance": scanner_appliance
},
"profile": {"id": int(option_profile)},
"sendMail": False
}
}
}
},
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
return obj.ServiceResponse.data[0].WasScan.id
def get_scan_status(self, scan_id):
""" Get scan status """
response = self._request(
f"/qps/rest/3.0/status/was/wasscan/{scan_id}",
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
return obj.ServiceResponse.data[0].WasScan.status
def get_scan_results_status(self, scan_id):
""" Get scan status """
response = self._request(
f"/qps/rest/3.0/status/was/wasscan/{scan_id}",
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
try:
return obj.ServiceResponse.data[0].WasScan.summary.resultsStatus
except: # pylint: disable=W0702
return "UNKNOWN"
def create_report(self, name, webapp_id, report_template):
""" Create report """
response = self._request(
"/qps/rest/3.0/create/was/report",
json={
"ServiceRequest": {
"data": {
"Report": {
"name": name,
"description": "Report generated by API with Dusty",
"format": "XML",
"type": "WAS_SCAN_REPORT",
"config": {
"webAppReport": {
"target": {
"webapps": {
"WebApp": [{
"id": webapp_id
}]
}
}
}
},
"template": {"id": int(report_template)}
}
}
}
},
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
return obj.ServiceResponse.data[0].Report.id
def get_report_status(self, report_id):
""" Get scan status """
response = self._request(
f"/qps/rest/3.0/status/was/report/{report_id}",
validator=lambda r: r.ok and \
dot(r.json()).ServiceResponse.responseCode == "SUCCESS"
)
obj = dot(response.json())
return obj.ServiceResponse.data[0].Report.status
def download_report(self, report_id):
""" Download report data """
response = self._request(
f"/qps/rest/3.0/download/was/report/{report_id}",
validator=lambda r: r.ok
)
return response.content