Rene Fichtmueller ae0bda9e06 feat: proxy network — geo-lookup, uptime tracking, dedup fix
- IP geo-lookup via ip-api.com on register/heartbeat (country_code, city)
- heartbeat_count column + uptime_pct computation on every heartbeat
- Deduplication: register returns existing token for same IP+port
- Heartbeat no longer overwrites registered IP (prevents IPv6 churn conflicts)
- Migration 023: heartbeat_count column + backfill existing nodes
2026-04-04 08:15:32 +02:00

398 lines
13 KiB
TypeScript

/**
* Proxy Network API routes
*
* POST /api/proxy/register — register new node, returns token
* POST /api/proxy/heartbeat — update node status + stats
* GET /api/proxy/nodes — list all nodes (admin)
* GET /api/proxy/next — get best available proxy for scraper use
* POST /api/proxy/rotate — mark node as used, get next
* GET /api/proxy/stats — network-wide stats
*
* Note: register, heartbeat, next and stats are public (no auth required).
* nodes + rotate require the admin token, sent either via the X-Admin-Token
* header or the admin_token query parameter.
*/
import { Router, Request, Response } from "express";
import * as crypto from "crypto";
import * as http from "http";
import * as net from "net";
import { pool } from "../db/client";
/**
 * Decides whether `ip` must NOT be sent to the public geo service.
 *
 * Returns true for anything that is not a literal IPv4/IPv6 address (so
 * hostnames, path fragments and other junk never reach the outbound URL) and
 * for loopback, private, link-local and unspecified ranges, including their
 * IPv4-mapped IPv6 spellings such as "::ffff:10.0.0.1".
 */
function isNonRoutableIp(ip: string): boolean {
  const family = net.isIP(ip);
  if (family === 0) return true;

  // Unwrap IPv4-mapped IPv6 so the IPv4 range rules below apply to it too.
  const mapped = /^::ffff:(\d{1,3}(?:\.\d{1,3}){3})$/i.exec(ip);
  const v4 = family === 4 ? ip : mapped?.[1];
  if (v4 !== undefined) {
    const [a = 0, b = 0] = v4.split(".").map(Number);
    return (
      a === 0 ||
      a === 10 ||
      a === 127 ||
      (a === 172 && b >= 16 && b <= 31) || // only 172.16.0.0/12 is private, not all of 172/8
      (a === 192 && b === 168) ||
      (a === 169 && b === 254)
    );
  }

  const lower = ip.toLowerCase();
  return (
    lower.includes("%") || // zone-scoped address; never globally routable
    lower === "::" ||
    lower === "::1" ||
    /^f[cd][0-9a-f]{2}:/.test(lower) || // unique-local fc00::/7
    /^fe[89ab][0-9a-f]:/.test(lower) // link-local fe80::/10
  );
}

/**
 * Lookup country+city for an IP via ip-api.com (free, no key required).
 *
 * Never rejects: every failure mode (non-routable or invalid input, network
 * error, non-JSON body, non-"success" status, 3 s timeout) resolves to
 * `{ country: null, city: null }`.
 *
 * @param ip Literal IPv4 or IPv6 address to look up.
 * @returns The two-letter country code and city reported by the service, or
 *          nulls when nothing could be determined.
 */
