Harden CalDAV credentials and URLs (#1310)

This commit is contained in:
Vykos
2026-06-02 19:50:02 +02:00
committed by GitHub
parent 56656de5bc
commit b2291fad49
3 changed files with 188 additions and 5 deletions

View File

@@ -528,13 +528,20 @@ def setup_calendar_routes() -> APIRouter:
owner = _require_user(request) owner = _require_user(request)
from routes.prefs_routes import _load_for_user from routes.prefs_routes import _load_for_user
cfg = (_load_for_user(owner) or {}).get("caldav", {}) or {} cfg = (_load_for_user(owner) or {}).get("caldav", {}) or {}
caldav_password = cfg.get("password") or ""
if caldav_password:
try:
from src.secret_storage import decrypt
caldav_password = decrypt(caldav_password)
except Exception:
pass
# Surface url+username but never hand the password back to the # Surface url+username but never hand the password back to the
# client — saved-state UI shouldn't leak the credential. # client — saved-state UI shouldn't leak the credential.
return { return {
"url": cfg.get("url", "") or "", "url": cfg.get("url", "") or "",
"username": cfg.get("username", "") or "", "username": cfg.get("username", "") or "",
"password": "", "password": "",
"has_password": bool(cfg.get("password")), "has_password": bool(caldav_password),
"local": not bool(cfg.get("url")), "local": not bool(cfg.get("url")),
} }
@@ -553,12 +560,20 @@ def setup_calendar_routes() -> APIRouter:
prefs.pop("caldav", None) prefs.pop("caldav", None)
_save_for_user(owner, prefs) _save_for_user(owner, prefs)
return {"ok": True, "cleared": True} return {"ok": True, "cleared": True}
cfg["url"] = body.get("url", "").strip() from src.caldav_sync import validate_caldav_url
try:
cfg["url"] = validate_caldav_url(body.get("url", ""))
except ValueError as e:
raise HTTPException(400, str(e))
cfg["username"] = (body.get("username") or "").strip() cfg["username"] = (body.get("username") or "").strip()
# Preserve the stored password when the client sends an empty # Preserve the stored password when the client sends an empty
# one (edit form re-submitted without re-typing the password). # one (edit form re-submitted without re-typing the password).
if body.get("password"): if body.get("password"):
cfg["password"] = body["password"] from src.secret_storage import encrypt
cfg["password"] = encrypt(body["password"])
elif cfg.get("password"):
from src.secret_storage import encrypt
cfg["password"] = encrypt(cfg["password"])
prefs["caldav"] = cfg prefs["caldav"] = cfg
_save_for_user(owner, prefs) _save_for_user(owner, prefs)
return {"ok": True} return {"ok": True}
@@ -585,9 +600,21 @@ def setup_calendar_routes() -> APIRouter:
cfg = (_load_for_user(owner) or {}).get("caldav", {}) or {} cfg = (_load_for_user(owner) or {}).get("caldav", {}) or {}
url = url or (cfg.get("url") or "") url = url or (cfg.get("url") or "")
user = user or (cfg.get("username") or "") user = user or (cfg.get("username") or "")
pw = pw or (cfg.get("password") or "") if not pw:
pw = cfg.get("password") or ""
if pw:
try:
from src.secret_storage import decrypt
pw = decrypt(pw)
except Exception:
pass
if not (url and user and pw): if not (url and user and pw):
return {"ok": False, "error": "Missing URL, username, or password"} return {"ok": False, "error": "Missing URL, username, or password"}
from src.caldav_sync import validate_caldav_url
try:
url = validate_caldav_url(url)
except ValueError as e:
return {"ok": False, "error": str(e)}
import httpx import httpx
propfind_body = ( propfind_body = (
'<?xml version="1.0" encoding="UTF-8"?>\n' '<?xml version="1.0" encoding="UTF-8"?>\n'
@@ -595,7 +622,7 @@ def setup_calendar_routes() -> APIRouter:
'</d:prop></d:propfind>' '</d:prop></d:propfind>'
) )
try: try:
async with httpx.AsyncClient(timeout=8.0, follow_redirects=True) as cx: async with httpx.AsyncClient(timeout=8.0, follow_redirects=False, trust_env=False) as cx:
r = await cx.request( r = await cx.request(
"PROPFIND", url, "PROPFIND", url,
auth=(user, pw), auth=(user, pw),
@@ -612,6 +639,8 @@ def setup_calendar_routes() -> APIRouter:
return {"ok": False, "error": "Forbidden — user can't access that URL"} return {"ok": False, "error": "Forbidden — user can't access that URL"}
if r.status_code == 404: if r.status_code == 404:
return {"ok": False, "error": "Not found — check the URL path"} return {"ok": False, "error": "Not found — check the URL path"}
if 300 <= r.status_code < 400:
return {"ok": False, "error": "Redirects are not followed for CalDAV safety; use the final URL"}
return {"ok": False, "error": f"HTTP {r.status_code}"} return {"ok": False, "error": f"HTTP {r.status_code}"}
except httpx.ConnectError as e: except httpx.ConnectError as e:
return {"ok": False, "error": f"Connection refused: {e}"[:200]} return {"ok": False, "error": f"Connection refused: {e}"[:200]}

View File

@@ -24,9 +24,12 @@ Design notes:
import asyncio import asyncio
import hashlib import hashlib
import ipaddress
import logging import logging
import os
import uuid import uuid
from datetime import date, datetime, timedelta, timezone from datetime import date, datetime, timedelta, timezone
from urllib.parse import urlparse, urlunparse
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -35,6 +38,52 @@ logger = logging.getLogger(__name__)
# events still come through via RRULE expansion on the frontend. # events still come through via RRULE expansion on the frontend.
_LOOKBACK_DAYS = 90 _LOOKBACK_DAYS = 90
_LOOKAHEAD_DAYS = 365 _LOOKAHEAD_DAYS = 365
_BLOCKED_HOSTS = {
"localhost",
"localhost.",
"ip6-localhost",
"metadata.google.internal",
}
def _private_caldav_allowed() -> bool:
return os.environ.get("ODYSSEUS_ALLOW_PRIVATE_CALDAV", "0").lower() in {"1", "true", "yes"}
def _validate_caldav_ip(host: str) -> None:
try:
ip = ipaddress.ip_address(host.strip("[]"))
except ValueError:
return
if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
raise ValueError("CalDAV URL host is not allowed")
if ip.is_private and not _private_caldav_allowed():
raise ValueError("Private CalDAV IPs require ODYSSEUS_ALLOW_PRIVATE_CALDAV=1")
def validate_caldav_url(raw_url: str) -> str:
"""Validate and normalize a user-provided CalDAV URL before server-side use."""
url = (raw_url or "").strip()
if not url:
raise ValueError("CalDAV URL is required")
parsed = urlparse(url)
if parsed.scheme not in {"http", "https"}:
raise ValueError("CalDAV URL must start with http:// or https://")
if not parsed.hostname:
raise ValueError("CalDAV URL must include a host")
if parsed.username or parsed.password:
raise ValueError("Put CalDAV credentials in the username/password fields, not the URL")
if parsed.fragment:
raise ValueError("CalDAV URL fragments are not allowed")
try:
parsed.port
except ValueError:
raise ValueError("CalDAV URL has an invalid port")
host = (parsed.hostname or "").lower()
if host in _BLOCKED_HOSTS or host.endswith(".localhost"):
raise ValueError("CalDAV URL host is not allowed")
_validate_caldav_ip(host)
return urlunparse(parsed._replace(fragment="")).rstrip("/")
def _stable_cal_id(remote_url: str) -> str: def _stable_cal_id(remote_url: str) -> str:
@@ -250,13 +299,21 @@ async def sync_caldav(owner: str) -> dict:
url = (cfg.get("url") or "").strip() url = (cfg.get("url") or "").strip()
user = (cfg.get("username") or "").strip() user = (cfg.get("username") or "").strip()
pw = cfg.get("password") or "" pw = cfg.get("password") or ""
try:
from src.secret_storage import decrypt
pw = decrypt(pw)
except Exception:
pass
if not (url and user and pw): if not (url and user and pw):
return { return {
"calendars": 0, "events": 0, "deleted": 0, "calendars": 0, "events": 0, "deleted": 0,
"errors": ["CalDAV is not configured"], "errors": ["CalDAV is not configured"],
} }
try: try:
url = validate_caldav_url(url)
return await asyncio.to_thread(_sync_blocking, owner, url, user, pw) return await asyncio.to_thread(_sync_blocking, owner, url, user, pw)
except ValueError as e:
return {"calendars": 0, "events": 0, "deleted": 0, "errors": [str(e)]}
except Exception as e: except Exception as e:
logger.exception("CalDAV sync raised") logger.exception("CalDAV sync raised")
return {"calendars": 0, "events": 0, "deleted": 0, "errors": [str(e)[:200]]} return {"calendars": 0, "events": 0, "deleted": 0, "errors": [str(e)[:200]]}

View File

@@ -0,0 +1,97 @@
import asyncio
import sys
import types
from pathlib import Path
import pytest
from src import caldav_sync
def test_validate_caldav_url_normalizes_safe_url():
assert (
caldav_sync.validate_caldav_url(" https://calendar.example.com/dav/ ")
== "https://calendar.example.com/dav"
)
@pytest.mark.parametrize(
"url, message",
[
("ftp://calendar.example.com/dav", "must start with"),
("https://alice:secret@calendar.example.com/dav", "credentials"),
("https://calendar.example.com/dav#frag", "fragments"),
("http://localhost:5232/dav", "host is not allowed"),
("http://service.localhost/dav", "host is not allowed"),
("http://127.0.0.1:5232/dav", "host is not allowed"),
("http://[::1]:5232/dav", "host is not allowed"),
("http://169.254.169.254/latest", "host is not allowed"),
],
)
def test_validate_caldav_url_rejects_unsafe_urls(url, message):
with pytest.raises(ValueError, match=message):
caldav_sync.validate_caldav_url(url)
def test_validate_caldav_url_blocks_private_ips_unless_explicitly_allowed(monkeypatch):
monkeypatch.delenv("ODYSSEUS_ALLOW_PRIVATE_CALDAV", raising=False)
with pytest.raises(ValueError, match="Private CalDAV IPs require"):
caldav_sync.validate_caldav_url("http://10.0.0.5:5232/dav")
monkeypatch.setenv("ODYSSEUS_ALLOW_PRIVATE_CALDAV", "1")
assert caldav_sync.validate_caldav_url("http://10.0.0.5:5232/dav") == "http://10.0.0.5:5232/dav"
def test_sync_caldav_decrypts_stored_password_and_validates_url(monkeypatch):
prefs_mod = types.ModuleType("routes.prefs_routes")
prefs_mod._load_for_user = lambda owner: {
"caldav": {
"url": " https://calendar.example.com/dav/ ",
"username": owner,
"password": "enc:stored",
}
}
monkeypatch.setitem(sys.modules, "routes.prefs_routes", prefs_mod)
secret_mod = types.ModuleType("src.secret_storage")
secret_mod.decrypt = lambda value: "decrypted-password" if value == "enc:stored" else value
monkeypatch.setitem(sys.modules, "src.secret_storage", secret_mod)
captured = {}
def fake_sync_blocking(owner, url, username, password):
captured.update(
{
"owner": owner,
"url": url,
"username": username,
"password": password,
}
)
return {"calendars": 1, "events": 0, "deleted": 0, "errors": []}
async def inline_to_thread(func, *args, **kwargs):
return func(*args, **kwargs)
monkeypatch.setattr(caldav_sync, "_sync_blocking", fake_sync_blocking)
monkeypatch.setattr(caldav_sync.asyncio, "to_thread", inline_to_thread)
result = asyncio.run(caldav_sync.sync_caldav("alice"))
assert result["calendars"] == 1
assert captured == {
"owner": "alice",
"url": "https://calendar.example.com/dav",
"username": "alice",
"password": "decrypted-password",
}
def test_calendar_routes_use_hardened_caldav_client_and_secret_storage():
text = Path("routes/calendar_routes.py").read_text(encoding="utf-8")
assert "validate_caldav_url(body.get(\"url\", \"\"))" in text
assert "cfg[\"password\"] = encrypt(body[\"password\"])" in text
assert "pw = decrypt(pw)" in text
assert "follow_redirects=False, trust_env=False" in text
assert "Redirects are not followed for CalDAV safety" in text