feat(kanban): wire dispatcher to dispatch review agents from review column

Salvages #23772 by @thewillhuang. Adds 'review' as a valid kanban task
status and extends dispatch_once to monitor the review column as a
second dispatch source (in addition to the existing ready column).

- Adds 'review' to VALID_STATUSES
- Adds claim_review_task() — atomically transitions review → running
- Adds has_spawnable_review() — review-column counterpart of has_spawnable_ready(), used by health telemetry
- Extends dispatch_once with a review column dispatch loop
- Review agents get 'sdlc-review' skill auto-loaded

Resolved 2 conflicts (VALID_STATUSES merge with main's 'scheduled' state,
test file additions). Adapted claim_review_task to main's
ttl_seconds: Optional[int] = None convention (matches claim_task).
This commit is contained in:
thewillhuang
2026-05-18 21:19:45 -07:00
committed by Teknium
parent 31fe229039
commit f55d94a1e0
3 changed files with 354 additions and 1 deletions
+2
View File
@@ -4954,6 +4954,8 @@ class GatewayRunner:
conn = _kb.connect(board=slug)
if _kb.has_spawnable_ready(conn):
return True
if _kb.has_spawnable_review(conn):
return True
except Exception:
continue
finally:
+175 -1
View File
@@ -94,7 +94,7 @@ _log = logging.getLogger(__name__)
# Constants
# ---------------------------------------------------------------------------
VALID_STATUSES = {"triage", "todo", "scheduled", "ready", "running", "blocked", "done", "archived"}
VALID_STATUSES = {"triage", "todo", "scheduled", "ready", "running", "blocked", "review", "done", "archived"}
VALID_INITIAL_STATUSES = {"running", "blocked"}
VALID_WORKSPACE_KINDS = {"scratch", "worktree", "dir"}
KNOWN_TOOLSET_NAMES = frozenset(name.casefold() for name in get_toolset_names())
@@ -2132,6 +2132,81 @@ def claim_task(
return get_task(conn, task_id)
def claim_review_task(
    conn: sqlite3.Connection,
    task_id: str,
    *,
    ttl_seconds: Optional[int] = None,
    claimer: Optional[str] = None,
) -> Optional[Task]:
    """Claim a task waiting in ``review`` by moving it to ``running``.

    The status flip is a single conditional ``UPDATE`` (it only matches a
    row with ``status = 'review'`` and ``claim_lock IS NULL``), so at most
    one caller can win a given task.

    Parent dependencies are deliberately not re-checked here, unlike
    ``claim_task`` (``ready -> running``): the task already passed that
    gate on its original ``todo -> ready -> running`` transition.

    A fresh ``task_runs`` row is inserted and stored as the task's
    ``current_run_id`` so the review agent's lifecycle is tracked
    independently from the original worker run.

    Args:
        conn: Open connection to the kanban database.
        task_id: Id of the task to claim.
        ttl_seconds: Requested claim lifetime; the effective value comes
            from ``_resolve_claim_ttl_seconds``.
        claimer: Lock owner to record; when falsy, ``_claimer_id()`` is
            used instead.

    Returns:
        The task as re-read through ``get_task`` after a successful claim,
        or ``None`` when the conditional update matched no row (task
        already claimed, or not in ``review``).
    """
    claimed_at = int(time.time())
    owner = claimer or _claimer_id()
    deadline = claimed_at + _resolve_claim_ttl_seconds(ttl_seconds)

    with write_txn(conn):
        # Conditional flip: the WHERE clause is what makes the claim exclusive.
        flipped = conn.execute(
            """
            UPDATE tasks
            SET status = 'running',
            claim_lock = ?,
            claim_expires = ?,
            started_at = COALESCE(started_at, ?)
            WHERE id = ?
            AND status = 'review'
            AND claim_lock IS NULL
            """,
            (owner, deadline, claimed_at, task_id),
        ).rowcount
        if flipped != 1:
            return None

        # Copy the task's routing fields onto the new run row.
        task_row = conn.execute(
            "SELECT assignee, max_runtime_seconds, current_step_key "
            "FROM tasks WHERE id = ?",
            (task_id,),
        ).fetchone()
        if task_row:
            profile = task_row["assignee"]
            step_key = task_row["current_step_key"]
            max_runtime = task_row["max_runtime_seconds"]
        else:
            profile = step_key = max_runtime = None

        run_id = conn.execute(
            """
            INSERT INTO task_runs (
            task_id, profile, step_key, status,
            claim_lock, claim_expires, max_runtime_seconds,
            started_at
            ) VALUES (?, ?, ?, 'running', ?, ?, ?, ?)
            """,
            (
                task_id,
                profile,
                step_key,
                owner,
                deadline,
                max_runtime,
                claimed_at,
            ),
        ).lastrowid

        # Point the task at the run that was just opened for the reviewer.
        conn.execute(
            "UPDATE tasks SET current_run_id = ? WHERE id = ?",
            (run_id, task_id),
        )

        event_payload = {
            "lock": owner,
            "expires": deadline,
            "run_id": run_id,
            "source_status": "review",
        }
        _append_event(conn, task_id, "claimed", event_payload, run_id=run_id)

    return get_task(conn, task_id)
def heartbeat_claim(
conn: sqlite3.Connection,
task_id: str,
@@ -4165,6 +4240,31 @@ def has_spawnable_ready(conn: sqlite3.Connection) -> bool:
return False
def has_spawnable_review(conn: sqlite3.Connection) -> bool:
    """Report whether any review task could be handed to a review agent.

    A task qualifies when it is in ``review``, has an assignee, holds no
    claim lock, and its assignee names an existing Hermes profile.

    Review-column counterpart of :func:`has_spawnable_ready`; health
    telemetry uses it to decide whether the dispatcher should have
    spawned a review agent.

    Returns:
        ``False`` when no candidate assignee exists or none maps to a
        profile. ``True`` when at least one does, and also when the
        profile lookup cannot be imported at all.
    """
    candidates = conn.execute(
        "SELECT DISTINCT assignee FROM tasks "
        "WHERE status = 'review' AND assignee IS NOT NULL "
        " AND claim_lock IS NULL"
    ).fetchall()
    if not candidates:
        return False

    try:
        # Imported lazily to avoid an import cycle with the profiles module.
        from hermes_cli.profiles import profile_exists
    except Exception:
        # Cannot verify profiles: report the candidates as spawnable.
        return True

    return any(profile_exists(row["assignee"]) for row in candidates)
def dispatch_once(
conn: sqlite3.Connection,
*,
@@ -4364,6 +4464,80 @@ def dispatch_once(
)
if auto:
result.auto_blocked.append(claimed.id)
# ---- review column dispatch ----
# Review tasks are tasks that a worker moved to 'review' after
# creating a PR. The dispatcher spawns a review agent (loading
# sdlc-review skill) that verifies the PR and either merges (→ done)
# or rejects (→ back to running for the worker to fix).
#
# Same concurrency model as ready dispatch: review spawns count
# against max_spawn alongside ready tasks, so the total number of
# running workers stays bounded.
review_rows = conn.execute(
"SELECT id, assignee FROM tasks "
"WHERE status = 'review' AND claim_lock IS NULL "
"ORDER BY priority DESC, created_at ASC"
).fetchall()
for row in review_rows:
if max_spawn is not None and running_count + spawned >= max_spawn:
break
if not row["assignee"]:
result.skipped_unassigned.append(row["id"])
continue
try:
from hermes_cli.profiles import profile_exists
except Exception:
profile_exists = None # type: ignore[assignment]
if profile_exists is not None and not profile_exists(row["assignee"]):
result.skipped_nonspawnable.append(row["id"])
continue
if dry_run:
result.spawned.append((row["id"], row["assignee"], ""))
continue
claimed = claim_review_task(conn, row["id"], ttl_seconds=ttl_seconds)
if claimed is None:
continue
try:
workspace = resolve_workspace(claimed, board=board)
except Exception as exc:
auto = _record_spawn_failure(
conn, claimed.id, f"workspace: {exc}",
failure_limit=failure_limit,
)
if auto:
result.auto_blocked.append(claimed.id)
continue
# Persist the resolved workspace path so the worker can cd there.
set_workspace_path(conn, claimed.id, str(workspace))
# Force-load sdlc-review skill for review agents. The
# _default_spawn function already auto-loads kanban-worker, and
# appends task.skills via --skills. Setting task.skills here
# means the review agent gets both kanban-worker (lifecycle)
# and sdlc-review (review logic: AC verification, merge, etc.).
claimed.skills = ["sdlc-review"]
_spawn = spawn_fn if spawn_fn is not None else _default_spawn
try:
import inspect
try:
sig = inspect.signature(_spawn)
if "board" in sig.parameters:
pid = _spawn(claimed, str(workspace), board=board)
else:
pid = _spawn(claimed, str(workspace))
except (TypeError, ValueError):
pid = _spawn(claimed, str(workspace))
if pid:
_set_worker_pid(conn, claimed.id, int(pid))
result.spawned.append((claimed.id, claimed.assignee or "", str(workspace)))
spawned += 1
except Exception as exc:
auto = _record_spawn_failure(
conn, claimed.id, str(exc),
failure_limit=failure_limit,
)
if auto:
result.auto_blocked.append(claimed.id)
return result
+177
View File
@@ -4,6 +4,7 @@ from __future__ import annotations
import concurrent.futures
import os
import sqlite3
import time
from pathlib import Path
@@ -2055,3 +2056,179 @@ def test_dispatch_max_in_progress_none_is_unlimited(kanban_home, all_assignees_s
kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=None)
assert len(spawns) == 4, f"expected 4 spawns (unlimited), got {len(spawns)}"
# Review column dispatch
# ---------------------------------------------------------------------------
def _set_task_status(conn: sqlite3.Connection, task_id: str, status: str) -> None:
    """Test helper: overwrite the ``status`` column of one task row.

    Issues a raw ``UPDATE`` against ``tasks`` so a test can place a task
    in any column directly.
    """
    statement = "UPDATE tasks SET status = ? WHERE id = ?"
    bindings = (status, task_id)
    conn.execute(statement, bindings)
def test_claim_review_task_transitions_to_running(kanban_home):
    """A task parked in ``review`` is claimed and ends up ``running``."""
    with kb.connect() as conn:
        # Arrange: one assigned task forced into the review column.
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")

        # Act
        result = kb.claim_review_task(conn, task_id)

        # Assert: claim succeeded, status flipped, lock recorded.
        assert result is not None
        assert result.status == "running"
        assert result.claim_lock is not None
def test_claim_review_task_fails_on_non_review(kanban_home):
    """Claiming for review yields ``None`` when the task is not in ``review``."""
    with kb.connect() as conn:
        # A freshly created assigned task sits in 'ready', not 'review'.
        ready_id = kb.create_task(conn, title="ready task", assignee="alice")
        outcome = kb.claim_review_task(conn, ready_id)
        assert outcome is None
def test_claim_review_task_fails_when_already_claimed(kanban_home):
    """Only the first of two back-to-back review claims succeeds."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")

        winner = kb.claim_review_task(conn, task_id)
        assert winner is not None

        # The task is now running and locked, so a repeat claim must miss.
        loser = kb.claim_review_task(conn, task_id)
        assert loser is None
def test_dispatch_review_dry_run(kanban_home, all_assignees_spawnable):
    """A dry-run dispatch lists the review task as spawned but leaves it alone."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")
        outcome = kb.dispatch_once(conn, dry_run=True)

    # Exactly one entry, and it names our task.
    assert [entry[0] for entry in outcome.spawned] == [task_id]

    # Re-read on a fresh connection: dry run must NOT mutate status.
    with kb.connect() as conn:
        persisted = kb.get_task(conn, task_id)
        assert persisted.status == "review"
def test_dispatch_review_spawns_with_correct_skills(
    kanban_home, all_assignees_spawnable,
):
    """The task handed to the spawn function carries only ``sdlc-review``."""
    captured = []

    def capture_spawn(task, workspace, board=None):
        # Record the task object exactly as the dispatcher passes it in.
        captured.append(task)
        return 42  # fake PID

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")
        outcome = kb.dispatch_once(conn, spawn_fn=capture_spawn)

    assert len(outcome.spawned) == 1
    assert [task.skills for task in captured] == [["sdlc-review"]]
def test_dispatch_review_skips_unassigned(kanban_home):
    """A review task without an assignee is reported as skipped, never spawned."""
    with kb.connect() as conn:
        orphan_id = kb.create_task(conn, title="review floater")
        _set_task_status(conn, orphan_id, "review")
        outcome = kb.dispatch_once(conn, dry_run=True)

    assert orphan_id in outcome.skipped_unassigned
    assert not outcome.spawned
def test_dispatch_review_counts_toward_max_spawn(
    kanban_home, all_assignees_spawnable,
):
    """Review spawns share the ``max_spawn`` budget with ready tasks.

    With two ready tasks, one review task and ``max_spawn=2``, the ready
    loop runs first and uses up the budget, so the review task must be
    left untouched in the ``review`` column.
    """
    spawns = []

    def fake_spawn(task, workspace, board=None):
        spawns.append(task.id)
        return 42

    with kb.connect() as conn:
        # Create 2 ready tasks + 1 review task, max_spawn=2
        t1 = kb.create_task(conn, title="ready 1", assignee="alice")
        t2 = kb.create_task(conn, title="ready 2", assignee="bob")
        t3 = kb.create_task(conn, title="review", assignee="alice")
        _set_task_status(conn, t3, "review")
        res = kb.dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)

        # Only 2 should spawn (ready tasks get priority in the loop)
        assert len(res.spawned) == 2
        assert len(spawns) == 2

        # Pin WHICH tasks spawned: a bare count would also pass if the
        # review task had taken a slot from one of the ready tasks.
        assert set(spawns) == {t1, t2}
        assert t3 not in {entry[0] for entry in res.spawned}

        # The review task was never claimed, so it is still in review.
        assert kb.get_task(conn, t3).status == "review"
def test_dispatch_review_spawns_when_ready_empty(
    kanban_home, all_assignees_spawnable,
):
    """A lone review task is dispatched even though nothing is ready."""
    spawned_ids = []

    def fake_spawn(task, workspace, board=None):
        spawned_ids.append(task.id)
        return 42

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="alice")
        _set_task_status(conn, task_id, "review")
        outcome = kb.dispatch_once(conn, spawn_fn=fake_spawn)

    assert len(outcome.spawned) == 1
    assert spawned_ids[0] == task_id
