syndicate/connection/cognito_identity_provider_connection.py (159 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from boto3 import client
from syndicate.commons.log_helper import get_logger
from syndicate.connection.helper import apply_methods_decorator, retry
_LOG = get_logger(__name__)
@apply_methods_decorator(retry())
class CognitoIdentityProviderConnection(object):
""" Cognito identity provider connection class."""
def __init__(self, client_config, region=None, aws_access_key_id=None,
aws_secret_access_key=None, aws_session_token=None):
self.region = region
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.aws_session_token = aws_session_token
self.client = client('cognito-idp', region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token,
config=client_config)
_LOG.debug('Opened new Cognito identity connection.')
def create_user_pool(self, pool_name, auto_verified_attributes=None,
sms_configuration=None, username_attributes=None,
policies=None, tags=None):
"""
Crete Cognito user pool and get user pool id.
"""
params = dict(PoolName=pool_name)
if auto_verified_attributes:
params['AutoVerifiedAttributes'] = auto_verified_attributes
if sms_configuration:
params['SmsConfiguration'] = sms_configuration
if username_attributes:
params['UsernameAttributes'] = username_attributes
if policies:
params['Policies'] = policies
if tags:
params['UserPoolTags'] = tags
response = self.client.create_user_pool(**params)
return response['UserPool'].get('Id')
def create_user_pool_client(
self, user_pool_id, client_name, generate_secret=True,
refresh_token_validity=None,
read_attributes=None,
write_attributes=None, explicit_auth_flows=None,
supported_identity_providers=None,
callback_urls=None, logout_urls=None, default_redirect_uri=None,
allowed_oauth_flows=None, allowed_oauth_scopes=None,
allowed_oauth_flows_user_pool_client=None,
analytics_configuration=None, prevent_user_existence_errors=None,
enable_token_revocation=None):
params = dict(UserPoolId=user_pool_id, ClientName=client_name,
GenerateSecret=generate_secret)
if refresh_token_validity:
params.update(RefreshTokenValidity=refresh_token_validity)
if read_attributes:
params.update(ReadAttributes=read_attributes)
if write_attributes:
params.update(WriteAttributes=write_attributes)
if explicit_auth_flows:
params.update(ExplicitAuthFlows=explicit_auth_flows)
if supported_identity_providers:
params.update(
SupportedIdentityProviders=supported_identity_providers)
if callback_urls:
params.update(CallbackURLs=callback_urls)
if logout_urls:
params.update(LogoutURLs=logout_urls)
if default_redirect_uri:
params.update(DefaultRedirectURI=default_redirect_uri)
if allowed_oauth_flows:
params.update(AllowedOAuthFlows=allowed_oauth_flows)
if allowed_oauth_scopes:
params.update(AllowedOAuthScopes=allowed_oauth_scopes)
if allowed_oauth_flows_user_pool_client:
params.update(AllowedOAuthFlowsUserPoolClient=
allowed_oauth_flows_user_pool_client)
if analytics_configuration:
params.update(AnalyticsConfiguration=analytics_configuration)
if prevent_user_existence_errors:
params.update(
PreventUserExistenceErrors=prevent_user_existence_errors)
if enable_token_revocation:
params.update(EnableTokenRevocation=enable_token_revocation)
response = self.client.create_user_pool_client(**params)
return response['UserPoolClient'].get('ClientId')
def list_user_pool_clients(self, cup_id):
response = self.client.list_user_pool_clients(
UserPoolId=cup_id,
MaxResults=5
)
client_ids = [cup_client['ClientId'] for
cup_client in response['UserPoolClients']]
next_token = response.get('NextToken')
while next_token:
response = self.client.list_user_pool_clients(
UserPoolId=cup_id,
MaxResults=5,
NextToken=next_token
)
client_ids.extend([cup_client['ClientId'] for
cup_client in response['UserPoolClients']])
next_token = response.get('NextToken')
return client_ids
def if_cup_client_exist(self, cup_name):
cup_id = self.if_pool_exists_by_name(cup_name)
if not cup_id:
return
client_ids = self.list_user_pool_clients(cup_id)
if len(client_ids) == 1:
return client_ids[0]
if len(client_ids) > 1:
_LOG.warn(f'Client ID of Cognito User Pool "{cup_name}" can\'t be '
f'identified unambiguously because there is more than '
f'one client in the Cognito User Pool. Determined IDs: '
f'"{client_ids}"')
else:
_LOG.warn(f'Clients not found in the Cognito User Pool with the '
f'name "{cup_name}"')
def if_pool_exists_by_name(self, user_pool_name):
ids = []
paginator = self.client.get_paginator('list_user_pools')
response = paginator.paginate(
PaginationConfig={
'MaxItems': 60,
'PageSize': 10
}
)
for page in response:
ids.extend(
[user_pool['Id'] for user_pool in page['UserPools'] if
user_pool['Name'] == user_pool_name]
)
next_token = response.resume_token
while next_token:
response = paginator.paginate(
PaginationConfig={
'MaxItems': 60,
'PageSize': 10,
'StartingToken': next_token
}
)
for page in response:
ids.extend(
[user_pool['Id'] for user_pool in page['UserPools'] if
user_pool['Name'] == user_pool_name]
)
next_token = response.resume_token
if len(ids) == 1:
return ids[0]
if len(ids) > 1:
_LOG.warn(f'Cognito User Pool can\'t be identified unambiguously '
f'because there is more than one resource with the name '
f'"{user_pool_name}" in the region {self.region}. '
f'Determined IDs: "{ids}"')
else:
_LOG.warn(f'Cognito User Pool with the name "{user_pool_name}" '
f'not found in the region {self.region}')
def describe_user_pool(self, user_pool_id):
return self.client.describe_user_pool(UserPoolId=user_pool_id)
def remove_user_pool(self, user_pool_id, log_not_found_error=True):
"""
Removes user pool by id.
:type user_pool_id: str
:type log_not_found_error: boolean, parameter is needed for proper log
handling in the retry decorator
"""
self.client.delete_user_pool(UserPoolId=user_pool_id)
def add_custom_attributes(self, user_pool_id, attributes):
self.client.add_custom_attributes(UserPoolId=user_pool_id,
CustomAttributes=attributes)