405 lines
16 KiB
Python
Executable File
405 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""odysseus-mail — Unix-style command-line wrapper around the email
|
|
backend that powers the web UI.
|
|
|
|
Calls the same helpers `routes/email_helpers.py` exports, so a request
|
|
issued from the shell hits IMAP/SMTP through the same connection pool
|
|
and the same parsing pipeline as the HTTP routes. State is shared via
|
|
`data/app.db` and `data/.app_key` (passwords decrypt automatically).
|
|
|
|
Output is JSON on stdout, errors on stderr, non-zero exit on failure.
|
|
Designed to compose:
|
|
|
|
odysseus-mail list --folder INBOX --limit 5 \\
|
|
| jq -r '.[] | .uid + "\\t" + .from_name + "\\t" + .subject'
|
|
|
|
odysseus-mail send --to alice@example.com --subject hi <<<"hello"
|
|
|
|
odysseus-mail folders --account work | jq
|
|
|
|
Subcommands:
|
|
list List recent messages in a folder
|
|
read Fetch one message by UID
|
|
folders Enumerate IMAP folders on the account
|
|
accounts List configured email accounts
|
|
send Send a message via SMTP (body from stdin)
|
|
|
|
Run with a subcommand + --help for argument details.
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
import sys
|
|
import os, sys
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "_lib"))
|
|
from cli import quiet_logs, emit, fail, common_parser, run, REPO_ROOT as _REPO_ROOT
|
|
quiet_logs()
|
|
|
|
|
|
import argparse
|
|
import email as email_mod
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Anchor the path so the script works when invoked from anywhere.
|
|
# The web-app logger is verbose at import time (cot_prompts, embeddings,
|
|
# etc.). For a CLI tool we only care about warnings+; users can re-raise
|
|
# with `LOG_LEVEL=DEBUG odysseus-mail ...`. We force the root level AFTER
|
|
# imports because some submodules call basicConfig themselves during
|
|
# initialization, which we have to override.
|
|
def quiet_logs() -> None:
|
|
level_name = os.environ.get("LOG_LEVEL", "WARNING").upper()
|
|
level = getattr(logging, level_name, logging.WARNING)
|
|
root = logging.getLogger()
|
|
root.setLevel(level)
|
|
for handler in root.handlers:
|
|
handler.setLevel(level)
|
|
|
|
|
|
# Suppress now so import-time `.info()` calls in helpers stay quiet.
|
|
quiet_logs()
|
|
|
|
# Defer the heavy imports until after path setup so a friendlier error
|
|
# message can be printed if the venv isn't activated.
|
|
try:
|
|
from routes.email_helpers import (
|
|
_imap, _get_email_config, _decode_header,
|
|
_extract_text, _extract_html,
|
|
_list_attachments_from_msg,
|
|
)
|
|
from routes.email_pollers import (
|
|
_scheduled_poll_once, _run_auto_summarize_once,
|
|
)
|
|
from core.database import SessionLocal, EmailAccount
|
|
# Re-apply: some submodules call basicConfig during their own
|
|
# import, which raises root level back to INFO. Quench again.
|
|
quiet_logs()
|
|
except ModuleNotFoundError as e:
|
|
sys.stderr.write(
|
|
f"error: {e}\n"
|
|
f"hint: run from the repo root with the venv active:\n"
|
|
f" source venv/bin/activate && {sys.argv[0]} --help\n"
|
|
)
|
|
sys.exit(2)
|
|
|
|
|
|
def emit(obj, args) -> None:
|
|
"""Emit a JSON value on stdout. Pretty-print if requested or if
|
|
stdout is a TTY."""
|
|
pretty = getattr(args, "pretty", False) or sys.stdout.isatty()
|
|
indent = 2 if pretty else None
|
|
json.dump(obj, sys.stdout, indent=indent, default=str, ensure_ascii=False)
|
|
sys.stdout.write("\n")
|
|
|
|
|
|
def fail(msg: str, code: int = 1) -> "None":
|
|
sys.stderr.write(f"error: {msg}\n")
|
|
sys.exit(code)
|
|
|
|
|
|
def _q(name: str) -> str:
|
|
"""Local copy of the IMAP mailbox quoter (matches email_helpers._q)."""
|
|
return '"' + (name or "").replace("\\", "\\\\").replace('"', '\\"') + '"'
|
|
|
|
|
|
def _split_recipients(value: str) -> list[str]:
|
|
return [r.strip() for r in (value or "").split(",") if r.strip()]
|
|
|
|
|
|
def _recipient_list(to: str, cc: str = "", bcc: str = "") -> list[str]:
|
|
recipients = _split_recipients(to)
|
|
recipients.extend(_split_recipients(cc))
|
|
recipients.extend(_split_recipients(bcc))
|
|
if not recipients:
|
|
fail("at least one recipient is required")
|
|
return recipients
|
|
|
|
|
|
# ─── list ────────────────────────────────────────────────────────────
|
|
|
|
def cmd_list(args) -> None:
|
|
"""List recent messages in a folder. Output: array of {uid, date,
|
|
from_addr, from_name, subject, is_read, is_answered}."""
|
|
with _imap(args.account) as conn:
|
|
st, _ = conn.select(_q(args.folder), readonly=True)
|
|
if st != "OK":
|
|
fail(f"select {args.folder!r} failed: {st}")
|
|
st, data = conn.search(None, "ALL")
|
|
if st != "OK" or not data[0]:
|
|
emit([], args)
|
|
return
|
|
all_uids = data[0].split()
|
|
# Newest first, limit
|
|
uids = list(reversed(all_uids))[: args.limit]
|
|
out = []
|
|
for uid in uids:
|
|
try:
|
|
st, msg_data = conn.fetch(uid, "(FLAGS RFC822.HEADER)")
|
|
if st != "OK":
|
|
continue
|
|
raw_header = None
|
|
flags = ""
|
|
for part in msg_data:
|
|
if isinstance(part, tuple):
|
|
meta = part[0].decode() if isinstance(part[0], bytes) else str(part[0])
|
|
if isinstance(part[0], bytes) and b"RFC822.HEADER" in part[0]:
|
|
raw_header = part[1]
|
|
elif "RFC822.HEADER" in meta:
|
|
raw_header = part[1]
|
|
m = re.search(r"FLAGS \(([^)]*)\)", meta)
|
|
if m:
|
|
flags = m.group(1)
|
|
if not raw_header:
|
|
continue
|
|
msg = email_mod.message_from_bytes(raw_header)
|
|
subject = _decode_header(msg.get("Subject", "(no subject)"))
|
|
sender = _decode_header(msg.get("From", "unknown"))
|
|
from email.utils import parseaddr, parsedate_to_datetime
|
|
sender_name, sender_addr = parseaddr(sender)
|
|
date_raw = msg.get("Date", "")
|
|
try:
|
|
iso = parsedate_to_datetime(date_raw).isoformat() if date_raw else ""
|
|
except Exception:
|
|
iso = ""
|
|
out.append({
|
|
"uid": uid.decode(),
|
|
"date": iso,
|
|
"from_addr": sender_addr,
|
|
"from_name": sender_name or sender_addr,
|
|
"subject": subject,
|
|
"is_read": "\\Seen" in flags,
|
|
"is_answered": "\\Answered" in flags,
|
|
})
|
|
except Exception as e:
|
|
sys.stderr.write(f"warn: skipping uid {uid!r}: {e}\n")
|
|
emit(out, args)
|
|
|
|
|
|
# ─── read ────────────────────────────────────────────────────────────
|
|
|
|
def cmd_read(args) -> None:
|
|
"""Fetch one message. Output: {uid, headers, body_text, body_html?,
|
|
attachments}."""
|
|
with _imap(args.account) as conn:
|
|
st, _ = conn.select(_q(args.folder), readonly=True)
|
|
if st != "OK":
|
|
fail(f"select {args.folder!r} failed: {st}")
|
|
st, msg_data = conn.fetch(args.uid.encode(), "(BODY.PEEK[])")
|
|
if st != "OK" or not msg_data or not msg_data[0]:
|
|
fail(f"fetch UID {args.uid} failed: {st}")
|
|
raw = msg_data[0][1]
|
|
msg = email_mod.message_from_bytes(raw)
|
|
headers = {
|
|
"from": _decode_header(msg.get("From", "")),
|
|
"to": _decode_header(msg.get("To", "")),
|
|
"cc": _decode_header(msg.get("Cc", "")),
|
|
"subject": _decode_header(msg.get("Subject", "")),
|
|
"date": msg.get("Date", ""),
|
|
"message_id": msg.get("Message-ID", ""),
|
|
}
|
|
out = {
|
|
"uid": args.uid,
|
|
"headers": headers,
|
|
"body_text": _extract_text(msg) or "",
|
|
"attachments": _list_attachments_from_msg(msg),
|
|
}
|
|
if args.html:
|
|
out["body_html"] = _extract_html(msg) or ""
|
|
emit(out, args)
|
|
|
|
|
|
# ─── folders ─────────────────────────────────────────────────────────
|
|
|
|
def cmd_folders(args) -> None:
|
|
"""List IMAP folders on the account. Output: array of folder names."""
|
|
with _imap(args.account) as conn:
|
|
st, folders = conn.list()
|
|
if st != "OK":
|
|
fail("LIST failed")
|
|
names = []
|
|
for f in folders or []:
|
|
decoded = f.decode() if isinstance(f, bytes) else str(f)
|
|
m = re.search(r'"([^"]*)"\s*$|(\S+)\s*$', decoded)
|
|
if m:
|
|
names.append(m.group(1) or m.group(2))
|
|
emit(names, args)
|
|
|
|
|
|
# ─── accounts ────────────────────────────────────────────────────────
|
|
|
|
def cmd_accounts(args) -> None:
|
|
"""List configured email accounts (passwords masked). Output: array
|
|
of {id, name, is_default, enabled, imap_host, smtp_host, from_address}."""
|
|
db = SessionLocal()
|
|
try:
|
|
rows = db.query(EmailAccount).order_by(
|
|
EmailAccount.is_default.desc(),
|
|
EmailAccount.created_at.asc(),
|
|
).all()
|
|
out = [{
|
|
"id": r.id,
|
|
"name": r.name,
|
|
"is_default": bool(r.is_default),
|
|
"enabled": bool(r.enabled),
|
|
"imap_host": r.imap_host or "",
|
|
"imap_user": r.imap_user or "",
|
|
"smtp_host": r.smtp_host or "",
|
|
"smtp_user": r.smtp_user or "",
|
|
"from_address": r.from_address or "",
|
|
"has_password": bool(r.imap_password) or bool(r.smtp_password),
|
|
} for r in rows]
|
|
emit(out, args)
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
# ─── poll-scheduled ──────────────────────────────────────────────────
|
|
|
|
def cmd_poll_scheduled(args) -> None:
|
|
"""One pass of the scheduled-email queue. Cron-friendly: idempotent,
|
|
exits 0 on success even if zero rows were due. Output: {sent: [ids],
|
|
failed: [{id, error}]}."""
|
|
result = _scheduled_poll_once()
|
|
emit(result, args)
|
|
if result.get("failed"):
|
|
sys.exit(1)
|
|
|
|
|
|
# ─── poll-summary ────────────────────────────────────────────────────
|
|
|
|
def cmd_poll_summary(args) -> None:
|
|
"""One pass of the auto-summarize / auto-reply pipeline over recent
|
|
mail. Cron-friendly: a single shot you can wire to a systemd timer
|
|
instead of running the FastAPI process all the time."""
|
|
import asyncio
|
|
msg = asyncio.run(_run_auto_summarize_once(
|
|
do_summary=args.summary,
|
|
do_reply=args.reply,
|
|
do_tag=args.tag,
|
|
do_spam=args.spam,
|
|
days_back=args.days,
|
|
))
|
|
emit({"message": msg or "(no output)"}, args)
|
|
|
|
|
|
# ─── send ────────────────────────────────────────────────────────────
|
|
|
|
def cmd_send(args) -> None:
|
|
"""Send a message via SMTP. Body is read from stdin."""
|
|
import smtplib
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
|
|
body = sys.stdin.read()
|
|
if not body and not args.allow_empty:
|
|
fail("body is empty (pipe content into stdin, or pass --allow-empty)")
|
|
|
|
cfg = _get_email_config(args.account)
|
|
if not (cfg.get("smtp_host") and cfg.get("smtp_user") and cfg.get("smtp_password")):
|
|
fail(
|
|
f"SMTP not configured for account {cfg.get('account_name', '<default>')!r}; "
|
|
f"check `odysseus-mail accounts` and the web settings"
|
|
)
|
|
|
|
outer = MIMEMultipart("alternative")
|
|
outer["From"] = cfg["from_address"]
|
|
outer["To"] = args.to
|
|
if args.cc:
|
|
outer["Cc"] = args.cc
|
|
outer["Subject"] = args.subject
|
|
outer["Date"] = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S +0000")
|
|
outer.attach(MIMEText(body, "plain", "utf-8"))
|
|
|
|
recipients = _recipient_list(args.to, args.cc, args.bcc)
|
|
|
|
if args.dry_run:
|
|
emit({
|
|
"dry_run": True,
|
|
"from": cfg["from_address"],
|
|
"recipients": recipients,
|
|
"subject": args.subject,
|
|
"bytes": len(outer.as_bytes()),
|
|
}, args)
|
|
return
|
|
|
|
with smtplib.SMTP_SSL(cfg["smtp_host"], int(cfg["smtp_port"] or 465)) as smtp:
|
|
smtp.login(cfg["smtp_user"], cfg["smtp_password"])
|
|
smtp.sendmail(cfg["from_address"], recipients, outer.as_string())
|
|
emit({"ok": True, "from": cfg["from_address"], "recipients": recipients}, args)
|
|
|
|
|
|
# ─── argparse wiring ─────────────────────────────────────────────────
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
|
|
# Common flags shared by every subcommand. Using `parents=[common]`
|
|
# lets `--account` / `--pretty` appear either before OR after the
|
|
# subcommand name, which matches how most Unix CLIs work.
|
|
common = argparse.ArgumentParser(add_help=False)
|
|
common.add_argument("--account", help="Account ID (default: configured default)")
|
|
common.add_argument("--pretty", action="store_true", help="Pretty-print JSON output")
|
|
|
|
p = argparse.ArgumentParser(
|
|
prog="odysseus-mail",
|
|
description="Shell-friendly wrapper around the Odysseus email backend.",
|
|
parents=[common],
|
|
)
|
|
sub = p.add_subparsers(dest="cmd", required=True)
|
|
|
|
pl = sub.add_parser("list", help="list recent messages in a folder", parents=[common])
|
|
pl.add_argument("--folder", default="INBOX")
|
|
pl.add_argument("--limit", type=int, default=20)
|
|
pl.set_defaults(func=cmd_list)
|
|
|
|
pr = sub.add_parser("read", help="fetch one message by UID", parents=[common])
|
|
pr.add_argument("uid")
|
|
pr.add_argument("--folder", default="INBOX")
|
|
pr.add_argument("--html", action="store_true", help="include HTML body in output")
|
|
pr.set_defaults(func=cmd_read)
|
|
|
|
pf = sub.add_parser("folders", help="list IMAP folders", parents=[common])
|
|
pf.set_defaults(func=cmd_folders)
|
|
|
|
pa = sub.add_parser("accounts", help="list configured email accounts", parents=[common])
|
|
pa.set_defaults(func=cmd_accounts)
|
|
|
|
ps = sub.add_parser("send", help="send a message (body from stdin)", parents=[common])
|
|
ps.add_argument("--to", required=True)
|
|
ps.add_argument("--subject", required=True)
|
|
ps.add_argument("--cc", default="")
|
|
ps.add_argument("--bcc", default="")
|
|
ps.add_argument("--dry-run", action="store_true", help="don't actually send; print envelope")
|
|
ps.add_argument("--allow-empty", action="store_true", help="permit empty body")
|
|
ps.set_defaults(func=cmd_send)
|
|
|
|
pps = sub.add_parser(
|
|
"poll-scheduled",
|
|
help="one pass of the scheduled-email queue (cron/systemd-friendly)",
|
|
parents=[common],
|
|
)
|
|
pps.set_defaults(func=cmd_poll_scheduled)
|
|
|
|
psm = sub.add_parser(
|
|
"poll-summary",
|
|
help="one pass of the AI auto-summarize / auto-reply pipeline",
|
|
parents=[common],
|
|
)
|
|
psm.add_argument("--summary", action="store_true", default=True, help="run summarize step (default on)")
|
|
psm.add_argument("--no-summary", dest="summary", action="store_false")
|
|
psm.add_argument("--reply", action="store_true", default=True, help="run auto-reply step (default on)")
|
|
psm.add_argument("--no-reply", dest="reply", action="store_false")
|
|
psm.add_argument("--tag", action="store_true", default=False, help="also run AI tagging step")
|
|
psm.add_argument("--spam", action="store_true", default=False, help="also run AI spam-classify step")
|
|
psm.add_argument("--days", type=int, default=1, help="how far back to scan (default 1)")
|
|
psm.set_defaults(func=cmd_poll_summary)
|
|
|
|
return p
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(run(_build_parser()))
|