Files
nesquena-hermes 5ce516ed38 v0.50.255: Opus follow-ups (4 fixes) + CHANGELOG
Opus pre-release advisor caught 4 issues in stage-255 (#1390 + #1405):

1. MUST-FIX: api/rollback.py path-traversal — _checkpoint_root() / ws_hash /
   checkpoint did NOT normalize Path() / "../escape", so an authenticated
   caller could read or restore from another allowlisted workspace via
   ../<other-ws-hash>/<sha>. New _validate_checkpoint_id() regex-guards
   with ^[A-Za-z0-9_-][A-Za-z0-9_.-]{0,63}$ and rejects . and .. literals.
   Both get_checkpoint_diff and restore_checkpoint validate.

2. SHOULD-FIX: redact_session_data perf cliff — the new api_redact_enabled
   toggle in #1405 called uncached load_settings() per string, recursed
   across messages[] and tool_calls[]. For a 50-message session: hundreds
   of disk reads per /api/session response. Now read once at the top and
   thread _enabled through via private kwarg.

3. SHOULD-FIX: voice-mode wrong-session TTS — the patched autoReadLastAssistant
   fires globally; if the user navigated to a different session between
   sending and stream completion, TTS would speak the wrong session's reply.
   New _voiceModeThinkingSid closure captures S.session.session_id at
   thinking-time; _speakResponse bails to _startListening() on mismatch.

4. NIT: rollback._inspect_checkpoint had bare Exception in the except tuple
   alongside specific catches, swallowing everything. Now (TimeoutExpired,
   OSError) only.

6 regression tests in test_v050255_opus_followups.py. Full suite: 3587 passed,
2 skipped, 3 xpassed.
2026-05-01 17:19:53 +00:00

321 lines
11 KiB
Python

"""
Hermes Web UI -- Filesystem checkpoint (rollback) API.
Provides endpoints to list, diff, and restore filesystem checkpoints
created by the Hermes agent's CheckpointManager. Checkpoints live at
``{hermes_home}/checkpoints/<hash>/`` as shadow git repositories.
"""
import hashlib
import json
import logging
import os
import re
import shutil
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Checkpoint ids are expected to be SHA-style hashes produced by the agent's
# CheckpointManager. The accepted alphabet is [A-Za-z0-9_.-], 1 to 64
# characters, and the first character may not be '.'. Consequences:
#   * no '/' is possible, so the id can never act as a path separator;
#   * '.' and '..' (and any dot-prefixed name) are rejected, so the id
#     cannot climb out of the workspace's checkpoint directory.
# This is defense-in-depth on top of the workspace allowlist: pathlib's
# ``Path() / "../escape"`` does not normalize, so an unvalidated id such as
# `../<other-ws-hash>/<sha>` would let an authenticated caller diff or
# restore from another allowlisted workspace's checkpoint store.
_CHECKPOINT_ID_RE = re.compile(r"^[A-Za-z0-9_-][A-Za-z0-9_.-]{0,63}$")


def _validate_checkpoint_id(checkpoint: str) -> str:
    """Return the whitespace-trimmed checkpoint id, or raise if it is unsafe.

    Falsy input is treated as an empty string. The trimmed value must match
    ``_CHECKPOINT_ID_RE`` in full and must not be the literal ``.`` or ``..``.

    Raises:
        ValueError: if the id is empty or contains disallowed characters.
    """
    candidate = str(checkpoint or "").strip()
    is_dot_entry = candidate in (".", "..")
    if candidate and not is_dot_entry and _CHECKPOINT_ID_RE.fullmatch(candidate):
        return candidate
    raise ValueError(
        "checkpoint id must match [A-Za-z0-9_-][A-Za-z0-9_.-]{0,63}"
    )
def _hermes_home() -> Path:
    """Locate the Hermes home directory currently in use.

    Asks ``api.profiles.get_active_hermes_home`` first. If that import or
    call fails for any reason, falls back to the ``HERMES_HOME`` environment
    variable (default ``~/.hermes``) with ``~`` expanded.
    """
    try:
        from api.profiles import get_active_hermes_home

        home = Path(get_active_hermes_home())
    except Exception:
        # Deliberate best-effort: any failure in the profiles layer falls
        # back to the environment-based location.
        fallback = os.environ.get("HERMES_HOME", "~/.hermes")
        home = Path(fallback).expanduser()
    return home
def _workspace_hash(workspace: str) -> str:
    """Map a workspace path to the name of its checkpoint directory.

    The name is the first 12 hex characters of the SHA-256 digest of the
    workspace's real (symlink-resolved) path. If the path cannot be
    resolved, the raw input string is hashed instead.

    NOTE(review): this is meant to mirror the agent's
    ``CheckpointManager._get_checkpoint_dir``; that code is not visible
    from this module, so confirm the two stay in sync.
    """
    try:
        real_path = os.path.realpath(workspace)
    except (OSError, ValueError):
        real_path = workspace
    digest = hashlib.sha256(real_path.encode())
    return digest.hexdigest()[:12]
def _checkpoint_root() -> Path:
    """Return the ``checkpoints`` directory under the Hermes home."""
    return Path(_hermes_home(), "checkpoints")
def _resolve_workspace(workspace: str) -> str:
    """Canonicalize *workspace* and check it against the configured list.

    The path is resolved with ``os.path.realpath`` and must be an existing
    directory. It must then equal the real path of one of the entries
    returned by ``api.workspace.load_workspaces``.

    Returns:
        The resolved workspace path.

    Raises:
        ValueError: if *workspace* is empty or not a string, does not exist
            as a directory, or is not among the configured workspaces.
    """
    if not (workspace and isinstance(workspace, str)):
        raise ValueError("workspace is required")

    resolved = os.path.realpath(workspace)
    if not os.path.isdir(resolved):
        raise ValueError(f"Workspace does not exist: {workspace}")

    # Allowlist check: only workspaces the application already knows about.
    try:
        from api.workspace import load_workspaces

        allowed = {
            os.path.realpath(path)
            for entry in load_workspaces()
            if (path := entry.get("path", ""))
        }
        if resolved not in allowed:
            raise ValueError(f"Workspace not in configured list: {workspace}")
    except ImportError:
        # NOTE(review): this path fails open. If the workspace module
        # cannot be imported, the allowlist check is skipped and only a
        # warning is logged.
        logger.warning("Could not load workspace list for rollback validation")
    return resolved
def _find_git() -> str:
    """Return the git executable found on PATH, or the bare name ``git``."""
    located = shutil.which("git")
    return located if located else "git"
# ── Public API functions (called from routes.py) ────────────────────────────
def list_checkpoints(workspace: str) -> dict[str, Any]:
    """Enumerate the checkpoints recorded for *workspace*.

    Subdirectories of the workspace's checkpoint directory are visited
    newest-first (by directory mtime). Each is passed to
    ``_inspect_checkpoint``, and entries it cannot describe are omitted.

    Returns a dict with:
        checkpoints: list of checkpoint metadata dicts (possibly empty)
        workspace: resolved workspace path
        checkpoint_dir: the checkpoint directory path, as a string

    Raises:
        ValueError: if the workspace fails validation.
    """
    resolved = _resolve_workspace(workspace)
    ckpt_dir = _checkpoint_root() / _workspace_hash(resolved)
    response: dict[str, Any] = {
        "checkpoints": [],
        "workspace": resolved,
        "checkpoint_dir": str(ckpt_dir),
    }
    if not ckpt_dir.is_dir():
        return response

    git = _find_git()

    def _mtime_or_zero(path: Path) -> float:
        # Non-directories are skipped below; key them as 0 without stat().
        return path.stat().st_mtime if path.is_dir() else 0

    ordered = sorted(ckpt_dir.iterdir(), key=_mtime_or_zero, reverse=True)
    inspected = (
        _inspect_checkpoint(entry, git) for entry in ordered if entry.is_dir()
    )
    response["checkpoints"] = [info for info in inspected if info]
    return response
def _inspect_checkpoint(ckpt_path: Path, git: str) -> dict[str, Any] | None:
"""Extract metadata from a single checkpoint directory."""
git_dir = ckpt_path / ".git"
if not git_dir.is_dir():
return None
name = ckpt_path.name
try:
result = subprocess.run(
[git, "-C", str(ckpt_path), "log", "--format=%H%n%s%n%aI", "-1"],
capture_output=True, text=True, timeout=5,
)
if result.returncode != 0 or not result.stdout.strip():
return None
lines = result.stdout.strip().split("\n")
commit_hash = lines[0] if len(lines) > 0 else name
message = lines[1] if len(lines) > 1 else "checkpoint"
date_str = lines[2] if len(lines) > 2 else ""
# Parse date for display
date_display = ""
if date_str:
try:
dt = datetime.fromisoformat(date_str)
date_display = dt.strftime("%Y-%m-%d %H:%M")
except (ValueError, TypeError):
date_display = date_str
# Count files
files_result = subprocess.run(
[git, "-C", str(ckpt_path), "ls-files"],
capture_output=True, text=True, timeout=5,
)
file_count = len(files_result.stdout.strip().split("\n")) if files_result.stdout.strip() else 0
return {
"id": name,
"commit": commit_hash[:12],
"message": message,
"date": date_str,
"date_display": date_display,
"files": file_count,
"path": str(ckpt_path),
}
except (subprocess.TimeoutExpired, OSError) as e:
logger.debug("Failed to inspect checkpoint %s: %s", ckpt_path, e)
return None
def get_checkpoint_diff(workspace: str, checkpoint: str) -> dict[str, Any]:
    """Show the diff between a checkpoint and the current workspace state.

    Only files tracked in the checkpoint are compared. Files that exist
    only in the workspace are not reported. The diff runs from the
    checkpoint (``a/``) to the workspace (``b/``), so a file present in the
    checkpoint but missing from the workspace is reported as ``deleted``.

    Args:
        workspace: path of a configured workspace.
        checkpoint: checkpoint id, a single directory name.

    Returns a dict with:
        checkpoint: the validated checkpoint id
        workspace: resolved workspace path
        diff: unified diff text ("" when nothing differs)
        files_changed: list of ``{"file", "status"}`` dicts
        total_changes: number of entries in ``files_changed``

    Raises:
        ValueError: if the workspace or checkpoint id is invalid, the
            checkpoint directory does not exist, or git cannot list its
            files.
    """
    # Function-scope import: difflib is not among the module-level imports.
    import difflib

    resolved = _resolve_workspace(workspace)
    checkpoint = _validate_checkpoint_id(checkpoint)
    ws_hash = _workspace_hash(resolved)
    ckpt_dir = _checkpoint_root() / ws_hash / checkpoint
    if not ckpt_dir.is_dir():
        raise ValueError(f"Checkpoint not found: {checkpoint}")
    git = _find_git()

    # List tracked files. ``-z`` gives NUL-separated, unquoted paths. Without
    # it git C-quotes names containing non-ASCII or special characters, and
    # those quoted strings never match a real file, so such files would be
    # silently skipped.
    ls_result = subprocess.run(
        [git, "-C", str(ckpt_dir), "ls-files", "-z"],
        capture_output=True, text=True, timeout=10,
    )
    if ls_result.returncode != 0:
        raise ValueError("Failed to list checkpoint files")
    ckpt_files = [f for f in ls_result.stdout.split("\0") if f]

    workspace_root = Path(resolved)
    files_changed = []
    diff_lines = []
    for rel_path in ckpt_files:
        ckpt_file = ckpt_dir / rel_path
        ws_file = workspace_root / rel_path
        if not ckpt_file.is_file():
            continue
        # Read checkpoint version
        try:
            ckpt_content = ckpt_file.read_text(errors="replace")
        except OSError:
            continue
        # Read workspace version (None means the file is gone)
        if ws_file.is_file():
            try:
                ws_content = ws_file.read_text(errors="replace")
            except OSError:
                # Unreadable workspace file is compared as if it were empty.
                ws_content = ""
        else:
            ws_content = None

        if ws_content is None:
            # File exists in checkpoint but not in workspace (deleted)
            removed = ckpt_content.splitlines()
            files_changed.append({"file": rel_path, "status": "deleted"})
            diff_lines.append(f"--- a/{rel_path}")
            diff_lines.append("+++ /dev/null")
            diff_lines.append("@@ -1,{lines} +0,0 @@".format(lines=len(removed)))
            diff_lines.extend(f"-{line}" for line in removed)
        elif ckpt_content != ws_content:
            # File changed. Line endings are kept during comparison so a
            # newline-only change is still detected.
            diff = list(difflib.unified_diff(
                ckpt_content.splitlines(keepends=True),
                ws_content.splitlines(keepends=True),
                fromfile=f"a/{rel_path}",
                tofile=f"b/{rel_path}",
                lineterm="",
            ))
            if diff:
                files_changed.append({"file": rel_path, "status": "modified"})
                # Content lines still end in "\n" while header lines do not.
                # Drop that newline so the join below does not put a blank
                # line after every content line.
                diff_lines.extend(line.removesuffix("\n") for line in diff)
    # New files that exist only in the workspace are intentionally not
    # scanned (performance; the diff shows what the checkpoint captures).
    return {
        "checkpoint": checkpoint,
        "workspace": resolved,
        "diff": "\n".join(diff_lines) if diff_lines else "",
        "files_changed": files_changed,
        "total_changes": len(files_changed),
    }
def restore_checkpoint(workspace: str, checkpoint: str) -> dict[str, Any]:
    """Restore a checkpoint by copying its tracked files into the workspace.

    Only files that exist in the checkpoint are restored, and existing
    workspace files at the same paths are overwritten. Files added to the
    workspace after the checkpoint was created are NOT deleted. A file that
    fails to copy is recorded in ``errors`` and does not stop the restore.

    Args:
        workspace: path of a configured workspace.
        checkpoint: checkpoint id, a single directory name.

    Returns a dict with:
        ok: always True when the function returns (check ``errors`` for
            per-file failures)
        checkpoint: the validated checkpoint id
        workspace: resolved workspace path
        files_restored: list of restored relative file paths
        files_restored_count: length of ``files_restored``
        errors: list of ``{"file", "error"}`` dicts for failed copies

    Raises:
        ValueError: if the workspace or checkpoint id is invalid, the
            checkpoint directory does not exist, or git cannot list its
            files.
    """
    resolved = _resolve_workspace(workspace)
    checkpoint = _validate_checkpoint_id(checkpoint)
    ws_hash = _workspace_hash(resolved)
    ckpt_dir = _checkpoint_root() / ws_hash / checkpoint
    if not ckpt_dir.is_dir():
        raise ValueError(f"Checkpoint not found: {checkpoint}")
    git = _find_git()

    # List tracked files. ``-z`` gives NUL-separated, unquoted paths. Without
    # it git C-quotes names containing non-ASCII or special characters, and
    # those quoted strings never match a real file, so such files would be
    # silently left unrestored.
    ls_result = subprocess.run(
        [git, "-C", str(ckpt_dir), "ls-files", "-z"],
        capture_output=True, text=True, timeout=10,
    )
    if ls_result.returncode != 0:
        raise ValueError("Failed to list checkpoint files")
    ckpt_files = [f for f in ls_result.stdout.split("\0") if f]

    workspace_root = Path(resolved)
    restored = []
    errors = []
    for rel_path in ckpt_files:
        ckpt_file = ckpt_dir / rel_path
        ws_file = workspace_root / rel_path
        if not ckpt_file.is_file():
            continue
        try:
            # Recreate parent directories removed since the checkpoint.
            ws_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(str(ckpt_file), str(ws_file))
            restored.append(rel_path)
        except OSError as e:
            # Best-effort: record the failure and continue with the rest.
            errors.append({"file": rel_path, "error": str(e)})
            logger.warning("Failed to restore %s: %s", rel_path, e)
    return {
        "ok": True,
        "checkpoint": checkpoint,
        "workspace": resolved,
        "files_restored": restored,
        "files_restored_count": len(restored),
        "errors": errors,
    }