diff --git a/static/js/chat.js b/static/js/chat.js index dd47188..ee347b9 100644 --- a/static/js/chat.js +++ b/static/js/chat.js @@ -82,13 +82,15 @@ import createResearchSynapse from './researchSynapse.js'; // Background streaming support const _backgroundStreams = new Map(); // sessionId -> { status, accumulated, sourcesHtml, abortCtrl, query, metrics } + const _resumingStreams = new Set(); // sessionId -> a resumeStream() reader is live (re-attach lock) let _streamSessionId = null; // Session ID for the currently active reader loop let _lastReaderActivity = 0; // Timestamp of last reader.read() success — used to detect frozen streams let _webLockRelease = null; // Function to release the Web Lock held during streaming /** Check if an SSE reader is still actively connected for a session. */ function hasActiveStream(sessionId) { - return _streamSessionId === sessionId || _backgroundStreams.has(sessionId); + return _streamSessionId === sessionId || _backgroundStreams.has(sessionId) || + _resumingStreams.has(sessionId); } // Sources box builder and toggleSources are now in chatRenderer.js @@ -3045,6 +3047,152 @@ import createResearchSynapse from './researchSynapse.js'; var _notifyStreamComplete = chatStream.notifyStreamComplete; var _insertStreamDoneToast = chatStream.insertStreamDoneToast; + /** + * Live-resume a chat run still streaming detached on the server (#2539). + * + * On session re-entry, GET /api/chat/resume/{id} replays the run's buffer then + * streams live; reply tokens render as they arrive. On completion a plain text + * reply is finalized in place (canonical bubble via chatRenderer.addMessage, no + * reload); a "rich" reply (tool calls, sources, doc streaming, multi-round) is + * reloaded from the DB so its full render stays faithful. Returns true if it + * attached, false to let the caller fall back to spinner+poll. + */ + export async function resumeStream(sessionId) { + if (!sessionId) return false; + if (hasActiveStream(sessionId)) return false; + + let res; + try { + res = await fetch(`${API_BASE}/api/chat/resume/${sessionId}`); + } catch (e) { + return false; + } + if (!res.ok || !res.body) return false; + + const box = document.getElementById('chat-history'); + if (!box) return false; + + // Block duplicate re-attach attempts while this reader is live. A dedicated + // set (not _backgroundStreams) so checkBackgroundStream doesn't mistake this + // for a same-tab POST stream and spawn its own spinner+poll on re-entry. + _resumingStreams.add(sessionId); + + const holder = document.createElement('div'); + holder.className = 'msg msg-ai'; + const meta = sessionModule.getSessions().find(s => s.id === sessionId); + const roleLabel = _shortModel(meta && meta.model); + const roleTs = new Date().toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' }); + holder.innerHTML = '
' + uiModule.esc(roleLabel) + + ' ' + roleTs + '
' + + '
'; + _applyModelColor(holder.querySelector('.role'), meta && meta.model); + const contentDiv = holder.querySelector('.stream-content'); + box.appendChild(holder); + + const spinner = spinnerModule.create('Generating response...', 'right'); + holder.querySelector('.body').appendChild(spinner.createElement()); + spinner.start(); + uiModule.scrollHistory(); + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + let roundText = ''; + let gotDelta = false; + let leftSession = false; + let metricsData = null; + // "Rich" responses (tool calls, sources, doc streaming, multi-round) need the + // full canonical render, which is rebuilt from the saved DB record on reload. + // Plain text replies can be finalized in place without a reload. + let rich = false; + + const cleanup = () => { + try { spinner.destroy(); } catch (_) {} + _resumingStreams.delete(sessionId); + }; + + const renderDelta = () => { + const dt = stripToolBlocks(roundText); + contentDiv.innerHTML = markdownModule.mdToHtml(markdownModule.squashOutsideCode(dt)); + uiModule.scrollHistory(); + }; + + try { + readLoop: + while (true) { + // User left this session: stop rendering, the run continues server-side. + if (sessionModule.getCurrentSessionId && + sessionModule.getCurrentSessionId() !== sessionId) { + leftSession = true; + try { await reader.cancel(); } catch (_) {} + break; + } + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + const parts = buffer.split('\n\n'); + buffer = parts.pop(); + for (const part of parts) { + const line = part.split('\n').find(l => l.startsWith('data: ')); + if (!line) continue; + const payload = line.slice(6); + if (payload === '[DONE]') { + try { await reader.cancel(); } catch (_) {} + break readLoop; + } + let json; + try { json = JSON.parse(payload); } catch (_) { continue; } + if (json.delta) { + roundText += json.delta; + if (!gotDelta) { gotDelta = true; try { spinner.destroy(); } catch (_) {} } + renderDelta(); + } else if (json.type === 'doc_stream_open') { + rich = true; + if (documentModule) documentModule.streamDocOpen(json.title || '', json.lang || ''); + } else if (json.type === 'doc_stream_delta') { + rich = true; + if (documentModule && json.delta) documentModule.streamDocDelta(json.delta); + } else if (json.type === 'metrics') { + metricsData = json.data || metricsData; + } else if (json.type === 'tool_start' || json.type === 'tool_output' || + json.type === 'tool_progress' || json.type === 'agent_step' || + json.type === 'web_sources' || json.type === 'rag_sources' || + json.type === 'research_progress' || json.type === 'research_sources' || + json.type === 'research_findings' || json.type === 'research_done') { + rich = true; + } + } + } + } catch (e) { + // Network drop or parse failure: fall through to the reload below. + } + + cleanup(); + if (leftSession) { if (holder.parentNode) holder.remove(); return true; } + + const onThisSession = sessionModule.getCurrentSessionId && + sessionModule.getCurrentSessionId() === sessionId; + + // Plain text reply: finalize in place. Replace the live bubble with a + // canonical single message (markdown + footer actions + metrics) using the + // same renderer history does. No history refetch, no end-of-stream flicker. + if (onThisSession && !rich && roundText.trim()) { + if (holder.parentNode) holder.remove(); + const model = meta && meta.model; + const meta_ = metricsData ? Object.assign({ model }, metricsData) : { model }; + chatRenderer.addMessage('assistant', roundText, model, meta_); + uiModule.scrollHistory(); + return true; + } + + // Rich response (tools, sources, docs, multi-round) or user moved on: + // reload from the DB for the full canonical render. + if (holder.parentNode) holder.remove(); + if (onThisSession) sessionModule.selectSession(sessionId); + else sessionModule.loadSessions(); + return true; + } + /** * Check for background streams when switching to a session. * Called after history loads on session switch. @@ -4528,6 +4676,7 @@ import createResearchSynapse from './researchSynapse.js'; abortCurrentRequest, detachCurrentStream, checkBackgroundStream, + resumeStream, hideWelcomeScreen: chatRenderer.hideWelcomeScreen, showWelcomeScreen: chatRenderer.showWelcomeScreen, checkPendingResearch, diff --git a/static/js/sessions.js b/static/js/sessions.js index 26fa46a..dab25a1 100644 --- a/static/js/sessions.js +++ b/static/js/sessions.js @@ -2157,7 +2157,14 @@ async function _checkServerStream(sessionId) { // Skip if this is a research stream — research has its own progress UI if (info.mode === 'research' || info.is_research) return; - // Server is still streaming — show spinner and poll + // Live-resume the detached run: replay its buffer then stream live tokens + // (#2539). Falls back to the spinner+poll path below if unavailable. + if (window.chatModule && window.chatModule.resumeStream) { + const attached = await window.chatModule.resumeStream(sessionId); + if (attached) return; + } + + // Fallback: server is still streaming, show spinner and poll. const box = document.getElementById('chat-history'); if (!box) return;