From 1ed1ce219d2ab3fcf9c5dd67cc063b945c9ce94e Mon Sep 17 00:00:00 2001 From: Frank Song Date: Wed, 29 Apr 2026 16:37:08 +0800 Subject: [PATCH] Preserve transcript across context compaction --- CHANGELOG.md | 3 + api/models.py | 2 + api/routes.py | 23 +- api/session_ops.py | 20 ++ api/streaming.py | 110 ++++++++- tests/test_issue1217_transcript_compaction.py | 210 ++++++++++++++++++ tests/test_issue765_streaming_persistence.py | 6 +- 7 files changed, 364 insertions(+), 10 deletions(-) create mode 100644 tests/test_issue1217_transcript_compaction.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b6fad61a..058aeb04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## [Unreleased] +### Fixed +- **Compaction preserves visible prompts** — WebUI now keeps model-facing compacted context separately from the visible transcript, so automatic context compaction no longer replaces earlier user prompts in the scrollback. (`api/models.py`, `api/streaming.py`, `api/routes.py`, `api/session_ops.py`) @franksong2702 — Closes #1217 + ## [v0.50.237] — 2026-04-29 ### Added diff --git a/api/models.py b/api/models.py index d044720a..2464d70e 100644 --- a/api/models.py +++ b/api/models.py @@ -315,6 +315,7 @@ class Session: pending_user_message: str=None, pending_attachments=None, pending_started_at=None, + context_messages=None, compression_anchor_visible_idx=None, compression_anchor_message_key=None, **kwargs): @@ -338,6 +339,7 @@ class Session: self.pending_user_message = pending_user_message self.pending_attachments = pending_attachments or [] self.pending_started_at = pending_started_at + self.context_messages = context_messages if isinstance(context_messages, list) else [] self.compression_anchor_visible_idx = compression_anchor_visible_idx self.compression_anchor_message_key = compression_anchor_message_key self._metadata_message_count = None diff --git a/api/routes.py b/api/routes.py index 85d598c0..2137bd4d 100644 --- a/api/routes.py +++ b/api/routes.py @@ -3207,14 +3207,20 @@ def 
_handle_chat_sync(handler, body): "write_file, read_file, search_files, terminal workdir, and patch. " "Never fall back to a hardcoded path when this tag is present." ) - from api.streaming import _sanitize_messages_for_api, _restore_reasoning_metadata + from api.streaming import ( + _merge_display_messages_after_agent_result, + _restore_reasoning_metadata, + _sanitize_messages_for_api, + _session_context_messages, + ) _previous_messages = list(s.messages or []) + _previous_context_messages = list(_session_context_messages(s)) result = agent.run_conversation( user_message=workspace_ctx + msg, system_message=workspace_system_msg, - conversation_history=_sanitize_messages_for_api(s.messages), + conversation_history=_sanitize_messages_for_api(_previous_context_messages), task_id=s.session_id, persist_user_message=msg, ) @@ -3233,9 +3239,17 @@ def _handle_chat_sync(handler, body): else: os.environ["HERMES_SESSION_KEY"] = old_session_key with _get_session_agent_lock(s.session_id): - s.messages = _restore_reasoning_metadata( + _result_messages = result.get("messages") or _previous_context_messages + _next_context_messages = _restore_reasoning_metadata( + _previous_context_messages, + _result_messages, + ) + s.context_messages = _next_context_messages + s.messages = _merge_display_messages_after_agent_result( _previous_messages, - result.get("messages") or s.messages, + _previous_context_messages, + _restore_reasoning_metadata(_previous_messages, _result_messages), + msg, ) # Only auto-generate title when still default; preserves user renames if s.title == "Untitled": @@ -3861,6 +3875,7 @@ def _handle_session_compress(handler, body): return bad(handler, "Session was modified during compression; please retry.", 409) s.messages = compressed + s.context_messages = compressed s.tool_calls = [] s.active_stream_id = None s.pending_user_message = None diff --git a/api/session_ops.py b/api/session_ops.py index 1275c2d3..be51ec1b 100644 --- a/api/session_ops.py +++ 
b/api/session_ops.py @@ -15,6 +15,18 @@ from api.models import get_session, SESSIONS logger = logging.getLogger(__name__) +def _truncate_at_last_user(messages): + history = messages or [] + last_user_idx = None + for i in range(len(history) - 1, -1, -1): + if isinstance(history[i], dict) and history[i].get('role') == 'user': + last_user_idx = i + break + if last_user_idx is None: + return None + return history[:last_user_idx] + + def retry_last(session_id: str) -> dict[str, Any]: """Truncate the session to before the last user message, return its text. @@ -63,6 +75,10 @@ def retry_last(session_id: str) -> dict[str, Any]: last_user_text = _extract_text(history[last_user_idx].get('content', '')) removed_count = len(history) - last_user_idx s.messages = history[:last_user_idx] + if isinstance(getattr(s, 'context_messages', None), list) and s.context_messages: + truncated_context = _truncate_at_last_user(s.context_messages) + if truncated_context is not None: + s.context_messages = truncated_context s.save() return {'last_user_text': last_user_text, 'removed_count': removed_count} @@ -98,6 +114,10 @@ def undo_last(session_id: str) -> dict[str, Any]: removed_text = _extract_text(history[last_user_idx].get('content', '')) removed_count = len(history) - last_user_idx s.messages = history[:last_user_idx] + if isinstance(getattr(s, 'context_messages', None), list) and s.context_messages: + truncated_context = _truncate_at_last_user(s.context_messages) + if truncated_context is not None: + s.context_messages = truncated_context s.save() # outside LOCK -- save() re-acquires LOCK via _write_session_index() preview = (removed_text[:40] + '...') if len(removed_text) > 40 else removed_text return { diff --git a/api/streaming.py b/api/streaming.py index 044a7548..49638507 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -998,6 +998,101 @@ def _restore_reasoning_metadata(previous_messages, updated_messages): return updated_messages +def _session_context_messages(session): + 
"""Return model-facing history without assuming it matches the UI transcript.""" + context_messages = getattr(session, 'context_messages', None) + if isinstance(context_messages, list) and context_messages: + return context_messages + return session.messages or [] + + +def _message_identity(msg): + if not isinstance(msg, dict): + return None + role = str(msg.get('role') or '') + content = msg.get('content', '') + text = _message_text(content) + if not text and not msg.get('tool_call_id') and not msg.get('tool_calls'): + return None + return ( + role, + " ".join(str(text or '').split())[:500], + str(msg.get('tool_call_id') or ''), + json.dumps(msg.get('tool_calls') or [], sort_keys=True, ensure_ascii=False), + ) + + +def _messages_have_prefix(messages, prefix): + if len(messages or []) < len(prefix or []): + return False + for idx, expected in enumerate(prefix or []): + if _message_identity((messages or [])[idx]) != _message_identity(expected): + return False + return True + + +def _is_context_compression_marker(msg): + if not isinstance(msg, dict): + return False + text = _message_text(msg.get('content', '')).lower() + return ( + 'context compaction' in text + or 'context compression' in text + or 'context was auto-compressed' in text + or 'active task list was preserved across context compression' in text + ) + + +def _find_current_user_turn(messages, msg_text): + needle = " ".join(str(msg_text or '').split()) + fallback = None + for idx, msg in enumerate(messages or []): + if not isinstance(msg, dict) or msg.get('role') != 'user': + continue + fallback = idx + text = " ".join(_message_text(msg.get('content', '')).split()) + if needle and (needle in text or text in needle): + return idx + return fallback + + +def _merge_display_messages_after_agent_result(previous_display, previous_context, result_messages, msg_text): + """Keep UI transcript durable while allowing model context to compact. 
+ + If Hermes Agent returns a normal append-only history, append that delta to + the UI transcript. If the model/context history was compacted and no longer + has the prior context as a prefix, keep the previous UI transcript and append + only compaction marker messages plus the current user turn onward. + """ + previous_display = list(previous_display or []) + previous_context = list(previous_context or []) + result_messages = list(result_messages or []) + if not result_messages: + return previous_display + + if _messages_have_prefix(result_messages, previous_context): + candidates = result_messages[len(previous_context):] + else: + current_user_idx = _find_current_user_turn(result_messages, msg_text) + marker_candidates = [ + m for m in result_messages[:current_user_idx if current_user_idx is not None else len(result_messages)] + if _is_context_compression_marker(m) + ] + turn_candidates = result_messages[current_user_idx:] if current_user_idx is not None else [] + candidates = marker_candidates + turn_candidates + + merged = previous_display[:] + seen = {_message_identity(m) for m in merged} + for msg in candidates: + key = _message_identity(msg) + if _is_context_compression_marker(msg) and key is not None and key in seen: + continue + merged.append(copy.deepcopy(msg)) + if key is not None: + seen.add(key) + return merged + + def _tool_result_snippet(raw) -> str: """Extract a compact result preview from a stored tool message payload.""" text = str(raw or '') @@ -1668,6 +1763,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta if _personality_prompt: agent.ephemeral_system_prompt = _personality_prompt _previous_messages = list(s.messages or []) + _previous_context_messages = list(_session_context_messages(s)) # ── Periodic checkpoint during streaming (Issue #765) ── # The agent works on an internal copy of s.messages during run_conversation() @@ -1711,7 +1807,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, 
stream_id, atta result = agent.run_conversation( user_message=workspace_ctx + msg_text, system_message=workspace_system_msg, - conversation_history=_sanitize_messages_for_api(s.messages), + conversation_history=_sanitize_messages_for_api(_previous_context_messages), task_id=session_id, persist_user_message=msg_text, ) @@ -1741,9 +1837,17 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta if _ckpt_thread is not None: _ckpt_thread.join(timeout=15) with _agent_lock: - s.messages = _restore_reasoning_metadata( + _result_messages = result.get('messages') or _previous_context_messages + _next_context_messages = _restore_reasoning_metadata( + _previous_context_messages, + _result_messages, + ) + s.context_messages = _next_context_messages + s.messages = _merge_display_messages_after_agent_result( _previous_messages, - result.get('messages') or s.messages, + _previous_context_messages, + _restore_reasoning_metadata(_previous_messages, _result_messages), + msg_text, ) # Strip XML tool-call blocks from assistant message content. # DeepSeek and some other providers emit ... 
diff --git a/tests/test_issue1217_transcript_compaction.py b/tests/test_issue1217_transcript_compaction.py new file mode 100644 index 00000000..72c6f08a --- /dev/null +++ b/tests/test_issue1217_transcript_compaction.py @@ -0,0 +1,210 @@ +from api.models import Session +import contextlib + +from api.streaming import ( + _merge_display_messages_after_agent_result, + _sanitize_messages_for_api, + _session_context_messages, +) + + +def test_session_persists_model_context_separately_from_display_transcript(tmp_path, monkeypatch): + """Compacted model context must not replace the visible WebUI transcript.""" + state_dir = tmp_path / "state" + session_dir = state_dir / "sessions" + session_dir.mkdir(parents=True) + + import api.models as models + + monkeypatch.setattr(models, "SESSION_DIR", session_dir) + monkeypatch.setattr(models, "SESSION_INDEX_FILE", state_dir / "session_index.json") + + original_display = [ + {"role": "user", "content": "original long prompt"}, + {"role": "assistant", "content": "original detailed answer"}, + ] + compacted_context = [ + { + "role": "user", + "content": "[CONTEXT COMPACTION — REFERENCE ONLY] Earlier turns were compacted.", + }, + {"role": "user", "content": "continue from here"}, + {"role": "assistant", "content": "continued response"}, + ] + + session = Session( + session_id="issue1217", + workspace=str(tmp_path), + messages=original_display, + context_messages=compacted_context, + ) + session.save(touch_updated_at=False) + + reloaded = Session.load("issue1217") + assert reloaded.messages == original_display + assert reloaded.context_messages == compacted_context + assert _session_context_messages(reloaded) == compacted_context + assert _sanitize_messages_for_api(_session_context_messages(reloaded)) == compacted_context + + +def test_compacted_agent_result_keeps_old_prompts_and_appends_current_turn(): + previous_display = [ + {"role": "user", "content": "first prompt that must remain visible"}, + {"role": "assistant", "content": 
"first answer"}, + {"role": "user", "content": "second prompt that must remain visible"}, + {"role": "assistant", "content": "second answer"}, + ] + previous_context = list(previous_display) + compacted_result = [ + { + "role": "user", + "content": "[CONTEXT COMPACTION — REFERENCE ONLY] Earlier turns were compacted.", + }, + {"role": "user", "content": "new question after compaction"}, + {"role": "assistant", "content": "new answer after compaction"}, + ] + + merged = _merge_display_messages_after_agent_result( + previous_display, + previous_context, + compacted_result, + "new question after compaction", + ) + + assert [m["content"] for m in merged] == [ + "first prompt that must remain visible", + "first answer", + "second prompt that must remain visible", + "second answer", + "[CONTEXT COMPACTION — REFERENCE ONLY] Earlier turns were compacted.", + "new question after compaction", + "new answer after compaction", + ] + + +def test_append_only_agent_result_preserves_normal_delta_behavior(): + previous_display = [ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "hi"}, + ] + previous_context = list(previous_display) + result_messages = previous_context + [ + {"role": "user", "content": "what next?"}, + {"role": "assistant", "content": "next answer"}, + ] + + merged = _merge_display_messages_after_agent_result( + previous_display, + previous_context, + result_messages, + "what next?", + ) + + assert merged == result_messages + + +def test_repeated_user_text_after_compaction_is_not_dropped(): + previous_display = [ + {"role": "user", "content": "continue"}, + {"role": "assistant", "content": "old answer"}, + ] + previous_context = list(previous_display) + compacted_result = [ + {"role": "user", "content": "[CONTEXT COMPACTION — REFERENCE ONLY] summary"}, + {"role": "user", "content": "continue"}, + {"role": "assistant", "content": "new answer"}, + ] + + merged = _merge_display_messages_after_agent_result( + previous_display, + 
previous_context, + compacted_result, + "continue", + ) + + assert [m["content"] for m in merged] == [ + "continue", + "old answer", + "[CONTEXT COMPACTION — REFERENCE ONLY] summary", + "continue", + "new answer", + ] + + +def test_session_context_falls_back_to_display_messages_for_legacy_sessions(tmp_path): + messages = [ + {"role": "user", "content": "legacy prompt"}, + {"role": "assistant", "content": "legacy answer"}, + ] + session = Session(session_id="legacy1217", workspace=str(tmp_path), messages=messages) + + assert session.context_messages == [] + assert _session_context_messages(session) == messages + + +def test_retry_truncates_model_context_when_it_is_separate(monkeypatch, tmp_path): + import api.session_ops as session_ops + + session = Session( + session_id="retry1217", + workspace=str(tmp_path), + messages=[ + {"role": "user", "content": "visible one"}, + {"role": "assistant", "content": "visible two"}, + {"role": "user", "content": "visible three"}, + {"role": "assistant", "content": "visible four"}, + ], + context_messages=[ + {"role": "user", "content": "[CONTEXT COMPACTION — REFERENCE ONLY] summary"}, + {"role": "user", "content": "visible three"}, + {"role": "assistant", "content": "visible four"}, + ], + ) + saved = [] + session.save = lambda *args, **kwargs: saved.append(True) + monkeypatch.setattr(session_ops, "get_session", lambda sid: session) + monkeypatch.setattr(session_ops, "SESSIONS", {session.session_id: session}) + monkeypatch.setattr(session_ops, "_get_session_agent_lock", lambda sid: contextlib.nullcontext()) + + result = session_ops.retry_last(session.session_id) + + assert result["last_user_text"] == "visible three" + assert [m["content"] for m in session.messages] == ["visible one", "visible two"] + assert [m["content"] for m in session.context_messages] == [ + "[CONTEXT COMPACTION — REFERENCE ONLY] summary" + ] + assert saved + + +def test_undo_truncates_model_context_when_it_is_separate(monkeypatch, tmp_path): + import 
api.session_ops as session_ops + + session = Session( + session_id="undo1217", + workspace=str(tmp_path), + messages=[ + {"role": "user", "content": "visible one"}, + {"role": "assistant", "content": "visible two"}, + {"role": "user", "content": "visible three"}, + {"role": "assistant", "content": "visible four"}, + ], + context_messages=[ + {"role": "user", "content": "[CONTEXT COMPACTION — REFERENCE ONLY] summary"}, + {"role": "user", "content": "visible three"}, + {"role": "assistant", "content": "visible four"}, + ], + ) + saved = [] + session.save = lambda *args, **kwargs: saved.append(True) + monkeypatch.setattr(session_ops, "get_session", lambda sid: session) + monkeypatch.setattr(session_ops, "SESSIONS", {session.session_id: session}) + monkeypatch.setattr(session_ops, "_get_session_agent_lock", lambda sid: contextlib.nullcontext()) + + result = session_ops.undo_last(session.session_id) + + assert result["removed_count"] == 2 + assert [m["content"] for m in session.messages] == ["visible one", "visible two"] + assert [m["content"] for m in session.context_messages] == [ + "[CONTEXT COMPACTION — REFERENCE ONLY] summary" + ] + assert saved diff --git a/tests/test_issue765_streaming_persistence.py b/tests/test_issue765_streaming_persistence.py index 2efdd241..e68c08d1 100644 --- a/tests/test_issue765_streaming_persistence.py +++ b/tests/test_issue765_streaming_persistence.py @@ -318,8 +318,8 @@ class TestIssue765FollowupHardening: ) stop_idx = src.find("if _checkpoint_stop is not None:\n _checkpoint_stop.set()") join_idx = src.find("if _ckpt_thread is not None:\n _ckpt_thread.join(timeout=15)") - lock_idx = src.find("with _agent_lock:\n s.messages = _restore_reasoning_metadata(") - save_idx = src.find("s.messages = _restore_reasoning_metadata(") + lock_idx = src.find("with _agent_lock:\n _result_messages =") + save_idx = src.find("s.context_messages = _next_context_messages") assert stop_idx != -1, "Success path must stop the checkpoint thread" assert join_idx 
!= -1, "Success path must join the checkpoint thread" @@ -338,7 +338,7 @@ class TestIssue765FollowupHardening: src = (Path(__file__).parent.parent / "api" / "streaming.py").read_text( encoding="utf-8" ) - outer_lock_idx = src.find("with _agent_lock:\n s.messages = _restore_reasoning_metadata(") + outer_lock_idx = src.find("with _agent_lock:\n _result_messages =") silent_failure_idx = src.find("if not _assistant_added and not _token_sent:") inner_lock_idx = src.find("with _agent_lock:", outer_lock_idx + 1) compression_idx = src.find("# ── Handle context compression side effects ──")