diff --git a/api/routes.py b/api/routes.py index 6ee22d3b..2aaac2bf 100644 --- a/api/routes.py +++ b/api/routes.py @@ -5,6 +5,7 @@ Extracted from server.py (Sprint 11) so server.py is a thin shell. import html as _html import copy +import io import json import logging import os @@ -49,6 +50,9 @@ _CLIENT_DISCONNECT_ERRORS = ( # Track job IDs currently being executed so the frontend can poll status. _RUNNING_CRON_JOBS: dict[str, float] = {} # job_id → start_timestamp _RUNNING_CRON_LOCK = threading.Lock() +_MANUAL_COMPRESSION_JOBS: dict[str, dict] = {} +_MANUAL_COMPRESSION_JOBS_LOCK = threading.Lock() +_MANUAL_COMPRESSION_JOB_TTL_SECONDS = 10 * 60 _CRON_OUTPUT_CONTENT_LIMIT = 8000 _CRON_OUTPUT_HEADER_CONTEXT = 200 _MESSAGING_RAW_SOURCES = {str(s).strip().lower() for s in MESSAGING_SOURCES} @@ -3072,6 +3076,10 @@ def handle_get(handler, parsed) -> bool: logger.exception("failed to read worktree status for session %s", sid) return bad(handler, _sanitize_error(exc), status=500) + if parsed.path == "/api/session/compress/status": + query = parse_qs(parsed.query) + return _handle_session_compress_status(handler, query.get("session_id", [""])[0]) + if parsed.path == "/api/session": import time as _time _t0 = _time.monotonic() @@ -4421,6 +4429,9 @@ def handle_post(handler, parsed) -> bool: "parent_session_id": source.session_id, }) + if parsed.path == "/api/session/compress/start": + return _handle_session_compress_start(handler, body) + if parsed.path == "/api/session/compress": return _handle_session_compress(handler, body) @@ -7745,6 +7756,174 @@ def _handle_clarify_respond(handler, body): return j(handler, {"ok": True, "response": response}) +class _ManualCompressionMemoryHandler: + def __init__(self): + self.wfile = io.BytesIO() + self.status = None + self.sent_headers = {} + + def send_response(self, status): + self.status = status + + def send_header(self, key, value): + self.sent_headers[key] = value + + def end_headers(self): + pass + + def payload(self): + raw = self.wfile.getvalue().decode("utf-8") + return json.loads(raw) if raw else {} + + +def _manual_compression_cleanup_locked(now=None): + now = time.time() if now is None else now + for sid, job in list(_MANUAL_COMPRESSION_JOBS.items()): + if job.get("status") == "running": + continue + updated_at = float(job.get("updated_at") or job.get("started_at") or now) + if now - updated_at > _MANUAL_COMPRESSION_JOB_TTL_SECONDS: + _MANUAL_COMPRESSION_JOBS.pop(sid, None) + + +def _manual_compression_status_payload(job): + status = job.get("status") or "running" + payload = { + "ok": status not in {"error", "cancelled"}, + "status": status, + "session_id": job.get("session_id"), + "focus_topic": job.get("focus_topic"), + "started_at": job.get("started_at"), + "updated_at": job.get("updated_at"), + } + if status == "done": + result = job.get("result") + if isinstance(result, dict): + payload.update(result) + payload["status"] = "done" + payload["ok"] = True + elif status == "error": + payload["ok"] = False + payload["error"] = job.get("error") or "Compression failed" + payload["error_status"] = int(job.get("error_status") or 400) + elif status == "cancelled": + payload["ok"] = False + payload["error"] = job.get("error") or "Compression cancelled" + payload["error_status"] = int(job.get("error_status") or 409) + return payload + + +def _run_manual_compression_job(sid, body): + memory_handler = _ManualCompressionMemoryHandler() + try: + _handle_session_compress(memory_handler, body) + status = int(memory_handler.status or 500) + payload = memory_handler.payload() + with _MANUAL_COMPRESSION_JOBS_LOCK: + job = _MANUAL_COMPRESSION_JOBS.get(sid) + if not job: + return + now = time.time() + if status >= 400 or not isinstance(payload, dict) or payload.get("error"): + job.update( + { + "status": "error", + "error": str((payload or {}).get("error") or "Compression failed"), + "error_status": status, + "updated_at": now, + } + ) + else: + job.update( + { + "status": "done", + "result": payload, + "updated_at": now, + } + ) + except Exception as exc: + logger.warning("Manual compression worker failed for session %s: %s", sid, exc) + with _MANUAL_COMPRESSION_JOBS_LOCK: + job = _MANUAL_COMPRESSION_JOBS.get(sid) + if job: + job.update( + { + "status": "error", + "error": f"Compression failed: {_sanitize_error(exc)}", + "error_status": 500, + "updated_at": time.time(), + } + ) + + +def _handle_session_compress_start(handler, body): + try: + require(body, "session_id") + except ValueError as e: + return bad(handler, str(e)) + + sid = str(body.get("session_id") or "").strip() + if not sid: + return bad(handler, "session_id is required") + try: + s = get_session(sid) + except KeyError: + return bad(handler, "Session not found", 404) + if getattr(s, "active_stream_id", None): + return bad(handler, "Session is still streaming; wait for the current turn to finish.", 409) + + focus_topic = str(body.get("focus_topic") or body.get("topic") or "").strip()[:500] or None + job_body = {"session_id": sid} + if focus_topic: + job_body["focus_topic"] = focus_topic + + now = time.time() + with _MANUAL_COMPRESSION_JOBS_LOCK: + _manual_compression_cleanup_locked(now) + existing = _MANUAL_COMPRESSION_JOBS.get(sid) + if existing: + existing_payload = _manual_compression_status_payload(existing) + if existing_payload.get("status") == "running": + return j(handler, existing_payload) + _MANUAL_COMPRESSION_JOBS.pop(sid, None) + if existing_payload.get("status") == "done": + return j(handler, existing_payload) + job = { + "session_id": sid, + "focus_topic": focus_topic, + "status": "running", + "started_at": now, + "updated_at": now, + } + _MANUAL_COMPRESSION_JOBS[sid] = job + + worker = threading.Thread( + target=_run_manual_compression_job, + args=(sid, job_body), + name=f"manual-compress-{sid[:8]}", + daemon=True, + ) + worker.start() + + with _MANUAL_COMPRESSION_JOBS_LOCK: + return j(handler, _manual_compression_status_payload(_MANUAL_COMPRESSION_JOBS.get(sid, job))) + + +def _handle_session_compress_status(handler, sid): + sid = str(sid or "").strip() + if not sid: + return bad(handler, "session_id is required") + with _MANUAL_COMPRESSION_JOBS_LOCK: + _manual_compression_cleanup_locked() + job = _MANUAL_COMPRESSION_JOBS.get(sid) + if not job: + return j(handler, {"ok": True, "status": "idle", "session_id": sid}) + payload = _manual_compression_status_payload(job) + if payload.get("status") == "done": + _MANUAL_COMPRESSION_JOBS.pop(sid, None) + return j(handler, payload) + + def _handle_session_compress(handler, body): def _anchor_message_key(m): if not isinstance(m, dict): @@ -7943,6 +8122,12 @@ def _handle_session_compress(handler, body): # Lock contract: hold for the in-memory mutation only, never across # network I/O. original_messages = list(messages) + original_stream_state = ( + getattr(s, "active_stream_id", None), + getattr(s, "pending_user_message", None), + copy.deepcopy(getattr(s, "pending_attachments", None)), + getattr(s, "pending_started_at", None), + ) approx_tokens = _estimate_messages_tokens_rough(original_messages) agent = _run_agent.AIAgent( @@ -7974,6 +8159,14 @@ def _handle_session_compress(handler, body): with _cfg._get_session_agent_lock(sid): # Re-read messages to detect concurrent edits during the LLM call. # If the history changed, the compression result is stale — abort. + current_stream_state = ( + getattr(s, "active_stream_id", None), + getattr(s, "pending_user_message", None), + copy.deepcopy(getattr(s, "pending_attachments", None)), + getattr(s, "pending_started_at", None), + ) + if current_stream_state != original_stream_state: + return bad(handler, "Session stream state changed during compression; please retry.", 409) if _sanitize_messages_for_api(s.messages) != original_messages: return bad(handler, "Session was modified during compression; please retry.", 409) diff --git a/static/commands.js b/static/commands.js index bd2f146f..c09cbf54 100644 --- a/static/commands.js +++ b/static/commands.js @@ -382,6 +382,131 @@ async function cmdNew(){ showToast(t('new_session')); } +function _manualCompressionVisibleMessages(){ + return (S.messages||[]).filter(m=>{ + if(!m||!m.role||m.role==='tool') return false; + if(m.role==='assistant'){ + const hasTc=Array.isArray(m.tool_calls)&&m.tool_calls.length>0; + const hasTu=Array.isArray(m.content)&&m.content.some(p=>p&&p.type==='tool_use'); + if(hasTc||hasTu|| (typeof _messageHasReasoningPayload==='function' && _messageHasReasoningPayload(m))) return true; + } + return typeof msgContent==='function' ? !!msgContent(m) || !!m.attachments?.length : !!m.content || !!m.attachments?.length; + }); +} + +function _manualCompressionSleep(ms){ + return new Promise(resolve=>setTimeout(resolve, ms)); +} + +async function _pollManualCompressionResult(sid){ + let delay=700; + while(true){ + const data=await api(`/api/session/compress/status?session_id=${encodeURIComponent(sid)}`); + if(data&&data.status==='done') return data; + if(data&&data.status==='error'){ + const err=new Error(data.error||'Compression failed'); + err.status=data.error_status||400; + throw err; + } + if(data&&data.status==='idle') throw new Error('Compression job is no longer available'); + await _manualCompressionSleep(delay); + delay=Math.min(2000, delay+300); + } +} + +async function _applyManualCompressionResult(data, focusTopic, visibleCount, commandText){ + if(data&&data.session){ + const currentSid=S.session&&S.session.session_id; + if(data.session.session_id&&data.session.session_id!==currentSid){ + await loadSession(data.session.session_id); + }else{ + S.session=data.session; + S.messages=data.session.messages||[]; + S.toolCalls=data.session.tool_calls||[]; + clearLiveToolCards(); + localStorage.setItem('hermes-webui-session',S.session.session_id); + if(typeof _setActiveSessionUrl==='function') _setActiveSessionUrl(S.session.session_id); + syncTopbar(); + renderMessages(); + await renderSessionList(); + updateQueueBadge(S.session.session_id); + } + } + const summary=data&&data.summary; + if(typeof setCompressionUi==='function'&&S.session){ + const referenceMsg=(S.messages||[]).find(m=>typeof _isContextCompactionMessage==='function'&&_isContextCompactionMessage(m)); + const messageRef=referenceMsg?msgContent(referenceMsg)||String(referenceMsg.content||''):''; + const summaryRef=summary&&typeof summary.reference_message==='string' ? String(summary.reference_message||'').trim() : ''; + // Prefer the persisted compaction handoff when it already exists in session state. + // The short summary fallback is only for environments where that message is unavailable. + const referenceText=messageRef || summaryRef; + const effectiveFocus=(data&&data.focus_topic)||focusTopic||''; + setCompressionUi({ + sessionId:S.session.session_id, + phase:'done', + focusTopic:effectiveFocus, + commandText:effectiveFocus?`/compress ${effectiveFocus}`:(commandText||'/compress'), + beforeCount:visibleCount, + summary:summary||null, + referenceText, + anchorVisibleIdx: data?.session?.compression_anchor_visible_idx, + anchorMessageKey: data?.session?.compression_anchor_message_key||null, + }); + } + if(typeof setComposerStatus==='function') setComposerStatus(''); + renderMessages(); + if(typeof _setCompressionSessionLock==='function') _setCompressionSessionLock(null); +} + +async function resumeManualCompressionForSession(sid){ + if(!sid) return; + try{ + const status=await api(`/api/session/compress/status?session_id=${encodeURIComponent(sid)}`); + if(!status||status.status!=='running') return; + const visibleMessages=_manualCompressionVisibleMessages(); + const visibleCount=visibleMessages.length; + const anchorMessageKey=_compressionAnchorMessageKey(visibleMessages[visibleMessages.length-1]||null); + if(typeof setBusy==='function') setBusy(true); + if(typeof setComposerStatus==='function') setComposerStatus(t('compressing')); + if(typeof setCompressionUi==='function'){ + setCompressionUi({ + sessionId:sid, + phase:'running', + focusTopic:status.focus_topic||'', + commandText:status.focus_topic?`/compress ${status.focus_topic}`:'/compress', + beforeCount:visibleCount, + anchorVisibleIdx:Math.max(0, visibleCount-1), + anchorMessageKey, + }); + } + renderMessages(); + const done=await _pollManualCompressionResult(sid); + if(!S.session||S.session.session_id!==sid) return; + await _applyManualCompressionResult(done, status.focus_topic||'', visibleCount, status.focus_topic?`/compress ${status.focus_topic}`:'/compress'); + }catch(e){ + if(S.session&&S.session.session_id===sid&&typeof setCompressionUi==='function'){ + const visibleMessages=_manualCompressionVisibleMessages(); + setCompressionUi({ + sessionId:sid, + phase:'error', + focusTopic:'', + commandText:'/compress', + beforeCount:visibleMessages.length, + errorText:`Compression failed: ${e.message}`, + anchorVisibleIdx:Math.max(0, visibleMessages.length-1), + anchorMessageKey:null, + }); + renderMessages(); + } + }finally{ + if(S.session&&S.session.session_id===sid){ + if(typeof _setCompressionSessionLock==='function') _setCompressionSessionLock(null); + if(typeof setBusy==='function') setBusy(false); + if(typeof setComposerStatus==='function') setComposerStatus(''); + } + } +} + async function _runManualCompression(focusTopic){ if(!S.session){showToast(t('no_active_session'));return;} let visibleCount=0; @@ -410,15 +535,7 @@ async function _runManualCompression(focusTopic){ if(typeof setBusy==='function') setBusy(true); const body={session_id:sid}; if(focusTopic) body.focus_topic=focusTopic; - const visibleMessages=(S.messages||[]).filter(m=>{ - if(!m||!m.role||m.role==='tool') return false; - if(m.role==='assistant'){ - const hasTc=Array.isArray(m.tool_calls)&&m.tool_calls.length>0; - const hasTu=Array.isArray(m.content)&&m.content.some(p=>p&&p.type==='tool_use'); - if(hasTc||hasTu|| (typeof _messageHasReasoningPayload==='function' && _messageHasReasoningPayload(m))) return true; - } - return typeof msgContent==='function' ? !!msgContent(m) || !!m.attachments?.length : !!m.content || !!m.attachments?.length; - }); + const visibleMessages=_manualCompressionVisibleMessages(); visibleCount=visibleMessages.length; const anchorVisibleIdx=Math.max(0, visibleCount - 1); const anchorMessageKey=_compressionAnchorMessageKey(visibleMessages[visibleMessages.length-1]||null); @@ -436,48 +553,14 @@ async function _runManualCompression(focusTopic){ } if(typeof setComposerStatus==='function') setComposerStatus(t('compressing')); renderMessages(); - const data=await api('/api/session/compress',{method:'POST',body:JSON.stringify(body)}); - if(data&&data.session){ - const currentSid=S.session&&S.session.session_id; - if(data.session.session_id&&data.session.session_id!==currentSid){ - await loadSession(data.session.session_id); - }else{ - S.session=data.session; - S.messages=data.session.messages||[]; - S.toolCalls=data.session.tool_calls||[]; - clearLiveToolCards(); - localStorage.setItem('hermes-webui-session',S.session.session_id); - if(typeof _setActiveSessionUrl==='function') _setActiveSessionUrl(S.session.session_id); - syncTopbar(); - renderMessages(); - await renderSessionList(); - updateQueueBadge(S.session.session_id); - } + const started=await api('/api/session/compress/start',{method:'POST',body:JSON.stringify(body)}); + if(started&&started.status==='error'){ + const err=new Error(started.error||'Compression failed'); + err.status=started.error_status||400; + throw err; } - const summary=data&&data.summary; - if(typeof setCompressionUi==='function'&&S.session){ - const referenceMsg=(S.messages||[]).find(m=>typeof _isContextCompactionMessage==='function'&&_isContextCompactionMessage(m)); - const messageRef=referenceMsg?msgContent(referenceMsg)||String(referenceMsg.content||''):''; - const summaryRef=summary&&typeof summary.reference_message==='string' ? String(summary.reference_message||'').trim() : ''; - // Prefer the persisted compaction handoff when it already exists in session state. - // The short summary fallback is only for environments where that message is unavailable. - const referenceText=messageRef || summaryRef; - const effectiveFocus=(data&&data.focus_topic)||focusTopic||''; - setCompressionUi({ - sessionId:S.session.session_id, - phase:'done', - focusTopic:effectiveFocus, - commandText:effectiveFocus?`/compress ${effectiveFocus}`:'/compress', - beforeCount:visibleCount, - summary:summary||null, - referenceText, - anchorVisibleIdx: data?.session?.compression_anchor_visible_idx, - anchorMessageKey: data?.session?.compression_anchor_message_key||null, - }); - } - if(typeof setComposerStatus==='function') setComposerStatus(''); - renderMessages(); - if(typeof _setCompressionSessionLock==='function') _setCompressionSessionLock(null); + const data=(started&&started.status==='done')?started:await _pollManualCompressionResult(sid); + await _applyManualCompressionResult(data, focusTopic, visibleCount, commandText); }catch(e){ if(typeof setCompressionUi==='function'){ const currentSid=S.session&&S.session.session_id; diff --git a/static/sessions.js b/static/sessions.js index f9451a91..17ee262c 100644 --- a/static/sessions.js +++ b/static/sessions.js @@ -653,6 +653,7 @@ async function loadSession(sid){ setComposerStatus(''); updateQueueBadge(sid); syncTopbar();renderMessages(); + if(typeof resumeManualCompressionForSession==='function') resumeManualCompressionForSession(sid); // Kick off loadDir first (issues network requests), then highlight code. // The fetch is dispatched before the CPU-bound Prism pass begins. const _dirP=loadDir('.'); diff --git a/tests/test_sprint46.py b/tests/test_sprint46.py index 35145c95..e502ea9b 100644 --- a/tests/test_sprint46.py +++ b/tests/test_sprint46.py @@ -6,6 +6,8 @@ import contextlib import io import json import sys +import threading +import time import types from api.models import Session @@ -59,39 +61,9 @@ class _FakeAgent: _FakeAgent.last_instance = self -def _make_session(messages=None): - SESSION_DIR.mkdir(parents=True, exist_ok=True) - messages = messages or [ - {"role": "user", "content": "one"}, - {"role": "assistant", "content": "two"}, - {"role": "user", "content": "three"}, - {"role": "assistant", "content": "four"}, - ] - s = Session( - session_id="compress_test_001", - title="Untitled", - workspace="/tmp/hermes-webui-test", - model="openai/gpt-5.4-mini", - messages=messages, - ) - s.save(touch_updated_at=False) - return s.session_id - - -def test_session_compress_requires_session_id(cleanup_test_sessions): - handler = _FakeHandler() - _handle_session_compress(handler, {}) - assert handler.status == 400 - assert handler.payload()["error"] == "Missing required field(s): session_id" - - -def test_session_compress_roundtrip(monkeypatch, cleanup_test_sessions): - created = cleanup_test_sessions - sid = _make_session() - created.append(sid) - +def _install_fake_compression_runtime(monkeypatch, agent_cls): fake_run_agent = types.ModuleType("run_agent") - fake_run_agent.AIAgent = _FakeAgent + fake_run_agent.AIAgent = agent_cls monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) import api.config as _cfg @@ -128,6 +100,40 @@ def test_session_compress_roundtrip(monkeypatch, cleanup_test_sessions): }, ) + +def _make_session(messages=None): + SESSION_DIR.mkdir(parents=True, exist_ok=True) + messages = messages or [ + {"role": "user", "content": "one"}, + {"role": "assistant", "content": "two"}, + {"role": "user", "content": "three"}, + {"role": "assistant", "content": "four"}, + ] + s = Session( + session_id=f"compress_test_{time.time_ns()}", + title="Untitled", + workspace="/tmp/hermes-webui-test", + model="openai/gpt-5.4-mini", + messages=messages, + ) + s.save(touch_updated_at=False) + return s.session_id + + +def test_session_compress_requires_session_id(cleanup_test_sessions): + handler = _FakeHandler() + _handle_session_compress(handler, {}) + assert handler.status == 400 + assert handler.payload()["error"] == "Missing required field(s): session_id" + + +def test_session_compress_roundtrip(monkeypatch, cleanup_test_sessions): + created = cleanup_test_sessions + sid = _make_session() + created.append(sid) + + _install_fake_compression_runtime(monkeypatch, _FakeAgent) + handler = _FakeHandler() _handle_session_compress(handler, {"session_id": sid, "focus_topic": "database schema"}) @@ -153,6 +159,253 @@ def test_session_compress_roundtrip(monkeypatch, cleanup_test_sessions): assert _FakeAgent.last_instance.context_compressor.calls[0]["focus_topic"] == "database schema" +def test_session_compress_start_is_async_and_reuses_running_job(monkeypatch, cleanup_test_sessions): + import api.routes as routes + + assert hasattr(routes, "_handle_session_compress_start") + assert hasattr(routes, "_handle_session_compress_status") + + class BlockingCompressor: + entered = threading.Event() + release = threading.Event() + calls = [] + + def compress(self, messages, current_tokens=None, focus_topic=None): + self.calls.append({"messages": list(messages), "focus_topic": focus_topic}) + self.entered.set() + assert self.release.wait(timeout=5), "test timed out waiting to release compression" + return [messages[0], messages[-1]] + + class BlockingAgent: + instances = [] + + def __init__(self, **kwargs): + self.context_compressor = BlockingCompressor() + self.instances.append(self) + + created = cleanup_test_sessions + sid = _make_session() + created.append(sid) + _install_fake_compression_runtime(monkeypatch, BlockingAgent) + try: + first = _FakeHandler() + routes._handle_session_compress_start(first, {"session_id": sid, "focus_topic": "slow"}) + assert first.status == 200 + first_payload = first.payload() + assert first_payload["ok"] is True + assert first_payload["status"] == "running" + assert first_payload["session_id"] == sid + assert first_payload["focus_topic"] == "slow" + assert BlockingCompressor.entered.wait(timeout=2) + + second = _FakeHandler() + routes._handle_session_compress_start(second, {"session_id": sid, "focus_topic": "slow"}) + assert second.status == 200 + second_payload = second.payload() + assert second_payload["status"] == "running" + assert len(BlockingAgent.instances) == 1 + + running = _FakeHandler() + routes._handle_session_compress_status(running, sid) + assert running.status == 200 + assert running.payload()["status"] == "running" + finally: + BlockingCompressor.release.set() + + deadline = time.time() + 5 + done_payload = None + while time.time() < deadline: + done = _FakeHandler() + routes._handle_session_compress_status(done, sid) + payload = done.payload() + if payload["status"] == "done": + done_payload = payload + break + time.sleep(0.02) + assert done_payload is not None + assert done_payload["summary"]["headline"] == "Compressed: 4 → 2 messages" + assert done_payload["session"]["messages"] == [ + {"role": "user", "content": "one"}, + {"role": "assistant", "content": "four"}, + ] + + +def test_session_compress_status_reports_worker_error_without_raw_paths(monkeypatch, cleanup_test_sessions): + import api.routes as routes + + assert hasattr(routes, "_handle_session_compress_start") + assert hasattr(routes, "_handle_session_compress_status") + + class FailingCompressor: + entered = threading.Event() + + def compress(self, messages, current_tokens=None, focus_topic=None): + self.entered.set() + raise RuntimeError("provider log at /Users/alice/.hermes/secrets/token.txt failed") + + class FailingAgent: + def __init__(self, **kwargs): + self.context_compressor = FailingCompressor() + + created = cleanup_test_sessions + sid = _make_session() + created.append(sid) + _install_fake_compression_runtime(monkeypatch, FailingAgent) + + start = _FakeHandler() + routes._handle_session_compress_start(start, {"session_id": sid}) + assert start.status == 200 + assert FailingCompressor.entered.wait(timeout=2) + + deadline = time.time() + 5 + error_payload = None + while time.time() < deadline: + status = _FakeHandler() + routes._handle_session_compress_status(status, sid) + payload = status.payload() + if payload["status"] == "error": + error_payload = payload + break + time.sleep(0.02) + assert error_payload is not None + assert error_payload["ok"] is False + assert error_payload["error_status"] == 400 + assert "" in error_payload["error"] + assert "/Users/alice" not in error_payload["error"] + + +def test_session_compress_start_retries_after_terminal_error(monkeypatch, cleanup_test_sessions): + import api.routes as routes + + class BlockingCompressor: + entered = threading.Event() + release = threading.Event() + + def compress(self, messages, current_tokens=None, focus_topic=None): + self.entered.set() + assert self.release.wait(timeout=5), "test timed out waiting to release compression" + return [messages[0], messages[-1]] + + class BlockingAgent: + def __init__(self, **kwargs): + self.context_compressor = BlockingCompressor() + + created = cleanup_test_sessions + sid = _make_session() + created.append(sid) + _install_fake_compression_runtime(monkeypatch, BlockingAgent) + + with routes._MANUAL_COMPRESSION_JOBS_LOCK: + routes._MANUAL_COMPRESSION_JOBS[sid] = { + "session_id": sid, + "focus_topic": None, + "status": "error", + "error": "previous failure", + "error_status": 400, + "started_at": time.time(), + "updated_at": time.time(), + } + + try: + retry = _FakeHandler() + routes._handle_session_compress_start(retry, {"session_id": sid}) + assert retry.status == 200 + retry_payload = retry.payload() + assert retry_payload["status"] == "running" + assert retry_payload["ok"] is True + assert BlockingCompressor.entered.wait(timeout=2) + finally: + BlockingCompressor.release.set() + + +def test_session_compress_async_reports_stale_session_guard(monkeypatch, cleanup_test_sessions): + import api.routes as routes + + created = cleanup_test_sessions + sid = _make_session() + created.append(sid) + + class MutatingCompressor: + entered = threading.Event() + + def compress(self, messages, current_tokens=None, focus_topic=None): + live = get_session(sid) + live.messages.append({"role": "user", "content": "concurrent edit"}) + self.entered.set() + return [messages[0], messages[-1]] + + class MutatingAgent: + def __init__(self, **kwargs): + self.context_compressor = MutatingCompressor() + + _install_fake_compression_runtime(monkeypatch, MutatingAgent) + + start = _FakeHandler() + routes._handle_session_compress_start(start, {"session_id": sid}) + assert start.status == 200 + assert MutatingCompressor.entered.wait(timeout=2) + + deadline = time.time() + 5 + error_payload = None + while time.time() < deadline: + status = _FakeHandler() + routes._handle_session_compress_status(status, sid) + payload = status.payload() + if payload["status"] == "error": + error_payload = payload + break + time.sleep(0.02) + assert error_payload is not None + assert error_payload["ok"] is False + assert error_payload["error_status"] == 409 + assert "modified during compression" in error_payload["error"] + assert get_session(sid).messages[-1]["content"] == "concurrent edit" + + +def test_session_compress_async_reports_stream_state_guard(monkeypatch, cleanup_test_sessions): + import api.routes as routes + + created = cleanup_test_sessions + sid = _make_session() + created.append(sid) + + class StreamMutatingCompressor: + entered = threading.Event() + + def compress(self, messages, current_tokens=None, focus_topic=None): + live = get_session(sid) + live.active_stream_id = "stream-concurrent" + self.entered.set() + return [messages[0], messages[-1]] + + class StreamMutatingAgent: + def __init__(self, **kwargs): + self.context_compressor = StreamMutatingCompressor() + + _install_fake_compression_runtime(monkeypatch, StreamMutatingAgent) + + start = _FakeHandler() + routes._handle_session_compress_start(start, {"session_id": sid}) + assert start.status == 200 + assert StreamMutatingCompressor.entered.wait(timeout=2) + + deadline = time.time() + 5 + error_payload = None + while time.time() < deadline: + status = _FakeHandler() + routes._handle_session_compress_status(status, sid) + payload = status.payload() + if payload["status"] == "error": + error_payload = payload + break + time.sleep(0.02) + assert error_payload is not None + assert error_payload["ok"] is False + assert error_payload["error_status"] == 409 + assert "stream state changed" in error_payload["error"] + assert get_session(sid).active_stream_id == "stream-concurrent" + + def test_static_commands_js_registers_compress_alias(cleanup_test_sessions): from pathlib import Path @@ -160,7 +413,10 @@ def test_static_commands_js_registers_compress_alias(cleanup_test_sessions): src = f.read() assert "name:'compress'" in src assert "name:'compact'" in src - assert "/api/session/compress" in src + assert "/api/session/compress/start" in src + assert "/api/session/compress/status" in src + assert "await api('/api/session/compress'," not in src + assert "beforeCount:visibleCount" in src assert "cmdCompress" in src assert "cmdCompact" in src @@ -173,3 +429,12 @@ def test_static_commands_js_prefers_persisted_reference_message(cleanup_test_ses assert "const messageRef=referenceMsg?msgContent(referenceMsg)||String(referenceMsg.content||''):'';" in src assert "const referenceText=messageRef || summaryRef;" in src + + +def test_static_session_load_resumes_manual_compression_polling(cleanup_test_sessions): + from pathlib import Path + + with open(Path(__file__).resolve().parents[1] / "static" / "sessions.js", encoding="utf-8") as f: + src = f.read() + + assert "resumeManualCompressionForSession" in src