#!/usr/bin/env python3
"""
PeerCortex Daily Accuracy Audit
================================
Runs at midnight via cron, audits a rotating batch of ASNs, and tracks
accuracy over time.

Compares PeerCortex data against authoritative sources:
  - RIPE Stat (prefixes, neighbours)
  - PeeringDB (IX presence, facilities)

Registry file: /opt/peercortex-app/audit/asn_registry.json
Reports dir:   /opt/peercortex-app/audit/reports/YYYY-MM-DD.json
Latest text:   /opt/peercortex-app/audit/latest_report.txt
"""

import datetime
import json
import os
import sys
import time
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

# ─── Directories ──────────────────────────────────────────────────────────────
AUDIT_DIR = Path("/opt/peercortex-app/audit")
REGISTRY = AUDIT_DIR / "asn_registry.json"
REPORTS_DIR = AUDIT_DIR / "reports"
LATEST_TXT = AUDIT_DIR / "latest_report.txt"
LOG_FILE = AUDIT_DIR / "audit.log"


# ─── Load .env (for cron compatibility — env vars may not be inherited) ───────
def _load_dotenv():
    """Copy KEY=VALUE pairs from the app's .env file into os.environ.

    Blank lines, ``#`` comments and lines without a key are skipped.
    Surrounding quotes are stripped from values. Variables that are already
    set in the environment are never overwritten.
    """
    env_path = Path("/opt/peercortex-app/.env")
    if not env_path.exists():
        return
    for line in env_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        idx = line.find("=")
        if idx < 1:  # no "=" at all, or an empty key
            continue
        key = line[:idx].strip()
        val = line[idx + 1:].strip().strip('"').strip("'")
        os.environ.setdefault(key, val)


_load_dotenv()

# ─── Config ───────────────────────────────────────────────────────────────────
PEERINGDB_KEY = os.environ.get("PEERINGDB_API_KEY", "")
PEERCORTEX_URL = "http://localhost:3101"  # local — no Cloudflare overhead
PDB_BASE = "https://www.peeringdb.com/api"
RIPE_BASE = "https://stat.ripe.net/data"

BATCH_SIZE = 100
TIMEOUT_PC = 90    # large networks (Cloudflare, Amazon) can take 60–80s
TIMEOUT_AUTH = 30
CONCURRENCY = 3    # parallel PeerCortex requests — keep low to avoid PDB hammering
                   # (each PC lookup triggers 3+ PDB calls internally)

# Tolerance for prefix/neighbour counts (BGP timing differences are normal)
PREFIX_TOL_PCT = 0.05  # 5%
PREFIX_TOL_ABS = 2     # absolute ±2
NEIGHBOUR_TOL = 0.25   # 25%
NEIGHBOUR_ABS = 5

# Failure "fields" that mean "no comparison happened" rather than "data differs".
_NON_DATA_FIELDS = ("TIMEOUT", "EXCEPTION")

# ─── Seed ASN list (100 well-known networks) ─────────────────────────────────
# Plain ASN integers; the category comments are informational only.
SEED_ASNS = [
    # Tier-1 / Global backbones
    174, 1239, 1299, 2914, 3257, 3320, 3356, 5511, 6461, 6762, 7018, 9002, 12956,
    # Hyperscalers
    714, 8075, 13335, 13414, 15169, 16509, 20940, 32934, 36459, 46489,
    # IXP / Route servers
    6777, 6939, 8283,
    # Regional ISPs
    6830, 3491, 2516, 4134, 4637, 4755, 4766, 9304, 9318, 7473,
    # Hobbyist / community
    34927, 50869, 59947, 199121, 206924, 211982, 212635, 213279, 215638,
    # European operators
    42476, 47541, 48821, 60610, 61955, 206479, 207841, 212232,
    # APAC
    4826, 7575, 7738, 9790, 17469, 17676, 23693, 24516, 38001, 38195,
    45090, 45177, 55720, 55803, 56041, 131072, 132602,
    # Americas
    10429, 22085, 27947, 28006, 52320, 61832, 265702, 267613, 269608,
    # Africa
    8346, 36874, 36924, 37100, 37239, 37271, 37468, 37662, 327786, 328474,
    # Middle East / Other
    135377, 140627, 394695, 397213, 400304, 401307,
    # Special / edge cases
    1, 64512, 65000, 0, 4294967295,
]


