Files

2147 lines
84 KiB
Python

"""Hermes Web UI -- provider management endpoints.
Provides CRUD operations for configuring provider API keys post-onboarding.
Closes #586 (allow provider key update) and part of #604 (model picker
multi-provider support).
"""
from __future__ import annotations
import base64
import hashlib
import json
import logging
import os
import signal
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.request
from contextlib import contextmanager, nullcontext
from datetime import datetime, timedelta, timezone
from pathlib import Path
from types import SimpleNamespace
from typing import Any
try: # POSIX-only; Windows-style environments fall back to process-local locking.
import fcntl
except ImportError: # pragma: no cover - exercised only where fcntl is unavailable
fcntl = None # type: ignore[assignment]
from api.config import (
_PROVIDER_DISPLAY,
_PROVIDER_MODELS,
_custom_provider_slug_from_name,
_get_label_for_model,
_models_from_live_provider_ids,
_read_live_provider_model_ids,
_read_visible_codex_cache_model_ids,
_save_yaml_config_file,
get_config,
invalidate_models_cache,
reload_config,
)
logger = logging.getLogger(__name__)
def _custom_provider_name_matches(provider_id: str, name: object) -> bool:
"""Return True when *provider_id* refers to a named custom provider."""
pid = str(provider_id or "").strip().lower()
raw_name = str(name or "").strip().lower()
if not pid or not raw_name:
return False
slug = _custom_provider_slug_from_name(raw_name)
candidates = {raw_name, f"custom:{raw_name}"}
if slug:
candidates.add(slug)
return pid in candidates
_OPENROUTER_KEY_URL = "https://openrouter.ai/api/v1/key"
_PROVIDER_QUOTA_TIMEOUT_SECONDS = 3.0
_ACCOUNT_USAGE_SUBPROCESS_TIMEOUT_SECONDS = 35.0
_ACCOUNT_USAGE_CACHE_TTL_SECONDS = 45.0
_ACCOUNT_USAGE_CACHE_MAX_ENTRIES = 64
_ACCOUNT_USAGE_PROVIDERS = frozenset({"openai-codex", "anthropic"})
# Upper bound on simultaneous profile-isolated quota probe subprocesses.
# Each probe runs a Python child for up to 35 s; capping concurrency prevents
# resource exhaustion when the UI polls all providers rapidly. The limit is
# deliberately low (2) since _ACCOUNT_USAGE_SUBPROCESS_TIMEOUT_SECONDS is
# already 35 s and probe I/O is lightweight HTTP calls.
_MAX_CONCURRENT_ACCOUNT_USAGE_PROBES = 2
# Parent-death-signal bootstrap for the quota-probe child. On Linux this
# arranges for the child to receive SIGTERM when the WebUI parent dies
# (systemctl restart, OOM kill, etc.), so orphaned probes do not keep calling
# the provider API indefinitely after the WebUI process is gone.
# It uses prctl(PR_SET_PDEATHSIG, SIGTERM), which is standard on modern Linux
# kernels and reachable through ctypes without any C extension -- the same
# technique used throughout the Hermes codebase.
# If prctl is unavailable (non-Linux, or Linux without prctl support) the
# bootstrap does nothing and the probe child exits normally when its parent
# (WebUI) terminates; macOS and Windows rely on OS-level process-tree cleanup
# instead, and this variable is then unused.
# Source prelude for the probe child, assembled from adjacent string literals.
# The generated code makes a best-effort prctl(1, SIGTERM) call (option 1 is
# PR_SET_PDEATHSIG) and ignores every failure.
_ACCOUNT_USAGE_PARENT_DEATHSIG_BOOTSTRAP = (
    # fmt: off
    # Lines are written as string literals so this block passes
    # `python3 -m py_compile` cleanly and is safe to include verbatim
    # inside the single argument string passed to `python -c ...`.
    'import sys\n'
    'try:\n'
    ' import ctypes, signal\n'
    ' libc = ctypes.CDLL(None)\n'
    ' libc.prctl(1, signal.SIGTERM) # PR_SET_DEATHSIG=1, SIGTERM=15\n'
    'except Exception:\n'
    ' pass\n'
    # fmt: on
)
# Module-level cap on concurrent quota-probe subprocesses.
# Lazily created so this module compiles even when threading isn't ready.
_account_usage_probe_semaphore: threading.BoundedSemaphore | None = None
# Short-lived account-usage cache. The Codex pooled probe may check multiple
# credentials, so cache sanitized snapshots briefly to avoid re-querying the
# provider on every Settings repaint/profile-panel refresh. Pool composition
# changes can be stale for at most this TTL; that is preferred to hammering the
# provider usage API while the Settings panel is open. Transient None probe
# results are intentionally not cached; known exhausted/unavailable states are
# represented as non-None snapshots and remain cacheable.
_account_usage_status_cache: dict[tuple[str, str, str], tuple[float, Any]] = {}
_account_usage_status_cache_lock = threading.Lock()
def _get_account_usage_probe_semaphore() -> threading.BoundedSemaphore:
    """Return the shared probe-concurrency semaphore, creating it on first use.

    The semaphore is sized by ``_MAX_CONCURRENT_ACCOUNT_USAGE_PROBES`` and kept
    in the module-level ``_account_usage_probe_semaphore`` slot.
    """
    # NOTE(review): first-use creation is not guarded by a lock -- confirm that
    # concurrent first calls are acceptable.
    global _account_usage_probe_semaphore
    semaphore = _account_usage_probe_semaphore
    if semaphore is not None:
        return semaphore
    semaphore = threading.BoundedSemaphore(_MAX_CONCURRENT_ACCOUNT_USAGE_PROBES)
    _account_usage_probe_semaphore = semaphore
    return semaphore
# ── preexec_fn: parent-death signal for the probe subprocess ─────────────────
# On Linux, arrange for the child to receive SIGTERM when the WebUI parent
# dies (systemctl restart, OOM kill, etc.). The bootstrap source
# (_ACCOUNT_USAGE_PARENT_DEATHSIG_BOOTSTRAP) also covers the grandchild
# fork inside the child, but this preexec_fn handles the direct child-process
# case. It always returns None and swallows every error, so it is a silent
# no-op wherever prctl is unavailable and subprocess.run() keeps working there.
def _account_usage_preexec_fn() -> None:
    """Best-effort: ask the kernel to SIGTERM this process when its parent dies.

    Every failure is swallowed, so the function is a silent no-op wherever the
    ``prctl`` call cannot be made.
    """
    pr_set_pdeathsig = 1  # PR_SET_PDEATHSIG
    try:
        import ctypes

        ctypes.CDLL(None).prctl(pr_set_pdeathsig, signal.SIGTERM)  # SIGTERM=15
    except Exception:
        return
