Files
hermes-webui/api/worktrees.py
T
Hermes Agent 10cfcee30e stage-342: apply Opus SHOULD-FIX — tighten worktree status _run_git timeout 5s → 2s
Worst case 4×5s=20s per polling request on ThreadingHTTPServer pool is risky
given today's _cron_env_lock near-miss on production 8787. Status probes
should fail fast; client can retry. All four call sites use default timeout.
2026-05-12 05:22:01 +00:00

262 lines
7.9 KiB
Python

"""Helpers for WebUI-managed Hermes Agent git worktrees."""
from __future__ import annotations
import subprocess
import time
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def _run_git(args: list[str], cwd: str | Path, timeout: float = 2) -> subprocess.CompletedProcess:
return subprocess.run(
["git", *args],
cwd=str(cwd),
text=True,
capture_output=True,
timeout=timeout,
check=False,
)
def _resolve_path(path: str | Path | None) -> Path | None:
if not path:
return None
try:
return Path(path).expanduser().resolve(strict=False)
except (OSError, RuntimeError):
return Path(path).expanduser()
def _worktree_list_cwd(worktree_path: Path, repo_root: str | Path | None) -> Path | None:
    """Choose the directory in which to run ``git worktree list``.

    The resolved *repo_root* wins when it is an existing directory; otherwise
    *worktree_path* itself is used if it is a directory.  ``None`` means
    neither location is usable.
    """
    root_dir = _resolve_path(repo_root)
    if root_dir is not None and root_dir.is_dir():
        return root_dir
    return worktree_path if worktree_path.is_dir() else None
def _parse_worktree_list_porcelain(output: str) -> set[str]:
paths: set[str] = set()
for line in str(output or "").splitlines():
if not line.startswith("worktree "):
continue
path = line[len("worktree "):].strip()
if not path:
continue
resolved = _resolve_path(path)
paths.add(str(resolved or Path(path).expanduser()))
return paths
def _worktree_listed(worktree_path: Path, repo_root: str | Path | None) -> bool:
    """Report whether git's worktree list currently contains *worktree_path*.

    ``False`` is the conservative answer for every probe failure (no usable
    directory, git could not run or timed out, non-zero exit); it is not
    definitive proof that the worktree is orphaned.  Future cleanup UI must
    combine this with the rest of the status payload.
    """
    probe_dir = _worktree_list_cwd(worktree_path, repo_root)
    if probe_dir is None:
        return False
    try:
        listing = _run_git(["worktree", "list", "--porcelain"], probe_dir)
    except (OSError, subprocess.TimeoutExpired):
        return False
    if listing.returncode == 0:
        known_paths = _parse_worktree_list_porcelain(listing.stdout)
        return str(worktree_path) in known_paths
    return False
def _status_porcelain(worktree_path: Path) -> tuple[bool, int]:
    """Summarise ``git status --porcelain`` for *worktree_path*.

    Returns ``(dirty, untracked_count)``: whether git reported any entry at
    all, and how many of those entries start with ``??`` (untracked).  Any
    probe failure yields ``(False, 0)``.
    """
    nothing = (False, 0)
    try:
        probe = _run_git(
            ["status", "--porcelain", "--untracked-files=normal"],
            worktree_path,
        )
    except (OSError, subprocess.TimeoutExpired):
        return nothing
    if probe.returncode != 0:
        return nothing
    entries = 0
    untracked = 0
    for row in probe.stdout.splitlines():
        if not row:
            continue
        entries += 1
        if row.startswith("??"):
            untracked += 1
    return entries > 0, untracked
