#!/usr/bin/env python3 """ manual_trigger.py - Manually trigger a fine-tuning run without waiting for the 30-minute poll. Usage: # Task-specific LoRA for a single task_type python3 scripts/manual_trigger.py --task-type tip-transceiver-enrich # Task-specific with a lower minimum example count python3 scripts/manual_trigger.py --task-type linkedin-post-de --min-examples 50 # General fine-tuning across all task types python3 scripts/manual_trigger.py --general # DPO preference learning python3 scripts/manual_trigger.py --dpo # Dry-run: show what would trigger without running anything python3 scripts/manual_trigger.py --dry-run # Use a specific config file python3 scripts/manual_trigger.py --task-type linkedin-post-de --config /path/to/fine_tuner.yaml """ from __future__ import annotations import argparse import logging import sys import uuid from pathlib import Path # Ensure the package root is on the path when running as a script _REPO_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(_REPO_ROOT.parent.parent)) # workspace root sys.path.insert(0, str(_REPO_ROOT)) # fine-tuner root from src.main import _connect, _create_run_record, load_config, run_fine_tuning from src.scheduler import ( list_active_task_types, should_trigger_dpo, should_trigger_general, should_trigger_task_specific, ) from src.data_collector import get_corpus_stats logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s [%(name)s] %(message)s", ) logger = logging.getLogger("manual_trigger") # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Manually trigger LLM Gateway fine-tuning runs.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) mode = parser.add_mutually_exclusive_group(required=True) mode.add_argument( "--task-type", metavar="TASK_TYPE", help="Run task-specific LoRA fine-tuning for this task_type.", ) mode.add_argument( "--general", action="store_true", help="Run general SFT fine-tuning across all task types.", ) mode.add_argument( "--dpo", action="store_true", help="Run DPO preference learning.", ) mode.add_argument( "--status", action="store_true", help="Show corpus statistics and trigger eligibility, then exit.", ) parser.add_argument( "--min-examples", type=int, default=None, help="Override minimum example count for this run (bypasses threshold check).", ) parser.add_argument( "--dry-run", action="store_true", help="Show what would be triggered without actually running anything.", ) parser.add_argument( "--config", metavar="PATH", default=None, help="Path to fine_tuner.yaml (default: config/fine_tuner.yaml).", ) parser.add_argument( "--force", action="store_true", help="Skip trigger threshold checks and run regardless of example counts.", ) return parser # --------------------------------------------------------------------------- # Status report # --------------------------------------------------------------------------- def print_status(conn, cfg: dict) -> None: """Print corpus statistics and trigger eligibility for all task types.""" stats = get_corpus_stats(conn) task_types = list_active_task_types(conn) print("\n=== LLM Gateway Fine-Tuner Status ===\n") print(f"DB: {cfg['database_url'].split('@')[-1]}") # hide credentials print(f"Gateway: {cfg['gateway_url']}") print(f"Ollama: {cfg['ollama_url']}") print() print("--- Corpus by Task Type ---") print(f"{'Task Type':<35} {'Total':>6} {'Available':>10} {'Trigger?':>10}") print("-" * 65) for task_type in task_types: info = stats["by_task_type"].get(task_type, {"total": 0, "available_positive": 0}) trigger = should_trigger_task_specific(conn, task_type) print( f"{task_type:<35} {info['total']:>6} {info['available_positive']:>10} " f"{'YES' if trigger else 'no':>10}" ) print() print(f"DPO pairs available: {stats['dpo_pairs_available']}") print(f"General trigger: {'YES' if should_trigger_general(conn) else 'no'}") print(f"DPO trigger: {'YES' if should_trigger_dpo(conn) else 'no'}") print() # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> int: parser = build_parser() args = parser.parse_args() cfg = load_config(args.config) db_url = cfg["database_url"] try: conn = _connect(db_url) except Exception as exc: logger.error("Cannot connect to database: %s", exc) return 1 try: if args.status: print_status(conn, cfg) return 0 # Determine trigger if args.task_type: run_type = "task_specific" task_type = args.task_type if not args.force: eligible = should_trigger_task_specific(conn, task_type) if not eligible: if args.min_examples is not None: # Override threshold — just check the count manually from src.data_collector import collect_positive_examples examples = collect_positive_examples(conn, task_type=task_type) if len(examples) < args.min_examples: logger.error( "Not enough examples for %s: need %d, found %d", task_type, args.min_examples, len(examples), ) return 1 logger.info( "Threshold override: proceeding with %d examples (min-examples=%d)", len(examples), args.min_examples, ) else: logger.warning( "Task %s does not meet trigger thresholds. " "Use --force to run anyway, or --min-examples N to override.", task_type, ) print_status(conn, cfg) return 1 elif args.general: run_type = "general" task_type = None if not args.force and not should_trigger_general(conn): logger.warning( "General fine-tuning threshold not met. Use --force to run anyway." ) print_status(conn, cfg) return 1 else: # --dpo run_type = "dpo" task_type = None if not args.force and not should_trigger_dpo(conn): logger.warning( "DPO threshold not met. Use --force to run anyway." ) print_status(conn, cfg) return 1 trigger = {"run_type": run_type, "task_type": task_type} if args.dry_run: print(f"\nDRY RUN — would trigger: {trigger}") print("No training was started (--dry-run).") return 0 logger.info("Manual trigger: %s", trigger) run_fine_tuning(conn, trigger, cfg) logger.info("Manual trigger complete.") return 0 finally: conn.close() if __name__ == "__main__": sys.exit(main())