diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py
index 7d31e8e2dd..80e8259a3b 100644
--- a/hermes_cli/kanban_db.py
+++ b/hermes_cli/kanban_db.py
@@ -3455,6 +3455,38 @@ DEFAULT_LOG_BACKUP_COUNT = 1
 # and call kanban_block/kanban_complete before max_runtime_seconds kills it.
 KANBAN_TERMINAL_TIMEOUT_GRACE_SECONDS = 30
 
+# ---------------------------------------------------------------------------
+# Respawn guard constants
+# ---------------------------------------------------------------------------
+
+# Patterns in last_failure_error that indicate a quota / auth blocker.
+# These errors won't resolve by retrying immediately — auto-block instead.
+# "auth" is matched only as a whole word, as authn/authz/auth_*, or as an
+# authenticat*/authoriz* form, so unrelated words such as "author" or
+# "authority" do not block a task. "rate limit" accepts a word suffix so
+# "rate limited" and "rate_limit_exceeded" are caught ("_" is a word
+# character, so a bare trailing \b would reject them).
+_RESPAWN_BLOCKER_RE = re.compile(
+    r"\b(quota|rate[\s_\-]?limit\w*|429|403|"
+    r"auth(?:enticat\w*|ori[sz]\w*|[nz]?(?:_\w*)?)|"
+    r"unauthorized|forbidden|billing|subscription|"
+    r"access[\s_]denied|permission[\s_]denied|"
+    r"invalid[\s_]api[\s_]key)\b",
+    re.IGNORECASE,
+)
+
+# Within this window a completed run counts as "recent proof"; don't re-spawn.
+_RESPAWN_GUARD_SUCCESS_WINDOW = 3600  # 1 hour
+
+# Within this window a GitHub PR URL in a comment blocks re-spawn.
+_RESPAWN_GUARD_PR_WINDOW = 86400  # 24 hours
+
+# Pattern matching a GitHub PR URL in task comments.
+_RESPAWN_GUARD_PR_URL_RE = re.compile(
+    r"https?://github\.com/[^/\s]+/[^/\s]+/pull/\d+",
+    re.IGNORECASE,
+)
+
 
 @dataclass
 class DispatchResult:
@@ -3483,6 +3515,12 @@ class DispatchResult:
     stale: list[str] = field(default_factory=list)
     """Task ids reclaimed because no progress (heartbeat) was seen within
     ``dispatch_stale_timeout_seconds``."""
+    respawn_guarded: list[tuple[str, str]] = field(default_factory=list)
+    """Tasks skipped by the respawn guard, as ``(task_id, reason)`` pairs.
+
+    Reasons: ``"blocker_auth"`` (quota/auth error that was not auto-blocked,
+    e.g. dry-run), ``"recent_success"`` (completed run within guard window),
+    ``"active_pr"`` (GitHub PR URL in a recent comment)."""
 
 
 # Bounded registry of recently-reaped worker child exits, populated by the
@@ -4358,6 +4396,71 @@ def _clear_failure_counter(conn: sqlite3.Connection, task_id: str) -> None:
 _clear_spawn_failures = _clear_failure_counter
 
 
