in lemur/certificates/service.py [0:0]
def render(args):
    """
    Helper function that allows us to render our REST Api.

    Builds a query over ``Certificate``, narrows it according to the request
    arguments and returns the sorted and paged result.

    :param args: dict of request arguments. The keys ``showExpired``,
        ``time_range``, ``destination_id``, ``show`` and ``filter`` must be
        present; together with the optional ``notification_id`` they are popped
        from the dict, so the caller's dict is mutated. ``args["user"]`` is read
        when ``show`` is truthy. Whatever remains is handed to
        ``database.sort_and_page``.

        ``filter`` is a string of the form ``<field>;<value>``. Wrapping the
        value in double quotes requests an exact match instead of a substring
        match (only honoured for name, issuer and cn).
    :return: the result of ``database.sort_and_page`` for the final query
    :raises KeyError: if one of the required keys is missing from ``args``
    :raises IndexError: if ``filter`` is non-empty but contains no ``;``
    """
    query = database.session_query(Certificate)

    show_expired = args.pop("showExpired")
    if show_expired != 1:
        # Drop certificates whose not_after lies before the cutoff obtained by
        # shifting "now" by the configured number of months (default: -1).
        one_month_old = (
            arrow.now()
            .shift(months=current_app.config.get("HIDE_EXPIRED_CERTS_AFTER_MONTHS", -1))
            .format("YYYY-MM-DD")
        )
        query = query.filter(Certificate.not_after > one_month_old)

    time_range = args.pop("time_range")
    destination_id = args.pop("destination_id")
    notification_id = args.pop("notification_id", None)
    show = args.pop("show")
    # TODO we should enable filtering by owner and creator
    filt = args.pop("filter")

    if filt:
        terms = filt.split(";")
        value = terms[1]

        # Substring match by default.
        term = "%{0}%".format(value)
        # Exact matches for quotes. Only applies to name, issuer, and cn
        if value.startswith('"') and value.endswith('"'):
            term = value[1:-1]

        if "issuer" in terms:
            # we can't rely on issuer being correct in the cert directly so we combine queries
            sub_query = (
                database.session_query(Authority.id)
                .filter(Authority.name.ilike(term))
                .subquery()
            )
            query = query.filter(
                or_(
                    Certificate.issuer.ilike(term),
                    Certificate.authority_id.in_(sub_query),
                )
            )
        elif "destination" in terms:
            query = query.filter(
                Certificate.destinations.any(Destination.id == value)
            )
        # The next two branches look the field up in the split terms, like every
        # other branch, instead of substring-matching the raw filter string.
        # Otherwise a search such as "cn;active.example.com" would be treated as
        # an "active" filter because its value happens to contain that word.
        elif "notify" in terms:
            query = query.filter(Certificate.notify == truthiness(value))
        elif "active" in terms:
            query = query.filter(Certificate.active == truthiness(value))
        elif "cn" in terms:
            query = query.filter(
                or_(
                    func.lower(Certificate.cn).like(term.lower()),
                    Certificate.id.in_(like_domain_query(term)),
                )
            )
        elif "id" in terms:
            query = query.filter(Certificate.id == cast(value, Integer))
        elif "name" in terms:
            query = query.filter(
                or_(
                    func.lower(Certificate.name).like(term.lower()),
                    Certificate.id.in_(like_domain_query(term)),
                    func.lower(Certificate.cn).like(term.lower()),
                )
            )
        elif "fixedName" in terms:
            # only what matches the fixed name directly if a fixedname is provided
            query = query.filter(Certificate.name == value)
        else:
            # Any other field is delegated to the generic database filter.
            query = database.filter(query, Certificate, terms)

    if show:
        # Restrict to certificates the user created, or whose owner is one of
        # the role names selected for this user.
        user_id = args["user"].id
        sub_query = (
            database.session_query(Role.name)
            .filter(Role.user_id == user_id)
            .subquery()
        )
        query = query.filter(
            or_(Certificate.user_id == user_id, Certificate.owner.in_(sub_query))
        )

    if destination_id:
        query = query.filter(
            Certificate.destinations.any(Destination.id == destination_id)
        )

    if notification_id:
        query = query.filter(
            Certificate.notifications.any(Notification.id == notification_id)
        )

    if time_range:
        # Keep certificates whose not_after falls between today and
        # `time_range` weeks from now (both bounds inclusive).
        to = arrow.now().shift(weeks=+time_range).format("YYYY-MM-DD")
        now = arrow.now().format("YYYY-MM-DD")
        query = query.filter(Certificate.not_after <= to).filter(
            Certificate.not_after >= now
        )

    if current_app.config.get("ALLOW_CERT_DELETION", False):
        # When deletion is enabled, leave out certificates flagged as deleted.
        query = query.filter(Certificate.deleted == false())

    return database.sort_and_page(query, Certificate, args)