_ACCOUNT_USAGE_SUBPROCESS_CODE = r"""
import base64
import json
import sys
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from urllib import request as urllib_request
from agent.account_usage import fetch_account_usage
_CODEX_DEFAULT_BASE_URL = "https://chatgpt.com/backend-api/codex"
_CODEX_POOL_USAGE_TIMEOUT_SECONDS = 4.0
_CODEX_POOL_MAX_WORKERS = 6
def _iso(value):
if value in (None, ""):
return None
if hasattr(value, "isoformat"):
text = value.isoformat()
return text.replace("+00:00", "Z")
text = str(value).strip()
return text or None
def _snapshot_payload(snapshot):
if snapshot is None:
return None
windows = []
for window in getattr(snapshot, "windows", ()) or ():
windows.append({
"label": str(getattr(window, "label", "") or ""),
"used_percent": getattr(window, "used_percent", None),
"reset_at": _iso(getattr(window, "reset_at", None)),
"detail": getattr(window, "detail", None),
})
payload = {
"provider": str(getattr(snapshot, "provider", "") or ""),
"source": str(getattr(snapshot, "source", "") or ""),
"title": str(getattr(snapshot, "title", "") or ""),
"plan": getattr(snapshot, "plan", None),
"windows": windows,
"details": list(getattr(snapshot, "details", ()) or ()),
"available": bool(getattr(snapshot, "available", bool(windows))),
"unavailable_reason": getattr(snapshot, "unavailable_reason", None),
"fetched_at": _iso(getattr(snapshot, "fetched_at", None)),
}
pool = getattr(snapshot, "pool", None)
if isinstance(pool, dict):
payload["pool"] = pool
return payload
def _snapshot_available(snapshot):
if snapshot is None:
return False
try:
return bool(getattr(snapshot, "available", False))
except Exception:
return False
def _number(value):
if isinstance(value, bool) or value is None:
return None
if isinstance(value, (int, float)):
return value
try:
text = str(value).strip()
if not text:
return None
number = float(text)
return int(number) if number.is_integer() else number
except Exception:
return None
def _parse_dt(value):
if value in (None, ""):
return None
if isinstance(value, (int, float)):
try:
return datetime.fromtimestamp(float(value), tz=timezone.utc)
except Exception:
return None
text = str(value).strip()
if not text:
return None
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(text)
except ValueError:
return None
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
def _title_case_slug(value):
cleaned = str(value or "").strip()
if not cleaned:
return None
return cleaned.replace("_", " ").replace("-", " ").title()
def _resolve_codex_usage_url(base_url):
normalized = str(base_url or "").strip().rstrip("/") or _CODEX_DEFAULT_BASE_URL
if normalized.endswith("/codex"):
normalized = normalized[: -len("/codex")]
if "/backend-api" in normalized:
return normalized + "/wham/usage"
return normalized + "/api/codex/usage"
def _jwt_claims(token):
if not isinstance(token, str) or token.count(".") != 2:
return {}
payload = token.split(".")[1]
payload += "=" * ((4 - len(payload) % 4) % 4)
try:
claims = json.loads(base64.urlsafe_b64decode(payload.encode("utf-8")).decode("utf-8"))
except Exception:
return {}
return claims if isinstance(claims, dict) else {}
def _codex_usage_headers(access_token):
    # Build the request headers for the Codex usage endpoint. When the token's
    # unverified JWT claims carry a non-blank chatgpt_account_id under the
    # "https://api.openai.com/auth" claim, it is forwarded as
    # ChatGPT-Account-ID.
    headers = {
        "Authorization": "Bearer " + access_token,
        "Accept": "application/json",
        "User-Agent": "codex_cli_rs/0.0.0 (Hermes WebUI)",
        "originator": "codex_cli_rs",
    }
    auth_claim = _jwt_claims(access_token).get("https://api.openai.com/auth")
    account_id = auth_claim.get("chatgpt_account_id") if isinstance(auth_claim, dict) else None
    if isinstance(account_id, str):
        trimmed = account_id.strip()
        if trimmed:
            headers["ChatGPT-Account-ID"] = trimmed
    return headers
def _entry_value(entry, *names):
for name in names:
try:
value = getattr(entry, name)
except Exception:
value = None
if value in (None, ""):
continue
text = str(value).strip()
if text:
return text
return None
def _codex_snapshot_from_usage_payload(payload):
    # Convert the usage API's JSON payload into a snapshot namespace.
    # rate_limit.primary_window / secondary_window become the "Session" and
    # "Weekly" windows (skipped when used_percent is not numeric), and the
    # credits section contributes at most one human-readable detail line.
    data = payload if isinstance(payload, dict) else {}
    limits = data.get("rate_limit")
    if not isinstance(limits, dict):
        limits = {}

    windows = []
    window_specs = (("primary_window", "Session"), ("secondary_window", "Weekly"))
    for key, label in window_specs:
        raw_window = limits.get(key)
        if not isinstance(raw_window, dict):
            continue
        used = _number(raw_window.get("used_percent"))
        if used is None:
            continue
        windows.append(
            SimpleNamespace(
                label=label,
                used_percent=float(used),
                reset_at=_parse_dt(raw_window.get("reset_at")),
                detail=None,
            )
        )

    details = []
    credits = data.get("credits")
    if isinstance(credits, dict) and credits.get("has_credits"):
        balance = _number(credits.get("balance"))
        if balance is not None:
            details.append("Credits balance: $" + format(float(balance), ".2f"))
        elif credits.get("unlimited"):
            details.append("Credits balance: unlimited")

    return SimpleNamespace(
        provider="openai-codex",
        source="usage_api",
        title="Account limits",
        plan=_title_case_slug(data.get("plan_type")),
        windows=tuple(windows),
        details=tuple(details),
        # Available as soon as there is anything to show.
        available=bool(windows or details),
        unavailable_reason=None,
        fetched_at=datetime.now(timezone.utc),
    )
def _snapshot_windows_payload(snapshot):
windows = []
for window in getattr(snapshot, "windows", ()) or ():
label = str(getattr(window, "label", "") or "").strip()
if not label:
continue
used_percent = _number(getattr(window, "used_percent", None))
remaining_percent = None
if used_percent is not None:
remaining_percent = max(0.0, min(100.0, 100.0 - float(used_percent)))
windows.append({
"label": label,
"used_percent": used_percent,
"remaining_percent": remaining_percent,
"reset_at": _iso(getattr(window, "reset_at", None)),
"detail": getattr(window, "detail", None),
})
return windows
def _snapshot_details_payload(snapshot):
return [
str(detail).strip()
for detail in (getattr(snapshot, "details", ()) or ())
if str(detail).strip()
]
def _safe_entry_label(entry, index):
    # Produce a short display label for a pool credential: its "label" or
    # "source" attribute, else "Credential <index>". Whitespace runs are
    # collapsed and labels over 64 characters are cut to 61 plus "...".
    raw = _entry_value(entry, "label", "source") or "Credential " + str(index)
    label = " ".join(str(raw).split())
    if len(label) <= 64:
        return label
    return label[:61].rstrip() + "..."
def _safe_unavailable_reason(reason):
text = " ".join(str(reason or "").split())
if not text:
return None
lowered = text.lower()
sensitive_terms = ("access_token", "refresh_token", "authorization", "bearer ", "jwt", "secret")
if any(term in lowered for term in sensitive_terms):
return "Usage unavailable for this credential."
return text[:180]
def _entry_exhausted_ttl_seconds(error_code):
code = str(error_code or "").strip()
if code == "401":
return 5 * 60
return 60 * 60
def _entry_pool_exhausted_until(entry):
    # Return the datetime until which the pool considers this entry exhausted,
    # or None when it is not marked exhausted or no deadline can be derived.
    # An explicit last_error_reset_at wins; otherwise the deadline is
    # last_status_at plus a TTL chosen from last_error_code.
    status = str(_entry_value(entry, "last_status") or "").strip().lower()
    if status != "exhausted":
        return None
    explicit_reset = _parse_dt(getattr(entry, "last_error_reset_at", None))
    if explicit_reset is not None:
        return explicit_reset
    marked_at = _parse_dt(getattr(entry, "last_status_at", None))
    if marked_at is None:
        return None
    ttl_seconds = _entry_exhausted_ttl_seconds(_entry_value(entry, "last_error_code"))
    return marked_at + timedelta(seconds=ttl_seconds)
def _entry_is_pool_exhausted(entry):
    # True while the entry's exhaustion deadline exists and lies in the future.
    deadline = _entry_pool_exhausted_until(entry)
    if deadline is None:
        return False
    return datetime.now(timezone.utc) < deadline
def _entry_pool_exhausted_reason(entry):
    # Compose the human-readable explanation shown for an exhausted
    # credential, adding the provider status code and the retry time when
    # they are known.
    code = _entry_value(entry, "last_error_code")
    retry_after = _entry_pool_retry_after(entry)
    parts = ["Credential pool marked this credential exhausted"]
    if code:
        parts.append(" after provider status " + code)
    if retry_after:
        parts.append("; retry after " + retry_after)
    parts.append(".")
    return "".join(parts)
def _entry_pool_retry_after(entry):
    # Text form of the entry's exhaustion deadline (None when there is none).
    deadline = _entry_pool_exhausted_until(entry)
    return _iso(deadline)
def _fetch_codex_entry_snapshot(entry):
    # Query the Codex usage endpoint with one pool credential.
    # Returns (snapshot, did_query, reason): without a token nothing is sent
    # and the result is (None, False, <reason>). Network, HTTP and JSON errors
    # are not caught here and propagate to the caller.
    token = _entry_value(entry, "runtime_api_key", "access_token")
    if not token:
        return None, False, "No runtime token available."
    base_url = _entry_value(entry, "runtime_base_url", "base_url") or _CODEX_DEFAULT_BASE_URL
    usage_request = urllib_request.Request(
        _resolve_codex_usage_url(base_url),
        headers=_codex_usage_headers(token),
    )
    with urllib_request.urlopen(usage_request, timeout=_CODEX_POOL_USAGE_TIMEOUT_SECONDS) as response:
        body = response.read().decode("utf-8")
    # An empty body is treated as an empty JSON object.
    payload = json.loads(body or "{}")
    return _codex_snapshot_from_usage_payload(payload), True, None
def _best_remaining_by_window(rows):
best = {}
for row in rows:
if row.get("status") != "available":
continue
label = row.get("label") or "Credential"
for window in row.get("windows") or []:
if not isinstance(window, dict):
continue
window_label = str(window.get("label") or "").strip()
remaining = _number(window.get("remaining_percent"))
if not window_label or remaining is None:
continue
candidate = {
"label": window_label,
"remaining_percent": remaining,
"used_percent": window.get("used_percent"),
"reset_at": window.get("reset_at"),
"detail": window.get("detail"),
"credential_label": label,
}
current = best.get(window_label.lower())
# The normalized Codex account-limit payload currently exposes
# percentages, not absolute request/token capacity. If absolute
# remaining capacity becomes available, prefer it here.
if current is None or float(remaining) > float(current.get("remaining_percent") or -1):
best[window_label.lower()] = candidate
return list(best.values())
def _next_reset_at(rows):
best_dt = None
best_text = None
for row in rows:
for window in row.get("windows") or []:
if not isinstance(window, dict):
continue
reset_text = window.get("reset_at")
dt = _parse_dt(reset_text)
if dt is None:
continue
if best_dt is None or dt < best_dt:
best_dt = dt
best_text = _iso(dt)
return best_text
def _codex_pool_snapshot(entries, rows, queried):
    # Aggregate the per-credential rows into one pool-level snapshot.
    # The "pool" dict carries counts, the distinct plans, the next reset time,
    # the best remaining window per label and the rows themselves; the
    # top-level windows mirror the best-per-label selection.
    available_rows = []
    exhausted_rows = []
    failed_rows = []  # any status other than "available" / "exhausted"
    buckets = {"available": available_rows, "exhausted": exhausted_rows}
    plans = []
    for row in rows:
        buckets.get(row.get("status"), failed_rows).append(row)
        row_plan = row.get("plan")
        if row_plan and row_plan not in plans:
            plans.append(row_plan)

    total = len(entries)
    available_count = len(available_rows)
    best_windows = _best_remaining_by_window(rows)
    pool = {
        "total_credentials": total,
        "queried_credentials": queried,
        "available_credentials": available_count,
        "exhausted_credentials": len(exhausted_rows),
        "failed_credentials": len(failed_rows),
        "plans": plans,
        "next_reset_at": _next_reset_at(rows),
        "best_remaining_by_window": best_windows,
        "credentials": rows,
    }

    details = [str(available_count) + "/" + str(total) + " credentials available"]
    if exhausted_rows:
        details.append(str(len(exhausted_rows)) + " exhausted")
    if failed_rows:
        details.append(str(len(failed_rows)) + " failed to load")
    if plans:
        details.append("Plans: " + ", ".join(plans))

    window_note = "Best of " + str(available_count) + " available credentials"
    windows = tuple(
        SimpleNamespace(
            label=best.get("label"),
            used_percent=best.get("used_percent"),
            reset_at=best.get("reset_at"),
            detail=window_note,
        )
        for best in best_windows
    )
    if available_rows:
        unavailable_reason = None
    else:
        unavailable_reason = "No Codex pool credentials returned available account limits."
    return SimpleNamespace(
        provider="openai-codex",
        source="usage_api_pool",
        title="Account limits",
        # A single plan is reported only when every row agrees on it.
        plan=plans[0] if len(plans) == 1 else None,
        windows=windows,
        details=tuple(details),
        available=bool(available_rows),
        unavailable_reason=unavailable_reason,
        fetched_at=datetime.now(timezone.utc),
        pool=pool,
    )
def _codex_pool_exhausted_row(entry, index):
    # Row for a credential the pool already marks exhausted; it is reported
    # from the pool's own bookkeeping, so there are no windows or fetch time.
    return {
        "label": _safe_entry_label(entry, index),
        "status": "exhausted",
        "plan": None,
        "windows": [],
        "details": [],
        "unavailable_reason": _entry_pool_exhausted_reason(entry),
        "retry_after": _entry_pool_retry_after(entry),
        "fetched_at": None,
    }
def _probe_codex_pool_entry(item):
    # Probe one (index, entry) pair and return (index, row, queried) where
    # queried is 1 only when the fetch helper reported that it sent a request.
    # A fetch that raises yields an "unavailable" row carrying the sanitized
    # error text.
    index, entry = item
    label = _safe_entry_label(entry, index)
    queried = 0
    try:
        snapshot, did_query, reason = _fetch_codex_entry_snapshot(entry)
    except Exception as exc:
        snapshot, reason = None, str(exc)
    else:
        if did_query:
            queried = 1

    has_snapshot = snapshot is not None
    is_available = _snapshot_available(snapshot)
    if is_available:
        unavailable_reason = None
    else:
        unavailable_reason = _safe_unavailable_reason(
            reason or getattr(snapshot, "unavailable_reason", None)
        )
    row = {
        "label": label,
        "status": "available" if is_available else "unavailable",
        "plan": getattr(snapshot, "plan", None) if has_snapshot else None,
        "windows": _snapshot_windows_payload(snapshot) if has_snapshot else [],
        "details": _snapshot_details_payload(snapshot) if has_snapshot else [],
        "unavailable_reason": unavailable_reason,
        "fetched_at": _iso(getattr(snapshot, "fetched_at", None)) if has_snapshot else None,
    }
    return index, row, queried
def _fetch_codex_account_usage_from_pool():
try:
from agent.credential_pool import load_pool
pool = load_pool("openai-codex")
entries = list(pool.entries()) if pool is not None and hasattr(pool, "entries") else []
if not entries:
return None
rows_by_index = {}
probe_items = []
queried = 0
for index, entry in enumerate(entries, start=1):
if _entry_is_pool_exhausted(entry):
rows_by_index[index] = _codex_pool_exhausted_row(entry, index)
else:
probe_items.append((index, entry))
if probe_items:
max_workers = min(_CODEX_POOL_MAX_WORKERS, len(probe_items))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for index, row, did_query_count in executor.map(_probe_codex_pool_entry, probe_items):
rows_by_index[index] = row
queried += did_query_count
rows = [rows_by_index[index] for index in range(1, len(entries) + 1)]
return _codex_pool_snapshot(entries, rows, queried)
except Exception:
return None
# Script entry: argv[1] is the provider id and argv[2] the API key, where an
# empty string means "no key".
# NOTE(review): argv[2] is read unconditionally -- confirm the parent always
# passes it, even when empty.
provider = sys.argv[1]
api_key = sys.argv[2] or None
# A failing probe is reported as a null snapshot instead of a traceback.
try:
    snapshot = fetch_account_usage(provider, api_key=api_key)
except Exception:
    snapshot = None
# For openai-codex, a pool snapshot that carries a "pool" dict replaces the
# single-account snapshot.
if str(provider or "").strip().lower() == "openai-codex":
    pool_snapshot = _fetch_codex_account_usage_from_pool()
    if isinstance(getattr(pool_snapshot, "pool", None), dict):
        snapshot = pool_snapshot
# Emit the result as one JSON document (null when there is no snapshot).
print(json.dumps(_snapshot_payload(snapshot)))
"""
# SECTION: Provider ↔ env var mapping
# Maps canonical provider slug → env var name for API key.
# Providers not listed here (OAuth/token-flow providers like copilot, nous,
# openai-codex) cannot have their keys managed from the WebUI.
_PROVIDER_ENV_VAR: dict[str, str] = {
"openrouter": "OPENROUTER_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"openai": "OPENAI_API_KEY",
"google": "GOOGLE_API_KEY",
"gemini": "GEMINI_API_KEY",
"zai": "GLM_API_KEY",
"kimi-coding": "KIMI_API_KEY",
"deepseek": "DEEPSEEK_API_KEY",
"minimax": "MINIMAX_API_KEY",
"minimax-cn": "MINIMAX_CN_API_KEY",
"mistralai": "MISTRAL_API_KEY",
"x-ai": "XAI_API_KEY",
"xiaomi": "XIAOMI_API_KEY",
"opencode-zen": "OPENCODE_ZEN_API_KEY",
"opencode-go": "OPENCODE_GO_API_KEY",
# NOTE: bare "ollama" (local) deliberately omitted — local Ollama is keyless
# by default and the runtime in hermes_cli/runtime_provider.py only consumes
# OLLAMA_API_KEY when the base URL hostname is ollama.com (Ollama Cloud).
# If we mapped both providers to the same env var, configuring Ollama Cloud
# would falsely flip the local Ollama card to "API key configured" (#1410).
# Users who genuinely run an authenticated local Ollama can still set a key
# via providers.ollama.api_key in config.yaml — that path remains supported
# by _provider_has_key().
"ollama-cloud": "OLLAMA_API_KEY",
# Bare "lmstudio" maps to LM_API_KEY — the canonical env var the agent CLI
# runtime reads (hermes_cli/auth.py:182, api_key_env_vars=("LM_API_KEY",)).
# Pre-#1499/#1500 the WebUI used LMSTUDIO_API_KEY here, which made Settings
# report keys correctly but the agent runtime ignored them — masked in
# practice by the LMSTUDIO_NOAUTH_PLACEHOLDER for keyless local installs.
# Aligning to LM_API_KEY makes a configured LM Studio key actually work
# for chat. The legacy LMSTUDIO_API_KEY name is read by `_provider_has_key`
# via _PROVIDER_ENV_VAR_ALIASES below so existing users don't see Settings
# flip to "no key" after upgrading.
"lmstudio": "LM_API_KEY",
"nvidia": "NVIDIA_API_KEY",
}
# Read-only legacy env-var aliases. When `_provider_has_key(pid)` looks up its
# canonical env var name and finds nothing, it also checks any aliases listed
# here. Onboarding (api/onboarding.py:apply_onboarding_setup) only writes the
# canonical name. Use this for env vars that were renamed in a past release;
# add an entry, ship for a few releases, then remove the alias once enough
# users have upgraded.
_PROVIDER_ENV_VAR_ALIASES: dict[str, tuple[str, ...]] = {
# #1500 — agent runtime reads LM_API_KEY (canonical), but WebUI builds
# ≤ v0.50.272 wrote LMSTUDIO_API_KEY into .env. Keep reading both.
"lmstudio": ("LMSTUDIO_API_KEY",),
}
# Providers that use OAuth or token flows — their credentials are managed
# through the Hermes CLI, not via API keys. The WebUI cannot set these.
_OAUTH_PROVIDERS = frozenset({
"copilot",
"copilot-acp",
"nous",
"openai-codex",
"qwen-oauth",
})
# SECTION: Helper functions
def _get_hermes_home() -> Path:
"""Return the active Hermes home directory."""
try:
from api.profiles import get_active_hermes_home
return get_active_hermes_home()
except ImportError:
return Path.home() / ".hermes"
def _load_env_file(env_path: Path) -> dict[str, str]:
"""Read key=value pairs from a .env file."""
values: dict[str, str] = {}
if not env_path.exists():
return values
try:
for raw in env_path.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
values[key.strip()] = value.strip().strip('"').strip("'")
except Exception:
return {}
return values
def _decode_jwt_claims_unverified(token: str) -> dict[str, Any]:
"""Decode JWT claims for token-shape classification only.
The signature is intentionally not verified because this helper is not an
authorization decision: it only prevents a Codex OAuth JWT-shaped value from
being treated as a raw OpenAI API key in provider-card detection.
"""
if not isinstance(token, str) or token.count(".") != 2:
return {}
payload = token.split(".", 2)[1]
payload += "=" * ((4 - len(payload) % 4) % 4)
try:
claims = json.loads(base64.urlsafe_b64decode(payload.encode("utf-8")).decode("utf-8"))
except Exception:
return {}
return claims if isinstance(claims, dict) else {}
def _looks_like_codex_oauth_token(value: str) -> bool:
"""Return True when a value is a ChatGPT/Codex OAuth JWT, not an OpenAI API key."""
token = str(value or "").strip()
if not token or token.startswith("sk-"):
return False
claims = _decode_jwt_claims_unverified(token)
if not claims:
return False
auth_claim = claims.get("https://api.openai.com/auth")
if isinstance(auth_claim, dict) and auth_claim:
return True
return any(key in claims for key in ("chatgpt_account_id", "https://api.openai.com/profile"))
def _provider_value_counts_as_api_key(provider_id: str, value: object) -> bool:
text = str(value or "").strip()
if not text:
return False
if (provider_id or "").strip().lower() == "openai" and _looks_like_codex_oauth_token(text):
return False
return True
def _provider_has_shadowed_codex_oauth_value(provider_id: str) -> bool:
"""True when the bare OpenAI credential slot contains only a Codex OAuth JWT.
Users who authenticate Codex can end up with a ChatGPT/Codex JWT in a
legacy OPENAI_API_KEY-shaped location. That value should not make the bare
OpenAI API provider appear as configured, and the Providers tab should not
show an extra OpenAI card solely because of that Codex-only credential.
"""
if (provider_id or "").strip().lower() != "openai":
return False
values: list[object] = []
env_var = _PROVIDER_ENV_VAR.get(provider_id)
if env_var:
env_path = _get_hermes_home() / ".env"
env_values = _load_env_file(env_path)
values.append(env_values.get(env_var))
values.append(os.getenv(env_var))
for alias in _PROVIDER_ENV_VAR_ALIASES.get(provider_id, ()) or ():
values.append(env_values.get(alias))
values.append(os.getenv(alias))
cfg = get_config()
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, dict):
active_provider = str(model_cfg.get("provider") or "").strip().lower()
if active_provider == provider_id:
values.append(model_cfg.get("api_key"))
providers_cfg = cfg.get("providers", {})
if isinstance(providers_cfg, dict):
provider_cfg = providers_cfg.get(provider_id, {})
if isinstance(provider_cfg, dict):
values.append(provider_cfg.get("api_key"))
custom_providers = cfg.get("custom_providers", [])
if isinstance(custom_providers, list):
for cp in custom_providers:
if isinstance(cp, dict) and _custom_provider_name_matches(provider_id, cp.get("name")):
cp_key = cp.get("api_key")
if isinstance(cp_key, str) and cp_key.startswith("${") and cp_key.endswith("}"):
values.append(os.getenv(cp_key[2:-1]))
else:
values.append(cp_key)
return any(_looks_like_codex_oauth_token(str(value or "")) for value in values)
def _write_env_file(env_path: Path, updates: dict[str, str | None]) -> None:
    """Apply *updates* to the ``.env`` file and mirror them into ``os.environ``.

    Values of ``None`` cause the key to be removed (every line assigning it).
    Blank values are ignored. Preserves comments, blank lines, and original
    key order (#1164). New keys are appended at the end of the file with a
    blank-line separator.

    Holds ``_ENV_LOCK`` from ``api.streaming`` for the entire load → modify →
    write cycle to prevent TOCTOU races between concurrent POST /api/providers
    calls (each reading the same file baseline and overwriting the other's key).
    Also serialises os.environ mutations with streaming sessions.

    Args:
        env_path: Location of the ``.env`` file; parent directories are
            created when missing.
        updates: Mapping of variable name to new value, or ``None`` to delete.

    Raises:
        ValueError: If a value contains an embedded newline or carriage
            return. All values are validated before anything is changed, so a
            rejected call leaves both the file and ``os.environ`` untouched.
        OSError: If the replacement file cannot be written; ``os.environ`` is
            only updated after the file has been replaced successfully.
    """
    from api.streaming import _ENV_LOCK
    import stat as _stat
    import tempfile as _tempfile

    # ── Validate and normalise first, so a bad value cannot leave a
    # half-applied update behind ──
    removals: list[str] = []
    assignments: dict[str, str] = {}
    for key, value in updates.items():
        if value is None:
            removals.append(key)
            continue
        clean = str(value).strip()
        if not clean:
            continue
        # Reject embedded newlines/carriage returns to prevent .env injection
        if "\n" in clean or "\r" in clean:
            raise ValueError("API key must not contain newline characters.")
        assignments[key] = clean

    with _ENV_LOCK:
        # ── Read existing lines (preserving comments and blank lines) ──
        existing_lines: list[str] = []
        if env_path.exists():
            try:
                existing_lines = env_path.read_text(encoding="utf-8").splitlines()
            except Exception:
                # NOTE(review): an unreadable file is treated as empty, so the
                # rewrite below drops its previous contents -- confirm intended.
                existing_lines = []

        # Map each existing key to every line index that assigns it, so an
        # update can edit in place and a removal can drop duplicates too.
        key_indices: dict[str, list[int]] = {}
        for index, raw in enumerate(existing_lines):
            stripped = raw.strip()
            if stripped and not stripped.startswith("#") and "=" in stripped:
                key_indices.setdefault(stripped.split("=", 1)[0].strip(), []).append(index)

        output_lines: list[str | None] = list(existing_lines)
        for key in removals:
            # None marks the line for removal.
            for index in key_indices.get(key, ()):
                output_lines[index] = None
        new_keys: list[str] = []
        for key, clean in assignments.items():
            indices = key_indices.get(key)
            if indices:
                # Rewrite the last occurrence: it is the one readers honour.
                output_lines[indices[-1]] = f"{key}={clean}"
            else:
                new_keys.append(f"{key}={clean}")

        kept_lines = [line for line in output_lines if line is not None]
        # Append new keys after a blank-line separator
        if new_keys:
            if kept_lines and kept_lines[-1].strip() != "":
                kept_lines.append("")
            kept_lines.extend(new_keys)

        env_path.parent.mkdir(parents=True, exist_ok=True)
        content = "\n".join(kept_lines)
        if content:
            content += "\n"

        # Atomic write via tempfile + os.replace so cross-process readers
        # (Telegram bot, CLI) never see a half-truncated file. The shared
        # ``~/.hermes/.env`` is also written by ``hermes_cli.config.save_env_value``
        # using the same atomic pattern; matching it here closes the
        # cross-process leg of #1164 (within-process is covered by _ENV_LOCK).
        mode = _stat.S_IRUSR | _stat.S_IWUSR  # 0o600
        tmp_fd, tmp_path = _tempfile.mkstemp(
            dir=str(env_path.parent), prefix=".env_", suffix=".tmp"
        )
        try:
            with os.fdopen(tmp_fd, "w", encoding="utf-8") as handle:
                handle.write(content)
                handle.flush()
                os.fsync(handle.fileno())
            os.chmod(tmp_path, mode)  # tighten before rename so readers see 0600
            os.replace(tmp_path, env_path)
        except BaseException:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

        # The file is in place; only now mirror the change into this process.
        for key in removals:
            os.environ.pop(key, None)
        for key, clean in assignments.items():
            os.environ[key] = clean

        try:
            env_path.chmod(mode)
        except OSError:
            pass
