"""
Hermes Web UI -- Profile state management.
Wraps hermes_cli.profiles to provide profile switching for the web UI.
The web UI maintains a process-level "active profile" that determines which
HERMES_HOME directory is used for config, skills, memory, cron, and API keys.
Profile switches update os.environ['HERMES_HOME'] and monkey-patch module-level
cached paths in hermes-agent modules (skills_tool, skill_manager_tool,
cron/jobs) that snapshot HERMES_HOME at import time.
"""
import json
import logging
import os
import re
import shutil
import sys
import threading
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# ── Constants (match hermes_cli.profiles upstream) ─────────────────────────
# Profile identifiers: lowercase alnum start, then lowercase/digits/_/-, max 64 chars.
_PROFILE_ID_RE = re.compile(r'^[a-z0-9][a-z0-9_-]{0,63}$')

# Subdirectories bootstrapped inside every new profile home.
_PROFILE_DIRS = [
    'memories', 'sessions', 'skills', 'skins',
    'logs', 'plans', 'workspace', 'cron',
]

# Config files copied when cloning a profile with clone_config=True.
_CLONE_CONFIG_FILES = ['config.yaml', '.env', 'SOUL.md']

# ── Module state ────────────────────────────────────────────────────────────
_active_profile = 'default'  # process-level default profile name
_profile_lock = threading.Lock()  # guards process-wide profile switches
_loaded_profile_env_keys: set[str] = set()  # env keys loaded from the active profile's .env

# Thread-local profile context: set per-request by server.py, cleared after.
# Enables per-client profile isolation (issue #798) — each HTTP request thread
# reads its own profile from the hermes_profile cookie instead of the
# process-global _active_profile.
_tls = threading.local()

# Imported modules that snapshot HERMES_HOME at import time and need re-patching
# on every profile switch (see patch_skill_home_modules).
_SKILL_HOME_MODULES = ("tools.skills_tool", "tools.skill_manager_tool")
def patch_skill_home_modules(home: Path) -> None:
    """Re-point already-imported skill modules at *home*.

    The modules in _SKILL_HOME_MODULES snapshot HERMES_HOME at import time, so
    their cached paths must be rewritten after every profile switch. Modules
    that were never imported are skipped.
    """
    for mod_name in _SKILL_HOME_MODULES:
        mod = sys.modules.get(mod_name)
        if mod is None:
            continue
        try:
            mod.HERMES_HOME = home
            mod.SKILLS_DIR = home / "skills"
        except AttributeError:
            logger.debug("Failed to patch %s module", mod_name)
def _unwrap_profile_home_to_base(home: Path) -> Path:
"""Return the base Hermes home when *home* is already a named profile dir."""
if home.parent.name == 'profiles':
return home.parent.parent
return home
def _resolve_base_hermes_home() -> Path:
    """Return the BASE ~/.hermes directory — the root that contains profiles/.

    Intentionally distinct from HERMES_HOME, which tracks the *active
    profile's* home and changes on every profile switch; the base dir must
    always point to the top-level .hermes regardless of the active profile.

    Resolution order:
      1. HERMES_BASE_HOME env var (explicit override, highest priority)
      2. HERMES_HOME env var, unwrapped if it points inside profiles/ —
         handles both test isolation and single-profile WebUI deployments
         that provide /base/profiles/<name> directly.
      3. ~/.hermes (always-correct default)

    Both env vars pass through _unwrap_profile_home_to_base so active-profile
    and per-request resolution share one base-root contract (#749). Without
    the unwrap, a mutated HERMES_HOME of /base/profiles/webui would make
    switch_profile('webui') look for /base/profiles/webui/profiles/webui,
    which doesn't exist.
    """
    for env_name in ('HERMES_BASE_HOME', 'HERMES_HOME'):
        raw = os.getenv(env_name, '').strip()
        if raw:
            return _unwrap_profile_home_to_base(Path(raw).expanduser())
    return Path.home() / '.hermes'
# Snapshot the base home once at import; never mutated by profile switches.
_DEFAULT_HERMES_HOME = _resolve_base_hermes_home()
def _read_active_profile_file() -> str:
    """Read the sticky active profile from ~/.hermes/active_profile.

    Returns 'default' when the file is absent, empty, or unreadable.
    """
    ap_path = _DEFAULT_HERMES_HOME / 'active_profile'
    if ap_path.exists():
        try:
            stored = ap_path.read_text(encoding="utf-8").strip()
        except Exception:
            logger.debug("Failed to read active profile file")
        else:
            if stored:
                return stored
    return 'default'
# ── Public API ──────────────────────────────────────────────────────────────
# ── Root-profile resolution (#1612) ────────────────────────────────────────
#
# Hermes Agent allows the root/default profile (~/.hermes itself) to have a
# display name other than the legacy literal 'default'. When that happens,
# WebUI must NOT resolve the display name as ~/.hermes/profiles/<name> — that
# directory doesn't exist, and every site that does `if name == 'default':`
# will fall through to the wrong filesystem path.
#
# `_is_root_profile(name)` answers "does this name resolve to ~/.hermes?" and
# is the canonical replacement for scattered `if name == 'default':` checks
# in switch_profile, get_active_hermes_home, _validate_profile_name, etc.
#
# Cost note: list_profiles_api() shells out via hermes_cli (non-trivial), so
# we memoize the lookup. The cache is invalidated whenever profiles are
# created, deleted, renamed, or cloned — i.e. on every mutation site we
# control.
# Memoized set of display names that resolve to the root profile; the legacy
# 'default' alias is always a member. Guarded by the lock below.
_root_profile_name_cache: set[str] = {'default'}
_root_profile_name_cache_lock = threading.Lock()
_root_profile_name_cache_loaded = False  # True once populated from list_profiles_api()
def _invalidate_root_profile_cache() -> None:
    """Drop the memoized root-profile-name set.

    Invoked after any profile mutation (create, clone, delete, rename); the
    next _is_root_profile() call repopulates from list_profiles_api().
    """
    global _root_profile_name_cache_loaded
    with _root_profile_name_cache_lock:
        _root_profile_name_cache.clear()
        _root_profile_name_cache.update({'default'})
        _root_profile_name_cache_loaded = False
