Docling-powered OCR pipeline: PDF → markdown → chunks → Ollama embed → Qdrant.
News embedding seeder for the news_embeddings collection.
Document and news semantic search API endpoints.

- embeddings/ocr-pipeline.ts: Docling convert → chunk → embed pipeline
- embeddings/seed-news.ts: Batch embed news_articles into Qdrant
- routes/documents.ts: POST /api/documents/process, GET /api/documents
- routes/search.ts: GET /search/documents, GET /search/news endpoints
- sql/005-documents.sql: Add chunks_count, processed_at to documents table
- Ollama + nomic-embed-text installed on Erik (CPU mode)
- 89 products + 40 datasheet chunks + 33 news articles in Qdrant
218 lines
7.1 KiB
TypeScript
218 lines
7.1 KiB
TypeScript
/**
|
|
* Document processing API routes (OCR Pipeline)
|
|
*
|
|
* POST /api/documents/process — Submit a document URL for OCR + embedding
|
|
* GET /api/documents — List processed documents
|
|
* GET /api/documents/:id — Get a single document's details
|
|
*/
|
|
import { Router, Request, Response } from "express";
|
|
import { embed, upsertPoints, CollectionName } from "../embeddings/client";
|
|
import { pool } from "../db/client";
|
|
import { randomUUID } from "crypto";
|
|
|
|
/** Express router that the document-processing routes in this file are registered on. */
export const documentRouter = Router();
|
|
|
|
// Base URL of the Docling conversion service. Taken from the DOCLING_URL
// environment variable; because of `||`, an unset OR empty value falls back
// to the local default on port 8100.
const DOCLING_URL = process.env.DOCLING_URL || "http://localhost:8100";
|
|
|
|
/**
 * JSON body returned by the Docling `/convert` endpoint, as read by this module.
 */
interface DoclingResult {
  /** When false, the process route answers 502 and forwards `error` as the detail. */
  success: boolean;
  /** Converted document text; this module requests markdown and chunks this field. */
  content: string;
  /** Output format reported by the service (not read in this file). */
  format: string;
  /** Page count, or null; stored as `page_count` and as each chunk's `page_estimate`. */
  pages: number | null;
  /** Failure description, expected when `success` is false. */
  error?: string;
}
|
|
|
|
/**
 * Split a markdown document into heading-scoped chunks for embedding.
 *
 * The input is split in front of every level 1–3 heading. A section whose body
 * fits in `maxChunkSize` becomes a single chunk; a longer body is packed
 * paragraph by paragraph (paragraphs are separated by blank lines), and every
 * follow-up chunk starts with the last `overlapSize` characters of the chunk
 * before it.
 *
 * Skipped input:
 * - sections shorter than 20 characters (heading included),
 * - sections that consist of a heading with no body text,
 * - a trailing remainder of 20 characters or fewer in a multi-chunk section.
 *
 * Known limitation: a single paragraph longer than `maxChunkSize` is never
 * split, and the overlap prefix is added on top of the paragraph, so chunks
 * can exceed `maxChunkSize`.
 *
 * @param markdown     Markdown source to split.
 * @param maxChunkSize Target maximum chunk length in characters.
 * @param overlapSize  Characters carried over from the previous chunk; 0 (or
 *                     any value that truncates to <= 0) disables the overlap.
 * @returns Chunks in document order. `heading` is the section's heading text,
 *          or "Introduction" for text that is not under a heading.
 */
