modular_sdk/services/job_service.py (72 lines of code) (raw):
from datetime import datetime
from typing import Optional, List
from pynamodb.exceptions import DoesNotExist
from modular_sdk.commons import generate_id
from modular_sdk.commons import ModularException, \
RESPONSE_RESOURCE_NOT_FOUND_CODE
from modular_sdk.commons.log_helper import get_logger
from modular_sdk.models.job import Job
_LOG = get_logger(__name__)
class JobService:
@staticmethod
def create(job: str, job_id: str, application: str, started_at: datetime,
state: str, stopped_at: Optional[datetime] = None,
error_type: Optional[str] = None,
error_reason: Optional[str] = None,
meta: Optional[dict] = None) -> Job:
job_id = job_id or generate_id()
return Job(job=job, job_id=job_id, application=application,
started_at=started_at, state=state, stopped_at=stopped_at,
error_type=error_type, error_reason=error_reason, meta=meta)
@staticmethod
def get_by_id(job: str, job_id: str) -> Optional[Job]:
try:
job_item = Job.get(hash_key=job, range_key=job_id)
except DoesNotExist:
job_does_not_exist_message = f'Job with {job} name and {job_id} ' \
f'id does not exists'
_LOG.error(job_does_not_exist_message)
raise ModularException(
code=RESPONSE_RESOURCE_NOT_FOUND_CODE,
content=job_does_not_exist_message
)
return job_item
@staticmethod
def list(job: str) -> List[Job]:
jobs = Job.query(hash_key=job)
return list(jobs)
@staticmethod
def list_within_daterange(job: str, start_date: datetime,
end_date: datetime) -> List[Job]:
jobs = Job.job_started_at_index.query(
hash_key=job,
range_key_condition=Job.started_at.between(start_date, end_date)
)
return list(jobs)
@staticmethod
def save(job: Job):
job.save()
@staticmethod
def update(job: Job, started_at: Optional[datetime] = None,
state: Optional[str] = None,
stopped_at: Optional[datetime] = None,
error_type: Optional[str] = None,
error_reason: Optional[str] = None,
meta: Optional[dict] = None):
attributes = {
'started_at': started_at,
'state': state,
'stopped_at': stopped_at,
'error_type': error_type,
'error_reason': error_reason,
'meta': meta
}
actions = [
getattr(Job, attr).set(value or getattr(job, attr))
for attr, value in attributes.items()
if value or getattr(job, attr)
]
job.update(actions=actions)
@staticmethod
def get_dto(Job: Job) -> dict:
return Job.get_json()