"""Database initialization and connection management.""" import logging from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from sqlalchemy import text import asyncio from app.config import settings from app.models import Base logger = logging.getLogger(__name__) # Global engine and session factory engine = None AsyncSessionLocal = None async def init_db(): """Initialize database connection and create tables.""" global engine, AsyncSessionLocal try: # Create async engine engine = create_async_engine( settings.DATABASE_URL, echo=settings.DB_ECHO, pool_size=settings.DB_POOL_SIZE, max_overflow=10 ) # Create session factory AsyncSessionLocal = sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) # Create tables async with engine.begin() as conn: # Enable pgvector extension try: await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) logger.info("pgvector extension enabled") except Exception as e: logger.warning(f"pgvector extension might already exist: {e}") # Create all tables await conn.run_sync(Base.metadata.create_all) logger.info("Database tables created successfully") except Exception as e: logger.error(f"Failed to initialize database: {e}") raise async def get_session() -> AsyncSession: """Get a new database session.""" if AsyncSessionLocal is None: raise RuntimeError("Database not initialized. Call init_db() first.") async with AsyncSessionLocal() as session: try: yield session except Exception as e: await session.rollback() logger.error(f"Database session error: {e}") raise finally: await session.close() async def close_db(): """Close database connection.""" global engine if engine: await engine.dispose() logger.info("Database connection closed")