From 41d2767b30c496c8eb28ee6370ae8cb525be4b44 Mon Sep 17 00:00:00 2001 From: ghreprimand Date: Wed, 3 Jun 2026 00:14:30 -0500 Subject: [PATCH] Replace task scheduler utcnow calls (#1456) Co-authored-by: ghreprimand <203024559+ghreprimand@users.noreply.github.com> --- src/task_scheduler.py | 102 +++++++++++---------- tests/test_scheduler_restart_doublefire.py | 25 +++-- 2 files changed, 72 insertions(+), 55 deletions(-) diff --git a/src/task_scheduler.py b/src/task_scheduler.py index f80c8a1..67bb02a 100644 --- a/src/task_scheduler.py +++ b/src/task_scheduler.py @@ -6,12 +6,17 @@ import logging import re import time import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Any, Awaitable, Callable, Dict, Tuple logger = logging.getLogger(__name__) +def _utcnow() -> datetime: + """Return naive UTC for task DB fields without using deprecated APIs.""" + return datetime.now(timezone.utc).replace(tzinfo=None) + + # ── Shared TTL cache (singleflight) ──────────────────────────────────────── # Multiple scheduled tasks firing in the same minute often need the same # external data (Miniflux unreads, MCP tool snapshots, etc.). This cache @@ -73,7 +78,6 @@ def compute_next_run(schedule: str, scheduled_time: str, the legacy behavior (`scheduled_time` interpreted as naive-UTC wall clock) is preserved so existing tasks don't shift. """ - from datetime import timezone try: from zoneinfo import ZoneInfo except ImportError: @@ -89,12 +93,12 @@ def compute_next_run(schedule: str, scheduled_time: str, # "now" used for comparisons. When tz is set we work entirely in local tz # and convert to UTC at the end. Otherwise we use naive UTC (legacy). if tz is not None: - now_utc = after or datetime.utcnow() + now_utc = after or _utcnow() if now_utc.tzinfo is None: now_utc = now_utc.replace(tzinfo=timezone.utc) now = now_utc.astimezone(tz) else: - now = after or datetime.utcnow() + now = after or _utcnow() def _to_utc_naive(dt: datetime) -> datetime: """Convert a tz-aware datetime to naive UTC for DB storage.""" @@ -284,7 +288,7 @@ class TaskScheduler: run.status = "aborted" run.error = message run.result = run.result or message - run.finished_at = datetime.utcnow() + run.finished_at = _utcnow() db.commit() return True finally: @@ -305,7 +309,7 @@ class TaskScheduler: "task_id": task_id, "owner": owner, "body": (body[:500] + "…") if body and len(body) > 500 else body, - "timestamp": datetime.utcnow().isoformat() + "Z", + "timestamp": _utcnow().isoformat() + "Z", }) # Cap at 50 to avoid unbounded growth if len(self._pending_notifications) > 50: @@ -351,7 +355,7 @@ class TaskScheduler: TaskRun.status.in_(("running", "queued")) ).all() if stale: - now = datetime.utcnow() + now = _utcnow() for r in stale: old_status = r.status or "running" r.status = "aborted" @@ -372,7 +376,7 @@ class TaskScheduler: from core.database import SessionLocal as _SL, ScheduledTask as _ST db = _SL() try: - now = datetime.utcnow() + now = _utcnow() overdue = db.query(_ST).filter( _ST.status == "active", _ST.next_run.isnot(None), @@ -574,7 +578,7 @@ class TaskScheduler: _ST.next_run.isnot(None), ).order_by(_ST.next_run.asc()).first() if next_run and next_run[0]: - delta = (next_run[0] - datetime.utcnow()).total_seconds() + delta = (next_run[0] - _utcnow()).total_seconds() sleep_for = max(1.0, min(60.0, delta)) finally: _db.close() @@ -586,7 +590,7 @@ class TaskScheduler: from core.database import SessionLocal, ScheduledTask db = SessionLocal() try: - now = datetime.utcnow() + now = _utcnow() async with self._executing_lock: # Snapshot under the lock so we don't race with mid-iteration adds. executing_snapshot = set(self._executing) @@ -622,7 +626,7 @@ class TaskScheduler: run = TaskRun( id=run_id, task_id=task_id, - started_at=datetime.utcnow(), + started_at=_utcnow(), status="queued", result="Queued — waiting for a free slot…", ) @@ -665,7 +669,7 @@ class TaskScheduler: stale = db.query(TaskRun).filter(TaskRun.id == run_id).first() if stale and stale.status == "queued": stale.status = "skipped" - stale.finished_at = datetime.utcnow() + stale.finished_at = _utcnow() stale.error = f"Task no longer active (status={task.status if task else 'deleted'})" db.commit() return @@ -676,7 +680,7 @@ class TaskScheduler: run = db.query(TaskRun).filter(TaskRun.id == run_id).first() if run: run.status = "running" - run.started_at = datetime.utcnow() + run.started_at = _utcnow() run.result = "Starting…" db.commit() else: @@ -685,7 +689,7 @@ class TaskScheduler: run = TaskRun( id=run_id, task_id=task.id, - started_at=datetime.utcnow(), + started_at=_utcnow(), status="running", result="Starting…", ) @@ -727,7 +731,7 @@ class TaskScheduler: delay_seconds = int(getattr(defer, "delay_seconds", 20 * 60) or (20 * 60)) if count > 2: delay_seconds = max(delay_seconds, 40 * 60) - when = datetime.utcnow() + timedelta(seconds=delay_seconds) + when = _utcnow() + timedelta(seconds=delay_seconds) logger.info( "Task '%s' deferred for %ss after %s quiet-window hit(s): %s", task.name, delay_seconds, count, defer, @@ -745,13 +749,13 @@ class TaskScheduler: run_obj.status = "aborted" run_obj.error = "Stopped by user" run_obj.result = run_obj.result or "Stopped by user" - run_obj.finished_at = datetime.utcnow() - task.last_run = datetime.utcnow() + run_obj.finished_at = _utcnow() + task.last_run = _utcnow() if (task.trigger_type or "schedule") == "schedule": task.next_run = compute_next_run( task.schedule, task.scheduled_time, task.scheduled_day, task.scheduled_date, - after=datetime.utcnow(), + after=_utcnow(), cron_expression=task.cron_expression, tz_name=_resolve_task_timezone(db, task), ) @@ -768,13 +772,13 @@ class TaskScheduler: logger.info(f"Task '{task.name}' no-op: {noop}") run.status = "skipped" run.result = str(noop) - run.finished_at = datetime.utcnow() - task.last_run = datetime.utcnow() + run.finished_at = _utcnow() + task.last_run = _utcnow() if (task.trigger_type or "schedule") == "schedule": task.next_run = compute_next_run( task.schedule, task.scheduled_time, task.scheduled_day, task.scheduled_date, - after=datetime.utcnow(), + after=_utcnow(), cron_expression=task.cron_expression, tz_name=_resolve_task_timezone(db, task), ) @@ -783,10 +787,10 @@ class TaskScheduler: db.commit() return - run.finished_at = datetime.utcnow() + run.finished_at = _utcnow() # Update task - task.last_run = datetime.utcnow() + task.last_run = _utcnow() task.run_count = (task.run_count or 0) + 1 self._task_defer_counts.pop(task_id, None) @@ -795,7 +799,7 @@ class TaskScheduler: task.next_run = compute_next_run( task.schedule, task.scheduled_time, task.scheduled_day, task.scheduled_date, - after=datetime.utcnow(), + after=_utcnow(), cron_expression=task.cron_expression, tz_name=_resolve_task_timezone(db, task), ) @@ -869,17 +873,17 @@ class TaskScheduler: if run_obj and run_obj.status in ("running", "success"): run_obj.status = "error" run_obj.error = err_text[:2000] - run_obj.finished_at = datetime.utcnow() + run_obj.finished_at = _utcnow() # Advance next_run even on failure so a broken task doesn't # busy-loop the scheduler every tick with a stale past date. task_obj = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() if task_obj and (task_obj.trigger_type or "schedule") == "schedule": - task_obj.last_run = datetime.utcnow() + task_obj.last_run = _utcnow() try: task_obj.next_run = compute_next_run( task_obj.schedule, task_obj.scheduled_time, task_obj.scheduled_day, task_obj.scheduled_date, - after=datetime.utcnow(), + after=_utcnow(), cron_expression=task_obj.cron_expression, tz_name=_resolve_task_timezone(db, task_obj), ) @@ -904,13 +908,13 @@ class TaskScheduler: if _r and _r.status in ("running", "queued"): _r.status = "aborted" _r.error = f"commit_failed: {type(commit_err).__name__}: {commit_err}"[:2000] - _r.finished_at = datetime.utcnow() + _r.finished_at = _utcnow() _t = _recover_db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first() if _t and (_t.trigger_type or "schedule") == "schedule": # Push next_run forward 5min as a safe stall so the # scheduler doesn't immediately re-dispatch. - _t.next_run = datetime.utcnow() + _td(minutes=5) - _t.last_run = datetime.utcnow() + _t.next_run = _utcnow() + _td(minutes=5) + _t.last_run = _utcnow() _recover_db.commit() except Exception as recover_err: logger.error("Task %s recovery commit ALSO failed: %s", task_id, recover_err) @@ -1087,14 +1091,14 @@ class TaskScheduler: if tz_name: from zoneinfo import ZoneInfo from datetime import timezone, timedelta - now = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name)) + now = _utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name)) else: from datetime import timedelta - now = datetime.utcnow() + now = _utcnow() time_str = now.strftime("%A, %B %d %Y, %H:%M") except Exception: from datetime import timedelta - now = datetime.utcnow() + now = _utcnow() time_str = now.strftime("%H:%M UTC") raw = {} @@ -1297,8 +1301,8 @@ class TaskScheduler: endpoint_url=endpoint_url, model=model, owner=task.owner, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), + created_at=_utcnow(), + updated_at=_utcnow(), ) db.add(sess) task.session_id = session_id @@ -1327,12 +1331,12 @@ class TaskScheduler: if tz_name: from zoneinfo import ZoneInfo from datetime import timezone - now_local = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name)) + now_local = _utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name)) time_str = now_local.strftime("%A, %B %d %Y, %H:%M %Z") else: - time_str = datetime.utcnow().strftime("%A, %B %d %Y, %H:%M UTC") + time_str = _utcnow().strftime("%A, %B %d %Y, %H:%M UTC") except Exception: - time_str = datetime.utcnow().strftime("%A, %B %d %Y, %H:%M UTC") + time_str = _utcnow().strftime("%A, %B %d %Y, %H:%M UTC") system_prompt = f"Current time: {time_str}\n\n{system_prompt}" # Compute tool filter from CrewMember.enabled_tools if set @@ -1439,8 +1443,8 @@ class TaskScheduler: endpoint_url=endpoint_url or "", model=model_name or "", owner=task.owner, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), + created_at=_utcnow(), + updated_at=_utcnow(), ) db.add(sess) task.session_id = session_id @@ -1463,7 +1467,7 @@ class TaskScheduler: session_id=session_id, role="user", content=user_content, - timestamp=datetime.utcnow(), + timestamp=_utcnow(), meta_data=msg_meta, ) assistant_msg = ChatMessage( @@ -1471,7 +1475,7 @@ class TaskScheduler: session_id=session_id, role="assistant", content=result or "", - timestamp=datetime.utcnow(), + timestamp=_utcnow(), meta_data=msg_meta, ) db.add(user_msg) @@ -1719,8 +1723,8 @@ class TaskScheduler: endpoint_url=endpoint_url, model=model, owner=task.owner, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), + created_at=_utcnow(), + updated_at=_utcnow(), ) db.add(sess) task.session_id = session_id @@ -2001,7 +2005,7 @@ class TaskScheduler: task.cron_expression = defs["cron_expression"] task.next_run = compute_next_run( defs["schedule"], defs["scheduled_time"], None, None, - after=datetime.utcnow(), cron_expression=defs["cron_expression"], + after=_utcnow(), cron_expression=defs["cron_expression"], tz_name=_resolve_task_timezone(db, task), ) normalized = True @@ -2036,7 +2040,7 @@ class TaskScheduler: task.next_run = compute_next_run( task.schedule, task.scheduled_time, task.scheduled_day, task.scheduled_date, - after=datetime.utcnow(), cron_expression=task.cron_expression, + after=_utcnow(), cron_expression=task.cron_expression, tz_name=_resolve_task_timezone(db, task), ) # Built-in housekeeping/action jobs should not create browser @@ -2051,7 +2055,7 @@ class TaskScheduler: if trigger_type == "schedule": next_run = compute_next_run( defs["schedule"], defs["scheduled_time"], None, None, - after=datetime.utcnow(), cron_expression=defs["cron_expression"], + after=_utcnow(), cron_expression=defs["cron_expression"], ) ships_paused = bool(defs.get("ship_paused")) task = ScheduledTask( @@ -2188,8 +2192,8 @@ class TaskScheduler: is_important=True, mode="agent", folder="Assistant", - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), + created_at=_utcnow(), + updated_at=_utcnow(), ) db.add(sess) db.flush() diff --git a/tests/test_scheduler_restart_doublefire.py b/tests/test_scheduler_restart_doublefire.py index fccb176..9f0c873 100644 --- a/tests/test_scheduler_restart_doublefire.py +++ b/tests/test_scheduler_restart_doublefire.py @@ -11,12 +11,16 @@ test asserts the opposite: the task fires at most once across two consecutive polls. """ import sys, types, asyncio -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock from sqlalchemy import create_engine, Column, String, DateTime, Integer, Boolean, Text from sqlalchemy.orm import sessionmaker, declarative_base +def _test_utcnow(): + return datetime.now(timezone.utc).replace(tzinfo=None) + + def _stub_heavy(): for name in [ "src.builtin_actions", "src.ai_interaction", "src.endpoint_resolver", @@ -59,6 +63,15 @@ def _setup_isolated_db(): return cd, ScheduledTask, TaskRun +def test_scheduler_utcnow_preserves_naive_utc_contract(): + from src.task_scheduler import _utcnow + + now = _utcnow() + + assert now.tzinfo is None + assert abs((now - _test_utcnow()).total_seconds()) < 2 + + def _drive_scheduler(monkeypatch, pre_start_setup=None): """Build a TaskScheduler bypassing __init__ and run start() + two polls.""" _stub_heavy() @@ -115,7 +128,7 @@ def test_restart_does_not_re_dispatch_overdue_task(monkeypatch): db.add(ScheduledTask( id="t_due_1", owner="alice", name="overdue", task_type="llm", - next_run=datetime.utcnow() - timedelta(hours=1), + next_run=_test_utcnow() - timedelta(hours=1), status="active", )) db.commit() @@ -126,7 +139,7 @@ def test_restart_does_not_re_dispatch_overdue_task(monkeypatch): db = cd.SessionLocal() t = db.query(ScheduledTask).filter(ScheduledTask.id == "t_due_1").first() db.close() - assert t.next_run >= datetime.utcnow() - timedelta(seconds=1), ( + assert t.next_run >= _test_utcnow() - timedelta(seconds=1), ( f"After start(), next_run should have been pushed into the future; " f"got {t.next_run}" ) @@ -140,7 +153,7 @@ def test_restart_does_not_re_dispatch_overdue_task(monkeypatch): def test_startup_does_not_advance_fresh_tasks(monkeypatch): """Tasks whose next_run is in the future must be untouched by the startup sweep — only overdue ones get pushed forward.""" - future = datetime.utcnow() + timedelta(hours=2) + future = _test_utcnow() + timedelta(hours=2) def _setup(cd, ScheduledTask, TaskRun): db = cd.SessionLocal() db.add(ScheduledTask( @@ -169,7 +182,7 @@ def test_startup_does_not_advance_paused_tasks(monkeypatch): db.add(ScheduledTask( id="t_paused", owner="alice", name="paused", task_type="llm", - next_run=datetime.utcnow() - timedelta(hours=1), + next_run=_test_utcnow() - timedelta(hours=1), status="paused", )) db.commit() @@ -183,7 +196,7 @@ def test_startup_does_not_advance_paused_tasks(monkeypatch): # The stored next_run should still be ~1h in the past (the startup sweep # only advances active overdue tasks; a paused task with an old next_run # is left alone). Allow a small delta to absorb the time the sweep took. - one_hour_ago = datetime.utcnow() - timedelta(hours=1) + one_hour_ago = _test_utcnow() - timedelta(hours=1) assert abs((t.next_run - one_hour_ago).total_seconds()) < 5, ( f"Paused task's next_run was modified: " f"expected ~{one_hour_ago}, got {t.next_run}"