# fuware-be/fuware/db/init_db.py
#
# 88 lines
# 2.6 KiB
# Python

import os
from pathlib import Path
from time import sleep
from sqlalchemy import engine, orm, text
from alembic import command, config, script
from alembic.config import Config
from alembic.runtime import migration
from fuware.core import root_logger
from fuware.core.config import get_app_settings
from fuware.db.db_setup import session_context
from fuware.repos.repository_users import RepositoryUsers
from fuware.repos.seeder import default_users_init
from fuware.db.models._model_base import Model
# from fuware.db.models import User
# Directory three levels above this file; main() looks for "alembic.ini" here
# unless the ALEMBIC_CONFIG_FILE environment variable is set.
PROJECT_DIR = Path(__file__).parent.parent.parent
# Logger obtained from fuware.core.root_logger, shared by every function below.
logger = root_logger.get_logger()
def init_db(db) -> None:
    """Seed the database with the default users.

    Args:
        db: Handle forwarded unchanged to ``default_users_init``
            (``main`` passes the active session).
    """
    logger.info("Initializing user data...")

    default_users_init(db)
def db_is_at_head(alembic_cfg: config.Config) -> bool:
    """Report whether the database is at the latest Alembic revision(s).

    Args:
        alembic_cfg: Alembic configuration used to locate the migration
            script directory.

    Returns:
        ``True`` when the set of revision heads recorded in the database
        equals the set of heads found in the script directory.

    Raises:
        ValueError: If the application settings contain no database URL.
    """
    settings = get_app_settings()
    url = settings.DB_URL

    if not url:
        raise ValueError("No database url found")

    # A dedicated, short-lived engine is created just for this check.
    connectable = engine.create_engine(url)
    try:
        directory = script.ScriptDirectory.from_config(alembic_cfg)
        with connectable.begin() as connection:
            context = migration.MigrationContext.configure(connection)
            return set(context.get_current_heads()) == set(directory.get_heads())
    finally:
        # Release the engine's connection pool; without this the throwaway
        # engine keeps its pooled connection open after the check.
        connectable.dispose()
def connect(session: orm.Session) -> bool:
    """Probe the database by running ``SELECT 1`` on the given session.

    Args:
        session: Session used to issue the probe query.

    Returns:
        ``True`` if the query ran without raising; ``False`` otherwise
        (the error is logged, never propagated).
    """
    reachable = False
    try:
        session.execute(text("SELECT 1"))
        reachable = True
    except Exception as exc:
        logger.error(f"Error connecting to database: {exc}")
    return reachable
def main():
    """Wait for the database, migrate it to head, and seed it when empty.

    Everything runs inside a single ``session_context()`` session:

    1. Probe the database with ``connect`` up to ``max_retry`` times,
       pausing ``wait_seconds`` between attempts.
    2. Resolve the Alembic config path (``ALEMBIC_CONFIG_FILE`` environment
       variable, falling back to ``PROJECT_DIR / "alembic.ini"``) and run
       ``alembic upgrade head`` unless ``db_is_at_head`` says it is current.
    3. On PostgreSQL, make sure the ``pg_trgm`` extension exists.
    4. Seed default users through ``init_db`` when
       ``RepositoryUsers().get_all()`` returns nothing.

    Raises:
        ConnectionError: If the database is still unreachable after the
            last attempt.
        FileNotFoundError: If the Alembic config path is not an existing
            file.
    """
    max_retry = 10
    wait_seconds = 1

    with session_context() as session:
        for attempt in range(1, max_retry + 1):
            if connect(session):
                logger.info("Database connection established.")
                break

            if attempt == max_retry:
                # Out of attempts: fail right away instead of announcing a
                # retry and sleeping once more before giving up.
                raise ConnectionError("Database connection failed - exiting application.")

            logger.error(f"Database connection failed. Retrying in {wait_seconds} seconds...")
            sleep(wait_seconds)

        alembic_cfg_path = os.getenv("ALEMBIC_CONFIG_FILE", default=str(PROJECT_DIR / "alembic.ini"))

        if not Path(alembic_cfg_path).is_file():
            # FileNotFoundError is still an Exception, so existing broad
            # handlers keep working while callers can now be specific.
            raise FileNotFoundError("Provided alembic config path doesn't exist")

        alembic_cfg = Config(alembic_cfg_path)
        if db_is_at_head(alembic_cfg):
            logger.debug("Migration not needed.")
        else:
            logger.info("Migration needed. Performing migration...")
            command.upgrade(alembic_cfg, "head")

        if session.get_bind().name == "postgresql":
            # pg_trgm is needed for fuzzy search and fast GIN text indices.
            # NOTE(review): no commit is issued here; whether this statement
            # persists depends on session_context()/later commits - confirm.
            session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm;"))

        users = RepositoryUsers()
        if users.get_all():
            logger.info("Database already seeded.")
        else:
            logger.info("Seeding database...")
            init_db(session)
# Run the full wait / migrate / seed sequence when executed as a script.
if __name__ == "__main__":
    main()