Add Deep Research extraction controls

This commit is contained in:
pewdiepie-archdaemon
2026-06-01 14:55:24 +09:00
parent 6872679f31
commit b998c52dd0
8 changed files with 165 additions and 6 deletions

View File

@@ -299,6 +299,8 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
endpoint_id: Optional[str] = None
model: Optional[str] = None
max_time: int = Field(default=300, ge=60, le=1800)
extraction_timeout: Optional[int] = Field(default=None, ge=15, le=600)
extraction_concurrency: Optional[int] = Field(default=None, ge=1, le=12)
category: Optional[str] = None
@router.post("/api/research/start")
@@ -401,6 +403,8 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
max_rounds=effective_max_rounds,
search_provider=body.search_provider or None,
category=body.category or None,
extraction_timeout=body.extraction_timeout,
extraction_concurrency=body.extraction_concurrency,
owner=user,
)
return {"session_id": session_id, "status": "running", "query": body.query}

View File

@@ -180,6 +180,8 @@ class DeepResearcher:
max_urls_per_round: int = 3,
max_content_chars: int = 15000,
max_report_tokens: int = 8192,
extraction_timeout: int = 90,
extraction_concurrency: int = 3,
min_rounds: int = 2,
max_empty_rounds: int = 2,
synthesis_window: int = 10,
@@ -197,6 +199,8 @@ class DeepResearcher:
self.max_urls_per_round = max_urls_per_round
self.max_content_chars = max_content_chars
self.max_report_tokens = max_report_tokens
self.extraction_timeout = min(600, max(15, int(extraction_timeout or 90)))
self.extraction_concurrency = min(12, max(1, int(extraction_concurrency or 3)))
self.min_rounds = min_rounds
self.max_empty_rounds = max_empty_rounds
self.synthesis_window = synthesis_window
@@ -492,11 +496,16 @@ class DeepResearcher:
if self._cancelled or self._time_exceeded():
return all_findings
# Fetch and extract all URLs concurrently
extract_tasks = [
self._fetch_and_extract(r["url"], question, r.get("title", ""))
for r in urls_to_fetch
]
# Fetch and extract URLs with backpressure. Local model servers often
# serialize requests behind one GPU; flooding them makes every request
# slower and can trip the extraction timeout.
semaphore = asyncio.Semaphore(self.extraction_concurrency)
async def _bounded_extract(result: Dict) -> Optional[Dict]:
async with semaphore:
return await self._fetch_and_extract(result["url"], question, result.get("title", ""))
extract_tasks = [_bounded_extract(r) for r in urls_to_fetch]
results_gathered = await asyncio.gather(*extract_tasks, return_exceptions=True)
for result in results_gathered:
@@ -576,7 +585,7 @@ class DeepResearcher:
[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=2048,
timeout=45,
timeout=self.extraction_timeout,
)
parsed = self._parse_json_object(response)
if parsed:

View File

