in lemur/auth/service.py [0:0]
def login_required(f):
    """
    Validates the JWT and ensures that it has not expired and the user is still active.

    The wrapped view is only invoked when every check passes. Otherwise the
    request is short-circuited with a 401 (missing ``Authorization`` header)
    or a 403 (malformed, invalid, expired or revoked token, unknown user, or
    inactive user).

    On success the user is stored on ``g.current_user`` and Flask-Principal
    is notified that the identity changed.

    :param f: the view function to protect
    :return: the decorated function
    """
    # Function-scope import so the block does not depend on how the module's
    # top-level imports are laid out; only ``datetime`` is known to be in scope.
    from datetime import timezone

    @wraps(f)
    def decorated_function(*args, **kwargs):
        auth_header = request.headers.get("Authorization")
        if not auth_header:
            response = jsonify(message="Missing authorization header")
            response.status_code = 401
            return response

        # The token is taken as the second whitespace-separated field of the
        # header value; a header with a single field has no token to validate.
        try:
            token = auth_header.split()[1]
        except IndexError:
            return dict(message="Token is invalid"), 403

        try:
            header_data = fetch_token_header(token)
            # NOTE(review): the accepted algorithm is read from the (untrusted)
            # token header. PyJWT's documentation advises against deriving
            # ``algorithms`` from the token itself; consider pinning the
            # algorithm(s) this service actually issues.
            payload = jwt.decode(
                token,
                current_app.config["LEMUR_TOKEN_SECRET"],
                algorithms=[header_data["alg"]],
            )
        except jwt.ExpiredSignatureError:
            return dict(message="Token has expired"), 403
        except jwt.InvalidTokenError:
            # Also covers jwt.DecodeError, which is a subclass of InvalidTokenError.
            return dict(message="Token is invalid"), 403

        # Tokens carrying an "aid" claim are looked up as API keys and are
        # subject to revocation and a time-to-live.
        if "aid" in payload:
            access_key = api_key_service.get(payload["aid"])
            if access_key is None:
                # Guard in case the lookup yields nothing, rather than
                # failing with an AttributeError below.
                return dict(message="Token is invalid"), 403
            if access_key.revoked:
                return dict(message="Token has been revoked"), 403
            # A ttl of -1 disables the expiry check.
            if access_key.ttl != -1:
                # Compare timezone-aware UTC values on both sides. Mixing
                # utcnow() with the local-time fromtimestamp() would skew the
                # result by the server's UTC offset.
                # NOTE(review): issued_at + ttl is interpreted as a POSIX
                # timestamp, exactly as before; confirm the unit of ttl.
                current_time = datetime.now(timezone.utc)
                expired_time = datetime.fromtimestamp(
                    access_key.issued_at + access_key.ttl, tz=timezone.utc
                )
                if current_time >= expired_time:
                    return dict(message="Token has expired"), 403

        user = user_service.get(payload["sub"])
        # Check for a missing user before touching its attributes so this
        # yields a 403 instead of an AttributeError.
        if not user:
            return dict(message="You are not logged in"), 403
        if not user.active:
            return dict(message="User is not currently active"), 403

        g.current_user = user

        # Tell Flask-Principal the identity changed
        identity_changed.send(
            current_app._get_current_object(), identity=Identity(g.current_user.id)
        )

        return f(*args, **kwargs)

    return decorated_function