Merge pull request #2128 into stage-344

Fix manual compression proxy timeouts (closes #2087)

# Conflicts:
#	CHANGELOG.md
This commit is contained in:
Hermes Agent
2026-05-12 16:13:01 +00:00
4 changed files with 625 additions and 83 deletions
+193
View File
@@ -5,6 +5,7 @@ Extracted from server.py (Sprint 11) so server.py is a thin shell.
import html as _html
import copy
import io
import json
import logging
import os
@@ -49,6 +50,9 @@ _CLIENT_DISCONNECT_ERRORS = (
# Track job IDs currently being executed so the frontend can poll status.
_RUNNING_CRON_JOBS: dict[str, float] = {} # job_id → start_timestamp
_RUNNING_CRON_LOCK = threading.Lock()
_MANUAL_COMPRESSION_JOBS: dict[str, dict] = {}
_MANUAL_COMPRESSION_JOBS_LOCK = threading.Lock()
_MANUAL_COMPRESSION_JOB_TTL_SECONDS = 10 * 60
_CRON_OUTPUT_CONTENT_LIMIT = 8000
_CRON_OUTPUT_HEADER_CONTEXT = 200
_MESSAGING_RAW_SOURCES = {str(s).strip().lower() for s in MESSAGING_SOURCES}
@@ -3072,6 +3076,10 @@ def handle_get(handler, parsed) -> bool:
logger.exception("failed to read worktree status for session %s", sid)
return bad(handler, _sanitize_error(exc), status=500)
if parsed.path == "/api/session/compress/status":
query = parse_qs(parsed.query)
return _handle_session_compress_status(handler, query.get("session_id", [""])[0])
if parsed.path == "/api/session":
import time as _time
_t0 = _time.monotonic()
@@ -4421,6 +4429,9 @@ def handle_post(handler, parsed) -> bool:
"parent_session_id": source.session_id,
})
if parsed.path == "/api/session/compress/start":
return _handle_session_compress_start(handler, body)
if parsed.path == "/api/session/compress":
return _handle_session_compress(handler, body)
@@ -7745,6 +7756,174 @@ def _handle_clarify_respond(handler, body):
return j(handler, {"ok": True, "response": response})
class _ManualCompressionMemoryHandler:
def __init__(self):
self.wfile = io.BytesIO()
self.status = None
self.sent_headers = {}
def send_response(self, status):
self.status = status
def send_header(self, key, value):
self.sent_headers[key] = value
def end_headers(self):
pass
def payload(self):
raw = self.wfile.getvalue().decode("utf-8")
return json.loads(raw) if raw else {}
def _manual_compression_cleanup_locked(now=None):
now = time.time() if now is None else now
for sid, job in list(_MANUAL_COMPRESSION_JOBS.items()):
if job.get("status") == "running":
continue
updated_at = float(job.get("updated_at") or job.get("started_at") or now)
if now - updated_at > _MANUAL_COMPRESSION_JOB_TTL_SECONDS:
_MANUAL_COMPRESSION_JOBS.pop(sid, None)
def _manual_compression_status_payload(job):
status = job.get("status") or "running"
payload = {
"ok": status not in {"error", "cancelled"},
"status": status,
"session_id": job.get("session_id"),
"focus_topic": job.get("focus_topic"),
"started_at": job.get("started_at"),
"updated_at": job.get("updated_at"),
}
if status == "done":
result = job.get("result")
if isinstance(result, dict):
payload.update(result)
payload["status"] = "done"
payload["ok"] = True
elif status == "error":
payload["ok"] = False
payload["error"] = job.get("error") or "Compression failed"
payload["error_status"] = int(job.get("error_status") or 400)
elif status == "cancelled":
payload["ok"] = False
payload["error"] = job.get("error") or "Compression cancelled"
payload["error_status"] = int(job.get("error_status") or 409)
return payload
def _run_manual_compression_job(sid, body):
memory_handler = _ManualCompressionMemoryHandler()
try:
_handle_session_compress(memory_handler, body)
status = int(memory_handler.status or 500)
payload = memory_handler.payload()
with _MANUAL_COMPRESSION_JOBS_LOCK:
job = _MANUAL_COMPRESSION_JOBS.get(sid)
if not job:
return
now = time.time()
if status >= 400 or not isinstance(payload, dict) or payload.get("error"):
job.update(
{
"status": "error",
"error": str((payload or {}).get("error") or "Compression failed"),
"error_status": status,
"updated_at": now,
}
)
else:
job.update(
{
"status": "done",
"result": payload,
"updated_at": now,
}
)
except Exception as exc:
logger.warning("Manual compression worker failed for session %s: %s", sid, exc)
with _MANUAL_COMPRESSION_JOBS_LOCK:
job = _MANUAL_COMPRESSION_JOBS.get(sid)
if job:
job.update(
{
"status": "error",
"error": f"Compression failed: {_sanitize_error(exc)}",
"error_status": 500,
"updated_at": time.time(),
}
)
def _handle_session_compress_start(handler, body):
try:
require(body, "session_id")
except ValueError as e:
return bad(handler, str(e))
sid = str(body.get("session_id") or "").strip()
if not sid:
return bad(handler, "session_id is required")
try:
s = get_session(sid)
except KeyError:
return bad(handler, "Session not found", 404)
if getattr(s, "active_stream_id", None):
return bad(handler, "Session is still streaming; wait for the current turn to finish.", 409)
focus_topic = str(body.get("focus_topic") or body.get("topic") or "").strip()[:500] or None
job_body = {"session_id": sid}
if focus_topic:
job_body["focus_topic"] = focus_topic
now = time.time()
with _MANUAL_COMPRESSION_JOBS_LOCK:
_manual_compression_cleanup_locked(now)
existing = _MANUAL_COMPRESSION_JOBS.get(sid)
if existing:
existing_payload = _manual_compression_status_payload(existing)
if existing_payload.get("status") == "running":
return j(handler, existing_payload)
_MANUAL_COMPRESSION_JOBS.pop(sid, None)
if existing_payload.get("status") == "done":
return j(handler, existing_payload)
job = {
"session_id": sid,
"focus_topic": focus_topic,
"status": "running",
"started_at": now,
"updated_at": now,
}
_MANUAL_COMPRESSION_JOBS[sid] = job
worker = threading.Thread(
target=_run_manual_compression_job,
args=(sid, job_body),
name=f"manual-compress-{sid[:8]}",
daemon=True,
)
worker.start()
with _MANUAL_COMPRESSION_JOBS_LOCK:
return j(handler, _manual_compression_status_payload(_MANUAL_COMPRESSION_JOBS.get(sid, job)))
def _handle_session_compress_status(handler, sid):
sid = str(sid or "").strip()
if not sid:
return bad(handler, "session_id is required")
with _MANUAL_COMPRESSION_JOBS_LOCK:
_manual_compression_cleanup_locked()
job = _MANUAL_COMPRESSION_JOBS.get(sid)
if not job:
return j(handler, {"ok": True, "status": "idle", "session_id": sid})
payload = _manual_compression_status_payload(job)
if payload.get("status") == "done":
_MANUAL_COMPRESSION_JOBS.pop(sid, None)
return j(handler, payload)
def _handle_session_compress(handler, body):
def _anchor_message_key(m):
if not isinstance(m, dict):
@@ -7943,6 +8122,12 @@ def _handle_session_compress(handler, body):
# Lock contract: hold for the in-memory mutation only, never across
# network I/O.
original_messages = list(messages)
original_stream_state = (
getattr(s, "active_stream_id", None),
getattr(s, "pending_user_message", None),
copy.deepcopy(getattr(s, "pending_attachments", None)),
getattr(s, "pending_started_at", None),
)
approx_tokens = _estimate_messages_tokens_rough(original_messages)
agent = _run_agent.AIAgent(
@@ -7974,6 +8159,14 @@ def _handle_session_compress(handler, body):
with _cfg._get_session_agent_lock(sid):
# Re-read messages to detect concurrent edits during the LLM call.
# If the history changed, the compression result is stale — abort.
current_stream_state = (
getattr(s, "active_stream_id", None),
getattr(s, "pending_user_message", None),
copy.deepcopy(getattr(s, "pending_attachments", None)),
getattr(s, "pending_started_at", None),
)
if current_stream_state != original_stream_state:
return bad(handler, "Session stream state changed during compression; please retry.", 409)
if _sanitize_messages_for_api(s.messages) != original_messages:
return bad(handler, "Session was modified during compression; please retry.", 409)