feat: hot topics now uses market_intelligence + LLM queue reset
Hot Topics:
- SOURCE 3b: market_intelligence table (15 items, 0.6+ relevance)
with urgency mapping per intel_type + buy signal angles
- Fix news_articles: url → source_url (correct column name)
- Fix template literals: ${year} in string literals → backticks
- Increase limit: 6 → 15 topics returned
- Lower news cluster threshold: 2 → 1 article to form topic
- More research topics per day: 2 → 3
- More evergreen topics per day: 3 → 4
- Result: 24 total topics, 15 shown (was 8 total, 6 shown)
LLM Queue:
- Add resetOllamaQueue() export + auto-reset after 15min stall
- Add getQueueDepth() for monitoring
- New endpoints: GET /api/blog/llm/status, POST /api/blog/llm/reset-queue
This commit is contained in:
parent
06d8b650c4
commit
48cb41b27e
@ -26,11 +26,32 @@ function sleep(ms: number): Promise<void> {
|
||||
* Queue ensures sequential execution even with multiple concurrent API requests.
|
||||
*/
|
||||
let ollamaQueue: Promise<unknown> = Promise.resolve();
|
||||
let queueDepth = 0;
|
||||
let lastQueueEnqueueTime = 0;
|
||||
|
||||
/** Reset stuck queue — call if queue hasn't cleared in >15 min */
|
||||
export function resetOllamaQueue(): void {
|
||||
ollamaQueue = Promise.resolve();
|
||||
queueDepth = 0;
|
||||
console.log("[LLM] Queue reset — previous stuck requests cleared");
|
||||
}
|
||||
|
||||
export function getQueueDepth(): number { return queueDepth; }
|
||||
|
||||
function enqueueOllama<T>(fn: () => Promise<T>): Promise<T> {
|
||||
const result = ollamaQueue.then(fn);
|
||||
queueDepth++;
|
||||
lastQueueEnqueueTime = Date.now();
|
||||
const result = ollamaQueue.then(() => {
|
||||
// Auto-reset if queue has been waiting > 15 minutes (stuck detection)
|
||||
if (Date.now() - lastQueueEnqueueTime > 900000) {
|
||||
console.warn("[LLM] Queue auto-reset after 15min stall");
|
||||
queueDepth = Math.max(0, queueDepth - 1);
|
||||
return Promise.reject(new Error("Queue auto-reset: previous request timed out"));
|
||||
}
|
||||
return fn();
|
||||
});
|
||||
// Keep queue alive even if fn throws (attach no-op error handler on chain)
|
||||
ollamaQueue = result.catch(() => {});
|
||||
ollamaQueue = result.catch(() => {}).then(() => { queueDepth = Math.max(0, queueDepth - 1); });
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@ -24,7 +24,7 @@ function clearProgress(draftId: string): void {
|
||||
pipelineProgress.delete(draftId);
|
||||
}
|
||||
import { semanticSearch } from "../embeddings/client";
|
||||
import { generate, checkHealth } from "../llm/client";
|
||||
import { generate, checkHealth, resetOllamaQueue, getQueueDepth } from "../llm/client";
|
||||
import {
|
||||
SYSTEM_PROMPT,
|
||||
DEPTH_PROMPT,
|
||||
@ -1405,10 +1405,22 @@ blogRouter.get("/", async (_req: Request, res: Response) => {
|
||||
}
|
||||
});
|
||||
|
||||
// GET /api/blog/llm/status — Queue depth + Ollama health
|
||||
blogRouter.get("/llm/status", async (_req: Request, res: Response) => {
|
||||
const health = await checkHealth().catch(() => ({ ok: false, model: "", error: "unreachable" }));
|
||||
res.json({ success: true, queue_depth: getQueueDepth(), llm: health });
|
||||
});
|
||||
|
||||
// POST /api/blog/llm/reset-queue — Force-reset stuck Ollama queue
|
||||
blogRouter.post("/llm/reset-queue", (_req: Request, res: Response) => {
|
||||
resetOllamaQueue();
|
||||
res.json({ success: true, message: "Ollama queue reset — stuck requests cleared" });
|
||||
});
|
||||
|
||||
// GET /api/blog/:id — Get a specific draft with full content
|
||||
// GET /api/blog/:id/progress — Real-time pipeline step progress (in-memory)
|
||||
blogRouter.get("/:id/progress", (req: Request, res: Response) => {
|
||||
const p = pipelineProgress.get(req.params.id);
|
||||
const p = pipelineProgress.get(String(req.params.id));
|
||||
if (!p) {
|
||||
res.json({ success: true, running: false, step: 0, total: 10, label: "Idle", pct: 0 });
|
||||
return;
|
||||
|
||||
@ -110,14 +110,64 @@ hotTopicsRouter.get("/", async (_req, res) => {
|
||||
});
|
||||
}
|
||||
|
||||
// ═══ SOURCE 3b: Market Intelligence — Real scraped signals ═══
|
||||
const marketIntel = await pool.query(`
|
||||
SELECT title, summary, intel_type, relevance_score, buy_signal_implication,
|
||||
technologies, source_name, published_at, impact_horizon_months
|
||||
FROM market_intelligence
|
||||
WHERE relevance_score > 0.6
|
||||
ORDER BY relevance_score DESC, published_at DESC NULLS LAST
|
||||
LIMIT 12
|
||||
`).catch(() => ({ rows: [] }));
|
||||
|
||||
const intelTypeToUrgency: Record<string, HotTopic["urgency"]> = {
|
||||
supply_chain: "hot", distributor_lead_time: "hot", capex_cycle: "trending",
|
||||
standard_draft: "emerging", standard_ratified: "trending", trade_show: "hot",
|
||||
tender: "trending", market_share: "trending", technology_launch: "hot", price_movement: "breaking"
|
||||
};
|
||||
const intelTypeToBlogType: Record<string, string> = {
|
||||
supply_chain: "market_alert", distributor_lead_time: "market_alert", capex_cycle: "market_alert",
|
||||
standard_draft: "technology_deep_dive", standard_ratified: "technology_deep_dive",
|
||||
trade_show: "technology_deep_dive", tender: "market_alert", price_movement: "market_alert",
|
||||
};
|
||||
const buySignalToAngle: Record<string, string> = {
|
||||
bullish: "Why now is the right time to buy",
|
||||
bearish: "Why you should wait before ordering",
|
||||
opportunity: "Strategic window: Short-term opportunity for procurement",
|
||||
neutral: "Market context for your next procurement decision",
|
||||
};
|
||||
|
||||
for (const m of marketIntel.rows) {
|
||||
const techList = Array.isArray(m.technologies) ? (m.technologies as string[]).join(", ") : "";
|
||||
const angle = m.buy_signal_implication
|
||||
? buySignalToAngle[m.buy_signal_implication] || m.buy_signal_implication
|
||||
: "What this means for your network planning";
|
||||
topics.push({
|
||||
title: m.title,
|
||||
description: m.summary || `${m.intel_type?.replace(/_/g, " ")} signal from ${m.source_name || "market data"}.`,
|
||||
blog_type: intelTypeToBlogType[m.intel_type] || "market_alert",
|
||||
urgency: intelTypeToUrgency[m.intel_type] || "trending",
|
||||
source: m.source_name || "Market Intelligence",
|
||||
source_type: "trade_press",
|
||||
data_context: {
|
||||
intel_type: m.intel_type,
|
||||
relevance: m.relevance_score,
|
||||
buy_signal: m.buy_signal_implication,
|
||||
technologies: techList,
|
||||
impact_months: m.impact_horizon_months,
|
||||
},
|
||||
suggested_angle: `${m.title}: ${angle}`,
|
||||
});
|
||||
}
|
||||
|
||||
// ═══ SOURCE 4: News Articles — Recent Industry News ═══
|
||||
const recentNews = await pool.query(`
|
||||
SELECT title, source, url, category, published_at,
|
||||
SELECT title, source, source_url, category, published_at,
|
||||
COALESCE(relevance_score, 5) AS relevance
|
||||
FROM news_articles
|
||||
WHERE published_at > NOW() - INTERVAL '14 days'
|
||||
ORDER BY relevance_score DESC NULLS LAST, published_at DESC
|
||||
LIMIT 8
|
||||
LIMIT 12
|
||||
`).catch(() => ({ rows: [] }));
|
||||
|
||||
// Cluster news by theme
|
||||
@ -130,7 +180,7 @@ hotTopicsRouter.get("/", async (_req, res) => {
|
||||
}
|
||||
|
||||
for (const [theme, articles] of Object.entries(newsThemes)) {
|
||||
if (articles.length >= 2) {
|
||||
if (articles.length >= 1) {
|
||||
topics.push({
|
||||
title: `${theme}: ${articles.length} recent articles`,
|
||||
description: articles.map(a => a.title).slice(0, 3).join(" | "),
|
||||
@ -164,12 +214,12 @@ hotTopicsRouter.get("/", async (_req, res) => {
|
||||
tomorrow.setUTCHours(0, 0, 0, 0);
|
||||
|
||||
res.json({
|
||||
topics: topics.slice(0, 6),
|
||||
topics: topics.slice(0, 15),
|
||||
total: topics.length,
|
||||
generated_at: new Date().toISOString(),
|
||||
refreshes_at: tomorrow.toISOString(),
|
||||
day_seed: getDaySeed(),
|
||||
sources: ["internal_price_data", "competitor_alerts", "hype_cycle_model", "news_articles", "conference_calendar", "research_papers"],
|
||||
sources: ["market_intelligence", "internal_price_data", "competitor_alerts", "hype_cycle_model", "news_articles", "conference_calendar", "research_papers"],
|
||||
});
|
||||
} catch (err) {
|
||||
console.error("Hot topics error:", err);
|
||||
@ -260,7 +310,7 @@ function getConferenceTopics(year: number): HotTopic[] {
|
||||
urgency: "hot",
|
||||
source: "TIP Market Data",
|
||||
source_type: "internal_data",
|
||||
suggested_angle: "Year-end optics market: The numbers that matter for your ${year + 1} budget",
|
||||
suggested_angle: `Year-end optics market: The numbers that matter for your ${year + 1} budget`,
|
||||
});
|
||||
}
|
||||
|
||||
@ -397,7 +447,7 @@ function getResearchTopics(year: number): HotTopic[] {
|
||||
suggested_angle: "DDM as a maintenance tool: Catching failing optics before they take down your link",
|
||||
},
|
||||
{
|
||||
title: "800G Deployment Wave: What Hyperscalers Are Actually Installing in ${year}",
|
||||
title: `800G Deployment Wave: What Hyperscalers Are Actually Installing in ${year}`,
|
||||
description: "Meta, Google, and Microsoft are ordering 800G at scale. What form factors, vendors, and reach variants are winning?",
|
||||
blog_type: "market_alert",
|
||||
urgency: "hot",
|
||||
@ -406,9 +456,9 @@ function getResearchTopics(year: number): HotTopic[] {
|
||||
suggested_angle: "800G market signal: What hyperscaler spending tells enterprise about their 2027 refresh cycle",
|
||||
},
|
||||
];
|
||||
// Pick 2 different research topics per day
|
||||
// Pick 3 different research topics per day
|
||||
const shuffled = seededShuffle(ALL_RESEARCH, getDaySeed() + 1);
|
||||
return shuffled.slice(0, 2);
|
||||
return shuffled.slice(0, 3);
|
||||
}
|
||||
|
||||
function getEvergreenTopics(year: number): HotTopic[] {
|
||||
@ -477,7 +527,7 @@ function getEvergreenTopics(year: number): HotTopic[] {
|
||||
suggested_angle: "OEM lock-in economics: The numbers behind why compatible optics work and when they don't",
|
||||
},
|
||||
{
|
||||
title: "40G End-of-Life: What to Do With Your QSFP+ Inventory in ${year}",
|
||||
title: `40G End-of-Life: What to Do With Your QSFP+ Inventory in ${year}`,
|
||||
description: "40G is in decline. Resale values, upgrade paths to 100G, and when to write off the inventory.",
|
||||
blog_type: "market_alert",
|
||||
urgency: "trending",
|
||||
@ -504,7 +554,7 @@ function getEvergreenTopics(year: number): HotTopic[] {
|
||||
suggested_angle: "Gray market guide: The due diligence checklist that separates a deal from a disaster",
|
||||
},
|
||||
{
|
||||
title: "Wavelength Division Multiplexing in ${year}: CWDM4 vs LWDM vs DWDM Decision Guide",
|
||||
title: `Wavelength Division Multiplexing in ${year}: CWDM4 vs LWDM vs DWDM Decision Guide`,
|
||||
description: "WDM optics for data center interconnect. The wavelength plans, the reach limits, and when DWDM pays off vs CWDM.",
|
||||
blog_type: "technology_deep_dive",
|
||||
urgency: "trending",
|
||||
@ -522,7 +572,7 @@ function getEvergreenTopics(year: number): HotTopic[] {
|
||||
suggested_angle: "Industrial optics selection: When your data center optic will kill your outdoor deployment",
|
||||
},
|
||||
];
|
||||
// Pick 3 different evergreen topics per day
|
||||
// Pick 4 different evergreen topics per day (rotates daily via seeded shuffle)
|
||||
const shuffled = seededShuffle(ALL_EVERGREEN, getDaySeed() + 2);
|
||||
return shuffled.slice(0, 3);
|
||||
return shuffled.slice(0, 4);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user