Codex Agent integration: HTTP surface + plugin bundle + Settings UI

This persists work that had been living only in the cookbook docker
container's writable layer — never committed to the host source. Brought
back to git intact, app.py registration re-applied surgically on top of
current main (not the older container copy, which would have regressed
the Windows MIME fix, asynccontextmanager lifespan, and webhook auth
exempts).

routes/codex_routes.py (new):
- GET  /api/codex/capabilities  — what this Odysseus exposes.
- GET  /api/codex/plugin.zip    — downloads integrations/codex as a zip.
- GET  /api/codex/todos         — scope-gated todos:read|write.
- POST /api/codex/todos         — scope-gated todos:write.
- GET  /api/codex/emails        — scope-gated email:read|draft|send.
- GET  /api/codex/emails/{uid}  — single-message fetch.
- _scope_owner() enforces api_token scopes before touching user data.

routes/api_token_routes.py (+103 lines):
- Adds Codex-token-specific issuance + revocation paths.

integrations/codex/ (new bundle, shipped via /api/codex/plugin.zip):
- README.md                       — install instructions.
- .codex-plugin/plugin.json       — Codex plugin manifest.
- scripts/odysseus_api.py         — Python client used by the skill.
- skills/odysseus/SKILL.md        — Codex skill definition.

static/js/settings.js (+253 lines):
- New "Codex Agent" option in the Integrations dropdown.
- Add / edit panel with plugin-bundle download link + curl-with-token
  install instructions per agent.

app.py:
- 7-line surgical change: capture email_router = setup_email_routes()
  and register setup_codex_routes(email_router=email_router) after the
  email module so the Codex routes can borrow its helpers.
This commit is contained in:
pewdiepie-archdaemon
2026-06-03 22:38:05 +09:00
parent 1f6c5ac66b
commit 5939aec69f
8 changed files with 790 additions and 5 deletions

View File

