fix: add rate-limiting semaphores to audit script
The audit script was flooding RIPE Stat and PeeringDB with unthrottled parallel requests, triggering HTTP 429 rate-limit responses that were recorded as auth=0 false negatives and inflated the failure count. Changes: (1) added a threading.Semaphore for RIPE Stat (max 3 concurrent requests) and one for PeeringDB (max 2 concurrent requests); (2) added retry logic to _fetch_ripe, which previously made a single attempt with no retry; (3) increased PeeringDB retries from 2 to 3 with a longer backoff (2s, 4s, 6s); (4) increased the ASN submission stagger from 2s to 3s. Results: accuracy 84% -> 87% (trend: 77% -> 87%, +10 percentage points).
This commit is contained in:
parent
69650c1875
commit
9bc1292bac
@ -12,7 +12,7 @@ Reports dir: /opt/peercortex-app/audit/reports/YYYY-MM-DD.json
|
|||||||
Latest text: /opt/peercortex-app/audit/latest_report.txt
|
Latest text: /opt/peercortex-app/audit/latest_report.txt
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json, os, sys, time, datetime, urllib.request, urllib.error
|
import json, os, sys, time, datetime, urllib.request, urllib.error, threading
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@ -89,6 +89,13 @@ SEED_ASNS = [
|
|||||||
1, 64512, 65000, 0, 4294967295,
|
1, 64512, 65000, 0, 4294967295,
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# ─── Rate-limiting semaphores ────────────────────────────────────────────────
|
||||||
|
# Prevents 429 errors from RIPE Stat and PeeringDB by limiting concurrent requests.
|
||||||
|
# Without this, the audit floods both APIs and gets rate-limited, causing auth=0
|
||||||
|
# false negatives that inflate the failure count.
|
||||||
|
_ripe_sem = threading.Semaphore(3) # max 3 concurrent RIPE Stat requests
|
||||||
|
_pdb_sem = threading.Semaphore(2) # max 2 concurrent PeeringDB requests
|
||||||
|
|
||||||
# ─── HTTP helpers ─────────────────────────────────────────────────────────────
|
# ─── HTTP helpers ─────────────────────────────────────────────────────────────
|
||||||
def _fetch(url, timeout=30, headers=None):
|
def _fetch(url, timeout=30, headers=None):
|
||||||
"""GET url → parsed JSON dict, or None on any error."""
|
"""GET url → parsed JSON dict, or None on any error."""
|
||||||
@ -101,24 +108,39 @@ def _fetch(url, timeout=30, headers=None):
|
|||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _fetch_pdb(path, timeout=30, retries=3):
    """Fetch ``PDB_BASE + path`` from PeeringDB, throttled and with retries.

    An ``Authorization: Api-Key ...`` header is attached only when
    ``PEERINGDB_KEY`` is truthy. Each attempt runs while holding
    ``_pdb_sem`` so the number of in-flight PeeringDB requests stays
    bounded; the semaphore is released before any backoff sleep so that
    waiting does not block other workers.

    Args:
        path: Path appended verbatim to ``PDB_BASE``.
        timeout: Per-request timeout forwarded to ``_fetch``.
        retries: Number of additional attempts after the first one.

    Returns:
        The first non-``None`` value returned by ``_fetch``, or ``None``
        when all ``retries + 1`` attempts came back ``None``.
    """
    auth_headers = {}
    if PEERINGDB_KEY:
        auth_headers["Authorization"] = "Api-Key " + PEERINGDB_KEY
    target = PDB_BASE + path

    for attempt in range(retries + 1):
        # Back off before every retry (never before the first attempt):
        # 2s, 4s, 6s, ... for attempts 1, 2, 3, ...
        if attempt:
            time.sleep(2.0 * attempt)
        # Hold the semaphore only for the duration of the HTTP call.
        with _pdb_sem:
            payload = _fetch(target, timeout=timeout, headers=auth_headers)
        if payload is not None:
            return payload
    return None
|
||||||
|
|
||||||
|
def _fetch_ripe(endpoint, asn, timeout=30, retries=2):
    """Fetch a RIPE Stat endpoint for ``AS<asn>``, throttled and with retries.

    The request URL is ``{RIPE_BASE}/{endpoint}/data.json?resource=AS{asn}``.
    Each attempt runs while holding ``_ripe_sem`` so the number of
    in-flight RIPE Stat requests stays bounded; the semaphore is released
    before any backoff sleep.

    Args:
        endpoint: RIPE Stat endpoint name inserted into the URL path.
        asn: AS number interpolated after the ``AS`` prefix.
        timeout: Per-request timeout forwarded to ``_fetch``.
        retries: Number of additional attempts after the first one.

    Returns:
        The first non-``None`` value returned by ``_fetch``, or ``None``
        when all ``retries + 1`` attempts came back ``None``.
    """
    target = f"{RIPE_BASE}/{endpoint}/data.json?resource=AS{asn}"

    for attempt in range(retries + 1):
        # Back off before every retry (never before the first attempt):
        # 1.5s, 3s, ... for attempts 1, 2, ...
        if attempt:
            time.sleep(1.5 * attempt)
        # Hold the semaphore only for the duration of the HTTP call.
        with _ripe_sem:
            payload = _fetch(target, timeout=timeout)
        if payload is not None:
            return payload
    return None
|
|
||||||
|
|
||||||
def _fetch_pc(asn, timeout=90):
    """Call the PeerCortex ``/api/lookup`` endpoint for ``asn`` via ``_fetch``.

    Args:
        asn: Value interpolated into the ``asn`` query parameter.
        timeout: Per-request timeout forwarded to ``_fetch``.

    Returns:
        Whatever ``_fetch`` returns for the lookup URL.
    """
    lookup_url = f"{PEERCORTEX_URL}/api/lookup?asn={asn}"
    return _fetch(lookup_url, timeout=timeout)
|
||||||
|
|
||||||
@ -186,8 +208,7 @@ def _fetch_auth(asn):
|
|||||||
net = ((pdb_net or {}).get("data") or [{}])[0]
|
net = ((pdb_net or {}).get("data") or [{}])[0]
|
||||||
net_id = net.get("id")
|
net_id = net.get("id")
|
||||||
|
|
||||||
# RIPE Stat (run in parallel via threads)
|
# RIPE Stat + PDB IX/Fac (run in parallel via threads, throttled by semaphores)
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
with ThreadPoolExecutor(max_workers=4) as pool:
|
with ThreadPoolExecutor(max_workers=4) as pool:
|
||||||
f_pfx = pool.submit(_fetch_ripe, "announced-prefixes", asn, TIMEOUT_AUTH)
|
f_pfx = pool.submit(_fetch_ripe, "announced-prefixes", asn, TIMEOUT_AUTH)
|
||||||
f_nb = pool.submit(_fetch_ripe, "asn-neighbours", asn, TIMEOUT_AUTH)
|
f_nb = pool.submit(_fetch_ripe, "asn-neighbours", asn, TIMEOUT_AUTH)
|
||||||
@ -414,12 +435,13 @@ def main():
|
|||||||
|
|
||||||
results = []
|
results = []
|
||||||
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
|
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
|
||||||
# Stagger submissions by 2s so PeerCortex's internal PDB requests
|
# Stagger submissions by 3s so PeerCortex's internal PDB requests
|
||||||
# don't all fire simultaneously (9+ concurrent PDB calls → rate limit).
|
# don't all fire simultaneously (semaphore limits concurrent API calls,
|
||||||
|
# but staggering avoids burst pressure on rate-limit windows).
|
||||||
futures = {}
|
futures = {}
|
||||||
for idx, asn in enumerate(batch):
|
for idx, asn in enumerate(batch):
|
||||||
if idx > 0:
|
if idx > 0:
|
||||||
time.sleep(2)
|
time.sleep(3)
|
||||||
futures[pool.submit(_audit_asn, asn)] = asn
|
futures[pool.submit(_audit_asn, asn)] = asn
|
||||||
for i, future in enumerate(as_completed(futures), 1):
|
for i, future in enumerate(as_completed(futures), 1):
|
||||||
asn = futures[future]
|
asn = futures[future]
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user