mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-14 02:27:00 +00:00
358 lines
11 KiB
Python
358 lines
11 KiB
Python
"""Helpers for WebUI-managed Hermes Agent git worktrees."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import subprocess
|
|
import time
|
|
from contextlib import redirect_stderr, redirect_stdout
|
|
from io import StringIO
|
|
from pathlib import Path
|
|
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _run_git(args: list[str], cwd: str | Path, timeout: float = 2) -> subprocess.CompletedProcess:
|
|
return subprocess.run(
|
|
["git", *args],
|
|
cwd=str(cwd),
|
|
text=True,
|
|
capture_output=True,
|
|
timeout=timeout,
|
|
check=False,
|
|
)
|
|
|
|
|
|
def _resolve_path(path: str | Path | None) -> Path | None:
|
|
if not path:
|
|
return None
|
|
try:
|
|
return Path(path).expanduser().resolve(strict=False)
|
|
except (OSError, RuntimeError):
|
|
return Path(path).expanduser()
|
|
|
|
|
|
def _worktree_list_cwd(worktree_path: Path, repo_root: str | Path | None) -> Path | None:
|
|
repo = _resolve_path(repo_root)
|
|
if repo and repo.is_dir():
|
|
return repo
|
|
if worktree_path.is_dir():
|
|
return worktree_path
|
|
return None
|
|
|
|
|
|
def _parse_worktree_list_porcelain(output: str) -> set[str]:
|
|
paths: set[str] = set()
|
|
for line in str(output or "").splitlines():
|
|
if not line.startswith("worktree "):
|
|
continue
|
|
path = line[len("worktree "):].strip()
|
|
if not path:
|
|
continue
|
|
resolved = _resolve_path(path)
|
|
paths.add(str(resolved or Path(path).expanduser()))
|
|
return paths
|
|
|
|
|
|
def _worktree_listed(worktree_path: Path, repo_root: str | Path | None) -> bool:
|
|
"""Return whether git currently lists the worktree.
|
|
|
|
False is a safe fallback for probe failures, not definitive orphan proof.
|
|
Future cleanup UI must combine this with the rest of the status payload.
|
|
"""
|
|
cwd = _worktree_list_cwd(worktree_path, repo_root)
|
|
if cwd is None:
|
|
return False
|
|
try:
|
|
result = _run_git(["worktree", "list", "--porcelain"], cwd)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return False
|
|
if result.returncode != 0:
|
|
return False
|
|
return str(worktree_path) in _parse_worktree_list_porcelain(result.stdout)
|
|
|
|
|
|
def _status_porcelain(worktree_path: Path) -> tuple[bool, int]:
|
|
try:
|
|
result = _run_git(
|
|
["status", "--porcelain", "--untracked-files=normal"],
|
|
worktree_path,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return False, 0
|
|
if result.returncode != 0:
|
|
return False, 0
|
|
lines = [line for line in result.stdout.splitlines() if line]
|
|
return bool(lines), sum(1 for line in lines if line.startswith("??"))
|
|
|
|
|
|
def _ahead_behind(worktree_path: Path) -> dict:
|
|
payload = {
|
|
"ahead": 0,
|
|
"behind": 0,
|
|
"available": False,
|
|
"upstream": None,
|
|
}
|
|
try:
|
|
upstream = _run_git(
|
|
["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"],
|
|
worktree_path,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return payload
|
|
if upstream.returncode != 0:
|
|
return payload
|
|
upstream_ref = upstream.stdout.strip()
|
|
if not upstream_ref:
|
|
return payload
|
|
payload["upstream"] = upstream_ref
|
|
try:
|
|
counts = _run_git(
|
|
["rev-list", "--left-right", "--count", "HEAD...@{u}"],
|
|
worktree_path,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return payload
|
|
if counts.returncode != 0:
|
|
return payload
|
|
parts = counts.stdout.strip().split()
|
|
if len(parts) != 2:
|
|
return payload
|
|
try:
|
|
payload["ahead"] = max(0, int(parts[0]))
|
|
payload["behind"] = max(0, int(parts[1]))
|
|
payload["available"] = True
|
|
except ValueError:
|
|
pass
|
|
return payload
|
|
|
|
|
|
def _locked_by_stream(session) -> bool:
|
|
stream_id = getattr(session, "active_stream_id", None)
|
|
if not stream_id:
|
|
return False
|
|
try:
|
|
from api.config import STREAMS, STREAMS_LOCK
|
|
|
|
with STREAMS_LOCK:
|
|
return stream_id in STREAMS
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _locked_by_terminal(session_id: str, worktree_path: Path) -> bool:
|
|
try:
|
|
from api.terminal import get_terminal
|
|
|
|
term = get_terminal(session_id)
|
|
except Exception:
|
|
return False
|
|
if not term:
|
|
return False
|
|
try:
|
|
if not term.is_alive():
|
|
return False
|
|
terminal_workspace = _resolve_path(getattr(term, "workspace", None))
|
|
return terminal_workspace == worktree_path
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def worktree_status_for_session(session) -> dict:
|
|
"""Return a read-only worktree status snapshot for a WebUI session."""
|
|
raw_path = getattr(session, "worktree_path", None)
|
|
if not raw_path:
|
|
raise ValueError("Session is not worktree-backed")
|
|
|
|
worktree_path = _resolve_path(raw_path)
|
|
if worktree_path is None:
|
|
raise ValueError("Session is not worktree-backed")
|
|
|
|
exists = worktree_path.is_dir()
|
|
status = {
|
|
"path": str(worktree_path),
|
|
"exists": bool(exists),
|
|
"dirty": False,
|
|
"untracked_count": 0,
|
|
"ahead_behind": {
|
|
"ahead": 0,
|
|
"behind": 0,
|
|
"available": False,
|
|
"upstream": None,
|
|
},
|
|
"locked_by_stream": _locked_by_stream(session),
|
|
"locked_by_terminal": _locked_by_terminal(
|
|
getattr(session, "session_id", ""),
|
|
worktree_path,
|
|
),
|
|
"listed": _worktree_listed(
|
|
worktree_path,
|
|
getattr(session, "worktree_repo_root", None),
|
|
),
|
|
}
|
|
if not exists:
|
|
return status
|
|
|
|
dirty, untracked_count = _status_porcelain(worktree_path)
|
|
status["dirty"] = dirty
|
|
status["untracked_count"] = untracked_count
|
|
status["ahead_behind"] = _ahead_behind(worktree_path)
|
|
return status
|
|
|
|
|
|
def remove_worktree_for_session(session, *, force: bool = False) -> dict:
|
|
"""Remove a session's git worktree from disk.
|
|
|
|
Returns status dict with keys: ok, removed_path, warnings.
|
|
Raises ValueError for terminal blockers (locked by stream/terminal,
|
|
dirty with force=False).
|
|
"""
|
|
raw_path = getattr(session, "worktree_path", None)
|
|
if not raw_path:
|
|
raise ValueError("Session is not worktree-backed")
|
|
|
|
worktree_path = _resolve_path(raw_path)
|
|
if worktree_path is None:
|
|
raise ValueError("Session is not worktree-backed")
|
|
|
|
# Read current status before removal
|
|
status = worktree_status_for_session(session)
|
|
|
|
if not status["exists"]:
|
|
return {
|
|
"ok": True,
|
|
"removed_path": str(worktree_path),
|
|
"warnings": ["Worktree directory no longer exists on disk."],
|
|
}
|
|
|
|
warnings = []
|
|
|
|
# Guard: locked by stream
|
|
if status["locked_by_stream"]:
|
|
raise ValueError("Worktree is locked by an active streaming session")
|
|
|
|
# Guard: locked by terminal
|
|
if status["locked_by_terminal"]:
|
|
raise ValueError("Worktree is locked by an active terminal session")
|
|
|
|
# Guard: local changes and unpushed commits without explicit force.
|
|
if status["dirty"] and not force:
|
|
raise ValueError(
|
|
"Worktree has uncommitted changes. Use force=true to override."
|
|
)
|
|
if status["untracked_count"] > 0:
|
|
if force:
|
|
warnings.append(
|
|
f"{status['untracked_count']} untracked file(s) will be removed."
|
|
)
|
|
else:
|
|
raise ValueError(
|
|
f"Worktree has {status['untracked_count']} untracked file(s). "
|
|
"Use force=true to override."
|
|
)
|
|
ahead = int((status.get("ahead_behind") or {}).get("ahead") or 0)
|
|
if ahead > 0:
|
|
if force:
|
|
warnings.append(f"{ahead} unpushed commit(s) will be removed.")
|
|
else:
|
|
raise ValueError(
|
|
f"Worktree has {ahead} unpushed commit(s). "
|
|
"Use force=true to override."
|
|
)
|
|
|
|
# Remove the worktree — must run from the repo root, not the worktree dir
|
|
repo_root = getattr(session, "worktree_repo_root", None)
|
|
if not repo_root:
|
|
raise ValueError("Session missing worktree_repo_root")
|
|
try:
|
|
remove_args = ["worktree", "remove"]
|
|
if force:
|
|
remove_args.append("--force")
|
|
remove_args.append(str(worktree_path))
|
|
result = _run_git(remove_args, str(repo_root), timeout=10)
|
|
except (OSError, subprocess.TimeoutExpired) as exc:
|
|
raise ValueError(f"Failed to remove worktree: {exc}") from exc
|
|
|
|
if result.returncode != 0:
|
|
stderr = (result.stderr or "").strip().split("\n")[-1]
|
|
raise ValueError(
|
|
f"git worktree remove failed: {stderr or result.stdout.strip()}"
|
|
)
|
|
|
|
# Prune in case the worktree dir was already gone
|
|
try:
|
|
_run_git(
|
|
["worktree", "prune"],
|
|
str(repo_root),
|
|
timeout=5,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"ok": True,
|
|
"removed_path": str(worktree_path),
|
|
"warnings": warnings or None,
|
|
}
|
|
|
|
|
|
def find_git_repo_root(workspace: str | Path) -> Path:
|
|
"""Return the enclosing git repo root for *workspace*.
|
|
|
|
Use git itself instead of checking ``workspace/.git`` so nested workspaces
|
|
and linked git worktrees are both handled correctly.
|
|
"""
|
|
ws = Path(workspace).expanduser().resolve()
|
|
if not ws.is_dir():
|
|
raise ValueError("Workspace path does not exist or is not a directory")
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "--show-toplevel"],
|
|
cwd=ws,
|
|
text=True,
|
|
capture_output=True,
|
|
timeout=5,
|
|
check=False,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired) as exc:
|
|
raise ValueError("Workspace is not inside a git repository") from exc
|
|
if result.returncode != 0:
|
|
raise ValueError("Workspace is not inside a git repository")
|
|
root = result.stdout.strip()
|
|
if not root:
|
|
raise ValueError("Workspace is not inside a git repository")
|
|
return Path(root).expanduser().resolve()
|
|
|
|
|
|
def _setup_agent_worktree(repo_root: str) -> dict:
|
|
try:
|
|
import api.config # noqa: F401 # ensure Hermes Agent dir is on sys.path
|
|
from cli import _setup_worktree
|
|
except Exception as exc:
|
|
raise RuntimeError("Hermes Agent worktree helper is unavailable") from exc
|
|
output = StringIO()
|
|
with redirect_stdout(output), redirect_stderr(output):
|
|
info = _setup_worktree(repo_root)
|
|
emitted = output.getvalue().strip()
|
|
if emitted:
|
|
logger.debug("Hermes Agent worktree helper output: %s", emitted)
|
|
if not info:
|
|
raise RuntimeError("Hermes Agent failed to create a git worktree")
|
|
return info
|
|
|
|
|
|
def create_worktree_for_workspace(workspace: str | Path) -> dict:
|
|
repo_root = find_git_repo_root(workspace)
|
|
info = _setup_agent_worktree(str(repo_root))
|
|
path = info.get("path")
|
|
branch = info.get("branch")
|
|
if not path or not branch:
|
|
raise RuntimeError("Hermes Agent returned incomplete worktree metadata")
|
|
return {
|
|
"path": str(Path(path).expanduser().resolve()),
|
|
"branch": str(branch),
|
|
"repo_root": str(Path(info.get("repo_root") or repo_root).expanduser().resolve()),
|
|
"created_at": time.time(),
|
|
}
|