From b998c52dd071f55a690da246044991b38d9974aa Mon Sep 17 00:00:00 2001 From: pewdiepie-archdaemon Date: Mon, 1 Jun 2026 14:55:24 +0900 Subject: [PATCH] Add Deep Research extraction controls --- routes/research_routes.py | 4 + src/deep_research.py | 21 +++-- src/research_handler.py | 28 ++++++ src/settings.py | 2 + src/task_scheduler.py | 4 + static/index.html | 8 ++ static/js/settings.js | 16 ++++ .../test_deep_research_extraction_controls.py | 88 +++++++++++++++++++ 8 files changed, 165 insertions(+), 6 deletions(-) create mode 100644 tests/test_deep_research_extraction_controls.py diff --git a/routes/research_routes.py b/routes/research_routes.py index 7a37d9e..233cc82 100644 --- a/routes/research_routes.py +++ b/routes/research_routes.py @@ -299,6 +299,8 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter: endpoint_id: Optional[str] = None model: Optional[str] = None max_time: int = Field(default=300, ge=60, le=1800) + extraction_timeout: Optional[int] = Field(default=None, ge=15, le=600) + extraction_concurrency: Optional[int] = Field(default=None, ge=1, le=12) category: Optional[str] = None @router.post("/api/research/start") @@ -401,6 +403,8 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter: max_rounds=effective_max_rounds, search_provider=body.search_provider or None, category=body.category or None, + extraction_timeout=body.extraction_timeout, + extraction_concurrency=body.extraction_concurrency, owner=user, ) return {"session_id": session_id, "status": "running", "query": body.query} diff --git a/src/deep_research.py b/src/deep_research.py index 9228088..2de0c22 100644 --- a/src/deep_research.py +++ b/src/deep_research.py @@ -180,6 +180,8 @@ class DeepResearcher: max_urls_per_round: int = 3, max_content_chars: int = 15000, max_report_tokens: int = 8192, + extraction_timeout: int = 90, + extraction_concurrency: int = 3, min_rounds: int = 2, max_empty_rounds: int = 2, synthesis_window: int = 10, @@ -197,6 +199,8 @@ class DeepResearcher: self.max_urls_per_round = max_urls_per_round self.max_content_chars = max_content_chars self.max_report_tokens = max_report_tokens + self.extraction_timeout = min(600, max(15, int(extraction_timeout or 90))) + self.extraction_concurrency = min(12, max(1, int(extraction_concurrency or 3))) self.min_rounds = min_rounds self.max_empty_rounds = max_empty_rounds self.synthesis_window = synthesis_window @@ -492,11 +496,16 @@ class DeepResearcher: if self._cancelled or self._time_exceeded(): return all_findings - # Fetch and extract all URLs concurrently - extract_tasks = [ - self._fetch_and_extract(r["url"], question, r.get("title", "")) - for r in urls_to_fetch - ] + # Fetch and extract URLs with backpressure. Local model servers often + # serialize requests behind one GPU; flooding them makes every request + # slower and can trip the extraction timeout. + semaphore = asyncio.Semaphore(self.extraction_concurrency) + + async def _bounded_extract(result: Dict) -> Optional[Dict]: + async with semaphore: + return await self._fetch_and_extract(result["url"], question, result.get("title", "")) + + extract_tasks = [_bounded_extract(r) for r in urls_to_fetch] results_gathered = await asyncio.gather(*extract_tasks, return_exceptions=True) for result in results_gathered: @@ -576,7 +585,7 @@ class DeepResearcher: [{"role": "user", "content": prompt}], temperature=0.2, max_tokens=2048, - timeout=45, + timeout=self.extraction_timeout, ) parsed = self._parse_json_object(response) if parsed: diff --git a/src/research_handler.py b/src/research_handler.py index efa4af7..1a69e08 100644 --- a/src/research_handler.py +++ b/src/research_handler.py @@ -22,6 +22,14 @@ logger = logging.getLogger(__name__) RESEARCH_DATA_DIR = Path("data/deep_research") +def _bounded_int(value, *, default: int, minimum: int, maximum: int) -> int: + try: + n = int(value) + except (TypeError, ValueError): + return default + return max(minimum, min(maximum, n)) + + class ResearchHandler: """Handles research service operations with iterative deep research.""" @@ -165,6 +173,8 @@ class ResearchHandler: max_rounds: int = 20, search_provider: str = None, category: str = None, + extraction_timeout: int = None, + extraction_concurrency: int = None, owner: str = "", ) -> dict: """Start research as a background task. Returns task info dict. @@ -222,6 +232,8 @@ class ResearchHandler: max_rounds=max_rounds, search_provider=search_provider, category=category, + extraction_timeout=extraction_timeout, + extraction_concurrency=extraction_concurrency, ), timeout=hard_timeout, ) @@ -592,6 +604,8 @@ class ResearchHandler: max_rounds: int = 20, search_provider: str = None, category: str = None, + extraction_timeout: int = None, + extraction_concurrency: int = None, ) -> str: """ Run iterative deep research using the LLM-in-the-loop DeepResearcher. @@ -627,6 +641,18 @@ class ResearchHandler: from src.settings import get_setting _max_report_tokens = int(get_setting("research_max_tokens", 16384)) + _extraction_timeout = _bounded_int( + extraction_timeout if extraction_timeout is not None else get_setting("research_extraction_timeout_seconds", 90), + default=90, + minimum=15, + maximum=600, + ) + _extraction_concurrency = _bounded_int( + extraction_concurrency if extraction_concurrency is not None else get_setting("research_extraction_concurrency", 3), + default=3, + minimum=1, + maximum=12, + ) researcher = DeepResearcher( llm_endpoint=llm_endpoint, @@ -636,6 +662,8 @@ class ResearchHandler: min_rounds=min(3, max_rounds), max_time=max_time, max_report_tokens=_max_report_tokens, + extraction_timeout=_extraction_timeout, + extraction_concurrency=_extraction_concurrency, progress_callback=progress_callback, search_provider=search_provider, category=category, diff --git a/src/settings.py b/src/settings.py index e9bdeea..4ef068b 100644 --- a/src/settings.py +++ b/src/settings.py @@ -64,6 +64,8 @@ DEFAULT_SETTINGS = { "research_model": "", "research_search_provider": "", "research_max_tokens": 16384, + "research_extraction_timeout_seconds": 90, + "research_extraction_concurrency": 3, "agent_max_tool_calls": 0, "agent_input_token_budget": 6000, "agent_stream_timeout_seconds": 300, diff --git a/src/task_scheduler.py b/src/task_scheduler.py index 369448d..4bdb1ef 100644 --- a/src/task_scheduler.py +++ b/src/task_scheduler.py @@ -1551,6 +1551,8 @@ class TaskScheduler: pass max_tokens = int(get_setting("research_max_tokens", 8192)) + extraction_timeout = int(get_setting("research_extraction_timeout_seconds", 90) or 90) + extraction_concurrency = int(get_setting("research_extraction_concurrency", 3) or 3) researcher = DeepResearcher( llm_endpoint=endpoint_url, @@ -1559,6 +1561,8 @@ class TaskScheduler: max_rounds=8, max_time=600, # 10 min for scheduled research max_report_tokens=max_tokens, + extraction_timeout=extraction_timeout, + extraction_concurrency=extraction_concurrency, ) started_ts = time.time() diff --git a/static/index.html b/static/index.html index 9d44cbb..b7ff659 100644 --- a/static/index.html +++ b/static/index.html @@ -1451,6 +1451,14 @@ +
+ + +
+
+ + +
diff --git a/static/js/settings.js b/static/js/settings.js index c4e48be..d8a74e8 100644 --- a/static/js/settings.js +++ b/static/js/settings.js @@ -1365,6 +1365,8 @@ async function initResearchSettings() { var epSel = el('set-researchEndpoint'); var modelSel = el('set-researchModel'); var tokensInput = el('set-researchMaxTokens'); + var extractTimeoutInput = el('set-researchExtractTimeout'); + var extractConcurrencyInput = el('set-researchExtractConcurrency'); var msg = el('set-researchMsg'); var endpoints = []; @@ -1385,6 +1387,8 @@ async function initResearchSettings() { if (settings.research_endpoint_id) epSel.value = settings.research_endpoint_id; refreshModels(settings.research_model || ''); if (settings.research_max_tokens) tokensInput.value = settings.research_max_tokens; + if (settings.research_extraction_timeout_seconds) extractTimeoutInput.value = settings.research_extraction_timeout_seconds; + if (settings.research_extraction_concurrency) extractConcurrencyInput.value = settings.research_extraction_concurrency; } catch (e) { console.warn('Failed to load research settings', e); } function showStatus() { @@ -1397,6 +1401,12 @@ async function initResearchSettings() { if (tokensInput.value) { parts.push('Max tokens: ' + tokensInput.value); } + if (extractTimeoutInput.value) { + parts.push('Extract: ' + extractTimeoutInput.value + 's'); + } + if (extractConcurrencyInput.value) { + parts.push('Parallel: ' + extractConcurrencyInput.value); + } if (parts.length) { msg.textContent = parts.join(' ยท '); msg.style.color = 'var(--fg)'; @@ -1414,6 +1424,10 @@ async function initResearchSettings() { }; var tv = parseInt(tokensInput.value, 10); if (tv && tv >= 1024) payload.research_max_tokens = tv; + var et = parseInt(extractTimeoutInput.value, 10); + if (et && et >= 15 && et <= 600) payload.research_extraction_timeout_seconds = et; + var ec = parseInt(extractConcurrencyInput.value, 10); + if (ec && ec >= 1 && ec <= 12) payload.research_extraction_concurrency = ec; try { await fetch('/api/auth/settings', { method: 'POST', credentials: 'same-origin', headers: { 'Content-Type': 'application/json' }, @@ -1430,6 +1444,8 @@ async function initResearchSettings() { }); modelSel.addEventListener('change', saveResearch); tokensInput.addEventListener('change', saveResearch); + extractTimeoutInput.addEventListener('change', saveResearch); + extractConcurrencyInput.addEventListener('change', saveResearch); _registerAiEndpointRefresh(function(nextEndpoints) { endpoints = nextEndpoints; diff --git a/tests/test_deep_research_extraction_controls.py b/tests/test_deep_research_extraction_controls.py new file mode 100644 index 0000000..bdbbae3 --- /dev/null +++ b/tests/test_deep_research_extraction_controls.py @@ -0,0 +1,88 @@ +import asyncio +import json +import sys +import time +import types + +import pytest + +from src.deep_research import DeepResearcher + + +class _ControlledResearcher(DeepResearcher): + def __init__(self, *args, **kwargs): + super().__init__( + llm_endpoint="http://local.test/v1/chat/completions", + llm_model="local-model", + *args, + **kwargs, + ) + self.active = 0 + self.max_active = 0 + + async def _search(self, query): + return [ + {"url": f"https://example.test/{query}/{i}", "title": f"{query}-{i}"} + for i in range(4) + ] + + async def _fetch_and_extract(self, url, question, title): + self.active += 1 + self.max_active = max(self.max_active, self.active) + await asyncio.sleep(0.01) + self.active -= 1 + return {"url": url, "title": title, "summary": "ok"} + + +@pytest.mark.asyncio +async def test_search_and_extract_respects_extraction_concurrency(): + researcher = _ControlledResearcher(extraction_concurrency=2, max_urls_per_round=4) + researcher._start_time = time.time() + + findings = await researcher._search_and_extract(["a", "b"], "question") + + assert len(findings) == 8 + assert researcher.max_active == 2 + + +@pytest.mark.asyncio +async def test_fetch_and_extract_uses_configured_timeout(monkeypatch): + captured = {} + search_mod = types.ModuleType("src.search") + + def fake_fetch_webpage_content(url, timeout): + return { + "success": True, + "content": "useful page content", + "title": "Page", + "og_image": "", + } + + search_mod.fetch_webpage_content = fake_fetch_webpage_content + monkeypatch.setitem(sys.modules, "src.search", search_mod) + + async def immediate_to_thread(fn, *args, **kwargs): + return fn(*args, **kwargs) + + monkeypatch.setattr(asyncio, "to_thread", immediate_to_thread) + + researcher = DeepResearcher( + llm_endpoint="http://local.test/v1/chat/completions", + llm_model="local-model", + extraction_timeout=123, + ) + + async def fake_llm(messages, temperature=0.3, max_tokens=4096, timeout=60): + captured["timeout"] = timeout + return json.dumps({ + "rational": "relevant", + "evidence": "evidence", + "summary": "useful page content", + }) + + researcher._llm = fake_llm + + result = await researcher._fetch_and_extract("https://example.test", "question", "Title") + + assert result["summary"] == "useful page content" + assert captured["timeout"] == 123