#!/usr/bin/env python3
"""
process_v6_blogs.py — Turn real fo-blog-v6 outputs into v8 training data.

Analyses the v6-generated blog posts (101 at the time of writing, taken from
/opt/tip/blog-training-data/) and derives two datasets from them:

  1. SFT records — posts with 700-1100 words are used directly as training
                   examples.
  2. DPO pairs   — posts with more than 1100 words:
                   rejected = the over-long original post
                   chosen   = a clean 700-1000 word rewrite produced by Claude

These are real model failures (not synthetic ones), which makes them
especially valuable for DPO.

NOTE: the script itself reads from the "Input" directory listed below.

Input:  ~/transceiver-training-data/v6-tip-blogs/*.md
Output: ~/transceiver-training-data/v8-v6blogs-sft.jsonl  (good posts as SFT)
        ~/transceiver-training-data/v8-v6blogs-dpo.jsonl  (too-long posts as DPO)

Usage:
    python3 scripts/process_v6_blogs.py
    python3 scripts/process_v6_blogs.py --max-dpo 30   # only the first 30 DPO candidates
    python3 scripts/process_v6_blogs.py --sft-only     # SFT records only
    python3 scripts/process_v6_blogs.py --dpo-only     # DPO pairs only
    python3 scripts/process_v6_blogs.py --dry-run      # stats only
"""

from __future__ import annotations

import argparse
import json
import re
import subprocess
import time
from pathlib import Path

BLOGS_DIR = Path.home() / "transceiver-training-data" / "v6-tip-blogs"
SFT_OUTPUT = Path.home() / "transceiver-training-data" / "v8-v6blogs-sft.jsonl"
DPO_OUTPUT = Path.home() / "transceiver-training-data" / "v8-v6blogs-dpo.jsonl"
PROGRESS_FILE = Path.home() / "transceiver-training-data" / "v8-v6blogs-progress.json"

# Word count ranges
GOOD_MIN = 700
GOOD_MAX = 1100
REJECTED_MIN = 1100  # posts above this are "too long" → rejected examples
CLAUDE_TIMEOUT = 180

# Patterns used by parse_blog_md(); compiled once because it runs per file.
_FRONTMATTER_RE = re.compile(r'^---\s*\n(.*?)\n---\s*\n', re.DOTALL)
_FRONTMATTER_KV_RE = re.compile(r'^(\w+):\s*"?(.+?)"?\s*$')
_H1_RE = re.compile(r'^# (.+)$', re.MULTILINE)

SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure.

STRICT CONSTRAINTS — Follow exactly, no exceptions:
- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum.
- STRUCTURE (mandatory, in this order):
  1. HOOK paragraph — 2–3 sentences stating the problem this post addresses
  2. Technical sections — 3–4 H2 sections covering the topic in depth
  3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable
- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift.
- NO REPETITION: Every sentence must add new information. No restating.
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.

