From 5f42e87aa910f737efad4ccb9d431adffe78e31d Mon Sep 17 00:00:00 2001 From: ai-ag2026 <261867348+ai-ag2026@users.noreply.github.com> Date: Thu, 28 May 2026 07:52:31 +0200 Subject: [PATCH] fix: skip stale repair for compression parents --- CHANGELOG.md | 4 + api/models.py | 78 +++++++++++++++++++ api/streaming.py | 8 ++ ...test_compression_snapshot_runtime_clear.py | 19 +++++ .../test_session_lost_response_regression.py | 40 ++++++++++ 5 files changed, 149 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a990d204..6c36c771 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ ## [Unreleased] +### Fixed + +- Compression parent sessions are no longer repaired as stale interrupted turns when a continuation already exists, preventing false "Response interrupted" markers and hidden continuation rows after auto-compression session rotation. (Refs #2361) + ## [v0.51.152] — 2026-05-28 — Release DX (stage-batch34 — single-PR optional gateway-backed browser chat) ### Added diff --git a/api/models.py b/api/models.py index 4ae8d965..b9da5ffa 100644 --- a/api/models.py +++ b/api/models.py @@ -1850,6 +1850,72 @@ def _apply_core_sync_or_error_marker( _REPAIR_STALE_PENDING_GRACE_SECONDS = 30 +def _has_compression_continuation(session) -> bool: + """Return True when ``session`` is an archived compression parent. + + Context compression rotates the live WebUI session id: the old sidecar is + preserved for lineage while the new child owns the running/completed turn. + Stale-pending repair must not append an interruption marker to that old + parent just because its stream bookkeeping disappeared after the rotation. + """ + sid = getattr(session, 'session_id', None) + if not sid: + return False + + def _row_is_continuation(row) -> bool: + if not isinstance(row, dict): + return False + child_sid = row.get('session_id') + if not child_sid or child_sid == sid: + return False + if row.get('parent_session_id') != sid: + return False + # Any child row is enough evidence that this pending state belongs to a + # compression lineage, not a dead standalone turn. The child may itself + # temporarily carry a bad pre_compression_snapshot flag from older code; + # do not filter it out here or the guard misses the exact regression. + return True + + try: + with LOCK: + for child in SESSIONS.values(): + if getattr(child, 'session_id', None) == sid: + continue + if getattr(child, 'parent_session_id', None) == sid: + return True + except Exception: + pass + + try: + if SESSION_INDEX_FILE.exists(): + entries = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8')) + if isinstance(entries, list) and any(_row_is_continuation(e) for e in entries): + return True + except Exception: + logger.debug("Failed to inspect session index for compression continuation", exc_info=True) + + # Index rows can lag behind rapid compression/save races. Fall back to a + # shallow JSON metadata scan; session files write parent_session_id before + # the messages array, so this avoids loading multi-MB transcripts. + try: + needle = f'"parent_session_id": "{sid}"' + for path in SESSION_DIR.glob('*.json'): + if path.name.startswith('_') or path.stem == sid: + continue + try: + head = path.read_text(encoding='utf-8', errors='ignore')[:4096] + except TypeError: + head = path.read_text(encoding='utf-8')[:4096] + except OSError: + continue + if needle in head: + return True + except Exception: + logger.debug("Failed to scan session files for compression continuation", exc_info=True) + + return False + + def _repair_stale_pending(session) -> bool: """Recover a sidecar stuck with messages=[] and stale pending state. @@ -1872,6 +1938,18 @@ def _repair_stale_pending(session) -> bool: or not _seen_stream_id or _seen_stream_id in _active_stream_ids()): return False + if getattr(session, 'pre_compression_snapshot', False): + logger.debug( + "_repair_stale_pending: skipping pre-compression snapshot %s", + getattr(session, 'session_id', '?'), + ) + return False + if _has_compression_continuation(session): + logger.debug( + "_repair_stale_pending: skipping compression parent %s with continuation", + getattr(session, 'session_id', '?'), + ) + return False # Grace-period guard: bail if the turn is too fresh to be a real crash. # Falsy pending_started_at (None, 0, missing) means "old enough" — preserve diff --git a/api/streaming.py b/api/streaming.py index 6c70234d..7800ed06 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -5197,6 +5197,14 @@ def _run_agent_streaming( # the write when the file already contains up-to-date data # (i.e. it was just saved by a checkpoint). _preserve_pre_compression_snapshot(s, old_sid) + # The continuation is the live/tip session, not another archived + # snapshot. If the in-memory object was itself loaded from a + # pre-compression snapshot (possible on repeated compression chains + # or stale-cache repair paths), _preserve_pre_compression_snapshot() + # intentionally restores that old flag; clear it before saving the + # new continuation so sidebar/discoverability code does not hide the + # session that owns the completed turn. + s.pre_compression_snapshot = False # Always link the continuation session to its immediate predecessor # (the preserved snapshot). This OVERRIDES any prior # parent_session_id because the new continuation IS the next link diff --git a/tests/test_compression_snapshot_runtime_clear.py b/tests/test_compression_snapshot_runtime_clear.py index 6ee5bd50..56ca8deb 100644 --- a/tests/test_compression_snapshot_runtime_clear.py +++ b/tests/test_compression_snapshot_runtime_clear.py @@ -95,3 +95,22 @@ def test_preserve_pre_compression_snapshot_load_and_mark_branch_clears_runtime_f assert saved["pending_user_message"] is None assert saved["pending_attachments"] == [] assert saved["pending_started_at"] is None + + +def test_preserve_pre_compression_snapshot_does_not_leave_continuation_marked_as_snapshot(tmp_path, monkeypatch): + """A continuation loaded from an old snapshot must not remain hidden.""" + monkeypatch.setattr(streaming, "SESSION_DIR", tmp_path) + (tmp_path / "old_session.json").write_text(json.dumps({"messages": []}), encoding="utf-8") + session = FakeSession() + session.pre_compression_snapshot = True + + streaming._preserve_pre_compression_snapshot(session, "old_session") + # The helper archives the parent and restores the incoming object state. + # The streaming compression path must clear this before saving the child. + assert session.pre_compression_snapshot is True + + session.pre_compression_snapshot = False + session.save(touch_updated_at=False) + continuation = json.loads((tmp_path / "new_session.json").read_text(encoding="utf-8")) + assert continuation["pre_compression_snapshot"] is False + diff --git a/tests/test_session_lost_response_regression.py b/tests/test_session_lost_response_regression.py index 3c9707b9..9f8b63c7 100644 --- a/tests/test_session_lost_response_regression.py +++ b/tests/test_session_lost_response_regression.py @@ -536,3 +536,43 @@ def test_marker_demotes_after_giveup_seconds(hermes_home, monkeypatch): assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING _assert_retry_meta_removed(marker) assert append_calls == 0 + + +def test_repair_stale_pending_skips_pre_compression_snapshot_parent(hermes_home): + """Archived compression parents must not get synthetic interrupt markers.""" + s = _make_dead_stream_session("compressed_parent", stream_id="dead-stream") + s.pre_compression_snapshot = True + original_messages = list(s.messages) + + assert models._repair_stale_pending(s) is False + + assert s.messages == original_messages + assert s.active_stream_id == "dead-stream" + assert s.pending_user_message + + +def test_repair_stale_pending_skips_parent_when_continuation_exists(hermes_home): + """Compression old→new rotation owns the turn in the child, not the old parent.""" + parent = _make_dead_stream_session("compression_parent", stream_id="rotated-stream") + child = Session( + session_id="compression_child", + title="Continuation", + parent_session_id="compression_parent", + # Pin the production regression: older code could accidentally save the + # child with pre_compression_snapshot=True, but its parent link still + # proves the parent must not be repaired as a lost standalone turn. + pre_compression_snapshot=True, + messages=[ + {"role": "user", "content": "ok, push beide", "timestamp": 10}, + {"role": "assistant", "content": "done", "timestamp": 11}, + ], + ) + child.save() + original_messages = list(parent.messages) + + assert models._repair_stale_pending(parent) is False + + assert parent.messages == original_messages + assert parent.active_stream_id == "rotated-stream" + assert parent.pending_user_message +