Search: consolidate core and provider implementations

Co-authored-by: ghreprimand <203024559+ghreprimand@users.noreply.github.com>
This commit is contained in:
ghreprimand
2026-06-02 07:02:26 -05:00
committed by GitHub
parent de92bbe47a
commit c075abce5d
4 changed files with 54 additions and 1064 deletions

View File

@@ -1,450 +1,12 @@
"""Core search orchestrators: searxng_search_results, comprehensive_web_search, config, cache invalidation."""
"""Compatibility wrapper for the canonical services.search.core module.
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, List, Set
from urllib.parse import urlparse
``src.search.core`` remains importable for older agent/deep-research code, but
the implementation now lives in ``services.search.core`` so provider ordering,
cache invalidation, and search route behavior cannot drift between copies.
"""
from .analytics import (
NetworkError,
ParseError,
RateLimitError,
error_logger,
_record_query,
)
from .cache import (
SEARCH_CACHE_DIR,
search_cache_index,
generate_cache_key,
cleanup_cache,
)
from .query import _cache_duration_for_query
from .ranking import rank_search_results
from .providers import (
searxng_search_api,
brave_search,
duckduckgo_search,
google_pse_search,
tavily_search,
serper_search,
_get_search_settings,
_get_result_count,
)
from .content import (
fetch_webpage_content,
extract_key_points,
get_tldr,
extract_quotes,
extract_statistics,
)
import sys
logger = logging.getLogger(__name__)
from services.search import core as _core
# ========= CONFIG =========
SEARCH_CONFIG: Dict[str, Any] = {
"primary_provider": "searxng",
}
def get_search_config() -> Dict[str, Any]:
"""Get current search configuration including active provider info."""
config = SEARCH_CONFIG.copy()
settings = _get_search_settings()
provider = settings.get("search_provider", "searxng")
config["active_provider"] = provider
config["has_api_key"] = bool((settings.get("search_api_key") or "").strip())
config["result_count"] = _get_result_count()
if provider == "searxng":
from .providers import _get_search_instance
config["search_url"] = _get_search_instance()
return config
def update_search_config(api_key: str = None, **kwargs):
"""Update search configuration (e.g. Brave API key)."""
if api_key:
SEARCH_CONFIG["brave_api_key"] = api_key
def _call_provider(provider_name: str, query: str, count: int, time_filter: str = None) -> List[dict]:
"""Call a search provider by name. Returns list of results or empty list."""
if provider_name == "searxng":
return searxng_search_api(query, count, time_filter=time_filter)
elif provider_name == "brave":
return brave_search(query, count, time_filter)
elif provider_name == "duckduckgo":
return duckduckgo_search(query, count, time_filter)
elif provider_name == "google_pse":
return google_pse_search(query, count, time_filter)
elif provider_name == "tavily":
return tavily_search(query, count, time_filter)
elif provider_name == "serper":
return serper_search(query, count, time_filter)
return []
# If the self-hosted SearXNG instance is up but all enabled engines return
# empty, fall back to the no-key provider so "search X" still works on fresh
# installs. Users can override/disable with `search_fallback_chain`.
_FALLBACK_ORDER = ["duckduckgo"]
def _build_provider_chain(primary: str) -> List[str]:
"""Build ordered list: primary first, then fallbacks (skipping primary
and dedupes). The fallback list comes from
`settings.search_fallback_chain` if the user configured one, otherwise
the hardcoded default above."""
chain = [primary]
settings = _get_search_settings()
user_chain = settings.get("search_fallback_chain") or []
if isinstance(user_chain, str):
# Tolerate comma-separated form from older payloads.
user_chain = [s.strip() for s in user_chain.split(",") if s.strip()]
fallbacks = user_chain if user_chain else _FALLBACK_ORDER
for fb in fallbacks:
if fb and fb != primary and fb not in chain and fb != "disabled":
chain.append(fb)
return chain
# ----------------------------------------------------------------------
# Unified search with caching and retry
# ----------------------------------------------------------------------
def searxng_search_results(query: str, count: int = 10, time_filter: str = None) -> list[dict]:
"""Perform a web search using configured provider with caching and retry."""
settings = _get_search_settings()
search_provider = settings.get("search_provider", "searxng")
result_count = _get_result_count()
# Use configured count if caller used default
if count == 10:
count = result_count
cache_key = generate_cache_key(f"{query}|{count}|{time_filter}")
cache_file = SEARCH_CACHE_DIR / f"{cache_key}.cache"
# Check cache
if cache_file.exists():
try:
with open(cache_file, "r", encoding="utf-8") as f:
cached_data = json.load(f)
expiry_raw = cached_data.get("expiry")
expiry = datetime.fromisoformat(expiry_raw) if expiry_raw else None
if expiry and datetime.now() < expiry:
logger.debug(f"Search cache hit for query: {query}")
results = cached_data["data"]
_record_query(query, bool(results), cache_hit=True)
return results
else:
cache_file.unlink(missing_ok=True)
search_cache_index.pop(cache_key, None)
except Exception as e:
logger.warning(f"Failed to read search cache for {query}: {e}")
cache_file.unlink(missing_ok=True)
search_cache_index.pop(cache_key, None)
logger.debug(f"Search cache miss for query: {query}")
if search_provider == "disabled":
logger.info("Search is disabled via admin settings")
return []
provider_chain = _build_provider_chain(search_provider)
results: List[dict] = []
for provider_name in provider_chain:
for attempt in range(2):
try:
logger.info(f"Attempting {provider_name} search (attempt {attempt + 1})")
results = _call_provider(provider_name, query, count, time_filter)
if results:
logger.info(f"{provider_name} search succeeded with {len(results)} results")
break
except (NetworkError, ParseError, RateLimitError) as e:
error_logger.error(f"{provider_name} search error (attempt {attempt + 1}): {e}")
except Exception as e:
error_logger.error(f"Unexpected error during {provider_name} search (attempt {attempt + 1}): {e}")
if results:
break
success = bool(results)
_record_query(query, success, cache_hit=False)
if success:
results = rank_search_results(query, results)
try:
expiry = datetime.now() + _cache_duration_for_query(query)
cache_data = {
"timestamp": datetime.now().isoformat(),
"expiry": expiry.isoformat(),
"data": results,
}
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(cache_data, f)
search_cache_index[cache_key] = datetime.now()
cleanup_cache(SEARCH_CACHE_DIR, search_cache_index, timedelta(hours=1))
except Exception as e:
logger.warning(f"Failed to write search cache for {query}: {e}")
if not success:
logger.error(f"All search providers failed for query: {query}")
return results
# ----------------------------------------------------------------------
# Cache invalidation
# ----------------------------------------------------------------------
def invalidate_search_cache(query: Optional[str] = None) -> None:
"""Invalidate cached search results. None clears all, otherwise just the given query."""
if query is None:
for file in SEARCH_CACHE_DIR.glob("*.cache"):
try:
file.unlink(missing_ok=True)
except Exception as e:
error_logger.warning(f"Failed to delete cache file {file}: {e}")
search_cache_index.clear()
logger.info("All search cache entries have been cleared.")
else:
# Match the key the write path stores: searxng_search_results replaces
# the caller's default count with the configured _get_result_count()
# (default 5), so a hardcoded "|10|None" never matched a real entry.
cache_key = generate_cache_key(f"{query}|{_get_result_count()}|None")
cache_file = SEARCH_CACHE_DIR / f"{cache_key}.cache"
if cache_file.exists():
try:
cache_file.unlink(missing_ok=True)
search_cache_index.pop(cache_key, None)
logger.info(f"Cache entry for query '{query}' has been invalidated.")
except Exception as e:
error_logger.warning(f"Failed to delete cache file for query '{query}': {e}")
else:
logger.info(f"No cache entry found for query '{query}'.")
# ----------------------------------------------------------------------
# Comprehensive web search (with advanced filtering)
# ----------------------------------------------------------------------
def comprehensive_web_search(
query: str,
max_pages: int = 3,
max_workers: int = 4,
time_filter: str = None,
domain_whitelist: Optional[Set[str]] = None,
domain_blacklist: Optional[Set[str]] = None,
content_type: Optional[str] = None,
language: Optional[str] = None,
min_content_length: int = 0,
return_sources: bool = False,
):
"""Perform comprehensive web search with content fetching and advanced filtering."""
logger.info(f"Starting comprehensive search for: {query}")
if time_filter:
logger.info(f"Applying time filter: {time_filter}")
settings = _get_search_settings()
search_provider = settings.get("search_provider", "searxng")
result_count = _get_result_count()
if search_provider == "disabled":
logger.info("Search is disabled via admin settings")
msg = "Web search is disabled by the administrator."
return (msg, []) if return_sources else msg
# Use configured result count (at least max_pages for content fetching)
fetch_count = max(result_count, max_pages)
provider_chain = _build_provider_chain(search_provider)
# Each provider gets 2 attempts (matches the inner unified_search behavior).
# Empty results are tracked separately from exceptions so the failure
# message can tell a soft-fail (provider returned []) apart from a real
# error (network blow-up, rate limit, etc.) — useful both for logging
# and for the model when it sees the response.
search_results = []
provider_attempts = {} # provider -> "ok N", "empty", "error: ..."
for provider_name in provider_chain:
last_err = None
empty = False
for attempt in range(2):
try:
search_results = _call_provider(provider_name, query, fetch_count, time_filter)
if search_results:
provider_attempts[provider_name] = f"ok ({len(search_results)})"
logger.info(f"Comprehensive search: {provider_name} returned {len(search_results)} results")
break
# Empty result — try once more (transient empties are common on flaky instances)
empty = True
except Exception as e:
last_err = e
logger.warning(f"Comprehensive search: {provider_name} attempt {attempt + 1} failed: {e}")
if search_results:
break
if last_err is not None:
provider_attempts[provider_name] = f"error: {last_err}"
elif empty:
provider_attempts[provider_name] = "empty"
if not search_results:
# Build a per-provider tally so the model (and logs) see which
# providers were tried and how each one fared, instead of the
# uninformative "No search results found".
tally = ", ".join(f"{p}:{r}" for p, r in provider_attempts.items()) or "no providers configured"
any_errors = any(r.startswith("error") for r in provider_attempts.values())
if any_errors:
msg = f"Web search failed — all providers errored or returned empty. Tried: {tally}"
logger.error(msg)
else:
msg = (
f"No search results found. Tried: {tally}. "
"All providers returned empty — possibly a niche query or upstream rate-limiting; "
"rephrasing or using the browser tool for a specific URL may help."
)
logger.warning(msg)
return (msg, []) if return_sources else msg
search_results = rank_search_results(query, search_results)
# URL filter helper
def url_passes_filters(url: str) -> bool:
try:
netloc = urlparse(url).netloc.lower()
except Exception:
return False
if domain_whitelist is not None and netloc not in domain_whitelist:
return False
if domain_blacklist is not None and netloc in domain_blacklist:
return False
if content_type:
ct = content_type.lower()
if ct == "article":
if not any(k in url.lower() for k in ("article", "blog", "news", "post")):
return False
elif ct == "forum":
if not any(k in url.lower() for k in ("forum", "discussion", "thread", "topic")):
return False
elif ct == "academic":
if not any(k in url.lower() for k in ("pdf", "doi", "scholar", "arxiv", "journal", "research")):
return False
if language:
lang_pat = language.lower()
if not (f"/{lang_pat}/" in url.lower() or f"?lang={lang_pat}" in url.lower() or f"&lang={lang_pat}" in url.lower()):
return False
return True
filtered_urls = [r["url"] for r in search_results[:max_pages] if url_passes_filters(r["url"])]
if not filtered_urls:
logger.warning("All URLs filtered out by advanced criteria")
msg = "No suitable results after applying filters."
return (msg, []) if return_sources else msg
# Build sources list for the frontend (before content fetching)
_source_list = [
{"url": r.get("url", ""), "title": r.get("title", "")}
for r in search_results if r.get("url")
]
# Fetch content in parallel
fetched_content = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {
executor.submit(fetch_webpage_content, url, 8, retry_attempt=0): url
for url in filtered_urls
}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
if result["success"] and result["content"] and len(result["content"]) >= min_content_length:
fetched_content.append(result)
except Exception as e:
logger.error(f"Exception while fetching {url}: {str(e)}")
logger.info(f"Successfully fetched content from {len(fetched_content)} pages")
# Format results
output_parts = []
if search_results:
output_parts.append("```sources")
for i, result in enumerate(search_results, 1):
output_parts.append(f"[{i}] {result['title']}")
output_parts.append(f" {result['url']}")
if result.get("age"):
output_parts.append(f" {result['age']}")
output_parts.append("```")
output_parts.append("")
output_parts.append("=" * 70)
output_parts.append("WEB SEARCH RESULTS AND FETCHED CONTENT")
output_parts.append(f"Query: {query}")
output_parts.append(f"Searched {len(search_results)} results, fetched {len(fetched_content)} pages")
output_parts.append("=" * 70)
output_parts.append("")
output_parts.append("SEARCH RESULTS SUMMARY:")
output_parts.append("-" * 50)
for i, result in enumerate(search_results, 1):
output_parts.append(f"\n[{i}] {result['title']}")
output_parts.append(f" URL: {result['url']}")
output_parts.append(f" Snippet: {result['snippet'][:200]}...")
if result.get("age"):
output_parts.append(f" Age: {result['age']}")
if fetched_content:
output_parts.append("\n" + "=" * 70)
output_parts.append("FETCHED PAGE CONTENT:")
output_parts.append("-" * 50)
for i, content in enumerate(fetched_content, 1):
output_parts.append(f"\n[CONTENT {i}] From: {content['url']}")
output_parts.append(f"Title: {content['title']}")
output_parts.append("-" * 30)
text = content["content"][:3000]
if len(content["content"]) > 3000:
text += "... [truncated]"
output_parts.append(text)
key_points = extract_key_points(content["content"])
if key_points:
output_parts.append("\nKey Points:")
for pt in key_points[:5]:
output_parts.append(f"- {pt}")
tldr = get_tldr(content["content"])
if tldr:
output_parts.append("\nTL;DR:")
output_parts.append(tldr)
quotes = extract_quotes(content["content"])
if quotes:
output_parts.append("\nImportant Quotes:")
for q in quotes[:3]:
output_parts.append(f"\u201c{q}\u201d")
stats = extract_statistics(content["content"])
if stats:
output_parts.append("\nData / Statistics:")
for s in stats[:5]:
output_parts.append(f"- {s}")
output_parts.append("")
output_parts.append("=" * 70)
output_parts.append("END OF WEB SEARCH RESULTS")
output_parts.append("=" * 70)
instructions = (
"\n\nIMPORTANT INSTRUCTIONS:\n"
"1. Use the above web search results and fetched content to answer the user's question\n"
"2. Prioritize information from the FETCHED PAGE CONTENT section as it contains actual page data\n"
"3. Cross-reference multiple sources when possible\n"
"4. If the information is time-sensitive, pay attention to the age of the results\n"
"5. Be explicit if the search results don't contain sufficient information to fully answer the question"
)
output_parts.append(instructions)
result = "\n".join(output_parts)
return (result, _source_list) if return_sources else result
sys.modules[__name__] = _core

