/** * pg-boss Job Scheduler — manages scrape jobs with adaptive timing. * * Job types: * scrape:pricing:fs — Every 4 hours for FS.com prices/stock * scrape:pricing:optcore — Every 6 hours for Optcore prices/stock * scrape:pricing:atgbics — Every 8 hours for ATGBICS prices/stock (GBP) * scrape:pricing:prolabs — Every 8 hours for ProLabs prices/stock (USD) * scrape:compat:cisco — Weekly for OEM compatibility matrices * scrape:news — Every 6 hours for trade press and news * scrape:docs — Weekly for manuals and datasheets * scrape:faq — Weekly for vendor FAQ/troubleshooting pages */ import PgBoss from "pg-boss"; import { config } from "dotenv"; import { join } from "path"; import { rmSync, mkdirSync } from "fs"; /** Run a scraper with an isolated Crawlee storage directory to prevent queue collisions */ async function withIsolatedStorage(name: string, fn: () => Promise): Promise { const dir = join(__dirname, "..", "..", "..", `storage-${name}`); mkdirSync(dir, { recursive: true }); const prev = process.env.CRAWLEE_STORAGE_DIR; process.env.CRAWLEE_STORAGE_DIR = dir; try { await fn(); } finally { process.env.CRAWLEE_STORAGE_DIR = prev ?? ""; // Clean up after successful run try { rmSync(dir, { recursive: true, force: true }); } catch { /* ignore */ } } } config({ path: join(__dirname, "..", "..", "..", ".env") }); const connectionString = `postgres://${process.env.POSTGRES_USER || "tip"}:${process.env.POSTGRES_PASSWORD || "tip_dev_2026"}@${process.env.POSTGRES_HOST || "localhost"}:${process.env.POSTGRES_PORT || "5433"}/${process.env.POSTGRES_DB || "transceiver_db"}`; export async function createScheduler(): Promise { const boss = new PgBoss({ connectionString, retryLimit: 3, retryDelay: 30, retryBackoff: true, expireInSeconds: 300, // 5 min timeout per job monitorStateIntervalSeconds: 30, }); boss.on("error", (error) => console.error("pg-boss error:", error)); await boss.start(); console.log("pg-boss scheduler started"); return boss; } export async function registerSchedules(boss: PgBoss): Promise { // pg-boss v10: create queues before scheduling const queues = [ "scrape:pricing:fs", "scrape:pricing:optcore", "scrape:pricing:10gtek", "scrape:pricing:atgbics", "scrape:pricing:prolabs", "scrape:compat:cisco", "scrape:pricing:flexoptix", "scrape:vendors:flexoptix", "scrape:news", "scrape:faq", "scrape:docs", ]; for (const q of queues) { await boss.createQueue(q).catch(() => { /* already exists */ }); } // v0.2.0: Increased frequencies for permanent price monitoring (R-SCAN) // FS.com pricing (every 4 hours — JS rendering is slow) await boss.schedule("scrape:pricing:fs", "0 */4 * * *", {}, { retryLimit: 2, expireInSeconds: 3600, }); // Optcore pricing (every 4 hours — was 6h) await boss.schedule("scrape:pricing:optcore", "0 2/4 * * *", {}, { retryLimit: 2, expireInSeconds: 7200, }); // Compatibility matrices (every Sunday at 3am) await boss.schedule("scrape:compat:cisco", "0 3 * * 0", {}, { retryLimit: 3, expireInSeconds: 3600, }); // News aggregation (every 6 hours) await boss.schedule("scrape:news", "0 */6 * * *", {}, { retryLimit: 2, expireInSeconds: 1800, }); // FAQ/KB scraping (every Wednesday at 2am) await boss.schedule("scrape:faq", "0 2 * * 3", {}, { retryLimit: 3, expireInSeconds: 3600, }); // 10Gtek pricing (every 8 hours — Playwright, reasonable rate) await boss.schedule("scrape:pricing:10gtek", "0 */8 * * *", {}, { retryLimit: 2, expireInSeconds: 3600, }); // ATGBICS pricing (every 8 hours — Shopify/Playwright, GBP prices) await boss.schedule("scrape:pricing:atgbics", "0 2/8 * * *", {}, { retryLimit: 2, expireInSeconds: 3600, }); // ProLabs pricing (every 8 hours — Playwright, needs proxy for CloudFront) await boss.schedule("scrape:pricing:prolabs", "0 4/8 * * *", {}, { retryLimit: 2, expireInSeconds: 3600, }); // Flexoptix catalog (every 2 hours — fetch-based, fast — R-SCAN requirement) await boss.schedule("scrape:pricing:flexoptix", "0 */2 * * *", {}, { retryLimit: 2, expireInSeconds: 3600, }); // Flexoptix vendor list (weekly, Sunday at 6am — own data) await boss.schedule("scrape:vendors:flexoptix", "0 6 * * 0", {}, { retryLimit: 3, expireInSeconds: 600, }); // Document/datasheet check (every Saturday at 4am) await boss.schedule("scrape:docs", "0 4 * * 6", {}, { retryLimit: 3, expireInSeconds: 7200, }); console.log("All schedules registered"); } export async function registerWorkers(boss: PgBoss): Promise { // Lazy-load scrapers to avoid circular deps const { scrapeFs } = await import("./scrapers/fs-com"); const { scrapeCiscoTmg } = await import("./scrapers/cisco-tmg"); const { scrapeOptcore } = await import("./scrapers/optcore"); const { scrape10Gtek } = await import("./scrapers/tenGtek"); const { scrapeFlexoptixCatalog } = await import("./scrapers/flexoptix-catalog"); const { scrapeFlexoptixVendors } = await import("./scrapers/flexoptix-vendors"); const { scrapeNews } = await import("./scrapers/news"); const { scrapeAtgbics } = await import("./scrapers/atgbics"); const { scrapeProLabs } = await import("./scrapers/prolabs"); await boss.work("scrape:pricing:fs", async (_job) => { console.log(`[${new Date().toISOString()}] Running: FS.com pricing`); await withIsolatedStorage("fs", scrapeFs); }); await boss.work("scrape:pricing:optcore", async (_job) => { console.log(`[${new Date().toISOString()}] Running: Optcore pricing`); await withIsolatedStorage("optcore", scrapeOptcore); }); await boss.work("scrape:compat:cisco", async (_job) => { console.log(`[${new Date().toISOString()}] Running: Cisco TMG`); await withIsolatedStorage("cisco", scrapeCiscoTmg); }); await boss.work("scrape:pricing:10gtek", async (_job) => { console.log(`[${new Date().toISOString()}] Running: 10Gtek pricing`); await withIsolatedStorage("10gtek", scrape10Gtek); }); await boss.work("scrape:pricing:flexoptix", async (_job) => { console.log(`[${new Date().toISOString()}] Running: Flexoptix catalog pricing`); await scrapeFlexoptixCatalog(); }); await boss.work("scrape:vendors:flexoptix", async (_job) => { console.log(`[${new Date().toISOString()}] Running: Flexoptix vendor list`); await scrapeFlexoptixVendors(); }); await boss.work("scrape:news", async (_job) => { console.log(`[${new Date().toISOString()}] Running: News aggregation`); await scrapeNews(); }); await boss.work("scrape:pricing:atgbics", async (_job) => { console.log(`[${new Date().toISOString()}] Running: ATGBICS pricing`); await withIsolatedStorage("atgbics", scrapeAtgbics); }); await boss.work("scrape:pricing:prolabs", async (_job) => { console.log(`[${new Date().toISOString()}] Running: ProLabs pricing`); await withIsolatedStorage("prolabs", scrapeProLabs); }); await boss.work("scrape:faq", async (_job) => { console.log(`[${new Date().toISOString()}] FAQ scraper — not yet implemented`); }); await boss.work("scrape:docs", async (_job) => { console.log(`[${new Date().toISOString()}] Docs scraper — not yet implemented`); }); console.log("All workers registered"); }