def _provider_has_key(provider_id: str) -> bool:
    """Report whether *provider_id* has an API key configured anywhere.

    Sources are consulted in this order and the first hit wins:

    1. ``~/.hermes/.env`` for the known env var
    2. ``os.environ`` for the known env var
    3. the same two places for each legacy alias of that env var
    4. ``config.yaml → model.api_key`` (only if provider is the active one)
    5. ``config.yaml → providers.<id>.api_key``
    6. ``config.yaml → custom_providers[].api_key`` (for custom providers)
    """

    def counts(value: object) -> bool:
        return _provider_value_counts_as_api_key(provider_id, value)

    env_var = _PROVIDER_ENV_VAR.get(provider_id)
    if env_var:
        env_values = _load_env_file(_get_hermes_home() / ".env")
        # Canonical name first, then legacy aliases (e.g. lmstudio's pre-#1500
        # LMSTUDIO_API_KEY name) so existing users don't lose detection after
        # an env-var rename. See _PROVIDER_ENV_VAR_ALIASES.
        aliases = _PROVIDER_ENV_VAR_ALIASES.get(provider_id, ()) or ()
        for name in (env_var, *aliases):
            if counts(env_values.get(name)) or counts(os.getenv(name)):
                return True

    cfg = get_config()

    # model.api_key only counts when this provider is the active one;
    # matching it globally made every provider show as "configured".
    model_cfg = cfg.get("model", {})
    if isinstance(model_cfg, dict):
        model_key = model_cfg.get("api_key")
        if str(model_key or "").strip():
            active = model_cfg.get("provider")
            if (
                active
                and str(active).strip().lower() == provider_id.lower()
                and counts(model_key)
            ):
                return True

    # providers.<id>.api_key
    providers_cfg = cfg.get("providers", {})
    if isinstance(providers_cfg, dict):
        provider_cfg = providers_cfg.get(provider_id, {})
        if isinstance(provider_cfg, dict):
            provider_key = provider_cfg.get("api_key")
            if str(provider_key or "").strip() and counts(provider_key):
                return True

    # custom_providers[].api_key for entries whose name matches this id
    custom_providers = cfg.get("custom_providers", [])
    if not isinstance(custom_providers, list):
        return False
    return any(
        isinstance(cp, dict)
        and _custom_provider_name_matches(provider_id, cp.get("name"))
        and counts(cp.get("api_key"))
        for cp in custom_providers
    )
