Files
hermes-agent/tests/plugins/lsp/test_client_e2e.py
alt-glitch 23344a9a3c feat(lsp): plugin-based LSP diagnostics with zero core changes
Ship LSP semantic diagnostics as a bundled plugin (plugins/lsp/) using the
existing hook system.  Zero lines of core code modified.

Plugin wiring:
- pre_tool_call: capture LSP baseline before write_file/patch
- transform_tool_result: inject diagnostics into tool result JSON
- on_session_start/on_session_end + atexit: lifecycle management

Key design:
- Baselines keyed by (session_id, abs_path) for concurrent safety
- Diagnostics added as 'lsp_diagnostics' JSON field (preserves shape)
- Per-file workspace detection (no static session-start gate)
- V4A multi-file patch skipped for MVP
- Short timeout (3s) — cold start degrades gracefully
- os.path.exists heuristic for Docker/SSH backend skip
- First relevant write with no server → INFO log with install hint

Tests: 77/77 pass including:
- Protocol framing, reporter formatting, workspace resolution
- Client E2E against mock LSP server (live_system_guard_bypass)
- Eventlog steady-state silence contract
- Backend-gate heuristic (local vs non-local paths)
- Full hook flow integration (pre→write→transform with diagnostics)

Source: PR #24168 by @teknium1, PR #24155 by @OutThisLife
Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com>
2026-05-12 13:01:13 +00:00

150 lines
4.9 KiB
Python

"""End-to-end client tests against the in-process mock LSP server.
Spins up :file:`_mock_lsp_server.py` as an actual subprocess, drives
it through real LSP traffic, and asserts diagnostic flow. This is
the closest thing we have to integration coverage without requiring
pyright/gopls/etc. to be installed in CI.
"""
from __future__ import annotations
import asyncio
import os
import sys
from pathlib import Path
import pytest
from plugins.lsp.client import LSPClient
# These tests spawn a real subprocess (mock LSP server) and terminate it
# via SIGTERM on shutdown. The conftest live-system guard blocks os.kill
# for PIDs outside the test process subtree; bypass it here because this
# is intentional subprocess lifecycle management.
pytestmark = pytest.mark.live_system_guard_bypass
# Path of the mock server script that lives next to this test module; it is
# handed to the current Python interpreter as the LSP server command.
MOCK_SERVER = str(Path(__file__).parent / "_mock_lsp_server.py")
def _client(workspace: Path, script: str = "clean") -> LSPClient:
    """Build an ``LSPClient`` that launches the mock server for *workspace*.

    *script* names the mock scenario: it is forwarded to the subprocess via
    the ``MOCK_LSP_SCRIPT`` environment variable and is also embedded in the
    client's ``server_id``. The caller's ``PYTHONPATH`` (empty string when
    unset) is passed along as well.
    """
    root = str(workspace)
    server_env = {
        "MOCK_LSP_SCRIPT": script,
        "PYTHONPATH": os.environ.get("PYTHONPATH", ""),
    }
    launch_command = [sys.executable, MOCK_SERVER]
    return LSPClient(
        server_id=f"mock-{script}",
        workspace_root=root,
        command=launch_command,
        env=server_env,
        cwd=root,
    )
@pytest.mark.asyncio
async def test_client_lifecycle_clean(tmp_path: Path):
    """Run one complete session against the ``clean`` mock script.

    Starts the client, opens a file, waits for diagnostics, checks that
    none are reported, then shuts down and verifies the client stopped.
    """
    source = tmp_path / "x.py"
    source.write_text("print('hi')\n")
    path = str(source)

    lsp = _client(tmp_path, "clean")
    await lsp.start()
    try:
        assert lsp.is_running
        # The first open of a document is expected to carry version 0.
        opened_version = await lsp.open_file(path, language_id="python")
        assert opened_version == 0
        await lsp.wait_for_diagnostics(path, opened_version, mode="document")
        assert lsp.diagnostics_for(path) == []
    finally:
        await lsp.shutdown()
    assert not lsp.is_running
