77 lines
2.4 KiB
Python

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from backend.core.config import get_app_settings
from backend.core import MessageCode
import jwt
from backend.services.user.user_service import UserService
# Signing algorithm passed to the jwt.decode calls in this module.
ALGORITHM = "HS256"

# Bearer-token extractors for the /api/auth/token flow. The "soft fail"
# variant is built with auto_error=False; the dependencies below accept a
# missing token from it and decide themselves how to react.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
oauth2_scheme_soft_fail = OAuth2PasswordBearer(
    tokenUrl="/api/auth/token",
    auto_error=False,
)

# Application settings, resolved once when this module is imported.
settings = get_app_settings()

# Shared 401 response raised whenever a token cannot be trusted.
credentials_exception = HTTPException(
    headers={"WWW-Authenticate": "Bearer"},
    detail="Could not validate credentials",
    status_code=status.HTTP_401_UNAUTHORIZED,
)
async def is_logged_in(token: str = Depends(oauth2_scheme_soft_fail)) -> bool:
    """Check that the bearer token belongs to an existing, unlocked user.

    The token is decoded with ``settings.SECRET`` and ``ALGORITHM``; its
    ``sub`` claim is the id handed to ``UserService.get_by_id``.

    Args:
        token: Bearer token supplied by ``oauth2_scheme_soft_fail``.

    Returns:
        The (truthy) user record returned by ``UserService.get_by_id``.
        NOTE(review): the ``-> bool`` annotation is kept so the signature
        stays stable, but the success value is the user object itself --
        confirm whether callers only rely on its truthiness.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``exp`` claim, no user matches ``sub``, or the lookup fails
            unexpectedly; 423 when ``user.is_lock`` is True.
    """
    try:
        payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")

        # A token without an expiry claim is not accepted as a session.
        if payload.get("exp") is None:
            raise credentials_exception

        user = UserService().get_by_id(user_id)
        if not user:
            raise credentials_exception
        # Only an explicit True means locked; False/None are usable accounts.
        if user.is_lock is True:
            raise HTTPException(status_code=status.HTTP_423_LOCKED, detail=MessageCode.ACCOUNT_LOCK)
        return user
    except HTTPException:
        # Deliberate HTTP errors (401 / 423) must reach the client unchanged
        # instead of being returned as a truthy object or rewritten below.
        raise
    except Exception as exc:
        raise credentials_exception from exc
async def get_current_user(request: Request, token: str | None = Depends(oauth2_scheme_soft_fail)):
    """Verify that the request carries a valid session and return its user.

    The token comes from the bearer header when present; otherwise, if the
    request has a cookie named ``settings.COOKIE_KEY``, that cookie's value
    is used. The token is decoded with ``settings.SECRET`` and ``ALGORITHM``
    and its ``sub`` claim is looked up through ``UserService.get_by_id``.

    Args:
        request: Incoming request, consulted only for its cookies.
        token: Bearer token from ``oauth2_scheme_soft_fail``; may be None.

    Returns:
        The user record returned by ``UserService.get_by_id``.

    Raises:
        HTTPException: 403 when the token is expired or lacks a ``sub`` or
            ``exp`` claim; 423 when ``user.is_lock`` is True; 401 when the
            token cannot be decoded, no user matches, or the lookup fails
            unexpectedly.
    """
    if token is None and settings.COOKIE_KEY in request.cookies:
        # Try extract from cookie
        token = request.cookies.get(settings.COOKIE_KEY, "")
    else:
        # Normalise a missing header to "" so jwt.decode fails cleanly.
        token = token or ""

    try:
        payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        exp: int = payload.get("exp")

        if user_id is None or exp is None:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="credentials have expired",
            )

        user = UserService().get_by_id(user_id)
        if not user:
            raise credentials_exception
        if user.is_lock is True:
            raise HTTPException(status_code=status.HTTP_423_LOCKED, detail=MessageCode.ACCOUNT_LOCK)
        return user
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="credentials have expired",
        )
    except HTTPException:
        # Let the deliberate 401 / 403 / 423 responses raised above through;
        # without this the generic handler below rewrites them all to 401.
        raise
    except Exception as exc:
        raise credentials_exception from exc