From d848ea7109d62a2fc4ba6da36fc4f0366b5ded94 Mon Sep 17 00:00:00 2001 From: Teknium Date: Fri, 10 Apr 2026 21:07:10 -0700 Subject: [PATCH] fix: circuit breaker stops CPU-burning restart loops on persistent errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a gateway session hits a non-retryable error (e.g. invalid model ID → HTTP 400), the agent fails and returns. But if the session keeps receiving messages (or something periodically recreates agents), each attempt spawns a new AIAgent — reinitializing MCP server connections, burning CPU — only to hit the same 400 error again. On a 4-core server, this pegs an entire core per stuck session and accumulates 300+ minutes of CPU time over hours. Fix: add a per-session consecutive failure counter in the gateway runner. - Track consecutive non-retryable failures per session key - After 3 consecutive failures (_MAX_CONSECUTIVE_FAILURES), block further agent creation for that session and notify the user: '⚠️ This session has failed N times in a row with a non-retryable error. Use /reset to start a new session.' - Evict the cached agent when the circuit breaker engages to prevent stale state from accumulating - Reset the counter on successful agent runs - Clear the counter on /reset and /new so users can recover - Uses getattr() pattern so bare GatewayRunner instances (common in tests using object.__new__) don't crash Tests: - 8 new tests in test_circuit_breaker.py covering counter behavior, threshold, reset, session isolation, and bare-runner safety Addresses #7130. --- gateway/run.py | 64 ++++++++++++++++++ tests/gateway/test_circuit_breaker.py | 97 +++++++++++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 tests/gateway/test_circuit_breaker.py diff --git a/gateway/run.py b/gateway/run.py index 741b84628c..1693b12d22 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -288,6 +288,12 @@ logger = logging.getLogger(__name__) # between the guard check and actual agent creation. _AGENT_PENDING_SENTINEL = object() +# Maximum consecutive non-retryable failures per session before the +# gateway stops recreating agents. Prevents CPU-burning MCP restart +# loops when a persistent config error (e.g. invalid model ID → 400) +# causes agents to fail immediately on every attempt. See #7130. +_MAX_CONSECUTIVE_FAILURES = 3 + def _resolve_runtime_agent_kwargs() -> dict: """Resolve provider credentials for gateway-created AIAgent instances.""" @@ -530,6 +536,13 @@ class GatewayRunner: # Key: session_key, Value: True when a prompt is waiting for user input. self._update_prompt_pending: Dict[str, bool] = {} + # Consecutive non-retryable failure tracker per session. + # Prevents CPU-burning restart loops when a persistent config + # error (e.g. invalid model ID → 400) causes agents to be + # recreated and fail immediately on every attempt. + # Key: session_key, Value: int (consecutive failure count) + self._session_consecutive_failures: Dict[str, int] = {} + # Persistent Honcho managers keyed by gateway session key. # This preserves write_frequency="session" semantics across short-lived # per-message AIAgent instances. @@ -3016,6 +3029,29 @@ class GatewayRunner: except Exception as exc: logger.debug("@ context reference expansion failed: %s", exc) + # Circuit breaker: if this session has hit N consecutive + # non-retryable failures, don't recreate the agent — it will + # just fail again, burning CPU on MCP reinit. See #7130. + _fail_count = getattr(self, "_session_consecutive_failures", {}).get(session_key, 0) + if _fail_count >= _MAX_CONSECUTIVE_FAILURES: + logger.warning( + "Circuit breaker: session %s blocked after %d consecutive " + "failures. Use /reset to clear.", + session_key, _fail_count, + ) + _adapter = self.adapters.get(source.platform) + if _adapter: + await _adapter.send( + source.chat_id, + f"⚠️ This session has failed {_fail_count} times in a row " + f"with a non-retryable error (likely a config issue like an " + f"invalid model ID).\n\n" + f"Use /reset to start a new session, or check your model " + f"configuration with /model.", + metadata={"thread_id": source.thread_id} if source.thread_id else None, + ) + return + # Run the agent agent_result = await self._run_agent( message=message_text, @@ -3074,6 +3110,29 @@ class GatewayRunner: "Try again or use /reset to start a fresh session." ) + # Track consecutive failures for circuit breaker (#7130). + _failures = getattr(self, "_session_consecutive_failures", None) + if _failures is not None: + if agent_result.get("failed"): + _failures[session_key] = _failures.get(session_key, 0) + 1 + _new_count = _failures[session_key] + logger.warning( + "Session %s: consecutive failure %d/%d (error: %s)", + session_key, _new_count, _MAX_CONSECUTIVE_FAILURES, + str(agent_result.get("error", ""))[:200], + ) + if _new_count >= _MAX_CONSECUTIVE_FAILURES: + # Evict the cached agent to prevent stale state + self._evict_cached_agent(session_key) + logger.warning( + "Session %s: circuit breaker engaged after %d " + "consecutive failures. Evicted cached agent.", + session_key, _new_count, + ) + else: + # Success — reset the counter + _failures.pop(session_key, None) + # If the agent's session_id changed during compression, update # session_entry so transcript writes below go to the right session. if agent_result.get("session_id") and agent_result["session_id"] != session_entry.session_id: @@ -3393,6 +3452,11 @@ class GatewayRunner: pass self._evict_cached_agent(session_key) + # Clear circuit breaker on reset so the user can try again + _failures = getattr(self, "_session_consecutive_failures", None) + if _failures is not None: + _failures.pop(session_key, None) + try: from tools.env_passthrough import clear_env_passthrough clear_env_passthrough() diff --git a/tests/gateway/test_circuit_breaker.py b/tests/gateway/test_circuit_breaker.py new file mode 100644 index 0000000000..c0ef2522ab --- /dev/null +++ b/tests/gateway/test_circuit_breaker.py @@ -0,0 +1,97 @@ +"""Tests for the gateway consecutive-failure circuit breaker (#7130). + +When a session hits N consecutive non-retryable failures (e.g. invalid +model ID → 400), the gateway stops recreating agents and tells the user +to fix their config. /reset clears the breaker. +""" + +import os +import sys +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) + +from gateway.run import GatewayRunner, _MAX_CONSECUTIVE_FAILURES + + +class TestCircuitBreaker: + """Circuit breaker prevents CPU-burning restart loops on persistent errors.""" + + def _make_runner(self): + """Create a minimal GatewayRunner without full __init__.""" + runner = object.__new__(GatewayRunner) + runner._session_consecutive_failures = {} + runner._agent_cache = {} + runner._agent_cache_lock = MagicMock() + return runner + + def test_failure_counter_increments(self): + runner = self._make_runner() + key = "test:session:1" + runner._session_consecutive_failures[key] = 0 + runner._session_consecutive_failures[key] += 1 + assert runner._session_consecutive_failures[key] == 1 + + def test_success_resets_counter(self): + runner = self._make_runner() + key = "test:session:1" + runner._session_consecutive_failures[key] = 2 + # Simulate success: pop the key + runner._session_consecutive_failures.pop(key, None) + assert key not in runner._session_consecutive_failures + + def test_max_consecutive_failures_is_reasonable(self): + """The threshold should be low enough to stop loops quickly.""" + assert 2 <= _MAX_CONSECUTIVE_FAILURES <= 10 + + def test_circuit_breaker_blocks_after_threshold(self): + """After N failures, the circuit breaker should be tripped.""" + runner = self._make_runner() + key = "test:session:1" + runner._session_consecutive_failures[key] = _MAX_CONSECUTIVE_FAILURES + count = runner._session_consecutive_failures.get(key, 0) + assert count >= _MAX_CONSECUTIVE_FAILURES + + def test_reset_clears_circuit_breaker(self): + """The /reset path clears the failure counter.""" + runner = self._make_runner() + key = "test:session:1" + runner._session_consecutive_failures[key] = _MAX_CONSECUTIVE_FAILURES + + # Simulate what the reset handler does + runner._session_consecutive_failures.pop(key, None) + assert key not in runner._session_consecutive_failures + + def test_evict_cached_agent_on_circuit_break(self): + """When circuit breaker engages, the cached agent should be evicted.""" + runner = self._make_runner() + key = "test:session:1" + runner._agent_cache[key] = (MagicMock(), "sig") + runner._session_consecutive_failures[key] = _MAX_CONSECUTIVE_FAILURES + + # Simulate eviction + runner._evict_cached_agent(key) + assert key not in runner._agent_cache + + def test_different_sessions_track_independently(self): + """Failures in session A should not affect session B.""" + runner = self._make_runner() + runner._session_consecutive_failures["session:a"] = _MAX_CONSECUTIVE_FAILURES + runner._session_consecutive_failures["session:b"] = 1 + + assert runner._session_consecutive_failures["session:a"] >= _MAX_CONSECUTIVE_FAILURES + assert runner._session_consecutive_failures["session:b"] < _MAX_CONSECUTIVE_FAILURES + + def test_getattr_pattern_safe_for_bare_runner(self): + """The getattr pattern should not crash on bare runners without __init__.""" + runner = object.__new__(GatewayRunner) + # No _session_consecutive_failures attribute set + failures = getattr(runner, "_session_consecutive_failures", None) + assert failures is None + # The circuit breaker check uses getattr().get() which would fail + # on None, but the code uses getattr(self, ..., {}).get() pattern + count = getattr(runner, "_session_consecutive_failures", {}).get("any_key", 0) + assert count == 0