libs/google/admin.py (320 lines of code) (raw):

import json

from google.controller import GoogleApiController
from helper_functions import HelperFunctions


class GoogleAdminApi(GoogleApiController):
    """
    Convenience wrapper around the Google Admin SDK Directory API (directory_v1).

    Every method issues one request through ``call_google_api`` (not defined in
    this class; expected to come from ``GoogleApiController``) and interprets
    the JSON-decoded reply.

    Error-handling conventions differ between methods and are kept as they
    were so existing callers are unaffected: some methods swallow
    ValueError/KeyError/TypeError and return None/False, others let them
    propagate. Each docstring states which applies.
    """

    def __init__(self, oauth, config=None):
        """
        Store credentials/config and build the Directory service handle.
        :param oauth: OAuth object, kept on the instance as ``self.oauth``
        :param config: optional mapping; ``org_unit_change`` reads
                       ``config["offboarded_ou"]`` from it
        """
        self.oauth = oauth
        self.config = config
        self.service = self._get_service("admin", "directory_v1")

    def _call_json(self, api_resource, api_method, response_field, **request_args):
        """
        Run one request against ``self.service`` and JSON-decode the reply.
        :param api_resource: resource name handed to ``call_google_api``
        :param api_method: method name handed to ``call_google_api``
        :param response_field: field of the response to extract, or None
        :param request_args: extra keyword arguments forwarded unchanged
                             (userKey, groupKey, body, ...)
        :return: the decoded JSON value
        Note: ``json.loads`` raises ValueError for malformed JSON and TypeError
        for a non-string reply such as None; both propagate to the caller.
        """
        return json.loads(self.call_google_api(service=self.service,
                                               api_resource=api_resource,
                                               api_method=api_method,
                                               response_field=response_field,
                                               **request_args))

    def get_user_name(self, user_key):
        """
        Return a user's name object.
        :param user_key: userKey
        :return: decoded ``name`` field, or None if the reply could not be decoded
        """
        try:
            return self._call_json(api_resource="users", api_method="get",
                                   response_field="name", userKey=user_key)
        except (ValueError, KeyError, TypeError):
            return None

    def is_suspended(self, user_key):
        """
        Check if a user is suspended.
        :param user_key: userKey
        :return: decoded ``suspended`` field (True if the user is suspended,
                 False otherwise), or None if the reply could not be decoded
        """
        try:
            return self._call_json(api_resource="users", api_method="get",
                                   response_field="suspended", userKey=user_key)
        except (ValueError, KeyError, TypeError):
            return None

    def suspend(self, user_key):
        """
        Suspends a user.
        :param user_key: userKey
        :return: bool, True only when the reply's ``suspended`` field is True
        Note: When suspending a user, we expect the response to be True.
        """
        try:
            reply = self._call_json(api_resource="users", api_method="update",
                                    response_field="suspended", userKey=user_key,
                                    body={"suspended": True})
        except (ValueError, KeyError, TypeError):
            return False
        # Any reply other than a literal True (False, null, a string, ...) means
        # the account was not confirmed as suspended, so it is a failure.
        return reply is True

    def un_suspend(self, user_key):
        """
        Un-suspends a user.
        :param user_key: userKey
        :return: bool, True only when the reply's ``suspended`` field is False
        Note: When un-suspending a user, we expect the response to be False.
        """
        try:
            reply = self._call_json(api_resource="users", api_method="update",
                                    response_field="suspended", userKey=user_key,
                                    body={"suspended": False})
        except (ValueError, KeyError, TypeError):
            return False
        return reply is False

    def delete_user(self, user_key):
        """
        Deletes a user.
        :param user_key: userKey
        :return: bool
        Note: When successful, this request returns an empty body.
        """
        raw = self.call_google_api(service=self.service, api_resource="users",
                                   api_method="delete", response_field=None,
                                   userKey=user_key)
        if raw == "":
            return True
        # The other empty-body methods of this class JSON-decode the reply
        # before comparing it with ""; accept that form here as well so both
        # representations of an empty body count as success.
        try:
            return json.loads(raw) == ""
        except (ValueError, TypeError):
            return False

    def reset_password(self, user_key):
        """
        Resets a user's password to a freshly generated hashed value.
        :param user_key: userKey
        :return: bool, False if the reply could not be decoded or is not a
                 container, True otherwise
        Note: The password field is always returned empty.
        """
        passwd = HelperFunctions().hash_passwd()
        try:
            reply = self._call_json(api_resource="users", api_method="update",
                                    response_field=None, userKey=user_key,
                                    body={"password": passwd,
                                          "hashFunction": "SHA-1"})
            # NOTE(review): the outcome has never depended on the value of the
            # returned password. The probe below is kept only so that replies
            # which are not a JSON object keep raising (and thus count as a
            # failure) exactly as before. Confirm what check is intended.
            if "password" in reply:
                reply["password"]
        except (ValueError, KeyError, TypeError):
            return False
        return True

    def list_asps(self, user_key):
        """
        Lists a user's ASPs.
        :param user_key: userKey
        :return: list of ASP codeIds (empty when the reply is null),
                 or False if the reply could not be processed
        """
        try:
            items = self._call_json(api_resource="asps", api_method="list",
                                    response_field="items", userKey=user_key)
            if items is None:
                return []
            return [asp["codeId"] for asp in items]
        except (ValueError, KeyError, TypeError):
            return False

    def delete_asp(self, user_key, code_id):
        """
        Deletes a user's ASP.
        :param user_key: userKey
        :param code_id: codeId
        :return: bool
        Note: When successful, this request returns an empty body.
        """
        try:
            return self._call_json(api_resource="asps", api_method="delete",
                                   response_field=None, userKey=user_key,
                                   codeId=code_id) == ""
        except (ValueError, KeyError, TypeError):
            return False

    def delete_asps(self, user_key):
        """
        Deletes all ASPs from a user account.
        :param user_key: userKey
        :return: dict of codeId,bool; False if the ASPs could not be listed
        """
        try:
            results = {}
            code_ids = self.list_asps(user_key=user_key)
            if code_ids is False:
                # list_asps signals failure with False; there is nothing to delete.
                return False
            if code_ids is not None:
                for code_id in code_ids:
                    deleted = self.delete_asp(user_key=user_key, code_id=code_id)
                    results[code_id] = deleted
                    if deleted is False:
                        print("Error deleting ASP codeId: %s" % code_id)
            return results
        except (ValueError, KeyError, TypeError):
            return False

    def list_tokens(self, user_key):
        """
        Lists a user's OAuth Tokens.
        :param user_key: userKey
        :return: list of OAuth Token clientIds (empty when the reply is null),
                 or None if the reply could not be processed
        """
        try:
            items = self._call_json(api_resource="tokens", api_method="list",
                                    response_field="items", userKey=user_key)
            if items is None:
                return []
            return [token["clientId"] for token in items]
        except (ValueError, KeyError, TypeError):
            return None

    def delete_token(self, user_key, client_id):
        """
        Deletes a user's OAuth Token.
        :param user_key: userKey
        :param client_id: clientId
        :return: bool
        Note: When successful, this request returns an empty body.
        """
        try:
            return self._call_json(api_resource="tokens", api_method="delete",
                                   response_field=None, userKey=user_key,
                                   clientId=client_id) == ""
        except (ValueError, KeyError, TypeError):
            return False

    def delete_tokens(self, user_key):
        """
        Deletes all OAuth Tokens from a user account.
        :param user_key: userKey
        :return: dict of clientId,bool (empty when the tokens could not be
                 listed, because list_tokens reports that as None)
        """
        try:
            results = {}
            client_ids = self.list_tokens(user_key=user_key)
            if client_ids is not None:
                for client_id in client_ids:
                    deleted = self.delete_token(user_key=user_key,
                                                client_id=client_id)
                    results[client_id] = deleted
                    if deleted is False:
                        print("Error deleting OAuth Token clientId: %s" % client_id)
            return results
        except (ValueError, KeyError, TypeError):
            return False

    def list_backup_codes(self, user_key):
        """
        Lists a user's backup codes.
        :param user_key: userKey
        :return: list of verification codes (empty when the reply is null),
                 or False if the reply could not be processed
        """
        try:
            items = self._call_json(api_resource="verificationCodes",
                                    api_method="list", response_field="items",
                                    userKey=user_key)
            if items is None:
                return []
            return [code['verificationCode'] for code in items]
        except (ValueError, KeyError, TypeError):
            return False

    def invalidate_backup_codes(self, user_key):
        """
        Invalidates a user's backup codes.
        :param user_key: userKey
        :return: bool
        Note: When successful, this request returns an empty body.
        Note: A suspended user is un-suspended first and is NOT suspended again
        afterwards; callers that need the account suspended must call
        ``suspend`` themselves.
        """
        # NOTE(review): presumably the API rejects this request for suspended
        # accounts -- confirm against the Directory API documentation.
        if self.is_suspended(user_key):
            self.un_suspend(user_key)
        try:
            return self._call_json(api_resource="verificationCodes",
                                   api_method="invalidate", response_field=None,
                                   userKey=user_key) == ""
        except (ValueError, KeyError, TypeError):
            return False

    def generate_backup_codes(self, user_key):
        """
        Generates new user backup codes.
        :param user_key: userKey
        :return: bool
        Note: When successful, this request returns an empty body.
        """
        try:
            return self._call_json(api_resource="verificationCodes",
                                   api_method="generate", response_field=None,
                                   userKey=user_key) == ""
        except (ValueError, KeyError, TypeError):
            return False

    def org_unit_change(self, user_key):
        """
        Moves user to the offboarded OrgUnit taken from config["offboarded_ou"].
        :param user_key: userKey
        :return: bool
        Note: Success means the returned primaryEmail equals user_key.
        Note: The config lookup is outside the error handling, so a missing
        config (None) or a missing "offboarded_ou" key raises TypeError/KeyError.
        """
        org_unit_path = self.config["offboarded_ou"]
        try:
            return self._call_json(api_resource="users", api_method="update",
                                   response_field="primaryEmail",
                                   userKey=user_key,
                                   body={"orgUnitPath": org_unit_path}) == user_key
        except (ValueError, KeyError, TypeError):
            return False

    def org_unit_reset(self, user_key):
        """
        Moves a user back to the default OrgUnit ("/").
        :param user_key: userKey
        :return: bool
        Note: Success means the returned primaryEmail equals user_key.
        """
        try:
            return self._call_json(api_resource="users", api_method="update",
                                   response_field="primaryEmail",
                                   userKey=user_key,
                                   body={"orgUnitPath": "/"}) == user_key
        except (ValueError, KeyError, TypeError):
            return False

    def group_member_list(self, group_key):
        """
        Lists members of a group.
        :param group_key: groupKey
        :return: list, or None when the reply is not iterable / not a string
        Note: Returns a list of group member email addresses.
        Note: Only TypeError is handled here; malformed JSON (ValueError) or a
        member without an 'email' key (KeyError) propagates to the caller.
        """
        try:
            members = self._call_json(api_resource="members", api_method="list",
                                      response_field="members",
                                      groupKey=group_key)
            return [member['email'] for member in members]
        except TypeError:
            return None

    def group_member_delete(self, group_key, user_key):
        """
        Removes member of a group.
        :param group_key: groupKey
        :param user_key: userKey, sent as memberKey
        :return: bool
        Note: When successful, this request returns an empty body.
        Note: Decoding errors (ValueError/TypeError) propagate to the caller.
        """
        return self._call_json(api_resource="members", api_method="delete",
                               response_field=None, groupKey=group_key,
                               memberKey=user_key) == ""

    def two_step_status(self, user_key):
        """
        Returns the 2-step verification status of a user.
        :param user_key: userKey
        :return: decoded ``isEnrolledIn2Sv`` field
        Note: True if user is enrolled in 2-step verification.
        Note: Decoding errors (ValueError/TypeError) propagate to the caller.
        """
        return self._call_json(api_resource="users", api_method="get",
                               response_field="isEnrolledIn2Sv",
                               userKey=user_key)

    def get_user_last_login(self, user_key):
        """
        Returns the last time a user logged in.
        :param user_key: userKey
        :return: decoded ``lastLoginTime`` field. JSON has no datetime type, so
                 this is the raw decoded value (presumably a timestamp string
                 -- confirm), not a datetime object.
        Note: Decoding errors (ValueError/TypeError) propagate to the caller.
        """
        return self._call_json(api_resource="users", api_method="get",
                               response_field="lastLoginTime",
                               userKey=user_key)

    def get_user_alias(self, user_key):
        """
        Returns any alias associated with a user.
        :param user_key: userKey
        :return: decoded ``aliases`` field
        Note: Returns a list of aliases.
        Note: Decoding errors (ValueError/TypeError) propagate to the caller.
        """
        return self._call_json(api_resource="users", api_method="get",
                               response_field="aliases", userKey=user_key)

    def add_user_alias(self, user_key, alias):
        """
        Adds an alias for a user.
        :param user_key: userKey
        :param alias: alias
        :return: bool, True when the returned alias equals the requested one
        Note: Decoding errors (ValueError/TypeError) propagate to the caller.
        """
        return self._call_json(api_resource="users.aliases", api_method="insert",
                               response_field="alias", userKey=user_key,
                               body={"alias": alias}) == alias

    def delete_user_alias(self, user_key, alias):
        """
        Deletes an alias from a user.
        :param user_key: userKey
        :param alias: alias
        :return: bool
        Note: When successful, this request returns empty.
        Note: Decoding errors (ValueError/TypeError) propagate to the caller.
        """
        return self._call_json(api_resource="users.aliases", api_method="delete",
                               response_field=None, userKey=user_key,
                               alias=alias) == ""

    def create_user(self, user_key, last_name, first_name):
        """
        Creates a user account with a freshly generated hashed password.
        :param user_key: userKey, used as the new account's primaryEmail
        :param last_name: familyName
        :param first_name: givenName
        :return: user object
        Note: When successful this returns a user object.
        Note: Decoding errors (ValueError/TypeError) propagate to the caller.
        """
        passwd = HelperFunctions().hash_passwd()
        user_body = {
            "name": {
                "familyName": last_name,
                "givenName": first_name,
            },
            "password": passwd,
            "hashFunction": "SHA-1",
            "primaryEmail": user_key,
        }
        return self._call_json(api_resource="users", api_method="insert",
                               response_field=None, body=user_body)