def _create_graphql_api_from_meta()

in syndicate/core/resources/appsync_resource.py [0:0]


    def _create_graphql_api_from_meta(self, name, meta):
        """ Create GraphQL API from meta description.

        If an API with this name already exists, nothing is created and the
        description of the existing API is returned. Otherwise the API is
        created and then, in order: an API key (when API_KEY authorization
        is used), the schema, data sources, functions and resolvers.

        The authorizer configs stored in ``meta`` are copied before resolved
        values are written into them, so this method does not pop keys from
        or add keys to the caller's ``meta``.

        :type name: str
        :param name: AppSync API name
        :type meta: dict
        :param meta: resource description; keys read here are
            'deployment_package', 'primary_auth_type', 'extra_auth_types',
            'lambda_authorizer_config', 'user_pool_config', 'tags',
            'open_id_config', 'log_config', 'xray_enabled',
            'api_key_expiration_days', 'schema_path', 'data_sources',
            'functions' and 'resolvers'
        :return: the result of ``describe_graphql_api`` for the API
        :raises AssertionError: if 'schema_path' is set but the schema file
            is not available, or if schema creation does not finish with
            the 'SUCCESS' status
        """
        validate_params(name, meta, API_REQUIRED_PARAMS)

        api = self.appsync_conn.get_graphql_api_by_name(name)
        if api:
            _LOG.warning(f'AppSync API {name} already exists.')
            return self.describe_graphql_api(
                name=name, meta=meta, api_id=api['apiId'])

        extract_to = None
        archive_path = meta.get('deployment_package')
        if archive_path:
            extract_to = self._extract_zip(archive_path, name)

        # Everything below may raise; the finally clause guarantees that the
        # extracted deployment package is removed on every exit path.
        try:
            auth_type = meta.get('primary_auth_type')
            extra_auth_types = meta.get('extra_auth_types', [])
            # Shallow copies: keys are popped from / added to these configs
            # below, and that must not leak into the caller's meta (it would
            # make a retry with the same meta lose 'resource_name').
            lambda_auth_config = dict(
                meta.get('lambda_authorizer_config') or {})
            user_pool_config = dict(meta.get('user_pool_config') or {})

            if auth_type == AWS_LAMBDA_TYPE:
                lambda_region = lambda_auth_config.pop(
                    AWS_REGION_PARAMETER, None)
                lambda_name = lambda_auth_config.pop('resource_name', None)
                if not lambda_region or not lambda_name:
                    # Best effort: report the problem and continue with an
                    # empty authorizer config instead of aborting.
                    lambda_auth_config = {}
                    USER_LOG.error(
                        f"Authorization type '{auth_type}' can't be "
                        f"configured for AppSync '{name}' because lambda "
                        f"resource name or aws region is not specified")
                else:
                    lambda_arn = self.build_lambda_arn(
                        region=lambda_region,
                        name=lambda_name)
                    lambda_auth_config['authorizer_uri'] = lambda_arn

            if auth_type == AWS_CUP_TYPE:
                cup_name = user_pool_config.pop('resource_name', None)
                cup_id = self.cup_conn.if_pool_exists_by_name(cup_name)
                if cup_id:
                    user_pool_config['user_pool_id'] = cup_id
                else:
                    # Best effort, same as for the lambda authorizer above.
                    user_pool_config = {}
                    USER_LOG.error(
                        f"Authorization type '{auth_type}' can't be "
                        f"configured for AppSync '{name}' because Cognito "
                        f"User Pool {cup_name} not found")

            updated_extra_auth_types, is_extra_auth_api_key = \
                self._process_extra_auth(extra_auth_types, name)

            api_id = self.appsync_conn.create_graphql_api(
                name, auth_type=auth_type, tags=meta.get('tags'),
                user_pool_config=user_pool_config,
                open_id_config=meta.get('open_id_config'),
                lambda_auth_config=lambda_auth_config,
                log_config=self._resolve_log_config(
                    meta.get('log_config', {}), name
                ),
                xray_enabled=meta.get('xray_enabled'),
                extra_auth_types=updated_extra_auth_types)

            if auth_type == 'API_KEY' or is_extra_auth_api_key:
                # Expiration is passed as an absolute epoch timestamp
                # computed from "now" plus the configured number of days.
                api_key_expiration = meta.get('api_key_expiration_days', 7)
                api_key_expires = int(
                    time.time() + api_key_expiration * ONE_DAY_IN_SECONDS)
                self.appsync_conn.create_api_key(
                    api_id, expires=api_key_expires)

            if schema_path := meta.get('schema_path'):
                # Without a deployment package there is no directory to look
                # in; fail with the intended error before building a path
                # from None.
                if not extract_to:
                    raise AssertionError(
                        f'\'{schema_path}\' file not found for '
                        f'AppSync \'{name}\'')
                schema_full_path = build_path(extract_to, schema_path)
                if not os.path.exists(schema_full_path):
                    raise AssertionError(
                        f'\'{schema_full_path}\' file not found for '
                        f'AppSync \'{name}\'')

                with open(schema_full_path, 'r', encoding='utf-8') as file:
                    schema_definition = file.read()

                status, details = self.appsync_conn.create_schema(
                    api_id, schema_definition)
                if status != 'SUCCESS':
                    error_message = (
                        f"An error occurred when creating schema. "
                        f"Operation status: '{status}'. ")
                    if details:
                        error_message += f"Details: '{details}'"
                    raise AssertionError(error_message)
                _LOG.info(
                    f"Schema of the AppSync '{name}' created successfully")

            for data_source_meta in meta.get('data_sources', []):
                params = self._build_data_source_params_from_meta(
                    data_source_meta)
                if not params:
                    # The builder returned nothing usable; skip this entry.
                    continue
                self.appsync_conn.create_data_source(api_id, **params)

            # Configs of created functions are collected because the
            # resolver params builder takes them as an argument.
            functions_config = []
            for func_meta in meta.get('functions', []):
                params = self._build_function_params_from_meta(
                    func_meta, extract_to)
                if not params:
                    continue
                func_config = self.appsync_conn.create_function(
                    api_id, params)
                functions_config.append(func_config)

            for resolver_meta in meta.get('resolvers', []):
                params = self._build_resolver_params_from_meta(
                    resolver_meta, extract_to, functions_config)
                if not params:
                    continue
                self.appsync_conn.create_resolver(api_id, **params)

            _LOG.info(f'Created AppSync GraphQL API {api_id}')
            return self.describe_graphql_api(
                name=name, meta=meta, api_id=api_id)
        finally:
            if extract_to:
                shutil.rmtree(extract_to, ignore_errors=True)