diff --git a/src/agent_loop.py b/src/agent_loop.py index 9c25e59..2a945ea 100644 --- a/src/agent_loop.py +++ b/src/agent_loop.py @@ -1123,6 +1123,11 @@ def _append_tool_results( "name": tc.get("name", ""), "arguments": tc.get("arguments", "{}"), }, + # Gemini 3 requires the opaque thought_signature it returned with + # each function call to be echoed back on the follow-up turn, or + # the next request 400s. Replay it when present; other providers + # never emit it (their payload builders just ignore the field). + **({"extra_content": tc["extra_content"]} if tc.get("extra_content") else {}), } for j, tc in enumerate(native_tool_calls) ] diff --git a/src/llm_core.py b/src/llm_core.py index 1ca4bf6..ac8de73 100644 --- a/src/llm_core.py +++ b/src/llm_core.py @@ -194,6 +194,43 @@ def _normalize_ollama_url(url: str) -> str: return base.rstrip("/") + "/chat" +def _ollama_normalize_tool_messages(messages: List[Dict]) -> List[Dict]: + """Adapt Odysseus' canonical OpenAI-style messages to native Ollama /api/chat. + + Odysseus carries assistant tool calls in the OpenAI shape, where + `function.arguments` is a JSON *string*. Native Ollama expects it to be a + JSON *object*; given the string it fails the whole request with HTTP 400 + "Value looks like object, but can't find closing '}' symbol", which aborts + every follow-up (tool-result) round. Parse the arguments back into an object + here, on a shallow copy, leaving non-tool messages untouched. The opaque + Gemini `extra_content` (thought_signature) is dropped — it is meaningless to + Ollama and only matters when the conversation is replayed to Gemini. + """ + out: List[Dict] = [] + for m in messages or []: + tcs = m.get("tool_calls") if isinstance(m, dict) else None + if not tcs: + out.append(m) + continue + new_calls = [] + for tc in tcs: + fn = tc.get("function") or {} + args = fn.get("arguments") + if isinstance(args, str): + try: + args = json.loads(args) if args.strip() else {} + except (json.JSONDecodeError, TypeError): + args = {} + call: Dict = {"function": {"name": fn.get("name", ""), "arguments": args or {}}} + if tc.get("id"): + call["id"] = tc["id"] + new_calls.append(call) + nm = dict(m) + nm["tool_calls"] = new_calls + out.append(nm) + return out + + def _build_ollama_payload( model: str, messages: List[Dict], @@ -204,7 +241,7 @@ def _build_ollama_payload( ) -> Dict: payload: Dict = { "model": model, - "messages": messages, + "messages": _ollama_normalize_tool_messages(messages), "stream": stream, } options: Dict = {} @@ -1040,6 +1077,7 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl # ── OpenAI-compatible streaming ── # Accumulate native tool_calls across streaming chunks _tc_acc: Dict[int, Dict] = {} # index -> {id, name, arguments} + _tc_last_idx = [-1] # most-recently-touched slot, for providers that omit `index` # For thinking models: prepend to first content delta so frontend # can detect thinking-in-progress (some models output but no ) _thinking_model = _supports_thinking(model) @@ -1105,12 +1143,41 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl yield f'data: {json.dumps({"delta": content})}\n\n' # Native tool calls — accumulate across chunks for tc in delta.get("tool_calls") or []: - idx = tc.get("index", 0) + func = tc.get("function") or {} + raw_idx = tc.get("index") + if raw_idx is None: + # Gemini's OpenAI-compat layer omits `index` on + # parallel tool calls (every delta arrives as + # index=None) and sends each call complete in one + # delta. Without this, all parallel calls collide + # into slot 0 — later calls overwrite the first's + # name and CORRUPT its arguments by concatenation, + # so only one malformed call survives and the + # follow-up round 400s. A function name marks the + # start of a new call → allocate a fresh slot; + # an arg-only continuation attaches to the last. + if func.get("name") or _tc_last_idx[0] < 0: + # Next free slot ABOVE any existing key (not + # len()), so a provider mixing integer indices + # with index=None can never collide. + idx = max(_tc_acc, default=-1) + 1 + else: + idx = _tc_last_idx[0] + else: + idx = raw_idx + _tc_last_idx[0] = idx if idx not in _tc_acc: _tc_acc[idx] = {"id": "", "name": "", "arguments": ""} if tc.get("id"): _tc_acc[idx]["id"] = tc["id"] - func = tc.get("function") or {} + # Gemini 3 returns an opaque thought_signature in + # extra_content on the function-call delta. It MUST be + # echoed back on the assistant tool_call next round or the + # follow-up request 400s ("Function call is missing a + # thought_signature"). Preserve it verbatim; other + # providers never send it, so this is a no-op for them. + if tc.get("extra_content"): + _tc_acc[idx]["extra_content"] = tc["extra_content"] if func.get("name"): _tc_acc[idx]["name"] = func["name"] if "arguments" in func: diff --git a/tests/test_agent_loop.py b/tests/test_agent_loop.py index ca0a1c1..7f11d6c 100644 --- a/tests/test_agent_loop.py +++ b/tests/test_agent_loop.py @@ -301,3 +301,39 @@ class TestAppendToolResultsNativeContent: assert messages[0]["content"] == "thinking..." assert messages[1]["role"] == "user" assert "tool output" in messages[1]["content"] + + +class TestAppendToolResultsThoughtSignature: + """Gemini 3 returns an opaque thought_signature (in extra_content) with each + function call and rejects the follow-up turn with HTTP 400 unless it is + echoed back on the assistant tool_call. _append_tool_results must replay it + when present, and omit the field entirely otherwise (other providers never + send it).""" + + def test_extra_content_is_replayed_when_present(self): + native = [{ + "id": "call_g", + "name": "app_api", + "arguments": '{"action": "get_memory"}', + "extra_content": {"google": {"thought_signature": "EuIDCt8DAQ=="}}, + }] + messages = [] + _append_tool_results( + messages, "", native, [{}], ["mem"], + used_native=True, round_num=1, + ) + tc = messages[0]["tool_calls"][0] + assert tc["extra_content"] == {"google": {"thought_signature": "EuIDCt8DAQ=="}} + # function payload is still well-formed alongside it + assert tc["function"]["name"] == "app_api" + assert tc["id"] == "call_g" + + def test_no_extra_content_key_when_absent(self): + native = [{"id": "call_o", "name": "app_api", "arguments": "{}"}] + messages = [] + _append_tool_results( + messages, "", native, [{}], ["r"], + used_native=True, round_num=1, + ) + # No empty/None extra_content leaks onto non-Gemini tool calls. + assert "extra_content" not in messages[0]["tool_calls"][0] diff --git a/tests/test_llm_core_ollama.py b/tests/test_llm_core_ollama.py index 18b9819..59fa8ce 100644 --- a/tests/test_llm_core_ollama.py +++ b/tests/test_llm_core_ollama.py @@ -41,3 +41,75 @@ def test_llm_call_posts_native_ollama_payload(monkeypatch): assert seen["headers"]["Authorization"] == "Bearer ollama-key" assert seen["json"]["stream"] is False assert seen["json"]["options"] == {"temperature": 0.2, "num_predict": 7} + + +# --------------------------------------------------------------------------- +# Tool-call argument serialization for native Ollama +# +# Odysseus carries assistant tool calls in the OpenAI shape, where +# `function.arguments` is a JSON *string*. Native Ollama /api/chat expects a +# JSON *object* and rejects the string form with HTTP 400 ("Value looks like +# object, but can't find closing '}' symbol"), aborting every follow-up +# (tool-result) round. _build_ollama_payload must parse it back to an object. +# --------------------------------------------------------------------------- + +def _assistant_tool_call_msgs(): + """A canonical OpenAI-style assistant tool call + tool result, as produced by + agent_loop._append_tool_results (arguments are a JSON string).""" + return [ + {"role": "user", "content": "what do you know about me?"}, + { + "role": "assistant", + "content": None, + "tool_calls": [ + { + "id": "call_0", + "type": "function", + "function": {"name": "app_api", "arguments": '{"action": "get_memory"}'}, + } + ], + }, + {"role": "tool", "tool_call_id": "call_0", "content": "Memory: user is James."}, + ] + + +def test_ollama_payload_parses_string_arguments_to_object(): + payload = llm_core._build_ollama_payload( + "gpt-oss:120b", _assistant_tool_call_msgs(), temperature=0.0, max_tokens=0, + ) + asst = payload["messages"][1] + args = asst["tool_calls"][0]["function"]["arguments"] + # The whole point: arguments must be a dict, not the JSON string. + assert args == {"action": "get_memory"} + assert not isinstance(args, str) + assert asst["tool_calls"][0]["function"]["name"] == "app_api" + assert asst["tool_calls"][0]["id"] == "call_0" + + +def test_ollama_payload_drops_gemini_thought_signature(): + """A cross-provider fallback can hand Ollama a tool call that still carries + Gemini's opaque extra_content; it is meaningless to Ollama and must not leak.""" + msgs = _assistant_tool_call_msgs() + msgs[1]["tool_calls"][0]["extra_content"] = {"google": {"thought_signature": "AAAA"}} + payload = llm_core._build_ollama_payload( + "gpt-oss:120b", msgs, temperature=0.0, max_tokens=0, + ) + tc = payload["messages"][1]["tool_calls"][0] + assert "extra_content" not in tc + assert tc["function"]["arguments"] == {"action": "get_memory"} + + +def test_ollama_payload_leaves_plain_messages_untouched(): + msgs = [{"role": "user", "content": "hello"}] + payload = llm_core._build_ollama_payload("m", msgs, temperature=0.0, max_tokens=0) + assert payload["messages"][0] == {"role": "user", "content": "hello"} + + +def test_ollama_payload_tolerates_malformed_arguments(): + msgs = [{ + "role": "assistant", + "tool_calls": [{"function": {"name": "x", "arguments": "{not json"}}], + }] + payload = llm_core._build_ollama_payload("m", msgs, temperature=0.0, max_tokens=0) + # Falls back to an empty object rather than raising. + assert payload["messages"][0]["tool_calls"][0]["function"]["arguments"] == {} diff --git a/tests/test_llm_core_streaming.py b/tests/test_llm_core_streaming.py new file mode 100644 index 0000000..4476286 --- /dev/null +++ b/tests/test_llm_core_streaming.py @@ -0,0 +1,151 @@ +"""Streaming tool-call accumulation tests for the OpenAI-compatible path. + +Regression for Gemini's OpenAI-compat layer, which (a) attaches an opaque +thought_signature in `extra_content` on the function-call delta and (b) omits +`index` on PARALLEL tool calls — every parallel delta arrives as index=None. +The accumulator must give each parallel call its own slot (otherwise they +collide into slot 0, overwriting the first call's name and concatenating — +corrupting — its arguments) and must preserve extra_content per call. +""" +import json +import asyncio + +from src import llm_core + + +class _FakeResp: + def __init__(self, lines): + self._lines = lines + self.status_code = 200 + + async def aiter_lines(self): + for ln in self._lines: + yield ln + + async def aread(self): + return b"" + + +class _FakeStreamCtx: + def __init__(self, lines): + self._lines = lines + + async def __aenter__(self): + return _FakeResp(self._lines) + + async def __aexit__(self, *a): + return False + + +class _FakeClient: + def __init__(self, lines): + self._lines = lines + + def stream(self, method, url, **kw): + return _FakeStreamCtx(self._lines) + + +def _drive(monkeypatch, lines, model="gemini-3.1-pro-preview-customtools"): + """Run stream_llm against a canned SSE line list; return parsed events.""" + monkeypatch.setattr(llm_core, "_get_http_client", lambda: _FakeClient(lines)) + monkeypatch.setattr(llm_core, "_is_host_dead", lambda u: False) + monkeypatch.setattr(llm_core, "note_model_activity", lambda *a, **k: None) + monkeypatch.setattr(llm_core, "_clear_host_dead", lambda *a, **k: None) + + async def run(): + events = [] + async for chunk in llm_core.stream_llm( + "https://generativelanguage.googleapis.com/v1beta/openai/chat/completions", + model, + [{"role": "user", "content": "hi"}], + headers={"Authorization": "Bearer k"}, + tools=[{"type": "function", "function": {"name": "x", "parameters": {}}}], + ): + for ln in chunk.split("\n"): + ln = ln.strip() + if ln.startswith("data: ") and ln[6:] != "[DONE]": + try: + events.append(json.loads(ln[6:])) + except ValueError: + pass + return events + + return asyncio.run(run()) + + +def _sse(delta): + return "data: " + json.dumps({"choices": [{"delta": delta}]}) + + +def test_parallel_calls_with_null_index_do_not_collide(monkeypatch): + # Two parallel calls, each complete in one delta, both with index=None + # (exactly what Gemini's OpenAI-compat layer emits). Only the first carries + # a thought_signature. + lines = [ + _sse({"tool_calls": [{ + "index": None, "id": "call_a", "type": "function", + "function": {"name": "get_memory", "arguments": "{}"}, + "extra_content": {"google": {"thought_signature": "SIG0"}}, + }]}), + _sse({"tool_calls": [{ + "index": None, "id": "call_b", "type": "function", + "function": {"name": "bash", "arguments": '{"command":"echo hi"}'}, + }]}), + "data: [DONE]", + ] + events = _drive(monkeypatch, lines) + calls = next(e["calls"] for e in events if e.get("type") == "tool_calls") + assert len(calls) == 2, f"parallel calls collided: {calls}" + by_name = {c["name"]: c for c in calls} + assert set(by_name) == {"get_memory", "bash"} + # arguments are NOT corrupted by concatenation + assert by_name["get_memory"]["arguments"] == "{}" + assert by_name["bash"]["arguments"] == '{"command":"echo hi"}' + # signature preserved on the first call only, exactly as received + assert by_name["get_memory"]["extra_content"] == {"google": {"thought_signature": "SIG0"}} + assert "extra_content" not in by_name["bash"] + + +def test_single_call_chunked_arguments_still_accumulate(monkeypatch): + # Conformant OpenAI style: index present, arguments streamed in pieces. + lines = [ + _sse({"tool_calls": [{"index": 0, "id": "c", "type": "function", + "function": {"name": "search", "arguments": '{"q":"'}}]}), + _sse({"tool_calls": [{"index": 0, "function": {"arguments": 'cats"}'}}]}), + "data: [DONE]", + ] + events = _drive(monkeypatch, lines, model="gpt-4o-test") + calls = next(e["calls"] for e in events if e.get("type") == "tool_calls") + assert len(calls) == 1 + assert calls[0]["name"] == "search" + assert calls[0]["arguments"] == '{"q":"cats"}' + + +def test_null_index_chunked_arguments_attach_to_last_call(monkeypatch): + # index=None where the name arrives first, then an arg-only continuation: + # the continuation must attach to the just-started call, not open a new one. + lines = [ + _sse({"tool_calls": [{"index": None, "id": "c", "type": "function", + "function": {"name": "search", "arguments": '{"q":'}}]}), + _sse({"tool_calls": [{"index": None, "function": {"arguments": '"dogs"}'}}]}), + "data: [DONE]", + ] + events = _drive(monkeypatch, lines) + calls = next(e["calls"] for e in events if e.get("type") == "tool_calls") + assert len(calls) == 1, f"continuation opened a spurious call: {calls}" + assert calls[0]["arguments"] == '{"q":"dogs"}' + + +def test_sparse_integer_indices_then_null_do_not_collide(monkeypatch): + # Hardening: a provider that uses sparse integer indices (0 and 2) and then + # a null-index call must allocate ABOVE the max key, not at len()==2 (which + # would overwrite slot 2). Three distinct calls must survive. + lines = [ + _sse({"tool_calls": [{"index": 0, "id": "a", "function": {"name": "f0", "arguments": "{}"}}]}), + _sse({"tool_calls": [{"index": 2, "id": "b", "function": {"name": "f2", "arguments": "{}"}}]}), + _sse({"tool_calls": [{"index": None, "id": "c", "function": {"name": "fn", "arguments": "{}"}}]}), + "data: [DONE]", + ] + events = _drive(monkeypatch, lines) + calls = next(e["calls"] for e in events if e.get("type") == "tool_calls") + assert sorted(c["name"] for c in calls) == ["f0", "f2", "fn"], f"collision: {calls}"