189 lines
7.7 KiB
TypeScript

/**
* Price Alert Subscriptions — /api/price-alerts
*
* Users subscribe to price thresholds for specific SKUs or form factor/speed combos.
* A background checker (called by the scraper scheduler) evaluates active subscriptions
* against the latest price_observations and queues email delivery.
*
* Routes:
* POST /api/price-alerts — Create subscription
* GET /api/price-alerts?email= — List subscriptions for an email
* DELETE /api/price-alerts/:id — Cancel subscription
* POST /api/price-alerts/check — Internal: evaluate + queue alerts (scheduler)
* GET /api/price-alerts/triggered — Recent triggered alerts
*/
import { Router, Request, Response } from "express";
import { pool } from "../db/client";
// Express router that all price-alert route handlers in this file are registered on.
export const priceAlertsRouter = Router();
// ── POST /api/price-alerts — Create a price alert subscription ───────────────
/**
 * Inserts one row into price_alert_subscriptions from the request body.
 *
 * Body fields:
 *  - email            (required) string containing "@"; stored lower-cased and trimmed
 *  - threshold_price  (required) finite number (numeric strings accepted)
 *  - transceiver_id / form_factor / speed_gbps — at least one must be present
 *  - currency         optional string, default "USD"; stored upper-cased
 *  - direction        optional, "below" (default) or "above"
 *  - vendor_id        optional
 *
 * Responses: 201 with the inserted row's summary columns, 400 when validation
 * fails, 500 when the insert throws.
 */
priceAlertsRouter.post("/", async (req: Request, res: Response) => {
  const badRequest = (error: string) =>
    res.status(400).json({ success: false, error });

  // req.body is undefined when no body parser populated it; fall back to an
  // empty object so the destructuring below cannot throw before the try block.
  const {
    email, transceiver_id, form_factor, speed_gbps,
    threshold_price, currency = "USD", direction = "below", vendor_id,
  } = (req.body ?? {}) as Record<string, unknown>;

  if (!email || typeof email !== "string" || !email.includes("@")) {
    return badRequest("Valid email required");
  }

  // Number.isFinite also rejects "Infinity", which isNaN() would let through.
  const threshold = Number.parseFloat(String(threshold_price));
  if (!threshold_price || !Number.isFinite(threshold)) {
    return badRequest("threshold_price required");
  }

  if (!transceiver_id && !form_factor && !speed_gbps) {
    return badRequest("At least one of: transceiver_id, form_factor, speed_gbps");
  }

  // A non-numeric speed would otherwise be sent to the database as NaN.
  const speed = speed_gbps ? Number.parseFloat(String(speed_gbps)) : null;
  if (speed !== null && !Number.isFinite(speed)) {
    return badRequest("speed_gbps must be a number");
  }

  // A non-string currency would make toUpperCase() throw and surface as a 500.
  if (typeof currency !== "string" || currency.trim() === "") {
    return badRequest("currency must be a non-empty string");
  }
  const currencyCode = currency.trim().toUpperCase();

  // The /check evaluator only compares against 'below' and 'above'; any other
  // stored value would create a subscription that can never trigger.
  const normalizedDirection =
    typeof direction === "string" ? direction.trim().toLowerCase() : "";
  if (normalizedDirection !== "below" && normalizedDirection !== "above") {
    return badRequest('direction must be "below" or "above"');
  }

  try {
    const result = await pool.query(
      `INSERT INTO price_alert_subscriptions
         (email, transceiver_id, form_factor, speed_gbps, threshold_price, currency, direction, vendor_id)
       VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
       RETURNING id, email, threshold_price, currency, direction, created_at`,
      [
        email.toLowerCase().trim(),
        transceiver_id || null,
        form_factor || null,
        speed,
        threshold,
        currencyCode,
        normalizedDirection,
        vendor_id || null,
      ]
    );
    return res.status(201).json({ success: true, subscription: result.rows[0] });
  } catch (err) {
    return res.status(500).json({ success: false, error: String(err) });
  }
});
// ── GET /api/price-alerts?email= — List subscriptions ────────────────────────
/**
 * Lists all subscriptions stored for the `email` query parameter, newest first,
 * joined with the transceiver's name/form factor/speed and the vendor's name
 * where those references are set. The query does not filter on `active`.
 *
 * Responses: 200 with `subscriptions`, 400 when no email is supplied,
 * 500 when the query throws.
 */
