llm-gateway/packages/fine-tuner/scripts/convert-external-datasets.py
Rene Fichtmueller 2ca77d0aee feat: Phase 2F — Multi-Agent Integration (ADRs + Client Fallback + Tests)
- ADR-0001: Multi-Agent Coworking Architecture with LLM Gateway Orchestrator
- ADR-0002: Tier Assignment Strategy for Model Selection (cost-first escalation)
- ADR-0003: Confidence Gate Thresholds & Learning Cycle Intervals (6h/12h/24h cycles)
- ADR-0004: External Provider Fallback Chain Ordering (Cerebras → Groq → Mistral)
- Enhanced client SDK: Offline Ollama fallback, health checks, exponential backoff retry
- Integration tests: claude-code-integration.test.ts (14 test cases)
- PHASE_2F_DEPLOYMENT.md: Pre-deployment checklist, automated deploy, rollback plan
- Post-deployment verification procedures for health, client fallback, metrics
2026-04-19 21:39:44 +02:00

275 lines
10 KiB
Python

#!/usr/bin/env python3
"""
convert-external-datasets.py — Convert external HF datasets to ChatML format
for MAGATAMA Ops AI fine-tuning.
Merges 171K+ external samples with our 264 fixes into unified training files.
Usage:
source .venv/bin/activate
python3 scripts/convert-external-datasets.py # Convert all
python3 scripts/convert-external-datasets.py --stats # Just show stats
python3 scripts/convert-external-datasets.py --sample 5000 # Subsample per dataset
"""
from __future__ import annotations
import argparse
import json
import logging
import os
from pathlib import Path
# Log lines carry a timestamp and the logger name in brackets.
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")
logger = logging.getLogger("convert")

# Directory layout, resolved from this script's own location.
ROOT = Path(__file__).resolve().parent.parent  # parent of the directory holding this script
DATA_DIR = ROOT.joinpath("data")
EXT_DIR = DATA_DIR.joinpath("external")  # external *.jsonl inputs are read from here
OUT_DIR = DATA_DIR  # generated files are written into the data directory itself
# Default system prompt (written in German). The Fenrir and Trendyol converters
# use it only when a sample has no "system" value of its own; the remaining
# converters always use it. The text is emitted verbatim into the training data.
MAGATAMA_SYSTEM = """Du bist MAGATAMA Operations AI — ein spezialisierter Assistent für IT-Infrastruktur, LLM-Pipelines, Netzwerk-Engineering, Security und Self-Healing.
Deine Fähigkeiten:
1. HEALING: Symptome analysieren → Root Cause → Fix-Steps → Prevention
2. INFRASTRUCTURE: Server-Metriken bewerten, proaktiv warnen, Schwellwerte kennen
3. SECURITY: Bedrohungen erkennen, IPs blocken, Injection-Patterns, Kill Chain Mapping
4. LLM OPS: Model Routing, Prompt Optimization, Fallback Chains, Cost Management
5. NETWORK: BGP/RPKI, VLANs, WireGuard, Cloudflare Tunnels, STP, DNS
Regeln:
- Antworte präzise und strukturiert
- Nenne spezifische Commands, Dateipfade und Konfigurationswerte
- Severity-Bewertung: critical > high > warning > info
- Sprache: Deutsch oder Englisch je nach Input"""
def chatml(system: str, user: str, assistant: str) -> dict:
    """Render one system/user/assistant exchange as a ChatML training record.

    Each turn is wrapped in ``<|im_start|>ROLE`` / ``<|im_end|>`` markers and
    the turns are separated by a newline; the last turn has no trailing
    newline.

    Returns:
        A dict with the single key ``"text"`` holding the rendered string.
    """
    system_turn = f"<|im_start|>system\n{system}<|im_end|>\n"
    user_turn = f"<|im_start|>user\n{user}<|im_end|>\n"
    assistant_turn = f"<|im_start|>assistant\n{assistant}<|im_end|>"
    return {"text": system_turn + user_turn + assistant_turn}
