Files
2026-05-12 06:28:35 -06:00

811 lines
31 KiB
Python

"""
Hermes Web UI -- Workspace and file system helpers.
Workspace lists and last-used workspace are stored per-profile so each
profile has its own workspace configuration. State files live at
``{profile_home}/webui_state/workspaces.json`` and
``{profile_home}/webui_state/last_workspace.txt``. The global STATE_DIR
paths are used as fallback when no profile module is available.
"""
import json
import logging
import os
import stat
import subprocess
import concurrent.futures
from pathlib import Path
logger = logging.getLogger(__name__)
from api.config import (
WORKSPACES_FILE as _GLOBAL_WS_FILE,
LAST_WORKSPACE_FILE as _GLOBAL_LW_FILE,
DEFAULT_WORKSPACE as _BOOT_DEFAULT_WORKSPACE,
MAX_FILE_BYTES, IMAGE_EXTS, MD_EXTS
)
# ── Profile-aware path resolution ───────────────────────────────────────────
def _profile_state_dir() -> Path:
    """Locate the ``webui_state`` directory used by the active profile.

    A named (non-default) profile keeps its state under
    ``{profile_home}/webui_state``; the directory is created on demand.
    The default profile, or a setup where ``api.profiles`` cannot be
    imported, uses the directory that contains the global workspaces file.

    NOTE(review): earlier notes say the global location honours the
    ``HERMES_WEBUI_STATE_DIR`` env var; that is decided in ``api.config`` and
    is not visible from this function.

    Returns:
        Directory in which the workspace state files are stored.
    """
    try:
        from api.profiles import get_active_hermes_home, get_active_profile_name

        profile = get_active_profile_name()
        if not profile or profile == 'default':
            return _GLOBAL_WS_FILE.parent
        state_dir = get_active_hermes_home() / 'webui_state'
        state_dir.mkdir(parents=True, exist_ok=True)
        return state_dir
    except ImportError:
        logger.debug("Failed to import profiles module, using global state dir")
        return _GLOBAL_WS_FILE.parent
def _workspaces_file() -> Path:
    """Build the ``workspaces.json`` path inside the active profile's state dir."""
    state_dir = _profile_state_dir()
    return state_dir.joinpath('workspaces.json')
def _last_workspace_file() -> Path:
    """Build the ``last_workspace.txt`` path inside the active profile's state dir."""
    state_dir = _profile_state_dir()
    return state_dir.joinpath('last_workspace.txt')
def _profile_default_workspace() -> str:
    """Determine the default workspace directory for the active profile.

    The profile configuration (``api.config.get_config()``) is consulted in
    this order, and the first value that resolves to an existing directory
    wins:

    1. ``workspace``          -- explicit webui workspace key
    2. ``default_workspace``  -- alternate explicit key
    3. ``terminal.cwd``       -- terminal working dir (``'.'`` is ignored)

    When none of them yields a directory, or the configuration cannot be
    read, the live ``DEFAULT_WORKSPACE`` from ``api.config`` is used, and the
    value captured at import time is the last resort.

    Returns:
        Absolute, user-expanded, resolved path as a string.
    """
    try:
        from api.config import get_config

        cfg = get_config()

        # Explicit webui keys take precedence over the terminal setting.
        for key in ('workspace', 'default_workspace'):
            configured = cfg.get(key)
            if not configured:
                continue
            resolved = Path(str(configured)).expanduser().resolve()
            if resolved.is_dir():
                return str(resolved)

        # Otherwise fall back to the agent's configured working directory.
        terminal_cfg = cfg.get('terminal', {})
        cwd = terminal_cfg.get('cwd', '') if isinstance(terminal_cfg, dict) else ''
        if cwd and str(cwd) not in ('.', ''):
            resolved = Path(str(cwd)).expanduser().resolve()
            if resolved.is_dir():
                return str(resolved)
    except Exception:
        logger.debug("Failed to load profile default workspace config")

    try:
        # Re-import so a value changed after module import is picked up.
        from api.config import DEFAULT_WORKSPACE as live_default

        return str(Path(live_default).expanduser().resolve())
    except Exception:
        return str(Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve())
