hermes-webui/api/gateway_watcher.py
nesquena-hermes ad8e10304c v0.50.207: batch of 10 PRs — TPS stat, SSE guard, session polish, cron UX, folder create, model errors, session speed, title gen (#1031)
* fix: remove orphaned i18n keys from top-level LOCALES object

Three Traditional Chinese translation keys (cmd_status, memory_saved,
profile_delete_title) were placed outside any locale block between the
en and ru blocks in static/i18n.js. They became top-level properties
of the LOCALES object, causing them to appear as invalid language
options in the Settings > Preferences dropdown.

The correct translations already exist in the zh-Hant locale block.

Fixes #1008

* fix: block stale SSE events from polluting new session's DOM

- appendThinking(): guard with !S.session||!S.activeStreamId to drop
  events from a previous session's SSE stream during a session switch
- appendLiveToolCard(): same guard for consistency
- finalizeThinkingCard(): scroll thinking-card-body to top when
  scroll is pinned, so completed response is immediately visible
- appendThinking(): auto-scroll thinking card body to bottom while
  streaming if user is watching (scroll pinned)
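The guard above is a predicate checked before any DOM write. Below is a minimal Python sketch of that predicate. `UiState` and `should_render()` are hypothetical names that mirror the frontend's `S` object. The stream-id comparison at the end is an assumed extra check that goes beyond the `!S.session||!S.activeStreamId` test described here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class UiState:
    """Hypothetical mirror of the frontend's `S` object."""
    session: Optional[str] = None           # id of the session currently shown
    active_stream_id: Optional[str] = None  # id of the SSE stream feeding it


def should_render(state: UiState, event_stream_id: str) -> bool:
    """Return True only if an incoming event belongs to the live stream."""
    # Same condition as the appendThinking() guard: with no session or no
    # active stream we are mid-switch, so the event is stale and dropped.
    if not state.session or not state.active_stream_id:
        return False
    # Assumed extra check: events tagged with another stream's id are dropped.
    return event_stream_id == state.active_stream_id
```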

* Fix empty agent sessions in sidebar

* fix: resolve cron UI UX issues — icon ambiguity, toast overlap, running status

Fixes #995 — three sub-issues in the Cron Jobs UI:

1. Dual play icons ambiguous: Resume button now shows a distinct
   play+bar icon (play triangle + vertical line) instead of the
   identical triangle used by Run now.

2. Toast notification overlapping header buttons: Added
   position:relative; z-index:10 to .main-view-header so it
   stacks above the fixed toast (z-index:100 within its layer).

3. No running status after trigger: After triggering a job, the
   status badge immediately shows 'running…' with a CSS spinner
   animation, and polls the cron list every 3s (up to 30s) to
   refresh when the job completes.

- Added cron_status_running i18n key in all 6 locales (en, es, de, ru, zh, zh-Hant)
- Added .detail-badge.running CSS class with spinner animation
- New functions: _setCronDetailStatus(), _startCronRunningPoll()
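The bounded "running" poll (every 3s, for at most 30s) fits in a small loop. This Python sketch is illustrative only, because the real logic lives in the frontend's `_startCronRunningPoll()`, which is not shown. `poll_until_done()` is a hypothetical helper. It measures elapsed time by counting intervals instead of reading a wall clock, so a test can pass in a fake `sleep`.

```python
import time
from typing import Callable


def poll_until_done(
    get_status: Callable[[], str],
    interval: float = 3.0,
    timeout: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() every `interval` seconds until it stops reporting
    'running' or `timeout` seconds have elapsed; return the last status seen."""
    elapsed = 0.0
    status = get_status()
    while status == "running" and elapsed < timeout:
        sleep(interval)
        elapsed += interval
        status = get_status()
    # When the poll gives up, the caller re-renders with whatever status it has.
    return status
```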

* fix(#1011): address review feedback — poll cleanup, badge persistence, 30s fallback

- _clearCronDetail() now clears _cronRunningPoll interval on navigation
- Poll re-applies 'running' badge after loadCrons() re-render (prevents flicker)
- When poll ends (30s max), detail re-renders with actual status as fallback

* feat: create folder and add space directly from UI (#782)

- After creating a folder via the file tree New folder button, offer to add it as a space via a confirm dialog
- Add a "Create folder if it doesn't exist" checkbox to the New Space form
- Backend: support create flag in /api/workspaces/add to mkdir before validation
- i18n: 4 new keys (folder_add_as_space_title/msg/btn, workspace_auto_create_folder) in all 6 locales

* fix: validate workspace path before mkdir to prevent orphan directories

Review feedback (critical): the previous code called mkdir() before
validate_workspace_to_add(), which meant a rejected path (e.g. system dir)
would leave an orphan directory on disk.

New flow:
1. Resolve path and check against blocked system roots BEFORE any mutation
2. mkdir() only if path passes the blocklist check
3. Full validation (exists, is_dir) after mkdir

Also imports _workspace_blocked_roots for the pre-mutation blocklist check.
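The validate-then-mkdir order can be sketched as follows. This is a minimal illustration and not the webui's code. `add_workspace()` and `BLOCKED_ROOTS` are hypothetical stand-ins for the `/api/workspaces/add` handler and `_workspace_blocked_roots()`.

```python
from pathlib import Path
from typing import Iterable

# Hypothetical blocklist; the real _workspace_blocked_roots() is not shown here.
BLOCKED_ROOTS = [Path("/etc"), Path("/usr"), Path("/bin")]


def add_workspace(raw_path: str, create: bool = False,
                  blocked_roots: Iterable[Path] = BLOCKED_ROOTS) -> Path:
    """Blocklist check, then optional mkdir, then full validation."""
    path = Path(raw_path).expanduser().resolve()
    # 1. Reject blocked system roots BEFORE touching the filesystem,
    #    so a rejected path never leaves an orphan directory behind.
    for root in blocked_roots:
        blocked = root.resolve()
        if path == blocked or blocked in path.parents:
            raise ValueError(f"blocked path: {path}")
    # 2. Only now is it safe to create the directory.
    if create:
        path.mkdir(parents=True, exist_ok=True)
    # 3. Full validation after the optional mkdir.
    if not path.exists():
        raise ValueError(f"does not exist: {path}")
    if not path.is_dir():
        raise ValueError(f"not a directory: {path}")
    return path
```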

* fix(#1014): classify model-not-found errors with helpful message

- Add model_not_found error type to streaming.py exception classifier
- Detect 404, 'not found', 'does not exist', 'invalid model' patterns
- Strip HTML tags from provider error messages (nginx 404 pages, etc.)
- Add model_not_found branch to apperror handler in messages.js
- Add i18n key model_not_found_label in all 6 locales
- 15 tests covering detection, sanitization, frontend, and i18n
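Here is a rough sketch of the classification and sanitization steps. It assumes plain substring matching on the patterns listed above. `classify_error()` and `sanitize_error()` are hypothetical names, not the functions in streaming.py. The matching is deliberately naive: a bare `404` can also match unrelated text, so a sturdier classifier would likely prefer the HTTP status code when one is available.

```python
import re

_TAG_RE = re.compile(r"<[^>]+>")
# Assumed patterns, taken from the commit description.
_NOT_FOUND_PATTERNS = ("404", "not found", "does not exist", "invalid model")


def sanitize_error(message: str) -> str:
    """Strip HTML tags (e.g. an nginx 404 page) and collapse whitespace."""
    text = _TAG_RE.sub(" ", message)
    return " ".join(text.split())


def classify_error(message: str) -> str:
    """Return 'model_not_found' for model lookup failures, else 'unknown'."""
    lowered = sanitize_error(message).lower()
    if any(p in lowered for p in _NOT_FOUND_PATTERNS):
        return "model_not_found"
    return "unknown"
```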

* feat(ui): add live TPS stat to header

Adds a TPS (Tokens Per Second) chip to the right of the header title bar
that updates live while AI output is streaming.

Metering (api/metering.py)
- Tracks per-session output + reasoning tokens via GlobalMeter singleton
- Per-session TPS = total_tokens / elapsed_time
- Global TPS = average of active sessions' TPS values
- HIGH/LOW are max/min of global_tps snapshots over a 60-minute rolling
  window (only recorded when > 0, so idle periods are excluded)
- Thread-safe with a single lock
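The metering rules above can be sketched as a small thread-safe class. This illustration rests on several assumptions and is not api/metering.py. `TpsMeter` is a hypothetical name, a session's clock starts at its first `add_tokens()` call, and the injectable `clock` exists only to make the example testable.

```python
import threading
import time
from collections import deque
from typing import Callable, Dict, Tuple


class TpsMeter:
    """Simplified sketch of the metering rules above (not api/metering.py)."""

    def __init__(self, window_s: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self._lock = threading.Lock()  # single lock guards all state
        self._clock = clock
        self._window_s = window_s
        self._sessions: Dict[str, Tuple[float, int]] = {}  # sid -> (start, tokens)
        self._history: deque = deque()  # (timestamp, global_tps) samples

    def add_tokens(self, sid: str, n: int) -> None:
        with self._lock:
            start, tokens = self._sessions.get(sid, (self._clock(), 0))
            self._sessions[sid] = (start, tokens + n)

    def end_session(self, sid: str) -> None:
        with self._lock:
            self._sessions.pop(sid, None)

    def snapshot(self) -> Dict[str, float]:
        with self._lock:
            now = self._clock()
            # Per-session TPS = total tokens / elapsed time.
            rates = [tokens / (now - start)
                     for start, tokens in self._sessions.values() if now > start]
            # Global TPS = average of the active sessions' TPS values.
            global_tps = sum(rates) / len(rates) if rates else 0.0
            if global_tps > 0:  # idle samples are never recorded
                self._history.append((now, global_tps))
            while self._history and now - self._history[0][0] > self._window_s:
                self._history.popleft()  # drop samples older than the window
            values = [v for _, v in self._history]
            return {"tps": global_tps,
                    "high": max(values, default=0.0),
                    "low": min(values, default=0.0)}
```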

Metering events emitted from streaming.py
- Throttled at 100ms from token/reasoning/tool callbacks so the display
  updates rapidly during fast token streams
- 1Hz ticker as fallback for slow streams (exits when no active sessions)
- Final stats emitted on stream end
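The 100ms throttle on callback-driven events might look like the following sketch. `ThrottledEmitter` is hypothetical. It assumes callbacks arrive on a single thread, so it takes no lock, and it leaves out the 1Hz ticker fallback.

```python
import time
from typing import Callable


class ThrottledEmitter:
    """Forward at most one event per `min_interval` seconds; flush() always emits."""

    def __init__(self, emit: Callable[[dict], None], min_interval: float = 0.1,
                 clock: Callable[[], float] = time.monotonic):
        self._emit = emit
        self._min_interval = min_interval
        self._clock = clock
        self._last = float("-inf")  # so the very first event goes out immediately

    def maybe_emit(self, stats: dict) -> bool:
        """Called from token/reasoning/tool callbacks; drops events inside the window."""
        now = self._clock()
        if now - self._last < self._min_interval:
            return False
        self._last = now
        self._emit(stats)
        return True

    def flush(self, stats: dict) -> None:
        """Final stats on stream end bypass the throttle."""
        self._last = self._clock()
        self._emit(stats)
```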

Routes (api/routes.py)
- Removed POST /api/metering/interval endpoint (dynamic interval via
  focus/blur was replaced with simple always-1s-when-active approach)

UI (static/messages.js, index.html, style.css)
- TPS chip in titlebar: shows 'N.N t/s . N.N high . N.N low'
- Default: '0.0 t/s . 0.0 high' when idle
- Display updates on every metering SSE event (throttled to 100ms)

* feat: session restore speed + title gen reasoning hardening (#1025, #1026)

PR #1025 (@franksong2702): Speed up large session restore paths
- GET /api/session?messages=0 now parses only metadata before the messages array
- Metadata-only loads no longer populate the full-session LRU cache
- Frontend lazy fetch uses resolve_model=0 to avoid cold model-catalog lookup
- Hard reload no longer waits for populateModelDropdown() before restoring session

PR #1026 (@franksong2702): Harden auto title generation for reasoning models
- Raises title-gen completion budget to 512 tokens (reasoning-safe)
- Retries once with 1024 tokens on empty content / finish_reason:length
- Applies retry to both auxiliary and active-agent fallback routes
- Preserves underlying failure reason in title_status on local fallback
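The budget-then-retry behavior can be sketched as a loop over token budgets. The `complete(max_tokens) -> (content, finish_reason)` callable is a hypothetical stand-in for the auxiliary or active-agent route. Retrying on either empty content or `finish_reason == "length"` is one reading of the description above.

```python
from typing import Callable, Optional, Tuple

# Hypothetical signature: complete(max_tokens) -> (content, finish_reason).
Completion = Callable[[int], Tuple[Optional[str], str]]


def generate_title(complete: Completion,
                   budgets: Tuple[int, ...] = (512, 1024)) -> Tuple[Optional[str], str]:
    """Try each token budget in turn; retry only on empty or truncated output.

    Returns (title, status); on failure, status keeps the last failure reason
    so a local fallback can record it.
    """
    status = "not_attempted"
    for budget in budgets:
        content, finish_reason = complete(budget)
        if content and content.strip() and finish_reason != "length":
            return content.strip(), "ok"
        status = "truncated" if finish_reason == "length" else "empty"
    return None, status
```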

Co-authored-by: Frank Song <franksong2702@gmail.com>

* feat: session attention indicators in right slot + last_message_at timestamps (#1024)

PR #1024 (@franksong2702): Polish session attention indicators

- Streaming spinners and unread dots now reuse the right-side actions slot
- Running/unread rows hide timestamps; idle/read rows keep right-aligned timestamps
- Date group carets point down when expanded, right when collapsed
- Pinned group no longer repeats pinned-star icon per row
- Running indicators appear immediately after send (local busy state while /api/sessions catches up)
- Sidebar sorting/grouping/timestamps now prefer last_message_at (derived from last real message)
  so metadata-only saves don't make old sessions appear under Today
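Preferring `last_message_at` comes down to the choice of sort key. Here is a minimal sketch. It assumes sessions are dicts with epoch-second `last_message_at`/`updated_at` fields, and it groups "Today" in UTC, whereas the sidebar presumably uses local time.

```python
from datetime import datetime, timezone
from typing import Dict, List


def activity_ts(session: Dict) -> float:
    """Ordering timestamp: last real message first, metadata updates as fallback."""
    return session.get("last_message_at") or session.get("updated_at") or 0.0


def sort_sessions(sessions: List[Dict]) -> List[Dict]:
    """Most recently active first."""
    return sorted(sessions, key=activity_ts, reverse=True)


def is_today(session: Dict, now: datetime) -> bool:
    """Group under 'Today' only if the last real message was today (UTC here)."""
    ts = datetime.fromtimestamp(activity_ts(session), tz=timezone.utc)
    return ts.date() == now.date()
```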

Co-authored-by: Frank Song <franksong2702@gmail.com>

* docs: v0.50.207 release notes — 10 PRs, 2169 tests (+36)

---------

Co-authored-by: bergeouss <bergeouss@users.noreply.github.com>
Co-authored-by: Josh <josh@fyul.link>
Co-authored-by: Frank Song <franksong2702@gmail.com>
Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com>
2026-04-25 13:07:35 -07:00


"""
Hermes Web UI -- Gateway session watcher.
Background daemon thread that polls state.db every 5 seconds for changes
to gateway sessions (telegram, discord, slack, etc.). When changes are
detected, it pushes notifications to all subscribed SSE clients.
This enables real-time session list updates in the sidebar without
requiring any changes to hermes-agent.
"""
import hashlib
import json
import logging
import os
import queue
import threading
import time
from pathlib import Path
from api.config import HOME
from api.agent_sessions import read_importable_agent_session_rows
logger = logging.getLogger(__name__)
# ── State hash tracking ─────────────────────────────────────────────────────
def _snapshot_hash(sessions: list) -> str:
    """Create a lightweight hash of session IDs, update timestamps, and message counts for change detection."""
    key = '|'.join(
        f"{s['session_id']}:{s.get('updated_at', 0)}:{s.get('message_count', 0)}"
        for s in sorted(sessions, key=lambda x: x['session_id'])
    )
    return hashlib.md5(key.encode(), usedforsecurity=False).hexdigest()
# ── DB resolution (shared pattern with state_sync.py) ──────────────────────
def _get_state_db_path() -> Path:
    """Resolve state.db path for the active profile."""
    try:
        from api.profiles import get_active_hermes_home
        hermes_home = Path(get_active_hermes_home()).expanduser().resolve()
    except Exception:
        hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve()
    return hermes_home / 'state.db'
def _get_agent_sessions_from_db() -> list:
    """Read up to 200 non-webui sessions from state.db.

    Returns a list of session dicts, or an empty list on any error.
    """
    db_path = _get_state_db_path()
    if not db_path.exists():
        return []
    try:
        sessions = []
        for row in read_importable_agent_session_rows(db_path, limit=200, log=logger):
            sessions.append({
                'session_id': row['id'],
                'title': row['title'] or 'Agent Session',
                'model': row['model'] or None,
                'message_count': row['message_count'] or row['actual_message_count'] or 0,
                'created_at': row['started_at'],
                'updated_at': row['last_activity'] or row['started_at'],
                'source': row['source'] or 'cli',
            })
        return sessions
    except Exception:
        return []
# ── GatewayWatcher ──────────────────────────────────────────────────────────
class GatewayWatcher:
    """Background thread that polls state.db for agent session changes.

    Usage:
        watcher = GatewayWatcher()
        watcher.start()
        q = watcher.subscribe()
        # ... receive change events via q.get() ...
        watcher.unsubscribe(q)
        watcher.stop()
    """

    POLL_INTERVAL = 5  # seconds between polls
    SUBSCRIBER_TIMEOUT = 30  # seconds before sending keepalive comment

    def __init__(self):
        self._subscribers: list[queue.Queue] = []
        self._sub_lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._last_hash: str = ''
        self._last_sessions: list = []
    def start(self):
        """Start the watcher daemon thread."""
        if self._thread and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._poll_loop, daemon=True, name='gateway-watcher')
        self._thread.start()

    def is_alive(self) -> bool:
        """Return True when the poll thread is running.

        Public accessor used by ``/api/sessions/gateway/stream`` probe mode and
        the live SSE handler to detect a watcher instance whose poll thread
        died silently (e.g. uncaught exception in ``_poll_loop``). Callers
        use this to decide whether to return 503 and trigger the client-side
        polling fallback, instead of handing out an SSE connection that would
        never emit events.
        """
        t = self._thread
        return t is not None and t.is_alive()
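    def stop(self):
        """Stop the watcher thread."""
        self._stop_event.set()
        # Wake up any subscribers. Use put_nowait(): a blocking put() on a
        # full queue would hang here while holding _sub_lock.
        with self._sub_lock:
            for q in self._subscribers:
                try:
                    q.put_nowait(None)  # sentinel
                except queue.Full:
                    try:
                        q.get_nowait()  # drop the oldest event to make room
                    except queue.Empty:
                        pass
                    try:
                        q.put_nowait(None)
                    except queue.Full:
                        logger.debug("Failed to send sentinel to subscriber")
        if self._thread:
            self._thread.join(timeout=3)
            self._thread = None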
    def subscribe(self) -> queue.Queue:
        """Subscribe to change events. Returns a queue.Queue.

        Events are dicts: {'type': 'sessions_changed', 'sessions': [...]}
        A None sentinel means the watcher is stopping or this subscriber
        was dropped for falling behind.
        """
        q = queue.Queue(maxsize=10)
        with self._sub_lock:
            self._subscribers.append(q)
        return q

    def unsubscribe(self, q: queue.Queue):
        """Remove a subscriber queue."""
        with self._sub_lock:
            try:
                self._subscribers.remove(q)
            except ValueError:
                pass
    def _notify_subscribers(self, sessions: list):
        """Push a change event to all subscribers, evicting any that cannot keep up."""
        event = {
            'type': 'sessions_changed',
            'sessions': sessions,
        }
        with self._sub_lock:
            dead = []
            for q in self._subscribers:
                try:
                    q.put_nowait(event)
                except queue.Full:
                    dead.append(q)  # remove slow consumers
                except Exception:
                    dead.append(q)
            for q in dead:
                try:
                    self._subscribers.remove(q)
                except ValueError:
                    pass
                # Send a None sentinel so the SSE handler unblocks, closes,
                # and lets the browser's EventSource auto-reconnect. An evicted
                # queue is usually full, so discard one pending event first
                # (the client reconnects anyway); otherwise put_nowait() raises
                # queue.Full and the sentinel never arrives.
                try:
                    q.get_nowait()
                except queue.Empty:
                    pass
                try:
                    q.put_nowait(None)
                except Exception:
                    logger.debug("Failed to send sentinel to dead subscriber")
    def _poll_loop(self):
        """Main polling loop. Runs in a daemon thread."""
        while not self._stop_event.is_set():
            try:
                sessions = _get_agent_sessions_from_db()
                current_hash = _snapshot_hash(sessions)
                if current_hash != self._last_hash:
                    self._last_hash = current_hash
                    self._last_sessions = sessions
                    self._notify_subscribers(sessions)
            except Exception:
                logger.debug("Error in gateway watcher poll loop", exc_info=True)
            # Sleep in small increments so we can stop promptly
            for _ in range(self.POLL_INTERVAL * 10):
                if self._stop_event.is_set():
                    return
                time.sleep(0.1)
# ── Module-level singleton ─────────────────────────────────────────────────
_watcher: GatewayWatcher | None = None
_watcher_lock = threading.Lock()


def start_watcher():
    """Start the global gateway watcher (idempotent)."""
    global _watcher
    with _watcher_lock:
        if _watcher is None:
            _watcher = GatewayWatcher()
        _watcher.start()


def stop_watcher():
    """Stop the global gateway watcher."""
    global _watcher
    with _watcher_lock:
        if _watcher is not None:
            _watcher.stop()
            _watcher = None


def get_watcher() -> GatewayWatcher | None:
    """Get the global watcher instance (or None if not started)."""
    with _watcher_lock:
        return _watcher
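An SSE handler needs nothing beyond the `subscribe()` contract: event dicts, with a `None` sentinel meaning stop. The sketch below turns a subscriber queue into SSE frames and sends a keepalive comment after every `SUBSCRIBER_TIMEOUT` seconds of silence. It is not the actual `/api/sessions/gateway/stream` handler, which is not part of this file. `sse_frames()` is a hypothetical name, and the frame layout (an `event:` name plus JSON `data:`) is an assumption.

```python
import json
import queue
from typing import Iterator


def sse_frames(q: queue.Queue, keepalive_s: float = 30.0) -> Iterator[str]:
    """Turn watcher events into SSE frames until the None sentinel arrives."""
    while True:
        try:
            event = q.get(timeout=keepalive_s)
        except queue.Empty:
            # SSE comment line; keeps proxies from closing an idle connection.
            yield ": keepalive\n\n"
            continue
        if event is None:  # watcher stopping or subscriber evicted
            return
        yield f"event: {event['type']}\ndata: {json.dumps(event['sessions'])}\n\n"
```

In a real handler the generator would sit inside `try: ... finally: watcher.unsubscribe(q)`, so a client disconnect also releases the queue.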