"""Evaluation service for retrieval quality metrics.""" import logging import math from typing import List, Dict, Any, Optional from sqlalchemy.orm import Session from app.models import EvaluationResult from app.services.retrieval_service import RetrievalService logger = logging.getLogger(__name__) class EvaluationService: """Calculate retrieval quality metrics.""" def __init__(self, session: Session): self.session = session self.retrieval = RetrievalService(session) async def evaluate( self, domain: str, eval_set: str, queries: List[Dict[str, Any]], metrics: List[str], compare_to: Optional[str] = None ) -> Dict[str, Any]: """ Evaluate retrieval quality using evaluation set. Supports metrics: precision@K, recall@K, mrr@K, ndcg@K """ results_per_metric = {} for metric_name in metrics: metric_type, k = self._parse_metric(metric_name) metric_scores = [] for query_obj in queries: # Run hybrid query result = await self.retrieval.hybrid_query( query_text=query_obj.get("query", ""), domain=domain, top_k=k, extract_entities=False ) # Extract retrieved doc IDs retrieved_ids = [r.get("id") for r in result.get("results", [])] ground_truth_ids = query_obj.get("ground_truth_doc_ids", []) # Calculate metric for this query if metric_type == "precision": score = self._precision_at_k(retrieved_ids, ground_truth_ids, k) elif metric_type == "recall": score = self._recall_at_k(retrieved_ids, ground_truth_ids, k) elif metric_type == "mrr": score = self._mrr_at_k(retrieved_ids, ground_truth_ids, k) elif metric_type == "ndcg": score = self._ndcg_at_k(retrieved_ids, ground_truth_ids, k) else: score = 0.0 metric_scores.append(score) # Average across all queries avg_score = sum(metric_scores) / len(metric_scores) if metric_scores else 0.0 # Get baseline for comparison baseline_value = None improvement_pct = None if compare_to: baseline_value = self._get_baseline(eval_set, metric_name, compare_to) if baseline_value is not None: improvement_pct = ( ((avg_score - baseline_value) / baseline_value * 100) if baseline_value > 0 else 0 ) results_per_metric[metric_name] = { "metric": metric_name, "value": avg_score, "baseline_value": baseline_value, "improvement_pct": improvement_pct } # Store evaluation result self._store_evaluation_result( eval_set, domain, metric_name, avg_score, baseline_value, improvement_pct ) return { "eval_set": eval_set, "domain": domain, "metrics": list(results_per_metric.values()), "total_queries": len(queries), "latency_p95_ms": 0, # TODO: track actual latency "entity_extraction_accuracy": 0 # TODO: calculate from extracted vs ground truth } def _parse_metric(self, metric_name: str) -> tuple: """Parse metric name like 'precision@5' into ('precision', 5).""" parts = metric_name.split("@") if len(parts) == 2: metric_type = parts[0].lower() k = int(parts[1]) return metric_type, k return metric_name.lower(), 10 # Default K=10 def _precision_at_k( self, retrieved: List[str], ground_truth: List[str], k: int ) -> float: """Precision@K: % of top-K results that are relevant.""" if not retrieved or not ground_truth: return 0.0 top_k = retrieved[:k] relevant_count = sum(1 for doc_id in top_k if doc_id in ground_truth) return relevant_count / len(top_k) if top_k else 0.0 def _recall_at_k( self, retrieved: List[str], ground_truth: List[str], k: int ) -> float: """Recall@K: % of relevant documents that appear in top-K.""" if not ground_truth: return 0.0 top_k = retrieved[:k] relevant_count = sum(1 for doc_id in top_k if doc_id in ground_truth) return relevant_count / len(ground_truth) if ground_truth else 0.0 def _mrr_at_k( self, retrieved: List[str], ground_truth: List[str], k: int ) -> float: """Mean Reciprocal Rank: inverse of rank of first relevant result.""" if not ground_truth: return 0.0 top_k = retrieved[:k] for rank, doc_id in enumerate(top_k, 1): if doc_id in ground_truth: return 1.0 / rank return 0.0 def _ndcg_at_k( self, retrieved: List[str], ground_truth: List[str], k: int ) -> float: """Normalized Discounted Cumulative Gain.""" if not ground_truth or not retrieved: return 0.0 # Create relevance scores (1 if in ground truth, 0 otherwise) dcg = 0.0 for rank, doc_id in enumerate(retrieved[:k], 1): if doc_id in ground_truth: dcg += 1.0 / math.log2(rank + 1) # Calculate ideal DCG idcg = 0.0 for rank in range(1, min(len(ground_truth) + 1, k + 1)): idcg += 1.0 / math.log2(rank + 1) return dcg / idcg if idcg > 0 else 0.0 def _get_baseline( self, eval_set: str, metric_name: str, method: str ) -> Optional[float]: """Get baseline metric value for comparison.""" # Hardcoded baselines from eval.py baselines = { "transceiver-50qa": { "precision@5": 0.65, "recall@10": 0.72, "mrr@5": 0.58, "ndcg@10": 0.70 } } if eval_set not in baselines: return None return baselines[eval_set].get(metric_name) def _store_evaluation_result( self, eval_set: str, domain: str, metric_name: str, metric_value: float, baseline_value: Optional[float], improvement_pct: Optional[float] ): """Store evaluation result in database.""" try: result = EvaluationResult( eval_set_name=eval_set, domain=domain, metric_name=metric_name, metric_value=metric_value, baseline_value=baseline_value, improvement_pct=improvement_pct ) self.session.add(result) self.session.commit() except Exception as e: logger.error(f"Error storing evaluation result: {e}") self.session.rollback()