syndicate/connection/cognito_identity_connection.py (101 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from boto3 import client
from syndicate.commons.log_helper import get_logger
from syndicate.connection.helper import apply_methods_decorator, retry
from syndicate.connection.iam_connection import IAMConnection
_LOG = get_logger(__name__)
@apply_methods_decorator(retry())
class CognitoIdentityConnection(object):
""" Cognito identity connection class."""
def __init__(self, region=None, aws_access_key_id=None,
aws_secret_access_key=None, aws_session_token=None):
self.region = region,
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.aws_session_token = aws_session_token
self.client = client('cognito-identity', region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token)
_LOG.debug('Opened new Cognito identity connection.')
def create_identity_pool(self, pool_name, provider_name=None,
allow_unauthenticated=False, login_providers=None,
open_id_connect_provider_arns=None,
cognito_identity_providers=None,
saml_provider_arns=None,
tags=None):
""" Crete Cognito identity pool and get identity pool id.
:type pool_name: str
:type provider_name: str ('login.provider.com')
:type allow_unauthenticated: bool
:type login_providers: dict
:type open_id_connect_provider_arns: list
:type cognito_identity_providers: list
:type saml_provider_arns: list
:type tags: dict
"""
params = dict(IdentityPoolName=pool_name,
AllowUnauthenticatedIdentities=allow_unauthenticated)
if provider_name:
params['DeveloperProviderName'] = provider_name
if login_providers:
params['SupportedLoginProviders'] = login_providers
if open_id_connect_provider_arns:
params['OpenIdConnectProviderARNs'] = open_id_connect_provider_arns
if cognito_identity_providers:
params['CognitoIdentityProviders'] = cognito_identity_providers
if saml_provider_arns:
params['SamlProviderARNs'] = saml_provider_arns
if tags:
params['IdentityPoolTags'] = tags
response = self.client.create_identity_pool(**params)
return response.get('IdentityPoolId')
def set_role(self, identity_pool_id, authenticated_role_name,
unauthenticated_role_name=None):
""" Set role for cognito identity pool authenticated and
unauthenticated users.
:type identity_pool_id: str
:type authenticated_role_name: str
:type unauthenticated_role_name: str
"""
if not (authenticated_role_name or unauthenticated_role_name):
return
iam_conn = IAMConnection(None, self.aws_access_key_id,
self.aws_secret_access_key,
self.aws_session_token)
params = dict(IdentityPoolId=identity_pool_id, Roles={})
if authenticated_role_name:
auth_role_arn = iam_conn.check_if_role_exists(
authenticated_role_name)
if auth_role_arn:
params['Roles']['authenticated'] = auth_role_arn
if unauthenticated_role_name:
unauth_role_arn = iam_conn.check_if_role_exists(
unauthenticated_role_name)
if unauth_role_arn:
params['Roles']['unauthenticated'] = unauth_role_arn
self.client.set_identity_pool_roles(**params)
def list_existing_pools(self):
""" Get list of existing identity pools."""
existing_pools = []
identity_pools = self.client.list_identity_pools(MaxResults=60)
existing_pools.extend(identity_pools.get('IdentityPools'))
token = identity_pools.get('NextToken')
while token:
identity_pools = self.client.list_identity_pools(MaxResults=60,
NextToken=token)
existing_pools.extend(identity_pools.get('IdentityPools'))
token = identity_pools['NextToken']
return existing_pools
def if_pool_exists_by_name(self, pool_name):
""" Check if pool exists by name.
:type pool_name: str
"""
pools = self.list_existing_pools()
if pools:
for each in pools:
if each['IdentityPoolName'] == pool_name:
return each['IdentityPoolId']
def describe_identity_pool(self, identity_pool_id):
return self.client.describe_identity_pool(
IdentityPoolId=identity_pool_id)
def remove_identity_pool(self, identity_pool_id, log_not_found_error=True):
""" Remove identity pool by id.
:type identity_pool_id: str
:type log_not_found_error: boolean, parameter is needed for proper log
handling in the retry decorator
"""
self.client.delete_identity_pool(IdentityPoolId=identity_pool_id)
def list_all_identities_ids_in_pool(self, identity_pool_name):
""" Lists all identities ids in identity pool"""
pool_id = self.if_pool_exists_by_name(identity_pool_name)
if pool_id:
response = self.client.list_identities(IdentityPoolId=pool_id,
MaxResults=60)
identities_list = response['Identities']
token = response.get('NextToken')
while token:
response = self.client.list_identities(IdentityPoolId=pool_id,
MaxResults=60)
identities_list.append(response['Identities'])
token = response['NextToken']
identity_ids_list = []
for identity in identities_list:
identity_ids_list.append(identity.get('IdentityId'))
return identity_ids_list
def remove_specified_identities_in_pool(self, identities_to_delete):
""" Removes specified identities from specified pool"""
response = self.client.delete_identities(
IdentityIdsToDelete=identities_to_delete)
return response