Do not summarize what you are about to write. Start with the hook directly."""


def parse_blog_md(path: Path) -> dict | None:
    """Parse a blog markdown file with optional YAML-style frontmatter.

    Only flat ``key: value`` / ``key: "value"`` frontmatter lines are
    understood; this is not a full YAML parser.

    Args:
        path: Markdown file to read.

    Returns:
        A dict with ``title``, ``slug``, ``category``, ``content``,
        ``word_count`` and ``path``, or ``None`` when the body has fewer than
        200 whitespace-separated words.
    """
    # utf-8-sig drops a leading BOM; with plain utf-8 the BOM would precede
    # "---" and the frontmatter would be counted as post content.
    text = path.read_text(encoding="utf-8-sig", errors="ignore")

    frontmatter: dict[str, str] = {}
    content = text
    fm_match = _FRONTMATTER_RE.match(text)
    if fm_match:
        content = text[fm_match.end():]
        for line in fm_match.group(1).split("\n"):
            kv = _FRONTMATTER_KV_RE.match(line)
            if kv:
                frontmatter[kv.group(1)] = kv.group(2).strip('"').strip()

    title = frontmatter.get("title", "").strip('"')
    if not title:
        # Fallback: first H1 heading, then a title derived from the file name.
        h1 = _H1_RE.search(content)
        title = h1.group(1).strip() if h1 else path.stem.replace("-", " ").title()

    word_count = len(content.split())
    if word_count < 200:
        return None

    return {
        "title": title,
        "slug": frontmatter.get("slug", path.stem),
        "category": frontmatter.get("category", "optical networking"),
        "content": content.strip(),
        "word_count": word_count,
        "path": str(path),
    }


def build_input_text(title: str, audience: str) -> str:
    """Return the user-turn instruction asking for a post on *title*."""
    return (
        f"Write a blog post on the following topic:\n\n"
        f"**Topic:** {title}\n\n"
        f"**Target audience:** {audience}\n\n"
        f"Remember: 700–1000 words, hook + technical sections + 3 takeaways. "
        f"Stay strictly on-topic. No filler. Start writing now."
    )


# Keyword groups are checked in order; the first group with a keyword that
# occurs as a substring of the lower-cased category wins.
_AUDIENCE_RULES: tuple[tuple[tuple[str, ...], str], ...] = (
    (
        ("fiber", "cabling", "mtp", "mpo", "connector"),
        "data center engineers and cabling specialists",
    ),
    (
        ("coherent", "dwdm", "zr", "metro"),
        "network architects and optical engineers designing long-haul links",
    ),
    (
        ("compatible", "procurement", "vendor", "cost", "price"),
        "network procurement teams and IT managers evaluating transceiver vendors",
    ),
)
_DEFAULT_AUDIENCE = (
    "network engineers and IT professionals who evaluate and operate optical infrastructure"
)


def get_audience(category: str) -> str:
    """Map a blog category to the target-audience phrase used in prompts."""
    cat = category.lower()
    for keywords, audience in _AUDIENCE_RULES:
        if any(keyword in cat for keyword in keywords):
            return audience
    return _DEFAULT_AUDIENCE


def rewrite_with_claude(title: str, source_content: str, audience: str) -> str | None:
    """Rewrite a too-long v6 blog as a proper 700-1000w version (chosen).

    Runs the ``claude`` command-line tool as a subprocess and returns its
    stdout.

    Args:
        title: Topic of the post.
        source_content: The over-long original; only its first 800 words are
            sent as reference.
        audience: Target-audience phrase for the prompt.

    Returns:
        The rewritten post, or ``None`` when the command fails, times out,
        prints nothing, or returns fewer than 400 / more than 2000 words.
    """
    # Truncate source to ~800 words for context
    words = source_content.split()
    if len(words) > 800:
        source_content = " ".join(words[:800]) + "\n\n[Source truncated for reference]"

    prompt = (
        f"Rewrite this blog post to be 700-1000 words with the correct structure.\n\n"
        f"**Topic:** {title}\n"
        f"**Target audience:** {audience}\n\n"
        f"**Original post (DO NOT COPY — use only as topic reference, rewrite completely):**\n\n"
        f"{source_content}\n\n"
        f"Rewrite now. 700-1000 words. Hook + technical sections + 3 takeaways. Start directly."
    )

    try:
        result = subprocess.run(
            ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", prompt],
            capture_output=True,
            text=True,
            timeout=CLAUDE_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        print(f" TIMEOUT: Claude took too long for {title[:50]}")
        return None
    except (OSError, ValueError) as e:
        # OSError: binary missing / not executable. ValueError: invalid
        # arguments or undecodable output (UnicodeDecodeError).
        print(f" ERROR: {e}")
        return None

    output = result.stdout.strip()
    if result.returncode != 0 or not output:
        return None

    # NOTE(review): this window is wider than the 700-1000 words the prompt
    # asks for, so a "chosen" text can end up longer than GOOD_MAX — confirm
    # whether that is intended before tightening.
    word_count = len(output.split())
    if word_count < 400 or word_count > 2000:
        return None
    return output


def load_progress() -> set[str]:
    """Return the slugs already handled by an earlier DPO run.

    A missing progress file yields an empty set. An unreadable or malformed
    file also yields an empty set, but a warning is printed so that the loss
    of resume state does not go unnoticed.
    """
    try:
        raw = PROGRESS_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        return set()
    except (OSError, ValueError) as exc:
        print(f"WARNING: cannot read progress file {PROGRESS_FILE}: {exc}")
        return set()

    try:
        return set(json.loads(raw).get("done", []))
    except (ValueError, AttributeError, TypeError) as exc:
        # ValueError: invalid JSON. AttributeError: top level is not an
        # object. TypeError: "done" is not an iterable of hashable items.
        print(f"WARNING: ignoring malformed progress file {PROGRESS_FILE}: {exc}")
        return set()


def save_progress(done: set[str]) -> None:
    """Persist the finished slugs to ``PROGRESS_FILE``.

    The data is written to a sibling temp file and renamed into place, so an
    interrupted run leaves the previous progress file intact.
    """
    PROGRESS_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp_path.write_text(json.dumps({"done": sorted(done)}), encoding="utf-8")
    tmp_path.replace(PROGRESS_FILE)


def _print_dry_run(good: list[dict], too_long: list[dict]) -> None:
    """Print the per-post statistics shown by ``--dry-run``."""
    print("Good posts:")
    for b in sorted(good, key=lambda x: x["word_count"]):
        print(f" {b['word_count']:4d}w | {b['title'][:60]}")
    print("\nToo long (top 10):")
    for b in sorted(too_long, key=lambda x: x["word_count"], reverse=True)[:10]:
        print(f" {b['word_count']:4d}w | {b['title'][:60]}")


def _write_sft_records(good: list[dict]) -> None:
    """Write one SFT record per good post, overwriting ``SFT_OUTPUT``."""
    print("=== Phase 1: SFT Records (good posts) ===")
    SFT_OUTPUT.parent.mkdir(parents=True, exist_ok=True)
    sft_count = 0
    with open(SFT_OUTPUT, "w", encoding="utf-8") as f:
        for blog in good:
            audience = get_audience(blog["category"])
            record = {
                "system_prompt": SYSTEM_PROMPT,
                "input_text": build_input_text(blog["title"], audience),
                "output_text": blog["content"],
                "meta": {
                    "title": blog["title"],
                    "slug": blog["slug"],
                    "source": "tip-v6-blogs",
                    "word_count": blog["word_count"],
                    "quality": "v6_output_good",
                    "weight": 2.0,  # Real model output, good length
                    "dataset_version": "v8",
                },
            }
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
            sft_count += 1
            print(f" SFT: {blog['word_count']:4d}w | {blog['title'][:60]}")
    print(f"\nSFT saved: {sft_count} records → {SFT_OUTPUT}")


def _write_dpo_pairs(candidates: list[dict], done: set[str]) -> None:
    """Append one DPO pair per candidate to ``DPO_OUTPUT``.

    Slugs in *done* are skipped. Each successful pair is flushed and recorded
    in the progress file immediately so the run can be resumed.
    """
    print("\n=== Phase 2: DPO Pairs (too-long posts → rewrite) ===")
    DPO_OUTPUT.parent.mkdir(parents=True, exist_ok=True)

    dpo_saved = 0
    dpo_skipped = 0
    with open(DPO_OUTPUT, "a", encoding="utf-8") as f:
        for i, blog in enumerate(candidates, start=1):
            slug = blog["slug"]
            if slug in done:
                dpo_skipped += 1
                continue

            print(f" [{i}/{len(candidates)}] {blog['word_count']:4d}w → rewrite: {blog['title'][:55]}")
            audience = get_audience(blog["category"])

            # Get Claude to write a GOOD version → chosen
            chosen = rewrite_with_claude(blog["title"], blog["content"], audience)
            if not chosen:
                # Deliberately NOT marked as done: failures are often
                # transient (timeouts), so the post is retried next run.
                print(" SKIP (Claude failed)")
                dpo_skipped += 1
                continue

            chosen_wc = len(chosen.split())
            print(f" OK: chosen={chosen_wc}w (was {blog['word_count']}w)")

            # Build DPO prompt (ChatML prefix)
            prompt = (
                f"<|im_start|>system\n{SYSTEM_PROMPT}<|im_end|>\n"
                f"<|im_start|>user\n{build_input_text(blog['title'], audience)}<|im_end|>\n"
            )
            pair = {
                "prompt": prompt,
                "chosen": chosen,
                "rejected": blog["content"],
                "meta": {
                    "title": blog["title"],
                    "slug": slug,
                    "source": "tip-v6-dpo",
                    "rejection_strategy": "too_long_real",
                    "chosen_words": chosen_wc,
                    "rejected_words": blog["word_count"],
                    "dataset_version": "v8",
                },
            }
            f.write(json.dumps(pair, ensure_ascii=False) + "\n")
            f.flush()
            done.add(slug)
            save_progress(done)
            dpo_saved += 1
            time.sleep(1)

    print(f"\nDPO saved: {dpo_saved} pairs | skipped: {dpo_skipped} → {DPO_OUTPUT}")


def _count_lines(path: Path) -> int:
    """Count the lines of a UTF-8 text file."""
    with open(path, encoding="utf-8") as f:
        return sum(1 for _ in f)


def main() -> None:
    """Command-line entry point: analyse the blogs and write SFT/DPO data."""
    parser = argparse.ArgumentParser(description="Process v6 TIP blogs into v8 training data")
    parser.add_argument("--max-dpo", type=int, default=None, help="Max DPO pairs to generate")
    # Passing both flags used to run neither phase without any message.
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--sft-only", action="store_true", help="Only create SFT records (skip DPO)")
    mode.add_argument("--dpo-only", action="store_true", help="Only create DPO pairs (skip SFT)")
    parser.add_argument("--dry-run", action="store_true", help="Statistics only, no output")
    args = parser.parse_args()

    # Load all blog files
    md_files = sorted(BLOGS_DIR.glob("*.md"))
    if not md_files:
        print(f"No .md files found in {BLOGS_DIR}")
        return

    # parse_blog_md() returns None for posts under 200 words; drop those.
    blogs = [parsed for parsed in map(parse_blog_md, md_files) if parsed]

    # Categorize
    good = [b for b in blogs if GOOD_MIN <= b["word_count"] <= GOOD_MAX]
    too_long = [b for b in blogs if b["word_count"] > REJECTED_MIN]
    too_short = [b for b in blogs if b["word_count"] < GOOD_MIN]

    print("=== v6 TIP Blog Analysis ===")
    print(f"Total: {len(blogs)} files")
    print(f"Good (SFT): {len(good)} files ({GOOD_MIN}-{GOOD_MAX}w)")
    print(f"Too long: {len(too_long)} files (>{REJECTED_MIN}w) → DPO rejected")
    print(f"Too short: {len(too_short)} files (<{GOOD_MIN}w) → skip")
    print()

    if args.dry_run:
        _print_dry_run(good, too_long)
        return

    done = load_progress()

    # ─── Phase 1: SFT Records from good posts ──────────────────────────────────
    if not args.dpo_only:
        _write_sft_records(good)

    # ─── Phase 2: DPO Pairs from too-long posts ────────────────────────────────
    if not args.sft_only:
        candidates = too_long
        # "is not None" so that --max-dpo 0 means zero rather than "no limit".
        # The cap selects the first N candidates, including already-done ones.
        if args.max_dpo is not None:
            candidates = candidates[:max(args.max_dpo, 0)]
        _write_dpo_pairs(candidates, done)

    # ─── Summary ───────────────────────────────────────────────────────────────
    print("\n=== Summary ===")
    if SFT_OUTPUT.exists():
        print(f"SFT: {_count_lines(SFT_OUTPUT)} records → {SFT_OUTPUT}")
    if DPO_OUTPUT.exists():
        print(f"DPO: {_count_lines(DPO_OUTPUT)} pairs → {DPO_OUTPUT}")


if __name__ == "__main__":
    main()