fix(kanban): align failure diagnostics with retry limit

This commit is contained in:
qWaitCrypto
2026-05-14 17:07:57 +08:00
committed by Teknium
parent 6e60a8a092
commit d9fef0c8ab
4 changed files with 134 additions and 15 deletions
+12 -1
View File
@@ -1393,6 +1393,11 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
the dashboard uses, so CLI output matches what the UI shows. the dashboard uses, so CLI output matches what the UI shows.
""" """
from hermes_cli import kanban_diagnostics as kd from hermes_cli import kanban_diagnostics as kd
from hermes_cli.config import load_config
diag_config = kd.config_from_kanban_config(
(load_config().get("kanban") or {})
)
with kb.connect() as conn: with kb.connect() as conn:
# Either one-task mode or fleet mode. # Either one-task mode or fleet mode.
@@ -1406,6 +1411,7 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
task, task,
kb.list_events(conn, args.task), kb.list_events(conn, args.task),
kb.list_runs(conn, args.task), kb.list_runs(conn, args.task),
config=diag_config,
) )
} }
else: else:
@@ -1433,7 +1439,12 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
diags_by_task = {} diags_by_task = {}
for r in rows: for r in rows:
tid = r["id"] tid = r["id"]
dl = kd.compute_task_diagnostics(r, ev_by.get(tid, []), run_by.get(tid, [])) dl = kd.compute_task_diagnostics(
r,
ev_by.get(tid, []),
run_by.get(tid, []),
config=diag_config,
)
if dl: if dl:
diags_by_task[tid] = dl diags_by_task[tid] = dl
+57 -13
View File
@@ -230,6 +230,14 @@ def _generic_recovery_actions(task: Any, *, running: bool) -> list[DiagnosticAct
RuleFn = Callable[[Any, list[Any], list[Any], int, dict], list[Diagnostic]] RuleFn = Callable[[Any, list[Any], list[Any], int, dict], list[Diagnostic]]
def _positive_int(value: Any, default: int) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
return default
return parsed if parsed >= 1 else default
def _rule_hallucinated_cards(task, events, runs, now, cfg) -> list[Diagnostic]: def _rule_hallucinated_cards(task, events, runs, now, cfg) -> list[Diagnostic]:
"""Blocked-hallucination gate fires: a worker called kanban_complete """Blocked-hallucination gate fires: a worker called kanban_complete
with created_cards that didn't exist or weren't created by the with created_cards that didn't exist or weren't created by the
@@ -319,18 +327,19 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
all look the same: the kernel keeps retrying and the operator all look the same: the kernel keeps retrying and the operator
needs to intervene. needs to intervene.
Threshold: cfg["failure_threshold"] (default 3). A threshold of 3 Threshold: cfg["failure_threshold"]. Runtime callers should derive
is one below the circuit-breaker's default (5), so the diagnostic this from ``kanban.failure_limit`` unless the user explicitly set a
surfaces BEFORE the breaker trips giving operators a window to diagnostics threshold, so the signal does not lag behind the
fix the problem while the dispatcher's still retrying. dispatcher's circuit breaker.
Accepts the legacy ``spawn_failure_threshold`` config key for Accepts the legacy ``spawn_failure_threshold`` config key for
back-compat. back-compat.
""" """
threshold = int(cfg.get( threshold = _positive_int(cfg.get(
"failure_threshold", "failure_threshold",
cfg.get("spawn_failure_threshold", 3), cfg.get("spawn_failure_threshold", 3),
)) ), 3)
failure_limit = _positive_int(cfg.get("failure_limit"), threshold)
# Read the new unified counter name, with a fallback to the legacy # Read the new unified counter name, with a fallback to the legacy
# column name so this rule keeps working against old DB rows the # column name so this rule keeps working against old DB rows the
# caller somehow materialised without running the migration. # caller somehow materialised without running the migration.
@@ -402,10 +411,9 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
f"This task has failed {failures} times in a row " f"This task has failed {failures} times in a row "
f"(most recent: {outcome_label}). Full last error:\n\n" f"(most recent: {outcome_label}). Full last error:\n\n"
f"{err_snippet}\n\n" f"{err_snippet}\n\n"
f"The dispatcher will keep retrying until the consecutive-" f"The dispatcher circuit breaker is configured for "
f"failures counter trips the circuit breaker (default 5), " f"{failure_limit} consecutive non-success attempts. Fix the "
f"at which point the task auto-blocks. Fix the root cause " f"root cause and reclaim or unblock the task to retry."
f"and reclaim to retry."
) )
else: else:
title = f"Agent {outcome_label} x{failures} (no error recorded)" title = f"Agent {outcome_label} x{failures} (no error recorded)"
@@ -427,6 +435,8 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
"consecutive_failures": failures, "consecutive_failures": failures,
"most_recent_outcome": most_recent_outcome, "most_recent_outcome": most_recent_outcome,
"last_error": last_err, "last_error": last_err,
"failure_threshold": threshold,
"failure_limit": failure_limit,
}, },
)] )]
@@ -716,9 +726,11 @@ DIAGNOSTIC_KINDS = (
DEFAULT_CONFIG = { DEFAULT_CONFIG = {
"failure_threshold": 3, # Match the dispatcher default (kanban.failure_limit) so repeated-failure
# diagnostics do not lag behind the default auto-block threshold.
"failure_threshold": 2,
# Legacy alias accepted at read time by _rule_repeated_failures. # Legacy alias accepted at read time by _rule_repeated_failures.
"spawn_failure_threshold": 3, "spawn_failure_threshold": 2,
"crash_threshold": 2, "crash_threshold": 2,
"blocked_stale_hours": 24, "blocked_stale_hours": 24,
# Stranded-task threshold. 30 min by default — below that, the # Stranded-task threshold. 30 min by default — below that, the
@@ -728,6 +740,28 @@ DEFAULT_CONFIG = {
} }
def config_from_kanban_config(kanban_cfg: Optional[dict]) -> dict:
    """Build diagnostics config from the runtime ``kanban`` config section.

    ``kanban.diagnostics.failure_threshold`` (or its legacy alias
    ``spawn_failure_threshold``) remains an explicit override. Otherwise,
    derive the repeated-failure threshold from ``kanban.failure_limit`` so
    CLI/dashboard diagnostics match the dispatcher's circuit-breaker
    threshold.

    Args:
        kanban_cfg: The ``kanban`` section of the loaded config, or
            ``None``/empty when the section is absent.

    Returns:
        A new dict holding a copy of the ``diagnostics`` sub-section plus a
        normalised integer ``failure_limit`` and, unless explicitly
        overridden, a ``failure_threshold`` equal to it.
    """
    kanban_cfg = kanban_cfg or {}
    diag_cfg = dict(kanban_cfg.get("diagnostics") or {})

    # A limit set inside the diagnostics section wins over the top-level
    # one; an explicit null in either place counts as "not set".
    raw_limit = diag_cfg.get("failure_limit")
    if raw_limit is None:
        raw_limit = kanban_cfg.get("failure_limit")

    # Normalise here so a null or malformed limit cannot be copied into
    # ``failure_threshold`` and silently push the rule onto its own
    # fallback value instead of the documented default.
    diag_cfg["failure_limit"] = _positive_int(
        raw_limit, DEFAULT_CONFIG["failure_threshold"]
    )

    if (
        "failure_threshold" not in diag_cfg
        and "spawn_failure_threshold" not in diag_cfg
    ):
        diag_cfg["failure_threshold"] = diag_cfg["failure_limit"]
    return diag_cfg
def compute_task_diagnostics( def compute_task_diagnostics(
task, task,
events: list, events: list,
@@ -743,7 +777,17 @@ def compute_task_diagnostics(
most-recent ``last_seen_at``. most-recent ``last_seen_at``.
""" """
now_ts = int(now if now is not None else time.time()) now_ts = int(now if now is not None else time.time())
cfg = {**DEFAULT_CONFIG, **(config or {})} config = config or {}
cfg = {**DEFAULT_CONFIG, **config}
if (
"failure_threshold" not in config
and "spawn_failure_threshold" not in config
and "failure_limit" in config
):
cfg["failure_threshold"] = _positive_int(
config.get("failure_limit"),
DEFAULT_CONFIG["failure_threshold"],
)
out: list[Diagnostic] = [] out: list[Diagnostic] = []
for rule in _RULES: for rule in _RULES:
try: try:
+6
View File
@@ -224,6 +224,11 @@ def _compute_task_diagnostics(
rule definitions. rule definitions.
""" """
from hermes_cli import kanban_diagnostics as kd from hermes_cli import kanban_diagnostics as kd
from hermes_cli.config import load_config
diag_config = kd.config_from_kanban_config(
(load_config().get("kanban") or {})
)
# Build the candidate task list. We need each task's row + its # Build the candidate task list. We need each task's row + its
# events + its runs. Doing N separate queries works but scales # events + its runs. Doing N separate queries works but scales
@@ -270,6 +275,7 @@ def _compute_task_diagnostics(
r, r,
events_by_task.get(tid, []), events_by_task.get(tid, []),
runs_by_task.get(tid, []), runs_by_task.get(tid, []),
config=diag_config,
) )
if diags: if diags:
out[tid] = [d.to_dict() for d in diags] out[tid] = [d.to_dict() for d in diags]
+59 -1
View File
@@ -177,10 +177,68 @@ def test_repeated_failures_escalates_to_critical():
def test_repeated_failures_below_threshold_silent(): def test_repeated_failures_below_threshold_silent():
task = _task(consecutive_failures=2) task = _task(consecutive_failures=1)
assert kd.compute_task_diagnostics(task, [], []) == [] assert kd.compute_task_diagnostics(task, [], []) == []
def test_repeated_failures_default_matches_dispatcher_failure_limit():
    """With no config passed, two consecutive failures must already be
    reported, matching the dispatcher's default auto-block point of 2
    rather than the old diagnostics threshold of 3.
    """
    blocked_task = _task(
        status="blocked",
        consecutive_failures=2,
        last_failure_error="elapsed 600s > limit 300s",
    )
    timed_out_runs = [_run(outcome="timed_out", run_id=1)]

    found = kd.compute_task_diagnostics(blocked_task, [], timed_out_runs)
    matches = [diag for diag in found if diag.kind == "repeated_failures"]

    assert len(matches) == 1
    (diag,) = matches
    assert diag.data["failure_threshold"] == 2
    assert diag.data["failure_limit"] == 2
    # The detail text must quote the configured limit, not the stale "5".
    assert "default 5" not in diag.detail
    assert "configured for 2" in diag.detail
def test_repeated_failures_derives_threshold_from_kanban_failure_limit():
    """A config carrying only ``failure_limit`` sets the diagnostic
    threshold: silent below the limit, reported once the limit is reached.
    """
    limit_only_cfg = {"failure_limit": 4}
    error_text = "Profile 'debugger' does not exist"
    runs = [_run(outcome="spawn_failed", run_id=1)]

    # Two failures is below the derived threshold of 4: nothing reported.
    under_limit = _task(
        status="ready",
        consecutive_failures=2,
        last_failure_error=error_text,
    )
    assert kd.compute_task_diagnostics(
        under_limit, [], runs, config=limit_only_cfg
    ) == []

    # Four failures reaches the limit: exactly one diagnostic.
    at_limit = _task(
        status="blocked",
        consecutive_failures=4,
        last_failure_error=error_text,
    )
    found = kd.compute_task_diagnostics(
        at_limit, [], runs, config=limit_only_cfg
    )
    matches = [diag for diag in found if diag.kind == "repeated_failures"]

    assert len(matches) == 1
    payload = matches[0].data
    assert payload["failure_threshold"] == 4
    assert payload["failure_limit"] == 4
def test_repeated_failures_explicit_threshold_overrides_failure_limit():
    """An explicit ``failure_threshold`` takes precedence over the value
    that would otherwise be derived from ``failure_limit``.
    """
    explicit_cfg = {"failure_limit": 5, "failure_threshold": 3}
    failing_task = _task(
        status="ready",
        consecutive_failures=3,
        last_failure_error="Profile 'debugger' does not exist",
    )
    spawn_failures = [_run(outcome="spawn_failed", run_id=1)]

    found = kd.compute_task_diagnostics(
        failing_task, [], spawn_failures, config=explicit_cfg
    )
    matches = [diag for diag in found if diag.kind == "repeated_failures"]

    assert len(matches) == 1
    payload = matches[0].data
    assert payload["failure_threshold"] == 3
    assert payload["failure_limit"] == 5
def test_config_from_kanban_config_preserves_explicit_diagnostics_threshold():
    """A threshold set under ``diagnostics`` survives config building,
    while ``failure_limit`` is still carried through alongside it.
    """
    kanban_section = {
        "failure_limit": 5,
        "diagnostics": {"failure_threshold": 3},
    }

    built = kd.config_from_kanban_config(kanban_section)

    assert built["failure_threshold"] == 3
    assert built["failure_limit"] == 5
def test_repeated_crashes_counts_trailing_streak_only(): def test_repeated_crashes_counts_trailing_streak_only():
task = _task(status="ready", assignee="crashy") task = _task(status="ready", assignee="crashy")
runs = [ runs = [