in lemur/certificates/service.py [0:0]
def create_csr(**csr_config):
    """
    Generate a private key and a matching certificate signing request.

    The subject is assembled in a fixed order: common name, owner e-mail
    (unless the ``LEMUR_OWNER_EMAIL_IN_SUBJECT`` app setting is falsy), then
    organization, organizational unit, country, state and location. Each of
    the last five is only included when it is present and not blank; a
    missing key, ``None`` or a whitespace-only string are all skipped.

    :param csr_config: keyword options read by this function:

        * ``key_type`` -- passed as-is to ``generate_private_key``
        * ``common_name`` -- required
        * ``owner`` -- required while the owner e-mail is part of the subject
        * ``organization``, ``organizational_unit``, ``country``, ``state``,
          ``location`` -- optional subject fields
        * ``extensions`` -- optional mapping of extension name to value.
          ``basic_constraints``, ``sub_alt_names`` (its ``"names"`` entry)
          and ``key_usage`` are added as critical, ``extended_key_usage`` as
          non-critical, and ``subject_key_identifier`` adds a key identifier
          when its ``include_ski`` entry is truthy. Other names are ignored.
    :return: tuple ``(csr, private_key)``, both as PEM text
    """
    private_key = generate_private_key(csr_config.get("key_type"))

    name_list = [x509.NameAttribute(x509.OID_COMMON_NAME, csr_config["common_name"])]
    if current_app.config.get("LEMUR_OWNER_EMAIL_IN_SUBJECT", True):
        name_list.append(
            x509.NameAttribute(x509.OID_EMAIL_ADDRESS, csr_config["owner"])
        )

    # Listed in the order the attributes must appear in the subject.
    optional_subject_fields = (
        ("organization", x509.OID_ORGANIZATION_NAME),
        ("organizational_unit", x509.OID_ORGANIZATIONAL_UNIT_NAME),
        ("country", x509.OID_COUNTRY_NAME),
        ("state", x509.OID_STATE_OR_PROVINCE_NAME),
        ("location", x509.OID_LOCALITY_NAME),
    )
    for field, oid in optional_subject_fields:
        value = csr_config.get(field)
        # The truthiness test comes first so that an explicit None is skipped
        # like a blank string instead of failing on ``.strip()``.
        if value and value.strip():
            name_list.append(x509.NameAttribute(oid, value))

    builder = x509.CertificateSigningRequestBuilder().subject_name(
        x509.Name(name_list)
    )

    # ``or {}`` also covers an "extensions" key that is present but None.
    extensions = csr_config.get("extensions") or {}
    critical_extensions = ("basic_constraints", "sub_alt_names", "key_usage")
    noncritical_extensions = ("extended_key_usage",)

    for k, v in extensions.items():
        if not v:
            continue

        if k in critical_extensions:
            current_app.logger.debug(
                "Adding Critical Extension: {0} {1}".format(k, v)
            )
            if k == "sub_alt_names":
                # The extension object lives under "names"; nothing is added
                # when that entry is empty.
                if v["names"]:
                    builder = builder.add_extension(v["names"], critical=True)
            else:
                builder = builder.add_extension(v, critical=True)
        elif k in noncritical_extensions:
            current_app.logger.debug("Adding Extension: {0} {1}".format(k, v))
            builder = builder.add_extension(v, critical=False)

    # The key identifier is derived from the key generated above, so it is
    # handled separately from the extensions supplied by the caller.
    ski = extensions.get("subject_key_identifier") or {}
    if ski.get("include_ski", False):
        builder = builder.add_extension(
            x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
            critical=False,
        )

    request = builder.sign(private_key, hashes.SHA256(), default_backend())

    # serialize our private key and CSR
    private_key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,  # would like to use PKCS8 but AWS ELBs don't like it
        encryption_algorithm=serialization.NoEncryption(),
    ).decode("utf-8")
    csr = request.public_bytes(encoding=serialization.Encoding.PEM).decode("utf-8")

    return csr, private_key_pem