Files
hermes-webui/api/gateway_chat.py
T
2026-05-28 08:40:51 -04:00

329 lines
13 KiB
Python

"""Default-off Hermes Gateway bridge for browser-originated chat turns."""
from __future__ import annotations
import json
import logging
import os
import threading
import time
import urllib.error
import urllib.request
from typing import Any
from api.config import (
CANCEL_FLAGS,
STREAMS,
STREAMS_LOCK,
STREAM_LAST_EVENT_ID,
STREAM_LIVE_TOOL_CALLS,
STREAM_PARTIAL_TEXT,
STREAM_REASONING_TEXT,
_get_session_agent_lock,
register_active_run,
unregister_active_run,
update_active_run,
)
from api.helpers import _redact_text, redact_session_data
from api.models import get_session
from api.run_journal import RunJournalWriter
logger = logging.getLogger(__name__)
_WEBUI_CHAT_BACKEND_ENV = "HERMES_WEBUI_CHAT_BACKEND"
_WEBUI_GATEWAY_BASE_URL_ENV = "HERMES_WEBUI_GATEWAY_BASE_URL"
_WEBUI_GATEWAY_API_KEY_ENV = "HERMES_WEBUI_GATEWAY_API_KEY"
_GATEWAY_CHAT_BACKENDS = {"gateway", "api_server", "api-server"}
def webui_chat_backend_mode(config_data=None, environ: dict[str, str] | None = None) -> str:
"""Return the explicitly selected browser chat backend.
The default remains the in-process WebUI runtime. Only explicit gateway
values opt browser chat into the Hermes API server bridge; generic truthy
strings are deliberately ignored so deployments do not change execution
ownership by accident.
"""
source = os.environ if environ is None else environ
cfg = config_data if isinstance(config_data, dict) else {}
raw = str(
source.get(_WEBUI_CHAT_BACKEND_ENV)
or cfg.get("webui_chat_backend")
or ""
).strip().lower()
if raw in _GATEWAY_CHAT_BACKENDS:
return "gateway"
return "legacy"
def webui_gateway_chat_enabled(config_data=None, environ: dict[str, str] | None = None) -> bool:
return webui_chat_backend_mode(config_data, environ) == "gateway"
def _gateway_base_url(config_data=None, environ: dict[str, str] | None = None) -> str:
source = os.environ if environ is None else environ
cfg = config_data if isinstance(config_data, dict) else {}
raw = str(
source.get(_WEBUI_GATEWAY_BASE_URL_ENV)
or cfg.get("webui_gateway_base_url")
or "http://127.0.0.1:8642"
).strip()
return raw.rstrip("/") or "http://127.0.0.1:8642"
def _gateway_api_key(environ: dict[str, str] | None = None) -> str:
source = os.environ if environ is None else environ
return str(
source.get(_WEBUI_GATEWAY_API_KEY_ENV)
or source.get("API_SERVER_KEY")
or ""
).strip()
def _gateway_sse_delta(payload: dict) -> str:
"""Extract assistant text from an OpenAI-compatible streaming chunk."""
try:
choices = payload.get("choices") or []
if not choices:
return ""
choice = choices[0] or {}
delta = choice.get("delta") or {}
content = delta.get("content")
if isinstance(content, str):
return content
message = choice.get("message") or {}
content = message.get("content")
return content if isinstance(content, str) else ""
except Exception:
return ""
def _gateway_stream_usage(payload: dict) -> dict:
usage = payload.get("usage") if isinstance(payload, dict) else None
if not isinstance(usage, dict):
return {}
return {
"input_tokens": int(usage.get("prompt_tokens") or usage.get("input_tokens") or 0),
"output_tokens": int(usage.get("completion_tokens") or usage.get("output_tokens") or 0),
"estimated_cost": usage.get("estimated_cost") or usage.get("estimated_cost_usd") or 0,
}
def _stream_writeback_is_current(session: Any, stream_id: str) -> bool:
return bool(stream_id and getattr(session, "active_stream_id", None) == stream_id)
def _clear_gateway_pending_state(session: Any, stream_id: str) -> None:
if not _stream_writeback_is_current(session, stream_id):
return
session.active_stream_id = None
session.pending_user_message = None
session.pending_attachments = None
session.pending_started_at = None
session.save()
def _run_gateway_chat_streaming(
session_id,
msg_text,
model,
workspace,
stream_id,
attachments=None,
*,
model_provider=None,
):
"""Bridge a WebUI chat turn through Hermes Gateway's API server.
This default-off path keeps the browser contract unchanged: /api/chat/start
still returns a local stream_id and /api/chat/stream still receives WebUI SSE
event names. The worker translates OpenAI-compatible streaming chunks from
the configured Gateway API server into those local events and persists the
final user/assistant turn back into the WebUI session.
"""
q = STREAMS.get(stream_id)
if q is None:
return
register_active_run(
stream_id,
session_id=session_id,
started_at=time.time(),
phase="gateway-starting",
workspace=str(workspace),
model=model,
provider=model_provider,
backend="gateway",
)
try:
run_journal = RunJournalWriter(session_id, stream_id)
except Exception:
run_journal = None
logger.debug("Failed to initialize gateway run journal for stream %s", stream_id, exc_info=True)
cancel_event = threading.Event()
with STREAMS_LOCK:
CANCEL_FLAGS[stream_id] = cancel_event
STREAM_PARTIAL_TEXT[stream_id] = ""
STREAM_REASONING_TEXT[stream_id] = ""
STREAM_LIVE_TOOL_CALLS[stream_id] = []
def put_gateway_event(event, data):
if cancel_event.is_set() and event not in ("cancel", "error", "apperror"):
return
if run_journal is not None:
try:
journaled = run_journal.append_sse_event(event, data)
event_id = (journaled or {}).get("event_id") if isinstance(journaled, dict) else None
if event_id:
STREAM_LAST_EVENT_ID[stream_id] = event_id
except Exception:
logger.debug("Failed to append gateway event %s for stream %s", event, stream_id, exc_info=True)
try:
q.put_nowait((event, data))
except Exception:
logger.debug("Failed to put gateway event to queue")
s = None
final_text = ""
usage = {"input_tokens": 0, "output_tokens": 0, "estimated_cost": 0}
try:
s = get_session(session_id)
from api.config import get_config # imported lazily to avoid config-cycle churn
cfg = get_config()
base_url = _gateway_base_url(cfg)
api_key = _gateway_api_key()
url = f"{base_url}/v1/chat/completions"
headers = {
"Content-Type": "application/json",
"Accept": "text/event-stream",
"X-Hermes-Session-Id": session_id,
}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
# Scope Gateway long-term continuity to this WebUI conversation
# without exposing the browser's auth cookie or CSRF material.
headers["X-Hermes-Session-Key"] = f"webui:{session_id}"
message_content: Any = str(msg_text or "")
if attachments:
try:
from api.streaming import _build_native_multimodal_message
message_content = _build_native_multimodal_message("", str(msg_text or ""), attachments, str(workspace), cfg=cfg)
except Exception:
logger.debug("Failed to build gateway multimodal attachment payload", exc_info=True)
message_content = str(msg_text or "")
body = {
"model": model or "default",
"stream": True,
"messages": [{"role": "user", "content": message_content}],
}
if model_provider:
body["provider"] = model_provider
req = urllib.request.Request(
url,
data=json.dumps(body).encode("utf-8"),
headers=headers,
method="POST",
)
update_active_run(stream_id, phase="gateway-request")
last_payload = {}
with urllib.request.urlopen(req, timeout=600) as resp:
for raw_line in resp:
if cancel_event.is_set():
put_gateway_event("cancel", {"message": "Cancelled by user"})
return
line = raw_line.decode("utf-8", errors="replace").strip()
if not line or not line.startswith("data:"):
continue
data = line[5:].strip()
if data == "[DONE]":
break
try:
payload = json.loads(data)
except json.JSONDecodeError:
continue
last_payload = payload
delta = _gateway_sse_delta(payload)
if delta:
final_text += delta
if stream_id in STREAM_PARTIAL_TEXT:
STREAM_PARTIAL_TEXT[stream_id] += delta
put_gateway_event("token", {"text": delta})
usage.update({k: v for k, v in _gateway_stream_usage(payload).items() if v})
usage.update({k: v for k, v in _gateway_stream_usage(last_payload).items() if v})
assistant_text = final_text.strip()
if not assistant_text:
put_gateway_event("apperror", {
"label": "Gateway returned no response",
"type": "gateway_empty_response",
"message": "Gateway returned no assistant message for this turn.",
"hint": "Check that Hermes Gateway API server is running and reachable.",
})
return
with _get_session_agent_lock(session_id):
s = get_session(session_id)
if not _stream_writeback_is_current(s, stream_id):
return
now = int(time.time())
user_msg = {"role": "user", "content": str(msg_text or ""), "timestamp": now}
if attachments:
user_msg["attachments"] = list(attachments)
assistant_msg = {"role": "assistant", "content": assistant_text, "timestamp": now}
previous_context = list(getattr(s, "context_messages", None) or getattr(s, "messages", None) or [])
s.context_messages = previous_context + [user_msg, assistant_msg]
display = list(getattr(s, "messages", None) or [])
# Avoid duplicating the eager-save checkpointed user message.
if display:
latest = display[-1]
if isinstance(latest, dict) and latest.get("role") == "user":
latest_text = " ".join(str(latest.get("content") or "").split())
msg_norm = " ".join(str(msg_text or "").split())
if latest_text == msg_norm:
display = display[:-1]
s.messages = display + [user_msg, assistant_msg]
s.active_stream_id = None
s.pending_user_message = None
s.pending_attachments = None
s.pending_started_at = None
s.workspace = str(workspace)
s.model = model
s.model_provider = model_provider
s.save()
gateway_session_payload = s.compact() | {"messages": s.messages, "tool_calls": []}
put_gateway_event("done", {"session": redact_session_data(gateway_session_payload), "usage": usage})
put_gateway_event("stream_end", {"session_id": session_id})
except urllib.error.HTTPError as exc:
try:
err_body = exc.read(2048).decode("utf-8", errors="replace")
except Exception:
err_body = ""
safe = _redact_text(err_body or str(exc))[:500]
put_gateway_event("apperror", {
"label": "Gateway request failed",
"type": "gateway_http_error",
"message": f"Gateway returned HTTP {exc.code}.",
"hint": safe or "Check the configured Gateway API server.",
})
except Exception as exc:
safe = _redact_text(str(exc))[:500]
put_gateway_event("apperror", {
"label": "Gateway request failed",
"type": "gateway_error",
"message": safe or "Gateway request failed.",
"hint": "Check HERMES_WEBUI_GATEWAY_BASE_URL and Gateway API server health.",
})
finally:
if s is not None:
try:
with _get_session_agent_lock(session_id):
_clear_gateway_pending_state(get_session(session_id), stream_id)
except Exception:
logger.debug("Failed to clear gateway stream state", exc_info=True)
with STREAMS_LOCK:
CANCEL_FLAGS.pop(stream_id, None)
STREAM_PARTIAL_TEXT.pop(stream_id, None)
STREAM_REASONING_TEXT.pop(stream_id, None)
STREAM_LIVE_TOOL_CALLS.pop(stream_id, None)
STREAM_LAST_EVENT_ID.pop(stream_id, None)
STREAMS.pop(stream_id, None)
unregister_active_run(stream_id)