Done setup template

This commit is contained in:
2024-06-01 12:34:20 +00:00
parent 71d4afcc5e
commit 449a5f644f
104 changed files with 313 additions and 256 deletions

1
backend/core/__init__.py Normal file
View File

@ -0,0 +1 @@
from .message_code import *

31
backend/core/config.py Normal file
View File

@ -0,0 +1,31 @@
import os
from functools import lru_cache
from pathlib import Path
from dotenv import load_dotenv
from backend.core.settings import AppSettings, app_settings_constructor
CWD = Path(__file__).parent
BASE_DIR = CWD.parent.parent
ENV = BASE_DIR.joinpath(".env")
load_dotenv()
PRODUCTION = os.getenv("PRODUCTION", "True").lower() in ["true", "1"]
TESTING = os.getenv("TESTING", "False").lower() in ["true", "1"]
DATA_DIR = os.getenv("DATA_DIR")
def determine_data_dir() -> Path:
global PRODUCTION, TESTING, BASE_DIR, DATA_DIR
if TESTING:
return BASE_DIR.joinpath(DATA_DIR if DATA_DIR else "tests/.temp")
if PRODUCTION:
return Path(DATA_DIR if DATA_DIR else "/app/data")
return BASE_DIR.joinpath("dev", "data")
@lru_cache
def get_app_settings() -> AppSettings:
return app_settings_constructor(env_file=ENV, production=PRODUCTION, data_dir=determine_data_dir())

View File

@ -0,0 +1 @@
from .dependencies import *

View File

@ -0,0 +1,76 @@
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from backend.core.config import get_app_settings
from backend.core import MessageCode
import jwt
from backend.services.user.user_service import UserService
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
oauth2_scheme_soft_fail = OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False)
ALGORITHM = "HS256"
settings = get_app_settings()
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
async def is_logged_in(token: str = Depends(oauth2_scheme_soft_fail)) -> bool:
try:
payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
exp: int = payload.get("exp")
if exp is not None:
try:
user_service = UserService()
user = user_service.get_by_id(user_id)
if not user:
raise credentials_exception
if user.is_lock is True:
raise HTTPException(status_code=status.HTTP_423_LOCKED, detail=MessageCode.ACCOUNT_LOCK)
except Exception:
return credentials_exception
return user
except Exception:
raise credentials_exception
async def get_current_user(request: Request, token: str | None = Depends(oauth2_scheme_soft_fail)):
"""verify that user has a valid session"""
if token is None and settings.COOKIE_KEY in request.cookies:
# Try extract from cookie
token = request.cookies.get(settings.COOKIE_KEY, "")
else:
token = token or ""
try:
payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
exp: int = payload.get("exp")
if user_id is None or exp is None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="credentials have expired",
)
user_service = UserService()
user = user_service.get_by_id(user_id)
if not user:
raise credentials_exception
if user.is_lock is True:
raise HTTPException(status_code=status.HTTP_423_LOCKED, detail=MessageCode.ACCOUNT_LOCK)
return user
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="credentials have expired",
)
except Exception:
raise credentials_exception

View File

@ -0,0 +1,66 @@
import json
import logging
import pathlib
import typing
from logging import config as logging_config
__dir = pathlib.Path(__file__).parent
__conf: dict[str, str] | None = None
def _load_config(path: pathlib.Path, substitutions: dict[str, str] | None = None) -> dict[str, typing.Any]:
with open(path) as file:
if substitutions:
contents = file.read()
for key, value in substitutions.items():
# Replaces the key matches
#
# Example:
# {"key": "value"}
# "/path/to/${key}/file" -> "/path/to/value/file"
contents = contents.replace(f"${{{key}}}", value)
json_data = json.loads(contents)
else:
json_data = json.load(file)
return json_data
def log_config() -> dict[str, str]:
if __conf is None:
raise ValueError("logger not configured, must call configured_logger first")
return __conf
def configured_logger(
*,
mode: str,
config_override: pathlib.Path | None = None,
substitutions: dict[str, str] | None = None,
) -> logging.Logger:
"""
Configure the logger based on the mode and return the root logger
Args:
mode (str): The mode to configure the logger for (production, development, testing)
config_override (pathlib.Path, optional): A path to a custom logging config. Defaults to None.
substitutions (dict[str, str], optional): A dictionary of substitutions to apply to the logging config.
"""
global __conf
if config_override:
__conf = _load_config(config_override, substitutions)
else:
if mode == "production":
__conf = _load_config(__dir / "logconf.prod.json", substitutions)
elif mode == "development":
__conf = _load_config(__dir / "logconf.dev.json", substitutions)
elif mode == "testing":
__conf = _load_config(__dir / "logconf.test.json", substitutions)
else:
raise ValueError(f"Invalid mode: {mode}")
logging_config.dictConfig(config=__conf)
return logging.getLogger()

