"""Regression: a streamed `reasoning` delta (vLLM 0.20.2 / NIM / Ollama) must surface
as a thinking chunk, while a `content` delta still streams as normal content. Also
covers the older `reasoning_content` field name for backward compatibility.
"""
import asyncio
import json
from src import llm_core
class _FakeResp:
    """Stand-in for a streaming HTTP response that replays canned lines.

    Always reports HTTP 200 and yields the lines it was constructed with,
    in order, from ``aiter_lines``.
    """

    status_code = 200

    def __init__(self, lines):
        self._lines = lines

    async def aiter_lines(self):
        """Asynchronously yield each canned line in its original order."""
        for line in self._lines:
            yield line

    async def aread(self):
        """Return an empty body; only reached on non-200, kept for safety."""
        return bytes()
class _FakeStreamCtx:
    """Async context manager that hands out a ``_FakeResp`` on entry."""

    def __init__(self, lines):
        self._lines = lines

    async def __aenter__(self):
        # A fresh fake response is built on every entry from the stored lines.
        response = _FakeResp(self._lines)
        return response

    async def __aexit__(self, *exc_info):
        # False means exceptions raised inside the ``async with`` propagate.
        return False
class _FakeClient:
    """Fake HTTP client whose ``stream`` call replays a fixed list of lines."""

    def __init__(self, lines):
        self._lines = lines

    def stream(self, *args, **kwargs):
        """Ignore all request arguments and return a canned stream context."""
        canned = self._lines
        return _FakeStreamCtx(canned)
def _run_stream(model, lines, monkeypatch):
    """Run ``llm_core.stream_llm`` against a fake upstream and decode its output.

    ``llm_core._get_http_client`` is patched to return a ``_FakeClient`` that
    replays ``lines``. Every chunk produced by ``stream_llm`` is split into
    lines; each ``data:`` line carrying a JSON object is decoded. Only decoded
    payloads that have a ``"delta"`` key are returned, in emission order.
    """
    monkeypatch.setattr(llm_core, "_get_http_client", lambda: _FakeClient(lines))

    async def _collect():
        return [
            chunk
            async for chunk in llm_core.stream_llm(
                "http://nim-nano:8000/v1/chat/completions",
                model,
                [{"role": "user", "content": "hi"}],
            )
        ]

    payloads = []
    for chunk in asyncio.run(_collect()):
        for line in (ln.strip() for ln in chunk.splitlines()):
            if not line.startswith("data:"):
                continue
            body = line[5:].strip()
            # Skip non-object payloads such as the "[DONE]" sentinel.
            if not body.startswith("{"):
                continue
            try:
                decoded = json.loads(body)
            except json.JSONDecodeError:
                # Best-effort: malformed payloads are skipped, not fatal.
                continue
            payloads.append(decoded)
    return [p for p in payloads if "delta" in p]
def test_reasoning_field_emits_thinking_chunk(monkeypatch):
    """A streamed ``reasoning`` delta surfaces as thinking; ``content`` stays normal."""
    upstream = [
        'data: {"choices":[{"delta":{"reasoning":"weighing options"}}]}',
        'data: {"choices":[{"delta":{"content":"Hello"}}]}',
        "data: [DONE]",
    ]
    deltas = _run_stream("nvidia/nemotron-3-nano", upstream, monkeypatch)

    thinking_texts = [d["delta"] for d in deltas if d.get("thinking")]
    regular_texts = [d["delta"] for d in deltas if not d.get("thinking")]

    assert any("weighing options" in text for text in thinking_texts), deltas
    assert any(text == "Hello" for text in regular_texts), deltas
def test_reasoning_content_field_still_supported(monkeypatch):
    """The older ``reasoning_content`` field name must still surface as thinking."""
    upstream = [
        'data: {"choices":[{"delta":{"reasoning_content":"older field"}}]}',
        'data: {"choices":[{"delta":{"content":"Answer"}}]}',
        "data: [DONE]",
    ]
    deltas = _run_stream("some-thinking-model", upstream, monkeypatch)

    thinking_texts = [d["delta"] for d in deltas if d.get("thinking")]
    regular_texts = [d["delta"] for d in deltas if not d.get("thinking")]

    assert any("older field" in text for text in thinking_texts), deltas
    assert any(text == "Answer" for text in regular_texts), deltas
