mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-19 13:47:04 +00:00
195 lines
7.5 KiB
Python
195 lines
7.5 KiB
Python
"""
|
|
Hermes Web UI -- Streaming performance metering.
|
|
|
|
Tracks Tokens Per Second (TPS) across active WebUI streams. Metering data is
|
|
emitted via SSE events so a streaming assistant message can update its own
|
|
header while the turn is running.
|
|
|
|
Architecture
|
|
────────────
|
|
Each streaming session is tracked independently. TPS per stream is:
|
|
|
|
stream_tps = total_stream_deltas / (last_delta_ts - first_delta_ts)
|
|
|
|
The global tps is the average of all currently active streams' TPS values.
|
|
This correctly represents the system's real-time capacity regardless of how
|
|
many sessions are running or how long each has been streaming.
|
|
|
|
For HIGH/LOW tracking, every stats snapshot records the current global tps
|
|
(only when > 0 — idle periods are skipped) into a rolling 60-minute history.
|
|
The max/min of that history gives the peak throughput observed over the past hour.
|
|
|
|
The ticker in streaming.py calls get_interval() — it returns 1.0 when streams
|
|
are actively receiving output deltas so message headers update at 1 Hz, and 10.0 when idle
|
|
so the ticker exits and no idle readings are emitted.
|
|
|
|
Usage from api/streaming.py
|
|
─────────────────────────────
|
|
from api.metering import meter
|
|
|
|
meter().begin_session(stream_id) # stream starts
|
|
meter().record_token(stream_id, running_output_deltas)
|
|
meter().record_reasoning(stream_id, running_reasoning_deltas)
|
|
|
|
The SSE `metering` event payload:
|
|
{
|
|
"tps": 47.3, # omitted/null until a real reading exists
|
|
"tps_available": true, # frontend must hide TPS when false
|
|
"estimated": false, # never show byte/character-size estimates
|
|
"high": 52.1,
|
|
"low": 31.4,
|
|
"active": 1,
|
|
}
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
|
|
_HOUR_SECS = 3600.0 # rolling window for HIGH/LOW tracking
|
|
_STALE_SECS = 60.0 # consider a session inactive after this
|
|
|
|
|
|
@dataclass
|
|
class _SessionMeter:
|
|
output_tokens: int = 0
|
|
reasoning_tokens: int = 0
|
|
first_token_ts: float = 0.0 # time.monotonic() of first token received
|
|
last_token_ts: float = 0.0 # time.monotonic() of last token received
|
|
|
|
def total_tokens(self) -> int:
|
|
return self.output_tokens + self.reasoning_tokens
|
|
|
|
def tps(self) -> float | None:
|
|
if self.first_token_ts == 0.0 or self.last_token_ts <= self.first_token_ts:
|
|
return None
|
|
return self.total_tokens() / (self.last_token_ts - self.first_token_ts)
|
|
|
|
|
|
class GlobalMeter:
|
|
"""Thread-safe global streaming meter.
|
|
|
|
Tracks per-session TPS, averages them for a global tps, and maintains a
|
|
60-minute rolling history of global tps snapshots for HIGH/LOW reporting.
|
|
"""
|
|
|
|
__slots__ = (
|
|
'_lock',
|
|
'_sessions', # stream_id -> _SessionMeter
|
|
'_readings', # [(monotonic_ts, tps), ...] rolling 60-minute history
|
|
'_window_start', # monotonic ts of current window
|
|
)
|
|
|
|
def __init__(self) -> None:
|
|
self._lock = threading.Lock()
|
|
self._sessions: dict[str, _SessionMeter] = {}
|
|
self._readings: list[tuple[float, float]] = []
|
|
self._window_start: float = time.monotonic()
|
|
|
|
# ── Public API ────────────────────────────────────────────────────────────
|
|
|
|
def begin_session(self, stream_id: str) -> None:
|
|
with self._lock:
|
|
self._sessions[stream_id] = _SessionMeter()
|
|
|
|
def get_interval(self) -> float:
|
|
"""Return 1.0 when sessions are actively receiving tokens, 10.0 when idle.
|
|
|
|
Used by the streaming ticker to run at 1 Hz during work and exit when
|
|
there is nothing to measure.
|
|
"""
|
|
now = time.monotonic()
|
|
with self._lock:
|
|
# Only count sessions that have received at least one token recently.
|
|
active_sids = {
|
|
sid for sid, s in self._sessions.items()
|
|
if s.first_token_ts > 0 and (now - s.last_token_ts) <= _STALE_SECS
|
|
}
|
|
return 1.0 if active_sids else 10.0
|
|
|
|
def record_token(self, stream_id: str, running_output_tokens: int) -> None:
|
|
now = time.monotonic()
|
|
with self._lock:
|
|
s = self._sessions.get(stream_id)
|
|
if s is None:
|
|
return
|
|
if s.first_token_ts == 0.0:
|
|
s.first_token_ts = now
|
|
s.last_token_ts = now
|
|
s.output_tokens = running_output_tokens
|
|
|
|
def record_reasoning(self, stream_id: str, running_reasoning_tokens: int) -> None:
|
|
now = time.monotonic()
|
|
with self._lock:
|
|
s = self._sessions.get(stream_id)
|
|
if s is None:
|
|
return
|
|
if s.first_token_ts == 0.0:
|
|
s.first_token_ts = now
|
|
s.last_token_ts = now
|
|
s.reasoning_tokens = running_reasoning_tokens
|
|
|
|
def end_session(self, stream_id: str, final_output_tokens: int, input_tokens: int = 0) -> None:
|
|
with self._lock:
|
|
self._sessions.pop(stream_id, None)
|
|
|
|
def get_stats(self) -> dict:
|
|
now = time.monotonic()
|
|
with self._lock:
|
|
# Prune stale sessions
|
|
stale = [
|
|
sid for sid, s in self._sessions.items()
|
|
if s.first_token_ts > 0 and (now - s.last_token_ts) > _STALE_SECS
|
|
]
|
|
for sid in stale:
|
|
self._sessions.pop(sid, None)
|
|
|
|
# Reset window if everything went stale
|
|
if not self._sessions:
|
|
self._window_start = now
|
|
|
|
# Compute global tps: average only streams with a real reading. The
|
|
# UI hides TPS entirely when this is unavailable instead of showing
|
|
# placeholder/estimated values.
|
|
active = [s for s in self._sessions.values() if s.first_token_ts > 0]
|
|
active_tps = [v for s in active for v in [s.tps()] if v is not None and v > 0]
|
|
if active_tps:
|
|
global_tps = sum(active_tps) / len(active_tps)
|
|
else:
|
|
global_tps = None
|
|
|
|
# Prune readings older than 1 hour
|
|
cutoff = now - _HOUR_SECS
|
|
self._readings = [(ts, v) for ts, v in self._readings if ts > cutoff]
|
|
|
|
# Only record this snapshot for HIGH/LOW if there is active work.
|
|
# This prevents idle periods from flooding the history and keeps
|
|
# HIGH/LOW meaningful for the past hour of actual throughput.
|
|
if global_tps is not None and global_tps > 0:
|
|
self._readings.append((now, global_tps))
|
|
|
|
# HIGH/LOW from the past hour (skip near-zero idle readings)
|
|
active_readings = [v for _, v in self._readings if v >= 1.0]
|
|
high = max(active_readings) if active_readings else 0.0
|
|
low = min(active_readings) if active_readings else 0.0
|
|
|
|
return {
|
|
'tps': round(global_tps, 1) if global_tps is not None else None,
|
|
'tps_available': global_tps is not None,
|
|
'estimated': False,
|
|
'high': round(high, 1) if high else None,
|
|
'low': round(low, 1) if low else None,
|
|
'active': len(self._sessions),
|
|
}
|
|
|
|
|
|
# ── Module-level singleton ─────────────────────────────────────────────────────
|
|
|
|
_meter = GlobalMeter()
|
|
|
|
|
|
def meter() -> GlobalMeter:
|
|
return _meter
|