from typing import Any from fastapi import APIRouter, HTTPException, Response, Request, Depends from fastapi.encoders import jsonable_encoder from schemas import ReturnValue, User, UserCreate, UserRequest from ultis import root_api_path_build, encryptString, decryptString, verify_password from const import COOKIE_KEY from sqlalchemy.orm import Session from db.controller import get_user_by_username, create_user from db import get_db authRouter=APIRouter(prefix=root_api_path_build('/auth')) @authRouter.put('/register') def register_user(user: UserCreate, db: Session = Depends(get_db)) -> ReturnValue[Any]: db_user = get_user_by_username(db=db, usn=user.username) if db_user: raise HTTPException(status_code=400, detail="Username already registered!") user_return = create_user(db=db, user=user) return ReturnValue(status=200, data=jsonable_encoder(user_return)) @authRouter.post('/login', response_model=ReturnValue[User]) def user_login(user: UserRequest, response: Response, db: Session = Depends(get_db)) -> ReturnValue[Any]: db_user = get_user_by_username(db, user.username) if not db_user: raise HTTPException(status_code=401, detail="Your username or password input is wrong!") if not verify_password(user.password, db_user.password): raise HTTPException(status_code=401, detail="Your username or password input is wrong!") if db_user.is_lock is True: raise HTTPException(status_code=401, detail="Your Account is banned") cookieEncode = encryptString(user.username + ',' + user.password) response.set_cookie(key=COOKIE_KEY, value=cookieEncode.decode('utf-8')) return ReturnValue(status=200, data=jsonable_encoder(db_user)) @authRouter.get('/logout') def user_logout(response: Response) -> ReturnValue[Any]: response.delete_cookie(key=COOKIE_KEY) return ReturnValue(status=200, data='Logged out') def get_auth_user(request: Request, db: Session = Depends(get_db)): """verify that user has a valid session""" session_id = request.cookies.get(COOKIE_KEY) if not session_id: raise HTTPException(status_code=401, detail="Unauthorized") decrypt_user = decryptString(session_id).split(',') db_user = get_user_by_username(db, decrypt_user[0]) if not db_user: raise HTTPException(status_code=403) if not verify_password(decrypt_user[1], db_user.password): raise HTTPException(status_code=401, detail="Your username or password input is wrong!") return True