diff --git a/audit/audit.py b/audit/audit.py
index cc76070..9d06522 100644
--- a/audit/audit.py
+++ b/audit/audit.py
@@ -12,7 +12,7 @@
 Reports dir: /opt/peercortex-app/audit/reports/YYYY-MM-DD.json
 Latest text: /opt/peercortex-app/audit/latest_report.txt
 """
-import json, os, sys, time, datetime, urllib.request, urllib.error
+import json, os, sys, time, datetime, urllib.request, urllib.error, threading
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path
 
@@ -89,6 +89,13 @@ SEED_ASNS = [
     1, 64512, 65000, 0, 4294967295,
 ]
 
+# ─── Rate-limiting semaphores ────────────────────────────────────────────────
+# Cap in-flight requests to RIPE Stat and PeeringDB. An unthrottled audit
+# floods both APIs and gets HTTP 429s, which show up as auth=0 false
+# negatives and inflate the failure count.
+_ripe_sem = threading.Semaphore(3)  # max 3 concurrent RIPE Stat requests
+_pdb_sem = threading.Semaphore(2)  # max 2 concurrent PeeringDB requests
+
 # ─── HTTP helpers ─────────────────────────────────────────────────────────────
 def _fetch(url, timeout=30, headers=None):
     """GET url → parsed JSON dict, or None on any error."""
@@ -101,24 +108,44 @@ def _fetch(url, timeout=30, headers=None):
     except Exception:
         return None
 
-def _fetch_pdb(path, timeout=30, retries=2):
-    """Fetch PeeringDB with API key and retry on 429 / failures."""
+def _fetch_pdb(path, timeout=30, retries=3):
+    """Fetch a PeeringDB path, throttled by ``_pdb_sem``, retrying on failure.
+
+    ``_fetch`` returns None for every error (HTTP 429 included), so each None
+    result is retried up to ``retries`` times with a linear 2s/4s/6s backoff.
+    Returns the parsed JSON, or None once all attempts have failed.
+    """
     headers = {}
     if PEERINGDB_KEY:
         headers["Authorization"] = "Api-Key " + PEERINGDB_KEY
     url = PDB_BASE + path
     for attempt in range(retries + 1):
-        result = _fetch(url, timeout=timeout, headers=headers)
+        # Hold a slot only for the request itself, never during the backoff.
+        with _pdb_sem:
+            result = _fetch(url, timeout=timeout, headers=headers)
+        if result is not None:
+            return result
+        if attempt < retries:
+            time.sleep(2.0 * (attempt + 1))  # 2s, 4s, 6s backoff
+    return None
+
+def _fetch_ripe(endpoint, asn, timeout=30, retries=2):
+    """Fetch a RIPE Stat endpoint for an ASN, throttled by ``_ripe_sem``.
+
+    Every None result from ``_fetch`` is retried up to ``retries`` times with
+    a linear 1.5s/3s backoff. Returns the parsed JSON, or None on failure.
+    """
+    url = f"{RIPE_BASE}/{endpoint}/data.json?resource=AS{asn}"
+    for attempt in range(retries + 1):
+        # Hold a slot only for the request itself, never during the backoff.
+        with _ripe_sem:
+            result = _fetch(url, timeout=timeout)
         if result is not None:
             return result
         if attempt < retries:
             time.sleep(1.5 * (attempt + 1))  # 1.5s, 3s backoff
     return None
 
-def _fetch_ripe(endpoint, asn, timeout=30):
-    url = f"{RIPE_BASE}/{endpoint}/data.json?resource=AS{asn}"
-    return _fetch(url, timeout=timeout)
-
 def _fetch_pc(asn, timeout=90):
     return _fetch(f"{PEERCORTEX_URL}/api/lookup?asn={asn}", timeout=timeout)
 
@@ -186,8 +213,7 @@ def _fetch_auth(asn):
     net = ((pdb_net or {}).get("data") or [{}])[0]
     net_id = net.get("id")
 
-    # RIPE Stat (run in parallel via threads)
-    from concurrent.futures import ThreadPoolExecutor
+    # RIPE Stat + PDB IX/Fac (run in parallel via threads, throttled by semaphores)
     with ThreadPoolExecutor(max_workers=4) as pool:
         f_pfx = pool.submit(_fetch_ripe, "announced-prefixes", asn, TIMEOUT_AUTH)
         f_nb = pool.submit(_fetch_ripe, "asn-neighbours", asn, TIMEOUT_AUTH)
@@ -414,12 +440,14 @@ def main():
 
     results = []
     with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
-        # Stagger submissions by 2s so PeerCortex's internal PDB requests
-        # don't all fire simultaneously (9+ concurrent PDB calls → rate limit).
+        # Stagger submissions by 3s so PeerCortex's internal PDB requests
+        # don't all fire simultaneously. The semaphores only throttle this
+        # script's own RIPE Stat / PeeringDB calls, so staggering is still
+        # needed to avoid burst pressure on rate-limit windows.
         futures = {}
         for idx, asn in enumerate(batch):
             if idx > 0:
-                time.sleep(2)
+                time.sleep(3)
             futures[pool.submit(_audit_asn, asn)] = asn
         for i, future in enumerate(as_completed(futures), 1):
             asn = futures[future]