#!/usr/bin/env python3
"""Load blog-training-alpaca.jsonl via SSH tunnel to Erik's PostgreSQL.

Reads Alpaca-style JSONL samples (``instruction`` / ``input`` / ``output``
plus optional ``quality_score`` and ``source``), builds one transactional SQL
script inserting them into ``learning_corpus`` and pipes it to ``psql``.
An SSH local port forward to Erik is tried first; if ``ssh`` has exited
after two seconds the script falls back to connecting to Erik's PostgreSQL
directly.
"""
import json
import os
import subprocess
import sys
import time
from pathlib import Path
from threading import Thread  # NOTE(review): not used by this script

JSONL_FILE = Path(__file__).parent.parent / "data" / "blog-training-alpaca.jsonl"
ERIK_HOST = "192.168.178.82"
ERIK_USER = "rene"
DB_HOST = "localhost"  # DB host as seen from Erik (far end of the tunnel)
DB_PORT = 5432
LOCAL_PORT = 15432  # local end of the SSH port forward
DB_USER = "llm"
# NOTE(review): credential is committed in source; consider the environment
# or ~/.pgpass instead.
DB_PASS = "llm_secure_2026"
DB_NAME = "llm_gateway"
TASK_TYPE = "tip_blog"
PSQL_BIN = "/opt/homebrew/bin/psql"
DEFAULT_QUALITY_SCORE = 8.0


def sql_literal(text):
    """Return *text* quoted as a PostgreSQL string literal.

    Only single quotes are doubled. That is sufficient when
    ``standard_conforming_strings`` is on (the PostgreSQL default since 9.1),
    where backslashes inside '...' are ordinary characters.
    """
    return "'" + str(text).replace("'", "''") + "'"


def text_field(sample, key, default=""):
    """Return ``sample[key]`` as a string, or *default* if missing or null."""
    value = sample.get(key)
    return default if value is None else str(value)


def build_insert(sample, task_type=TASK_TYPE):
    """Return the INSERT statement for one JSONL sample.

    The prompt is the instruction, followed by a newline and the ``input``
    field when that field is truthy. A missing or null ``quality_score``
    defaults to 8.0. Any other value goes through ``float()`` (ValueError
    if it is not numeric), so it can never inject SQL.
    """
    prompt_text = text_field(sample, "instruction")
    if sample.get("input"):
        prompt_text += f"\n{sample['input']}"
    completion_text = text_field(sample, "output")
    score = sample.get("quality_score")
    quality_score = DEFAULT_QUALITY_SCORE if score is None else float(score)
    source = text_field(sample, "source", "unknown")
    task = sql_literal(task_type)
    return (
        "INSERT INTO learning_corpus "
        "(task_type, prompt_text, completion_text, quality_score, tags) "
        f"VALUES ({task}, {sql_literal(prompt_text)}, {sql_literal(completion_text)}, "
        f"{quality_score}, ARRAY[{sql_literal(source)}, {task}]);"
    )


def build_sql(samples, task_type=TASK_TYPE):
    """Return a single-transaction SQL script inserting all *samples*.

    Row counts for *task_type* are selected before and after the inserts so
    psql's output shows how many rows were added.
    """
    count_where = f"FROM learning_corpus WHERE task_type = {sql_literal(task_type)};"
    statements = ["BEGIN;", f"SELECT 'Before: ' || COUNT(*) {count_where}"]
    statements.extend(build_insert(sample, task_type) for sample in samples)
    statements.append(f"SELECT 'After: ' || COUNT(*) {count_where}")
    statements.append("COMMIT;")
    return "\n".join(statements)


def load_samples(path):
    """Parse the JSONL file at *path*; exit with status 1 if it is missing."""
    try:
        # stat() must sit inside the try too, or a missing file escapes the handler.
        print(f"\nšŸ“ Loading {path.stat().st_size} bytes from JSONL...")
        # JSON text is UTF-8 (RFC 8259); don't depend on the locale default.
        with open(path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]
    except FileNotFoundError:
        print(f"āŒ File not found: {path}")
        sys.exit(1)


