"""Shared auth helpers used by all route files.""" import os from typing import Optional from fastapi import Request, HTTPException def get_current_user(request: Request) -> Optional[str]: """Get current username from request state (set by auth middleware).""" return getattr(request.state, 'current_user', None) def _auth_disabled() -> bool: """True when the operator has explicitly turned off auth via .env. Mirrors the AUTH_ENABLED parse in app.py / core/middleware.py so the three call sites agree on what "off" means.""" return os.getenv("AUTH_ENABLED", "true").lower() == "false" def require_user(request: Request) -> str: """FastAPI dependency: reject unauthenticated callers when the upstream auth middleware was bypassed unexpectedly (e.g. SSRF from a sibling service). Returns the resolved username, or "" in single-user / anonymous modes where no username is available. The three "" cases are: 1. AUTH_ENABLED=false — the operator explicitly turned auth off. The full /login flow is skipped (issue #622), so route-level require_user must let the request through too instead of 401-ing and forcing the browser to /login. 2. Unconfigured first-run + loopback caller — pre-setup access from localhost so the operator can hit the SPA before creating the first admin. 3. LOCALHOST_BYPASS=true + loopback caller — documented dev bypass. Use this on routes that touch user data so middleware misconfig can't open them up. """ u = get_current_user(request) if u: return u # Operator-disabled auth: honor it at the route layer too. Without this, # routes that depend on require_user 401, the front-end fetch wrapper # redirects to /login, and the user sees a login page despite # AUTH_ENABLED=false (issue #622). Docker / reverse-proxy deployments # hit this because requests arrive from a non-loopback client.host, so # the loopback fall-through below never fires. if _auth_disabled(): return "" auth_mgr = getattr(request.app.state, "auth_manager", None) client = getattr(request, "client", None) host = (client.host if client else "") or "" is_loopback = host in ("127.0.0.1", "::1", "localhost") # LOCALHOST_BYPASS=true is the dev-only "I'm on loopback, skip auth" # switch. Mirror the middleware so routes don't 401 the same caller # the middleware just let through. if is_loopback and os.getenv("LOCALHOST_BYPASS", "false").lower() == "true": return "" if auth_mgr is not None and getattr(auth_mgr, "is_configured", False): raise HTTPException(401, "Not authenticated") # Unconfigured / first-run mode: only allow loopback callers. if is_loopback: return "" raise HTTPException(401, "Not authenticated") def require_privilege(request: Request, key: str) -> str: """Reject callers whose `auth.json` privilege flag for `key` is False. Returns the username so the route handler can keep using it. Admins always have every privilege via `auth_manager.get_privileges` (which returns ADMIN_PRIVILEGES wholesale), so this is a no-op for them. In unauthenticated single-user mode (`require_user` returns ""), privileges aren't enforced. """ user = require_user(request) if not user: return user auth_mgr = getattr(request.app.state, "auth_manager", None) if auth_mgr is None: return user try: privs = auth_mgr.get_privileges(user) or {} except Exception: return user # True = permitted; missing key defaults to permitted (unknown privileges # fail open — the UI gates display-side). if not privs.get(key, True): raise HTTPException(403, f"Your account is not allowed to {key.replace('_', ' ')}.") return user def owner_filter(query, model_cls, user: str, *, include_shared: bool = True): """Filter `query` so only rows owned by `user` (and optionally null-owner 'shared' rows) come through. No-op when `user` is empty (single-user mode). Returns the modified query.""" if not user: return query if include_shared: return query.filter((model_cls.owner == user) | (model_cls.owner == None)) # noqa: E711 return query.filter(model_cls.owner == user)