# ── Public API ──────────────────────────────────────────────────────────────
def _clean_workspace_list(workspaces: list) -> list:
    """Sanitize a saved workspace list without discarding usable entries.

    Rules applied to every entry:

    - Entries without a ``path`` are dropped.
    - Paths are kept even when they are currently missing or inaccessible;
      picker state must not be destroyed by a transient stat/permission
      failure. A ``~user`` prefix that cannot be expanded is kept literally.
    - Entries located under ``~/.hermes/profiles`` are kept only when they
      are also under the active profile's own home directory; entries that
      belong to another profile (or whose ownership cannot be determined)
      are dropped as cross-profile leaks.
    - A name that is literally ``default`` (any case) becomes ``Home`` to
      avoid confusion with the ``default`` profile name.
    - A missing or ``null`` name becomes ``''``; any other non-string name is
      converted with ``str()`` instead of aborting the whole load.

    Args:
        workspaces: Iterable of dicts with a ``path`` and optional ``name``.

    Returns:
        New list of ``{'path': str, 'name': str}`` dicts (may be empty).
    """
    hermes_profiles = (Path.home() / '.hermes' / 'profiles').resolve()

    # The active profile home is only needed for entries under
    # ~/.hermes/profiles, so look it up lazily and at most once.
    not_loaded = object()
    own_profile_dir = not_loaded

    def _active_profile_dir():
        nonlocal own_profile_dir
        if own_profile_dir is not_loaded:
            try:
                from api.profiles import get_active_hermes_home
                own_profile_dir = get_active_hermes_home().resolve()
            except Exception:
                # Unknown ownership is treated like "not ours" by the caller.
                own_profile_dir = None
        return own_profile_dir

    result = []
    for w in workspaces:
        path = w.get('path', '')
        if not path:
            continue

        name = w.get('name') or ''
        if not isinstance(name, str):
            name = str(name)

        raw = Path(path)
        try:
            raw = raw.expanduser()
        except RuntimeError:
            # "~unknownuser/..." cannot be expanded; keep the literal spelling
            # rather than failing the whole list.
            pass
        p = _safe_resolve(raw)

        # Skip paths inside a DIFFERENT profile's directory, but allow paths
        # inside the CURRENT profile's own directory.
        if _is_within(p, hermes_profiles):
            own = _active_profile_dir()
            if own is None or not _is_within(p, own):
                continue

        # Rename the confusing 'default' label to 'Home'.
        if name.lower() == 'default':
            name = 'Home'
        result.append({'path': str(p), 'name': name})
    return result
def _workspace_access_error(candidate: Path, *, missing_label: str = "Path does not exist") -> str | None:
"""Return a user-facing validation error for an unusable workspace path.
``Path.exists()`` can collapse permission/stat failures into a generic falsey
result on some Python/OS combinations, which produced misleading "does not
exist" messages for macOS/TCC-denied directories. Probe with ``stat()`` so
missing paths, non-directories, and permission-denied paths can be reported
separately.
"""
try:
st = candidate.stat()
except FileNotFoundError:
return f"{missing_label}: {candidate}"
except PermissionError as exc:
return (
f"Cannot access path: {candidate}. The server process could not inspect "
f"this directory ({exc}). On macOS, grant Full Disk Access or Files and "
f"Folders permission to the Hermes/WebUI app or server process, then try again."
)
except OSError as exc:
return f"Cannot access path: {candidate}. The server process could not inspect this path ({exc})."
if not stat.S_ISDIR(st.st_mode):
return f"Path is not a directory: {candidate}"
return None
def _migrate_global_workspaces() -> list:
    """Load the legacy global ``workspaces.json`` and return its cleaned entries.

    Used as the upgrade path from a pre-profile version, whose global file
    may hold cross-profile entries and other leftovers. When cleaning drops
    entries, the file is rewritten so later reads start from the cleaned list.

    Returns:
        The cleaned workspace list, or ``[]`` when the file is absent or
        cannot be read, parsed, or rewritten.
    """
    if not _GLOBAL_WS_FILE.exists():
        return []
    try:
        legacy = json.loads(_GLOBAL_WS_FILE.read_text(encoding='utf-8'))
        kept = _clean_workspace_list(legacy)
        if len(kept) != len(legacy):
            # Persist the cleaned version so future reads are already clean.
            payload = json.dumps(kept, ensure_ascii=False, indent=2)
            _GLOBAL_WS_FILE.write_text(payload, encoding='utf-8')
        return kept
    except Exception:
        return []
