From 610968f91e1f6072cab6fd69640361cd150f21ac Mon Sep 17 00:00:00 2001 From: David Anderson <215816+akapug@users.noreply.github.com> Date: Mon, 1 Jun 2026 19:27:31 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20data=20integrity=20=E2=80=94=20deep-rese?= =?UTF-8?q?arch=20result=20parsing=20+=20memory-extraction=20durability=20?= =?UTF-8?q?(#808)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two independent data-integrity bugs: - services/research/service.py: ResearchService.research() (the public deep-research API, re-exported from services/__init__) treated the handler return value as a dict (result.get("sources"/"summary"/...)), but call_research_service() returns a formatted markdown STRING -> AttributeError: str has no attribute get on EVERY successful call, making the API unusable for any non-error result. Now uses the string report as the summary and parses sources from the "### Sources" markdown section (section-bounded, URL-deduped), with a defensive dict branch for back-compat. - services/memory/memory_extractor.py: extract_and_store guarded the vector-store find_similar/add calls only with the .healthy flag set ONCE at init. If the embedding/ChromaDB backend degraded LATER (OOM, evicted model, remote endpoint down), those calls raised, the exception escaped the dedup loop, skipped memory_manager.save(), and was swallowed by the outer try/except -> EVERY validated fact from the session was silently lost (the function docstring promises "never raised"). Now falls back to the existing text/fuzzy dedup so facts are still saved when the vector index is unavailable at runtime. Tests: test_research_service.py, test_memory_extractor_vector_degraded.py. 
--- services/memory/memory_extractor.py | 22 ++- services/research/service.py | 75 +++++++-- .../test_memory_extractor_vector_degraded.py | 113 +++++++++++++ tests/test_research_service.py | 153 ++++++++++++++++++ 4 files changed, 346 insertions(+), 17 deletions(-) create mode 100644 tests/test_memory_extractor_vector_degraded.py create mode 100644 tests/test_research_service.py diff --git a/services/memory/memory_extractor.py b/services/memory/memory_extractor.py index eea652a..0f82ba8 100644 --- a/services/memory/memory_extractor.py +++ b/services/memory/memory_extractor.py @@ -303,9 +303,18 @@ async def extract_and_store( if not fact_text or len(fact_text) < 5: continue - # Dedup: check vector similarity first (fast), then exact text match + # Dedup: check vector similarity first (fast), then exact text match. + # A runtime embedding/ChromaDB failure (backend OOM, model evicted, + # remote endpoint down) must not abort the whole batch — fall through + # to the text/fuzzy dedup below instead of losing every validated + # fact extracted this session. (`.healthy` is only set at init, so + # it does not catch failures that develop later.) if memory_vector and memory_vector.healthy: - existing_id = memory_vector.find_similar(fact_text, threshold=0.72) + try: + existing_id = memory_vector.find_similar(fact_text, threshold=0.72) + except Exception as e: + logger.warning(f"Memory dedup (vector) unavailable, using text fallback: {e}") + existing_id = None if existing_id: logger.debug(f"Memory dedup (vector): '{fact_text[:50]}' matches {existing_id}") continue @@ -330,9 +339,14 @@ async def extract_and_store( existing.append(entry) - # Add to vector index + # Add to vector index. The JSON store (saved below) is the source of + # truth and the keyword path can still retrieve this entry, so a vector + # write failure must not drop the fact or abort the remaining batch. 
if memory_vector and memory_vector.healthy: - memory_vector.add(entry["id"], fact_text) + try: + memory_vector.add(entry["id"], fact_text) + except Exception as e: + logger.warning(f"Memory vector add failed for {entry['id']}: {e}") added += 1 diff --git a/services/research/service.py b/services/research/service.py index 1004131..bbe6bd6 100644 --- a/services/research/service.py +++ b/services/research/service.py @@ -1,11 +1,16 @@ # services/research/service.py """Research service — deep research with LLM-in-the-loop.""" +import re from dataclasses import dataclass, field from typing import List, Optional, Callable from .research_handler import ResearchHandler +# Markdown source links emitted by ResearchHandler._format_research_report, +# e.g. "- [Some Title](https://example.com/page)". +_SOURCE_LINK_RE = re.compile(r"^\s*-\s*\[(?P<title>[^\]]*)\]\((?P<url>[^)]+)\)\s*$") + @dataclass class ResearchSource: @@ -75,26 +80,70 @@ class ResearchService: duration = time.time() - start - # Parse result into structured format - sources = [ - ResearchSource( - url=s.get("url", ""), - title=s.get("title", ""), - snippet=s.get("snippet", ""), - relevance=s.get("relevance", 0.0), + # call_research_service returns a formatted markdown report string + # (see ResearchHandler.call_research_service -> _format_research_report), + # not a dict. Treat it as such; tolerate an unexpected dict/None defensively. 
+ if isinstance(result, dict): + sources = [ + ResearchSource( + url=s.get("url", ""), + title=s.get("title", ""), + snippet=s.get("snippet", ""), + relevance=s.get("relevance", 0.0), + ) + for s in result.get("sources", []) + ] + return ResearchResult( + query=topic, + summary=result.get("summary", result.get("answer", "")), + sources=sources, + sections=result.get("sections", []), + tokens_used=result.get("tokens_used", 0), + duration_seconds=duration, ) - for s in result.get("sources", []) - ] + report = result if isinstance(result, str) else "" return ResearchResult( query=topic, - summary=result.get("summary", result.get("answer", "")), - sources=sources, - sections=result.get("sections", []), - tokens_used=result.get("tokens_used", 0), + summary=report, + sources=self._parse_sources(report), duration_seconds=duration, ) + @staticmethod + def _parse_sources(report: str) -> List[ResearchSource]: + """Extract sources from the markdown ### Sources section of a report. + + ResearchHandler emits one ``- [title](url)`` link per deduplicated + finding under a ``### Sources`` heading. Parse only that section so + inline links elsewhere in the body are not mistaken for sources. + """ + if not report: + return [] + sources: List[ResearchSource] = [] + seen = set() + in_sources = False + for line in report.splitlines(): + stripped = line.strip() + if stripped.startswith("###") or stripped.startswith("##"): + in_sources = stripped.lower().lstrip("#").strip() == "sources" + continue + if not in_sources: + continue + match = _SOURCE_LINK_RE.match(line) + if not match: + continue + url = match.group("url").strip() + if not url or url in seen: + continue + seen.add(url) + sources.append( + # snippet is required on ResearchSource; markdown source links + # carry no snippet, so default to empty (matches the dict path). 
+ ResearchSource(url=url, title=match.group("title").strip(), snippet="") + ) + return sources + def start_background( self, session_id: str, diff --git a/tests/test_memory_extractor_vector_degraded.py b/tests/test_memory_extractor_vector_degraded.py new file mode 100644 index 0000000..94ea594 --- /dev/null +++ b/tests/test_memory_extractor_vector_degraded.py @@ -0,0 +1,113 @@ +"""Regression: auto memory extraction must survive a runtime vector-store +failure. + +The vector index reports `.healthy` only at init time. If the embedding +backend dies later (OOM, model evicted, remote endpoint down), the per-fact +`find_similar` / `add` calls raise. Before the fix these exceptions escaped the +dedup loop, jumped past `memory_manager.save(...)`, and were swallowed by the +function's outer try/except — so EVERY validated fact from the session was +silently lost (the feature promises "Errors are logged, never raised", but it +also quietly dropped all the data). + +After the fix a degraded vector store falls through to the text/fuzzy dedup +path (which the code already maintains "when vector index is unavailable") and +the facts still land in the JSON store. 
+""" + +import asyncio +import tempfile + +import src.llm_core +import src.event_bus +from src.memory import MemoryManager +from services.memory.memory_extractor import extract_and_store + + +class _FakeSession: + """Minimal session: two-message history so extraction proceeds.""" + + owner = "alice" + session_id = "sess-1" + + def get_context_messages(self): + return [ + {"role": "user", "content": "Hi, a few things about me."}, + {"role": "assistant", "content": "Noted."}, + ] + + +class _BrokenVectorStore: + """Healthy at init, but every embedding-backed op raises at runtime.""" + + healthy = True + + def find_similar(self, text, threshold=0.72): + raise RuntimeError("embedding backend unavailable") + + def add(self, memory_id, text): + raise RuntimeError("embedding backend unavailable") + + +def _run(coro): + return asyncio.new_event_loop().run_until_complete(coro) + + +def test_extraction_persists_facts_when_vector_store_fails_at_runtime(monkeypatch): + facts_json = ( + '[{"text": "Alice lives in Lisbon", "category": "fact"}, ' + '{"text": "Alice prefers tea over coffee", "category": "preference"}]' + ) + + async def _fake_llm(url, model, messages, **kwargs): + return facts_json + + monkeypatch.setattr(src.llm_core, "llm_call_async", _fake_llm) + # fire_event touches an async event loop / disk — neutralize it. + monkeypatch.setattr(src.event_bus, "fire_event", lambda *a, **k: None) + + with tempfile.TemporaryDirectory() as data_dir: + mgr = MemoryManager(data_dir) + + _run(extract_and_store( + _FakeSession(), + mgr, + _BrokenVectorStore(), + endpoint_url="http://x", + model="m", + headers=None, + )) + + stored = mgr.load(owner="alice") + texts = {e["text"] for e in stored} + + # The bug lost ALL of them (save() was never reached); both must survive. 
+ assert "Alice lives in Lisbon" in texts + assert "Alice prefers tea over coffee" in texts + + +def test_healthy_vector_store_still_dedups_normally(monkeypatch): + """Control: when find_similar reports a match, that fact is skipped — the + try/except added around it must not swallow a legitimate dedup hit.""" + + async def _fake_llm(url, model, messages, **kwargs): + return '[{"text": "Alice lives in Lisbon", "category": "fact"}]' + + monkeypatch.setattr(src.llm_core, "llm_call_async", _fake_llm) + monkeypatch.setattr(src.event_bus, "fire_event", lambda *a, **k: None) + + class _DedupVectorStore: + healthy = True + + def find_similar(self, text, threshold=0.72): + return "existing-id" # claim it already exists + + def add(self, memory_id, text): # pragma: no cover - should not run + raise AssertionError("add should not be called for a deduped fact") + + with tempfile.TemporaryDirectory() as data_dir: + mgr = MemoryManager(data_dir) + _run(extract_and_store( + _FakeSession(), mgr, _DedupVectorStore(), + endpoint_url="http://x", model="m", headers=None, + )) + assert mgr.load(owner="alice") == [] diff --git a/tests/test_research_service.py b/tests/test_research_service.py new file mode 100644 index 0000000..221054a --- /dev/null +++ b/tests/test_research_service.py @@ -0,0 +1,153 @@ +"""Tests for ResearchService — correct handling of the handler's string report. + +ResearchHandler.call_research_service returns a *formatted markdown string*, +not a dict. ResearchService.research() must consume that contract without +raising (the previous code called ``.get()`` on the string and blew up on +every successful research call). +""" + +import asyncio + +import pytest + +from services.research.service import ( + ResearchService, + ResearchResult, + ResearchSource, +) + + +# A faithful slice of what ResearchHandler._format_research_report emits. 
+SAMPLE_REPORT = """--- + +## Research Summary + +**Duration:** 12.3s | **Rounds:** 3 | **Queries:** 5 | **URLs Analyzed:** 7 + +--- + +# Findings + +Quantum error correction saw major advances in 2024. See [an inline note](https://inline.example/not-a-source) here. + +### Sources + +- [Surface Codes Paper](https://example.com/surface-codes) +- [Lab Announcement](https://example.com/lab) +- [Surface Codes Paper](https://example.com/surface-codes) + +--- + +**The AI has analyzed all research findings above.** +""" + + +def _run(coro): + return asyncio.new_event_loop().run_until_complete(coro) + + +class _StubHandler: + """Stands in for ResearchHandler; returns a string like the real one.""" + + def __init__(self, report): + self._report = report + self.called_with = None + + async def call_research_service(self, topic, llm_endpoint, llm_model, + max_time=300, progress_callback=None): + self.called_with = (topic, llm_endpoint, llm_model, max_time) + return self._report + + +class TestResearchOnStringReport: + def _service(self, report): + svc = ResearchService() + svc.handler = _StubHandler(report) + return svc + + def test_does_not_raise_on_string_report(self): + svc = self._service(SAMPLE_REPORT) + result = _run(svc.research("quantum", "http://llm", "model")) + assert isinstance(result, ResearchResult) + + def test_summary_is_the_report(self): + svc = self._service(SAMPLE_REPORT) + result = _run(svc.research("quantum", "http://llm", "model")) + assert "Quantum error correction" in result.summary + assert result.query == "quantum" + + def test_sources_parsed_and_deduped(self): + svc = self._service(SAMPLE_REPORT) + result = _run(svc.research("quantum", "http://llm", "model")) + urls = [s.url for s in result.sources] + assert urls == [ + "https://example.com/surface-codes", + "https://example.com/lab", + ] + assert all(isinstance(s, ResearchSource) for s in result.sources) + + def test_inline_links_outside_sources_section_ignored(self): + svc = 
self._service(SAMPLE_REPORT) + result = _run(svc.research("quantum", "http://llm", "model")) + urls = [s.url for s in result.sources] + assert "https://inline.example/not-a-source" not in urls + + def test_duration_recorded(self): + svc = self._service(SAMPLE_REPORT) + result = _run(svc.research("quantum", "http://llm", "model")) + assert result.duration_seconds >= 0.0 + + def test_empty_report_yields_no_sources(self): + svc = self._service("") + result = _run(svc.research("quantum", "http://llm", "model")) + assert result.sources == [] + assert result.summary == "" + + +class TestParseSources: + def test_returns_empty_for_empty_input(self): + assert ResearchService._parse_sources("") == [] + + def test_handles_titleless_link(self): + report = "### Sources\n\n- [](https://example.com/x)\n" + sources = ResearchService._parse_sources(report) + assert len(sources) == 1 + assert sources[0].url == "https://example.com/x" + assert sources[0].title == "" + + def test_section_ends_at_next_heading(self): + report = ( + "### Sources\n\n" + "- [A](https://a.example)\n\n" + "### Notes\n\n" + "- [B](https://b.example)\n" + ) + urls = [s.url for s in ResearchService._parse_sources(report)] + assert urls == ["https://a.example"] + + +class TestDictBackCompat: + """A handler that returns a dict (legacy shape) must still work.""" + + def test_dict_result_still_parsed(self): + svc = ResearchService() + + class _DictHandler: + async def call_research_service(self, *a, **k): + return { + "summary": "done", + "sources": [ + {"url": "https://x.example", "title": "X", + "snippet": "s", "relevance": 0.9}, + ], + "sections": ["intro"], + "tokens_used": 42, + } + + svc.handler = _DictHandler() + result = _run(svc.research("q", "http://llm", "model")) + assert result.summary == "done" + assert result.tokens_used == 42 + assert result.sections == ["intro"] + assert result.sources[0].url == "https://x.example" + assert result.sources[0].relevance == 0.9