def _is_root_profile(name: str) -> bool:
    """True if *name* resolves to the Hermes Agent root profile (~/.hermes).

    Matches the legacy 'default' alias plus any name where list_profiles_api()
    reports is_default=True. Memoized; call _invalidate_root_profile_cache()
    after mutating profile metadata.
    """
    global _root_profile_name_cache_loaded
    if not name:
        return False
    if name == 'default':
        # Legacy alias: always the root profile, no lookup needed.
        return True
    with _root_profile_name_cache_lock:
        if _root_profile_name_cache_loaded:
            return name in _root_profile_name_cache
    # Cache miss — populate from list_profiles_api(). Done outside the lock to
    # avoid holding it across a hermes_cli subprocess call.
    try:
        infos = list_profiles_api()
    except Exception:
        # Listing failed: conservatively treat unknown names as non-root.
        logger.debug("Failed to list profiles for root-profile lookup", exc_info=True)
        return False
    with _root_profile_name_cache_lock:
        _root_profile_name_cache.clear()
        _root_profile_name_cache.add('default')
        for p in infos:
            try:
                if p.get('is_default') and p.get('name'):
                    _root_profile_name_cache.add(p['name'])
            except (AttributeError, TypeError):
                # Tolerate malformed (non-dict) rows from the CLI listing.
                continue
        _root_profile_name_cache_loaded = True
        return name in _root_profile_name_cache
def _profiles_match(row_profile, active_profile) -> bool:
"""Return True if a session/project row's profile matches the active profile.
Treats both the literal alias 'default' and any renamed-root display name
(per _is_root_profile) as equivalent, so legacy rows tagged 'default'
still surface when the user has renamed the root profile to e.g. 'kinni',
and vice versa.
A row with no profile (`None` or empty string) is treated as belonging to
the root profile — that's the convention used by the legacy backfill at
api/models.py::all_sessions, and matches the default seen in
`static/sessions.js` (`S.activeProfile||'default'`).
Originally lived in api/routes.py; relocated here so both routes.py and
out-of-process consumers (mcp_server.py) can import the canonical helper
instead of duplicating the body. See #1614 for the visibility model.
"""
row = row_profile or 'default'
active = active_profile or 'default'
if row == active:
return True
# Cross-alias the renamed root.
if _is_root_profile(row) and _is_root_profile(active):
return True
return False
def get_active_profile_name() -> str:
    """Return the currently active profile name.

    Per-request thread-local context (set from the hermes_profile cookie,
    issue #798) takes precedence; otherwise the process-level default
    _active_profile applies.
    """
    per_request = getattr(_tls, 'profile', None)
    return per_request if per_request is not None else _active_profile
def set_request_profile(name: str) -> None:
    """Bind *name* as this thread's per-request profile context.

    server.py calls this at the start of each request carrying a
    hermes_profile cookie, always paired with clear_request_profile() in a
    finally block so the thread-local is released after the request.
    """
    setattr(_tls, 'profile', name)
def clear_request_profile() -> None:
    """Release this thread's per-request profile context.

    Called from the finally block of do_GET / do_POST in server.py; harmless
    when set_request_profile() was never called on this thread.
    """
    setattr(_tls, 'profile', None)
def _resolve_profile_home_for_name(name: str) -> Path:
    """Resolve a logical profile name to its Hermes home path.

    Root/default aliases resolve to _DEFAULT_HERMES_HOME, as do invalid names
    — the latter so traversal-shaped cookie values cannot influence filesystem
    paths. Valid named profiles resolve to profiles/<name> even when the
    directory has not been created yet; the agent layer may create it lazily.
    """
    is_named = (
        bool(name)
        and not _is_root_profile(name)
        and bool(_PROFILE_ID_RE.fullmatch(name))
    )
    if not is_named:
        return _DEFAULT_HERMES_HOME
    return _resolve_named_profile_home(name)
def get_active_hermes_home() -> Path:
    """Return the HERMES_HOME path for the currently active profile.

    Resolves through get_active_profile_name(), so per-request TLS context
    (issue #798) takes precedence over the process-level global.
    """
    active = get_active_profile_name()
    return _resolve_profile_home_for_name(active)
# ── Cron-call profile isolation (issue: Scheduled jobs ignored active profile) ─
# `cron.jobs` reads HERMES_HOME from os.environ (process-global) at function-
# call time. That bypasses our per-request thread-local profile, so the
# `/api/crons*` endpoints always returned the process-default profile's jobs.
# This context manager swaps HERMES_HOME (and the cached module-level constants
# in cron.jobs) for the duration of a cron call, serialized by a lock so
# concurrent requests from different profiles don't race on the global env var.
#
# Thread-safety note on os.environ mutation:
# CPython's os.environ assignment is GIL-protected at the bytecode level, but
# multi-step read-modify-write sequences (snapshot prev → assign new → restore
# on exit) are NOT atomic without explicit serialization. The _cron_env_lock
# below makes the entire context-manager body run-to-completion serially, so
# all webui access to HERMES_HOME goes through one thread at a time. Any
# subprocess.Popen() call inside `run_job` inherits the env at fork time,
# which is also under the lock — so child processes always see a consistent
# (own-profile) HERMES_HOME, never a half-swapped state.
# Serializes every HERMES_HOME env swap performed by the cron context managers.
_cron_env_lock = threading.Lock()
def _cron_profile_context_depth() -> int:
    """Return this thread's current cron-profile-context nesting depth."""
    raw = getattr(_tls, 'cron_profile_depth', 0)
    return int(raw or 0)
def _push_cron_profile_context_depth() -> None:
    """Increment this thread's cron-profile-context nesting depth."""
    current = _cron_profile_context_depth()
    _tls.cron_profile_depth = current + 1
