fix: circuit breaker stops CPU-burning restart loops on persistent errors

When a gateway session hits a non-retryable error (e.g. invalid model
ID → HTTP 400), the agent fails and returns. But if the session keeps
receiving messages (or something periodically recreates agents), each
attempt spawns a new AIAgent — reinitializing MCP server connections,
burning CPU — only to hit the same 400 error again. On a 4-core server,
this pegs an entire core per stuck session and accumulates 300+ minutes
of CPU time over hours.

Fix: add a per-session consecutive failure counter in the gateway runner.

- Track consecutive non-retryable failures per session key
- After 3 consecutive failures (_MAX_CONSECUTIVE_FAILURES), block
  further agent creation for that session and notify the user:
  '⚠️ This session has failed N times in a row with a non-retryable
  error. Use /reset to start a new session.'
- Evict the cached agent when the circuit breaker engages to prevent
  stale state from accumulating
- Reset the counter on successful agent runs
- Clear the counter on /reset and /new so users can recover
- Uses getattr() pattern so bare GatewayRunner instances (common in
  tests using object.__new__) don't crash

Tests:
- 8 new tests in test_circuit_breaker.py covering counter behavior,
  threshold, reset, session isolation, and bare-runner safety

Addresses #7130.
This commit is contained in:
Teknium
2026-04-10 21:07:10 -07:00
parent 79198eb3a0
commit d848ea7109
2 changed files with 161 additions and 0 deletions
+64
View File
@@ -288,6 +288,12 @@ logger = logging.getLogger(__name__)
# between the guard check and actual agent creation.
_AGENT_PENDING_SENTINEL = object()
# Maximum consecutive non-retryable failures per session before the
# gateway stops recreating agents. Prevents CPU-burning MCP restart
# loops when a persistent config error (e.g. invalid model ID → 400)
# causes agents to fail immediately on every attempt. See #7130.
_MAX_CONSECUTIVE_FAILURES = 3
def _resolve_runtime_agent_kwargs() -> dict:
"""Resolve provider credentials for gateway-created AIAgent instances."""
@@ -530,6 +536,13 @@ class GatewayRunner:
# Key: session_key, Value: True when a prompt is waiting for user input.
self._update_prompt_pending: Dict[str, bool] = {}
# Consecutive non-retryable failure tracker per session.
# Prevents CPU-burning restart loops when a persistent config
# error (e.g. invalid model ID → 400) causes agents to be
# recreated and fail immediately on every attempt.
# Key: session_key, Value: int (consecutive failure count)
self._session_consecutive_failures: Dict[str, int] = {}
# Persistent Honcho managers keyed by gateway session key.
# This preserves write_frequency="session" semantics across short-lived
# per-message AIAgent instances.
@@ -3016,6 +3029,29 @@ class GatewayRunner:
except Exception as exc:
logger.debug("@ context reference expansion failed: %s", exc)
# Circuit breaker: if this session has hit N consecutive
# non-retryable failures, don't recreate the agent — it will
# just fail again, burning CPU on MCP reinit. See #7130.
_fail_count = getattr(self, "_session_consecutive_failures", {}).get(session_key, 0)
if _fail_count >= _MAX_CONSECUTIVE_FAILURES:
logger.warning(
"Circuit breaker: session %s blocked after %d consecutive "
"failures. Use /reset to clear.",
session_key, _fail_count,
)
_adapter = self.adapters.get(source.platform)
if _adapter:
await _adapter.send(
source.chat_id,
f"⚠️ This session has failed {_fail_count} times in a row "
f"with a non-retryable error (likely a config issue like an "
f"invalid model ID).\n\n"
f"Use /reset to start a new session, or check your model "
f"configuration with /model.",
metadata={"thread_id": source.thread_id} if source.thread_id else None,
)
return
# Run the agent
agent_result = await self._run_agent(
message=message_text,
@@ -3074,6 +3110,29 @@ class GatewayRunner:
"Try again or use /reset to start a fresh session."
)
# Track consecutive failures for circuit breaker (#7130).
_failures = getattr(self, "_session_consecutive_failures", None)
if _failures is not None:
if agent_result.get("failed"):
_failures[session_key] = _failures.get(session_key, 0) + 1
_new_count = _failures[session_key]
logger.warning(
"Session %s: consecutive failure %d/%d (error: %s)",
session_key, _new_count, _MAX_CONSECUTIVE_FAILURES,
str(agent_result.get("error", ""))[:200],
)
if _new_count >= _MAX_CONSECUTIVE_FAILURES:
# Evict the cached agent to prevent stale state
self._evict_cached_agent(session_key)
logger.warning(
"Session %s: circuit breaker engaged after %d "
"consecutive failures. Evicted cached agent.",
session_key, _new_count,
)
else:
# Success — reset the counter
_failures.pop(session_key, None)
# If the agent's session_id changed during compression, update
# session_entry so transcript writes below go to the right session.
if agent_result.get("session_id") and agent_result["session_id"] != session_entry.session_id:
@@ -3393,6 +3452,11 @@ class GatewayRunner:
pass
self._evict_cached_agent(session_key)
# Clear circuit breaker on reset so the user can try again
_failures = getattr(self, "_session_consecutive_failures", None)
if _failures is not None:
_failures.pop(session_key, None)
try:
from tools.env_passthrough import clear_env_passthrough
clear_env_passthrough()
+97
View File
@@ -0,0 +1,97 @@
"""Tests for the gateway consecutive-failure circuit breaker (#7130).
When a session hits N consecutive non-retryable failures (e.g. invalid
model ID → 400), the gateway stops recreating agents and tells the user
to fix their config. /reset clears the breaker.
"""
import os
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
from gateway.run import GatewayRunner, _MAX_CONSECUTIVE_FAILURES
class TestCircuitBreaker:
"""Circuit breaker prevents CPU-burning restart loops on persistent errors."""
def _make_runner(self):
"""Create a minimal GatewayRunner without full __init__."""
runner = object.__new__(GatewayRunner)
runner._session_consecutive_failures = {}
runner._agent_cache = {}
runner._agent_cache_lock = MagicMock()
return runner
def test_failure_counter_increments(self):
runner = self._make_runner()
key = "test:session:1"
runner._session_consecutive_failures[key] = 0
runner._session_consecutive_failures[key] += 1
assert runner._session_consecutive_failures[key] == 1
def test_success_resets_counter(self):
runner = self._make_runner()
key = "test:session:1"
runner._session_consecutive_failures[key] = 2
# Simulate success: pop the key
runner._session_consecutive_failures.pop(key, None)
assert key not in runner._session_consecutive_failures
def test_max_consecutive_failures_is_reasonable(self):
"""The threshold should be low enough to stop loops quickly."""
assert 2 <= _MAX_CONSECUTIVE_FAILURES <= 10
def test_circuit_breaker_blocks_after_threshold(self):
"""After N failures, the circuit breaker should be tripped."""
runner = self._make_runner()
key = "test:session:1"
runner._session_consecutive_failures[key] = _MAX_CONSECUTIVE_FAILURES
count = runner._session_consecutive_failures.get(key, 0)
assert count >= _MAX_CONSECUTIVE_FAILURES
def test_reset_clears_circuit_breaker(self):
"""The /reset path clears the failure counter."""
runner = self._make_runner()
key = "test:session:1"
runner._session_consecutive_failures[key] = _MAX_CONSECUTIVE_FAILURES
# Simulate what the reset handler does
runner._session_consecutive_failures.pop(key, None)
assert key not in runner._session_consecutive_failures
def test_evict_cached_agent_on_circuit_break(self):
"""When circuit breaker engages, the cached agent should be evicted."""
runner = self._make_runner()
key = "test:session:1"
runner._agent_cache[key] = (MagicMock(), "sig")
runner._session_consecutive_failures[key] = _MAX_CONSECUTIVE_FAILURES
# Simulate eviction
runner._evict_cached_agent(key)
assert key not in runner._agent_cache
def test_different_sessions_track_independently(self):
"""Failures in session A should not affect session B."""
runner = self._make_runner()
runner._session_consecutive_failures["session:a"] = _MAX_CONSECUTIVE_FAILURES
runner._session_consecutive_failures["session:b"] = 1
assert runner._session_consecutive_failures["session:a"] >= _MAX_CONSECUTIVE_FAILURES
assert runner._session_consecutive_failures["session:b"] < _MAX_CONSECUTIVE_FAILURES
def test_getattr_pattern_safe_for_bare_runner(self):
"""The getattr pattern should not crash on bare runners without __init__."""
runner = object.__new__(GatewayRunner)
# No _session_consecutive_failures attribute set
failures = getattr(runner, "_session_consecutive_failures", None)
assert failures is None
# The circuit breaker check uses getattr().get() which would fail
# on None, but the code uses getattr(self, ..., {}).get() pattern
count = getattr(runner, "_session_consecutive_failures", {}).get("any_key", 0)
assert count == 0