2330 lines
120 KiB
Python
2330 lines
120 KiB
Python
"""Cookbook routes — model download, serve, cache scanning, and cookbook state sync."""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import shlex
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import uuid
|
||
from pathlib import Path
|
||
|
||
from fastapi import APIRouter, HTTPException, Request, Depends
|
||
|
||
from src.auth_helpers import require_user
|
||
from pydantic import BaseModel
|
||
|
||
from core.middleware import require_admin
|
||
from core.platform_compat import (
|
||
IS_WINDOWS,
|
||
detached_popen_kwargs,
|
||
find_bash,
|
||
kill_process_tree,
|
||
pid_alive,
|
||
safe_chmod,
|
||
which_tool,
|
||
)
|
||
from routes.shell_routes import TMUX_LOG_DIR
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
from routes.cookbook_helpers import (
|
||
_SSH_PORT_RE, _REMOTE_HOST_RE, _SESSION_ID_RE,
|
||
_validate_repo_id, _validate_serve_model_id, _validate_include, _validate_remote_host, _validate_token,
|
||
_validate_local_dir, _validate_ssh_port, _validate_gpus, _shell_path,
|
||
_ps_squote, _bash_squote, _validate_serve_cmd, _parse_serve_phase,
|
||
_safe_env_prefix, _local_tooling_path_export, _append_serve_preflight_exit_lines,
|
||
_append_serve_exit_code_lines, _append_llama_cpp_linux_accel_build_lines, _cached_model_scan_script,
|
||
_append_vllm_linux_preflight_lines, _ollama_bind_from_cmd, _pip_install_fallback_chain,
|
||
_pip_install_no_cache, _user_shell_path_bootstrap, _venv_safe_local_pip_install_cmd,
|
||
_diagnose_serve_output,
|
||
ModelDownloadRequest, ServeRequest,
|
||
)
|
||
|
||
_HF_TOKEN_STATUS_SNIPPET = (
|
||
'if [ -n "$HF_TOKEN" ]; then '
|
||
'echo "[odysseus] HF token: applied"; '
|
||
'else '
|
||
'echo "[odysseus] HF token: NOT SET — gated/private models will be denied. '
|
||
'Add one in Odysseus Settings -> Cookbook -> HuggingFace Token."; '
|
||
'fi'
|
||
)
|
||
|
||
def setup_cookbook_routes() -> APIRouter:
|
||
router = APIRouter(tags=["cookbook"])
|
||
_cookbook_state_path = Path(os.environ.get("DATA_DIR", "data")) / "cookbook_state.json"
|
||
|
||
def _mask_secret(value: str) -> str:
|
||
if not value:
|
||
return ""
|
||
if len(value) <= 8:
|
||
return "stored"
|
||
return f"{value[:4]}...{value[-4:]}"
|
||
|
||
def _decrypt_secret(value: str | None) -> str:
|
||
if not value:
|
||
return ""
|
||
from src.secret_storage import decrypt
|
||
return decrypt(value)
|
||
|
||
def _encrypt_secret(value: str) -> str:
|
||
from src.secret_storage import encrypt
|
||
return encrypt(value)
|
||
|
||
def _strip_task_secrets(state):
|
||
tasks = state.get("tasks") if isinstance(state, dict) else None
|
||
if isinstance(tasks, list):
|
||
for task in tasks:
|
||
if isinstance(task, dict) and isinstance(task.get("payload"), dict):
|
||
task["payload"].pop("hf_token", None)
|
||
return state
|
||
|
||
def _state_for_client(state):
|
||
"""Return cookbook state without raw secrets for browser clients."""
|
||
_strip_task_secrets(state)
|
||
env = state.get("env") if isinstance(state, dict) else None
|
||
if isinstance(env, dict):
|
||
token = _decrypt_secret(env.get("hfToken"))
|
||
env.pop("hfToken", None)
|
||
env["hfTokenConfigured"] = bool(token)
|
||
env["hfTokenMasked"] = _mask_secret(token)
|
||
return state
|
||
|
||
def _state_for_storage(state, on_disk=None):
|
||
"""Encrypt cookbook secrets before writing state to disk."""
|
||
_strip_task_secrets(state)
|
||
env = state.get("env") if isinstance(state, dict) else None
|
||
disk_env = on_disk.get("env") if isinstance(on_disk, dict) and isinstance(on_disk.get("env"), dict) else {}
|
||
if isinstance(env, dict):
|
||
incoming = env.get("hfToken")
|
||
if incoming:
|
||
_validate_token(incoming)
|
||
env["hfToken"] = _encrypt_secret(incoming)
|
||
elif disk_env.get("hfToken"):
|
||
env["hfToken"] = disk_env["hfToken"]
|
||
else:
|
||
env.pop("hfToken", None)
|
||
env.pop("hfTokenMasked", None)
|
||
env.pop("hfTokenConfigured", None)
|
||
return state
|
||
|
||
def _load_stored_hf_token() -> str:
|
||
if not _cookbook_state_path.exists():
|
||
return ""
|
||
try:
|
||
state = json.loads(_cookbook_state_path.read_text(encoding="utf-8"))
|
||
env = state.get("env") if isinstance(state, dict) else {}
|
||
return _decrypt_secret(env.get("hfToken") if isinstance(env, dict) else "")
|
||
except Exception:
|
||
return ""
|
||
|
||
def _cookbook_ssh_dir() -> Path:
|
||
# The Docker image keeps cookbook keys under /app/.ssh; that path only
|
||
# exists inside the container. On Windows (and any non-container host)
|
||
# fall back to the user profile's ~/.ssh, which OpenSSH on Win10+ uses.
|
||
if not IS_WINDOWS:
|
||
app_ssh = Path("/app/.ssh")
|
||
if Path("/app").exists():
|
||
return app_ssh
|
||
return Path.home() / ".ssh"
|
||
|
||
def _cookbook_ssh_key_path() -> Path:
|
||
return _cookbook_ssh_dir() / "id_ed25519"
|
||
|
||
def _read_cookbook_public_key() -> str:
|
||
pub = _cookbook_ssh_key_path().with_suffix(".pub")
|
||
if not pub.exists():
|
||
return ""
|
||
return pub.read_text(encoding="utf-8", errors="replace").strip()
|
||
|
||
@router.get("/api/cookbook/ssh-key")
|
||
async def get_cookbook_ssh_key(request: Request):
|
||
require_admin(request)
|
||
public_key = _read_cookbook_public_key()
|
||
return {
|
||
"configured": bool(public_key),
|
||
"public_key": public_key,
|
||
}
|
||
|
||
@router.post("/api/cookbook/ssh-key")
|
||
async def generate_cookbook_ssh_key(request: Request):
|
||
require_admin(request)
|
||
ssh_dir = _cookbook_ssh_dir()
|
||
key_path = _cookbook_ssh_key_path()
|
||
ssh_dir.mkdir(parents=True, exist_ok=True)
|
||
# safe_chmod no-ops on Windows (~/.ssh is already ACL-restricted to the
|
||
# user profile); applies 0o700 on POSIX.
|
||
safe_chmod(ssh_dir, 0o700)
|
||
if not key_path.exists():
|
||
# ssh-keygen ships with the OpenSSH client on Win10+; resolve it via
|
||
# which_tool so the .exe is found even when PATHEXT is unusual.
|
||
ssh_keygen = which_tool("ssh-keygen") or "ssh-keygen"
|
||
proc = await asyncio.create_subprocess_exec(
|
||
ssh_keygen, "-t", "ed25519", "-N", "", "-C", "odysseus-cookbook", "-f", str(key_path),
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
stdout, stderr = await proc.communicate()
|
||
if proc.returncode != 0:
|
||
detail = (stderr or stdout).decode("utf-8", errors="replace").strip()[-500:]
|
||
return {"ok": False, "error": detail or "Failed to generate SSH key"}
|
||
safe_chmod(key_path, 0o600)
|
||
safe_chmod(key_path.with_suffix(".pub"), 0o644)
|
||
return {"ok": True, "public_key": _read_cookbook_public_key()}
|
||
|
||
def _needs_binary(cmd: str, binary: str) -> bool:
|
||
return bool(re.search(rf"(^|[\s;&|()]){re.escape(binary)}($|[\s;&|()])", cmd or ""))
|
||
|
||
def _missing_binary_message(binary: str, target: str) -> str:
|
||
if binary == "tmux":
|
||
return (
|
||
f"tmux is required for Cookbook background downloads/serves on {target}. "
|
||
"Install it with your OS package manager, or run Cookbook server setup for that server."
|
||
)
|
||
if binary == "docker":
|
||
return (
|
||
f"Docker is required by this Cookbook launch command on {target}, but the docker CLI was not found. "
|
||
"Install Docker and make sure this user can run `docker`, then retry."
|
||
)
|
||
return f"{binary} is required on {target}, but it was not found."
|
||
|
||
async def _remote_binary_available(remote: str, ssh_port: str | None, binary: str, *, windows: bool = False) -> bool:
|
||
_port = ssh_port or ""
|
||
_pf = ["-p", _port] if _port and _port != "22" else []
|
||
if windows:
|
||
check = f"powershell -NoProfile -Command \"if (Get-Command {binary} -ErrorAction SilentlyContinue) {{ exit 0 }} else {{ exit 127 }}\""
|
||
else:
|
||
check = f"command -v {shlex.quote(binary)} >/dev/null 2>&1"
|
||
try:
|
||
proc = await asyncio.create_subprocess_exec(
|
||
"ssh", "-o", "ConnectTimeout=6", "-o", "StrictHostKeyChecking=no",
|
||
*_pf, remote, check,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
await asyncio.wait_for(proc.communicate(), timeout=10)
|
||
return proc.returncode == 0
|
||
except Exception:
|
||
return False
|
||
|
||
async def _binary_available(binary: str, remote: str | None, ssh_port: str | None, *, windows: bool = False) -> bool:
|
||
if remote:
|
||
return await _remote_binary_available(remote, ssh_port, binary, windows=windows)
|
||
return shutil.which(binary) is not None
|
||
|
||
def _launch_local_detached(session_id: str, bash_lines: list[str]) -> dict:
|
||
"""Windows-native stand-in for a LOCAL tmux session (tmux doesn't exist
|
||
on Windows). Mirrors shell_routes._generate_win_detached / bg_jobs.launch:
|
||
runs the wrapper detached so it survives a browser/SSE disconnect (the
|
||
whole point of the tmux feature for long downloads/serves), writing a
|
||
<session>.log the status poller tails and a <session>.pid for liveness.
|
||
|
||
`bash_lines` is the same bash wrapper used on POSIX. Prefers Git Bash
|
||
for full command-syntax parity; falls back to a cmd.exe wrapper that
|
||
runs the script through whatever bash is reachable, else best-effort
|
||
directly (simple commands only). Returns the launched job record."""
|
||
log_path = TMUX_LOG_DIR / f"{session_id}.log"
|
||
pid_path = TMUX_LOG_DIR / f"{session_id}.pid"
|
||
bash = find_bash()
|
||
if bash:
|
||
# Run the existing bash wrapper verbatim through Git Bash, redirecting
|
||
# all output to the log the poller reads. Paths handed to bash use
|
||
# POSIX form + shell-quoting so drive paths / spaces survive.
|
||
inner = TMUX_LOG_DIR / f"{session_id}_run.sh"
|
||
inner.write_text("\n".join(bash_lines) + "\n", encoding="utf-8")
|
||
lp = shlex.quote(log_path.as_posix())
|
||
ip = shlex.quote(inner.as_posix())
|
||
script_path = TMUX_LOG_DIR / f"{session_id}.sh"
|
||
script_path.write_text(
|
||
f"bash {ip} > {lp} 2>&1\n",
|
||
encoding="utf-8",
|
||
)
|
||
argv = [bash, str(script_path)]
|
||
else:
|
||
# No bash on this Windows host: the bash wrapper can't run. Fall back
|
||
# to a cmd.exe wrapper that just records a clear error to the log so
|
||
# the UI surfaces "install Git Bash" instead of silently hanging.
|
||
script_path = TMUX_LOG_DIR / f"{session_id}.cmd"
|
||
script_path.write_text(
|
||
"@echo off\r\n"
|
||
f'echo Cookbook LOCAL execution on Windows needs Git Bash ^(bash.exe^) on PATH. > "{log_path}" 2>&1\r\n'
|
||
f'echo Install Git for Windows, then retry. >> "{log_path}"\r\n',
|
||
encoding="utf-8",
|
||
)
|
||
argv = [os.environ.get("ComSpec", "cmd.exe"), "/c", str(script_path)]
|
||
env = os.environ.copy()
|
||
env["PYTHONUTF8"] = "1"
|
||
env["PYTHONIOENCODING"] = "utf-8"
|
||
proc = subprocess.Popen(
|
||
argv,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
stdin=subprocess.DEVNULL,
|
||
env=env,
|
||
**detached_popen_kwargs(),
|
||
)
|
||
pid_path.write_text(str(proc.pid), encoding="utf-8")
|
||
return {"pid": proc.pid, "log_path": str(log_path)}
|
||
|
||
@router.post("/api/model/download")
|
||
async def model_download(request: Request, req: ModelDownloadRequest):
|
||
"""Download a HuggingFace model in a tmux session.
|
||
Uses `hf download` CLI directly — runs in tmux via `script -qc`
|
||
for real TTY progress, streams ANSI-stripped output via log file."""
|
||
require_admin(request)
|
||
# Defence-in-depth: even though this endpoint is admin-gated, refuse
|
||
# values that would land in shell contexts with metacharacters.
|
||
_validate_repo_id(req.repo_id)
|
||
_validate_include(req.include)
|
||
_validate_remote_host(req.remote_host)
|
||
req.ssh_port = _validate_ssh_port(req.ssh_port)
|
||
req.local_dir = _validate_local_dir(req.local_dir)
|
||
req.hf_token = req.hf_token or _load_stored_hf_token()
|
||
_validate_token(req.hf_token)
|
||
TMUX_LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||
session_id = f"cookbook-{uuid.uuid4().hex[:8]}"
|
||
wrapper_script = TMUX_LOG_DIR / f"{session_id}.sh"
|
||
|
||
# When a download directory is set, target a per-model subfolder under it
|
||
# (<dir>/<name>) so the flat-directory cache scan lists it as its own
|
||
# model. Without it, hf/snapshot_download falls back to the HF cache.
|
||
_dl_short = req.repo_id.split("/")[-1] if "/" in req.repo_id else req.repo_id
|
||
_dl_base = (req.local_dir.rstrip("/") + "/" + _dl_short) if req.local_dir else None
|
||
_dl_shell = _shell_path(_dl_base) if _dl_base else None # for hf CLI / bash
|
||
_dl_pyarg = (", local_dir=os.path.expanduser(" + repr(_dl_base) + ")") if _dl_base else ""
|
||
|
||
# Build the hf download command. Redirection to suppress the interactive
|
||
# "update available? [Y/n]" prompt is added per-platform further down
|
||
# (< /dev/null on bash, $null | on PowerShell).
|
||
hf_cmd = f"hf download {req.repo_id}"
|
||
if req.include:
|
||
hf_cmd += f" --include '{req.include}'"
|
||
if _dl_shell:
|
||
hf_cmd += f" --local-dir {_dl_shell}"
|
||
|
||
# Build the shell wrapper — runs hf download directly in tmux (which is a TTY)
|
||
# No script/tee needed — we'll use tmux capture-pane to read output
|
||
lines = ["#!/bin/bash"]
|
||
lines.extend(_user_shell_path_bootstrap())
|
||
if req.hf_token:
|
||
lines.append(f"export HF_TOKEN='{_bash_squote(req.hf_token)}'")
|
||
# Ensure pip-user scripts (e.g. hf CLI installed via --user) are on PATH
|
||
lines.append('export PATH="$HOME/.local/bin:$PATH"')
|
||
# When Odysseus runs from a venv (e.g. native macOS install), put its bin
|
||
# on PATH so the tmux shell finds the bundled `hf`/`python3` without an
|
||
# activated venv. Local bash runs only — meaningless over SSH.
|
||
if not req.remote_host:
|
||
lines.append(_local_tooling_path_export(sys.executable))
|
||
# Best-effort install hf CLI (always). hf_transfer (Rust parallel downloader)
|
||
# is fast but flaky on large files — it tends to crash near the end at high
|
||
# throughput. Retries set disable_hf_transfer to fall back to the plain,
|
||
# slower-but-reliable downloader (resumes cleanly from the .incomplete files).
|
||
# Use `python3 -m pip` not `pip` — macOS has no bare `pip` command.
|
||
lines.append(f"command -v hf >/dev/null 2>&1 || {_pip_install_fallback_chain('huggingface_hub', upgrade=True)}")
|
||
if req.disable_hf_transfer:
|
||
lines.append("export HF_HUB_ENABLE_HF_TRANSFER=0")
|
||
lines.append("export HF_HUB_DOWNLOAD_MAX_WORKERS=4")
|
||
else:
|
||
lines.append(f"python3 -c 'import hf_transfer' 2>/dev/null || {_pip_install_fallback_chain('hf_transfer')}")
|
||
lines.append("python3 -c 'import hf_transfer' 2>/dev/null && export HF_HUB_ENABLE_HF_TRANSFER=1")
|
||
lines.append("export HF_HUB_DOWNLOAD_MAX_WORKERS=8")
|
||
|
||
remote = req.remote_host # None for local
|
||
is_windows = req.platform == "windows"
|
||
# LOCAL execution on a native-Windows host never uses tmux (it uses the
|
||
# detached-process path below), regardless of the UI-supplied platform.
|
||
local_windows = IS_WINDOWS and not remote
|
||
logger.info(f"Download request: repo={req.repo_id}, remote={remote}, ssh_port={req.ssh_port}, platform={req.platform}")
|
||
|
||
if not is_windows and not local_windows and not await _binary_available("tmux", remote, req.ssh_port):
|
||
return {
|
||
"ok": False,
|
||
"error": _missing_binary_message("tmux", remote or "local server"),
|
||
"session_id": session_id,
|
||
}
|
||
|
||
if remote and is_windows:
|
||
# ── Windows remote: generate .ps1 runner, use Start-Process for background ──
|
||
remote_runner = f".{session_id}_run.ps1"
|
||
ps_lines = []
|
||
ps_lines.append('$sessionDir = "$env:TEMP\\odysseus-sessions"')
|
||
ps_lines.append('New-Item -ItemType Directory -Force -Path $sessionDir | Out-Null')
|
||
if req.hf_token:
|
||
ps_lines.append(f"$env:HF_TOKEN = '{_ps_squote(req.hf_token)}'")
|
||
if req.env_prefix:
|
||
ps_lines.append(_safe_env_prefix(req.env_prefix))
|
||
# Try hf CLI, fall back to Python huggingface_hub, then auto-install
|
||
ps_lines.append('try {{')
|
||
ps_lines.append(' $hfPath = Get-Command hf -ErrorAction SilentlyContinue')
|
||
ps_lines.append(' if ($hfPath) {{')
|
||
# Pipe $null to stdin to suppress interactive "update available? [Y/n]" prompt
|
||
ps_lines.append(f' $null | {hf_cmd}')
|
||
ps_lines.append(' }} else {{')
|
||
ps_lines.append(' python -c "import huggingface_hub" 2>$null')
|
||
ps_lines.append(' if ($LASTEXITCODE -eq 0) {{')
|
||
ps_lines.append(' Write-Host "hf CLI not found, using Python huggingface_hub..."')
|
||
ps_lines.append(' python -m pip install -q hf_transfer 2>$null')
|
||
ps_lines.append(' $env:HF_HUB_ENABLE_HF_TRANSFER = "1"')
|
||
ps_lines.append(f" python -c \"import os; from huggingface_hub import snapshot_download; snapshot_download('{req.repo_id}'{_dl_pyarg}, max_workers=8)\"")
|
||
ps_lines.append(' }} else {{')
|
||
ps_lines.append(' Write-Host "Installing huggingface-hub..."')
|
||
ps_lines.append(' python -m pip install -q huggingface-hub hf_transfer')
|
||
ps_lines.append(' $env:HF_HUB_ENABLE_HF_TRANSFER = "1"')
|
||
ps_lines.append(f" python -c \"import os; from huggingface_hub import snapshot_download; snapshot_download('{req.repo_id}'{_dl_pyarg}, max_workers=8)\"")
|
||
ps_lines.append(' }}')
|
||
ps_lines.append(' }}')
|
||
ps_lines.append(' if ($LASTEXITCODE -eq 0) {{ Write-Host ""; Write-Host "DOWNLOAD_OK" }}')
|
||
ps_lines.append(' else {{ Write-Host ""; Write-Host "DOWNLOAD_FAILED (exit $LASTEXITCODE)" }}')
|
||
ps_lines.append('}} catch {{')
|
||
ps_lines.append(' Write-Host ""; Write-Host "DOWNLOAD_FAILED ($_)"')
|
||
ps_lines.append('}}')
|
||
ps_lines.append(f'Remove-Item -Force "$HOME\\{remote_runner}" -ErrorAction SilentlyContinue')
|
||
runner_path = TMUX_LOG_DIR / f"{session_id}_run.ps1"
|
||
runner_path.write_text("\r\n".join(ps_lines) + "\r\n", encoding="utf-8")
|
||
|
||
# scp the .ps1 script, then launch it as a detached process with log + pid files
|
||
_port = req.ssh_port
|
||
_Pf = f"-P {_port} " if _port and _port != "22" else ""
|
||
_pf = f"-p {_port} " if _port and _port != "22" else ""
|
||
# Start-Process creates a fully detached process that survives SSH disconnect
|
||
launch_ps = (
|
||
"$sd = \\\"$env:TEMP\\odysseus-sessions\\\"; "
|
||
f"Start-Process powershell -ArgumentList '-ExecutionPolicy','Bypass','-File','$HOME\\{remote_runner}' "
|
||
f"-RedirectStandardOutput \\\"$sd\\{session_id}.log\\\" "
|
||
f"-RedirectStandardError \\\"$sd\\{session_id}.err.log\\\" "
|
||
f"-NoNewWindow -PassThru | ForEach-Object {{ $_.Id | Out-File \\\"$sd\\{session_id}.pid\\\" }}"
|
||
)
|
||
setup_cmd = (
|
||
f"scp -O {_Pf}-q '{runner_path}' {remote}:{remote_runner} && "
|
||
f'ssh {_pf}{remote} "powershell -Command \\"{launch_ps}\\""'
|
||
)
|
||
|
||
elif remote:
|
||
# ── Linux/Termux remote: create tmux session ON the remote host ──
|
||
remote_runner = f".{session_id}_run.sh"
|
||
runner_lines = ["#!/bin/bash"]
|
||
runner_lines.extend(_user_shell_path_bootstrap())
|
||
runner_lines.append("# Auto-detect environment")
|
||
runner_lines.append("deactivate 2>/dev/null; hash -r")
|
||
if req.hf_token:
|
||
runner_lines.append(f"export HF_TOKEN='{_bash_squote(req.hf_token)}'")
|
||
if req.env_prefix:
|
||
runner_lines.append(_safe_env_prefix(req.env_prefix))
|
||
else:
|
||
# Fallback: find a venv with hf CLI, or install huggingface-hub
|
||
runner_lines.append(
|
||
'for p in ~/vllm-env ~/venv ~/.venv; do '
|
||
'if [ -f "$p/bin/activate" ]; then source "$p/bin/activate"; break; fi; '
|
||
'done'
|
||
)
|
||
# Ensure pip-user scripts (e.g. hf CLI installed via --user) are on PATH
|
||
runner_lines.append('export PATH="$HOME/.local/bin:$PATH"')
|
||
# Install hf CLI + optional hf_transfer best-effort. Retries disable
|
||
# hf_transfer because the Rust parallel path is fast but has been
|
||
# flaky near the end of very large multi-file downloads.
|
||
# Use --break-system-packages on PEP-668 systems (Arch, newer Debian) so it doesn't bail.
|
||
runner_lines.append(f"command -v hf >/dev/null 2>&1 || {_pip_install_fallback_chain('huggingface_hub', python_cmd='pip', upgrade=True)}")
|
||
if req.disable_hf_transfer:
|
||
runner_lines.append("export HF_HUB_ENABLE_HF_TRANSFER=0")
|
||
runner_lines.append("export HF_HUB_DOWNLOAD_MAX_WORKERS=4")
|
||
else:
|
||
runner_lines.append(f"python3 -c 'import hf_transfer' 2>/dev/null || {_pip_install_fallback_chain('hf_transfer', python_cmd='pip')}")
|
||
runner_lines.append("python3 -c 'import hf_transfer' 2>/dev/null && export HF_HUB_ENABLE_HF_TRANSFER=1")
|
||
runner_lines.append("export HF_HUB_DOWNLOAD_MAX_WORKERS=8")
|
||
# Surface whether the HF token actually reached THIS server, so a gated
|
||
# download's "not authorized" failure can be told apart from a missing
|
||
# token (the token is masked — we only print applied / not-set).
|
||
runner_lines.append(_HF_TOKEN_STATUS_SNIPPET)
|
||
# Try hf CLI first, fall back to Python huggingface_hub, then auto-install
|
||
runner_lines.append('if command -v hf &>/dev/null; then')
|
||
# < /dev/null suppresses interactive "update available? [Y/n]" prompt
|
||
runner_lines.append(f' {hf_cmd} < /dev/null')
|
||
runner_lines.append('elif python3 -c "import huggingface_hub" 2>/dev/null; then')
|
||
runner_lines.append(' echo "hf CLI not found, using Python huggingface_hub..."')
|
||
runner_lines.append(f' python3 -c "import os; from huggingface_hub import snapshot_download; snapshot_download(\'{req.repo_id}\'{_dl_pyarg}, max_workers={4 if req.disable_hf_transfer else 8})"')
|
||
runner_lines.append('else')
|
||
runner_lines.append(' echo "Installing huggingface-hub and dependencies..."')
|
||
runner_lines.append(' pip install --no-deps -q huggingface-hub 2>/dev/null')
|
||
if req.disable_hf_transfer:
|
||
runner_lines.append(' pip install -q filelock fsspec packaging pyyaml tqdm typer httpx requests 2>/dev/null')
|
||
runner_lines.append(' export HF_HUB_ENABLE_HF_TRANSFER=0')
|
||
else:
|
||
runner_lines.append(' pip install -q filelock fsspec packaging pyyaml tqdm typer httpx requests hf_transfer 2>/dev/null')
|
||
runner_lines.append(" python3 -c 'import hf_transfer' 2>/dev/null && export HF_HUB_ENABLE_HF_TRANSFER=1")
|
||
runner_lines.append(f' python3 -c "import os; from huggingface_hub import snapshot_download; snapshot_download(\'{req.repo_id}\'{_dl_pyarg}, max_workers={4 if req.disable_hf_transfer else 8})"')
|
||
runner_lines.append('fi')
|
||
runner_lines.append('_ec=$?; if [ $_ec -eq 0 ]; then echo ""; echo "DOWNLOAD_OK"; else echo ""; echo "DOWNLOAD_FAILED (exit $_ec)"; fi')
|
||
runner_lines.append(f"rm -f {remote_runner}")
|
||
runner_lines.append('exec "${SHELL:-/bin/bash}"')
|
||
runner_path = TMUX_LOG_DIR / f"{session_id}_run.sh"
|
||
runner_path.write_text("\n".join(runner_lines) + "\n", encoding="utf-8")
|
||
# Local temp file is scp'd then chmod'd on the remote; the local bit
|
||
# is irrelevant (no-op on Windows).
|
||
safe_chmod(runner_path, 0o755)
|
||
|
||
# scp the runner script, then create tmux session on the remote
|
||
_port = req.ssh_port
|
||
_pf = f"-P {_port} " if _port and _port != "22" else ""
|
||
_spf = f"-p {_port} " if _port and _port != "22" else ""
|
||
setup_cmd = (
|
||
f"scp -O {_pf}-q '{runner_path}' {remote}:{remote_runner} && "
|
||
f"ssh {_spf}{remote} 'chmod +x {remote_runner} && tmux new-session -d -s {session_id} \"./{remote_runner}\"'"
|
||
)
|
||
else:
|
||
# Local: run hf download in the background (tmux on POSIX, a detached
|
||
# process + logfile on Windows where tmux doesn't exist).
|
||
if req.env_prefix:
|
||
lines.append(_safe_env_prefix(req.env_prefix))
|
||
else:
|
||
lines.append("deactivate 2>/dev/null; hash -r")
|
||
# Show whether the HF token reached this run (masked) — tells a gated
|
||
# "not authorized" failure apart from a missing token.
|
||
lines.append(_HF_TOKEN_STATUS_SNIPPET)
|
||
if IS_WINDOWS:
|
||
# Detached path: no controlling TTY, so skip `< /dev/null`
|
||
# (handled by Popen stdin=DEVNULL) and don't keep a shell open.
|
||
lines.append(hf_cmd)
|
||
lines.append('_ec=$?; if [ $_ec -eq 0 ]; then echo ""; echo "DOWNLOAD_OK"; else echo ""; echo "DOWNLOAD_FAILED (exit $_ec)"; fi')
|
||
else:
|
||
# < /dev/null suppresses interactive "update available? [Y/n]" prompt
|
||
lines.append(f"{hf_cmd} < /dev/null")
|
||
lines.append('_ec=$?; if [ $_ec -eq 0 ]; then echo ""; echo "DOWNLOAD_OK"; else echo ""; echo "DOWNLOAD_FAILED (exit $_ec)"; fi')
|
||
lines.append(f"rm -f '{wrapper_script}'")
|
||
lines.append('exec "${SHELL:-/bin/bash}"')
|
||
wrapper_script.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
||
wrapper_script.chmod(0o755)
|
||
setup_cmd = None if IS_WINDOWS else f"tmux new-session -d -s {session_id} {shlex.quote(str(wrapper_script))}"
|
||
|
||
logger.info(f"Model download: {req.repo_id} (include={req.include}, session={session_id}, remote={remote})")
|
||
logger.info(f"Download setup_cmd: {setup_cmd}")
|
||
|
||
if setup_cmd is None:
|
||
# LOCAL Windows: launch the bash wrapper detached; no tmux setup_cmd.
|
||
try:
|
||
_launch_local_detached(session_id, lines)
|
||
except Exception as e:
|
||
logger.error(f"Local detached download launch failed: {e}")
|
||
return {"ok": False, "error": str(e), "session_id": session_id}
|
||
else:
|
||
proc = await asyncio.create_subprocess_shell(
|
||
setup_cmd,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
await proc.wait()
|
||
|
||
if proc.returncode != 0:
|
||
stderr = (await proc.stderr.read()).decode(errors="replace")
|
||
logger.error(f"Download failed (rc={proc.returncode}): {stderr}")
|
||
return {"ok": False, "error": stderr, "session_id": session_id}
|
||
|
||
# Log to assistant
|
||
try:
|
||
from src.assistant_log import log_to_assistant
|
||
from src.auth_helpers import get_current_user
|
||
owner = get_current_user(request)
|
||
log_to_assistant(
|
||
owner,
|
||
f"Started downloading {req.repo_id} to {remote or 'local'}",
|
||
category="Download",
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
return {"ok": True, "session_id": session_id, "remote": remote or "local"}
|
||
|
||
@router.get("/api/model/cached")
|
||
async def model_cached(request: Request, host: str | None = None, model_dir: str | None = None, ssh_port: str | None = None, platform: str | None = None):
|
||
"""List cached models. Scans HF cache + optional model directory."""
|
||
require_admin(request)
|
||
# Validate shell-bound inputs, matching the sibling list_gpus endpoint —
|
||
# `host`/`ssh_port` are interpolated into an ssh command below, so an
|
||
# unvalidated value (e.g. "x'; rm -rf ~ #") would be command injection.
|
||
host = _validate_remote_host(host)
|
||
if ssh_port is not None and ssh_port != "" and not _SSH_PORT_RE.fullmatch(ssh_port):
|
||
raise HTTPException(400, "Invalid ssh_port")
|
||
TMUX_LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||
|
||
model_dirs = []
|
||
if model_dir:
|
||
for d in model_dir.split(','):
|
||
d = d.strip()
|
||
if d:
|
||
model_dirs.append(d)
|
||
paths_code = _cached_model_scan_script(model_dirs)
|
||
|
||
scan_py = TMUX_LOG_DIR / "scan_cache.py"
|
||
scan_py.write_text(paths_code, encoding="utf-8")
|
||
|
||
if host:
|
||
_pf = f"-p {ssh_port} " if ssh_port and ssh_port != "22" else ""
|
||
if platform == "windows":
|
||
# Windows: use 'python' and pipe via stdin with double-quote wrapping
|
||
cmd = f'ssh {_pf}{host} "python -" < \'{scan_py}\''
|
||
else:
|
||
cmd = f"ssh {_pf}{host} 'python3 -' < '{scan_py}'"
|
||
proc = await asyncio.create_subprocess_shell(
|
||
cmd,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
cwd=str(Path.home()),
|
||
)
|
||
else:
|
||
# LOCAL scan: use sys.executable (the venv Python Odysseus is already
|
||
# running under) — it's guaranteed real Python on all platforms.
|
||
# Falling back to which_tool on Windows risks hitting the Microsoft
|
||
# Store stub alias for "python3"/"python", which prints
|
||
# "Python was not found; run without arguments to install from the
|
||
# Microsoft Store" and exits 9009, producing empty stdout and a
|
||
# JSON parse error. sys.executable bypasses PATH entirely.
|
||
local_py = sys.executable or (
|
||
which_tool("python3") or which_tool("python")
|
||
or which_tool("py") or "python"
|
||
)
|
||
proc = await asyncio.create_subprocess_exec(
|
||
local_py, str(scan_py),
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
cwd=str(Path.home()),
|
||
)
|
||
stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=60)
|
||
|
||
models = []
|
||
try:
|
||
raw = json.loads(stdout_b.decode(errors="replace").strip())
|
||
for m in raw:
|
||
size_gb = m["size_bytes"] / (1024 ** 3)
|
||
if size_gb >= 1:
|
||
size_str = f"{size_gb:.1f} GB"
|
||
else:
|
||
size_str = f"{m['size_bytes'] / (1024**2):.0f} MB"
|
||
entry = {
|
||
"repo_id": m["repo_id"],
|
||
"size": size_str,
|
||
"nb_files": m["nb_files"],
|
||
"has_incomplete": m["has_incomplete"],
|
||
"status": "downloading" if m["has_incomplete"] else "ready",
|
||
"path": m.get("path", ""),
|
||
"is_diffusion": m.get("is_diffusion", False),
|
||
}
|
||
if m.get("is_local_dir"):
|
||
entry["is_local_dir"] = True
|
||
if m.get("is_gguf"):
|
||
entry["is_gguf"] = True
|
||
if m.get("backend"):
|
||
entry["backend"] = m.get("backend")
|
||
if m.get("is_ollama"):
|
||
entry["is_ollama"] = True
|
||
if isinstance(m.get("gguf_files"), list):
|
||
entry["gguf_files"] = m["gguf_files"]
|
||
models.append(entry)
|
||
except Exception as e:
|
||
logger.warning(f"Failed to parse cached models: {e}")
|
||
logger.warning(f"stderr: {stderr_b.decode(errors='replace')[:500]}")
|
||
|
||
return {"models": models, "host": host or "local"}
|
||
|
||
def _auto_register_image_endpoint(req: ServeRequest, remote: str | None) -> str | None:
|
||
"""Register a diffusion model as an image endpoint so it appears in the model selector."""
|
||
import re
|
||
from core.database import SessionLocal, ModelEndpoint
|
||
|
||
# Parse port from command (--port NNNN), default 8100 for diffusion_server
|
||
port_match = re.search(r'--port\s+(\d+)', req.cmd)
|
||
port = int(port_match.group(1)) if port_match else 8100
|
||
|
||
# Determine host
|
||
if remote:
|
||
# SSH alias — use as hostname (Tailscale resolves it later)
|
||
host = remote.split("@")[-1] if "@" in remote else remote
|
||
else:
|
||
host = "localhost"
|
||
|
||
base_url = f"http://{host}:{port}/v1"
|
||
|
||
# Friendly display name from repo_id
|
||
short_name = req.repo_id.split("/")[-1] if "/" in req.repo_id else req.repo_id
|
||
display_name = f"{short_name} (image)"
|
||
|
||
db = SessionLocal()
|
||
try:
|
||
# Check for existing endpoint with same base_url — update it
|
||
existing = db.query(ModelEndpoint).filter(ModelEndpoint.base_url == base_url).first()
|
||
if existing:
|
||
existing.is_enabled = True
|
||
existing.model_type = "image"
|
||
existing.name = display_name
|
||
db.commit()
|
||
logger.info(f"Updated existing image endpoint: {base_url}")
|
||
return existing.id
|
||
|
||
ep_id = f"img-{uuid.uuid4().hex[:8]}"
|
||
ep = ModelEndpoint(
|
||
id=ep_id,
|
||
name=display_name,
|
||
base_url=base_url,
|
||
api_key=None,
|
||
is_enabled=True,
|
||
model_type="image",
|
||
)
|
||
db.add(ep)
|
||
db.commit()
|
||
logger.info(f"Auto-registered image endpoint: {display_name} @ {base_url}")
|
||
return ep_id
|
||
except Exception as e:
|
||
logger.error(f"Failed to auto-register image endpoint: {e}")
|
||
db.rollback()
|
||
return None
|
||
finally:
|
||
db.close()
|
||
|
||
def _pick_free_port_for_ollama(
|
||
remote: str | None, ssh_port: str | None, start_port: int, max_offset: int
|
||
) -> int | None:
|
||
"""Return the first free port in [start_port, start_port+max_offset] on
|
||
the target host. Used to pick a real bind for `ollama serve` so we
|
||
don't reattach to an external systemd ollama (or other listener) the
|
||
Cookbook Stop button can't kill."""
|
||
import socket
|
||
if remote:
|
||
# Probe over SSH. Bash's /dev/tcp gives a portable "is anything
|
||
# listening" check without requiring ss/netstat/nmap.
|
||
ssh_base = ["ssh", "-o", "ConnectTimeout=4", "-o", "StrictHostKeyChecking=no"]
|
||
if ssh_port and str(ssh_port) != "22":
|
||
if not _SSH_PORT_RE.match(str(ssh_port)):
|
||
return None
|
||
ssh_base.extend(["-p", str(ssh_port)])
|
||
host_arg = remote
|
||
if not _REMOTE_HOST_RE.match(host_arg):
|
||
return None
|
||
probe_ports = " ".join(str(start_port + i) for i in range(max_offset + 1))
|
||
script = (
|
||
f"for p in {probe_ports}; do "
|
||
"if ! (exec 3<>/dev/tcp/127.0.0.1/$p) 2>/dev/null; then "
|
||
"echo $p; exit 0; fi; exec 3<&-; exec 3>&-; done; exit 1"
|
||
)
|
||
try:
|
||
import subprocess
|
||
r = subprocess.run(
|
||
ssh_base + [host_arg, script],
|
||
capture_output=True, text=True, timeout=8,
|
||
)
|
||
if r.returncode == 0:
|
||
out = (r.stdout or "").strip().splitlines()
|
||
if out and out[0].isdigit():
|
||
return int(out[0])
|
||
except Exception:
|
||
return None
|
||
return None
|
||
# Local: just try to connect.
|
||
for off in range(max_offset + 1):
|
||
p = start_port + off
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
s.settimeout(0.25)
|
||
try:
|
||
s.connect(("127.0.0.1", p))
|
||
except (ConnectionRefusedError, socket.timeout, OSError):
|
||
return p
|
||
return None
|
||
|
||
def _auto_register_llm_endpoint(req: ServeRequest, remote: str | None) -> str | None:
|
||
"""Register a freshly-served LLM as a model endpoint so it appears in the
|
||
model picker without a manual /setup step — the text-model sibling of
|
||
_auto_register_image_endpoint.
|
||
|
||
Cookbook serve commands launch an OpenAI-compatible server (llama.cpp's
|
||
llama-server, vLLM, SGLang, or Ollama) on a known port. We point an
|
||
endpoint at that server's /v1; the picker auto-discovers the model id by
|
||
probing /v1/models and dims the endpoint until the server is reachable,
|
||
so registering immediately (before the server finishes loading) is safe.
|
||
"""
|
||
import re
|
||
from core.database import SessionLocal, ModelEndpoint
|
||
|
||
# Port: ordered fallbacks so we match whatever the user actually
|
||
# asked for, not a hardcoded default:
|
||
# 1. explicit `--port N` (vllm / sglang / llama-server)
|
||
# 2. `OLLAMA_HOST=host:port` (the way Ollama specifies its bind)
|
||
# 3. fallback by backend (11434 ollama / 8080 llama.cpp)
|
||
# Previously the OLLAMA_HOST form was silently ignored and we
|
||
# registered every Ollama endpoint at 11434 — even if the user
|
||
# set OLLAMA_HOST=0.0.0.0:11435 to avoid colliding with an
|
||
# existing systemd Ollama, the registered endpoint pointed at
|
||
# the OLD port and showed as offline.
|
||
port_match = re.search(r'--port\s+(\d+)', req.cmd)
|
||
ollama_host_match = re.search(r'OLLAMA_HOST=[^\s]*?:(\d+)', req.cmd)
|
||
if port_match:
|
||
port = int(port_match.group(1))
|
||
elif ollama_host_match:
|
||
port = int(ollama_host_match.group(1))
|
||
elif "ollama" in req.cmd:
|
||
port = 11434
|
||
else:
|
||
port = 8080 # llama.cpp's llama-server default — the Apple Silicon path
|
||
|
||
# Determine host (mirrors the image path: SSH alias for remote serves).
|
||
# For local serves while Odysseus runs inside Docker, "localhost"
|
||
# resolves to the container itself — useless. Use host.docker.internal
|
||
# which compose maps to the actual host, matching what /setup adds
|
||
# for Ollama by hand.
|
||
if remote:
|
||
host = remote.split("@")[-1] if "@" in remote else remote
|
||
else:
|
||
from routes.model_routes import _docker_host_gateway_reachable
|
||
host = "host.docker.internal" if _docker_host_gateway_reachable() else "localhost"
|
||
|
||
base_url = f"http://{host}:{port}/v1"
|
||
|
||
short_name = req.repo_id.split("/")[-1] if "/" in req.repo_id else req.repo_id
|
||
display_name = short_name or "Local model"
|
||
|
||
# If the serve command opts models into OpenAI tool-calling, record it so
|
||
# agent_loop trusts emitted tool_calls instead of the name heuristic.
|
||
supports_tools = True if "--enable-auto-tool-choice" in req.cmd else None
|
||
|
||
db = SessionLocal()
|
||
try:
|
||
# Reuse an endpoint already pointed at this URL instead of duplicating.
|
||
existing = db.query(ModelEndpoint).filter(ModelEndpoint.base_url == base_url).first()
|
||
if existing:
|
||
existing.is_enabled = True
|
||
existing.model_type = "llm"
|
||
existing.name = display_name
|
||
if supports_tools is not None:
|
||
existing.supports_tools = supports_tools
|
||
db.commit()
|
||
logger.info(f"Updated existing local model endpoint: {base_url}")
|
||
return existing.id
|
||
|
||
ep_id = f"local-{uuid.uuid4().hex[:8]}"
|
||
ep = ModelEndpoint(
|
||
id=ep_id,
|
||
name=display_name,
|
||
base_url=base_url,
|
||
api_key=None,
|
||
is_enabled=True,
|
||
model_type="llm",
|
||
supports_tools=supports_tools,
|
||
)
|
||
db.add(ep)
|
||
db.commit()
|
||
logger.info(f"Auto-registered local model endpoint: {display_name} @ {base_url}")
|
||
return ep_id
|
||
except Exception as e:
|
||
logger.error(f"Failed to auto-register local model endpoint: {e}")
|
||
db.rollback()
|
||
return None
|
||
finally:
|
||
db.close()
|
||
|
||
@router.post("/api/model/serve")
|
||
async def model_serve(request: Request, req: ServeRequest):
|
||
"""Launch a model server in a tmux session (or PowerShell background process on Windows).
|
||
|
||
`repo_id` is dual-purpose: a HuggingFace repo (`<org>/<name>`) for
|
||
model-serve commands, a cached local-model id (the folder name reported
|
||
by `/api/model/cached`) for models scanned from a custom model dir, OR a
|
||
bare pip package name when the cmd is a `python -m pip install …`. We
|
||
keep strict validation, but serving local cached models must not require
|
||
a fake org/name wrapper.
|
||
"""
|
||
require_admin(request)
|
||
# Defence-in-depth: reject values that could break out of shell contexts.
|
||
_validate_remote_host(req.remote_host)
|
||
req.ssh_port = _validate_ssh_port(req.ssh_port)
|
||
req.gpus = _validate_gpus(req.gpus)
|
||
req.hf_token = req.hf_token or _load_stored_hf_token()
|
||
_validate_token(req.hf_token)
|
||
# Normalize away backslash-newline continuations (multi-line pasted
|
||
# serve commands) so the cleaned single-line command is what gets
|
||
# written into the runner script and used for engine auto-detection.
|
||
# `_validate_serve_cmd` returns None for empty input; coerce to "" so the
|
||
# many downstream `"engine" in req.cmd` membership checks can't hit
|
||
# `TypeError: argument of type 'NoneType'` (a 500 instead of a clean 400).
|
||
req.cmd = _validate_serve_cmd(req.cmd) or ""
|
||
req.cmd = _venv_safe_local_pip_install_cmd(
|
||
req.cmd,
|
||
local=not bool(req.remote_host),
|
||
in_venv=sys.prefix != sys.base_prefix,
|
||
)
|
||
is_pip_install = bool(req.cmd and "pip install" in req.cmd)
|
||
if is_pip_install:
|
||
# Keep big dependency wheel builds (vLLM, …) off the home filesystem's
|
||
# pip cache so they don't fail mid-build with "No space left" (#1219)
|
||
# and leave the dep installed-but-unusable (#1459).
|
||
req.cmd = _pip_install_no_cache(req.cmd)
|
||
# PEP-508-style package spec — letters, digits, `.-_` for the
|
||
# name; `[` `]` for extras; `<>=!~,` for version specifiers.
|
||
# v2 review HIGH-14: tightened from the previous regex which
|
||
# also allowed spaces and `+`, both of which can be abused to
|
||
# introduce extra shell tokens once interpolated into the
|
||
# serve command. We now use `re.fullmatch` and drop space/`+`.
|
||
if not req.repo_id or not re.fullmatch(
|
||
r"[A-Za-z0-9][A-Za-z0-9._\-\[\]<>=!,~]{0,200}", req.repo_id
|
||
):
|
||
raise HTTPException(400, "Invalid pip package name")
|
||
else:
|
||
_validate_serve_model_id(req.repo_id)
|
||
TMUX_LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||
session_id = f"serve-{uuid.uuid4().hex[:8]}"
|
||
remote = req.remote_host
|
||
is_windows = req.platform == "windows"
|
||
|
||
# Ollama: if the user didn't pin a port, resolve the actual port we'll
|
||
# bind to here (before runner construction) by probing the target host.
|
||
# Otherwise the runner script picks one at runtime and `_auto_register`
|
||
# below still registers the stale 11434 default — which on a host with
|
||
# a systemd ollama lands on the wrong (unreachable-from-docker) service.
|
||
if "ollama" in req.cmd and "OLLAMA_HOST=" not in req.cmd:
|
||
_ollama_bind_host = "0.0.0.0" if remote else "127.0.0.1"
|
||
_ollama_chosen_port = _pick_free_port_for_ollama(
|
||
remote, req.ssh_port, start_port=11434, max_offset=10,
|
||
)
|
||
if _ollama_chosen_port:
|
||
req.cmd = f"OLLAMA_HOST={_ollama_bind_host}:{_ollama_chosen_port} {req.cmd}"
|
||
# LOCAL execution on a native-Windows host never uses tmux (detached
|
||
# process path below), regardless of the UI-supplied platform.
|
||
local_windows = IS_WINDOWS and not remote
|
||
|
||
if not is_windows and not local_windows and not await _binary_available("tmux", remote, req.ssh_port):
|
||
return {
|
||
"ok": False,
|
||
"error": _missing_binary_message("tmux", remote or "local server"),
|
||
"session_id": session_id,
|
||
}
|
||
if _needs_binary(req.cmd, "docker") and not await _binary_available("docker", remote, req.ssh_port, windows=is_windows):
|
||
return {
|
||
"ok": False,
|
||
"error": _missing_binary_message("docker", remote or "local server"),
|
||
"session_id": session_id,
|
||
}
|
||
|
||
if is_windows and remote:
|
||
# ── Windows remote: generate .ps1 serve runner ──
|
||
remote_runner = f".{session_id}_run.ps1"
|
||
ps_lines = []
|
||
ps_lines.append('$sessionDir = "$env:TEMP\\odysseus-sessions"')
|
||
ps_lines.append('New-Item -ItemType Directory -Force -Path $sessionDir | Out-Null')
|
||
if req.hf_token:
|
||
ps_lines.append(f"$env:HF_TOKEN = '{_ps_squote(req.hf_token)}'")
|
||
if req.gpus:
|
||
ps_lines.append(f"$env:CUDA_VISIBLE_DEVICES = '{req.gpus}'")
|
||
if req.env_prefix:
|
||
ps_lines.append(_safe_env_prefix(req.env_prefix))
|
||
# Auto-install ollama if the command uses it
|
||
if "ollama" in req.cmd:
|
||
ps_lines.append('# Check if ollama is available')
|
||
ps_lines.append('if (-not (Get-Command ollama -ErrorAction SilentlyContinue)) {')
|
||
ps_lines.append(' Write-Host "Ollama not found. Please install from https://ollama.com/download/windows"')
|
||
ps_lines.append(' exit 1')
|
||
ps_lines.append('}')
|
||
elif "llama_cpp" in req.cmd or "llama-server" in req.cmd:
|
||
ps_lines.append('# Auto-install llama-cpp-python if missing')
|
||
ps_lines.append('try { python -c "import llama_cpp" 2>$null } catch {}')
|
||
ps_lines.append('if ($LASTEXITCODE -ne 0) {')
|
||
ps_lines.append(' Write-Host "Installing llama-cpp-python..."')
|
||
ps_lines.append(' python -m pip install llama-cpp-python[server]')
|
||
ps_lines.append('}')
|
||
elif "vllm" in req.cmd:
|
||
ps_lines.append('Write-Host "ERROR: vLLM is not supported on Windows. Use Ollama or llama.cpp instead."')
|
||
ps_lines.append('exit 1')
|
||
ps_lines.append(req.cmd)
|
||
if is_pip_install:
|
||
ps_lines.append('if ($LASTEXITCODE -eq 0) { Write-Host ""; Write-Host "DOWNLOAD_OK" }')
|
||
ps_lines.append('Write-Host ""')
|
||
ps_lines.append('Write-Host "=== Process exited with code $LASTEXITCODE ==="')
|
||
runner_path = TMUX_LOG_DIR / f"{session_id}_run.ps1"
|
||
runner_path.write_text("\r\n".join(ps_lines) + "\r\n", encoding="utf-8")
|
||
|
||
_port = req.ssh_port
|
||
_Pf = f"-P {_port} " if _port and _port != "22" else ""
|
||
_pf = f"-p {_port} " if _port and _port != "22" else ""
|
||
launch_ps = (
|
||
"$sd = \\\"$env:TEMP\\odysseus-sessions\\\"; "
|
||
f"Start-Process powershell -ArgumentList '-ExecutionPolicy','Bypass','-File','$HOME\\{remote_runner}' "
|
||
f"-RedirectStandardOutput \\\"$sd\\{session_id}.log\\\" "
|
||
f"-RedirectStandardError \\\"$sd\\{session_id}.err.log\\\" "
|
||
f"-NoNewWindow -PassThru | ForEach-Object {{ $_.Id | Out-File \\\"$sd\\{session_id}.pid\\\" }}"
|
||
)
|
||
setup_cmd = (
|
||
f"scp -O {_Pf}-q '{runner_path}' {remote}:{remote_runner} && "
|
||
f'ssh {_pf}{remote} "powershell -Command \\"{launch_ps}\\""'
|
||
)
|
||
else:
|
||
# ── Linux/Termux: bash + tmux (existing flow) ──
|
||
runner_lines = ["#!/bin/bash"]
|
||
# Mirror every line of stdout+stderr into a persistent log file
|
||
# on the host running the serve. This is the file tail_serve_output
|
||
# reads when the tmux pane has been overwritten by the post-crash
|
||
# bash prompt — without it, the agent's diagnostic tool sees the
|
||
# neofetch banner instead of the actual Python traceback.
|
||
# We save the original fds to 3/4 so we can RESTORE them before
|
||
# `exec ${SHELL}` at the end of the script. Without that restore,
|
||
# the post-crash interactive shell's neofetch banner ALSO gets
|
||
# teed into the log file and `tail -N` returns ONLY the banner —
|
||
# the actual traceback ends up earlier than the tail window.
|
||
runner_lines.append("mkdir -p /tmp/odysseus-tmux 2>/dev/null || true")
|
||
runner_lines.append("exec 3>&1 4>&2")
|
||
runner_lines.append(
|
||
f"exec > >(tee -a /tmp/odysseus-tmux/{session_id}.log) 2>&1"
|
||
)
|
||
runner_lines.extend(_user_shell_path_bootstrap())
|
||
runner_lines.append('ODYSSEUS_PREFLIGHT_EXIT=""')
|
||
# Put Odysseus's own venv bin on PATH (local runs only) so the serve
|
||
# shell resolves the bundled python3/hf, mirroring the download flow.
|
||
if not remote:
|
||
runner_lines.append(_local_tooling_path_export(sys.executable))
|
||
runner_lines.append("export FLASHINFER_DISABLE_VERSION_CHECK=1")
|
||
if req.hf_token:
|
||
runner_lines.append(f"export HF_TOKEN='{_bash_squote(req.hf_token)}'")
|
||
if req.gpus:
|
||
runner_lines.append(f"export CUDA_VISIBLE_DEVICES='{req.gpus}'")
|
||
if req.env_prefix:
|
||
runner_lines.append(_safe_env_prefix(req.env_prefix))
|
||
else:
|
||
runner_lines.append("deactivate 2>/dev/null; hash -r")
|
||
# Show whether the HF token reached this server (masked) — a gated
|
||
# model vLLM has to download will be denied without it.
|
||
runner_lines.append(_HF_TOKEN_STATUS_SNIPPET)
|
||
handled_ollama_serve = False
|
||
# Auto-install inference engine if missing
|
||
if "llama_cpp" in req.cmd or "llama-server" in req.cmd:
|
||
# Prefer the NATIVE llama-server binary — its minja templating
|
||
# renders modern GGUF chat templates that the Python bindings'
|
||
# Jinja2 rejects (do_tojson ensure_ascii). Build it once from
|
||
# source if missing; keep llama-cpp-python only as a fallback.
|
||
runner_lines.append('# Ensure a llama.cpp server (prefer native llama-server)')
|
||
# Include the Homebrew bin dirs so a brew-installed llama-server /
|
||
# ollama is found (otherwise macOS falls back to a slow source build).
|
||
# /opt/homebrew = Apple Silicon, /usr/local = Intel; harmless on Linux.
|
||
runner_lines.append('export PATH="$HOME/.local/bin:$HOME/bin:$HOME/llama.cpp/build/bin:/opt/homebrew/bin:/usr/local/bin:$PATH"')
|
||
runner_lines.append('if [ -d /data/data/com.termux ]; then')
|
||
runner_lines.append(' # Termux: no native build — use the Python bindings (CPU).')
|
||
runner_lines.append(' if ! python3 -c "import llama_cpp" 2>/dev/null; then')
|
||
runner_lines.append(' pkg install -y cmake 2>/dev/null')
|
||
runner_lines.append(' pip install numpy diskcache jinja2 2>/dev/null')
|
||
runner_lines.append(' CMAKE_ARGS="-DGGML_BLAS=OFF -DGGML_LLAMAFILE=OFF" pip install \'llama-cpp-python[server]\' --no-build-isolation --no-cache-dir 2>&1 || true')
|
||
runner_lines.append(' fi')
|
||
runner_lines.append('elif ! command -v llama-server &>/dev/null; then')
|
||
runner_lines.append(' echo "Native llama-server not found — building from source (one-time, may take a few minutes)..."')
|
||
runner_lines.append(' mkdir -p ~/bin')
|
||
runner_lines.append(' cd ~ && [ -d llama.cpp ] || git clone --depth 1 https://github.com/ggml-org/llama.cpp')
|
||
# Build with the right accelerator: Metal on macOS (llama.cpp
|
||
# enables it automatically, no flag), CUDA on Linux when present,
|
||
# else a plain CPU build. nproc is Linux-only — fall back to
|
||
# `sysctl hw.ncpu` on macOS. (Tip: `brew install llama.cpp` ships
|
||
# a prebuilt llama-server and skips this whole source build.)
|
||
runner_lines.append(' NPROC="$(nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo 4)"')
|
||
runner_lines.append(' if [ "$(uname -s)" = "Darwin" ]; then')
|
||
runner_lines.append(' command -v cmake >/dev/null 2>&1 || echo "WARNING: cmake not found — install it with: brew install cmake (or: brew install llama.cpp for a prebuilt llama-server)."')
|
||
# Start from a clean cache: a prior failed configure (e.g. a CUDA
|
||
# attempt) poisons build/CMakeCache.txt, so a plain `cmake -B build`
|
||
# would reuse the bad settings and fail again. CMAKE_BUILD_TYPE is
|
||
# explicit so the binary is optimized (Metal auto-enables on macOS).
|
||
runner_lines.append(' cd ~/llama.cpp && rm -rf build && cmake -B build -DCMAKE_BUILD_TYPE=Release \\')
|
||
runner_lines.append(' && cmake --build build -j"$NPROC" --target llama-server \\')
|
||
runner_lines.append(' && ln -sf ~/llama.cpp/build/bin/llama-server ~/bin/llama-server')
|
||
runner_lines.append(' else')
|
||
_append_llama_cpp_linux_accel_build_lines(runner_lines)
|
||
runner_lines.append(' fi')
|
||
runner_lines.append(' # If the native build failed, fall back to the Python bindings.')
|
||
runner_lines.append(' if ! command -v llama-server &>/dev/null && ! python3 -c "import llama_cpp" 2>/dev/null; then')
|
||
runner_lines.append(' echo "llama-server build failed — installing Python bindings as fallback..."')
|
||
runner_lines.append(f" {_pip_install_fallback_chain('llama-cpp-python[server]', python_cmd='pip')} || true")
|
||
runner_lines.append(' fi')
|
||
runner_lines.append(' if ! command -v llama-server &>/dev/null && ! python3 -c "import llama_cpp" 2>/dev/null; then')
|
||
runner_lines.append(' echo "ERROR: llama.cpp serving is not available after install/build attempts."')
|
||
runner_lines.append(' ODYSSEUS_PREFLIGHT_EXIT=127')
|
||
runner_lines.append(' fi')
|
||
runner_lines.append('fi')
|
||
elif "ollama" in req.cmd:
|
||
handled_ollama_serve = True
|
||
_ollama_default_host = "0.0.0.0" if remote else "127.0.0.1"
|
||
_ollama_host, _ollama_port = _ollama_bind_from_cmd(
|
||
req.cmd,
|
||
default_host=_ollama_default_host,
|
||
)
|
||
# Always launch a fresh ollama under tmux so Stop reliably
|
||
# kills it. If the requested port is busy (e.g. a systemd
|
||
# ollama on 11434), scan upward for a free one rather than
|
||
# silently reattaching to an external service that Stop
|
||
# can't reach.
|
||
runner_lines.append(f'ODYSSEUS_OLLAMA_HOST={_bash_squote(_ollama_host)}')
|
||
runner_lines.append(f'ODYSSEUS_OLLAMA_PORT="{_ollama_port}"')
|
||
runner_lines.append('for _ody_off in 0 1 2 3 4 5 6 7 8 9; do')
|
||
runner_lines.append(' _ody_try_port=$((ODYSSEUS_OLLAMA_PORT + _ody_off))')
|
||
runner_lines.append(' if ! (exec 3<>/dev/tcp/127.0.0.1/$_ody_try_port) 2>/dev/null; then')
|
||
runner_lines.append(' exec 3<&-; exec 3>&-')
|
||
runner_lines.append(' ODYSSEUS_OLLAMA_PORT="$_ody_try_port"')
|
||
runner_lines.append(' break')
|
||
runner_lines.append(' fi')
|
||
runner_lines.append(' exec 3<&-; exec 3>&-')
|
||
runner_lines.append('done')
|
||
runner_lines.append('if ! command -v ollama &>/dev/null; then')
|
||
runner_lines.append(' echo "ERROR: Ollama not found on this server. Install it from https://ollama.com/download or `curl -fsSL https://ollama.com/install.sh | sh`."')
|
||
runner_lines.append(' echo')
|
||
runner_lines.append(' echo "=== Process exited with code 127 ==="')
|
||
runner_lines.append(' exec bash -i')
|
||
runner_lines.append('fi')
|
||
runner_lines.append('ODYSSEUS_OLLAMA_URL="http://${ODYSSEUS_OLLAMA_HOST}:${ODYSSEUS_OLLAMA_PORT}"')
|
||
if remote and _ollama_host in ("0.0.0.0", "::"):
|
||
runner_lines.append('echo "[odysseus] WARNING: remote Ollama will bind to ${ODYSSEUS_OLLAMA_HOST}:${ODYSSEUS_OLLAMA_PORT} so Odysseus can reach it from this host."')
|
||
runner_lines.append('echo "[odysseus] Ollama has no built-in authentication; expose this only on a trusted LAN/VPN or provide an explicit OLLAMA_HOST with your own access controls."')
|
||
runner_lines.append('echo "Starting ollama server on ${ODYSSEUS_OLLAMA_HOST}:${ODYSSEUS_OLLAMA_PORT}..."')
|
||
runner_lines.append('OLLAMA_HOST="${ODYSSEUS_OLLAMA_HOST}:${ODYSSEUS_OLLAMA_PORT}" ollama serve')
|
||
runner_lines.append('_ody_exit=$?')
|
||
runner_lines.append('echo')
|
||
runner_lines.append('echo "=== Process exited with code ${_ody_exit} ==="')
|
||
runner_lines.append('exec bash -i')
|
||
elif "vllm serve" in req.cmd:
|
||
# vLLM is CUDA/ROCm-only and does not run on macOS at all.
|
||
runner_lines.append('if [ "$(uname -s)" = "Darwin" ]; then')
|
||
runner_lines.append(' echo "ERROR: vLLM does not run on macOS. Use Ollama or llama.cpp (Metal) instead."')
|
||
runner_lines.append(' ODYSSEUS_PREFLIGHT_EXIT=1')
|
||
runner_lines.append('fi')
|
||
_append_vllm_linux_preflight_lines(runner_lines)
|
||
elif "sglang.launch_server" in req.cmd:
|
||
runner_lines.append('export PATH="$HOME/.local/bin:$PATH"')
|
||
runner_lines.append('if ! command -v sglang &>/dev/null; then')
|
||
runner_lines.append(' echo "ERROR: SGLang is not installed."')
|
||
runner_lines.append(' ODYSSEUS_PREFLIGHT_EXIT=127')
|
||
runner_lines.append('elif ! ODYSSEUS_SGLANG_IMPORT_ERROR="$(python3 -c "import sglang" 2>&1)"; then')
|
||
runner_lines.append(' echo "ERROR: SGLang is installed but failed to import."')
|
||
runner_lines.append(' printf "%s\\n" "$ODYSSEUS_SGLANG_IMPORT_ERROR"')
|
||
runner_lines.append(' ODYSSEUS_PREFLIGHT_EXIT=127')
|
||
runner_lines.append('fi')
|
||
elif "scripts/diffusion_server.py" in req.cmd or ".diffusion_server.py" in req.cmd:
|
||
runner_lines.append('export PATH="$HOME/.local/bin:$PATH"')
|
||
runner_lines.append('if ! ODYSSEUS_DIFFUSION_IMPORT_ERROR="$(python3 -c "import torch, diffusers" 2>&1)"; then')
|
||
runner_lines.append(' echo "ERROR: Diffusion serving requires PyTorch + diffusers."')
|
||
runner_lines.append(' printf "%s\\n" "$ODYSSEUS_DIFFUSION_IMPORT_ERROR"')
|
||
runner_lines.append(' ODYSSEUS_PREFLIGHT_EXIT=127')
|
||
runner_lines.append('fi')
|
||
|
||
if not handled_ollama_serve:
|
||
_append_serve_preflight_exit_lines(
|
||
runner_lines,
|
||
keep_shell_open=not local_windows,
|
||
)
|
||
runner_lines.append(req.cmd)
|
||
if local_windows:
|
||
# Detached background process — no interactive shell to keep open.
|
||
# Print the exit marker the status poller looks for, then stop.
|
||
_append_serve_exit_code_lines(
|
||
runner_lines,
|
||
keep_shell_open=False,
|
||
is_pip_install=is_pip_install,
|
||
)
|
||
else:
|
||
# Keep shell open after exit so user can see errors
|
||
_append_serve_exit_code_lines(
|
||
runner_lines,
|
||
keep_shell_open=True,
|
||
is_pip_install=is_pip_install,
|
||
)
|
||
|
||
runner_path = TMUX_LOG_DIR / f"{session_id}_run.sh"
|
||
runner_path.write_text("\n".join(runner_lines) + "\n", encoding="utf-8")
|
||
# chmod is a no-op on Windows; bash on Windows runs the script
|
||
# regardless of the executable bit.
|
||
safe_chmod(runner_path, 0o755)
|
||
|
||
if local_windows:
|
||
# LOCAL Windows: launch the bash runner detached (tmux replacement).
|
||
setup_cmd = None
|
||
elif remote:
|
||
remote_runner = f".{session_id}_run.sh"
|
||
# If command references scripts/, scp those too
|
||
scp_extras = ""
|
||
_port = req.ssh_port
|
||
_Pf = f"-P {_port} " if _port and _port != "22" else ""
|
||
_pf = f"-p {_port} " if _port and _port != "22" else ""
|
||
if "scripts/diffusion_server.py" in req.cmd:
|
||
from core.constants import BASE_DIR
|
||
diff_script = Path(BASE_DIR) / "scripts" / "diffusion_server.py"
|
||
if diff_script.exists():
|
||
scp_extras = f"scp -O {_Pf}-q '{diff_script}' {remote}:.diffusion_server.py && "
|
||
runner_path.write_text(
|
||
runner_path.read_text(encoding="utf-8").replace(
|
||
"scripts/diffusion_server.py", ".diffusion_server.py"
|
||
),
|
||
encoding="utf-8",
|
||
)
|
||
setup_cmd = (
|
||
f"{scp_extras}"
|
||
f"scp -O {_Pf}-q '{runner_path}' {remote}:{remote_runner} && "
|
||
f"ssh {_pf}{remote} 'chmod +x {remote_runner} && tmux new-session -d -s {session_id} \"./{remote_runner}\"'"
|
||
)
|
||
else:
|
||
setup_cmd = f"tmux new-session -d -s {session_id} {shlex.quote(str(runner_path))}"
|
||
|
||
if setup_cmd is None:
|
||
# LOCAL Windows: launch the bash runner detached; no tmux setup_cmd.
|
||
try:
|
||
_launch_local_detached(session_id, runner_lines)
|
||
except Exception as e:
|
||
logger.error(f"Local detached serve launch failed: {e}")
|
||
return {"ok": False, "error": str(e), "session_id": session_id}
|
||
else:
|
||
proc = await asyncio.create_subprocess_shell(
|
||
setup_cmd,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
await proc.wait()
|
||
|
||
if proc.returncode != 0:
|
||
stderr = (await proc.stderr.read()).decode(errors="replace")
|
||
return {"ok": False, "error": stderr, "session_id": session_id}
|
||
|
||
# Auto-register a model endpoint so the served model shows up in the model
|
||
# picker with no manual /setup step. Diffusion models get an image
|
||
# endpoint; any other real model serve (i.e. not a pip-install task) gets
|
||
# a local LLM endpoint pointed at its /v1.
|
||
endpoint_id = None
|
||
is_diffusion = "diffusion_server.py" in req.cmd
|
||
if is_diffusion:
|
||
endpoint_id = _auto_register_image_endpoint(req, remote)
|
||
elif not is_pip_install:
|
||
endpoint_id = _auto_register_llm_endpoint(req, remote)
|
||
|
||
# Log to assistant
|
||
try:
|
||
from src.assistant_log import log_to_assistant
|
||
from src.auth_helpers import get_current_user
|
||
owner = get_current_user(request)
|
||
short = req.repo_id.split("/")[-1] if "/" in req.repo_id else req.repo_id
|
||
log_to_assistant(
|
||
owner,
|
||
f"Started serving {short} on {remote or 'local'}",
|
||
category="Serve",
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
return {"ok": True, "session_id": session_id, "remote": remote or "local",
|
||
"endpoint_id": endpoint_id}
|
||
|
||
# ── Server setup (install deps on remote) ──
|
||
|
||
class SetupRequest(BaseModel):
|
||
host: str
|
||
ssh_port: str | None = None
|
||
|
||
@router.post("/api/cookbook/setup")
|
||
async def server_setup(request: Request, req: SetupRequest):
|
||
"""Install required dependencies on a remote server via SSH."""
|
||
require_admin(request)
|
||
host = _validate_remote_host(req.host)
|
||
if not host:
|
||
raise HTTPException(400, "host is required")
|
||
port = req.ssh_port
|
||
if port is not None and port != "" and not re.fullmatch(r"\d{1,5}", port):
|
||
raise HTTPException(400, "Invalid ssh_port")
|
||
pf = f"-p {port} " if port and port != "22" else ""
|
||
|
||
# Detect platform: Windows first (echo %OS% → Windows_NT), then Termux, then Linux
|
||
detect_cmd = f'ssh {pf}{host} "echo %OS%"'
|
||
platform = "linux"
|
||
try:
|
||
proc = await asyncio.create_subprocess_shell(
|
||
detect_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||
)
|
||
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10)
|
||
out = stdout.decode().strip()
|
||
if "Windows_NT" in out:
|
||
platform = "windows"
|
||
else:
|
||
# Check for Termux
|
||
detect_cmd2 = f"ssh {pf}{host} 'test -d /data/data/com.termux && echo termux || echo linux'"
|
||
proc2 = await asyncio.create_subprocess_shell(
|
||
detect_cmd2, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||
)
|
||
stdout2, _ = await asyncio.wait_for(proc2.communicate(), timeout=10)
|
||
platform = stdout2.decode().strip()
|
||
except Exception:
|
||
platform = "linux"
|
||
|
||
if platform == "windows":
|
||
# Windows setup: ensure Python + pip + huggingface-hub via PowerShell
|
||
# Also create the session directory for background tasks
|
||
setup_script = (
|
||
'powershell -Command "'
|
||
"New-Item -ItemType Directory -Force -Path $env:TEMP\\odysseus-sessions | Out-Null; "
|
||
"try { python --version } catch { Write-Host 'ERROR: Python not found — install from python.org'; exit 1 }; "
|
||
"python -m pip install -q huggingface-hub 2>$null; "
|
||
"python -c \\\"from huggingface_hub import snapshot_download; print('OK')\\\""
|
||
'"'
|
||
)
|
||
cmd = f'ssh {pf}{host} {setup_script}'
|
||
elif platform == "termux":
|
||
setup_script = (
|
||
"pkg install -y python tmux 2>/dev/null; "
|
||
"pip install --no-deps -q huggingface-hub 2>/dev/null; "
|
||
"pip install -q filelock fsspec packaging pyyaml tqdm typer httpx requests 2>/dev/null; "
|
||
"python3 -c 'from huggingface_hub import snapshot_download; print(\"OK\")'"
|
||
)
|
||
cmd = f"ssh {pf}{host} '{setup_script}'"
|
||
else:
|
||
# Linux: auto-install tmux (via whichever package manager is available)
|
||
# and huggingface_hub + hf_transfer (falling back to --user/--break-system-packages
|
||
# on PEP-668 locked distros like Arch / newer Debian).
|
||
setup_script = (
|
||
# Install tmux if missing — try common package managers; skip if no sudo
|
||
"if ! command -v tmux >/dev/null 2>&1; then "
|
||
" if command -v apt-get >/dev/null 2>&1; then sudo -n apt-get install -y tmux 2>/dev/null; "
|
||
" elif command -v pacman >/dev/null 2>&1; then sudo -n pacman -S --noconfirm tmux 2>/dev/null; "
|
||
" elif command -v dnf >/dev/null 2>&1; then sudo -n dnf install -y tmux 2>/dev/null; "
|
||
" elif command -v apk >/dev/null 2>&1; then sudo -n apk add --no-interactive tmux 2>/dev/null; "
|
||
" elif command -v zypper >/dev/null 2>&1; then sudo -n zypper --non-interactive install tmux 2>/dev/null; "
|
||
" fi; "
|
||
"fi; "
|
||
"command -v tmux >/dev/null 2>&1 || echo 'WARNING: tmux missing and auto-install failed (need passwordless sudo). Install manually.'; "
|
||
# Install Python bits. Try system install first; fall back to --user --break-system-packages on PEP 668 systems.
|
||
"pip install -q huggingface_hub hf_transfer 2>/dev/null || "
|
||
"pip install --user --break-system-packages -q huggingface_hub hf_transfer 2>/dev/null || "
|
||
"pip3 install --user --break-system-packages -q huggingface_hub hf_transfer 2>/dev/null; "
|
||
"python3 -c 'from huggingface_hub import snapshot_download; print(\"OK\")'"
|
||
)
|
||
cmd = f"ssh {pf}{host} '{setup_script}'"
|
||
|
||
try:
|
||
proc = await asyncio.create_subprocess_shell(
|
||
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||
)
|
||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
|
||
output = stdout.decode() + stderr.decode()
|
||
ok = "OK" in output
|
||
return {"ok": ok, "output": output.strip(), "platform": platform}
|
||
except asyncio.TimeoutError:
|
||
return {"ok": False, "error": "Setup timed out (120s)", "platform": platform}
|
||
except Exception as e:
|
||
return {"ok": False, "error": str(e), "platform": platform}
|
||
|
||
# ── GPU availability probe ──
|
||
|
||
async def _run_nvidia_smi(query: str, host: str | None, ssh_port: str | None, timeout: int = 8):
|
||
"""Run nvidia-smi locally or over SSH. Returns (stdout, error_or_None)."""
|
||
if host:
|
||
pf = f"-p {ssh_port} " if ssh_port and ssh_port != "22" else ""
|
||
cmd = f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no {pf}{host} '{query}'"
|
||
proc = await asyncio.create_subprocess_shell(
|
||
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||
)
|
||
else:
|
||
proc = await asyncio.create_subprocess_exec(
|
||
*shlex.split(query),
|
||
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
try:
|
||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
||
except asyncio.TimeoutError:
|
||
proc.kill()
|
||
return None, "nvidia-smi timed out"
|
||
if proc.returncode != 0:
|
||
err = (stderr.decode("utf-8", errors="replace") or "").strip()[:200]
|
||
return None, err or "nvidia-smi failed"
|
||
return stdout.decode("utf-8", errors="replace"), None
|
||
|
||
async def _run_gpu_shell(cmd_text: str, host: str | None, ssh_port: str | None, timeout: int = 8):
|
||
"""Run a small GPU probe shell command locally or over SSH."""
|
||
if host:
|
||
pf = f"-p {ssh_port} " if ssh_port and ssh_port != "22" else ""
|
||
quoted_cmd = shlex.quote(cmd_text)
|
||
remote_cmd = (
|
||
f"if command -v sh >/dev/null 2>&1; then sh -lc {quoted_cmd}; "
|
||
f"elif command -v bash >/dev/null 2>&1; then bash -lc {quoted_cmd}; "
|
||
f"elif command -v zsh >/dev/null 2>&1; then zsh -lc {quoted_cmd}; "
|
||
"else echo 'No POSIX shell found for GPU probe' >&2; exit 127; fi"
|
||
)
|
||
cmd = f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no {pf}{host} {shlex.quote(remote_cmd)}"
|
||
proc = await asyncio.create_subprocess_shell(
|
||
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||
)
|
||
else:
|
||
proc = await asyncio.create_subprocess_shell(
|
||
cmd_text, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||
)
|
||
try:
|
||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
||
except asyncio.TimeoutError:
|
||
proc.kill()
|
||
return None, "GPU probe timed out"
|
||
if proc.returncode != 0:
|
||
err = (stderr.decode("utf-8", errors="replace") or "").strip()[:200]
|
||
return None, err or f"GPU probe failed ({proc.returncode})"
|
||
return stdout.decode("utf-8", errors="replace"), None
|
||
|
||
async def _gpu_read_file(path: str, host: str | None, ssh_port: str | None) -> str | None:
|
||
out, err = await _run_gpu_shell(f"cat {shlex.quote(path)} 2>/dev/null", host, ssh_port, timeout=4)
|
||
if err is not None or out is None:
|
||
return None
|
||
return out.strip()
|
||
|
||
async def _probe_gpu_device_processes(host: str | None, ssh_port: str | None) -> list[dict]:
|
||
pid_cmd = (
|
||
"{ command -v lsof >/dev/null 2>&1 && "
|
||
"lsof -w -t /dev/kfd /dev/dri/renderD* 2>/dev/null || true; "
|
||
"command -v fuser >/dev/null 2>&1 && "
|
||
"fuser /dev/kfd /dev/dri/renderD* 2>/dev/null || true; } "
|
||
"| tr ' ' '\\n' | sed '/^[0-9][0-9]*$/!d' | sort -n -u"
|
||
)
|
||
out, err = await _run_gpu_shell(pid_cmd, host, ssh_port, timeout=5)
|
||
if err is not None or not out:
|
||
return []
|
||
processes = []
|
||
seen = set()
|
||
for raw in out.splitlines():
|
||
try:
|
||
pid = int(raw.strip())
|
||
except ValueError:
|
||
continue
|
||
if pid in seen:
|
||
continue
|
||
seen.add(pid)
|
||
name_out, _ = await _run_gpu_shell(f"ps -p {pid} -o comm= 2>/dev/null", host, ssh_port, timeout=3)
|
||
name = (name_out or "").strip().splitlines()[0] if (name_out or "").strip() else "process"
|
||
processes.append({"pid": pid, "name": name[:80], "used_mb": 0})
|
||
return processes
|
||
|
||
async def _probe_amd_sysfs(host: str | None, ssh_port: str | None) -> list[dict]:
|
||
out, err = await _run_gpu_shell("ls -1 /sys/class/drm 2>/dev/null", host, ssh_port, timeout=4)
|
||
if err is not None or not out:
|
||
return []
|
||
gpus = []
|
||
for entry in out.split():
|
||
if not entry.startswith("card") or "-" in entry:
|
||
continue
|
||
base = f"/sys/class/drm/{entry}/device"
|
||
vendor = await _gpu_read_file(f"{base}/vendor", host, ssh_port)
|
||
if vendor != "0x1002":
|
||
continue
|
||
vram_raw = await _gpu_read_file(f"{base}/mem_info_vram_total", host, ssh_port)
|
||
vis_raw = await _gpu_read_file(f"{base}/mem_info_vis_vram_total", host, ssh_port)
|
||
gtt_raw = await _gpu_read_file(f"{base}/mem_info_gtt_total", host, ssh_port)
|
||
vram_bytes = int(vram_raw) if vram_raw and vram_raw.isdigit() else 0
|
||
vis_bytes = int(vis_raw) if vis_raw and vis_raw.isdigit() else 0
|
||
gtt_bytes = int(gtt_raw) if gtt_raw and gtt_raw.isdigit() else 0
|
||
total_bytes = max(vram_bytes, vis_bytes)
|
||
used_attr = "mem_info_vis_vram_used" if vis_bytes and vis_bytes >= vram_bytes else "mem_info_vram_used"
|
||
unified = bool(vis_bytes and vis_bytes >= vram_bytes)
|
||
if total_bytes <= 0:
|
||
total_bytes = gtt_bytes
|
||
used_attr = "mem_info_gtt_used"
|
||
unified = True
|
||
if total_bytes <= 0:
|
||
continue
|
||
used_raw = await _gpu_read_file(f"{base}/{used_attr}", host, ssh_port)
|
||
used_bytes = int(used_raw) if used_raw and used_raw.isdigit() else 0
|
||
name = await _gpu_read_file(f"{base}/product_name", host, ssh_port)
|
||
if not name:
|
||
device = await _gpu_read_file(f"{base}/device", host, ssh_port)
|
||
name = f"AMD GPU {device or entry}"
|
||
total_mb = max(0, int(total_bytes / (1024 * 1024)))
|
||
used_mb = max(0, min(total_mb, int(used_bytes / (1024 * 1024))))
|
||
free_mb = max(0, total_mb - used_mb)
|
||
# GTT = the system-RAM pool the GPU pages into when VRAM is full.
|
||
# On a discrete card a large gtt_used means the model spilled past
|
||
# VRAM into RAM over PCIe — much slower. Surface it so the UI can
|
||
# warn "spilling to RAM" instead of the user wondering why it's slow.
|
||
gtt_used_raw = await _gpu_read_file(f"{base}/mem_info_gtt_used", host, ssh_port)
|
||
gtt_used_mb = max(0, int(int(gtt_used_raw) / (1024 * 1024))) if (gtt_used_raw and gtt_used_raw.isdigit()) else 0
|
||
gpus.append({
|
||
"index": len(gpus), "name": name, "uuid": entry,
|
||
"free_mb": free_mb, "total_mb": total_mb, "used_mb": used_mb,
|
||
"gtt_used_mb": gtt_used_mb,
|
||
"util_pct": 0, "busy": bool(total_mb and (free_mb / total_mb) < 0.85),
|
||
"processes": [], "backend": "rocm", "source": "amd-sysfs",
|
||
"unified_memory": unified,
|
||
})
|
||
if gpus:
|
||
processes = await _probe_gpu_device_processes(host, ssh_port)
|
||
if processes:
|
||
gpus[0]["processes"] = processes
|
||
gpus[0]["busy"] = True
|
||
return gpus
|
||
|
||
@router.get("/api/cookbook/gpus")
|
||
async def list_gpus(request: Request, host: str | None = None, ssh_port: str | None = None):
|
||
"""Probe GPU memory/process state locally or via SSH.
|
||
|
||
Probe order:
|
||
1. NVIDIA via nvidia-smi
|
||
2. AMD/ROCm and unified-memory APUs via /sys/class/drm
|
||
3. Generic GPU device holders via /dev/kfd and /dev/dri/renderD*
|
||
|
||
Returned shape:
|
||
{ "ok": True, "gpus": [
|
||
{"index": 0, "name": "...", "free_mb": int, "total_mb": int,
|
||
"used_mb": int, "util_pct": int, "busy": bool,
|
||
"uuid": "GPU-...",
|
||
"processes": [{"pid": int, "name": str, "used_mb": int}, ...]
|
||
}, ...
|
||
]}
|
||
`busy` is True when free_mb/total_mb < 0.5.
|
||
"""
|
||
require_admin(request)
|
||
host = _validate_remote_host(host)
|
||
if ssh_port is not None and ssh_port != "" and not _SSH_PORT_RE.fullmatch(ssh_port):
|
||
raise HTTPException(400, "Invalid ssh_port")
|
||
gpu_query = "nvidia-smi --query-gpu=index,name,memory.free,memory.total,memory.used,utilization.gpu,uuid --format=csv,noheader,nounits"
|
||
nvidia_error = None
|
||
try:
|
||
gpu_out, err = await _run_nvidia_smi(gpu_query, host, ssh_port)
|
||
if err is not None:
|
||
nvidia_error = err
|
||
gpu_out = ""
|
||
except FileNotFoundError:
|
||
nvidia_error = "nvidia-smi not found"
|
||
gpu_out = ""
|
||
except Exception as e:
|
||
nvidia_error = str(e)[:200]
|
||
gpu_out = ""
|
||
|
||
gpus = []
|
||
uuid_to_idx: dict[str, int] = {}
|
||
for line in (gpu_out or "").strip().splitlines():
|
||
parts = [p.strip() for p in line.split(",")]
|
||
if len(parts) < 7:
|
||
continue
|
||
try:
|
||
idx = int(parts[0])
|
||
name = parts[1]
|
||
free_mb = int(float(parts[2]))
|
||
total_mb = int(float(parts[3]))
|
||
used_mb = int(float(parts[4]))
|
||
util_pct = int(float(parts[5]))
|
||
gpu_uuid = parts[6]
|
||
except (ValueError, IndexError):
|
||
continue
|
||
busy = total_mb > 0 and (free_mb / total_mb) < 0.5
|
||
uuid_to_idx[gpu_uuid] = idx
|
||
gpus.append({
|
||
"index": idx, "name": name, "uuid": gpu_uuid,
|
||
"free_mb": free_mb, "total_mb": total_mb,
|
||
"used_mb": used_mb, "util_pct": util_pct,
|
||
"busy": busy, "processes": [],
|
||
})
|
||
|
||
# Best-effort process listing — skip silently if it fails
|
||
proc_query = "nvidia-smi --query-compute-apps=pid,gpu_uuid,process_name,used_memory --format=csv,noheader,nounits"
|
||
try:
|
||
proc_out, proc_err = await _run_nvidia_smi(proc_query, host, ssh_port, timeout=5)
|
||
if proc_err is None and proc_out:
|
||
gpus_by_idx = {g["index"]: g for g in gpus}
|
||
for line in proc_out.strip().splitlines():
|
||
parts = [p.strip() for p in line.split(",")]
|
||
if len(parts) < 4:
|
||
continue
|
||
try:
|
||
pid = int(parts[0])
|
||
pname = parts[2]
|
||
pmem = int(float(parts[3]))
|
||
except (ValueError, IndexError):
|
||
continue
|
||
idx = uuid_to_idx.get(parts[1])
|
||
if idx is None or idx not in gpus_by_idx:
|
||
continue
|
||
gpus_by_idx[idx]["processes"].append({
|
||
"pid": pid, "name": pname, "used_mb": pmem,
|
||
})
|
||
except Exception:
|
||
pass
|
||
|
||
if gpus:
|
||
return {"ok": True, "gpus": gpus, "backend": "cuda", "source": "nvidia-smi"}
|
||
|
||
# Local Apple Silicon / Metal fallback. macOS has no nvidia-smi and no
|
||
# Linux /sys/class/drm tree, but services.hwfit.hardware already knows
|
||
# how to size the shared unified-memory GPU budget. Keep this route in
|
||
# sync so Cookbook's GPU picker doesn't show "nvidia-smi not found" on
|
||
# native Mac launches.
|
||
if not host and sys.platform == "darwin":
|
||
try:
|
||
from services.hwfit.hardware import detect_system
|
||
info = detect_system(fresh=True)
|
||
backend = str(info.get("backend") or "").lower()
|
||
if backend in {"metal", "mps", "apple"} and info.get("gpu_count", 0) > 0:
|
||
total_mb = int(float(info.get("gpu_vram_gb") or info.get("total_ram_gb") or 0) * 1024)
|
||
free_mb = int(float(info.get("available_ram_gb") or 0) * 1024)
|
||
if total_mb and (free_mb <= 0 or free_mb > total_mb):
|
||
free_mb = total_mb
|
||
used_mb = max(0, total_mb - max(0, free_mb))
|
||
return {
|
||
"ok": True,
|
||
"gpus": [{
|
||
"index": 0,
|
||
"name": info.get("gpu_name") or info.get("cpu_name") or "Apple Silicon GPU",
|
||
"uuid": "apple-metal-0",
|
||
"free_mb": max(0, free_mb),
|
||
"total_mb": max(0, total_mb),
|
||
"used_mb": used_mb,
|
||
"util_pct": 0,
|
||
"busy": bool(total_mb and (free_mb / total_mb) < 0.5),
|
||
"processes": [],
|
||
"backend": "metal",
|
||
"source": "apple-metal",
|
||
"unified_memory": True,
|
||
}],
|
||
"backend": "metal",
|
||
"source": "apple-metal",
|
||
"fallback_from": "nvidia-smi",
|
||
"nvidia_error": nvidia_error,
|
||
}
|
||
except Exception as e:
|
||
logger.warning("Apple Metal GPU fallback failed: %s", e)
|
||
|
||
amd_gpus = await _probe_amd_sysfs(host, ssh_port)
|
||
if amd_gpus:
|
||
return {
|
||
"ok": True,
|
||
"gpus": amd_gpus,
|
||
"backend": "rocm",
|
||
"source": "amd-sysfs",
|
||
"fallback_from": "nvidia-smi",
|
||
"nvidia_error": nvidia_error,
|
||
}
|
||
|
||
processes = await _probe_gpu_device_processes(host, ssh_port)
|
||
if processes:
|
||
return {
|
||
"ok": True,
|
||
"gpus": [{
|
||
"index": 0, "name": "GPU device holders", "uuid": "dev-dri",
|
||
"free_mb": 0, "total_mb": 0, "used_mb": 0, "util_pct": 0,
|
||
"busy": True, "processes": processes,
|
||
"backend": "generic", "source": "gpu-devices",
|
||
}],
|
||
"backend": "generic",
|
||
"source": "gpu-devices",
|
||
"fallback_from": "nvidia-smi",
|
||
"nvidia_error": nvidia_error,
|
||
}
|
||
|
||
return {"ok": False, "error": nvidia_error or "No GPU memory probe available", "gpus": []}
|
||
|
||
class KillPidRequest(BaseModel):
|
||
pid: int
|
||
host: str | None = None
|
||
ssh_port: str | None = None
|
||
signal: str = "TERM" # TERM (graceful) or KILL (force)
|
||
|
||
@router.post("/api/cookbook/kill-pid")
|
||
async def kill_pid(request: Request, req: KillPidRequest):
|
||
"""Kill a PID that's holding GPU memory.
|
||
|
||
Admin-gated. Validates PID is positive int, signal is TERM/KILL, and
|
||
forbids low PIDs (<100) to avoid accidentally signalling init/system
|
||
daemons. Uses `kill -<sig> <pid>` locally or over SSH.
|
||
"""
|
||
require_admin(request)
|
||
if req.pid < 100:
|
||
raise HTTPException(400, f"Refusing to signal PID {req.pid} (<100, likely system process)")
|
||
sig = (req.signal or "TERM").upper()
|
||
if sig not in ("TERM", "KILL", "INT"):
|
||
raise HTTPException(400, "signal must be TERM, KILL, or INT")
|
||
host = _validate_remote_host(req.host)
|
||
if req.ssh_port and not _SSH_PORT_RE.fullmatch(req.ssh_port):
|
||
raise HTTPException(400, "Invalid ssh_port")
|
||
kill_cmd = f"kill -{sig} {req.pid}"
|
||
try:
|
||
if host:
|
||
pf = f"-p {req.ssh_port} " if req.ssh_port and req.ssh_port != "22" else ""
|
||
cmd = f"ssh -o ConnectTimeout=5 -o StrictHostKeyChecking=no {pf}{host} '{kill_cmd}'"
|
||
proc = await asyncio.create_subprocess_shell(
|
||
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||
)
|
||
elif IS_WINDOWS:
|
||
# No `kill` binary / POSIX signals on Windows. taskkill /F /T tears
|
||
# down the PID and its children. There's no graceful-vs-force
|
||
# distinction, so TERM/KILL/INT all map to the same forced kill.
|
||
# NB: never use os.kill(pid, 0) to probe here — on Windows that
|
||
# routes to TerminateProcess and would kill the process.
|
||
if not pid_alive(req.pid):
|
||
return {"ok": False, "error": f"PID {req.pid} is not running"}
|
||
await asyncio.to_thread(kill_process_tree, req.pid)
|
||
return {"ok": True, "pid": req.pid, "signal": sig}
|
||
else:
|
||
proc = await asyncio.create_subprocess_exec(
|
||
"kill", f"-{sig}", str(req.pid),
|
||
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=5)
|
||
if proc.returncode != 0:
|
||
err = (stderr.decode("utf-8", errors="replace") or "").strip()[:200]
|
||
return {"ok": False, "error": err or f"kill returned {proc.returncode}"}
|
||
return {"ok": True, "pid": req.pid, "signal": sig}
|
||
except asyncio.TimeoutError:
|
||
return {"ok": False, "error": "kill command timed out"}
|
||
except Exception as e:
|
||
return {"ok": False, "error": str(e)[:200]}
|
||
|
||
# ── Cookbook state persistence (cross-device sync) ──
|
||
|
||
@router.get("/api/cookbook/state")
|
||
async def get_cookbook_state(request: Request):
|
||
"""Load saved cookbook state (tasks, servers, presets, settings)."""
|
||
require_admin(request)
|
||
if _cookbook_state_path.exists():
|
||
try:
|
||
return _state_for_client(json.loads(_cookbook_state_path.read_text(encoding="utf-8")))
|
||
except Exception:
|
||
return {}
|
||
return {}
|
||
|
||
@router.post("/api/cookbook/state")
|
||
async def save_cookbook_state(request: Request):
|
||
"""Save cookbook state for cross-device sync.
|
||
|
||
Admin-gated because cookbook state is read back into shell-quoting
|
||
contexts when polling tmux session status (see status handler).
|
||
|
||
Merge guard: the UI debounces a `_syncToServer` POST every few
|
||
seconds with whatever localStorage has. The agent's tool layer
|
||
writes server-side tasks (e.g. `download_model` registering a
|
||
task). Without a merge, every UI sync wipes the agent's recent
|
||
additions. We preserve any on-disk task that the incoming body
|
||
omits but was added in the last RACE_WINDOW seconds — that's a
|
||
race, not an intentional delete.
|
||
"""
|
||
require_admin(request)
|
||
RACE_WINDOW_MS = 60_000
|
||
try:
|
||
from core.atomic_io import atomic_write_json
|
||
data = await request.json()
|
||
if not isinstance(data, dict):
|
||
data = {}
|
||
try:
|
||
if _cookbook_state_path.exists():
|
||
on_disk = json.loads(_cookbook_state_path.read_text(encoding="utf-8"))
|
||
else:
|
||
on_disk = {}
|
||
except Exception:
|
||
on_disk = {}
|
||
# Anti-wipe guard for env servers. The UI debounces a
|
||
# sync of whatever is in memory; if it fires before the state has
|
||
# hydrated from GET /state (a load-time race) or during a render
|
||
# glitch, `env.servers` would be empty and silently overwrite the
|
||
# saved servers on disk. Never let an empty/absent incoming
|
||
# env.servers clobber a populated on-disk one — preserve the disk
|
||
# values while still accepting the rest of the incoming env.
|
||
disk_env = on_disk.get("env") if isinstance(on_disk, dict) and isinstance(on_disk.get("env"), dict) else None
|
||
if disk_env:
|
||
inc_env = data.get("env") if isinstance(data.get("env"), dict) else None
|
||
if inc_env is None:
|
||
data["env"] = disk_env
|
||
logger.warning("cookbook state POST: incoming body had no env; preserved on-disk env (anti-wipe guard)")
|
||
elif disk_env.get("servers") and not inc_env.get("servers"):
|
||
inc_env["servers"] = disk_env["servers"]
|
||
logger.warning("cookbook state POST: incoming env.servers empty; preserved on-disk servers (anti-wipe guard)")
|
||
|
||
disk_tasks = on_disk.get("tasks") or [] if isinstance(on_disk, dict) else []
|
||
incoming_tasks = data.get("tasks") if isinstance(data.get("tasks"), list) else []
|
||
# Anti-poisoning guard: a stale browser tab can keep POSTing a
|
||
# download task as status='done' from before the strict-finish
|
||
# fix landed, undoing any server-side correction. For each
|
||
# incoming "done" download, override to "running" if the last
|
||
# shard pattern says N<total AND no DOWNLOAD_OK/DOWNLOAD_FAILED/
|
||
# /snapshots/ sentinel is in the output.
|
||
import re as _re_dl
|
||
for _it in incoming_tasks:
|
||
if (not isinstance(_it, dict)) or _it.get("type") != "download" or _it.get("status") != "done":
|
||
continue
|
||
_out = _it.get("output") or ""
|
||
if ("DOWNLOAD_OK" in _out) or ("DOWNLOAD_FAILED" in _out) or ("/snapshots/" in _out):
|
||
continue
|
||
_shards = _re_dl.findall(r"model-(\d+)-of-(\d+)\.safetensors", _out)
|
||
if _shards:
|
||
_n, _tot = _shards[-1]
|
||
if int(_n) < int(_tot):
|
||
logger.info(f"cookbook state POST: rejecting stale done for {_it.get('sessionId')} "
|
||
f"(last shard {_n}/{_tot}, no DOWNLOAD_OK)")
|
||
_it["status"] = "running"
|
||
else:
|
||
_completed = _out.count("Download complete")
|
||
_starts = _out.count("Downloading '")
|
||
if _starts > _completed:
|
||
logger.info(f"cookbook state POST: rejecting stale done for {_it.get('sessionId')} "
|
||
f"({_completed}/{_starts} files complete, no DOWNLOAD_OK)")
|
||
_it["status"] = "running"
|
||
incoming_ids = {t.get("sessionId") for t in incoming_tasks if isinstance(t, dict) and t.get("sessionId")}
|
||
import time as _t
|
||
now_ms = int(_t.time() * 1000)
|
||
preserved = []
|
||
for t in disk_tasks:
|
||
if not isinstance(t, dict):
|
||
continue
|
||
sid = t.get("sessionId")
|
||
if not sid or sid in incoming_ids:
|
||
continue # client's version wins
|
||
ts = t.get("ts") or 0
|
||
if isinstance(ts, (int, float)) and (now_ms - ts) <= RACE_WINDOW_MS:
|
||
preserved.append(t)
|
||
if preserved:
|
||
logger.info(f"cookbook state POST: preserving {len(preserved)} recent task(s) "
|
||
f"not in incoming body (race guard): "
|
||
f"{[t.get('sessionId') for t in preserved]}")
|
||
data["tasks"] = incoming_tasks + preserved
|
||
atomic_write_json(str(_cookbook_state_path), _state_for_storage(data, on_disk), indent=2)
|
||
return {"ok": True, "preserved": len(preserved)}
|
||
except Exception as e:
|
||
return {"ok": False, "error": str(e)}
|
||
|
||
@router.get("/api/cookbook/hf-latest")
|
||
async def hf_latest(vram_gb: float = 0, limit: int = 10, pipeline: str = "text-generation", owner: str = Depends(require_user)):
|
||
"""Fetch latest HuggingFace models, filtered by what fits in available VRAM.
|
||
|
||
vram_gb: total available VRAM in GB. 0 = no filter (return everything).
|
||
limit: how many models to return (default 10).
|
||
pipeline: HF pipeline_tag filter (text-generation, text-to-image, etc.).
|
||
"""
|
||
import re
|
||
import httpx
|
||
|
||
# Fetch a larger pool so we have enough to filter from (we drop ~80%)
|
||
pool_size = max(limit * 15, 100)
|
||
url = (
|
||
"https://huggingface.co/api/models"
|
||
f"?sort=trendingScore&direction=-1&limit={pool_size}&filter={pipeline}"
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=15) as client:
|
||
resp = await client.get(url)
|
||
if resp.status_code != 200:
|
||
return {"models": [], "error": f"HF API HTTP {resp.status_code}"}
|
||
raw = resp.json()
|
||
except Exception as e:
|
||
return {"models": [], "error": str(e)}
|
||
|
||
# Estimate VRAM from the model id. Looks for patterns like "7B", "70B", "1.5B" etc.
|
||
# Returns approx VRAM in GB at fp16 (params*2). Caller adjusts for quant.
|
||
def _est_vram_fp16(repo_id: str) -> float | None:
|
||
m = re.search(r'[-_/](\d+(?:\.\d+)?)\s*[Bb](?![a-zA-Z])', repo_id)
|
||
if not m:
|
||
return None
|
||
params_b = float(m.group(1))
|
||
return params_b * 2.0 # fp16 baseline
|
||
|
||
# Detect quantization from repo_id / tags. Returns a multiplier on fp16 size.
|
||
def _quant_factor(repo_id: str, tags: list) -> float:
|
||
text = (repo_id + " " + " ".join(tags or [])).lower()
|
||
if "fp4" in text or "nf4" in text or "int4" in text or "4bit" in text or "q4" in text or "awq" in text or "gptq" in text:
|
||
return 0.25
|
||
if "int8" in text or "8bit" in text or "q8" in text or "fp8" in text:
|
||
return 0.5
|
||
if "bf16" in text or "fp16" in text:
|
||
return 1.0
|
||
return 1.0 # default fp16
|
||
|
||
# Exclude adapters, LoRAs, datasets, GGUF-only repos, and other non-runnable artifacts
|
||
EXCLUDE_TAG_SUBSTRINGS = (
|
||
"lora", "adapter", "peft", "qlora",
|
||
"dataset", "embeddings",
|
||
"merge", "control-lora",
|
||
"diffusion-lora", "stable-diffusion-lora",
|
||
"text-classification", "token-classification",
|
||
"feature-extraction", "sentence-similarity",
|
||
)
|
||
EXCLUDE_NAME_SUBSTRINGS = (
|
||
"lora", "adapter", "peft", "qlora",
|
||
"embedding", "embed-",
|
||
"dataset",
|
||
)
|
||
|
||
def _is_excluded(repo_id: str, tags: list) -> bool:
|
||
text = repo_id.lower()
|
||
for s in EXCLUDE_NAME_SUBSTRINGS:
|
||
if s in text:
|
||
return True
|
||
tag_text = " ".join(t.lower() for t in (tags or []))
|
||
for s in EXCLUDE_TAG_SUBSTRINGS:
|
||
if s in tag_text:
|
||
return True
|
||
return False
|
||
|
||
out = []
|
||
for entry in raw:
|
||
repo_id = entry.get("modelId") or entry.get("id") or ""
|
||
if not repo_id:
|
||
continue
|
||
tags = entry.get("tags") or []
|
||
pipeline_tag = entry.get("pipeline_tag") or ""
|
||
|
||
# Hard filter: only the requested pipeline (HF's filter param is loose)
|
||
if pipeline and pipeline_tag and pipeline_tag != pipeline:
|
||
continue
|
||
# Skip adapters, LoRAs, datasets, etc.
|
||
if _is_excluded(repo_id, tags):
|
||
continue
|
||
|
||
est_fp16 = _est_vram_fp16(repo_id)
|
||
quant_mult = _quant_factor(repo_id, tags)
|
||
est_vram = (est_fp16 * quant_mult) if est_fp16 else None
|
||
# Add 30% headroom for KV cache, activations, etc.
|
||
needed_vram = (est_vram * 1.3) if est_vram else None
|
||
|
||
if vram_gb > 0 and needed_vram is not None and needed_vram > vram_gb:
|
||
continue
|
||
# Unknown-size models (e.g. MiniMax-M2.7, DeepSeek-V4-Flash) have no
|
||
# "NB" in the repo id, so the regex above can't extract their
|
||
# param count. Previously we dropped them entirely, which made
|
||
# brand-new flagship releases silently vanish from this list even
|
||
# on rigs with hundreds of GB of VRAM. Adapters/LoRAs are already
|
||
# filtered by _is_excluded(), so what falls through here is
|
||
# overwhelmingly full models — keep them, just without a size
|
||
# badge (the frontend handles needed_vram_gb=null gracefully).
|
||
|
||
out.append({
|
||
"repo_id": repo_id,
|
||
"downloads": entry.get("downloads", 0),
|
||
"likes": entry.get("likes", 0),
|
||
"createdAt": entry.get("createdAt", ""),
|
||
"tags": tags[:5], # trim
|
||
"pipeline_tag": pipeline_tag,
|
||
"est_vram_gb": round(est_vram, 1) if est_vram else None,
|
||
"needed_vram_gb": round(needed_vram, 1) if needed_vram else None,
|
||
})
|
||
if len(out) >= limit:
|
||
break
|
||
|
||
return {"models": out}
|
||
|
||
# Rate-limit for the orphan-tmux adoption sweep. The UI polls
|
||
# tasks/status every ~3s; we don't want to SSH every host on every
|
||
# poll. 20s is fast enough that a model the agent launched in the
|
||
# background shows up "almost immediately" in the UI without being
|
||
# wasteful.
|
||
_last_orphan_sweep_ts = [0.0]
|
||
_ORPHAN_SWEEP_MIN_INTERVAL_S = 20.0
|
||
|
||
def _maybe_sweep_orphans(tasks: list, state: dict) -> None:
|
||
"""Scan each configured cookbook server for `serve-*` tmux sessions
|
||
the cookbook doesn't know about and adopt them into state.tasks.
|
||
|
||
Writes are conditional: if no orphans are found, nothing is touched.
|
||
Rate-limited so polling UIs don't trigger SSH on every refresh.
|
||
"""
|
||
import time as _time
|
||
import subprocess
|
||
logger.info(f"_maybe_sweep_orphans: entered, last_ts={_last_orphan_sweep_ts[0]}")
|
||
now = _time.monotonic()
|
||
if now - _last_orphan_sweep_ts[0] < _ORPHAN_SWEEP_MIN_INTERVAL_S:
|
||
logger.info(f"_maybe_sweep_orphans: rate-limited, {now - _last_orphan_sweep_ts[0]:.1f}s since last")
|
||
return
|
||
_last_orphan_sweep_ts[0] = now
|
||
|
||
env = state.get("env") if isinstance(state, dict) else {}
|
||
servers = env.get("servers") if isinstance(env, dict) else []
|
||
logger.info(f"orphan sweep starting: {len(servers) if isinstance(servers, list) else 0} server(s), known_sids={len([t for t in tasks if isinstance(t, dict) and t.get('sessionId')])}")
|
||
if not isinstance(servers, list):
|
||
return
|
||
|
||
known_sids = {
|
||
t.get("sessionId") for t in tasks
|
||
if isinstance(t, dict) and t.get("sessionId")
|
||
}
|
||
|
||
adopted_any = False
|
||
for srv in servers:
|
||
if not isinstance(srv, dict):
|
||
continue
|
||
host = (srv.get("host") or "").strip()
|
||
if not host:
|
||
continue # local-only entry; the /proc scan handles it
|
||
if not _REMOTE_HOST_RE.match(host):
|
||
continue
|
||
sport = str(srv.get("port") or "").strip()
|
||
ssh_base = ["ssh", "-o", "ConnectTimeout=4", "-o", "StrictHostKeyChecking=no"]
|
||
if sport and sport != "22":
|
||
if not _SSH_PORT_RE.match(sport):
|
||
continue
|
||
ssh_base.extend(["-p", sport])
|
||
|
||
try:
|
||
ls = subprocess.run(
|
||
ssh_base + [host, "tmux ls 2>/dev/null"],
|
||
timeout=6, capture_output=True, text=True,
|
||
)
|
||
except Exception:
|
||
continue
|
||
for line in (ls.stdout or "").splitlines():
|
||
sid = line.split(":", 1)[0].strip()
|
||
if not sid or not _SESSION_ID_RE.match(sid):
|
||
continue
|
||
if sid in known_sids:
|
||
continue
|
||
# Adopt any session whose pane is currently running a
|
||
# known model-server process (checked below). The earlier
|
||
# prefix gate (serve-/cookbook-) dropped legitimate
|
||
# serves whenever tmux fell back to numeric IDs, leaving
|
||
# them invisible in the Cookbook UI — so the user could
|
||
# neither see nor stop them.
|
||
# Skip zombie / idle-shell sessions. A tmux session left
|
||
# over from a crashed vllm just shows a bash prompt —
|
||
# adopting it would pollute the UI with "running" tasks
|
||
# that aren't actually serving anything. pane_current_command
|
||
# is the foreground process in the pane right now; only
|
||
# real model serves leave a python/vllm/etc. process there.
|
||
try:
|
||
pc = subprocess.run(
|
||
ssh_base + [host, "tmux", "list-panes", "-t", sid,
|
||
"-F", "#{pane_current_command}"],
|
||
timeout=4, capture_output=True, text=True,
|
||
)
|
||
cur = (pc.stdout or "").strip().splitlines()
|
||
except Exception:
|
||
cur = []
|
||
LIVE_PROCS = {"python", "python3", "vllm", "llama-server",
|
||
"llama_cpp_main", "sglang", "lmdeploy",
|
||
"ollama", "node", "uvicorn"}
|
||
if not any(c in LIVE_PROCS for c in cur):
|
||
continue
|
||
# Try to recover a plausible repo_id + port from the
|
||
# pane buffer. Cheap heuristic — if we can't, register
|
||
# with placeholder fields; the UI still shows it.
|
||
try:
|
||
cap = subprocess.run(
|
||
ssh_base + [host, "tmux", "capture-pane", "-t", sid, "-p", "-S", "-300"],
|
||
timeout=6, capture_output=True, text=True,
|
||
)
|
||
pane = cap.stdout or ""
|
||
except Exception:
|
||
pane = ""
|
||
import re as _re_orphan
|
||
# vLLM banner: "model /path/...". Falls back to the
|
||
# raw vllm-serve command if the banner already scrolled.
|
||
m_model = _re_orphan.search(r"model\s+(\S+)", pane)
|
||
model = m_model.group(1) if m_model else ""
|
||
if not model:
|
||
m_serve = _re_orphan.search(r"vllm\s+serve\s+(\S+)", pane)
|
||
model = m_serve.group(1) if m_serve else f"adopted:{sid}"
|
||
m_port = _re_orphan.search(r"--port\s+(\d+)", pane)
|
||
port = int(m_port.group(1)) if m_port else 0
|
||
|
||
import time as _t2
|
||
tasks.append({
|
||
"id": sid,
|
||
"sessionId": sid,
|
||
"name": model.split("/")[-1] if "/" in model else model,
|
||
"type": "serve",
|
||
"status": "running",
|
||
"output": f"Auto-adopted from orphan tmux session on {host}. "
|
||
"Open the task to see live output.",
|
||
"ts": int(_t2.time() * 1000),
|
||
"payload": {
|
||
"repo_id": model,
|
||
"remote_host": host,
|
||
"_cmd": "(orphan tmux session — original launch cmd unknown)",
|
||
"port": port,
|
||
},
|
||
"remoteHost": host,
|
||
"sshPort": sport,
|
||
"platform": "linux",
|
||
"_serveReady": False,
|
||
"_endpointAdded": False,
|
||
"_adoptedExternally": True,
|
||
})
|
||
known_sids.add(sid)
|
||
adopted_any = True
|
||
logger.info(f"auto-adopted orphan tmux session {sid!r} on {host}")
|
||
|
||
if adopted_any:
|
||
try:
|
||
from core.atomic_io import atomic_write_json
|
||
state["tasks"] = tasks
|
||
atomic_write_json(_cookbook_state_path, state)
|
||
except Exception as e:
|
||
logger.warning(f"orphan sweep: state write failed: {e}")
|
||
|
||
@router.get("/api/cookbook/tasks/status")
|
||
async def cookbook_tasks_status(request: Request):
|
||
"""Check status of all active cookbook tmux sessions.
|
||
|
||
Critical: every subprocess.run inside this handler is a sync blocking
|
||
call that — when this was a plain async def — froze the entire server
|
||
event loop. Now the whole body runs in a worker thread via
|
||
asyncio.to_thread so other requests stay responsive."""
|
||
require_admin(request)
|
||
return await asyncio.to_thread(_cookbook_tasks_status_sync)
|
||
|
||
def _cookbook_tasks_status_sync():
|
||
import subprocess
|
||
|
||
def _download_cache_complete(repo_id: str, remote_host: str = "", ssh_port: str = "") -> bool:
|
||
"""Best-effort check for a completed HF cache entry.
|
||
|
||
tmux output can stop at a stale progress line if the pane/session
|
||
disappears before Cookbook captures the final DOWNLOAD_OK marker.
|
||
In that case, trust the cache shape: a snapshot directory with files
|
||
and no *.incomplete blobs means HuggingFace finished materializing the
|
||
model.
|
||
"""
|
||
if not repo_id or "/" not in repo_id:
|
||
return False
|
||
py = (
|
||
"import os,sys;"
|
||
"repo=sys.argv[1];"
|
||
"base=os.environ.get('HUGGINGFACE_HUB_CACHE') or os.path.join(os.environ.get('HF_HOME', os.path.expanduser('~/.cache/huggingface')), 'hub');"
|
||
"d=os.path.join(base,'models--'+repo.replace('/','--'));"
|
||
"snap=os.path.join(d,'snapshots');"
|
||
"ok=os.path.isdir(snap) and any(os.path.isdir(os.path.join(snap,x)) and os.listdir(os.path.join(snap,x)) for x in os.listdir(snap));"
|
||
"inc=False;"
|
||
"blobs=os.path.join(d,'blobs');"
|
||
"inc=os.path.isdir(blobs) and any(x.endswith('.incomplete') for x in os.listdir(blobs));"
|
||
"sys.exit(0 if ok and not inc else 1)"
|
||
)
|
||
cmd = ["python3", "-c", py, repo_id]
|
||
try:
|
||
if remote_host:
|
||
ssh_base = ["ssh"]
|
||
if ssh_port and ssh_port != "22":
|
||
ssh_base.extend(["-p", str(ssh_port)])
|
||
shell_cmd = " ".join(shlex.quote(x) for x in cmd)
|
||
proc = subprocess.run(ssh_base + [remote_host, shell_cmd], timeout=12, capture_output=True)
|
||
else:
|
||
proc = subprocess.run(cmd, timeout=12, capture_output=True)
|
||
return proc.returncode == 0
|
||
except Exception:
|
||
return False
|
||
|
||
# Load saved tasks from cookbook state
|
||
tasks = []
|
||
state = {}
|
||
if _cookbook_state_path.exists():
|
||
try:
|
||
state = json.loads(_cookbook_state_path.read_text(encoding="utf-8"))
|
||
saved_tasks = state.get("tasks", [])
|
||
if isinstance(saved_tasks, list):
|
||
tasks = saved_tasks
|
||
elif isinstance(saved_tasks, dict):
|
||
tasks = list(saved_tasks.values())
|
||
except Exception:
|
||
pass
|
||
|
||
# Orphan-tmux auto-adoption sweep. When the agent (or anyone)
|
||
# SSH-launches a `serve-*` tmux session — usually because
|
||
# serve_model rejected `source ... && vllm ...` or because of a
|
||
# manual relaunch via tmux send-keys — that session is invisible
|
||
# to the cookbook UI even though it's a live model server. The
|
||
# sweep finds those orphans on each configured remote host and
|
||
# writes them into state.tasks with _adoptedExternally=True, so
|
||
# they show up in the UI on the next poll without anyone having
|
||
# to remember to call adopt_served_model. Rate-limited via the
|
||
# module-level _last_orphan_sweep so we don't SSH every 3s.
|
||
try:
|
||
_maybe_sweep_orphans(tasks, state)
|
||
except Exception as _sweep_e:
|
||
logger.warning(f"orphan sweep failed (non-fatal): {_sweep_e!r}")
|
||
|
||
results = []
|
||
for task in tasks:
|
||
session_id = task.get("sessionId", "")
|
||
if not session_id:
|
||
continue
|
||
remote = task.get("remoteHost", "")
|
||
task_type = task.get("type", "download") # "download" or "serve"
|
||
# Field name varies depending on whether the task was added
|
||
# via the download flow (`repoId`), the serve flow (`modelId`),
|
||
# or the UI-side serve preset (which uses `name` + `payload.repo_id`).
|
||
_payload = task.get("payload") or {}
|
||
model = (
|
||
task.get("modelId")
|
||
or task.get("repoId")
|
||
or task.get("name")
|
||
or _payload.get("repo_id")
|
||
or _payload.get("modelId")
|
||
or ""
|
||
)
|
||
task_platform = task.get("platform", "")
|
||
|
||
# Check if session is alive + capture output
|
||
_tport = task.get("sshPort", "")
|
||
# Defense-in-depth: cookbook state is admin-writable but the values
|
||
# land in shell-interpolated commands below. Reject anything that
|
||
# isn't a benign session-id / hostname / port.
|
||
if not _SESSION_ID_RE.match(session_id):
|
||
logger.warning(f"Skipping task with unsafe session_id: {session_id!r}")
|
||
continue
|
||
if remote and not _REMOTE_HOST_RE.match(remote):
|
||
logger.warning(f"Skipping task with unsafe remoteHost: {remote!r}")
|
||
continue
|
||
if _tport and not _SSH_PORT_RE.match(str(_tport)):
|
||
logger.warning(f"Skipping task with unsafe sshPort: {_tport!r}")
|
||
continue
|
||
if task_platform == "windows" and remote:
|
||
# Windows: check PID file + Get-Process, read log tail
|
||
sd = "$env:TEMP\\odysseus-sessions"
|
||
ssh_base = ["ssh"]
|
||
if _tport and _tport != "22":
|
||
ssh_base.extend(["-p", str(_tport)])
|
||
check_cmd = ssh_base + [
|
||
remote,
|
||
"powershell",
|
||
"-Command",
|
||
f"$pid = Get-Content \"{sd}\\{session_id}.pid\" -ErrorAction SilentlyContinue; "
|
||
"if ($pid) {{ Get-Process -Id $pid -ErrorAction SilentlyContinue | Out-Null; if ($?) {{ exit 0 }} else {{ exit 1 }} }} else {{ exit 1 }}"
|
||
]
|
||
capture_cmd = ssh_base + [
|
||
remote,
|
||
"powershell",
|
||
"-Command",
|
||
f"Get-Content \"{sd}\\{session_id}.log\" -Tail 10 -ErrorAction SilentlyContinue",
|
||
]
|
||
elif remote:
|
||
ssh_base = ["ssh"]
|
||
if _tport and _tport != "22":
|
||
ssh_base.extend(["-p", str(_tport)])
|
||
check_cmd = ssh_base + [remote, "tmux", "has-session", "-t", session_id]
|
||
# Capture 500 lines (was 50) so a Python traceback survives
|
||
# the post-crash neofetch banner + bash prompt that otherwise
|
||
# fills the visible tail. Without this, output_tail ends up
|
||
# as just "Locale: C / Ubuntu_Odysseus ❯" and the agent
|
||
# can't diagnose the actual error.
|
||
capture_cmd = ssh_base + [remote, "tmux", "capture-pane", "-t", session_id, "-p", "-S", "-500"]
|
||
elif IS_WINDOWS:
|
||
# LOCAL Windows task: launched as a detached process (no tmux).
|
||
# Liveness comes from the <session>.pid file, output from the
|
||
# <session>.log file the wrapper redirects into. No subprocess.
|
||
check_cmd = None
|
||
capture_cmd = None
|
||
else:
|
||
check_cmd = ["tmux", "has-session", "-t", session_id]
|
||
capture_cmd = ["tmux", "capture-pane", "-t", session_id, "-p", "-S", "-500"]
|
||
|
||
local_win_task = (not remote) and IS_WINDOWS
|
||
|
||
progress_text = ""
|
||
full_snapshot = ""
|
||
|
||
if local_win_task:
|
||
# File-based liveness + output for the detached-process model.
|
||
pid_path = TMUX_LOG_DIR / f"{session_id}.pid"
|
||
log_path = TMUX_LOG_DIR / f"{session_id}.log"
|
||
task_pid = None
|
||
try:
|
||
task_pid = int(pid_path.read_text(encoding="utf-8").strip())
|
||
except Exception:
|
||
task_pid = None
|
||
is_alive = pid_alive(task_pid)
|
||
try:
|
||
if log_path.exists():
|
||
full_snapshot = log_path.read_text(
|
||
encoding="utf-8", errors="replace"
|
||
).strip()[-12000:]
|
||
lines = [l.strip() for l in full_snapshot.split('\n') if l.strip()]
|
||
downloading_lines = [l for l in lines if l.startswith("Downloading")]
|
||
if downloading_lines:
|
||
progress_text = downloading_lines[-1]
|
||
elif lines:
|
||
progress_text = lines[-1]
|
||
except Exception:
|
||
pass
|
||
else:
|
||
try:
|
||
alive = subprocess.run(check_cmd, timeout=10, capture_output=True)
|
||
is_alive = alive.returncode == 0
|
||
except Exception:
|
||
is_alive = False
|
||
|
||
# Capture last lines for progress. Prefer the "Downloading" line
|
||
# (real aggregate bytes) over "Fetching N files" (whole-file count that
|
||
# lags with hf_transfer). Falls back to the true last line otherwise.
|
||
if is_alive:
|
||
try:
|
||
cap = subprocess.run(capture_cmd, timeout=10, capture_output=True, text=True)
|
||
if cap.returncode == 0:
|
||
full_snapshot = cap.stdout.strip()
|
||
lines = [l.strip() for l in full_snapshot.split('\n') if l.strip()]
|
||
downloading_lines = [l for l in lines if l.startswith("Downloading")]
|
||
if downloading_lines:
|
||
progress_text = downloading_lines[-1]
|
||
elif lines:
|
||
progress_text = lines[-1]
|
||
except Exception:
|
||
pass
|
||
|
||
# Determine status. For the local-Windows detached model the log file
|
||
# persists after the process exits, so a finished download still has a
|
||
# snapshot to classify (DOWNLOAD_OK / exit marker) — evaluate it even
|
||
# when the PID is gone instead of blindly reporting "stopped".
|
||
download_zero_files = False
|
||
status = "unknown"
|
||
if is_alive or (local_win_task and full_snapshot):
|
||
lower = full_snapshot.lower()
|
||
exit_match = re.search(r"=== process exited with code\s+(-?\d+)", full_snapshot, re.I)
|
||
has_exit = exit_match is not None
|
||
exit_code = int(exit_match.group(1)) if exit_match else None
|
||
has_error = "error" in lower or "failed" in lower or "traceback" in lower
|
||
if has_exit and task_type == "serve":
|
||
# Serve tasks that exit are always errors — they should run indefinitely
|
||
status = "error"
|
||
elif has_exit and task_type == "download":
|
||
# Dependency installs are tracked as download tasks but only
|
||
# emit the generic runner exit marker, not HF download markers.
|
||
status = "completed" if exit_code == 0 else "error"
|
||
elif has_exit and "unrecognized arguments" in lower:
|
||
status = "error"
|
||
elif has_error and not ("application startup complete" in lower):
|
||
status = "error"
|
||
elif task_type == "download" and ("100%" in full_snapshot or "DOWNLOAD_OK" in full_snapshot):
|
||
# Only download tasks treat 100% as "completed".
|
||
# Serve tasks log 100%|██████| during inference progress
|
||
# (diffusion sampling, etc.) — that's "running", not done.
|
||
if re.search(r"Fetching\s+0\s+files", full_snapshot, re.IGNORECASE):
|
||
status = "error"
|
||
download_zero_files = True
|
||
else:
|
||
status = "completed"
|
||
elif "application startup complete" in lower:
|
||
status = "ready"
|
||
elif not is_alive:
|
||
# local-Windows: process gone, log has no success/ready marker.
|
||
status = "stopped"
|
||
else:
|
||
status = "running"
|
||
else:
|
||
# Session is dead — check if it completed or crashed
|
||
if task_type == "download" and _download_cache_complete(_payload.get("repo_id") or model, remote, str(_tport or "")):
|
||
status = "completed"
|
||
if not progress_text:
|
||
progress_text = "Download complete"
|
||
if not full_snapshot:
|
||
full_snapshot = "DOWNLOAD_OK"
|
||
else:
|
||
status = "stopped"
|
||
|
||
# Parse structured phase info — single source of truth for the UI
|
||
phase_info = _parse_serve_phase(full_snapshot, task_type) if (task_type == "serve" and status == "running" and full_snapshot) else {}
|
||
if phase_info.get("status") == "ready":
|
||
status = "ready"
|
||
serve_phase = phase_info.get("phase", "")
|
||
diagnosis = _diagnose_serve_output(full_snapshot) if task_type == "serve" and full_snapshot else None
|
||
if diagnosis and status in {"running", "unknown", "stopped"}:
|
||
status = "error"
|
||
if download_zero_files:
|
||
diagnosis = {"message": "No matching files were downloaded. The model repo or filename/quant pattern may be wrong (for example a ':Q4_K_M' tag that does not exist in the repo). Check the repo and the include/quant pattern."}
|
||
output_tail = "\n".join(full_snapshot.splitlines()[-12:]) if full_snapshot else ""
|
||
|
||
results.append({
|
||
"session_id": session_id,
|
||
"type": task_type,
|
||
"model": model.split("/")[-1] if "/" in model else model,
|
||
"status": status,
|
||
"progress": serve_phase if task_type == "serve" else progress_text[:120],
|
||
"phase": serve_phase,
|
||
"diagnosis": diagnosis,
|
||
"output_tail": output_tail,
|
||
"cmd": _payload.get("_cmd") or "",
|
||
"tps": phase_info.get("tps"),
|
||
"reqs": phase_info.get("reqs"),
|
||
"pct": phase_info.get("pct"),
|
||
"remote": remote or "local",
|
||
})
|
||
|
||
return {"tasks": results}
|
||
|
||
return router
|