Files
2026-05-13 09:49:15 +08:00

358 lines
11 KiB
Python

"""Helpers for WebUI-managed Hermes Agent git worktrees."""
from __future__ import annotations
import subprocess
import time
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
def _run_git(args: list[str], cwd: str | Path, timeout: float = 2) -> subprocess.CompletedProcess:
    """Run ``git`` with *args* inside *cwd* and return the finished process.

    Output is captured as text. A non-zero exit status does not raise
    (``check=False``); callers inspect ``returncode`` themselves.
    ``subprocess.run`` may still raise ``OSError`` (e.g. missing *cwd* or no
    ``git`` executable) or ``subprocess.TimeoutExpired`` after *timeout*
    seconds.
    """
    command = ["git"]
    command.extend(args)
    return subprocess.run(
        command,
        check=False,
        timeout=timeout,
        capture_output=True,
        text=True,
        cwd=str(cwd),
    )
def _resolve_path(path: str | Path | None) -> Path | None:
    """Turn *path* into a user-expanded, resolved ``Path``; falsy input gives ``None``.

    Resolution is non-strict, so the target does not have to exist. If
    resolving raises ``OSError`` or ``RuntimeError``, the user-expanded path
    is returned without being resolved.
    """
    if path:
        try:
            return Path(path).expanduser().resolve(strict=False)
        except (OSError, RuntimeError):
            return Path(path).expanduser()
    return None
def _worktree_list_cwd(worktree_path: Path, repo_root: str | Path | None) -> Path | None:
    """Choose the directory from which ``git worktree list`` should be run.

    The resolved *repo_root* wins when it is an existing directory; otherwise
    *worktree_path* is used if it is a directory. ``None`` means neither
    location is usable.
    """
    candidate = _resolve_path(repo_root)
    if candidate and candidate.is_dir():
        return candidate
    return worktree_path if worktree_path.is_dir() else None
def _parse_worktree_list_porcelain(output: str) -> set[str]:
    """Collect the worktree paths named in ``git worktree list --porcelain`` output.

    Only lines beginning with ``worktree `` are considered; every other line
    is skipped, as are entries whose path is blank. Each path is passed
    through ``_resolve_path`` and stored as a string.
    """
    prefix = "worktree "
    found: set[str] = set()
    for entry in str(output or "").splitlines():
        if entry.startswith(prefix):
            raw = entry[len(prefix):].strip()
            if raw:
                found.add(str(_resolve_path(raw) or Path(raw).expanduser()))
    return found
def _worktree_listed(worktree_path: Path, repo_root: str | Path | None) -> bool:
    """Report whether ``git worktree list`` currently includes *worktree_path*.

    A ``False`` result is only a safe fallback when the probe cannot run or
    fails; it is not proof that the worktree is orphaned. Any future cleanup
    UI has to weigh it together with the rest of the status payload.
    """
    probe_dir = _worktree_list_cwd(worktree_path, repo_root)
    if probe_dir is None:
        return False
    try:
        listing = _run_git(["worktree", "list", "--porcelain"], probe_dir)
    except (OSError, subprocess.TimeoutExpired):
        return False
    if listing.returncode == 0:
        known_paths = _parse_worktree_list_porcelain(listing.stdout)
        return str(worktree_path) in known_paths
    return False
def _status_porcelain(worktree_path: Path) -> tuple[bool, int]:
    """Summarise ``git status --porcelain`` for *worktree_path*.

    Returns ``(dirty, untracked_count)``: *dirty* is true when git reports any
    entry at all (untracked entries included), and *untracked_count* is the
    number of ``??`` entries. When git cannot be run, times out, or exits
    non-zero the result is ``(False, 0)``.
    """
    nothing_reported = (False, 0)
    try:
        probe = _run_git(
            ["status", "--porcelain", "--untracked-files=normal"],
            worktree_path,
        )
    except (OSError, subprocess.TimeoutExpired):
        return nothing_reported
    if probe.returncode != 0:
        return nothing_reported
    entries = [entry for entry in probe.stdout.splitlines() if entry]
    untracked = len([entry for entry in entries if entry.startswith("??")])
    return bool(entries), untracked
