Files
hermes-webui/tests/test_turn_journal.py
2026-05-15 16:39:45 -07:00

204 lines
6.8 KiB
Python

import json
import api.turn_journal as turn_journal
from api.session_recovery import audit_session_recovery
from api.turn_journal import (
append_turn_journal_event,
derive_turn_journal_states,
read_turn_journal,
)
def _write_session(session_dir, sid, messages=None):
payload = {
"session_id": sid,
"title": "Turn journal test",
"messages": messages or [],
}
(session_dir / f"{sid}.json").write_text(json.dumps(payload), encoding="utf-8")
def test_append_turn_journal_event_fsyncs_jsonl_and_preserves_payload(tmp_path):
event = append_turn_journal_event(
"sid-1",
{
"event": "submitted",
"turn_id": "turn-1",
"stream_id": "stream-1",
"role": "user",
"content": "hello",
"attachments": [{"name": "a.png", "path": "/tmp/a.png"}],
},
session_dir=tmp_path,
)
assert event["version"] == 1
assert event["session_id"] == "sid-1"
assert event["created_at"] > 0
journal_path = tmp_path / "_turn_journal" / "sid-1.jsonl"
assert journal_path.exists()
lines = journal_path.read_text(encoding="utf-8").splitlines()
assert len(lines) == 1
assert json.loads(lines[0])["content"] == "hello"
def test_read_turn_journal_tolerates_malformed_lines(tmp_path):
journal_dir = tmp_path / "_turn_journal"
journal_dir.mkdir()
(journal_dir / "sid-1.jsonl").write_text(
'{"event":"submitted","turn_id":"turn-1","session_id":"sid-1"}\n'
'not-json\n'
'{"event":"completed","turn_id":"turn-1","session_id":"sid-1"}\n',
encoding="utf-8",
)
result = read_turn_journal("sid-1", session_dir=tmp_path)
assert [event["event"] for event in result["events"]] == ["submitted", "completed"]
assert result["malformed"] == [{"line": 2, "raw": "not-json"}]
def test_append_turn_journal_event_locks_around_write_and_fsync(tmp_path, monkeypatch):
calls = []
class FakeFcntl:
LOCK_EX = 1
LOCK_UN = 2
@staticmethod
def flock(fd, flag):
calls.append((fd, flag))
monkeypatch.setattr(turn_journal, "_fcntl", FakeFcntl)
append_turn_journal_event(
"sid-1",
{"event": "submitted", "turn_id": "turn-locked", "content": "x" * 5000},
session_dir=tmp_path,
)
assert [flag for _, flag in calls] == [FakeFcntl.LOCK_EX, FakeFcntl.LOCK_UN]
def test_append_turn_journal_event_still_writes_when_fcntl_unavailable(tmp_path, monkeypatch):
monkeypatch.setattr(turn_journal, "_fcntl", None)
append_turn_journal_event(
"sid-1",
{"event": "submitted", "turn_id": "turn-no-fcntl", "content": "hello"},
session_dir=tmp_path,
)
result = read_turn_journal("sid-1", session_dir=tmp_path)
assert result["events"][0]["turn_id"] == "turn-no-fcntl"
def test_derive_turn_journal_states_keeps_latest_event_per_turn():
states, _ = derive_turn_journal_states([
{"event": "submitted", "turn_id": "turn-1", "created_at": 1},
{"event": "worker_started", "turn_id": "turn-1", "created_at": 2},
{"event": "submitted", "turn_id": "turn-2", "created_at": 3},
{"event": "completed", "turn_id": "turn-1", "created_at": 4},
])
assert states["turn-1"]["event"] == "completed"
assert states["turn-2"]["event"] == "submitted"
def test_derive_turn_journal_states_uses_created_at_not_file_order():
states, _ = derive_turn_journal_states([
{"event": "completed", "turn_id": "turn-1", "created_at": 20},
{"event": "submitted", "turn_id": "turn-1", "created_at": 10},
])
assert states["turn-1"]["event"] == "completed"
def test_audit_reports_pending_turn_journal_entry_when_user_message_absent(tmp_path):
_write_session(tmp_path, "sid-1", messages=[])
append_turn_journal_event(
"sid-1",
{
"event": "submitted",
"turn_id": "turn-1",
"stream_id": "stream-1",
"role": "user",
"content": "recover me",
"attachments": [],
},
session_dir=tmp_path,
)
report = audit_session_recovery(tmp_path)
assert report["status"] == "warn"
assert report["summary"]["repairable"] == 1
assert report["items"] == [
{
"session_id": "sid-1",
"kind": "turn_journal_pending_turn",
"category": "repairable",
"recommendation": "audit_only_pending_turn_journal",
"live_messages": 0,
"bak_messages": -1,
"turn_id": "turn-1",
"event": "submitted",
}
]
def test_audit_ignores_completed_or_already_materialized_turn_journal_entry(tmp_path):
_write_session(tmp_path, "sid-1", messages=[{"role": "user", "content": "already there"}])
append_turn_journal_event(
"sid-1",
{
"event": "submitted",
"turn_id": "turn-1",
"role": "user",
"content": "already there",
},
session_dir=tmp_path,
)
append_turn_journal_event(
"sid-1",
{"event": "completed", "turn_id": "turn-1"},
session_dir=tmp_path,
)
report = audit_session_recovery(tmp_path)
assert report["status"] == "ok"
assert report["items"] == []
def test_derive_turn_journal_states_reports_terminal_collision_when_both_completed_and_interrupted():
# A turn that recorded both completed and interrupted terminal events should
# not silently collapse to one winner — the collision must be reported.
events = [
{'event': 'submitted', 'turn_id': 'turn-double-terminal', 'created_at': 1},
{'event': 'worker_started', 'turn_id': 'turn-double-terminal', 'created_at': 2},
{'event': 'completed', 'turn_id': 'turn-double-terminal', 'created_at': 3},
{'event': 'interrupted', 'turn_id': 'turn-double-terminal', 'created_at': 4, 'reason': 'server_restart'},
]
states, collisions = derive_turn_journal_states(events)
# Derived state still picks the latest by timestamp (interrupted)
assert states['turn-double-terminal']['event'] == 'interrupted'
# But the collision is explicitly reported so callers can audit it
assert len(collisions) == 1
assert collisions[0]['turn_id'] == 'turn-double-terminal'
assert [e['event'] for e in collisions[0]['events']] == ['completed', 'interrupted']
def test_derive_turn_journal_states_no_collision_when_single_terminal():
# A normal turn with only one terminal event must not produce a collision.
events = [
{'event': 'submitted', 'turn_id': 'turn-normal', 'created_at': 1},
{'event': 'worker_started', 'turn_id': 'turn-normal', 'created_at': 2},
{'event': 'completed', 'turn_id': 'turn-normal', 'created_at': 3},
]
states, collisions = derive_turn_journal_states(events)
assert states['turn-normal']['event'] == 'completed'
assert collisions == []