Files
hermes-webui/api/oauth.py
T
Nathan Esquenazi b34ce63c97 fix(oauth): honor cancel during Codex device-token exchange (follow-up to #1652)
The Codex OAuth onboarding worker introduced in #1652 had a cancel-vs-worker
race: a `cancel_onboarding_oauth_flow` request that arrived while the worker
was mid-network-call (between the `live = dict(...)` snapshot and the next
status check) would be silently overridden:

  1. User clicks Cancel → server sets flow.status = "cancelled" and drops
     sensitive lifecycle fields under the lock.
  2. Worker is mid-`_poll_codex_authorization` / `_exchange_codex_authorization`
     using the local `live` snapshot it captured before the cancel.
  3. Worker calls `_persist_codex_credentials(...)` — auth.json gets written.
  4. Worker calls `_set_flow_status(flow_id, "success")` — overrides the
     cancelled status.

Net effect: the user's explicit cancel is ignored, credentials are persisted,
and the UI reports success. Reproduced with a behavioural harness that drove
a real worker thread against patched network helpers and confirmed:

  pre-fix : flow status `success`, auth.json written despite cancel
  post-fix: flow status `cancelled`, auth.json NOT written

The fix re-checks the flow status under `_OAUTH_FLOWS_LOCK` after the token
exchange completes and before persisting. If the status is no longer
`pending`, the worker exits without persisting credentials and without
overwriting the terminal status.

Regression test `test_cancel_during_token_exchange_does_not_persist_credentials`
drives the worker against threading.Event-gated network stubs to reproduce
the race deterministically and lock the new invariant.

Trace verified against fresh hermes-agent tarball — credential_pool entry
shape (`auth_type=oauth`, `source=manual:device_code`, `priority=0`, base_url)
remains compatible with `agent.credential_pool.load_pool("openai-codex")` and
the agent CLI's `_save_codex_tokens` legacy fallback path.

Tests:
- 10/10 in tests/test_issue1362_codex_oauth_onboarding.py
- Full suite: 4230 passed, 57 skipped, 3 xpassed, 0 failed in 33.82s

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 14:49:38 -07:00

450 lines
16 KiB
Python

"""In-app OAuth flow implementations for onboarding.
The browser receives only WebUI-local flow metadata (flow_id, user_code,
verification_uri, high-level status). Provider device/auth codes and OAuth
tokens stay server-side and are persisted to the active Hermes profile's
``auth.json`` credential_pool.
"""
from __future__ import annotations
import json
import logging
import os
import stat
import threading
import time
import uuid
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Compatibility for older helper tests and self-heal code that import these.
AUTH_JSON_PATH = Path.home() / ".hermes" / "auth.json"
CODEX_ISSUER = "https://auth.openai.com"
CODEX_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
CODEX_VERIFICATION_URI = f"{CODEX_ISSUER}/codex/device"
CODEX_USER_CODE_URL = f"{CODEX_ISSUER}/api/accounts/deviceauth/usercode"
CODEX_DEVICE_TOKEN_URL = f"{CODEX_ISSUER}/api/accounts/deviceauth/token"
CODEX_TOKEN_URL = f"{CODEX_ISSUER}/oauth/token"
CODEX_REDIRECT_URI = f"{CODEX_ISSUER}/deviceauth/callback"
CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
CODEX_FLOW_MAX_WAIT_SECONDS = 15 * 60
_ALLOWED_ONBOARDING_OAUTH_PROVIDERS = {"openai-codex"}
_REJECTED_ONBOARDING_OAUTH_PROVIDERS = {
"anthropic",
"claude",
"claude-code",
"nous",
"qwen-oauth",
"gemini-cli",
"google-gemini-cli",
"minimax",
"minimax-oauth",
"copilot",
"copilot-acp",
}
_OAUTH_FLOWS: dict[str, dict[str, Any]] = {}
_OAUTH_FLOWS_LOCK = threading.Lock()
def _get_active_hermes_home() -> Path:
try:
from api.profiles import get_active_hermes_home
return Path(get_active_hermes_home())
except Exception as exc:
# Per Opus advisor on stage-296: log the silent fallback so a corrupt
# profile state ending up writing tokens to ~/.hermes (instead of the
# active profile) is observable in logs rather than failing silently.
logger.warning(
"Falling back to ~/.hermes for OAuth credential storage: "
"active-profile resolution failed: %s",
exc,
)
return Path.home() / ".hermes"
# ── legacy auth.json helpers ────────────────────────────────────────────────
def _read_auth_json(auth_path: Path | None = None) -> dict[str, Any]:
"""Read auth.json and return parsed dict, or an empty compatible store."""
path = auth_path or AUTH_JSON_PATH
if path.exists():
try:
loaded = json.loads(path.read_text(encoding="utf-8"))
return loaded if isinstance(loaded, dict) else {}
except json.JSONDecodeError as exc:
logger.warning("Failed to parse %s: %s", path, exc)
return {}
return {}
def read_auth_json():
"""Public wrapper for streaming credential self-heal code."""
return _read_auth_json()
def _write_auth_json(data: dict[str, Any], auth_path: Path | None = None) -> Path:
"""Atomically write auth.json with owner-only permissions.
OAuth access/refresh tokens live in this file. The temp file is chmod 0600
before rename so the final path never inherits a permissive process umask.
"""
path = auth_path or AUTH_JSON_PATH
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_name(f"{path.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}")
try:
tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
try:
tmp.chmod(0o600)
except OSError as exc:
logger.warning("Failed to chmod 0600 on %s: %s", tmp, exc)
tmp.replace(path)
try:
path.chmod(stat.S_IRUSR | stat.S_IWUSR)
except OSError:
pass
return path
finally:
try:
if tmp.exists():
tmp.unlink()
except OSError:
pass
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def _persist_codex_credentials(hermes_home: Path, token_data: dict[str, Any]) -> Path:
"""Persist Codex OAuth credentials to active-profile auth.json."""
access_token = str(token_data.get("access_token") or "").strip()
refresh_token = str(token_data.get("refresh_token") or "").strip()
if not access_token:
raise RuntimeError("Codex token exchange did not return an access_token")
auth_path = Path(hermes_home) / "auth.json"
auth = _read_auth_json(auth_path)
auth.setdefault("version", 1)
pool = auth.setdefault("credential_pool", {})
if not isinstance(pool, dict):
pool = {}
auth["credential_pool"] = pool
entries = pool.setdefault("openai-codex", [])
if not isinstance(entries, list):
entries = []
pool["openai-codex"] = entries
now = _now_iso()
entry = None
# Per Opus advisor on stage-296: also accept the legacy `source ==
# "oauth_device"` value so users with prior Codex OAuth credentials
# (written by older WebUI versions before this PR's source-key change)
# get their existing entry updated in-place rather than accumulating a
# stale duplicate pool entry.
_accept_sources = {"manual:device_code", "oauth_device"}
for candidate in entries:
if isinstance(candidate, dict) and candidate.get("source") in _accept_sources:
entry = candidate
break
if entry is None:
entry = {
"id": "codex-oauth-" + uuid.uuid4().hex[:12],
"label": "Codex OAuth",
"auth_type": "oauth",
"priority": 0,
"source": "manual:device_code",
"base_url": CODEX_BASE_URL,
"created_at": now,
}
entries.insert(0, entry)
entry.update(
{
"label": "Codex OAuth",
"auth_type": "oauth",
"priority": 0,
"source": "manual:device_code",
"access_token": access_token,
"refresh_token": refresh_token,
"base_url": CODEX_BASE_URL,
"last_refresh": now,
"updated_at": now,
}
)
auth["updated_at"] = now
path = _write_auth_json(auth, auth_path)
try:
from api.config import invalidate_credential_pool_cache
invalidate_credential_pool_cache("openai-codex")
except Exception:
logger.debug("Failed to invalidate openai-codex credential cache", exc_info=True)
return path
# Backward-compatible wrapper used by older code/tests.
def _save_codex_credentials(token_data):
return _persist_codex_credentials(_get_active_hermes_home(), token_data)
# ── Codex protocol ──────────────────────────────────────────────────────────
def _json_request(url: str, payload: dict[str, Any], *, form: bool = False) -> dict[str, Any]:
if form:
data = urllib.parse.urlencode(payload).encode("utf-8")
content_type = "application/x-www-form-urlencoded"
else:
data = json.dumps(payload).encode("utf-8")
content_type = "application/json"
req = urllib.request.Request(
url,
data=data,
method="POST",
headers={"Content-Type": content_type, "Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode("utf-8"))
def _request_codex_user_code() -> dict[str, Any]:
return _json_request(CODEX_USER_CODE_URL, {"client_id": CODEX_CLIENT_ID})
def _poll_codex_authorization(device_auth_id: str, user_code: str) -> dict[str, Any] | None:
try:
return _json_request(
CODEX_DEVICE_TOKEN_URL,
{"device_auth_id": device_auth_id, "user_code": user_code},
)
except urllib.error.HTTPError as exc:
if exc.code in (403, 404):
return None
raise
def _exchange_codex_authorization(authorization_code: str, code_verifier: str) -> dict[str, Any]:
return _json_request(
CODEX_TOKEN_URL,
{
"grant_type": "authorization_code",
"code": authorization_code,
"redirect_uri": CODEX_REDIRECT_URI,
"client_id": CODEX_CLIENT_ID,
"code_verifier": code_verifier,
},
form=True,
)
def _public_start_payload(flow_id: str, flow: dict[str, Any]) -> dict[str, Any]:
return {
"ok": True,
"provider": "openai-codex",
"flow_id": flow_id,
"status": flow.get("status", "pending"),
"verification_uri": CODEX_VERIFICATION_URI,
"user_code": flow.get("user_code", ""),
"expires_at": flow.get("expires_at"),
"poll_interval_seconds": flow.get("poll_interval_seconds", 5),
}
def _public_status_payload(flow_id: str, flow: dict[str, Any]) -> dict[str, Any]:
payload = {
"ok": True,
"provider": "openai-codex",
"flow_id": flow_id,
"status": flow.get("status", "error"),
}
if flow.get("status") == "error" and flow.get("error"):
payload["error"] = str(flow.get("error"))[:200]
return payload
def _drop_sensitive_flow_fields(flow: dict[str, Any]) -> None:
for key in (
"device_auth_id",
"authorization_code",
"code_verifier",
"access_token",
"refresh_token",
"token_data",
):
flow.pop(key, None)
def _cleanup_oauth_flows(now: float | None = None) -> None:
now = now or time.time()
cutoff = now - 300
with _OAUTH_FLOWS_LOCK:
for fid, flow in list(_OAUTH_FLOWS.items()):
status = flow.get("status")
if status == "pending" and float(flow.get("expires_at") or 0) <= now:
flow["status"] = "expired"
_drop_sensitive_flow_fields(flow)
if status in {"success", "expired", "cancelled", "error"} and float(flow.get("updated_at") or 0) < cutoff:
_OAUTH_FLOWS.pop(fid, None)
def _spawn_codex_oauth_worker(flow_id: str) -> None:
worker = threading.Thread(target=_run_codex_oauth_worker, args=(flow_id,), daemon=True)
worker.start()
def _set_flow_status(flow_id: str, status: str, **fields: Any) -> None:
with _OAUTH_FLOWS_LOCK:
flow = _OAUTH_FLOWS.get(flow_id)
if not flow:
return
flow["status"] = status
flow["updated_at"] = time.time()
flow.update(fields)
if status in {"success", "expired", "cancelled", "error"}:
_drop_sensitive_flow_fields(flow)
def _run_codex_oauth_worker(flow_id: str) -> None:
while True:
with _OAUTH_FLOWS_LOCK:
flow = dict(_OAUTH_FLOWS.get(flow_id) or {})
if not flow:
return
status = flow.get("status")
if status != "pending":
return
if float(flow.get("expires_at") or 0) <= time.time():
_set_flow_status(flow_id, "expired")
return
time.sleep(max(1, int(flow.get("poll_interval_seconds") or 5)))
with _OAUTH_FLOWS_LOCK:
live = dict(_OAUTH_FLOWS.get(flow_id) or {})
if live.get("status") != "pending":
return
try:
code_resp = _poll_codex_authorization(
str(live.get("device_auth_id") or ""),
str(live.get("user_code") or ""),
)
if code_resp is None:
continue
authorization_code = str(code_resp.get("authorization_code") or "").strip()
code_verifier = str(code_resp.get("code_verifier") or "").strip()
if not authorization_code or not code_verifier:
raise RuntimeError("Device auth response missing authorization_code or code_verifier")
tokens = _exchange_codex_authorization(authorization_code, code_verifier)
# Re-check status under lock before persisting: a cancel/expire that
# raced with the device-token + token-exchange network calls must
# win, so we don't persist credentials the user explicitly aborted.
with _OAUTH_FLOWS_LOCK:
current = _OAUTH_FLOWS.get(flow_id)
if not current or current.get("status") != "pending":
return
_persist_codex_credentials(Path(live["hermes_home"]), tokens)
_set_flow_status(flow_id, "success")
return
except Exception as exc:
logger.warning("Codex OAuth onboarding flow failed: %s", exc)
_set_flow_status(flow_id, "error", error=str(exc))
return
def start_onboarding_oauth_flow(body: dict[str, Any] | None) -> dict[str, Any]:
"""Start the supported onboarding OAuth flow.
Currently v1 intentionally supports only OpenAI Codex. Other providers are
rejected instead of silently falling back to terminal-first setup.
"""
_cleanup_oauth_flows()
provider = str((body or {}).get("provider") or "").strip().lower()
if provider not in _ALLOWED_ONBOARDING_OAUTH_PROVIDERS:
if provider in _REJECTED_ONBOARDING_OAUTH_PROVIDERS or provider:
raise ValueError("Only OpenAI Codex OAuth is supported in WebUI onboarding right now")
raise ValueError("provider is required")
hermes_home = _get_active_hermes_home()
try:
device = _request_codex_user_code()
except Exception as exc:
raise RuntimeError(f"Failed to start Codex OAuth: {exc}") from exc
user_code = str(device.get("user_code") or "").strip()
device_auth_id = str(device.get("device_auth_id") or "").strip()
if not user_code or not device_auth_id:
raise RuntimeError("Device code response missing required fields")
interval = max(3, int(device.get("interval") or 5))
expires_in = int(device.get("expires_in") or CODEX_FLOW_MAX_WAIT_SECONDS)
expires_at = time.time() + min(max(expires_in, 60), CODEX_FLOW_MAX_WAIT_SECONDS)
flow_id = uuid.uuid4().hex
flow = {
"provider": "openai-codex",
"status": "pending",
"device_auth_id": device_auth_id,
"user_code": user_code,
"expires_at": expires_at,
"poll_interval_seconds": interval,
"hermes_home": str(hermes_home),
"created_at": time.time(),
"updated_at": time.time(),
}
with _OAUTH_FLOWS_LOCK:
_OAUTH_FLOWS[flow_id] = flow
_spawn_codex_oauth_worker(flow_id)
return _public_start_payload(flow_id, flow)
def poll_onboarding_oauth_flow(flow_id: str) -> dict[str, Any]:
_cleanup_oauth_flows()
fid = str(flow_id or "").strip()
if not fid:
raise ValueError("flow_id is required")
with _OAUTH_FLOWS_LOCK:
flow = _OAUTH_FLOWS.get(fid)
if not flow:
raise KeyError("OAuth flow not found")
if flow.get("status") == "pending" and float(flow.get("expires_at") or 0) <= time.time():
flow["status"] = "expired"
flow["updated_at"] = time.time()
_drop_sensitive_flow_fields(flow)
return _public_status_payload(fid, dict(flow))
def cancel_onboarding_oauth_flow(body: dict[str, Any] | None) -> dict[str, Any]:
fid = str((body or {}).get("flow_id") or "").strip()
if not fid:
raise ValueError("flow_id is required")
with _OAUTH_FLOWS_LOCK:
flow = _OAUTH_FLOWS.get(fid)
if not flow:
return {"ok": True, "provider": "openai-codex", "flow_id": fid, "status": "cancelled"}
if flow.get("status") == "pending":
flow["status"] = "cancelled"
flow["updated_at"] = time.time()
_drop_sensitive_flow_fields(flow)
return _public_status_payload(fid, dict(flow))
# Backward-compatible names from the abandoned spike. They intentionally do not
# expose provider device secrets to callers anymore.
def start_codex_device_code():
return start_onboarding_oauth_flow({"provider": "openai-codex"})
def poll_codex_token(device_code, interval=5):
yield {"status": "error", "error": "Use /api/onboarding/oauth/poll with flow_id"}