def _pop_cron_profile_context_depth() -> None:
    """Decrement this thread's cron-profile-context nesting depth (floor 0)."""
    current = _cron_profile_context_depth()
    _tls.cron_profile_depth = current - 1 if current > 0 else 0
def _home_for_scheduled_cron_job(job: dict) -> Path:
    """Resolve the profile home an auto-fired scheduler job should execute in.

    Legacy jobs with no profile keep the scheduler's server-default profile.
    Jobs pinned to a named profile execute under that profile's HERMES_HOME so
    an in-process WebUI scheduler thread does not leak process-global config
    or .env into the agent run. Invalid or since-deleted profiles fall back to
    the server default (with a warning) instead of crashing every tick.
    """
    job_dict = job or {}
    profile_name = str(job_dict.get('profile') or '').strip()
    if not profile_name:
        return get_active_hermes_home()
    if _is_root_profile(profile_name):
        return _DEFAULT_HERMES_HOME
    if not _PROFILE_ID_RE.fullmatch(profile_name):
        logger.warning(
            "Cron job %s has invalid profile %r; falling back to server default",
            job_dict.get('id', '?'), profile_name,
        )
        return get_active_hermes_home()
    home = _resolve_named_profile_home(profile_name)
    if home.is_dir():
        return home
    logger.warning(
        "Cron job %s references missing profile %r; falling back to server default",
        job_dict.get('id', '?'), profile_name,
    )
    return get_active_hermes_home()
def install_cron_scheduler_profile_isolation() -> None:
    """Patch cron.scheduler.run_job for WebUI in-process scheduler safety.

    Standard WebUI deployments do not start the scheduler thread in-process, but
    if a future/single-process deployment calls cron.scheduler.tick() from the
    WebUI worker, tick's background job path has no request TLS context. Wrap
    run_job so each auto-fired job's persisted ``profile`` field gets the same
    HERMES_HOME isolation as the manual /api/crons/run path.

    Idempotent: a repeat call detects the marker attribute and returns.
    """
    try:
        import cron.scheduler as _cs
    except ImportError:
        logger.debug("install_cron_scheduler_profile_isolation: cron.scheduler unavailable")
        return
    original = getattr(_cs, 'run_job', None)
    # Nothing to wrap, or already wrapped by a previous call.
    if original is None or getattr(original, '_webui_profile_isolated', False):
        return

    def _webui_profile_isolated_run_job(job, *args, **kwargs):
        # Manual WebUI runs already enter cron_profile_context_for_home before
        # calling run_job. Avoid nesting the non-reentrant env lock or changing
        # the explicitly selected manual execution profile.
        if _cron_profile_context_depth() > 0:
            return original(job, *args, **kwargs)
        with cron_profile_context_for_home(_home_for_scheduled_cron_job(job)):
            return original(job, *args, **kwargs)

    # Markers: idempotence guard + escape hatch back to the unwrapped callable.
    _webui_profile_isolated_run_job._webui_profile_isolated = True
    _webui_profile_isolated_run_job._webui_original_run_job = original
    _cs.run_job = _webui_profile_isolated_run_job
class cron_profile_context_for_home:
    """Context manager that pins HERMES_HOME to an explicit profile home path.

    Use this variant from worker threads that don't have TLS context (e.g. the
    background thread started by /api/crons/run). The HTTP-side variant below
    resolves the home via TLS.
    """

    def __init__(self, home: Path):
        # Normalize early so str(self._home) in __enter__ is well-formed.
        self._home = Path(home)

    def __enter__(self):
        # The whole swap runs under the env lock so the multi-step
        # read-modify-write of os.environ is atomic w.r.t. other contexts
        # (see the thread-safety note above _cron_env_lock).
        _cron_env_lock.acquire()
        _push_cron_profile_context_depth()
        try:
            self._prev_env = os.environ.get('HERMES_HOME')
            os.environ['HERMES_HOME'] = str(self._home)
            # Re-patch cron.jobs module-level constants (see main context manager
            # below for the rationale).
            self._prev_cj = None
            try:
                import cron.jobs as _cj
                self._prev_cj = (_cj.HERMES_DIR, _cj.CRON_DIR, _cj.JOBS_FILE, _cj.OUTPUT_DIR)
                _cj.HERMES_DIR = self._home
                _cj.CRON_DIR = self._home / 'cron'
                _cj.JOBS_FILE = _cj.CRON_DIR / 'jobs.json'
                _cj.OUTPUT_DIR = _cj.CRON_DIR / 'output'
            except (ImportError, AttributeError):
                logger.debug("cron_profile_context_for_home: cron.jobs unavailable")
            # cron.scheduler snapshots _hermes_home at import time and run_job()
            # reads config/.env from that module global. Patch it alongside
            # cron.jobs so manual WebUI runs actually execute under the selected
            # profile, not merely write output metadata there (#617).
            self._prev_cs = None
            try:
                import cron.scheduler as _cs
                self._prev_cs = (
                    getattr(_cs, '_hermes_home', None),
                    getattr(_cs, '_LOCK_DIR', None),
                    getattr(_cs, '_LOCK_FILE', None),
                )
                _cs._hermes_home = self._home
                _cs._LOCK_DIR = self._home / 'cron'
                _cs._LOCK_FILE = _cs._LOCK_DIR / '.tick.lock'
            except (ImportError, AttributeError):
                logger.debug("cron_profile_context_for_home: cron.scheduler unavailable")
        except Exception:
            # __exit__ is never called when __enter__ raises — undo the depth
            # bump and release the lock here so it isn't held forever.
            _pop_cron_profile_context_depth()
            _cron_env_lock.release()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            # Restore HERMES_HOME exactly as found (absent vs. set matters).
            if self._prev_env is None:
                os.environ.pop('HERMES_HOME', None)
            else:
                os.environ['HERMES_HOME'] = self._prev_env
            if self._prev_cj is not None:
                try:
                    import cron.jobs as _cj
                    _cj.HERMES_DIR, _cj.CRON_DIR, _cj.JOBS_FILE, _cj.OUTPUT_DIR = self._prev_cj
                except (ImportError, AttributeError):
                    pass
            if getattr(self, '_prev_cs', None) is not None:
                try:
                    import cron.scheduler as _cs
                    _cs._hermes_home, _cs._LOCK_DIR, _cs._LOCK_FILE = self._prev_cs
                except (ImportError, AttributeError):
                    pass
        finally:
            _pop_cron_profile_context_depth()
            _cron_env_lock.release()
        # Never suppress exceptions from the with-body.
        return False