View File

@@ -1,618 +1,12 @@
"""Search provider implementations: SearXNG, Brave, DuckDuckGo, Google PSE, Tavily, Serper."""
"""Compatibility wrapper for the canonical services.search.providers module.
import json
import logging
import os
from typing import List, Optional
from urllib.parse import urljoin, urlparse, parse_qs
import httpx
from bs4 import BeautifulSoup
from src.constants import SEARXNG_INSTANCE
from .analytics import RateLimitError, error_logger
from .query import build_enhanced_query
logger = logging.getLogger(__name__)
REQUEST_TIMEOUT = 20
# Provider registry — maps setting value to (label, needs_key, needs_url)
PROVIDER_INFO = {
"searxng": ("SearXNG", False, True),
"brave": ("Brave Search", True, False),
"duckduckgo": ("DuckDuckGo", False, False),
"google_pse": ("Google PSE", True, False),
"tavily": ("Tavily", True, False),
"serper": ("Serper", True, False),
"disabled": ("Disabled", False, False),
}
# ── Settings helpers ──
def _get_search_settings() -> dict:
"""Return search settings from admin config, falling back to env defaults."""
try:
from src.settings import load_settings
return load_settings()
except Exception:
return {}
def _get_search_instance() -> str:
"""Return the active search API URL from admin settings, falling back to env var."""
settings = _get_search_settings()
url = (settings.get("search_url") or "").strip()
if url:
return url.rstrip("/")
return SEARXNG_INSTANCE
def _get_provider_key(provider: str) -> str:
"""Return the API key for a specific provider, with legacy fallback."""
settings = _get_search_settings()
key_map = {
"brave": "brave_api_key",
"google_pse": "google_pse_key",
"tavily": "tavily_api_key",
"serper": "serper_api_key",
}
field = key_map.get(provider, "")
if field:
val = (settings.get(field) or "").strip()
if val:
return val
# Legacy fallback: old shared search_api_key field
return (settings.get("search_api_key") or "").strip()
def _get_result_count() -> int:
"""Return configured result count, default 5."""
settings = _get_search_settings()
try:
return int(settings.get("search_result_count", 5))
except (ValueError, TypeError):
return 5
# Canonical SafeSearch levels: "strict" (default), "moderate", "off".
# Each provider has its own knob name and value space — see _safesearch_for(...).
_SAFESEARCH_LEVELS = ("strict", "moderate", "off")
def _get_safesearch_level() -> str:
"""Return the configured SafeSearch level, normalized to one of
_SAFESEARCH_LEVELS. Defaults to 'strict' to avoid adult / spammy URLs
bleeding into research and web_search results."""
settings = _get_search_settings()
raw = (settings.get("search_safesearch") or "strict").strip().lower()
if raw in _SAFESEARCH_LEVELS:
return raw
# Accept a few common aliases so a manually-edited config doesn't
# silently lose SafeSearch — fall back to strict on anything unknown.
aliases = {
"on": "strict", "high": "strict", "2": "strict",
"medium": "moderate", "1": "moderate", "default": "moderate",
"none": "off", "disabled": "off", "0": "off",
}
return aliases.get(raw, "strict")
def _safesearch_for(provider: str) -> Optional[str]:
"""Translate the canonical level into the per-provider param value.
Returns None when SafeSearch should be omitted entirely for a provider
(some APIs default to filtered and treat missing-param as "off")."""
level = _get_safesearch_level()
if provider == "searxng":
# SearXNG: integer 0/1/2
return {"strict": "2", "moderate": "1", "off": "0"}[level]
if provider == "brave":
# Brave: strict / moderate / off
return level
if provider == "duckduckgo_lib":
# duckduckgo-search library: on / moderate / off
return {"strict": "on", "moderate": "moderate", "off": "off"}[level]
if provider == "duckduckgo_html":
# DDG HTML endpoint kp: 1 strict / -1 moderate / -2 off
return {"strict": "1", "moderate": "-1", "off": "-2"}[level]
if provider == "google_pse":
# Google PSE: 'active' filters explicit; 'off' disables. Treat
# moderate the same as active — Google PSE has no middle tier.
return None if level == "off" else "active"
if provider == "serper":
# Serper proxies Google's `safe` param.
return None if level == "off" else "active"
return None
# ── SearXNG ──
_NEWS_HINTS = ("news", "nyheter", "headlines", "breaking", "latest", "today", "idag")
# The instance's DEFAULT general engines (google/duckduckgo/brave/startpage/
# wikipedia) are routinely rate-limited / CAPTCHA-blocked and return nothing,
# so a plain general query comes back empty. Pin engines that actually respond
# (verified working on this instance) so non-news queries get results without
# enabling any third-party API fallback. Override via the SEARXNG_GENERAL_ENGINES
# env var if the working set changes.
_GENERAL_ENGINES = os.environ.get("SEARXNG_GENERAL_ENGINES", "bing,mojeek,presearch")
def searxng_search_api(query: str, count: int = 10, categories: str = "general",
time_filter: Optional[str] = None) -> List[dict]:
"""Search using SearXNG JSON API. Returns list of {title, url, snippet}."""
instance = _get_search_instance()
api_key = ""
headers = {"User-Agent": "Mozilla/5.0"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
# News/fresh queries do badly in the 'general' category — it favours
# encyclopedic/tourism pages, ignores recency, and (with no language pin)
# bleeds in foreign-language results. When the agent layer detected
# freshness (time_filter) or the query reads like a news lookup, switch to
# the 'news' category, constrain recency, and pin language to English so a
# search like "Canada latest news" returns actual news instead of Wikipedia.
# Pin English for ALL searches — without it SearXNG mixes languages and
# brand-ambiguous terms bleed in foreign SEO pages (Honda "Odyssey" JP,
# Japanese "Trojan" malware blogs, Chinese math forums for "Polyphemus").
params = {"q": query, "format": "json", "language": "en",
"safesearch": _safesearch_for("searxng")}
q_lc = query.lower()
is_news = time_filter is not None or any(h in q_lc for h in _NEWS_HINTS)
if is_news and categories == "general":
params["categories"] = "news"
if time_filter in ("day", "week", "month", "year"):
# 'day' is too sparse on most SearXNG news engines — widen to a week
# so there's enough volume; the news category already biases recent.
params["time_range"] = "week" if time_filter in ("day", "week") else time_filter
else:
params["categories"] = categories
# Route general queries to engines that aren't blocked (the default
# general set returns 0 on this instance — see _GENERAL_ENGINES).
if categories == "general" and _GENERAL_ENGINES:
params["engines"] = _GENERAL_ENGINES
try:
def _parse_results(results):
return [
{
"title": r.get("title", ""),
"url": r.get("url", ""),
"snippet": r.get("content", ""),
}
for r in results[:count]
if r.get("url")
]
def _run(search_params):
response = httpx.get(
f"{instance}/search",
params=search_params,
headers=headers or None,
timeout=15,
)
response.raise_for_status()
data = response.json()
return _parse_results(data.get("results", [])), data
active_params = params
parsed, data = _run(active_params)
if not parsed and is_news and categories == "general":
# Some self-hosted SearXNG configs have no working news engines.
# Fall back to the known-good general engines before reporting an
# empty search, otherwise common queries like "Canada news" fail.
fallback = {
"q": query,
"format": "json",
"language": "en",
"categories": "general",
"safesearch": _safesearch_for("searxng"),
}
if _GENERAL_ENGINES:
fallback["engines"] = _GENERAL_ENGINES
logger.info(
"SearXNG news search returned 0 results for %r; retrying general engines",
query,
)
active_params = fallback
parsed, data = _run(active_params)
if not parsed and active_params.get("language"):
fallback = dict(active_params)
fallback.pop("language", None)
logger.info(
"SearXNG language-pinned search returned 0 results for %r; retrying without language",
query,
)
active_params = fallback
parsed, data = _run(active_params)
if not parsed and active_params.get("engines"):
fallback = dict(active_params)
fallback.pop("engines", None)
logger.info(
"SearXNG pinned engines returned 0 results for %r; retrying default engines",
query,
)
parsed, data = _run(fallback)
logger.info(f"SearXNG JSON API returned {len(parsed)} results for: {query}")
if not parsed:
unresponsive = data.get("unresponsive_engines") if isinstance(data, dict) else None
if unresponsive:
logger.info(f"SearXNG unresponsive engines for {query!r}: {unresponsive}")
return parsed
except Exception as e:
logger.warning(f"SearXNG JSON API search failed: {e}")
html_results = searxng_search(query, max_results=count)
if html_results:
logger.info(f"SearXNG HTML fallback returned {len(html_results)} results for: {query}")
return html_results
def searxng_search(query, max_results=10):
"""Search using SearXNG instance - parsing HTML."""
instance = _get_search_instance()
api_key = ""
req_headers = {"User-Agent": "Mozilla/5.0"}
if api_key:
req_headers["Authorization"] = f"Bearer {api_key}"
try:
response = httpx.get(
f"{instance}/search",
params={"q": query, "safesearch": _safesearch_for("searxng")},
headers=req_headers,
timeout=10,
)
if response.is_success:
soup = BeautifulSoup(response.text, "html.parser")
results = []
for article in soup.select("article.result")[:max_results]:
title_elem = article.select_one("h3 a")
if not title_elem:
continue
title = title_elem.get_text(strip=True)
url = title_elem.get("href", "")
snippet_elem = article.select_one("p.content")
snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
results.append({"title": title, "url": url, "snippet": snippet})
logger.info(f"SearXNG search (HTML) returned {len(results)} results")
return results
except Exception as e:
logger.error(f"SearXNG search failed: {e}")
return []
# ── Brave ──
def brave_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]:
"""Search using Brave API with key from admin settings or env var."""
api_key = _get_provider_key("brave") or os.environ.get("DATA_BRAVE_API_KEY") or ""
return _brave_search_impl(query, count, time_filter, search_config={"brave_api_key": api_key})
def _brave_search_impl(query: str, count: int, time_filter: Optional[str] = None, search_config: dict = None) -> List[dict]:
"""Core Brave API call. Returns a list of result dicts or an empty list on failure."""
enhanced_query = build_enhanced_query(query, time_filter)
config = search_config or {}
brave_api_key = config.get("brave_api_key")
if not brave_api_key:
brave_api_key = os.environ.get("DATA_BRAVE_API_KEY")
if not brave_api_key:
logger.warning("Brave API key not found, returning empty results for fallback")
return []
headers = {"X-Subscription-Token": brave_api_key, "Accept": "application/json"}
params = {"q": enhanced_query, "count": count,
"safesearch": _safesearch_for("brave")}
if time_filter:
time_map = {"day": "day", "week": "week", "month": "month", "year": "year"}
if time_filter in time_map:
params["freshness"] = time_map[time_filter]
logger.info(f"Executing Brave search with query: {enhanced_query}")
try:
response = httpx.get(
"https://api.search.brave.com/res/v1/web/search",
headers=headers,
params=params,
timeout=REQUEST_TIMEOUT,
)
if response.status_code == 429:
raise RateLimitError("Brave rate limit hit")
response.raise_for_status()
except httpx.RequestError as e:
error_logger.error(f"NetworkError during Brave search: {e}")
return []
except RateLimitError as e:
error_logger.error(str(e))
return []
try:
data = response.json()
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Brave API response: {e}")
return []
results = []
if "web" in data and "results" in data["web"]:
for item in data["web"]["results"][:count]:
url = item.get("url", "")
if not url:
continue
results.append({
"title": item.get("title", ""),
"url": url,
"snippet": item.get("description", "") or item.get("content", ""),
"age": item.get("date", "") if item.get("date") else "",
})
logger.info(f"Brave search returned {len(results)} results")
return results
# ── DuckDuckGo (free, no key) ──
def _is_duckduckgo_host(host: str) -> bool:
"""True only for duckduckgo.com and its subdomains — not substring look-alikes
such as ``duckduckgo.com.evil.com`` or ``notduckduckgo.com``."""
host = (host or "").lower()
return host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
def _resolve_ddg_redirect(raw: str) -> str:
"""Resolve a DuckDuckGo /l/?uddg= redirect URL to its destination."""
if not raw:
return raw
# Handle protocol-relative URLs
resolved = raw
if resolved.startswith("//"):
resolved = "https:" + resolved
elif resolved.startswith("/"):
resolved = urljoin("https://html.duckduckgo.com", resolved)
# Extract the actual URL from DuckDuckGo's /l/?uddg= redirect
try:
parsed = urlparse(resolved)
if _is_duckduckgo_host(parsed.hostname) and parsed.path.rstrip("/") == "/l":
qs = parse_qs(parsed.query)
if "uddg" in qs:
return qs["uddg"][0]
except Exception:
pass
return resolved
def duckduckgo_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]:
"""Search using DuckDuckGo via the duckduckgo-search library. No API key needed."""
def _html_fallback() -> List[dict]:
try:
response = httpx.get(
"https://html.duckduckgo.com/html/",
params={"q": query, "kp": _safesearch_for("duckduckgo_html")},
headers={"User-Agent": "Mozilla/5.0"},
timeout=REQUEST_TIMEOUT,
)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
parsed = []
for result in soup.select(".result")[:count]:
link = result.select_one(".result__a")
if not link:
continue
url = _resolve_ddg_redirect(link.get("href", ""))
if not url:
continue
snippet_el = result.select_one(".result__snippet")
parsed.append({
"title": link.get_text(" ", strip=True),
"url": url,
"snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "",
})
logger.info(f"DuckDuckGo HTML search returned {len(parsed)} results")
return parsed
except Exception as e:
logger.warning(f"DuckDuckGo HTML search failed: {e}")
return []
try:
from duckduckgo_search import DDGS
except ImportError:
logger.warning("duckduckgo-search package not installed; using HTML fallback")
return _html_fallback()
timelimit = None
if time_filter:
time_map = {"day": "d", "week": "w", "month": "m", "year": "y"}
timelimit = time_map.get(time_filter)
try:
ddgs = DDGS()
raw = ddgs.text(query, max_results=count, timelimit=timelimit,
safesearch=_safesearch_for("duckduckgo_lib"))
results = []
for item in raw:
url = item.get("href", "")
if not url:
continue
results.append({
"title": item.get("title", ""),
"url": url,
"snippet": item.get("body", ""),
})
logger.info(f"DuckDuckGo search returned {len(results)} results")
return results or _html_fallback()
except Exception as e:
logger.warning(f"DuckDuckGo search failed: {e}")
return _html_fallback()
# ── Google Programmable Search Engine ──
def google_pse_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]:
"""Search using Google PSE (Custom Search JSON API).
Requires two keys in settings:
- search_api_key: Google API key
- google_pse_cx: Programmable Search Engine ID (cx)
Or env vars GOOGLE_API_KEY and GOOGLE_PSE_CX.
Historically Odysseus carried duplicate provider implementations under both
``src.search`` and ``services.search``. Keep the old import path working, but
make provider behavior come from one source of truth.
"""
settings = _get_search_settings()
api_key = _get_provider_key("google_pse") or os.environ.get("GOOGLE_API_KEY", "")
cx = (settings.get("google_pse_cx") or "").strip() or os.environ.get("GOOGLE_PSE_CX", "")
if not api_key or not cx:
logger.warning("Google PSE: missing API key or CX ID")
return []
import sys
params = {
"key": api_key,
"cx": cx,
"q": query,
"num": min(count, 10), # Google PSE max is 10 per request
}
safe = _safesearch_for("google_pse")
if safe:
params["safe"] = safe
if time_filter:
# dateRestrict: d[number], w[number], m[number], y[number]
time_map = {"day": "d1", "week": "w1", "month": "m1", "year": "y1"}
if time_filter in time_map:
params["dateRestrict"] = time_map[time_filter]
from services.search import providers as _providers
try:
response = httpx.get(
"https://www.googleapis.com/customsearch/v1",
params=params,
timeout=REQUEST_TIMEOUT,
)
if response.status_code == 429:
raise RateLimitError("Google PSE rate limit hit")
response.raise_for_status()
data = response.json()
except httpx.RequestError as e:
error_logger.error(f"Google PSE search failed: {e}")
return []
except RateLimitError as e:
error_logger.error(str(e))
return []
results = []
for item in data.get("items", [])[:count]:
url = item.get("link", "")
if not url:
continue
results.append({
"title": item.get("title", ""),
"url": url,
"snippet": item.get("snippet", ""),
})
logger.info(f"Google PSE returned {len(results)} results")
return results
# ── Tavily ──
def tavily_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]:
"""Search using Tavily API. Requires search_api_key or TAVILY_API_KEY env var."""
api_key = _get_provider_key("tavily") or os.environ.get("TAVILY_API_KEY", "")
if not api_key:
logger.warning("Tavily: no API key configured")
return []
payload = {
"query": query,
"max_results": count,
"include_answer": False,
}
if time_filter:
time_map = {"day": "day", "week": "week", "month": "month", "year": "year"}
if time_filter in time_map:
payload["days"] = {"day": 1, "week": 7, "month": 30, "year": 365}[time_filter]
try:
response = httpx.post(
"https://api.tavily.com/search",
json=payload,
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
timeout=REQUEST_TIMEOUT,
)
if response.status_code == 429:
raise RateLimitError("Tavily rate limit hit")
response.raise_for_status()
data = response.json()
except httpx.RequestError as e:
error_logger.error(f"Tavily search failed: {e}")
return []
except RateLimitError as e:
error_logger.error(str(e))
return []
results = []
for item in data.get("results", [])[:count]:
url = item.get("url", "")
if not url:
continue
results.append({
"title": item.get("title", ""),
"url": url,
"snippet": item.get("content", ""),
"age": item.get("published_date", ""),
})
logger.info(f"Tavily returned {len(results)} results")
return results
# ── Serper.dev ──
def serper_search(query: str, count: int = 10, time_filter: Optional[str] = None) -> List[dict]:
"""Search using Serper.dev API. Requires search_api_key or SERPER_API_KEY env var."""
api_key = _get_provider_key("serper") or os.environ.get("SERPER_API_KEY", "")
if not api_key:
logger.warning("Serper: no API key configured")
return []
payload = {
"q": query,
"num": count,
}
safe = _safesearch_for("serper")
if safe:
payload["safe"] = safe
if time_filter:
time_map = {"day": "qdr:d", "week": "qdr:w", "month": "qdr:m", "year": "qdr:y"}
if time_filter in time_map:
payload["tbs"] = time_map[time_filter]
try:
response = httpx.post(
"https://google.serper.dev/search",
json=payload,
headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
timeout=REQUEST_TIMEOUT,
)
if response.status_code == 429:
raise RateLimitError("Serper rate limit hit")
response.raise_for_status()
data = response.json()
except httpx.RequestError as e:
error_logger.error(f"Serper search failed: {e}")
return []
except RateLimitError as e:
error_logger.error(str(e))
return []
results = []
for item in data.get("organic", [])[:count]:
url = item.get("link", "")
if not url:
continue
results.append({
"title": item.get("title", ""),
"url": url,
"snippet": item.get("snippet", ""),
"age": item.get("date", ""),
})
logger.info(f"Serper returned {len(results)} results")
return results
sys.modules[__name__] = _providers

View File

@@ -0,0 +1,35 @@
"""Search consolidation regression tests.
``src.search`` is still a public import path for agent/deep-research code, but
core/provider behavior should come from the services.search implementation.
"""
import importlib
def test_src_search_core_aliases_services_core():
src_core = importlib.import_module("src.search.core")
service_core = importlib.import_module("services.search.core")
assert src_core is service_core
assert src_core.comprehensive_web_search is service_core.comprehensive_web_search
assert src_core.invalidate_search_cache is service_core.invalidate_search_cache
def test_src_search_providers_aliases_services_providers():
src_providers = importlib.import_module("src.search.providers")
service_providers = importlib.import_module("services.search.providers")
assert src_providers is service_providers
assert src_providers._resolve_ddg_redirect is service_providers._resolve_ddg_redirect
assert src_providers._safesearch_for is service_providers._safesearch_for
def test_src_search_package_exports_still_resolve():
import src.search as search
import services.search as service_search
assert search.comprehensive_web_search is service_search.comprehensive_web_search
assert search.searxng_search_results is service_search.searxng_search_results
assert search.searxng_search_api is service_search.searxng_search_api
assert search.PROVIDER_INFO is service_search.PROVIDER_INFO

View File

@@ -1,8 +1,7 @@
"""Regression tests for the services.search provider copy.
"""Regression tests for the canonical services.search provider implementation.
The UI search routes import services.search, while agent/deep-research paths
still import src.search. Keep the service-side copy aligned with the safer
provider guards already present in src.search.
The old src.search provider path aliases this module; these tests pin the
behavior at the single implementation point.
"""
import sys