in dusty/scanners/dast/qualys/scanner.py [0:0]
def execute(self): # pylint: disable=R0912
""" Run the scanner """
helper = QualysHelper(
self.context,
self.config.get("qualys_api_server"),
self.config.get("qualys_login"),
self.config.get("qualys_password"),
retries=self.config.get("retries", 10),
retry_delay=self.config.get("retry_delay", 30.0),
timeout=self.config.get("timeout", 120)
)
log.info("Qualys WAS version: %s", helper.get_version())
timestamp = datetime.utcfromtimestamp(int(time())).strftime("%Y-%m-%d %H:%M:%S")
sleep_interval = self.config.get("sleep_interval", 10.0)
status_check_interval = self.config.get("status_check_interval", 60.0)
# Create/get project
project_name = "{}_{}".format(
self.context.get_meta("project_name", "UnnamedProject"),
self.context.get_meta("project_description", "Undescribed Project")
)
if self.config.get("random_name", False):
project_name = f"{project_name}_{self.id_generator(8)}"
log.info("Searching for existing webapp")
webapp_id = helper.search_for_webapp(project_name)
if webapp_id is None:
log.info("Creating webapp")
webapp_id = helper.create_webapp(
project_name,
self.config.get("target"),
self.config.get("qualys_option_profile_id"),
excludes=self.config.get("exclude", None)
)
sleep(sleep_interval)
# Create auth record if needed
auth_id = None
if self.config.get("auth_script", None):
log.info("Creating auth record")
auth_name = f"{project_name} SeleniumAuthScript {timestamp}"
auth_data = self.render_selenium_script(
self.config.get("auth_script"),
self.config.get("auth_login", ""),
self.config.get("auth_password", ""),
self.config.get("target")
)
auth_id = helper.create_selenium_auth_record(
auth_name, auth_data,
self.config.get("logged_in_indicator", "selenium")
)
sleep(sleep_interval)
helper.add_auth_record_to_webapp(webapp_id, project_name, auth_id)
# Start scan
log.info("Starting scan")
scan_name = f"{project_name} WAS {timestamp}"
scan_auth = {"isDefault": True}
if auth_id is not None:
scan_auth = {"id": auth_id}
scan_scanner = {"type": "EXTERNAL"}
if self.config.get("qualys_scanner_type", "EXTERNAL") == "INTERNAL" and \
self.config.get("qualys_scanner_pool", None):
scanner_pool = self.config.get("qualys_scanner_pool")
if isinstance(scanner_pool, str):
scanner_pool = [item.strip() for item in scanner_pool.split(",")]
scan_scanner = {
"type": "INTERNAL",
"friendlyName": random.choice(scanner_pool)
}
scan_id = helper.start_scan(
scan_name, webapp_id,
self.config.get("qualys_option_profile_id"),
scan_scanner, scan_auth
)
sleep(sleep_interval)
# Wait for scan to finish
while helper.get_scan_status(scan_id) in ["SUBMITTED", "RUNNING"]:
log.info("Waiting for scan to finish")
sleep(status_check_interval)
# Wait for results to finish processing
if helper.get_scan_results_status(scan_id) == "UNKNOWN":
log.warning(
"Unable to find scan results status. Scan status: %s",
helper.get_scan_status(scan_id)
)
while helper.get_scan_results_status(scan_id) in ["TO_BE_PROCESSED", "PROCESSING"]:
log.info("Waiting for scan results to finish processing")
sleep(status_check_interval)
scan_result = helper.get_scan_results_status(scan_id)
if scan_result in ["NO_HOST_ALIVE", "NO_WEB_SERVICE"]:
error = Error(
tool=self.get_name(),
error=f"Qualys failed to access target",
details="Qualys failed to access target " \
"(e.g. connection failed or target is not accessible). " \
"Please check scanner type/pool and target URL."
)
self.errors.append(error)
if scan_result in ["SCAN_RESULTS_INVALID", "SERVICE_ERROR", "SCAN_INTERNAL_ERROR"]:
error = Error(
tool=self.get_name(),
error=f"Qualys internal error occured",
details="Qualys failed to perform scan (internal scan error occured). " \
"Please re-run the scan and check config if error persists."
)
self.errors.append(error)
# Request report
log.info("Requesting report")
report_name = f"{project_name} WAS {timestamp} FOR Scan {scan_id}"
report_id = helper.create_report(
report_name, webapp_id,
self.config.get("qualys_report_template_id")
)
sleep(sleep_interval)
# Wait for report to be created
while helper.get_report_status(report_id) in ["RUNNING"]:
log.info("Waiting for report to be created")
sleep(status_check_interval)
# Download report
log.info("Downloading report XML")
report_xml = helper.download_report(report_id)
# Delete assets
log.info("Deleting assets")
helper.delete_asset("report", report_id)
helper.delete_asset("wasscan", scan_id)
if auth_id is not None:
helper.delete_asset("webappauthrecord", auth_id)
helper.delete_asset("webapp", webapp_id)
# Parse findings
parse_findings(report_xml, self)
# Save intermediates
self.save_intermediates(report_xml)