def _get_provider_api_key(provider_id: str) -> str | None:
    """Resolve the API key configured for *provider_id*, or ``None``.

    Sources are consulted in this order and the first accepted value wins:

    1. the provider's canonical env var, then each legacy alias -- for every
       name the profile ``.env`` file is checked before the process
       environment;
    2. ``config.yaml`` ``model.api_key`` (only when ``model.provider`` is this
       provider);
    3. ``config.yaml`` ``providers.<id>.api_key``;
    4. ``config.yaml`` ``custom_providers[].api_key`` for a matching name,
       where a ``${VAR}`` value is resolved from the process environment.

    The returned value is the raw secret; keep it server-side.
    """
    provider_id = (provider_id or "").strip().lower()

    env_var = _PROVIDER_ENV_VAR.get(provider_id)
    if env_var:
        env_values = _load_env_file(_get_hermes_home() / ".env")
        aliases = _PROVIDER_ENV_VAR_ALIASES.get(provider_id, ()) or ()
        # Canonical name first, then legacy aliases; file before process env.
        for name in (env_var, *aliases):
            for candidate in (env_values.get(name), os.getenv(name)):
                if _provider_value_counts_as_api_key(provider_id, candidate):
                    return str(candidate).strip() or None

    cfg = get_config()

    # Top-level model.api_key belongs to the active provider only.
    model_cfg = cfg.get("model", {})
    if isinstance(model_cfg, dict):
        model_key = str(model_cfg.get("api_key") or "").strip()
        active_provider = str(model_cfg.get("provider") or "").strip().lower()
        if (
            model_key
            and active_provider == provider_id
            and _provider_value_counts_as_api_key(provider_id, model_key)
        ):
            return model_key

    providers_cfg = cfg.get("providers", {})
    if isinstance(providers_cfg, dict):
        provider_cfg = providers_cfg.get(provider_id, {})
        if isinstance(provider_cfg, dict):
            provider_key = str(provider_cfg.get("api_key") or "").strip()
            if _provider_value_counts_as_api_key(provider_id, provider_key):
                return provider_key

    custom_providers = cfg.get("custom_providers", [])
    if isinstance(custom_providers, list):
        for entry in custom_providers:
            if not isinstance(entry, dict):
                continue
            if not _custom_provider_name_matches(provider_id, entry.get("name")):
                continue
            entry_key = str(entry.get("api_key") or "").strip()
            if entry_key.startswith("${") and entry_key.endswith("}"):
                # "${VAR}" is an indirection to an environment variable.
                return os.getenv(entry_key[2:-1], "").strip() or None
            if _provider_value_counts_as_api_key(provider_id, entry_key):
                return entry_key

    return None
def _active_provider_id() -> str | None:
    """Return the lower-cased ``model.provider`` from config, or ``None``.

    ``None`` is returned when the ``model`` section is not a mapping or the
    provider value is missing/blank.
    """
    model_cfg = get_config().get("model", {})
    if isinstance(model_cfg, dict):
        return str(model_cfg.get("provider") or "").strip().lower() or None
    return None
def _quota_number(value: Any) -> int | float | None:
if isinstance(value, bool) or value is None:
return None
if isinstance(value, (int, float)):
return value
try:
text = str(value).strip()
if not text:
return None
number = float(text)
return int(number) if number.is_integer() else number
except (TypeError, ValueError):
return None
def _sanitize_openrouter_quota(payload: Any) -> dict[str, int | float | None]:
    """Reduce an OpenRouter key response to its three numeric quota fields.

    A ``{"data": {...}}`` envelope is unwrapped when present. Non-mapping
    payloads produce a dict whose values are all ``None``. Only
    ``limit_remaining``, ``usage`` and ``limit`` are kept, each passed
    through ``_quota_number``.
    """
    data = payload
    if isinstance(data, dict) and isinstance(data.get("data"), dict):
        data = data["data"]
    if not isinstance(data, dict):
        data = {}
    return {
        field: _quota_number(data.get(field))
        for field in ("limit_remaining", "usage", "limit")
    }
