feat: proxy network — geo-lookup, uptime tracking, dedup fix

- IP geo-lookup via ip-api.com on register/heartbeat (country_code, city)
- heartbeat_count column + uptime_pct computation on every heartbeat
- Deduplication: register returns existing token for same IP+port
- Heartbeat no longer overwrites registered IP (prevents IPv6 churn conflicts)
- Migration 023: heartbeat_count column + backfill existing nodes
This commit is contained in:
Rene Fichtmueller 2026-04-04 08:15:32 +02:00
parent 9074b6ede0
commit ae0bda9e06
2 changed files with 123 additions and 11 deletions

View File

@ -13,8 +13,45 @@
*/ */
import { Router, Request, Response } from "express"; import { Router, Request, Response } from "express";
import * as crypto from "crypto"; import * as crypto from "crypto";
import * as http from "http";
import { pool } from "../db/client"; import { pool } from "../db/client";
/** Lookup country+city for an IP via ip-api.com (free, no key required) */
async function geoLookup(ip: string): Promise<{ country: string | null; city: string | null }> {
// Skip private/loopback IPs
if (!ip || ip === "::1" || ip.startsWith("127.") || ip.startsWith("10.") ||
ip.startsWith("192.168.") || ip.startsWith("172.")) {
return { country: null, city: null };
}
return new Promise((resolve) => {
const timeout = setTimeout(() => resolve({ country: null, city: null }), 3000);
http.get(`http://ip-api.com/json/${ip}?fields=countryCode,city,status`, (res) => {
let data = "";
res.on("data", (chunk) => { data += chunk; });
res.on("end", () => {
clearTimeout(timeout);
try {
const parsed = JSON.parse(data) as { status: string; countryCode?: string; city?: string };
if (parsed.status === "success") {
resolve({
country: parsed.countryCode ?? null,
city: parsed.city ?? null,
});
} else {
resolve({ country: null, city: null });
}
} catch {
resolve({ country: null, city: null });
}
});
}).on("error", () => {
clearTimeout(timeout);
resolve({ country: null, city: null });
});
});
}
export const proxyRouter = Router(); export const proxyRouter = Router();
const ADMIN_TOKEN = process.env.TIP_ADMIN_TOKEN ?? "tip-admin-2026"; const ADMIN_TOKEN = process.env.TIP_ADMIN_TOKEN ?? "tip-admin-2026";
@ -54,21 +91,53 @@ proxyRouter.post("/register", async (req: Request, res: Response) => {
req.socket.remoteAddress ?? req.socket.remoteAddress ??
""; "";
const nodePort = port ?? 1080;
// Deduplication: if no token provided, check for existing node at same IP+port
if (!existingToken && ip) {
const existing = await pool.query(
`SELECT id, token FROM proxy_nodes WHERE ip = $1 AND port = $2 LIMIT 1`,
[ip, nodePort]
);
if (existing.rows.length > 0) {
const node = existing.rows[0];
// Update metadata and mark online
await pool.query(
`UPDATE proxy_nodes SET name = COALESCE($2, name), version = COALESCE($3, version),
status = 'online', last_seen = NOW() WHERE id = $1`,
[node.id, name ?? null, version ?? null]
);
res.json({
success: true,
token: node.token,
nodeId: node.id,
message: "Existing node found for this IP+port — token returned.",
});
return;
}
}
// Geo-lookup (async, non-blocking for registration flow)
const geo = await geoLookup(ip);
// If token provided, upsert — otherwise create new // If token provided, upsert — otherwise create new
const token = existingToken ?? generateToken(); const token = existingToken ?? generateToken();
const result = await pool.query( const result = await pool.query(
`INSERT INTO proxy_nodes (token, name, owner_email, ip, port, version, status, last_seen) `INSERT INTO proxy_nodes (token, name, owner_email, ip, port, version, status, last_seen, country_code, city)
VALUES ($1, $2, $3, $4, $5, $6, 'online', NOW()) VALUES ($1, $2, $3, $4, $5, $6, 'online', NOW(), $7, $8)
ON CONFLICT (token) DO UPDATE SET ON CONFLICT (token) DO UPDATE SET
name = COALESCE(EXCLUDED.name, proxy_nodes.name), name = COALESCE(EXCLUDED.name, proxy_nodes.name),
ip = EXCLUDED.ip, ip = EXCLUDED.ip,
port = COALESCE(EXCLUDED.port, proxy_nodes.port), port = COALESCE(EXCLUDED.port, proxy_nodes.port),
version = COALESCE(EXCLUDED.version, proxy_nodes.version), version = COALESCE(EXCLUDED.version, proxy_nodes.version),
country_code = COALESCE(EXCLUDED.country_code, proxy_nodes.country_code),
city = COALESCE(EXCLUDED.city, proxy_nodes.city),
status = 'online', status = 'online',
last_seen = NOW() last_seen = NOW()
RETURNING id, token, name, status`, RETURNING id, token, name, status`,
[token, name ?? null, owner_email ?? null, ip, port ?? 1080, version ?? null] [token, name ?? null, owner_email ?? null, ip, nodePort, version ?? null,
geo.country, geo.city]
); );
const node = result.rows[0]; const node = result.rows[0];
@ -118,28 +187,50 @@ proxyRouter.post("/heartbeat", async (req: Request, res: Response) => {
const bytesGb = (bytesProxied ?? 0) / (1024 ** 3); const bytesGb = (bytesProxied ?? 0) / (1024 ** 3);
// Geo-lookup only if country_code not yet set for this node
const geoCheckResult = await pool.query(
`SELECT country_code, ip FROM proxy_nodes WHERE token = $1`,
[token]
);
let geo: { country: string | null; city: string | null } = { country: null, city: null };
if (geoCheckResult.rows.length > 0 && !geoCheckResult.rows[0].country_code) {
// Use the stored IP (registered IP is more reliable than heartbeat source IP)
const lookupIp = geoCheckResult.rows[0].ip || remoteIp;
if (lookupIp) geo = await geoLookup(lookupIp);
}
const result = await pool.query( const result = await pool.query(
`UPDATE proxy_nodes SET `UPDATE proxy_nodes SET
status = 'online', status = 'online',
last_seen = NOW(), last_seen = NOW(),
ip = COALESCE($2, ip), port = COALESCE($2, port),
port = COALESCE($3, port), bytes_proxied = COALESCE($3, bytes_proxied),
bytes_proxied = COALESCE($4, bytes_proxied), requests_proxied = COALESCE($4, requests_proxied),
requests_proxied = COALESCE($5, requests_proxied), latency_ms = COALESCE($5, latency_ms),
latency_ms = COALESCE($6, latency_ms), version = COALESCE($6, version),
version = COALESCE($7, version), bandwidth_used_gb = COALESCE($7, bandwidth_used_gb),
bandwidth_used_gb = COALESCE($8, bandwidth_used_gb) country_code = COALESCE($8, country_code),
city = COALESCE($9, city),
heartbeat_count = heartbeat_count + 1,
uptime_pct = LEAST(99.9,
ROUND(
((heartbeat_count + 1) * 30.0 /
GREATEST(1, EXTRACT(EPOCH FROM (NOW() - registered_at)))) * 100,
2
)
)
WHERE token = $1 WHERE token = $1
RETURNING id, status`, RETURNING id, status`,
[ [
token, token,
remoteIp || null,
port ?? null, port ?? null,
bytesProxied ?? null, bytesProxied ?? null,
requestsProxied ?? null, requestsProxied ?? null,
latencyMs ?? null, latencyMs ?? null,
version ?? null, version ?? null,
bytesGb > 0 ? bytesGb : null, bytesGb > 0 ? bytesGb : null,
geo.country,
geo.city,
] ]
); );

View File

@ -0,0 +1,21 @@
-- Migration 023: Proxy node improvements
-- Adds heartbeat_count for uptime computation
-- Adds unique index on (ip, port) for deduplication
-- Track actual heartbeats received so we can compute uptime_pct accurately
ALTER TABLE proxy_nodes
ADD COLUMN IF NOT EXISTS heartbeat_count INTEGER NOT NULL DEFAULT 0;
-- Unique index on ip+port so we can deduplicate registrations from same machine
-- NOTE: partial index — only enforces uniqueness when ip IS NOT NULL
CREATE UNIQUE INDEX IF NOT EXISTS idx_proxy_nodes_ip_port
ON proxy_nodes (ip, port)
WHERE ip IS NOT NULL;
-- Backfill: estimate heartbeat_count from registered_at + 30s interval
-- Assumes ~95% uptime for existing nodes that are currently online
UPDATE proxy_nodes
SET heartbeat_count = GREATEST(1,
FLOOR(EXTRACT(EPOCH FROM (NOW() - registered_at)) / 30)::INTEGER
)
WHERE status = 'online' AND heartbeat_count = 0;