feat: add fo-blog-v8 training pipeline (Qwen2.5-14B, SFT+DPO)

Full v8 training pipeline for the optical networking blog model:
- train_blog_v8.py: SFT (LoRA r=64, 5 epochs) + DPO (2 epochs) on Qwen2.5-14B-Instruct
  Fixed for trl 1.2.x: SFTConfig instead of TrainingArguments, processing_class= instead
  of tokenizer=, eval_strategy= instead of deprecated evaluation_strategy=
- consolidate_v8_dataset.py: weighted merge of all data sources (820 effective SFT / 235 DPO)
- crawl_v8_sources.py: APNIC/RIPE Labs/potaroo/Cloudflare crawler with balanced div extraction
- process_v6_blogs.py: converts 101 real v6 TIP blog outputs into SFT + DPO pairs
- label_v7_quality.py: Claude-judged quality labels → v8 quality DPO pairs
- parse_real_posts.py: parses blog.fichtmueller.org Ghost CMS HTML → gold SFT records
- run_v8_pipeline.sh: autopilot (consolidate → SFT → DPO → GGUF → Ollama)
- blog-v8-training.yaml: training config reference

Dataset breakdown: 19 real posts ×3 + 196 v7-gen + 28 v6blogs ×2 + 135 external ×1.5
This commit is contained in:
Rene Fichtmueller 2026-04-19 11:44:09 +02:00
parent 79d434434f
commit c3ab87b167
8 changed files with 2591 additions and 0 deletions

View File

@ -0,0 +1,159 @@
# ═══════════════════════════════════════════════════════════════════════════════
# blog-v8-training.yaml — fo-blog-v8 Training Configuration
#
# Base: Qwen/Qwen2.5-14B-Instruct (2× the parameter count of v7's 7B)
# Target: 700-1000w blog posts, optical networking + BGP + infra
#
# Key improvements over v7:
# - 14B params → better instruction following at higher complexity
# - LoRA r=64 (was r=32) → more expressive adapter
# - Weighted datasets: human posts × 3.0, external rewritten × 1.5
# - More epochs (5 SFT, 2 DPO) → deeper style absorption
# - max_seq_length=4096 → handles longer real posts
# - DPO from real v7 quality labels (good/bad scored posts)
# ═══════════════════════════════════════════════════════════════════════════════
base_model: "Qwen/Qwen2.5-14B-Instruct"

# ─── Dataset Sources (merged by consolidate_v8_dataset.py) ────────────────────
# SFT sources. `weight` is the per-source weighting factor applied on merge.
# NOTE(review): consolidate_v8_dataset.py carries its own hard-coded source
# tables (which also list v8-v6blogs-sft.jsonl and two legacy files); keep this
# reference in sync with them.
datasets:
  # Tier 1: Rene's actual blog posts — Gold Standard
  - path: "~/transceiver-training-data/v8-real-posts-sft.jsonl"
    weight: 3.0
    description: "19 real blog posts from blog.fichtmueller.org (human written)"
  # Tier 2: v7 generated blogs (Claude-written, 197 topics, validated)
  - path: "~/transceiver-training-data/v7-generated-sft.jsonl"
    weight: 1.0
    description: "Claude-generated optical networking blogs (v7, 197 topics)"
  # Tier 2: RIPE / APNIC NAS data
  - path: "~/transceiver-training-data/v7-ripe-apnic-sft.jsonl"
    weight: 1.0
    description: "RIPE/APNIC BGP and routing content (v7 ingested)"
  # Tier 3: External crawled + rewritten content
  - path: "~/transceiver-training-data/v8-external-sft.jsonl"
    weight: 1.5
    description: "APNIC Blog / RIPE Labs / potaroo.net / Cloudflare (Claude rewritten)"

# DPO preferences: chosen/rejected pairs for preference learning.
# Kept under its own key: a second top-level `dpo` mapping (the DPO phase
# settings) exists further down, and a duplicated key would be silently
# overwritten by most YAML loaders.
dpo_datasets:
  - path: "~/transceiver-training-data/v7-dpo-pairs.jsonl"
    description: "v7 DPO pairs (5 rejection strategies)"
  - path: "~/transceiver-training-data/v8-quality-dpo.jsonl"
    description: "v8 real quality labels (good/bad from v7 generated posts)"
    optional: true  # generated by label_v7_quality.py if run
# ─── SFT Phase ────────────────────────────────────────────────────────────────
sft:
  # Where the LoRA adapter and the merged full model are written.
  output_dir: "adapters/fo-blog-v8/adapter"
  merged_dir: "models/fo-blog-v8/merged"

  # LoRA parameters
  lora:
    r: 64  # was 32 in v7 — more expressive
    alpha: 128  # 2× r for stable training
    dropout: 0.05
    target_modules:
      - "q_proj"
      - "k_proj"
      - "v_proj"
      - "o_proj"
      - "gate_proj"
      - "up_proj"
      - "down_proj"

  # Training hyperparameters
  training:
    num_train_epochs: 5  # was 4 — extra epoch for 14B
    per_device_train_batch_size: 1
    gradient_accumulation_steps: 8  # effective batch = 8
    learning_rate: 1.2e-4  # slightly lower than v7's 1.5e-4 for 14B stability
    warmup_ratio: 0.05
    lr_scheduler_type: "cosine"
    # NOTE(review): newer TRL releases may expect `max_length` here — confirm
    # against the SFTConfig version used by train_blog_v8.py.
    max_seq_length: 4096  # was 2048 — handles longer real posts
    fp16: false
    bf16: true  # M4 Max supports bf16
    optim: "adamw_torch"
    weight_decay: 0.01
    max_grad_norm: 1.0
    logging_steps: 10
    save_steps: 100
    # `eval_strategy` is the current argument name; the trainer script was
    # moved off the deprecated `evaluation_strategy` spelling in this commit.
    eval_strategy: "no"
    dataloader_num_workers: 0  # MPS: no multiprocessing
    remove_unused_columns: false
    gradient_checkpointing: true  # save RAM on 14B

  # Chat template (Qwen2.5 uses ChatML)
  chat_template: "chatml"
  dataset_text_field: "text"
# ─── DPO Phase ────────────────────────────────────────────────────────────────
dpo:
  input_adapter: "adapters/fo-blog-v8/adapter"  # start from SFT
  output_dir: "adapters/fo-blog-v8-dpo/adapter"

  training:
    num_train_epochs: 2  # was 1 — more DPO for 14B
    per_device_train_batch_size: 1
    gradient_accumulation_steps: 8
    # Written with a decimal point on purpose: YAML 1.1 loaders (e.g. PyYAML)
    # only recognise exponent floats that contain a ".", so a bare `5e-5`
    # would be loaded as the string "5e-5" instead of a number.
    learning_rate: 5.0e-5
    warmup_ratio: 0.05
    lr_scheduler_type: "cosine"
    max_seq_length: 4096
    bf16: true
    optim: "adamw_torch"
    logging_steps: 5
    save_steps: 50
    dataloader_num_workers: 0
    gradient_checkpointing: true

  dpo_params:
    beta: 0.1  # KL penalty (standard)
    loss_type: "sigmoid"  # standard DPO loss
    max_prompt_length: 512
    max_length: 4096
