src/services/clients/cognito.py (194 lines of code) (raw):

from datetime import datetime from typing import Union import boto3 from commons import ApplicationException, RESPONSE_INTERNAL_SERVER_ERROR, \ RESPONSE_BAD_REQUEST_CODE, get_iso_timestamp from commons.log_helper import get_logger from connections.auth_extension.base_auth_client import BaseAuthClient _LOG = get_logger('cognitoclient') CUSTOM_ROLE_ATTR = 'custom:r8s_role' CUSTOM_CUSTOMER_ATTR = 'custom:customer' CUSTOM_LATEST_LOGIN_ATTR = 'custom:latest_login' PARAM_USER_POOLS = 'UserPools' class CognitoClient(BaseAuthClient): def __init__(self, environment_service): region = environment_service.aws_region() self.client = boto3.client('cognito-idp', region) self.user_pool_name = environment_service.get_user_pool_name() def list_user_pools(self): list_user_pools = self.client.list_user_pools(MaxResults=10) pools = [] while list_user_pools[PARAM_USER_POOLS]: pools.extend(list_user_pools[PARAM_USER_POOLS]) next_token = list_user_pools.get('NextToken') if next_token: list_user_pools = self.client.list_user_pools( MaxResults=10, NextToken=next_token) else: break return pools def list_users(self, attributes_to_get=None): params = dict(UserPoolId=self.__get_user_pool_id()) if attributes_to_get: params['AttributesToGet'] = attributes_to_get return self.client.list_users(**params) def admin_initiate_auth(self, username, password): """ Initiates the authentication flow. Returns AuthenticationResult if the caller does not need to pass another challenge. If the caller does need to pass another challenge before it gets tokens, ChallengeName, ChallengeParameters, and Session are returned. """ user_pool_id = self.__get_user_pool_id() client_id = self.__get_client_id() auth_params = { 'USERNAME': username, 'PASSWORD': password } if self.is_user_exists(username): try: result = self.client.admin_initiate_auth( UserPoolId=user_pool_id, ClientId=client_id, AuthFlow='ADMIN_NO_SRP_AUTH', AuthParameters=auth_params) self.update_latest_login(username) return result except self.client.exceptions.NotAuthorizedException: return None def respond_to_auth_challenge(self, challenge_name): """ Responds to an authentication challenge. """ client_id = self.__get_client_id() self.client.respond_to_auth_challenge(ClientId=client_id, ChallengeName=challenge_name) def sign_up(self, username, password, customer, role): client_id = self.__get_client_id() custom_attr = [{ 'Name': 'name', 'Value': username }, { 'Name': CUSTOM_CUSTOMER_ATTR, 'Value': customer }, { 'Name': CUSTOM_ROLE_ATTR, 'Value': role }] validation_data = [ { 'Name': 'name', 'Value': username } ] return self.client.sign_up(ClientId=client_id, Username=username, Password=password, UserAttributes=custom_attr, ValidationData=validation_data) def set_password(self, username, password, permanent=True): user_pool_id = self.__get_user_pool_id() return self.client.admin_set_user_password(UserPoolId=user_pool_id, Username=username, Password=password, Permanent=permanent) def __get_client_id(self): user_pool_id = self.__get_user_pool_id() client = self.client.list_user_pool_clients( UserPoolId=user_pool_id, MaxResults=1)['UserPoolClients'] if not client: _LOG.error('Application Authentication Service is not configured ' 'properly: no client applications found') raise ApplicationException( code=RESPONSE_INTERNAL_SERVER_ERROR, content='Application Authentication Service is not configured ' 'properly.') return client[0]['ClientId'] def get_user_pool(self, user_pool_name): for pool in self.list_user_pools(): if pool.get('Name') == user_pool_name: return pool['Id'] def get_user(self, username): user = self.is_user_exists(username=username) if user: return user[0] _LOG.error(f'No user with username {username} was found') raise ApplicationException( code=RESPONSE_BAD_REQUEST_CODE, content=f'No user with username {username} was found') def is_user_exists(self, username): user_pool_id = self.__get_user_pool_id() users = self.client.list_users( UserPoolId=user_pool_id, Filter=f'username = "{username}"') return users['Users'] def _get_user_attr(self, user, attr_name, query_user=True): """user attribute can be either a 'username' or a user dict object already fetched from AWS Cognito""" if query_user: user = self.get_user(username=user) for attr in user['Attributes']: if attr['Name'] == attr_name: return attr['Value'] def get_user_role(self, username): return self._get_user_attr(username, CUSTOM_ROLE_ATTR) def update_role(self, username, role): user_pool_id = self.__get_user_pool_id() role_attribute = [ { 'Name': CUSTOM_ROLE_ATTR, 'Value': role } ] self.client.admin_update_user_attributes(UserPoolId=user_pool_id, Username=username, UserAttributes=role_attribute) def update_customer(self, username, customer): user_pool_id = self.__get_user_pool_id() customer_attribute = [ { 'Name': CUSTOM_CUSTOMER_ATTR, 'Value': customer } ] self.client.admin_update_user_attributes( UserPoolId=user_pool_id, Username=username, UserAttributes=customer_attribute) def delete_customer(self, username): user_pool_id = self.__get_user_pool_id() self.client.admin_delete_user_attributes( UserPoolId=user_pool_id, Username=username, UserAttributes=[CUSTOM_CUSTOMER_ATTR]) def get_user_latest_login(self, username): return self._get_user_attr(username, CUSTOM_LATEST_LOGIN_ATTR) def update_latest_login(self, username: str, latest_login: Union[str, datetime, None] = None): latest_login = latest_login or get_iso_timestamp() user_pool_id = self.__get_user_pool_id() if isinstance(latest_login, datetime): latest_login = latest_login.isoformat() latest_login_attribute = [ { 'Name': CUSTOM_LATEST_LOGIN_ATTR, 'Value': latest_login } ] self.client.admin_update_user_attributes( UserPoolId=user_pool_id, Username=username, UserAttributes=latest_login_attribute) def delete_role(self, username): user_pool_id = self.__get_user_pool_id() self.client.admin_delete_user_attributes( UserPoolId=user_pool_id, Username=username, UserAttributeNames=[CUSTOM_ROLE_ATTR]) def __get_user_pool_id(self): user_pools = self.list_user_pools() user_pool = [pool for pool in user_pools if pool['Name'] == self.user_pool_name] if not user_pool: _LOG.error(f'User pool {self.user_pool_name} does not exists') raise ApplicationException( code=RESPONSE_INTERNAL_SERVER_ERROR, content='Application Authentication Service is ' 'not configured properly.') user_pool_id = user_pool[0].get('Id') _LOG.debug(f'Retrieving the user pool with id {user_pool_id}') return user_pool_id def is_system_user_exists(self): """Checks whether user with customer='SYSTEM' already exists""" raise NotImplementedError() def get_system_user(self): """Returns the user with customer='SYSTEM' if exists, else - None""" raise NotImplementedError() def get_user_customer(self, username): return self._get_user_attr(username, CUSTOM_CUSTOMER_ATTR) def delete_user(self, username): return self.client.admin_delete_user( UserPoolId=self.__get_user_pool_id(), Username=username )