function chunkMarkdown(
  markdown: string,
  maxChunkSize: number = 1500,
  overlapSize: number = 200,
): Array<{ heading: string; text: string }> {
  const chunks: Array<{ heading: string; text: string }> = [];

  // `str.slice(-0)` is `str.slice(0)`, i.e. the WHOLE string, so a zero
  // overlap must be special-cased instead of being passed to slice().
  const overlap = Math.trunc(overlapSize);
  const overlapTail = (text: string): string => (overlap > 0 ? text.slice(-overlap) : "");

  // The lookahead keeps each heading line attached to the section it opens.
  for (const section of markdown.split(/(?=^#{1,3}\s)/m)) {
    const trimmed = section.trim();
    if (trimmed.length < 20) continue;

    const headingMatch = trimmed.match(/^(#{1,3})\s+(.+)/);
    const heading = headingMatch ? headingMatch[2].trim() : "Introduction";
    const body = headingMatch ? trimmed.slice(headingMatch[0].length).trim() : trimmed;

    // A heading with nothing under it would produce a chunk with empty text.
    if (!body) continue;

    if (body.length <= maxChunkSize) {
      chunks.push({ heading, text: body });
      continue;
    }

    let currentChunk = "";
    for (const para of body.split(/\n\n+/)) {
      const wouldOverflow = currentChunk.length + para.length > maxChunkSize;
      if (wouldOverflow && currentChunk.length > 0) {
        chunks.push({ heading, text: currentChunk.trim() });
        const carried = overlapTail(currentChunk);
        currentChunk = carried ? `${carried}\n\n${para}` : para;
      } else {
        currentChunk += (currentChunk ? "\n\n" : "") + para;
      }
    }

    if (currentChunk.trim().length > 20) {
      chunks.push({ heading, text: currentChunk.trim() });
    }
  }

  return chunks;
}
|
|
|
|
// POST /api/documents/process — Process a document URL
//
// Body: { url, title?, doc_type?, vendor?, collection? }
//   1. Asks Docling to convert `url` to markdown.
//   2. Splits the markdown with chunkMarkdown().
//   3. Embeds the chunks in batches of 5 and upserts them into the target
//      collection ("datasheet_chunks" by default, or "manual_chunks").
//   4. Records the document in the `documents` table (best effort).
//
// Responses:
//   200 — summary of the processed document
//   400 — `url` missing / not a string, or `collection` not allowed
//   502 — Docling answered with a non-OK status or `success: false`
//   503 — anything thrown along the way (network error, timeout, embed/upsert)
//
// NOTE(review): `url` is forwarded unchanged to Docling, which fetches it
// server-side. If this endpoint is reachable by untrusted clients that is an
// SSRF vector — consider restricting schemes/hosts here.
// NOTE(review): if embedding or upserting fails part-way, the points that were
// already upserted stay in the collection without a `documents` row.
documentRouter.post("/process", async (req: Request, res: Response) => {
  // `req.body` is undefined when no body parser handled the request. Default
  // it so the destructuring cannot throw outside the try block, which in an
  // async handler would surface as an unhandled rejection and a hung request.
  const { url, title, doc_type, vendor, collection } = (req.body ?? {}) as {
    url?: string;
    title?: string;
    doc_type?: string;
    vendor?: string;
    collection?: string;
  };

  if (!url) {
    res.status(400).json({ success: false, error: "Missing 'url' in request body" });
    return;
  }

  // The cast on req.body is unchecked, so verify the runtime type before
  // calling string methods on it.
  if (typeof url !== "string") {
    res.status(400).json({ success: false, error: "'url' must be a string" });
    return;
  }

  const targetCollection = (collection || "datasheet_chunks") as CollectionName;
  if (!["datasheet_chunks", "manual_chunks"].includes(targetCollection)) {
    res.status(400).json({ success: false, error: "collection must be 'datasheet_chunks' or 'manual_chunks'" });
    return;
  }

  try {
    // Convert via Docling (2 minute timeout)
    const docResp = await fetch(`${DOCLING_URL}/convert`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ url, format: "markdown" }),
      signal: AbortSignal.timeout(120000),
    });

    if (!docResp.ok) {
      res.status(502).json({ success: false, error: "Docling conversion failed", detail: await docResp.text() });
      return;
    }

    const docResult = (await docResp.json()) as DoclingResult;
    if (!docResult.success) {
      res.status(502).json({ success: false, error: "Docling conversion failed", detail: docResult.error });
      return;
    }

    const documentId = randomUUID();
    // Default title: last path segment of the URL without its file extension.
    const docTitle = title || url.split("/").pop()?.replace(/\.[^.]+$/, "") || "untitled";
    const docType = doc_type || "datasheet";
    const docVendor = vendor || "Unknown";

    // Chunk
    const chunks = chunkMarkdown(docResult.content);

    // Embed and store: chunks within a batch are embedded concurrently,
    // batches run one after another.
    const BATCH_SIZE = 5;

    for (let i = 0; i < chunks.length; i += BATCH_SIZE) {
      const batch = chunks.slice(i, i + BATCH_SIZE);
      const points = await Promise.all(
        batch.map(async (chunk, idx) => {
          const chunkIndex = i + idx;
          // Title and heading are prepended so each vector carries its context.
          const embeddingText = `${docTitle}. ${chunk.heading}. ${chunk.text}`;
          const vector = await embed(embeddingText);
          return {
            id: randomUUID(),
            vector,
            payload: {
              document_id: documentId,
              source_url: url,
              document_type: docType,
              chunk_index: chunkIndex,
              total_chunks: chunks.length,
              title: docTitle,
              section_heading: chunk.heading,
              text: chunk.text,
              page_estimate: docResult.pages,
              vendor: docVendor,
              product_slug: docTitle.replace(/\s+/g, "-").toLowerCase(),
            },
          };
        }),
      );

      await upsertPoints(targetCollection, points);
    }

    // Record in documents table (existing schema). Best effort: the vectors
    // are already stored, so a failure here does not fail the request — but
    // it is logged instead of being dropped silently.
    try {
      await pool.query(
        `INSERT INTO documents (id, entity_type, doc_type, title, r2_key, source_url, page_count, chunks_count, ocr_status, ocr_text, processed_at)
         VALUES ($1, 'transceiver', $2, $3, $4, $5, $6, $7, 'completed', $8, NOW())
         ON CONFLICT ON CONSTRAINT documents_pkey DO UPDATE
         SET processed_at = NOW(), chunks_count = $7, ocr_status = 'completed'`,
        [documentId, docType, docTitle, `ocr/${documentId}`, url, docResult.pages, chunks.length, docResult.content.slice(0, 50000)],
      );
    } catch (dbErr) {
      console.error(`Failed to record document ${documentId} in documents table:`, dbErr);
    }

    res.json({
      success: true,
      document_id: documentId,
      title: docTitle,
      pages: docResult.pages,
      chunks: chunks.length,
      collection: targetCollection,
      markdown_length: docResult.content.length,
    });
  } catch (err) {
    res.status(503).json({
      success: false,
      error: "Document processing failed",
      detail: err instanceof Error ? err.message : String(err),
    });
  }
});
|
|
|
|
// GET /api/documents — List processed documents
//
// Returns up to 100 rows from `documents`, most recently processed (or, when
// never processed, most recently created) first.
//
// Responses:
//   200 — { success, documents, count }; also returned with an empty list and
//         a `note` when the `documents` table does not exist yet
//   500 — any other database failure
documentRouter.get("/", async (_req: Request, res: Response) => {
  try {
    const result = await pool.query(
      `SELECT id, title, source_url, doc_type, entity_type, page_count, chunks_count, ocr_status, processed_at, created_at
       FROM documents
       ORDER BY COALESCE(processed_at, created_at) DESC
       LIMIT 100`,
    );

    res.json({ success: true, documents: result.rows, count: result.rows.length });
  } catch (err) {
    // Assumes the pool reports PostgreSQL SQLSTATE codes on `err.code`
    // (node-postgres style) — TODO confirm against ../db/client.
    const code =
      typeof err === "object" && err !== null && "code" in err
        ? (err as { code?: unknown }).code
        : undefined;

    // 42P01 = undefined_table: the migration has not run yet, so there are
    // simply no documents to list.
    if (code === "42P01") {
      res.json({ success: true, documents: [], count: 0, note: "documents table not yet created" });
      return;
    }

    // Anything else (connection loss, permissions, bad SQL) is a real failure
    // and must not be reported as an empty, successful listing.
    console.error("Failed to list documents:", err);
    res.status(500).json({ success: false, error: "Failed to list documents" });
  }
});
|
|
|
|
// GET /api/documents/:id — Get document details
//
// Looks up one row of `documents` by its UUID.
//
// Responses:
//   200 — { success, document }
//   404 — no such row, `:id` is not a valid UUID, or the table does not exist
//   500 — any other database failure
documentRouter.get("/:id", async (req: Request, res: Response) => {
  try {
    const result = await pool.query(
      `SELECT id, title, source_url, doc_type, entity_type, page_count, chunks_count, ocr_status, processed_at, created_at
       FROM documents WHERE id = $1::uuid`,
      [req.params.id],
    );

    if (result.rows.length === 0) {
      res.status(404).json({ success: false, error: "Document not found" });
      return;
    }

    res.json({ success: true, document: result.rows[0] });
  } catch (err) {
    // Assumes the pool reports PostgreSQL SQLSTATE codes on `err.code`
    // (node-postgres style) — TODO confirm against ../db/client.
    const code =
      typeof err === "object" && err !== null && "code" in err
        ? (err as { code?: unknown }).code
        : undefined;

    // 22P02 = invalid_text_representation (the `$1::uuid` cast rejected :id),
    // 42P01 = undefined_table. In both cases the document cannot exist.
    if (code === "22P02" || code === "42P01") {
      res.status(404).json({ success: false, error: "Document not found or table not created" });
      return;
    }

    // A database outage is not a "not found"; report it as a server error.
    console.error(`Failed to load document ${req.params.id}:`, err);
    res.status(500).json({ success: false, error: "Failed to load document" });
  }
});
|