modular_sdk/utils/runtime_tracer/generic.py (95 lines of code) (raw):
import json
import re
from datetime import datetime
from modular_sdk.commons import generate_id_hex
from modular_sdk.services.environment_service import EnvironmentService
from modular_sdk.services.events_service import EventsService
from modular_sdk.services.lambda_service import LambdaService
from modular_sdk.services.sqs_service import SQSService
from modular_sdk.utils.runtime_tracer.abstract import AbstractSegmentTracer, \
AbstractSegment
class Segment(AbstractSegment):
    """A single timed unit of work reported through a segment tracer.

    Timing starts when the segment is constructed. The segment is treated
    as failed (``is_error`` is True) until :meth:`stop` is called.
    """

    def __init__(self, name: str, tracer: AbstractSegmentTracer):
        """Create the segment and record its start time.

        :param name: identifier under which the tracer tracks this segment
        :param tracer: tracer notified when the segment finishes
        """
        self.name = name
        self.started_at = datetime.utcnow()
        self.stopped_at = None  # set when stop() or error() is called
        self.execution_time = None  # whole seconds, set on finish
        self.tracer = tracer
        # Pessimistic default: only stop() marks the segment as successful.
        self.is_error = True

    def _calculate_execution_time(self):
        """Return the whole seconds elapsed between start and stop."""
        elapsed = self.stopped_at - self.started_at
        # total_seconds() covers the entire delta. The ``seconds`` attribute
        # is only the seconds component, so it drops whole days and turns a
        # slightly negative delta into roughly 86399.
        return int(elapsed.total_seconds())

    def _finish(self, is_error: bool) -> None:
        """Record the stop time, duration and outcome, then notify the tracer."""
        self.stopped_at = datetime.utcnow()
        self.execution_time = self._calculate_execution_time()
        self.is_error = is_error
        self.tracer.stop_segment(self)

    def stop(self):
        """Finish the segment successfully and hand it to the tracer."""
        self._finish(is_error=False)

    def error(self):
        """Finish the segment as failed and hand it to the tracer."""
        self._finish(is_error=True)
class SegmentTracer(AbstractSegmentTracer):
    """Tracks running segments and sends finished ones through the SQS service.

    Finished segments are buffered until no started segment remains open;
    the whole buffer is then sent, one message per segment.
    """

    def __init__(self, sqs_service: SQSService,
                 environment_service: EnvironmentService):
        """Store the collaborating services and create empty buffers.

        :param sqs_service: service whose ``send_message`` delivers messages
        :param environment_service: source of the application and component
            names placed into every message
        """
        self.sqs_service = sqs_service
        self.environment_service = environment_service
        self.unprocessed_traces = {}  # segment name -> still running segment
        self.processed_traces = []  # finished segments awaiting flush

    def start(self):
        """Create, register and return a new segment with a generated name."""
        name = generate_id_hex()
        segment = Segment(name=name, tracer=self)
        self.unprocessed_traces[name] = segment
        return segment

    def build_sqs_message(self, segment: AbstractSegment):
        """Build the message payload describing one finished segment.

        :param segment: finished segment to describe
        :return: dict with the keys ``application_name``,
            ``component_name``, ``isError`` and ``execution_time``
        """
        return {
            'application_name': self.environment_service.application(),
            'component_name': self.environment_service.component(),
            'isError': segment.is_error,
            'execution_time': segment.execution_time,
        }

    def save(self, processed_traces):
        """Send one message per segment through the SQS service.

        :param processed_traces: iterable of finished segments
        """
        for segment in processed_traces:
            message = self.build_sqs_message(segment=segment)
            self.sqs_service.send_message(message=message)

    def stop_segment(self, segment: AbstractSegment):
        """Move a segment to the finished buffer; flush when none is open.

        :param segment: segment previously returned by :meth:`start`
        :raises KeyError: if the segment is not currently tracked, for
            example when it has already been stopped
        """
        finished = self.unprocessed_traces.pop(segment.name)
        self.processed_traces.append(finished)
        if self.unprocessed_traces:
            return
        self.save(processed_traces=self.processed_traces)
        # Reset the buffer after a successful flush. Without this, a reused
        # tracer would resend every earlier segment on each later flush.
        # Rebinding (not clearing in place) leaves any list a caller still
        # holds untouched.
        self.processed_traces = []
class ScheduledSegmentTracer(SegmentTracer):
    """Segment tracer that adds the triggering rule's schedule to messages.

    Each message gains a ``cron`` key holding the ``ScheduleExpression``
    of the events rule allowed to invoke the current component, or None
    when no such rule or schedule is found.
    """

    # ARN shape of an events rule; compiled once rather than per statement.
    _RULE_ARN_PATTERN = re.compile(
        r"(?:arn:aws:events:[a-z0-9-]+:\d{12}:rule/)(.+)"
    )
    _INVOKE_ACTION = 'lambda:InvokeFunction'

    def __init__(self, sqs_service: SQSService, lambda_service: LambdaService,
                 events_service: EventsService,
                 environment_service: EnvironmentService):
        """Store the collaborating services.

        :param sqs_service: service whose ``send_message`` delivers messages
        :param lambda_service: service used to read the function policy
        :param events_service: service used to describe the events rule
        :param environment_service: source of application/component names
        """
        super().__init__(
            sqs_service=sqs_service,
            environment_service=environment_service
        )
        self.lambda_service = lambda_service
        self.events_service = events_service

    def _get_event_rule(self):
        """Return the schedule expression of the rule invoking the component.

        Reads the policy returned by ``lambda_service.get_policy`` for the
        component name, looks for statements that allow
        ``lambda:InvokeFunction`` with an ``ArnLike``/``AWS:SourceArn``
        condition shaped like an events rule ARN, and describes the rule
        named by the last ``/``-separated part of that ARN. When several
        statements match, the last one wins.

        :return: the rule's ``ScheduleExpression``, or None when no
            matching statement exists or the rule carries no schedule
        """
        lambda_name = self.environment_service.component()
        policy_meta = self.lambda_service.get_policy(name=lambda_name)
        policy = json.loads(policy_meta['Policy'])

        rule = None
        for statement in policy.get('Statement', []):
            rule_arn = statement.get('Condition', {}).get('ArnLike', {}).get(
                'AWS:SourceArn', ''
            )
            # The policy grammar allows Action to be a string or a list of
            # strings, and a statement may omit it altogether.
            actions = statement.get('Action', [])
            if isinstance(actions, str):
                actions = [actions]
            if self._RULE_ARN_PATTERN.match(rule_arn) \
                    and self._INVOKE_ACTION in actions:
                rule = rule_arn.split('/')[-1]
        if not rule:
            return None

        rule_meta = self.events_service.describe_rule(name=rule)
        # A rule driven by an event pattern has no ScheduleExpression, so a
        # direct index would raise KeyError while building a trace message.
        # NOTE(review): also tolerates an empty/None describe_rule result -
        # confirm what the events service returns for an unknown rule.
        if not rule_meta:
            return None
        return rule_meta.get('ScheduleExpression')

    def build_sqs_message(self, segment):
        """Build the base message and add the schedule under ``cron``.

        :param segment: finished segment to describe
        :return: the parent class message dict extended with ``cron``
        """
        message = super().build_sqs_message(segment=segment)
        message['cron'] = self._get_event_rule()
        return message