feat: Abverkauf velocity engine — sql/118 + analyzer + API endpoints

- sql/118-stock-velocity.sql: new stock_velocity (UPSERT per tx×vendor)
  and stock_velocity_events tables with TimescaleDB-compatible indexes
- stock-velocity-analyzer.ts: computes sell-through from stock_observations
  time-series; detects sold/zulauf/data_gap events, trims top-10% outliers,
  predicts stockout date, assigns high/medium/low/insufficient confidence
- scheduler.ts: analyze:stock:velocity job at 04:30/12:30/20:30 UTC
- stock.ts: GET /api/stock/velocity (paginated, filterable by vendor/confidence/
  stockout_days) + GET /api/stock/velocity/:id (per-product with event history)
- First run: 208 products, 979 sell events, 2811 Zulauf events written
This commit is contained in:
Rene Fichtmueller 2026-05-14 00:24:58 +02:00
parent 637839e965
commit 0d7a92e749
4 changed files with 825 additions and 1 deletions

View File

@ -7,6 +7,8 @@
* Routes:
* GET /api/stock Latest obs per transceiver × vendor (paginated)
* GET /api/stock/summary Aggregate warehouse stats (totals, top movers)
* GET /api/stock/velocity Abverkauf velocity results (paginated, filterable)
* GET /api/stock/velocity/:id Velocity + event history for one transceiver
* GET /api/stock/:transceiverIdOrSku Full obs history for one transceiver
*/
import { Router, Request, Response } from "express";
@ -283,6 +285,239 @@ stockRouter.get("/summary", async (req: Request, res: Response) => {
}
});
// ─── GET /api/stock/velocity ─────────────────────────────────────────────────
/**
* Paginated Abverkauf velocity results from the stock_velocity table.
* Query params:
* vendor_id filter by vendor UUID
* confidence "high" | "medium" | "low" | "insufficient"
* stockout_days only products with estimated_stockout_days <= N (0 = already out)
* min_sell_rate minimum avg_daily_sell_rate
* part_number partial match
* limit default 50, max 200
* offset default 0
*/
stockRouter.get("/velocity", async (req: Request, res: Response) => {
try {
const limit = Math.min(intParam(req, "limit", 50), 200);
const offset = intParam(req, "offset", 0);
const vendorId = req.query.vendor_id ? String(req.query.vendor_id) : null;
const confidence = req.query.confidence ? String(req.query.confidence) : null;
const stockoutDays = req.query.stockout_days !== undefined
? parseInt(String(req.query.stockout_days), 10)
: null;
const minSellRate = req.query.min_sell_rate
? parseFloat(String(req.query.min_sell_rate))
: null;
const partNumber = req.query.part_number ? String(req.query.part_number) : null;
const conditions: string[] = [];
const params: unknown[] = [];
let p = 1;
if (vendorId) {
conditions.push(`sv.vendor_id = $${p++}`);
params.push(vendorId);
}
if (confidence) {
conditions.push(`sv.velocity_confidence = $${p++}`);
params.push(confidence);
}
if (stockoutDays !== null && Number.isFinite(stockoutDays)) {
conditions.push(`sv.estimated_stockout_days <= $${p++}`);
params.push(stockoutDays);
}
if (minSellRate !== null && Number.isFinite(minSellRate)) {
conditions.push(`sv.avg_daily_sell_rate >= $${p++}`);
params.push(minSellRate);
}
if (partNumber) {
conditions.push(`t.part_number ILIKE $${p++}`);
params.push(`%${partNumber}%`);
}
const whereClause = conditions.length ? `WHERE ${conditions.join(" AND ")}` : "";
const sql = `
SELECT
sv.transceiver_id,
sv.vendor_id,
sv.computed_at,
sv.window_start,
sv.window_end,
sv.obs_count,
sv.avg_daily_sell_rate,
sv.peak_daily_sell_rate,
sv.total_sell_events,
sv.total_units_sold_implied,
sv.units_sold_counter_delta,
sv.units_sold_daily_rate,
sv.total_zulauf_events,
sv.total_units_zulauf,
sv.last_zulauf_at,
sv.next_expected_delivery,
sv.current_qty,
sv.current_backorder_qty,
sv.current_price_net,
sv.estimated_stockout_days,
sv.estimated_stockout_date,
sv.velocity_confidence,
t.part_number,
t.form_factor,
t.speed,
v.name AS vendor_name,
v.website AS vendor_website
FROM stock_velocity sv
JOIN transceivers t ON t.id = sv.transceiver_id
JOIN vendors v ON v.id = sv.vendor_id
${whereClause}
ORDER BY
CASE sv.velocity_confidence
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
WHEN 'insufficient' THEN 4
ELSE 5
END,
sv.avg_daily_sell_rate DESC NULLS LAST
LIMIT $${p++} OFFSET $${p++}
`;
params.push(limit, offset);
const countSql = `
SELECT COUNT(*)
FROM stock_velocity sv
JOIN transceivers t ON t.id = sv.transceiver_id
JOIN vendors v ON v.id = sv.vendor_id
${whereClause}
`;
const [rows, countRow] = await Promise.all([
pool.query(sql, params),
pool.query(countSql, params.slice(0, params.length - 2)),
]);
res.json({
success: true,
data: rows.rows,
meta: {
total: parseInt(countRow.rows[0].count, 10),
limit,
offset,
},
});
} catch (err) {
console.error("GET /api/stock/velocity error:", err);
res.status(500).json({ success: false, error: "Internal server error" });
}
});
// ─── GET /api/stock/velocity/:id ─────────────────────────────────────────────
/**
* Velocity summary + raw event history for one transceiver.
* :id can be a UUID or part_number (case-insensitive).
* Query params:
* vendor_id filter to a specific vendor (optional; returns all vendors if omitted)
* event_limit max events returned per vendor (default 200)
*/
stockRouter.get("/velocity/:id", async (req: Request, res: Response) => {
try {
const id = String(req.params.id);
const vendorId = req.query.vendor_id ? String(req.query.vendor_id) : null;
const eventLimit = Math.min(intParam(req, "event_limit", 200), 1000);
// Resolve UUID vs part_number
let transceiverUuid: string | null = null;
const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
if (uuidRegex.test(id)) {
transceiverUuid = id;
} else {
const r = await pool.query(
`SELECT id FROM transceivers WHERE part_number ILIKE $1 LIMIT 1`,
[id]
);
if (r.rows.length > 0) transceiverUuid = r.rows[0].id;
}
if (!transceiverUuid) {
res.status(404).json({ success: false, error: "Transceiver not found" });
return;
}
const velocityParams: unknown[] = [transceiverUuid];
let vendorFilter = "";
if (vendorId) {
velocityParams.push(vendorId);
vendorFilter = `AND sv.vendor_id = $${velocityParams.length}`;
}
const eventParams: unknown[] = [transceiverUuid, eventLimit];
let eventVendorFilter = "";
if (vendorId) {
eventParams.push(vendorId);
eventVendorFilter = `AND sve.vendor_id = $${eventParams.length}`;
}
const [transceiver, velocity, events] = await Promise.all([
pool.query(
`SELECT t.*, v.name AS brand_name
FROM transceivers t LEFT JOIN vendors v ON v.id = t.vendor_id
WHERE t.id = $1`,
[transceiverUuid]
),
pool.query(
`SELECT
sv.*,
v.name AS vendor_name,
v.website AS vendor_website
FROM stock_velocity sv
JOIN vendors v ON v.id = sv.vendor_id
WHERE sv.transceiver_id = $1 ${vendorFilter}
ORDER BY sv.velocity_confidence, sv.avg_daily_sell_rate DESC NULLS LAST`,
velocityParams
),
pool.query(
`SELECT
sve.event_at,
sve.event_type,
sve.units_delta,
sve.daily_rate,
sve.qty_before,
sve.qty_after,
sve.hours_elapsed,
v.name AS vendor_name
FROM stock_velocity_events sve
JOIN vendors v ON v.id = sve.vendor_id
WHERE sve.transceiver_id = $1 ${eventVendorFilter}
ORDER BY sve.event_at DESC
LIMIT $2`,
eventParams
),
]);
if (!transceiver.rows[0]) {
res.status(404).json({ success: false, error: "Transceiver not found" });
return;
}
res.json({
success: true,
data: {
transceiver: transceiver.rows[0],
velocity: velocity.rows,
events: events.rows,
meta: {
velocity_count: velocity.rows.length,
event_count: events.rows.length,
},
},
});
} catch (err) {
console.error("GET /api/stock/velocity/:id error:", err);
res.status(500).json({ success: false, error: "Internal server error" });
}
});
// ─── GET /api/stock/:id ──────────────────────────────────────────────────────
/**
* Full observation history for one transceiver.
@ -327,7 +562,7 @@ stockRouter.get("/:id", async (req: Request, res: Response) => {
const [transceiver, observations] = await Promise.all([
pool.query(
`SELECT t.*, v.name AS brand_name
FROM transceivers t LEFT JOIN vendors v ON v.id = t.brand_vendor_id
FROM transceivers t LEFT JOIN vendors v ON v.id = t.vendor_id
WHERE t.id = $1`,
[transceiverUuid]
),

View File

@ -0,0 +1,487 @@
/**
* Stock Velocity Analyzer Abverkauf & Zulauf Evaluation
*
* Processes time-series stock_observations to compute:
* avg_daily_sell_rate implied units sold per day
* total_units_sold implied cumulative sold in window
* Zulauf events when and how much stock was replenished
* estimated_stockout_date when current stock is expected to run out
*
* Data sources ranked by confidence:
* 3 = FS.com per-warehouse breakdown + units_sold counter
* 2 = QSFPTEK global real-time quantity
* 1 = ATGBICS/Optcore binary in/out stock only (skipped for velocity)
*
* Velocity is only computed for confidence 2 sources.
*
* Called by pg-boss job "analyze:stock:velocity".
*/
import { pool } from "../utils/db";
// ── Config ────────────────────────────────────────────────────────────────────
const MIN_OBS_FOR_VELOCITY = 2; // absolute minimum observations
const MAX_INTERVAL_HOURS = 96; // ignore gaps > 4 days (data outage, not a sale)
const MIN_INTERVAL_HOURS = 0.4; // ignore observations < 24min apart (duplicates)
const WINDOW_DAYS = 30; // look back this many days for velocity calc
// ── Types ─────────────────────────────────────────────────────────────────────
interface StockObs {
time: Date;
physicalQty: number; // warehouse_de_qty + warehouse_global_qty
quantityAvailable: number | null;
backorderQty: number | null;
backorderEstimatedDate: string | null;
unitsSold: number | null;
priceNet: number | null;
}
interface VelocityEvent {
transceiverId: string;
vendorId: string;
eventAt: Date;
eventType: "sold" | "zulauf" | "unchanged" | "data_gap";
unitsDelta: number;
dailyRate: number | null;
qtyBefore: number;
qtyAfter: number;
hoursElapsed: number;
}
interface VelocityResult {
transceiverId: string;
vendorId: string;
windowStart: Date;
windowEnd: Date;
obsCount: number;
avgDailySellRate: number | null;
peakDailySellRate: number | null;
totalSellEvents: number;
totalUnitsSoldImplied: number;
unitsSoldCounterDelta: number | null;
unitsSoldDailyRate: number | null;
totalZulaufEvents: number;
totalUnitsZulauf: number;
lastZulaufAt: Date | null;
nextExpectedDelivery: string | null;
currentQty: number | null;
currentBackorderQty: number | null;
currentPriceNet: number | null;
estimatedStockoutDays: number | null;
estimatedStockoutDate: Date | null;
velocityConfidence: "high" | "medium" | "low" | "insufficient";
events: VelocityEvent[];
}
// ── Core velocity computation ──────────────────────────────────────────────────
function computeVelocity(
transceiverId: string,
vendorId: string,
observations: StockObs[]
): VelocityResult {
const sorted = [...observations].sort((a, b) => a.time.getTime() - b.time.getTime());
const windowStart = sorted[0].time;
const windowEnd = sorted[sorted.length - 1].time;
const windowDays = Math.max(1, (windowEnd.getTime() - windowStart.getTime()) / 86400000);
const latest = sorted[sorted.length - 1];
const earliest = sorted[0];
// ── FS.com units_sold counter delta (most reliable) ──────────────────────
let unitsSoldCounterDelta: number | null = null;
let unitsSoldDailyRate: number | null = null;
const firstWithSold = sorted.find((o) => o.unitsSold !== null && o.unitsSold > 0);
const lastWithSold = [...sorted].reverse().find((o) => o.unitsSold !== null && o.unitsSold > 0);
if (firstWithSold && lastWithSold && firstWithSold !== lastWithSold) {
const delta = (lastWithSold.unitsSold ?? 0) - (firstWithSold.unitsSold ?? 0);
// Sanity: delta should be positive and not unrealistically large (>10x multiplier = format glitch)
const spanDays = Math.max(1, (lastWithSold.time.getTime() - firstWithSold.time.getTime()) / 86400000);
if (delta > 0 && delta < (firstWithSold.unitsSold ?? 1) * 5) {
unitsSoldCounterDelta = delta;
unitsSoldDailyRate = delta / spanDays;
}
}
// ── Interval-by-interval delta analysis ──────────────────────────────────
const events: VelocityEvent[] = [];
const sellRates: number[] = [];
let totalSellEvents = 0;
let totalUnitsSoldImplied = 0;
let totalZulaufEvents = 0;
let totalUnitsZulauf = 0;
let lastZulaufAt: Date | null = null;
for (let i = 1; i < sorted.length; i++) {
const prev = sorted[i - 1];
const curr = sorted[i];
const hoursElapsed = (curr.time.getTime() - prev.time.getTime()) / 3600000;
// Skip too-close (duplicates) or too-far (outages)
if (hoursElapsed < MIN_INTERVAL_HOURS) continue;
if (hoursElapsed > MAX_INTERVAL_HOURS) {
events.push({
transceiverId, vendorId,
eventAt: curr.time,
eventType: "data_gap",
unitsDelta: 0,
dailyRate: null,
qtyBefore: prev.physicalQty,
qtyAfter: curr.physicalQty,
hoursElapsed,
});
continue;
}
const delta = curr.physicalQty - prev.physicalQty;
const daysFraction = hoursElapsed / 24;
if (delta < 0) {
// Stock decreased → implied sales
const unitsSold = -delta;
const dailyRate = unitsSold / daysFraction;
events.push({
transceiverId, vendorId,
eventAt: curr.time,
eventType: "sold",
unitsDelta: delta, // negative
dailyRate,
qtyBefore: prev.physicalQty,
qtyAfter: curr.physicalQty,
hoursElapsed,
});
sellRates.push(dailyRate);
totalSellEvents++;
totalUnitsSoldImplied += unitsSold;
} else if (delta > 0) {
// Stock increased → Zulauf (replenishment or restock)
events.push({
transceiverId, vendorId,
eventAt: curr.time,
eventType: "zulauf",
unitsDelta: delta, // positive
dailyRate: null,
qtyBefore: prev.physicalQty,
qtyAfter: curr.physicalQty,
hoursElapsed,
});
totalZulaufEvents++;
totalUnitsZulauf += delta;
lastZulaufAt = curr.time;
} else {
events.push({
transceiverId, vendorId,
eventAt: curr.time,
eventType: "unchanged",
unitsDelta: 0,
dailyRate: null,
qtyBefore: prev.physicalQty,
qtyAfter: curr.physicalQty,
hoursElapsed,
});
}
}
// ── Compute average sell rate ─────────────────────────────────────────────
let avgDailySellRate: number | null = null;
let peakDailySellRate: number | null = null;
if (sellRates.length > 0) {
// Use trimmed mean (remove top 10% outliers to avoid one-off bulk events)
const sorted_rates = [...sellRates].sort((a, b) => a - b);
const trimCount = Math.max(0, Math.floor(sorted_rates.length * 0.1));
const trimmed = sorted_rates.slice(0, sorted_rates.length - trimCount);
avgDailySellRate = trimmed.reduce((s, r) => s + r, 0) / trimmed.length;
peakDailySellRate = sorted_rates[sorted_rates.length - 1];
}
// Prefer units_sold daily rate from FS.com counter (more reliable)
const effectiveRate = unitsSoldDailyRate ?? avgDailySellRate;
// ── Current state from latest observation ────────────────────────────────
const currentQty = latest.physicalQty;
const currentBackorderQty = latest.backorderQty;
const currentPriceNet = latest.priceNet;
const nextExpectedDelivery = latest.backorderEstimatedDate;
// ── Stockout prediction ───────────────────────────────────────────────────
let estimatedStockoutDays: number | null = null;
let estimatedStockoutDate: Date | null = null;
if (effectiveRate !== null && effectiveRate > 0 && currentQty !== null && currentQty > 0) {
estimatedStockoutDays = currentQty / effectiveRate;
estimatedStockoutDate = new Date(
latest.time.getTime() + estimatedStockoutDays * 86400000
);
} else if (currentQty === 0) {
estimatedStockoutDays = 0;
estimatedStockoutDate = latest.time;
}
// ── Confidence assessment ─────────────────────────────────────────────────
const meaningfulObs = sorted.length;
let velocityConfidence: "high" | "medium" | "low" | "insufficient";
if (meaningfulObs >= 14 && (totalSellEvents >= 3 || unitsSoldCounterDelta !== null)) {
velocityConfidence = "high";
} else if (meaningfulObs >= 5 && totalSellEvents >= 1) {
velocityConfidence = "medium";
} else if (meaningfulObs >= 2) {
velocityConfidence = "low";
} else {
velocityConfidence = "insufficient";
}
return {
transceiverId,
vendorId,
windowStart,
windowEnd,
obsCount: meaningfulObs,
avgDailySellRate,
peakDailySellRate,
totalSellEvents,
totalUnitsSoldImplied,
unitsSoldCounterDelta,
unitsSoldDailyRate,
totalZulaufEvents,
totalUnitsZulauf,
lastZulaufAt,
nextExpectedDelivery: nextExpectedDelivery ?? null,
currentQty,
currentBackorderQty: currentBackorderQty ?? null,
currentPriceNet,
estimatedStockoutDays,
estimatedStockoutDate,
velocityConfidence,
events,
};
}
// ── Database I/O ──────────────────────────────────────────────────────────────
async function fetchObservations(
vendorId: string
): Promise<Map<string, StockObs[]>> {
const result = await pool.query<{
transceiver_id: string;
time: Date;
warehouse_de_qty: number | null;
warehouse_global_qty: number | null;
quantity_available: number | null;
backorder_qty: number | null;
backorder_estimated_date: string | null;
units_sold: number | null;
price_net: number | null;
}>(
`SELECT
transceiver_id,
time,
warehouse_de_qty,
warehouse_global_qty,
quantity_available,
backorder_qty,
backorder_estimated_date::text,
units_sold,
price_net
FROM stock_observations
WHERE source_vendor_id = $1
AND stock_confidence >= 2
AND time >= NOW() - INTERVAL '${WINDOW_DAYS} days'
ORDER BY transceiver_id, time`,
[vendorId]
);
const byProduct = new Map<string, StockObs[]>();
for (const row of result.rows) {
const obs: StockObs = {
time: row.time,
physicalQty:
(row.warehouse_de_qty ?? 0) + (row.warehouse_global_qty ?? 0) ||
(row.quantity_available ?? 0),
quantityAvailable: row.quantity_available,
backorderQty: row.backorder_qty,
backorderEstimatedDate: row.backorder_estimated_date,
unitsSold: row.units_sold,
priceNet: row.price_net,
};
const list = byProduct.get(row.transceiver_id) ?? [];
list.push(obs);
byProduct.set(row.transceiver_id, list);
}
return byProduct;
}
async function upsertVelocityResult(r: VelocityResult): Promise<void> {
await pool.query(
`INSERT INTO stock_velocity (
transceiver_id, vendor_id, computed_at,
window_start, window_end, obs_count,
avg_daily_sell_rate, peak_daily_sell_rate,
total_sell_events, total_units_sold_implied,
units_sold_counter_delta, units_sold_daily_rate,
total_zulauf_events, total_units_zulauf,
last_zulauf_at, next_expected_delivery,
current_qty, current_backorder_qty, current_price_net,
estimated_stockout_days, estimated_stockout_date,
velocity_confidence
) VALUES (
$1, $2, NOW(),
$3, $4, $5,
$6, $7,
$8, $9,
$10, $11,
$12, $13,
$14, $15,
$16, $17, $18,
$19, $20,
$21
)
ON CONFLICT (transceiver_id, vendor_id) DO UPDATE SET
computed_at = EXCLUDED.computed_at,
window_start = EXCLUDED.window_start,
window_end = EXCLUDED.window_end,
obs_count = EXCLUDED.obs_count,
avg_daily_sell_rate = EXCLUDED.avg_daily_sell_rate,
peak_daily_sell_rate = EXCLUDED.peak_daily_sell_rate,
total_sell_events = EXCLUDED.total_sell_events,
total_units_sold_implied = EXCLUDED.total_units_sold_implied,
units_sold_counter_delta = EXCLUDED.units_sold_counter_delta,
units_sold_daily_rate = EXCLUDED.units_sold_daily_rate,
total_zulauf_events = EXCLUDED.total_zulauf_events,
total_units_zulauf = EXCLUDED.total_units_zulauf,
last_zulauf_at = EXCLUDED.last_zulauf_at,
next_expected_delivery = EXCLUDED.next_expected_delivery,
current_qty = EXCLUDED.current_qty,
current_backorder_qty = EXCLUDED.current_backorder_qty,
current_price_net = EXCLUDED.current_price_net,
estimated_stockout_days = EXCLUDED.estimated_stockout_days,
estimated_stockout_date = EXCLUDED.estimated_stockout_date,
velocity_confidence = EXCLUDED.velocity_confidence`,
[
r.transceiverId, r.vendorId,
r.windowStart, r.windowEnd, r.obsCount,
r.avgDailySellRate, r.peakDailySellRate,
r.totalSellEvents, r.totalUnitsSoldImplied,
r.unitsSoldCounterDelta, r.unitsSoldDailyRate,
r.totalZulaufEvents, r.totalUnitsZulauf,
r.lastZulaufAt, r.nextExpectedDelivery,
r.currentQty, r.currentBackorderQty, r.currentPriceNet,
r.estimatedStockoutDays, r.estimatedStockoutDate,
r.velocityConfidence,
]
);
}
async function insertVelocityEvents(events: VelocityEvent[]): Promise<void> {
if (events.length === 0) return;
// Deduplicate against existing events (don't re-insert known events)
const minTime = events.reduce((min, e) => e.eventAt < min ? e.eventAt : min, events[0].eventAt);
const txId = events[0].transceiverId;
const vendorId = events[0].vendorId;
await pool.query(
`DELETE FROM stock_velocity_events
WHERE transceiver_id = $1 AND vendor_id = $2 AND event_at >= $3`,
[txId, vendorId, minTime]
);
for (const e of events) {
await pool.query(
`INSERT INTO stock_velocity_events
(transceiver_id, vendor_id, event_at, event_type, units_delta, daily_rate,
qty_before, qty_after, hours_elapsed)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT DO NOTHING`,
[
e.transceiverId, e.vendorId, e.eventAt, e.eventType,
e.unitsDelta, e.dailyRate, e.qtyBefore, e.qtyAfter, e.hoursElapsed,
]
);
}
}
// ── Main export ───────────────────────────────────────────────────────────────
export async function analyzeStockVelocity(): Promise<void> {
console.log("=== Stock Velocity Analyzer starting ===\n");
// Find all vendors with confidence >= 2 stock data
const vendorResult = await pool.query<{ id: string; name: string }>(
`SELECT DISTINCT v.id, v.name
FROM stock_observations so
JOIN vendors v ON v.id = so.source_vendor_id
WHERE so.stock_confidence >= 2
AND so.time >= NOW() - INTERVAL '${WINDOW_DAYS} days'
ORDER BY v.name`
);
let totalProducts = 0;
let totalSellEvents = 0;
let totalZulaufEvents = 0;
let skipped = 0;
for (const vendor of vendorResult.rows) {
console.log(`\n[${vendor.name}] Loading observations…`);
const obsMap = await fetchObservations(vendor.id);
let vProducts = 0;
let vSellEvents = 0;
let vZulaufEvents = 0;
for (const [transceiverId, observations] of obsMap) {
if (observations.length < MIN_OBS_FOR_VELOCITY) {
skipped++;
continue;
}
const result = computeVelocity(transceiverId, vendor.id, observations);
await upsertVelocityResult(result);
await insertVelocityEvents(result.events);
vProducts++;
vSellEvents += result.totalSellEvents;
vZulaufEvents += result.totalZulaufEvents;
}
console.log(
` ${vProducts} products | ` +
`${vSellEvents} sell events | ` +
`${vZulaufEvents} Zulauf events`
);
totalProducts += vProducts;
totalSellEvents += vSellEvents;
totalZulaufEvents += vZulaufEvents;
}
console.log("\n=== Stock Velocity Analyzer complete ===");
console.log(` Vendors analyzed: ${vendorResult.rows.length}`);
console.log(` Products analyzed: ${totalProducts}`);
console.log(` Sell events: ${totalSellEvents}`);
console.log(` Zulauf events: ${totalZulaufEvents}`);
if (skipped > 0) console.log(` Skipped (<${MIN_OBS_FOR_VELOCITY} obs): ${skipped}`);
}
if (require.main === module) {
analyzeStockVelocity()
.then(() => pool.end())
.catch((err) => {
console.error("Fatal:", err);
pool.end();
process.exit(1);
});
}