@@ -22,6 +22,14 @@ logger = logging.getLogger(__name__)
RESEARCH_DATA_DIR = Path("data/deep_research")
def _bounded_int(value, *, default: int, minimum: int, maximum: int) -> int:
try:
n = int(value)
except (TypeError, ValueError):
return default
return max(minimum, min(maximum, n))
class ResearchHandler:
"""Handles research service operations with iterative deep research."""
@@ -165,6 +173,8 @@ class ResearchHandler:
max_rounds: int = 20,
search_provider: str = None,
category: str = None,
extraction_timeout: int = None,
extraction_concurrency: int = None,
owner: str = "",
) -> dict:
"""Start research as a background task. Returns task info dict.
@@ -222,6 +232,8 @@ class ResearchHandler:
max_rounds=max_rounds,
search_provider=search_provider,
category=category,
extraction_timeout=extraction_timeout,
extraction_concurrency=extraction_concurrency,
),
timeout=hard_timeout,
)
@@ -592,6 +604,8 @@ class ResearchHandler:
max_rounds: int = 20,
search_provider: str = None,
category: str = None,
extraction_timeout: int = None,
extraction_concurrency: int = None,
) -> str:
"""
Run iterative deep research using the LLM-in-the-loop DeepResearcher.
@@ -627,6 +641,18 @@ class ResearchHandler:
from src.settings import get_setting
_max_report_tokens = int(get_setting("research_max_tokens", 16384))
_extraction_timeout = _bounded_int(
extraction_timeout if extraction_timeout is not None else get_setting("research_extraction_timeout_seconds", 90),
default=90,
minimum=15,
maximum=600,
)
_extraction_concurrency = _bounded_int(
extraction_concurrency if extraction_concurrency is not None else get_setting("research_extraction_concurrency", 3),
default=3,
minimum=1,
maximum=12,
)
researcher = DeepResearcher(
llm_endpoint=llm_endpoint,
@@ -636,6 +662,8 @@ class ResearchHandler:
min_rounds=min(3, max_rounds),
max_time=max_time,
max_report_tokens=_max_report_tokens,
extraction_timeout=_extraction_timeout,
extraction_concurrency=_extraction_concurrency,
progress_callback=progress_callback,
search_provider=search_provider,
category=category,

View File

@@ -64,6 +64,8 @@ DEFAULT_SETTINGS = {
"research_model": "",
"research_search_provider": "",
"research_max_tokens": 16384,
"research_extraction_timeout_seconds": 90,
"research_extraction_concurrency": 3,
"agent_max_tool_calls": 0,
"agent_input_token_budget": 6000,
"agent_stream_timeout_seconds": 300,

View File

@@ -1551,6 +1551,8 @@ class TaskScheduler:
pass
max_tokens = int(get_setting("research_max_tokens", 8192))
extraction_timeout = int(get_setting("research_extraction_timeout_seconds", 90) or 90)
extraction_concurrency = int(get_setting("research_extraction_concurrency", 3) or 3)
researcher = DeepResearcher(
llm_endpoint=endpoint_url,
@@ -1559,6 +1561,8 @@ class TaskScheduler:
max_rounds=8,
max_time=600, # 10 min for scheduled research
max_report_tokens=max_tokens,
extraction_timeout=extraction_timeout,
extraction_concurrency=extraction_concurrency,
)
started_ts = time.time()

View File

@@ -1451,6 +1451,14 @@
<label class="settings-label">Max Tokens</label>
<input id="set-researchMaxTokens" type="text" inputmode="numeric" placeholder="8192 (default)" class="settings-select" style="width:120px;">
</div>
<div class="settings-row">
<label class="settings-label">Extract Timeout</label>
<input id="set-researchExtractTimeout" type="text" inputmode="numeric" placeholder="90 sec" class="settings-select" style="width:120px;">
</div>
<div class="settings-row">
<label class="settings-label">Extract Parallel</label>
<input id="set-researchExtractConcurrency" type="text" inputmode="numeric" placeholder="3" class="settings-select" style="width:120px;">
</div>
<div id="set-researchMsg" style="font-size:11px;color:color-mix(in srgb, var(--fg) 45%, transparent);"></div>
</div>
</div>

View File

@@ -1365,6 +1365,8 @@ async function initResearchSettings() {
var epSel = el('set-researchEndpoint');
var modelSel = el('set-researchModel');
var tokensInput = el('set-researchMaxTokens');
var extractTimeoutInput = el('set-researchExtractTimeout');
var extractConcurrencyInput = el('set-researchExtractConcurrency');
var msg = el('set-researchMsg');
var endpoints = [];
@@ -1385,6 +1387,8 @@ async function initResearchSettings() {
if (settings.research_endpoint_id) epSel.value = settings.research_endpoint_id;
refreshModels(settings.research_model || '');
if (settings.research_max_tokens) tokensInput.value = settings.research_max_tokens;
if (settings.research_extraction_timeout_seconds) extractTimeoutInput.value = settings.research_extraction_timeout_seconds;
if (settings.research_extraction_concurrency) extractConcurrencyInput.value = settings.research_extraction_concurrency;
} catch (e) { console.warn('Failed to load research settings', e); }
function showStatus() {
@@ -1397,6 +1401,12 @@ async function initResearchSettings() {
if (tokensInput.value) {
parts.push('Max tokens: ' + tokensInput.value);
}
if (extractTimeoutInput.value) {
parts.push('Extract: ' + extractTimeoutInput.value + 's');
}
if (extractConcurrencyInput.value) {
parts.push('Parallel: ' + extractConcurrencyInput.value);
}
if (parts.length) {
msg.textContent = parts.join(' · ');
msg.style.color = 'var(--fg)';
@@ -1414,6 +1424,10 @@ async function initResearchSettings() {
};
var tv = parseInt(tokensInput.value, 10);
if (tv && tv >= 1024) payload.research_max_tokens = tv;
var et = parseInt(extractTimeoutInput.value, 10);
if (et && et >= 15 && et <= 600) payload.research_extraction_timeout_seconds = et;
var ec = parseInt(extractConcurrencyInput.value, 10);
if (ec && ec >= 1 && ec <= 12) payload.research_extraction_concurrency = ec;
try {
await fetch('/api/auth/settings', { method: 'POST', credentials: 'same-origin',
headers: { 'Content-Type': 'application/json' },
@@ -1430,6 +1444,8 @@ async function initResearchSettings() {
});
modelSel.addEventListener('change', saveResearch);
tokensInput.addEventListener('change', saveResearch);
extractTimeoutInput.addEventListener('change', saveResearch);
extractConcurrencyInput.addEventListener('change', saveResearch);
_registerAiEndpointRefresh(function(nextEndpoints) {
endpoints = nextEndpoints;

View File

@@ -0,0 +1,88 @@
import asyncio
import json
import sys
import time
import types
import pytest
from src.deep_research import DeepResearcher
class _ControlledResearcher(DeepResearcher):
def __init__(self, *args, **kwargs):
super().__init__(
llm_endpoint="http://local.test/v1/chat/completions",
llm_model="local-model",
*args,
**kwargs,
)
self.active = 0
self.max_active = 0
async def _search(self, query):
return [
{"url": f"https://example.test/{query}/{i}", "title": f"{query}-{i}"}
for i in range(4)
]
async def _fetch_and_extract(self, url, question, title):
self.active += 1
self.max_active = max(self.max_active, self.active)
await asyncio.sleep(0.01)
self.active -= 1
return {"url": url, "title": title, "summary": "ok"}
@pytest.mark.asyncio
async def test_search_and_extract_respects_extraction_concurrency():
researcher = _ControlledResearcher(extraction_concurrency=2, max_urls_per_round=4)
researcher._start_time = time.time()
findings = await researcher._search_and_extract(["a", "b"], "question")
assert len(findings) == 8
assert researcher.max_active == 2
@pytest.mark.asyncio
async def test_fetch_and_extract_uses_configured_timeout(monkeypatch):
captured = {}
search_mod = types.ModuleType("src.search")
def fake_fetch_webpage_content(url, timeout):
return {
"success": True,
"content": "useful page content",
"title": "Page",
"og_image": "",
}
search_mod.fetch_webpage_content = fake_fetch_webpage_content
monkeypatch.setitem(sys.modules, "src.search", search_mod)
async def immediate_to_thread(fn, *args, **kwargs):
return fn(*args, **kwargs)
monkeypatch.setattr(asyncio, "to_thread", immediate_to_thread)
researcher = DeepResearcher(
llm_endpoint="http://local.test/v1/chat/completions",
llm_model="local-model",
extraction_timeout=123,
)
async def fake_llm(messages, temperature=0.3, max_tokens=4096, timeout=60):
captured["timeout"] = timeout
return json.dumps({
"rational": "relevant",
"evidence": "evidence",
"summary": "useful page content",
})
researcher._llm = fake_llm
result = await researcher._fetch_and_extract("https://example.test", "question", "Title")
assert result["summary"] == "useful page content"
assert captured["timeout"] == 123