priceAlertsRouter.get("/", async (req: Request, res: Response) => {
  // A repeated ?email= parameter arrives as an array; only its first entry is used.
  const rawEmail = req.query.email;
  const firstEmail = Array.isArray(rawEmail) ? rawEmail[0] : rawEmail;
  const email = String(firstEmail ?? "").trim().toLowerCase();

  if (!email) {
    return res.status(400).json({ success: false, error: "email required" });
  }

  try {
    const { rows } = await pool.query(
`SELECT pas.*,
t.standard_name, t.form_factor AS tx_form_factor, t.speed_gbps AS tx_speed,
v.name AS vendor_name
FROM price_alert_subscriptions pas
LEFT JOIN transceivers t ON t.id = pas.transceiver_id
LEFT JOIN vendors v ON v.id = pas.vendor_id
WHERE pas.email = $1
ORDER BY pas.created_at DESC`,
      [email]
    );
    return res.json({ success: true, subscriptions: rows });
  } catch (err) {
    return res.status(500).json({ success: false, error: String(err) });
  }
});
// ── DELETE /api/price-alerts/:id — Cancel subscription ───────────────────────
/**
 * Soft-cancels a subscription by setting `active = false`.
 *
 * Path param `id` must be a non-negative decimal integer. Optional query param
 * `email` restricts the update to a row owned by that address.
 *
 * Responses: 200 with the cancelled id, 400 for a malformed id, 404 when no
 * row matched, 500 when the query throws.
 */
priceAlertsRouter.delete("/:id", async (req: Request, res: Response) => {
  // Validate up front: parseInt on a non-numeric id yields NaN, which the
  // database rejects and which previously surfaced as a 500. The regex also
  // refuses partially numeric input such as "12abc".
  const rawId = String(req.params.id);
  const id = /^\d+$/.test(rawId) ? Number.parseInt(rawId, 10) : Number.NaN;
  if (!Number.isSafeInteger(id)) {
    return res.status(400).json({ success: false, error: "Invalid subscription id" });
  }

  // A repeated ?email= parameter arrives as an array; only its first entry is used.
  const rawEmail = req.query.email;
  const email = String((Array.isArray(rawEmail) ? rawEmail[0] : rawEmail) ?? "")
    .trim()
    .toLowerCase();

  try {
    // NOTE(review): when no email is supplied the `$2 = ''` branch matches any
    // owner, so a caller who knows only the id can cancel the subscription.
    // Kept for backward compatibility — confirm whether this is intended.
    const result = await pool.query(
      `UPDATE price_alert_subscriptions SET active = false
       WHERE id = $1 AND ($2 = '' OR email = $2)
       RETURNING id`,
      [id, email]
    );
    if (result.rowCount === 0) {
      return res.status(404).json({ success: false, error: "Subscription not found" });
    }
    return res.json({ success: true, cancelled: id });
  } catch (err) {
    return res.status(500).json({ success: false, error: String(err) });
  }
});
// ── GET /api/price-alerts/triggered — Recent triggered alerts ─────────────────
/**
 * Returns the 100 most recent rows of price_alert_log (by created_at),
 * each joined with the transceiver's standard name/form factor and the
 * vendor's name where available.
 *
 * Responses: 200 with `alerts`, 500 when the query throws.
 */
