fix(calendar): scope CalDAV event lookup by calendar
* fix: CalDAV sync hijacks another user's event sharing a VEVENT uid * Seed schema-valid dtstart/dtend in caldav uid-scope test fixture
This commit is contained in:
@@ -105,6 +105,25 @@ def _to_utc_naive(dt):
|
||||
return datetime(dt.year, dt.month, dt.day), True
|
||||
|
||||
|
||||
def _find_existing_event(db, pending, uid_val, calendar_id):
|
||||
"""Find the event to update for THIS calendar.
|
||||
|
||||
CalendarEvent.uid is the global primary key, so an unscoped lookup by uid
|
||||
returns whatever row holds that VEVENT uid — including another owner's.
|
||||
The old code then reassigned that row's calendar_id, moving (stealing)
|
||||
another user's event into the syncing calendar whenever the two share a
|
||||
uid (shared/subscribed/public calendars, or two accounts on one server).
|
||||
Scope the lookup to the calendar being synced; a genuine cross-user uid
|
||||
collision then fails the PK insert inside the per-calendar try/except
|
||||
instead of hijacking the row. (import_ics was already fixed this way.)
|
||||
"""
|
||||
from core.database import CalendarEvent
|
||||
return pending.get(uid_val) or db.query(CalendarEvent).filter(
|
||||
CalendarEvent.uid == uid_val,
|
||||
CalendarEvent.calendar_id == calendar_id,
|
||||
).first()
|
||||
|
||||
|
||||
def _sync_blocking(owner: str, url: str, username: str, password: str) -> dict:
|
||||
"""The actual sync — synchronous, intended to run in a threadpool.
|
||||
Returns counts: {calendars, events, deleted, errors}."""
|
||||
@@ -235,9 +254,7 @@ def _sync_blocking(owner: str, url: str, username: str, password: str) -> dict:
|
||||
else ""
|
||||
)
|
||||
|
||||
existing = pending.get(uid_val) or db.query(CalendarEvent).filter(
|
||||
CalendarEvent.uid == uid_val,
|
||||
).first()
|
||||
existing = _find_existing_event(db, pending, uid_val, local_cal.id)
|
||||
if existing:
|
||||
existing.calendar_id = local_cal.id
|
||||
existing.summary = summary
|
||||
|
||||
76
tests/test_caldav_sync_uid_scope.py
Normal file
76
tests/test_caldav_sync_uid_scope.py
Normal file
@@ -0,0 +1,76 @@
|
||||
"""CalDAV sync must not hijack another user's event via a shared VEVENT uid.
|
||||
|
||||
CalendarEvent.uid is the global primary key. _sync_blocking looked up the
|
||||
existing event by uid with NO calendar scope, so when user B synced a uid
|
||||
that user A's calendar already held, the query returned A's row and the sync
|
||||
reassigned its calendar_id to B's calendar — stealing A's event. The lookup
|
||||
must be scoped to the calendar being synced.
|
||||
"""
|
||||
import tempfile
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.pool import NullPool
|
||||
|
||||
import core.database as cdb
|
||||
from core.database import CalendarEvent, CalendarCal
|
||||
from src.caldav_sync import _find_existing_event
|
||||
|
||||
_TMPDB = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
|
||||
_ENGINE = create_engine(f"sqlite:///{_TMPDB.name}", connect_args={"check_same_thread": False}, poolclass=NullPool)
|
||||
cdb.Base.metadata.create_all(_ENGINE)
|
||||
_TS = sessionmaker(bind=_ENGINE, autoflush=False, autocommit=False)
|
||||
|
||||
|
||||
def _setup():
|
||||
db = _TS()
|
||||
try:
|
||||
db.query(CalendarEvent).delete(); db.query(CalendarCal).delete()
|
||||
db.add(CalendarCal(id="calA", owner="alice", name="A"))
|
||||
db.add(CalendarCal(id="calB", owner="bob", name="B"))
|
||||
# dtstart/dtend are NOT NULL in the schema, so seed valid values.
|
||||
db.add(CalendarEvent(
|
||||
uid="shared@svc", calendar_id="calA", summary="Alice event",
|
||||
dtstart=datetime(2026, 6, 4, 9, 0), dtend=datetime(2026, 6, 4, 10, 0),
|
||||
))
|
||||
db.commit()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def test_lookup_for_other_calendar_does_not_find_a_users_event():
|
||||
_setup()
|
||||
db = _TS()
|
||||
try:
|
||||
# Bob's calendar syncing the same uid must NOT resolve Alice's row.
|
||||
assert _find_existing_event(db, {}, "shared@svc", "calB") is None
|
||||
# Same calendar still resolves its own event (normal update path).
|
||||
own = _find_existing_event(db, {}, "shared@svc", "calA")
|
||||
assert own is not None and own.calendar_id == "calA"
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def test_alice_event_is_not_moved():
|
||||
_setup()
|
||||
db = _TS()
|
||||
try:
|
||||
# Simulate the (fixed) sync deciding there is no existing row for calB.
|
||||
assert _find_existing_event(db, {}, "shared@svc", "calB") is None
|
||||
ev = db.query(CalendarEvent).filter(CalendarEvent.uid == "shared@svc").first()
|
||||
assert ev.calendar_id == "calA" # unchanged — not hijacked
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def test_pending_takes_precedence():
|
||||
_setup()
|
||||
db = _TS()
|
||||
try:
|
||||
sentinel = object()
|
||||
assert _find_existing_event(db, {"shared@svc": sentinel}, "shared@svc", "calB") is sentinel
|
||||
finally:
|
||||
db.close()
|
||||
Reference in New Issue
Block a user