diff --git a/src/llm_core.py b/src/llm_core.py index f77f3bb..e639aee 100644 --- a/src/llm_core.py +++ b/src/llm_core.py @@ -451,7 +451,17 @@ def _build_anthropic_payload(model, messages, temperature, max_tokens, stream=Fa "temperature": temperature, } if system_parts: - payload["system"] = "\n\n".join(system_parts) + system_text = "\n\n".join(system_parts) + # Send `system` as a structured text block so we can attach a prompt-cache + # breakpoint. The agent loop re-sends this same large prefix every round; + # caching it makes Anthropic re-read it from cache (~90% cheaper, lower TTFB) + # instead of re-billing it. Skip caching tiny one-off prompts, where the + # cache-WRITE premium wouldn't pay back (no reuse). Presence of `tools` + # means an agentic/multi-round call, where the prefix is always reused. + system_block = {"type": "text", "text": system_text} + if tools or len(system_text) > 4000: + system_block["cache_control"] = {"type": "ephemeral"} + payload["system"] = [system_block] if stream: payload["stream"] = True # Convert OpenAI-format tools to Anthropic format @@ -466,6 +476,9 @@ def _build_anthropic_payload(model, messages, temperature, max_tokens, stream=Fa "input_schema": fn.get("parameters", {"type": "object", "properties": {}}), }) if anthropic_tools: + # Cache the tool schemas too — they're stable for the whole agent run. + # The breakpoint caches all tool defs preceding it in the request. + anthropic_tools[-1]["cache_control"] = {"type": "ephemeral"} payload["tools"] = anthropic_tools return payload @@ -951,7 +964,17 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl if partial and _anth_tool_blocks[idx].get("name") in ("create_document", "update_document", "edit_document"): yield f'data: {json.dumps({"type": "tool_call_delta", "index": idx, "name": _anth_tool_blocks[idx]["name"], "arg_delta": partial})}\n\n' elif evt == "message_start": - _anth_input_tokens = j.get("message", {}).get("usage", {}).get("input_tokens", 0) + _u = j.get("message", {}).get("usage", {}) + _anth_input_tokens = _u.get("input_tokens", 0) + # Surface prompt-cache effectiveness: cache_read > 0 means the + # stable system+tools prefix was served from cache this round. + _c_read = _u.get("cache_read_input_tokens", 0) + _c_write = _u.get("cache_creation_input_tokens", 0) + if _c_read or _c_write: + logger.info( + "[anthropic-cache] read=%s write=%s fresh_input=%s", + _c_read, _c_write, _anth_input_tokens, + ) elif evt == "message_delta": _anth_output_tokens = j.get("usage", {}).get("output_tokens", 0) elif evt == "message_stop": diff --git a/tests/test_llm_core_anthropic_cache.py b/tests/test_llm_core_anthropic_cache.py new file mode 100644 index 0000000..990b199 --- /dev/null +++ b/tests/test_llm_core_anthropic_cache.py @@ -0,0 +1,32 @@ +"""Regression tests for Anthropic prompt-cache breakpoints in _build_anthropic_payload (#791).""" +from src import llm_core + + +def _payload(system="sys", user="hi", tools=None): + messages = [{"role": "system", "content": system}, {"role": "user", "content": user}] + return llm_core._build_anthropic_payload("claude", messages, 0.0, 1000, stream=True, tools=tools) + + +def test_agentic_caches_system_and_last_tool(): + tools = [ + {"type": "function", "function": {"name": "a", "description": "x", "parameters": {}}}, + {"type": "function", "function": {"name": "b", "description": "y", "parameters": {}}}, + ] + p = _payload(system="SYS PROMPT " * 50, tools=tools) + assert isinstance(p["system"], list) + assert p["system"][0].get("cache_control") == {"type": "ephemeral"} + assert "cache_control" not in p["tools"][0], "only the LAST tool is a breakpoint" + assert p["tools"][-1].get("cache_control") == {"type": "ephemeral"} + breakpoints = sum("cache_control" in b for b in p["system"]) + sum("cache_control" in t for t in p["tools"]) + assert breakpoints == 2 + + +def test_tiny_tool_less_prompt_not_cached(): + p = _payload(system="hi", tools=None) + assert isinstance(p["system"], list) + assert "cache_control" not in p["system"][0] + + +def test_large_system_only_is_cached(): + p = _payload(system="z" * 5000, tools=None) + assert p["system"][0].get("cache_control") == {"type": "ephemeral"}