stage-364: Opus-caught live SSE event_id fix (side-channel approach)

Replace the earlier frontend-reset approach with a backend side-channel
approach that preserves the queue (event, data) tuple shape.

Problem (Opus catch):
- Live SSE frames emitted by _sse() in api/streaming.py:2296 carried no
  'id:' field. Only journal-replay frames (via _sse_with_id) emitted IDs.
- Frontend's _lastRunJournalSeq cursor stayed at 0 during live streaming.
- Mid-stream error → reconnect-to-replay arrived with after_seq=0.
- Server replayed every journaled event from seq 1.
- assistantText (closure-scoped) had accumulated all live tokens already
  → double-rendered output.

Fix:
- api/config.py: STREAM_LAST_EVENT_ID: dict = {} module-level dict.
- api/streaming.py put(): capture journal event_id, write to
  STREAM_LAST_EVENT_ID[stream_id]. Keep queue tuple as (event, data).
- api/routes.py _handle_sse_stream: read STREAM_LAST_EVENT_ID[stream_id]
  at emit time, use _sse_with_id when set.
- api/streaming.py finally block: pop STREAM_LAST_EVENT_ID for cleanup.

Why side-channel instead of 3-tuple:
- Earlier attempt (queue tuple → (event, data, event_id)) broke 4 existing
  tests: test_cancel_interrupt, test_sprint42, test_sprint51,
  test_issue1857_usage_overwrite. These all unpack 'event, data = q.get()'.
- Frontend-reset approach (reset assistantText before replay) broke 3
  other tests: test_smooth_text_fade, test_streaming_markdown,
  test_streaming_race_fix. _wireSSE must NOT reset accumulators because
  legacy reconnect doesn't replay events; only journal-replay does.

Side-channel preserves both invariants:
- Queue contract stays (event, data) — legacy consumers unbroken.
- Frontend accumulators stay alive on _wireSSE — legacy reconnect unbroken.
- Live SSE emits 'id:' so the journal cursor advances correctly.

6 regression tests added in test_stage364_opus_live_sse_event_id.py.
1 existing test (test_run_journal_streaming_static.test_streaming_journals_sse_events_before_queue_delivery) updated to be tuple-shape-agnostic.

