fix: daemon stability + health monitor accuracy

- Add global unhandledRejection handler in scheduler daemon to swallow
  Crawlee's benign post-run ENOENT lock-file races (prevents process.exit(1))
- Add SKIP_FS_SCRAPER env var: skip FS.com worker on Erik where Cloudflare
  WAF blocks datacenter IPs (Mac launchd handles FS.com from residential IP)
- Remove FS.COM from health monitor EXPECTED_VENDORS (skipped on Erik)
- Health monitor: extend pg-boss lookup from 12h → 26h, add completed-job
  map; if job ran OK in last 26h + vendor has historical prices → mark
  STABLE instead of CRITICAL (fixes ATGBICS/Fluxlight hash-dedup false positives)
- Install Playwright Chromium on Erik (fixes ATGBICS BrowserLaunchError)
- Create missing Crawlee storage dirs on Erik (storage-fs-phase1/2,
  storage-ebay-transceivers) to prevent ENOENT on first Crawlee run
This commit is contained in:
Rene Fichtmueller 2026-04-18 03:16:59 +02:00
parent 8391b194a5
commit 93d825dc04
2 changed files with 66 additions and 11 deletions

View File

@ -169,6 +169,22 @@ async function runScheduler(): Promise<void> {
console.log("=== TIP Scraper Engine ===\n"); console.log("=== TIP Scraper Engine ===\n");
console.log("Mode: Scheduler (pg-boss)\n"); console.log("Mode: Scheduler (pg-boss)\n");
// Crawlee's FileSystemStorage fires spurious unhandledRejection errors after
// crawler.run() resolves: its internal task loop schedules one final
// _isTaskReadyFunction call that tries to mkdir/lock a request file that
// Crawlee already cleaned up. Left unhandled, such a rejection terminates the
// Node.js process and takes down the whole scheduler daemon.
//
// Policy of this handler:
//   - ENOENT / "no such file" errors whose message mentions a request_queues
//     path (the Crawlee storage root) are treated as benign: they are logged
//     as a warning and the daemon keeps running.
//   - Every other rejection is logged as an error and the process exits with
//     code 1, matching the behaviour an unhandled rejection would have had.
process.on("unhandledRejection", (reason) => {
  // Non-Error reasons (strings, plain objects) are stringified for matching only;
  // the original value is still what gets logged on the fatal path.
  const msg = reason instanceof Error ? reason.message : String(reason);
  const isMissingFile = msg.includes("ENOENT") || msg.includes("no such file");
  if (isMissingFile && msg.includes("request_queues")) {
    // Benign Crawlee post-run lock-file race — keep the daemon alive, but leave
    // a trace so a burst of suppressed rejections stays visible in the logs.
    console.warn("[scheduler] Suppressed benign Crawlee ENOENT rejection:", msg);
    return;
  }
  console.error("[scheduler] Unhandled rejection:", reason);
  process.exit(1);
});
const boss = await createScheduler(); const boss = await createScheduler();
// Cleanup zombie jobs left by previous daemon crash. // Cleanup zombie jobs left by previous daemon crash.

View File

@ -326,6 +326,13 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
// ── Playwright scrapers ─────────────────────────────────────────────── // ── Playwright scrapers ───────────────────────────────────────────────
await boss.work("scrape:pricing:fs", async () => { await boss.work("scrape:pricing:fs", async () => {
// FS.com uses Playwright + Cloudflare bypass, but Cloudflare WAF blocks
// datacenter IPs. Set SKIP_FS_SCRAPER=true to skip this worker on Erik;
// the Mac launchd cron handles FS.com from a residential IP.
if (process.env["SKIP_FS_SCRAPER"] === "true") {
console.log(`[${new Date().toISOString()}] FS.com pricing: SKIPPED (SKIP_FS_SCRAPER=true)`);
return;
}
console.log(`[${new Date().toISOString()}] Running: FS.com pricing`); console.log(`[${new Date().toISOString()}] Running: FS.com pricing`);
await scrapeFs(); await scrapeFs();
}); });
@ -691,7 +698,8 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
{ name: "FiberMall", jobName: "scrape:pricing:fibermall" }, { name: "FiberMall", jobName: "scrape:pricing:fibermall" },
{ name: "QSFPTEK", jobName: "scrape:pricing:qsfptek" }, { name: "QSFPTEK", jobName: "scrape:pricing:qsfptek" },
{ name: "Flexoptix", jobName: "scrape:pricing:flexoptix" }, { name: "Flexoptix", jobName: "scrape:pricing:flexoptix" },
{ name: "FS.COM", jobName: "scrape:pricing:fs" }, // FS.COM is skipped on Erik (SKIP_FS_SCRAPER=true) — Mac launchd handles it.
// Exclude from monitor so stale last_seen doesn't trigger false positives.
{ name: "10Gtek", jobName: "scrape:pricing:10gtek" }, { name: "10Gtek", jobName: "scrape:pricing:10gtek" },
{ name: "ATGBICS", jobName: "scrape:pricing:atgbics" }, { name: "ATGBICS", jobName: "scrape:pricing:atgbics" },
{ name: "GBICS", jobName: "scrape:pricing:gbics" }, { name: "GBICS", jobName: "scrape:pricing:gbics" },
@ -715,12 +723,15 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
ORDER BY last_seen ASC NULLS FIRST ORDER BY last_seen ASC NULLS FIRST
`, [vendorNames]); `, [vendorNames]);
// Last successful pg-boss job per vendor scraper (within last 12h) // Last successful pg-boss job per vendor scraper (within last 26h — covers any
// job scheduled at intervals of up to 24h at least once, plus 2h slack for
// daemon restart delays). The query has no state filter, so it returns the most
// recent job in any state (completed, failed, …), which lets us distinguish
// "running but prices stable" from "job consistently failing".
const jobResult = await pool.query(` const jobResult = await pool.query(`
SELECT DISTINCT ON (name) name, state, completed_on SELECT DISTINCT ON (name) name, state, completed_on
FROM pgboss.job FROM pgboss.job
WHERE name = ANY($1) WHERE name = ANY($1)
AND created_on > NOW() - INTERVAL '12 hours' AND created_on > NOW() - INTERVAL '26 hours'
ORDER BY name, created_on DESC ORDER BY name, created_on DESC
`, [jobNames]); `, [jobNames]);
@ -729,10 +740,28 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
jobMap.set(row.name as string, { state: row.state as string, completed_on: row.completed_on as Date | null }); jobMap.set(row.name as string, { state: row.state as string, completed_on: row.completed_on as Date | null });
} }
// Last COMPLETED job per vendor (to know when the job last ran successfully)
const completedResult = await pool.query(`
SELECT DISTINCT ON (name) name, completed_on AS last_completed
FROM pgboss.job
WHERE name = ANY($1)
AND state = 'completed'
AND created_on > NOW() - INTERVAL '26 hours'
ORDER BY name, completed_on DESC
`, [jobNames]);
const completedMap = new Map<string, Date>();
for (const row of completedResult.rows) {
if (row.last_completed) completedMap.set(row.name as string, new Date(row.last_completed as string));
}
// Thresholds for alerting: // Thresholds for alerting:
// CRITICAL (🔴): last price > 168h (7 days) — genuinely broken // CRITICAL (🔴): no job completed in 26h AND last price > 168h (7 days)
// WARNING (🟡): last price > 48h (2 days) — possibly stale // — scraper genuinely not running or consistently failing
// STABLE (✅): 0 new prices but last price ≤48h — prices unchanged, scraper OK // WARNING (🟡): no job completed in 26h AND last price > 48h
// — scraper may be broken but recently seen
// STABLE (✅): job completed in last 26h AND vendor has historical prices
// — prices unchanged (hash dedup), scraper is healthy
const CRITICAL_HOURS = 168; const CRITICAL_HOURS = 168;
const WARN_HOURS = 48; const WARN_HOURS = 48;
@ -743,17 +772,27 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
for (const row of priceResult.rows) { for (const row of priceResult.rows) {
const h = parseFloat(row.hours_since ?? "9999"); const h = parseFloat(row.hours_since ?? "9999");
const n = parseInt(row.prices_6h ?? "0", 10); const n = parseInt(row.prices_6h ?? "0", 10);
if (n > 0) continue; // new prices written → healthy if (n > 0) continue; // new prices written → definitely healthy
const lastStr = row.last_seen const lastStr = row.last_seen
? `last price ${h.toFixed(1)}h ago (${new Date(row.last_seen as string).toISOString().slice(0, 16)})` ? `last price ${h.toFixed(1)}h ago (${new Date(row.last_seen as string).toISOString().slice(0, 16)})`
: "NEVER scraped"; : "NEVER scraped";
const vendor = EXPECTED_VENDORS.find((v) => v.name === row.name); const vendor = EXPECTED_VENDORS.find((v) => v.name === row.name);
const jobInfo = vendor ? jobMap.get(vendor.jobName) : undefined; const jobInfo = vendor ? jobMap.get(vendor.jobName) : undefined;
const jobStr = jobInfo const lastCompleted = vendor ? completedMap.get(vendor.jobName) : undefined;
const jobStr = jobInfo
? ` | job=${jobInfo.state} at ${jobInfo.completed_on ? new Date(jobInfo.completed_on).toISOString().slice(11, 16) : "?"}` ? ` | job=${jobInfo.state} at ${jobInfo.completed_on ? new Date(jobInfo.completed_on).toISOString().slice(11, 16) : "?"}`
: " | job=not run in 12h"; : " | job=not run in 26h";
// If the job completed successfully in the last 26h AND the vendor has
// historical prices, prices are just stable (hash dedup) — not an outage.
const jobRunningOk = !!lastCompleted && row.last_seen;
if (jobRunningOk) {
stable.push(`${row.name}: prices stable (${h.toFixed(1)}h unchanged, job OK)${jobStr}`);
continue;
}
if (!row.last_seen || h > CRITICAL_HOURS) { if (!row.last_seen || h > CRITICAL_HOURS) {
critical.push(`🔴 ${row.name}: ${lastStr}${jobStr}`); critical.push(`🔴 ${row.name}: ${lastStr}${jobStr}`);