class cron_profile_context:
    """Context manager that pins HERMES_HOME to the TLS-active profile.

    Usage:
        with cron_profile_context():
            from cron.jobs import list_jobs
            jobs = list_jobs(include_disabled=True)

    Serializes cron API calls across profiles (cron API is low-frequency;
    serialization cost is negligible compared to correctness).
    """

    def __enter__(self):
        # Same locking discipline as cron_profile_context_for_home; the only
        # difference is that the home is resolved from TLS/request context.
        _cron_env_lock.acquire()
        _push_cron_profile_context_depth()
        try:
            self._prev_env = os.environ.get('HERMES_HOME')
            home = get_active_hermes_home()
            os.environ['HERMES_HOME'] = str(home)
            # Re-patch cron.jobs module-level constants. They are snapshot at
            # import time (line 68-71 of cron/jobs.py) and don't participate in
            # the module's __getattr__ lazy path, so env-var alone is not enough
            # for callers that reference the module constants directly.
            self._prev_cj = None
            try:
                import cron.jobs as _cj
                self._prev_cj = (_cj.HERMES_DIR, _cj.CRON_DIR, _cj.JOBS_FILE, _cj.OUTPUT_DIR)
                _cj.HERMES_DIR = home
                _cj.CRON_DIR = home / 'cron'
                _cj.JOBS_FILE = _cj.CRON_DIR / 'jobs.json'
                _cj.OUTPUT_DIR = _cj.CRON_DIR / 'output'
            except (ImportError, AttributeError):
                logger.debug("cron_profile_context: cron.jobs unavailable; env-var only")
            # cron.scheduler also caches its home at import time; patch it too.
            self._prev_cs = None
            try:
                import cron.scheduler as _cs
                self._prev_cs = (
                    getattr(_cs, '_hermes_home', None),
                    getattr(_cs, '_LOCK_DIR', None),
                    getattr(_cs, '_LOCK_FILE', None),
                )
                _cs._hermes_home = home
                _cs._LOCK_DIR = home / 'cron'
                _cs._LOCK_FILE = _cs._LOCK_DIR / '.tick.lock'
            except (ImportError, AttributeError):
                logger.debug("cron_profile_context: cron.scheduler unavailable; env-var only")
        except Exception:
            # __exit__ is never called when __enter__ raises — release here.
            _pop_cron_profile_context_depth()
            _cron_env_lock.release()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            # Restore env var
            if self._prev_env is None:
                os.environ.pop('HERMES_HOME', None)
            else:
                os.environ['HERMES_HOME'] = self._prev_env
            # Restore cron.jobs module constants
            if self._prev_cj is not None:
                try:
                    import cron.jobs as _cj
                    _cj.HERMES_DIR, _cj.CRON_DIR, _cj.JOBS_FILE, _cj.OUTPUT_DIR = self._prev_cj
                except (ImportError, AttributeError):
                    pass
            if getattr(self, '_prev_cs', None) is not None:
                try:
                    import cron.scheduler as _cs
                    _cs._hermes_home, _cs._LOCK_DIR, _cs._LOCK_FILE = self._prev_cs
                except (ImportError, AttributeError):
                    pass
        finally:
            _pop_cron_profile_context_depth()
            _cron_env_lock.release()
        # Never suppress exceptions from the with-body.
        return False
def get_hermes_home_for_profile(name: str) -> Path:
    """Return the HERMES_HOME Path for *name* without mutating process state.

    Pure path resolution: never touches os.environ, module-level cached
    paths, or the process-level _active_profile global, so it is safe from
    per-request context (streaming, session creation). None, '', 'default',
    and malformed names (rejecting traversal such as '../../etc') all
    resolve to _DEFAULT_HERMES_HOME.
    """
    home = _resolve_profile_home_for_name(name)
    return home
