modular_api/helpers/jwt_auth.py (71 lines of code) (raw):

import secrets import time from datetime import timedelta import jwt from modular_api.helpers.exceptions import ModularApiUnauthorizedException from modular_api.services import SP from modular_api.web_service import META_VERSION_ID from modular_api.services.refresh_token_service import RefreshTokenService SESSION_TOKEN_EXPIRATION = int(timedelta(days=1).total_seconds()) REFRESH_TOKEN_EXPIRATION = int(timedelta(days=14).total_seconds()) def encode_data_to_jwt(username: str) -> str: return jwt.encode( payload={ 'username': username, 'iat': int(time.time()), 'exp': int(time.time()) + SESSION_TOKEN_EXPIRATION, 'meta_version': META_VERSION_ID, }, key=SP.env.secret_key(), algorithm='HS256', ) def decode_jwt_token(token: str) -> dict: try: payload = jwt.decode( token, key=SP.env.secret_key(), algorithms='HS256', ) except jwt.exceptions.ExpiredSignatureError: # if you are going to change text in next line - you must update # RELOGIN_TEXT variable in Modular-CLI to keep automated re-login raise ModularApiUnauthorizedException( 'The provided token has expired. Please re-login to get a new token' ) except (jwt.exceptions.InvalidSignatureError, jwt.exceptions.DecodeError): return {} return payload def username_from_jwt_token(token: str) -> str | None: # todo fix, sometimes this method can receive not jwt token but base64 encoded basic auth string (username:password) payload = decode_jwt_token(token) if username := payload.get('username'): return username def gen_refresh_token_version() -> str: return secrets.token_hex() def encode_data_to_refresh_jwt(username: str, version: str) -> str: return jwt.encode( payload={ 'username': username, 'version': version, 'iat': int(time.time()), 'exp': int(time.time()) + REFRESH_TOKEN_EXPIRATION, }, key=SP.env.secret_key(), algorithm='HS256', ) def validate_refresh_token(refresh_token: str) -> tuple: try: decoded_token = jwt.decode( refresh_token, key=SP.env.secret_key(), algorithms='HS256', ) except (jwt.ExpiredSignatureError, jwt.InvalidTokenError, Exception): return None, None # Retrieve the token details username = decoded_token['username'] version = decoded_token['version'] # Check if the token exists and is valid existing_token = RefreshTokenService.get_refresh_token(username) if not existing_token: return None, None # Retrieve the existing_token from db details version_from_db = existing_token.version # Validate if version match the database records if not version == version_from_db: RefreshTokenService.delete_refresh_token(existing_token) return None, None return username, version