llm-gateway/packages/fine-tuner/scripts/load_blog_data_ssh.py
Rene Fichtmueller 2ca77d0aee feat: Phase 2F — Multi-Agent Integration (ADRs + Client Fallback + Tests)
- ADR-0001: Multi-Agent Coworking Architecture with LLM Gateway Orchestrator
- ADR-0002: Tier Assignment Strategy for Model Selection (cost-first escalation)
- ADR-0003: Confidence Gate Thresholds & Learning Cycle Intervals (6h/12h/24h cycles)
- ADR-0004: External Provider Fallback Chain Ordering (Cerebras → Groq → Mistral)
- Enhanced client SDK: Offline Ollama fallback, health checks, exponential backoff retry
- Integration tests: claude-code-integration.test.ts (14 test cases)
- PHASE_2F_DEPLOYMENT.md: Pre-deployment checklist, automated deploy, rollback plan
- Post-deployment verification procedures for health, client fallback, metrics
2026-04-19 21:39:44 +02:00

137 lines
4.1 KiB
Python

#!/usr/bin/env python3
"""Load blog-training-alpaca.jsonl via SSH tunnel to Erik's PostgreSQL."""
import json
import math
import os
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from threading import Thread
JSONL_FILE = Path(__file__).parent.parent / "data" / "blog-training-alpaca.jsonl"
ERIK_HOST = "192.168.178.82"
ERIK_USER = "rene"
DB_HOST = "localhost"
DB_PORT = 5432
LOCAL_PORT = 15432
DB_USER = "llm"
# NOTE(review): this credential is committed in source. Consider moving it to
# the environment or ~/.pgpass; left unchanged so the script keeps working.
DB_PASS = "llm_secure_2026"
DB_NAME = "llm_gateway"
TASK_TYPE = "tip_blog"
PSQL_BIN = "/opt/homebrew/bin/psql"
DEFAULT_QUALITY_SCORE = 8.0
# Fixed grace period given to ssh before checking whether it is still alive.
TUNNEL_SETTLE_SECONDS = 2


def sql_quote(value) -> str:
    """Return *value* as a single-quoted SQL string literal.

    ``None`` becomes the empty string and non-string values are stringified.
    Embedded single quotes are doubled. Backslashes are passed through
    untouched, which assumes the server runs with
    ``standard_conforming_strings = on`` (the PostgreSQL default).
    """
    text = "" if value is None else str(value)
    return "'" + text.replace("'", "''") + "'"


def parse_quality_score(raw, default: float = DEFAULT_QUALITY_SCORE) -> float:
    """Coerce a sample's ``quality_score`` to a finite float.

    The score is written into the SQL text unquoted, so anything that is not
    a plain finite number must be rejected rather than interpolated.

    Returns *default* when *raw* is ``None``.
    Raises ``ValueError`` for non-numeric, NaN or infinite values.
    """
    if raw is None:
        return default
    try:
        score = float(raw)
    except (TypeError, ValueError):
        raise ValueError(f"quality_score is not numeric: {raw!r}") from None
    if not math.isfinite(score):
        raise ValueError(f"quality_score is not finite: {raw!r}")
    return score


def read_samples(path) -> list:
    """Parse a JSONL file into a list of dicts, skipping blank lines.

    Raises ``FileNotFoundError`` if *path* does not exist and ``ValueError``
    (with the offending line number) if a line is not a JSON object.
    """
    samples = []
    with open(path, encoding="utf-8") as handle:
        for line_no, raw_line in enumerate(handle, start=1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"{path}:{line_no}: invalid JSON ({exc.msg})") from exc
            if not isinstance(record, dict):
                raise ValueError(f"{path}:{line_no}: expected a JSON object")
            samples.append(record)
    return samples


def build_sql_statements(samples, task_type: str = TASK_TYPE) -> list:
    """Build the SQL script (one statement per list item) that loads *samples*.

    The script is wrapped in BEGIN/COMMIT and reports the row count for
    *task_type* before and after the inserts. Each sample contributes one
    INSERT into ``learning_corpus``; the prompt is ``instruction`` followed by
    ``input`` (newline separated) when ``input`` is non-empty.

    Raises ``ValueError`` if a sample carries an unusable ``quality_score``.
    """
    task_literal = sql_quote(task_type)
    statements = [
        "BEGIN;",
        f"SELECT 'Before: ' || COUNT(*) FROM learning_corpus WHERE task_type = {task_literal};",
    ]
    for sample in samples:
        instruction = sample.get("instruction")
        prompt = "" if instruction is None else str(instruction)
        if sample.get("input"):
            prompt = f"{prompt}\n{sample['input']}"
        source = sample.get("source")
        if source is None:
            source = "unknown"
        score = parse_quality_score(sample.get("quality_score"))
        statements.append(
            "INSERT INTO learning_corpus "
            "(task_type, prompt_text, completion_text, quality_score, tags) "
            f"VALUES ({task_literal}, {sql_quote(prompt)}, "
            f"{sql_quote(sample.get('output'))}, {score}, "
            f"ARRAY[{sql_quote(source)}, {task_literal}]);"
        )
    statements.append(
        f"SELECT 'After: ' || COUNT(*) FROM learning_corpus WHERE task_type = {task_literal};"
    )
    statements.append("COMMIT;")
    return statements


