transceiver-db/packages/scraper/src/scrapers/market-intelligence.ts
Rene Fichtmueller c7d7456de9 fix: instance-level Crawlee storage isolation + eBay vendor type
- Add utils/crawlee-config.ts: makeCrawleeConfig(name) returns a
  Crawlee Configuration with isolated localDataDirectory per scraper.
  Uses storageClientOptions (not global CRAWLEE_STORAGE_DIR) so
  concurrent pg-boss workers in the same process don't race on
  the shared env var.

- Apply makeCrawleeConfig to all 6 Crawlee-based scrapers:
  optcore (PlaywrightCrawler), atgbics (PlaywrightCrawler),
  community-issues (CheerioCrawler + RequestQueue),
  edgecore (CheerioCrawler), ufispace (CheerioCrawler),
  market-intelligence (CheerioCrawler).

- scheduler.ts: add withIsolatedStorage for optcore and market-intel
  workers (was missing, caused storage-fs path bleed from fs scraper).

- ebay-enricher.ts: fix vendor type 'marketplace' -> 'reseller' to
  satisfy vendors_type_check constraint
  ['manufacturer','distributor','oem','reseller','compatible'].
2026-04-18 01:35:57 +02:00

301 lines
12 KiB
TypeScript

/**
* Market Intelligence Scraper
*
* Collects procurement-relevant signals from:
* - OFC/ECOC conference programs
* - Farnell/Mouser lead times for optical modules
* - IEEE 802.3 working group status page
* - EU TED tender database (fiber infrastructure)
* - LightReading/FierceTelecom trade press
*
* Runs weekly via pg-boss scheduler.
* Results stored in market_intelligence table.
* LLM analysis via Crawler LLM extractMarketIntel().
*/
import { CheerioCrawler } from "crawlee";
import { makeCrawleeConfig } from "../utils/crawlee-config";
import { extractMarketIntel } from "../crawler-llm/core";
import { pool } from "../utils/db";
interface IntelSource {
name: string;
url: string;
type: "trade_show" | "standard_ratified" | "standard_draft" | "distributor_lead_time" | "supply_chain" | "tender" | "capex_cycle";
fetchText: (html: string) => string; // extract relevant text from HTML
}
const SOURCES: IntelSource[] = [
{
name: "OFC Conference News",
url: "https://www.ofcconference.org/en-us/home/news/",
type: "trade_show",
fetchText: (html) => html.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ").substring(0, 8000),
},
{
name: "LightReading — Optical Networking",
url: "https://www.lightreading.com/optical-networking",
type: "supply_chain",
fetchText: (html) => html.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ").substring(0, 8000),
},
{
name: "IEEE 802.3 Working Group",
url: "https://www.ieee802.org/3/",
type: "standard_draft",
fetchText: (html) => html.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ").substring(0, 6000),
},
{
name: "EU TED — ICT/Fiber Tenders",
url: "https://ted.europa.eu/en/search/result?forms%5B0%5D%5Bcpv%5D=32571000",
type: "tender",
fetchText: (html) => html.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ").substring(0, 8000),
},
{
name: "Farnell — SFP Fiber Transceivers Lead Times",
url: "https://de.farnell.com/c/passive-optische-netzwerke/glasfasertransceiver",
type: "distributor_lead_time",
fetchText: (html) => {
// Extract lead time patterns: "X Wochen", "X weeks", "X days"
const leadTimePattern = /(\d+)\s*(wochen|weeks|days|tage|week|day)/gi;
const matches = [];
let m;
while ((m = leadTimePattern.exec(html)) !== null) {
matches.push(m[0]);
}
const context = html.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ");
return `Lead time mentions: ${matches.join(", ")}\n\nPage context:\n${context.substring(0, 5000)}`;
},
},
{
name: "Mouser — Optical Transceivers Category",
url: "https://www.mouser.de/c/optoelectronics/fiber-optic-components/fiber-optic-transceivers/",
type: "distributor_lead_time",
fetchText: (html) => html.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ").substring(0, 6000),
},
{
name: "FierceTelecom — Optical News",
url: "https://www.fiercetelecom.com/optical",
type: "supply_chain",
fetchText: (html) => html.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ").substring(0, 8000),
},
];
async function saveIntelItem(
item: Awaited<ReturnType<typeof extractMarketIntel>>,
source: IntelSource,
url: string
): Promise<void> {
if (!item.is_relevant || item.confidence < 0.5) return;
await pool.query(
`INSERT INTO market_intelligence
(intel_type, title, summary, relevance_score, technologies,
buy_signal_implication, impact_horizon_months, source_url, source_name, published_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT DO NOTHING`,
[
item.intel_type,
item.title.substring(0, 500),
item.summary,
item.confidence,
item.technologies,
item.buy_signal_implication,
item.impact_horizon_months,
url,
source.name,
item.published_at ? new Date(item.published_at) : null,
]
);
console.log(`[market-intel] Saved: ${item.title.substring(0, 60)}... (${item.buy_signal_implication})`);
}
export async function scrapeMarketIntelligence(): Promise<void> {
console.log("[market-intel] Starting market intelligence scrape...");
let processed = 0;
const crawler = new CheerioCrawler({
maxRequestsPerCrawl: SOURCES.length + 20,
maxConcurrency: 2,
requestHandlerTimeoutSecs: 30,
async requestHandler({ request, body }) {
const html = body.toString();
const source = SOURCES.find((s) => request.url.startsWith(s.url.split("?")[0]));
if (!source) return;
const text = source.fetchText(html);
if (text.length < 200) {
console.warn(`[market-intel] Too little text from ${request.url}`);
return;
}
try {
const intel = await extractMarketIntel(text, request.url, source.name);
await saveIntelItem(intel, source, request.url);
processed++;
} catch (err) {
console.error(`[market-intel] LLM error for ${request.url}:`, err);
}
},
async failedRequestHandler({ request }) {
console.warn(`[market-intel] Failed: ${request.url}`);
},
}, makeCrawleeConfig("market-intel"));
await crawler.addRequests(SOURCES.map((s) => ({ url: s.url })));
await crawler.run();
console.log(`[market-intel] Done. Processed ${processed}/${SOURCES.length} sources.`);
}
// ─────────────────────────────────────────────────────────────────────────────
// ABC Classification computation — run after each major scrape cycle
// ─────────────────────────────────────────────────────────────────────────────
export async function computeAbcClassification(): Promise<void> {
console.log("[abc] Computing ABC classification...");
await pool.query(`
INSERT INTO abc_classification
(transceiver_id, abc_class, obs_90d, compat_count, vendor_count, price_volatility, demand_score, supply_risk)
SELECT
t.id,
CASE
WHEN obs_90d > 50 AND compat_count > 100 THEN 'A'
WHEN obs_90d > 15 OR compat_count > 30 THEN 'B'
ELSE 'C'
END AS abc_class,
obs_90d,
compat_count,
vendor_count,
price_volatility,
-- demand score 0-100: weighted combination
LEAST(100, (obs_90d * 0.5 + compat_count * 0.3 + vendor_count * 5)) AS demand_score,
CASE
WHEN price_volatility > 0.3 THEN 'high'
WHEN price_volatility > 0.1 THEN 'medium'
ELSE 'low'
END AS supply_risk
FROM transceivers t
LEFT JOIN (
SELECT transceiver_id,
COUNT(*) FILTER (WHERE time > NOW() - INTERVAL '90 days') AS obs_90d,
STDDEV(price) / NULLIF(AVG(price), 0) AS price_volatility,
COUNT(DISTINCT source_vendor_id) AS vendor_count
FROM price_observations
GROUP BY transceiver_id
) po ON po.transceiver_id = t.id
LEFT JOIN (
SELECT transceiver_id, COUNT(*) AS compat_count
FROM compatibility
WHERE status = 'compatible'
GROUP BY transceiver_id
) co ON co.transceiver_id = t.id
WHERE t.data_confidence != 'garbage' OR t.data_confidence IS NULL
ON CONFLICT (transceiver_id) DO UPDATE SET
abc_class = EXCLUDED.abc_class,
obs_90d = EXCLUDED.obs_90d,
compat_count = EXCLUDED.compat_count,
vendor_count = EXCLUDED.vendor_count,
price_volatility = EXCLUDED.price_volatility,
demand_score = EXCLUDED.demand_score,
supply_risk = EXCLUDED.supply_risk,
computed_at = NOW()
`);
const stats = await pool.query(`
SELECT abc_class, COUNT(*) AS count FROM abc_classification GROUP BY abc_class ORDER BY abc_class
`);
console.log("[abc] Classification done:", stats.rows.map((r) => `${r.abc_class}: ${r.count}`).join(", "));
}
// ─────────────────────────────────────────────────────────────────────────────
// Reorder Signal computation
// ─────────────────────────────────────────────────────────────────────────────
export async function computeReorderSignals(): Promise<void> {
console.log("[reorder] Computing reorder signals...");
// Get all transceivers with enough data
const transceivers = await pool.query(`
SELECT
t.id, t.part_number, t.standard_name, t.speed_gbps, t.form_factor,
ac.abc_class, ac.price_volatility, ac.supply_risk,
-- Price trend: is price rising or falling?
(SELECT AVG(price) FROM price_observations WHERE transceiver_id = t.id AND time > NOW() - INTERVAL '14 days') AS price_recent,
(SELECT AVG(price) FROM price_observations WHERE transceiver_id = t.id AND time BETWEEN NOW() - INTERVAL '60 days' AND NOW() - INTERVAL '14 days') AS price_older,
-- Stock trend: how many vendors show in_stock recently?
(SELECT COUNT(*) FROM stock_snapshots WHERE transceiver_id = t.id AND stock_level = 'in_stock' AND scraped_at > NOW() - INTERVAL '7 days') AS in_stock_recent,
(SELECT COUNT(*) FROM stock_snapshots WHERE transceiver_id = t.id AND stock_level = 'out_of_stock' AND scraped_at > NOW() - INTERVAL '7 days') AS oos_recent,
-- Lead time
(SELECT AVG(lead_time_days) FROM stock_snapshots WHERE transceiver_id = t.id AND lead_time_days IS NOT NULL AND scraped_at > NOW() - INTERVAL '30 days') AS avg_lead_time_days,
-- Lifecycle events
(SELECT MAX(impact_level) FROM product_lifecycle_events WHERE transceiver_id = t.id OR technology = t.speed_gbps::text || 'G') AS lifecycle_impact
FROM transceivers t
LEFT JOIN abc_classification ac ON ac.transceiver_id = t.id
WHERE (t.data_confidence != 'garbage' OR t.data_confidence IS NULL)
`);
let computed = 0;
for (const row of transceivers.rows) {
const reasons: string[] = [];
let signal: "buy_now" | "wait" | "hold" | "monitor" = "monitor";
let strength = 0.3;
const priceTrend = row.price_recent && row.price_older
? (row.price_recent - row.price_older) / row.price_older
: null;
const stockTrend =
row.oos_recent > 2 ? "declining" :
row.in_stock_recent > 2 ? "stable" : "unknown";
const leadTimeWeeks = row.avg_lead_time_days ? Math.ceil(row.avg_lead_time_days / 7) : null;
// Signal logic
if (row.lifecycle_impact === "critical" || row.lifecycle_impact === "high") {
signal = "buy_now"; strength = 0.9;
reasons.push("EOL/critical lifecycle event detected");
} else if (stockTrend === "declining" && row.abc_class === "A") {
signal = "buy_now"; strength = 0.8;
reasons.push("Stock declining at multiple vendors (A-product)");
} else if (leadTimeWeeks && leadTimeWeeks >= 12) {
signal = "buy_now"; strength = 0.75;
reasons.push(`Long lead time: ${leadTimeWeeks} weeks — order in advance`);
} else if (priceTrend !== null && priceTrend < -0.10) {
signal = "wait"; strength = 0.7;
reasons.push(`Price falling ${Math.abs(Math.round(priceTrend * 100))}% — wait for floor`);
} else if (priceTrend !== null && priceTrend > 0.10) {
signal = "buy_now"; strength = 0.65;
reasons.push(`Price rising ${Math.round(priceTrend * 100)}% — buy before further increase`);
} else if (row.abc_class === "A" && stockTrend === "stable") {
signal = "hold"; strength = 0.5;
reasons.push("A-product, stable pricing and availability");
} else if (row.abc_class === "C") {
signal = "monitor"; strength = 0.3;
reasons.push("C-product: low demand — order on demand only");
}
if (reasons.length === 0) reasons.push("Insufficient data for strong signal");
await pool.query(
`INSERT INTO reorder_signals
(transceiver_id, signal, signal_strength, reasons, stock_trend, price_trend, lead_time_weeks)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
row.id,
signal,
strength,
JSON.stringify(reasons),
stockTrend,
priceTrend === null ? "unknown" : priceTrend > 0.05 ? "rising" : priceTrend < -0.05 ? "falling" : "stable",
leadTimeWeeks,
]
);
computed++;
}
console.log(`[reorder] Computed ${computed} reorder signals.`);
}