def start_tunnel():
    """Spawn ``ssh -N`` forwarding localhost:LOCAL_PORT to DB_HOST:DB_PORT on Erik."""
    cmd = [
        "ssh",
        "-N",
        "-L",
        f"{LOCAL_PORT}:{DB_HOST}:{DB_PORT}",
        f"{ERIK_USER}@{ERIK_HOST}",
    ]
    return subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)


def resolve_endpoint(tunnel_proc):
    """Wait briefly for ssh and return the (host, port) psql should connect to.

    ssh still running after 2 s is taken as "tunnel up". That is a
    heuristic: it does not prove the forward itself succeeded.
    """
    time.sleep(2)
    if tunnel_proc.poll() is None:
        print(f"āœ… Tunnel established at localhost:{LOCAL_PORT}")
        return DB_HOST, LOCAL_PORT
    _, err = tunnel_proc.communicate()
    print(f"āš ļø SSH tunnel failed: {err.decode(errors='replace')}")
    print(" Trying direct connection to Erik's PostgreSQL instead...")
    return ERIK_HOST, DB_PORT


def close_tunnel(proc):
    """Stop the ssh process (if any), escalating to kill() after 5 seconds."""
    if proc is None:
        return
    if proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
    if proc.stderr is not None:
        proc.stderr.close()


def run_psql(sql, host, port):
    """Feed *sql* to psql on stdin and return the CompletedProcess."""
    cmd = [
        PSQL_BIN,
        "-X",  # ignore ~/.psqlrc so user settings can't alter the script
        "-v", "ON_ERROR_STOP=1",  # without this psql exits 0 after failed statements
        "-h", host,
        "-p", str(port),
        "-U", DB_USER,
        "-d", DB_NAME,
        "-f", "-",  # read the script from stdin: no predictable temp file on disk
    ]
    # Extend (don't replace) the environment so PATH, HOME and locale survive.
    env = {**os.environ, "PGPASSWORD": DB_PASS, "PGCLIENTENCODING": "UTF8"}
    return subprocess.run(cmd, input=sql, capture_output=True, encoding="utf-8", env=env)


def main():
    """Load the JSONL samples into learning_corpus via tunnel or direct connection."""
    print("šŸ”„ Loading blog training data via SSH tunnel...")
    print(f" JSONL: {JSONL_FILE}")
    print(f" Erik: {ERIK_HOST}")
    print(f" Remote DB: {DB_HOST}:{DB_PORT}")
    print()

    print("šŸ”— Setting up SSH tunnel to Erik...")
    tunnel_proc = None
    try:
        tunnel_proc = start_tunnel()
        host, port = resolve_endpoint(tunnel_proc)

        samples = load_samples(JSONL_FILE)
        print(f" Found {len(samples)} samples")
        sql = build_sql(samples)

        print(f"\nšŸ—„ļø Executing SQL against {host}:{port}/{DB_NAME}...")
        try:
            result = run_psql(sql, host, port)
        except FileNotFoundError:
            print(f"āŒ psql not found at {PSQL_BIN}")
            sys.exit(1)

        if result.returncode != 0:
            print("āŒ Error loading data:")
            print(result.stderr)
            sys.exit(1)

        print(result.stdout)
        print(f"\nāœ… Loaded {len(samples)} samples into learning_corpus")
        print(f"šŸŽÆ Next: ssh {ERIK_USER}@{ERIK_HOST} 'cd /opt/llm-gateway/packages/fine-tuner && python3 scripts/manual_trigger.py --general --force'")
    finally:
        # sys.exit() raises SystemExit, so this still runs on every error path.
        close_tunnel(tunnel_proc)


if __name__ == "__main__":
    main()