mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-22 18:30:28 +00:00
4a882bec66
_sessions is an in-memory dict, so every process restart (launchd bounce,
systemd restart, container recycle) invalidates all active browser sessions.
Users get 401 on every authenticated endpoint until they clear cookies.
The HMAC signing key already persists to STATE_DIR/.signing_key via atomic
owner-only write. This PR applies the same pattern to the session table:
- _load_sessions(): reads .sessions.json on module import, prunes expired
entries, tolerates missing/malformed files (returns {} on any error)
- _save_sessions(): atomic write via tempfile + os.replace(), chmod 0600,
mirrors .signing_key write pattern exactly
- create_session(): saves after inserting new token
- invalidate_session(): saves after removing token (only if token existed)
- _prune_expired_sessions(): saves only when entries are actually removed
Cookie format and signing are unchanged; existing sessions survive upgrade.
6 regression tests cover: restart survival, invalidation persistence,
expiry pruning on load, 0600 permissions, corrupt-file tolerance.
Co-authored with Claude Sonnet 4.6 / Anthropic.
259 lines
8.9 KiB
Python
259 lines
8.9 KiB
Python
"""
|
|
Hermes Web UI -- Optional password authentication.
|
|
Off by default. Enable by setting HERMES_WEBUI_PASSWORD env var
|
|
or configuring a password in the Settings panel.
|
|
"""
|
|
import hashlib
|
|
import hmac
|
|
import http.cookies
|
|
import json
|
|
import logging
|
|
import os
|
|
import secrets
|
|
import tempfile
|
|
import time
|
|
|
|
from api.config import STATE_DIR, load_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ── Public paths (no auth required) ─────────────────────────────────────────
|
|
PUBLIC_PATHS = frozenset({
|
|
'/login', '/health', '/favicon.ico',
|
|
'/api/auth/login', '/api/auth/status',
|
|
})
|
|
|
|
COOKIE_NAME = 'hermes_session'
|
|
SESSION_TTL = 86400 # 24 hours
|
|
|
|
_SESSIONS_FILE = STATE_DIR / '.sessions.json'
|
|
|
|
|
|
def _load_sessions() -> dict[str, float]:
|
|
"""Load persisted sessions from STATE_DIR, pruning expired entries.
|
|
|
|
Returns an empty dict on any read or parse error so startup is never
|
|
blocked by a corrupt or missing sessions file.
|
|
"""
|
|
try:
|
|
if _SESSIONS_FILE.exists():
|
|
data = json.loads(_SESSIONS_FILE.read_text(encoding='utf-8'))
|
|
if not isinstance(data, dict):
|
|
raise ValueError('malformed sessions file — expected dict')
|
|
now = time.time()
|
|
return {t: exp for t, exp in data.items()
|
|
if isinstance(t, str) and isinstance(exp, (int, float)) and exp > now}
|
|
except Exception as e:
|
|
logger.debug("Failed to load sessions file, starting fresh: %s", e)
|
|
return {}
|
|
|
|
|
|
def _save_sessions(sessions: dict[str, float]) -> None:
|
|
"""Atomically persist sessions to STATE_DIR/.sessions.json (0600).
|
|
|
|
Uses a temp file + os.replace() so a crash mid-write never leaves a
|
|
truncated file. Mirrors the same pattern as .signing_key persistence.
|
|
"""
|
|
try:
|
|
STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
fd, tmp = tempfile.mkstemp(dir=STATE_DIR, suffix='.sessions.tmp')
|
|
try:
|
|
with os.fdopen(fd, 'w', encoding='utf-8') as f:
|
|
json.dump(sessions, f)
|
|
os.chmod(tmp, 0o600)
|
|
os.replace(tmp, _SESSIONS_FILE)
|
|
except Exception:
|
|
try:
|
|
os.unlink(tmp)
|
|
except OSError:
|
|
pass
|
|
raise
|
|
except Exception as e:
|
|
logger.debug("Failed to persist sessions: %s", e)
|
|
|
|
|
|
# Active sessions: token -> expiry timestamp (persisted across restarts via STATE_DIR)
|
|
_sessions = _load_sessions()
|
|
|
|
# ── Login rate limiter ──────────────────────────────────────────────────────
|
|
_login_attempts = {} # ip -> [timestamp, ...]
|
|
_LOGIN_MAX_ATTEMPTS = 5
|
|
_LOGIN_WINDOW = 60 # seconds
|
|
|
|
def _check_login_rate(ip: str) -> bool:
|
|
"""Return True if the IP is allowed to attempt login."""
|
|
now = time.time()
|
|
attempts = _login_attempts.get(ip, [])
|
|
# Prune old attempts
|
|
attempts = [t for t in attempts if now - t < _LOGIN_WINDOW]
|
|
_login_attempts[ip] = attempts
|
|
return len(attempts) < _LOGIN_MAX_ATTEMPTS
|
|
|
|
def _record_login_attempt(ip: str) -> None:
|
|
now = time.time()
|
|
attempts = _login_attempts.get(ip, [])
|
|
attempts.append(now)
|
|
_login_attempts[ip] = attempts
|
|
|
|
|
|
def _signing_key():
|
|
"""Return a random signing key, generating and persisting one on first call."""
|
|
key_file = STATE_DIR / '.signing_key'
|
|
try:
|
|
if key_file.exists():
|
|
raw = key_file.read_bytes()
|
|
if len(raw) >= 32:
|
|
return raw[:32]
|
|
except Exception:
|
|
logger.debug("Failed to read or access signing key file, using in-memory key")
|
|
# Generate a new random key
|
|
key = secrets.token_bytes(32)
|
|
try:
|
|
STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
key_file.write_bytes(key)
|
|
key_file.chmod(0o600)
|
|
except Exception:
|
|
logger.debug("Failed to persist signing key, using in-memory key only")
|
|
return key
|
|
|
|
|
|
def _hash_password(password):
|
|
"""PBKDF2-SHA256 with 600k iterations (OWASP recommendation).
|
|
Salt is the persisted random signing key, which is secret and unique per
|
|
installation. This keeps the stored hash format a plain hex string
|
|
(no format change to settings.json) while replacing the predictable
|
|
STATE_DIR-derived salt from the original implementation."""
|
|
salt = _signing_key()
|
|
dk = hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 600_000)
|
|
return dk.hex()
|
|
|
|
|
|
def get_password_hash() -> str | None:
|
|
"""Return the active password hash, or None if auth is disabled.
|
|
Priority: env var > settings.json."""
|
|
env_pw = os.getenv('HERMES_WEBUI_PASSWORD', '').strip()
|
|
if env_pw:
|
|
return _hash_password(env_pw)
|
|
settings = load_settings()
|
|
return settings.get('password_hash') or None
|
|
|
|
|
|
def is_auth_enabled() -> bool:
|
|
"""True if a password is configured (env var or settings)."""
|
|
return get_password_hash() is not None
|
|
|
|
|
|
def verify_password(plain) -> bool:
|
|
"""Verify a plaintext password against the stored hash."""
|
|
expected = get_password_hash()
|
|
if not expected:
|
|
return False
|
|
return hmac.compare_digest(_hash_password(plain), expected)
|
|
|
|
|
|
def create_session() -> str:
|
|
"""Create a new auth session. Returns signed cookie value."""
|
|
token = secrets.token_hex(32)
|
|
_sessions[token] = time.time() + SESSION_TTL
|
|
_save_sessions(_sessions)
|
|
sig = hmac.new(_signing_key(), token.encode(), hashlib.sha256).hexdigest()[:32]
|
|
return f"{token}.{sig}"
|
|
|
|
|
|
def _prune_expired_sessions():
|
|
"""Remove all expired session entries to prevent unbounded memory growth."""
|
|
now = time.time()
|
|
expired = [t for t, exp in _sessions.items() if now > exp]
|
|
if expired:
|
|
for token in expired:
|
|
_sessions.pop(token, None)
|
|
_save_sessions(_sessions)
|
|
|
|
|
|
def verify_session(cookie_value) -> bool:
|
|
"""Verify a signed session cookie. Returns True if valid and not expired."""
|
|
if not cookie_value or '.' not in cookie_value:
|
|
return False
|
|
_prune_expired_sessions() # lazy cleanup on every verification attempt
|
|
token, sig = cookie_value.rsplit('.', 1)
|
|
expected_sig = hmac.new(_signing_key(), token.encode(), hashlib.sha256).hexdigest()[:32]
|
|
if not hmac.compare_digest(sig, expected_sig):
|
|
return False
|
|
expiry = _sessions.get(token)
|
|
if not expiry or time.time() > expiry:
|
|
_sessions.pop(token, None)
|
|
return False
|
|
return True
|
|
|
|
|
|
def invalidate_session(cookie_value) -> None:
|
|
"""Remove a session token."""
|
|
if cookie_value and '.' in cookie_value:
|
|
token = cookie_value.rsplit('.', 1)[0]
|
|
if token in _sessions:
|
|
_sessions.pop(token, None)
|
|
_save_sessions(_sessions)
|
|
|
|
|
|
def parse_cookie(handler) -> str | None:
|
|
"""Extract the auth cookie from the request headers."""
|
|
cookie_header = handler.headers.get('Cookie', '')
|
|
if not cookie_header:
|
|
return None
|
|
cookie = http.cookies.SimpleCookie()
|
|
try:
|
|
cookie.load(cookie_header)
|
|
except http.cookies.CookieError:
|
|
return None
|
|
morsel = cookie.get(COOKIE_NAME)
|
|
return morsel.value if morsel else None
|
|
|
|
|
|
def check_auth(handler, parsed) -> bool:
|
|
"""Check if request is authorized. Returns True if OK.
|
|
If not authorized, sends 401 (API) or 302 redirect (page) and returns False."""
|
|
if not is_auth_enabled():
|
|
return True
|
|
# Public paths don't require auth
|
|
if parsed.path in PUBLIC_PATHS or parsed.path.startswith('/static/'):
|
|
return True
|
|
# Check session cookie
|
|
cookie_val = parse_cookie(handler)
|
|
if cookie_val and verify_session(cookie_val):
|
|
return True
|
|
# Not authorized
|
|
if parsed.path.startswith('/api/'):
|
|
handler.send_response(401)
|
|
handler.send_header('Content-Type', 'application/json')
|
|
handler.end_headers()
|
|
handler.wfile.write(b'{"error":"Authentication required"}')
|
|
else:
|
|
handler.send_response(302)
|
|
handler.send_header('Location', '/login')
|
|
handler.end_headers()
|
|
return False
|
|
|
|
|
|
def set_auth_cookie(handler, cookie_value) -> None:
|
|
"""Set the auth cookie on the response."""
|
|
cookie = http.cookies.SimpleCookie()
|
|
cookie[COOKIE_NAME] = cookie_value
|
|
cookie[COOKIE_NAME]['httponly'] = True
|
|
cookie[COOKIE_NAME]['samesite'] = 'Lax'
|
|
cookie[COOKIE_NAME]['path'] = '/'
|
|
cookie[COOKIE_NAME]['max-age'] = str(SESSION_TTL)
|
|
# Set Secure flag when connection is HTTPS
|
|
if getattr(handler.request, 'getpeercert', None) is not None or handler.headers.get('X-Forwarded-Proto', '') == 'https':
|
|
cookie[COOKIE_NAME]['secure'] = True
|
|
handler.send_header('Set-Cookie', cookie[COOKIE_NAME].OutputString())
|
|
|
|
|
|
def clear_auth_cookie(handler) -> None:
|
|
"""Clear the auth cookie on the response."""
|
|
cookie = http.cookies.SimpleCookie()
|
|
cookie[COOKIE_NAME] = ''
|
|
cookie[COOKIE_NAME]['httponly'] = True
|
|
cookie[COOKIE_NAME]['path'] = '/'
|
|
cookie[COOKIE_NAME]['max-age'] = '0'
|
|
handler.send_header('Set-Cookie', cookie[COOKIE_NAME].OutputString())
|