When a timezone is configured, `now` in `compute_next_run` is tz-aware local time. The "once" comparison previously stripped that tzinfo with `.replace(tzinfo=None)`, which yields naive *local* time, while `scheduled_date` is stored as naive UTC. As a result, for users east of UTC one-off tasks appeared expired prematurely, and for users west of UTC they lingered past their due time. Convert with `_to_utc_naive(now)` instead so both sides of the comparison use the same reference frame.
2237 lines
104 KiB
Python
2237 lines
104 KiB
Python
"""Background scheduler for ScheduledTask execution."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Awaitable, Callable, Dict, Tuple
|
|
|
|
logger = logging.getLogger(__name__)


# ── Shared TTL cache (singleflight) ────────────────────────────────────────
# Multiple scheduled tasks firing in the same minute often need the same
# external data (Miniflux unreads, MCP tool snapshots, etc.). This cache
# deduplicates those fetches — in-flight requests for the same key await the
# same underlying coroutine, and completed results are reused until TTL expiry.
#
# NOTE(review): nothing in this section evicts expired entries; they are only
# overwritten when the same key is fetched again. Confirm the key space is small.

# key -> (expiry timestamp on the time.monotonic() clock, cached value)
_shared_cache: Dict[Tuple, Tuple[float, Any]] = {}

# key -> Future resolved by the caller currently running fetch() for that key
_shared_cache_pending: Dict[Tuple, asyncio.Future] = {}

# Guards the check-and-insert on the two dicts above inside _cached().
_shared_cache_lock = asyncio.Lock()
|
|
|
|
|
|
async def _cached(key: Tuple, ttl: float, fetch: Callable[[], Awaitable[Any]]) -> Any:
    """Return a cached result for `key` if fresh, else call `fetch()` and store.

    Concurrent callers for the same missing key share one `fetch()` call.
    Exceptions propagate to every waiter and do not poison the cache.

    Args:
        key: hashable cache key.
        ttl: seconds a successful result stays fresh (time.monotonic() clock).
        fetch: zero-argument callable returning an awaitable that produces
            the value.

    Returns:
        The cached or freshly fetched value.

    Raises:
        Whatever `fetch()` raises (delivered to the fetching caller and to all
        concurrent waiters). If the fetching caller is cancelled, the in-flight
        entry is cleared and waiters receive CancelledError instead of hanging.
    """
    now = time.monotonic()
    async with _shared_cache_lock:
        entry = _shared_cache.get(key)
        if entry and entry[0] > now:
            return entry[1]
        pending = _shared_cache_pending.get(key)
        owner = pending is None
        if owner:
            pending = asyncio.get_running_loop().create_future()
            _shared_cache_pending[key] = pending

    if not owner:
        # shield(): cancelling one waiter must not cancel the shared future;
        # that used to break the owner's set_result() and every other waiter.
        return await asyncio.shield(pending)

    # The bookkeeping below performs no awaits, so it is atomic with respect
    # to other coroutines without re-taking the lock, and cleanup cannot be
    # interrupted by a second cancellation while it runs.
    try:
        val = await fetch()
    except Exception as exc:
        _shared_cache_pending.pop(key, None)
        if not pending.done():
            pending.set_exception(exc)
            # Mark the exception as retrieved so a fetch with no concurrent
            # waiters doesn't log "Future exception was never retrieved".
            pending.exception()
        raise
    except BaseException:
        # Owner cancelled (CancelledError is not an Exception subclass).
        # Previously the pending entry was left behind forever and every
        # later caller for this key awaited a future nobody would resolve.
        _shared_cache_pending.pop(key, None)
        pending.cancel()
        raise

    _shared_cache[key] = (time.monotonic() + ttl, val)
    _shared_cache_pending.pop(key, None)
    if not pending.done():
        pending.set_result(val)
    return val
|
|
|
|
|
|
def compute_next_run(schedule: str, scheduled_time: str,
|
|
scheduled_day: int = None,
|
|
scheduled_date: datetime = None,
|
|
after: datetime = None,
|
|
cron_expression: str = None,
|
|
tz_name: str = None) -> datetime | None:
|
|
"""Compute the next run datetime (stored as naive UTC) based on schedule type.
|
|
|
|
If `tz_name` is provided (IANA zone, e.g. "America/New_York"), `scheduled_time` /
|
|
`scheduled_day` are interpreted as local wall-clock time in that zone and
|
|
the result is converted to naive UTC for DB storage. If `tz_name` is None,
|
|
the legacy behavior (`scheduled_time` interpreted as naive-UTC wall clock)
|
|
is preserved so existing tasks don't shift.
|
|
"""
|
|
from datetime import timezone
|
|
try:
|
|
from zoneinfo import ZoneInfo
|
|
except ImportError:
|
|
ZoneInfo = None
|
|
|
|
tz = None
|
|
if tz_name and ZoneInfo is not None:
|
|
try:
|
|
tz = ZoneInfo(tz_name)
|
|
except Exception:
|
|
tz = None
|
|
|
|
# "now" used for comparisons. When tz is set we work entirely in local tz
|
|
# and convert to UTC at the end. Otherwise we use naive UTC (legacy).
|
|
if tz is not None:
|
|
now_utc = after or datetime.utcnow()
|
|
if now_utc.tzinfo is None:
|
|
now_utc = now_utc.replace(tzinfo=timezone.utc)
|
|
now = now_utc.astimezone(tz)
|
|
else:
|
|
now = after or datetime.utcnow()
|
|
|
|
def _to_utc_naive(dt: datetime) -> datetime:
|
|
"""Convert a tz-aware datetime to naive UTC for DB storage."""
|
|
if dt.tzinfo is None:
|
|
return dt
|
|
return dt.astimezone(timezone.utc).replace(tzinfo=None)
|
|
|
|
if schedule == "cron" and cron_expression:
|
|
try:
|
|
from croniter import croniter
|
|
cron = croniter(cron_expression, now)
|
|
nxt = cron.get_next(datetime)
|
|
if tz is not None and nxt.tzinfo is None:
|
|
nxt = nxt.replace(tzinfo=tz)
|
|
return _to_utc_naive(nxt) if tz is not None else nxt
|
|
except Exception as e:
|
|
logger.warning(f"Invalid cron expression '{cron_expression}': {e}")
|
|
return None
|
|
|
|
if schedule == "once":
|
|
if scheduled_date and scheduled_date > (_to_utc_naive(now) if tz is not None else now):
|
|
return scheduled_date
|
|
return None
|
|
|
|
if not scheduled_time:
|
|
return None
|
|
|
|
# Parse HH:MM
|
|
parts = scheduled_time.split(":")
|
|
hour, minute = int(parts[0]), int(parts[1])
|
|
|
|
if schedule == "daily":
|
|
candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
|
if candidate <= now:
|
|
candidate += timedelta(days=1)
|
|
return _to_utc_naive(candidate) if tz is not None else candidate
|
|
|
|
if schedule == "weekly":
|
|
day = scheduled_day if scheduled_day is not None else 0 # 0=Monday
|
|
candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
|
days_ahead = day - candidate.weekday()
|
|
if days_ahead < 0 or (days_ahead == 0 and candidate <= now):
|
|
days_ahead += 7
|
|
candidate += timedelta(days=days_ahead)
|
|
return _to_utc_naive(candidate) if tz is not None else candidate
|
|
|
|
if schedule == "monthly":
|
|
day = scheduled_day if scheduled_day is not None else 1
|
|
try:
|
|
candidate = now.replace(day=day, hour=hour, minute=minute, second=0, microsecond=0)
|
|
except ValueError:
|
|
candidate = now
|
|
if candidate <= now:
|
|
if now.month == 12:
|
|
next_month = now.replace(year=now.year + 1, month=1, day=1)
|
|
else:
|
|
next_month = now.replace(month=now.month + 1, day=1)
|
|
try:
|
|
candidate = next_month.replace(day=day, hour=hour, minute=minute, second=0, microsecond=0)
|
|
except ValueError:
|
|
if next_month.month == 12:
|
|
last = next_month.replace(year=next_month.year + 1, month=1, day=1) - timedelta(days=1)
|
|
else:
|
|
last = next_month.replace(month=next_month.month + 1, day=1) - timedelta(days=1)
|
|
candidate = last.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
|
return _to_utc_naive(candidate) if tz is not None else candidate
|
|
|
|
return None
|
|
|
|
|
|
def _resolve_task_timezone(db, task) -> str | None:
|
|
"""Look up the IANA timezone name for a task via its linked CrewMember, if any."""
|
|
if not getattr(task, "crew_member_id", None):
|
|
return None
|
|
try:
|
|
from core.database import CrewMember
|
|
cm = db.query(CrewMember).filter(CrewMember.id == task.crew_member_id).first()
|
|
if cm and cm.timezone:
|
|
return cm.timezone
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
# Built-in "housekeeping" tasks seeded for every owner, keyed by action.
# These are the canonical defaults — used both to seed and to revert a
# built-in task the user has altered. schedule "daily" uses scheduled_time;
# "cron" uses cron_expression.
#
# Keys used by the entries below. The consumers are outside this section, so
# the meanings marked "presumably" are inferred from names — NOTE(review): confirm.
#   name                  display name of the seeded task
#   trigger_type          "event" for event-driven entries; entries without it
#                         presumably default to schedule-triggered (the
#                         scheduler treats a missing trigger_type as "schedule")
#   trigger_event /       presumably: fire after `trigger_count` occurrences
#   trigger_count         of `trigger_event`
#   schedule / scheduled_time / cron_expression   timing fields (see above)
#   ship_paused           presumably: seed the task paused instead of active
#   legacy_names          earlier display names, presumably matched to find
#                         existing rows to rename/revert
#   old_cron_expressions  earlier default cron strings, presumably treated as
#                         "not user-altered" during migration
HOUSEKEEPING_DEFAULTS = {
    "tidy_sessions": {"name": "Chat Sessions Tidy", "trigger_type": "event", "trigger_event": "session_created", "trigger_count": 5, "schedule": None, "scheduled_time": None, "cron_expression": None, "legacy_names": ["Tidy Chat Sessions"]},
    "tidy_documents": {"name": "Documents Tidy", "trigger_type": "event", "trigger_event": "document_created", "trigger_count": 5, "schedule": None, "scheduled_time": None, "cron_expression": None, "legacy_names": ["Tidy Documents"]},
    "consolidate_memory": {"name": "Memory Tidy", "trigger_type": "event", "trigger_event": "memory_added", "trigger_count": 5, "schedule": None, "scheduled_time": None, "cron_expression": None, "legacy_names": ["Tidy Memory"]},
    "tidy_research": {"name": "Research Tidy", "trigger_type": "event", "trigger_event": "research_completed", "trigger_count": 5, "schedule": None, "scheduled_time": None, "cron_expression": None, "legacy_names": ["Tidy Research"]},
    "summarize_emails": {"name": "Email (Summary)", "schedule": "cron", "scheduled_time": None, "cron_expression": "0 */2 * * *", "ship_paused": True, "legacy_names": ["Tidy Email (Summary)"]},
    "draft_email_replies": {"name": "Email AI Auto Reply", "schedule": "cron", "scheduled_time": None, "cron_expression": "0 */2 * * *", "ship_paused": True, "legacy_names": ["Tidy Email (Replies)", "AI Auto Reply"]},
    "extract_email_events": {"name": "Email Calendar Events", "schedule": "cron", "scheduled_time": None, "cron_expression": "0 */1 * * *", "ship_paused": True, "legacy_names": ["Email → Calendar Events"]},
    "classify_events": {"name": "Calendar Classify Events", "schedule": "cron", "scheduled_time": None, "cron_expression": "0 6,18 * * *", "ship_paused": True, "legacy_names": ["Classify Calendar Events"]},
    "mark_email_boundaries": {"name": "Email Mark Boundaries", "schedule": "cron", "scheduled_time": None, "cron_expression": "0 */2 * * *", "ship_paused": True, "legacy_names": ["Mark Email Boundaries"]},
    "check_email_urgency": {"name": "Email Tags", "schedule": "cron", "scheduled_time": None, "cron_expression": "0 * * * *", "ship_paused": True, "old_cron_expressions": ["*/15 * * * *"], "legacy_names": ["Email Triage", "Urgent Email"]},
    "audit_skills": {"name": "Skills Audit", "trigger_type": "event", "trigger_event": "skill_added", "trigger_count": 5, "schedule": None, "scheduled_time": None, "cron_expression": None, "legacy_names": ["Audit Skills"]},
}


# Actions of former built-in tasks; neither appears in HOUSEKEEPING_DEFAULTS.
# Presumably used elsewhere to retire/remove previously seeded rows —
# NOTE(review): confirm against the seeding code.
RETIRED_HOUSEKEEPING_ACTIONS = frozenset({
    "tidy_calendar",
    "tidy_email_inbox",
})
|
|
|
|
|
|
def _digest_windows(now):
    """Split the next 30 days after `now` into labelled digest buckets.

    Returns a list of (label, start, end) tuples for the calendar check-in
    digest: [now, +2d), [+2d, +7d) and [+7d, +30d). Each bucket starts exactly
    where the previous one ends, so no event can fall between buckets. (An
    older version began the 30-day bucket at +8d, dropping events 7-8 days out.)
    """
    # (label, offset of start in days, offset of end in days)
    bucket_bounds = (
        ("today_tomorrow", 0, 2),
        ("this_week", 2, 7),
        ("next_30_days", 7, 30),
    )
    return [
        (label, now + timedelta(days=lo), now + timedelta(days=hi))
        for label, lo, hi in bucket_bounds
    ]
|
|
|
|
|
|
class TaskScheduler:
|
|
def __init__(self, session_manager):
    """Create an idle scheduler; `start()` launches the background loops.

    Args:
        session_manager: kept as `self._session_manager` for later use
            (presumably by the task executors; not used in this section).
    """
    self._session_manager = session_manager

    # Main-loop state: `_running` gates every background loop; `_task` is
    # the dispatch-loop handle created by start().
    self._running = False
    self._task = None

    # IDs of tasks currently running OR waiting behind the run semaphore,
    # plus the asyncio handle of each one's executor (used to stop it).
    self._executing = set()
    self._task_handles = {}
    # Every mutation of `_executing` happens under this lock: the dispatch
    # loop, request handlers calling trigger_task() and event-bus callbacks
    # can all touch it, and a long-running task must never be dispatched twice.
    self._executing_lock = asyncio.Lock()

    # Exactly one model-backed run executes at a time — a hard guarantee,
    # not a setting. Manual triggers, scheduled dispatches and task chains
    # wait on this semaphore as "queued" until the current run finishes.
    self._run_semaphore = asyncio.Semaphore(1)
    self._concurrency_cap = 1

    # Completed-run notifications awaiting pop_notifications(), and the
    # per-task count of TaskDeferred hits since the last completed run.
    self._pending_notifications = []
    self._task_defer_counts = {}
|
|
|
|
def _set_run_progress(self, run_id: str, message: str):
    """Store short live progress text on an in-flight run for the Activity view.

    Writes `message` (None becomes "", truncated to 4000 characters) into the
    TaskRun's `result` column, but only while that run is still "queued" or
    "running". An empty `run_id` is ignored. Failures are logged at debug
    level and swallowed: progress text is cosmetic and must not break a run.
    """
    if run_id:
        try:
            from core.database import SessionLocal, TaskRun

            session = SessionLocal()
            try:
                row = session.query(TaskRun).filter(TaskRun.id == run_id).first()
                if row and row.status in ("queued", "running"):
                    row.result = (message or "")[:4000]
                    session.commit()
            finally:
                session.close()
        except Exception:
            logger.debug("Task progress update failed", exc_info=True)
|
|
|
|
def _mark_run_aborted(self, task_id: str, run_id: str | None = None, message: str = "Stopped by user") -> bool:
|
|
"""Mark an active run as aborted. Used by stop/cancel paths."""
|
|
try:
|
|
from core.database import SessionLocal, TaskRun
|
|
db = SessionLocal()
|
|
try:
|
|
q = db.query(TaskRun)
|
|
if run_id:
|
|
q = q.filter(TaskRun.id == run_id)
|
|
else:
|
|
q = q.filter(
|
|
TaskRun.task_id == task_id,
|
|
TaskRun.status.in_(("queued", "running")),
|
|
).order_by(TaskRun.started_at.desc())
|
|
run = q.first()
|
|
if not run or run.status not in ("queued", "running"):
|
|
return False
|
|
run.status = "aborted"
|
|
run.error = message
|
|
run.result = run.result or message
|
|
run.finished_at = datetime.utcnow()
|
|
db.commit()
|
|
return True
|
|
finally:
|
|
db.close()
|
|
except Exception:
|
|
logger.debug("Task abort marker failed for %s", task_id, exc_info=True)
|
|
return False
|
|
|
|
def add_notification(self, task_name: str, status: str, task_id: str = None, owner: str = None, body: str = None):
    """Queue a notification describing a finished task run.

    Each entry carries the task's `owner` so `pop_notifications(owner)` can
    hand a user only their own entries (no cross-tenant drain). `body` is the
    run's result text, supplied when output_target == "notification" so the
    client can show a rich browser Notification instead of a bare toast.
    Bodies longer than 500 characters are cut to 500 plus "…". Only the 50
    newest notifications are retained.
    """
    if body and len(body) > 500:
        body = body[:500] + "…"
    entry = {
        "task_name": task_name,
        "status": status,
        "task_id": task_id,
        "owner": owner,
        "body": body,
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
    self._pending_notifications.append(entry)

    # Trim the oldest entries so the queue can't grow without bound.
    overflow = len(self._pending_notifications) - 50
    if overflow > 0:
        self._pending_notifications = self._pending_notifications[overflow:]
|
|
|
|
def pop_notifications(self, owner: str = None) -> list:
    """Remove and return queued notifications.

    With `owner=None` every queued notification is returned and the queue is
    emptied, which is the legacy single-user behaviour. With an owner, only
    entries whose "owner" equals it are returned and removed; everything else
    stays queued.

    The owner filter is strict. Null-owner entries used to be OR-ed in for
    legacy compatibility, but that leaked notification bodies to any
    authenticated user once a second account existed.
    """
    queue = self._pending_notifications
    if owner is None:
        drained = list(queue)
        queue.clear()
        return drained

    mine = [note for note in queue if note.get("owner") == owner]
    self._pending_notifications = [note for note in queue if note.get("owner") != owner]
    return mine
|
|
|
|
async def start(self):
    """Recover state left by a previous process, then launch the background loops.

    Order (unchanged from the original single-method version):
      1. `_startup_abort_stale_runs`        — leftover queued/running runs -> aborted
      2. `_startup_defer_overdue_tasks`     — past-due next_run pushed 60s ahead
      3. `_startup_dedupe_default_assistants` — collapse duplicate default assistants
      4. launch `_loop` (due-task dispatch) and `_note_pings_loop` (note reminders)
      5. `_startup_log_schedule_clusters`   — informational audit of crowded minutes

    Each recovery/audit step is best-effort: failures are logged and startup
    continues.
    """
    self._startup_abort_stale_runs()
    self._startup_defer_overdue_tasks()
    self._startup_dedupe_default_assistants()

    self._running = True
    self._task = asyncio.create_task(self._loop())
    # Internal background scanner that isn't a user-facing "task" — pure
    # infra (no LLM), shouldn't clutter the Tasks UI, fires on its own
    # cadence inside the scheduler process.
    #
    # Calendar event reminders are represented as Notes by the calendar UI,
    # so the Notes scanner is the single reminder dispatch path. Running the
    # old event scanner too caused duplicate emails/notifications for the
    # same calendar event.
    self._note_pings_task = asyncio.create_task(self._note_pings_loop())
    logger.info(f"Task scheduler started (concurrency cap: {self._concurrency_cap})")

    self._startup_log_schedule_clusters()


def _startup_abort_stale_runs(self):
    """Mark TaskRuns left "running"/"queued" by a previous process as "aborted"."""
    # Without this, a server crash leaves rows stuck running indefinitely and
    # the in-memory _executing set forgets them, so the UI shows phantoms.
    try:
        from core.database import SessionLocal, TaskRun
        db = SessionLocal()
        try:
            # Tagged "aborted" (not "error") so the Activity view + error-rate
            # stats don't falsely blame the task for an infrastructure event.
            stale = db.query(TaskRun).filter(
                TaskRun.status.in_(("running", "queued"))
            ).all()
            if stale:
                now = datetime.utcnow()
                for r in stale:
                    old_status = r.status or "running"
                    r.status = "aborted"
                    r.error = "Server restarted while task was " + old_status
                    r.finished_at = now
                db.commit()
                logger.info(f"Cleared {len(stale)} stale task_runs from previous run")
        finally:
            db.close()
    except Exception as e:
        logger.warning(f"Could not clear stale task_runs on startup: {e}")


def _startup_defer_overdue_tasks(self):
    """Push every active task whose next_run is already past to now + 60s."""
    # Without this, a restart hits _check_due_tasks() with an empty
    # in-process _executing set, and the same overdue task fires once per
    # poll until it completes.
    try:
        from core.database import SessionLocal as _SL, ScheduledTask as _ST
        db = _SL()
        try:
            now = datetime.utcnow()
            overdue = db.query(_ST).filter(
                _ST.status == "active",
                _ST.next_run.isnot(None),
                _ST.next_run < now,
            ).all()
            if overdue:
                for t in overdue:
                    t.next_run = now + timedelta(seconds=60)
                db.commit()
                logger.info(
                    "Pushed next_run forward by 60s for %d overdue active tasks on startup",
                    len(overdue),
                )
        finally:
            db.close()
    except Exception as e:
        logger.warning(f"Could not advance overdue next_run on startup: {e}")


def _startup_dedupe_default_assistants(self):
    """Keep only the oldest default-assistant CrewMember per owner.

    Defense-in-depth sweep: for any owner with more than one row where
    is_default_assistant is True, the oldest (by created_at) is kept; the
    others are deleted together with their ScheduledTasks (duplicates of the
    keeper's check-ins). This is the safety net for the synthetic-owner
    seeding bug: a manual instance was cleaned, but a stale code path or a DB
    import could recreate it.
    """
    try:
        from core.database import SessionLocal, CrewMember, ScheduledTask
        db = SessionLocal()
        try:
            from sqlalchemy import func
            groups = db.query(CrewMember.owner, func.count(CrewMember.id).label("n")).filter(
                CrewMember.is_default_assistant == True,  # noqa: E712
            ).group_by(CrewMember.owner).having(func.count(CrewMember.id) > 1).all()
            for owner, n in groups:
                rows = db.query(CrewMember).filter(
                    CrewMember.owner == owner,
                    CrewMember.is_default_assistant == True,  # noqa: E712
                ).order_by(CrewMember.created_at.asc()).all()
                keep = rows[0]
                losers = rows[1:]
                loser_ids = [r.id for r in losers]
                n_tasks = db.query(ScheduledTask).filter(
                    ScheduledTask.crew_member_id.in_(loser_ids)
                ).delete(synchronize_session=False)
                for r in losers:
                    db.delete(r)
                db.commit()
                logger.warning(
                    "Default-assistant dedupe: owner=%r had %d rows, kept %s, "
                    "dropped %d crew + %d orphan tasks",
                    owner, n, keep.id, len(losers), n_tasks,
                )
        finally:
            db.close()
    except Exception as e:
        logger.warning(f"Could not dedupe default-assistant rows on startup: {e}")


def _startup_log_schedule_clusters(self):
    """Log each minute-of-day (UTC next_run) shared by more than one active scheduled task."""
    # Helps spot "all my tasks fire at 9am" patterns the user may want to
    # spread out. Purely informational; failures are logged at debug level.
    try:
        from core.database import SessionLocal, ScheduledTask
        db = SessionLocal()
        try:
            rows = db.query(ScheduledTask).filter(
                ScheduledTask.status == "active",
                ScheduledTask.trigger_type == "schedule",
                ScheduledTask.next_run.isnot(None),
            ).all()
            buckets: Dict[str, list] = {}
            for r in rows:
                if not r.next_run:
                    continue
                key = r.next_run.strftime("%H:%M")
                buckets.setdefault(key, []).append(r.name or r.id)
            clusters = {k: v for k, v in buckets.items() if len(v) > 1}
            if clusters:
                summary = ", ".join(f"{k} ({len(v)})" for k, v in sorted(clusters.items()))
                logger.info(f"Task scheduling clusters (>1 task/minute): {summary}")
        finally:
            db.close()
    except Exception as e:
        logger.debug(f"Cluster audit skipped: {e}")
|
|
|
|
async def stop(self):
    """Stop the scheduler: clear `_running`, then cancel and await each background task.

    The dispatch loop (`_task`) and the optional scanner handles
    (`_note_pings_task`, `_event_pings_task`) are cancelled in that order. The
    CancelledError raised by awaiting each one is expected and suppressed.
    Handles that were never created are skipped.
    """
    self._running = False
    background = [self._task]
    background.extend(
        getattr(self, attr, None) for attr in ("_note_pings_task", "_event_pings_task")
    )
    for handle in background:
        if not handle:
            continue
        handle.cancel()
        try:
            await handle
        except asyncio.CancelledError:
            pass
    logger.info("Task scheduler stopped")
|
|
|
|
async def _note_pings_loop(self):
    """Background note-due scanner: runs `action_ping_notes` for every known owner.

    Pure infrastructure (no LLM) that never appears in the Tasks UI. The first
    scan starts 30s after launch, then one pass runs every 60s while
    `self._running` is set. Scanning per owner keeps the cache pruning in
    `action_ping_notes` (which drops entries for notes missing from the
    current scan) from deleting other users' entries (review C4). With no
    known owners, a single pass runs with owner="". TaskNoop is expected and
    ignored; other errors are logged per owner.
    """
    await asyncio.sleep(30)
    from src.builtin_actions import action_ping_notes, TaskNoop

    while self._running:
        for owner in self._known_task_owners() or [""]:
            try:
                await action_ping_notes(owner=owner)
            except TaskNoop:
                continue
            except Exception as e:
                logger.warning(f"ping_notes background scanner errored for owner={owner!r}: {e}")
        await asyncio.sleep(60)  # 1 min between passes
|
|
|
|
async def _event_pings_loop(self):
    """Background calendar-event scanner: runs `action_ping_events` for every known owner.

    Same recipe as the note scanner. The first scan starts 90s after launch,
    then one pass runs every 10 minutes while `self._running` is set. It is
    not a user task. Scanning per owner ensures each user is pinged only about
    their own events; a single global owner="" pass would email User B's
    events to User A's configured SMTP "from" address (review C3).

    NOTE: `start()` no longer launches this loop (calendar reminders go through
    the Notes scanner); `stop()` still cancels `_event_pings_task` if it exists.
    """
    await asyncio.sleep(90)
    from src.builtin_actions import action_ping_events, TaskNoop

    while self._running:
        for owner in self._known_task_owners() or [""]:
            try:
                await action_ping_events(owner=owner)
            except TaskNoop:
                continue
            except Exception as e:
                logger.warning(f"ping_events background scanner errored for owner={owner!r}: {e}")
        await asyncio.sleep(600)  # 10 min between passes
|
|
|
|
def _known_task_owners(self) -> list:
    """Return the sorted, distinct, non-empty owners the background scanners should visit.

    An owner qualifies if they own any ScheduledTask, or own a non-archived
    Note with a non-empty due_date. Scheduled tasks used to be the only
    source, but calendar reminders are stored as Notes: an account with due
    notes and no task rows got the browser reminder while the backend
    email/ntfy scanner never ran for it.

    Query failures yield []. The `core.database` import and the session
    creation happen outside that guard, so failures there propagate.
    """
    from core.database import SessionLocal, ScheduledTask, Note

    session = SessionLocal()
    try:
        task_rows = session.query(ScheduledTask.owner).distinct().all()
        note_rows = session.query(Note.owner).filter(
            Note.due_date.isnot(None),
            Note.due_date != "",
            Note.archived == False,  # noqa: E712
        ).distinct().all()
        found = {row[0] for row in task_rows if row[0]}
        found |= {row[0] for row in note_rows if row[0]}
        return sorted(found)
    except Exception:
        return []
    finally:
        session.close()
|
|
|
|
async def _loop(self):
    """Main dispatch loop: dispatch due tasks, then sleep until the next one is due.

    After an initial 10s delay, each iteration runs `_check_due_tasks()`
    (errors are logged, never fatal) and then sleeps until the earliest
    `next_run` among active tasks that are not already in flight, clamped to
    [1s, 60s]. If that lookup fails, it sleeps 60s. Runs while `self._running`
    is set.
    """
    await asyncio.sleep(10)
    while self._running:
        try:
            await self._check_due_tasks()
        except Exception:
            logger.exception("Error in task scheduler loop")
        # Sleep until the next scheduled run, capped at 60s. A `* * * * *`
        # cron task previously fired up to ~60s late because we always
        # slept the full minute; now the loop wakes near the boundary.
        sleep_for = 60.0
        try:
            from core.database import SessionLocal as _SL, ScheduledTask as _ST
            # Tasks already dispatched (running or queued behind the run
            # semaphore) keep their past next_run until they finish.
            # Counting them made delta negative, so the loop re-polled the DB
            # every second for the whole duration of a long run.
            async with self._executing_lock:
                in_flight = set(self._executing)
            _db = _SL()
            try:
                query = _db.query(_ST.next_run).filter(
                    _ST.status == "active",
                    _ST.next_run.isnot(None),
                )
                if in_flight:
                    query = query.filter(_ST.id.notin_(in_flight))
                next_run = query.order_by(_ST.next_run.asc()).first()
                if next_run and next_run[0]:
                    delta = (next_run[0] - datetime.utcnow()).total_seconds()
                    sleep_for = max(1.0, min(60.0, delta))
            finally:
                _db.close()
        except Exception:
            # Best-effort: fall back to the 60s default rather than crash the loop.
            logger.debug("next_run lookup failed; sleeping %.0fs", sleep_for, exc_info=True)
        await asyncio.sleep(sleep_for)
|
|
|
|
async def _check_due_tasks(self):
    """Dispatch every active task whose next_run has passed and that isn't already in flight.

    Under `_executing_lock`, due tasks are found (schedule-triggered and
    deferred event tasks both use next_run), in-flight IDs are excluded, and
    each chosen ID is added to `_executing` before dispatch, so a task can't
    be double-dispatched. Each chosen task then runs in its own
    `_execute_task` asyncio task.
    """
    from core.database import SessionLocal, ScheduledTask

    session = SessionLocal()
    try:
        cutoff = datetime.utcnow()
        async with self._executing_lock:
            # Snapshot under the lock so we don't race with mid-iteration adds.
            in_flight = set(self._executing)
            not_in_flight = ScheduledTask.id.notin_(in_flight) if in_flight else True
            candidates = session.query(ScheduledTask).filter(
                ScheduledTask.status == "active",
                ScheduledTask.next_run <= cutoff,
                not_in_flight,
            ).all()
            chosen = []
            for candidate in candidates:
                if candidate.id not in self._executing:
                    self._executing.add(candidate.id)
                    chosen.append(candidate.id)
        for tid in chosen:
            asyncio.create_task(self._execute_task(tid))
    finally:
        session.close()
|
|
|
|
async def _execute_task(self, task_id: str, *, bypass_model_slot: bool = False, release_executing: bool = True):
    """Record a "queued" TaskRun, wait for a run slot if one is needed, then execute.

    The run row is written BEFORE waiting on `_run_semaphore`, so the UI can
    show a manually triggered task queued behind another. `_execute_task_locked`
    later flips it to "running". The semaphore is skipped when
    `bypass_model_slot` is set or `_task_needs_model_slot(task_id)` is falsy.

    The current asyncio task is registered in `_task_handles[task_id]` while
    this runs. A cancellation (e.g. while still queued) marks the run
    "aborted" and is re-raised. On exit the handle is dropped if it is still
    ours, and `task_id` leaves `_executing` when `release_executing` is True.
    """
    from core.database import SessionLocal, TaskRun

    me = asyncio.current_task()
    if me:
        self._task_handles[task_id] = me

    run_id = str(uuid.uuid4())
    session = SessionLocal()
    try:
        session.add(TaskRun(
            id=run_id,
            task_id=task_id,
            started_at=datetime.utcnow(),
            status="queued",
            result="Queued — waiting for a free slot…",
        ))
        session.commit()
    except Exception:
        logger.exception(f"Failed to create queued run row for task {task_id}")
    finally:
        session.close()

    try:
        if not bypass_model_slot and self._task_needs_model_slot(task_id):
            async with self._run_semaphore:
                await self._execute_task_locked(task_id, run_id, release_executing=release_executing)
        else:
            await self._execute_task_locked(task_id, run_id, release_executing=release_executing)
    except asyncio.CancelledError:
        # Cancelled while queued behind the semaphore: _execute_task_locked
        # never ran, so nothing else would update the Activity row.
        self._mark_run_aborted(task_id, run_id)
        raise
    finally:
        if self._task_handles.get(task_id) is me:
            self._task_handles.pop(task_id, None)
        if release_executing:
            async with self._executing_lock:
                self._executing.discard(task_id)
|
|
|
|
async def _execute_task_locked(self, task_id: str, run_id: str, *, release_executing: bool = True):
    """Execute one task run whose TaskRun row (`run_id`) was created by `_execute_task`.

    Outcomes:
      * Task missing or not "active": a still-"queued" run row is marked
        "skipped" and nothing executes.
      * Otherwise the run flips to "running" and the executor for
        `task.task_type` runs ("action", "research", anything else = LLM).
        Successful results go through `_deliver_task_result`, notifications,
        the assistant log and optional task chaining.
      * TaskDeferred: the run row is deleted and next_run pushed out (default
        20 min; at least 40 min once a task has deferred more than twice since
        it last completed).
      * CancelledError: run marked "aborted" and schedule advanced; the
        cancellation is NOT re-raised.
      * TaskNoop: run marked "skipped" and schedule advanced.
      * Any other exception: run marked "error" and next_run moved off the
        past so the task isn't re-dispatched every tick. If recording that
        fails, a fresh session forces the run to "aborted" instead.

    Args:
        task_id: ScheduledTask id.
        run_id: TaskRun id created in the "queued" state by `_execute_task`.
        release_executing: when True, remove `task_id` from `self._executing`
            on exit so the task can be dispatched again.
    """
    from core.database import SessionLocal, ScheduledTask, TaskRun

    db = SessionLocal()
    try:
        task = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if not task or task.status != "active":
            # Task was paused/deleted while queued — record that outcome
            # so the run row doesn't sit as "queued" forever.
            stale = db.query(TaskRun).filter(TaskRun.id == run_id).first()
            if stale and stale.status == "queued":
                stale.status = "skipped"
                stale.finished_at = datetime.utcnow()
                stale.error = f"Task no longer active (status={task.status if task else 'deleted'})"
                db.commit()
            return

        # Flip the run from queued → running. Reset started_at to the actual
        # execution start so queue wait time is visible if we ever surface it.
        run = db.query(TaskRun).filter(TaskRun.id == run_id).first()
        if run:
            run.status = "running"
            run.started_at = datetime.utcnow()
            run.result = "Starting…"
        else:
            # Defensive: row may have been wiped; recreate so the rest of
            # the code can look it up by run_id without crashing.
            run = TaskRun(
                id=run_id,
                task_id=task.id,
                started_at=datetime.utcnow(),
                status="running",
                result="Starting…",
            )
            db.add(run)
        db.commit()

        task_type = task.task_type or "llm"

        from src.builtin_actions import TaskDeferred, TaskNoop

        # Cleared each run so an action task (no model) doesn't inherit a
        # previous llm/research run's model. The executors set it once the
        # model is resolved.
        # NOTE(review): this is instance-wide state. A run that bypasses
        # _run_semaphore (see _execute_task) can reset or overwrite it while
        # another run is in flight, mis-recording run.model.
        self._last_run_model = None
        try:
            if task_type == "action":
                result, success = await self._execute_action(task, run_id=run_id)
                run.status = "success" if success else "error"
                run.result = result
                if not success:
                    run.error = result
            elif task_type == "research":
                result = await self._execute_research_task(task, db)
                run.status = "success"
                run.result = result
            else:
                # LLM task — use agent loop for tool access
                result = await self._execute_llm_task(task, db)
                run.status = "success"
                run.result = result
            # Record which model actually ran (resolved inside the executor).
            if getattr(self, "_last_run_model", None):
                run.model = self._last_run_model
            if run.status == "success":
                await self._deliver_task_result(task, result, db, model=getattr(self, "_last_run_model", None))

        except TaskDeferred as defer:
            count = self._task_defer_counts.get(task_id, 0) + 1
            self._task_defer_counts[task_id] = count
            delay_seconds = int(getattr(defer, "delay_seconds", 20 * 60) or (20 * 60))
            if count > 2:
                delay_seconds = max(delay_seconds, 40 * 60)
            when = datetime.utcnow() + timedelta(seconds=delay_seconds)
            logger.info(
                "Task '%s' deferred for %ss after %s quiet-window hit(s): %s",
                task.name, delay_seconds, count, defer,
            )
            run_obj = db.query(TaskRun).filter(TaskRun.id == run_id).first()
            if run_obj:
                db.delete(run_obj)
            task.next_run = when
            db.commit()
            return

        except asyncio.CancelledError:
            logger.info("Task '%s' stopped by user", task.name)
            run_obj = db.query(TaskRun).filter(TaskRun.id == run_id).first()
            if run_obj:
                run_obj.status = "aborted"
                run_obj.error = "Stopped by user"
                run_obj.result = run_obj.result or "Stopped by user"
                run_obj.finished_at = datetime.utcnow()
            task.last_run = datetime.utcnow()
            self._advance_after_run(db, task)
            db.commit()
            return

        except TaskNoop as noop:
            # Action reported "nothing to do". Mark the run as `skipped` with
            # the reason in `result` so it surfaces in Activity as a slim
            # "skipped — <reason>" row instead of vanishing silently.
            # (Previous behavior was `db.delete(run)`, which made the user
            # think queued tasks had been dropped on the floor.)
            logger.info(f"Task '{task.name}' no-op: {noop}")
            run.status = "skipped"
            run.result = str(noop)
            run.finished_at = datetime.utcnow()
            task.last_run = datetime.utcnow()
            self._advance_after_run(db, task)
            db.commit()
            return

        run.finished_at = datetime.utcnow()

        # Update task
        task.last_run = datetime.utcnow()
        task.run_count = (task.run_count or 0) + 1
        self._task_defer_counts.pop(task_id, None)

        # Next run only for schedule-triggered tasks; a finished "once" task
        # has no next run and is completed.
        self._advance_after_run(db, task)
        if ((task.trigger_type or "schedule") == "schedule"
                and task.next_run is None and task.schedule == "once"):
            task.status = "completed"

        db.commit()
        logger.info(f"Task '{task.name}' completed (run {run_id})")
        output = task.output_target or "session"
        # Per-task notification gate. notifications_enabled defaults to True,
        # but skip when the user explicitly turned it off for this task —
        # quiets chatty housekeeping tasks without disabling them entirely.
        should_notify = (
            (task.task_type or "llm") in {"llm", "research"}
            and getattr(task, "notifications_enabled", True)
        )
        if should_notify:
            self.add_notification(
                task.name,
                run.status,
                task_id,
                owner=task.owner,
                body=run.result if output == "notification" else None,
            )

        # Log result to the assistant chat so all task activity is visible.
        # Skip skipped/error rows — the user shouldn't see "skipped: …" noise
        # for cron tasks that no-op'd, or duplicate error spam for tasks that
        # already fired an error notification above.
        if run.status == "success":
            self._log_to_assistant(db, task, run.result or "[success]")

        # Task chaining — trigger the next task on success
        if run.status == "success" and task.then_task_id:
            chain_id = task.then_task_id
            if not self._has_chain_cycle(db, chain_id):
                logger.info(f"Chaining: '{task.name}' → task {chain_id}")
                asyncio.create_task(self._run_chained(chain_id))
            else:
                logger.warning(f"Skipping chain from '{task.name}': cycle detected")

    except Exception as exec_exc:
        logger.exception(f"Task {task_id} execution error")
        try:
            # Notify the same user, under the same per-task gate, that the
            # success notification would have used.
            owner, notify = None, False
            try:
                t = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
                if t:
                    owner = t.owner
                    notify = bool(
                        (t.task_type or "llm") in {"llm", "research"}
                        and getattr(t, "notifications_enabled", True)
                    )
            except Exception:
                notify = False
            if notify:
                self.add_notification(f"Task {task_id}", "error", task_id, owner=owner)

            try:
                self._record_run_error(db, task_id, run_id, exec_exc)
            except Exception as record_err:
                # Recording failed (the commit itself, or a query on a broken
                # session). Without a fallback the run row stays "running"
                # forever AND next_run stays in the past, so the scheduler
                # busy-loops dispatching the same task every tick until
                # restart. Force the recovery in a fresh session.
                logger.warning("Task %s error-path commit failed: %s — falling back", task_id, record_err)
                try:
                    db.rollback()
                except Exception:
                    pass
                self._recover_failed_run(task_id, run_id, record_err)
        except Exception:
            logger.exception("Task %s error-path failed unexpectedly", task_id)
    finally:
        db.close()
        handle = self._task_handles.get(task_id)
        if handle is asyncio.current_task():
            self._task_handles.pop(task_id, None)
        if release_executing:
            async with self._executing_lock:
                self._executing.discard(task_id)


def _advance_after_run(self, db, task):
    """Set `task.next_run` after a completed, cancelled or no-op run.

    Schedule-triggered tasks (trigger_type missing or "schedule") get their
    next occurrence from `compute_next_run`, in the linked crew member's
    timezone when one is set. Other tasks get None, so `_check_due_tasks`
    no longer picks them up by time. Does not commit.
    """
    if (task.trigger_type or "schedule") == "schedule":
        task.next_run = compute_next_run(
            task.schedule, task.scheduled_time,
            task.scheduled_day, task.scheduled_date,
            after=datetime.utcnow(),
            cron_expression=task.cron_expression,
            tz_name=_resolve_task_timezone(db, task),
        )
    else:
        task.next_run = None


def _record_run_error(self, db, task_id, run_id, exc):
    """Persist a failed run as "error" and move the task's next_run off the past.

    A run still "running" (or "success", e.g. when delivery failed) gets
    status "error", the exception text (first 2000 characters) and finished_at.
    Schedule-triggered tasks get last_run plus a fresh next_run; if that
    computation raises, next_run is pushed 5 minutes out instead of being left
    stale. Event-triggered tasks get next_run cleared: it is only set while
    deferred, and a stale past value made `_check_due_tasks` re-dispatch the
    failing task every tick.

    Raises whatever the session raises; the caller falls back to
    `_recover_failed_run`.
    """
    from core.database import ScheduledTask, TaskRun

    # Persist the actual exception message so the UI can show it
    err_text = f"{type(exc).__name__}: {exc}"
    run_obj = db.query(TaskRun).filter(TaskRun.id == run_id).first()
    if run_obj and run_obj.status in ("running", "success"):
        run_obj.status = "error"
        run_obj.error = err_text[:2000]
        run_obj.finished_at = datetime.utcnow()
    task_obj = db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
    if task_obj:
        if (task_obj.trigger_type or "schedule") == "schedule":
            task_obj.last_run = datetime.utcnow()
            try:
                task_obj.next_run = compute_next_run(
                    task_obj.schedule, task_obj.scheduled_time,
                    task_obj.scheduled_day, task_obj.scheduled_date,
                    after=datetime.utcnow(),
                    cron_expression=task_obj.cron_expression,
                    tz_name=_resolve_task_timezone(db, task_obj),
                )
            except Exception:
                logger.warning(
                    "Task %s: next_run computation failed after error; retrying in 5 min",
                    task_id, exc_info=True,
                )
                task_obj.next_run = datetime.utcnow() + timedelta(minutes=5)
        else:
            task_obj.next_run = None
    db.commit()


def _recover_failed_run(self, task_id, run_id, err):
    """Fallback when the error path's own DB writes failed.

    In a fresh session: a run still "running"/"queued" becomes "aborted"
    (error prefixed "commit_failed:"). A schedule-triggered task is stalled 5
    minutes so the scheduler doesn't immediately re-dispatch it; an
    event-triggered task has next_run cleared. Failures here are logged and
    swallowed.
    """
    from core.database import SessionLocal, ScheduledTask, TaskRun

    recover_db = SessionLocal()
    try:
        r = recover_db.query(TaskRun).filter(TaskRun.id == run_id).first()
        if r and r.status in ("running", "queued"):
            r.status = "aborted"
            r.error = f"commit_failed: {type(err).__name__}: {err}"[:2000]
            r.finished_at = datetime.utcnow()
        t = recover_db.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if t:
            if (t.trigger_type or "schedule") == "schedule":
                # Push next_run forward 5min as a safe stall.
                t.next_run = datetime.utcnow() + timedelta(minutes=5)
                t.last_run = datetime.utcnow()
            else:
                t.next_run = None
        recover_db.commit()
    except Exception as recover_err:
        logger.error("Task %s recovery commit ALSO failed: %s", task_id, recover_err)
    finally:
        recover_db.close()
|
|
|
|
|
|
|
|
# Built-in housekeeping actions whose output is pure infra (no user-facing
|
|
# content) — don't pollute the assistant chat session with their summaries.
|
|
# Activity log + reminder email already carry everything the user needs.
|
|
_SILENT_ACTIONS = frozenset({
|
|
"check_email_urgency",
|
|
"mark_email_boundaries",
|
|
"learn_sender_signatures",
|
|
"summarize_emails",
|
|
"draft_email_replies",
|
|
"extract_email_events",
|
|
"classify_events",
|
|
"tidy_sessions",
|
|
"tidy_documents",
|
|
"consolidate_memory",
|
|
"tidy_research",
|
|
"test_skills",
|
|
"audit_skills",
|
|
})
|
|
|
|
_MODEL_BACKED_ACTIONS = frozenset({
|
|
"summarize_emails",
|
|
"draft_email_replies",
|
|
"extract_email_events",
|
|
"classify_events",
|
|
"mark_email_boundaries",
|
|
"learn_sender_signatures",
|
|
"check_email_urgency",
|
|
"test_skills",
|
|
"audit_skills",
|
|
"consolidate_memory",
|
|
})
|
|
|
|
def _task_needs_model_slot(self, task_id: str) -> bool:
    """Report whether a task has to wait for a model slot before running.

    Unknown task ids and every non-"action" task type (LLM, research, ...)
    answer True. Built-in "action" tasks answer True only when their action
    is listed in ``_MODEL_BACKED_ACTIONS``; pure housekeeping actions can run
    immediately.
    """
    from core.database import SessionLocal, ScheduledTask

    session = SessionLocal()
    try:
        row = session.query(ScheduledTask).filter(ScheduledTask.id == task_id).first()
        if not row or (row.task_type or "llm") != "action":
            # Missing rows and non-action tasks conservatively queue.
            return True
        return (row.action or "") in self._MODEL_BACKED_ACTIONS
    finally:
        session.close()
|
|
|
|
def _log_to_assistant(self, db, task, result_text: str):
|
|
"""Log a task result to the assistant's chat session."""
|
|
# Don't double-log check-ins (they already save directly)
|
|
if "check-in" in (task.name or "").lower():
|
|
return
|
|
# Built-in housekeeping noise stays out of the chat.
|
|
if (task.action or "") in self._SILENT_ACTIONS:
|
|
return
|
|
from src.assistant_log import log_to_assistant
|
|
log_to_assistant(
|
|
task.owner,
|
|
result_text[:1000],
|
|
category=(task.name or "Task"),
|
|
)
|
|
|
|
async def _execute_action(self, task, run_id: str | None = None) -> tuple:
    """Execute a built-in action (no LLM needed).

    Looks up ``task.action`` in ``BUILTIN_ACTIONS`` and awaits it with the
    task's owner, name and a progress callback that forwards messages to
    ``self._set_run_progress(run_id, ...)``. For ``run_script``/``run_local``
    the task prompt is passed as ``script``; for ``ssh_command`` it is
    passed as ``command``.

    Returns:
        ``(result, success)``. An unknown action yields
        ``("Unknown action: <name>", False)``; any other exception raised by
        the action yields ``(error_text, False)``.

    Raises:
        TaskNoop: re-raised untouched so _execute_task_locked can drop the
            run row silently.
    """
    from src.builtin_actions import BUILTIN_ACTIONS

    action_fn = BUILTIN_ACTIONS.get(task.action)
    if not action_fn:
        return f"Unknown action: {task.action}", False

    from src.builtin_actions import TaskNoop

    try:
        def _progress(message: str):
            self._set_run_progress(run_id, message)

        kwargs = {"owner": task.owner, "task_name": task.name, "progress_cb": _progress}
        # Script-style actions take the task prompt as their payload.
        if task.action in ("run_script", "run_local", "ssh_command") and task.prompt:
            kwargs["script" if task.action in ("run_script", "run_local") else "command"] = task.prompt
        result, success = await action_fn(**kwargs)
        return result, success
    except TaskNoop:
        raise
    except Exception as e:
        # Keep the traceback in the log; the returned text is what the run
        # record shows, so never let it be empty (e.g. TimeoutError()).
        logger.error("Action '%s' failed: %s", task.action, e, exc_info=True)
        return str(e) or type(e).__name__, False
|
|
|
|
# ── Check-in source discovery ──
# Pattern-based: _execute_checkin treats every connected, non-builtin MCP
# server exposing a tool named by "detect" as a check-in source, calls
# "tool" with "args", and files the output under "section". Adding an entry
# here is all it takes to support a new integration.
# NOTE(review): "label_from_identity" and "formatter" are declared per entry
# but _execute_checkin does not read them — confirm where they are consumed.
CHECKIN_MCP_PATTERNS = [
    dict(detect="list_emails", section="Email", tool="list_emails",
         args=dict(mailbox="INBOX", limit=10, unread_only=True),
         label_from_identity=True,
         formatter="_format_email_output"),
    dict(detect="search_emails", section="Email", tool="search_emails",
         args=dict(query="is:unread", limit=10),
         label_from_identity=True,
         formatter="_format_email_output"),
    dict(detect="get_feed", section="RSS", tool="get_feed",
         args=dict(),
         label_from_identity=False),
    dict(detect="list_feeds", section="RSS", tool="list_feeds",
         args=dict(),
         label_from_identity=False),
    dict(detect="list_messages", section="Messages", tool="list_messages",
         args=dict(limit=10),
         label_from_identity=True),
]
|
|
|
|
@staticmethod
|
|
def _format_email_output(raw: str) -> str:
|
|
"""Clean up raw MCP email list output into readable format."""
|
|
import re as _re
|
|
lines = []
|
|
for line in raw.split("\n"):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
# Skip header lines like "📬 [INBOX] 856 emails..."
|
|
if line.startswith(("\U0001f4ec", "📬", "No emails", "---", "Page ")):
|
|
continue
|
|
# Skip "more pages available" etc
|
|
if "page" in line.lower() and "/" in line:
|
|
continue
|
|
# Parse: [1778] Re: Subject From: Name | Date
|
|
m = _re.match(r'\[?\d+\]?\s*(?:↩️\s*|📎\s*|🔵\s*|⭐\s*)?(.+?)(?:\s*From:\s*(.+?))?(?:\s*\|\s*(\S+))?$', line)
|
|
if m:
|
|
subject = m.group(1).strip().rstrip('|').strip()
|
|
sender = (m.group(2) or "").strip().rstrip('|').strip()
|
|
if sender:
|
|
lines.append(f"- {sender} — {subject}")
|
|
else:
|
|
lines.append(f"- {subject}")
|
|
elif line.startswith("[") or line.startswith("-"):
|
|
# Generic cleanup
|
|
cleaned = _re.sub(r'^\[?\d+\]?\s*(?:↩️\s*|📎\s*)?', '', line.lstrip('- '))
|
|
if cleaned.strip():
|
|
lines.append(f"- {cleaned.strip()}")
|
|
if not lines:
|
|
return "No unread emails"
|
|
return "\n".join(lines[:10])
|
|
|
|
async def _execute_checkin(self, task, crew, db, session_id: str,
                           endpoint_url: str, model: str) -> str:
    """Gather raw data from all integrations, hand it to the LLM to write the check-in.

    Sources, each best-effort so one failure never aborts the others:
      * calendar events for every window from ``_digest_windows(now)``,
        bucketed and tagged by importance;
      * the owner's notes/tasks via ``do_manage_notes``;
      * unread entries for each enabled ``miniflux`` integration;
      * snapshots from connected, non-builtin MCP servers whose tools match
        ``CHECKIN_MCP_PATTERNS``.
    The collected dump, ``task.prompt`` and formatting instructions are sent
    through ``_run_agent_loop`` with the crew personality as system prompt.

    Returns:
        The agent loop's final text.
    """
    from src.tool_implementations import do_manage_notes
    from src.agent_tools import get_mcp_manager

    tz_name = _resolve_task_timezone(db, task)
    try:
        if tz_name:
            from zoneinfo import ZoneInfo
            from datetime import timezone

            now = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name))
            time_str = now.strftime("%A, %B %d %Y, %H:%M %Z")
        else:
            now = datetime.utcnow()
            # `now` is naive UTC here — label it so the model does not read
            # it as local time (same label _execute_llm_task uses).
            time_str = now.strftime("%A, %B %d %Y, %H:%M UTC")
    except Exception:
        # Invalid zone name or missing tz data: fall back to naive UTC.
        now = datetime.utcnow()
        time_str = now.strftime("%A, %B %d %Y, %H:%M UTC")

    raw = {}

    # Calendar: one section per digest window (today+tomorrow, this week,
    # month ahead). Read straight from the DB so event_type and importance
    # are available.
    tiers = ("critical", "high", "normal", "low")
    markers = {"critical": "[!!]", "high": "[!]", "normal": " ", "low": " ·"}
    try:
        from core.database import SessionLocal as _SL, CalendarEvent as _CE

        _db = _SL()
        try:
            for label, start, end in _digest_windows(now):
                # Strip timezone for the naive DB comparison.
                # NOTE(review): this keeps local wall-clock values, which is
                # only right if CalendarEvent.dtstart is stored as naive
                # local time; if it is naive UTC, convert to UTC before
                # stripping — confirm the storage convention.
                _s = start.replace(tzinfo=None) if start.tzinfo else start
                _e = end.replace(tzinfo=None) if end.tzinfo else end
                evs = _db.query(_CE).filter(
                    _CE.dtstart >= _s,
                    _CE.dtstart <= _e,
                    # NOTE(review): SQL `status != 'cancelled'` also drops
                    # rows whose status is NULL — confirm status is non-null.
                    _CE.status != "cancelled",
                ).order_by(_CE.dtstart).all()
                if not evs:
                    continue
                by_imp = {tier: [] for tier in tiers}
                for ev in evs:
                    imp = (ev.importance or "normal").lower()
                    # There is no output tier for unrecognised values, so
                    # list such events as normal rather than losing them.
                    by_imp[imp if imp in by_imp else "normal"].append(ev)
                lines = []
                for tier in tiers:
                    for ev in by_imp[tier]:
                        t = ev.dtstart.strftime("%a %b %d %H:%M")
                        tag = f" ({ev.event_type})" if ev.event_type else ""
                        loc = f" @ {ev.location}" if ev.location else ""
                        lines.append(f"{markers[tier]} {t} — {ev.summary}{tag}{loc}")
                if lines:
                    raw[f"calendar_{label}"] = "\n".join(lines)
        finally:
            _db.close()
    except Exception as e:
        raw["calendar"] = f"Error: {e}"

    # Notes/Tasks
    try:
        r = await do_manage_notes(json.dumps({"action": "list"}), owner=task.owner)
        raw["notes_tasks"] = r.get("results") or r.get("response") or "No notes"
    except Exception as e:
        raw["notes_tasks"] = f"Error: {e}"

    # Auto-discover API integrations (Miniflux RSS, etc.).
    try:
        import httpx
        from src.integrations import load_integrations

        for integ in load_integrations():
            if not integ.get("enabled"):
                continue
            preset = integ.get("preset", "")
            # `or ""` also covers an explicit None base_url.
            base_url = (integ.get("base_url") or "").rstrip("/")
            api_key = integ.get("api_key", "")
            if not base_url:
                continue

            headers = {}
            if integ.get("auth_type") == "header" and api_key:
                headers[integ.get("auth_header", "X-Auth-Token")] = api_key
            elif integ.get("auth_type") == "bearer" and api_key:
                headers["Authorization"] = f"Bearer {api_key}"

            if preset == "miniflux":
                # Unread entries, cached 3 min so tasks firing together share one fetch.
                async def _fetch_miniflux(_base=base_url, _headers=dict(headers)):
                    async with httpx.AsyncClient(timeout=10) as client:
                        resp = await client.get(
                            f"{_base}/v1/entries",
                            params={"status": "unread", "limit": 15, "order": "published_at", "direction": "desc"},
                            headers=_headers,
                        )
                    if resp.status_code != 200:
                        return None
                    entries = resp.json().get("entries", []) or []
                    if not entries:
                        return None
                    lines = []
                    for entry in entries[:15]:
                        title = entry.get("title", "?")
                        feed = (entry.get("feed") or {}).get("title", "?")
                        url = entry.get("url", "")
                        lines.append(f"- [{feed}] {title} — {url}")
                    return "\n".join(lines)

                try:
                    val = await _cached(("miniflux_unread", base_url), 180, _fetch_miniflux)
                    if val:
                        raw["rss_miniflux_unread"] = val
                except Exception as e:
                    logger.warning(f"Miniflux fetch failed: {e}")
    except Exception as e:
        logger.warning(f"Integrations discovery failed: {e}")

    # Auto-discover MCP sources
    mcp = get_mcp_manager()
    if mcp:
        discovered = set()
        # Iterate a snapshot: the body awaits MCP calls, and iterating the
        # live dict raises RuntimeError if the manager adds or removes a
        # server while this coroutine is suspended.
        for server_id, tools in list(mcp._tools.items()):
            if mcp.is_builtin(server_id):
                continue
            conn = mcp._connections.get(server_id, {})
            if conn.get("status") != "connected":
                continue
            identity = conn.get("identity", "")
            tool_names = {t["name"] for t in tools}
            for pattern in self.CHECKIN_MCP_PATTERNS:
                if pattern["detect"] not in tool_names:
                    continue
                key = f"{pattern['section']}_{server_id}"
                if key in discovered:
                    continue
                discovered.add(key)
                # NOTE(review): pattern["label_from_identity"] and
                # pattern["formatter"] are not applied — the label always
                # includes a known identity and raw tool output is passed
                # through. Confirm this is intended.
                label = f"{pattern['section']} ({identity})" if identity else pattern["section"]
                qualified = f"mcp__{server_id}__{pattern['tool']}"
                args = dict(pattern.get("args", {}))
                args["account"] = "default"
                try:
                    # Cache 3 min: different scheduled tasks firing at the
                    # same minute share the same MCP snapshot.
                    async def _call_mcp(_q=qualified, _args=args):
                        return await mcp.call_tool(_q, _args)

                    cache_key = ("mcp_snapshot", qualified, json.dumps(args, sort_keys=True))
                    result = await _cached(cache_key, 180, _call_mcp)
                    if result.get("exit_code", 0) != 0:
                        continue
                    content = result.get("stdout") or result.get("output") or ""
                    if content.strip():
                        raw[label] = content[:3000]
                except Exception as e:
                    # Best-effort source: record why it is missing, keep going.
                    logger.debug("Check-in MCP source %s failed: %s", qualified, e)

    # Build the data dump and hand it to the LLM
    data_dump = f"Current time: {time_str}\n\n" + "".join(
        f"--- {key} ---\n{val}\n\n" for key, val in raw.items()
    )

    context = (
        data_dump +
        f"---\n\n{task.prompt}\n\n"
        "Write the check-in. YOU decide what matters, what to skip, how to format. "
        "Only show future events. Calendar events are pre-tagged with importance: "
        "[!!] critical, [!] high, plain = normal, ' ·' = low. "
        "GROUP your output by importance — lead with critical/high, then normal, "
        "skip low entirely unless explicitly relevant. Mention event type (work/health/travel/etc) "
        "where it adds context (e.g. 'leave 1h early for travel'). "
        "Flag anything coming up that needs prep (birthdays, deadlines, holidays). "
        "Use tools to take action if needed. Keep it concise — no raw data dumps."
    )

    return await self._run_agent_loop(
        endpoint_url, model, task, session_id,
        system_prompt=(crew.personality or "").strip() if crew else None,
        disabled_tools=None, relevant_tools=None,
        override_user_message=context,
    )
