#!/usr/bin/env python3 """odysseus-mail — Unix-style command-line wrapper around the email backend that powers the web UI. Calls the same helpers `routes/email_helpers.py` exports, so a request issued from the shell hits IMAP/SMTP through the same connection pool and the same parsing pipeline as the HTTP routes. State is shared via `data/app.db` and `data/.app_key` (passwords decrypt automatically). Output is JSON on stdout, errors on stderr, non-zero exit on failure. Designed to compose: odysseus-mail list --folder INBOX --limit 5 \\ | jq -r '.[] | .uid + "\\t" + .from_name + "\\t" + .subject' odysseus-mail send --to alice@example.com --subject hi <<<"hello" odysseus-mail folders --account work | jq Subcommands: list List recent messages in a folder read Fetch one message by UID folders Enumerate IMAP folders on the account accounts List configured email accounts send Send a message via SMTP (body from stdin) Run with a subcommand + --help for argument details. """ from __future__ import annotations import sys import os, sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "_lib")) from cli import quiet_logs, emit, fail, common_parser, run, REPO_ROOT as _REPO_ROOT quiet_logs() import argparse import email as email_mod import json import logging import os import re import sys from datetime import datetime from pathlib import Path # Anchor the path so the script works when invoked from anywhere. # The web-app logger is verbose at import time (cot_prompts, embeddings, # etc.). For a CLI tool we only care about warnings+; users can re-raise # with `LOG_LEVEL=DEBUG odysseus-mail ...`. We force the root level AFTER # imports because some submodules call basicConfig themselves during # initialization, which we have to override. def quiet_logs() -> None: level_name = os.environ.get("LOG_LEVEL", "WARNING").upper() level = getattr(logging, level_name, logging.WARNING) root = logging.getLogger() root.setLevel(level) for handler in root.handlers: handler.setLevel(level) # Suppress now so import-time `.info()` calls in helpers stay quiet. quiet_logs() # Defer the heavy imports until after path setup so a friendlier error # message can be printed if the venv isn't activated. try: from routes.email_helpers import ( _imap, _get_email_config, _decode_header, _extract_text, _extract_html, _list_attachments_from_msg, ) from routes.email_pollers import ( _scheduled_poll_once, _run_auto_summarize_once, ) from core.database import SessionLocal, EmailAccount # Re-apply: some submodules call basicConfig during their own # import, which raises root level back to INFO. Quench again. quiet_logs() except ModuleNotFoundError as e: sys.stderr.write( f"error: {e}\n" f"hint: run from the repo root with the venv active:\n" f" source venv/bin/activate && {sys.argv[0]} --help\n" ) sys.exit(2) def emit(obj, args) -> None: """Emit a JSON value on stdout. Pretty-print if requested or if stdout is a TTY.""" pretty = getattr(args, "pretty", False) or sys.stdout.isatty() indent = 2 if pretty else None json.dump(obj, sys.stdout, indent=indent, default=str, ensure_ascii=False) sys.stdout.write("\n") def fail(msg: str, code: int = 1) -> "None": sys.stderr.write(f"error: {msg}\n") sys.exit(code) def _q(name: str) -> str: """Local copy of the IMAP mailbox quoter (matches email_helpers._q).""" return '"' + (name or "").replace("\\", "\\\\").replace('"', '\\"') + '"' def _split_recipients(value: str) -> list[str]: return [r.strip() for r in (value or "").split(",") if r.strip()] def _recipient_list(to: str, cc: str = "", bcc: str = "") -> list[str]: recipients = _split_recipients(to) recipients.extend(_split_recipients(cc)) recipients.extend(_split_recipients(bcc)) if not recipients: fail("at least one recipient is required") return recipients # ─── list ──────────────────────────────────────────────────────────── def cmd_list(args) -> None: """List recent messages in a folder. Output: array of {uid, date, from_addr, from_name, subject, is_read, is_answered}.""" with _imap(args.account) as conn: st, _ = conn.select(_q(args.folder), readonly=True) if st != "OK": fail(f"select {args.folder!r} failed: {st}") st, data = conn.search(None, "ALL") if st != "OK" or not data[0]: emit([], args) return all_uids = data[0].split() # Newest first, limit uids = list(reversed(all_uids))[: args.limit] out = [] for uid in uids: try: st, msg_data = conn.fetch(uid, "(FLAGS RFC822.HEADER)") if st != "OK": continue raw_header = None flags = "" for part in msg_data: if isinstance(part, tuple): meta = part[0].decode() if isinstance(part[0], bytes) else str(part[0]) if isinstance(part[0], bytes) and b"RFC822.HEADER" in part[0]: raw_header = part[1] elif "RFC822.HEADER" in meta: raw_header = part[1] m = re.search(r"FLAGS \(([^)]*)\)", meta) if m: flags = m.group(1) if not raw_header: continue msg = email_mod.message_from_bytes(raw_header) subject = _decode_header(msg.get("Subject", "(no subject)")) sender = _decode_header(msg.get("From", "unknown")) from email.utils import parseaddr, parsedate_to_datetime sender_name, sender_addr = parseaddr(sender) date_raw = msg.get("Date", "") try: iso = parsedate_to_datetime(date_raw).isoformat() if date_raw else "" except Exception: iso = "" out.append({ "uid": uid.decode(), "date": iso, "from_addr": sender_addr, "from_name": sender_name or sender_addr, "subject": subject, "is_read": "\\Seen" in flags, "is_answered": "\\Answered" in flags, }) except Exception as e: sys.stderr.write(f"warn: skipping uid {uid!r}: {e}\n") emit(out, args) # ─── read ──────────────────────────────────────────────────────────── def cmd_read(args) -> None: """Fetch one message. Output: {uid, headers, body_text, body_html?, attachments}.""" with _imap(args.account) as conn: st, _ = conn.select(_q(args.folder), readonly=True) if st != "OK": fail(f"select {args.folder!r} failed: {st}") st, msg_data = conn.fetch(args.uid.encode(), "(BODY.PEEK[])") if st != "OK" or not msg_data or not msg_data[0]: fail(f"fetch UID {args.uid} failed: {st}") raw = msg_data[0][1] msg = email_mod.message_from_bytes(raw) headers = { "from": _decode_header(msg.get("From", "")), "to": _decode_header(msg.get("To", "")), "cc": _decode_header(msg.get("Cc", "")), "subject": _decode_header(msg.get("Subject", "")), "date": msg.get("Date", ""), "message_id": msg.get("Message-ID", ""), } out = { "uid": args.uid, "headers": headers, "body_text": _extract_text(msg) or "", "attachments": _list_attachments_from_msg(msg), } if args.html: out["body_html"] = _extract_html(msg) or "" emit(out, args) # ─── folders ───────────────────────────────────────────────────────── def cmd_folders(args) -> None: """List IMAP folders on the account. Output: array of folder names.""" with _imap(args.account) as conn: st, folders = conn.list() if st != "OK": fail("LIST failed") names = [] for f in folders or []: decoded = f.decode() if isinstance(f, bytes) else str(f) m = re.search(r'"([^"]*)"\s*$|(\S+)\s*$', decoded) if m: names.append(m.group(1) or m.group(2)) emit(names, args) # ─── accounts ──────────────────────────────────────────────────────── def cmd_accounts(args) -> None: """List configured email accounts (passwords masked). Output: array of {id, name, is_default, enabled, imap_host, smtp_host, from_address}.""" db = SessionLocal() try: rows = db.query(EmailAccount).order_by( EmailAccount.is_default.desc(), EmailAccount.created_at.asc(), ).all() out = [{ "id": r.id, "name": r.name, "is_default": bool(r.is_default), "enabled": bool(r.enabled), "imap_host": r.imap_host or "", "imap_user": r.imap_user or "", "smtp_host": r.smtp_host or "", "smtp_user": r.smtp_user or "", "from_address": r.from_address or "", "has_password": bool(r.imap_password) or bool(r.smtp_password), } for r in rows] emit(out, args) finally: db.close() # ─── poll-scheduled ────────────────────────────────────────────────── def cmd_poll_scheduled(args) -> None: """One pass of the scheduled-email queue. Cron-friendly: idempotent, exits 0 on success even if zero rows were due. Output: {sent: [ids], failed: [{id, error}]}.""" result = _scheduled_poll_once() emit(result, args) if result.get("failed"): sys.exit(1) # ─── poll-summary ──────────────────────────────────────────────────── def cmd_poll_summary(args) -> None: """One pass of the auto-summarize / auto-reply pipeline over recent mail. Cron-friendly: a single shot you can wire to a systemd timer instead of running the FastAPI process all the time.""" import asyncio msg = asyncio.run(_run_auto_summarize_once( do_summary=args.summary, do_reply=args.reply, do_tag=args.tag, do_spam=args.spam, days_back=args.days, )) emit({"message": msg or "(no output)"}, args) # ─── send ──────────────────────────────────────────────────────────── def cmd_send(args) -> None: """Send a message via SMTP. Body is read from stdin.""" import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart body = sys.stdin.read() if not body and not args.allow_empty: fail("body is empty (pipe content into stdin, or pass --allow-empty)") cfg = _get_email_config(args.account) if not (cfg.get("smtp_host") and cfg.get("smtp_user") and cfg.get("smtp_password")): fail( f"SMTP not configured for account {cfg.get('account_name', '')!r}; " f"check `odysseus-mail accounts` and the web settings" ) outer = MIMEMultipart("alternative") outer["From"] = cfg["from_address"] outer["To"] = args.to if args.cc: outer["Cc"] = args.cc outer["Subject"] = args.subject outer["Date"] = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S +0000") outer.attach(MIMEText(body, "plain", "utf-8")) recipients = _recipient_list(args.to, args.cc, args.bcc) if args.dry_run: emit({ "dry_run": True, "from": cfg["from_address"], "recipients": recipients, "subject": args.subject, "bytes": len(outer.as_bytes()), }, args) return with smtplib.SMTP_SSL(cfg["smtp_host"], int(cfg["smtp_port"] or 465)) as smtp: smtp.login(cfg["smtp_user"], cfg["smtp_password"]) smtp.sendmail(cfg["from_address"], recipients, outer.as_string()) emit({"ok": True, "from": cfg["from_address"], "recipients": recipients}, args) # ─── argparse wiring ───────────────────────────────────────────────── def _build_parser() -> argparse.ArgumentParser: # Common flags shared by every subcommand. Using `parents=[common]` # lets `--account` / `--pretty` appear either before OR after the # subcommand name, which matches how most Unix CLIs work. common = argparse.ArgumentParser(add_help=False) common.add_argument("--account", help="Account ID (default: configured default)") common.add_argument("--pretty", action="store_true", help="Pretty-print JSON output") p = argparse.ArgumentParser( prog="odysseus-mail", description="Shell-friendly wrapper around the Odysseus email backend.", parents=[common], ) sub = p.add_subparsers(dest="cmd", required=True) pl = sub.add_parser("list", help="list recent messages in a folder", parents=[common]) pl.add_argument("--folder", default="INBOX") pl.add_argument("--limit", type=int, default=20) pl.set_defaults(func=cmd_list) pr = sub.add_parser("read", help="fetch one message by UID", parents=[common]) pr.add_argument("uid") pr.add_argument("--folder", default="INBOX") pr.add_argument("--html", action="store_true", help="include HTML body in output") pr.set_defaults(func=cmd_read) pf = sub.add_parser("folders", help="list IMAP folders", parents=[common]) pf.set_defaults(func=cmd_folders) pa = sub.add_parser("accounts", help="list configured email accounts", parents=[common]) pa.set_defaults(func=cmd_accounts) ps = sub.add_parser("send", help="send a message (body from stdin)", parents=[common]) ps.add_argument("--to", required=True) ps.add_argument("--subject", required=True) ps.add_argument("--cc", default="") ps.add_argument("--bcc", default="") ps.add_argument("--dry-run", action="store_true", help="don't actually send; print envelope") ps.add_argument("--allow-empty", action="store_true", help="permit empty body") ps.set_defaults(func=cmd_send) pps = sub.add_parser( "poll-scheduled", help="one pass of the scheduled-email queue (cron/systemd-friendly)", parents=[common], ) pps.set_defaults(func=cmd_poll_scheduled) psm = sub.add_parser( "poll-summary", help="one pass of the AI auto-summarize / auto-reply pipeline", parents=[common], ) psm.add_argument("--summary", action="store_true", default=True, help="run summarize step (default on)") psm.add_argument("--no-summary", dest="summary", action="store_false") psm.add_argument("--reply", action="store_true", default=True, help="run auto-reply step (default on)") psm.add_argument("--no-reply", dest="reply", action="store_false") psm.add_argument("--tag", action="store_true", default=False, help="also run AI tagging step") psm.add_argument("--spam", action="store_true", default=False, help="also run AI spam-classify step") psm.add_argument("--days", type=int, default=1, help="how far back to scan (default 1)") psm.set_defaults(func=cmd_poll_summary) return p if __name__ == "__main__": sys.exit(run(_build_parser()))