PeerCortex/audit/audit.py
Rene Fichtmueller 9bc1292bac fix: add rate-limiting semaphores to audit script
The audit script was flooding RIPE Stat and PeeringDB with unthrottled
parallel requests, causing 429 rate-limits that resulted in auth=0
false negatives (inflating the failure count).

Changes:
- Added threading.Semaphore for RIPE Stat (max 3) and PeeringDB (max 2)
- Added retry logic to _fetch_ripe (was fire-and-forget)
- Increased PDB retries from 2 to 3 with longer backoff (2s, 4s, 6s)
- Increased ASN stagger from 2s to 3s

Results: Accuracy 84% -> 87% (trend: 77% -> 87%, +10%)
2026-03-30 07:22:09 +02:00

665 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
PeerCortex Daily Accuracy Audit
================================
Runs at midnight via cron, audits a rotating batch of ASNs, and tracks
accuracy over time. Compares PeerCortex data against authoritative sources:
- RIPE Stat (prefixes, neighbours)
- PeeringDB (IX presence, facilities)
Registry file: /opt/peercortex-app/audit/asn_registry.json
Reports dir: /opt/peercortex-app/audit/reports/YYYY-MM-DD.json
Latest text: /opt/peercortex-app/audit/latest_report.txt
"""
import json, os, sys, time, datetime, urllib.request, urllib.error, threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
# ─── Directories ──────────────────────────────────────────────────────────────
# Every file the audit reads or writes lives under AUDIT_DIR.
AUDIT_DIR = Path("/opt/peercortex-app/audit")
REGISTRY = AUDIT_DIR / "asn_registry.json"  # per-ASN audit history, rewritten by _save_registry()
REPORTS_DIR = AUDIT_DIR / "reports"  # main() writes one <date>.json report here per run
LATEST_TXT = AUDIT_DIR / "latest_report.txt"  # text report of the most recent run
LOG_FILE = AUDIT_DIR / "audit.log"  # not referenced anywhere else in the visible code
# ─── Load .env (for cron compatibility — env vars may not be inherited) ───────
def _load_dotenv():
    """Copy KEY=VALUE pairs from /opt/peercortex-app/.env into os.environ.

    Does nothing when the file is missing. Blank lines, lines starting with
    ``#`` and lines with no ``=`` (or nothing before it) are skipped. Double
    quotes and then single quotes are stripped from both ends of the value.
    Variables that are already set in the environment are never overwritten.
    """
    dotenv_file = Path("/opt/peercortex-app/.env")
    if not dotenv_file.exists():
        return
    for raw_line in dotenv_file.read_text().splitlines():
        assignment = raw_line.strip()
        if assignment == "" or assignment.startswith("#"):
            continue
        name, separator, value = assignment.partition("=")
        # No "=" at all, or an empty name in front of it: not an assignment.
        if not separator or not name:
            continue
        cleaned = value.strip().strip('"').strip("'")
        # setdefault keeps whatever the real environment already provides.
        os.environ.setdefault(name.strip(), cleaned)
_load_dotenv()
# ─── Config ───────────────────────────────────────────────────────────────────
# Read after _load_dotenv() so a key that is only present in .env is picked up.
PEERINGDB_KEY = os.environ.get("PEERINGDB_API_KEY", "")  # "" → PeeringDB requests are sent without an API key
PEERCORTEX_URL = "http://localhost:3101"  # local — no Cloudflare overhead
PDB_BASE = "https://www.peeringdb.com/api"
RIPE_BASE = "https://stat.ripe.net/data"
BATCH_SIZE = 100  # upper bound on the number of ASNs audited per run
TIMEOUT_PC = 90  # large networks (Cloudflare, Amazon) can take 60-80s
TIMEOUT_AUTH = 30  # timeout passed to every RIPE Stat / PeeringDB request
CONCURRENCY = 3  # parallel ASN audits (one PeerCortex request each) — keep low to avoid PDB hammering
# (each PC lookup triggers 3+ PDB calls internally)
# Tolerance for prefix/neighbour counts (BGP timing differences are normal)
PREFIX_TOL_PCT = 0.05  # 5%
PREFIX_TOL_ABS = 2  # absolute ±2
NEIGHBOUR_TOL = 0.25  # 25%
NEIGHBOUR_ABS = 5  # absolute ±5
# ─── Seed ASN list (100 well-known networks) ─────────────────────────────────
# Plain integer ASNs. The category comments below only group the list for
# readers; _select_batch() adds every entry to the registry before it picks
# a batch.
SEED_ASNS = [
    # Tier-1 / Global backbones
    174, 1239, 1299, 2914, 3257, 3320, 3356, 5511, 6461, 6762,
    7018, 9002, 12956,
    # Hyperscalers
    714, 8075, 13335, 13414, 15169, 16509, 20940, 32934, 36459, 46489,
    # IXP / Route servers
    6777, 6939, 8283,
    # Regional ISPs
    6830, 3491, 2516, 4134, 4637, 4755, 4766, 9304, 9318, 7473,
    # Hobbyist / community
    34927, 50869, 59947, 199121, 206924, 211982, 212635, 213279, 215638,
    # European operators
    42476, 47541, 48821, 60610, 61955, 206479, 207841, 212232,
    # APAC
    4826, 7575, 7738, 9790, 17469, 17676, 23693, 24516, 38001, 38195,
    45090, 45177, 55720, 55803, 56041, 131072, 132602,
    # Americas
    10429, 22085, 27947, 28006, 52320, 61832, 265702, 267613, 269608,
    # Africa
    8346, 36874, 36924, 37100, 37239, 37271, 37468, 37662, 327786, 328474,
    # Middle East / Other
    135377, 140627, 394695, 397213, 400304, 401307,
    # Special / edge cases
    1, 64512, 65000, 0, 4294967295,
]
# ─── Rate-limiting semaphores ────────────────────────────────────────────────
# Prevents 429 errors from RIPE Stat and PeeringDB by limiting concurrent requests.
# Without this, the audit floods both APIs and gets rate-limited, causing auth=0
# false negatives that inflate the failure count.
# The limits apply to _fetch_ripe() and _fetch_pdb() only; _fetch_pc() does not
# go through either semaphore.
_ripe_sem = threading.Semaphore(3)  # max 3 concurrent RIPE Stat requests
_pdb_sem = threading.Semaphore(2)  # max 2 concurrent PeeringDB requests
# ─── HTTP helpers ─────────────────────────────────────────────────────────────
def _fetch(url, timeout=30, headers=None):
    """Issue a GET for *url* and return the decoded JSON body.

    Returns None on any failure: an unusable URL, a network or HTTP error, a
    timeout, a 429 status, or a body that is not valid JSON. Callers therefore
    only ever have to test the result for None.
    """
    request_headers = headers if headers else {}
    try:
        request = urllib.request.Request(url, headers=request_headers)
        with urllib.request.urlopen(request, timeout=timeout) as response:
            # Explicit rate-limit check, kept in case a 429 is ever returned
            # as a normal response instead of being raised.
            if response.status == 429:
                return None
            raw_body = response.read()
        # Undecodable bytes are replaced rather than failing the whole fetch.
        return json.loads(raw_body.decode("utf-8", errors="replace"))
    except Exception:
        # Deliberately best-effort: every failure collapses to None.
        return None
def _fetch_pdb(path, timeout=30, retries=3):
    """GET a PeeringDB API path, throttled by ``_pdb_sem``, retrying on failure.

    When PEERINGDB_KEY is set it is sent as an ``Authorization: Api-Key ...``
    header. Any failed attempt (``_fetch`` returned None, whatever the cause)
    is retried up to *retries* more times, sleeping 2s, 4s, 6s, ... between
    attempts.

    Returns the parsed JSON, or None once every attempt has failed.
    """
    auth_headers = {"Authorization": "Api-Key " + PEERINGDB_KEY} if PEERINGDB_KEY else {}
    target = PDB_BASE + path
    total_attempts = retries + 1
    for attempt_no in range(1, total_attempts + 1):
        # The semaphore is held for the request only, never while backing off.
        with _pdb_sem:
            payload = _fetch(target, timeout=timeout, headers=auth_headers)
        if payload is not None:
            return payload
        if attempt_no < total_attempts:
            time.sleep(2.0 * attempt_no)  # 2s, 4s, 6s backoff
    return None
def _fetch_ripe(endpoint, asn, timeout=30, retries=2):
    """GET a RIPE Stat data endpoint for ``AS<asn>``, throttled by ``_ripe_sem``.

    A failed attempt (``_fetch`` returned None) is retried up to *retries* more
    times, sleeping 1.5s, 3s, ... between attempts.

    Returns the parsed JSON, or None once every attempt has failed.
    """
    target = f"{RIPE_BASE}/{endpoint}/data.json?resource=AS{asn}"
    attempts_made = 0
    while attempts_made <= retries:
        # The semaphore is held for the request only, never while backing off.
        with _ripe_sem:
            payload = _fetch(target, timeout=timeout)
        if payload is not None:
            return payload
        attempts_made += 1
        if attempts_made <= retries:
            time.sleep(1.5 * attempts_made)  # 1.5s, 3s backoff
    return None
def _fetch_pc(asn, timeout=90):
    """Query the PeerCortex lookup API for *asn*; parsed JSON, or None on failure."""
    lookup_url = "{}/api/lookup?asn={}".format(PEERCORTEX_URL, asn)
    return _fetch(lookup_url, timeout=timeout)
# ─── Registry helpers ─────────────────────────────────────────────────────────
def _load_registry():
    """Load the ASN registry from REGISTRY, or return a fresh empty one.

    A fresh registry (``{"asns": {}, "meta": {...}}``) is returned when the
    file does not exist, cannot be read, is not valid JSON, or does not hold a
    JSON object. Unlike a silent fallback, an unreadable file is reported on
    stderr, because the next save will replace it and the audit history in it
    is lost.

    A loaded registry is guaranteed to have dict-valued ``"asns"`` and
    ``"meta"`` entries, which the rest of the script indexes directly.
    """
    fresh = {"asns": {}, "meta": {"created": _today(), "total_runs": 0}}
    if not REGISTRY.exists():
        return fresh
    try:
        loaded = json.loads(REGISTRY.read_text())
    except (OSError, ValueError) as exc:
        # ValueError covers both invalid JSON and undecodable bytes.
        print(f"WARNING: cannot load registry {REGISTRY}: {exc} -- starting empty",
              file=sys.stderr)
        return fresh
    if not isinstance(loaded, dict):
        print(f"WARNING: registry {REGISTRY} is not a JSON object -- starting empty",
              file=sys.stderr)
        return fresh
    # Repair missing or mistyped top-level sections instead of crashing later.
    if not isinstance(loaded.get("asns"), dict):
        loaded["asns"] = {}
    if not isinstance(loaded.get("meta"), dict):
        loaded["meta"] = fresh["meta"]
    return loaded
def _save_registry(reg):
    """Persist *reg* to REGISTRY as indented JSON, replacing the file atomically.

    The JSON is written to a sibling ``<name>.tmp`` file first and then moved
    over REGISTRY with ``os.replace``. An interrupted run therefore leaves the
    previous registry intact instead of a truncated file that the loader would
    discard.
    """
    payload = json.dumps(reg, indent=2)
    tmp_path = REGISTRY.with_name(REGISTRY.name + ".tmp")
    tmp_path.write_text(payload)
    os.replace(tmp_path, REGISTRY)
def _today():
    """Return today's local date as an ISO ``YYYY-MM-DD`` string."""
    current_date = datetime.date.today()
    return current_date.isoformat()
