fix(approval): peek _gateway_queues for session-level approval when _pending is empty

During active streaming, dangerous-command approvals go through the
gateway path and are stored in _gateway_queues as _ApprovalEntry
objects, not in _pending. The _resolve_approval_legacy helper only
looked at _pending, so 'Allow for this session' never called
approve_session() — the user clicked Allow, the card vanished, but
the next dangerous command asked again.

Now when _pending has no matching entry, the helper peeks into
_gateway_queues to extract pattern_keys, calls approve_session(),
and marks found_target=True so resolve_gateway_approval also fires.

This commit is re-scoped to peek-only (no agent_session_key round-trip,
no state_db metadata changes).

Includes:
- Import + fallback for _gateway_queues
- Null-safe key filtering in all_keys
- Source-contract test (static) + functional test with
  @requires_agent_modules skip marker for CI
- All comments and docstrings in English
This commit is contained in:
keyos
2026-05-19 20:23:46 +00:00
parent 0310fcc466
commit 729ed415ff
2 changed files with 107 additions and 10 deletions
+33 -10
View File
@@ -2277,6 +2277,7 @@ try:
_pending,
_lock,
_permanent_approved,
_gateway_queues,
resolve_gateway_approval,
enable_session_yolo,
disable_session_yolo,
@@ -2295,6 +2296,7 @@ except ImportError:
_pending = {}
_lock = threading.Lock()
_permanent_approved = set()
_gateway_queues = {}
# ── Approval SSE subscribers (long-connection push) ──────────────────────────
@@ -8739,6 +8741,7 @@ def _resolve_approval_legacy(sid: str, approval_id: str, choice: str) -> bool:
# that omit approval_id still resolve the oldest entry for compatibility.
pending = None
found_target = False
gateway_keys = []
with _lock:
queue = _pending.get(sid)
if isinstance(queue, list):
@@ -8764,6 +8767,25 @@ def _resolve_approval_legacy(sid: str, approval_id: str, choice: str) -> bool:
if not approval_id or queue.get("approval_id") == approval_id:
pending = _pending.pop(sid, None)
found_target = pending is not None
# When no _pending entry found, peek into _gateway_queues for
# pattern_keys so session-level approval still works. The gateway
# path is the primary mechanism during active streaming; _pending
# is only used for UI polling/SSE notification.
# NOTE: Gateway queue entries don't carry approval_id, so when
# approval_id is given and _pending is empty, we assume the gateway
# entry at the head of the queue corresponds. This is safe because
# gateway entries are consumed synchronously with _pending entries
# under the same lock — there is no interleaving where a stale
# approval_id could match a different gateway entry.
if not pending:
gw_queue = _gateway_queues.get(sid)
if gw_queue and len(gw_queue) > 0:
gw_entry = gw_queue[0]
# _gateway_queues stores _ApprovalEntry objects; their
# .data dict carries command, pattern_key, pattern_keys.
gw_data = getattr(gw_entry, 'data', None) or {}
gateway_keys = gw_data.get("pattern_keys") or [gw_data.get("pattern_key", "")] if gw_data else []
found_target = True
# Notify SSE subscribers of the new head (or empty state) so the UI
# surfaces any trailing approvals that were queued behind this one
# without waiting for the next submit_pending. Without this, a parallel
@@ -8775,16 +8797,17 @@ def _resolve_approval_legacy(sid: str, approval_id: str, choice: str) -> bool:
else:
_approval_sse_notify_locked(sid, None, 0)
if pending:
keys = pending.get("pattern_keys") or [pending.get("pattern_key", "")]
if choice in ("once", "session"):
for k in keys:
approve_session(sid, k)
elif choice == "always":
for k in keys:
approve_session(sid, k)
approve_permanent(k)
save_permanent_allowlist(_permanent_approved)
# Collect keys from both _pending and _gateway_queues
keys_from_pending = pending.get("pattern_keys") or [pending.get("pattern_key", "")] if pending else []
all_keys = [k for k in keys_from_pending if k] + [k for k in gateway_keys if k]
if choice in ("once", "session"):
for k in all_keys:
approve_session(sid, k)
elif choice == "always":
for k in all_keys:
approve_session(sid, k)
approve_permanent(k)
save_permanent_allowlist(_permanent_approved)
# Unblock the agent thread waiting in the gateway approval queue.
# This is the primary signal when streaming is active — the agent
# thread is parked in entry.event.wait() and needs to be woken up.
+74
View File
@@ -1,6 +1,8 @@
import importlib
import queue
from tests.conftest import requires_agent_modules
def test_runtime_adapter_interface_and_legacy_journal_methods_exist():
runtime = importlib.import_module("api.runtime_adapter")
@@ -271,6 +273,78 @@ def test_approval_respond_does_not_fallback_to_oldest_when_explicit_id_is_stale(
assert "queue.pop(0)" not in stale_branch
def test_approval_respond_peeks_gateway_queues_when_pending_empty() -> None:
"""When _pending has no matching entry but _gateway_queues does, the
helper should extract pattern_keys from the gateway queue and call
approve_session even though pending is None.
"""
routes = importlib.import_module("api.routes")
src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
helper_idx = src.index("def _resolve_approval_legacy")
helper_body = src[helper_idx:src.index("def _handle_approval_respond", helper_idx)]
assert "_gateway_queues" in helper_body, (
"_resolve_approval_legacy must reference _gateway_queues "
"to read pattern_keys when _pending is empty"
)
assert "gateway_keys" in helper_body, (
"Must extract pattern_keys from _gateway_queues into a gateway_keys variable"
)
assert "approve_session" in helper_body[helper_body.index("all_keys"):], (
"Must call approve_session for keys extracted from _gateway_queues"
)
@requires_agent_modules
def test_approval_respond_approves_from_gateway_queues_when_pending_empty() -> None:
"""Verify _resolve_approval_legacy peeks into _gateway_queues for
pattern_keys when _pending has no matching entry, and calls
approve_session() even though pending is None (the real streaming case).
"""
import threading
from api.routes import _resolve_approval_legacy
routes = importlib.import_module("api.routes")
approval_mod = importlib.import_module("tools.approval")
test_sid = "__test_gateway_approval_sid__"
test_key = "__test_pattern_key__"
# 1. Ensure _pending is empty for this sid
with approval_mod._lock:
approval_mod._pending.pop(test_sid, None)
# 2. Populate _gateway_queues with a real entry
entry = approval_mod._ApprovalEntry({
"command": "test_cmd",
"pattern_key": test_key,
"pattern_keys": [test_key],
"description": "test dangerous cmd",
})
with approval_mod._lock:
approval_mod._gateway_queues.setdefault(test_sid, []).append(entry)
try:
# 3. Run the helper with empty _pending but populated _gateway_queues
result = _resolve_approval_legacy(test_sid, "", "session")
# 4. Verify approve_session was called (is_approved must return True)
assert approval_mod.is_approved(test_sid, test_key), (
"approve_session should have been called for the pattern_key "
"extracted from _gateway_queues"
)
assert result is True, (
"_resolve_approval_legacy should return True when it finds "
"and resolves the gateway entry"
)
finally:
# 5. Cleanup
with approval_mod._lock:
approval_mod._gateway_queues.pop(test_sid, None)
approval_mod._session_approved.pop(test_sid, None)
approval_mod._pending.pop(test_sid, None)
def test_chat_start_route_selects_adapter_only_when_flag_enabled():
routes = importlib.import_module("api.routes")
src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")