Rene Fichtmueller d5be0ba43c fix: catch non-zero exit in local train, return JSON instead of 500
- sh -lc replaced with bash to avoid dash/profile.d syntax errors
- runCommand errors now caught in local provider path
- stdout/stderr extracted from error object and returned as JSON
- No more HTTP 500 on script failure
2026-04-25 23:24:04 +02:00

213 lines
8.9 KiB
TypeScript

import { execFile } from "child_process";
import { existsSync, readFileSync } from "fs";
import { join } from "path";
import { promisify } from "util";
import { Router, Request, Response } from "express";
/** Training lanes this router accepts. */
type Lane = "tip_llm" | "blog_llm";
/** Backends a training run can be dispatched to. */
type Provider = "runpod" | "local";

/** Promise-returning form of `child_process.execFile`. */
const execFileAsync = promisify(execFile);

/** Express router that carries the self-learning endpoints registered in this file. */
export const selflearningRouter = Router();

// Four directory levels above this module; spawned commands run with this as their cwd.
const repoRoot = join(__dirname, "../../../..");
// Location of the manifest file: <repoRoot>/training-data/runpod/manifest.json.
const manifestPath = join(repoRoot, "training-data", "runpod", "manifest.json");
/**
 * Type guard for lane identifiers.
 *
 * @param value - Arbitrary input to check.
 * @returns `true` only when `value` is exactly the string "tip_llm" or "blog_llm".
 */
function isLane(value: unknown): value is Lane {
  switch (value) {
    case "tip_llm":
    case "blog_llm":
      return true;
    default:
      return false;
  }
}
/**
 * Loads the manifest stored at `manifestPath`.
 *
 * @returns `null` when no file exists at that path, otherwise the parsed JSON content.
 * @throws When the file exists but cannot be read, or when its content is not valid JSON.
 */
function readManifest() {
  return existsSync(manifestPath)
    ? JSON.parse(readFileSync(manifestPath, "utf8"))
    : null;
}
/**
 * Runs an executable from the repository root and collects its output.
 *
 * @param command - Executable to launch.
 * @param args - Arguments passed to the executable as-is (no shell is involved).
 * @param timeoutMs - Time limit for the child process; defaults to 20 minutes.
 * @returns The captured `stdout` and `stderr`.
 * @throws Whatever `execFileAsync` rejects with, e.g. on a non-zero exit status.
 */
async function runCommand(command: string, args: string[], timeoutMs = 20 * 60 * 1000) {
  const options = {
    cwd: repoRoot,
    timeout: timeoutMs,
    // Allow up to 20 MiB of captured output per stream.
    maxBuffer: 20 * 1024 * 1024,
    // The child sees the same environment as this process.
    env: process.env,
  };
  const finished = await execFileAsync(command, args, options);
  return { stdout: finished.stdout, stderr: finished.stderr };
}
/**
 * Looks up a generic password through the `security` command-line tool.
 *
 * @param service - Service name passed to `security find-generic-password -s`.
 * @returns The trimmed password, or `null` when not running on darwin, when the
 *          lookup fails or exceeds 5 seconds, or when the output is empty.
 */
async function keychain(service: string): Promise<string | null> {
  if (process.platform === "darwin") {
    try {
      const lookup = await execFileAsync(
        "security",
        ["find-generic-password", "-s", service, "-w"],
        { timeout: 5000 },
      );
      const password = lookup.stdout.trim();
      return password === "" ? null : password;
    } catch {
      // Best-effort lookup: any failure is reported as "no secret" below.
    }
  }
  return null;
}
/**
 * Resolves a secret, preferring environment variables over keychain entries.
 *
 * @param envNames - Environment variable names, checked in order; the first non-empty value wins.
 * @param services - Keychain service names, queried one at a time in order if no variable matched.
 * @returns The first value found, or `null` when every source comes up empty.
 */
async function secret(envNames: string[], services: string[]): Promise<string | null> {
  const fromEnv = envNames
    .map((name) => process.env[name])
    .find((candidate) => Boolean(candidate));
  if (fromEnv) {
    return fromEnv;
  }

  // Sequential on purpose: stop at the first hit instead of spawning every lookup.
  for (const serviceName of services) {
    const stored = await keychain(serviceName);
    if (stored) {
      return stored;
    }
  }
  return null;
}
/**
 * Resolves the Hugging Face dataset id for a lane.
 *
 * @param lane - Lane whose dataset is wanted.
 * @returns The value of `TIP_HF_DATASET_TIP_LLM` / `TIP_HF_DATASET_BLOG_LLM` when set
 *          and non-empty, otherwise the built-in default for that lane.
 */
