diff --git a/routes/research_routes.py b/routes/research_routes.py index 7a37d9e..233cc82 100644 --- a/routes/research_routes.py +++ b/routes/research_routes.py @@ -299,6 +299,8 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter: endpoint_id: Optional[str] = None model: Optional[str] = None max_time: int = Field(default=300, ge=60, le=1800) + extraction_timeout: Optional[int] = Field(default=None, ge=15, le=600) + extraction_concurrency: Optional[int] = Field(default=None, ge=1, le=12) category: Optional[str] = None @router.post("/api/research/start") @@ -401,6 +403,8 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter: max_rounds=effective_max_rounds, search_provider=body.search_provider or None, category=body.category or None, + extraction_timeout=body.extraction_timeout, + extraction_concurrency=body.extraction_concurrency, owner=user, ) return {"session_id": session_id, "status": "running", "query": body.query} diff --git a/src/deep_research.py b/src/deep_research.py index 9228088..2de0c22 100644 --- a/src/deep_research.py +++ b/src/deep_research.py @@ -180,6 +180,8 @@ class DeepResearcher: max_urls_per_round: int = 3, max_content_chars: int = 15000, max_report_tokens: int = 8192, + extraction_timeout: int = 90, + extraction_concurrency: int = 3, min_rounds: int = 2, max_empty_rounds: int = 2, synthesis_window: int = 10, @@ -197,6 +199,8 @@ class DeepResearcher: self.max_urls_per_round = max_urls_per_round self.max_content_chars = max_content_chars self.max_report_tokens = max_report_tokens + self.extraction_timeout = min(600, max(15, int(extraction_timeout or 90))) + self.extraction_concurrency = min(12, max(1, int(extraction_concurrency or 3))) self.min_rounds = min_rounds self.max_empty_rounds = max_empty_rounds self.synthesis_window = synthesis_window @@ -492,11 +496,16 @@ class DeepResearcher: if self._cancelled or self._time_exceeded(): return all_findings - # Fetch and extract all URLs concurrently - extract_tasks = [ - self._fetch_and_extract(r["url"], question, r.get("title", "")) - for r in urls_to_fetch - ] + # Fetch and extract URLs with backpressure. Local model servers often + # serialize requests behind one GPU; flooding them makes every request + # slower and can trip the extraction timeout. + semaphore = asyncio.Semaphore(self.extraction_concurrency) + + async def _bounded_extract(result: Dict) -> Optional[Dict]: + async with semaphore: + return await self._fetch_and_extract(result["url"], question, result.get("title", "")) + + extract_tasks = [_bounded_extract(r) for r in urls_to_fetch] results_gathered = await asyncio.gather(*extract_tasks, return_exceptions=True) for result in results_gathered: @@ -576,7 +585,7 @@ class DeepResearcher: [{"role": "user", "content": prompt}], temperature=0.2, max_tokens=2048, - timeout=45, + timeout=self.extraction_timeout, ) parsed = self._parse_json_object(response) if parsed: diff --git a/src/research_handler.py b/src/research_handler.py index efa4af7..1a69e08 100644 --- a/src/research_handler.py +++ b/src/research_handler.py @@ -22,6 +22,14 @@ logger = logging.getLogger(__name__) RESEARCH_DATA_DIR = Path("data/deep_research") +def _bounded_int(value, *, default: int, minimum: int, maximum: int) -> int: + try: + n = int(value) + except (TypeError, ValueError): + return default + return max(minimum, min(maximum, n)) + + class ResearchHandler: """Handles research service operations with iterative deep research.""" @@ -165,6 +173,8 @@ class ResearchHandler: max_rounds: int = 20, search_provider: str = None, category: str = None, + extraction_timeout: int = None, + extraction_concurrency: int = None, owner: str = "", ) -> dict: """Start research as a background task. Returns task info dict. @@ -222,6 +232,8 @@ class ResearchHandler: max_rounds=max_rounds, search_provider=search_provider, category=category, + extraction_timeout=extraction_timeout, + extraction_concurrency=extraction_concurrency, ), timeout=hard_timeout, ) @@ -592,6 +604,8 @@ class ResearchHandler: max_rounds: int = 20, search_provider: str = None, category: str = None, + extraction_timeout: int = None, + extraction_concurrency: int = None, ) -> str: """ Run iterative deep research using the LLM-in-the-loop DeepResearcher. @@ -627,6 +641,18 @@ class ResearchHandler: from src.settings import get_setting _max_report_tokens = int(get_setting("research_max_tokens", 16384)) + _extraction_timeout = _bounded_int( + extraction_timeout if extraction_timeout is not None else get_setting("research_extraction_timeout_seconds", 90), + default=90, + minimum=15, + maximum=600, + ) + _extraction_concurrency = _bounded_int( + extraction_concurrency if extraction_concurrency is not None else get_setting("research_extraction_concurrency", 3), + default=3, + minimum=1, + maximum=12, + ) researcher = DeepResearcher( llm_endpoint=llm_endpoint, @@ -636,6 +662,8 @@ class ResearchHandler: min_rounds=min(3, max_rounds), max_time=max_time, max_report_tokens=_max_report_tokens, + extraction_timeout=_extraction_timeout, + extraction_concurrency=_extraction_concurrency, progress_callback=progress_callback, search_provider=search_provider, category=category, diff --git a/src/settings.py b/src/settings.py index e9bdeea..4ef068b 100644 --- a/src/settings.py +++ b/src/settings.py @@ -64,6 +64,8 @@ DEFAULT_SETTINGS = { "research_model": "", "research_search_provider": "", "research_max_tokens": 16384, + "research_extraction_timeout_seconds": 90, + "research_extraction_concurrency": 3, "agent_max_tool_calls": 0, "agent_input_token_budget": 6000, "agent_stream_timeout_seconds": 300, diff --git a/src/task_scheduler.py b/src/task_scheduler.py index 369448d..4bdb1ef 100644 --- a/src/task_scheduler.py +++ b/src/task_scheduler.py @@ -1551,6 +1551,8 @@ class TaskScheduler: pass max_tokens = int(get_setting("research_max_tokens", 8192)) + extraction_timeout = int(get_setting("research_extraction_timeout_seconds", 90) or 90) + extraction_concurrency = int(get_setting("research_extraction_concurrency", 3) or 3) researcher = DeepResearcher( llm_endpoint=endpoint_url, @@ -1559,6 +1561,8 @@ class TaskScheduler: max_rounds=8, max_time=600, # 10 min for scheduled research max_report_tokens=max_tokens, + extraction_timeout=extraction_timeout, + extraction_concurrency=extraction_concurrency, ) started_ts = time.time() diff --git a/static/index.html b/static/index.html index 9d44cbb..b7ff659 100644 --- a/static/index.html +++ b/static/index.html @@ -1451,6 +1451,14 @@ +