Files
2026-05-04 21:26:43 +00:00

195 lines
7.5 KiB
Python

"""
Hermes Web UI -- Streaming performance metering.
Tracks Tokens Per Second (TPS) across active WebUI streams. Metering data is
emitted via SSE events so a streaming assistant message can update its own
header while the turn is running.
Architecture
────────────
Each streaming session is tracked independently. TPS per stream is:
stream_tps = total_stream_deltas / (last_delta_ts - first_delta_ts)
The global tps is the average of all currently active streams' TPS values.
This correctly represents the system's real-time capacity regardless of how
many sessions are running or how long each has been streaming.
For HIGH/LOW tracking, every stats snapshot records the current global tps
(only when > 0 — idle periods are skipped) into a rolling 60-minute history.
The max/min of that history gives the peak throughput observed over the past hour.
The ticker in streaming.py calls get_interval() — it returns 1.0 when streams
are actively receiving output deltas so message headers update at 1 Hz, and 10.0 when idle
so the ticker exits and no idle readings are emitted.
Usage from api/streaming.py
─────────────────────────────
from api.metering import meter
meter().begin_session(stream_id) # stream starts
meter().record_token(stream_id, running_output_deltas)
meter().record_reasoning(stream_id, running_reasoning_deltas)
The SSE `metering` event payload:
{
"tps": 47.3, # omitted/null until a real reading exists
"tps_available": true, # frontend must hide TPS when false
"estimated": false, # never show byte/character-size estimates
"high": 52.1,
"low": 31.4,
"active": 1,
}
"""
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
_HOUR_SECS = 3600.0 # rolling window for HIGH/LOW tracking
_STALE_SECS = 60.0 # consider a session inactive after this
@dataclass
class _SessionMeter:
output_tokens: int = 0
reasoning_tokens: int = 0
first_token_ts: float = 0.0 # time.monotonic() of first token received
last_token_ts: float = 0.0 # time.monotonic() of last token received
def total_tokens(self) -> int:
return self.output_tokens + self.reasoning_tokens
def tps(self) -> float | None:
if self.first_token_ts == 0.0 or self.last_token_ts <= self.first_token_ts:
return None
return self.total_tokens() / (self.last_token_ts - self.first_token_ts)
class GlobalMeter:
"""Thread-safe global streaming meter.
Tracks per-session TPS, averages them for a global tps, and maintains a
60-minute rolling history of global tps snapshots for HIGH/LOW reporting.
"""
__slots__ = (
'_lock',
'_sessions', # stream_id -> _SessionMeter
'_readings', # [(monotonic_ts, tps), ...] rolling 60-minute history
'_window_start', # monotonic ts of current window
)
def __init__(self) -> None:
self._lock = threading.Lock()
self._sessions: dict[str, _SessionMeter] = {}
self._readings: list[tuple[float, float]] = []
self._window_start: float = time.monotonic()
# ── Public API ────────────────────────────────────────────────────────────
def begin_session(self, stream_id: str) -> None:
with self._lock:
self._sessions[stream_id] = _SessionMeter()
def get_interval(self) -> float:
"""Return 1.0 when sessions are actively receiving tokens, 10.0 when idle.
Used by the streaming ticker to run at 1 Hz during work and exit when
there is nothing to measure.
"""
now = time.monotonic()
with self._lock:
# Only count sessions that have received at least one token recently.
active_sids = {
sid for sid, s in self._sessions.items()
if s.first_token_ts > 0 and (now - s.last_token_ts) <= _STALE_SECS
}
return 1.0 if active_sids else 10.0
def record_token(self, stream_id: str, running_output_tokens: int) -> None:
now = time.monotonic()
with self._lock:
s = self._sessions.get(stream_id)
if s is None:
return
if s.first_token_ts == 0.0:
s.first_token_ts = now
s.last_token_ts = now
s.output_tokens = running_output_tokens
def record_reasoning(self, stream_id: str, running_reasoning_tokens: int) -> None:
now = time.monotonic()
with self._lock:
s = self._sessions.get(stream_id)
if s is None:
return
if s.first_token_ts == 0.0:
s.first_token_ts = now
s.last_token_ts = now
s.reasoning_tokens = running_reasoning_tokens
def end_session(self, stream_id: str, final_output_tokens: int, input_tokens: int = 0) -> None:
with self._lock:
self._sessions.pop(stream_id, None)
def get_stats(self) -> dict:
now = time.monotonic()
with self._lock:
# Prune stale sessions
stale = [
sid for sid, s in self._sessions.items()
if s.first_token_ts > 0 and (now - s.last_token_ts) > _STALE_SECS
]
for sid in stale:
self._sessions.pop(sid, None)
# Reset window if everything went stale
if not self._sessions:
self._window_start = now
# Compute global tps: average only streams with a real reading. The
# UI hides TPS entirely when this is unavailable instead of showing
# placeholder/estimated values.
active = [s for s in self._sessions.values() if s.first_token_ts > 0]
active_tps = [v for s in active for v in [s.tps()] if v is not None and v > 0]
if active_tps:
global_tps = sum(active_tps) / len(active_tps)
else:
global_tps = None
# Prune readings older than 1 hour
cutoff = now - _HOUR_SECS
self._readings = [(ts, v) for ts, v in self._readings if ts > cutoff]
# Only record this snapshot for HIGH/LOW if there is active work.
# This prevents idle periods from flooding the history and keeps
# HIGH/LOW meaningful for the past hour of actual throughput.
if global_tps is not None and global_tps > 0:
self._readings.append((now, global_tps))
# HIGH/LOW from the past hour (skip near-zero idle readings)
active_readings = [v for _, v in self._readings if v >= 1.0]
high = max(active_readings) if active_readings else 0.0
low = min(active_readings) if active_readings else 0.0
return {
'tps': round(global_tps, 1) if global_tps is not None else None,
'tps_available': global_tps is not None,
'estimated': False,
'high': round(high, 1) if high else None,
'low': round(low, 1) if low else None,
'active': len(self._sessions),
}
# ── Module-level singleton ─────────────────────────────────────────────────────
_meter = GlobalMeter()
def meter() -> GlobalMeter:
return _meter