From 91384dbb2a3c7e12ea3aeafec1d717013e9fb797 Mon Sep 17 00:00:00 2001 From: Rene Fichtmueller Date: Sun, 26 Apr 2026 23:52:13 +0200 Subject: [PATCH] fix: SSE stream endpoint with proper HTTP/2 stream handling and heartbeat - Fixed /api/stream/requests endpoint HTTP/2 INTERNAL_ERROR - Use reply.raw.writeHead() instead of Fastify headers API for SSE - Added 30s heartbeat to keep connection alive - Proper event format with 'event:' and 'data:' fields - Comprehensive error handling and cleanup on disconnect - Mirrors working pattern from /api/stream/costs endpoint - Resolves dashboard perpetual 'Loading...' state --- packages/gateway/src/routes/dashboard.ts | 69 ++++++++++++++++++++---- 1 file changed, 60 insertions(+), 9 deletions(-) diff --git a/packages/gateway/src/routes/dashboard.ts b/packages/gateway/src/routes/dashboard.ts index 4b691e3..40725aa 100644 --- a/packages/gateway/src/routes/dashboard.ts +++ b/packages/gateway/src/routes/dashboard.ts @@ -463,31 +463,82 @@ export async function dashboardRoute(fastify: FastifyInstance): Promise { // Server-Sent Events endpoint for real-time request updates fastify.get('/api/stream/requests', async (request: FastifyRequest, reply: FastifyReply) => { - // Set SSE headers - reply.type('text/event-stream'); - reply.header('Cache-Control', 'no-cache'); - reply.header('Connection', 'keep-alive'); + // Use raw Node.js API to properly initialize HTTP/2 stream + reply.raw.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET', + 'Access-Control-Allow-Headers': 'Content-Type', + }); + + const clientIp = request.ip; + const clientId = `${clientIp}-${Date.now()}`; + + logger.info({ clientId, clientIp, activeListeners: globalRequestStream.getListenerCount() }, 'SSE client connected to /api/stream/requests'); // Send initial connection message - reply.raw.write(`data: ${JSON.stringify({ type: 'connected', timestamp: new Date().toISOString() })}\n\n`); + reply.raw.write('event: connected\n'); + reply.raw.write(`data: ${JSON.stringify({ clientId, timestamp: new Date().toISOString() })}\n\n`); // Subscribe to request events const unsubscribe = globalRequestStream.onRequest((event) => { - reply.raw.write(`data: ${JSON.stringify(event)}\n\n`); + try { + reply.raw.write('event: request-update\n'); + reply.raw.write(`data: ${JSON.stringify(event)}\n\n`); + } catch (err) { + logger.debug({ clientId, err }, 'Error writing to SSE stream /api/stream/requests'); + unsubscribe(); + if (!reply.raw.writableEnded) { + reply.raw.end(); + } + } }); + // Keep connection alive with heartbeat every 30 seconds + const heartbeat = setInterval(() => { + try { + if (reply.raw.writable) { + reply.raw.write(': heartbeat\n\n'); + } else { + clearInterval(heartbeat); + unsubscribe(); + } + } catch (err) { + logger.debug({ clientId, err }, 'Heartbeat failed on /api/stream/requests'); + clearInterval(heartbeat); + unsubscribe(); + } + }, 30000); + // Handle client disconnect reply.raw.on('close', () => { + logger.info({ clientId }, 'SSE client disconnected from /api/stream/requests'); + clearInterval(heartbeat); unsubscribe(); - logger.info('SSE client disconnected from /api/stream/requests'); }); + // Handle stream errors reply.raw.on('error', (error) => { - logger.error({ error }, 'SSE stream error'); + logger.error({ clientId, error }, 'SSE stream error on /api/stream/requests'); + clearInterval(heartbeat); unsubscribe(); }); - logger.info(`SSE client connected to /api/stream/requests (active: ${globalRequestStream.getListenerCount()})`); + // Cleanup on reply finish + reply.raw.on('finish', () => { + logger.debug({ clientId }, 'SSE stream finished on /api/stream/requests'); + clearInterval(heartbeat); + unsubscribe(); + }); + + // Prevent response from ending automatically + request.raw.on('close', () => { + logger.debug({ clientId }, 'Request closed on /api/stream/requests'); + clearInterval(heartbeat); + unsubscribe(); + }); }); // Test endpoint