/**
 * Verification Robot Command Center
 *
 * Read-only status:
 *   tsx src/robots/verification-robots.ts --status
 *
 * Enqueue targeted crawler waves:
 *   tsx src/robots/verification-robots.ts --enqueue=details-fast-lane --profile=erik-safe
 *   tsx src/robots/verification-robots.ts --enqueue=priority-vendors --profile=pi-fetch
 *   tsx src/robots/verification-robots.ts --enqueue=all --profile=proxmox-heavy
 *
 * Dry run:
 *   tsx src/robots/verification-robots.ts --enqueue=all --dry-run
 *
 * TIPLLM planning, no crawl load:
 *   tsx src/robots/verification-robots.ts --tipllm-plan --limit=5
 *
 * Optional numeric flags (`--max-queues=N`, `--limit=N`) must be non-negative integers.
 */
import { config } from "dotenv";
import { join } from "path";
import PgBoss from "pg-boss";
import { pool } from "../utils/db";
import { writeRobotExperience } from "../crawler-llm/training-data-writer";

// NOTE(review): imports are evaluated before this call, so `../utils/db` cannot rely on
// values loaded here at its own import time — confirm that module loads its own env.
config({ path: join(__dirname, "..", "..", "..", "..", ".env") });

/** Accepted values of `--enqueue`; the `RobotWave` type is derived from this list. */
const ROBOT_WAVES = ["details-fast-lane", "priority-vendors", "all"] as const;
type RobotWave = (typeof ROBOT_WAVES)[number];

/** Accepted values of `--profile`; the `RobotProfile` type is derived from this list. */
const ROBOT_PROFILES = ["erik-safe", "pi-fetch", "proxmox-heavy"] as const;
type RobotProfile = (typeof ROBOT_PROFILES)[number];

/** One row of the per-vendor blocker query; every count is selected as text. */
interface VendorBlockerRow {
  vendor: string;
  total: string;
  missing_image: string;
  missing_details: string;
  missing_price: string;
  near_full_missing_details: string;
  near_full_missing_image: string;
  not_fully_verified: string;
}

/** Response body of the TIP API `research-plan` endpoint, as consumed by this module. */
interface TipLlmResearchPlanResponse {
  success?: boolean;
  data?: {
    search_patterns?: string[];
    extraction_fields?: Array<{ field: string; selector_or_pattern: string; example?: string }>;
    relevance_rules?: string[];
    confidence?: number;
    summary?: string;
  };
  error?: string;
}

/** Planning limit used when `--limit` is absent or an unusable number is passed in. */
const DEFAULT_TIPLLM_LIMIT = 5;

// Credentials and database name are percent-encoded so that reserved URI characters
// (`@`, `/`, `:`, `#`, ...) in e.g. the password cannot corrupt the connection string.
// `||` (not `??`) is deliberate: an empty environment variable falls back to the default.
const connectionString =
  `postgres://${encodeURIComponent(process.env.POSTGRES_USER || "tip")}` +
  `:${encodeURIComponent(process.env.POSTGRES_PASSWORD || "tip_dev_2026")}` +
  `@${process.env.POSTGRES_HOST || "localhost"}` +
  `:${process.env.POSTGRES_PORT || "5433"}` +
  `/${encodeURIComponent(process.env.POSTGRES_DB || "transceiver_db")}`;

const TIP_API_URL = process.env.TIP_API_URL || "http://127.0.0.1:3201";
const TIP_API_TOKEN = process.env.TIP_API_TOKEN || process.env.TIP_TOKEN || "";

/** Queues enqueued by the `details-fast-lane` wave (and included in `all`). */
const DETAILS_FAST_LANE_QUEUES = [
  "scrape:pricing:flexoptix",
  "scrape:pricing:fibermall",
  "scrape:pricing:atgbics",
  "scrape:pricing:fiber24",
  "scrape:pricing:qsfptek",
  "scrape:pricing:naddod",
  "scrape:pricing:gaotek",
  "maintenance:reconcile-verification",
];