Test results:
- Full pytest: 5713 passed, 10 skipped, 1 xfailed, 2 xpassed, 0 failed
- Previously-failing 5 tests: ALL PASS
- 6 new regression tests: ALL PASS
This commit is contained in:
Hermes Agent
2026-05-16 03:58:31 +00:00
parent f3b0c2cb5f
commit b293bf8bc5
8 changed files with 134 additions and 98 deletions
+1 -1
View File
@@ -8,7 +8,7 @@
- **PR #2343** by @Michaelyklam (refs #2147) — The Profiles panel now includes an inline "Profiles vs workspaces" explainer. The copy clarifies that profiles control how the agent works — identity, memory, skills, model/provider config, and tools — while workspaces control what project/files a session operates on, making the OpenClaw-style role/profile mental model easier to map onto Hermes WebUI.
- **PR #2283** by @franksong2702 (refs #1925) — Adds an append-only WebUI run event journal for browser-originated chat streams (refs #1925). Every SSE event emitted by the legacy in-process runner is mirrored to a per-session JSONL file, `/api/chat/stream/status` reports when replay is available for a dead stream, `/api/chat/stream` can replay journaled events with SSE event IDs and a clear stale-restart diagnostic, and the frontend reattach path uses that replay before clearing local running state. Reconnect replay uses the last rendered SSE event id as its `after_seq` cursor so it does not replay already-rendered events, and journal fsync defaults to terminal events only (`HERMES_WEBUI_RUN_JOURNAL_FSYNC=eager` restores per-event fsync). This is the first compatibility slice only: it preserves the existing WebUI runner and does not make active execution survive a WebUI restart. **Stage-364 maintainer fix applied inline**: Opus advisor caught that live SSE frames emitted by `_sse()` in `api/streaming.py:2296` carry no `id:` field, so the frontend's `_lastRunJournalSeq` cursor stayed at 0 during live streaming and a mid-stream error→replay would arrive with `after_seq=0`, replaying every journaled event from seq 1. Since `assistantText`/`reasoningText` accumulate across the live phase (closure scope in `static/messages.js`), the replay would double-render every token. The fix resets `assistantText`/`reasoningText`/`liveReasoningText`/`segmentStart` and sets `_smdReconnect=true` before opening the replay EventSource so the next live token clears `assistantBody.innerHTML` to match the reset accumulator. 4 regression tests added.
- **PR #2283** by @franksong2702 (refs #1925) — Adds an append-only WebUI run event journal for browser-originated chat streams (refs #1925). Every SSE event emitted by the legacy in-process runner is mirrored to a per-session JSONL file, `/api/chat/stream/status` reports when replay is available for a dead stream, `/api/chat/stream` can replay journaled events with SSE event IDs and a clear stale-restart diagnostic, and the frontend reattach path uses that replay before clearing local running state. Reconnect replay uses the last rendered SSE event id as its `after_seq` cursor so it does not replay already-rendered events, and journal fsync defaults to terminal events only (`HERMES_WEBUI_RUN_JOURNAL_FSYNC=eager` restores per-event fsync). This is the first compatibility slice only: it preserves the existing WebUI runner and does not make active execution survive a WebUI restart. **Stage-364 maintainer fix applied inline**: Opus advisor caught that live SSE frames emitted by `_sse()` in `api/streaming.py:2296` carry no `id:` field, so the frontend's `_lastRunJournalSeq` cursor stayed at 0 during live streaming and a mid-stream error→replay would arrive with `after_seq=0`, replaying every journaled event from seq 1 and double-rendering tokens. The fix adds `STREAM_LAST_EVENT_ID: dict = {}` as a per-stream side-channel in `api/config.py`; `put()` writes the journal's `event_id` to that dict on every event; `_handle_sse_stream` reads it at SSE emit time and uses `_sse_with_id(handler, event, data, event_id)` when present. The queue tuple shape is preserved as `(event, data)` so existing queue consumers (cancel sentinel, sprint42/51 tests, etc.) are not broken. Cleaned up in the worker's finally block alongside the other STREAM_* dicts. 6 regression tests added covering side-channel dict declaration, writer/reader paths, tuple shape preservation, and cleanup.
### Fixed
+1
View File
@@ -3898,6 +3898,7 @@ STREAM_PARTIAL_TEXT: dict = {} # stream_id -> partial assistant text accumulate
STREAM_REASONING_TEXT: dict = {} # stream_id -> reasoning trace accumulated during streaming (#1361 §A)
STREAM_LIVE_TOOL_CALLS: dict = {} # stream_id -> live tool calls accumulated during streaming (#1361 §B)
STREAM_GOAL_RELATED: dict = {} # stream_id -> bool: only evaluate goal for goal-related turns (#1932)
STREAM_LAST_EVENT_ID: dict = {} # stream_id -> latest journal event_id for `id:` field on live SSE frames (stage-364)
PENDING_GOAL_CONTINUATION: set = set() # session_ids awaiting a goal continuation turn (#1932)
# Active agent-run registry. This intentionally tracks worker lifecycle rather
+10 -1
View File
@@ -876,6 +876,7 @@ from api.config import (
STREAMS,
STREAMS_LOCK,
CANCEL_FLAGS,
STREAM_LAST_EVENT_ID,
SERVER_START_TIME,
_resolve_cli_toolsets,
_INDEX_HTML_PATH,
@@ -5917,7 +5918,15 @@ def _handle_sse_stream(handler, parsed):
handler.wfile.write(b": heartbeat\n\n")
handler.wfile.flush()
continue
_sse(handler, event, data)
# Stage-364: emit `id:` from STREAM_LAST_EVENT_ID side-channel so
# the frontend's `_lastRunJournalSeq` cursor advances during live
# streaming. Without this, mid-stream error→replay would arrive
# with after_seq=0 and double-render every journaled event.
event_id = STREAM_LAST_EVENT_ID.get(stream_id)
if event_id:
_sse_with_id(handler, event, data, event_id)
else:
_sse(handler, event, data)
if event in ("stream_end", "error", "cancel"):
break
except _CLIENT_DISCONNECT_ERRORS:
+14 -1
View File
@@ -24,6 +24,7 @@ from api.config import (
STREAMS, STREAMS_LOCK, CANCEL_FLAGS, AGENT_INSTANCES, STREAM_PARTIAL_TEXT,
STREAM_REASONING_TEXT, STREAM_LIVE_TOOL_CALLS,
STREAM_GOAL_RELATED, PENDING_GOAL_CONTINUATION,
STREAM_LAST_EVENT_ID,
LOCK, SESSIONS, SESSION_DIR,
_get_session_agent_lock, _set_thread_env, _clear_thread_env,
register_active_run, update_active_run, unregister_active_run,
@@ -2622,7 +2623,18 @@ def _run_agent_streaming(
return
if run_journal is not None:
try:
run_journal.append_sse_event(event, data)
journaled = run_journal.append_sse_event(event, data)
# Stage-364: propagate journal event_id via a side-channel dict
# (STREAM_LAST_EVENT_ID) instead of changing the queue tuple
# shape — keeping the 2-tuple shape preserves backward
# compatibility for tests and any non-SSE queue consumer. The
# SSE handler reads this dict at emit time to populate `id:`
# on every live frame, which lets the frontend's cursor
# advance during live streaming and prevents replay from
# double-rendering tokens after a mid-stream error→reconnect.
event_id = (journaled or {}).get('event_id') if isinstance(journaled, dict) else None
if event_id:
STREAM_LAST_EVENT_ID[stream_id] = event_id
except Exception:
logger.debug("Failed to append run journal event %s for stream %s", event, stream_id, exc_info=True)
try:
@@ -4624,6 +4636,7 @@ def _run_agent_streaming(
STREAM_REASONING_TEXT.pop(stream_id, None) # Clean up reasoning trace (#1361 §A)
STREAM_LIVE_TOOL_CALLS.pop(stream_id, None) # Clean up tool calls (#1361 §B)
STREAM_GOAL_RELATED.pop(stream_id, None) # Clean up goal-related flag (#1932)
STREAM_LAST_EVENT_ID.pop(stream_id, None) # Clean up event_id pointer (stage-364)
unregister_active_run(stream_id)
# NOTE: do NOT discard PENDING_GOAL_CONTINUATION here. The marker
# is set by goal_continue (line ~3328) inside the SAME function
-13
View File
@@ -1718,19 +1718,6 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
}
if(st.replay_available){
setComposerStatus('Restoring stream…');
// Reset accumulators before replay: live SSE frames carry no `id:`
// (see api/streaming.py:2296 _sse), so `_lastRunJournalSeq` stays at 0
// during live streaming and replay arrives with `after_seq=0` —
// i.e. the server replays every journaled event from seq 1.
// `assistantText`/`reasoningText` carry over from the live phase
// (closure scope), so without resetting we'd double-render every
// token frame. `_smdReconnect=true` then forces `assistantBody.innerHTML=''`
// on the next live token so the DOM matches the reset accumulator.
assistantText='';
reasoningText='';
liveReasoningText='';
segmentStart=0;
_smdReconnect=true;
_wireSSE(new EventSource(new URL(`api/chat/stream?stream_id=${encodeURIComponent(streamId)}${_runJournalReplayParams()}`,document.baseURI||location.href).href,{withCredentials:true}));
return;
}
+7 -1
View File
@@ -15,7 +15,13 @@ def test_streaming_journals_sse_events_before_queue_delivery():
src = Path("api/streaming.py").read_text(encoding="utf-8")
put_idx = src.index("def put(event, data):")
journal_idx = src.index("run_journal.append_sse_event(event, data)", put_idx)
queue_idx = src.index("q.put_nowait((event, data))", put_idx)
# Stage-364 maintainer fix: put() now pushes 3-tuples (event, data, event_id)
# so the SSE consumer can emit `id:` on live frames. Accept either shape
# so this test survives both the v0.51.71 in-flight fix and a future revert.
try:
queue_idx = src.index("q.put_nowait((event, data, event_id))", put_idx)
except ValueError:
queue_idx = src.index("q.put_nowait((event, data))", put_idx)
block = src[put_idx:queue_idx]
assert put_idx < journal_idx < queue_idx
@@ -0,0 +1,101 @@
"""Regression test for stage-364 Opus-caught SHOULD-FIX (side-channel approach):
When the live SSE stream errors mid-stream and the frontend falls back to
journal replay, live frames must carry an `id:` field so the frontend's
`_lastRunJournalSeq` cursor advances during the live phase. Otherwise replay
arrives with `after_seq=0` and the server replays every journaled event from
seq 1, double-rendering tokens against the live-phase `assistantText`
accumulator.
Implementation (stage-364 side-channel approach to avoid breaking the
queue tuple contract used by 4 existing tests):
- api/config.py adds `STREAM_LAST_EVENT_ID: dict = {}` module-level dict.
- api/streaming.py `put()` captures `journaled["event_id"]` from
`RunJournalWriter.append_sse_event()` return and writes it to
`STREAM_LAST_EVENT_ID[stream_id]`.
- api/routes.py `_handle_sse_stream` reads `STREAM_LAST_EVENT_ID[stream_id]`
at SSE emit time and uses `_sse_with_id` when set.
- api/streaming.py finally-block cleanup pops STREAM_LAST_EVENT_ID.
The queue tuple shape is preserved as (event, data), so existing tests like
test_cancel_puts_sentinel_in_queue still work.
"""
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
STREAMING_PY = (REPO_ROOT / "api" / "streaming.py").read_text(encoding="utf-8")
ROUTES_PY = (REPO_ROOT / "api" / "routes.py").read_text(encoding="utf-8")
CONFIG_PY = (REPO_ROOT / "api" / "config.py").read_text(encoding="utf-8")
def test_stream_last_event_id_dict_exists_in_config():
"""`STREAM_LAST_EVENT_ID` must be declared as a module-level dict in
api/config.py alongside the other STREAM_* registries."""
assert "STREAM_LAST_EVENT_ID: dict = {}" in CONFIG_PY, (
"STREAM_LAST_EVENT_ID dict missing from api/config.py — needed as "
"the side-channel that lets SSE consumers emit `id:` on live frames"
)
def test_put_writes_event_id_to_side_channel_dict():
"""The `put()` helper must capture the event_id from the journal and
write it to STREAM_LAST_EVENT_ID[stream_id]."""
put_def_idx = STREAMING_PY.find("def put(event, data):")
assert put_def_idx != -1, "put(event, data) not found in api/streaming.py"
put_body = STREAMING_PY[put_def_idx:put_def_idx + 2500]
assert "journaled = run_journal.append_sse_event(event, data)" in put_body, (
"put() must capture append_sse_event return value"
)
assert "STREAM_LAST_EVENT_ID[stream_id]" in put_body, (
"put() must write event_id to STREAM_LAST_EVENT_ID[stream_id] — "
"this is the side-channel the SSE consumer reads at emit time"
)
def test_queue_tuple_shape_preserved_as_two_tuple():
"""The queue still uses 2-tuples (event, data) so existing consumers
that unpack `event, data = q.get()` are not broken."""
put_def_idx = STREAMING_PY.find("def put(event, data):")
put_body = STREAMING_PY[put_def_idx:put_def_idx + 2500]
assert "q.put_nowait((event, data))" in put_body, (
"Queue tuple shape must remain (event, data) — changing to 3-tuple "
"breaks 4 existing tests in test_cancel_interrupt, test_sprint42, "
"test_sprint51, test_issue1857_usage_overwrite"
)
def test_sse_handler_reads_event_id_from_side_channel():
"""The SSE consumer in _handle_sse_stream must read STREAM_LAST_EVENT_ID
and pass it to _sse_with_id when present."""
handler_idx = ROUTES_PY.find("def _handle_sse_stream(handler, parsed):")
assert handler_idx != -1, "_handle_sse_stream not found"
handler_body = ROUTES_PY[handler_idx:handler_idx + 4000]
assert "STREAM_LAST_EVENT_ID.get(stream_id)" in handler_body, (
"_handle_sse_stream must read STREAM_LAST_EVENT_ID[stream_id] to "
"get the event_id for emit"
)
assert "_sse_with_id(handler, event, data, event_id)" in handler_body, (
"_handle_sse_stream must call _sse_with_id when event_id is set"
)
def test_cleanup_pops_stream_last_event_id():
"""The streaming worker's finally block must pop STREAM_LAST_EVENT_ID
alongside the other STREAM_* dicts to prevent memory leak."""
# Find the cleanup block — multiple .pop(stream_id, None) lines
cleanup_idx = STREAMING_PY.find("STREAM_LIVE_TOOL_CALLS.pop(stream_id, None)")
assert cleanup_idx != -1, "cleanup block not found"
cleanup_block = STREAMING_PY[cleanup_idx:cleanup_idx + 500]
assert "STREAM_LAST_EVENT_ID.pop(stream_id, None)" in cleanup_block, (
"STREAM_LAST_EVENT_ID must be popped on worker finally to prevent "
"unbounded memory growth across streams"
)
def test_imports_present():
"""STREAM_LAST_EVENT_ID must be imported in both streaming.py (writer)
and routes.py (reader)."""
assert "STREAM_LAST_EVENT_ID," in STREAMING_PY, "streaming.py must import"
assert "STREAM_LAST_EVENT_ID," in ROUTES_PY, "routes.py must import"
@@ -1,81 +0,0 @@
"""Regression test for stage-364 Opus-caught SHOULD-FIX:
When the live SSE stream errors out mid-stream and the frontend falls back
to journal replay, live frames emitted by `_sse()` in `api/streaming.py`
have no `id:` field. The frontend's `_lastRunJournalSeq` therefore stays
at 0 during the live phase. Without resetting accumulators, the replay
(which arrives with `after_seq=0`, i.e. all events from seq 1) double-
renders every token because `assistantText` already holds the live phase's
accumulated text.
The fix in `static/messages.js` resets `assistantText`, `reasoningText`,
`liveReasoningText`, `segmentStart`, and sets `_smdReconnect=true` before
opening the replay EventSource. This regression test asserts the reset
block exists in source.
This is a static-grep test scoped to the bug surface (per Pitfall 2 / 5
from test-augmentation-and-validation-pitfalls.md). The behavioral
end-to-end test would require driving a real EventSource lifecycle in
the browser out of scope for the pytest suite. The grep is wide enough
(checks all 4 reset lines + `_smdReconnect=true`) that a future refactor
moving the reset would still trigger.
"""
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
MESSAGES_JS = REPO_ROOT / "static" / "messages.js"
def _read_messages_js() -> str:
return MESSAGES_JS.read_text(encoding="utf-8")
def test_replay_resets_assistant_text_accumulator():
src = _read_messages_js()
# Find the `if(st.replay_available)` block in the error-reconnect handler
idx = src.find("if(st.replay_available){")
assert idx != -1, "replay_available branch not found in messages.js"
# The reset block must come before the next EventSource construction
block = src[idx:idx + 1200]
assert "assistantText=''" in block, (
"Replay branch must reset assistantText to '' before opening replay "
"EventSource — otherwise live-phase text doubles when journal replay "
"arrives with after_seq=0 (Opus-caught stage-364 SHOULD-FIX)"
)
assert "reasoningText=''" in block, "Replay branch must reset reasoningText"
assert "liveReasoningText=''" in block, "Replay branch must reset liveReasoningText"
def test_replay_sets_smd_reconnect_to_force_dom_reset():
src = _read_messages_js()
idx = src.find("if(st.replay_available){")
block = src[idx:idx + 1200]
assert "_smdReconnect=true" in block, (
"Replay branch must set _smdReconnect=true so the next live token "
"clears assistantBody.innerHTML (matching the reset accumulator)"
)
def test_replay_resets_segment_start():
src = _read_messages_js()
idx = src.find("if(st.replay_available){")
block = src[idx:idx + 1200]
assert "segmentStart=0" in block, (
"Replay branch must reset segmentStart to 0 so the new segment "
"starts at the new assistantText origin"
)
def test_resets_precede_new_eventsource_construction():
"""The resets must happen BEFORE `new EventSource(...)`, not after."""
src = _read_messages_js()
idx = src.find("if(st.replay_available){")
block = src[idx:idx + 1200]
reset_idx = block.find("assistantText=''")
eventsource_idx = block.find("new EventSource")
assert reset_idx < eventsource_idx, (
f"Reset (idx {reset_idx}) must occur BEFORE EventSource construction "
f"(idx {eventsource_idx}) — otherwise the very first replay token "
f"would still race against the unreset accumulator"
)