|
|
|
|
async def _execute_llm_task(self, task, db) -> str:
    """Run an LLM task through the agent loop and return its cleaned text.

    Endpoint/model come from the task, then from the linked CrewMember
    (when ``task.crew_member_id`` is set), then from ``_resolve_defaults``.
    A ``[Task]`` chat session is created on first run. Default-assistant
    check-ins are delegated to ``_execute_checkin``. Otherwise the agent
    loop runs with the crew persona (or a generic prompt) prefixed by the
    current time, a tool blocklist derived from ``crew.enabled_tools`` and
    a RAG-selected tool subset. If the loop raises, a single plain LLM call
    is made instead. Chain-of-thought is stripped from the returned text.

    Raises:
        RuntimeError: when no endpoint/model can be resolved.
    """
    from core.database import Session as DbSession, ChatMessage, CrewMember

    # A task wired to a CrewMember (personal assistant / custom crew) uses
    # the crew member's persona, model and endpoint as fallbacks.
    crew = None
    if getattr(task, "crew_member_id", None):
        try:
            crew = db.query(CrewMember).filter(CrewMember.id == task.crew_member_id).first()
        except Exception:
            crew = None

    endpoint_url, model = task.endpoint_url, task.model
    if not (endpoint_url and model) and crew:
        endpoint_url = endpoint_url or crew.endpoint_url
        model = model or crew.model
    if not (endpoint_url and model):
        endpoint_url, model = self._resolve_defaults(db, task.owner)
    if not (endpoint_url and model):
        raise RuntimeError("No model/endpoint configured")
    # Tasks rarely pin a model, so remember which one actually ran;
    # _execute_task_locked persists it on the run record.
    self._last_run_model = model

    session_id = task.session_id
    if not session_id:
        session_id = str(uuid.uuid4())
        sess = DbSession(
            id=session_id,
            name=f"[Task] {task.name}",
            endpoint_url=endpoint_url,
            model=model,
            owner=task.owner,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
        )
        db.add(sess)
        task.session_id = session_id
        db.commit()
        if self._session_manager:
            try:
                self._session_manager.sessions[session_id] = self._session_manager._db_to_session(sess)
            except Exception:
                pass

    # Assistant check-ins call each data source directly rather than
    # relying on the model to pick the right tools.
    if crew and crew.is_default_assistant and "check-in" in (task.name or "").lower():
        return await self._execute_checkin(task, crew, db, session_id, endpoint_url, model)

    if crew and crew.personality:
        persona = crew.personality.strip()
    else:
        persona = "You are a helpful assistant executing a scheduled task. Use available tools to complete the task thoroughly."

    # Prefix the current time so the model can tell past from upcoming.
    utc_format = "%A, %B %d %Y, %H:%M UTC"
    tz_name = _resolve_task_timezone(db, task)
    try:
        if tz_name:
            from zoneinfo import ZoneInfo
            from datetime import timezone

            local_now = datetime.utcnow().replace(tzinfo=timezone.utc).astimezone(ZoneInfo(tz_name))
            time_str = local_now.strftime("%A, %B %d %Y, %H:%M %Z")
        else:
            time_str = datetime.utcnow().strftime(utc_format)
    except Exception:
        time_str = datetime.utcnow().strftime(utc_format)
    system_prompt = f"Current time: {time_str}\n\n{persona}"

    # crew.enabled_tools (JSON list) becomes a blocklist of every other builtin tool.
    disabled_tools = None
    if crew and crew.enabled_tools:
        try:
            allowed = json.loads(crew.enabled_tools)
            if isinstance(allowed, list) and allowed:
                from src.tool_index import BUILTIN_TOOL_DESCRIPTIONS

                disabled_tools = set(BUILTIN_TOOL_DESCRIPTIONS.keys()) - set(allowed)
        except Exception:
            pass

    # RAG-pick tools for this prompt plus the always-available assistant set;
    # sending all 40+ tools makes models hit their tool limit.
    relevant_tools = None
    try:
        from src.tool_index import get_tool_index, ASSISTANT_ALWAYS_AVAILABLE

        index = get_tool_index()
        if index:
            rag_tools = index.get_tools_for_query(task.prompt or "", k=8)
            relevant_tools = (rag_tools | ASSISTANT_ALWAYS_AVAILABLE)
            if disabled_tools:
                relevant_tools -= disabled_tools
            logger.info(f"[assistant] RAG selected {len(rag_tools)} tools + {len(ASSISTANT_ALWAYS_AVAILABLE)} always-available = {len(relevant_tools)} total for '{task.name}'")
    except Exception as e:
        logger.warning(f"[assistant] RAG tool selection failed, using all: {e}")

    try:
        result = await self._run_agent_loop(
            endpoint_url, model, task, session_id,
            system_prompt=system_prompt, disabled_tools=disabled_tools,
            relevant_tools=relevant_tools,
        )
    except Exception as e:
        logger.warning(f"Agent loop failed for task '{task.name}', falling back to simple call: {e}")
        from src.llm_core import llm_call_async

        plain_messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": task.prompt},
        ]
        result = await llm_call_async(url=endpoint_url, model=model, messages=plain_messages, timeout=120)

    # Drop chain-of-thought before the result is saved/delivered. Output is
    # LLM-only, so prose=True (which also removes untagged reasoning) is safe.
    try:
        from src.text_helpers import strip_think

        result = strip_think(result or "", prose=True, prompt_echo=True).strip() or result
    except Exception:
        pass

    return result