# ─── GGUF Conversion ──────────────────────────────────────────────────────────
gguf:
  output_name: "fo-blog-v8.gguf"
  quantization: "Q4_K_M"
  ollama_model_name: "fo-blog-v8"
  # Use Homebrew's version-independent `opt` prefix rather than a pinned
  # Cellar version directory, so the path survives `brew upgrade llama.cpp`.
  convert_script: "/opt/homebrew/opt/llama.cpp/bin/convert_hf_to_gguf.py"
  quantize_bin: "/opt/homebrew/bin/llama-quantize"

  # Ollama Modelfile system prompt
  modelfile_system: |
    You are an expert technical writer specializing in optical networking and transceiver technology.
    STRICT CONSTRAINTS:
    - LENGTH: 700-1000 words ONLY. Stop at 1000 words maximum.
    - STRUCTURE: 1) Hook paragraph, 2) Technical sections (## headers), 3) Exactly 3 takeaways
    - TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift.
    - NO REPETITION: Every sentence adds new information.
    - VOICE: Confident and direct. No hedging phrases.
    - AUDIENCE: Network engineers and IT professionals.

  # Sampling parameters written into the Ollama Modelfile.
  modelfile_params:
    temperature: 0.7
    top_p: 0.9
    top_k: 40
    repeat_penalty: 1.15
    num_predict: 1500
# ─── Hardware ──────────────────────────────────────────────────────────────────
hardware:
  device: 'mps'  # Apple Silicon M4 Max
  ram_gb: 48
  python: '/opt/homebrew/bin/python3.13'
  # Memory budget: 14B weights at 16-bit precision (bf16, as configured for
  # training) ≈ 28GB, which fits in 48GB together with ~4GB of LoRA overhead.
  # Estimated peak RAM while training: ~36-40GB.
  # The merge step runs on CPU (device_map="cpu") to avoid an MPS
  # out-of-memory failure during save_pretrained.
# ─── Expected Timeline ─────────────────────────────────────────────────────────
# SFT: ~8-12 hours (5 epochs, 14B, MPS)
# DPO: ~2-4 hours (2 epochs, 14B)
# Merge: ~30 min (CPU)
# GGUF: ~15 min
# Total: ~12-16 hours (run overnight)

View File

@ -0,0 +1,394 @@
#!/usr/bin/env python3
"""
consolidate_v8_dataset.py Alle v8 Datenquellen zusammenführen
Merged alle JSONL-Quellen in ein einzelnes Training-Dataset:
Tier 1 (weight 3.0): Renes echte Blog-Posts (Gold Standard)
Tier 2 (weight 1.0): v7-generierte Blogs, RIPE/APNIC Ingest
Tier 3 (weight 1.5): Externe gecrawlte + umgeschriebene Posts
Für SFT-Training: Dupliziert High-Weight Beispiele entsprechend
Für DPO-Training: Merged alle DPO-Pair-Dateien
Output:
~/transceiver-training-data/v8-sft-merged.jsonl (SFT, gewichtet)
~/transceiver-training-data/v8-dpo-merged.jsonl (DPO, alle Pairs)
Usage:
python3 scripts/consolidate_v8_dataset.py
python3 scripts/consolidate_v8_dataset.py --no-weight # flat, no duplication
python3 scripts/consolidate_v8_dataset.py --stats-only # nur Statistiken
"""
from __future__ import annotations
import argparse
import json
import math
import random
from pathlib import Path
from typing import Any
# Directory that holds every input JSONL file and both merged outputs.
DATA_DIR = Path.home() / "transceiver-training-data"
OUTPUT_SFT = DATA_DIR / "v8-sft-merged.jsonl"
OUTPUT_DPO = DATA_DIR / "v8-dpo-merged.jsonl"
# Seed the module-level RNG at import time so sampling and shuffling in this
# script are reproducible from run to run.
random.seed(42)
# ─── SFT Sources ──────────────────────────────────────────────────────────────
# One entry per input file. Keys used by the code in this module:
#   file              file name relative to DATA_DIR
#   weight            weighting factor handed to duplicate_by_weight()
#   tier              label shown in log output and stored in record metadata
#   optional          when true, a missing file is reported as a skip
#   needs_conversion  when true, legacy schemas are mapped to the standard one
# `description` is informational only.
SFT_SOURCES: list[dict[str, Any]] = [
    # Tier 1 — Gold: Rene's real posts (weight × 3)
    {
        "file": "v8-real-posts-sft.jsonl",
        "weight": 3.0,
        "tier": 1,
        "description": "Rene's real blog posts (human written)",
    },
    # Tier 2 — Good: Claude-generated, validated topics
    {
        "file": "v7-generated-sft.jsonl",
        "weight": 1.0,
        "tier": 2,
        "description": "v7 Claude-generated optical/networking blogs",
    },
    {
        "file": "v7-ripe-apnic-sft.jsonl",
        "weight": 1.0,
        "tier": 2,
        "description": "RIPE/APNIC BGP & routing content (v7 ingest)",
    },
    # Tier 2 — Real v6 TIP outputs (good length, real style)
    {
        "file": "v8-v6blogs-sft.jsonl",
        "weight": 2.0,
        "tier": 2,
        "description": "Real fo-blog-v6 outputs (700-1100w, actual optical networking voice)",
        "optional": True,
    },
    # Tier 3 — External: crawled + Claude rewritten
    {
        "file": "v8-external-sft.jsonl",
        "weight": 1.5,
        "tier": 3,
        "description": "External: APNIC Blog / RIPE Labs / potaroo.net / Cloudflare",
        "optional": True,
    },
    # Tier 2 — Legacy: pre-v7 curated datasets (lower priority, check quality)
    {
        "file": "nanog-ripe-labs-content.jsonl",
        "weight": 0.5,
        "tier": 2,
        "description": "NANOG/RIPE Labs curated (pre-v7)",
        "optional": True,
        "needs_conversion": True,  # may use different schema
    },
    {
        "file": "rir-infrastructure-data.jsonl",
        "weight": 0.5,
        "tier": 2,
        "description": "RIR infrastructure data (pre-v7)",
        "optional": True,
        "needs_conversion": True,
    },
]
# ─── DPO Sources ──────────────────────────────────────────────────────────────
# Same layout as SFT_SOURCES, without weights or tiers: DPO pairs are merged
# as-is.
DPO_SOURCES: list[dict[str, Any]] = [
    {
        "file": "v7-dpo-pairs.jsonl",
        "description": "v7 DPO pairs (5 rejection strategies, synthetic)",
    },
    {
        "file": "v8-v6blogs-dpo.jsonl",
        "description": "Real v6 too-long posts as rejected + Claude-rewritten chosen (real failures)",
        "optional": True,
    },
    # NOTE(review): the description below says "human" scoring, while the
    # commit message describes label_v7_quality.py as Claude-judged — confirm
    # which one is accurate.
    {
        "file": "v8-quality-dpo.jsonl",
        "description": "v8 real quality labels (human good/bad scoring)",
        "optional": True,
    },
]
# ─── Required SFT fields ──────────────────────────────────────────────────────
# Keys a record must contain to be accepted without legacy conversion.
REQUIRED_SFT_FIELDS = {"system_prompt", "input_text", "output_text"}
# Keys every DPO pair must contain.
REQUIRED_DPO_FIELDS = {"prompt", "chosen", "rejected"}
def load_sft_file(path: Path, needs_conversion: bool = False) -> list[dict]:
    """Load SFT records from a JSONL file.

    Args:
        path: JSONL file to read (UTF-8, one JSON value per line).
        needs_conversion: When true, records that lack the standard fields are
            passed to ``_try_convert_legacy_sft`` instead of being dropped.

    Returns:
        Records in file order. A record is kept when it already contains every
        key in ``REQUIRED_SFT_FIELDS`` or when legacy conversion succeeds.
        Blank lines, unparseable lines and JSON values that are not objects
        are skipped; a one-line summary is printed if any line was unusable.
    """
    records: list[dict] = []
    unusable = 0
    with open(path, encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                unusable += 1
                continue
            # A valid JSON line may still be a list, string or number; only
            # objects can carry the record fields.
            if not isinstance(item, dict):
                unusable += 1
                continue
            if REQUIRED_SFT_FIELDS.issubset(item.keys()):
                records.append(item)
                continue
            if not needs_conversion:
                continue
            # Try to map legacy schemas → standard
            converted = _try_convert_legacy_sft(item)
            if converted:
                records.append(converted)
    if unusable:
        print(f" {path.name}: {unusable} unparseable line(s) skipped")
    return records
def _try_convert_legacy_sft(item: dict) -> dict | None:
"""Try to map legacy JSONL formats to standard SFT schema."""
# Schema: {instruction, input, output}
if "instruction" in item and "output" in item:
return {
"system_prompt": item.get("instruction", ""),
"input_text": item.get("input", ""),
"output_text": item["output"],
"meta": item.get("meta", {}),
}
# Schema: {prompt, completion}
if "prompt" in item and "completion" in item:
return {
"system_prompt": "",
"input_text": item["prompt"],
"output_text": item["completion"],
"meta": item.get("meta", {}),
}
# Schema: {messages: [{role, content}]}
if "messages" in item:
msgs = item["messages"]
sys_msg = next((m["content"] for m in msgs if m.get("role") == "system"), "")
user_msg = next((m["content"] for m in msgs if m.get("role") == "user"), "")
asst_msg = next((m["content"] for m in msgs if m.get("role") == "assistant"), "")
if user_msg and asst_msg:
return {
"system_prompt": sys_msg,
"input_text": user_msg,
"output_text": asst_msg,
"meta": item.get("meta", {}),
}
return None
def validate_sft_record(record: dict) -> bool:
    """Quality gate for a single SFT record.

    A record passes when ``output_text`` is a string of at least 200
    whitespace-separated words and ``input_text`` is a non-blank string.
    Missing, ``None`` or non-string values fail the gate instead of raising.

    Args:
        record: Record in the standard SFT schema.

    Returns:
        True if the record should be kept for training.
    """
    output = record.get("output_text")
    prompt = record.get("input_text")
    if not isinstance(output, str) or not isinstance(prompt, str):
        return False
    if len(output.split()) < 200:
        return False
    return bool(prompt.strip())
def duplicate_by_weight(records: list[dict], weight: float) -> list[dict]:
    """Repeat or subsample records so their share approximates ``weight``.

    The integer part of ``weight`` is the number of full copies of the list;
    the fractional part adds a random sample of ``int(len(records) * fraction)``
    records drawn without replacement. Weights below 1.0 therefore downsample
    the source (e.g. 0.5 keeps half the records) rather than being ignored.

    Args:
        records: Source records. Copies in the result reference the same dict
            objects; nothing is deep-copied.
        weight: Target multiplier. ``1.0`` returns ``records`` unchanged and
            values ``<= 0`` return an empty list.

    Returns:
        The weighted list. Sampling uses the module-level ``random`` state.
    """
    if weight == 1.0 or not records:
        return records
    if weight <= 0:
        return []
    # Integer duplications + sampled remainder
    full_copies = int(math.floor(weight))
    remainder = weight - full_copies
    result = records * full_copies
    # Add fractional copies
    n_extra = int(len(records) * remainder)
    if n_extra > 0:
        result.extend(random.sample(records, min(n_extra, len(records))))
    return result
def merge_sft(apply_weights: bool = True) -> dict[str, int]:
    """Merge all SFT sources into OUTPUT_SFT.

    For every entry in ``SFT_SOURCES`` the file is loaded, filtered through
    ``validate_sft_record``, optionally weighted with ``duplicate_by_weight``
    and tagged with its source file and tier. The combined list is shuffled
    and written as JSONL.

    Args:
        apply_weights: When false, every source is merged once regardless of
            its configured weight.

    Returns:
        Mapping of source file name to the number of valid records found in
        it (before weighting). Missing files are reported and left out.
    """
    stats: dict[str, int] = {}
    all_records: list[dict] = []
    for source in SFT_SOURCES:
        path = DATA_DIR / source["file"]
        if not path.exists():
            if source.get("optional"):
                print(f" SKIP (optional, not found): {source['file']}")
            else:
                print(f" MISSING (required): {source['file']}")
            continue
        records = load_sft_file(path, source.get("needs_conversion", False))
        valid = [r for r in records if validate_sft_record(r)]
        invalid = len(records) - len(valid)
        if invalid:
            print(f" {source['file']}: {len(records)} loaded, {invalid} dropped (quality)")
        weight = source["weight"] if apply_weights else 1.0
        if apply_weights and weight != 1.0:
            weighted = duplicate_by_weight(valid, weight)
            # Show both counts with a separator so "19 → 57" cannot be misread
            # as a single number.
            print(f" Tier {source['tier']} | {source['file']}: {len(valid)} → {len(weighted)} (×{weight})")
        else:
            weighted = valid
            print(f" Tier {source['tier']} | {source['file']}: {len(valid)}")
        # Tag with source metadata
        for r in weighted:
            meta = r.get("meta")
            if not isinstance(meta, dict):
                # Missing, null or otherwise unusable metadata is replaced so
                # that tagging cannot fail on a single odd record.
                meta = {}
                r["meta"] = meta
            # setdefault keeps values a record already carries.
            meta.setdefault("source_file", source["file"])
            meta.setdefault("tier", source["tier"])
        all_records.extend(weighted)
        stats[source["file"]] = len(valid)
    # Shuffle to mix tiers
    random.shuffle(all_records)
    OUTPUT_SFT.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_SFT, "w", encoding="utf-8") as f:
        for r in all_records:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    print(f"\nSFT merged: {len(all_records)} examples → {OUTPUT_SFT}")
    return stats
def merge_dpo() -> dict[str, int]:
    """Merge all DPO sources into OUTPUT_DPO.

    A pair is kept when it is a JSON object containing every key in
    ``REQUIRED_DPO_FIELDS``, ``chosen`` and ``rejected`` are different
    strings, ``chosen`` has more than 50 words and ``rejected`` more than 10.
    Everything else (blank or unparseable lines, non-string answers) is
    skipped. The kept pairs are shuffled and written as JSONL.

    Returns:
        Mapping of source file name to the number of pairs kept from it.
        Missing files are reported and left out.
    """
    stats: dict[str, int] = {}
    all_pairs: list[dict] = []
    for source in DPO_SOURCES:
        path = DATA_DIR / source["file"]
        if not path.exists():
            if source.get("optional"):
                print(f" SKIP (optional): {source['file']}")
            else:
                print(f" MISSING: {source['file']}")
            continue
        pairs = []
        with open(path, encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(item, dict) or not REQUIRED_DPO_FIELDS.issubset(item.keys()):
                    continue
                chosen, rejected = item["chosen"], item["rejected"]
                # Word counting below needs plain strings; other payloads
                # (null, lists of messages, ...) are not usable here.
                if not (isinstance(chosen, str) and isinstance(rejected, str)):
                    continue
                # Require distinct answers with minimum word counts.
                if (chosen != rejected
                        and len(chosen.split()) > 50
                        and len(rejected.split()) > 10):
                    pairs.append(item)
        print(f" {source['file']}: {len(pairs)} valid pairs")
        all_pairs.extend(pairs)
        stats[source["file"]] = len(pairs)
    random.shuffle(all_pairs)
    # Same safeguard as the SFT merge: the data directory may not exist yet.
    OUTPUT_DPO.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_DPO, "w", encoding="utf-8") as f:
        for p in all_pairs:
            f.write(json.dumps(p, ensure_ascii=False) + "\n")
    print(f"\nDPO merged: {len(all_pairs)} pairs → {OUTPUT_DPO}")
    return stats
def print_stats() -> None:
    """Print dataset statistics without merging.

    Reports, per SFT source, the number of non-blank lines, the nominal
    effective count (``int(lines * weight)``) and the average word count;
    per DPO source the number of non-blank lines; and for both merged output
    files their line count and size. The SFT figures are computed from the
    raw files, i.e. before any quality filtering done by the merge step.
    """

    def count_lines(path: Path) -> int:
        # Number of non-blank lines in a UTF-8 text file.
        with open(path, encoding="utf-8") as f:
            return sum(1 for line in f if line.strip())

    print("\n=== v8 Dataset Status ===\n")
    print("SFT Sources:")
    sft_total = 0
    for source in SFT_SOURCES:
        path = DATA_DIR / source["file"]
        if path.exists():
            count = 0
            wc_list = []
            # Single pass: count lines and collect word counts together.
            with open(path, encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue
                    count += 1
                    try:
                        r = json.loads(line)
                        # Prefer the stored word count, fall back to counting.
                        wc = r.get("meta", {}).get("word_count") or len(r.get("output_text", "").split())
                    except (ValueError, AttributeError, TypeError):
                        # Unparseable or oddly shaped line: it still counts as
                        # a line but contributes no word count.
                        continue
                    if wc and isinstance(wc, (int, float)):
                        wc_list.append(wc)
            avg_wc = int(sum(wc_list) / len(wc_list)) if wc_list else 0
            effective = int(count * source["weight"])
            sft_total += effective
            status = f"{count:4d} examples (×{source['weight']} → {effective:4d} effective, avg {avg_wc}w)"
        else:
            status = "✗ NOT FOUND" + (" (optional)" if source.get("optional") else " ⚠ REQUIRED")
        print(f" [{source['tier']}] {source['file']}: {status}")
    print(f"\n Total effective SFT: {sft_total} examples")
    print("\nDPO Sources:")
    dpo_total = 0
    for source in DPO_SOURCES:
        path = DATA_DIR / source["file"]
        if path.exists():
            count = count_lines(path)
            dpo_total += count
            print(f"{source['file']}: {count} pairs")
        else:
            optional = " (optional)" if source.get("optional") else " ⚠ REQUIRED"
            print(f"{source['file']}: NOT FOUND{optional}")
    print(f"\n Total DPO: {dpo_total} pairs")
    # Merged files
    print("\nMerged Outputs:")
    for out in [OUTPUT_SFT, OUTPUT_DPO]:
        if out.exists():
            count = count_lines(out)
            size_mb = out.stat().st_size / 1_048_576
            print(f"{out.name}: {count} lines ({size_mb:.1f} MB)")
        else:
            print(f"{out.name}: not yet generated")
def main() -> None:
    """Command-line entry point: show statistics and run the requested merges."""
    cli = argparse.ArgumentParser(description="Merge v8 training datasets")
    switches = (
        ("--no-weight", "Flat merge without duplication by weight"),
        ("--stats-only", "Print statistics only, do not merge"),
        ("--sft-only", "Only merge SFT"),
        ("--dpo-only", "Only merge DPO"),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)
    opts = cli.parse_args()

    # The current status is always shown first; --stats-only stops right there.
    print_stats()
    if opts.stats_only:
        return
    print()

    run_sft = not opts.dpo_only
    run_dpo = not opts.sft_only
    if run_sft:
        print("=== Merging SFT ===")
        merge_sft(apply_weights=not opts.no_weight)
    if run_dpo:
        print("\n=== Merging DPO ===")
        merge_dpo()
    print("\n=== Final Stats ===")
    print_stats()


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,607 @@
#!/usr/bin/env python3
"""
crawl_v8_sources.py Crawlt externe Quellen für v8 Trainingsdaten
Quellen (priorisiert nach technischer Tiefe):
1. APNIC Blog https://blog.apnic.net/feed/ (400-500 Posts)
2. RIPE Labs https://labs.ripe.net/feed/blog/ (300-400 Posts)
3. Geoff Huston https://www.potaroo.net/ispcol/ (500 Artikel)
4. Cloudflare Blog /tag/networking + /tag/bgp (30-50 Posts)
5. ARIN Blog https://www.arin.net/blog/ (bonus)
Für jede Quelle:
- Artikel-URL + Titel + Kategorie extrahieren
- Rohtext herunterladen und bereinigen
- Claude CLI rewritet 700-1000w, hook + Sektionen + Takeaways
- Als SFT JSONL speichern (weight: 1.5)
Output: ~/transceiver-training-data/v8-external-sft.jsonl
Usage:
python3 scripts/crawl_v8_sources.py # alle Quellen
python3 scripts/crawl_v8_sources.py --source apnic # nur APNIC
python3 scripts/crawl_v8_sources.py --max 50 # max 50 Artikel
python3 scripts/crawl_v8_sources.py --dry-run # nur URLs anzeigen
"""
from __future__ import annotations
import argparse
import json
import logging
import re
import subprocess
import time
import urllib.request
import urllib.error
from html.parser import HTMLParser
from pathlib import Path
from html import unescape as html_unescape
from typing import NamedTuple
# Timestamped INFO-level logging for the whole crawler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
# Rewritten articles are written here as SFT records.
OUTPUT_FILE = Path.home() / "transceiver-training-data" / "v8-external-sft.jsonl"
# URLs that were already handled, so interrupted runs can resume.
PROGRESS_FILE = Path.home() / "transceiver-training-data" / "v8-crawl-progress.json"
TIMEOUT = 30  # seconds per HTTP request
CLAUDE_TIMEOUT = 180  # seconds per Claude CLI invocation
USER_AGENT = "Mozilla/5.0 (compatible; research-bot/1.0; training-data-collection)"
# System prompt used for the rewrite and stored verbatim in every SFT record.
# Numeric ranges are written with an ASCII hyphen so they cannot collapse into
# a single number (e.g. "700-1000", not "7001000").
SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, BGP routing, and network infrastructure.
STRICT CONSTRAINTS — Follow exactly, no exceptions:
- LENGTH: 700-1000 words. Count carefully. Stop at 1000 words maximum.
- STRUCTURE (mandatory, in this order):
1. HOOK paragraph — 2-3 sentences stating the problem this post addresses
2. Technical sections — 3-4 H2 sections covering the topic in depth
3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable
- TOPIC DISCIPLINE: Write ONLY about the exact topic in the source material. Zero drift.
- NO REPETITION: Every sentence must add new information. No restating.
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.
Do not summarize what you are about to write. Start with the hook directly.
Do not copy from the source — rewrite completely in your own words."""
# Immutable description of one article discovered in a feed or index page:
# its title, absolute URL, the source key it came from and a category label.
Article = NamedTuple(
    "Article",
    [
        ("title", str),
        ("url", str),
        ("source", str),
        ("category", str),
    ],
)
# ─── HTML Utilities ───────────────────────────────────────────────────────────
def fetch_url(url: str, timeout: int = TIMEOUT) -> str | None:
    """Download ``url`` and return the decoded body, or ``None`` on any failure.

    The request carries the crawler's User-Agent. The body is decoded with the
    charset announced in the response headers (UTF-8 when none is given);
    undecodable bytes are dropped. Every error is logged as a warning.
    """
    try:
        request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
        response = urllib.request.urlopen(request, timeout=timeout)
        with response:
            encoding = response.headers.get_content_charset() or "utf-8"
            payload = response.read()
            return payload.decode(encoding, errors="ignore")
    except Exception as exc:
        logger.warning("Fetch failed %s: %s", url, exc)
    return None
def strip_tags(html: str) -> str:
    """Reduce an HTML fragment to a single line of plain text.

    HTML comments and ``<script>``/``<style>`` elements (in any letter case)
    are removed together with their content, remaining tags are replaced by
    spaces, character references are decoded and all runs of whitespace,
    including newlines and non-breaking spaces, collapse to one space.

    Args:
        html: HTML markup or plain text.

    Returns:
        The visible text with surrounding whitespace removed.
    """
    # Comments go first: they may contain ">" and would otherwise be cut in
    # the middle by the generic tag pattern.
    text = re.sub(r'<!--.*?-->', ' ', html, flags=re.DOTALL)
    text = re.sub(r'<script[^>]*>.*?</script\s*>', '', text, flags=re.DOTALL | re.IGNORECASE)
    text = re.sub(r'<style[^>]*>.*?</style\s*>', '', text, flags=re.DOTALL | re.IGNORECASE)
    text = re.sub(r'<[^>]+>', ' ', text)
    text = html_unescape(text)  # handles &amp;, &#8216;, &#x27;, etc.
    text = text.replace('\xa0', ' ')  # non-breaking space
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
def find_balanced_div(html: str, start_pos: int) -> str:
    """Return the inner HTML of the ``<div>`` whose opening tag is at ``start_pos``.

    Nested ``<div>`` elements are tracked so the content ends at the matching
    closing tag rather than at the first ``</div>``.

    Args:
        html: Full document text.
        start_pos: Index of the opening ``<div`` tag.

    Returns:
        Everything between the end of the opening tag and its matching
        ``</div>``; an empty string when the opening tag is not terminated or
        no matching closing tag exists.
    """
    content_start = html.find('>', start_pos)
    if content_start < 0:
        return ""
    content_start += 1
    # One scan over the document: searching from an offset avoids copying the
    # remaining text for every tag that is encountered.
    div_tag = re.compile(r'<div[\s>]|</div>', re.IGNORECASE)
    depth = 0
    for tag in div_tag.finditer(html, content_start):
        if not tag.group(0).startswith('</'):
            depth += 1
        elif depth == 0:
            # Closing tag at nesting level zero belongs to the starting div.
            return html[content_start:tag.start()]
        else:
            depth -= 1
    return ""
def find_content_block(html: str, *class_or_id_patterns: str) -> str:
    """Return the inner HTML of the first suitable ``<div>`` matching a pattern.

    Patterns are tried in the order given. A div qualifies when its
    double-quoted ``class`` or ``id`` attribute contains the pattern as a
    substring and its content has more than 50 whitespace-separated tokens.
    Returns an empty string when nothing qualifies.
    """
    for needle in class_or_id_patterns:
        # Attribute order inside the tag does not matter for this expression.
        opening_tag = re.search(
            rf'<div[^>]+(?:class|id)="[^"]*{re.escape(needle)}[^"]*"[^>]*>',
            html, re.IGNORECASE,
        )
        if opening_tag is None:
            continue
        inner = find_balanced_div(html, opening_tag.start())
        if len(inner.split()) > 50:
            return inner
    return ""
def extract_article_text(html: str, source: str) -> str:
    """Extract the main article text from a page as markdown-like plain text.

    Args:
        html: Full page markup.
        source: Source key (``"potaroo"``, ``"apnic"``, ``"ripe"``,
            ``"cloudflare"``, ``"arin"``); unknown keys use generic selectors.

    Returns:
        For ``potaroo``: all paragraphs longer than 20 words joined by blank
        lines. For other sources: the content block with headings rendered as
        ``#`` lines, list items as ``- `` lines, bold text as ``**…**`` and
        paragraphs on separate lines. Empty string when no usable content
        block is found.
    """
    # Potaroo uses old-school table layout — extract all <p> paragraphs
    if source == "potaroo":
        paragraphs = re.findall(r'<p[^>]*>(.*?)</p>', html, re.DOTALL | re.IGNORECASE)
        # Filter: skip short nav-like paragraphs, keep substantive text
        cleaned = (strip_tags(p) for p in paragraphs)
        return "\n\n".join(text for text in cleaned if len(text.split()) > 20)
    # Source-specific content div identifiers
    source_patterns: dict[str, list[str]] = {
        "apnic": ["entry-content", "article-content", "post-content"],
        "ripe": ["article-body", "entry-content", "ripe-article"],
        "cloudflare": ["post-content", "article-content", "entry-content"],
        "arin": ["entry-content", "post-content", "article-body"],
    }
    patterns = source_patterns.get(source, ["entry-content", "article-content"])
    # Try div-based extraction first (handles nested divs correctly)
    raw = find_content_block(html, *patterns)
    # Fallback: <article> tag
    if not raw:
        m = re.search(r'<article[^>]*>(.*?)</article>', html, re.DOTALL | re.IGNORECASE)
        if m:
            raw = m.group(1)
    # Fallback: <main> tag
    if not raw:
        m = re.search(r'<main[^>]*>(.*?)</main>', html, re.DOTALL | re.IGNORECASE)
        if m:
            raw = m.group(1)
    if not raw or len(raw.split()) < 30:
        return ""
    # strip_tags() collapses every whitespace run, newlines included, so
    # intended line breaks are carried through it as a NUL placeholder and
    # turned into real newlines afterwards.
    brk = "\x00"
    raw = raw.replace(brk, " ")
    flags = re.DOTALL | re.IGNORECASE
    # Convert to markdown-ish plain text
    for level in (3, 2, 1):
        raw = re.sub(
            rf'<h{level}\b[^>]*>(.*?)</h{level}>',
            lambda m, marks='#' * level: f"{brk}{marks} {strip_tags(m.group(1))}{brk}",
            raw, flags=flags,
        )
    raw = re.sub(r'<li\b[^>]*>(.*?)</li>', lambda m: f"{brk}- {m.group(1)}", raw, flags=flags)
    # \b keeps these patterns from also matching <pre>, <blockquote>, <body>…
    raw = re.sub(r'</?p\b[^>]*>', brk, raw, flags=re.IGNORECASE)
    raw = re.sub(r'<br\s*/?>', brk, raw, flags=re.IGNORECASE)
    raw = re.sub(r'<(strong|b)\b[^>]*>(.*?)</\1>', r'**\2**', raw, flags=flags)
    text = strip_tags(raw).replace(brk, "\n")
    text = re.sub(r'[ \t]*\n[ \t]*', '\n', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r'[ \t]+', ' ', text)
    return text.strip()
# ─── RSS Parser ───────────────────────────────────────────────────────────────
def parse_rss(xml: str, source: str, max_items: int = 200) -> list[Article]:
    """Parse an RSS 2.0 or Atom feed into a list of Articles.

    Args:
        xml: Feed document text.
        source: Source key stored on every returned Article.
        max_items: Maximum number of feed items to inspect.

    Returns:
        Articles with a non-empty title and an absolute http(s) URL, in feed
        order. The category falls back to ``"networking"`` when the item has
        none. Items without title or link are skipped.
    """
    articles = []
    # Try <item> (RSS); attributes on the element are tolerated.
    items = re.findall(r'<item\b[^>]*>(.*?)</item>', xml, re.DOTALL)
    if not items:
        # Try <entry> (Atom)
        items = re.findall(r'<entry\b[^>]*>(.*?)</entry>', xml, re.DOTALL)
    for item in items[:max_items]:
        title_m = re.search(r'<title[^>]*>(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?</title>', item, re.DOTALL)
        link_m = (re.search(r'<link[^>]*/>', item) or
                  re.search(r'<link[^>]*>(.*?)</link>', item, re.DOTALL) or
                  re.search(r'<guid[^>]*>(https?://[^<]+)</guid>', item))
        cat_m = re.search(r'<category[^>]*>(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?</category>', item, re.DOTALL)
        if not title_m or not link_m:
            continue
        title = strip_tags(title_m.group(1)).strip()
        link_raw = link_m.group(0)
        # Atom: URL in the href attribute. RSS: URL as element text, possibly
        # surrounded by whitespace or wrapped in CDATA.
        href_m = re.search(r'href="([^"]+)"', link_raw)
        text_m = re.search(r'>\s*(?:<!\[CDATA\[)?\s*(https?://[^<\]\s]+)', link_raw)
        if href_m:
            url = href_m.group(1)
        elif text_m:
            url = text_m.group(1)
        elif link_m.lastindex:
            url = link_m.group(1)
        else:
            url = ""
        # Feeds are XML, so "&" inside URLs arrives escaped as "&amp;".
        url = html_unescape(url.strip())
        category = strip_tags(cat_m.group(1)).strip() if cat_m else "networking"
        if title and url.startswith("http"):
            articles.append(Article(title=title, url=url, source=source, category=category))
    logger.info("RSS parsed %d articles from %s", len(articles), source)
    return articles
# ─── Source-specific fetchers ─────────────────────────────────────────────────
def fetch_apnic(max_items: int = 200) -> list[Article]:
    """Collect articles from the APNIC blog RSS feed.

    Requests up to five feed pages (``?paged=N``), pausing one second between
    requests. Paging stops when a page cannot be fetched, when enough articles
    were collected, when a page holds fewer than 10 items, or when a page
    contributes no URL that was not already seen.

    Args:
        max_items: Maximum number of articles to return.

    Returns:
        Unique articles in feed order, at most ``max_items``.
    """
    articles: list[Article] = []
    seen: set[str] = set()
    for page in range(1, 6):
        url = f"https://blog.apnic.net/feed/?paged={page}" if page > 1 else "https://blog.apnic.net/feed/"
        xml = fetch_url(url)
        if not xml:
            break
        page_articles = parse_rss(xml, "apnic", max_items)
        added = 0
        for article in page_articles:
            # The same post can appear on more than one page (or on every
            # page, should the paging parameter be ignored by the server).
            if article.url in seen:
                continue
            seen.add(article.url)
            articles.append(article)
            added += 1
        if len(articles) >= max_items or len(page_articles) < 10 or added == 0:
            break
        time.sleep(1)
    return articles[:max_items]
def fetch_ripe_labs(max_items: int = 200) -> list[Article]:
    """Collect RIPE Labs articles from the first candidate feed that yields any.

    Candidate feed URLs are tried in order; a response of 500 characters or
    less is treated as unusable. After each unsuccessful candidate the
    function waits one second. Returns an empty list if no feed works.
    """
    candidates = (
        "https://labs.ripe.net/feed.xml",
        "https://labs.ripe.net/feed/",
        "https://labs.ripe.net/Members/feed/",
    )
    for candidate in candidates:
        payload = fetch_url(candidate)
        usable = bool(payload) and len(payload) > 500
        parsed = parse_rss(payload, "ripe", max_items) if usable else []
        if parsed:
            return parsed[:max_items]
        time.sleep(1)
    return []
def fetch_potaroo(max_items: int = 100) -> list[Article]:
    """Collect article links from the potaroo.net ISPCOL column index.

    The index references articles through relative links of the form
    ``YYYY-MM/slug.html``. Links that carry visible anchor text are preferred;
    if none are found, titles are derived from the link slug instead.
    Duplicate URLs are ignored. Returns an empty list when the index cannot
    be fetched.
    """
    base_url = "https://www.potaroo.net/ispcol"
    html = fetch_url(f"{base_url}/index.html") or fetch_url(f"{base_url}/")
    if not html:
        logger.warning("Could not fetch potaroo.net index")
        return []

    found: list[Article] = []
    seen: set[str] = set()

    # First pass: anchors with link text, e.g. <a href="2026-04/x.html">Title</a>
    titled_link = re.compile(r'href="(\d{4}-\d{2}/[^"]+\.html)"[^>]*>([^<]{5,150})<', re.IGNORECASE)
    for match in titled_link.finditer(html):
        rel_path = match.group(1)
        title = html_unescape(match.group(2).strip()).strip()
        url = f"{base_url}/{rel_path}"
        if url not in seen and len(title) > 5:
            seen.add(url)
            found.append(Article(title=title, url=url, source="potaroo", category="bgp-routing"))
        if len(found) >= max_items:
            break

    # Second pass, only if the first found nothing: links without anchor text.
    if not found:
        bare_link = re.compile(r'href="(\d{4}-\d{2}/[^"]+\.html)"', re.IGNORECASE)
        for match in bare_link.finditer(html):
            rel_path = match.group(1)
            url = f"{base_url}/{rel_path}"
            file_name = rel_path.split("/")[-1]
            slug = file_name.replace(".html", "").replace("-", " ").title()
            if url not in seen:
                seen.add(url)
                found.append(Article(title=f"Geoff Huston: {slug}", url=url, source="potaroo", category="bgp-routing"))
            if len(found) >= max_items:
                break

    logger.info("Potaroo: found %d articles", len(found))
    return found[:max_items]
def fetch_cloudflare(max_items: int = 60) -> list[Article]:
    """Collect Cloudflare blog articles from a fixed set of tag listing pages.

    Each tag page is fetched as HTML and scanned for a root-relative article
    link followed by a heading, which is used as the title. The tag becomes
    the article's category. Duplicate URLs across tags are ignored and the
    function pauses one second between tag pages.

    Args:
        max_items: Maximum number of articles to return.

    Returns:
        Unique articles in discovery order, at most ``max_items``.
    """
    articles: list[Article] = []
    seen: set[str] = set()  # constant-time duplicate check
    tags = ["bgp", "routing", "dns", "tcp", "ddos", "ipv6"]
    for tag in tags:
        # HTML listing page for the tag (scraped, not a JSON API)
        listing_url = f"https://blog.cloudflare.com/tag/{tag}/"
        html = fetch_url(listing_url)
        if not html:
            continue
        # Extract article links from blog listing
        for m in re.finditer(
            r'href="(/[a-z0-9-]{10,80}/)"[^>]*>.*?<h\d[^>]*>([^<]{10,150})</h\d>',
            html, re.DOTALL | re.IGNORECASE
        ):
            url = f"https://blog.cloudflare.com{m.group(1)}"
            title = strip_tags(m.group(2)).strip()
            if title and url not in seen:
                seen.add(url)
                articles.append(Article(title=title, url=url, source="cloudflare", category=tag))
            if len(articles) >= max_items:
                break
        if len(articles) >= max_items:
            break
        time.sleep(1)
    logger.info("Cloudflare: found %d articles", len(articles))
    return articles[:max_items]
def fetch_arin_blog(max_items: int = 50) -> list[Article]:
    """Collect ARIN blog articles from the first candidate feed that yields any.

    Candidate feed URLs are tried in order; a response of 500 characters or
    less is treated as unusable. After each unsuccessful candidate the
    function waits half a second. Returns an empty list if no feed works.
    """
    candidates = (
        "https://www.arin.net/blog/feed.xml",
        "https://www.arin.net/blog/feed/",
        "https://www.arin.net/feed/",
    )
    for candidate in candidates:
        payload = fetch_url(candidate)
        usable = bool(payload) and len(payload) > 500
        parsed = parse_rss(payload, "arin", max_items) if usable else []
        if parsed:
            return parsed[:max_items]
        time.sleep(0.5)
    return []
# ─── Category → audience mapping ─────────────────────────────────────────────
def category_to_audience(category: str, source: str) -> str:
    """Choose the target-audience phrase for a category label.

    The lower-cased category is checked against keyword groups in a fixed
    order and the first group with a keyword contained in it wins. ``source``
    is accepted but does not influence the result.
    """
    # (keywords, audience) pairs; order matters because matching is by substring.
    audience_rules = (
        (("bgp", "routing", "rpki", "aspa", "irr", "peering"),
         "network engineers and NOC operators managing BGP and routing infrastructure"),
        (("dns", "dnssec", "resolver"),
         "network engineers and infrastructure operators managing DNS services"),
        (("security", "ddos", "attack", "vulnerab"),
         "network security engineers and SOC operators"),
        (("ipv6", "ipv4", "address"),
         "network architects and engineers planning IP address strategy"),
        (("datacenter", "optical", "transceiver", "400g"),
         "network engineers and IT professionals who evaluate and operate optical infrastructure"),
    )
    needle_space = category.lower()
    for keywords, audience in audience_rules:
        for keyword in keywords:
            if keyword in needle_space:
                return audience
    return "network engineers and infrastructure operators"
# ─── Claude rewrite ───────────────────────────────────────────────────────────
def rewrite_with_claude(
    title: str,
    raw_text: str,
    audience: str,
    timeout: int = CLAUDE_TIMEOUT,
) -> str | None:
    """Use Claude CLI to rewrite raw article text in our blog format.

    Args:
        title: Article title, used as the topic.
        raw_text: Extracted source text; texts under 100 words are rejected
            and only the first 1500 words are sent.
        audience: Target-audience phrase inserted into the prompt.
        timeout: Seconds to wait for the CLI process.

    Returns:
        The rewritten post, or ``None`` when the source is too short, the CLI
        fails or times out, the output is empty, has fewer than 400 words, or
        starts with ``"I "``.
    """
    words = raw_text.split()
    if len(words) < 100:
        return None
    # Truncate very long source articles (keep first ~1500 words for context)
    if len(words) > 1500:
        raw_text = " ".join(words[:1500]) + "\n\n[Source truncated]"
    prompt = (
        f"Write a blog post on this topic.\n\n"
        f"**Topic:** {title}\n\n"
        f"**Target audience:** {audience}\n\n"
        f"**Source material for reference (DO NOT COPY — rewrite completely):**\n\n"
        f"{raw_text}\n\n"
        # The range is spelled with a hyphen so the length target cannot be
        # read as one seven-digit number.
        f"Remember: 700-1000 words, hook + technical sections + 3 takeaways. "
        f"Stay strictly on-topic. No filler. Start writing now."
    )
    try:
        result = subprocess.run(
            ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", prompt],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        if result.returncode != 0 or not result.stdout.strip():
            logger.warning("Claude returned error: %s", result.stderr[:200])
            return None
        output = result.stdout.strip()
        word_count = len(output.split())
        # Reject outputs that are too short or open in the first person.
        if word_count < 400 or output.startswith("I "):
            return None
        return output
    except subprocess.TimeoutExpired:
        logger.warning("Claude timeout for: %s", title[:50])
        return None
    except Exception as exc:
        logger.warning("Claude error: %s", exc)
        return None
# ─── Main crawler loop ────────────────────────────────────────────────────────
def load_progress() -> set[str]:
    """Return the set of URLs recorded as done in the progress file.

    A missing file, unreadable content or an unexpected structure all yield
    an empty set.
    """
    if PROGRESS_FILE.exists():
        try:
            stored = json.loads(PROGRESS_FILE.read_text())
            return set(stored.get("done", []))
        except Exception:
            pass
    return set()
def save_progress(done_urls: set[str]) -> None:
    """Persist the processed URLs to PROGRESS_FILE as ``{"done": [...]}``.

    The JSON is written to a sibling ``.tmp`` file and then renamed over the
    target, so an interrupted run cannot leave a truncated progress file
    behind. The list is sorted so repeated saves produce stable output.
    """
    payload = json.dumps({"done": sorted(done_urls)}, ensure_ascii=False)
    tmp_file = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp_file.write_text(payload)
    tmp_file.replace(PROGRESS_FILE)
def _rewrite_article(source: str, article: Article) -> dict | None:
    """Fetch, extract and rewrite one article; return its SFT record or None if it is skipped."""
    html = fetch_url(article.url)
    if not html:
        logger.warning(" SKIP (fetch failed)")
        return None

    raw_text = extract_article_text(html, source)
    source_words = len(raw_text.split())
    if source_words < 100:
        logger.warning(" SKIP (too short: %d words)", source_words)
        return None

    audience = category_to_audience(article.category, source)
    rewritten = rewrite_with_claude(article.title, raw_text, audience)
    if not rewritten:
        logger.warning(" SKIP (claude failed)")
        return None

    word_count = len(rewritten.split())
    logger.info(" OK: %dw | %s", word_count, article.title[:50])
    # NOTE(review): "7001000 words" looks like a range that lost its dash - confirm.
    return {
        "system_prompt": SYSTEM_PROMPT,
        "input_text": (
            f"Write a blog post on the following topic:\n\n"
            f"**Topic:** {article.title}\n\n"
            f"**Target audience:** {audience}\n\n"
            f"Remember: 7001000 words, hook + technical sections + 3 takeaways. "
            f"Stay strictly on-topic. No filler. Start writing now."
        ),
        "output_text": rewritten,
        "meta": {
            "title": article.title,
            "source_url": article.url,
            "source": source,
            "category": article.category,
            "word_count": word_count,
            "quality": "external_rewritten",
            "weight": 1.5,
            "dataset_version": "v8",
        },
    }


def crawl_source(
    source: str,
    articles: list[Article],
    done_urls: set[str],
    max_articles: int,
    dry_run: bool,
) -> tuple[int, int]:
    """Crawl and rewrite the articles of one source.

    Args:
        source: Source key, used for logging and stored in each record.
        articles: Candidate articles for this source.
        done_urls: URLs handled in earlier runs; updated in place.
        max_articles: Upper bound on articles attempted in this call
            (URLs already in ``done_urls`` do not count).
        dry_run: When true, only print the URLs that would be processed.

    Returns:
        ``(saved, skipped)`` counts.

    Every attempted URL (saved or skipped) is added to ``done_urls`` and the
    progress file is written right away, so skip decisions survive a run that
    ends without a later success. The 2 s delay follows every attempt,
    including failed fetches.
    """
    saved = 0
    skipped = 0
    count = 0
    OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
    planned = min(len(articles), max_articles)

    for article in articles:
        if count >= max_articles:
            break
        if article.url in done_urls:
            logger.debug("SKIP (done): %s", article.url)
            skipped += 1
            continue

        count += 1
        logger.info("[%s] %d/%d: %s", source.upper(), count, planned, article.title[:60])
        if dry_run:
            print(f" DRY: {article.url}")
            continue

        record = _rewrite_article(source, article)
        if record is None:
            skipped += 1
        else:
            with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
            saved += 1

        done_urls.add(article.url)
        save_progress(done_urls)
        # Rate limiting — be a good citizen (also after failed attempts)
        time.sleep(2)

    return saved, skipped
# ─── Entry point ──────────────────────────────────────────────────────────────
# Registry of crawlable sources: maps each --source choice to the function
# that returns its article list. main() calls that function with the --max value.
SOURCES = {
    "apnic": fetch_apnic,
    "ripe": fetch_ripe_labs,
    "potaroo": fetch_potaroo,
    "cloudflare": fetch_cloudflare,
    # "arin": fetch_arin_blog, # No accessible RSS feed found
}
def main() -> None:
    """CLI entry point: crawl the selected source(s) and append rewritten posts to OUTPUT_FILE.

    Flags:
        --source   one key of SOURCES, or "all" (default).
        --max      maximum number of articles per source (default 100).
        --dry-run  list the URLs without downloading or rewriting.
    """
    parser = argparse.ArgumentParser(description="Crawl external networking blogs for v8 training data")
    parser.add_argument("--source", choices=list(SOURCES.keys()) + ["all"], default="all",
                        help="Which source(s) to crawl (default: all)")
    parser.add_argument("--max", type=int, default=100,
                        help="Max articles per source (default: 100)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show URLs without downloading or rewriting")
    args = parser.parse_args()

    done_urls = load_progress()
    logger.info("Resuming: %d URLs already processed", len(done_urls))

    # argparse `choices` already limits --source to SOURCES keys or "all",
    # so no further filtering is needed.
    active_sources = list(SOURCES) if args.source == "all" else [args.source]

    total_saved = 0
    for source in active_sources:
        logger.info("=== Fetching article list: %s ===", source.upper())
        try:
            articles = SOURCES[source](args.max)
        except Exception as exc:
            # One broken source must not abort the remaining ones.
            logger.error("Failed to fetch %s articles: %s", source, exc)
            continue
        if not articles:
            logger.warning("No articles found for %s", source)
            continue
        saved, skipped = crawl_source(source, articles, done_urls, args.max, args.dry_run)
        logger.info("%s done: saved=%d skipped=%d", source.upper(), saved, skipped)
        total_saved += saved

    if not args.dry_run:
        total_lines = 0
        if OUTPUT_FILE.exists():
            # The file is written as UTF-8 with ensure_ascii=False; read it back
            # the same way instead of relying on the locale encoding.
            with open(OUTPUT_FILE, encoding="utf-8") as f:
                total_lines = sum(1 for _ in f)
        logger.info("=== DONE: total saved=%d | output total=%d lines ===", total_saved, total_lines)
        logger.info("Output: %s", OUTPUT_FILE)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,293 @@
#!/usr/bin/env python3
"""
label_v7_quality.py Bewertet v7-generierte Blogs und erstellt DPO-Labels
Claude liest jeden generierten Blog und bewertet ihn nach 5 Kriterien:
1. Wortanzahl (700-1000 = gut, sonst schlecht)
2. Hook vorhanden (klares Einstiegsproblem)
3. Technische Tiefe (nicht generisch)
4. Struktur (## Headers, 3 Takeaways)
5. Kein Drift (bleibt beim Thema)
Aus gut/schlecht Bewertungen:
- "gut" + "schlecht" vom gleichen Thema DPO-Pair
- Oder: "schlecht" Claude schreibt bessere Version DPO-Pair
Output:
~/transceiver-training-data/v8-quality-dpo.jsonl
Usage:
python3 scripts/label_v7_quality.py
python3 scripts/label_v7_quality.py --input v7-generated-sft.jsonl
python3 scripts/label_v7_quality.py --max 50 --rewrite-bad
"""
from __future__ import annotations
import argparse
import json
import logging
import re
import subprocess
from pathlib import Path
# Root logging setup: INFO level, compact HH:MM:SS timestamps.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
# All inputs and outputs of this script live under ~/transceiver-training-data.
DATA_DIR = Path.home() / "transceiver-training-data"
DEFAULT_INPUT = DATA_DIR / "v7-generated-sft.jsonl"  # default for --input
OUTPUT_FILE = DATA_DIR / "v8-quality-dpo.jsonl"  # DPO pairs are appended here
PROGRESS_FILE = DATA_DIR / "v8-quality-progress.json"  # ids of examples already handled
CLAUDE_TIMEOUT = 120  # seconds, used by judge_blog (rewrite_for_chosen hard-codes 180)
# System prompt passed to the Claude CLI by rewrite_for_chosen() and used as the
# fallback system turn when an example carries no "system_prompt" of its own.
# NOTE(review): "7001000 words", "23 sentences" and "34 H2 sections" read like
# ranges (700-1000, 2-3, 3-4) whose dash is missing, and "STRICT CONSTRAINTS Follow"
# like a lost separator - confirm against the prompt used for training/inference.
SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure.
STRICT CONSTRAINTS Follow exactly, no exceptions:
- LENGTH: 7001000 words. Count carefully. Stop at 1000 words maximum.
- STRUCTURE (mandatory, in this order):
1. HOOK paragraph 23 sentences stating the problem this post addresses
2. Technical sections 34 H2 sections covering the topic in depth
3. PRACTICAL TAKEAWAYS exactly 3 bullet points, actionable
- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift.
- NO REPETITION: Every sentence must add new information. No restating.
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.
Do not summarize what you are about to write. Start with the hook directly."""
# Prompt prefix for judge_blog(): the blog text is appended directly after the
# final line. The reply is expected to be one flat JSON object; judge_blog()
# extracts the first brace-delimited span that contains no nested braces.
JUDGE_PROMPT = """Evaluate this blog post against the following criteria. Respond with ONLY a JSON object — no explanation, no markdown.
Criteria:
1. word_count_ok: true if 700-1000 words (count the words)
2. has_hook: true if first paragraph clearly states a real problem (not generic intro)
3. technical_depth: true if contains specific technical details (numbers, standards, product names)
4. good_structure: true if has ## H2 headers AND ends with 3 bullet point takeaways
5. on_topic: true if stays focused on the exact topic throughout (no generic drift)
6. overall: "good" if at least 4 of 5 criteria pass, otherwise "bad"
7. issues: list of failed criteria names
Response format exactly:
{"word_count_ok": true/false, "has_hook": true/false, "technical_depth": true/false, "good_structure": true/false, "on_topic": true/false, "overall": "good"/"bad", "issues": ["..."]}
Blog post to evaluate:
"""
def load_progress() -> set[str]:
    """Return the example ids recorded as done in PROGRESS_FILE.

    A missing, unreadable or malformed progress file yields an empty set.
    """
    finished: set[str] = set()
    if PROGRESS_FILE.exists():
        try:
            state = json.loads(PROGRESS_FILE.read_text())
            finished = set(state.get("done", []))
        except Exception:
            finished = set()
    return finished
def save_progress(done: set[str]) -> None:
    """Persist the handled example ids to PROGRESS_FILE as ``{"done": [...]}``.

    The JSON is written to a sibling ``.tmp`` file and then renamed over the
    target, so an interrupted run cannot leave a truncated progress file
    behind. The list is sorted so repeated saves produce stable output.
    """
    payload = json.dumps({"done": sorted(done)})
    tmp_file = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp_file.write_text(payload)
    tmp_file.replace(PROGRESS_FILE)
def judge_blog(blog_text: str) -> dict | None:
    """Have the Claude CLI grade one blog post.

    Returns the parsed JSON verdict, or None when the CLI fails or times out,
    prints nothing, or its output holds no parseable flat JSON object.
    """
    command = ["claude", "--print", "-p", JUDGE_PROMPT + blog_text]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=CLAUDE_TIMEOUT,
        )
        if completed.returncode != 0 or not completed.stdout.strip():
            return None
        # The reply may be wrapped in prose or a code fence; take the first
        # brace-delimited span without nested braces.
        found = re.search(r'\{[^{}]+\}', completed.stdout.strip(), re.DOTALL)
        return json.loads(found.group(0)) if found else None
    except Exception:
        # Covers timeouts, JSON decode errors and anything else the CLI call raises.
        return None
def quick_check(blog_text: str) -> dict:
    """Deterministic pre-check that settles obvious cases without a Claude call.

    Returns a dict with:
        word_count    whitespace-separated token count
        has_h2        a line starts with "## "
        has_bullets   at least one line starts with "- ", "* " or "• "
        bullet_count  number of such lines
        clear_bad     under 500 or over 1500 words, or no H2 header
        clear_good    700-1050 words, has H2, at least 3 bullets, and the text
                      does not open with "In this "
        needs_claude  neither clear_bad nor clear_good
    """
    word_count = len(blog_text.split())
    has_h2 = re.search(r'^## ', blog_text, re.MULTILINE) is not None
    # One scan of the bullet pattern feeds both bullet fields.
    bullet_count = len(re.findall(r'^[-*•] ', blog_text, re.MULTILINE))
    has_bullets = bullet_count > 0
    starts_ok = not blog_text.strip().startswith("In this ")

    # Deterministic verdict (no Claude needed for clear failures)
    clear_bad = word_count < 500 or word_count > 1500 or not has_h2
    clear_good = (700 <= word_count <= 1050 and has_h2 and bullet_count >= 3
                  and starts_ok)
    return {
        "word_count": word_count,
        "has_h2": has_h2,
        "has_bullets": has_bullets,
        "bullet_count": bullet_count,
        "clear_bad": clear_bad,
        "clear_good": clear_good,
        "needs_claude": not clear_bad and not clear_good,
    }
def rewrite_for_chosen(title: str, input_text: str) -> str | None:
    """Generate the 'chosen' half of a DPO pair by sending ``input_text`` to Claude.

    ``title`` is accepted but not used in the request. Returns None when the
    CLI fails or times out (180 s), or when the answer is outside 400-1500 words.
    """
    command = ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", input_text]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=180,
        )
        if completed.returncode != 0 or not completed.stdout.strip():
            return None
        candidate = completed.stdout.strip()
        if 400 <= len(candidate.split()) <= 1500:
            return candidate
        return None
    except Exception:
        return None
def process_examples(
    input_file: Path,
    max_items: int | None,
    rewrite_bad: bool,
) -> None:
    """Label every SFT example in ``input_file`` and optionally emit DPO pairs.

    Args:
        input_file: JSONL file with one SFT record per line.
        max_items: Process only the first N parsed records (None or 0 = all).
        rewrite_bad: When true, each post judged "bad" is rewritten by Claude
            and the (rewritten, original) pair is appended to OUTPUT_FILE.

    Each example is first classified by quick_check(); only undecided cases
    are sent to judge_blog(). Handled examples are recorded in the progress
    file under ``<input stem>:<index>`` so a rerun skips them.
    """
    if not input_file.exists():
        logger.error("Input file not found: %s", input_file)
        return

    examples = []
    malformed = 0
    with open(input_file, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                examples.append(json.loads(line))
            except json.JSONDecodeError:
                malformed += 1
    if malformed:
        # Report dropped lines instead of discarding them silently.
        logger.warning("Ignored %d malformed JSONL line(s) in %s", malformed, input_file.name)
    if max_items:
        examples = examples[:max_items]
    logger.info("Processing %d examples from %s", len(examples), input_file.name)

    done = load_progress()
    stats = {"good": 0, "bad": 0, "dpo_pairs": 0, "skipped": 0}
    OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)

    with open(OUTPUT_FILE, "a", encoding="utf-8") as out_f:
        for i, item in enumerate(examples):
            topic = item.get("meta", {}).get("topic", "")[:60]
            output_text = item.get("output_text", "")
            input_text = item.get("input_text", "")
            system = item.get("system_prompt", SYSTEM_PROMPT)
            item_id = f"{input_file.stem}:{i}"
            if item_id in done:
                stats["skipped"] += 1
                continue

            # ── Fast pre-check ──
            check = quick_check(output_text)
            logger.info("[%03d/%03d] %dw | %s", i + 1, len(examples), check["word_count"], topic)
            if check["clear_good"]:
                verdict = "good"
                logger.info(" → CLEAR GOOD (deterministic)")
            elif check["clear_bad"]:
                verdict = "bad"
                logger.info(" → CLEAR BAD (deterministic: %dw, h2=%s)", check["word_count"], check["has_h2"])
            else:
                # Ask Claude to judge
                judgment = judge_blog(output_text)
                if judgment is None:
                    logger.warning(" → SKIP (judge failed)")
                    done.add(item_id)
                    # Persist right away so the skip survives a run that ends here.
                    save_progress(done)
                    stats["skipped"] += 1
                    continue
                # Normalise the label: a reply such as "Good" must not be
                # counted as bad and end up as a rejected sample.
                verdict = str(judgment.get("overall", "bad")).strip().lower()
                issues = judgment.get("issues", [])
                logger.info("%s | issues: %s", verdict.upper(), issues)

            if verdict == "good":
                stats["good"] += 1
            else:
                stats["bad"] += 1
                if rewrite_bad:
                    # Create DPO pair: bad original → rewritten chosen
                    logger.info(" Rewriting bad post for DPO...")
                    chosen = rewrite_for_chosen(topic, input_text)
                    if chosen and chosen != output_text:
                        # ChatML prefix built from the example's own system/user turns.
                        prompt = (
                            f"<|im_start|>system\n{system}<|im_end|>\n"
                            f"<|im_start|>user\n{input_text}<|im_end|>\n"
                        )
                        pair = {
                            "prompt": prompt,
                            "chosen": chosen,
                            "rejected": output_text,
                            "meta": {
                                "topic": topic,
                                "rejection_strategy": "quality_labeled_bad",
                                "chosen_words": len(chosen.split()),
                                "rejected_words": check["word_count"],
                                "verdict": verdict,
                                "dataset_version": "v8",
                            },
                        }
                        out_f.write(json.dumps(pair, ensure_ascii=False) + "\n")
                        out_f.flush()
                        stats["dpo_pairs"] += 1
                        logger.info(" DPO pair saved: %dw chosen vs %dw rejected",
                                    len(chosen.split()), check["word_count"])

            done.add(item_id)
            save_progress(done)

    logger.info(
        "Done: good=%d bad=%d dpo_pairs=%d skipped=%d",
        stats["good"], stats["bad"], stats["dpo_pairs"], stats["skipped"],
    )
    logger.info("Output: %s", OUTPUT_FILE)

    # Print summary
    if OUTPUT_FILE.exists():
        # Written as UTF-8 above; read back with the same encoding.
        with open(OUTPUT_FILE, encoding="utf-8") as f:
            total = sum(1 for _ in f)
        logger.info("Total DPO pairs in output: %d", total)
def main() -> None:
    """Parse the command line and run process_examples() with the given options."""
    cli = argparse.ArgumentParser(description="Label v7 posts for quality DPO pairs")
    option_specs = (
        ("--input", {
            "type": Path,
            "default": DEFAULT_INPUT,
            "help": f"Input JSONL with SFT examples (default: {DEFAULT_INPUT})",
        }),
        ("--max", {
            "type": int,
            "default": None,
            "help": "Max examples to process",
        }),
        ("--rewrite-bad", {
            "action": "store_true",
            "help": "Rewrite bad posts with Claude to create DPO pairs (slower, costs more)",
        }),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    options = cli.parse_args()
    process_examples(options.input, options.max, options.rewrite_bad)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,228 @@
#!/usr/bin/env python3
"""
parse_real_posts.py Konvertiert echte Blog-HTML v8 SFT Trainingsdaten
Extrahiert Titel + Inhalt aus Ghost CMS HTML, baut daraus hochwertige
Training-Beispiele mit dem v7/v8 System Prompt.
Diese echten Posts sind GOLD Rene's eigene Stimme, echte Expertise,
keine AI-Halluzinationen. Werden als Top-Priorität in v8 gewichtet.
Input: ~/transceiver-training-data/v8-real-posts/*.html
Output: ~/transceiver-training-data/v8-real-posts-sft.jsonl
"""
from __future__ import annotations
import json
import re
from pathlib import Path
POSTS_DIR = Path.home() / "transceiver-training-data" / "v8-real-posts"  # input: one .html file per post
OUTPUT_FILE = Path.home() / "transceiver-training-data" / "v8-real-posts-sft.jsonl"  # rewritten on every run
# Which posts are relevant for blog training (i.e. not purely personal/tool-specific).
# Take all of them: non-transceiver posts still show writing style and structure.
SKIP_SLUGS = set() # skip nothing — every post shows Rene's voice
# System prompt stored in the "system_prompt" field of every record written by main().
# NOTE(review): "7001000 words", "23 sentences" and "34 H2 sections" read like
# ranges (700-1000, 2-3, 3-4) whose dash is missing, and "STRICT CONSTRAINTS Follow"
# like a lost separator - confirm against the prompt used for training/inference.
SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure.
STRICT CONSTRAINTS Follow exactly, no exceptions:
- LENGTH: 7001000 words. Count carefully. Stop at 1000 words maximum.
- STRUCTURE (mandatory, in this order):
1. HOOK paragraph 23 sentences stating the problem this post addresses
2. Technical sections 34 H2 sections covering the topic in depth
3. PRACTICAL TAKEAWAYS exactly 3 bullet points, actionable
- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift.
- NO REPETITION: Every sentence must add new information. No restating.
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.
Do not summarize what you are about to write. Start with the hook directly."""
def extract_ghost_content(html: str) -> tuple[str, str] | None:
"""Extrahiert (title, clean_text) aus Ghost CMS HTML."""
# Titel aus og:title oder h1
title_match = (
re.search(r'<meta property="og:title" content="([^"]+)"', html)
or re.search(r'<title>([\s\S]+?)</title>', html)
)
title = title_match.group(1).strip() if title_match else ""
# Multiline title collapse
title = re.sub(r'\s+', ' ', title).strip()
# HTML entities in title
title = (title.replace('&amp;', '&').replace('&lt;', '<').replace('&gt;', '>')
.replace('&quot;', '"').replace('&#39;', "'").replace('&#x27;', "'")
.replace('&nbsp;', ' ').replace('&mdash;', '').replace('&ndash;', ''))
# Ghost appends " | Blog" or " Site Name" — only strip at a pipe or en-dash
# surrounded by spaces (require \s+, NOT \s* to avoid cutting hyphenated words)
title = re.sub(r"\s+[|]\s+.+$", "", title).strip()
# Ghost CMS Content-Selektoren (in Prioritätsreihenfolge)
content_patterns = [
r'<div class="gh-content[^"]*">(.*?)</(?:div|section)>\s*(?:<div|<section|<footer|<aside)',
r'<section class="gh-content[^"]*">(.*?)</section>',
r'class="post-full-content[^"]*"[^>]*>(.*?)</(?:div|section)>',
r'<div class="post-content[^"]*">(.*?)</div>\s*(?:<div class="post|<section|<footer)',
r'<article[^>]*>(.*?)</article>',
]
raw_content = ""
for pat in content_patterns:
m = re.search(pat, html, re.DOTALL | re.IGNORECASE)
if m:
raw_content = m.group(1)
break
if not raw_content:
return None
# Cleanup
raw_content = re.sub(r'<script[^>]*>.*?</script>', '', raw_content, flags=re.DOTALL)
raw_content = re.sub(r'<style[^>]*>.*?</style>', '', raw_content, flags=re.DOTALL)
raw_content = re.sub(r'<noscript[^>]*>.*?</noscript>', '', raw_content, flags=re.DOTALL)
# HTML → Markdown-ähnliches Format
# Headers
for level in [6, 5, 4, 3, 2, 1]:
hashes = "#" * level
raw_content = re.sub(
rf'<h{level}[^>]*>(.*?)</h{level}>',
lambda m: f"\n{hashes} {re.sub('<[^>]+>', '', m.group(1)).strip()}\n",
raw_content,
flags=re.DOTALL | re.IGNORECASE,
)
# Bold/italic
raw_content = re.sub(r'<strong[^>]*>(.*?)</strong>', r'**\1**', raw_content, flags=re.DOTALL)
raw_content = re.sub(r'<b[^>]*>(.*?)</b>', r'**\1**', raw_content, flags=re.DOTALL)
raw_content = re.sub(r'<em[^>]*>(.*?)</em>', r'*\1*', raw_content, flags=re.DOTALL)
# Lists
raw_content = re.sub(r'<li[^>]*>(.*?)</li>', r'\n- \1', raw_content, flags=re.DOTALL)
raw_content = re.sub(r'<[uo]l[^>]*>', '\n', raw_content)
raw_content = re.sub(r'</[uo]l>', '\n', raw_content)
# Paragraphs → newlines
raw_content = re.sub(r'<br\s*/?>', '\n', raw_content)
raw_content = re.sub(r'<p[^>]*>', '\n', raw_content)
raw_content = re.sub(r'</p>', '\n', raw_content)
# Code blocks
raw_content = re.sub(r'<pre[^>]*><code[^>]*>(.*?)</code></pre>',
lambda m: f"\n```\n{m.group(1)}\n```\n",
raw_content, flags=re.DOTALL)
# Alle verbleibenden Tags entfernen
raw_content = re.sub(r'<[^>]+>', ' ', raw_content)
# HTML entities
raw_content = raw_content.replace('&amp;', '&').replace('&lt;', '<').replace('&gt;', '>')
raw_content = raw_content.replace('&nbsp;', ' ').replace('&quot;', '"').replace('&#39;', "'")
raw_content = raw_content.replace('&mdash;', '').replace('&ndash;', '').replace('&hellip;', '')
# Whitespace normalisieren
raw_content = re.sub(r'\n{3,}', '\n\n', raw_content)
raw_content = re.sub(r'[ \t]+', ' ', raw_content)
raw_content = re.sub(r'\n ', '\n', raw_content)
clean = raw_content.strip()
if len(clean.split()) < 200:
return None
return title, clean
def slug_to_topic(slug: str) -> str:
    """Turn a URL slug such as ``my-post`` into a title-cased topic string."""
    spaced = " ".join(slug.split("-"))
    return spaced.title()
def main() -> None:
    """Convert every usable HTML post in POSTS_DIR into an SFT record in OUTPUT_FILE.

    Only ``*.html`` files larger than 10 000 bytes whose name starts with a
    lowercase letter or digit, and whose stem is not in SKIP_SLUGS, are
    considered. OUTPUT_FILE is overwritten on every run.
    """
    html_files = sorted(f for f in POSTS_DIR.glob("*.html")
                        if f.stat().st_size > 10_000
                        and f.stem not in SKIP_SLUGS
                        and re.match(r'^[a-z0-9]', f.stem))  # skip hidden/garbage files
    print(f"Parsing {len(html_files)} HTML files...")

    results = []
    skipped = 0
    seen_slugs: set[str] = set()
    for fpath in html_files:
        if fpath.stem in seen_slugs:
            print(f" SKIP (duplicate slug): {fpath.name}")
            skipped += 1
            continue
        seen_slugs.add(fpath.stem)

        # Decode explicitly as UTF-8: with the locale default plus
        # errors="ignore", a non-UTF-8 locale would silently drop characters.
        html = fpath.read_text(encoding="utf-8", errors="ignore")
        extracted = extract_ghost_content(html)
        if extracted is None:
            print(f" SKIP (no content): {fpath.name}")
            skipped += 1
            continue

        title, clean_text = extracted
        word_count = len(clean_text.split())
        if not title:
            title = slug_to_topic(fpath.stem)
        print(f" OK: {word_count:4d}w | {title[:60]}")

        # Input text uses the same layout as the other record builders in
        # this section: topic + audience + reminder.
        # The audience is picked from keywords in the slug.
        slug = fpath.stem
        if any(k in slug for k in ["shieldx", "claude", "papercortex", "llm", "slop", "sync"]):
            audience = "developers and infrastructure engineers building AI-powered tools"
        elif any(k in slug for k in ["aspa", "bgp", "peercortex", "infrastructure"]):
            audience = "network engineers and NOC operators"
        else:
            audience = "network engineers and IT professionals who evaluate and operate optical infrastructure"

        # NOTE(review): "7001000 words" looks like a range that lost its dash - confirm.
        input_text = (
            f"Write a blog post on the following topic:\n\n"
            f"**Topic:** {title}\n\n"
            f"**Target audience:** {audience}\n\n"
            f"Remember: 7001000 words, hook + technical sections + 3 takeaways. "
            f"Stay strictly on-topic. No filler. Start writing now."
        )
        record = {
            "system_prompt": SYSTEM_PROMPT,
            "input_text": input_text,
            "output_text": clean_text,
            "meta": {
                "title": title,
                "slug": slug,
                "source": "blog.fichtmueller.org",
                "word_count": word_count,
                "quality": "human_written",
                "weight": 3.0,  # 3x weighting in training — gold standard
                "dataset_version": "v8",
            },
        }
        results.append(record)

    OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        for r in results:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    print(f"\nGespeichert: {len(results)} Posts → {OUTPUT_FILE}")
    print(f"Übersprungen: {skipped}")

    # Word-count statistics
    wcs = [r["meta"]["word_count"] for r in results]
    if wcs:
        print(f"Word count: min={min(wcs)}, max={max(wcs)}, avg={sum(wcs)//len(wcs)}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
process_v6_blogs.py Verarbeitet echte fo-blog-v6 Outputs als v8 Trainingsdaten
Analysiert 101 v6-generierte Blog-Posts aus /opt/tip/blog-training-data/
und erstellt daraus:
1. SFT records Posts mit 700-1100w direkt als Training-Beispiele
2. DPO pairs Posts >1100w:
rejected = der zu lange Originalpost
chosen = Claude rewritet ihn als saubere 700-1000w Version
Dies sind echte Modell-Failures (nicht synthetisch!) besonders wertvoll für DPO.
Input: ~/transceiver-training-data/v6-tip-blogs/*.md
Output:
~/transceiver-training-data/v8-v6blogs-sft.jsonl (gute Posts als SFT)
~/transceiver-training-data/v8-v6blogs-dpo.jsonl (zu lange Posts als DPO)
Usage:
python3 scripts/process_v6_blogs.py
python3 scripts/process_v6_blogs.py --max-dpo 30 # nur 30 DPO Pairs
python3 scripts/process_v6_blogs.py --sft-only # nur SFT Records
python3 scripts/process_v6_blogs.py --dry-run # Stats only
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import time
from pathlib import Path
BLOGS_DIR = Path.home() / "transceiver-training-data" / "v6-tip-blogs"  # input: *.md posts
SFT_OUTPUT = Path.home() / "transceiver-training-data" / "v8-v6blogs-sft.jsonl"  # rewritten on every SFT run
DPO_OUTPUT = Path.home() / "transceiver-training-data" / "v8-v6blogs-dpo.jsonl"  # appended to across runs
PROGRESS_FILE = Path.home() / "transceiver-training-data" / "v8-v6blogs-progress.json"  # slugs already handled for DPO
# Word count ranges
GOOD_MIN = 700  # inclusive lower bound for SFT records
GOOD_MAX = 1100  # inclusive upper bound for SFT records
REJECTED_MIN = 1100 # posts above this are "too long" → rejected examples
CLAUDE_TIMEOUT = 180  # seconds per Claude CLI call
# System prompt stored in every SFT record, embedded in every DPO prompt, and
# passed to the Claude CLI by rewrite_with_claude().
# NOTE(review): "7001000 words", "23 sentences" and "34 H2 sections" read like
# ranges (700-1000, 2-3, 3-4) whose dash is missing, and "STRICT CONSTRAINTS Follow"
# like a lost separator - confirm against the prompt used for training/inference.
SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure.
STRICT CONSTRAINTS Follow exactly, no exceptions:
- LENGTH: 7001000 words. Count carefully. Stop at 1000 words maximum.
- STRUCTURE (mandatory, in this order):
1. HOOK paragraph 23 sentences stating the problem this post addresses
2. Technical sections 34 H2 sections covering the topic in depth
3. PRACTICAL TAKEAWAYS exactly 3 bullet points, actionable
- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift.
- NO REPETITION: Every sentence must add new information. No restating.
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.
Do not summarize what you are about to write. Start with the hook directly."""
def parse_blog_md(path: Path) -> dict | None:
"""Parse a blog markdown file with YAML frontmatter."""
text = path.read_text(encoding="utf-8", errors="ignore")
# Extract YAML frontmatter
frontmatter: dict = {}
content = text
fm_match = re.match(r'^---\s*\n(.*?)\n---\s*\n', text, re.DOTALL)
if fm_match:
fm_text = fm_match.group(1)
content = text[fm_match.end():]
# Parse key: "value" pairs
for line in fm_text.split('\n'):
kv = re.match(r'^(\w+):\s*"?(.+?)"?\s*$', line)
if kv:
frontmatter[kv.group(1)] = kv.group(2).strip('"').strip()
title = frontmatter.get("title", "").strip('"')
if not title:
# Fallback: first H1
h1 = re.search(r'^# (.+)$', content, re.MULTILINE)
title = h1.group(1).strip() if h1 else path.stem.replace("-", " ").title()
slug = frontmatter.get("slug", path.stem)
category = frontmatter.get("category", "optical networking")
word_count = len(content.split())
if word_count < 200:
return None
return {
"title": title,
"slug": slug,
"category": category,
"content": content.strip(),
"word_count": word_count,
"path": str(path),
}
def build_input_text(title: str, audience: str) -> str:
    """Build the user-turn text (topic, audience, reminder) for one training record."""
    # NOTE(review): "7001000 words" looks like a range that lost its dash - confirm.
    sections = [
        "Write a blog post on the following topic:",
        f"**Topic:** {title}",
        f"**Target audience:** {audience}",
        "Remember: 7001000 words, hook + technical sections + 3 takeaways. "
        "Stay strictly on-topic. No filler. Start writing now.",
    ]
    return "\n\n".join(sections)
def get_audience(category: str) -> str:
    """Map a post category to the audience line used in the prompt.

    Matching is a case-insensitive substring test; rules are tried in order
    and the first hit wins.
    """
    rules = (
        (("fiber", "cabling", "mtp", "mpo", "connector"),
         "data center engineers and cabling specialists"),
        (("coherent", "dwdm", "zr", "metro"),
         "network architects and optical engineers designing long-haul links"),
        (("compatible", "procurement", "vendor", "cost", "price"),
         "network procurement teams and IT managers evaluating transceiver vendors"),
    )
    lowered = category.lower()
    for keywords, audience in rules:
        for keyword in keywords:
            if keyword in lowered:
                return audience
    return "network engineers and IT professionals who evaluate and operate optical infrastructure"
def rewrite_with_claude(title: str, source_content: str, audience: str) -> str | None:
    """Ask the Claude CLI for a 700-1000 word rewrite of an over-long v6 post.

    Returns the rewrite (the 'chosen' sample), or None when the CLI fails or
    times out, or the answer is outside 400-2000 words.
    """
    # Only the first ~800 words of the original are sent as reference.
    tokens = source_content.split()
    if len(tokens) > 800:
        source_content = " ".join(tokens[:800]) + "\n\n[Source truncated for reference]"

    prompt = "".join([
        "Rewrite this blog post to be 700-1000 words with the correct structure.\n\n",
        f"**Topic:** {title}\n",
        f"**Target audience:** {audience}\n\n",
        "**Original post (DO NOT COPY — use only as topic reference, rewrite completely):**\n\n",
        f"{source_content}\n\n",
        "Rewrite now. 700-1000 words. Hook + technical sections + 3 takeaways. Start directly.",
    ])
    command = ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", prompt]

    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=CLAUDE_TIMEOUT,
        )
        if completed.returncode != 0 or not completed.stdout.strip():
            return None
        rewrite = completed.stdout.strip()
        if 400 <= len(rewrite.split()) <= 2000:
            return rewrite
        return None
    except subprocess.TimeoutExpired:
        print(f" TIMEOUT: Claude took too long for {title[:50]}")
    except Exception as e:
        print(f" ERROR: {e}")
    return None
def load_progress() -> set[str]:
    """Return the slugs recorded as done in PROGRESS_FILE.

    A missing, unreadable or malformed progress file yields an empty set.
    """
    handled: set[str] = set()
    if PROGRESS_FILE.exists():
        try:
            state = json.loads(PROGRESS_FILE.read_text())
            handled = set(state.get("done", []))
        except Exception:
            handled = set()
    return handled
def save_progress(done: set[str]) -> None:
    """Persist the handled slugs to PROGRESS_FILE as ``{"done": [...]}``.

    The JSON is written to a sibling ``.tmp`` file and then renamed over the
    target, so an interrupted run cannot leave a truncated progress file
    behind. The list is sorted so repeated saves produce stable output.
    """
    payload = json.dumps({"done": sorted(done)})
    tmp_file = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp_file.write_text(payload)
    tmp_file.replace(PROGRESS_FILE)
def main() -> None:
    """CLI entry point: turn the v6 blog posts in BLOGS_DIR into SFT records and DPO pairs.

    Phase 1 rewrites SFT_OUTPUT from posts with GOOD_MIN..GOOD_MAX words.
    Phase 2 appends to DPO_OUTPUT one pair per post above REJECTED_MIN words:
    the original is "rejected", a Claude rewrite is "chosen". Slugs handled
    in phase 2 are stored in the progress file so reruns skip them.
    """
    parser = argparse.ArgumentParser(description="Process v6 TIP blogs into v8 training data")
    parser.add_argument("--max-dpo", type=int, default=None, help="Max DPO pairs to generate")
    # The two "only" switches contradict each other; given together they used
    # to skip both phases without any message.
    phase = parser.add_mutually_exclusive_group()
    phase.add_argument("--sft-only", action="store_true", help="Only create SFT records (skip DPO)")
    phase.add_argument("--dpo-only", action="store_true", help="Only create DPO pairs (skip SFT)")
    parser.add_argument("--dry-run", action="store_true", help="Statistics only, no output")
    args = parser.parse_args()

    # Load all blog files
    md_files = sorted(BLOGS_DIR.glob("*.md"))
    if not md_files:
        print(f"No .md files found in {BLOGS_DIR}")
        return
    blogs = []
    for path in md_files:
        parsed = parse_blog_md(path)
        if parsed:
            blogs.append(parsed)

    # Categorize
    good = [b for b in blogs if GOOD_MIN <= b["word_count"] <= GOOD_MAX]
    too_long = [b for b in blogs if b["word_count"] > REJECTED_MIN]
    too_short = [b for b in blogs if b["word_count"] < GOOD_MIN]
    print("=== v6 TIP Blog Analysis ===")
    print(f"Total: {len(blogs)} files")
    print(f"Good (SFT): {len(good)} files ({GOOD_MIN}-{GOOD_MAX}w)")
    print(f"Too long: {len(too_long)} files (>{REJECTED_MIN}w) → DPO rejected")
    print(f"Too short: {len(too_short)} files (<{GOOD_MIN}w) → skip")
    print()

    if args.dry_run:
        print("Good posts:")
        for b in sorted(good, key=lambda x: x["word_count"]):
            print(f" {b['word_count']:4d}w | {b['title'][:60]}")
        print("\nToo long (top 10):")
        for b in sorted(too_long, key=lambda x: x["word_count"], reverse=True)[:10]:
            print(f" {b['word_count']:4d}w | {b['title'][:60]}")
        return

    done = load_progress()

    # ─── Phase 1: SFT Records from good posts ──────────────────────────────────
    if not args.dpo_only:
        print("=== Phase 1: SFT Records (good posts) ===")
        SFT_OUTPUT.parent.mkdir(parents=True, exist_ok=True)
        sft_count = 0
        with open(SFT_OUTPUT, "w", encoding="utf-8") as f:
            for blog in good:
                audience = get_audience(blog["category"])
                record = {
                    "system_prompt": SYSTEM_PROMPT,
                    "input_text": build_input_text(blog["title"], audience),
                    "output_text": blog["content"],
                    "meta": {
                        "title": blog["title"],
                        "slug": blog["slug"],
                        "source": "tip-v6-blogs",
                        "word_count": blog["word_count"],
                        "quality": "v6_output_good",
                        "weight": 2.0,  # Real model output, good length
                        "dataset_version": "v8",
                    },
                }
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
                sft_count += 1
                print(f" SFT: {blog['word_count']:4d}w | {blog['title'][:60]}")
        print(f"\nSFT saved: {sft_count} records → {SFT_OUTPUT}")

    # ─── Phase 2: DPO Pairs from too-long posts ────────────────────────────────
    if not args.sft_only:
        print("\n=== Phase 2: DPO Pairs (too-long posts → rewrite) ===")
        DPO_OUTPUT.parent.mkdir(parents=True, exist_ok=True)
        candidates = too_long
        if args.max_dpo:
            candidates = candidates[:args.max_dpo]
        dpo_saved = 0
        dpo_skipped = 0
        with open(DPO_OUTPUT, "a", encoding="utf-8") as f:
            for i, blog in enumerate(candidates):
                slug = blog["slug"]
                if slug in done:
                    dpo_skipped += 1
                    continue
                print(f" [{i+1}/{len(candidates)}] {blog['word_count']:4d}w → rewrite: {blog['title'][:55]}")
                audience = get_audience(blog["category"])

                # Get Claude to write a GOOD version → chosen
                chosen = rewrite_with_claude(blog["title"], blog["content"], audience)
                if not chosen:
                    print(f" SKIP (Claude failed)")
                    done.add(slug)
                    # Persist right away so the skip survives a run that ends here.
                    save_progress(done)
                    dpo_skipped += 1
                    continue
                chosen_wc = len(chosen.split())
                print(f" OK: chosen={chosen_wc}w (was {blog['word_count']}w)")

                # Build DPO prompt (ChatML prefix)
                prompt = (
                    f"<|im_start|>system\n{SYSTEM_PROMPT}<|im_end|>\n"
                    f"<|im_start|>user\n{build_input_text(blog['title'], audience)}<|im_end|>\n"
                )
                pair = {
                    "prompt": prompt,
                    "chosen": chosen,
                    "rejected": blog["content"],
                    "meta": {
                        "title": blog["title"],
                        "slug": slug,
                        "source": "tip-v6-dpo",
                        "rejection_strategy": "too_long_real",
                        "chosen_words": chosen_wc,
                        "rejected_words": blog["word_count"],
                        "dataset_version": "v8",
                    },
                }
                f.write(json.dumps(pair, ensure_ascii=False) + "\n")
                f.flush()
                done.add(slug)
                save_progress(done)
                dpo_saved += 1
                time.sleep(1)
        # Separator added: the skip count used to run straight into the path.
        print(f"\nDPO saved: {dpo_saved} pairs | skipped: {dpo_skipped} → {DPO_OUTPUT}")

    # ─── Summary ───────────────────────────────────────────────────────────────
    # Both files are written as UTF-8 with ensure_ascii=False; read them back
    # the same way instead of relying on the locale encoding.
    print("\n=== Summary ===")
    if SFT_OUTPUT.exists():
        with open(SFT_OUTPUT, encoding="utf-8") as f:
            sft_n = sum(1 for _ in f)
        print(f"SFT: {sft_n} records → {SFT_OUTPUT}")
    if DPO_OUTPUT.exists():
        with open(DPO_OUTPUT, encoding="utf-8") as f:
            dpo_n = sum(1 for _ in f)
        print(f"DPO: {dpo_n} pairs → {DPO_OUTPUT}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,178 @@
#!/usr/bin/env bash
# ═══════════════════════════════════════════════════════════════════════════════
# run_v8_pipeline.sh — fo-blog-v8 Autopilot Pipeline
#
# Qwen2.5-14B, LoRA r=64, 5 epochs SFT + 2 epochs DPO
#
# Expects the following data files to be present:
# ~/transceiver-training-data/v8-real-posts-sft.jsonl (19 real posts)
# ~/transceiver-training-data/v7-generated-sft.jsonl (v7 generated, ≥100)
# ~/transceiver-training-data/v8-v6blogs-sft.jsonl (v6 tip blogs good)
# ~/transceiver-training-data/v8-external-sft.jsonl (crawled external)
# ~/transceiver-training-data/v7-dpo-pairs.jsonl (v7 DPO)
# ~/transceiver-training-data/v8-v6blogs-dpo.jsonl (real v6 failures)
#
# Usage:
# bash scripts/run_v8_pipeline.sh # full auto
# bash scripts/run_v8_pipeline.sh --wait-crawl # wait for crawler first
# bash scripts/run_v8_pipeline.sh --phase-from merge # skip training
# bash scripts/run_v8_pipeline.sh --phase-from dpo # skip SFT, do DPO + merge
# ═══════════════════════════════════════════════════════════════════════════════
set -euo pipefail

# Repository root, derived from this script's own location (<root>/scripts/).
FINE_TUNER_DIR="$(cd "$(dirname "$0")/.." && pwd)"
# Interpreter and log directory can be overridden from the environment; the
# defaults are the previously hard-coded values, so existing invocations are
# unaffected.
PYTHON="${PYTHON:-/opt/homebrew/bin/python3.13}"
SCRIPTS="$FINE_TUNER_DIR/scripts"
DATA_DIR="$HOME/transceiver-training-data"
LOG_DIR="${LOG_DIR:-/tmp/v8-pipeline}"
TIMESTAMP=$(date +%Y%m%d-%H%M%S)
mkdir -p "$LOG_DIR"

# ─── Colors ───────────────────────────────────────────────────────────────────
GREEN='\033[0;32m'; YELLOW='\033[1;33m'; RED='\033[0;31m'; NC='\033[0m'; BOLD='\033[1m'

# Logging helpers. Colour codes are expanded with %b, the message itself is
# printed with %s so backslashes inside a message (paths, patterns) are never
# interpreted as escape sequences. warn/err go to stderr so diagnostics stay
# visible when stdout is redirected.
log()  { printf '%b[%s]%b %s\n' "$GREEN" "$(date +%H:%M:%S)" "$NC" "$*"; }
warn() { printf '%b[%s] ⚠%b %s\n' "$YELLOW" "$(date +%H:%M:%S)" "$NC" "$*" >&2; }
err()  { printf '%b[%s] ✗%b %s\n' "$RED" "$(date +%H:%M:%S)" "$NC" "$*" >&2; }
step() { printf '\n%b══ %s ══%b\n' "${BOLD}${GREEN}" "$*" "$NC"; }
# ─── Args ─────────────────────────────────────────────────────────────────────
WAIT_CRAWL=false
PHASE_FROM="consolidate"   # consolidate | sft | dpo | merge

# parse_args ARG... — set WAIT_CRAWL and PHASE_FROM from the command line.
#
# Accepts:
#   --wait-crawl            wait for the crawler / v6 DPO generator first
#   --phase-from=<phase>    start at <phase>
#   --phase-from <phase>    same, value as a separate word
#
# The arguments are consumed with a while/shift loop. The previous
# `for arg in "$@"` loop called `shift` inside the body, which does not advance
# the iteration: `--wait-crawl --phase-from dpo` ended up with
# PHASE_FROM="--phase-from", and every training phase was then silently skipped.
# Exits with status 2 on a missing or unknown phase name.
parse_args() {
    while [[ $# -gt 0 ]]; do
        case "$1" in
            --wait-crawl)
                WAIT_CRAWL=true
                ;;
            --phase-from=*)
                PHASE_FROM="${1#*=}"
                ;;
            --phase-from)
                if [[ $# -lt 2 ]]; then
                    echo "--phase-from requires a value: consolidate | sft | dpo | merge" >&2
                    exit 2
                fi
                PHASE_FROM="$2"
                shift   # drop the option name; the value is dropped below
                ;;
            *)
                # Unknown arguments were silently ignored before; keep going
                # but make the typo visible.
                echo "Ignoring unknown argument: $1" >&2
                ;;
        esac
        shift
    done

    # Reject typos early instead of falling through every phase check.
    case "$PHASE_FROM" in
        consolidate|sft|dpo|merge) ;;
        *)
            echo "Invalid --phase-from '$PHASE_FROM' (expected: consolidate | sft | dpo | merge)" >&2
            exit 2
            ;;
    esac
}