View File

@ -0,0 +1,15 @@
{
"version": 1,
"disable_existing_loggers": false,
"handlers": {
"rich": {
"class": "rich.logging.RichHandler"
}
},
"loggers": {
"root": {
"level": "DEBUG",
"handlers": ["rich"]
}
}
}

View File

@ -0,0 +1,63 @@
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"simple": {
"format": "%(levelname)-8s %(asctime)s - %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S"
},
"detailed": {
"format": "[%(levelname)s|%(module)s|L%(lineno)d] %(asctime)s: %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S"
},
"access": {
"()": "uvicorn.logging.AccessFormatter",
"fmt": "%(levelname)-8s %(asctime)s - [%(client_addr)s] %(status_code)s \"%(request_line)s\"",
"datefmt": "%Y-%m-%dT%H:%M:%S"
}
},
"handlers": {
"stderr": {
"class": "logging.StreamHandler",
"level": "WARNING",
"formatter": "simple",
"stream": "ext://sys.stderr"
},
"stdout": {
"class": "logging.StreamHandler",
"level": "${LOG_LEVEL}",
"formatter": "simple",
"stream": "ext://sys.stdout"
},
"access": {
"class": "logging.StreamHandler",
"level": "${LOG_LEVEL}",
"formatter": "access",
"stream": "ext://sys.stdout"
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"level": "DEBUG",
"formatter": "detailed",
"filename": "${DATA_DIR}/mealie.log",
"maxBytes": 10000,
"backupCount": 3
}
},
"loggers": {
"root": {
"level": "${LOG_LEVEL}",
"handlers": ["stderr", "file", "stdout"]
},
"uvicorn.error": {
"handlers": ["stderr", "file", "stdout"],
"level": "${LOG_LEVEL}",
"propagate": false
},
"uvicorn.access": {
"handlers": ["access", "file"],
"level": "${LOG_LEVEL}",
"propagate": false
}
}
}

View File

@ -0,0 +1,24 @@
{
"version": 1,
"disable_existing_loggers": false,
"formatters": {
"detailed": {
"format": "[%(levelname)s|%(module)s|L%(lineno)d] %(asctime)s: %(message)s",
"datefmt": "%Y-%m-%dT%H:%M:%S"
}
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "detailed",
"stream": "ext://sys.stdout"
}
},
"loggers": {
"root": {
"level": "${LOG_LEVEL}",
"handlers": ["stdout"]
}
}
}

View File

@ -0,0 +1,6 @@
class MessageCode():
CREATED_USER: str = 'CREATED_USER'
WRONG_INPUT: str = 'LOGIN_WRONG'
ACCOUNT_LOCK: str = 'USER_LOCK'
REFRESH_TOKEN_EXPIRED: str = 'REFRESH_TOKEN_EXPIRED'

View File

@ -0,0 +1,43 @@
import logging
from .config import get_app_settings
from .logger.config import configured_logger
__root_logger: None | logging.Logger = None
def get_logger(module=None) -> logging.Logger:
"""
Get a logger instance for a module, in most cases module should not be
provided. Simply using the root logger is sufficient.
Cases where you would want to use a module specific logger might be a background
task or a long running process where you want to easily identify the source of
those messages
"""
global __root_logger
if __root_logger is None:
app_settings = get_app_settings()
mode = "development"
if app_settings.TESTING:
mode = "testing"
elif app_settings.PRODUCTION:
mode = "production"
substitutions = {
"LOG_LEVEL": app_settings.LOG_LEVEL.upper(),
}
__root_logger = configured_logger(
mode=mode,
config_override=app_settings.LOG_CONFIG_OVERRIDE,
substitutions=substitutions,
)
if module is None:
return __root_logger
return __root_logger.getChild(module)

View File

@ -0,0 +1 @@
from .security import *

View File

