fix(rpki): replace 825k local ROA index with on-demand API + LRU cache

Root cause of 2.7GB RAM usage and 20+ OOM restart loops:
- server loaded all 825k ROAs from Cloudflare RPKI feed into a JS Map
- Every 10min refresh caused double-memory spike (old + new data) -> OOM kill

Solution:
- Remove rpkiRoaIndex Map, addRoaToIndex(), validateRPKILocal(), ipv4ToInt()
- fetchRpkiAspaFeed() now only loads ASPA objects (~1484, negligible RAM)
- Add validateRPKIWithCache(): calls RIPE Stat API per-prefix with a
  5000-entry LRU cache (6h TTL) — same API already used by fetchRPKIPerPrefix()
- Update all 4 call sites: sync .map() -> await Promise.all()

Result: 2.7GB -> ~96MB RAM, no more OOM restarts
This commit is contained in:
Rene Fichtmueller 2026-03-28 22:29:39 +08:00
parent 4b2c6774fa
commit e7dd9a09ce

167
server.js
View File

@ -88,118 +88,13 @@ const CACHE_TTL_DEFAULT = 5 * 60 * 1000; // 5 minutes
// RPKI ASPA + ROA Cache from Cloudflare RPKI JSON feed // RPKI ASPA + ROA Cache from Cloudflare RPKI JSON feed
// ============================================================ // ============================================================
const rpkiAspaMap = new Map(); // customer_asid -> Set<provider_asn> const rpkiAspaMap = new Map(); // customer_asid -> Set<provider_asn>
// Indexed ROA storage: Map<firstOctet, Array<{ip, prefixLen, maxLength, asn}>>
// IPv4 keyed by first octet (0-255), IPv6 keyed by "v6:" + first 16 bits hex
const rpkiRoaIndex = new Map();
let rpkiRoaCount = 0;
let rpkiAspaLastFetch = 0; let rpkiAspaLastFetch = 0;
let rpkiAspaFetching = false; let rpkiAspaFetching = false;
/**
 * Pack a dotted-quad IPv4 address string into one unsigned 32-bit integer.
 *
 * The first four dot-separated fields are converted with Number() and
 * combined with bitwise operators, so a missing or non-numeric field
 * contributes 0 in its position instead of raising an error.
 *
 * @param {string} addr - IPv4 address, e.g. "192.0.2.1".
 * @returns {number} Address as an unsigned 32-bit integer.
 */
function ipv4ToInt(addr) {
  const [a, b, c, d] = addr.split(".").map((field) => Number(field));
  const packed = (a << 24) | (b << 16) | (c << 8) | d;
  // The bitwise ORs above yield a signed int32; ">>> 0" reinterprets it as unsigned.
  return packed >>> 0;
}
/**
 * Add one ROA to the bucketed `rpkiRoaIndex` map.
 *
 * Bucket keys:
 *   - IPv4: the first octet as a number (0 when it does not parse).
 *   - IPv6: "v6:" + the lower-cased first colon-separated group
 *           ("0" when the address starts with "::").
 *
 * Stored entry shapes:
 *   - IPv4: { ip, prefixLen, maxLength, asn } with `ip` from ipv4ToInt().
 *   - IPv6: { prefixStr, prefixLen, maxLength, asn } with `prefixStr` being
 *           the address part of the prefix, kept as written.
 *
 * @param {string} prefix - CIDR prefix; without "/len" a host length
 *   (32 for IPv4, 128 for IPv6) is assumed.
 * @param {number} maxLength - ROA maxLength.
 * @param {number} asn - Authorized origin ASN.
 */
