63 lines
2.9 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Annotated, Any
from fastapi import APIRouter, Depends, HTTPException, Response, status
# from fastapi.encoders import jsonable_encoder
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from backend.core.config import get_app_settings
from backend.core.dependencies.dependencies import get_current_user
from backend.core import MessageCode
from backend.db.db_setup import generate_session
from backend.schemas import ReturnValue, UserRequest, LoginResponse, PrivateUser
from backend.services.user import UserService
auth_router = APIRouter(tags=["Users: Authentication"])
user_service = UserService()
settings = get_app_settings()
db_dependency = Annotated[Session, Depends(generate_session)]
current_user_token = Annotated[PrivateUser, Depends(get_current_user)]
@auth_router.post('/token')
async def get_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: db_dependency):
user = user_service.check_exist(user=UserRequest(username=form_data.username, password=form_data.password))
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=MessageCode.WRONG_INPUT)
token = user_service.get_access_token(user_id=user.id)
return {'access_token': token, 'token_type': 'bearer'}
@auth_router.post('/login', response_model=ReturnValue[LoginResponse])
def user_login(user: UserRequest, response: Response) -> ReturnValue[Any]:
db_user = user_service.check_exist(user=user)
if not db_user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=MessageCode.WRONG_INPUT)
if db_user.is_lock is True:
raise HTTPException(status_code=status.HTTP_423_LOCKED, detail=MessageCode.ACCOUNT_LOCK)
access_token, refresh_token = user_service.generate_token(user_id=db_user.id)
duration_access = datetime.now(timezone.utc) + timedelta(minutes=settings.EXP_TOKEN)
duration_refresh = int(timedelta(days=settings.EXP_REFRESH).total_seconds())
response.set_cookie(
key=settings.COOKIE_KEY,
value=refresh_token,
max_age=duration_refresh,
expires=duration_refresh,
httponly=True,
samesite="strict",
)
return ReturnValue(status=200, data=dict(access_token=access_token, exp=int(duration_access.timestamp()), name=db_user.name))
@auth_router.get('/refresh')
def user_check(current_user: current_user_token) -> ReturnValue[Any]:
access_token = user_service.get_access_token(user_id=current_user.id)
duration_access = datetime.now(timezone.utc) + timedelta(minutes=settings.EXP_TOKEN)
return ReturnValue(status=200, data=dict(accessToken=access_token, exp=int(duration_access.timestamp())))
@auth_router.get('/logout')
def user_logout(response: Response, current_user: current_user_token) -> ReturnValue[Any]:
if current_user:
response.delete_cookie(key=settings.COOKIE_KEY)
return ReturnValue(status=200, data='Logged out')