/** Maps a vendor-name pattern to the queues that should run for that vendor. */
const VENDOR_QUEUE_MAP: Array<{ match: RegExp; queues: string[] }> = [
  { match: /juniper/i, queues: ["scrape:catalog:juniper-oem", "scrape:catalog:juniper-mx-oem", "scrape:catalog:juniper-qfx-oem", "scrape:compat:juniper", "discover:vendor:juniper"] },
  { match: /cisco/i, queues: ["scrape:catalog:cisco-nexus-oem", "scrape:catalog:cisco-catalyst-oem", "scrape:catalog:cisco-asr-oem", "scrape:compat:cisco", "discover:vendor:cisco-tmg"] },
  { match: /fs\.?com/i, queues: ["scrape:pricing:fs", "discover:vendor:fs-com"] },
  { match: /gao/i, queues: ["scrape:pricing:gaotek"] },
  { match: /ascent/i, queues: ["scrape:pricing:ascentoptics"] },
  { match: /eoptolink/i, queues: ["scrape:catalog:eoptolink"] },
  { match: /atgbics/i, queues: ["scrape:pricing:atgbics"] },
  { match: /naddod/i, queues: ["scrape:pricing:naddod"] },
  { match: /10gtek/i, queues: ["scrape:pricing:10gtek"] },
  { match: /qsfptek/i, queues: ["scrape:pricing:qsfptek"] },
  { match: /shopfiber24|fiber24/i, queues: ["scrape:pricing:fiber24"] },
  { match: /flexoptix/i, queues: ["scrape:pricing:flexoptix", "scrape:vendors:flexoptix-supported"] },
  { match: /fibermall/i, queues: ["scrape:pricing:fibermall"] },
  { match: /nokia/i, queues: ["scrape:catalog:nokia-oem", "scrape:catalog:nokia-access-oem", "discover:vendor:nokia"] },
  { match: /arista/i, queues: ["scrape:catalog:arista-oem", "scrape:catalog:arista-7000-oem", "discover:vendor:arista"] },
  { match: /ciena/i, queues: ["scrape:catalog:ciena-oem", "scrape:catalog:ciena-waveserver-oem"] },
];

/** Queues removed from every profile except `proxmox-heavy` (see `queuesForProfile`). */
const HEAVY_QUEUES = new Set([
  "scrape:pricing:fs",
  "scrape:pricing:10gtek",
  "scrape:pricing:prolabs",
  "scrape:compat:cisco",
  "scrape:compat:juniper",
  "scrape:compat:ufispace",
  "scrape:compat:edgecore",
]);

/** Allow-list: the only queues the `erik-safe` profile may enqueue. */
const ERIK_SAFE_QUEUES = new Set([
  "scrape:pricing:flexoptix",
  "scrape:pricing:fibermall",
  "scrape:pricing:atgbics",
  "scrape:pricing:fiber24",
  "scrape:pricing:qsfptek",
  "scrape:pricing:naddod",
  "scrape:pricing:gaotek",
  "maintenance:reconcile-verification",
]);

/** Type guard for `--enqueue` values. */
function isRobotWave(value: string): value is RobotWave {
  return (ROBOT_WAVES as readonly string[]).includes(value);
}

/** Type guard for `--profile` values. */
function isRobotProfile(value: string): value is RobotProfile {
  return (ROBOT_PROFILES as readonly string[]).includes(value);
}

/** True for queue names in the `discover:` namespace. */
function isDiscoveryQueue(queue: string): boolean {
  return queue.startsWith("discover:");
}

/**
 * Restricts candidate queues to what a profile is allowed to run.
 *
 * - `proxmox-heavy`: everything, unfiltered.
 * - `erik-safe`: only members of `ERIK_SAFE_QUEUES`.
 * - otherwise (`pi-fetch`): everything except `HEAVY_QUEUES` and discovery queues.
 */
function queuesForProfile(queues: string[], profile: RobotProfile): string[] {
  if (profile === "proxmox-heavy") return queues;
  if (profile === "erik-safe") {
    return queues.filter((queue) => ERIK_SAFE_QUEUES.has(queue));
  }
  return queues.filter((queue) => !HEAVY_QUEUES.has(queue) && !isDiscoveryQueue(queue));
}