function dataset(lane: Lane): string {
  if (lane === "tip_llm") {
    return process.env.TIP_HF_DATASET_TIP_LLM || "renefichtmueller/tip-llm-sft";
  }
  return process.env.TIP_HF_DATASET_BLOG_LLM || "renefichtmueller/blog-llm-sft";
}
/**
 * Builds the Hugging Face model repository name for one run.
 *
 * @param lane - Lane being trained; selects the env override and the default prefix.
 * @param runId - Run identifier appended after a hyphen.
 * @returns `<prefix>-<runId>`, where the prefix is `TIP_HF_MODEL_TIP_LLM` /
 *          `TIP_HF_MODEL_BLOG_LLM` when set and non-empty, else the lane default.
 */
function modelRepo(lane: Lane, runId: string): string {
  const isTip = lane === "tip_llm";
  const configured = isTip ? process.env.TIP_HF_MODEL_TIP_LLM : process.env.TIP_HF_MODEL_BLOG_LLM;
  const prefix = configured || (isTip ? "renefichtmueller/TIP_LLM" : "renefichtmueller/Blog_LLM");
  return prefix + "-" + runId;
}
/**
 * Assembles the `input` object that the /train route submits to the RunPod endpoint.
 *
 * @param lane - Lane being trained; switches sequence length, LoRA rank/alpha,
 *               learning rate and (for full runs) the epoch count.
 * @param seedOnly - When true: one epoch, a checkpoint every 50 steps, and no push to the hub.
 * @param maxSteps - Forwarded unchanged as `max_steps`.
 * @param runId - Run identifier used in `model_id`, `run_id`, `output_dir` and the hub repo name.
 * @param hfToken - Hugging Face token placed under `credentials.hf_token`.
 * @returns A plain object holding identification fields, credentials and the training `args`.
 */
function runpodInput(lane: Lane, seedOnly: boolean, maxSteps: number, runId: string, hfToken: string) {
  const isBlog = lane === "blog_llm";
  // Shared by model_id and the output directory.
  const jobName = `${lane}-${runId}`;

  let epochs: number;
  if (seedOnly) {
    epochs = 1;
  } else if (isBlog) {
    epochs = 2;
  } else {
    epochs = 3;
  }

  const args = {
    base_model: process.env.TIP_RUNPOD_BASE_MODEL || "Qwen/Qwen2.5-Coder-7B-Instruct",
    model_type: "AutoModelForCausalLM",
    tokenizer_type: "AutoTokenizer",
    load_in_4bit: true,
    strict: false,
    datasets: [{ path: dataset(lane), type: "chat_template", split: "train" }],
    val_set_size: 0.02,
    output_dir: `/workspace/outputs/${jobName}`,
    sequence_len: isBlog ? 4096 : 3072,
    sample_packing: true,
    eval_sample_packing: false,
    pad_to_sequence_len: true,
    adapter: "qlora",
    lora_r: isBlog ? 48 : 32,
    lora_alpha: isBlog ? 96 : 64,
    lora_dropout: 0.05,
    lora_target_linear: true,
    lora_modules_to_save: ["embed_tokens", "lm_head"],
    gradient_accumulation_steps: 2,
    micro_batch_size: 1,
    num_epochs: epochs,
    optimizer: "adamw_torch_fused",
    lr_scheduler: "cosine",
    learning_rate: isBlog ? 0.00016 : 0.00018,
    train_on_inputs: false,
    bf16: "auto",
    tf32: true,
    gradient_checkpointing: true,
    flash_attention: true,
    logging_steps: 5,
    warmup_steps: 10,
    evals_per_epoch: 1,
    save_steps: seedOnly ? 50 : 250,
    max_steps: maxSteps,
    // Seed runs stay local to the worker; only full runs are pushed.
    push_to_hub: !seedOnly,
    hub_model_id: modelRepo(lane, runId),
    hub_strategy: "end",
    hub_private_repo: true,
    hf_use_auth_token: true,
    special_tokens: { pad_token: "<|endoftext|>" },
  };

  return {
    user_id: "tip-selflearning",
    model_id: jobName,
    run_id: runId,
    credentials: { hf_token: hfToken },
    args,
  };
}
/**
 * GET /status — reports what is configured for self-learning.
 *
 * The response contains the current manifest (or `null` when none exists), the
 * dataset and model prefix of each lane, and booleans telling whether RunPod,
 * Hugging Face and the local training command are configured. Secret values
 * themselves are never returned, only whether they were found.
 *
 * Any failure while gathering this information (for example a manifest file
 * that is not valid JSON) is answered with HTTP 500 and `{ success: false, error }`,
 * matching the other routes on this router, instead of escaping the async handler.
 */
