fix: data integrity — deep-research result parsing + memory-extraction durability (#808)
Two independent data-integrity bugs:
- services/research/service.py: ResearchService.research() (the public deep-research
API, re-exported from services/__init__) treated the handler return value as a
dict (result.get("sources"/"summary"/...)), but call_research_service() returns a
formatted markdown STRING -> AttributeError: 'str' object has no attribute 'get' on EVERY
successful call, making the API unusable for any non-error result. Now uses the
string report as the summary and parses sources from the "### Sources" markdown
section (section-bounded, URL-deduped), with a defensive dict branch for back-compat.
- services/memory/memory_extractor.py: extract_and_store guarded the vector-store
find_similar/add calls only with the .healthy flag set ONCE at init. If the
embedding/ChromaDB backend degraded LATER (OOM, evicted model, remote endpoint
down), those calls raised, the exception escaped the dedup loop, skipped
memory_manager.save(), and was swallowed by the outer try/except -> EVERY
validated fact from the session was silently lost (the function docstring
promises "never raised"). Now falls back to the existing text/fuzzy dedup so
facts are still saved when the vector index is unavailable at runtime.
Tests: test_research_service.py, test_memory_extractor_vector_degraded.py.
This commit is contained in:
@@ -303,9 +303,18 @@ async def extract_and_store(
|
|||||||
if not fact_text or len(fact_text) < 5:
|
if not fact_text or len(fact_text) < 5:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Dedup: check vector similarity first (fast), then exact text match
|
# Dedup: check vector similarity first (fast), then exact text match.
|
||||||
|
# A runtime embedding/ChromaDB failure (backend OOM, model evicted,
|
||||||
|
# remote endpoint down) must not abort the whole batch — fall through
|
||||||
|
# to the text/fuzzy dedup below instead of losing every validated
|
||||||
|
# fact extracted this session. (`.healthy` is only set at init, so
|
||||||
|
# it does not catch failures that develop later.)
|
||||||
if memory_vector and memory_vector.healthy:
|
if memory_vector and memory_vector.healthy:
|
||||||
existing_id = memory_vector.find_similar(fact_text, threshold=0.72)
|
try:
|
||||||
|
existing_id = memory_vector.find_similar(fact_text, threshold=0.72)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Memory dedup (vector) unavailable, using text fallback: {e}")
|
||||||
|
existing_id = None
|
||||||
if existing_id:
|
if existing_id:
|
||||||
logger.debug(f"Memory dedup (vector): '{fact_text[:50]}' matches {existing_id}")
|
logger.debug(f"Memory dedup (vector): '{fact_text[:50]}' matches {existing_id}")
|
||||||
continue
|
continue
|
||||||
@@ -330,9 +339,14 @@ async def extract_and_store(
|
|||||||
|
|
||||||
existing.append(entry)
|
existing.append(entry)
|
||||||
|
|
||||||
# Add to vector index
|
# Add to vector index. The JSON store (saved below) is the source of
|
||||||
|
# truth and the keyword path can still retrieve this entry, so a vector
|
||||||
|
# write failure must not drop the fact or abort the remaining batch.
|
||||||
if memory_vector and memory_vector.healthy:
|
if memory_vector and memory_vector.healthy:
|
||||||
memory_vector.add(entry["id"], fact_text)
|
try:
|
||||||
|
memory_vector.add(entry["id"], fact_text)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Memory vector add failed for {entry['id']}: {e}")
|
||||||
|
|
||||||
added += 1
|
added += 1
|
||||||
|
|
||||||
|
|||||||
@@ -1,11 +1,16 @@
|
|||||||
# services/research/service.py
|
# services/research/service.py
|
||||||
"""Research service — deep research with LLM-in-the-loop."""
|
"""Research service — deep research with LLM-in-the-loop."""
|
||||||
|
|
||||||
|
import re
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import List, Optional, Callable
|
from typing import List, Optional, Callable
|
||||||
|
|
||||||
from .research_handler import ResearchHandler
|
from .research_handler import ResearchHandler
|
||||||
|
|
||||||
|
# Markdown source links emitted by ResearchHandler._format_research_report,
|
||||||
|
# e.g. "- [Some Title](https://example.com/page)".
|
||||||
|
_SOURCE_LINK_RE = re.compile(r"^\s*-\s*\[(?P<title>[^\]]*)\]\((?P<url>[^)]+)\)\s*$")
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ResearchSource:
|
class ResearchSource:
|
||||||
@@ -75,26 +80,70 @@ class ResearchService:
|
|||||||
|
|
||||||
duration = time.time() - start
|
duration = time.time() - start
|
||||||
|
|
||||||
# Parse result into structured format
|
# call_research_service returns a formatted markdown report string
|
||||||
sources = [
|
# (see ResearchHandler.call_research_service -> _format_research_report),
|
||||||
ResearchSource(
|
# not a dict. Treat it as such; tolerate an unexpected dict/None defensively.
|
||||||
url=s.get("url", ""),
|
if isinstance(result, dict):
|
||||||
title=s.get("title", ""),
|
sources = [
|
||||||
snippet=s.get("snippet", ""),
|
ResearchSource(
|
||||||
relevance=s.get("relevance", 0.0),
|
url=s.get("url", ""),
|
||||||
|
title=s.get("title", ""),
|
||||||
|
snippet=s.get("snippet", ""),
|
||||||
|
relevance=s.get("relevance", 0.0),
|
||||||
|
)
|
||||||
|
for s in result.get("sources", [])
|
||||||
|
]
|
||||||
|
return ResearchResult(
|
||||||
|
query=topic,
|
||||||
|
summary=result.get("summary", result.get("answer", "")),
|
||||||
|
sources=sources,
|
||||||
|
sections=result.get("sections", []),
|
||||||
|
tokens_used=result.get("tokens_used", 0),
|
||||||
|
duration_seconds=duration,
|
||||||
)
|
)
|
||||||
for s in result.get("sources", [])
|
|
||||||
]
|
|
||||||
|
|
||||||
|
report = result if isinstance(result, str) else ""
|
||||||
return ResearchResult(
|
return ResearchResult(
|
||||||
query=topic,
|
query=topic,
|
||||||
summary=result.get("summary", result.get("answer", "")),
|
summary=report,
|
||||||
sources=sources,
|
sources=self._parse_sources(report),
|
||||||
sections=result.get("sections", []),
|
|
||||||
tokens_used=result.get("tokens_used", 0),
|
|
||||||
duration_seconds=duration,
|
duration_seconds=duration,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
def _parse_sources(report: str) -> List[ResearchSource]:
    """Extract sources from the markdown ``Sources`` section of a report.

    Only ``- [title](url)`` list items that appear under a heading whose
    text is ``Sources`` (case-insensitive) are returned, so inline links
    elsewhere in the body are not mistaken for sources. The section ends
    at the next heading of any level.

    Args:
        report: The markdown report text; may be empty.

    Returns:
        One ``ResearchSource`` per distinct URL, in first-seen order, with
        an empty snippet (markdown source links carry none).
    """
    if not report:
        return []
    sources: List[ResearchSource] = []
    seen = set()
    in_sources = False
    for line in report.splitlines():
        stripped = line.strip()
        if stripped.startswith("#"):
            hashes = len(stripped) - len(stripped.lstrip("#"))
            rest = stripped[hashes:]
            # Two or more leading '#' always count as a heading. A single
            # '#' counts only in ATX form ("# Title" or a bare "#"), so a
            # level-1 heading closes the section while text such as
            # "#1 pick" does not.
            if hashes >= 2 or not rest or rest[0].isspace():
                # Drop an optional closing '#' run ("### Sources ###").
                heading = rest.strip().rstrip("#").strip()
                in_sources = heading.lower() == "sources"
                continue
        if not in_sources:
            continue
        match = _SOURCE_LINK_RE.match(line)
        if not match:
            continue
        url = match.group("url").strip()
        if not url or url in seen:
            continue
        seen.add(url)
        sources.append(
            # snippet is required on ResearchSource; markdown source links
            # carry no snippet, so default to empty (matches the dict path).
            ResearchSource(url=url, title=match.group("title").strip(), snippet="")
        )
    return sources
|
||||||
|
|
||||||
def start_background(
|
def start_background(
|
||||||
self,
|
self,
|
||||||
session_id: str,
|
session_id: str,
|
||||||
|
|||||||
113
tests/test_memory_extractor_vector_degraded.py
Normal file
113
tests/test_memory_extractor_vector_degraded.py
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
"""Regression: auto memory extraction must survive a runtime vector-store
|
||||||
|
failure.
|
||||||
|
|
||||||
|
The vector index reports `.healthy` only at init time. If the embedding
|
||||||
|
backend dies later (OOM, model evicted, remote endpoint down), the per-fact
|
||||||
|
`find_similar` / `add` calls raise. Before the fix these exceptions escaped the
|
||||||
|
dedup loop, jumped past `memory_manager.save(...)`, and were swallowed by the
|
||||||
|
function's outer try/except — so EVERY validated fact from the session was
|
||||||
|
silently lost (the feature promises "Errors are logged, never raised", but it
|
||||||
|
also quietly dropped all the data).
|
||||||
|
|
||||||
|
After the fix a degraded vector store falls through to the text/fuzzy dedup
|
||||||
|
path (which the code already maintains "when vector index is unavailable") and
|
||||||
|
the facts still land in the JSON store.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
import src.llm_core
|
||||||
|
import src.event_bus
|
||||||
|
from src.memory import MemoryManager
|
||||||
|
from services.memory.memory_extractor import extract_and_store
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeSession:
|
||||||
|
"""Minimal session: two-message history so extraction proceeds."""
|
||||||
|
|
||||||
|
owner = "alice"
|
||||||
|
session_id = "sess-1"
|
||||||
|
|
||||||
|
def get_context_messages(self):
|
||||||
|
return [
|
||||||
|
{"role": "user", "content": "Hi, a few things about me."},
|
||||||
|
{"role": "assistant", "content": "Noted."},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class _BrokenVectorStore:
|
||||||
|
"""Healthy at init, but every embedding-backed op raises at runtime."""
|
||||||
|
|
||||||
|
healthy = True
|
||||||
|
|
||||||
|
def find_similar(self, text, threshold=0.72):
|
||||||
|
raise RuntimeError("embedding backend unavailable")
|
||||||
|
|
||||||
|
def add(self, memory_id, text):
|
||||||
|
raise RuntimeError("embedding backend unavailable")
|
||||||
|
|
||||||
|
|
||||||
|
def _run(coro):
|
||||||
|
return asyncio.new_event_loop().run_until_complete(coro)
|
||||||
|
|
||||||
|
|
||||||
|
def test_extraction_persists_facts_when_vector_store_fails_at_runtime(monkeypatch):
|
||||||
|
facts_json = (
|
||||||
|
'[{"text": "Alice lives in Lisbon", "category": "fact"}, '
|
||||||
|
'{"text": "Alice prefers tea over coffee", "category": "preference"}]'
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _fake_llm(url, model, messages, **kwargs):
|
||||||
|
return facts_json
|
||||||
|
|
||||||
|
monkeypatch.setattr(src.llm_core, "llm_call_async", _fake_llm)
|
||||||
|
# fire_event touches an async event loop / disk — neutralize it.
|
||||||
|
monkeypatch.setattr(src.event_bus, "fire_event", lambda *a, **k: None)
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory() as data_dir:
|
||||||
|
mgr = MemoryManager(data_dir)
|
||||||
|
|
||||||
|
_run(extract_and_store(
|
||||||
|
_FakeSession(),
|
||||||
|
mgr,
|
||||||
|
_BrokenVectorStore(),
|
||||||
|
endpoint_url="http://x",
|
||||||
|
model="m",
|
||||||
|
headers=None,
|
||||||
|
))
|
||||||
|
|
||||||
|
stored = mgr.load(owner="alice")
|
||||||
|
texts = {e["text"] for e in stored}
|
||||||
|
|
||||||
|
# The bug lost ALL of them (save() was never reached); both must survive.
|
||||||
|
assert "Alice lives in Lisbon" in texts
|
||||||
|
assert "Alice prefers tea over coffee" in texts
|
||||||
|
|
||||||
|
|
||||||
|
def test_healthy_vector_store_still_dedups_normally(monkeypatch):
|
||||||
|
"""Control: when find_similar reports a match, that fact is skipped — the
|
||||||
|
try/except added around it must not swallow a legitimate dedup hit."""
|
||||||
|
|
||||||
|
async def _fake_llm(url, model, messages, **kwargs):
|
||||||
|
return '[{"text": "Alice lives in Lisbon", "category": "fact"}]'
|
||||||
|
|
||||||
|
monkeypatch.setattr(src.llm_core, "llm_call_async", _fake_llm)
|
||||||
|
monkeypatch.setattr(src.event_bus, "fire_event", lambda *a, **k: None)
|
||||||
|
|
||||||
|
class _DedupVectorStore:
|
||||||
|
healthy = True
|
||||||
|
|
||||||
|
def find_similar(self, text, threshold=0.72):
|
||||||
|
return "existing-id" # claim it already exists
|
||||||
|
|
||||||
|
def add(self, memory_id, text): # pragma: no cover - should not run
|
||||||
|
raise AssertionError("add should not be called for a deduped fact")
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory() as data_dir:
|
||||||
|
mgr = MemoryManager(data_dir)
|
||||||
|
_run(extract_and_store(
|
||||||
|
_FakeSession(), mgr, _DedupVectorStore(),
|
||||||
|
endpoint_url="http://x", model="m", headers=None,
|
||||||
|
))
|
||||||
|
assert mgr.load(owner="alice") == []
|
||||||
153
tests/test_research_service.py
Normal file
153
tests/test_research_service.py
Normal file
@@ -0,0 +1,153 @@
|
|||||||
|
"""Tests for ResearchService — correct handling of the handler's string report.
|
||||||
|
|
||||||
|
ResearchHandler.call_research_service returns a *formatted markdown string*,
|
||||||
|
not a dict. ResearchService.research() must consume that contract without
|
||||||
|
raising (the previous code called ``.get()`` on the string and blew up on
|
||||||
|
every successful research call).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from services.research.service import (
|
||||||
|
ResearchService,
|
||||||
|
ResearchResult,
|
||||||
|
ResearchSource,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# A faithful slice of what ResearchHandler._format_research_report emits.
|
||||||
|
SAMPLE_REPORT = """---
|
||||||
|
|
||||||
|
## Research Summary
|
||||||
|
|
||||||
|
**Duration:** 12.3s | **Rounds:** 3 | **Queries:** 5 | **URLs Analyzed:** 7
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
# Findings
|
||||||
|
|
||||||
|
Quantum error correction saw major advances in 2024. See [an inline note](https://inline.example/not-a-source) here.
|
||||||
|
|
||||||
|
### Sources
|
||||||
|
|
||||||
|
- [Surface Codes Paper](https://example.com/surface-codes)
|
||||||
|
- [Lab Announcement](https://example.com/lab)
|
||||||
|
- [Surface Codes Paper](https://example.com/surface-codes)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**The AI has analyzed all research findings above.**
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def _run(coro):
|
||||||
|
return asyncio.new_event_loop().run_until_complete(coro)
|
||||||
|
|
||||||
|
|
||||||
|
class _StubHandler:
|
||||||
|
"""Stands in for ResearchHandler; returns a string like the real one."""
|
||||||
|
|
||||||
|
def __init__(self, report):
|
||||||
|
self._report = report
|
||||||
|
self.called_with = None
|
||||||
|
|
||||||
|
async def call_research_service(self, topic, llm_endpoint, llm_model,
|
||||||
|
max_time=300, progress_callback=None):
|
||||||
|
self.called_with = (topic, llm_endpoint, llm_model, max_time)
|
||||||
|
return self._report
|
||||||
|
|
||||||
|
|
||||||
|
class TestResearchOnStringReport:
|
||||||
|
def _service(self, report):
|
||||||
|
svc = ResearchService()
|
||||||
|
svc.handler = _StubHandler(report)
|
||||||
|
return svc
|
||||||
|
|
||||||
|
def test_does_not_raise_on_string_report(self):
|
||||||
|
svc = self._service(SAMPLE_REPORT)
|
||||||
|
result = _run(svc.research("quantum", "http://llm", "model"))
|
||||||
|
assert isinstance(result, ResearchResult)
|
||||||
|
|
||||||
|
def test_summary_is_the_report(self):
|
||||||
|
svc = self._service(SAMPLE_REPORT)
|
||||||
|
result = _run(svc.research("quantum", "http://llm", "model"))
|
||||||
|
assert "Quantum error correction" in result.summary
|
||||||
|
assert result.query == "quantum"
|
||||||
|
|
||||||
|
def test_sources_parsed_and_deduped(self):
|
||||||
|
svc = self._service(SAMPLE_REPORT)
|
||||||
|
result = _run(svc.research("quantum", "http://llm", "model"))
|
||||||
|
urls = [s.url for s in result.sources]
|
||||||
|
assert urls == [
|
||||||
|
"https://example.com/surface-codes",
|
||||||
|
"https://example.com/lab",
|
||||||
|
]
|
||||||
|
assert all(isinstance(s, ResearchSource) for s in result.sources)
|
||||||
|
|
||||||
|
def test_inline_links_outside_sources_section_ignored(self):
|
||||||
|
svc = self._service(SAMPLE_REPORT)
|
||||||
|
result = _run(svc.research("quantum", "http://llm", "model"))
|
||||||
|
urls = [s.url for s in result.sources]
|
||||||
|
assert "https://inline.example/not-a-source" not in urls
|
||||||
|
|
||||||
|
def test_duration_recorded(self):
|
||||||
|
svc = self._service(SAMPLE_REPORT)
|
||||||
|
result = _run(svc.research("quantum", "http://llm", "model"))
|
||||||
|
assert result.duration_seconds >= 0.0
|
||||||
|
|
||||||
|
def test_empty_report_yields_no_sources(self):
|
||||||
|
svc = self._service("")
|
||||||
|
result = _run(svc.research("quantum", "http://llm", "model"))
|
||||||
|
assert result.sources == []
|
||||||
|
assert result.summary == ""
|
||||||
|
|
||||||
|
|
||||||
|
class TestParseSources:
|
||||||
|
def test_returns_empty_for_empty_input(self):
|
||||||
|
assert ResearchService._parse_sources("") == []
|
||||||
|
|
||||||
|
def test_handles_titleless_link(self):
|
||||||
|
report = "### Sources\n\n- [](https://example.com/x)\n"
|
||||||
|
sources = ResearchService._parse_sources(report)
|
||||||
|
assert len(sources) == 1
|
||||||
|
assert sources[0].url == "https://example.com/x"
|
||||||
|
assert sources[0].title == ""
|
||||||
|
|
||||||
|
def test_section_ends_at_next_heading(self):
|
||||||
|
report = (
|
||||||
|
"### Sources\n\n"
|
||||||
|
"- [A](https://a.example)\n\n"
|
||||||
|
"### Notes\n\n"
|
||||||
|
"- [B](https://b.example)\n"
|
||||||
|
)
|
||||||
|
urls = [s.url for s in ResearchService._parse_sources(report)]
|
||||||
|
assert urls == ["https://a.example"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestDictBackCompat:
|
||||||
|
"""A handler that returns a dict (legacy shape) must still work."""
|
||||||
|
|
||||||
|
def test_dict_result_still_parsed(self):
|
||||||
|
svc = ResearchService()
|
||||||
|
|
||||||
|
class _DictHandler:
|
||||||
|
async def call_research_service(self, *a, **k):
|
||||||
|
return {
|
||||||
|
"summary": "done",
|
||||||
|
"sources": [
|
||||||
|
{"url": "https://x.example", "title": "X",
|
||||||
|
"snippet": "s", "relevance": 0.9},
|
||||||
|
],
|
||||||
|
"sections": ["intro"],
|
||||||
|
"tokens_used": 42,
|
||||||
|
}
|
||||||
|
|
||||||
|
svc.handler = _DictHandler()
|
||||||
|
result = _run(svc.research("q", "http://llm", "model"))
|
||||||
|
assert result.summary == "done"
|
||||||
|
assert result.tokens_used == 42
|
||||||
|
assert result.sections == ["intro"]
|
||||||
|
assert result.sources[0].url == "https://x.example"
|
||||||
|
assert result.sources[0].relevance == 0.9
|
||||||
Reference in New Issue
Block a user