def load_workspaces() -> list:
    """Return the workspace list for the active profile.

    Resolution order:

    1. The profile's own ``workspaces.json``, cleaned; if cleaning removed
       entries the file is rewritten (best effort). An empty result is
       replaced by a single ``Home`` entry.
    2. For the default profile (or when ``api.profiles`` is unavailable),
       the cleaned legacy global list, if it has any entries.
    3. A single ``Home`` entry pointing at the profile's default workspace.

    Returns:
        Non-empty list of ``{'path': ..., 'name': ...}`` dicts.
    """
    def _home_only() -> list:
        return [{'path': _profile_default_workspace(), 'name': 'Home'}]

    ws_file = _workspaces_file()
    if ws_file.exists():
        try:
            stored = json.loads(ws_file.read_text(encoding='utf-8'))
            usable = _clean_workspace_list(stored)
            if len(usable) != len(stored):
                # Persist the cleaned version so stale entries don't keep reappearing.
                try:
                    payload = json.dumps(usable, ensure_ascii=False, indent=2)
                    ws_file.write_text(payload, encoding='utf-8')
                except Exception:
                    logger.debug("Failed to persist cleaned workspace list")
            return usable if usable else _home_only()
        except Exception:
            logger.debug("Failed to load workspaces from %s", ws_file)

    # No usable profile-local file. Only the default profile inherits the
    # legacy global list; named profiles always start clean.
    try:
        from api.profiles import get_active_profile_name

        active = get_active_profile_name()
        on_default_profile = active is None or active == 'default'
    except ImportError:
        on_default_profile = True

    migrated = _migrate_global_workspaces() if on_default_profile else None
    if migrated:
        return migrated
    return _home_only()
def save_workspaces(workspaces: list) -> None:
    """Write *workspaces* as indented UTF-8 JSON to the active profile's file.

    The parent directory is created first when it does not exist.
    """
    destination = _workspaces_file()
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(workspaces, ensure_ascii=False, indent=2)
    destination.write_text(serialized, encoding='utf-8')
def get_last_workspace() -> str:
    """Return the most recently used workspace path.

    The profile-local ``last_workspace.txt`` is tried first, then the global
    one. A stored value is accepted only if it is non-empty and still names
    an existing directory; read failures are logged and skipped. When neither
    file yields a usable value, the profile's default workspace is returned.
    """
    profile_file = _last_workspace_file()
    attempts = (
        (profile_file, ("Failed to read last workspace from %s", profile_file)),
        (_GLOBAL_LW_FILE, ("Failed to read global last workspace",)),
    )
    for state_file, log_args in attempts:
        if not state_file.exists():
            continue
        try:
            remembered = state_file.read_text(encoding='utf-8').strip()
            if remembered and Path(remembered).is_dir():
                return remembered
        except Exception:
            logger.debug(*log_args)
    return _profile_default_workspace()
def set_last_workspace(path: str) -> None:
    """Remember *path* as the last-used workspace for the active profile.

    Best effort: any failure while locating, creating, or writing the state
    file is logged at debug level and otherwise ignored.
    """
    try:
        destination = _last_workspace_file()
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(str(path), encoding='utf-8')
    except Exception:
        logger.debug("Failed to set last workspace")
def _safe_resolve(p: Path) -> Path:
    """Resolve *p*, returning *p* itself when resolution fails.

    ``OSError`` and ``RuntimeError`` raised by ``Path.resolve()`` are
    swallowed so callers never have to guard the call.
    """
    try:
        resolved = p.resolve()
    except (OSError, RuntimeError):
        return p
    return resolved
# Per-user temp directories that sit nominally under a "system" prefix but are
# actually user-writable scratch space. Workspaces registered here (e.g. by
# pytest's ``tmp_path_factory`` on macOS, which uses ``/var/folders/<hash>/T/``)
# must remain accepted even though their parent (``/var``) is blocked. These
# carve-outs apply to BOTH workspace registration and runtime file ops so a
# symlink target inside the carve-out is also reachable.
#
# Both the literal and the ``/private``-prefixed spellings are listed because
# callers compare against raw as well as resolved paths. The tuple is consulted
# by ``_is_blocked_system_path`` and ``_is_blocked_workspace_path`` before they
# test the blocked roots.
_USER_TMP_PREFIXES: tuple[Path, ...] = (
    Path('/var/folders'),  # macOS per-user tmp (literal form)
    Path('/private/var/folders'),  # macOS per-user tmp (resolved form)
    Path('/var/tmp'),  # Linux/macOS system-wide tmp (user-writable)
    Path('/private/var/tmp'),  # macOS resolved form
)
def _workspace_blocked_roots() -> tuple[Path, ...]:
    """List the system roots that may never be used as a workspace.

    Every root appears in its literal spelling followed by its
    symlink-resolved spelling, with duplicates removed and first-seen order
    kept. Both forms are needed on macOS, where ``/etc`` and ``/var`` are
    symlinks into ``/private``: a resolved candidate such as ``/private/etc``
    would otherwise not match the literal ``/etc``.

    Legitimate user-tmp locations nominally under these roots (for example
    ``/var/folders/.../T/``) are not excluded here; callers apply the
    ``_USER_TMP_PREFIXES`` carve-outs separately.

    Returns:
        Tuple of blocked root paths, recomputed on every call.
    """
    literal_roots = (
        # Linux / macOS
        '/etc',
        '/usr',
        '/var',
        '/bin',
        '/sbin',
        '/boot',
        '/proc',
        '/sys',
        '/dev',
        '/lib',
        '/lib64',
        '/opt/homebrew',
        '/System',
        '/Library',
    )
    # A dict keeps insertion order and drops repeats in one step.
    ordered: dict[Path, None] = {}
    for text in literal_roots:
        literal = Path(text)
        ordered.setdefault(literal, None)
        ordered.setdefault(_safe_resolve(literal), None)
    return tuple(ordered)
