def dnstest()

in lemur/acme_providers/cli.py [0:0]


def dnstest(domain, token):
    """
    Create, verify, and delete DNS TXT records using an autodetected provider.

    A TXT record is created through every DNS provider detected for
    ``domain``; each record is then verified and finally deleted again.
    Progress is reported with ``print``.

    :param domain: Domain handed to provider autodetection and to each
        provider's ``create_txt_record`` / ``delete_txt_record``.
    :param token: Value handed to ``create_txt_record`` /
        ``delete_txt_record`` alongside the domain (presumably the TXT record
        content -- confirm against the provider plugins).
    :raises Exception: If no DNS provider is detected for ``domain``. Errors
        raised while creating a record also propagate; verification and
        deletion failures are reported and logged, and the remaining
        providers are still processed.
    """
    print("[+] Starting ACME Tests.")

    acme_handler = AcmeHandler()
    acme_handler.autodetect_dns_providers(domain)
    dns_providers = acme_handler.dns_providers_for_domain[domain]
    if not dns_providers:
        raise Exception(f"No DNS providers found for domain: {domain}.")

    # Create TXT Records
    # Each entry keeps the provider together with ITS OWN change id, so the
    # verify and delete phases never reuse a change id from another provider.
    created_records = []
    for dns_provider in dns_providers:
        dns_provider_plugin = acme_handler.get_dns_provider(dns_provider.provider_type)
        dns_provider_options = json.loads(dns_provider.credentials)
        account_number = dns_provider_options.get("account_id")

        print(f"[+] Creating TXT Record in `{dns_provider.name}` provider")
        change_id = dns_provider_plugin.create_txt_record(domain, token, account_number)
        created_records.append(
            (dns_provider, dns_provider_plugin, account_number, change_id)
        )

    print("[+] Verifying TXT Record has propagated to DNS.")
    print("[+] This step could take a while...")
    # Fixed pause before the providers are asked to confirm the change.
    time.sleep(10)

    # Verify TXT Records
    for dns_provider, dns_provider_plugin, account_number, change_id in created_records:
        try:
            dns_provider_plugin.wait_for_dns_change(change_id, account_number)
            print(f"[+] Verified TXT Record in `{dns_provider.name}` provider")
        except Exception:
            # A failed verification must not prevent the cleanup below.
            sentry.captureException()
            current_app.logger.debug(
                f"Unable to resolve DNS challenge for change_id: {change_id}, account_id: "
                f"{account_number}",
                exc_info=True,
            )
            print(f"[+] Unable to Verify TXT Record in `{dns_provider.name}` provider")

    time.sleep(10)

    # Delete TXT Records
    for dns_provider, dns_provider_plugin, account_number, change_id in created_records:
        try:
            dns_provider_plugin.delete_txt_record(change_id, account_number, domain, token)
            print(f"[+] Deleted TXT Record in `{dns_provider.name}` provider")
        except Exception:
            # Keep going: one provider failing to clean up should not leave
            # the test records of the remaining providers behind.
            sentry.captureException()
            current_app.logger.debug(
                f"Unable to delete TXT record for change_id: {change_id}, account_id: "
                f"{account_number}",
                exc_info=True,
            )
            print(f"[+] Unable to Delete TXT Record in `{dns_provider.name}` provider")

    print("[+] Done with ACME Tests.")