feat(kanban): add respawn guard to block repeat worker storms

Salvages #27484 by @fardoche6. Adds a respawn guard that skips worker
spawn for tasks where:
- a recent run already succeeded (recent_success — within guard window)
- the previous run hit a quota/auth error (blocker_auth, also auto-blocks)
- a recent task comment includes a GitHub PR URL (active_pr)

The guard prevents repeat worker storms on the same bug/task. Includes
the contributor's review-findings fixup (regex hardening, observability,
auth coverage).

Resolved a small DispatchResult conflict alongside main's 'stale' field;
kept both. Authorship preserved via rebase merge.
This commit is contained in:
fardoche6
2026-05-18 21:22:26 -07:00
committed by Teknium
parent 341912c224
commit 264e85b3dd
2 changed files with 366 additions and 0 deletions
+119
View File
@@ -3455,6 +3455,32 @@ DEFAULT_LOG_BACKUP_COUNT = 1
# and call kanban_block/kanban_complete before max_runtime_seconds kills it.
KANBAN_TERMINAL_TIMEOUT_GRACE_SECONDS = 30
# ---------------------------------------------------------------------------
# Respawn guard constants
# ---------------------------------------------------------------------------
# Patterns in last_failure_error that indicate a quota / auth blocker.
# These errors won't resolve by retrying immediately — auto-block instead.
_RESPAWN_BLOCKER_RE = re.compile(
r"\b(quota|rate[\s_\-]?limit|429|403|auth\w*|"
r"unauthorized|forbidden|billing|subscription|"
r"access[\s_]denied|permission[\s_]denied|"
r"invalid[\s_]api[\s_]key)\b",
re.IGNORECASE,
)
# Within this window a completed run counts as "recent proof"; don't re-spawn.
_RESPAWN_GUARD_SUCCESS_WINDOW = 3600 # 1 hour
# Within this window a GitHub PR URL in a comment blocks re-spawn.
_RESPAWN_GUARD_PR_WINDOW = 86400 # 24 hours
# Pattern matching a GitHub PR URL in task comments.
_RESPAWN_GUARD_PR_URL_RE = re.compile(
r"https?://github\.com/[^/\s]+/[^/\s]+/pull/\d+",
re.IGNORECASE,
)
@dataclass
class DispatchResult:
@@ -3483,6 +3509,12 @@ class DispatchResult:
stale: list[str] = field(default_factory=list)
"""Task ids reclaimed because no progress (heartbeat) was seen
within ``dispatch_stale_timeout_seconds``."""
respawn_guarded: list[tuple[str, str]] = field(default_factory=list)
"""Tasks skipped by the respawn guard, as ``(task_id, reason)`` pairs.
Reasons: ``"blocker_auth"`` (quota/auth error — also auto-blocked),
``"recent_success"`` (completed run within guard window),
``"active_pr"`` (GitHub PR URL in a recent comment)."""
# Bounded registry of recently-reaped worker child exits, populated by the
@@ -4358,6 +4390,67 @@ def _clear_failure_counter(conn: sqlite3.Connection, task_id: str) -> None:
_clear_spawn_failures = _clear_failure_counter
def check_respawn_guard(conn: sqlite3.Connection, task_id: str) -> Optional[str]:
"""Return a guard reason if ``task_id`` should NOT be re-spawned, else None.
Called per ready task in ``dispatch_once`` before any claim attempt.
Checks in priority order:
``"blocker_auth"``
The task's last failure error matches a quota / authentication
pattern. Retrying immediately will not help; the dispatcher
should auto-block the task to stop the respawn cycle.
``"recent_success"``
A completed run exists within ``_RESPAWN_GUARD_SUCCESS_WINDOW``
seconds. Useful work already succeeded for this task; wait for
human review rather than immediately re-spawning.
``"active_pr"``
A GitHub PR URL appears in a recent task comment (within
``_RESPAWN_GUARD_PR_WINDOW`` seconds). A prior worker already
opened a PR; re-spawning risks a duplicate PR on the same task.
Stale / dead claim locks are NOT a guard reason — they are handled
by ``release_stale_claims`` and ``detect_crashed_workers`` which
reset the task to ``ready`` only after verifying the lock is
genuinely dead (no live PID on this host).
"""
row = conn.execute(
"SELECT last_failure_error FROM tasks WHERE id = ?",
(task_id,),
).fetchone()
if row is None:
return None
# 1. Quota / auth blocker: retrying immediately will not help.
err = row["last_failure_error"]
if err and _RESPAWN_BLOCKER_RE.search(err):
return "blocker_auth"
now = int(time.time())
# 2. Completed run within guard window — proof of recent success.
cutoff = now - _RESPAWN_GUARD_SUCCESS_WINDOW
if conn.execute(
"SELECT id FROM task_runs "
"WHERE task_id = ? AND outcome = 'completed' AND ended_at >= ?",
(task_id, cutoff),
).fetchone():
return "recent_success"
# 3. GitHub PR URL in a recent comment — prior worker already opened a PR.
pr_cutoff = now - _RESPAWN_GUARD_PR_WINDOW
for c in conn.execute(
"SELECT body FROM task_comments WHERE task_id = ? AND created_at >= ?",
(task_id, pr_cutoff),
).fetchall():
if c["body"] and _RESPAWN_GUARD_PR_URL_RE.search(c["body"]):
return "active_pr"
return None
def has_spawnable_ready(conn: sqlite3.Connection) -> bool:
"""Return True iff there is at least one ready+assigned+unclaimed task
whose assignee maps to a real Hermes profile.
@@ -4569,6 +4662,32 @@ def dispatch_once(
# of human-pulled work.
result.skipped_nonspawnable.append(row["id"])
continue
# Respawn guard: refuse to re-spawn when useful work is already
# in-flight/recent, or when the last failure is a deterministic
# blocker (quota / auth) that retrying won't resolve.
guard_reason = check_respawn_guard(conn, row["id"])
if guard_reason is not None:
if guard_reason == "blocker_auth" and not dry_run:
# Auto-block to stop the cycle — quota/auth errors are
# deterministic and retrying immediately wastes quota.
# block_task emits its own "blocked" event, so no
# additional respawn_guarded event is needed here.
if block_task(conn, row["id"], reason=f"respawn_guard: {guard_reason}"):
result.auto_blocked.append(row["id"])
else:
result.respawn_guarded.append((row["id"], guard_reason))
else:
result.respawn_guarded.append((row["id"], guard_reason))
# Emit an event so operators can see why the task was
# skipped when reading `hermes kanban tail` — without
# this the task appears stuck in ready with no diagnosis.
if not dry_run:
with write_txn(conn):
_append_event(
conn, row["id"], "respawn_guarded",
{"reason": guard_reason},
)
continue
if dry_run:
result.spawned.append((row["id"], row["assignee"], ""))
continue
+247
View File
@@ -974,6 +974,253 @@ def test_dispatch_reclaims_stale_before_spawning(kanban_home):
assert res.reclaimed == 1
# ---------------------------------------------------------------------------
# Respawn guard (check_respawn_guard + dispatch_once integration)
# ---------------------------------------------------------------------------
def test_respawn_guard_none_on_fresh_task(kanban_home):
"""A fresh task with no failures or runs is not guarded."""
with kb.connect() as conn:
t = kb.create_task(conn, title="fresh", assignee="alice")
reason = kb.check_respawn_guard(conn, t)
assert reason is None
def test_respawn_guard_blocker_auth_on_quota_error(kanban_home):
"""'quota' in last_failure_error triggers blocker_auth."""
with kb.connect() as conn:
t = kb.create_task(conn, title="quota-task", assignee="alice")
conn.execute(
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
("API quota exceeded: rate limit hit", t),
)
reason = kb.check_respawn_guard(conn, t)
assert reason == "blocker_auth"
def test_respawn_guard_blocker_auth_on_auth_error(kanban_home):
"""'unauthorized' in last_failure_error triggers blocker_auth."""
with kb.connect() as conn:
t = kb.create_task(conn, title="auth-task", assignee="alice")
conn.execute(
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
("403 Forbidden: unauthorized to access resource", t),
)
reason = kb.check_respawn_guard(conn, t)
assert reason == "blocker_auth"
def test_respawn_guard_blocker_auth_on_authentication_error(kanban_home):
"""Full word 'Authentication' triggers blocker_auth (regex covers auth\\w*)."""
with kb.connect() as conn:
t = kb.create_task(conn, title="authn-task", assignee="alice")
conn.execute(
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
("Authentication failed: invalid credentials", t),
)
reason = kb.check_respawn_guard(conn, t)
assert reason == "blocker_auth"
def test_respawn_guard_blocker_auth_on_authorization_error(kanban_home):
"""Full word 'authorization' triggers blocker_auth (regex covers auth\\w*)."""
with kb.connect() as conn:
t = kb.create_task(conn, title="authz-task", assignee="alice")
conn.execute(
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
("authorization denied for scope repo", t),
)
reason = kb.check_respawn_guard(conn, t)
assert reason == "blocker_auth"
def test_respawn_guard_recent_success(kanban_home):
"""A completed run within the guard window triggers recent_success."""
with kb.connect() as conn:
t = kb.create_task(conn, title="already-done", assignee="alice")
now = int(time.time())
conn.execute(
"INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
"VALUES (?, 'done', 'completed', ?, ?)",
(t, now - 120, now - 60),
)
reason = kb.check_respawn_guard(conn, t)
assert reason == "recent_success"
def test_respawn_guard_stale_success_not_guarded(kanban_home):
"""A completed run outside the guard window does not block re-spawn."""
with kb.connect() as conn:
t = kb.create_task(conn, title="old-done", assignee="alice")
old_end = int(time.time()) - kb._RESPAWN_GUARD_SUCCESS_WINDOW - 60
conn.execute(
"INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
"VALUES (?, 'done', 'completed', ?, ?)",
(t, old_end - 300, old_end),
)
reason = kb.check_respawn_guard(conn, t)
assert reason is None
def test_respawn_guard_active_pr_in_comment(kanban_home):
"""A GitHub PR URL in a recent comment triggers active_pr."""
with kb.connect() as conn:
t = kb.create_task(conn, title="has-pr", assignee="alice")
kb.add_comment(
conn, t, "worker",
"PR created: https://github.com/totemx-AI/subsidysmart/pull/42",
)
reason = kb.check_respawn_guard(conn, t)
assert reason == "active_pr"
def test_respawn_guard_old_pr_comment_not_guarded(kanban_home):
"""A GitHub PR URL in a comment older than the PR window does not block."""
with kb.connect() as conn:
t = kb.create_task(conn, title="old-pr", assignee="alice")
old_ts = int(time.time()) - kb._RESPAWN_GUARD_PR_WINDOW - 60
conn.execute(
"INSERT INTO task_comments (task_id, author, body, created_at) "
"VALUES (?, 'worker', "
"'PR: https://github.com/totemx-AI/subsidysmart/pull/10', ?)",
(t, old_ts),
)
reason = kb.check_respawn_guard(conn, t)
assert reason is None
def test_dispatch_respawn_guard_auto_blocks_auth_error(
kanban_home, all_assignees_spawnable
):
"""dispatch_once auto-blocks a ready task whose last error is a blocker_auth."""
spawned_ids = []
def fake_spawn(task, workspace):
spawned_ids.append(task.id)
with kb.connect() as conn:
t = kb.create_task(conn, title="quota-storm", assignee="alice")
conn.execute(
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
("rate limit exceeded: 429 Too Many Requests", t),
)
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
assert t in res.auto_blocked
assert t not in spawned_ids
with kb.connect() as conn:
assert kb.get_task(conn, t).status == "blocked"
def test_dispatch_respawn_guard_skips_recent_success(
kanban_home, all_assignees_spawnable
):
"""dispatch_once skips (but does not block) a task with a recent completed run."""
spawned_ids = []
def fake_spawn(task, workspace):
spawned_ids.append(task.id)
with kb.connect() as conn:
t = kb.create_task(conn, title="recent-winner", assignee="alice")
now = int(time.time())
conn.execute(
"INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
"VALUES (?, 'done', 'completed', ?, ?)",
(t, now - 300, now - 60),
)
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
assert (t, "recent_success") in res.respawn_guarded
assert t not in spawned_ids
assert t not in res.auto_blocked
with kb.connect() as conn:
assert kb.get_task(conn, t).status == "ready" # not blocked, just skipped
def test_dispatch_respawn_guard_skips_active_pr(
kanban_home, all_assignees_spawnable
):
"""dispatch_once skips (but does not block) a task with an active PR comment."""
spawned_ids = []
def fake_spawn(task, workspace):
spawned_ids.append(task.id)
with kb.connect() as conn:
t = kb.create_task(conn, title="has-pr", assignee="alice")
kb.add_comment(
conn, t, "worker",
"Opened https://github.com/totemx-AI/subsidysmart/pull/99",
)
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
assert (t, "active_pr") in res.respawn_guarded
assert t not in spawned_ids
assert t not in res.auto_blocked
with kb.connect() as conn:
assert kb.get_task(conn, t).status == "ready"
def test_dispatch_respawn_guard_dry_run_no_auto_block(
kanban_home, all_assignees_spawnable
):
"""In dry_run mode, blocker_auth tasks are recorded in respawn_guarded (not auto-blocked)."""
with kb.connect() as conn:
t = kb.create_task(conn, title="dry-quota", assignee="alice")
conn.execute(
"UPDATE tasks SET last_failure_error = ? WHERE id = ?",
("quota exceeded", t),
)
res = kb.dispatch_once(conn, dry_run=True)
assert (t, "blocker_auth") in res.respawn_guarded
assert t not in res.auto_blocked
with kb.connect() as conn:
assert kb.get_task(conn, t).status == "ready" # dry_run: no writes
def test_dispatch_respawn_guard_allows_clean_task(
kanban_home, all_assignees_spawnable
):
"""A task with no guard triggers is spawned normally."""
spawned_ids = []
def fake_spawn(task, workspace):
spawned_ids.append(task.id)
with kb.connect() as conn:
t = kb.create_task(conn, title="clean-task", assignee="alice")
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
assert t in spawned_ids
assert not res.respawn_guarded
assert t not in res.auto_blocked
def test_dispatch_respawn_guard_emits_event_for_skipped_task(
kanban_home, all_assignees_spawnable
):
"""dispatch_once emits a respawn_guarded task_event so operators can diagnose stuck-ready tasks."""
with kb.connect() as conn:
t = kb.create_task(conn, title="event-check", assignee="alice")
now = int(time.time())
conn.execute(
"INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
"VALUES (?, 'done', 'completed', ?, ?)",
(t, now - 300, now - 60),
)
kb.dispatch_once(conn, spawn_fn=lambda task, ws: None)
events = kb.list_events(conn, t)
kinds = [e.kind for e in events]
assert "respawn_guarded" in kinds
guarded_evt = next(e for e in events if e.kind == "respawn_guarded")
# Event.payload is already parsed as a dict by list_events.
assert isinstance(guarded_evt.payload, dict)
assert guarded_evt.payload.get("reason") == "recent_success"
# ---------------------------------------------------------------------------
# Workspace resolution
# ---------------------------------------------------------------------------