fix(session): preserve retry budget while journal is still arriving

This commit is contained in:
Isla Liu
2026-05-20 20:49:43 +08:00
parent d5a185d9c6
commit 2a303de2a3
2 changed files with 98 additions and 6 deletions
+63 -5
View File
@@ -957,6 +957,39 @@ def _run_journal_has_visible_output(session, stream_id: str | None) -> bool:
return False
def _journal_is_still_arriving(session, stream_id: str | None) -> bool:
"""Return True for journals that may become visible on a later read.
`read_run_events()` deliberately collapses missing files and empty files
into an empty event list, so the lazy retry path needs a small filesystem
visibility check to avoid burning all retry attempts while WSL2 / network
filesystems are still surfacing the journal. Non-empty journals are treated
as sealed enough for retry-budget accounting; if they contain no visible
output, the normal capped give-up path handles them.
"""
if not stream_id:
return False
try:
from api.run_journal import _run_path, latest_run_summary
path = _run_path(session.session_id, stream_id)
summary = latest_run_summary(session.session_id, stream_id)
if summary.get('terminal'):
return False
try:
return (not path.exists()) or path.stat().st_size == 0
except OSError:
return True
except Exception:
logger.debug(
"Session %s: failed to classify journal visibility for stream %s",
getattr(session, 'session_id', '?'),
stream_id,
exc_info=True,
)
return False
def _append_journaled_partial_output(
session,
stream_id: str | None,
@@ -1137,7 +1170,10 @@ def _append_journaled_partial_output(
# `_append_journaled_partial_output` with `dedupe_existing=True`. On
# success the marker is promoted in place to the recovered-output
# wording, the journaled rows are reordered to sit above the marker,
# and all retry meta is stripped. On failure attempts is incremented.
# and all retry meta is stripped. If the journal is still missing or
# zero-byte, the retry is a no-op and does not consume attempt budget.
# Terminal/non-useful journals consume attempt budget and can demote
# immediately at the max-attempt cap.
# * After `_JOURNAL_RETRY_MAX_ATTEMPTS` failed retries or
# `_JOURNAL_RETRY_GIVEUP_SECONDS` of wall-clock age, the marker is
# demoted to the neutral wording ("Partial output may have been lost.")
@@ -1245,7 +1281,9 @@ def _try_retry_journal_recovery_in_place(session) -> bool:
logger.debug("lazy journal-retry already running for session %s", sid)
return False
try:
return _retry_journal_recovery_in_place(session)
return _retry_journal_recovery_in_place(
session, preserve_arriving_budget=True,
)
finally:
lock.release()
with _JOURNAL_RETRY_LOCKS_GUARD:
@@ -1253,7 +1291,11 @@ def _try_retry_journal_recovery_in_place(session) -> bool:
_JOURNAL_RETRY_LOCKS.pop(sid, None)
def _retry_journal_recovery_in_place(session) -> bool:
def _retry_journal_recovery_in_place(
session,
*,
preserve_arriving_budget: bool = False,
) -> bool:
"""Re-attempt run-journal recovery for the most recent pending marker.
Returns True if the marker was promoted to the recovered-output wording.
@@ -1338,12 +1380,28 @@ def _retry_journal_recovery_in_place(session) -> bool:
attempts,
)
return True
msg['_journal_retry_attempts'] = attempts + 1
if (
preserve_arriving_budget
and _journal_is_still_arriving(session, stream_id)
):
logger.debug(
"Session %s: journal for stream %s still arriving; "
"preserving retry budget",
getattr(session, 'session_id', '?'),
stream_id,
)
return False
next_attempts = attempts + 1
if next_attempts >= _JOURNAL_RETRY_MAX_ATTEMPTS:
msg['content'] = _INTERRUPTED_NEUTRAL_WORDING
_strip_journal_retry_meta(msg)
else:
msg['_journal_retry_attempts'] = next_attempts
try:
session.save(touch_updated_at=False)
except Exception:
logger.debug(
"save() failed while incrementing retry counter for session %s",
"save() failed while updating retry counter for session %s",
getattr(session, 'session_id', '?'),
exc_info=True,
)
+35 -1
View File
@@ -240,7 +240,7 @@ def test_concurrent_get_session_serializes_lazy_journal_retry(hermes_home, monke
counter_lock = threading.Lock()
calls = 0
def slow_retry(session):
def slow_retry(session, *, preserve_arriving_budget=False):
nonlocal calls
with counter_lock:
calls += 1
@@ -260,3 +260,37 @@ def test_concurrent_get_session_serializes_lazy_journal_retry(hermes_home, monke
assert calls == 1
def test_still_arriving_journal_does_not_consume_retry_budget(hermes_home, monkeypatch):
sid = "retry_arriving_sid"
stream_id = "retry_arriving_stream"
s = _make_pending_retry_session(sid, stream_id=stream_id)
models.SESSIONS[sid] = s
monkeypatch.setattr(models, "_append_journaled_partial_output", lambda *a, **kw: False)
monkeypatch.setattr(models, "_journal_is_still_arriving", lambda *a, **kw: True)
for _ in range(20):
assert models.get_session(sid) is s
marker = s.messages[-1]
assert marker["_journal_retry_attempts"] == 0
assert marker["_pending_journal_recovery"] is True
assert marker["content"] == models._INTERRUPTED_PENDING_RETRY_WORDING
def test_sealed_empty_journal_consumes_retry_budget_and_demotes_at_max(hermes_home, monkeypatch):
sid = "retry_sealed_sid"
stream_id = "retry_sealed_stream"
s = _make_pending_retry_session(sid, stream_id=stream_id)
s.messages[-1]["_journal_retry_attempts"] = models._JOURNAL_RETRY_MAX_ATTEMPTS - 1
append_run_event(sid, stream_id, "stream_end", {})
models.SESSIONS[sid] = s
assert models.get_session(sid) is s
marker = s.messages[-1]
assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING
_assert_retry_meta_removed(marker)
assert not any(m.get("_recovered_from_run_journal") for m in s.messages)