in backend/model_garden/management/commands/process_task_statuses.py [0:0]
def handle(self, *args, **kwargs):
    """Process one chunk of labeling tasks that are awaiting a status check.

    Selects labeling tasks whose status is ANNOTATION, VALIDATION or
    COMPLETED and whose ``error`` field is NULL, ordered by ``updated_at``,
    capped at ``settings.TASK_STATUSES_WORKER_CHUNK_SIZE`` rows, and passes
    them to ``self._process_labeling_tasks``. When nothing matches, an info
    message is logged instead.

    Raises:
        CommandError: If selecting or processing the tasks raises any
            exception. The original exception is attached as the cause.
    """
    try:
        # list() evaluates the sliced queryset right here, so database
        # errors surface inside this try block rather than later on.
        labeling_tasks = list(
            LabelingTask.objects
            .filter(
                status__in=(
                    LabelingTaskStatus.ANNOTATION,
                    LabelingTaskStatus.VALIDATION,
                    LabelingTaskStatus.COMPLETED,
                ),
                error__isnull=True,
            )
            .order_by("updated_at")
            [:settings.TASK_STATUSES_WORKER_CHUNK_SIZE],
        )
        if labeling_tasks:
            self._process_labeling_tasks(labeling_tasks=labeling_tasks)
        else:
            logger.info("No pending labeling tasks found")
    except Exception as e:
        # Command-level boundary: wrap any failure in CommandError, chaining
        # explicitly so the underlying traceback is kept as the cause.
        raise CommandError(f"Failed to process labeling tasks: {e}") from e