parse_args "$@"
# count_lines FILE — print FILE's line count as a bare integer, or 0 when the
# file is missing or unreadable.
#
# Replaces `wc -l < FILE 2>/dev/null || echo 0`: there the `2>/dev/null` comes
# after the input redirection, so the shell's "No such file" message was still
# printed, and BSD wc pads its output with leading blanks. The arithmetic
# expansion strips that padding.
count_lines() {
    local n=0
    if [[ -r "$1" ]]; then
        n=$(wc -l < "$1")
    fi
    echo $(( n ))
}

# ─── Step 0: Wait for external crawler ────────────────────────────────────────
if [[ "$WAIT_CRAWL" == "true" ]]; then
    step "Warte auf v8 External Crawler (crawl_v8_sources.py)"
    # Poll every two minutes while a crawler process is still alive.
    while pgrep -f "crawl_v8_sources.py" > /dev/null 2>&1; do
        EXT_COUNT=$(count_lines "$DATA_DIR/v8-external-sft.jsonl")
        log " Crawler läuft noch... $EXT_COUNT externe Artikel bisher"
        sleep 120
    done
    EXT_COUNT=$(count_lines "$DATA_DIR/v8-external-sft.jsonl")
    log "✓ Crawler fertig: $EXT_COUNT externe Artikel → $DATA_DIR/v8-external-sft.jsonl"

    # Also wait for v6 DPO generation
    while pgrep -f "process_v6_blogs.py" > /dev/null 2>&1; do
        DPO_COUNT=$(count_lines "$DATA_DIR/v8-v6blogs-dpo.jsonl")
        log " v6 DPO Generator läuft... $DPO_COUNT Pairs bisher"
        sleep 120
    done
    DPO_V6_COUNT=$(count_lines "$DATA_DIR/v8-v6blogs-dpo.jsonl")
    log "✓ v6 DPO fertig: $DPO_V6_COUNT Pairs"
