"""Database bootstrap: wait for the DB, apply Alembic migrations, seed users."""

import os
from pathlib import Path
from time import sleep

from sqlalchemy import engine, orm, text

from alembic import command, config, script
from alembic.config import Config
from alembic.runtime import migration
from backend.core import root_logger
from backend.core.config import get_app_settings
from backend.db.db_setup import session_context
from backend.repos.repository_users import RepositoryUsers
from backend.repos.seeder import default_users_init

# from fuware.db.models import User

PROJECT_DIR = Path(__file__).parent.parent.parent

logger = root_logger.get_logger()


def init_db(db) -> None:
    """Seed the default users.

    Args:
        db: Handle forwarded unchanged to ``default_users_init``; ``main``
            passes the active session.
    """
    logger.info("Initializing user data...")
    default_users_init(db)


def db_is_at_head(alembic_cfg: config.Config) -> bool:
    """Return True when the database revision(s) match the Alembic script heads.

    Args:
        alembic_cfg: Alembic configuration used to locate the migration scripts.

    Raises:
        ValueError: If the application settings contain no database URL.
    """
    settings = get_app_settings()
    url = settings.DB_URL

    if not url:
        raise ValueError("No database url found")

    connectable = engine.create_engine(url)
    try:
        directory = script.ScriptDirectory.from_config(alembic_cfg)
        with connectable.begin() as connection:
            context = migration.MigrationContext.configure(connection)
            return set(context.get_current_heads()) == set(directory.get_heads())
    finally:
        # This engine exists only for the check; release its connection pool
        # instead of leaving it open for the rest of the process.
        connectable.dispose()


def connect(session: orm.Session) -> bool:
    """Probe the database with ``SELECT 1``.

    Returns:
        True if the statement executed, False if any exception was raised
        (the error is logged, not propagated).
    """
    try:
        session.execute(text("SELECT 1"))
        return True
    except Exception as e:
        logger.error(f"Error connecting to database: {e}")
        return False


def _wait_for_database(session: orm.Session, max_retry: int, wait_seconds: int) -> None:
    """Probe the database up to ``max_retry`` times, sleeping between attempts.

    Raises:
        ConnectionError: If every attempt fails.
    """
    for attempts_left in range(max_retry - 1, -1, -1):
        if connect(session):
            logger.info("Database connection established.")
            return

        if attempts_left == 0:
            # Last attempt failed: do not announce or wait for a retry that
            # will never happen.
            break

        logger.error(f"Database connection failed. Retrying in {wait_seconds} seconds...")
        sleep(wait_seconds)

    raise ConnectionError("Database connection failed - exiting application.")


def main():
    """Wait for the database, migrate it to head if needed, then seed users.

    The Alembic config path is read from ``ALEMBIC_CONFIG_FILE`` and defaults
    to ``alembic.ini`` under ``PROJECT_DIR``.

    Raises:
        ConnectionError: If the database cannot be reached after all retries.
        FileNotFoundError: If the Alembic config file does not exist.
    """
    max_retry = 10
    wait_seconds = 1

    with session_context() as session:
        _wait_for_database(session, max_retry, wait_seconds)

        alembic_cfg_path = os.getenv("ALEMBIC_CONFIG_FILE", default=str(PROJECT_DIR / "alembic.ini"))

        if not os.path.isfile(alembic_cfg_path):
            # FileNotFoundError is still an Exception, so existing handlers keep working.
            raise FileNotFoundError("Provided alembic config path doesn't exist")

        alembic_cfg = Config(alembic_cfg_path)
        if db_is_at_head(alembic_cfg):
            logger.debug("Migration not needed.")
        else:
            logger.info("Migration needed. Performing migration...")
            command.upgrade(alembic_cfg, "head")

        if session.get_bind().name == "postgresql":
            # needed for fuzzy search and fast GIN text indices
            session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm;"))

        users = RepositoryUsers()
        if users.get_all():
            logger.info("Database already seeded.")
        else:
            logger.info("Seeding database...")
            init_db(session)


if __name__ == "__main__":
    main()