def _isoformat_utc(value: Any) -> str | None:
if value in (None, ""):
return None
if isinstance(value, datetime):
dt = value if value.tzinfo else value.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
text = str(value).strip()
return text or None
def _serialize_account_usage_snapshot(snapshot: Any) -> dict[str, Any] | None:
if snapshot is None:
return None
windows: list[dict[str, Any]] = []
for window in getattr(snapshot, "windows", ()) or ():
label = str(getattr(window, "label", "") or "").strip()
if not label:
continue
used_percent = _quota_number(getattr(window, "used_percent", None))
remaining_percent = None
if used_percent is not None:
remaining_percent = max(0.0, min(100.0, 100.0 - float(used_percent)))
windows.append({
"label": label,
"used_percent": used_percent,
"remaining_percent": remaining_percent,
"reset_at": _isoformat_utc(getattr(window, "reset_at", None)),
"detail": str(getattr(window, "detail", "") or "").strip() or None,
})
details = [
str(detail).strip()
for detail in (getattr(snapshot, "details", ()) or ())
if str(detail).strip()
]
plan = str(getattr(snapshot, "plan", "") or "").strip() or None
unavailable_reason = str(getattr(snapshot, "unavailable_reason", "") or "").strip() or None
result = {
"provider": str(getattr(snapshot, "provider", "") or "").strip() or None,
"source": str(getattr(snapshot, "source", "") or "").strip() or None,
"title": str(getattr(snapshot, "title", "") or "").strip() or "Account limits",
"plan": plan,
"windows": windows,
"details": details,
"available": bool(getattr(snapshot, "available", bool(windows or details))) and not unavailable_reason,
"unavailable_reason": unavailable_reason,
"fetched_at": _isoformat_utc(getattr(snapshot, "fetched_at", None)),
}
pool = getattr(snapshot, "pool", None)
if isinstance(pool, dict):
result["pool"] = pool
return result
def _agent_fetch_account_usage(provider: str, *, base_url: str | None = None, api_key: str | None = None) -> Any:
    """Delegate to ``agent.account_usage.fetch_account_usage``.

    The import is deferred to call time, so the agent package is only needed
    when this function is actually invoked.
    """
    from agent.account_usage import fetch_account_usage as _fetch

    options = {"base_url": base_url, "api_key": api_key}
    return _fetch(provider, **options)
def _account_usage_subprocess_env(home: Path, provider: str, api_key: str | None) -> dict[str, str]:
    """Build the environment for a profile-isolated quota probe child process.

    Starts from a copy of the current process environment, points
    ``HERMES_HOME`` at *home*, overlays non-empty values from that profile's
    ``.env`` file, injects *api_key* under the provider's env var when both are
    known, and prepends the agent directory (if importable from
    ``api.config``) to ``PYTHONPATH``.
    """
    home_path = Path(home)
    env = {**os.environ, "HERMES_HOME": str(home_path)}

    # Profile .env values must reach only the child probe, never the WebUI's
    # own process-global environment; the agent resolver reads OAuth/API
    # tokens from environment variables (notably for Anthropic account usage).
    env.update({
        name: value
        for name, value in _load_env_file(home_path / ".env").items()
        if value
    })

    env_var = _PROVIDER_ENV_VAR.get((provider or "").strip().lower())
    if env_var and api_key:
        env[env_var] = api_key

    try:
        from api.config import _AGENT_DIR
    except Exception:
        _AGENT_DIR = None

    search_path: list[str] = [str(_AGENT_DIR)] if _AGENT_DIR else []
    inherited = env.get("PYTHONPATH", "")
    if inherited:
        search_path.append(inherited)
    if search_path:
        env["PYTHONPATH"] = os.pathsep.join(search_path)
    return env
def _account_usage_payload_to_snapshot(payload: Any) -> Any:
if not isinstance(payload, dict):
return None
windows = tuple(
SimpleNamespace(
label=window.get("label"),
used_percent=window.get("used_percent"),
reset_at=window.get("reset_at"),
detail=window.get("detail"),
)
for window in (payload.get("windows") or ())
if isinstance(window, dict)
)
return SimpleNamespace(
provider=payload.get("provider"),
source=payload.get("source"),
title=payload.get("title"),
plan=payload.get("plan"),
windows=windows,
details=tuple(payload.get("details") or ()),
available=bool(payload.get("available")),
unavailable_reason=payload.get("unavailable_reason"),
fetched_at=payload.get("fetched_at"),
pool=payload.get("pool") if isinstance(payload.get("pool"), dict) else None,
)
def _account_usage_cache_key(provider: str, home: Path, api_key: str | None) -> tuple[str, str, str]:
key_fingerprint = ""
if api_key:
key_fingerprint = hashlib.sha256(api_key.encode("utf-8", "ignore")).hexdigest()
return ((provider or "").strip().lower(), str(Path(home)), key_fingerprint)
def _get_cached_account_usage(cache_key: tuple[str, str, str]) -> tuple[bool, Any]:
    """Look up *cache_key* and return ``(hit, snapshot)``.

    An entry older than ``_ACCOUNT_USAGE_CACHE_TTL_SECONDS`` is evicted and
    reported as a miss. Ages are measured with ``time.monotonic()``.
    """
    now = time.monotonic()
    with _account_usage_status_cache_lock:
        entry = _account_usage_status_cache.get(cache_key)
        if entry is None:
            return False, None
        stored_at, snapshot = entry
        if now - stored_at > _ACCOUNT_USAGE_CACHE_TTL_SECONDS:
            _account_usage_status_cache.pop(cache_key, None)
            return False, None
        return True, snapshot
def invalidate_account_usage_status_cache(provider_id: str | None = None) -> None:
    """Drop cached account-usage snapshots.

    With no (or a blank) *provider_id* the whole cache is cleared; otherwise
    only entries whose key starts with that provider (case-insensitive) go.
    """
    normalized = str(provider_id or "").strip().lower()
    with _account_usage_status_cache_lock:
        if not normalized:
            _account_usage_status_cache.clear()
            return
        # Collect first so the dict is not mutated while being iterated.
        doomed = [key for key in _account_usage_status_cache if key[0] == normalized]
        for key in doomed:
            _account_usage_status_cache.pop(key, None)
def _set_cached_account_usage(
    cache_key: tuple[str, str, str],
    snapshot: Any,
) -> None:
    """Store *snapshot* under *cache_key* and prune the cache.

    A ``None`` snapshot never replaces an existing non-``None`` entry; it only
    removes an entry that holds nothing useful. After a real store, entries
    past the TTL are dropped and the cache is trimmed oldest-first down to
    ``_ACCOUNT_USAGE_CACHE_MAX_ENTRIES``.
    """
    now = time.monotonic()
    with _account_usage_status_cache_lock:
        if snapshot is None:
            existing = _account_usage_status_cache.get(cache_key)
            if existing is None or existing[1] is None:
                _account_usage_status_cache.pop(cache_key, None)
            return

        _account_usage_status_cache[cache_key] = (now, snapshot)

        stale_keys = [
            key
            for key, (stored_at, _unused) in _account_usage_status_cache.items()
            if now - stored_at > _ACCOUNT_USAGE_CACHE_TTL_SECONDS
        ]
        for key in stale_keys:
            _account_usage_status_cache.pop(key, None)

        while len(_account_usage_status_cache) > _ACCOUNT_USAGE_CACHE_MAX_ENTRIES:
            oldest_key, _oldest_entry = min(
                _account_usage_status_cache.items(),
                key=lambda item: item[1][0],
            )
            _account_usage_status_cache.pop(oldest_key, None)
def _agent_fetch_account_usage_for_home(provider: str, home: Path, *, api_key: str | None = None) -> Any:
    """Run the account-usage probe in a child interpreter bound to *home*.

    The child executes the module's bootstrap + probe source via ``python -c``
    with the environment from ``_account_usage_subprocess_env`` and is expected
    to print a JSON payload on stdout. Returns the rebuilt snapshot, or
    ``None`` on timeout, launch failure, non-zero exit status or invalid JSON
    (each logged at debug level).
    """
    try:
        from api.config import PYTHON_EXE
    except Exception:
        PYTHON_EXE = sys.executable or "python3"

    try:
        run_options: dict[str, Any] = dict(
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=_ACCOUNT_USAGE_SUBPROCESS_TIMEOUT_SECONDS,
            check=False,
        )
        # On POSIX (Linux/macOS) a pre-exec hook wires a parent-death signal so
        # the child exits if the WebUI parent terminates. preexec_fn is not
        # usable on Windows, where process-tree cleanup is left to the OS.
        if hasattr(os, "fork"):
            run_options["preexec_fn"] = _account_usage_preexec_fn
        # NOTE(review): the API key is also passed as an argv entry, which may
        # be observable in process listings -- confirm whether the child could
        # rely on the environment variable instead.
        command = [
            PYTHON_EXE,
            "-c",
            _ACCOUNT_USAGE_PARENT_DEATHSIG_BOOTSTRAP + _ACCOUNT_USAGE_SUBPROCESS_CODE,
            provider,
            api_key or "",
        ]
        proc = subprocess.run(
            command,
            env=_account_usage_subprocess_env(home, provider, api_key),
            **run_options,
        )
    except subprocess.TimeoutExpired:
        logger.debug("Account usage probe for %s timed out", provider)
        return None
    except Exception:
        logger.debug("Account usage probe for %s failed to launch", provider, exc_info=True)
        return None

    if proc.returncode != 0:
        logger.debug("Account usage probe for %s exited with status %s", provider, proc.returncode)
        return None

    stdout_text = (proc.stdout or "").strip()
    try:
        # Empty output decodes to None, which the converter maps to "no snapshot".
        payload = json.loads(stdout_text or "null")
    except json.JSONDecodeError:
        logger.debug("Account usage probe for %s returned invalid JSON", provider)
        return None
    return _account_usage_payload_to_snapshot(payload)
def _fetch_account_usage_with_profile_context(provider: str, *, refresh: bool = False) -> Any:
    """Fetch account usage for *provider* within the active profile context.

    Unless *refresh* is true, a fresh cached snapshot is returned directly.
    Otherwise a probe subprocess is run while holding the module-level probe
    semaphore, so rapid UI polls (e.g. Settings page refresh) cannot spawn an
    unbounded number of simultaneous probes. The result is cached and returned.
    Any exception is logged at debug level and yields ``None``.

    A warm worker-pool (reuse of persistent subprocess handles) is a natural
    follow-up if this first slice proves insufficient in production.
    """
    home = _get_hermes_home()
    api_key = _get_provider_api_key(provider)
    cache_key = _account_usage_cache_key(provider, home, api_key)

    if not refresh:
        hit, cached_snapshot = _get_cached_account_usage(cache_key)
        if hit:
            return cached_snapshot

    probe_slots = _get_account_usage_probe_semaphore()
    try:
        with probe_slots:
            snapshot = _agent_fetch_account_usage_for_home(provider, home, api_key=api_key)
        _set_cached_account_usage(cache_key, snapshot)
        return snapshot
    except Exception:
        logger.debug("Failed to fetch account usage for %s", provider, exc_info=True)
        _set_cached_account_usage(cache_key, None)
        return None