selflearningRouter.get("/status", async (_req: Request, res: Response) => {
  try {
    const runpodEndpoint = process.env.TIP_RUNPOD_ENDPOINT_ID || process.env.RUNPOD_ENDPOINT_ID || null;
    // The two lookups are independent, so resolve them concurrently.
    const [runpodToken, hfToken] = await Promise.all([
      secret(["RUNPOD_API_KEY", "TIP_RUNPOD_API_KEY"], ["magatama.runpod.api", "tip.runpod.api"]),
      secret(["HF_TOKEN", "HUGGINGFACE_TOKEN"], ["magatama.huggingface.token", "tip.huggingface.token"]),
    ]);
    res.json({
      success: true,
      manifest: readManifest(),
      lanes: {
        tip_llm: { dataset: dataset("tip_llm"), target_model_prefix: process.env.TIP_HF_MODEL_TIP_LLM || "renefichtmueller/TIP_LLM" },
        blog_llm: { dataset: dataset("blog_llm"), target_model_prefix: process.env.TIP_HF_MODEL_BLOG_LLM || "renefichtmueller/Blog_LLM" },
      },
      runpod: { endpoint_configured: Boolean(runpodEndpoint), api_key_configured: Boolean(runpodToken) },
      huggingface: { token_configured: Boolean(hfToken) },
      local: { command: process.env.TIP_LOCAL_TRAIN_COMMAND || "not configured", ready: Boolean(process.env.TIP_LOCAL_TRAIN_COMMAND) },
    });
  } catch (err) {
    res.status(500).json({ success: false, error: String(err) });
  }
});
/**
 * POST /build — runs `npm run learning-pool:build` from the repository root.
 *
 * On success the response carries the manifest as read after the build plus the
 * last 4000 characters of each output stream. Any failure is answered with
 * HTTP 500 and the stringified error.
 */
selflearningRouter.post("/build", async (_req: Request, res: Response) => {
  try {
    const { stdout, stderr } = await runCommand("npm", ["run", "learning-pool:build"]);
    const manifest = readManifest();
    res.json({
      success: true,
      manifest,
      stdout: stdout.slice(-4000),
      stderr: stderr.slice(-4000),
    });
  } catch (failure) {
    res.status(500).json({ success: false, error: String(failure) });
  }
});
/**
 * POST /publish-hf — runs `npm run learning-pool:publish-hf` with a 30-minute limit.
 *
 * When no manifest can be read, `npm run learning-pool:build` is run first.
 * On success the response carries the tail of the publish command's output
 * (6000 characters of stdout, 4000 of stderr). Any failure is answered with
 * HTTP 500 and the stringified error.
 */
selflearningRouter.post("/publish-hf", async (_req: Request, res: Response) => {
  const publishTimeoutMs = 30 * 60 * 1000;
  try {
    const manifestMissing = !readManifest();
    if (manifestMissing) {
      await runCommand("npm", ["run", "learning-pool:build"]);
    }
    const { stdout, stderr } = await runCommand("npm", ["run", "learning-pool:publish-hf"], publishTimeoutMs);
    res.json({
      success: true,
      stdout: stdout.slice(-6000),
      stderr: stderr.slice(-4000),
    });
  } catch (failure) {
    res.status(500).json({ success: false, error: String(failure) });
  }
});
/**
 * POST /train — starts a fine-tuning run for one lane.
 *
 * Request body fields:
 * - `lane` (required): "tip_llm" or "blog_llm"; anything else yields HTTP 400.
 * - `provider`: "runpod" (default when missing/empty) or "local"; anything else
 *   yields HTTP 400 rather than silently falling through to RunPod.
 * - `seed_only`: treated as true unless it is exactly `false`.
 * - `max_steps`: defaults to 200 for seed-only runs and 2000 otherwise. Only used
 *   by the RunPod path, where a value that is not a finite number yields HTTP 400.
 *
 * Local provider: runs `bash <TIP_LOCAL_TRAIN_COMMAND> <lane>` with a 12-hour limit
 * and answers when the process ends. A failing script is reported as HTTP 200 with
 * `success: false` plus the tail of its output. HTTP 409 when the command is unset.
 *
 * RunPod provider: builds the learning pool if no manifest can be read, then
 * submits the job to `https://api.runpod.ai/v2/<endpoint>/run`. HTTP 409 when the
 * endpoint id, RunPod key or Hugging Face token is missing; RunPod's own status
 * code is passed through when it rejects the request; HTTP 500 on other failures.
 */
