diff --git a/packages/fine-tuner/config/blog-v8-training.yaml b/packages/fine-tuner/config/blog-v8-training.yaml new file mode 100644 index 0000000..031c8c8 --- /dev/null +++ b/packages/fine-tuner/config/blog-v8-training.yaml @@ -0,0 +1,159 @@ +# ═══════════════════════════════════════════════════════════════════════════════ +# blog-v8-training.yaml — fo-blog-v8 Training Configuration +# +# Base: Qwen/Qwen2.5-14B-Instruct (4× the capacity of v7's 7B) +# Target: 700-1000w blog posts, optical networking + BGP + infra +# +# Key improvements over v7: +# - 14B params → better instruction following at higher complexity +# - LoRA r=64 (was r=32) → more expressive adapter +# - Weighted datasets: human posts × 3.0, external rewritten × 1.5 +# - More epochs (5 SFT, 2 DPO) → deeper style absorption +# - max_seq_length=4096 → handles longer real posts +# - DPO from real v7 quality labels (good/bad scored posts) +# ═══════════════════════════════════════════════════════════════════════════════ + +base_model: "Qwen/Qwen2.5-14B-Instruct" + +# ─── Dataset Sources (merged by consolidate_v8_dataset.py) ──────────────────── +datasets: + # Tier 1: Rene's actual blog posts — Gold Standard + - path: "~/transceiver-training-data/v8-real-posts-sft.jsonl" + weight: 3.0 + description: "19 real blog posts from blog.fichtmueller.org (human written)" + + # Tier 2: v7 generated blogs (Claude-written, 197 topics, validated) + - path: "~/transceiver-training-data/v7-generated-sft.jsonl" + weight: 1.0 + description: "Claude-generated optical networking blogs (v7, 197 topics)" + + # Tier 2: RIPE / APNIC NAS data + - path: "~/transceiver-training-data/v7-ripe-apnic-sft.jsonl" + weight: 1.0 + description: "RIPE/APNIC BGP and routing content (v7 ingested)" + + # Tier 3: External crawled + rewritten content + - path: "~/transceiver-training-data/v8-external-sft.jsonl" + weight: 1.5 + description: "APNIC Blog / RIPE Labs / potaroo.net / Cloudflare (Claude rewritten)" + + # DPO preferences: chosen/rejected pairs for preference learning + dpo: + - path: "~/transceiver-training-data/v7-dpo-pairs.jsonl" + description: "v7 DPO pairs (5 rejection strategies)" + - path: "~/transceiver-training-data/v8-quality-dpo.jsonl" + description: "v8 real quality labels (good/bad from v7 generated posts)" + optional: true # generated by label_v7_quality.py if run + +# ─── SFT Phase ──────────────────────────────────────────────────────────────── +sft: + output_dir: "adapters/fo-blog-v8/adapter" + merged_dir: "models/fo-blog-v8/merged" + + # LoRA parameters + lora: + r: 64 # was 32 in v7 — more expressive + alpha: 128 # 2× r for stable training + dropout: 0.05 + target_modules: + - "q_proj" + - "k_proj" + - "v_proj" + - "o_proj" + - "gate_proj" + - "up_proj" + - "down_proj" + + # Training hyperparameters + training: + num_train_epochs: 5 # was 4 — extra epoch for 14B + per_device_train_batch_size: 1 + gradient_accumulation_steps: 8 # effective batch = 8 + learning_rate: 1.2e-4 # slightly lower than v7's 1.5e-4 for 14B stability + warmup_ratio: 0.05 + lr_scheduler_type: "cosine" + max_seq_length: 4096 # was 2048 — handles longer real posts + fp16: false + bf16: true # M4 Max supports bf16 + optim: "adamw_torch" + weight_decay: 0.01 + max_grad_norm: 1.0 + logging_steps: 10 + save_steps: 100 + evaluation_strategy: "no" + dataloader_num_workers: 0 # MPS: no multiprocessing + remove_unused_columns: false + gradient_checkpointing: true # save RAM on 14B + + # Chat template (Qwen2.5 uses ChatML) + chat_template: "chatml" + dataset_text_field: "text" + +# ─── DPO Phase ──────────────────────────────────────────────────────────────── +dpo: + input_adapter: "adapters/fo-blog-v8/adapter" # start from SFT + output_dir: "adapters/fo-blog-v8-dpo/adapter" + + training: + num_train_epochs: 2 # was 1 — more DPO for 14B + per_device_train_batch_size: 1 + gradient_accumulation_steps: 8 + learning_rate: 5e-5 + warmup_ratio: 0.05 + lr_scheduler_type: "cosine" + max_seq_length: 4096 + bf16: true + optim: "adamw_torch" + logging_steps: 5 + save_steps: 50 + dataloader_num_workers: 0 + gradient_checkpointing: true + + dpo_params: + beta: 0.1 # KL penalty (standard) + loss_type: "sigmoid" # standard DPO loss + max_prompt_length: 512 + max_length: 4096 + +# ─── GGUF Conversion ────────────────────────────────────────────────────────── +gguf: + output_name: "fo-blog-v8.gguf" + quantization: "Q4_K_M" + ollama_model_name: "fo-blog-v8" + convert_script: "/opt/homebrew/Cellar/llama.cpp/8680/bin/convert_hf_to_gguf.py" + quantize_bin: "/opt/homebrew/bin/llama-quantize" + + # Ollama Modelfile system prompt + modelfile_system: | + You are an expert technical writer specializing in optical networking and transceiver technology. + + STRICT CONSTRAINTS: + - LENGTH: 700-1000 words ONLY. Stop at 1000 words maximum. + - STRUCTURE: 1) Hook paragraph, 2) Technical sections (## headers), 3) Exactly 3 takeaways + - TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift. + - NO REPETITION: Every sentence adds new information. + - VOICE: Confident and direct. No hedging phrases. + - AUDIENCE: Network engineers and IT professionals. + + modelfile_params: + temperature: 0.7 + top_p: 0.9 + top_k: 40 + repeat_penalty: 1.15 + num_predict: 1500 + +# ─── Hardware ────────────────────────────────────────────────────────────────── +hardware: + device: "mps" # Apple Silicon M4 Max + ram_gb: 48 + python: "/opt/homebrew/bin/python3.13" + # 14B model in fp16 ≈ 28GB — fits in 48GB with LoRA overhead (~4GB) + # Training peak RAM estimate: ~36-40GB + # Merge on CPU: device_map="cpu" to avoid MPS OOM during save_pretrained + +# ─── Expected Timeline ───────────────────────────────────────────────────────── +# SFT: ~8-12 hours (5 epochs, 14B, MPS) +# DPO: ~2-4 hours (2 epochs, 14B) +# Merge: ~30 min (CPU) +# GGUF: ~15 min +# Total: ~12-16 hours (run overnight) diff --git a/packages/fine-tuner/scripts/consolidate_v8_dataset.py b/packages/fine-tuner/scripts/consolidate_v8_dataset.py new file mode 100644 index 0000000..0d91be8 --- /dev/null +++ b/packages/fine-tuner/scripts/consolidate_v8_dataset.py @@ -0,0 +1,394 @@ +#!/usr/bin/env python3 +""" +consolidate_v8_dataset.py — Alle v8 Datenquellen zusammenführen + +Merged alle JSONL-Quellen in ein einzelnes Training-Dataset: + Tier 1 (weight 3.0): Renes echte Blog-Posts (Gold Standard) + Tier 2 (weight 1.0): v7-generierte Blogs, RIPE/APNIC Ingest + Tier 3 (weight 1.5): Externe gecrawlte + umgeschriebene Posts + +Für SFT-Training: Dupliziert High-Weight Beispiele entsprechend +Für DPO-Training: Merged alle DPO-Pair-Dateien + +Output: + ~/transceiver-training-data/v8-sft-merged.jsonl (SFT, gewichtet) + ~/transceiver-training-data/v8-dpo-merged.jsonl (DPO, alle Pairs) + +Usage: + python3 scripts/consolidate_v8_dataset.py + python3 scripts/consolidate_v8_dataset.py --no-weight # flat, no duplication + python3 scripts/consolidate_v8_dataset.py --stats-only # nur Statistiken +""" + +from __future__ import annotations + +import argparse +import json +import math +import random +from pathlib import Path +from typing import Any + +DATA_DIR = Path.home() / "transceiver-training-data" +OUTPUT_SFT = DATA_DIR / "v8-sft-merged.jsonl" +OUTPUT_DPO = DATA_DIR / "v8-dpo-merged.jsonl" + +random.seed(42) + +# ─── SFT Sources ────────────────────────────────────────────────────────────── +SFT_SOURCES: list[dict[str, Any]] = [ + # Tier 1 — Gold: Rene's real posts (weight × 3) + { + "file": "v8-real-posts-sft.jsonl", + "weight": 3.0, + "tier": 1, + "description": "Rene's real blog posts (human written)", + }, + # Tier 2 — Good: Claude-generated, validated topics + { + "file": "v7-generated-sft.jsonl", + "weight": 1.0, + "tier": 2, + "description": "v7 Claude-generated optical/networking blogs", + }, + { + "file": "v7-ripe-apnic-sft.jsonl", + "weight": 1.0, + "tier": 2, + "description": "RIPE/APNIC BGP & routing content (v7 ingest)", + }, + # Tier 2 — Real v6 TIP outputs (good length, real style) + { + "file": "v8-v6blogs-sft.jsonl", + "weight": 2.0, + "tier": 2, + "description": "Real fo-blog-v6 outputs (700-1100w, actual optical networking voice)", + "optional": True, + }, + # Tier 3 — External: crawled + Claude rewritten + { + "file": "v8-external-sft.jsonl", + "weight": 1.5, + "tier": 3, + "description": "External: APNIC Blog / RIPE Labs / potaroo.net / Cloudflare", + "optional": True, + }, + # Tier 2 — Legacy: pre-v7 curated datasets (lower priority, check quality) + { + "file": "nanog-ripe-labs-content.jsonl", + "weight": 0.5, + "tier": 2, + "description": "NANOG/RIPE Labs curated (pre-v7)", + "optional": True, + "needs_conversion": True, # may use different schema + }, + { + "file": "rir-infrastructure-data.jsonl", + "weight": 0.5, + "tier": 2, + "description": "RIR infrastructure data (pre-v7)", + "optional": True, + "needs_conversion": True, + }, +] + +# ─── DPO Sources ────────────────────────────────────────────────────────────── +DPO_SOURCES: list[dict[str, Any]] = [ + { + "file": "v7-dpo-pairs.jsonl", + "description": "v7 DPO pairs (5 rejection strategies, synthetic)", + }, + { + "file": "v8-v6blogs-dpo.jsonl", + "description": "Real v6 too-long posts as rejected + Claude-rewritten chosen (real failures)", + "optional": True, + }, + { + "file": "v8-quality-dpo.jsonl", + "description": "v8 real quality labels (human good/bad scoring)", + "optional": True, + }, +] + +# ─── Required SFT fields ────────────────────────────────────────────────────── +REQUIRED_SFT_FIELDS = {"system_prompt", "input_text", "output_text"} +REQUIRED_DPO_FIELDS = {"prompt", "chosen", "rejected"} + + +def load_sft_file(path: Path, needs_conversion: bool = False) -> list[dict]: + """Load + validate SFT JSONL. Attempt field mapping for legacy formats.""" + records = [] + with open(path, encoding="utf-8") as f: + for i, line in enumerate(f, 1): + line = line.strip() + if not line: + continue + try: + item = json.loads(line) + except json.JSONDecodeError: + continue + + # Check required fields + if REQUIRED_SFT_FIELDS.issubset(item.keys()): + records.append(item) + continue + + if not needs_conversion: + continue + + # Try to map legacy schemas → standard + converted = _try_convert_legacy_sft(item) + if converted: + records.append(converted) + + return records + + +def _try_convert_legacy_sft(item: dict) -> dict | None: + """Try to map legacy JSONL formats to standard SFT schema.""" + # Schema: {instruction, input, output} + if "instruction" in item and "output" in item: + return { + "system_prompt": item.get("instruction", ""), + "input_text": item.get("input", ""), + "output_text": item["output"], + "meta": item.get("meta", {}), + } + # Schema: {prompt, completion} + if "prompt" in item and "completion" in item: + return { + "system_prompt": "", + "input_text": item["prompt"], + "output_text": item["completion"], + "meta": item.get("meta", {}), + } + # Schema: {messages: [{role, content}]} + if "messages" in item: + msgs = item["messages"] + sys_msg = next((m["content"] for m in msgs if m.get("role") == "system"), "") + user_msg = next((m["content"] for m in msgs if m.get("role") == "user"), "") + asst_msg = next((m["content"] for m in msgs if m.get("role") == "assistant"), "") + if user_msg and asst_msg: + return { + "system_prompt": sys_msg, + "input_text": user_msg, + "output_text": asst_msg, + "meta": item.get("meta", {}), + } + return None + + +def validate_sft_record(record: dict) -> bool: + """Quality gate for SFT records.""" + output = record.get("output_text", "") + words = len(output.split()) + if words < 200: + return False + if not record.get("input_text"): + return False + return True + + +def duplicate_by_weight(records: list[dict], weight: float) -> list[dict]: + """Duplicate records to approximate their training weight.""" + if weight <= 1.0: + return records + # Integer duplications + probabilistic remainder + full_copies = int(math.floor(weight)) + remainder = weight - full_copies + result = records * full_copies + # Add fractional copies + n_extra = int(len(records) * remainder) + if n_extra > 0: + extra = random.sample(records, min(n_extra, len(records))) + result.extend(extra) + return result + + +def merge_sft(apply_weights: bool = True) -> dict[str, int]: + """Merge all SFT sources into OUTPUT_SFT.""" + stats: dict[str, int] = {} + all_records: list[dict] = [] + + for source in SFT_SOURCES: + path = DATA_DIR / source["file"] + if not path.exists(): + if source.get("optional"): + print(f" SKIP (optional, not found): {source['file']}") + else: + print(f" MISSING (required): {source['file']}") + continue + + records = load_sft_file(path, source.get("needs_conversion", False)) + valid = [r for r in records if validate_sft_record(r)] + invalid = len(records) - len(valid) + + if invalid: + print(f" {source['file']}: {len(records)} loaded, {invalid} dropped (quality)") + + weight = source["weight"] if apply_weights else 1.0 + if apply_weights and weight != 1.0: + weighted = duplicate_by_weight(valid, weight) + print(f" Tier {source['tier']} | {source['file']}: {len(valid)} → {len(weighted)} (×{weight})") + else: + weighted = valid + print(f" Tier {source['tier']} | {source['file']}: {len(valid)}") + + # Tag with source metadata + for r in weighted: + if "meta" not in r: + r["meta"] = {} + r["meta"].setdefault("source_file", source["file"]) + r["meta"].setdefault("tier", source["tier"]) + + all_records.extend(weighted) + stats[source["file"]] = len(valid) + + # Shuffle to mix tiers + random.shuffle(all_records) + + OUTPUT_SFT.parent.mkdir(parents=True, exist_ok=True) + with open(OUTPUT_SFT, "w", encoding="utf-8") as f: + for r in all_records: + f.write(json.dumps(r, ensure_ascii=False) + "\n") + + print(f"\nSFT merged: {len(all_records)} examples → {OUTPUT_SFT}") + return stats + + +def merge_dpo() -> dict[str, int]: + """Merge all DPO sources into OUTPUT_DPO.""" + stats: dict[str, int] = {} + all_pairs: list[dict] = [] + + for source in DPO_SOURCES: + path = DATA_DIR / source["file"] + if not path.exists(): + if source.get("optional"): + print(f" SKIP (optional): {source['file']}") + else: + print(f" MISSING: {source['file']}") + continue + + pairs = [] + with open(path, encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + item = json.loads(line) + if REQUIRED_DPO_FIELDS.issubset(item.keys()): + # Ensure chosen != rejected and both are non-empty + if (item["chosen"] != item["rejected"] + and len(item["chosen"].split()) > 50 + and len(item["rejected"].split()) > 10): + pairs.append(item) + except json.JSONDecodeError: + pass + + print(f" {source['file']}: {len(pairs)} valid pairs") + all_pairs.extend(pairs) + stats[source["file"]] = len(pairs) + + random.shuffle(all_pairs) + + with open(OUTPUT_DPO, "w", encoding="utf-8") as f: + for p in all_pairs: + f.write(json.dumps(p, ensure_ascii=False) + "\n") + + print(f"\nDPO merged: {len(all_pairs)} pairs → {OUTPUT_DPO}") + return stats + + +def print_stats() -> None: + """Print dataset statistics without merging.""" + print("\n=== v8 Dataset Status ===\n") + + print("SFT Sources:") + sft_total = 0 + for source in SFT_SOURCES: + path = DATA_DIR / source["file"] + if path.exists(): + with open(path) as f: + count = sum(1 for line in f if line.strip()) + wc_list = [] + with open(path) as f: + for line in f: + if not line.strip(): + continue + try: + r = json.loads(line) + wc = r.get("meta", {}).get("word_count") or len(r.get("output_text", "").split()) + if wc: + wc_list.append(wc) + except Exception: + pass + avg_wc = int(sum(wc_list) / len(wc_list)) if wc_list else 0 + effective = int(count * source["weight"]) + sft_total += effective + status = f"✓ {count:4d} examples (×{source['weight']} → {effective:4d} effective, avg {avg_wc}w)" + else: + status = "✗ NOT FOUND" + (" (optional)" if source.get("optional") else " ⚠ REQUIRED") + print(f" [{source['tier']}] {source['file']}: {status}") + + print(f"\n Total effective SFT: {sft_total} examples") + + print("\nDPO Sources:") + dpo_total = 0 + for source in DPO_SOURCES: + path = DATA_DIR / source["file"] + if path.exists(): + with open(path) as f: + count = sum(1 for line in f if line.strip()) + dpo_total += count + print(f" ✓ {source['file']}: {count} pairs") + else: + optional = " (optional)" if source.get("optional") else " ⚠ REQUIRED" + print(f" ✗ {source['file']}: NOT FOUND{optional}") + + print(f"\n Total DPO: {dpo_total} pairs") + + # Merged files + print("\nMerged Outputs:") + for out in [OUTPUT_SFT, OUTPUT_DPO]: + if out.exists(): + with open(out) as f: + count = sum(1 for line in f if line.strip()) + size_mb = out.stat().st_size / 1_048_576 + print(f" ✓ {out.name}: {count} lines ({size_mb:.1f} MB)") + else: + print(f" ✗ {out.name}: not yet generated") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Merge v8 training datasets") + parser.add_argument("--no-weight", action="store_true", + help="Flat merge without duplication by weight") + parser.add_argument("--stats-only", action="store_true", + help="Print statistics only, do not merge") + parser.add_argument("--sft-only", action="store_true", help="Only merge SFT") + parser.add_argument("--dpo-only", action="store_true", help="Only merge DPO") + args = parser.parse_args() + + if args.stats_only: + print_stats() + return + + print_stats() + print() + + if not args.dpo_only: + print("=== Merging SFT ===") + merge_sft(apply_weights=not args.no_weight) + + if not args.sft_only: + print("\n=== Merging DPO ===") + merge_dpo() + + print("\n=== Final Stats ===") + print_stats() + + +if __name__ == "__main__": + main() diff --git a/packages/fine-tuner/scripts/crawl_v8_sources.py b/packages/fine-tuner/scripts/crawl_v8_sources.py new file mode 100644 index 0000000..b131784 --- /dev/null +++ b/packages/fine-tuner/scripts/crawl_v8_sources.py @@ -0,0 +1,607 @@ +#!/usr/bin/env python3 +""" +crawl_v8_sources.py — Crawlt externe Quellen für v8 Trainingsdaten + +Quellen (priorisiert nach technischer Tiefe): + 1. APNIC Blog — https://blog.apnic.net/feed/ (400-500 Posts) + 2. RIPE Labs — https://labs.ripe.net/feed/blog/ (300-400 Posts) + 3. Geoff Huston — https://www.potaroo.net/ispcol/ (500 Artikel) + 4. Cloudflare Blog — /tag/networking + /tag/bgp (30-50 Posts) + 5. ARIN Blog — https://www.arin.net/blog/ (bonus) + +Für jede Quelle: + - Artikel-URL + Titel + Kategorie extrahieren + - Rohtext herunterladen und bereinigen + - Claude CLI rewritet → 700-1000w, hook + Sektionen + Takeaways + - Als SFT JSONL speichern (weight: 1.5) + +Output: ~/transceiver-training-data/v8-external-sft.jsonl + +Usage: + python3 scripts/crawl_v8_sources.py # alle Quellen + python3 scripts/crawl_v8_sources.py --source apnic # nur APNIC + python3 scripts/crawl_v8_sources.py --max 50 # max 50 Artikel + python3 scripts/crawl_v8_sources.py --dry-run # nur URLs anzeigen +""" + +from __future__ import annotations + +import argparse +import json +import logging +import re +import subprocess +import time +import urllib.request +import urllib.error +from html.parser import HTMLParser +from pathlib import Path +from html import unescape as html_unescape +from typing import NamedTuple + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger(__name__) + +OUTPUT_FILE = Path.home() / "transceiver-training-data" / "v8-external-sft.jsonl" +PROGRESS_FILE = Path.home() / "transceiver-training-data" / "v8-crawl-progress.json" +TIMEOUT = 30 +CLAUDE_TIMEOUT = 180 +USER_AGENT = "Mozilla/5.0 (compatible; research-bot/1.0; training-data-collection)" + +SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, BGP routing, and network infrastructure. + +STRICT CONSTRAINTS — Follow exactly, no exceptions: +- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum. +- STRUCTURE (mandatory, in this order): + 1. HOOK paragraph — 2–3 sentences stating the problem this post addresses + 2. Technical sections — 3–4 H2 sections covering the topic in depth + 3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable +- TOPIC DISCIPLINE: Write ONLY about the exact topic in the source material. Zero drift. +- NO REPETITION: Every sentence must add new information. No restating. +- VOICE: Confident, direct. No hedging phrases like "it's worth noting". +- AUDIENCE: Network engineers and IT professionals. Assume technical fluency. +- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms. + +Do not summarize what you are about to write. Start with the hook directly. +Do not copy from the source — rewrite completely in your own words.""" + + +class Article(NamedTuple): + title: str + url: str + source: str + category: str + + +# ─── HTML Utilities ─────────────────────────────────────────────────────────── + +def fetch_url(url: str, timeout: int = TIMEOUT) -> str | None: + """Fetch URL, return text or None.""" + try: + req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) + with urllib.request.urlopen(req, timeout=timeout) as resp: + charset = resp.headers.get_content_charset() or "utf-8" + return resp.read().decode(charset, errors="ignore") + except Exception as exc: + logger.warning("Fetch failed %s: %s", url, exc) + return None + + +def strip_tags(html: str) -> str: + """Strip HTML tags → plain text, decode all HTML entities.""" + text = re.sub(r']*>.*?', '', html, flags=re.DOTALL) + text = re.sub(r']*>.*?', '', text, flags=re.DOTALL) + text = re.sub(r'<[^>]+>', ' ', text) + text = html_unescape(text) # handles &, ‘, ', etc. + text = text.replace('\xa0', ' ') # non-breaking space + text = re.sub(r'\s+', ' ', text) + return text.strip() + + +def find_balanced_div(html: str, start_pos: int) -> str: + """Extract content of a
starting at start_pos, handling nested divs.""" + depth = 0 + pos = start_pos + content_start = html.find('>', start_pos) + if content_start < 0: + return "" + content_start += 1 + pos = content_start + + while pos < len(html): + open_m = re.search(r']', html[pos:], re.IGNORECASE) + close_m = re.search(r'
', html[pos:], re.IGNORECASE) + + if not close_m: + break + + open_pos = open_m.start() if open_m else len(html) + close_pos = close_m.start() + + if open_pos < close_pos: + depth += 1 + pos += open_pos + 4 + else: + if depth == 0: + return html[content_start:pos + close_pos] + depth -= 1 + pos += close_pos + 6 + + return "" + + +def find_content_block(html: str, *class_or_id_patterns: str) -> str: + """Find a div by class or id pattern (handles any attribute order), return its content.""" + for pattern in class_or_id_patterns: + # Match
regardless of attribute order + m = re.search( + rf']+(?:class|id)="[^"]*{re.escape(pattern)}[^"]*"[^>]*>', + html, re.IGNORECASE, + ) + if m: + content = find_balanced_div(html, m.start()) + if len(content.split()) > 50: + return content + return "" + + +def extract_article_text(html: str, source: str) -> str: + """Extract main content area based on source-specific selectors.""" + # Potaroo uses old-school table layout — extract all

paragraphs + if source == "potaroo": + paragraphs = re.findall(r']*>(.*?)

', html, re.DOTALL | re.IGNORECASE) + # Filter: skip short nav-like paragraphs, keep substantive text + long_paras = [strip_tags(p) for p in paragraphs if len(strip_tags(p).split()) > 20] + if long_paras: + return "\n\n".join(long_paras) + return "" + + # Source-specific content div identifiers + source_patterns: dict[str, list[str]] = { + "apnic": ["entry-content", "article-content", "post-content"], + "ripe": ["article-body", "entry-content", "ripe-article"], + "cloudflare": ["post-content", "article-content", "entry-content"], + "arin": ["entry-content", "post-content", "article-body"], + } + + patterns = source_patterns.get(source, ["entry-content", "article-content"]) + + # Try div-based extraction first (handles nested divs correctly) + raw = find_content_block(html, *patterns) + + # Fallback:
tag + if not raw: + m = re.search(r']*>(.*?)
', html, re.DOTALL | re.IGNORECASE) + if m: + raw = m.group(1) + + # Fallback:
tag + if not raw: + m = re.search(r']*>(.*?)
', html, re.DOTALL | re.IGNORECASE) + if m: + raw = m.group(1) + + if not raw or len(raw.split()) < 30: + return "" + + # Convert to markdown-ish plain text + for level in [3, 2, 1]: + raw = re.sub( + rf']*>(.*?)', + lambda m: f"\n{'#'*level} {strip_tags(m.group(1))}\n", + raw, flags=re.DOTALL | re.IGNORECASE + ) + raw = re.sub(r']*>(.*?)', r'\n- \1', raw, flags=re.DOTALL) + raw = re.sub(r']*>', '\n', raw) + raw = re.sub(r'

', '\n', raw) + raw = re.sub(r'', '\n', raw) + raw = re.sub(r'<(?:strong|b)[^>]*>(.*?)', r'**\1**', raw, flags=re.DOTALL) + text = strip_tags(raw) + text = re.sub(r'\n{3,}', '\n\n', text) + text = re.sub(r'[ \t]+', ' ', text) + return text.strip() + + +# ─── RSS Parser ─────────────────────────────────────────────────────────────── + +def parse_rss(xml: str, source: str, max_items: int = 200) -> list[Article]: + """Parse RSS/Atom feed → list of Articles.""" + articles = [] + + # Try (RSS 2.0) + items = re.findall(r'(.*?)', xml, re.DOTALL) + if not items: + # Try (Atom) + items = re.findall(r'(.*?)', xml, re.DOTALL) + + for item in items[:max_items]: + title_m = re.search(r']*>(?:)?', item, re.DOTALL) + link_m = (re.search(r']*/>', item) or + re.search(r']*>(.*?)', item, re.DOTALL) or + re.search(r']*>(https?://[^<]+)', item)) + cat_m = re.search(r']*>(?:)?', item, re.DOTALL) + + if not title_m or not link_m: + continue + + title = strip_tags(title_m.group(1)).strip() + # For self-closing, get href attribute + link_raw = link_m.group(0) + href_m = re.search(r'href="([^"]+)"', link_raw) or re.search(r'>(https?://[^<]+)<', link_raw) + url = href_m.group(1) if href_m else link_m.group(1) if link_m.lastindex else "" + url = url.strip() + category = strip_tags(cat_m.group(1)).strip() if cat_m else "networking" + + if title and url.startswith("http"): + articles.append(Article(title=title, url=url, source=source, category=category)) + + logger.info("RSS parsed %d articles from %s", len(articles), source) + return articles + + +# ─── Source-specific fetchers ───────────────────────────────────────────────── + +def fetch_apnic(max_items: int = 200) -> list[Article]: + """APNIC Blog via RSS.""" + articles = [] + # APNIC has paginated RSS - try multiple pages + for page in range(1, 6): + url = f"https://blog.apnic.net/feed/?paged={page}" if page > 1 else "https://blog.apnic.net/feed/" + xml = fetch_url(url) + if not xml: + break + page_articles = parse_rss(xml, "apnic", max_items) + articles.extend(page_articles) + if len(articles) >= max_items or len(page_articles) < 10: + break + time.sleep(1) + return articles[:max_items] + + +def fetch_ripe_labs(max_items: int = 200) -> list[Article]: + """RIPE Labs via RSS (feed.xml).""" + feeds = [ + "https://labs.ripe.net/feed.xml", + "https://labs.ripe.net/feed/", + "https://labs.ripe.net/Members/feed/", + ] + for feed_url in feeds: + xml = fetch_url(feed_url) + if xml and len(xml) > 500: + articles = parse_rss(xml, "ripe", max_items) + if articles: + return articles[:max_items] + time.sleep(1) + return [] + + +def fetch_potaroo(max_items: int = 100) -> list[Article]: + """Geoff Huston's potaroo.net ISPCOL column index. + + Articles are linked as relative hrefs like "2026-04/nznog26.html" from + https://www.potaroo.net/ispcol/index.html + """ + base_url = "https://www.potaroo.net/ispcol" + html = fetch_url(f"{base_url}/index.html") or fetch_url(f"{base_url}/") + if not html: + logger.warning("Could not fetch potaroo.net index") + return [] + + articles = [] + seen: set[str] = set() + + # Pattern: href="YYYY-MM/slug.html" followed (nearby) by link text + # The index lists articles as: Title + for m in re.finditer(r'href="(\d{4}-\d{2}/[^"]+\.html)"[^>]*>([^<]{5,150})<', html, re.IGNORECASE): + rel_path, title_raw = m.group(1), m.group(2).strip() + url = f"{base_url}/{rel_path}" + title = html_unescape(title_raw).strip() + if url not in seen and len(title) > 5: + seen.add(url) + articles.append(Article(title=title, url=url, source="potaroo", category="bgp-routing")) + if len(articles) >= max_items: + break + + # Fallback: bare date-path links without visible anchor text + if not articles: + for m in re.finditer(r'href="(\d{4}-\d{2}/[^"]+\.html)"', html, re.IGNORECASE): + rel_path = m.group(1) + url = f"{base_url}/{rel_path}" + slug = rel_path.split("/")[-1].replace(".html", "").replace("-", " ").title() + if url not in seen: + seen.add(url) + articles.append(Article(title=f"Geoff Huston: {slug}", url=url, source="potaroo", category="bgp-routing")) + if len(articles) >= max_items: + break + + logger.info("Potaroo: found %d articles", len(articles)) + return articles[:max_items] + + +def fetch_cloudflare(max_items: int = 60) -> list[Article]: + """Cloudflare blog — networking and BGP tags.""" + articles = [] + tags = ["bgp", "routing", "dns", "tcp", "ddos", "ipv6"] + + for tag in tags: + # Cloudflare has a JSON API for blog listing + api_url = f"https://blog.cloudflare.com/tag/{tag}/" + html = fetch_url(api_url) + if not html: + continue + + # Extract article links from blog listing + for m in re.finditer( + r'href="(/[a-z0-9-]{10,80}/)"[^>]*>.*?]*>([^<]{10,150})', + html, re.DOTALL | re.IGNORECASE + ): + url = f"https://blog.cloudflare.com{m.group(1)}" + title = strip_tags(m.group(2)).strip() + if url not in [a.url for a in articles] and title: + articles.append(Article(title=title, url=url, source="cloudflare", category=tag)) + if len(articles) >= max_items: + break + + if len(articles) >= max_items: + break + time.sleep(1) + + logger.info("Cloudflare: found %d articles", len(articles)) + return articles[:max_items] + + +def fetch_arin_blog(max_items: int = 50) -> list[Article]: + """ARIN blog via RSS (feed.xml).""" + feeds = [ + "https://www.arin.net/blog/feed.xml", + "https://www.arin.net/blog/feed/", + "https://www.arin.net/feed/", + ] + for feed_url in feeds: + xml = fetch_url(feed_url) + if xml and len(xml) > 500: + articles = parse_rss(xml, "arin", max_items) + if articles: + return articles[:max_items] + time.sleep(0.5) + return [] + + +# ─── Category → audience mapping ───────────────────────────────────────────── + +def category_to_audience(category: str, source: str) -> str: + cat_lower = category.lower() + if any(k in cat_lower for k in ["bgp", "routing", "rpki", "aspa", "irr", "peering"]): + return "network engineers and NOC operators managing BGP and routing infrastructure" + elif any(k in cat_lower for k in ["dns", "dnssec", "resolver"]): + return "network engineers and infrastructure operators managing DNS services" + elif any(k in cat_lower for k in ["security", "ddos", "attack", "vulnerab"]): + return "network security engineers and SOC operators" + elif any(k in cat_lower for k in ["ipv6", "ipv4", "address"]): + return "network architects and engineers planning IP address strategy" + elif any(k in cat_lower for k in ["datacenter", "optical", "transceiver", "400g"]): + return "network engineers and IT professionals who evaluate and operate optical infrastructure" + else: + return "network engineers and infrastructure operators" + + +# ─── Claude rewrite ─────────────────────────────────────────────────────────── + +def rewrite_with_claude( + title: str, + raw_text: str, + audience: str, + timeout: int = CLAUDE_TIMEOUT, +) -> str | None: + """Use Claude CLI to rewrite raw article text in our blog format.""" + if len(raw_text.split()) < 100: + return None + + # Truncate very long source articles (keep first ~1500 words for context) + words = raw_text.split() + if len(words) > 1500: + raw_text = " ".join(words[:1500]) + "\n\n[Source truncated]" + + prompt = ( + f"Write a blog post on this topic.\n\n" + f"**Topic:** {title}\n\n" + f"**Target audience:** {audience}\n\n" + f"**Source material for reference (DO NOT COPY — rewrite completely):**\n\n" + f"{raw_text}\n\n" + f"Remember: 700–1000 words, hook + technical sections + 3 takeaways. " + f"Stay strictly on-topic. No filler. Start writing now." + ) + + try: + result = subprocess.run( + ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", prompt], + capture_output=True, + text=True, + timeout=timeout, + ) + if result.returncode != 0 or not result.stdout.strip(): + logger.warning("Claude returned error: %s", result.stderr[:200]) + return None + output = result.stdout.strip() + word_count = len(output.split()) + if word_count < 400 or output.startswith("I "): + return None + return output + except subprocess.TimeoutExpired: + logger.warning("Claude timeout for: %s", title[:50]) + return None + except Exception as exc: + logger.warning("Claude error: %s", exc) + return None + + +# ─── Main crawler loop ──────────────────────────────────────────────────────── + +def load_progress() -> set[str]: + """Load already-processed URLs.""" + if not PROGRESS_FILE.exists(): + return set() + try: + data = json.loads(PROGRESS_FILE.read_text()) + return set(data.get("done", [])) + except Exception: + return set() + + +def save_progress(done_urls: set[str]) -> None: + PROGRESS_FILE.write_text(json.dumps({"done": list(done_urls)}, ensure_ascii=False)) + + +def crawl_source( + source: str, + articles: list[Article], + done_urls: set[str], + max_articles: int, + dry_run: bool, +) -> tuple[int, int]: + """Crawl + rewrite articles from one source. Returns (saved, skipped).""" + saved = 0 + skipped = 0 + count = 0 + + OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True) + + for article in articles: + if count >= max_articles: + break + if article.url in done_urls: + logger.debug("SKIP (done): %s", article.url) + skipped += 1 + continue + + count += 1 + logger.info("[%s] %d/%d: %s", source.upper(), count, min(len(articles), max_articles), article.title[:60]) + + if dry_run: + print(f" DRY: {article.url}") + continue + + # Fetch article HTML + html = fetch_url(article.url) + if not html: + logger.warning(" SKIP (fetch failed)") + done_urls.add(article.url) + skipped += 1 + continue + + # Extract text + raw_text = extract_article_text(html, source) + if len(raw_text.split()) < 100: + logger.warning(" SKIP (too short: %d words)", len(raw_text.split())) + done_urls.add(article.url) + skipped += 1 + continue + + # Rewrite with Claude + audience = category_to_audience(article.category, source) + rewritten = rewrite_with_claude(article.title, raw_text, audience) + + if not rewritten: + logger.warning(" SKIP (claude failed)") + done_urls.add(article.url) + skipped += 1 + continue + + word_count = len(rewritten.split()) + logger.info(" OK: %dw | %s", word_count, article.title[:50]) + + record = { + "system_prompt": SYSTEM_PROMPT, + "input_text": ( + f"Write a blog post on the following topic:\n\n" + f"**Topic:** {article.title}\n\n" + f"**Target audience:** {audience}\n\n" + f"Remember: 700–1000 words, hook + technical sections + 3 takeaways. " + f"Stay strictly on-topic. No filler. Start writing now." + ), + "output_text": rewritten, + "meta": { + "title": article.title, + "source_url": article.url, + "source": source, + "category": article.category, + "word_count": word_count, + "quality": "external_rewritten", + "weight": 1.5, + "dataset_version": "v8", + }, + } + + with open(OUTPUT_FILE, "a", encoding="utf-8") as f: + f.write(json.dumps(record, ensure_ascii=False) + "\n") + + done_urls.add(article.url) + save_progress(done_urls) + saved += 1 + + # Rate limiting — be a good citizen + time.sleep(2) + + return saved, skipped + + +# ─── Entry point ────────────────────────────────────────────────────────────── + +SOURCES = { + "apnic": fetch_apnic, + "ripe": fetch_ripe_labs, + "potaroo": fetch_potaroo, + "cloudflare": fetch_cloudflare, + # "arin": fetch_arin_blog, # No accessible RSS feed found +} + + +def main() -> None: + parser = argparse.ArgumentParser(description="Crawl external networking blogs for v8 training data") + parser.add_argument("--source", choices=list(SOURCES.keys()) + ["all"], default="all", + help="Which source(s) to crawl (default: all)") + parser.add_argument("--max", type=int, default=100, + help="Max articles per source (default: 100)") + parser.add_argument("--dry-run", action="store_true", + help="Show URLs without downloading or rewriting") + args = parser.parse_args() + + done_urls = load_progress() + logger.info("Resuming: %d URLs already processed", len(done_urls)) + + active_sources = list(SOURCES.keys()) if args.source == "all" else [args.source] + # Filter only existing sources (arin removed) + active_sources = [s for s in active_sources if s in SOURCES] + total_saved = 0 + + for source in active_sources: + logger.info("=== Fetching article list: %s ===", source.upper()) + try: + articles = SOURCES[source](args.max) + except Exception as exc: + logger.error("Failed to fetch %s articles: %s", source, exc) + continue + + if not articles: + logger.warning("No articles found for %s", source) + continue + + saved, skipped = crawl_source(source, articles, done_urls, args.max, args.dry_run) + logger.info("%s done: saved=%d skipped=%d", source.upper(), saved, skipped) + total_saved += saved + + if not args.dry_run: + total_lines = 0 + if OUTPUT_FILE.exists(): + with open(OUTPUT_FILE) as f: + total_lines = sum(1 for _ in f) + logger.info("=== DONE: total saved=%d | output total=%d lines ===", total_saved, total_lines) + logger.info("Output: %s", OUTPUT_FILE) + + +if __name__ == "__main__": + main() diff --git a/packages/fine-tuner/scripts/label_v7_quality.py b/packages/fine-tuner/scripts/label_v7_quality.py new file mode 100644 index 0000000..ca9ee37 --- /dev/null +++ b/packages/fine-tuner/scripts/label_v7_quality.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python3 +""" +label_v7_quality.py — Bewertet v7-generierte Blogs und erstellt DPO-Labels + +Claude liest jeden generierten Blog und bewertet ihn nach 5 Kriterien: + 1. Wortanzahl (700-1000 = gut, sonst schlecht) + 2. Hook vorhanden (klares Einstiegsproblem) + 3. Technische Tiefe (nicht generisch) + 4. Struktur (## Headers, 3 Takeaways) + 5. Kein Drift (bleibt beim Thema) + +Aus gut/schlecht Bewertungen: + - "gut" + "schlecht" vom gleichen Thema → DPO-Pair + - Oder: "schlecht" → Claude schreibt bessere Version → DPO-Pair + +Output: + ~/transceiver-training-data/v8-quality-dpo.jsonl + +Usage: + python3 scripts/label_v7_quality.py + python3 scripts/label_v7_quality.py --input v7-generated-sft.jsonl + python3 scripts/label_v7_quality.py --max 50 --rewrite-bad +""" + +from __future__ import annotations + +import argparse +import json +import logging +import re +import subprocess +from pathlib import Path + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger(__name__) + +DATA_DIR = Path.home() / "transceiver-training-data" +DEFAULT_INPUT = DATA_DIR / "v7-generated-sft.jsonl" +OUTPUT_FILE = DATA_DIR / "v8-quality-dpo.jsonl" +PROGRESS_FILE = DATA_DIR / "v8-quality-progress.json" + +CLAUDE_TIMEOUT = 120 + +SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure. + +STRICT CONSTRAINTS — Follow exactly, no exceptions: +- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum. +- STRUCTURE (mandatory, in this order): + 1. HOOK paragraph — 2–3 sentences stating the problem this post addresses + 2. Technical sections — 3–4 H2 sections covering the topic in depth + 3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable +- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift. +- NO REPETITION: Every sentence must add new information. No restating. +- VOICE: Confident, direct. No hedging phrases like "it's worth noting". +- AUDIENCE: Network engineers and IT professionals. Assume technical fluency. +- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms. + +Do not summarize what you are about to write. Start with the hook directly.""" + +JUDGE_PROMPT = """Evaluate this blog post against the following criteria. Respond with ONLY a JSON object — no explanation, no markdown. + +Criteria: +1. word_count_ok: true if 700-1000 words (count the words) +2. has_hook: true if first paragraph clearly states a real problem (not generic intro) +3. technical_depth: true if contains specific technical details (numbers, standards, product names) +4. good_structure: true if has ## H2 headers AND ends with 3 bullet point takeaways +5. on_topic: true if stays focused on the exact topic throughout (no generic drift) +6. overall: "good" if at least 4 of 5 criteria pass, otherwise "bad" +7. issues: list of failed criteria names + +Response format exactly: +{"word_count_ok": true/false, "has_hook": true/false, "technical_depth": true/false, "good_structure": true/false, "on_topic": true/false, "overall": "good"/"bad", "issues": ["..."]} + +Blog post to evaluate: + +""" + + +def load_progress() -> set[str]: + if not PROGRESS_FILE.exists(): + return set() + try: + return set(json.loads(PROGRESS_FILE.read_text()).get("done", [])) + except Exception: + return set() + + +def save_progress(done: set[str]) -> None: + PROGRESS_FILE.write_text(json.dumps({"done": list(done)})) + + +def judge_blog(blog_text: str) -> dict | None: + """Ask Claude to evaluate a blog post quality. Returns parsed JSON or None.""" + prompt = JUDGE_PROMPT + blog_text + + try: + result = subprocess.run( + ["claude", "--print", "-p", prompt], + capture_output=True, text=True, timeout=CLAUDE_TIMEOUT, + ) + if result.returncode != 0 or not result.stdout.strip(): + return None + + output = result.stdout.strip() + # Extract JSON from response (Claude might wrap it) + json_match = re.search(r'\{[^{}]+\}', output, re.DOTALL) + if not json_match: + return None + return json.loads(json_match.group(0)) + except (subprocess.TimeoutExpired, json.JSONDecodeError, Exception): + return None + + +def quick_check(blog_text: str) -> dict: + """Fast deterministic pre-check (no Claude call) to filter obvious cases.""" + word_count = len(blog_text.split()) + has_h2 = bool(re.search(r'^## ', blog_text, re.MULTILINE)) + has_bullets = bool(re.search(r'^[-*•] ', blog_text, re.MULTILINE)) + bullet_count = len(re.findall(r'^[-*•] ', blog_text, re.MULTILINE)) + starts_ok = not blog_text.strip().startswith("In this ") + has_takeaways = bool(re.search(r'takeaway|practical|key point', blog_text, re.IGNORECASE)) + + # Deterministic verdict (no Claude needed for clear failures) + clear_bad = word_count < 500 or word_count > 1500 or not has_h2 + clear_good = (700 <= word_count <= 1050 and has_h2 and bullet_count >= 3 + and starts_ok) + + return { + "word_count": word_count, + "has_h2": has_h2, + "has_bullets": has_bullets, + "bullet_count": bullet_count, + "clear_bad": clear_bad, + "clear_good": clear_good, + "needs_claude": not clear_bad and not clear_good, + } + + +def rewrite_for_chosen(title: str, input_text: str) -> str | None: + """Use Claude to write a high-quality version (the 'chosen' half of DPO pair).""" + try: + result = subprocess.run( + ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", input_text], + capture_output=True, text=True, timeout=180, + ) + if result.returncode != 0 or not result.stdout.strip(): + return None + output = result.stdout.strip() + wc = len(output.split()) + if wc < 400 or wc > 1500: + return None + return output + except Exception: + return None + + +def process_examples( + input_file: Path, + max_items: int | None, + rewrite_bad: bool, +) -> None: + if not input_file.exists(): + logger.error("Input file not found: %s", input_file) + return + + examples = [] + with open(input_file, encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + examples.append(json.loads(line)) + except json.JSONDecodeError: + pass + + if max_items: + examples = examples[:max_items] + + logger.info("Processing %d examples from %s", len(examples), input_file.name) + + done = load_progress() + stats = {"good": 0, "bad": 0, "dpo_pairs": 0, "skipped": 0} + + OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True) + + with open(OUTPUT_FILE, "a", encoding="utf-8") as out_f: + for i, item in enumerate(examples): + topic = item.get("meta", {}).get("topic", "")[:60] + output_text = item.get("output_text", "") + input_text = item.get("input_text", "") + system = item.get("system_prompt", SYSTEM_PROMPT) + + item_id = f"{input_file.stem}:{i}" + if item_id in done: + stats["skipped"] += 1 + continue + + # ── Fast pre-check ── + check = quick_check(output_text) + logger.info("[%03d/%03d] %dw | %s", i + 1, len(examples), check["word_count"], topic) + + if check["clear_good"]: + verdict = "good" + logger.info(" → CLEAR GOOD (deterministic)") + elif check["clear_bad"]: + verdict = "bad" + logger.info(" → CLEAR BAD (deterministic: %dw, h2=%s)", check["word_count"], check["has_h2"]) + else: + # Ask Claude to judge + judgment = judge_blog(output_text) + if judgment is None: + logger.warning(" → SKIP (judge failed)") + done.add(item_id) + stats["skipped"] += 1 + continue + verdict = judgment.get("overall", "bad") + issues = judgment.get("issues", []) + logger.info(" → %s | issues: %s", verdict.upper(), issues) + + if verdict == "good": + stats["good"] += 1 + else: + stats["bad"] += 1 + + if rewrite_bad: + # Create DPO pair: bad original → rewritten chosen + logger.info(" Rewriting bad post for DPO...") + chosen = rewrite_for_chosen(topic, input_text) + if chosen and chosen != output_text: + prompt = ( + f"<|im_start|>system\n{system}<|im_end|>\n" + f"<|im_start|>user\n{input_text}<|im_end|>\n" + ) + pair = { + "prompt": prompt, + "chosen": chosen, + "rejected": output_text, + "meta": { + "topic": topic, + "rejection_strategy": "quality_labeled_bad", + "chosen_words": len(chosen.split()), + "rejected_words": check["word_count"], + "verdict": verdict, + "dataset_version": "v8", + }, + } + out_f.write(json.dumps(pair, ensure_ascii=False) + "\n") + out_f.flush() + stats["dpo_pairs"] += 1 + logger.info(" DPO pair saved: %dw chosen vs %dw rejected", + len(chosen.split()), check["word_count"]) + + done.add(item_id) + save_progress(done) + + logger.info( + "Done: good=%d bad=%d dpo_pairs=%d skipped=%d", + stats["good"], stats["bad"], stats["dpo_pairs"], stats["skipped"], + ) + logger.info("Output: %s", OUTPUT_FILE) + + # Print summary + if OUTPUT_FILE.exists(): + with open(OUTPUT_FILE) as f: + total = sum(1 for _ in f) + logger.info("Total DPO pairs in output: %d", total) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Label v7 posts for quality DPO pairs") + parser.add_argument( + "--input", type=Path, default=DEFAULT_INPUT, + help=f"Input JSONL with SFT examples (default: {DEFAULT_INPUT})", + ) + parser.add_argument( + "--max", type=int, default=None, + help="Max examples to process", + ) + parser.add_argument( + "--rewrite-bad", action="store_true", + help="Rewrite bad posts with Claude to create DPO pairs (slower, costs more)", + ) + args = parser.parse_args() + process_examples(args.input, args.max, args.rewrite_bad) + + +if __name__ == "__main__": + main() diff --git a/packages/fine-tuner/scripts/parse_real_posts.py b/packages/fine-tuner/scripts/parse_real_posts.py new file mode 100644 index 0000000..ebaa996 --- /dev/null +++ b/packages/fine-tuner/scripts/parse_real_posts.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +""" +parse_real_posts.py — Konvertiert echte Blog-HTML → v8 SFT Trainingsdaten + +Extrahiert Titel + Inhalt aus Ghost CMS HTML, baut daraus hochwertige +Training-Beispiele mit dem v7/v8 System Prompt. + +Diese echten Posts sind GOLD — Rene's eigene Stimme, echte Expertise, +keine AI-Halluzinationen. Werden als Top-Priorität in v8 gewichtet. + +Input: ~/transceiver-training-data/v8-real-posts/*.html +Output: ~/transceiver-training-data/v8-real-posts-sft.jsonl +""" + +from __future__ import annotations + +import json +import re +from pathlib import Path + +POSTS_DIR = Path.home() / "transceiver-training-data" / "v8-real-posts" +OUTPUT_FILE = Path.home() / "transceiver-training-data" / "v8-real-posts-sft.jsonl" + +# Welche Posts sind für Blog-Training relevant (nicht rein persönlich/tool-spezifisch) +# Alle nehmen — auch nicht-Transceiver Posts zeigen Schreibstil + Struktur +SKIP_SLUGS = set() # nichts überspringen — alle Posts zeigen Renes Voice + +SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure. + +STRICT CONSTRAINTS — Follow exactly, no exceptions: +- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum. +- STRUCTURE (mandatory, in this order): + 1. HOOK paragraph — 2–3 sentences stating the problem this post addresses + 2. Technical sections — 3–4 H2 sections covering the topic in depth + 3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable +- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift. +- NO REPETITION: Every sentence must add new information. No restating. +- VOICE: Confident, direct. No hedging phrases like "it's worth noting". +- AUDIENCE: Network engineers and IT professionals. Assume technical fluency. +- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms. + +Do not summarize what you are about to write. Start with the hook directly.""" + + +def extract_ghost_content(html: str) -> tuple[str, str] | None: + """Extrahiert (title, clean_text) aus Ghost CMS HTML.""" + + # Titel aus og:title oder h1 + title_match = ( + re.search(r'([\s\S]+?)', html) + ) + title = title_match.group(1).strip() if title_match else "" + # Multiline title collapse + title = re.sub(r'\s+', ' ', title).strip() + # HTML entities in title + title = (title.replace('&', '&').replace('<', '<').replace('>', '>') + .replace('"', '"').replace(''', "'").replace(''', "'") + .replace(' ', ' ').replace('—', '—').replace('–', '–')) + # Ghost appends " | Blog" or " – Site Name" — only strip at a pipe or en-dash + # surrounded by spaces (require \s+, NOT \s* to avoid cutting hyphenated words) + title = re.sub(r"\s+[|–]\s+.+$", "", title).strip() + + # Ghost CMS Content-Selektoren (in Prioritätsreihenfolge) + content_patterns = [ + r'
(.*?)\s*(?:(.*?)', + r'class="post-full-content[^"]*"[^>]*>(.*?)', + r'
(.*?)
\s*(?:
]*>.*?', '', raw_content, flags=re.DOTALL) + raw_content = re.sub(r']*>.*?', '', raw_content, flags=re.DOTALL) + raw_content = re.sub(r']*>.*?', '', raw_content, flags=re.DOTALL) + + # HTML → Markdown-ähnliches Format + # Headers + for level in [6, 5, 4, 3, 2, 1]: + hashes = "#" * level + raw_content = re.sub( + rf']*>(.*?)', + lambda m: f"\n{hashes} {re.sub('<[^>]+>', '', m.group(1)).strip()}\n", + raw_content, + flags=re.DOTALL | re.IGNORECASE, + ) + + # Bold/italic + raw_content = re.sub(r']*>(.*?)', r'**\1**', raw_content, flags=re.DOTALL) + raw_content = re.sub(r']*>(.*?)', r'**\1**', raw_content, flags=re.DOTALL) + raw_content = re.sub(r']*>(.*?)', r'*\1*', raw_content, flags=re.DOTALL) + + # Lists + raw_content = re.sub(r']*>(.*?)', r'\n- \1', raw_content, flags=re.DOTALL) + raw_content = re.sub(r'<[uo]l[^>]*>', '\n', raw_content) + raw_content = re.sub(r'', '\n', raw_content) + + # Paragraphs → newlines + raw_content = re.sub(r'', '\n', raw_content) + raw_content = re.sub(r']*>', '\n', raw_content) + raw_content = re.sub(r'

', '\n', raw_content) + + # Code blocks + raw_content = re.sub(r']*>]*>(.*?)', + lambda m: f"\n```\n{m.group(1)}\n```\n", + raw_content, flags=re.DOTALL) + + # Alle verbleibenden Tags entfernen + raw_content = re.sub(r'<[^>]+>', ' ', raw_content) + + # HTML entities + raw_content = raw_content.replace('&', '&').replace('<', '<').replace('>', '>') + raw_content = raw_content.replace(' ', ' ').replace('"', '"').replace(''', "'") + raw_content = raw_content.replace('—', '—').replace('–', '–').replace('…', '…') + + # Whitespace normalisieren + raw_content = re.sub(r'\n{3,}', '\n\n', raw_content) + raw_content = re.sub(r'[ \t]+', ' ', raw_content) + raw_content = re.sub(r'\n ', '\n', raw_content) + clean = raw_content.strip() + + if len(clean.split()) < 200: + return None + + return title, clean + + +def slug_to_topic(slug: str) -> str: + """Wandelt URL-Slug in lesbaren Topic-String.""" + return slug.replace("-", " ").title() + + +def main() -> None: + html_files = sorted(f for f in POSTS_DIR.glob("*.html") + if f.stat().st_size > 10_000 + and f.stem not in SKIP_SLUGS + and re.match(r'^[a-z0-9]', f.stem)) # skip hidden/garbage files + + print(f"Parsing {len(html_files)} HTML files...") + + results = [] + skipped = 0 + seen_slugs: set[str] = set() + + for fpath in html_files: + if fpath.stem in seen_slugs: + print(f" SKIP (duplicate slug): {fpath.name}") + skipped += 1 + continue + seen_slugs.add(fpath.stem) + + html = fpath.read_text(errors="ignore") + extracted = extract_ghost_content(html) + + if extracted is None: + print(f" SKIP (no content): {fpath.name}") + skipped += 1 + continue + + title, clean_text = extracted + word_count = len(clean_text.split()) + + if not title: + title = slug_to_topic(fpath.stem) + + print(f" OK: {word_count:4d}w | {title[:60]}") + + # Input-Text: genau wie beim Generieren — topic + audience + reminder + # Audience basierend auf Slug-Keywords bestimmen + slug = fpath.stem + if any(k in slug for k in ["shieldx", "claude", "papercortex", "llm", "slop", "sync"]): + audience = "developers and infrastructure engineers building AI-powered tools" + elif any(k in slug for k in ["aspa", "bgp", "peercortex", "infrastructure"]): + audience = "network engineers and NOC operators" + else: + audience = "network engineers and IT professionals who evaluate and operate optical infrastructure" + + input_text = ( + f"Write a blog post on the following topic:\n\n" + f"**Topic:** {title}\n\n" + f"**Target audience:** {audience}\n\n" + f"Remember: 700–1000 words, hook + technical sections + 3 takeaways. " + f"Stay strictly on-topic. No filler. Start writing now." + ) + + record = { + "system_prompt": SYSTEM_PROMPT, + "input_text": input_text, + "output_text": clean_text, + "meta": { + "title": title, + "slug": slug, + "source": "blog.fichtmueller.org", + "word_count": word_count, + "quality": "human_written", + "weight": 3.0, # 3x Gewichtung in Training — Gold Standard + "dataset_version": "v8", + }, + } + results.append(record) + + OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True) + with open(OUTPUT_FILE, "w", encoding="utf-8") as f: + for r in results: + f.write(json.dumps(r, ensure_ascii=False) + "\n") + + print(f"\nGespeichert: {len(results)} Posts → {OUTPUT_FILE}") + print(f"Übersprungen: {skipped}") + + # Statistik + wcs = [r["meta"]["word_count"] for r in results] + if wcs: + print(f"Word count: min={min(wcs)}, max={max(wcs)}, avg={sum(wcs)//len(wcs)}") + + +if __name__ == "__main__": + main() diff --git a/packages/fine-tuner/scripts/process_v6_blogs.py b/packages/fine-tuner/scripts/process_v6_blogs.py new file mode 100644 index 0000000..f0c0ae8 --- /dev/null +++ b/packages/fine-tuner/scripts/process_v6_blogs.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python3 +""" +process_v6_blogs.py — Verarbeitet echte fo-blog-v6 Outputs als v8 Trainingsdaten + +Analysiert 101 v6-generierte Blog-Posts aus /opt/tip/blog-training-data/ +und erstellt daraus: + + 1. SFT records — Posts mit 700-1100w → direkt als Training-Beispiele + 2. DPO pairs — Posts >1100w: + rejected = der zu lange Originalpost + chosen = Claude rewritet ihn als saubere 700-1000w Version + +Dies sind echte Modell-Failures (nicht synthetisch!) — besonders wertvoll für DPO. + +Input: ~/transceiver-training-data/v6-tip-blogs/*.md +Output: + ~/transceiver-training-data/v8-v6blogs-sft.jsonl (gute Posts als SFT) + ~/transceiver-training-data/v8-v6blogs-dpo.jsonl (zu lange Posts als DPO) + +Usage: + python3 scripts/process_v6_blogs.py + python3 scripts/process_v6_blogs.py --max-dpo 30 # nur 30 DPO Pairs + python3 scripts/process_v6_blogs.py --sft-only # nur SFT Records + python3 scripts/process_v6_blogs.py --dry-run # Stats only +""" + +from __future__ import annotations + +import argparse +import json +import re +import subprocess +import time +from pathlib import Path + +BLOGS_DIR = Path.home() / "transceiver-training-data" / "v6-tip-blogs" +SFT_OUTPUT = Path.home() / "transceiver-training-data" / "v8-v6blogs-sft.jsonl" +DPO_OUTPUT = Path.home() / "transceiver-training-data" / "v8-v6blogs-dpo.jsonl" +PROGRESS_FILE = Path.home() / "transceiver-training-data" / "v8-v6blogs-progress.json" + +# Word count ranges +GOOD_MIN = 700 +GOOD_MAX = 1100 +REJECTED_MIN = 1100 # posts above this are "too long" → rejected examples + +CLAUDE_TIMEOUT = 180 + +SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure. + +STRICT CONSTRAINTS — Follow exactly, no exceptions: +- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum. +- STRUCTURE (mandatory, in this order): + 1. HOOK paragraph — 2–3 sentences stating the problem this post addresses + 2. Technical sections — 3–4 H2 sections covering the topic in depth + 3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable +- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift. +- NO REPETITION: Every sentence must add new information. No restating. +- VOICE: Confident, direct. No hedging phrases like "it's worth noting". +- AUDIENCE: Network engineers and IT professionals. Assume technical fluency. +- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms. + +Do not summarize what you are about to write. Start with the hook directly.""" + + +def parse_blog_md(path: Path) -> dict | None: + """Parse a blog markdown file with YAML frontmatter.""" + text = path.read_text(encoding="utf-8", errors="ignore") + + # Extract YAML frontmatter + frontmatter: dict = {} + content = text + fm_match = re.match(r'^---\s*\n(.*?)\n---\s*\n', text, re.DOTALL) + if fm_match: + fm_text = fm_match.group(1) + content = text[fm_match.end():] + # Parse key: "value" pairs + for line in fm_text.split('\n'): + kv = re.match(r'^(\w+):\s*"?(.+?)"?\s*$', line) + if kv: + frontmatter[kv.group(1)] = kv.group(2).strip('"').strip() + + title = frontmatter.get("title", "").strip('"') + if not title: + # Fallback: first H1 + h1 = re.search(r'^# (.+)$', content, re.MULTILINE) + title = h1.group(1).strip() if h1 else path.stem.replace("-", " ").title() + + slug = frontmatter.get("slug", path.stem) + category = frontmatter.get("category", "optical networking") + word_count = len(content.split()) + + if word_count < 200: + return None + + return { + "title": title, + "slug": slug, + "category": category, + "content": content.strip(), + "word_count": word_count, + "path": str(path), + } + + +def build_input_text(title: str, audience: str) -> str: + return ( + f"Write a blog post on the following topic:\n\n" + f"**Topic:** {title}\n\n" + f"**Target audience:** {audience}\n\n" + f"Remember: 700–1000 words, hook + technical sections + 3 takeaways. " + f"Stay strictly on-topic. No filler. Start writing now." + ) + + +def get_audience(category: str) -> str: + cat = category.lower() + if any(k in cat for k in ["fiber", "cabling", "mtp", "mpo", "connector"]): + return "data center engineers and cabling specialists" + elif any(k in cat for k in ["coherent", "dwdm", "zr", "metro"]): + return "network architects and optical engineers designing long-haul links" + elif any(k in cat for k in ["compatible", "procurement", "vendor", "cost", "price"]): + return "network procurement teams and IT managers evaluating transceiver vendors" + else: + return "network engineers and IT professionals who evaluate and operate optical infrastructure" + + +def rewrite_with_claude(title: str, source_content: str, audience: str) -> str | None: + """Rewrite a too-long v6 blog as a proper 700-1000w version (chosen).""" + # Truncate source to ~800 words for context + words = source_content.split() + if len(words) > 800: + source_content = " ".join(words[:800]) + "\n\n[Source truncated for reference]" + + prompt = ( + f"Rewrite this blog post to be 700-1000 words with the correct structure.\n\n" + f"**Topic:** {title}\n" + f"**Target audience:** {audience}\n\n" + f"**Original post (DO NOT COPY — use only as topic reference, rewrite completely):**\n\n" + f"{source_content}\n\n" + f"Rewrite now. 700-1000 words. Hook + technical sections + 3 takeaways. Start directly." + ) + + try: + result = subprocess.run( + ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", prompt], + capture_output=True, text=True, timeout=CLAUDE_TIMEOUT, + ) + if result.returncode != 0 or not result.stdout.strip(): + return None + output = result.stdout.strip() + word_count = len(output.split()) + if word_count < 400 or word_count > 2000: + return None + return output + except subprocess.TimeoutExpired: + print(f" TIMEOUT: Claude took too long for {title[:50]}") + return None + except Exception as e: + print(f" ERROR: {e}") + return None + + +def load_progress() -> set[str]: + if not PROGRESS_FILE.exists(): + return set() + try: + return set(json.loads(PROGRESS_FILE.read_text()).get("done", [])) + except Exception: + return set() + + +def save_progress(done: set[str]) -> None: + PROGRESS_FILE.write_text(json.dumps({"done": list(done)})) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Process v6 TIP blogs into v8 training data") + parser.add_argument("--max-dpo", type=int, default=None, help="Max DPO pairs to generate") + parser.add_argument("--sft-only", action="store_true", help="Only create SFT records (skip DPO)") + parser.add_argument("--dpo-only", action="store_true", help="Only create DPO pairs (skip SFT)") + parser.add_argument("--dry-run", action="store_true", help="Statistics only, no output") + args = parser.parse_args() + + # Load all blog files + md_files = sorted(BLOGS_DIR.glob("*.md")) + if not md_files: + print(f"No .md files found in {BLOGS_DIR}") + return + + blogs = [] + for path in md_files: + parsed = parse_blog_md(path) + if parsed: + blogs.append(parsed) + + # Categorize + good = [b for b in blogs if GOOD_MIN <= b["word_count"] <= GOOD_MAX] + too_long = [b for b in blogs if b["word_count"] > REJECTED_MIN] + too_short = [b for b in blogs if b["word_count"] < GOOD_MIN] + + print(f"=== v6 TIP Blog Analysis ===") + print(f"Total: {len(blogs)} files") + print(f"Good (SFT): {len(good)} files ({GOOD_MIN}-{GOOD_MAX}w)") + print(f"Too long: {len(too_long)} files (>{REJECTED_MIN}w) → DPO rejected") + print(f"Too short: {len(too_short)} files (<{GOOD_MIN}w) → skip") + print() + + if args.dry_run: + print("Good posts:") + for b in sorted(good, key=lambda x: x["word_count"]): + print(f" {b['word_count']:4d}w | {b['title'][:60]}") + print("\nToo long (top 10):") + for b in sorted(too_long, key=lambda x: x["word_count"], reverse=True)[:10]: + print(f" {b['word_count']:4d}w | {b['title'][:60]}") + return + + done = load_progress() + + # ─── Phase 1: SFT Records from good posts ────────────────────────────────── + if not args.dpo_only: + print("=== Phase 1: SFT Records (good posts) ===") + SFT_OUTPUT.parent.mkdir(parents=True, exist_ok=True) + sft_count = 0 + with open(SFT_OUTPUT, "w", encoding="utf-8") as f: + for blog in good: + audience = get_audience(blog["category"]) + record = { + "system_prompt": SYSTEM_PROMPT, + "input_text": build_input_text(blog["title"], audience), + "output_text": blog["content"], + "meta": { + "title": blog["title"], + "slug": blog["slug"], + "source": "tip-v6-blogs", + "word_count": blog["word_count"], + "quality": "v6_output_good", + "weight": 2.0, # Real model output, good length + "dataset_version": "v8", + }, + } + f.write(json.dumps(record, ensure_ascii=False) + "\n") + sft_count += 1 + print(f" SFT: {blog['word_count']:4d}w | {blog['title'][:60]}") + + print(f"\nSFT saved: {sft_count} records → {SFT_OUTPUT}") + + # ─── Phase 2: DPO Pairs from too-long posts ──────────────────────────────── + if not args.sft_only: + print("\n=== Phase 2: DPO Pairs (too-long posts → rewrite) ===") + DPO_OUTPUT.parent.mkdir(parents=True, exist_ok=True) + + candidates = too_long + if args.max_dpo: + candidates = candidates[:args.max_dpo] + + dpo_saved = 0 + dpo_skipped = 0 + + with open(DPO_OUTPUT, "a", encoding="utf-8") as f: + for i, blog in enumerate(candidates): + slug = blog["slug"] + if slug in done: + dpo_skipped += 1 + continue + + print(f" [{i+1}/{len(candidates)}] {blog['word_count']:4d}w → rewrite: {blog['title'][:55]}") + audience = get_audience(blog["category"]) + + # Get Claude to write a GOOD version → chosen + chosen = rewrite_with_claude(blog["title"], blog["content"], audience) + + if not chosen: + print(f" SKIP (Claude failed)") + done.add(slug) + dpo_skipped += 1 + continue + + chosen_wc = len(chosen.split()) + print(f" OK: chosen={chosen_wc}w (was {blog['word_count']}w)") + + # Build DPO prompt (ChatML prefix) + prompt = ( + f"<|im_start|>system\n{SYSTEM_PROMPT}<|im_end|>\n" + f"<|im_start|>user\n{build_input_text(blog['title'], audience)}<|im_end|>\n" + ) + + pair = { + "prompt": prompt, + "chosen": chosen, + "rejected": blog["content"], + "meta": { + "title": blog["title"], + "slug": slug, + "source": "tip-v6-dpo", + "rejection_strategy": "too_long_real", + "chosen_words": chosen_wc, + "rejected_words": blog["word_count"], + "dataset_version": "v8", + }, + } + f.write(json.dumps(pair, ensure_ascii=False) + "\n") + f.flush() + done.add(slug) + save_progress(done) + dpo_saved += 1 + time.sleep(1) + + print(f"\nDPO saved: {dpo_saved} pairs | skipped: {dpo_skipped} → {DPO_OUTPUT}") + + # ─── Summary ─────────────────────────────────────────────────────────────── + print("\n=== Summary ===") + if SFT_OUTPUT.exists(): + with open(SFT_OUTPUT) as f: + sft_n = sum(1 for _ in f) + print(f"SFT: {sft_n} records → {SFT_OUTPUT}") + if DPO_OUTPUT.exists(): + with open(DPO_OUTPUT) as f: + dpo_n = sum(1 for _ in f) + print(f"DPO: {dpo_n} pairs → {DPO_OUTPUT}") + + +if __name__ == "__main__": + main() diff --git a/packages/fine-tuner/scripts/run_v8_pipeline.sh b/packages/fine-tuner/scripts/run_v8_pipeline.sh new file mode 100755 index 0000000..bb7c409 --- /dev/null +++ b/packages/fine-tuner/scripts/run_v8_pipeline.sh @@ -0,0 +1,178 @@ +#!/usr/bin/env bash +# ═══════════════════════════════════════════════════════════════════════════════ +# run_v8_pipeline.sh — fo-blog-v8 Autopilot Pipeline +# +# Qwen2.5-14B, LoRA r=64, 5 epochs SFT + 2 epochs DPO +# +# Erwartet dass folgende Daten bereit sind: +# ~/transceiver-training-data/v8-real-posts-sft.jsonl (19 real posts) +# ~/transceiver-training-data/v7-generated-sft.jsonl (v7 generated, ≥100) +# ~/transceiver-training-data/v8-v6blogs-sft.jsonl (v6 tip blogs good) +# ~/transceiver-training-data/v8-external-sft.jsonl (crawled external) +# ~/transceiver-training-data/v7-dpo-pairs.jsonl (v7 DPO) +# ~/transceiver-training-data/v8-v6blogs-dpo.jsonl (real v6 failures) +# +# Usage: +# bash scripts/run_v8_pipeline.sh # full auto +# bash scripts/run_v8_pipeline.sh --wait-crawl # wait for crawler first +# bash scripts/run_v8_pipeline.sh --phase-from merge # skip training +# bash scripts/run_v8_pipeline.sh --phase-from dpo # skip SFT, do DPO + merge +# ═══════════════════════════════════════════════════════════════════════════════ + +set -euo pipefail + +FINE_TUNER_DIR="$(cd "$(dirname "$0")/.." && pwd)" +PYTHON="/opt/homebrew/bin/python3.13" +SCRIPTS="$FINE_TUNER_DIR/scripts" +DATA_DIR="$HOME/transceiver-training-data" +LOG_DIR="/tmp/v8-pipeline" +TIMESTAMP=$(date +%Y%m%d-%H%M%S) + +mkdir -p "$LOG_DIR" + +# ─── Colors ─────────────────────────────────────────────────────────────────── +GREEN='\033[0;32m'; YELLOW='\033[1;33m'; RED='\033[0;31m'; NC='\033[0m'; BOLD='\033[1m' +log() { echo -e "${GREEN}[$(date +%H:%M:%S)]${NC} $*"; } +warn() { echo -e "${YELLOW}[$(date +%H:%M:%S)] ⚠${NC} $*"; } +err() { echo -e "${RED}[$(date +%H:%M:%S)] ✗${NC} $*"; } +step() { echo -e "\n${BOLD}${GREEN}══ $* ══${NC}"; } + +# ─── Args ───────────────────────────────────────────────────────────────────── +WAIT_CRAWL=false +PHASE_FROM="consolidate" # consolidate | sft | dpo | merge + +for arg in "$@"; do + case "$arg" in + --wait-crawl) WAIT_CRAWL=true ;; + --phase-from=*) PHASE_FROM="${arg#*=}" ;; + --phase-from) shift; PHASE_FROM="$1" ;; + esac +done + +# ─── Step 0: Wait for external crawler ──────────────────────────────────────── +if [[ "$WAIT_CRAWL" == "true" ]]; then + step "Warte auf v8 External Crawler (crawl_v8_sources.py)" + while pgrep -f "crawl_v8_sources.py" > /dev/null 2>&1; do + EXT_COUNT=$(wc -l < "$DATA_DIR/v8-external-sft.jsonl" 2>/dev/null || echo 0) + log " Crawler läuft noch... $EXT_COUNT externe Artikel bisher" + sleep 120 + done + EXT_COUNT=$(wc -l < "$DATA_DIR/v8-external-sft.jsonl" 2>/dev/null || echo 0) + log "✓ Crawler fertig: $EXT_COUNT externe Artikel → $DATA_DIR/v8-external-sft.jsonl" + + # Also wait for v6 DPO generation + while pgrep -f "process_v6_blogs.py" > /dev/null 2>&1; do + DPO_COUNT=$(wc -l < "$DATA_DIR/v8-v6blogs-dpo.jsonl" 2>/dev/null || echo 0) + log " v6 DPO Generator läuft... $DPO_COUNT Pairs bisher" + sleep 120 + done + DPO_V6_COUNT=$(wc -l < "$DATA_DIR/v8-v6blogs-dpo.jsonl" 2>/dev/null || echo 0) + log "✓ v6 DPO fertig: $DPO_V6_COUNT Pairs" +fi + +# ─── Step 1: Datenlage prüfen ───────────────────────────────────────────────── +step "Datenlage prüfen" +cd "$FINE_TUNER_DIR" +$PYTHON "$SCRIPTS/consolidate_v8_dataset.py" --stats-only + +REAL_COUNT=$(wc -l < "$DATA_DIR/v8-real-posts-sft.jsonl" 2>/dev/null || echo 0) +V7GEN_COUNT=$(wc -l < "$DATA_DIR/v7-generated-sft.jsonl" 2>/dev/null || echo 0) +V6BLOG_COUNT=$(wc -l < "$DATA_DIR/v8-v6blogs-sft.jsonl" 2>/dev/null || echo 0) +EXT_COUNT=$(wc -l < "$DATA_DIR/v8-external-sft.jsonl" 2>/dev/null || echo 0) + +log "SFT Quellen:" +log " Real posts (Gold ×3): $REAL_COUNT" +log " v7 Generated (×1): $V7GEN_COUNT" +log " v6 TIP Blogs (×2): $V6BLOG_COUNT" +log " External crawled (×1.5): $EXT_COUNT" + +TOTAL_EST=$(( REAL_COUNT*3 + V7GEN_COUNT + V6BLOG_COUNT*2 + EXT_COUNT*2 )) +log " Geschätzt total effective: $TOTAL_EST" + +if [[ "$TOTAL_EST" -lt 80 ]]; then + err "Zu wenig Daten ($TOTAL_EST effective) — mindestens 80 nötig!" + err "Warte auf v7-generation oder crawl_v8_sources.py" + exit 1 +fi + +# ─── Step 2: Dataset konsolidieren ──────────────────────────────────────────── +if [[ "$PHASE_FROM" == "consolidate" || "$PHASE_FROM" == "sft" || "$PHASE_FROM" == "dpo" || "$PHASE_FROM" == "merge" ]]; then + if [[ "$PHASE_FROM" == "consolidate" ]]; then + step "Phase 0: Dataset Konsolidierung" + CONS_LOG="$LOG_DIR/consolidate-$TIMESTAMP.log" + log "Starte consolidate_v8_dataset.py..." + $PYTHON "$SCRIPTS/consolidate_v8_dataset.py" 2>&1 | tee "$CONS_LOG" + SFT_MERGED=$(wc -l < "$DATA_DIR/v8-sft-merged.jsonl" 2>/dev/null || echo 0) + DPO_MERGED=$(wc -l < "$DATA_DIR/v8-dpo-merged.jsonl" 2>/dev/null || echo 0) + log "✓ Merged: $SFT_MERGED SFT + $DPO_MERGED DPO" + else + log "Phase: $PHASE_FROM — Konsolidierung übersprungen" + if [[ ! -f "$DATA_DIR/v8-sft-merged.jsonl" ]]; then + warn "v8-sft-merged.jsonl fehlt — erstelle schnell..." + $PYTHON "$SCRIPTS/consolidate_v8_dataset.py" + fi + fi +fi + +# ─── Step 3: SFT Training ───────────────────────────────────────────────────── +if [[ "$PHASE_FROM" == "consolidate" || "$PHASE_FROM" == "sft" ]]; then + step "Phase 1: SFT Training (Qwen2.5-14B, LoRA r=64, 5 Epochs)" + SFT_LOG="$LOG_DIR/sft-$TIMESTAMP.log" + log "Starte train_blog_v8.py --phase sft..." + log "Log: $SFT_LOG" + log "Estimated: ~10-14 Stunden (run overnight!)" + $PYTHON "$SCRIPTS/train_blog_v8.py" --phase sft 2>&1 | tee "$SFT_LOG" + + ADAPTER="$FINE_TUNER_DIR/adapters/fo-blog-v8/adapter" + if [[ ! -d "$ADAPTER" ]]; then + err "SFT Adapter nicht gefunden: $ADAPTER" + exit 1 + fi + log "✓ SFT Adapter: $ADAPTER" +fi + +# ─── Step 4: DPO Training ───────────────────────────────────────────────────── +if [[ "$PHASE_FROM" == "consolidate" || "$PHASE_FROM" == "sft" || "$PHASE_FROM" == "dpo" ]]; then + step "Phase 2: DPO Training (2 Epochs)" + DPO_LOG="$LOG_DIR/dpo-$TIMESTAMP.log" + + DPO_FILE="$DATA_DIR/v8-dpo-merged.jsonl" + if [[ ! -f "$DPO_FILE" ]]; then + warn "DPO File fehlt — überspringe DPO Phase" + else + DPO_COUNT=$(wc -l < "$DPO_FILE") + log "DPO Pairs: $DPO_COUNT" + log "Starte train_blog_v8.py --phase dpo..." + $PYTHON "$SCRIPTS/train_blog_v8.py" --phase dpo 2>&1 | tee "$DPO_LOG" + log "✓ DPO Training abgeschlossen" + fi +fi + +# ─── Step 5: Merge + GGUF + Ollama ─────────────────────────────────────────── +step "Phase 3: Merge + GGUF + Ollama Registrierung" +CONV_LOG="$LOG_DIR/convert-$TIMESTAMP.log" +log "Starte train_blog_v8.py --phase convert..." +$PYTHON "$SCRIPTS/train_blog_v8.py" --phase convert 2>&1 | tee "$CONV_LOG" +log "✓ fo-blog-v8 in Ollama registriert" + +# ─── Abschluss ──────────────────────────────────────────────────────────────── +step "v8 Pipeline ABGESCHLOSSEN" +echo "" +log "fo-blog-v8 ist bereit:" +log " Ollama: ollama run fo-blog-v8" +log " API: OLLAMA_LLM_MODEL=fo-blog-v8" +echo "" +log "Auf Erik deployen:" +log " 1. GGUF rsync: rsync -avz models/fo-blog-v8/fo-blog-v8.gguf root@erik:/opt/ollama-models/" +log " 2. Ollama: ssh erik 'ollama create fo-blog-v8 -f /opt/tip/Modelfile-v8'" +log " 3. TIP: ecosystem.config.js → OLLAMA_LLM_MODEL=fo-blog-v8" +log " 4. Restart: ssh erik 'cd /opt/tip && pm2 restart ecosystem.config.js --update-env'" +echo "" +log "Logs: $LOG_DIR/" +echo "" +log "v8 vs v7 Verbesserungen:" +log " - 14B statt 7B (4× Parameter)" +log " - Echte Blog-Posts ×3 gewichtet" +log " - Echte Modell-Failures als DPO (v6 too-long posts)" +log " - Externe Quellen: APNIC, RIPE Labs, potaroo.net, Cloudflare" +log " - 5 SFT + 2 DPO Epochs (war 4 + 1)" diff --git a/packages/fine-tuner/scripts/train_blog_v8.py b/packages/fine-tuner/scripts/train_blog_v8.py new file mode 100644 index 0000000..dbe4fa3 --- /dev/null +++ b/packages/fine-tuner/scripts/train_blog_v8.py @@ -0,0 +1,409 @@ +#!/usr/bin/env python3 +""" +train_blog_v8.py — fo-blog-v8 Training (Qwen2.5-14B, MPS LoRA) + +Phase 1: SFT (5 epochs, LoRA r=64, from merged v8 dataset) +Phase 2: DPO (2 epochs, from SFT adapter) + +Usage: + python3 scripts/train_blog_v8.py --phase sft + python3 scripts/train_blog_v8.py --phase dpo + python3 scripts/train_blog_v8.py --phase both # SFT then DPO sequentially + +Hardware: Apple Silicon M4 Max (48GB), MPS backend +Estimated: SFT ~10-14h, DPO ~3-5h (run overnight) +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path + +# ─── Paths ──────────────────────────────────────────────────────────────────── +FINE_TUNER_DIR = Path(__file__).parent.parent +DATA_DIR = Path.home() / "transceiver-training-data" +SFT_DATA = DATA_DIR / "v8-sft-merged.jsonl" +DPO_DATA = DATA_DIR / "v8-dpo-merged.jsonl" +SFT_ADAPTER = FINE_TUNER_DIR / "adapters" / "fo-blog-v8" / "adapter" +DPO_ADAPTER = FINE_TUNER_DIR / "adapters" / "fo-blog-v8-dpo" / "adapter" +MERGED_DIR = FINE_TUNER_DIR / "models" / "fo-blog-v8" / "merged" + +BASE_MODEL = "Qwen/Qwen2.5-14B-Instruct" + +SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure. + +STRICT CONSTRAINTS — Follow exactly, no exceptions: +- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum. +- STRUCTURE (mandatory, in this order): + 1. HOOK paragraph — 2–3 sentences stating the problem this post addresses + 2. Technical sections — 3–4 H2 sections covering the topic in depth + 3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable +- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift. +- NO REPETITION: Every sentence must add new information. No restating. +- VOICE: Confident, direct. No hedging phrases like "it's worth noting". +- AUDIENCE: Network engineers and IT professionals. Assume technical fluency. +- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms. + +Do not summarize what you are about to write. Start with the hook directly.""" + + +def build_chatml(system: str, user: str, assistant: str) -> str: + """Build ChatML-formatted training string.""" + return ( + f"<|im_start|>system\n{system}<|im_end|>\n" + f"<|im_start|>user\n{user}<|im_end|>\n" + f"<|im_start|>assistant\n{assistant}<|im_end|>" + ) + + +def load_sft_dataset(tokenizer, max_seq_length: int = 4096): + """Load + tokenize SFT dataset from v8-sft-merged.jsonl.""" + from datasets import Dataset + + if not SFT_DATA.exists(): + raise FileNotFoundError( + f"SFT data not found: {SFT_DATA}\n" + "Run: python3 scripts/consolidate_v8_dataset.py" + ) + + records = [] + with open(SFT_DATA, encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + item = json.loads(line) + system = item.get("system_prompt", SYSTEM_PROMPT) + user = item.get("input_text", "") + assistant = item.get("output_text", "") + if user and assistant: + text = build_chatml(system, user, assistant) + records.append({"text": text}) + except (json.JSONDecodeError, KeyError): + pass + + print(f"Loaded {len(records)} SFT examples from {SFT_DATA.name}") + return Dataset.from_list(records) + + +def load_dpo_dataset(): + """Load DPO dataset from v8-dpo-merged.jsonl.""" + from datasets import Dataset + + if not DPO_DATA.exists(): + raise FileNotFoundError( + f"DPO data not found: {DPO_DATA}\n" + "Run: python3 scripts/consolidate_v8_dataset.py" + ) + + records = [] + with open(DPO_DATA, encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + item = json.loads(line) + records.append({ + "prompt": item["prompt"], + "chosen": item["chosen"], + "rejected": item["rejected"], + }) + except (json.JSONDecodeError, KeyError): + pass + + print(f"Loaded {len(records)} DPO pairs from {DPO_DATA.name}") + return Dataset.from_list(records) + + +def run_sft() -> None: + """Phase 1: Supervised Fine-Tuning with LoRA.""" + import torch + from peft import LoraConfig, TaskType + from transformers import AutoModelForCausalLM, AutoTokenizer + from trl import SFTTrainer, SFTConfig + + print(f"=== fo-blog-v8 SFT: {BASE_MODEL} → LoRA r=64 ===") + print(f"Device: {'MPS' if torch.backends.mps.is_available() else 'CPU'}") + + # ── Tokenizer ── + print("Loading tokenizer...") + tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, trust_remote_code=True) + tokenizer.pad_token = tokenizer.eos_token + tokenizer.padding_side = "right" + + # ── Dataset ── + dataset = load_sft_dataset(tokenizer, max_seq_length=4096) + + # ── Model ── + print(f"Loading base model: {BASE_MODEL}") + device = "mps" if torch.backends.mps.is_available() else "cpu" + model = AutoModelForCausalLM.from_pretrained( + BASE_MODEL, + dtype=torch.bfloat16, # bf16 for M4 Max (transformers 5.x: dtype= not torch_dtype=) + device_map=device, + trust_remote_code=True, + ) + model.config.use_cache = False + + # ── LoRA Config ── + lora_config = LoraConfig( + r=64, + lora_alpha=128, + lora_dropout=0.05, + target_modules=["q_proj", "k_proj", "v_proj", "o_proj", + "gate_proj", "up_proj", "down_proj"], + bias="none", + task_type=TaskType.CAUSAL_LM, + ) + + # ── Training Config (trl 1.x: SFTConfig carries both TrainingArguments + SFT params) ── + SFT_ADAPTER.mkdir(parents=True, exist_ok=True) + training_args = SFTConfig( + output_dir=str(SFT_ADAPTER), + num_train_epochs=5, + per_device_train_batch_size=1, + gradient_accumulation_steps=8, + learning_rate=1.2e-4, + warmup_ratio=0.05, + lr_scheduler_type="cosine", + bf16=True, + fp16=False, + optim="adamw_torch", + weight_decay=0.01, + max_grad_norm=1.0, + logging_steps=10, + save_steps=100, + save_total_limit=2, + eval_strategy="no", + dataloader_num_workers=0, + remove_unused_columns=False, + gradient_checkpointing=True, + report_to="none", + # SFT-specific (moved from SFTTrainer in trl 1.x) + dataset_text_field="text", + max_seq_length=4096, + packing=False, + ) + + # ── Trainer ── + trainer = SFTTrainer( + model=model, + train_dataset=dataset, + peft_config=lora_config, + processing_class=tokenizer, + args=training_args, + ) + + print(f"Starting SFT training: {len(dataset)} examples, 5 epochs...") + trainer.train() + + print(f"Saving SFT adapter → {SFT_ADAPTER}") + trainer.save_model(str(SFT_ADAPTER)) + tokenizer.save_pretrained(str(SFT_ADAPTER)) + print("SFT Phase COMPLETE.") + + +def run_dpo() -> None: + """Phase 2: Direct Preference Optimization.""" + import torch + from peft import PeftModel + from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments + from trl import DPOTrainer, DPOConfig + + print(f"=== fo-blog-v8 DPO: SFT adapter → DPO ===") + + if not SFT_ADAPTER.exists(): + raise FileNotFoundError( + f"SFT adapter not found at {SFT_ADAPTER}\n" + "Run: python3 scripts/train_blog_v8.py --phase sft" + ) + + # ── Tokenizer ── + tokenizer = AutoTokenizer.from_pretrained(str(SFT_ADAPTER), trust_remote_code=True) + tokenizer.pad_token = tokenizer.eos_token + + # ── Dataset ── + dataset = load_dpo_dataset() + + # ── Model (base + SFT adapter) ── + print(f"Loading model + SFT adapter...") + device = "mps" if __import__("torch").backends.mps.is_available() else "cpu" + base_model = AutoModelForCausalLM.from_pretrained( + BASE_MODEL, + dtype=torch.bfloat16, + device_map=device, + trust_remote_code=True, + ) + model = PeftModel.from_pretrained(base_model, str(SFT_ADAPTER)) + + # ── DPO Config ── + DPO_ADAPTER.mkdir(parents=True, exist_ok=True) + dpo_config = DPOConfig( + output_dir=str(DPO_ADAPTER), + num_train_epochs=2, + per_device_train_batch_size=1, + gradient_accumulation_steps=8, + learning_rate=5e-5, + warmup_ratio=0.05, + lr_scheduler_type="cosine", + bf16=True, + fp16=False, + optim="adamw_torch", + max_grad_norm=1.0, + logging_steps=5, + save_steps=50, + save_total_limit=2, + eval_strategy="no", + dataloader_num_workers=0, + gradient_checkpointing=True, + report_to="none", + # DPO-specific + beta=0.1, + loss_type="sigmoid", + max_prompt_length=512, + max_length=4096, + ) + + # ── Trainer ── + trainer = DPOTrainer( + model=model, + ref_model=None, # use implicit reference via peft + args=dpo_config, + train_dataset=dataset, + processing_class=tokenizer, + ) + + print(f"Starting DPO training: {len(dataset)} pairs, 2 epochs...") + trainer.train() + + print(f"Saving DPO adapter → {DPO_ADAPTER}") + trainer.save_model(str(DPO_ADAPTER)) + tokenizer.save_pretrained(str(DPO_ADAPTER)) + print("DPO Phase COMPLETE.") + + +def run_merge_and_convert() -> None: + """Merge adapter → full model, convert to GGUF, register in Ollama.""" + import subprocess + import shutil + import torch + from peft import PeftModel + from transformers import AutoModelForCausalLM, AutoTokenizer + + # Prefer DPO adapter, fall back to SFT + adapter_path = DPO_ADAPTER if DPO_ADAPTER.exists() else SFT_ADAPTER + if not adapter_path.exists(): + print(f"No adapter found. Run --phase sft first.") + return + + print(f"=== fo-blog-v8 Merge + GGUF ===") + print(f"Adapter: {adapter_path}") + + # ── Merge ── + MERGED_DIR.mkdir(parents=True, exist_ok=True) + safetensors = MERGED_DIR / "model.safetensors" + if safetensors.exists() and safetensors.stat().st_size > 10_000_000_000: + print(f" Already merged ({safetensors.stat().st_size/1e9:.1f} GB) — skip merge") + else: + print(" Loading base model on CPU for merge (avoids MPS OOM)...") + tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, trust_remote_code=True) + model = AutoModelForCausalLM.from_pretrained( + BASE_MODEL, dtype=torch.float16, + device_map="cpu", trust_remote_code=True, + ) + print(" Loading adapter...") + model = PeftModel.from_pretrained(model, str(adapter_path)) + print(" Merging...") + model = model.merge_and_unload() + print(f" Saving merged model → {MERGED_DIR}") + model.save_pretrained(str(MERGED_DIR), safe_serialization=True) + tokenizer.save_pretrained(str(MERGED_DIR)) + del model + print(" Merge done.") + + # ── Copy tokenizer files from HF cache if needed ── + hf_cache = Path.home() / ".cache/huggingface/hub" + snaps = list(hf_cache.glob("models--Qwen--Qwen2.5-14B-Instruct/snapshots/*/tokenizer.json")) + if snaps: + snap_dir = snaps[0].parent + for fname in ["tokenizer.json", "tokenizer_config.json", "vocab.json", "merges.txt"]: + if (snap_dir / fname).exists() and not (MERGED_DIR / fname).exists(): + shutil.copy2(snap_dir / fname, MERGED_DIR / fname) + + # ── GGUF Conversion ── + gguf_dir = FINE_TUNER_DIR / "models" / "fo-blog-v8" + gguf_f16 = gguf_dir / "fo-blog-v8-f16.gguf" + gguf_q4 = gguf_dir / "fo-blog-v8.gguf" + convert_script = "/opt/homebrew/Cellar/llama.cpp/8680/bin/convert_hf_to_gguf.py" + quantize_bin = "/opt/homebrew/bin/llama-quantize" + python_bin = "/opt/homebrew/bin/python3.13" + + if not gguf_f16.exists(): + print(" Converting to GGUF f16...") + subprocess.run( + [python_bin, convert_script, str(MERGED_DIR), + "--outfile", str(gguf_f16), "--outtype", "f16"], + check=True, + ) + else: + print(f" F16 GGUF exists ({gguf_f16.stat().st_size/1e9:.1f} GB) — skip") + + if not gguf_q4.exists(): + print(" Quantizing to Q4_K_M...") + subprocess.run( + [quantize_bin, str(gguf_f16), str(gguf_q4), "Q4_K_M"], + check=True, + ) + gguf_f16.unlink(missing_ok=True) + print(f" Q4_K_M GGUF: {gguf_q4} ({gguf_q4.stat().st_size/1e9:.1f} GB)") + + # ── Ollama Registration ── + modelfile_path = gguf_dir / "Modelfile-v8" + modelfile_content = f"""FROM {gguf_q4.resolve()} + +SYSTEM \"\"\"{SYSTEM_PROMPT}\"\"\" + +PARAMETER temperature 0.7 +PARAMETER top_p 0.9 +PARAMETER top_k 40 +PARAMETER repeat_penalty 1.15 +PARAMETER num_predict 1500 +""" + modelfile_path.write_text(modelfile_content) + print(" Registering in Ollama as fo-blog-v8...") + subprocess.run(["ollama", "create", "fo-blog-v8", "-f", str(modelfile_path)], check=True) + + import subprocess as sp + result = sp.run(["ollama", "list"], capture_output=True, text=True) + registered = "fo-blog-v8" in result.stdout + print(f" Ollama registration: {'✓ SUCCESS' if registered else '✗ FAILED'}") + print(f"\nDONE: {gguf_q4}") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Train fo-blog-v8 (Qwen2.5-14B LoRA)") + parser.add_argument( + "--phase", + choices=["sft", "dpo", "both", "convert"], + default="sft", + help="Training phase to run (default: sft)", + ) + args = parser.parse_args() + + if args.phase in ("sft", "both"): + run_sft() + + if args.phase in ("dpo", "both"): + run_dpo() + + if args.phase == "convert": + run_merge_and_convert() + + +if __name__ == "__main__": + main()