Cookbook scheduler + serve: schedule via Tasks, Stop verifies kill, Ollama auto port-pick

- Schedule cookbook serves through the existing ScheduledTask system: the
  serve preset gets a ^ button next to Launch that opens a daily/hourly/
  weekly form mirroring the admin-switch style; the schedule action runs
  action_cookbook_serve, which delegates to /api/model/serve and stamps
  the resulting task with _scheduledStopAtMs. A background
  cookbook_serve_lifecycle loop ticks every 60s and kills any serve
  whose window has ended, also dropping the auto-registered endpoint
  so the model picker doesn't keep pointing at a dead server.
- Stop and remove on a Running serve now awaits the SSH/tmux kill,
  re-checks tmux has-session, and surfaces an error toast (leaving the
  row) when the kill failed. Previously fire-and-forget, so a failed
  SSH/tmux call silently left the live serve running while the row
  vanished from the UI.
- Cookbook tasks/status orphan-adoption sweep no longer requires the
  serve-/cookbook- session-id prefix; any tmux session whose pane is
  running a known model-server process gets auto-pulled into Running.
  Without this loosening, a cookbook-launched serve whose tmux id
  fell back to a bare number was invisible — you couldn't see it,
  let alone stop it.
- Ollama serve always launches a fresh process under cookbook's tmux
  (no more monitor-mode reattach to a systemd/Docker ollama Stop can't
  reach). The handler pre-picks a free port by probing the target
  host over SSH and mutates req.cmd's OLLAMA_HOST so the runner script
  AND the auto-registered endpoint agree on the same bind port.
- Auto-register uses host.docker.internal (when running inside Docker)
  instead of localhost, matching the URL /setup adds for Ollama by
  hand. Local cookbook serves now produce a chat-reachable endpoint
  on first launch.
- Cascade-delete: removing a scheduled cookbook task also deletes any
  linked calendar event (cookbook_task_id marker in the description).
- Tasks list groups cookbook_serve under a "Cookbook" category that
  sorts above the rest, so scheduler-launched serves are easy to find.
This commit is contained in:
pewdiepie-archdaemon
2026-06-05 14:41:43 +09:00
parent f8aaeab245
commit e2f449f4ef
12 changed files with 1434 additions and 67 deletions

View File