@ -0,0 +1,34 @@
from functools import lru_cache
from typing import Protocol
import bcrypt
from backend.core.config import get_app_settings
class Hasher(Protocol):
def hash(self, password: str) -> str: ...
def verify(self, password: str, hashed: str) -> bool: ...
class FakeHasher:
def hash(self, password: str) -> str:
return password
def verify(self, password: str, hashed: str) -> bool:
return password == hashed
class BcryptHasher:
def hash(self, password: str) -> str:
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify(self, password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
@lru_cache(maxsize=1)
def get_hasher() -> Hasher:
settings = get_app_settings()
if settings.TESTING:
return FakeHasher()
return BcryptHasher()

View File

@ -0,0 +1,46 @@
import secrets
from datetime import datetime, timedelta, timezone
from pathlib import Path
import jwt
from backend.core.config import get_app_settings
from backend.core import root_logger
from backend.core.security.hasher import get_hasher
ALGORITHM = "HS256"
logger = root_logger.get_logger("security")
settings = get_app_settings()
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
settings = get_app_settings()
to_encode = data.copy()
expires_delta = expires_delta or timedelta(minutes=settings.EXP_TOKEN)
expire = datetime.now(timezone.utc) + expires_delta
to_encode["exp"] = expire
return jwt.encode(to_encode, settings.SECRET, algorithm=ALGORITHM)
def create_refresh_token(data: dict) -> str:
return create_access_token(data, expires_delta=timedelta(days=settings.EXP_REFRESH))
def create_file_token(file_path: Path) -> str:
token_data = {"file": str(file_path)}
return create_access_token(token_data, expires_delta=timedelta(minutes=30))
def hash_password(password: str) -> str:
"""Takes in a raw password and hashes it. Used prior to saving a new password to the database."""
return get_hasher().hash(password)
def url_safe_token() -> str:
"""Generates a cryptographic token without embedded data. Used for password reset tokens and invitation tokens"""
return secrets.token_urlsafe(24)
def verify_token(exp: int):
expried = datetime.fromtimestamp(exp / 1e3)
return expried < datetime.now(timezone.utc)

View File

@ -0,0 +1 @@
from .settings import *

View File

@ -0,0 +1,28 @@
from abc import ABC, abstractmethod
from pathlib import Path
from pydantic import BaseModel
class AbstractDBProvider(ABC):
@property
@abstractmethod
def db_url(self) -> str: ...
@property
@abstractmethod
def db_url_public(self) -> str: ...
class SQLiteProvider(AbstractDBProvider, BaseModel):
data_dir: Path
prefix: str = ""
@property
def db_path(self):
return self.data_dir / f"{self.prefix}fuware.db"
@property
def db_url(self) -> str:
return f"sqlite:///{str(self.db_path.absolute())}"
@property
def db_url_public(self) -> str:
return self.db_url

View File

@ -0,0 +1,80 @@
from pathlib import Path
from backend.core.settings.db_providers import AbstractDBProvider, SQLiteProvider
from pydantic_settings import BaseSettings # type: ignore
def determine_secrets(production: bool) -> str:
if not production:
return "shh-secret-test-key"
return "1d00e664fb3b07aff5a191755ea72f9c4bc85a3f36868308d0b2c417aed3419e"
def determine_cookie(production: bool) -> str:
if not production:
return "logcook"
return "7fo24CMyIc"
class AppSettings(BaseSettings):
PRODUCTION: bool
TESTING: bool
BASE_URL: str = "http://localhost:8080"
"""trailing slashes are trimmed (ex. `http://localhost:8080/` becomes ``http://localhost:8080`)"""
HOST_IP: str = "*"
API_HOST: str = "0.0.0.0"
API_PORT: int = 9000
API_DOCS: bool = True
ALLOW_SIGNUP: bool = False
SECRET: str
COOKIE_KEY: str
EXP_TOKEN: int = 30
"""in minutes, default is 30 minutes"""
EXP_REFRESH: int = 7
"""in days, default is 7 days"""
LOG_CONFIG_OVERRIDE: Path | None = None
"""path to custom logging configuration file"""
LOG_LEVEL: str = "info"
"""corresponds to standard Python log levels"""
@property
def DOCS_URL(self) -> str | None:
return "/docs" if self.API_DOCS else None
@property
def REDOC_URL(self) -> str | None:
return "/redoc" if self.API_DOCS else None
# ===============================================
# Database Configuration
DB_PROVIDER: AbstractDBProvider | None = None
@property
def DB_URL(self) -> str | None:
return self.DB_PROVIDER.db_url if self.DB_PROVIDER else None
@property
def DB_URL_PUBLIC(self) -> str | None:
return self.DB_PROVIDER.db_url_public if self.DB_PROVIDER else None
def app_settings_constructor(data_dir: Path, production: bool, env_file: Path, env_encoding="utf-8") -> AppSettings:
"""
app_settings_constructor is a factory function that returns an AppSettings object. It is used to inject the
required dependencies into the AppSettings object and nested child objects. AppSettings should not be substantiated
directly, but rather through this factory function.
"""
app_settings = AppSettings(
_env_file=env_file, # type: ignore
_env_file_encoding=env_encoding, # type: ignore
**{"SECRET": determine_secrets(production), 'COOKIE_KEY': determine_cookie(production)},
)
app_settings.DB_PROVIDER = SQLiteProvider(data_dir=data_dir)
return app_settings