fix(blog): fix claudeQueue deadlock from recursive 429 retry
The generateClaude() function recursively called itself from inside its enqueueClaude() callback, creating a circular Promise dependency that permanently deadlocked claudeQueue. Any 429 rate-limit response poisoned the queue, blocking all future Claude API calls until the server was restarted.

Fixes:
- Move the API call and its retries into claudeApiCall(), which generateClaude() runs inside enqueueClaude(); a retry calls claudeApiCall() directly and never re-enters the queue, so there is no circular dependency
- Limit 429 handling to 3 retries with increasing backoff (10s/30s/60s)
- Add an exported resetClaudeQueue() function
- Add 15-minute auto-reset stall detection to enqueueClaude()
- Expose resetClaudeQueue() through the POST /api/blog/llm/reset-queue endpoint
- Remove merge conflict markers from index.ts (duplicate scraperRouter import)
This commit is contained in:
parent
55de4920b2
commit
72033ff5c5
@ -26,7 +26,6 @@ import { hotTopicsRouter } from "./routes/hot-topics";
|
|||||||
import { adoptionRouter } from "./routes/adoption";
|
import { adoptionRouter } from "./routes/adoption";
|
||||||
import { procurementRouter } from "./routes/procurement";
|
import { procurementRouter } from "./routes/procurement";
|
||||||
import { changelogRouter } from "./routes/changelog";
|
import { changelogRouter } from "./routes/changelog";
|
||||||
import { scraperRouter } from "./routes/scrapers";
|
|
||||||
import { newsRouter } from "./routes/news";
|
import { newsRouter } from "./routes/news";
|
||||||
import { proxyRouter } from "./routes/proxy";
|
import { proxyRouter } from "./routes/proxy";
|
||||||
|
|
||||||
@ -74,7 +73,6 @@ app.use("/api/search", searchRouter);
|
|||||||
app.use("/api/documents", documentRouter);
|
app.use("/api/documents", documentRouter);
|
||||||
app.use("/api/blog", blogSllRouter);
|
app.use("/api/blog", blogSllRouter);
|
||||||
app.use("/api/blog", blogRouter);
|
app.use("/api/blog", blogRouter);
|
||||||
<<<<<<< Updated upstream
|
|
||||||
app.use("/api/scrapers", scraperRouter);
|
app.use("/api/scrapers", scraperRouter);
|
||||||
app.use("/api/finder", finderRouter);
|
app.use("/api/finder", finderRouter);
|
||||||
app.use("/api/competitor-alerts", competitorRouter);
|
app.use("/api/competitor-alerts", competitorRouter);
|
||||||
@ -85,7 +83,6 @@ app.use("/api/adoption", adoptionRouter);
|
|||||||
app.use("/api/hot-topics", hotTopicsRouter);
|
app.use("/api/hot-topics", hotTopicsRouter);
|
||||||
app.use("/api/procurement", procurementRouter);
|
app.use("/api/procurement", procurementRouter);
|
||||||
app.use("/api/changelog", changelogRouter);
|
app.use("/api/changelog", changelogRouter);
|
||||||
app.use("/api/scrapers", scraperRouter);
|
|
||||||
app.use("/api/news", newsRouter);
|
app.use("/api/news", newsRouter);
|
||||||
|
|
||||||
// Dashboard (static HTML)
|
// Dashboard (static HTML)
|
||||||
|
|||||||
@ -34,24 +34,39 @@ function sleep(ms: number): Promise<void> {
|
|||||||
// Serialize Claude API calls to stay within TPM limits
|
// Serialize Claude API calls to stay within TPM limits
|
||||||
// Tier-1 has 40,000 TPM — with ~20K tokens/step, only 1 concurrent call safe
|
// Tier-1 has 40,000 TPM — with ~20K tokens/step, only 1 concurrent call safe
|
||||||
let claudeQueue: Promise<unknown> = Promise.resolve();
|
let claudeQueue: Promise<unknown> = Promise.resolve();
|
||||||
|
let claudeQueueEnqueueTime = 0;
|
||||||
|
|
||||||
|
export function resetClaudeQueue(): void {
|
||||||
|
claudeQueue = Promise.resolve();
|
||||||
|
claudeQueueEnqueueTime = 0;
|
||||||
|
console.log("[LLM] Claude queue reset — previous stuck requests cleared");
|
||||||
|
}
|
||||||
|
|
||||||
function enqueueClaude<T>(fn: () => Promise<T>): Promise<T> {
|
function enqueueClaude<T>(fn: () => Promise<T>): Promise<T> {
|
||||||
const result = claudeQueue.then(() => fn());
|
claudeQueueEnqueueTime = Date.now();
|
||||||
|
const result = claudeQueue.then(() => {
|
||||||
|
// Auto-reset if queue has been stalled > 15 minutes (prevents deadlock on stuck requests)
|
||||||
|
if (Date.now() - claudeQueueEnqueueTime > 900000) {
|
||||||
|
console.warn("[LLM] Claude queue auto-reset after 15min stall");
|
||||||
|
return Promise.reject(new Error("Claude queue auto-reset: previous request timed out"));
|
||||||
|
}
|
||||||
|
return fn();
|
||||||
|
});
|
||||||
claudeQueue = result.catch(() => {});
|
claudeQueue = result.catch(() => {});
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function generateClaude(
|
// Direct API call without going through the serialization queue — used for 429 retries
|
||||||
|
// to avoid the circular-promise deadlock that recursive enqueueClaude creates
|
||||||
|
async function claudeApiCall(
|
||||||
systemPrompt: string,
|
systemPrompt: string,
|
||||||
userPrompt: string,
|
userPrompt: string,
|
||||||
options?: { temperature?: number; maxTokens?: number; timeoutMs?: number },
|
options?: { temperature?: number; maxTokens?: number; timeoutMs?: number },
|
||||||
|
retryCount = 0,
|
||||||
): Promise<LlmResponse> {
|
): Promise<LlmResponse> {
|
||||||
if (!ANTHROPIC_API_KEY) {
|
|
||||||
throw new Error("ANTHROPIC_API_KEY not set — cannot use Claude provider");
|
|
||||||
}
|
|
||||||
|
|
||||||
return enqueueClaude(async () => {
|
|
||||||
const startTime = Date.now();
|
const startTime = Date.now();
|
||||||
|
const MAX_RETRIES = 3;
|
||||||
|
const RETRY_DELAYS = [10000, 30000, 60000];
|
||||||
|
|
||||||
const resp = await fetch("https://api.anthropic.com/v1/messages", {
|
const resp = await fetch("https://api.anthropic.com/v1/messages", {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
@ -72,11 +87,11 @@ async function generateClaude(
|
|||||||
|
|
||||||
if (!resp.ok) {
|
if (!resp.ok) {
|
||||||
const errText = await resp.text();
|
const errText = await resp.text();
|
||||||
// Rate limit retry
|
if (resp.status === 429 && retryCount < MAX_RETRIES) {
|
||||||
if (resp.status === 429) {
|
const delay = RETRY_DELAYS[retryCount] ?? 60000;
|
||||||
console.log("[LLM] Claude 429 — retrying in 10s...");
|
console.log(`[LLM] Claude 429 — retrying in ${delay / 1000}s (attempt ${retryCount + 1}/${MAX_RETRIES})...`);
|
||||||
await sleep(10000);
|
await sleep(delay);
|
||||||
return generateClaude(systemPrompt, userPrompt, options);
|
return claudeApiCall(systemPrompt, userPrompt, options, retryCount + 1);
|
||||||
}
|
}
|
||||||
throw new Error(`Claude API failed: ${resp.status} ${errText.slice(0, 200)}`);
|
throw new Error(`Claude API failed: ${resp.status} ${errText.slice(0, 200)}`);
|
||||||
}
|
}
|
||||||
@ -101,7 +116,19 @@ async function generateClaude(
|
|||||||
totalDuration: duration * 1_000_000, // ns for compat
|
totalDuration: duration * 1_000_000, // ns for compat
|
||||||
evalCount: data.usage.output_tokens,
|
evalCount: data.usage.output_tokens,
|
||||||
};
|
};
|
||||||
}); // end enqueueClaude
|
}
|
||||||
|
|
||||||
|
async function generateClaude(
|
||||||
|
systemPrompt: string,
|
||||||
|
userPrompt: string,
|
||||||
|
options?: { temperature?: number; maxTokens?: number; timeoutMs?: number },
|
||||||
|
): Promise<LlmResponse> {
|
||||||
|
if (!ANTHROPIC_API_KEY) {
|
||||||
|
throw new Error("ANTHROPIC_API_KEY not set — cannot use Claude provider");
|
||||||
|
}
|
||||||
|
// Use enqueueClaude for serialization, but call claudeApiCall (not generateClaude)
|
||||||
|
// for retries to avoid circular-promise deadlock
|
||||||
|
return enqueueClaude(() => claudeApiCall(systemPrompt, userPrompt, options));
|
||||||
}
|
}
|
||||||
|
|
||||||
// ═══════════════════════════════════════════════════════
|
// ═══════════════════════════════════════════════════════
|
||||||
|
|||||||
@ -24,7 +24,7 @@ function clearProgress(draftId: string): void {
|
|||||||
pipelineProgress.delete(draftId);
|
pipelineProgress.delete(draftId);
|
||||||
}
|
}
|
||||||
import { semanticSearch } from "../embeddings/client";
|
import { semanticSearch } from "../embeddings/client";
|
||||||
import { generate, checkHealth, resetOllamaQueue, getQueueDepth } from "../llm/client";
|
import { generate, checkHealth, resetOllamaQueue, resetClaudeQueue, getQueueDepth } from "../llm/client";
|
||||||
import {
|
import {
|
||||||
SYSTEM_PROMPT,
|
SYSTEM_PROMPT,
|
||||||
DEPTH_PROMPT,
|
DEPTH_PROMPT,
|
||||||
@ -1700,10 +1700,11 @@ blogRouter.get("/llm/status", async (_req: Request, res: Response) => {
|
|||||||
res.json({ success: true, queue_depth: getQueueDepth(), llm: health });
|
res.json({ success: true, queue_depth: getQueueDepth(), llm: health });
|
||||||
});
|
});
|
||||||
|
|
||||||
// POST /api/blog/llm/reset-queue — Force-reset stuck Ollama queue
|
// POST /api/blog/llm/reset-queue — Force-reset stuck Ollama or Claude queue
|
||||||
blogRouter.post("/llm/reset-queue", (_req: Request, res: Response) => {
|
blogRouter.post("/llm/reset-queue", (_req: Request, res: Response) => {
|
||||||
resetOllamaQueue();
|
resetOllamaQueue();
|
||||||
res.json({ success: true, message: "Ollama queue reset — stuck requests cleared" });
|
resetClaudeQueue();
|
||||||
|
res.json({ success: true, message: "LLM queues reset — stuck requests cleared (Ollama + Claude)" });
|
||||||
});
|
});
|
||||||
|
|
||||||
// GET /api/blog/:id — Get a specific draft with full content
|
// GET /api/blog/:id — Get a specific draft with full content
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user