mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
feat(kanban): wire dispatcher to dispatch review agents from review column
Salvages #23772 by @thewillhuang. Adds 'review' as a valid kanban task status and extends dispatch_once to monitor the review column as a second dispatch source (in addition to the existing ready column). - Adds 'review' to VALID_STATUSES - Adds claim_review_task() — atomically transitions review → running - Adds has_spawnable_review() — health telemetry mirror - Extends dispatch_once with a review column dispatch loop - Review agents get 'sdlc-review' skill auto-loaded Resolved 2 conflicts (VALID_STATUSES merge with main's 'scheduled' state, test file additions). Adapted claim_review_task to main's ttl_seconds: Optional[int] = None convention (matches claim_task).
This commit is contained in:
@@ -4954,6 +4954,8 @@ class GatewayRunner:
|
||||
conn = _kb.connect(board=slug)
|
||||
if _kb.has_spawnable_ready(conn):
|
||||
return True
|
||||
if _kb.has_spawnable_review(conn):
|
||||
return True
|
||||
except Exception:
|
||||
continue
|
||||
finally:
|
||||
|
||||
+175
-1
@@ -94,7 +94,7 @@ _log = logging.getLogger(__name__)
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
VALID_STATUSES = {"triage", "todo", "scheduled", "ready", "running", "blocked", "done", "archived"}
|
||||
VALID_STATUSES = {"triage", "todo", "scheduled", "ready", "running", "blocked", "review", "done", "archived"}
|
||||
VALID_INITIAL_STATUSES = {"running", "blocked"}
|
||||
VALID_WORKSPACE_KINDS = {"scratch", "worktree", "dir"}
|
||||
KNOWN_TOOLSET_NAMES = frozenset(name.casefold() for name in get_toolset_names())
|
||||
@@ -2132,6 +2132,81 @@ def claim_task(
|
||||
return get_task(conn, task_id)
|
||||
|
||||
|
||||
def claim_review_task(
|
||||
conn: sqlite3.Connection,
|
||||
task_id: str,
|
||||
*,
|
||||
ttl_seconds: Optional[int] = None,
|
||||
claimer: Optional[str] = None,
|
||||
) -> Optional[Task]:
|
||||
"""Atomically transition ``review -> running``.
|
||||
|
||||
Returns the claimed ``Task`` on success, ``None`` if the task was
|
||||
already claimed (or is not in ``review`` status).
|
||||
|
||||
Unlike ``claim_task`` (which handles ``ready -> running``), this
|
||||
does NOT check parent dependencies — the task already passed that
|
||||
gate on its original ``todo -> ready -> running`` transition.
|
||||
|
||||
Creates a new run entry so the review agent's lifecycle is tracked
|
||||
independently from the original worker run.
|
||||
"""
|
||||
now = int(time.time())
|
||||
lock = claimer or _claimer_id()
|
||||
expires = now + _resolve_claim_ttl_seconds(ttl_seconds)
|
||||
with write_txn(conn):
|
||||
cur = conn.execute(
|
||||
"""
|
||||
UPDATE tasks
|
||||
SET status = 'running',
|
||||
claim_lock = ?,
|
||||
claim_expires = ?,
|
||||
started_at = COALESCE(started_at, ?)
|
||||
WHERE id = ?
|
||||
AND status = 'review'
|
||||
AND claim_lock IS NULL
|
||||
""",
|
||||
(lock, expires, now, task_id),
|
||||
)
|
||||
if cur.rowcount != 1:
|
||||
return None
|
||||
trow = conn.execute(
|
||||
"SELECT assignee, max_runtime_seconds, current_step_key "
|
||||
"FROM tasks WHERE id = ?",
|
||||
(task_id,),
|
||||
).fetchone()
|
||||
run_cur = conn.execute(
|
||||
"""
|
||||
INSERT INTO task_runs (
|
||||
task_id, profile, step_key, status,
|
||||
claim_lock, claim_expires, max_runtime_seconds,
|
||||
started_at
|
||||
) VALUES (?, ?, ?, 'running', ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
task_id,
|
||||
trow["assignee"] if trow else None,
|
||||
trow["current_step_key"] if trow else None,
|
||||
lock,
|
||||
expires,
|
||||
trow["max_runtime_seconds"] if trow else None,
|
||||
now,
|
||||
),
|
||||
)
|
||||
run_id = run_cur.lastrowid
|
||||
conn.execute(
|
||||
"UPDATE tasks SET current_run_id = ? WHERE id = ?",
|
||||
(run_id, task_id),
|
||||
)
|
||||
_append_event(
|
||||
conn, task_id, "claimed",
|
||||
{"lock": lock, "expires": expires, "run_id": run_id,
|
||||
"source_status": "review"},
|
||||
run_id=run_id,
|
||||
)
|
||||
return get_task(conn, task_id)
|
||||
|
||||
|
||||
def heartbeat_claim(
|
||||
conn: sqlite3.Connection,
|
||||
task_id: str,
|
||||
@@ -4165,6 +4240,31 @@ def has_spawnable_ready(conn: sqlite3.Connection) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def has_spawnable_review(conn: sqlite3.Connection) -> bool:
|
||||
"""Return True iff there is at least one review+assigned+unclaimed task
|
||||
whose assignee maps to a real Hermes profile.
|
||||
|
||||
Mirror of :func:`has_spawnable_ready` for the review column —
|
||||
used by the health telemetry to decide whether the dispatcher
|
||||
should have spawned a review agent.
|
||||
"""
|
||||
rows = conn.execute(
|
||||
"SELECT DISTINCT assignee FROM tasks "
|
||||
"WHERE status = 'review' AND assignee IS NOT NULL "
|
||||
" AND claim_lock IS NULL"
|
||||
).fetchall()
|
||||
if not rows:
|
||||
return False
|
||||
try:
|
||||
from hermes_cli.profiles import profile_exists # local import: avoids cycle
|
||||
except Exception:
|
||||
return True
|
||||
for row in rows:
|
||||
if profile_exists(row["assignee"]):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def dispatch_once(
|
||||
conn: sqlite3.Connection,
|
||||
*,
|
||||
@@ -4364,6 +4464,80 @@ def dispatch_once(
|
||||
)
|
||||
if auto:
|
||||
result.auto_blocked.append(claimed.id)
|
||||
|
||||
# ---- review column dispatch ----
|
||||
# Review tasks are tasks that a worker moved to 'review' after
|
||||
# creating a PR. The dispatcher spawns a review agent (loading
|
||||
# sdlc-review skill) that verifies the PR and either merges (→ done)
|
||||
# or rejects (→ back to running for the worker to fix).
|
||||
#
|
||||
# Same concurrency model as ready dispatch: review spawns count
|
||||
# against max_spawn alongside ready tasks, so the total number of
|
||||
# running workers stays bounded.
|
||||
review_rows = conn.execute(
|
||||
"SELECT id, assignee FROM tasks "
|
||||
"WHERE status = 'review' AND claim_lock IS NULL "
|
||||
"ORDER BY priority DESC, created_at ASC"
|
||||
).fetchall()
|
||||
for row in review_rows:
|
||||
if max_spawn is not None and running_count + spawned >= max_spawn:
|
||||
break
|
||||
if not row["assignee"]:
|
||||
result.skipped_unassigned.append(row["id"])
|
||||
continue
|
||||
try:
|
||||
from hermes_cli.profiles import profile_exists
|
||||
except Exception:
|
||||
profile_exists = None # type: ignore[assignment]
|
||||
if profile_exists is not None and not profile_exists(row["assignee"]):
|
||||
result.skipped_nonspawnable.append(row["id"])
|
||||
continue
|
||||
if dry_run:
|
||||
result.spawned.append((row["id"], row["assignee"], ""))
|
||||
continue
|
||||
claimed = claim_review_task(conn, row["id"], ttl_seconds=ttl_seconds)
|
||||
if claimed is None:
|
||||
continue
|
||||
try:
|
||||
workspace = resolve_workspace(claimed, board=board)
|
||||
except Exception as exc:
|
||||
auto = _record_spawn_failure(
|
||||
conn, claimed.id, f"workspace: {exc}",
|
||||
failure_limit=failure_limit,
|
||||
)
|
||||
if auto:
|
||||
result.auto_blocked.append(claimed.id)
|
||||
continue
|
||||
# Persist the resolved workspace path so the worker can cd there.
|
||||
set_workspace_path(conn, claimed.id, str(workspace))
|
||||
# Force-load sdlc-review skill for review agents. The
|
||||
# _default_spawn function already auto-loads kanban-worker, and
|
||||
# appends task.skills via --skills. Setting task.skills here
|
||||
# means the review agent gets both kanban-worker (lifecycle)
|
||||
# and sdlc-review (review logic: AC verification, merge, etc.).
|
||||
claimed.skills = ["sdlc-review"]
|
||||
_spawn = spawn_fn if spawn_fn is not None else _default_spawn
|
||||
try:
|
||||
import inspect
|
||||
try:
|
||||
sig = inspect.signature(_spawn)
|
||||
if "board" in sig.parameters:
|
||||
pid = _spawn(claimed, str(workspace), board=board)
|
||||
else:
|
||||
pid = _spawn(claimed, str(workspace))
|
||||
except (TypeError, ValueError):
|
||||
pid = _spawn(claimed, str(workspace))
|
||||
if pid:
|
||||
_set_worker_pid(conn, claimed.id, int(pid))
|
||||
result.spawned.append((claimed.id, claimed.assignee or "", str(workspace)))
|
||||
spawned += 1
|
||||
except Exception as exc:
|
||||
auto = _record_spawn_failure(
|
||||
conn, claimed.id, str(exc),
|
||||
failure_limit=failure_limit,
|
||||
)
|
||||
if auto:
|
||||
result.auto_blocked.append(claimed.id)
|
||||
return result
|
||||
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import concurrent.futures
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
@@ -2055,3 +2056,179 @@ def test_dispatch_max_in_progress_none_is_unlimited(kanban_home, all_assignees_s
|
||||
kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=None)
|
||||
|
||||
assert len(spawns) == 4, f"expected 4 spawns (unlimited), got {len(spawns)}"
|
||||
|
||||
# Review column dispatch
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _set_task_status(conn: sqlite3.Connection, task_id: str, status: str) -> None:
|
||||
"""Test helper: set a task's status directly."""
|
||||
conn.execute("UPDATE tasks SET status = ? WHERE id = ?", (status, task_id))
|
||||
|
||||
|
||||
def test_claim_review_task_transitions_to_running(kanban_home):
|
||||
"""claim_review_task atomically transitions review -> running."""
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="review me", assignee="alice")
|
||||
_set_task_status(conn, t, "review")
|
||||
claimed = kb.claim_review_task(conn, t)
|
||||
assert claimed is not None
|
||||
assert claimed.status == "running"
|
||||
assert claimed.claim_lock is not None
|
||||
|
||||
|
||||
def test_claim_review_task_fails_on_non_review(kanban_home):
|
||||
"""claim_review_task returns None if task is not in review status."""
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="ready task", assignee="alice")
|
||||
# Task is in 'ready', not 'review'
|
||||
claimed = kb.claim_review_task(conn, t)
|
||||
assert claimed is None
|
||||
|
||||
|
||||
def test_claim_review_task_fails_when_already_claimed(kanban_home):
|
||||
"""claim_review_task returns None if the task was already claimed."""
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="review me", assignee="alice")
|
||||
_set_task_status(conn, t, "review")
|
||||
first = kb.claim_review_task(conn, t)
|
||||
assert first is not None
|
||||
second = kb.claim_review_task(conn, t)
|
||||
assert second is None
|
||||
|
||||
|
||||
def test_dispatch_review_dry_run(kanban_home, all_assignees_spawnable):
|
||||
"""dispatch_once dry-run sees review tasks and reports them as spawned."""
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="review me", assignee="alice")
|
||||
_set_task_status(conn, t, "review")
|
||||
res = kb.dispatch_once(conn, dry_run=True)
|
||||
assert len(res.spawned) == 1
|
||||
assert res.spawned[0][0] == t
|
||||
# Dry run must NOT mutate status.
|
||||
with kb.connect() as conn:
|
||||
assert kb.get_task(conn, t).status == "review"
|
||||
|
||||
|
||||
def test_dispatch_review_spawns_with_correct_skills(
|
||||
kanban_home, all_assignees_spawnable,
|
||||
):
|
||||
"""Review tasks get sdlc-review skill set before spawning."""
|
||||
spawned_tasks = []
|
||||
|
||||
def capture_spawn(task, workspace, board=None):
|
||||
spawned_tasks.append(task)
|
||||
return 42 # fake PID
|
||||
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="review me", assignee="alice")
|
||||
_set_task_status(conn, t, "review")
|
||||
res = kb.dispatch_once(conn, spawn_fn=capture_spawn)
|
||||
assert len(res.spawned) == 1
|
||||
assert len(spawned_tasks) == 1
|
||||
assert spawned_tasks[0].skills == ["sdlc-review"]
|
||||
|
||||
|
||||
def test_dispatch_review_skips_unassigned(kanban_home):
|
||||
"""Unassigned review tasks go to skipped_unassigned, not spawned."""
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="review floater")
|
||||
_set_task_status(conn, t, "review")
|
||||
res = kb.dispatch_once(conn, dry_run=True)
|
||||
assert t in res.skipped_unassigned
|
||||
assert not res.spawned
|
||||
|
||||
|
||||
def test_dispatch_review_counts_toward_max_spawn(
|
||||
kanban_home, all_assignees_spawnable,
|
||||
):
|
||||
"""Review spawns count against max_spawn alongside ready tasks."""
|
||||
spawns = []
|
||||
|
||||
def fake_spawn(task, workspace, board=None):
|
||||
spawns.append(task.id)
|
||||
return 42
|
||||
|
||||
with kb.connect() as conn:
|
||||
# Create 2 ready tasks + 1 review task, max_spawn=2
|
||||
t1 = kb.create_task(conn, title="ready 1", assignee="alice")
|
||||
t2 = kb.create_task(conn, title="ready 2", assignee="bob")
|
||||
t3 = kb.create_task(conn, title="review", assignee="alice")
|
||||
_set_task_status(conn, t3, "review")
|
||||
res = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)
|
||||
# Only 2 should spawn (ready tasks get priority in the loop)
|
||||
assert len(res.spawned) == 2
|
||||
assert len(spawns) == 2
|
||||
|
||||
|
||||
def test_dispatch_review_spawns_when_ready_empty(
|
||||
kanban_home, all_assignees_spawnable,
|
||||
):
|
||||
"""When only review tasks exist, they still get dispatched."""
|
||||
spawns = []
|
||||
|
||||
def fake_spawn(task, workspace, board=None):
|
||||
spawns.append(task.id)
|
||||
return 42
|
||||
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="review me", assignee="alice")
|
||||
_set_task_status(conn, t, "review")
|
||||
res = kb.dispatch_once(conn, spawn_fn=fake_spawn)
|
||||
assert len(res.spawned) == 1
|
||||
assert spawns[0] == t
|
||||
|
||||
|
||||
def test_has_spawnable_review_true(kanban_home):
|
||||
"""has_spawnable_review returns True when review tasks exist with real profiles."""
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="review me", assignee="default")
|
||||
_set_task_status(conn, t, "review")
|
||||
# default profile should exist in the test env
|
||||
assert kb.has_spawnable_review(conn) is True
|
||||
|
||||
|
||||
def test_has_spawnable_review_false_on_empty(kanban_home):
|
||||
"""has_spawnable_review returns False when no review tasks exist."""
|
||||
with kb.connect() as conn:
|
||||
assert kb.has_spawnable_review(conn) is False
|
||||
|
||||
|
||||
def test_has_spawnable_review_false_when_only_terminal_lanes(
|
||||
kanban_home, monkeypatch,
|
||||
):
|
||||
"""has_spawnable_review returns False when review tasks are terminal lanes."""
|
||||
from hermes_cli import profiles
|
||||
monkeypatch.setattr(profiles, "profile_exists", lambda name: False)
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="review", assignee="orion-cc")
|
||||
_set_task_status(conn, t, "review")
|
||||
assert kb.has_spawnable_review(conn) is False
|
||||
|
||||
|
||||
def test_dispatch_review_skips_nonspawnable(kanban_home, monkeypatch):
|
||||
"""Review tasks with non-existent profiles go to skipped_nonspawnable."""
|
||||
from hermes_cli import profiles
|
||||
monkeypatch.setattr(profiles, "profile_exists", lambda name: False)
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="review", assignee="orion-cc")
|
||||
_set_task_status(conn, t, "review")
|
||||
res = kb.dispatch_once(conn, dry_run=True)
|
||||
assert t in res.skipped_nonspawnable
|
||||
assert not res.spawned
|
||||
|
||||
|
||||
def test_review_status_in_valid_statuses():
|
||||
"""'review' is a valid task status."""
|
||||
assert "review" in kb.VALID_STATUSES
|
||||
|
||||
|
||||
def test_dispatch_review_does_not_claim_ready_tasks(
|
||||
kanban_home, all_assignees_spawnable,
|
||||
):
|
||||
"""Review dispatch uses claim_review_task, which only claims review tasks."""
|
||||
with kb.connect() as conn:
|
||||
t = kb.create_task(conn, title="ready task", assignee="alice")
|
||||
# claim_review_task should NOT claim a ready task
|
||||
claimed = kb.claim_review_task(conn, t)
|
||||
assert claimed is None
|
||||
|
||||
Reference in New Issue
Block a user