# ─── HTTP helpers ─────────────────────────────────────────────────────────────
def _fetch(url, timeout=30, headers=None):
    """GET ``url`` and return the parsed JSON object, or None on any error.

    HTTP error statuses (including 429) surface as ``HTTPError`` raised by
    ``urlopen`` and therefore end up in the ``except`` branch. A payload that
    parses but is not a JSON object is also reported as None, because every
    caller accesses the result with ``.get``.
    """
    try:
        req = urllib.request.Request(url, headers=headers or {})
        with urllib.request.urlopen(req, timeout=timeout) as r:
            payload = json.loads(r.read().decode("utf-8", errors="replace"))
    except Exception:
        # Deliberate best-effort: network, HTTP and decode errors all mean
        # "no data"; callers distinguish None from an empty answer.
        return None
    return payload if isinstance(payload, dict) else None


def _fetch_pdb(path, timeout=30, retries=2):
    """Fetch a PeeringDB API path, sending the API key if configured.

    Retries up to ``retries`` extra times on any failure (including 429).
    Returns the parsed JSON object, or None when every attempt failed.
    """
    headers = {}
    if PEERINGDB_KEY:
        headers["Authorization"] = "Api-Key " + PEERINGDB_KEY
    url = PDB_BASE + path
    for attempt in range(retries + 1):
        result = _fetch(url, timeout=timeout, headers=headers)
        if result is not None:
            return result
        if attempt < retries:
            time.sleep(1.5 * (attempt + 1))  # 1.5s, 3s backoff
    return None


def _fetch_ripe(endpoint, asn, timeout=30):
    """Fetch one RIPE Stat data endpoint for ``asn``; None on failure."""
    url = f"{RIPE_BASE}/{endpoint}/data.json?resource=AS{asn}"
    return _fetch(url, timeout=timeout)


def _fetch_pc(asn, timeout=90):
    """Fetch the PeerCortex lookup result for ``asn``; None on failure."""
    return _fetch(f"{PEERCORTEX_URL}/api/lookup?asn={asn}", timeout=timeout)