# Maps config.yaml `terminal.*` keys onto the TERMINAL_* env vars the agent
# tool layer consumes (see get_profile_runtime_env).
_TERMINAL_ENV_MAPPINGS = {
    'backend': 'TERMINAL_ENV',
    'env_type': 'TERMINAL_ENV',
    'cwd': 'TERMINAL_CWD',
    'timeout': 'TERMINAL_TIMEOUT',
    'lifetime_seconds': 'TERMINAL_LIFETIME_SECONDS',
    'modal_mode': 'TERMINAL_MODAL_MODE',
    'docker_image': 'TERMINAL_DOCKER_IMAGE',
    'docker_forward_env': 'TERMINAL_DOCKER_FORWARD_ENV',
    'docker_env': 'TERMINAL_DOCKER_ENV',
    'docker_mount_cwd_to_workspace': 'TERMINAL_DOCKER_MOUNT_CWD_TO_WORKSPACE',
    'singularity_image': 'TERMINAL_SINGULARITY_IMAGE',
    'modal_image': 'TERMINAL_MODAL_IMAGE',
    'daytona_image': 'TERMINAL_DAYTONA_IMAGE',
    'container_cpu': 'TERMINAL_CONTAINER_CPU',
    'container_memory': 'TERMINAL_CONTAINER_MEMORY',
    'container_disk': 'TERMINAL_CONTAINER_DISK',
    'container_persistent': 'TERMINAL_CONTAINER_PERSISTENT',
    'docker_volumes': 'TERMINAL_DOCKER_VOLUMES',
    'persistent_shell': 'TERMINAL_PERSISTENT_SHELL',
    'ssh_host': 'TERMINAL_SSH_HOST',
    'ssh_user': 'TERMINAL_SSH_USER',
    'ssh_port': 'TERMINAL_SSH_PORT',
    'ssh_key': 'TERMINAL_SSH_KEY',
    'ssh_persistent': 'TERMINAL_SSH_PERSISTENT',
    'local_persistent': 'TERMINAL_LOCAL_PERSISTENT',
}
def _stringify_env_value(value) -> str:
if isinstance(value, bool):
return 'true' if value else 'false'
if isinstance(value, (list, dict)):
return json.dumps(value)
return str(value)
def get_profile_runtime_env(home: Path) -> dict[str, str]:
    """Return env vars needed to run an agent turn for a profile home.

    WebUI profile switching is per-client/cookie scoped, so it intentionally
    does not call ``switch_profile(..., process_wide=True)`` for every browser.
    Agent/tool code still consumes terminal backend settings through
    environment variables (matching ``hermes -p <profile>``), so streaming must
    apply the selected profile's terminal config and ``.env`` for the duration
    of that run.
    """
    home = Path(home).expanduser()
    env: dict[str, str] = {}
    # Load the profile's config.yaml; any read/parse problem degrades to {}.
    try:
        import yaml as _yaml
        cfg_path = home / 'config.yaml'
        cfg = _yaml.safe_load(cfg_path.read_text(encoding='utf-8')) if cfg_path.exists() else {}
        if not isinstance(cfg, dict):
            cfg = {}
    except Exception:
        cfg = {}
    # Map terminal.* config keys onto TERMINAL_* env vars.
    terminal_cfg = cfg.get('terminal', {}) if isinstance(cfg, dict) else {}
    if isinstance(terminal_cfg, dict):
        for key, env_key in _TERMINAL_ENV_MAPPINGS.items():
            if key in terminal_cfg and terminal_cfg[key] is not None:
                env[env_key] = _stringify_env_value(terminal_cfg[key])
    # Overlay KEY=VALUE pairs from the profile's .env. Comments and blank
    # lines are skipped, surrounding quotes stripped, empty values ignored.
    env_path = home / '.env'
    if env_path.exists():
        try:
            for line in env_path.read_text(encoding='utf-8').splitlines():
                line = line.strip()
                if line and not line.startswith('#') and '=' in line:
                    k, v = line.split('=', 1)
                    k = k.strip()
                    v = v.strip().strip('"').strip("'")
                    if k and v:
                        env[k] = v
        except Exception:
            logger.debug("Failed to read runtime env from %s", env_path)
    return env
def _set_hermes_home(home: Path) -> None:
    """Set HERMES_HOME env var and monkey-patch cached module-level paths.

    Covers every module known to snapshot HERMES_HOME at import time:
    tools.skills_tool / tools.skill_manager_tool (via patch_skill_home_modules),
    cron.jobs, and cron.scheduler. Modules that are not importable are skipped.
    """
    os.environ['HERMES_HOME'] = str(home)
    patch_skill_home_modules(home)
    # Patch cron/jobs module-level cache
    try:
        import cron.jobs as _cj
        _cj.HERMES_DIR = home
        _cj.CRON_DIR = home / 'cron'
        _cj.JOBS_FILE = _cj.CRON_DIR / 'jobs.json'
        _cj.OUTPUT_DIR = _cj.CRON_DIR / 'output'
    except (ImportError, AttributeError):
        logger.debug("Failed to patch cron.jobs module")
    # cron.scheduler keeps its own import-time home and lock-file paths.
    try:
        import cron.scheduler as _cs
        _cs._hermes_home = home
        _cs._LOCK_DIR = home / 'cron'
        _cs._LOCK_FILE = _cs._LOCK_DIR / '.tick.lock'
    except (ImportError, AttributeError):
        logger.debug("Failed to patch cron.scheduler module")
def _reload_dotenv(home: Path) -> None:
    """Load .env from the profile dir into os.environ with profile isolation.

    Clears env vars that were loaded from the previously active profile before
    applying the current profile's .env. This prevents API keys and other
    profile-scoped secrets from leaking across profile switches.

    Args:
        home: Profile home directory whose .env should be applied.
    """
    global _loaded_profile_env_keys
    # Remove keys loaded from the previous profile first.
    for key in list(_loaded_profile_env_keys):
        os.environ.pop(key, None)
    _loaded_profile_env_keys = set()
    env_path = home / '.env'
    if not env_path.exists():
        return
    loaded_keys: set[str] = set()
    try:
        for line in env_path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if line and not line.startswith('#') and '=' in line:
                k, v = line.split('=', 1)
                k = k.strip()
                v = v.strip().strip('"').strip("'")
                if k and v:
                    os.environ[k] = v
                    loaded_keys.add(k)
    except Exception:
        logger.debug("Failed to reload dotenv from %s", env_path)
    finally:
        # Bug fix: record even partially-loaded keys. Previously a mid-parse
        # failure (e.g. os.environ rejecting a NUL byte in a value) reset the
        # tracking set to empty while the already-assigned keys stayed in
        # os.environ — leaking them into every subsequently activated profile.
        _loaded_profile_env_keys = loaded_keys
def init_profile_state() -> None:
    """Initialize profile state at server startup.

    Reads the sticky ~/.hermes/active_profile choice, points HERMES_HOME
    (env var plus patched module caches) at that profile, installs scheduler
    profile isolation, and loads the profile's .env. Called once from
    config.py after imports.
    """
    global _active_profile
    _active_profile = _read_active_profile_file()
    active_home = get_active_hermes_home()
    _set_hermes_home(active_home)
    install_cron_scheduler_profile_isolation()
    _reload_dotenv(active_home)
