okta/AuthClient.py (134 lines of code) (raw):

from okta.framework.ApiClient import ApiClient from okta.framework.Utils import Utils from okta.models.auth.AuthResult import AuthResult class AuthClient(ApiClient): def __init__(self, *args, **kwargs): kwargs['pathname'] = '/api/v1/authn' ApiClient.__init__(self, *args, **kwargs) def authenticate(self, username, password, relay_state=None, response_type=None, force_mfa=None, context=None): """Begin the authentication process with a username and password :param username: user's username :type username: str :param password: user's password :type password: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :param response_type: the type of session to return (session_token or session_token_url usually) :type response_type: str :param force_mfa: whether to force mfa even if the auth is exempt :type force_mfa: bool :param context: contextual info about the auth request like ip and location :type context: Context :rtype: AuthResult """ request = { 'username': username, 'password': password, 'relayState': relay_state, 'context': context } params = { 'force_mfa': force_mfa, 'response_type': response_type } response = ApiClient.post_path(self, '/', request, params=params) return Utils.deserialize(response.text, AuthResult) def auth_with_factor(self, state_token, factor_id, passcode, relay_state=None, remember_device=None): """Continue authentication with an MFA attempt :param state_token: current state token from the previous AuthResult :type state_token: str :param factor_id: target factor id :type factor_id: str :param passcode: passcode required for authenticating the factor :type passcode: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :param remember_device: whether to remember this device to avoid requiring MFA next time :type remember_device: bool :rtype: AuthResult """ request = { 'stateToken': state_token, 'passCode': passcode, 'relayState': relay_state } params = { 'rememberDevice': remember_device } response = ApiClient.post_path(self, '/factors/{0}/verify'.format(factor_id), request, params=params) return Utils.deserialize(response.text, AuthResult) # MFA MANAGEMENT def enroll_factor(self, state_token, factor_type, provider, profile, relay_state=None): """Enroll in an MFA factor during the auth flow. Usually only encountered if MFA is required for authentication :param state_token: current state token from the previous AuthResult :type state_token: str :param factor_type: type of factor (sms, token, question, token:software:totp, token:hardware etc) :type factor_type: str :param provider: factor provider (OKTA, RSA, SYMANTEC, GOOGLE etc) :type provider: str :param profile: factor profile that depends on the factor type :type profile: FactorProfile :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'stateToken': state_token, 'factorType': factor_type, 'provider': provider, 'profile': profile, 'relayState': relay_state } response = ApiClient.post_path(self, '/factors', request) return Utils.deserialize(response.text, AuthResult) def activate_factor(self, state_token, factor_id, passcode, relay_state=None): """Activate an MFA factor during the auth flow :param state_token: current state token from the previous AuthResult :type state_token: str :param factor_id: target factor id :type factor_id: str :param passcode: passcode required to activate the factor :type passcode: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'stateToken': state_token, 'passCode': passcode, 'relayState': relay_state } response = ApiClient.post_path(self, '/factors/{0}/lifecycle/activate'.format(factor_id), request) return Utils.deserialize(response.text, AuthResult) def resend_code(self, state_token, factor_id, relay_state=None): """Resend an a passcode for an authentication factor :param state_token: current state token from the previous AuthResult :type state_token: str :param factor_id: target factor id :type factor_id: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'stateToken': state_token, 'relayState': relay_state } response = ApiClient.post_path(self, '/factors/{0}/lifecycle/resend'.format(factor_id), request) return Utils.deserialize(response.text, AuthResult) # CREDENTIAL MANAGEMENT def change_password(self, state_token, old_password, new_password, relay_state=None): """Change a user's password during an authentication flow :param state_token: current state token from the previous AuthResult :type state_token: str :param old_password: user's current password :type old_password: str :param new_password: user's desired password :type new_password: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'stateToken': state_token, 'oldPassword': old_password, 'newPassword': new_password, 'relayState': relay_state } response = ApiClient.post_path(self, '/credentials/change_password', request) return Utils.deserialize(response.text, AuthResult) def reset_password(self, state_token, new_password, relay_state=None): """Reset a user's password during an authentication flow :param state_token: current state token from the previous AuthResult :type state_token: str :param new_password: user's desired password :type new_password: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'stateToken': state_token, 'newPassword': new_password, 'relayState': relay_state } response = ApiClient.post_path(self, '/credentials/reset_password', request) return Utils.deserialize(response.text, AuthResult) def forgot_password(self, username, relay_state=None): """Initiate a forgot password flow for a user :param username: user's username :type username: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'username': username, 'relayState': relay_state } response = ApiClient.post_path(self, '/recovery/password', request) return Utils.deserialize(response.text, AuthResult) def forgot_password_answer(self, state_token, security_answer, new_password, relay_state=None): """Answer the forgot password during an authentication flow :param state_token: current state token from the previous AuthResult :type state_token: str :param security_answer: answer to a user's security question :type security_answer: str :param new_password: user's desired password :type new_password: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'stateToken': state_token, 'securityAnswer': security_answer, 'newPassword': new_password, 'relayState': relay_state } response = ApiClient.post_path(self, '/recovery/answer', request) return Utils.deserialize(response.text, AuthResult) # RECOVERY def validate_recovery_token(self, recovery_token, relay_state=None): """Validate a token for recovery :param recovery_token: token distributed to end-user via out-of-band mechanism such as email :type recovery_token: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'recoveryToken': recovery_token, 'relayState': relay_state } response = ApiClient.post_path(self, '/recovery/token', request) return Utils.deserialize(response.text, AuthResult) def unlock_account(self, username, relay_state=None): """Begin unlocking an account :param username: user's username :type username: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'username': username, 'relayState': relay_state } response = ApiClient.post_path(self, '/recovery/unlock', request) return Utils.deserialize(response.text, AuthResult) def unlock_account_answer(self, state_token, security_answer, relay_state=None): """Unlock an account during an authentication :param state_token: current state token from the previous AuthResult :type state_token: str :param security_answer: answer to the user's security question :type security_answer: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'stateToken': state_token, 'securityAnswer': security_answer, 'relayState': relay_state } response = ApiClient.post_path(self, '/recovery/answer', request) return Utils.deserialize(response.text, AuthResult) # STATE MANAGEMENT def previous_state(self, state_token, relay_state=None): """Get the previous state of an in-progress authentication :param state_token: current state token from the previous AuthResult :type state_token: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'stateToken': state_token, 'relayState': relay_state } response = ApiClient.post_path(self, '/previous', request) return Utils.deserialize(response.text, AuthResult) def get_status(self, state_token, relay_state=None): """Get the status of an in-progress authentication :param state_token: current state token from the previous AuthResult :type state_token: str :param relay_state: data that will persist for the lifetime of the authentication or recovery token :type relay_state: str or None :rtype: AuthResult """ request = { 'stateToken': state_token, 'relayState': relay_state } response = ApiClient.post_path(self, '/', request) return Utils.deserialize(response.text, AuthResult) def verify_transaction(self, factor_id, transaction_id, user_response): """Verify a transaction :param factor_id: target factor id :type factor_id: str :param transaction_id: target transaction id :type transaction_id: str :param user_response: APPROVE or REJECT :type user_response: str :rtype: AuthResult """ request = { 'result': user_response } response = ApiClient.post_path(self, '/factors/{0}/transactions/{1}/verify'.format(factor_id, transaction_id), request) return Utils.deserialize(response.text, AuthResult)