#!/usr/bin/env python3
"""
convert-external-datasets.py — Convert external HF datasets to ChatML format
for MAGATAMA Ops AI fine-tuning.

Merges 171K+ external samples with our 264 fixes into unified training files.

Usage:
  source .venv/bin/activate
  python3 scripts/convert-external-datasets.py                # Convert all
  python3 scripts/convert-external-datasets.py --stats        # Just show stats
  python3 scripts/convert-external-datasets.py --sample 5000  # Subsample per dataset
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import random
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")
logger = logging.getLogger("convert")

ROOT = Path(__file__).resolve().parent.parent
DATA_DIR = ROOT / "data"
EXT_DIR = DATA_DIR / "external"
OUT_DIR = DATA_DIR

# Our own curated samples; always included in full in every output file.
FIXES_PATH = DATA_DIR / "fixes-chatml-sft.jsonl"

# The sampled output is only written when the merged set exceeds this size,
# and at most this many *external* samples are drawn into it.
SAMPLED_EXTERNAL_SIZE = 10000

# Fixed seed so the sampled file is reproducible between runs.
SAMPLE_SEED = 42

MAGATAMA_SYSTEM = """Du bist MAGATAMA Operations AI — ein spezialisierter Assistent für IT-Infrastruktur, LLM-Pipelines, Netzwerk-Engineering, Security und Self-Healing.

Deine Fähigkeiten:
1. HEALING: Symptome analysieren → Root Cause → Fix-Steps → Prevention
2. INFRASTRUCTURE: Server-Metriken bewerten, proaktiv warnen, Schwellwerte kennen
3. SECURITY: Bedrohungen erkennen, IPs blocken, Injection-Patterns, Kill Chain Mapping
4. LLM OPS: Model Routing, Prompt Optimization, Fallback Chains, Cost Management
5. NETWORK: BGP/RPKI, VLANs, WireGuard, Cloudflare Tunnels, STP, DNS