fi

# ─── Step 1: Check available data ─────────────────────────────────────────────
step "Datenlage prüfen"
cd "$FINE_TUNER_DIR"
"$PYTHON" "$SCRIPTS/consolidate_v8_dataset.py" --stats-only

REAL_COUNT=$(count_lines "$DATA_DIR/v8-real-posts-sft.jsonl")
V7GEN_COUNT=$(count_lines "$DATA_DIR/v7-generated-sft.jsonl")
V6BLOG_COUNT=$(count_lines "$DATA_DIR/v8-v6blogs-sft.jsonl")
EXT_COUNT=$(count_lines "$DATA_DIR/v8-external-sft.jsonl")

log "SFT Quellen:"
log " Real posts (Gold ×3): $REAL_COUNT"
log " v7 Generated (×1): $V7GEN_COUNT"
log " v6 TIP Blogs (×2): $V6BLOG_COUNT"
log " External crawled (×1.5): $EXT_COUNT"

# Weighted estimate using the weights shown above. External data counts ×1.5
# (integer arithmetic: *3/2); it was previously counted ×2, which overstated
# the total compared with the documented weighting.
TOTAL_EST=$(( REAL_COUNT*3 + V7GEN_COUNT + V6BLOG_COUNT*2 + EXT_COUNT*3/2 ))
log " Geschätzt total effective: $TOTAL_EST"

