"""Hermes Web UI -- provider management endpoints. Provides CRUD operations for configuring provider API keys post-onboarding. Closes #586 (allow provider key update) and part of #604 (model picker multi-provider support). """ from __future__ import annotations import base64 import hashlib import json import logging import os import signal import subprocess import sys import threading import time import urllib.error import urllib.request from contextlib import contextmanager, nullcontext from datetime import datetime, timedelta, timezone from pathlib import Path from types import SimpleNamespace from typing import Any try: # POSIX-only; Windows-style environments fall back to process-local locking. import fcntl except ImportError: # pragma: no cover - exercised only where fcntl is unavailable fcntl = None # type: ignore[assignment] from api.config import ( _PROVIDER_DISPLAY, _PROVIDER_MODELS, _custom_provider_slug_from_name, _get_label_for_model, _models_from_live_provider_ids, _read_live_provider_model_ids, _read_visible_codex_cache_model_ids, _save_yaml_config_file, get_config, invalidate_models_cache, reload_config, ) logger = logging.getLogger(__name__) def _custom_provider_name_matches(provider_id: str, name: object) -> bool: """Return True when *provider_id* refers to a named custom provider.""" pid = str(provider_id or "").strip().lower() raw_name = str(name or "").strip().lower() if not pid or not raw_name: return False slug = _custom_provider_slug_from_name(raw_name) candidates = {raw_name, f"custom:{raw_name}"} if slug: candidates.add(slug) return pid in candidates _OPENROUTER_KEY_URL = "https://openrouter.ai/api/v1/key" _PROVIDER_QUOTA_TIMEOUT_SECONDS = 3.0 _ACCOUNT_USAGE_SUBPROCESS_TIMEOUT_SECONDS = 35.0 _ACCOUNT_USAGE_CACHE_TTL_SECONDS = 45.0 _ACCOUNT_USAGE_CACHE_MAX_ENTRIES = 64 _ACCOUNT_USAGE_PROVIDERS = frozenset({"openai-codex", "anthropic"}) # Upper bound on simultaneous profile-isolated quota probe subprocesses. # Each probe runs a Python child for up to 35 s; capping concurrency prevents # resource exhaustion when the UI polls all providers rapidly. The limit is # deliberately low (2) since _ACCOUNT_USAGE_SUBPROCESS_TIMEOUT_SECONDS is # already 35 s and probe I/O is lightweight HTTP calls. _MAX_CONCURRENT_ACCOUNT_USAGE_PROBES = 2 # Parent-death-signal setup: on Linux, arrange for the quota-probe child to # receive SIGTERM when the WebUI parent dies (e.g. systemctl restart, OOM kill). # This prevents probe children from becoming orphaned zombies that continue # calling the provider API indefinitely after the WebUI process is gone. # We use prctl(PR_SET_DEATHSIG, SIGTERM) which is standard on modern Linux # kernels and available via ctypes (no external C extension needed). # If prctl is unavailable (non-Linux, or Linux without prctl support), the # probe child exits normally when its parent (WebUI) terminates -- on macOS/ # Windows this is handled by OS-level process tree cleanup. # Portable parent-death-signal bootstrap. On Linux this arranges for the # probe child to receive SIGTERM when the WebUI parent dies (systemctl # restart, OOM kill, etc.), preventing orphaned zombie probes from continuing # to call the provider API indefinitely. Non-Linux platforms (macOS, Windows) # rely on OS-level process-tree cleanup instead; this variable is then unused. # prctl(PR_SET_DEATHSIG, SIGTERM) is available via ctypes without any C # extension — the same technique used throughout the Hermes codebase. _ACCOUNT_USAGE_PARENT_DEATHSIG_BOOTSTRAP = ( # fmt: off # Lines are written as string literals so this block passes # `python3 -m py_compile` cleanly and is safe to include verbatim # inside the single argument string passed to `python -c ...`. 'import sys\n' 'try:\n' ' import ctypes, signal\n' ' libc = ctypes.CDLL(None)\n' ' libc.prctl(1, signal.SIGTERM) # PR_SET_DEATHSIG=1, SIGTERM=15\n' 'except Exception:\n' ' pass\n' # fmt: on ) # Module-level cap on concurrent quota-probe subprocesses. # Lazily created so this module compiles even when threading isn't ready. _account_usage_probe_semaphore: threading.BoundedSemaphore | None = None # Short-lived account-usage cache. The Codex pooled probe may check multiple # credentials, so cache sanitized snapshots briefly to avoid re-querying the # provider on every Settings repaint/profile-panel refresh. Pool composition # changes can be stale for at most this TTL; that is preferred to hammering the # provider usage API while the Settings panel is open. Transient None probe # results are intentionally not cached; known exhausted/unavailable states are # represented as non-None snapshots and remain cacheable. _account_usage_status_cache: dict[tuple[str, str, str], tuple[float, Any]] = {} _account_usage_status_cache_lock = threading.Lock() def _get_account_usage_probe_semaphore() -> threading.BoundedSemaphore: global _account_usage_probe_semaphore if _account_usage_probe_semaphore is None: _account_usage_probe_semaphore = threading.BoundedSemaphore( _MAX_CONCURRENT_ACCOUNT_USAGE_PROBES ) return _account_usage_probe_semaphore # ── preexec_fn: parent-death signal for the probe subprocess ───────────────── # On POSIX/Linux, arrange for the child to receive SIGTERM when the WebUI # parent dies (systemctl restart, OOM kill, etc.). The parent's bootstrap # code (_ACCOUNT_USAGE_PARENT_DEATHSIG_BOOTSTRAP) also covers the grandchild # fork inside the child, but this preexec_fn handles the direct child-process # case. Returns None on non-POSIX or when prctl is unavailable so that # subprocess.run() works on Windows/macOS without changes. def _account_usage_preexec_fn() -> None: try: import ctypes libc = ctypes.CDLL(None) libc.prctl(1, signal.SIGTERM) # PR_SET_PDEATHSIG=1, SIGTERM=15 except Exception: pass _ACCOUNT_USAGE_SUBPROCESS_CODE = r""" import base64 import json import sys from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta, timezone from types import SimpleNamespace from urllib import request as urllib_request from agent.account_usage import fetch_account_usage _CODEX_DEFAULT_BASE_URL = "https://chatgpt.com/backend-api/codex" _CODEX_POOL_USAGE_TIMEOUT_SECONDS = 4.0 _CODEX_POOL_MAX_WORKERS = 6 def _iso(value): if value in (None, ""): return None if hasattr(value, "isoformat"): text = value.isoformat() return text.replace("+00:00", "Z") text = str(value).strip() return text or None def _snapshot_payload(snapshot): if snapshot is None: return None windows = [] for window in getattr(snapshot, "windows", ()) or (): windows.append({ "label": str(getattr(window, "label", "") or ""), "used_percent": getattr(window, "used_percent", None), "reset_at": _iso(getattr(window, "reset_at", None)), "detail": getattr(window, "detail", None), }) payload = { "provider": str(getattr(snapshot, "provider", "") or ""), "source": str(getattr(snapshot, "source", "") or ""), "title": str(getattr(snapshot, "title", "") or ""), "plan": getattr(snapshot, "plan", None), "windows": windows, "details": list(getattr(snapshot, "details", ()) or ()), "available": bool(getattr(snapshot, "available", bool(windows))), "unavailable_reason": getattr(snapshot, "unavailable_reason", None), "fetched_at": _iso(getattr(snapshot, "fetched_at", None)), } pool = getattr(snapshot, "pool", None) if isinstance(pool, dict): payload["pool"] = pool return payload def _snapshot_available(snapshot): if snapshot is None: return False try: return bool(getattr(snapshot, "available", False)) except Exception: return False def _number(value): if isinstance(value, bool) or value is None: return None if isinstance(value, (int, float)): return value try: text = str(value).strip() if not text: return None number = float(text) return int(number) if number.is_integer() else number except Exception: return None def _parse_dt(value): if value in (None, ""): return None if isinstance(value, (int, float)): try: return datetime.fromtimestamp(float(value), tz=timezone.utc) except Exception: return None text = str(value).strip() if not text: return None if text.endswith("Z"): text = text[:-1] + "+00:00" try: dt = datetime.fromisoformat(text) except ValueError: return None return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) def _title_case_slug(value): cleaned = str(value or "").strip() if not cleaned: return None return cleaned.replace("_", " ").replace("-", " ").title() def _resolve_codex_usage_url(base_url): normalized = str(base_url or "").strip().rstrip("/") or _CODEX_DEFAULT_BASE_URL if normalized.endswith("/codex"): normalized = normalized[: -len("/codex")] if "/backend-api" in normalized: return normalized + "/wham/usage" return normalized + "/api/codex/usage" def _jwt_claims(token): if not isinstance(token, str) or token.count(".") != 2: return {} payload = token.split(".")[1] payload += "=" * ((4 - len(payload) % 4) % 4) try: claims = json.loads(base64.urlsafe_b64decode(payload.encode("utf-8")).decode("utf-8")) except Exception: return {} return claims if isinstance(claims, dict) else {} def _codex_usage_headers(access_token): headers = { "Authorization": "Bearer " + access_token, "Accept": "application/json", "User-Agent": "codex_cli_rs/0.0.0 (Hermes WebUI)", "originator": "codex_cli_rs", } auth_claim = _jwt_claims(access_token).get("https://api.openai.com/auth") account_id = None if isinstance(auth_claim, dict): account_id = auth_claim.get("chatgpt_account_id") if isinstance(account_id, str) and account_id.strip(): headers["ChatGPT-Account-ID"] = account_id.strip() return headers def _entry_value(entry, *names): for name in names: try: value = getattr(entry, name) except Exception: value = None if value in (None, ""): continue text = str(value).strip() if text: return text return None def _codex_snapshot_from_usage_payload(payload): if not isinstance(payload, dict): payload = {} rate_limit = payload.get("rate_limit") if not isinstance(rate_limit, dict): rate_limit = {} windows = [] for key, label in (("primary_window", "Session"), ("secondary_window", "Weekly")): window = rate_limit.get(key) if not isinstance(window, dict): continue used = _number(window.get("used_percent")) if used is None: continue windows.append(SimpleNamespace( label=label, used_percent=float(used), reset_at=_parse_dt(window.get("reset_at")), detail=None, )) details = [] credits = payload.get("credits") if isinstance(credits, dict) and credits.get("has_credits"): balance = _number(credits.get("balance")) if balance is not None: details.append("Credits balance: $" + format(float(balance), ".2f")) elif credits.get("unlimited"): details.append("Credits balance: unlimited") return SimpleNamespace( provider="openai-codex", source="usage_api", title="Account limits", plan=_title_case_slug(payload.get("plan_type")), windows=tuple(windows), details=tuple(details), available=bool(windows or details), unavailable_reason=None, fetched_at=datetime.now(timezone.utc), ) def _snapshot_windows_payload(snapshot): windows = [] for window in getattr(snapshot, "windows", ()) or (): label = str(getattr(window, "label", "") or "").strip() if not label: continue used_percent = _number(getattr(window, "used_percent", None)) remaining_percent = None if used_percent is not None: remaining_percent = max(0.0, min(100.0, 100.0 - float(used_percent))) windows.append({ "label": label, "used_percent": used_percent, "remaining_percent": remaining_percent, "reset_at": _iso(getattr(window, "reset_at", None)), "detail": getattr(window, "detail", None), }) return windows def _snapshot_details_payload(snapshot): return [ str(detail).strip() for detail in (getattr(snapshot, "details", ()) or ()) if str(detail).strip() ] def _safe_entry_label(entry, index): label = _entry_value(entry, "label", "source") or "" if not label: label = "Credential " + str(index) label = " ".join(str(label).split()) if len(label) > 64: label = label[:61].rstrip() + "..." return label def _safe_unavailable_reason(reason): text = " ".join(str(reason or "").split()) if not text: return None lowered = text.lower() sensitive_terms = ("access_token", "refresh_token", "authorization", "bearer ", "jwt", "secret") if any(term in lowered for term in sensitive_terms): return "Usage unavailable for this credential." return text[:180] def _entry_exhausted_ttl_seconds(error_code): code = str(error_code or "").strip() if code == "401": return 5 * 60 return 60 * 60 def _entry_pool_exhausted_until(entry): if str(_entry_value(entry, "last_status") or "").strip().lower() != "exhausted": return None reset_at = _parse_dt(getattr(entry, "last_error_reset_at", None)) if reset_at is not None: return reset_at status_at = _parse_dt(getattr(entry, "last_status_at", None)) if status_at is None: return None return status_at + timedelta(seconds=_entry_exhausted_ttl_seconds(_entry_value(entry, "last_error_code"))) def _entry_is_pool_exhausted(entry): exhausted_until = _entry_pool_exhausted_until(entry) return exhausted_until is not None and datetime.now(timezone.utc) < exhausted_until def _entry_pool_exhausted_reason(entry): code = _entry_value(entry, "last_error_code") reset_at = _entry_pool_retry_after(entry) reason = "Credential pool marked this credential exhausted" if code: reason += " after provider status " + code if reset_at: reason += "; retry after " + reset_at return reason + "." def _entry_pool_retry_after(entry): return _iso(_entry_pool_exhausted_until(entry)) def _fetch_codex_entry_snapshot(entry): access_token = _entry_value(entry, "runtime_api_key", "access_token") if not access_token: return None, False, "No runtime token available." base_url = _entry_value(entry, "runtime_base_url", "base_url") or _CODEX_DEFAULT_BASE_URL request = urllib_request.Request( _resolve_codex_usage_url(base_url), headers=_codex_usage_headers(access_token), ) with urllib_request.urlopen(request, timeout=_CODEX_POOL_USAGE_TIMEOUT_SECONDS) as response: payload = json.loads(response.read().decode("utf-8") or "{}") return _codex_snapshot_from_usage_payload(payload), True, None def _best_remaining_by_window(rows): best = {} for row in rows: if row.get("status") != "available": continue label = row.get("label") or "Credential" for window in row.get("windows") or []: if not isinstance(window, dict): continue window_label = str(window.get("label") or "").strip() remaining = _number(window.get("remaining_percent")) if not window_label or remaining is None: continue candidate = { "label": window_label, "remaining_percent": remaining, "used_percent": window.get("used_percent"), "reset_at": window.get("reset_at"), "detail": window.get("detail"), "credential_label": label, } current = best.get(window_label.lower()) # The normalized Codex account-limit payload currently exposes # percentages, not absolute request/token capacity. If absolute # remaining capacity becomes available, prefer it here. if current is None or float(remaining) > float(current.get("remaining_percent") or -1): best[window_label.lower()] = candidate return list(best.values()) def _next_reset_at(rows): best_dt = None best_text = None for row in rows: for window in row.get("windows") or []: if not isinstance(window, dict): continue reset_text = window.get("reset_at") dt = _parse_dt(reset_text) if dt is None: continue if best_dt is None or dt < best_dt: best_dt = dt best_text = _iso(dt) return best_text def _codex_pool_snapshot(entries, rows, queried): available_rows = [row for row in rows if row.get("status") == "available"] exhausted_rows = [row for row in rows if row.get("status") == "exhausted"] failed_rows = [row for row in rows if row.get("status") not in {"available", "exhausted"}] plans = [] for row in rows: plan = row.get("plan") if plan and plan not in plans: plans.append(plan) best_windows = _best_remaining_by_window(rows) pool = { "total_credentials": len(entries), "queried_credentials": queried, "available_credentials": len(available_rows), "exhausted_credentials": len(exhausted_rows), "failed_credentials": len(failed_rows), "plans": plans, "next_reset_at": _next_reset_at(rows), "best_remaining_by_window": best_windows, "credentials": rows, } details = [str(len(available_rows)) + "/" + str(len(entries)) + " credentials available"] if exhausted_rows: details.append(str(len(exhausted_rows)) + " exhausted") if failed_rows: details.append(str(len(failed_rows)) + " failed to load") if plans: details.append("Plans: " + ", ".join(plans)) plan = plans[0] if len(plans) == 1 else None windows = tuple( SimpleNamespace( label=window.get("label"), used_percent=window.get("used_percent"), reset_at=window.get("reset_at"), detail="Best of " + str(len(available_rows)) + " available credentials", ) for window in best_windows ) return SimpleNamespace( provider="openai-codex", source="usage_api_pool", title="Account limits", plan=plan, windows=windows, details=tuple(details), available=bool(available_rows), unavailable_reason=None if available_rows else "No Codex pool credentials returned available account limits.", fetched_at=datetime.now(timezone.utc), pool=pool, ) def _codex_pool_exhausted_row(entry, index): label = _safe_entry_label(entry, index) retry_after = _entry_pool_retry_after(entry) return { "label": label, "status": "exhausted", "plan": None, "windows": [], "details": [], "unavailable_reason": _entry_pool_exhausted_reason(entry), "retry_after": retry_after, "fetched_at": None, } def _probe_codex_pool_entry(item): index, entry = item label = _safe_entry_label(entry, index) did_query_count = 0 try: snapshot, did_query, reason = _fetch_codex_entry_snapshot(entry) if did_query: did_query_count = 1 except Exception as exc: snapshot = None reason = str(exc) windows = _snapshot_windows_payload(snapshot) if snapshot is not None else [] details = _snapshot_details_payload(snapshot) if snapshot is not None else [] snapshot_available = _snapshot_available(snapshot) status = "available" if snapshot_available else "unavailable" row = { "label": label, "status": status, "plan": getattr(snapshot, "plan", None) if snapshot is not None else None, "windows": windows, "details": details, "unavailable_reason": None if snapshot_available else _safe_unavailable_reason(reason or getattr(snapshot, "unavailable_reason", None)), "fetched_at": _iso(getattr(snapshot, "fetched_at", None)) if snapshot is not None else None, } return index, row, did_query_count def _fetch_codex_account_usage_from_pool(): try: from agent.credential_pool import load_pool pool = load_pool("openai-codex") entries = list(pool.entries()) if pool is not None and hasattr(pool, "entries") else [] if not entries: return None rows_by_index = {} probe_items = [] queried = 0 for index, entry in enumerate(entries, start=1): if _entry_is_pool_exhausted(entry): rows_by_index[index] = _codex_pool_exhausted_row(entry, index) else: probe_items.append((index, entry)) if probe_items: max_workers = min(_CODEX_POOL_MAX_WORKERS, len(probe_items)) with ThreadPoolExecutor(max_workers=max_workers) as executor: for index, row, did_query_count in executor.map(_probe_codex_pool_entry, probe_items): rows_by_index[index] = row queried += did_query_count rows = [rows_by_index[index] for index in range(1, len(entries) + 1)] return _codex_pool_snapshot(entries, rows, queried) except Exception: return None provider = sys.argv[1] api_key = sys.argv[2] or None try: snapshot = fetch_account_usage(provider, api_key=api_key) except Exception: snapshot = None if str(provider or "").strip().lower() == "openai-codex": pool_snapshot = _fetch_codex_account_usage_from_pool() if isinstance(getattr(pool_snapshot, "pool", None), dict): snapshot = pool_snapshot print(json.dumps(_snapshot_payload(snapshot))) """ # SECTION: Provider ↔ env var mapping # Maps canonical provider slug → env var name for API key. # Providers not listed here (OAuth/token-flow providers like copilot, nous, # openai-codex) cannot have their keys managed from the WebUI. _PROVIDER_ENV_VAR: dict[str, str] = { "openrouter": "OPENROUTER_API_KEY", "anthropic": "ANTHROPIC_API_KEY", "openai": "OPENAI_API_KEY", "google": "GOOGLE_API_KEY", "gemini": "GEMINI_API_KEY", "zai": "GLM_API_KEY", "kimi-coding": "KIMI_API_KEY", "deepseek": "DEEPSEEK_API_KEY", "minimax": "MINIMAX_API_KEY", "minimax-cn": "MINIMAX_CN_API_KEY", "mistralai": "MISTRAL_API_KEY", "x-ai": "XAI_API_KEY", "xiaomi": "XIAOMI_API_KEY", "opencode-zen": "OPENCODE_ZEN_API_KEY", "opencode-go": "OPENCODE_GO_API_KEY", # NOTE: bare "ollama" (local) deliberately omitted — local Ollama is keyless # by default and the runtime in hermes_cli/runtime_provider.py only consumes # OLLAMA_API_KEY when the base URL hostname is ollama.com (Ollama Cloud). # If we mapped both providers to the same env var, configuring Ollama Cloud # would falsely flip the local Ollama card to "API key configured" (#1410). # Users who genuinely run an authenticated local Ollama can still set a key # via providers.ollama.api_key in config.yaml — that path remains supported # by _provider_has_key(). "ollama-cloud": "OLLAMA_API_KEY", # Bare "lmstudio" maps to LM_API_KEY — the canonical env var the agent CLI # runtime reads (hermes_cli/auth.py:182, api_key_env_vars=("LM_API_KEY",)). # Pre-#1499/#1500 the WebUI used LMSTUDIO_API_KEY here, which made Settings # report keys correctly but the agent runtime ignored them — masked in # practice by the LMSTUDIO_NOAUTH_PLACEHOLDER for keyless local installs. # Aligning to LM_API_KEY makes a configured LM Studio key actually work # for chat. The legacy LMSTUDIO_API_KEY name is read by `_provider_has_key` # via _PROVIDER_ENV_VAR_ALIASES below so existing users don't see Settings # flip to "no key" after upgrading. "lmstudio": "LM_API_KEY", "nvidia": "NVIDIA_API_KEY", } # Read-only legacy env-var aliases. When `_provider_has_key(pid)` looks up its # canonical env var name and finds nothing, it also checks any aliases listed # here. Onboarding (api/onboarding.py:apply_onboarding_setup) only writes the # canonical name. Use this for env vars that were renamed in a past release; # add an entry, ship for a few releases, then remove the alias once enough # users have upgraded. _PROVIDER_ENV_VAR_ALIASES: dict[str, tuple[str, ...]] = { # #1500 — agent runtime reads LM_API_KEY (canonical), but WebUI builds # ≤ v0.50.272 wrote LMSTUDIO_API_KEY into .env. Keep reading both. "lmstudio": ("LMSTUDIO_API_KEY",), } # Providers that use OAuth or token flows — their credentials are managed # through the Hermes CLI, not via API keys. The WebUI cannot set these. _OAUTH_PROVIDERS = frozenset({ "copilot", "copilot-acp", "nous", "openai-codex", "qwen-oauth", "xai-oauth", }) # SECTION: Helper functions def _get_hermes_home() -> Path: """Return the active Hermes home directory.""" try: from api.profiles import get_active_hermes_home return get_active_hermes_home() except ImportError: return Path.home() / ".hermes" def _load_env_file(env_path: Path) -> dict[str, str]: """Read key=value pairs from a .env file.""" values: dict[str, str] = {} if not env_path.exists(): return values try: for raw in env_path.read_text(encoding="utf-8").splitlines(): line = raw.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) values[key.strip()] = value.strip().strip('"').strip("'") except Exception: return {} return values def _decode_jwt_claims_unverified(token: str) -> dict[str, Any]: """Decode JWT claims for token-shape classification only. The signature is intentionally not verified because this helper is not an authorization decision: it only prevents a Codex OAuth JWT-shaped value from being treated as a raw OpenAI API key in provider-card detection. """ if not isinstance(token, str) or token.count(".") != 2: return {} payload = token.split(".", 2)[1] payload += "=" * ((4 - len(payload) % 4) % 4) try: claims = json.loads(base64.urlsafe_b64decode(payload.encode("utf-8")).decode("utf-8")) except Exception: return {} return claims if isinstance(claims, dict) else {} def _looks_like_codex_oauth_token(value: str) -> bool: """Return True when a value is a ChatGPT/Codex OAuth JWT, not an OpenAI API key.""" token = str(value or "").strip() if not token or token.startswith("sk-"): return False claims = _decode_jwt_claims_unverified(token) if not claims: return False auth_claim = claims.get("https://api.openai.com/auth") if isinstance(auth_claim, dict) and auth_claim: return True return any(key in claims for key in ("chatgpt_account_id", "https://api.openai.com/profile")) def _provider_value_counts_as_api_key(provider_id: str, value: object) -> bool: text = str(value or "").strip() if not text: return False if (provider_id or "").strip().lower() == "openai" and _looks_like_codex_oauth_token(text): return False return True def _provider_has_shadowed_codex_oauth_value(provider_id: str) -> bool: """True when the bare OpenAI credential slot contains only a Codex OAuth JWT. Users who authenticate Codex can end up with a ChatGPT/Codex JWT in a legacy OPENAI_API_KEY-shaped location. That value should not make the bare OpenAI API provider appear as configured, and the Providers tab should not show an extra OpenAI card solely because of that Codex-only credential. """ if (provider_id or "").strip().lower() != "openai": return False values: list[object] = [] env_var = _PROVIDER_ENV_VAR.get(provider_id) if env_var: env_path = _get_hermes_home() / ".env" env_values = _load_env_file(env_path) values.append(env_values.get(env_var)) values.append(os.getenv(env_var)) for alias in _PROVIDER_ENV_VAR_ALIASES.get(provider_id, ()) or (): values.append(env_values.get(alias)) values.append(os.getenv(alias)) cfg = get_config() model_cfg = cfg.get("model", {}) if isinstance(model_cfg, dict): active_provider = str(model_cfg.get("provider") or "").strip().lower() if active_provider == provider_id: values.append(model_cfg.get("api_key")) providers_cfg = cfg.get("providers", {}) if isinstance(providers_cfg, dict): provider_cfg = providers_cfg.get(provider_id, {}) if isinstance(provider_cfg, dict): values.append(provider_cfg.get("api_key")) custom_providers = cfg.get("custom_providers", []) if isinstance(custom_providers, list): for cp in custom_providers: if isinstance(cp, dict) and _custom_provider_name_matches(provider_id, cp.get("name")): cp_key = cp.get("api_key") if isinstance(cp_key, str) and cp_key.startswith("${") and cp_key.endswith("}"): values.append(os.getenv(cp_key[2:-1])) else: values.append(cp_key) return any(_looks_like_codex_oauth_token(str(value or "")) for value in values) def _write_env_file(env_path: Path, updates: dict[str, str | None]) -> None: """Write key=value pairs to the .env file. Values of ``None`` cause the key to be removed. Preserves comments, blank lines, and original key order (#1164). New keys are appended at the end of the file with a blank-line separator. Holds ``_ENV_LOCK`` from ``api.streaming`` for the entire load → modify → write cycle to prevent TOCTOU races between concurrent POST /api/providers calls (each reading the same file baseline and overwriting the other's key). Also serialises os.environ mutations with streaming sessions. """ from api.streaming import _ENV_LOCK import stat as _stat with _ENV_LOCK: # ── Read existing lines (preserving comments and blank lines) ── existing_lines: list[str] = [] if env_path.exists(): try: existing_lines = env_path.read_text(encoding="utf-8").splitlines() except Exception: existing_lines = [] # Map each existing key to its line index so we can update in-place. existing_key_indices: dict[str, int] = {} for _i, _raw in enumerate(existing_lines): _stripped = _raw.strip() if _stripped and not _stripped.startswith("#") and "=" in _stripped: _existing_key_indices_key = _stripped.split("=", 1)[0].strip() existing_key_indices[_existing_key_indices_key] = _i output_lines = list(existing_lines) new_keys: list[str] = [] for key, value in updates.items(): if value is None: # Mark the line for removal (None sentinel) and clear env. os.environ.pop(key, None) if key in existing_key_indices: output_lines[existing_key_indices[key]] = None # type: ignore[assignment] continue clean = str(value).strip() if not clean: continue # Reject embedded newlines/carriage returns to prevent .env injection if "\n" in clean or "\r" in clean: raise ValueError("API key must not contain newline characters.") os.environ[key] = clean if key in existing_key_indices: output_lines[existing_key_indices[key]] = f"{key}={clean}" else: new_keys.append(f"{key}={clean}") # Remove deleted lines (None sentinels) output_lines = [l for l in output_lines if l is not None] # Append new keys after a blank-line separator if new_keys: if output_lines and output_lines[-1].strip() != "": output_lines.append("") output_lines.extend(new_keys) env_path.parent.mkdir(parents=True, exist_ok=True) content = "\n".join(output_lines) if content: content += "\n" # Atomic write via tempfile + os.replace so cross-process readers # (Telegram bot, CLI) never see a half-truncated file. The shared # ``~/.hermes/.env`` is also written by ``hermes_cli.config.save_env_value`` # using the same atomic pattern; matching it here closes the # cross-process leg of #1164 (within-process is covered by _ENV_LOCK). _mode = _stat.S_IRUSR | _stat.S_IWUSR # 0o600 import tempfile as _tempfile _tmp_fd, _tmp_path = _tempfile.mkstemp( dir=str(env_path.parent), prefix=".env_", suffix=".tmp" ) try: with os.fdopen(_tmp_fd, "w", encoding="utf-8") as _f: _f.write(content) _f.flush() os.fsync(_f.fileno()) os.chmod(_tmp_path, _mode) # tighten before rename so readers see 0600 os.replace(_tmp_path, env_path) except BaseException: try: os.unlink(_tmp_path) except OSError: pass raise try: env_path.chmod(_mode) except OSError: pass def _provider_has_key(provider_id: str) -> bool: """Check whether a provider has a configured API key. Checks (in order): 1. ``~/.hermes/.env`` for the known env var 2. ``os.environ`` for the known env var 3. ``config.yaml → model.api_key`` (only if provider is the active one) 4. ``config.yaml → providers..api_key`` 5. ``config.yaml → custom_providers[].api_key`` (for custom providers) """ env_var = _PROVIDER_ENV_VAR.get(provider_id) if env_var: env_path = _get_hermes_home() / ".env" env_values = _load_env_file(env_path) env_file_value = env_values.get(env_var) if _provider_value_counts_as_api_key(provider_id, env_file_value): return True env_value = os.getenv(env_var) if _provider_value_counts_as_api_key(provider_id, env_value): return True # Fall back to legacy env-var aliases (e.g. lmstudio's pre-#1500 # LMSTUDIO_API_KEY name) so existing users don't lose detection # after an env-var rename. See _PROVIDER_ENV_VAR_ALIASES. for alias in _PROVIDER_ENV_VAR_ALIASES.get(provider_id, ()) or (): if _provider_value_counts_as_api_key(provider_id, env_values.get(alias)): return True if _provider_value_counts_as_api_key(provider_id, os.getenv(alias)): return True cfg = get_config() # Check model.api_key — only match if this provider is the active one. # Previously this checked globally, causing all providers to show # "configured" when the active provider had a top-level api_key. model_cfg = cfg.get("model", {}) if isinstance(model_cfg, dict) and str(model_cfg.get("api_key") or "").strip(): active_provider = model_cfg.get("provider") if active_provider and str(active_provider).strip().lower() == provider_id.lower(): if _provider_value_counts_as_api_key(provider_id, model_cfg.get("api_key")): return True # Check providers..api_key providers_cfg = cfg.get("providers", {}) if isinstance(providers_cfg, dict): provider_cfg = providers_cfg.get(provider_id, {}) if isinstance(provider_cfg, dict) and str(provider_cfg.get("api_key") or "").strip(): if _provider_value_counts_as_api_key(provider_id, provider_cfg.get("api_key")): return True # Check custom_providers custom_providers = cfg.get("custom_providers", []) if isinstance(custom_providers, list): for cp in custom_providers: if isinstance(cp, dict): if _custom_provider_name_matches(provider_id, cp.get("name")): if _provider_value_counts_as_api_key(provider_id, cp.get("api_key")): return True return False def _get_provider_api_key(provider_id: str) -> str | None: """Return a configured provider API key without exposing it to callers.""" provider_id = (provider_id or "").strip().lower() env_var = _PROVIDER_ENV_VAR.get(provider_id) if env_var: env_path = _get_hermes_home() / ".env" env_values = _load_env_file(env_path) env_file_value = env_values.get(env_var) if _provider_value_counts_as_api_key(provider_id, env_file_value): return str(env_file_value).strip() or None env_value = os.getenv(env_var) if _provider_value_counts_as_api_key(provider_id, env_value): return str(env_value).strip() or None for alias in _PROVIDER_ENV_VAR_ALIASES.get(provider_id, ()) or (): alias_file_value = env_values.get(alias) if _provider_value_counts_as_api_key(provider_id, alias_file_value): return str(alias_file_value).strip() or None alias_value = os.getenv(alias) if _provider_value_counts_as_api_key(provider_id, alias_value): return str(alias_value).strip() or None cfg = get_config() model_cfg = cfg.get("model", {}) if isinstance(model_cfg, dict): active_provider = str(model_cfg.get("provider") or "").strip().lower() model_key = str(model_cfg.get("api_key") or "").strip() if model_key and active_provider == provider_id and _provider_value_counts_as_api_key(provider_id, model_key): return model_key providers_cfg = cfg.get("providers", {}) if isinstance(providers_cfg, dict): provider_cfg = providers_cfg.get(provider_id, {}) if isinstance(provider_cfg, dict): provider_key = str(provider_cfg.get("api_key") or "").strip() if _provider_value_counts_as_api_key(provider_id, provider_key): return provider_key custom_providers = cfg.get("custom_providers", []) if isinstance(custom_providers, list): for cp in custom_providers: if not isinstance(cp, dict): continue if _custom_provider_name_matches(provider_id, cp.get("name")): cp_key = str(cp.get("api_key") or "").strip() if cp_key.startswith("${") and cp_key.endswith("}"): return os.getenv(cp_key[2:-1], "").strip() or None if _provider_value_counts_as_api_key(provider_id, cp_key): return cp_key return None def _active_provider_id() -> str | None: cfg = get_config() model_cfg = cfg.get("model", {}) if not isinstance(model_cfg, dict): return None provider = str(model_cfg.get("provider") or "").strip().lower() return provider or None def _quota_number(value: Any) -> int | float | None: if isinstance(value, bool) or value is None: return None if isinstance(value, (int, float)): return value try: text = str(value).strip() if not text: return None number = float(text) return int(number) if number.is_integer() else number except (TypeError, ValueError): return None def _sanitize_openrouter_quota(payload: Any) -> dict[str, int | float | None]: if isinstance(payload, dict) and isinstance(payload.get("data"), dict): payload = payload["data"] if not isinstance(payload, dict): payload = {} return { "limit_remaining": _quota_number(payload.get("limit_remaining")), "usage": _quota_number(payload.get("usage")), "limit": _quota_number(payload.get("limit")), } def _isoformat_utc(value: Any) -> str | None: if value in (None, ""): return None if isinstance(value, datetime): dt = value if value.tzinfo else value.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") text = str(value).strip() return text or None def _serialize_account_usage_snapshot(snapshot: Any) -> dict[str, Any] | None: if snapshot is None: return None windows: list[dict[str, Any]] = [] for window in getattr(snapshot, "windows", ()) or (): label = str(getattr(window, "label", "") or "").strip() if not label: continue used_percent = _quota_number(getattr(window, "used_percent", None)) remaining_percent = None if used_percent is not None: remaining_percent = max(0.0, min(100.0, 100.0 - float(used_percent))) windows.append({ "label": label, "used_percent": used_percent, "remaining_percent": remaining_percent, "reset_at": _isoformat_utc(getattr(window, "reset_at", None)), "detail": str(getattr(window, "detail", "") or "").strip() or None, }) details = [ str(detail).strip() for detail in (getattr(snapshot, "details", ()) or ()) if str(detail).strip() ] plan = str(getattr(snapshot, "plan", "") or "").strip() or None unavailable_reason = str(getattr(snapshot, "unavailable_reason", "") or "").strip() or None result = { "provider": str(getattr(snapshot, "provider", "") or "").strip() or None, "source": str(getattr(snapshot, "source", "") or "").strip() or None, "title": str(getattr(snapshot, "title", "") or "").strip() or "Account limits", "plan": plan, "windows": windows, "details": details, "available": bool(getattr(snapshot, "available", bool(windows or details))) and not unavailable_reason, "unavailable_reason": unavailable_reason, "fetched_at": _isoformat_utc(getattr(snapshot, "fetched_at", None)), } pool = getattr(snapshot, "pool", None) if isinstance(pool, dict): result["pool"] = pool return result def _agent_fetch_account_usage(provider: str, *, base_url: str | None = None, api_key: str | None = None) -> Any: from agent.account_usage import fetch_account_usage return fetch_account_usage(provider, base_url=base_url, api_key=api_key) def _account_usage_subprocess_env(home: Path, provider: str, api_key: str | None) -> dict[str, str]: env = dict(os.environ) env["HERMES_HOME"] = str(Path(home)) # Profile .env values should affect only the child quota probe, not the # WebUI process-global environment. This is especially important for # Anthropic account usage, where the agent resolver reads OAuth/API tokens # from environment variables. for key, value in _load_env_file(Path(home) / ".env").items(): if value: env[key] = value env_var = _PROVIDER_ENV_VAR.get((provider or "").strip().lower()) if env_var and api_key: env[env_var] = api_key try: from api.config import _AGENT_DIR except Exception: _AGENT_DIR = None pythonpath_parts: list[str] = [] if _AGENT_DIR: pythonpath_parts.append(str(_AGENT_DIR)) existing_pythonpath = env.get("PYTHONPATH", "") if existing_pythonpath: pythonpath_parts.append(existing_pythonpath) if pythonpath_parts: env["PYTHONPATH"] = os.pathsep.join(pythonpath_parts) return env def _account_usage_payload_to_snapshot(payload: Any) -> Any: if not isinstance(payload, dict): return None windows = tuple( SimpleNamespace( label=window.get("label"), used_percent=window.get("used_percent"), reset_at=window.get("reset_at"), detail=window.get("detail"), ) for window in (payload.get("windows") or ()) if isinstance(window, dict) ) return SimpleNamespace( provider=payload.get("provider"), source=payload.get("source"), title=payload.get("title"), plan=payload.get("plan"), windows=windows, details=tuple(payload.get("details") or ()), available=bool(payload.get("available")), unavailable_reason=payload.get("unavailable_reason"), fetched_at=payload.get("fetched_at"), pool=payload.get("pool") if isinstance(payload.get("pool"), dict) else None, ) def _account_usage_cache_key(provider: str, home: Path, api_key: str | None) -> tuple[str, str, str]: key_fingerprint = "" if api_key: key_fingerprint = hashlib.sha256(api_key.encode("utf-8", "ignore")).hexdigest() return ((provider or "").strip().lower(), str(Path(home)), key_fingerprint) def _get_cached_account_usage(cache_key: tuple[str, str, str]) -> tuple[bool, Any]: now = time.monotonic() with _account_usage_status_cache_lock: cached = _account_usage_status_cache.get(cache_key) if cached is None: return False, None fetched_at, snapshot = cached if now - fetched_at <= _ACCOUNT_USAGE_CACHE_TTL_SECONDS: return True, snapshot _account_usage_status_cache.pop(cache_key, None) return False, None def invalidate_account_usage_status_cache(provider_id: str | None = None) -> None: normalized = str(provider_id or "").strip().lower() with _account_usage_status_cache_lock: if not normalized: _account_usage_status_cache.clear() return for key in list(_account_usage_status_cache): if key[0] == normalized: _account_usage_status_cache.pop(key, None) def _set_cached_account_usage( cache_key: tuple[str, str, str], snapshot: Any, ) -> None: now = time.monotonic() with _account_usage_status_cache_lock: if snapshot is None: cached = _account_usage_status_cache.get(cache_key) if cached is not None and cached[1] is not None: return _account_usage_status_cache.pop(cache_key, None) return _account_usage_status_cache[cache_key] = (now, snapshot) expired = [ key for key, (fetched_at, _snapshot) in _account_usage_status_cache.items() if now - fetched_at > _ACCOUNT_USAGE_CACHE_TTL_SECONDS ] for key in expired: _account_usage_status_cache.pop(key, None) while len(_account_usage_status_cache) > _ACCOUNT_USAGE_CACHE_MAX_ENTRIES: oldest_key = min( _account_usage_status_cache, key=lambda key: _account_usage_status_cache[key][0], ) _account_usage_status_cache.pop(oldest_key, None) def _agent_fetch_account_usage_for_home(provider: str, home: Path, *, api_key: str | None = None) -> Any: try: from api.config import PYTHON_EXE except Exception: PYTHON_EXE = sys.executable or "python3" try: # On POSIX (Linux/macOS), wire parent-death signal so the child dies # cleanly if the WebUI parent terminates. preexec_fn is not safe on # Windows, where OS-level process-tree cleanup handles child orphans. kwargs: dict[str, Any] = { "stdin": subprocess.DEVNULL, "stdout": subprocess.PIPE, "stderr": subprocess.PIPE, "text": True, "timeout": _ACCOUNT_USAGE_SUBPROCESS_TIMEOUT_SECONDS, "check": False, } if hasattr(os, "fork"): # POSIX kwargs["preexec_fn"] = _account_usage_preexec_fn proc = subprocess.run( [ PYTHON_EXE, "-c", _ACCOUNT_USAGE_PARENT_DEATHSIG_BOOTSTRAP + _ACCOUNT_USAGE_SUBPROCESS_CODE, provider, api_key or "", ], env=_account_usage_subprocess_env(home, provider, api_key), **kwargs, ) except subprocess.TimeoutExpired: logger.debug("Account usage probe for %s timed out", provider) return None except Exception: logger.debug("Account usage probe for %s failed to launch", provider, exc_info=True) return None if proc.returncode != 0: logger.debug("Account usage probe for %s exited with status %s", provider, proc.returncode) return None try: payload = json.loads((proc.stdout or "").strip() or "null") except json.JSONDecodeError: logger.debug("Account usage probe for %s returned invalid JSON", provider) return None return _account_usage_payload_to_snapshot(payload) def _fetch_account_usage_with_profile_context(provider: str, *, refresh: bool = False) -> Any: """Fetch account usage for a provider within the active profile context. Concurrency is capped by the module-level BoundedSemaphore so that rapid UI polls (e.g. Settings page refresh) cannot exhaust file-descriptors or memory by spawning more than _MAX_CONCURRENT_ACCOUNT_USAGE_PROBES probe subprocesses simultaneously. Each probe runs up to 35 s. A warm worker-pool (reuse of persistent subprocess handles) is a natural follow-up if this first slice proves insufficient in production. """ home = _get_hermes_home() api_key = _get_provider_api_key(provider) cache_key = _account_usage_cache_key(provider, home, api_key) if not refresh: cache_hit, cached = _get_cached_account_usage(cache_key) if cache_hit: return cached sem = _get_account_usage_probe_semaphore() try: with sem: snapshot = _agent_fetch_account_usage_for_home( provider, home, api_key=api_key, ) _set_cached_account_usage(cache_key, snapshot) return snapshot except Exception: logger.debug("Failed to fetch account usage for %s", provider, exc_info=True) _set_cached_account_usage(cache_key, None) return None def _provider_account_usage_status(provider: str, display_name: str, *, refresh: bool = False) -> dict[str, Any]: snapshot = _fetch_account_usage_with_profile_context(provider, refresh=refresh) account_limits = _serialize_account_usage_snapshot(snapshot) if account_limits and account_limits.get("available"): return { "ok": True, "provider": provider, "display_name": display_name, "supported": True, "status": "available", "label": account_limits.get("title") or "Account limits", "quota": None, "account_limits": account_limits, "message": f"{display_name} account limits loaded.", } reason = "" if account_limits: reason = str(account_limits.get("unavailable_reason") or "").strip() message = ( f"{display_name} account limits are unavailable. {reason}" if reason else f"{display_name} account limits are unavailable. Confirm provider authentication and try again." ) return { "ok": False, "provider": provider, "display_name": display_name, "supported": True, "status": "unavailable", "quota": None, "account_limits": account_limits, "message": message, } def get_provider_quota(provider_id: str | None = None, *, refresh: bool = False) -> dict[str, Any]: """Return sanitized quota/rate-limit status for the active provider. OpenRouter keeps its documented key endpoint. OAuth-backed account usage providers reuse Hermes Agent's /usage account-limits abstraction so WebUI stays aligned with CLI/Gateway provider semantics. """ provider = (provider_id or _active_provider_id() or "").strip().lower() if not provider: return { "ok": False, "provider": None, "display_name": None, "supported": False, "status": "unavailable", "quota": None, "message": "No active provider is configured.", } display_name = _PROVIDER_DISPLAY.get(provider, provider.replace("-", " ").title()) if provider in _ACCOUNT_USAGE_PROVIDERS: return _provider_account_usage_status(provider, display_name, refresh=refresh) if provider != "openrouter": detail = "OpenAI/Anthropic rate-limit headers are a follow-up once WebUI captures provider response metadata." return { "ok": False, "provider": provider, "display_name": display_name, "supported": False, "status": "unsupported", "quota": None, "message": f"Quota status is not available for {display_name}. {detail}", } api_key = _get_provider_api_key("openrouter") if not api_key: return { "ok": False, "provider": "openrouter", "display_name": display_name, "supported": True, "status": "no_key", "quota": None, "message": "OpenRouter quota status needs an OPENROUTER_API_KEY configured on the server.", } req = urllib.request.Request( _OPENROUTER_KEY_URL, headers={ "Authorization": f"Bearer {api_key}", "Accept": "application/json", }, ) try: with urllib.request.urlopen(req, timeout=_PROVIDER_QUOTA_TIMEOUT_SECONDS) as resp: raw = resp.read() payload = json.loads(raw.decode("utf-8")) if isinstance(raw, (bytes, bytearray)) else json.loads(raw) quota = _sanitize_openrouter_quota(payload) return { "ok": True, "provider": "openrouter", "display_name": display_name, "supported": True, "status": "available", "label": "OpenRouter credits", "quota": quota, "message": "OpenRouter quota status loaded.", } except urllib.error.HTTPError as exc: status = "invalid_key" if exc.code in (401, 403) else "unavailable" message = ( "OpenRouter rejected the configured API key." if status == "invalid_key" else "OpenRouter quota status is temporarily unavailable." ) return { "ok": False, "provider": "openrouter", "display_name": display_name, "supported": True, "status": status, "quota": None, "message": message, } except (TimeoutError, urllib.error.URLError, json.JSONDecodeError, OSError, ValueError): return { "ok": False, "provider": "openrouter", "display_name": display_name, "supported": True, "status": "unavailable", "quota": None, "message": "OpenRouter quota status is temporarily unavailable.", } def _provider_is_oauth(provider_id: str) -> bool: """Check whether a provider uses OAuth/token flows (managed by CLI).""" return provider_id in _OAUTH_PROVIDERS # ── OpenRouter cost-history snapshot helpers (#692) ────────────────────────── _COST_SNAPSHOTS_DIR_NAME = "cost-snapshots" _COST_SNAPSHOT_MAX_DAYS = 365 # hard cap to prevent unbounded growth _COST_SNAPSHOT_LOCK = threading.Lock() def _cost_snapshots_dir() -> Path: """Return the directory for cost-snapshot JSON files. Uses the Hermes home directory (profile-aware) so snapshots are isolated per profile, matching the existing STATE_DIR convention. """ return _get_hermes_home() / _COST_SNAPSHOTS_DIR_NAME @contextmanager def _cost_snapshot_file_lock(provider: str): """Serialize cost snapshot read-modify-write across worker processes.""" if fcntl is None: with nullcontext(): yield return snap_dir = _cost_snapshots_dir() snap_dir.mkdir(parents=True, exist_ok=True) lock_path = snap_dir / f"{provider}.lock" with lock_path.open("a", encoding="utf-8") as lock_file: fcntl.flock(lock_file, fcntl.LOCK_EX) yield def _fetch_openrouter_key_usage(api_key: str) -> dict[str, Any] | None: """Fetch current usage/limit from the OpenRouter ``/auth/key`` endpoint. Returns a dict with ``usage``, ``limit``, ``label`` on success, or ``None`` on any failure. Never raises; callers handle the None case. """ req = urllib.request.Request( _OPENROUTER_KEY_URL, headers={ "Authorization": f"Bearer {api_key}", "Accept": "application/json", }, ) try: with urllib.request.urlopen(req, timeout=_PROVIDER_QUOTA_TIMEOUT_SECONDS) as resp: raw = resp.read() payload = json.loads(raw.decode("utf-8") if isinstance(raw, (bytes, bytearray)) else raw) sanitized = _sanitize_openrouter_quota(payload) label = None if isinstance(payload, dict): data = payload.get("data", payload) if isinstance(data, dict): label = str(data.get("label") or "").strip() or None return { "usage": sanitized.get("usage"), "limit": sanitized.get("limit"), "label": label, } except Exception: logger.debug("OpenRouter key usage fetch failed for cost-history", exc_info=True) return None def _read_cost_snapshots(provider: str) -> list[dict[str, Any]]: """Read persisted daily snapshots for *provider* from disk. Returns a list of ``{date, used, limit}`` dicts sorted by date ascending. Returns an empty list if the file does not exist or is corrupt. """ path = _cost_snapshots_dir() / f"{provider}.json" if not path.exists(): return [] try: raw = path.read_text(encoding="utf-8") data = json.loads(raw) except (OSError, json.JSONDecodeError, ValueError): return [] if not isinstance(data, dict): return [] snapshots = data.get("snapshots") if not isinstance(snapshots, list): return [] # Validate and sort valid = [] for entry in snapshots: if not isinstance(entry, dict): continue date = str(entry.get("date") or "").strip() if not date: continue valid.append({ "date": date, "used": _quota_number(entry.get("used")), "limit": _quota_number(entry.get("limit")), }) valid.sort(key=lambda e: e["date"]) return valid def _write_cost_snapshots(provider: str, snapshots: list[dict[str, Any]]) -> None: """Persist daily snapshots for *provider* to disk atomically.""" snap_dir = _cost_snapshots_dir() snap_dir.mkdir(parents=True, exist_ok=True) path = snap_dir / f"{provider}.json" payload = {"provider": provider, "snapshots": snapshots} body = json.dumps(payload, ensure_ascii=False, indent=2) import tempfile as _tempfile _tmp_fd, _tmp_path = _tempfile.mkstemp( dir=str(snap_dir), prefix=f".{provider}_", suffix=".tmp" ) try: with os.fdopen(_tmp_fd, "w", encoding="utf-8") as _f: _f.write(body) _f.flush() os.fsync(_f.fileno()) os.replace(_tmp_path, path) except BaseException: try: os.unlink(_tmp_path) except OSError: pass raise def _append_cost_snapshot(provider: str, usage: int | float | None, limit: int | float | None) -> list[dict[str, Any]]: """Append today's snapshot and return the updated list. If a snapshot for today already exists it is updated in-place so repeated calls within the same day are idempotent. """ today = datetime.now(timezone.utc).strftime("%Y-%m-%d") # Serialize the read-modify-write cycle. The atomic os.replace in # _write_cost_snapshots protects the file write itself, but without these # locks two concurrent requests can both read the same old snapshot list and # race to replace it with stale data. The threading lock covers current # single-process deployments; the file lock covers future multi-worker # deployments that share one Hermes home/state directory. with _COST_SNAPSHOT_LOCK: with _cost_snapshot_file_lock(provider): snapshots = _read_cost_snapshots(provider) # Update or append today's entry updated = False for entry in snapshots: if entry["date"] == today: entry["used"] = usage entry["limit"] = limit updated = True break if not updated: snapshots.append({"date": today, "used": usage, "limit": limit}) snapshots.sort(key=lambda e: e["date"]) # Cap to _COST_SNAPSHOT_MAX_DAYS entries (keep most recent) if len(snapshots) > _COST_SNAPSHOT_MAX_DAYS: snapshots = snapshots[-_COST_SNAPSHOT_MAX_DAYS:] _write_cost_snapshots(provider, snapshots) return snapshots def _compute_deltas(snapshots: list[dict[str, Any]], window_days: int) -> list[dict[str, Any]]: """Compute daily deltas from cumulative usage snapshots. Each snapshot carries cumulative ``used``; the delta for a day is the difference between that day's cumulative value and the previous day's. The oldest day in the window has ``delta=None`` (no previous baseline). If the cumulative value drops, treat that day as the start of a fresh series (for example after an API-key rotation) and use the current value as that day's delta instead of emitting a negative spend bar. """ # Take only the last *window_days* entries window = snapshots[-window_days:] if len(snapshots) > window_days else list(snapshots) result: list[dict[str, Any]] = [] for i, entry in enumerate(window): delta = None if i > 0 and entry.get("used") is not None and window[i - 1].get("used") is not None: delta = float(entry["used"]) - float(window[i - 1]["used"]) if delta < 0: delta = float(entry["used"]) # Rounding: avoid -0.0 and tiny floating-point noise if abs(delta) < 1e-9: delta = 0.0 else: delta = round(delta, 6) result.append({ "date": entry["date"], "used": entry.get("used"), "delta": delta, }) return result def get_provider_cost_history(provider_id: str | None = None, days: int = 7) -> dict[str, Any]: """Return daily cost-history snapshots with deltas for a provider. Currently only ``openrouter`` is supported. On each call the endpoint fetches the current cumulative usage from the OpenRouter ``/auth/key`` endpoint, appends/updates today's snapshot, and returns the last *days* snapshots with per-day deltas. Returns a dict matching the existing API style (``ok``, ``provider``, ``status``, ``message``, …). """ provider = (provider_id or "").strip().lower() if not provider: return { "ok": False, "provider": None, "status": "missing_provider", "message": "Provider parameter is required. Use ?provider=openrouter", } if provider != "openrouter": display_name = _PROVIDER_DISPLAY.get(provider, provider.replace("-", " ").title()) return { "ok": False, "provider": provider, "display_name": display_name, "supported": False, "status": "unsupported", "message": f"Cost history is not available for {display_name}. Only openrouter is supported in this release.", } display_name = _PROVIDER_DISPLAY.get("openrouter", "OpenRouter") api_key = _get_provider_api_key("openrouter") if not api_key: return { "ok": False, "provider": "openrouter", "display_name": display_name, "supported": True, "status": "no_key", "message": "OpenRouter cost history needs an OPENROUTER_API_KEY configured on the server.", } # Fetch current cumulative usage from OpenRouter key_info = _fetch_openrouter_key_usage(api_key) if key_info is None: # Upstream failure — still return any previously persisted snapshots # so the chart degrades gracefully instead of going blank. snapshots = _read_cost_snapshots("openrouter") deltas = _compute_deltas(snapshots, days) return { "ok": False, "provider": "openrouter", "display_name": display_name, "supported": True, "status": "unavailable", "window_days": days, "snapshots": deltas, "limit": None, "label": None, "message": "OpenRouter cost history is temporarily unavailable. Showing last known data.", } # Persist today's snapshot try: snapshots = _append_cost_snapshot("openrouter", key_info["usage"], key_info["limit"]) except Exception: logger.debug("Failed to persist cost snapshot for openrouter", exc_info=True) snapshots = _read_cost_snapshots("openrouter") deltas = _compute_deltas(snapshots, days) return { "ok": True, "provider": "openrouter", "display_name": display_name, "supported": True, "status": "available", "window_days": days, "snapshots": deltas, "limit": key_info.get("limit"), "label": key_info.get("label") or "OpenRouter credits", "message": "OpenRouter cost history loaded.", } # SECTION: Public API def get_providers() -> dict[str, Any]: """Return a list of all known providers with their configuration status. Each entry contains: - ``id``: canonical provider slug - ``display_name``: human-readable name - ``has_key``: whether an API key is configured - ``configurable``: whether the key can be set from the WebUI - ``key_source``: where the key was found (``env_file``, ``env_var``, ``config_yaml``, ``oauth``, ``none``) - ``models``: list of known model IDs for this provider """ providers = [] # Collect all known provider IDs from multiple sources known_ids = set(_PROVIDER_DISPLAY.keys()) | set(_PROVIDER_MODELS.keys()) # Also detect providers from config.yaml providers section cfg = get_config() providers_cfg = cfg.get("providers", {}) if isinstance(providers_cfg, dict): known_ids.update(providers_cfg.keys()) # Add OAuth providers even if not in _PROVIDER_DISPLAY known_ids.update(_OAUTH_PROVIDERS) for pid in sorted(known_ids): display_name = _PROVIDER_DISPLAY.get(pid, pid.replace("-", " ").title()) is_oauth = _provider_is_oauth(pid) has_key = _provider_has_key(pid) # Determine key source key_source = "none" auth_error = None if is_oauth: key_source = "oauth" # Check if actually authenticated via hermes_cli. # IMPORTANT: do not unconditionally overwrite has_key from _provider_has_key(). # A token in config.yaml is a valid credential even when get_auth_status() # returns logged_in=False (e.g. token not in the hermes credential pool, # or refresh token consumed by native Codex CLI / VS Code extension). try: from hermes_cli.auth import get_auth_status as _gas status = _gas(pid) if isinstance(status, dict) and status.get("logged_in"): has_key = True key_source = status.get("key_source", "oauth") elif has_key: # _provider_has_key() found a token in config.yaml — respect it # rather than hiding a working credential from the Settings UI. key_source = "config_yaml" auth_error = status.get("error") if isinstance(status, dict) else None else: has_key = False auth_error = status.get("error") if isinstance(status, dict) else None except Exception: # Import failed or auth check errored — don't override a known-good # key just because the hermes_cli auth module is unavailable. logger.debug("hermes_cli auth check failed for %s", pid, exc_info=True) # keep has_key from _provider_has_key() elif has_key: env_var = _PROVIDER_ENV_VAR.get(pid) if env_var: env_path = _get_hermes_home() / ".env" env_values = _load_env_file(env_path) if _provider_value_counts_as_api_key(pid, env_values.get(env_var)): key_source = "env_file" elif _provider_value_counts_as_api_key(pid, os.getenv(env_var)): key_source = "env_var" else: # Canonical name not set; check legacy aliases (e.g. lmstudio's # pre-#1500 LMSTUDIO_API_KEY) so existing users see "env_file" # instead of being misreported as "config_yaml" when the key # actually lives in .env under the old name. aliased = False for alias in _PROVIDER_ENV_VAR_ALIASES.get(pid, ()) or (): if _provider_value_counts_as_api_key(pid, env_values.get(alias)): key_source = "env_file" aliased = True break if _provider_value_counts_as_api_key(pid, os.getenv(alias)): key_source = "env_var" aliased = True break if not aliased: key_source = "config_yaml" else: key_source = "config_yaml" elif pid not in _PROVIDER_ENV_VAR: # Fallback: provider is not a known API-key provider and not in # the hardcoded _OAUTH_PROVIDERS set. It may be a custom or # newly-added OAuth provider (e.g. Anthropic connected via OAuth). # Check live auth status so the Providers tab agrees with the # model picker (#1212). # # IMPORTANT: we skip providers in _PROVIDER_ENV_VAR because they # are pure API-key providers — calling get_auth_status() for every # unconfigured API-key provider would add unnecessary latency # (network round-trip per provider) on the Settings page. # Validate pid looks like a real provider before probing import re as _re if _re.match(r'^[a-z][a-z0-9_-]{0,63}$', pid): try: from hermes_cli.auth import get_auth_status as _gas status = _gas(pid) if isinstance(status, dict) and status.get("logged_in"): has_key = True # Constrain key_source to a known-safe closed set _raw_ks = status.get("key_source", "") key_source = _raw_ks if _raw_ks in {"oauth", "env", "config", "token"} else "oauth" is_oauth = True except Exception: pass if pid == "openai" and not has_key and _provider_has_shadowed_codex_oauth_value(pid): continue models = list(_PROVIDER_MODELS.get(pid, [])) models_total = len(models) # OpenAI Codex account catalogs drift independently from WebUI releases. # The model picker already prefers hermes_cli + Codex local cache for # this provider (the agent's `provider_model_ids("openai-codex")` filters # IDs with `supported_in_api: false`, but Codex CLI still surfaces some # of those — notably `gpt-5.3-codex-spark` from #1680 — in its picker). # Merge both sources here so the providers card matches the picker # exactly. Static entries remain the offline fallback when live # discovery and the local Codex cache are both unavailable. (#1807 # follow-up to v0.51.19 #1812.) if pid == "openai-codex": live_ids = _read_live_provider_model_ids("openai-codex") for mid in _read_visible_codex_cache_model_ids(): if mid not in live_ids: live_ids.append(mid) live_models = _models_from_live_provider_ids(pid, live_ids) if live_models: models = live_models models_total = len(models) if pid == "xai-oauth": live_models = _models_from_live_provider_ids( pid, _read_live_provider_model_ids("xai-oauth"), ) if live_models: models = live_models models_total = len(models) # Nous Portal: prefer the live catalog so the providers card matches # the dropdown picker (#1538). Same fallback shape as the static-only # case below — when hermes_cli is unavailable or its lookup raises, # we keep the four-entry curated list. # # On large-tier accounts (#1567 reporter Deor saw 396 entries), we # render the same featured subset the picker uses so the providers # card body doesn't become a 396-pill wall. The full count is still # reported via models_total — surfaced in the header line as # "396 models · OAuth" by static/panels.js — so the user knows the # complete catalog is reachable (via /model autocomplete or a future # "show all" disclosure if added). if pid == "nous": try: from hermes_cli.models import provider_model_ids as _provider_model_ids live_ids = _provider_model_ids("nous") or [] if live_ids: # Lazy-import to avoid circular dep with api.config. from api.config import _format_nous_label, _build_nous_featured_set featured_ids, _extras = _build_nous_featured_set(live_ids) models = [ {"id": f"@nous:{mid}", "label": _format_nous_label(mid)} for mid in featured_ids ] models_total = len(live_ids) except Exception: logger.debug("Failed to load Nous Portal models from hermes_cli") # LM Studio: fetch live locally-loaded models so the providers card # matches what's actually available on the user's server (#WebUI). if pid == "lmstudio": try: from hermes_cli.models import provider_model_ids as _pmi lm_live = _pmi("lmstudio") or [] if lm_live: models = [{"id": mid, "label": mid} for mid in lm_live] models_total = len(models) except Exception: logger.debug("Failed to load LM Studio models from hermes_cli") # Also include models from config.yaml providers section if isinstance(providers_cfg, dict): provider_cfg = providers_cfg.get(pid, {}) if isinstance(provider_cfg, dict) and "models" in provider_cfg: cfg_models = provider_cfg["models"] if isinstance(cfg_models, dict): models = models + [{"id": k, "label": k} for k in cfg_models.keys()] elif isinstance(cfg_models, list): models = models + [{"id": k, "label": k} for k in cfg_models] # Recompute models_total when config.yaml contributes additional # entries on top of the live/static catalog. For non-Nous # providers models_total still equals len(models); for Nous # we keep the live count (which already includes any models # surfaced in the curated featured slice). if pid != "nous": models_total = len(models) providers.append({ "id": pid, "display_name": display_name, "has_key": has_key, "configurable": not is_oauth and pid in _PROVIDER_ENV_VAR, "is_oauth": is_oauth, "key_source": key_source, "auth_error": auth_error, "models": models, # models_total reflects the complete catalog size (e.g. 396 for # an enterprise Nous Portal account), even when "models" is # trimmed to a featured subset for UI scannability. The frontend # uses this for the header text "396 models · OAuth" so users # know the full catalog exists and is reachable via the slash # command. For providers that don't trim, models_total == # len(models) and the frontend behaves identically to before. "models_total": models_total, }) # Scan custom_providers from config.yaml (e.g. glmcode, timicc) custom_providers_cfg = cfg.get("custom_providers", []) if isinstance(custom_providers_cfg, list): for cp in custom_providers_cfg: if not isinstance(cp, dict) or not cp.get("name"): continue cp_name = str(cp["name"]).strip() cp_id = _custom_provider_slug_from_name(cp_name) if not cp_id: logger.warning( "Custom provider entry %r produced empty slug; skipping", cp_name, ) continue # Collect models from `models` list or `model` single cp_models = [] if isinstance(cp.get("models"), list): cp_models = [{"id": str(m), "label": str(m)} for m in cp["models"]] elif cp.get("model"): cp_models = [{"id": cp["model"], "label": cp["model"]}] # Check for env var reference (${VAR_NAME} pattern) cp_api_key = str(cp.get("api_key") or "") cp_has_key = bool(cp_api_key.strip()) # Replace env var reference to check actual value if cp_api_key.startswith("${") and cp_api_key.endswith("}"): env_var = cp_api_key[2:-1] cp_has_key = bool(os.getenv(env_var, "").strip()) providers.append({ "id": cp_id, "display_name": cp_name, "has_key": cp_has_key, "configurable": False, # custom providers managed via config.yaml "is_custom": True, "key_source": "config_yaml" if cp_has_key else "none", "models": cp_models, "models_total": len(cp_models), }) # Determine active provider active_provider = None model_cfg = cfg.get("model", {}) if isinstance(model_cfg, dict): active_provider = model_cfg.get("provider") # Sort providers: active first, then custom:*, then has_key, then rest. def _provider_sort_key(p): pid = p.get("id") or "" if pid == active_provider: return (0, pid) if pid.startswith("custom:"): return (1, pid) if p.get("has_key"): return (2, pid) return (3, pid) providers.sort(key=_provider_sort_key) return { "providers": providers, "active_provider": active_provider, } def set_provider_key(provider_id: str, api_key: str | None) -> dict[str, Any]: """Set or update the API key for a provider. Writes the key to ``~/.hermes/.env`` using the standard env var name. If ``api_key`` is None or empty, the key is removed. Returns a status dict with the operation result. """ provider_id = provider_id.strip().lower() if not provider_id: return {"ok": False, "error": "Provider ID is required."} if _provider_is_oauth(provider_id): return { "ok": False, "error": f"'{_PROVIDER_DISPLAY.get(provider_id, provider_id)}' uses OAuth authentication. " f"Use `hermes model` in the terminal to configure it.", } env_var = _PROVIDER_ENV_VAR.get(provider_id) if not env_var: return { "ok": False, "error": f"Cannot configure API key for '{_PROVIDER_DISPLAY.get(provider_id, provider_id)}'. " f"This provider does not have a known env var mapping.", } # Validate API key format (basic sanity check) if api_key: api_key = api_key.strip() if "\n" in api_key or "\r" in api_key: return {"ok": False, "error": "API key must not contain newline characters."} if len(api_key) < 8: return {"ok": False, "error": "API key appears too short."} env_path = _get_hermes_home() / ".env" try: _write_env_file(env_path, {env_var: api_key}) except ValueError as exc: return {"ok": False, "error": str(exc)} except Exception as exc: logger.exception("Failed to write env file for provider %s", provider_id) return {"ok": False, "error": f"Failed to save API key: {exc}"} # Invalidate the model cache so the dropdown refreshes on next request. # Using invalidate_models_cache() instead of reload_config() to avoid # disrupting active streaming sessions that may be reading config.cfg. invalidate_models_cache() return { "ok": True, "provider": provider_id, "display_name": _PROVIDER_DISPLAY.get(provider_id, provider_id), "action": "updated" if api_key else "removed", } def remove_provider_key(provider_id: str) -> dict[str, Any]: """Remove the API key for a provider. Removes the key from ``~/.hermes/.env`` (via ``set_provider_key``) and also cleans up ``config.yaml`` if the key is stored there (``providers..api_key`` or top-level ``model.api_key`` when this provider is the active one). Returns a status dict with the operation result. """ result = set_provider_key(provider_id, None) # Even if the .env removal succeeded, the key might also live in # config.yaml (e.g. providers..api_key or model.api_key). # Clean those up so _provider_has_key() returns False after removal. if result.get("ok"): _clean_provider_key_from_config(provider_id) return result def _clean_provider_key_from_config(provider_id: str) -> None: """Remove provider API key entries from config.yaml. Handles three storage locations: 1. ``providers..api_key`` — per-provider key 2. ``model.api_key`` — top-level key (only if provider is active) 3. ``custom_providers[].api_key`` — custom provider entries Writes back to config.yaml only if something was actually removed. Uses ``_cfg_lock`` to prevent TOCTOU races. """ from api.config import _cfg_lock try: # Resolve through api.config at call time instead of the function imported # at module load. Several tests (and some profile flows) monkeypatch the # config module's path resolver after api.providers has already been # imported; using the stale imported reference can clean the wrong # config.yaml. import api.config as _config config_path = _config._get_config_path() except Exception: return if not config_path.exists(): return try: import yaml as _yaml changed = False with _cfg_lock: raw = config_path.read_text(encoding="utf-8") cfg = _yaml.safe_load(raw) if not isinstance(cfg, dict): return # 1. Clean providers..api_key providers_cfg = cfg.get("providers", {}) if isinstance(providers_cfg, dict): provider_cfg = providers_cfg.get(provider_id, {}) if isinstance(provider_cfg, dict) and provider_cfg.get("api_key"): del provider_cfg["api_key"] changed = True # 2. Clean model.api_key — only if this provider is the active one model_cfg = cfg.get("model", {}) if isinstance(model_cfg, dict) and model_cfg.get("api_key"): active_provider = model_cfg.get("provider") if active_provider and str(active_provider).strip().lower() == provider_id.lower(): del model_cfg["api_key"] changed = True # 3. Clean custom_providers[].api_key custom_providers = cfg.get("custom_providers", []) if isinstance(custom_providers, list): for cp in custom_providers: if isinstance(cp, dict): if _custom_provider_name_matches(provider_id, cp.get("name")): if cp.get("api_key"): del cp["api_key"] changed = True if changed: _save_yaml_config_file(config_path, cfg) # Sync in-memory cache and bust model TTL cache # MUST be called outside _cfg_lock to avoid deadlock: # _cfg_lock is a threading.Lock (non-reentrant) and # reload_config() also acquires _cfg_lock internally. if changed: reload_config() except Exception: logger.exception("Failed to clean provider key from config.yaml for %s", provider_id)