Polish email tasks and window controls
This commit is contained in:
@@ -469,7 +469,12 @@ async def action_draft_email_replies(owner: str, **kwargs) -> Tuple[str, bool]:
|
||||
"""Run one pass of AI reply drafting."""
|
||||
try:
|
||||
from routes.email_pollers import _run_auto_summarize_once
|
||||
result = await _run_auto_summarize_once(do_summary=False, do_reply=True)
|
||||
result = await _run_auto_summarize_once(
|
||||
do_summary=False,
|
||||
do_reply=True,
|
||||
days_back=7,
|
||||
progress_cb=kwargs.get("progress_cb"),
|
||||
)
|
||||
if not _result_has_work(result):
|
||||
raise TaskNoop(f"draft replies: {result or 'no new emails'}")
|
||||
return result, True
|
||||
|
||||
@@ -222,6 +222,24 @@ class TaskScheduler:
|
||||
# This is a hard guarantee, not configurable.
|
||||
self._run_semaphore = asyncio.Semaphore(1)
|
||||
self._concurrency_cap = 1
|
||||
self._task_handles = {}
|
||||
|
||||
def _set_run_progress(self, run_id: str, message: str):
|
||||
"""Persist short live progress text for Activity while a run is active."""
|
||||
if not run_id:
|
||||
return
|
||||
try:
|
||||
from core.database import SessionLocal, TaskRun
|
||||
db = SessionLocal()
|
||||
try:
|
||||
run = db.query(TaskRun).filter(TaskRun.id == run_id).first()
|
||||
if run and run.status in ("queued", "running"):
|
||||
run.result = (message or "")[:4000]
|
||||
db.commit()
|
||||
finally:
|
||||
db.close()
|
||||
except Exception:
|
||||
logger.debug("Task progress update failed", exc_info=True)
|
||||
|
||||
def add_notification(self, task_name: str, status: str, task_id: str = None, owner: str = None, body: str = None):
|
||||
"""Store a notification about a completed task run. Tagged with the
|
||||
@@ -516,6 +534,9 @@ class TaskScheduler:
|
||||
# line behind another. Once we acquire the slot, flip to "running"
|
||||
# and hand off to _execute_task_locked.
|
||||
from core.database import SessionLocal, TaskRun
|
||||
current = asyncio.current_task()
|
||||
if current:
|
||||
self._task_handles[task_id] = current
|
||||
run_id = str(uuid.uuid4())
|
||||
_q_db = SessionLocal()
|
||||
try:
|
||||
@@ -524,6 +545,7 @@ class TaskScheduler:
|
||||
task_id=task_id,
|
||||
started_at=datetime.utcnow(),
|
||||
status="queued",
|
||||
result="Queued — waiting for a free slot…",
|
||||
)
|
||||
_q_db.add(run)
|
||||
_q_db.commit()
|
||||
@@ -563,6 +585,7 @@ class TaskScheduler:
|
||||
if run:
|
||||
run.status = "running"
|
||||
run.started_at = datetime.utcnow()
|
||||
run.result = "Starting…"
|
||||
db.commit()
|
||||
else:
|
||||
# Defensive: row may have been wiped; recreate so the rest of
|
||||
@@ -572,6 +595,7 @@ class TaskScheduler:
|
||||
task_id=task.id,
|
||||
started_at=datetime.utcnow(),
|
||||
status="running",
|
||||
result="Starting…",
|
||||
)
|
||||
db.add(run)
|
||||
db.commit()
|
||||
@@ -586,7 +610,7 @@ class TaskScheduler:
|
||||
self._last_run_model = None
|
||||
try:
|
||||
if task_type == "action":
|
||||
result, success = await self._execute_action(task)
|
||||
result, success = await self._execute_action(task, run_id=run_id)
|
||||
run.status = "success" if success else "error"
|
||||
run.result = result
|
||||
if not success:
|
||||
@@ -622,6 +646,27 @@ class TaskScheduler:
|
||||
task.next_run = when
|
||||
db.commit()
|
||||
return
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Task '%s' stopped by user", task.name)
|
||||
run_obj = db.query(TaskRun).filter(TaskRun.id == run_id).first()
|
||||
if run_obj:
|
||||
run_obj.status = "aborted"
|
||||
run_obj.error = "Stopped by user"
|
||||
run_obj.result = run_obj.result or "Stopped by user"
|
||||
run_obj.finished_at = datetime.utcnow()
|
||||
task.last_run = datetime.utcnow()
|
||||
if (task.trigger_type or "schedule") == "schedule":
|
||||
task.next_run = compute_next_run(
|
||||
task.schedule, task.scheduled_time,
|
||||
task.scheduled_day, task.scheduled_date,
|
||||
after=datetime.utcnow(),
|
||||
cron_expression=task.cron_expression,
|
||||
tz_name=_resolve_task_timezone(db, task),
|
||||
)
|
||||
else:
|
||||
task.next_run = None
|
||||
db.commit()
|
||||
return
|
||||
except TaskNoop as noop:
|
||||
# Action reported "nothing to do". Mark the run as `skipped`
|
||||
# with the reason in `result` so it surfaces in Activity as a
|
||||
@@ -783,6 +828,9 @@ class TaskScheduler:
|
||||
logger.exception("Task %s error-path failed unexpectedly", task_id)
|
||||
finally:
|
||||
db.close()
|
||||
handle = self._task_handles.get(task_id)
|
||||
if handle is asyncio.current_task():
|
||||
self._task_handles.pop(task_id, None)
|
||||
if release_executing:
|
||||
async with self._executing_lock:
|
||||
self._executing.discard(task_id)
|
||||
@@ -853,7 +901,7 @@ class TaskScheduler:
|
||||
category=(task.name or "Task"),
|
||||
)
|
||||
|
||||
async def _execute_action(self, task) -> tuple:
|
||||
async def _execute_action(self, task, run_id: str | None = None) -> tuple:
|
||||
"""Execute a built-in action (no LLM needed)."""
|
||||
from src.builtin_actions import BUILTIN_ACTIONS
|
||||
|
||||
@@ -864,7 +912,10 @@ class TaskScheduler:
|
||||
from src.builtin_actions import TaskNoop
|
||||
try:
|
||||
# Pass task prompt as script/command for ssh_command/run_script actions.
|
||||
kwargs = {"owner": task.owner, "task_name": task.name}
|
||||
def _progress(message: str):
|
||||
self._set_run_progress(run_id, message)
|
||||
|
||||
kwargs = {"owner": task.owner, "task_name": task.name, "progress_cb": _progress}
|
||||
if task.action in ("run_script", "run_local", "ssh_command") and task.prompt:
|
||||
kwargs["script" if task.action in ("run_script", "run_local") else "command"] = task.prompt
|
||||
result, success = await action_fn(**kwargs)
|
||||
@@ -1752,6 +1803,38 @@ class TaskScheduler:
|
||||
asyncio.create_task(self._execute_task(task_id))
|
||||
return True
|
||||
|
||||
async def stop_task(self, task_id: str) -> bool:
|
||||
"""Request cancellation of a running/queued task and mark its run aborted."""
|
||||
handle = self._task_handles.get(task_id)
|
||||
stopped = False
|
||||
if handle and not handle.done():
|
||||
handle.cancel()
|
||||
stopped = True
|
||||
async with self._executing_lock:
|
||||
if task_id in self._executing:
|
||||
self._executing.discard(task_id)
|
||||
stopped = True
|
||||
|
||||
from core.database import SessionLocal, TaskRun
|
||||
db = SessionLocal()
|
||||
try:
|
||||
run = (
|
||||
db.query(TaskRun)
|
||||
.filter(TaskRun.task_id == task_id, TaskRun.status.in_(("queued", "running")))
|
||||
.order_by(TaskRun.started_at.desc())
|
||||
.first()
|
||||
)
|
||||
if run:
|
||||
run.status = "aborted"
|
||||
run.error = "Stopped by user"
|
||||
run.result = run.result or "Stopped by user"
|
||||
run.finished_at = datetime.utcnow()
|
||||
db.commit()
|
||||
stopped = True
|
||||
finally:
|
||||
db.close()
|
||||
return stopped
|
||||
|
||||
async def ensure_defaults(self, owner: str):
|
||||
"""Create default housekeeping tasks for this owner (idempotent per action)."""
|
||||
from core.database import SessionLocal, ScheduledTask
|
||||
|
||||
Reference in New Issue
Block a user