Rene Fichtmueller 3a00ff4d33 feat: initial llm-gateway implementation
- Complete Fastify gateway with 8-stage pipeline
- Circuit breaker (opossum) per model tier
- Rate limiting per caller
- Ban list validation (EN/DE/auto-detected)
- TIP validator (SFF-8024, part numbers, wavelengths)
- Prometheus metrics
- pg-boss async queue
- PostgreSQL audit log + review queue
- 9 prompt templates (TIP, LinkedIn, ShieldX)
- Learning engine scaffolding
- Auto-learning: ban-list, few-shot, routing, prompt optimizer
2026-04-02 22:48:55 +02:00

241 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
manual_trigger.py - Manually trigger a fine-tuning run without waiting for the 30-minute poll.
Usage:
# Task-specific LoRA for a single task_type
python3 scripts/manual_trigger.py --task-type tip-transceiver-enrich
# Task-specific with a lower minimum example count
python3 scripts/manual_trigger.py --task-type linkedin-post-de --min-examples 50
# General fine-tuning across all task types
python3 scripts/manual_trigger.py --general
# DPO preference learning
python3 scripts/manual_trigger.py --dpo
# Dry-run: show what would trigger without running anything (a mode flag is still required)
python3 scripts/manual_trigger.py --general --dry-run
# Use a specific config file
python3 scripts/manual_trigger.py --task-type linkedin-post-de --config /path/to/fine_tuner.yaml
"""
from __future__ import annotations
import argparse
import logging
import sys
import uuid
from pathlib import Path
# Ensure the package root is on the path when running as a script.
# Resolve __file__ first: where a script's __file__ is a relative path,
# walking .parent on the unresolved path collapses to "." and the entries
# inserted below would then depend on the current working directory.
_REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_REPO_ROOT.parent.parent))  # workspace root
sys.path.insert(0, str(_REPO_ROOT))  # fine-tuner root
from src.main import _connect, _create_run_record, load_config, run_fine_tuning
from src.scheduler import (
list_active_task_types,
should_trigger_dpo,
should_trigger_general,
should_trigger_task_specific,
)
from src.data_collector import get_corpus_stats
# Configure root logging at import time: INFO level, each record rendered as
# timestamp, level name, logger name in brackets, then the message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)
# Logger used for this script's own messages.
logger = logging.getLogger("manual_trigger")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
    """Assemble the command-line interface for the manual trigger script.

    Exactly one run mode (``--task-type``, ``--general``, ``--dpo`` or
    ``--status``) must be supplied; the remaining options are modifiers that
    can be combined with any mode. The module docstring is shown verbatim as
    the help epilog.

    Returns:
        The fully configured ``argparse.ArgumentParser``.
    """
    cli = argparse.ArgumentParser(
        description="Manually trigger LLM Gateway fine-tuning runs.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    # Run modes are mutually exclusive and one of them is mandatory.
    run_mode = cli.add_mutually_exclusive_group(required=True)
    run_mode.add_argument(
        "--task-type",
        metavar="TASK_TYPE",
        help="Run task-specific LoRA fine-tuning for this task_type.",
    )
    switch_modes = (
        ("--general", "Run general SFT fine-tuning across all task types."),
        ("--dpo", "Run DPO preference learning."),
        ("--status", "Show corpus statistics and trigger eligibility, then exit."),
    )
    for flag, description in switch_modes:
        run_mode.add_argument(flag, action="store_true", help=description)

    # Modifiers, registered in the order they should appear in --help.
    modifiers = (
        (
            "--min-examples",
            {
                "type": int,
                "default": None,
                "help": "Override minimum example count for this run (bypasses threshold check).",
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "Show what would be triggered without actually running anything.",
            },
        ),
        (
            "--config",
            {
                "metavar": "PATH",
                "default": None,
                "help": "Path to fine_tuner.yaml (default: config/fine_tuner.yaml).",
            },
        ),
        (
            "--force",
            {
                "action": "store_true",
                "help": "Skip trigger threshold checks and run regardless of example counts.",
            },
        ),
    )
    for flag, options in modifiers:
        cli.add_argument(flag, **options)

    return cli
# ---------------------------------------------------------------------------
# Status report
# ---------------------------------------------------------------------------
def print_status(conn, cfg: dict) -> None:
    """Write a human-readable status report to stdout.

    The report shows the configured endpoints, one row per active task type
    (total examples, available positive examples, and whether a task-specific
    run would trigger), followed by the DPO pair count and the general / DPO
    trigger verdicts.

    Args:
        conn: Open database connection, passed through to the corpus and
            scheduler helpers.
        cfg: Loaded configuration; ``database_url``, ``gateway_url`` and
            ``ollama_url`` are read from it.
    """

    def yes_no(flag) -> str:
        # Eligible entries are upper-cased so they stand out in the table.
        return "YES" if flag else "no"

    corpus = get_corpus_stats(conn)
    active_types = list_active_task_types(conn)

    print("\n=== LLM Gateway Fine-Tuner Status ===\n")
    # Keep only the text after the last "@" so credentials are not printed.
    db_location = cfg["database_url"].rpartition("@")[2]
    print(f"DB: {db_location}")
    gateway = cfg["gateway_url"]
    print(f"Gateway: {gateway}")
    ollama = cfg["ollama_url"]
    print(f"Ollama: {ollama}")
    print()

    print("--- Corpus by Task Type ---")
    print(f"{'Task Type':<35} {'Total':>6} {'Available':>10} {'Trigger?':>10}")
    print("-" * 65)

    # Task types with no corpus entry are reported with zero counts.
    no_data = {"total": 0, "available_positive": 0}
    for name in active_types:
        counts = corpus["by_task_type"].get(name, no_data)
        eligible = should_trigger_task_specific(conn, name)
        left_columns = f"{name:<35} {counts['total']:>6} {counts['available_positive']:>10} "
        print(left_columns + f"{yes_no(eligible):>10}")
    print()

    dpo_pairs = corpus["dpo_pairs_available"]
    print(f"DPO pairs available: {dpo_pairs}")
    general_verdict = yes_no(should_trigger_general(conn))
    print(f"General trigger: {general_verdict}")
    dpo_verdict = yes_no(should_trigger_dpo(conn))
    print(f"DPO trigger: {dpo_verdict}")
    print()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _task_specific_allowed(conn, cfg: dict, task_type: str, min_examples: int | None) -> bool:
    """Decide whether a task-specific run may proceed without ``--force``.

    Args:
        conn: Open database connection.
        cfg: Loaded configuration, needed only to print the status report.
        task_type: Task type requested on the command line.
        min_examples: Value of ``--min-examples``, or ``None`` if not given.

    Returns:
        ``True`` when the scheduler threshold is met, or when it is not met
        but at least ``min_examples`` positive examples exist. ``False``
        otherwise, after logging the reason.
    """
    if should_trigger_task_specific(conn, task_type):
        return True

    if min_examples is None:
        logger.warning(
            "Task %s does not meet trigger thresholds. "
            "Use --force to run anyway, or --min-examples N to override.",
            task_type,
        )
        print_status(conn, cfg)
        return False

    # Override threshold — just check the count manually
    from src.data_collector import collect_positive_examples

    examples = collect_positive_examples(conn, task_type=task_type)
    if len(examples) < min_examples:
        logger.error(
            "Not enough examples for %s: need %d, found %d",
            task_type,
            min_examples,
            len(examples),
        )
        return False

    logger.info(
        "Threshold override: proceeding with %d examples (min-examples=%d)",
        len(examples),
        min_examples,
    )
    return True


def main() -> int:
    """Parse the command line and run the requested action.

    Returns:
        Process exit code: ``0`` after a status report, a dry run, or a
        completed fine-tuning run; ``1`` when the database connection fails
        or the trigger thresholds are not met (and ``--force`` was not given).
    """
    args = build_parser().parse_args()
    cfg = load_config(args.config)
    db_url = cfg["database_url"]

    try:
        conn = _connect(db_url)
    except Exception as exc:
        logger.error("Cannot connect to database: %s", exc)
        return 1

    try:
        if args.status:
            print_status(conn, cfg)
            return 0

        # --min-examples is only consulted on the task-specific path; say so
        # instead of silently dropping it for the other run modes.
        if args.min_examples is not None and not args.task_type:
            logger.warning(
                "--min-examples only applies to --task-type runs; ignoring it."
            )

        # Determine trigger
        if args.task_type:
            run_type = "task_specific"
            task_type = args.task_type
            if not args.force and not _task_specific_allowed(
                conn, cfg, task_type, args.min_examples
            ):
                return 1
        elif args.general:
            run_type = "general"
            task_type = None
            if not args.force and not should_trigger_general(conn):
                logger.warning(
                    "General fine-tuning threshold not met. Use --force to run anyway."
                )
                print_status(conn, cfg)
                return 1
        else:  # --dpo
            run_type = "dpo"
            task_type = None
            if not args.force and not should_trigger_dpo(conn):
                logger.warning(
                    "DPO threshold not met. Use --force to run anyway."
                )
                print_status(conn, cfg)
                return 1

        trigger = {"run_type": run_type, "task_type": task_type}

        if args.dry_run:
            print(f"\nDRY RUN — would trigger: {trigger}")
            print("No training was started (--dry-run).")
            return 0

        logger.info("Manual trigger: %s", trigger)
        run_fine_tuning(conn, trigger, cfg)
        logger.info("Manual trigger complete.")
        return 0
    finally:
        # Release the connection on every exit path, including errors.
        conn.close()
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())