#!/usr/bin/env python3
"""
crawl_v8_sources.py — Crawl external sources for v8 training data.

Sources (prioritised by technical depth):
  1. APNIC Blog      — https://blog.apnic.net/feed/       (400-500 posts)
  2. RIPE Labs       — https://labs.ripe.net/feed/blog/   (300-400 posts)
  3. Geoff Huston    — https://www.potaroo.net/ispcol/    (500 articles)
  4. Cloudflare Blog — /tag/networking + /tag/bgp         (30-50 posts)
  5. ARIN Blog       — https://www.arin.net/blog/         (bonus)

For every source:
  - extract article URL + title + category
  - download and clean the raw text
  - let the Claude CLI rewrite it → 700-1000w, hook + sections + takeaways
  - store the result as SFT JSONL (weight: 1.5)

Output: ~/transceiver-training-data/v8-external-sft.jsonl

Usage:
  python3 scripts/crawl_v8_sources.py                  # all sources
  python3 scripts/crawl_v8_sources.py --source apnic   # APNIC only
  python3 scripts/crawl_v8_sources.py --max 50         # at most 50 articles
  python3 scripts/crawl_v8_sources.py --dry-run        # only list URLs
"""

from __future__ import annotations

import argparse
import json
import logging
import re
import subprocess
import time
import urllib.request
import urllib.error
from html.parser import HTMLParser
from pathlib import Path
from html import unescape as html_unescape
from typing import NamedTuple

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)

OUTPUT_FILE = Path.home() / "transceiver-training-data" / "v8-external-sft.jsonl"
PROGRESS_FILE = Path.home() / "transceiver-training-data" / "v8-crawl-progress.json"

TIMEOUT = 30
CLAUDE_TIMEOUT = 180
USER_AGENT = "Mozilla/5.0 (compatible; research-bot/1.0; training-data-collection)"

# NOTE(review): the wording is unchanged; the line breaks inside this prompt
# were reconstructed from a whitespace-collapsed copy — confirm against the
# canonical prompt before generating more training data.
SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, BGP routing, and network infrastructure.

STRICT CONSTRAINTS — Follow exactly, no exceptions:
- LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum.
- STRUCTURE (mandatory, in this order):
  1. HOOK paragraph — 2–3 sentences stating the problem this post addresses
  2. Technical sections — 3–4 H2 sections covering the topic in depth
  3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable
- TOPIC DISCIPLINE: Write ONLY about the exact topic in the source material. Zero drift.
- NO REPETITION: Every sentence must add new information. No restating.
- VOICE: Confident, direct. No hedging phrases like "it's worth noting".
- AUDIENCE: Network engineers and IT professionals. Assume technical fluency.
- FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms.

Do not summarize what you are about to write. Start with the hook directly.
Do not copy from the source — rewrite completely in your own words."""


class Article(NamedTuple):
    """One article discovered in a feed or index page."""

    title: str
    url: str
    source: str
    category: str


# ─── HTML Utilities ───────────────────────────────────────────────────────────

# Compiled once; find_balanced_div() scans with search(html, pos) instead of
# re-slicing the document on every step.
_DIV_OPEN_RE = re.compile(r'<div[\s>]', re.IGNORECASE)
_DIV_CLOSE_RE = re.compile(r'</div>', re.IGNORECASE)


def fetch_url(url: str, timeout: int = TIMEOUT) -> str | None:
    """Download *url* and return the decoded body, or None on any failure.

    The charset comes from the response headers (UTF-8 when absent);
    undecodable bytes are dropped.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            charset = resp.headers.get_content_charset() or "utf-8"
            return resp.read().decode(charset, errors="ignore")
    except Exception as exc:
        # Deliberately best-effort: one unreachable page must not stop a crawl.
        logger.warning("Fetch failed %s: %s", url, exc)
        return None


def _strip_markup(html: str) -> str:
    """Drop script/style blocks and tags and decode entities.

    Whitespace (including newlines) is left as-is so callers decide how to
    normalise it.
    """
    text = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL)
    text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL)
    text = re.sub(r'<[^>]+>', ' ', text)
    text = html_unescape(text)  # handles &amp;, &#8216;, &#39;, etc.
    return text.replace('\xa0', ' ')  # non-breaking space


