diff --git a/routes/email_helpers.py b/routes/email_helpers.py
index 409c6c4..fef2944 100644
--- a/routes/email_helpers.py
+++ b/routes/email_helpers.py
@@ -266,6 +266,78 @@ COMPOSE_UPLOADS_DIR.mkdir(parents=True, exist_ok=True)
 SCHEDULED_DB = DATA_DIR / "scheduled_emails.db"
 
 
+OWNER_SCOPED_EMAIL_CACHE_TABLES = {
+    "email_summaries",
+    "email_ai_replies",
+    "email_calendar_extractions",
+    "email_urgency_alerts",
+}
+
+
+def _email_cache_owner_clause(owner: str = "") -> tuple[str, tuple[str, ...]]:
+    """Return a SQL predicate and its bound parameters for one cache owner.
+
+    A non-blank owner yields ``owner = ?`` bound to the stripped owner. A
+    blank or ``None`` owner yields a parameterless predicate matching rows
+    whose owner is '' or NULL.
+    """
+    owner = (owner or "").strip()
+    if owner:
+        return "owner = ?", (owner,)
+    return "(owner = '' OR owner IS NULL)", ()
+
+
+def _ensure_owner_scoped_email_cache_table(conn, table: str, create_sql: str, columns: list[str]):
+    """Rebuild legacy Message-ID-only cache tables with owner in the PK.
+
+    ``create_sql`` is the ``CREATE TABLE IF NOT EXISTS`` statement for
+    ``table``; ``columns`` names the columns to carry over from a legacy
+    table (those missing from the legacy table are skipped, and rows with
+    no owner get ''). ``table`` is interpolated into SQL text, so it must be
+    a trusted identifier, never user input.
+
+    The rebuild runs inside a SAVEPOINT so it is all-or-nothing: if any step
+    fails, the legacy table and its rows are restored and no
+    ``<table>__old`` is left behind, instead of leaving an empty new table
+    with the legacy rows orphaned. Failures are logged, not raised.
+    """
+    conn.execute(create_sql)
+    try:
+        # PRAGMA table_info rows: r[1] is the column name, r[5] the column's
+        # 1-based position in the primary key (0 when not part of it).
+        info = conn.execute(f"PRAGMA table_info({table})").fetchall()
+        cols = [r[1] for r in info]
+        pk_cols = [r[1] for r in sorted((r for r in info if r[5]), key=lambda r: r[5])]
+        if "owner" in cols and pk_cols == ["message_id", "owner"]:
+            return
+
+        savepoint = f"{table}__owner_migration"
+        conn.execute(f"SAVEPOINT {savepoint}")
+        try:
+            conn.execute(f"ALTER TABLE {table} RENAME TO {table}__old")
+            conn.execute(create_sql)
+            # The renamed table still has the legacy columns read into `cols`.
+            copy_cols = [c for c in columns if c != "owner" and c in cols]
+            source_owner = "COALESCE(owner, '')" if "owner" in cols else "''"
+            target_cols = ["owner", *copy_cols]
+            select_exprs = [source_owner, *copy_cols]
+            conn.execute(
+                f"INSERT OR IGNORE INTO {table} ({', '.join(target_cols)}) "
+                f"SELECT {', '.join(select_exprs)} FROM {table}__old"
+            )
+            conn.execute(f"DROP TABLE {table}__old")
+        except Exception:
+            # Undo the partial rebuild so the legacy table and rows survive.
+            conn.execute(f"ROLLBACK TO {savepoint}")
+            raise
+        finally:
+            # ROLLBACK TO keeps the savepoint open; always release it.
+            conn.execute(f"RELEASE {savepoint}")
+    except Exception as _mig_e:
+        import logging as _lg
+        _lg.getLogger(__name__).warning(f"{table} owner-migration skipped: {_mig_e}")
+
+
 def attachment_extract_dir(folder: str, uid: str) -> Path:
     """Containment-safe extraction directory for an attachment.
 
@@ -301,30 +343,35 @@ def _init_scheduled_db(): owner TEXT DEFAULT '' ) """) - # Email summary cache (keyed by Message-ID) - conn.execute(""" + # Email summary cache. SECURITY: Message-IDs are global, so AI-derived + # cache rows must be owner-scoped just like email_tags. + _ensure_owner_scoped_email_cache_table(conn, "email_summaries", """ CREATE TABLE IF NOT EXISTS email_summaries ( - message_id TEXT PRIMARY KEY, + message_id TEXT, + owner TEXT DEFAULT '', uid TEXT, folder TEXT, subject TEXT, sender TEXT, summary TEXT NOT NULL, model_used TEXT, - created_at TEXT NOT NULL + created_at TEXT NOT NULL, + PRIMARY KEY (message_id, owner) ) - """) + """, ["message_id", "owner", "uid", "folder", "subject", "sender", "summary", "model_used", "created_at"]) # Email AI reply cache (pre-generated draft replies) - conn.execute(""" + _ensure_owner_scoped_email_cache_table(conn, "email_ai_replies", """ CREATE TABLE IF NOT EXISTS email_ai_replies ( - message_id TEXT PRIMARY KEY, + message_id TEXT, + owner TEXT DEFAULT '', uid TEXT, folder TEXT, reply TEXT NOT NULL, model_used TEXT, - created_at TEXT NOT NULL + created_at TEXT NOT NULL, + PRIMARY KEY (message_id, owner) ) - """) + """, ["message_id", "owner", "uid", "folder", "reply", "model_used", "created_at"]) # Email tags / spam classification cache. SECURITY: keyed by # (message_id, owner) because Message-IDs are GLOBAL (a newsletter goes # to many users with the same Message-ID). 
Without owner-scoping, a @@ -384,17 +431,20 @@ def _init_scheduled_db(): # Best-effort — log via the module logger if available import logging as _lg _lg.getLogger(__name__).warning(f"email_tags owner-migration skipped: {_mig_e}") - conn.execute(""" + _ensure_owner_scoped_email_cache_table(conn, "email_calendar_extractions", """ CREATE TABLE IF NOT EXISTS email_calendar_extractions ( - message_id TEXT PRIMARY KEY, + message_id TEXT, + owner TEXT DEFAULT '', uid TEXT, events_created INTEGER DEFAULT 0, - created_at TEXT NOT NULL + created_at TEXT NOT NULL, + PRIMARY KEY (message_id, owner) ) - """) - conn.execute(""" + """, ["message_id", "owner", "uid", "events_created", "created_at"]) + _ensure_owner_scoped_email_cache_table(conn, "email_urgency_alerts", """ CREATE TABLE IF NOT EXISTS email_urgency_alerts ( - message_id TEXT PRIMARY KEY, + message_id TEXT, + owner TEXT DEFAULT '', uid TEXT, folder TEXT, subject TEXT, @@ -402,9 +452,10 @@ def _init_scheduled_db(): urgency TEXT, reason TEXT, alerted INTEGER DEFAULT 0, - created_at TEXT NOT NULL + created_at TEXT NOT NULL, + PRIMARY KEY (message_id, owner) ) - """) + """, ["message_id", "owner", "uid", "folder", "subject", "sender", "urgency", "reason", "alerted", "created_at"]) conn.execute(""" CREATE TABLE IF NOT EXISTS email_event_seen ( owner TEXT NOT NULL, diff --git a/routes/email_pollers.py b/routes/email_pollers.py index 7bed2f6..04ffb0a 100644 --- a/routes/email_pollers.py +++ b/routes/email_pollers.py @@ -39,7 +39,7 @@ from routes.email_helpers import ( _extract_attachment_text, _extract_text, _pre_retrieve_context, _attach_compose_uploads, _cleanup_compose_uploads, _q, - SCHEDULED_DB, _EMAIL_REPLY_SYS_PROMPT_BASE, + SCHEDULED_DB, _EMAIL_REPLY_SYS_PROMPT_BASE, _email_cache_owner_clause, ) logger = logging.getLogger(__name__) @@ -243,8 +243,15 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None await _emit_progress(progress_cb, f"Found {len(uid_list)} recent email(s); checking 
cache…") _c = _sql3.connect(SCHEDULED_DB) - _sum_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_summaries").fetchall()} - _reply_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_ai_replies").fetchall()} + _cache_owner_clause, _cache_owner_params = _email_cache_owner_clause(account_owner) + _sum_existing = {r[0] for r in _c.execute( + f"SELECT message_id FROM email_summaries WHERE {_cache_owner_clause}", + _cache_owner_params, + ).fetchall()} + _reply_existing = {r[0] for r in _c.execute( + f"SELECT message_id FROM email_ai_replies WHERE {_cache_owner_clause}", + _cache_owner_params, + ).fetchall()} if auto_tag or auto_spam: if account_owner: _tag_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_tags WHERE owner=?", (account_owner,)).fetchall()} @@ -252,12 +259,18 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None _tag_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_tags WHERE owner='' OR owner IS NULL").fetchall()} else: _tag_existing = set() - _cal_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_calendar_extractions").fetchall()} if auto_cal else set() + _cal_existing = {r[0] for r in _c.execute( + f"SELECT message_id FROM email_calendar_extractions WHERE {_cache_owner_clause}", + _cache_owner_params, + ).fetchall()} if auto_cal else set() # Urgency is handled by the built-in `check_email_urgency` task. Keep # this legacy poller path disabled so users don't get two independent # urgent-email systems. 
auto_urgent = False - _urgent_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_urgency_alerts").fetchall()} if auto_urgent else set() + _urgent_existing = {r[0] for r in _c.execute( + f"SELECT message_id FROM email_urgency_alerts WHERE {_cache_owner_clause}", + _cache_owner_params, + ).fetchall()} if auto_urgent else set() _c.close() # Hoist the self-address lookup OUT of the per-email loop — fetching @@ -415,9 +428,9 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None _c = _sql3.connect(SCHEDULED_DB) _c.execute(""" INSERT OR REPLACE INTO email_summaries - (message_id, uid, folder, subject, sender, summary, model_used, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, (message_id, uid.decode() if isinstance(uid, bytes) else str(uid), _folder, subject, sender, summary, model, datetime.utcnow().isoformat())) + (message_id, owner, uid, folder, subject, sender, summary, model_used, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, (message_id, account_owner or "", uid.decode() if isinstance(uid, bytes) else str(uid), _folder, subject, sender, summary, model, datetime.utcnow().isoformat())) _c.commit() _c.close() _sum_existing.add(message_id) @@ -458,9 +471,9 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None _c = _sql3.connect(SCHEDULED_DB) _c.execute(""" INSERT OR REPLACE INTO email_ai_replies - (message_id, uid, folder, reply, model_used, created_at) - VALUES (?, ?, ?, ?, ?, ?) - """, (message_id, uid.decode() if isinstance(uid, bytes) else str(uid), _folder, reply, model, datetime.utcnow().isoformat())) + (message_id, owner, uid, folder, reply, model_used, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) 
+ """, (message_id, account_owner or "", uid.decode() if isinstance(uid, bytes) else str(uid), _folder, reply, model, datetime.utcnow().isoformat())) _c.commit() _c.close() _reply_existing.add(message_id) @@ -675,8 +688,8 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None _cc = _sql3.connect(SCHEDULED_DB) _cc.execute( "INSERT OR REPLACE INTO email_calendar_extractions " - "(message_id, uid, events_created, created_at) VALUES (?, ?, ?, ?)", - (message_id, uid.decode() if isinstance(uid, bytes) else str(uid), + "(message_id, owner, uid, events_created, created_at) VALUES (?, ?, ?, ?, ?)", + (message_id, account_owner or "", uid.decode() if isinstance(uid, bytes) else str(uid), _cal_run_count, datetime.utcnow().isoformat()) ) _cc.commit() @@ -733,9 +746,9 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None _uc = _sql3.connect(SCHEDULED_DB) _uc.execute( "INSERT OR REPLACE INTO email_urgency_alerts " - "(message_id, uid, folder, subject, sender, urgency, reason, alerted, created_at) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", - (message_id, uid.decode() if isinstance(uid, bytes) else str(uid), + "(message_id, owner, uid, folder, subject, sender, urgency, reason, alerted, created_at) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (message_id, account_owner or "", uid.decode() if isinstance(uid, bytes) else str(uid), _folder, subject, sender, urgency, reason, 1 if urgency in ("critical", "high") else 0, datetime.utcnow().isoformat()) diff --git a/routes/email_routes.py b/routes/email_routes.py index 87f8e76..7ab033b 100644 --- a/routes/email_routes.py +++ b/routes/email_routes.py @@ -49,7 +49,7 @@ from routes.email_helpers import ( _EMAIL_REPLY_SYS_PROMPT_BASE, _POOL_HOOKS, SendEmailRequest, ExtractStyleRequest, ATTACHMENTS_DIR, COMPOSE_UPLOADS_DIR, SCHEDULED_DB, - attachment_extract_dir, + attachment_extract_dir, _email_cache_owner_clause, ) from routes.email_pollers import _start_poller @@ -934,9 +934,11 @@ 
def setup_email_routes(): import sqlite3 as _sql3 _c = _sql3.connect(SCHEDULED_DB) placeholders = ",".join("?" * len(ids)) + owner_clause, owner_params = _email_cache_owner_clause(owner) rows = _c.execute( - f"SELECT message_id, summary FROM email_summaries WHERE message_id IN ({placeholders})", - ids, + f"SELECT message_id, summary FROM email_summaries " + f"WHERE message_id IN ({placeholders}) AND {owner_clause}", + (*ids, *owner_params), ).fetchall() _c.close() by_id = {r[0]: r[1] for r in rows} @@ -1219,15 +1221,16 @@ def setup_email_routes(): try: import sqlite3 as _sql3 _c = _sql3.connect(SCHEDULED_DB) + owner_clause, owner_params = _email_cache_owner_clause(owner) _row = _c.execute( - "SELECT summary FROM email_summaries WHERE message_id = ?", - (message_id.strip(),), + f"SELECT summary FROM email_summaries WHERE message_id = ? AND {owner_clause}", + (message_id.strip(), *owner_params), ).fetchone() if _row: cached_summary = _row[0] _row2 = _c.execute( - "SELECT reply FROM email_ai_replies WHERE message_id = ?", - (message_id.strip(),), + f"SELECT reply FROM email_ai_replies WHERE message_id = ? AND {owner_clause}", + (message_id.strip(), *owner_params), ).fetchone() if _row2: cached_ai_reply = _apply_email_style_mechanics(_extract_reply(_row2[0] or "")) @@ -2549,10 +2552,10 @@ def setup_email_routes(): _c = _sql3.connect(SCHEDULED_DB) _c.execute(""" INSERT OR REPLACE INTO email_summaries - (message_id, uid, folder, subject, sender, summary, model_used, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + (message_id, owner, uid, folder, subject, sender, summary, model_used, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( - mid, data.get("uid", ""), data.get("folder", ""), + mid, owner, data.get("uid", ""), data.get("folder", ""), subject, sender, content, model, datetime.utcnow().isoformat(), )) _c.commit() @@ -2587,9 +2590,10 @@ def setup_email_routes(): if message_id: try: _c = _sql3.connect(SCHEDULED_DB) + owner_clause, owner_params = _email_cache_owner_clause(owner) _row = _c.execute( - "SELECT reply, model_used FROM email_ai_replies WHERE message_id = ?", - (message_id,), + f"SELECT reply, model_used FROM email_ai_replies WHERE message_id = ? AND {owner_clause}", + (message_id, *owner_params), ).fetchone() _c.close() if _row and _row[0]: @@ -2791,9 +2795,9 @@ def setup_email_routes(): _c = _sql3.connect(SCHEDULED_DB) _c.execute(""" INSERT OR REPLACE INTO email_ai_replies - (message_id, uid, folder, reply, model_used, created_at) - VALUES (?, ?, ?, ?, ?, ?) - """, (message_id, source_uid, source_folder, reply, model, datetime.utcnow().isoformat())) + (message_id, owner, uid, folder, reply, model_used, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (message_id, owner, source_uid, source_folder, reply, model, datetime.utcnow().isoformat())) _c.commit() _c.close() except Exception as e: diff --git a/routes/task_routes.py b/routes/task_routes.py index ebe9da1..a5c49ad 100644 --- a/routes/task_routes.py +++ b/routes/task_routes.py @@ -455,7 +455,7 @@ def setup_task_routes(task_scheduler) -> APIRouter: import sqlite3 from pathlib import Path - from routes.email_helpers import SCHEDULED_DB + from routes.email_helpers import SCHEDULED_DB, OWNER_SCOPED_EMAIL_CACHE_TABLES, _email_cache_owner_clause cleared = {} conn = sqlite3.connect(SCHEDULED_DB) @@ -468,6 +468,13 @@ def setup_task_routes(task_scheduler) -> APIRouter: (user,), ).fetchone()[0] conn.execute("DELETE FROM email_tags WHERE owner = ? 
OR owner = ''", (user,)) + elif table in OWNER_SCOPED_EMAIL_CACHE_TABLES and user: + owner_clause, owner_params = _email_cache_owner_clause(user) + before = conn.execute( + f"SELECT COUNT(*) FROM {table} WHERE {owner_clause}", + owner_params, + ).fetchone()[0] + conn.execute(f"DELETE FROM {table} WHERE {owner_clause}", owner_params) else: before = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] conn.execute(f"DELETE FROM {table}") diff --git a/tests/test_email_owner_scope.py b/tests/test_email_owner_scope.py index 5445e17..2c04db2 100644 --- a/tests/test_email_owner_scope.py +++ b/tests/test_email_owner_scope.py @@ -43,6 +43,129 @@ def test_email_tag_clause_keeps_legacy_rows_for_single_user_mode(monkeypatch): assert params == [""] +def test_email_ai_cache_tables_are_owner_scoped_and_migrate_legacy_rows(tmp_path, monkeypatch): + import routes.email_helpers as email_helpers + + db_path = tmp_path / "scheduled_emails.db" + monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path) + + conn = sqlite3.connect(db_path) + conn.execute( + """ + CREATE TABLE email_summaries ( + message_id TEXT PRIMARY KEY, + uid TEXT, + folder TEXT, + subject TEXT, + sender TEXT, + summary TEXT NOT NULL, + model_used TEXT, + created_at TEXT NOT NULL + ) + """ + ) + conn.execute( + """ + INSERT INTO email_summaries + (message_id, uid, folder, subject, sender, summary, model_used, created_at) + VALUES ('', '1', 'INBOX', 'Subject', 'a@example.com', 'legacy', 'm', '2026-01-01') + """ + ) + conn.commit() + conn.close() + + email_helpers._init_scheduled_db() + + conn = sqlite3.connect(db_path) + try: + for table in ( + "email_summaries", + "email_ai_replies", + "email_calendar_extractions", + "email_urgency_alerts", + ): + info = conn.execute(f"PRAGMA table_info({table})").fetchall() + pk_cols = [r[1] for r in sorted((r for r in info if r[5]), key=lambda r: r[5])] + assert pk_cols == ["message_id", "owner"] + assert conn.execute( + "SELECT owner, summary FROM email_summaries WHERE 
message_id=?", + ("",), + ).fetchone() == ("", "legacy") + + conn.execute( + """ + INSERT INTO email_summaries + (message_id, owner, uid, folder, subject, sender, summary, model_used, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ("", "alice", "2", "INBOX", "Subject", "a@example.com", "alice", "m", "2026-01-02"), + ) + conn.execute( + """ + INSERT INTO email_summaries + (message_id, owner, uid, folder, subject, sender, summary, model_used, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ("", "bob", "3", "INBOX", "Subject", "a@example.com", "bob", "m", "2026-01-03"), + ) + rows = conn.execute( + "SELECT owner, summary FROM email_summaries WHERE message_id=? ORDER BY owner", + ("",), + ).fetchall() + assert rows == [("", "legacy"), ("alice", "alice"), ("bob", "bob")] + finally: + conn.close() + + +@pytest.mark.asyncio +async def test_ai_reply_cache_lookup_is_owner_scoped(tmp_path, monkeypatch): + import routes.email_helpers as email_helpers + import routes.email_routes as email_routes + + db_path = tmp_path / "scheduled_emails.db" + monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path) + monkeypatch.setattr(email_routes, "SCHEDULED_DB", db_path) + email_helpers._init_scheduled_db() + + conn = sqlite3.connect(db_path) + conn.execute( + """ + INSERT INTO email_ai_replies + (message_id, owner, uid, folder, reply, model_used, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ("", "alice", "1", "INBOX", "alice private draft", "m-a", "2026-01-01"), + ) + conn.execute( + """ + INSERT INTO email_ai_replies + (message_id, owner, uid, folder, reply, model_used, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) 
+ """, + ("", "bob", "2", "INBOX", "bob private draft", "m-b", "2026-01-02"), + ) + conn.commit() + conn.close() + + router = email_routes.setup_email_routes() + ai_reply = _route_endpoint(router, "/api/email/ai-reply", "POST") + + result = await ai_reply( + { + "to": "sender@example.com", + "subject": "Subject", + "original_body": "Body", + "message_id": "", + }, + owner="bob", + ) + + assert result["success"] is True + assert result["cached"] is True + assert result["reply"] == "bob private draft" + assert result["model_used"] == "m-b" + + @pytest.mark.asyncio async def test_scheduled_email_routes_are_owner_scoped(tmp_path, monkeypatch): import routes.email_helpers as email_helpers