async function geoLookup(ip: string): Promise<{ country: string | null; city: string | null }> {
  const unknownGeo = (): { country: string | null; city: string | null } => ({
    country: null,
    city: null,
  });

  // Validation doubles as injection protection: only IP literals are ever
  // interpolated into the request path below.
  if (isNonRoutableIp(ip)) return unknownGeo();

  return new Promise((resolve) => {
    const request = http.get(`http://ip-api.com/json/${ip}?fields=countryCode,city,status`);

    // Give up after 3 s and tear the socket down so it does not linger.
    const timer = setTimeout(() => {
      request.destroy();
      resolve(unknownGeo());
    }, 3000);

    request.on("response", (res) => {
      // Decode as UTF-8 at the stream level so multi-byte characters split
      // across chunks (e.g. accented city names) are not corrupted.
      res.setEncoding("utf8");
      let data = "";
      res.on("data", (chunk) => {
        data += chunk;
      });
      res.on("end", () => {
        clearTimeout(timer);
        try {
          const parsed = JSON.parse(data) as { status: string; countryCode?: string; city?: string };
          if (parsed.status === "success") {
            resolve({
              country: parsed.countryCode ?? null,
              city: parsed.city ?? null,
            });
          } else {
            resolve(unknownGeo());
          }
        } catch {
          // Non-JSON body (e.g. rate-limit page): treat as "unknown".
          resolve(unknownGeo());
        }
      });
    });

    request.on("error", () => {
      clearTimeout(timer);
      resolve(unknownGeo());
    });
  });
}
/**
 * Express router that carries every proxy-network endpoint registered in this
 * file (register, heartbeat, nodes, next, rotate, stats).
 * NOTE(review): presumably mounted at /api/proxy, per the header comment — confirm at the mount site.
 */
export const proxyRouter = Router();
/**
 * Shared secret for the admin-only endpoints, read once at module load.
 *
 * NOTE(review): the fallback is a fixed string committed to source, so any
 * deployment that does not set TIP_ADMIN_TOKEN is guarded by a publicly known
 * value. Kept for backward compatibility — consider failing closed instead.
 */
const ADMIN_TOKEN = process.env.TIP_ADMIN_TOKEN ?? "tip-admin-2026";

/**
 * Gate for admin-only routes.
 *
 * Reads the token from the `X-Admin-Token` header, falling back to the
 * `admin_token` query parameter. On failure it sends a 401 JSON response
 * itself, so callers only need `if (!requireAdmin(req, res)) return;`.
 *
 * @param req Incoming request whose header/query carries the token.
 * @param res Response used to emit the 401 when the token is missing or wrong.
 * @returns true when the caller is authorised, false after a 401 was sent.
 */
function requireAdmin(req: Request, res: Response): boolean {
  const candidate = req.headers["x-admin-token"] ?? req.query["admin_token"] ?? "";
  // Repeated query params arrive as arrays/objects; only a plain string can match.
  const supplied = typeof candidate === "string" ? candidate : "";

  // Hash both sides so timingSafeEqual always gets equal-length buffers and the
  // comparison time does not leak how much of the token matched.
  const suppliedDigest = crypto.createHash("sha256").update(supplied).digest();
  const expectedDigest = crypto.createHash("sha256").update(ADMIN_TOKEN).digest();

  // An empty token is never valid: otherwise TIP_ADMIN_TOKEN="" would let
  // requests that send no token at all through.
  const authorised = supplied.length > 0 && crypto.timingSafeEqual(suppliedDigest, expectedDigest);
  if (!authorised) {
    res.status(401).json({ error: "Admin token required" });
    return false;
  }
  return true;
}
/**
 * Mints a fresh node token.
 *
 * @returns 32 cryptographically random bytes rendered as a 64-character
 *          lowercase hex string.
 */
function generateToken(): string {
  const entropy = crypto.randomBytes(32);
  return entropy.toString("hex");
}
// Round-robin index tracker (in-memory, resets on restart).
// Read and advanced by GET /next to pick a slot in that request's candidate
// list; it is always stored modulo the size of the list seen by the last request.
let rrIndex = 0;
// POST /api/proxy/register
/**
 * Registers a proxy node and responds with `{ success, token, nodeId, message }`.
 *
 * Flow:
 *  1. No token supplied and a node already exists for the caller's IP+port:
 *     refresh that node's name/version, mark it online and return its token.
 *  2. Otherwise insert a node (generating a token if none was supplied); if the
 *     supplied token already exists the row is updated instead (upsert).
 *
 * Responds 400 when `port` is not an integer in 1..65535 and 500 on any
 * unexpected failure.
 *
 * NOTE(review): the caller IP is taken from the first X-Forwarded-For entry,
 * which a client can set itself unless a trusted proxy overwrites it. Combined
 * with step 1 this lets a caller who claims another node's IP+port obtain that
 * node's token — confirm the deployment always sits behind such a proxy.
 */