def strip_tags(html: str) -> str:
    """Strip HTML tags → single-line plain text with all entities decoded."""
    return re.sub(r'\s+', ' ', _strip_markup(html)).strip()


def find_balanced_div(html: str, start_pos: int) -> str:
    """Return the inner HTML of the <div> whose opening tag begins at start_pos.

    Nested <div> elements are counted so the content ends at the matching
    </div>. Returns "" when the opening tag is never closed with '>' or no
    matching </div> exists.
    """
    content_start = html.find('>', start_pos)
    if content_start < 0:
        return ""
    content_start += 1

    depth = 0
    pos = content_start
    while pos < len(html):
        close_m = _DIV_CLOSE_RE.search(html, pos)
        if not close_m:
            break
        open_m = _DIV_OPEN_RE.search(html, pos)
        if open_m and open_m.start() < close_m.start():
            # A nested div opens first: step past "<div" (4 chars).
            depth += 1
            pos = open_m.start() + 4
        elif depth == 0:
            return html[content_start:close_m.start()]
        else:
            depth -= 1
            pos = close_m.end()
    return ""


def find_content_block(html: str, *class_or_id_patterns: str) -> str:
    """Return the content of the first <div> whose class or id contains a pattern.

    Patterns are tried in order; a candidate is accepted only when its raw
    content has more than 50 whitespace-separated tokens. Returns "" when
    nothing qualifies.
    """
    for pattern in class_or_id_patterns:
        # Match <div ... class="...pattern..."> regardless of attribute order
        m = re.search(
            rf'<div[^>]+(?:class|id)="[^"]*{re.escape(pattern)}[^"]*"[^>]*>',
            html,
            re.IGNORECASE,
        )
        if m:
            content = find_balanced_div(html, m.start())
            if len(content.split()) > 50:
                return content
    return ""


