def _get_event_rule()

in modular_sdk/utils/runtime_tracer/generic.py [0:0]


    def _get_event_rule(self):
        """Return the schedule expression of the events rule allowed to
        invoke the current lambda.

        The lambda name is taken from ``self.environment_service.component()``
        and its resource policy is fetched through ``self.lambda_service``.
        The policy statements are scanned for one that grants
        ``lambda:InvokeFunction`` and whose ``Condition.ArnLike.AWS:SourceArn``
        looks like an events rule ARN. The rule name (last ``/``-separated
        segment of that ARN) is then described through
        ``self.events_service``.

        :return: the value of ``ScheduleExpression`` from the described
            rule, or ``None`` when no statement matches or the described
            rule carries no ``ScheduleExpression`` key.
        """
        lambda_name = self.environment_service.component()
        policy_meta = self.lambda_service.get_policy(name=lambda_name)
        policy = json.loads(policy_meta['Policy'])

        # Compiled once instead of being re-resolved on every iteration.
        rule_arn_pattern = re.compile(
            r"(?:arn:aws:events:[a-z0-9-]+:\d{12}:rule/)(.+)"
        )

        rule = None
        for statement in policy.get('Statement', []):
            rule_arn = statement.get('Condition', {}).get('ArnLike', {}).get(
                'AWS:SourceArn', ''
            )
            if not rule_arn_pattern.match(rule_arn):
                continue

            # A policy statement may spell ``Action`` as a single string or
            # as a list of strings, and may omit it altogether (NotAction);
            # normalise so none of those shapes raises or is missed.
            actions = statement.get('Action', ())
            if isinstance(actions, str):
                actions = (actions,)
            if 'lambda:InvokeFunction' not in actions:
                continue

            # No early exit: when several statements match, the last one
            # wins, exactly as before.
            rule = rule_arn.split('/')[-1]

        if not rule:
            return None

        rule_meta = self.events_service.describe_rule(name=rule)
        try:
            return rule_meta['ScheduleExpression']
        except KeyError:
            # A rule without a schedule (e.g. one driven by an event
            # pattern) has no such key; report "no schedule" the same way
            # as "no rule".
            return None