fix(skills): scope skill reads to caller owner (#777)

read_skill_md and read_skill_reference walk all skill files via
_iter_skill_files and return the first match by slug, regardless
of owner. In a multi-user deployment where two users have skills
with the same slug under different categories, a caller scoped
to owner='alice' can read Bob's skill content.

This is the same cross-tenant leak class as the update_skill /
delete_skill fix (PR #755, merged), but on the read path.

Changes:
- read_skill_md / read_skill_reference accept owner= param (default
  None = match ownerless only, matching the write-path convention).
- 7 callers updated: tool_implementations.py (view, view_ref, patch),
  builtin_actions.py (test_skills), skills_routes.py (audit, source,
  test routes).
- Tests: read scoping (alice reads hers, not bob's), positive update
  scoping (alice can mutate her own), ownerless-match default.
This commit is contained in:
Ernest Hysa
2026-06-02 03:21:27 +01:00
committed by GitHub
parent 000bd6d1ab
commit f4aef0dcf7
5 changed files with 89 additions and 15 deletions

View File

@@ -766,7 +766,7 @@ async def _audit_one_skill(skills_manager, skill, url, model, headers,
except Exception:
pass
md = skills_manager.read_skill_md(name)
md = skills_manager.read_skill_md(name, owner=owner)
if not md:
log(f"{name}: no source — skipped")
return {"skill": name, "result": "skipped"}
@@ -1246,7 +1246,7 @@ def setup_skills_routes(skills_manager: SkillsManager) -> APIRouter:
if not match:
raise HTTPException(404, "Skill not found")
_verify_owner(match, user)
md = skills_manager.read_skill_md(match.get("name"))
md = skills_manager.read_skill_md(match.get("name"), owner=user)
if md is None:
raise HTTPException(404, "Skill source unavailable (legacy entry?)")
return {"name": match.get("name"), "markdown": md}
@@ -1273,7 +1273,7 @@ def setup_skills_routes(skills_manager: SkillsManager) -> APIRouter:
raise HTTPException(404, "Skill not found")
_verify_owner(match, user)
name = match.get("name")
md = skills_manager.read_skill_md(name) or ""
md = skills_manager.read_skill_md(name, owner=user) or ""
if not task:
task = _skill_test_task(match)

View File

@@ -472,24 +472,29 @@ class SkillsManager:
# Reading a single skill (used by the skill_view tool)
# ----------------------------------------------------------------------
def read_skill_md(self, name: str) -> Optional[str]:
def read_skill_md(self, name: str, owner: Optional[str] = None) -> Optional[str]:
for path in self._iter_skill_files():
sk = self._read_skill(path)
if sk and sk.name == name:
try:
with open(path, encoding="utf-8") as f:
return f.read()
except Exception:
return None
if not sk or sk.name != name:
continue
if (sk.owner or "") != (owner or ""):
continue
try:
with open(path, encoding="utf-8") as f:
return f.read()
except Exception:
return None
return None
def read_skill_reference(self, name: str, ref_path: str) -> Optional[str]:
def read_skill_reference(self, name: str, ref_path: str, owner: Optional[str] = None) -> Optional[str]:
"""Read a sub-file under the skill's directory (references/, etc).
Refuses path traversal."""
for path in self._iter_skill_files():
sk = self._read_skill(path)
if not sk or sk.name != name:
continue
if (sk.owner or "") != (owner or ""):
continue
base = os.path.realpath(os.path.dirname(path))
target = os.path.realpath(os.path.join(base, ref_path))
if os.path.commonpath([base, target]) != base or target == os.path.dirname(path):

View File

@@ -1318,7 +1318,7 @@ async def action_test_skills(owner: str, **kwargs) -> Tuple[str, bool]:
name = skill.get("name")
if not name:
continue
md = sm.read_skill_md(name) or ""
md = sm.read_skill_md(name, owner=owner) or ""
if not md:
tally["skipped"] += 1
per_skill_log.append(f"{name}: skipped (no SKILL.md)")

View File

@@ -651,7 +651,7 @@ async def do_manage_skills(content: str, owner: Optional[str] = None) -> Dict:
if action == "view":
if not name:
return {"error": "name is required for view", "exit_code": 1}
md = sm.read_skill_md(name)
md = sm.read_skill_md(name, owner=owner)
if md is None:
return {"error": f"Skill {name!r} not found", "exit_code": 1}
return {"results": md}
@@ -662,7 +662,7 @@ async def do_manage_skills(content: str, owner: Optional[str] = None) -> Dict:
ref = (args.get("path") or "").strip()
if not ref:
return {"error": "path is required for view_ref", "exit_code": 1}
text = sm.read_skill_reference(name, ref)
text = sm.read_skill_reference(name, ref, owner=owner)
if text is None:
return {"error": f"Reference {ref!r} not found under {name!r}", "exit_code": 1}
return {"results": text}
@@ -747,7 +747,7 @@ async def do_manage_skills(content: str, owner: Optional[str] = None) -> Dict:
new_str = args.get("new_string", "")
if not isinstance(old, str) or not old:
return {"error": "old_string is required and must be non-empty", "exit_code": 1}
md = sm.read_skill_md(name)
md = sm.read_skill_md(name, owner=owner)
if md is None:
return {"error": f"Skill {name!r} not found", "exit_code": 1}
count = md.count(old)

View File

@@ -193,3 +193,72 @@ def test_update_skill_scalar_keys_exclude_owner():
"The fix removed this to prevent cross-user ownership reassignment "
"via the updates dict."
)
def test_read_skill_md_scopes_to_owner(tmp_path):
"""Two users own distinct skills with the same slug. read_skill_md()
called with owner='alice' must return Alice's content, not Bob's.
Called without an owner it must match only ownerless skills."""
skills_root = tmp_path / "skills"
skills_root.mkdir(parents=True, exist_ok=True)
alice_path = _write_skill_md(
skills_root, category="alice-cat", name="login-flow",
owner="alice", description="alice secret",
)
bob_path = _write_skill_md(
skills_root, category="bob-cat", name="login-flow",
owner="bob", description="bob secret",
)
sm = SkillsManager(str(tmp_path))
alice_md = sm.read_skill_md("login-flow", owner="alice")
assert alice_md is not None, "read_skill_md returned None for alice's skill"
assert "alice secret" in alice_md, (
f"read_skill_md(owner='alice') returned the wrong file: {alice_md[:200]}"
)
bob_md = sm.read_skill_md("login-flow", owner="bob")
assert bob_md is not None, "read_skill_md returned None for bob's skill"
assert "bob secret" in bob_md, (
f"read_skill_md(owner='bob') returned the wrong file: {bob_md[:200]}"
)
no_owner_md = sm.read_skill_md("login-flow")
assert no_owner_md is None, (
"read_skill_md without owner matched an owned skill — "
"default should only match ownerless skills. Got: "
f"{no_owner_md[:200] if no_owner_md else 'None'}"
)
def test_update_skill_positive_scoping(tmp_path):
"""Alice CAN update her own skill. Two users with the same slug;
update_skill(owner='alice') modifies only Alice's file."""
skills_root = tmp_path / "skills"
skills_root.mkdir(parents=True, exist_ok=True)
alice_path = _write_skill_md(
skills_root, category="alice-cat", name="login-flow",
owner="alice", description="alice original",
)
bob_path = _write_skill_md(
skills_root, category="bob-cat", name="login-flow",
owner="bob", description="bob original",
)
sm = SkillsManager(str(tmp_path))
ok = sm.update_skill("login-flow", {"description": "alice updated"}, owner="alice")
assert ok, "update_skill(owner='alice') should succeed on alice's file"
after_alice = alice_path.read_text(encoding="utf-8")
after_bob = bob_path.read_text(encoding="utf-8")
assert "alice updated" in after_alice, (
"Alice's file was not updated despite passing owner='alice'."
)
assert "bob original" in after_bob and "alice updated" not in after_bob, (
"Bob's file was mutated by Alice's update_skill call — cross-tenant leak."
)