* fix: live-resume chat stream on session re-entry (#2539) When a session was re-entered after a page refresh or in a new tab while its agent run was still streaming, the UI showed a frozen "Generating response..." spinner, polled stream_status until the run finished, and then did a full reload. The live tokens were never shown. Add resumeStream() in chat.js: it consumes GET /api/chat/resume/{id} (which replays the run's buffer then streams live), renders reply tokens as they arrive, and reloads the session on completion for the canonical final render. sessions.js _checkServerStream now calls it on re-entry and falls back to the previous spinner+poll path if it is unavailable. * Finalize plain-text resume in place instead of reloading On stream completion, resumeStream() called selectSession(), forcing a full history re-fetch and a visible flicker right as the stream finished. For plain text replies (no tool calls, sources, doc streaming, or multi-round output) the live tokens are already rendered, so finalize in place: replace the live bubble with a canonical single message via chatRenderer.addMessage (markdown + footer actions + metrics, the same renderer history uses), captured from the streamed metrics event. No history refetch, no extra round-trip, no flicker. Rich responses still reload, since their canonical render (tool bubbles, sources, multi-bubble) is rebuilt from the saved DB record. * Use a dedicated set for the resume re-attach lock; fix stale docblock resumeStream() marked its re-attach lock in _backgroundStreams, which checkBackgroundStream() also reads. On a second re-entry of the same session while a resume was still live, checkBackgroundStream() mistook that entry for a same-tab POST stream and spawned its own spinner+poll bubble. Move the lock to a dedicated _resumingStreams set (also covered by hasActiveStream) so the two paths no longer collide. Also update the resumeStream docblock to describe the in-place finalize vs reload split.
This commit is contained in:
committed by
GitHub
parent
c916224510
commit
66fba78011
@@ -82,13 +82,15 @@ import createResearchSynapse from './researchSynapse.js';
|
||||
|
||||
// Background streaming support
|
||||
const _backgroundStreams = new Map(); // sessionId -> { status, accumulated, sourcesHtml, abortCtrl, query, metrics }
|
||||
const _resumingStreams = new Set(); // sessionId -> a resumeStream() reader is live (re-attach lock)
|
||||
let _streamSessionId = null; // Session ID for the currently active reader loop
|
||||
let _lastReaderActivity = 0; // Timestamp of last reader.read() success — used to detect frozen streams
|
||||
let _webLockRelease = null; // Function to release the Web Lock held during streaming
|
||||
|
||||
/** Check if an SSE reader is still actively connected for a session. */
|
||||
function hasActiveStream(sessionId) {
|
||||
return _streamSessionId === sessionId || _backgroundStreams.has(sessionId);
|
||||
return _streamSessionId === sessionId || _backgroundStreams.has(sessionId) ||
|
||||
_resumingStreams.has(sessionId);
|
||||
}
|
||||
|
||||
// Sources box builder and toggleSources are now in chatRenderer.js
|
||||
@@ -3045,6 +3047,152 @@ import createResearchSynapse from './researchSynapse.js';
|
||||
var _notifyStreamComplete = chatStream.notifyStreamComplete;
|
||||
var _insertStreamDoneToast = chatStream.insertStreamDoneToast;
|
||||
|
||||
/**
|
||||
* Live-resume a chat run still streaming detached on the server (#2539).
|
||||
*
|
||||
* On session re-entry, GET /api/chat/resume/{id} replays the run's buffer then
|
||||
* streams live; reply tokens render as they arrive. On completion a plain text
|
||||
* reply is finalized in place (canonical bubble via chatRenderer.addMessage, no
|
||||
* reload); a "rich" reply (tool calls, sources, doc streaming, multi-round) is
|
||||
* reloaded from the DB so its full render stays faithful. Returns true if it
|
||||
* attached, false to let the caller fall back to spinner+poll.
|
||||
*/
|
||||
export async function resumeStream(sessionId) {
|
||||
if (!sessionId) return false;
|
||||
if (hasActiveStream(sessionId)) return false;
|
||||
|
||||
let res;
|
||||
try {
|
||||
res = await fetch(`${API_BASE}/api/chat/resume/${sessionId}`);
|
||||
} catch (e) {
|
||||
return false;
|
||||
}
|
||||
if (!res.ok || !res.body) return false;
|
||||
|
||||
const box = document.getElementById('chat-history');
|
||||
if (!box) return false;
|
||||
|
||||
// Block duplicate re-attach attempts while this reader is live. A dedicated
|
||||
// set (not _backgroundStreams) so checkBackgroundStream doesn't mistake this
|
||||
// for a same-tab POST stream and spawn its own spinner+poll on re-entry.
|
||||
_resumingStreams.add(sessionId);
|
||||
|
||||
const holder = document.createElement('div');
|
||||
holder.className = 'msg msg-ai';
|
||||
const meta = sessionModule.getSessions().find(s => s.id === sessionId);
|
||||
const roleLabel = _shortModel(meta && meta.model);
|
||||
const roleTs = new Date().toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' });
|
||||
holder.innerHTML = '<div class="role">' + uiModule.esc(roleLabel) +
|
||||
' <span class="role-timestamp">' + roleTs + '</span></div>' +
|
||||
'<div class="body"><div class="stream-content"></div></div>';
|
||||
_applyModelColor(holder.querySelector('.role'), meta && meta.model);
|
||||
const contentDiv = holder.querySelector('.stream-content');
|
||||
box.appendChild(holder);
|
||||
|
||||
const spinner = spinnerModule.create('Generating response...', 'right');
|
||||
holder.querySelector('.body').appendChild(spinner.createElement());
|
||||
spinner.start();
|
||||
uiModule.scrollHistory();
|
||||
|
||||
const reader = res.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = '';
|
||||
let roundText = '';
|
||||
let gotDelta = false;
|
||||
let leftSession = false;
|
||||
let metricsData = null;
|
||||
// "Rich" responses (tool calls, sources, doc streaming, multi-round) need the
|
||||
// full canonical render, which is rebuilt from the saved DB record on reload.
|
||||
// Plain text replies can be finalized in place without a reload.
|
||||
let rich = false;
|
||||
|
||||
const cleanup = () => {
|
||||
try { spinner.destroy(); } catch (_) {}
|
||||
_resumingStreams.delete(sessionId);
|
||||
};
|
||||
|
||||
const renderDelta = () => {
|
||||
const dt = stripToolBlocks(roundText);
|
||||
contentDiv.innerHTML = markdownModule.mdToHtml(markdownModule.squashOutsideCode(dt));
|
||||
uiModule.scrollHistory();
|
||||
};
|
||||
|
||||
try {
|
||||
readLoop:
|
||||
while (true) {
|
||||
// User left this session: stop rendering, the run continues server-side.
|
||||
if (sessionModule.getCurrentSessionId &&
|
||||
sessionModule.getCurrentSessionId() !== sessionId) {
|
||||
leftSession = true;
|
||||
try { await reader.cancel(); } catch (_) {}
|
||||
break;
|
||||
}
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const parts = buffer.split('\n\n');
|
||||
buffer = parts.pop();
|
||||
for (const part of parts) {
|
||||
const line = part.split('\n').find(l => l.startsWith('data: '));
|
||||
if (!line) continue;
|
||||
const payload = line.slice(6);
|
||||
if (payload === '[DONE]') {
|
||||
try { await reader.cancel(); } catch (_) {}
|
||||
break readLoop;
|
||||
}
|
||||
let json;
|
||||
try { json = JSON.parse(payload); } catch (_) { continue; }
|
||||
if (json.delta) {
|
||||
roundText += json.delta;
|
||||
if (!gotDelta) { gotDelta = true; try { spinner.destroy(); } catch (_) {} }
|
||||
renderDelta();
|
||||
} else if (json.type === 'doc_stream_open') {
|
||||
rich = true;
|
||||
if (documentModule) documentModule.streamDocOpen(json.title || '', json.lang || '');
|
||||
} else if (json.type === 'doc_stream_delta') {
|
||||
rich = true;
|
||||
if (documentModule && json.delta) documentModule.streamDocDelta(json.delta);
|
||||
} else if (json.type === 'metrics') {
|
||||
metricsData = json.data || metricsData;
|
||||
} else if (json.type === 'tool_start' || json.type === 'tool_output' ||
|
||||
json.type === 'tool_progress' || json.type === 'agent_step' ||
|
||||
json.type === 'web_sources' || json.type === 'rag_sources' ||
|
||||
json.type === 'research_progress' || json.type === 'research_sources' ||
|
||||
json.type === 'research_findings' || json.type === 'research_done') {
|
||||
rich = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
// Network drop or parse failure: fall through to the reload below.
|
||||
}
|
||||
|
||||
cleanup();
|
||||
if (leftSession) { if (holder.parentNode) holder.remove(); return true; }
|
||||
|
||||
const onThisSession = sessionModule.getCurrentSessionId &&
|
||||
sessionModule.getCurrentSessionId() === sessionId;
|
||||
|
||||
// Plain text reply: finalize in place. Replace the live bubble with a
|
||||
// canonical single message (markdown + footer actions + metrics) using the
|
||||
// same renderer history does. No history refetch, no end-of-stream flicker.
|
||||
if (onThisSession && !rich && roundText.trim()) {
|
||||
if (holder.parentNode) holder.remove();
|
||||
const model = meta && meta.model;
|
||||
const meta_ = metricsData ? Object.assign({ model }, metricsData) : { model };
|
||||
chatRenderer.addMessage('assistant', roundText, model, meta_);
|
||||
uiModule.scrollHistory();
|
||||
return true;
|
||||
}
|
||||
|
||||
// Rich response (tools, sources, docs, multi-round) or user moved on:
|
||||
// reload from the DB for the full canonical render.
|
||||
if (holder.parentNode) holder.remove();
|
||||
if (onThisSession) sessionModule.selectSession(sessionId);
|
||||
else sessionModule.loadSessions();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check for background streams when switching to a session.
|
||||
* Called after history loads on session switch.
|
||||
@@ -4528,6 +4676,7 @@ import createResearchSynapse from './researchSynapse.js';
|
||||
abortCurrentRequest,
|
||||
detachCurrentStream,
|
||||
checkBackgroundStream,
|
||||
resumeStream,
|
||||
hideWelcomeScreen: chatRenderer.hideWelcomeScreen,
|
||||
showWelcomeScreen: chatRenderer.showWelcomeScreen,
|
||||
checkPendingResearch,
|
||||
|
||||
@@ -2157,7 +2157,14 @@ async function _checkServerStream(sessionId) {
|
||||
// Skip if this is a research stream — research has its own progress UI
|
||||
if (info.mode === 'research' || info.is_research) return;
|
||||
|
||||
// Server is still streaming — show spinner and poll
|
||||
// Live-resume the detached run: replay its buffer then stream live tokens
|
||||
// (#2539). Falls back to the spinner+poll path below if unavailable.
|
||||
if (window.chatModule && window.chatModule.resumeStream) {
|
||||
const attached = await window.chatModule.resumeStream(sessionId);
|
||||
if (attached) return;
|
||||
}
|
||||
|
||||
// Fallback: server is still streaming, show spinner and poll.
|
||||
const box = document.getElementById('chat-history');
|
||||
if (!box) return;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user