def load_jsonl(path: Path) -> list[dict]:
    """Load a JSON Lines file into a list of parsed objects.

    The file is always decoded as UTF-8 rather than with the platform's
    locale encoding, so non-ASCII content is read identically on every
    system. Lines that are empty after stripping whitespace are skipped.

    Args:
        path: Location of the ``.jsonl`` file.

    Returns:
        One parsed JSON value per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        # Strip first, then test: whitespace-only lines must not reach json.loads.
        stripped_lines = (line.strip() for line in f)
        return [json.loads(line) for line in stripped_lines if line]
def convert_fenrir(samples: list[dict], max_samples: int | None) -> list[dict]:
    """Convert Fenrir v2.0 records (system/instruction/output) to ChatML.

    Only the first ``max_samples`` records are considered (all of them when
    it is ``None``). The user text is taken from the first non-empty of
    ``instruction``, ``input`` and ``user``; the answer from ``output`` or
    ``assistant``. Records lacking either one are dropped. A missing or
    empty ``system`` value falls back to ``MAGATAMA_SYSTEM``.
    """
    converted = []
    for record in samples[:max_samples]:
        prompt = record.get("instruction", "") or record.get("input", "") or record.get("user", "")
        answer = record.get("output", "") or record.get("assistant", "")
        if not (prompt and answer):
            continue
        system_prompt = record.get("system", "") or MAGATAMA_SYSTEM
        converted.append(chatml(system_prompt, prompt, answer))
    return converted
def convert_trendyol(samples: list[dict], max_samples: int | None) -> list[dict]:
    """Convert Trendyol records (system/user/assistant) to ChatML.

    Only the first ``max_samples`` records are considered (all of them when
    it is ``None``). The user text comes from ``user`` with ``instruction``
    as fallback, the answer from ``assistant`` with ``output`` as fallback.
    Records lacking either one are dropped. A missing or empty ``system``
    value falls back to ``MAGATAMA_SYSTEM``.
    """
    converted = []
    for record in samples[:max_samples]:
        prompt = record.get("user", "") or record.get("instruction", "")
        answer = record.get("assistant", "") or record.get("output", "")
        if not (prompt and answer):
            continue
        system_prompt = record.get("system", "") or MAGATAMA_SYSTEM
        converted.append(chatml(system_prompt, prompt, answer))
    return converted
def convert_neuralchemy(samples: list[dict], max_samples: int | None) -> list[dict]:
    """Turn NeurAlchemy classification records into detection training pairs.

    Only the first ``max_samples`` records are considered (all of them when
    it is ``None``). Every considered record yields exactly one ChatML
    sample: a record whose ``label`` equals 1 gets an "injection detected"
    answer that quotes its ``category`` and ``severity`` (defaults
    ``"unknown"`` / ``"medium"``); any other label gets the "safe" answer.
    """
    converted = []
    for record in samples[:max_samples]:
        text = record.get("text", "")
        category = record.get("category", "unknown")
        severity = record.get("severity", "medium")
        is_injection = record.get("label", 0) == 1

        prompt = f"Analysiere den folgenden Text auf Prompt Injection:\n\n{text}"
        if not is_injection:
            verdict = "**SAFE** — Kein Prompt Injection erkannt. Der Text ist legitim."
        else:
            verdict = (
                f"**INJECTION DETECTED**\n\n"
                f"Kategorie: {category}\n"
                f"Severity: {severity}\n"
                f"Der Text enthält einen Prompt Injection Versuch der Kategorie '{category}'."
            )
        converted.append(chatml(MAGATAMA_SYSTEM, prompt, verdict))
    return converted
def convert_code_vuln_dpo(samples: list[dict], max_samples: int | None) -> list[dict]:
    """Convert CyberNative DPO records into SFT pairs using the chosen answer.

    Only the first ``max_samples`` records are considered (all of them when
    it is ``None``). Records without a ``question`` or without a ``chosen``
    answer are dropped. The user turn is the question prefixed with the
    record's ``lang`` and ``vulnerability`` (both default to ``"unknown"``).
    """
    converted = []
    for record in samples[:max_samples]:
        question = record.get("question", "")
        preferred = record.get("chosen", "")
        if not (question and preferred):
            continue
        lang = record.get("lang", "unknown")
        vuln = record.get("vulnerability", "unknown")
        prompt = f"[{lang}] {vuln}\n\n{question}"
        converted.append(chatml(MAGATAMA_SYSTEM, prompt, preferred))
    return converted
def convert_mitre_ttp(samples: list[dict], max_samples: int | None) -> list[dict]:
    """Convert MITRE TTP mapping records into technique-classification pairs.

    Only the first ``max_samples`` records are considered (all of them when
    it is ``None``). The text is read from ``text`` (fallback ``sentence``)
    and the label from ``label`` (fallback ``technique``); records where
    either is missing or falsy are dropped. A list label is rendered as a
    comma-separated string.
    """
    converted = []
    for record in samples[:max_samples]:
        text = record.get("text", "") or record.get("sentence", "")
        label = record.get("label", "") or record.get("technique", "")
        if not (text and label):
            continue
        if isinstance(label, list):
            label_str = ", ".join(map(str, label))
        else:
            label_str = str(label)
        prompt = f"Welche MITRE ATT&CK Technik beschreibt der folgende Text?\n\n{text}"
        answer = f"MITRE ATT&CK Technik: {label_str}"
        converted.append(chatml(MAGATAMA_SYSTEM, prompt, answer))
    return converted
def convert_deepset(samples: list[dict], max_samples: int | None) -> list[dict]:
    """Convert deepset binary prompt-injection records into yes/no pairs.

    Only the first ``max_samples`` records are considered (all of them when
    it is ``None``). Every considered record yields one ChatML sample: the
    answer is "JA" when ``label`` equals 1 and "NEIN" for any other label.
    """
    converted = []
    for record in samples[:max_samples]:
        text = record.get("text", "")
        prompt = f"Ist der folgende Text eine Prompt Injection?\n\n{text}"
        if record.get("label", 0) == 1:
            verdict = "**JA** — Prompt Injection erkannt."
        else:
            verdict = "**NEIN** — Kein Prompt Injection."
        converted.append(chatml(MAGATAMA_SYSTEM, prompt, verdict))
    return converted
_SEPARATOR = "=" * 60  # horizontal rule for the console summaries
_SAMPLED_EXTERNAL_CAP = 10000  # external samples kept in the "sampled" output


def _count_samples(path: Path) -> int:
    """Return the number of non-blank lines in *path* (one sample per line)."""
    with open(path, encoding="utf-8") as f:
        return sum(1 for line in f if line.strip())


def _write_jsonl(path: Path, samples: list[dict]) -> None:
    """Write *samples* to *path* as UTF-8 JSON Lines, one object per line."""
    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # must be pinned to UTF-8 instead of depending on the locale default.
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(s, ensure_ascii=False) + "\n" for s in samples)


def _size_mb(path: Path) -> float:
    """Return the size of *path* in MiB."""
    return path.stat().st_size / 1024 / 1024


def _print_stats(converters: dict, fixes_path: Path) -> None:
    """Print per-dataset sample counts and file sizes without converting."""
    print(f"\n{_SEPARATOR}")
    print("EXTERNAL DATASET STATS")
    print(_SEPARATOR)
    total = 0
    for filename, (name, _) in converters.items():
        path = EXT_DIR / filename
        if not path.exists():
            print(f" {name:40s} MISSING")
            continue
        count = _count_samples(path)
        total += count
        print(f" {name:40s} {count:>7,} samples ({_size_mb(path):.1f} MB)")
    if fixes_path.exists():
        fixes_count = _count_samples(fixes_path)
        total += fixes_count
        print(f" {'Our fixes (fixes.json)':40s} {fixes_count:>7,} samples")
    print(f"\n {'TOTAL':40s} {total:>7,} samples")
    print(f"{_SEPARATOR}\n")


def main() -> None:
    """Command-line entry point: merge all datasets into ChatML training files.

    With ``--stats`` only the per-dataset counts are printed. Otherwise the
    existing fixes file (if present) and every available external dataset are
    converted and written to ``merged-all-chatml-sft.jsonl``. When the merged
    set exceeds 10,000 samples, ``sampled-10k-chatml-sft.jsonl`` is also
    written; it contains all fixes plus up to 10,000 external samples drawn
    with a fixed seed.

    ``--sample N`` limits each external dataset to its first N records before
    conversion. Missing external files are logged and skipped.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--stats", action="store_true")
    parser.add_argument("--sample", type=int, default=None, help="Max samples per dataset")
    args = parser.parse_args()
    if args.sample is not None and args.sample < 0:
        # A negative limit would silently drop records from the END of each
        # dataset via slicing, which is never what the option means.
        parser.error("--sample must be a non-negative integer")

    converters = {
        "fenrir-v2.jsonl": ("Fenrir v2.0 Security+DevOps", convert_fenrir),
        "trendyol-security.jsonl": ("Trendyol Cybersecurity", convert_trendyol),
        "neuralchemy-prompt-injection.jsonl": ("NeurAlchemy Injection", convert_neuralchemy),
        "code-vuln-dpo.jsonl": ("Code Vulnerability DPO", convert_code_vuln_dpo),
        "mitre-ttp-mapping.jsonl": ("MITRE TTP Mapping", convert_mitre_ttp),
        "deepset-injections.jsonl": ("deepset Injections", convert_deepset),
    }
    fixes_path = DATA_DIR / "fixes-chatml-sft.jsonl"

    if args.stats:
        _print_stats(converters, fixes_path)
        return

    # Our own fixes go first so they occupy a known prefix of the merged list.
    all_chatml: list[dict] = []
    our_count = 0
    if fixes_path.exists():
        our_fixes = load_jsonl(fixes_path)
        all_chatml.extend(our_fixes)
        our_count = len(our_fixes)
        logger.info("Our fixes: %d samples", our_count)

    # Convert external datasets
    for filename, (name, converter) in converters.items():
        path = EXT_DIR / filename
        if not path.exists():
            logger.warning("Missing: %s", path)
            continue
        samples = load_jsonl(path)
        converted = converter(samples, args.sample)
        all_chatml.extend(converted)
        logger.info("%s: %d → %d ChatML samples", name, len(samples), len(converted))

    # Write merged output
    merged_path = OUT_DIR / "merged-all-chatml-sft.jsonl"
    _write_jsonl(merged_path, all_chatml)
    logger.info("Merged: %d total samples → %s (%.0f MB)",
                len(all_chatml), merged_path.name, _size_mb(merged_path))

    # Also write a sampled version for quick training
    sampled_path = None
    if len(all_chatml) > 10000:
        import random

        # A dedicated generator seeded with 42 yields the same draw as seeding
        # the module-level generator, without touching global random state.
        rng = random.Random(42)
        # Always include all our fixes, sample from external
        our_samples = all_chatml[:our_count]
        external_samples = all_chatml[our_count:]
        sampled_external = rng.sample(
            external_samples, min(_SAMPLED_EXTERNAL_CAP, len(external_samples))
        )
        sampled = our_samples + sampled_external
        rng.shuffle(sampled)
        sampled_path = OUT_DIR / "sampled-10k-chatml-sft.jsonl"
        _write_jsonl(sampled_path, sampled)
        logger.info("Sampled: %d samples → %s (%.0f MB)",
                    len(sampled), sampled_path.name, _size_mb(sampled_path))

    print(f"\n{_SEPARATOR}")
    print("CONVERSION COMPLETE")
    print(_SEPARATOR)
    print(f" Full merged: {len(all_chatml):>7,} samples")
    print(" Files created:")
    print(f" {merged_path}")
    if sampled_path is not None:
        print(f" {sampled_path}")
    print("\n Training command (full):")
    print(" python3 scripts/train-fixes.py --data merged-all-chatml-sft.jsonl")
    print(" Training command (sampled 10K):")
    print(" python3 scripts/train-fixes.py --data sampled-10k-chatml-sft.jsonl")
    print(f"{_SEPARATOR}\n")


if __name__ == "__main__":
    main()