Replace task scheduler utcnow calls (#1456)

Co-authored-by: ghreprimand <203024559+ghreprimand@users.noreply.github.com>
This commit is contained in:
ghreprimand
2026-06-03 00:14:30 -05:00
committed by GitHub
parent 4f03f5ccdd
commit 41d2767b30
2 changed files with 72 additions and 55 deletions

View File

@@ -6,12 +6,17 @@ import logging
import re
import time
import uuid
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Any, Awaitable, Callable, Dict, Tuple
logger = logging.getLogger(__name__)
def _utcnow() -> datetime:
"""Return naive UTC for task DB fields without using deprecated APIs."""
return datetime.now(timezone.utc).replace(tzinfo=None)
# ── Shared TTL cache (singleflight) ────────────────────────────────────────
# Multiple scheduled tasks firing in the same minute often need the same
# external data (Miniflux unreads, MCP tool snapshots, etc.). This cache
@@ -73,7 +78,6 @@ def compute_next_run(schedule: str, scheduled_time: str,
the legacy behavior (`scheduled_time` interpreted as naive-UTC wall clock)
is preserved so existing tasks don't shift.
"""
from datetime import timezone
try:
from zoneinfo import ZoneInfo
except ImportError:
@@ -89,12 +93,12 @@ def compute_next_run(schedule: str, scheduled_time: str,
# "now" used for comparisons. When tz is set we work entirely in local tz
# and convert to UTC at the end. Otherwise we use naive UTC (legacy).
if tz is not None:
now_utc = after or datetime.utcnow()
now_utc = after or _utcnow()
if now_utc.tzinfo is None:
now_utc = now_utc.replace(tzinfo=timezone.utc)
now = now_utc.astimezone(tz)
else:
now = after or datetime.utcnow()
now = after or _utcnow()
def _to_utc_naive(dt: datetime) -> datetime:
"""Convert a tz-aware datetime to naive UTC for DB storage."""
@@ -284,7 +288,7 @@ class TaskScheduler:
run.status = "aborted"
run.error = message
run.result = run.result or message
run.finished_at = datetime.utcnow()
run.finished_at = _utcnow()
db.commit()
return True
finally:
@@ -305,7 +309,7 @@ class TaskScheduler:
"task_id": task_id,
"owner": owner,
"body": (body[:500] + "") if body and len(body) > 500 else body,
"timestamp": datetime.utcnow().isoformat() + "Z",
"timestamp": _utcnow().isoformat() + "Z",
})
# Cap at 50 to avoid unbounded growth
if len(self._pending_notifications) > 50:
@@ -351,7 +355,7 @@ class TaskScheduler:
TaskRun.status.in_(("running", "queued"))
).all()
if stale:
now = datetime.utcnow()
now = _utcnow()
for r in stale:
old_status = r.status or "running"
r.status = "aborted"
@@ -372,7 +376,7 @@ class TaskScheduler:
from core.database import SessionLocal as _SL, ScheduledTask as _ST
db = _SL()
try:
now = datetime.utcnow()
now = _utcnow()
overdue = db.query(_ST).filter(
_ST.status == "active",
_ST.next_run.isnot(None),
@@ -574,7 +578,7 @@ class TaskScheduler:
_ST.next_run.isnot(None),
).order_by(_ST.next_run.asc()).first()
if next_run and next_run[0]:
delta = (next_run[0] - datetime.utcnow()).total_seconds()
delta = (next_run[0] - _utcnow()).total_seconds()
sleep_for = max(1.0, min(60.0, delta))
finally:
_db.close()
@@ -586,7 +590,7 @@ class TaskScheduler:
from core.database import SessionLocal, ScheduledTask
db = SessionLocal()
try:
now = datetime.utcnow()
now = _utcnow()
async with self._executing_lock:
# Snapshot under the lock so we don't race with mid-iteration adds.
executing_snapshot = set(self._executing)
@@ -622,7 +626,7 @@ class TaskScheduler:
run = TaskRun(
id=run_id,
task_id=task_id,
started_at=datetime.utcnow(),
started_at=_utcnow(),
status="queued",
result="Queued — waiting for a free slot…",
)
@@ -665,7 +669,7 @@ class TaskScheduler:
stale = db.query(TaskRun).filter(TaskRun.id == run_id).first()
if stale and stale.status == "queued":
stale.status = "skipped"
stale.finished_at = datetime.utcnow()
stale.finished_at = _utcnow()
stale.error = f"Task no longer active (status={task.status if task else 'deleted'})"
db.commit()
return
@@ -676,7 +680,7 @@ class TaskScheduler:
run = db.query(TaskRun).filter(TaskRun.id == run_id).first()
if run:
run.status = "running"
run.started_at = datetime.utcnow()
run.started_at = _utcnow()
run.result = "Starting…"
db.commit()
else:
@@ -685,7 +689,7 @@ class TaskScheduler:
run = TaskRun(
id=run_id,
task_id=task.id,
started_at=datetime.utcnow(),
started_at=_utcnow(),
status="running",
result="Starting…",
)
@@ -727,7 +731,7 @@ class TaskScheduler:
delay_seconds = int(getattr(defer, "delay_seconds", 20 * 60) or (20 * 60))
if count > 2:
delay_seconds = max(delay_seconds, 40 * 60)
when = datetime.utcnow() + timedelta(seconds=delay_seconds)
when = _utcnow() + timedelta(seconds=delay_seconds)
logger.info(
"Task '%s' deferred for %ss after %s quiet-window hit(s): %s",
task.name, delay_seconds, count, defer,
@@ -745,13 +749,13 @@ class TaskScheduler:
run_obj.status = "aborted"
run_obj.error = "Stopped by user"
run_obj.result = run_obj.result or "Stopped by user"
run_obj.finished_at = datetime.utcnow()
task.last_run = datetime.utcnow()
run_obj.finished_at = _utcnow()
task.last_run = _utcnow()
if (task.trigger_type or "schedule") == "schedule":
task.next_run = compute_next_run(
task.schedule, task.scheduled_time,
task.scheduled_day, task.scheduled_date,
after=datetime.utcnow(),
after=_utcnow(),
cron_expression=task.cron_expression,
tz_name=_resolve_task_timezone(db, task),
)
@@ -768,13 +772,13 @@ class TaskScheduler:
logger.info(f"Task '{task.name}' no-op: {noop}")
run.status = "skipped"
run.result = str(noop)
run.finished_at = datetime.utcnow()
task.last_run = datetime.utcnow()
run.finished_at = _utcnow()
task.last_run = _utcnow()
if (task.trigger_type or "schedule") == "schedule":
task.next_run = compute_next_run(
task.schedule, task.scheduled_time,
task.scheduled_day, task.scheduled_date,
after=datetime.utcnow(),
after=_utcnow(),
cron_expression=task.cron_expression,
tz_name=_resolve_task_timezone(db, task),
)
@@ -783,10 +787,10 @@ class TaskScheduler:
db.commit()
return
run.finished_at = datetime.utcnow()
run.finished_at = _utcnow()
# Update task
task.last_run = datetime.utcnow()
task.last_run = _utcnow()
task.run_count = (task.run_count or 0) + 1
self._task_defer_counts.pop(task_id, None)
@@ -795,7 +799,7 @@ class TaskScheduler:
task.next_run = compute_next_run(
task.schedule, task.scheduled_time,
task.scheduled_day, task.scheduled_date,
after=datetime.utcnow(),
after=_utcnow(),
cron_expression=task.cron_expression,
tz_name=_resolve_task_timezone(db, task),
)
@@ -869,17 +873,17 @@ class TaskScheduler:
if run_obj and run_obj.status in ("running", "success"):
run_obj.status = "error"
run_obj.error = err_text[:2000]
run_obj.finished_at = datetime.utcnow()
run_obj.finished_at = _utcnow()
# Advance next_run even on failure so a broken task doesn't
# busy-loop the scheduler every tick with a stale past date.
task_obj = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
if task_obj and (task_obj.trigger_type or "schedule") == "schedule":
task_obj.last_run = datetime.utcnow()
task_obj.last_run = _utcnow()
try:
task_obj.next_run = compute_next_run(
task_obj.schedule, task_obj.scheduled_time,
task_obj.scheduled_day, task_obj.scheduled_date,
after=datetime.utcnow(),
after=_utcnow(),
cron_expression=task_obj.cron_expression,
tz_name=_resolve_task_timezone(db, task_obj),
)
@@ -904,13 +908,13 @@ class TaskScheduler:
if _r and _r.status in ("running", "queued"):
_r.status = "aborted"
_r.error = f"commit_failed: {type(commit_err).__name__}: {commit_err}"[:2000]
_r.finished_at = datetime.utcnow()
_r.finished_at = _utcnow()
_t = _recover_db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
if _t and (_t.trigger_type or "schedule") == "schedule":
# Push next_run forward 5min as a safe stall so the
# scheduler doesn't immediately re-dispatch.
_t.next_run = datetime.utcnow() + _td(minutes=5)
_t.last_run = datetime.utcnow()
_t.next_run = _utcnow() + _td(minutes=5)
_t.last_run = _utcnow()
_recover_db.commit()
except Exception as recover_err:
logger.error("Task %s recovery commit ALSO failed: %s", task_id, recover_err)
@@ -1087,14 +1091,14 @@ class TaskScheduler:
if tz_name:
from zoneinfo import ZoneInfo
from datetime import timezone, timedelta
now = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name))
now = _utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name))
else:
from datetime import timedelta
now = datetime.utcnow()
now = _utcnow()
time_str = now.strftime("%A, %B %d %Y, %H:%M")
except Exception:
from datetime import timedelta
now = datetime.utcnow()
now = _utcnow()
time_str = now.strftime("%H:%M UTC")
raw = {}
@@ -1297,8 +1301,8 @@ class TaskScheduler:
endpoint_url=endpoint_url,
model=model,
owner=task.owner,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
created_at=_utcnow(),
updated_at=_utcnow(),
)
db.add(sess)
task.session_id = session_id
@@ -1327,12 +1331,12 @@ class TaskScheduler:
if tz_name:
from zoneinfo import ZoneInfo
from datetime import timezone
now_local = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name))
now_local = _utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name))
time_str = now_local.strftime("%A, %B %d %Y, %H:%M %Z")
else:
time_str = datetime.utcnow().strftime("%A, %B %d %Y, %H:%M UTC")
time_str = _utcnow().strftime("%A, %B %d %Y, %H:%M UTC")
except Exception:
time_str = datetime.utcnow().strftime("%A, %B %d %Y, %H:%M UTC")
time_str = _utcnow().strftime("%A, %B %d %Y, %H:%M UTC")
system_prompt = f"Current time: {time_str}\n\n{system_prompt}"
# Compute tool filter from CrewMember.enabled_tools if set
@@ -1439,8 +1443,8 @@ class TaskScheduler:
endpoint_url=endpoint_url or "",
model=model_name or "",
owner=task.owner,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
created_at=_utcnow(),
updated_at=_utcnow(),
)
db.add(sess)
task.session_id = session_id
@@ -1463,7 +1467,7 @@ class TaskScheduler:
session_id=session_id,
role="user",
content=user_content,
timestamp=datetime.utcnow(),
timestamp=_utcnow(),
meta_data=msg_meta,
)
assistant_msg = ChatMessage(
@@ -1471,7 +1475,7 @@ class TaskScheduler:
session_id=session_id,
role="assistant",
content=result or "",
timestamp=datetime.utcnow(),
timestamp=_utcnow(),
meta_data=msg_meta,
)
db.add(user_msg)
@@ -1719,8 +1723,8 @@ class TaskScheduler:
endpoint_url=endpoint_url,
model=model,
owner=task.owner,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
created_at=_utcnow(),
updated_at=_utcnow(),
)
db.add(sess)
task.session_id = session_id
@@ -2001,7 +2005,7 @@ class TaskScheduler:
task.cron_expression = defs["cron_expression"]
task.next_run = compute_next_run(
defs["schedule"], defs["scheduled_time"], None, None,
after=datetime.utcnow(), cron_expression=defs["cron_expression"],
after=_utcnow(), cron_expression=defs["cron_expression"],
tz_name=_resolve_task_timezone(db, task),
)
normalized = True
@@ -2036,7 +2040,7 @@ class TaskScheduler:
task.next_run = compute_next_run(
task.schedule, task.scheduled_time,
task.scheduled_day, task.scheduled_date,
after=datetime.utcnow(), cron_expression=task.cron_expression,
after=_utcnow(), cron_expression=task.cron_expression,
tz_name=_resolve_task_timezone(db, task),
)
# Built-in housekeeping/action jobs should not create browser
@@ -2051,7 +2055,7 @@ class TaskScheduler:
if trigger_type == "schedule":
next_run = compute_next_run(
defs["schedule"], defs["scheduled_time"], None, None,
after=datetime.utcnow(), cron_expression=defs["cron_expression"],
after=_utcnow(), cron_expression=defs["cron_expression"],
)
ships_paused = bool(defs.get("ship_paused"))
task = ScheduledTask(
@@ -2188,8 +2192,8 @@ class TaskScheduler:
is_important=True,
mode="agent",
folder="Assistant",
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
created_at=_utcnow(),
updated_at=_utcnow(),
)
db.add(sess)
db.flush()

View File

@@ -11,12 +11,16 @@ test asserts the opposite: the task fires at most once across two consecutive
polls.
"""
import sys, types, asyncio
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock
from sqlalchemy import create_engine, Column, String, DateTime, Integer, Boolean, Text
from sqlalchemy.orm import sessionmaker, declarative_base
def _test_utcnow():
return datetime.now(timezone.utc).replace(tzinfo=None)
def _stub_heavy():
for name in [
"src.builtin_actions", "src.ai_interaction", "src.endpoint_resolver",
@@ -59,6 +63,15 @@ def _setup_isolated_db():
return cd, ScheduledTask, TaskRun
def test_scheduler_utcnow_preserves_naive_utc_contract():
from src.task_scheduler import _utcnow
now = _utcnow()
assert now.tzinfo is None
assert abs((now - _test_utcnow()).total_seconds()) < 2
def _drive_scheduler(monkeypatch, pre_start_setup=None):
"""Build a TaskScheduler bypassing __init__ and run start() + two polls."""
_stub_heavy()
@@ -115,7 +128,7 @@ def test_restart_does_not_re_dispatch_overdue_task(monkeypatch):
db.add(ScheduledTask(
id="t_due_1", owner="alice", name="overdue",
task_type="llm",
next_run=datetime.utcnow() - timedelta(hours=1),
next_run=_test_utcnow() - timedelta(hours=1),
status="active",
))
db.commit()
@@ -126,7 +139,7 @@ def test_restart_does_not_re_dispatch_overdue_task(monkeypatch):
db = cd.SessionLocal()
t = db.query(ScheduledTask).filter(ScheduledTask.id == "t_due_1").first()
db.close()
assert t.next_run >= datetime.utcnow() - timedelta(seconds=1), (
assert t.next_run >= _test_utcnow() - timedelta(seconds=1), (
f"After start(), next_run should have been pushed into the future; "
f"got {t.next_run}"
)
@@ -140,7 +153,7 @@ def test_restart_does_not_re_dispatch_overdue_task(monkeypatch):
def test_startup_does_not_advance_fresh_tasks(monkeypatch):
"""Tasks whose next_run is in the future must be untouched by the startup
sweep — only overdue ones get pushed forward."""
future = datetime.utcnow() + timedelta(hours=2)
future = _test_utcnow() + timedelta(hours=2)
def _setup(cd, ScheduledTask, TaskRun):
db = cd.SessionLocal()
db.add(ScheduledTask(
@@ -169,7 +182,7 @@ def test_startup_does_not_advance_paused_tasks(monkeypatch):
db.add(ScheduledTask(
id="t_paused", owner="alice", name="paused",
task_type="llm",
next_run=datetime.utcnow() - timedelta(hours=1),
next_run=_test_utcnow() - timedelta(hours=1),
status="paused",
))
db.commit()
@@ -183,7 +196,7 @@ def test_startup_does_not_advance_paused_tasks(monkeypatch):
# The stored next_run should still be ~1h in the past (the startup sweep
# only advances active overdue tasks; a paused task with an old next_run
# is left alone). Allow a small delta to absorb the time the sweep took.
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
one_hour_ago = _test_utcnow() - timedelta(hours=1)
assert abs((t.next_run - one_hour_ago).total_seconds()) < 5, (
f"Paused task's next_run was modified: "
f"expected ~{one_hour_ago}, got {t.next_run}"