|
|
|
|
async def _deliver_task_result(self, task, result: str, db, model: str = None):
    """Hand a finished task's result to its ``output_target``.

    Shared by LLM, research and built-in action tasks so no task type gets
    a hidden delivery path that disagrees with the visible target.

    Targets:
      * ``mcp__...`` → ``_deliver_via_mcp``;
      * e-mail forms accepted by ``_is_email_output_target`` →
        ``_deliver_via_email``;
      * ``session`` (the default) → append a user/assistant message pair
        to the task's chat session, creating that session if needed;
      * anything else → nothing is delivered.
    """
    from core.database import Session as DbSession, ChatMessage, CrewMember

    target = task.output_target or "session"
    if target.startswith("mcp__"):
        await self._deliver_via_mcp(target, task, result)
        return
    if self._is_email_output_target(target):
        await self._deliver_via_email(target, task, result)
        return
    if target != "session":
        return

    # Session metadata: task values, then crew member, then recent session.
    endpoint_url = task.endpoint_url
    model_name = model or task.model
    crew = None
    if getattr(task, "crew_member_id", None):
        try:
            crew = db.query(CrewMember).filter(CrewMember.id == task.crew_member_id).first()
        except Exception:
            crew = None
    if not (endpoint_url and model_name) and crew:
        endpoint_url = endpoint_url or crew.endpoint_url
        model_name = model_name or crew.model
    if not (endpoint_url and model_name):
        try:
            fallback_url, fallback_model = self._resolve_defaults(db, task.owner)
            endpoint_url = endpoint_url or fallback_url
            model_name = model_name or fallback_model
        except Exception:
            pass

    session_id = task.session_id
    if not session_id:
        session_id = str(uuid.uuid4())
        sess = DbSession(
            id=session_id,
            name=f"[Task] {task.name}",
            endpoint_url=endpoint_url or "",
            model=model_name or "",
            owner=task.owner,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
        )
        db.add(sess)
        task.session_id = session_id
        db.commit()
        if self._session_manager:
            try:
                self._session_manager.sessions[session_id] = self._session_manager._db_to_session(sess)
            except Exception:
                pass

    meta = {"model": model_name} if model_name else {}
    if crew and crew.is_default_assistant:
        meta.update({"source": "cron", "task_id": task.id, "task_name": task.name})
    msg_meta = json.dumps(meta)

    def _make_row(role, content):
        return ChatMessage(
            id=str(uuid.uuid4()),
            session_id=session_id,
            role=role,
            content=content,
            timestamp=datetime.utcnow(),
            meta_data=msg_meta,
        )

    user_msg = _make_row("user", task.prompt or f"[Task] {task.name}")
    assistant_msg = _make_row("assistant", result or "")
    db.add(user_msg)
    db.add(assistant_msg)
    db.commit()

    # Keep the in-memory session (if loaded) in sync with the DB rows.
    if self._session_manager:
        try:
            from core.models import ChatMessage as MemMsg

            live = self._session_manager.get_session(session_id)
            for role, row in (("user", user_msg), ("assistant", assistant_msg)):
                live.history.append(MemMsg(role=role, content=row.content, metadata=meta))
        except Exception:
            pass
