""" Hermes Web UI -- Optional password authentication. Off by default. Enable by setting HERMES_WEBUI_PASSWORD env var or configuring a password in the Settings panel. """ import hashlib import hmac import http.cookies import json import logging import os import secrets import tempfile import threading import time from api.config import STATE_DIR, load_settings logger = logging.getLogger(__name__) # Default session TTL — 30 days. Kept as a module-level constant for backwards # compatibility with downstream code and regression tests that import it. # At runtime, prefer ``_resolve_session_ttl()`` which honours the env var and # settings.json overrides; this constant is the floor / fallback. SESSION_TTL = 86400 * 30 # 30 days def _resolve_session_ttl() -> int: """Resolve session TTL from env > settings > default. Priority mirrors get_password_hash(): HERMES_WEBUI_SESSION_TTL env var first, then settings.json, falling back to ``SESSION_TTL`` (30 days). Clamped to [60s, 1 year] to prevent runaway cookies or self-lockout. """ env_v = os.getenv('HERMES_WEBUI_SESSION_TTL', '').strip() if env_v.isdigit(): val = int(env_v) if 60 <= val <= 86400 * 365: return val s = load_settings() v = s.get('session_ttl_seconds') if isinstance(v, int) and 60 <= v <= 86400 * 365: return v return SESSION_TTL # ── Public paths (no auth required) ───────────────────────────────────────── PUBLIC_PATHS = frozenset({ '/login', '/health', '/favicon.ico', '/sw.js', '/api/auth/login', '/api/auth/status', '/manifest.json', '/manifest.webmanifest', '/session/manifest.json', '/session/manifest.webmanifest', }) COOKIE_NAME = 'hermes_session' CSRF_HEADER_NAME = 'X-Hermes-CSRF-Token' _SESSIONS_FILE = STATE_DIR / '.sessions.json' def _load_sessions() -> dict[str, float]: """Load persisted sessions from STATE_DIR, pruning expired entries. Returns an empty dict on any read or parse error so startup is never blocked by a corrupt or missing sessions file. """ try: if _SESSIONS_FILE.exists(): data = json.loads(_SESSIONS_FILE.read_text(encoding='utf-8')) if not isinstance(data, dict): raise ValueError('malformed sessions file — expected dict') now = time.time() return {t: exp for t, exp in data.items() if isinstance(t, str) and isinstance(exp, (int, float)) and exp > now} except Exception as e: logger.debug("Failed to load sessions file, starting fresh: %s", e) return {} def _save_sessions(sessions: dict[str, float]) -> None: """Atomically persist sessions to STATE_DIR/.sessions.json (0600). Uses a temp file + os.replace() so a crash mid-write never leaves a truncated file. Mirrors the same pattern as .signing_key persistence. """ try: STATE_DIR.mkdir(parents=True, exist_ok=True) fd, tmp = tempfile.mkstemp(dir=STATE_DIR, suffix='.sessions.tmp') try: with os.fdopen(fd, 'w', encoding='utf-8') as f: json.dump(sessions, f) os.chmod(tmp, 0o600) os.replace(tmp, _SESSIONS_FILE) except Exception: try: os.unlink(tmp) except OSError: pass raise except Exception as e: logger.debug("Failed to persist sessions: %s", e) # Active sessions: token -> expiry timestamp (persisted across restarts via STATE_DIR) _sessions = _load_sessions() # ── Login rate limiter ────────────────────────────────────────────────────── _LOGIN_ATTEMPTS_FILE = STATE_DIR / '.login_attempts.json' _LOGIN_MAX_ATTEMPTS = 5 _LOGIN_WINDOW = 60 # seconds def _load_login_attempts() -> dict[str, list[float]]: """Load persisted login attempts from STATE_DIR, pruning expired entries.""" try: if _LOGIN_ATTEMPTS_FILE.exists(): data = json.loads(_LOGIN_ATTEMPTS_FILE.read_text(encoding='utf-8')) if not isinstance(data, dict): raise ValueError('malformed login-attempts file — expected dict') now = time.time() attempts: dict[str, list[float]] = {} for ip, raw_times in data.items(): if not isinstance(ip, str) or not isinstance(raw_times, list): continue fresh = [ float(t) for t in raw_times if isinstance(t, (int, float)) and now - float(t) < _LOGIN_WINDOW ] if fresh: attempts[ip] = fresh return attempts except Exception as e: logger.debug("Failed to load login attempts file, starting fresh: %s", e) return {} def _save_login_attempts(attempts: dict[str, list[float]]) -> None: """Atomically persist login attempts to STATE_DIR/.login_attempts.json (0600).""" try: _LOGIN_ATTEMPTS_FILE.parent.mkdir(parents=True, exist_ok=True) fd, tmp = tempfile.mkstemp(dir=_LOGIN_ATTEMPTS_FILE.parent, suffix='.login_attempts.tmp') try: with os.fdopen(fd, 'w', encoding='utf-8') as f: json.dump(attempts, f) os.chmod(tmp, 0o600) os.replace(tmp, _LOGIN_ATTEMPTS_FILE) except Exception: try: os.unlink(tmp) except OSError: pass raise except Exception as e: logger.debug("Failed to persist login attempts: %s", e) _login_attempts = _load_login_attempts() # ip -> [timestamp, ...] _LOGIN_ATTEMPTS_LOCK = threading.Lock() def _check_login_rate(ip: str) -> bool: """Return True if the IP is allowed to attempt login (thread-safe).""" with _LOGIN_ATTEMPTS_LOCK: now = time.time() attempts = _login_attempts.get(ip, []) # Prune old attempts attempts = [t for t in attempts if now - t < _LOGIN_WINDOW] if attempts: _login_attempts[ip] = attempts else: _login_attempts.pop(ip, None) _save_login_attempts(_login_attempts) return len(attempts) < _LOGIN_MAX_ATTEMPTS def _record_login_attempt(ip: str) -> None: """Record a login attempt for rate limiting (thread-safe).""" with _LOGIN_ATTEMPTS_LOCK: now = time.time() attempts = _login_attempts.get(ip, []) attempts.append(now) _login_attempts[ip] = attempts _save_login_attempts(_login_attempts) def _load_key(filename: str) -> bytes: """Load a 32-byte key from STATE_DIR, generating and persisting one if missing.""" key_file = STATE_DIR / filename try: if key_file.exists(): raw = key_file.read_bytes() if len(raw) >= 32: return raw[:32] except OSError: logger.debug("Failed to read key %s", filename) key = secrets.token_bytes(32) try: STATE_DIR.mkdir(parents=True, exist_ok=True) key_file.write_bytes(key) key_file.chmod(0o600) except OSError: logger.debug("Failed to persist key %s", filename) return key _PBKDF2_KEY_CACHE: bytes | None = None _SIGNING_KEY_CACHE: bytes | None = None def _pbkdf2_key() -> bytes: global _PBKDF2_KEY_CACHE if _PBKDF2_KEY_CACHE is None: _PBKDF2_KEY_CACHE = _load_key('.pbkdf2_key') return _PBKDF2_KEY_CACHE def _signing_key() -> bytes: global _SIGNING_KEY_CACHE if _SIGNING_KEY_CACHE is None: _SIGNING_KEY_CACHE = _load_key('.signing_key') return _SIGNING_KEY_CACHE def _hash_password(password, *, salt: bytes | None = None) -> str: """PBKDF2-SHA256 with 600k iterations (OWASP recommendation). Salt is the persisted PBKDF2 key, which is secret and unique per installation. This keeps the stored hash format a plain hex string (no format change to settings.json) while replacing the predictable STATE_DIR-derived salt from the original implementation. The *salt* parameter exists solely to support transparent migration of password hashes that were computed with a different key (e.g. the old `.signing_key`). Normal callers should never pass it. """ if salt is None: salt = _pbkdf2_key() dk = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 600_000) return dk.hex() _AUTH_HASH_LOCK = threading.Lock() _AUTH_HASH_COMPUTED: bool = False _AUTH_HASH_CACHE: str | None = None def _invalidate_password_hash_cache() -> None: """Invalidate the in-process password hash cache so the next call to get_password_hash() re-reads from settings.json or the env var.""" global _AUTH_HASH_COMPUTED, _AUTH_HASH_CACHE with _AUTH_HASH_LOCK: _AUTH_HASH_COMPUTED = False _AUTH_HASH_CACHE = None def get_password_hash() -> str | None: """Return the active password hash, or None if auth is disabled. Priority: env var > settings.json. The hash is computed once and cached for the lifetime of the process. PBKDF2-600k takes ~1 s and is called on nearly every HTTP request via check_auth → is_auth_enabled, so caching avoids wasting a full second of CPU per request after the first one. Thread-safe: double-checked locking ensures that under a burst of concurrent requests only one thread computes PBKDF2, while the fast path (after initialisation) requires zero locks. """ global _AUTH_HASH_COMPUTED, _AUTH_HASH_CACHE # Fast path — no lock needed once cache is populated. if _AUTH_HASH_COMPUTED: return _AUTH_HASH_CACHE with _AUTH_HASH_LOCK: # Re-check inside lock — another thread may have populated while # we were waiting to acquire. if _AUTH_HASH_COMPUTED: return _AUTH_HASH_CACHE env_pw = os.getenv('HERMES_WEBUI_PASSWORD', '').strip() if env_pw: result = _hash_password(env_pw) else: result = load_settings().get('password_hash') or None _AUTH_HASH_CACHE = result _AUTH_HASH_COMPUTED = True return result def is_auth_enabled() -> bool: """True if a password is configured (env var or settings).""" return get_password_hash() is not None def verify_password(plain: str) -> bool: """Verify a plaintext password against the stored hash. Supports transparent migration of password hashes that were computed with the old `.signing_key` salt. When the two keys differ and the legacy-salted hash matches, the password is transparently re-hashed with the current `.pbkdf2_key` and persisted to settings.json. """ expected = get_password_hash() if not expected: return False # Fast path: current PBKDF2 key if hmac.compare_digest(_hash_password(plain), expected): return True # Migration: some hashes were computed with `.signing_key` before the # PBKDF2 key was separated. Try the legacy salt; if it matches, # transparently upgrade so the next login uses the fast path. legacy_salt = _signing_key() current_salt = _pbkdf2_key() if legacy_salt != current_salt: if hmac.compare_digest(_hash_password(plain, salt=legacy_salt), expected): from api.config import save_settings save_settings({'_set_password': plain}) # Password re-hashed and persisted to disk using the current salt. # Cache invalidation is handled by fix 2/3 (#2192) which adds the # _invalidate_password_hash_cache() call inside save_settings(). return True return False def create_session() -> str: """Create a new auth session. Returns signed cookie value.""" token = secrets.token_hex(32) _sessions[token] = time.time() + _resolve_session_ttl() _save_sessions(_sessions) sig = hmac.new(_signing_key(), token.encode(), hashlib.sha256).hexdigest() return f"{token}.{sig}" def _prune_expired_sessions(): """Remove all expired session entries to prevent unbounded memory growth.""" now = time.time() expired = [t for t, exp in _sessions.items() if now > exp] if expired: for token in expired: _sessions.pop(token, None) _save_sessions(_sessions) def verify_session(cookie_value: str) -> bool: """Verify a signed session cookie. Returns True if valid and not expired.""" if not cookie_value or '.' not in cookie_value: return False _prune_expired_sessions() # lazy cleanup on every verification attempt token, sig = cookie_value.rsplit('.', 1) full_sig = hmac.new(_signing_key(), token.encode(), hashlib.sha256).hexdigest() # Accept both new (64-char) and legacy (32-char truncated) signatures so # existing sessions survive the upgrade without a forced global logout. # The legacy branch can be removed once session TTLs have expired (~30 days). valid = hmac.compare_digest(sig, full_sig) or ( len(sig) == 32 and hmac.compare_digest(sig, full_sig[:32]) ) if not valid: return False expiry = _sessions.get(token) if not expiry or time.time() > expiry: _sessions.pop(token, None) return False return True def _session_token_from_cookie_value(cookie_value: str) -> str | None: """Return the raw server-side session token from a signed cookie value.""" if not cookie_value or '.' not in cookie_value: return None token, _sig = cookie_value.rsplit('.', 1) return token or None def csrf_token_for_session(cookie_value: str) -> str | None: """Return the CSRF token bound to an authenticated WebUI session. The browser can read this token from the authenticated shell and echoes it in ``X-Hermes-CSRF-Token`` on unsafe API requests. The token is derived from the HttpOnly session cookie's server-side token, so it automatically rotates on login and is invalidated when the auth session expires or logs out. Callers must still verify the auth session before trusting it. """ token = _session_token_from_cookie_value(cookie_value) if not token: return None return hmac.new(_signing_key(), f"csrf:{token}".encode(), hashlib.sha256).hexdigest() def verify_csrf_token(cookie_value: str, csrf_token: str) -> bool: """Verify a submitted CSRF token against the authenticated session.""" if not cookie_value or not csrf_token or not verify_session(cookie_value): return False expected = csrf_token_for_session(cookie_value) return bool(expected and hmac.compare_digest(str(csrf_token), expected)) def invalidate_session(cookie_value) -> None: """Remove a session token.""" if cookie_value and '.' in cookie_value: token = cookie_value.rsplit('.', 1)[0] if token in _sessions: _sessions.pop(token, None) _save_sessions(_sessions) def parse_cookie(handler) -> str | None: """Extract the auth cookie from the request headers.""" cookie_header = handler.headers.get('Cookie', '') if not cookie_header: return None cookie = http.cookies.SimpleCookie() try: cookie.load(cookie_header) except http.cookies.CookieError: return None morsel = cookie.get(COOKIE_NAME) return morsel.value if morsel else None def check_auth(handler, parsed) -> bool: """Check if request is authorized. Returns True if OK. If not authorized, sends 401 (API) or 302 redirect (page) and returns False.""" if not is_auth_enabled(): return True # Public paths don't require auth if parsed.path in PUBLIC_PATHS or parsed.path.startswith('/static/') or parsed.path.startswith('/session/static/'): return True # Check session cookie cookie_val = parse_cookie(handler) if cookie_val and verify_session(cookie_val): return True # Not authorized if parsed.path.startswith('/api/'): handler.send_response(401) handler.send_header('Content-Type', 'application/json') handler.end_headers() handler.wfile.write(b'{"error":"Authentication required"}') else: handler.send_response(302) # Pass the original path as ?next= so login.js redirects back after auth. # SECURITY/CORRECTNESS: the inner `?` and `&` MUST be percent-encoded # when stuffed into the outer `?next=` parameter, otherwise: # (a) multi-param query strings get truncated at the first inner `&` # (e.g. `/api/sessions?limit=50&offset=0` would round-trip as # just `/api/sessions?limit=50` after the browser parses the # outer URL — `offset=0` becomes a separate top-level query # parameter that the login page ignores). # (b) attacker-controlled paths could inject a second `next=` # parameter; per RFC 3986 the duplicate behaviour is undefined # and parsers diverge (Python's parse_qs returns last-match, # URLSearchParams returns first-match), opening a query-pollution # footgun even though _safeNextPath() rejects most malicious # shapes downstream. # Encoding the entire `path?query` blob with quote(safe='/') turns # `?` → `%3F` and `&` → `%26`, so the outer parameter holds exactly # one path-with-query string and `searchParams.get('next')` returns # the full original URL (the browser auto-decodes once). # (Opus pre-release advisor finding for v0.50.258.) import urllib.parse as _urlparse _path_with_query = parsed.path or '/' if parsed.query: _path_with_query += '?' + parsed.query # safe='/' keeps path separators readable; everything else (including # `?`, `&`, `=`) gets percent-encoded. _next = _urlparse.quote(_path_with_query, safe='/') handler.send_header('Location', 'login?next=' + _next) handler.end_headers() return False def _is_secure_context(handler=None) -> bool: """Return True if cookies should carry the Secure flag. Behaviour is overridable via HERMES_WEBUI_SECURE env var for reverse-proxy setups where TLS terminates at a frontend proxy (nginx, Cloudflare, etc.) and Python only sees plain HTTP. 1/true/yes → force Secure on; 0/false/no → force Secure off. When unset, fall back to heuristics: direct TLS socket (getpeercert) or X-Forwarded-Proto header from the request. .. warning:: The ``X-Forwarded-Proto`` header is only trustworthy when a reverse proxy (nginx, Cloudflare, etc.) is deployed in front of the application. Without a proxy, any client can forge the header and cause the Secure flag to be set on plain HTTP. """ env = os.getenv('HERMES_WEBUI_SECURE', '').strip().lower() if env in ('1', 'true', 'yes'): return True if env in ('0', 'false', 'no'): return False if handler is not None: if getattr(handler.request, 'getpeercert', None) is not None: return True if handler.headers.get('X-Forwarded-Proto', '') == 'https': return True return False def set_auth_cookie(handler, cookie_value) -> None: """Set the auth cookie on the response.""" cookie = http.cookies.SimpleCookie() cookie[COOKIE_NAME] = cookie_value cookie[COOKIE_NAME]['httponly'] = True cookie[COOKIE_NAME]['samesite'] = 'Lax' cookie[COOKIE_NAME]['path'] = '/' cookie[COOKIE_NAME]['max-age'] = str(_resolve_session_ttl()) if _is_secure_context(handler): cookie[COOKIE_NAME]['secure'] = True handler.send_header('Set-Cookie', cookie[COOKIE_NAME].OutputString()) def clear_auth_cookie(handler) -> None: """Clear the auth cookie on the response.""" cookie = http.cookies.SimpleCookie() cookie[COOKIE_NAME] = '' cookie[COOKIE_NAME]['httponly'] = True cookie[COOKIE_NAME]['path'] = '/' cookie[COOKIE_NAME]['max-age'] = '0' handler.send_header('Set-Cookie', cookie[COOKIE_NAME].OutputString())