#!/usr/bin/env python3 """ ingest_ripe_apnic.py — Convert raw RIPE/APNIC/NANOG content into v7 training format Reads raw content JSONL files (from previous collection sessions and Fearghas NAS) and converts them into proper SFT training examples with the anchored v7 system prompt. For each raw content item: 1. Uses Claude subprocess to expand the raw summary into a full 700-1000w blog post 2. Saves in {system_prompt, input_text, output_text} format Input files (auto-discovered): ~/transceiver-training-data/nanog-ripe-labs-content.jsonl — 34 entries ~/transceiver-training-data/rir-infrastructure-data.jsonl — 150 entries [Fearghas NAS mount]/training-data/*.jsonl — (when mounted) Output: ~/transceiver-training-data/v7-ripe-apnic-sft.jsonl Usage: python3 scripts/ingest_ripe_apnic.py python3 scripts/ingest_ripe_apnic.py --nas-path /Volumes/Fearghas/training-data python3 scripts/ingest_ripe_apnic.py --dry-run """ from __future__ import annotations import argparse import json import logging import subprocess import time from pathlib import Path logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) logger = logging.getLogger(__name__) # ─── Paths ──────────────────────────────────────────────────────────────────── TRAINING_DATA_DIR = Path.home() / "transceiver-training-data" OUTPUT_FILE = TRAINING_DATA_DIR / "v7-ripe-apnic-sft.jsonl" PROGRESS_FILE = TRAINING_DATA_DIR / "v7-ripe-apnic-progress.json" LOCAL_RAW_FILES = [ TRAINING_DATA_DIR / "nanog-ripe-labs-content.jsonl", TRAINING_DATA_DIR / "rir-infrastructure-data.jsonl", ] # ─── System prompt (same anchored v7 prompt) ────────────────────────────────── SYSTEM_PROMPT = """You are an expert technical writer specializing in optical networking, transceiver technology, and network infrastructure. STRICT CONSTRAINTS — Follow exactly, no exceptions: - LENGTH: 700–1000 words. Count carefully. Stop at 1000 words maximum. - STRUCTURE (mandatory, in this order): 1. HOOK paragraph — 2–3 sentences stating the problem this post addresses 2. Technical sections — 3–4 H2 sections covering the topic in depth 3. PRACTICAL TAKEAWAYS — exactly 3 bullet points, actionable - TOPIC DISCIPLINE: Write ONLY about the exact topic requested. Zero drift. - NO REPETITION: Every sentence must add new information. No restating. - VOICE: Confident, direct. No hedging phrases like "it's worth noting". - AUDIENCE: Network engineers and IT professionals. Assume technical fluency. - FORMAT: Markdown. Use ## for section headers. Use **bold** for key terms. Do not summarize what you are about to write. Start with the hook directly.""" def build_expansion_prompt(raw: dict) -> tuple[str, str] | None: """ Build a user prompt to expand raw content into a full blog post. Returns (input_text_for_training, expansion_prompt_for_claude) or None if skipped. """ # Support both raw content schemas title = raw.get("title", "").strip() content = raw.get("content", "").strip() category = raw.get("category", raw.get("rir", "networking")).strip() source = raw.get("source", raw.get("rir", "unknown")).strip() if not title or not content or len(content) < 50: return None # Derive audience from source/category audience = "network engineers and infrastructure operators" if any(k in category.lower() for k in ["ipv6", "bgp", "rpki", "routing", "rir", "dns"]): audience = "network engineers and NOC operators" elif any(k in category.lower() for k in ["market", "business", "vendor", "price"]): audience = "IT decision makers, procurement, and network architects" input_text = ( f"Write a blog post on the following topic:\n\n" f"**Topic:** {title}\n\n" f"**Target audience:** {audience}\n\n" f"Remember: 700–1000 words, hook + technical sections + 3 takeaways. " f"Stay strictly on-topic. No filler. Start writing now." ) # The expansion prompt also includes the raw content as context for Claude expansion_prompt = ( f"Write a blog post on the following topic:\n\n" f"**Topic:** {title}\n\n" f"**Target audience:** {audience}\n\n" f"**Background context (do NOT copy verbatim, use as technical grounding):**\n" f"{content[:800]}\n\n" f"Remember: 700–1000 words, hook + technical sections + 3 takeaways. " f"Stay strictly on-topic. No filler. Start writing now." ) return input_text, expansion_prompt def call_claude(system: str, user_prompt: str, timeout: int = 180) -> str | None: """Call claude --print subprocess.""" try: result = subprocess.run( ["claude", "--print", "--system-prompt", system, "-p", user_prompt], capture_output=True, text=True, timeout=timeout, ) if result.returncode != 0: logger.warning("claude error (rc=%d): %s", result.returncode, result.stderr[:200]) return None output = result.stdout.strip() return output if output else None except subprocess.TimeoutExpired: logger.warning("claude timed out") return None except Exception as exc: logger.warning("claude error: %s", exc) return None def load_raw_from_file(path: Path) -> list[dict]: """Load all entries from a raw JSONL file.""" if not path.exists(): logger.warning("File not found: %s", path) return [] items = [] with open(path, encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: items.append(json.loads(line)) except json.JSONDecodeError: pass logger.info("Loaded %d entries from %s", len(items), path.name) return items def load_raw_from_dir(nas_path: Path) -> list[dict]: """ Load JSON/JSONL files from a NAS directory (tashi-crawler format). Handles: - tashi-crawler/2026-03-06/ripe/ripe_data.json → RIPE ASN/RIR data - tashi-crawler/2026-03-06/nog/nog_all.json → NOG community data - tashi-crawler/2026-03-06/bgp/bgp_*.json → BGP hourly snapshots - Any JSONL in training format (pass-through) """ items: list[dict] = [] # ── tashi-crawler detection ──────────────────────────────────────────── crawler_dirs = list(nas_path.glob("tashi-crawler/**")) if crawler_dirs or (nas_path / "ripe").exists() or (nas_path / "nog").exists(): items.extend(_load_tashi_crawler(nas_path)) return items # ── Generic JSONL pass-through ───────────────────────────────────────── jsonl_files = list(nas_path.glob("**/*.jsonl")) logger.info("Found %d JSONL files on NAS at %s", len(jsonl_files), nas_path) for fpath in jsonl_files: logger.info("Loading NAS file: %s", fpath.name) raw = load_raw_from_file(fpath) for item in raw: if "system_prompt" in item and "input_text" in item and "output_text" in item: if item.get("output_text") and len(item["output_text"].split()) > 100: item["system_prompt"] = SYSTEM_PROMPT item["meta"] = {**item.get("meta", {}), "source": "fearghas-nas", "dataset_version": "v7"} items.append({"_direct": True, **item}) continue if not item.get("title") and not item.get("name"): continue if not item.get("title"): item["title"] = item.get("name", "") if not item.get("content"): item["content"] = item.get("body", item.get("text", item.get("summary", ""))) items.append(item) return items def _load_tashi_crawler(base_path: Path) -> list[dict]: """Extract training topics from tashi-crawler data on Fearghas NAS.""" items: list[dict] = [] # Walk from base_path — tashi-crawler may be at base_path or a subdir crawler_root = base_path if (base_path / "tashi-crawler").exists(): # Find the latest date directory date_dirs = sorted((base_path / "tashi-crawler").glob("*/")) crawler_root = date_dirs[-1] if date_dirs else base_path / "tashi-crawler" logger.info("Loading tashi-crawler data from: %s", crawler_root) # ── RIPE data → ASN and RIR training topics ─────────────────────────── ripe_file = crawler_root / "ripe" / "ripe_data.json" if ripe_file.exists(): logger.info("Processing RIPE data: %s", ripe_file) try: with open(ripe_file, encoding="utf-8") as f: ripe = json.load(f) # Generate topics from sample ASNs sample_asns = ripe.get("sample_asns", []) for asn_data in sample_asns: overview = asn_data.get("overview", {}) prefixes = asn_data.get("prefixes", {}) asn = overview.get("asn", "") holder = overview.get("holder", "") if not asn or not holder: continue prefix_count = prefixes.get("prefix_count", 0) items.append({ "title": f"RIPE NCC and the Internet Routing Registry: ASN Allocation and BGP Routing", "content": ( f"Real routing data from {asn} ({holder}): " f"{prefix_count} announced prefixes. " f"RIPE NCC manages IP address allocation and AS number registration " f"for Europe, the Middle East, and Central Asia. " f"Understanding how ASNs are allocated and how BGP routing works is " f"fundamental to network operations." ), "category": "ripe-rir", "source": "fearghas-tashi-ripe", }) # RIR summary → generate blog topics per RIR rir_summary = ripe.get("rir_delegated_summary", {}) for rir_name, rir_data in rir_summary.items(): if not isinstance(rir_data, dict): continue items.append({ "title": f"{rir_name} Internet Resource Delegation: IPv4, IPv6, and ASN Statistics", "content": ( f"{rir_name} is one of the five Regional Internet Registries. " f"RIR delegation data shows the distribution of IP addresses and AS numbers " f"across the registry's service region. " f"This data is critical for understanding internet growth, IPv4 exhaustion, " f"and the pace of IPv6 adoption in each region." ), "category": "rir-statistics", "source": "fearghas-tashi-ripe", }) logger.info("RIPE: extracted %d topics", len(items)) except Exception as exc: logger.warning("RIPE data parse error: %s", exc) # ── NOG data → NOG community training topics ────────────────────────── nog_file = crawler_root / "nog" / "nog_all.json" if nog_file.exists(): logger.info("Processing NOG data: %s", nog_file) try: with open(nog_file, encoding="utf-8") as f: nog_all = json.load(f) results = nog_all.get("results", []) for nog in results: nog_name = nog.get("nog", "") region = nog.get("region", "") url = nog.get("url", "") events = nog.get("events", []) plinks = nog.get("presentation_links", []) if not nog_name: continue content = ( f"{nog_name} is the Network Operators Group for {region}. " f"NOGs are professional communities where network engineers share operational " f"knowledge, present technical case studies, and discuss emerging challenges. " f"{nog_name} meets {len(events) if events else 'periodically'} times per year " f"and features presentations on routing, security, optical networking, and more." ) items.append({ "title": f"{nog_name}: The Network Operators Group for {region}", "content": content, "category": "nog-community", "source": "fearghas-tashi-nog", }) logger.info("NOG: extracted %d topics", len(results)) except Exception as exc: logger.warning("NOG data parse error: %s", exc) # ── BGP snapshots → BGP operational topics ──────────────────────────── bgp_files = sorted(crawler_root.glob("bgp/bgp_*.json")) if bgp_files: logger.info("Processing %d BGP snapshot files", len(bgp_files)) # Use first file to extract structural context (they're all similar) try: with open(bgp_files[0], encoding="utf-8") as f: bgp = json.load(f) summary = bgp.get("summary", {}) items.append({ "title": "RouteViews and RIPE RIS: How BGP Routing Tables Are Monitored", "content": ( f"RouteViews and RIPE RIS collect BGP routing tables from collectors worldwide. " f"These systems monitor {summary.get('collectors_queried', 'multiple')} collectors " f"with {summary.get('total_update_files_listed', 'thousands of')} update files daily. " f"This data is essential for routing security research, anomaly detection, " f"and understanding global internet routing behavior." ), "category": "bgp-monitoring", "source": "fearghas-tashi-bgp", }) items.append({ "title": "BGP Table Size and Internet Growth: What the Data Tells Us", "content": ( "The global BGP routing table has grown steadily since the early days of the internet. " "Route collectors at RouteViews and RIPE RIS track MRT dumps and UPDATE messages " "in real-time. Understanding BGP table growth is critical for router memory planning, " "route reflector design, and understanding the impact of IPv4 deaggregation." ), "category": "bgp-routing-table", "source": "fearghas-tashi-bgp", }) except Exception as exc: logger.warning("BGP data parse error: %s", exc) logger.info("tashi-crawler total topics extracted: %d", len(items)) return items def load_progress() -> set[str]: if not PROGRESS_FILE.exists(): return set() try: with open(PROGRESS_FILE) as f: return set(json.load(f).get("completed", [])) except Exception: return set() def save_progress(completed: set[str]) -> None: with open(PROGRESS_FILE, "w") as f: json.dump({"completed": sorted(completed)}, f) def make_key(item: dict, idx: int) -> str: title = item.get("title", "") return f"{idx}:{title[:60]}" def process(nas_path: Path | None = None, dry_run: bool = False) -> None: TRAINING_DATA_DIR.mkdir(parents=True, exist_ok=True) # Collect raw items all_raw: list[dict] = [] for fpath in LOCAL_RAW_FILES: all_raw.extend(load_raw_from_file(fpath)) if nas_path and nas_path.exists(): logger.info("Loading NAS data from: %s", nas_path) all_raw.extend(load_raw_from_dir(nas_path)) elif nas_path: logger.warning("NAS path not accessible: %s — skipping", nas_path) logger.info("Total raw items to process: %d", len(all_raw)) if dry_run: print(f"\nDRY RUN: {len(all_raw)} items found") for i, item in enumerate(all_raw[:10]): print(f" [{i:03d}] {'[DIRECT]' if item.get('_direct') else '[EXPAND]'} {item.get('title', '?')[:70]}") if len(all_raw) > 10: print(f" ... and {len(all_raw) - 10} more") return completed = load_progress() stats = {"direct": 0, "expanded": 0, "skipped": 0, "failed": 0} with open(OUTPUT_FILE, "a", encoding="utf-8") as out_f: for idx, item in enumerate(all_raw): key = make_key(item, idx) if key in completed: stats["skipped"] += 1 continue # Direct pass-through (already formatted, from NAS) if item.get("_direct"): record = {k: v for k, v in item.items() if k != "_direct"} out_f.write(json.dumps(record, ensure_ascii=False) + "\n") out_f.flush() completed.add(key) save_progress(completed) stats["direct"] += 1 logger.info("[%03d] DIRECT: %s", idx, item.get("title", "")[:60]) continue # Expand raw content via Claude result = build_expansion_prompt(item) if result is None: logger.info("[%03d] SKIP (no usable content): %s", idx, item.get("title", "")[:60]) stats["skipped"] += 1 completed.add(key) save_progress(completed) continue input_text, expansion_prompt = result logger.info("[%03d] Expanding: %s", idx, item.get("title", "")[:60]) output_text = call_claude(SYSTEM_PROMPT, expansion_prompt) if output_text is None: logger.warning("[%03d] FAILED: %s", idx, item.get("title", "")[:60]) stats["failed"] += 1 time.sleep(5) continue word_count = len(output_text.split()) logger.info("[%03d] OK: %d words", idx, word_count) record = { "system_prompt": SYSTEM_PROMPT, "input_text": input_text, "output_text": output_text, "meta": { "title": item.get("title", ""), "source": item.get("source", item.get("rir", "ripe-apnic")), "category": item.get("category", ""), "word_count": word_count, "raw_content_len": len(item.get("content", "")), "generated_by": "claude-code-subprocess", "dataset_version": "v7", }, } out_f.write(json.dumps(record, ensure_ascii=False) + "\n") out_f.flush() completed.add(key) save_progress(completed) stats["expanded"] += 1 time.sleep(2) logger.info("Done: direct=%d expanded=%d skipped=%d failed=%d", stats["direct"], stats["expanded"], stats["skipped"], stats["failed"]) logger.info("Output: %s", OUTPUT_FILE) def main() -> None: parser = argparse.ArgumentParser(description="Ingest RIPE/APNIC raw content into v7 training format") parser.add_argument("--nas-path", type=Path, default=None, help="Path to Fearghas NAS training data directory (e.g. /Volumes/Fearghas/training-data)") parser.add_argument("--dry-run", action="store_true", help="List items without generating") args = parser.parse_args() process(nas_path=args.nas_path, dry_run=args.dry_run) if __name__ == "__main__": main()