in src/services/clients/mongo_ssm_auth_client.py [0:0]
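def admin_refresh_token(self, refresh_token: str
                        ) -> AuthenticationResult | None:
    """
    Exchange an on-prem refresh token for a new token pair.

    Refresh tokens are rotated on every use: the version carried by the
    presented token must equal the version stored for the user in the
    refresh collection. A mismatch is treated as token reuse (a stolen or
    replayed token), so the stored version is deleted and the user has to
    authenticate again.

    :param refresh_token: refresh token as received from the client
    :return: dict with ``id_token``, ``refresh_token`` and ``expires_in``
        (``EXPIRATION_IN_MINUTES * 60``), or None when the token is
        invalid, reused, lost a concurrent rotation, or belongs to a
        user that no longer exists
    """
    _LOG.info('Starting on-prem refresh token flow')
    tpl = self._decrypt_refresh_token(refresh_token)
    if not tpl:
        _LOG.info('Invalid refresh token provided. Cannot refresh')
        return
    username, rt_version = tpl

    latest = self._refresh_col.find_one({'_id': username})
    if not latest or not latest.get('v'):
        _LOG.warning('Latest version of token not found in DB '
                     'but valid token was received. Cannot refresh')
        return
    if rt_version != latest['v']:
        _LOG.warning('Valid token received but its version and one from '
                     'DB do not match. Stolen refresh token or user '
                     'reused one. Invalidating existing version')
        self._refresh_col.delete_one({'_id': username})
        return

    # Resolve the user before rotating. A token that outlived its user
    # must neither mint a new refresh version nor hand None to the
    # access-token generator.
    user_item = User.get_nullable(hash_key=username)
    if user_item is None:
        _LOG.warning('Valid token received but its user was not found. '
                     'Cannot refresh')
        return

    new_version = self._gen_refresh_token_version()
    # Compare-and-swap on the version that was just validated. Without
    # the 'v' condition two concurrent requests carrying the same token
    # both pass the check above and both receive fresh tokens, which
    # defeats reuse detection. No upsert: a record removed in the
    # meantime (e.g. invalidated by a parallel request) must stay removed.
    # NOTE(review): relies on replace_one returning a result exposing
    # matched_count (pymongo, acknowledged writes) - confirm for the
    # collection object used here.
    swapped = self._refresh_col.replace_one(
        {'_id': username, 'v': rt_version},
        {'v': new_version},  # latest version for user
    )
    if not swapped.matched_count:
        _LOG.warning('Refresh token version changed while refreshing. '
                     'Stolen refresh token or user reused one. '
                     'Invalidating existing version')
        self._refresh_col.delete_one({'_id': username})
        return

    return {
        'id_token': self._gen_access_token(user_item),
        'refresh_token': self._gen_refresh_token(username, new_version),
        'expires_in': EXPIRATION_IN_MINUTES * 60
    }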