@@ -12,6 +12,61 @@ from src.auth_helpers import get_current_user
MAX_NAME_LEN = 100
DEFAULT_SCOPES = "chat"
ALLOWED_SCOPES = {
"chat",
"todos:read",
"todos:write",
"documents:read",
"documents:write",
"email:read",
"email:draft",
"email:send",
"calendar:read",
"calendar:write",
"memory:read",
"memory:write",
}
TOKEN_PROFILES = {
"chat": ["chat"],
"codex_todos": ["todos:read", "todos:write"],
"codex_email_drafts": ["email:read", "email:draft", "documents:read", "documents:write"],
}
def _normalize_scopes(scopes: str | list[str] | None = None, profile: str | None = None) -> list[str]:
profile = profile if isinstance(profile, str) else None
profile_key = (profile or "").strip()
if profile_key:
if profile_key not in TOKEN_PROFILES:
raise HTTPException(400, "Unknown token profile")
requested = list(TOKEN_PROFILES[profile_key])
elif isinstance(scopes, list):
requested = [str(s).strip() for s in scopes if str(s).strip()]
elif isinstance(scopes, str) and scopes:
requested = [s.strip() for s in scopes.replace(" ", ",").split(",") if s.strip()]
else:
requested = [DEFAULT_SCOPES]
normalized = []
for scope in requested:
if scope not in ALLOWED_SCOPES:
raise HTTPException(400, f"Unknown token scope: {scope}")
if scope not in normalized:
normalized.append(scope)
def ensure_before(write_scope: str, read_scope: str):
if write_scope not in normalized or read_scope in normalized:
return
idx = normalized.index(write_scope)
normalized.insert(idx, read_scope)
ensure_before("todos:write", "todos:read")
ensure_before("documents:write", "documents:read")
ensure_before("calendar:write", "calendar:read")
ensure_before("memory:write", "memory:read")
ensure_before("email:draft", "email:read")
return normalized or [DEFAULT_SCOPES]
def setup_api_token_routes() -> APIRouter:
@@ -45,13 +100,28 @@ def setup_api_token_routes() -> APIRouter:
except Exception:
pass
@router.get("/tokens/profiles")
def token_profiles(request: Request):
require_admin(request)
return {
"profiles": TOKEN_PROFILES,
"allowed_scopes": sorted(ALLOWED_SCOPES),
}
@router.post("/tokens")
def create_token(request: Request, name: str = Form("")):
def create_token(
request: Request,
name: str = Form(""),
scopes: str = Form(None),
profile: str = Form(None),
):
require_admin(request)
name = name.strip()[:MAX_NAME_LEN]
if not name:
raise HTTPException(400, "Token name is required")
owner = get_current_user(request)
scope_list = _normalize_scopes(scopes, profile)
scopes_value = ",".join(scope_list)
raw_token = "ody_" + secrets.token_urlsafe(32)
token_hash = bcrypt.hashpw(raw_token.encode(), bcrypt.gensalt()).decode()
@@ -64,7 +134,7 @@ def setup_api_token_routes() -> APIRouter:
name=name,
token_hash=token_hash,
token_prefix=raw_token[:8],
scopes=DEFAULT_SCOPES,
scopes=scopes_value,
is_active=True,
))
_invalidate_cache(request)
@@ -75,9 +145,36 @@ def setup_api_token_routes() -> APIRouter:
"owner": owner,
"token": raw_token,
"token_prefix": raw_token[:8],
"scopes": DEFAULT_SCOPES.split(","),
"scopes": scope_list,
}
@router.patch("/tokens/{token_id}")
async def update_token(request: Request, token_id: str):
require_admin(request)
try:
payload = await request.json()
except Exception:
payload = {}
scope_list = _normalize_scopes(payload.get("scopes"))
scopes_value = ",".join(scope_list)
with get_db_session() as db:
token = db.query(ApiToken).filter(ApiToken.id == token_id).first()
if not token:
raise HTTPException(404, "Token not found")
if isinstance(payload.get("name"), str) and payload["name"].strip():
token.name = payload["name"].strip()[:MAX_NAME_LEN]
token.scopes = scopes_value
db.add(token)
response = {
"id": token_id,
"name": getattr(token, "name", ""),
"owner": getattr(token, "owner", None),
"token_prefix": getattr(token, "token_prefix", ""),
"scopes": scope_list,
}
_invalidate_cache(request)
return response
@router.delete("/tokens/{token_id}")
def delete_token(request: Request, token_id: str):
require_admin(request)

169
routes/codex_routes.py Normal file
View File

