mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
fix(kanban): align failure diagnostics with retry limit
This commit is contained in:
+12
-1
@@ -1393,6 +1393,11 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
|
|||||||
the dashboard uses, so CLI output matches what the UI shows.
|
the dashboard uses, so CLI output matches what the UI shows.
|
||||||
"""
|
"""
|
||||||
from hermes_cli import kanban_diagnostics as kd
|
from hermes_cli import kanban_diagnostics as kd
|
||||||
|
from hermes_cli.config import load_config
|
||||||
|
|
||||||
|
diag_config = kd.config_from_kanban_config(
|
||||||
|
(load_config().get("kanban") or {})
|
||||||
|
)
|
||||||
|
|
||||||
with kb.connect() as conn:
|
with kb.connect() as conn:
|
||||||
# Either one-task mode or fleet mode.
|
# Either one-task mode or fleet mode.
|
||||||
@@ -1406,6 +1411,7 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
|
|||||||
task,
|
task,
|
||||||
kb.list_events(conn, args.task),
|
kb.list_events(conn, args.task),
|
||||||
kb.list_runs(conn, args.task),
|
kb.list_runs(conn, args.task),
|
||||||
|
config=diag_config,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
@@ -1433,7 +1439,12 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int:
|
|||||||
diags_by_task = {}
|
diags_by_task = {}
|
||||||
for r in rows:
|
for r in rows:
|
||||||
tid = r["id"]
|
tid = r["id"]
|
||||||
dl = kd.compute_task_diagnostics(r, ev_by.get(tid, []), run_by.get(tid, []))
|
dl = kd.compute_task_diagnostics(
|
||||||
|
r,
|
||||||
|
ev_by.get(tid, []),
|
||||||
|
run_by.get(tid, []),
|
||||||
|
config=diag_config,
|
||||||
|
)
|
||||||
if dl:
|
if dl:
|
||||||
diags_by_task[tid] = dl
|
diags_by_task[tid] = dl
|
||||||
|
|
||||||
|
|||||||
@@ -230,6 +230,14 @@ def _generic_recovery_actions(task: Any, *, running: bool) -> list[DiagnosticAct
|
|||||||
RuleFn = Callable[[Any, list[Any], list[Any], int, dict], list[Diagnostic]]
|
RuleFn = Callable[[Any, list[Any], list[Any], int, dict], list[Diagnostic]]
|
||||||
|
|
||||||
|
|
||||||
|
def _positive_int(value: Any, default: int) -> int:
|
||||||
|
try:
|
||||||
|
parsed = int(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return default
|
||||||
|
return parsed if parsed >= 1 else default
|
||||||
|
|
||||||
|
|
||||||
def _rule_hallucinated_cards(task, events, runs, now, cfg) -> list[Diagnostic]:
|
def _rule_hallucinated_cards(task, events, runs, now, cfg) -> list[Diagnostic]:
|
||||||
"""Blocked-hallucination gate fires: a worker called kanban_complete
|
"""Blocked-hallucination gate fires: a worker called kanban_complete
|
||||||
with created_cards that didn't exist or weren't created by the
|
with created_cards that didn't exist or weren't created by the
|
||||||
@@ -319,18 +327,19 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
|
|||||||
all look the same: the kernel keeps retrying and the operator
|
all look the same: the kernel keeps retrying and the operator
|
||||||
needs to intervene.
|
needs to intervene.
|
||||||
|
|
||||||
Threshold: cfg["failure_threshold"] (default 3). A threshold of 3
|
Threshold: cfg["failure_threshold"]. Runtime callers should derive
|
||||||
is one below the circuit-breaker's default (5), so the diagnostic
|
this from ``kanban.failure_limit`` unless the user explicitly set a
|
||||||
surfaces BEFORE the breaker trips — giving operators a window to
|
diagnostics threshold, so the signal does not lag behind the
|
||||||
fix the problem while the dispatcher's still retrying.
|
dispatcher's circuit breaker.
|
||||||
|
|
||||||
Accepts the legacy ``spawn_failure_threshold`` config key for
|
Accepts the legacy ``spawn_failure_threshold`` config key for
|
||||||
back-compat.
|
back-compat.
|
||||||
"""
|
"""
|
||||||
threshold = int(cfg.get(
|
threshold = _positive_int(cfg.get(
|
||||||
"failure_threshold",
|
"failure_threshold",
|
||||||
cfg.get("spawn_failure_threshold", 3),
|
cfg.get("spawn_failure_threshold", 3),
|
||||||
))
|
), 3)
|
||||||
|
failure_limit = _positive_int(cfg.get("failure_limit"), threshold)
|
||||||
# Read the new unified counter name, with a fallback to the legacy
|
# Read the new unified counter name, with a fallback to the legacy
|
||||||
# column name so this rule keeps working against old DB rows the
|
# column name so this rule keeps working against old DB rows the
|
||||||
# caller somehow materialised without running the migration.
|
# caller somehow materialised without running the migration.
|
||||||
@@ -402,10 +411,9 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
|
|||||||
f"This task has failed {failures} times in a row "
|
f"This task has failed {failures} times in a row "
|
||||||
f"(most recent: {outcome_label}). Full last error:\n\n"
|
f"(most recent: {outcome_label}). Full last error:\n\n"
|
||||||
f"{err_snippet}\n\n"
|
f"{err_snippet}\n\n"
|
||||||
f"The dispatcher will keep retrying until the consecutive-"
|
f"The dispatcher circuit breaker is configured for "
|
||||||
f"failures counter trips the circuit breaker (default 5), "
|
f"{failure_limit} consecutive non-success attempts. Fix the "
|
||||||
f"at which point the task auto-blocks. Fix the root cause "
|
f"root cause and reclaim or unblock the task to retry."
|
||||||
f"and reclaim to retry."
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
title = f"Agent {outcome_label} x{failures} (no error recorded)"
|
title = f"Agent {outcome_label} x{failures} (no error recorded)"
|
||||||
@@ -427,6 +435,8 @@ def _rule_repeated_failures(task, events, runs, now, cfg) -> list[Diagnostic]:
|
|||||||
"consecutive_failures": failures,
|
"consecutive_failures": failures,
|
||||||
"most_recent_outcome": most_recent_outcome,
|
"most_recent_outcome": most_recent_outcome,
|
||||||
"last_error": last_err,
|
"last_error": last_err,
|
||||||
|
"failure_threshold": threshold,
|
||||||
|
"failure_limit": failure_limit,
|
||||||
},
|
},
|
||||||
)]
|
)]
|
||||||
|
|
||||||
@@ -716,9 +726,11 @@ DIAGNOSTIC_KINDS = (
|
|||||||
|
|
||||||
|
|
||||||
DEFAULT_CONFIG = {
|
DEFAULT_CONFIG = {
|
||||||
"failure_threshold": 3,
|
# Match the dispatcher default (kanban.failure_limit) so repeated-failure
|
||||||
|
# diagnostics do not lag behind the default auto-block threshold.
|
||||||
|
"failure_threshold": 2,
|
||||||
# Legacy alias accepted at read time by _rule_repeated_failures.
|
# Legacy alias accepted at read time by _rule_repeated_failures.
|
||||||
"spawn_failure_threshold": 3,
|
"spawn_failure_threshold": 2,
|
||||||
"crash_threshold": 2,
|
"crash_threshold": 2,
|
||||||
"blocked_stale_hours": 24,
|
"blocked_stale_hours": 24,
|
||||||
# Stranded-task threshold. 30 min by default — below that, the
|
# Stranded-task threshold. 30 min by default — below that, the
|
||||||
@@ -728,6 +740,28 @@ DEFAULT_CONFIG = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def config_from_kanban_config(kanban_cfg: Optional[dict]) -> dict:
|
||||||
|
"""Build diagnostics config from the runtime ``kanban`` config section.
|
||||||
|
|
||||||
|
``kanban.diagnostics.failure_threshold`` remains an explicit override.
|
||||||
|
Otherwise, derive the repeated-failure threshold from
|
||||||
|
``kanban.failure_limit`` so CLI/dashboard diagnostics match the
|
||||||
|
dispatcher's actual circuit-breaker threshold.
|
||||||
|
"""
|
||||||
|
kanban_cfg = kanban_cfg or {}
|
||||||
|
diag_cfg = dict(kanban_cfg.get("diagnostics") or {})
|
||||||
|
diag_cfg.setdefault(
|
||||||
|
"failure_limit",
|
||||||
|
kanban_cfg.get("failure_limit", DEFAULT_CONFIG["failure_threshold"]),
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
"failure_threshold" not in diag_cfg
|
||||||
|
and "spawn_failure_threshold" not in diag_cfg
|
||||||
|
):
|
||||||
|
diag_cfg["failure_threshold"] = diag_cfg["failure_limit"]
|
||||||
|
return diag_cfg
|
||||||
|
|
||||||
|
|
||||||
def compute_task_diagnostics(
|
def compute_task_diagnostics(
|
||||||
task,
|
task,
|
||||||
events: list,
|
events: list,
|
||||||
@@ -743,7 +777,17 @@ def compute_task_diagnostics(
|
|||||||
most-recent ``last_seen_at``.
|
most-recent ``last_seen_at``.
|
||||||
"""
|
"""
|
||||||
now_ts = int(now if now is not None else time.time())
|
now_ts = int(now if now is not None else time.time())
|
||||||
cfg = {**DEFAULT_CONFIG, **(config or {})}
|
config = config or {}
|
||||||
|
cfg = {**DEFAULT_CONFIG, **config}
|
||||||
|
if (
|
||||||
|
"failure_threshold" not in config
|
||||||
|
and "spawn_failure_threshold" not in config
|
||||||
|
and "failure_limit" in config
|
||||||
|
):
|
||||||
|
cfg["failure_threshold"] = _positive_int(
|
||||||
|
config.get("failure_limit"),
|
||||||
|
DEFAULT_CONFIG["failure_threshold"],
|
||||||
|
)
|
||||||
out: list[Diagnostic] = []
|
out: list[Diagnostic] = []
|
||||||
for rule in _RULES:
|
for rule in _RULES:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -224,6 +224,11 @@ def _compute_task_diagnostics(
|
|||||||
rule definitions.
|
rule definitions.
|
||||||
"""
|
"""
|
||||||
from hermes_cli import kanban_diagnostics as kd
|
from hermes_cli import kanban_diagnostics as kd
|
||||||
|
from hermes_cli.config import load_config
|
||||||
|
|
||||||
|
diag_config = kd.config_from_kanban_config(
|
||||||
|
(load_config().get("kanban") or {})
|
||||||
|
)
|
||||||
|
|
||||||
# Build the candidate task list. We need each task's row + its
|
# Build the candidate task list. We need each task's row + its
|
||||||
# events + its runs. Doing N separate queries works but scales
|
# events + its runs. Doing N separate queries works but scales
|
||||||
@@ -270,6 +275,7 @@ def _compute_task_diagnostics(
|
|||||||
r,
|
r,
|
||||||
events_by_task.get(tid, []),
|
events_by_task.get(tid, []),
|
||||||
runs_by_task.get(tid, []),
|
runs_by_task.get(tid, []),
|
||||||
|
config=diag_config,
|
||||||
)
|
)
|
||||||
if diags:
|
if diags:
|
||||||
out[tid] = [d.to_dict() for d in diags]
|
out[tid] = [d.to_dict() for d in diags]
|
||||||
|
|||||||
@@ -177,10 +177,68 @@ def test_repeated_failures_escalates_to_critical():
|
|||||||
|
|
||||||
|
|
||||||
def test_repeated_failures_below_threshold_silent():
|
def test_repeated_failures_below_threshold_silent():
|
||||||
task = _task(consecutive_failures=2)
|
task = _task(consecutive_failures=1)
|
||||||
assert kd.compute_task_diagnostics(task, [], []) == []
|
assert kd.compute_task_diagnostics(task, [], []) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_repeated_failures_default_matches_dispatcher_failure_limit():
|
||||||
|
"""Default dispatcher auto-blocks at 2 failures, so diagnostics must
|
||||||
|
also surface at 2 instead of waiting for the stale threshold of 3.
|
||||||
|
"""
|
||||||
|
task = _task(status="blocked", consecutive_failures=2,
|
||||||
|
last_failure_error="elapsed 600s > limit 300s")
|
||||||
|
runs = [_run(outcome="timed_out", run_id=1)]
|
||||||
|
diags = kd.compute_task_diagnostics(task, [], runs)
|
||||||
|
repeated = [d for d in diags if d.kind == "repeated_failures"]
|
||||||
|
assert len(repeated) == 1
|
||||||
|
d = repeated[0]
|
||||||
|
assert d.data["failure_threshold"] == 2
|
||||||
|
assert d.data["failure_limit"] == 2
|
||||||
|
assert "default 5" not in d.detail
|
||||||
|
assert "configured for 2" in d.detail
|
||||||
|
|
||||||
|
|
||||||
|
def test_repeated_failures_derives_threshold_from_kanban_failure_limit():
|
||||||
|
task = _task(status="ready", consecutive_failures=2,
|
||||||
|
last_failure_error="Profile 'debugger' does not exist")
|
||||||
|
runs = [_run(outcome="spawn_failed", run_id=1)]
|
||||||
|
assert kd.compute_task_diagnostics(
|
||||||
|
task, [], runs, config={"failure_limit": 4}
|
||||||
|
) == []
|
||||||
|
|
||||||
|
task = _task(status="blocked", consecutive_failures=4,
|
||||||
|
last_failure_error="Profile 'debugger' does not exist")
|
||||||
|
diags = kd.compute_task_diagnostics(
|
||||||
|
task, [], runs, config={"failure_limit": 4}
|
||||||
|
)
|
||||||
|
repeated = [d for d in diags if d.kind == "repeated_failures"]
|
||||||
|
assert len(repeated) == 1
|
||||||
|
assert repeated[0].data["failure_threshold"] == 4
|
||||||
|
assert repeated[0].data["failure_limit"] == 4
|
||||||
|
|
||||||
|
|
||||||
|
def test_repeated_failures_explicit_threshold_overrides_failure_limit():
|
||||||
|
task = _task(status="ready", consecutive_failures=3,
|
||||||
|
last_failure_error="Profile 'debugger' does not exist")
|
||||||
|
runs = [_run(outcome="spawn_failed", run_id=1)]
|
||||||
|
diags = kd.compute_task_diagnostics(
|
||||||
|
task, [], runs, config={"failure_limit": 5, "failure_threshold": 3}
|
||||||
|
)
|
||||||
|
repeated = [d for d in diags if d.kind == "repeated_failures"]
|
||||||
|
assert len(repeated) == 1
|
||||||
|
assert repeated[0].data["failure_threshold"] == 3
|
||||||
|
assert repeated[0].data["failure_limit"] == 5
|
||||||
|
|
||||||
|
|
||||||
|
def test_config_from_kanban_config_preserves_explicit_diagnostics_threshold():
|
||||||
|
cfg = kd.config_from_kanban_config({
|
||||||
|
"failure_limit": 5,
|
||||||
|
"diagnostics": {"failure_threshold": 3},
|
||||||
|
})
|
||||||
|
assert cfg["failure_threshold"] == 3
|
||||||
|
assert cfg["failure_limit"] == 5
|
||||||
|
|
||||||
|
|
||||||
def test_repeated_crashes_counts_trailing_streak_only():
|
def test_repeated_crashes_counts_trailing_streak_only():
|
||||||
task = _task(status="ready", assignee="crashy")
|
task = _task(status="ready", assignee="crashy")
|
||||||
runs = [
|
runs = [
|
||||||
|
|||||||
Reference in New Issue
Block a user