63 lines
2.9 KiB
Python
63 lines
2.9 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
from typing import Annotated, Any
|
|
from fastapi import APIRouter, Depends, HTTPException, Response, status
|
|
|
|
# from fastapi.encoders import jsonable_encoder
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from sqlalchemy.orm import Session
|
|
from backend.core.config import get_app_settings
|
|
from backend.core.dependencies.dependencies import get_current_user
|
|
from backend.core import MessageCode
|
|
from backend.db.db_setup import generate_session
|
|
from backend.schemas import ReturnValue, UserRequest, LoginResponse, PrivateUser
|
|
from backend.services.user import UserService
|
|
|
|
|
|
auth_router = APIRouter(tags=["Users: Authentication"])
|
|
user_service = UserService()
|
|
settings = get_app_settings()
|
|
|
|
db_dependency = Annotated[Session, Depends(generate_session)]
|
|
current_user_token = Annotated[PrivateUser, Depends(get_current_user)]
|
|
|
|
@auth_router.post('/token')
|
|
async def get_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: db_dependency):
|
|
user = user_service.check_exist(user=UserRequest(username=form_data.username, password=form_data.password))
|
|
if not user:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=MessageCode.WRONG_INPUT)
|
|
token = user_service.get_access_token(user_id=user.id)
|
|
return {'access_token': token, 'token_type': 'bearer'}
|
|
|
|
|
|
@auth_router.post('/login', response_model=ReturnValue[LoginResponse])
|
|
def user_login(user: UserRequest, response: Response) -> ReturnValue[Any]:
|
|
db_user = user_service.check_exist(user=user)
|
|
if not db_user:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=MessageCode.WRONG_INPUT)
|
|
if db_user.is_lock is True:
|
|
raise HTTPException(status_code=status.HTTP_423_LOCKED, detail=MessageCode.ACCOUNT_LOCK)
|
|
access_token, refresh_token = user_service.generate_token(user_id=db_user.id)
|
|
duration_access = datetime.now(timezone.utc) + timedelta(minutes=settings.EXP_TOKEN)
|
|
duration_refresh = int(timedelta(days=settings.EXP_REFRESH).total_seconds())
|
|
response.set_cookie(
|
|
key=settings.COOKIE_KEY,
|
|
value=refresh_token,
|
|
max_age=duration_refresh,
|
|
expires=duration_refresh,
|
|
httponly=True,
|
|
samesite="strict",
|
|
)
|
|
return ReturnValue(status=200, data=dict(access_token=access_token, exp=int(duration_access.timestamp()), name=db_user.name))
|
|
|
|
@auth_router.get('/refresh')
|
|
def user_check(current_user: current_user_token) -> ReturnValue[Any]:
|
|
access_token = user_service.get_access_token(user_id=current_user.id)
|
|
duration_access = datetime.now(timezone.utc) + timedelta(minutes=settings.EXP_TOKEN)
|
|
return ReturnValue(status=200, data=dict(accessToken=access_token, exp=int(duration_access.timestamp())))
|
|
|
|
@auth_router.get('/logout')
|
|
def user_logout(response: Response, current_user: current_user_token) -> ReturnValue[Any]:
|
|
if current_user:
|
|
response.delete_cookie(key=settings.COOKIE_KEY)
|
|
return ReturnValue(status=200, data='Logged out')
|