docker/services/user_service.py (62 lines of code) (raw):

from commons import ApplicationException, \ build_response, RESPONSE_BAD_REQUEST_CODE from commons.log_helper import get_logger from services.clients.cognito import CognitoClient _LOG = get_logger('cognito-client') class CognitoUserService: def __init__(self, client: CognitoClient): self.client: CognitoClient = client def save(self, username, password, role): _LOG.debug(f'Validating password for user {username}') errors = self.__validate_password(password) if errors: return build_response( code=RESPONSE_BAD_REQUEST_CODE, content='; '.join(errors)) if self.client.is_user_exists(username): raise ApplicationException( code=RESPONSE_BAD_REQUEST_CODE, content=f'The user with name {username} already exists.') _LOG.debug(f'Creating the user with username {username}') self.client.sign_up(username=username, password=password, role=role) _LOG.debug(f'Setting the password for the user {username}') self.client.set_password(username=username, password=password) def get_user(self, user_id): if isinstance(self.client, CognitoClient): return self.client.get_user(user_id)['Username'] return self.client.get_user(user_id) def get_user_role_name(self, user): return self.client.get_user_role(user) @staticmethod def __validate_password(password): errors = [] upper = any(char.isupper() for char in password) numeric = any(char.isdigit() for char in password) symbol = any(not char.isalnum() for char in password) if not upper: errors.append('Password must have uppercase characters') if not numeric: errors.append('Password must have numeric characters') if not symbol: errors.append('Password must have symbol characters') if len(password) < 8: errors.append(f'Invalid length. Valid min length: 8') if errors: return errors def initiate_auth(self, username, password): return self.client.admin_initiate_auth(username=username, password=password) def respond_to_auth_challenge(self, challenge_name): return self.client.respond_to_auth_challenge( challenge_name=challenge_name) def update_role(self, username, role): self.client.update_role(username=username, role=role) def is_user_exists(self, username): return self.client.is_user_exists(username) def delete_role(self, username): self.client.delete_role(username=username) def is_system_user_exists(self): return self.client.is_system_user_exists() def get_system_user(self): return self.client.get_system_user()