syndicate/core/resources/eventbridge_scheduler_resource.py (105 lines of code) (raw):

""" Copyright 2018 EPAM Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ from datetime import datetime, timezone from syndicate.commons.log_helper import get_logger from syndicate.core import ClientError from syndicate.core.helper import unpack_kwargs from syndicate.core.helper import dict_keys_to_capitalized_camel_case from syndicate.core.resources.base_resource import BaseResource from syndicate.core.resources.helper import build_description_obj, \ validate_params, assert_possible_values _LOG = get_logger(__name__) REQUIRED_PARAMS = {'name', 'schedule_expression', 'state', 'description', 'flexible_time_window'} def convert_to_datetime(name, date_str): try: if len(date_str) > 10: return datetime.fromisoformat(date_str) else: return datetime.utcfromtimestamp(int(date_str)) except (ValueError, OSError): raise AssertionError( f'Invalid date format: {date_str}. Resource: {name}. Should be ISO8601 or timestamp' ) def prepare_schedule_parameters(meta): name = meta.get('name') validate_params(name, meta, REQUIRED_PARAMS) params = meta.copy() # keys inside "Target" parameters should NOT be changed to PascalCase # syndicate user responsible for providing Target's key-values pairs in proper format target = params.pop('target') params = dict_keys_to_capitalized_camel_case(params) params['Target'] = target assert_possible_values([params.get('State')], ['ENABLED', 'DISABLED']) \ if 'State' in params else None assert_possible_values([params.get('FlexibleTimeWindow').get('Mode')], ['OFF', 'FLEXIBLE']) \ if 'Mode' in params.get('FlexibleTimeWindow') else None if 'StartDate' in params: start_date = convert_to_datetime(name, params.get('StartDate')) if start_date <= datetime.now(timezone.utc): raise ValueError('Start date must be in the future.') if 'EndDate' in params: end_date = convert_to_datetime(name, params.get('EndDate')) if start_date <= datetime.now(timezone.utc): raise ValueError('End date must be in the future.') if 'StartDate' in params and 'EndDate' in params: if start_date >= end_date: raise ClientError('Start date must be earlier than end date.') return params class EventBridgeSchedulerResource(BaseResource): def __init__(self, eventbridge_conn): self.connection = eventbridge_conn def create_schedule(self, args): return self.create_pool(self._create_schedule_from_meta, args) @unpack_kwargs def _create_schedule_from_meta(self, name, meta): _LOG.debug(f'Creating schedule {name}') check_params = meta['schedule_content'] check_params['name'] = name params = prepare_schedule_parameters(check_params) group_name = check_params.get('group_name') response = self.connection.describe_schedule(name, group_name) if response: _arn = response['Arn'] return self.describe_schedule(name, group_name, meta, _arn, response) arn = self.connection.create_schedule(**params) _LOG.info(f'Created EventBridge schedule {arn}') return self.describe_schedule(name=name, group_name=group_name, meta=meta, arn=arn) def update_schedule(self, args): return self.create_pool(self._update_schedule_from_meta, args) @unpack_kwargs def _update_schedule_from_meta(self, name, meta, context): """ Create EventBridge Schedule from meta description after parameter validation. :type name: str :type meta: dict """ check_params = meta['schedule_content'] check_params['name'] = name group_name = check_params.get('group_name') response = self.connection.describe_schedule(name, group_name) if not response: raise AssertionError(f'{name} schedule does not exist.') params = prepare_schedule_parameters(check_params) _arn = response['Arn'] arn = self.connection.update_schedule(**params) _LOG.info(f'Updated EventBridge schedule {arn}') return self.describe_schedule(name=name, group_name=group_name, meta=meta, arn=arn) def describe_schedule(self, name, group_name, meta, arn, response=None): if not response: response = self.connection.describe_schedule(name, group_name) if response: return { arn: build_description_obj(response, name, meta) } return {} def remove_schedule(self, args): return self.create_pool(self._remove_schedule, args) @unpack_kwargs def _remove_schedule(self, arn, config): name = config['resource_name'] try: group_name = config['resource_meta']['schedule_content'].get( 'group_name') except: group_name = None self.connection.delete_schedule(name=name, group_name=group_name, log_not_found_error=False) return {arn: config}