selflearningRouter.post("/train", async (req: Request, res: Response) => {
  const lane = req.body?.lane;
  const requestedProvider = req.body?.provider || "runpod";
  const seedOnly = req.body?.seed_only !== false;
  const maxSteps = Number(req.body?.max_steps || (seedOnly ? 200 : 2000));

  if (!isLane(lane)) {
    res.status(400).json({ success: false, error: "lane must be tip_llm or blog_llm" });
    return;
  }
  // Validate instead of casting: a mistyped provider must not launch a RunPod job.
  if (requestedProvider !== "runpod" && requestedProvider !== "local") {
    res.status(400).json({ success: false, error: "provider must be runpod or local" });
    return;
  }
  const provider: Provider = requestedProvider;

  if (provider === "local") {
    const command = process.env.TIP_LOCAL_TRAIN_COMMAND;
    if (!command) {
      res.status(409).json({ success: false, error: "Local training command is not configured.", suggestion: "Set TIP_LOCAL_TRAIN_COMMAND; the lane name is appended automatically." });
      return;
    }
    try {
      // The configured value is handed to bash as a single argument, followed by the lane.
      const out = await runCommand("bash", [command, lane], 12 * 60 * 60 * 1000);
      res.json({ success: true, provider, lane, stdout: out.stdout.slice(-6000), stderr: out.stderr.slice(-4000) });
    } catch (err: unknown) {
      // execFileAsync throws on non-zero exit — stdout/stderr are still on the error object.
      // The failure is deliberately reported in the JSON body rather than as an HTTP 500.
      const e = err as { stdout?: string; stderr?: string; message?: string };
      const stdout = (e.stdout ?? "").slice(-6000);
      const stderr = (e.stderr ?? "").slice(-4000);
      res.json({
        success: false,
        provider,
        lane,
        error: e.message ?? String(err),
        stdout,
        stderr,
      });
    }
    return;
  }

  // NaN/Infinity would be serialized as `null` in the job payload; reject it up front.
  if (!Number.isFinite(maxSteps)) {
    res.status(400).json({ success: false, error: "max_steps must be a finite number" });
    return;
  }

  const endpoint = process.env.TIP_RUNPOD_ENDPOINT_ID || process.env.RUNPOD_ENDPOINT_ID;
  const runpodToken = await secret(["RUNPOD_API_KEY", "TIP_RUNPOD_API_KEY"], ["magatama.runpod.api", "tip.runpod.api"]);
  const hfToken = await secret(["HF_TOKEN", "HUGGINGFACE_TOKEN"], ["magatama.huggingface.token", "tip.huggingface.token"]);
  if (!endpoint || !runpodToken || !hfToken) {
    res.status(409).json({ success: false, error: "RunPod/Hugging Face credentials are incomplete.", runpod_endpoint_configured: Boolean(endpoint), runpod_api_key_configured: Boolean(runpodToken), hf_token_configured: Boolean(hfToken) });
    return;
  }
  try {
    if (!readManifest()) await runCommand("npm", ["run", "learning-pool:build"]);
    // Run id: lane plus a UTC timestamp truncated to the minute (YYYYMMDDHHMM).
    const runId = `${lane}-v${new Date().toISOString().replace(/[-:T.Z]/g, "").slice(0, 12)}`;
    const input = runpodInput(lane, seedOnly, maxSteps, runId, hfToken);
    // RunPod's execution policy expresses both values in milliseconds, so the
    // defaults are 12 h / 24 h in ms. Environment overrides are passed through as given.
    const executionTimeout = Number(process.env.TIP_RUNPOD_EXECUTION_TIMEOUT || 12 * 60 * 60 * 1000);
    const ttl = Number(process.env.TIP_RUNPOD_JOB_TTL || 24 * 60 * 60 * 1000);
    const response = await fetch(`https://api.runpod.ai/v2/${endpoint}/run`, {
      method: "POST",
      headers: { "Content-Type": "application/json", Authorization: `Bearer ${runpodToken}` },
      body: JSON.stringify({ input, policy: { executionTimeout, ttl } }),
    });
    // A non-JSON reply is tolerated and reported as an empty object.
    const body = await response.json().catch(() => ({}));
    if (!response.ok) {
      res.status(response.status).json({ success: false, error: "RunPod request failed", details: body });
      return;
    }
    res.json({ success: true, provider, lane, seed_only: seedOnly, run_id: runId, dataset: dataset(lane), target_model: input.args.hub_model_id, runpod: body });
  } catch (err) {
    res.status(500).json({ success: false, error: String(err) });
  }
});