# ─── Registry helpers ─────────────────────────────────────────────────────────
def _load_registry():
    """Load the ASN registry, falling back to an empty one.

    An unreadable or corrupt file is reported on stderr instead of being
    silently discarded. The returned dict always has dict-valued ``"asns"``
    and ``"meta"`` keys, which ``main`` relies on.
    """
    reg = None
    if REGISTRY.exists():
        try:
            reg = json.loads(REGISTRY.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            print(
                f"WARNING: could not read registry {REGISTRY}: {exc} — starting fresh",
                file=sys.stderr,
            )
    if not isinstance(reg, dict):
        reg = {}
    if not isinstance(reg.get("asns"), dict):
        reg["asns"] = {}
    if not isinstance(reg.get("meta"), dict):
        reg["meta"] = {"created": _today(), "total_runs": 0}
    return reg


def _save_registry(reg):
    """Write the registry atomically (temp file + rename).

    A crash mid-write must not leave a truncated registry behind, because an
    unreadable registry makes the next run start without any history.
    """
    tmp_path = REGISTRY.with_name(REGISTRY.name + ".tmp")
    tmp_path.write_text(json.dumps(reg, indent=2), encoding="utf-8")
    os.replace(tmp_path, REGISTRY)


def _today():
    """Return today's local date as YYYY-MM-DD."""
    return datetime.date.today().isoformat()


def _now_iso():
    """Return the current UTC time as an ISO-8601 'Z' timestamp."""
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# ─── Batch selection (priority: errors > never audited > oldest) ──────────────
def _select_batch(reg, batch_size):
    """Pick up to ``batch_size`` ASNs to audit, as a list of ints.

    Side effect: every seed ASN missing from ``reg["asns"]`` is added with a
    blank entry. Ordering: ASNs with a failure streak first (longest streak
    first), then never-audited ASNs, then audited ones, oldest audit first.
    """
    entries = reg["asns"]

    # Ensure all seed ASNs are tracked
    for asn in SEED_ASNS:
        k = str(asn)
        if k not in entries:
            entries[k] = {
                "last_audited": None,
                "pass_count": 0,
                "error_count": 0,
                "consecutive_errors": 0,
                "peeringdb_absent": False,
            }

    # Sort into three buckets
    errored = sorted(
        [k for k, v in entries.items() if v.get("consecutive_errors", 0) > 0],
        key=lambda k: entries[k].get("consecutive_errors", 0),
        reverse=True,
    )
    errored_keys = set(errored)  # O(1) membership for the two filters below
    never = [
        k for k, v in entries.items()
        if not v.get("last_audited") and k not in errored_keys
    ]
    audited = sorted(
        [k for k, v in entries.items() if v.get("last_audited") and k not in errored_keys],
        key=lambda k: entries[k].get("last_audited", "9999"),
    )

    ordered = errored + never + audited
    return [int(k) for k in ordered[:batch_size]]


# ─── Authoritative data fetch ─────────────────────────────────────────────────
def _payload_items(payload, key=None):
    """Extract the item list from a fetched API payload.

    Returns None when the fetch failed (``payload`` is None), so callers can
    tell "unknown" apart from "zero items". With ``key`` the list is read
    from ``payload["data"][key]`` (RIPE Stat), otherwise from
    ``payload["data"]`` (PeeringDB). A missing or malformed list counts as
    empty.
    """
    if payload is None:
        return None
    data = payload.get("data")
    if key is not None:
        data = data.get(key) if isinstance(data, dict) else None
    return data if isinstance(data, list) else []


def _fetch_auth(asn):
    """Fetch authoritative data for one ASN from RIPE Stat + PeeringDB.

    Returns a dict with keys ``pdb_id``, ``pdb_present``, ``v4``, ``v6``,
    ``ix``, ``fac``, ``up``, ``dn``, ``ripe_ok``, ``pdb_ok``. A count is None
    when its source could not be fetched, so that an outage is not mistaken
    for a real value of 0.
    """
    # PeeringDB net lookup first (need net_id for IX/fac queries).
    # IMPORTANT: distinguish between:
    #   pdb_net is None        → fetch FAILED (rate limit / timeout) → pdb_present=unknown
    #   pdb_net["data"] == []  → ASN genuinely NOT in PeeringDB
    #   pdb_net["data"][0].id  → ASN IS in PeeringDB, use net_id
    pdb_net = _fetch_pdb(f"/net?asn={asn}", timeout=TIMEOUT_AUTH)
    pdb_fetch_ok = pdb_net is not None
    net = ((pdb_net or {}).get("data") or [{}])[0] or {}
    net_id = net.get("id")

    ix_path = (
        f"/netixlan?net_id={net_id}&limit=1000" if net_id
        else f"/netixlan?asn={asn}&limit=1000"
    )

    # RIPE Stat and the remaining PeeringDB queries run in parallel threads.
    with ThreadPoolExecutor(max_workers=4) as pool:
        f_pfx = pool.submit(_fetch_ripe, "announced-prefixes", asn, TIMEOUT_AUTH)
        f_nb = pool.submit(_fetch_ripe, "asn-neighbours", asn, TIMEOUT_AUTH)
        f_ix = pool.submit(_fetch_pdb, ix_path, TIMEOUT_AUTH)
        f_fac = (
            pool.submit(_fetch_pdb, f"/netfac?net_id={net_id}&limit=1000", TIMEOUT_AUTH)
            if net_id else None
        )
        ripe_pfx = f_pfx.result()
        ripe_nb = f_nb.result()
        pdb_ix = f_ix.result()
        pdb_fac = f_fac.result() if f_fac else None

    prefixes = _payload_items(ripe_pfx, "prefixes")
    v4 = v6 = None
    if prefixes is not None:
        v6 = sum(1 for p in prefixes if ":" in p.get("prefix", ""))
        v4 = len(prefixes) - v6

    neighbours = _payload_items(ripe_nb, "neighbours")
    up = dn = None
    if neighbours is not None:
        up = sum(1 for n in neighbours if n.get("type") == "left")
        dn = sum(1 for n in neighbours if n.get("type") == "right")

    ix_list = _payload_items(pdb_ix)
    ix_unique = None
    if ix_list is not None:
        ix_unique = len({c.get("ix_id") for c in ix_list if c.get("ix_id")})

    if net_id:
        fac_list = _payload_items(pdb_fac)
        fac_count = None if fac_list is None else len(fac_list)
    else:
        fac_count = 0  # no net record → facilities were never queried

    return {
        "pdb_id": net_id,
        # pdb_present: True only when we CONFIRMED the ASN is in PeeringDB (got data[0].id).
        #              False only when PeeringDB confirmed the ASN is NOT in PDB (data=[]).
        #              None when fetch failed — IX/fac discrepancies are not counted as PC errors.
        "pdb_present": True if net_id else (False if pdb_fetch_ok else None),
        "v4": v4,
        "v6": v6,
        "ix": ix_unique,
        "fac": fac_count,
        "up": up,
        "dn": dn,
        "ripe_ok": ripe_pfx is not None,
        "pdb_ok": pdb_fetch_ok,
    }


# ─── Field comparison ─────────────────────────────────────────────────────────
def _ok(auth_val, pc_val, pct=PREFIX_TOL_PCT, abs_tol=PREFIX_TOL_ABS):
    """True if pc_val is within tolerance of auth_val.

    A value of None on either side means "cannot compare" and counts as OK.
    Otherwise the values match when they differ by at most ``abs_tol`` or by
    at most the fraction ``pct`` of ``auth_val``.
    """
    if auth_val is None or pc_val is None:
        return True  # cannot compare — treat as OK
    if auth_val == 0 and pc_val == 0:
        return True
    if auth_val == 0:
        return pc_val <= abs_tol
    diff = abs(auth_val - pc_val)
    return diff <= abs_tol or (diff / auth_val) <= pct


def _compare(asn, auth, pc):
    """Return list of failure dicts for this ASN (empty list = pass).

    ``auth`` is the dict produced by ``_fetch_auth``; ``pc`` is the
    PeerCortex lookup response, or None when it could not be fetched, in
    which case a single ``TIMEOUT`` failure is returned. Each failure has the
    keys ``field``, ``auth``, ``pc`` and ``delta``.
    """
    if pc is None:
        return [{"field": "TIMEOUT", "auth": None, "pc": None, "delta": None}]

    failures = []

    pc_v4 = (pc.get("prefixes") or {}).get("ipv4")
    pc_v6 = (pc.get("prefixes") or {}).get("ipv6")
    pc_ix = (pc.get("ix_presence") or {}).get("unique_ixps")
    pc_fac = (pc.get("facilities") or {}).get("total")
    pc_up = (pc.get("neighbours") or {}).get("upstream_count")
    pc_dn = (pc.get("neighbours") or {}).get("downstream_count")

    def _fail(field, auth_val, pc_val):
        failures.append({
            "field": field,
            "auth": auth_val,
            "pc": pc_val,
            "delta": abs(auth_val - (pc_val or 0)),
        })

    if not _ok(auth["v4"], pc_v4):
        _fail("Prefixes v4", auth["v4"], pc_v4)
    if not _ok(auth["v6"], pc_v6):
        _fail("Prefixes v6", auth["v6"], pc_v6)

    # IXP / facility — only check when auth CONFIRMED ASN is in PeeringDB.
    # pdb_present=None means our fetch failed → skip to avoid false positives.
    # A None count means that one query failed, so it is skipped as well.
    if auth["pdb_present"] is True:
        if auth["ix"] is not None and auth["ix"] != pc_ix:
            _fail("IXPs", auth["ix"], pc_ix)
        if auth["fac"] is not None and auth["fac"] != pc_fac:
            _fail("Facilities", auth["fac"], pc_fac)

    if not _ok(auth["up"], pc_up, pct=NEIGHBOUR_TOL, abs_tol=NEIGHBOUR_ABS):
        _fail("Neighbours (upstream)", auth["up"], pc_up)
    if not _ok(auth["dn"], pc_dn, pct=NEIGHBOUR_TOL, abs_tol=NEIGHBOUR_ABS):
        _fail("Neighbours (downstream)", auth["dn"], pc_dn)

    return failures


# ─── Audit one ASN ───────────────────────────────────────────────────────────
def _audit_asn(asn):
    """Audit one ASN and return its result dict.

    Fetches the authoritative data and the PeerCortex answer, compares them,
    and returns the keys ``asn``, ``auth``, ``pc_name``, ``pc_ok``,
    ``pdb_absent``, ``pdb_unknown``, ``failures`` and ``passed``.
    """
    auth = _fetch_auth(asn)
    pc = _fetch_pc(asn, timeout=TIMEOUT_PC)

    # Self-heal attempt: if PeerCortex did not answer at all, wait briefly
    # and retry once before recording a TIMEOUT.
    if pc is None:
        time.sleep(2)
        pc = _fetch_pc(asn, timeout=TIMEOUT_PC)

    failures = _compare(asn, auth, pc)

    # pdb_present=False → confirmed NOT in PDB → "[no PDB — correct]"
    # pdb_present=None  → fetch failed → "[PDB fetch failed]"
    # pdb_present=True  → confirmed in PDB
    pdb_state = auth["pdb_present"]
    return {
        "asn": asn,
        "auth": auth,
        "pc_name": ((pc or {}).get("network") or {}).get("name", ""),
        "pc_ok": pc is not None,
        "pdb_absent": pdb_state is False,   # confirmed not in PDB
        "pdb_unknown": pdb_state is None,   # fetch error — skip IX/fac check
        "failures": failures,
        "passed": len(failures) == 0,
    }


# ─── Known issue tracking ────────────────────────────────────────────────────
def _issue_description(field, auth_val, pc_val):
    """Generate a human-readable description of the data discrepancy."""
    if field == "TIMEOUT":
        return "PeerCortex did not respond within timeout — server may be overloaded"
    if auth_val is not None and pc_val is not None:
        if auth_val > 0 and (pc_val or 0) == 0:
            return (
                f"PeerCortex returns 0 but authoritative source shows {auth_val}. "
                f"Likely cause: PeeringDB lookup failing in server.js for this ASN "
                f"(net_id resolution or netixlan/netfac query failing)."
            )
        if (auth_val or 0) == 0 and pc_val > 0:
            return (
                f"PeerCortex returns {pc_val} but authoritative source shows 0. "
                f"Likely cause: PeerCortex using stale cached data or querying wrong endpoint."
            )
        delta = abs(auth_val - pc_val)
        pct = round(delta / max(auth_val, 1) * 100)
        return (
            f"Mismatch: PeerCortex={pc_val}, authoritative={auth_val} "
            f"(delta={delta}, {pct}% off). Exceeds tolerance."
        )
    return f"Comparison not possible (auth={auth_val}, pc={pc_val})"


def _update_known_issues(entry, failures, date):
    """
    Update the known_issues dict for an ASN registry entry (in place).

    A known issue is created when a field fails while the ASN is on a streak
    of 2+ consecutive failing runs. It stays open (status='open') until the
    field passes in a later run where a comparison actually took place, and
    it is re-opened if the field fails again afterwards.
    When the ASN fully passes, all known_issues are cleared in the caller.
    """
    if not failures:
        return

    # Only promote to known_issue after 2+ consecutive failures
    # (consecutive_errors was already incremented by caller)
    if entry.get("consecutive_errors", 0) < 2:
        return

    data_failures = [
        f for f in failures
        if f.get("field") and f["field"] not in _NON_DATA_FIELDS
    ]
    if not data_failures:
        # TIMEOUT / EXCEPTION only: nothing was compared, so this run is no
        # evidence that an open issue has been fixed.
        return

    known = entry.setdefault("known_issues", {})

    for f in data_failures:
        field = f["field"]
        description = _issue_description(field, f.get("auth"), f.get("pc"))
        issue = known.get(field)
        if issue is None:
            # Create new known issue
            known[field] = {
                "field": field,
                "first_seen": date,
                "last_seen": date,
                "occurrences": 1,
                "status": "open",
                "last_auth": f.get("auth"),
                "last_pc": f.get("pc"),
                "description": description,
            }
        else:
            # Update existing issue; a previously resolved one is re-opened.
            issue["last_seen"] = date
            issue["occurrences"] = issue.get("occurrences", 1) + 1
            issue["last_auth"] = f.get("auth")
            issue["last_pc"] = f.get("pc")
            issue["description"] = description
            issue["status"] = "open"
            issue.pop("fixed_on", None)

    # Mark issues for fields that are now passing as resolved (partial fix).
    # Already-resolved issues keep their original fixed_on date.
    failing_fields = {f["field"] for f in data_failures}
    for field, issue in known.items():
        if field not in failing_fields and issue.get("status") != "resolved":
            issue["status"] = "resolved"
            issue["fixed_on"] = date


# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Run one audit batch, update the registry, and write the reports.

    Returns the accuracy of this run as an integer percentage (0 when the
    batch is empty).
    """
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    reg = _load_registry()
    date = _today()
    run_ts = _now_iso()
    prev_accuracy = reg["meta"].get("last_accuracy_pct")

    batch = _select_batch(reg, BATCH_SIZE)

    header = (
        f"\n{'='*60}\n"
        f"PeerCortex Daily Audit — {date} ({run_ts})\n"
        f"{'='*60}\n"
        f"Batch: {len(batch)} ASNs | "
        f"PDB key: {'ACTIVE' if PEERINGDB_KEY else 'MISSING — rate limits likely!'}\n"
    )
    print(header)

    results = []
    with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
        # Stagger submissions by 2s so PeerCortex's internal PDB requests
        # don't all fire simultaneously (9+ concurrent PDB calls → rate limit).
        futures = {}
        for idx, asn in enumerate(batch):
            if idx > 0:
                time.sleep(2)
            futures[pool.submit(_audit_asn, asn)] = asn

        for i, future in enumerate(as_completed(futures), 1):
            asn = futures[future]
            # Keep the try body minimal so a formatting error below can never
            # add a second (error) result for an ASN that already has one.
            try:
                r = future.result()
            except Exception as e:
                err_r = {
                    "asn": asn,
                    "pc_ok": False,
                    "pdb_absent": False,
                    "failures": [{"field": "EXCEPTION", "auth": None, "pc": None,
                                  "delta": None, "msg": str(e)}],
                    "passed": False,
                    "auth": {},
                    "pc_name": "",
                }
                results.append(err_r)
                print(f" [{i:3d}/{len(batch)}] AS{asn:<12} ERROR {e}")
                continue

            results.append(r)
            status = "✓" if r["passed"] else f"✗ {len(r['failures'])}"
            if r.get("pdb_absent"):
                pdb_note = " [no PDB — correct]"
            elif r.get("pdb_unknown"):
                pdb_note = " [PDB fetch failed — IX/fac skipped]"
            else:
                pdb_note = ""
            fail_note = ""
            if r["failures"] and r["failures"][0].get("field") != "TIMEOUT":
                top = r["failures"][0]
                fail_note = f" → {top['field']}: auth={top['auth']} pc={top['pc']}"
            print(f" [{i:3d}/{len(batch)}] AS{asn:<12} {status}{pdb_note}{fail_note}")

    # ── Update registry ───────────────────────────────────────────────────────
    for r in results:
        k = str(r["asn"])
        entry = reg["asns"].setdefault(k, {
            "pass_count": 0,
            "error_count": 0,
            "consecutive_errors": 0,
            "peeringdb_absent": False,
            "last_audited": None,
        })
        entry["last_audited"] = date

        auth = r.get("auth") or {}

        # Only update peeringdb_absent when we have a confirmed answer. An
        # exception result carries no auth data and must not overwrite it.
        if auth.get("pdb_present") is not None:
            entry["peeringdb_absent"] = r.get("pdb_absent", False)

        if r["passed"]:
            entry["pass_count"] = entry.get("pass_count", 0) + 1
            entry["consecutive_errors"] = 0
            entry["last_status"] = "pass"
            # Clear all known_issues when ASN fully passes
            entry.pop("known_issues", None)
        else:
            entry["error_count"] = entry.get("error_count", 0) + 1
            entry["consecutive_errors"] = entry.get("consecutive_errors", 0) + 1
            entry["last_status"] = "fail"
            # ── Known issues: track persistent failures (2+ consecutive runs) ──
            _update_known_issues(entry, r["failures"], date)

        entry["last_failures"] = r["failures"]

        # Auth source meta
        if auth.get("pdb_id"):
            entry["peeringdb_id"] = auth["pdb_id"]
        if r.get("pc_name"):
            entry["name"] = r["pc_name"]

    total = len(results)
    passed = sum(1 for r in results if r["passed"])
    failed = total - passed
    no_pdb = sum(1 for r in results if r.get("pdb_absent"))
    pdb_fail = sum(1 for r in results if r.get("pdb_unknown"))
    accuracy = round(passed / total * 100) if total else 0

    reg["meta"]["last_run"] = run_ts
    reg["meta"]["last_accuracy_pct"] = accuracy
    reg["meta"]["total_runs"] = reg["meta"].get("total_runs", 0) + 1
    reg["meta"]["total_asns"] = len(reg["asns"])
    _save_registry(reg)

    # ── Build report ──────────────────────────────────────────────────────────
    all_failures = [
        {"asn": r["asn"], **f}
        for r in results
        for f in r["failures"]
        if f.get("field") not in _NON_DATA_FIELDS
    ]
    all_failures.sort(key=lambda x: x.get("delta") or 0, reverse=True)

    timeouts = [r["asn"] for r in results if not r["pc_ok"]]

    trend = ""
    if prev_accuracy is not None:
        diff = accuracy - prev_accuracy
        trend = f" Trend : {prev_accuracy}% → {accuracy}% ({diff:+d}%)\n"

    summary_lines = [
        f"\n{'='*60}",
        f"AUDIT SUMMARY — {date}",
        f"{'='*60}",
        f" Audited : {total} ASNs",
        f" Passed : {passed} ({accuracy}%)",
        f" Failed : {failed}",
        f" No PDB : {no_pdb} (fac=0 ix=0 is CORRECT — not an error)",
        f" PDB err : {pdb_fail} (IX/fac skipped — PDB fetch failed, will retry next run)",
        f" PDB Key : {'Active (no rate limits)' if PEERINGDB_KEY else 'MISSING — configure PEERINGDB_API_KEY!'}",
    ]
    if trend:
        summary_lines.append(trend.rstrip())
    if timeouts:
        summary_lines.append(f"\n Timeouts: AS{', AS'.join(str(a) for a in timeouts)}")
    summary_lines.append("")

    if all_failures:
        summary_lines.append("TOP DISCREPANCIES:")
        summary_lines.append(
            f" {'ASN':<12} {'Field':<24} {'Auth':>8} {'PeerCortex':>12} {'Delta':>8}"
        )
        summary_lines.append(" " + "-"*66)
        for f in all_failures[:20]:
            summary_lines.append(
                f" AS{f['asn']:<10} {f['field']:<24} {str(f['auth']):>8} {str(f['pc']):>12} {str(f.get('delta','')):>8}"
            )
    else:
        summary_lines.append("No discrepancies found — 100% accurate!")

    # PeeringDB-absent note
    absent_asns = [r["asn"] for r in results if r["pdb_absent"] and r["passed"]]
    if absent_asns:
        summary_lines.append(
            f"\nASNs not in PeeringDB (fac=0, ix=0 correct):\n"
            f" {', '.join('AS'+str(a) for a in sorted(absent_asns))}"
        )

    # ── Known issues across entire registry (all ASNs, not just this batch) ───
    all_entries = reg["asns"]
    open_issues = {
        k: v for k, v in all_entries.items()
        if v.get("known_issues") and any(
            i.get("status") == "open" for i in v["known_issues"].values()
        )
    }
    if open_issues:
        summary_lines.append(
            f"\n{'!'*60}\n"
            f"KNOWN ISSUES ({len(open_issues)} ASNs with persistent failures)\n"
            f"These remain until the data is correct in PeerCortex.\n"
            f"{'!'*60}"
        )
        for k in sorted(open_issues, key=lambda x: int(x)):
            v = all_entries[k]
            name = v.get("name", "")
            streak = v.get("consecutive_errors", 0)
            issues = {
                fld: i for fld, i in v["known_issues"].items()
                if i.get("status") == "open"
            }
            summary_lines.append(
                f"\n AS{k} {name}"
                f" [OPEN — {streak} consecutive failures, "
                f"first seen: {next(iter(issues.values()))['first_seen']}]"
            )
            for fld, i in issues.items():
                summary_lines.append(f" ▸ {fld}:")
                summary_lines.append(f" {i['description']}")
                summary_lines.append(
                    f" auth={i['last_auth']} pc={i['last_pc']} "
                    f"seen {i['occurrences']}x last: {i['last_seen']}"
                )
        summary_lines.append("")
    else:
        summary_lines.append("\nNo persistent known issues — all data is consistent.")

    # Overall DB health
    ever_failed = sum(1 for v in all_entries.values() if v.get("error_count", 0) > 0)
    clean_streak = sum(
        1 for v in all_entries.values()
        if v.get("consecutive_errors", 0) == 0 and v.get("last_audited")
    )
    summary_lines += [
        "\nDATABASE HEALTH:",
        f" Total tracked ASNs : {len(all_entries)}",
        f" Clean streak : {clean_streak} ASNs with 0 consecutive errors",
        f" Open known issues : {len(open_issues)} ASNs",
        f" Ever had errors : {ever_failed} ASNs",
        f"\nReport: {REPORTS_DIR}/{date}.json",
    ]

    summary = "\n".join(summary_lines)
    print(summary)

    # Save text report (explicit UTF-8: the text contains non-ASCII symbols)
    LATEST_TXT.write_text(header + summary, encoding="utf-8")

    # Save JSON report
    report = {
        "date": date,
        "run_ts": run_ts,
        "batch_size": total,
        "passed": passed,
        "failed": failed,
        "pdb_absent": no_pdb,
        "accuracy_pct": accuracy,
        "pdb_key_active": bool(PEERINGDB_KEY),
        "known_issues_registry": {
            k: v["known_issues"] for k, v in open_issues.items()
        },
        "results": [
            {
                "asn": r["asn"],
                "name": r.get("pc_name", ""),
                "pdb_absent": r.get("pdb_absent", False),
                "pdb_unknown": r.get("pdb_unknown", False),
                "passed": r["passed"],
                "failures": r["failures"],
                "known_issues": all_entries.get(str(r["asn"]), {}).get("known_issues"),
                "auth": {
                    fk: fv for fk, fv in (r.get("auth") or {}).items()
                    if fk not in ("pdb_ok", "ripe_ok")
                },
            }
            for r in results
        ],
    }
    (REPORTS_DIR / f"{date}.json").write_text(
        json.dumps(report, indent=2), encoding="utf-8"
    )

    return accuracy


if __name__ == "__main__":
    acc = main()
    # Exit 0 if ≥90% accurate, 1 otherwise (cron can alert on non-zero exit)
    sys.exit(0 if acc >= 90 else 1)