mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
665 lines
23 KiB
Python
665 lines
23 KiB
Python
"""Tests for the `hermes proxy` subcommand and its upstream adapters."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Any, Dict
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from hermes_cli.proxy.adapters import ADAPTERS, get_adapter
|
|
from hermes_cli.proxy.adapters.base import UpstreamAdapter, UpstreamCredential
|
|
from hermes_cli.proxy.adapters.nous_portal import NousPortalAdapter
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Adapter registry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_registry_lists_nous():
|
|
assert "nous" in ADAPTERS
|
|
|
|
|
|
def test_get_adapter_returns_instance():
|
|
adapter = get_adapter("nous")
|
|
assert isinstance(adapter, NousPortalAdapter)
|
|
assert isinstance(adapter, UpstreamAdapter)
|
|
|
|
|
|
def test_get_adapter_case_insensitive():
|
|
assert isinstance(get_adapter("NOUS"), NousPortalAdapter)
|
|
assert isinstance(get_adapter(" Nous "), NousPortalAdapter)
|
|
|
|
|
|
def test_get_adapter_unknown_provider_raises():
|
|
with pytest.raises(ValueError, match="anthropic"):
|
|
get_adapter("anthropic") # not yet implemented
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# NousPortalAdapter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _write_auth_store(hermes_home: Path, nous_state: Dict[str, Any]) -> Path:
|
|
"""Write an auth.json with the given nous state into a hermetic HERMES_HOME."""
|
|
auth_path = hermes_home / "auth.json"
|
|
auth_path.write_text(json.dumps({
|
|
"version": 1,
|
|
"providers": {"nous": nous_state},
|
|
}))
|
|
return auth_path
|
|
|
|
|
|
def test_nous_adapter_metadata():
|
|
adapter = NousPortalAdapter()
|
|
assert adapter.name == "nous"
|
|
assert adapter.display_name == "Nous Portal"
|
|
assert "/chat/completions" in adapter.allowed_paths
|
|
assert "/embeddings" in adapter.allowed_paths
|
|
assert "/completions" in adapter.allowed_paths
|
|
assert "/models" in adapter.allowed_paths
|
|
|
|
|
|
def test_nous_adapter_not_authenticated_when_no_auth_file(tmp_path, monkeypatch):
|
|
# HERMES_HOME is already set by conftest, but make doubly sure
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
adapter = NousPortalAdapter()
|
|
assert not adapter.is_authenticated()
|
|
|
|
|
|
def test_nous_adapter_not_authenticated_when_provider_missing(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
(tmp_path / "auth.json").write_text(json.dumps({
|
|
"version": 1,
|
|
"providers": {},
|
|
}))
|
|
assert not NousPortalAdapter().is_authenticated()
|
|
|
|
|
|
def test_nous_adapter_authenticated_with_agent_key(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
_write_auth_store(tmp_path, {
|
|
"agent_key": "ov-test-key",
|
|
"agent_key_expires_at": "2099-01-01T00:00:00Z",
|
|
"inference_base_url": "https://inference-api.nousresearch.com/v1",
|
|
})
|
|
assert NousPortalAdapter().is_authenticated()
|
|
|
|
|
|
def test_nous_adapter_authenticated_with_refresh_token_only(tmp_path, monkeypatch):
|
|
"""If access_token+refresh_token exist but no agent_key yet, we can still mint."""
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
_write_auth_store(tmp_path, {
|
|
"access_token": "access-tok",
|
|
"refresh_token": "refresh-tok",
|
|
})
|
|
assert NousPortalAdapter().is_authenticated()
|
|
|
|
|
|
def test_nous_adapter_get_credential_uses_runtime_resolver(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
_write_auth_store(tmp_path, {
|
|
"access_token": "access-tok",
|
|
"refresh_token": "refresh-tok",
|
|
"client_id": "hermes-cli",
|
|
"portal_base_url": "https://portal.nousresearch.com",
|
|
"inference_base_url": "https://inference-api.nousresearch.com/v1",
|
|
})
|
|
|
|
refreshed_state = {
|
|
"api_key": "minted-bearer",
|
|
"base_url": "https://inference-api.nousresearch.com/v1",
|
|
"expires_at": "2099-01-01T00:00:00Z",
|
|
}
|
|
|
|
with patch(
|
|
"hermes_cli.proxy.adapters.nous_portal.resolve_nous_runtime_credentials",
|
|
return_value=refreshed_state,
|
|
) as mock_resolve:
|
|
adapter = NousPortalAdapter()
|
|
cred = adapter.get_credential()
|
|
|
|
mock_resolve.assert_called_once()
|
|
assert cred.bearer == "minted-bearer"
|
|
assert cred.base_url == "https://inference-api.nousresearch.com/v1"
|
|
assert cred.expires_at == "2099-01-01T00:00:00Z"
|
|
assert cred.token_type == "Bearer"
|
|
|
|
|
|
def test_nous_adapter_retry_credential_forces_legacy_mint(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
_write_auth_store(tmp_path, {
|
|
"access_token": "jwt-access",
|
|
"refresh_token": "refresh-tok",
|
|
"client_id": "hermes-cli",
|
|
"portal_base_url": "https://portal.nousresearch.com",
|
|
"inference_base_url": "https://inference-api.nousresearch.com/v1",
|
|
"agent_key": "jwt-access",
|
|
})
|
|
|
|
refreshed_state = {
|
|
"api_key": "legacy-bearer",
|
|
"base_url": "https://inference-api.nousresearch.com/v1",
|
|
"expires_at": "2099-01-01T00:00:00Z",
|
|
}
|
|
|
|
with patch(
|
|
"hermes_cli.proxy.adapters.nous_portal.resolve_nous_runtime_credentials",
|
|
return_value=refreshed_state,
|
|
) as mock_resolve:
|
|
adapter = NousPortalAdapter()
|
|
cred = adapter.get_retry_credential(
|
|
failed_credential=UpstreamCredential(
|
|
bearer="header.jwt.signature",
|
|
base_url="https://inference-api.nousresearch.com/v1",
|
|
),
|
|
status_code=401,
|
|
)
|
|
|
|
assert cred is not None
|
|
assert cred.bearer == "legacy-bearer"
|
|
assert mock_resolve.call_args.kwargs["inference_auth_mode"] == "legacy"
|
|
|
|
|
|
def test_nous_adapter_retry_credential_skips_opaque_bearer(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
_write_auth_store(tmp_path, {
|
|
"access_token": "jwt-access",
|
|
"refresh_token": "refresh-tok",
|
|
"agent_key": "opaque-bearer",
|
|
})
|
|
|
|
with patch(
|
|
"hermes_cli.proxy.adapters.nous_portal.resolve_nous_runtime_credentials",
|
|
) as mock_resolve:
|
|
adapter = NousPortalAdapter()
|
|
cred = adapter.get_retry_credential(
|
|
failed_credential=UpstreamCredential(
|
|
bearer="opaque-bearer",
|
|
base_url="https://inference-api.nousresearch.com/v1",
|
|
),
|
|
status_code=401,
|
|
)
|
|
|
|
assert cred is None
|
|
mock_resolve.assert_not_called()
|
|
|
|
|
|
def test_nous_adapter_get_credential_raises_when_not_logged_in(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
adapter = NousPortalAdapter()
|
|
with pytest.raises(RuntimeError, match="hermes login nous"):
|
|
adapter.get_credential()
|
|
|
|
|
|
def test_nous_adapter_get_credential_raises_on_refresh_failure(tmp_path, monkeypatch):
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
_write_auth_store(tmp_path, {
|
|
"access_token": "access-tok",
|
|
"refresh_token": "refresh-tok",
|
|
})
|
|
|
|
with patch(
|
|
"hermes_cli.proxy.adapters.nous_portal.resolve_nous_runtime_credentials",
|
|
side_effect=RuntimeError("Refresh session has been revoked"),
|
|
):
|
|
adapter = NousPortalAdapter()
|
|
with pytest.raises(RuntimeError, match="Refresh session has been revoked"):
|
|
adapter.get_credential()
|
|
|
|
|
|
def test_nous_adapter_quarantines_terminal_refresh_failure(tmp_path, monkeypatch):
|
|
from hermes_cli.auth import AuthError
|
|
from agent.credential_pool import load_pool
|
|
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
_write_auth_store(tmp_path, {
|
|
"access_token": "access-tok",
|
|
"refresh_token": "refresh-tok",
|
|
"agent_key": "stale-agent-key",
|
|
})
|
|
assert load_pool("nous").select() is not None
|
|
|
|
with patch(
|
|
"hermes_cli.proxy.adapters.nous_portal.resolve_nous_runtime_credentials",
|
|
side_effect=AuthError(
|
|
"Refresh session has been revoked",
|
|
provider="nous",
|
|
code="invalid_grant",
|
|
relogin_required=True,
|
|
),
|
|
):
|
|
adapter = NousPortalAdapter()
|
|
with pytest.raises(RuntimeError, match="Refresh session has been revoked"):
|
|
adapter.get_credential()
|
|
|
|
stored = json.loads((tmp_path / "auth.json").read_text())
|
|
nous_state = stored["providers"]["nous"]
|
|
assert not nous_state.get("refresh_token")
|
|
assert not nous_state.get("access_token")
|
|
assert not nous_state.get("agent_key")
|
|
assert nous_state["last_auth_error"]["code"] == "invalid_grant"
|
|
assert stored.get("credential_pool", {}).get("nous") == []
|
|
|
|
|
|
def test_nous_adapter_get_credential_raises_when_no_agent_key_returned(tmp_path, monkeypatch):
|
|
"""If the refresh helper succeeds but produces no agent_key, we surface a clear error."""
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
_write_auth_store(tmp_path, {
|
|
"access_token": "access-tok",
|
|
"refresh_token": "refresh-tok",
|
|
})
|
|
|
|
with patch(
|
|
"hermes_cli.proxy.adapters.nous_portal.resolve_nous_runtime_credentials",
|
|
return_value={"access_token": "a", "refresh_token": "r"},
|
|
):
|
|
adapter = NousPortalAdapter()
|
|
with pytest.raises(RuntimeError, match="did not return a usable agent_key"):
|
|
adapter.get_credential()
|
|
|
|
|
|
def test_nous_adapter_concurrent_refresh_serialized(tmp_path, monkeypatch):
|
|
"""Two parallel get_credential() calls must serialize through the lock."""
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
_write_auth_store(tmp_path, {
|
|
"access_token": "a", "refresh_token": "r",
|
|
})
|
|
|
|
call_log: list = []
|
|
in_flight = threading.Event()
|
|
overlap_detected = threading.Event()
|
|
counter = [0]
|
|
counter_lock = threading.Lock()
|
|
|
|
def serializing_refresh(**kwargs):
|
|
# If another thread is already inside refresh, the lock is broken.
|
|
if in_flight.is_set():
|
|
overlap_detected.set()
|
|
in_flight.set()
|
|
try:
|
|
call_log.append(threading.current_thread().ident)
|
|
# Simulate refresh latency so any race window is exposed.
|
|
import time
|
|
time.sleep(0.05)
|
|
with counter_lock:
|
|
counter[0] += 1
|
|
idx = counter[0]
|
|
return {
|
|
"api_key": f"key-{idx}",
|
|
"expires_at": "2099-01-01T00:00:00Z",
|
|
"base_url": "https://inference-api.nousresearch.com/v1",
|
|
}
|
|
finally:
|
|
in_flight.clear()
|
|
|
|
adapter = NousPortalAdapter()
|
|
results: list = []
|
|
errors: list = []
|
|
|
|
def worker():
|
|
try:
|
|
results.append(adapter.get_credential().bearer)
|
|
except Exception as exc: # pragma: no cover - shouldn't happen
|
|
errors.append(exc)
|
|
|
|
with patch(
|
|
"hermes_cli.proxy.adapters.nous_portal.resolve_nous_runtime_credentials",
|
|
side_effect=serializing_refresh,
|
|
):
|
|
threads = [threading.Thread(target=worker) for _ in range(3)]
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
assert not errors, f"workers errored: {errors}"
|
|
assert len(results) == 3
|
|
assert len(call_log) == 3
|
|
assert not overlap_detected.is_set(), "refresh calls overlapped — lock is broken"
|
|
assert all(r.startswith("key-") for r in results)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Server: path filtering + forwarding
|
|
#
|
|
# We run the proxy AND a fake upstream as real aiohttp servers on ephemeral
|
|
# ports. Avoids pytest-aiohttp's fixtures (extra dependency for one test file).
|
|
# ---------------------------------------------------------------------------
|
|
|
|
aiohttp = pytest.importorskip("aiohttp")
|
|
from aiohttp import web # noqa: E402
|
|
|
|
from hermes_cli.proxy.server import create_app # noqa: E402
|
|
|
|
|
|
class FakeAdapter(UpstreamAdapter):
|
|
"""A test adapter that returns a fixed credential without touching disk."""
|
|
|
|
def __init__(self, base_url: str, bearer: str = "test-bearer",
|
|
allowed=None, raise_on_credential=False,
|
|
retry_bearer: str | None = None):
|
|
self._base_url = base_url
|
|
self._bearer = bearer
|
|
self._allowed = frozenset(allowed or ["/chat/completions"])
|
|
self._raise = raise_on_credential
|
|
self._retry_bearer = retry_bearer
|
|
self.calls = 0
|
|
self.retry_calls = 0
|
|
|
|
@property
|
|
def name(self): return "fake"
|
|
|
|
@property
|
|
def display_name(self): return "Fake Provider"
|
|
|
|
@property
|
|
def allowed_paths(self): return self._allowed
|
|
|
|
def is_authenticated(self): return True
|
|
|
|
def get_credential(self):
|
|
self.calls += 1
|
|
if self._raise:
|
|
raise RuntimeError("simulated auth failure")
|
|
return UpstreamCredential(
|
|
bearer=self._bearer, base_url=self._base_url,
|
|
expires_at="2099-01-01T00:00:00Z",
|
|
)
|
|
|
|
def get_retry_credential(self, *, failed_credential, status_code):
|
|
_ = failed_credential
|
|
self.retry_calls += 1
|
|
if status_code != 401 or not self._retry_bearer:
|
|
return None
|
|
return UpstreamCredential(
|
|
bearer=self._retry_bearer,
|
|
base_url=self._base_url,
|
|
expires_at="2099-01-01T00:00:00Z",
|
|
)
|
|
|
|
|
|
async def _start_runner(app: "web.Application"):
|
|
"""Spin up an aiohttp app on an ephemeral localhost port. Returns (runner, base_url)."""
|
|
runner = web.AppRunner(app, access_log=None)
|
|
await runner.setup()
|
|
site = web.TCPSite(runner, host="127.0.0.1", port=0)
|
|
await site.start()
|
|
sockets = list(site._server.sockets) # type: ignore[union-attr]
|
|
port = sockets[0].getsockname()[1]
|
|
return runner, f"http://127.0.0.1:{port}"
|
|
|
|
|
|
def _build_fake_upstream(captured: Dict[str, Any]) -> "web.Application":
|
|
async def echo(request):
|
|
body = await request.read()
|
|
captured["requests"].append({
|
|
"method": request.method,
|
|
"path": request.path,
|
|
"auth": request.headers.get("Authorization"),
|
|
"body": body.decode("utf-8") if body else "",
|
|
})
|
|
return web.json_response({"echoed": True, "path": request.path})
|
|
|
|
async def sse(request):
|
|
resp = web.StreamResponse(
|
|
status=200, headers={"Content-Type": "text/event-stream"},
|
|
)
|
|
await resp.prepare(request)
|
|
for chunk in [b"data: hello\n\n", b"data: world\n\n", b"data: [DONE]\n\n"]:
|
|
await resp.write(chunk)
|
|
await resp.write_eof()
|
|
return resp
|
|
|
|
app = web.Application()
|
|
app.router.add_route("*", "/v1/chat/completions", echo)
|
|
app.router.add_route("*", "/v1/embeddings", echo)
|
|
app.router.add_route("*", "/v1/sse", sse)
|
|
return app
|
|
|
|
|
|
def _build_retrying_fake_upstream(captured: Dict[str, Any]) -> "web.Application":
|
|
async def maybe_unauthorized(request):
|
|
body = await request.read()
|
|
auth = request.headers.get("Authorization")
|
|
captured["requests"].append({
|
|
"method": request.method,
|
|
"path": request.path,
|
|
"auth": auth,
|
|
"body": body.decode("utf-8") if body else "",
|
|
})
|
|
if auth == "Bearer jwt-bearer":
|
|
return web.json_response({"error": "bad token"}, status=401)
|
|
return web.json_response({"ok": True})
|
|
|
|
app = web.Application()
|
|
app.router.add_route("*", "/v1/chat/completions", maybe_unauthorized)
|
|
return app
|
|
|
|
|
|
def test_server_forwards_chat_completions():
|
|
async def run():
|
|
captured: Dict[str, Any] = {"requests": []}
|
|
upstream_runner, upstream_base = await _start_runner(_build_fake_upstream(captured))
|
|
adapter = FakeAdapter(f"{upstream_base}/v1", bearer="real-portal-key")
|
|
proxy_runner, proxy_base = await _start_runner(create_app(adapter))
|
|
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(
|
|
f"{proxy_base}/v1/chat/completions",
|
|
json={"model": "Hermes-4-70B",
|
|
"messages": [{"role": "user", "content": "hi"}]},
|
|
headers={"Authorization": "Bearer client-dummy-key"},
|
|
) as resp:
|
|
assert resp.status == 200
|
|
data = await resp.json()
|
|
assert data["echoed"] is True
|
|
|
|
assert len(captured["requests"]) == 1
|
|
req = captured["requests"][0]
|
|
assert req["auth"] == "Bearer real-portal-key"
|
|
assert "Hermes-4-70B" in req["body"]
|
|
finally:
|
|
await proxy_runner.cleanup()
|
|
await upstream_runner.cleanup()
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_server_retries_once_with_adapter_retry_credential_on_401():
|
|
async def run():
|
|
captured: Dict[str, Any] = {"requests": []}
|
|
upstream_runner, upstream_base = await _start_runner(
|
|
_build_retrying_fake_upstream(captured)
|
|
)
|
|
adapter = FakeAdapter(
|
|
f"{upstream_base}/v1",
|
|
bearer="jwt-bearer",
|
|
retry_bearer="legacy-bearer",
|
|
)
|
|
proxy_runner, proxy_base = await _start_runner(create_app(adapter))
|
|
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(
|
|
f"{proxy_base}/v1/chat/completions",
|
|
json={"model": "Hermes-4-70B"},
|
|
) as resp:
|
|
assert resp.status == 200
|
|
data = await resp.json()
|
|
assert data["ok"] is True
|
|
|
|
assert adapter.retry_calls == 1
|
|
assert [req["auth"] for req in captured["requests"]] == [
|
|
"Bearer jwt-bearer",
|
|
"Bearer legacy-bearer",
|
|
]
|
|
finally:
|
|
await proxy_runner.cleanup()
|
|
await upstream_runner.cleanup()
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_server_rejects_disallowed_path():
|
|
async def run():
|
|
adapter = FakeAdapter("http://unused.example/v1", allowed=["/chat/completions"])
|
|
runner, base = await _start_runner(create_app(adapter))
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(f"{base}/v1/random/endpoint") as resp:
|
|
assert resp.status == 404
|
|
body = await resp.json()
|
|
assert body["error"]["type"] == "path_not_allowed"
|
|
assert "/chat/completions" in body["error"]["message"]
|
|
finally:
|
|
await runner.cleanup()
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_server_returns_401_when_adapter_fails():
|
|
async def run():
|
|
adapter = FakeAdapter("http://unused.example/v1", raise_on_credential=True)
|
|
runner, base = await _start_runner(create_app(adapter))
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(f"{base}/v1/chat/completions", json={}) as resp:
|
|
assert resp.status == 401
|
|
body = await resp.json()
|
|
assert body["error"]["type"] == "upstream_auth_failed"
|
|
assert "simulated auth failure" in body["error"]["message"]
|
|
finally:
|
|
await runner.cleanup()
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_server_health_endpoint():
|
|
async def run():
|
|
adapter = FakeAdapter("http://unused.example/v1")
|
|
runner, base = await _start_runner(create_app(adapter))
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(f"{base}/health") as resp:
|
|
assert resp.status == 200
|
|
body = await resp.json()
|
|
assert body["status"] == "ok"
|
|
assert body["upstream"] == "Fake Provider"
|
|
assert body["authenticated"] is True
|
|
finally:
|
|
await runner.cleanup()
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_server_streams_sse():
|
|
async def run():
|
|
captured: Dict[str, Any] = {"requests": []}
|
|
upstream_runner, upstream_base = await _start_runner(_build_fake_upstream(captured))
|
|
adapter = FakeAdapter(f"{upstream_base}/v1", allowed=["/sse"])
|
|
proxy_runner, proxy_base = await _start_runner(create_app(adapter))
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(f"{proxy_base}/v1/sse") as resp:
|
|
assert resp.status == 200
|
|
chunks = []
|
|
async for chunk in resp.content.iter_any():
|
|
chunks.append(chunk)
|
|
full = b"".join(chunks)
|
|
assert b"data: hello" in full
|
|
assert b"data: [DONE]" in full
|
|
finally:
|
|
await proxy_runner.cleanup()
|
|
await upstream_runner.cleanup()
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_server_strips_client_auth_header():
|
|
"""The client's Authorization header MUST NOT reach the upstream."""
|
|
async def run():
|
|
captured: Dict[str, Any] = {"requests": []}
|
|
upstream_runner, upstream_base = await _start_runner(_build_fake_upstream(captured))
|
|
adapter = FakeAdapter(f"{upstream_base}/v1", bearer="ours")
|
|
proxy_runner, proxy_base = await _start_runner(create_app(adapter))
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(
|
|
f"{proxy_base}/v1/chat/completions",
|
|
json={},
|
|
headers={"Authorization": "Bearer SHOULD_NOT_LEAK"},
|
|
) as resp:
|
|
await resp.read()
|
|
assert captured["requests"][0]["auth"] == "Bearer ours"
|
|
assert "SHOULD_NOT_LEAK" not in captured["requests"][0]["auth"]
|
|
finally:
|
|
await proxy_runner.cleanup()
|
|
await upstream_runner.cleanup()
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI handlers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_cmd_proxy_status_runs(capsys, tmp_path, monkeypatch):
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
from hermes_cli.proxy.cli import cmd_proxy_status
|
|
|
|
args = MagicMock()
|
|
rc = cmd_proxy_status(args)
|
|
assert rc == 0
|
|
out = capsys.readouterr().out
|
|
assert "nous" in out
|
|
assert "Nous Portal" in out
|
|
assert "not logged in" in out
|
|
|
|
|
|
def test_cmd_proxy_providers_runs(capsys):
|
|
from hermes_cli.proxy.cli import cmd_proxy_list_providers
|
|
|
|
args = MagicMock()
|
|
rc = cmd_proxy_list_providers(args)
|
|
assert rc == 0
|
|
out = capsys.readouterr().out
|
|
assert "nous" in out
|
|
assert "Nous Portal" in out
|
|
|
|
|
|
def test_cmd_proxy_start_refuses_unknown_provider(capsys):
|
|
from hermes_cli.proxy.cli import cmd_proxy_start
|
|
|
|
args = MagicMock()
|
|
args.provider = "no-such-provider"
|
|
args.host = None
|
|
args.port = None
|
|
rc = cmd_proxy_start(args)
|
|
assert rc == 2
|
|
err = capsys.readouterr().err
|
|
assert "no-such-provider" in err
|
|
|
|
|
|
def test_cmd_proxy_start_refuses_when_unauthenticated(capsys, tmp_path, monkeypatch):
|
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
|
from hermes_cli.proxy.cli import cmd_proxy_start
|
|
|
|
args = MagicMock()
|
|
args.provider = "nous"
|
|
args.host = None
|
|
args.port = None
|
|
rc = cmd_proxy_start(args)
|
|
assert rc == 2
|
|
err = capsys.readouterr().err
|
|
assert "hermes login nous" in err
|