proxyRouter.post("/register", async (req: Request, res: Response) => {
  try {
    // A missing/unparsed body is treated as empty rather than throwing on destructuring.
    const { token: existingToken, name, owner_email, port, version } = (req.body ?? {}) as {
      token?: string;
      name?: string;
      owner_email?: string;
      port?: number;
      version?: string;
    };

    const ip =
      (req.headers["x-forwarded-for"] as string | undefined)?.split(",")[0]?.trim() ??
      req.socket.remoteAddress ??
      "";

    // Default to 1080 when omitted; numeric strings are still accepted.
    const nodePort = port == null ? 1080 : Number(port);
    if (!Number.isInteger(nodePort) || nodePort < 1 || nodePort > 65535) {
      // Reject here instead of letting the database turn it into a 500.
      res.status(400).json({ error: "port must be an integer between 1 and 65535" });
      return;
    }

    // Deduplication: if no token provided, check for existing node at same IP+port
    if (!existingToken && ip) {
      const existing = await pool.query(
        `SELECT id, token FROM proxy_nodes WHERE ip = $1 AND port = $2 LIMIT 1`,
        [ip, nodePort]
      );
      if (existing.rows.length > 0) {
        const node = existing.rows[0];
        // Update metadata and mark online
        await pool.query(
          `UPDATE proxy_nodes SET name = COALESCE($2, name), version = COALESCE($3, version),
             status = 'online', last_seen = NOW() WHERE id = $1`,
          [node.id, name ?? null, version ?? null]
        );
        res.json({
          success: true,
          token: node.token,
          nodeId: node.id,
          message: "Existing node found for this IP+port — token returned.",
        });
        return;
      }
    }

    // Geo-lookup is awaited, so a slow lookup delays the registration response;
    // geoLookup resolves to nulls on failure rather than rejecting.
    const geo = await geoLookup(ip);

    // If token provided, upsert — otherwise create new
    const token = existingToken ?? generateToken();
    const result = await pool.query(
      `INSERT INTO proxy_nodes (token, name, owner_email, ip, port, version, status, last_seen, country_code, city)
       VALUES ($1, $2, $3, $4, $5, $6, 'online', NOW(), $7, $8)
       ON CONFLICT (token) DO UPDATE SET
         name = COALESCE(EXCLUDED.name, proxy_nodes.name),
         ip = EXCLUDED.ip,
         port = COALESCE(EXCLUDED.port, proxy_nodes.port),
         version = COALESCE(EXCLUDED.version, proxy_nodes.version),
         country_code = COALESCE(EXCLUDED.country_code, proxy_nodes.country_code),
         city = COALESCE(EXCLUDED.city, proxy_nodes.city),
         status = 'online',
         last_seen = NOW()
       RETURNING id, token, name, status`,
      [token, name ?? null, owner_email ?? null, ip, nodePort, version ?? null, geo.country, geo.city]
    );
    const node = result.rows[0];
    res.json({
      success: true,
      token: node.token,
      nodeId: node.id,
      message: "Node registered. Keep this token safe!",
    });
  } catch (err) {
    console.error("[proxy] register error:", err);
    res.status(500).json({ error: "Registration failed" });
  }
});
// POST /api/proxy/heartbeat
/**
 * Records a liveness ping from a registered node.
 *
 * Marks the node online, refreshes `last_seen`, overwrites the reported
 * counters when present, increments `heartbeat_count` and recomputes
 * `uptime_pct`. The node's stored IP is deliberately not touched.
 *
 * Responds 400 without a token, 404 for an unknown token, 500 on unexpected
 * failure, otherwise `{ success: true, status: "online" }`.
 */
