"""Document ingestion route for knowledge graph building.""" from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends from pydantic import BaseModel from typing import List, Optional import logging import uuid from app.config import settings from app.db import get_session from app.services.ingestion_service import IngestionService logger = logging.getLogger(__name__) router = APIRouter() class DocumentInput(BaseModel): title: str content: str source: str # blog, datasheet, standard metadata: Optional[dict] = None class IngestRequest(BaseModel): domain: str = settings.LIGHTRAG_DOMAIN documents: List[DocumentInput] batch_size: int = 10 class IngestResponse(BaseModel): job_id: str status: str # queued, processing, completed documents_submitted: int estimated_time_sec: float class IngestStatus(BaseModel): job_id: str status: str # processing, completed, failed documents_processed: int documents_failed: int total_documents: int entities_extracted: int entities_linked: int latency_ms: float # Track ingestion jobs in memory (should use Redis in production) ingestion_jobs = {} @router.post("/ingest", response_model=IngestResponse) async def ingest_documents( req: IngestRequest, background_tasks: BackgroundTasks, session = Depends(get_session) ): """ Submit documents for knowledge graph ingestion. Pipeline: 1. Entity extraction (LLM-powered) 2. Entity linking (fuzzy match + vector similarity) 3. Relation extraction 4. Embedding + Qdrant indexing 5. PostgreSQL storage """ if not req.documents: raise HTTPException(status_code=400, detail="No documents provided") if len(req.documents) > 1000: raise HTTPException(status_code=400, detail="Max 1000 documents per request") job_id = str(uuid.uuid4()) estimated_time = len(req.documents) * 2 / 60 # ~2sec per doc # Track job ingestion_jobs[job_id] = { "status": "queued", "documents_submitted": len(req.documents), "documents_processed": 0, "documents_failed": 0, "entities_extracted": 0, "entities_linked": 0, } # Queue background task background_tasks.add_task( _process_ingestion, job_id=job_id, domain=req.domain, documents=req.documents, batch_size=req.batch_size, session=session ) return IngestResponse( job_id=job_id, status="queued", documents_submitted=len(req.documents), estimated_time_sec=estimated_time ) async def _process_ingestion( job_id: str, domain: str, documents: List[DocumentInput], batch_size: int, session ): """Background task to process document ingestion.""" try: ingestion_jobs[job_id]["status"] = "processing" ingestion = IngestionService(session) for i in range(0, len(documents), batch_size): batch = documents[i:i+batch_size] batch_dicts = [ { "title": doc.title, "content": doc.content, "source": doc.source, "metadata": doc.metadata } for doc in batch ] result = await ingestion.process_batch( domain=domain, documents=batch_dicts ) ingestion_jobs[job_id]["documents_processed"] += result["processed"] ingestion_jobs[job_id]["documents_failed"] += result["failed"] ingestion_jobs[job_id]["entities_extracted"] += result["entities_extracted"] ingestion_jobs[job_id]["entities_linked"] += result["entities_linked"] ingestion_jobs[job_id]["status"] = "completed" logger.info(f"Ingestion job {job_id} completed") except Exception as e: ingestion_jobs[job_id]["status"] = "failed" ingestion_jobs[job_id]["error"] = str(e) logger.error(f"Ingestion job {job_id} failed: {e}", exc_info=True) @router.get("/ingest/status/{job_id}", response_model=IngestStatus) async def get_ingest_status(job_id: str): """Get status of an ingestion job.""" if job_id not in ingestion_jobs: raise HTTPException(status_code=404, detail="Job not found") job = ingestion_jobs[job_id] return IngestStatus( job_id=job_id, status=job["status"], documents_processed=job["documents_processed"], documents_failed=job["documents_failed"], total_documents=job["documents_submitted"], entities_extracted=job["entities_extracted"], entities_linked=job["entities_linked"], latency_ms=0 # TODO: track actual latency ) @router.post("/ingest/rebuild") async def rebuild_index( domain: str = settings.LIGHTRAG_DOMAIN, background_tasks: BackgroundTasks = None ): """ Rebuild the entire Qdrant index from PostgreSQL. Use after: - Embedding model changes - Qdrant corruption - Schema changes """ job_id = str(uuid.uuid4()) if background_tasks: background_tasks.add_task( _rebuild_index_task, job_id=job_id, domain=domain ) return { "job_id": job_id, "status": "queued", "message": f"Index rebuild queued for domain '{domain}'" } async def _rebuild_index_task(job_id: str, domain: str): """Background task to rebuild Qdrant index.""" try: ingestion_jobs[job_id] = { "status": "processing", "type": "rebuild", "documents_processed": 0 } # TODO: Implement full index rebuild ingestion_jobs[job_id]["status"] = "completed" except Exception as e: ingestion_jobs[job_id]["status"] = "failed" ingestion_jobs[job_id]["error"] = str(e)