def _provider_account_usage_status(provider: str, display_name: str, *, refresh: bool = False) -> dict[str, Any]:
    """Build the quota-status response for an account-usage provider.

    Fetches and serializes the provider's account-limits snapshot. When the
    snapshot is available the response has ``status="available"``; otherwise
    it has ``status="unavailable"`` with a message that includes the
    snapshot's own reason when one is present.
    """
    account_limits = _serialize_account_usage_snapshot(
        _fetch_account_usage_with_profile_context(provider, refresh=refresh)
    )

    if not (account_limits and account_limits.get("available")):
        reason = ""
        if account_limits:
            reason = str(account_limits.get("unavailable_reason") or "").strip()
        if reason:
            message = f"{display_name} account limits are unavailable. {reason}"
        else:
            message = (
                f"{display_name} account limits are unavailable. "
                "Confirm provider authentication and try again."
            )
        return {
            "ok": False,
            "provider": provider,
            "display_name": display_name,
            "supported": True,
            "status": "unavailable",
            "quota": None,
            "account_limits": account_limits,
            "message": message,
        }

    return {
        "ok": True,
        "provider": provider,
        "display_name": display_name,
        "supported": True,
        "status": "available",
        "label": account_limits.get("title") or "Account limits",
        "quota": None,
        "account_limits": account_limits,
        "message": f"{display_name} account limits loaded.",
    }
def get_provider_quota(provider_id: str | None = None, *, refresh: bool = False) -> dict[str, Any]:
    """Return sanitized quota/rate-limit status for a provider.

    *provider_id* defaults to the active provider from config. Providers in
    ``_ACCOUNT_USAGE_PROVIDERS`` reuse Hermes Agent's account-limits
    abstraction so WebUI stays aligned with CLI/Gateway semantics. OpenRouter
    is queried through its key endpoint (``_OPENROUTER_KEY_URL``). Every other
    provider is reported as unsupported.

    The result always carries ``ok``, ``provider``, ``display_name``,
    ``supported``, ``status``, ``quota`` and ``message``. Network and decoding
    failures are converted to an ``unavailable`` status instead of raising.
    """
    provider = (provider_id or _active_provider_id() or "").strip().lower()
    if not provider:
        return {
            "ok": False,
            "provider": None,
            "display_name": None,
            "supported": False,
            "status": "unavailable",
            "quota": None,
            "message": "No active provider is configured.",
        }

    display_name = _PROVIDER_DISPLAY.get(provider, provider.replace("-", " ").title())

    if provider in _ACCOUNT_USAGE_PROVIDERS:
        return _provider_account_usage_status(provider, display_name, refresh=refresh)

    if provider != "openrouter":
        detail = "OpenAI/Anthropic rate-limit headers are a follow-up once WebUI captures provider response metadata."
        return {
            "ok": False,
            "provider": provider,
            "display_name": display_name,
            "supported": False,
            "status": "unsupported",
            "quota": None,
            "message": f"Quota status is not available for {display_name}. {detail}",
        }

    def _openrouter_failure(status: str, message: str) -> dict[str, Any]:
        # Shared shape of every non-OK OpenRouter response.
        return {
            "ok": False,
            "provider": "openrouter",
            "display_name": display_name,
            "supported": True,
            "status": status,
            "quota": None,
            "message": message,
        }

    api_key = _get_provider_api_key("openrouter")
    if not api_key:
        return _openrouter_failure(
            "no_key",
            "OpenRouter quota status needs an OPENROUTER_API_KEY configured on the server.",
        )

    request = urllib.request.Request(
        _OPENROUTER_KEY_URL,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
    )
    unavailable_message = "OpenRouter quota status is temporarily unavailable."
    try:
        with urllib.request.urlopen(request, timeout=_PROVIDER_QUOTA_TIMEOUT_SECONDS) as resp:
            raw = resp.read()
        body = raw.decode("utf-8") if isinstance(raw, (bytes, bytearray)) else raw
        quota = _sanitize_openrouter_quota(json.loads(body))
        return {
            "ok": True,
            "provider": "openrouter",
            "display_name": display_name,
            "supported": True,
            "status": "available",
            "label": "OpenRouter credits",
            "quota": quota,
            "message": "OpenRouter quota status loaded.",
        }
    except urllib.error.HTTPError as exc:
        # HTTPError must be handled before URLError, which it subclasses.
        if exc.code in (401, 403):
            return _openrouter_failure(
                "invalid_key",
                "OpenRouter rejected the configured API key.",
            )
        return _openrouter_failure("unavailable", unavailable_message)
    except (TimeoutError, urllib.error.URLError, json.JSONDecodeError, OSError, ValueError):
        return _openrouter_failure("unavailable", unavailable_message)
def _provider_is_oauth(provider_id: str) -> bool:
    """Report whether *provider_id* is listed in ``_OAUTH_PROVIDERS``.

    Those providers authenticate through OAuth/token flows managed by the CLI.
    """
    is_oauth_provider = provider_id in _OAUTH_PROVIDERS
    return is_oauth_provider
# ── OpenRouter cost-history snapshot helpers (#692) ──────────────────────────
# Name of the sub-directory (under the Hermes home) that holds the per-provider
# snapshot JSON files and their lock files.
_COST_SNAPSHOTS_DIR_NAME = "cost-snapshots"
_COST_SNAPSHOT_MAX_DAYS = 365 # hard cap to prevent unbounded growth
# In-process guard for the snapshot read-modify-write cycle; taken by
# _append_cost_snapshot before the cross-process file lock.
_COST_SNAPSHOT_LOCK = threading.Lock()
def _cost_snapshots_dir() -> Path:
    """Return the directory that holds cost-snapshot JSON files.

    The path lives under the Hermes home directory, so snapshots follow
    whatever profile that home resolves to.
    """
    hermes_home = _get_hermes_home()
    return hermes_home / _COST_SNAPSHOTS_DIR_NAME
@contextmanager
def _cost_snapshot_file_lock(provider: str):
    """Serialize cost snapshot read-modify-write across worker processes.

    Where ``fcntl`` is available, an exclusive ``flock`` is taken on
    ``<provider>.lock`` in the snapshot directory for the duration of the
    ``with`` body; closing the lock file on exit releases it. Where ``fcntl``
    is unavailable the context manager does nothing.
    """
    if fcntl is not None:
        lock_dir = _cost_snapshots_dir()
        lock_dir.mkdir(parents=True, exist_ok=True)
        with (lock_dir / f"{provider}.lock").open("a", encoding="utf-8") as lock_file:
            fcntl.flock(lock_file, fcntl.LOCK_EX)
            yield
    else:
        # No cross-process locking primitive here; callers still hold the
        # in-process lock.
        yield
def _fetch_openrouter_key_usage(api_key: str) -> dict[str, Any] | None:
    """Fetch current usage/limit for *api_key* from ``_OPENROUTER_KEY_URL``.

    Returns a dict with ``usage``, ``limit`` and ``label`` on success, or
    ``None`` on any failure. Never raises; failures are logged at debug level
    and callers handle the ``None`` case.
    """
    request = urllib.request.Request(
        _OPENROUTER_KEY_URL,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(request, timeout=_PROVIDER_QUOTA_TIMEOUT_SECONDS) as resp:
            raw = resp.read()
        body = raw.decode("utf-8") if isinstance(raw, (bytes, bytearray)) else raw
        payload = json.loads(body)
        quota = _sanitize_openrouter_quota(payload)

        # The label sits next to the numeric fields, inside the optional
        # "data" envelope when there is one.
        label = None
        if isinstance(payload, dict):
            inner = payload.get("data", payload)
            if isinstance(inner, dict):
                label = str(inner.get("label") or "").strip() or None

        return {
            "usage": quota.get("usage"),
            "limit": quota.get("limit"),
            "label": label,
        }
    except Exception:
        logger.debug("OpenRouter key usage fetch failed for cost-history", exc_info=True)
        return None
def _read_cost_snapshots(provider: str) -> list[dict[str, Any]]:
    """Read persisted daily snapshots for *provider* from disk.

    Returns ``{date, used, limit}`` dicts sorted by ``date`` ascending.
    Entries that are not dicts or have no date are skipped, and numeric fields
    are normalized with ``_quota_number``. A missing, unreadable or malformed
    file yields an empty list.
    """
    path = _cost_snapshots_dir() / f"{provider}.json"
    if not path.exists():
        return []
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError, ValueError):
        return []

    raw_entries = data.get("snapshots") if isinstance(data, dict) else None
    if not isinstance(raw_entries, list):
        return []

    cleaned: list[dict[str, Any]] = []
    for entry in raw_entries:
        date = str(entry.get("date") or "").strip() if isinstance(entry, dict) else ""
        if date:
            cleaned.append({
                "date": date,
                "used": _quota_number(entry.get("used")),
                "limit": _quota_number(entry.get("limit")),
            })
    return sorted(cleaned, key=lambda item: item["date"])
def _write_cost_snapshots(provider: str, snapshots: list[dict[str, Any]]) -> None:
    """Persist daily snapshots for *provider* to disk atomically.

    The JSON body is written to a temporary file in the snapshot directory,
    flushed and fsynced, then moved over ``<provider>.json`` with
    ``os.replace``. On any failure the temporary file is removed
    (best-effort) and the original exception propagates.
    """
    import tempfile as _tempfile

    target_dir = _cost_snapshots_dir()
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / f"{provider}.json"
    body = json.dumps(
        {"provider": provider, "snapshots": snapshots},
        ensure_ascii=False,
        indent=2,
    )

    # Same directory as the target so the final rename stays on one filesystem.
    tmp_fd, tmp_name = _tempfile.mkstemp(
        dir=str(target_dir), prefix=f".{provider}_", suffix=".tmp"
    )
    try:
        with os.fdopen(tmp_fd, "w", encoding="utf-8") as handle:
            handle.write(body)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, target)
    except BaseException:
        # Covers KeyboardInterrupt too, so no stray temp file is left behind.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise
def _append_cost_snapshot(provider: str, usage: int | float | None, limit: int | float | None) -> list[dict[str, Any]]:
    """Record today's snapshot for *provider* and return the updated list.

    "Today" is the current UTC date. An existing entry for today is updated in
    place, so repeated calls within the same day are idempotent. The list is
    capped to the most recent ``_COST_SNAPSHOT_MAX_DAYS`` entries before being
    written back.
    """
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    # Serialize the whole read-modify-write cycle. The atomic os.replace in
    # _write_cost_snapshots protects the file write itself, but without these
    # locks two concurrent requests could both read the same old list and race
    # to replace it with stale data. The threading lock covers a single
    # process; the file lock covers multiple workers sharing one Hermes home.
    with _COST_SNAPSHOT_LOCK, _cost_snapshot_file_lock(provider):
        snapshots = _read_cost_snapshots(provider)

        todays_entry = next((item for item in snapshots if item["date"] == today), None)
        if todays_entry is None:
            snapshots.append({"date": today, "used": usage, "limit": limit})
            snapshots.sort(key=lambda item: item["date"])
        else:
            todays_entry["used"] = usage
            todays_entry["limit"] = limit

        # Keep only the most recent _COST_SNAPSHOT_MAX_DAYS entries.
        if len(snapshots) > _COST_SNAPSHOT_MAX_DAYS:
            snapshots = snapshots[-_COST_SNAPSHOT_MAX_DAYS:]

        _write_cost_snapshots(provider, snapshots)
        return snapshots
def _compute_deltas(snapshots: list[dict[str, Any]], window_days: int) -> list[dict[str, Any]]:
"""Compute daily deltas from cumulative usage snapshots.
Each snapshot carries cumulative ``used``; the delta for a day is
the difference between that day's cumulative value and the previous
day's. The oldest day in the window has ``delta=None`` (no
previous baseline). If the cumulative value drops, treat that day
as the start of a fresh series (for example after an API-key rotation)
and use the current value as that day's delta instead of emitting a
negative spend bar.
"""
# Take only the last *window_days* entries
window = snapshots[-window_days:] if len(snapshots) > window_days else list(snapshots)
result: list[dict[str, Any]] = []
for i, entry in enumerate(window):
delta = None
if i > 0 and entry.get("used") is not None and window[i - 1].get("used") is not None:
delta = float(entry["used"]) - float(window[i - 1]["used"])
if delta < 0:
delta = float(entry["used"])
# Rounding: avoid -0.0 and tiny floating-point noise
if abs(delta) < 1e-9:
delta = 0.0
else:
delta = round(delta, 6)
result.append({
"date": entry["date"],
"used": entry.get("used"),
"delta": delta,
})
return result
def get_provider_cost_history(provider_id: str | None = None, days: int = 7) -> dict[str, Any]:
    """Return daily cost-history snapshots with deltas for a provider.

    Only ``openrouter`` is supported. Each call fetches the current cumulative
    usage through ``_fetch_openrouter_key_usage``, appends/updates today's
    snapshot, and returns the last *days* snapshots with per-day deltas. When
    the upstream fetch fails, previously persisted snapshots are still
    returned so the chart degrades gracefully.

    The result follows the module's API style (``ok``, ``provider``,
    ``status``, ``message``, ...).
    """
    provider = (provider_id or "").strip().lower()
    if not provider:
        return {
            "ok": False,
            "provider": None,
            "status": "missing_provider",
            "message": "Provider parameter is required. Use ?provider=openrouter",
        }

    if provider != "openrouter":
        other_name = _PROVIDER_DISPLAY.get(provider, provider.replace("-", " ").title())
        return {
            "ok": False,
            "provider": provider,
            "display_name": other_name,
            "supported": False,
            "status": "unsupported",
            "message": f"Cost history is not available for {other_name}. Only openrouter is supported in this release.",
        }

    display_name = _PROVIDER_DISPLAY.get("openrouter", "OpenRouter")

    def _openrouter_response(ok: bool, status: str, message: str, **extra: Any) -> dict[str, Any]:
        # Common envelope; extra fields sit between status and message.
        return {
            "ok": ok,
            "provider": "openrouter",
            "display_name": display_name,
            "supported": True,
            "status": status,
            **extra,
            "message": message,
        }

    api_key = _get_provider_api_key("openrouter")
    if not api_key:
        return _openrouter_response(
            False,
            "no_key",
            "OpenRouter cost history needs an OPENROUTER_API_KEY configured on the server.",
        )

    # Fetch current cumulative usage from OpenRouter.
    key_info = _fetch_openrouter_key_usage(api_key)
    if key_info is None:
        # Upstream failure: serve whatever was persisted earlier.
        stored = _read_cost_snapshots("openrouter")
        return _openrouter_response(
            False,
            "unavailable",
            "OpenRouter cost history is temporarily unavailable. Showing last known data.",
            window_days=days,
            snapshots=_compute_deltas(stored, days),
            limit=None,
            label=None,
        )

    # Persist today's snapshot; fall back to the on-disk history if that fails.
    try:
        snapshots = _append_cost_snapshot("openrouter", key_info["usage"], key_info["limit"])
    except Exception:
        logger.debug("Failed to persist cost snapshot for openrouter", exc_info=True)
        snapshots = _read_cost_snapshots("openrouter")

    return _openrouter_response(
        True,
        "available",
        "OpenRouter cost history loaded.",
        window_days=days,
        snapshots=_compute_deltas(snapshots, days),
        limit=key_info.get("limit"),
        label=key_info.get("label") or "OpenRouter credits",
    )