|
|
|
|
@staticmethod
|
|
def _is_email_output_target(output: str) -> bool:
|
|
target = (output or "").strip()
|
|
if target in {"email", "email:self"}:
|
|
return True
|
|
if target.startswith("email:"):
|
|
return True
|
|
return bool(re.fullmatch(r"[^@\s]+@[^@\s]+\.[^@\s]+", target))
|
|
|
|
async def _deliver_via_email(self, output: str, task, result: str):
    """Send task output through the app's configured SMTP account.

    Supported output_target values:
      - ``email`` / ``email:self``: send to the account's From address
        (or ``smtp_user`` when no From address is configured)
      - ``email:name@example.com`` or raw ``name@example.com``: send there

    Raises:
        RuntimeError: when no recipient can be resolved.
        Any error from config resolution or SMTP delivery is logged with a
        traceback and re-raised.
    """
    from email.message import EmailMessage

    target = (output or "").strip()
    explicit = ""
    if target.startswith("email:"):
        explicit = target.split(":", 1)[1].strip()
    elif "@" in target:
        explicit = target
    # "email:self" means "send to the account itself"; without this the
    # literal string "self" would be used as the recipient address.
    if explicit.lower() == "self":
        explicit = ""

    try:
        from routes.email_routes import _resolve_send_config
        from routes.email_helpers import _send_smtp_message

        cfg = _resolve_send_config(owner=task.owner or "")
        to_addr = explicit or cfg.get("from_address") or cfg.get("smtp_user") or ""
        if not to_addr:
            raise RuntimeError("No email recipient resolved for task output")

        from_addr = cfg.get("from_address") or cfg.get("smtp_user") or to_addr
        # EmailMessage's default policy rejects header values spanning
        # several lines, so flatten the task name for the Subject.
        subject_name = " ".join(str(task.name).splitlines())
        msg = EmailMessage()
        msg["From"] = from_addr
        msg["To"] = to_addr
        msg["Subject"] = f"[Task] {subject_name}"
        msg["X-Odysseus-Origin"] = "odysseus-ui"
        msg["X-Odysseus-Kind"] = "task"
        msg["X-Odysseus-Ref"] = str(task.id)
        msg.set_content(result or "")
        _send_smtp_message(cfg, from_addr, [to_addr], msg.as_string(), timeout=30)
        logger.info("Task %s emailed result to %s (%sb)", task.id, to_addr, len(result or ""))
    except Exception as e:
        logger.error("Task %s email delivery failed: %s", task.id, e, exc_info=True)
        raise
