Scope skill mutations to caller owner

SkillsManager.update_skill walks every SKILL.md on disk and matches by
slug only; the 'owner' key in its scalar_keys whitelist meant a caller
could pass updates={'owner': 'attacker', 'description': 'pwned'} and the
first matching file on disk got silently re-owned. Two users with the
same slug under different category directories (which is supported by
the on-disk layout <category>/<name>/SKILL.md) could each stomp the
other's skill via the manage_skills tool or the in-process callers in
tool_implementations.py (edit, patch, publish, delete).

update_skill and delete_skill now require the caller's owner and only
match a file whose parsed owner field matches. The default of None
means 'no scope' and only matches ownerless skills, so an unsafe call
without an explicit owner is now a no-op. 'owner' is also removed from
scalar_keys so the updates dict cannot be used to reassign ownership
even when the manager is called from an in-process path that didn't
supply the owner argument.

The in-process callers in tool_implementations.py are updated to pass
owner=owner (which was already in scope at every call site) so the
HTTP and agent paths both go through the scoped check. The HTTP route
at routes/skills_routes.py:1499 was already owner-scoped via
sm.load(owner=user); the fix brings the in-process path up to the
same standard.
This commit is contained in:
Ernest Hysa
2026-06-01 21:59:43 +01:00
committed by GitHub
parent 5dd5847d4b
commit d42e6a7acc
3 changed files with 220 additions and 9 deletions

View File