def switch_profile(name: str, *, process_wide: bool = True) -> dict:
    """Switch the active profile.

    Validates the profile exists, updates process state, patches module caches,
    reloads .env, and reloads config.yaml.

    Args:
        name: Profile name to switch to.
        process_wide: If True (default), updates the process-global
            _active_profile. Set to False for per-client switches from the
            WebUI where the profile is managed via cookie + thread-local (#798).

    Returns: {'profiles': [...], 'active': name, 'default_model': ...,
        'default_workspace': ...}

    Raises:
        ValueError: if the profile doesn't exist.
        RuntimeError: if an agent is running (process_wide switches only).
    """
    global _active_profile
    # Import here to avoid circular import at module load
    from api.config import STREAMS, STREAMS_LOCK, reload_config
    # Process-wide profile switches mutate HERMES_HOME, module-level path caches,
    # os.environ-backed .env keys, and the global config cache. Keep those blocked
    # while any agent stream is active. Per-client WebUI switches are cookie/TLS
    # scoped (process_wide=False) and do not mutate those globals, so users can
    # leave a running session in one profile and start work in another (#1700).
    if process_wide:
        with STREAMS_LOCK:
            if len(STREAMS) > 0:
                raise RuntimeError(
                    'Cannot switch profiles while an agent is running. '
                    'Cancel or wait for it to finish.'
                )
    # Resolve profile directory
    if _is_root_profile(name):
        home = _DEFAULT_HERMES_HOME
    else:
        home = _resolve_named_profile_home(name)
        if not home.is_dir():
            raise ValueError(f"Profile '{name}' does not exist.")
    with _profile_lock:
        if process_wide:
            global _active_profile
            _active_profile = name
            _set_hermes_home(home)
            _reload_dotenv(home)
    if process_wide:
        # Write sticky default for CLI consistency
        try:
            ap_file = _DEFAULT_HERMES_HOME / 'active_profile'
            ap_file.write_text('' if _is_root_profile(name) else name, encoding='utf-8')
        except Exception:
            logger.debug("Failed to write active profile file")
        # Reload config.yaml from the new profile
        reload_config()
    # Return profile-specific defaults so frontend can apply them.
    # For process_wide=False (per-client switch), read the target profile's
    # config.yaml directly from disk rather than from _cfg_cache (process-global),
    # since reload_config() was intentionally skipped.
    if process_wide:
        from api.config import get_config
        cfg = get_config()
    else:
        # Direct disk read — does not touch _cfg_cache
        try:
            import yaml as _yaml
            cfg_path = home / 'config.yaml'
            cfg = _yaml.safe_load(cfg_path.read_text(encoding='utf-8')) if cfg_path.exists() else {}
            if not isinstance(cfg, dict):
                cfg = {}
        except Exception:
            cfg = {}
    # `model:` may be a bare string or a mapping with a 'default' key.
    model_cfg = cfg.get('model', {})
    default_model = None
    if isinstance(model_cfg, str):
        default_model = model_cfg
    elif isinstance(model_cfg, dict):
        default_model = model_cfg.get('default')
    # Read the target profile's workspace directly from *home* rather than via
    # get_last_workspace() which routes through the thread-local/process-global active
    # profile — both of which still point to the OLD profile during process_wide=False
    # switches (the Set-Cookie has been sent but hasn't been processed by a new request
    # yet). We derive workspace in priority order:
    # 1. {home}/webui_state/last_workspace.txt (previously chosen workspace for this profile)
    # 2. cfg terminal.cwd / workspace / default_workspace keys
    # 3. Boot-time DEFAULT_WORKSPACE constant
    # Use the module-level ``Path`` (imported at file top) rather than re-importing
    # it locally — keeps the exception fallback simple and avoids a latent NameError
    # if a future refactor moves the inner imports.
    default_workspace = None
    try:
        from api.config import DEFAULT_WORKSPACE as _DW
        lw_file = home / 'webui_state' / 'last_workspace.txt'
        if lw_file.exists():
            _p = lw_file.read_text(encoding='utf-8').strip()
            if _p:
                _pp = Path(_p).expanduser()
                if _pp.is_dir():
                    default_workspace = str(_pp.resolve())
        if default_workspace is None:
            for _key in ('workspace', 'default_workspace'):
                _v = cfg.get(_key)
                if _v:
                    _pp = Path(str(_v)).expanduser().resolve()
                    if _pp.is_dir():
                        default_workspace = str(_pp)
                        break
        if default_workspace is None:
            _tc = cfg.get('terminal', {})
            if isinstance(_tc, dict):
                _cwd = _tc.get('cwd', '')
                if _cwd and str(_cwd) not in ('.', ''):
                    _pp = Path(str(_cwd)).expanduser().resolve()
                    if _pp.is_dir():
                        default_workspace = str(_pp)
        if default_workspace is None:
            default_workspace = str(_DW)
    except Exception:
        # Workspace derivation is best-effort; fall back to the boot-time
        # constant, then to the user's home dir if even that import fails.
        try:
            from api.config import DEFAULT_WORKSPACE as _DW2
            default_workspace = str(_DW2)
        except Exception:
            default_workspace = str(Path.home())
    return {
        'profiles': list_profiles_api(),
        'active': name,
        'default_model': default_model,
        'default_workspace': default_workspace,
    }
def list_profiles_api() -> list:
    """List all profiles with metadata, serialized for JSON response."""
    try:
        from hermes_cli.profiles import list_profiles
        infos = list_profiles()
    except ImportError:
        # hermes_cli not available -- return just the default
        return [_default_profile_dict()]
    active = get_active_profile_name()
    return [
        {
            'name': p.name,
            'path': str(p.path),
            'is_default': p.is_default,
            'is_active': p.name == active,
            'gateway_running': p.gateway_running,
            'model': p.model,
            'provider': p.provider,
            'has_env': p.has_env,
            'skill_count': p.skill_count,
        }
        for p in infos
    ]