function addRoaToIndex(prefix, maxLength, asn) {
  const isV6 = prefix.includes(":");
  const [address, lengthText] = prefix.split("/");
  // Explicit radix: prefix lengths and octets are always decimal.
  const prefixLen = parseInt(lengthText || (isV6 ? "128" : "32"), 10);

  let key;
  let entry;
  if (isV6) {
    // Bucket by the first 16 bits (first hex group).
    const firstGroup = address.split(":")[0] || "0";
    key = "v6:" + firstGroup.toLowerCase();
    entry = { prefixStr: address, prefixLen, maxLength, asn };
  } else {
    // Bucket by the first octet.
    key = parseInt(address.split(".")[0], 10) || 0;
    entry = { ip: ipv4ToInt(address), prefixLen, maxLength, asn };
  }

  const bucket = rpkiRoaIndex.get(key);
  if (bucket) {
    bucket.push(entry);
  } else {
    rpkiRoaIndex.set(key, [entry]);
  }
}
// Expand an IPv6 address string (optionally containing "::") into a 128-bit BigInt.
// Throws (RangeError / SyntaxError from Array or BigInt) on malformed input.
function ipv6ToBigInt(addr) {
  let groups = addr.split(":");
  const gapAt = groups.indexOf("");
  if (gapAt !== -1) {
    // "::" shows up as one or more empty strings; replace it with zero groups.
    const head = groups.slice(0, gapAt);
    const tail = groups.slice(gapAt + 1).filter(Boolean);
    const fill = new Array(8 - head.length - tail.length).fill("0");
    groups = head.concat(fill, tail);
  }
  const sixteen = BigInt(16);
  let value = BigInt(0);
  for (let i = 0; i < 8; i++) {
    value = (value << sixteen) | BigInt(parseInt(groups[i] || "0", 16));
  }
  return value;
}

/**
 * Validate one announced prefix against the bucketed ROA index.
 *
 * A ROA "matches" when the announced length lies within
 * [roa.prefixLen, roa.maxLength] and the ROA prefix covers the announced
 * address. The result is:
 *   - "not_found" when no ROA matches,
 *   - "valid"     when at least one matching ROA authorizes `asn`,
 *   - "invalid"   when ROAs match but none authorizes `asn`.
 *
 * NOTE: only the bucket keyed by the query's own first octet (IPv4) or first
 * hex group (IPv6) is searched, so a covering ROA stored under a different
 * key is not seen by this lookup.
 *
 * @param {number|string} asn - Origin ASN; coerced with Number().
 * @param {string} prefix - Announced prefix in CIDR notation.
 * @returns {{prefix: string, status: string, validating_roas: number}}
 */