@@ -0,0 +1,169 @@
"""Codex integration routes.
These are small HTTP surfaces intended for the Codex plugin/MCP bridge. They
reuse existing Odysseus helpers and enforce API-token scopes before touching
user data.
"""
import json
import zipfile
from io import BytesIO
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Body, HTTPException, Request
from fastapi.responses import StreamingResponse
from src.auth_helpers import require_user
from src.tool_implementations import do_manage_notes
TODO_READ_SCOPES = {"todos:read", "todos:write"}
TODO_WRITE_SCOPES = {"todos:write"}
EMAIL_READ_SCOPES = {"email:read", "email:draft", "email:send"}
WRITE_ACTIONS = {"add", "create", "new", "save", "remind", "update", "delete", "toggle_item", "remove", "remove_item"}
def _scope_owner(request: Request, allowed: set[str]) -> str:
"""Return the data owner if the caller is allowed for this Codex action."""
if getattr(request.state, "api_token", False):
scopes = set(getattr(request.state, "api_token_scopes", []) or [])
if not scopes.intersection(allowed):
required = " or ".join(sorted(allowed))
raise HTTPException(403, f"API token missing required scope: {required}")
owner = getattr(request.state, "api_token_owner", None)
if not owner:
raise HTTPException(403, "API token has no owner")
return owner
return require_user(request)
def _find_endpoint(router: APIRouter | None, method: str, path: str):
if router is None:
return None
for route in getattr(router, "routes", []):
if getattr(route, "path", "") == path and method in getattr(route, "methods", set()):
return route.endpoint
return None
def setup_codex_routes(email_router: APIRouter | None = None) -> APIRouter:
router = APIRouter(prefix="/api/codex", tags=["codex"])
email_list_endpoint = _find_endpoint(email_router, "GET", "/api/email/list")
email_read_endpoint = _find_endpoint(email_router, "GET", "/api/email/read/{uid}")
@router.get("/capabilities")
def capabilities(request: Request):
token_scopes = set(getattr(request.state, "api_token_scopes", []) or [])
has_token = bool(getattr(request.state, "api_token", False))
return {
"integration": "codex",
"token_scopes": sorted(token_scopes),
"tools": {
"todos": {
"read": bool(token_scopes.intersection(TODO_READ_SCOPES)) if has_token else True,
"write": bool(token_scopes.intersection(TODO_WRITE_SCOPES)) if has_token else True,
"actions": ["list", "add", "update", "delete", "toggle_item"],
},
"email": {
"read": bool(token_scopes.intersection(EMAIL_READ_SCOPES)) if has_token else True,
"draft": "email:draft" in token_scopes if has_token else True,
"send": "email:send" in token_scopes if has_token else True,
"actions": ["list", "read"],
}
},
"safety": {
"email_send_requires_confirmation": True,
"destructive_actions_should_confirm": True,
},
}
@router.get("/plugin.zip")
def plugin_zip(request: Request):
require_user(request)
root = Path(__file__).resolve().parent.parent / "integrations" / "codex"
if not root.exists():
raise HTTPException(404, "Codex plugin bundle not found")
buf = BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for path in sorted(root.rglob("*")):
if path.is_dir() or "__pycache__" in path.parts or path.suffix == ".pyc":
continue
zf.write(path, Path("odysseus") / path.relative_to(root))
buf.seek(0)
headers = {"Content-Disposition": 'attachment; filename="odysseus-codex-plugin.zip"'}
return StreamingResponse(buf, media_type="application/zip", headers=headers)
@router.get("/todos")
async def list_todos(request: Request, archived: bool = False, label: str | None = None):
owner = _scope_owner(request, TODO_READ_SCOPES)
args: dict[str, Any] = {"action": "list", "archived": archived}
if label:
args["label"] = label
return await do_manage_notes(json.dumps(args), owner=owner)
@router.post("/todos")
async def manage_todos(request: Request, body: dict[str, Any] = Body(default_factory=dict)):
action = str(body.get("action") or "add").replace("-", "_").strip().lower()
allowed = TODO_WRITE_SCOPES if action in WRITE_ACTIONS else TODO_READ_SCOPES
owner = _scope_owner(request, allowed)
args = dict(body)
args["action"] = action
return await do_manage_notes(json.dumps(args), owner=owner)
@router.get("/emails")
async def list_emails(
request: Request,
folder: str = "INBOX",
limit: int = 10,
offset: int = 0,
filter: str = "all",
from_addr: str | None = None,
account_id: str | None = None,
has_attachments: int = 0,
):
owner = _scope_owner(request, EMAIL_READ_SCOPES)
if email_list_endpoint is None:
raise HTTPException(503, "Email integration is not available")
limit = max(1, min(int(limit or 10), 50))
offset = max(0, int(offset or 0))
if account_id:
from routes.email_helpers import _assert_owns_account
_assert_owns_account(account_id, owner)
return await email_list_endpoint(
folder=folder,
limit=limit,
offset=offset,
filter=filter,
from_addr=from_addr,
account_id=account_id,
has_attachments=has_attachments,
cache_bust=None,
owner=owner,
)
@router.get("/emails/{uid}")
async def read_email(
request: Request,
uid: str,
folder: str = "INBOX",
account_id: str | None = None,
mark_seen: bool = False,
):
owner = _scope_owner(request, EMAIL_READ_SCOPES)
if email_read_endpoint is None:
raise HTTPException(503, "Email integration is not available")
if account_id:
from routes.email_helpers import _assert_owns_account
_assert_owns_account(account_id, owner)
return await email_read_endpoint(
uid=uid,
folder=folder,
account_id=account_id,
mark_seen=mark_seen,
owner=owner,
)
return router