From 6fd52cf317351929d1e0637ec8ec9a8348d9716a Mon Sep 17 00:00:00 2001 From: ghreprimand Date: Wed, 3 Jun 2026 00:14:23 -0500 Subject: [PATCH] Replace webhook manager datetime.utcnow calls (#1499) Co-authored-by: ghreprimand <203024559+ghreprimand@users.noreply.github.com> --- src/webhook_manager.py | 13 ++++-- tests/test_webhook_ssrf_resilience.py | 66 +++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/src/webhook_manager.py b/src/webhook_manager.py index 5e56032..e43f8e4 100644 --- a/src/webhook_manager.py +++ b/src/webhook_manager.py @@ -7,7 +7,7 @@ import ipaddress import json import logging import re -from datetime import datetime +from datetime import datetime, timezone from typing import Optional from urllib.parse import urlparse @@ -37,6 +37,11 @@ _PRIVATE_NETWORKS = [ ] +def _utcnow() -> datetime: + """Return naive UTC for existing DB columns while avoiding datetime.utcnow().""" + return datetime.now(timezone.utc).replace(tzinfo=None) + + def _ip_is_private(addr: ipaddress._BaseAddress) -> bool: # If the address is IPv4-mapped IPv6, extract and evaluate the embedded IPv4 if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped is not None: @@ -203,7 +208,7 @@ class WebhookManager: logger.warning(f"Webhook {webhook_id} has invalid URL, skipping: {e}") return - body = json.dumps({"event": event, "timestamp": datetime.utcnow().isoformat(), "data": payload}) + body = json.dumps({"event": event, "timestamp": _utcnow().isoformat(), "data": payload}) headers = { "Content-Type": "application/json", "X-Odysseus-Event": event, @@ -217,7 +222,7 @@ class WebhookManager: try: resp = await self._client.post(url, content=body, headers=headers) db.query(Webhook).filter(Webhook.id == webhook_id).update({ - "last_triggered_at": datetime.utcnow(), + "last_triggered_at": _utcnow(), "last_status_code": resp.status_code, "last_error": None, }) @@ -226,7 +231,7 @@ class WebhookManager: logger.warning(f"Webhook delivery failed for {webhook_id}") try: db.query(Webhook).filter(Webhook.id == webhook_id).update({ - "last_triggered_at": datetime.utcnow(), + "last_triggered_at": _utcnow(), "last_status_code": None, "last_error": sanitize_error(str(e)), }) diff --git a/tests/test_webhook_ssrf_resilience.py b/tests/test_webhook_ssrf_resilience.py index 8557b78..6cc7312 100644 --- a/tests/test_webhook_ssrf_resilience.py +++ b/tests/test_webhook_ssrf_resilience.py @@ -1,4 +1,7 @@ import sys +import json +from datetime import datetime + # conftest.py stubs src.database with a fake module; webhook_manager imports # from it, so drop the stub here to load the real module under test. if "src.database" in sys.modules: @@ -26,3 +29,66 @@ def test_webhook_url_ssrf_mitigation(): # A clearly public IP literal must still be accepted. public_url = "http://93.184.216.34/" assert validate_webhook_url(public_url) == public_url + + +@pytest.mark.asyncio +async def test_webhook_delivery_uses_naive_utc_timestamps(monkeypatch): + import src.webhook_manager as wm + + class _Query: + def __init__(self, updates): + self.updates = updates + + def filter(self, *_args, **_kwargs): + return self + + def update(self, values): + self.updates.append(values) + + class _Db: + def __init__(self): + self.updates = [] + self.committed = False + self.closed = False + + def query(self, _model): + return _Query(self.updates) + + def commit(self): + self.committed = True + + def rollback(self): + pass + + def close(self): + self.closed = True + + class _Response: + status_code = 204 + + class _Client: + def __init__(self): + self.content = "" + + async def post(self, _url, content, headers): + self.content = content + assert headers["X-Odysseus-Event"] == "webhook.test" + return _Response() + + db = _Db() + client = _Client() + monkeypatch.setattr(wm, "SessionLocal", lambda: db) + + manager = wm.WebhookManager() + await manager._client.aclose() + manager._client = client + + await manager._deliver("hook-1", "http://93.184.216.34/", None, "webhook.test", {"ok": True}) + + body = json.loads(client.content) + payload_timestamp = datetime.fromisoformat(body["timestamp"]) + assert payload_timestamp.tzinfo is None + assert db.updates[0]["last_triggered_at"].tzinfo is None + assert db.updates[0]["last_status_code"] == 204 + assert db.committed is True + assert db.closed is True