@@ -363,19 +363,33 @@ class SkillsManager:
return sk.to_dict()
def update_skill(self, skill_id: str, updates: Dict) -> bool:
def update_skill(self, skill_id: str, updates: Dict, owner: Optional[str] = None) -> bool:
"""`skill_id` is the slug name. Allows updating any field plus
renames if `name` changes (file is moved on disk)."""
renames if `name` changes (file is moved on disk).
The call is owner-scoped: it matches a skill on disk only if
`skill.owner == owner` (string compare; both empty-string and
None mean "ownerless"). When `owner is None` (the default), the
call only matches skills whose own `owner` field is empty —
callers that want to edit an owned skill must pass the matching
owner explicitly. This prevents a caller with one owner from
mutating a file owned by another user that happens to share
the same slug across category directories. The `owner` key in
`updates` is also ignored — ownership is not an editable field
via this path; rename or admin tooling is required for that.
"""
for path in self._iter_skill_files():
sk = self._read_skill(path)
if not sk or sk.name != skill_id:
continue
if (sk.owner or "") != (owner or ""):
continue
old_dir = os.path.dirname(path)
# Apply updates in a Skill-shape friendly way
scalar_keys = (
"description", "version", "category", "status", "confidence",
"source", "teacher_model", "owner", "when_to_use",
"source", "teacher_model", "when_to_use",
"body_extra",
)
for k in scalar_keys:
@@ -421,11 +435,13 @@ class SkillsManager:
return True
return False
def delete_skill(self, skill_id: str) -> bool:
def delete_skill(self, skill_id: str, owner: Optional[str] = None) -> bool:
for path in self._iter_skill_files():
sk = self._read_skill(path)
if not sk or sk.name != skill_id:
continue
if (sk.owner or "") != (owner or ""):
continue
skill_dir = os.path.dirname(path)
try:
# Remove the whole skill dir

View File

@@ -713,7 +713,7 @@ async def do_manage_skills(content: str, owner: Optional[str] = None) -> Dict:
return {"error": f"Skill {name!r} not found", "exit_code": 1}
if not sk_new.owner:
sk_new.owner = match.get("owner") or owner
ok = sm.update_skill(name, _skill_dump(sk_new))
ok = sm.update_skill(name, _skill_dump(sk_new), owner=owner)
return {"results": f"Edited skill `{sk_new.name}`."} if ok else {"error": "Update failed", "exit_code": 1}
if action == "patch":
@@ -737,7 +737,7 @@ async def do_manage_skills(content: str, owner: Optional[str] = None) -> Dict:
except Exception as e:
return {"error": f"Patched content is not valid SKILL.md: {e}", "exit_code": 1}
sk_new.name = slugify(sk_new.name or name)
ok = sm.update_skill(name, _skill_dump(sk_new))
ok = sm.update_skill(name, _skill_dump(sk_new), owner=owner)
return {"results": f"Patched skill `{sk_new.name}`."} if ok else {"error": "Patch update failed", "exit_code": 1}
if action == "publish":
@@ -750,13 +750,13 @@ async def do_manage_skills(content: str, owner: Optional[str] = None) -> Dict:
updates = {"status": "published"}
if args.get("confidence") is not None:
updates["confidence"] = max(0.0, min(1.0, float(args["confidence"])))
sm.update_skill(name, updates)
sm.update_skill(name, updates, owner=owner)
return {"results": f"✅ Published `{name}`. It now appears in the skills index for future turns."}
if action == "delete":
if not name:
return {"error": "name is required for delete", "exit_code": 1}
ok = sm.delete_skill(name)
ok = sm.delete_skill(name, owner=owner)
return {"results": f"Deleted skill `{name}`."} if ok else {"error": f"Skill {name!r} not found", "exit_code": 1}
if action == "search":

View File

@@ -0,0 +1,195 @@
"""Independent validation test for the claim that
`SkillsManager.update_skill` mutates the first skill on disk matching
`name` regardless of the caller's owner, and that `owner` is in its
`scalar_keys` whitelist allowing cross-user ownership reassignment.
This test sets up two user-owned skills on disk with the SAME slug
(`login-flow`) — Alice's and Bob's — and then calls `update_skill` with
NO `owner` argument. If the bug is real, exactly one of the two files
will be mutated (whichever `_iter_skill_files` yields first) and the
caller will have effectively re-stamped the file as owned by the value
in `updates["owner"]` ("attacker"). If the manager method is safe (or
the slug uniqueness invariant makes the bug moot), the call should
either:
* raise (it requires an `owner` argument), OR
* be a no-op (no other side effect on Bob's file), OR
* the file that gets modified should still belong to its original
owner (no ownership reassignment).
We assert the safer behaviors; the test FAILS only when update_skill
silently mutates a file owned by a different user AND overwrites the
`owner` field with an attacker's value.
"""
import os
import sys
import textwrap
import types
from pathlib import Path
from unittest.mock import MagicMock
import pytest
# ── module-load stubbing (matches other tests in this repo) ──────────
# Stub heavy deps so importing the skills manager doesn't pull DB / FastAPI.
for _mod in [
"sqlalchemy", "sqlalchemy.orm", "sqlalchemy.ext",
"sqlalchemy.ext.declarative", "src.database",
"core.atomic_io", # we'll patch atomic_write_text below
]:
if _mod not in sys.modules:
sys.modules[_mod] = MagicMock()
# Provide a no-op atomic_write_text for SkillsManager._write_skill.
def _fake_atomic_write_text(path, content, **kw):
Path(path).parent.mkdir(parents=True, exist_ok=True)
Path(path).write_text(content, encoding="utf-8")
_fake_core = types.ModuleType("core.atomic_io")
_fake_core.atomic_write_text = _fake_atomic_write_text
_fake_core.atomic_write_json = lambda p, d, **kw: Path(p).write_text(
"{}", encoding="utf-8"
)
sys.modules["core.atomic_io"] = _fake_core
from services.memory.skills import SkillsManager # noqa: E402
from services.memory.skill_format import Skill, slugify # noqa: E402
def _write_skill_md(skills_root: Path, category: str, name: str,
owner: str, description: str) -> Path:
"""Drop a real SKILL.md on disk for the given owner."""
skill_dir = skills_root / slugify(category or "general", fallback="general") / name
skill_dir.mkdir(parents=True, exist_ok=True)
md = textwrap.dedent(f"""\
---
name: {name}
description: {description}
version: 1.0.0
category: {category}
tags: []
status: draft
confidence: 0.8
source: learned
owner: {owner}
created: 2026-01-01T00:00:00Z
---
# When to use
test
# Procedure
- step 1
""")
path = skill_dir / "SKILL.md"
path.write_text(md, encoding="utf-8")
return path
def test_update_skill_does_not_mutate_foreign_owned_skill(tmp_path):
"""Two users own distinct skills with the same slug. update_skill()
called WITHOUT an owner argument must not silently overwrite the
wrong file or change its owner field."""
skills_root = tmp_path / "skills"
skills_root.mkdir(parents=True, exist_ok=True)
# Create two distinct on-disk skills with the SAME slug but in
# DIFFERENT category directories so they are real, separately
# addressable files. (The on-disk layout is
# `<category>/<name>/SKILL.md`, so two users can in fact have
# the same slug under different categories — exactly the situation
# that triggers the first-match-wins bug in update_skill.)
alice_path = _write_skill_md(
skills_root, category="alice-cat", name="login-flow",
owner="alice", description="alice original",
)
bob_path = _write_skill_md(
skills_root, category="bob-cat", name="login-flow",
owner="bob", description="bob original",
)
assert alice_path != bob_path
assert alice_path.exists() and bob_path.exists()
sm = SkillsManager(str(tmp_path))
# Snapshot before.
before_alice = alice_path.read_text(encoding="utf-8")
before_bob = bob_path.read_text(encoding="utf-8")
# Try to reassign + mutate. The caller does NOT supply an owner
# arg, mirroring the in-process callers in tool_implementations.py
# (lines 716, 740, 753) which call sm.update_skill(name, updates).
try:
result = sm.update_skill(
"login-flow",
{"owner": "attacker", "description": "pwned"},
)
except TypeError as e:
# If the method were fixed to require an owner arg, this is
# the desired (safe) behavior — the call refused.
pytest.skip(
f"update_skill raised TypeError (refused unsafe call): {e}"
)
return
# After: read what each file now contains.
after_alice = alice_path.read_text(encoding="utf-8")
after_bob = bob_path.read_text(encoding="utf-8")
# Invariant 1: a file that was owned by `alice` (resp. `bob`) MUST
# NOT end up owned by `attacker` after the call. If it does, that's
# the cross-user ownership reassignment bug.
assert "owner: attacker" not in after_alice, (
"BUG: Alice's file was silently re-owned as 'attacker' by "
"update_skill (cross-user ownership reassignment)."
)
assert "owner: attacker" not in after_bob, (
"BUG: Bob's file was silently re-owned as 'attacker' by "
"update_skill (cross-user ownership reassignment)."
)
# Invariant 2: a file that was owned by `alice` and contained
# description "alice original" must not be silently mutated into
# "pwned" by a caller that did not supply an owner.
if "alice original" in before_alice:
assert "alice original" in after_alice, (
"BUG: Alice's skill description was overwritten by a call "
"to update_skill that did not scope to her owner."
)
if "bob original" in before_bob:
assert "bob original" in after_bob, (
"BUG: Bob's skill description was overwritten by a call "
"to update_skill that did not scope to his owner."
)
# The return value should not lie about success — if the manager
# touched nothing because both files were foreign-owned, the safer
# behavior is to return False, not True. (A return of True is the
# buggy path; we don't assert False, we just don't assert True.)
_ = result # not asserted; documented behavior is not the point.
def test_update_skill_scalar_keys_exclude_owner():
"""Static check: the manager's scalar_keys whitelist MUST NOT
include 'owner' — otherwise a non-owner caller can pass
updates={'owner': 'attacker'} and reassign the file. The fix
removed 'owner' from scalar_keys; this test now asserts the
fix is in place."""
src = Path("services/memory/skills.py").read_text(encoding="utf-8")
import re
m = re.search(
r"def update_skill\(.*?scalar_keys\s*=\s*\((.*?)\)",
src,
re.DOTALL,
)
assert m, "could not locate scalar_keys tuple in update_skill"
body = m.group(1)
assert '"owner"' not in body and "'owner'" not in body, (
"BUG (regression): scalar_keys in update_skill includes 'owner'. "
"The fix removed this to prevent cross-user ownership reassignment "
"via the updates dict."
)