Files
hermes-webui/api/auth.py
T
2026-05-18 07:14:26 +08:00

525 lines
20 KiB
Python

"""
Hermes Web UI -- Optional password authentication.
Off by default. Enable by setting HERMES_WEBUI_PASSWORD env var
or configuring a password in the Settings panel.
"""
import hashlib
import hmac
import http.cookies
import json
import logging
import os
import secrets
import tempfile
import threading
import time
from api.config import STATE_DIR, load_settings
logger = logging.getLogger(__name__)
# Default session TTL — 30 days. Kept as a module-level constant for backwards
# compatibility with downstream code and regression tests that import it.
# At runtime, prefer ``_resolve_session_ttl()`` which honours the env var and
# settings.json overrides; this constant is the floor / fallback.
SESSION_TTL = 86400 * 30 # 30 days
def _resolve_session_ttl() -> int:
"""Resolve session TTL from env > settings > default.
Priority mirrors get_password_hash(): HERMES_WEBUI_SESSION_TTL env var
first, then settings.json, falling back to ``SESSION_TTL`` (30 days).
Clamped to [60s, 1 year] to prevent runaway cookies or self-lockout.
"""
env_v = os.getenv('HERMES_WEBUI_SESSION_TTL', '').strip()
if env_v.isdigit():
val = int(env_v)
if 60 <= val <= 86400 * 365:
return val
s = load_settings()
v = s.get('session_ttl_seconds')
if isinstance(v, int) and 60 <= v <= 86400 * 365:
return v
return SESSION_TTL
# ── Public paths (no auth required) ─────────────────────────────────────────
PUBLIC_PATHS = frozenset({
'/login', '/health', '/favicon.ico', '/sw.js',
'/api/auth/login', '/api/auth/status',
'/manifest.json', '/manifest.webmanifest',
'/session/manifest.json', '/session/manifest.webmanifest',
})
COOKIE_NAME = 'hermes_session'
CSRF_HEADER_NAME = 'X-Hermes-CSRF-Token'
_SESSIONS_FILE = STATE_DIR / '.sessions.json'
def _load_sessions() -> dict[str, float]:
"""Load persisted sessions from STATE_DIR, pruning expired entries.
Returns an empty dict on any read or parse error so startup is never
blocked by a corrupt or missing sessions file.
"""
try:
if _SESSIONS_FILE.exists():
data = json.loads(_SESSIONS_FILE.read_text(encoding='utf-8'))
if not isinstance(data, dict):
raise ValueError('malformed sessions file — expected dict')
now = time.time()
return {t: exp for t, exp in data.items()
if isinstance(t, str) and isinstance(exp, (int, float)) and exp > now}
except Exception as e:
logger.debug("Failed to load sessions file, starting fresh: %s", e)
return {}
def _save_sessions(sessions: dict[str, float]) -> None:
"""Atomically persist sessions to STATE_DIR/.sessions.json (0600).
Uses a temp file + os.replace() so a crash mid-write never leaves a
truncated file. Mirrors the same pattern as .signing_key persistence.
"""
try:
STATE_DIR.mkdir(parents=True, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=STATE_DIR, suffix='.sessions.tmp')
try:
with os.fdopen(fd, 'w', encoding='utf-8') as f:
json.dump(sessions, f)
os.chmod(tmp, 0o600)
os.replace(tmp, _SESSIONS_FILE)
except Exception:
try:
os.unlink(tmp)
except OSError:
pass
raise
except Exception as e:
logger.debug("Failed to persist sessions: %s", e)
# Active sessions: token -> expiry timestamp (persisted across restarts via STATE_DIR)
_sessions = _load_sessions()
# ── Login rate limiter ──────────────────────────────────────────────────────
_LOGIN_ATTEMPTS_FILE = STATE_DIR / '.login_attempts.json'
_LOGIN_MAX_ATTEMPTS = 5
_LOGIN_WINDOW = 60 # seconds
def _load_login_attempts() -> dict[str, list[float]]:
"""Load persisted login attempts from STATE_DIR, pruning expired entries."""
try:
if _LOGIN_ATTEMPTS_FILE.exists():
data = json.loads(_LOGIN_ATTEMPTS_FILE.read_text(encoding='utf-8'))
if not isinstance(data, dict):
raise ValueError('malformed login-attempts file — expected dict')
now = time.time()
attempts: dict[str, list[float]] = {}
for ip, raw_times in data.items():
if not isinstance(ip, str) or not isinstance(raw_times, list):
continue
fresh = [
float(t)
for t in raw_times
if isinstance(t, (int, float)) and now - float(t) < _LOGIN_WINDOW
]
if fresh:
attempts[ip] = fresh
return attempts
except Exception as e:
logger.debug("Failed to load login attempts file, starting fresh: %s", e)
return {}
def _save_login_attempts(attempts: dict[str, list[float]]) -> None:
"""Atomically persist login attempts to STATE_DIR/.login_attempts.json (0600)."""
try:
_LOGIN_ATTEMPTS_FILE.parent.mkdir(parents=True, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=_LOGIN_ATTEMPTS_FILE.parent, suffix='.login_attempts.tmp')
try:
with os.fdopen(fd, 'w', encoding='utf-8') as f:
json.dump(attempts, f)
os.chmod(tmp, 0o600)
os.replace(tmp, _LOGIN_ATTEMPTS_FILE)
except Exception:
try:
os.unlink(tmp)
except OSError:
pass
raise
except Exception as e:
logger.debug("Failed to persist login attempts: %s", e)
_login_attempts = _load_login_attempts() # ip -> [timestamp, ...]
_LOGIN_ATTEMPTS_LOCK = threading.Lock()
def _check_login_rate(ip: str) -> bool:
"""Return True if the IP is allowed to attempt login (thread-safe)."""
with _LOGIN_ATTEMPTS_LOCK:
now = time.time()
attempts = _login_attempts.get(ip, [])
# Prune old attempts
attempts = [t for t in attempts if now - t < _LOGIN_WINDOW]
if attempts:
_login_attempts[ip] = attempts
else:
_login_attempts.pop(ip, None)
_save_login_attempts(_login_attempts)
return len(attempts) < _LOGIN_MAX_ATTEMPTS
def _record_login_attempt(ip: str) -> None:
"""Record a login attempt for rate limiting (thread-safe)."""
with _LOGIN_ATTEMPTS_LOCK:
now = time.time()
attempts = _login_attempts.get(ip, [])
attempts.append(now)
_login_attempts[ip] = attempts
_save_login_attempts(_login_attempts)
def _load_key(filename: str) -> bytes:
"""Load a 32-byte key from STATE_DIR, generating and persisting one if missing."""
key_file = STATE_DIR / filename
try:
if key_file.exists():
raw = key_file.read_bytes()
if len(raw) >= 32:
return raw[:32]
except OSError:
logger.debug("Failed to read key %s", filename)
key = secrets.token_bytes(32)
try:
STATE_DIR.mkdir(parents=True, exist_ok=True)
key_file.write_bytes(key)
key_file.chmod(0o600)
except OSError:
logger.debug("Failed to persist key %s", filename)
return key
_PBKDF2_KEY_CACHE: bytes | None = None
_SIGNING_KEY_CACHE: bytes | None = None
def _pbkdf2_key() -> bytes:
global _PBKDF2_KEY_CACHE
if _PBKDF2_KEY_CACHE is None:
_PBKDF2_KEY_CACHE = _load_key('.pbkdf2_key')
return _PBKDF2_KEY_CACHE
def _signing_key() -> bytes:
global _SIGNING_KEY_CACHE
if _SIGNING_KEY_CACHE is None:
_SIGNING_KEY_CACHE = _load_key('.signing_key')
return _SIGNING_KEY_CACHE
def _hash_password(password, *, salt: bytes | None = None) -> str:
"""PBKDF2-SHA256 with 600k iterations (OWASP recommendation).
Salt is the persisted PBKDF2 key, which is secret and unique per
installation. This keeps the stored hash format a plain hex string
(no format change to settings.json) while replacing the predictable
STATE_DIR-derived salt from the original implementation.
The *salt* parameter exists solely to support transparent migration
of password hashes that were computed with a different key (e.g. the
old `.signing_key`). Normal callers should never pass it.
"""
if salt is None:
salt = _pbkdf2_key()
dk = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 600_000)
return dk.hex()
_AUTH_HASH_LOCK = threading.Lock()
_AUTH_HASH_COMPUTED: bool = False
_AUTH_HASH_CACHE: str | None = None
def _invalidate_password_hash_cache() -> None:
"""Invalidate the in-process password hash cache so the next call to
get_password_hash() re-reads from settings.json or the env var."""
global _AUTH_HASH_COMPUTED, _AUTH_HASH_CACHE
with _AUTH_HASH_LOCK:
_AUTH_HASH_COMPUTED = False
_AUTH_HASH_CACHE = None
def get_password_hash() -> str | None:
"""Return the active password hash, or None if auth is disabled.
Priority: env var > settings.json.
The hash is computed once and cached for the lifetime of the process.
PBKDF2-600k takes ~1 s and is called on nearly every HTTP request via
check_auth → is_auth_enabled, so caching avoids wasting a full second
of CPU per request after the first one.
Thread-safe: double-checked locking ensures that under a burst of
concurrent requests only one thread computes PBKDF2, while the fast
path (after initialisation) requires zero locks.
"""
global _AUTH_HASH_COMPUTED, _AUTH_HASH_CACHE
# Fast path — no lock needed once cache is populated.
if _AUTH_HASH_COMPUTED:
return _AUTH_HASH_CACHE
with _AUTH_HASH_LOCK:
# Re-check inside lock — another thread may have populated while
# we were waiting to acquire.
if _AUTH_HASH_COMPUTED:
return _AUTH_HASH_CACHE
env_pw = os.getenv('HERMES_WEBUI_PASSWORD', '').strip()
if env_pw:
result = _hash_password(env_pw)
else:
result = load_settings().get('password_hash') or None
_AUTH_HASH_CACHE = result
_AUTH_HASH_COMPUTED = True
return result
def is_auth_enabled() -> bool:
"""True if a password is configured (env var or settings)."""
return get_password_hash() is not None
def verify_password(plain: str) -> bool:
"""Verify a plaintext password against the stored hash.
Supports transparent migration of password hashes that were computed
with the old `.signing_key` salt. When the two keys differ and the
legacy-salted hash matches, the password is transparently re-hashed
with the current `.pbkdf2_key` and persisted to settings.json.
"""
expected = get_password_hash()
if not expected:
return False
# Fast path: current PBKDF2 key
if hmac.compare_digest(_hash_password(plain), expected):
return True
# Migration: some hashes were computed with `.signing_key` before the
# PBKDF2 key was separated. Try the legacy salt; if it matches,
# transparently upgrade so the next login uses the fast path.
legacy_salt = _signing_key()
current_salt = _pbkdf2_key()
if legacy_salt != current_salt:
if hmac.compare_digest(_hash_password(plain, salt=legacy_salt), expected):
from api.config import save_settings
save_settings({'_set_password': plain})
# Password re-hashed and persisted to disk using the current salt.
# Cache invalidation is handled by fix 2/3 (#2192) which adds the
# _invalidate_password_hash_cache() call inside save_settings().
return True
return False
def create_session() -> str:
"""Create a new auth session. Returns signed cookie value."""
token = secrets.token_hex(32)
_sessions[token] = time.time() + _resolve_session_ttl()
_save_sessions(_sessions)
sig = hmac.new(_signing_key(), token.encode(), hashlib.sha256).hexdigest()
return f"{token}.{sig}"
def _prune_expired_sessions():
"""Remove all expired session entries to prevent unbounded memory growth."""
now = time.time()
expired = [t for t, exp in _sessions.items() if now > exp]
if expired:
for token in expired:
_sessions.pop(token, None)
_save_sessions(_sessions)
def verify_session(cookie_value: str) -> bool:
"""Verify a signed session cookie. Returns True if valid and not expired."""
if not cookie_value or '.' not in cookie_value:
return False
_prune_expired_sessions() # lazy cleanup on every verification attempt
token, sig = cookie_value.rsplit('.', 1)
full_sig = hmac.new(_signing_key(), token.encode(), hashlib.sha256).hexdigest()
# Accept both new (64-char) and legacy (32-char truncated) signatures so
# existing sessions survive the upgrade without a forced global logout.
# The legacy branch can be removed once session TTLs have expired (~30 days).
valid = hmac.compare_digest(sig, full_sig) or (
len(sig) == 32 and hmac.compare_digest(sig, full_sig[:32])
)
if not valid:
return False
expiry = _sessions.get(token)
if not expiry or time.time() > expiry:
_sessions.pop(token, None)
return False
return True
def _session_token_from_cookie_value(cookie_value: str) -> str | None:
"""Return the raw server-side session token from a signed cookie value."""
if not cookie_value or '.' not in cookie_value:
return None
token, _sig = cookie_value.rsplit('.', 1)
return token or None
def csrf_token_for_session(cookie_value: str) -> str | None:
"""Return the CSRF token bound to an authenticated WebUI session.
The browser can read this token from the authenticated shell and echoes it
in ``X-Hermes-CSRF-Token`` on unsafe API requests. The token is derived
from the HttpOnly session cookie's server-side token, so it automatically
rotates on login and is invalidated when the auth session expires or logs
out. Callers must still verify the auth session before trusting it.
"""
token = _session_token_from_cookie_value(cookie_value)
if not token:
return None
return hmac.new(_signing_key(), f"csrf:{token}".encode(), hashlib.sha256).hexdigest()
def verify_csrf_token(cookie_value: str, csrf_token: str) -> bool:
"""Verify a submitted CSRF token against the authenticated session."""
if not cookie_value or not csrf_token or not verify_session(cookie_value):
return False
expected = csrf_token_for_session(cookie_value)
return bool(expected and hmac.compare_digest(str(csrf_token), expected))
def invalidate_session(cookie_value) -> None:
"""Remove a session token."""
if cookie_value and '.' in cookie_value:
token = cookie_value.rsplit('.', 1)[0]
if token in _sessions:
_sessions.pop(token, None)
_save_sessions(_sessions)
def parse_cookie(handler) -> str | None:
"""Extract the auth cookie from the request headers."""
cookie_header = handler.headers.get('Cookie', '')
if not cookie_header:
return None
cookie = http.cookies.SimpleCookie()
try:
cookie.load(cookie_header)
except http.cookies.CookieError:
return None
morsel = cookie.get(COOKIE_NAME)
return morsel.value if morsel else None
def check_auth(handler, parsed) -> bool:
"""Check if request is authorized. Returns True if OK.
If not authorized, sends 401 (API) or 302 redirect (page) and returns False."""
if not is_auth_enabled():
return True
# Public paths don't require auth
if parsed.path in PUBLIC_PATHS or parsed.path.startswith('/static/') or parsed.path.startswith('/session/static/'):
return True
# Check session cookie
cookie_val = parse_cookie(handler)
if cookie_val and verify_session(cookie_val):
return True
# Not authorized
if parsed.path.startswith('/api/'):
handler.send_response(401)
handler.send_header('Content-Type', 'application/json')
handler.end_headers()
handler.wfile.write(b'{"error":"Authentication required"}')
else:
handler.send_response(302)
# Pass the original path as ?next= so login.js redirects back after auth.
# SECURITY/CORRECTNESS: the inner `?` and `&` MUST be percent-encoded
# when stuffed into the outer `?next=` parameter, otherwise:
# (a) multi-param query strings get truncated at the first inner `&`
# (e.g. `/api/sessions?limit=50&offset=0` would round-trip as
# just `/api/sessions?limit=50` after the browser parses the
# outer URL — `offset=0` becomes a separate top-level query
# parameter that the login page ignores).
# (b) attacker-controlled paths could inject a second `next=`
# parameter; per RFC 3986 the duplicate behaviour is undefined
# and parsers diverge (Python's parse_qs returns last-match,
# URLSearchParams returns first-match), opening a query-pollution
# footgun even though _safeNextPath() rejects most malicious
# shapes downstream.
# Encoding the entire `path?query` blob with quote(safe='/') turns
# `?` → `%3F` and `&` → `%26`, so the outer parameter holds exactly
# one path-with-query string and `searchParams.get('next')` returns
# the full original URL (the browser auto-decodes once).
# (Opus pre-release advisor finding for v0.50.258.)
import urllib.parse as _urlparse
_path_with_query = parsed.path or '/'
if parsed.query:
_path_with_query += '?' + parsed.query
# safe='/' keeps path separators readable; everything else (including
# `?`, `&`, `=`) gets percent-encoded.
_next = _urlparse.quote(_path_with_query, safe='/')
handler.send_header('Location', 'login?next=' + _next)
handler.end_headers()
return False
def _is_secure_context(handler=None) -> bool:
"""Return True if cookies should carry the Secure flag.
Behaviour is overridable via HERMES_WEBUI_SECURE env var for
reverse-proxy setups where TLS terminates at a frontend proxy
(nginx, Cloudflare, etc.) and Python only sees plain HTTP.
1/true/yes → force Secure on; 0/false/no → force Secure off.
When unset, fall back to heuristics: direct TLS socket (getpeercert)
or X-Forwarded-Proto header from the request.
.. warning::
The ``X-Forwarded-Proto`` header is only trustworthy when a
reverse proxy (nginx, Cloudflare, etc.) is deployed in front
of the application. Without a proxy, any client can forge the
header and cause the Secure flag to be set on plain HTTP.
"""
env = os.getenv('HERMES_WEBUI_SECURE', '').strip().lower()
if env in ('1', 'true', 'yes'):
return True
if env in ('0', 'false', 'no'):
return False
if handler is not None:
if getattr(handler.request, 'getpeercert', None) is not None:
return True
if handler.headers.get('X-Forwarded-Proto', '') == 'https':
return True
return False
def set_auth_cookie(handler, cookie_value) -> None:
"""Set the auth cookie on the response."""
cookie = http.cookies.SimpleCookie()
cookie[COOKIE_NAME] = cookie_value
cookie[COOKIE_NAME]['httponly'] = True
cookie[COOKIE_NAME]['samesite'] = 'Lax'
cookie[COOKIE_NAME]['path'] = '/'
cookie[COOKIE_NAME]['max-age'] = str(_resolve_session_ttl())
if _is_secure_context(handler):
cookie[COOKIE_NAME]['secure'] = True
handler.send_header('Set-Cookie', cookie[COOKIE_NAME].OutputString())
def clear_auth_cookie(handler) -> None:
"""Clear the auth cookie on the response."""
cookie = http.cookies.SimpleCookie()
cookie[COOKIE_NAME] = ''
cookie[COOKIE_NAME]['httponly'] = True
cookie[COOKIE_NAME]['path'] = '/'
cookie[COOKIE_NAME]['max-age'] = '0'
handler.send_header('Set-Cookie', cookie[COOKIE_NAME].OutputString())