@@ -2001,6 +2001,197 @@ async def action_check_email_urgency(owner: str, **kwargs) -> Tuple[str, bool]:
return str(e), False
async def action_cookbook_serve(
owner: str,
task_name: str = "",
progress_cb=None,
command: str = "",
**kwargs,
) -> Tuple[str, bool]:
"""Launch a Cookbook model serve as a scheduled task.
`command` is the JSON config string the task carries in `prompt`,
of shape: {"preset": "name"} OR {"repo_id": "...", "cmd": "...", "host": "..."}.
Optional `end_after_min: N` schedules a hard-stop N minutes after launch
(handled by cookbook_serve_lifecycle_loop in src/cookbook_serve_lifecycle.py).
"""
import json
import time as _time
import httpx
from pathlib import Path
from core.middleware import INTERNAL_TOOL_HEADER, INTERNAL_TOOL_TOKEN
from core.atomic_io import atomic_write_json
headers = {INTERNAL_TOOL_HEADER: INTERNAL_TOOL_TOKEN}
try:
cfg = json.loads(command or "{}")
except Exception:
return f"Invalid JSON config: {command!r}", False
if not isinstance(cfg, dict):
return "Config must be a JSON object", False
# Resolve the preset (if named) OR fall through with explicit fields.
preset_name = (cfg.get("preset") or "").strip()
repo_id = (cfg.get("repo_id") or "").strip()
cmd = (cfg.get("cmd") or "").strip()
host = (cfg.get("host") or cfg.get("remote_host") or "").strip()
try:
end_after_min = int(cfg.get("end_after_min") or 0)
except Exception:
end_after_min = 0
state_path = Path("/app/data/cookbook_state.json")
try:
state = json.loads(state_path.read_text(encoding="utf-8")) if state_path.exists() else {}
except Exception:
state = {}
# Preset lookup. Try three matching strategies in order so the
# schedule still works even when the user's preset is named
# differently from the model's short name:
#
# 1. Exact preset.name == preset_name (case-insensitive)
# 2. preset.model / preset.modelId == repo_id (caller knows the repo)
# 3. preset.model's short name (after final /) == preset_name
#
# Without #2 and #3, scheduling "Qwen3.5-397B-A17B-AWQ" failed when
# the saved preset was named "vllm-qwen-397b" or had the model field
# populated with the full HF repo path. Either should resolve.
def _short(name: str) -> str:
return (name or "").rsplit("/", 1)[-1].lower()
if not cmd or not repo_id:
presets = state.get("presets") or []
chosen = None
# Strategy 1: exact name match.
if preset_name:
chosen = next(
(p for p in presets if isinstance(p, dict)
and (p.get("name") or "").lower() == preset_name.lower()),
None,
)
# Strategy 2: repo_id matches the preset's model field.
if chosen is None and repo_id:
chosen = next(
(p for p in presets if isinstance(p, dict)
and (p.get("model") or p.get("modelId") or "").lower() == repo_id.lower()),
None,
)
# Strategy 3: model's short name matches the preset_name.
if chosen is None and preset_name:
chosen = next(
(p for p in presets if isinstance(p, dict)
and _short(p.get("model") or p.get("modelId") or "") == preset_name.lower()),
None,
)
if chosen is not None:
repo_id = repo_id or chosen.get("model") or chosen.get("modelId") or ""
cmd = cmd or (chosen.get("cmd") or "").strip()
host = host or chosen.get("host") or chosen.get("remoteHost") or ""
if not repo_id or not cmd or cmd.startswith("(adopted"):
# Surface what we tried so the user can name their preset to match.
preset_names = [(p.get("name") or "") for p in (state.get("presets") or []) if isinstance(p, dict)]
hint = f" Saved presets: {preset_names!r}" if preset_names else ""
return (f"No launchable config for {preset_name!r} (repo_id={repo_id!r}). "
f"Check Cookbook → Presets has a real cmd, not 'adopted'.{hint}", False)
# Resolve env_prefix etc. from the host's saved cookbook server entry,
# matching the chat agent's serve_model path.
body = {"repo_id": repo_id, "cmd": cmd}
if host:
body["remote_host"] = host
env = (state.get("env") or {})
srv = next(
(s for s in (env.get("servers") or [])
if isinstance(s, dict) and (s.get("host") == host or s.get("name") == host)),
{},
)
if srv.get("env") == "venv" and srv.get("envPath"):
body["env_prefix"] = f"source {srv['envPath']}/bin/activate"
elif srv.get("env") == "conda" and srv.get("envPath"):
body["env_prefix"] = f"conda activate {srv['envPath']}"
if srv.get("hfToken"): body["hf_token"] = srv["hfToken"]
if srv.get("port"): body["ssh_port"] = str(srv["port"])
if srv.get("platform"): body["platform"] = srv["platform"]
try:
async with httpx.AsyncClient(timeout=30) as client:
r = await client.post("http://localhost:7000/api/model/serve",
json=body, headers=headers)
data = r.json() if r.content else {}
except Exception as e:
return f"Launch HTTP failed: {e}", False
if not data.get("ok"):
return f"Launch rejected: {data.get('error') or data.get('detail') or 'unknown'}", False
sid = data.get("session_id") or ""
# Register the new task in cookbook_state.json + stamp it with our
# scheduler-owner markers. /api/model/serve spawns the tmux session
# but leaves the state-write to the UI — when a scheduled action
# launches a serve from server-side, NOBODY writes the task into
# state, so the Cookbook tab never shows it. We do the write here.
if sid:
try:
# Re-read fresh (the route may have updated state already).
try:
fresh = json.loads(state_path.read_text(encoding="utf-8"))
except Exception:
fresh = {}
if not isinstance(fresh, dict):
fresh = {}
tasks = fresh.get("tasks") if isinstance(fresh.get("tasks"), list) else []
existing = next(
(t for t in tasks if isinstance(t, dict) and t.get("sessionId") == sid),
None,
)
if existing is None:
display_name = repo_id.split("/")[-1] if "/" in repo_id else repo_id
placeholder = (
f"Launched by scheduled task {task_name!r} — waiting for tmux output…\n"
f" session: {sid}\n"
f" target: {host or 'local'}\n"
f" cmd: {cmd[:200]}{'' if len(cmd) > 200 else ''}"
)
existing = {
"id": sid,
"sessionId": sid,
"name": display_name,
"modelId": repo_id,
"type": "serve",
"status": "running",
"output": placeholder,
"ts": int(_time.time() * 1000),
"payload": {"repo_id": repo_id, "remote_host": host or "", "_cmd": cmd},
"remoteHost": host or "",
"sshPort": "",
"platform": "linux",
"_serveReady": False,
"_endpointAdded": False,
}
tasks.append(existing)
# Stamp ownership + end-at on the task entry.
existing["_scheduledByTask"] = task_name or ""
existing["_scheduledByOwner"] = owner or ""
if end_after_min > 0:
existing["_scheduledStopAtMs"] = int(_time.time() * 1000) + end_after_min * 60 * 1000
fresh["tasks"] = tasks
atomic_write_json(state_path, fresh)
except Exception as e:
logger.warning(f"cookbook_serve: state register/stamp failed: {e}")
# Don't try to render absolute clock time in the message — the
# server runs in UTC (Docker default), the user reads it as local,
# and the offset depends on the user's TZ which the action doesn't
# have a reliable handle on. The Tasks UI already shows the RUN
# timestamp in the user's local time right above this message, so
# "stops 8 min after that" gives the user everything they need.
if end_after_min:
return (
f"Launched {repo_id} (session {sid}); stops {end_after_min} min after this ran",
True,
)
return f"Launched {repo_id} (session {sid})", True
BUILTIN_ACTIONS = {
"tidy_sessions": action_tidy_sessions,
"tidy_documents": action_tidy_documents,
@@ -2020,6 +2211,7 @@ BUILTIN_ACTIONS = {
"test_skills": action_test_skills,
"audit_skills": action_audit_skills,
"check_email_urgency": action_check_email_urgency,
"cookbook_serve": action_cookbook_serve,
# ping_notes removed from the registry — runs only inside `_note_pings_loop`.
}

View File

@@ -0,0 +1,193 @@
"""Cookbook serve lifecycle: kills scheduler-owned serves whose end-of-
window has passed.
Pairs with action_cookbook_serve in builtin_actions.py — that action
stamps the task it launches with `_scheduledStopAtMs`, this loop ticks
every 60s and kills any serve whose stamp is in the past.
Single small module. Delete this file + the registration line in app.py
and the feature stops doing anything; scheduler-launched serves just
stay up until the user kills them manually.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from pathlib import Path
import httpx
logger = logging.getLogger(__name__)
def _internal_headers() -> dict:
from core.middleware import INTERNAL_TOOL_HEADER, INTERNAL_TOOL_TOKEN
return {INTERNAL_TOOL_HEADER: INTERNAL_TOOL_TOKEN}
async def _delete_endpoint_for_task(task: dict) -> None:
"""Drop the auto-registered model endpoint for a scheduled-stop serve.
Without this, killing the tmux session leaves the endpoint sitting in
the picker (probe goes offline; chats still try to route there) and
the user has to delete it by hand in Settings -> Endpoints.
"""
import re as _re
payload = task.get("payload") or {}
cmd = str(payload.get("_cmd") or "")
remote = task.get("remoteHost") or ""
# Build host the same way _auto_register_llm_endpoint does so URL match wins.
if remote:
host = remote.split("@")[-1] if "@" in remote else remote
else:
host = "host.docker.internal"
port_match = _re.search(r"--port\s+(\d+)", cmd)
ollama_host_match = _re.search(r"OLLAMA_HOST=[^\s]*?:(\d+)", cmd)
if port_match:
port = int(port_match.group(1))
elif ollama_host_match:
port = int(ollama_host_match.group(1))
elif "ollama" in cmd:
port = 11434
else:
port = 8080
base_url = f"http://{host}:{port}/v1"
try:
async with httpx.AsyncClient(timeout=8) as client:
r = await client.get(
"http://localhost:7000/api/model-endpoints",
headers=_internal_headers(),
)
if r.status_code >= 400:
return
eps = r.json() if r.content else []
# Prefer exact URL match; fall back to host:port substring so we
# still catch the case where 0.0.0.0 vs the registered host
# representation diverged.
ep = next((e for e in eps if e.get("base_url") == base_url), None)
if not ep:
hostport = f"{host}:{port}"
ep = next((e for e in eps if hostport in (e.get("base_url") or "")), None)
if ep:
await client.delete(
f"http://localhost:7000/api/model-endpoints/{ep['id']}",
headers=_internal_headers(),
)
logger.info(
f"cookbook_serve_lifecycle: deleted endpoint {ep.get('id')} "
f"({ep.get('base_url')}) after scheduled stop"
)
except Exception as e:
logger.warning(f"cookbook_serve_lifecycle: endpoint delete failed: {e}")
async def _stop_serve(session_id: str, remote_host: str = "", ssh_port: str = "") -> bool:
"""Kill the tmux session that hosts the serve.
There's no `/api/model/stop` route — the cookbook UI and the chat
agent both kill via `/api/shell/exec` running a `tmux kill-session`
(wrapped in ssh for remote hosts). Mirror that here so the
lifecycle loop can actually stop scheduler-launched serves at
window-end. Without this, the action stamped `_scheduledStopAtMs`
correctly but every kill attempt failed silently (the route
returned 404 and the result was logged as "failed").
"""
import shlex
if remote_host:
port_flag = f"-p {shlex.quote(str(ssh_port))} " if ssh_port and str(ssh_port) != "22" else ""
cmd = (
f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "
f"{port_flag}{shlex.quote(remote_host)} "
f"'tmux kill-session -t {shlex.quote(session_id)}'"
)
else:
cmd = f"tmux kill-session -t {shlex.quote(session_id)}"
try:
async with httpx.AsyncClient(timeout=15) as client:
r = await client.post(
"http://localhost:7000/api/shell/exec",
json={"command": cmd},
headers=_internal_headers(),
)
if r.status_code >= 400:
return False
data = r.json() if r.content else {}
ec = data.get("exit_code")
# tmux returns non-zero when the session is already gone
# ("can't find session: ..."). That's still "stop succeeded"
# from our POV — the goal is no live session at the end.
if ec in (None, 0):
return True
stderr = (data.get("stderr") or "").lower()
return "no server" in stderr or "can't find session" in stderr or "session not found" in stderr
except Exception as e:
logger.warning(f"cookbook_serve_lifecycle: stop {session_id} failed: {e}")
return False
async def _tick() -> None:
state_path = Path("/app/data/cookbook_state.json")
if not state_path.exists():
return
try:
state = json.loads(state_path.read_text(encoding="utf-8"))
except Exception:
return
tasks = state.get("tasks") or []
now_ms = int(time.time() * 1000)
to_stop = []
for t in tasks:
if not isinstance(t, dict):
continue
stop_at = t.get("_scheduledStopAtMs")
if not isinstance(stop_at, (int, float)):
continue
if stop_at > now_ms:
continue
if (t.get("status") or "").lower() in {"stopped", "ended", "killed", "crashed"}:
continue
sid = t.get("sessionId") or t.get("id")
if not sid:
continue
to_stop.append((sid, t.get("remoteHost") or "", t.get("sshPort") or ""))
if not to_stop:
return
# Re-read state once before writing so we capture any updates from
# concurrent UI syncs.
stopped_any = False
for sid, host, port in to_stop:
ok = await _stop_serve(sid, host, port)
logger.info(f"cookbook_serve_lifecycle: stop {sid} (host={host or 'local'}): {'ok' if ok else 'failed'}")
if ok:
stopped_any = True
# Drop the auto-registered endpoint so the model picker and
# the chat router don't keep pointing at a dead server.
for t in tasks:
if isinstance(t, dict) and (t.get("sessionId") == sid or t.get("id") == sid):
if t.get("type") == "serve":
await _delete_endpoint_for_task(t)
t["status"] = "stopped"
t["_scheduledStopAtMs"] = None
t["_lastStatusFlipAt"] = now_ms
break
if stopped_any:
try:
from core.atomic_io import atomic_write_json
state["tasks"] = tasks
atomic_write_json(state_path, state)
except Exception as e:
logger.warning(f"cookbook_serve_lifecycle: state write failed: {e}")
async def cookbook_serve_lifecycle_loop() -> None:
"""Forever-loop. Registered as a startup task in app.py."""
await asyncio.sleep(20) # let the rest of startup settle
while True:
try:
await _tick()
except Exception as e:
logger.warning(f"cookbook_serve_lifecycle tick failed: {e}")
await asyncio.sleep(60)

View File

@@ -1018,6 +1018,10 @@ class TaskScheduler:
kwargs = {"owner": task.owner, "task_name": task.name, "progress_cb": _progress}
if task.action in ("run_script", "run_local", "ssh_command") and task.prompt:
kwargs["script" if task.action in ("run_script", "run_local") else "command"] = task.prompt
# cookbook_serve carries its JSON config in task.prompt — feed it
# through as `command` so action_cookbook_serve can json.loads it.
elif task.action == "cookbook_serve" and task.prompt:
kwargs["command"] = task.prompt
result, success = await action_fn(**kwargs)
return result, success
except TaskNoop: