Add Anthropic prompt caching to the agent loop (#812)

Send `system` as a structured text block with an ephemeral cache_control
breakpoint and cache the last tool schema, so multi-round agent runs read
the stable system+tools prefix from cache instead of re-billing it. Gate
the system breakpoint so tiny tool-less prompts skip the cache-write
premium. Log `cache_read_input_tokens` / `cache_creation_input_tokens` from
the `message_start` usage block whenever either is non-zero.

Fixes #791

Co-authored-by: Ethan <23321960+0xLeathery@users.noreply.github.com>
This commit is contained in:
Ethan
2026-06-02 12:14:31 +10:00
committed by GitHub
parent 8e918dfdbb
commit fd04ad353d
2 changed files with 57 additions and 2 deletions

View File

@@ -451,7 +451,17 @@ def _build_anthropic_payload(model, messages, temperature, max_tokens, stream=Fa
"temperature": temperature, "temperature": temperature,
} }
if system_parts: if system_parts:
payload["system"] = "\n\n".join(system_parts) system_text = "\n\n".join(system_parts)
# Send `system` as a structured text block so we can attach a prompt-cache
# breakpoint. The agent loop re-sends this same large prefix every round;
# caching it makes Anthropic re-read it from cache (~90% cheaper, lower TTFB)
# instead of re-billing it. Skip caching tiny one-off prompts, where the
# cache-WRITE premium wouldn't pay back (no reuse). Presence of `tools`
# means an agentic/multi-round call, where the prefix is always reused.
system_block = {"type": "text", "text": system_text}
if tools or len(system_text) > 4000:
system_block["cache_control"] = {"type": "ephemeral"}
payload["system"] = [system_block]
if stream: if stream:
payload["stream"] = True payload["stream"] = True
# Convert OpenAI-format tools to Anthropic format # Convert OpenAI-format tools to Anthropic format
@@ -466,6 +476,9 @@ def _build_anthropic_payload(model, messages, temperature, max_tokens, stream=Fa
"input_schema": fn.get("parameters", {"type": "object", "properties": {}}), "input_schema": fn.get("parameters", {"type": "object", "properties": {}}),
}) })
if anthropic_tools: if anthropic_tools:
# Cache the tool schemas too — they're stable for the whole agent run.
# The breakpoint caches all tool defs preceding it in the request.
anthropic_tools[-1]["cache_control"] = {"type": "ephemeral"}
payload["tools"] = anthropic_tools payload["tools"] = anthropic_tools
return payload return payload
@@ -951,7 +964,17 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl
if partial and _anth_tool_blocks[idx].get("name") in ("create_document", "update_document", "edit_document"): if partial and _anth_tool_blocks[idx].get("name") in ("create_document", "update_document", "edit_document"):
yield f'data: {json.dumps({"type": "tool_call_delta", "index": idx, "name": _anth_tool_blocks[idx]["name"], "arg_delta": partial})}\n\n' yield f'data: {json.dumps({"type": "tool_call_delta", "index": idx, "name": _anth_tool_blocks[idx]["name"], "arg_delta": partial})}\n\n'
elif evt == "message_start": elif evt == "message_start":
_anth_input_tokens = j.get("message", {}).get("usage", {}).get("input_tokens", 0) _u = j.get("message", {}).get("usage", {})
_anth_input_tokens = _u.get("input_tokens", 0)
# Surface prompt-cache effectiveness: cache_read > 0 means the
# stable system+tools prefix was served from cache this round.
_c_read = _u.get("cache_read_input_tokens", 0)
_c_write = _u.get("cache_creation_input_tokens", 0)
if _c_read or _c_write:
logger.info(
"[anthropic-cache] read=%s write=%s fresh_input=%s",
_c_read, _c_write, _anth_input_tokens,
)
elif evt == "message_delta": elif evt == "message_delta":
_anth_output_tokens = j.get("usage", {}).get("output_tokens", 0) _anth_output_tokens = j.get("usage", {}).get("output_tokens", 0)
elif evt == "message_stop": elif evt == "message_stop":

View File

@@ -0,0 +1,32 @@
"""Regression tests for Anthropic prompt-cache breakpoints in _build_anthropic_payload (#791)."""
from src import llm_core
def _payload(system="sys", user="hi", tools=None):
    """Build a streaming Anthropic payload from one system and one user message.

    Args:
        system: Text of the system message.
        user: Text of the user message.
        tools: Optional OpenAI-format tool definitions, forwarded unchanged.

    Returns:
        Whatever ``llm_core._build_anthropic_payload`` returns for model
        ``"claude"``, temperature ``0.0`` and ``max_tokens`` of ``1000``.
    """
    conversation = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    return llm_core._build_anthropic_payload(
        "claude",
        conversation,
        0.0,
        1000,
        stream=True,
        tools=tools,
    )
def test_agentic_caches_system_and_last_tool():
    """With tools supplied, the system block and only the last tool are breakpoints."""
    ephemeral = {"type": "ephemeral"}
    # Two minimal OpenAI-format function tools; each gets its own parameters dict.
    tools = [
        {
            "type": "function",
            "function": {"name": name, "description": description, "parameters": {}},
        }
        for name, description in (("a", "x"), ("b", "y"))
    ]

    p = _payload(system="SYS PROMPT " * 50, tools=tools)

    # The system prompt must be a structured block list carrying the marker.
    system_blocks = p["system"]
    assert isinstance(system_blocks, list)
    assert system_blocks[0].get("cache_control") == ephemeral

    # Only the final tool definition may carry a breakpoint.
    tool_defs = p["tools"]
    assert "cache_control" not in tool_defs[0], "only the LAST tool is a breakpoint"
    assert tool_defs[-1].get("cache_control") == ephemeral

    # Exactly two breakpoints overall: one on system, one on the last tool.
    marked = [entry for entry in [*system_blocks, *tool_defs] if "cache_control" in entry]
    breakpoints = len(marked)
    assert breakpoints == 2
def test_tiny_tool_less_prompt_not_cached():
    """A short system prompt with no tools is still a block list, but unmarked."""
    payload = _payload(system="hi", tools=None)
    system_blocks = payload["system"]
    assert isinstance(system_blocks, list)
    first_block = system_blocks[0]
    assert "cache_control" not in first_block
def test_large_system_only_is_cached():
    """A 5000-character system prompt gets a cache breakpoint even without tools."""
    oversized_prompt = "z" * 5000
    payload = _payload(system=oversized_prompt, tools=None)
    marker = payload["system"][0].get("cache_control")
    assert marker == {"type": "ephemeral"}