|
|
|
|
async def _run_agent_loop(self, endpoint_url: str, model: str, task, session_id: str,
                          system_prompt: str | None = None,
                          disabled_tools: set | None = None,
                          relevant_tools: set | None = None,
                          override_user_message: str | None = None) -> str:
    """Run the full agent loop with tool access, collecting the final text.

    Consumes ``data: ...`` events from ``stream_agent_loop``. It accumulates
    ``delta`` text and keeps short summaries of ``tool_output`` events.
    Auth headers come from the first enabled ModelEndpoint whose normalized
    base URL matches ``endpoint_url``. The utility-model fallback chain is
    passed through. ``task.max_steps`` caps the rounds (default 20).

    If the loop produces no text, one extra "grace" LLM call asks the model
    to summarize its work. If that fails or is empty, the last five tool
    summaries are returned instead.

    Returns:
        The final text, or ``"(no output)"`` when there is none.
    """
    from src.agent_loop import stream_agent_loop

    system_content = system_prompt or "You are a helpful assistant executing a scheduled task. Use available tools to complete the task thoroughly."
    user_content = override_user_message or task.prompt
    messages = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_content},
    ]

    # Resolve headers from the endpoint's API key (best-effort).
    headers = {}
    try:
        from core.database import SessionLocal, ModelEndpoint
        from src.endpoint_resolver import normalize_base, build_headers

        db2 = SessionLocal()
        try:
            eps = db2.query(ModelEndpoint).filter(ModelEndpoint.is_enabled == True).all()  # noqa: E712
            for ep in eps:
                base = normalize_base(ep.base_url)
                # An empty base is a substring of every URL; never match it.
                if base and (base in endpoint_url or endpoint_url in base):
                    headers = build_headers(ep.api_key, base)
                    break
        finally:
            db2.close()
    except Exception:
        logger.debug("Header resolution for %s failed", endpoint_url, exc_info=True)

    full_text = ""
    tool_results = []

    # Honor per-task max_steps (defense against runaway agent loops);
    # 20 is the historical default.
    _task_max_rounds = task.max_steps if task.max_steps and task.max_steps > 0 else 20
    # Tasks are background workloads and share the Utility model's fallback
    # chain, so a downed primary endpoint doesn't silently yield no output.
    try:
        from src.endpoint_resolver import resolve_utility_fallback_candidates

        _task_fallbacks = resolve_utility_fallback_candidates()
    except Exception:
        _task_fallbacks = []

    async for event_str in stream_agent_loop(
        endpoint_url=endpoint_url,
        model=model,
        messages=messages,
        max_rounds=_task_max_rounds,
        session_id=session_id,
        owner=task.owner,
        headers=headers,
        disabled_tools=disabled_tools,
        relevant_tools=relevant_tools,
        fallbacks=_task_fallbacks,
    ):
        if not event_str.startswith("data: ") or event_str.startswith("data: [DONE]"):
            continue
        try:
            data = json.loads(event_str[6:])
        except json.JSONDecodeError:
            continue
        # Ignore non-object payloads instead of crashing the whole task.
        if not isinstance(data, dict):
            continue
        if "delta" in data:
            delta = data["delta"]
            if isinstance(delta, str):  # tolerate "delta": null
                full_text += delta
        elif data.get("type") == "tool_output":
            # Keep a summary so there is SOMETHING even if the model never
            # produces a final text response.
            tool_summary = data.get("stdout") or data.get("output") or data.get("result") or ""
            if isinstance(tool_summary, str) and tool_summary.strip():
                tool_results.append(f"[{data.get('tool', '?')}] {tool_summary[:500]}")

    # Grace summarization: if the rounds were spent on tool calls without a
    # final text response, ask once for a summary of what was done.
    if not full_text.strip():
        try:
            from src.llm_core import llm_call_async_with_fallback
            from src.endpoint_resolver import resolve_utility_fallback_candidates

            grace_context = "You ran out of steps. "
            if tool_results:
                grace_context += "Here's what your tools returned:\n" + "\n".join(tool_results[-5:])
            else:
                grace_context += "No tool results were captured."
            grace_context += "\n\nSummarize what you accomplished and what's still pending. Be concise."
            _grace_candidates = [(endpoint_url, model, headers)] + resolve_utility_fallback_candidates()
            full_text = await llm_call_async_with_fallback(
                _grace_candidates,
                messages=[
                    {"role": "system", "content": system_content},
                    {"role": "user", "content": grace_context},
                ],
                timeout=30,
            )
            full_text = (full_text or "").strip()
        except Exception as e:
            logger.warning(f"Grace summarization failed: {e}")
        # Failed or empty grace call: fall back to the raw tool evidence.
        if not full_text.strip() and tool_results:
            full_text = "\n".join(tool_results[-5:])

    return full_text if full_text.strip() else "(no output)"