def _is_blocked_system_path(candidate: Path) -> bool:
    """Tell whether *candidate* lies under a blocked system root.

    The ``_USER_TMP_PREFIXES`` carve-outs are checked first, so per-user tmp
    directories nominally under ``/var`` stay usable as workspaces and as
    file targets.

    Returns:
        ``False`` for anything inside a carve-out, ``True`` for anything else
        at or below a blocked root, ``False`` otherwise.
    """
    if any(_is_within(candidate, tmp) for tmp in _USER_TMP_PREFIXES):
        return False
    return any(_is_within(candidate, blocked) for blocked in _workspace_blocked_roots())
def _workspace_blocked_resolved_subtrees() -> tuple[Path, ...]:
    """Return the resolved forms of every blocked root plus ``/private/etc``.

    Each root is user-expanded and resolved; a root that cannot be resolved
    is kept as given. Duplicates are removed while first-seen order is kept.
    """
    ordered: dict[Path, None] = {}
    for root in (*_workspace_blocked_roots(), Path('/private/etc')):
        try:
            canonical = root.expanduser().resolve()
        except Exception:
            canonical = root
        ordered.setdefault(canonical, None)
    return tuple(ordered)
def _workspace_blocked_exact_roots() -> tuple[Path, ...]:
    """Return the paths that are rejected when matched exactly.

    The tuple starts with ``/`` and ``/private/var`` and continues with the
    resolved form of every blocked root (the unresolved root is used when
    resolution fails). Duplicates are removed, first-seen order is kept.
    """
    def _canonical(root: Path) -> Path:
        try:
            return root.expanduser().resolve()
        except Exception:
            return root

    candidates = [Path('/'), Path('/private/var')]
    candidates.extend(_canonical(root) for root in _workspace_blocked_roots())
    return tuple(dict.fromkeys(candidates))
def _is_blocked_workspace_path(candidate: Path, raw_path: str | Path | None = None) -> bool:
    """Return True when candidate points at a known OS/system directory.

    Compare both the original spelling and the resolved path. This closes the
    macOS /etc -> /private/etc bypass without globally banning temporary pytest
    paths under /private/var/folders.

    Checks run in this order, and the first one that matches decides:

    1. ``candidate`` equals an exact blocked root, or the raw spelling equals
       a blocked root -> blocked.
    2. ``candidate`` or the raw spelling lies inside a ``_USER_TMP_PREFIXES``
       carve-out -> allowed.
    3. The raw spelling lies under a literal blocked root -> blocked.
    4. ``candidate`` lies under a resolved blocked subtree -> blocked.

    Args:
        candidate: The path to test; callers in this module pass a resolved path.
        raw_path: The spelling the user supplied, before resolution. ``None``
            or ``""`` disables the raw-spelling checks.

    Returns:
        ``True`` if the path must be rejected, ``False`` otherwise.
    """
    # Normalise the raw spelling; an unusable value simply disables raw checks.
    raw = None
    if raw_path not in (None, ""):
        try:
            raw = Path(raw_path).expanduser()
        except Exception:
            raw = None
    # Step 1: exact matches are rejected before any carve-out is considered.
    exact = _workspace_blocked_exact_roots()
    if candidate in exact or (raw is not None and raw in _workspace_blocked_roots()):
        return True
    # Step 2: user-tmp carve-outs.
    # NOTE(review): the raw-spelling half of this test also admits a symlink
    # that merely *lives* in a tmp prefix while resolving into a blocked
    # subtree (e.g. /var/tmp/link -> /etc/ssh), because it returns before the
    # resolved-subtree checks below. Confirm whether that is intended.
    for tmp in _USER_TMP_PREFIXES:
        if _is_within(candidate, tmp) or (raw is not None and _is_within(raw, tmp)):
            return False
    # Raw paths under literal roots (e.g. /etc/ssh, /var/db) are always blocked.
    if raw is not None:
        for blocked in _workspace_blocked_roots():
            if _is_within(raw, blocked):
                return True
    # Resolved subtree checks catch symlink aliases such as /private/etc. The
    # macOS temp root /private/var/folders is intentionally allowed for pytest
    # and per-user temporary workspaces; other direct /private/var system data
    # such as /private/var/db and /private/var/log remains blocked.
    # NOTE(review): both entries of ``allowed_private_var`` are also members of
    # ``_USER_TMP_PREFIXES``, so a candidate inside them has already returned
    # False in step 2; the ``continue`` below looks like defence in depth.
    allowed_private_var = (Path('/private/var/folders'), Path('/private/var/tmp'))
    for blocked in _workspace_blocked_resolved_subtrees():
        if blocked == Path('/private/var'):
            if candidate == blocked:
                return True
            if any(_is_within(candidate, allowed) for allowed in allowed_private_var):
                continue
            if _is_within(candidate, blocked):
                return True
            continue
        if _is_within(candidate, blocked):
            return True
    return False
