""" PostgreSQL Database connection for Node Registry """ import os from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker, Session from contextlib import contextmanager import logging logger = logging.getLogger(__name__) # Database URL from environment DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/node_registry") # Create engine engine = create_engine( DATABASE_URL, pool_pre_ping=True, pool_size=5, max_overflow=10, echo=os.getenv("NODE_REGISTRY_ENV") == "development", ) # Create session factory SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def get_db() -> Session: """ Dependency for FastAPI to get database session Usage: @app.get("/") def endpoint(db: Session = Depends(get_db)): ... """ db = SessionLocal() try: yield db finally: db.close() @contextmanager def get_db_context(): """ Context manager for database session Usage: with get_db_context() as db: db.query(Node).all() """ db = SessionLocal() try: yield db db.commit() except Exception: db.rollback() raise finally: db.close() def check_db_connection() -> bool: """Check if database connection is working""" try: with engine.connect() as conn: conn.execute(text("SELECT 1")) return True except Exception as e: logger.error(f"Database connection failed: {e}") return False def get_db_info() -> dict: """Get database connection information""" return { "type": "postgresql", "url": DATABASE_URL.split("@")[-1] if "@" in DATABASE_URL else DATABASE_URL, "connected": check_db_connection(), }