syndicate/connection/eventbridge_scheduler_connection.py (36 lines of code) (raw):

from boto3 import client
from botocore.exceptions import ClientError

from syndicate.commons.log_helper import get_logger
from syndicate.connection.helper import apply_methods_decorator, retry

_LOG = get_logger(__name__)

# Error code that describe_schedule treats as "schedule does not exist".
_NOT_FOUND_ERROR_CODE = 'ResourceNotFoundException'


def _schedule_params(name, group_name=None):
    """Build the request parameters that identify a single schedule.

    Kept at module level (not as a method) so that the class-wide
    decorator is not applied to it.

    :param name: schedule name, sent as ``Name``.
    :param group_name: optional schedule group, sent as ``GroupName``;
        the key is omitted entirely when this is ``None``.
    :return: dict suitable for ``**``-expansion into a client call.
    """
    params = {'Name': name}
    if group_name is not None:
        params['GroupName'] = group_name
    return params


@apply_methods_decorator(retry())
class EventBridgeSchedulerConnection(object):
    """EventBridge Scheduler Connection class.

    Thin wrapper around the boto3 ``scheduler`` client.
    NOTE(review): ``apply_methods_decorator(retry())`` presumably wraps
    the methods of this class with the project's retry logic - confirm
    against ``syndicate.connection.helper``.
    """

    def __init__(self, region=None, aws_access_key_id=None,
                 aws_secret_access_key=None, aws_session_token=None):
        """Open a boto3 ``scheduler`` client.

        :param region: region passed to boto3 as the second positional
            argument of ``client``.
        :param aws_access_key_id: forwarded to boto3 unchanged.
        :param aws_secret_access_key: forwarded to boto3 unchanged.
        :param aws_session_token: forwarded to boto3 unchanged.
        """
        self.client = client('scheduler', region,
                             aws_access_key_id=aws_access_key_id,
                             aws_secret_access_key=aws_secret_access_key,
                             aws_session_token=aws_session_token)
        _LOG.debug('Opened new EventBridge Scheduler connection.')

    def create_schedule(self, **params):
        """Create a schedule and return its ARN.

        :param params: keyword arguments forwarded verbatim to the
            client's ``create_schedule`` call.
        :return: the ``ScheduleArn`` value of the response.
        """
        return self.client.create_schedule(**params)['ScheduleArn']

    def update_schedule(self, **params):
        """Update a schedule and return its ARN.

        :param params: keyword arguments forwarded verbatim to the
            client's ``update_schedule`` call.
        :return: the ``ScheduleArn`` value of the response.
        """
        return self.client.update_schedule(**params)['ScheduleArn']

    def describe_schedule(self, name, group_name=None):
        """Fetch a schedule description, or ``None`` if it does not exist.

        :param name: schedule name.
        :param group_name: optional schedule group name.
        :return: the raw ``get_schedule`` response, or ``None`` when the
            service reports that the schedule was not found.
        :raises ClientError: for any error other than "not found".
        """
        params = _schedule_params(name, group_name)
        try:
            return self.client.get_schedule(**params)
        except ClientError as e:
            # Compare the structured error code instead of searching the
            # stringified exception: the message text can contain arbitrary
            # content (e.g. echoed user input) and must not decide whether
            # an error is silently swallowed.
            error_code = e.response.get('Error', {}).get('Code')
            if error_code != _NOT_FOUND_ERROR_CODE:
                raise
            _LOG.warning(
                f'Cannot find EventBridge schedule with name {name}')
            return None

    def delete_schedule(self, name, group_name=None,
                        log_not_found_error=True):
        """Delete a schedule.

        :param name: schedule name.
        :param group_name: optional schedule group name.
        :param log_not_found_error: not used in this method body; it is
            needed for proper log handling in the retry decorator.
        :return: the raw ``delete_schedule`` response.
        """
        return self.client.delete_schedule(
            **_schedule_params(name, group_name))