/**
 * OCR Pipeline: PDF/document → Docling → chunks → Ollama embed → Qdrant
 *
 * Connects to the Docling REST API (Erik port 8100) for document conversion,
 * then chunks markdown output and embeds into Qdrant collections.
 *
 * Collections:
 * - datasheet_chunks: Product datasheets (specs, diagrams, compliance)
 * - manual_chunks: Installation/configuration manuals
 *
 * Run: npx tsx packages/api/src/embeddings/ocr-pipeline.ts [--url <url>] [--dir <dir>]
 */

import { pool } from "../db/client";
import { embed, upsertPoints, CollectionName } from "./client";
import { randomUUID } from "crypto";

/** Base URL of the Docling conversion service; override with the DOCLING_URL env var. */
const DOCLING_URL = process.env.DOCLING_URL || "http://localhost:8100";

/** Response body of Docling's `POST /convert` as consumed by this script. */
interface DoclingResult {
  success: boolean;
  content: string;
  format: string;
  pages: number | null;
  error?: string;
}

/** One embedded chunk as handed to `upsertPoints`. */
interface DocumentChunk {
  id: string;
  vector: number[];
  payload: {
    document_id: string;
    source_url: string;
    document_type: "datasheet" | "manual" | "whitepaper";
    chunk_index: number;
    total_chunks: number;
    title: string;
    section_heading: string;
    text: string;
    page_estimate: number | null;
    vendor: string;
    product_slug: string;
  };
}

/** Best-effort message extraction for values caught in `catch` clauses. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Convert a document via the Docling API.
 *
 * @param url    Location of the document, sent to Docling as-is.
 * @param format Output format requested from Docling.
 * @returns The parsed JSON response (not validated beyond the type assertion).
 * @throws Error when the HTTP response is not 2xx, or when the request times out.
 */
async function convertDocument(
  url: string,
  format: "markdown" | "json" = "markdown",
): Promise<DoclingResult> {
  const resp = await fetch(`${DOCLING_URL}/convert`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ url, format }),
    signal: AbortSignal.timeout(120000), // 2 min for large PDFs
  });

  if (!resp.ok) {
    throw new Error(`Docling convert failed: ${resp.status} ${await resp.text()}`);
  }

  return (await resp.json()) as DoclingResult;
}

/**
 * Chunk markdown into overlapping sections.
 *
 * Strategy:
 * 1. Split on level 1-3 headings (`#`, `##`, `###`) as natural section boundaries.
 * 2. If a section body exceeds maxChunkSize, split it by blank-line-separated paragraphs.
 * 3. Apply overlap (repeat the last overlapSize chars of the previous chunk).
 *
 * Sections shorter than 20 characters (heading included) and sections with an
 * empty body are dropped. A single paragraph longer than maxChunkSize is kept
 * whole, so chunks may exceed maxChunkSize.
 *
 * @param markdown     Markdown text to split.
 * @param maxChunkSize Soft upper bound on chunk length, in characters.
 * @param overlapSize  Characters of the previous chunk to repeat; 0 disables overlap.
 */
