diff --git a/src/search/cache.py b/src/search/cache.py index 11fe722..e66aaff 100644 --- a/src/search/cache.py +++ b/src/search/cache.py @@ -1,57 +1,11 @@ -"""Search and content caching with LRU eviction.""" +"""Compatibility wrapper for the canonical services.search.cache module. -import hashlib -import logging -from datetime import datetime, timedelta -from pathlib import Path -from typing import Dict +``src.search.cache`` stays importable for older agent/deep-research code, but the +implementation now lives in ``services.search.cache`` so the two cannot drift. +""" -logger = logging.getLogger(__name__) +import sys -# Cache directories -CACHE_DIR = Path(__file__).resolve().parent.parent / "cache" -SEARCH_CACHE_DIR = CACHE_DIR / "search" -CONTENT_CACHE_DIR = CACHE_DIR / "content" -CACHE_MAX_ENTRIES = 1000 +from services.search import cache as _cache -# Create cache directories -SEARCH_CACHE_DIR.mkdir(parents=True, exist_ok=True) -CONTENT_CACHE_DIR.mkdir(parents=True, exist_ok=True) - -# Track cache size for LRU eviction -search_cache_index: Dict[str, datetime] = {} -content_cache_index: Dict[str, datetime] = {} - -# Cache metrics (shared across modules) -cache_metrics = {"hits": 0, "misses": 0, "evictions": 0} - - -def generate_cache_key(data: str) -> str: - """Generate a unique cache key using SHA-256 hash.""" - return hashlib.sha256(data.encode("utf-8")).hexdigest() - - -def cleanup_cache(cache_dir: Path, cache_index: Dict[str, datetime], max_age: timedelta): - """Remove expired cache entries and enforce LRU policy.""" - current_time = datetime.now() - files_in_dir = {f.name.split(".")[0]: f for f in cache_dir.glob("*.cache")} - - to_remove = [] - for key, timestamp in list(cache_index.items()): - if current_time - timestamp > max_age or key not in files_in_dir: - to_remove.append(key) - if key in files_in_dir: - files_in_dir[key].unlink(missing_ok=True) - - for key in to_remove: - cache_index.pop(key, None) - cache_metrics["evictions"] += 1 - - if len(cache_index) > CACHE_MAX_ENTRIES: - sorted_items = sorted(cache_index.items(), key=lambda x: x[1]) - excess_count = len(cache_index) - CACHE_MAX_ENTRIES - for key, _ in sorted_items[:excess_count]: - cache_index.pop(key, None) - cache_file = cache_dir / f"{key}.cache" - cache_file.unlink(missing_ok=True) - cache_metrics["evictions"] += 1 +sys.modules[__name__] = _cache diff --git a/src/search/content.py b/src/search/content.py index 42f8e34..971d4c2 100644 --- a/src/search/content.py +++ b/src/search/content.py @@ -1,419 +1,11 @@ -"""Webpage content fetching with caching, PDF extraction, and summarization helpers.""" +"""Compatibility wrapper for the canonical services.search.content module. -import copy -import io -import ipaddress -import json -import os -import re -import logging -import socket -from datetime import datetime, timedelta -from typing import List -from urllib.parse import urljoin, urlparse +``src.search.content`` stays importable for older agent/deep-research code, but the +implementation now lives in ``services.search.content`` so the two cannot drift. +""" -import httpx -from bs4 import BeautifulSoup +import sys -from .analytics import RateLimitError, error_logger -from .cache import ( - CONTENT_CACHE_DIR, - content_cache_index, - generate_cache_key, - cleanup_cache, -) +from services.search import content as _content -logger = logging.getLogger(__name__) - -_PRIVATE_NETWORKS = ( - ipaddress.ip_network("0.0.0.0/8"), - ipaddress.ip_network("10.0.0.0/8"), - ipaddress.ip_network("127.0.0.0/8"), - ipaddress.ip_network("169.254.0.0/16"), - ipaddress.ip_network("172.16.0.0/12"), - ipaddress.ip_network("192.168.0.0/16"), - ipaddress.ip_network("::1/128"), - ipaddress.ip_network("fc00::/7"), - ipaddress.ip_network("fe80::/10"), -) - - -def _is_private_address(addr: ipaddress._BaseAddress) -> bool: - if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped is not None: - addr = addr.ipv4_mapped - return ( - addr.is_private - or addr.is_loopback - or addr.is_link_local - or addr.is_reserved - or addr.is_multicast - or addr.is_unspecified - or any(addr in net for net in _PRIVATE_NETWORKS) - ) - - -def _resolve_hostname_ips(hostname: str) -> List[ipaddress._BaseAddress]: - ips = [] - for family, _, _, _, sockaddr in socket.getaddrinfo(hostname, None): - if family in (socket.AF_INET, socket.AF_INET6): - ips.append(ipaddress.ip_address(sockaddr[0])) - return ips - - -def _public_http_url(url: str) -> bool: - parsed = urlparse(url) - if parsed.scheme not in ("http", "https") or not parsed.hostname: - return False - host = parsed.hostname.strip().lower() - if host in ("localhost", "metadata.google.internal", "metadata"): - return False - if host.endswith((".local", ".localhost", ".internal", ".lan", ".intranet")): - return False - try: - return not _is_private_address(ipaddress.ip_address(host)) - except ValueError: - pass - try: - ips = _resolve_hostname_ips(host) - except OSError: - return False - # Fail closed: a hostname that resolves to nothing is treated as - # non-public (an empty all(...) would otherwise return True). - return bool(ips) and all(not _is_private_address(ip) for ip in ips) - - -def _get_public_url(url: str, *, headers: dict, timeout: int) -> httpx.Response: - if not _public_http_url(url): - raise httpx.RequestError(f"Blocked non-public URL: {url}") - - current = url - with httpx.Client(headers=headers, timeout=timeout, follow_redirects=False) as client: - for _ in range(8): - response = client.get(current) - if response.status_code not in (301, 302, 303, 307, 308): - return response - location = response.headers.get("location") - if not location: - return response - current = urljoin(current, location) - if not _public_http_url(current): - raise httpx.RequestError(f"Blocked redirect to non-public URL: {current}") - raise httpx.RequestError("Too many redirects") - -# PDF extraction (optional dependency) -try: - from pdfminer.high_level import extract_text as pdf_extract_text -except ImportError: - pdf_extract_text = None # type: ignore - - -# ---------------------------------------------------------------------- -# HTML extraction helpers -# ---------------------------------------------------------------------- -def _extract_meta(soup: BeautifulSoup) -> dict: - """Pull meta description and keywords if present.""" - description = "" - keywords = "" - desc_tag = soup.find("meta", attrs={"name": re.compile("description", re.I)}) - if desc_tag and desc_tag.get("content"): - description = desc_tag["content"].strip() - kw_tag = soup.find("meta", attrs={"name": re.compile("keywords", re.I)}) - if kw_tag and kw_tag.get("content"): - keywords = kw_tag["content"].strip() - return {"description": description, "keywords": keywords} - - -def _extract_og_image(soup: BeautifulSoup) -> str: - """Extract the best representative image URL from meta tags. - - Only returns absolute http(s) URLs — skips relative paths and data URIs. - """ - candidates = [] - # Open Graph image (most reliable) - for prop in ("og:image", "og:image:url", "og:image:secure_url"): - tag = soup.find("meta", attrs={"property": prop}) - if tag and tag.get("content", "").strip(): - candidates.append(tag["content"].strip()) - # Twitter card image - tag = soup.find("meta", attrs={"name": "twitter:image"}) - if tag and tag.get("content", "").strip(): - candidates.append(tag["content"].strip()) - # Thumbnail meta - tag = soup.find("meta", attrs={"name": "thumbnail"}) - if tag and tag.get("content", "").strip(): - candidates.append(tag["content"].strip()) - # Return first absolute http(s) URL - for url in candidates: - if url.startswith(("https://", "http://")) and not url.endswith((".svg", ".ico")): - return url - return "" - - -def _extract_lists(soup: BeautifulSoup) -> List[List[str]]: - """Return a list of lists, each inner list representing a
and blocks."""
- blocks = []
- for tag in soup.find_all(["pre", "code"]):
- txt = tag.get_text(separator=" ", strip=True)
- if txt:
- blocks.append(txt)
- return blocks
-
-
-def _detect_js_frameworks(soup: BeautifulSoup) -> bool:
- """Very naive detection of common JS frameworks."""
- js_indicators = [
- "react", "angular", "vue", "svelte", "next", "nuxt",
- "ember", "backbone", "jquery", "polymer", "mithril",
- ]
- for script in soup.find_all("script"):
- src = script.get("src", "").lower()
- if any(fr in src for fr in js_indicators):
- return True
- if script.string:
- content = script.string.lower()
- if any(fr in content for fr in js_indicators):
- return True
- if soup.find(attrs={"data-reactroot": True}) or soup.find(attrs={"ng-app": True}):
- return True
- return False
-
-
-def _empty_result(url: str, error: str = "") -> dict:
- """Build a standard failure result dict."""
- return {
- "url": url,
- "title": "",
- "content": "",
- "lists": [],
- "tables": [],
- "code_blocks": [],
- "meta_description": "",
- "meta_keywords": "",
- "js_rendered": False,
- "js_message": "",
- "success": False,
- "error": error,
- }
-
-
-# ----------------------------------------------------------------------
-# Main content fetcher
-# ----------------------------------------------------------------------
-def fetch_webpage_content(url: str, timeout: int = 5, retry_attempt: int = 0) -> dict:
- """Fetch and extract meaningful content from a webpage with caching."""
- cache_key = generate_cache_key(url)
- cache_file = CONTENT_CACHE_DIR / f"{cache_key}.cache"
-
- # Check cache
- if cache_file.exists():
- try:
- with open(cache_file, "r", encoding="utf-8") as f:
- cached_data = json.load(f)
- timestamp = datetime.fromisoformat(cached_data["timestamp"])
- if datetime.now() - timestamp < timedelta(hours=2):
- logger.debug(f"Content cache hit for URL: {url}")
- return cached_data["data"]
- else:
- cache_file.unlink(missing_ok=True)
- content_cache_index.pop(cache_key, None)
- except Exception as e:
- logger.warning(f"Failed to read content cache for {url}: {e}")
- cache_file.unlink(missing_ok=True)
- content_cache_index.pop(cache_key, None)
-
- # Fetch
- try:
- headers = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
- "Accept-Language": "en-US,en;q=0.5",
- "Accept-Encoding": "gzip, deflate",
- "Connection": "keep-alive",
- }
- response = _get_public_url(url, headers=headers, timeout=timeout)
-
- if response.status_code == 429:
- raise RateLimitError(f"Rate limit hit for {url} (attempt {retry_attempt})")
-
- response.raise_for_status()
- except httpx.RequestError as e:
- error_logger.error(f"NetworkError fetching {url} (attempt {retry_attempt}): {e}")
- return _empty_result(url, f"NetworkError: {e}")
- except RateLimitError as e:
- error_logger.error(str(e))
- return _empty_result(url, str(e))
-
- # PDF handling
- content_type = response.headers.get("Content-Type", "").lower()
- if "application/pdf" in content_type or url.lower().endswith(".pdf"):
- if pdf_extract_text is None:
- logger.error("pdfminer.six is not installed; cannot extract PDF text.")
- pdf_text = ""
- else:
- try:
- pdf_bytes = io.BytesIO(response.content)
- pdf_text = pdf_extract_text(pdf_bytes)
- except Exception as e:
- logger.warning(f"PDF extraction failed for {url}: {e}")
- pdf_text = ""
- result = {
- "url": url,
- "title": os.path.basename(url),
- "content": pdf_text,
- "lists": [],
- "tables": [],
- "code_blocks": [],
- "meta_description": "",
- "meta_keywords": "",
- "js_rendered": False,
- "js_message": "",
- "success": bool(pdf_text),
- "error": "" if pdf_text else "Failed to extract PDF text",
- }
- _cache_result(cache_file, cache_key, result, url)
- return result
-
- # HTML handling
- try:
- soup = BeautifulSoup(response.text, "html.parser")
- except Exception as e:
- error_logger.error(f"ParseError parsing HTML from {url} (attempt {retry_attempt}): {e}")
- result = _empty_result(url, f"ParseError: {e}")
- _cache_result(cache_file, cache_key, result, url)
- return result
-
- title_tag = soup.find("title")
- title_text = title_tag.get_text(strip=True) if title_tag else ""
- meta_info = _extract_meta(soup)
- og_image = _extract_og_image(soup)
- js_rendered = _detect_js_frameworks(soup)
- js_message = "Page appears to be rendered by a JavaScript framework; content may be incomplete." if js_rendered else ""
-
- # Main textual content (heuristic): prefer semantic / "content"-classed
- # containers to skip nav/footer/boilerplate; tuned for article pages.
- main_content = ""
- content_areas = soup.find_all(
- ["main", "article", "section", "div"],
- class_=re.compile("content|main|body|article|post|entry|text", re.I),
- )
- if content_areas:
- for area in content_areas[:3]:
- main_content += area.get_text(separator=" ", strip=True) + " "
- main_content = re.sub(r"\s+", " ", main_content).strip()
-
- # The class heuristic can latch onto a small wrapper and miss the real
- # content (app/landing pages, or SSR sites whose body isn't in a
- # "content"-classed div, so these came back nearly empty before). When the
- # heuristic returns nothing OR suspiciously little, fall back to the full
- # , stripping scripts/styles (so JSON/JS doesn't leak into the text)
- # plus nav/header/footer/aside (boilerplate), and keep whichever yields
- # more readable text.
- THIN_CONTENT_CHARS = 600 # below this the heuristic likely missed the page
- if len(main_content) < THIN_CONTENT_CHARS:
- body = soup.find("body")
- if body:
- # Strip from a copy so the later list/table/code extractors still
- # see the original soup unmodified.
- body_copy = copy.copy(body)
- for _noise in body_copy.find_all(
- ["script", "style", "noscript", "template", "nav", "header", "footer", "aside"]
- ):
- _noise.extract()
- body_text = re.sub(r"\s+", " ", body_copy.get_text(separator=" ", strip=True)).strip()
- if len(body_text) > len(main_content):
- main_content = body_text
-
- result = {
- "url": url,
- "title": title_text,
- "content": main_content,
- "lists": _extract_lists(soup),
- "tables": _extract_tables(soup),
- "code_blocks": _extract_code_blocks(soup),
- "meta_description": meta_info.get("description", ""),
- "meta_keywords": meta_info.get("keywords", ""),
- "og_image": og_image,
- "js_rendered": js_rendered,
- "js_message": js_message,
- "success": True,
- "error": "",
- }
- _cache_result(cache_file, cache_key, result, url)
- return result
-
-
-def _cache_result(cache_file, cache_key: str, result: dict, url: str):
- """Write a result to the content cache."""
- try:
- cache_data = {"timestamp": datetime.now().isoformat(), "data": result}
- with open(cache_file, "w", encoding="utf-8") as f:
- json.dump(cache_data, f)
- content_cache_index[cache_key] = datetime.now()
- cleanup_cache(CONTENT_CACHE_DIR, content_cache_index, timedelta(hours=2))
- except Exception as e:
- logger.warning(f"Failed to write content cache for {url}: {e}")
-
-
-# ----------------------------------------------------------------------
-# Content summarization helpers
-# ----------------------------------------------------------------------
-def extract_key_points(text: str) -> List[str]:
- """Pull out bullet-style key points from a block of text."""
- points: List[str] = []
- bullet_pat = re.compile(r"^\s*[-*•]\s+(.*)")
- numbered_pat = re.compile(r"^\s*\d+[\.\)]\s+(.*)")
- for line in text.splitlines():
- m = bullet_pat.match(line) or numbered_pat.match(line)
- if m:
- points.append(m.group(1).strip())
- return points
-
-
-def get_tldr(text: str, max_sentences: int = 3) -> str:
- """Produce a very short TL;DR by taking the first few sentences."""
- sentences = re.split(r"(?<=[.!?])\s+", text)
- selected = [s.strip() for s in sentences if s][:max_sentences]
- return " ".join(selected)
-
-
-def extract_quotes(text: str) -> List[str]:
- """Return quoted excerpts that are at least 15 characters long."""
- # Backreference the opening quote so the closing quote must match it —
- # otherwise `"text'` (open double, close single) is treated as a quote.
- return [m.group(2).strip() for m in re.finditer(r'(["\'])([^"\']{15,}?)\1', text)]
-
-
-def extract_statistics(text: str) -> List[str]:
- """Find numbers, percentages, dates and simple measurements."""
- # Match a comma-grouped number (1,000,000) OR a plain digit run (50000) —
- # the old `\d{1,3}(?:,\d{3})*` matched only the first 3 digits of a
- # comma-less number, and the trailing `\b` dropped a closing `%`.
- pattern = re.compile(
- r"\b(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?\s*(%|percent|‰|per cent|[a-zA-Z]+)?",
- re.IGNORECASE,
- )
- return [m.group(0).strip() for m in pattern.finditer(text)]
+sys.modules[__name__] = _content
diff --git a/src/search/query.py b/src/search/query.py
index 844d1b8..dc5299d 100644
--- a/src/search/query.py
+++ b/src/search/query.py
@@ -1,141 +1,11 @@
-"""Query enhancement, entity extraction, and cache duration helpers."""
+"""Compatibility wrapper for the canonical services.search.query module.
-import re
-import logging
-from datetime import timedelta
-from typing import Dict, List, Optional, Tuple
+``src.search.query`` stays importable for older agent/deep-research code, but the
+implementation now lives in ``services.search.query`` so the two cannot drift.
+"""
-logger = logging.getLogger(__name__)
+import sys
+from services.search import query as _query
-# ----------------------------------------------------------------------
-# Query processing helpers
-# ----------------------------------------------------------------------
-def _detect_question_type(query: str) -> Optional[str]:
- """Return the leading question word if present (who, what, when, where, why, how)."""
- if not isinstance(query, str):
- return None
- q = query.strip().lower()
- for word in ("who", "what", "when", "where", "why", "how"):
- # Require a whole-word match: a bare prefix mis-flags ordinary queries
- # like "whatsapp pricing" (-> what) or "however ..." (-> how), which
- # then get spurious boost terms OR-appended in enhance_query.
- if q == word or q.startswith(word + " "):
- return word
- return None
-
-
-def _extract_entities(query: str) -> Dict[str, List[str]]:
- """Lightweight entity extraction: capitalized words and date patterns."""
- entities: Dict[str, List[str]] = {"names": [], "dates": []}
- qtype = _detect_question_type(query)
- cleaned = query
- if qtype:
- cleaned = re.sub(rf"^{qtype}\b", "", cleaned, flags=re.I).strip()
- for token in re.findall(r"\b[A-Z][a-zA-Z]+\b", cleaned):
- entities["names"].append(token)
- for year in re.findall(r"\b(?:19|20)\d{2}\b", cleaned):
- entities["dates"].append(year)
- month_day_year = re.findall(
- r"\b(?:Jan|January|Feb|February|Mar|March|Apr|April|May|Jun|June|Jul|July|Aug|August|Sep|Sept|September|Oct|October|Nov|November|Dec|December)\s+\d{1,2},?\s*\d{4}\b",
- cleaned,
- flags=re.I,
- )
- entities["dates"].extend(month_day_year)
- return entities
-
-
-def _split_multi_part(query: str) -> List[str]:
- """Split a query into sub-queries on common conjunctions."""
- if not isinstance(query, str):
- return []
- parts = re.split(r"\s+and\s+|\s+or\s+|;", query, flags=re.I)
- return [p.strip() for p in parts if p.strip()]
-
-
-def _extract_site_filter(query: str) -> Tuple[str, Optional[str]]:
- """Detect a 'site:example.com' token. Returns (query_without_token, site_or_None)."""
- if not isinstance(query, str):
- return "", None
- match = re.search(r"\bsite:([^\s]+)", query, flags=re.I)
- if match:
- site = match.group(1)
- new_query = re.sub(r"\bsite:[^\s]+", "", query, flags=re.I).strip()
- return new_query, site
- return query, None
-
-
-def _boost_entities_in_query(base_query: str, entities: Dict[str, List[str]]) -> str:
- """Append extracted entities to the query using OR to increase relevance."""
- parts = [base_query]
- if entities.get("names"):
- parts.append(" OR ".join(f'"{n}"' for n in entities["names"]))
- if entities.get("dates"):
- parts.append(" OR ".join(f'"{d}"' for d in entities["dates"]))
- return " ".join(parts)
-
-
-def enhance_query(original_query: str) -> Tuple[str, Optional[str]]:
- """Process the original query: site filter, question type boosts, entity extraction."""
- if not isinstance(original_query, str):
- original_query = ""
- query_without_site, site = _extract_site_filter(original_query)
- sub_queries = _split_multi_part(query_without_site)
-
- enhanced_subs: List[str] = []
- for sub in sub_queries:
- qtype = _detect_question_type(sub)
- boost_keywords = []
- if qtype == "who":
- boost_keywords.append("person")
- elif qtype == "when":
- boost_keywords.append("date")
- elif qtype == "where":
- boost_keywords.append("location")
- elif qtype == "why":
- boost_keywords.append("reason")
- elif qtype == "how":
- boost_keywords.append("method")
- entities = _extract_entities(sub)
- boosted = _boost_entities_in_query(sub, entities)
- if boost_keywords:
- boosted = f'({boosted}) OR ({" OR ".join(boost_keywords)})'
- enhanced_subs.append(boosted)
-
- final_query = " AND ".join(f"({s})" for s in enhanced_subs)
- if site:
- final_query = f"{final_query} site:{site}"
- return final_query, site
-
-
-def build_enhanced_query(query: str, time_filter: str = None) -> str:
- """Build an enhanced search query with optional time filtering."""
- enhanced_query, _ = enhance_query(query)
-
- if time_filter:
- time_map = {"day": "d", "week": "w", "month": "m", "year": "y"}
- if time_filter in time_map:
- enhanced_query = f"{enhanced_query} after:{time_map[time_filter]}"
- logger.info(f"Added time filter '{time_filter}' to query")
-
- logger.info(f"Enhanced query: '{query}' -> '{enhanced_query}'")
- return enhanced_query
-
-
-# ----------------------------------------------------------------------
-# Cache duration helpers
-# ----------------------------------------------------------------------
-def _is_news_query(query: str) -> bool:
- """Lightweight heuristic to decide if a query is news-oriented."""
- if not isinstance(query, str):
- return False
- news_terms = {"news", "latest", "breaking", "today", "today's", "current", "updates", "happening"}
- tokens = set(re.findall(r"\b\w+\b", query.lower()))
- return bool(tokens & news_terms)
-
-
-def _cache_duration_for_query(query: str) -> timedelta:
- """News queries -> 30 minutes, reference queries -> 24 hours."""
- if _is_news_query(query):
- return timedelta(minutes=30)
- return timedelta(hours=24)
+sys.modules[__name__] = _query
diff --git a/tests/test_search_content_extraction_parity.py b/tests/test_search_content_extraction_parity.py
index 13add9b..ae66b70 100644
--- a/tests/test_search_content_extraction_parity.py
+++ b/tests/test_search_content_extraction_parity.py
@@ -1,11 +1,10 @@
-"""Keep src.search and services.search content extraction behavior aligned."""
+"""Content extraction behavior for the canonical services.search.content module."""
import pytest
pytest.importorskip("bs4")
from services.search import content as service_content
-from src.search import content as src_content
class _FakeResponse:
@@ -20,7 +19,7 @@ class _FakeResponse:
return None
-@pytest.mark.parametrize("module", [src_content, service_content])
+@pytest.mark.parametrize("module", [service_content])
def test_content_fetcher_extracts_og_image_and_body_fallback(module, tmp_path, monkeypatch):
html = """
diff --git a/tests/test_search_content_url_guards.py b/tests/test_search_content_url_guards.py
index 4c8a176..b072310 100644
--- a/tests/test_search_content_url_guards.py
+++ b/tests/test_search_content_url_guards.py
@@ -3,10 +3,9 @@ import ipaddress
import pytest
from services.search import content as service_content
-from src.search import content as src_content
-@pytest.mark.parametrize("module", [src_content, service_content])
+@pytest.mark.parametrize("module", [service_content])
@pytest.mark.parametrize("url", [
"http://printer.local/",
"http://nas.lan/",
@@ -21,7 +20,7 @@ def test_search_content_url_guard_blocks_internal_names_and_address_classes(modu
assert module._public_http_url(url) is False
-@pytest.mark.parametrize("module", [src_content, service_content])
+@pytest.mark.parametrize("module", [service_content])
def test_search_content_url_guard_blocks_dns_to_multicast(monkeypatch, module):
monkeypatch.setattr(
module,
@@ -32,6 +31,6 @@ def test_search_content_url_guard_blocks_dns_to_multicast(monkeypatch, module):
assert module._public_http_url("https://example.test/page") is False
-@pytest.mark.parametrize("module", [src_content, service_content])
+@pytest.mark.parametrize("module", [service_content])
def test_search_content_url_guard_still_allows_public_ip(module):
assert module._public_http_url("https://93.184.216.34/") is True
diff --git a/tests/test_search_module_consolidation.py b/tests/test_search_module_consolidation.py
index 61b097b..dd69646 100644
--- a/tests/test_search_module_consolidation.py
+++ b/tests/test_search_module_consolidation.py
@@ -33,3 +33,10 @@ def test_src_search_package_exports_still_resolve():
assert search.searxng_search_results is service_search.searxng_search_results
assert search.searxng_search_api is service_search.searxng_search_api
assert search.PROVIDER_INFO is service_search.PROVIDER_INFO
+
+
+def test_src_search_cache_content_query_alias_services():
+ for name in ("cache", "content", "query"):
+ src_mod = importlib.import_module(f"src.search.{name}")
+ svc_mod = importlib.import_module(f"services.search.{name}")
+ assert src_mod is svc_mod, f"src.search.{name} should alias services.search.{name}"
diff --git a/tests/test_security_regressions.py b/tests/test_security_regressions.py
index 01c09a4..0f3bbe6 100644
--- a/tests/test_security_regressions.py
+++ b/tests/test_security_regressions.py
@@ -860,19 +860,14 @@ def test_web_fetch_guard_blocks_redirect_into_private(monkeypatch):
class _Resp:
status_code = 302
+ url = "http://public.example/start"
headers = {"location": "http://169.254.169.254/latest/meta-data/"}
- class _FakeClient:
- def __init__(self, *a, **k): pass
- def __enter__(self): return self
- def __exit__(self, *a): return False
- def get(self, url): return _Resp()
-
- monkeypatch.setattr(httpx, "Client", _FakeClient)
+ monkeypatch.setattr(httpx, "get", lambda url, **kwargs: _Resp())
with _pytest.raises(httpx.RequestError) as exc:
content._get_public_url("http://public.example/start", headers={}, timeout=5)
- assert "non-public" in str(exc.value)
+ assert "Blocked" in str(exc.value)
# ── audit fixes (2026-06-01): email XSS, attachment traversal, authz ──
diff --git a/tests/test_src_search_query_nonstring.py b/tests/test_src_search_query_nonstring.py
index c476f6b..d0011ed 100644
--- a/tests/test_src_search_query_nonstring.py
+++ b/tests/test_src_search_query_nonstring.py
@@ -1,22 +1,12 @@
-import importlib.machinery
-import importlib.util
-from pathlib import Path
+"""Query helpers must tolerate non-string input.
+
+`src.search.query` is a compatibility shim that aliases the canonical
+`services.search.query`, so this exercises the live implementation.
+"""
+import services.search.query as q
-_PATH = Path(__file__).resolve().parents[1] / "src" / "search" / "query.py"
-
-
-def _load():
- loader = importlib.machinery.SourceFileLoader("odysseus_src_search_query", str(_PATH))
- spec = importlib.util.spec_from_loader(loader.name, loader)
- module = importlib.util.module_from_spec(spec)
- loader.exec_module(module)
- return module
-
-
-def test_src_search_helpers_handle_non_string_queries():
- q = _load()
-
+def test_query_helpers_handle_non_string_queries():
assert q._detect_question_type(None) is None
assert q._split_multi_part(None) == []
assert q._extract_site_filter(None) == ("", None)
@@ -25,9 +15,7 @@ def test_src_search_helpers_handle_non_string_queries():
assert isinstance(q.build_enhanced_query(123), str)
-def test_src_search_valid_query_still_works():
- q = _load()
-
+def test_query_valid_query_still_works():
assert q._detect_question_type("who is bob") == "who"
assert q._is_news_query("latest news today") is True
assert q._extract_site_filter("cats site:x.com")[1] == "x.com"