Rene Fichtmueller c3ab87b167 feat: add fo-blog-v8 training pipeline (Qwen2.5-14B, SFT+DPO)
Full v8 training pipeline for the optical networking blog model:
- train_blog_v8.py: SFT (LoRA r=64, 5 epochs) + DPO (2 epochs) on Qwen2.5-14B-Instruct
  Fixed for trl 1.2.x: SFTConfig instead of TrainingArguments, processing_class= instead
  of tokenizer=, eval_strategy= instead of deprecated evaluation_strategy=
- consolidate_v8_dataset.py: weighted merge of all data sources (820 effective SFT / 235 DPO)
- crawl_v8_sources.py: APNIC/RIPE Labs/potaroo/Cloudflare crawler with balanced div extraction
- process_v6_blogs.py: converts 101 real v6 TIP blog outputs into SFT + DPO pairs
- label_v7_quality.py: Claude-judged quality labels → v8 quality DPO pairs
- parse_real_posts.py: parses blog.fichtmueller.org Ghost CMS HTML → gold SFT records
- run_v8_pipeline.sh: autopilot (consolidate → SFT → DPO → GGUF → Ollama)
- blog-v8-training.yaml: training config reference

Dataset breakdown: 19 real posts ×3 + 196 v7-gen + 28 v6blogs ×2 + 135 external ×1.5
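
The weighting in the breakdown line can be tallied with a short sketch. It assumes each multiplier is a per-record sample weight and that sources without a multiplier count once; the dictionary keys are illustrative labels, not names taken from the pipeline. The four listed components come to 511.5 weighted records, so the 820 effective SFT figure quoted for consolidate_v8_dataset.py presumably includes sources or weights that this line does not itemise.

```python
# Illustrative tally of the dataset breakdown; keys are hypothetical labels.
# Each entry is (record_count, weight); an unmarked source is assumed to have weight 1.0.
components = {
    "real_posts": (19, 3.0),
    "v7_generated": (196, 1.0),
    "v6_blogs": (28, 2.0),
    "external": (135, 1.5),
}


def effective_count(parts: dict[str, tuple[int, float]]) -> float:
    """Sum of record_count * weight over all components."""
    return sum(count * weight for count, weight in parts.values())


total = effective_count(components)
print(f"weighted records in the itemised breakdown: {total}")
```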
2026-04-19 11:44:09 +02:00

608 lines
23 KiB
Python

#!/usr/bin/env python3
"""
crawl_v8_sources.py: crawls external sources for v8 training data

Sources (prioritized by technical depth):
  1. APNIC Blog: https://blog.apnic.net/feed/ (400-500 posts)
  2. RIPE Labs: https://labs.ripe.net/feed/blog/ (300-400 posts)
  3. Geoff Huston: https://www.potaroo.net/ispcol/ (500 articles)
  4. Cloudflare Blog: /tag/networking + /tag/bgp (30-50 posts)
  5. ARIN Blog: https://www.arin.net/blog/ (bonus)

For each source:
  - Extract article URL, title, and category
  - Download and clean the raw text
  - Rewrite with the Claude CLI: 700-1000 words, hook + sections + takeaways
  - Save as SFT JSONL (weight: 1.5)

Output: ~/transceiver-training-data/v8-external-sft.jsonl

Usage:
  python3 scripts/crawl_v8_sources.py                  # all sources
  python3 scripts/crawl_v8_sources.py --source apnic   # APNIC only
  python3 scripts/crawl_v8_sources.py --max 50         # at most 50 articles
  python3 scripts/crawl_v8_sources.py --dry-run        # only list URLs
"""
from __future__ import annotations
import argparse
import json
import logging
import re
import subprocess
import time
import urllib.request
from html import unescape as html_unescape
from pathlib import Path
from typing import NamedTuple

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
OUTPUT_FILE = Path.home() / "transceiver-training-data" / "v8-external-sft.jsonl"
PROGRESS_FILE = Path.home() / "transceiver-training-data" / "v8-crawl-progress.json"
TIMEOUT = 30
CLAUDE_TIMEOUT = 180
USER_AGENT = "Mozilla/5.0 (compatible; research-bot/1.0; training-data-collection)"
SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, BGP routing, and network infrastructure.
STRICT CONSTRAINTS — Follow exactly, no exceptions:
- LENGTH: 700-1000 words. Count carefully. Stop at 1000 words maximum.
- STRUCTURE (mandatory, in this order):
1. HOOK paragraph — 2-3 sentences stating the problem this post addresses
2. Technical sections — 3-4 H2 sections covering the topic in depth
3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable
- TOPIC DISCIPLINE: Write ONLY about the exact topic in the source material. Zero drift.
- NO REPETITION: Every sentence must add new information. No restating.
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.
Do not summarize what you are about to write. Start with the hook directly.
Do not copy from the source — rewrite completely in your own words."""
class Article(NamedTuple):
    title: str
    url: str
    source: str
    category: str

# ─── HTML Utilities ───────────────────────────────────────────────────────────

def fetch_url(url: str, timeout: int = TIMEOUT) -> str | None:
    """Fetch URL, return text or None."""
    try:
        req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            charset = resp.headers.get_content_charset() or "utf-8"
            return resp.read().decode(charset, errors="ignore")
    except Exception as exc:
        logger.warning("Fetch failed %s: %s", url, exc)
        return None

def strip_tags(html: str) -> str:
    """Strip HTML tags → plain text, decode all HTML entities."""
    text = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL)
    text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL)
    text = re.sub(r'<[^>]+>', ' ', text)
    text = html_unescape(text)  # handles &amp;, &#8216;, &#x27;, etc.
    text = text.replace('\xa0', ' ')  # non-breaking space
    text = re.sub(r'\s+', ' ', text)  # collapses all whitespace, newlines included
    return text.strip()

def find_balanced_div(html: str, start_pos: int) -> str:
    """Extract content of a <div> starting at start_pos, handling nested divs."""
    content_start = html.find('>', start_pos)
    if content_start < 0:
        return ""
    content_start += 1
    depth = 0  # number of nested <div>s currently open inside the target div
    pos = content_start
    while pos < len(html):
        open_m = re.search(r'<div[\s>]', html[pos:], re.IGNORECASE)
        close_m = re.search(r'</div>', html[pos:], re.IGNORECASE)
        if not close_m:
            break
        # Offsets are relative to pos; a missing opener sorts after any closer.
        open_pos = open_m.start() if open_m else len(html)
        close_pos = close_m.start()
        if open_pos < close_pos:
            depth += 1
            pos += open_pos + 4  # skip past "<div"
        else:
            if depth == 0:
                return html[content_start:pos + close_pos]
            depth -= 1
            pos += close_pos + 6  # skip past "</div>"
    return ""

def find_content_block(html: str, *class_or_id_patterns: str) -> str:
    """Find a div by class or id pattern (handles any attribute order), return its content."""
    for pattern in class_or_id_patterns:
        # Match <div ...pattern...> regardless of attribute order
        m = re.search(
            rf'<div[^>]+(?:class|id)="[^"]*{re.escape(pattern)}[^"]*"[^>]*>',
            html, re.IGNORECASE,
        )
        if m:
            content = find_balanced_div(html, m.start())
            # Ignore matches that are too small to be the article body
            if len(content.split()) > 50:
                return content
    return ""

def extract_article_text(html: str, source: str) -> str:
    """Extract main content area based on source-specific selectors."""
    # Potaroo uses an old-school table layout, so extract all <p> paragraphs
    if source == "potaroo":
        paragraphs = re.findall(r'<p[^>]*>(.*?)</p>', html, re.DOTALL | re.IGNORECASE)
        # Filter: skip short nav-like paragraphs, keep substantive text
        stripped = (strip_tags(p) for p in paragraphs)
        long_paras = [p for p in stripped if len(p.split()) > 20]
        return "\n\n".join(long_paras)

    # Source-specific content div identifiers
    source_patterns: dict[str, list[str]] = {
        "apnic": ["entry-content", "article-content", "post-content"],
        "ripe": ["article-body", "entry-content", "ripe-article"],
        "cloudflare": ["post-content", "article-content", "entry-content"],
        "arin": ["entry-content", "post-content", "article-body"],
    }
    patterns = source_patterns.get(source, ["entry-content", "article-content"])

    # Try div-based extraction first (handles nested divs correctly)
    raw = find_content_block(html, *patterns)
    # Fallback: <article> tag
    if not raw:
        m = re.search(r'<article[^>]*>(.*?)</article>', html, re.DOTALL | re.IGNORECASE)
        if m:
            raw = m.group(1)
    # Fallback: <main> tag
    if not raw:
        m = re.search(r'<main[^>]*>(.*?)</main>', html, re.DOTALL | re.IGNORECASE)
        if m:
            raw = m.group(1)
    if not raw or len(raw.split()) < 30:
        return ""

    # Convert to markdown-ish plain text. Collapse the source whitespace first,
    # so the only newlines left are the ones inserted below for block-level tags.
    raw = re.sub(r'<(script|style)\b[^>]*>.*?</\1>', '', raw, flags=re.DOTALL | re.IGNORECASE)
    raw = re.sub(r'\s+', ' ', raw)
    for level in (3, 2, 1):
        raw = re.sub(
            rf'<h{level}[^>]*>(.*?)</h{level}>',
            lambda m: f"\n{'#' * level} {strip_tags(m.group(1))}\n",
            raw, flags=re.DOTALL | re.IGNORECASE,
        )
    # \b keeps <li> from matching <link>, <p> from matching <pre>, <b> from matching <br>
    raw = re.sub(r'<li\b[^>]*>(.*?)</li>', r'\n- \1', raw, flags=re.DOTALL | re.IGNORECASE)
    raw = re.sub(r'</?p\b[^>]*>', '\n', raw, flags=re.IGNORECASE)
    raw = re.sub(r'<br\s*/?>', '\n', raw, flags=re.IGNORECASE)
    raw = re.sub(r'<(strong|b)\b[^>]*>(.*?)</\1>', r'**\2**', raw, flags=re.DOTALL | re.IGNORECASE)

    # strip_tags() collapses all whitespace, newlines included, which would
    # flatten the structure built above. Remove the remaining tags with
    # newline-preserving substitutions instead.
    text = re.sub(r'<[^>]+>', ' ', raw)
    text = html_unescape(text).replace('\xa0', ' ')
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r' ?\n ?', '\n', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()

# ─── RSS Parser ───────────────────────────────────────────────────────────────

def parse_rss(xml: str, source: str, max_items: int = 200) -> list[Article]:
    """Parse RSS/Atom feed → list of Articles."""
    articles = []
    # Try <item> (RSS 2.0)
    items = re.findall(r'<item>(.*?)</item>', xml, re.DOTALL)
    if not items:
        # Try <entry> (Atom)
        items = re.findall(r'<entry>(.*?)</entry>', xml, re.DOTALL)
    for item in items[:max_items]:
        title_m = re.search(r'<title[^>]*>(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?</title>', item, re.DOTALL)
        link_m = (
            re.search(r'<link[^>]*/>', item)
            or re.search(r'<link[^>]*>(.*?)</link>', item, re.DOTALL)
            or re.search(r'<guid[^>]*>(https?://[^<]+)</guid>', item)
        )
        cat_m = re.search(r'<category[^>]*>(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?</category>', item, re.DOTALL)
        if not title_m or not link_m:
            continue
        title = strip_tags(title_m.group(1)).strip()
        # For <link /> self-closing, get href attribute
        link_raw = link_m.group(0)
        href_m = re.search(r'href="([^"]+)"', link_raw) or re.search(r'>(https?://[^<]+)<', link_raw)
        if href_m:
            url = href_m.group(1)
        elif link_m.lastindex:
            url = link_m.group(1)
        else:
            url = ""
        url = url.strip()
        category = strip_tags(cat_m.group(1)).strip() if cat_m else "networking"
        if title and url.startswith("http"):
            articles.append(Article(title=title, url=url, source=source, category=category))
    logger.info("RSS parsed %d articles from %s", len(articles), source)
    return articles

# ─── Source-specific fetchers ─────────────────────────────────────────────────

def fetch_apnic(max_items: int = 200) -> list[Article]:
    """APNIC Blog via RSS."""
    articles = []
    # APNIC has paginated RSS, so try multiple pages
    for page in range(1, 6):
        url = f"https://blog.apnic.net/feed/?paged={page}" if page > 1 else "https://blog.apnic.net/feed/"
        xml = fetch_url(url)
        if not xml:
            break
        page_articles = parse_rss(xml, "apnic", max_items)
        articles.extend(page_articles)
        # Stop once enough articles are collected or a short page signals the end
        if len(articles) >= max_items or len(page_articles) < 10:
            break
        time.sleep(1)
    return articles[:max_items]

def fetch_ripe_labs(max_items: int = 200) -> list[Article]:
    """RIPE Labs via RSS (feed.xml)."""
    # Candidate feed URLs, tried in order until one yields articles
    feeds = [
        "https://labs.ripe.net/feed.xml",
        "https://labs.ripe.net/feed/",
        "https://labs.ripe.net/Members/feed/",
    ]
    for feed_url in feeds:
        xml = fetch_url(feed_url)
        if xml and len(xml) > 500:
            articles = parse_rss(xml, "ripe", max_items)
            if articles:
                return articles[:max_items]
        time.sleep(1)
    return []

def fetch_potaroo(max_items: int = 100) -> list[Article]:
    """Geoff Huston's potaroo.net ISPCOL column index.

    Articles are linked as relative hrefs like "2026-04/nznog26.html" from
    https://www.potaroo.net/ispcol/index.html
    """
    base_url = "https://www.potaroo.net/ispcol"
    html = fetch_url(f"{base_url}/index.html") or fetch_url(f"{base_url}/")
    if not html:
        logger.warning("Could not fetch potaroo.net index")
        return []
    articles = []
    seen: set[str] = set()
    # Pattern: href="YYYY-MM/slug.html" followed (nearby) by link text
    # The index lists articles as: <a href="2026-04/nznog26.html">Title</a>
    for m in re.finditer(r'href="(\d{4}-\d{2}/[^"]+\.html)"[^>]*>([^<]{5,150})<', html, re.IGNORECASE):
        rel_path, title_raw = m.group(1), m.group(2).strip()
        url = f"{base_url}/{rel_path}"
        title = html_unescape(title_raw).strip()
        if url not in seen and len(title) > 5:
            seen.add(url)
            articles.append(Article(title=title, url=url, source="potaroo", category="bgp-routing"))
        if len(articles) >= max_items:
            break
    # Fallback: bare date-path links without visible anchor text
    if not articles:
        for m in re.finditer(r'href="(\d{4}-\d{2}/[^"]+\.html)"', html, re.IGNORECASE):
            rel_path = m.group(1)
            url = f"{base_url}/{rel_path}"
            slug = rel_path.split("/")[-1].replace(".html", "").replace("-", " ").title()
            if url not in seen:
                seen.add(url)
                articles.append(Article(title=f"Geoff Huston: {slug}", url=url, source="potaroo", category="bgp-routing"))
            if len(articles) >= max_items:
                break
    logger.info("Potaroo: found %d articles", len(articles))
    return articles[:max_items]

def fetch_cloudflare(max_items: int = 60) -> list[Article]:
    """Cloudflare blog: networking and BGP tags."""
    articles = []
    seen: set[str] = set()  # URLs already collected, since tag pages overlap
    tags = ["bgp", "routing", "dns", "tcp", "ddos", "ipv6"]
    for tag in tags:
        # Fetch the HTML listing page for this tag
        tag_url = f"https://blog.cloudflare.com/tag/{tag}/"
        html = fetch_url(tag_url)
        if not html:
            continue
        # Extract article links from blog listing
        for m in re.finditer(
            r'href="(/[a-z0-9-]{10,80}/)"[^>]*>.*?<h\d[^>]*>([^<]{10,150})</h\d>',
            html, re.DOTALL | re.IGNORECASE,
        ):
            url = f"https://blog.cloudflare.com{m.group(1)}"
            title = strip_tags(m.group(2)).strip()
            if url not in seen and title:
                seen.add(url)
                articles.append(Article(title=title, url=url, source="cloudflare", category=tag))
            if len(articles) >= max_items:
                break
        if len(articles) >= max_items:
            break
        time.sleep(1)
    logger.info("Cloudflare: found %d articles", len(articles))
    return articles[:max_items]

def fetch_arin_blog(max_items: int = 50) -> list[Article]:
    """ARIN blog via RSS (feed.xml)."""
    # Candidate feed URLs, tried in order until one yields articles
    feeds = [
        "https://www.arin.net/blog/feed.xml",
        "https://www.arin.net/blog/feed/",
        "https://www.arin.net/feed/",
    ]
    for feed_url in feeds:
        xml = fetch_url(feed_url)
        if xml and len(xml) > 500:
            articles = parse_rss(xml, "arin", max_items)
            if articles:
                return articles[:max_items]
        time.sleep(0.5)
    return []

# ─── Category → audience mapping ─────────────────────────────────────────────

def category_to_audience(category: str, source: str) -> str:
    """Map a feed category to the audience line used in the rewrite prompt."""
    cat_lower = category.lower()
    if any(k in cat_lower for k in ["bgp", "routing", "rpki", "aspa", "irr", "peering"]):
        return "network engineers and NOC operators managing BGP and routing infrastructure"
    elif any(k in cat_lower for k in ["dns", "dnssec", "resolver"]):
        return "network engineers and infrastructure operators managing DNS services"
    elif any(k in cat_lower for k in ["security", "ddos", "attack", "vulnerab"]):
        return "network security engineers and SOC operators"
    elif any(k in cat_lower for k in ["ipv6", "ipv4", "address"]):
        return "network architects and engineers planning IP address strategy"
    elif any(k in cat_lower for k in ["datacenter", "optical", "transceiver", "400g"]):
        return "network engineers and IT professionals who evaluate and operate optical infrastructure"
    else:
        return "network engineers and infrastructure operators"

# ─── Claude rewrite ───────────────────────────────────────────────────────────

def rewrite_with_claude(
    title: str,
    raw_text: str,
    audience: str,
    timeout: int = CLAUDE_TIMEOUT,
) -> str | None:
    """Use Claude CLI to rewrite raw article text in our blog format."""
    if len(raw_text.split()) < 100:
        return None
    # Truncate very long source articles (keep first ~1500 words for context)
    words = raw_text.split()
    if len(words) > 1500:
        raw_text = " ".join(words[:1500]) + "\n\n[Source truncated]"
    prompt = (
        f"Write a blog post on this topic.\n\n"
        f"**Topic:** {title}\n\n"
        f"**Target audience:** {audience}\n\n"
        f"**Source material for reference (DO NOT COPY — rewrite completely):**\n\n"
        f"{raw_text}\n\n"
        f"Remember: 700-1000 words, hook + technical sections + 3 takeaways. "
        f"Stay strictly on-topic. No filler. Start writing now."
    )
    try:
        result = subprocess.run(
            ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", prompt],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        if result.returncode != 0 or not result.stdout.strip():
            logger.warning("Claude returned error: %s", result.stderr[:200])
            return None
        output = result.stdout.strip()
        word_count = len(output.split())
        # Reject short outputs and replies that open in the first person
        if word_count < 400 or output.startswith("I "):
            return None
        return output
    except subprocess.TimeoutExpired:
        logger.warning("Claude timeout for: %s", title[:50])
        return None
    except Exception as exc:
        logger.warning("Claude error: %s", exc)
        return None

# ─── Main crawler loop ────────────────────────────────────────────────────────

def load_progress() -> set[str]:
    """Load already-processed URLs."""
    if not PROGRESS_FILE.exists():
        return set()
    try:
        data = json.loads(PROGRESS_FILE.read_text())
        return set(data.get("done", []))
    except Exception:
        # A corrupt or unreadable progress file means starting from scratch
        return set()


def save_progress(done_urls: set[str]) -> None:
    """Persist the set of processed URLs as JSON."""
    PROGRESS_FILE.write_text(json.dumps({"done": list(done_urls)}, ensure_ascii=False))

def crawl_source(
    source: str,
    articles: list[Article],
    done_urls: set[str],
    max_articles: int,
    dry_run: bool,
) -> tuple[int, int]:
    """Crawl + rewrite articles from one source. Returns (saved, skipped)."""
    saved = 0
    skipped = 0
    count = 0
    OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
    for article in articles:
        if count >= max_articles:
            break
        if article.url in done_urls:
            logger.debug("SKIP (done): %s", article.url)
            skipped += 1
            continue
        count += 1
        logger.info("[%s] %d/%d: %s", source.upper(), count, min(len(articles), max_articles), article.title[:60])
        if dry_run:
            print(f" DRY: {article.url}")
            continue
        # Fetch article HTML
        html = fetch_url(article.url)
        if not html:
            logger.warning(" SKIP (fetch failed)")
            done_urls.add(article.url)
            skipped += 1
            continue
        # Extract text
        raw_text = extract_article_text(html, source)
        if len(raw_text.split()) < 100:
            logger.warning(" SKIP (too short: %d words)", len(raw_text.split()))
            done_urls.add(article.url)
            skipped += 1
            continue
        # Rewrite with Claude
        audience = category_to_audience(article.category, source)
        rewritten = rewrite_with_claude(article.title, raw_text, audience)
        if not rewritten:
            logger.warning(" SKIP (claude failed)")
            done_urls.add(article.url)
            skipped += 1
            continue
        word_count = len(rewritten.split())
        logger.info(" OK: %dw | %s", word_count, article.title[:50])
        record = {
            "system_prompt": SYSTEM_PROMPT,
            "input_text": (
                f"Write a blog post on the following topic:\n\n"
                f"**Topic:** {article.title}\n\n"
                f"**Target audience:** {audience}\n\n"
                f"Remember: 700-1000 words, hook + technical sections + 3 takeaways. "
                f"Stay strictly on-topic. No filler. Start writing now."
            ),
            "output_text": rewritten,
            "meta": {
                "title": article.title,
                "source_url": article.url,
                "source": source,
                "category": article.category,
                "word_count": word_count,
                "quality": "external_rewritten",
                "weight": 1.5,
                "dataset_version": "v8",
            },
        }
        with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
        done_urls.add(article.url)
        save_progress(done_urls)
        saved += 1
        # Rate limiting: be a good citizen
        time.sleep(2)
    # Persist URLs marked done on the skip paths too; otherwise they are only
    # written when a later article in the same run succeeds.
    if not dry_run:
        save_progress(done_urls)
    return saved, skipped

# ─── Entry point ──────────────────────────────────────────────────────────────

SOURCES = {
    "apnic": fetch_apnic,
    "ripe": fetch_ripe_labs,
    "potaroo": fetch_potaroo,
    "cloudflare": fetch_cloudflare,
    # "arin": fetch_arin_blog,  # No accessible RSS feed found
}

def main() -> None:
    parser = argparse.ArgumentParser(description="Crawl external networking blogs for v8 training data")
    parser.add_argument("--source", choices=list(SOURCES.keys()) + ["all"], default="all",
                        help="Which source(s) to crawl (default: all)")
    parser.add_argument("--max", type=int, default=100,
                        help="Max articles per source (default: 100)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show URLs without downloading or rewriting")
    args = parser.parse_args()

    done_urls = load_progress()
    logger.info("Resuming: %d URLs already processed", len(done_urls))
    active_sources = list(SOURCES.keys()) if args.source == "all" else [args.source]
    # Filter only existing sources (arin removed)
    active_sources = [s for s in active_sources if s in SOURCES]

    total_saved = 0
    for source in active_sources:
        logger.info("=== Fetching article list: %s ===", source.upper())
        try:
            articles = SOURCES[source](args.max)
        except Exception as exc:
            logger.error("Failed to fetch %s articles: %s", source, exc)
            continue
        if not articles:
            logger.warning("No articles found for %s", source)
            continue
        saved, skipped = crawl_source(source, articles, done_urls, args.max, args.dry_run)
        logger.info("%s done: saved=%d skipped=%d", source.upper(), saved, skipped)
        total_saved += saved

    if not args.dry_run:
        total_lines = 0
        if OUTPUT_FILE.exists():
            # The output is written as UTF-8, so read it back the same way
            with open(OUTPUT_FILE, encoding="utf-8") as f:
                total_lines = sum(1 for _ in f)
        logger.info("=== DONE: total saved=%d | output total=%d lines ===", total_saved, total_lines)
        logger.info("Output: %s", OUTPUT_FILE)


if __name__ == "__main__":
    main()
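
The balanced div extraction that the commit message mentions can be illustrated in isolation. The sketch below is a standalone variant, not the script's own `find_balanced_div`: the helper name `inner_html_of_div` and the sample markup are made up for the example, and it walks `<div` / `</div>` tokens with a single compiled pattern instead of re-slicing the string. The depth-counting idea is the same: every nested opener raises the depth, and the first closer seen at depth zero ends the block.

```python
import re

# Hypothetical helper for illustration; it is not part of crawl_v8_sources.py.
_DIV_TOKEN = re.compile(r"<div\b|</div\s*>", re.IGNORECASE)


def inner_html_of_div(html: str, open_tag_start: int) -> str:
    """Return the inner HTML of the <div> whose opening tag starts at open_tag_start."""
    tag_end = html.find(">", open_tag_start)
    if tag_end < 0:
        return ""
    content_start = tag_end + 1
    depth = 0  # nested <div>s currently open inside the target div
    for token in _DIV_TOKEN.finditer(html, content_start):
        if token.group(0)[1] != "/":
            depth += 1  # nested opening tag
        elif depth == 0:
            return html[content_start:token.start()]  # matching close found
        else:
            depth -= 1  # close of a nested div
    return ""  # unbalanced markup: no matching close


sample = (
    '<div class="entry-content"><p>intro</p>'
    '<div class="note">aside</div><p>end</p></div><footer>x</footer>'
)
inner = inner_html_of_div(sample, sample.index("<div"))
print(inner)
```

A non-greedy regex such as `<div[^>]*>(.*?)</div>` would stop at the first `</div>` and cut the block off after the nested note, which is the failure a depth counter avoids.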