def _now_iso():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
# ─── Batch selection (priority: errors > never audited > oldest) ──────────────
def _select_batch(reg, batch_size):
    """Pick the next ASNs to audit, most urgent first.

    Every seed ASN missing from ``reg["asns"]`` is first added with a blank
    history (this mutates *reg*). The registry is then split into three
    groups, concatenated in this order:

    1. ASNs with ``consecutive_errors > 0``, longest error streak first;
    2. ASNs that have never been audited, in registry order;
    3. the remaining ASNs, oldest ``last_audited`` first.

    Args:
        reg: Registry dict with an ``"asns"`` mapping keyed by ASN string.
        batch_size: Maximum number of ASNs to return.

    Returns:
        A list of at most *batch_size* ASNs as ints.
    """
    entries = reg["asns"]
    # Ensure all seed ASNs are tracked
    for asn in SEED_ASNS:
        entries.setdefault(str(asn), {
            "last_audited": None,
            "pass_count": 0,
            "error_count": 0,
            "consecutive_errors": 0,
            "peeringdb_absent": False,
        })

    # One pass assigns each ASN to exactly one bucket, so no bucket needs to be
    # searched again to exclude members of another.
    errored, never, audited = [], [], []
    for key, entry in entries.items():
        if entry.get("consecutive_errors", 0) > 0:
            errored.append(key)
        elif entry.get("last_audited"):
            audited.append(key)
        else:
            never.append(key)

    # Both sorts are stable, so ties keep registry order.
    errored.sort(key=lambda k: entries[k].get("consecutive_errors", 0), reverse=True)
    audited.sort(key=lambda k: entries[k]["last_audited"])

    ordered = errored + never + audited
    return [int(k) for k in ordered[:batch_size]]