|
|
|
|
async def _execute_research_task(self, task, db) -> str:
    """Execute a deep research task using DeepResearcher.

    Endpoint/model resolution: the task's own values, then the "research"
    endpoint from ``resolve_endpoint``, then ``_resolve_defaults``. Auth
    headers come from the first enabled ModelEndpoint whose normalized base
    URL matches. The report is persisted as
    ``RESEARCH_DATA_DIR/<session_id>.json`` in the Research panel's format,
    and a ``research_completed`` event is fired. Persistence failures are
    logged, not raised.

    Returns:
        The research report text.

    Raises:
        RuntimeError: when no endpoint/model can be resolved.
    """
    from core.database import Session as DbSession
    from src.deep_research import DeepResearcher
    from src.research_handler import RESEARCH_DATA_DIR, ResearchHandler
    from src.research_utils import strip_thinking
    from src.settings import get_setting

    # Resolve endpoint/model: task settings > research settings > session defaults
    endpoint_url = task.endpoint_url
    model = task.model
    if not endpoint_url or not model:
        try:
            from src.endpoint_resolver import resolve_endpoint

            ep_url, ep_model, _ep_headers = resolve_endpoint(
                "research",
                endpoint_url or None,
                model or None,
                None,
            )
            endpoint_url = ep_url or endpoint_url
            model = ep_model or model
        except Exception:
            pass
    if not endpoint_url or not model:
        endpoint_url, model = self._resolve_defaults(db, task.owner)
    if not endpoint_url or not model:
        raise RuntimeError("No model/endpoint configured for research")
    # Record the resolved model for the run record (see _execute_task_locked).
    self._last_run_model = model

    headers = {}
    try:
        from core.database import ModelEndpoint
        from src.endpoint_resolver import normalize_base, build_headers

        for ep in db.query(ModelEndpoint).filter(ModelEndpoint.is_enabled == True).all():  # noqa: E712
            base = normalize_base(ep.base_url)
            # An empty base is a substring of every URL; never match it.
            if base and (base in endpoint_url or endpoint_url in base):
                headers = build_headers(ep.api_key, base)
                break
    except Exception:
        pass

    # `or <default>` so a blank/None setting falls back instead of making
    # int() raise (matching the two settings below).
    max_tokens = int(get_setting("research_max_tokens", 8192) or 8192)
    extraction_timeout = int(get_setting("research_extraction_timeout_seconds", 90) or 90)
    extraction_concurrency = int(get_setting("research_extraction_concurrency", 3) or 3)

    researcher = DeepResearcher(
        llm_endpoint=endpoint_url,
        llm_model=model,
        llm_headers=headers,
        max_rounds=8,
        max_time=600,  # 10 min for scheduled research
        max_report_tokens=max_tokens,
        extraction_timeout=extraction_timeout,
        extraction_concurrency=extraction_concurrency,
    )

    started_ts = time.time()
    report = await researcher.research(task.prompt)
    completed_ts = time.time()
    try:
        stats = researcher.get_stats() or {}
    except Exception:
        stats = {}

    # Ensure a session exists for output
    session_id = task.session_id
    if not session_id:
        session_id = str(uuid.uuid4())
        sess = DbSession(
            id=session_id,
            name=f"[Research] {task.name}",
            endpoint_url=endpoint_url,
            model=model,
            owner=task.owner,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
        )
        db.add(sess)
        task.session_id = session_id
        db.commit()
        if self._session_manager:
            try:
                self._session_manager.sessions[session_id] = self._session_manager._db_to_session(sess)
            except Exception:
                pass

    # Persist in the Research panel's on-disk shape so scheduled research
    # gets a Library entry and a visual report route.
    try:
        RESEARCH_DATA_DIR.mkdir(parents=True, exist_ok=True)
        findings = getattr(researcher, "findings", []) or []
        payload = {
            "query": task.prompt or task.name or "Scheduled research",
            "status": "done",
            "result": report,
            "raw_report": strip_thinking(report or ""),
            "sources": ResearchHandler._extract_sources(findings),
            "raw_findings": ResearchHandler._extract_raw_findings(findings),
            "stats": stats,
            "category": "scheduled",
            "started_at": started_ts,
            "completed_at": completed_ts,
            "owner": task.owner or "",
            "task_id": task.id,
            "task_name": task.name,
        }
        out_path = RESEARCH_DATA_DIR / f"{session_id}.json"
        # Write to a sibling temp file and rename it into place. Concurrent
        # readers then see either the old report or the complete new one,
        # never a partial file (rename is atomic on POSIX filesystems).
        tmp_path = out_path.with_name(out_path.name + ".tmp")
        tmp_path.write_text(json.dumps(payload), encoding="utf-8")
        tmp_path.replace(out_path)
        try:
            from src.event_bus import fire_event

            fire_event("research_completed", task.owner or None)
        except Exception:
            logger.debug("research_completed event dispatch failed", exc_info=True)
    except Exception as e:
        logger.warning("Failed to persist task research report %s: %s", session_id, e)

    return report
