- Complete Fastify gateway with 8-stage pipeline - Circuit breaker (opossum) per model tier - Rate limiting per caller - Ban list validation (EN/DE/auto-detected) - TIP validator (SFF-8024, part numbers, wavelengths) - Prometheus metrics - pg-boss async queue - PostgreSQL audit log + review queue - 9 prompt templates (TIP, LinkedIn, ShieldX) - Learning engine scaffolding - Auto-learning: ban-list, few-shot, routing, prompt optimizer
241 lines
7.8 KiB
Python
Executable File
241 lines
7.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
manual_trigger.py - Manually trigger a fine-tuning run without waiting for the 30-minute poll.
|
|
|
|
Usage:
|
|
# Task-specific LoRA for a single task_type
|
|
python3 scripts/manual_trigger.py --task-type tip-transceiver-enrich
|
|
|
|
# Task-specific with a lower minimum example count
|
|
python3 scripts/manual_trigger.py --task-type linkedin-post-de --min-examples 50
|
|
|
|
# General fine-tuning across all task types
|
|
python3 scripts/manual_trigger.py --general
|
|
|
|
# DPO preference learning
|
|
python3 scripts/manual_trigger.py --dpo
|
|
|
|
# Dry-run: show what would trigger without running anything
|
|
python3 scripts/manual_trigger.py --general --dry-run
|
|
|
|
# Use a specific config file
|
|
python3 scripts/manual_trigger.py --task-type linkedin-post-de --config /path/to/fine_tuner.yaml
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
# Make the `src` package importable when this file is executed directly as a
# script: the fine-tuner root ends up first on sys.path, immediately followed
# by the workspace root two levels above it.
_REPO_ROOT = Path(__file__).parent.parent
sys.path[:0] = [
    str(_REPO_ROOT),  # fine-tuner root
    str(_REPO_ROOT.parent.parent),  # workspace root
]
|
|
|
|
from src.main import _connect, _create_run_record, load_config, run_fine_tuning
|
|
from src.scheduler import (
|
|
list_active_task_types,
|
|
should_trigger_dpo,
|
|
should_trigger_general,
|
|
should_trigger_task_specific,
|
|
)
|
|
from src.data_collector import get_corpus_stats
|
|
|
|
# Root logging setup for the CLI: INFO level, with timestamp, level and
# logger name ahead of every message.
_LOG_FORMAT = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger used by everything in this script.
logger = logging.getLogger("manual_trigger")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _positive_int(text: str) -> int:
    """argparse ``type=`` callback: parse *text* as an integer >= 1.

    Raises:
        argparse.ArgumentTypeError: if *text* is not an integer or is < 1.
    """
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"expected a positive integer, got {text!r}"
        ) from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be at least 1, got {value}")
    return value


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the manual trigger script.

    Exactly one mode flag is required (``--task-type``, ``--general``,
    ``--dpo`` or ``--status``); the modes are mutually exclusive.
    ``--min-examples``, ``--dry-run``, ``--config`` and ``--force`` are
    optional modifiers.

    Returns:
        The configured ``argparse.ArgumentParser``. The module docstring is
        shown verbatim as the help epilog.
    """
    parser = argparse.ArgumentParser(
        description="Manually trigger LLM Gateway fine-tuning runs.",
        # Raw formatter keeps the usage examples in the epilog laid out as written.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument(
        "--task-type",
        metavar="TASK_TYPE",
        help="Run task-specific LoRA fine-tuning for this task_type.",
    )
    mode.add_argument(
        "--general",
        action="store_true",
        help="Run general SFT fine-tuning across all task types.",
    )
    mode.add_argument(
        "--dpo",
        action="store_true",
        help="Run DPO preference learning.",
    )
    mode.add_argument(
        "--status",
        action="store_true",
        help="Show corpus statistics and trigger eligibility, then exit.",
    )

    parser.add_argument(
        "--min-examples",
        # Reject 0 and negatives: a non-positive minimum can never fail the
        # "enough examples" comparison and would silently behave like --force.
        type=_positive_int,
        default=None,
        help="Override minimum example count for this run (bypasses threshold check).",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be triggered without actually running anything.",
    )
    parser.add_argument(
        "--config",
        metavar="PATH",
        default=None,
        help="Path to fine_tuner.yaml (default: config/fine_tuner.yaml).",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Skip trigger threshold checks and run regardless of example counts.",
    )

    return parser
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status report
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def print_status(conn, cfg: dict) -> None:
    """Write a human-readable status report to stdout.

    The report shows the configured endpoints (``database_url`` with
    everything up to the last ``@`` removed, ``gateway_url``, ``ollama_url``),
    one table row per active task type with its corpus counts and whether a
    task-specific run would trigger, and finally the DPO pair count plus the
    general and DPO trigger decisions.

    Args:
        conn: Database connection handed to the stats and scheduler helpers.
        cfg: Loaded configuration; must contain ``database_url``,
            ``gateway_url`` and ``ollama_url``.
    """

    def yes_no(flag) -> str:
        return "YES" if flag else "no"

    row_template = "{:<35} {:>6} {:>10} {:>10}"
    no_data = {"total": 0, "available_positive": 0}

    corpus = get_corpus_stats(conn)
    active_types = list_active_task_types(conn)

    print("\n=== LLM Gateway Fine-Tuner Status ===\n")
    # Keep only the part after the last '@' so credentials are not echoed.
    db_location = cfg["database_url"].rpartition("@")[2]
    print(f"DB: {db_location}")
    print(f"Gateway: {cfg['gateway_url']}")
    print(f"Ollama: {cfg['ollama_url']}")
    print()

    print("--- Corpus by Task Type ---")
    print(row_template.format("Task Type", "Total", "Available", "Trigger?"))
    print("-" * 65)

    for name in active_types:
        # Task types with no corpus entry are reported with zero counts.
        counts = corpus["by_task_type"].get(name, no_data)
        would_trigger = should_trigger_task_specific(conn, name)
        print(
            row_template.format(
                name,
                counts["total"],
                counts["available_positive"],
                yes_no(would_trigger),
            )
        )

    print()
    print(f"DPO pairs available: {corpus['dpo_pairs_available']}")
    print(f"General trigger: {yes_no(should_trigger_general(conn))}")
    print(f"DPO trigger: {yes_no(should_trigger_dpo(conn))}")
    print()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _task_specific_allowed(conn, cfg: dict, args: argparse.Namespace) -> bool:
    """Decide whether a non-forced task-specific run for ``args.task_type`` may proceed.

    Returns True when the scheduler's own threshold is met, or when
    ``--min-examples`` was given and at least that many positive examples
    exist. Otherwise logs the reason and returns False.
    """
    task_type = args.task_type
    if should_trigger_task_specific(conn, task_type):
        return True

    if args.min_examples is None:
        logger.warning(
            "Task %s does not meet trigger thresholds. "
            "Use --force to run anyway, or --min-examples N to override.",
            task_type,
        )
        print_status(conn, cfg)
        return False

    # Override threshold — just check the count manually.
    # Imported lazily: only this override path needs it.
    from src.data_collector import collect_positive_examples

    examples = collect_positive_examples(conn, task_type=task_type)
    if len(examples) < args.min_examples:
        logger.error(
            "Not enough examples for %s: need %d, found %d",
            task_type,
            args.min_examples,
            len(examples),
        )
        return False

    logger.info(
        "Threshold override: proceeding with %d examples (min-examples=%d)",
        len(examples),
        args.min_examples,
    )
    return True


def _threshold_met(conn, cfg: dict, check, unmet_message: str) -> bool:
    """Run the scheduler predicate *check*; on failure warn and print the status report."""
    if check(conn):
        return True
    logger.warning(unmet_message)
    print_status(conn, cfg)
    return False


def main() -> int:
    """Entry point: parse arguments, validate the requested run, and start it.

    Flow: load the config, connect to the database, then either print the
    status report (``--status``) or resolve the run type from the mode flag,
    apply the trigger-threshold checks unless ``--force`` is set, and finally
    either describe the trigger (``--dry-run``) or call ``run_fine_tuning``.
    The database connection is always closed before returning.

    Returns:
        0 on success (including ``--status`` and ``--dry-run``); 1 when the
        database is unreachable, the task type is blank, or a threshold
        check fails.
    """
    args = build_parser().parse_args()

    cfg = load_config(args.config)
    db_url = cfg["database_url"]

    try:
        conn = _connect(db_url)
    except Exception as exc:
        logger.error("Cannot connect to database: %s", exc)
        return 1

    try:
        if args.status:
            print_status(conn, cfg)
            return 0

        # Determine trigger.
        # Compare against None rather than truthiness: an empty --task-type
        # value must not fall through into the --general / --dpo branches.
        if args.task_type is not None:
            task_type = args.task_type
            if not task_type.strip():
                logger.error("--task-type must not be empty.")
                return 1
            run_type = "task_specific"
            allowed = args.force or _task_specific_allowed(conn, cfg, args)
        else:
            task_type = None
            if args.min_examples is not None:
                # The override is only consulted for task-specific runs.
                logger.warning(
                    "--min-examples only applies to --task-type runs; ignoring it."
                )
            if args.general:
                run_type = "general"
                allowed = args.force or _threshold_met(
                    conn,
                    cfg,
                    should_trigger_general,
                    "General fine-tuning threshold not met. Use --force to run anyway.",
                )
            else:  # --dpo
                run_type = "dpo"
                allowed = args.force or _threshold_met(
                    conn,
                    cfg,
                    should_trigger_dpo,
                    "DPO threshold not met. Use --force to run anyway.",
                )

        if not allowed:
            return 1

        trigger = {"run_type": run_type, "task_type": task_type}

        if args.dry_run:
            print(f"\nDRY RUN — would trigger: {trigger}")
            print("No training was started (--dry-run).")
            return 0

        logger.info("Manual trigger: %s", trigger)
        run_fine_tuning(conn, trigger, cfg)
        logger.info("Manual trigger complete.")
        return 0

    finally:
        conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|