- Migration 020: product_issues table, condition/marketplace on price_observations, features JSONB
- eBay enricher: switch features/description/refurb prices + transceiver condition pricing
- Community issues scraper: Reddit/ServeTheHome/Arista/Cisco community bug reports
- 7 pre-seeded issues (DCS-7800R3, SG350, QFX5120, CRS326, USW-Pro, etc.)
- API: /switches/:id/issues + /switches/:id/documents endpoints
- Dashboard switch modal: features from DB, description, eBay refurb price, issues + docs loaded async
- Datasheet finder for Arista/Cisco/Juniper/HPE vendor pages
- Scheduler: 4 new jobs (eBay switch and transceiver enrichment nightly; community issues and datasheet-link discovery weekly)
304 lines
11 KiB
TypeScript
304 lines
11 KiB
TypeScript
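/**
 * pg-boss Job Scheduler — registers cron schedules and workers for the scrape,
 * enrichment and compute jobs.
 *
 * Cron expressions are evaluated in UTC (pg-boss's default; no `tz` option is passed).
 *
 * Job types:
 * scrape:pricing:fs — Every 4 hours for FS.com prices/stock
 * scrape:pricing:optcore — Every 4 hours, offset by 2h (previously every 6 hours), for Optcore prices/stock
 * scrape:pricing:10gtek — Every 8 hours for 10Gtek prices/stock
 * scrape:pricing:atgbics — Every 8 hours, offset by 2h, for ATGBICS prices/stock (GBP)
 * scrape:pricing:prolabs — Every 8 hours, offset by 4h, for ProLabs prices/stock (USD)
 * scrape:pricing:flexoptix — Every 2 hours for the Flexoptix catalog
 * scrape:vendors:flexoptix — Weekly (Sunday 06:00) for the Flexoptix vendor list
 * scrape:compat:cisco — Weekly (Sunday 03:00) for OEM compatibility matrices
 * scrape:news — Every 6 hours for trade press and news
 * scrape:docs — Weekly (Saturday 04:00) for manuals and datasheets (worker not yet implemented)
 * scrape:faq — Weekly (Wednesday 02:00) for vendor FAQ/troubleshooting pages (worker not yet implemented)
 * scrape:market-intel — Weekly (Tuesday 05:00) for OFC/ECOC, IEEE, TED and Farnell/Mouser lead times
 * scrape:community-issues — Weekly (Sunday 04:00) for community bug reports (Reddit/forums)
 * scrape:datasheet-links — Weekly (Monday 06:00) for datasheet link discovery
 * enrich:ebay-switches — Nightly (01:00) for eBay switch features, descriptions and refurb prices
 * enrich:ebay-transceivers — Nightly (02:00) for eBay transceiver pricing by condition
 * compute:abc — Daily (08:00) ABC classification recompute
 * compute:reorder-signals — Daily (08:30) reorder-signal recompute, after ABC
 */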
|
|
import PgBoss from "pg-boss";
|
|
import { config } from "dotenv";
|
|
import { join } from "path";
|
|
import { rmSync, mkdirSync } from "fs";
|
|
|
|
/**
 * Run a scraper with an isolated Crawlee storage directory to prevent queue collisions.
 *
 * While `fn` runs, `CRAWLEE_STORAGE_DIR` points at `storage-<name>`, three directories
 * above this module's directory. Afterwards the previous value of the variable is restored
 * exactly (unset again if it was unset) and the directory is deleted. Both steps run whether
 * `fn` resolves or rejects, and a rejection from `fn` is re-thrown to the caller.
 *
 * NOTE(review): `process.env` is process-global, so two jobs using this helper at the same
 * time can observe each other's directory. Whether that matters depends on when Crawlee reads
 * the variable — confirm before relying on this for concurrently running workers.
 *
 * @param name - Suffix for the storage directory; should be unique per scraper.
 * @param fn - The scraper to run inside the isolated storage.
 */
async function withIsolatedStorage(name: string, fn: () => Promise<void>): Promise<void> {
  const dir = join(__dirname, "..", "..", "..", `storage-${name}`);
  mkdirSync(dir, { recursive: true });

  const prev = process.env.CRAWLEE_STORAGE_DIR;
  process.env.CRAWLEE_STORAGE_DIR = dir;
  try {
    await fn();
  } finally {
    // Assigning to process.env always stores a string, so an originally-unset variable
    // must be deleted rather than set to "" to restore the environment faithfully.
    if (prev === undefined) {
      delete process.env.CRAWLEE_STORAGE_DIR;
    } else {
      process.env.CRAWLEE_STORAGE_DIR = prev;
    }
    // Best-effort cleanup after every run (successful or not); a failed delete must not
    // mask the scraper's own result or error.
    try {
      rmSync(dir, { recursive: true, force: true });
    } catch {
      /* ignore */
    }
  }
}
|
|
|
|
// Load environment settings (POSTGRES_* and friends) from the .env file that sits three
// directories above this module's directory, before anything below reads process.env.
const dotenvPath = join(__dirname, "..", "..", "..", ".env");
config({ path: dotenvPath });
|
|
|
|
/**
 * Assemble the PostgreSQL connection URI for pg-boss from `POSTGRES_*` variables.
 *
 * Every component falls back to a local-development default when its variable is unset or
 * empty (`||` rather than `??`, so `POSTGRES_PORT=` behaves like "unset"). The user name and
 * password are percent-encoded with `encodeURIComponent`, so reserved URI characters such as
 * `@`, `:`, `/`, `#` or `%` in credentials cannot split the URI into the wrong parts. The
 * defaults contain no such characters, so the default URI is unchanged by the encoding.
 *
 * NOTE(review): the fallback password is a development credential committed to source;
 * production deployments should always set POSTGRES_PASSWORD explicitly.
 *
 * @param env - Environment to read from (normally `process.env`).
 * @returns A `postgres://user:password@host:port/database` URI.
 */
function buildConnectionString(env: Readonly<Record<string, string | undefined>>): string {
  const user = encodeURIComponent(env.POSTGRES_USER || "tip");
  const password = encodeURIComponent(env.POSTGRES_PASSWORD || "tip_dev_2026");
  const host = env.POSTGRES_HOST || "localhost";
  const port = env.POSTGRES_PORT || "5433";
  const database = env.POSTGRES_DB || "transceiver_db";
  return `postgres://${user}:${password}@${host}:${port}/${database}`;
}

/** Connection URI used by createScheduler; built once from the current process environment. */
const connectionString = buildConnectionString(process.env);
|
|
|
|
/**
 * Construct, wire up and start the pg-boss instance shared by schedules and workers.
 *
 * The instance connects using the module-level `connectionString` and is given job defaults
 * of 3 retries, a 30-second retry delay with backoff enabled, a 300-second expiry, and a
 * 30-second queue-state monitor interval.
 * NOTE(review): confirm the installed pg-boss version still applies constructor-level job
 * defaults; per-schedule options in registerSchedules override several of them anyway.
 *
 * Errors emitted by pg-boss after start-up are logged to stderr rather than thrown.
 *
 * @returns The started pg-boss instance.
 */
export async function createScheduler(): Promise<PgBoss> {
  const scheduler = new PgBoss({
    connectionString,
    retryLimit: 3,
    retryDelay: 30,
    retryBackoff: true,
    expireInSeconds: 300, // 5 min timeout per job
    monitorStateIntervalSeconds: 30,
  });
  scheduler.on("error", (err) => console.error("pg-boss error:", err));

  await scheduler.start();
  console.log("pg-boss scheduler started");
  return scheduler;
}
|
|
|
|
/**
 * Create every scheduler queue and register its cron schedule with pg-boss.
 *
 * The `schedules` table is the single source of truth: queue creation is derived from it, so
 * a job cannot be scheduled without its queue being created first (pg-boss v10 requires the
 * queue to exist before `schedule()`), and adding a job means adding exactly one entry.
 *
 * No `tz` option is passed, so pg-boss evaluates the cron expressions in its default time
 * zone (UTC); the times in the comments below are therefore UTC.
 *
 * A failing `createQueue` call is tolerated but logged via `console.warn`, so genuine problems
 * (connection, permissions, invalid name) are visible instead of silently swallowed. A failing
 * `schedule` call rejects this function.
 *
 * @param boss - A started pg-boss instance.
 */
export async function registerSchedules(boss: PgBoss): Promise<void> {
  interface JobSchedule {
    queue: string;
    cron: string;
    retryLimit: number;
    expireInSeconds: number;
  }

  // v0.2.0: Increased frequencies for permanent price monitoring (R-SCAN)
  const schedules: JobSchedule[] = [
    // FS.com pricing (every 4 hours — JS rendering is slow)
    { queue: "scrape:pricing:fs", cron: "0 */4 * * *", retryLimit: 2, expireInSeconds: 3600 },
    // Optcore pricing (every 4 hours, offset by 2h — was 6h)
    { queue: "scrape:pricing:optcore", cron: "0 2/4 * * *", retryLimit: 2, expireInSeconds: 7200 },
    // Compatibility matrices (every Sunday at 3am)
    { queue: "scrape:compat:cisco", cron: "0 3 * * 0", retryLimit: 3, expireInSeconds: 3600 },
    // News aggregation (every 6 hours)
    { queue: "scrape:news", cron: "0 */6 * * *", retryLimit: 2, expireInSeconds: 1800 },
    // FAQ/KB scraping (every Wednesday at 2am)
    { queue: "scrape:faq", cron: "0 2 * * 3", retryLimit: 3, expireInSeconds: 3600 },
    // 10Gtek pricing (every 8 hours — Playwright, reasonable rate)
    { queue: "scrape:pricing:10gtek", cron: "0 */8 * * *", retryLimit: 2, expireInSeconds: 3600 },
    // ATGBICS pricing (every 8 hours, offset by 2h — Shopify/Playwright, GBP prices)
    { queue: "scrape:pricing:atgbics", cron: "0 2/8 * * *", retryLimit: 2, expireInSeconds: 3600 },
    // ProLabs pricing (every 8 hours, offset by 4h — Playwright, needs proxy for CloudFront)
    { queue: "scrape:pricing:prolabs", cron: "0 4/8 * * *", retryLimit: 2, expireInSeconds: 3600 },
    // Flexoptix catalog (every 2 hours — fetch-based, fast — R-SCAN requirement)
    { queue: "scrape:pricing:flexoptix", cron: "0 */2 * * *", retryLimit: 2, expireInSeconds: 3600 },
    // Flexoptix vendor list (weekly, Sunday at 6am — own data)
    { queue: "scrape:vendors:flexoptix", cron: "0 6 * * 0", retryLimit: 3, expireInSeconds: 600 },
    // Document/datasheet check (every Saturday at 4am)
    { queue: "scrape:docs", cron: "0 4 * * 6", retryLimit: 3, expireInSeconds: 7200 },
    // Market intelligence: OFC/ECOC, IEEE, TED, Farnell/Mouser lead times (every Tuesday 5am)
    { queue: "scrape:market-intel", cron: "0 5 * * 2", retryLimit: 2, expireInSeconds: 3600 },
    // ABC classification recompute (daily at 8am, a fixed time — not triggered by pricing runs)
    { queue: "compute:abc", cron: "0 8 * * *", retryLimit: 2, expireInSeconds: 600 },
    // Reorder signals recompute (daily at 8:30am — after ABC)
    { queue: "compute:reorder-signals", cron: "30 8 * * *", retryLimit: 2, expireInSeconds: 600 },
    // eBay switch enrichment: features, descriptions, refurb prices (nightly at 1am)
    { queue: "enrich:ebay-switches", cron: "0 1 * * *", retryLimit: 1, expireInSeconds: 7200 },
    // eBay transceiver pricing with condition (nightly at 2am)
    { queue: "enrich:ebay-transceivers", cron: "0 2 * * *", retryLimit: 1, expireInSeconds: 7200 },
    // Community issues scraping: Reddit/forums for known bugs (weekly on Sunday 4am)
    { queue: "scrape:community-issues", cron: "0 4 * * 0", retryLimit: 1, expireInSeconds: 3600 },
    // Datasheet link discovery (weekly on Monday 6am)
    { queue: "scrape:datasheet-links", cron: "0 6 * * 1", retryLimit: 1, expireInSeconds: 3600 },
  ];

  // pg-boss v10: create queues before scheduling.
  for (const { queue } of schedules) {
    try {
      await boss.createQueue(queue);
    } catch (err) {
      // Tolerated (e.g. a queue that already exists) but never silently swallowed.
      console.warn(`pg-boss: createQueue("${queue}") failed:`, err);
    }
  }

  for (const { queue, cron, retryLimit, expireInSeconds } of schedules) {
    await boss.schedule(queue, cron, {}, { retryLimit, expireInSeconds });
  }

  console.log("All schedules registered");
}
|
|
|
|
/**
 * Attach a pg-boss worker to every scheduled queue.
 *
 * The first nine scraper modules are imported once, when this function runs; the remaining
 * modules are imported inside their handlers when a job first executes. Both use dynamic
 * `import()` to keep these modules out of the static import graph.
 *
 * Each handler prints one timestamped line and then awaits its scraper; any rejection is
 * propagated to pg-boss. `scrape:faq` and `scrape:docs` are placeholders that only log.
 *
 * @param boss - A started pg-boss instance whose queues already exist.
 */
export async function registerWorkers(boss: PgBoss): Promise<void> {
  // Lazy-load scrapers to avoid circular deps
  const { scrapeFs } = await import("./scrapers/fs-com");
  const { scrapeCiscoTmg } = await import("./scrapers/cisco-tmg");
  const { scrapeOptcore } = await import("./scrapers/optcore");
  const { scrape10Gtek } = await import("./scrapers/tenGtek");
  const { scrapeFlexoptixCatalog } = await import("./scrapers/flexoptix-catalog");
  const { scrapeFlexoptixVendors } = await import("./scrapers/flexoptix-vendors");
  const { scrapeNews } = await import("./scrapers/news");
  const { scrapeAtgbics } = await import("./scrapers/atgbics");
  const { scrapeProLabs } = await import("./scrapers/prolabs");

  // Prefix a job's start message with the current ISO-8601 timestamp.
  const announce = (message: string): void => {
    console.log(`[${new Date().toISOString()}] ${message}`);
  };

  // Queue name → job body, listed in registration order.
  const jobs: Array<[string, () => Promise<void>]> = [
    ["scrape:pricing:fs", async () => {
      announce("Running: FS.com pricing");
      await withIsolatedStorage("fs", scrapeFs);
    }],
    ["scrape:pricing:optcore", async () => {
      announce("Running: Optcore pricing");
      await withIsolatedStorage("optcore", scrapeOptcore);
    }],
    ["scrape:compat:cisco", async () => {
      announce("Running: Cisco TMG");
      await withIsolatedStorage("cisco", scrapeCiscoTmg);
    }],
    ["scrape:pricing:10gtek", async () => {
      announce("Running: 10Gtek pricing");
      await withIsolatedStorage("10gtek", scrape10Gtek);
    }],
    ["scrape:pricing:flexoptix", async () => {
      announce("Running: Flexoptix catalog pricing");
      await scrapeFlexoptixCatalog();
    }],
    ["scrape:vendors:flexoptix", async () => {
      announce("Running: Flexoptix vendor list");
      await scrapeFlexoptixVendors();
    }],
    ["scrape:news", async () => {
      announce("Running: News aggregation");
      await scrapeNews();
    }],
    ["scrape:pricing:atgbics", async () => {
      announce("Running: ATGBICS pricing");
      await withIsolatedStorage("atgbics", scrapeAtgbics);
    }],
    ["scrape:pricing:prolabs", async () => {
      announce("Running: ProLabs pricing");
      await withIsolatedStorage("prolabs", scrapeProLabs);
    }],
    // Placeholders: the queues are scheduled, but no scraper exists yet.
    ["scrape:faq", async () => {
      announce("FAQ scraper — not yet implemented");
    }],
    ["scrape:docs", async () => {
      announce("Docs scraper — not yet implemented");
    }],
    ["scrape:market-intel", async () => {
      announce("Running: Market intelligence");
      const { scrapeMarketIntelligence } = await import("./scrapers/market-intelligence");
      await withIsolatedStorage("market-intel", scrapeMarketIntelligence);
    }],
    ["compute:abc", async () => {
      announce("Computing: ABC classification");
      const { computeAbcClassification } = await import("./scrapers/market-intelligence");
      await computeAbcClassification();
    }],
    ["compute:reorder-signals", async () => {
      announce("Computing: Reorder signals");
      const { computeReorderSignals } = await import("./scrapers/market-intelligence");
      await computeReorderSignals();
    }],
    ["enrich:ebay-switches", async () => {
      announce("Running: eBay switch enrichment");
      const { enrichSwitchesFromEbay } = await import("./scrapers/ebay-enricher");
      await withIsolatedStorage("ebay-switches", () => enrichSwitchesFromEbay(30));
    }],
    ["enrich:ebay-transceivers", async () => {
      announce("Running: eBay transceiver pricing");
      const { enrichTransceiversFromEbay } = await import("./scrapers/ebay-enricher");
      await withIsolatedStorage("ebay-transceivers", () => enrichTransceiversFromEbay(100));
    }],
    ["scrape:community-issues", async () => {
      announce("Running: Community issues scraping");
      const { scrapeAllSwitchIssues } = await import("./scrapers/community-issues");
      await withIsolatedStorage("community-issues", () => scrapeAllSwitchIssues(30));
    }],
    ["scrape:datasheet-links", async () => {
      announce("Running: Datasheet link discovery");
      const { findAndSeedDatasheetLinks } = await import("./scrapers/community-issues");
      await findAndSeedDatasheetLinks(50);
    }],
  ];

  // Register sequentially so workers are attached in the same order as the table.
  for (const [queue, run] of jobs) {
    await boss.work(queue, async (_job) => {
      await run();
    });
  }

  console.log("All workers registered");
}
|