function chunkMarkdown(
  markdown: string,
  maxChunkSize: number = 1500,
  overlapSize: number = 200,
): Array<{ heading: string; text: string }> {
  const sections = markdown.split(/(?=^#{1,3}\s)/m);
  const chunks: Array<{ heading: string; text: string }> = [];

  for (const section of sections) {
    const trimmed = section.trim();
    if (!trimmed || trimmed.length < 20) continue;

    // Extract heading; text before the first heading is filed under "Introduction".
    const headingMatch = trimmed.match(/^#{1,3}\s+(.+)/);
    const heading = headingMatch?.[1]?.trim() ?? "Introduction";
    const body = headingMatch ? trimmed.slice(headingMatch[0].length).trim() : trimmed;

    // A heading with nothing under it would otherwise become an empty-text chunk.
    if (!body) continue;

    if (body.length <= maxChunkSize) {
      chunks.push({ heading, text: body });
      continue;
    }

    // Split large sections by paragraphs.
    const paragraphs = body.split(/\n\n+/);
    let currentChunk = "";

    for (const para of paragraphs) {
      if (currentChunk.length + para.length > maxChunkSize && currentChunk.length > 0) {
        chunks.push({ heading, text: currentChunk.trim() });

        // Overlap: keep tail of previous chunk. Guard against slice(-0), which
        // would return the whole previous chunk instead of an empty string.
        const overlapText = overlapSize > 0 ? currentChunk.slice(-overlapSize) : "";
        currentChunk = overlapText ? overlapText + "\n\n" + para : para;
      } else {
        currentChunk += (currentChunk ? "\n\n" : "") + para;
      }
    }

    if (currentChunk.trim().length > 20) {
      chunks.push({ heading, text: currentChunk.trim() });
    }
  }

  return chunks;
}

/**
 * Classify document type from URL or content.
 *
 * Only the first 2000 characters of the content are inspected. Datasheet
 * keywords win over manual keywords; anything else is a whitepaper.
 */
function classifyDocument(url: string, content: string): "datasheet" | "manual" | "whitepaper" {
  const urlLower = url.toLowerCase();
  const contentLower = content.slice(0, 2000).toLowerCase();

  if (
    urlLower.includes("datasheet") ||
    contentLower.includes("datasheet") ||
    contentLower.includes("specifications")
  ) {
    return "datasheet";
  }

  if (
    urlLower.includes("manual") ||
    urlLower.includes("install") ||
    contentLower.includes("installation guide") ||
    contentLower.includes("user manual")
  ) {
    return "manual";
  }

  return "whitepaper";
}

/** URL substring patterns mapped to vendor display names; first match wins. */
const VENDOR_PATTERNS: ReadonlyArray<readonly [RegExp, string]> = [
  [/flexoptix/i, "Flexoptix"],
  [/cisco/i, "Cisco"],
  [/juniper/i, "Juniper"],
  [/arista/i, "Arista"],
  [/nokia/i, "Nokia"],
  [/huawei/i, "Huawei"],
  [/finisar|ii-vi|coherent/i, "II-VI/Coherent"],
  [/innolight/i, "Innolight"],
  [/broadcom/i, "Broadcom"],
  [/intel/i, "Intel"],
  [/fs\.com|fiberstore/i, "FS.com"],
  [/10gtek/i, "10Gtek"],
];

/**
 * Extract vendor name from the URL.
 *
 * @returns The display name of the first matching vendor pattern, or "Unknown".
 */
function extractVendor(url: string): string {
  const urlLower = url.toLowerCase();
  for (const [pattern, name] of VENDOR_PATTERNS) {
    if (pattern.test(urlLower)) return name;
  }
  return "Unknown";
}

/**
 * Extract product slug from URL.
 *
 * Takes the last path segment, drops any query string or fragment, strips a
 * known document extension, replaces every character outside [a-zA-Z0-9-]
 * with "-", and lowercases the result.
 */
function extractProductSlug(url: string): string {
  // Remove "?query" / "#fragment" first so the extension is at the end of the name.
  const withoutQuery = url.split(/[?#]/, 1)[0] ?? "";
  const filename = withoutQuery.split("/").pop() ?? "";
  return filename
    .replace(/\.(pdf|docx|doc|xlsx)$/i, "")
    .replace(/[^a-zA-Z0-9-]/g, "-")
    .toLowerCase();
}

/**
 * Process a single document: convert → chunk → embed → store.
 *
 * @param url        Document location passed to Docling.
 * @param collection Collection the chunks are upserted into.
 * @param title      Optional title; defaults to the product slug with dashes as spaces.
 * @returns The generated document id and the number of chunks upserted.
 * @throws Error when conversion fails; errors from embedding/upserting propagate.
 */
async function processDocument(
  url: string,
  collection: CollectionName = "datasheet_chunks",
  title?: string,
): Promise<{ documentId: string; chunksStored: number }> {
  const documentId = randomUUID();

  console.log(`  Converting: ${url}`);
  const result = await convertDocument(url);

  if (!result.success) {
    throw new Error(`Conversion failed: ${result.error}`);
  }

  const markdown = result.content;
  console.log(`  Converted: ${result.pages ?? "?"} pages, ${markdown.length} chars`);

  const docType = classifyDocument(url, markdown);
  const vendor = extractVendor(url);
  const productSlug = extractProductSlug(url);
  const docTitle = title || productSlug.replace(/-/g, " ");

  // Chunk the markdown
  const chunks = chunkMarkdown(markdown);
  console.log(`  Chunked: ${chunks.length} chunks (type: ${docType})`);

  if (chunks.length === 0) {
    console.log("  Warning: No chunks produced, skipping");
    return { documentId, chunksStored: 0 };
  }

  // Embed and store in batches; chunks within a batch are embedded concurrently.
  const BATCH_SIZE = 5;
  let stored = 0;

  for (let i = 0; i < chunks.length; i += BATCH_SIZE) {
    const batch = chunks.slice(i, i + BATCH_SIZE);

    const points: DocumentChunk[] = await Promise.all(
      batch.map(async (chunk, idx): Promise<DocumentChunk> => {
        const chunkIndex = i + idx;
        // Title and heading are prepended so the vector carries document context.
        const embeddingText = `${docTitle}. ${chunk.heading}. ${chunk.text}`;
        const vector = await embed(embeddingText);

        return {
          id: randomUUID(),
          vector,
          payload: {
            document_id: documentId,
            source_url: url,
            document_type: docType,
            chunk_index: chunkIndex,
            total_chunks: chunks.length,
            title: docTitle,
            section_heading: chunk.heading,
            text: chunk.text,
            // NOTE(review): this is the document's total page count, not a
            // per-chunk page estimate — confirm what consumers expect.
            page_estimate: result.pages,
            vendor,
            product_slug: productSlug,
          },
        };
      }),
    );

    await upsertPoints(collection, points);
    stored += points.length;
    console.log(`  Embedded ${stored}/${chunks.length} chunks`);
  }

  // Record in documents table. Best-effort: the chunks are already stored, so a
  // failed insert is reported but does not fail the document.
  try {
    await pool.query(
      `INSERT INTO documents (id, entity_type, doc_type, title, r2_key, source_url, page_count, chunks_count, ocr_status, processed_at)
       VALUES ($1, 'transceiver', $2, $3, $4, $5, $6, $7, 'completed', NOW())
       ON CONFLICT ON CONSTRAINT documents_pkey DO UPDATE SET processed_at = NOW(), chunks_count = $7, ocr_status = 'completed'`,
      [documentId, docType, docTitle, `ocr/${documentId}`, url, result.pages, chunks.length],
    );
  } catch (err) {
    console.warn(`  Warning: could not record document ${documentId} in documents table: ${errorMessage(err)}`);
  }

  return { documentId, chunksStored: stored };
}

/** Known datasheet URLs to seed from */
const SEED_DATASHEETS: Array<{ url: string; title: string; collection: CollectionName }> = [
  // Flexoptix product guides
  {
    url: "https://www.flexoptix.net/media/pdf/flexoptix-sfp-compatibility-guide.pdf",
    title: "Flexoptix SFP Compatibility Guide",
    collection: "datasheet_chunks",
  },
  // IEEE standards (publicly available)
  {
    url: "https://standards.ieee.org/content/dam/ieee-standards/standards/web/download/802.3-2022_downloads/802.3-2022.pdf",
    title: "IEEE 802.3 Ethernet Standard",
    collection: "manual_chunks",
  },
];

/**
 * CLI entry point.
 *
 * Modes (first match wins):
 * - `--url <url> [--title <title>] [--collection <name>]`: process one document.
 * - `--dir <dir>`: process every `.pdf` file in a directory into datasheet_chunks.
 * - no flags: process SEED_DATASHEETS.
 *
 * Exits with code 1 when Docling is unreachable or a required flag value is missing.
 */
async function main() {
  const args = process.argv.slice(2);

  console.log("=== OCR Pipeline: Document → Chunks → Embeddings ===\n");

  // Check Docling health
  try {
    const healthResp = await fetch(`${DOCLING_URL}/health`, { signal: AbortSignal.timeout(5000) });
    if (!healthResp.ok) {
      throw new Error(`health check returned HTTP ${healthResp.status}`);
    }
    const health = (await healthResp.json()) as { status: string };
    console.log(`Docling API: ${health.status} at ${DOCLING_URL}`);
  } catch (err) {
    console.error(`Docling API not reachable at ${DOCLING_URL}: ${errorMessage(err)}`);
    console.error("Set DOCLING_URL env var or start Docling on Erik (port 8100)");
    process.exit(1);
  }

  let totalDocs = 0;
  let totalChunks = 0;

  if (args.includes("--url")) {
    // Process a single URL
    const urlIdx = args.indexOf("--url") + 1;
    const url = args[urlIdx];
    const title = args.includes("--title") ? args[args.indexOf("--title") + 1] : undefined;
    // NOTE(review): the collection name is cast without validation — confirm
    // that upsertPoints rejects unknown collections.
    const collection = (args.includes("--collection")
      ? args[args.indexOf("--collection") + 1]
      : "datasheet_chunks") as CollectionName;

    if (!url) {
      console.error("Usage: --url <url> [--title <title>] [--collection <name>]");
      process.exit(1);
    }

    const result = await processDocument(url, collection, title);
    totalDocs = 1;
    totalChunks = result.chunksStored;
  } else if (args.includes("--dir")) {
    // Process all PDFs in a directory
    const dirIdx = args.indexOf("--dir") + 1;
    const dir = args[dirIdx];

    if (!dir) {
      console.error("Usage: --dir <dir>");
      process.exit(1);
    }

    const { readdirSync } = await import("fs");
    const files = readdirSync(dir).filter((f) => f.toLowerCase().endsWith(".pdf"));
    console.log(`Found ${files.length} PDFs in ${dir}\n`);

    for (const file of files) {
      // NOTE(review): a local path is sent to Docling as `url` — confirm the
      // service can resolve paths on this machine.
      const filePath = `${dir}/${file}`;
      try {
        const result = await processDocument(filePath, "datasheet_chunks");
        totalDocs++;
        totalChunks += result.chunksStored;
      } catch (err) {
        console.error(`  Failed: ${file} — ${errorMessage(err)}`);
      }
    }
  } else {
    // Seed from known URLs
    console.log(`Processing ${SEED_DATASHEETS.length} seed documents\n`);

    for (const doc of SEED_DATASHEETS) {
      try {
        console.log(`\n[${doc.title}]`);
        const result = await processDocument(doc.url, doc.collection, doc.title);
        totalDocs++;
        totalChunks += result.chunksStored;
      } catch (err) {
        console.error(`  Failed: ${doc.title} — ${errorMessage(err)}`);
      }
    }
  }

  console.log(`\n=== Done: ${totalDocs} documents, ${totalChunks} chunks embedded ===`);
  await pool.end();
}

main().catch((err) => {
  console.error("Fatal:", err);
  // Fire-and-forget: the process exits immediately, so the pool shutdown is not awaited.
  void pool.end();
  process.exit(1);
});