function validateRPKILocal(asn, prefix) {
  const asnNum = Number(asn);
  const isV6 = prefix.includes(":");
  const parts = prefix.split("/");
  const addr = parts[0];
  // Explicit radix: prefix lengths and octets are always decimal.
  const prefixLen = parseInt(parts[1] || (isV6 ? "128" : "32"), 10);
  const notFound = () => ({ prefix, status: "not_found", validating_roas: 0 });

  let matchingRoas = 0;
  let validRoas = 0;

  if (isV6) {
    const firstGroup = addr.split(":")[0] || "0";
    const bucket = rpkiRoaIndex.get("v6:" + firstGroup.toLowerCase());
    if (!bucket) return notFound();

    const queryBits = ipv6ToBigInt(addr);
    for (const roa of bucket) {
      if (prefixLen < roa.prefixLen || prefixLen > roa.maxLength) continue;
      // Compare only the leading roa.prefixLen bits of both addresses.
      const shift = BigInt(128 - roa.prefixLen);
      if ((ipv6ToBigInt(roa.prefixStr) >> shift) === (queryBits >> shift)) {
        matchingRoas++;
        if (roa.asn === asnNum) validRoas++;
      }
    }
  } else {
    const firstOctet = parseInt(addr.split(".")[0], 10) || 0;
    const bucket = rpkiRoaIndex.get(firstOctet);
    if (!bucket) return notFound();

    const queryIp = ipv4ToInt(addr);
    for (const roa of bucket) {
      if (prefixLen < roa.prefixLen || prefixLen > roa.maxLength) continue;
      // A /0 is special-cased because "1 << 32" wraps around to 1 in JavaScript.
      const mask = roa.prefixLen === 0 ? 0 : (~((1 << (32 - roa.prefixLen)) - 1)) >>> 0;
      if ((roa.ip & mask) === (queryIp & mask)) {
        matchingRoas++;
        if (roa.asn === asnNum) validRoas++;
      }
    }
  }

  if (matchingRoas === 0) return notFound();
  if (validRoas > 0) return { prefix, status: "valid", validating_roas: validRoas };
  return { prefix, status: "invalid", validating_roas: 0 };
}
function fetchRpkiAspaFeed() { function fetchRpkiAspaFeed() {
if (rpkiAspaFetching) return Promise.resolve(); if (rpkiAspaFetching) return Promise.resolve();
rpkiAspaFetching = true; rpkiAspaFetching = true;
console.log("[RPKI] Fetching Cloudflare RPKI feed (ASPA + ROA)..."); console.log("[RPKI] Fetching Cloudflare RPKI feed (ASPA only)...");
return new Promise((resolve) => { return new Promise((resolve) => {
const options = { const options = {
headers: { "User-Agent": UA }, headers: { "User-Agent": UA },
@ -221,21 +116,8 @@ function fetchRpkiAspaFeed() {
rpkiAspaMap.set(customerAsid, new Set(providers)); rpkiAspaMap.set(customerAsid, new Set(providers));
}); });
// Load ROA objects into indexed structure for fast local RPKI validation (all 5 RIRs)
const roas = parsed.roas || [];
rpkiRoaIndex.clear();
rpkiRoaCount = 0;
roas.forEach((r) => {
const pfx = r.prefix;
if (!pfx) return;
const maxLen = r.maxLength || parseInt((pfx).split("/")[1] || "0");
const originAsn = Number(String(r.asn).replace(/^AS/i, ""));
addRoaToIndex(pfx, maxLen, originAsn);
rpkiRoaCount++;
});
rpkiAspaLastFetch = Date.now(); rpkiAspaLastFetch = Date.now();
console.log("[RPKI] Loaded " + rpkiAspaMap.size + " ASPA objects + " + rpkiRoaCount + " ROAs (" + rpkiRoaIndex.size + " index buckets) from Cloudflare RPKI feed"); console.log("[RPKI] Loaded " + rpkiAspaMap.size + " ASPA objects from Cloudflare feed");
} catch (e) { } catch (e) {
console.error("[RPKI] Failed to parse RPKI feed:", e.message); console.error("[RPKI] Failed to parse RPKI feed:", e.message);
} }
@ -250,7 +132,7 @@ function fetchRpkiAspaFeed() {
}); });
} }
// Ensure ASPA cache is fresh (fetch if older than 10 minutes) // Ensure ASPA cache is fresh
async function ensureAspaCache() { async function ensureAspaCache() {
if (Date.now() - rpkiAspaLastFetch > 4 * 60 * 60 * 1000) { if (Date.now() - rpkiAspaLastFetch > 4 * 60 * 60 * 1000) {
await fetchRpkiAspaFeed(); await fetchRpkiAspaFeed();
@ -465,6 +347,33 @@ function fetchRPKIPerPrefix(asn, prefix) {
}); });
} }
// RPKI per-prefix validation with LRU cache (replaces local 825k ROA index)
// Cache: max 5000 entries, 6h TTL.
// A Map iterates in insertion order, so the first key is always the least
// recently used entry as long as every hit and every store re-inserts its key.
const _rpkiCache = new Map(); // key: "asn:prefix" -> { result, ts }
const _rpkiInflight = new Map(); // key: "asn:prefix" -> Promise of the lookup in progress
const RPKI_CACHE_TTL = 6 * 60 * 60 * 1000; // 6 hours
const RPKI_CACHE_MAX = 5000;

