Handle incomplete detached agent streams
This commit is contained in:
@@ -15,6 +15,7 @@ Durability scope: in-memory, survives as long as the server process runs (tab
|
||||
close / navigation / refresh). It does NOT survive a server restart.
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import AsyncGenerator, Dict, Optional
|
||||
|
||||
@@ -41,6 +42,17 @@ _RUNS: Dict[str, _Run] = {}
|
||||
_EVICT_GRACE_S = 180
|
||||
|
||||
|
||||
def _publish(run: _Run, ev: str) -> None:
|
||||
"""Append one SSE event and fan it out to every live subscriber."""
|
||||
run.buffer.append(ev)
|
||||
seq = len(run.buffer) - 1
|
||||
for q in list(run.subscribers):
|
||||
try:
|
||||
q.put_nowait((seq, ev))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _schedule_evict(session_id: str) -> None:
|
||||
"""(Re)arm a grace-period eviction for a terminal run with no subscribers.
|
||||
Identity-checked so a run that gets replaced/reused is never evicted by a
|
||||
@@ -93,13 +105,7 @@ async def _drain(session_id: str, agen: AsyncGenerator[str, None],
|
||||
pass
|
||||
try:
|
||||
async for ev in agen:
|
||||
run.buffer.append(ev)
|
||||
seq = len(run.buffer) - 1
|
||||
for q in list(run.subscribers):
|
||||
try:
|
||||
q.put_nowait((seq, ev))
|
||||
except Exception:
|
||||
pass
|
||||
_publish(run, ev)
|
||||
if run.status == "running":
|
||||
run.status = "done"
|
||||
except asyncio.CancelledError:
|
||||
@@ -113,6 +119,12 @@ async def _drain(session_id: str, agen: AsyncGenerator[str, None],
|
||||
except Exception as e:
|
||||
logger.error("[agent-run] %s failed: %s", session_id, e, exc_info=True)
|
||||
run.status = "error"
|
||||
_publish(
|
||||
run,
|
||||
"event: error\n"
|
||||
f"data: {json.dumps({'error': 'Agent run failed before completion.', 'status': 500})}\n\n",
|
||||
)
|
||||
_publish(run, "data: [DONE]\n\n")
|
||||
finally:
|
||||
# Wake every subscriber with the end sentinel so their SSE closes.
|
||||
for q in list(run.subscribers):
|
||||
|
||||
@@ -1213,6 +1213,7 @@ import createResearchSynapse from './researchSynapse.js';
|
||||
}
|
||||
|
||||
let _nextIsError = false;
|
||||
let _streamSawDone = false;
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
@@ -1255,6 +1256,7 @@ import createResearchSynapse from './researchSynapse.js';
|
||||
}
|
||||
|
||||
if (data === '[DONE]') {
|
||||
_streamSawDone = true;
|
||||
// Always update background map if entry exists (even if user switched back)
|
||||
var bgDone = _backgroundStreams.get(streamSessionId);
|
||||
if (bgDone) {
|
||||
@@ -2220,6 +2222,10 @@ import createResearchSynapse from './researchSynapse.js';
|
||||
}
|
||||
}
|
||||
|
||||
if (!_streamSawDone) {
|
||||
throw new Error('Stream closed before completion');
|
||||
}
|
||||
|
||||
_renderStream();
|
||||
_cancelThinkingTimer();
|
||||
_removeThinkingSpinner();
|
||||
|
||||
Reference in New Issue
Block a user