def _is_within(path: Path, root: Path) -> bool:
    """Tell whether *path* equals *root* or lies somewhere beneath it.

    The comparison is purely lexical (``Path.relative_to``); neither argument
    is resolved and the filesystem is not touched.
    """
    try:
        path.relative_to(root)
    except ValueError:
        return False
    return True
def _trusted_workspace_roots() -> list[Path]:
    """Collect the directories under which workspace suggestions are allowed.

    Candidates are, in order: the user's home directory, the boot-time
    default workspace, and every saved workspace. A candidate is kept when it
    resolves to an existing directory and is either at or below the home
    directory or not a blocked system path.

    The home directory is exempt from the system-root block so that a home
    that resolves under a blocked root (for example ``/var/home/...``) is
    still trusted, matching ``resolve_trusted_workspace`` and
    ``validate_workspace_to_add``.

    Returns:
        De-duplicated resolved roots, shortest path string first.
    """
    roots: list[Path] = []
    home = Path.home().resolve()

    def add(candidate: str | Path | None) -> None:
        if candidate in (None, ""):
            return
        try:
            p = Path(candidate).expanduser().resolve()
        except Exception:
            return
        if not p.exists() or not p.is_dir():
            return
        # Same rule as the validators: home wins over the system-root block,
        # unless home is the filesystem root itself.
        under_home = home != Path("/") and _is_within(p, home)
        if not under_home and _is_blocked_workspace_path(p, candidate):
            return
        if p not in roots:
            roots.append(p)

    add(Path.home())
    add(_BOOT_DEFAULT_WORKSPACE)
    for w in load_workspaces():
        add(w.get("path"))
    # Stable sort: shorter (outer) roots come first.
    roots.sort(key=lambda p: len(str(p)))
    return roots
def list_workspace_suggestions(prefix: str = "", limit: int = 12) -> list[str]:
    """Return workspace path suggestions under trusted roots only.

    Suggestions are limited to directories under one of:

    - ``Path.home()``
    - the boot default workspace
    - already-saved workspace roots

    Arbitrary system prefixes, and ``~user`` prefixes that cannot be
    expanded, return an empty list rather than an error so the UI can safely
    autocomplete while the user types.

    Args:
        prefix: Text typed so far. ``~`` is expanded, a relative value is
            taken relative to the home directory, and a trailing separator
            means "list the children of this directory".
        limit: Maximum number of suggestions to return.

    Returns:
        Up to *limit* path strings: trusted roots whose path starts with the
        prefix (case-insensitive), followed by matching child directories of
        the prefix's parent, reported by their resolved path. Hidden
        directories are offered only when the typed leaf starts with ``.``.
    """
    roots = _trusted_workspace_roots()
    if not roots:
        return []
    raw = (prefix or "").strip()
    if not raw:
        return [str(p) for p in roots[:limit]]

    if raw.startswith("~"):
        try:
            target = Path(raw).expanduser()
        except RuntimeError:
            # "~name" for an unknown user (common mid-typing) cannot be
            # expanded; there is nothing to suggest yet.
            return []
    elif Path(raw).is_absolute():
        target = Path(raw)
    else:
        target = Path.home() / raw
    normalized = str(target)
    normalized_lower = normalized.lower()
    suggestions: list[str] = []

    def add(path: Path) -> None:
        value = str(path)
        if value not in suggestions:
            suggestions.append(value)

    # If the user is typing a partial trusted root like /Users/xuef..., suggest
    # the matching trusted roots without scanning arbitrary system parents.
    for root in roots:
        if str(root).lower().startswith(normalized_lower):
            add(root)

    in_root = [
        root
        for root in roots
        if normalized == str(root) or normalized.startswith(str(root) + os.sep)
    ]
    if not in_root:
        return suggestions[:limit]

    # The most specific (longest) enclosing root bounds the directory scan.
    anchor_root = max(in_root, key=lambda p: len(str(p)))
    ends_with_sep = raw.endswith(os.sep) or raw.endswith('/')
    parent = target if ends_with_sep else target.parent
    leaf = '' if ends_with_sep else target.name
    show_hidden = leaf.startswith('.')
    try:
        parent_resolved = parent.expanduser().resolve()
    except Exception:
        return suggestions[:limit]
    if not parent_resolved.exists() or not parent_resolved.is_dir():
        return suggestions[:limit]
    # Never scan a directory that resolves outside the anchoring root.
    if not _is_within(parent_resolved, anchor_root):
        return suggestions[:limit]

    leaf_lower = leaf.lower()
    try:
        children = sorted(parent_resolved.iterdir(), key=lambda p: p.name.lower())
    except OSError:
        return suggestions[:limit]
    for child in children:
        if not child.is_dir():
            continue
        if child.name.startswith('.') and not show_hidden:
            continue
        if leaf_lower and not child.name.lower().startswith(leaf_lower):
            continue
        add(child.resolve())
        if len(suggestions) >= limit:
            break
    return suggestions[:limit]
