From 8f98465024bc7b131168fae12abde2ef4e2ff219 Mon Sep 17 00:00:00 2001 From: nesquena-hermes Date: Sun, 17 May 2026 02:49:35 +0000 Subject: [PATCH] =?UTF-8?q?Stage=20374:=20PR=20#2427=20=E2=80=94=20fix(str?= =?UTF-8?q?eaming):=20recover=20journaled=20partial=20assistant=20output?= =?UTF-8?q?=20after=20WebUI=20restart=20by=20@franksong2702=20(fixes=20#24?= =?UTF-8?q?23)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Frank Song --- api/models.py | 190 +++++++++++++++++++++-- api/routes.py | 33 ++++ tests/test_issue1361_cancel_data_loss.py | 59 ++++++- tests/test_session_sidecar_repair.py | 124 +++++++++++++++ 4 files changed, 390 insertions(+), 16 deletions(-) diff --git a/api/models.py b/api/models.py index 84c21438..c45e33c0 100644 --- a/api/models.py +++ b/api/models.py @@ -685,26 +685,188 @@ def _get_profile_home(profile) -> Path: return Path(os.environ.get('HERMES_HOME') or '~/.hermes').expanduser() -def _interrupted_recovery_marker() -> dict: - return { - 'role': 'assistant', - 'content': ( +def _interrupted_recovery_marker(*, recovered_output: bool = False) -> dict: + if recovered_output: + content = ( + '**Response interrupted.**\n\n' + 'The WebUI process restarted before this turn finished. ' + 'The partial output above was recovered from the run journal, ' + 'but the interrupted agent process could not continue.' + ) + else: + content = ( '**Response interrupted.**\n\n' 'The WebUI process restarted before this turn finished. ' 'The user message above was preserved, but no agent output was recovered.' - ), + ) + return { + 'role': 'assistant', + 'content': content, 'timestamp': int(time.time()), '_error': True, 'type': 'interrupted', } +def _truncate_journal_tool_args(args, limit: int = 4) -> dict: + if not isinstance(args, dict): + return {} + out = {} + for key, value in list(args.items())[:limit]: + text = str(value) + out[str(key)] = text[:120] + ('...' if len(text) > 120 else '') + return out + + +def _append_journaled_partial_output(session, stream_id: str | None) -> bool: + """Recover already-emitted visible output from a dead stream journal. + + This repair path is intentionally conservative: it restores user-visible + assistant text and tool-card metadata that had already been emitted over + SSE before the WebUI process died. It does not restore hidden reasoning and + it does not try to continue execution. + """ + if not stream_id: + return False + + try: + from api.run_journal import read_run_events + journal = read_run_events(session.session_id, stream_id) + except Exception: + logger.debug( + "Session %s: failed to read run journal for stream %s", + getattr(session, 'session_id', '?'), + stream_id, + exc_info=True, + ) + return False + + events = [event for event in journal.get('events') or [] if isinstance(event, dict)] + if not events: + return False + + appended_any = False + assistant_parts: list[str] = [] + assistant_started_at: float | None = None + current_assistant_idx: int | None = None + recovered_tool_calls: list[dict] = [] + + def flush_assistant() -> int | None: + nonlocal appended_any, assistant_parts, assistant_started_at, current_assistant_idx + content = ''.join(assistant_parts).strip() + assistant_parts = [] + if not content: + return current_assistant_idx + timestamp = int(assistant_started_at or time.time()) + session.messages.append({ + 'role': 'assistant', + 'content': content, + 'timestamp': timestamp, + '_recovered_from_run_journal': True, + '_recovered_stream_id': stream_id, + }) + current_assistant_idx = len(session.messages) - 1 + assistant_started_at = None + appended_any = True + return current_assistant_idx + + def ensure_assistant_anchor(created_at: float | None = None) -> int: + nonlocal appended_any, current_assistant_idx + idx = flush_assistant() + if idx is not None: + return idx + # A stream can start with tools before any text. Keep those tools + # visible after restart with an empty recovered assistant anchor instead + # of inventing synthetic progress prose. + session.messages.append({ + 'role': 'assistant', + 'content': '', + 'timestamp': int(created_at or time.time()), + '_recovered_from_run_journal': True, + '_recovered_stream_id': stream_id, + }) + current_assistant_idx = len(session.messages) - 1 + appended_any = True + return current_assistant_idx + + for event in events: + event_name = str(event.get('event') or event.get('type') or '') + payload = event.get('payload') if isinstance(event.get('payload'), dict) else {} + created_at = event.get('created_at') if isinstance(event.get('created_at'), (int, float)) else None + if event_name == 'token': + text = str(payload.get('text') or '') + if not text: + continue + if not assistant_parts and assistant_started_at is None: + assistant_started_at = created_at or time.time() + assistant_parts.append(text) + continue + if event_name == 'interim_assistant': + if payload.get('already_streamed'): + flush_assistant() + continue + text = str(payload.get('text') or '').strip() + if not text: + continue + if not assistant_parts and assistant_started_at is None: + assistant_started_at = created_at or time.time() + if assistant_parts and not ''.join(assistant_parts).endswith(('\n', ' ')): + assistant_parts.append('\n\n') + assistant_parts.append(text) + flush_assistant() + continue + if event_name == 'tool': + anchor_idx = flush_assistant() + if anchor_idx is None: + anchor_idx = ensure_assistant_anchor(created_at) + name = str(payload.get('name') or 'tool') + preview = str(payload.get('preview') or '') + recovered_tool_calls.append({ + 'name': name, + 'preview': preview, + 'snippet': preview, + 'tid': f"journal-{event.get('seq') or len(recovered_tool_calls) + 1}", + 'assistant_msg_idx': anchor_idx, + 'args': _truncate_journal_tool_args(payload.get('args') or {}), + 'done': False, + '_recovered_from_run_journal': True, + '_recovered_stream_id': stream_id, + }) + appended_any = True + current_assistant_idx = anchor_idx + continue + if event_name == 'tool_complete': + name = str(payload.get('name') or '') + for tool_call in reversed(recovered_tool_calls): + if tool_call.get('done'): + continue + if not name or tool_call.get('name') == name: + tool_call['done'] = True + if payload.get('preview'): + tool_call['preview'] = str(payload.get('preview') or '') + tool_call['snippet'] = str(payload.get('preview') or '') + if payload.get('duration') is not None: + tool_call['duration'] = payload.get('duration') + tool_call['is_error'] = bool(payload.get('is_error', False)) + break + continue + if event_name in {'done', 'stream_end', 'cancel', 'apperror', 'error'}: + flush_assistant() + + flush_assistant() + if recovered_tool_calls: + session.tool_calls = list(session.tool_calls or []) + recovered_tool_calls + appended_any = True + return appended_any + + def _apply_core_sync_or_error_marker( session, core_path, stream_id_for_recheck=None, *, require_stream_dead=True, + touch_updated_at=True, ) -> bool: """Inner repair logic. Must be called with the per-session lock already held. @@ -761,12 +923,16 @@ def _apply_core_sync_or_error_marker( if session.pending_attachments: recovered['attachments'] = list(session.pending_attachments) _append_recovered_turn_to_context(session, recovered) + recovered_output = _append_journaled_partial_output( + session, + stream_id_for_recheck or session.active_stream_id, + ) session.active_stream_id = None session.pending_user_message = None session.pending_attachments = [] session.pending_started_at = None - session.messages.append(_interrupted_recovery_marker()) - session.save() + session.messages.append(_interrupted_recovery_marker(recovered_output=recovered_output)) + session.save(touch_updated_at=touch_updated_at) logger.info( "Session %s: recovered pending user turn (messages non-empty), added error marker", sid, @@ -789,7 +955,7 @@ def _apply_core_sync_or_error_marker( session.pending_user_message = None session.pending_attachments = [] session.pending_started_at = None - session.save() + session.save(touch_updated_at=touch_updated_at) logger.info( "Session %s: synced %d messages from core transcript", sid, len(core_messages), @@ -805,12 +971,16 @@ def _apply_core_sync_or_error_marker( if isinstance(session.pending_started_at, (int, float)) and session.pending_started_at > 0: _recovered_ts = int(session.pending_started_at) _append_recovered_pending_turn(session, timestamp=_recovered_ts) + recovered_output = _append_journaled_partial_output( + session, + stream_id_for_recheck or session.active_stream_id, + ) session.active_stream_id = None session.pending_user_message = None session.pending_attachments = [] session.pending_started_at = None - session.messages.append(_interrupted_recovery_marker()) - session.save() + session.messages.append(_interrupted_recovery_marker(recovered_output=recovered_output)) + session.save(touch_updated_at=touch_updated_at) logger.info("Session %s: no core transcript found, added error marker", sid) return True diff --git a/api/routes.py b/api/routes.py index 874fd8f6..217659a8 100644 --- a/api/routes.py +++ b/api/routes.py @@ -1004,6 +1004,39 @@ def _clear_stale_stream_state(session) -> bool: with _get_session_agent_lock(session.session_id): if getattr(session, "active_stream_id", None) != stream_id: return False + if getattr(session, "pending_user_message", None): + try: + from api.models import _apply_core_sync_or_error_marker, _get_profile_home + profile_home = _get_profile_home(getattr(session, "profile", None)) + core_path = profile_home / "sessions" / f"session_{session.session_id}.json" + repaired = _apply_core_sync_or_error_marker( + session, + core_path, + stream_id_for_recheck=stream_id, + touch_updated_at=False, + ) + except Exception: + logger.exception( + "_clear_stale_stream_state: failed to repair stale pending stream %s " + "for session %s", + stream_id, getattr(session, "session_id", "?"), + ) + repaired = False + if repaired: + if original_stub is not session: + try: + original_stub.active_stream_id = None + if hasattr(original_stub, "pending_user_message"): + original_stub.pending_user_message = None + if hasattr(original_stub, "pending_attachments"): + original_stub.pending_attachments = [] + if hasattr(original_stub, "pending_started_at"): + original_stub.pending_started_at = None + except Exception: + pass + return True + if getattr(session, "active_stream_id", None) != stream_id: + return False _materialize_pending_user_turn_before_error(session) session.active_stream_id = None if hasattr(session, "pending_user_message"): diff --git a/tests/test_issue1361_cancel_data_loss.py b/tests/test_issue1361_cancel_data_loss.py index 266b87d4..f6099236 100644 --- a/tests/test_issue1361_cancel_data_loss.py +++ b/tests/test_issue1361_cancel_data_loss.py @@ -25,6 +25,7 @@ import api.config as config import api.models as models import api.streaming as streaming from api.models import Session +from api.run_journal import append_run_event from api.streaming import cancel_stream REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve() @@ -391,14 +392,60 @@ def test_stale_stream_cleanup_materializes_pending_turn_before_clearing_state(): assert cleared is True assert s.active_stream_id is None assert s.pending_user_message is None - assert s.messages[-1]["role"] == "user" - assert s.messages[-1]["content"] == "please make the GUI fully usable" - assert s.messages[-1]["timestamp"] == 1778187755 - assert s.messages[-1]["attachments"] == [{"name": "visible-state.png"}] + assert s.messages[-2]["role"] == "user" + assert s.messages[-2]["content"] == "please make the GUI fully usable" + assert s.messages[-2]["timestamp"] == 1778187755 + assert s.messages[-2]["attachments"] == [{"name": "visible-state.png"}] + assert s.messages[-1]["role"] == "assistant" + assert s.messages[-1].get("_error") is True + assert s.messages[-1].get("type") == "interrupted" reloaded = models.get_session(sid, metadata_only=False) - assert reloaded.messages[-1]["role"] == "user" - assert reloaded.messages[-1]["content"] == "please make the GUI fully usable" + assert reloaded.messages[-2]["role"] == "user" + assert reloaded.messages[-2]["content"] == "please make the GUI fully usable" + assert reloaded.messages[-1]["role"] == "assistant" + assert reloaded.messages[-1].get("type") == "interrupted" + + +def test_stale_stream_cleanup_recovers_journaled_visible_output(): + """The /api/session stale cleanup path can run before a full chat reload; + it must preserve journaled partial output instead of only clearing runtime + flags.""" + from api.routes import _clear_stale_stream_state + + sid = "test_pending_error_d4_journal" + s = _make_session( + session_id=sid, + pending_msg="please check maintainer activity", + messages=[{"role": "assistant", "content": "previous answer"}], + ) + append_run_event( + sid, + "stream_1361", + "token", + {"text": "I will check GitHub first."}, + ) + append_run_event( + sid, + "stream_1361", + "tool", + {"name": "terminal", "preview": "gh issue view 2423", "args": {"command": "gh issue view 2423"}}, + ) + append_run_event( + sid, + "stream_1361", + "tool_complete", + {"name": "terminal", "duration": 0.4, "is_error": False}, + ) + + cleared = _clear_stale_stream_state(s) + + assert cleared is True + assert any("I will check GitHub first." in (m.get("content") or "") for m in s.messages) + assert s.tool_calls + assert s.tool_calls[0]["name"] == "terminal" + assert s.messages[-1].get("type") == "interrupted" + assert "partial output above was recovered" in s.messages[-1]["content"] # ── Structural guard: pin call sites of the materialize helper at error branches ── diff --git a/tests/test_session_sidecar_repair.py b/tests/test_session_sidecar_repair.py index 10a599ba..d6f17938 100644 --- a/tests/test_session_sidecar_repair.py +++ b/tests/test_session_sidecar_repair.py @@ -21,6 +21,7 @@ from api.models import ( import api.config as config import api.streaming as streaming import api.profiles as profiles +from api.run_journal import append_run_event # ── Fixtures ──────────────────────────────────────────────────────────────── @@ -617,6 +618,129 @@ class TestNonEmptyMessagesPendingCleared: assert s.pending_started_at is None assert s.active_stream_id is None + def test_journaled_partial_output_is_recovered_before_interrupted_marker(self, hermes_home, monkeypatch): + """When a WebUI restart leaves a dead stream with journaled partial + output, repair should not collapse the user-visible transcript to only + a generic interrupted marker.""" + s = _make_session(messages=[{"role": "user", "content": "existing turn"}]) + s.pending_user_message = "Check maintainer activity" + s.pending_started_at = time.time() - 120 + s.active_stream_id = "journaled_stream" + s.save() + + append_run_event( + s.session_id, + "journaled_stream", + "token", + {"text": "I will check GitHub first."}, + ) + append_run_event( + s.session_id, + "journaled_stream", + "tool", + { + "name": "terminal", + "preview": "gh pr list --repo nesquena/hermes-webui", + "args": {"command": "gh pr list --repo nesquena/hermes-webui"}, + }, + ) + append_run_event( + s.session_id, + "journaled_stream", + "tool_complete", + {"name": "terminal", "duration": 1.2, "is_error": False}, + ) + append_run_event( + s.session_id, + "journaled_stream", + "token", + {"text": "The first check finished before the restart."}, + ) + + core_path = hermes_home / "sessions" / f"session_{s.session_id}.json" + result = _apply_core_sync_or_error_marker( + s, + core_path, + stream_id_for_recheck="journaled_stream", + ) + + assert result is True + contents = [m.get("content", "") for m in s.messages] + assert any("I will check GitHub first." in c for c in contents) + assert any("The first check finished before the restart." in c for c in contents) + assert s.tool_calls, "journaled tool starts should become visible settled tool cards" + assert s.tool_calls[0]["name"] == "terminal" + assert s.tool_calls[0]["done"] is True + assert s.tool_calls[0]["assistant_msg_idx"] < len(s.messages) + error_msgs = [m for m in s.messages if m.get("_error")] + assert len(error_msgs) == 1 + assert "partial output above was recovered" in error_msgs[0]["content"] + assert "no agent output was recovered" not in error_msgs[0]["content"] + + def test_journal_recovery_does_not_materialize_reasoning_only_events(self, hermes_home, monkeypatch): + """Run-journal repair must not turn hidden reasoning into visible chat + transcript content.""" + s = _make_session(messages=[{"role": "user", "content": "existing turn"}]) + s.pending_user_message = "Keep going" + s.pending_started_at = time.time() - 120 + s.active_stream_id = "reasoning_only_stream" + s.save() + + append_run_event( + s.session_id, + "reasoning_only_stream", + "reasoning", + {"text": "private scratchpad text"}, + ) + + core_path = hermes_home / "sessions" / f"session_{s.session_id}.json" + result = _apply_core_sync_or_error_marker( + s, + core_path, + stream_id_for_recheck="reasoning_only_stream", + ) + + assert result is True + contents = [m.get("content", "") for m in s.messages] + assert not any("private scratchpad text" in c for c in contents) + error_msgs = [m for m in s.messages if m.get("_error")] + assert len(error_msgs) == 1 + assert "no agent output was recovered" in error_msgs[0]["content"] + + def test_journal_recovery_keeps_consecutive_tools_on_one_anchor(self, hermes_home, monkeypatch): + """Consecutive journaled tools without an intervening visible update + should recover as one activity group instead of repeated empty anchors.""" + s = _make_session(messages=[{"role": "user", "content": "existing turn"}]) + s.pending_user_message = "Inspect files" + s.pending_started_at = time.time() - 120 + s.active_stream_id = "tool_burst_stream" + s.save() + + append_run_event( + s.session_id, + "tool_burst_stream", + "token", + {"text": "I will inspect the relevant files first."}, + ) + for name in ("search_files", "read_file"): + append_run_event( + s.session_id, + "tool_burst_stream", + "tool", + {"name": name, "preview": name, "args": {"query": "stream recovery"}}, + ) + + core_path = hermes_home / "sessions" / f"session_{s.session_id}.json" + result = _apply_core_sync_or_error_marker( + s, + core_path, + stream_id_for_recheck="tool_burst_stream", + ) + + assert result is True + assert len(s.tool_calls) == 2 + assert s.tool_calls[0]["assistant_msg_idx"] == s.tool_calls[1]["assistant_msg_idx"] + class TestLastResortSyncDelegation: """_last_resort_sync_from_core delegates to the shared helpers