modular_sdk/utils/runtime_tracer/abstract.py (34 lines of code) (raw):
from abc import abstractmethod, ABC
from datetime import datetime
from modular_sdk.services.environment_service import EnvironmentService
from modular_sdk.services.sqs_service import SQSService
class AbstractSegment(ABC):
@abstractmethod
def __init__(self, name: str, tracer):
self.name = name
self.started_at = datetime.utcnow()
self.stopped_at = None
self.execution_time = None
self.tracer = tracer
self.is_error = True
@abstractmethod
def stop(self):
pass
@abstractmethod
def error(self):
pass
class AbstractSegmentTracer(ABC):
@abstractmethod
def __init__(self, sqs_service: SQSService,
environment_service: EnvironmentService):
self.sqs_service = sqs_service
self.environment_service = environment_service
@abstractmethod
def start(self):
pass
@abstractmethod
def save(self, processed_traces):
pass
@abstractmethod
def stop_segment(self, segment: AbstractSegment):
pass