fix: instance-level Crawlee storage isolation + eBay vendor type

- Add utils/crawlee-config.ts: makeCrawleeConfig(name) returns a
  Crawlee Configuration with isolated localDataDirectory per scraper.
  Uses storageClientOptions (not global CRAWLEE_STORAGE_DIR) so
  concurrent pg-boss workers in the same process don't race on
  the shared env var.

- Apply makeCrawleeConfig to all 6 Crawlee-based scrapers:
  optcore (PlaywrightCrawler), atgbics (PlaywrightCrawler),
  community-issues (CheerioCrawler + RequestQueue),
  edgecore (CheerioCrawler), ufispace (CheerioCrawler),
  market-intelligence (CheerioCrawler).

- scheduler.ts: add withIsolatedStorage for optcore and market-intel
  workers (was missing, caused storage-fs path bleed from fs scraper).

- ebay-enricher.ts: fix vendor type 'marketplace' -> 'reseller' to
  satisfy vendors_type_check constraint
  ['manufacturer','distributor','oem','reseller','compatible'].
This commit is contained in:
Rene Fichtmueller 2026-04-18 01:35:57 +02:00
parent 9ba8369f18
commit 9965d8e43c
9 changed files with 97 additions and 11 deletions

View File

@ -384,7 +384,7 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
await boss.work("scrape:pricing:optcore", async () => { await boss.work("scrape:pricing:optcore", async () => {
console.log(`[${new Date().toISOString()}] Running: Optcore pricing`); console.log(`[${new Date().toISOString()}] Running: Optcore pricing`);
const { scrapeOptcore } = await import("./scrapers/optcore"); const { scrapeOptcore } = await import("./scrapers/optcore");
await scrapeOptcore(); await withIsolatedStorage("optcore", scrapeOptcore);
}); });
await boss.work("scrape:pricing:champion-one", async () => { await boss.work("scrape:pricing:champion-one", async () => {
@ -522,7 +522,7 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
await boss.work("scrape:market-intel", async () => { await boss.work("scrape:market-intel", async () => {
console.log(`[${new Date().toISOString()}] Running: Market intelligence`); console.log(`[${new Date().toISOString()}] Running: Market intelligence`);
await scrapeMarketIntelligence(); await withIsolatedStorage("market-intel", scrapeMarketIntelligence);
}); });
await boss.work("scrape:nog-talks", async () => { await boss.work("scrape:nog-talks", async () => {

View File

@ -16,6 +16,7 @@
* Respects: robots.txt, rate limiting (2s between requests, max 50 pages) * Respects: robots.txt, rate limiting (2s between requests, max 50 pages)
*/ */
import { PlaywrightCrawler, ProxyConfiguration } from "crawlee"; import { PlaywrightCrawler, ProxyConfiguration } from "crawlee";
import { makeCrawleeConfig } from "../utils/crawlee-config";
import { ensureVendor, upsertPriceObservation, findOrCreateScrapedTransceiver, pool } from "../utils/db"; import { ensureVendor, upsertPriceObservation, findOrCreateScrapedTransceiver, pool } from "../utils/db";
import { contentHash, parsePrice, parseStockLevel, parseQuantity } from "../utils/hash"; import { contentHash, parsePrice, parseStockLevel, parseQuantity } from "../utils/hash";
@ -366,7 +367,7 @@ export async function scrapeAtgbics(): Promise<void> {
pagesScraped++; pagesScraped++;
} }
}, },
}); }, makeCrawleeConfig("atgbics"));
const startUrls = CATEGORY_URLS.map((path) => `${BASE_URL}${path}`); const startUrls = CATEGORY_URLS.map((path) => `${BASE_URL}${path}`);
await crawler.run(startUrls); await crawler.run(startUrls);

View File

@ -14,6 +14,7 @@
*/ */
import { CheerioCrawler, RequestQueue } from "crawlee"; import { CheerioCrawler, RequestQueue } from "crawlee";
import { makeCrawleeConfig } from "../utils/crawlee-config";
import { extractMarketIntel } from "../crawler-llm/core"; import { extractMarketIntel } from "../crawler-llm/core";
import { db as pool } from "../utils/db"; import { db as pool } from "../utils/db";
import { logger } from "../utils/logger"; import { logger } from "../utils/logger";
@ -186,7 +187,9 @@ export async function scrapeProductIssues(
models: string[], models: string[],
sourceLimit = 3 sourceLimit = 3
): Promise<void> { ): Promise<void> {
const queue = await RequestQueue.open("community-issues"); // Use isolated Crawlee config to prevent concurrent-worker storage conflicts
const crawleeConfig = makeCrawleeConfig("community-issues");
const queue = await RequestQueue.open(null, { config: crawleeConfig });
// Add search requests for each model × source combination // Add search requests for each model × source combination
for (const model of models) { for (const model of models) {
@ -264,7 +267,7 @@ If no issues found, return []`;
failedRequestHandler: ({ request, error }) => { failedRequestHandler: ({ request, error }) => {
logger.warn(`Community scraper failed: ${request.url}`, { error }); logger.warn(`Community scraper failed: ${request.url}`, { error });
}, },
}); }, crawleeConfig);
await crawler.run(); await crawler.run();
logger.info(`Community issues scraping complete for ${models.length} models`); logger.info(`Community issues scraping complete for ${models.length} models`);

View File

@ -286,7 +286,7 @@ async function saveEnrichment(switchId: string, result: EnrichResult): Promise<v
// Find eBay vendor ID (create if needed) // Find eBay vendor ID (create if needed)
const ebayVendorResult = await pool.query( const ebayVendorResult = await pool.query(
`INSERT INTO vendors (name, slug, type, website) `INSERT INTO vendors (name, slug, type, website)
VALUES ('eBay Marketplace', 'ebay', 'marketplace', 'https://www.ebay.de') VALUES ('eBay Marketplace', 'ebay', 'reseller', 'https://www.ebay.de')
ON CONFLICT (slug) DO UPDATE SET name = EXCLUDED.name ON CONFLICT (slug) DO UPDATE SET name = EXCLUDED.name
RETURNING id` RETURNING id`
); );
@ -354,7 +354,7 @@ export async function enrichTransceiversFromEbay(limit = 50): Promise<void> {
// Find eBay vendor // Find eBay vendor
const ebayVendor = await pool.query( const ebayVendor = await pool.query(
`INSERT INTO vendors (name, slug, type, website) `INSERT INTO vendors (name, slug, type, website)
VALUES ('eBay Marketplace', 'ebay', 'marketplace', 'https://www.ebay.de') VALUES ('eBay Marketplace', 'ebay', 'reseller', 'https://www.ebay.de')
ON CONFLICT (slug) DO UPDATE SET updated_at = NOW() ON CONFLICT (slug) DO UPDATE SET updated_at = NOW()
RETURNING id` RETURNING id`
); );

View File

@ -9,6 +9,7 @@
* Source: https://www.edge-core.com/productsList.php?cls=1 * Source: https://www.edge-core.com/productsList.php?cls=1
*/ */
import { CheerioCrawler } from "crawlee"; import { CheerioCrawler } from "crawlee";
import { makeCrawleeConfig } from "../utils/crawlee-config";
import { pool, ensureWhiteboxVendor, findOrCreateSwitch } from "../utils/db"; import { pool, ensureWhiteboxVendor, findOrCreateSwitch } from "../utils/db";
const BASE_URL = "https://www.edge-core.com"; const BASE_URL = "https://www.edge-core.com";
@ -193,7 +194,7 @@ export async function scrapeEdgecore(): Promise<void> {
failedRequestHandler({ request }) { failedRequestHandler({ request }) {
console.error(` ! Failed: ${request.url}`); console.error(` ! Failed: ${request.url}`);
}, },
}); }, makeCrawleeConfig("edgecore"));
await crawler.run([PRODUCT_LIST_URL]); await crawler.run([PRODUCT_LIST_URL]);
console.log(`\n Created: ${created}, Updated: ${updated}\n`); console.log(`\n Created: ${created}, Updated: ${updated}\n`);

View File

@ -14,6 +14,7 @@
*/ */
import { CheerioCrawler } from "crawlee"; import { CheerioCrawler } from "crawlee";
import { makeCrawleeConfig } from "../utils/crawlee-config";
import { extractMarketIntel } from "../crawler-llm/core"; import { extractMarketIntel } from "../crawler-llm/core";
import { pool } from "../utils/db"; import { pool } from "../utils/db";
@ -140,7 +141,7 @@ export async function scrapeMarketIntelligence(): Promise<void> {
async failedRequestHandler({ request }) { async failedRequestHandler({ request }) {
console.warn(`[market-intel] Failed: ${request.url}`); console.warn(`[market-intel] Failed: ${request.url}`);
}, },
}); }, makeCrawleeConfig("market-intel"));
await crawler.addRequests(SOURCES.map((s) => ({ url: s.url }))); await crawler.addRequests(SOURCES.map((s) => ({ url: s.url })));
await crawler.run(); await crawler.run();

View File

@ -9,6 +9,7 @@
* (JS lazy-loading) static HTML has no product data. * (JS lazy-loading) static HTML has no product data.
*/ */
import { PlaywrightCrawler } from "crawlee"; import { PlaywrightCrawler } from "crawlee";
import { makeCrawleeConfig } from "../utils/crawlee-config";
import { ensureVendor, upsertPriceObservation, findOrCreateScrapedTransceiver, pool } from "../utils/db"; import { ensureVendor, upsertPriceObservation, findOrCreateScrapedTransceiver, pool } from "../utils/db";
import { contentHash, parsePrice, parseStockLevel } from "../utils/hash"; import { contentHash, parsePrice, parseStockLevel } from "../utils/hash";
@ -242,7 +243,7 @@ export async function scrapeOptcore(): Promise<void> {
pagesScraped++; pagesScraped++;
}, },
}); }, makeCrawleeConfig("optcore"));
const urls = productMeta.map((p) => p.url); const urls = productMeta.map((p) => p.url);
await crawler.run(urls); await crawler.run(urls);

View File

@ -7,6 +7,7 @@
* Source: https://www.ufispace.com/products/datacenter-switches * Source: https://www.ufispace.com/products/datacenter-switches
*/ */
import { CheerioCrawler } from "crawlee"; import { CheerioCrawler } from "crawlee";
import { makeCrawleeConfig } from "../utils/crawlee-config";
import { pool, ensureWhiteboxVendor, findOrCreateSwitch } from "../utils/db"; import { pool, ensureWhiteboxVendor, findOrCreateSwitch } from "../utils/db";
const BASE_URL = "https://www.ufispace.com"; const BASE_URL = "https://www.ufispace.com";
@ -191,7 +192,7 @@ export async function scrapeUfiSpace(): Promise<void> {
failedRequestHandler({ request }) { failedRequestHandler({ request }) {
console.error(` ! Failed: ${request.url}`); console.error(` ! Failed: ${request.url}`);
}, },
}); }, makeCrawleeConfig("ufispace"));
await crawler.run(PRODUCT_URLS); await crawler.run(PRODUCT_URLS);
console.log(`\n Created: ${created}, Updated: ${updated}\n`); console.log(`\n Created: ${created}, Updated: ${updated}\n`);

View File

@ -0,0 +1,78 @@
/**
* Crawlee instance-level storage isolation.
*
* WHY THIS EXISTS:
* ----------------
* All pg-boss workers run concurrently inside a single Node.js process.
* The old approach (setting process.env.CRAWLEE_STORAGE_DIR) is a global
* env-var mutation: if two workers run simultaneously, one worker's
* writeToEnv() can overwrite another's before the crawler reads it.
*
* The fix: pass a `Configuration` instance directly to each Crawlee
* constructor. This is instance-level (not global), so concurrent scrapers
* each get their own isolated storage directory.
*
* Usage:
* import { makeCrawleeConfig } from "../utils/crawlee-config";
*
* // In PlaywrightCrawler:
* const crawler = new PlaywrightCrawler({ ... }, makeCrawleeConfig("optcore"));
*
* // In CheerioCrawler:
* const crawler = new CheerioCrawler({ ... }, makeCrawleeConfig("edgecore"));
*
* // With explicit RequestQueue (community-issues pattern):
* const cfg = makeCrawleeConfig("community-issues");
* const queue = await RequestQueue.open(null, { config: cfg });
* const crawler = new CheerioCrawler({ requestQueue: queue, ... }, cfg);
*/
import { Configuration } from "crawlee";
import { join } from "node:path";
import { mkdirSync, existsSync, writeFileSync } from "node:fs";
/**
 * Resolve the on-disk Crawlee storage root for a single scraper.
 *
 * The result is the directory `storage-<scraperName>` located four levels
 * above the directory that contains this module.
 *
 * @param scraperName - Scraper identifier, used verbatim as the directory suffix.
 * @returns Path of the scraper's storage root (resolved relative to `__dirname`).
 */
export function crawleeStorageDir(scraperName: string): string {
  // Per the original layout note: packages/scraper/dist/utils/ → four levels
  // up is the repo root, so storage-<name>/ sits beside packages/.
  const fourLevelsUp = join("..", "..", "..", "..");
  const directoryName = `storage-${scraperName}`;
  return join(__dirname, fourLevelsUp, directoryName);
}
/**
 * Create and return a Crawlee `Configuration` with an isolated storage directory.
 *
 * Idempotent — safe to call on every scraper run:
 * - The `request_queues/default`, `datasets/default` and
 *   `key_value_stores/default` directories are created if they don't exist
 *   (`recursive: true`).
 * - `SDK_SESSION_POOL_STATE.json` is seeded only when it is absent. The seed
 *   is written with the exclusive-create flag (`wx`), so a file created by a
 *   concurrent caller between the existence check and the write is never
 *   truncated or overwritten.
 *
 * Per the original author's note, the seed exists so that Crawlee v3.16 does
 * not throw "Could not find file" on first run (that version reportedly reads
 * the session-pool state before writing it).
 *
 * NOTE(review): `purgeOnStart: false` is set to keep session-pool state, but
 * it presumably also keeps the persisted default request queue between runs —
 * confirm that previously handled URLs are not deduplicated away on later runs.
 *
 * @param scraperName - Scraper identifier; selects the `storage-<scraperName>` directory.
 * @returns A `Configuration` whose storage client persists under that directory.
 * @throws Any filesystem error raised while creating the directories or the
 *   seed file, except `EEXIST` on the seed file (which means it already exists).
 */
export function makeCrawleeConfig(scraperName: string): Configuration {
  const storageDir = crawleeStorageDir(scraperName);

  // Pre-create internal directory tree
  mkdirSync(join(storageDir, "request_queues", "default"), { recursive: true });
  mkdirSync(join(storageDir, "datasets", "default"), { recursive: true });
  mkdirSync(join(storageDir, "key_value_stores", "default"), { recursive: true });

  // Seed empty session-pool state to avoid "Could not find file" crash in v3.16
  const sessionFile = join(storageDir, "key_value_stores", "default", "SDK_SESSION_POOL_STATE.json");
  if (!existsSync(sessionFile)) {
    const emptyState = JSON.stringify({
      usableSessionsCount: 0,
      retiredSessionsCount: 0,
      sessions: [],
    });
    try {
      // "wx" fails with EEXIST instead of truncating, which closes the window
      // between the existsSync() check above and this write.
      writeFileSync(sessionFile, emptyState, { flag: "wx" });
    } catch (err: unknown) {
      const code = err instanceof Error ? (err as Error & { code?: unknown }).code : undefined;
      // Another caller created the file first; its content must be preserved.
      if (code !== "EEXIST") {
        throw err;
      }
    }
  }

  // `localDataDirectory` is the MemoryStorage option for the base storage path.
  // Pass it via `storageClientOptions` so the Configuration uses this path
  // instance-locally (not via global CRAWLEE_STORAGE_DIR env var).
  return new Configuration({
    storageClientOptions: {
      localDataDirectory: storageDir,
      persistStorage: true,
    },
    purgeOnStart: false, // Preserve session pool state between runs
  });
}