Done for Login and notification system
This commit is contained in:
@ -1,43 +1,70 @@
|
||||
from typing import Any
|
||||
from fastapi import APIRouter, Depends, HTTPException, Response, Request
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Annotated, Any
|
||||
from fastapi import APIRouter, Depends, HTTPException, Response, status
|
||||
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
# from fastapi.encoders import jsonable_encoder
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from sqlalchemy.orm import Session
|
||||
from fuware.core.config import get_app_settings
|
||||
from fuware.core.message_code import message_code
|
||||
from fuware.core.security.hasher import get_hasher
|
||||
from fuware.core.dependencies.dependencies import get_current_user
|
||||
from fuware.core import MessageCode
|
||||
from fuware.db.db_setup import generate_session
|
||||
from fuware.schemas import ReturnValue, UserRequest, PrivateUser, UserCreate
|
||||
from fuware.services import UserService
|
||||
from fuware.schemas import ReturnValue, UserRequest, LoginResponse, UserCreate, PrivateUser
|
||||
from fuware.services.user import UserService
|
||||
|
||||
|
||||
public_router = APIRouter(tags=["Users: Authentication"])
|
||||
auth_router = APIRouter(tags=["Users: Authentication"])
|
||||
user_service = UserService()
|
||||
hasher = get_hasher()
|
||||
settings = get_app_settings()
|
||||
message = message_code()
|
||||
|
||||
@public_router.put('/register')
|
||||
def register_user(user: UserCreate, db: Session = Depends(generate_session)) -> ReturnValue[Any]:
|
||||
db_dependency = Annotated[Session, Depends(generate_session)]
|
||||
current_user_token = Annotated[PrivateUser, Depends(get_current_user)]
|
||||
|
||||
@auth_router.post('/token')
|
||||
async def get_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: db_dependency):
|
||||
user = user_service.check_exist(user=UserRequest(username=form_data.username, password=form_data.password))
|
||||
if not user:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=MessageCode.WRONG_INPUT)
|
||||
token = user_service.get_access_token(user_id=user.id)
|
||||
return {'access_token': token, 'token_type': 'bearer'}
|
||||
|
||||
|
||||
@auth_router.put('/register')
|
||||
def register_user(user: UserCreate, db: db_dependency) -> ReturnValue[Any]:
|
||||
db_user = user_service.get_by_username(username=user.username)
|
||||
if db_user:
|
||||
raise HTTPException(status_code=400, detail=message.CREATED_USER)
|
||||
user_return = user_service.create(db=db, user=user)
|
||||
return ReturnValue(status=200, data=jsonable_encoder(user_return))
|
||||
raise HTTPException(status_code=400, detail=MessageCode.CREATED_USER)
|
||||
user_service.create(db=db, user=user)
|
||||
return ReturnValue(status=200, data="created")
|
||||
|
||||
@public_router.post('/login', response_model=ReturnValue[PrivateUser])
|
||||
def user_login(user: UserRequest, response: Response, db: Session = Depends(generate_session)) -> ReturnValue[Any]:
|
||||
@auth_router.post('/login', response_model=ReturnValue[LoginResponse])
|
||||
def user_login(user: UserRequest, response: Response) -> ReturnValue[Any]:
|
||||
db_user = user_service.check_exist(user=user)
|
||||
cookieEncode = user_service.check_login(db=db, user_id=db_user.id)
|
||||
response.set_cookie(key=settings.COOKIE_KEY, value=cookieEncode, max_age=86400, httponly=True)
|
||||
return ReturnValue(status=200, data=db_user)
|
||||
if not db_user:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=MessageCode.WRONG_INPUT)
|
||||
if db_user.is_lock is True:
|
||||
raise HTTPException(status_code=status.HTTP_423_LOCKED, detail=MessageCode.ACCOUNT_LOCK)
|
||||
access_token, refresh_token = user_service.generate_token(user_id=db_user.id)
|
||||
duration_access = datetime.now(timezone.utc) + timedelta(minutes=settings.EXP_TOKEN)
|
||||
duration_refresh = int(timedelta(days=settings.EXP_REFRESH).total_seconds())
|
||||
response.set_cookie(
|
||||
key=settings.COOKIE_KEY,
|
||||
value=refresh_token,
|
||||
max_age=duration_refresh,
|
||||
expires=duration_refresh,
|
||||
httponly=True,
|
||||
samesite="strict",
|
||||
)
|
||||
return ReturnValue(status=200, data=dict(access_token=access_token, exp=int(duration_access.timestamp()), name=db_user.name))
|
||||
|
||||
@public_router.get('/logout', response_model=ReturnValue[Any])
|
||||
def user_logout(request: Request, response: Response, db: Session = Depends(generate_session)) -> ReturnValue[Any]:
|
||||
session_id = request.cookies.get(settings.COOKIE_KEY)
|
||||
if not session_id:
|
||||
@auth_router.get('/refresh')
|
||||
def user_check(current_user: current_user_token) -> ReturnValue[Any]:
|
||||
access_token = user_service.get_access_token(user_id=current_user.id)
|
||||
duration_access = datetime.now(timezone.utc) + timedelta(minutes=settings.EXP_TOKEN)
|
||||
return ReturnValue(status=200, data=dict(accessToken=access_token, exp=int(duration_access.timestamp())))
|
||||
|
||||
@auth_router.get('/logout')
|
||||
def user_logout(response: Response, current_user: current_user_token) -> ReturnValue[Any]:
|
||||
if current_user:
|
||||
response.delete_cookie(key=settings.COOKIE_KEY)
|
||||
return ReturnValue(status=200, data='Logged out')
|
||||
user_service.delete_session(db=db, user_ss=session_id)
|
||||
response.delete_cookie(key=settings.COOKIE_KEY)
|
||||
return ReturnValue(status=200, data='Logged out')
|
||||
|
Reference in New Issue
Block a user