View File

@ -362,6 +362,8 @@ export async function registerSchedules(boss: PgBoss): Promise<void> {
"match:opn",
// ── Spec-Based Equivalence Matcher ───────────────────────────────────
"match:spec",
// ── Stock Velocity / Abverkauf Analyzer ──────────────────────────────
"analyze:stock:velocity",
];
for (const q of queues) {
@ -452,6 +454,13 @@ export async function registerSchedules(boss: PgBoss): Promise<void> {
expireInSeconds: 1800,
});
// Stock Velocity / Abverkauf Analyzer — 3x täglich (nach FS.com + QSFPTEK Scrapes)
// Läuft nach den Haupt-Stock-Scrapes: 04:30, 12:30, 20:30 UTC
await boss.schedule("analyze:stock:velocity", "30 4,12,20 * * *", {}, {
retryLimit: 2,
expireInSeconds: 1800,
});
// ══════════════════════════════════════════════════════════════════════
// MANUFACTURER CATALOGS — every 4h (product data, no prices)
// ══════════════════════════════════════════════════════════════════════
@ -983,6 +992,15 @@ export async function registerWorkers(boss: PgBoss): Promise<void> {
);
});
// Stock Velocity / Abverkauf Analyzer
await boss.work("analyze:stock:velocity", async () => {
const ts = new Date().toISOString();
console.log(`[${ts}] Running: Stock Velocity Analyzer`);
const { analyzeStockVelocity } = await import("./robots/stock-velocity-analyzer");
await analyzeStockVelocity();
console.log(`[${ts}] Stock Velocity Analyzer complete`);
});
// Flexoptix Detail Enricher — fetches full specs + compat from API per SKU
await boss.work("enrich:flexoptix-details", async () => {
const ts = new Date().toISOString();

View File

@ -0,0 +1,84 @@
-- ══════════════════════════════════════════════════════════════════════════════
-- 118 — Stock Velocity & Sell-Through Analysis
--
-- Evaluates implied Abverkauf (sell-through) from time-series stock_observations:
-- • Negative stock delta → implied units sold (sell event)
-- • Positive stock delta after backorder → Zulauf (incoming replenishment)
-- • FS.com units_sold counter delta → high-confidence sell signal
--
-- Stores per-product velocity results in stock_velocity for API / dashboard use.
-- ══════════════════════════════════════════════════════════════════════════════
-- ── Main results table ────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS stock_velocity (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
transceiver_id UUID NOT NULL REFERENCES transceivers(id) ON DELETE CASCADE,
vendor_id UUID NOT NULL REFERENCES vendors(id) ON DELETE CASCADE,
computed_at TIMESTAMPTZ DEFAULT NOW() NOT NULL,
-- Observation window
window_start TIMESTAMPTZ NOT NULL,
window_end TIMESTAMPTZ NOT NULL,
obs_count INTEGER NOT NULL,
-- Sell-through metrics
avg_daily_sell_rate NUMERIC(12, 2), -- units/day (implied)
peak_daily_sell_rate NUMERIC(12, 2), -- highest single-interval rate
total_sell_events INTEGER DEFAULT 0,
total_units_sold_implied INTEGER DEFAULT 0,
-- FS.com direct counter (more reliable when available)
units_sold_counter_delta BIGINT, -- delta in FS.com units_sold between first/last obs
units_sold_daily_rate NUMERIC(12, 2), -- counter_delta / window_days
-- Zulauf (incoming stock / replenishment)
total_zulauf_events INTEGER DEFAULT 0,
total_units_zulauf INTEGER DEFAULT 0,
last_zulauf_at TIMESTAMPTZ,
next_expected_delivery DATE, -- backorder_estimated_date from latest obs
-- Current stock state (from latest observation)
current_qty INTEGER,
current_backorder_qty INTEGER,
current_price_net NUMERIC(10, 2),
-- Sell-through prediction
estimated_stockout_days NUMERIC(8, 1), -- NULL if no velocity or stock = 0
estimated_stockout_date DATE,
-- Signal quality
velocity_confidence TEXT CHECK (velocity_confidence IN ('high', 'medium', 'low', 'insufficient')),
-- high = ≥14 observations with meaningful deltas
-- medium = ≥5 observations
-- low = 24 observations
-- insufficient = only 1 observation or no change detected
UNIQUE (transceiver_id, vendor_id)
);
CREATE INDEX IF NOT EXISTS idx_stock_velocity_vendor ON stock_velocity (vendor_id);
CREATE INDEX IF NOT EXISTS idx_stock_velocity_computed ON stock_velocity (computed_at);
CREATE INDEX IF NOT EXISTS idx_stock_velocity_stockout ON stock_velocity (estimated_stockout_date)
WHERE estimated_stockout_date IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_stock_velocity_confidence ON stock_velocity (velocity_confidence);
COMMENT ON TABLE stock_velocity IS
'Computed sell-through velocity per transceiver per vendor, derived from '
'time-series stock_observations. Refreshed by analyze:stock:velocity job.';
-- ── Sell event log (raw events for trend analysis) ────────────────────────────
CREATE TABLE IF NOT EXISTS stock_velocity_events (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
transceiver_id UUID NOT NULL REFERENCES transceivers(id) ON DELETE CASCADE,
vendor_id UUID NOT NULL REFERENCES vendors(id) ON DELETE CASCADE,
event_at TIMESTAMPTZ NOT NULL,
event_type TEXT NOT NULL CHECK (event_type IN ('sold', 'zulauf', 'unchanged', 'data_gap')),
units_delta INTEGER, -- negative = sold, positive = arrived
daily_rate NUMERIC(10, 2), -- implied rate for this interval
qty_before INTEGER,
qty_after INTEGER,
hours_elapsed NUMERIC(8, 2)
);
CREATE INDEX IF NOT EXISTS idx_velocity_events_tx ON stock_velocity_events (transceiver_id, vendor_id, event_at);
CREATE INDEX IF NOT EXISTS idx_velocity_events_type ON stock_velocity_events (event_type, event_at);