def extract_article_text(html: str, source: str) -> str:
    """Extract the main article text of *html* as markdown-ish plain text.

    *source* selects the container selectors ("potaroo" uses a paragraph
    scan instead). Headings become ``#`` lines, list items ``- `` lines,
    bold text ``**...**``, and paragraphs are separated by newlines.
    Returns "" when no content block with at least 30 tokens is found.
    """
    # Potaroo uses old-school table layout — extract all <p> paragraphs
    if source == "potaroo":
        paragraphs = re.findall(r'<p\b[^>]*>(.*?)</p>', html, re.DOTALL | re.IGNORECASE)
        # Filter: skip short nav-like paragraphs, keep substantive text
        cleaned = (strip_tags(p) for p in paragraphs)
        long_paras = [p for p in cleaned if len(p.split()) > 20]
        return "\n\n".join(long_paras)

    # Source-specific content div identifiers
    source_patterns: dict[str, list[str]] = {
        "apnic": ["entry-content", "article-content", "post-content"],
        "ripe": ["article-body", "entry-content", "ripe-article"],
        "cloudflare": ["post-content", "article-content", "entry-content"],
        "arin": ["entry-content", "post-content", "article-body"],
    }
    patterns = source_patterns.get(source, ["entry-content", "article-content"])

    # Try div-based extraction first (handles nested divs correctly)
    raw = find_content_block(html, *patterns)

    # NOTE(review): the two fallback tag names were reconstructed as
    # <article> then <main>; confirm the intended order.
    # Fallback: <article> tag
    if not raw:
        m = re.search(r'<article[^>]*>(.*?)</article>', html, re.DOTALL | re.IGNORECASE)
        if m:
            raw = m.group(1)

    # Fallback: <main> tag
    if not raw:
        m = re.search(r'<main[^>]*>(.*?)</main>', html, re.DOTALL | re.IGNORECASE)
        if m:
            raw = m.group(1)

    if not raw or len(raw.split()) < 30:
        return ""

    # Collapse the page's own whitespace first so the only newlines left are
    # the structural ones inserted below.
    raw = re.sub(r'\s+', ' ', raw)

    # Convert to markdown-ish plain text. \b keeps <p> from matching <pre>,
    # <b> from matching <br>/<body>/<blockquote>, and so on.
    for level in [3, 2, 1]:
        raw = re.sub(
            rf'<h{level}\b[^>]*>(.*?)</h{level}>',
            lambda m: f"\n{'#'*level} {strip_tags(m.group(1))}\n",
            raw,
            flags=re.DOTALL | re.IGNORECASE,
        )
    raw = re.sub(r'<li\b[^>]*>(.*?)</li>', r'\n- \1', raw, flags=re.DOTALL | re.IGNORECASE)
    raw = re.sub(r'<p\b[^>]*>', '\n', raw, flags=re.IGNORECASE)
    raw = re.sub(r'</p>', '\n', raw, flags=re.IGNORECASE)
    raw = re.sub(r'<br\s*/?>', '\n', raw, flags=re.IGNORECASE)
    raw = re.sub(
        r'<(?:strong|b)\b[^>]*>(.*?)</(?:strong|b)>',
        r'**\1**',
        raw,
        flags=re.DOTALL | re.IGNORECASE,
    )

    # strip_tags() would flatten every newline into a space and undo the
    # structure built above, so strip markup without touching line breaks.
    text = _strip_markup(raw)
    text = re.sub(r'[^\S\n]+', ' ', text)
    text = re.sub(r' *\n *', '\n', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()


# ─── RSS Parser ───────────────────────────────────────────────────────────────

def parse_rss(xml: str, source: str, max_items: int = 200) -> list[Article]:
    """Parse an RSS 2.0 or Atom feed into Articles (at most *max_items* entries).

    Entries without a title or an http(s) link are skipped; a missing
    category defaults to "networking".
    """
    articles = []

    # Try <item> (RSS 2.0)
    items = re.findall(r'<item>(.*?)</item>', xml, re.DOTALL)
    if not items:
        # Try <entry> (Atom)
        items = re.findall(r'<entry>(.*?)</entry>', xml, re.DOTALL)

    for item in items[:max_items]:
        title_m = re.search(
            r'<title[^>]*>(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?</title>', item, re.DOTALL)
        # NOTE(review): the third alternative's tag name was reconstructed as
        # <guid>; confirm.
        link_m = (re.search(r'<link[^>]*/>', item) or
                  re.search(r'<link[^>]*>(.*?)</link>', item, re.DOTALL) or
                  re.search(r'<guid[^>]*>(https?://[^<]+)</guid>', item))
        cat_m = re.search(
            r'<category[^>]*>(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?</category>', item, re.DOTALL)

        if not title_m or not link_m:
            continue

        title = strip_tags(title_m.group(1)).strip()

        # For self-closing <link/>, get the href attribute; otherwise take the
        # URL found between the tags.
        link_raw = link_m.group(0)
        href_m = (re.search(r'href="([^"]+)"', link_raw)
                  or re.search(r'>(https?://[^<]+)<', link_raw))
        url = href_m.group(1) if href_m else link_m.group(1) if link_m.lastindex else ""
        url = url.strip()

        category = strip_tags(cat_m.group(1)).strip() if cat_m else "networking"

        if title and url.startswith("http"):
            articles.append(Article(title=title, url=url, source=source, category=category))

    logger.info("RSS parsed %d articles from %s", len(articles), source)
    return articles


# ─── Source-specific fetchers ─────────────────────────────────────────────────

def _fetch_first_feed(feeds: list[str], source: str, max_items: int,
                      pause: float) -> list[Article]:
    """Return the articles of the first feed URL that yields any, else []."""
    for feed_url in feeds:
        xml = fetch_url(feed_url)
        if xml and len(xml) > 500:
            articles = parse_rss(xml, source, max_items)
            if articles:
                return articles[:max_items]
        time.sleep(pause)
    return []


def fetch_apnic(max_items: int = 200) -> list[Article]:
    """APNIC Blog via RSS (up to five feed pages, de-duplicated by URL)."""
    articles: list[Article] = []
    seen: set[str] = set()

    # APNIC has paginated RSS - try multiple pages
    for page in range(1, 6):
        url = f"https://blog.apnic.net/feed/?paged={page}" if page > 1 else "https://blog.apnic.net/feed/"
        xml = fetch_url(url)
        if not xml:
            break
        page_articles = parse_rss(xml, "apnic", max_items)

        added = 0
        for art in page_articles:
            if art.url not in seen:
                seen.add(art.url)
                articles.append(art)
                added += 1

        # Stop on a full result, a short (last) page, or a page that only
        # repeated what we already have (server ignoring ?paged=).
        if len(articles) >= max_items or len(page_articles) < 10 or added == 0:
            break
        time.sleep(1)

    return articles[:max_items]


def fetch_ripe_labs(max_items: int = 200) -> list[Article]:
    """RIPE Labs via RSS (feed.xml)."""
    feeds = [
        "https://labs.ripe.net/feed.xml",
        "https://labs.ripe.net/feed/",
        "https://labs.ripe.net/Members/feed/",
    ]
    return _fetch_first_feed(feeds, "ripe", max_items, 1)


def fetch_potaroo(max_items: int = 100) -> list[Article]:
    """Geoff Huston's potaroo.net ISPCOL column index.

    Articles are linked as relative hrefs like "2026-04/nznog26.html"
    from https://www.potaroo.net/ispcol/index.html
    """
    base_url = "https://www.potaroo.net/ispcol"
    html = fetch_url(f"{base_url}/index.html") or fetch_url(f"{base_url}/")
    if not html:
        logger.warning("Could not fetch potaroo.net index")
        return []

    articles = []
    seen: set[str] = set()

    # Pattern: href="YYYY-MM/slug.html" followed (nearby) by link text
    # The index lists articles as: <a href="YYYY-MM/slug.html">Title</a>
    for m in re.finditer(r'href="(\d{4}-\d{2}/[^"]+\.html)"[^>]*>([^<]{5,150})<',
                         html, re.IGNORECASE):
        rel_path, title_raw = m.group(1), m.group(2).strip()
        url = f"{base_url}/{rel_path}"
        title = html_unescape(title_raw).strip()
        if url not in seen and len(title) > 5:
            seen.add(url)
            articles.append(Article(title=title, url=url, source="potaroo",
                                    category="bgp-routing"))
        if len(articles) >= max_items:
            break

    # Fallback: bare date-path links without visible anchor text
    if not articles:
        for m in re.finditer(r'href="(\d{4}-\d{2}/[^"]+\.html)"', html, re.IGNORECASE):
            rel_path = m.group(1)
            url = f"{base_url}/{rel_path}"
            slug = rel_path.split("/")[-1].replace(".html", "").replace("-", " ").title()
            if url not in seen:
                seen.add(url)
                articles.append(Article(title=f"Geoff Huston: {slug}", url=url,
                                        source="potaroo", category="bgp-routing"))
            if len(articles) >= max_items:
                break

    logger.info("Potaroo: found %d articles", len(articles))
    return articles[:max_items]


def fetch_cloudflare(max_items: int = 60) -> list[Article]:
    """Cloudflare blog — articles listed on the networking-related tag pages."""
    articles: list[Article] = []
    seen: set[str] = set()
    tags = ["bgp", "routing", "dns", "tcp", "ddos", "ipv6"]

    for tag in tags:
        # The tag listing is a plain HTML page that we scrape for links.
        listing_url = f"https://blog.cloudflare.com/tag/{tag}/"
        html = fetch_url(listing_url)
        if not html:
            continue

        # Extract article links from blog listing.
        # NOTE(review): the element that wraps the title was lost from this
        # pattern; any heading level is accepted here — confirm against the
        # live markup.
        for m in re.finditer(
            r'href="(/[a-z0-9-]{10,80}/)"[^>]*>.*?<h[1-6]\b[^>]*>([^<]{10,150})</h[1-6]>',
            html, re.DOTALL | re.IGNORECASE
        ):
            url = f"https://blog.cloudflare.com{m.group(1)}"
            title = strip_tags(m.group(2)).strip()
            if url not in seen and title:
                seen.add(url)
                articles.append(Article(title=title, url=url, source="cloudflare",
                                        category=tag))
            if len(articles) >= max_items:
                break

        if len(articles) >= max_items:
            break
        time.sleep(1)

    logger.info("Cloudflare: found %d articles", len(articles))
    return articles[:max_items]


def fetch_arin_blog(max_items: int = 50) -> list[Article]:
    """ARIN blog via RSS (feed.xml)."""
    feeds = [
        "https://www.arin.net/blog/feed.xml",
        "https://www.arin.net/blog/feed/",
        "https://www.arin.net/feed/",
    ]
    return _fetch_first_feed(feeds, "arin", max_items, 0.5)


# ─── Category → audience mapping ─────────────────────────────────────────────

# Checked in order; the first rule with a keyword contained in the category wins.
_AUDIENCE_RULES: tuple[tuple[tuple[str, ...], str], ...] = (
    (("bgp", "routing", "rpki", "aspa", "irr", "peering"),
     "network engineers and NOC operators managing BGP and routing infrastructure"),
    (("dns", "dnssec", "resolver"),
     "network engineers and infrastructure operators managing DNS services"),
    (("security", "ddos", "attack", "vulnerab"),
     "network security engineers and SOC operators"),
    (("ipv6", "ipv4", "address"),
     "network architects and engineers planning IP address strategy"),
    (("datacenter", "optical", "transceiver", "400g"),
     "network engineers and IT professionals who evaluate and operate optical infrastructure"),
)


def category_to_audience(category: str, source: str) -> str:
    """Map a feed category to the target-audience phrase used in prompts.

    Matching is a case-insensitive substring test; *source* is accepted for
    interface compatibility and is not consulted.
    """
    cat_lower = category.lower()
    for keywords, audience in _AUDIENCE_RULES:
        if any(k in cat_lower for k in keywords):
            return audience
    return "network engineers and infrastructure operators"


# ─── Claude rewrite ───────────────────────────────────────────────────────────

def rewrite_with_claude(
    title: str,
    raw_text: str,
    audience: str,
    timeout: int = CLAUDE_TIMEOUT,
) -> str | None:
    """Use Claude CLI to rewrite raw article text in our blog format.

    Returns the rewritten post, or None when the source has fewer than 100
    words, the CLI fails or times out, or the output is under 400 words or
    starts with "I " (treated as a refusal/meta answer).
    """
    if len(raw_text.split()) < 100:
        return None

    # Truncate very long source articles (keep first ~1500 words for context)
    words = raw_text.split()
    if len(words) > 1500:
        raw_text = " ".join(words[:1500]) + "\n\n[Source truncated]"

    prompt = (
        f"Write a blog post on this topic.\n\n"
        f"**Topic:** {title}\n\n"
        f"**Target audience:** {audience}\n\n"
        f"**Source material for reference (DO NOT COPY — rewrite completely):**\n\n"
        f"{raw_text}\n\n"
        f"Remember: 700–1000 words, hook + technical sections + 3 takeaways. "
        f"Stay strictly on-topic. No filler. Start writing now."
    )

    try:
        result = subprocess.run(
            ["claude", "--print", "--system-prompt", SYSTEM_PROMPT, "-p", prompt],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        if result.returncode != 0 or not result.stdout.strip():
            logger.warning("Claude returned error: %s", result.stderr[:200])
            return None

        output = result.stdout.strip()
        word_count = len(output.split())
        if word_count < 400 or output.startswith("I "):
            return None
        return output

    except subprocess.TimeoutExpired:
        logger.warning("Claude timeout for: %s", title[:50])
        return None
    except Exception as exc:
        # Covers a missing CLI binary and similar launch problems.
        logger.warning("Claude error: %s", exc)
        return None


# ─── Main crawler loop ────────────────────────────────────────────────────────

def load_progress() -> set[str]:
    """Load already-processed URLs; an unreadable file yields an empty set."""
    if not PROGRESS_FILE.exists():
        return set()
    try:
        data = json.loads(PROGRESS_FILE.read_text(encoding="utf-8"))
        return set(data.get("done", []))
    except (OSError, ValueError, AttributeError, TypeError) as exc:
        logger.warning("Ignoring unreadable progress file %s: %s", PROGRESS_FILE, exc)
        return set()


def save_progress(done_urls: set[str]) -> None:
    """Persist the processed-URL set to PROGRESS_FILE.

    The data is written to a temporary sibling and then moved into place so
    an interrupted run cannot leave a truncated progress file.
    """
    PROGRESS_FILE.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps({"done": sorted(done_urls)}, ensure_ascii=False)
    tmp_path = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(PROGRESS_FILE)


def crawl_source(
    source: str,
    articles: list[Article],
    done_urls: set[str],
    max_articles: int,
    dry_run: bool,
) -> tuple[int, int]:
    """Crawl + rewrite articles from one source. Returns (saved, skipped).

    URLs already in *done_urls* are skipped and do not count towards
    *max_articles*. Every attempted URL (success or failure) is added to
    *done_urls*; the progress file is rewritten after each saved record.
    With *dry_run* the URLs are only printed.
    """
    saved = 0
    skipped = 0
    count = 0

    if not dry_run:
        OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)

    for article in articles:
        if count >= max_articles:
            break
        if article.url in done_urls:
            logger.debug("SKIP (done): %s", article.url)
            skipped += 1
            continue

        count += 1
        logger.info("[%s] %d/%d: %s", source.upper(), count,
                    min(len(articles), max_articles), article.title[:60])

        if dry_run:
            print(f" DRY: {article.url}")
            continue

        # Fetch article HTML
        html = fetch_url(article.url)
        if not html:
            logger.warning(" SKIP (fetch failed)")
            done_urls.add(article.url)
            skipped += 1
            continue

        # Extract text
        raw_text = extract_article_text(html, source)
        if len(raw_text.split()) < 100:
            logger.warning(" SKIP (too short: %d words)", len(raw_text.split()))
            done_urls.add(article.url)
            skipped += 1
            continue

        # Rewrite with Claude
        audience = category_to_audience(article.category, source)
        rewritten = rewrite_with_claude(article.title, raw_text, audience)
        if not rewritten:
            logger.warning(" SKIP (claude failed)")
            done_urls.add(article.url)
            skipped += 1
            continue

        word_count = len(rewritten.split())
        logger.info(" OK: %dw | %s", word_count, article.title[:50])

        record = {
            "system_prompt": SYSTEM_PROMPT,
            "input_text": (
                f"Write a blog post on the following topic:\n\n"
                f"**Topic:** {article.title}\n\n"
                f"**Target audience:** {audience}\n\n"
                f"Remember: 700–1000 words, hook + technical sections + 3 takeaways. "
                f"Stay strictly on-topic. No filler. Start writing now."
            ),
            "output_text": rewritten,
            "meta": {
                "title": article.title,
                "source_url": article.url,
                "source": source,
                "category": article.category,
                "word_count": word_count,
                "quality": "external_rewritten",
                "weight": 1.5,
                "dataset_version": "v8",
            },
        }

        with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

        done_urls.add(article.url)
        save_progress(done_urls)
        saved += 1

        # Rate limiting — be a good citizen
        time.sleep(2)

    return saved, skipped


# ─── Entry point ──────────────────────────────────────────────────────────────

SOURCES = {
    "apnic": fetch_apnic,
    "ripe": fetch_ripe_labs,
    "potaroo": fetch_potaroo,
    "cloudflare": fetch_cloudflare,
    # "arin": fetch_arin_blog,  # No accessible RSS feed found
}


def main() -> None:
    """Parse CLI arguments, crawl the selected sources and report totals."""
    parser = argparse.ArgumentParser(
        description="Crawl external networking blogs for v8 training data")
    parser.add_argument("--source", choices=list(SOURCES.keys()) + ["all"], default="all",
                        help="Which source(s) to crawl (default: all)")
    parser.add_argument("--max", type=int, default=100,
                        help="Max articles per source (default: 100)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show URLs without downloading or rewriting")
    args = parser.parse_args()

    done_urls = load_progress()
    logger.info("Resuming: %d URLs already processed", len(done_urls))

    active_sources = list(SOURCES.keys()) if args.source == "all" else [args.source]
    # Filter only existing sources (arin removed)
    active_sources = [s for s in active_sources if s in SOURCES]

    total_saved = 0
    for source in active_sources:
        logger.info("=== Fetching article list: %s ===", source.upper())
        try:
            articles = SOURCES[source](args.max)
        except Exception as exc:
            # One broken source must not abort the remaining ones.
            logger.error("Failed to fetch %s articles: %s", source, exc)
            continue

        if not articles:
            logger.warning("No articles found for %s", source)
            continue

        saved, skipped = crawl_source(source, articles, done_urls, args.max, args.dry_run)
        logger.info("%s done: saved=%d skipped=%d", source.upper(), saved, skipped)
        total_saved += saved

    if not args.dry_run:
        total_lines = 0
        if OUTPUT_FILE.exists():
            with open(OUTPUT_FILE, encoding="utf-8") as f:
                total_lines = sum(1 for _ in f)
        logger.info("=== DONE: total saved=%d | output total=%d lines ===",
                    total_saved, total_lines)
        logger.info("Output: %s", OUTPUT_FILE)


if __name__ == "__main__":
    main()