From 2e961cee93125fcfda2e727d5bf3901f0e080ea3 Mon Sep 17 00:00:00 2001 From: Alexandre Teixeira <111787685+alteixeira20@users.noreply.github.com> Date: Tue, 2 Jun 2026 12:42:37 +0100 Subject: [PATCH] tests: cover calendar route owner gates --- tests/test_calendar_owner_scope.py | 297 +++++++++++++++++++++++++++++ 1 file changed, 297 insertions(+) diff --git a/tests/test_calendar_owner_scope.py b/tests/test_calendar_owner_scope.py index 7eb3479..4e66eb0 100644 --- a/tests/test_calendar_owner_scope.py +++ b/tests/test_calendar_owner_scope.py @@ -12,7 +12,15 @@ get_upcoming_events scopes to the owner; it fails if the owner filter is dropped (the original cross-tenant behavior). """ import ast +import asyncio +import sys +import types from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest +from fastapi import HTTPException def test_get_upcoming_events_is_owner_scoped(): @@ -27,3 +35,292 @@ def test_get_upcoming_events_is_owner_scoped(): assert "join(CalendarCal)" in body assert "if owner is not None:" in body assert "q.filter(CalendarCal.owner == owner)" in body + + +class _Expr: + def __init__(self, op, field=None, value=None, children=()): + self.op = op + self.field = field + self.value = value + self.children = tuple(children) + + def __or__(self, other): + return _Expr("or", children=(self, other)) + + def __and__(self, other): + return _Expr("and", children=(self, other)) + + +class _Column: + def __init__(self, field): + self.field = field + + def __eq__(self, value): + return _Expr("eq", self.field, value) + + def __ne__(self, value): + return _Expr("ne", self.field, value) + + def __lt__(self, value): + return _Expr("lt", self.field, value) + + def __gt__(self, value): + return _Expr("gt", self.field, value) + + def is_(self, value): + return _Expr("is", self.field, value) + + def isnot(self, value): + return _Expr("isnot", self.field, value) + + +def _expr_contains(expr, field, value): + if isinstance(expr, _Expr): + if expr.field == field and expr.value == value: + return True + return any(_expr_contains(child, field, value) for child in expr.children) + return False + + +class _CalendarCal: + id = _Column("CalendarCal.id") + owner = _Column("CalendarCal.owner") + name = _Column("CalendarCal.name") + + +class _CalendarEvent: + uid = _Column("CalendarEvent.uid") + status = _Column("CalendarEvent.status") + rrule = _Column("CalendarEvent.rrule") + dtstart = _Column("CalendarEvent.dtstart") + dtend = _Column("CalendarEvent.dtend") + calendar_id = _Column("CalendarEvent.calendar_id") + + +class _FakeQuery: + def __init__(self, rows): + self.rows = rows + self.filter_calls = [] + self.owner_filter = None + self.all_called = False + + def join(self, *_args, **_kwargs): + return self + + def filter(self, *exprs): + self.filter_calls.append(exprs) + for expr in exprs: + if _expr_contains(expr, "CalendarCal.owner", "alice"): + self.owner_filter = "alice" + return self + + def order_by(self, *_args, **_kwargs): + return self + + def first(self): + return self.rows[0] if self.rows else None + + def all(self): + self.all_called = True + if self.owner_filter is None: + return list(self.rows) + return [ + row for row in self.rows + if getattr(getattr(row, "calendar", None), "owner", None) == self.owner_filter + ] + + +class _FakeSession: + def __init__(self, *, calendars=(), events=()): + self.calendar_query = _FakeQuery(list(calendars)) + self.event_query = _FakeQuery(list(events)) + self.add = MagicMock() + self.commit = MagicMock() + self.rollback = MagicMock() + self.close = MagicMock() + + def query(self, model): + if model is _CalendarCal: + return self.calendar_query + if model is _CalendarEvent: + return self.event_query + raise AssertionError(f"unexpected query model: {model!r}") + + +def _install_calendar_db_stub(monkeypatch): + db = types.ModuleType("core.database") + db.SessionLocal = MagicMock() + db.CalendarCal = _CalendarCal + db.CalendarEvent = _CalendarEvent + for name in [ + "Base", + "Document", + "DocumentVersion", + "Session", + "ChatMessage", + "GalleryImage", + "GalleryAlbum", + "Note", + "ScheduledTask", + "TaskRun", + "ModelEndpoint", + "Webhook", + ]: + setattr(db, name, MagicMock()) + monkeypatch.setitem(sys.modules, "core.database", db) + return db + + +def _install_multipart_stub(monkeypatch): + multipart = types.ModuleType("python_multipart") + multipart.__version__ = "0.0.20" + monkeypatch.setitem(sys.modules, "python_multipart", multipart) + + +def _import_calendar_routes(monkeypatch): + _install_calendar_db_stub(monkeypatch) + _install_multipart_stub(monkeypatch) + monkeypatch.delitem(sys.modules, "routes.calendar_routes", raising=False) + mod = __import__("routes.calendar_routes", fromlist=["setup_calendar_routes"]) + monkeypatch.setattr(mod, "or_", lambda *args: _Expr("or", children=args)) + monkeypatch.setattr(mod, "and_", lambda *args: _Expr("and", children=args)) + return mod + + +def _route_endpoint(calendar_routes, path, method): + router = calendar_routes.setup_calendar_routes() + full_path = f"/api/calendar{path}" + for route in router.routes: + if route.path == full_path and method in route.methods: + return route.endpoint + raise AssertionError(f"route not found: {method} {full_path}") + + +def _request(user="alice"): + return SimpleNamespace(state=SimpleNamespace(current_user=user)) + + +def _calendar(owner, cal_id="cal-target"): + return SimpleNamespace(id=cal_id, owner=owner, name=f"{owner or 'null'} calendar") + + +def _event(owner, uid): + return SimpleNamespace( + uid=uid, + calendar=_calendar(owner, cal_id=f"{owner or 'null'}-cal"), + calendar_id=f"{owner or 'null'}-cal", + dtstart=SimpleNamespace(isoformat=lambda: f"{uid}-start"), + dtend=SimpleNamespace(isoformat=lambda: f"{uid}-end"), + summary=uid, + description="", + location="", + all_day=False, + is_utc=False, + rrule="", + color=None, + event_type=None, + importance="normal", + ) + + +def test_create_event_rejects_null_owner_calendar_href_at_route_boundary(monkeypatch): + calendar_routes = _import_calendar_routes(monkeypatch) + session = _FakeSession(calendars=[_calendar(None)]) + monkeypatch.setattr(calendar_routes, "SessionLocal", lambda: session) + create_event = _route_endpoint(calendar_routes, "/events", "POST") + + with pytest.raises(HTTPException) as exc: + asyncio.run(create_event( + _request(), + calendar_routes.EventCreate( + summary="blocked", + dtstart="2026-06-02T10:00:00", + calendar_href="cal-target", + ), + )) + + assert exc.value.status_code == 404 + session.add.assert_not_called() + session.commit.assert_not_called() + session.close.assert_called_once() + + +def test_create_event_rejects_cross_owner_calendar_href_at_route_boundary(monkeypatch): + calendar_routes = _import_calendar_routes(monkeypatch) + session = _FakeSession(calendars=[_calendar("bob")]) + monkeypatch.setattr(calendar_routes, "SessionLocal", lambda: session) + create_event = _route_endpoint(calendar_routes, "/events", "POST") + + with pytest.raises(HTTPException) as exc: + asyncio.run(create_event( + _request(), + calendar_routes.EventCreate( + summary="blocked", + dtstart="2026-06-02T10:00:00", + calendar_href="cal-target", + ), + )) + + assert exc.value.status_code == 404 + session.add.assert_not_called() + session.commit.assert_not_called() + session.close.assert_called_once() + + +def test_list_events_filters_by_calendar_owner_before_output(monkeypatch): + calendar_routes = _import_calendar_routes(monkeypatch) + session = _FakeSession(events=[ + _event(None, "null-owner"), + _event("bob", "bob-event"), + _event("alice", "alice-event"), + ]) + monkeypatch.setattr(calendar_routes, "SessionLocal", lambda: session) + + expanded = [] + + def fake_expand(event, _start, _end): + assert event.calendar.owner == "alice" + expanded.append(event.uid) + return [{"uid": event.uid, "dtstart": "2026-06-02T10:00:00"}] + + monkeypatch.setattr(calendar_routes, "_expand_rrule", fake_expand) + list_events = _route_endpoint(calendar_routes, "/events", "GET") + + out = asyncio.run(list_events( + _request(), + start="2026-06-01T00:00:00", + end="2026-06-03T00:00:00", + )) + + assert out == {"events": [{"uid": "alice-event", "dtstart": "2026-06-02T10:00:00"}]} + assert expanded == ["alice-event"] + assert session.event_query.owner_filter == "alice" + session.close.assert_called_once() + + +def test_export_ics_rejects_null_owner_calendar_at_route_boundary(monkeypatch): + calendar_routes = _import_calendar_routes(monkeypatch) + session = _FakeSession(calendars=[_calendar(None)]) + monkeypatch.setattr(calendar_routes, "SessionLocal", lambda: session) + export_ics = _route_endpoint(calendar_routes, "/export/{cal_id}", "GET") + + with pytest.raises(HTTPException) as exc: + asyncio.run(export_ics(_request(), cal_id="cal-target")) + + assert exc.value.status_code == 404 + assert not session.event_query.all_called + session.close.assert_called_once() + + +def test_export_ics_rejects_cross_owner_calendar_at_route_boundary(monkeypatch): + calendar_routes = _import_calendar_routes(monkeypatch) + session = _FakeSession(calendars=[_calendar("bob")]) + monkeypatch.setattr(calendar_routes, "SessionLocal", lambda: session) + export_ics = _route_endpoint(calendar_routes, "/export/{cal_id}", "GET") + + with pytest.raises(HTTPException) as exc: + asyncio.run(export_ics(_request(), cal_id="cal-target")) + + assert exc.value.status_code == 404 + assert not session.event_query.all_called + session.close.assert_called_once()