From 5fdcfd851f7e693fb72a9ea6a6ef25142e63259e Mon Sep 17 00:00:00 2001 From: SimbaKingjoe <126222269+SimbaKingjoe@users.noreply.github.com> Date: Mon, 18 May 2026 20:50:08 -0700 Subject: [PATCH] feat(kanban): add max_in_progress config to cap concurrent running tasks Salvages #22981 by @SimbaKingjoe. Adds 'kanban.max_in_progress' config that caps simultaneously running tasks. When the board already has N running, dispatcher skips spawning so slow workers (local LLMs, resource-constrained hosts) don't pile up and time out. Threads through dispatch_once(max_in_progress=) and gateway dispatcher config parsing with validation (warns on invalid/below-1 values). --- gateway/run.py | 26 +++++++++++++ hermes_cli/kanban_db.py | 15 ++++++++ tests/hermes_cli/test_kanban_db.py | 61 ++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+) diff --git a/gateway/run.py b/gateway/run.py index 9512060d7c..091300af16 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4776,6 +4776,31 @@ class GatewayRunner: if max_spawn is not None: logger.info(f"kanban dispatcher: max_spawn={max_spawn}") + # Cap the number of simultaneously running tasks so slow workers + # (local LLMs, resource-constrained hosts) don't pile up and time + # out. When set, the dispatcher skips spawning when the board + # already has this many tasks in 'running' status. + raw_max_in_progress = kanban_cfg.get("max_in_progress", None) + max_in_progress = None + if raw_max_in_progress is not None: + try: + max_in_progress = int(raw_max_in_progress) + except (TypeError, ValueError): + logger.warning( + "kanban dispatcher: invalid kanban.max_in_progress=%r; ignoring", + raw_max_in_progress, + ) + max_in_progress = None + else: + if max_in_progress < 1: + logger.warning( + "kanban dispatcher: kanban.max_in_progress=%r is below 1; ignoring", + raw_max_in_progress, + ) + max_in_progress = None + else: + logger.info(f"kanban dispatcher: max_in_progress={max_in_progress}") + raw_failure_limit = kanban_cfg.get("failure_limit", _kb.DEFAULT_FAILURE_LIMIT) try: failure_limit = int(raw_failure_limit) @@ -4828,6 +4853,7 @@ class GatewayRunner: conn, board=slug, max_spawn=max_spawn, + max_in_progress=max_in_progress, failure_limit=failure_limit, ) except Exception: diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index acdac80c78..34961013d4 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -4112,6 +4112,7 @@ def dispatch_once( ttl_seconds: Optional[int] = None, dry_run: bool = False, max_spawn: Optional[int] = None, + max_in_progress: Optional[int] = None, failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT, board: Optional[str] = None, ) -> DispatchResult: @@ -4209,6 +4210,20 @@ def dispatch_once( "WHERE status = 'ready' AND claim_lock IS NULL " "ORDER BY priority DESC, created_at ASC" ).fetchall() + # Honour kanban.max_in_progress: if the board already has enough running + # tasks, skip spawning this tick so slow workers (local LLMs, + # resource-constrained hosts) can finish what they have before more tasks + # pile up and time out. + if max_in_progress is not None and ready_rows: + in_progress = conn.execute( + "SELECT COUNT(*) FROM tasks WHERE status = 'running'" + ).fetchone()[0] + if in_progress >= max_in_progress: + return result + # Only spawn enough to reach the cap, respecting max_spawn too. + remaining = max_in_progress - in_progress + if max_spawn is None or max_spawn > remaining: + max_spawn = remaining spawned = 0 for row in ready_rows: if max_spawn is not None and running_count + spawned >= max_spawn: diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index 384f026a14..3be740cc1e 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -1890,3 +1890,64 @@ def test_create_task_with_explicit_workspace_ignores_board_default(kanban_home): assert t is not None assert t.workspace_path == explicit assert t.workspace_path != "/board/default" + + +# --------------------------------------------------------------------------- +# dispatch_once — max_in_progress +# --------------------------------------------------------------------------- + + +def test_dispatch_max_in_progress_skips_when_at_limit(kanban_home, all_assignees_spawnable): + """When max_in_progress=N and N tasks are already running, spawn nothing.""" + spawns = [] + + def fake_spawn(task, workspace): + spawns.append(task.id) + + with kb.connect() as conn: + # Two running tasks. + t1 = kb.create_task(conn, title="a", assignee="alice") + t2 = kb.create_task(conn, title="b", assignee="bob") + kb.claim_task(conn, t1) + kb.claim_task(conn, t2) + # Two more ready to spawn — but cap is 2 so none should fire. + kb.create_task(conn, title="c", assignee="bob") + kb.create_task(conn, title="d", assignee="alice") + kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=2) + + assert len(spawns) == 0, f"expected 0 spawns, got {len(spawns)}" + + +def test_dispatch_max_in_progress_spawns_up_to_cap(kanban_home, all_assignees_spawnable): + """When max_in_progress=3 and only 1 is running, spawn up to 2 more.""" + spawns = [] + + def fake_spawn(task, workspace): + spawns.append(task.id) + + with kb.connect() as conn: + # One running task. + t1 = kb.create_task(conn, title="a", assignee="alice") + kb.claim_task(conn, t1) + # Three ready tasks — only the first 2 should be spawned. + kb.create_task(conn, title="b", assignee="bob") + kb.create_task(conn, title="c", assignee="bob") + kb.create_task(conn, title="d", assignee="bob") + kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=3) + + assert len(spawns) == 2, f"expected 2 spawns (cap 3 - 1 running), got {len(spawns)}" + + +def test_dispatch_max_in_progress_none_is_unlimited(kanban_home, all_assignees_spawnable): + """Default None means no limit — all ready tasks are spawned.""" + spawns = [] + + def fake_spawn(task, workspace): + spawns.append(task.id) + + with kb.connect() as conn: + for title in ["a", "b", "c", "d"]: + kb.create_task(conn, title=title, assignee="alice") + kb.dispatch_once(conn, spawn_fn=fake_spawn, max_in_progress=None) + + assert len(spawns) == 4, f"expected 4 spawns (unlimited), got {len(spawns)}"