def resolve_trusted_workspace(path: str | Path | None = None) -> Path:
"""Resolve and validate a workspace path.
A path is trusted if it satisfies at least one of:
(A) It is under the user's home directory (Path.home()).
Works cross-platform: ~/... on Linux/macOS, C:\\Users\\... on Windows.
(B) It is already in the profile's saved workspace list.
This covers self-hosted deployments where workspaces live outside home
(e.g. /data/projects, /opt/workspace) — once a workspace is saved by
an admin, it can be reused without re-validation.
Additionally enforced regardless of (A)/(B):
1. The path must exist.
2. The path must be a directory.
3. The path must not be a known system root (/etc, /usr, /var, /bin, /sbin,
/boot, /proc, /sys, /dev, /root on Linux/macOS; Windows system dirs).
This prevents even admin-saved workspaces from pointing at OS internals.
None/empty path falls back to the boot-time DEFAULT_WORKSPACE, which is always
trusted (it was validated at server startup).
"""
if path in (None, ""):
return Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve()
candidate = Path(path).expanduser().resolve()
access_error = _workspace_access_error(candidate)
if access_error:
raise ValueError(access_error)
# (A) Trusted if under the user's home directory — cross-platform via Path.home()
# Must be checked before system roots to allow symlinks like /var/home.
_home = Path.home().resolve()
if _home != Path("/"):
try:
candidate.relative_to(_home)
return candidate
except ValueError:
pass
# Block known system roots and their children.
if _is_blocked_workspace_path(candidate, path):
raise ValueError(f"Path points to a system directory: {candidate}")
# (B) Trusted if already in the saved workspace list — covers non-home installs
try:
saved = load_workspaces()
saved_paths = {Path(w["path"]).resolve() for w in saved if w.get("path")}
if candidate in saved_paths:
return candidate
except Exception:
pass
# (C) Trusted if it is equal to or under the boot-time DEFAULT_WORKSPACE.
# In Docker deployments HERMES_WEBUI_DEFAULT_WORKSPACE is often set to a
# volume mount outside the user's home (e.g. /data/workspace). That path
# was already validated at server startup, so any sub-path of it is safe
# without requiring the user to add it to the workspace list manually.
try:
boot_default = Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve()
candidate.relative_to(boot_default)
return candidate
except ValueError:
pass
raise ValueError(
f"Path is outside the user home directory, not in the saved workspace "
f"list, and not under the default workspace: {candidate}. "
f"Add it via Settings → Workspaces first."
)
def _strip_surrounding_quotes(path: str) -> str:
    """Trim whitespace and remove one matching pair of surrounding quotes.

    Pasted paths are often quoted: macOS Finder's "Copy as Pathname" wraps
    the path in single quotes, and other shells and file managers use double
    quotes. Stripping the pair lets such input work as typed.

    Only a matching opener and closer are removed. A one-sided or mismatched
    quote is left alone in case the path really contains a quote character.

    Returns:
        The trimmed string, without its surrounding quote pair if it had one.
    """
    trimmed = path.strip()
    if len(trimmed) < 2:
        return trimmed
    opener, closer = trimmed[0], trimmed[-1]
    if opener == closer and opener in ("'", '"'):
        return trimmed[1:-1]
    return trimmed
