From 8afefc86c9ca79d59faf7d696d01ec64c034ffbf Mon Sep 17 00:00:00 2001 From: ekko Date: Mon, 25 May 2026 09:55:16 +0800 Subject: [PATCH] fix mobile chat run reconnect --- packages/client/src/api/hermes/chat.ts | 102 +++++++++++--- packages/client/src/stores/hermes/chat.ts | 104 +++++++++++++- tests/client/chat-run-reconnect.test.ts | 160 ++++++++++++++++++++++ 3 files changed, 346 insertions(+), 20 deletions(-) create mode 100644 tests/client/chat-run-reconnect.test.ts diff --git a/packages/client/src/api/hermes/chat.ts b/packages/client/src/api/hermes/chat.ts index 773c9eab..dc7c9f78 100644 --- a/packages/client/src/api/hermes/chat.ts +++ b/packages/client/src/api/hermes/chat.ts @@ -72,6 +72,19 @@ export interface RunEvent { } } +export interface ResumeSessionPayload { + session_id: string + messages: any[] + isWorking: boolean + isAborting?: boolean + events: Array<{ event: string; data: RunEvent }> + inputTokens?: number + outputTokens?: number + contextTokens?: number + queueLength?: number + queueMessages?: RunEvent['queued_messages'] +} + // ============================ // Socket.IO chat run connection // ============================ @@ -80,6 +93,12 @@ let chatRunSocket: Socket | null = null let globalListenersRegistered = false let chatRunSocketProfile: string | null = null +const TRANSIENT_DISCONNECT_REASONS = new Set([ + 'transport close', + 'transport error', + 'ping timeout', +]) + /** * Session event handlers map * Maps session_id to event handling functions for isolating concurrent session streams @@ -597,7 +616,7 @@ function removeSocketListener(socket: Socket, event: string, handler: (...args: */ export function resumeSession( sessionId: string, - onResumed: (data: { session_id: string; messages: any[]; isWorking: boolean; isAborting?: boolean; events: any[]; inputTokens?: number; outputTokens?: number; contextTokens?: number; queueLength?: number; queueMessages?: RunEvent['queued_messages'] }) => void, + onResumed: (data: ResumeSessionPayload) => void, profile?: string | null, ): Socket { const socket = connectChatRun(profile) @@ -614,6 +633,9 @@ export function startRunViaSocket( onDone: () => void, onError: (err: Error) => void, onStarted?: (runId: string) => void, + options?: { + onReconnectResume?: (data: ResumeSessionPayload) => void + }, ): { abort: () => void } { const sid = body.session_id if (!sid) { @@ -622,24 +644,6 @@ export function startRunViaSocket( let closed = false const socket = connectChatRun(body.profile) - const handleSocketError = (err: Error) => { - if (closed) return - closed = true - sessionEventHandlers.delete(sid) - onError(err) - } - socket.once('connect_error', handleSocketError) - const handleSocketDisconnect = (reason: string) => { - if (closed || reason === 'io client disconnect') return - handleSocketError(new Error(`Socket disconnected: ${reason}`)) - } - socket.once('disconnect', handleSocketDisconnect) - - const removeTerminalSocketListeners = () => { - removeSocketListener(socket, 'connect_error', handleSocketError) - removeSocketListener(socket, 'disconnect', handleSocketDisconnect) - } - if (sessionEventHandlers.has(sid)) { socket.emit('run', body) return { @@ -651,6 +655,66 @@ export function startRunViaSocket( } } + let sawTransientDisconnect = false + let removeTerminalSocketListeners: () => void = () => {} + let reconnectResumeHandler: ((data: ResumeSessionPayload) => void) | null = null + + const clearReconnectResumeHandler = () => { + if (!reconnectResumeHandler) return + removeSocketListener(socket, 'resumed', reconnectResumeHandler) + reconnectResumeHandler = null + } + + const emitReconnectResume = () => { + clearReconnectResumeHandler() + if (options?.onReconnectResume) { + reconnectResumeHandler = (data: ResumeSessionPayload) => { + clearReconnectResumeHandler() + if (closed || data.session_id !== sid) return + options.onReconnectResume?.(data) + } + socket.on('resumed', reconnectResumeHandler) + } + socket.emit('resume', { session_id: sid, ...(body.profile ? { profile: body.profile } : {}) }) + } + + const handleSocketError = (err: Error) => { + if (closed) return + closed = true + removeTerminalSocketListeners() + sessionEventHandlers.delete(sid) + onError(err) + } + const handleSocketConnectError = (err: Error) => { + if (closed) return + if (sawTransientDisconnect) return + handleSocketError(err) + } + socket.on('connect_error', handleSocketConnectError) + const handleSocketDisconnect = (reason: string) => { + if (closed || reason === 'io client disconnect') return + if (TRANSIENT_DISCONNECT_REASONS.has(reason)) { + sawTransientDisconnect = true + return + } + handleSocketError(new Error(`Socket disconnected: ${reason}`)) + } + socket.on('disconnect', handleSocketDisconnect) + + const handleSocketReconnect = () => { + if (closed || !sawTransientDisconnect) return + sawTransientDisconnect = false + emitReconnectResume() + } + socket.on('connect', handleSocketReconnect) + + removeTerminalSocketListeners = () => { + clearReconnectResumeHandler() + removeSocketListener(socket, 'connect_error', handleSocketConnectError) + removeSocketListener(socket, 'disconnect', handleSocketDisconnect) + removeSocketListener(socket, 'connect', handleSocketReconnect) + } + // Define event handlers for this session const handlers = { onMessageDelta: (evt: RunEvent) => { diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index 093641bc..5b9b811a 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -1,4 +1,4 @@ -import { startRunViaSocket, resumeSession, registerSessionHandlers, unregisterSessionHandlers, getChatRunSocket, respondToolApproval, onPeerUserMessage, respondClarify, type RunEvent, type ContentBlock as ContentBlockImport } from '@/api/hermes/chat' +import { startRunViaSocket, resumeSession, registerSessionHandlers, unregisterSessionHandlers, getChatRunSocket, respondToolApproval, onPeerUserMessage, respondClarify, type RunEvent, type ResumeSessionPayload, type ContentBlock as ContentBlockImport } from '@/api/hermes/chat' import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, setSessionModel, type HermesMessage, type SessionSummary } from '@/api/hermes/sessions' import { getActiveProfileName } from '@/api/client' import { getDownloadUrl } from '@/api/hermes/download' @@ -1277,6 +1277,107 @@ export const useChatStore = defineStore('chat', () => { activeAssistantMessageId = null } + const applyReconnectResume = (data: ResumeSessionPayload) => { + if (data.session_id !== sid) return + const target = sessions.value.find(s => s.id === sid) + if (!target) return + + if (data.isWorking) serverWorking.value.add(sid) + else serverWorking.value.delete(sid) + + if (data.queueLength && data.queueLength > 0) { + queueLengths.value.set(sid, data.queueLength) + } else { + queueLengths.value.delete(sid) + } + + if (Array.isArray(data.queueMessages)) { + replaceQueuedUserMessages(sid, normalizeQueuedUserMessages(data.queueMessages)) + } else if (!data.queueLength) { + replaceQueuedUserMessages(sid, []) + } + + if (data.isAborting) { + setAbortState({ aborting: true, synced: null }) + } else if (!data.isWorking) { + setAbortState(null) + } + + if (data.inputTokens != null) target.inputTokens = data.inputTokens + if (data.outputTokens != null) target.outputTokens = data.outputTokens + if (data.contextTokens != null) target.contextTokens = data.contextTokens + + if (Array.isArray(data.messages)) { + target.messages = mapHermesMessages(data.messages as any[]) + const lastAssistant = [...target.messages].reverse().find(m => m.role === 'assistant') + if (data.isWorking && lastAssistant) { + lastAssistant.isStreaming = true + activeAssistantMessageId = lastAssistant.id + if (lastAssistant.reasoning) noteReasoningStart(lastAssistant.id) + } else { + activeAssistantMessageId = null + } + } + + if (data.events?.length) { + for (const evt of data.events) { + const e = evt.data as RunEvent + switch (e.event) { + case 'compression.started': + setCompressionState({ + compressing: true, + messageCount: (e as any).message_count || 0, + beforeTokens: (e as any).token_count || 0, + afterTokens: 0, + compressed: null, + }) + break + case 'compression.completed': { + const afterTokens = (e as any).contextTokens || (e as any).afterTokens || 0 + setCompressionState({ + compressing: false, + messageCount: (e as any).totalMessages || 0, + beforeTokens: (e as any).beforeTokens || 0, + afterTokens, + compressed: (e as any).compressed ?? false, + error: (e as any).error, + }) + if ((e as any).contextTokens != null) target.contextTokens = (e as any).contextTokens + break + } + case 'abort.started': + setAbortState({ aborting: true, synced: null }) + break + case 'abort.completed': + setAbortState({ aborting: false, synced: (e as any).synced ?? false }) + break + case 'approval.requested': + setPendingApproval({ ...e, session_id: sid }) + break + case 'approval.resolved': + clearPendingApproval({ ...e, session_id: sid }) + break + case 'clarify.requested': + setPendingClarify({ ...e, session_id: sid }) + break + case 'clarify.resolved': + clearPendingClarify({ ...e, session_id: sid }) + break + case 'run.failed': + addAgentErrorMessage(sid, e.error) + break + } + } + } + + if (activeSessionId.value === sid) activeSession.value = target + if (!data.isWorking && !(data.queueLength && data.queueLength > 0)) { + cleanup() + activeAssistantMessageId = null + updateSessionTitle(sid) + } + } + // Send run via Socket.IO and listen to streamed events — all closures capture `sid` const ctrl = startRunViaSocket( runPayload, @@ -1694,6 +1795,7 @@ export const useChatStore = defineStore('chat', () => { cleanup() }, undefined, + { onReconnectResume: applyReconnectResume }, ) if (!isBridgeSlashCommand || isBridgeCompressCommand) { diff --git a/tests/client/chat-run-reconnect.test.ts b/tests/client/chat-run-reconnect.test.ts new file mode 100644 index 00000000..8125ede5 --- /dev/null +++ b/tests/client/chat-run-reconnect.test.ts @@ -0,0 +1,160 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const socketState = vi.hoisted(() => ({ + sockets: [] as any[], +})) + +vi.mock('socket.io-client', () => { + function createSocket() { + const listeners = new Map void>>() + + const addListener = (event: string, handler: (...args: any[]) => void) => { + if (!listeners.has(event)) listeners.set(event, new Set()) + listeners.get(event)!.add(handler) + } + + const removeListener = (event: string, handler: (...args: any[]) => void) => { + const eventListeners = listeners.get(event) + if (!eventListeners) return + for (const candidate of [...eventListeners]) { + if (candidate === handler || (candidate as any).__original === handler) { + eventListeners.delete(candidate) + } + } + } + + const socket: any = { + connected: true, + on: vi.fn((event: string, handler: (...args: any[]) => void) => { + addListener(event, handler) + return socket + }), + once: vi.fn((event: string, handler: (...args: any[]) => void) => { + const wrapped = (...args: any[]) => { + removeListener(event, wrapped) + handler(...args) + } + ;(wrapped as any).__original = handler + addListener(event, wrapped) + return socket + }), + off: vi.fn((event: string, handler: (...args: any[]) => void) => { + removeListener(event, handler) + return socket + }), + removeListener: vi.fn((event: string, handler: (...args: any[]) => void) => { + removeListener(event, handler) + return socket + }), + removeAllListeners: vi.fn(() => { + listeners.clear() + return socket + }), + emit: vi.fn(), + disconnect: vi.fn(() => { + socket.connected = false + }), + __listenerCount: (event: string) => listeners.get(event)?.size || 0, + __trigger: (event: string, ...args: any[]) => { + if (event === 'connect') socket.connected = true + if (event === 'disconnect') socket.connected = false + for (const handler of [...(listeners.get(event) || [])]) handler(...args) + }, + } + + return socket + } + + return { + io: vi.fn(() => { + const socket = createSocket() + socketState.sockets.push(socket) + return socket + }), + } +}) + +vi.mock('../../packages/client/src/api/client', () => ({ + getApiKey: () => 'test-token', + getBaseUrlValue: () => '', +})) + +describe('chat-run socket reconnect handling', () => { + beforeEach(() => { + vi.resetModules() + socketState.sockets = [] + }) + + it('keeps transient mobile disconnects alive and resumes after reconnect', async () => { + const { startRunViaSocket } = await import('../../packages/client/src/api/hermes/chat') + const onEvent = vi.fn() + const onDone = vi.fn() + const onError = vi.fn() + const onReconnectResume = vi.fn() + + startRunViaSocket( + { session_id: 'session-1', input: 'hello', profile: 'default', source: 'cli' }, + onEvent, + onDone, + onError, + undefined, + { onReconnectResume }, + ) + + const socket = socketState.sockets[0] + expect(socket.emit).toHaveBeenCalledWith('run', expect.objectContaining({ session_id: 'session-1' })) + + socket.__trigger('disconnect', 'ping timeout') + expect(onError).not.toHaveBeenCalled() + + socket.__trigger('connect_error', new Error('temporary reconnect failure')) + expect(onError).not.toHaveBeenCalled() + + socket.__trigger('connect') + expect(socket.emit).toHaveBeenCalledWith('resume', { session_id: 'session-1', profile: 'default' }) + + const resumed = { session_id: 'session-1', messages: [], isWorking: true, events: [] } + socket.__trigger('resumed', resumed) + expect(onReconnectResume).toHaveBeenCalledWith(resumed) + + socket.__trigger('message.delta', { event: 'message.delta', session_id: 'session-1', delta: 'after reconnect' }) + expect(onEvent).toHaveBeenCalledWith({ event: 'message.delta', session_id: 'session-1', delta: 'after reconnect' }) + expect(onDone).not.toHaveBeenCalled() + }) + + it('keeps fatal disconnects fatal and removes per-run listeners', async () => { + const { startRunViaSocket } = await import('../../packages/client/src/api/hermes/chat') + const onError = vi.fn() + + startRunViaSocket( + { session_id: 'session-1', input: 'hello', profile: 'default', source: 'cli' }, + vi.fn(), + vi.fn(), + onError, + ) + + const socket = socketState.sockets[0] + socket.__trigger('disconnect', 'io server disconnect') + + expect(onError).toHaveBeenCalledOnce() + expect(onError.mock.calls[0][0].message).toBe('Socket disconnected: io server disconnect') + expect(socket.__listenerCount('connect')).toBe(0) + expect(socket.__listenerCount('disconnect')).toBe(0) + expect(socket.__listenerCount('connect_error')).toBe(0) + }) + + it('does not attach extra reconnect listeners when the session already has handlers', async () => { + const { startRunViaSocket } = await import('../../packages/client/src/api/hermes/chat') + const body = { session_id: 'session-1', input: 'hello', profile: 'default', source: 'cli' as const } + + startRunViaSocket(body, vi.fn(), vi.fn(), vi.fn()) + const socket = socketState.sockets[0] + expect(socket.__listenerCount('connect')).toBe(1) + expect(socket.__listenerCount('disconnect')).toBe(1) + + startRunViaSocket(body, vi.fn(), vi.fn(), vi.fn()) + expect(socket.__listenerCount('connect')).toBe(1) + expect(socket.__listenerCount('disconnect')).toBe(1) + expect(socket.emit).toHaveBeenCalledWith('run', body) + }) +})