Merge pull request #2158 into stage-346

Fix stale stream exception writeback guards (closes #2154)

# Conflicts:
#	CHANGELOG.md
This commit is contained in:
Hermes Agent
2026-05-13 06:56:17 +00:00
2 changed files with 42 additions and 0 deletions
+16
View File
@@ -3946,6 +3946,14 @@ def _run_agent_streaming(
_ckpt_thread.join(timeout=15)
_lock_ctx = _agent_lock if _agent_lock is not None else contextlib.nullcontext()
with _lock_ctx:
if not ephemeral and not _stream_writeback_is_current(s, stream_id):
logger.info(
"Skipping stale stream self-heal writeback for session %s stream %s; active_stream_id=%s",
getattr(s, 'session_id', session_id),
stream_id,
getattr(s, 'active_stream_id', None),
)
return
_result_messages = _heal_result.get('messages') or _previous_context_messages
_next_context_messages = _restore_reasoning_metadata(
_previous_context_messages, _result_messages,
@@ -3987,6 +3995,14 @@ def _run_agent_streaming(
# API calls so the LLM never sees its own error as prior context on the next turn.
_lock_ctx = _agent_lock if _agent_lock is not None else contextlib.nullcontext()
with _lock_ctx:
if not ephemeral and not _stream_writeback_is_current(s, stream_id):
logger.info(
"Skipping stale stream error writeback for session %s stream %s; active_stream_id=%s",
getattr(s, 'session_id', session_id),
stream_id,
getattr(s, 'active_stream_id', None),
)
return
_materialize_pending_user_turn_before_error(s)
s.active_stream_id = None
s.pending_user_message = None
+26
View File
@@ -87,3 +87,29 @@ def test_success_path_checks_stream_ownership_before_persisting_result():
assert compression_pos != -1
assert guard_pos < result_merge_pos
assert guard_pos < compression_pos
def test_self_heal_retry_success_checks_stream_ownership_before_writeback():
src = Path("api/streaming.py").read_text(encoding="utf-8")
start = src.index("logger.info('[webui] self-heal (except path): retrying stream")
end = src.index("logger.info('[webui] self-heal (except path): retry succeeded')", start)
block = src[start:end]
guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):"
assert guard in block
assert block.index(guard) < block.index("_result_messages = _heal_result.get('messages') or _previous_context_messages")
assert block.index(guard) < block.index("s.save()")
def test_outer_exception_path_checks_stream_ownership_before_error_writeback():
src = Path("api/streaming.py").read_text(encoding="utf-8")
outer_error_payload = src.index("_error_payload = _provider_error_payload(err_str, _exc_type, _exc_hint)")
start = src.index("# Persist the error so it survives page reload.", outer_error_payload)
end = src.index("put('apperror', _error_payload)", start)
block = src[start:end]
guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):"
assert guard in block
assert block.index(guard) < block.index("_materialize_pending_user_turn_before_error(s)")
assert block.index(guard) < block.index("s.active_stream_id = None")
assert block.index(guard) < block.index("s.messages.append(_error_message)")