def _default_profile_dict() -> dict:
    """Fallback profile dict when hermes_cli is not importable."""
    env_present = (_DEFAULT_HERMES_HOME / '.env').exists()
    return {
        'name': 'default',
        'path': str(_DEFAULT_HERMES_HOME),
        'is_default': True,
        'is_active': True,
        'gateway_running': False,
        'model': None,
        'provider': None,
        'has_env': env_present,
        'skill_count': 0,
    }
def _validate_profile_name(name: str) -> None:
    """Validate profile name format (matches hermes_cli.profiles upstream).

    Raises ValueError for the reserved 'default' name or any name that does
    not match the profile-identifier pattern.
    """
    if name == 'default':
        raise ValueError("Cannot create a profile named 'default' -- it is the built-in profile.")
    # Use fullmatch (not match) so a trailing newline can't sneak past the $ anchor
    if _PROFILE_ID_RE.fullmatch(name):
        return
    raise ValueError(
        f"Invalid profile name {name!r}. "
        "Must match [a-z0-9][a-z0-9_-]{0,63}"
    )
def _profiles_root() -> Path:
    """Return the canonical (resolved) root that contains named profiles."""
    root = _DEFAULT_HERMES_HOME / 'profiles'
    return root.resolve()
def _resolve_named_profile_home(name: str) -> Path:
    """Resolve a named profile to a directory under the profiles root.

    Validates *name* as a logical profile identifier first, then resolves the
    final filesystem path and enforces containment under ~/.hermes/profiles
    (relative_to raises ValueError if resolution escaped the root).
    """
    _validate_profile_name(name)
    root = _profiles_root()
    resolved = (root / name).resolve()
    # Containment check — defense in depth on top of the name regex.
    resolved.relative_to(root)
    return resolved
def _create_profile_fallback(name: str, clone_from: str = None,
                             clone_config: bool = False) -> Path:
    """Create a profile directory without hermes_cli (Docker/standalone fallback).

    Args:
        name: New profile name; must match the profile-identifier format.
        clone_from: Optional source profile whose config files are copied.
        clone_config: Copy _CLONE_CONFIG_FILES from *clone_from* when True.

    Returns the new profile directory path.
    Raises ValueError for an invalid *name* and FileExistsError when the
    profile already exists.
    """
    # Security fix: validate/contain *name* before touching the filesystem so
    # a traversal-shaped value (e.g. '../../etc') can never select or create
    # an out-of-tree directory. Same containment rule as switch_profile.
    profile_dir = _resolve_named_profile_home(name)
    if profile_dir.exists():
        raise FileExistsError(f"Profile '{name}' already exists.")
    # Bootstrap directory structure (exist_ok=False so a concurrent create raises)
    profile_dir.mkdir(parents=True, exist_ok=False)
    for subdir in _PROFILE_DIRS:
        (profile_dir / subdir).mkdir(parents=True, exist_ok=True)
    # Clone config files from source profile if requested
    if clone_config and clone_from:
        if _is_root_profile(clone_from):
            source_dir = _DEFAULT_HERMES_HOME
        elif _PROFILE_ID_RE.fullmatch(clone_from):
            source_dir = _DEFAULT_HERMES_HOME / 'profiles' / clone_from
        else:
            # Security fix: previously an unvalidated clone_from was joined
            # directly onto the profiles dir, allowing config/.env to be
            # copied from an arbitrary path. Skip cloning instead.
            logger.warning("Ignoring invalid clone source profile %r", clone_from)
            source_dir = None
        if source_dir is not None and source_dir.is_dir():
            for filename in _CLONE_CONFIG_FILES:
                src = source_dir / filename
                if src.exists():
                    shutil.copy2(src, profile_dir / filename)
    return profile_dir
def _write_endpoint_to_config(profile_dir: Path, base_url: str = None, api_key: str = None) -> None:
"""Write custom endpoint fields into config.yaml for a profile."""
if not base_url and not api_key:
return
config_path = profile_dir / 'config.yaml'
try:
import yaml as _yaml
except ImportError:
return
cfg = {}
if config_path.exists():
try:
loaded = _yaml.safe_load(config_path.read_text(encoding="utf-8"))
if isinstance(loaded, dict):
cfg = loaded
except Exception:
logger.debug("Failed to load config from %s", config_path)
model_section = cfg.get('model', {})
if not isinstance(model_section, dict):
model_section = {}
if base_url:
model_section['base_url'] = base_url
if api_key:
model_section['api_key'] = api_key
cfg['model'] = model_section
config_path.write_text(_yaml.dump(cfg, default_flow_style=False, allow_unicode=True), encoding='utf-8')
def _clean_profile_config_value(value: Optional[str], field: str) -> Optional[str]:
"""Return a safe single-line config value or raise ValueError."""
if value is None:
return None
cleaned = str(value).strip()
if not cleaned:
return None
if any(ch in cleaned for ch in ("\x00", "\r", "\n")):
raise ValueError(f"{field} must be a single-line value")
if len(cleaned) > 512:
raise ValueError(f"{field} is too long")
return cleaned
def _split_webui_provider_model_value(default_model: Optional[str], model_provider: Optional[str]) -> tuple[Optional[str], Optional[str]]:
    """Normalize WebUI-internal @provider:model picker values for config.yaml."""
    model = _clean_profile_config_value(default_model, "default_model")
    provider = _clean_profile_config_value(model_provider, "model_provider")
    # The model picker encodes both fields in one "@provider:model" token;
    # split on the LAST colon so provider names containing ':' survive.
    if model is not None and model.startswith("@") and ":" in model:
        provider_part, _, model_part = model[1:].rpartition(":")
        if provider is None:
            provider = _clean_profile_config_value(provider_part, "model_provider")
        model = _clean_profile_config_value(model_part, "default_model")
    return model, provider
