fix: defer stale stream repair for active workers

This commit is contained in:
ai-ag2026
2026-05-28 09:33:40 +02:00
parent 5f42e87aa9
commit ce59e7ca20
2 changed files with 92 additions and 0 deletions
+36
View File
@@ -1018,6 +1018,42 @@ def _clear_stale_stream_state(session) -> bool:
stream_alive = stream_id in STREAMS
if stream_alive:
return False
try:
from api import config as _live_config
with _live_config.ACTIVE_RUNS_LOCK:
worker_alive = stream_id in (_live_config.ACTIVE_RUNS or {})
except Exception:
worker_alive = False
if worker_alive:
logger.debug(
"_clear_stale_stream_state: stream %s for session %s missing SSE channel "
"but worker bookkeeping is still active; deferring stale cleanup",
stream_id,
getattr(session, "session_id", "?"),
)
return False
grace_seconds = 30.0
try:
from api.models import _REPAIR_STALE_PENDING_GRACE_SECONDS
grace_seconds = float(_REPAIR_STALE_PENDING_GRACE_SECONDS)
pending_started_at = getattr(session, "pending_started_at", None)
pending_age = time.time() - float(pending_started_at) if pending_started_at else None
except Exception:
pending_age = None
if (
getattr(session, "pending_user_message", None)
and pending_age is not None
and pending_age < grace_seconds
):
logger.debug(
"_clear_stale_stream_state: stream %s for session %s missing SSE channel "
"but pending turn is %.1fs old; waiting for %.1fs stale-repair grace",
stream_id,
getattr(session, "session_id", "?"),
pending_age,
grace_seconds,
)
return False
# ── #1558 P0 safety: if we were handed a metadata-only stub, reload the
# full session before touching persisted state. The original
+56
View File
@@ -1,5 +1,6 @@
import queue
import threading
import time
from pathlib import Path
from unittest.mock import Mock
@@ -24,12 +25,14 @@ def _isolate_sessions(tmp_path, monkeypatch):
config.STREAMS.clear()
config.CANCEL_FLAGS.clear()
config.AGENT_INSTANCES.clear()
config.ACTIVE_RUNS.clear()
config.SESSION_AGENT_LOCKS.clear()
yield
models.SESSIONS.clear()
config.STREAMS.clear()
config.CANCEL_FLAGS.clear()
config.AGENT_INSTANCES.clear()
config.ACTIVE_RUNS.clear()
config.SESSION_AGENT_LOCKS.clear()
@@ -75,6 +78,59 @@ def test_cancel_stream_does_not_append_marker_after_stream_ownership_rotated():
assert all(m.get("content") != "*Task cancelled.*" for m in s.messages)
def test_stale_stream_clear_skips_active_worker_when_sse_channel_is_gone():
import api.routes as routes
sid = "active_worker_missing_sse"
stream_id = "live-worker-stream"
s = Session(
session_id=sid,
title="Active worker missing SSE",
messages=[{"role": "user", "content": "previous prompt"}],
)
s.active_stream_id = stream_id
s.pending_user_message = "new prompt"
s.pending_started_at = time.time()
s.save()
models.SESSIONS[sid] = s
config.register_active_run(stream_id, session_id=sid, phase="running")
assert routes._clear_stale_stream_state(s) is False
assert s.active_stream_id == stream_id
assert s.pending_user_message == "new prompt"
assert s.pending_started_at is not None
assert [m["content"] for m in s.messages] == ["previous prompt"]
assert all(not m.get("_error") for m in s.messages)
def test_stale_stream_clear_skips_fresh_pending_turn_inside_grace_window(monkeypatch):
import api.routes as routes
sid = "fresh_pending_missing_sse"
stream_id = "fresh-pending-stream"
s = Session(
session_id=sid,
title="Fresh pending missing SSE",
messages=[{"role": "user", "content": "previous prompt"}],
)
s.active_stream_id = stream_id
s.pending_user_message = "new prompt"
s.pending_started_at = 1000.0
s.save()
models.SESSIONS[sid] = s
monkeypatch.setattr(routes.time, "time", lambda: 1005.0)
assert routes._clear_stale_stream_state(s) is False
assert s.active_stream_id == stream_id
assert s.pending_user_message == "new prompt"
assert s.pending_started_at == 1000.0
assert [m["content"] for m in s.messages] == ["previous prompt"]
assert all(not m.get("_error") for m in s.messages)
def test_success_path_checks_stream_ownership_before_persisting_result():
src = Path("api/streaming.py").read_text(encoding="utf-8")
guard = "if not ephemeral and not _stream_writeback_is_current(s, stream_id):"