diff --git a/src/agent_runs.py b/src/agent_runs.py index 7fc661d..8adbab9 100644 --- a/src/agent_runs.py +++ b/src/agent_runs.py @@ -15,6 +15,7 @@ Durability scope: in-memory, survives as long as the server process runs (tab close / navigation / refresh). It does NOT survive a server restart. """ import asyncio +import json import logging from typing import AsyncGenerator, Dict, Optional @@ -41,6 +42,17 @@ _RUNS: Dict[str, _Run] = {} _EVICT_GRACE_S = 180 +def _publish(run: _Run, ev: str) -> None: + """Append one SSE event and fan it out to every live subscriber.""" + run.buffer.append(ev) + seq = len(run.buffer) - 1 + for q in list(run.subscribers): + try: + q.put_nowait((seq, ev)) + except Exception: + pass + + def _schedule_evict(session_id: str) -> None: """(Re)arm a grace-period eviction for a terminal run with no subscribers. Identity-checked so a run that gets replaced/reused is never evicted by a @@ -93,13 +105,7 @@ async def _drain(session_id: str, agen: AsyncGenerator[str, None], pass try: async for ev in agen: - run.buffer.append(ev) - seq = len(run.buffer) - 1 - for q in list(run.subscribers): - try: - q.put_nowait((seq, ev)) - except Exception: - pass + _publish(run, ev) if run.status == "running": run.status = "done" except asyncio.CancelledError: @@ -113,6 +119,12 @@ async def _drain(session_id: str, agen: AsyncGenerator[str, None], except Exception as e: logger.error("[agent-run] %s failed: %s", session_id, e, exc_info=True) run.status = "error" + _publish( + run, + "event: error\n" + f"data: {json.dumps({'error': 'Agent run failed before completion.', 'status': 500})}\n\n", + ) + _publish(run, "data: [DONE]\n\n") finally: # Wake every subscriber with the end sentinel so their SSE closes. for q in list(run.subscribers): diff --git a/static/js/chat.js b/static/js/chat.js index 8c10bcf..118399c 100644 --- a/static/js/chat.js +++ b/static/js/chat.js @@ -1213,6 +1213,7 @@ import createResearchSynapse from './researchSynapse.js'; } let _nextIsError = false; + let _streamSawDone = false; while (true) { const { done, value } = await reader.read(); @@ -1255,6 +1256,7 @@ import createResearchSynapse from './researchSynapse.js'; } if (data === '[DONE]') { + _streamSawDone = true; // Always update background map if entry exists (even if user switched back) var bgDone = _backgroundStreams.get(streamSessionId); if (bgDone) { @@ -2220,6 +2222,10 @@ import createResearchSynapse from './researchSynapse.js'; } } + if (!_streamSawDone) { + throw new Error('Stream closed before completion'); + } + _renderStream(); _cancelThinkingTimer(); _removeThinkingSpinner();