def test_think_tag_in_content_stream_routes_to_thinking_channel(monkeypatch):
    """Inline think tags in ``content`` must be routed to the thinking channel.

    Regression: an unregistered model (Qwopus-style) that wraps its reasoning
    in an opening/closing think tag pair directly inside the ``content`` field.
    The reasoning must surface as thinking chunks; only the answer after the
    closing tag is a normal delta.
    """
    # Tags are assembled from fragments so that tooling which strips literal
    # think tags from text cannot silently hollow out this fixture.
    open_tag = "<" + "think>"
    close_tag = "</" + "think>"

    def content_line(text):
        # Compact separators reproduce the hand-written SSE wire format.
        payload = {"choices": [{"delta": {"content": text}}]}
        return "data: " + json.dumps(payload, separators=(",", ":"))

    deltas = _run_stream(
        "Qwopus3-9B-custom",  # name not in _THINKING_MODEL_PATTERNS
        [
            content_line(open_tag + "step one "),
            content_line("step two" + close_tag),
            content_line("Final answer"),
            "data: [DONE]",
        ],
        monkeypatch,
    )
    thinking = [d for d in deltas if d.get("thinking")]
    regular = [d for d in deltas if not d.get("thinking")]

    assert thinking, f"expected thinking deltas, got: {deltas}"
    assert all("Final answer" not in d["delta"] for d in thinking), thinking
    assert regular, f"expected regular delta after the closing think tag, got: {deltas}"
    assert any("Final answer" in d["delta"] for d in regular), regular
def test_think_tag_and_close_in_same_chunk(monkeypatch):
    """Opening tag, reasoning, closing tag and answer all in one content chunk.

    The reasoning between the tags must come out as a thinking delta and the
    text after the closing tag as a regular delta.
    """
    # Tags are assembled from fragments so that tooling which strips literal
    # think tags from text cannot silently hollow out this fixture.
    open_tag = "<" + "think>"
    close_tag = "</" + "think>"

    payload = {
        "choices": [
            {"delta": {"content": open_tag + "my reasoning" + close_tag + "my answer"}}
        ]
    }
    deltas = _run_stream(
        "Qwopus3-9B-custom",
        [
            # Compact separators reproduce the hand-written SSE wire format.
            "data: " + json.dumps(payload, separators=(",", ":")),
            "data: [DONE]",
        ],
        monkeypatch,
    )
    thinking = [d for d in deltas if d.get("thinking")]
    regular = [d for d in deltas if not d.get("thinking")]

    assert thinking and "my reasoning" in thinking[0]["delta"], thinking
    assert regular and "my answer" in regular[0]["delta"], regular
def test_think_tag_gt_in_mid_reasoning_not_truncated(monkeypatch):
    """A literal ">" inside the reasoning must not truncate the thinking text.

    Regression for _first_content_sent misuse: the opening-tag strip ran on
    every chunk (not just the first) because _first_content_sent stays False
    throughout the think block. On chunk 2 it did find(">") over reasoning text
    and silently dropped everything before the first ">".
    Repro: 3 chunks, ">" in chunk 2.
    """
    # Tags are assembled from fragments so that tooling which strips literal
    # think tags from text cannot silently hollow out this fixture.
    open_tag = "<" + "think>"
    close_tag = "</" + "think>"

    def content_line(text):
        # Compact separators reproduce the hand-written SSE wire format.
        payload = {"choices": [{"delta": {"content": text}}]}
        return "data: " + json.dumps(payload, separators=(",", ":"))

    deltas = _run_stream(
        "Qwopus3-9B-custom",
        [
            content_line(open_tag + "reasoning a "),
            content_line("more c > d "),
            content_line(close_tag + "answer"),
            "data: [DONE]",
        ],
        monkeypatch,
    )
    thinking = [d for d in deltas if d.get("thinking")]
    regular = [d for d in deltas if not d.get("thinking")]

    # "more c " must survive — must not be truncated at the '>'
    assert any("more c > d" in d["delta"] for d in thinking), thinking
    assert any("answer" in d["delta"] for d in regular), regular
def test_registered_thinking_model_stray_close_tag_repair_unchanged(monkeypatch):
    """The existing stray-close-tag repair for registered models must not regress.

    A registered model whose content starts with a closing think tag gets the
    opening think tag prepended.
    NOTE(review): the stray closing tag at the start of the content and the
    expected opening-tag prefix were reconstructed from the test name and the
    original comment — confirm against the llm_core repair logic.
    """
    # Tags are assembled from fragments so that tooling which strips literal
    # think tags from text cannot silently hollow out this fixture (which would
    # also reduce the final check to the vacuous startswith("")).
    open_tag = "<" + "think>"
    close_tag = "</" + "think>"

    payload = {"choices": [{"delta": {"content": close_tag + "Here is my answer"}}]}
    deltas = _run_stream(
        "qwq-32b",  # registered in _THINKING_MODEL_PATTERNS
        [
            # Compact separators reproduce the hand-written SSE wire format.
            "data: " + json.dumps(payload, separators=(",", ":")),
            "data: [DONE]",
        ],
        monkeypatch,
    )
    assert deltas, deltas
    first = deltas[0]["delta"]
    assert first.startswith(open_tag), f"expected repair prefix, got: {first!r}"