PeerCortex/bio-rd-client.js
Rene Fichtmueller 344ee15338 feat(bio-rd): add local RIB integration via bio-rd gRPC client
Adds real-time local BGP RIB data as a complement to external APIs
(RIPE Stat, bgproutes.io) which have rate limits and 15min+ delays.

- bio-rd-client.js: gRPC client for bio-routing/bio-rd RIS service
  - LPM, Get, GetLonger, GetRouters, DumpRIB, ObserveRIB methods
  - IPv4/IPv6 encoding as uint64 pair (bio.net format)
  - Full BGP path decode: AS paths, communities, large communities
  - Graceful fallback if RIS unavailable (null/empty returns)
- protos/: bio-rd proto definitions (ris, bgp, session, route, net)
- server.js: three new endpoints + WebSocket stream
  - GET /api/rib/prefix — LPM + more-specifics via GetLonger
  - GET /api/rib/routers — list BMP-monitored routers
  - GET /api/rib/dump — full RIB dump with ASN filter + limit
  - WS /ws/rib — live ObserveRIB stream (add/withdraw events)
- package.json: @grpc/grpc-js + @grpc/proto-loader dependencies
2026-04-05 11:44:50 +02:00

449 lines
14 KiB
JavaScript

'use strict';
/**
* bio-rd gRPC client wrapper for PeerCortex.
*
* Wraps the RoutingInformationService (RIS) from bio-rd.
* proto package: bio.ris (cmd/ris/api/ris.proto)
* proto package: bio.net (net/api/net.proto)
* proto package: bio.route (route/api/route.proto)
*
* Usage:
* const { createRisClient } = require('./bio-rd-client');
* const ris = createRisClient('localhost', 4321);
* const routes = await ris.dumpRib('router1', '0:0', 0);
*/
const path = require('path');
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
// Directory holding the bundled .proto files (a `protos` folder next to this module).
const PROTO_DIR = path.join(__dirname, 'protos');
// Entry-point proto file handed to protoLoader.loadSync() in _loadPackageDef().
const RIS_PROTO = path.join(PROTO_DIR, 'cmd/ris/api/ris.proto');
// Options passed to @grpc/proto-loader when the RIS proto is loaded.
const LOADER_OPTIONS = {
keepCase: true,
longs: String, // uint64 fields come back as strings (avoids JS BigInt issues)
enums: String, // enum values come back as their string names
defaults: true,
oneofs: true,
includeDirs: [PROTO_DIR],
};
// Maps hidden-reason enum names to the short labels exposed on decoded paths.
// Key order matters: _decodePath() resolves a numeric hidden_reason by indexing
// into Object.keys() of this map, so entries must not be reordered.
const HIDDEN_REASON_MAP = {
HiddenReasonNone: null,
HiddenReasonNextHopUnreachable: 'next-hop-unreachable',
HiddenReasonFilteredByPolicy: 'filtered-by-policy',
HiddenReasonASLoop: 'as-loop',
HiddenReasonOurOriginatorID: 'our-originator-id',
HiddenReasonClusterLoop: 'cluster-loop',
HiddenReasonOTCMismatch: 'otc-mismatch',
};
// Labels for the numeric origin value of a BGP path; _decodeBgpPath() reports
// 'Unknown' for any value that is not a key here.
const ORIGIN_MAP = { 0: 'IGP', 1: 'EGP', 2: 'Incomplete' };
// ─── IP helpers ──────────────────────────────────────────────────────────────
/**
 * Build a bio.net.IP proto object from a textual IP address.
 *
 * Any string containing ':' is treated as IPv6 (colon-hex); everything else is
 * treated as dotted-decimal IPv4. The uint64 halves are emitted as strings,
 * matching the `longs: String` loader option.
 *
 * @param {string} ipString - Address in dotted-decimal or colon-hex notation.
 * @returns {{higher: string, lower: string, version: string}} bio.net.IP object.
 */