|
|
|
|
async def _run_chained(self, task_id: str):
    """Dispatch the follow-up task of a chain unless it is already running.

    Membership in ``self._executing`` is claimed under ``_executing_lock``
    the same way ``run_task_now``'s non-forced path does it. A scheduler
    tick, a manual trigger or another chain link racing with this call
    therefore cannot start a second copy of the same task.
    """
    async with self._executing_lock:
        busy = task_id in self._executing
        if not busy:
            self._executing.add(task_id)
    if busy:
        # Already in flight (manual trigger, scheduler tick, or another chain).
        return
    await self._execute_task(task_id)
|
|
|
|
def _has_chain_cycle(self, db, start_id: str, max_depth: int = 10) -> bool:
    """Report whether the ``then_task_id`` chain starting at ``start_id`` loops.

    Follows ``then_task_id`` links one row at a time. The result is True when
    a task id is revisited, or when the walk has not reached the end of the
    chain after ``max_depth`` lookups; an over-long chain is conservatively
    treated as a cycle. The result is False when a missing row or an empty
    ``then_task_id`` ends the chain first.
    """
    from core.database import ScheduledTask

    seen_ids = set()
    node_id = start_id
    hops = 0
    while hops < max_depth:
        if node_id in seen_ids:
            return True
        seen_ids.add(node_id)
        row = db.query(ScheduledTask).filter(ScheduledTask.id == node_id).first()
        next_id = row.then_task_id if row else None
        if not next_id:
            return False
        node_id = next_id
        hops += 1
    # Chain too deep to finish walking: treat it as a cycle.
    return True
|
|
|
|
def _resolve_defaults(self, db, owner):
    """Return ``(endpoint_url, model)`` taken from the most recent usable session.

    A session is usable when both ``endpoint_url`` and ``model`` are set and
    non-empty. Empty strings are excluded because the seeded "Assistant"
    session is written with ``""`` placeholders when no default could be
    resolved. Being the newest row, it would otherwise shadow every real
    session and hand back ``("", "")``. When ``owner`` is falsy, sessions of
    any owner are considered.

    Returns ``(None, None)`` if nothing usable is found or the lookup fails.
    """
    from core.database import Session as DbSession
    try:
        filters = [
            DbSession.endpoint_url.isnot(None),
            DbSession.endpoint_url != "",
            DbSession.model.isnot(None),
            DbSession.model != "",
        ]
        if owner:
            filters.append(DbSession.owner == owner)
        recent = (
            db.query(DbSession)
            .filter(*filters)
            .order_by(DbSession.created_at.desc())
            .first()
        )
        if recent:
            return recent.endpoint_url, recent.model
    except Exception:
        # Best-effort lookup: callers fall back to (None, None). Log the
        # failure instead of swallowing it silently.
        logger.debug("Could not resolve default endpoint/model for owner=%r", owner, exc_info=True)
    return None, None
|
|
|
|
async def _deliver_via_mcp(self, tool_name: str, task, result: str):
    """Send the task result via an MCP tool (e.g. Gmail send).

    Resolves a recipient so email-style tools have a 'to'. It tries the
    configured From address first (the ``daily_brief`` pattern: email
    yourself), then falls back to the task owner when the owner looks like an
    email address. The recipient is written under several common field names
    (to / recipient / email / address), so no per-tool schema special-casing
    is needed; the MCP tool is expected to ignore keys it doesn't recognise.

    Delivery problems are logged, never raised.
    """
    from src.agent_tools import get_mcp_manager

    mcp = get_mcp_manager()
    if not mcp:
        logger.warning(f"Task {task.id}: MCP manager not available for delivery")
        return

    # Resolve recipient: prefer the configured email From (the established
    # "email yourself" pattern from daily_brief), fall back to task.owner.
    # `_get_email_config()` is the single source of truth that handles both
    # the legacy `email_from` setting and the per-account DB rows.
    recipient = None
    try:
        from routes.email_helpers import _get_email_config
        cfg = _get_email_config() or {}
        recipient = cfg.get("from_address") or None
    except Exception as _e:
        logger.debug(f"_deliver_via_mcp: email config lookup failed: {_e}")
    if not recipient and task.owner and "@" in str(task.owner):
        recipient = task.owner

    args = {
        "subject": f"[Task] {task.name}",
        "body": result,
        "headers": {
            "X-Odysseus-Origin": "odysseus-ui",
            "X-Odysseus-Kind": "task",
            "X-Odysseus-Ref": str(task.id),
        },
    }
    if recipient:
        # Cover the common field names so we work across MCP servers (Gmail,
        # generic SMTP, Slack DMs, etc.) without having to hard-code each.
        args["to"] = recipient
        args["recipient"] = recipient
        args["email"] = recipient
        args["address"] = recipient
    else:
        logger.warning(
            f"Task {task.id}: no recipient resolved for MCP delivery via {tool_name} — "
            "set an email From address in Settings or give the task an owner email."
        )
    try:
        mcp_result = await mcp.call_tool(tool_name, args)
        # dict.get's default only applies when the key is *missing*. A tool
        # that reports e.g. "stdout": None would otherwise make the slicing
        # below raise, and a successful send would be logged as a failure.
        stderr = mcp_result.get("stderr") or ""
        stdout = mcp_result.get("stdout") or ""
        body_len = len(result or "")
        exit_code = mcp_result.get("exit_code", 0)
        if exit_code != 0:
            logger.warning(
                f"Task {task.id} MCP delivery FAILED via {tool_name}: "
                f"exit={exit_code} stderr={stderr[:400]!r} stdout={stdout[:400]!r}"
            )
        else:
            # Include the MCP tool's own stdout (e.g. email_server returns
            # "Sent email to ... with subject ...") + the body size so a
            # silent SMTP failure is easier to spot in the logs.
            logger.info(
                f"Task {task.id} delivered via MCP tool {tool_name} "
                f"(to={recipient or '<unset>'}, body={body_len}b, reply={stdout[:200]!r})"
            )
    except Exception as e:
        logger.error(f"Task {task.id} MCP delivery failed: {e}")
|
|
|
|
async def run_task_now(self, task_id: str, *, force: bool = False):
    """Start ``task_id`` in the background immediately.

    Without ``force``, the task is first claimed in ``self._executing`` under
    ``_executing_lock``. If it is already claimed, nothing is scheduled and
    ``False`` is returned.

    With ``force``, that guard is skipped. The run is dispatched with
    ``bypass_model_slot=True`` and ``release_executing=False``.

    Returns ``True`` whenever a run was scheduled.
    """
    if not force:
        async with self._executing_lock:
            if task_id in self._executing:
                return False
            self._executing.add(task_id)
        job = self._execute_task(task_id)
    else:
        job = self._execute_task(task_id, bypass_model_slot=True, release_executing=False)
    asyncio.create_task(job)
    return True
|
|
|
|
async def stop_task(self, task_id: str) -> bool:
    """Cancel a running or queued run of ``task_id`` and mark it aborted.

    Performs three independent steps, in order:

    1. cancel the live handle in ``_task_handles`` if it is not done yet;
    2. release the task's ``_executing`` claim under ``_executing_lock``;
    3. let ``_mark_run_aborted`` flag the run.

    Returns ``_mark_run_aborted``'s result if truthy. Otherwise returns
    whether step 1 or step 2 actually did something.
    """
    live = self._task_handles.get(task_id)
    cancelled = False
    if live and not live.done():
        live.cancel()
        cancelled = True

    async with self._executing_lock:
        was_claimed = task_id in self._executing
        self._executing.discard(task_id)

    return self._mark_run_aborted(task_id) or cancelled or was_claimed
