llm-gateway/packages/fine-tuner/scripts/process_v6_blogs.py
Rene Fichtmueller c3ab87b167 feat: add fo-blog-v8 training pipeline (Qwen2.5-14B, SFT+DPO)
Full v8 training pipeline for the optical networking blog model:
- train_blog_v8.py: SFT (LoRA r=64, 5 epochs) + DPO (2 epochs) on Qwen2.5-14B-Instruct
  Fixed for trl 1.2.x: SFTConfig instead of TrainingArguments, processing_class= instead
  of tokenizer=, eval_strategy= instead of deprecated evaluation_strategy=
- consolidate_v8_dataset.py: weighted merge of all data sources (820 effective SFT / 235 DPO)
- crawl_v8_sources.py: APNIC/RIPE Labs/potaroo/Cloudflare crawler with balanced div extraction
- process_v6_blogs.py: converts 101 real v6 TIP blog outputs into SFT + DPO pairs
- label_v7_quality.py: Claude-judged quality labels → v8 quality DPO pairs
- parse_real_posts.py: parses blog.fichtmueller.org Ghost CMS HTML → gold SFT records
- run_v8_pipeline.sh: autopilot (consolidate → SFT → DPO → GGUF → Ollama)
- blog-v8-training.yaml: training config reference

Dataset breakdown: 19 real posts ×3 + 196 v7-gen + 28 v6blogs ×2 + 135 external ×1.5
2026-04-19 11:44:09 +02:00

324 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
process_v6_blogs.py — Turn real fo-blog-v6 outputs into v8 training data.

Analyses the 101 v6-generated blog posts from /opt/tip/blog-training-data/
and derives two datasets from them:

  1. SFT records — posts with 700-1100 words are used directly as training
     examples.
  2. DPO pairs — posts with more than 1100 words:
       rejected = the overly long original post
       chosen   = a clean 700-1000 word version rewritten by Claude

These are real model failures (not synthetic ones), which makes them
especially valuable for DPO.