function ipToProto(ipString) {
  // Pick the family-specific encoder first, then delegate to it.
  const encode = ipString.includes(':') ? _ipv6ToProto : _ipv4ToProto;
  return encode(ipString);
}
/**
 * Encode a dotted-decimal IPv4 address as a bio.net.IP proto object.
 *
 * Each octet must consist of 1-3 decimal digits and be at most 255. Empty
 * octets ("1..2.3"), signs, hex/exponent notation and out-of-range values are
 * rejected instead of being silently folded into a wrong 32-bit value.
 *
 * @param {string} ipString - IPv4 address, e.g. "192.0.2.1".
 * @returns {{higher: string, lower: string, version: string}} bio.net.IP object.
 * @throws {Error} If the string is not a valid IPv4 address.
 */
function _ipv4ToProto(ipString) {
  const octets = ipString.split('.');
  const isValid = octets.length === 4
    && octets.every((octet) => /^\d{1,3}$/.test(octet) && Number(octet) <= 255);
  if (!isValid) {
    throw new Error(`Invalid IPv4 address: ${ipString}`);
  }
  // Multiply-accumulate instead of `<<` so the result never passes through a
  // signed 32-bit intermediate.
  const val = octets.reduce((acc, octet) => acc * 256 + Number(octet), 0);
  return { higher: '0', lower: String(val), version: 'IPv4' };
}
/**
 * Encode a colon-hex IPv6 address as a bio.net.IP proto object.
 *
 * The address is expanded via _expandIPv6() and must then consist of exactly
 * eight groups of 1-4 hex digits. Anything else (too few/many groups,
 * over-long groups, non-hex characters, embedded dotted quads, zone suffixes)
 * is rejected rather than being partially parsed into a wrong value.
 *
 * @param {string} ipString - IPv6 address, e.g. "2001:db8::1".
 * @returns {{higher: string, lower: string, version: string}} bio.net.IP
 *   object; `higher`/`lower` are the upper/lower 64 bits as decimal strings.
 * @throws {Error} If the string is not a valid IPv6 address.
 */
function _ipv6ToProto(ipString) {
  const groups = _expandIPv6(ipString).split(':');
  const isValid = groups.length === 8
    && groups.every((group) => /^[0-9a-fA-F]{1,4}$/.test(group));
  if (!isValid) {
    throw new Error(`Invalid IPv6 address: ${ipString}`);
  }
  // Fold four 16-bit groups into one unsigned 64-bit BigInt.
  const pack = (start) => groups
    .slice(start, start + 4)
    .reduce((acc, group) => (acc << 16n) | BigInt(Number.parseInt(group, 16)), 0n);
  return { higher: String(pack(0)), lower: String(pack(4)), version: 'IPv6' };
}
/**
 * Expand the "::" shorthand of an IPv6 address into explicit "0" groups.
 *
 * Addresses without "::" are returned unchanged. The individual groups are not
 * validated here; only the structure around "::" is checked.
 *
 * @param {string} addr - IPv6 address, possibly containing one "::".
 * @returns {string} Address with "::" replaced by the missing zero groups.
 * @throws {Error} If "::" appears more than once, or if the address already
 *   has more than eight groups around the "::".
 */
function _expandIPv6(addr) {
  const halves = addr.split('::');
  if (halves.length === 1) {
    return addr;
  }
  // A second "::" is ambiguous; it must not be silently dropped.
  if (halves.length > 2) {
    throw new Error(`Invalid IPv6 address: ${addr}`);
  }
  const [left, right] = halves;
  const leftGroups = left ? left.split(':') : [];
  const rightGroups = right ? right.split(':') : [];
  const missing = 8 - leftGroups.length - rightGroups.length;
  // Guard before Array(missing), which would otherwise throw an opaque RangeError.
  if (missing < 0) {
    throw new Error(`Invalid IPv6 address: ${addr}`);
  }
  const zeros = Array(missing).fill('0');
  return [...leftGroups, ...zeros, ...rightGroups].join(':');
}
/**
 * Render a bio.net.IP proto object as a textual IP address.
 *
 * A missing/falsy `version` is treated as IPv4. IPv4 addresses are taken from
 * the low 32 bits of `lower`; IPv6 addresses are printed as eight
 * uncompressed hex groups (no "::" shortening, no zero padding).
 *
 * @param {{higher?: string, lower?: string, version?: (string|number)}|null} protoIp
 * @returns {string|null} Address string, or null when no object is given.
 */