# ─── Authoritative data fetch ─────────────────────────────────────────────────
def _fetch_auth(asn):
    """Fetch authoritative data for one ASN from RIPE Stat + PeeringDB.

    The PeeringDB ``/net`` lookup runs first because its ``id`` is needed for
    the IX and facility queries. The two RIPE Stat requests and the PeeringDB
    IX/facility requests then run in parallel (throttled by the semaphores
    inside ``_fetch_ripe`` / ``_fetch_pdb``).

    A fetch that FAILED is never reported as a count of 0:

    * ``v4``/``v6`` are None when the announced-prefixes fetch failed and
      ``up``/``dn`` are None when the asn-neighbours fetch failed, so the
      tolerance check treats them as "cannot compare";
    * ``pdb_present`` is None when the ``/net`` lookup failed, or when the ASN
      is in PeeringDB but the netixlan/netfac request failed, so IX/facility
      discrepancies are skipped rather than compared against 0.

    Returns:
        dict with keys ``pdb_id``, ``pdb_present`` (True / False / None),
        ``v4``, ``v6``, ``ix``, ``fac``, ``up``, ``dn``, ``ripe_ok`` and
        ``pdb_ok``.
    """
    # PeeringDB net lookup first (need net_id for IX/fac queries).
    # IMPORTANT: distinguish between:
    #   pdb_net is None       → fetch FAILED (rate limit / timeout) → unknown
    #   pdb_net["data"] == [] → ASN genuinely NOT in PeeringDB
    #   pdb_net["data"][0].id → ASN IS in PeeringDB, use net_id
    pdb_net = _fetch_pdb(f"/net?asn={asn}", timeout=TIMEOUT_AUTH)
    pdb_fetch_ok = pdb_net is not None
    net = ((pdb_net or {}).get("data") or [{}])[0]
    net_id = net.get("id")

    ix_path = (f"/netixlan?net_id={net_id}&limit=1000" if net_id
               else f"/netixlan?asn={asn}&limit=1000")

    # RIPE Stat + PDB IX/Fac (run in parallel via threads, throttled by semaphores)
    with ThreadPoolExecutor(max_workers=4) as pool:
        f_pfx = pool.submit(_fetch_ripe, "announced-prefixes", asn, TIMEOUT_AUTH)
        f_nb = pool.submit(_fetch_ripe, "asn-neighbours", asn, TIMEOUT_AUTH)
        f_ix = pool.submit(_fetch_pdb, ix_path, TIMEOUT_AUTH)
        # Facilities can only be queried by net_id.
        f_fac = (pool.submit(_fetch_pdb, f"/netfac?net_id={net_id}&limit=1000",
                             TIMEOUT_AUTH)
                 if net_id else None)
        ripe_pfx = f_pfx.result()
        ripe_nb = f_nb.result()
        pdb_ix = f_ix.result()
        pdb_fac = f_fac.result() if f_fac else None

    # Prefix counts: a prefix containing ":" is counted as IPv6, anything else as IPv4.
    if ripe_pfx is None:
        v4 = v6 = None
    else:
        prefixes = (ripe_pfx.get("data") or {}).get("prefixes") or []
        v6 = sum(1 for p in prefixes if ":" in p.get("prefix", ""))
        v4 = len(prefixes) - v6

    # Neighbour counts: type "left" is counted as upstream, "right" as downstream.
    if ripe_nb is None:
        up = dn = None
    else:
        neighbours = (ripe_nb.get("data") or {}).get("neighbours") or []
        up = sum(1 for n in neighbours if n.get("type") == "left")
        dn = sum(1 for n in neighbours if n.get("type") == "right")

    ix_list = (pdb_ix or {}).get("data") or []
    ix_unique = len({c.get("ix_id") for c in ix_list if c.get("ix_id")})
    fac_count = len((pdb_fac or {}).get("data") or [])

    # pdb_present: True only when the ASN is confirmed in PeeringDB AND its
    # IX/facility data was actually fetched; False only when PeeringDB confirmed
    # the ASN is absent (data=[]); None whenever a required fetch failed.
    if not pdb_fetch_ok:
        pdb_present = None
    elif not net_id:
        pdb_present = False
    elif pdb_ix is None or pdb_fac is None:
        pdb_present = None
    else:
        pdb_present = True

    return {
        "pdb_id": net_id,
        "pdb_present": pdb_present,
        "v4": v4, "v6": v6,
        "ix": ix_unique, "fac": fac_count,
        "up": up, "dn": dn,
        "ripe_ok": ripe_pfx is not None,
        "pdb_ok": pdb_fetch_ok,
    }
