in lemur/pending_certificates/service.py [0:0]
def render(args):
    """
    Build the filtered, sorted and paged listing of unresolved pending certificates.

    The following keys are popped from ``args`` before it is handed on to
    ``database.sort_and_page``:

    * ``filter`` -- a ``"<field>;<value>"`` string. The fields ``issuer``,
      ``destination``, ``notify``, ``active``, ``cn`` and ``id`` get dedicated
      handling; any other field is delegated to ``database.filter``.
    * ``show`` -- when truthy, keep only rows whose ``user_id`` is
      ``args["user"].id`` or whose ``owner`` is among the names of the roles
      that have that ``user_id``.
    * ``destination_id`` / ``notification_id`` -- when truthy, keep only rows
      attached to that destination / notification. ``notification_id`` is
      optional and defaults to ``None``.
    * ``time_range`` -- when truthy, a number of weeks; keep only rows whose
      ``not_after`` lies between today and that many weeks from today
      (inclusive, compared as ``YYYY-MM-DD`` strings).

    Rows already marked as resolved are always excluded.

    :param args: dict of request arguments; mutated in place by the pops above
    :return: the result of ``database.sort_and_page`` for the final query
    :raises KeyError: if ``time_range``, ``destination_id``, ``show`` or
        ``filter`` is absent (assuming ``args`` is a plain dict)
    :raises IndexError: if ``filter`` names one of the dedicated fields but
        carries no ``;<value>`` part
    """
    query = database.session_query(PendingCertificate)

    time_range = args.pop("time_range")
    destination_id = args.pop("destination_id")
    notification_id = args.pop("notification_id", None)
    show = args.pop("show")
    # TODO: support filtering by owner and by creator.
    filt = args.pop("filter")

    if filt:
        terms = filt.split(";")
        # Dispatch on the field name only. Testing membership in the whole
        # term list (or, worse, in the raw filter string) lets a *value* that
        # merely contains or equals a field name hijack the branch, e.g.
        # "cn;interactive.example.com" being treated as an "active" filter.
        # NOTE(review): assumes the field is always the first term, as every
        # branch below reads the value from terms[1] -- confirm against
        # database.filter.
        field = terms[0]

        if field == "issuer":
            pattern = "%{0}%".format(terms[1])
            # We can't rely on the issuer recorded on the certificate being
            # correct, so also match on the name of the linked authority.
            authority_ids = (
                database.session_query(Authority.id)
                .filter(Authority.name.ilike(pattern))
                .subquery()
            )
            query = query.filter(
                or_(
                    PendingCertificate.issuer.ilike(pattern),
                    PendingCertificate.authority_id.in_(authority_ids),
                )
            )
        elif field == "destination":
            query = query.filter(
                PendingCertificate.destinations.any(Destination.id == terms[1])
            )
        elif field == "notify":
            query = query.filter(PendingCertificate.notify == truthiness(terms[1]))
        elif field == "active":
            query = query.filter(PendingCertificate.active == truthiness(terms[1]))
        elif field == "cn":
            pattern = "%{0}%".format(terms[1])
            # Match either the common name or any of the associated domains.
            query = query.filter(
                or_(
                    PendingCertificate.cn.ilike(pattern),
                    PendingCertificate.domains.any(Domain.name.ilike(pattern)),
                )
            )
        elif field == "id":
            query = query.filter(PendingCertificate.id == cast(terms[1], Integer))
        else:
            query = database.filter(query, PendingCertificate, terms)

    if show:
        user_id = args["user"].id
        role_names = (
            database.session_query(Role.name)
            .filter(Role.user_id == user_id)
            .subquery()
        )
        query = query.filter(
            or_(
                PendingCertificate.user_id == user_id,
                PendingCertificate.owner.in_(role_names),
            )
        )

    if destination_id:
        query = query.filter(
            PendingCertificate.destinations.any(Destination.id == destination_id)
        )

    if notification_id:
        query = query.filter(
            PendingCertificate.notifications.any(Notification.id == notification_id)
        )

    if time_range:
        # Sample the clock once so both bounds derive from the same instant.
        current = arrow.now()
        now = current.format("YYYY-MM-DD")
        to = current.shift(weeks=time_range).format("YYYY-MM-DD")
        query = query.filter(PendingCertificate.not_after <= to).filter(
            PendingCertificate.not_after >= now
        )

    # Only show unresolved certificates in the UI
    query = query.filter(PendingCertificate.resolved.is_(False))

    return database.sort_and_page(query, PendingCertificate, args)