Security: owner-scope v1 chat endpoint fallback

The sync-chat endpoint's Case 3 fallback selected a ModelEndpoint with an
unscoped `query(ModelEndpoint).filter(is_enabled == True).first()` and then
used that row's decrypted `api_key` for the LLM call. ModelEndpoint is a
per-user resource (owner non-null = private to that user), so a chat-scoped
API token for user A that sent no session and no api_key could fall back onto
user B's PRIVATE endpoint — spending B's API key/quota and reaching whatever
internal base_url B configured. This is the same multi-tenant owner-scoping
class already fixed for the session gate on this very endpoint
(_caller_owns_session) and for companion/models.

Scope the fallback to the token owner's own rows plus legacy null-owner
(shared) rows via the existing owner_filter helper, matching
routes/model_routes.py and companion/routes.py. A null/empty owner stays a
no-op, preserving single-user/legacy behaviour.

Add regression tests pinning the scoped fallback (cross-owner, shared-only,
no-visible-row, disabled-owned, and the legacy null-owner no-op).
This commit is contained in:
Mahdi Salmanzade
2026-06-02 15:31:35 +04:00
committed by GitHub
parent b5747e3979
commit 280c29d572
2 changed files with 123 additions and 2 deletions

View File

@@ -26,6 +26,25 @@ MAX_MESSAGE_LEN = 32_000
from core.middleware import require_admin as _require_admin
def _first_enabled_endpoint(db, owner):
"""First enabled ModelEndpoint VISIBLE to `owner` — their own rows plus
legacy null-owner ("shared") rows. Owner-scoped on purpose: ModelEndpoint
is per-user (core/database.py — "when non-null, the model picker only shows
the endpoint to that user"), and the sync-chat fallback uses the row's
decrypted `api_key`. An unscoped ``.first()`` would let a chat-scoped token
(e.g. a paired mobile device) fall back onto ANOTHER user's private
endpoint and silently spend that owner's API key / quota — and reach
whatever internal base_url they configured. Mirrors the owner_filter scoping
in routes/model_routes.py and companion/routes.py. A null/empty owner is a
no-op (single-user / legacy mode), preserving the original behaviour.
"""
from core.database import ModelEndpoint
from src.auth_helpers import owner_filter
q = db.query(ModelEndpoint).filter(ModelEndpoint.is_enabled == True) # noqa: E712
q = owner_filter(q, ModelEndpoint, owner)
return q.first()
def _caller_owns_session(sess_owner, caller) -> bool:
"""Strict session-ownership gate for the token-authenticated sync-chat
endpoint (`POST /api/v1/chat`).
@@ -222,7 +241,6 @@ def setup_webhook_routes(
from core.models import ChatMessage
from src.llm_core import llm_call_async
from core.database import ModelEndpoint
from src.endpoint_resolver import build_chat_url, build_headers, build_models_url, normalize_base
message = body.message.strip()
@@ -287,7 +305,9 @@ def setup_webhook_routes(
if not sess:
db = SessionLocal()
try:
ep = db.query(ModelEndpoint).filter(ModelEndpoint.is_enabled == True).first()
# Owner-scoped: only THIS token owner's endpoints + legacy
# shared rows, never another user's private endpoint/api_key.
ep = _first_enabled_endpoint(db, token_owner)
finally:
db.close()

View File

@@ -218,3 +218,104 @@ def test_sync_chat_gate_rejects_unresolvable_caller():
def test_sync_chat_gate_accepts_matching_owner():
wh_mod = _import_webhook_helper()
assert wh_mod._caller_owns_session("alice", "alice") is True
# ---------------------------------------------------------------------------
# webhook._first_enabled_endpoint (POST /api/v1/chat, Case 3 fallback)
# ---------------------------------------------------------------------------
# The SAME multi-tenant leak in a second spot on this endpoint: when a
# chat-scoped token sends no session and no api_key, sync-chat falls back to a
# configured ModelEndpoint and uses that row's *decrypted* api_key. The query
# was an unscoped `.first()`, so a token for "alice" could fall back onto
# "bob"'s PRIVATE endpoint and silently spend bob's API key / reach bob's
# internal base_url. The fallback must be owner-scoped (own rows + legacy
# null-owner shared rows), exactly like routes/model_routes.py and
# companion/routes.py.
class _Predicate:
def __init__(self, check):
self._check = check
def __call__(self, row):
return self._check(row)
def __or__(self, other):
return _Predicate(lambda row: self(row) or other(row))
class _Column:
def __init__(self, name):
self.name = name
def __eq__(self, value):
return _Predicate(lambda row: getattr(row, self.name) == value)
class _ModelEndpoint:
is_enabled = _Column("is_enabled")
owner = _Column("owner")
class _Query:
def __init__(self, rows):
self._rows = list(rows)
def filter(self, *predicates):
self._rows = [r for r in self._rows if all(p(r) for p in predicates)]
return self
def first(self):
return self._rows[0] if self._rows else None
class _DB:
def __init__(self, rows):
self._rows = rows
def query(self, model):
assert model is _ModelEndpoint
return _Query(self._rows)
def _ep(name, owner, *, is_enabled=True):
return SimpleNamespace(name=name, owner=owner, is_enabled=is_enabled)
def _select(rows, owner):
wh_mod = _import_webhook_helper()
sys.modules["core.database"].ModelEndpoint = _ModelEndpoint
return wh_mod._first_enabled_endpoint(_DB(rows), owner)
def test_sync_chat_fallback_never_picks_another_owners_endpoint():
# bob's private endpoint is first in the table, but alice must never get it.
rows = [_ep("bob-private", "bob"), _ep("alice-private", "alice")]
ep = _select(rows, "alice")
assert ep is not None and ep.name == "alice-private"
def test_sync_chat_fallback_prefers_owned_or_shared_only():
rows = [_ep("bob-private", "bob"), _ep("shared", None)]
ep = _select(rows, "alice")
# Only the legacy null-owner shared row is visible to alice.
assert ep is not None and ep.name == "shared"
def test_sync_chat_fallback_returns_none_when_only_others_endpoints():
rows = [_ep("bob-private", "bob"), _ep("carol-private", "carol")]
# No owned/shared row → fall through to the 400, never borrow bob's key.
assert _select(rows, "alice") is None
def test_sync_chat_fallback_skips_disabled_owned_endpoint():
rows = [_ep("alice-disabled", "alice", is_enabled=False), _ep("shared", None)]
ep = _select(rows, "alice")
assert ep is not None and ep.name == "shared"
def test_sync_chat_fallback_null_owner_is_legacy_single_user_noop():
# An unresolvable/empty token owner keeps the original single-user behaviour
# (owner_filter no-op): first enabled row, whatever it is.
rows = [_ep("first", "bob"), _ep("second", "alice")]
ep = _select(rows, None)
assert ep is not None and ep.name == "first"