Full v8 training pipeline for the optical networking blog model: - train_blog_v8.py: SFT (LoRA r=64, 5 epochs) + DPO (2 epochs) on Qwen2.5-14B-Instruct Fixed for trl 1.2.x: SFTConfig instead of TrainingArguments, processing_class= instead of tokenizer=, eval_strategy= instead of deprecated evaluation_strategy= - consolidate_v8_dataset.py: weighted merge of all data sources (820 effective SFT / 235 DPO) - crawl_v8_sources.py: APNIC/RIPE Labs/potaroo/Cloudflare crawler with balanced div extraction - process_v6_blogs.py: converts 101 real v6 TIP blog outputs into SFT + DPO pairs - label_v7_quality.py: Claude-judged quality labels → v8 quality DPO pairs - parse_real_posts.py: parses blog.fichtmueller.org Ghost CMS HTML → gold SFT records - run_v8_pipeline.sh: autopilot (consolidate → SFT → DPO → GGUF → Ollama) - blog-v8-training.yaml: training config reference Dataset breakdown: 19 real posts ×3 + 196 v7-gen + 28 v6blogs ×2 + 135 external ×1.5
294 lines
11 KiB
Python
294 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
label_v7_quality.py — Rates v7-generated blog posts and creates DPO labels

Claude reads each generated blog post and rates it against 5 criteria:
  1. Word count (700-1000 = good, otherwise bad)
  2. Hook present (a clear opening problem)
  3. Technical depth (not generic)
  4. Structure (## headers, 3 takeaways)
  5. No drift (stays on topic)

From the good/bad ratings:
  - "good" + "bad" for the same topic → DPO pair
  - Or: "bad" → Claude writes a better version → DPO pair

Output:
  ~/transceiver-training-data/v8-quality-dpo.jsonl

Usage:
  python3 scripts/label_v7_quality.py
  python3 scripts/label_v7_quality.py --input v7-generated-sft.jsonl
  python3 scripts/label_v7_quality.py --max 50 --rewrite-bad
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
import re
|
||
import subprocess
|
||
from pathlib import Path
|
||
|
||
# Configure root logging once at import time: INFO level, time-of-day
# timestamps, and the level name in brackets before each message.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
|
||
|
||
# Directory under the user's home that holds all input and output files.
DATA_DIR = Path.home().joinpath("transceiver-training-data")

# Input read when --input is not given on the command line.
DEFAULT_INPUT = DATA_DIR.joinpath("v7-generated-sft.jsonl")

# JSONL file that DPO pairs are appended to.
OUTPUT_FILE = DATA_DIR.joinpath("v8-quality-dpo.jsonl")

# JSON file recording which item IDs were already handled.
PROGRESS_FILE = DATA_DIR.joinpath("v8-quality-progress.json")

# Timeout, in seconds, passed to subprocess.run for one judge call.
CLAUDE_TIMEOUT = 120
|
||
|
||
SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure.
|
||
|
||
STRICT CONSTRAINTS — Follow exactly, no exceptions:
|
||
- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum.
|
||
- STRUCTURE (mandatory, in this order):
|
||
1. HOOK paragraph — 2–3 sentences stating the problem this post addresses
|
||
2. Technical sections — 3–4 H2 sections covering the topic in depth
|
||
3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable
|
||
- TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift.
|
||
- NO REPETITION: Every sentence must add new information. No restating.
|
||
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
|
||
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
|
||
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.
|
||
|
||
Do not summarize what you are about to write. Start with the hook directly."""
|
||
|
||
JUDGE_PROMPT = """Evaluate this blog post against the following criteria. Respond with ONLY a JSON object — no explanation, no markdown.
|
||
|
||
Criteria:
|
||
1. word_count_ok: true if 700-1000 words (count the words)
|
||
2. has_hook: true if first paragraph clearly states a real problem (not generic intro)
|
||
3. technical_depth: true if contains specific technical details (numbers, standards, product names)
|
||
4. good_structure: true if has ## H2 headers AND ends with 3 bullet point takeaways
|
||
5. on_topic: true if stays focused on the exact topic throughout (no generic drift)
|
||
6. overall: "good" if at least 4 of 5 criteria pass, otherwise "bad"
|
||
7. issues: list of failed criteria names
|
||
|
||
Response format exactly:
|
||
{"word_count_ok": true/false, "has_hook": true/false, "technical_depth": true/false, "good_structure": true/false, "on_topic": true/false, "overall": "good"/"bad", "issues": ["..."]}
|
||
|
||
Blog post to evaluate:
|
||
|
||
"""
|
||
|
||
|
||
def load_progress() -> set[str]:
    """Return the IDs of items that earlier runs already handled.

    Reads ``PROGRESS_FILE`` and returns the entries of its ``"done"`` list.

    Returns:
        The set of recorded item IDs. A missing file yields an empty set.
        An unreadable or malformed file also yields an empty set, but is
        reported with a warning, because restarting from scratch re-runs
        every Claude call and appends to the existing output file.
    """
    try:
        raw = PROGRESS_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        # First run: nothing has been processed yet.
        return set()
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("Cannot read progress file %s: %s", PROGRESS_FILE, exc)
        return set()

    try:
        return set(json.loads(raw).get("done", []))
    except (ValueError, AttributeError, TypeError) as exc:
        # ValueError: invalid JSON; AttributeError: top level is not an
        # object; TypeError: "done" is not iterable or holds unhashable items.
        logger.warning("Ignoring malformed progress file %s: %s", PROGRESS_FILE, exc)
        return set()
|
||
|
||
|
||
def save_progress(done: set[str]) -> None:
    """Persist the set of handled item IDs to ``PROGRESS_FILE``.

    The JSON is written to a sibling ``.tmp`` file first and then renamed
    over the real file, so an interruption mid-write cannot leave a
    truncated progress file behind. IDs are stored sorted so the file
    content is deterministic for a given set.

    Args:
        done: Item IDs to record under the ``"done"`` key.
    """
    payload = json.dumps({"done": sorted(done)})
    tmp_path = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(PROGRESS_FILE)
|
||
|
||
|
||
def judge_blog(blog_text: str) -> dict | None:
    """Ask Claude to evaluate the quality of one blog post.

    Runs the ``claude`` CLI with ``JUDGE_PROMPT`` followed by the post and
    parses the first flat ``{...}`` object found in its output.

    Args:
        blog_text: The blog post to evaluate.

    Returns:
        The parsed JSON object, or ``None`` when the CLI could not be run,
        timed out, exited non-zero, printed nothing, or printed no parseable
        JSON object. Every failure reason is logged as a warning.
    """
    prompt = JUDGE_PROMPT + blog_text

    try:
        result = subprocess.run(
            ["claude", "--print", "-p", prompt],
            capture_output=True, text=True, timeout=CLAUDE_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        logger.warning("Judge call timed out after %ss", CLAUDE_TIMEOUT)
        return None
    except (OSError, ValueError) as exc:
        # OSError: CLI missing or not executable; ValueError: argument or
        # output could not be encoded/decoded.
        logger.warning("Judge call could not be run: %s", exc)
        return None

    output = result.stdout.strip()
    if result.returncode != 0 or not output:
        logger.warning("Judge call failed (exit code %s)", result.returncode)
        return None

    # Extract JSON from response (Claude might wrap it in extra text)
    json_match = re.search(r'\{[^{}]+\}', output, re.DOTALL)
    if not json_match:
        logger.warning("Judge output contained no JSON object")
        return None

    try:
        return json.loads(json_match.group(0))
    except json.JSONDecodeError as exc:
        logger.warning("Judge output was not valid JSON: %s", exc)
        return None
|
||
|
||
|
||
def quick_check(blog_text: str) -> dict:
    """Run a fast deterministic pre-check (no Claude call) on one blog post.

    Args:
        blog_text: The blog post in Markdown.

    Returns:
        A dict with these keys:

        * ``word_count``: number of whitespace-separated tokens.
        * ``has_h2``: whether any line starts with ``"## "``.
        * ``has_bullets``: whether any line starts with ``-``, ``*`` or ``•``
          followed by a space.
        * ``bullet_count``: number of such bullet lines.
        * ``clear_bad``: fewer than 500 or more than 1500 words, or no H2.
        * ``clear_good``: 700-1050 words, an H2, at least 3 bullet lines, and
          the text does not open with ``"In this "``.
        * ``needs_claude``: neither clearly good nor clearly bad.
    """
    word_count = len(blog_text.split())
    has_h2 = re.search(r'^## ', blog_text, re.MULTILINE) is not None
    # One scan yields both the bullet count and the presence flag.
    bullet_count = len(re.findall(r'^[-*•] ', blog_text, re.MULTILINE))
    has_bullets = bullet_count > 0
    starts_ok = not blog_text.strip().startswith("In this ")

    # Deterministic verdict (no Claude needed for clear cases)
    clear_bad = word_count < 500 or word_count > 1500 or not has_h2
    clear_good = (
        700 <= word_count <= 1050
        and has_h2
        and bullet_count >= 3
        and starts_ok
    )

    return {
        "word_count": word_count,
        "has_h2": has_h2,
        "has_bullets": has_bullets,
        "bullet_count": bullet_count,
        "clear_bad": clear_bad,
        "clear_good": clear_good,
        "needs_claude": not clear_bad and not clear_good,
    }
|
||
|
||
|
||
def rewrite_for_chosen(title: str, input_text: str) -> str | None:
    """Use Claude to write a high-quality version (the 'chosen' half of a DPO pair).

    Runs the ``claude`` CLI with ``SYSTEM_PROMPT`` as the system prompt and
    *input_text* as the user prompt.

    Args:
        title: Short label for the post; used only in log messages.
        input_text: The original user prompt the post was generated from.

    Returns:
        The stripped CLI output, or ``None`` when the CLI could not be run,
        timed out, exited non-zero, printed nothing, or produced fewer than
        400 or more than 1500 words. Every failure reason is logged.
    """
    try:
        result = subprocess.run(
            ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", input_text],
            capture_output=True, text=True, timeout=180,
        )
    except subprocess.TimeoutExpired:
        logger.warning("Rewrite timed out for %r", title)
        return None
    except (OSError, ValueError) as exc:
        # OSError: CLI missing or not executable; ValueError: argument or
        # output could not be encoded/decoded.
        logger.warning("Rewrite could not be run for %r: %s", title, exc)
        return None

    output = result.stdout.strip()
    if result.returncode != 0 or not output:
        logger.warning("Rewrite failed for %r (exit code %s)", title, result.returncode)
        return None

    # Reject rewrites that are far outside the target length.
    wc = len(output.split())
    if wc < 400 or wc > 1500:
        logger.warning("Rewrite for %r rejected: %d words", title, wc)
        return None
    return output
|
||
|
||
|
||
def _read_jsonl_records(input_file: Path) -> list[dict]:
    """Return every JSON object in *input_file*, one per non-blank line."""
    records: list[dict] = []
    with open(input_file, encoding="utf-8") as f:
        for line_no, raw in enumerate(f, start=1):
            raw = raw.strip()
            if not raw:
                continue
            try:
                record = json.loads(raw)
            except json.JSONDecodeError as exc:
                logger.warning(
                    "Skipping malformed JSON on line %d of %s: %s",
                    line_no, input_file.name, exc,
                )
                continue
            if not isinstance(record, dict):
                logger.warning(
                    "Skipping non-object record on line %d of %s",
                    line_no, input_file.name,
                )
                continue
            records.append(record)
    return records


def _label_post(output_text: str, check: dict) -> str | None:
    """Return "good" or "bad" for one post, or None when the judge gave no answer."""
    if check["clear_good"]:
        logger.info(" → CLEAR GOOD (deterministic)")
        return "good"
    if check["clear_bad"]:
        logger.info(" → CLEAR BAD (deterministic: %dw, h2=%s)", check["word_count"], check["has_h2"])
        return "bad"

    # Ask Claude to judge the unclear cases.
    judgment = judge_blog(output_text)
    if judgment is None:
        return None
    # Normalise so "Good", " good " or a non-string value cannot crash the
    # run or be mislabelled; anything other than "good" counts as bad.
    overall = str(judgment.get("overall", "bad")).strip().lower()
    verdict = "good" if overall == "good" else "bad"
    logger.info(" → %s | issues: %s", verdict.upper(), judgment.get("issues", []))
    return verdict


def _build_dpo_pair(
    system: str,
    input_text: str,
    chosen: str,
    rejected: str,
    topic: str,
    verdict: str,
    rejected_words: int,
) -> dict:
    """Assemble one DPO record with a ChatML-style prompt and metadata."""
    prompt = (
        f"<|im_start|>system\n{system}<|im_end|>\n"
        f"<|im_start|>user\n{input_text}<|im_end|>\n"
    )
    return {
        "prompt": prompt,
        "chosen": chosen,
        "rejected": rejected,
        "meta": {
            "topic": topic,
            "rejection_strategy": "quality_labeled_bad",
            "chosen_words": len(chosen.split()),
            "rejected_words": rejected_words,
            "verdict": verdict,
            "dataset_version": "v8",
        },
    }


def process_examples(
    input_file: Path,
    max_items: int | None,
    rewrite_bad: bool,
) -> None:
    """Label every example in *input_file* and optionally write DPO pairs.

    Each record is pre-checked with ``quick_check``; unclear cases go to
    ``judge_blog``. For posts labelled bad, and only when *rewrite_bad* is
    set, ``rewrite_for_chosen`` produces a replacement, and the pair
    (rewrite = chosen, original = rejected) is appended to ``OUTPUT_FILE``.
    Handled items are recorded through ``save_progress`` after every item so
    an interrupted run resumes where it stopped.

    Items whose judge call fails are counted as skipped but NOT recorded as
    done, so a transient failure is retried on the next run.

    Args:
        input_file: JSONL file of records with ``input_text``, ``output_text``
            and optional ``system_prompt`` / ``meta.topic`` fields.
        max_items: Process only the first *max_items* records. ``None`` means
            no limit; ``0`` processes nothing.
        rewrite_bad: Whether bad posts are rewritten into DPO pairs.
    """
    if not input_file.exists():
        logger.error("Input file not found: %s", input_file)
        return

    examples = _read_jsonl_records(input_file)
    if max_items is not None:
        examples = examples[:max_items]

    logger.info("Processing %d examples from %s", len(examples), input_file.name)

    done = load_progress()
    stats = {"good": 0, "bad": 0, "dpo_pairs": 0, "skipped": 0}

    OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)

    with open(OUTPUT_FILE, "a", encoding="utf-8") as out_f:
        for i, item in enumerate(examples):
            item_id = f"{input_file.stem}:{i}"
            if item_id in done:
                stats["skipped"] += 1
                continue

            # Tolerate missing or null fields instead of crashing mid-run.
            meta = item.get("meta")
            if not isinstance(meta, dict):
                meta = {}
            topic = str(meta.get("topic") or "")[:60]
            output_text = item.get("output_text") or ""
            input_text = item.get("input_text") or ""
            system = item.get("system_prompt", SYSTEM_PROMPT)

            # ── Fast pre-check ──
            check = quick_check(output_text)
            logger.info("[%03d/%03d] %dw | %s", i + 1, len(examples), check["word_count"], topic)

            verdict = _label_post(output_text, check)
            if verdict is None:
                logger.warning(" → SKIP (judge failed)")
                stats["skipped"] += 1
                continue

            if verdict == "good":
                stats["good"] += 1
            else:
                stats["bad"] += 1
                if rewrite_bad:
                    # Create DPO pair: bad original → rewritten chosen
                    logger.info(" Rewriting bad post for DPO...")
                    chosen = rewrite_for_chosen(topic, input_text)
                    if chosen and chosen != output_text:
                        pair = _build_dpo_pair(
                            system, input_text, chosen, output_text,
                            topic, verdict, check["word_count"],
                        )
                        out_f.write(json.dumps(pair, ensure_ascii=False) + "\n")
                        # Flush so the pair is on disk before it is marked done.
                        out_f.flush()
                        stats["dpo_pairs"] += 1
                        logger.info(" DPO pair saved: %dw chosen vs %dw rejected",
                                    len(chosen.split()), check["word_count"])

            done.add(item_id)
            save_progress(done)

    logger.info(
        "Done: good=%d bad=%d dpo_pairs=%d skipped=%d",
        stats["good"], stats["bad"], stats["dpo_pairs"], stats["skipped"],
    )
    logger.info("Output: %s", OUTPUT_FILE)

    # Print summary. The file is written as UTF-8 with ensure_ascii=False, so
    # it must be read back as UTF-8 rather than the locale default.
    if OUTPUT_FILE.exists():
        with open(OUTPUT_FILE, encoding="utf-8") as f:
            total = sum(1 for _ in f)
        logger.info("Total DPO pairs in output: %d", total)
|
||
|
||
|
||
def main() -> None:
    """Parse the command-line options and run the labelling pass."""
    cli = argparse.ArgumentParser(description="Label v7 posts for quality DPO pairs")

    # Each entry is (flag, keyword arguments for add_argument).
    option_specs = (
        ("--input", {
            "type": Path,
            "default": DEFAULT_INPUT,
            "help": f"Input JSONL with SFT examples (default: {DEFAULT_INPUT})",
        }),
        ("--max", {
            "type": int,
            "default": None,
            "help": "Max examples to process",
        }),
        ("--rewrite-bad", {
            "action": "store_true",
            "help": "Rewrite bad posts with Claude to create DPO pairs (slower, costs more)",
        }),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    opts = cli.parse_args()
    process_examples(opts.input, opts.max, opts.rewrite_bad)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|