modular_sdk/commons/trace_helper.py (72 lines of code) (raw):
from functools import wraps
from modular_sdk.modular import Modular
from modular_sdk.utils.operation_mode.generic import \
ModularOperationModeManagerService
from modular_sdk.utils.runtime_tracer.generic import SegmentTracer, \
ScheduledSegmentTracer
def __resolve_event(args, kwargs):
if len(args) != 2:
return kwargs['event']
return args[0]
def __resolve_context(args, kwargs):
if len(args) != 2:
return kwargs['context']
return args[1]
def tracer_decorator(is_scheduled=False, is_job=False, component=None):
def real_wrapper(func):
@wraps(func)
def wrapper(*args, **kwargs):
environment_service = Modular().environment_service()
sqs_service = Modular().sqs_service()
job_tracer = None
request_id = None
event = {}
if is_job:
from modular_sdk.utils.job_tracer.generic import \
ModularJobTracer
operation_mode_service = ModularOperationModeManagerService()
job_tracer = ModularJobTracer(
operation_mode_service=operation_mode_service,
environment_service=environment_service,
component=component
)
event = __resolve_event(args, kwargs)
context = __resolve_context(args, kwargs)
request_id = event.get('request_id') or context.aws_request_id
if is_scheduled:
runtime_tracer = ScheduledSegmentTracer(
sqs_service=sqs_service,
lambda_service=Modular().lambda_service(),
events_service=Modular().events_service(),
environment_service=environment_service
)
else:
runtime_tracer = SegmentTracer(
sqs_service=sqs_service,
environment_service=environment_service
)
segment = runtime_tracer.start()
if is_job:
meta = None
dry_run = event.get('dry_run')
if dry_run in ('true', 't', 'y', 'yes') \
or (isinstance(dry_run, bool) and dry_run):
meta = {'message': 'Dry run mode enabled.'}
job_tracer.start(job_id=request_id, meta=meta)
try:
from aws_xray_sdk.core import patch
libs_to_patch = ('boto3', 'pynamodb')
patch(libs_to_patch)
response = func(*args, **kwargs)
except Exception as e:
if is_job:
job_tracer.fail(job_id=request_id, error=e)
segment.error()
raise e
else:
if is_job:
job_tracer.succeed(job_id=request_id, meta=response)
segment.stop()
return response
return wrapper
return real_wrapper