|
|
|
|
async def ensure_defaults(self, owner: str):
    """Create or normalize the built-in housekeeping tasks for ``owner``.

    Idempotent per action: every ``HOUSEKEEPING_DEFAULTS`` action ends up
    with one ``task_type="action"`` row for the owner. Along the way this:

    * re-tags rows carrying a current or legacy built-in name as action
      tasks, so stale rows stop running as scheduled LLM tasks;
    * deletes rows whose action is in ``RETIRED_HOUSEKEEPING_ACTIONS``;
    * de-duplicates per action. It keeps the row matching the shipped
      trigger/schedule, then an active one, then the newest;
    * renames legacy names and migrates superseded cron/trigger settings;
    * until the user has enabled or opened Tasks (per-user prefs), resets
      paused/active status to the shipped default;
    * seeds any action that has no row yet, with ``next_run`` computed in
      the task's resolved timezone like the normalization pass.

    Finally it makes sure the personal assistant exists via
    ``ensure_assistant_defaults``. Errors are logged, not raised.
    """
    from core.database import SessionLocal, ScheduledTask

    try:
        from routes.prefs_routes import _load_for_user
        _prefs = _load_for_user(owner) or {}
    except Exception:
        _prefs = {}
    tasks_enabled = bool(_prefs.get("tasks_enabled"))
    tasks_opened = bool(_prefs.get("tasks_opened"))

    db = SessionLocal()
    try:
        # Normalize old built-ins that were created before `task_type` /
        # `action` were reliable. Match by current or legacy name so stale
        # rows cannot keep running as scheduled LLM tasks forever.
        name_to_action = {}
        for action, defs in HOUSEKEEPING_DEFAULTS.items():
            name_to_action[defs["name"]] = action
            for legacy in defs.get("legacy_names") or []:
                name_to_action[legacy] = action
        possible_names = list(name_to_action.keys())
        legacy_named = db.query(ScheduledTask).filter(
            ScheduledTask.owner == owner,
            ScheduledTask.name.in_(possible_names),
        ).all()
        for task in legacy_named:
            action = name_to_action.get(task.name)
            if not action:
                continue
            task.task_type = "action"
            task.action = action

        # Built-ins that no longer exist are removed outright.
        retired_count = db.query(ScheduledTask).filter(
            ScheduledTask.owner == owner,
            ScheduledTask.task_type == "action",
            ScheduledTask.action.in_(list(RETIRED_HOUSEKEEPING_ACTIONS)),
        ).delete(synchronize_session=False)
        existing_actions = {
            row[0] for row in db.query(ScheduledTask.action).filter(
                ScheduledTask.owner == owner,
                ScheduledTask.task_type == "action",
            ).all() if row[0]
        }
        renamed = []
        builtin_tasks = db.query(ScheduledTask).filter(
            ScheduledTask.owner == owner,
            ScheduledTask.task_type == "action",
            ScheduledTask.action.in_(list(HOUSEKEEPING_DEFAULTS.keys())),
        ).all()
        by_action = {}
        for task in builtin_tasks:
            by_action.setdefault(task.action, []).append(task)

        # Keep exactly one row per action; delete the rest.
        removed_dupes = []
        kept_ids = set()
        for action, tasks in by_action.items():
            defs = HOUSEKEEPING_DEFAULTS.get(action)
            if not defs:
                continue
            desired_trigger = defs.get("trigger_type", "schedule")

            def _score(candidate):
                # Rank: matches shipped trigger/schedule > active > newest.
                matches_default = (
                    (candidate.trigger_type or "schedule") == desired_trigger
                    and (candidate.trigger_event or None) == defs.get("trigger_event")
                    and (candidate.trigger_count or 1) == (defs.get("trigger_count") or 1)
                    and (candidate.schedule or None) == defs.get("schedule")
                    and (candidate.scheduled_time or None) == defs.get("scheduled_time")
                    and (candidate.cron_expression or None) == defs.get("cron_expression")
                )
                created = candidate.created_at or datetime.min
                # Compared as a plain tuple rather than the datetime itself,
                # presumably to avoid naive-vs-aware comparison errors.
                created_key = (created.toordinal(), created.hour, created.minute, created.second, created.microsecond)
                return (1 if matches_default else 0, 1 if candidate.status == "active" else 0, created_key)

            # max() returns the first maximal element, exactly like
            # sorted(..., reverse=True)[0] (stable sort), without the sort.
            keep = max(tasks, key=_score)
            kept_ids.add(keep.id)
            for dupe in tasks:
                if dupe.id == keep.id:
                    continue
                db.delete(dupe)
                removed_dupes.append(action)

        for task in [t for t in builtin_tasks if t.id in kept_ids]:
            defs = HOUSEKEEPING_DEFAULTS.get(task.action)
            if not defs:
                continue
            legacy_names = set(defs.get("legacy_names") or [])
            if (task.name or "") in legacy_names:
                task.name = defs["name"]
                renamed.append(task.action)
            normalized = False
            desired_trigger = defs.get("trigger_type", "schedule")
            if task.action == "check_email_urgency":
                # Migrate rows still on a superseded cron expression.
                old_crons = set(defs.get("old_cron_expressions") or [])
                if task.schedule == "cron" and (task.cron_expression or "") in old_crons:
                    task.cron_expression = defs["cron_expression"]
                    task.next_run = compute_next_run(
                        defs["schedule"], defs["scheduled_time"], None, None,
                        after=datetime.utcnow(), cron_expression=defs["cron_expression"],
                        tz_name=_resolve_task_timezone(db, task),
                    )
                    normalized = True
            if desired_trigger == "event" and (
                (task.trigger_type or "schedule") != "event"
                or task.trigger_event != defs.get("trigger_event")
                or (task.trigger_count or 1) != (defs.get("trigger_count") or 1)
                or task.schedule is not None
                or task.scheduled_time is not None
                or task.scheduled_date is not None
                or task.cron_expression is not None
            ):
                task.trigger_type = "event"
                task.trigger_event = defs.get("trigger_event")
                task.trigger_count = defs.get("trigger_count") or 1
                task.trigger_counter = 0
                task.schedule = defs.get("schedule")
                task.scheduled_time = defs.get("scheduled_time")
                task.scheduled_day = None
                task.scheduled_date = None
                task.cron_expression = defs.get("cron_expression")
                normalized = True
            if normalized:
                renamed.append(task.action)
            ships_paused = bool(defs.get("ship_paused"))
            # Until the user has enabled or opened Tasks, keep the shipped
            # paused/active state in force.
            if not tasks_enabled and not tasks_opened:
                if ships_paused and task.status == "active":
                    task.status = "paused"
                elif not ships_paused and task.status == "paused":
                    task.status = "active"
                    if (task.trigger_type or "schedule") == "schedule":
                        task.next_run = compute_next_run(
                            task.schedule, task.scheduled_time,
                            task.scheduled_day, task.scheduled_date,
                            after=datetime.utcnow(), cron_expression=task.cron_expression,
                            tz_name=_resolve_task_timezone(db, task),
                        )
            # Built-in housekeeping/action jobs should not create browser
            # task notifications; user AI/research tasks still can.
            task.notifications_enabled = False

        seeded = []
        for action, defs in HOUSEKEEPING_DEFAULTS.items():
            if action in existing_actions:
                continue
            trigger_type = defs.get("trigger_type", "schedule")
            ships_paused = bool(defs.get("ship_paused"))
            task = ScheduledTask(
                id=str(uuid.uuid4())[:8],
                owner=owner,
                name=defs["name"],
                task_type="action",
                action=action,
                trigger_type=trigger_type,
                trigger_event=defs.get("trigger_event"),
                trigger_count=defs.get("trigger_count"),
                trigger_counter=0,
                schedule=defs["schedule"],
                scheduled_time=defs["scheduled_time"],
                cron_expression=defs["cron_expression"],
                next_run=None,
                # Most built-ins are active by default. The invasive
                # AI/email/calendar tasks opt into a paused starting state
                # via ship_paused so users can enable them deliberately.
                status="paused" if ships_paused else "active",
                output_target="session",
                notifications_enabled=False,
            )
            if trigger_type == "schedule":
                # Resolve the timezone the same way as the normalization pass
                # above, so a seeded task's first run honours the owner's tz
                # instead of silently using UTC.
                task.next_run = compute_next_run(
                    defs["schedule"], defs["scheduled_time"], None, None,
                    after=datetime.utcnow(), cron_expression=defs["cron_expression"],
                    tz_name=_resolve_task_timezone(db, task),
                )
            db.add(task)
            seeded.append(action)

        # Always commit. The passes above also mutate rows in ways the lists
        # below do not track (legacy re-tagging, status resets, next_run
        # recomputation, notifications_enabled). Gating the commit on those
        # lists let db.close() silently roll such changes back.
        db.commit()
        if seeded or renamed or removed_dupes or retired_count:
            logger.info(
                "Housekeeping defaults for %s: seeded=%s renamed=%s deduped=%s retired=%s",
                owner, seeded, sorted(set(renamed)), sorted(set(removed_dupes)), retired_count,
            )
    except Exception as e:
        logger.warning(f"Failed to create default tasks: {e}")
        try:
            db.rollback()
        except Exception:
            pass
    finally:
        db.close()
    # Always ensure the personal assistant exists (independent of other tasks).
    try:
        await self.ensure_assistant_defaults(owner)
    except Exception as e:
        logger.warning(f"Failed to seed assistant for {owner}: {e}")
|
|
|
|
async def ensure_assistant_defaults(self, owner: str):
    """Create the personal-assistant CrewMember and its pinned session for ``owner``.

    Idempotent on ``is_default_assistant``: nothing is written when the owner
    already has a default-assistant CrewMember. Synthetic owners are skipped.

    Check-in ScheduledTasks are intentionally no longer seeded here (see the
    note before the commit). Failures are logged and rolled back, never
    raised.
    """
    # Hard-reject synthetic owners. Without this, AuthMiddleware-stamped
    # values like 'internal-tool' (loopback agent-tool callbacks) or 'api'
    # (bearer-token integrations) would get a real assistant + 3 daily
    # check-ins seeded, which then double-fire alongside the human user's
    # check-ins. This was the root cause of the duplicate 'Morning check-in'
    # rows we had to manually clean up.
    if not owner or owner in {"internal-tool", "api", "demo", "system"}:
        logger.info(f"ensure_assistant_defaults: skip synthetic owner {owner!r}")
        return
    from core.database import SessionLocal, CrewMember
    from core.database import Session as DbSession

    db = SessionLocal()
    try:
        existing = db.query(CrewMember).filter(
            CrewMember.owner == owner,
            CrewMember.is_default_assistant == True,  # noqa: E712
        ).first()
        if existing:
            return  # already seeded

        # Resolve a default model/endpoint from any existing session so the
        # assistant has something to call. The user can change this later.
        endpoint_url, model = self._resolve_defaults(db, owner)

        default_personality = (
            "You are the user's personal assistant. Concise, warm, a little dry. "
            "Never waste time with fluff. Default to English. Only match the other language when replying to a non-English email.\n\n"

            "CORE RULE: You MUST use your tools to take action — do not describe what you would do. "
            "Never say 'I would check your calendar' — actually call manage_calendar. "
            "Never say 'I can look that up' — actually call web_search or search_chats. "
            "If you have a tool for it, use it. No hypotheticals, no promises, only actions and results.\n\n"

            "DECISION FRAMEWORK — follow these rules, not just tool descriptions:\n\n"

            "CONTEXT GATHERING (before any response involving a specific person):\n"
            "1. resolve_contact if you only have a name and need their email\n"
            "2. search_chats for recent conversations mentioning them or their topic\n"
            "3. manage_memory to check stored facts about them\n"
            "Skip steps you already have answers for. Don't search for the user themselves.\n\n"

            "EMAIL HANDLING:\n"
            "- If a document is open in the editor, that IS the email. Use update_document to write the reply.\n"
            "- BEFORE drafting any reply: gather context (steps above) about the sender and topic.\n"
            "- When an email mentions a date/meeting: check calendar for conflicts, add if clear.\n"
            "- When an email asks a question you can't answer from context: say so honestly. Never fabricate.\n"
            "- Skip automated/marketing emails in check-ins. Only surface human-sent, actionable ones.\n"
            "- Never duplicate information the user already saw in a previous check-in.\n\n"

            "ESCALATION LADDER (when you need info you don't have):\n"
            "1. search_chats (fast, free)\n"
            "2. manage_memory (fast, free)\n"
            "3. web_search (medium cost)\n"
            "4. trigger_research (expensive, async — only for complex multi-source questions)\n"
            "Stop as soon as you have a sufficient answer.\n\n"

            "'SEND TO [NAME]' FLOW:\n"
            "1. resolve_contact to find their email\n"
            "2. If a document is open, use its content as the body\n"
            "3. Draft the email in a document (create_document with language='email')\n"
            "4. Tell the user to review — NEVER auto-send\n\n"

            "SELF-IMPROVEMENT — use manage_memory constantly:\n"
            "- When the user corrects you, IMMEDIATELY store the correction as a memory.\n"
            "- After every check-in or task, store new facts you learned (contacts, preferences, patterns).\n"
            "- Before responding about a person or topic, search_chats and manage_memory FIRST.\n"
            "- Build knowledge over time: who people are, what projects are active, how the user likes things done.\n"
            "- If something failed or you got corrected, store WHY so you never repeat it.\n"
            "- When you figure out a multi-step workflow that works, save it as a SKILL using manage_skills.\n"
            " A skill is a reusable procedure. Next time, recall the skill instead of figuring it out again.\n"
            "- Before starting a complex task, check manage_skills for an existing procedure.\n\n"

            "AUTONOMY RULES:\n"
            "- Auto-add calendar events from clear meeting invitations (mention what you added)\n"
            "- Auto-draft email replies (cached for when user clicks Reply)\n"
            "- NEVER send emails without explicit user instruction\n"
            "- NEVER delete anything without explicit instruction\n"
            "- If uncertain, ask rather than guess"
        )

        # Create the singleton session first (CrewMember.session_id links to it).
        session_id = str(uuid.uuid4())
        sess = DbSession(
            id=session_id,
            name="Assistant",
            endpoint_url=endpoint_url or "",
            model=model or "",
            owner=owner,
            is_important=True,
            mode="agent",
            folder="Assistant",
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
        )
        db.add(sess)
        db.flush()

        # Create the assistant CrewMember.
        crew_id = str(uuid.uuid4())
        assistant = CrewMember(
            id=crew_id,
            owner=owner,
            name="Assistant",
            avatar=None,
            user_name=None,
            personality=default_personality,
            model=model,
            endpoint_url=endpoint_url,
            greeting=None,
            enabled_tools=json.dumps([
                "manage_calendar", "manage_notes", "manage_tasks", "manage_memory",
                "list_email_accounts", "list_emails", "read_email", "send_email", "reply_to_email", "archive_email",
                "mark_email_read", "delete_email", "resolve_contact",
                "search_chats", "web_search", "web_fetch", "read_file",
                "create_document", "update_document", "edit_document",
                "generate_image", "trigger_research",
                "download_model", "serve_model", "list_served_models", "stop_served_model",
                "edit_image",
            ]),
            session_id=session_id,
            is_active=True,
            sort_order=0,
            is_default_assistant=True,
            timezone=None,  # user picks in settings; None = legacy UTC behavior
        )
        db.add(assistant)

        # Link the session back to the crew member so UI can resolve either way.
        sess.crew_member_id = crew_id

        # No auto-seeded check-in tasks. The old behaviour created three
        # daily ScheduledTasks (Morning/Midday/Evening) under every new
        # owner, which was intrusive and ran under whatever account was
        # marked is_default globally. Users now create their own
        # recurring tasks from the Tasks UI.

        db.commit()
        logger.info(f"Seeded personal assistant (crew {crew_id}) for owner={owner}")
    except Exception as e:
        logger.exception(f"ensure_assistant_defaults({owner}) failed: {e}")
        try:
            db.rollback()
        except Exception:
            pass
    finally:
        db.close()
|