proxyRouter.post("/heartbeat", async (req: Request, res: Response) => {
  try {
    // A missing/unparsed body falls through to the "token required" 400 below
    // instead of throwing on destructuring.
    const {
      token,
      ip,
      port,
      bytesProxied,
      requestsProxied,
      latencyMs,
      version,
    } = (req.body ?? {}) as {
      token?: string;
      ip?: string;
      port?: number;
      bytesProxied?: number;
      requestsProxied?: number;
      latencyMs?: number;
      version?: string;
    };

    if (!token) {
      res.status(400).json({ error: "token required" });
      return;
    }

    // Only used as a geo-lookup fallback when the node has no stored IP.
    const remoteIp =
      ip ??
      (req.headers["x-forwarded-for"] as string | undefined)?.split(",")[0]?.trim() ??
      req.socket.remoteAddress ??
      "";

    const bytesGb = (bytesProxied ?? 0) / (1024 ** 3);

    // Geo-lookup only if country_code not yet set for this node
    const geoCheckResult = await pool.query(
      `SELECT country_code, ip FROM proxy_nodes WHERE token = $1`,
      [token]
    );
    let geo: { country: string | null; city: string | null } = { country: null, city: null };
    if (geoCheckResult.rows.length > 0 && !geoCheckResult.rows[0].country_code) {
      // Use the stored IP (registered IP is more reliable than heartbeat source IP)
      const lookupIp = geoCheckResult.rows[0].ip || remoteIp;
      if (lookupIp) geo = await geoLookup(lookupIp);
    }

    // uptime_pct = heartbeats * 30 s / seconds since registration, as a
    // percentage capped at 99.9 and rounded to two decimals. The 30.0 presumably
    // mirrors the client's heartbeat interval — confirm against the node agent.
    // The ::numeric cast keeps ROUND(value, 2) valid on PostgreSQL < 14, where
    // EXTRACT(EPOCH ...) yields double precision and round(double precision,
    // integer) does not exist; on 14+ the cast is a no-op.
    const result = await pool.query(
      `UPDATE proxy_nodes SET
         status = 'online',
         last_seen = NOW(),
         port = COALESCE($2, port),
         bytes_proxied = COALESCE($3, bytes_proxied),
         requests_proxied = COALESCE($4, requests_proxied),
         latency_ms = COALESCE($5, latency_ms),
         version = COALESCE($6, version),
         bandwidth_used_gb = COALESCE($7, bandwidth_used_gb),
         country_code = COALESCE($8, country_code),
         city = COALESCE($9, city),
         heartbeat_count = heartbeat_count + 1,
         uptime_pct = LEAST(99.9,
           ROUND(
             ((heartbeat_count + 1) * 30.0 /
              GREATEST(1, EXTRACT(EPOCH FROM (NOW() - registered_at))::numeric)) * 100,
             2
           )
         )
       WHERE token = $1
       RETURNING id, status`,
      [
        token,
        port ?? null,
        bytesProxied ?? null,
        requestsProxied ?? null,
        latencyMs ?? null,
        version ?? null,
        // A zero/absent byte count must not reset the stored bandwidth figure.
        bytesGb > 0 ? bytesGb : null,
        geo.country,
        geo.city,
      ]
    );

    if (result.rowCount === 0) {
      res.status(404).json({ error: "Node not found. Please register first." });
      return;
    }
    res.json({ success: true, status: "online" });
  } catch (err) {
    console.error("[proxy] heartbeat error:", err);
    res.status(500).json({ error: "Heartbeat failed" });
  }
});
// GET /api/proxy/nodes (admin)
/**
 * Admin listing of every row in proxy_nodes, online nodes first and most
 * recently seen first within each group. Responds with
 * `{ success, nodes, total }`, or 500 when the query fails.
 */