priceAlertsRouter.get("/triggered", async (_req: Request, res: Response) => {
  const recentAlertsSql = `
SELECT pal.*,
t.standard_name, t.form_factor,
v.name AS vendor_name
FROM price_alert_log pal
LEFT JOIN transceivers t ON t.id = pal.transceiver_id
LEFT JOIN vendors v ON v.id = pal.vendor_id
ORDER BY pal.created_at DESC
LIMIT 100
`;

  try {
    const { rows } = await pool.query(recentAlertsSql);
    return res.json({ success: true, alerts: rows });
  } catch (err) {
    return res.status(500).json({ success: false, error: String(err) });
  }
});
// ── POST /api/price-alerts/check — Evaluate all active subscriptions ──────────
// Called by the scraper scheduler periodically. Finds triggered conditions,
// inserts into price_alert_log, and marks last_triggered on subscription.
/**
 * Evaluates active subscriptions against the latest non-anomalous price per
 * (transceiver, vendor) and, for up to 200 matches, writes a 'pending' row to
 * price_alert_log and bumps last_triggered / trigger_count on the subscription.
 *
 * The match, the log inserts and the subscription updates run as ONE SQL
 * statement (data-modifying CTEs), so they commit or fail together. Running
 * them as separate pool queries could leave logged alerts whose subscription
 * was never marked as triggered, causing the same alert to fire again on the
 * next run.
 *
 * Responses: 200 with `checked` and `queued` (both the number of alerts
 * logged), 500 when the statement throws.
 */
priceAlertsRouter.post("/check", async (_req: Request, res: Response) => {
  try {
    const result = await pool.query(`
      WITH latest_prices AS (
        -- Most recent valid observation per transceiver + vendor
        SELECT DISTINCT ON (po.transceiver_id, po.source_vendor_id)
          po.transceiver_id, po.source_vendor_id AS vendor_id,
          po.price, po.currency, po.time
        FROM price_observations po
        WHERE po.price > 0 AND COALESCE(po.is_anomalous, false) = false
        ORDER BY po.transceiver_id, po.source_vendor_id, po.time DESC
      ),
      matched AS (
        -- Subscriptions whose threshold is crossed by a latest price
        SELECT
          pas.id AS subscription_id,
          pas.email, pas.threshold_price, pas.currency, pas.direction,
          lp.transceiver_id, lp.vendor_id, lp.price AS triggered_price
        FROM price_alert_subscriptions pas
        JOIN latest_prices lp ON (
          (pas.transceiver_id IS NULL OR lp.transceiver_id = pas.transceiver_id)
          AND lp.currency = pas.currency
          AND (pas.vendor_id IS NULL OR lp.vendor_id = pas.vendor_id)
        )
        JOIN transceivers t ON t.id = lp.transceiver_id
        WHERE pas.active = true
          AND (pas.form_factor IS NULL OR t.form_factor = pas.form_factor)
          AND (pas.speed_gbps IS NULL OR t.speed_gbps = pas.speed_gbps)
          AND (
            (pas.direction = 'below' AND lp.price < pas.threshold_price)
            OR
            (pas.direction = 'above' AND lp.price > pas.threshold_price)
          )
          -- Don't re-trigger more than once per 24h per subscription
          AND (pas.last_triggered IS NULL OR pas.last_triggered < NOW() - INTERVAL '24 hours')
        LIMIT 200
      ),
      logged AS (
        INSERT INTO price_alert_log
          (subscription_id, transceiver_id, vendor_id, triggered_price, threshold_price, currency, email, delivery_status)
        SELECT subscription_id, transceiver_id, vendor_id,
               triggered_price, threshold_price, currency, email, 'pending'
        FROM matched
        RETURNING subscription_id
      ),
      bumped AS (
        -- UPDATE ... FROM touches each target row once, so the per-subscription
        -- hit count is added in one step (same total as +1 per logged row).
        UPDATE price_alert_subscriptions pas
        SET last_triggered = NOW(),
            trigger_count = pas.trigger_count + hits.n
        FROM (
          SELECT subscription_id, COUNT(*)::int AS n
          FROM logged
          GROUP BY subscription_id
        ) hits
        WHERE pas.id = hits.subscription_id
        RETURNING pas.id
      )
      SELECT COUNT(*)::int AS queued FROM logged
    `);

    // NOTE(review): a subscription that matches several transceivers/vendors
    // still produces one log row per match within a single run, as before —
    // confirm whether one alert per subscription per run is wanted instead.
    const queued = Number(result.rows[0]?.queued ?? 0);
    return res.json({ success: true, checked: queued, queued });
  } catch (err) {
    return res.status(500).json({ success: false, error: String(err) });
  }
});