From c075abce5dd21b1e7f701164e2aa9a48da6d09ea Mon Sep 17 00:00:00 2001 From: ghreprimand Date: Tue, 2 Jun 2026 07:02:26 -0500 Subject: [PATCH] Search: consolidate core and provider implementations Co-authored-by: ghreprimand <203024559+ghreprimand@users.noreply.github.com> --- src/search/core.py | 454 +------------- src/search/providers.py | 622 +------------------ tests/test_search_module_consolidation.py | 35 ++ tests/test_service_search_provider_guards.py | 7 +- 4 files changed, 54 insertions(+), 1064 deletions(-) create mode 100644 tests/test_search_module_consolidation.py diff --git a/src/search/core.py b/src/search/core.py index 850e026..c7ca009 100644 --- a/src/search/core.py +++ b/src/search/core.py @@ -1,450 +1,12 @@ -"""Core search orchestrators: searxng_search_results, comprehensive_web_search, config, cache invalidation.""" +"""Compatibility wrapper for the canonical services.search.core module. -import json -import logging -from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import datetime, timedelta -from typing import Dict, Any, Optional, List, Set -from urllib.parse import urlparse +``src.search.core`` remains importable for older agent/deep-research code, but +the implementation now lives in ``services.search.core`` so provider ordering, +cache invalidation, and search route behavior cannot drift between copies. +""" -from .analytics import ( - NetworkError, - ParseError, - RateLimitError, - error_logger, - _record_query, -) -from .cache import ( - SEARCH_CACHE_DIR, - search_cache_index, - generate_cache_key, - cleanup_cache, -) -from .query import _cache_duration_for_query -from .ranking import rank_search_results -from .providers import ( - searxng_search_api, - brave_search, - duckduckgo_search, - google_pse_search, - tavily_search, - serper_search, - _get_search_settings, - _get_result_count, -) -from .content import ( - fetch_webpage_content, - extract_key_points, - get_tldr, - extract_quotes, - extract_statistics, -) +import sys -logger = logging.getLogger(__name__) +from services.search import core as _core -# ========= CONFIG ========= -SEARCH_CONFIG: Dict[str, Any] = { - "primary_provider": "searxng", -} - - -def get_search_config() -> Dict[str, Any]: - """Get current search configuration including active provider info.""" - config = SEARCH_CONFIG.copy() - settings = _get_search_settings() - provider = settings.get("search_provider", "searxng") - config["active_provider"] = provider - config["has_api_key"] = bool((settings.get("search_api_key") or "").strip()) - config["result_count"] = _get_result_count() - if provider == "searxng": - from .providers import _get_search_instance - config["search_url"] = _get_search_instance() - return config - - -def update_search_config(api_key: str = None, **kwargs): - """Update search configuration (e.g. Brave API key).""" - if api_key: - SEARCH_CONFIG["brave_api_key"] = api_key - - -def _call_provider(provider_name: str, query: str, count: int, time_filter: str = None) -> List[dict]: - """Call a search provider by name. Returns list of results or empty list.""" - if provider_name == "searxng": - return searxng_search_api(query, count, time_filter=time_filter) - elif provider_name == "brave": - return brave_search(query, count, time_filter) - elif provider_name == "duckduckgo": - return duckduckgo_search(query, count, time_filter) - elif provider_name == "google_pse": - return google_pse_search(query, count, time_filter) - elif provider_name == "tavily": - return tavily_search(query, count, time_filter) - elif provider_name == "serper": - return serper_search(query, count, time_filter) - return [] - - -# If the self-hosted SearXNG instance is up but all enabled engines return -# empty, fall back to the no-key provider so "search X" still works on fresh -# installs. Users can override/disable with `search_fallback_chain`. -_FALLBACK_ORDER = ["duckduckgo"] - - -def _build_provider_chain(primary: str) -> List[str]: - """Build ordered list: primary first, then fallbacks (skipping primary - and dedupes). The fallback list comes from - `settings.search_fallback_chain` if the user configured one, otherwise - the hardcoded default above.""" - chain = [primary] - settings = _get_search_settings() - user_chain = settings.get("search_fallback_chain") or [] - if isinstance(user_chain, str): - # Tolerate comma-separated form from older payloads. - user_chain = [s.strip() for s in user_chain.split(",") if s.strip()] - fallbacks = user_chain if user_chain else _FALLBACK_ORDER - for fb in fallbacks: - if fb and fb != primary and fb not in chain and fb != "disabled": - chain.append(fb) - return chain - - -# ---------------------------------------------------------------------- -# Unified search with caching and retry -# ---------------------------------------------------------------------- -def searxng_search_results(query: str, count: int = 10, time_filter: str = None) -> list[dict]: - """Perform a web search using configured provider with caching and retry.""" - settings = _get_search_settings() - search_provider = settings.get("search_provider", "searxng") - result_count = _get_result_count() - # Use configured count if caller used default - if count == 10: - count = result_count - - cache_key = generate_cache_key(f"{query}|{count}|{time_filter}") - cache_file = SEARCH_CACHE_DIR / f"{cache_key}.cache" - - # Check cache - if cache_file.exists(): - try: - with open(cache_file, "r", encoding="utf-8") as f: - cached_data = json.load(f) - expiry_raw = cached_data.get("expiry") - expiry = datetime.fromisoformat(expiry_raw) if expiry_raw else None - if expiry and datetime.now() < expiry: - logger.debug(f"Search cache hit for query: {query}") - results = cached_data["data"] - _record_query(query, bool(results), cache_hit=True) - return results - else: - cache_file.unlink(missing_ok=True) - search_cache_index.pop(cache_key, None) - except Exception as e: - logger.warning(f"Failed to read search cache for {query}: {e}") - cache_file.unlink(missing_ok=True) - search_cache_index.pop(cache_key, None) - - logger.debug(f"Search cache miss for query: {query}") - - if search_provider == "disabled": - logger.info("Search is disabled via admin settings") - return [] - - provider_chain = _build_provider_chain(search_provider) - - results: List[dict] = [] - for provider_name in provider_chain: - for attempt in range(2): - try: - logger.info(f"Attempting {provider_name} search (attempt {attempt + 1})") - results = _call_provider(provider_name, query, count, time_filter) - if results: - logger.info(f"{provider_name} search succeeded with {len(results)} results") - break - except (NetworkError, ParseError, RateLimitError) as e: - error_logger.error(f"{provider_name} search error (attempt {attempt + 1}): {e}") - except Exception as e: - error_logger.error(f"Unexpected error during {provider_name} search (attempt {attempt + 1}): {e}") - if results: - break - - success = bool(results) - _record_query(query, success, cache_hit=False) - - if success: - results = rank_search_results(query, results) - try: - expiry = datetime.now() + _cache_duration_for_query(query) - cache_data = { - "timestamp": datetime.now().isoformat(), - "expiry": expiry.isoformat(), - "data": results, - } - with open(cache_file, "w", encoding="utf-8") as f: - json.dump(cache_data, f) - search_cache_index[cache_key] = datetime.now() - cleanup_cache(SEARCH_CACHE_DIR, search_cache_index, timedelta(hours=1)) - except Exception as e: - logger.warning(f"Failed to write search cache for {query}: {e}") - - if not success: - logger.error(f"All search providers failed for query: {query}") - - return results - - -# ---------------------------------------------------------------------- -# Cache invalidation -# ---------------------------------------------------------------------- -def invalidate_search_cache(query: Optional[str] = None) -> None: - """Invalidate cached search results. None clears all, otherwise just the given query.""" - if query is None: - for file in SEARCH_CACHE_DIR.glob("*.cache"): - try: - file.unlink(missing_ok=True) - except Exception as e: - error_logger.warning(f"Failed to delete cache file {file}: {e}") - search_cache_index.clear() - logger.info("All search cache entries have been cleared.") - else: - # Match the key the write path stores: searxng_search_results replaces - # the caller's default count with the configured _get_result_count() - # (default 5), so a hardcoded "|10|None" never matched a real entry. - cache_key = generate_cache_key(f"{query}|{_get_result_count()}|None") - cache_file = SEARCH_CACHE_DIR / f"{cache_key}.cache" - if cache_file.exists(): - try: - cache_file.unlink(missing_ok=True) - search_cache_index.pop(cache_key, None) - logger.info(f"Cache entry for query '{query}' has been invalidated.") - except Exception as e: - error_logger.warning(f"Failed to delete cache file for query '{query}': {e}") - else: - logger.info(f"No cache entry found for query '{query}'.") - - -# ---------------------------------------------------------------------- -# Comprehensive web search (with advanced filtering) -# ---------------------------------------------------------------------- -def comprehensive_web_search( - query: str, - max_pages: int = 3, - max_workers: int = 4, - time_filter: str = None, - domain_whitelist: Optional[Set[str]] = None, - domain_blacklist: Optional[Set[str]] = None, - content_type: Optional[str] = None, - language: Optional[str] = None, - min_content_length: int = 0, - return_sources: bool = False, -): - """Perform comprehensive web search with content fetching and advanced filtering.""" - logger.info(f"Starting comprehensive search for: {query}") - if time_filter: - logger.info(f"Applying time filter: {time_filter}") - - settings = _get_search_settings() - search_provider = settings.get("search_provider", "searxng") - result_count = _get_result_count() - - if search_provider == "disabled": - logger.info("Search is disabled via admin settings") - msg = "Web search is disabled by the administrator." - return (msg, []) if return_sources else msg - - # Use configured result count (at least max_pages for content fetching) - fetch_count = max(result_count, max_pages) - - provider_chain = _build_provider_chain(search_provider) - - # Each provider gets 2 attempts (matches the inner unified_search behavior). - # Empty results are tracked separately from exceptions so the failure - # message can tell a soft-fail (provider returned []) apart from a real - # error (network blow-up, rate limit, etc.) — useful both for logging - # and for the model when it sees the response. - search_results = [] - provider_attempts = {} # provider -> "ok N", "empty", "error: ..." - for provider_name in provider_chain: - last_err = None - empty = False - for attempt in range(2): - try: - search_results = _call_provider(provider_name, query, fetch_count, time_filter) - if search_results: - provider_attempts[provider_name] = f"ok ({len(search_results)})" - logger.info(f"Comprehensive search: {provider_name} returned {len(search_results)} results") - break - # Empty result — try once more (transient empties are common on flaky instances) - empty = True - except Exception as e: - last_err = e - logger.warning(f"Comprehensive search: {provider_name} attempt {attempt + 1} failed: {e}") - if search_results: - break - if last_err is not None: - provider_attempts[provider_name] = f"error: {last_err}" - elif empty: - provider_attempts[provider_name] = "empty" - - if not search_results: - # Build a per-provider tally so the model (and logs) see which - # providers were tried and how each one fared, instead of the - # uninformative "No search results found". - tally = ", ".join(f"{p}:{r}" for p, r in provider_attempts.items()) or "no providers configured" - any_errors = any(r.startswith("error") for r in provider_attempts.values()) - if any_errors: - msg = f"Web search failed — all providers errored or returned empty. Tried: {tally}" - logger.error(msg) - else: - msg = ( - f"No search results found. Tried: {tally}. " - "All providers returned empty — possibly a niche query or upstream rate-limiting; " - "rephrasing or using the browser tool for a specific URL may help." - ) - logger.warning(msg) - return (msg, []) if return_sources else msg - - search_results = rank_search_results(query, search_results) - - # URL filter helper - def url_passes_filters(url: str) -> bool: - try: - netloc = urlparse(url).netloc.lower() - except Exception: - return False - if domain_whitelist is not None and netloc not in domain_whitelist: - return False - if domain_blacklist is not None and netloc in domain_blacklist: - return False - if content_type: - ct = content_type.lower() - if ct == "article": - if not any(k in url.lower() for k in ("article", "blog", "news", "post")): - return False - elif ct == "forum": - if not any(k in url.lower() for k in ("forum", "discussion", "thread", "topic")): - return False - elif ct == "academic": - if not any(k in url.lower() for k in ("pdf", "doi", "scholar", "arxiv", "journal", "research")): - return False - if language: - lang_pat = language.lower() - if not (f"/{lang_pat}/" in url.lower() or f"?lang={lang_pat}" in url.lower() or f"&lang={lang_pat}" in url.lower()): - return False - return True - - filtered_urls = [r["url"] for r in search_results[:max_pages] if url_passes_filters(r["url"])] - if not filtered_urls: - logger.warning("All URLs filtered out by advanced criteria") - msg = "No suitable results after applying filters." - return (msg, []) if return_sources else msg - - # Build sources list for the frontend (before content fetching) - _source_list = [ - {"url": r.get("url", ""), "title": r.get("title", "")} - for r in search_results if r.get("url") - ] - - # Fetch content in parallel - fetched_content = [] - with ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_url = { - executor.submit(fetch_webpage_content, url, 8, retry_attempt=0): url - for url in filtered_urls - } - for future in as_completed(future_to_url): - url = future_to_url[future] - try: - result = future.result() - if result["success"] and result["content"] and len(result["content"]) >= min_content_length: - fetched_content.append(result) - except Exception as e: - logger.error(f"Exception while fetching {url}: {str(e)}") - - logger.info(f"Successfully fetched content from {len(fetched_content)} pages") - - # Format results - output_parts = [] - - if search_results: - output_parts.append("```sources") - for i, result in enumerate(search_results, 1): - output_parts.append(f"[{i}] {result['title']}") - output_parts.append(f" {result['url']}") - if result.get("age"): - output_parts.append(f" {result['age']}") - output_parts.append("```") - output_parts.append("") - - output_parts.append("=" * 70) - output_parts.append("WEB SEARCH RESULTS AND FETCHED CONTENT") - output_parts.append(f"Query: {query}") - output_parts.append(f"Searched {len(search_results)} results, fetched {len(fetched_content)} pages") - output_parts.append("=" * 70) - output_parts.append("") - - output_parts.append("SEARCH RESULTS SUMMARY:") - output_parts.append("-" * 50) - for i, result in enumerate(search_results, 1): - output_parts.append(f"\n[{i}] {result['title']}") - output_parts.append(f" URL: {result['url']}") - output_parts.append(f" Snippet: {result['snippet'][:200]}...") - if result.get("age"): - output_parts.append(f" Age: {result['age']}") - - if fetched_content: - output_parts.append("\n" + "=" * 70) - output_parts.append("FETCHED PAGE CONTENT:") - output_parts.append("-" * 50) - - for i, content in enumerate(fetched_content, 1): - output_parts.append(f"\n[CONTENT {i}] From: {content['url']}") - output_parts.append(f"Title: {content['title']}") - output_parts.append("-" * 30) - - text = content["content"][:3000] - if len(content["content"]) > 3000: - text += "... [truncated]" - output_parts.append(text) - - key_points = extract_key_points(content["content"]) - if key_points: - output_parts.append("\nKey Points:") - for pt in key_points[:5]: - output_parts.append(f"- {pt}") - - tldr = get_tldr(content["content"]) - if tldr: - output_parts.append("\nTL;DR:") - output_parts.append(tldr) - - quotes = extract_quotes(content["content"]) - if quotes: - output_parts.append("\nImportant Quotes:") - for q in quotes[:3]: - output_parts.append(f"\u201c{q}\u201d") - - stats = extract_statistics(content["content"]) - if stats: - output_parts.append("\nData / Statistics:") - for s in stats[:5]: - output_parts.append(f"- {s}") - - output_parts.append("") - - output_parts.append("=" * 70) - output_parts.append("END OF WEB SEARCH RESULTS") - output_parts.append("=" * 70) - - instructions = ( - "\n\nIMPORTANT INSTRUCTIONS:\n" - "1. Use the above web search results and fetched content to answer the user's question\n" - "2. Prioritize information from the FETCHED PAGE CONTENT section as it contains actual page data\n" - "3. Cross-reference multiple sources when possible\n" - "4. If the information is time-sensitive, pay attention to the age of the results\n" - "5. Be explicit if the search results don't contain sufficient information to fully answer the question" - ) - output_parts.append(instructions) - - result = "\n".join(output_parts) - return (result, _source_list) if return_sources else result +sys.modules[__name__] = _core diff --git a/src/search/providers.py b/src/search/providers.py index 5d8b2e6..0c83a9b 100644 --- a/src/search/providers.py +++ b/src/search/providers.py @@ -1,618 +1,12 @@ -"""Search provider implementations: SearXNG, Brave, DuckDuckGo, Google PSE, Tavily, Serper.""" +"""Compatibility wrapper for the canonical services.search.providers module. -import json -import logging -import os -from typing import List, Optional -from urllib.parse import urljoin, urlparse, parse_qs +Historically Odysseus carried duplicate provider implementations under both +``src.search`` and ``services.search``. Keep the old import path working, but +make provider behavior come from one source of truth. +""" -import httpx -from bs4 import BeautifulSoup +import sys -from src.constants import SEARXNG_INSTANCE -from .analytics import RateLimitError, error_logger -from .query import build_enhanced_query +from services.search import providers as _providers -logger = logging.getLogger(__name__) - -REQUEST_TIMEOUT = 20 - -# Provider registry — maps setting value to (label, needs_key, needs_url) -PROVIDER_INFO = { - "searxng": ("SearXNG", False, True), - "brave": ("Brave Search", True, False), - "duckduckgo": ("DuckDuckGo", False, False), - "google_pse": ("Google PSE", True, False), - "tavily": ("Tavily", True, False), - "serper": ("Serper", True, False), - "disabled": ("Disabled", False, False), -} - - -# ── Settings helpers ── - -def _get_search_settings() -> dict: - """Return search settings from admin config, falling back to env defaults.""" - try: - from src.settings import load_settings - return load_settings() - except Exception: - return {} - - -def _get_search_instance() -> str: - """Return the active search API URL from admin settings, falling back to env var.""" - settings = _get_search_settings() - url = (settings.get("search_url") or "").strip() - if url: - return url.rstrip("/") - return SEARXNG_INSTANCE - - -def _get_provider_key(provider: str) -> str: - """Return the API key for a specific provider, with legacy fallback.""" - settings = _get_search_settings() - key_map = { - "brave": "brave_api_key", - "google_pse": "google_pse_key", - "tavily": "tavily_api_key", - "serper": "serper_api_key", - } - field = key_map.get(provider, "") - if field: - val = (settings.get(field) or "").strip() - if val: - return val - # Legacy fallback: old shared search_api_key field - return (settings.get("search_api_key") or "").strip() - - -def _get_result_count() -> int: - """Return configured result count, default 5.""" - settings = _get_search_settings() - try: - return int(settings.get("search_result_count", 5)) - except (ValueError, TypeError): - return 5 - - -# Canonical SafeSearch levels: "strict" (default), "moderate", "off". -# Each provider has its own knob name and value space — see _safesearch_for(...). -_SAFESEARCH_LEVELS = ("strict", "moderate", "off") - - -def _get_safesearch_level() -> str: - """Return the configured SafeSearch level, normalized to one of - _SAFESEARCH_LEVELS. Defaults to 'strict' to avoid adult / spammy URLs - bleeding into research and web_search results.""" - settings = _get_search_settings() - raw = (settings.get("search_safesearch") or "strict").strip().lower() - if raw in _SAFESEARCH_LEVELS: - return raw - # Accept a few common aliases so a manually-edited config doesn't - # silently lose SafeSearch — fall back to strict on anything unknown. - aliases = { - "on": "strict", "high": "strict", "2": "strict", - "medium": "moderate", "1": "moderate", "default": "moderate", - "none": "off", "disabled": "off", "0": "off", - } - return aliases.get(raw, "strict") - - -def _safesearch_for(provider: str) -> Optional[str]: - """Translate the canonical level into the per-provider param value. - Returns None when SafeSearch should be omitted entirely for a provider - (some APIs default to filtered and treat missing-param as "off").""" - level = _get_safesearch_level() - if provider == "searxng": - # SearXNG: integer 0/1/2 - return {"strict": "2", "moderate": "1", "off": "0"}[level] - if provider == "brave": - # Brave: strict / moderate / off - return level - if provider == "duckduckgo_lib": - # duckduckgo-search library: on / moderate / off - return {"strict": "on", "moderate": "moderate", "off": "off"}[level] - if provider == "duckduckgo_html": - # DDG HTML endpoint kp: 1 strict / -1 moderate / -2 off - return {"strict": "1", "moderate": "-1", "off": "-2"}[level] - if provider == "google_pse": - # Google PSE: 'active' filters explicit; 'off' disables. Treat - # moderate the same as active — Google PSE has no middle tier. - return None if level == "off" else "active" - if provider == "serper": - # Serper proxies Google's `safe` param. - return None if level == "off" else "active" - return None - - -# ── SearXNG ── - -_NEWS_HINTS = ("news", "nyheter", "headlines", "breaking", "latest", "today", "idag") - -# The instance's DEFAULT general engines (google/duckduckgo/brave/startpage/ -# wikipedia) are routinely rate-limited / CAPTCHA-blocked and return nothing, -# so a plain general query comes back empty. Pin engines that actually respond -# (verified working on this instance) so non-news queries get results without -# enabling any third-party API fallback. Override via the SEARXNG_GENERAL_ENGINES -# env var if the working set changes. -_GENERAL_ENGINES = os.environ.get("SEARXNG_GENERAL_ENGINES", "bing,mojeek,presearch") - - -def searxng_search_api(query: str, count: int = 10, categories: str = "general", - time_filter: Optional[str] = None) -> List[dict]: - """Search using SearXNG JSON API. Returns list of {title, url, snippet}.""" - instance = _get_search_instance() - api_key = "" - headers = {"User-Agent": "Mozilla/5.0"} - if api_key: - headers["Authorization"] = f"Bearer {api_key}" - # News/fresh queries do badly in the 'general' category — it favours - # encyclopedic/tourism pages, ignores recency, and (with no language pin) - # bleeds in foreign-language results. When the agent layer detected - # freshness (time_filter) or the query reads like a news lookup, switch to - # the 'news' category, constrain recency, and pin language to English so a - # search like "Canada latest news" returns actual news instead of Wikipedia. - # Pin English for ALL searches — without it SearXNG mixes languages and - # brand-ambiguous terms bleed in foreign SEO pages (Honda "Odyssey" JP, - # Japanese "Trojan" malware blogs, Chinese math forums for "Polyphemus"). - params = {"q": query, "format": "json", "language": "en", - "safesearch": _safesearch_for("searxng")} - q_lc = query.lower() - is_news = time_filter is not None or any(h in q_lc for h in _NEWS_HINTS) - if is_news and categories == "general": - params["categories"] = "news" - if time_filter in ("day", "week", "month", "year"): - # 'day' is too sparse on most SearXNG news engines — widen to a week - # so there's enough volume; the news category already biases recent. - params["time_range"] = "week" if time_filter in ("day", "week") else time_filter - else: - params["categories"] = categories - # Route general queries to engines that aren't blocked (the default - # general set returns 0 on this instance — see _GENERAL_ENGINES). - if categories == "general" and _GENERAL_ENGINES: - params["engines"] = _GENERAL_ENGINES - try: - def _parse_results(results): - return [ - { - "title": r.get("title", ""), - "url": r.get("url", ""), - "snippet": r.get("content", ""), - } - for r in results[:count] - if r.get("url") - ] - - def _run(search_params): - response = httpx.get( - f"{instance}/search", - params=search_params, - headers=headers or None, - timeout=15, - ) - response.raise_for_status() - data = response.json() - return _parse_results(data.get("results", [])), data - - active_params = params - parsed, data = _run(active_params) - if not parsed and is_news and categories == "general": - # Some self-hosted SearXNG configs have no working news engines. - # Fall back to the known-good general engines before reporting an - # empty search, otherwise common queries like "Canada news" fail. - fallback = { - "q": query, - "format": "json", - "language": "en", - "categories": "general", - "safesearch": _safesearch_for("searxng"), - } - if _GENERAL_ENGINES: - fallback["engines"] = _GENERAL_ENGINES - logger.info( - "SearXNG news search returned 0 results for %r; retrying general engines", - query, - ) - active_params = fallback - parsed, data = _run(active_params) - if not parsed and active_params.get("language"): - fallback = dict(active_params) - fallback.pop("language", None) - logger.info( - "SearXNG language-pinned search returned 0 results for %r; retrying without language", - query, - ) - active_params = fallback - parsed, data = _run(active_params) - if not parsed and active_params.get("engines"): - fallback = dict(active_params) - fallback.pop("engines", None) - logger.info( - "SearXNG pinned engines returned 0 results for %r; retrying default engines", - query, - ) - parsed, data = _run(fallback) - logger.info(f"SearXNG JSON API returned {len(parsed)} results for: {query}") - if not parsed: - unresponsive = data.get("unresponsive_engines") if isinstance(data, dict) else None - if unresponsive: - logger.info(f"SearXNG unresponsive engines for {query!r}: {unresponsive}") - return parsed - except Exception as e: - logger.warning(f"SearXNG JSON API search failed: {e}") - html_results = searxng_search(query, max_results=count) - if html_results: - logger.info(f"SearXNG HTML fallback returned {len(html_results)} results for: {query}") - return html_results - - -def searxng_search(query, max_results=10): - """Search using SearXNG instance - parsing HTML.""" - instance = _get_search_instance() - api_key = "" - req_headers = {"User-Agent": "Mozilla/5.0"} - if api_key: - req_headers["Authorization"] = f"Bearer {api_key}" - try: - response = httpx.get( - f"{instance}/search", - params={"q": query, "safesearch": _safesearch_for("searxng")}, - headers=req_headers, - timeout=10, - ) - if response.is_success: - soup = BeautifulSoup(response.text, "html.parser") - results = [] - for article in soup.select("article.result")[:max_results]: - title_elem = article.select_one("h3 a") - if not title_elem: - continue - title = title_elem.get_text(strip=True) - url = title_elem.get("href", "") - snippet_elem = article.select_one("p.content") - snippet = snippet_elem.get_text(strip=True) if snippet_elem else "" - results.append({"title": title, "url": url, "snippet": snippet}) - logger.info(f"SearXNG search (HTML) returned {len(results)} results") - return results - except Exception as e: - logger.error(f"SearXNG search failed: {e}") - return [] - - -# ── Brave ── - -def brave_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]: - """Search using Brave API with key from admin settings or env var.""" - api_key = _get_provider_key("brave") or os.environ.get("DATA_BRAVE_API_KEY") or "" - return _brave_search_impl(query, count, time_filter, search_config={"brave_api_key": api_key}) - - -def _brave_search_impl(query: str, count: int, time_filter: Optional[str] = None, search_config: dict = None) -> List[dict]: - """Core Brave API call. Returns a list of result dicts or an empty list on failure.""" - enhanced_query = build_enhanced_query(query, time_filter) - config = search_config or {} - - brave_api_key = config.get("brave_api_key") - if not brave_api_key: - brave_api_key = os.environ.get("DATA_BRAVE_API_KEY") - - if not brave_api_key: - logger.warning("Brave API key not found, returning empty results for fallback") - return [] - - headers = {"X-Subscription-Token": brave_api_key, "Accept": "application/json"} - params = {"q": enhanced_query, "count": count, - "safesearch": _safesearch_for("brave")} - if time_filter: - time_map = {"day": "day", "week": "week", "month": "month", "year": "year"} - if time_filter in time_map: - params["freshness"] = time_map[time_filter] - - logger.info(f"Executing Brave search with query: {enhanced_query}") - try: - response = httpx.get( - "https://api.search.brave.com/res/v1/web/search", - headers=headers, - params=params, - timeout=REQUEST_TIMEOUT, - ) - if response.status_code == 429: - raise RateLimitError("Brave rate limit hit") - response.raise_for_status() - except httpx.RequestError as e: - error_logger.error(f"NetworkError during Brave search: {e}") - return [] - except RateLimitError as e: - error_logger.error(str(e)) - return [] - - try: - data = response.json() - except json.JSONDecodeError as e: - logger.error(f"Failed to parse Brave API response: {e}") - return [] - - results = [] - if "web" in data and "results" in data["web"]: - for item in data["web"]["results"][:count]: - url = item.get("url", "") - if not url: - continue - results.append({ - "title": item.get("title", ""), - "url": url, - "snippet": item.get("description", "") or item.get("content", ""), - "age": item.get("date", "") if item.get("date") else "", - }) - - logger.info(f"Brave search returned {len(results)} results") - return results - - -# ── DuckDuckGo (free, no key) ── - -def _is_duckduckgo_host(host: str) -> bool: - """True only for duckduckgo.com and its subdomains — not substring look-alikes - such as ``duckduckgo.com.evil.com`` or ``notduckduckgo.com``.""" - host = (host or "").lower() - return host == "duckduckgo.com" or host.endswith(".duckduckgo.com") - - -def _resolve_ddg_redirect(raw: str) -> str: - """Resolve a DuckDuckGo /l/?uddg= redirect URL to its destination.""" - if not raw: - return raw - # Handle protocol-relative URLs - resolved = raw - if resolved.startswith("//"): - resolved = "https:" + resolved - elif resolved.startswith("/"): - resolved = urljoin("https://html.duckduckgo.com", resolved) - # Extract the actual URL from DuckDuckGo's /l/?uddg= redirect - try: - parsed = urlparse(resolved) - if _is_duckduckgo_host(parsed.hostname) and parsed.path.rstrip("/") == "/l": - qs = parse_qs(parsed.query) - if "uddg" in qs: - return qs["uddg"][0] - except Exception: - pass - return resolved - - -def duckduckgo_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]: - """Search using DuckDuckGo via the duckduckgo-search library. No API key needed.""" - def _html_fallback() -> List[dict]: - try: - response = httpx.get( - "https://html.duckduckgo.com/html/", - params={"q": query, "kp": _safesearch_for("duckduckgo_html")}, - headers={"User-Agent": "Mozilla/5.0"}, - timeout=REQUEST_TIMEOUT, - ) - response.raise_for_status() - soup = BeautifulSoup(response.text, "html.parser") - parsed = [] - for result in soup.select(".result")[:count]: - link = result.select_one(".result__a") - if not link: - continue - url = _resolve_ddg_redirect(link.get("href", "")) - if not url: - continue - snippet_el = result.select_one(".result__snippet") - parsed.append({ - "title": link.get_text(" ", strip=True), - "url": url, - "snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "", - }) - logger.info(f"DuckDuckGo HTML search returned {len(parsed)} results") - return parsed - except Exception as e: - logger.warning(f"DuckDuckGo HTML search failed: {e}") - return [] - - try: - from duckduckgo_search import DDGS - except ImportError: - logger.warning("duckduckgo-search package not installed; using HTML fallback") - return _html_fallback() - - timelimit = None - if time_filter: - time_map = {"day": "d", "week": "w", "month": "m", "year": "y"} - timelimit = time_map.get(time_filter) - - try: - ddgs = DDGS() - raw = ddgs.text(query, max_results=count, timelimit=timelimit, - safesearch=_safesearch_for("duckduckgo_lib")) - results = [] - for item in raw: - url = item.get("href", "") - if not url: - continue - results.append({ - "title": item.get("title", ""), - "url": url, - "snippet": item.get("body", ""), - }) - logger.info(f"DuckDuckGo search returned {len(results)} results") - return results or _html_fallback() - except Exception as e: - logger.warning(f"DuckDuckGo search failed: {e}") - return _html_fallback() - - -# ── Google Programmable Search Engine ── - -def google_pse_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]: - """Search using Google PSE (Custom Search JSON API). - - Requires two keys in settings: - - search_api_key: Google API key - - google_pse_cx: Programmable Search Engine ID (cx) - Or env vars GOOGLE_API_KEY and GOOGLE_PSE_CX. - """ - settings = _get_search_settings() - api_key = _get_provider_key("google_pse") or os.environ.get("GOOGLE_API_KEY", "") - cx = (settings.get("google_pse_cx") or "").strip() or os.environ.get("GOOGLE_PSE_CX", "") - - if not api_key or not cx: - logger.warning("Google PSE: missing API key or CX ID") - return [] - - params = { - "key": api_key, - "cx": cx, - "q": query, - "num": min(count, 10), # Google PSE max is 10 per request - } - safe = _safesearch_for("google_pse") - if safe: - params["safe"] = safe - if time_filter: - # dateRestrict: d[number], w[number], m[number], y[number] - time_map = {"day": "d1", "week": "w1", "month": "m1", "year": "y1"} - if time_filter in time_map: - params["dateRestrict"] = time_map[time_filter] - - try: - response = httpx.get( - "https://www.googleapis.com/customsearch/v1", - params=params, - timeout=REQUEST_TIMEOUT, - ) - if response.status_code == 429: - raise RateLimitError("Google PSE rate limit hit") - response.raise_for_status() - data = response.json() - except httpx.RequestError as e: - error_logger.error(f"Google PSE search failed: {e}") - return [] - except RateLimitError as e: - error_logger.error(str(e)) - return [] - - results = [] - for item in data.get("items", [])[:count]: - url = item.get("link", "") - if not url: - continue - results.append({ - "title": item.get("title", ""), - "url": url, - "snippet": item.get("snippet", ""), - }) - - logger.info(f"Google PSE returned {len(results)} results") - return results - - -# ── Tavily ── - -def tavily_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]: - """Search using Tavily API. Requires search_api_key or TAVILY_API_KEY env var.""" - api_key = _get_provider_key("tavily") or os.environ.get("TAVILY_API_KEY", "") - if not api_key: - logger.warning("Tavily: no API key configured") - return [] - - payload = { - "query": query, - "max_results": count, - "include_answer": False, - } - if time_filter: - time_map = {"day": "day", "week": "week", "month": "month", "year": "year"} - if time_filter in time_map: - payload["days"] = {"day": 1, "week": 7, "month": 30, "year": 365}[time_filter] - - try: - response = httpx.post( - "https://api.tavily.com/search", - json=payload, - headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, - timeout=REQUEST_TIMEOUT, - ) - if response.status_code == 429: - raise RateLimitError("Tavily rate limit hit") - response.raise_for_status() - data = response.json() - except httpx.RequestError as e: - error_logger.error(f"Tavily search failed: {e}") - return [] - except RateLimitError as e: - error_logger.error(str(e)) - return [] - - results = [] - for item in data.get("results", [])[:count]: - url = item.get("url", "") - if not url: - continue - results.append({ - "title": item.get("title", ""), - "url": url, - "snippet": item.get("content", ""), - "age": item.get("published_date", ""), - }) - - logger.info(f"Tavily returned {len(results)} results") - return results - - -# ── Serper.dev ── - -def serper_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]: - """Search using Serper.dev API. Requires search_api_key or SERPER_API_KEY env var.""" - api_key = _get_provider_key("serper") or os.environ.get("SERPER_API_KEY", "") - if not api_key: - logger.warning("Serper: no API key configured") - return [] - - payload = { - "q": query, - "num": count, - } - safe = _safesearch_for("serper") - if safe: - payload["safe"] = safe - if time_filter: - time_map = {"day": "qdr:d", "week": "qdr:w", "month": "qdr:m", "year": "qdr:y"} - if time_filter in time_map: - payload["tbs"] = time_map[time_filter] - - try: - response = httpx.post( - "https://google.serper.dev/search", - json=payload, - headers={"X-API-KEY": api_key, "Content-Type": "application/json"}, - timeout=REQUEST_TIMEOUT, - ) - if response.status_code == 429: - raise RateLimitError("Serper rate limit hit") - response.raise_for_status() - data = response.json() - except httpx.RequestError as e: - error_logger.error(f"Serper search failed: {e}") - return [] - except RateLimitError as e: - error_logger.error(str(e)) - return [] - - results = [] - for item in data.get("organic", [])[:count]: - url = item.get("link", "") - if not url: - continue - results.append({ - "title": item.get("title", ""), - "url": url, - "snippet": item.get("snippet", ""), - "age": item.get("date", ""), - }) - - logger.info(f"Serper returned {len(results)} results") - return results +sys.modules[__name__] = _providers diff --git a/tests/test_search_module_consolidation.py b/tests/test_search_module_consolidation.py new file mode 100644 index 0000000..61b097b --- /dev/null +++ b/tests/test_search_module_consolidation.py @@ -0,0 +1,35 @@ +"""Search consolidation regression tests. + +``src.search`` is still a public import path for agent/deep-research code, but +core/provider behavior should come from the services.search implementation. +""" + +import importlib + + +def test_src_search_core_aliases_services_core(): + src_core = importlib.import_module("src.search.core") + service_core = importlib.import_module("services.search.core") + + assert src_core is service_core + assert src_core.comprehensive_web_search is service_core.comprehensive_web_search + assert src_core.invalidate_search_cache is service_core.invalidate_search_cache + + +def test_src_search_providers_aliases_services_providers(): + src_providers = importlib.import_module("src.search.providers") + service_providers = importlib.import_module("services.search.providers") + + assert src_providers is service_providers + assert src_providers._resolve_ddg_redirect is service_providers._resolve_ddg_redirect + assert src_providers._safesearch_for is service_providers._safesearch_for + + +def test_src_search_package_exports_still_resolve(): + import src.search as search + import services.search as service_search + + assert search.comprehensive_web_search is service_search.comprehensive_web_search + assert search.searxng_search_results is service_search.searxng_search_results + assert search.searxng_search_api is service_search.searxng_search_api + assert search.PROVIDER_INFO is service_search.PROVIDER_INFO diff --git a/tests/test_service_search_provider_guards.py b/tests/test_service_search_provider_guards.py index 8e81b1a..373928e 100644 --- a/tests/test_service_search_provider_guards.py +++ b/tests/test_service_search_provider_guards.py @@ -1,8 +1,7 @@ -"""Regression tests for the services.search provider copy. +"""Regression tests for the canonical services.search provider implementation. -The UI search routes import services.search, while agent/deep-research paths -still import src.search. Keep the service-side copy aligned with the safer -provider guards already present in src.search. +The old src.search provider path aliases this module; these tests pin the +behavior at the single implementation point. """ import sys