modular_sdk/utils/job_tracer/generic.py (76 lines of code) (raw):
from datetime import datetime
from modular_sdk.commons.constants import JOB_RUNNING_STATE, JOB_FAIL_STATE, \
JOB_SUCCESS_STATE
from modular_sdk.commons import ModularException, RESPONSE_BAD_REQUEST_CODE, \
RESPONSE_FORBIDDEN_CODE
from modular_sdk.commons.log_helper import get_logger
from modular_sdk.services.environment_service import EnvironmentService
from modular_sdk.services.job_service import JobService
from modular_sdk.utils.operation_mode.generic import \
ModularOperationModeManagerService
from modular_sdk.utils.job_tracer.abstract import AbstractJobTracer
_LOG = get_logger(__name__)
class ModularJobTracer(AbstractJobTracer):
def __init__(self, operation_mode_service: ModularOperationModeManagerService,
environment_service: EnvironmentService, component=None):
self.operation_mode_service = operation_mode_service
self.environment_service = environment_service
self.application = self.environment_service.application()
self.component = component or self.environment_service.component()
def start(self, job_id, meta=None):
_LOG.debug(f'Going to mark Job {self.component} and {job_id} id as '
f'started')
if meta is None:
meta = {}
# self.is_permitted_to_start()
job = JobService.create(job=self.component, job_id=job_id,
application=self.application,
started_at=datetime.utcnow(),
state=JOB_RUNNING_STATE, meta=meta)
JobService.save(job=job)
def is_permitted_to_start(self):
result = self.operation_mode_service.get_mode(
application_name=self.application
)
if result.get('items', {})[0].get('mode', '') == 'LIVE':
return
unappropriated_mode_message = f'{self.application} can not be traced ' \
f'due to unappropriated mode'
_LOG.error(unappropriated_mode_message)
raise ModularException(
code=RESPONSE_FORBIDDEN_CODE,
content=unappropriated_mode_message
)
@staticmethod
def __get_job(job, job_id):
job_item = JobService.get_by_id(job=job, job_id=job_id)
if job_item.state in ['SUCCESS', 'FAIL']:
job_invalid_state_message = f'Job with {job_id} id and ' \
f'{job} name with state ' \
f'not equal to RUNNING can not be ' \
f'changed'
_LOG.error(job_invalid_state_message)
raise ModularException(
code=RESPONSE_BAD_REQUEST_CODE,
content=job_invalid_state_message
)
return job_item
def fail(self, job_id, error):
_LOG.debug(f'Going to mark Job {self.component} and {job_id} id as '
f'failed')
job_item = self.__get_job(job=self.component, job_id=job_id)
stopped_at = datetime.utcnow()
state = JOB_FAIL_STATE
error_type = error.__class__.__name__
error_reason = error.__str__()
JobService.update(job=job_item, stopped_at=stopped_at, state=state,
error_type=error_type, error_reason=error_reason)
def succeed(self, job_id, meta):
_LOG.debug(f'Going to mark Job {self.component} and {job_id} id as '
f'succeeded')
job_item = self.__get_job(job=self.component, job_id=job_id)
stopped_at = datetime.utcnow()
state = JOB_SUCCESS_STATE
JobService.update(job=job_item, stopped_at=stopped_at, state=state,
meta=meta)
def track_error(self):
# todo submit error to aggregation pipeline
pass