Stage 320: PR #1860 — request wedge diagnostics by @franksong2702

This commit is contained in:
nesquena-hermes
2026-05-08 15:37:08 +00:00
4 changed files with 516 additions and 171 deletions
+23 -1
View File
@@ -966,12 +966,24 @@ def _enrich_sidebar_lineage_metadata(sessions: list[dict]) -> None:
session.update(metadata[sid])
def all_sessions():
def _diag_stage(diag, name: str) -> None:
if diag is not None:
try:
diag.stage(name)
except Exception:
pass
def all_sessions(diag=None):
_diag_stage(diag, "all_sessions.active_streams")
active_stream_ids = _active_stream_ids()
# Phase C: try index first for O(1) read; fall back to full scan
_diag_stage(diag, "all_sessions.index_exists")
if SESSION_INDEX_FILE.exists():
try:
_diag_stage(diag, "all_sessions.read_index")
index = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
_diag_stage(diag, "all_sessions.prune_index")
index = [
s for s in index
if _index_entry_exists(s.get('session_id'))
@@ -979,21 +991,25 @@ def all_sessions():
backfilled = []
for i, s in enumerate(index):
if 'last_message_at' not in s:
_diag_stage(diag, "all_sessions.backfill_load")
full = Session.load(s.get('session_id'))
if full:
index[i] = full.compact()
backfilled.append(full)
if backfilled:
try:
_diag_stage(diag, "all_sessions.backfill_write")
_write_session_index(updates=backfilled)
except Exception:
logger.debug("Failed to persist last_message_at backfill")
_diag_stage(diag, "all_sessions.mark_streaming")
for s in index:
s['is_streaming'] = _is_streaming_session(
s.get('active_stream_id'),
active_stream_ids,
)
# Overlay any in-memory sessions that may be newer than the index
_diag_stage(diag, "all_sessions.overlay_lock")
index_map = {s['session_id']: s for s in index}
with LOCK:
for s in SESSIONS.values():
@@ -1001,6 +1017,7 @@ def all_sessions():
include_runtime=True,
active_stream_ids=active_stream_ids,
)
_diag_stage(diag, "all_sessions.sort_filter")
result = sorted(index_map.values(), key=lambda s: (s.get('pinned', False), _session_sort_timestamp(s)), reverse=True)
# Hide empty Untitled sessions from the UI entirely — they are ephemeral
# scratch pads that only become real once the first message is sent (#1171).
@@ -1025,11 +1042,13 @@ def all_sessions():
for s in result:
if not s.get('profile'):
s['profile'] = 'default'
_diag_stage(diag, "all_sessions.lineage_metadata")
_enrich_sidebar_lineage_metadata(result)
return result
except Exception:
logger.debug("Failed to load session index, falling back to full scan")
# Full scan fallback
_diag_stage(diag, "all_sessions.full_scan")
out = []
for p in SESSION_DIR.glob('*.json'):
if p.name.startswith('_'): continue
@@ -1038,8 +1057,10 @@ def all_sessions():
if s: out.append(s)
except Exception:
logger.debug("Failed to load session from %s", p)
_diag_stage(diag, "all_sessions.full_scan_overlay")
for s in SESSIONS.values():
if all(s.session_id != x.session_id for x in out): out.append(s)
_diag_stage(diag, "all_sessions.full_scan_sort_filter")
out.sort(key=lambda s: (getattr(s, 'pinned', False), _session_sort_timestamp(s)), reverse=True)
# Hide empty Untitled sessions from the UI entirely — kept consistent with the
# index-path filter above. No grace window: a 0-message Untitled session is
@@ -1054,6 +1075,7 @@ def all_sessions():
for s in result:
if not s.get('profile'):
s['profile'] = 'default'
_diag_stage(diag, "all_sessions.lineage_metadata")
_enrich_sidebar_lineage_metadata(result)
return result
+160
View File
@@ -0,0 +1,160 @@
"""Slow request diagnostics for latency-sensitive browser API paths."""
from __future__ import annotations
import json
import logging
import os
import sys
import threading
import time
import traceback
import uuid
from typing import Any
DEFAULT_SLOW_REQUEST_SECONDS = 5.0
MAX_STACK_FRAMES_PER_THREAD = 40
def _slow_request_seconds() -> float:
raw = os.getenv("HERMES_WEBUI_SLOW_REQUEST_SECONDS", "").strip()
if not raw:
return DEFAULT_SLOW_REQUEST_SECONDS
try:
value = float(raw)
except ValueError:
return DEFAULT_SLOW_REQUEST_SECONDS
return max(0.0, value)
class RequestDiagnostics:
"""Track request stages and emit a watchdog record if a request wedges."""
def __init__(
self,
method: str,
path: str,
*,
logger: logging.Logger | None = None,
timeout_seconds: float | None = None,
auto_start: bool = True,
) -> None:
self.request_id = uuid.uuid4().hex[:10]
self.method = str(method or "-")
self.path = str(path or "-").split("?", 1)[0]
self.logger = logger or logging.getLogger(__name__)
self.timeout_seconds = _slow_request_seconds() if timeout_seconds is None else max(0.0, float(timeout_seconds))
self.started_monotonic = time.monotonic()
self.started_wall = time.time()
self._lock = threading.Lock()
self._stages: list[dict[str, Any]] = []
self._current_stage = "start"
self._current_stage_started = self.started_monotonic
self._finished = False
self._watchdog_logged = False
self._timer: threading.Timer | None = None
if auto_start and self.timeout_seconds > 0:
self._timer = threading.Timer(self.timeout_seconds, self._on_timeout)
self._timer.daemon = True
self._timer.start()
@classmethod
def maybe_start(
cls,
method: str,
path: str,
*,
logger: logging.Logger | None = None,
) -> "RequestDiagnostics | None":
clean_path = str(path or "").split("?", 1)[0]
if (method.upper(), clean_path) not in {
("GET", "/api/sessions"),
("POST", "/api/chat/start"),
}:
return None
return cls(method, clean_path, logger=logger)
def stage(self, name: str) -> None:
now = time.monotonic()
clean = str(name or "unknown").strip() or "unknown"
with self._lock:
if self._finished:
return
self._stages.append(
{
"name": self._current_stage,
"ms": round((now - self._current_stage_started) * 1000, 1),
}
)
self._current_stage = clean
self._current_stage_started = now
def finish(self) -> None:
timer = None
record = None
with self._lock:
if self._finished:
return
self._finished = True
timer = self._timer
record = self._build_record_locked(include_stacks=False)
if timer is not None:
timer.cancel()
if record and self.timeout_seconds > 0 and record["elapsed_ms"] >= self.timeout_seconds * 1000:
self.logger.warning(
"Slow WebUI request completed: %s",
json.dumps(record, sort_keys=True),
)
def _on_timeout(self) -> None:
with self._lock:
if self._finished or self._watchdog_logged:
return
self._watchdog_logged = True
record = self._build_record_locked(include_stacks=True)
self.logger.warning(
"Slow WebUI request still running: %s",
json.dumps(record, sort_keys=True),
)
def _build_record_locked(self, *, include_stacks: bool) -> dict[str, Any]:
now = time.monotonic()
stages = list(self._stages)
stages.append(
{
"name": self._current_stage,
"ms": round((now - self._current_stage_started) * 1000, 1),
}
)
record: dict[str, Any] = {
"request_id": self.request_id,
"method": self.method,
"path": self.path,
"started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self.started_wall)),
"elapsed_ms": round((now - self.started_monotonic) * 1000, 1),
"current_stage": self._current_stage,
"stages": stages,
}
if include_stacks:
record["thread_stacks"] = _thread_stack_snapshot()
return record
def _thread_stack_snapshot() -> list[dict[str, Any]]:
frames = sys._current_frames()
threads = {thread.ident: thread for thread in threading.enumerate()}
snapshot: list[dict[str, Any]] = []
for ident, frame in frames.items():
thread = threads.get(ident)
stack = traceback.format_stack(frame, limit=MAX_STACK_FRAMES_PER_THREAD)
snapshot.append(
{
"thread_id": ident,
"thread_name": thread.name if thread else "",
"daemon": bool(thread.daemon) if thread else None,
"stack": [line.rstrip() for line in stack],
}
)
snapshot.sort(key=lambda item: str(item.get("thread_name") or ""))
return snapshot
+223 -170
View File
@@ -587,6 +587,7 @@ from api.helpers import (
_redact_text,
)
from api.agent_health import build_agent_health_payload
from api.request_diagnostics import RequestDiagnostics
from api.system_health import build_system_health_payload
@@ -2972,80 +2973,96 @@ def handle_get(handler, parsed) -> bool:
return j(handler, {"results": get_results(sid)})
if parsed.path == "/api/sessions":
webui_sessions = all_sessions()
settings = load_settings()
show_cli_sessions = bool(settings.get("show_cli_sessions"))
if show_cli_sessions:
cli = get_cli_sessions()
cli_by_id = {s["session_id"]: s for s in cli}
for s in webui_sessions:
meta = cli_by_id.get(s.get("session_id"))
if not meta:
continue
if _is_messaging_session_record(meta):
s.update(_merge_cli_sidebar_metadata(s, meta))
if s.get("session_id") != meta.get("session_id"):
s["session_id"] = meta.get("session_id")
else:
for key in ("source_tag", "raw_source", "session_source", "source_label"):
if not s.get(key) and meta.get(key):
s[key] = meta[key]
# Apply the same CLI visibility semantics to imported local copies so
# low-value imported artifacts do not leak into the sidebar.
webui_sessions = [s for s in webui_sessions if is_cli_session_row_visible(s)]
webui_ids = {s["session_id"] for s in webui_sessions}
from api.models import _hide_from_default_sidebar as _cron_hide
deduped_cli = [s for s in cli if s["session_id"] not in webui_ids and is_cli_session_row_visible(s) and not _cron_hide(s)]
else:
webui_sessions = [s for s in webui_sessions if not _is_cli_session_for_settings(s)]
deduped_cli = []
merged = webui_sessions + deduped_cli
merged.sort(
key=lambda s: s.get("last_message_at") or s.get("updated_at", 0) or 0,
reverse=True,
)
# ── Profile scoping (#1611) ────────────────────────────────────────
# Default: filter to the active profile. ?all_profiles=1 opts into
# the aggregate view used by the "All profiles" sidebar toggle.
# The other_profile_count is always returned so the UI can render
# the "Show N from other profiles" affordance without sending the
# cross-profile rows by default.
#
# IMPORTANT: scope BEFORE _keep_latest_messaging_session_per_source.
# _messaging_source_key is profile-blind (#1614 follow-up): if the
# same Slack/Telegram identity has sessions in profiles A and B, a
# profile-blind dedupe would discard the older one even when scoped
# to its own profile, leaving that profile with zero rows for that
# source. Filter first so the dedupe operates only within the active
# profile's rows.
from api.profiles import get_active_profile_name
active_profile = get_active_profile_name()
all_profiles = _all_profiles_query_flag(parsed)
if all_profiles:
scoped = merged
other_profile_count = 0
else:
scoped = [s for s in merged
if _profiles_match(s.get("profile"), active_profile)]
other_profile_count = len(merged) - len(scoped)
scoped = _keep_latest_messaging_session_per_source(scoped)
if show_cli_sessions:
scoped = _cap_recent_cli_sessions(scoped, cli_cap=CLI_VISIBLE_SESSION_CAP)
safe_merged = []
for s in scoped:
item = dict(s)
if isinstance(item.get("title"), str):
item["title"] = _redact_text(item["title"])
safe_merged.append(item)
return j(handler, {
"sessions": safe_merged,
"cli_count": len(deduped_cli),
"all_profiles": all_profiles,
"active_profile": active_profile,
"other_profile_count": other_profile_count,
"server_time": time.time(),
"server_tz": time.strftime("%z"),
})
diag = RequestDiagnostics.maybe_start("GET", parsed.path, logger=logger)
try:
diag.stage("all_sessions")
webui_sessions = all_sessions(diag=diag)
diag.stage("load_settings")
settings = load_settings()
show_cli_sessions = bool(settings.get("show_cli_sessions"))
if show_cli_sessions:
diag.stage("get_cli_sessions")
cli = get_cli_sessions()
diag.stage("merge_cli_sessions")
cli_by_id = {s["session_id"]: s for s in cli}
for s in webui_sessions:
meta = cli_by_id.get(s.get("session_id"))
if not meta:
continue
if _is_messaging_session_record(meta):
s.update(_merge_cli_sidebar_metadata(s, meta))
if s.get("session_id") != meta.get("session_id"):
s["session_id"] = meta.get("session_id")
else:
for key in ("source_tag", "raw_source", "session_source", "source_label"):
if not s.get(key) and meta.get(key):
s[key] = meta[key]
# Apply the same CLI visibility semantics to imported local copies so
# low-value imported artifacts do not leak into the sidebar.
webui_sessions = [s for s in webui_sessions if is_cli_session_row_visible(s)]
webui_ids = {s["session_id"] for s in webui_sessions}
from api.models import _hide_from_default_sidebar as _cron_hide
deduped_cli = [s for s in cli if s["session_id"] not in webui_ids and is_cli_session_row_visible(s) and not _cron_hide(s)]
else:
diag.stage("filter_webui_sessions")
webui_sessions = [s for s in webui_sessions if not _is_cli_session_for_settings(s)]
deduped_cli = []
diag.stage("sort_sessions")
merged = webui_sessions + deduped_cli
merged.sort(
key=lambda s: s.get("last_message_at") or s.get("updated_at", 0) or 0,
reverse=True,
)
# ── Profile scoping (#1611) ────────────────────────────────────────
# Default: filter to the active profile. ?all_profiles=1 opts into
# the aggregate view used by the "All profiles" sidebar toggle.
# The other_profile_count is always returned so the UI can render
# the "Show N from other profiles" affordance without sending the
# cross-profile rows by default.
#
# IMPORTANT: scope BEFORE _keep_latest_messaging_session_per_source.
# _messaging_source_key is profile-blind (#1614 follow-up): if the
# same Slack/Telegram identity has sessions in profiles A and B, a
# profile-blind dedupe would discard the older one even when scoped
# to its own profile, leaving that profile with zero rows for that
# source. Filter first so the dedupe operates only within the active
# profile's rows.
diag.stage("active_profile")
from api.profiles import get_active_profile_name
active_profile = get_active_profile_name()
all_profiles = _all_profiles_query_flag(parsed)
diag.stage("profile_filter")
if all_profiles:
scoped = merged
other_profile_count = 0
else:
scoped = [s for s in merged
if _profiles_match(s.get("profile"), active_profile)]
other_profile_count = len(merged) - len(scoped)
diag.stage("messaging_dedupe")
scoped = _keep_latest_messaging_session_per_source(scoped)
if show_cli_sessions:
diag.stage("cli_cap")
scoped = _cap_recent_cli_sessions(scoped, cli_cap=CLI_VISIBLE_SESSION_CAP)
diag.stage("redact_sessions")
safe_merged = []
for s in scoped:
item = dict(s)
if isinstance(item.get("title"), str):
item["title"] = _redact_text(item["title"])
safe_merged.append(item)
diag.stage("response_write")
return j(handler, {
"sessions": safe_merged,
"cli_count": len(deduped_cli),
"all_profiles": all_profiles,
"active_profile": active_profile,
"other_profile_count": other_profile_count,
"server_time": time.time(),
"server_tz": time.strftime("%z"),
})
finally:
diag.finish()
if parsed.path == "/api/projects":
# ── Profile scoping (#1614) ────────────────────────────────────────
@@ -3453,9 +3470,16 @@ def handle_get(handler, parsed) -> bool:
def handle_post(handler, parsed) -> bool:
"""Handle all POST routes. Returns True if handled, False for 404."""
diag = RequestDiagnostics.maybe_start("POST", parsed.path, logger=logger)
# CSRF: reject cross-origin browser requests
if diag:
diag.stage("csrf")
if not _check_csrf(handler):
return j(handler, {"error": "Cross-origin request rejected"}, status=403)
try:
return j(handler, {"error": "Cross-origin request rejected"}, status=403)
finally:
if diag:
diag.finish()
if parsed.path == "/api/upload":
return handle_upload(handler)
@@ -3465,7 +3489,14 @@ def handle_post(handler, parsed) -> bool:
if parsed.path == "/api/transcribe":
return handle_transcribe(handler)
body = read_body(handler)
if diag:
diag.stage("read_body")
try:
body = read_body(handler)
except Exception:
if diag:
diag.finish()
raise
if parsed.path.startswith("/api/kanban/"):
from api.kanban_bridge import handle_kanban_post
@@ -4002,7 +4033,7 @@ def handle_post(handler, parsed) -> bool:
return _handle_background(handler, body)
if parsed.path == "/api/chat/start":
return _handle_chat_start(handler, body)
return _handle_chat_start(handler, body, diag=diag)
if parsed.path == "/api/chat":
return _handle_chat_sync(handler, body)
@@ -6170,104 +6201,126 @@ def _prepare_chat_start_session_for_stream(
s.save()
def _handle_chat_start(handler, body):
def _handle_chat_start(handler, body, diag=None):
try:
require(body, "session_id")
except ValueError as e:
return bad(handler, str(e))
try:
s = get_session(body["session_id"])
except KeyError:
return bad(handler, "Session not found", 404)
requested_profile = str(body.get("profile") or "").strip()
if requested_profile:
diag.stage("validate_session_id") if diag else None
try:
from api.profiles import _PROFILE_ID_RE
require(body, "session_id")
except ValueError as e:
return bad(handler, str(e))
diag.stage("get_session") if diag else None
try:
s = get_session(body["session_id"])
except KeyError:
return bad(handler, "Session not found", 404)
diag.stage("validate_profile") if diag else None
requested_profile = str(body.get("profile") or "").strip()
if requested_profile:
try:
from api.profiles import _PROFILE_ID_RE
if requested_profile != "default" and not _PROFILE_ID_RE.fullmatch(requested_profile):
return bad(handler, "invalid profile", 400)
except ImportError:
requested_profile = ""
if requested_profile and not _profiles_match(getattr(s, "profile", None), requested_profile):
has_persisted_turns = bool(
getattr(s, "messages", None)
or getattr(s, "context_messages", None)
or getattr(s, "pending_user_message", None)
)
if not has_persisted_turns:
# Empty sessions are placeholders. If the user switches profiles
# before sending the first turn, run the placeholder under the
# currently-selected profile instead of the stale one stamped at
# creation time.
s.profile = requested_profile
msg = str(body.get("message", "")).strip()
if not msg:
return bad(handler, "message is required")
attachments = _normalize_chat_attachments(body.get("attachments") or [])[:20]
try:
workspace = str(resolve_trusted_workspace(body.get("workspace") or s.workspace))
except ValueError as e:
return bad(handler, str(e))
requested_model = body.get("model") or s.model
requested_provider = (
body.get("model_provider")
if "model_provider" in body
else getattr(s, "model_provider", None)
)
model, model_provider, normalized_model = _resolve_compatible_session_model_state(
requested_model,
requested_provider,
)
# Prevent duplicate runs in the same session while a stream is still active.
# This commonly happens after page refresh/reconnect races and can produce
# duplicated clarify cards for what appears to be a single user request.
current_stream_id = getattr(s, "active_stream_id", None)
if current_stream_id:
with STREAMS_LOCK:
current_active = current_stream_id in STREAMS
if current_active:
return j(
handler,
{
"error": "session already has an active stream",
"active_stream_id": current_stream_id,
},
status=409,
if requested_profile != "default" and not _PROFILE_ID_RE.fullmatch(requested_profile):
return bad(handler, "invalid profile", 400)
except ImportError:
requested_profile = ""
if requested_profile and not _profiles_match(getattr(s, "profile", None), requested_profile):
has_persisted_turns = bool(
getattr(s, "messages", None)
or getattr(s, "context_messages", None)
or getattr(s, "pending_user_message", None)
)
# Stale stream id from a previous run; clear and continue.
_clear_stale_stream_state(s)
stream_id = uuid.uuid4().hex
with _get_session_agent_lock(s.session_id):
_prepare_chat_start_session_for_stream(
s,
msg=msg,
attachments=attachments,
workspace=workspace,
model=model,
model_provider=model_provider,
stream_id=stream_id,
if not has_persisted_turns:
# Empty sessions are placeholders. If the user switches profiles
# before sending the first turn, run the placeholder under the
# currently-selected profile instead of the stale one stamped at
# creation time.
s.profile = requested_profile
diag.stage("normalize_message") if diag else None
msg = str(body.get("message", "")).strip()
if not msg:
return bad(handler, "message is required")
diag.stage("normalize_attachments") if diag else None
attachments = _normalize_chat_attachments(body.get("attachments") or [])[:20]
diag.stage("resolve_workspace") if diag else None
try:
workspace = str(resolve_trusted_workspace(body.get("workspace") or s.workspace))
except ValueError as e:
return bad(handler, str(e))
requested_model = body.get("model") or s.model
requested_provider = (
body.get("model_provider")
if "model_provider" in body
else getattr(s, "model_provider", None)
)
set_last_workspace(workspace)
stream = create_stream_channel()
with STREAMS_LOCK:
STREAMS[stream_id] = stream
thr = threading.Thread(
target=_run_agent_streaming,
args=(s.session_id, msg, model, workspace, stream_id, attachments),
kwargs={"model_provider": model_provider},
daemon=True,
)
thr.start()
response = {
"stream_id": stream_id,
"session_id": s.session_id,
"pending_started_at": s.pending_started_at,
}
if normalized_model:
response["effective_model"] = model
if model_provider:
response["effective_model_provider"] = model_provider
return j(handler, response)
diag.stage("resolve_model_provider") if diag else None
model, model_provider, normalized_model = _resolve_compatible_session_model_state(
requested_model,
requested_provider,
)
# Prevent duplicate runs in the same session while a stream is still active.
# This commonly happens after page refresh/reconnect races and can produce
# duplicated clarify cards for what appears to be a single user request.
diag.stage("active_stream_check") if diag else None
current_stream_id = getattr(s, "active_stream_id", None)
if current_stream_id:
diag.stage("active_stream_lock_wait") if diag else None
with STREAMS_LOCK:
current_active = current_stream_id in STREAMS
if current_active:
diag.stage("response_write") if diag else None
return j(
handler,
{
"error": "session already has an active stream",
"active_stream_id": current_stream_id,
},
status=409,
)
# Stale stream id from a previous run; clear and continue.
diag.stage("stale_stream_cleanup") if diag else None
_clear_stale_stream_state(s)
stream_id = uuid.uuid4().hex
session_lock = _get_session_agent_lock(s.session_id)
diag.stage("session_lock_wait") if diag else None
with session_lock:
diag.stage("save_pending_state") if diag else None
_prepare_chat_start_session_for_stream(
s,
msg=msg,
attachments=attachments,
workspace=workspace,
model=model,
model_provider=model_provider,
stream_id=stream_id,
)
diag.stage("set_last_workspace") if diag else None
set_last_workspace(workspace)
diag.stage("stream_registration") if diag else None
stream = create_stream_channel()
with STREAMS_LOCK:
STREAMS[stream_id] = stream
diag.stage("worker_thread_start") if diag else None
thr = threading.Thread(
target=_run_agent_streaming,
args=(s.session_id, msg, model, workspace, stream_id, attachments),
kwargs={"model_provider": model_provider},
daemon=True,
)
thr.start()
response = {
"stream_id": stream_id,
"session_id": s.session_id,
"pending_started_at": s.pending_started_at,
}
if normalized_model:
response["effective_model"] = model
if model_provider:
response["effective_model_provider"] = model_provider
diag.stage("response_write") if diag else None
return j(handler, response)
finally:
if diag:
diag.finish()
def _normalize_chat_attachments(raw_attachments):
+110
View File
@@ -0,0 +1,110 @@
import json
import logging
from pathlib import Path
import api.models as models
from api.models import Session
from api.request_diagnostics import RequestDiagnostics
class _StageRecorder:
def __init__(self):
self.stages = []
def stage(self, name):
self.stages.append(name)
def test_request_diagnostics_timeout_record_includes_stage_and_thread_stacks(caplog):
logger = logging.getLogger("test.issue1855.timeout")
diag = RequestDiagnostics(
"GET",
"/api/sessions?all_profiles=1",
logger=logger,
timeout_seconds=5,
auto_start=False,
)
diag.stage("all_sessions.read_index")
with caplog.at_level(logging.WARNING, logger=logger.name):
diag._on_timeout()
assert len(caplog.records) == 1
record = json.loads(caplog.records[0].args[0])
assert record["method"] == "GET"
assert record["path"] == "/api/sessions"
assert record["current_stage"] == "all_sessions.read_index"
assert record["elapsed_ms"] >= 0
assert any(stage["name"] == "all_sessions.read_index" for stage in record["stages"])
assert record["thread_stacks"]
def test_request_diagnostics_maybe_start_is_limited_to_issue1855_paths():
assert RequestDiagnostics.maybe_start("GET", "/api/sessions") is not None
assert RequestDiagnostics.maybe_start("POST", "/api/chat/start") is not None
assert RequestDiagnostics.maybe_start("GET", "/health") is None
assert RequestDiagnostics.maybe_start("POST", "/api/session/new") is None
def test_all_sessions_reports_internal_index_stages(tmp_path, monkeypatch):
session_dir = tmp_path / "sessions"
session_dir.mkdir()
index_file = session_dir / "_index.json"
monkeypatch.setattr(models, "SESSION_DIR", session_dir)
monkeypatch.setattr(models, "SESSION_INDEX_FILE", index_file)
monkeypatch.setattr(models, "_enrich_sidebar_lineage_metadata", lambda sessions: None)
models.SESSIONS.clear()
s = Session(
session_id="issue1855_indexed",
title="Indexed",
messages=[{"role": "user", "content": "hi", "timestamp": 100}],
)
s.path.write_text(json.dumps(s.__dict__, ensure_ascii=False), encoding="utf-8")
index_file.write_text(
json.dumps(
[
{
"session_id": s.session_id,
"title": s.title,
"updated_at": s.updated_at,
"workspace": s.workspace,
"model": s.model,
"message_count": 1,
"created_at": s.created_at,
"pinned": False,
"archived": False,
"last_message_at": 100,
}
],
ensure_ascii=False,
),
encoding="utf-8",
)
diag = _StageRecorder()
rows = models.all_sessions(diag=diag)
assert [row["session_id"] for row in rows] == [s.session_id]
assert "all_sessions.read_index" in diag.stages
assert "all_sessions.overlay_lock" in diag.stages
assert "all_sessions.lineage_metadata" in diag.stages
def test_issue1855_target_routes_are_wired_to_diagnostics():
src = Path("api/routes.py").read_text(encoding="utf-8")
assert 'RequestDiagnostics.maybe_start("GET", parsed.path' in src
assert "all_sessions(diag=diag)" in src
assert 'RequestDiagnostics.maybe_start("POST", parsed.path' in src
assert "_handle_chat_start(handler, body, diag=diag)" in src
for stage in (
"read_body",
"resolve_model_provider",
"session_lock_wait",
"save_pending_state",
"stream_registration",
"worker_thread_start",
"response_write",
):
assert stage in src