+def check_respawn_guard(conn: sqlite3.Connection, task_id: str) -> Optional[str]:
+    """Return a guard reason if ``task_id`` should NOT be re-spawned, else None.
+
+    Called per ready task in ``dispatch_once`` before any claim attempt.
+    Checks in priority order:
+
+    ``"blocker_auth"``
+        The task's last failure error matches a quota / authentication
+        pattern. Retrying immediately will not help; the dispatcher
+        should auto-block the task to stop the respawn cycle.
+
+    ``"recent_success"``
+        A completed run exists within ``_RESPAWN_GUARD_SUCCESS_WINDOW``
+        seconds. Useful work already succeeded for this task; wait for
+        human review rather than immediately re-spawning.
+
+    ``"active_pr"``
+        A GitHub PR URL appears in a recent task comment (within
+        ``_RESPAWN_GUARD_PR_WINDOW`` seconds). A prior worker already
+        opened a PR; re-spawning risks a duplicate PR on the same task.
+
+    Stale / dead claim locks are NOT a guard reason — they are handled
+    by ``release_stale_claims`` and ``detect_crashed_workers`` which
+    reset the task to ``ready`` only after verifying the lock is
+    genuinely dead (no live PID on this host).
+
+    NOTE(review): ``last_failure_error`` is read as-is and is not cleared
+    here. Confirm that unblocking a task resets it; otherwise a
+    ``"blocker_auth"`` task is auto-blocked again on the next dispatch.
+    """
+    row = conn.execute(
+        "SELECT last_failure_error FROM tasks WHERE id = ?",
+        (task_id,),
+    ).fetchone()
+    if row is None:
+        return None
+
+    # 1. Quota / auth blocker: retrying immediately will not help.
+    err = row["last_failure_error"]
+    if err and _RESPAWN_BLOCKER_RE.search(err):
+        return "blocker_auth"
+
+    now = int(time.time())
+
+    # 2. Completed run within guard window — proof of recent success.
+    cutoff = now - _RESPAWN_GUARD_SUCCESS_WINDOW
+    if conn.execute(
+        "SELECT id FROM task_runs "
+        "WHERE task_id = ? AND outcome = 'completed' AND ended_at >= ?",
+        (task_id, cutoff),
+    ).fetchone():
+        return "recent_success"
+
+    # 3. GitHub PR URL in a recent comment — prior worker already opened a PR.
+    pr_cutoff = now - _RESPAWN_GUARD_PR_WINDOW
+    for c in conn.execute(
+        "SELECT body FROM task_comments WHERE task_id = ? AND created_at >= ?",
+        (task_id, pr_cutoff),
+    ).fetchall():
+        if c["body"] and _RESPAWN_GUARD_PR_URL_RE.search(c["body"]):
+            return "active_pr"
+
+    return None
+
+
 def has_spawnable_ready(conn: sqlite3.Connection) -> bool:
     """Return True iff there is at least one ready+assigned+unclaimed task
     whose assignee maps to a real Hermes profile.
@@ -4569,6 +4672,40 @@ def dispatch_once(
             # of human-pulled work.
             result.skipped_nonspawnable.append(row["id"])
             continue
+        # Respawn guard: refuse to re-spawn when useful work is already
+        # in-flight/recent, or when the last failure is a deterministic
+        # blocker (quota / auth) that retrying won't resolve.
+        guard_reason = check_respawn_guard(conn, row["id"])
+        if guard_reason is not None:
+            auto_blocked = False
+            if guard_reason == "blocker_auth" and not dry_run:
+                # Auto-block to stop the cycle — quota/auth errors are
+                # deterministic and retrying immediately wastes quota.
+                # block_task emits its own "blocked" event, so no
+                # additional respawn_guarded event is needed when it
+                # succeeds.
+                auto_blocked = bool(block_task(
+                    conn, row["id"], reason=f"respawn_guard: {guard_reason}",
+                ))
+            if auto_blocked:
+                result.auto_blocked.append(row["id"])
+            else:
+                result.respawn_guarded.append((row["id"], guard_reason))
+                # Emit an event so operators can see why the task was
+                # skipped when reading `hermes kanban tail` — without
+                # this the task appears stuck in ready with no diagnosis.
+                # This also covers a blocker_auth task whose auto-block
+                # did not take effect.
+                # NOTE(review): one event is appended per guarded task on
+                # every non-dry-run dispatch_once call while the guard
+                # holds; consider de-duplicating if that floods the log.
+                if not dry_run:
+                    with write_txn(conn):
+                        _append_event(
+                            conn, row["id"], "respawn_guarded",
+                            {"reason": guard_reason},
+                        )
+            continue
         if dry_run:
             result.spawned.append((row["id"], row["assignee"], ""))
             continue
diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py
index 5904c9f208..15c0767d2c 100644
--- a/tests/hermes_cli/test_kanban_db.py
+++ b/tests/hermes_cli/test_kanban_db.py
@@ -974,6 +974,277 @@ def test_dispatch_reclaims_stale_before_spawning(kanban_home):
     assert res.reclaimed == 1
 
 
+# ---------------------------------------------------------------------------
+# Respawn guard (check_respawn_guard + dispatch_once integration)
+# ---------------------------------------------------------------------------
+
+def test_respawn_guard_none_on_fresh_task(kanban_home):
+    """A fresh task with no failures or runs is not guarded."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="fresh", assignee="alice")
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason is None
+
+
+def test_respawn_guard_blocker_auth_on_quota_error(kanban_home):
+    """'quota' in last_failure_error triggers blocker_auth."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="quota-task", assignee="alice")
+        conn.execute(
+            "UPDATE tasks SET last_failure_error = ? WHERE id = ?",
+            ("API quota exceeded: rate limit hit", t),
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason == "blocker_auth"
+
+
+def test_respawn_guard_blocker_auth_on_auth_error(kanban_home):
+    """'unauthorized' in last_failure_error triggers blocker_auth."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="auth-task", assignee="alice")
+        conn.execute(
+            "UPDATE tasks SET last_failure_error = ? WHERE id = ?",
+            ("403 Forbidden: unauthorized to access resource", t),
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason == "blocker_auth"
+
+
+def test_respawn_guard_blocker_auth_on_authentication_error(kanban_home):
+    """Full word 'Authentication' triggers blocker_auth."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="authn-task", assignee="alice")
+        conn.execute(
+            "UPDATE tasks SET last_failure_error = ? WHERE id = ?",
+            ("Authentication failed: invalid credentials", t),
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason == "blocker_auth"
+
+
+def test_respawn_guard_blocker_auth_on_authorization_error(kanban_home):
+    """Full word 'authorization' triggers blocker_auth."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="authz-task", assignee="alice")
+        conn.execute(
+            "UPDATE tasks SET last_failure_error = ? WHERE id = ?",
+            ("authorization denied for scope repo", t),
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason == "blocker_auth"
+
+
+def test_respawn_guard_ignores_author_in_error(kanban_home):
+    """Words that merely start with 'auth' (author, authority) are not blockers."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="author-task", assignee="alice")
+        conn.execute(
+            "UPDATE tasks SET last_failure_error = ? WHERE id = ?",
+            ("git commit failed: author identity unknown", t),
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason is None
+
+
+def test_respawn_guard_blocker_auth_on_rate_limit_suffix(kanban_home):
+    """'rate_limit_exceeded' style spellings trigger blocker_auth."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="rl-task", assignee="alice")
+        conn.execute(
+            "UPDATE tasks SET last_failure_error = ? WHERE id = ?",
+            ("provider error: rate_limit_exceeded", t),
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason == "blocker_auth"
+
+
+def test_respawn_guard_recent_success(kanban_home):
+    """A completed run within the guard window triggers recent_success."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="already-done", assignee="alice")
+        now = int(time.time())
+        conn.execute(
+            "INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
+            "VALUES (?, 'done', 'completed', ?, ?)",
+            (t, now - 120, now - 60),
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason == "recent_success"
+
+
+def test_respawn_guard_stale_success_not_guarded(kanban_home):
+    """A completed run outside the guard window does not block re-spawn."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="old-done", assignee="alice")
+        old_end = int(time.time()) - kb._RESPAWN_GUARD_SUCCESS_WINDOW - 60
+        conn.execute(
+            "INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
+            "VALUES (?, 'done', 'completed', ?, ?)",
+            (t, old_end - 300, old_end),
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason is None
+
+
+def test_respawn_guard_active_pr_in_comment(kanban_home):
+    """A GitHub PR URL in a recent comment triggers active_pr."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="has-pr", assignee="alice")
+        kb.add_comment(
+            conn, t, "worker",
+            "PR created: https://github.com/totemx-AI/subsidysmart/pull/42",
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason == "active_pr"
+
+
+def test_respawn_guard_old_pr_comment_not_guarded(kanban_home):
+    """A GitHub PR URL in a comment older than the PR window does not block."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="old-pr", assignee="alice")
+        old_ts = int(time.time()) - kb._RESPAWN_GUARD_PR_WINDOW - 60
+        conn.execute(
+            "INSERT INTO task_comments (task_id, author, body, created_at) "
+            "VALUES (?, 'worker', "
+            "'PR: https://github.com/totemx-AI/subsidysmart/pull/10', ?)",
+            (t, old_ts),
+        )
+        reason = kb.check_respawn_guard(conn, t)
+    assert reason is None
+
+
+def test_dispatch_respawn_guard_auto_blocks_auth_error(
+    kanban_home, all_assignees_spawnable
+):
+    """dispatch_once auto-blocks a ready task whose last error is a blocker_auth."""
+    spawned_ids = []
+
+    def fake_spawn(task, workspace):
+        spawned_ids.append(task.id)
+
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="quota-storm", assignee="alice")
+        conn.execute(
+            "UPDATE tasks SET last_failure_error = ? WHERE id = ?",
+            ("rate limit exceeded: 429 Too Many Requests", t),
+        )
+        res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
+
+    assert t in res.auto_blocked
+    assert t not in spawned_ids
+    with kb.connect() as conn:
+        assert kb.get_task(conn, t).status == "blocked"
+
+
+def test_dispatch_respawn_guard_skips_recent_success(
+    kanban_home, all_assignees_spawnable
+):
+    """dispatch_once skips (but does not block) a task with a recent completed run."""
+    spawned_ids = []
+
+    def fake_spawn(task, workspace):
+        spawned_ids.append(task.id)
+
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="recent-winner", assignee="alice")
+        now = int(time.time())
+        conn.execute(
+            "INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
+            "VALUES (?, 'done', 'completed', ?, ?)",
+            (t, now - 300, now - 60),
+        )
+        res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
+
+    assert (t, "recent_success") in res.respawn_guarded
+    assert t not in spawned_ids
+    assert t not in res.auto_blocked
+    with kb.connect() as conn:
+        assert kb.get_task(conn, t).status == "ready"  # not blocked, just skipped
+
+
+def test_dispatch_respawn_guard_skips_active_pr(
+    kanban_home, all_assignees_spawnable
+):
+    """dispatch_once skips (but does not block) a task with an active PR comment."""
+    spawned_ids = []
+
+    def fake_spawn(task, workspace):
+        spawned_ids.append(task.id)
+
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="has-pr", assignee="alice")
+        kb.add_comment(
+            conn, t, "worker",
+            "Opened https://github.com/totemx-AI/subsidysmart/pull/99",
+        )
+        res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
+
+    assert (t, "active_pr") in res.respawn_guarded
+    assert t not in spawned_ids
+    assert t not in res.auto_blocked
+    with kb.connect() as conn:
+        assert kb.get_task(conn, t).status == "ready"
+
+
+def test_dispatch_respawn_guard_dry_run_no_auto_block(
+    kanban_home, all_assignees_spawnable
+):
+    """In dry_run mode, blocker_auth tasks are recorded in respawn_guarded (not auto-blocked)."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="dry-quota", assignee="alice")
+        conn.execute(
+            "UPDATE tasks SET last_failure_error = ? WHERE id = ?",
+            ("quota exceeded", t),
+        )
+        res = kb.dispatch_once(conn, dry_run=True)
+
+    assert (t, "blocker_auth") in res.respawn_guarded
+    assert t not in res.auto_blocked
+    with kb.connect() as conn:
+        assert kb.get_task(conn, t).status == "ready"  # dry_run: no writes
+
+
+def test_dispatch_respawn_guard_allows_clean_task(
+    kanban_home, all_assignees_spawnable
+):
+    """A task with no guard triggers is spawned normally."""
+    spawned_ids = []
+
+    def fake_spawn(task, workspace):
+        spawned_ids.append(task.id)
+
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="clean-task", assignee="alice")
+        res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
+
+    assert t in spawned_ids
+    assert not res.respawn_guarded
+    assert t not in res.auto_blocked
+
+
+def test_dispatch_respawn_guard_emits_event_for_skipped_task(
+    kanban_home, all_assignees_spawnable
+):
+    """dispatch_once emits a respawn_guarded task_event so operators can diagnose stuck-ready tasks."""
+    with kb.connect() as conn:
+        t = kb.create_task(conn, title="event-check", assignee="alice")
+        now = int(time.time())
+        conn.execute(
+            "INSERT INTO task_runs (task_id, status, outcome, started_at, ended_at) "
+            "VALUES (?, 'done', 'completed', ?, ?)",
+            (t, now - 300, now - 60),
+        )
+        kb.dispatch_once(conn, spawn_fn=lambda task, ws: None)
+        events = kb.list_events(conn, t)
+
+    kinds = [e.kind for e in events]
+    assert "respawn_guarded" in kinds
+    guarded_evt = next(e for e in events if e.kind == "respawn_guarded")
+    # Event.payload is already parsed as a dict by list_events.
+    assert isinstance(guarded_evt.payload, dict)
+    assert guarded_evt.payload.get("reason") == "recent_success"
+
+
 # ---------------------------------------------------------------------------
 # Workspace resolution
 # ---------------------------------------------------------------------------