diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index 55b1d4125a..12e3e71e9e 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -1393,6 +1393,11 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int: the dashboard uses, so CLI output matches what the UI shows. """ from hermes_cli import kanban_diagnostics as kd + from hermes_cli.config import load_config + + diag_config = kd.config_from_kanban_config( + (load_config().get("kanban") or {}) + ) with kb.connect() as conn: # Either one-task mode or fleet mode. @@ -1406,6 +1411,7 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int: task, kb.list_events(conn, args.task), kb.list_runs(conn, args.task), + config=diag_config, ) } else: @@ -1433,7 +1439,12 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int: diags_by_task = {} for r in rows: tid = r["id"] - dl = kd.compute_task_diagnostics(r, ev_by.get(tid, []), run_by.get(tid, [])) + dl = kd.compute_task_diagnostics( + r, + ev_by.get(tid, []), + run_by.get(tid, []), + config=diag_config, + ) if dl: diags_by_task[tid] = dl diff --git a/hermes_cli/kanban_diagnostics.py b/hermes_cli/kanban_diagnostics.py index 42c0c2043f..2f8b7c8ed0 100644 --- a/hermes_cli/kanban_diagnostics.py +++ b/hermes_cli/kanban_diagnostics.py @@ -230,6 +230,14 @@ def _generic_recovery_actions(task: Any, *, running: bool) -> list[DiagnosticAct RuleFn = Callable[[Any, list[Any], list[Any], int, dict], list[Diagnostic]] +def _positive_int(value: Any, default: int) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + return default + return parsed if parsed >= 1 else default + + def _rule_hallucinated_cards(task, events, runs, now, cfg) -> list[Diagnostic]: """Blocked-hallucination gate fires: a worker called kanban_complete with created_cards that didn't exist or weren't created by the @@ -319,18 +327,19 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]: all look the same: the kernel keeps retrying and the operator needs 
to intervene. - Threshold: cfg["failure_threshold"] (default 3). A threshold of 3 - is one below the circuit-breaker's default (5), so the diagnostic - surfaces BEFORE the breaker trips — giving operators a window to - fix the problem while the dispatcher's still retrying. + Threshold: cfg["failure_threshold"]. Runtime callers should derive + this from ``kanban.failure_limit`` unless the user explicitly set a + diagnostics threshold, so the signal does not lag behind the + dispatcher's circuit breaker. Accepts the legacy ``spawn_failure_threshold`` config key for back-compat. """ - threshold = int(cfg.get( + threshold = _positive_int(cfg.get( "failure_threshold", cfg.get("spawn_failure_threshold", 3), - )) + ), 3) + failure_limit = _positive_int(cfg.get("failure_limit"), threshold) # Read the new unified counter name, with a fallback to the legacy # column name so this rule keeps working against old DB rows the # caller somehow materialised without running the migration. @@ -402,10 +411,9 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]: f"This task has failed {failures} times in a row " f"(most recent: {outcome_label}). Full last error:\n\n" f"{err_snippet}\n\n" - f"The dispatcher will keep retrying until the consecutive-" - f"failures counter trips the circuit breaker (default 5), " - f"at which point the task auto-blocks. Fix the root cause " - f"and reclaim to retry." + f"The dispatcher circuit breaker is configured for " + f"{failure_limit} consecutive non-success attempts. Fix the " + f"root cause and reclaim or unblock the task to retry." 
def config_from_kanban_config(kanban_cfg: Optional[dict]) -> dict:
    """Build diagnostics config from the runtime ``kanban`` config section.

    ``kanban.diagnostics.failure_threshold`` (or its legacy alias
    ``spawn_failure_threshold``) remains an explicit override and is passed
    through untouched. Otherwise the repeated-failure threshold is derived
    from ``kanban.failure_limit`` so CLI/dashboard diagnostics match the
    dispatcher's circuit-breaker threshold.

    ``failure_limit`` is normalised to a positive integer here. A value
    that is present but unusable (``None``, ``0``, non-numeric text) would
    otherwise be copied verbatim into ``failure_threshold``, and the
    repeated-failures rule would then fall back to its own hard-coded
    default instead of ``DEFAULT_CONFIG["failure_threshold"]``.

    Args:
        kanban_cfg: The ``kanban`` section of the loaded config. ``None``
            or a non-mapping value is treated as an empty section.

    Returns:
        A new dict suitable for ``compute_task_diagnostics(config=...)``.
        It always contains an integer ``failure_limit`` >= 1, and a
        ``failure_threshold`` unless the legacy alias was supplied.
    """
    from collections.abc import Mapping

    if not isinstance(kanban_cfg, Mapping):
        # Missing section, or a scalar/list where a mapping belongs.
        kanban_cfg = {}
    diag_section = kanban_cfg.get("diagnostics")
    # Copy so the caller's config object is never mutated.
    diag_cfg = dict(diag_section) if isinstance(diag_section, Mapping) else {}

    # Precedence: diagnostics.failure_limit, then kanban.failure_limit,
    # then the built-in default. Each level is validated before use.
    dispatcher_limit = _positive_int(
        kanban_cfg.get("failure_limit"),
        DEFAULT_CONFIG["failure_threshold"],
    )
    diag_cfg["failure_limit"] = _positive_int(
        diag_cfg.get("failure_limit"),
        dispatcher_limit,
    )

    has_explicit_threshold = (
        "failure_threshold" in diag_cfg
        or "spawn_failure_threshold" in diag_cfg
    )
    if not has_explicit_threshold:
        diag_cfg["failure_threshold"] = diag_cfg["failure_limit"]
    return diag_cfg
