Merge branch 'main' into dev
Bring main's maintainer-curated work (cookbook scheduler, calendar rendering/sync, settings polish, agent debug loop) into dev so dev is a superset of main (resolves the dev/main drift, #2543).
This commit is contained in:
@@ -337,6 +337,7 @@ If the user asks for a reminder/alarm before the event, pass `reminder_minutes`
|
||||
"ui_control": "- ```ui_control``` — Control the UI: toggle tools on/off, OPEN PANELS, open email reply drafts, switch models, change themes. Commands: `toggle <name> on/off` (names: bash/shell, web/search, research, incognito, document_editor/documents), `open_panel <name>` (panels: documents, gallery, email, sessions, notes, memories/brain, skills, settings, cookbook), `open_email_reply <uid> <folder> <reply|reply-all|ai-reply>` (opens an email compose document, does NOT send), `set_mode agent/chat`, `switch_model <name>`, `set_theme <preset>`, `create_theme <name> <bg> <fg> <panel> <border> <accent>` (optional key=val for advanced colors AND background effects: bgPattern=<none|dots|synapse|rain|constellations|perlin-flow|petals|sparkles|embers>, bgEffectColor=#RRGGBB, bgEffectIntensity=<num>, bgEffectSize=<num>, frosted=true|false). \"open documents\" / \"open library\" / \"show gallery\" / \"open inbox\" / \"open notes\" / \"open cookbook\" all map to `open_panel <name>`. Theme presets: dark, light, midnight, paper, cyberpunk, retrowave, forest, ocean, ume, copper, terminal, organs, lavender, gpt, claude, cute.",
|
||||
"list_served_models": "- ```list_served_models``` — Show what the Cookbook (LLM-serving subsystem) is currently running. NO args. Use this for ANY 'what's running' / 'what's serving' / 'show my cookbook' / 'is anything up' query. DO NOT shell out (`ps aux`, `docker ps`, etc.) — this tool is the source of truth. Failed serve tasks include recent logs plus diagnosis/retry suggestions; use those suggestions to call `serve_model` again with an adjusted command when appropriate.",
|
||||
"stop_served_model": "- ```stop_served_model``` — Stop a running model server. Args (JSON): {\"session_id\": \"<from list_served_models>\"}. Use for 'kill my cookbook' / 'stop the model' / 'shut down vLLM'.",
|
||||
"tail_serve_output": "- ```tail_serve_output``` — Read the actual tmux stderr/traceback of a CURRENTLY failing cookbook task. Args (JSON): {\"session_id\": \"<from list_served_models>\", \"tail\": 150?}. **Use ONLY after** you just launched something via `serve_model` AND `list_served_models` reports YOUR new task as `crashed`/`error`. DO NOT use it on old stopped/completed download tasks (they're historical noise — won't predict whether a new launch succeeds). DO NOT call it before launching a fresh attempt. When you do call it, bump `tail` to 400+ only if the visible error references 'see root cause above'.",
|
||||
"download_model": "- ```download_model``` — Download a HuggingFace model. Args (JSON): {\"repo_id\": \"Qwen/Qwen3-8B\", \"host\": \"user@gpu-box\"?, \"include\": \"*Q4_K_M*\"?}.",
|
||||
"serve_model": "- ```serve_model``` — Start serving a model with vLLM / SGLang / llama.cpp / Ollama / Diffusers. Args (JSON): {\"repo_id\": \"...\", \"cmd\": \"vllm serve ... --port 8000\" or \"python3 -m sglang.launch_server ... --port 30000\" or \"python3 scripts/diffusion_server.py --model diffusers/stable-diffusion-xl-1.0-inpainting-0.1 --port 8100\", \"host\": \"user@gpu-box\"?}. For image/inpaint/diffusion models, use the `scripts/diffusion_server.py` command exactly. After launch, call `list_served_models`; if it returns a diagnosis with an adjusted command, retry with that command.",
|
||||
"list_downloads": "- ```list_downloads``` — Show in-progress HuggingFace model downloads (filters Cookbook tasks/status to downloads only). NO args. Use for 'what's downloading' / 'show my downloads' / 'check download progress'.",
|
||||
@@ -1659,6 +1660,28 @@ async def stream_agent_loop(
|
||||
_tool_type_counts: collections.Counter = collections.Counter()
|
||||
_THINK_RE = re.compile(r'<think>.*?</think>', re.DOTALL | re.IGNORECASE)
|
||||
_force_answer = False # set by loop-breaker → next round runs with NO tools
|
||||
# Supervisor: how many times we've nudged the model after it announced
|
||||
# an action without emitting the tool call. Capped to prevent a model
|
||||
# that *can't* call the tool from looping forever.
|
||||
_intent_nudge_count = 0
|
||||
_MAX_INTENT_NUDGES = 2
|
||||
|
||||
# "I said I would, then didn't" detector. The pattern that breaks debug
|
||||
# loops on weak models (deepseek-v4-flash mid-2026): the model writes
|
||||
# "Let me tail the output to see the error" and then ends the turn with
|
||||
# no tool_calls. The intent is sincere but the function call gets dropped.
|
||||
# Match the common phrasings + an action verb that maps to an available
|
||||
# tool, so we don't nudge on harmless transitional text like "let me
|
||||
# know what you think".
|
||||
_INTENT_RE = re.compile(
|
||||
r"(?:^|\n)\s*(?:let me|i'?ll|i will|going to|let's)\s+"
|
||||
r"(?:tail|check|investigate|look at|see|tail|read|fetch|inspect|"
|
||||
r"verify|diagnose|examine|debug|capture|grab|pull|view|run|call|"
|
||||
r"trigger|launch|start|kick off|stop|kill|restart|adopt|serve|"
|
||||
r"register|adopt|list|search|find|query|hit|ping|test)"
|
||||
r"\b[^.\n]{0,140}",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
# Document streaming state (persists across rounds)
|
||||
_doc_acc = "" # accumulated tool-call JSON arguments
|
||||
@@ -2010,6 +2033,46 @@ async def stream_agent_loop(
|
||||
# never re-verify an unchanged state in a loop.
|
||||
_effectful_used = False
|
||||
continue
|
||||
# ── Intent-without-action supervisor ─────────────────────
|
||||
# Catch "Let me tail the output" / "I'll check the logs" /
|
||||
# "Let me investigate" patterns where the model announces an
|
||||
# action but emits no tool_call. The bug shows up most on
|
||||
# smaller models trained to verbalize plans before acting.
|
||||
# We inject one sharp nudge ("you said you would X — call the
|
||||
# actual tool now") and loop again. Capped at
|
||||
# _MAX_INTENT_NUDGES so a model that genuinely cannot use the
|
||||
# tool doesn't pin us in a forever loop.
|
||||
_intent_text = _THINK_RE.sub("", cleaned_round).strip()
|
||||
_intent_match = _INTENT_RE.search(_intent_text) if _intent_text else None
|
||||
# Only nudge when the round REALLY looks like an unfinished
|
||||
# promise: short response (<400 chars), no fenced code/answer,
|
||||
# and an action-intent phrase was matched. Long answers that
|
||||
# happen to contain "let me know" are not stalls.
|
||||
_looks_like_promise = (
|
||||
_intent_match is not None
|
||||
and len(_intent_text) < 400
|
||||
and "```" not in _intent_text
|
||||
and _intent_nudge_count < _MAX_INTENT_NUDGES
|
||||
)
|
||||
if _looks_like_promise:
|
||||
_intent_nudge_count += 1
|
||||
_matched_phrase = _intent_match.group(0).strip()
|
||||
logger.info(f"[agent] intent-without-action nudge #{_intent_nudge_count} on round {round_num}: {_matched_phrase!r}")
|
||||
messages.append({
|
||||
"role": "system",
|
||||
"content": (
|
||||
f"You just wrote: \"{_matched_phrase}\" — but ended the "
|
||||
"turn without making the actual tool call. The user can "
|
||||
"see you announced the action but didn't run it, which "
|
||||
"is the most frustrating thing you can do. "
|
||||
"DO IT NOW: emit the actual function call this turn. "
|
||||
"If you decided not to do it after all, say so plainly in "
|
||||
"one sentence instead of restating the plan."
|
||||
),
|
||||
})
|
||||
# Visible signal in the stream so the user knows we caught it.
|
||||
yield f'data: {json.dumps({"type": "agent_step", "round": round_num + 1})}\n\n'
|
||||
continue
|
||||
break # no tools — done
|
||||
|
||||
# ── Loop-breaker (Terminus-style stall detector) ──────────────
|
||||
@@ -2285,6 +2348,19 @@ async def stream_agent_loop(
|
||||
_anchor = f"\n\n[Open in Deep Research](#research-{_rsid})\n"
|
||||
yield 'data: ' + json.dumps({"delta": _anchor}) + '\n\n'
|
||||
|
||||
# Same pattern for notes: when manage_notes creates a note
|
||||
# and returns note_id, drop a `[View note](#note-<id>)` link
|
||||
# into the stream so chatRenderer's click handler routes to
|
||||
# the new openNote() in notes.js — opens the notes panel and
|
||||
# scrolls/flashes the matching card. Without this, the agent
|
||||
# would write "View note" as a phrase with no target.
|
||||
_nid = result.get("note_id")
|
||||
if _nid and block.tool_type == "manage_notes":
|
||||
_title = (result.get("note_title") or "").strip()
|
||||
_label = f"View note: {_title}" if _title else "View note"
|
||||
_anchor = f"\n\n[{_label}](#note-{_nid})\n"
|
||||
yield 'data: ' + json.dumps({"delta": _anchor}) + '\n\n'
|
||||
|
||||
# Save for history persistence
|
||||
tool_event = {
|
||||
"round": round_num,
|
||||
|
||||
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants (kept here — sub-modules import from here)
|
||||
# ---------------------------------------------------------------------------
|
||||
MAX_AGENT_ROUNDS = 20
|
||||
MAX_AGENT_ROUNDS = 50
|
||||
SHELL_TIMEOUT = 60
|
||||
PYTHON_TIMEOUT = 30
|
||||
MAX_OUTPUT_CHARS = 10_000
|
||||
|
||||
@@ -2001,6 +2001,197 @@ async def action_check_email_urgency(owner: str, **kwargs) -> Tuple[str, bool]:
|
||||
return str(e), False
|
||||
|
||||
|
||||
async def action_cookbook_serve(
|
||||
owner: str,
|
||||
task_name: str = "",
|
||||
progress_cb=None,
|
||||
command: str = "",
|
||||
**kwargs,
|
||||
) -> Tuple[str, bool]:
|
||||
"""Launch a Cookbook model serve as a scheduled task.
|
||||
|
||||
`command` is the JSON config string the task carries in `prompt`,
|
||||
of shape: {"preset": "name"} OR {"repo_id": "...", "cmd": "...", "host": "..."}.
|
||||
Optional `end_after_min: N` schedules a hard-stop N minutes after launch
|
||||
(handled by cookbook_serve_lifecycle_loop in src/cookbook_serve_lifecycle.py).
|
||||
"""
|
||||
import json
|
||||
import time as _time
|
||||
import httpx
|
||||
from pathlib import Path
|
||||
from core.middleware import INTERNAL_TOOL_HEADER, INTERNAL_TOOL_TOKEN
|
||||
from core.atomic_io import atomic_write_json
|
||||
|
||||
headers = {INTERNAL_TOOL_HEADER: INTERNAL_TOOL_TOKEN}
|
||||
try:
|
||||
cfg = json.loads(command or "{}")
|
||||
except Exception:
|
||||
return f"Invalid JSON config: {command!r}", False
|
||||
if not isinstance(cfg, dict):
|
||||
return "Config must be a JSON object", False
|
||||
|
||||
# Resolve the preset (if named) OR fall through with explicit fields.
|
||||
preset_name = (cfg.get("preset") or "").strip()
|
||||
repo_id = (cfg.get("repo_id") or "").strip()
|
||||
cmd = (cfg.get("cmd") or "").strip()
|
||||
host = (cfg.get("host") or cfg.get("remote_host") or "").strip()
|
||||
try:
|
||||
end_after_min = int(cfg.get("end_after_min") or 0)
|
||||
except Exception:
|
||||
end_after_min = 0
|
||||
|
||||
state_path = Path("/app/data/cookbook_state.json")
|
||||
try:
|
||||
state = json.loads(state_path.read_text(encoding="utf-8")) if state_path.exists() else {}
|
||||
except Exception:
|
||||
state = {}
|
||||
|
||||
# Preset lookup. Try three matching strategies in order so the
|
||||
# schedule still works even when the user's preset is named
|
||||
# differently from the model's short name:
|
||||
#
|
||||
# 1. Exact preset.name == preset_name (case-insensitive)
|
||||
# 2. preset.model / preset.modelId == repo_id (caller knows the repo)
|
||||
# 3. preset.model's short name (after final /) == preset_name
|
||||
#
|
||||
# Without #2 and #3, scheduling "Qwen3.5-397B-A17B-AWQ" failed when
|
||||
# the saved preset was named "vllm-qwen-397b" or had the model field
|
||||
# populated with the full HF repo path. Either should resolve.
|
||||
def _short(name: str) -> str:
|
||||
return (name or "").rsplit("/", 1)[-1].lower()
|
||||
|
||||
if not cmd or not repo_id:
|
||||
presets = state.get("presets") or []
|
||||
chosen = None
|
||||
# Strategy 1: exact name match.
|
||||
if preset_name:
|
||||
chosen = next(
|
||||
(p for p in presets if isinstance(p, dict)
|
||||
and (p.get("name") or "").lower() == preset_name.lower()),
|
||||
None,
|
||||
)
|
||||
# Strategy 2: repo_id matches the preset's model field.
|
||||
if chosen is None and repo_id:
|
||||
chosen = next(
|
||||
(p for p in presets if isinstance(p, dict)
|
||||
and (p.get("model") or p.get("modelId") or "").lower() == repo_id.lower()),
|
||||
None,
|
||||
)
|
||||
# Strategy 3: model's short name matches the preset_name.
|
||||
if chosen is None and preset_name:
|
||||
chosen = next(
|
||||
(p for p in presets if isinstance(p, dict)
|
||||
and _short(p.get("model") or p.get("modelId") or "") == preset_name.lower()),
|
||||
None,
|
||||
)
|
||||
if chosen is not None:
|
||||
repo_id = repo_id or chosen.get("model") or chosen.get("modelId") or ""
|
||||
cmd = cmd or (chosen.get("cmd") or "").strip()
|
||||
host = host or chosen.get("host") or chosen.get("remoteHost") or ""
|
||||
if not repo_id or not cmd or cmd.startswith("(adopted"):
|
||||
# Surface what we tried so the user can name their preset to match.
|
||||
preset_names = [(p.get("name") or "") for p in (state.get("presets") or []) if isinstance(p, dict)]
|
||||
hint = f" Saved presets: {preset_names!r}" if preset_names else ""
|
||||
return (f"No launchable config for {preset_name!r} (repo_id={repo_id!r}). "
|
||||
f"Check Cookbook → Presets has a real cmd, not 'adopted'.{hint}", False)
|
||||
|
||||
# Resolve env_prefix etc. from the host's saved cookbook server entry,
|
||||
# matching the chat agent's serve_model path.
|
||||
body = {"repo_id": repo_id, "cmd": cmd}
|
||||
if host:
|
||||
body["remote_host"] = host
|
||||
env = (state.get("env") or {})
|
||||
srv = next(
|
||||
(s for s in (env.get("servers") or [])
|
||||
if isinstance(s, dict) and (s.get("host") == host or s.get("name") == host)),
|
||||
{},
|
||||
)
|
||||
if srv.get("env") == "venv" and srv.get("envPath"):
|
||||
body["env_prefix"] = f"source {srv['envPath']}/bin/activate"
|
||||
elif srv.get("env") == "conda" and srv.get("envPath"):
|
||||
body["env_prefix"] = f"conda activate {srv['envPath']}"
|
||||
if srv.get("hfToken"): body["hf_token"] = srv["hfToken"]
|
||||
if srv.get("port"): body["ssh_port"] = str(srv["port"])
|
||||
if srv.get("platform"): body["platform"] = srv["platform"]
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
r = await client.post("http://localhost:7000/api/model/serve",
|
||||
json=body, headers=headers)
|
||||
data = r.json() if r.content else {}
|
||||
except Exception as e:
|
||||
return f"Launch HTTP failed: {e}", False
|
||||
if not data.get("ok"):
|
||||
return f"Launch rejected: {data.get('error') or data.get('detail') or 'unknown'}", False
|
||||
|
||||
sid = data.get("session_id") or ""
|
||||
# Register the new task in cookbook_state.json + stamp it with our
|
||||
# scheduler-owner markers. /api/model/serve spawns the tmux session
|
||||
# but leaves the state-write to the UI — when a scheduled action
|
||||
# launches a serve from server-side, NOBODY writes the task into
|
||||
# state, so the Cookbook tab never shows it. We do the write here.
|
||||
if sid:
|
||||
try:
|
||||
# Re-read fresh (the route may have updated state already).
|
||||
try:
|
||||
fresh = json.loads(state_path.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
fresh = {}
|
||||
if not isinstance(fresh, dict):
|
||||
fresh = {}
|
||||
tasks = fresh.get("tasks") if isinstance(fresh.get("tasks"), list) else []
|
||||
existing = next(
|
||||
(t for t in tasks if isinstance(t, dict) and t.get("sessionId") == sid),
|
||||
None,
|
||||
)
|
||||
if existing is None:
|
||||
display_name = repo_id.split("/")[-1] if "/" in repo_id else repo_id
|
||||
placeholder = (
|
||||
f"Launched by scheduled task {task_name!r} — waiting for tmux output…\n"
|
||||
f" session: {sid}\n"
|
||||
f" target: {host or 'local'}\n"
|
||||
f" cmd: {cmd[:200]}{'…' if len(cmd) > 200 else ''}"
|
||||
)
|
||||
existing = {
|
||||
"id": sid,
|
||||
"sessionId": sid,
|
||||
"name": display_name,
|
||||
"modelId": repo_id,
|
||||
"type": "serve",
|
||||
"status": "running",
|
||||
"output": placeholder,
|
||||
"ts": int(_time.time() * 1000),
|
||||
"payload": {"repo_id": repo_id, "remote_host": host or "", "_cmd": cmd},
|
||||
"remoteHost": host or "",
|
||||
"sshPort": "",
|
||||
"platform": "linux",
|
||||
"_serveReady": False,
|
||||
"_endpointAdded": False,
|
||||
}
|
||||
tasks.append(existing)
|
||||
# Stamp ownership + end-at on the task entry.
|
||||
existing["_scheduledByTask"] = task_name or ""
|
||||
existing["_scheduledByOwner"] = owner or ""
|
||||
if end_after_min > 0:
|
||||
existing["_scheduledStopAtMs"] = int(_time.time() * 1000) + end_after_min * 60 * 1000
|
||||
fresh["tasks"] = tasks
|
||||
atomic_write_json(state_path, fresh)
|
||||
except Exception as e:
|
||||
logger.warning(f"cookbook_serve: state register/stamp failed: {e}")
|
||||
# Don't try to render absolute clock time in the message — the
|
||||
# server runs in UTC (Docker default), the user reads it as local,
|
||||
# and the offset depends on the user's TZ which the action doesn't
|
||||
# have a reliable handle on. The Tasks UI already shows the RUN
|
||||
# timestamp in the user's local time right above this message, so
|
||||
# "stops 8 min after that" gives the user everything they need.
|
||||
if end_after_min:
|
||||
return (
|
||||
f"Launched {repo_id} (session {sid}); stops {end_after_min} min after this ran",
|
||||
True,
|
||||
)
|
||||
return f"Launched {repo_id} (session {sid})", True
|
||||
|
||||
|
||||
BUILTIN_ACTIONS = {
|
||||
"tidy_sessions": action_tidy_sessions,
|
||||
"tidy_documents": action_tidy_documents,
|
||||
@@ -2020,6 +2211,7 @@ BUILTIN_ACTIONS = {
|
||||
"test_skills": action_test_skills,
|
||||
"audit_skills": action_audit_skills,
|
||||
"check_email_urgency": action_check_email_urgency,
|
||||
"cookbook_serve": action_cookbook_serve,
|
||||
# ping_notes removed from the registry — runs only inside `_note_pings_loop`.
|
||||
}
|
||||
|
||||
|
||||
193
src/cookbook_serve_lifecycle.py
Normal file
193
src/cookbook_serve_lifecycle.py
Normal file
@@ -0,0 +1,193 @@
|
||||
"""Cookbook serve lifecycle: kills scheduler-owned serves whose end-of-
|
||||
window has passed.
|
||||
|
||||
Pairs with action_cookbook_serve in builtin_actions.py — that action
|
||||
stamps the task it launches with `_scheduledStopAtMs`, this loop ticks
|
||||
every 60s and kills any serve whose stamp is in the past.
|
||||
|
||||
Single small module. Delete this file + the registration line in app.py
|
||||
and the feature stops doing anything; scheduler-launched serves just
|
||||
stay up until the user kills them manually.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _internal_headers() -> dict:
|
||||
from core.middleware import INTERNAL_TOOL_HEADER, INTERNAL_TOOL_TOKEN
|
||||
return {INTERNAL_TOOL_HEADER: INTERNAL_TOOL_TOKEN}
|
||||
|
||||
|
||||
async def _delete_endpoint_for_task(task: dict) -> None:
|
||||
"""Drop the auto-registered model endpoint for a scheduled-stop serve.
|
||||
|
||||
Without this, killing the tmux session leaves the endpoint sitting in
|
||||
the picker (probe goes offline; chats still try to route there) and
|
||||
the user has to delete it by hand in Settings -> Endpoints.
|
||||
"""
|
||||
import re as _re
|
||||
payload = task.get("payload") or {}
|
||||
cmd = str(payload.get("_cmd") or "")
|
||||
remote = task.get("remoteHost") or ""
|
||||
# Build host the same way _auto_register_llm_endpoint does so URL match wins.
|
||||
if remote:
|
||||
host = remote.split("@")[-1] if "@" in remote else remote
|
||||
else:
|
||||
host = "host.docker.internal"
|
||||
port_match = _re.search(r"--port\s+(\d+)", cmd)
|
||||
ollama_host_match = _re.search(r"OLLAMA_HOST=[^\s]*?:(\d+)", cmd)
|
||||
if port_match:
|
||||
port = int(port_match.group(1))
|
||||
elif ollama_host_match:
|
||||
port = int(ollama_host_match.group(1))
|
||||
elif "ollama" in cmd:
|
||||
port = 11434
|
||||
else:
|
||||
port = 8080
|
||||
base_url = f"http://{host}:{port}/v1"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=8) as client:
|
||||
r = await client.get(
|
||||
"http://localhost:7000/api/model-endpoints",
|
||||
headers=_internal_headers(),
|
||||
)
|
||||
if r.status_code >= 400:
|
||||
return
|
||||
eps = r.json() if r.content else []
|
||||
# Prefer exact URL match; fall back to host:port substring so we
|
||||
# still catch the case where 0.0.0.0 vs the registered host
|
||||
# representation diverged.
|
||||
ep = next((e for e in eps if e.get("base_url") == base_url), None)
|
||||
if not ep:
|
||||
hostport = f"{host}:{port}"
|
||||
ep = next((e for e in eps if hostport in (e.get("base_url") or "")), None)
|
||||
if ep:
|
||||
await client.delete(
|
||||
f"http://localhost:7000/api/model-endpoints/{ep['id']}",
|
||||
headers=_internal_headers(),
|
||||
)
|
||||
logger.info(
|
||||
f"cookbook_serve_lifecycle: deleted endpoint {ep.get('id')} "
|
||||
f"({ep.get('base_url')}) after scheduled stop"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"cookbook_serve_lifecycle: endpoint delete failed: {e}")
|
||||
|
||||
|
||||
async def _stop_serve(session_id: str, remote_host: str = "", ssh_port: str = "") -> bool:
|
||||
"""Kill the tmux session that hosts the serve.
|
||||
|
||||
There's no `/api/model/stop` route — the cookbook UI and the chat
|
||||
agent both kill via `/api/shell/exec` running a `tmux kill-session`
|
||||
(wrapped in ssh for remote hosts). Mirror that here so the
|
||||
lifecycle loop can actually stop scheduler-launched serves at
|
||||
window-end. Without this, the action stamped `_scheduledStopAtMs`
|
||||
correctly but every kill attempt failed silently (the route
|
||||
returned 404 and the result was logged as "failed").
|
||||
"""
|
||||
import shlex
|
||||
if remote_host:
|
||||
port_flag = f"-p {shlex.quote(str(ssh_port))} " if ssh_port and str(ssh_port) != "22" else ""
|
||||
cmd = (
|
||||
f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "
|
||||
f"{port_flag}{shlex.quote(remote_host)} "
|
||||
f"'tmux kill-session -t {shlex.quote(session_id)}'"
|
||||
)
|
||||
else:
|
||||
cmd = f"tmux kill-session -t {shlex.quote(session_id)}"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
r = await client.post(
|
||||
"http://localhost:7000/api/shell/exec",
|
||||
json={"command": cmd},
|
||||
headers=_internal_headers(),
|
||||
)
|
||||
if r.status_code >= 400:
|
||||
return False
|
||||
data = r.json() if r.content else {}
|
||||
ec = data.get("exit_code")
|
||||
# tmux returns non-zero when the session is already gone
|
||||
# ("can't find session: ..."). That's still "stop succeeded"
|
||||
# from our POV — the goal is no live session at the end.
|
||||
if ec in (None, 0):
|
||||
return True
|
||||
stderr = (data.get("stderr") or "").lower()
|
||||
return "no server" in stderr or "can't find session" in stderr or "session not found" in stderr
|
||||
except Exception as e:
|
||||
logger.warning(f"cookbook_serve_lifecycle: stop {session_id} failed: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def _tick() -> None:
|
||||
state_path = Path("/app/data/cookbook_state.json")
|
||||
if not state_path.exists():
|
||||
return
|
||||
try:
|
||||
state = json.loads(state_path.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
return
|
||||
tasks = state.get("tasks") or []
|
||||
now_ms = int(time.time() * 1000)
|
||||
to_stop = []
|
||||
for t in tasks:
|
||||
if not isinstance(t, dict):
|
||||
continue
|
||||
stop_at = t.get("_scheduledStopAtMs")
|
||||
if not isinstance(stop_at, (int, float)):
|
||||
continue
|
||||
if stop_at > now_ms:
|
||||
continue
|
||||
if (t.get("status") or "").lower() in {"stopped", "ended", "killed", "crashed"}:
|
||||
continue
|
||||
sid = t.get("sessionId") or t.get("id")
|
||||
if not sid:
|
||||
continue
|
||||
to_stop.append((sid, t.get("remoteHost") or "", t.get("sshPort") or ""))
|
||||
if not to_stop:
|
||||
return
|
||||
# Re-read state once before writing so we capture any updates from
|
||||
# concurrent UI syncs.
|
||||
stopped_any = False
|
||||
for sid, host, port in to_stop:
|
||||
ok = await _stop_serve(sid, host, port)
|
||||
logger.info(f"cookbook_serve_lifecycle: stop {sid} (host={host or 'local'}): {'ok' if ok else 'failed'}")
|
||||
if ok:
|
||||
stopped_any = True
|
||||
# Drop the auto-registered endpoint so the model picker and
|
||||
# the chat router don't keep pointing at a dead server.
|
||||
for t in tasks:
|
||||
if isinstance(t, dict) and (t.get("sessionId") == sid or t.get("id") == sid):
|
||||
if t.get("type") == "serve":
|
||||
await _delete_endpoint_for_task(t)
|
||||
t["status"] = "stopped"
|
||||
t["_scheduledStopAtMs"] = None
|
||||
t["_lastStatusFlipAt"] = now_ms
|
||||
break
|
||||
if stopped_any:
|
||||
try:
|
||||
from core.atomic_io import atomic_write_json
|
||||
state["tasks"] = tasks
|
||||
atomic_write_json(state_path, state)
|
||||
except Exception as e:
|
||||
logger.warning(f"cookbook_serve_lifecycle: state write failed: {e}")
|
||||
|
||||
|
||||
async def cookbook_serve_lifecycle_loop() -> None:
|
||||
"""Forever-loop. Registered as a startup task in app.py."""
|
||||
await asyncio.sleep(20) # let the rest of startup settle
|
||||
while True:
|
||||
try:
|
||||
await _tick()
|
||||
except Exception as e:
|
||||
logger.warning(f"cookbook_serve_lifecycle tick failed: {e}")
|
||||
await asyncio.sleep(60)
|
||||
@@ -1018,6 +1018,10 @@ class TaskScheduler:
|
||||
kwargs = {"owner": task.owner, "task_name": task.name, "progress_cb": _progress}
|
||||
if task.action in ("run_script", "run_local", "ssh_command") and task.prompt:
|
||||
kwargs["script" if task.action in ("run_script", "run_local") else "command"] = task.prompt
|
||||
# cookbook_serve carries its JSON config in task.prompt — feed it
|
||||
# through as `command` so action_cookbook_serve can json.loads it.
|
||||
elif task.action == "cookbook_serve" and task.prompt:
|
||||
kwargs["command"] = task.prompt
|
||||
result, success = await action_fn(**kwargs)
|
||||
return result, success
|
||||
except TaskNoop:
|
||||
|
||||
@@ -1119,6 +1119,7 @@ async def execute_tool_block(
|
||||
do_manage_documents, do_manage_settings, do_manage_notes,
|
||||
do_manage_calendar,
|
||||
do_download_model, do_serve_model, do_list_served_models, do_stop_served_model,
|
||||
do_tail_serve_output,
|
||||
do_list_downloads, do_cancel_download, do_search_hf_models, do_list_cached_models,
|
||||
do_list_serve_presets, do_serve_preset, do_adopt_served_model,
|
||||
do_list_cookbook_servers,
|
||||
@@ -1290,6 +1291,9 @@ async def execute_tool_block(
|
||||
elif tool == "stop_served_model":
|
||||
desc = "stop_served_model"
|
||||
result = await do_stop_served_model(content, owner=owner)
|
||||
elif tool == "tail_serve_output":
|
||||
desc = "tail_serve_output"
|
||||
result = await do_tail_serve_output(content, owner=owner)
|
||||
elif tool == "list_downloads":
|
||||
desc = "list_downloads"
|
||||
result = await do_list_downloads(content, owner=owner)
|
||||
|
||||
@@ -5,6 +5,7 @@ Extracted tool implementation functions (do_* and helpers) from agent_tools.py.
|
||||
These handle the actual execution logic for each tool type.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -1956,7 +1957,19 @@ async def do_manage_notes(content: str, owner: Optional[str] = None) -> Dict:
|
||||
)
|
||||
db.add(note)
|
||||
db.commit()
|
||||
return {"response": f"Note created: \"{title or '(untitled)'}\" (id: {note.id[:8]})", "exit_code": 0}
|
||||
# Return note_id so the chat-side renderer can build a real
|
||||
# "View note" button that opens the notes modal at this id.
|
||||
# Previously the create response only included a prose
|
||||
# confirmation; the model would type "View note" as a markdown
|
||||
# link with no target, leaving the user with a click that
|
||||
# did nothing and uncertainty about whether the note was made.
|
||||
return {
|
||||
"response": f"Note created: \"{title or '(untitled)'}\" (id: {note.id[:8]})",
|
||||
"note_id": note.id,
|
||||
"note_title": title or "",
|
||||
"open_url": f"/#open=notes¬e={note.id}",
|
||||
"exit_code": 0,
|
||||
}
|
||||
|
||||
elif action == "update":
|
||||
note_id = args.get("id", "")
|
||||
@@ -2603,6 +2616,8 @@ async def _cookbook_env_for_host(host: str) -> Dict[str, Any]:
|
||||
|
||||
return {
|
||||
"env_prefix": env_prefix,
|
||||
"env_type": env_kind,
|
||||
"env_path": env_path,
|
||||
"gpus": env_root.get("gpus") or "",
|
||||
"platform": platform,
|
||||
"hf_token": env_root.get("hfToken") or "",
|
||||
@@ -3041,6 +3056,31 @@ async def do_serve_model(content: str, owner: Optional[str] = None) -> Dict:
|
||||
# the UI uses. Without env_prefix, `vllm serve …` lands in a shell
|
||||
# without the user's venv and fails 'command not found'.
|
||||
env_cfg = await _cookbook_env_for_host(host)
|
||||
# Rewrite bare `vllm` / `python3` leading tokens to the venv's absolute
|
||||
# binary path when the target host has a venv configured. SSH non-
|
||||
# interactive shells often leave ~/.local/bin ahead of the venv bin on
|
||||
# PATH even with the venv activated, so `vllm serve` finds the wrong
|
||||
# binary and crashes early (e.g. compute_89 torch ABI errors on an old
|
||||
# user-site torch). This mirrors what static/js/cookbook.js does in
|
||||
# _buildServeCmd for the UI launch path.
|
||||
env_path = (env_cfg.get("env_path") or "").rstrip("/")
|
||||
env_type = (env_cfg.get("env_type") or env_cfg.get("env") or "").lower()
|
||||
if env_type == "venv" and env_path:
|
||||
venv_bin = f"{env_path}/bin"
|
||||
# Match the FIRST shell-token: skip leading KEY=VAL env-var prefixes
|
||||
# (CUDA_VISIBLE_DEVICES=… VLLM_USE_FLASHINFER_SAMPLER=…) before the binary.
|
||||
import re as _re3
|
||||
tokens = cmd.split()
|
||||
idx = 0
|
||||
env_re = _re3.compile(r"^[A-Za-z_][A-Za-z0-9_]*=")
|
||||
while idx < len(tokens) and env_re.match(tokens[idx]):
|
||||
idx += 1
|
||||
if idx < len(tokens):
|
||||
head = tokens[idx]
|
||||
if head in ("vllm", "python3", "python"):
|
||||
tokens[idx] = f"{venv_bin}/{head}"
|
||||
cmd = " ".join(tokens)
|
||||
payload["cmd"] = cmd
|
||||
if env_cfg.get("env_prefix"): payload["env_prefix"] = env_cfg["env_prefix"]
|
||||
if env_cfg.get("gpus"): payload["gpus"] = env_cfg["gpus"]
|
||||
if env_cfg.get("hf_token"): payload["hf_token"] = env_cfg["hf_token"]
|
||||
@@ -3059,7 +3099,19 @@ async def do_serve_model(content: str, owner: Optional[str] = None) -> Dict:
|
||||
)
|
||||
note = "" if registered else " (state-write failed — task may not show in UI)"
|
||||
return {"output": f"Serving {repo_id} (session: {sid}){note}", "session_id": sid, "exit_code": 0}
|
||||
return {"error": data.get("error", "Serve failed"), "exit_code": 1}
|
||||
# FastAPI HTTPException puts the message under `detail`, not `error`.
|
||||
# Surface BOTH so the agent sees "Invalid characters in cmd" (from
|
||||
# _validate_serve_cmd rejecting `&&`/`source`/`cd`) instead of
|
||||
# the generic "Serve failed", which leaves it with nothing to act on.
|
||||
err_msg = data.get("error") or data.get("detail") or "Serve failed"
|
||||
hint = ""
|
||||
if isinstance(err_msg, str) and "cmd" in err_msg.lower():
|
||||
hint = (" — the cmd must START with an allowlisted binary "
|
||||
"(vllm, python3, llama-server, ollama, sglang, lmdeploy, node, npx). "
|
||||
"Do NOT prefix with `cd …`, `source …`, or chain with `&&`. "
|
||||
"env_prefix (e.g. `source ~/qwen35-env/bin/activate`) is added "
|
||||
"automatically from the host's saved venv settings.")
|
||||
return {"error": f"{err_msg}{hint}", "exit_code": 1}
|
||||
except Exception as e:
|
||||
return {"error": str(e), "exit_code": 1}
|
||||
|
||||
@@ -3103,13 +3155,31 @@ async def do_list_served_models(content: str, owner: Optional[str] = None) -> Di
|
||||
"exit_code": 0,
|
||||
}
|
||||
|
||||
# Sort so the agent sees what's actually LIVE first. Stopped/error/
|
||||
# completed tasks are mostly historical noise — they shouldn't lead
|
||||
# the list when something is genuinely serving.
|
||||
_ORDER = {
|
||||
"ready": 0, "running": 1, "loading": 1, "warming": 1,
|
||||
"queued": 2, "starting": 2,
|
||||
"error": 5, "crashed": 5, "failed": 5,
|
||||
"stopped": 6, "killed": 6, "cancelled": 6, "canceled": 6,
|
||||
"done": 7, "completed": 7, "finished": 7,
|
||||
}
|
||||
def _rank(t: Dict[str, Any]) -> int:
|
||||
phase = (t.get("phase") or t.get("status") or "unknown").lower()
|
||||
return _ORDER.get(phase, 3)
|
||||
merged.sort(key=_rank)
|
||||
|
||||
cb_n = len(cookbook_tasks)
|
||||
ext_n = len(external)
|
||||
live_n = sum(1 for t in merged if _rank(t) <= 2)
|
||||
header = []
|
||||
if cb_n:
|
||||
header.append(f"{cb_n} cookbook-tracked")
|
||||
if ext_n:
|
||||
header.append(f"{ext_n} external")
|
||||
if live_n:
|
||||
header.insert(0, f"{live_n} LIVE")
|
||||
lines = [f"Running: {', '.join(header)}."]
|
||||
for t in merged:
|
||||
phase = t.get("phase") or t.get("status", "unknown")
|
||||
@@ -3136,8 +3206,20 @@ async def do_list_served_models(content: str, owner: Optional[str] = None) -> Di
|
||||
if t.get("status") == "error" and t.get("output_tail"):
|
||||
tail = str(t.get("output_tail") or "").strip()
|
||||
if tail:
|
||||
# Prefer a window around a Python traceback if one exists,
|
||||
# falling back to the last 30 lines. The previous 6-line
|
||||
# tail showed only the post-crash bash prompt / neofetch
|
||||
# banner ("Locale: C / Ubuntu_Odysseus ❯") — useless for
|
||||
# diagnosis. The traceback we want is usually 50-200 lines
|
||||
# earlier in the buffer.
|
||||
_tail_lines = tail.splitlines()
|
||||
_shown = _tail_lines[-30:]
|
||||
for _i, _ln in enumerate(_tail_lines):
|
||||
if "Traceback (most recent call last)" in _ln or "ERROR" in _ln or "Error:" in _ln:
|
||||
_shown = _tail_lines[_i:_i + 40]
|
||||
break
|
||||
lines.append(" recent log:")
|
||||
for line in tail.splitlines()[-6:]:
|
||||
for line in _shown:
|
||||
lines.append(f" {line[:220]}")
|
||||
if t.get("external") and t.get("cmdline_preview"):
|
||||
lines.append(f" cmd: {t['cmdline_preview']}")
|
||||
@@ -3243,6 +3325,125 @@ async def do_stop_served_model(content: str, owner: Optional[str] = None) -> Dic
|
||||
)
|
||||
|
||||
|
||||
async def do_tail_serve_output(content: str, owner: Optional[str] = None) -> Dict:
|
||||
"""Capture the last N lines of a cookbook task's tmux pane — remote-aware.
|
||||
|
||||
Used by the agent to debug a failed/stuck serve: list_served_models tells
|
||||
you the task is `crashed`, this tool returns the actual stderr/traceback
|
||||
so the agent can match it against a known fix (compute_89 nvcc mismatch,
|
||||
flashinfer version mismatch, OOM, missing kernels, etc.) and decide
|
||||
whether to relaunch via serve_model with new flags.
|
||||
"""
|
||||
import httpx
|
||||
import shlex
|
||||
try:
|
||||
args = _parse_tool_args(content)
|
||||
except ValueError:
|
||||
return {"error": "Invalid JSON arguments", "exit_code": 1}
|
||||
session_id = (args.get("session_id") or "").strip()
|
||||
if not session_id:
|
||||
return {"error": "session_id is required (from list_served_models)", "exit_code": 1}
|
||||
import re as _re
|
||||
if not _re.fullmatch(r"[a-zA-Z0-9_-]+", session_id):
|
||||
return {"error": "Invalid session_id format", "exit_code": 1}
|
||||
try:
|
||||
tail = int(args.get("tail") or 400)
|
||||
except (TypeError, ValueError):
|
||||
tail = 400
|
||||
tail = max(20, min(tail, 4000))
|
||||
headers = _internal_headers()
|
||||
remote = (args.get("remote_host") or args.get("host") or "").strip()
|
||||
sport = (args.get("ssh_port") or "").strip()
|
||||
# Resolve host from cookbook state if caller didn't pass one — same
|
||||
# lookup _cookbook_kill_session uses.
|
||||
if not remote:
|
||||
state: Dict[str, Any] = {}
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.get(f"{_COOKBOOK_BASE}/api/cookbook/state", headers=headers)
|
||||
state = resp.json() or {}
|
||||
except Exception as e:
|
||||
logger.debug(f"cookbook state lookup failed for {session_id}: {e}")
|
||||
if isinstance(state, dict):
|
||||
for t in (state.get("tasks") or []):
|
||||
if isinstance(t, dict) and (t.get("sessionId") == session_id or t.get("id") == session_id):
|
||||
remote = t.get("remoteHost") or ""
|
||||
if not sport:
|
||||
sport = t.get("sshPort") or ""
|
||||
break
|
||||
# Prefer the persisted /tmp/odysseus-tmux/SESSION.log file over the
|
||||
# live tmux pane. The pane is what the user would see scrolling on
|
||||
# their screen — including the post-crash neofetch banner and the
|
||||
# idle bash prompt that overwrites the actual traceback the moment
|
||||
# vllm exits. The log file is the raw stdout/stderr of the wrapped
|
||||
# process and survives the crash unchanged. We only fall back to
|
||||
# the pane when the log file doesn't exist (older sessions launched
|
||||
# before the tmux+tee wrapper was added).
|
||||
log_path = f"/tmp/odysseus-tmux/{session_id}.log"
|
||||
pane_inner = f"tmux capture-pane -t {shlex.quote(session_id)} -p -S -{tail} 2>/dev/null"
|
||||
file_inner = f"tail -n {tail} {shlex.quote(log_path)} 2>/dev/null"
|
||||
inner = (
|
||||
f"if [ -s {shlex.quote(log_path)} ]; then {file_inner}; "
|
||||
f"else {pane_inner}; fi"
|
||||
)
|
||||
if remote:
|
||||
_pf = f"-p {shlex.quote(str(sport))} " if sport and str(sport) != "22" else ""
|
||||
cmd = (
|
||||
f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no "
|
||||
f"{_pf}{shlex.quote(remote)} {shlex.quote(inner)}"
|
||||
)
|
||||
host_label = remote
|
||||
else:
|
||||
cmd = inner
|
||||
host_label = "local"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=20) as client:
|
||||
resp = await client.post(f"{_COOKBOOK_BASE}/api/shell/exec",
|
||||
json={"command": cmd}, headers=headers)
|
||||
if resp.status_code >= 400:
|
||||
return {"error": f"shell/exec returned HTTP {resp.status_code}: {resp.text[:200]}", "exit_code": 1}
|
||||
data = resp.json() if resp.content else {}
|
||||
output_text = (data.get("stdout") or "").strip()
|
||||
stderr_text = (data.get("stderr") or "").strip()
|
||||
rc = data.get("exit_code")
|
||||
if rc not in (None, 0) and not output_text:
|
||||
already_gone = any(s in (stderr_text or "").lower() for s in ("no server running", "can't find session", "session not found"))
|
||||
if already_gone:
|
||||
return {"output": f"Tmux session {session_id} on {host_label} is gone (task already exited).", "exit_code": 0, "session_id": session_id, "host": host_label}
|
||||
return {"error": f"capture-pane failed on {host_label}: {stderr_text or f'exit {rc}'}", "exit_code": 1}
|
||||
# Dedupe download-progress noise. A 100-shard HF download produces
|
||||
# tens of thousands of `model-NN-of-MM.safetensors: 91%|...` lines
|
||||
# that all look the same to the agent and drown the actual error.
|
||||
# Keep only one sample per (file, decile-percent) bucket.
|
||||
import re as _re2
|
||||
lines = output_text.splitlines()
|
||||
dedup_lines = []
|
||||
seen_progress = set()
|
||||
progress_re = _re2.compile(r"^([\w./\-]+):\s+(\d+)%")
|
||||
for ln in lines:
|
||||
m = progress_re.match(ln.strip())
|
||||
if m:
|
||||
key = (m.group(1), int(m.group(2)) // 10) # bucket by 10%
|
||||
if key in seen_progress:
|
||||
continue
|
||||
seen_progress.add(key)
|
||||
dedup_lines.append(ln)
|
||||
output_text = "\n".join(dedup_lines)
|
||||
# Hard cap so the agent doesn't blow its token budget.
|
||||
MAX_CHARS = 8000
|
||||
if len(output_text) > MAX_CHARS:
|
||||
output_text = "…(earlier output truncated)…\n" + output_text[-MAX_CHARS:]
|
||||
return {
|
||||
"output": output_text or "(empty pane)",
|
||||
"session_id": session_id,
|
||||
"host": host_label,
|
||||
"tail_lines": tail,
|
||||
"exit_code": 0,
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": str(e), "exit_code": 1}
|
||||
|
||||
|
||||
async def do_list_downloads(content: str, owner: Optional[str] = None) -> Dict:
|
||||
"""List in-flight model downloads (filters /api/cookbook/tasks/status to type=download)."""
|
||||
import httpx
|
||||
@@ -3615,38 +3816,133 @@ async def do_serve_preset(content: str, owner: Optional[str] = None) -> Dict:
|
||||
|
||||
|
||||
async def do_list_cached_models(content: str, owner: Optional[str] = None) -> Dict:
|
||||
"""List models already cached locally (or on a remote host)."""
|
||||
"""List models already cached locally and/or on remote hosts.
|
||||
|
||||
With no `host` arg, scans EVERY configured Cookbook server (and local)
|
||||
and aggregates — so the agent sees the full inventory in one call
|
||||
instead of having to query each server individually.
|
||||
"""
|
||||
import httpx
|
||||
try:
|
||||
args = _parse_tool_args(content) if content.strip() else {}
|
||||
except ValueError:
|
||||
return {"error": "Invalid JSON arguments", "exit_code": 1}
|
||||
params: Dict[str, str] = {}
|
||||
raw_host = (args.get("host") or "").strip()
|
||||
host = await _resolve_cookbook_host(raw_host) if raw_host else ""
|
||||
if host:
|
||||
params["host"] = host
|
||||
if args.get("model_dir"):
|
||||
params["model_dir"] = args["model_dir"]
|
||||
if args.get("ssh_port"):
|
||||
params["ssh_port"] = str(args["ssh_port"])
|
||||
if args.get("platform"):
|
||||
params["platform"] = args["platform"]
|
||||
headers = _internal_headers()
|
||||
|
||||
async def _scan_one(host_label: str, host_val: str, ssh_port: str = "",
|
||||
platform: str = "", model_dir: str = "") -> list:
|
||||
"""Hit /api/model/cached for one host; tag each returned model with its source."""
|
||||
p: Dict[str, str] = {}
|
||||
if host_val:
|
||||
p["host"] = host_val
|
||||
# Caller-provided override beats per-server config beats nothing.
|
||||
if args.get("model_dir"):
|
||||
p["model_dir"] = args["model_dir"]
|
||||
elif model_dir:
|
||||
p["model_dir"] = model_dir
|
||||
if ssh_port:
|
||||
p["ssh_port"] = ssh_port
|
||||
elif args.get("ssh_port"):
|
||||
p["ssh_port"] = str(args["ssh_port"])
|
||||
if platform:
|
||||
p["platform"] = platform
|
||||
elif args.get("platform"):
|
||||
p["platform"] = args["platform"]
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=60) as client:
|
||||
resp = await client.get(f"{_COOKBOOK_BASE}/api/model/cached",
|
||||
params=p, headers=headers)
|
||||
data = resp.json()
|
||||
ms = data.get("models", []) if isinstance(data, dict) else (data or [])
|
||||
for m in ms:
|
||||
m["host"] = host_label or "local"
|
||||
return ms or []
|
||||
except Exception as e:
|
||||
logger.debug(f"list_cached_models scan({host_label}) failed: {e}")
|
||||
return []
|
||||
|
||||
# When the caller specifies a host explicitly, scan only that one (old behaviour).
|
||||
# Otherwise iterate every configured server + local so the agent doesn't
|
||||
# have to repeat the call per server.
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=60) as client:
|
||||
resp = await client.get(f"{_COOKBOOK_BASE}/api/model/cached",
|
||||
params=params, headers=_internal_headers())
|
||||
data = resp.json()
|
||||
models = data.get("models", []) if isinstance(data, dict) else data
|
||||
# Pull configured servers from cookbook state (used for resolving
|
||||
# modelDirs both when caller specifies a host and when we scan all).
|
||||
servers: list = []
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
st = await client.get(f"{_COOKBOOK_BASE}/api/cookbook/state", headers=headers)
|
||||
st_data = st.json() if st.headers.get("content-type", "").startswith("application/json") else {}
|
||||
servers = (st_data.get("env", {}) or {}).get("servers") or []
|
||||
except Exception as e:
|
||||
logger.debug(f"server list fetch failed: {e}")
|
||||
st_data = {}
|
||||
|
||||
def _dirs_for(server_record: Dict[str, Any]) -> str:
|
||||
"""Comma-joined modelDirs from a saved server record (Settings).
|
||||
|
||||
Filters out the HF cache (~/.cache/huggingface/hub) — the backend
|
||||
scan script always scans it by default, so re-passing it as an
|
||||
extra model_dir is redundant AND confuses some path-handling
|
||||
edge cases where the extra dir suppresses the deeper scan.
|
||||
We only need to forward the NON-default dirs (e.g. /mnt/HADES/models).
|
||||
"""
|
||||
mds = server_record.get("modelDirs") if isinstance(server_record, dict) else None
|
||||
HF_DEFAULTS = {"~/.cache/huggingface/hub", "~/.cache/huggingface"}
|
||||
if isinstance(mds, list):
|
||||
extras = [d for d in mds if isinstance(d, str) and d.strip() and d.strip() not in HF_DEFAULTS]
|
||||
return ",".join(extras)
|
||||
if isinstance(mds, str) and mds.strip() not in HF_DEFAULTS:
|
||||
return mds
|
||||
return ""
|
||||
|
||||
if raw_host:
|
||||
host = await _resolve_cookbook_host(raw_host)
|
||||
# Find this host's saved record so its modelDirs apply too.
|
||||
srv = next(
|
||||
(s for s in servers if isinstance(s, dict)
|
||||
and (s.get("name") == raw_host or s.get("host") == host or s.get("host") == raw_host)),
|
||||
{},
|
||||
)
|
||||
models = await _scan_one(raw_host, host, model_dir=_dirs_for(srv))
|
||||
else:
|
||||
# Always include local. Local's saved record is the one with no host.
|
||||
local_srv = next((s for s in servers if isinstance(s, dict) and not (s.get("host") or "").strip()), {})
|
||||
scans: list = [_scan_one("local", "", model_dir=_dirs_for(local_srv))]
|
||||
for s in servers:
|
||||
if not isinstance(s, dict):
|
||||
continue
|
||||
name = s.get("name") or s.get("host")
|
||||
host_val = s.get("host") or ""
|
||||
if not host_val:
|
||||
continue
|
||||
scans.append(_scan_one(
|
||||
name,
|
||||
host_val,
|
||||
ssh_port=str(s.get("port") or ""),
|
||||
platform=s.get("platform") or "",
|
||||
model_dir=_dirs_for(s),
|
||||
))
|
||||
results = await asyncio.gather(*scans, return_exceptions=False)
|
||||
# Dedupe by (host, repo_id) — same model could appear in both HF cache + Ollama list.
|
||||
seen = set()
|
||||
models: list = []
|
||||
for batch in results:
|
||||
for m in batch:
|
||||
key = (m.get("host", ""), m.get("repo_id", ""))
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
models.append(m)
|
||||
if not models:
|
||||
# Filesystem cache scans can miss models downloaded into the HF
|
||||
# default cache when the server has no explicit model_dir configured.
|
||||
# Still surface completed Cookbook downloads so the agent doesn't
|
||||
# incorrectly assume a model is absent and re-download it.
|
||||
# Cache scans can miss models downloaded into the HF default cache
|
||||
# when the server has no explicit model_dir configured. Surface
|
||||
# completed Cookbook download tasks so the agent doesn't conclude
|
||||
# a model is absent and re-download it.
|
||||
downloaded = []
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
st = await client.get(f"{_COOKBOOK_BASE}/api/cookbook/state", headers=_internal_headers())
|
||||
st = await client.get(f"{_COOKBOOK_BASE}/api/cookbook/state", headers=headers)
|
||||
state = st.json() if st.headers.get("content-type", "").startswith("application/json") else {}
|
||||
for t in (state.get("tasks") or []):
|
||||
if not isinstance(t, dict) or t.get("type") != "download":
|
||||
@@ -3654,27 +3950,44 @@ async def do_list_cached_models(content: str, owner: Optional[str] = None) -> Di
|
||||
if (t.get("status") or "").lower() not in {"done", "completed"}:
|
||||
continue
|
||||
task_host = t.get("remoteHost") or (t.get("payload") or {}).get("remote_host") or ""
|
||||
if host and task_host != host:
|
||||
if raw_host and task_host != raw_host:
|
||||
continue
|
||||
repo = t.get("modelId") or t.get("repoId") or (t.get("payload") or {}).get("repo_id") or t.get("name")
|
||||
if repo and repo not in downloaded:
|
||||
downloaded.append(repo)
|
||||
except Exception:
|
||||
downloaded = []
|
||||
host_str = f" on {raw_host}" if raw_host else ""
|
||||
if downloaded:
|
||||
host_str = f" on {raw_host or host}" if (raw_host or host) else ""
|
||||
lines = [f"No cache paths were detected{host_str}, but Cookbook has completed download task(s):"]
|
||||
lines.extend(f"- {repo} — downloaded via Cookbook task" for repo in downloaded)
|
||||
return {"output": "\n".join(lines), "models": [{"repo_id": repo, "source": "cookbook_task"} for repo in downloaded], "exit_code": 0}
|
||||
host_str = f" on {raw_host or host}" if (raw_host or host) else ""
|
||||
return {"output": f"No cached models found{host_str}.", "exit_code": 0}
|
||||
lines = [f"{len(models)} cached model(s):"]
|
||||
for m in models:
|
||||
name = m.get("repo_id", "?")
|
||||
sz = m.get("size") or (f"{m.get('size_bytes', 0) / (1024**3):.1f}GB" if m.get("size_bytes") else "")
|
||||
inc = " (incomplete)" if m.get("has_incomplete") else ""
|
||||
kind = " [diffusion]" if m.get("is_diffusion") else ""
|
||||
lines.append(f"- {name}{kind} — {sz}{inc}")
|
||||
# Multi-host scan: group by host so the agent sees inventory per server.
|
||||
# Single-host scan: flat list (matches old output shape).
|
||||
if raw_host:
|
||||
lines = [f"{len(models)} cached model(s) on {raw_host}:"]
|
||||
for m in models:
|
||||
name = m.get("repo_id", "?")
|
||||
sz = m.get("size") or (f"{m.get('size_bytes', 0) / (1024**3):.1f}GB" if m.get("size_bytes") else "")
|
||||
inc = " (incomplete)" if m.get("has_incomplete") else ""
|
||||
kind = " [diffusion]" if m.get("is_diffusion") else ""
|
||||
lines.append(f"- {name}{kind} — {sz}{inc}")
|
||||
else:
|
||||
from collections import defaultdict as _dd
|
||||
by_host = _dd(list)
|
||||
for m in models:
|
||||
by_host[m.get("host", "local")].append(m)
|
||||
lines = [f"{len(models)} cached model(s) across {len(by_host)} server(s):"]
|
||||
for host_name in sorted(by_host.keys()):
|
||||
lines.append(f"\n[{host_name}]")
|
||||
for m in by_host[host_name]:
|
||||
name = m.get("repo_id", "?")
|
||||
sz = m.get("size") or (f"{m.get('size_bytes', 0) / (1024**3):.1f}GB" if m.get("size_bytes") else "")
|
||||
inc = " (incomplete)" if m.get("has_incomplete") else ""
|
||||
kind = " [diffusion]" if m.get("is_diffusion") else ""
|
||||
backend = f" ({m.get('backend')})" if m.get("backend") else ""
|
||||
lines.append(f"- {name}{kind}{backend} — {sz}{inc}")
|
||||
return {"output": "\n".join(lines), "models": models, "exit_code": 0}
|
||||
except Exception as e:
|
||||
return {"error": str(e), "exit_code": 1}
|
||||
|
||||
@@ -37,7 +37,15 @@ ALWAYS_AVAILABLE = frozenset({
|
||||
# keyword hints when the user is actually talking about cookbook.
|
||||
# Keeping the always-on set small leaves room in the ~16-tool
|
||||
# budget for manage_tasks / manage_calendar / etc.
|
||||
"list_served_models", "stop_served_model",
|
||||
"list_served_models", "stop_served_model", "tail_serve_output",
|
||||
# Serving is a core agent capability — keep these always available so
|
||||
# the router doesn't lose them on phrasings like "servic" / "fire up" / "boot".
|
||||
"serve_model", "serve_preset", "list_serve_presets",
|
||||
"list_cached_models", "list_cookbook_servers",
|
||||
# Fallback when serve_model's allowlist rejects a cmd or when the
|
||||
# model was launched out-of-band via bash+tmux — without this the
|
||||
# session is invisible to the cookbook UI even though it's running.
|
||||
"adopt_served_model",
|
||||
# Generic API loopback — the catch-all when no named tool fits.
|
||||
"app_api",
|
||||
# Memory is ambient — "remember this" can follow any message regardless
|
||||
@@ -118,9 +126,10 @@ BUILTIN_TOOL_DESCRIPTIONS: Dict[str, str] = {
|
||||
"manage_notes": "Create and manage notes and checklists (Google Keep-style). ALWAYS use this for note/todo/checklist/reminder creation — NEVER hit /api/notes via app_api. Accepts natural-language `due_date` like 'tomorrow at 9am' or '11pm today' (parsed in the USER'S timezone). The due_date IS the reminder — it fires a notification at that time, so do NOT also create a calendar event for the same reminder. Set colors, labels, pin, archive. Do NOT use manage_memory for note content.",
|
||||
"manage_calendar": "Calendar event management: list, create, update, delete. Each event can carry a tag/category (event_type — work/personal/health/travel/meal/social/admin/other) and importance (low/normal/high/critical). Resolve today/tomorrow using the Current date and time context, then use ISO datetimes in the user's local wall time; supports all-day events. For event reminders/alarms, pass reminder_minutes; this creates the Notes reminder, so do not also call manage_notes for the same reminder.",
|
||||
"download_model": "Download a HuggingFace model to a local or remote server. Specify repo_id (e.g. 'Qwen/Qwen3-8B'), optional server host, and optional include filter for specific files.",
|
||||
"serve_model": "Start serving a model with vLLM, SGLang, llama.cpp, Ollama, or Diffusers. For image/inpainting/diffusion use python3 scripts/diffusion_server.py --model <repo> --port 8100. After launch, call list_served_models for readiness/errors and retry suggestions.",
|
||||
"serve_model": "Start serving a model with vLLM, SGLang, llama.cpp, Ollama, or Diffusers. cmd MUST start with the binary directly — e.g. `vllm serve /mnt/HADES/models/Qwen3.5-397B-A17B-AWQ --port 8003 --tensor-parallel-size 8 …`. NEVER prefix with `cd …`, `source …`, or chain with `&&`/`||` — those get rejected by the validator. The venv activation (env_prefix) and CUDA env are added automatically from the target host's saved settings. For image/inpainting/diffusion use python3 scripts/diffusion_server.py --model <repo> --port 8100. After launch, call list_served_models for readiness/errors and retry suggestions. If serve_model fails with 'Invalid characters in cmd', simplify to the bare binary + args.",
|
||||
"list_served_models": "List currently running model servers in the Cookbook — shows status (loading, ready, idle, error), model name, port, throughput, and serve failure diagnosis/retry suggestions. Use when the user asks 'what's running', 'show my cookbook', 'which models are up', 'what's serving'.",
|
||||
"stop_served_model": "Stop a running model server in the Cookbook by session ID or model name. Use when the user says 'kill my cookbook', 'stop the model', 'kill the serve', 'shut down vLLM', 'cancel the running model'.",
|
||||
"tail_serve_output": "Read the actual tmux stderr/traceback of a cookbook serve/download task. Use to debug WHY a task is `crashed`/`error` (compute_89 nvcc mismatch, OOM, missing kernels, wrong attention backend, etc.) so you can call serve_model with adjusted flags. Pass session_id from list_served_models; tail defaults to 300, bump if the error references 'see root cause above'.",
|
||||
"list_downloads": "List in-progress HuggingFace model downloads in the Cookbook. Shows model name, phase, percent, session ID. Use for 'what's downloading', 'show my downloads', 'check download progress'.",
|
||||
"cancel_download": "Cancel an in-progress model download by tmux session ID. Use for 'cancel the download', 'stop downloading X', 'kill the download'. Call list_downloads first to get the session_id.",
|
||||
"search_hf_models": "Search HuggingFace for models matching a query (e.g. 'qwen 8B', 'flux', 'llama-3 instruct'). Returns ranked repo IDs with sizes and download counts. Use for 'find a model', 'search huggingface for X', 'what models are there for Y'.",
|
||||
|
||||
@@ -787,6 +787,21 @@ FUNCTION_TOOL_SCHEMAS = [
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "tail_serve_output",
|
||||
"description": "Read the last N lines of a cookbook serve/download task's tmux pane. Use ONLY in this exact sequence: (1) the user asked to serve a model, (2) you launched it via serve_model, (3) list_served_models reports the NEW task as crashed/error, (4) call tail_serve_output on the new sessionId to find the root cause, (5) call serve_model again with adjusted flags. DO NOT call this on old stopped/completed download tasks — they are historical and won't tell you anything about the current attempt. DO NOT investigate past failures before launching; the environment may have changed since.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"session_id": {"type": "string", "description": "Tmux session id from list_served_models (e.g. 'serve-abc12345', 'cookbook-a1b2c3d4')."},
|
||||
"tail": {"type": "integer", "description": "How many lines of pane scrollback to fetch (default 300, max 4000). Bump this if the error in the visible tail references an earlier line ('see root cause above')."},
|
||||
},
|
||||
"required": ["session_id"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
|
||||
Reference in New Issue
Block a user