function protoToIp(protoIp) {
  if (!protoIp) {
    return null;
  }
  const family = protoIp.version || 'IPv4';
  const lowBits = BigInt(protoIp.lower || '0');
  const highBits = BigInt(protoIp.higher || '0');

  if (family === 'IPv4' || family === 0) {
    const word = Number(lowBits);
    const octets = [];
    for (const shift of [24, 16, 8, 0]) {
      octets.push((word >>> shift) & 0xff);
    }
    return octets.join('.');
  }

  // IPv6: walk both 64-bit halves from the most significant 16-bit group down.
  const hextets = [];
  for (const half of [highBits, lowBits]) {
    for (let shift = 48n; shift >= 0n; shift -= 16n) {
      hextets.push(((half >> shift) & 0xffffn).toString(16));
    }
  }
  return hextets.join(':');
}
/**
 * Convert a CIDR string to a bio.net.Prefix proto object.
 *
 * Without a "/length" part the address is treated as a host route (/32 for
 * IPv4, /128 for addresses containing ':'). When a length is given it must be
 * a plain decimal integer between 0 and the address-family maximum; values
 * such as "/abc", "/" or "/33" are rejected instead of producing NaN or an
 * out-of-range length in the request.
 *
 * @param {string} cidr - Prefix such as "192.0.2.0/24" or "2001:db8::/32".
 * @returns {{address: object, length: number}} bio.net.Prefix object.
 * @throws {Error} If the prefix length is malformed or out of range, or if
 *   ipToProto() rejects the address part.
 */
function prefixToProto(cidr) {
  const [ipStr, lenStr] = cidr.split('/');
  const maxLength = ipStr.includes(':') ? 128 : 32;
  let length = maxLength;
  if (lenStr !== undefined) {
    const isDecimal = /^\d+$/.test(lenStr);
    length = isDecimal ? Number.parseInt(lenStr, 10) : Number.NaN;
    if (!isDecimal || length > maxLength) {
      throw new Error(`Invalid prefix length: ${cidr}`);
    }
  }
  return { address: ipToProto(ipStr), length };
}
// ─── Route decoding ───────────────────────────────────────────────────────────
/**
 * Turn a route message into a plain JS object.
 *
 * Accepts either a bare Route message or a wrapper carrying it under `.route`
 * (as DumpRIBReply does). The attributes of the first path are mirrored at the
 * top level for convenience; every decoded path is available under `paths`.
 *
 * @param {object|null} risRoute - Route message or a wrapper with a `.route` field.
 * @returns {object|null} Decoded route, or null when there is no message or no prefix.
 */
function decodeRoute(risRoute) {
  if (!risRoute) {
    return null;
  }
  // Unwrap DumpRIBReply-style messages; plain Route messages are used as-is.
  const inner = risRoute.route || risRoute;
  if (!inner.pfx) {
    return null;
  }

  const address = protoToIp(inner.pfx.address);
  const maskLength = inner.pfx.length || 0;
  const decodedPaths = (inner.paths || []).map(_decodePath);

  // The first path (if any) supplies the flattened top-level attributes.
  const {
    nextHop,
    asPaths,
    communities,
    largeCommunities,
    localPref,
    med,
    origin,
    ebgp,
    hidden,
    hiddenReason,
  } = decodedPaths[0] || {};

  return {
    prefix: `${address}/${maskLength}`,
    nextHop: nextHop || null,
    asPaths: asPaths || [],
    communities: communities || [],
    largeCommunities: largeCommunities || [],
    localPref: localPref || 0,
    med: med || 0,
    origin: origin || null,
    ebgp: ebgp || false,
    hidden: hidden || false,
    hiddenReason: hiddenReason || null,
    paths: decodedPaths,
  };
}
/**
 * Decode a single path entry of a route.
 *
 * Resolves the hidden reason (given either as an enum name or as a numeric
 * index into HIDDEN_REASON_MAP's keys) and then dispatches on the path type:
 * 'BGP'/1 to _decodeBgpPath(), 'Static'/0 to _decodeStaticPath(). Any other
 * type yields an empty path carrying only the hidden state.
 *
 * @param {object} path - Path message with `type`, `hidden_reason` and a type-specific payload.
 * @returns {object} Decoded path attributes.
 */
function _decodePath(path) {
  let reasonName = path.hidden_reason;
  if (typeof reasonName !== 'string') {
    // Numeric (or missing) value: look the name up by position.
    reasonName = Object.keys(HIDDEN_REASON_MAP)[path.hidden_reason] || 'HiddenReasonNone';
  }
  const hidden = reasonName !== 'HiddenReasonNone';
  const hiddenReason = HIDDEN_REASON_MAP[reasonName] || null;

  switch (path.type) {
    case 'BGP':
    case 1:
      return _decodeBgpPath(path.bgp_path, hidden, hiddenReason);
    case 'Static':
    case 0:
      return _decodeStaticPath(path.static_path, hidden, hiddenReason);
    default:
      return {
        nextHop: null,
        asPaths: [],
        communities: [],
        largeCommunities: [],
        localPref: 0,
        med: 0,
        origin: null,
        ebgp: false,
        hidden,
        hiddenReason,
      };
  }
}
/**
 * Decode the BGP-specific payload of a path.
 *
 * AS path segments are flattened into one list of AS numbers, standard
 * communities are rendered as "high16:low16" and large communities as
 * "global:part1:part2". A missing payload yields an empty path that still
 * carries the hidden state.
 *
 * @param {object|null|undefined} bgp - BGP path message.
 * @param {boolean} hidden - Whether the path is hidden.
 * @param {string|null} hiddenReason - Short hidden-reason label, if any.
 * @returns {object} Decoded path attributes.
 */
function _decodeBgpPath(bgp, hidden, hiddenReason) {
  if (!bgp) {
    return {
      nextHop: null,
      asPaths: [],
      communities: [],
      largeCommunities: [],
      localPref: 0,
      med: 0,
      origin: null,
      ebgp: false,
      hidden,
      hiddenReason,
    };
  }

  const segments = bgp.as_path || [];
  const asPaths = segments
    .flatMap((segment) => segment.asns || [])
    .map((asn) => Number(asn));

  // A standard community is a 32-bit value: upper 16 bits ":" lower 16 bits.
  const formatCommunity = (raw) => {
    const value = Number(raw);
    return `${value >>> 16}:${value & 0xffff}`;
  };
  const formatLargeCommunity = (lc) => [
    lc.global_administrator,
    lc.data_part1,
    lc.data_part2,
  ].map((part) => `${part}`).join(':');

  const communities = (bgp.communities || []).map((raw) => formatCommunity(raw));
  const largeCommunities = (bgp.large_communities || []).map((lc) => formatLargeCommunity(lc));

  return {
    nextHop: protoToIp(bgp.next_hop),
    asPaths,
    communities,
    largeCommunities,
    localPref: Number(bgp.local_pref || 0),
    med: Number(bgp.med || 0),
    origin: ORIGIN_MAP[bgp.origin] || 'Unknown',
    ebgp: bgp.ebgp || false,
    hidden,
    hiddenReason,
  };
}
/**
 * Decode the static-route payload of a path.
 *
 * Only the next hop is taken from the payload; all BGP attributes are set to
 * their empty values and the origin is reported as 'Static'.
 *
 * @param {object|null|undefined} staticPath - Static path message.
 * @param {boolean} hidden - Whether the path is hidden.
 * @param {string|null} hiddenReason - Short hidden-reason label, if any.
 * @returns {object} Decoded path attributes.
 */
function _decodeStaticPath(staticPath, hidden, hiddenReason) {
  let nextHop = null;
  if (staticPath) {
    nextHop = protoToIp(staticPath.next_hop);
  }
  return {
    nextHop,
    asPaths: [],
    communities: [],
    largeCommunities: [],
    localPref: 0,
    med: 0,
    origin: 'Static',
    ebgp: false,
    hidden,
    hiddenReason,
  };
}
// ─── Client factory ───────────────────────────────────────────────────────────
// Memoized result of protoLoader.loadSync(); null until the first successful load.
let _packageDef = null;
/**
 * Return the package definition for the RIS proto, loading it on first use.
 * Later calls return the cached definition without touching the file system.
 * If loading throws, nothing is cached and the next call retries.
 */
function _loadPackageDef() {
  if (!_packageDef) {
    _packageDef = protoLoader.loadSync(RIS_PROTO, LOADER_OPTIONS);
  }
  return _packageDef;
}
/**
 * Create a RIS gRPC client for the bio-rd RoutingInformationService.
 *
 * The returned facade exposes promise-based lookups (lpm, get, getLonger,
 * getRouters, dumpRib), a streaming observer (observeRib) and close(). The
 * promise-based methods never reject: on any failure they log a warning and
 * resolve to null (single-route lookups) or an empty array (list results).
 *
 * RPCs are addressed by their exact names from ris.proto (LPM, Get, GetLonger,
 * GetRouters, DumpRIB, ObserveRIB). grpc-js always installs a client method
 * under the proto name; the camelCase alias it adds is not a simple
 * lower-cased first letter for acronym names, so spellings such as `lPM` or
 * `dumpRIB` do not exist on the client.
 *
 * @param {string} [host='localhost'] - RIS host name or address.
 * @param {number} [port=4321] - RIS gRPC port.
 * @returns {object|null} Client facade, or null if the proto definition
 *   cannot be loaded.
 */
function createRisClient(host = 'localhost', port = 4321) {
  let RisService;
  try {
    const packageDef = _loadPackageDef();
    const proto = grpc.loadPackageDefinition(packageDef);
    RisService = proto.bio.ris.RoutingInformationService;
  } catch (err) {
    console.error('[bio-rd] Failed to load proto definition:', err.message);
    return null;
  }

  const UNARY_DEADLINE_MS = 5000;
  const STREAM_DEADLINE_MS = 30000;

  const address = `${host}:${port}`;
  const channelOptions = {
    'grpc.connect_timeout_ms': 5000,
    'grpc.initial_reconnect_backoff_ms': 1000,
  };
  // The service client creates and owns its channel, so no separate
  // grpc.Channel is constructed here.
  const client = new RisService(address, grpc.credentials.createInsecure(), channelOptions);

  // ── RPC lookup ──────────────────────────────────────────────────────────────
  /** Fail with a clear message if the generated client lacks the named RPC. */
  function _assertRpc(method) {
    if (typeof client[method] !== 'function') {
      throw new Error(`RIS client has no RPC method named ${method}`);
    }
  }

  // ── Unary helper ────────────────────────────────────────────────────────────
  /** Invoke a unary RPC with a 5 s deadline and resolve with its response. */
  function _unary(method, request) {
    return new Promise((resolve, reject) => {
      // A throw inside the executor becomes a rejection of this promise.
      _assertRpc(method);
      const deadline = new Date(Date.now() + UNARY_DEADLINE_MS);
      client[method](request, { deadline }, (err, response) => {
        if (err) {
          reject(err);
          return;
        }
        resolve(response);
      });
    });
  }

  // ── Stream collector ────────────────────────────────────────────────────────
  /** Invoke a server-streaming RPC (30 s deadline) and collect every message. */
  function _collectStream(method, request) {
    return new Promise((resolve, reject) => {
      _assertRpc(method);
      const results = [];
      const deadline = new Date(Date.now() + STREAM_DEADLINE_MS);
      const call = client[method](request, { deadline });
      call.on('data', (chunk) => results.push(chunk));
      call.on('error', reject);
      call.on('end', () => resolve(results));
    });
  }

  /** Shared body of lpm/get/getLonger: query the default VRF and decode the routes. */
  async function _prefixLookup(method, router, prefix) {
    const response = await _unary(method, {
      router,
      vrf: '0:0',
      pfx: prefixToProto(prefix),
    });
    return (response.routes || []).map((route) => decodeRoute(route)).filter(Boolean);
  }

  // ── Public API ──────────────────────────────────────────────────────────────
  /**
   * Longest Prefix Match for a given prefix.
   * Returns the best matching route or null.
   */
  async function lpm(router, prefix) {
    try {
      const routes = await _prefixLookup('LPM', router, prefix);
      return routes[0] || null;
    } catch (err) {
      console.warn(`[bio-rd] lpm(${router}, ${prefix}) failed:`, err.message);
      return null;
    }
  }

  /**
   * Exact prefix lookup.
   * Returns the matching route or null.
   */
  async function get(router, prefix) {
    try {
      const routes = await _prefixLookup('Get', router, prefix);
      return routes[0] || null;
    } catch (err) {
      console.warn(`[bio-rd] get(${router}, ${prefix}) failed:`, err.message);
      return null;
    }
  }

  /**
   * Get all more-specific prefixes covered by the given prefix.
   * Returns an array of decoded routes.
   */
  async function getLonger(router, prefix) {
    try {
      return await _prefixLookup('GetLonger', router, prefix);
    } catch (err) {
      console.warn(`[bio-rd] getLonger(${router}, ${prefix}) failed:`, err.message);
      return [];
    }
  }

  /**
   * Get all routers known to this RIS instance.
   * Returns an array of { name, address } objects.
   */
  async function getRouters() {
    try {
      const response = await _unary('GetRouters', {});
      return (response.routers || []).map((r) => ({
        name: r.sys_name || '',
        address: r.address || '',
      }));
    } catch (err) {
      console.warn('[bio-rd] getRouters() failed:', err.message);
      return [];
    }
  }

  /**
   * Dump the full RIB for a router/VRF, optionally filtered by origin ASN.
   * Collects the entire stream and returns an array of decoded routes.
   * NOTE(review): the request is fixed to afisafi 'IPv4Unicast', so only that
   * table is dumped.
   *
   * @param {string} router - Router identifier (e.g. "router1")
   * @param {string} vrfName - VRF name (e.g. "0:0" for default)
   * @param {number} originAsn - Filter by originating AS (0 = no filter)
   */
  async function dumpRib(router, vrfName = '0:0', originAsn = 0) {
    try {
      const request = {
        router,
        vrf: vrfName,
        afisafi: 'IPv4Unicast',
        filter: { originating_asn: originAsn || 0 },
      };
      const chunks = await _collectStream('DumpRIB', request);
      return chunks.map((chunk) => decodeRoute(chunk)).filter(Boolean);
    } catch (err) {
      console.warn(`[bio-rd] dumpRib(${router}) failed:`, err.message);
      return [];
    }
  }

  /**
   * Observe real-time RIB updates for a router/VRF.
   * The stream has no deadline; it runs until it ends, fails or is cancelled.
   * NOTE(review): the request is fixed to afisafi 'IPv4Unicast'.
   *
   * @param {string} router - Router identifier
   * @param {string} vrfName - VRF name (e.g. "0:0")
   * @param {function} onUpdate - Called with { type: 'add'|'withdraw', route }
   * @param {function} onError - Called with the error on stream failure
   * @returns {function} cancel - Call to stop the observation
   */
  function observeRib(router, vrfName = '0:0', onUpdate, onError) {
    const request = {
      router,
      vrf: vrfName,
      afisafi: 'IPv4Unicast',
    };
    let call;
    try {
      _assertRpc('ObserveRIB');
      call = client.ObserveRIB(request);
    } catch (err) {
      if (onError) onError(err);
      return () => {};
    }
    call.on('data', (update) => {
      if (!update || !update.route) return;
      const type = update.advertisement ? 'add' : 'withdraw';
      const route = decodeRoute(update.route);
      if (route && onUpdate) onUpdate({ type, route });
    });
    call.on('error', (err) => {
      console.warn(`[bio-rd] observeRib(${router}) stream error:`, err.message);
      if (onError) onError(err);
    });
    call.on('end', () => {
      console.log(`[bio-rd] observeRib(${router}) stream ended`);
    });
    return () => {
      // Best-effort: cancelling an already finished call must not throw to the caller.
      try { call.cancel(); } catch (_) {}
    };
  }

  // ── Cleanup ─────────────────────────────────────────────────────────────────
  /** Close the client (and with it the channel it owns). Errors are ignored. */
  function close() {
    try { client.close(); } catch (_) {}
  }

  return { lpm, get, getLonger, getRouters, dumpRib, observeRib, close };
}
// Public surface of this module: the client factory plus the pure
// encode/decode helpers. The underscore-prefixed helpers stay module-private.
module.exports = {
createRisClient,
ipToProto,
prefixToProto,
protoToIp,
decodeRoute,
};