Regeln:
- Antworte präzise und strukturiert
- Nenne spezifische Commands, Dateipfade und Konfigurationswerte
- Severity-Bewertung: critical > high > warning > info
- Sprache: Deutsch oder Englisch je nach Input"""


def chatml(system: str, user: str, assistant: str) -> dict:
    """Wrap one system/user/assistant exchange in a ChatML ``{"text": ...}`` record."""
    return {
        "text": (
            f"<|im_start|>system\n{system}<|im_end|>\n"
            f"<|im_start|>user\n{user}<|im_end|>\n"
            f"<|im_start|>assistant\n{assistant}<|im_end|>"
        )
    }


def load_jsonl(path: Path) -> list[dict]:
    """Load a JSON Lines file, skipping blank lines.

    The file is always decoded as UTF-8 so the result does not depend on the
    platform's locale encoding.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON; the
            message is prefixed with ``<path>:<line number>``.
    """
    samples: list[dict] = []
    with open(path, encoding="utf-8") as f:
        for lineno, raw in enumerate(f, start=1):
            line = raw.strip()
            if not line:
                continue
            try:
                samples.append(json.loads(line))
            except json.JSONDecodeError as err:
                # Same exception type as before, but say where the bad record is.
                raise json.JSONDecodeError(
                    f"{path}:{lineno}: {err.msg}", err.doc, err.pos
                ) from err
    return samples


def _first_value(sample: dict, keys: tuple[str, ...]):
    """Return the first truthy value found under *keys*, or ``""`` if none."""
    for key in keys:
        value = sample.get(key, "")
        if value:
            return value
    return ""


def _convert_pairs(
    samples: list[dict],
    max_samples: int | None,
    user_keys: tuple[str, ...],
    assistant_keys: tuple[str, ...],
) -> list[dict]:
    """Convert instruction-style records, trying candidate keys in priority order.

    Records lacking either a user or an assistant message are skipped. A
    record's own ``system`` field is used when present, otherwise
    ``MAGATAMA_SYSTEM``.
    """
    out = []
    for s in samples[:max_samples]:
        sys_msg = s.get("system", "") or MAGATAMA_SYSTEM
        user_msg = _first_value(s, user_keys)
        asst_msg = _first_value(s, assistant_keys)
        if user_msg and asst_msg:
            out.append(chatml(sys_msg, user_msg, asst_msg))
    return out


def convert_fenrir(samples: list[dict], max_samples: int | None) -> list[dict]:
    """Fenrir v2.0: system/instruction/output format."""
    return _convert_pairs(
        samples,
        max_samples,
        user_keys=("instruction", "input", "user"),
        assistant_keys=("output", "assistant"),
    )


def convert_trendyol(samples: list[dict], max_samples: int | None) -> list[dict]:
    """Trendyol: system/user/assistant messages format."""
    return _convert_pairs(
        samples,
        max_samples,
        user_keys=("user", "instruction"),
        assistant_keys=("assistant", "output"),
    )


def convert_neuralchemy(samples: list[dict], max_samples: int | None) -> list[dict]:
    """NeurAlchemy: text classification → detection training.

    Records with ``label == 1`` become "injection detected" answers carrying
    the record's category and severity; every other label becomes a "safe"
    answer. Records without text are skipped.
    """
    out = []
    for s in samples[:max_samples]:
        text = s.get("text", "")
        if not text:
            # Nothing to classify; an empty prompt would only add noise.
            continue
        label = s.get("label", 0)
        category = s.get("category", "unknown")
        severity = s.get("severity", "medium")

        user_msg = f"Analysiere den folgenden Text auf Prompt Injection:\n\n{text}"
        if label == 1:
            asst_msg = (
                f"**INJECTION DETECTED**\n\n"
                f"Kategorie: {category}\n"
                f"Severity: {severity}\n"
                f"Der Text enthält einen Prompt Injection Versuch der Kategorie '{category}'."
            )
        else:
            asst_msg = "**SAFE** — Kein Prompt Injection erkannt. Der Text ist legitim."
        out.append(chatml(MAGATAMA_SYSTEM, user_msg, asst_msg))
    return out


def convert_code_vuln_dpo(samples: list[dict], max_samples: int | None) -> list[dict]:
    """CyberNative DPO: vulnerable → fixed code pairs as SFT.

    Only the ``chosen`` answer is used; records missing ``question`` or
    ``chosen`` are skipped.
    """
    out = []
    for s in samples[:max_samples]:
        lang = s.get("lang", "unknown")
        vuln = s.get("vulnerability", "unknown")
        question = s.get("question", "")
        chosen = s.get("chosen", "")
        if question and chosen:
            user_msg = f"[{lang}] {vuln}\n\n{question}"
            out.append(chatml(MAGATAMA_SYSTEM, user_msg, chosen))
    return out


def convert_mitre_ttp(samples: list[dict], max_samples: int | None) -> list[dict]:
    """MITRE TTP Mapping: threat reports → technique classification.

    The label may be a single value or a list; lists are joined with ", ".
    Records with a falsy text or label are skipped.
    """
    out = []
    for s in samples[:max_samples]:
        text = s.get("text", "") or s.get("sentence", "")
        label = s.get("label", "") or s.get("technique", "")
        if text and label:
            user_msg = f"Welche MITRE ATT&CK Technik beschreibt der folgende Text?\n\n{text}"
            if isinstance(label, list):
                label_str = ", ".join(str(item) for item in label)
            else:
                label_str = str(label)
            asst_msg = f"MITRE ATT&CK Technik: {label_str}"
            out.append(chatml(MAGATAMA_SYSTEM, user_msg, asst_msg))
    return out


def convert_deepset(samples: list[dict], max_samples: int | None) -> list[dict]:
    """deepset: binary prompt injection classification.

    ``label == 1`` yields a "JA" answer, anything else "NEIN". Records
    without text are skipped.
    """
    out = []
    for s in samples[:max_samples]:
        text = s.get("text", "")
        if not text:
            # Nothing to classify; an empty prompt would only add noise.
            continue
        label = s.get("label", 0)
        user_msg = f"Ist der folgende Text eine Prompt Injection?\n\n{text}"
        asst_msg = (
            "**JA** — Prompt Injection erkannt."
            if label == 1
            else "**NEIN** — Kein Prompt Injection."
        )
        out.append(chatml(MAGATAMA_SYSTEM, user_msg, asst_msg))
    return out


# File name under EXT_DIR → (display name, converter). Insertion order is the
# order in which datasets are merged.
CONVERTERS = {
    "fenrir-v2.jsonl": ("Fenrir v2.0 Security+DevOps", convert_fenrir),
    "trendyol-security.jsonl": ("Trendyol Cybersecurity", convert_trendyol),
    "neuralchemy-prompt-injection.jsonl": ("NeurAlchemy Injection", convert_neuralchemy),
    "code-vuln-dpo.jsonl": ("Code Vulnerability DPO", convert_code_vuln_dpo),
    "mitre-ttp-mapping.jsonl": ("MITRE TTP Mapping", convert_mitre_ttp),
    "deepset-injections.jsonl": ("deepset Injections", convert_deepset),
}


def _count_records(path: Path) -> int:
    """Count the non-blank lines of *path* without parsing them."""
    with open(path, encoding="utf-8") as f:
        return sum(1 for line in f if line.strip())


def _size_mb(path: Path) -> float:
    """Return the size of *path* in MiB."""
    return path.stat().st_size / 1024 / 1024


def _write_jsonl(path: Path, records: list[dict]) -> None:
    """Write *records* to *path* as UTF-8 JSON Lines, keeping non-ASCII text readable."""
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(r, ensure_ascii=False) + "\n" for r in records)


def _print_stats() -> None:
    """Print per-dataset record counts and sizes without converting anything."""
    rule = "═" * 60
    print(f"\n{rule}")
    print("EXTERNAL DATASET STATS")
    print(rule)

    total = 0
    for filename, (name, _converter) in CONVERTERS.items():
        path = EXT_DIR / filename
        if not path.exists():
            print(f" {name:40s} MISSING")
            continue
        count = _count_records(path)
        total += count
        print(f" {name:40s} {count:>7,} samples ({_size_mb(path):.1f} MB)")

    if FIXES_PATH.exists():
        fixes_count = _count_records(FIXES_PATH)
        total += fixes_count
        print(f" {'Our fixes (fixes.json)':40s} {fixes_count:>7,} samples")

    print(f"\n {'TOTAL':40s} {total:>7,} samples")
    print(f"{rule}\n")


def _build_sampled(all_chatml: list[dict], our_count: int) -> list[dict]:
    """Return all of our fixes plus a seeded random subset of external samples, shuffled.

    The first *our_count* entries of *all_chatml* are our fixes and are always
    kept. A private ``random.Random`` is used so the global RNG state is left
    untouched while the result stays reproducible.
    """
    rng = random.Random(SAMPLE_SEED)
    ours = all_chatml[:our_count]
    external = all_chatml[our_count:]
    picked = rng.sample(external, min(SAMPLED_EXTERNAL_SIZE, len(external)))
    mixed = ours + picked
    rng.shuffle(mixed)
    return mixed


def main() -> None:
    """Command-line entry point: show stats, or convert and merge all datasets.

    Writes ``merged-all-chatml-sft.jsonl`` to OUT_DIR and, when the merged set
    holds more than SAMPLED_EXTERNAL_SIZE samples, additionally writes
    ``sampled-10k-chatml-sft.jsonl``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--stats", action="store_true")
    parser.add_argument("--sample", type=int, default=None, help="Max samples per dataset")
    args = parser.parse_args()

    # Zero or negative values would silently mis-slice the datasets.
    if args.sample is not None and args.sample < 1:
        parser.error("--sample must be a positive integer")

    if args.stats:
        _print_stats()
        return

    # Convert all datasets
    all_chatml: list[dict] = []

    # Load our fixes first so they occupy the head of the merged list.
    our_count = 0
    if FIXES_PATH.exists():
        our_fixes = load_jsonl(FIXES_PATH)
        our_count = len(our_fixes)
        all_chatml.extend(our_fixes)
        logger.info("Our fixes: %d samples", our_count)

    # Convert external datasets
    for filename, (name, converter) in CONVERTERS.items():
        path = EXT_DIR / filename
        if not path.exists():
            logger.warning("Missing: %s", path)
            continue
        samples = load_jsonl(path)
        converted = converter(samples, args.sample)
        all_chatml.extend(converted)
        logger.info("%s: %d → %d ChatML samples", name, len(samples), len(converted))

    OUT_DIR.mkdir(parents=True, exist_ok=True)

    # Write merged output
    merged_path = OUT_DIR / "merged-all-chatml-sft.jsonl"
    _write_jsonl(merged_path, all_chatml)
    logger.info(
        "Merged: %d total samples → %s (%.0f MB)",
        len(all_chatml),
        merged_path.name,
        _size_mb(merged_path),
    )

    # Also write a sampled version for quick training
    sampled_path = None
    if len(all_chatml) > SAMPLED_EXTERNAL_SIZE:
        sampled = _build_sampled(all_chatml, our_count)
        sampled_path = OUT_DIR / "sampled-10k-chatml-sft.jsonl"
        _write_jsonl(sampled_path, sampled)
        logger.info(
            "Sampled: %d samples → %s (%.0f MB)",
            len(sampled),
            sampled_path.name,
            _size_mb(sampled_path),
        )

    rule = "═" * 60
    print(f"\n{rule}")
    print("CONVERSION COMPLETE")
    print(rule)
    print(f" Full merged: {len(all_chatml):>7,} samples")
    print(" Files created:")
    print(f" {merged_path}")
    if sampled_path is not None:
        print(f" {sampled_path}")
    print("\n Training command (full):")
    print(" python3 scripts/train-fixes.py --data merged-all-chatml-sft.jsonl")
    if sampled_path is not None:
        # Only advertise the sampled file when it was actually written.
        print(" Training command (sampled 10K):")
        print(" python3 scripts/train-fixes.py --data sampled-10k-chatml-sft.jsonl")
    print(f"{rule}\n")


if __name__ == "__main__":
    main()