fix: skip stale repair for compression parents

This commit is contained in:
ai-ag2026
2026-05-28 07:52:31 +02:00
parent 5528e2c579
commit 5f42e87aa9
5 changed files with 149 additions and 0 deletions
+4
View File
@@ -3,6 +3,10 @@
## [Unreleased]
### Fixed
- Compression parent sessions are no longer repaired as stale interrupted turns when a continuation already exists, preventing false "Response interrupted" markers and hidden continuation rows after auto-compression session rotation. (Refs #2361)
## [v0.51.152] — 2026-05-28 — Release DX (stage-batch34 — single-PR optional gateway-backed browser chat)
### Added
+78
View File
@@ -1850,6 +1850,72 @@ def _apply_core_sync_or_error_marker(
_REPAIR_STALE_PENDING_GRACE_SECONDS = 30
def _has_compression_continuation(session) -> bool:
"""Return True when ``session`` is an archived compression parent.
Context compression rotates the live WebUI session id: the old sidecar is
preserved for lineage while the new child owns the running/completed turn.
Stale-pending repair must not append an interruption marker to that old
parent just because its stream bookkeeping disappeared after the rotation.
"""
sid = getattr(session, 'session_id', None)
if not sid:
return False
def _row_is_continuation(row) -> bool:
if not isinstance(row, dict):
return False
child_sid = row.get('session_id')
if not child_sid or child_sid == sid:
return False
if row.get('parent_session_id') != sid:
return False
# Any child row is enough evidence that this pending state belongs to a
# compression lineage, not a dead standalone turn. The child may itself
# temporarily carry a bad pre_compression_snapshot flag from older code;
# do not filter it out here or the guard misses the exact regression.
return True
try:
with LOCK:
for child in SESSIONS.values():
if getattr(child, 'session_id', None) == sid:
continue
if getattr(child, 'parent_session_id', None) == sid:
return True
except Exception:
pass
try:
if SESSION_INDEX_FILE.exists():
entries = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
if isinstance(entries, list) and any(_row_is_continuation(e) for e in entries):
return True
except Exception:
logger.debug("Failed to inspect session index for compression continuation", exc_info=True)
# Index rows can lag behind rapid compression/save races. Fall back to a
# shallow JSON metadata scan; session files write parent_session_id before
# the messages array, so this avoids loading multi-MB transcripts.
try:
needle = f'"parent_session_id": "{sid}"'
for path in SESSION_DIR.glob('*.json'):
if path.name.startswith('_') or path.stem == sid:
continue
try:
head = path.read_text(encoding='utf-8', errors='ignore')[:4096]
except TypeError:
head = path.read_text(encoding='utf-8')[:4096]
except OSError:
continue
if needle in head:
return True
except Exception:
logger.debug("Failed to scan session files for compression continuation", exc_info=True)
return False
def _repair_stale_pending(session) -> bool:
"""Recover a sidecar stuck with messages=[] and stale pending state.
@@ -1872,6 +1938,18 @@ def _repair_stale_pending(session) -> bool:
or not _seen_stream_id
or _seen_stream_id in _active_stream_ids()):
return False
if getattr(session, 'pre_compression_snapshot', False):
logger.debug(
"_repair_stale_pending: skipping pre-compression snapshot %s",
getattr(session, 'session_id', '?'),
)
return False
if _has_compression_continuation(session):
logger.debug(
"_repair_stale_pending: skipping compression parent %s with continuation",
getattr(session, 'session_id', '?'),
)
return False
# Grace-period guard: bail if the turn is too fresh to be a real crash.
# Falsy pending_started_at (None, 0, missing) means "old enough" — preserve
+8
View File
@@ -5197,6 +5197,14 @@ def _run_agent_streaming(
# the write when the file already contains up-to-date data
# (i.e. it was just saved by a checkpoint).
_preserve_pre_compression_snapshot(s, old_sid)
# The continuation is the live/tip session, not another archived
# snapshot. If the in-memory object was itself loaded from a
# pre-compression snapshot (possible on repeated compression chains
# or stale-cache repair paths), _preserve_pre_compression_snapshot()
# intentionally restores that old flag; clear it before saving the
# new continuation so sidebar/discoverability code does not hide the
# session that owns the completed turn.
s.pre_compression_snapshot = False
# Always link the continuation session to its immediate predecessor
# (the preserved snapshot). This OVERRIDES any prior
# parent_session_id because the new continuation IS the next link
@@ -95,3 +95,22 @@ def test_preserve_pre_compression_snapshot_load_and_mark_branch_clears_runtime_f
assert saved["pending_user_message"] is None
assert saved["pending_attachments"] == []
assert saved["pending_started_at"] is None
def test_preserve_pre_compression_snapshot_does_not_leave_continuation_marked_as_snapshot(tmp_path, monkeypatch):
"""A continuation loaded from an old snapshot must not remain hidden."""
monkeypatch.setattr(streaming, "SESSION_DIR", tmp_path)
(tmp_path / "old_session.json").write_text(json.dumps({"messages": []}), encoding="utf-8")
session = FakeSession()
session.pre_compression_snapshot = True
streaming._preserve_pre_compression_snapshot(session, "old_session")
# The helper archives the parent and restores the incoming object state.
# The streaming compression path must clear this before saving the child.
assert session.pre_compression_snapshot is True
session.pre_compression_snapshot = False
session.save(touch_updated_at=False)
continuation = json.loads((tmp_path / "new_session.json").read_text(encoding="utf-8"))
assert continuation["pre_compression_snapshot"] is False
@@ -536,3 +536,43 @@ def test_marker_demotes_after_giveup_seconds(hermes_home, monkeypatch):
assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING
_assert_retry_meta_removed(marker)
assert append_calls == 0
def test_repair_stale_pending_skips_pre_compression_snapshot_parent(hermes_home):
"""Archived compression parents must not get synthetic interrupt markers."""
s = _make_dead_stream_session("compressed_parent", stream_id="dead-stream")
s.pre_compression_snapshot = True
original_messages = list(s.messages)
assert models._repair_stale_pending(s) is False
assert s.messages == original_messages
assert s.active_stream_id == "dead-stream"
assert s.pending_user_message
def test_repair_stale_pending_skips_parent_when_continuation_exists(hermes_home):
"""Compression old→new rotation owns the turn in the child, not the old parent."""
parent = _make_dead_stream_session("compression_parent", stream_id="rotated-stream")
child = Session(
session_id="compression_child",
title="Continuation",
parent_session_id="compression_parent",
# Pin the production regression: older code could accidentally save the
# child with pre_compression_snapshot=True, but its parent link still
# proves the parent must not be repaired as a lost standalone turn.
pre_compression_snapshot=True,
messages=[
{"role": "user", "content": "ok, push beide", "timestamp": 10},
{"role": "assistant", "content": "done", "timestamp": 11},
],
)
child.save()
original_messages = list(parent.messages)
assert models._repair_stale_pending(parent) is False
assert parent.messages == original_messages
assert parent.active_stream_id == "rotated-stream"
assert parent.pending_user_message