in backend/model_garden/management/commands/process_task_statuses.py [0:0]
def _process_labeling_tasks(self, labeling_tasks: List[LabelingTask]) -> None:
    """Refresh the status of a chunk of labeling tasks and upload finished ones.

    The method works in three phases:

    1. Stamp ``updated_at`` with the current time on every task in the chunk.
    2. Resolve each ``(task, future)`` pair produced by
       ``self._get_cvat_statuses``. A task whose future raises is marked as
       failed. A task in ANNOTATION or VALIDATION takes over the ``'status'``
       value reported for it when that value is present and different.
       Tasks that end up COMPLETED are collected for upload.
    3. Resolve each ``(task, future)`` pair produced by
       ``self._upload_annotations`` for the collected tasks. A failure is
       logged and the task is marked as failed; otherwise the task is moved
       to SAVED.

    Args:
        labeling_tasks: The chunk of labeling tasks to process.
    """
    logger.info(f"Processing {len(labeling_tasks)} labeling tasks")

    # NOTE: Make sure we get the "oldest" tasks every time, so we need to update
    # updated_at to the current time for the current chunk.
    task_pks = [task.pk for task in labeling_tasks]
    LabelingTask.objects.filter(pk__in=task_pks).update(updated_at=timezone.now())

    # Phase 2: bring local statuses in line with what CVAT reports.
    in_progress_statuses = (LabelingTaskStatus.ANNOTATION, LabelingTaskStatus.VALIDATION)
    completed_tasks = []
    status_results = self._get_cvat_statuses(labeling_tasks=labeling_tasks)
    for task, status_future in status_results:
        try:
            remote_task = status_future.result()
        except Exception as exc:
            task.set_failed(error=f"Failed to get task status: {exc}")
            continue

        if task.status in in_progress_statuses:
            remote_status = remote_task.get('status')
            # Only write when CVAT reported a status and it differs from ours.
            if remote_status is not None and task.status != remote_status:
                task.update_status(status=remote_status)

        if task.status == LabelingTaskStatus.COMPLETED:
            completed_tasks.append(task)

    # Phase 3: upload annotations of the completed tasks.
    upload_results = self._upload_annotations(labeling_tasks=completed_tasks)
    for task, upload_future in upload_results:
        try:
            upload_future.result()
        # NoAnnotationException is listed explicitly next to Exception; both
        # kinds of failure are handled in exactly the same way.
        except (NoAnnotationException, Exception) as exc:
            message = f"{exc}"
            logger.error(message)
            task.set_failed(error=message)
            continue

        task.update_status(status=LabelingTaskStatus.SAVED)