# ─── Field comparison ─────────────────────────────────────────────────────────
def _ok(auth_val, pc_val, pct=PREFIX_TOL_PCT, abs_tol=PREFIX_TOL_ABS):
    """Tell whether *pc_val* is acceptably close to *auth_val*.

    A value passes when the absolute difference is at most *abs_tol* or the
    difference relative to *auth_val* is at most *pct*. If either value is
    None the pair cannot be compared and is treated as passing.
    """
    cannot_compare = auth_val is None or pc_val is None
    if cannot_compare:
        return True
    if auth_val == 0:
        # No relative error exists against zero: an exact match passes,
        # otherwise only the absolute bound applies.
        return pc_val == 0 or pc_val <= abs_tol
    gap = abs(auth_val - pc_val)
    if gap <= abs_tol:
        return True
    return (gap / auth_val) <= pct
def _compare(asn, auth, pc):
    """Return list of failure dicts for this ASN.

    Args:
        asn: The ASN being compared (not used in the comparison itself).
        auth: Authoritative counts as returned by ``_fetch_auth``.
        pc: Parsed PeerCortex response, or None when PeerCortex did not answer.

    Returns:
        A list of ``{"field", "auth", "pc", "delta"}`` dicts, one per field
        that disagrees, in the order: prefixes v4, prefixes v6, IXPs,
        facilities, upstream neighbours, downstream neighbours. A single
        ``TIMEOUT`` entry is returned when *pc* is None; an empty list means
        everything matched.
    """
    if pc is None:
        return [{"field": "TIMEOUT", "auth": None, "pc": None, "delta": None}]

    failures = []

    def pc_value(section, key):
        # Missing or null sections read as "no value" rather than raising.
        return (pc.get(section) or {}).get(key)

    def record(field, auth_val, pc_val):
        failures.append({"field": field, "auth": auth_val, "pc": pc_val,
                         "delta": abs(auth_val - (pc_val or 0))})

    # Prefix counts are compared with the default (prefix) tolerance.
    for field, auth_key, pc_key in (("Prefixes v4", "v4", "ipv4"),
                                    ("Prefixes v6", "v6", "ipv6")):
        pc_val = pc_value("prefixes", pc_key)
        if not _ok(auth[auth_key], pc_val):
            record(field, auth[auth_key], pc_val)

    # IXP / facility — only check when auth CONFIRMED ASN is in PeeringDB.
    # pdb_present=None means our fetch failed → skip to avoid false positives.
    # These two must match exactly; no tolerance is applied.
    if auth["pdb_present"] is True:
        for field, auth_key, section, pc_key in (
                ("IXPs", "ix", "ix_presence", "unique_ixps"),
                ("Facilities", "fac", "facilities", "total")):
            pc_val = pc_value(section, pc_key)
            if auth[auth_key] != pc_val:
                record(field, auth[auth_key], pc_val)

    # Neighbour counts use the wider neighbour tolerance.
    for field, auth_key, pc_key in (
            ("Neighbours (upstream)", "up", "upstream_count"),
            ("Neighbours (downstream)", "dn", "downstream_count")):
        pc_val = pc_value("neighbours", pc_key)
        if not _ok(auth[auth_key], pc_val, pct=NEIGHBOUR_TOL, abs_tol=NEIGHBOUR_ABS):
            record(field, auth[auth_key], pc_val)

    return failures