/** Default cap on the number of queues per wave: 3 (erik-safe), 10 (pi-fetch), 30 (otherwise). */
function defaultMaxQueues(profile: RobotProfile): number {
  if (profile === "erik-safe") return 3;
  if (profile === "pi-fetch") return 10;
  return 30;
}

/**
 * Picks the effective queue cap. A missing, NaN, or negative request falls back to the
 * profile default: `slice(0, NaN)` would select nothing and a negative end index would
 * silently drop queues from the tail instead of capping.
 */
function resolveMaxQueues(requested: number | undefined, profile: RobotProfile): number {
  if (requested == null || Number.isNaN(requested) || requested < 0) {
    return defaultMaxQueues(profile);
  }
  return requested;
}

/** Parses a base-10 integer from a text/number cell; null, undefined, and unparsable input yield 0. */
function toInt(value: string | number | null | undefined): number {
  return Number.parseInt(String(value ?? "0"), 10) || 0;
}

/** Removes duplicates while keeping first-occurrence order. */
function unique(items: string[]): string[] {
  return [...new Set(items)];
}

/** Creates and starts a pg-boss instance; the caller is responsible for `stop()`. */
async function createBoss(): Promise<PgBoss> {
  const boss = new PgBoss({
    connectionString,
    retryLimit: 1,
    retryDelay: 30,
    expireInSeconds: 7200,
    monitorStateIntervalSeconds: 60,
  });
  boss.on("error", (err) => console.error("[verification-robots] pg-boss:", err.message));
  await boss.start();
  return boss;
}

/**
 * Reads verification counters from the database (two SELECTs, no writes).
 *
 * @returns `summary`: one row of catalogue-wide counts as text (empty object if the query
 *   returned no row); `vendors`: at most 30 vendors that still have a product that is not
 *   fully verified, ordered by near-full-missing-details count, then by not-fully-verified
 *   count, both descending.
 */
export async function getVerificationStatus(): Promise<{ summary: Record<string, string>; vendors: VendorBlockerRow[] }> {
  const summaryResult = await pool.query(`
    SELECT
      COUNT(*)::text AS total,
      COUNT(*) FILTER (WHERE NOT price_verified)::text AS missing_price,
      COUNT(*) FILTER (WHERE NOT image_verified)::text AS missing_image,
      COUNT(*) FILTER (WHERE NOT details_verified)::text AS missing_details,
      COUNT(*) FILTER (WHERE price_verified AND image_verified AND NOT details_verified)::text AS near_full_missing_details,
      COUNT(*) FILTER (WHERE price_verified AND details_verified AND NOT image_verified)::text AS near_full_missing_image,
      COUNT(*) FILTER (WHERE image_verified AND details_verified AND NOT price_verified)::text AS near_full_missing_price,
      COUNT(*) FILTER (WHERE NOT fully_verified)::text AS not_fully_verified
    FROM transceivers
  `);

  const vendorResult = await pool.query(`
    SELECT
      COALESCE(v.name, 'unknown') AS vendor,
      COUNT(*)::text AS total,
      COUNT(*) FILTER (WHERE NOT t.image_verified)::text AS missing_image,
      COUNT(*) FILTER (WHERE NOT t.details_verified)::text AS missing_details,
      COUNT(*) FILTER (WHERE NOT t.price_verified)::text AS missing_price,
      COUNT(*) FILTER (WHERE t.price_verified AND t.image_verified AND NOT t.details_verified)::text AS near_full_missing_details,
      COUNT(*) FILTER (WHERE t.price_verified AND t.details_verified AND NOT t.image_verified)::text AS near_full_missing_image,
      COUNT(*) FILTER (WHERE NOT t.fully_verified)::text AS not_fully_verified
    FROM transceivers t
    LEFT JOIN vendors v ON v.id = t.vendor_id
    GROUP BY v.name
    HAVING COUNT(*) FILTER (WHERE NOT t.fully_verified) > 0
    ORDER BY
      COUNT(*) FILTER (WHERE t.price_verified AND t.image_verified AND NOT t.details_verified) DESC,
      COUNT(*) FILTER (WHERE NOT t.fully_verified) DESC
    LIMIT 30
  `);

  return {
    summary: summaryResult.rows[0] ?? {},
    vendors: vendorResult.rows,
  };
}

