Merge PR #2077 into stage-338

Refactor compression anchor visibility helpers
by @franksong2702
This commit is contained in:
nesquena-hermes
2026-05-11 17:17:25 +00:00
4 changed files with 140 additions and 85 deletions
+77
View File
@@ -0,0 +1,77 @@
"""
Shared helpers for session compression anchor metadata.
"""
def _content_text(content, *, part_types):
if isinstance(content, list):
return "\n".join(
str(part.get("text") or part.get("content") or "")
for part in content
if isinstance(part, dict) and part.get("type") in part_types
).strip()
return str(content or "").strip()
def _content_has_part_type(content, part_types):
if not isinstance(content, list):
return False
return any(
isinstance(part, dict) and part.get("type") in part_types
for part in content
)
def visible_messages_for_anchor(messages, *, auto_compression: bool = False):
"""Return transcript messages that can anchor compression UI metadata.
Manual compression historically only counted plain ``text`` content parts
for non-assistant messages, while the streaming auto-compression path also
accepted provider-style ``input_text`` / ``output_text`` parts and metadata
markers on any non-tool role. Keep that difference explicit at the call site
instead of carrying two near-identical helper implementations.
"""
out = []
text_part_types = {"text", "input_text", "output_text"} if auto_compression else {"text"}
for message in messages or []:
if not isinstance(message, dict):
continue
role = message.get("role")
if not role or role == "tool":
continue
content = message.get("content", "")
has_attachments = bool(message.get("attachments"))
text = _content_text(content, part_types=text_part_types)
if auto_compression:
has_tool_calls = bool(
isinstance(message.get("tool_calls"), list) and message.get("tool_calls")
)
has_tool_use = _content_has_part_type(content, {"tool_use"})
has_reasoning = bool(message.get("reasoning"))
if not text:
has_reasoning = has_reasoning or _content_has_part_type(
content,
{"thinking", "reasoning"},
)
if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning:
out.append(message)
continue
if role == "assistant":
has_tool_calls = bool(
isinstance(message.get("tool_calls"), list) and message.get("tool_calls")
)
has_tool_use = _content_has_part_type(content, {"tool_use"})
has_reasoning = bool(message.get("reasoning")) or _content_has_part_type(
content,
{"thinking", "reasoning"},
)
if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning:
out.append(message)
continue
if text or has_attachments:
out.append(message)
return out
+2 -46
View File
@@ -28,6 +28,7 @@ from api.agent_sessions import (
is_cli_session_row_visible,
read_session_lineage_report,
)
from api.compression_anchor import visible_messages_for_anchor
logger = logging.getLogger(__name__)
@@ -7563,51 +7564,6 @@ def _handle_clarify_respond(handler, body):
def _handle_session_compress(handler, body):
def _visible_messages_for_anchor(messages):
out = []
for m in messages or []:
if not isinstance(m, dict):
continue
role = m.get("role")
if not role or role == "tool":
continue
content = m.get("content", "")
has_attachments = bool(m.get("attachments"))
if role == "assistant":
tool_calls = m.get("tool_calls")
has_tool_calls = isinstance(tool_calls, list) and len(tool_calls) > 0
has_tool_use = False
has_reasoning = bool(m.get("reasoning"))
if isinstance(content, list):
for p in content:
if not isinstance(p, dict):
continue
if p.get("type") == "tool_use":
has_tool_use = True
if p.get("type") in {"thinking", "reasoning"}:
has_reasoning = True
text = "\n".join(
str(p.get("text") or p.get("content") or "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
).strip()
else:
text = str(content or "").strip()
if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning:
out.append(m)
continue
if isinstance(content, list):
text = "\n".join(
str(p.get("text") or p.get("content") or "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
).strip()
else:
text = str(content or "").strip()
if text or has_attachments:
out.append(m)
return out
def _anchor_message_key(m):
if not isinstance(m, dict):
return None
@@ -7846,7 +7802,7 @@ def _handle_session_compress(handler, body):
s.pending_user_message = None
s.pending_attachments = []
s.pending_started_at = None
visible_after = _visible_messages_for_anchor(compressed)
visible_after = visible_messages_for_anchor(compressed, auto_compression=False)
s.compression_anchor_visible_idx = max(0, len(visible_after) - 1) if visible_after else None
s.compression_anchor_message_key = _anchor_message_key(visible_after[-1]) if visible_after else None
summary_text = None
+2 -39
View File
@@ -33,6 +33,7 @@ from api.config import (
model_with_provider_context,
)
from api.helpers import redact_session_data, _redact_text
from api.compression_anchor import visible_messages_for_anchor
from api.metering import meter
# Global lock for os.environ writes. Per-session locks (_agent_lock) prevent
@@ -1606,44 +1607,6 @@ def _compression_anchor_message_key(message):
return {'role': role, 'ts': ts, 'text': text, 'attachments': attach_count}
def _visible_messages_for_compression_anchor(messages):
out = []
for m in messages or []:
if not isinstance(m, dict):
continue
role = m.get('role')
if not role or role == 'tool':
continue
content = m.get('content', '')
has_attachments = bool(m.get('attachments'))
has_tool_calls = bool(isinstance(m.get('tool_calls'), list) and m.get('tool_calls'))
has_tool_use = False
has_reasoning = bool(m.get('reasoning'))
if isinstance(content, list):
text = '\n'.join(
str(p.get('text') or p.get('content') or '')
for p in content
if isinstance(p, dict)
and p.get('type') in {'text', 'input_text', 'output_text'}
).strip()
for part in content:
if not isinstance(part, dict):
continue
if part.get('type') == 'tool_use':
has_tool_use = True
if not text:
has_reasoning = has_reasoning or any(
isinstance(part, dict)
and part.get('type') in {'thinking', 'reasoning'}
for part in content
)
else:
text = str(content or '').strip()
if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning:
out.append(m)
return out
def _compression_summary_from_messages(messages):
for m in reversed(messages or []):
if not isinstance(m, dict):
@@ -3360,7 +3323,7 @@ def _run_agent_streaming(
_compressed = True
# Notify the frontend that compression happened
if _compressed:
visible_after = _visible_messages_for_compression_anchor(s.messages)
visible_after = visible_messages_for_anchor(s.messages, auto_compression=True)
s.compression_anchor_visible_idx = (
max(0, len(visible_after) - 1) if visible_after else None
)
@@ -0,0 +1,59 @@
"""
Regression coverage for shared compression-anchor visibility helpers (#2028).
"""
from pathlib import Path
from api.compression_anchor import visible_messages_for_anchor
def test_legacy_duplicate_anchor_helpers_are_removed():
routes_src = Path("api/routes.py").read_text(encoding="utf-8")
streaming_src = Path("api/streaming.py").read_text(encoding="utf-8")
assert "def _visible_messages_for_anchor" not in routes_src
assert "def _visible_messages_for_compression_anchor" not in streaming_src
assert "visible_messages_for_anchor(compressed, auto_compression=False)" in routes_src
assert "visible_messages_for_anchor(s.messages, auto_compression=True)" in streaming_src
def test_visible_messages_for_anchor_preserves_manual_text_part_filter():
text_only = {"role": "assistant", "content": [{"type": "text", "text": "Visible"}]}
input_only = {"role": "assistant", "content": [{"type": "input_text", "text": "Model input"}]}
reasoning_only = {"role": "assistant", "content": [{"type": "thinking", "text": "hidden"}]}
tool_use_only = {"role": "assistant", "content": [{"type": "tool_use", "id": "call_1"}]}
tool_message = {"role": "tool", "content": "tool output"}
assert visible_messages_for_anchor(
[text_only, input_only, reasoning_only, tool_use_only, tool_message],
auto_compression=False,
) == [text_only, reasoning_only, tool_use_only]
def test_visible_messages_for_anchor_preserves_auto_compression_text_part_filter():
text_only = {"role": "assistant", "content": [{"type": "text", "text": "Visible"}]}
input_only = {"role": "assistant", "content": [{"type": "input_text", "text": "Model input"}]}
output_only = {"role": "assistant", "content": [{"type": "output_text", "text": "Model output"}]}
reasoning_only = {"role": "assistant", "content": [{"type": "reasoning", "text": "hidden"}]}
tool_message = {"role": "tool", "content": "tool output"}
assert visible_messages_for_anchor(
[text_only, input_only, output_only, reasoning_only, tool_message],
auto_compression=True,
) == [text_only, input_only, output_only, reasoning_only]
def test_visible_messages_for_anchor_keeps_manual_user_messages_simple():
user_tool_metadata = {"role": "user", "content": [], "tool_calls": [{"id": "call_1"}]}
user_attachment = {"role": "user", "content": [], "attachments": [{"name": "screenshot.png"}]}
assistant_tool_metadata = {"role": "assistant", "content": [], "tool_calls": [{"id": "call_2"}]}
assert visible_messages_for_anchor(
[user_tool_metadata, user_attachment, assistant_tool_metadata],
auto_compression=False,
) == [user_attachment, assistant_tool_metadata]
assert visible_messages_for_anchor(
[user_tool_metadata, user_attachment, assistant_tool_metadata],
auto_compression=True,
) == [user_tool_metadata, user_attachment, assistant_tool_metadata]