if [[ "$TOTAL_EST" -lt 80 ]]; then
    err "Zu wenig Daten ($TOTAL_EST effective) — mindestens 80 nötig!"
    err "Warte auf v7-generation oder crawl_v8_sources.py"
    exit 1
fi

# ─── Step 2: Consolidate dataset ──────────────────────────────────────────────
case "$PHASE_FROM" in
    consolidate)
        step "Phase 0: Dataset Konsolidierung"
        CONS_LOG="$LOG_DIR/consolidate-$TIMESTAMP.log"
        log "Starte consolidate_v8_dataset.py..."
        "$PYTHON" "$SCRIPTS/consolidate_v8_dataset.py" 2>&1 | tee "$CONS_LOG"
        SFT_MERGED=$(count_lines "$DATA_DIR/v8-sft-merged.jsonl")
        DPO_MERGED=$(count_lines "$DATA_DIR/v8-dpo-merged.jsonl")
        log "✓ Merged: $SFT_MERGED SFT + $DPO_MERGED DPO"
        ;;
    sft|dpo|merge)
        log "Phase: $PHASE_FROM — Konsolidierung übersprungen"
        # Later phases read the merged file; rebuild it if it was never made.
        if [[ ! -f "$DATA_DIR/v8-sft-merged.jsonl" ]]; then
            warn "v8-sft-merged.jsonl fehlt — erstelle schnell..."
            "$PYTHON" "$SCRIPTS/consolidate_v8_dataset.py"
        fi
        ;;
esac
# ─── Step 3: SFT Training ─────────────────────────────────────────────────────
# Runs only when the pipeline starts at "consolidate" or "sft". The training
# output is mirrored into a timestamped log file; afterwards the adapter
# directory must exist or the pipeline aborts.
case "$PHASE_FROM" in
    consolidate|sft)
        step "Phase 1: SFT Training (Qwen2.5-14B, LoRA r=64, 5 Epochs)"
        SFT_LOG="$LOG_DIR/sft-$TIMESTAMP.log"
        log "Starte train_blog_v8.py --phase sft..."
        log "Log: $SFT_LOG"
        log "Estimated: ~10-14 Stunden (run overnight!)"
        $PYTHON "$SCRIPTS/train_blog_v8.py" --phase sft 2>&1 | tee "$SFT_LOG"

        ADAPTER="$FINE_TUNER_DIR/adapters/fo-blog-v8/adapter"
        [[ -d "$ADAPTER" ]] || {
            err "SFT Adapter nicht gefunden: $ADAPTER"
            exit 1
        }
        log "✓ SFT Adapter: $ADAPTER"
        ;;
esac
# ─── Step 4: DPO Training ─────────────────────────────────────────────────────
# Runs for every start phase except "merge". The DPO phase is skipped (with a
# warning) when the merged pair file is missing OR empty: an empty file would
# otherwise start a training run on zero pairs after the long SFT phase.
if [[ "$PHASE_FROM" == "consolidate" || "$PHASE_FROM" == "sft" || "$PHASE_FROM" == "dpo" ]]; then
    step "Phase 2: DPO Training (2 Epochs)"
    DPO_LOG="$LOG_DIR/dpo-$TIMESTAMP.log"
    DPO_FILE="$DATA_DIR/v8-dpo-merged.jsonl"
    if [[ ! -s "$DPO_FILE" ]]; then
        warn "DPO File fehlt oder ist leer — überspringe DPO Phase"
    else
        # Arithmetic expansion strips the padding BSD wc adds to its output.
        DPO_COUNT=$(( $(wc -l < "$DPO_FILE") ))
        log "DPO Pairs: $DPO_COUNT"
        log "Starte train_blog_v8.py --phase dpo..."
        "$PYTHON" "$SCRIPTS/train_blog_v8.py" --phase dpo 2>&1 | tee "$DPO_LOG"
        log "✓ DPO Training abgeschlossen"
    fi