proxyRouter.get("/nodes", async (req: Request, res: Response) => {
  const authorised = requireAdmin(req, res);
  if (!authorised) {
    // requireAdmin has already sent the 401.
    return;
  }

  const listSql = `SELECT id, token, name, country_code, city, ip, port, protocol,
status, last_seen, bytes_proxied, requests_proxied,
uptime_pct, bandwidth_limit_gb, bandwidth_used_gb,
latency_ms, version, registered_at
FROM proxy_nodes
ORDER BY status = 'online' DESC, last_seen DESC`;

  try {
    const listing = await pool.query(listSql);
    res.json({ success: true, nodes: listing.rows, total: listing.rowCount });
  } catch (err) {
    console.error("[proxy] nodes error:", err);
    res.status(500).json({ error: "Failed to fetch nodes" });
  }
});
// GET /api/proxy/next — best available proxy for scraper use
/**
 * Hands out one proxy from the current top candidates.
 *
 * Candidates are online nodes seen within the last two minutes that still have
 * bandwidth headroom (or a limit of 0), ranked by uptime then latency and
 * capped at ten. The module-level `rrIndex` cycles through that list across
 * requests. Responds 503 (with the PROXY_URL env value as `fallback`) when the
 * list is empty, 500 when the query fails.
 */
proxyRouter.get("/next", async (_req: Request, res: Response) => {
  const candidateSql = `SELECT id, ip, port, protocol, country_code, city, latency_ms
FROM proxy_nodes
WHERE status = 'online'
AND last_seen > NOW() - INTERVAL '2 minutes'
AND (bandwidth_used_gb < bandwidth_limit_gb OR bandwidth_limit_gb = 0)
ORDER BY uptime_pct DESC NULLS LAST, latency_ms ASC NULLS LAST
LIMIT 10`;

  try {
    const lookup = await pool.query(candidateSql);
    if (lookup.rowCount === 0) {
      const fallback = process.env.PROXY_URL ?? null;
      res.status(503).json({ error: "No proxies available", fallback });
      return;
    }

    // Pick the current slot, then advance the cursor for the next caller.
    const candidates = lookup.rows;
    const slot = rrIndex % candidates.length;
    rrIndex = (rrIndex + 1) % candidates.length;
    const picked = candidates[slot];

    res.json({
      success: true,
      host: picked.ip,
      port: picked.port,
      protocol: picked.protocol ?? "socks5",
      nodeId: picked.id,
      country: picked.country_code,
      city: picked.city,
      latencyMs: picked.latency_ms,
    });
  } catch (err) {
    console.error("[proxy] next error:", err);
    res.status(500).json({ error: "Failed to get next proxy" });
  }
});
// POST /api/proxy/rotate — mark node as used, get next one
/**
 * Admin endpoint: rotate away from `nodeId` (optional) and return another node.
 *
 * When `nodeId` is given, its `requests_proxied` counter is bumped by one and
 * the node is excluded from the selection. The replacement is the online node
 * seen within the last two minutes with the fewest `requests_proxied`, ties
 * broken by lowest latency.
 *
 * Responds 400 when `nodeId` is present but not UUID-shaped, 503 when no other
 * node qualifies, 500 on unexpected failure.
 */