def start_tunnel():
    """Spawn ``ssh -N -L`` forwarding LOCAL_PORT to the remote PostgreSQL.

    Returns the ``Popen`` handle, or ``None`` if ssh could not be started.
    """
    print("🔗 Setting up SSH tunnel to Erik...")
    tunnel_cmd = [
        "ssh",
        "-N",
        "-L", f"{LOCAL_PORT}:{DB_HOST}:{DB_PORT}",
        f"{ERIK_USER}@{ERIK_HOST}",
    ]
    try:
        # stdout is never read, so do not attach a pipe to it; stderr is kept
        # so a failure reason can be shown.
        return subprocess.Popen(
            tunnel_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
        )
    except OSError as exc:
        print(f"⚠️ SSH tunnel failed: {exc}")
        return None


def resolve_endpoint(tunnel_proc):
    """Return the ``(host, port)`` psql should connect to.

    Waits briefly for the tunnel to settle. If ssh is not running afterwards
    (or was never started) the direct address of the database host is used.
    """
    if tunnel_proc is not None:
        time.sleep(TUNNEL_SETTLE_SECONDS)
        if tunnel_proc.poll() is None:
            print(f"✅ Tunnel established at localhost:{LOCAL_PORT}")
            return DB_HOST, LOCAL_PORT
        _, err = tunnel_proc.communicate()
        print(f"⚠️ SSH tunnel failed: {err.decode(errors='replace')}")
    print(" Trying direct connection to Erik's PostgreSQL instead...")
    return ERIK_HOST, DB_PORT


def close_tunnel(tunnel_proc) -> None:
    """Stop the ssh tunnel if it is still running, escalating to kill."""
    if tunnel_proc is None or tunnel_proc.poll() is not None:
        return
    tunnel_proc.terminate()
    try:
        tunnel_proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        tunnel_proc.kill()
        tunnel_proc.wait()


def run_psql(sql_text: str, host: str, port: int):
    """Run *sql_text* through psql and return the ``CompletedProcess``.

    The script is written to a private temporary file that is always removed.
    ``ON_ERROR_STOP`` makes psql abort and exit non-zero on the first failing
    statement; without it psql exits 0 even when statements failed.
    """
    handle = tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        prefix="load_blog_data_ssh_",
        suffix=".sql",
        delete=False,
    )
    sql_path = Path(handle.name)
    try:
        with handle:
            handle.write(sql_text)
        return subprocess.run(
            [
                PSQL_BIN,
                "-v", "ON_ERROR_STOP=1",
                "-h", host,
                "-p", str(port),
                "-U", DB_USER,
                "-d", DB_NAME,
                "-f", str(sql_path),
            ],
            capture_output=True,
            text=True,
            errors="replace",
            # Extend, rather than replace, the inherited environment.
            env={**os.environ, "PGPASSWORD": DB_PASS},
        )
    finally:
        sql_path.unlink(missing_ok=True)


def main() -> None:
    """Load the blog training JSONL into ``learning_corpus`` on Erik.

    Exits with status 1 when the data file is missing or invalid, or when
    psql cannot be run or reports an error. The ssh tunnel, if one was
    started, is always shut down.
    """
    print("🔄 Loading blog training data via SSH tunnel...")
    print(f" JSONL: {JSONL_FILE}")
    print(f" Erik: {ERIK_HOST}")
    print(f" Remote DB: {DB_HOST}:{DB_PORT}")
    print()

    tunnel_proc = None
    try:
        tunnel_proc = start_tunnel()
        host, port = resolve_endpoint(tunnel_proc)

        try:
            # stat() is inside the try so a missing file is reported cleanly.
            size = JSONL_FILE.stat().st_size
            print(f"\n📝 Loading {size} bytes from JSONL...")
            samples = read_samples(JSONL_FILE)
        except FileNotFoundError:
            print(f"❌ File not found: {JSONL_FILE}")
            sys.exit(1)
        except ValueError as exc:
            print(f"❌ Invalid training data: {exc}")
            sys.exit(1)
        print(f" Found {len(samples)} samples")

        try:
            statements = build_sql_statements(samples)
        except ValueError as exc:
            print(f"❌ Invalid training data: {exc}")
            sys.exit(1)

        print(f"\n🗄️ Executing SQL against {host}:{port}/{DB_NAME}...")
        try:
            result = run_psql("\n".join(statements), host, port)
        except OSError as exc:
            print("❌ Error loading data:")
            print(f"could not run {PSQL_BIN}: {exc}")
            sys.exit(1)

        if result.returncode != 0:
            print("❌ Error loading data:")
            print(result.stderr)
            sys.exit(1)

        print(result.stdout)
        print(f"\n✅ Loaded {len(samples)} samples into learning_corpus")
        print(f"🎯 Next: ssh {ERIK_USER}@{ERIK_HOST} 'cd /opt/llm-gateway/packages/fine-tuner && python3 scripts/manual_trigger.py --general --force'")
    finally:
        close_tunnel(tunnel_proc)


if __name__ == "__main__":
    main()