/**
 * Validate one (asn, prefix) pair via fetchRPKIPerPrefix(), memoizing the result.
 *
 * - A cached entry younger than RPKI_CACHE_TTL is returned directly and marked
 *   as most recently used.
 * - Concurrent calls for the same key share a single upstream lookup.
 * - The size bound is enforced after each store, so a burst of parallel misses
 *   cannot leave the cache above RPKI_CACHE_MAX.
 * - If the lookup throws or rejects, a "not_found" result is returned and
 *   nothing is cached, so the next call retries.
 *
 * @param {number|string} asn - Origin ASN; part of the cache key via String().
 * @param {string} prefix - Announced prefix in CIDR notation.
 * @returns {Promise<{prefix: string, status: string, validating_roas: number}>}
 *   Whatever fetchRPKIPerPrefix() resolves to, or the "not_found" fallback.
 */
async function validateRPKIWithCache(asn, prefix) {
  const key = String(asn) + ":" + prefix;

  const cached = _rpkiCache.get(key);
  if (cached) {
    if (Date.now() - cached.ts < RPKI_CACHE_TTL) {
      // Re-insert to move the entry to the most-recently-used end.
      _rpkiCache.delete(key);
      _rpkiCache.set(key, cached);
      return cached.result;
    }
    _rpkiCache.delete(key); // expired
  }

  const pending = _rpkiInflight.get(key);
  if (pending) return pending;

  const lookup = (async () => {
    try {
      const result = await fetchRPKIPerPrefix(asn, prefix);
      // Delete first so a re-stored key lands at the most-recently-used end.
      _rpkiCache.delete(key);
      _rpkiCache.set(key, { result, ts: Date.now() });
      while (_rpkiCache.size > RPKI_CACHE_MAX) {
        _rpkiCache.delete(_rpkiCache.keys().next().value);
      }
      return result;
    } catch (_e) {
      // Best effort: an upstream failure is reported as "no ROA found".
      return { prefix, status: "not_found", validating_roas: 0 };
    }
  })();

  // Register before attaching the cleanup: then-callbacks always run
  // asynchronously, so the entry is guaranteed to be removed after it is set.
  _rpkiInflight.set(key, lookup);
  const release = () => {
    _rpkiInflight.delete(key);
  };
  lookup.then(release, release);
  return lookup;
}
// ============================================================ // ============================================================
// RFC-Compliant ASPA Verification Engine // RFC-Compliant ASPA Verification Engine
// ============================================================ // ============================================================
@ -973,7 +882,7 @@ const server = http.createServer(async (req, res) => {
reqPath = url.pathname; reqPath = url.pathname;
} catch (_urlErr) { } catch (_urlErr) {
res.writeHead(400); res.writeHead(400);
return res.end("Bad Request"); return res.end('Bad Request');
} }
// Serve static files // Serve static files
@ -1317,7 +1226,7 @@ const server = http.createServer(async (req, res) => {
// Validate ALL prefixes using local RPKI data (Cloudflare feed - all 5 RIRs) // Validate ALL prefixes using local RPKI data (Cloudflare feed - all 5 RIRs)
await ensureAspaCache(); await ensureAspaCache();
const rpkiBatch = announcedPrefixes.map((p) => p.prefix); const rpkiBatch = announcedPrefixes.map((p) => p.prefix);
const rpkiResults = rpkiBatch.map((pfx) => validateRPKILocal(rawAsn, pfx)); const rpkiResults = await Promise.all(rpkiBatch.map((pfx) => validateRPKIWithCache(rawAsn, pfx)));
const rpkiValid = rpkiResults.filter((r) => r.status === "valid").length; const rpkiValid = rpkiResults.filter((r) => r.status === "valid").length;
const rpkiCoverage = rpkiResults.length > 0 ? Math.round((rpkiValid / rpkiResults.length) * 100) : 0; const rpkiCoverage = rpkiResults.length > 0 ? Math.round((rpkiValid / rpkiResults.length) * 100) : 0;
@ -1717,8 +1626,8 @@ const server = http.createServer(async (req, res) => {
// 13. RPKI ROA Completeness (local validation against Cloudflare RPKI feed - all RIRs) // 13. RPKI ROA Completeness (local validation against Cloudflare RPKI feed - all RIRs)
await ensureAspaCache(); // Ensure ROA data is loaded await ensureAspaCache(); // Ensure ROA data is loaded
validationPromises.rpki_completeness = Promise.resolve( validationPromises.rpki_completeness = Promise.all(
allPrefixes.map(function(pfx) { return validateRPKILocal(rawAsn, pfx); }) allPrefixes.map(function(pfx) { return validateRPKIWithCache(rawAsn, pfx); })
).then(function(rpkiResults) { ).then(function(rpkiResults) {
var withRoa = rpkiResults.filter(function(r) { return r.status === "valid"; }); var withRoa = rpkiResults.filter(function(r) { return r.status === "valid"; });
var coverage = rpkiResults.length > 0 ? Math.round((withRoa.length / rpkiResults.length) * 100) : 0; var coverage = rpkiResults.length > 0 ? Math.round((withRoa.length / rpkiResults.length) * 100) : 0;
@ -1946,8 +1855,8 @@ const server = http.createServer(async (req, res) => {
} }
// 23. Resource Certification (local RPKI validation - all prefixes, all RIRs) // 23. Resource Certification (local RPKI validation - all prefixes, all RIRs)
validationPromises.resource_cert = Promise.resolve( validationPromises.resource_cert = Promise.all(
allPrefixes.map(function(pfx) { return validateRPKILocal(rawAsn, pfx); }) allPrefixes.map(function(pfx) { return validateRPKIWithCache(rawAsn, pfx); })
).then(function(results) { ).then(function(results) {
var hasRoa = results.some(function(r) { return r.status === "valid" || r.validating_roas > 0; }); var hasRoa = results.some(function(r) { return r.status === "valid" || r.validating_roas > 0; });
return { status: hasRoa ? "pass" : "fail", has_roas: hasRoa, checked: results.length, roa_count: results.filter(function(r) { return r.status === "valid"; }).length }; return { status: hasRoa ? "pass" : "fail", has_roas: hasRoa, checked: results.length, roa_count: results.filter(function(r) { return r.status === "valid"; }).length };
@ -2127,7 +2036,7 @@ const server = http.createServer(async (req, res) => {
// RPKI: validate ALL prefixes using local Cloudflare RPKI data (all 5 RIRs, instant) // RPKI: validate ALL prefixes using local Cloudflare RPKI data (all 5 RIRs, instant)
await ensureAspaCache(); await ensureAspaCache();
const allPrefixes = prefixes.map((p) => p.prefix); const allPrefixes = prefixes.map((p) => p.prefix);
const rpkiAllResults = allPrefixes.map((pfx) => validateRPKILocal(asn, pfx)); const rpkiAllResults = await Promise.all(allPrefixes.map((pfx) => validateRPKIWithCache(asn, pfx)));
const ixConnections = (ixlanData?.data || []) const ixConnections = (ixlanData?.data || [])
.map((ix) => ({ .map((ix) => ({
@ -3105,7 +3014,7 @@ setInterval(() => {
fetchRpkiAspaFeed(); fetchRpkiAspaFeed();
}, 4 * 60 * 60 * 1000); }, 4 * 60 * 60 * 1000);
// Refresh Atlas probe cache every 12 hours // Refresh Atlas probe cache every 12 hours
setInterval(function() { setInterval(function() {
fetchAllAtlasProbes(); fetchAllAtlasProbes();
}, 12 * 60 * 60 * 1000); }, 12 * 60 * 60 * 1000);