in lemur/pending_certificates/service.py [0:0]
def create_certificate(pending_certificate, certificate, user):
    """
    Create and store a certificate built from a pending certificate's info.

    If the pending certificate, re-fetched by id, is already marked resolved and
    carries a resolved certificate id, that existing certificate is returned and
    nothing new is created.

    Args:
        pending_certificate: PendingCertificate which will populate the certificate
        certificate: dict from Authority, which contains the body, chain and external id;
            its "owner" key is overwritten in place with the pending certificate's owner
        user: User that called this function, used as 'creator' of the certificate when
            no user is found for the pending certificate's owner email

    Returns:
        The certificate returned by certificate_service.import_certificate, or the
        previously resolved certificate when one already exists.

    Raises:
        Exception: if CertificateUploadInputSchema reports errors while loading
            `certificate`.
    """
    # Resolving twice must not produce a second certificate: hand back the one
    # already recorded on the resolved pending certificate instead.
    existing = get(pending_certificate.id)
    if (
        existing
        and existing.resolved
        and existing.resolved_cert_id
    ):
        current_app.logger.warning(
            "Trying to resolve an already resolved certificate, returning existing resolved certificate"
        )
        return certificate_service.get(existing.resolved_cert_id)

    # The owner always comes from the pending certificate, overriding whatever
    # the authority's dict carried.
    certificate["owner"] = pending_certificate.owner
    data, errors = CertificateUploadInputSchema().load(certificate)
    if errors:
        message = "Unable to create certificate: {reasons}".format(reasons=errors)
        raise Exception(message)

    # Seed the payload with the pending certificate's instance attributes.
    data.update(vars(pending_certificate))

    # Applied after the vars() copy so these values win:
    #  - relationships are copied explicitly (vars doesn't copy them without
    #    explicit fields),
    #  - external id and chain are replaced with the ones fetched from source.
    data.update(
        {
            "notifications": list(pending_certificate.notifications),
            "destinations": list(pending_certificate.destinations),
            "sources": list(pending_certificate.sources),
            "roles": list(pending_certificate.roles),
            "replaces": list(pending_certificate.replaces),
            "rotation_policy": pending_certificate.rotation_policy,
            "authority": pending_certificate.authority,
            "external_id": certificate["external_id"],
            "chain": certificate["chain"],
        }
    )

    # Prefer the user matching the owner email; when there is none, fall back
    # to the user who called this function (usually lemur).
    owner_user = user_service.get_by_email(pending_certificate.owner)
    creator = owner_user if owner_user else user

    if pending_certificate.rename:
        # If generating name from certificate, the pending certificate's name
        # must not be passed along to the import.
        del data["name"]
    data["creator"] = creator

    imported = certificate_service.import_certificate(**data)
    database.update(imported)
    return imported