# ─── Audit one ASN ───────────────────────────────────────────────────────────
def _audit_asn(asn):
    """Audit one ASN and return its result record.

    Fetches the authoritative data, then the PeerCortex data, and compares the
    two. If PeerCortex returns nothing, the lookup is retried once after a
    2-second pause before the ASN is reported as a timeout.

    Returns:
        dict with ``asn``, ``auth``, ``pc_name``, ``pc_ok``, ``pdb_absent``,
        ``pdb_unknown``, ``failures`` and ``passed``.
    """
    auth = _fetch_auth(asn)
    pc = _fetch_pc(asn, timeout=TIMEOUT_PC)
    if pc is None:
        # Single retry: gives a briefly unavailable PeerCortex a second chance.
        time.sleep(2)
        pc = _fetch_pc(asn, timeout=TIMEOUT_PC)
    problems = _compare(asn, auth, pc)
    # pdb_present=False → confirmed NOT in PDB
    # pdb_present=None  → fetch failed
    # pdb_present=True  → confirmed in PDB
    pdb_state = auth["pdb_present"]
    network_info = (pc or {}).get("network") or {}
    return dict(
        asn=asn,
        auth=auth,
        pc_name=network_info.get("name", ""),
        pc_ok=pc is not None,
        pdb_absent=pdb_state is False,  # confirmed not in PDB
        pdb_unknown=pdb_state is None,  # fetch error — skip IX/fac check
        failures=problems,
        passed=len(problems) == 0,
    )
# ─── Known issue tracking ────────────────────────────────────────────────────
def _issue_description(field, auth_val, pc_val):
    """Build the human-readable explanation for one failing field.

    Distinguishes four situations: a PeerCortex timeout, PeerCortex reporting
    nothing where the authoritative source has data, PeerCortex reporting data
    where the authoritative source has none, and a plain numeric mismatch.
    When either value is None the text only states that no comparison was
    possible.
    """
    if field == "TIMEOUT":
        return "PeerCortex did not respond within timeout — server may be overloaded"
    if auth_val is None or pc_val is None:
        return f"Comparison not possible (auth={auth_val}, pc={pc_val})"
    if auth_val > 0 and not pc_val:
        return (
            f"PeerCortex returns 0 but authoritative source shows {auth_val}. "
            f"Likely cause: PeeringDB lookup failing in server.js for this ASN "
            f"(net_id resolution or netixlan/netfac query failing)."
        )
    if not auth_val and pc_val > 0:
        return (
            f"PeerCortex returns {pc_val} but authoritative source shows 0. "
            f"Likely cause: PeerCortex using stale cached data or querying wrong endpoint."
        )
    delta = abs(auth_val - pc_val)
    # max(..., 1) avoids dividing by zero for a zero authoritative value.
    pct = round(delta / max(auth_val, 1) * 100)
    return (
        f"Mismatch: PeerCortex={pc_val}, authoritative={auth_val} "
        f"(delta={delta}, {pct}% off). Exceeds tolerance."
    )
