diff --git a/api/streaming.py b/api/streaming.py index 09022ff2..ff1e4a07 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -3946,6 +3946,14 @@ def _run_agent_streaming( _ckpt_thread.join(timeout=15) _lock_ctx = _agent_lock if _agent_lock is not None else contextlib.nullcontext() with _lock_ctx: + if not ephemeral and not _stream_writeback_is_current(s, stream_id): + logger.info( + "Skipping stale stream self-heal writeback for session %s stream %s; active_stream_id=%s", + getattr(s, 'session_id', session_id), + stream_id, + getattr(s, 'active_stream_id', None), + ) + return _result_messages = _heal_result.get('messages') or _previous_context_messages _next_context_messages = _restore_reasoning_metadata( _previous_context_messages, _result_messages, @@ -3987,6 +3995,14 @@ def _run_agent_streaming( # API calls so the LLM never sees its own error as prior context on the next turn. _lock_ctx = _agent_lock if _agent_lock is not None else contextlib.nullcontext() with _lock_ctx: + if not ephemeral and not _stream_writeback_is_current(s, stream_id): + logger.info( + "Skipping stale stream error writeback for session %s stream %s; active_stream_id=%s", + getattr(s, 'session_id', session_id), + stream_id, + getattr(s, 'active_stream_id', None), + ) + return _materialize_pending_user_turn_before_error(s) s.active_stream_id = None s.pending_user_message = None diff --git a/tests/test_stale_stream_writeback.py b/tests/test_stale_stream_writeback.py index 03aa55da..1a4d2398 100644 --- a/tests/test_stale_stream_writeback.py +++ b/tests/test_stale_stream_writeback.py @@ -87,3 +87,29 @@ def test_success_path_checks_stream_ownership_before_persisting_result(): assert compression_pos != -1 assert guard_pos < result_merge_pos assert guard_pos < compression_pos + + +def test_self_heal_retry_success_checks_stream_ownership_before_writeback(): + src = Path("api/streaming.py").read_text(encoding="utf-8") + start = src.index("logger.info('[webui] self-heal (except path): retrying stream") + end = src.index("logger.info('[webui] self-heal (except path): retry succeeded')", start) + block = src[start:end] + guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):" + + assert guard in block + assert block.index(guard) < block.index("_result_messages = _heal_result.get('messages') or _previous_context_messages") + assert block.index(guard) < block.index("s.save()") + + +def test_outer_exception_path_checks_stream_ownership_before_error_writeback(): + src = Path("api/streaming.py").read_text(encoding="utf-8") + outer_error_payload = src.index("_error_payload = _provider_error_payload(err_str, _exc_type, _exc_hint)") + start = src.index("# Persist the error so it survives page reload.", outer_error_payload) + end = src.index("put('apperror', _error_payload)", start) + block = src[start:end] + guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):" + + assert guard in block + assert block.index(guard) < block.index("_materialize_pending_user_turn_before_error(s)") + assert block.index(guard) < block.index("s.active_stream_id = None") + assert block.index(guard) < block.index("s.messages.append(_error_message)")