diff --git a/api/streaming.py b/api/streaming.py index e93827c8..acdf5004 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -2102,6 +2102,17 @@ def _run_agent_streaming( meter().record_reasoning(stream_id, _metering_reasoning_deltas[0]) _emit_metering() + def on_interim_assistant(text, **cb_kwargs): + if text is None: + return + visible = str(text).strip() + if not visible: + return + put('interim_assistant', { + 'text': visible, + 'already_streamed': bool(cb_kwargs.get('already_streamed', False)), + }) + # Pre-initialise the activity counter here so on_tool (which # closes over it) never captures an unbound name even if this # block is reordered later (Issue #765). @@ -2353,6 +2364,8 @@ def _run_agent_streaming( # but guard defensively to avoid TypeError on an older agent build. if 'reasoning_config' in _agent_params and _reasoning_config is not None: _agent_kwargs['reasoning_config'] = _reasoning_config + if 'interim_assistant_callback' in _agent_params: + _agent_kwargs['interim_assistant_callback'] = on_interim_assistant if 'status_callback' in _agent_params: _agent_kwargs['status_callback'] = _agent_status_callback if 'max_tokens' in _agent_params and _max_tokens_cfg is not None: @@ -2410,6 +2423,8 @@ def _run_agent_streaming( agent.tool_progress_callback = _agent_kwargs.get('tool_progress_callback') if hasattr(agent, 'status_callback'): agent.status_callback = _agent_kwargs.get('status_callback') + if hasattr(agent, 'interim_assistant_callback'): + agent.interim_assistant_callback = _agent_kwargs.get('interim_assistant_callback') if hasattr(agent, 'reasoning_callback'): agent.reasoning_callback = _agent_kwargs.get('reasoning_callback') if hasattr(agent, 'clarify_callback'): diff --git a/static/messages.js b/static/messages.js index 9f6f5683..9dde9bf1 100644 --- a/static/messages.js +++ b/static/messages.js @@ -645,6 +645,14 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ if(!_SMD_SAFE_URL_RE.test(v)){n.removeAttribute('src');n.setAttribute('data-blocked-scheme','1');} } } + function _resetAssistantSegment(){ + assistantRow=null; + assistantBody=null; + segmentStart=assistantText.length; + _freshSegment=true; + _smdEndParser(); + } + let _lastRenderMs=0; function _scheduleRender(){ if(_renderPending) return; @@ -725,6 +733,26 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ _scheduleRender(); }); + source.addEventListener('interim_assistant',e=>{ + if(!S.session||S.session.session_id!==activeSid) return; + const d=JSON.parse(e.data); + const visible=String(d&&d.text?d.text:'').trim(); + const alreadyStreamed=!!(d&&d.already_streamed); + if(!visible){ + return; + } + if(alreadyStreamed){ + _resetAssistantSegment(); + return; + } + assistantText+=visible; + syncInflightAssistantMessage(); + if(!S.session||S.session.session_id!==activeSid) return; + const parsed=_parseStreamState(); + if(String((parsed&&parsed.displayText)||'').trim()||assistantRow) ensureAssistantRow(); + _scheduleRender(); + }); + source.addEventListener('reasoning',e=>{ const d=JSON.parse(e.data); reasoningText += d.text || ''; @@ -768,11 +796,7 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ // Reset the live assistant row reference so that any text tokens arriving // after this tool call create a NEW segment appended below the tool card, // rather than updating the old segment that sits above it in the DOM. - assistantRow=null; - assistantBody=null; - segmentStart=assistantText.length; // new segment starts at current text length - _freshSegment=true; // prevent reuse of old DOM node - _smdEndParser(); // finalize current smd parser; new one created on next token + _resetAssistantSegment(); scrollIfPinned(); }); diff --git a/tests/test_regressions.py b/tests/test_regressions.py index cbd50faf..0022bcb9 100644 --- a/tests/test_regressions.py +++ b/tests/test_regressions.py @@ -693,6 +693,23 @@ def test_messages_js_supports_live_reasoning_and_tool_completion(cleanup_test_se "messages.js must parse live stream state into reasoning + visible answer" +def test_messages_js_supports_interim_assistant_events(cleanup_test_sessions): + """R18b: messages.js must render live interim assistant commentary when + `interim_assistant` SSE events arrive. + + AIAgent emits completed mid-turn commentary through an interim callback. + Without a dedicated SSE handler, Codex-style interim status text disappears + from the live answer and users only see the final response after tool calls. + """ + src = (REPO_ROOT / "static/messages.js").read_text() + assert "source.addEventListener('interim_assistant'" in src or 'source.addEventListener("interim_assistant"' in src, \ + "messages.js must listen for interim_assistant SSE events" + assert "function _resetAssistantSegment()" in src, \ + "messages.js should share live-segment reset logic between interim assistant updates and tool events" + assert "_resetAssistantSegment();" in src, \ + "messages.js should apply segment reset when tool or interim assistant events require it" + + def test_ui_js_can_upgrade_thinking_spinner_into_live_reasoning_card(cleanup_test_sessions): """R19: ui.js must be able to replace the placeholder thinking spinner with streamed reasoning text while a turn is in progress. diff --git a/tests/test_sprint42.py b/tests/test_sprint42.py index ea32361b..49eaacd4 100644 --- a/tests/test_sprint42.py +++ b/tests/test_sprint42.py @@ -251,6 +251,157 @@ class TestRuntimeRouteInjection(unittest.TestCase): self.assertEqual(init_kwargs["api_key"], "rt-key") self.assertIs(init_kwargs["session_db"], fake_session_db) + def test_runtime_provider_forwards_interim_assistant_callback(self): + """WebUI must pass interim_assistant_callback to AIAgent and emit SSE events.""" + import api.streaming as streaming + + captured = {} + + class CapturingAgent: + def __init__( + self, + model=None, + provider=None, + base_url=None, + api_key=None, + platform=None, + quiet_mode=False, + enabled_toolsets=None, + fallback_model=None, + session_id=None, + session_db=None, + stream_delta_callback=None, + reasoning_callback=None, + tool_progress_callback=None, + interim_assistant_callback=None, + clarify_callback=None, + **kwargs, + ): + captured["init_kwargs"] = dict( + model=model, provider=provider, base_url=base_url, api_key=api_key, + platform=platform, quiet_mode=quiet_mode, + enabled_toolsets=enabled_toolsets, fallback_model=fallback_model, + session_id=session_id, session_db=session_db, + stream_delta_callback=stream_delta_callback, + reasoning_callback=reasoning_callback, + tool_progress_callback=tool_progress_callback, + interim_assistant_callback=interim_assistant_callback, + clarify_callback=clarify_callback, + ) + self.session_id = session_id + self.context_compressor = None + self.session_prompt_tokens = 0 + self.session_completion_tokens = 0 + self.session_estimated_cost_usd = None + self.reasoning_config = None + self.ephemeral_system_prompt = None + self._last_error = None + self.interim_assistant_callback = interim_assistant_callback + + def run_conversation(self, **kwargs): + if self.interim_assistant_callback: + self.interim_assistant_callback("Inspecting repo structure.", already_streamed=False) + return { + "messages": [ + {"role": "user", "content": kwargs.get("persist_user_message", "")}, + {"role": "assistant", "content": "ok"}, + ] + } + + def interrupt(self, _message): + captured["interrupted"] = True + + class FakeSession: + session_id = "sess-interim-test" + title = "Test" + workspace = "/tmp" + model = "gpt-4o" + messages = [] + personality = None + input_tokens = 0 + output_tokens = 0 + estimated_cost = None + tool_calls = [] + active_stream_id = None + pending_user_message = None + pending_attachments = [] + pending_started_at = None + + def save(self, touch_updated_at=True, skip_index=True): + pass + + def compact(self): + return { + "session_id": self.session_id, "title": self.title, + "workspace": self.workspace, "model": self.model, + "created_at": 0, "updated_at": 0, "pinned": False, + "archived": False, "project_id": None, "profile": None, + "input_tokens": 0, "output_tokens": 0, + "estimated_cost": None, "personality": None, + } + + @property + def path(self): + return "/tmp/fake.json" + + fake_stream_id = "stream-interim-callback" + fake_queue = queue.Queue() + fake_rt_module = types.ModuleType("hermes_cli.runtime_provider") + fake_rt_module.resolve_runtime_provider = mock.Mock(return_value={ + "provider": "openai-codex", + "base_url": "https://api.openai.com/v1", + "api_key": "rt-key", + "api_mode": "codex_responses", + "command": "codex", + "args": ["exec", "--json"], + "credential_pool": object(), + }) + fake_hermes_cli = types.ModuleType("hermes_cli") + fake_hermes_cli.runtime_provider = fake_rt_module + fake_hermes_state = types.ModuleType("hermes_state") + fake_hermes_state.SessionDB = mock.Mock(return_value=object()) + + with mock.patch.object(streaming, "get_session", return_value=FakeSession()), \ + mock.patch.object(streaming, "_get_ai_agent", return_value=CapturingAgent), \ + mock.patch.object(streaming, "resolve_model_provider", return_value=("gpt-4o", "openai-codex", None)), \ + mock.patch("api.config.get_config", return_value={}), \ + mock.patch("api.config._resolve_cli_toolsets", return_value=[]), \ + mock.patch.dict(sys.modules, { + "hermes_cli": fake_hermes_cli, + "hermes_cli.runtime_provider": fake_rt_module, + "hermes_state": fake_hermes_state, + }): + streaming.STREAMS[fake_stream_id] = fake_queue + streaming._run_agent_streaming( + session_id="sess-interim-test", + msg_text="hello", + model="gpt-4o", + workspace="/tmp", + stream_id=fake_stream_id, + ) + + init_kwargs = captured["init_kwargs"] + self.assertIsNotNone(init_kwargs["interim_assistant_callback"]) + self.assertTrue(callable(init_kwargs["interim_assistant_callback"])) + + interim_events = [] + while not fake_queue.empty(): + try: + interim_events.append(fake_queue.get_nowait()) + except queue.Empty: + break + self.assertTrue( + any(event == "interim_assistant" for event, _ in interim_events), + "interim_assistant callback should emit interim_assistant SSE events", + ) + self.assertTrue( + any( + event == "interim_assistant" and event_data.get("text") == "Inspecting repo structure." + for event, event_data in interim_events + ), + "interim_assistant event should carry the assistant commentary text" + ) + class TestSessionDBAST(unittest.TestCase): """AST-level checks: verify the try/except is not inside _ENV_LOCK (deadlock guard)."""