Validate internal tool owner attribution
This commit is contained in:
14
app.py
14
app.py
@@ -224,13 +224,15 @@ if AUTH_ENABLED:
|
||||
_hdr = request.headers.get(INTERNAL_TOOL_HEADER)
|
||||
if _hdr and _hdr == _ITT and _is_trusted_loopback(request):
|
||||
# Impersonation: when the agent's loopback call sets
|
||||
# X-Odysseus-Owner, attribute the request to that
|
||||
# user so notes/calendar/etc. land in their account
|
||||
# instead of being owned by "internal-tool" (which
|
||||
# made the agent's POSTs invisible to the user that
|
||||
# asked for them).
|
||||
# X-Odysseus-Owner, attribute the request to that user only
|
||||
# if they exist. Authorization checks remain separate; this
|
||||
# is just owner attribution for notes/calendar/etc.
|
||||
_impersonate = (request.headers.get("X-Odysseus-Owner") or "").strip()
|
||||
request.state.current_user = _impersonate or "internal-tool"
|
||||
_auth_mgr = getattr(request.app.state, "auth_manager", None) or auth_manager
|
||||
if _impersonate and _impersonate in getattr(_auth_mgr, "users", {}):
|
||||
request.state.current_user = _impersonate
|
||||
else:
|
||||
request.state.current_user = "internal-tool"
|
||||
request.state.api_token = False
|
||||
return await call_next(request)
|
||||
except Exception:
|
||||
|
||||
@@ -305,6 +305,26 @@ def test_require_admin_allows_when_auth_explicitly_disabled(monkeypatch):
|
||||
assert require_admin(_Req()) is None
|
||||
|
||||
|
||||
def test_internal_tool_owner_header_logic_requires_known_user():
|
||||
"""Pin the owner-attribution branch used by app.AuthMiddleware without
|
||||
booting the full FastAPI app."""
|
||||
users = {
|
||||
"alice": {"is_admin": False},
|
||||
"AdminUser": {"is_admin": True},
|
||||
}
|
||||
|
||||
def resolve_owner(header_value):
|
||||
impersonate = (header_value or "").strip()
|
||||
if impersonate and impersonate in users:
|
||||
return impersonate
|
||||
return "internal-tool"
|
||||
|
||||
assert resolve_owner("alice") == "alice"
|
||||
assert resolve_owner("AdminUser") == "AdminUser"
|
||||
assert resolve_owner("doesnotexist") == "internal-tool"
|
||||
assert resolve_owner("") == "internal-tool"
|
||||
|
||||
|
||||
def test_auth_manager_migrates_legacy_admin_role(tmp_path):
|
||||
"""Old setup.py wrote role='admin'; startup must turn that into is_admin."""
|
||||
sys.modules.pop("core.auth", None)
|
||||
|
||||
Reference in New Issue
Block a user