in lemur/common/celery.py [0:0]
def rotate_all_pending_endpoints():
"""."""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
logger = logging.getLogger(function)
task_id = None
if celery.current_task:
task_id = celery.current_task.request.id
log_data = {
"task_id": task_id,
}
if task_id and is_task_active(function, task_id, None):
logger.debug("Skipping task: Task is already active", extra=log_data)
return
pending_endpoints = endpoint_service.get_all_pending_rotation()
rotate_endpoint_tasks = []
for endpoint in pending_endpoints:
new_cert = endpoint.certificate.replaced[0]
# verify that the certificate has been uploaded before rotating the endpoints
if not all(
[dest.plugin.verify(new_cert.name, dest.options) for dest in new_cert.destinations]
):
logger.warning(
"Certificate has not been uploaded to all destinations, skipping rotate"
)
continue
project, lb, old_cert_name = endpoint.name.split("/")
task = rotate_endpoint.s(
endpoint.id,
lb=lb,
project=project,
new_cert=new_cert.name,
old_cert=old_cert_name,
)
rotate_endpoint_tasks.append(task)
logger.info(
f"Creating tasks to rotate {len(rotate_endpoint_tasks)} endpoints."
)
# create task group and skew execution
skew_config = current_app.config.get("CELERY_ROTATE_ENDPOINT_SKEW", {})
start = skew_config.get("start", 0) # seconds until execution of first task
stop = skew_config.get(
"stop", None
) # maximum number of seconds to wait for a task
step = skew_config.get(
"step", 10
) # number of seconds to wait until next task is executed (stop might override this)
logger.debug(
f"Scheduling endpoint rotations (start={start}, stop={stop}, step={step})"
)
g = group(rotate_endpoint_tasks).skew(start, stop, step)
g.apply_async()