From 9ea4f1145dac0d98a435f7ddc3a3a112b410240b Mon Sep 17 00:00:00 2001 From: Frank Song Date: Wed, 13 May 2026 10:23:03 +0800 Subject: [PATCH] Fix stale stream exception writeback guards --- CHANGELOG.md | 1 + api/streaming.py | 16 ++++++++++++++++ tests/test_stale_stream_writeback.py | 26 ++++++++++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d737b24..7ca359c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Fixed +- Stale stream ownership checks now also cover the credential self-heal retry success path and the outer exception error persistence path, preventing an old worker from saving retry results or error markers after the active stream has rotated (refs #2154). - **PR #2136** by @LumenYoung — Stale stream writebacks no longer poison the active session transcript. `cancel_stream()` intentionally clears `active_stream_id` early so the UI can accept a follow-up turn while an old worker is unwinding — but the old worker could still return later from `run_conversation()` and persist its stale result over the newer transcript, causing visible transcript / turn journal / `state.db` to disagree (especially around cancel+retry on compressed continuations). Adds a single-line ownership check `_stream_writeback_is_current(session, stream_id)` (token equality against `session.active_stream_id`) and short-circuits both finalize paths: the success path in `_run_agent_streaming` and the cancel-handler path in `cancel_stream()`. When the stream no longer owns the writeback, both paths log `Skipping stale stream/cancel writeback` and return cleanly without persisting. 89-line regression suite in `tests/test_stale_stream_writeback.py`; companion updates to `tests/test_issue1361_cancel_data_loss.py` and `tests/test_sprint42.py` for the new return-without-persist behavior. ### Added diff --git a/api/streaming.py b/api/streaming.py index 09022ff2..ff1e4a07 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -3946,6 +3946,14 @@ def _run_agent_streaming( _ckpt_thread.join(timeout=15) _lock_ctx = _agent_lock if _agent_lock is not None else contextlib.nullcontext() with _lock_ctx: + if not ephemeral and not _stream_writeback_is_current(s, stream_id): + logger.info( + "Skipping stale stream self-heal writeback for session %s stream %s; active_stream_id=%s", + getattr(s, 'session_id', session_id), + stream_id, + getattr(s, 'active_stream_id', None), + ) + return _result_messages = _heal_result.get('messages') or _previous_context_messages _next_context_messages = _restore_reasoning_metadata( _previous_context_messages, _result_messages, @@ -3987,6 +3995,14 @@ def _run_agent_streaming( # API calls so the LLM never sees its own error as prior context on the next turn. _lock_ctx = _agent_lock if _agent_lock is not None else contextlib.nullcontext() with _lock_ctx: + if not ephemeral and not _stream_writeback_is_current(s, stream_id): + logger.info( + "Skipping stale stream error writeback for session %s stream %s; active_stream_id=%s", + getattr(s, 'session_id', session_id), + stream_id, + getattr(s, 'active_stream_id', None), + ) + return _materialize_pending_user_turn_before_error(s) s.active_stream_id = None s.pending_user_message = None diff --git a/tests/test_stale_stream_writeback.py b/tests/test_stale_stream_writeback.py index 03aa55da..1a4d2398 100644 --- a/tests/test_stale_stream_writeback.py +++ b/tests/test_stale_stream_writeback.py @@ -87,3 +87,29 @@ def test_success_path_checks_stream_ownership_before_persisting_result(): assert compression_pos != -1 assert guard_pos < result_merge_pos assert guard_pos < compression_pos + + +def test_self_heal_retry_success_checks_stream_ownership_before_writeback(): + src = Path("api/streaming.py").read_text(encoding="utf-8") + start = src.index("logger.info('[webui] self-heal (except path): retrying stream") + end = src.index("logger.info('[webui] self-heal (except path): retry succeeded')", start) + block = src[start:end] + guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):" + + assert guard in block + assert block.index(guard) < block.index("_result_messages = _heal_result.get('messages') or _previous_context_messages") + assert block.index(guard) < block.index("s.save()") + + +def test_outer_exception_path_checks_stream_ownership_before_error_writeback(): + src = Path("api/streaming.py").read_text(encoding="utf-8") + outer_error_payload = src.index("_error_payload = _provider_error_payload(err_str, _exc_type, _exc_hint)") + start = src.index("# Persist the error so it survives page reload.", outer_error_payload) + end = src.index("put('apperror', _error_payload)", start) + block = src[start:end] + guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):" + + assert guard in block + assert block.index(guard) < block.index("_materialize_pending_user_turn_before_error(s)") + assert block.index(guard) < block.index("s.active_stream_id = None") + assert block.index(guard) < block.index("s.messages.append(_error_message)")