POST /api/v1/chat (the n8n/Make/Activepieces sync-chat endpoint) verified session ownership with `_tok_user and _sess_owner and _sess_owner != _tok_user`. The `_sess_owner and` clause skipped the check entirely whenever the session's owner was null — so any chat-scoped API token (e.g. a token minted for a paired mobile device) could pass a legacy/migrated null-owner session id, inject a message into that session, and read back its conversation history plus reuse the owner's endpoint credentials. This is the same `if owner and owner != user` null-owner-bypass pattern that was already hardened in the gallery, calendar, and notes routes (see test_null_owner_gates.py) and in session_routes._verify_session_owner. Make this gate strict and fail closed too: require a resolvable caller and an exact owner match, mirroring _verify_session_owner. Extract the decision into _caller_owns_session() and pin it with regression tests.
356 lines
14 KiB
Python
356 lines
14 KiB
Python
"""Webhook, API Token, and sync chat routes."""
|
|
|
|
import asyncio
|
|
import uuid
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, HTTPException, Request, Form
|
|
from pydantic import BaseModel, Field
|
|
|
|
from core.database import SessionLocal, Webhook
|
|
from src.webhook_manager import WebhookManager, validate_webhook_url, validate_events
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/api", tags=["webhooks"])
|
|
|
|
# Input limits
|
|
MAX_NAME_LEN = 100
|
|
MAX_URL_LEN = 2048
|
|
MAX_SECRET_LEN = 256
|
|
MAX_MESSAGE_LEN = 32_000
|
|
|
|
|
|
from core.middleware import require_admin as _require_admin
|
|
|
|
|
|
def _caller_owns_session(sess_owner, caller) -> bool:
|
|
"""Strict session-ownership gate for the token-authenticated sync-chat
|
|
endpoint (`POST /api/v1/chat`).
|
|
|
|
Mirrors ``_verify_session_owner`` in session_routes.py and the null-owner
|
|
gates in notes/calendar/gallery: a caller may resume a session ONLY when
|
|
its owner matches them exactly. A null/empty session owner (legacy or
|
|
migrated rows) is deliberately NOT resumable by an arbitrary token — the
|
|
old ``sess_owner and sess_owner != caller`` form skipped the check whenever
|
|
``sess_owner`` was falsy, so any chat-scoped token (e.g. a paired mobile
|
|
device) could resume such a session, inject a message, and read back its
|
|
history and reuse the owner's endpoint credentials. Fail closed: an
|
|
unresolvable caller also returns False.
|
|
"""
|
|
if not caller:
|
|
return False
|
|
return sess_owner == caller
|
|
|
|
|
|
def setup_webhook_routes(
    webhook_manager: WebhookManager,
    auth_manager,
    session_manager=None,
    api_key_manager=None,
) -> APIRouter:
    """Attach the webhook-admin routes and the sync-chat route to ``router``.

    Args:
        webhook_manager: Used for test deliveries (``deliver_test``) and for
            firing the ``chat.completed`` event (``fire``).
        auth_manager: Accepted for signature compatibility; none of the routes
            defined here reference it.
        session_manager: Optional session store. The sync-chat route calls
            ``get_session``, ``create_session`` and ``save_sessions`` on it
            and answers 500 when it must create a session without one.
        api_key_manager: Optional. When provided, ``encrypt_api_key`` is used
            to encrypt webhook secrets before they are stored.

    Returns:
        The module-level ``router`` with the routes registered.
    """

    # Strong references to fire-and-forget tasks. The event loop only keeps
    # weak references to tasks, so a task whose result is discarded can be
    # garbage-collected before it finishes.
    _background_tasks = set()

    def _on_background_done(task) -> None:
        """Release a finished background task and surface its failure."""
        _background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Background webhook dispatch failed", exc_info=exc)

    def _spawn_background(coro) -> None:
        """Schedule ``coro`` without awaiting it, keeping the task alive."""
        task = asyncio.create_task(coro)
        _background_tasks.add(task)
        task.add_done_callback(_on_background_done)

    @router.get("/webhooks")
    def list_webhooks(request: Request):
        """Return every stored webhook (admin only); secrets are not echoed."""
        _require_admin(request)
        db = SessionLocal()
        try:
            hooks = db.query(Webhook).all()
            return [
                {
                    "id": w.id,
                    "name": w.name,
                    "url": w.url,
                    # Only report whether a secret exists, never its value.
                    "has_secret": bool(w.secret),
                    "events": w.events.split(",") if w.events else [],
                    "is_active": w.is_active,
                    "last_triggered_at": w.last_triggered_at.isoformat() if w.last_triggered_at else None,
                    "last_status_code": w.last_status_code,
                    "last_error": w.last_error,
                    "created_at": w.created_at.isoformat() if w.created_at else None,
                }
                for w in hooks
            ]
        finally:
            db.close()

    @router.post("/webhooks")
    def create_webhook(
        request: Request,
        name: str = Form(""),
        url: str = Form(""),
        secret: str = Form(""),
        events: str = Form(""),
    ):
        """Create an active webhook from form fields (admin only).

        Raises:
            HTTPException: 400 when the name is empty after trimming, or when
                ``validate_webhook_url`` / ``validate_events`` reject input.
        """
        _require_admin(request)
        name = name.strip()[:MAX_NAME_LEN]
        if not name:
            raise HTTPException(400, "Webhook name is required")
        try:
            url = validate_webhook_url(url)
        except ValueError as e:
            raise HTTPException(400, str(e)) from e
        try:
            events = validate_events(events)
        except ValueError as e:
            raise HTTPException(400, str(e)) from e

        secret_val = secret.strip()[:MAX_SECRET_LEN] or None
        # Encrypt the secret at rest using the same Fernet key as API keys
        encrypted_secret = None
        if secret_val and api_key_manager:
            encrypted_secret = api_key_manager.encrypt_api_key(secret_val)
        elif secret_val:
            # Fallback if no encryption available: the secret is stored as-is.
            logger.warning(
                "No api_key_manager configured; storing webhook secret unencrypted"
            )
            encrypted_secret = secret_val

        # Short id: the first 8 characters of a random UUID.
        webhook_id = str(uuid.uuid4())[:8]
        db = SessionLocal()
        try:
            db.add(Webhook(
                id=webhook_id,
                name=name,
                url=url,
                secret=encrypted_secret,
                events=events,
                is_active=True,
            ))
            db.commit()
        finally:
            db.close()

        return {"id": webhook_id, "name": name}

    @router.post("/webhooks/{webhook_id}/test")
    async def test_webhook(request: Request, webhook_id: str):
        """Send a test delivery to one webhook (admin only); 404 if unknown."""
        _require_admin(request)
        db = SessionLocal()
        try:
            wh = db.query(Webhook).filter(Webhook.id == webhook_id).first()
            if not wh:
                raise HTTPException(404, "Webhook not found")
            # Copy the values out so the DB session can be closed before the
            # network call below.
            url, secret = wh.url, wh.secret
        finally:
            db.close()

        # NOTE(review): create_webhook stores the secret encrypted when an
        # api_key_manager is present; the stored value is passed through
        # unchanged here - confirm deliver_test handles decryption.
        await webhook_manager.deliver_test(webhook_id, url, secret)
        return {"status": "sent"}

    @router.patch("/webhooks/{webhook_id}")
    def toggle_webhook(request: Request, webhook_id: str):
        """Flip a webhook's ``is_active`` flag (admin only); 404 if unknown."""
        _require_admin(request)
        db = SessionLocal()
        try:
            wh = db.query(Webhook).filter(Webhook.id == webhook_id).first()
            if not wh:
                raise HTTPException(404, "Webhook not found")
            wh.is_active = not wh.is_active
            db.commit()
            return {"id": webhook_id, "is_active": wh.is_active}
        finally:
            db.close()

    @router.delete("/webhooks/{webhook_id}")
    def delete_webhook(request: Request, webhook_id: str):
        """Delete a webhook by id (admin only); 404 if no row was removed."""
        _require_admin(request)
        db = SessionLocal()
        try:
            deleted = db.query(Webhook).filter(Webhook.id == webhook_id).delete()
            db.commit()
            if not deleted:
                raise HTTPException(404, "Webhook not found")
        finally:
            db.close()
        return {"status": "deleted"}

    # ================================================================
    # Sync Chat Endpoint (for n8n / Make / Activepieces)
    # ================================================================

    # Known provider base URLs, looked up by provider name or (through
    # MODEL_PROVIDER_MAP) by model-name prefix.
    KNOWN_PROVIDERS = {
        "deepseek": "https://api.deepseek.com/v1",
        "openai": "https://api.openai.com/v1",
        "mistral": "https://api.mistral.ai/v1",
        "groq": "https://api.groq.com/openai/v1",
        "together": "https://api.together.xyz/v1",
        "openrouter": "https://openrouter.ai/api/v1",
        "ollama": "https://ollama.com/api",
        "fireworks": "https://api.fireworks.ai/inference/v1",
    }

    # Model prefix -> provider mapping for auto-detection. Prefixes are tested
    # in insertion order with ``str.startswith``.
    MODEL_PROVIDER_MAP = {
        "deepseek": "deepseek",
        "gpt-": "openai",
        "o1": "openai",
        "o3": "openai",
        "o4": "openai",
        "mistral": "mistral",
        "llama": "groq",
        "mixtral": "groq",
    }

    def _resolve_base_url(model: Optional[str], provider: Optional[str]) -> Optional[str]:
        """Try to auto-resolve a base URL from provider name or model prefix.

        A recognised ``provider`` (case-insensitive) wins. Otherwise the
        lower-cased ``model`` is matched against ``MODEL_PROVIDER_MAP``
        prefixes. Returns ``None`` when neither yields a match.
        """
        if provider and provider.lower() in KNOWN_PROVIDERS:
            return KNOWN_PROVIDERS[provider.lower()]
        if model:
            model_lower = model.lower()
            for prefix, prov in MODEL_PROVIDER_MAP.items():
                if model_lower.startswith(prefix):
                    return KNOWN_PROVIDERS[prov]
        return None

    class SyncChatRequest(BaseModel):
        """JSON body accepted by ``POST /v1/chat``; only ``message`` is required."""

        message: str = Field(..., max_length=MAX_MESSAGE_LEN)
        model: Optional[str] = Field(None, max_length=200)
        session: Optional[str] = Field(None, max_length=100)
        api_key: Optional[str] = Field(None, max_length=256)
        base_url: Optional[str] = Field(None, max_length=MAX_URL_LEN)
        provider: Optional[str] = Field(None, max_length=50)

    @router.post("/v1/chat")
    async def sync_chat(request: Request, body: SyncChatRequest):
        """Send one message to an LLM and return its reply synchronously.

        The session is chosen in this order:

        1. ``body.session`` - resume an existing session, but only when the
           token's user owns it (strict, fail-closed check).
        2. ``body.api_key`` - create a new session against the given or
           auto-detected provider base URL.
        3. Otherwise - create a new session on the first enabled
           ``ModelEndpoint`` row.

        Returns:
            ``{"response": ..., "session_id": ..., "model": ...}``.

        Raises:
            HTTPException: 403 when the request is not API-token
                authenticated or the token lacks the ``chat`` scope; 400 for
                an empty message, an undetectable provider, or no usable
                endpoint; 404 when the session is missing or not owned by the
                caller; 500 when model discovery fails or no session manager
                is available.
        """
        if not getattr(request.state, "api_token", False):
            raise HTTPException(403, "This endpoint requires an API token")
        scopes = set(getattr(request.state, "api_token_scopes", []) or [])
        if "chat" not in scopes:
            raise HTTPException(403, "API token is not scoped for chat")
        token_owner = getattr(request.state, "api_token_owner", None)

        # Imported at call time rather than at module import time.
        from core.models import ChatMessage
        from src.llm_core import llm_call_async
        from core.database import ModelEndpoint
        from src.endpoint_resolver import build_chat_url, build_headers, build_models_url, normalize_base

        message = body.message.strip()
        if not message:
            raise HTTPException(400, "Message is required")

        session_id = body.session
        sess = None

        # --- Case 1: Resume an existing session ---
        if session_id and session_manager:
            try:
                sess = session_manager.get_session(session_id)
            except Exception:
                # Any lookup failure is reported as "not found"; the cause is
                # deliberately not chained into the response.
                raise HTTPException(404, "Session not found") from None
            # SECURITY: verify the API-token's user owns this session - without
            # this any token holder could resume any user's chat by passing its
            # ID. Prefer the token owner, then request.state.user, then
            # get_current_user(request); any failure leaves the caller
            # unresolved, which the ownership gate rejects.
            try:
                from src.auth_helpers import get_current_user as _gcu
                _tok_user = token_owner or getattr(request.state, "user", None) or _gcu(request)
            except Exception:
                _tok_user = None
            # Strict ownership (see _caller_owns_session): fail closed so a
            # null-owner / cross-owner session can't be resumed by an arbitrary
            # chat-scoped token. The same 404 is used as for a missing session
            # so the response does not reveal whether the id exists.
            _sess_owner = getattr(sess, "owner", None)
            if not _caller_owns_session(_sess_owner, _tok_user):
                raise HTTPException(404, "Session not found")

        # --- Case 2: Direct API key + model (no pre-configured endpoint needed) ---
        if not sess and body.api_key:
            api_key = body.api_key.strip()
            model = body.model or "deepseek-chat"

            # Resolve base_url: explicit > provider name > model prefix auto-detect
            # NOTE(review): an explicit base_url comes straight from the
            # request body and is later used for an outbound request; confirm
            # whether it should be restricted (SSRF).
            base_url = body.base_url.strip().rstrip("/") if body.base_url else None
            if not base_url:
                base_url = _resolve_base_url(model, body.provider)
            if not base_url:
                raise HTTPException(400,
                    "Could not auto-detect provider. Pass base_url (e.g. 'https://api.deepseek.com/v1') "
                    "or provider ('deepseek', 'openai', 'groq', etc.)")

            base_url = normalize_base(base_url)
            endpoint_url = build_chat_url(base_url)

            if not session_manager:
                raise HTTPException(500, "Session manager not available")

            sid = str(uuid.uuid4())
            sess = session_manager.create_session(
                session_id=sid, name="API Chat", endpoint_url=endpoint_url,
                model=model, owner=token_owner,
            )
            sess.headers = build_headers(api_key, base_url)
            session_manager.save_sessions()
            session_id = sid

        # --- Case 3: Fall back to first configured ModelEndpoint ---
        if not sess:
            db = SessionLocal()
            try:
                # ``== True`` is intentional: it builds a SQL expression.
                ep = db.query(ModelEndpoint).filter(ModelEndpoint.is_enabled == True).first()  # noqa: E712
            finally:
                db.close()

            if not ep:
                raise HTTPException(400,
                    "No session, api_key, or configured endpoints. "
                    "Pass api_key + model, or configure an endpoint in Admin.")

            base_url = normalize_base(ep.base_url)
            endpoint_url = build_chat_url(base_url)
            model = body.model or "auto"
            api_key = ep.api_key

            if model == "auto":
                # Ask the endpoint for its model list and take the first id.
                # Two response shapes are handled: {"data": [{"id": ...}]} and
                # {"models": [{"name"|"model": ...}]}.
                try:
                    async with httpx.AsyncClient(timeout=5) as client:
                        models_url = build_models_url(base_url)
                        hdrs = build_headers(api_key, base_url)
                        resp = await client.get(models_url, headers=hdrs)
                        resp.raise_for_status()
                        data = resp.json()
                        ids = [m.get("id") for m in (data.get("data") or []) if m.get("id")]
                        if not ids:
                            ids = [
                                m.get("name") or m.get("model")
                                for m in (data.get("models") or [])
                                if m.get("name") or m.get("model")
                            ]
                        # An empty list leaves the model as the literal "auto".
                        model = ids[0] if ids else "auto"
                except Exception as e:
                    # Record the real cause; the client only sees a generic 500.
                    logger.warning(
                        "Model discovery failed for configured endpoint", exc_info=True
                    )
                    raise HTTPException(500, "Could not discover models from endpoint") from e

            if not session_manager:
                raise HTTPException(500, "Session manager not available")

            sid = str(uuid.uuid4())
            sess = session_manager.create_session(
                session_id=sid, name="API Chat", endpoint_url=endpoint_url,
                model=model, owner=token_owner,
            )
            if api_key:
                sess.headers = build_headers(api_key, base_url)
            session_manager.save_sessions()
            session_id = sid

        # --- Send message and get response ---
        sess.add_message(ChatMessage("user", message))

        # The whole session history is sent so the model sees prior turns.
        messages = [{"role": m.role, "content": m.content} for m in sess.history]

        reply = await llm_call_async(
            sess.endpoint_url, sess.model, messages,
            headers=sess.headers, timeout=120,
        )
        sess.add_message(ChatMessage("assistant", reply))
        session_manager.save_sessions()

        # Notify webhooks without delaying the response. The task is tracked
        # so it cannot be garbage-collected mid-delivery and failures are
        # logged. Payload text is truncated to 2000 characters.
        _spawn_background(webhook_manager.fire("chat.completed", {
            "session_id": session_id, "model": sess.model,
            "user_message": message[:2000], "response": reply[:2000],
        }))

        return {"response": reply, "session_id": session_id, "model": sess.model}

    return router
|