fix: instance-level Crawlee storage isolation + eBay vendor type
- Add utils/crawlee-config.ts: makeCrawleeConfig(name) returns a Crawlee Configuration with isolated localDataDirectory per scraper. Uses storageClientOptions (not global CRAWLEE_STORAGE_DIR) so concurrent pg-boss workers in the same process don't race on the shared env var. - Apply makeCrawleeConfig to all 6 Crawlee-based scrapers: optcore (PlaywrightCrawler), atgbics (PlaywrightCrawler), community-issues (CheerioCrawler + RequestQueue), edgecore (CheerioCrawler), ufispace (CheerioCrawler), market-intelligence (CheerioCrawler). - scheduler.ts: add withIsolatedStorage for optcore and market-intel workers (was missing, caused storage-fs path bleed from fs scraper). - ebay-enricher.ts: fix vendor type 'marketplace' -> 'reseller' to satisfy vendors_type_check constraint ['manufacturer','distributor','oem','reseller','compatible'].
This commit is contained in:
parent
4b751a771b
commit
c7d7456de9
@ -384,7 +384,7 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
|
||||
await boss.work("scrape:pricing:optcore", async () => {
|
||||
console.log(`[${new Date().toISOString()}] Running: Optcore pricing`);
|
||||
const { scrapeOptcore } = await import("./scrapers/optcore");
|
||||
await scrapeOptcore();
|
||||
await withIsolatedStorage("optcore", scrapeOptcore);
|
||||
});
|
||||
|
||||
await boss.work("scrape:pricing:champion-one", async () => {
|
||||
@ -522,7 +522,7 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
|
||||
|
||||
await boss.work("scrape:market-intel", async () => {
|
||||
console.log(`[${new Date().toISOString()}] Running: Market intelligence`);
|
||||
await scrapeMarketIntelligence();
|
||||
await withIsolatedStorage("market-intel", scrapeMarketIntelligence);
|
||||
});
|
||||
|
||||
await boss.work("scrape:nog-talks", async () => {
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
* Respects: robots.txt, rate limiting (2s between requests, max 50 pages)
|
||||
*/
|
||||
import { PlaywrightCrawler, ProxyConfiguration } from "crawlee";
|
||||
import { makeCrawleeConfig } from "../utils/crawlee-config";
|
||||
import { ensureVendor, upsertPriceObservation, findOrCreateScrapedTransceiver, pool } from "../utils/db";
|
||||
import { contentHash, parsePrice, parseStockLevel, parseQuantity } from "../utils/hash";
|
||||
|
||||
@ -366,7 +367,7 @@ export async function scrapeAtgbics(): Promise<void> {
|
||||
pagesScraped++;
|
||||
}
|
||||
},
|
||||
});
|
||||
}, makeCrawleeConfig("atgbics"));
|
||||
|
||||
const startUrls = CATEGORY_URLS.map((path) => `${BASE_URL}${path}`);
|
||||
await crawler.run(startUrls);
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
*/
|
||||
|
||||
import { CheerioCrawler, RequestQueue } from "crawlee";
|
||||
import { makeCrawleeConfig } from "../utils/crawlee-config";
|
||||
import { extractMarketIntel } from "../crawler-llm/core";
|
||||
import { db as pool } from "../utils/db";
|
||||
import { logger } from "../utils/logger";
|
||||
@ -186,7 +187,9 @@ export async function scrapeProductIssues(
|
||||
models: string[],
|
||||
sourceLimit = 3
|
||||
): Promise<void> {
|
||||
const queue = await RequestQueue.open("community-issues");
|
||||
// Use isolated Crawlee config to prevent concurrent-worker storage conflicts
|
||||
const crawleeConfig = makeCrawleeConfig("community-issues");
|
||||
const queue = await RequestQueue.open(null, { config: crawleeConfig });
|
||||
|
||||
// Add search requests for each model × source combination
|
||||
for (const model of models) {
|
||||
@ -264,7 +267,7 @@ If no issues found, return []`;
|
||||
failedRequestHandler: ({ request, error }) => {
|
||||
logger.warn(`Community scraper failed: ${request.url}`, { error });
|
||||
},
|
||||
});
|
||||
}, crawleeConfig);
|
||||
|
||||
await crawler.run();
|
||||
logger.info(`Community issues scraping complete for ${models.length} models`);
|
||||
|
||||
@ -286,7 +286,7 @@ async function saveEnrichment(switchId: string, result: EnrichResult): Promise<v
|
||||
// Find eBay vendor ID (create if needed)
|
||||
const ebayVendorResult = await pool.query(
|
||||
`INSERT INTO vendors (name, slug, type, website)
|
||||
VALUES ('eBay Marketplace', 'ebay', 'marketplace', 'https://www.ebay.de')
|
||||
VALUES ('eBay Marketplace', 'ebay', 'reseller', 'https://www.ebay.de')
|
||||
ON CONFLICT (slug) DO UPDATE SET name = EXCLUDED.name
|
||||
RETURNING id`
|
||||
);
|
||||
@ -354,7 +354,7 @@ export async function enrichTransceiversFromEbay(limit = 50): Promise<void> {
|
||||
// Find eBay vendor
|
||||
const ebayVendor = await pool.query(
|
||||
`INSERT INTO vendors (name, slug, type, website)
|
||||
VALUES ('eBay Marketplace', 'ebay', 'marketplace', 'https://www.ebay.de')
|
||||
VALUES ('eBay Marketplace', 'ebay', 'reseller', 'https://www.ebay.de')
|
||||
ON CONFLICT (slug) DO UPDATE SET updated_at = NOW()
|
||||
RETURNING id`
|
||||
);
|
||||
|
||||
@ -9,6 +9,7 @@
|
||||
* Source: https://www.edge-core.com/productsList.php?cls=1
|
||||
*/
|
||||
import { CheerioCrawler } from "crawlee";
|
||||
import { makeCrawleeConfig } from "../utils/crawlee-config";
|
||||
import { pool, ensureWhiteboxVendor, findOrCreateSwitch } from "../utils/db";
|
||||
|
||||
const BASE_URL = "https://www.edge-core.com";
|
||||
@ -193,7 +194,7 @@ export async function scrapeEdgecore(): Promise<void> {
|
||||
failedRequestHandler({ request }) {
|
||||
console.error(` ! Failed: ${request.url}`);
|
||||
},
|
||||
});
|
||||
}, makeCrawleeConfig("edgecore"));
|
||||
|
||||
await crawler.run([PRODUCT_LIST_URL]);
|
||||
console.log(`\n Created: ${created}, Updated: ${updated}\n`);
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
*/
|
||||
|
||||
import { CheerioCrawler } from "crawlee";
|
||||
import { makeCrawleeConfig } from "../utils/crawlee-config";
|
||||
import { extractMarketIntel } from "../crawler-llm/core";
|
||||
import { pool } from "../utils/db";
|
||||
|
||||
@ -140,7 +141,7 @@ export async function scrapeMarketIntelligence(): Promise<void> {
|
||||
async failedRequestHandler({ request }) {
|
||||
console.warn(`[market-intel] Failed: ${request.url}`);
|
||||
},
|
||||
});
|
||||
}, makeCrawleeConfig("market-intel"));
|
||||
|
||||
await crawler.addRequests(SOURCES.map((s) => ({ url: s.url })));
|
||||
await crawler.run();
|
||||
|
||||
@ -9,6 +9,7 @@
|
||||
* (JS lazy-loading) — static HTML has no product data.
|
||||
*/
|
||||
import { PlaywrightCrawler } from "crawlee";
|
||||
import { makeCrawleeConfig } from "../utils/crawlee-config";
|
||||
import { ensureVendor, upsertPriceObservation, findOrCreateScrapedTransceiver, pool } from "../utils/db";
|
||||
import { contentHash, parsePrice, parseStockLevel } from "../utils/hash";
|
||||
|
||||
@ -242,7 +243,7 @@ export async function scrapeOptcore(): Promise<void> {
|
||||
|
||||
pagesScraped++;
|
||||
},
|
||||
});
|
||||
}, makeCrawleeConfig("optcore"));
|
||||
|
||||
const urls = productMeta.map((p) => p.url);
|
||||
await crawler.run(urls);
|
||||
|
||||
@ -7,6 +7,7 @@
|
||||
* Source: https://www.ufispace.com/products/datacenter-switches
|
||||
*/
|
||||
import { CheerioCrawler } from "crawlee";
|
||||
import { makeCrawleeConfig } from "../utils/crawlee-config";
|
||||
import { pool, ensureWhiteboxVendor, findOrCreateSwitch } from "../utils/db";
|
||||
|
||||
const BASE_URL = "https://www.ufispace.com";
|
||||
@ -191,7 +192,7 @@ export async function scrapeUfiSpace(): Promise<void> {
|
||||
failedRequestHandler({ request }) {
|
||||
console.error(` ! Failed: ${request.url}`);
|
||||
},
|
||||
});
|
||||
}, makeCrawleeConfig("ufispace"));
|
||||
|
||||
await crawler.run(PRODUCT_URLS);
|
||||
console.log(`\n Created: ${created}, Updated: ${updated}\n`);
|
||||
|
||||
78
packages/scraper/src/utils/crawlee-config.ts
Normal file
78
packages/scraper/src/utils/crawlee-config.ts
Normal file
@ -0,0 +1,78 @@
|
||||
/**
|
||||
* Crawlee instance-level storage isolation.
|
||||
*
|
||||
* WHY THIS EXISTS:
|
||||
* ----------------
|
||||
* All pg-boss workers run concurrently inside a single Node.js process.
|
||||
* The old approach (setting process.env.CRAWLEE_STORAGE_DIR) is a global
|
||||
* env-var mutation — if two workers run simultaneously, one worker's
|
||||
* writeToEnv() can overwrite another's before the crawler reads it.
|
||||
*
|
||||
* The fix: pass a `Configuration` instance directly to each Crawlee
|
||||
* constructor. This is instance-level (not global), so concurrent scrapers
|
||||
* each get their own isolated storage directory.
|
||||
*
|
||||
* Usage:
|
||||
* import { makeCrawleeConfig } from "../utils/crawlee-config";
|
||||
*
|
||||
* // In PlaywrightCrawler:
|
||||
* const crawler = new PlaywrightCrawler({ ... }, makeCrawleeConfig("optcore"));
|
||||
*
|
||||
* // In CheerioCrawler:
|
||||
* const crawler = new CheerioCrawler({ ... }, makeCrawleeConfig("edgecore"));
|
||||
*
|
||||
* // With explicit RequestQueue (community-issues pattern):
|
||||
* const cfg = makeCrawleeConfig("community-issues");
|
||||
* const queue = await RequestQueue.open(null, { config: cfg });
|
||||
* const crawler = new CheerioCrawler({ requestQueue: queue, ... }, cfg);
|
||||
*/
|
||||
|
||||
import { Configuration } from "crawlee";
|
||||
import { join } from "node:path";
|
||||
import { mkdirSync, existsSync, writeFileSync } from "node:fs";
|
||||
|
||||
/** Absolute path to the per-scraper Crawlee storage root on disk. */
|
||||
export function crawleeStorageDir(scraperName: string): string {
|
||||
// dist layout: packages/scraper/dist/utils/ → go 4 levels up → repo root
|
||||
// Then store beside packages/ as storage-<name>/
|
||||
return join(__dirname, "..", "..", "..", "..", `storage-${scraperName}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and return a Crawlee Configuration with an isolated storageDir.
|
||||
*
|
||||
* Idempotent — safe to call every scraper run:
|
||||
* - Directories are created if they don't exist (recursive: true).
|
||||
* - SDK_SESSION_POOL_STATE.json is seeded once so Crawlee v3.16 doesn't
|
||||
* throw "Could not find file" on first run (this version reads before
|
||||
* writing on session-pool init).
|
||||
*/
|
||||
export function makeCrawleeConfig(scraperName: string): Configuration {
|
||||
const storageDir = crawleeStorageDir(scraperName);
|
||||
|
||||
// Pre-create internal directory tree
|
||||
mkdirSync(join(storageDir, "request_queues", "default"), { recursive: true });
|
||||
mkdirSync(join(storageDir, "datasets", "default"), { recursive: true });
|
||||
mkdirSync(join(storageDir, "key_value_stores", "default"), { recursive: true });
|
||||
|
||||
// Seed empty session-pool state to avoid "Could not find file" crash in v3.16
|
||||
const sessionFile = join(storageDir, "key_value_stores", "default", "SDK_SESSION_POOL_STATE.json");
|
||||
if (!existsSync(sessionFile)) {
|
||||
writeFileSync(sessionFile, JSON.stringify({
|
||||
usableSessionsCount: 0,
|
||||
retiredSessionsCount: 0,
|
||||
sessions: [],
|
||||
}));
|
||||
}
|
||||
|
||||
// `localDataDirectory` is the MemoryStorage option for the base storage path.
|
||||
// Pass it via `storageClientOptions` so the Configuration uses this path
|
||||
// instance-locally (not via global CRAWLEE_STORAGE_DIR env var).
|
||||
return new Configuration({
|
||||
storageClientOptions: {
|
||||
localDataDirectory: storageDir,
|
||||
persistStorage: true,
|
||||
},
|
||||
purgeOnStart: false, // Preserve session pool state between runs
|
||||
});
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user