/**
 * Collects the queues mapped to the first 20 vendor rows (every matching entry of
 * `VENDOR_QUEUE_MAP` contributes), always appends the reconcile queue, and de-duplicates.
 */
function queuesForPriorityVendors(vendors: VendorBlockerRow[]): string[] {
  const queues: string[] = [];
  for (const row of vendors.slice(0, 20)) {
    for (const entry of VENDOR_QUEUE_MAP) {
      if (entry.match.test(row.vendor)) {
        queues.push(...entry.queues);
      }
    }
  }
  queues.push("maintenance:reconcile-verification");
  return unique(queues);
}

/** Candidate queues for a wave, before profile filtering and capping. Always a fresh array. */
function queuesForWave(wave: RobotWave, vendors: VendorBlockerRow[]): string[] {
  // Copy so callers can never mutate the shared module-level constant.
  if (wave === "details-fast-lane") return [...DETAILS_FAST_LANE_QUEUES];
  if (wave === "priority-vendors") return queuesForPriorityVendors(vendors);
  return unique([...DETAILS_FAST_LANE_QUEUES, ...queuesForPriorityVendors(vendors)]);
}

/**
 * Selects the queues for a wave, filters them by profile, caps the count, and sends one
 * pg-boss job per queue. Every outcome (rejected / dry-run plan / enqueued) is recorded
 * through `writeRobotExperience`.
 *
 * @param wave - Which set of candidate queues to start from.
 * @param options.dryRun - When true, only the plan is logged and recorded; pg-boss is not started.
 * @param options.profile - Filtering profile; defaults to `erik-safe`.
 * @param options.maxQueues - Cap on selected queues; missing, NaN, or negative values use the
 *   profile default.
 * @returns The selected queue names (empty when the profile filtered everything out).
 */
export async function enqueueRobotWave(
  wave: RobotWave,
  options: { dryRun?: boolean; profile?: RobotProfile; maxQueues?: number } = {},
): Promise<string[]> {
  const profile = options.profile ?? "erik-safe";
  const dryRun = options.dryRun ?? false;
  const status = await getVerificationStatus();
  const rawQueues = queuesForWave(wave, status.vendors);
  const maxQueues = resolveMaxQueues(options.maxQueues, profile);
  const queues = queuesForProfile(rawQueues, profile).slice(0, maxQueues);
  const runId = `verification-${wave}-${new Date().toISOString()}`;

  console.log(`\nRobot wave: ${wave}`);
  console.log(`Profile: ${profile}`);
  console.log(`Run ID: ${runId}`);
  console.log(`Queues: ${queues.length} selected from ${rawQueues.length} candidates`);
  for (const q of queues) console.log(` - ${q}`);

  if (queues.length === 0) {
    writeRobotExperience({
      event: "queue_rejected",
      observed_at: new Date().toISOString(),
      actor: "verification-robots",
      profile,
      wave,
      summary: "Robot queue wave rejected because the selected profile had no safe eligible queues.",
      input: { wave, profile, raw_queues: rawQueues, max_queues: maxQueues },
      decision: { selected_queues: [], reason: "profile_filter_removed_all_candidates" },
      safety_notes: ["No jobs enqueued.", "This protects Erik from accidental heavy crawler work."],
    }, { flush: true });
    console.log("\nNo queues selected for this profile.");
    return queues;
  }

  if (dryRun) {
    writeRobotExperience({
      event: "queue_plan",
      observed_at: new Date().toISOString(),
      actor: "verification-robots",
      profile,
      wave,
      summary: "Dry-run queue plan generated for TIPLLM learning without starting crawler jobs.",
      input: { wave, profile, raw_queues: rawQueues, max_queues: maxQueues },
      decision: { selected_queues: queues, dry_run: true },
      safety_notes: ["Dry run only.", "No pg-boss jobs were enqueued."],
    }, { flush: true });
    console.log("\nDry run: no jobs enqueued.");
    return queues;
  }

  const boss = await createBoss();
  try {
    for (const queue of queues) {
      // Best-effort: a createQueue failure is deliberately ignored
      // (presumably because the queue may already exist — confirm against the pg-boss version in use).
      await boss.createQueue(queue).catch(() => {});
      // The cast pins the three-argument `send` signature used here independently of the
      // installed pg-boss typings; the result is not consumed.
      await (boss as unknown as { send: (name: string, data?: object, options?: object) => Promise<unknown> }).send(
        queue,
        { source: "verification-robots", wave, run_id: runId, enqueued_at: new Date().toISOString() },
        { retryLimit: 1, expireInSeconds: 7200 },
      );
    }
  } finally {
    // Always release the pg-boss connection, even when a send fails part-way through.
    await boss.stop();
  }

  console.log("\nJobs enqueued.");
  writeRobotExperience({
    event: "queue_enqueued",
    observed_at: new Date().toISOString(),
    actor: "verification-robots",
    profile,
    wave,
    summary: "Limited verification robot queue wave enqueued.",
    input: { wave, profile, raw_queues: rawQueues, max_queues: maxQueues },
    decision: { selected_queues: queues, run_id: runId },
    outcome: { enqueued_count: queues.length },
    safety_notes: [
      profile === "erik-safe" ? "Erik-safe profile excludes Playwright and discovery queues." : "Profile selected explicitly.",
      "Queue count is capped.",
    ],
  }, { flush: true });
  return queues;
}

