/** * Import normalized Flexoptix shop catalog rows produced by Magatama/Pulso. * * Source file: * ../magatama/data/pulso/shop-products.normalized.jsonl * * Safe first run: * npm run flexoptix:catalog:import -- --dry-run * * Live DB import: * TIP_DB_PASS=... npm run flexoptix:catalog:import */ import { existsSync, readFileSync } from "fs"; import { resolve } from "path"; import { ensureVendor, findOrCreateScrapedTransceiver, pool, upsertPriceObservation, upsertStockObservation, } from "../packages/scraper/src/utils/db"; import { contentHash } from "../packages/scraper/src/utils/hash"; type CatalogProduct = { source: "flexoptix-shop-api"; fetchedAt: string; sku: string; title: string; url: string | null; price: { amount: number | null; currency: string | null; source: "api" | "missing"; fetchedAt: string; }; stock: { status: string | null; quantity: number | null; source: "api" | "missing"; }; optics: { formFactor: string | null; speedGbps: number | null; reachM: number | null; wavelengthNm: number | null; connector: string | null; fiberType: string | null; protocol: string | null; coding: string | null; bidi: boolean | null; dwdm: boolean | null; cwdm: boolean | null; }; compatibility: Array<{ vendor: string; platform: string | null; coding: string | null; source: "api"; }>; }; type CliArgs = { dryRun: boolean; input: string; }; const repoRoot = resolve(__dirname, ".."); const defaultInput = resolve(repoRoot, "..", "..", "magatama", "data", "pulso", "shop-products.normalized.jsonl"); function parseArgs(argv: string[]): CliArgs { const args: CliArgs = { dryRun: false, input: process.env.PULSO_SHOP_CATALOG_JSONL || defaultInput, }; for (let index = 0; index < argv.length; index += 1) { const value = argv[index]; if (value === "--") continue; if (value === "--dry-run") { args.dryRun = true; continue; } if (value === "--input") { args.input = resolve(argv[index + 1] || args.input); index += 1; continue; } if (value === "--help" || value === "-h") { console.log(`Usage: npm run flexoptix:catalog:import -- --dry-run npm run flexoptix:catalog:import -- --input /path/shop-products.normalized.jsonl Env: PULSO_SHOP_CATALOG_JSONL Optional input path override. TIP_DB_HOST/TIP_DB_PORT/TIP_DB_NAME/TIP_DB_USER/TIP_DB_PASS or PGPASSWORD for live DB import. `); process.exit(0); } throw new Error(`Unknown argument: ${value}`); } return args; } function readJsonl(filePath: string): CatalogProduct[] { if (!existsSync(filePath)) return []; return readFileSync(filePath, "utf8") .split("\n") .map(line => line.trim()) .filter(Boolean) .map(line => JSON.parse(line) as CatalogProduct); } function reachLabel(reachM: number | null): string | undefined { if (reachM === null) return undefined; if (reachM >= 1000 && reachM % 1000 === 0) return `${reachM / 1000}km`; return `${reachM}m`; } function speedLabel(speedGbps: number | null): string | undefined { if (speedGbps === null) return undefined; return `${speedGbps}G`; } function categoryFor(product: CatalogProduct): string { const text = `${product.title} ${product.optics.protocol || ""}`.toLowerCase(); if (/\bdac\b|direct attach|copper/.test(text)) return "DAC"; if (/\baoc\b|active optical/.test(text)) return "AOC"; if (/coherent|zr|dco/.test(text)) return "Coherent"; return "DataCenter"; } function canImportProduct(product: CatalogProduct): boolean { return Boolean( product.sku && product.title && product.optics.formFactor && product.optics.speedGbps !== null && product.optics.reachM !== null ); } async function importProduct(product: CatalogProduct, vendorId: string): Promise<{ priceWritten: boolean; stockWritten: boolean }> { const transceiverId = await findOrCreateScrapedTransceiver({ partNumber: product.sku, vendorId, productUrl: product.url || undefined, formFactor: product.optics.formFactor || undefined, speedGbps: product.optics.speedGbps ?? undefined, speed: speedLabel(product.optics.speedGbps), reachMeters: product.optics.reachM ?? undefined, reachLabel: reachLabel(product.optics.reachM), fiberType: product.optics.fiberType || undefined, wavelengths: product.optics.wavelengthNm === null ? undefined : `${product.optics.wavelengthNm}nm`, category: categoryFor(product), }); let priceWritten = false; if (product.price.amount !== null && product.price.currency) { priceWritten = await upsertPriceObservation({ transceiverId, sourceVendorId: vendorId, price: product.price.amount, currency: product.price.currency, stockLevel: product.stock.status || "unknown", quantityAvailable: product.stock.quantity ?? undefined, url: product.url || undefined, contentHash: contentHash({ source: product.source, sku: product.sku, price: product.price.amount, currency: product.price.currency, fetchedAt: product.price.fetchedAt, }), }); } const stockWritten = await upsertStockObservation({ transceiverId, sourceVendorId: vendorId, stockLevel: product.stock.status || "unknown", quantityAvailable: product.stock.quantity ?? undefined, priceNet: product.price.amount ?? undefined, productUrl: product.url || undefined, priceCurrency: product.price.currency || undefined, stockConfidence: product.stock.quantity === null ? 1 : 2, }); return { priceWritten, stockWritten }; } async function main(): Promise { const args = parseArgs(process.argv.slice(2)); const products = readJsonl(args.input); const importable = products.filter(canImportProduct); const skipped = products.length - importable.length; const priced = importable.filter(product => product.price.amount !== null && product.price.currency).length; const stocked = importable.filter(product => product.stock.status || product.stock.quantity !== null).length; console.log("Flexoptix normalized catalog import"); console.log(`Input: ${args.input}`); console.log(`Rows: ${products.length} | importable: ${importable.length} | skipped_missing_technical_fields: ${skipped}`); console.log(`With price: ${priced} | with stock signal: ${stocked}`); if (args.dryRun) { console.log("Dry-run only: no TIP database writes performed."); await pool.end(); return; } const vendorId = await ensureVendor("Flexoptix", "compatible", "https://www.flexoptix.net", "https://www.flexoptix.net"); let priceWrites = 0; let stockWrites = 0; try { for (const product of importable) { const result = await importProduct(product, vendorId); if (result.priceWritten) priceWrites += 1; if (result.stockWritten) stockWrites += 1; } } finally { await pool.end(); } console.log(`Import complete: ${importable.length} products processed, ${priceWrites} price observations, ${stockWrites} stock observations.`); } main().catch(async error => { console.error(error instanceof Error ? error.message : error); await pool.end().catch(() => undefined); process.exit(1); });