diff --git a/api/models.py b/api/models.py index ad0361c0..ed300617 100644 --- a/api/models.py +++ b/api/models.py @@ -966,12 +966,24 @@ def _enrich_sidebar_lineage_metadata(sessions: list[dict]) -> None: session.update(metadata[sid]) -def all_sessions(): +def _diag_stage(diag, name: str) -> None: + if diag is not None: + try: + diag.stage(name) + except Exception: + pass + + +def all_sessions(diag=None): + _diag_stage(diag, "all_sessions.active_streams") active_stream_ids = _active_stream_ids() # Phase C: try index first for O(1) read; fall back to full scan + _diag_stage(diag, "all_sessions.index_exists") if SESSION_INDEX_FILE.exists(): try: + _diag_stage(diag, "all_sessions.read_index") index = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8')) + _diag_stage(diag, "all_sessions.prune_index") index = [ s for s in index if _index_entry_exists(s.get('session_id')) @@ -979,21 +991,25 @@ def all_sessions(): backfilled = [] for i, s in enumerate(index): if 'last_message_at' not in s: + _diag_stage(diag, "all_sessions.backfill_load") full = Session.load(s.get('session_id')) if full: index[i] = full.compact() backfilled.append(full) if backfilled: try: + _diag_stage(diag, "all_sessions.backfill_write") _write_session_index(updates=backfilled) except Exception: logger.debug("Failed to persist last_message_at backfill") + _diag_stage(diag, "all_sessions.mark_streaming") for s in index: s['is_streaming'] = _is_streaming_session( s.get('active_stream_id'), active_stream_ids, ) # Overlay any in-memory sessions that may be newer than the index + _diag_stage(diag, "all_sessions.overlay_lock") index_map = {s['session_id']: s for s in index} with LOCK: for s in SESSIONS.values(): @@ -1001,6 +1017,7 @@ def all_sessions(): include_runtime=True, active_stream_ids=active_stream_ids, ) + _diag_stage(diag, "all_sessions.sort_filter") result = sorted(index_map.values(), key=lambda s: (s.get('pinned', False), _session_sort_timestamp(s)), reverse=True) # Hide empty Untitled sessions from the UI entirely — they are ephemeral # scratch pads that only become real once the first message is sent (#1171). @@ -1025,11 +1042,13 @@ def all_sessions(): for s in result: if not s.get('profile'): s['profile'] = 'default' + _diag_stage(diag, "all_sessions.lineage_metadata") _enrich_sidebar_lineage_metadata(result) return result except Exception: logger.debug("Failed to load session index, falling back to full scan") # Full scan fallback + _diag_stage(diag, "all_sessions.full_scan") out = [] for p in SESSION_DIR.glob('*.json'): if p.name.startswith('_'): continue @@ -1038,8 +1057,10 @@ def all_sessions(): if s: out.append(s) except Exception: logger.debug("Failed to load session from %s", p) + _diag_stage(diag, "all_sessions.full_scan_overlay") for s in SESSIONS.values(): if all(s.session_id != x.session_id for x in out): out.append(s) + _diag_stage(diag, "all_sessions.full_scan_sort_filter") out.sort(key=lambda s: (getattr(s, 'pinned', False), _session_sort_timestamp(s)), reverse=True) # Hide empty Untitled sessions from the UI entirely — kept consistent with the # index-path filter above. No grace window: a 0-message Untitled session is @@ -1054,6 +1075,7 @@ def all_sessions(): for s in result: if not s.get('profile'): s['profile'] = 'default' + _diag_stage(diag, "all_sessions.lineage_metadata") _enrich_sidebar_lineage_metadata(result) return result diff --git a/api/request_diagnostics.py b/api/request_diagnostics.py new file mode 100644 index 00000000..4c3ec719 --- /dev/null +++ b/api/request_diagnostics.py @@ -0,0 +1,160 @@ +"""Slow request diagnostics for latency-sensitive browser API paths.""" + +from __future__ import annotations + +import json +import logging +import os +import sys +import threading +import time +import traceback +import uuid +from typing import Any + + +DEFAULT_SLOW_REQUEST_SECONDS = 5.0 +MAX_STACK_FRAMES_PER_THREAD = 40 + + +def _slow_request_seconds() -> float: + raw = os.getenv("HERMES_WEBUI_SLOW_REQUEST_SECONDS", "").strip() + if not raw: + return DEFAULT_SLOW_REQUEST_SECONDS + try: + value = float(raw) + except ValueError: + return DEFAULT_SLOW_REQUEST_SECONDS + return max(0.0, value) + + +class RequestDiagnostics: + """Track request stages and emit a watchdog record if a request wedges.""" + + def __init__( + self, + method: str, + path: str, + *, + logger: logging.Logger | None = None, + timeout_seconds: float | None = None, + auto_start: bool = True, + ) -> None: + self.request_id = uuid.uuid4().hex[:10] + self.method = str(method or "-") + self.path = str(path or "-").split("?", 1)[0] + self.logger = logger or logging.getLogger(__name__) + self.timeout_seconds = _slow_request_seconds() if timeout_seconds is None else max(0.0, float(timeout_seconds)) + self.started_monotonic = time.monotonic() + self.started_wall = time.time() + self._lock = threading.Lock() + self._stages: list[dict[str, Any]] = [] + self._current_stage = "start" + self._current_stage_started = self.started_monotonic + self._finished = False + self._watchdog_logged = False + self._timer: threading.Timer | None = None + if auto_start and self.timeout_seconds > 0: + self._timer = threading.Timer(self.timeout_seconds, self._on_timeout) + self._timer.daemon = True + self._timer.start() + + @classmethod + def maybe_start( + cls, + method: str, + path: str, + *, + logger: logging.Logger | None = None, + ) -> "RequestDiagnostics | None": + clean_path = str(path or "").split("?", 1)[0] + if (method.upper(), clean_path) not in { + ("GET", "/api/sessions"), + ("POST", "/api/chat/start"), + }: + return None + return cls(method, clean_path, logger=logger) + + def stage(self, name: str) -> None: + now = time.monotonic() + clean = str(name or "unknown").strip() or "unknown" + with self._lock: + if self._finished: + return + self._stages.append( + { + "name": self._current_stage, + "ms": round((now - self._current_stage_started) * 1000, 1), + } + ) + self._current_stage = clean + self._current_stage_started = now + + def finish(self) -> None: + timer = None + record = None + with self._lock: + if self._finished: + return + self._finished = True + timer = self._timer + record = self._build_record_locked(include_stacks=False) + if timer is not None: + timer.cancel() + if record and self.timeout_seconds > 0 and record["elapsed_ms"] >= self.timeout_seconds * 1000: + self.logger.warning( + "Slow WebUI request completed: %s", + json.dumps(record, sort_keys=True), + ) + + def _on_timeout(self) -> None: + with self._lock: + if self._finished or self._watchdog_logged: + return + self._watchdog_logged = True + record = self._build_record_locked(include_stacks=True) + self.logger.warning( + "Slow WebUI request still running: %s", + json.dumps(record, sort_keys=True), + ) + + def _build_record_locked(self, *, include_stacks: bool) -> dict[str, Any]: + now = time.monotonic() + stages = list(self._stages) + stages.append( + { + "name": self._current_stage, + "ms": round((now - self._current_stage_started) * 1000, 1), + } + ) + record: dict[str, Any] = { + "request_id": self.request_id, + "method": self.method, + "path": self.path, + "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self.started_wall)), + "elapsed_ms": round((now - self.started_monotonic) * 1000, 1), + "current_stage": self._current_stage, + "stages": stages, + } + if include_stacks: + record["thread_stacks"] = _thread_stack_snapshot() + return record + + +def _thread_stack_snapshot() -> list[dict[str, Any]]: + frames = sys._current_frames() + threads = {thread.ident: thread for thread in threading.enumerate()} + snapshot: list[dict[str, Any]] = [] + for ident, frame in frames.items(): + thread = threads.get(ident) + stack = traceback.format_stack(frame, limit=MAX_STACK_FRAMES_PER_THREAD) + snapshot.append( + { + "thread_id": ident, + "thread_name": thread.name if thread else "", + "daemon": bool(thread.daemon) if thread else None, + "stack": [line.rstrip() for line in stack], + } + ) + snapshot.sort(key=lambda item: str(item.get("thread_name") or "")) + return snapshot diff --git a/api/routes.py b/api/routes.py index 73d5201c..28fd4b61 100644 --- a/api/routes.py +++ b/api/routes.py @@ -587,6 +587,7 @@ from api.helpers import ( _redact_text, ) from api.agent_health import build_agent_health_payload +from api.request_diagnostics import RequestDiagnostics from api.system_health import build_system_health_payload @@ -2972,80 +2973,96 @@ def handle_get(handler, parsed) -> bool: return j(handler, {"results": get_results(sid)}) if parsed.path == "/api/sessions": - webui_sessions = all_sessions() - settings = load_settings() - show_cli_sessions = bool(settings.get("show_cli_sessions")) - if show_cli_sessions: - cli = get_cli_sessions() - cli_by_id = {s["session_id"]: s for s in cli} - for s in webui_sessions: - meta = cli_by_id.get(s.get("session_id")) - if not meta: - continue - if _is_messaging_session_record(meta): - s.update(_merge_cli_sidebar_metadata(s, meta)) - if s.get("session_id") != meta.get("session_id"): - s["session_id"] = meta.get("session_id") - else: - for key in ("source_tag", "raw_source", "session_source", "source_label"): - if not s.get(key) and meta.get(key): - s[key] = meta[key] - # Apply the same CLI visibility semantics to imported local copies so - # low-value imported artifacts do not leak into the sidebar. - webui_sessions = [s for s in webui_sessions if is_cli_session_row_visible(s)] - webui_ids = {s["session_id"] for s in webui_sessions} - from api.models import _hide_from_default_sidebar as _cron_hide - deduped_cli = [s for s in cli if s["session_id"] not in webui_ids and is_cli_session_row_visible(s) and not _cron_hide(s)] - else: - webui_sessions = [s for s in webui_sessions if not _is_cli_session_for_settings(s)] - deduped_cli = [] - merged = webui_sessions + deduped_cli - merged.sort( - key=lambda s: s.get("last_message_at") or s.get("updated_at", 0) or 0, - reverse=True, - ) - # ── Profile scoping (#1611) ──────────────────────────────────────── - # Default: filter to the active profile. ?all_profiles=1 opts into - # the aggregate view used by the "All profiles" sidebar toggle. - # The other_profile_count is always returned so the UI can render - # the "Show N from other profiles" affordance without sending the - # cross-profile rows by default. - # - # IMPORTANT: scope BEFORE _keep_latest_messaging_session_per_source. - # _messaging_source_key is profile-blind (#1614 follow-up): if the - # same Slack/Telegram identity has sessions in profiles A and B, a - # profile-blind dedupe would discard the older one even when scoped - # to its own profile, leaving that profile with zero rows for that - # source. Filter first so the dedupe operates only within the active - # profile's rows. - from api.profiles import get_active_profile_name - active_profile = get_active_profile_name() - all_profiles = _all_profiles_query_flag(parsed) - if all_profiles: - scoped = merged - other_profile_count = 0 - else: - scoped = [s for s in merged - if _profiles_match(s.get("profile"), active_profile)] - other_profile_count = len(merged) - len(scoped) - scoped = _keep_latest_messaging_session_per_source(scoped) - if show_cli_sessions: - scoped = _cap_recent_cli_sessions(scoped, cli_cap=CLI_VISIBLE_SESSION_CAP) - safe_merged = [] - for s in scoped: - item = dict(s) - if isinstance(item.get("title"), str): - item["title"] = _redact_text(item["title"]) - safe_merged.append(item) - return j(handler, { - "sessions": safe_merged, - "cli_count": len(deduped_cli), - "all_profiles": all_profiles, - "active_profile": active_profile, - "other_profile_count": other_profile_count, - "server_time": time.time(), - "server_tz": time.strftime("%z"), - }) + diag = RequestDiagnostics.maybe_start("GET", parsed.path, logger=logger) + try: + diag.stage("all_sessions") + webui_sessions = all_sessions(diag=diag) + diag.stage("load_settings") + settings = load_settings() + show_cli_sessions = bool(settings.get("show_cli_sessions")) + if show_cli_sessions: + diag.stage("get_cli_sessions") + cli = get_cli_sessions() + diag.stage("merge_cli_sessions") + cli_by_id = {s["session_id"]: s for s in cli} + for s in webui_sessions: + meta = cli_by_id.get(s.get("session_id")) + if not meta: + continue + if _is_messaging_session_record(meta): + s.update(_merge_cli_sidebar_metadata(s, meta)) + if s.get("session_id") != meta.get("session_id"): + s["session_id"] = meta.get("session_id") + else: + for key in ("source_tag", "raw_source", "session_source", "source_label"): + if not s.get(key) and meta.get(key): + s[key] = meta[key] + # Apply the same CLI visibility semantics to imported local copies so + # low-value imported artifacts do not leak into the sidebar. + webui_sessions = [s for s in webui_sessions if is_cli_session_row_visible(s)] + webui_ids = {s["session_id"] for s in webui_sessions} + from api.models import _hide_from_default_sidebar as _cron_hide + deduped_cli = [s for s in cli if s["session_id"] not in webui_ids and is_cli_session_row_visible(s) and not _cron_hide(s)] + else: + diag.stage("filter_webui_sessions") + webui_sessions = [s for s in webui_sessions if not _is_cli_session_for_settings(s)] + deduped_cli = [] + diag.stage("sort_sessions") + merged = webui_sessions + deduped_cli + merged.sort( + key=lambda s: s.get("last_message_at") or s.get("updated_at", 0) or 0, + reverse=True, + ) + # ── Profile scoping (#1611) ──────────────────────────────────────── + # Default: filter to the active profile. ?all_profiles=1 opts into + # the aggregate view used by the "All profiles" sidebar toggle. + # The other_profile_count is always returned so the UI can render + # the "Show N from other profiles" affordance without sending the + # cross-profile rows by default. + # + # IMPORTANT: scope BEFORE _keep_latest_messaging_session_per_source. + # _messaging_source_key is profile-blind (#1614 follow-up): if the + # same Slack/Telegram identity has sessions in profiles A and B, a + # profile-blind dedupe would discard the older one even when scoped + # to its own profile, leaving that profile with zero rows for that + # source. Filter first so the dedupe operates only within the active + # profile's rows. + diag.stage("active_profile") + from api.profiles import get_active_profile_name + active_profile = get_active_profile_name() + all_profiles = _all_profiles_query_flag(parsed) + diag.stage("profile_filter") + if all_profiles: + scoped = merged + other_profile_count = 0 + else: + scoped = [s for s in merged + if _profiles_match(s.get("profile"), active_profile)] + other_profile_count = len(merged) - len(scoped) + diag.stage("messaging_dedupe") + scoped = _keep_latest_messaging_session_per_source(scoped) + if show_cli_sessions: + diag.stage("cli_cap") + scoped = _cap_recent_cli_sessions(scoped, cli_cap=CLI_VISIBLE_SESSION_CAP) + diag.stage("redact_sessions") + safe_merged = [] + for s in scoped: + item = dict(s) + if isinstance(item.get("title"), str): + item["title"] = _redact_text(item["title"]) + safe_merged.append(item) + diag.stage("response_write") + return j(handler, { + "sessions": safe_merged, + "cli_count": len(deduped_cli), + "all_profiles": all_profiles, + "active_profile": active_profile, + "other_profile_count": other_profile_count, + "server_time": time.time(), + "server_tz": time.strftime("%z"), + }) + finally: + diag.finish() if parsed.path == "/api/projects": # ── Profile scoping (#1614) ──────────────────────────────────────── @@ -3453,9 +3470,16 @@ def handle_get(handler, parsed) -> bool: def handle_post(handler, parsed) -> bool: """Handle all POST routes. Returns True if handled, False for 404.""" + diag = RequestDiagnostics.maybe_start("POST", parsed.path, logger=logger) # CSRF: reject cross-origin browser requests + if diag: + diag.stage("csrf") if not _check_csrf(handler): - return j(handler, {"error": "Cross-origin request rejected"}, status=403) + try: + return j(handler, {"error": "Cross-origin request rejected"}, status=403) + finally: + if diag: + diag.finish() if parsed.path == "/api/upload": return handle_upload(handler) @@ -3465,7 +3489,14 @@ def handle_post(handler, parsed) -> bool: if parsed.path == "/api/transcribe": return handle_transcribe(handler) - body = read_body(handler) + if diag: + diag.stage("read_body") + try: + body = read_body(handler) + except Exception: + if diag: + diag.finish() + raise if parsed.path.startswith("/api/kanban/"): from api.kanban_bridge import handle_kanban_post @@ -4002,7 +4033,7 @@ def handle_post(handler, parsed) -> bool: return _handle_background(handler, body) if parsed.path == "/api/chat/start": - return _handle_chat_start(handler, body) + return _handle_chat_start(handler, body, diag=diag) if parsed.path == "/api/chat": return _handle_chat_sync(handler, body) @@ -6170,104 +6201,126 @@ def _prepare_chat_start_session_for_stream( s.save() -def _handle_chat_start(handler, body): +def _handle_chat_start(handler, body, diag=None): try: - require(body, "session_id") - except ValueError as e: - return bad(handler, str(e)) - try: - s = get_session(body["session_id"]) - except KeyError: - return bad(handler, "Session not found", 404) - requested_profile = str(body.get("profile") or "").strip() - if requested_profile: + diag.stage("validate_session_id") if diag else None try: - from api.profiles import _PROFILE_ID_RE + require(body, "session_id") + except ValueError as e: + return bad(handler, str(e)) + diag.stage("get_session") if diag else None + try: + s = get_session(body["session_id"]) + except KeyError: + return bad(handler, "Session not found", 404) + diag.stage("validate_profile") if diag else None + requested_profile = str(body.get("profile") or "").strip() + if requested_profile: + try: + from api.profiles import _PROFILE_ID_RE - if requested_profile != "default" and not _PROFILE_ID_RE.fullmatch(requested_profile): - return bad(handler, "invalid profile", 400) - except ImportError: - requested_profile = "" - if requested_profile and not _profiles_match(getattr(s, "profile", None), requested_profile): - has_persisted_turns = bool( - getattr(s, "messages", None) - or getattr(s, "context_messages", None) - or getattr(s, "pending_user_message", None) - ) - if not has_persisted_turns: - # Empty sessions are placeholders. If the user switches profiles - # before sending the first turn, run the placeholder under the - # currently-selected profile instead of the stale one stamped at - # creation time. - s.profile = requested_profile - msg = str(body.get("message", "")).strip() - if not msg: - return bad(handler, "message is required") - attachments = _normalize_chat_attachments(body.get("attachments") or [])[:20] - try: - workspace = str(resolve_trusted_workspace(body.get("workspace") or s.workspace)) - except ValueError as e: - return bad(handler, str(e)) - requested_model = body.get("model") or s.model - requested_provider = ( - body.get("model_provider") - if "model_provider" in body - else getattr(s, "model_provider", None) - ) - model, model_provider, normalized_model = _resolve_compatible_session_model_state( - requested_model, - requested_provider, - ) - # Prevent duplicate runs in the same session while a stream is still active. - # This commonly happens after page refresh/reconnect races and can produce - # duplicated clarify cards for what appears to be a single user request. - current_stream_id = getattr(s, "active_stream_id", None) - if current_stream_id: - with STREAMS_LOCK: - current_active = current_stream_id in STREAMS - if current_active: - return j( - handler, - { - "error": "session already has an active stream", - "active_stream_id": current_stream_id, - }, - status=409, + if requested_profile != "default" and not _PROFILE_ID_RE.fullmatch(requested_profile): + return bad(handler, "invalid profile", 400) + except ImportError: + requested_profile = "" + if requested_profile and not _profiles_match(getattr(s, "profile", None), requested_profile): + has_persisted_turns = bool( + getattr(s, "messages", None) + or getattr(s, "context_messages", None) + or getattr(s, "pending_user_message", None) ) - # Stale stream id from a previous run; clear and continue. - _clear_stale_stream_state(s) - stream_id = uuid.uuid4().hex - with _get_session_agent_lock(s.session_id): - _prepare_chat_start_session_for_stream( - s, - msg=msg, - attachments=attachments, - workspace=workspace, - model=model, - model_provider=model_provider, - stream_id=stream_id, + if not has_persisted_turns: + # Empty sessions are placeholders. If the user switches profiles + # before sending the first turn, run the placeholder under the + # currently-selected profile instead of the stale one stamped at + # creation time. + s.profile = requested_profile + diag.stage("normalize_message") if diag else None + msg = str(body.get("message", "")).strip() + if not msg: + return bad(handler, "message is required") + diag.stage("normalize_attachments") if diag else None + attachments = _normalize_chat_attachments(body.get("attachments") or [])[:20] + diag.stage("resolve_workspace") if diag else None + try: + workspace = str(resolve_trusted_workspace(body.get("workspace") or s.workspace)) + except ValueError as e: + return bad(handler, str(e)) + requested_model = body.get("model") or s.model + requested_provider = ( + body.get("model_provider") + if "model_provider" in body + else getattr(s, "model_provider", None) ) - set_last_workspace(workspace) - stream = create_stream_channel() - with STREAMS_LOCK: - STREAMS[stream_id] = stream - thr = threading.Thread( - target=_run_agent_streaming, - args=(s.session_id, msg, model, workspace, stream_id, attachments), - kwargs={"model_provider": model_provider}, - daemon=True, - ) - thr.start() - response = { - "stream_id": stream_id, - "session_id": s.session_id, - "pending_started_at": s.pending_started_at, - } - if normalized_model: - response["effective_model"] = model - if model_provider: - response["effective_model_provider"] = model_provider - return j(handler, response) + diag.stage("resolve_model_provider") if diag else None + model, model_provider, normalized_model = _resolve_compatible_session_model_state( + requested_model, + requested_provider, + ) + # Prevent duplicate runs in the same session while a stream is still active. + # This commonly happens after page refresh/reconnect races and can produce + # duplicated clarify cards for what appears to be a single user request. + diag.stage("active_stream_check") if diag else None + current_stream_id = getattr(s, "active_stream_id", None) + if current_stream_id: + diag.stage("active_stream_lock_wait") if diag else None + with STREAMS_LOCK: + current_active = current_stream_id in STREAMS + if current_active: + diag.stage("response_write") if diag else None + return j( + handler, + { + "error": "session already has an active stream", + "active_stream_id": current_stream_id, + }, + status=409, + ) + # Stale stream id from a previous run; clear and continue. + diag.stage("stale_stream_cleanup") if diag else None + _clear_stale_stream_state(s) + stream_id = uuid.uuid4().hex + session_lock = _get_session_agent_lock(s.session_id) + diag.stage("session_lock_wait") if diag else None + with session_lock: + diag.stage("save_pending_state") if diag else None + _prepare_chat_start_session_for_stream( + s, + msg=msg, + attachments=attachments, + workspace=workspace, + model=model, + model_provider=model_provider, + stream_id=stream_id, + ) + diag.stage("set_last_workspace") if diag else None + set_last_workspace(workspace) + diag.stage("stream_registration") if diag else None + stream = create_stream_channel() + with STREAMS_LOCK: + STREAMS[stream_id] = stream + diag.stage("worker_thread_start") if diag else None + thr = threading.Thread( + target=_run_agent_streaming, + args=(s.session_id, msg, model, workspace, stream_id, attachments), + kwargs={"model_provider": model_provider}, + daemon=True, + ) + thr.start() + response = { + "stream_id": stream_id, + "session_id": s.session_id, + "pending_started_at": s.pending_started_at, + } + if normalized_model: + response["effective_model"] = model + if model_provider: + response["effective_model_provider"] = model_provider + diag.stage("response_write") if diag else None + return j(handler, response) + finally: + if diag: + diag.finish() def _normalize_chat_attachments(raw_attachments): diff --git a/tests/test_issue1855_request_diagnostics.py b/tests/test_issue1855_request_diagnostics.py new file mode 100644 index 00000000..7a4ad54e --- /dev/null +++ b/tests/test_issue1855_request_diagnostics.py @@ -0,0 +1,110 @@ +import json +import logging +from pathlib import Path + +import api.models as models +from api.models import Session +from api.request_diagnostics import RequestDiagnostics + + +class _StageRecorder: + def __init__(self): + self.stages = [] + + def stage(self, name): + self.stages.append(name) + + +def test_request_diagnostics_timeout_record_includes_stage_and_thread_stacks(caplog): + logger = logging.getLogger("test.issue1855.timeout") + diag = RequestDiagnostics( + "GET", + "/api/sessions?all_profiles=1", + logger=logger, + timeout_seconds=5, + auto_start=False, + ) + diag.stage("all_sessions.read_index") + + with caplog.at_level(logging.WARNING, logger=logger.name): + diag._on_timeout() + + assert len(caplog.records) == 1 + record = json.loads(caplog.records[0].args[0]) + assert record["method"] == "GET" + assert record["path"] == "/api/sessions" + assert record["current_stage"] == "all_sessions.read_index" + assert record["elapsed_ms"] >= 0 + assert any(stage["name"] == "all_sessions.read_index" for stage in record["stages"]) + assert record["thread_stacks"] + + +def test_request_diagnostics_maybe_start_is_limited_to_issue1855_paths(): + assert RequestDiagnostics.maybe_start("GET", "/api/sessions") is not None + assert RequestDiagnostics.maybe_start("POST", "/api/chat/start") is not None + assert RequestDiagnostics.maybe_start("GET", "/health") is None + assert RequestDiagnostics.maybe_start("POST", "/api/session/new") is None + + +def test_all_sessions_reports_internal_index_stages(tmp_path, monkeypatch): + session_dir = tmp_path / "sessions" + session_dir.mkdir() + index_file = session_dir / "_index.json" + monkeypatch.setattr(models, "SESSION_DIR", session_dir) + monkeypatch.setattr(models, "SESSION_INDEX_FILE", index_file) + monkeypatch.setattr(models, "_enrich_sidebar_lineage_metadata", lambda sessions: None) + models.SESSIONS.clear() + + s = Session( + session_id="issue1855_indexed", + title="Indexed", + messages=[{"role": "user", "content": "hi", "timestamp": 100}], + ) + s.path.write_text(json.dumps(s.__dict__, ensure_ascii=False), encoding="utf-8") + index_file.write_text( + json.dumps( + [ + { + "session_id": s.session_id, + "title": s.title, + "updated_at": s.updated_at, + "workspace": s.workspace, + "model": s.model, + "message_count": 1, + "created_at": s.created_at, + "pinned": False, + "archived": False, + "last_message_at": 100, + } + ], + ensure_ascii=False, + ), + encoding="utf-8", + ) + + diag = _StageRecorder() + rows = models.all_sessions(diag=diag) + + assert [row["session_id"] for row in rows] == [s.session_id] + assert "all_sessions.read_index" in diag.stages + assert "all_sessions.overlay_lock" in diag.stages + assert "all_sessions.lineage_metadata" in diag.stages + + +def test_issue1855_target_routes_are_wired_to_diagnostics(): + src = Path("api/routes.py").read_text(encoding="utf-8") + + assert 'RequestDiagnostics.maybe_start("GET", parsed.path' in src + assert "all_sessions(diag=diag)" in src + assert 'RequestDiagnostics.maybe_start("POST", parsed.path' in src + assert "_handle_chat_start(handler, body, diag=diag)" in src + for stage in ( + "read_body", + "resolve_model_provider", + "session_lock_wait", + "save_pending_state", + "stream_registration", + "worker_thread_start", + "response_write", + ): + assert stage in src