syndicate/core/resources/step_functions_resource.py (185 lines of code) (raw):

""" Copyright 2018 EPAM Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import time from botocore.exceptions import ClientError from syndicate.commons.log_helper import get_logger from syndicate.core.helper import unpack_kwargs from syndicate.core.resources.base_resource import BaseResource from syndicate.core.resources.helper import (build_description_obj, validate_params) _LOG = get_logger(__name__) class StepFunctionResource(BaseResource): def __init__(self, sf_conn, iam_conn, cw_events_conn, lambda_conn, account_id, region) -> None: self.sf_conn = sf_conn self.iam_conn = iam_conn self.cw_events_conn = cw_events_conn self.lambda_conn = lambda_conn self.account_id = account_id self.region = region def create_state_machine(self, args): return self.create_pool(self._create_state_machine_from_meta, args) def create_activities(self, args): return self.create_pool(self._create_activity_from_meta, args) def remove_state_machines(self, args): result = self.create_pool(self._remove_state_machine, args) if args: time.sleep(60) return result @unpack_kwargs def _remove_state_machine(self, arn, config): sm_name = config['resource_name'] try: executions = self.sf_conn.list_executions_by_status(arn, 'RUNNING') if executions: _LOG.debug('Found {0} running executions ' 'for {1}'.format(len(executions), sm_name)) for execution in executions: self.sf_conn.stop_execution(execution['executionArn']) _LOG.debug('Executions stop initiated') self.sf_conn.delete_state_machine(arn, log_not_found_error=False) _LOG.info('State machine %s was removed', sm_name) return {arn: config} except ClientError as e: exception_type = e.response['Error']['Code'] if exception_type == 'StateMachineDoesNotExist': _LOG.warn('State machine %s is not found', sm_name) return {arn: config} else: raise e def remove_activities(self, args): return self.create_pool(self._remove_activity, args) @unpack_kwargs def _remove_activity(self, arn, config): activity_name = config['resource_name'] try: self.sf_conn.delete_activity(arn, log_not_found_error=False) _LOG.info('State activity %s was removed', activity_name) return {arn: config} except ClientError as e: exception_type = e.response['Error']['Code'] if exception_type == 'ResourceNotFoundException': _LOG.warn('State activity %s is not found', activity_name) return {arn: config} else: raise e def __remove_key_from_dict(self, obj, name): try: del obj[name] except KeyError: pass @unpack_kwargs def _create_state_machine_from_meta(self, name, meta): arn = self._build_sm_arn(name, self.region) response = self.sf_conn.describe_state_machine(arn) if response: _LOG.warn('State machine %s exists', name) return { arn: build_description_obj(response, name, meta) } iam_role = meta['iam_role'] role_arn = self.iam_conn.check_if_role_exists(iam_role) if not role_arn: raise AssertionError( 'IAM role {0} does not exist.'.format(iam_role)) # check resource exists and get arn definition = meta['definition'] definition_copy = definition.copy() for key in definition['States']: definition_meta = definition['States'][key] if definition_meta.get('Lambda'): lambda_name = definition_meta['Lambda'] # alias has a higher priority than version in arn resolving lambda_version = definition_meta.get('Lambda_version') lambda_alias = definition_meta.get('Lambda_alias') lambda_arn = self.resolve_lambda_arn_by_version_and_alias( lambda_name, lambda_version, lambda_alias) self.__remove_key_from_dict(definition_copy['States'][key], 'Lambda') self.__remove_key_from_dict(definition_copy['States'][key], 'Lambda_version') self.__remove_key_from_dict(definition_copy['States'][key], 'Lambda_alias') definition_copy['States'][key]['Resource'] = lambda_arn if definition_meta.get('Activity'): activity_name = definition_meta['Activity'] activity_arn = 'arn:aws:states:{0}:{1}:activity:{2}'.format( self.region, self.account_id, activity_name) activity_info = self.sf_conn.describe_activity( arn=activity_arn) if not activity_info: raise AssertionError('Activity does not exists: %s', activity_name) activity_arn = activity_info['activityArn'] del definition_copy['States'][key]['Activity'] definition_copy['States'][key]['Resource'] = activity_arn machine_info = self.sf_conn.create_state_machine( machine_name=name, role_arn=role_arn, definition=definition_copy, tags=meta.get('tags')) event_sources = meta.get('event_sources') if event_sources: for trigger_meta in event_sources: trigger_type = trigger_meta['resource_type'] func = self.CREATE_TRIGGER[trigger_type] func(name, trigger_meta) _LOG.info('Created state machine %s.', machine_info['stateMachineArn']) return self.describe_step_function(name=name, meta=meta, arn=arn) def describe_step_function(self, name, meta, arn=None): if not arn: arn = self._build_sm_arn(name, self.region) response = self.sf_conn.describe_state_machine(arn) if response: return { arn: build_description_obj(response, name, meta) } return {} def _build_sm_arn(self, name, region): return f'arn:aws:states:{region}:{self.account_id}:stateMachine:{name}' def _create_cloud_watch_trigger_from_meta(self, name, trigger_meta): required_parameters = ['target_rule', 'input', 'iam_role'] validate_params(name, trigger_meta, required_parameters) rule_name = trigger_meta['target_rule'] input = trigger_meta['input'] sf_role = trigger_meta['iam_role'] sf_arn = self._build_sm_arn(name, self.region) sf_description = self.sf_conn.describe_state_machine(arn=sf_arn) if sf_description.get('status') == 'ACTIVE': sf_role_arn = self.iam_conn.check_if_role_exists(sf_role) if sf_role_arn: self.cw_events_conn.add_rule_sf_target(rule_name, sf_arn, input, sf_role_arn) _LOG.info('State machine %s subscribed to cloudwatch rule %s', name, rule_name) CREATE_TRIGGER = { 'cloudwatch_rule_trigger': _create_cloud_watch_trigger_from_meta, 'eventbridge_rule_trigger': _create_cloud_watch_trigger_from_meta } @unpack_kwargs def _create_activity_from_meta(self, name, meta): arn = self.build_activity_arn(name=name) response = self.sf_conn.describe_activity(arn) if response: _LOG.warn('Activity %s exists.', name) return { arn: build_description_obj(response, name, meta) } response = self.sf_conn.create_activity(name=name, tags=meta.get('tags')) _LOG.info('Activity %s is created.', name) return { arn: build_description_obj(response, name, meta) } def describe_activity(self, name, meta): arn = self.build_activity_arn(name=name) response = self.sf_conn.describe_activity(arn=arn) if response: return { arn: build_description_obj(response, name, meta) } return {} def build_activity_arn(self, name): arn = 'arn:aws:states:{0}:{1}:activity:{2}'.format(self.region, self.account_id, name) return arn