From 4989c4affd65d753e56adbdc9cdd463467743c41 Mon Sep 17 00:00:00 2001 From: Rene Fichtmueller Date: Mon, 6 Apr 2026 02:51:28 +0200 Subject: [PATCH] fix(blog): fix claudeQueue deadlock from recursive 429 retry The generateClaude() function was recursively calling itself inside enqueueClaude(), creating a circular Promise dependency that permanently deadlocked the claudeQueue. Any 429 rate-limit response would poison the queue, blocking all future Claude API calls until server restart. Fixes: - Split retries into claudeApiCall() which is called from enqueueClaude (not re-entering the queue on retry = no circular dependency) - Max 3 retries with increasing backoff (10s/30s/60s) - Add resetClaudeQueue() exported function - Add 15-minute auto-reset stall detection to enqueueClaude - Expose resetClaudeQueue in POST /api/blog/llm/reset-queue endpoint - Fix merge conflict markers in index.ts (duplicate scraperRouter import) --- packages/api/src/index.ts | 3 -- packages/api/src/llm/client.ts | 53 +++++++++++++++++++++++++-------- packages/api/src/routes/blog.ts | 7 +++-- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index 62e7def..f09c69f 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -26,7 +26,6 @@ import { hotTopicsRouter } from "./routes/hot-topics"; import { adoptionRouter } from "./routes/adoption"; import { procurementRouter } from "./routes/procurement"; import { changelogRouter } from "./routes/changelog"; -import { scraperRouter } from "./routes/scrapers"; import { newsRouter } from "./routes/news"; import { proxyRouter } from "./routes/proxy"; @@ -74,7 +73,6 @@ app.use("/api/search", searchRouter); app.use("/api/documents", documentRouter); app.use("/api/blog", blogSllRouter); app.use("/api/blog", blogRouter); -<<<<<<< Updated upstream app.use("/api/scrapers", scraperRouter); app.use("/api/finder", finderRouter); app.use("/api/competitor-alerts", competitorRouter); @@ -85,7 +83,6 @@ app.use("/api/adoption", adoptionRouter); app.use("/api/hot-topics", hotTopicsRouter); app.use("/api/procurement", procurementRouter); app.use("/api/changelog", changelogRouter); -app.use("/api/scrapers", scraperRouter); app.use("/api/news", newsRouter); // Dashboard (static HTML) diff --git a/packages/api/src/llm/client.ts b/packages/api/src/llm/client.ts index 5d97c7d..a95c009 100644 --- a/packages/api/src/llm/client.ts +++ b/packages/api/src/llm/client.ts @@ -34,24 +34,39 @@ function sleep(ms: number): Promise { // Serialize Claude API calls to stay within TPM limits // Tier-1 has 40,000 TPM — with ~20K tokens/step, only 1 concurrent call safe let claudeQueue: Promise = Promise.resolve(); +let claudeQueueEnqueueTime = 0; + +export function resetClaudeQueue(): void { + claudeQueue = Promise.resolve(); + claudeQueueEnqueueTime = 0; + console.log("[LLM] Claude queue reset — previous stuck requests cleared"); +} function enqueueClaude(fn: () => Promise): Promise { - const result = claudeQueue.then(() => fn()); + claudeQueueEnqueueTime = Date.now(); + const result = claudeQueue.then(() => { + // Auto-reset if queue has been stalled > 15 minutes (prevents deadlock on stuck requests) + if (Date.now() - claudeQueueEnqueueTime > 900000) { + console.warn("[LLM] Claude queue auto-reset after 15min stall"); + return Promise.reject(new Error("Claude queue auto-reset: previous request timed out")); + } + return fn(); + }); claudeQueue = result.catch(() => {}); return result; } -async function generateClaude( +// Direct API call without going through the serialization queue — used for 429 retries +// to avoid the circular-promise deadlock that recursive enqueueClaude creates +async function claudeApiCall( systemPrompt: string, userPrompt: string, options?: { temperature?: number; maxTokens?: number; timeoutMs?: number }, + retryCount = 0, ): Promise { - if (!ANTHROPIC_API_KEY) { - throw new Error("ANTHROPIC_API_KEY not set — cannot use Claude provider"); - } - - return enqueueClaude(async () => { const startTime = Date.now(); + const MAX_RETRIES = 3; + const RETRY_DELAYS = [10000, 30000, 60000]; const resp = await fetch("https://api.anthropic.com/v1/messages", { method: "POST", @@ -72,11 +87,11 @@ async function generateClaude( if (!resp.ok) { const errText = await resp.text(); - // Rate limit retry - if (resp.status === 429) { - console.log("[LLM] Claude 429 — retrying in 10s..."); - await sleep(10000); - return generateClaude(systemPrompt, userPrompt, options); + if (resp.status === 429 && retryCount < MAX_RETRIES) { + const delay = RETRY_DELAYS[retryCount] ?? 60000; + console.log(`[LLM] Claude 429 — retrying in ${delay / 1000}s (attempt ${retryCount + 1}/${MAX_RETRIES})...`); + await sleep(delay); + return claudeApiCall(systemPrompt, userPrompt, options, retryCount + 1); } throw new Error(`Claude API failed: ${resp.status} ${errText.slice(0, 200)}`); } @@ -101,7 +116,19 @@ async function generateClaude( totalDuration: duration * 1_000_000, // ns for compat evalCount: data.usage.output_tokens, }; - }); // end enqueueClaude +} + +async function generateClaude( + systemPrompt: string, + userPrompt: string, + options?: { temperature?: number; maxTokens?: number; timeoutMs?: number }, +): Promise { + if (!ANTHROPIC_API_KEY) { + throw new Error("ANTHROPIC_API_KEY not set — cannot use Claude provider"); + } + // Use enqueueClaude for serialization, but call claudeApiCall (not generateClaude) + // for retries to avoid circular-promise deadlock + return enqueueClaude(() => claudeApiCall(systemPrompt, userPrompt, options)); } // ═══════════════════════════════════════════════════════ diff --git a/packages/api/src/routes/blog.ts b/packages/api/src/routes/blog.ts index fd7f83d..d91a70d 100644 --- a/packages/api/src/routes/blog.ts +++ b/packages/api/src/routes/blog.ts @@ -24,7 +24,7 @@ function clearProgress(draftId: string): void { pipelineProgress.delete(draftId); } import { semanticSearch } from "../embeddings/client"; -import { generate, checkHealth, resetOllamaQueue, getQueueDepth } from "../llm/client"; +import { generate, checkHealth, resetOllamaQueue, resetClaudeQueue, getQueueDepth } from "../llm/client"; import { SYSTEM_PROMPT, DEPTH_PROMPT, @@ -1700,10 +1700,11 @@ blogRouter.get("/llm/status", async (_req: Request, res: Response) => { res.json({ success: true, queue_depth: getQueueDepth(), llm: health }); }); -// POST /api/blog/llm/reset-queue — Force-reset stuck Ollama queue +// POST /api/blog/llm/reset-queue — Force-reset stuck Ollama or Claude queue blogRouter.post("/llm/reset-queue", (_req: Request, res: Response) => { resetOllamaQueue(); - res.json({ success: true, message: "Ollama queue reset — stuck requests cleared" }); + resetClaudeQueue(); + res.json({ success: true, message: "LLM queues reset — stuck requests cleared (Ollama + Claude)" }); }); // GET /api/blog/:id — Get a specific draft with full content