proxyRouter.post("/rotate", async (req: Request, res: Response) => {
  if (!requireAdmin(req, res)) return;
  try {
    const { nodeId: rawNodeId } = (req.body ?? {}) as { nodeId?: unknown };

    // Absent, null and "" all mean "no node to rotate away from".
    let nodeId: string | null = null;
    if (rawNodeId !== undefined && rawNodeId !== null && rawNodeId !== "") {
      // The queries below cast to uuid; anything else would fail in the database
      // and surface as a 500. The check is deliberately lenient (32 hex digits
      // once braces/hyphens are removed) so every spelling PostgreSQL accepts
      // still passes.
      const uuidLike =
        typeof rawNodeId === "string" && /^[0-9a-f]{32}$/i.test(rawNodeId.replace(/[{}-]/g, ""));
      if (!uuidLike) {
        res.status(400).json({ error: "nodeId must be a UUID" });
        return;
      }
      nodeId = rawNodeId as string;
    }

    if (nodeId !== null) {
      // Count one more use against the outgoing node (no ban, no status change);
      // this also pushes it down the requests_proxied ordering used below.
      await pool.query(
        `UPDATE proxy_nodes SET requests_proxied = requests_proxied + 1 WHERE id = $1`,
        [nodeId]
      );
    }

    // Return next proxy; the all-zero UUID stands in for "exclude nothing".
    const result = await pool.query(
      `SELECT id, ip, port, protocol, country_code
       FROM proxy_nodes
       WHERE status = 'online'
         AND last_seen > NOW() - INTERVAL '2 minutes'
         AND id != COALESCE($1::uuid, '00000000-0000-0000-0000-000000000000')
       ORDER BY requests_proxied ASC, latency_ms ASC NULLS LAST
       LIMIT 1`,
      [nodeId]
    );
    if (result.rowCount === 0) {
      res.status(503).json({ error: "No other proxies available" });
      return;
    }
    const node = result.rows[0];
    res.json({
      success: true,
      host: node.ip,
      port: node.port,
      protocol: node.protocol ?? "socks5",
      nodeId: node.id,
      country: node.country_code,
    });
  } catch (err) {
    console.error("[proxy] rotate error:", err);
    res.status(500).json({ error: "Failed to rotate proxy" });
  }
});
// GET /api/proxy/stats — public network stats
/**
 * Public summary of the proxy network.
 *
 * Runs one aggregate query over proxy_nodes for the network totals and a
 * second one for the per-country node counts. Totals are converted to numbers
 * (missing values become 0); the per-country rows are passed through exactly
 * as the database returned them. Responds 500 when either query fails.
 */
proxyRouter.get("/stats", async (_req: Request, res: Response) => {
  const totalsSql = `
SELECT
COUNT(*) AS total_nodes,
COUNT(*) FILTER (WHERE status = 'online') AS online_nodes,
COUNT(*) FILTER (WHERE status = 'offline') AS offline_nodes,
COUNT(DISTINCT country_code) AS countries,
COALESCE(SUM(bandwidth_used_gb), 0) AS total_gb_proxied,
COALESCE(SUM(requests_proxied), 0) AS total_requests,
COALESCE(AVG(latency_ms) FILTER (WHERE status = 'online'), 0) AS avg_latency_ms
FROM proxy_nodes
`;
  const perCountrySql = `
SELECT country_code, COUNT(*) AS nodes,
COUNT(*) FILTER (WHERE status = 'online') AS online
FROM proxy_nodes
WHERE country_code IS NOT NULL
GROUP BY country_code
ORDER BY online DESC, nodes DESC
`;
  // Aggregate columns come back as strings or null; coerce with a 0 default.
  const asNumber = (value: unknown): number => Number(value ?? 0);

  try {
    const totals = await pool.query(totalsSql);
    const summary = totals.rows[0] ?? {};

    // Per-country breakdown
    const breakdown = await pool.query(perCountrySql);

    const network = {
      totalNodes: asNumber(summary.total_nodes),
      onlineNodes: asNumber(summary.online_nodes),
      offlineNodes: asNumber(summary.offline_nodes),
      countries: asNumber(summary.countries),
      totalGbProxied: parseFloat(asNumber(summary.total_gb_proxied).toFixed(2)),
      totalRequests: asNumber(summary.total_requests),
      avgLatencyMs: Math.round(asNumber(summary.avg_latency_ms)),
    };

    res.json({
      success: true,
      network,
      countries: breakdown.rows,
    });
  } catch (err) {
    console.error("[proxy] stats error:", err);
    res.status(500).json({ error: "Failed to get stats" });
  }
});