def _ahead_behind(worktree_path: Path) -> dict:
    """Compare ``HEAD`` of *worktree_path* with its upstream branch.

    Returns a dict with keys ``ahead``, ``behind``, ``available`` and
    ``upstream``. ``upstream`` is filled in once ``@{u}`` resolves to a
    non-empty ref name; ``available`` only becomes true after both counts
    from ``git rev-list --left-right --count HEAD...@{u}`` were parsed.
    Every failure (git missing, timeout, non-zero exit, unexpected output)
    returns the payload as filled in so far.
    """
    summary = {"ahead": 0, "behind": 0, "available": False, "upstream": None}
    git_failures = (OSError, subprocess.TimeoutExpired)

    # Step 1: find the upstream ref name, if the branch tracks one.
    try:
        tracking = _run_git(
            ["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
            worktree_path,
        )
    except git_failures:
        return summary
    tracking_ref = tracking.stdout.strip() if tracking.returncode == 0 else ""
    if not tracking_ref:
        return summary
    summary["upstream"] = tracking_ref

    # Step 2: count commits on each side of the symmetric difference.
    try:
        divergence = _run_git(
            ["rev-list", "--left-right", "--count", "HEAD...@{u}"],
            worktree_path,
        )
    except git_failures:
        return summary
    if divergence.returncode != 0:
        return summary

    fields = divergence.stdout.strip().split()
    if len(fields) == 2:
        try:
            summary["ahead"] = max(0, int(fields[0]))
            summary["behind"] = max(0, int(fields[1]))
        except ValueError:
            pass
        else:
            summary["available"] = True
    return summary
def _locked_by_stream(session) -> bool:
    """Tell whether *session*'s ``active_stream_id`` is registered in ``STREAMS``.

    A session without a stream id is never locked. The registry is imported
    lazily from ``api.config`` and read under ``STREAMS_LOCK``; any exception
    along the way is treated as "not locked".
    """
    active_id = getattr(session, "active_stream_id", None)
    if active_id:
        try:
            from api.config import STREAMS, STREAMS_LOCK

            with STREAMS_LOCK:
                return active_id in STREAMS
        except Exception:
            return False
    return False
def _locked_by_terminal(session_id: str, worktree_path: Path) -> bool:
    """Tell whether a live terminal for *session_id* is working in *worktree_path*.

    The terminal is looked up through ``api.terminal.get_terminal`` (imported
    lazily). The result is true only when a terminal exists, reports
    ``is_alive()``, and its resolved ``workspace`` equals *worktree_path*.
    Lookup or inspection errors are treated as "not locked".
    """
    try:
        from api.terminal import get_terminal

        terminal = get_terminal(session_id)
    except Exception:
        return False
    if not terminal:
        return False
    try:
        if terminal.is_alive():
            return _resolve_path(getattr(terminal, "workspace", None)) == worktree_path
        return False
    except Exception:
        return False
def worktree_status_for_session(session) -> dict:
    """Build a read-only snapshot describing the git worktree behind *session*.

    The returned dict has the keys ``path``, ``exists``, ``dirty``,
    ``untracked_count``, ``ahead_behind``, ``locked_by_stream``,
    ``locked_by_terminal`` and ``listed``. The git status and ahead/behind
    probes only run when the worktree directory exists; otherwise their
    entries keep their zero/False defaults.

    Raises:
        ValueError: if the session has no ``worktree_path``.
    """
    configured = getattr(session, "worktree_path", None)
    worktree_path = _resolve_path(configured) if configured else None
    if worktree_path is None:
        raise ValueError("Session is not worktree-backed")

    present = worktree_path.is_dir()
    stream_lock = _locked_by_stream(session)
    terminal_lock = _locked_by_terminal(
        getattr(session, "session_id", ""),
        worktree_path,
    )
    listed = _worktree_listed(
        worktree_path,
        getattr(session, "worktree_repo_root", None),
    )

    snapshot = {
        "path": str(worktree_path),
        "exists": bool(present),
        "dirty": False,
        "untracked_count": 0,
        "ahead_behind": {
            "ahead": 0,
            "behind": 0,
            "available": False,
            "upstream": None,
        },
        "locked_by_stream": stream_lock,
        "locked_by_terminal": terminal_lock,
        "listed": listed,
    }
    if present:
        snapshot["dirty"], snapshot["untracked_count"] = _status_porcelain(worktree_path)
        snapshot["ahead_behind"] = _ahead_behind(worktree_path)
    return snapshot
def remove_worktree_for_session(session, *, force: bool = False) -> dict:
    """Remove a session's git worktree from disk.

    The current status snapshot is consulted first. A worktree whose
    directory is already gone is reported as removed (with a warning) and git
    is not invoked. Otherwise ``git worktree remove`` runs from the session's
    ``worktree_repo_root``, followed by a best-effort ``git worktree prune``
    whose failures are logged but never raised.

    Args:
        session: Object exposing ``worktree_path`` and ``worktree_repo_root``
            (plus whatever ``worktree_status_for_session`` reads).
        force: Allow removal despite uncommitted changes, untracked files or
            unpushed commits, and pass ``--force`` to git.

    Returns:
        Dict with keys ``ok``, ``removed_path`` and ``warnings``. ``warnings``
        is a list of messages, or ``None`` when a removal produced none.

    Raises:
        ValueError: if the session is not worktree-backed; the worktree is
            locked by an active stream or terminal; it has uncommitted
            changes, untracked files or unpushed commits and *force* is
            false; the session has no ``worktree_repo_root``; or git fails
            to remove the worktree.
    """
    raw_path = getattr(session, "worktree_path", None)
    if not raw_path:
        raise ValueError("Session is not worktree-backed")
    worktree_path = _resolve_path(raw_path)
    if worktree_path is None:
        raise ValueError("Session is not worktree-backed")

    # Read current status before removal.
    status = worktree_status_for_session(session)
    if not status["exists"]:
        return {
            "ok": True,
            "removed_path": str(worktree_path),
            "warnings": ["Worktree directory no longer exists on disk."],
        }

    warnings = []

    # Guard: locked by stream.
    if status["locked_by_stream"]:
        raise ValueError("Worktree is locked by an active streaming session")

    # Guard: locked by terminal.
    if status["locked_by_terminal"]:
        raise ValueError("Worktree is locked by an active terminal session")

    # Guard: local changes and unpushed commits without explicit force.
    if status["dirty"] and not force:
        raise ValueError(
            "Worktree has uncommitted changes. Use force=true to override."
        )
    if status["untracked_count"] > 0:
        if force:
            warnings.append(
                f"{status['untracked_count']} untracked file(s) will be removed."
            )
        else:
            raise ValueError(
                f"Worktree has {status['untracked_count']} untracked file(s). "
                "Use force=true to override."
            )
    ahead = int((status.get("ahead_behind") or {}).get("ahead") or 0)
    if ahead > 0:
        if force:
            warnings.append(f"{ahead} unpushed commit(s) will be removed.")
        else:
            raise ValueError(
                f"Worktree has {ahead} unpushed commit(s). "
                "Use force=true to override."
            )

    # Remove the worktree — must run from the repo root, not the worktree dir.
    repo_root = getattr(session, "worktree_repo_root", None)
    if not repo_root:
        raise ValueError("Session missing worktree_repo_root")
    try:
        remove_args = ["worktree", "remove"]
        if force:
            remove_args.append("--force")
        remove_args.append(str(worktree_path))
        result = _run_git(remove_args, str(repo_root), timeout=10)
    except (OSError, subprocess.TimeoutExpired) as exc:
        raise ValueError(f"Failed to remove worktree: {exc}") from exc
    if result.returncode != 0:
        stderr = (result.stderr or "").strip().split("\n")[-1]
        raise ValueError(
            f"git worktree remove failed: {stderr or result.stdout.strip()}"
        )

    # Best-effort cleanup of stale worktree bookkeeping. The removal above
    # already succeeded, so a prune problem must not fail the call, but it is
    # logged rather than silently discarded.
    try:
        prune = _run_git(
            ["worktree", "prune"],
            str(repo_root),
            timeout=5,
        )
        if prune.returncode != 0:
            logger.debug(
                "git worktree prune exited with %s after removing %s: %s",
                prune.returncode,
                worktree_path,
                (prune.stderr or "").strip(),
            )
    except Exception:
        logger.debug(
            "git worktree prune failed after removing %s",
            worktree_path,
            exc_info=True,
        )

    return {
        "ok": True,
        "removed_path": str(worktree_path),
        "warnings": warnings or None,
    }
def find_git_repo_root(workspace: str | Path) -> Path:
    """Locate the top-level directory of the git repository containing *workspace*.

    Git itself is asked (``git rev-parse --show-toplevel``) rather than
    looking for ``workspace/.git``, so nested workspaces and linked git
    worktrees are both handled correctly.

    Raises:
        ValueError: if *workspace* is not an existing directory, or git
            cannot be run, times out, fails, or reports no top-level path.
    """
    not_a_repo = "Workspace is not inside a git repository"

    workspace_dir = Path(workspace).expanduser().resolve()
    if not workspace_dir.is_dir():
        raise ValueError("Workspace path does not exist or is not a directory")

    try:
        probe = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            check=False,
            timeout=5,
            capture_output=True,
            text=True,
            cwd=workspace_dir,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        raise ValueError(not_a_repo) from exc

    toplevel = probe.stdout.strip() if probe.returncode == 0 else ""
    if not toplevel:
        raise ValueError(not_a_repo)
    return Path(toplevel).expanduser().resolve()
def _setup_agent_worktree(repo_root: str) -> dict:
    """Create a worktree through Hermes Agent's ``cli._setup_worktree`` helper.

    Anything the helper writes to stdout or stderr while it runs is captured
    and forwarded to the module logger at debug level.

    Raises:
        RuntimeError: if the helper cannot be imported, or if it returns a
            falsy result.
    """
    try:
        import api.config  # noqa: F401  # ensure Hermes Agent dir is on sys.path
        from cli import _setup_worktree
    except Exception as exc:
        raise RuntimeError("Hermes Agent worktree helper is unavailable") from exc

    captured = StringIO()
    with redirect_stdout(captured):
        with redirect_stderr(captured):
            info = _setup_worktree(repo_root)

    helper_text = captured.getvalue().strip()
    if helper_text:
        logger.debug("Hermes Agent worktree helper output: %s", helper_text)
    if info:
        return info
    raise RuntimeError("Hermes Agent failed to create a git worktree")
def create_worktree_for_workspace(workspace: str | Path) -> dict:
    """Create a Hermes Agent worktree for the repository containing *workspace*.

    Returns a dict with the resolved worktree ``path``, its ``branch``, the
    resolved ``repo_root`` (the helper's value when it reports one, else the
    root discovered from *workspace*) and a ``created_at`` Unix timestamp.

    Raises:
        ValueError: propagated from ``find_git_repo_root`` when *workspace*
            is not a directory inside a git repository.
        RuntimeError: if the helper's metadata lacks ``path`` or ``branch``
            (or propagated from ``_setup_agent_worktree``).
    """
    repo_root = find_git_repo_root(workspace)
    info = _setup_agent_worktree(str(repo_root))

    worktree_dir = info.get("path")
    branch_name = info.get("branch")
    if not (worktree_dir and branch_name):
        raise RuntimeError("Hermes Agent returned incomplete worktree metadata")

    resolved_worktree = Path(worktree_dir).expanduser().resolve()
    branch_text = str(branch_name)
    resolved_root = Path(info.get("repo_root") or repo_root).expanduser().resolve()
    return {
        "path": str(resolved_worktree),
        "branch": branch_text,
        "repo_root": str(resolved_root),
        "created_at": time.time(),
    }