def _ahead_behind(worktree_path: Path) -> dict:
    """Describe how ``HEAD`` in *worktree_path* relates to its upstream.

    The returned dict has the keys ``ahead``, ``behind``, ``available`` and
    ``upstream``.  ``upstream`` is filled in once the upstream ref has been
    resolved; ``available`` becomes ``True`` only after both counts were
    parsed.  Any git failure returns whatever was gathered up to that point.
    """
    summary = {"ahead": 0, "behind": 0, "available": False, "upstream": None}

    try:
        tracking = _run_git(
            ["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
            worktree_path,
        )
    except (OSError, subprocess.TimeoutExpired):
        return summary
    tracking_ref = tracking.stdout.strip() if tracking.returncode == 0 else ""
    if not tracking_ref:
        return summary
    summary["upstream"] = tracking_ref

    try:
        divergence = _run_git(
            ["rev-list", "--left-right", "--count", "HEAD...@{u}"],
            worktree_path,
        )
    except (OSError, subprocess.TimeoutExpired):
        return summary
    fields = divergence.stdout.strip().split() if divergence.returncode == 0 else []
    if len(fields) != 2:
        return summary

    try:
        # First count is stored as "ahead", second as "behind"; negatives are
        # clamped to zero.  Assigned one at a time on purpose.
        summary["ahead"] = max(0, int(fields[0]))
        summary["behind"] = max(0, int(fields[1]))
    except ValueError:
        return summary
    else:
        summary["available"] = True
    return summary
def _locked_by_stream(session) -> bool:
stream_id = getattr(session, "active_stream_id", None)
if not stream_id:
return False
try:
from api.config import STREAMS, STREAMS_LOCK
with STREAMS_LOCK:
return stream_id in STREAMS
except Exception:
return False
def _locked_by_terminal(session_id: str, worktree_path: Path) -> bool:
try:
from api.terminal import get_terminal
term = get_terminal(session_id)
except Exception:
return False
if not term:
return False
try:
if not term.is_alive():
return False
terminal_workspace = _resolve_path(getattr(term, "workspace", None))
return terminal_workspace == worktree_path
except Exception:
return False
def worktree_status_for_session(session) -> dict:
    """Build a read-only status snapshot of the worktree behind *session*.

    The snapshot always carries ``path``, ``exists``, ``dirty``,
    ``untracked_count``, ``ahead_behind``, ``locked_by_stream``,
    ``locked_by_terminal`` and ``listed``.  The git-derived ``dirty``,
    ``untracked_count`` and ``ahead_behind`` values are only probed when the
    worktree directory exists; otherwise they keep their defaults.

    Raises:
        ValueError: if the session has no usable ``worktree_path``.
    """
    configured = getattr(session, "worktree_path", None)
    resolved = _resolve_path(configured) if configured else None
    if resolved is None:
        raise ValueError("Session is not worktree-backed")

    present = resolved.is_dir()
    snapshot = {
        "path": str(resolved),
        "exists": bool(present),
        "dirty": False,
        "untracked_count": 0,
        "ahead_behind": {
            "ahead": 0,
            "behind": 0,
            "available": False,
            "upstream": None,
        },
    }
    # Lock and listing probes run whether or not the directory still exists.
    snapshot["locked_by_stream"] = _locked_by_stream(session)
    snapshot["locked_by_terminal"] = _locked_by_terminal(
        getattr(session, "session_id", ""),
        resolved,
    )
    snapshot["listed"] = _worktree_listed(
        resolved,
        getattr(session, "worktree_repo_root", None),
    )

    if present:
        snapshot["dirty"], snapshot["untracked_count"] = _status_porcelain(resolved)
        snapshot["ahead_behind"] = _ahead_behind(resolved)
    return snapshot
def find_git_repo_root(workspace: str | Path) -> Path:
"""Return the enclosing git repo root for *workspace*.
Use git itself instead of checking ``workspace/.git`` so nested workspaces
and linked git worktrees are both handled correctly.
"""
ws = Path(workspace).expanduser().resolve()
if not ws.is_dir():
raise ValueError("Workspace path does not exist or is not a directory")
try:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
cwd=ws,
text=True,
capture_output=True,
timeout=5,
check=False,
)
except (OSError, subprocess.TimeoutExpired) as exc:
raise ValueError("Workspace is not inside a git repository") from exc
if result.returncode != 0:
raise ValueError("Workspace is not inside a git repository")
root = result.stdout.strip()
if not root:
raise ValueError("Workspace is not inside a git repository")
return Path(root).expanduser().resolve()
def _setup_agent_worktree(repo_root: str) -> dict:
try:
import api.config # noqa: F401 # ensure Hermes Agent dir is on sys.path
from cli import _setup_worktree
except Exception as exc:
raise RuntimeError("Hermes Agent worktree helper is unavailable") from exc
output = StringIO()
with redirect_stdout(output), redirect_stderr(output):
info = _setup_worktree(repo_root)
emitted = output.getvalue().strip()
if emitted:
logger.debug("Hermes Agent worktree helper output: %s", emitted)
if not info:
raise RuntimeError("Hermes Agent failed to create a git worktree")
return info
def create_worktree_for_workspace(workspace: str | Path) -> dict:
    """Create a Hermes Agent worktree for the repository containing *workspace*.

    Returns a dict with the absolute worktree ``path``, its ``branch``, the
    absolute ``repo_root`` (the helper's value when it supplies one, otherwise
    the detected repository root) and a ``created_at`` timestamp taken from
    ``time.time()``.

    Raises:
        ValueError: propagated from ``find_git_repo_root``.
        RuntimeError: propagated from ``_setup_agent_worktree``, or raised
            here when the helper's metadata lacks ``path`` or ``branch``.
    """
    enclosing_repo = find_git_repo_root(workspace)
    metadata = _setup_agent_worktree(str(enclosing_repo))

    worktree_dir = metadata.get("path")
    branch_name = metadata.get("branch")
    if not (worktree_dir and branch_name):
        raise RuntimeError("Hermes Agent returned incomplete worktree metadata")

    absolute_worktree = Path(worktree_dir).expanduser().resolve()
    branch_text = str(branch_name)
    reported_root = metadata.get("repo_root") or enclosing_repo
    absolute_root = Path(reported_root).expanduser().resolve()
    return {
        "path": str(absolute_worktree),
        "branch": branch_text,
        "repo_root": str(absolute_root),
        "created_at": time.time(),
    }