""" tool_implementations.py Extracted tool implementation functions (do_* and helpers) from agent_tools.py. These handle the actual execution logic for each tool type. """ import json import logging import os import re from typing import Any, Dict, List, Optional MAX_OUTPUT_CHARS = 10_000 MAX_READ_CHARS = 20_000 def get_mcp_manager(): from src import agent_tools return agent_tools.get_mcp_manager() def _truncate(text: str, limit: int = MAX_OUTPUT_CHARS) -> str: if len(text) > limit: return text[:limit] + f"\n... (truncated, {len(text)} chars total)" return text logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Argument parsing # --------------------------------------------------------------------------- def _parse_tool_args(content): """Parse a tool-call argument blob. Accepts either a JSON string or an already-decoded dict. Unwraps the common `{"body": {...}}` envelope that smaller models emit when they read tool descriptions like "Body is JSON: {...}" literally — they pass `body` as a field name rather than treating it as a noun. Returns a dict on success, raises ValueError on bad JSON. """ if isinstance(content, str): try: args = json.loads(content) if content.strip() else {} except (json.JSONDecodeError, TypeError) as e: raise ValueError(str(e)) elif isinstance(content, dict): args = content else: args = {} # Unwrap {"body": {...}} envelope — but only if `body` is the sole key # and points at a dict. We don't want to clobber a legitimate `body` # field on tools where it's a real arg (e.g. send_email body text). if ( isinstance(args, dict) and len(args) == 1 and "body" in args and isinstance(args["body"], dict) and "action" in args["body"] # extra safety: only unwrap if the inner dict looks like a tool call ): args = args["body"] return args # --------------------------------------------------------------------------- # Active document state # --------------------------------------------------------------------------- _active_document_id: Optional[str] = None _active_model: Optional[str] = None def set_active_document(doc_id: Optional[str]): """Set the active document ID for document tool execution.""" global _active_document_id _active_document_id = doc_id def set_active_model(model: Optional[str]): """Set the current model name for version summaries.""" global _active_model _active_model = model def get_active_document(): return _active_document_id # --------------------------------------------------------------------------- # Document tools — create/update/edit/suggest living documents # --------------------------------------------------------------------------- def _sniff_doc_language(text: str) -> str: """Best-effort detect a document's language from its content when the model didn't specify one. Defaults to 'markdown' (prose). Recognizes the common markup/code types the editor supports so e.g. an SVG isn't saved as markdown.""" import json as _json, re as _re2 s = (text or "").strip() if not s: return "markdown" head = s[:600] hl = head.lower() if _looks_like_email_document(s): return "email" # Markup (unambiguous) if " bool: import re as _re title_l = (title or "").strip().lower() if title_l in {"new email", "new mail", "new message"}: return True s = (text or "").lstrip() if "\n---\n" in s and _re.search(r"(?im)^To:\s*", s) and _re.search(r"(?im)^Subject:\s*", s): return True return bool(_re.search(r"(?im)^To:\s*", s) and _re.search(r"(?im)^Subject:\s*", s)) def _coerce_email_document_content(existing: str, incoming: str) -> str: """Keep email docs in the To/Subject/---/body shape even if a model writes only the body or dumps header labels without the separator.""" import re as _re old = existing or "" new = (incoming or "").strip() if "\n---\n" in new: return new header = old.split("\n---\n", 1)[0] if "\n---\n" in old else "To: \nSubject: " if _looks_like_email_document(new): lines = new.splitlines() last_header_idx = -1 header_re = _re.compile(r"^(To|Cc|Bcc|Subject|In-Reply-To|References|X-Source-UID|X-Source-Folder|X-Attachments):", _re.I) for i, line in enumerate(lines): if header_re.match(line.strip()): last_header_idx = i body_lines = lines[last_header_idx + 1:] if last_header_idx >= 0 else lines while body_lines and not body_lines[0].strip(): body_lines.pop(0) body = "\n".join(body_lines).strip() else: body = new return header.rstrip() + "\n---\n" + body async def do_create_document(content_block: str, session_id: Optional[str] = None) -> Dict: """Create a new document. Supports two formats: 1) Line-based: line 1 = title, line 2 (optional) = language, rest = content 2) XML-like tags: ......... Some models mix them — strip any XML-style tags and fall back to line parsing.""" import uuid, re as _re from src.database import SessionLocal, Document, DocumentVersion, Session as DbSession raw = content_block or "" # Known languages the editor understands (match the