fi
# ─── Step 5: Merge + GGUF + Ollama ───────────────────────────────────────────
# Always runs, whatever the start phase: merge the adapter, export GGUF and
# register the model, mirroring the converter output into a timestamped log.
step "Phase 3: Merge + GGUF + Ollama Registrierung"
CONV_LOG="$LOG_DIR/convert-$TIMESTAMP.log"
log "Starte train_blog_v8.py --phase convert..."
$PYTHON "$SCRIPTS/train_blog_v8.py" --phase convert 2>&1 | tee "$CONV_LOG"
log "✓ fo-blog-v8 in Ollama registriert"

# ─── Wrap-up ──────────────────────────────────────────────────────────────────
# The closing summary is kept as a here-document: every non-empty line is sent
# through log(), every empty line becomes a plain blank line. The delimiter is
# unquoted so $LOG_DIR is expanded.
step "v8 Pipeline ABGESCHLOSSEN"
echo ""
while IFS= read -r summary_line; do
    if [[ -z "$summary_line" ]]; then
        echo ""
    else
        log "$summary_line"
    fi
done <<EOF
fo-blog-v8 ist bereit:
 Ollama: ollama run fo-blog-v8
 API: OLLAMA_LLM_MODEL=fo-blog-v8

Auf Erik deployen:
 1. GGUF rsync: rsync -avz models/fo-blog-v8/fo-blog-v8.gguf root@erik:/opt/ollama-models/
 2. Ollama: ssh erik 'ollama create fo-blog-v8 -f /opt/tip/Modelfile-v8'
 3. TIP: ecosystem.config.js → OLLAMA_LLM_MODEL=fo-blog-v8
 4. Restart: ssh erik 'cd /opt/tip && pm2 restart ecosystem.config.js --update-env'

Logs: $LOG_DIR/

v8 vs v7 Verbesserungen:
 - 14B statt 7B (4× Parameter)
 - Echte Blog-Posts ×3 gewichtet
 - Echte Modell-Failures als DPO (v6 too-long posts)
 - Externe Quellen: APNIC, RIPE Labs, potaroo.net, Cloudflare
 - 5 SFT + 2 DPO Epochs (war 4 + 1)
EOF

View File

@ -0,0 +1,409 @@
#!/usr/bin/env python3
"""
train_blog_v8.py — fo-blog-v8 Training (Qwen2.5-14B, MPS LoRA)

Phase 1: SFT (5 epochs, LoRA r=64, from merged v8 dataset)
Phase 2: DPO (2 epochs, from SFT adapter)
Phase 3: convert (merge adapter into the base model, GGUF export, Ollama registration)

Usage:
    python3 scripts/train_blog_v8.py --phase sft
    python3 scripts/train_blog_v8.py --phase dpo
    python3 scripts/train_blog_v8.py --phase both     # SFT then DPO sequentially
    python3 scripts/train_blog_v8.py --phase convert  # merge + GGUF + Ollama

Hardware: Apple Silicon M4 Max (48GB), MPS backend
Estimated: SFT ~10-14h, DPO ~3-5h (run overnight)
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
# ─── Paths ────────────────────────────────────────────────────────────────────
# Repository root: this file lives in <root>/scripts/.
FINE_TUNER_DIR = Path(__file__).parent.parent
DATA_DIR = Path.home() / "transceiver-training-data"
# Merged training sets (the loaders below point to consolidate_v8_dataset.py
# when these are missing).
SFT_DATA = DATA_DIR / "v8-sft-merged.jsonl"
DPO_DATA = DATA_DIR / "v8-dpo-merged.jsonl"
# Output locations for the two LoRA adapters and the merged full model.
SFT_ADAPTER = FINE_TUNER_DIR / "adapters" / "fo-blog-v8" / "adapter"
DPO_ADAPTER = FINE_TUNER_DIR / "adapters" / "fo-blog-v8-dpo" / "adapter"
MERGED_DIR = FINE_TUNER_DIR / "models" / "fo-blog-v8" / "merged"
BASE_MODEL = "Qwen/Qwen2.5-14B-Instruct"

# Default system prompt: used as the fallback for SFT records that carry no
# "system_prompt" of their own, and written into the Ollama Modelfile.
# The numeric ranges must keep their dash: without it the length rule reads
# "7001000 words" and the structure rules "23 sentences" / "34 H2 sections".
SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure.
STRICT CONSTRAINTS – Follow exactly, no exceptions:
- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum.
- STRUCTURE (mandatory, in this order):
1. HOOK paragraph – 2–3 sentences stating the problem this post addresses
2. Technical sections – 3–4 H2 sections covering the topic in depth
3. PRACTICAL TAKEAWAYS – exactly 3 bullet points, actionable
- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift.
- NO REPETITION: Every sentence must add new information. No restating.
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.
Do not summarize what you are about to write. Start with the hook directly."""
def build_chatml(system: str, user: str, assistant: str) -> str:
    """Render one system/user/assistant exchange as a ChatML training string.

    Each turn is wrapped as ``<|im_start|>{role}\\n{content}<|im_end|>``; the
    three turns are joined with a single newline and the result carries no
    trailing newline.
    """
    turns = (("system", system), ("user", user), ("assistant", assistant))
    return "\n".join(
        f"<|im_start|>{role}\n{content}<|im_end|>" for role, content in turns
    )
def load_sft_dataset(tokenizer, max_seq_length: int = 4096):
    """Load the merged SFT set and return it as a ``datasets.Dataset``.

    Every usable JSONL record becomes one row ``{"text": <ChatML string>}``
    built from its ``system_prompt`` (falling back to ``SYSTEM_PROMPT``),
    ``input_text`` and ``output_text`` fields. No tokenization or truncation
    happens here.

    Args:
        tokenizer: Not used by this function; kept so existing callers work.
        max_seq_length: Not used by this function; kept so existing callers work.

    Returns:
        Dataset with a single ``text`` column.

    Raises:
        FileNotFoundError: if the merged SFT file does not exist.
    """
    from datasets import Dataset

    if not SFT_DATA.exists():
        raise FileNotFoundError(
            f"SFT data not found: {SFT_DATA}\n"
            "Run: python3 scripts/consolidate_v8_dataset.py"
        )

    records = []
    bad_lines = []  # line numbers that could not be turned into an example
    with open(SFT_DATA, encoding="utf-8") as f:
        for line_no, raw in enumerate(f, start=1):
            raw = raw.strip()
            if not raw:
                continue
            try:
                item = json.loads(raw)
            except json.JSONDecodeError:
                bad_lines.append(line_no)
                continue
            if not isinstance(item, dict):
                # Valid JSON but not an object (e.g. a list): .get() would crash.
                bad_lines.append(line_no)
                continue
            # `or` also covers an explicit null / empty system prompt, which
            # would otherwise be rendered as the literal text "None".
            system = item.get("system_prompt") or SYSTEM_PROMPT
            user = item.get("input_text") or ""
            assistant = item.get("output_text") or ""
            if user and assistant:
                records.append({"text": build_chatml(system, user, assistant)})
            else:
                bad_lines.append(line_no)

    print(f"Loaded {len(records)} SFT examples from {SFT_DATA.name}")
    if bad_lines:
        # Report instead of silently swallowing, so data problems are visible
        # before a multi-hour training run starts.
        preview = ", ".join(str(n) for n in bad_lines[:10])
        more = " ..." if len(bad_lines) > 10 else ""
        print(f"  WARNING: skipped {len(bad_lines)} unusable line(s): {preview}{more}")
    return Dataset.from_list(records)