/**
 * Requests a research plan from the TIP API (`POST /api/tip-llm/research-plan`, 45 s timeout).
 *
 * Never throws: HTTP errors, network failures, timeouts, and non-object bodies are all
 * reported as `{ success: false, error }` (or an empty object for a non-object 2xx body)
 * so one bad vendor cannot abort a planning batch.
 */
async function fetchTipLlmPlan(vendor: string, product: string, context: string): Promise<TipLlmResearchPlanResponse> {
  try {
    const resp = await fetch(`${TIP_API_URL}/api/tip-llm/research-plan`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        ...(TIP_API_TOKEN ? { Authorization: `Bearer ${TIP_API_TOKEN}` } : {}),
      },
      body: JSON.stringify({ vendor, product, context }),
      signal: AbortSignal.timeout(45000),
    });
    // A body that is not valid JSON is treated as an empty response.
    const parsed: unknown = await resp.json().catch(() => ({}));
    // Valid JSON can still be `null` or a primitive; only objects are usable here.
    const data: TipLlmResearchPlanResponse =
      typeof parsed === "object" && parsed !== null ? (parsed as TipLlmResearchPlanResponse) : {};
    if (!resp.ok) {
      return { success: false, error: data.error || `HTTP ${resp.status}` };
    }
    return data;
  } catch (err: unknown) {
    // fetch rejects on network failure and when the timeout signal aborts the request.
    const message = err instanceof Error ? err.message : String(err);
    return { success: false, error: message || "request failed" };
  }
}

/**
 * Asks TIPLLM for a research plan for each of the top vendor blockers, prints the plans,
 * and records each result plus a batch summary via `writeRobotExperience`. This function
 * itself enqueues no jobs.
 *
 * @param limit - Requested number of vendors; clamped to 1..10. NaN falls back to
 *   `DEFAULT_TIPLLM_LIMIT` (otherwise `slice(0, NaN)` would plan nothing).
 */
