From ce59e7ca207b6051a7f1aa7a2aa66ec7f35e7adc Mon Sep 17 00:00:00 2001 From: ai-ag2026 <261867348+ai-ag2026@users.noreply.github.com> Date: Thu, 28 May 2026 09:33:40 +0200 Subject: [PATCH] fix: defer stale stream repair for active workers --- api/routes.py | 36 ++++++++++++++++++ tests/test_stale_stream_writeback.py | 56 ++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/api/routes.py b/api/routes.py index 8ee764df..3c6b6850 100644 --- a/api/routes.py +++ b/api/routes.py @@ -1018,6 +1018,42 @@ def _clear_stale_stream_state(session) -> bool: stream_alive = stream_id in STREAMS if stream_alive: return False + try: + from api import config as _live_config + with _live_config.ACTIVE_RUNS_LOCK: + worker_alive = stream_id in (_live_config.ACTIVE_RUNS or {}) + except Exception: + worker_alive = False + if worker_alive: + logger.debug( + "_clear_stale_stream_state: stream %s for session %s missing SSE channel " + "but worker bookkeeping is still active; deferring stale cleanup", + stream_id, + getattr(session, "session_id", "?"), + ) + return False + grace_seconds = 30.0 + try: + from api.models import _REPAIR_STALE_PENDING_GRACE_SECONDS + grace_seconds = float(_REPAIR_STALE_PENDING_GRACE_SECONDS) + pending_started_at = getattr(session, "pending_started_at", None) + pending_age = time.time() - float(pending_started_at) if pending_started_at else None + except Exception: + pending_age = None + if ( + getattr(session, "pending_user_message", None) + and pending_age is not None + and pending_age < grace_seconds + ): + logger.debug( + "_clear_stale_stream_state: stream %s for session %s missing SSE channel " + "but pending turn is %.1fs old; waiting for %.1fs stale-repair grace", + stream_id, + getattr(session, "session_id", "?"), + pending_age, + grace_seconds, + ) + return False # ── #1558 P0 safety: if we were handed a metadata-only stub, reload the # full session before touching persisted state. The original diff --git a/tests/test_stale_stream_writeback.py b/tests/test_stale_stream_writeback.py index 1a4d2398..54a22e82 100644 --- a/tests/test_stale_stream_writeback.py +++ b/tests/test_stale_stream_writeback.py @@ -1,5 +1,6 @@ import queue import threading +import time from pathlib import Path from unittest.mock import Mock @@ -24,12 +25,14 @@ def _isolate_sessions(tmp_path, monkeypatch): config.STREAMS.clear() config.CANCEL_FLAGS.clear() config.AGENT_INSTANCES.clear() + config.ACTIVE_RUNS.clear() config.SESSION_AGENT_LOCKS.clear() yield models.SESSIONS.clear() config.STREAMS.clear() config.CANCEL_FLAGS.clear() config.AGENT_INSTANCES.clear() + config.ACTIVE_RUNS.clear() config.SESSION_AGENT_LOCKS.clear() @@ -75,6 +78,59 @@ def test_cancel_stream_does_not_append_marker_after_stream_ownership_rotated(): assert all(m.get("content") != "*Task cancelled.*" for m in s.messages) +def test_stale_stream_clear_skips_active_worker_when_sse_channel_is_gone(): + import api.routes as routes + + sid = "active_worker_missing_sse" + stream_id = "live-worker-stream" + s = Session( + session_id=sid, + title="Active worker missing SSE", + messages=[{"role": "user", "content": "previous prompt"}], + ) + s.active_stream_id = stream_id + s.pending_user_message = "new prompt" + s.pending_started_at = time.time() + s.save() + models.SESSIONS[sid] = s + + config.register_active_run(stream_id, session_id=sid, phase="running") + + assert routes._clear_stale_stream_state(s) is False + + assert s.active_stream_id == stream_id + assert s.pending_user_message == "new prompt" + assert s.pending_started_at is not None + assert [m["content"] for m in s.messages] == ["previous prompt"] + assert all(not m.get("_error") for m in s.messages) + + +def test_stale_stream_clear_skips_fresh_pending_turn_inside_grace_window(monkeypatch): + import api.routes as routes + + sid = "fresh_pending_missing_sse" + stream_id = "fresh-pending-stream" + s = Session( + session_id=sid, + title="Fresh pending missing SSE", + messages=[{"role": "user", "content": "previous prompt"}], + ) + s.active_stream_id = stream_id + s.pending_user_message = "new prompt" + s.pending_started_at = 1000.0 + s.save() + models.SESSIONS[sid] = s + monkeypatch.setattr(routes.time, "time", lambda: 1005.0) + + assert routes._clear_stale_stream_state(s) is False + + assert s.active_stream_id == stream_id + assert s.pending_user_message == "new prompt" + assert s.pending_started_at == 1000.0 + assert [m["content"] for m in s.messages] == ["previous prompt"] + assert all(not m.get("_error") for m in s.messages) + + def test_success_path_checks_stream_ownership_before_persisting_result(): src = Path("api/streaming.py").read_text(encoding="utf-8") guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):"