Input:
  ~/transceiver-training-data/v6-tip-blogs/*.md
Output:
  ~/transceiver-training-data/v8-v6blogs-sft.jsonl  (good posts as SFT)
  ~/transceiver-training-data/v8-v6blogs-dpo.jsonl  (too-long posts as DPO)

Usage:
  python3 scripts/process_v6_blogs.py
  python3 scripts/process_v6_blogs.py --max-dpo 30   # only 30 DPO pairs
  python3 scripts/process_v6_blogs.py --sft-only     # only SFT records
  python3 scripts/process_v6_blogs.py --dpo-only     # only DPO pairs
  python3 scripts/process_v6_blogs.py --dry-run      # stats only
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import time
from pathlib import Path
# Input directory and output files, all below ~/transceiver-training-data.
BLOGS_DIR = Path.home() / "transceiver-training-data" / "v6-tip-blogs"
SFT_OUTPUT = Path.home() / "transceiver-training-data" / "v8-v6blogs-sft.jsonl"
DPO_OUTPUT = Path.home() / "transceiver-training-data" / "v8-v6blogs-dpo.jsonl"
# Slugs already handled by the DPO phase, so an interrupted run can resume.
PROGRESS_FILE = Path.home() / "transceiver-training-data" / "v8-v6blogs-progress.json"

# Word count ranges
GOOD_MIN = 700
GOOD_MAX = 1100
REJECTED_MIN = 1100  # posts above this are "too long" → rejected examples

# Seconds to wait for a single `claude` CLI call before giving up.
CLAUDE_TIMEOUT = 180

# System prompt stored with every SFT record and DPO prompt, and passed to
# Claude for the rewrites. The numeric ranges use an en dash (U+2013); without
# the separator they read as "7001000 words", "23 sentences", "34 H2 sections".
SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure.
STRICT CONSTRAINTS — Follow exactly, no exceptions:
- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum.
- STRUCTURE (mandatory, in this order):
1. HOOK paragraph — 2–3 sentences stating the problem this post addresses
2. Technical sections — 3–4 H2 sections covering the topic in depth
3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable
- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift.
- NO REPETITION: Every sentence must add new information. No restating.
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.
Do not summarize what you are about to write. Start with the hook directly."""
def parse_blog_md(path: Path) -> dict | None:
    """Parse a blog markdown file with optional YAML-style frontmatter.

    Only flat ``key: value`` frontmatter lines are understood (no nesting,
    lists or multi-line scalars). Values may be bare, double-quoted or
    single-quoted; empty values are ignored so the fallbacks below apply.

    Args:
        path: Markdown file to read. A leading UTF-8 BOM is skipped and
            undecodable bytes are dropped.

    Returns:
        A dict with ``title``, ``slug``, ``category``, ``content``,
        ``word_count`` and ``path``, or ``None`` when the body (everything
        after the frontmatter) has fewer than 200 whitespace-separated words.
    """
    # "utf-8-sig" strips a BOM if present; otherwise the BOM would sit in
    # front of the opening "---" and the frontmatter would not be detected.
    text = path.read_text(encoding="utf-8-sig", errors="ignore")

    # Extract YAML frontmatter
    frontmatter: dict = {}
    content = text
    fm_match = re.match(r'^---\s*\n(.*?)\n---\s*\n', text, re.DOTALL)
    if fm_match:
        content = text[fm_match.end():]
        # Parse key: "value" pairs
        for line in fm_match.group(1).split('\n'):
            kv = re.match(r'^(\w+):\s*"?(.+?)"?\s*$', line)
            if not kv:
                continue
            value = kv.group(2).strip('"').strip()
            # Single-quoted scalars: drop the surrounding pair of quotes.
            if len(value) >= 2 and value[0] == "'" and value[-1] == "'":
                value = value[1:-1].strip()
            if value:
                frontmatter[kv.group(1)] = value

    title = frontmatter.get("title", "")
    if not title:
        # Fallback: first H1, then a title derived from the file name
        h1 = re.search(r'^# (.+)$', content, re.MULTILINE)
        title = h1.group(1).strip() if h1 else path.stem.replace("-", " ").title()

    word_count = len(content.split())
    if word_count < 200:
        # Too short to be a usable blog post.
        return None

    return {
        "title": title,
        "slug": frontmatter.get("slug", path.stem),
        "category": frontmatter.get("category", "optical networking"),
        "content": content.strip(),
        "word_count": word_count,
        "path": str(path),
    }
def build_input_text(title: str, audience: str) -> str:
    """Build the user-turn instruction asking for a blog post on *title*.

    Args:
        title: Topic of the post, inserted verbatim.
        audience: Target-audience phrase, inserted verbatim.

    Returns:
        The markdown-formatted request, ending with a reminder of the
        length and structure constraints.
    """
    return (
        "Write a blog post on the following topic:\n\n"
        f"**Topic:** {title}\n\n"
        f"**Target audience:** {audience}\n\n"
        # The range needs its en-dash separator; "7001000 words" is not a
        # meaningful length constraint.
        "Remember: 700–1000 words, hook + technical sections + 3 takeaways. "
        "Stay strictly on-topic. No filler. Start writing now."
    )
def get_audience(category: str) -> str:
    """Map a blog category to the target-audience phrase used in prompts.

    Matching is a case-insensitive substring search. Keyword groups are
    checked in table order and the first group with a hit wins; categories
    matching no group get the generic audience.
    """
    rules = (
        (
            ("fiber", "cabling", "mtp", "mpo", "connector"),
            "data center engineers and cabling specialists",
        ),
        (
            ("coherent", "dwdm", "zr", "metro"),
            "network architects and optical engineers designing long-haul links",
        ),
        (
            ("compatible", "procurement", "vendor", "cost", "price"),
            "network procurement teams and IT managers evaluating transceiver vendors",
        ),
    )
    lowered = category.lower()
    for keywords, audience in rules:
        for keyword in keywords:
            if keyword in lowered:
                return audience
    return "network engineers and IT professionals who evaluate and operate optical infrastructure"
def rewrite_with_claude(title: str, source_content: str, audience: str) -> str | None:
    """Rewrite a too-long v6 blog as a proper 700-1000w version (chosen).

    Runs ``claude --print --system-prompt SYSTEM_PROMPT -p <prompt>`` as a
    subprocess and returns what it prints.

    Args:
        title: Topic of the post.
        source_content: The original post; only its first 800 words are
            sent along as a topic reference.
        audience: Target-audience phrase for the prompt.

    Returns:
        The stripped CLI output, or ``None`` when the call times out, cannot
        be started, exits non-zero, prints nothing, or the output is outside
        400-2000 words. Each ``None`` is preceded by a one-line diagnostic
        on stdout so the caller's "skip" message has a visible reason.
    """
    # Truncate source to ~800 words for context
    words = source_content.split()
    if len(words) > 800:
        source_content = " ".join(words[:800]) + "\n\n[Source truncated for reference]"

    prompt = (
        f"Rewrite this blog post to be 700-1000 words with the correct structure.\n\n"
        f"**Topic:** {title}\n"
        f"**Target audience:** {audience}\n\n"
        f"**Original post (DO NOT COPY — use only as topic reference, rewrite completely):**\n\n"
        f"{source_content}\n\n"
        f"Rewrite now. 700-1000 words. Hook + technical sections + 3 takeaways. Start directly."
    )

    try:
        result = subprocess.run(
            ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", prompt],
            capture_output=True, text=True, timeout=CLAUDE_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        print(f" TIMEOUT: Claude took too long for {title[:50]}")
        return None
    except Exception as e:
        # Best-effort boundary: e.g. the CLI is not installed. One failed
        # rewrite must not abort the whole batch.
        print(f" ERROR: {e}")
        return None

    if result.returncode != 0:
        # Show the tail of stderr; the full text can be long.
        stderr_tail = (result.stderr or "").strip()[-200:]
        print(f" ERROR: claude exited with code {result.returncode}: {stderr_tail}")
        return None

    output = result.stdout.strip()
    if not output:
        print(" ERROR: claude returned no output")
        return None

    # Sanity bounds are deliberately wider than the 700-1000w target.
    word_count = len(output.split())
    if word_count < 400 or word_count > 2000:
        print(f" REJECTED: rewrite has {word_count}w (accepted: 400-2000w)")
        return None
    return output
def load_progress() -> set[str]:
    """Load the slugs recorded as done in PROGRESS_FILE.

    Returns:
        The stored ``done`` list as a set. An empty set is returned when the
        file does not exist, and also — after printing a warning — when it
        cannot be read, is not valid JSON, or does not have the expected
        ``{"done": [<str>, ...]}`` layout.
    """
    try:
        payload = json.loads(PROGRESS_FILE.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # First run: nothing processed yet.
        return set()
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"WARNING: ignoring unreadable progress file {PROGRESS_FILE}: {exc}")
        return set()

    done = payload.get("done", []) if isinstance(payload, dict) else None
    # Guard against e.g. a plain string, which set() would split into characters.
    if not isinstance(done, list) or not all(isinstance(slug, str) for slug in done):
        print(f"WARNING: ignoring progress file with unexpected layout: {PROGRESS_FILE}")
        return set()
    return set(done)
def save_progress(done: set[str]) -> None:
    """Persist the processed slugs to PROGRESS_FILE as ``{"done": [...]}``.

    The JSON is written to a sibling ``.tmp`` file and then moved over the
    target, so an interruption during the write leaves the previous progress
    file in place instead of a truncated one. Slugs are sorted so the file
    content does not depend on set iteration order.

    Args:
        done: Slugs to record.
    """
    payload = json.dumps({"done": sorted(done)})
    tmp_path = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(PROGRESS_FILE)
def _sft_record(blog: dict) -> dict:
    """Build one SFT JSONL record from a parsed blog of acceptable length."""
    audience = get_audience(blog["category"])
    return {
        "system_prompt": SYSTEM_PROMPT,
        "input_text": build_input_text(blog["title"], audience),
        "output_text": blog["content"],
        "meta": {
            "title": blog["title"],
            "slug": blog["slug"],
            "source": "tip-v6-blogs",
            "word_count": blog["word_count"],
            "quality": "v6_output_good",
            "weight": 2.0,  # Real model output, good length
            "dataset_version": "v8",
        },
    }


def _dpo_pair(blog: dict, audience: str, chosen: str) -> dict:
    """Build one DPO JSONL record: the rewrite (chosen) vs. the original post (rejected)."""
    # Build DPO prompt (ChatML prefix)
    # NOTE(review): the prompt ends after the user turn, without an assistant
    # header — confirm the training script adds it.
    prompt = (
        f"<|im_start|>system\n{SYSTEM_PROMPT}<|im_end|>\n"
        f"<|im_start|>user\n{build_input_text(blog['title'], audience)}<|im_end|>\n"
    )
    return {
        "prompt": prompt,
        "chosen": chosen,
        "rejected": blog["content"],
        "meta": {
            "title": blog["title"],
            "slug": blog["slug"],
            "source": "tip-v6-dpo",
            "rejection_strategy": "too_long_real",
            "chosen_words": len(chosen.split()),
            "rejected_words": blog["word_count"],
            "dataset_version": "v8",
        },
    }


def main() -> None:
    """Command-line entry point: analyse the v6 blogs and write SFT/DPO files.

    Steps:
      1. Parse every ``*.md`` in BLOGS_DIR and bucket the posts by word count.
      2. With ``--dry-run``: print the buckets and stop.
      3. Unless ``--dpo-only``: overwrite SFT_OUTPUT with one record per
         good-length post.
      4. Unless ``--sft-only``: for each too-long post not yet recorded in
         the progress file, ask Claude for a rewrite and append a DPO pair
         to DPO_OUTPUT. Progress is saved after every written pair.
      5. Print line counts of both output files.
    """
    parser = argparse.ArgumentParser(description="Process v6 TIP blogs into v8 training data")
    parser.add_argument("--max-dpo", type=int, default=None, help="Max DPO pairs to generate")
    parser.add_argument("--sft-only", action="store_true", help="Only create SFT records (skip DPO)")
    parser.add_argument("--dpo-only", action="store_true", help="Only create DPO pairs (skip SFT)")
    parser.add_argument("--dry-run", action="store_true", help="Statistics only, no output")
    args = parser.parse_args()

    # Load all blog files
    md_files = sorted(BLOGS_DIR.glob("*.md"))
    if not md_files:
        print(f"No .md files found in {BLOGS_DIR}")
        return
    blogs = [parsed for parsed in map(parse_blog_md, md_files) if parsed]

    # Categorize
    good = [b for b in blogs if GOOD_MIN <= b["word_count"] <= GOOD_MAX]
    too_long = [b for b in blogs if b["word_count"] > REJECTED_MIN]
    too_short = [b for b in blogs if b["word_count"] < GOOD_MIN]

    print("=== v6 TIP Blog Analysis ===")
    print(f"Total: {len(blogs)} files")
    print(f"Good (SFT): {len(good)} files ({GOOD_MIN}-{GOOD_MAX}w)")
    print(f"Too long: {len(too_long)} files (>{REJECTED_MIN}w) → DPO rejected")
    print(f"Too short: {len(too_short)} files (<{GOOD_MIN}w) → skip")
    print()

    if args.dry_run:
        print("Good posts:")
        for b in sorted(good, key=lambda x: x["word_count"]):
            print(f" {b['word_count']:4d}w | {b['title'][:60]}")
        print("\nToo long (top 10):")
        for b in sorted(too_long, key=lambda x: x["word_count"], reverse=True)[:10]:
            print(f" {b['word_count']:4d}w | {b['title'][:60]}")
        return

    done = load_progress()

    # ─── Phase 1: SFT Records from good posts ──────────────────────────────────
    if not args.dpo_only:
        print("=== Phase 1: SFT Records (good posts) ===")
        SFT_OUTPUT.parent.mkdir(parents=True, exist_ok=True)
        sft_count = 0
        # "w": the SFT file is rebuilt from scratch on every run.
        with open(SFT_OUTPUT, "w", encoding="utf-8") as f:
            for blog in good:
                f.write(json.dumps(_sft_record(blog), ensure_ascii=False) + "\n")
                sft_count += 1
                print(f" SFT: {blog['word_count']:4d}w | {blog['title'][:60]}")
        print(f"\nSFT saved: {sft_count} records → {SFT_OUTPUT}")

    # ─── Phase 2: DPO Pairs from too-long posts ────────────────────────────────
    if not args.sft_only:
        print("\n=== Phase 2: DPO Pairs (too-long posts → rewrite) ===")
        DPO_OUTPUT.parent.mkdir(parents=True, exist_ok=True)
        candidates = too_long
        # Compare against None so that an explicit "--max-dpo 0" means
        # "no pairs" rather than "no limit"; negative values count as 0.
        if args.max_dpo is not None:
            candidates = candidates[:max(args.max_dpo, 0)]

        dpo_saved = 0
        dpo_skipped = 0
        # "a": pairs from earlier (resumed) runs are kept.
        with open(DPO_OUTPUT, "a", encoding="utf-8") as f:
            for i, blog in enumerate(candidates):
                slug = blog["slug"]
                if slug in done:
                    dpo_skipped += 1
                    continue

                print(f" [{i+1}/{len(candidates)}] {blog['word_count']:4d}w → rewrite: {blog['title'][:55]}")
                audience = get_audience(blog["category"])

                # Get Claude to write a GOOD version → chosen
                chosen = rewrite_with_claude(blog["title"], blog["content"], audience)
                if not chosen:
                    # Not added to `done`: a failed rewrite (timeout, CLI
                    # error, bad length) is retried on the next run instead
                    # of being persisted as finished by a later save.
                    print(" SKIP (Claude failed)")
                    dpo_skipped += 1
                    continue

                pair = _dpo_pair(blog, audience, chosen)
                print(f" OK: chosen={pair['meta']['chosen_words']}w (was {blog['word_count']}w)")

                # Flush the pair before recording it as done, so progress
                # never runs ahead of the data on disk.
                f.write(json.dumps(pair, ensure_ascii=False) + "\n")
                f.flush()
                done.add(slug)
                save_progress(done)
                dpo_saved += 1
                time.sleep(1)
        print(f"\nDPO saved: {dpo_saved} pairs | skipped: {dpo_skipped} → {DPO_OUTPUT}")

    # ─── Summary ───────────────────────────────────────────────────────────────
    print("\n=== Summary ===")
    # Both files are written as UTF-8 with ensure_ascii=False, so read them
    # back as UTF-8 rather than with the platform default encoding.
    if SFT_OUTPUT.exists():
        with open(SFT_OUTPUT, encoding="utf-8") as f:
            sft_n = sum(1 for _ in f)
        print(f"SFT: {sft_n} records → {SFT_OUTPUT}")
    if DPO_OUTPUT.exists():
        with open(DPO_OUTPUT, encoding="utf-8") as f:
            dpo_n = sum(1 for _ in f)
        print(f"DPO: {dpo_n} pairs → {DPO_OUTPUT}")


if __name__ == "__main__":
    main()