export async function planWithTipLlm(limit: number): Promise<void> {
  const status = await getVerificationStatus();
  const effectiveLimit = Number.isNaN(limit) ? DEFAULT_TIPLLM_LIMIT : limit;
  const targets = status.vendors
    .filter((row) => toInt(row.not_fully_verified) > 0)
    .slice(0, Math.max(1, Math.min(10, effectiveLimit)));

  console.log(`\n=== TIPLLM Verification Planning (${targets.length} vendor targets) ===\n`);
  console.log("No crawler jobs are started in this mode.\n");

  for (const row of targets) {
    const product = "missing transceiver image/details verification";
    const context =
      `TIP verification blocker: vendor=${row.vendor}, ` +
      `missing_image=${row.missing_image}, missing_details=${row.missing_details}, ` +
      `missing_price=${row.missing_price}, near_full_missing_details=${row.near_full_missing_details}, ` +
      `near_full_missing_image=${row.near_full_missing_image}. ` +
      "Return safe search patterns, extraction fields, and relevance rules for targeted crawler work.";

    console.log(`Vendor: ${row.vendor}`);
    const plan = await fetchTipLlmPlan(row.vendor, product, context);
    if (!plan.success || !plan.data) {
      writeRobotExperience({
        event: "tipllm_plan",
        observed_at: new Date().toISOString(),
        actor: "verification-robots",
        vendor: row.vendor,
        summary: "TIPLLM planning failed or returned no usable data.",
        input: { vendor: row.vendor, product, context },
        decision: { usable_plan: false, error: plan.error || "unknown error" },
        safety_notes: ["No crawler jobs started from failed TIPLLM plan."],
      });
      console.log(` TIPLLM plan failed: ${plan.error || "unknown error"}\n`);
      continue;
    }

    const patterns = plan.data.search_patterns ?? [];
    const fields = plan.data.extraction_fields ?? [];
    const rules = plan.data.relevance_rules ?? [];
    console.log(` Summary: ${plan.data.summary || "(none)"}`);
    console.log(` Search patterns: ${patterns.slice(0, 4).join(" | ") || "(none)"}`);
    console.log(` Extraction fields: ${fields.map((f) => f.field).slice(0, 8).join(", ") || "(none)"}`);
    console.log(` Rules: ${rules.slice(0, 3).join(" | ") || "(none)"}`);
    console.log("");

    writeRobotExperience({
      event: "tipllm_plan",
      observed_at: new Date().toISOString(),
      actor: "verification-robots",
      vendor: row.vendor,
      summary: plan.data.summary || "TIPLLM produced a crawler research plan.",
      input: { vendor: row.vendor, product, context },
      decision: {
        usable_plan: true,
        search_patterns: patterns,
        extraction_fields: fields,
        relevance_rules: rules,
        confidence: plan.data.confidence ?? null,
      },
      safety_notes: [
        "TIPLLM planning mode starts no crawler jobs.",
        "Plans must be executed through capped profiles.",
      ],
    });
  }

  writeRobotExperience({
    event: "tipllm_plan",
    observed_at: new Date().toISOString(),
    actor: "verification-robots",
    summary: "TIPLLM planning batch completed.",
    input: { requested_limit: limit, planned_vendors: targets.map((row) => row.vendor) },
    decision: { next_safe_step: "Review plans, then enqueue a small erik-safe dry run or dispatch heavy work to Proxmox/Pi profiles." },
    safety_notes: ["No crawler jobs were started by the planning batch."],
  }, { flush: true });
}

/** Prints the catalogue-wide counters and the first 15 vendor blocker rows to stdout. */
function printStatus(summary: Record<string, string>, vendors: VendorBlockerRow[]): void {
  console.log("\n=== TIP Verification Robot Status ===\n");
  console.log(`Total products: ${summary.total ?? "0"}`);
  console.log(`Not fully verified: ${summary.not_fully_verified ?? "0"}`);
  console.log(`Missing price: ${summary.missing_price ?? "0"}`);
  console.log(`Missing image: ${summary.missing_image ?? "0"}`);
  console.log(`Missing details: ${summary.missing_details ?? "0"}`);
  console.log(`Near-full, missing details: ${summary.near_full_missing_details ?? "0"}`);
  console.log(`Near-full, missing image: ${summary.near_full_missing_image ?? "0"}`);
  console.log(`Near-full, missing price: ${summary.near_full_missing_price ?? "0"}`);

  console.log("\nTop vendor blockers:");
  for (const row of vendors.slice(0, 15)) {
    const nearDetails = toInt(row.near_full_missing_details);
    const nearImage = toInt(row.near_full_missing_image);
    console.log(
      ` ${row.vendor.padEnd(22)} not_full=${row.not_fully_verified.padStart(4)} ` +
        `missing_img=${row.missing_image.padStart(4)} missing_details=${row.missing_details.padStart(4)} ` +
        `near_details=${String(nearDetails).padStart(4)} near_image=${String(nearImage).padStart(4)}`,
    );
  }
}

