fix(security): scope send_to_session agent tool by owner (#1757)

send_to_session was the only agent tool that didn't check session
ownership — an agent acting for user A could read from and write
into user B's session on a multi-user instance.

Add owner parameter and reject access when the target session
belongs to a different user, matching the pattern used by
create_session, list_sessions, and manage_session.

Fixes #1616

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Wes Huber
2026-06-02 21:24:08 -07:00
committed by GitHub
parent b3da01efd5
commit 3abb735200

View File

@@ -517,7 +517,7 @@ async def do_list_sessions(content: str, session_id: Optional[str] = None, owner
return {"error": str(e)}
async def do_send_to_session(content: str, session_id: Optional[str] = None) -> Dict:
async def do_send_to_session(content: str, session_id: Optional[str] = None, owner: Optional[str] = None) -> Dict:
"""Send a message to an existing session and get a response.
Content format:
@@ -541,6 +541,10 @@ async def do_send_to_session(content: str, session_id: Optional[str] = None) ->
if not sess:
return {"error": f"Session '{target_sid}' not found"}
# Owner-scope: reject access to another user's session
if owner and getattr(sess, "owner", None) and sess.owner != owner:
return {"error": f"Session '{target_sid}' not found"}
if not message:
return {"error": "No message provided"}
@@ -1769,7 +1773,7 @@ async def dispatch_ai_tool(
elif tool == "send_to_session":
sid = content.split("\n")[0].strip()[:20]
desc = f"send_to_session: {sid}"
result = await do_send_to_session(content, session_id)
result = await do_send_to_session(content, session_id, owner=owner)
elif tool == "pipeline":
desc = "pipeline: running steps"