mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
fix(kanban): align failure diagnostics with retry limit
This commit is contained in:
+12
-1
@@ -1393,6 +1393,11 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
|
||||
the dashboard uses, so CLI output matches what the UI shows.
|
||||
"""
|
||||
from hermes_cli import kanban_diagnostics as kd
|
||||
from hermes_cli.config import load_config
|
||||
|
||||
diag_config = kd.config_from_kanban_config(
|
||||
(load_config().get("kanban") or {})
|
||||
)
|
||||
|
||||
with kb.connect() as conn:
|
||||
# Either one-task mode or fleet mode.
|
||||
@@ -1406,6 +1411,7 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
|
||||
task,
|
||||
kb.list_events(conn, args.task),
|
||||
kb.list_runs(conn, args.task),
|
||||
config=diag_config,
|
||||
)
|
||||
}
|
||||
else:
|
||||
@@ -1433,7 +1439,12 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
|
||||
diags_by_task = {}
|
||||
for r in rows:
|
||||
tid = r["id"]
|
||||
dl = kd.compute_task_diagnostics(r, ev_by.get(tid, []), run_by.get(tid, []))
|
||||
dl = kd.compute_task_diagnostics(
|
||||
r,
|
||||
ev_by.get(tid, []),
|
||||
run_by.get(tid, []),
|
||||
config=diag_config,
|
||||
)
|
||||
if dl:
|
||||
diags_by_task[tid] = dl
|
||||
|
||||
|
||||
@@ -230,6 +230,14 @@ def _generic_recovery_actions(task: Any, *, running: bool) -> list[DiagnosticAct
|
||||
RuleFn = Callable[[Any, list[Any], list[Any], int, dict], list[Diagnostic]]
|
||||
|
||||
|
||||
def _positive_int(value: Any, default: int) -> int:
|
||||
try:
|
||||
parsed = int(value)
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
return parsed if parsed >= 1 else default
|
||||
|
||||
|
||||
def _rule_hallucinated_cards(task, events, runs, now, cfg) -> list[Diagnostic]:
|
||||
"""Blocked-hallucination gate fires: a worker called kanban_complete
|
||||
with created_cards that didn't exist or weren't created by the
|
||||
@@ -319,18 +327,19 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
|
||||
all look the same: the kernel keeps retrying and the operator
|
||||
needs to intervene.
|
||||
|
||||
Threshold: cfg["failure_threshold"] (default 3). A threshold of 3
|
||||
is one below the circuit-breaker's default (5), so the diagnostic
|
||||
surfaces BEFORE the breaker trips — giving operators a window to
|
||||
fix the problem while the dispatcher's still retrying.
|
||||
Threshold: cfg["failure_threshold"]. Runtime callers should derive
|
||||
this from ``kanban.failure_limit`` unless the user explicitly set a
|
||||
diagnostics threshold, so the signal does not lag behind the
|
||||
dispatcher's circuit breaker.
|
||||
|
||||
Accepts the legacy ``spawn_failure_threshold`` config key for
|
||||
back-compat.
|
||||
"""
|
||||
threshold = int(cfg.get(
|
||||
threshold = _positive_int(cfg.get(
|
||||
"failure_threshold",
|
||||
cfg.get("spawn_failure_threshold", 3),
|
||||
))
|
||||
), 3)
|
||||
failure_limit = _positive_int(cfg.get("failure_limit"), threshold)
|
||||
# Read the new unified counter name, with a fallback to the legacy
|
||||
# column name so this rule keeps working against old DB rows the
|
||||
# caller somehow materialised without running the migration.
|
||||
@@ -402,10 +411,9 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
|
||||
f"This task has failed {failures} times in a row "
|
||||
f"(most recent: {outcome_label}). Full last error:\n\n"
|
||||
f"{err_snippet}\n\n"
|
||||
f"The dispatcher will keep retrying until the consecutive-"
|
||||
f"failures counter trips the circuit breaker (default 5), "
|
||||
f"at which point the task auto-blocks. Fix the root cause "
|
||||
f"and reclaim to retry."
|
||||
f"The dispatcher circuit breaker is configured for "
|
||||
f"{failure_limit} consecutive non-success attempts. Fix the "
|
||||
f"root cause and reclaim or unblock the task to retry."
|
||||
)
|
||||
else:
|
||||
title = f"Agent {outcome_label} x{failures} (no error recorded)"
|
||||
@@ -427,6 +435,8 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
|
||||
"consecutive_failures": failures,
|
||||
"most_recent_outcome": most_recent_outcome,
|
||||
"last_error": last_err,
|
||||
"failure_threshold": threshold,
|
||||
"failure_limit": failure_limit,
|
||||
},
|
||||
)]
|
||||
|
||||
@@ -716,9 +726,11 @@ DIAGNOSTIC_KINDS = (
|
||||
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"failure_threshold": 3,
|
||||
# Match the dispatcher default (kanban.failure_limit) so repeated-failure
|
||||
# diagnostics do not lag behind the default auto-block threshold.
|
||||
"failure_threshold": 2,
|
||||
# Legacy alias accepted at read time by _rule_repeated_failures.
|
||||
"spawn_failure_threshold": 3,
|
||||
"spawn_failure_threshold": 2,
|
||||
"crash_threshold": 2,
|
||||
"blocked_stale_hours": 24,
|
||||
# Stranded-task threshold. 30 min by default — below that, the
|
||||
@@ -728,6 +740,28 @@ DEFAULT_CONFIG = {
|
||||
}
|
||||
|
||||
|
||||
def config_from_kanban_config(kanban_cfg: Optional[dict]) -> dict:
    """Build diagnostics config from the runtime ``kanban`` config section.

    ``kanban.diagnostics.failure_threshold`` (or the legacy
    ``spawn_failure_threshold`` key) remains an explicit override.
    Otherwise the repeated-failure threshold is derived from
    ``kanban.failure_limit`` so CLI/dashboard diagnostics match the
    dispatcher's circuit-breaker threshold.

    Args:
        kanban_cfg: The ``kanban`` config mapping. ``None`` or an empty
            mapping is treated as "nothing configured".

    Returns:
        A new dict (the input mapping is not mutated) containing the
        ``diagnostics`` sub-section, a ``failure_limit`` entry, and --
        unless a threshold key was set explicitly -- a
        ``failure_threshold`` entry equal to ``failure_limit``.
    """
    kanban_cfg = kanban_cfg or {}
    # Copy so the caller's nested ``diagnostics`` mapping stays untouched.
    diag_cfg = dict(kanban_cfg.get("diagnostics") or {})

    if "failure_limit" not in diag_cfg:
        limit = kanban_cfg.get("failure_limit")
        # ``dict.get``'s default only covers a *missing* key. A key that is
        # present but null would otherwise leak None into both
        # ``failure_limit`` and the derived ``failure_threshold``.
        if limit is None:
            limit = DEFAULT_CONFIG["failure_threshold"]
        diag_cfg["failure_limit"] = limit

    threshold_is_explicit = (
        "failure_threshold" in diag_cfg
        or "spawn_failure_threshold" in diag_cfg
    )
    if not threshold_is_explicit:
        diag_cfg["failure_threshold"] = diag_cfg["failure_limit"]
    return diag_cfg
|
||||
|
||||
|
||||
def compute_task_diagnostics(
|
||||
task,
|
||||
events: list,
|
||||
@@ -743,7 +777,17 @@ def compute_task_diagnostics(
|
||||
most-recent ``last_seen_at``.
|
||||
"""
|
||||
now_ts = int(now if now is not None else time.time())
|
||||
cfg = {**DEFAULT_CONFIG, **(config or {})}
|
||||
config = config or {}
|
||||
cfg = {**DEFAULT_CONFIG, **config}
|
||||
if (
|
||||
"failure_threshold" not in config
|
||||
and "spawn_failure_threshold" not in config
|
||||
and "failure_limit" in config
|
||||
):
|
||||
cfg["failure_threshold"] = _positive_int(
|
||||
config.get("failure_limit"),
|
||||
DEFAULT_CONFIG["failure_threshold"],
|
||||
)
|
||||
out: list[Diagnostic] = []
|
||||
for rule in _RULES:
|
||||
try:
|
||||
|
||||
@@ -224,6 +224,11 @@ def _compute_task_diagnostics(
|
||||
rule definitions.
|
||||
"""
|
||||
from hermes_cli import kanban_diagnostics as kd
|
||||
from hermes_cli.config import load_config
|
||||
|
||||
diag_config = kd.config_from_kanban_config(
|
||||
(load_config().get("kanban") or {})
|
||||
)
|
||||
|
||||
# Build the candidate task list. We need each task's row + its
|
||||
# events + its runs. Doing N separate queries works but scales
|
||||
@@ -270,6 +275,7 @@ def _compute_task_diagnostics(
|
||||
r,
|
||||
events_by_task.get(tid, []),
|
||||
runs_by_task.get(tid, []),
|
||||
config=diag_config,
|
||||
)
|
||||
if diags:
|
||||
out[tid] = [d.to_dict() for d in diags]
|
||||
|
||||
@@ -177,10 +177,68 @@ def test_repeated_failures_escalates_to_critical():
|
||||
|
||||
|
||||
def test_repeated_failures_below_threshold_silent():
    """A task with one consecutive failure yields no diagnostics when
    ``compute_task_diagnostics`` is called without a config override.
    """
    # The superseded ``consecutive_failures=2`` fixture was a dead store
    # (overwritten before use) and is dropped here.
    task = _task(consecutive_failures=1)
    assert kd.compute_task_diagnostics(task, [], []) == []
|
||||
|
||||
|
||||
def test_repeated_failures_default_matches_dispatcher_failure_limit():
    """With no config override, two consecutive failures must already
    produce exactly one ``repeated_failures`` diagnostic whose data and
    detail text both report a threshold/limit of 2.
    """
    blocked_task = _task(
        status="blocked",
        consecutive_failures=2,
        last_failure_error="elapsed 600s > limit 300s",
    )
    timed_out_runs = [_run(outcome="timed_out", run_id=1)]

    all_diags = kd.compute_task_diagnostics(blocked_task, [], timed_out_runs)
    matches = [diag for diag in all_diags if diag.kind == "repeated_failures"]

    assert len(matches) == 1
    hit = matches[0]
    assert hit.data["failure_threshold"] == 2
    assert hit.data["failure_limit"] == 2
    # The detail text must quote the configured limit, not the old
    # hard-coded "default 5" wording.
    assert "default 5" not in hit.detail
    assert "configured for 2" in hit.detail
|
||||
|
||||
|
||||
def test_repeated_failures_derives_threshold_from_kanban_failure_limit():
    """Passing only ``failure_limit`` in the config makes it the
    diagnostic threshold: 2 failures stay silent under a limit of 4,
    while 4 failures raise one ``repeated_failures`` diagnostic.
    """
    spawn_failed_runs = [_run(outcome="spawn_failed", run_id=1)]

    # Below the derived threshold: nothing is reported.
    under_limit = _task(
        status="ready",
        consecutive_failures=2,
        last_failure_error="Profile 'debugger' does not exist",
    )
    quiet = kd.compute_task_diagnostics(
        under_limit, [], spawn_failed_runs, config={"failure_limit": 4}
    )
    assert quiet == []

    # At the derived threshold: exactly one repeated_failures entry.
    at_limit = _task(
        status="blocked",
        consecutive_failures=4,
        last_failure_error="Profile 'debugger' does not exist",
    )
    reported = kd.compute_task_diagnostics(
        at_limit, [], spawn_failed_runs, config={"failure_limit": 4}
    )
    matches = [diag for diag in reported if diag.kind == "repeated_failures"]
    assert len(matches) == 1
    payload = matches[0].data
    assert payload["failure_threshold"] == 4
    assert payload["failure_limit"] == 4
|
||||
|
||||
|
||||
def test_repeated_failures_explicit_threshold_overrides_failure_limit():
    """An explicit ``failure_threshold`` wins over ``failure_limit``:
    3 failures are reported with threshold 3 while the limit stays 5.
    """
    failing_task = _task(
        status="ready",
        consecutive_failures=3,
        last_failure_error="Profile 'debugger' does not exist",
    )
    spawn_failed_runs = [_run(outcome="spawn_failed", run_id=1)]

    reported = kd.compute_task_diagnostics(
        failing_task,
        [],
        spawn_failed_runs,
        config={"failure_limit": 5, "failure_threshold": 3},
    )
    matches = [diag for diag in reported if diag.kind == "repeated_failures"]

    assert len(matches) == 1
    payload = matches[0].data
    assert payload["failure_threshold"] == 3
    assert payload["failure_limit"] == 5
|
||||
|
||||
|
||||
def test_config_from_kanban_config_preserves_explicit_diagnostics_threshold():
    """``config_from_kanban_config`` keeps an explicit
    ``diagnostics.failure_threshold`` and still copies ``failure_limit``.
    """
    kanban_section = {
        "failure_limit": 5,
        "diagnostics": {"failure_threshold": 3},
    }
    built = kd.config_from_kanban_config(kanban_section)
    assert built["failure_threshold"] == 3
    assert built["failure_limit"] == 5
|
||||
|
||||
|
||||
def test_repeated_crashes_counts_trailing_streak_only():
|
||||
task = _task(status="ready", assignee="crashy")
|
||||
runs = [
|
||||
|
||||
Reference in New Issue
Block a user