Files
odysseus/tests/test_webhook_ssrf_resilience.py
2026-06-04 00:27:29 +01:00

111 lines
3.2 KiB
Python

import sys
import json
from datetime import datetime
# conftest.py replaces src.database with a fake module. webhook_manager
# imports from it, so evict the fake here and let the real module under test
# be loaded instead.
sys.modules.pop("src.database", None)

_core_database = sys.modules.get("core.database")
# getattr() on None simply yields the default, so no separate None guard is
# needed to read __all__.
_core_database_all = getattr(_core_database, "__all__", None)


def _looks_like_stub(module, exported):
    """Return True when *module* has no ``__file__`` or a malformed ``__all__``.

    ``exported`` is the module's ``__all__`` value (or None when absent). It is
    considered malformed when it is not a list/tuple/set or when any entry is
    not a string.
    """
    if not getattr(module, "__file__", None):
        return True
    if exported is None:
        return False
    if not isinstance(exported, (list, tuple, set)):
        return True
    return any(not isinstance(entry, str) for entry in exported)


# Drop a cached core.database that fails the checks above (presumably another
# test stub -- confirm against conftest.py) so a later import reloads it.
if _core_database is not None and _looks_like_stub(_core_database, _core_database_all):
    del sys.modules["core.database"]
import pytest
from src.webhook_manager import validate_webhook_url
def test_webhook_url_ssrf_mitigation():
    """Check that internal-address URLs are rejected and a public IP is accepted."""
    # SSRF bypass candidates that must all be refused: the IPv6 unspecified
    # address, IPv4-mapped IPv6 forms of loopback and the cloud metadata
    # endpoint, plus the plain IPv4 loopback and unspecified addresses.
    blocked_targets = (
        "http://[::]/",
        "http://[::ffff:127.0.0.1]/",
        "http://[::ffff:169.254.169.254]/",
        "http://127.0.0.1/",
        "http://0.0.0.0/",
    )
    for target in blocked_targets:
        with pytest.raises(ValueError) as rejection:
            validate_webhook_url(target)
        # Inspect the message only after the context manager has captured
        # the exception.
        message = str(rejection.value)
        assert "private/internal addresses" in message

    # A clearly public IP literal must still come back unchanged.
    allowed_target = "http://93.184.216.34/"
    validated = validate_webhook_url(allowed_target)
    assert validated == allowed_target
@pytest.mark.asyncio
async def test_webhook_delivery_uses_naive_utc_timestamps(monkeypatch):
    """Run one delivery through in-memory fakes and check its timestamps are naive.

    The database session and HTTP client are replaced with local stand-ins so
    the test can inspect the posted body and the values written back via
    ``update``. Only the absence of ``tzinfo`` is asserted; the UTC part of the
    test name is not verified here.
    """
    import src.webhook_manager as wm

    class _RecordingQuery:
        """Query stand-in that appends every ``update`` payload to a shared list."""

        def __init__(self, updates):
            self.updates = updates

        def filter(self, *_args, **_kwargs):
            # Filter arguments are ignored; return self so calls can chain.
            return self

        def update(self, values):
            self.updates.append(values)

    class _FakeSession:
        """Session stand-in that tracks updates, commit and close."""

        def __init__(self):
            self.updates = []
            self.committed = False
            self.closed = False

        def query(self, _model):
            # Every query shares the session's update log.
            return _RecordingQuery(self.updates)

        def commit(self):
            self.committed = True

        def rollback(self):
            pass

        def close(self):
            self.closed = True

    class _NoContentResponse:
        """Response stand-in exposing only a 204 status code."""

        status_code = 204

    class _CapturingClient:
        """HTTP client stand-in that keeps the last posted body."""

        def __init__(self):
            self.content = ""

        async def post(self, _url, content, headers):
            self.content = content
            assert headers["X-Odysseus-Event"] == "webhook.test"
            return _NoContentResponse()

    session = _FakeSession()
    http_client = _CapturingClient()
    monkeypatch.setattr(wm, "SessionLocal", lambda: session)

    manager = wm.WebhookManager()
    # Close the client the manager created itself before swapping in the fake.
    await manager._client.aclose()
    manager._client = http_client

    await manager._deliver(
        "hook-1",
        "http://93.184.216.34/",
        None,
        "webhook.test",
        {"ok": True},
    )

    # The posted payload's timestamp must parse to a naive datetime.
    sent_body = json.loads(http_client.content)
    payload_timestamp = datetime.fromisoformat(sent_body["timestamp"])
    assert payload_timestamp.tzinfo is None

    # The first recorded update carries the trigger time and response status.
    first_update = session.updates[0]
    assert first_update["last_triggered_at"].tzinfo is None
    assert first_update["last_status_code"] == 204
    assert session.committed is True
    assert session.closed is True