def validate_workspace_to_add(path: str) -> Path:
    """Validate a path for *adding* to the workspace list.

    Less restrictive than ``resolve_trusted_workspace``: a user who
    explicitly registers a path is trusted to mean it, so only unusable
    paths and system directories are rejected. The stricter check is applied
    later, when an existing workspace is *used*.

    Surrounding quotes (single or double) are stripped before validation,
    because pasted paths are frequently quoted.

    Args:
        path: Path text as entered by the user.

    Returns:
        The resolved directory.

    Raises:
        ValueError: The path cannot be resolved (unknown ``~user`` or a
            symlink loop), is missing, inaccessible, not a directory, or is a
            system directory outside the user's home.
    """
    path = _strip_surrounding_quotes(path)
    try:
        candidate = Path(path).expanduser().resolve()
    except RuntimeError as exc:
        # pathlib signals an unknown "~user" (and, before Python 3.13, symlink
        # loops) with RuntimeError; report it like every other bad path.
        raise ValueError(f"Cannot resolve path: {path} ({exc})") from exc

    access_error = _workspace_access_error(candidate)
    if access_error:
        raise ValueError(access_error)

    # Home directory is always trusted regardless of where it lives on disk
    # (e.g. /var/home/... on systemd-homed Fedora/RHEL).
    _home = Path.home().resolve()
    if _home != Path("/") and _is_within(candidate, _home):
        return candidate

    # Block known system roots and everything beneath them.
    if _is_blocked_workspace_path(candidate, path):
        raise ValueError(f"Path points to a system directory: {candidate}")
    return candidate
def safe_resolve_ws(root: Path, requested: str) -> Path:
    """Resolve *requested* inside workspace *root*, refusing traversal.

    Symlinks whose *unresolved* path is within the workspace root are
    allowed -- the user placed them there intentionally -- unless they lead
    into a blocked system directory. Raw ``..`` traversal out of the root is
    always refused.

    Args:
        root: Workspace root directory.
        requested: Path relative to *root*.

    Returns:
        The fully resolved target path.

    Raises:
        ValueError: The path cannot be resolved (symlink loop), escapes the
            root lexically, or is a symlink into a system directory.
    """
    unresolved = root / requested
    try:
        resolved = unresolved.resolve()
    except RuntimeError as exc:
        # Before Python 3.13 a symlink loop surfaces as RuntimeError; report it
        # with the same exception type as every other rejected path.
        raise ValueError(f"Cannot resolve path: {requested}") from exc

    # Fast path: resolved path is inside root (covers most cases).
    try:
        resolved.relative_to(root.resolve())
        return resolved
    except ValueError:
        pass

    # Symlink path: os.path.normpath collapses '..' but does NOT follow
    # symlinks, so this checks where the request points lexically.
    norm = Path(os.path.normpath(str(unresolved)))
    try:
        norm.relative_to(root)
    except ValueError:
        raise ValueError(f"Path traversal blocked: {requested}") from None

    # The symlink points outside the workspace root. Even if the user placed it
    # intentionally, prevent reads from /etc, /proc, /sys, /dev and other
    # blocked roots (LLM agents can reach this via tool calls, not just humans).
    if _is_blocked_system_path(resolved):
        raise ValueError(f"Path traversal blocked (system dir): {requested}")
    return resolved
def list_dir(workspace: Path, rel: str='.'):
    """List up to 200 entries of a directory inside a workspace.

    Entries are ordered symlinks first, then directories, then files, each
    group sorted case-insensitively by name.

    Symlinks are skipped when they cannot be resolved (including symlink
    loops), when they point at the listed directory, the workspace root, or
    an ancestor of the listed directory (cycle), or when they resolve into a
    blocked system directory.

    Args:
        workspace: Workspace root directory.
        rel: Directory to list, relative to *workspace*.

    Returns:
        List of dicts. Regular entries carry ``name``, ``path``, ``type``
        (``'dir'`` or ``'file'``) and ``size`` (``None`` for directories or
        when the size cannot be read). Symlink entries carry ``name``,
        ``path``, ``type='symlink'``, ``target``, ``is_dir`` and, for
        non-directories, ``size``. ``path`` is always workspace-relative.

    Raises:
        FileNotFoundError: *rel* does not name a directory.
        ValueError: *rel* is rejected by ``safe_resolve_ws``.
    """
    target = safe_resolve_ws(workspace, rel)
    if not target.is_dir():
        raise FileNotFoundError(f"Not a directory: {rel}")
    ws_resolved = workspace.resolve()
    target_resolved = target.resolve()  # loop-invariant, used for cycle checks
    nested = bool(rel) and rel != '.'
    entries = []
    for item in sorted(target.iterdir(), key=lambda p: (not p.is_symlink(), p.is_file(), p.name.lower())):
        # Keep the path relative to the workspace (never follow the link), so
        # entries reached through a symlink still get a usable relative path.
        entry_path = f"{rel}/{item.name}" if nested else item.name
        if item.is_symlink():
            try:
                link_target = item.resolve()
            except (OSError, RuntimeError):
                # RuntimeError: symlink loop on Python < 3.13.
                continue
            # Cycle detection: skip if the symlink points back to the current
            # dir, the workspace root, or any ancestor of the current dir.
            # This runs regardless of whether the target is inside the workspace.
            if link_target in (target_resolved, target, ws_resolved):
                continue
            if _is_within(target_resolved, link_target):
                continue
            # Block symlinks that resolve to system directories.
            if _is_blocked_system_path(link_target):
                continue
            is_dir = link_target.is_dir()
            entry = {
                'name': item.name,
                'path': entry_path,
                'type': 'symlink',
                'target': str(link_target),
                'is_dir': is_dir,
            }
            if not is_dir:
                try:
                    entry['size'] = link_target.stat().st_size
                except OSError:
                    entry['size'] = None
            entries.append(entry)
        else:
            size = None
            if item.is_file():
                try:
                    size = item.stat().st_size
                except OSError:
                    # The file vanished or became unreadable after iterdir();
                    # keep the listing usable instead of failing it entirely.
                    size = None
            entries.append({
                'name': item.name,
                'path': entry_path,
                'type': 'dir' if item.is_dir() else 'file',
                'size': size,
            })
        if len(entries) >= 200:
            break
    return entries
