"""Query route for hybrid knowledge graph retrieval."""

import logging
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel

from app.config import settings
from app.db import get_session
from app.services.retrieval_service import RetrievalService

logger = logging.getLogger(__name__)

router = APIRouter()


class QueryRequest(BaseModel):
    # Request body for POST /query. Every field except ``entity_links`` is
    # forwarded to RetrievalService.hybrid_query under the same name;
    # ``entity_links`` is forwarded as ``extract_entities``.
    query: str
    # Default is read from settings once, when this module is imported.
    domain: Optional[str] = settings.LIGHTRAG_DOMAIN
    top_k: int = 5
    entity_links: bool = True
    min_relevance: float = 0.5


class RetrievalResult(BaseModel):
    # One retrieved item, built from an entry of the service's "results" list.
    source_doc_id: str
    title: str
    content: str
    relevance_score: float
    retrieval_method: str  # "bm25", "vector", "hybrid"


class EntityLink(BaseModel):
    # One linked entity, built from an entry of the service's "entities" list.
    entity_id: str
    name: str
    entity_type: str
    confidence: float


class QueryResponse(BaseModel):
    # Response body for POST /query.
    query: str
    domain: str
    results: List[RetrievalResult]
    entities: List[EntityLink]
    relations: List[dict]
    total_results: int
    latency_ms: float


# Converts the mapping returned by RetrievalService.hybrid_query into the
# typed QueryResponse model, filling in defaults for absent keys.
def _build_query_response(req: QueryRequest, result: dict) -> QueryResponse:
    # ``result`` is only accessed through ``.get``; the key names below are the
    # ones this route reads. NOTE(review): the service's actual payload schema
    # is not visible from this module -- confirm against RetrievalService.
    results = [
        RetrievalResult(
            # No default here: an entry without "id" fails model validation.
            source_doc_id=r.get("id"),
            title=r.get("title", ""),
            content=r.get("content", ""),
            relevance_score=r.get("relevance_score", 0),
            retrieval_method=r.get("retrieval_method", "hybrid"),
        )
        for r in result.get("results", [])
    ]
    entities = [
        EntityLink(
            # No default here: an entry without "entity_id" fails validation.
            entity_id=e.get("entity_id"),
            name=e.get("name", ""),
            entity_type=e.get("entity_type", ""),
            confidence=e.get("confidence", 0),
        )
        for e in result.get("entities", [])
    ]
    return QueryResponse(
        # Fall back to the request values when the service does not echo them.
        query=result.get("query", req.query),
        domain=result.get("domain", req.domain),
        results=results,
        entities=entities,
        relations=result.get("relations", []),
        total_results=result.get("total_results", 0),
        latency_ms=result.get("latency_ms", 0),
    )


@router.post("/query", response_model=QueryResponse)
async def query_knowledge_graph(
    req: QueryRequest,
    session=Depends(get_session),
) -> QueryResponse:
    """
    Query knowledge graph with hybrid retrieval.

    Combines:
    1. BM25 full-text search over entity descriptions & document content
    2. Vector similarity search using bge-m3 embeddings
    3. Reciprocal Rank Fusion (RRF) to combine scores
    """
    # Error mapping:
    #   * ValueError raised while creating the service or running the query
    #     -> HTTP 400 with the exception text as detail.
    #   * any other failure there -> HTTP 500 (logged with traceback).
    #   * failure while converting the payload into QueryResponse -> HTTP 500.
    #
    # NOTE(review): the 500 responses expose ``str(exc)`` to the client, as the
    # original code did. Consider replacing it with a generic message.
    try:
        retrieval = RetrievalService(session)
        result = await retrieval.hybrid_query(
            query_text=req.query,
            domain=req.domain,
            top_k=req.top_k,
            min_relevance=req.min_relevance,
            extract_entities=req.entity_links,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        logger.error("Query error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    # Response construction is deliberately outside the block above: pydantic's
    # ValidationError is a ValueError subclass, so a malformed service payload
    # would otherwise be reported as a client error (400) instead of a 500.
    try:
        return _build_query_response(req, result)
    except Exception as exc:
        logger.error("Query error: %s", exc, exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc)) from exc


# Domain whose examples are served when the requested domain is unknown.
_DEFAULT_SUGGESTION_DOMAIN = "transceiver"

# Static example queries per domain; built once at import time rather than on
# every request.
_QUERY_SUGGESTIONS = {
    "transceiver": (
        "What 400G transceivers work with Cisco Nexus 9300-GX?",
        "Compare QSFP-DD vs OSFP form factors for 800G",
        "Which compatible optics are cheaper than OEM for 100G",
        "What's the migration path from 10G to 100G",
        "SFF-8024 code meanings for transceiver specs",
    ),
    "switch": (
        "What are the differences between Cisco Nexus 9300-GX and 9300-FX?",
        "Which Arista EOS switches support 800G ports?",
    ),
    "standard": (
        "IEEE 802.3 transceiver requirements",
        "MSA compliance vs interoperability",
    ),
}


@router.get("/query/suggestions")
async def get_query_suggestions(domain: str = settings.LIGHTRAG_DOMAIN):
    """Get example queries for a domain."""
    # Unknown domains fall back to the "transceiver" examples.
    examples = _QUERY_SUGGESTIONS.get(
        domain, _QUERY_SUGGESTIONS[_DEFAULT_SUGGESTION_DOMAIN]
    )
    # Return a fresh list each call so callers cannot mutate the shared table.
    return list(examples)