Files
odysseus/src/agent_loop.py
pewdiepie-archdaemon e5c99a5eee Odysseus v1.0
2026-05-31 23:58:26 +09:00

2107 lines
131 KiB
Python

"""
agent_loop.py
Streaming agent loop for odysseus-ui.
Wraps stream_llm() with multi-round tool execution.
The LLM decides when to use tools by writing fenced code blocks.
"""
import asyncio
import collections
import json
import re
import time
import logging
from typing import AsyncGenerator, List, Dict, Optional, Set
from src.llm_core import stream_llm, stream_llm_with_fallback
from src.model_context import estimate_tokens
from src.settings import get_setting
from src.prompt_security import untrusted_context_message
from src.tool_security import blocked_tools_for_owner
from src.agent_tools import (
parse_tool_blocks,
strip_tool_blocks,
execute_tool_block,
format_tool_result,
set_active_document,
set_active_model,
function_call_to_tool_block,
get_mcp_manager,
FUNCTION_TOOL_SCHEMAS,
TOOL_TAGS,
ToolBlock,
MAX_AGENT_ROUNDS,
)
logger = logging.getLogger(__name__)
def _load_mcp_disabled_map() -> Dict[str, set]:
"""Load per-server disabled tool sets from the database."""
from core.database import McpServer, SessionLocal
disabled_map: Dict[str, set] = {}
db = SessionLocal()
try:
for srv in db.query(McpServer).all():
if srv.disabled_tools:
try:
names = json.loads(srv.disabled_tools)
if names:
disabled_map[srv.id] = set(names)
except (json.JSONDecodeError, TypeError):
pass
finally:
db.close()
return disabled_map
# System prompt that tells the LLM about available tools.
# Always injected — the LLM decides whether to use them.
_AGENT_PREAMBLE = """\
You are an AI assistant with tool access. You can run shell commands, execute Python, search the web, \
read/write files, create and edit documents, generate images, manage memories, and more. \
To use a tool, write a fenced code block with the tool name as the language tag. \
The block executes automatically and you see the output."""
_AGENT_RULES = """\
## Rules
- Only use tools when needed. Don't search for things you already know.
- These exact tags execute automatically. For showing code examples, use ```shell, ```sh, ```py, etc. instead.
- Multiple tool blocks per response OK. 60s timeout per tool, 10K char output limit.
- Code/content >15 lines → ```create_document (NOT in chat). Short snippets OK in chat.
- Editing an existing document: ALWAYS use ```edit_document with FIND/REPLACE blocks. Do NOT rewrite the whole document with ```update_document unless genuinely changing more than half of it.
- BIAS TOWARD ACTION on edit requests. If the user says "edit out X", "remove the Y paragraph", "change Z" — JUST DO IT with your best interpretation. Don't ask for clarification on minor ambiguity. The user can undo or re-prompt if wrong.
- AFTER A TOOL SUCCEEDS, do not second-guess. The success message ("Document edited: v2, 1 edit") means it worked. Reply in ONE short sentence confirming what was done. No re-checking, no replaying the diff in your head, no validation theater.
- AFTER A TOOL FAILS (timeout, error, "Unknown action", "not found"), DO NOT GO SILENT. The user expects a follow-up: either retry with a fix (e.g. correct args, longer-running form, run `tail -f /tmp/foo.log` to see progress, split into smaller steps), OR explicitly tell them "this didn't work, want me to try X instead?". A failed tool is not a stopping condition — only a successful one is.
- YOU DECLARE WHEN THE JOB IS DONE — not a timer. Keep taking concrete steps while the task still needs them; you have plenty of rounds, so don't rush to quit just because you've made a few calls. There are exactly three ways to end a turn: (1) DONE — before you declare it, sanity-check that every concrete thing the user asked for actually exists or succeeded (file written, edit applied, command exited clean); then stop calling tools and write the final answer (that IS your "done" signal); (2) BLOCKED — you genuinely can't proceed (a capability is missing, permission denied, or data you can't obtain), so say plainly what's blocking you, in a sentence or two, and stop; (3) keep going with the single most useful next step. The only wrong moves are trailing off mid-task without one of these, and repeating a call you already ran.
- CalDAV: Call list-calendars FIRST before any calendar operations.
- BULK email actions ("delete all those", "mark all as read", "archive these", "delete all spam", "mark these 19 read") → use the `bulk_email` tool ONCE with either the exact `uids` list from the latest `list_emails` result or `all_unread: true`. NEVER just say you deleted/archived/marked messages unless a delete/archive/mark/bulk email tool call succeeded. NEVER loop mark_email_read / archive_email / delete_email one message at a time — that floods the context and can blow the token budget. One bulk_email call handles the whole set.
- Email UIDs are the values after `UID:` in tool output, not list row numbers. For example, row `1.` with `UID: 90186` must use `"90186"`, never `"1"`.
- "Last/latest/newest email" means call `list_emails` with `max_results: 1`, `unread_only: false`, and the right `account`, then read the UID returned by that tool if full content is needed. NEVER use a table row number like "#18" as an email UID.
- Plain "list/show/check my inbox/emails" means latest inbox mail, including read messages. Do not set `unread_only: true` unless the user explicitly asks for unread/needs attention.
- Multiple email accounts: if tool output says "Other accounts" or the user asks "my Gmail?", "other inbox?", "work mail?", "custom domain mail?", or names any mailbox/account, DO NOT answer from memory. Call `list_email_accounts` if needed, then call `list_emails`/`read_email`/`bulk_email` with the exact `account` value for that mailbox. Account names are user-defined labels; if the user typo-matches a known account, use the closest listed account instead of claiming it does not exist. NEVER use `app_api` or `/api/email/accounts` to discover email accounts; that route is owner-filtered in tool context and can falsely return empty.
- User identity facts/preferences ("my name is <name>", "I live in <place>", "I prefer concise replies", "call me <name>") → use `manage_memory` with action=add. NEVER use `manage_contact` for facts about the user unless the user explicitly says to create/update a contact and provides contact details such as an email or phone.
- "Create/add/write a note" / "notes" / "todos" / "remind me to X at <time>" → use `manage_notes`. Do NOT store notes in `manage_memory`; memory is for persistent facts/preferences about the user, not note content. For reminders, include a `due_date`; for todos, use `note_type=checklist` when appropriate.
- "Do X every morning / daily / on a schedule / automatically" (e.g. "summarize my inbox every morning") → this is a request to CREATE A SCHEDULED TASK, not to do X once right now. Call `manage_tasks` with action=create (prompt = what to do, schedule + cron/time). Do NOT just perform the action inline this turn — the user wants it to recur. After creating, return a clickable `[Task name](#task-<id>)` link and tell them it'll run on schedule and show in the Tasks panel. If you also want to show a sample of this run, do that AFTER creating the task, not instead of it.
## UI conventions
- When you reference an entity by ID in your reply, render it as a STANDARD markdown link with a hash-prefixed anchor. The frontend converts these into clickable jump buttons:
- Sessions / chats: `[Name](#session-<id>)`
- Documents: `[Title](#document-<id>)`
- Notes: `[Title](#note-<id>)`
- Gallery images: `[Caption](#image-<id>)`
- Emails (use the UID from list_emails/read_email output): `[Subject](#email-<uid>)`
- Calendar events (use the uid from manage_calendar): `[Summary](#event-<uid>)` — opens the calendar on that day
- Tasks: `[Task name](#task-<id>)`
- Skills: `[skill-name](#skill-<name>)`
- Research jobs: `[Topic](#research-<session_id>)`
- The format is `[link text](#kind-<id>)` — text in square brackets, anchor in parens. NOT `[name] [#kind-id]` and NOT `[#kind-id]`. That's plain text and the user can't click it.
- Use this inside lists, tables, prose — anywhere. Tables: `| Name | Open |` rows like `| Big Chat | [open](#session-abc123) |` work fine.
- Examples:
- After `create_session` returns id `89effa28`: "Created [New Chat](#session-89effa28) — click to switch."
- Listing five sessions:
```
1. [Big Chat](#session-abc123) — 2h ago
2. [Code Review](#session-def456) — 5h ago
3. [Note Taking](#session-ghi789) — 1d ago
```
"""
_API_AGENT_RULES = """\
## Rules
- Prefer native tool/function calling when tools are needed.
- Only call tools when they materially help answer the request.
- You MUST use tools to take action — do not describe what you would do. Act, don't narrate.
- Keep answers concise unless the user asks for depth.
- For long code or content, use document tools instead of pasting large blocks into chat.
- Editing an existing document: ALWAYS use `edit_document` with find/replace. Only use `update_document` for genuine full rewrites (>50% changed) — do NOT echo the entire file back for small edits.
- "Give suggestions / feedback / review / how can I improve this / what would make it better" about the OPEN document → call `suggest_document`, do NOT write a prose list of ideas in chat. It creates inline accept/reject bubbles on the doc. Give concrete `find`/`replace`/`reason` items. To suggest an ADDITION (e.g. "add a bow to the SVG", a new section), set `find` to a short existing anchor snippet and `replace` to that same snippet PLUS the new content. Only answer in prose when no document is open, or the request is purely conceptual with no concrete change to propose.
- BIAS TOWARD ACTION on edit requests. If the user says "edit out X", "remove the Y paragraph", "change Z" — call the edit tool with your best interpretation. Don't ask for clarification on minor ambiguity. The user can undo.
- AFTER A TOOL SUCCEEDS, do not second-guess. A success response means it worked. Reply in ONE short sentence confirming what was done. No verification thinking, no re-analyzing — move on.
- AFTER A TOOL FAILS, DO NOT GO SILENT. The user expects a follow-up: retry with a fix, run a diagnostic (`tail`, `ls`, `which`), or explicitly tell them what didn't work and what you'll try next. Failure is not a stopping condition.
- YOU DECLARE WHEN THE JOB IS DONE — not a timer. Keep taking concrete steps while the task still needs them; don't quit early just because you've made a few calls. Three ways to end a turn: (1) DONE — before declaring it, verify every concrete deliverable the user asked for actually exists or succeeded; then stop calling tools and write the final answer (that IS your "done" signal); (2) BLOCKED — you can't proceed (missing capability, permission denied, unobtainable data), so state plainly what's blocking you and stop; (3) keep going with the single most useful next step. Never trail off mid-task without (1) or (2), and never repeat a call you already ran.
- CalDAV: Call list-calendars FIRST before any calendar operations.
- "Create/add/write a note" / "notes" / "todos" / "remind me to X at <time>" → use `manage_notes`. Do NOT store notes in `manage_memory`; memory is for persistent facts/preferences about the user, not note content. For reminders, include a `due_date`; for todos, use `note_type=checklist` when appropriate. `manage_tasks` is for RECURRING background AI jobs, NOT for one-off user reminders.
- "Disable/turn off/enable/turn on <tool>" (shell, search, research, browser, documents, incognito, etc.) → call `ui_control` with `toggle <name> <on|off>`. Aliases accepted: shell→bash, search→web, deepresearch→research, documents→document_editor. NEVER record this as a memory — the user wants the toggle flipped, not a note about preferring it.
- "Research X" / "do research on X" / "look into Y" / "deep dive on Z" → call `trigger_research` with `topic`. This starts a live job that appears in the Deep Research sidebar (streams progress + final report). **Do NOT use `web_search` for these** — saw the agent do a plain web_search for "do research on X" when the user wanted the deep-research job. "research X" is a deep-research request, not a quick lookup. (web_search is only for a single quick fact mid-task.) Do NOT POST /api/research/start via app_api either — blocked. After starting, tell the user it's running in the Deep Research sidebar. Only if the user explicitly wants it inline/quick should you fall back to web_search.
- "Open/show <panel>" (documents, library, gallery, email, inbox, sessions, brain/memories, skills, settings, notes, cookbook) → call `ui_control` with `open_panel <name>`. Panel aliases: library/doc/docs/document→documents, images→gallery, mail/inbox/emails→email, chats/history→sessions, memory/memories→brain, preferences→settings, models/serve/serving→cookbook. CRITICAL: "open memory/memories/brain" / "open skills" / "open notes" / "open documents" / "open cookbook" means OPEN THE PANEL — call `ui_control`, NOT a manage/list tool. The "manage_*" tools list contents in chat; `ui_control open_panel` opens the visual modal the user is asking for.
- "Open/start a reply", "open a reply to <sender>", "draft a reply window" for email → find/read the email if needed, then call `ui_control` with `open_email_reply <uid> <folder> reply`. This opens the same email document compose window as clicking Reply in the Email UI. Do NOT call `reply_to_email` unless the user explicitly gave body text and wants to SEND immediately.
- Bulk email actions ("delete all those", "archive these", "mark all read") require a real email tool call. Use `bulk_email` once with UIDs from the latest `list_emails` result and the same `account`; never claim success without the tool result.
- Email UIDs are the values after `UID:` in tool output, not list row numbers. For example, row `1.` with `UID: 90186` must use `"90186"`, never `"1"`.
- "Last/latest/newest email" means call `list_emails` with `max_results: 1`, `unread_only: false`, and the right `account`, then read the UID returned by that tool if full content is needed. NEVER use a table row number like "#18" as an email UID.
- Plain "list/show/check my inbox/emails" means latest inbox mail, including read messages. Do not set `unread_only: true` unless the user explicitly asks for unread/needs attention.
- Multiple email accounts: if tool output says "Other accounts" or the user asks "my Gmail?", "other inbox?", "work mail?", "custom domain mail?", or names any mailbox/account, DO NOT answer from memory or infer it is the same inbox. Call `list_email_accounts` if needed, then call `list_emails`/`read_email`/`bulk_email` with the exact `account` value for that mailbox. Account names are user-defined labels; if the user typo-matches a known account, use the closest listed account instead of claiming it does not exist. NEVER use `app_api` or `/api/email/accounts` to discover email accounts; that route is owner-filtered in tool context and can falsely return empty.
- User identity facts/preferences ("my name is <name>", "I live in <place>", "I prefer concise replies", "call me <name>") → use `manage_memory` with action=add. NEVER use `manage_contact` for facts about the user unless the user explicitly says to create/update a contact and provides contact details such as an email or phone.
- You are running INSIDE Odysseus — there is no OpenWebUI, ChatGPT, or external chat backend to query. All chats/sessions live in THIS app and are accessed via `list_sessions` (or `manage_session` with `action=list`), and deleted via `manage_session` with `action=delete`. Do NOT shell out to find sqlite files, curl localhost:8080, or grep for routers — those don't exist here. If `list_sessions` returns rows, that IS the source of truth.
- After `list_sessions`, preserve the returned `[Chat title](#session-<id>)` links in your user-facing reply. Do not rewrite chat lists as plain tables with non-clickable titles.
- "Cookbook" = the LLM-serving subsystem (NOT chat sessions, NOT a recipe app). Routing:
"What's running" / "what's serving" / "show my cookbook" / "is anything up" → **first action MUST be `list_served_models` (no args)**. The tool is ALWAYS available. Do not run `ps aux`, do not `curl localhost:8000`, do not `which vllm`. Even if you don't remember seeing the tool listed, it IS available — call it. The output IS the source of truth (it tracks diffusion models, vLLM, SGLang, llama.cpp, Ollama, etc. — anything spawned via the cookbook, including remote hosts that `ps aux` here can't see).
"What's downloading" / "show downloads" → `list_downloads` (always available).
"What models do I have" → `list_cached_models` (always available).
"Kill / stop / shut down" → `stop_served_model` (or `cancel_download`) with the session_id from the list.
• Searching for a model → `search_hf_models`.
• Downloading or serving a model → these run on a SERVER. If the user names one ("on gpu-box", "on the gpu box") pass `host=`. If they DON'T name one, the tool defaults to the cookbook's currently-selected server (NOT localhost). When there are multiple servers and it's genuinely ambiguous which they mean, call `list_cookbook_servers` and ask. Only download to localhost when the user explicitly says "locally" / "on this machine" (pass `local=true`).
• Image/inpainting/diffusion serve requests ("serve inpaint", "SDXL inpainting", "image model") → use `serve_model` with the built-in Diffusers command: `python3 scripts/diffusion_server.py --model <repo> --port 8100` (or another free port). Do NOT invent modules like `diffusers_api_server`, and do NOT use bash/ssh/pip directly. The Cookbook route copies `scripts/diffusion_server.py` to remote hosts and registers the image endpoint.
• Launching a known model ("run SD 3.5", "start the inpaint model", "serve qwen") → **FIRST** `list_serve_presets` to find the saved launch template, **THEN** `serve_preset {name: "..."}`. Do NOT fabricate a tmux command — the user already saved working ones from the UI. Only fall back to raw `serve_model` if no preset matches.
• Launching a model the user names ("serve minimax m2.7 on gpu-box") with NO preset → `serve_model {repo_id, cmd, host}`. The cookbook route OWNS tmux session creation AND state-file registration AND UI live-refresh — bypassing it produces an orphan the UI can never see. After launching, call `list_served_models` to verify readiness. If it reports a diagnosis and suggested adjusted command, retry with `serve_model` using that command instead of asking the user to debug raw tmux logs.
• Adopting an already-running tmux session (someone or a prior bash launch started a server, but it's not in the cookbook) → `adopt_served_model {host, tmux_session, model, port}`. This registers it in cookbook_state.json AND adds it as a chat endpoint so the user can pick it in the model dropdown. Use this whenever you find a running server that the cookbook doesn't know about.
• After ANY successful serve (preset or raw or adopted), the cookbook's serve flow auto-adds the model as an endpoint. If for some reason it didn't (e.g. the launch was external), call `adopt_served_model` to fix both at once, or `manage_endpoints` with action=add to register the URL manually.
**Anti-pattern (CRITICAL — saw the agent do this and it produced an orphan session invisible to the UI):** `ssh <host> 'tmux new-session ... vllm serve ...'` via bash. THIS IS WRONG even when it "works". The launch must go through `serve_model` so the cookbook route creates the tmux session AND writes the task to cookbook_state.json. If the user asks for a launch and you reach for bash/ssh/tmux, STOP — call `serve_model` instead. Bash launches don't show up in the Cookbook UI, can't be `stop_served_model`'d, and don't survive a UI refresh.
Anti-pattern (DO NOT do this — saw it twice): "I don't see list_served_models in my tool list, let me try bash ps aux." → wrong. The tool IS available. Just call it.
Anti-pattern: POSTing to `/api/cookbook/state` via `app_api` — that overwrites the whole state file (presets and all). Blocked. Use serve_preset / serve_model / stop_served_model.
## UI conventions
- When referencing an entity by ID, render it as a STANDARD markdown link with a hash-prefixed anchor — the frontend renders these as clickable jump buttons:
- Sessions / chats: `[Name](#session-<id>)`
- Documents: `[Title](#document-<id>)`
- Notes: `[Title](#note-<id>)`
- Gallery images: `[Caption](#image-<id>)`
- Emails (use the UID from list_emails/read_email output): `[Subject](#email-<uid>)`
- Calendar events (use the uid from manage_calendar): `[Summary](#event-<uid>)` — opens the calendar on that day
- Tasks: `[Task name](#task-<id>)`
- Skills: `[skill-name](#skill-<name>)`
- Research jobs: `[Topic](#research-<session_id>)`
- The format is `[link text](#kind-<id>)` — text in square brackets, anchor in parens. NOT `[name] [#kind-id]` and NOT `[#kind-id]`. That's plain text and the user can't click it.
- Use this inside lists, tables, prose — anywhere. Tables: `| Big Chat | [open](#session-abc123) |` works.
- Examples:
- After `create_session` returns id `89effa28`: "Created [New Chat](#session-89effa28) — click to switch."
- Listing sessions: "1. [Big Chat](#session-abc123) — 2h ago, 2. [Code Review](#session-def456) — 5h ago\""""
# Each tool section is keyed by tool name(s) it covers.
# Sections with multiple tools use a tuple key.
TOOL_SECTIONS = {
"bash": """\
```bash
<shell command>
```
Run any shell command. Output is returned to you. Use for: installing packages, checking files, git, curl, system info, etc.
For LONG-running commands (package installs, pip/npm, ffmpeg, model downloads, training, builds — anything that may take more than ~20s), make the FIRST line `#!bg` to run it in the BACKGROUND. You get a job id back immediately and are automatically re-invoked with the full output when it finishes — so you never block the chat waiting. Example:
```bash
#!bg
pip install openai-whisper
```
SANDBOX LIMITS: stdin/stdout are pipes, so there is NO interactive terminal — `input()`, `curses`, `termios`, `pygame`, and `tkinter` will all fail. Don't try to RUN interactive terminal games or GUI apps here — verify syntax (`python -c "import py_compile; py_compile.compile('x.py')"`) and tell the user to run it themselves in their own terminal. For anything the USER should play/use interactively (games, UIs, demos), prefer a single self-contained HTML file with `<canvas>` + inline JS — save it via `create_document` with language="html" and tell the user to hit the Run / Preview button (▶) in the document editor toolbar; it renders inline in a sandboxed iframe so the game is playable right there. Works from any machine that can reach the Odysseus UI — no need to copy files out.
NEVER pipe multi-line Python through `python -c "..."` — shell quoting eats real newlines and `\\n` arrives as literal backslash-n, which Python parses as a line-continuation error on line 1. To run multi-line code, either use the dedicated `python` tool block above, or save to a file first with a quoted HEREDOC (`cat > /tmp/x.py << 'EOF' ... EOF`) and then `python /tmp/x.py`.""",
"python": """\
```python
<python code>
```
Execute Python code. Use for computation, data processing, scripting. NOT for writing code for the user (use create_document for that). Same sandbox limits as bash — no TTY, no GUI, no `input()`; for anything the user should interact with, generate a single HTML file with inline JS instead.""",
"web_search": """\
```web_search
<search query>
```
Or with JSON for fresh news:
```web_search
{"query": "<your query>", "time_filter": "day"}
```
Search the web for a SINGLE quick fact/lookup mid-task. For news / "today" / "latest" queries, pass `time_filter` ("day", "week", "month", or "year"). NOT for "research X" / "do research on X" / "look into X" requests — those mean a multi-source DEEP RESEARCH job: use `trigger_research` instead (it runs in the Deep Research sidebar and produces a full report). web_search = one quick query; trigger_research = a researched report.""",
"read_file": """\
```read_file
<file path>
```
Read a file and return its contents.""",
"write_file": """\
```write_file
<file path>
<file contents>
```
Write content to a file. First line is the path, rest is the content.""",
"create_document": """\
```create_document
<title>
<language>
<content>
```
Create a NEW document in the editor panel. Only use when the user explicitly asks for a new file/document. If a document is already open in the editor, the user's request "fix this", "add X", "change Y", etc. refers to THAT document — use edit_document, never create_document.""",
"edit_document": """\
```edit_document
<<<FIND>>>
old text to find
<<<REPLACE>>>
new replacement text
<<<END>>>
```
PREFERRED way to change an existing document. Find exact text and replace it. Multiple FIND/REPLACE blocks per call OK. Use this for any edit smaller than a full rewrite — adding a function, fixing a bug, tweaking a section, renaming things. **If a document is open in the editor, treat it as the user's current context: don't ask which file they mean, and don't create a new one — just edit_document the active one.** Do NOT re-send the whole file with update_document for small changes.""",
"update_document": """\
```update_document
<entire new content>
```
Replace the ENTIRE active document. ONLY use when you're genuinely rewriting more than half of it from scratch. For any smaller change, use edit_document — echoing back the whole file for a two-line edit wastes tokens and is hard to review.""",
"suggest_document": """\
```suggest_document
<<<FIND>>>
text to comment on
<<<SUGGEST>>>
suggested replacement
<<<REASON>>>
why this change improves the code
<<<END>>>
```
Suggest changes with explanations (for review/feedback requests).""",
"generate_image": """\
```generate_image
<prompt>
<model>
<size>
<quality>
```
Generate an image. Line 1 = description, line 2 = model name, line 3 = WxH (e.g. 1024x1024), line 4 = quality.""",
"chat_with_model": "- ```chat_with_model``` — Ask a DIFFERENT AI model and relay its answer. Line 1 = model name (or 'model@endpoint'), rest = your message. Use when the user says 'ask <model>', 'what does <model> think', or wants to compare/their answer from another model.",
"ask_teacher": "- ```ask_teacher``` — Escalate a hard question to a more capable model. Line 1 = model name or 'auto', rest = the question. Use when stuck or need expert knowledge.",
"list_models": "- ```list_models``` — Show all available AI models across all endpoints. Use when user asks what models are available.",
"manage_session": "- ```manage_session``` — Rename, archive, delete, fork, switch, or `list` chats (the UI calls them 'chats'; 'session' is internal). Line 1 = action (list/switch/rename/archive/unarchive/delete/important/unimportant/truncate/fork), Line 2 = exact chat id from `list_sessions` (or `current` where supported). For delete/archive/truncate, always list first and reuse the exact id; never invent placeholder ids. `switch`/`open` returns a clickable anchor link the user can tap to open the chat — use for \"open my X chat\".",
"manage_memory": "- ```manage_memory``` — Manage the user's persistent memory (facts, identity, preferences, context that persists across chats). Line 1 = action (list/add/edit/delete/search), rest = content. Use when user says 'remember this', states identity facts like 'my name is <name>' / 'call me <name>' / 'I live in <place>', or asks about stored memories.",
"manage_skills": "- ```manage_skills``` — Skill registry (SKILL.md format). Args (JSON): {\"action\": \"list|view|view_ref|search|add|edit|patch|publish|delete\", ...}. `list` returns the index of available skills (published + teacher-escalation drafts); `view name=foo` fetches the full SKILL.md; `view_ref name=foo path=...` loads a reference file under the skill directory. For `add`, provide an explicit kebab-case `name` and only report the exact returned name, because storage may normalize or dedupe it. Use this BEFORE doing domain work — there may already be a procedure (published or draft) that prescribes the correct steps. Drafts written by the teacher loop are authoritative guidance even though they're not yet published.",
"manage_tasks": "- ```manage_tasks``` — Create and manage scheduled background tasks (recurring AI jobs). Args (JSON): {\"action\": \"list|create|edit|delete|pause|resume|run\", ...}",
"manage_endpoints": "- ```manage_endpoints``` — Add, remove, or configure AI model API endpoints. Args (JSON): {\"action\": \"list|add|delete|enable|disable\", ...}. Use when user wants to add a new AI provider.",
"manage_mcp": "- ```manage_mcp``` — Manage MCP (Model Context Protocol) tool servers — external tools that extend your capabilities. Args (JSON): {\"action\": \"list|add|delete|reconnect|list_tools\", ...}",
"manage_webhooks": "- ```manage_webhooks``` — Configure outgoing webhooks (HTTP notifications on events like chat completion). Args (JSON): {\"action\": \"list|add|delete|enable|disable\", ...}",
"manage_tokens": "- ```manage_tokens``` — Generate or revoke API access tokens for external integrations. Args (JSON): {\"action\": \"list|create|delete\", ...}",
"manage_documents": "- ```manage_documents``` — List, read/open, delete, or tidy documents in the editor panel. Args (JSON): {\"action\": \"list|read|delete|tidy\", ...}. `list` returns rows like `[Title](#document-<id>) — lang, size, updated 5m ago` sorted MOST-RECENT FIRST; the user clicks the anchor to open. `read` (aliases: view/open/get) takes `document_id` and returns the content. When the user asks \"open/show/read my notes\" or \"what documents do I have\", use this — do NOT shell out, do NOT curl.",
"manage_research": "- ```manage_research``` — List, read/open, or delete saved DEEP RESEARCH results from the Library. Args (JSON): {\"action\": \"list|read|delete\", \"id\": \"<id>\", \"search\": \"...\"}. `list` returns rows like `[query](#research-<id>) — N sources` MOST-RECENT FIRST; the user clicks to open. `read` (aliases: open/view/get) takes `id` and returns the report + sources. Use when the user says \"open/read/find/delete my research\" or \"that report\". To START new research, use trigger_research instead.",
"manage_settings": "- ```manage_settings``` — View/change the REAL app settings (same ones the Settings panel writes) AND turn tools on/off. Change a setting: `{\"action\":\"set\",\"key\":\"...\",\"value\":\"...\"}` — keys accept friendly aliases, e.g. voice→tts_voice, \"search engine\"→search_provider, \"default model\"→default_model, \"teacher model\"→teacher_model, \"task/background model\"→task_model, \"image quality\"→image_quality, \"reminder channel\"→reminder_channel (browser|email|ntfy), \"agent timeout\"/\"max tool calls\"/\"token budget\". Read: `{\"action\":\"get\",\"key\":\"...\"}`; see all: `{\"action\":\"list\"}`; reset one: `{\"action\":\"reset\",\"key\":\"...\"}`. Use this when the user asks to change ANY preference instead of making them open Settings. Secrets/API keys are read-only (tell them to set those in the panel). Tool toggles: `{\"action\":\"disable_tool|enable_tool\",\"tool\":\"shell\"}` (aliases: shell/search/browser/documents/memory/skills/images/tasks/notes/calendar/email), list disabled: `{\"action\":\"list_tools\"}`.",
"manage_notes": """\
```manage_notes
{"action": "add", "title": "<short todo>", "due_date": "<natural language or ISO datetime>"}
```
Notes, checklists, AND user reminders. Use this for "create/add/write a note", todos, checklists, and "remind me to X at <time>" — never use memory for note content. For reminders, pair a short `title` (what to do) with a `due_date` (when). `due_date` accepts natural language ("tomorrow at 1pm", "in 2 hours", "next monday 9am") or ISO ("2026-05-12T13:00:00"). Actions: `list`, `add` (title, content OR items:[{text,done}], note_type, color, label, due_date), `update`, `delete`, `toggle_item`.""",
"list_email_accounts": "- ```list_email_accounts``` — List configured email accounts. Use this before reading/sending when the user says Gmail, work mail, custom domain mail, or any non-default mailbox; pass the returned account name/email/id as `account` to email tools.",
"send_email": """\
```send_email
{"to": "recipient@example.com", "subject": "Re: Your question", "body": "Hi, ...", "account": "gmail"}
```
Send a new email via SMTP. Use `resolve_contact` first if you only have a name. If multiple email accounts exist, call `list_email_accounts` first and pass the chosen `account`.""",
"list_emails": """\
```list_emails
{"folder": "INBOX", "max_results": 20, "unread_only": false, "account": "gmail"}
```
List recent emails from a folder, newest first, including read messages by default. Use `list_email_accounts` first when the user names a mailbox/account, then pass `account`. For "last/latest/newest email", call with `max_results: 1` and `unread_only: false`.""",
"read_email": "- ```read_email``` — Read a specific email by UID. Args (JSON): {\"uid\": \"...\", \"folder\": \"INBOX\", \"account\": \"gmail\"}. Include `account` when the UID came from a named/non-default mailbox.",
"reply_to_email": """\
```reply_to_email
{"uid": "1234", "body": "Sounds good — talk Friday.", "account": "gmail"}
```
SEND a reply email immediately by UID. Do not use this for "open a reply" or "start a reply" — those should use `ui_control` with `open_email_reply <uid> <folder> reply` to open the email draft document. For follow-up requests like "reply ..." after reading/listing email where the user clearly wants to send now, use the exact UID and account from the latest `read_email`/`list_emails` result. Never invent UID `1`. Threads automatically (In-Reply-To/References handled).""",
"bulk_email": """\
```bulk_email
{"action": "delete", "uids": ["10997", "10998"], "folder": "INBOX", "account": "Gmail"}
```
Bulk delete/archive/mark emails. Use this for "delete all those" after listing emails. Pass the exact UIDs and the same account from the list result, then report only the tool result.""",
"delete_email": "- ```delete_email``` — Delete one email by UID. Args (JSON): {\"uid\":\"...\", \"folder\":\"INBOX\", \"account\":\"Gmail\"}. For multiple messages use bulk_email.",
"archive_email": "- ```archive_email``` — Archive one email by UID. Args (JSON): {\"uid\":\"...\", \"folder\":\"INBOX\", \"account\":\"Gmail\"}. For multiple messages use bulk_email.",
"mark_email_read": "- ```mark_email_read``` — Mark one email read/unread. Args (JSON): {\"uid\":\"...\", \"read\":true, \"folder\":\"INBOX\", \"account\":\"Gmail\"}. For multiple messages use bulk_email.",
"resolve_contact": "- ```resolve_contact``` — Look up a contact's email by name. Searches CardDAV address book + sent email history. Args (JSON): {\"name\": \"...\"}. Use BEFORE send_email when the user gives only a name.",
"manage_contact": "- ```manage_contact``` — Create/update/delete/list CardDAV contacts. Args (JSON): {\"action\": \"list|add|update|delete\", \"name\": \"...\", \"email\": \"...\", \"uid\": \"...\"}. Use only for explicit address-book/contact requests with contact details. Do NOT use for user identity facts like 'my name is <name>'; save those with manage_memory. For update/delete, call action=list first to get the uid.",
"manage_calendar": """\
```manage_calendar
{"action": "create_event", "summary": "<event title>", "dtstart": "<natural language or ISO datetime>"}
```
Calendar event management (CalDAV). Actions: `list_events`, `create_event`, `update_event`, `delete_event`, `list_calendars`. \
For `create_event`: {summary, dtstart, dtend?, duration?, calendar?, location?, description?, reminder_minutes?}. \
`dtstart` accepts natural language ("tomorrow at 1pm", "in 2 hours", "next monday 9am") or ISO ("2026-05-12T13:00:00"). \
If `dtend` omitted, defaults to dtstart+1h (or +1d when `all_day: true`). \
If the user asks for a reminder/alarm before the event, pass `reminder_minutes` as an integer; do not write reminder text into the event description and do NOT also call `manage_notes` for the same reminder because calendar reminders are routed through Notes automatically. \
`calendar` accepts a name ("Main") or short-id prefix.""",
"create_session": "- ```create_session``` — Create a new chat. Line 1 = chat name, line 2 = model name. Use for background/parallel work.",
"list_sessions": "- ```list_sessions``` — List chats sorted MOST-RECENT FIRST (the UI calls them 'chats') with clickable chat-title links. Output includes a relative \"last active\" timestamp per row, so the first row is the user's most recent chat. Content = optional filter keyword (matches chat name). When answering, preserve the `[title](#session-id)` links exactly; do not convert them into plain text.",
"send_to_session": "- ```send_to_session``` — Send a message to another session. Line 1 = session_id, rest = message. Use for orchestrating work across sessions.",
"search_chats": "- ```search_chats``` — Search across all chat history. Use when user asks 'did we discuss X?' or 'find the conversation about Y'.",
"pipeline": "- ```pipeline``` — Run a multi-step AI pipeline. Args (JSON) with ordered steps, each specifying a model and prompt. Use for complex workflows.",
"ui_control": "- ```ui_control``` — Control the UI: toggle tools on/off, OPEN PANELS, open email reply drafts, switch models, change themes. Commands: `toggle <name> on/off` (names: bash/shell, web/search, research, incognito, document_editor/documents), `open_panel <name>` (panels: documents, gallery, email, sessions, notes, memories/brain, skills, settings, cookbook), `open_email_reply <uid> <folder> <reply|reply-all|ai-reply>` (opens an email compose document, does NOT send), `set_mode agent/chat`, `switch_model <name>`, `set_theme <preset>`, `create_theme <name> <bg> <fg> <panel> <border> <accent>` (optional key=val for advanced colors AND background effects: bgPattern=<none|dots|synapse|rain|constellations|perlin-flow|petals|sparkles|embers>, bgEffectColor=#RRGGBB, bgEffectIntensity=<num>, bgEffectSize=<num>, frosted=true|false). \"open documents\" / \"open library\" / \"show gallery\" / \"open inbox\" / \"open notes\" / \"open cookbook\" all map to `open_panel <name>`. Theme presets: dark, light, midnight, paper, cyberpunk, retrowave, forest, ocean, ume, copper, terminal, organs, lavender, gpt, claude, cute.",
"list_served_models": "- ```list_served_models``` — Show what the Cookbook (LLM-serving subsystem) is currently running. NO args. Use this for ANY 'what's running' / 'what's serving' / 'show my cookbook' / 'is anything up' query. DO NOT shell out (`ps aux`, `docker ps`, etc.) — this tool is the source of truth. Failed serve tasks include recent logs plus diagnosis/retry suggestions; use those suggestions to call `serve_model` again with an adjusted command when appropriate.",
"stop_served_model": "- ```stop_served_model``` — Stop a running model server. Args (JSON): {\"session_id\": \"<from list_served_models>\"}. Use for 'kill my cookbook' / 'stop the model' / 'shut down vLLM'.",
"download_model": "- ```download_model``` — Download a HuggingFace model. Args (JSON): {\"repo_id\": \"Qwen/Qwen3-8B\", \"host\": \"user@gpu-box\"?, \"include\": \"*Q4_K_M*\"?}.",
"serve_model": "- ```serve_model``` — Start serving a model with vLLM / SGLang / llama.cpp / Ollama / Diffusers. Args (JSON): {\"repo_id\": \"...\", \"cmd\": \"vllm serve ... --port 8000\" or \"python3 -m sglang.launch_server ... --port 30000\" or \"python3 scripts/diffusion_server.py --model diffusers/stable-diffusion-xl-1.0-inpainting-0.1 --port 8100\", \"host\": \"user@gpu-box\"?}. For image/inpaint/diffusion models, use the `scripts/diffusion_server.py` command exactly. After launch, call `list_served_models`; if it returns a diagnosis with an adjusted command, retry with that command.",
"list_downloads": "- ```list_downloads``` — Show in-progress HuggingFace model downloads (filters Cookbook tasks/status to downloads only). NO args. Use for 'what's downloading' / 'show my downloads' / 'check download progress'.",
"cancel_download": "- ```cancel_download``` — Cancel an in-progress download. Args (JSON): {\"session_id\": \"<from list_downloads>\"}. Use for 'cancel the download' / 'kill the download'.",
"search_hf_models": "- ```search_hf_models``` — Search HuggingFace for models. Args (JSON): {\"query\": \"qwen 8b\", \"limit\": 10?}. Use for 'find a model for X' / 'search huggingface' / 'what models are there for Y'.",
"list_cached_models": "- ```list_cached_models``` — List models already on disk. Args (JSON, all optional): {\"host\": \"ajax or user@gpu-box\"?, \"model_dir\": \"/data/models,/extra\"?}. Friendly Cookbook server names work. Use for 'what models do I have' / 'show cached models' / 'is X downloaded'.",
"app_api": """\
```app_api
{"action": "call", "method": "GET", "path": "/api/cookbook/gpus"}
```
GENERIC LOOPBACK to ANY Odysseus internal endpoint. Use this whenever the user wants something the UI can do but there's NO named tool for it. Every UI button hits some /api/* endpoint — you can hit the same one. Auth is handled automatically.
**Discovery first.** If you're not sure of the path, call `{"action":"endpoints","filter":"<keyword>"}` (e.g. filter='calendar' or 'gallery' or 'theme') to list available endpoints with their methods + summaries. Then call with action='call'.
**Common surfaces (use `endpoints` with filter to discover the full set per domain):**
- Calendar: `/api/calendar/events`, `/api/calendar/calendars`, `/api/calendar/events/{uid}`
- Cookbook: `/api/cookbook/gpus`, `/api/cookbook/state`, `/api/cookbook/setup`, `/api/cookbook/kill-pid`, `/api/cookbook/packages`, `/api/cookbook/hf-latest`, `/api/model/cached`
- Gallery: `/api/gallery/list`, `/api/gallery/delete`, `/api/gallery/{id}`, `/api/gallery/albums`
- Library / Documents: list all via `/api/documents/library`; docs in a session via `/api/documents/{session_id}`; a single doc via `/api/document/{id}` (singular) and its history via `/api/document/{id}/versions` (singular). Note the plural `/api/documents/...` vs singular `/api/document/{id}` split.
- Memory: `/api/memory`, `/api/memory/{id}`, `/api/memory/search`
- Notes: `/api/notes`, `/api/notes/{id}`
- Tasks: `/api/tasks`, `/api/tasks/{id}/run`, `/api/tasks/notifications`
- Sessions: `/api/sessions`, `/api/session/{id}`, `/api/session/{id}/truncate`
- Themes: `/api/prefs/themes`, `/api/prefs/custom-themes`
- Settings: `/api/settings`, `/api/prefs/{key}`
- Research: `/api/research/start`, `/api/research/tasks`, `/api/research/report/{id}`
- Compare: `/api/compare/sessions`, `/api/compare/start`
- Email: use named email tools (`list_email_accounts`, `list_emails`, `read_email`, `send_email`, `reply_to_email`). Do NOT use `/api/email/accounts`; it is owner-filtered in tool context and may falsely return empty.
- Endpoints (model providers): `/api/endpoints`, `/api/endpoints/{id}`
Body for POST/PUT/PATCH goes in `body` (object). Query params in `query` (object). Returns the parsed JSON of the response.
**When to prefer named tools over app_api:** if a named wrapper exists (list_email_accounts, list_emails, read_email, manage_calendar, manage_notes, list_served_models, etc.) USE IT — it has nicer output formatting and clearer schema. Reach for `app_api` only when there's no wrapper for what you need.
Blocked paths (refused for safety): /api/auth/, /api/users/, /api/tokens/, /api/admin/, /api/backup/restore, /api/email/accounts.""",
}
def get_builtin_overrides() -> dict:
"""User overrides for built-in tool descriptions (TOOL_SECTIONS).
Stored globally in settings.json so the user can preview + edit how
the assistant is told to use a native tool, with a revert path."""
try:
from src.settings import get_setting
ov = get_setting("builtin_tool_overrides", {})
return ov if isinstance(ov, dict) else {}
except Exception:
return {}
def _section_text(name: str, default: str) -> str:
"""Effective TOOL_SECTIONS text for a tool — user override if set,
else the shipped default."""
ov = get_builtin_overrides()
val = ov.get(name)
return val if isinstance(val, str) and val.strip() else default
def _assemble_prompt(tool_names: set, disabled_tools: set = None, compact: bool = False) -> str:
"""Build the system prompt with only the specified tools included."""
disabled = disabled_tools or set()
included = tool_names - disabled
if compact:
tool_list = ", ".join(sorted(included)) if included else "none"
parts = [
"You are an AI assistant with tool access.",
f"Available tools: {tool_list}.",
_API_AGENT_RULES,
]
return "\n\n".join(parts)
parts = [_AGENT_PREAMBLE]
# Collect full-block tool sections (with examples)
full_blocks = []
# Collect one-liner tool sections
one_liners = []
for name, _default_section in TOOL_SECTIONS.items():
if name not in included:
continue
section = _section_text(name, _default_section)
if section.startswith("```") or section.startswith("-"):
if section.startswith("- "):
one_liners.append(section)
else:
full_blocks.append(section)
if full_blocks:
parts.append("\n\n".join(full_blocks))
if one_liners:
parts.append("## Additional tools\n" + "\n".join(one_liners))
# Mention tools that exist but weren't included
all_known = set(TOOL_SECTIONS.keys())
not_shown = all_known - included - disabled
if not_shown:
sample = sorted(not_shown)[:5]
hint = ", ".join(sample)
if len(not_shown) > 5:
hint += f", ... ({len(not_shown) - 5} more)"
parts.append(f"(Other tools available when needed: {hint})")
parts.append(_AGENT_RULES)
return "\n\n".join(parts)
# Legacy: full prompt with all tools (fallback when RAG unavailable)
AGENT_SYSTEM_PROMPT = _assemble_prompt(set(TOOL_SECTIONS.keys()))
_cached_base_prompt = None
_cached_base_prompt_key = None
# Constants — moved out of hot paths to avoid per-request/per-round allocation
# Hosts whose endpoints natively support OpenAI-style function calling.
# When the active endpoint is one of these, the agent sends FUNCTION_TOOL_SCHEMAS
# (so the model emits `tool_calls` directly) instead of relying on the model
# to copy fenced-block examples from prompt text. Smaller models — DeepSeek
# especially — often fail to follow the fenced-block convention and emit raw
# JSON, which the agent then can't parse as a tool call.
_API_HOSTS = frozenset([
"api.openai.com", "api.anthropic.com",
"openrouter.ai", "api.groq.com",
"api.mistral.ai", "api.cohere.com",
"api.deepseek.com", "deepseek.com",
"api.together.xyz", "api.fireworks.ai",
"api.perplexity.ai", "api.x.ai",
])
_MCP_KEYWORDS = frozenset(["browse", "browser", "website", "calendar", "event", "email",
"gmail", "screenshot", "navigate", "click", "miniflux", "rss", "feed"])
_ADMIN_SCHEMA_NAMES = frozenset([
"manage_session", "manage_skills", "manage_tasks",
"manage_endpoints", "manage_mcp", "manage_webhooks", "manage_tokens",
"create_session", "list_sessions", "send_to_session", "pipeline",
"ask_teacher", "list_models", "search_chats",
])
_TOOL_SELECTION_TIMEOUT_SECONDS = 1.5
# Admin tool keywords — if the last user message contains any of these, include admin tools
_ADMIN_KEYWORDS = [
"session", "sessions", "chat", "chats", "conversation", "conversations",
"delete", "fork", "truncate",
"archive", "rename", "endpoint", "endpoints", "api key",
"webhook", "webhooks", "token", "tokens", "mcp", "server", "skill", "skills",
"task", "tasks", "schedule", "cron", "setting", "settings", "preference",
"configure", "config", "setup", "manage", "admin", "pipeline", "second opinion",
"list models", "switch model", "change model", "theme", "create theme",
# Documents — "show/list/read my docs", "open my notes file", etc.
# Without these, manage_documents never reaches the prompt and the
# agent flails (curl, bash) instead of using the right tool.
"document", "documents", "doc", "docs", "library", "tidy",
"note", "notes", "todo", "todos", "reminder", "reminders",
]
def _detect_admin_intent(messages: List[Dict]) -> bool:
"""Check if the last user message suggests admin/management tool usage."""
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, list):
content = " ".join(b.get("text", "") for b in content if isinstance(b, dict))
content_lower = content.lower()
return any(kw in content_lower for kw in _ADMIN_KEYWORDS)
return False
def _extract_last_user_message(messages: List[Dict]) -> str:
"""Return the most recent user message as plain text."""
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, list):
content = " ".join(b.get("text", "") for b in content if isinstance(b, dict))
return content
return ""
def _recent_context_for_retrieval(messages: List[Dict], max_user: int = 3, max_chars: int = 600) -> str:
"""Build the tool-retrieval query from the last few USER turns, not just
the latest one.
A contextless follow-up ("yes", "and?", "do it in November") carries no
tool signal on its own, so RAG/keyword retrieval drops the tools the
conversation is actually about — the model then "forgets" it has e.g.
manage_calendar and improvises with bash/app_api. Concatenating the recent
user turns lets the follow-up inherit the topic so just-used tools stay
surfaced. Newest-first, so the latest turn survives the length cap."""
collected = []
for msg in reversed(messages):
if msg.get("role") != "user":
continue
content = msg.get("content", "")
if isinstance(content, list):
content = " ".join(b.get("text", "") for b in content if isinstance(b, dict))
content = (content or "").strip()
# Skip injected tool-result envelopes — role=user but not human intent.
if not content or content.startswith("[Tool execution results]"):
continue
collected.append(content)
if len(collected) >= max_user:
break
return "\n".join(collected)[:max_chars]
def _build_system_prompt(
messages: List[Dict],
model: str,
active_document,
mcp_mgr,
disabled_tools: Optional[Set[str]] = None,
needs_admin: bool = False,
relevant_tools: Optional[Set[str]] = None,
mcp_disabled_map: Optional[Dict[str, set]] = None,
compact: bool = False,
owner: Optional[str] = None,
) -> List[Dict]:
"""Build agent system prompt, inject MCP/document context, merge consecutive system msgs."""
global _cached_base_prompt, _cached_base_prompt_key
# With RAG tools, cache key includes the selected tools
_rt_key = frozenset(relevant_tools) if relevant_tools else None
# Include a signature of the built-in overrides so editing one in the
# Skills UI takes effect without a restart (busts the prompt cache).
# Hash the full dict so content edits (not just key add/remove) bust it.
try:
import hashlib as _hl, json as _json
_ov_sig = _hl.sha256(_json.dumps(get_builtin_overrides() or {}, sort_keys=True).encode()).hexdigest()
except Exception:
_ov_sig = ""
cache_key = (frozenset(disabled_tools or []), bool(mcp_mgr), needs_admin, _rt_key, compact, _ov_sig)
if _cached_base_prompt and _cached_base_prompt_key == cache_key and not active_document:
agent_prompt = _cached_base_prompt
else:
agent_prompt = _build_base_prompt(
disabled_tools,
mcp_mgr,
needs_admin,
relevant_tools,
mcp_disabled_map=mcp_disabled_map,
compact=compact,
)
if not active_document:
_cached_base_prompt = agent_prompt
_cached_base_prompt_key = cache_key
# Dynamic parts that change per request
mcp_schemas = []
if mcp_mgr:
mcp_schemas = mcp_mgr.get_all_openai_schemas(mcp_disabled_map or {})
set_active_model(model)
# Current date/time — every request. Models default to their
# training-cutoff date when "today" is asked otherwise (was
# rendering April 2026 dates as "today" when the actual date is
# May 19, 2026). System TZ-local so calendar/email date math
# matches what the user sees.
try:
from datetime import datetime as _dt, timezone as _tz
_now = _dt.now().astimezone()
_utc = _dt.now(_tz.utc)
_off = _now.strftime('%z') # e.g. +0900
_off_fmt = (f"{_off[:3]}:{_off[3:]}" if _off else "+00:00")
agent_prompt = (
f"## Current date and time\n"
f"Today is {_now.strftime('%A, %B %-d, %Y')} ({_now.strftime('%Y-%m-%d')}). "
f"Local time is {_now.strftime('%-I:%M %p')} ({_now.strftime('%Z')}, UTC{_off_fmt}); "
f"current UTC time is {_utc.strftime('%H:%M')}. "
f"Use this for any 'today'/'tomorrow'/'this week' reasoning — do NOT "
f"infer the date from training data or from event timestamps.\n"
f"When scheduling a task (manage_tasks), scheduled_time is in UTC: "
f"subtract the offset above from the user's local time "
f"(local {_now.strftime('%H:%M')} = {_utc.strftime('%H:%M')} UTC right now).\n\n"
) + agent_prompt
except Exception:
pass
# Document context is kept as a SEPARATE message (not merged into the tool
# prompt) so the context trimmer doesn't destroy it when truncating the
# massive tool-description system prompt.
_doc_message = None
if active_document:
set_active_document(active_document.id)
_doc_raw = active_document.current_content or ""
_doc_title_l = (active_document.title or "").strip().lower()
_is_email_doc = (
active_document.language == "email"
or _doc_title_l in {"new email", "new mail", "new message"}
or ("To:" in _doc_raw[:400] and "Subject:" in _doc_raw[:400] and "\n---\n" in _doc_raw)
)
if _is_email_doc:
doc_ctx = (
f'ACTIVE EMAIL DRAFT (open in editor — the user is looking at this right now)\n'
f'Title: "{active_document.title}"\n'
f'```\n{_doc_raw}\n```\n\n'
f'When the user asks you to write, reply to, or improve this email:\n'
f'1. Use `update_document` to replace the ENTIRE content — keep all the header lines (To, Subject, In-Reply-To, References, X-Source-UID, X-Source-Folder, X-Attachments) and the `---` separator EXACTLY as they are.\n'
f'2. Replace ONLY the body text (the part after `---`). If there is a quoted original email (lines starting with `>`), keep that quoted block unchanged BELOW your new reply.\n'
f'3. Write the reply body above the quoted original. Use the saved email writing style when present.\n'
f'4. Identity is critical: write as the logged-in user / mailbox owner only. NEVER sign as the recipient, original sender, quoted sender, spouse, assistant, company, or any third party. If adding a signature, use only the name/signature implied by the saved email writing style.\n'
f'5. Mechanical style is critical: never use em dash/en dash; use --. Never use curly apostrophes. For English emails, use Hi/Hiya from the saved style rather than Hey unless the user explicitly asks for Hey.\n'
f'6. Do NOT use create_document — the email is already open, you must update it.\n\n'
f'Do NOT ask the user to paste or share the email — you already have it above.'
)
else:
# Branch on whether the active doc is a form-backed PDF (via the
# front-matter pointer). Form-backed docs get a focused FORM MODE
# prompt; everything else gets the regular generic doc context.
_is_form_backed = False
try:
from src.pdf_form_doc import find_source_upload_id
_is_form_backed = bool(find_source_upload_id(active_document.current_content or ""))
except Exception:
pass
if _is_form_backed:
doc_ctx = (
f'ACTIVE PDF FORM (open in editor — the user is looking at this right now)\n'
f'Title: "{active_document.title}"\n'
f'```\n{active_document.current_content}\n```\n\n'
f'The ENTIRE form is in the markdown above. Every field, on every '
f'page, is a bullet line you can see now.\n\n'
f'DO NOT try to "read the file", "open the PDF", or call '
f'filesystem / read_file / mcp__filesystem__read_file / any '
f'file-reading tool. The form IS the document above. Just edit it.\n\n'
f'DO NOT ask the user to upload, share, or re-attach. The form is '
f'already loaded.\n\n'
f'TO EDIT: call `edit_document` with FIND/REPLACE matching whole '
f'bullet lines. The trailing HTML comment '
f'`<!-- field=NAME type=TYPE -->` is the ground truth anchor — '
f'match it to pick the correct bullet.\n\n'
f'RULES:\n'
f'1. FIND the WHOLE bullet line including the trailing comment. '
f'REPLACE keeps the bullet structure and the comment exactly; '
f'only the value text after the label changes.\n'
f'2. Text bullets — `- **label:** value <!--field=NAME-->` — '
f'replace `value`.\n'
f'3. Choice bullets — `- **label** [opt1 / opt2 / opt3]: value <!--field=NAME-->` — '
f'replace `value` with one of the listed options verbatim.\n'
f'4. Checkbox bullets — `- [ ] **label** <!--field=NAME-->` — '
f'toggle `[ ]` ↔ `[x]`.\n'
f'5. NEVER invent values. If the user gives no value, ASK. Never '
f'write fake names, addresses, emails, or "NaN"/"N/A"/"TBD".\n'
f'6. NEVER edit the front-matter `<!-- pdf_form_source ... -->` '
f'or the `## Page N` section headers.\n'
f'7. NEVER touch signature fields (type=signature) — the user '
f'signs those by clicking on the rendered PDF.\n'
f'8. Bulk requests are scoped by field type. "All included" means '
f'every choice field with that option. Do NOT touch text fields.\n'
f'9. The user has an Export button — do NOT try to export.'
)
else:
_doc_raw = active_document.current_content or ""
_doc_numbered = "\n".join(
f"{_i}\t{_ln}" for _i, _ln in enumerate(_doc_raw.split("\n"), 1)
)
doc_ctx = (
f'ACTIVE DOCUMENT (open in the editor — the user is looking at it right now)\n'
f'Title: "{active_document.title}" | Language: {active_document.language or "text"}\n'
f'Below is the full text. Each line is prefixed with its line number and a TAB, '
f'purely so you can locate references like "[Doc edit: L25]" — the number and tab '
f'are NOT part of the document.\n'
f'```\n{_doc_numbered}\n```\n'
f'You ALREADY HAVE this document — it is right above. Do NOT ask the user to paste '
f'it, and do NOT use read_file, bash, cat, or any tool to fetch it: it lives in the '
f'editor, NOT on disk, so those attempts will fail. Every request is about THIS '
f'document unless the user clearly says otherwise.\n'
f'A "[Doc edit: L25]" prefix means the user is pointing at that line — use the '
f'numbers above to find the text they mean.\n'
f'To edit: use edit_document with <<<FIND>>>...<<<REPLACE>>>...<<<END>>>. The FIND '
f'text must match the document EXACTLY and must NOT include the leading line-number '
f'or tab (those are reference-only). To rewrite entirely: update_document.'
)
_doc_message = untrusted_context_message("active editor document", doc_ctx)
_doc_message["_protected"] = True
# Auto-detect suggestion mode
_last_user_msg = ""
for msg in reversed(messages):
if msg.get("role") == "user":
_content = msg.get("content", "")
if isinstance(_content, list):
_content = " ".join(b.get("text", "") for b in _content if isinstance(b, dict))
_last_user_msg = _content.lower()
break
_suggest_keywords = ["suggest", "review", "improve", "feedback", "critique", "proofread", "check my", "look over"]
if any(kw in _last_user_msg for kw in _suggest_keywords):
_doc_message["content"] += (
"\n\nTrusted instruction for this turn: the user appears to want "
"suggestions for the active editor document. Use suggest_document "
"with <<<FIND>>>...<<<SUGGEST>>>...<<<REASON>>>...<<<END>>> blocks."
)
else:
set_active_document(None)
# Inject writing style for any email writing path. This is deliberately
# broader than read/list: models may compose via send_email, reply_to_email,
# or ui_control open_email_reply after the first tool round.
_inject_style = False
_EMAIL_TOOL_HINTS = {
"list_email_accounts", "send_email", "reply_to_email", "list_emails", "read_email",
"bulk_email", "archive_email", "delete_email", "mark_email_read",
"resolve_contact", "ui_control",
"mcp__email__list_email_accounts",
"mcp__email__send_email", "mcp__email__reply_to_email",
"mcp__email__list_emails", "mcp__email__read_email",
"mcp__email__bulk_email", "mcp__email__archive_email",
"mcp__email__delete_email", "mcp__email__mark_email_read",
}
if active_document and active_document.language == "email":
_inject_style = True
elif relevant_tools and (_EMAIL_TOOL_HINTS & set(relevant_tools)):
# Avoid adding email style for unrelated UI-only requests unless the
# user's words are email-ish.
_last_user_text = ""
for _msg in reversed(messages):
if _msg.get("role") == "user":
_c = _msg.get("content", "")
if isinstance(_c, list):
_c = " ".join(b.get("text", "") for b in _c if isinstance(b, dict))
_last_user_text = str(_c).lower()
break
_inject_style = any(tok in _last_user_text for tok in ("email", "mail", "reply", "send", "inbox"))
if _inject_style:
try:
from src.settings import load_settings as _load_settings
_style = (_load_settings().get("email_writing_style", "") or "").strip()
if _style:
agent_prompt += (
"\n\n📧 EMAIL WRITING STYLE AND IDENTITY — FOLLOW FOR ANY EMAIL DRAFT OR SEND:\n"
f"{_style}\n\n"
"Hard identity rule: write as the user/mailbox owner only. Do not sign as, speak as, "
"or imply you are the recipient, original sender, quoted sender, spouse, assistant, "
"company, or any other third party. If a signature is needed, use only the name/signature "
"from the saved writing style. Never copy a name from the quoted thread into the sign-off.\n"
"Mechanical style rules: never use em dash/en dash; use --. Never use curly apostrophes. "
"For English emails, default to Hi [Name] or Hiya from the saved style rather than Hey. "
"If the saved style specifies Best/newline/name, use that sign-off when a sign-off is natural."
)
except Exception:
pass
# When creating email documents, instruct the AI on the format
if relevant_tools and (_EMAIL_TOOL_HINTS & set(relevant_tools)):
agent_prompt += (
'\n\n📧 EMAIL DOCUMENT FORMAT: When drafting email replies, use create_document with language="email". '
'The content format is:\n'
'To: recipient@example.com\n'
'Subject: Re: Original subject\n'
'In-Reply-To: <original-message-id>\n'
'References: <original-message-id>\n'
'---\n'
'Body text here...\n\n'
'The user can then edit and click Send or Draft in the editor. For an already-open email draft, '
'edit the current document instead of creating another one.'
)
# Inject relevant skills based on the user's last message. The
# SkillsManager does a Jaccard token-match over published skills'
# name + description + when_to_use + procedure, returning the top
# few. If the teacher wrote a procedure for "open my X chat" last
# time the student failed, this is where the student finds it
# before deciding which tool to call.
try:
last_user = _extract_last_user_message(messages)
# Respect the user's skills-enabled toggle (mirrors memory_enabled).
# When off, don't inject relevant skills into the prompt.
_skills_on = True
_prefs = {}
try:
from routes.prefs_routes import _load_for_user as _load_prefs
_prefs = _load_prefs(owner) or {}
_skills_on = _prefs.get("skills_enabled", True)
except Exception:
pass
if last_user and _skills_on:
from services.memory.skills import SkillsManager
from src.constants import DATA_DIR
sm = SkillsManager(DATA_DIR)
# Brain → Skills settings → "Auto-approve skills" toggle +
# confidence threshold. Approve OFF → published-only (no draft
# passes). Approve ON → drafts at/above the chosen confidence
# (0 = "All"). Falls back to the global default setting.
if not _prefs.get("auto_approve_skills", True):
_skill_min_conf = 2.0 # nothing draft clears it → published only
else:
try:
_skill_min_conf = float(_prefs.get(
"skill_min_confidence",
get_setting("skill_autosave_min_confidence", 0.85)))
except (TypeError, ValueError):
_skill_min_conf = 0.85
try:
_skill_max_injected = int(_prefs.get(
"skill_max_injected",
get_setting("skill_max_injected", 3)))
except (TypeError, ValueError):
_skill_max_injected = 3
_skill_max_injected = max(0, min(12, _skill_max_injected))
relevant_skills = sm.get_relevant_skills(
last_user,
skills=sm.load(owner=owner),
threshold=0.25,
max_items=_skill_max_injected,
min_confidence=_skill_min_conf,
) if _skill_max_injected > 0 else []
if relevant_skills:
# Bump the "uses" counter on every skill we actually surface
# to the agent — otherwise every skill shows "0 times" no
# matter how often it's been matched and applied.
for _sk in relevant_skills:
try:
sm.record_use(_sk.get('name', ''))
except Exception:
pass
lines = ["", "## Relevant skills for this request",
"These skills are matched to your current request. Each is a "
"procedure proven to work. Follow them step by step. To see "
"the full SKILL.md (more detail, pitfalls, verification "
"steps), call `manage_skills` with action='view' and the "
"skill name."]
for sk in relevant_skills:
src_tag = ""
if sk.get("source") == "teacher-escalation":
tm = sk.get("teacher_model") or "teacher"
src_tag = f" _(learned from {tm})_"
lines.append(f"\n### {sk.get('name','?')}{src_tag}")
if sk.get("description"):
lines.append(sk["description"])
if sk.get("when_to_use"):
lines.append(f"_When to use:_ {sk['when_to_use']}")
proc = sk.get("procedure") or []
if proc:
lines.append("Procedure:")
for i, step in enumerate(proc, 1):
lines.append(f" {i}. {step}")
pitfalls = sk.get("pitfalls") or []
if pitfalls:
lines.append("Pitfalls: " + "; ".join(pitfalls))
agent_prompt += "\n".join(lines)
except Exception as _sk_err:
logger.debug(f"skill injection failed (non-fatal): {_sk_err}")
agent_msg = {"role": "system", "content": agent_prompt}
insert_idx = 0
for i, msg in enumerate(messages):
if msg.get("role") == "system":
insert_idx = i + 1
else:
break
messages = messages[:insert_idx] + [agent_msg] + messages[insert_idx:]
# Merge consecutive system messages — but skip _protected doc messages
merged = []
for msg in messages:
if (msg.get("role") == "system"
and not msg.get("_protected")
and merged and merged[-1].get("role") == "system"
and not merged[-1].get("_protected")):
merged[-1] = {
"role": "system",
"content": merged[-1]["content"] + "\n\n" + msg["content"],
}
else:
merged.append(msg)
# Insert the document message right before the last user message so it's
# close to the user's request and survives context trimming independently.
if _doc_message:
last_user_idx = len(merged) - 1
for i in range(len(merged) - 1, -1, -1):
if merged[i].get("role") == "user":
last_user_idx = i
break
merged.insert(last_user_idx, _doc_message)
return merged, mcp_schemas
_ADMIN_TOOLS = {
"manage_session", "manage_skills", "manage_tasks",
"manage_endpoints", "manage_mcp", "manage_webhooks", "manage_tokens",
"manage_documents", "manage_settings", "create_session", "list_sessions",
"send_to_session", "pipeline", "ask_teacher", "list_models",
}
def _build_base_prompt(
disabled_tools,
mcp_mgr,
needs_admin,
relevant_tools=None,
mcp_disabled_map=None,
compact: bool = False,
):
"""Build the agent prompt with only relevant tools included.
If relevant_tools is provided (from RAG retrieval), only those tools
are shown with full descriptions. Otherwise falls back to full prompt.
"""
from src.tool_index import ALWAYS_AVAILABLE
disabled = set(disabled_tools or [])
if not get_setting("image_gen_enabled", True):
disabled.add("generate_image")
if relevant_tools is not None:
# RAG mode: include always-available + retrieved + admin (if needed)
tool_names = set(ALWAYS_AVAILABLE) | set(relevant_tools)
if needs_admin:
tool_names |= _ADMIN_TOOLS
agent_prompt = _assemble_prompt(tool_names, disabled, compact=compact)
else:
# Fallback: full prompt (RAG unavailable)
agent_prompt = AGENT_SYSTEM_PROMPT
if not needs_admin:
# At least strip the management section
mgmt_tools = set(TOOL_SECTIONS.keys()) - set(ALWAYS_AVAILABLE) - {
"generate_image", "suggest_document",
"chat_with_model", "ask_teacher", "list_models",
}
agent_prompt = _assemble_prompt(
set(TOOL_SECTIONS.keys()) - mgmt_tools, disabled, compact=compact
)
elif compact:
agent_prompt = _assemble_prompt(set(TOOL_SECTIONS.keys()), disabled, compact=True)
# Inject the Level-0 skill index — one line per skill so the agent
# knows what canonical procedures exist. Includes published skills
# plus teacher-escalation drafts (auto-written when the student
# fails a task; appear here on the very next turn so the student
# can apply them immediately). Full SKILL.md fetched on demand via
# `manage_skills view name=...`. Gating mirrors index_for: platform
# + requires_toolsets + fallback_for_toolsets.
try:
from services.memory.skills import SkillsManager
from src.constants import DATA_DIR
_sm = SkillsManager(DATA_DIR)
active_tools = list(set(TOOL_SECTIONS.keys()) - set(disabled or []))
skill_idx = _sm.index_for(owner=None, active_toolsets=active_tools)
if skill_idx:
lines = ["## Available skills",
"Procedures the assistant should consult before doing domain work. "
"Fetch the full procedure with `manage_skills` action=view name=<name> "
"when one looks relevant. Entries tagged `(draft)` were written by the "
"teacher-escalation loop after a prior failure — treat them as authoritative "
"guidance; if you follow one and it works, that's a good signal the procedure "
"is correct."]
by_cat: dict[str, list] = {}
for s in skill_idx:
by_cat.setdefault(s["category"], []).append(s)
for cat in sorted(by_cat):
lines.append(f"\n**{cat}**")
for s in by_cat[cat]:
badge = " *(draft)*" if s.get("status") == "draft" else ""
lines.append(f"- `{s['name']}` — {s['description']}{badge}")
agent_prompt += "\n\n" + "\n".join(lines)
except Exception as _e:
# Skill index is a soft enhancement — never fail prompt assembly on it.
logger.debug(f"Skill-index injection skipped: {_e}")
# Inject integration descriptions
from src.integrations import get_integrations_prompt
integ_prompt = get_integrations_prompt()
if integ_prompt:
agent_prompt += "\n\n" + integ_prompt
# Inject MCP tool descriptions
if mcp_mgr:
mcp_desc = mcp_mgr.get_tool_descriptions_for_prompt(mcp_disabled_map or {})
if mcp_desc:
agent_prompt += mcp_desc
return agent_prompt
def _resolve_tool_blocks(round_response: str, native_tool_calls: list, round_num: int):
"""Choose native function calls or fenced code block parsing. Returns (tool_blocks, used_native)."""
used_native = False
if native_tool_calls:
tool_blocks = []
for tc in native_tool_calls:
tc_name = tc.get("name", "")
tc_args = tc.get("arguments", "{}")
block = function_call_to_tool_block(tc_name, tc_args)
if block:
tool_blocks.append(block)
logger.info(f" -> converted: {tc_name} -> {block.tool_type}")
else:
logger.warning(f" -> FAILED to convert native call: {tc_name} args={tc_args[:200]}")
if tool_blocks:
used_native = True
if not used_native:
tool_blocks = parse_tool_blocks(round_response)
if tool_blocks:
logger.info(f"Agent round {round_num}: {len(tool_blocks)} fenced tool block(s) detected")
resp_preview = round_response[:200].replace('\n', '\\n') if round_response else "(empty)"
logger.info(f"Agent round {round_num} summary: {len(round_response)} chars, "
f"{len(native_tool_calls)} native calls, "
f"{len(tool_blocks)} tool blocks. Preview: {resp_preview}")
return tool_blocks, used_native
def _append_tool_results(
messages: List[Dict],
round_response: str,
native_tool_calls: list,
tool_results: list,
tool_result_texts: list,
used_native: bool,
round_num: int,
round_reasoning: str = "",
):
"""Append tool execution results back into the message history for the next LLM round.
`round_reasoning` (DeepSeek / vLLM reasoning-parser deltas) is echoed
back via `reasoning_content` on the assistant message — DeepSeek's API
rejects follow-up requests in thinking mode that don't include the
prior reasoning. Other vendors ignore the extra field.
"""
if used_native and native_tool_calls:
assistant_msg = {"role": "assistant"}
assistant_msg["content"] = round_response if round_response.strip() else ""
if round_reasoning:
assistant_msg["reasoning_content"] = round_reasoning
assistant_msg["tool_calls"] = [
{
"id": tc.get("id", f"call_{round_num}_{j}"),
"type": "function",
"function": {
"name": tc.get("name", ""),
"arguments": tc.get("arguments", "{}"),
},
}
for j, tc in enumerate(native_tool_calls)
]
messages.append(assistant_msg)
for j, tc in enumerate(native_tool_calls):
result_text = tool_result_texts[j] if j < len(tool_result_texts) else ""
messages.append({
"role": "tool",
"tool_call_id": tc.get("id", f"call_{round_num}_{j}"),
"content": result_text,
})
else:
tool_output_text = "\n\n".join(tool_results)
msg = {"role": "assistant", "content": round_response}
if round_reasoning:
msg["reasoning_content"] = round_reasoning
messages.append(msg)
messages.append(
{"role": "user", "content": f"[Tool execution results]\n\n{tool_output_text}"}
)
def _compute_final_metrics(
messages: List[Dict],
full_response: str,
total_duration: float,
time_to_first_token,
context_length: int,
real_input_tokens: int,
real_output_tokens: int,
has_real_usage: bool,
tool_events: list,
round_texts: list,
model: str = "",
last_round_input_tokens: int = 0,
prep_timings: Optional[Dict[str, float]] = None,
) -> dict:
"""Compute token counts, TPS, and build the final metrics dict."""
if has_real_usage:
input_tokens = real_input_tokens
output_tokens = real_output_tokens
else:
input_content = ""
for msg in messages:
if isinstance(msg.get("content"), str):
input_content += msg["content"] + "\n"
input_tokens = len(input_content) // 4
output_tokens = len(full_response) // 4
tps = output_tokens / total_duration if total_duration > 0 else 0
# Use last round's input tokens for context % (peak usage) when available
ctx_tokens = last_round_input_tokens if last_round_input_tokens > 0 else input_tokens
ctx_pct = min(round((ctx_tokens / context_length) * 100, 1), 100.0) if context_length else 0
metrics = {
"response_time": round(total_duration, 2),
"time_to_first_token": round(time_to_first_token, 2) if time_to_first_token else 0,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"tokens_per_second": round(tps, 2),
"total_tokens": input_tokens + output_tokens,
"context_length": context_length,
"context_percent": ctx_pct,
"usage_source": "real" if has_real_usage else "estimated",
"model": model,
}
if prep_timings:
prep_total = round(sum(prep_timings.values()), 3)
metrics["agent_prep_time"] = prep_total
metrics["agent_model_wait_time"] = round(max((time_to_first_token or 0) - prep_total, 0), 3)
metrics["agent_prep_breakdown"] = {
key: round(value, 3) for key, value in prep_timings.items()
}
if tool_events:
metrics["tool_events"] = tool_events
metrics["round_texts"] = round_texts
return metrics
# ── Completion verifier ──
# Tools whose effects produce a checkable artifact. A turn that used one of
# these is "effectful" and worth an independent completion check; pure
# read-only / Q&A turns are not.
_VERIFIER_EFFECTFUL_TOOLS = {
"create_document", "update_document", "edit_document",
"bash", "python", "write_file",
}
_VERIFIER_MAX_ROUNDS = 2 # cap re-verify cycles per turn — never loop forever
def _build_actions_snapshot(tool_events: list, limit: int = 8000) -> str:
"""Compact record of what the agent actually did this turn, for the
verifier to judge against. One block per tool execution: the command and
a head of its output."""
parts = []
for ev in tool_events:
tool = ev.get("tool", "?")
cmd = (ev.get("command") or "").strip()
out = (ev.get("output") or "").strip()
rc = ev.get("exit_code")
head = f"[{tool}] {cmd}" if cmd else f"[{tool}]"
rc_s = f" (exit {rc})" if rc not in (None, 0) else ""
body = (out[:1200] + "") if len(out) > 1200 else (out or "(no output)")
parts.append(f"{head}{rc_s}\n-> {body}")
snap = "\n\n".join(parts)
return snap[:limit] if len(snap) > limit else snap
async def _run_verifier_subagent(
instruction: str, actions_snapshot: str,
*, endpoint_url: str, model: str, headers: dict,
) -> list:
"""Fresh-context completion verifier. A second model instance with NO
shared history reads the user's request + a record of what the agent did
and judges whether the task is genuinely complete. The independent context
is the whole point: a model checking its own work rationalizes; one that
didn't do the work reads it cold. Returns a list of failure reasons
(empty = pass, or silently empty on any error so it can't block a valid
completion)."""
from src.llm_core import llm_call_async
prompt = (
"You are an independent verifier. Another assistant just claimed the "
"following task is complete. Using ONLY the request and the record of "
"what it actually did, decide whether that claim is correct. Be strict: "
"only say SUCCESS if the work genuinely satisfies the request.\n\n"
f"<user_request>\n{(instruction or '')[:4000]}\n</user_request>\n\n"
f"<actions_taken>\n{actions_snapshot[:8000]}\n</actions_taken>\n\n"
"<checklist>\n"
"1. Every concrete deliverable the request asked for was actually produced\n"
"2. Outputs/edits match what was asked — nothing missing, no extra or unrequested changes\n"
"3. Tool results show success, not errors or empty output that got ignored\n"
"4. Anything the request said to leave alone was left unchanged\n"
"</checklist>\n\n"
"Reason briefly (2-3 sentences max). Then output EXACTLY one of:\n"
" VERIFICATION: SUCCESS\n"
" VERIFICATION: FAIL: <one short sentence per issue, semicolon-separated>\n"
"Output nothing after the VERIFICATION line."
)
try:
raw = await llm_call_async(
url=endpoint_url, model=model,
messages=[{"role": "user", "content": prompt}],
headers=headers, temperature=0.0, max_tokens=600, timeout=60,
)
except Exception as e:
logger.warning(f"[agent] verifier subagent failed: {e}")
return []
raw = re.sub(r"<think>.*?</think>", "", raw or "", flags=re.DOTALL | re.IGNORECASE)
last_v = None
for line in raw.splitlines():
if "VERIFICATION:" in line:
last_v = line.strip()
if not last_v or "VERIFICATION: FAIL:" not in last_v:
return []
reasons = last_v.split("VERIFICATION: FAIL:", 1)[1].strip()
return [r.strip() for r in reasons.split(";") if r.strip()]
async def stream_agent_loop(
endpoint_url: str,
model: str,
messages: List[Dict],
headers: Optional[Dict] = None,
temperature: float = 0.3,
max_tokens: int = 4096,
prompt_type: Optional[str] = None,
max_rounds: int = MAX_AGENT_ROUNDS,
max_tool_calls: int = 0,
context_length: int = 0,
active_document=None,
session_id: Optional[str] = None,
disabled_tools: Optional[Set[str]] = None,
owner: Optional[str] = None,
relevant_tools: Optional[Set[str]] = None,
fallbacks: Optional[List[tuple]] = None,
_is_teacher_run: bool = False,
) -> AsyncGenerator[str, None]:
"""Streaming agent loop generator.
Yields SSE events:
- data: {"delta": "text"} (text chunks)
- data: {"type": "tool_start", "tool": "...", ...} (before execution)
- data: {"type": "tool_output", "tool": "...", ...} (after execution)
- data: {"type": "agent_step", "round": N} (next round)
- data: {"type": "metrics", "data": {...}} (final metrics)
- data: [DONE] (end)
"""
mcp_mgr = get_mcp_manager()
prep_timings: Dict[str, float] = {}
disabled_tools = set(disabled_tools or [])
public_blocked_tools = blocked_tools_for_owner(owner)
if public_blocked_tools:
disabled_tools.update(public_blocked_tools)
# MCP tools are namespaced dynamically, so hide all MCP schemas for
# public/non-admin users rather than trying to enumerate every tool.
mcp_mgr = None
_t0 = time.time()
_needs_admin = _detect_admin_intent(messages)
_last_user = _extract_last_user_message(messages)
# Tool retrieval keys on recent conversation context (last few user turns),
# not just the latest message, so short follow-ups don't drop just-used tools.
_retrieval_query = _recent_context_for_retrieval(messages) or _last_user
_mcp_disabled_map = _load_mcp_disabled_map() if mcp_mgr else {}
prep_timings["request_setup"] = time.time() - _t0
# RAG-based tool selection: retrieve relevant tools for this query.
# If caller provided a pre-computed set (e.g. task_scheduler), use that.
_relevant_tools = relevant_tools
_t1 = time.time()
if _relevant_tools:
logger.info(f"[tool-rag] Using caller-provided relevant_tools ({len(_relevant_tools)} tools)")
if not _relevant_tools:
try:
from src.tool_index import get_tool_index, ALWAYS_AVAILABLE
tool_idx = get_tool_index()
if tool_idx:
if mcp_mgr:
try:
await asyncio.wait_for(
asyncio.to_thread(tool_idx.index_mcp_tools, mcp_mgr, _mcp_disabled_map),
timeout=_TOOL_SELECTION_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
logger.warning(
"[tool-rag] MCP tool indexing exceeded %.1fs; continuing without reindex",
_TOOL_SELECTION_TIMEOUT_SECONDS,
)
if _retrieval_query:
try:
_relevant_tools = await asyncio.wait_for(
asyncio.to_thread(tool_idx.get_tools_for_query, _retrieval_query, 8),
timeout=_TOOL_SELECTION_TIMEOUT_SECONDS,
)
logger.info(f"[tool-rag] Retrieved tools for query: {sorted(_relevant_tools - ALWAYS_AVAILABLE)}")
except asyncio.TimeoutError:
logger.warning(
"[tool-rag] Retrieval exceeded %.1fs; falling back to always-available tools",
_TOOL_SELECTION_TIMEOUT_SECONDS,
)
_relevant_tools = set(ALWAYS_AVAILABLE)
except Exception as e:
logger.warning(f"[tool-rag] Retrieval failed, using keyword fallback: {e}")
_relevant_tools = None
# Fallback: if RAG unavailable, use keyword-based tool selection
# instead of sending ALL tools (which overwhelms the model).
if not _relevant_tools and _retrieval_query:
from src.tool_index import ALWAYS_AVAILABLE, ToolIndex
_relevant_tools = set(ALWAYS_AVAILABLE)
ql = _retrieval_query.lower()
for keywords, tools in ToolIndex._KEYWORD_HINTS.items():
if any(kw in ql for kw in keywords):
_relevant_tools.update(tools)
# Always include core document/memory tools
_relevant_tools.update({"create_document", "manage_memory", "manage_notes"})
logger.info(f"[tool-rag] Keyword fallback selected: {sorted(_relevant_tools - ALWAYS_AVAILABLE)}")
# If a document is open the model needs the editing tools available
# regardless of which selection path (RAG, keyword, caller-provided) ran
# or what keywords were in the latest user message.
if _relevant_tools is not None and active_document is not None:
_relevant_tools.update({"edit_document", "update_document", "suggest_document"})
prep_timings["tool_selection"] = time.time() - _t1
_t2 = time.time()
# Hosted-API match by URL, OR the model name looks like a recent model
# known to follow OpenAI-style function calling (DeepSeek, GPT*, Claude,
# Gemini, Qwen3+, Mixtral, Llama 3.1+). Caught the DeepSeek-via-local-
# vLLM case where endpoint_url doesn't include a vendor host.
_model_lc = (model or "").lower()
# Step 1: per-endpoint override (set at registration time from the
# serve command — `--enable-auto-tool-choice` flips it on. UI can
# also toggle per endpoint). NULL = unknown, fall through to the
# keyword heuristic + host check.
_endpoint_supports: Optional[bool] = None
try:
from core.database import SessionLocal as _SL, ModelEndpoint as _ME
_db = _SL()
try:
_ep = _db.query(_ME).filter(_ME.base_url == endpoint_url).first()
if not _ep and endpoint_url:
_u = endpoint_url.rstrip("/")
_ep = _db.query(_ME).filter(_ME.base_url == _u).first() or \
_db.query(_ME).filter(_ME.base_url == _u + "/").first()
if _ep is not None:
_endpoint_supports = _ep.supports_tools
finally:
_db.close()
except Exception as _e:
logger.debug(f"endpoint supports_tools lookup failed: {_e}")
_model_supports_tools = any(kw in _model_lc for kw in (
"deepseek", "gpt-4", "gpt-5", "gpt-o", "claude", "gemini",
"qwen3", "qwen2.5", "mixtral", "mistral", "llama-3.1", "llama-3.2",
"llama-3.3", "llama-4",
# Local-served models that follow OpenAI-style function calling
# via vLLM's `--enable-auto-tool-choice`. Belt-and-suspenders
# with the per-endpoint flag above.
"minimax", "kimi", "yi-", "phi-3", "phi-4", "command-r",
"glm-4", "internlm", "hermes",
))
if _endpoint_supports is True:
_is_api_model = True
elif _endpoint_supports is False:
_is_api_model = False
else:
_is_api_model = any(h in endpoint_url for h in _API_HOSTS) or _model_supports_tools
messages, mcp_schemas = _build_system_prompt(
messages, model, active_document, mcp_mgr, disabled_tools,
needs_admin=_needs_admin, relevant_tools=_relevant_tools,
mcp_disabled_map=_mcp_disabled_map,
compact=_is_api_model,
owner=owner,
)
prep_timings["prompt_build"] = time.time() - _t2
_t3 = time.time()
try:
from src.context_compactor import trim_for_context
soft_budget = int(get_setting("agent_input_token_budget", 6000) or 0)
if soft_budget > 0:
before_trim_tokens = estimate_tokens(messages)
reserve_tokens = min(max(max_tokens or 1024, 512), 2048)
effective_budget = min(context_length or soft_budget, soft_budget)
trimmed_messages = trim_for_context(
messages,
effective_budget,
reserve_tokens=reserve_tokens,
)
after_trim_tokens = estimate_tokens(trimmed_messages)
if after_trim_tokens < before_trim_tokens:
logger.info(
"[agent] soft-trimmed context: %s -> %s tokens (budget=%s, reserve=%s)",
before_trim_tokens,
after_trim_tokens,
effective_budget,
reserve_tokens,
)
messages = trimmed_messages
except Exception as e:
logger.warning("[agent] Soft context trim skipped: %s", e)
prep_timings["context_trim"] = time.time() - _t3
# Strip internal metadata keys before sending to the LLM API
messages = [{k: v for k, v in msg.items() if k != "_protected"} for msg in messages]
yield f"data: {json.dumps({'type': 'agent_prep', 'data': {k: round(v, 3) for k, v in prep_timings.items()}})}\n\n"
full_response = ""
total_start = time.time()
time_to_first_token = None
first_token_received = False
tool_events = [] # Persist tool executions for history reload
round_texts = [] # Cleaned text per round for history reload
# Completion-verifier state (mechanism 3a). _effectful_used flips on when
# a tool that produces a checkable artifact runs; the verifier only fires
# on such turns and at most _VERIFIER_MAX_ROUNDS times.
_effectful_used = False
_verifier_rounds = 0
_verifier_instruction = _extract_last_user_message(messages)
real_input_tokens = 0 # Accumulated real usage from API
real_output_tokens = 0
last_round_input_tokens = 0 # Last round's input tokens (for context % peak)
has_real_usage = False
total_tool_calls = 0 # for budget enforcement
# Loop-breaker state. Small models (e.g. deepseek-v4-flash) can get
# stuck firing the same tool call over and over with no text — burns
# all 20 rounds, looks like the chat "died". Track recent call
# signatures + consecutive no-text tool rounds to bail early.
_recent_call_sigs = collections.deque(maxlen=6)
_stuck_rounds = 0
_tool_type_counts: collections.Counter = collections.Counter()
_THINK_RE = re.compile(r'<think>.*?</think>', re.DOTALL | re.IGNORECASE)
_force_answer = False # set by loop-breaker → next round runs with NO tools
# Document streaming state (persists across rounds)
_doc_acc = "" # accumulated tool-call JSON arguments
_doc_opened = False # whether doc_stream_open was sent
_doc_last_len = 0 # last content length sent
for round_num in range(1, max_rounds + 1):
round_response = ""
round_reasoning = "" # reasoning_content deltas (DeepSeek-thinking, vLLM --reasoning-parser)
native_tool_calls = [] # populated if model uses function calling
# Reset doc streaming state per round
_doc_acc = ""
_doc_opened = False
_doc_last_len = 0
_doc_fence_offset = 0 # offset into round_response for text-fence content
# Cursor for the multi-block scanner — when a `create_document`
# fenced block closes we advance this so the next iteration can
# detect a SUBSEQUENT block in the same round.
_doc_scan_from = 0
# Merge native tool schemas with MCP tool schemas, filtering out
# Only send function schemas for API models (OpenAI, Anthropic, etc.).
# Local models use fenced code blocks or <tool_code> — schemas add overhead.
if _force_answer:
# Loop-breaker decided the model has enough info but keeps
# calling tools. Send NO tools this round so it's forced to
# write the answer instead of flailing further.
all_tool_schemas = []
elif _is_api_model:
# Filter schemas by RAG-selected tools (if available)
if _relevant_tools:
base_schemas = [
s for s in FUNCTION_TOOL_SCHEMAS
if s.get("function", {}).get("name") in _relevant_tools
]
_mcp_filtered = [
s for s in mcp_schemas
if s.get("function", {}).get("name") in _relevant_tools
]
all_tool_schemas = base_schemas + _mcp_filtered
else:
base_schemas = FUNCTION_TOOL_SCHEMAS if _needs_admin else [
s for s in FUNCTION_TOOL_SCHEMAS
if s.get("function", {}).get("name") not in _ADMIN_SCHEMA_NAMES
]
all_tool_schemas = base_schemas + mcp_schemas
if disabled_tools:
all_tool_schemas = [
t for t in all_tool_schemas
if t.get("function", {}).get("name") not in disabled_tools
and t.get("name") not in disabled_tools
]
else:
# Local: only MCP schemas when message suggests MCP tool usage
_last_content = _last_user.lower()
_wants_mcp = any(kw in _last_content for kw in _MCP_KEYWORDS)
all_tool_schemas = mcp_schemas if (_wants_mcp and mcp_schemas) else []
agent_stream_timeout = int(get_setting("agent_stream_timeout_seconds", 300) or 300)
_tool_names_sent = [t.get("function", {}).get("name") for t in (all_tool_schemas or []) if t.get("function")]
logger.info(f"[agent-debug] round={round_num} model={model} _is_api_model={_is_api_model} tools_sent={len(_tool_names_sent)} tool_names={_tool_names_sent[:15]} relevant_tools={sorted(_relevant_tools)[:15] if _relevant_tools else 'ALL'}")
# Primary target + any configured fallback models. stream_llm_with_fallback
# only switches on a pre-content failure, so streamed output is never
# duplicated; the dead-host cooldown keeps repeat primary attempts cheap.
_candidates = [(endpoint_url, model, headers)] + list(fallbacks or [])
# stream_llm enforces a per-read INACTIVITY timeout (httpx read=timeout),
# which kills a wedged/silent endpoint. This wall-clock deadline is the
# complementary cap for the rare stream that trickles bytes forever and
# so never trips the inactivity timeout. Generous — only catches runaway.
_round_deadline = time.time() + max(agent_stream_timeout * 4, 1200)
async for chunk in stream_llm_with_fallback(
_candidates,
messages,
temperature=temperature,
max_tokens=max_tokens,
prompt_type=prompt_type if round_num == 1 else None,
tools=all_tool_schemas if all_tool_schemas else None,
timeout=agent_stream_timeout,
):
if time.time() > _round_deadline:
logger.warning(f"[agent] round {round_num} stream exceeded wall-clock deadline; cutting off")
break
# Forward error events from stream_llm to the frontend
if chunk.startswith("event: error"):
yield chunk
continue
if chunk.startswith("data: ") and not chunk.startswith("data: [DONE]"):
try:
data = json.loads(chunk[6:])
# IMPORTANT: check type-based events BEFORE "delta" key,
# because tool_call_delta also has an "arg_delta" field.
if data.get("type") == "tool_call_delta":
# Stream document content to frontend as AI generates it
logger.debug(f"tool_call_delta: name={data.get('name')}, len(arg_delta)={len(data.get('arg_delta', ''))}")
_doc_acc += data.get("arg_delta", "")
if not _doc_opened:
tm = re.search(r'"title"\s*:\s*"((?:[^"\\]|\\.)*)"', _doc_acc)
if tm:
_doc_opened = True
try:
title = json.loads('"' + tm.group(1) + '"')
except Exception:
title = tm.group(1)
lm = re.search(r'"language"\s*:\s*"((?:[^"\\]|\\.)*)"', _doc_acc)
lang = ""
if lm:
try:
lang = json.loads('"' + lm.group(1) + '"')
except Exception:
lang = lm.group(1)
logger.info(f"Doc streaming: open title={title!r} lang={lang!r}")
yield f'data: {json.dumps({"type": "doc_stream_open", "title": title, "language": lang})}\n\n'
if _doc_opened:
cm = re.search(r'"content"\s*:\s*"', _doc_acc)
if cm:
raw = _doc_acc[cm.end():]
raw = re.sub(r'"\s*\}\s*$', '', raw)
try:
decoded = json.loads('"' + raw + '"')
except Exception:
try:
decoded = json.loads('"' + raw.rstrip('\\') + '"')
except Exception:
decoded = raw.replace('\\n', '\n').replace('\\t', '\t').replace('\\"', '"').replace('\\\\', '\\')
if len(decoded) > _doc_last_len:
_doc_last_len = len(decoded)
yield f'data: {json.dumps({"type": "doc_stream_delta", "content": decoded})}\n\n'
elif data.get("type") == "tool_calls":
native_tool_calls = data.get("calls", [])
logger.info(f"Agent round {round_num}: received {len(native_tool_calls)} native tool call(s)")
elif data.get("type") == "usage":
u = data.get("data", {})
round_input = u.get("input_tokens", 0)
real_input_tokens += round_input
real_output_tokens += u.get("output_tokens", 0)
last_round_input_tokens = round_input
has_real_usage = True
elif "delta" in data:
if not first_token_received:
time_to_first_token = time.time() - total_start
first_token_received = True
# Keep reasoning deltas in a separate accumulator so
# we can echo them back via `reasoning_content` on the
# next request (DeepSeek requires this; harmless for
# other vendors). Regular content still flows into
# round_response unchanged.
if data.get("thinking"):
round_reasoning += data["delta"]
else:
round_response += data["delta"]
full_response += data["delta"]
yield chunk # Stream all rounds
# Detect text-fence doc streaming for rounds 2+
# (round 1 is handled by frontend fence detection + server fenced block path)
if round_num > 1 and not _doc_acc:
_fence_marker = '```create_document\n'
# Open a new block if we're not currently inside one
# and there's an unstreamed marker in the response.
# The marker search starts at the byte after the
# last block's closing fence so the SECOND
# `create_document` block in the same round gets
# detected (previously only the first one was
# streamed and the rest were silently dropped).
if not _doc_opened and _fence_marker in round_response[_doc_scan_from:]:
_fi = round_response.index(_fence_marker, _doc_scan_from)
_fa = round_response[_fi + len(_fence_marker):]
_fl = _fa.split('\n')
if _fl and _fl[0].strip():
_doc_opened = True
_ft = _fl[0].strip()
_kl = {'python','py','javascript','js','typescript','ts','html','css','json','yaml','bash','sql','rust','go','java','c','cpp','markdown','text'}
_flang = _fl[1].strip() if len(_fl) > 1 and _fl[1].strip().lower() in _kl else ''
_doc_fence_offset = _fi + len(_fence_marker) + len(_fl[0]) + 1
if _flang:
_doc_fence_offset += len(_fl[1]) + 1
_doc_last_len = 0
yield f'data: {json.dumps({"type": "doc_stream_open", "title": _ft, "language": _flang})}\n\n'
if _doc_opened:
_rc = round_response[_doc_fence_offset:]
_ci = _rc.find('\n```')
if _ci >= 0:
_rc = _rc[:_ci]
if len(_rc) > _doc_last_len:
_doc_last_len = len(_rc)
yield f'data: {json.dumps({"type": "doc_stream_delta", "content": _rc})}\n\n'
# If the closing fence has arrived, finalise
# this block and arm detection of the NEXT
# one. The model can emit multiple
# `create_document` blocks in a single round.
if _ci >= 0:
_doc_opened = False
_doc_scan_from = _doc_fence_offset + _ci + len('\n```')
_doc_fence_offset = 0
_doc_last_len = 0
elif data.get("error"):
err_msg = data.get("error", "unknown")
logger.error(f"Agent round {round_num}: stream error: {err_msg}")
yield f'data: {json.dumps({"delta": chr(10) + chr(10) + "*[Stream error: " + str(err_msg) + "]*"})}\n\n'
except json.JSONDecodeError:
if round_num == 1:
yield chunk
elif chunk.startswith("event: "):
# Forward error events to frontend as visible text
yield chunk
# Intercept [DONE] — don't forward until all rounds finish
tool_blocks, used_native = _resolve_tool_blocks(round_response, native_tool_calls, round_num)
# Force-answer round: we told the model to STOP calling tools and
# answer. If it ignored that and emitted a (possibly DSML) tool
# call anyway, discard it — don't execute, don't re-loop. Keep
# only the prose; if there's none, emit a graceful fallback.
if _force_answer:
if tool_blocks:
logger.info(f"[agent] force-answer round {round_num}: discarding {len(tool_blocks)} ignored tool call(s)")
tool_blocks = []
if not _THINK_RE.sub("", strip_tool_blocks(round_response)).strip():
# The model burned its budget gathering data but never wrote a
# final answer (common with weaker models on multi-source
# briefings). Salvage it: one blunt non-streaming synthesis call
# over the full conversation (which already holds every tool
# result) before falling back to the canned apology.
_synth = ""
try:
from src.llm_core import llm_call_async
_synth_messages = list(messages) + [{
"role": "user",
"content": (
"Using ONLY the information already gathered above, write "
"the final answer for the user now. Do NOT call any tools, "
"do NOT explain your reasoning — output the finished response "
"directly. If some data couldn't be fetched, just work with "
"what you have and note what's missing in one short line."
),
}]
_raw = await llm_call_async(
url=endpoint_url, model=model, messages=_synth_messages,
headers=headers, temperature=0.3, max_tokens=max_tokens, timeout=60,
)
_synth = _THINK_RE.sub("", strip_tool_blocks(_raw or "")).strip()
except Exception as _e:
logger.warning(f"[agent] grace synthesis failed: {_e}")
if _synth:
yield f'data: {json.dumps({"delta": _synth})}\n\n'
full_response += _synth
else:
_fb = ("I gathered some search results but couldn't pull a clean "
"answer together. Want me to try a more specific question, "
"or summarize what I did find?")
yield f'data: {json.dumps({"delta": _fb})}\n\n'
full_response += _fb
# ── Fallback: auto-create document if model dumped large code in chat ──
# If no create_document tool was used, check for big code blocks in text
has_doc_tool = any(
b.tool_type in ("create_document", "update_document")
for b in tool_blocks
) or any(
tc.get("name") in ("create_document", "update_document")
for tc in native_tool_calls
)
if not has_doc_tool and session_id and "create_document" not in (disabled_tools or set()):
_code_block_re = re.compile(r'```(\w*)\n([\s\S]*?)```')
for m in _code_block_re.finditer(round_response):
lang_tag = m.group(1).lower()
code_body = m.group(2).strip()
# Skip small blocks and known tool tags
if code_body.count('\n') < 30:
continue
if lang_tag in TOOL_TAGS:
continue # already handled as a tool execution
# Auto-create a document from this code block
lang_map = {"py": "python", "js": "javascript", "ts": "typescript", "": "text"}
doc_lang = lang_map.get(lang_tag, lang_tag or "text")
doc_title = f"Code ({doc_lang})"
tb = ToolBlock("create_document", f"{doc_title}\n{doc_lang}\n{code_body}")
tool_blocks.append(tb)
# Stream the document open event
yield f'data: {json.dumps({"type": "doc_stream_open", "title": doc_title, "language": doc_lang})}\n\n'
yield f'data: {json.dumps({"type": "doc_stream_delta", "content": code_body})}\n\n'
logger.info(f"Auto-created document from {lang_tag} code block ({code_body.count(chr(10))+1} lines)")
break # only auto-create one document per round
# Save cleaned round text for history persistence
# Keep <think> blocks so they render in the thinking section on reload
cleaned_round = strip_tool_blocks(round_response).strip()
round_texts.append(cleaned_round)
if not tool_blocks:
# ── Completion verifier (mechanism 3a) ────────────────────
# The model is finishing. If this was an effectful agentic turn,
# have a fresh-context verifier independently check the work
# before we accept "done". On FAIL, surface the issues and let
# the model fix them (capped, and it must do new effectful work
# to re-trigger). Skipped on force-answer rounds (no tools to
# fix with), pure Q&A, and when the toggle is off.
_claimed_done = bool(_THINK_RE.sub("", cleaned_round).strip())
if (_effectful_used and not _force_answer
and _claimed_done
and _verifier_rounds < _VERIFIER_MAX_ROUNDS
# Default OFF: on weak local models the verifier can't judge
# from the action-snapshot (no doc body), so it false-rejects
# ("content not shown") and forces a costly extra round every
# effectful turn. Opt-in via setting for strong models.
and get_setting("agent_verifier_subagent", False)):
# Brief "working" indicator while the verifier runs.
yield f'data: {json.dumps({"type": "agent_step", "round": round_num})}\n\n'
_vfail = await _run_verifier_subagent(
_verifier_instruction,
_build_actions_snapshot(tool_events),
endpoint_url=endpoint_url, model=model, headers=headers,
)
if _vfail:
_verifier_rounds += 1
logger.info(f"[agent] verifier flagged {len(_vfail)} issue(s) on round {round_num}: {_vfail}")
_note = "\n\n_Double-checked the work and found something to fix._\n\n"
yield f'data: {json.dumps({"delta": _note})}\n\n'
full_response += _note
messages.append({
"role": "system",
"content": (
"An independent verifier reviewed your work against the "
"original request and found issues that must be fixed before "
"this is actually done:\n- " + "\n- ".join(_vfail) +
"\n\nFix these now using tools, then finish."
),
})
# Require fresh effectful work before verifying again, so we
# never re-verify an unchanged state in a loop.
_effectful_used = False
continue
break # no tools — done
# ── Loop-breaker (Terminus-style stall detector) ──────────────
# Stall detector for repeated no-progress tool loops.
# A round is "useless" ONLY when it re-issues a recent tool call AND
# writes no answer text — i.e. the model is going in circles.
# Genuine exploration (new, distinct calls) is never useless, so
# multi-step work (file hunts, multi-host ssh, build→test→fix) rides
# all the way to a real answer. We bail only on a streak of useless
# rounds, or a single tool fired an absurd number of times (hard
# runaway backstop). On bail we don't give up — we force one
# tool-free round so the model declares done or declares blocked,
# mirroring Terminus's explicit-completion handshake.
_sig = "|".join(sorted(f"{b.tool_type}:{(b.content or '').strip()[:120]}" for b in tool_blocks))
_is_repeat = _sig in _recent_call_sigs
_recent_call_sigs.append(_sig)
for _b in tool_blocks:
_tool_type_counts[_b.tool_type] += 1
# "Real" answer text = round text minus <think> blocks. Empty-think
# rounds (just "<think>\n\n</think>" + a tool call) must not read as
# progress, so strip think before checking.
_real_text = _THINK_RE.sub("", cleaned_round).strip()
# Circling = repeating a recent call with nothing written. Any
# progress (a NEW distinct call, or actual answer text) resets it.
if _is_repeat and not _real_text:
_stuck_rounds += 1
else:
_stuck_rounds = 0
_runaway = next((t for t, n in _tool_type_counts.items() if n >= 15), None)
if _stuck_rounds >= 4 or _runaway:
reason = (f"calling {_runaway} over and over" if _runaway
else "repeating the same tool calls without new progress")
logger.warning(f"[agent] loop-breaker tripped on round {round_num} ({reason}); sig={_sig[:80]!r}")
# The model has been executing tools, so its results are already
# in context. Force ONE tool-free round to converge: write the
# answer from what it has, or state plainly what's blocking it.
# The force-answer handler above salvages (grace synthesis) or
# apologizes honestly if it still writes nothing.
_off = [t for t in ("web_search", "bash")
if disabled_tools and t in disabled_tools]
_off_note = (f" ({', '.join(_off)} is currently disabled — say so if "
f"you needed it.)" if _off else "")
_force_answer = True
messages.append({
"role": "system",
"content": (
"You're repeating tool calls without converging. STOP calling "
"tools and end the turn one of two ways: (a) write your best "
"final answer NOW from the information already gathered, or "
"(b) if you're genuinely blocked, say plainly what's blocking "
"you in a sentence or two." + _off_note
),
})
full_response += "\n\n"
yield f'data: {json.dumps({"type": "agent_step", "round": round_num + 1})}\n\n'
continue
# Pre-stream document content for fenced tool blocks (non-native path)
# Native path already streamed via tool_call_delta above
# For round 1 fenced blocks, frontend fence detection already handled streaming
if not _doc_opened and round_num == 1:
for block in tool_blocks:
if block.tool_type == "create_document":
_doc_opened = True
break
if not _doc_opened:
for block in tool_blocks:
if block.tool_type == "create_document":
lines = block.content.strip().split("\n")
title = lines[0].strip() if lines else "Untitled"
lang = ""
content_start = 1
if len(lines) > 1 and len(lines[1].strip()) < 20 and lines[1].strip().isalpha():
lang = lines[1].strip()
content_start = 2
content = "\n".join(lines[content_start:]) if len(lines) > content_start else ""
yield f'data: {json.dumps({"type": "doc_stream_open", "title": title, "language": lang})}\n\n'
if content:
yield f'data: {json.dumps({"type": "doc_stream_delta", "content": content})}\n\n'
break
elif block.tool_type == "update_document":
# Pre-stream the full replacement content so user sees it immediately
content = block.content.strip()
yield f'data: {json.dumps({"type": "doc_stream_open", "title": "", "language": ""})}\n\n'
yield f'data: {json.dumps({"type": "doc_stream_delta", "content": content})}\n\n'
break
# Execute each tool block
tool_results = []
tool_result_texts = [] # plain text for native tool role messages
budget_hit = False
for i, block in enumerate(tool_blocks):
# --- Tool budget check ---
if max_tool_calls > 0 and total_tool_calls >= max_tool_calls:
yield f'data: {json.dumps({"type": "budget_exceeded", "limit": max_tool_calls, "used": total_tool_calls})}\n\n'
budget_hit = True
break
total_tool_calls += 1
# Build a short display string for the frontend tool bubble.
# Document tools show a brief summary instead of dumping full content.
is_doc_tool = block.tool_type in ("create_document", "update_document", "edit_document", "suggest_document")
if is_doc_tool:
cmd_display = block.content.split("\n")[0].strip()[:80]
else:
cmd_display = block.content.strip()
yield (
f'data: {json.dumps({"type": "tool_start", "tool": block.tool_type, "command": cmd_display, "round": round_num})}\n\n'
)
# Streaming progress for long-running tools (bash, python).
# The bash/python branches inside _direct_fallback emit
# periodic {elapsed_s, tail} payloads via this callback;
# we forward each one as a `tool_progress` SSE event so
# the UI can render live elapsed-time + tail-of-output.
_progress_q: asyncio.Queue = asyncio.Queue()
async def _push_progress(payload):
await _progress_q.put(payload)
async def _run_tool():
try:
return await execute_tool_block(
block,
session_id=session_id,
disabled_tools=disabled_tools,
owner=owner,
progress_cb=_push_progress,
)
finally:
# Sentinel so the drainer knows to stop.
await _progress_q.put(None)
_tool_task = asyncio.create_task(_run_tool())
# Drain progress events as they arrive — block until the
# next event OR the tool finishes (sentinel = None).
while True:
evt = await _progress_q.get()
if evt is None:
break
yield (
f'data: {json.dumps({"type": "tool_progress", "tool": block.tool_type, "round": round_num, **evt})}\n\n'
)
desc, result = await _tool_task
# Extract structured web sources from web_search tool output
_src_text = result.get("results") or result.get("stdout") or ""
if block.tool_type == "web_search" and _src_text:
_src_marker = "<!-- SOURCES:"
_src_idx = _src_text.find(_src_marker)
if _src_idx >= 0:
_src_end = _src_text.find(" -->", _src_idx)
if _src_end >= 0:
try:
_extracted_sources = json.loads(_src_text[_src_idx + len(_src_marker):_src_end])
yield f'data: {json.dumps({"type": "web_sources", "data": _extracted_sources})}\n\n'
# Strip the marker from the result so it doesn't show in chat
_clean = _src_text[:_src_idx].rstrip()
if "results" in result:
result["results"] = _clean
elif "stdout" in result:
result["stdout"] = _clean
except (json.JSONDecodeError, Exception):
pass
# Emit doc-specific event for document tools — the frontend
# document panel handles this; no need to show content in chat.
if is_doc_tool and "action" in result:
if result["action"] == "suggest":
yield (
f'data: {json.dumps({"type": "doc_suggestions", "doc_id": result["doc_id"], "suggestions": result["suggestions"]})}\n\n'
)
else:
yield (
f'data: {json.dumps({"type": "doc_update", "doc_id": result["doc_id"], "content": result["content"], "version": result["version"], "title": result.get("title", ""), "language": result.get("language")})}\n\n'
)
# Emit ui_control event for frontend to apply UI changes
if "ui_event" in result:
yield (
f'data: {json.dumps({"type": "ui_control", "data": result})}\n\n'
)
# Build output for frontend tool bubble.
# Document tools get a short summary — content goes to the editor panel.
output_text = ""
if is_doc_tool and "action" in result:
action = result["action"]
title = result.get("title", "")
ver = result.get("version", "?")
if action == "create":
output_text = f'Document created: "{title}" (v{ver})'
elif action == "edit":
output_text = f'Document edited: "{title}" (v{ver}, {result.get("applied", 0)} edit(s))'
elif action == "update":
output_text = f'Document updated: "{title}" (v{ver})'
elif "stdout" in result:
# On a bash/python timeout the result carries error + (often
# empty) stdout/stderr; fall back to the error so the "timed
# out" reason reaches the UI instead of a blank result.
output_text = (result["stdout"] or result["stderr"] or result.get("error", ""))[:2000]
elif "output" in result:
# bash / python canonical result: {"output": ..., "exit_code": ...}
output_text = (result["output"] or "")[:2000]
elif "response" in result:
# AI interaction tools (chat_with_model, send_to_session)
label = result.get("model", result.get("session_name", "AI"))
output_text = f"{label}: {result['response']}"[:4000]
elif "content" in result:
output_text = result["content"][:2000]
elif "results" in result:
output_text = result["results"][:4000]
elif "session_id" in result and "name" in result:
output_text = f"Session created: {result['name']} (id: {result['session_id']})"
elif "success" in result:
output_text = (
f"Written: {result.get('path', '')}"
if result["success"]
else f"Error: {result.get('error', '')}"
)
elif "error" in result:
output_text = result["error"][:2000]
# Emit tool_output (include ui_event data if present)
tool_output_data = {"type": "tool_output", "tool": block.tool_type, "command": cmd_display, "output": output_text, "exit_code": result.get("exit_code")}
if "ui_event" in result:
tool_output_data["ui_event"] = result["ui_event"]
for k in ("toggle_name", "state", "mode", "model", "endpoint_url", "theme_name", "colors"):
if k in result:
tool_output_data[k] = result[k]
# Forward image data from generate_image tool
for k in ("image_url", "image_prompt", "image_model", "image_size", "image_quality"):
if k in result:
tool_output_data[k] = result[k]
# Forward screenshots from browser tools (base64 images)
if result.get("images"):
img = result["images"][0]
tool_output_data["screenshot"] = f"data:{img['mimeType']};base64,{img['data']}"
yield f'data: {json.dumps(tool_output_data)}\n\n'
# Native document tools open in the editor + carry the REAL doc id.
# Emit a doc_update so the frontend opens/activates it and sends it
# back as active_doc_id next turn (otherwise the agent can't "see"
# the document it just created on the follow-up message).
if block.tool_type in ("create_document", "update_document", "edit_document") and result.get("doc_id"):
yield (
'data: ' + json.dumps({
"type": "doc_update",
"doc_id": result["doc_id"],
"title": result.get("title", ""),
"language": result.get("language", ""),
"content": result.get("content", ""),
"version": result.get("version", 1),
}) + '\n\n'
)
# Inline research: emit the open-link as part of the assistant's
# actual response text — a `#research-<id>` anchor that chatRenderer
# turns into a regular clickable link. Saved with the message, so it
# PERSISTS across refresh (unlike the old ephemeral injected chip).
_rsid = result.get("research_session_id")
if _rsid:
_anchor = f"\n\n[Open in Deep Research](#research-{_rsid})\n"
yield 'data: ' + json.dumps({"delta": _anchor}) + '\n\n'
# Save for history persistence
tool_event = {
"round": round_num,
"tool": block.tool_type,
"command": cmd_display,
"output": output_text,
"exit_code": result.get("exit_code"),
}
if result.get("image_url"):
for ik in ("image_url", "image_prompt", "image_model", "image_size", "image_quality"):
if result.get(ik):
tool_event[ik] = result[ik]
if result.get("doc_id"):
tool_event["doc_id"] = result["doc_id"]
tool_event["doc_title"] = result.get("title", "")
tool_events.append(tool_event)
if block.tool_type in _VERIFIER_EFFECTFUL_TOOLS:
_effectful_used = True
formatted = format_tool_result(desc, result)
tool_results.append(formatted)
tool_result_texts.append(formatted)
# If budget was hit, stop the loop
if budget_hit:
break
# Feed results back to LLM for next round
_append_tool_results(messages, round_response, native_tool_calls,
tool_results, tool_result_texts, used_native, round_num,
round_reasoning=round_reasoning)
# Emit agent_step event
yield (
f'data: {json.dumps({"type": "agent_step", "round": round_num + 1})}\n\n'
)
# Separator in accumulated response
full_response += "\n\n"
# --- Final metrics ---
total_duration = time.time() - total_start
metrics = _compute_final_metrics(
messages, full_response, total_duration, time_to_first_token,
context_length, real_input_tokens, real_output_tokens,
has_real_usage, tool_events, round_texts, model=model,
last_round_input_tokens=last_round_input_tokens,
prep_timings=prep_timings,
)
yield f"data: {json.dumps({'type': 'metrics', 'data': metrics})}\n\n"
# Teacher-escalation: inline takeover visible in the chat stream.
# The student just finished; if Tier 1 flags failure, the teacher
# gets a turn (with its own tool calls forwarded to the user) and
# a skill is saved ONLY if the teacher actually succeeds. Skipped
# when we ARE the teacher to avoid recursion.
if not _is_teacher_run:
try:
from src.teacher_escalation import run_teacher_inline
async for evt in run_teacher_inline(
student_endpoint_url=endpoint_url,
student_messages=messages,
student_tool_events=tool_events,
student_reply=full_response,
owner=owner,
):
yield evt
except Exception as _esc_err:
logger.warning(f"teacher escalation hook failed: {_esc_err}", exc_info=True)
yield "data: [DONE]\n\n"