in annotation/annotation/jobs/services.py [0:0]
def set_task_statuses(job: Job, job_new_tasks: List[ManualAnnotationTask]):
    """Promote pending tasks of a job to ``ready`` where allowed.

    Task objects in ``job_new_tasks`` are modified in place; nothing is
    returned. Only tasks whose status is ``pending`` are considered:

    * annotation (non-validation) tasks become ``ready`` unconditionally;
    * every pending task becomes ``ready`` when the job's validation type
      is ``validation_only``;
    * any other validation task becomes ``ready`` only when each of its
      pages has, for the same file, at least
      ``occurrences * job.extensive_coverage`` finished annotations among
      the non-validation tasks of ``job_new_tasks``.

    Args:
        job: job whose ``validation_type`` and ``extensive_coverage``
            drive the decision.
        job_new_tasks: tasks to inspect and update.
    """
    # Per file id: how many finished annotation tasks cover each page.
    done_by_file = defaultdict(Counter)
    for task in job_new_tasks:
        if task.is_validation:
            continue
        if task.status == TaskStatusEnumSchema.finished:
            done_by_file[task.file_id].update(Counter(task.pages))

    for task in job_new_tasks:
        if task.status != TaskStatusEnumSchema.pending:
            continue

        skips_coverage_check = (
            not task.is_validation
            or job.validation_type == ValidationSchema.validation_only
        )
        if skips_coverage_check:
            task.status = TaskStatusEnumSchema.ready
            continue

        # ``get`` does not trigger the defaultdict factory, so a file with no
        # finished annotations yields a throwaway empty counter here.
        available = done_by_file.get(task.file_id, Counter())
        required = Counter(task.pages)
        fully_annotated = all(
            page in available
            and available[page] >= occurrences * job.extensive_coverage
            for page, occurrences in required.items()
        )
        if not fully_annotated:
            continue

        # The validation task has all the finished annotations it needs.
        task.status = TaskStatusEnumSchema.ready
        # Consume one finished annotation per listed page (never below zero)
        # so later validation tasks for the same file see the reduced counts.
        # NOTE(review): the check above requires
        # ``occurrences * extensive_coverage`` but only one is consumed per
        # page entry -- confirm this asymmetry is intended.
        for page in task.pages:
            if available[page] > 0:
                available[page] -= 1
        done_by_file[task.file_id] = available