"""Document ingestion service for knowledge graph building."""
import logging
import json
import uuid
from typing import List, Optional, Dict, Any
from datetime import datetime

from sqlalchemy.orm import Session
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import httpx

from app.config import settings
from app.models import Document, Entity, Relation

logger = logging.getLogger(__name__)

# Used only if the embedding model cannot report its own output dimension.
DEFAULT_VECTOR_SIZE = 384

# Characters of document text sent to Ollama per extraction request
# (truncation keeps the prompt within the model's context limit).
MAX_EXTRACTION_CHARS = 2000

# Seconds to wait for a single Ollama generate call.
OLLAMA_TIMEOUT_SECONDS = 30


class IngestionService:
    """Process documents for knowledge graph ingestion.

    Wraps a SQLAlchemy session together with the sentence-transformers
    embedding model, the Qdrant client and the Ollama endpoint configured in
    ``app.config.settings``.
    """

    def __init__(self, session: Session):
        self.session = session
        self.embedding_model = SentenceTransformer(settings.EMBEDDING_MODEL)
        self.qdrant_client = QdrantClient(url=settings.QDRANT_URL)
        # Derive the vector size from the model itself so that changing
        # EMBEDDING_MODEL cannot create collections whose dimension disagrees
        # with the vectors being upserted.
        self.vector_size = (
            self.embedding_model.get_sentence_embedding_dimension()
            or DEFAULT_VECTOR_SIZE
        )
        self.ollama_url = settings.OLLAMA_URL
        self.ollama_model = settings.OLLAMA_MODEL
        # Qdrant collections this instance has already confirmed or created,
        # so the existence check is not repeated for every document.
        self._known_collections = set()

    async def process_batch(
        self,
        domain: str,
        documents: List[Dict[str, Any]]
    ) -> Dict[str, int]:
        """
        Process a batch of documents through the ingestion pipeline.

        For each document:
          1. Entity extraction via Ollama.
          2. Entity linking (match by name within ``domain``, else create).
          3. Embedding + storage of a ``Document`` row, committed per document.
          4. Best-effort indexing of the document vector in Qdrant.

        NOTE(review): relation extraction is not performed here even though
        ``Relation`` is imported - TODO implement or remove from the design.

        Args:
            domain: Knowledge-graph domain; scopes entity lookup and selects
                the ``documents_{domain}`` Qdrant collection.
            documents: Dicts with optional ``title``, ``content``, ``source``
                and ``metadata`` keys.

        Returns:
            Counters: ``processed`` (documents committed), ``failed``
            (documents rolled back), ``entities_extracted`` and
            ``entities_linked`` (both counted even for failed documents).
            A Qdrant indexing failure is logged but does not mark the
            document as failed.
        """
        stats = {
            "processed": 0,
            "failed": 0,
            "entities_extracted": 0,
            "entities_linked": 0
        }

        for doc_data in documents:
            try:
                content = doc_data.get("content", "")
                title = doc_data.get("title", "")
                source = doc_data.get("source", "")

                # Extract entities from document text.
                entities = await self._extract_entities(content, domain)
                stats["entities_extracted"] += len(entities)

                # Link entities (deduplicate, match to existing).
                linked_entities = await self._link_entities(entities, domain)
                stats["entities_linked"] += len(linked_entities)

                doc_embedding = self.embedding_model.encode(
                    content,
                    convert_to_numpy=True
                ).tolist()

                doc_id = str(uuid.uuid4())
                document = Document(
                    id=doc_id,
                    domain=domain,
                    title=title,
                    content=content,
                    source=source,
                    entity_ids=[e["id"] for e in linked_entities],
                    embedding=doc_embedding,
                    metadata=doc_data.get("metadata", {})
                )
                self.session.add(document)
                # Commit before touching Qdrant: if the commit fails, the
                # rollback below leaves no vector pointing at a document that
                # was never stored.
                self.session.commit()
            except Exception as e:
                logger.exception("Document processing error: %s", e)
                stats["failed"] += 1
                self.session.rollback()
                continue

            # Best effort: errors are logged inside _index_in_qdrant and do
            # not undo the committed document.
            await self._index_in_qdrant(
                doc_id, domain, title, content, source, doc_embedding
            )
            stats["processed"] += 1

        return stats

    async def _extract_entities(
        self,
        content: str,
        domain: str
    ) -> List[Dict[str, Any]]:
        """Extract entities from document text using Ollama.

        Only the first ``MAX_EXTRACTION_CHARS`` characters are sent to the
        model. ``domain`` is accepted for interface symmetry but is not used
        in the prompt.

        Returns:
            Entity dicts as produced by the model (expected keys ``name``,
            ``type``, ``description``), restricted to dicts with a non-empty
            string ``name``. Returns ``[]`` for empty/non-string content, a
            non-200 reply, transport errors, or an unparseable reply.
        """
        if not isinstance(content, str) or not content.strip():
            # Nothing to extract; skip the LLM round trip.
            return []

        try:
            content_chunk = content[:MAX_EXTRACTION_CHARS]

            prompt = f"""Extract all entities from this text. Return JSON with list of entities.
Each entity should have: name, type (e.g., transceiver, vendor, standard), description.

Text:
{content_chunk}

Return ONLY valid JSON in this format:
{{"entities": [{{"name": "...", "type": "...", "description": "..."}}]}}"""

            async with httpx.AsyncClient(timeout=OLLAMA_TIMEOUT_SECONDS) as client:
                response = await client.post(
                    f"{self.ollama_url}/api/generate",
                    json={
                        "model": self.ollama_model,
                        "prompt": prompt,
                        "stream": False
                    }
                )

            if response.status_code != 200:
                logger.error("Ollama error: %s", response.text)
                return []

            response_text = response.json().get("response", "")
        except Exception as e:
            logger.error("Entity extraction error: %s", e)
            return []

        return self._parse_entities(response_text)

    @staticmethod
    def _parse_entities(response_text: Any) -> List[Dict[str, Any]]:
        """Pull the ``entities`` list out of a (possibly chatty) LLM reply.

        The span from the first ``{`` to the last ``}`` is parsed as JSON.
        Items that are not dicts with a non-empty string ``name`` are dropped.
        """
        if not isinstance(response_text, str):
            return []

        start = response_text.find("{")
        end = response_text.rfind("}") + 1
        if start < 0 or end <= start:
            logger.warning("Ollama response contained no JSON object")
            return []

        try:
            parsed = json.loads(response_text[start:end])
        except json.JSONDecodeError:
            logger.warning("Failed to parse Ollama JSON response")
            return []

        raw = parsed.get("entities", []) if isinstance(parsed, dict) else []
        if not isinstance(raw, list):
            return []

        return [
            item for item in raw
            if isinstance(item, dict)
            and isinstance(item.get("name"), str)
            and item["name"].strip()
        ]

    async def _link_entities(
        self,
        entities: List[Dict[str, Any]],
        domain: str
    ) -> List[Dict[str, Any]]:
        """Link extracted entities to existing entities or create new ones.

        Matching is by exact (whitespace-stripped) name within ``domain``.
        New entities get an embedding of their name and are flushed, not
        committed; the caller owns the transaction.

        Returns:
            One ``{"id", "name", "type"}`` dict per distinct entity, in first
            occurrence order. Entities without a usable name, and entities
            whose lookup/creation raised, are skipped (the error is logged).
        """
        linked = []
        seen_ids = set()

        for entity in entities:
            try:
                name = entity.get("name")
                name = name.strip() if isinstance(name, str) else ""
                if not name:
                    # A nameless entity cannot be matched or usefully stored.
                    continue

                existing = self.session.query(Entity).filter(
                    Entity.domain == domain,
                    Entity.name == name
                ).first()

                if existing:
                    record = {
                        "id": str(existing.id),
                        "name": existing.name,
                        "type": existing.entity_type
                    }
                else:
                    entity_type = entity.get("type") or "unknown"
                    entity_id = uuid.uuid4()
                    entity_embedding = self.embedding_model.encode(
                        name,
                        convert_to_numpy=True
                    )

                    new_entity = Entity(
                        id=entity_id,
                        domain=domain,
                        name=name,
                        description=entity.get("description", ""),
                        entity_type=entity_type,
                        embedding=entity_embedding.tolist(),
                        # Fixed confidence assigned to newly created entities.
                        confidence=0.8
                    )
                    self.session.add(new_entity)
                    # Flush so the new row is visible to later lookups in
                    # this transaction (e.g. the same name repeated below).
                    # NOTE(review): a failed flush leaves the session needing
                    # a rollback; later entities then fail too and
                    # process_batch rolls back the whole document. Consider
                    # session.begin_nested() per entity.
                    self.session.flush()

                    record = {
                        "id": str(entity_id),
                        "name": name,
                        "type": entity_type
                    }

                # The same entity mentioned twice must not be linked twice.
                if record["id"] in seen_ids:
                    continue
                seen_ids.add(record["id"])
                linked.append(record)

            except Exception as e:
                logger.error("Entity linking error: %s", e)
                continue

        return linked

    async def _index_in_qdrant(
        self,
        doc_id: str,
        domain: str,
        title: str,
        content: str,
        source: str,
        embedding: List[float]
    ):
        """Index a document in Qdrant; best effort, errors are logged only.

        The point is upserted into ``documents_{domain}`` (created on first use
        with cosine distance). Its payload carries the document fields.
        """
        collection_name = f"documents_{domain}"
        try:
            self._ensure_collection(collection_name)

            point = PointStruct(
                # Stable across processes and collision-free, unlike the
                # salted built-in hash() of the id string.
                id=self._point_id(doc_id),
                vector=embedding,
                payload={
                    "doc_id": doc_id,
                    "title": title,
                    "content": content,
                    "source": source,
                    "domain": domain
                }
            )

            self.qdrant_client.upsert(
                collection_name=collection_name,
                points=[point]
            )
        except Exception as e:
            logger.error("Qdrant indexing error: %s", e)

    def _ensure_collection(self, collection_name: str) -> None:
        """Create ``collection_name`` in Qdrant unless it is known to exist."""
        if collection_name in self._known_collections:
            return
        try:
            self.qdrant_client.get_collection(collection_name)
        except Exception:
            # Any lookup failure is treated as "collection missing"; if it was
            # really a connectivity problem, create_collection raises and the
            # caller logs it.
            self.qdrant_client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(
                    size=self.vector_size,
                    distance=Distance.COSINE
                )
            )
        self._known_collections.add(collection_name)

    @staticmethod
    def _point_id(doc_id: str) -> str:
        """Map a document id to a deterministic Qdrant point id (UUID string)."""
        try:
            return str(uuid.UUID(str(doc_id)))
        except ValueError:
            # Non-UUID ids get a name-based UUIDv5, so re-indexing the same
            # document overwrites its point instead of duplicating it.
            return str(uuid.uuid5(uuid.NAMESPACE_URL, str(doc_id)))