def test_has_spawnable_review_true(kanban_home):
    """An unclaimed review task assigned to a real profile counts as spawnable."""
    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review me", assignee="default")
        _set_task_status(conn, task_id, "review")

        # Relies on the test environment providing a "default" profile.
        spawnable = kb.has_spawnable_review(conn)
        assert spawnable is True
def test_has_spawnable_review_false_on_empty(kanban_home):
    """With no tasks on the board there is nothing to spawn for review."""
    with kb.connect() as conn:
        verdict = kb.has_spawnable_review(conn)
        assert verdict is False
def test_has_spawnable_review_false_when_only_terminal_lanes(
    kanban_home, monkeypatch,
):
    """Returns False when no review task's assignee maps to an existing profile."""
    from hermes_cli import profiles

    def _no_such_profile(name):
        return False

    # Make every assignee look like a lane without a backing profile.
    monkeypatch.setattr(profiles, "profile_exists", _no_such_profile)

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review", assignee="orion-cc")
        _set_task_status(conn, task_id, "review")
        assert kb.has_spawnable_review(conn) is False
def test_dispatch_review_skips_nonspawnable(kanban_home, monkeypatch):
    """A review task whose assignee has no profile lands in ``skipped_nonspawnable``."""
    from hermes_cli import profiles

    def _no_such_profile(name):
        return False

    monkeypatch.setattr(profiles, "profile_exists", _no_such_profile)

    with kb.connect() as conn:
        task_id = kb.create_task(conn, title="review", assignee="orion-cc")
        _set_task_status(conn, task_id, "review")
        outcome = kb.dispatch_once(conn, dry_run=True)

    assert task_id in outcome.skipped_nonspawnable
    assert not outcome.spawned
def test_review_status_in_valid_statuses():
    """The status vocabulary includes ``'review'``."""
    known_statuses = kb.VALID_STATUSES
    assert "review" in known_statuses
def test_dispatch_review_does_not_claim_ready_tasks(
    kanban_home, all_assignees_spawnable,
):
    """``claim_review_task`` refuses a ready task and leaves it untouched.

    Review dispatch claims through ``claim_review_task``, which only
    matches rows in ``review``. The rejected task must keep its ``ready``
    status and stay unlocked so the normal ready dispatch can still pick
    it up.
    """
    with kb.connect() as conn:
        t = kb.create_task(conn, title="ready task", assignee="alice")

        # claim_review_task should NOT claim a ready task
        claimed = kb.claim_review_task(conn, t)
        assert claimed is None

        # A None return alone does not prove the row was left alone;
        # check the persisted state as well.
        untouched = kb.get_task(conn, t)
        assert untouched.status == "ready"
        assert untouched.claim_lock is None