mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-25 11:10:18 +00:00
ff0d25fd0e
macOS Finder's 'Copy as Pathname' (Cmd+Option+C) wraps paths in single
quotes by default — '/Users/x/Documents/foo' — and users routinely paste
those quoted strings into the Add Space input expecting them to work.
Other shells and OS file managers do similar things with double quotes.
Today the path is taken via .strip() only, so the literal quote
characters become part of the resolved Path and the validator rejects
the result as 'not a directory'. cygnus reported this on Discord
(2026-05-01) — she had to manually un-quote her paths to register a
new Space.
Fix:
- New api.workspace._strip_surrounding_quotes() helper. Removes only
the outermost paired single or double quotes; preserves unpaired or
mismatched quotes (a path may legitimately contain a literal quote).
- validate_workspace_to_add() calls it before resolution so every
code path that registers a workspace benefits, not just the HTTP
route.
- _handle_workspace_add() also calls it at the route entry so the
blocked-system-path check and the duplicate-detection check both
see the cleaned form.
14 regression tests pin the behavior matrix:
- Unwrapped path unchanged
- Single quotes stripped
- Double quotes stripped
- Whitespace outside quotes handled (trim-then-strip)
- Only outermost pair removed (internal quotes preserved)
- Unpaired / mismatched quotes preserved
- Empty string + just-a-pair edge cases
- validate_workspace_to_add() accepts the quoted form for an existing dir
4610 tests pass (+14 from this PR), 0 regressions, ~2:27 full suite.
Reported by Cygnus on Discord, May 1 2026.
781 lines
30 KiB
Python
781 lines
30 KiB
Python
"""
|
|
Hermes Web UI -- Workspace and file system helpers.
|
|
|
|
Workspace lists and last-used workspace are stored per-profile so each
|
|
profile has its own workspace configuration. State files live at
|
|
``{profile_home}/webui_state/workspaces.json`` and
|
|
``{profile_home}/webui_state/last_workspace.txt``. The global STATE_DIR
|
|
paths are used as fallback when no profile module is available.
|
|
"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import concurrent.futures
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
from api.config import (
|
|
WORKSPACES_FILE as _GLOBAL_WS_FILE,
|
|
LAST_WORKSPACE_FILE as _GLOBAL_LW_FILE,
|
|
DEFAULT_WORKSPACE as _BOOT_DEFAULT_WORKSPACE,
|
|
MAX_FILE_BYTES, IMAGE_EXTS, MD_EXTS
|
|
)
|
|
|
|
|
|
# ── Profile-aware path resolution ───────────────────────────────────────────
|
|
|
|
def _profile_state_dir() -> Path:
    """Locate the ``webui_state`` directory for the active profile.

    A named (non-``default``) profile gets ``<active hermes home>/webui_state``,
    which is created on demand. The default profile -- or an environment where
    ``api.profiles`` cannot be imported -- uses the directory that holds the
    global workspaces file.
    """
    try:
        from api.profiles import get_active_hermes_home, get_active_profile_name

        profile = get_active_profile_name()
        if profile and profile != 'default':
            state_dir = get_active_hermes_home() / 'webui_state'
            state_dir.mkdir(parents=True, exist_ok=True)
            return state_dir
    except ImportError:
        logger.debug("Failed to import profiles module, using global state dir")
    return _GLOBAL_WS_FILE.parent
|
|
|
|
|
|
def _workspaces_file() -> Path:
    """Path of ``workspaces.json`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'workspaces.json'
|
|
|
|
|
|
def _last_workspace_file() -> Path:
    """Path of ``last_workspace.txt`` inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'last_workspace.txt'
|
|
|
|
|
|
def _profile_default_workspace() -> str:
    """Pick the default workspace directory from the active configuration.

    Configuration values are tried in this order, and the first one that
    resolves to an existing directory wins:

    1. ``workspace``
    2. ``default_workspace``
    3. ``terminal.cwd`` (ignored when empty or ``'.'``)

    If none qualifies, or anything goes wrong while reading the configuration,
    the boot-time ``DEFAULT_WORKSPACE`` is returned instead.
    """

    def _configured_values(cfg):
        # Lazily yield candidate values so later keys are only consulted when
        # the earlier ones did not produce a usable directory.
        for key in ('workspace', 'default_workspace'):
            value = cfg.get(key)
            if value:
                yield value
        terminal = cfg.get('terminal', {})
        if isinstance(terminal, dict):
            cwd = terminal.get('cwd', '')
            if cwd and str(cwd) not in ('.', ''):
                yield cwd

    try:
        from api.config import get_config

        for value in _configured_values(get_config()):
            resolved = Path(str(value)).expanduser().resolve()
            if resolved.is_dir():
                return str(resolved)
    except Exception:
        logger.debug("Failed to load profile default workspace config")
    return str(_BOOT_DEFAULT_WORKSPACE)
|
|
|
|
|
|
# ── Public API ──────────────────────────────────────────────────────────────
|
|
|
|
def _clean_workspace_list(workspaces: list) -> list:
|
|
"""Sanitize a workspace list:
|
|
- Remove entries whose paths no longer exist on disk.
|
|
- Remove entries whose paths live inside another profile's directory
|
|
(e.g. ~/.hermes/profiles/X/... should not appear on a different profile).
|
|
- Rename any entry whose name is literally 'default' to 'Home' (avoids
|
|
confusion with the 'default' profile name).
|
|
Returns the cleaned list (may be empty).
|
|
"""
|
|
hermes_profiles = (Path.home() / '.hermes' / 'profiles').resolve()
|
|
result = []
|
|
for w in workspaces:
|
|
path = w.get('path', '')
|
|
name = w.get('name', '')
|
|
p = Path(path).resolve() if path else Path('/')
|
|
# Skip paths that no longer exist
|
|
if not p.is_dir():
|
|
continue
|
|
# Skip paths inside a DIFFERENT profile's directory (cross-profile leak).
|
|
# Allow paths inside the CURRENT profile's own directory (e.g. test workspaces
|
|
# created under ~/.hermes/profiles/webui/webui-mvp-test/).
|
|
try:
|
|
p.relative_to(hermes_profiles)
|
|
# p is under ~/.hermes/profiles/ — only skip if it's under a DIFFERENT profile
|
|
try:
|
|
from api.profiles import get_active_hermes_home
|
|
own_profile_dir = get_active_hermes_home().resolve()
|
|
p.relative_to(own_profile_dir)
|
|
# p is under our own profile dir — keep it
|
|
except (ValueError, Exception):
|
|
continue # under profiles/ but not our own — cross-profile leak, skip
|
|
except ValueError:
|
|
pass # not under profiles/ at all — keep it
|
|
# Rename confusing 'default' label to 'Home'
|
|
if name.lower() == 'default':
|
|
name = 'Home'
|
|
result.append({'path': str(p), 'name': name})
|
|
return result
|
|
|
|
|
|
def _migrate_global_workspaces() -> list:
    """Load the legacy global workspace list in sanitized form.

    The global workspaces file is parsed and run through
    ``_clean_workspace_list``. When cleaning changed the number of entries the
    file is rewritten with the cleaned list, so later reads start clean.

    Returns ``[]`` when the file does not exist or when reading, cleaning, or
    rewriting it fails for any reason.
    """
    legacy_file = _GLOBAL_WS_FILE
    if not legacy_file.exists():
        return []
    try:
        stored = json.loads(legacy_file.read_text(encoding='utf-8'))
        kept = _clean_workspace_list(stored)
        if len(kept) != len(stored):
            payload = json.dumps(kept, ensure_ascii=False, indent=2)
            legacy_file.write_text(payload, encoding='utf-8')
    except Exception:
        return []
    return kept
|
|
|
|
|
|
def load_workspaces() -> list:
    """Return the workspace list for the active profile.

    Order of preference:

    1. The profile's own workspaces file, cleaned (and re-persisted when the
       cleaning dropped entries). An empty cleaned list becomes a single
       ``Home`` entry.
    2. For the default profile only (or when ``api.profiles`` is unavailable):
       the migrated legacy global list, if it is non-empty.
    3. A single ``Home`` entry pointing at the profile's default workspace.
    """

    def _home_only() -> list:
        return [{'path': _profile_default_workspace(), 'name': 'Home'}]

    profile_file = _workspaces_file()
    if profile_file.exists():
        try:
            stored = json.loads(profile_file.read_text(encoding='utf-8'))
            kept = _clean_workspace_list(stored)
            if len(kept) != len(stored):
                # Persist the cleaned list so stale entries stop reappearing;
                # a failed write is not fatal for this read.
                try:
                    payload = json.dumps(kept, ensure_ascii=False, indent=2)
                    profile_file.write_text(payload, encoding='utf-8')
                except Exception:
                    logger.debug("Failed to persist cleaned workspace list")
            return kept or _home_only()
        except Exception:
            logger.debug("Failed to load workspaces from %s", profile_file)

    # No usable profile-local file: only the default profile inherits the
    # legacy global list; named profiles start fresh.
    try:
        from api.profiles import get_active_profile_name

        on_default_profile = get_active_profile_name() in ('default', None)
    except ImportError:
        on_default_profile = True

    migrated = _migrate_global_workspaces() if on_default_profile else []
    return migrated or _home_only()
|
|
|
|
|
|
def save_workspaces(workspaces: list) -> None:
    """Write *workspaces* as indented JSON to the active profile's workspaces file.

    The parent directory is created first if it does not exist yet.
    """
    destination = _workspaces_file()
    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(workspaces, ensure_ascii=False, indent=2)
    destination.write_text(payload, encoding='utf-8')
|
|
|
|
|
|
def get_last_workspace() -> str:
    """Return the most recently used workspace path.

    The profile's ``last_workspace`` file is consulted first, then the global
    one. A stored value only counts when it is non-empty and names an existing
    directory. If neither file yields one, the profile's default workspace is
    returned.
    """

    def _remembered_dir(state_file):
        # The stored text, or None when it is blank or not a directory.
        text = state_file.read_text(encoding='utf-8').strip()
        if text and Path(text).is_dir():
            return text
        return None

    profile_file = _last_workspace_file()
    if profile_file.exists():
        try:
            remembered = _remembered_dir(profile_file)
            if remembered:
                return remembered
        except Exception:
            logger.debug("Failed to read last workspace from %s", profile_file)

    # Fallback: the global file.
    if _GLOBAL_LW_FILE.exists():
        try:
            remembered = _remembered_dir(_GLOBAL_LW_FILE)
            if remembered:
                return remembered
        except Exception:
            logger.debug("Failed to read global last workspace")

    return _profile_default_workspace()
|
|
|
|
|
|
def set_last_workspace(path: str) -> None:
    """Record *path* as the last-used workspace for the active profile.

    Best effort: any failure is logged at debug level and otherwise ignored.
    """
    try:
        destination = _last_workspace_file()
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(str(path), encoding='utf-8')
    except Exception:
        logger.debug("Failed to set last workspace")
|
|
|
|
|
|
def _safe_resolve(p: Path) -> Path:
|
|
"""Path.resolve() that never raises — falls back to the input path on error."""
|
|
try:
|
|
return p.resolve()
|
|
except (OSError, RuntimeError):
|
|
return p
|
|
|
|
|
|
# Per-user temp directories that sit nominally under a "system" prefix but are
|
|
# actually user-writable scratch space. Workspaces registered here (e.g. by
|
|
# pytest's ``tmp_path_factory`` on macOS, which uses ``/var/folders/<hash>/T/``)
|
|
# must remain accepted even though their parent (``/var``) is blocked. These
|
|
# carve-outs apply to BOTH workspace registration and runtime file ops so a
|
|
# symlink target inside the carve-out is also reachable.
|
|
_USER_TMP_PREFIXES: tuple[Path, ...] = (
|
|
Path('/var/folders'), # macOS per-user tmp (literal form)
|
|
Path('/private/var/folders'), # macOS per-user tmp (resolved form)
|
|
Path('/var/tmp'), # Linux/macOS system-wide tmp (user-writable)
|
|
Path('/private/var/tmp'), # macOS resolved form
|
|
)
|
|
|
|
|
|
def _workspace_blocked_roots() -> tuple[Path, ...]:
    """List the system roots that may never be registered as a workspace.

    Every root appears in its literal spelling and, when different, in its
    symlink-resolved form, in first-seen order without duplicates. The resolved
    forms matter on macOS, where ``/etc``, ``/var`` and ``/tmp`` are symlinks
    into ``/private``: a resolved candidate such as ``/private/etc`` would not
    match the literal ``/etc``.

    User-tmp carve-outs under these roots are not excluded here; they are
    applied by the functions that consume this list.
    """
    literal_roots = (
        # Linux / macOS
        '/etc',
        '/usr',
        '/var',
        '/bin',
        '/sbin',
        '/boot',
        '/proc',
        '/sys',
        '/dev',
        '/lib',
        '/lib64',
        '/opt/homebrew',
        '/System',
        '/Library',
    )
    # A dict keeps insertion order, giving an ordered de-duplication.
    ordered: dict[Path, None] = {}
    for text in literal_roots:
        literal = Path(text)
        ordered.setdefault(literal, None)
        ordered.setdefault(_safe_resolve(literal), None)
    return tuple(ordered)
|
|
|
|
|
|
def _is_blocked_system_path(candidate: Path) -> bool:
    """Tell whether *candidate* lies under a blocked system root.

    Paths inside any ``_USER_TMP_PREFIXES`` carve-out are never blocked, even
    though those prefixes sit under the blocked ``/var`` root.
    """
    if any(_is_within(candidate, prefix) for prefix in _USER_TMP_PREFIXES):
        return False
    return any(_is_within(candidate, root) for root in _workspace_blocked_roots())
|
|
|
|
|
|
def _workspace_blocked_resolved_subtrees() -> tuple[Path, ...]:
    """Resolved forms of the blocked roots, plus ``/private/etc``.

    Each root is passed through ``expanduser().resolve()``; a root that cannot
    be resolved is kept as-is. Duplicates are removed, keeping first-seen order.
    """

    def _canonical(root: Path) -> Path:
        try:
            return root.expanduser().resolve()
        except Exception:
            return root

    roots = (*_workspace_blocked_roots(), Path('/private/etc'))
    return tuple(dict.fromkeys(_canonical(root) for root in roots))
|
|
|
|
|
|
def _workspace_blocked_exact_roots() -> tuple[Path, ...]:
    """Paths that are rejected when a candidate equals them exactly.

    Starts with ``/`` and ``/private/var``, followed by the resolved form of
    every blocked root (the unresolved root when resolution fails). Duplicates
    are removed, keeping first-seen order.
    """

    def _canonical(root: Path) -> Path:
        try:
            return root.expanduser().resolve()
        except Exception:
            return root

    ordered = [Path('/'), Path('/private/var')]
    ordered.extend(_canonical(root) for root in _workspace_blocked_roots())
    return tuple(dict.fromkeys(ordered))
|
|
|
|
|
|
def _is_blocked_workspace_path(candidate: Path, raw_path: str | Path | None = None) -> bool:
    """Decide whether *candidate* points at a known OS/system directory.

    Both the resolved *candidate* and the original spelling *raw_path* (after
    ``expanduser``) are examined, which closes the macOS ``/etc`` ->
    ``/private/etc`` bypass without banning temp paths such as
    ``/private/var/folders``.

    Checks, in order:

    1. *candidate* equals an exact blocked root, or the raw spelling equals a
       blocked root -> blocked.
    2. *candidate* or the raw spelling lies inside a user-tmp carve-out ->
       allowed.
    3. The raw spelling lies under a literal blocked root -> blocked.
    4. *candidate* lies under a resolved blocked subtree -> blocked, except
       that ``/private/var/folders`` and ``/private/var/tmp`` stay allowed
       inside ``/private/var``.
    """
    spelled = None
    if raw_path not in (None, ""):
        try:
            spelled = Path(raw_path).expanduser()
        except Exception:
            spelled = None

    exact_roots = _workspace_blocked_exact_roots()
    # Literal roots only matter when there is a raw spelling to compare.
    literal_roots = _workspace_blocked_roots() if spelled is not None else ()

    if candidate in exact_roots or (spelled is not None and spelled in literal_roots):
        return True

    for prefix in _USER_TMP_PREFIXES:
        if _is_within(candidate, prefix):
            return False
        if spelled is not None and _is_within(spelled, prefix):
            return False

    # Raw paths under literal roots (e.g. /etc/ssh, /var/db) are always blocked.
    if spelled is not None and any(_is_within(spelled, root) for root in literal_roots):
        return True

    # Resolved subtree checks catch symlink aliases such as /private/etc.
    # Within /private/var only the temp areas are let through; other data
    # such as /private/var/db and /private/var/log remains blocked.
    private_var = Path('/private/var')
    allowed_private_var = (Path('/private/var/folders'), Path('/private/var/tmp'))
    for subtree in _workspace_blocked_resolved_subtrees():
        if not _is_within(candidate, subtree):
            continue
        if subtree == private_var and any(
            _is_within(candidate, allowed) for allowed in allowed_private_var
        ):
            continue
        return True
    return False
|
|
|
|
|
|
def _is_within(path: Path, root: Path) -> bool:
|
|
try:
|
|
path.relative_to(root)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def _trusted_workspace_roots() -> list[Path]:
    """Collect the directories that workspace suggestions may be anchored to.

    Candidates are the user's home, the boot default workspace, and every saved
    workspace path. A candidate is kept only if it resolves, exists as a
    directory, and is not a blocked system path. The result has no duplicates
    and is ordered by the length of the path string, shortest first.
    """
    accepted: list[Path] = []

    def consider(candidate: str | Path | None) -> None:
        if candidate in (None, ""):
            return
        try:
            resolved = Path(candidate).expanduser().resolve()
        except Exception:
            return
        usable = resolved.exists() and resolved.is_dir()
        if not usable or _is_blocked_workspace_path(resolved, candidate):
            return
        if resolved not in accepted:
            accepted.append(resolved)

    consider(Path.home())
    consider(_BOOT_DEFAULT_WORKSPACE)
    for saved in load_workspaces():
        consider(saved.get("path"))
    return sorted(accepted, key=lambda root: len(str(root)))
|
|
|
|
|
|
def list_workspace_suggestions(prefix: str = "", limit: int = 12) -> list[str]:
    """Suggest workspace directories for a partially typed *prefix*.

    Suggestions only ever come from the trusted roots (home, boot default
    workspace, saved workspaces) and from directories beneath them. Any other
    prefix yields an empty or roots-only list rather than an error, so the UI
    can autocomplete safely while the user types.

    Behaviour:

    - Empty prefix: the first *limit* trusted roots.
    - ``~`` prefixes are expanded; relative prefixes are taken relative to the
      home directory.
    - Trusted roots whose path starts with the typed text (case-insensitive)
      are suggested first.
    - If the typed path is inside a trusted root, child directories of the
      typed parent whose names start with the typed leaf (case-insensitive)
      are added. Dot-directories appear only when the leaf itself starts with
      a dot.

    At most *limit* suggestions are returned.
    """
    trusted = _trusted_workspace_roots()
    if not trusted:
        return []

    typed = (prefix or "").strip()
    if not typed:
        return [str(root) for root in trusted[:limit]]

    typed_path = Path(typed)
    if typed.startswith("~"):
        target = typed_path.expanduser()
    elif typed_path.is_absolute():
        target = typed_path
    else:
        target = Path.home() / typed

    target_text = str(target)
    target_folded = target_text.lower()
    found: list[str] = []

    def remember(path: Path) -> None:
        text = str(path)
        if text not in found:
            found.append(text)

    # A partially typed trusted root (e.g. /Users/xuef...) is completed from
    # the trusted roots themselves, without scanning arbitrary system parents.
    for root in trusted:
        if str(root).lower().startswith(target_folded):
            remember(root)

    enclosing = [
        root
        for root in trusted
        if target_text == str(root) or target_text.startswith(str(root) + os.sep)
    ]
    if not enclosing:
        return found[:limit]
    anchor = max(enclosing, key=lambda root: len(str(root)))

    # A trailing separator means "list this directory"; otherwise the last
    # path component is a name prefix to match inside its parent.
    if typed.endswith((os.sep, '/')):
        folder, leaf = target, ''
    else:
        folder, leaf = target.parent, target.name
    include_hidden = leaf.startswith('.')
    leaf_folded = leaf.lower()

    try:
        folder = folder.expanduser().resolve()
    except Exception:
        return found[:limit]

    if not (folder.exists() and folder.is_dir()):
        return found[:limit]
    if not _is_within(folder, anchor):
        return found[:limit]

    try:
        children = sorted(folder.iterdir(), key=lambda child: child.name.lower())
    except OSError:
        return found[:limit]

    for child in children:
        if not child.is_dir():
            continue
        name = child.name
        hidden_skip = name.startswith('.') and not include_hidden
        mismatch = bool(leaf_folded) and not name.lower().startswith(leaf_folded)
        if hidden_skip or mismatch:
            continue
        remember(child.resolve())
        if len(found) >= limit:
            break
    return found[:limit]
|
|
|
|
|
|
def resolve_trusted_workspace(path: str | Path | None = None) -> Path:
|
|
"""Resolve and validate a workspace path.
|
|
|
|
A path is trusted if it satisfies at least one of:
|
|
(A) It is under the user's home directory (Path.home()).
|
|
Works cross-platform: ~/... on Linux/macOS, C:\\Users\\... on Windows.
|
|
(B) It is already in the profile's saved workspace list.
|
|
This covers self-hosted deployments where workspaces live outside home
|
|
(e.g. /data/projects, /opt/workspace) — once a workspace is saved by
|
|
an admin, it can be reused without re-validation.
|
|
|
|
Additionally enforced regardless of (A)/(B):
|
|
1. The path must exist.
|
|
2. The path must be a directory.
|
|
3. The path must not be a known system root (/etc, /usr, /var, /bin, /sbin,
|
|
/boot, /proc, /sys, /dev, /root on Linux/macOS; Windows system dirs).
|
|
This prevents even admin-saved workspaces from pointing at OS internals.
|
|
|
|
None/empty path falls back to the boot-time DEFAULT_WORKSPACE, which is always
|
|
trusted (it was validated at server startup).
|
|
"""
|
|
if path in (None, ""):
|
|
return Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve()
|
|
|
|
candidate = Path(path).expanduser().resolve()
|
|
|
|
if not candidate.exists():
|
|
raise ValueError(f"Path does not exist: {candidate}")
|
|
if not candidate.is_dir():
|
|
raise ValueError(f"Path is not a directory: {candidate}")
|
|
|
|
# (A) Trusted if under the user's home directory — cross-platform via Path.home()
|
|
# Must be checked before system roots to allow symlinks like /var/home.
|
|
_home = Path.home().resolve()
|
|
if _home != Path("/"):
|
|
try:
|
|
candidate.relative_to(_home)
|
|
return candidate
|
|
except ValueError:
|
|
pass
|
|
|
|
# Block known system roots and their children.
|
|
if _is_blocked_workspace_path(candidate, path):
|
|
raise ValueError(f"Path points to a system directory: {candidate}")
|
|
|
|
# (B) Trusted if already in the saved workspace list — covers non-home installs
|
|
try:
|
|
saved = load_workspaces()
|
|
saved_paths = {Path(w["path"]).resolve() for w in saved if w.get("path")}
|
|
if candidate in saved_paths:
|
|
return candidate
|
|
except Exception:
|
|
pass
|
|
|
|
# (C) Trusted if it is equal to or under the boot-time DEFAULT_WORKSPACE.
|
|
# In Docker deployments HERMES_WEBUI_DEFAULT_WORKSPACE is often set to a
|
|
# volume mount outside the user's home (e.g. /data/workspace). That path
|
|
# was already validated at server startup, so any sub-path of it is safe
|
|
# without requiring the user to add it to the workspace list manually.
|
|
try:
|
|
boot_default = Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve()
|
|
candidate.relative_to(boot_default)
|
|
return candidate
|
|
except ValueError:
|
|
pass
|
|
|
|
raise ValueError(
|
|
f"Path is outside the user home directory, not in the saved workspace "
|
|
f"list, and not under the default workspace: {candidate}. "
|
|
f"Add it via Settings → Workspaces first."
|
|
)
|
|
|
|
|
|
|
|
|
|
def _strip_surrounding_quotes(path: str) -> str:
|
|
"""Strip a single pair of surrounding single or double quotes from a path string.
|
|
|
|
macOS Finder's "Copy as Pathname" (Cmd+Option+C) returns paths wrapped in
|
|
single quotes, e.g. ``'/Users/x/Documents/foo'``. Other shells and OS file
|
|
managers do similar things with double quotes. Users routinely paste these
|
|
quoted strings into the Add Space input expecting them to "just work" —
|
|
the only reason they didn't was a missing strip.
|
|
|
|
Only paired quotes are stripped (matching opener and closer). One-sided quotes
|
|
are preserved on the slim chance a path legitimately contains a literal quote
|
|
character.
|
|
"""
|
|
s = path.strip()
|
|
if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
|
|
return s[1:-1]
|
|
return s
|
|
|
|
|
|
def validate_workspace_to_add(path: str) -> Path:
    """Validate a path that the user wants to *add* to the workspace list.

    This is deliberately more permissive than ``resolve_trusted_workspace``:
    a user who explicitly registers a path is trusted to mean it, so only
    non-existent paths, non-directories, and known system directories are
    rejected.

    One pair of surrounding quotes (single or double) is stripped first, since
    pasted paths are often quoted. Anything under the user's home directory is
    accepted without the system-directory check, wherever home lives on disk
    (e.g. ``/var/home/...``).

    Returns:
        The resolved directory path.

    Raises:
        ValueError: the path does not exist, is not a directory, or points to
            a blocked system directory.
    """
    cleaned = _strip_surrounding_quotes(path)
    candidate = Path(cleaned).expanduser().resolve()

    if not candidate.exists():
        raise ValueError(f"Path does not exist: {candidate}")
    if not candidate.is_dir():
        raise ValueError(f"Path is not a directory: {candidate}")

    home = Path.home().resolve()
    under_home = home != Path("/") and _is_within(candidate, home)
    if not under_home and _is_blocked_workspace_path(candidate, cleaned):
        raise ValueError(f"Path points to a system directory: {candidate}")
    return candidate
|
|
|
|
def safe_resolve_ws(root: Path, requested: str) -> Path:
    """Resolve *requested* relative to workspace *root*, refusing traversal.

    A path whose resolved form stays inside the resolved root is returned
    directly. Otherwise the path is only accepted when its *unresolved*
    spelling, with ``..`` collapsed but symlinks not followed, is still inside
    *root* -- i.e. it leaves the workspace through a symlink placed in the
    workspace, not through ``..``. Even then, a target inside a blocked system
    directory is refused.

    Raises:
        ValueError: on ``..`` traversal out of the root, or when a symlink
            leads into a blocked system directory.
    """
    import os

    joined = root / requested
    resolved = joined.resolve()

    # Fast path: the resolved location is inside the root (most requests).
    try:
        resolved.relative_to(root.resolve())
    except ValueError:
        pass
    else:
        return resolved

    # os.path.normpath collapses '..' textually and does NOT follow symlinks.
    collapsed = Path(os.path.normpath(str(joined)))
    try:
        collapsed.relative_to(root)
    except ValueError:
        raise ValueError(f"Path traversal blocked: {requested}")

    # A symlink leads outside the workspace. Callers may be LLM tool calls
    # rather than humans, so targets in /etc, /proc, /sys, /dev and the other
    # blocked roots are refused.
    if _is_blocked_system_path(resolved):
        raise ValueError(f"Path traversal blocked (system dir): {requested}")
    return resolved
|
|
|
|
|
|
def list_dir(workspace: Path, rel: str='.'):
    """List the entries of directory *rel* inside *workspace*.

    Entries are ordered symlinks first, then directories, then files, each
    group sorted case-insensitively by name. At most 200 entries are returned.

    Regular entries are dicts with ``name``, ``path`` (relative to the
    workspace, built from *rel*), ``type`` (``'dir'`` or ``'file'``) and
    ``size`` (bytes for files, else ``None``). Symlink entries have ``type``
    ``'symlink'`` plus ``target`` and ``is_dir``, and a ``size`` only when the
    target is not a directory.

    Symlinks are skipped when they cannot be resolved (including symlink loops,
    which pathlib reports as ``RuntimeError`` before Python 3.13), when they
    point at the listed directory, the workspace root, or an ancestor of the
    listed directory (cycles), or when they resolve into a blocked system
    directory.

    Raises:
        ValueError: *rel* escapes the workspace (from ``safe_resolve_ws``).
        FileNotFoundError: *rel* is not a directory.
    """
    max_entries = 200
    target = safe_resolve_ws(workspace, rel)
    if not target.is_dir():
        raise FileNotFoundError(f"Not a directory: {rel}")
    ws_resolved = workspace.resolve()
    # Resolve once; this does not change while iterating.
    target_resolved = target.resolve()
    nested = bool(rel) and rel != '.'
    entries = []
    ordered = sorted(
        target.iterdir(),
        key=lambda p: (not p.is_symlink(), p.is_file(), p.name.lower()),
    )
    for item in ordered:
        if item.is_symlink():
            # Resolve the symlink target; an unresolvable or looping link is
            # skipped rather than aborting the whole listing.
            try:
                link_target = item.resolve()
            except (OSError, RuntimeError):
                continue
            # Cycle detection: skip links back to the current dir, the
            # workspace root, or any ancestor of the current dir. This runs
            # regardless of whether the target is inside the workspace.
            if link_target in (target_resolved, target, ws_resolved):
                continue
            if target_resolved.is_relative_to(link_target):
                continue
            # Block symlinks that resolve to system directories.
            if _is_blocked_system_path(link_target):
                continue
            is_dir = link_target.is_dir()
            # Keep the display path relative to the workspace (do not follow
            # the link).
            display_path = str(Path(item.name))
            if nested:
                display_path = rel + '/' + display_path
            entry = {
                'name': item.name,
                'path': display_path,
                'type': 'symlink',
                'target': str(link_target),
                'is_dir': is_dir,
            }
            if not is_dir:
                try:
                    entry['size'] = link_target.stat().st_size
                except OSError:
                    entry['size'] = None
            entries.append(entry)
        else:
            # Use a rel-based path so entries under symlink targets (outside
            # the workspace root) still get a valid workspace-relative path.
            entry_path = item.name
            if nested:
                entry_path = rel + '/' + item.name
            # The entry may vanish or become unreadable between iterdir() and
            # stat(); report an unknown size instead of failing the listing.
            try:
                size = item.stat().st_size if item.is_file() else None
            except OSError:
                size = None
            entries.append({
                'name': item.name,
                'path': entry_path,
                'type': 'dir' if item.is_dir() else 'file',
                'size': size,
            })
        if len(entries) >= max_entries:
            break
    return entries
|
|
|
|
|
|
def read_file_content(workspace: Path, rel: str) -> dict:
    """Read the text file *rel* inside *workspace*.

    The file is decoded as UTF-8, with undecodable bytes replaced.

    Returns:
        A dict with ``path`` (*rel* as given), ``content``, ``size`` (bytes on
        disk) and ``lines`` (number of newline characters plus one).

    Raises:
        ValueError: *rel* escapes the workspace (from ``safe_resolve_ws``), or
            the file is larger than ``MAX_FILE_BYTES``.
        FileNotFoundError: *rel* is not a regular file.
    """
    file_path = safe_resolve_ws(workspace, rel)
    if not file_path.is_file():
        raise FileNotFoundError(f"Not a file: {rel}")

    byte_count = file_path.stat().st_size
    if byte_count > MAX_FILE_BYTES:
        raise ValueError(f"File too large ({byte_count} bytes, max {MAX_FILE_BYTES})")

    text = file_path.read_text(encoding='utf-8', errors='replace')
    line_count = text.count('\n') + 1
    return {'path': rel, 'content': text, 'size': byte_count, 'lines': line_count}
|
|
|
|
|
|
# ── Git detection ──────────────────────────────────────────────────────────
|
|
|
|
def _run_git(args, cwd, timeout=3):
|
|
"""Run a git command and return stdout, or None on failure."""
|
|
try:
|
|
r = subprocess.run(
|
|
['git'] + args, cwd=str(cwd), capture_output=True,
|
|
text=True, timeout=timeout,
|
|
)
|
|
return r.stdout.strip() if r.returncode == 0 else None
|
|
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
|
|
return None
|
|
|
|
|
|
def git_info_for_workspace(workspace: Path) -> dict:
|
|
"""Return git info for a workspace directory, or None if not a git repo."""
|
|
if not (workspace / '.git').exists():
|
|
return None
|
|
branch = _run_git(['rev-parse', '--abbrev-ref', 'HEAD'], workspace)
|
|
if branch is None:
|
|
return None
|
|
# Run the remaining git commands in parallel via threads — they are
|
|
# independent subprocess calls and together can take 50-200ms when run
|
|
# serially. Threading is safe here because each call blocks only on the
|
|
# subprocess pipe, not on the GIL.
|
|
def _ahead():
|
|
r = _run_git(['rev-list', '--count', '@{u}..HEAD'], workspace)
|
|
return int(r) if r and r.isdigit() else 0
|
|
def _behind():
|
|
r = _run_git(['rev-list', '--count', 'HEAD..@{u}'], workspace)
|
|
return int(r) if r and r.isdigit() else 0
|
|
def _status():
|
|
out = _run_git(['status', '--porcelain'], workspace) or ''
|
|
lines = [l for l in out.splitlines() if l]
|
|
modified = sum(1 for l in lines if len(l) >= 2 and (l[0] in 'MAR' or l[1] in 'MAR'))
|
|
untracked = sum(1 for l in lines if l.startswith('??'))
|
|
return len(lines), modified, untracked
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
|
|
f_status = pool.submit(_status)
|
|
f_ahead = pool.submit(_ahead)
|
|
f_behind = pool.submit(_behind)
|
|
dirty, modified, untracked = f_status.result()
|
|
ahead = f_ahead.result()
|
|
behind = f_behind.result()
|
|
return {
|
|
'branch': branch,
|
|
'dirty': dirty,
|
|
'modified': modified,
|
|
'untracked': untracked,
|
|
'ahead': ahead,
|
|
'behind': behind,
|
|
'is_git': True,
|
|
}
|