@pytest.mark.asyncio
async def test_client_receives_published_errors(tmp_path: Path):
    """Diagnostics published by the ``errors`` mock script reach the client.

    Exactly one diagnostic is expected, with severity 1, code ``MOCK001``,
    source ``mock-lsp`` and a message mentioning "synthetic error".
    """
    source = tmp_path / "x.py"
    source.write_text("print('hi')\n")
    path = str(source)

    lsp = _client(tmp_path, "errors")
    await lsp.start()
    try:
        opened_version = await lsp.open_file(path, language_id="python")
        await lsp.wait_for_diagnostics(path, opened_version, mode="document")
        published = lsp.diagnostics_for(path)
        assert len(published) == 1
        only = published[0]
        expected_fields = {
            "severity": 1,
            "code": "MOCK001",
            "source": "mock-lsp",
        }
        for key, value in expected_fields.items():
            assert only[key] == value
        assert "synthetic error" in only["message"]
    finally:
        await lsp.shutdown()
@pytest.mark.asyncio
async def test_client_didchange_bumps_version(tmp_path: Path):
    """Opening an already-open path again increments the document version.

    After the second open the client must still expose a single diagnostic
    for the file.
    """
    source = tmp_path / "x.py"
    source.write_text("print('hi')\n")
    path = str(source)

    lsp = _client(tmp_path, "errors")
    await lsp.start()
    try:
        first_version = await lsp.open_file(path, language_id="python")
        source.write_text("print('hi 2')\n")
        # Re-opening the same path is the didChange route.
        second_version = await lsp.open_file(path, language_id="python")
        assert second_version == first_version + 1
        await lsp.wait_for_diagnostics(path, second_version, mode="document")
        # The mock pushed a diagnostic for each of the two events, but the
        # push store is keyed by file path, so the merged view holds one.
        merged = lsp.diagnostics_for(path)
        assert len(merged) == 1
    finally:
        await lsp.shutdown()
@pytest.mark.asyncio
async def test_client_handles_crashing_server(tmp_path: Path):
    """When the server exits right after initialize, subsequent requests
    fail gracefully (not hang).

    Any ordinary exception raised by ``open_file`` is acceptable, and so is
    a normal return. The only failure is the request still being pending
    after the 2 second budget, which is reported explicitly instead of being
    swallowed together with the other exceptions. The client is always shut
    down, even when the test fails.
    """
    f = tmp_path / "x.py"
    f.write_text("")
    client = _client(tmp_path, "crash")
    await client.start()  # should succeed (mock answers initialize before crashing)
    try:
        # Give the OS a moment to deliver the EOF.
        await asyncio.sleep(0.2)
        # The reader loop should detect EOF and mark pending requests as failed.
        request = asyncio.ensure_future(
            client.open_file(str(f), language_id="python")
        )
        # asyncio.wait() never raises on timeout, so a hang (task still
        # pending) can be told apart from an exception raised by open_file,
        # including a TimeoutError the client itself might raise.
        _, still_pending = await asyncio.wait({request}, timeout=2.0)
        if still_pending:
            request.cancel()
            # Bounded wait so the cancellation can settle before failing.
            await asyncio.wait({request}, timeout=1.0)
            pytest.fail("open_file() was still pending 2s after the server crashed")
        try:
            request.result()
        except Exception:
            pass  # any exception is acceptable; the contract is "doesn't hang"
    finally:
        await client.shutdown()
@pytest.mark.asyncio
async def test_client_shutdown_idempotent(tmp_path: Path):
    """A second ``shutdown()`` on an already stopped client must not raise."""
    placeholder = tmp_path / "x.py"
    placeholder.write_text("")

    lsp = _client(tmp_path, "clean")
    await lsp.start()
    # The first call stops the client; the repeat is the actual check.
    for _ in range(2):
        await lsp.shutdown()
@pytest.mark.asyncio
async def test_client_diagnostics_are_deduped(tmp_path: Path):
    """Opening the same file three times leaves a single diagnostic behind."""
    source = tmp_path / "x.py"
    source.write_text("")
    path = str(source)

    lsp = _client(tmp_path, "errors")
    await lsp.start()
    try:
        rounds = 3
        for _ in range(rounds):
            current_version = await lsp.open_file(path, language_id="python")
            await lsp.wait_for_diagnostics(path, current_version, mode="document")
        # Each notification overwrites the push store entry, so only one
        # diagnostic should remain.
        remaining = lsp.diagnostics_for(path)
        assert len(remaining) == 1
    finally:
        await lsp.shutdown()