""" now_ts = int(now if now is not None else time.time()) - cfg = {**DEFAULT_CONFIG, **(config or {})} + config = config or {} + cfg = {**DEFAULT_CONFIG, **config} + if ( + "failure_threshold" not in config + and "spawn_failure_threshold" not in config + and "failure_limit" in config + ): + cfg["failure_threshold"] = _positive_int( + config.get("failure_limit"), + DEFAULT_CONFIG["failure_threshold"], + ) out: list[Diagnostic] = [] for rule in _RULES: try: diff --git a/plugins/kanban/dashboard/plugin_api.py b/plugins/kanban/dashboard/plugin_api.py index 16e6066385..0a4685b4a5 100644 --- a/plugins/kanban/dashboard/plugin_api.py +++ b/plugins/kanban/dashboard/plugin_api.py @@ -224,6 +224,11 @@ def _compute_task_diagnostics( rule definitions. """ from hermes_cli import kanban_diagnostics as kd + from hermes_cli.config import load_config + + diag_config = kd.config_from_kanban_config( + (load_config().get("kanban") or {}) + ) # Build the candidate task list. We need each task's row + its # events + its runs. Doing N separate queries works but scales @@ -270,6 +275,7 @@ def _compute_task_diagnostics( r, events_by_task.get(tid, []), runs_by_task.get(tid, []), + config=diag_config, ) if diags: out[tid] = [d.to_dict() for d in diags] diff --git a/tests/hermes_cli/test_kanban_diagnostics.py b/tests/hermes_cli/test_kanban_diagnostics.py index ad00e4136a..53fdf4fc34 100644 --- a/tests/hermes_cli/test_kanban_diagnostics.py +++ b/tests/hermes_cli/test_kanban_diagnostics.py @@ -177,10 +177,68 @@ def test_repeated_failures_escalates_to_critical(): def test_repeated_failures_below_threshold_silent(): - task = _task(consecutive_failures=2) + task = _task(consecutive_failures=1) assert kd.compute_task_diagnostics(task, [], []) == [] +def test_repeated_failures_default_matches_dispatcher_failure_limit(): + """Default dispatcher auto-blocks at 2 failures, so diagnostics must + also surface at 2 instead of waiting for the stale threshold of 3. 
def test_repeated_failures_derives_threshold_from_kanban_failure_limit():
    """With only ``failure_limit`` configured, it also acts as the threshold."""
    spawn_error = "Profile 'debugger' does not exist"

    # Two failures is under a limit of 4, so nothing is reported.
    under_limit = _task(
        status="ready",
        consecutive_failures=2,
        last_failure_error=spawn_error,
    )
    runs = [_run(outcome="spawn_failed", run_id=1)]
    quiet = kd.compute_task_diagnostics(
        under_limit, [], runs, config={"failure_limit": 4}
    )
    assert quiet == []

    # Reaching the limit surfaces exactly one repeated-failures diagnostic.
    at_limit = _task(
        status="blocked",
        consecutive_failures=4,
        last_failure_error=spawn_error,
    )
    reported = kd.compute_task_diagnostics(
        at_limit, [], runs, config={"failure_limit": 4}
    )
    matches = [diag for diag in reported if diag.kind == "repeated_failures"]
    assert len(matches) == 1
    (only,) = matches
    assert only.data["failure_threshold"] == 4
    assert only.data["failure_limit"] == 4


def test_repeated_failures_explicit_threshold_overrides_failure_limit():
    """An explicit ``failure_threshold`` wins over the ``failure_limit`` value."""
    task = _task(
        status="ready",
        consecutive_failures=3,
        last_failure_error="Profile 'debugger' does not exist",
    )
    runs = [_run(outcome="spawn_failed", run_id=1)]
    settings = {"failure_limit": 5, "failure_threshold": 3}
    found = kd.compute_task_diagnostics(task, [], runs, config=settings)
    hits = list(filter(lambda diag: diag.kind == "repeated_failures", found))
    assert len(hits) == 1
    payload = hits[0].data
    assert payload["failure_threshold"] == 3
    assert payload["failure_limit"] == 5
assert cfg["failure_limit"] == 5 + + def test_repeated_crashes_counts_trailing_streak_only(): task = _task(status="ready", assignee="crashy") runs = [