def _update_known_issues(entry, failures, date):
    """
    Update the known_issues dict for an ASN registry entry.

    A known issue is created when a field fails while the ASN has 2+
    consecutive failing runs (consecutive_errors was already incremented by
    the caller). It stays open (status='open') until the field passes in a
    later run, and is reopened if the field fails again after that. When the
    ASN fully passes, all known_issues are cleared in the caller.

    TIMEOUT / EXCEPTION failures never create issues. A run that contains one
    compared nothing, so it does not mark any existing issue as resolved.

    Args:
        entry: Registry entry for one ASN; mutated in place.
        failures: Failure dicts for this run, as produced by the comparison.
        date: Run date string stored in first_seen / last_seen / fixed_on.
    """
    if not failures:
        return
    # Only promote to known_issue after 2+ consecutive failures
    if entry.get("consecutive_errors", 0) < 2:
        return

    non_data = ("TIMEOUT", "EXCEPTION")
    data_failures = [f for f in failures
                     if f.get("field") and f.get("field") not in non_data]
    # True only when every failure comes from an actual field comparison.
    fields_were_compared = all(f.get("field") not in non_data for f in failures)

    known = entry.setdefault("known_issues", {})
    for f in data_failures:
        field = f["field"]
        description = _issue_description(field, f.get("auth"), f.get("pc"))
        issue = known.get(field)
        if issue is None:
            # Create new known issue
            known[field] = {
                "field": field,
                "first_seen": date,
                "last_seen": date,
                "occurrences": 1,
                "status": "open",
                "last_auth": f.get("auth"),
                "last_pc": f.get("pc"),
                "description": description,
            }
            continue
        # Update existing issue
        issue["last_seen"] = date
        issue["occurrences"] = issue.get("occurrences", 1) + 1
        issue["last_auth"] = f.get("auth")
        issue["last_pc"] = f.get("pc")
        issue["description"] = description
        if issue.get("status") != "open":
            # The field is failing again: a previously resolved issue is live.
            issue["status"] = "open"
            issue.pop("fixed_on", None)

    if not fields_were_compared:
        return

    # Resolve issues for fields that are now passing (partial fix). Issues that
    # are already resolved keep the date on which they were first fixed.
    failing_fields = {f["field"] for f in data_failures}
    for field, issue in known.items():
        if field not in failing_fields and issue.get("status") != "resolved":
            issue["status"] = "resolved"
            issue["fixed_on"] = date
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Run one audit pass over the next batch of ASNs and write the reports.

    Steps: select a batch from the registry, audit each ASN on a small thread
    pool, fold the results back into the registry and save it, then print a
    text summary and write it to LATEST_TXT together with a JSON report at
    REPORTS_DIR/<date>.json.

    Returns:
        The share of audited ASNs that passed, as a whole-number percentage
        (0 when nothing was audited).
    """
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    reg = _load_registry()
    date = _today()
    run_ts = _now_iso()
    prev_accuracy = reg["meta"].get("last_accuracy_pct")
    batch = _select_batch(reg, BATCH_SIZE)
    header = (
        f"\n{'='*60}\n"
        f"PeerCortex Daily Audit — {date} ({run_ts})\n"
        f"{'='*60}\n"
        f"Batch: {len(batch)} ASNs | "
        f"PDB key: {'ACTIVE' if PEERINGDB_KEY else 'MISSING — rate limits likely!'}\n"
    )
    print(header)
    results = []
    with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
        # Stagger submissions by 3s so PeerCortex's internal PDB requests
        # don't all fire simultaneously (semaphore limits concurrent API calls,
        # but staggering avoids burst pressure on rate-limit windows).
        futures = {}
        for idx, asn in enumerate(batch):
            if idx > 0:
                time.sleep(3)
            futures[pool.submit(_audit_asn, asn)] = asn
        for i, future in enumerate(as_completed(futures), 1):
            asn = futures[future]
            try:
                r = future.result()
                results.append(r)
                status = "" if r["passed"] else f"{len(r['failures'])}"
                if r.get("pdb_absent"):
                    pdb_note = " [no PDB — correct]"
                elif r.get("pdb_unknown"):
                    pdb_note = " [PDB fetch failed — IX/fac skipped]"
                else:
                    pdb_note = ""
                fail_note = ""
                if r["failures"] and r["failures"][0].get("field") != "TIMEOUT":
                    top = r["failures"][0]
                    fail_note = f"{top['field']}: auth={top['auth']} pc={top['pc']}"
                print(f" [{i:3d}/{len(batch)}] AS{asn:<12} {status}{pdb_note}{fail_note}")
            except Exception as e:
                # One broken ASN must not abort the run: record it as a failure.
                err_r = {"asn": asn, "pc_ok": False, "pdb_absent": False,
                         "failures": [{"field": "EXCEPTION", "auth": None,
                                       "pc": None, "delta": None, "msg": str(e)}],
                         "passed": False, "auth": {}, "pc_name": ""}
                results.append(err_r)
                print(f" [{i:3d}/{len(batch)}] AS{asn:<12} ERROR {e}")
    # ── Update registry ───────────────────────────────────────────────────────
    for r in results:
        k = str(r["asn"])
        entry = reg["asns"].setdefault(k, {
            "pass_count": 0, "error_count": 0, "consecutive_errors": 0,
            "peeringdb_absent": False, "last_audited": None,
        })
        entry["last_audited"] = date
        auth = r.get("auth") or {}
        # Only update peeringdb_absent when we have a confirmed answer. A result
        # synthesised from an exception carries no auth data at all, so it says
        # nothing about PeeringDB and must not reset the stored flag.
        if auth and not r.get("pdb_unknown"):
            entry["peeringdb_absent"] = r.get("pdb_absent", False)
        if r["passed"]:
            entry["pass_count"] = entry.get("pass_count", 0) + 1
            entry["consecutive_errors"] = 0
            entry["last_status"] = "pass"
            # Clear all known_issues when ASN fully passes
            entry.pop("known_issues", None)
        else:
            entry["error_count"] = entry.get("error_count", 0) + 1
            entry["consecutive_errors"] = entry.get("consecutive_errors", 0) + 1
            entry["last_status"] = "fail"
            # ── Known issues: track persistent failures (2+ consecutive runs) ──
            _update_known_issues(entry, r["failures"], date)
        # NOTE(review): kept at loop level so a passing run resets the stored
        # failures to []; nesting was ambiguous in the reviewed copy — confirm.
        entry["last_failures"] = r["failures"]
        # Auth source meta
        if auth.get("pdb_id"):
            entry["peeringdb_id"] = auth["pdb_id"]
        if r.get("pc_name"):
            entry["name"] = r["pc_name"]
    total = len(results)
    passed = sum(1 for r in results if r["passed"])
    failed = total - passed
    no_pdb = sum(1 for r in results if r.get("pdb_absent"))
    pdb_fail = sum(1 for r in results if r.get("pdb_unknown"))
    accuracy = round(passed / total * 100) if total else 0
    reg["meta"]["last_run"] = run_ts
    reg["meta"]["last_accuracy_pct"] = accuracy
    reg["meta"]["total_runs"] = reg["meta"].get("total_runs", 0) + 1
    reg["meta"]["total_asns"] = len(reg["asns"])
    _save_registry(reg)
    # ── Build report ──────────────────────────────────────────────────────────
    # Data discrepancies only; timeouts and exceptions are listed separately.
    all_failures = [
        {"asn": r["asn"], **f}
        for r in results
        for f in r["failures"]
        if f.get("field") not in ("TIMEOUT", "EXCEPTION")
    ]
    all_failures.sort(key=lambda x: x.get("delta") or 0, reverse=True)
    timeouts = [r["asn"] for r in results if not r["pc_ok"]]
    trend = ""
    if prev_accuracy is not None:
        diff = accuracy - prev_accuracy
        trend = f" Trend : {prev_accuracy}% → {accuracy}% ({diff:+d}%)\n"
    summary_lines = [
        f"\n{'='*60}",
        f"AUDIT SUMMARY — {date}",
        f"{'='*60}",
        f" Audited : {total} ASNs",
        f" Passed : {passed} ({accuracy}%)",
        f" Failed : {failed}",
        f" No PDB : {no_pdb} (fac=0 ix=0 is CORRECT — not an error)",
        f" PDB err : {pdb_fail} (IX/fac skipped — PDB fetch failed, will retry next run)",
        f" PDB Key : {'Active (no rate limits)' if PEERINGDB_KEY else 'MISSING — configure PEERINGDB_API_KEY!'}",
    ]
    if trend:
        summary_lines.append(trend.rstrip())
    if timeouts:
        summary_lines.append(f"\n Timeouts: AS{', AS'.join(str(a) for a in timeouts)}")
    summary_lines.append("")
    if all_failures:
        summary_lines.append("TOP DISCREPANCIES:")
        summary_lines.append(f" {'ASN':<12} {'Field':<24} {'Auth':>8} {'PeerCortex':>12} {'Delta':>8}")
        summary_lines.append(" " + "-"*66)
        # Already sorted by delta, so this is the 20 largest discrepancies.
        for f in all_failures[:20]:
            summary_lines.append(
                f" AS{f['asn']:<10} {f['field']:<24} {str(f['auth']):>8} {str(f['pc']):>12} {str(f.get('delta','')):>8}"
            )
    else:
        summary_lines.append("No discrepancies found — 100% accurate!")
    # PeeringDB-absent note
    absent_asns = [r["asn"] for r in results if r["pdb_absent"] and r["passed"]]
    if absent_asns:
        summary_lines.append(
            f"\nASNs not in PeeringDB (fac=0, ix=0 correct):\n"
            f" {', '.join('AS'+str(a) for a in sorted(absent_asns))}"
        )
    # ── Known issues across entire registry (all ASNs, not just this batch) ───
    all_entries = reg["asns"]
    open_issues = {
        k: v for k, v in all_entries.items()
        if v.get("known_issues") and
        any(i.get("status") == "open" for i in v["known_issues"].values())
    }
    if open_issues:
        summary_lines.append(
            f"\n{'!'*60}\n"
            f"KNOWN ISSUES ({len(open_issues)} ASNs with persistent failures)\n"
            f"These remain until the data is correct in PeerCortex.\n"
            f"{'!'*60}"
        )
        for k in sorted(open_issues, key=lambda x: int(x)):
            v = all_entries[k]
            name = v.get("name", "")
            streak = v.get("consecutive_errors", 0)
            issues = {fld: i for fld, i in v["known_issues"].items()
                      if i.get("status") == "open"}
            summary_lines.append(
                f"\n AS{k} {name}"
                f" [OPEN — {streak} consecutive failures, "
                f"first seen: {next(iter(issues.values()))['first_seen']}]"
            )
            for fld, i in issues.items():
                summary_lines.append(f"{fld}:")
                summary_lines.append(f" {i['description']}")
                summary_lines.append(
                    f" auth={i['last_auth']} pc={i['last_pc']} "
                    f"seen {i['occurrences']}x last: {i['last_seen']}"
                )
        # NOTE(review): placed once after the whole section; the nesting of this
        # blank-line append was ambiguous in the reviewed copy — confirm.
        summary_lines.append("")
    else:
        summary_lines.append("\nNo persistent known issues — all data is consistent.")
    # Overall DB health
    ever_failed = sum(1 for v in all_entries.values() if v.get("error_count", 0) > 0)
    clean_streak = sum(1 for v in all_entries.values()
                       if v.get("consecutive_errors", 0) == 0 and v.get("last_audited"))
    summary_lines += [
        f"\nDATABASE HEALTH:",
        f" Total tracked ASNs : {len(all_entries)}",
        f" Clean streak : {clean_streak} ASNs with 0 consecutive errors",
        f" Open known issues : {len(open_issues)} ASNs",
        f" Ever had errors : {ever_failed} ASNs",
        f"\nReport: {REPORTS_DIR}/{date}.json",
    ]
    summary = "\n".join(summary_lines)
    print(summary)
    # Save text report. The text contains non-ASCII characters, so the encoding
    # is pinned instead of depending on the locale the cron job runs under.
    LATEST_TXT.write_text(header + summary, encoding="utf-8")
    # Save JSON report
    report = {
        "date": date,
        "run_ts": run_ts,
        "batch_size": total,
        "passed": passed,
        "failed": failed,
        "pdb_absent": no_pdb,
        "accuracy_pct": accuracy,
        "pdb_key_active": bool(PEERINGDB_KEY),
        "known_issues_registry": {
            k: v["known_issues"]
            for k, v in all_entries.items()
            if v.get("known_issues") and
            any(i.get("status") == "open" for i in v["known_issues"].values())
        },
        "results": [
            {
                "asn": r["asn"],
                "name": r.get("pc_name", ""),
                "pdb_absent": r.get("pdb_absent", False),
                "pdb_unknown": r.get("pdb_unknown", False),
                "passed": r["passed"],
                "failures": r["failures"],
                "known_issues": all_entries.get(str(r["asn"]), {}).get("known_issues"),
                # Internal fetch-status flags are left out of the report.
                "auth": {fk: fv for fk, fv in (r.get("auth") or {}).items()
                         if fk not in ("pdb_ok", "ripe_ok")},
            }
            for r in results
        ],
    }
    (REPORTS_DIR / f"{date}.json").write_text(json.dumps(report, indent=2),
                                              encoding="utf-8")
    return accuracy
if __name__ == "__main__":
    # Exit status 0 at 90% accuracy or better, 1 below that, so cron can alert
    # on a non-zero exit.
    accuracy_pct = main()
    exit_code = 0 if accuracy_pct >= 90 else 1
    sys.exit(exit_code)