def load_dpo_dataset():
    """Load the merged DPO preference pairs and return a ``datasets.Dataset``.

    Every usable JSONL record contributes one row with the string columns
    ``prompt``, ``chosen`` and ``rejected``; all other keys are dropped.

    Returns:
        Dataset with the columns ``prompt``, ``chosen``, ``rejected``.

    Raises:
        FileNotFoundError: if the merged DPO file does not exist.
    """
    # NOTE(review): prompts are passed through unchanged. SFT examples are built
    # with an "<|im_start|>assistant\n" header before the answer — confirm the
    # DPO prompts in the merged file end with that header as well.
    from datasets import Dataset

    if not DPO_DATA.exists():
        raise FileNotFoundError(
            f"DPO data not found: {DPO_DATA}\n"
            "Run: python3 scripts/consolidate_v8_dataset.py"
        )

    required = ("prompt", "chosen", "rejected")
    records = []
    bad_lines = []  # line numbers that could not be turned into a pair
    with open(DPO_DATA, encoding="utf-8") as f:
        for line_no, raw in enumerate(f, start=1):
            raw = raw.strip()
            if not raw:
                continue
            try:
                item = json.loads(raw)
            except json.JSONDecodeError:
                bad_lines.append(line_no)
                continue
            # Skip non-objects and pairs with a missing or empty field.
            if not isinstance(item, dict) or not all(item.get(k) for k in required):
                bad_lines.append(line_no)
                continue
            records.append({k: item[k] for k in required})

    print(f"Loaded {len(records)} DPO pairs from {DPO_DATA.name}")
    if bad_lines:
        # Report instead of silently swallowing malformed pairs.
        preview = ", ".join(str(n) for n in bad_lines[:10])
        more = " ..." if len(bad_lines) > 10 else ""
        print(f"  WARNING: skipped {len(bad_lines)} unusable line(s): {preview}{more}")
    return Dataset.from_list(records)
def run_sft() -> None:
    """Phase 1: supervised fine-tuning of ``BASE_MODEL`` with a LoRA adapter.

    Loads the tokenizer, the merged SFT dataset and the base model (bf16, on
    MPS when available, else CPU), trains a rank-64 LoRA for 5 epochs and
    writes the adapter plus tokenizer files to ``SFT_ADAPTER``.

    Raises:
        FileNotFoundError: propagated from ``load_sft_dataset`` when the merged
            SFT file is missing.
    """
    import inspect

    import torch
    from peft import LoraConfig, TaskType
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from trl import SFTTrainer, SFTConfig

    max_seq_length = 4096
    device = "mps" if torch.backends.mps.is_available() else "cpu"

    print(f"=== fo-blog-v8 SFT: {BASE_MODEL} → LoRA r=64 ===")
    print(f"Device: {device.upper()}")

    # ── Tokenizer ──
    print("Loading tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, trust_remote_code=True)
    # Only fall back to EOS when the tokenizer ships no pad token. Overwriting
    # an existing pad token makes padding indistinguishable from the
    # end-of-turn token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "right"

    # ── Dataset ──
    dataset = load_sft_dataset(tokenizer, max_seq_length=max_seq_length)

    # ── Model ──
    print(f"Loading base model: {BASE_MODEL}")
    model = AutoModelForCausalLM.from_pretrained(
        BASE_MODEL,
        dtype=torch.bfloat16,  # bf16 for M4 Max (transformers 5.x: dtype= not torch_dtype=)
        device_map=device,
        trust_remote_code=True,
    )
    model.config.use_cache = False  # the KV cache is not used while training

    # ── LoRA Config ──
    lora_config = LoraConfig(
        r=64,
        lora_alpha=128,
        lora_dropout=0.05,
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj",
                        "gate_proj", "up_proj", "down_proj"],
        bias="none",
        task_type=TaskType.CAUSAL_LM,
    )

    # ── Training Config (trl 1.x: SFTConfig carries both TrainingArguments + SFT params) ──
    # The sequence-length option is called `max_length` in current TRL and
    # `max_seq_length` in older releases; passing the wrong name raises a
    # TypeError. Pick whichever the installed SFTConfig accepts.
    sft_params = inspect.signature(SFTConfig.__init__).parameters
    length_kwarg = "max_length" if "max_length" in sft_params else "max_seq_length"

    SFT_ADAPTER.mkdir(parents=True, exist_ok=True)
    training_args = SFTConfig(
        output_dir=str(SFT_ADAPTER),
        num_train_epochs=5,
        per_device_train_batch_size=1,
        gradient_accumulation_steps=8,  # effective batch size 8
        learning_rate=1.2e-4,
        warmup_ratio=0.05,
        lr_scheduler_type="cosine",
        bf16=True,
        fp16=False,
        optim="adamw_torch",
        weight_decay=0.01,
        max_grad_norm=1.0,
        logging_steps=10,
        save_steps=100,
        save_total_limit=2,
        eval_strategy="no",
        dataloader_num_workers=0,
        remove_unused_columns=False,
        gradient_checkpointing=True,
        report_to="none",
        # SFT-specific (moved from SFTTrainer in trl 1.x)
        dataset_text_field="text",
        packing=False,
        **{length_kwarg: max_seq_length},
    )

    # ── Trainer ──
    trainer = SFTTrainer(
        model=model,
        train_dataset=dataset,
        peft_config=lora_config,
        processing_class=tokenizer,
        args=training_args,
    )

    print(f"Starting SFT training: {len(dataset)} examples, 5 epochs...")
    trainer.train()

    print(f"Saving SFT adapter → {SFT_ADAPTER}")
    trainer.save_model(str(SFT_ADAPTER))
    tokenizer.save_pretrained(str(SFT_ADAPTER))
    print("SFT Phase COMPLETE.")
def run_dpo() -> None:
    """Phase 2: Direct Preference Optimization on top of the SFT adapter.

    Loads the base model, attaches the SFT LoRA adapter in trainable mode,
    runs 2 epochs of DPO on the merged preference pairs and writes the
    resulting adapter plus tokenizer files to ``DPO_ADAPTER``.

    Raises:
        FileNotFoundError: if the SFT adapter directory is missing, or
            (propagated from ``load_dpo_dataset``) the merged DPO file is.
    """
    import torch
    from peft import PeftModel
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from trl import DPOTrainer, DPOConfig

    print(f"=== fo-blog-v8 DPO: SFT adapter → DPO ===")

    if not SFT_ADAPTER.exists():
        raise FileNotFoundError(
            f"SFT adapter not found at {SFT_ADAPTER}\n"
            "Run: python3 scripts/train_blog_v8.py --phase sft"
        )

    # ── Tokenizer ──
    tokenizer = AutoTokenizer.from_pretrained(str(SFT_ADAPTER), trust_remote_code=True)
    # Keep an existing pad token; only fall back to EOS when none is defined.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # ── Dataset ──
    dataset = load_dpo_dataset()

    # ── Model (base + SFT adapter) ──
    print(f"Loading model + SFT adapter...")
    device = "mps" if torch.backends.mps.is_available() else "cpu"
    base_model = AutoModelForCausalLM.from_pretrained(
        BASE_MODEL,
        dtype=torch.bfloat16,
        device_map=device,
        trust_remote_code=True,
    )
    # is_trainable=True is required: by default PEFT loads an adapter frozen
    # (inference mode), which would leave DPO with no trainable parameters.
    model = PeftModel.from_pretrained(base_model, str(SFT_ADAPTER), is_trainable=True)

    # ── DPO Config ──
    DPO_ADAPTER.mkdir(parents=True, exist_ok=True)
    dpo_config = DPOConfig(
        output_dir=str(DPO_ADAPTER),
        num_train_epochs=2,
        per_device_train_batch_size=1,
        gradient_accumulation_steps=8,  # effective batch size 8
        learning_rate=5e-5,
        warmup_ratio=0.05,
        lr_scheduler_type="cosine",
        bf16=True,
        fp16=False,
        optim="adamw_torch",
        max_grad_norm=1.0,
        logging_steps=5,
        save_steps=50,
        save_total_limit=2,
        eval_strategy="no",
        dataloader_num_workers=0,
        gradient_checkpointing=True,
        report_to="none",
        # DPO-specific
        beta=0.1,
        loss_type="sigmoid",
        max_prompt_length=512,
        max_length=4096,
    )

    # ── Trainer ──
    # NOTE(review): with ref_model=None and a PEFT model, the reference
    # log-probs presumably come from the model with the adapter disabled,
    # i.e. the base model rather than the SFT policy — confirm this is the
    # intended reference.
    trainer = DPOTrainer(
        model=model,
        ref_model=None,  # use implicit reference via peft
        args=dpo_config,
        train_dataset=dataset,
        processing_class=tokenizer,
    )

    print(f"Starting DPO training: {len(dataset)} pairs, 2 epochs...")
    trainer.train()

    print(f"Saving DPO adapter → {DPO_ADAPTER}")
    trainer.save_model(str(DPO_ADAPTER))
    tokenizer.save_pretrained(str(DPO_ADAPTER))
    print("DPO Phase COMPLETE.")
def run_merge_and_convert() -> None:
    """Merge adapter → full model, convert to GGUF, register in Ollama.

    Steps:
      1. Pick the adapter: DPO if it was actually saved, otherwise SFT.
      2. Merge it into the base model on CPU (skipped when merged weights of
         more than 10 GB already exist in ``MERGED_DIR``).
      3. Convert to an F16 GGUF, quantize to Q4_K_M, delete the F16 file.
      4. Write a Modelfile and run ``ollama create fo-blog-v8``.

    Raises:
        FileNotFoundError: if neither adapter directory holds a saved adapter.
        subprocess.CalledProcessError: if conversion, quantization or
            ``ollama create`` fails.
        RuntimeError: if the model is not listed by ``ollama list`` afterwards.
    """
    import shutil
    import subprocess

    import torch
    from peft import PeftModel
    from transformers import AutoModelForCausalLM, AutoTokenizer

    def _has_saved_adapter(path: Path) -> bool:
        # run_sft/run_dpo create the output directory BEFORE training, so an
        # existing directory proves nothing; adapter_config.json at its root
        # is only present once an adapter has been saved.
        return (path / "adapter_config.json").exists()

    # Prefer DPO adapter, fall back to SFT
    if _has_saved_adapter(DPO_ADAPTER):
        adapter_path = DPO_ADAPTER
    elif _has_saved_adapter(SFT_ADAPTER):
        adapter_path = SFT_ADAPTER
    else:
        # Fail with a non-zero exit so callers do not report success.
        raise FileNotFoundError(
            f"No saved adapter found in {DPO_ADAPTER} or {SFT_ADAPTER}.\n"
            "Run: python3 scripts/train_blog_v8.py --phase sft"
        )

    print(f"=== fo-blog-v8 Merge + GGUF ===")
    print(f"Adapter: {adapter_path}")

    # ── Merge ──
    MERGED_DIR.mkdir(parents=True, exist_ok=True)
    # Sum over all *.safetensors files: the merged weights may be written as a
    # single model.safetensors or as several shards.
    merged_bytes = sum(p.stat().st_size for p in MERGED_DIR.glob("*.safetensors"))
    if merged_bytes > 10_000_000_000:
        print(f" Already merged ({merged_bytes/1e9:.1f} GB) — skip merge")
    else:
        print(" Loading base model on CPU for merge (avoids MPS OOM)...")
        tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(
            BASE_MODEL, dtype=torch.float16,
            device_map="cpu", trust_remote_code=True,
        )
        print(" Loading adapter...")
        model = PeftModel.from_pretrained(model, str(adapter_path))
        print(" Merging...")
        model = model.merge_and_unload()
        print(f" Saving merged model → {MERGED_DIR}")
        model.save_pretrained(str(MERGED_DIR), safe_serialization=True)
        tokenizer.save_pretrained(str(MERGED_DIR))
        del model  # drop the reference to the merged weights before conversion
        print(" Merge done.")

    # ── Copy tokenizer files from HF cache if needed ──
    hf_cache = Path.home() / ".cache/huggingface/hub"
    snaps = list(hf_cache.glob("models--Qwen--Qwen2.5-14B-Instruct/snapshots/*/tokenizer.json"))
    if snaps:
        snap_dir = snaps[0].parent
        for fname in ["tokenizer.json", "tokenizer_config.json", "vocab.json", "merges.txt"]:
            if (snap_dir / fname).exists() and not (MERGED_DIR / fname).exists():
                shutil.copy2(snap_dir / fname, MERGED_DIR / fname)

    # ── GGUF Conversion ──
    gguf_dir = FINE_TUNER_DIR / "models" / "fo-blog-v8"
    gguf_f16 = gguf_dir / "fo-blog-v8-f16.gguf"
    gguf_q4 = gguf_dir / "fo-blog-v8.gguf"
    convert_script = Path("/opt/homebrew/Cellar/llama.cpp/8680/bin/convert_hf_to_gguf.py")
    if not convert_script.exists():
        # The pinned Homebrew version directory changes on every upgrade;
        # fall back to whichever installed version provides the script.
        installed = sorted(
            Path("/opt/homebrew/Cellar/llama.cpp").glob("*/bin/convert_hf_to_gguf.py")
        )
        if installed:
            convert_script = installed[-1]
    quantize_bin = "/opt/homebrew/bin/llama-quantize"
    python_bin = "/opt/homebrew/bin/python3.13"

    if gguf_q4.exists():
        # The F16 file is deleted after quantization, so it must not be
        # re-created when the final Q4 file is already there.
        print(" Q4_K_M GGUF exists — skip conversion")
    else:
        if not gguf_f16.exists():
            print(" Converting to GGUF f16...")
            subprocess.run(
                [python_bin, str(convert_script), str(MERGED_DIR),
                 "--outfile", str(gguf_f16), "--outtype", "f16"],
                check=True,
            )
        else:
            print(f" F16 GGUF exists ({gguf_f16.stat().st_size/1e9:.1f} GB) — skip")
        print(" Quantizing to Q4_K_M...")
        subprocess.run(
            [quantize_bin, str(gguf_f16), str(gguf_q4), "Q4_K_M"],
            check=True,
        )
        gguf_f16.unlink(missing_ok=True)  # intermediate file, no longer needed
    print(f" Q4_K_M GGUF: {gguf_q4} ({gguf_q4.stat().st_size/1e9:.1f} GB)")

    # ── Ollama Registration ──
    modelfile_path = gguf_dir / "Modelfile-v8"
    modelfile_content = f"""FROM {gguf_q4.resolve()}
SYSTEM \"\"\"{SYSTEM_PROMPT}\"\"\"
PARAMETER temperature 0.7
PARAMETER top_p 0.9
PARAMETER top_k 40
PARAMETER repeat_penalty 1.15
PARAMETER num_predict 1500
"""
    modelfile_path.write_text(modelfile_content)
    print(" Registering in Ollama as fo-blog-v8...")
    subprocess.run(["ollama", "create", "fo-blog-v8", "-f", str(modelfile_path)], check=True)

    # Verify the model is actually listed, and fail loudly when it is not.
    result = subprocess.run(["ollama", "list"], capture_output=True, text=True)
    registered = "fo-blog-v8" in result.stdout
    print(f" Ollama registration: {'✓ SUCCESS' if registered else '✗ FAILED'}")
    if not registered:
        raise RuntimeError("fo-blog-v8 is not listed by 'ollama list' after 'ollama create'")
    print(f"\nDONE: {gguf_q4}")
def main() -> None:
    """Command-line entry point: run the phase(s) selected with ``--phase``.

    ``sft`` and ``dpo`` run the matching training phase, ``both`` runs SFT and
    then DPO, ``convert`` runs merge + GGUF + Ollama registration. Without
    ``--phase`` the SFT phase is run.
    """
    parser = argparse.ArgumentParser(description="Train fo-blog-v8 (Qwen2.5-14B LoRA)")
    parser.add_argument(
        "--phase",
        choices=["sft", "dpo", "both", "convert"],
        default="sft",
        help="Training phase to run (default: sft)",
    )
    selected = parser.parse_args().phase

    # Ordered steps per phase; "both" is SFT followed by DPO.
    steps_by_phase = {
        "sft": (run_sft,),
        "dpo": (run_dpo,),
        "both": (run_sft, run_dpo),
        "convert": (run_merge_and_convert,),
    }
    for run_step in steps_by_phase[selected]:
        run_step()


if __name__ == "__main__":
    main()