mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
23344a9a3c
Ship LSP semantic diagnostics as a bundled plugin (plugins/lsp/) using existing hook system. Zero lines of core code modified. Plugin wiring: - pre_tool_call: capture LSP baseline before write_file/patch - transform_tool_result: inject diagnostics into tool result JSON - on_session_start/on_session_end + atexit: lifecycle management Key design: - Baselines keyed by (session_id, abs_path) for concurrent safety - Diagnostics added as 'lsp_diagnostics' JSON field (preserves shape) - Per-file workspace detection (no static session-start gate) - V4A multi-file patch skipped for MVP - Short timeout (3s) — cold start degrades gracefully - os.path.exists heuristic for Docker/SSH backend skip - First relevant write with no server → INFO log with install hint Tests: 77/77 pass including: - Protocol framing, reporter formatting, workspace resolution - Client E2E against mock LSP server (live_system_guard_bypass) - Eventlog steady-state silence contract - Backend-gate heuristic (local vs non-local paths) - Full hook flow integration (pre→write→transform with diagnostics) Source: PR #24168 by @teknium1, PR #24155 by @OutThisLife Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com>
537 lines
20 KiB
Python
537 lines
20 KiB
Python
"""Service-level orchestration for LSP clients.
|
|
|
|
The :class:`LSPService` is the bridge between the synchronous
file_operations layer and the async :class:`plugins.lsp.client.LSPClient`.
|
|
|
|
Design choices:
|
|
|
|
- A **single asyncio event loop** runs in a background thread. All
|
|
client work happens on that loop. Synchronous callers from
|
|
``tools/file_operations.py`` use :meth:`get_diagnostics_sync` to
|
|
open + wait + drain in one blocking call.
|
|
|
|
- One client per ``(server_id, workspace_root)`` key. Lazy spawn:
|
|
the first request for a key spawns the client; subsequent requests
|
|
re-use it.
|
|
|
|
- A **broken-set** records ``(server_id, workspace_root)`` pairs that
|
|
failed to spawn or initialize. These are never retried for the
|
|
life of the service. Mirrors OpenCode's design.
|
|
|
|
- A **delta baseline** map keeps "diagnostics-as-of-the-last-snapshot"
|
|
per file. ``snapshot_baseline()`` is called BEFORE a write; the
|
|
next ``get_diagnostics_sync()`` returns only diagnostics that
|
|
weren't in the baseline. This is the lift from Claude Code's
|
|
``beforeFileEdited`` / ``getNewDiagnostics`` pattern, except wired
|
|
to the local LSP layer instead of MCP IDE RPC.
|
|
|
|
The service is **on by default** (``lsp.enabled`` defaults to true in
config). :meth:`is_active` reports only that global switch; per-file
gating is done by :meth:`enabled_for`, which returns False when no
workspace can be detected for the file or when no enabled server
matches its extension. Servers whose binaries can't be found, or that
fail to start, are recorded in the broken-set. In all of these cases the
diagnostics calls return an empty list, so the file_operations layer
can fall through to the in-process syntax check.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from concurrent.futures import Future as ConcurrentFuture
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from plugins.lsp import eventlog
|
|
from plugins.lsp.client import (
|
|
DIAGNOSTICS_DOCUMENT_WAIT,
|
|
LSPClient,
|
|
file_uri,
|
|
)
|
|
from plugins.lsp.servers import (
|
|
ServerContext,
|
|
ServerDef,
|
|
SpawnSpec,
|
|
find_server_for_file,
|
|
language_id_for,
|
|
)
|
|
from plugins.lsp.workspace import (
|
|
clear_cache,
|
|
is_inside_workspace,
|
|
resolve_workspace_for_file,
|
|
)
|
|
|
|
# The logger name is pinned to "agent.lsp.manager" rather than __name__ --
# presumably so existing log filters keep matching; NOTE(review): confirm.
logger = logging.getLogger("agent.lsp.manager")

# Default ``idle_timeout`` for LSPService, in seconds (10 minutes).
# NOTE(review): no idle reaper that reads this value is visible in this module.
DEFAULT_IDLE_TIMEOUT = 10 * 60
|
|
|
|
|
|
class _BackgroundLoop:
|
|
"""A daemon thread that owns one asyncio event loop.
|
|
|
|
Provides :meth:`run` for synchronous callers — submits a coroutine
|
|
to the loop and blocks until it finishes (or a timeout fires).
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._ready = threading.Event()
|
|
|
|
def start(self) -> None:
|
|
if self._thread is not None:
|
|
return
|
|
self._thread = threading.Thread(
|
|
target=self._run_forever,
|
|
name="hermes-lsp-loop",
|
|
daemon=True,
|
|
)
|
|
self._thread.start()
|
|
self._ready.wait(timeout=5.0)
|
|
|
|
def _run_forever(self) -> None:
|
|
loop = asyncio.new_event_loop()
|
|
self._loop = loop
|
|
asyncio.set_event_loop(loop)
|
|
self._ready.set()
|
|
try:
|
|
loop.run_forever()
|
|
finally:
|
|
try:
|
|
loop.close()
|
|
except Exception: # noqa: BLE001
|
|
pass
|
|
|
|
def run(self, coro, *, timeout: Optional[float] = None) -> Any:
|
|
"""Submit a coroutine to the loop and block until done.
|
|
|
|
Returns the coroutine's result, or raises its exception.
|
|
"""
|
|
if self._loop is None:
|
|
raise RuntimeError("background loop not started")
|
|
fut: ConcurrentFuture = asyncio.run_coroutine_threadsafe(coro, self._loop)
|
|
try:
|
|
return fut.result(timeout=timeout)
|
|
except Exception:
|
|
fut.cancel()
|
|
raise
|
|
|
|
def stop(self) -> None:
|
|
loop = self._loop
|
|
if loop is None:
|
|
return
|
|
try:
|
|
loop.call_soon_threadsafe(loop.stop)
|
|
except RuntimeError:
|
|
pass
|
|
if self._thread is not None:
|
|
self._thread.join(timeout=2.0)
|
|
self._loop = None
|
|
self._thread = None
|
|
|
|
|
|
class LSPService:
    """The process-wide LSP service.

    Created once via :meth:`create_from_config`. A singleton accessor
    (``get_service``) is expected to manage the shared instance --
    NOTE(review): that accessor is not defined in this module. Most
    callers should use the accessor rather than constructing
    :class:`LSPService` directly.

    Threading model: public methods are synchronous and block on the
    private background event loop; every ``async`` method below runs on
    that loop's thread.
    """

    # ------------------------------------------------------------------
    # construction + factory
    # ------------------------------------------------------------------

    def __init__(
        self,
        *,
        enabled: bool,
        wait_mode: str,
        wait_timeout: float,
        install_strategy: str,
        binary_overrides: Optional[Dict[str, List[str]]] = None,
        env_overrides: Optional[Dict[str, Dict[str, str]]] = None,
        init_overrides: Optional[Dict[str, Dict[str, Any]]] = None,
        disabled_servers: Optional[List[str]] = None,
        idle_timeout: float = DEFAULT_IDLE_TIMEOUT,
    ) -> None:
        """Store configuration and, when enabled, start the background loop.

        Args:
            enabled: Master switch. When False no loop is started and the
                public diagnostics calls return immediately.
            wait_mode: ``"document"`` or ``"full"``; any other value falls
                back to ``"document"``. Forwarded to
                ``LSPClient.wait_for_diagnostics``.
            wait_timeout: Base timeout in seconds for
                :meth:`get_diagnostics_sync`; the blocking call waits
                ``wait_timeout + 2.0`` unless an explicit timeout is given.
            install_strategy: Forwarded to ``ServerContext``.
            binary_overrides: Per-server command overrides (by server id).
            env_overrides: Per-server environment overrides (by server id).
            init_overrides: Per-server initialization options (by server id).
            disabled_servers: Server ids that must never be spawned.
            idle_timeout: Stored as ``_idle_timeout``. NOTE(review): nothing
                in this class reads it; idle reaping is not implemented here.
        """
        self._enabled = enabled
        self._wait_mode = wait_mode if wait_mode in ("document", "full") else "document"
        self._wait_timeout = wait_timeout
        self._install_strategy = install_strategy
        self._binary_overrides = binary_overrides or {}
        self._env_overrides = env_overrides or {}
        self._init_overrides = init_overrides or {}
        self._disabled_servers = set(disabled_servers or [])
        self._idle_timeout = idle_timeout

        self._loop = _BackgroundLoop()
        if self._enabled:
            self._loop.start()

        # Per-(server_id, per-server root) state.
        self._clients: Dict[Tuple[str, str], LSPClient] = {}
        self._broken: set = set()
        # In-flight spawns; concurrent requests for the same key await the
        # same future instead of starting a second server.
        self._spawning: Dict[Tuple[str, str], asyncio.Future] = {}
        self._last_used: Dict[Tuple[str, str], float] = {}
        self._state_lock = threading.Lock()

        # Delta baseline: absolute file path -> snapshot of diagnostics taken
        # immediately before a write. ``get_diagnostics_sync`` filters out
        # anything in the baseline so the agent only sees errors introduced
        # by the current edit.
        self._delta_baseline: Dict[str, List[Dict[str, Any]]] = {}

    @classmethod
    def create_from_config(cls) -> Optional["LSPService"]:
        """Build a service from ``hermes_cli.config`` settings.

        Returns ``None`` if the config can't be loaded. The service
        itself returns ``is_active()`` False when LSP is disabled.
        Malformed values fall back to defaults rather than raising.
        """
        try:
            from hermes_cli.config import load_config
            cfg = load_config()
        except Exception as e:  # noqa: BLE001
            logger.debug("LSP config load failed: %s", e)
            return None

        lsp_cfg = (cfg.get("lsp") or {}) if isinstance(cfg, dict) else {}
        if not isinstance(lsp_cfg, dict):
            lsp_cfg = {}

        # LSP is enabled unless the config explicitly turns it off.
        enabled = bool(lsp_cfg.get("enabled", True))
        wait_mode = lsp_cfg.get("wait_mode", "document")
        raw_timeout = lsp_cfg.get("wait_timeout", DIAGNOSTICS_DOCUMENT_WAIT)
        try:
            wait_timeout = float(raw_timeout)
        except (TypeError, ValueError):
            # e.g. ``wait_timeout: null`` or a non-numeric string.
            logger.debug("invalid lsp.wait_timeout %r; using default", raw_timeout)
            wait_timeout = float(DIAGNOSTICS_DOCUMENT_WAIT)
        install_strategy = lsp_cfg.get("install_strategy", "auto")
        servers_cfg = lsp_cfg.get("servers") or {}
        disabled = []
        binary_overrides: Dict[str, List[str]] = {}
        env_overrides: Dict[str, Dict[str, str]] = {}
        init_overrides: Dict[str, Dict[str, Any]] = {}
        if isinstance(servers_cfg, dict):
            for name, sub in servers_cfg.items():
                if not isinstance(sub, dict):
                    continue
                if sub.get("disabled"):
                    disabled.append(name)
                cmd = sub.get("command")
                if isinstance(cmd, list) and cmd:
                    binary_overrides[name] = cmd
                env = sub.get("env")
                if isinstance(env, dict):
                    env_overrides[name] = {k: str(v) for k, v in env.items()}
                init = sub.get("initialization_options")
                if isinstance(init, dict):
                    init_overrides[name] = init

        return cls(
            enabled=enabled,
            wait_mode=wait_mode,
            wait_timeout=wait_timeout,
            install_strategy=install_strategy,
            binary_overrides=binary_overrides,
            env_overrides=env_overrides,
            init_overrides=init_overrides,
            disabled_servers=disabled,
        )

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------

    def is_active(self) -> bool:
        """Return True iff this service should be consulted at all."""
        return self._enabled

    def enabled_for(self, file_path: str) -> bool:
        """Return True iff LSP should run for this specific file.

        Gates on workspace detection (``resolve_workspace_for_file``) and
        on whether a registered, non-disabled server matches the file.
        """
        if not self._enabled:
            return False
        srv = find_server_for_file(file_path)
        if srv is None or srv.server_id in self._disabled_servers:
            return False
        ws_root, gated_in = resolve_workspace_for_file(file_path)
        return bool(ws_root and gated_in)

    def snapshot_baseline(self, file_path: str) -> None:
        """Snapshot current diagnostics for ``file_path`` as the delta baseline.

        Called BEFORE a write so the next ``get_diagnostics_sync()``
        can filter out pre-existing errors. Best-effort -- failures
        are logged at debug level and never propagate, so a flaky
        server can't break a write.
        """
        if not self.enabled_for(file_path):
            return
        abs_path = os.path.abspath(file_path)
        try:
            diags = self._loop.run(self._snapshot_async(file_path), timeout=8.0)
        except Exception as e:  # noqa: BLE001
            logger.debug("baseline snapshot failed for %s: %s", file_path, e)
            # Empty baseline keeps the next call comparing: any post-edit
            # diagnostic is then considered "new" -- the safe default.
            diags = []
        self._delta_baseline[abs_path] = diags or []

    def get_diagnostics_sync(
        self,
        file_path: str,
        *,
        delta: bool = True,
        timeout: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """Synchronously open ``file_path`` in the right server, wait for
        diagnostics, return them.

        If ``delta`` is True (default), the result is filtered against
        any baseline previously captured via :meth:`snapshot_baseline`.
        Diagnostics present in the baseline are removed so the caller
        only sees errors introduced by the current edit; the baseline is
        then rolled forward to the current diagnostics.

        Returns an empty list when LSP is disabled, when no workspace
        can be detected, when no server matches, when the server
        can't be spawned, or on timeout. Never raises.
        """
        # ``_BackgroundLoop.run`` surfaces timeouts as
        # concurrent.futures.TimeoutError, which is a different class from
        # asyncio.TimeoutError before Python 3.11 -- catch both.
        from concurrent.futures import TimeoutError as FuturesTimeoutError

        if not self.enabled_for(file_path):
            return []

        # Resolve server_id eagerly so we can emit structured logs even
        # when the request errors out below.
        srv = find_server_for_file(file_path)
        server_id = srv.server_id if srv else "?"

        t = timeout if timeout is not None else self._wait_timeout + 2.0
        try:
            diags = self._loop.run(self._open_and_wait_async(file_path), timeout=t) or []
        except (asyncio.TimeoutError, FuturesTimeoutError) as e:
            eventlog.log_timeout(server_id, file_path)
            logger.debug("LSP diagnostics timeout for %s: %s", file_path, e)
            return []
        except Exception as e:  # noqa: BLE001
            eventlog.log_server_error(server_id, file_path, e)
            logger.debug("LSP diagnostics fetch failed for %s: %s", file_path, e)
            return []

        abs_path = os.path.abspath(file_path)
        if delta:
            current = list(diags)
            baseline = self._delta_baseline.get(abs_path) or []
            if baseline:
                seen = {_diag_key(d) for d in baseline}
                diags = [d for d in diags if _diag_key(d) not in seen]
            # Roll baseline forward -- next call returns deltas relative
            # to the just-emitted state, mirroring claude-code's
            # diagnosticTracking. An empty fresh list is a valid state
            # (all errors fixed) and must replace the old baseline;
            # otherwise a later re-introduction of a fixed error would be
            # filtered out as "pre-existing".
            try:
                fresh = self._loop.run(self._current_diags_async(file_path), timeout=2.0) or []
            except Exception:  # noqa: BLE001
                fresh = current
            self._delta_baseline[abs_path] = fresh

        if diags:
            eventlog.log_diagnostics(server_id, file_path, len(diags))
        else:
            eventlog.log_clean(server_id, file_path)
        return diags

    def shutdown(self) -> None:
        """Tear down all clients, drop baselines and stop the background loop."""
        if not self._enabled:
            return
        try:
            self._loop.run(self._shutdown_async(), timeout=10.0)
        except Exception as e:  # noqa: BLE001
            logger.debug("LSP shutdown error: %s", e)
        self._loop.stop()
        self._delta_baseline.clear()
        clear_cache()

    # ------------------------------------------------------------------
    # async internals
    # ------------------------------------------------------------------

    async def _snapshot_async(self, file_path: str) -> List[Dict[str, Any]]:
        """Open ``file_path`` (no save) and return its diagnostics after waiting."""
        client = await self._get_or_spawn(file_path)
        if client is None:
            return []
        try:
            version = await client.open_file(file_path, language_id=language_id_for(file_path))
            await client.wait_for_diagnostics(file_path, version, mode=self._wait_mode)
        except Exception as e:  # noqa: BLE001
            logger.debug("snapshot open/wait failed: %s", e)
            return []
        self._last_used[(client.server_id, client.workspace_root)] = time.time()
        return list(client.diagnostics_for(file_path))

    async def _open_and_wait_async(self, file_path: str) -> List[Dict[str, Any]]:
        """Open and save ``file_path``, then return its diagnostics after waiting."""
        client = await self._get_or_spawn(file_path)
        if client is None:
            return []
        try:
            version = await client.open_file(file_path, language_id=language_id_for(file_path))
            await client.save_file(file_path)
            await client.wait_for_diagnostics(file_path, version, mode=self._wait_mode)
        except Exception as e:  # noqa: BLE001
            logger.debug("open/wait failed for %s: %s", file_path, e)
            return []
        self._last_used[(client.server_id, client.workspace_root)] = time.time()
        return list(client.diagnostics_for(file_path))

    async def _current_diags_async(self, file_path: str) -> List[Dict[str, Any]]:
        """Return the cached diagnostics for ``file_path`` without opening it.

        Returns [] when no matching client exists.
        """
        srv = find_server_for_file(file_path)
        if srv is None:
            return []
        ws_root, gated = resolve_workspace_for_file(file_path)
        if not (ws_root and gated):
            return []
        # Clients are keyed by the *per-server* root computed in
        # ``_get_or_spawn``, which can differ from the workspace root.
        per_server_root = srv.resolve_root(file_path, ws_root)
        if per_server_root is None:
            return []
        with self._state_lock:
            client = self._clients.get((srv.server_id, per_server_root))
        if client is None:
            return []
        return list(client.diagnostics_for(file_path))

    async def _get_or_spawn(self, file_path: str) -> Optional[LSPClient]:
        """Return a running client for ``file_path``, spawning one if needed.

        Returns None when no server matches, the server is disabled, the
        file is outside a detected workspace, the server's root resolver
        gates it off, the key is in the broken-set, or spawning fails.
        Concurrent requests for the same key share one spawn attempt.
        """
        srv = find_server_for_file(file_path)
        if srv is None:
            return None
        if srv.server_id in self._disabled_servers:
            eventlog.log_disabled(srv.server_id, file_path, "disabled in config")
            return None
        ws_root, gated = resolve_workspace_for_file(file_path)
        if not (ws_root and gated):
            eventlog.log_no_project_root(srv.server_id, file_path)
            return None
        per_server_root = srv.resolve_root(file_path, ws_root)
        if per_server_root is None:
            eventlog.log_disabled(
                srv.server_id, file_path, "exclude marker hit (server gated off)"
            )
            return None  # exclude marker hit, server gated off

        key = (srv.server_id, per_server_root)
        if key in self._broken:
            return None
        with self._state_lock:
            client = self._clients.get(key)
            spawning = self._spawning.get(key)
        if client is not None and client.is_running:
            eventlog.log_active(srv.server_id, per_server_root)
            return client
        if spawning is not None:
            try:
                # shield(): if *this* caller is cancelled (its sync timeout
                # fired) the shared future must stay alive for the spawner
                # and the other waiters.
                return await asyncio.shield(spawning)
            except Exception:  # noqa: BLE001
                return None

        # Begin spawn
        loop = asyncio.get_running_loop()
        spawn_future: asyncio.Future = loop.create_future()
        with self._state_lock:
            self._spawning[key] = spawn_future
        try:
            client = await self._spawn_client(srv, key, per_server_root)
            self._settle(spawn_future, client)
            return client
        finally:
            # Always release waiters, even if spawning raised or this task
            # was cancelled mid-spawn; otherwise they would hang on the
            # future until their own timeouts fire.
            self._settle(spawn_future, None)
            with self._state_lock:
                self._spawning.pop(key, None)

    async def _spawn_client(
        self,
        srv: ServerDef,
        key: Tuple[str, str],
        per_server_root: str,
    ) -> Optional[LSPClient]:
        """Build, start and register a client for ``key``; None on failure.

        Failures to locate or start the server add ``key`` to the
        broken-set so it is never retried for the life of the service.
        """
        ctx = ServerContext(
            workspace_root=per_server_root,
            install_strategy=self._install_strategy,
            binary_overrides=self._binary_overrides,
            env_overrides=self._env_overrides,
            init_overrides=self._init_overrides,
        )
        try:
            spec = srv.build_spawn(per_server_root, ctx)
        except Exception as e:  # noqa: BLE001
            # Treat a crashing resolver/installer like a failed spawn
            # instead of retrying (and possibly re-installing) every call.
            eventlog.log_spawn_failed(srv.server_id, per_server_root, e)
            self._broken.add(key)
            return None
        if spec is None:
            # ``build_spawn`` returns None when the binary can't be
            # located (auto-install disabled, manual-only server,
            # or install attempt failed). Surface this once via
            # the structured logger so the user can act on it.
            # NOTE(review): server_id is passed for both arguments; confirm
            # the second parameter of log_server_unavailable.
            eventlog.log_server_unavailable(srv.server_id, srv.server_id)
            self._broken.add(key)
            return None
        client = LSPClient(
            server_id=srv.server_id,
            workspace_root=spec.workspace_root,
            command=spec.command,
            env=spec.env,
            cwd=spec.cwd,
            initialization_options=spec.initialization_options,
            seed_diagnostics_on_first_push=spec.seed_diagnostics_on_first_push or srv.seed_first_push,
        )
        try:
            # NOTE(review): if the calling sync wrapper times out during a
            # cold start, CancelledError propagates from here and the
            # half-started client is neither stored nor shut down by this
            # method; whether LSPClient.start cleans up on cancellation is
            # not visible from this module.
            await client.start()
        except Exception as e:  # noqa: BLE001
            eventlog.log_spawn_failed(srv.server_id, per_server_root, e)
            self._broken.add(key)
            return None
        with self._state_lock:
            self._clients[key] = client
            self._last_used[key] = time.time()
        eventlog.log_active(srv.server_id, per_server_root)
        return client

    @staticmethod
    def _settle(fut: asyncio.Future, value: Optional[LSPClient]) -> None:
        """Resolve ``fut`` with ``value`` unless it is already done or cancelled."""
        if not fut.done():
            fut.set_result(value)

    async def _shutdown_async(self) -> None:
        """Detach every client from the service state and shut them down concurrently."""
        with self._state_lock:
            clients = list(self._clients.values())
            self._clients.clear()
            self._broken.clear()
            self._last_used.clear()
        await asyncio.gather(
            *(c.shutdown() for c in clients),
            return_exceptions=True,
        )

    # ------------------------------------------------------------------
    # status / introspection (used by ``hermes lsp status``)
    # ------------------------------------------------------------------

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the service for the CLI status command."""
        with self._state_lock:
            clients = [
                {
                    "server_id": k[0],
                    "workspace_root": k[1],
                    "state": c.state,
                    "running": c.is_running,
                }
                for k, c in self._clients.items()
            ]
            broken = list(self._broken)
        return {
            "enabled": self._enabled,
            "wait_mode": self._wait_mode,
            "wait_timeout": self._wait_timeout,
            "install_strategy": self._install_strategy,
            "clients": clients,
            "broken": broken,
            "disabled_servers": sorted(self._disabled_servers),
        }
|
|
|
|
|
|
def _diag_key(d: Dict[str, Any]) -> str:
|
|
"""Content equality key used for delta filtering. Mirrors
|
|
:func:`agent.lsp.client._diagnostic_key`."""
|
|
rng = d.get("range") or {}
|
|
start = rng.get("start") or {}
|
|
end = rng.get("end") or {}
|
|
code = d.get("code")
|
|
if code is not None and not isinstance(code, str):
|
|
code = str(code)
|
|
return "\x00".join(
|
|
[
|
|
str(d.get("severity") or 1),
|
|
str(code or ""),
|
|
str(d.get("source") or ""),
|
|
str(d.get("message") or "").strip(),
|
|
f"{start.get('line', 0)}:{start.get('character', 0)}-{end.get('line', 0)}:{end.get('character', 0)}",
|
|
]
|
|
)
|
|
|
|
|
|
# Public surface of this module: only the service class is exported.
__all__ = [
    "LSPService",
]
|