mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
23344a9a3c
Ship LSP semantic diagnostics as a bundled plugin (plugins/lsp/) using existing hook system. Zero lines of core code modified. Plugin wiring: - pre_tool_call: capture LSP baseline before write_file/patch - transform_tool_result: inject diagnostics into tool result JSON - on_session_start/on_session_end + atexit: lifecycle management Key design: - Baselines keyed by (session_id, abs_path) for concurrent safety - Diagnostics added as 'lsp_diagnostics' JSON field (preserves shape) - Per-file workspace detection (no static session-start gate) - V4A multi-file patch skipped for MVP - Short timeout (3s) — cold start degrades gracefully - os.path.exists heuristic for Docker/SSH backend skip - First relevant write with no server → INFO log with install hint Tests: 77/77 pass including: - Protocol framing, reporter formatting, workspace resolution - Client E2E against mock LSP server (live_system_guard_bypass) - Eventlog steady-state silence contract - Backend-gate heuristic (local vs non-local paths) - Full hook flow integration (pre→write→transform with diagnostics) Source: PR #24168 by @teknium1, PR #24155 by @OutThisLife Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com>
150 lines
4.9 KiB
Python
150 lines
4.9 KiB
Python
"""End-to-end client tests against the in-repo mock LSP server.
|
|
|
|
Spins up :file:`_mock_lsp_server.py` as an actual subprocess, drives
|
|
it through real LSP traffic, and asserts diagnostic flow. This is
|
|
the closest thing we have to integration coverage without requiring
|
|
pyright/gopls/etc. to be installed in CI.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from plugins.lsp.client import LSPClient
|
|
|
|
# Module-wide marker. These tests spawn a real subprocess (the mock LSP
# server) and terminate it via SIGTERM on shutdown. The conftest live-system
# guard blocks os.kill for PIDs outside the test process subtree; bypass it
# here because this is intentional subprocess lifecycle management.
pytestmark = pytest.mark.live_system_guard_bypass
|
|
|
|
|
|
# Path (as a string) of the mock LSP server script that sits next to this file.
MOCK_SERVER = str(Path(__file__).with_name("_mock_lsp_server.py"))
|
|
|
|
|
|
def _client(workspace: Path, script: str = "clean") -> LSPClient:
    """Construct an ``LSPClient`` wired to the mock server script.

    Args:
        workspace: Directory used both as the workspace root and as the
            server's working directory.
        script: Value exported to the server as ``MOCK_LSP_SCRIPT``; it is
            also embedded in the client's ``server_id``.

    Returns:
        The constructed client. The tests in this module call ``start()``
        on it themselves.
    """
    root = str(workspace)
    # Only these two variables are passed explicitly; PYTHONPATH is forwarded
    # from the current environment (empty string when unset).
    launch_env = {
        "MOCK_LSP_SCRIPT": script,
        "PYTHONPATH": os.environ.get("PYTHONPATH", ""),
    }
    return LSPClient(
        server_id=f"mock-{script}",
        workspace_root=root,
        command=[sys.executable, MOCK_SERVER],
        env=launch_env,
        cwd=root,
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_client_lifecycle_clean(tmp_path: Path):
    """Happy path: start, open a file, observe no diagnostics, shut down."""
    source = tmp_path / "x.py"
    source.write_text("print('hi')\n")
    target = str(source)

    client = _client(tmp_path, "clean")
    await client.start()
    try:
        assert client.is_running
        version = await client.open_file(target, language_id="python")
        # The first open of a document is expected to report version 0.
        assert version == 0
        await client.wait_for_diagnostics(target, version, mode="document")
        assert client.diagnostics_for(target) == []
    finally:
        await client.shutdown()
    # After shutdown the client must report itself as stopped.
    assert not client.is_running
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_client_receives_published_errors(tmp_path: Path):
    """With the ``errors`` script, exactly one diagnostic reaches the client."""
    source = tmp_path / "x.py"
    source.write_text("print('hi')\n")
    target = str(source)

    client = _client(tmp_path, "errors")
    await client.start()
    try:
        version = await client.open_file(target, language_id="python")
        await client.wait_for_diagnostics(target, version, mode="document")
        published = client.diagnostics_for(target)
        assert len(published) == 1
        diag = published[0]
        # Exact-match fields of the diagnostic, checked in a fixed order.
        # (In the LSP spec, DiagnosticSeverity 1 is "Error".)
        for field, expected in (
            ("severity", 1),
            ("code", "MOCK001"),
            ("source", "mock-lsp"),
        ):
            assert diag[field] == expected
        assert "synthetic error" in diag["message"]
    finally:
        await client.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_client_didchange_bumps_version(tmp_path: Path):
    """Re-opening an already-open file increments the document version by one."""
    source = tmp_path / "x.py"
    source.write_text("print('hi')\n")
    target = str(source)

    client = _client(tmp_path, "errors")
    await client.start()
    try:
        first = await client.open_file(target, language_id="python")
        source.write_text("print('hi 2')\n")
        # Opening the same path a second time goes down the didChange route.
        second = await client.open_file(target, language_id="python")
        assert second == first + 1
        await client.wait_for_diagnostics(target, second, mode="document")
        # The mock pushed a diagnostic for both events; the merged view has
        # one entry because the push store is keyed by file path.
        assert len(client.diagnostics_for(target)) == 1
    finally:
        await client.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_client_handles_crashing_server(tmp_path: Path):
    """When the server exits right after initialize, subsequent requests
    fail gracefully (not hang).

    Any exception raised by ``open_file`` is acceptable, and so is a normal
    return. The only failure condition is that the call is still pending
    after two seconds.
    """
    f = tmp_path / "x.py"
    f.write_text("")

    client = _client(tmp_path, "crash")
    await client.start()  # should succeed (mock answers initialize before crashing)
    try:
        # Give the OS a moment to deliver the EOF.
        await asyncio.sleep(0.2)
        # The reader loop should detect EOF and mark pending requests as failed.
        # asyncio.wait (unlike wait_for + a broad except) lets us tell a hang
        # apart from a graceful failure: a hang leaves the task in `pending`,
        # whereas a timeout exception would be indistinguishable from any
        # other error the client chose to raise.
        attempt = asyncio.create_task(
            client.open_file(str(f), language_id="python")
        )
        _, pending = await asyncio.wait({attempt}, timeout=2.0)
        if pending:
            attempt.cancel()
            pytest.fail("open_file() still pending 2s after the server crashed")
        if not attempt.cancelled():
            # Retrieve (and deliberately ignore) the outcome so asyncio does
            # not log "Task exception was never retrieved".
            attempt.exception()
    finally:
        # Always reap the subprocess, even when the assertion above fails.
        await client.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_client_shutdown_idempotent(tmp_path: Path):
    """Shutting down an already stopped client must not raise."""
    placeholder = tmp_path / "x.py"
    placeholder.write_text("")

    client = _client(tmp_path, "clean")
    await client.start()
    # First pass stops the client; the second pass is the one under test and
    # must complete without raising.
    for _ in range(2):
        await client.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_client_diagnostics_are_deduped(tmp_path: Path):
    """Identical diagnostics pushed repeatedly must not pile up."""
    source = tmp_path / "x.py"
    source.write_text("")
    target = str(source)

    client = _client(tmp_path, "errors")
    await client.start()
    try:
        rounds = 3
        for _ in range(rounds):
            version = await client.open_file(target, language_id="python")
            await client.wait_for_diagnostics(target, version, mode="document")
        # The push store overwrites on every notification, so only a single
        # diagnostic should remain after all rounds.
        assert len(client.diagnostics_for(target)) == 1
    finally:
        await client.shutdown()
|