def _write_model_defaults_to_config(
    profile_dir: Path,
    *,
    default_model: Optional[str] = None,
    model_provider: Optional[str] = None,
) -> None:
    """Write model default/provider fields into config.yaml for a profile."""
    # Normalize first: this may split an "@provider:model" picker token and
    # can raise ValueError on malformed values before any file I/O happens.
    default_model, model_provider = _split_webui_provider_model_value(default_model, model_provider)
    if not (default_model or model_provider):
        return
    config_path = profile_dir / 'config.yaml'
    try:
        import yaml as _yaml
    except ImportError:
        # Best-effort: without PyYAML the defaults simply are not persisted.
        return
    cfg: dict = {}
    if config_path.exists():
        try:
            loaded = _yaml.safe_load(config_path.read_text(encoding="utf-8"))
        except Exception:
            logger.debug("Failed to load config from %s", config_path)
        else:
            if isinstance(loaded, dict):
                cfg = loaded
    model_section = cfg.get('model')
    if not isinstance(model_section, dict):
        model_section = {}
    for key, value in (('default', default_model), ('provider', model_provider)):
        if value:
            model_section[key] = value
    cfg['model'] = model_section
    config_path.write_text(_yaml.dump(cfg, default_flow_style=False, allow_unicode=True), encoding='utf-8')
def create_profile_api(name: str, clone_from: str = None,
                       clone_config: bool = False,
                       base_url: str = None,
                       api_key: str = None,
                       default_model: str = None,
                       model_provider: str = None) -> dict:
    """Create a new profile. Returns the new profile info dict.

    Prefers hermes_cli's create_profile; falls back to a local directory
    bootstrap when hermes_cli is not importable (Docker/standalone).

    Args:
        name: New profile name; validated by _validate_profile_name.
        clone_from: Optional source profile whose config files are copied
            when clone_config is true. Validated unless it is the root
            profile.
        clone_config: Copy config files from clone_from into the new profile.
        base_url: Optional custom model endpoint written into config.yaml.
        api_key: Optional API key written into config.yaml alongside base_url.
        default_model: Optional default model; accepts the WebUI-internal
            "@provider:model" combined picker value.
        model_provider: Optional provider name for the default model.

    Returns:
        The profile info dict from list_profiles_api() when the new profile
        is found there; otherwise a stub dict built from the resolved path.

    Raises:
        ValueError: If name (or a non-root clone_from) fails validation, or
            if default_model/model_provider fail single-line cleaning.
    """
    _validate_profile_name(name)
    # Defense-in-depth: validate clone_from here too, even though routes.py
    # also validates it. Any caller that bypasses the HTTP layer gets protection.
    if clone_from is not None and not _is_root_profile(clone_from):
        _validate_profile_name(clone_from)
    default_model, model_provider = _split_webui_provider_model_value(default_model, model_provider)
    try:
        from hermes_cli.profiles import create_profile
        create_profile(
            name,
            clone_from=clone_from,
            clone_config=clone_config,
            clone_all=False,
            no_alias=True,
        )
    except ImportError:
        # hermes_cli not installed: bootstrap the directory layout ourselves.
        _create_profile_fallback(name, clone_from, clone_config)
    # Resolve the profile directory from the profile list when possible.
    # hermes_cli and the webui runtime do not always agree on the exact root,
    # so we prefer the path returned by list_profiles_api() and fall back to the
    # standard profile location only if the profile cannot be found there yet.
    profile_path = _DEFAULT_HERMES_HOME / 'profiles' / name
    for p in list_profiles_api():
        if p['name'] == name:
            try:
                profile_path = Path(p.get('path') or profile_path)
            except Exception:
                logger.debug("Failed to parse profile path")
            break
    # Ensure the directory exists before writing config overrides into it.
    profile_path.mkdir(parents=True, exist_ok=True)
    _write_endpoint_to_config(profile_path, base_url=base_url, api_key=api_key)
    _write_model_defaults_to_config(
        profile_path,
        default_model=default_model,
        model_provider=model_provider,
    )
    # Invalidate cached root-profile-name lookup; create_profile may have added
    # a new profile that flips is_default semantics on the agent side (#1612).
    _invalidate_root_profile_cache()
    # Find and return the newly created profile info.
    # When hermes_cli is not importable, list_profiles_api() also falls back
    # to the stub default-only list and won't find the new profile by name.
    # In that case, return a complete profile dict directly.
    for p in list_profiles_api():
        if p['name'] == name:
            return p
    return {
        'name': name,
        'path': str(profile_path),
        'is_default': False,
        'is_active': _active_profile == name,
        'gateway_running': False,
        'model': None,
        'provider': None,
        'has_env': (profile_path / '.env').exists(),
        'skill_count': 0,
    }
def delete_profile_api(name: str) -> dict:
    """Delete a profile. Switches to default first if it's the active one.

    Args:
        name: Profile name to delete; validated by _validate_profile_name.

    Returns:
        ``{'ok': True, 'name': name}`` on success.

    Raises:
        ValueError: If *name* is the root/default profile, fails validation,
            or (in the no-hermes_cli fallback) does not exist on disk.
        RuntimeError: If *name* is the active profile and switching away
            fails because an agent is currently running.
    """
    if _is_root_profile(name):
        raise ValueError("Cannot delete the default profile.")
    _validate_profile_name(name)
    # If deleting the active profile, switch to default first
    if _active_profile == name:
        try:
            switch_profile('default')
        except RuntimeError as exc:
            # Chain the original error so callers can still see why the
            # switch failed (previously the cause was discarded).
            raise RuntimeError(
                f"Cannot delete active profile '{name}' while an agent is running. "
                "Cancel or wait for it to finish."
            ) from exc
    try:
        from hermes_cli.profiles import delete_profile
        delete_profile(name, yes=True)
    except ImportError:
        # Manual fallback: just remove the directory. shutil is already
        # imported at module level; no need to re-import it locally.
        profile_dir = _resolve_named_profile_home(name)
        if profile_dir.is_dir():
            shutil.rmtree(str(profile_dir))
        else:
            raise ValueError(f"Profile '{name}' does not exist.")
    # Drop cached root-profile-name lookup — list_profiles_api() shape changed.
    _invalidate_root_profile_cache()
    return {'ok': True, 'name': name}