def read_file_content(workspace: Path, rel: str) -> dict:
    """Read a text file from a workspace.

    The file is decoded as UTF-8 with undecodable bytes replaced.

    Args:
        workspace: Workspace root directory.
        rel: File path relative to *workspace*.

    Returns:
        Dict with ``path`` (the given *rel*), ``content``, ``size`` in bytes,
        and ``lines`` (number of newline characters plus one).

    Raises:
        FileNotFoundError: *rel* does not name a regular file.
        ValueError: The file exceeds ``MAX_FILE_BYTES`` or *rel* is rejected
            by ``safe_resolve_ws``.
    """
    target = safe_resolve_ws(workspace, rel)
    if not target.is_file():
        raise FileNotFoundError(f"Not a file: {rel}")

    size = target.stat().st_size
    if size > MAX_FILE_BYTES:
        raise ValueError(f"File too large ({size} bytes, max {MAX_FILE_BYTES})")

    content = target.read_text(encoding='utf-8', errors='replace')
    line_count = content.count('\n') + 1
    return {'path': rel, 'content': content, 'size': size, 'lines': line_count}
# ── Git detection ──────────────────────────────────────────────────────────
def _run_git(args, cwd, timeout=3):
    """Run ``git`` with *args* in *cwd* and return its stripped stdout.

    Args:
        args: List of arguments placed after ``git``.
        cwd: Working directory for the command.
        timeout: Seconds to wait before giving up.

    Returns:
        Stripped standard output on exit status 0; ``None`` on a non-zero
        exit status, a timeout, a missing ``git`` executable, or any other
        ``OSError``.
    """
    command = ['git'] + args
    try:
        proc = subprocess.run(
            command,
            cwd=str(cwd),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return None
    if proc.returncode != 0:
        return None
    return proc.stdout.strip()
def git_info_for_workspace(workspace: Path) -> dict:
"""Return git info for a workspace directory, or None if not a git repo."""
if not (workspace / '.git').exists():
return None
branch = _run_git(['rev-parse', '--abbrev-ref', 'HEAD'], workspace)
if branch is None:
return None
# Run the remaining git commands in parallel via threads — they are
# independent subprocess calls and together can take 50-200ms when run
# serially. Threading is safe here because each call blocks only on the
# subprocess pipe, not on the GIL.
def _ahead():
r = _run_git(['rev-list', '--count', '@{u}..HEAD'], workspace)
return int(r) if r and r.isdigit() else 0
def _behind():
r = _run_git(['rev-list', '--count', 'HEAD..@{u}'], workspace)
return int(r) if r and r.isdigit() else 0
def _status():
out = _run_git(['status', '--porcelain'], workspace) or ''
lines = [l for l in out.splitlines() if l]
modified = sum(1 for l in lines if len(l) >= 2 and (l[0] in 'MAR' or l[1] in 'MAR'))
untracked = sum(1 for l in lines if l.startswith('??'))
return len(lines), modified, untracked
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
f_status = pool.submit(_status)
f_ahead = pool.submit(_ahead)
f_behind = pool.submit(_behind)
dirty, modified, untracked = f_status.result()
ahead = f_ahead.result()
behind = f_behind.result()
return {
'branch': branch,
'dirty': dirty,
'modified': modified,
'untracked': untracked,
'ahead': ahead,
'behind': behind,
'is_git': True,
}