mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
6d80aa80eb
- Remove dead _post_tool_call (body was only comments) - Remove _on_session_start (redundant — _ensure_service lazy-inits) - Remove _atexit_cleanup (duplicate of _on_session_end) - Switch _baselines from dict to set (presence sentinel only) - Remove redundant enabled_for recheck in transform_tool_result - Remove V4A guard (path-empty check already covers it) - Use modern type syntax (X | None, dict[], set[]) - Reduce from 322 → 217 lines, same behavior 77/77 tests pass.
231 lines
6.6 KiB
Python
231 lines
6.6 KiB
Python
"""LSP Plugin — semantic diagnostics from real language servers.
|
|
|
|
Hooks into write_file/patch via the Hermes plugin system to surface
|
|
type errors, undefined names, missing imports, and other semantic
|
|
issues detected by pyright, gopls, rust-analyzer, typescript-language-server,
|
|
and ~20 more.
|
|
|
|
Opt-in: add ``lsp`` to ``plugins.enabled`` in config.yaml.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import atexit
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
from typing import Any
|
|
|
|
logger = logging.getLogger("plugins.lsp")
|
|
|
|
# Module-level state
|
|
_service: Any = None # LSPService | None
|
|
_service_lock = threading.Lock()
|
|
# Presence set: (session_id, abs_path) entries where a baseline was captured.
|
|
_baselines: set[tuple[str, str]] = set()
|
|
|
|
|
|
def register(ctx) -> None:
|
|
"""Plugin registration — wire hooks and CLI commands."""
|
|
ctx.register_hook("on_session_end", _on_session_end)
|
|
ctx.register_hook("pre_tool_call", _pre_tool_call)
|
|
ctx.register_hook("transform_tool_result", _transform_tool_result)
|
|
|
|
try:
|
|
from plugins.lsp.cli import setup_lsp_parser, run_lsp_command
|
|
ctx.register_cli_command(
|
|
name="lsp",
|
|
help="Language Server Protocol management",
|
|
setup_fn=setup_lsp_parser,
|
|
handler_fn=run_lsp_command,
|
|
)
|
|
except Exception as e:
|
|
logger.debug("LSP CLI registration failed: %s", e)
|
|
|
|
atexit.register(_on_session_end)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lifecycle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _on_session_end(**kwargs) -> None:
|
|
"""Tear down all language servers and clear baselines."""
|
|
global _service
|
|
with _service_lock:
|
|
if _service is not None:
|
|
try:
|
|
_service.shutdown()
|
|
except Exception as e:
|
|
logger.debug("LSP shutdown error: %s", e)
|
|
_service = None
|
|
_baselines.clear()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool hooks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _pre_tool_call(**kwargs) -> None:
|
|
"""Snapshot LSP baseline before a file write."""
|
|
tool_name = kwargs.get("tool_name", "")
|
|
if tool_name not in ("write_file", "patch"):
|
|
return
|
|
|
|
svc = _ensure_service()
|
|
if svc is None:
|
|
return
|
|
|
|
args = _parse_args(kwargs.get("args"))
|
|
if args is None:
|
|
return
|
|
|
|
path = args.get("path", "")
|
|
if not path:
|
|
return
|
|
|
|
abs_path = _resolve_path(path)
|
|
|
|
# Best-effort local-only check: skip if parent dir doesn't exist on host
|
|
if not os.path.exists(os.path.dirname(abs_path) or "."):
|
|
return
|
|
|
|
if not svc.enabled_for(abs_path):
|
|
return
|
|
|
|
session_id = kwargs.get("session_id") or ""
|
|
key = (session_id, abs_path)
|
|
|
|
try:
|
|
svc.snapshot_baseline(abs_path)
|
|
_baselines.add(key)
|
|
except Exception as e:
|
|
logger.debug("LSP baseline snapshot failed for %s: %s", abs_path, e)
|
|
|
|
|
|
def _transform_tool_result(**kwargs) -> str | None:
|
|
"""Inject LSP diagnostics into the tool result JSON.
|
|
|
|
Returns modified result string with ``lsp_diagnostics`` field,
|
|
or None to leave unchanged.
|
|
"""
|
|
tool_name = kwargs.get("tool_name", "")
|
|
if tool_name not in ("write_file", "patch"):
|
|
return None
|
|
|
|
svc = _service
|
|
if svc is None or not svc.is_active():
|
|
return None
|
|
|
|
args = _parse_args(kwargs.get("args"))
|
|
if args is None:
|
|
return None
|
|
|
|
path = args.get("path", "")
|
|
if not path:
|
|
return None
|
|
|
|
abs_path = _resolve_path(path)
|
|
session_id = kwargs.get("session_id") or ""
|
|
key = (session_id, abs_path)
|
|
|
|
if key not in _baselines:
|
|
return None
|
|
_baselines.discard(key)
|
|
|
|
# Fetch diagnostics with short timeout
|
|
try:
|
|
diagnostics = svc.get_diagnostics_sync(abs_path, delta=True, timeout=3.0)
|
|
except Exception as e:
|
|
logger.debug("LSP diagnostics fetch failed for %s: %s", abs_path, e)
|
|
return None
|
|
|
|
if not diagnostics:
|
|
return None
|
|
|
|
# Format
|
|
try:
|
|
from plugins.lsp.reporter import report_for_file, truncate
|
|
block = report_for_file(abs_path, diagnostics)
|
|
if not block:
|
|
return None
|
|
lsp_output = truncate(block)
|
|
except Exception:
|
|
return None
|
|
|
|
# Inject into result JSON (only when result is a JSON dict)
|
|
result = kwargs.get("result")
|
|
if not isinstance(result, str):
|
|
return None
|
|
try:
|
|
result_data = json.loads(result)
|
|
if not isinstance(result_data, dict):
|
|
return None
|
|
result_data["lsp_diagnostics"] = lsp_output
|
|
return json.dumps(result_data, ensure_ascii=False)
|
|
except (json.JSONDecodeError, TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _ensure_service():
|
|
"""Lazy-initialize the LSP service singleton."""
|
|
global _service
|
|
svc = _service
|
|
if svc is not None:
|
|
return svc if svc.is_active() else None
|
|
with _service_lock:
|
|
if _service is not None:
|
|
return _service if _service.is_active() else None
|
|
try:
|
|
from plugins.lsp.manager import LSPService
|
|
_service = LSPService.create_from_config()
|
|
except Exception as e:
|
|
logger.debug("LSP service creation failed: %s", e)
|
|
return None
|
|
return _service if (_service and _service.is_active()) else None
|
|
|
|
|
|
def _parse_args(args) -> dict[str, Any] | None:
|
|
"""Normalize args (may be dict or JSON string)."""
|
|
if isinstance(args, dict):
|
|
return args
|
|
if isinstance(args, str):
|
|
try:
|
|
parsed = json.loads(args)
|
|
if isinstance(parsed, dict):
|
|
return parsed
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
return None
|
|
|
|
|
|
def _resolve_path(path: str) -> str:
|
|
"""Expand and absolutify a path."""
|
|
expanded = os.path.expanduser(path)
|
|
if not os.path.isabs(expanded):
|
|
expanded = os.path.join(os.getcwd(), expanded)
|
|
return os.path.normpath(expanded)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API (used by plugins/lsp/cli.py)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def get_service():
|
|
"""Return the active LSP service or None."""
|
|
svc = _service
|
|
return svc if (svc is not None and svc.is_active()) else None
|
|
|
|
|
|
def shutdown_service() -> None:
|
|
"""Tear down the LSP service (idempotent)."""
|
|
_on_session_end()
|