"""Helpers for WebUI-managed Hermes Agent git worktrees.""" from __future__ import annotations import subprocess import time from contextlib import redirect_stderr, redirect_stdout from io import StringIO from pathlib import Path import logging logger = logging.getLogger(__name__) def _run_git(args: list[str], cwd: str | Path, timeout: float = 2) -> subprocess.CompletedProcess: return subprocess.run( ["git", *args], cwd=str(cwd), text=True, capture_output=True, timeout=timeout, check=False, ) def _resolve_path(path: str | Path | None) -> Path | None: if not path: return None try: return Path(path).expanduser().resolve(strict=False) except (OSError, RuntimeError): return Path(path).expanduser() def _worktree_list_cwd(worktree_path: Path, repo_root: str | Path | None) -> Path | None: repo = _resolve_path(repo_root) if repo and repo.is_dir(): return repo if worktree_path.is_dir(): return worktree_path return None def _parse_worktree_list_porcelain(output: str) -> set[str]: paths: set[str] = set() for line in str(output or "").splitlines(): if not line.startswith("worktree "): continue path = line[len("worktree "):].strip() if not path: continue resolved = _resolve_path(path) paths.add(str(resolved or Path(path).expanduser())) return paths def _worktree_listed(worktree_path: Path, repo_root: str | Path | None) -> bool: """Return whether git currently lists the worktree. False is a safe fallback for probe failures, not definitive orphan proof. Future cleanup UI must combine this with the rest of the status payload. """ cwd = _worktree_list_cwd(worktree_path, repo_root) if cwd is None: return False try: result = _run_git(["worktree", "list", "--porcelain"], cwd) except (OSError, subprocess.TimeoutExpired): return False if result.returncode != 0: return False return str(worktree_path) in _parse_worktree_list_porcelain(result.stdout) def _status_porcelain(worktree_path: Path) -> tuple[bool, int]: try: result = _run_git( ["status", "--porcelain", "--untracked-files=normal"], worktree_path, ) except (OSError, subprocess.TimeoutExpired): return False, 0 if result.returncode != 0: return False, 0 lines = [line for line in result.stdout.splitlines() if line] return bool(lines), sum(1 for line in lines if line.startswith("??")) def _ahead_behind(worktree_path: Path) -> dict: payload = { "ahead": 0, "behind": 0, "available": False, "upstream": None, } try: upstream = _run_git( ["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], worktree_path, ) except (OSError, subprocess.TimeoutExpired): return payload if upstream.returncode != 0: return payload upstream_ref = upstream.stdout.strip() if not upstream_ref: return payload payload["upstream"] = upstream_ref try: counts = _run_git( ["rev-list", "--left-right", "--count", "HEAD...@{u}"], worktree_path, ) except (OSError, subprocess.TimeoutExpired): return payload if counts.returncode != 0: return payload parts = counts.stdout.strip().split() if len(parts) != 2: return payload try: payload["ahead"] = max(0, int(parts[0])) payload["behind"] = max(0, int(parts[1])) payload["available"] = True except ValueError: pass return payload def _locked_by_stream(session) -> bool: stream_id = getattr(session, "active_stream_id", None) if not stream_id: return False try: from api.config import STREAMS, STREAMS_LOCK with STREAMS_LOCK: return stream_id in STREAMS except Exception: return False def _locked_by_terminal(session_id: str, worktree_path: Path) -> bool: try: from api.terminal import get_terminal term = get_terminal(session_id) except Exception: return False if not term: return False try: if not term.is_alive(): return False terminal_workspace = _resolve_path(getattr(term, "workspace", None)) return terminal_workspace == worktree_path except Exception: return False def worktree_status_for_session(session) -> dict: """Return a read-only worktree status snapshot for a WebUI session.""" raw_path = getattr(session, "worktree_path", None) if not raw_path: raise ValueError("Session is not worktree-backed") worktree_path = _resolve_path(raw_path) if worktree_path is None: raise ValueError("Session is not worktree-backed") exists = worktree_path.is_dir() status = { "path": str(worktree_path), "exists": bool(exists), "dirty": False, "untracked_count": 0, "ahead_behind": { "ahead": 0, "behind": 0, "available": False, "upstream": None, }, "locked_by_stream": _locked_by_stream(session), "locked_by_terminal": _locked_by_terminal( getattr(session, "session_id", ""), worktree_path, ), "listed": _worktree_listed( worktree_path, getattr(session, "worktree_repo_root", None), ), } if not exists: return status dirty, untracked_count = _status_porcelain(worktree_path) status["dirty"] = dirty status["untracked_count"] = untracked_count status["ahead_behind"] = _ahead_behind(worktree_path) return status def remove_worktree_for_session(session, *, force: bool = False) -> dict: """Remove a session's git worktree from disk. Returns status dict with keys: ok, removed_path, warnings. Raises ValueError for terminal blockers (locked by stream/terminal, dirty with force=False). """ raw_path = getattr(session, "worktree_path", None) if not raw_path: raise ValueError("Session is not worktree-backed") worktree_path = _resolve_path(raw_path) if worktree_path is None: raise ValueError("Session is not worktree-backed") # Read current status before removal status = worktree_status_for_session(session) if not status["exists"]: return { "ok": True, "removed_path": str(worktree_path), "warnings": ["Worktree directory no longer exists on disk."], } warnings = [] # Guard: locked by stream if status["locked_by_stream"]: raise ValueError("Worktree is locked by an active streaming session") # Guard: locked by terminal if status["locked_by_terminal"]: raise ValueError("Worktree is locked by an active terminal session") # Guard: local changes and unpushed commits without explicit force. if status["dirty"] and not force: raise ValueError( "Worktree has uncommitted changes. Use force=true to override." ) if status["untracked_count"] > 0: if force: warnings.append( f"{status['untracked_count']} untracked file(s) will be removed." ) else: raise ValueError( f"Worktree has {status['untracked_count']} untracked file(s). " "Use force=true to override." ) ahead = int((status.get("ahead_behind") or {}).get("ahead") or 0) if ahead > 0: if force: warnings.append(f"{ahead} unpushed commit(s) will be removed.") else: raise ValueError( f"Worktree has {ahead} unpushed commit(s). " "Use force=true to override." ) # Remove the worktree — must run from the repo root, not the worktree dir repo_root = getattr(session, "worktree_repo_root", None) if not repo_root: raise ValueError("Session missing worktree_repo_root") try: remove_args = ["worktree", "remove"] if force: remove_args.append("--force") remove_args.append(str(worktree_path)) result = _run_git(remove_args, str(repo_root), timeout=10) except (OSError, subprocess.TimeoutExpired) as exc: raise ValueError(f"Failed to remove worktree: {exc}") from exc if result.returncode != 0: stderr = (result.stderr or "").strip().split("\n")[-1] raise ValueError( f"git worktree remove failed: {stderr or result.stdout.strip()}" ) # Prune in case the worktree dir was already gone try: _run_git( ["worktree", "prune"], str(repo_root), timeout=5, ) except Exception: pass return { "ok": True, "removed_path": str(worktree_path), "warnings": warnings or None, } def find_git_repo_root(workspace: str | Path) -> Path: """Return the enclosing git repo root for *workspace*. Use git itself instead of checking ``workspace/.git`` so nested workspaces and linked git worktrees are both handled correctly. """ ws = Path(workspace).expanduser().resolve() if not ws.is_dir(): raise ValueError("Workspace path does not exist or is not a directory") try: result = subprocess.run( ["git", "rev-parse", "--show-toplevel"], cwd=ws, text=True, capture_output=True, timeout=5, check=False, ) except (OSError, subprocess.TimeoutExpired) as exc: raise ValueError("Workspace is not inside a git repository") from exc if result.returncode != 0: raise ValueError("Workspace is not inside a git repository") root = result.stdout.strip() if not root: raise ValueError("Workspace is not inside a git repository") return Path(root).expanduser().resolve() def _setup_agent_worktree(repo_root: str) -> dict: try: import api.config # noqa: F401 # ensure Hermes Agent dir is on sys.path from cli import _setup_worktree except Exception as exc: raise RuntimeError("Hermes Agent worktree helper is unavailable") from exc output = StringIO() with redirect_stdout(output), redirect_stderr(output): info = _setup_worktree(repo_root) emitted = output.getvalue().strip() if emitted: logger.debug("Hermes Agent worktree helper output: %s", emitted) if not info: raise RuntimeError("Hermes Agent failed to create a git worktree") return info def create_worktree_for_workspace(workspace: str | Path) -> dict: repo_root = find_git_repo_root(workspace) info = _setup_agent_worktree(str(repo_root)) path = info.get("path") branch = info.get("branch") if not path or not branch: raise RuntimeError("Hermes Agent returned incomplete worktree metadata") return { "path": str(Path(path).expanduser().resolve()), "branch": str(branch), "repo_root": str(Path(info.get("repo_root") or repo_root).expanduser().resolve()), "created_at": time.time(), }