Files
hermes-webui/tests/test_gateway_sync.py
T
nesquena-hermes 58ad315dca v0.50.216: compression chains, renderer fixes, HTML preview, approval z-index, /steer fix, reasoning chip (#1075)
* fix(workspace): add .html/.htm to MIME_MAP so HTML preview renders correctly

MIME_MAP was missing entries for .html and .htm. The server fell back to
Content-Type: application/octet-stream, which browsers refuse to render as
HTML in an iframe — causing a blank white preview.

The rest of the pipeline was already correct: the iframe exists in
static/index.html, openFile() in static/workspace.js routes .html to
showPreview('html'), and _handle_file_raw() in api/routes.py sets the
correct CSP sandbox header when ?inline=1 is present. The only missing
piece was the MIME type.

* test(workspace): lock in MIME_MAP entry for .html/.htm

PR #1070 added .html/.htm → text/html to MIME_MAP in api/config.py
to fix the blank workspace HTML preview iframe. Without a direct
assertion on the MIME_MAP entries, the fix could silently regress
(the existing test_779_html_preview.py tests cover the iframe wiring,
the inline=1 query handling, and the CSP sandbox header — but none of
them touch MIME_MAP itself).

Add a single regression test that asserts MIME_MAP['.html'] and
MIME_MAP['.htm'] are both 'text/html' so any future removal of those
entries fails CI immediately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(composer): raise .approval-card.visible z-index above .queue-card

.queue-card has z-index:2. .approval-card.visible had no z-index, so the
queue flyout would render on top of the approval card when both were visible
simultaneously — obscuring the Allow/Deny buttons.

Fix: add z-index:3 to .approval-card.visible so approvals always render
above the queue flyout. Approval is a blocking, security-relevant interaction
and must never be obscured by passive UI elements.

* test(composer): pin approval-card z-index > queue-card invariant

PR #1071 raises .approval-card.visible to z-index:3 so the security-
relevant Allow / Deny buttons stay clickable when the queue flyout is
also open. Without a regression test, a future CSS edit could silently
drop the z-index back below queue-card (z-index:2) and reintroduce the
bug — there is no automated UI test covering this stacking interaction.

Add a focused regex check that pins the invariant:
.approval-card.visible z-index must be strictly greater than
.queue-card z-index.

Modeled on the existing CSS-regex regression style in
tests/test_mobile_layout.py (test_profile_dropdown_not_clipped_by_overflow).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: intercept /steer /interrupt /queue before busy-mode routing in send()

Root cause: slash commands entered while the agent is busy never reached
the command dispatcher. send() enters the busy block and returns early at
line ~50, so the slash-command intercept (~line 56) is never reached.
The text was queued as a plain message. When it drained after the turn
ended, cmdSteer / cmdInterrupt ran on an idle session, saw no active stream,
and showed "No active task to stop."

Fix: at the top of the busy block, before checking busyMode, check if the
text starts with / and is one of the three control commands. If so, dispatch
the handler immediately and return. This lets the user type /steer, /interrupt,
or /queue at any time — including while the agent is mid-stream — and have
them execute against the live session.

Two new regression tests added:
- test_slash_commands_intercepted_before_busymode_routing: verifies the
  intercept appears before the busyMode routing in the busy block
- test_steer_intercept_calls_handler_directly: verifies the intercept calls
  _bc.fn(_pc.args) and returns, not queues

* test(busy-intercept): pin sync input-clear before await in slash intercept

PR #1072's intercept clears the msg input before awaiting the handler.
Order matters: if the await happens first (or if the clear is moved
inside the handler), the input still shows '/steer foo' for the duration
of the await. A reflexive second Enter press during that window — common
while waiting for the toast — re-runs send(): either re-fires the
handler (double-steer) or, if the turn just ended, falls through to the
non-busy slash dispatcher and drops a confusing "No active task to stop."

Add test_steer_intercept_clears_input_before_await pinning the order so
this UX invariant cannot silently regress.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: update steer i18n and settings copy — steer no longer interrupts

With the real /steer implementation (agent.steer() via /api/chat/steer),
steer injects a correction mid-turn WITHOUT interrupting the current stream.
The previous copy said "falls back to interrupt", "Steer (interrupt + send)",
etc. — accurate only for the old placeholder, not the real implementation.

Changes across all 6 locales (en/ru/es/de/zh/zh-Hant):
  cmd_steer:                  "falls back to interrupt" removed
  settings_busy_input_mode_steer: "interrupt + send" → "mid-turn correction"
  cmd_steer_fallback:         "interrupted" → "queued for next turn"
  busy_steer_fallback:        "interrupted instead" → "queued for next turn"
  settings_desc_busy_input_mode: "currently falls back to interrupt" removed

Also:
  static/index.html: inline fallback text updated to match
  static/commands.js: internal comment clarified (fallback = queue+cancel,
                      not "interrupt mode" which implies the primary action)

* fix(renderer): group consecutive blockquote lines into single element

Root cause: the old rule `s.replace(/^> (.+)$/gm, ...)` had three bugs:
  1. `.+` required at least one character — bare `>` lines (blank
     continuation lines) did not match and passed through as literal `>`
  2. Each matching line became its own `<blockquote>` element — a 10-line
     blockquote produced 10 stacked `<blockquote>` tags with no grouping
  3. When a fenced code block sat inside a blockquote, the fence-stash
     pass consumed the code content and left orphaned `>` lines that the
     old `.+` pattern could not match

Fix: replace the single-line regex with a group-based approach that matches
one or more consecutive `>` lines as a single block, strips the `>` prefix
from each line, passes each non-empty line through inlineMd(), turns blank
`>` lines into `<br>`, and wraps the entire group in one `<blockquote>`.

14 regression tests added covering:
- Single-line blockquotes (regression)
- Multi-line grouping (2 and 10 lines)
- Two separate blockquotes staying separate
- Bare `>` and `>text` (no space) edge cases
- Blank continuation lines → <br>
- Bold / italic / inline-code inside blockquotes
- Blockquote followed by normal paragraph

* fix(renderer): drop empty trailing line from blockquote match

The new group-based blockquote rule introduced in this PR captures the
trailing newline in its (?:\n|$) clause. After block.split('\n') that
trailing newline produces an empty final element. The original filter
only dropped lone bare '>' artifacts on the last line, so the empty
final element survived, and the .map(blank → '<br>') step turned it
into a phantom <br> immediately before </blockquote>.

Visible symptom: any blockquote whose source ends with \n (the common
case — a quote followed by another paragraph or end-of-message) renders
with an extra blank line at the bottom of the quote.

Reproducer:
  '> Hello\n\nThe rest of the message.'
    → '<blockquote>Hello\n<br></blockquote>\nThe rest of the message.'
                          ^^^ phantom <br>

Fix: replace the single-line filter with a while-loop that pops trailing
lines while they are either empty OR a bare '>'. This matches the
intent the Python test mirror in tests/test_blockquote_rendering.py
already had (the mirror was correct; the JS was not — that's why
the original tests passed despite the bug).

Also add four new regression tests in TestNoPhantomTrailingBr that pin
the no-trailing-<br> invariant for the common shapes:
  - input ending with \n
  - quote followed by paragraph (the real-world case)
  - multi-line quote ending with \n
  - quote with blank continuation + trailing \n (internal <br> stays,
    trailing <br> does not)

Verified end-to-end with node against the actual JS regex.
244 renderer-adjacent tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(renderer): comprehensive markdown fixes — strikethrough, task lists, CRLF, nested blockquotes

Five additional fixes on top of the blockquote grouping from the initial commit:

1. CRLF normalisation: strip \r\n → \n at start of renderMd so Windows
   line endings do not produce stray \r characters in rendered output

2. Strikethrough: ~~text~~ → <del>text</del> in both inlineMd() (for use
   inside blockquotes/lists) and the outer pass (for plain paragraphs).
   Added <del> to SAFE_TAGS and SAFE_INLINE so it is not HTML-escaped.

3. Task lists: - [x] / - [ ] items in unordered lists render as /☐
   via task-done/task-todo span wrappers. Checks [X] (uppercase) too.

4. Nested blockquotes: >> / >>> etc. now recurse so each level gets its
   own <blockquote> element rather than passing through as literal >.
   Implemented by extracting the blockquote rule into _applyBlockquotes()
   which calls itself recursively on the stripped inner content.

5. Lists inside blockquotes: > - item now renders <ul><li> inside the
   blockquote instead of a literal "- item" string. Task list items work
   inside blockquotes too (> - [x] done →  inside <blockquote><ul>).

Also fixed test_issue342.py search window (5000→10000 chars) — the CRLF
strip at the top of renderMd pushed the autolink regex past the old limit.

68 new tests in test_renderer_comprehensive.py + test_blockquote_rendering.py
covering all constructs, edge cases, and combinations.

* fix(renderer): restore space in blockquote prefix-strip regex

Commit 04e7b53 changed the blockquote prefix-strip regex from
  /^>[ \t]?/   (consume "> ", "\t>", or just ">")
to
  /^>[\t]?/    (only consume "\t>" or just ">")

The space character was dropped from the character class. Since
practically every blockquote an LLM produces is "> " (greater-than
followed by a space), this leaves a leading space artifact on every
stripped blockquote line. Worse, the leading space breaks the
list-detection regex `^(?:  )?[-*+] ` inside the new `_applyBlockquotes`
helper — that regex requires either zero or two leading spaces, never
one — so the new "list inside blockquote" feature never fired for
the canonical input shape `> - item`.

Reproducer (against the actual ui.js via node, before the fix):
  > Hello world         → <blockquote> Hello world</blockquote>
                                       ^ phantom leading space
  > Steps:              → <blockquote>Steps:
  > - one                  - one
  > - two                  - two</blockquote>
                          ^ literal text, NOT a <ul>; lists-in-quote feature broken
  > - [x] done          → blockquote with literal "[x] done", no checkbox span

Tests passed despite the bug because tests/test_blockquote_rendering.py
and tests/test_renderer_comprehensive.py validate against a Python
mirror (`_apply_blockquotes`) whose strip regex is `^>[ \t]?` — i.e.
the mirror is correct, the JS is not, and the static-mirror tests
can't catch the divergence. Same shape of bug as commit 94d63d0
(phantom <br> in trailing line) where the mirror was right and the JS
was wrong.

Fix: restore the space character in the strip regex's character class.

Add tests/test_renderer_js_behaviour.py — 11 tests that drive the
ACTUAL renderMd via node and assert on rendered output for the most
common LLM shapes (single-line quote, multi-line quote, list inside
quote, task list inside quote, nested >>>, strikethrough inside and
outside quote, top-level task list, quote followed by heading,
multi-paragraph quote with list, CRLF normalisation).

Verified: the buggy regex makes 6 of those 11 tests fail; the corrected
regex makes all 11 pass.

Suite: 2354 passed, 0 new failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Collapse agent session compression chains

* Restore upstream changelog entries

* fix(agent_sessions): bubble active compression chains to top by tip last_activity

The original PR merge kept the chain head's id/title/started_at and overrode
id/model/message_count/ended_at/end_reason from the tip — but did NOT override
last_activity. Since the projected list is sorted by last_activity DESC and
the WebUI sidebar surfaces updated_at = last_activity, an actively-used
compression chain whose tip is being edited NOW would sort by the ROOT's
old last_activity and fall below recently touched standalone sessions.

Reproducer (with the harness against actual code, before the fix):
  - root: started 30 days ago, last msg 30 days ago
  - tip:  started 28 days ago (parent_session_id=root), last msg 5 seconds ago
  - standalone: last msg 2 days ago

  Sidebar order with original PR:
    [0] standalone  (48h ago)
    [1] active_tip  (last_activity=root's 720h ago)  ← wrong

  Sidebar order after fix:
    [0] active_tip  (last_activity=tip's 0h ago)     ← correct
    [1] standalone  (48h ago)

This matches Hermes Agent's own list_sessions_rich projection at
hermes_state.py:903-909, which overrides "last_active" from the tip
exactly so that the agent CLI's session list orders the same way.

Add ``last_activity`` to the merge-from-tip key list, update the existing
test_compression_chain_collapses_to_latest_tip_in_sidebar assertion to
expect tip-derived updated_at, and add
test_compression_chain_bubbles_to_top_by_tip_activity locking in the
bubble-to-top invariant — without this regression test the previous
behaviour passed CI because no test exercised the sort order against a
mixed set of chains and standalone sessions.

The chain head's started_at (created_at) and title remain preserved, so
users can still find the conversation by its original date and name.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: v0.50.216 release notes and version bump

Compression chains, renderer fixes, HTML preview, approval z-index, /steer fix.

* chore: gitignore local-only review harness directory

Adds .local-review/ to .gitignore so renderer drivers, sample inputs,
fixture builders, and other reviewer scratch files do not accidentally
get committed. Nothing under that path is ever shared in the repo;
keeping the entry tracked makes the boundary explicit for any future
contributor who creates the directory locally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Keep reasoning chip visible for None effort

* test(reasoning): pin chip render output via node, not just source regex

The PR's static checks in test_reasoning_chip_btw_fixes.py validate the
shape of _applyReasoningChip (no display='none' literal, the right
classList.toggle call exists, the right label literals are in the
function body) but pass even if the runtime detail is wrong — for
example if `inactive` were inverted, _normalizeReasoningEffort
mishandled whitespace, or _formatReasoningEffortLabel returned the
wrong literal for an unknown input.

Add tests/test_reasoning_chip_js_behaviour.py — 11 tests that drive
the actual _applyReasoningChip() via node and assert on the rendered
DOM state for each effort value:

  TestChipAlwaysVisible
    - empty / null  -> "Default" label, inactive=true
    - "none"        -> "None" label, inactive=true
    - "low"/"high"  -> verbatim label, inactive=false
  TestNormalizationEdgeCases
    - "NONE"        -> normalises to "None"
    - "  none  "    -> trims and normalises
    - unknown junk  -> falls through visible, never hidden
  TestTitleAttributeAccessibility
    - title attribute carries the human-readable label for tooltip /
      screen-reader use

Sanity-checked against master's pre-fix ui.js: 11/11 fail (bug caught).
Against this PR's ui.js: 11/11 pass.

This pattern (drive the actual JS via node) caught two regex-only
regressions in PR #1073 where the Python mirror was correct while the
JS was broken. Same protection added here so the chip-visibility
contract can't silently break in a future refactor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: add #1074 to v0.50.216 changelog, bump test count to 2428

* fix(i18n): restore broken Unicode in Russian and Spanish steer strings

Commit 56c7a14 (fix: update steer i18n and settings copy) accidentally
stripped the `\u` prefix from Unicode escape sequences in two locales,
producing garbled literal hex strings visible to users:

  Spanish (es):
    - cmd_steer:                   correcci00f3n  → corrección
    - cmd_steer_fallback:          2014 en cola   → — en cola
    - busy_steer_fallback:         2014 en cola   → — en cola
    - settings_desc_busy_input_mode: qu00e9, est00e1, correcci00f3n → qué, está, corrección
    - settings_busy_input_mode_steer: correcci00f3n  → corrección

  Russian (ru):
    - settings_desc_busy_input_mode: the entire Cyrillic string was
      replaced with raw 4-hex-char code-points without the \u prefix
      (041e043f... instead of actual Cyrillic). Decoded:
      "Определяет поведение при отправке сообщения во время работы
      агента. Очередь ждёт; Прерывание отменяет и начинает заново;
      Steer внедряет коррекцию без прерывания."

Fix: write the correct characters directly (UTF-8 is the file encoding
so embedding them literally is cleaner than \u escapes for long text).

All other locales (en, de, zh, zh-Hant) were not affected — confirmed
by grepping for bare hex run-ons in the updated file.

Verified: node --check static/i18n.js passes; full pytest suite green
(2365 passed, 47 skipped).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: remove duplicate compression chain entry from [Unreleased]

---------

Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com>
Co-authored-by: Nathan Esquenazi <nesquena@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Frank Song <franksong2702@gmail.com>
2026-04-25 21:06:31 -07:00

1025 lines
38 KiB
Python

"""
Tests for Phase 1: Real-time Gateway Session Sync.
Tests are ordered TDD-style:
1. Gateway sessions appear in /api/sessions when setting enabled
2. Gateway sessions excluded when setting disabled
3. Gateway sessions have correct metadata (source_tag, is_cli_session)
4. SSE stream endpoint opens and receives events
5. Watcher detects new sessions inserted into state.db
6. Settings UI has renamed label
"""
import json
import os
import pathlib
import sqlite3
import time
import urllib.error
import urllib.request
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
from tests._pytest_port import BASE
def get(path):
with urllib.request.urlopen(BASE + path, timeout=10) as r:
return json.loads(r.read()), r.status
def post(path, body=None):
data = json.dumps(body or {}).encode()
req = urllib.request.Request(BASE + path, data=data,
headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=10) as r:
return json.loads(r.read()), r.status
except urllib.error.HTTPError as e:
try:
return json.loads(e.read()), e.code
except Exception:
return {}, e.code
def _get_test_state_dir():
"""Return the test state directory (matches conftest.py TEST_STATE_DIR).
conftest.py sets HERMES_WEBUI_TEST_STATE_DIR in the test-process environment
(via os.environ.setdefault) so that tests writing directly to state.db always
use the same path the test server was started with. If the env var is not
set (e.g. when running this file standalone), fall back to the conftest
formula: HERMES_HOME/webui-mvp-test.
"""
# Use _pytest_port which applies the same auto-derivation as conftest.py
from tests._pytest_port import TEST_STATE_DIR as _ptsd
return _ptsd
def _get_state_db_path():
"""Return path to the test state.db."""
return _get_test_state_dir() / 'state.db'
def _ensure_state_db():
"""Create state.db with sessions and messages tables if it doesn't exist.
Returns a connection. Does NOT delete existing data (safe for parallel tests).
"""
db_path = _get_state_db_path()
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.executescript("""
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
user_id TEXT,
model TEXT,
started_at REAL NOT NULL,
message_count INTEGER DEFAULT 0,
title TEXT
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT,
timestamp REAL NOT NULL
);
""")
for column, ddl in (
('parent_session_id', 'ALTER TABLE sessions ADD COLUMN parent_session_id TEXT'),
('ended_at', 'ALTER TABLE sessions ADD COLUMN ended_at REAL'),
('end_reason', 'ALTER TABLE sessions ADD COLUMN end_reason TEXT'),
):
existing = {row[1] for row in conn.execute("PRAGMA table_info(sessions)").fetchall()}
if column not in existing:
conn.execute(ddl)
conn.commit()
return conn
def _insert_gateway_session(conn, session_id='20260401_120000_abcdefgh', source='telegram',
title='Telegram Chat', model='anthropic/claude-sonnet-4-5',
started_at=None, message_count=2):
"""Insert a gateway session into state.db."""
conn.execute(
"INSERT OR REPLACE INTO sessions (id, source, title, model, started_at, message_count) "
"VALUES (?, ?, ?, ?, ?, ?)",
(session_id, source, title, model, started_at or time.time(), message_count)
)
# Delete any existing messages for this session (idempotent re-insert)
conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
# Insert some messages
conn.execute(
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, 'user', ?, ?)",
(session_id, 'Hello from Telegram', started_at or time.time())
)
conn.execute(
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, 'assistant', ?, ?)",
(session_id, 'Hi there!', (started_at or time.time()) + 1)
)
conn.commit()
def _insert_agent_session_row(
conn,
session_id,
source='weixin',
title='Agent Session',
model='openai/gpt-5',
started_at=None,
parent_session_id=None,
ended_at=None,
end_reason=None,
messages=1,
):
"""Insert an agent session row with optional compression lineage."""
started_at = started_at or time.time()
conn.execute(
"INSERT OR REPLACE INTO sessions "
"(id, source, title, model, started_at, message_count, parent_session_id, ended_at, end_reason) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
session_id,
source,
title,
model,
started_at,
messages,
parent_session_id,
ended_at,
end_reason,
),
)
conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
for i in range(messages):
conn.execute(
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
(
session_id,
'user' if i % 2 == 0 else 'assistant',
f'{title} message {i + 1}',
started_at + i,
),
)
conn.commit()
def _remove_test_sessions(conn, *session_ids):
"""Remove specific test sessions from state.db (parallel-safe cleanup)."""
for sid in session_ids:
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
conn.commit()
def _cleanup_state_db():
"""Remove state.db if it exists (only used for tests that need a blank slate)."""
db_path = _get_state_db_path()
for p in [db_path, db_path.parent / 'state.db-wal', db_path.parent / 'state.db-shm']:
try:
p.unlink(missing_ok=True)
except Exception:
pass
# ── Tests ──────────────────────────────────────────────────────────────────
def test_gateway_sessions_appear_when_enabled():
"""Gateway sessions from state.db appear in /api/sessions when show_cli_sessions is on."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_test_tg_001', source='telegram', title='TG Test Chat')
# Enable the setting
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw_ids = [s['session_id'] for s in sessions if s.get('session_id') == 'gw_test_tg_001']
assert len(gw_ids) == 1, f"Expected gateway session gw_test_tg_001, got {[s['session_id'] for s in sessions]}"
finally:
try:
_remove_test_sessions(conn, 'gw_test_tg_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sessions_without_messages_are_hidden_from_sidebar():
"""Regression: empty agent session rows must not appear as broken sidebar entries."""
conn = _ensure_state_db()
empty_sid = 'gw_empty_no_messages_001'
try:
conn.execute(
"INSERT OR REPLACE INTO sessions (id, source, title, model, started_at, message_count) "
"VALUES (?, ?, ?, ?, ?, ?)",
(empty_sid, 'cron', 'Cron Session', 'openai/gpt-5', time.time(), 0),
)
conn.execute("DELETE FROM messages WHERE session_id = ?", (empty_sid,))
conn.commit()
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
assert empty_sid not in {s.get('session_id') for s in sessions}, (
"Agent sessions with no readable message rows should be filtered before "
"they reach the sidebar; otherwise clicking them fails during import."
)
finally:
try:
_remove_test_sessions(conn, empty_sid)
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_watcher_hides_sessions_without_messages(monkeypatch):
"""Regression: SSE watcher must use the same importable-agent filter."""
conn = _ensure_state_db()
empty_sid = 'gw_empty_watcher_001'
live_sid = 'gw_live_watcher_001'
try:
conn.execute(
"INSERT OR REPLACE INTO sessions (id, source, title, model, started_at, message_count) "
"VALUES (?, ?, ?, ?, ?, ?)",
(empty_sid, 'cron', 'Empty Cron Session', 'openai/gpt-5', time.time(), 0),
)
conn.execute("DELETE FROM messages WHERE session_id = ?", (empty_sid,))
_insert_gateway_session(
conn,
session_id=live_sid,
source='cron',
title='Live Cron Session',
message_count=0,
)
import api.gateway_watcher as gateway_watcher
monkeypatch.setattr(gateway_watcher, '_get_state_db_path', _get_state_db_path)
sessions = gateway_watcher._get_agent_sessions_from_db()
ids = {s.get('session_id') for s in sessions}
live = next((s for s in sessions if s.get('session_id') == live_sid), None)
assert empty_sid not in ids
assert live is not None
assert live.get('message_count') == 2, (
"Watcher should fall back to actual message rows when stored "
"message_count is zero, matching the sidebar route."
)
finally:
try:
_remove_test_sessions(conn, empty_sid, live_sid)
conn.close()
except Exception:
pass
def test_compression_chain_collapses_to_latest_tip_in_sidebar():
"""Show one logical agent conversation for a compression continuation chain."""
conn = _ensure_state_db()
ids_to_remove = ('chain_root_001', 'chain_empty_mid_001', 'chain_tip_001')
t0 = time.time() - 600
try:
_insert_agent_session_row(
conn,
'chain_root_001',
title='Magazine Style PPT Skill',
started_at=t0,
ended_at=t0 + 100,
end_reason='compression',
messages=3,
)
_insert_agent_session_row(
conn,
'chain_empty_mid_001',
title='Magazine Style PPT Skill #2',
started_at=t0 + 101,
parent_session_id='chain_root_001',
ended_at=t0 + 200,
end_reason='compression',
messages=0,
)
_insert_agent_session_row(
conn,
'chain_tip_001',
title='Magazine Style PPT Skill #3',
started_at=t0 + 201,
parent_session_id='chain_empty_mid_001',
messages=2,
)
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
ids = {s.get('session_id') for s in data.get('sessions', [])}
tip = next((s for s in data.get('sessions', []) if s.get('session_id') == 'chain_tip_001'), None)
assert 'chain_tip_001' in ids
assert 'chain_root_001' not in ids
assert 'chain_empty_mid_001' not in ids
assert tip is not None
assert tip.get('title') == 'Magazine Style PPT Skill'
assert tip.get('message_count') == 2
# created_at = the chain head's started_at (preserves original conversation date)
assert abs(tip.get('created_at') - t0) < 0.01
# updated_at = the tip's last message timestamp so the sidebar entry
# bubbles to the top by true recency, not by the root's stale activity.
# tip messages are at t0+201 and t0+202, so last_activity = t0 + 202.
assert abs(tip.get('updated_at') - (t0 + 202)) < 0.01
from api.agent_sessions import read_importable_agent_session_rows
rows = read_importable_agent_session_rows(_get_state_db_path(), limit=None)
projected_tip = next((row for row in rows if row.get('id') == 'chain_tip_001'), None)
assert projected_tip is not None
assert projected_tip.get('title') == 'Magazine Style PPT Skill'
assert projected_tip.get('_lineage_root_id') == 'chain_root_001'
assert projected_tip.get('_lineage_tip_id') == 'chain_tip_001'
assert projected_tip.get('_compression_segment_count') == 3
finally:
try:
_remove_test_sessions(conn, *ids_to_remove)
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_compression_chain_with_empty_latest_tip_falls_back_to_latest_importable_segment():
"""Empty latest tips should not make the whole conversation disappear."""
conn = _ensure_state_db()
ids_to_remove = ('empty_tip_root_001', 'empty_tip_001')
t0 = time.time() - 500
try:
_insert_agent_session_row(
conn,
'empty_tip_root_001',
title='Long Conversation',
started_at=t0,
ended_at=t0 + 100,
end_reason='compression',
messages=2,
)
_insert_agent_session_row(
conn,
'empty_tip_001',
title='Long Conversation #2',
started_at=t0 + 101,
parent_session_id='empty_tip_root_001',
messages=0,
)
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
ids = {s.get('session_id') for s in data.get('sessions', [])}
assert 'empty_tip_root_001' in ids
assert 'empty_tip_001' not in ids
root = next((s for s in data.get('sessions', []) if s.get('session_id') == 'empty_tip_root_001'), None)
assert root and root.get('title') == 'Long Conversation'
finally:
try:
_remove_test_sessions(conn, *ids_to_remove)
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_compression_chain_with_all_empty_segments_is_hidden():
"""A compression chain with no importable segment should not appear."""
conn = _ensure_state_db()
ids_to_remove = ('all_empty_root_001', 'all_empty_tip_001')
t0 = time.time() - 450
try:
_insert_agent_session_row(
conn,
'all_empty_root_001',
title='Empty Long Conversation',
started_at=t0,
ended_at=t0 + 100,
end_reason='compression',
messages=0,
)
_insert_agent_session_row(
conn,
'all_empty_tip_001',
title='Empty Long Conversation #2',
started_at=t0 + 101,
parent_session_id='all_empty_root_001',
messages=0,
)
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
ids = {s.get('session_id') for s in data.get('sessions', [])}
assert 'all_empty_root_001' not in ids
assert 'all_empty_tip_001' not in ids
finally:
try:
_remove_test_sessions(conn, *ids_to_remove)
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_non_compression_child_is_not_collapsed_into_parent():
"""Parent/child relationships that are not compression continuations stay flat."""
conn = _ensure_state_db()
ids_to_remove = ('branch_parent_001', 'branch_child_001')
t0 = time.time() - 400
try:
_insert_agent_session_row(
conn,
'branch_parent_001',
title='Branch Parent',
started_at=t0,
ended_at=t0 + 100,
end_reason='branched',
messages=2,
)
_insert_agent_session_row(
conn,
'branch_child_001',
title='Branch Child',
started_at=t0 + 101,
parent_session_id='branch_parent_001',
messages=2,
)
from api.agent_sessions import read_importable_agent_session_rows
rows = read_importable_agent_session_rows(_get_state_db_path(), limit=None)
ids = {row.get('id') for row in rows}
assert 'branch_parent_001' in ids
assert 'branch_child_001' in ids
finally:
try:
_remove_test_sessions(conn, *ids_to_remove)
conn.close()
except Exception:
pass
def test_agent_session_limit_applies_after_compression_projection():
"""A long raw chain should count as one logical sidebar row before limiting."""
conn = _ensure_state_db()
chain_ids = [f'limit_chain_{i:03d}' for i in range(8)]
standalone_id = 'limit_standalone_001'
t0 = time.time() - 300
try:
for i, sid in enumerate(chain_ids):
_insert_agent_session_row(
conn,
sid,
title=f'Limit Chain #{i + 1}',
started_at=t0 + i,
parent_session_id=chain_ids[i - 1] if i else None,
ended_at=t0 + i + 0.5 if i < len(chain_ids) - 1 else None,
end_reason='compression' if i < len(chain_ids) - 1 else None,
messages=1,
)
_insert_agent_session_row(
conn,
standalone_id,
title='Limit Standalone',
started_at=t0 + 20,
messages=1,
)
from api.agent_sessions import read_importable_agent_session_rows
rows = read_importable_agent_session_rows(_get_state_db_path(), limit=2)
ids = [row.get('id') for row in rows]
assert len(rows) == 2
assert chain_ids[-1] in ids
assert standalone_id in ids
assert not any(sid in ids for sid in chain_ids[:-1])
chain = next(row for row in rows if row.get('id') == chain_ids[-1])
assert chain.get('title') == 'Limit Chain #1'
assert chain.get('_lineage_root_id') == chain_ids[0]
assert chain.get('_compression_segment_count') == len(chain_ids)
finally:
try:
_remove_test_sessions(conn, *(chain_ids + [standalone_id]))
conn.close()
except Exception:
pass
def test_compression_chain_bubbles_to_top_by_tip_activity():
"""An actively-used compression chain must surface in the sidebar by its
TIP's last activity, not by the (stale) root's last activity.
Without overriding ``last_activity`` from the tip, a long-running chain
whose tip is being actively edited NOW would sort by the root's old
timestamp and fall below recently touched standalone sessions — the
inverse of what users expect from "Show agent sessions" sorted by
recency. This regression test pins the override.
"""
conn = _ensure_state_db()
ids_to_remove = ('bubble_root_001', 'bubble_tip_001', 'bubble_standalone_001')
now = time.time()
# Root started long ago; tip is being edited "now" (very recent message)
root_started = now - 30 * 86400
root_ended = now - 28 * 86400
tip_started = root_ended + 1
tip_latest_msg = now - 5 # 5 seconds ago — most recent activity in the DB
# A standalone session active 2 days ago — older than tip, much newer
# than the root. Without the fix, the chain row sorts by ROOT's age and
# standalone wins; with the fix, the chain wins.
standalone_msg = now - 2 * 86400
try:
_insert_agent_session_row(
conn,
'bubble_root_001',
title='Bubble Root',
started_at=root_started,
ended_at=root_ended,
end_reason='compression',
messages=2,
)
# Override message timestamps so root's last_activity is genuinely old.
conn.execute("DELETE FROM messages WHERE session_id = 'bubble_root_001'")
conn.execute(
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
('bubble_root_001', 'user', 'old root msg', root_started + 60),
)
_insert_agent_session_row(
conn,
'bubble_tip_001',
title='Bubble Tip',
started_at=tip_started,
parent_session_id='bubble_root_001',
messages=1,
)
conn.execute("DELETE FROM messages WHERE session_id = 'bubble_tip_001'")
conn.execute(
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
('bubble_tip_001', 'user', 'fresh tip msg', tip_latest_msg),
)
_insert_agent_session_row(
conn,
'bubble_standalone_001',
title='Bubble Standalone',
started_at=now - 2 * 86400 - 60,
messages=1,
)
conn.execute("DELETE FROM messages WHERE session_id = 'bubble_standalone_001'")
conn.execute(
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
('bubble_standalone_001', 'user', 'standalone msg', standalone_msg),
)
conn.commit()
from api.agent_sessions import read_importable_agent_session_rows
rows = read_importable_agent_session_rows(_get_state_db_path(), limit=200)
ids = [row.get('id') for row in rows]
# Filter out unrelated rows from the shared DB
ids = [i for i in ids if i in ('bubble_root_001', 'bubble_tip_001', 'bubble_standalone_001')]
assert 'bubble_tip_001' in ids, (
f"Compression tip must appear in projected output. ids={ids}"
)
assert 'bubble_root_001' not in ids, (
"Compression root row must be hidden once the tip is the active row."
)
tip_pos = ids.index('bubble_tip_001')
standalone_pos = ids.index('bubble_standalone_001') if 'bubble_standalone_001' in ids else -1
assert standalone_pos == -1 or tip_pos < standalone_pos, (
f"Active compression tip (last msg 5s ago) must sort BEFORE standalone "
f"session (last msg 2d ago). Got order: {ids}. "
f"This indicates merged.last_activity is the root's stale value, "
f"not the tip's recent value."
)
tip_row = next(r for r in rows if r['id'] == 'bubble_tip_001')
assert abs(tip_row['last_activity'] - tip_latest_msg) < 0.01, (
f"Projected tip's last_activity must equal the tip's most recent "
f"message timestamp ({tip_latest_msg}), not the root's "
f"({root_started + 60}). Got: {tip_row['last_activity']}"
)
finally:
try:
_remove_test_sessions(conn, *ids_to_remove)
conn.close()
except Exception:
pass
def test_gateway_sessions_excluded_when_disabled():
"""Gateway sessions are NOT returned when show_cli_sessions is off."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_test_dc_001', source='discord', title='DC Test Chat')
# Ensure setting is off
post('/api/settings', {'show_cli_sessions': False})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw_ids = [s['session_id'] for s in sessions if s.get('session_id') == 'gw_test_dc_001']
assert len(gw_ids) == 0, "Gateway session should not appear when setting is off"
finally:
try:
_remove_test_sessions(conn, 'gw_test_dc_001')
conn.close()
except Exception:
pass
def test_gateway_session_has_correct_metadata():
"""Gateway sessions include source_tag and is_cli_session fields."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_meta_001', source='telegram', title='Meta Test')
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw = next((s for s in sessions if s['session_id'] == 'gw_meta_001'), None)
assert gw is not None, "Gateway session not found"
assert gw.get('source_tag') == 'telegram', f"Expected source_tag=telegram, got {gw.get('source_tag')}"
assert gw.get('is_cli_session') is True, "is_cli_session should be True for agent sessions"
assert gw.get('title') == 'Meta Test'
finally:
try:
_remove_test_sessions(conn, 'gw_meta_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_session_has_message_count():
"""Gateway sessions report correct message_count from state.db."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_msg_001', source='discord', title='Msg Count Test', message_count=5)
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw = next((s for s in sessions if s['session_id'] == 'gw_msg_001'), None)
assert gw is not None
assert gw.get('message_count') == 5, f"Expected message_count=5, got {gw.get('message_count')}"
finally:
try:
_remove_test_sessions(conn, 'gw_msg_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sessions_multiple_sources():
"""Sessions from multiple gateway sources (telegram, discord, slack) all appear."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_multi_tg', source='telegram', title='TG Chat')
_insert_gateway_session(conn, session_id='gw_multi_dc', source='discord', title='DC Chat')
_insert_gateway_session(conn, session_id='gw_multi_sl', source='slack', title='SL Chat')
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw_ids = {s['session_id'] for s in sessions if s.get('session_id') in ('gw_multi_tg', 'gw_multi_dc', 'gw_multi_sl')}
assert len(gw_ids) == 3, f"Expected 3 gateway sessions, got {len(gw_ids)}: {gw_ids}"
finally:
try:
_remove_test_sessions(conn, 'gw_multi_tg', 'gw_multi_dc', 'gw_multi_sl')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_session_messages_readable():
"""Gateway session messages can be loaded via /api/session."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_read_001', source='telegram', title='Readable')
post('/api/settings', {'show_cli_sessions': True})
data, status = get(f'/api/session?session_id=gw_read_001')
assert status == 200
msgs = data.get('session', {}).get('messages', [])
assert len(msgs) >= 2, f"Expected at least 2 messages, got {len(msgs)}"
assert msgs[0].get('role') == 'user'
assert msgs[0].get('content') == 'Hello from Telegram'
finally:
try:
_remove_test_sessions(conn, 'gw_read_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_importing_older_gateway_session_preserves_original_timestamps_and_order():
"""Importing an older gateway session should not bump it above newer WebUI sessions."""
conn = _ensure_state_db()
older_started_at = time.time() - 1800
imported_sid = 'gw_import_old_001'
newer_webui_sid = None
try:
newer_webui, status = post('/api/session/new', {'model': 'openai/gpt-5'})
assert status == 200, newer_webui
newer_webui_sid = newer_webui['session']['session_id']
rename, rename_status = post(
'/api/session/rename',
{'session_id': newer_webui_sid, 'title': 'Newer WebUI Session'},
)
assert rename_status == 200, rename
_insert_gateway_session(
conn,
session_id=imported_sid,
source='discord',
title='Older imported gateway session',
started_at=older_started_at,
)
post('/api/settings', {'show_cli_sessions': True})
imported, imported_status = post('/api/session/import_cli', {'session_id': imported_sid})
assert imported_status == 200, imported
imported_session = imported['session']
assert abs(imported_session['created_at'] - older_started_at) < 2, imported_session
assert abs(imported_session['updated_at'] - older_started_at) < 5, imported_session
sessions_payload, sessions_status = get('/api/sessions')
assert sessions_status == 200, sessions_payload
ordered_ids = [item['session_id'] for item in sessions_payload.get('sessions', [])]
assert newer_webui_sid in ordered_ids, ordered_ids
assert imported_sid in ordered_ids, ordered_ids
assert ordered_ids.index(newer_webui_sid) < ordered_ids.index(imported_sid), ordered_ids
finally:
try:
_remove_test_sessions(conn, imported_sid)
conn.close()
except Exception:
pass
if imported_sid:
try:
post('/api/session/delete', {'session_id': imported_sid})
except Exception:
pass
if newer_webui_sid:
try:
post('/api/session/delete', {'session_id': newer_webui_sid})
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sse_stream_endpoint_exists():
"""GET /api/sessions/gateway/stream returns a response (200 or 200-range)."""
# The SSE endpoint requires show_cli_sessions to be enabled
post('/api/settings', {'show_cli_sessions': True})
try:
req = urllib.request.Request(BASE + '/api/sessions/gateway/stream')
with urllib.request.urlopen(req, timeout=5) as r:
assert r.status in (200, 204), f"Expected 200/204, got {r.status}"
# SSE should have content-type text/event-stream
ctype = r.headers.get('Content-Type', '')
assert 'text/event-stream' in ctype, f"Expected text/event-stream, got {ctype}"
except Exception as e:
# Timeout is acceptable — means the connection is held open (SSE behavior)
if 'timed out' in str(e).lower() or 'timeout' in str(e).lower():
pass # Good: SSE keeps the connection open
else:
raise
finally:
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sse_stream_probe_reports_status():
"""Probe mode returns JSON watcher status instead of holding open an SSE stream."""
post('/api/settings', {'show_cli_sessions': True})
try:
req = urllib.request.Request(BASE + '/api/sessions/gateway/stream?probe=1')
with urllib.request.urlopen(req, timeout=5) as r:
assert r.status == 200, f"Expected 200, got {r.status}"
ctype = r.headers.get('Content-Type', '')
assert 'application/json' in ctype, f"Expected application/json, got {ctype}"
data = json.loads(r.read().decode('utf-8'))
assert data['enabled'] is True
assert 'watcher_running' in data
assert data['fallback_poll_ms'] == 30000
finally:
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_webui_sessions_not_duplicated():
"""If a session_id exists both in WebUI store and state.db, it's not duplicated."""
# Create a WebUI session with a known ID
body = {}
d, _ = post('/api/session/new', body)
webui_sid = d['session']['session_id']
try:
# Insert the same session_id into state.db as a gateway session
conn = _ensure_state_db()
_insert_gateway_session(conn, session_id=webui_sid, source='telegram', title='Dup Test')
conn.close()
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
matching = [s for s in sessions if s['session_id'] == webui_sid]
assert len(matching) == 1, f"Expected 1 entry for {webui_sid}, got {len(matching)}"
finally:
try:
conn2 = sqlite3.connect(str(_get_state_db_path()))
_remove_test_sessions(conn2, webui_sid)
conn2.close()
except Exception:
pass
post('/api/session/delete', {'session_id': webui_sid})
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sessions_no_state_db():
"""When state.db doesn't exist, /api/sessions works fine (no gateway sessions)."""
_cleanup_state_db()
post('/api/settings', {'show_cli_sessions': True})
try:
data, status = get('/api/sessions')
assert status == 200
# Should succeed with just webui sessions (or empty)
assert 'sessions' in data
finally:
post('/api/settings', {'show_cli_sessions': False})
def test_cli_sessions_still_work():
"""CLI sessions (source='cli') still appear alongside gateway sessions."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='cli_legacy_001', source='cli', title='CLI Legacy')
_insert_gateway_session(conn, session_id='gw_new_001', source='telegram', title='GW New')
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
agent_ids = {s['session_id'] for s in sessions if s.get('session_id') in ('cli_legacy_001', 'gw_new_001')}
assert len(agent_ids) == 2, f"Expected 2 agent sessions (cli + gateway), got {len(agent_ids)}"
finally:
try:
_remove_test_sessions(conn, 'cli_legacy_001', 'gw_new_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
# ── Unit tests for _gateway_sse_probe_payload ────────────────────────────────
# These replace the deleted repo-root test_gateway_sse_probe_unit.py and account
# for the watcher_alive check (thread existence + is_alive()).
import sys
import threading
sys.path.insert(0, str(REPO_ROOT))
from api.routes import _gateway_sse_probe_payload
def test_probe_payload_when_disabled():
"""Probe returns 404 when show_cli_sessions is False."""
body, status = _gateway_sse_probe_payload({'show_cli_sessions': False}, watcher=None)
assert status == 404
assert body['ok'] is False
assert body['enabled'] is False
assert body['watcher_running'] is False
assert body['error'] == 'agent sessions not enabled'
assert body['fallback_poll_ms'] == 30000
def test_probe_payload_when_watcher_missing():
"""Probe returns 503 when enabled but no watcher instance."""
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=None)
assert status == 503
assert body['ok'] is False
assert body['enabled'] is True
assert body['watcher_running'] is False
assert body['error'] == 'watcher not started'
assert body['fallback_poll_ms'] == 30000
def test_probe_payload_when_watcher_instance_no_thread():
"""Probe returns 503 when watcher exists but _thread attribute is missing/None."""
class _FakeWatcher:
_thread = None
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=_FakeWatcher())
assert status == 503
assert body['watcher_running'] is False
def test_probe_payload_when_watcher_thread_alive():
"""Probe returns 200 when enabled and watcher thread is alive."""
class _FakeWatcher:
pass
w = _FakeWatcher()
t = threading.Thread(target=lambda: None)
t.daemon = True
t.start()
w._thread = t
# Thread may finish fast — loop-start a live daemon thread for reliability
import time as _time
done = threading.Event()
live = threading.Thread(target=done.wait, daemon=True)
live.start()
w._thread = live
try:
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=w)
assert status == 200
assert body['ok'] is True
assert body['watcher_running'] is True
assert body['fallback_poll_ms'] == 30000
finally:
done.set()
live.join(timeout=1)
def test_probe_payload_when_watcher_thread_dead():
"""Probe returns 503 when watcher instance exists but thread has exited."""
class _FakeWatcher:
pass
w = _FakeWatcher()
t = threading.Thread(target=lambda: None)
t.start()
t.join() # wait for it to finish
w._thread = t
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=w)
assert status == 503
assert body['watcher_running'] is False
assert body['ok'] is False
def test_gateway_watcher_is_alive_public_method():
"""GatewayWatcher.is_alive() is the public API the probe uses. Cover all
three states: before start(), while running, after stop()."""
from api.gateway_watcher import GatewayWatcher
w = GatewayWatcher()
# Before start(): no thread
assert w.is_alive() is False, "is_alive() must be False before start()"
# After start(): thread running
w.start()
try:
assert w.is_alive() is True, "is_alive() must be True while running"
finally:
w.stop()
# After stop(): thread cleared
assert w.is_alive() is False, "is_alive() must be False after stop()"
def test_probe_payload_prefers_public_is_alive():
"""Regression guard: _gateway_sse_probe_payload must call watcher.is_alive()
rather than poking at _thread directly when the public method exists."""
calls = []
class _WatcherWithPublicApi:
def is_alive(self):
calls.append('is_alive')
return True
# _thread is deliberately absent — must not be accessed.
body, status = _gateway_sse_probe_payload(
{'show_cli_sessions': True},
watcher=_WatcherWithPublicApi(),
)
assert status == 200
assert body['watcher_running'] is True
assert calls == ['is_alive'], (
"probe must prefer the public is_alive() method over poking _thread"
)