# SECTION: Public API
def get_providers() -> dict[str, Any]:
"""Return a list of all known providers with their configuration status.
Each entry contains:
- ``id``: canonical provider slug
- ``display_name``: human-readable name
- ``has_key``: whether an API key is configured
- ``configurable``: whether the key can be set from the WebUI
- ``key_source``: where the key was found (``env_file``, ``env_var``,
``config_yaml``, ``oauth``, ``none``)
- ``models``: list of known model IDs for this provider
"""
providers = []
# Collect all known provider IDs from multiple sources
known_ids = set(_PROVIDER_DISPLAY.keys()) | set(_PROVIDER_MODELS.keys())
# Also detect providers from config.yaml providers section
cfg = get_config()
providers_cfg = cfg.get("providers", {})
if isinstance(providers_cfg, dict):
known_ids.update(providers_cfg.keys())
# Add OAuth providers even if not in _PROVIDER_DISPLAY
known_ids.update(_OAUTH_PROVIDERS)
for pid in sorted(known_ids):
display_name = _PROVIDER_DISPLAY.get(pid, pid.replace("-", " ").title())
is_oauth = _provider_is_oauth(pid)
has_key = _provider_has_key(pid)
# Determine key source
key_source = "none"
auth_error = None
if is_oauth:
key_source = "oauth"
# Check if actually authenticated via hermes_cli.
# IMPORTANT: do not unconditionally overwrite has_key from _provider_has_key().
# A token in config.yaml is a valid credential even when get_auth_status()
# returns logged_in=False (e.g. token not in the hermes credential pool,
# or refresh token consumed by native Codex CLI / VS Code extension).
try:
from hermes_cli.auth import get_auth_status as _gas
status = _gas(pid)
if isinstance(status, dict) and status.get("logged_in"):
has_key = True
key_source = status.get("key_source", "oauth")
elif has_key:
# _provider_has_key() found a token in config.yaml — respect it
# rather than hiding a working credential from the Settings UI.
key_source = "config_yaml"
auth_error = status.get("error") if isinstance(status, dict) else None
else:
has_key = False
auth_error = status.get("error") if isinstance(status, dict) else None
except Exception:
# Import failed or auth check errored — don't override a known-good
# key just because the hermes_cli auth module is unavailable.
logger.debug("hermes_cli auth check failed for %s", pid, exc_info=True)
# keep has_key from _provider_has_key()
elif has_key:
env_var = _PROVIDER_ENV_VAR.get(pid)
if env_var:
env_path = _get_hermes_home() / ".env"
env_values = _load_env_file(env_path)
if _provider_value_counts_as_api_key(pid, env_values.get(env_var)):
key_source = "env_file"
elif _provider_value_counts_as_api_key(pid, os.getenv(env_var)):
key_source = "env_var"
else:
# Canonical name not set; check legacy aliases (e.g. lmstudio's
# pre-#1500 LMSTUDIO_API_KEY) so existing users see "env_file"
# instead of being misreported as "config_yaml" when the key
# actually lives in .env under the old name.
aliased = False
for alias in _PROVIDER_ENV_VAR_ALIASES.get(pid, ()) or ():
if _provider_value_counts_as_api_key(pid, env_values.get(alias)):
key_source = "env_file"
aliased = True
break
if _provider_value_counts_as_api_key(pid, os.getenv(alias)):
key_source = "env_var"
aliased = True
break
if not aliased:
key_source = "config_yaml"
else:
key_source = "config_yaml"
elif pid not in _PROVIDER_ENV_VAR:
# Fallback: provider is not a known API-key provider and not in
# the hardcoded _OAUTH_PROVIDERS set. It may be a custom or
# newly-added OAuth provider (e.g. Anthropic connected via OAuth).
# Check live auth status so the Providers tab agrees with the
# model picker (#1212).
#
# IMPORTANT: we skip providers in _PROVIDER_ENV_VAR because they
# are pure API-key providers — calling get_auth_status() for every
# unconfigured API-key provider would add unnecessary latency
# (network round-trip per provider) on the Settings page.
# Validate pid looks like a real provider before probing
import re as _re
if _re.match(r'^[a-z][a-z0-9_-]{0,63}$', pid):
try:
from hermes_cli.auth import get_auth_status as _gas
status = _gas(pid)
if isinstance(status, dict) and status.get("logged_in"):
has_key = True
# Constrain key_source to a known-safe closed set
_raw_ks = status.get("key_source", "")
key_source = _raw_ks if _raw_ks in {"oauth", "env", "config", "token"} else "oauth"
is_oauth = True
except Exception:
pass
if pid == "openai" and not has_key and _provider_has_shadowed_codex_oauth_value(pid):
continue
models = list(_PROVIDER_MODELS.get(pid, []))
models_total = len(models)
# OpenAI Codex account catalogs drift independently from WebUI releases.
# The model picker already prefers hermes_cli + Codex local cache for
# this provider (the agent's `provider_model_ids("openai-codex")` filters
# IDs with `supported_in_api: false`, but Codex CLI still surfaces some
# of those — notably `gpt-5.3-codex-spark` from #1680 — in its picker).
# Merge both sources here so the providers card matches the picker
# exactly. Static entries remain the offline fallback when live
# discovery and the local Codex cache are both unavailable. (#1807
# follow-up to v0.51.19 #1812.)
if pid == "openai-codex":
live_ids = _read_live_provider_model_ids("openai-codex")
for mid in _read_visible_codex_cache_model_ids():
if mid not in live_ids:
live_ids.append(mid)
live_models = _models_from_live_provider_ids(pid, live_ids)
if live_models:
models = live_models
models_total = len(models)
# Nous Portal: prefer the live catalog so the providers card matches
# the dropdown picker (#1538). Same fallback shape as the static-only
# case below — when hermes_cli is unavailable or its lookup raises,
# we keep the four-entry curated list.
#
# On large-tier accounts (#1567 reporter Deor saw 396 entries), we
# render the same featured subset the picker uses so the providers
# card body doesn't become a 396-pill wall. The full count is still
# reported via models_total — surfaced in the header line as
# "396 models · OAuth" by static/panels.js — so the user knows the
# complete catalog is reachable (via /model autocomplete or a future
# "show all" disclosure if added).
if pid == "nous":
try:
from hermes_cli.models import provider_model_ids as _provider_model_ids
live_ids = _provider_model_ids("nous") or []
if live_ids:
# Lazy-import to avoid circular dep with api.config.
from api.config import _format_nous_label, _build_nous_featured_set
featured_ids, _extras = _build_nous_featured_set(live_ids)
models = [
{"id": f"@nous:{mid}", "label": _format_nous_label(mid)}
for mid in featured_ids
]
models_total = len(live_ids)
except Exception:
logger.debug("Failed to load Nous Portal models from hermes_cli")
# LM Studio: fetch live locally-loaded models so the providers card
# matches what's actually available on the user's server (#WebUI).
if pid == "lmstudio":
try:
from hermes_cli.models import provider_model_ids as _pmi
lm_live = _pmi("lmstudio") or []
if lm_live:
models = [{"id": mid, "label": mid} for mid in lm_live]
models_total = len(models)
except Exception:
logger.debug("Failed to load LM Studio models from hermes_cli")
# Also include models from config.yaml providers section
if isinstance(providers_cfg, dict):
provider_cfg = providers_cfg.get(pid, {})
if isinstance(provider_cfg, dict) and "models" in provider_cfg:
cfg_models = provider_cfg["models"]
if isinstance(cfg_models, dict):
models = models + [{"id": k, "label": k} for k in cfg_models.keys()]
elif isinstance(cfg_models, list):
models = models + [{"id": k, "label": k} for k in cfg_models]
# Recompute models_total when config.yaml contributes additional
# entries on top of the live/static catalog. For non-Nous
# providers models_total still equals len(models); for Nous
# we keep the live count (which already includes any models
# surfaced in the curated featured slice).
if pid != "nous":
models_total = len(models)
providers.append({
"id": pid,
"display_name": display_name,
"has_key": has_key,
"configurable": not is_oauth and pid in _PROVIDER_ENV_VAR,
"is_oauth": is_oauth,
"key_source": key_source,
"auth_error": auth_error,
"models": models,
# models_total reflects the complete catalog size (e.g. 396 for
# an enterprise Nous Portal account), even when "models" is
# trimmed to a featured subset for UI scannability. The frontend
# uses this for the header text "396 models · OAuth" so users
# know the full catalog exists and is reachable via the slash
# command. For providers that don't trim, models_total ==
# len(models) and the frontend behaves identically to before.
"models_total": models_total,
})
# Scan custom_providers from config.yaml (e.g. glmcode, timicc)
custom_providers_cfg = cfg.get("custom_providers", [])
if isinstance(custom_providers_cfg, list):
for cp in custom_providers_cfg:
if not isinstance(cp, dict) or not cp.get("name"):
continue
cp_name = str(cp["name"]).strip()
cp_id = _custom_provider_slug_from_name(cp_name)
if not cp_id:
logger.warning(
"Custom provider entry %r produced empty slug; skipping",
cp_name,
)
continue
# Collect models from `models` list or `model` single
cp_models = []
if isinstance(cp.get("models"), list):
cp_models = [{"id": str(m), "label": str(m)} for m in cp["models"]]
elif cp.get("model"):
cp_models = [{"id": cp["model"], "label": cp["model"]}]
# Check for env var reference (${VAR_NAME} pattern)
cp_api_key = str(cp.get("api_key") or "")
cp_has_key = bool(cp_api_key.strip())
# Replace env var reference to check actual value
if cp_api_key.startswith("${") and cp_api_key.endswith("}"):
env_var = cp_api_key[2:-1]
cp_has_key = bool(os.getenv(env_var, "").strip())
providers.append({
"id": cp_id,
"display_name": cp_name,
"has_key": cp_has_key,
"configurable": False, # custom providers managed via config.yaml
"key_source": "config_yaml" if cp_has_key else "none",
"models": cp_models,
})
# Determine active provider
active_provider = None
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, dict):
active_provider = model_cfg.get("provider")
return {
"providers": providers,
"active_provider": active_provider,
}
def set_provider_key(provider_id: str, api_key: str | None) -> dict[str, Any]:
"""Set or update the API key for a provider.
Writes the key to ``~/.hermes/.env`` using the standard env var name.
If ``api_key`` is None or empty, the key is removed.
Returns a status dict with the operation result.
"""
provider_id = provider_id.strip().lower()
if not provider_id:
return {"ok": False, "error": "Provider ID is required."}
if _provider_is_oauth(provider_id):
return {
"ok": False,
"error": f"'{_PROVIDER_DISPLAY.get(provider_id, provider_id)}' uses OAuth authentication. "
f"Use `hermes model` in the terminal to configure it.",
}
env_var = _PROVIDER_ENV_VAR.get(provider_id)
if not env_var:
return {
"ok": False,
"error": f"Cannot configure API key for '{_PROVIDER_DISPLAY.get(provider_id, provider_id)}'. "
f"This provider does not have a known env var mapping.",
}
# Validate API key format (basic sanity check)
if api_key:
api_key = api_key.strip()
if "\n" in api_key or "\r" in api_key:
return {"ok": False, "error": "API key must not contain newline characters."}
if len(api_key) < 8:
return {"ok": False, "error": "API key appears too short."}
env_path = _get_hermes_home() / ".env"
try:
_write_env_file(env_path, {env_var: api_key})
except ValueError as exc:
return {"ok": False, "error": str(exc)}
except Exception as exc:
logger.exception("Failed to write env file for provider %s", provider_id)
return {"ok": False, "error": f"Failed to save API key: {exc}"}
# Invalidate the model cache so the dropdown refreshes on next request.
# Using invalidate_models_cache() instead of reload_config() to avoid
# disrupting active streaming sessions that may be reading config.cfg.
invalidate_models_cache()
return {
"ok": True,
"provider": provider_id,
"display_name": _PROVIDER_DISPLAY.get(provider_id, provider_id),
"action": "updated" if api_key else "removed",
}
def remove_provider_key(provider_id: str) -> dict[str, Any]:
    """Remove the API key for a provider.

    Removes the key from ``~/.hermes/.env`` (via ``set_provider_key`` with a
    ``None`` key) and, when that succeeds, also cleans up ``config.yaml``
    through ``_clean_provider_key_from_config`` in case the key is stored
    there as well.

    Args:
        provider_id: Provider identifier as supplied by the caller.

    Returns:
        The status dict produced by ``set_provider_key``. The config.yaml
        cleanup does not alter it.
    """
    result = set_provider_key(provider_id, None)

    # Even if the .env removal succeeded, the key might also live in
    # config.yaml (e.g. providers.<id>.api_key or model.api_key).
    # Clean those up so _provider_has_key() returns False after removal.
    if result.get("ok"):
        # Use the normalized id reported by set_provider_key rather than the
        # raw argument: the providers.<id> lookup in the cleanup is an exact
        # key match, so an id with stray whitespace or upper-case letters
        # would clear .env but leave the config.yaml entry behind.
        _clean_provider_key_from_config(result.get("provider") or provider_id)

    return result
def _clean_provider_key_from_config(provider_id: str) -> None:
    """Strip stored API keys for *provider_id* out of config.yaml.

    Three storage locations are inspected:

    * ``providers.<id>.api_key`` -- the per-provider key
    * ``model.api_key`` -- the top-level key, only when ``model.provider``
      names this provider
    * ``custom_providers[].api_key`` -- entries whose name matches the id

    The file is rewritten only when at least one key was actually deleted.
    Reading, editing and saving happen while holding ``_cfg_lock``. Errors
    during that work are logged and swallowed; a missing config file or a
    failing path lookup makes the function return silently.
    """
    from api.config import _cfg_lock

    try:
        # Look the path resolver up on api.config at call time rather than
        # through a name bound at import. Tests (and some profile flows)
        # monkeypatch the resolver after this module was imported, and a
        # stale reference could end up cleaning the wrong config.yaml.
        import api.config as _config

        config_path = _config._get_config_path()
    except Exception:
        return

    if not config_path.exists():
        return

    try:
        import yaml as _yaml

        removed_any = False
        with _cfg_lock:
            data = _yaml.safe_load(config_path.read_text(encoding="utf-8"))
            if not isinstance(data, dict):
                return

            # 1. providers.<id>.api_key
            per_provider = data.get("providers", {})
            if isinstance(per_provider, dict):
                entry = per_provider.get(provider_id, {})
                if isinstance(entry, dict) and entry.get("api_key"):
                    entry.pop("api_key")
                    removed_any = True

            # 2. model.api_key -- touched only when this provider is active
            model_section = data.get("model", {})
            if isinstance(model_section, dict) and model_section.get("api_key"):
                active = model_section.get("provider")
                if active and str(active).strip().lower() == provider_id.lower():
                    model_section.pop("api_key")
                    removed_any = True

            # 3. custom_providers[].api_key
            custom_entries = data.get("custom_providers", [])
            if isinstance(custom_entries, list):
                for custom in custom_entries:
                    if (
                        isinstance(custom, dict)
                        and _custom_provider_name_matches(provider_id, custom.get("name"))
                        and custom.get("api_key")
                    ):
                        custom.pop("api_key")
                        removed_any = True

            if removed_any:
                _save_yaml_config_file(config_path, data)

        # Refresh the in-memory config and bust the model TTL cache.
        # This MUST stay outside the `with _cfg_lock` block to avoid a
        # deadlock: _cfg_lock is a non-reentrant threading.Lock and
        # reload_config() acquires it internally.
        if removed_any:
            reload_config()
    except Exception:
        logger.exception("Failed to clean provider key from config.yaml for %s", provider_id)