fix: streaming drops providers that emit SSE data lines with no space (#1701)
This commit is contained in:
@@ -1192,9 +1192,13 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl
|
|||||||
yield f'event: error\ndata: {json.dumps({"status": r.status_code, "text": friendly, "raw": raw[:500]})}\n\n'
|
yield f'event: error\ndata: {json.dumps({"status": r.status_code, "text": friendly, "raw": raw[:500]})}\n\n'
|
||||||
return
|
return
|
||||||
async for line in r.aiter_lines():
|
async for line in r.aiter_lines():
|
||||||
if not line or not line.startswith("data: "):
|
# SSE allows "data:value" with no space after the colon
|
||||||
|
# (the space is optional per the spec). Some gateways and
|
||||||
|
# local servers omit it; gating on "data: " dropped their
|
||||||
|
# entire stream.
|
||||||
|
if not line or not line.startswith("data:"):
|
||||||
continue
|
continue
|
||||||
data = line[6:].strip()
|
data = line[5:].strip()
|
||||||
if not data or not data.startswith("{"):
|
if not data or not data.startswith("{"):
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
@@ -1307,8 +1311,11 @@ async def stream_llm(url: str, model: str, messages: List[Dict], temperature: fl
|
|||||||
if not line:
|
if not line:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if line.startswith("data: "):
|
# SSE allows "data:value" with no space after the colon; gating
|
||||||
data = line[6:].strip()
|
# on "data: " silently dropped content + usage from providers
|
||||||
|
# that omit it.
|
||||||
|
if line.startswith("data:"):
|
||||||
|
data = line[5:].strip()
|
||||||
if data == "[DONE]":
|
if data == "[DONE]":
|
||||||
tc_event = _emit_tool_calls()
|
tc_event = _emit_tool_calls()
|
||||||
if tc_event:
|
if tc_event:
|
||||||
|
|||||||
121
tests/test_llm_core_sse_no_space.py
Normal file
121
tests/test_llm_core_sse_no_space.py
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
"""SSE lines with no space after \'data:\' must still be parsed.
|
||||||
|
|
||||||
|
The SSE spec makes the space after the colon optional ("data:value" is
|
||||||
|
valid), and several gateways / local inference servers emit it that way.
|
||||||
|
stream_llm gated on line.startswith("data: ") (trailing space) in both the
|
||||||
|
OpenAI-compatible and Anthropic branches, so those providers\' ENTIRE
|
||||||
|
stream — content and usage — was silently dropped.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
|
||||||
|
from src import llm_core
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeResp:
|
||||||
|
def __init__(self, lines):
|
||||||
|
self._lines = lines
|
||||||
|
self.status_code = 200
|
||||||
|
|
||||||
|
async def aiter_lines(self):
|
||||||
|
for ln in self._lines:
|
||||||
|
yield ln
|
||||||
|
|
||||||
|
async def aread(self):
|
||||||
|
return b""
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeStreamCtx:
|
||||||
|
def __init__(self, lines):
|
||||||
|
self._lines = lines
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
return _FakeResp(self._lines)
|
||||||
|
|
||||||
|
async def __aexit__(self, *a):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeClient:
|
||||||
|
def __init__(self, lines):
|
||||||
|
self._lines = lines
|
||||||
|
|
||||||
|
def stream(self, method, url, **kw):
|
||||||
|
return _FakeStreamCtx(self._lines)
|
||||||
|
|
||||||
|
|
||||||
|
def _drive(monkeypatch, url, lines, model):
|
||||||
|
monkeypatch.setattr(llm_core, "_get_http_client", lambda: _FakeClient(lines))
|
||||||
|
monkeypatch.setattr(llm_core, "_is_host_dead", lambda u: False)
|
||||||
|
monkeypatch.setattr(llm_core, "note_model_activity", lambda *a, **k: None)
|
||||||
|
monkeypatch.setattr(llm_core, "_clear_host_dead", lambda *a, **k: None)
|
||||||
|
monkeypatch.setattr(llm_core, "_mark_host_dead", lambda *a, **k: False, raising=False)
|
||||||
|
|
||||||
|
async def run():
|
||||||
|
out = []
|
||||||
|
async for chunk in llm_core.stream_llm(
|
||||||
|
url, model, [{"role": "user", "content": "hi"}],
|
||||||
|
headers={"Authorization": "Bearer k"},
|
||||||
|
):
|
||||||
|
out.append(chunk)
|
||||||
|
return "".join(out)
|
||||||
|
|
||||||
|
return asyncio.run(run())
|
||||||
|
|
||||||
|
|
||||||
|
def _deltas(blob):
|
||||||
|
deltas = []
|
||||||
|
for ln in blob.split("\n"):
|
||||||
|
ln = ln.strip()
|
||||||
|
if ln.startswith("data: ") and ln[6:] != "[DONE]":
|
||||||
|
try:
|
||||||
|
j = json.loads(ln[6:])
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
if "delta" in j:
|
||||||
|
deltas.append(j["delta"])
|
||||||
|
return deltas
|
||||||
|
|
||||||
|
|
||||||
|
def test_openai_compat_no_space_data_is_parsed(monkeypatch):
|
||||||
|
lines = [
|
||||||
|
'data:' + json.dumps({"choices": [{"delta": {"content": "Hi"}}]}),
|
||||||
|
'data:' + json.dumps({"choices": [{"delta": {"content": " there"}}]}),
|
||||||
|
'data:[DONE]',
|
||||||
|
]
|
||||||
|
blob = _drive(
|
||||||
|
monkeypatch,
|
||||||
|
"https://generativelanguage.googleapis.com/v1beta/openai/chat/completions",
|
||||||
|
lines,
|
||||||
|
"gpt-4o-test",
|
||||||
|
)
|
||||||
|
assert "".join(_deltas(blob)) == "Hi there"
|
||||||
|
|
||||||
|
|
||||||
|
def test_openai_compat_with_space_still_works(monkeypatch):
|
||||||
|
lines = [
|
||||||
|
'data: ' + json.dumps({"choices": [{"delta": {"content": "Yo"}}]}),
|
||||||
|
'data: [DONE]',
|
||||||
|
]
|
||||||
|
blob = _drive(
|
||||||
|
monkeypatch,
|
||||||
|
"https://generativelanguage.googleapis.com/v1beta/openai/chat/completions",
|
||||||
|
lines,
|
||||||
|
"gpt-4o-test",
|
||||||
|
)
|
||||||
|
assert "".join(_deltas(blob)) == "Yo"
|
||||||
|
|
||||||
|
|
||||||
|
def test_anthropic_no_space_data_is_parsed(monkeypatch):
|
||||||
|
lines = [
|
||||||
|
'data:' + json.dumps({"type": "content_block_delta",
|
||||||
|
"delta": {"type": "text_delta", "text": "Hi"}}),
|
||||||
|
'data:' + json.dumps({"type": "message_stop"}),
|
||||||
|
]
|
||||||
|
blob = _drive(
|
||||||
|
monkeypatch,
|
||||||
|
"https://api.anthropic.com/v1/messages",
|
||||||
|
lines,
|
||||||
|
"claude-test",
|
||||||
|
)
|
||||||
|
assert "Hi" in "".join(_deltas(blob))
|
||||||
Reference in New Issue
Block a user