/** Records the status snapshot (summary counters plus the first 10 vendor rows) via `writeRobotExperience`. */
function recordStatusExperience(summary: Record<string, string>, vendors: VendorBlockerRow[]): void {
  writeRobotExperience({
    event: "status",
    observed_at: new Date().toISOString(),
    actor: "verification-robots",
    summary: "Verification robot status snapshot captured for TIPLLM learning.",
    input: {
      total: summary.total,
      not_fully_verified: summary.not_fully_verified,
      missing_price: summary.missing_price,
      missing_image: summary.missing_image,
      missing_details: summary.missing_details,
      near_full_missing_details: summary.near_full_missing_details,
      near_full_missing_image: summary.near_full_missing_image,
      top_vendors: vendors.slice(0, 10),
    },
    decision: {
      recommended_sequence: [
        "Use TIPLLM planning for top vendor blockers before new crawler work.",
        "Run details-fast-lane first because near-full missing-details products convert fastest.",
        "Keep Erik on erik-safe profile; send Playwright/discovery work to Proxmox or Pi workers.",
      ],
    },
    safety_notes: [
      "Status collection is read-only.",
      "No crawler jobs are started by status collection.",
    ],
  });
}

/**
 * Returns the text after `flag=` for the first matching CLI argument, or `undefined` when
 * the flag is absent. Everything after the first `=` is kept, so values may contain `=`.
 */
function argValue(args: readonly string[], flag: string): string | undefined {
  const prefix = `${flag}=`;
  const hit = args.find((a) => a.startsWith(prefix));
  return hit === undefined ? undefined : hit.slice(prefix.length);
}

/**
 * Reads a `flag=N` CLI argument as a non-negative integer.
 *
 * @returns `undefined` when the flag is absent.
 * @throws Error when the value is not made of decimal digits only (e.g. empty, `abc`, `-1`, `12abc`).
 */
function nonNegativeIntArg(args: readonly string[], flag: string): number | undefined {
  const raw = argValue(args, flag);
  if (raw === undefined) return undefined;
  if (!/^\d+$/.test(raw)) {
    throw new Error(`Invalid ${flag} value. Use a non-negative integer.`);
  }
  return Number.parseInt(raw, 10);
}

/**
 * CLI entry point. All arguments are validated before any database, API, or queue work
 * starts. Then, in order: status (when `--status` is given or no other mode is selected),
 * TIPLLM planning (`--tipllm-plan`), and wave enqueueing (`--enqueue=...`).
 */
async function main(): Promise<void> {
  const args = process.argv.slice(2);
  const waveArg = argValue(args, "--enqueue");
  // An empty `--profile=` falls back to the default, hence `||` rather than `??`.
  const profileArg = argValue(args, "--profile") || "erik-safe";
  const maxQueues = nonNegativeIntArg(args, "--max-queues");
  const limit = nonNegativeIntArg(args, "--limit") ?? DEFAULT_TIPLLM_LIMIT;
  const dryRun = args.includes("--dry-run");
  const tipLlmPlan = args.includes("--tipllm-plan");

  let enqueueRequest: { wave: RobotWave; profile: RobotProfile } | undefined;
  if (waveArg !== undefined) {
    if (!isRobotWave(waveArg)) {
      throw new Error("Invalid --enqueue value. Use details-fast-lane, priority-vendors, or all.");
    }
    if (!isRobotProfile(profileArg)) {
      throw new Error("Invalid --profile value. Use erik-safe, pi-fetch, or proxmox-heavy.");
    }
    enqueueRequest = { wave: waveArg, profile: profileArg };
  }

  if (args.includes("--status") || (!enqueueRequest && !tipLlmPlan)) {
    const status = await getVerificationStatus();
    printStatus(status.summary, status.vendors);
    recordStatusExperience(status.summary, status.vendors);
  }

  if (tipLlmPlan) {
    await planWithTipLlm(limit);
  }

  if (enqueueRequest) {
    await enqueueRobotWave(enqueueRequest.wave, { dryRun, profile: enqueueRequest.profile, maxQueues });
  }

  await pool.end();
}

// Run only when executed directly, not when imported for its exports.
if (require.main === module) {
  main().catch(async (err) => {
    console.error("Fatal:", err);
    // Best-effort cleanup; the pool may already be closed or never opened.
    await pool.end().catch(() => {});
    process.exit(1);
  });
}