From 024a8e3ee90d79df5b6e58d2b09976eec0e12a89 Mon Sep 17 00:00:00 2001 From: yoniebans Date: Wed, 20 May 2026 09:20:09 +0200 Subject: [PATCH] refactor(gateway): drop JSONL fallback in load_transcript MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit state.db is canonical. The 'use whichever source is longer' branch was defensive code for the pre-DB migration; on every real DB it has not fired (verified on a session corpus with 27 jsonl files / 950 sessions — zero jsonl-bigger cases). Test changes: - TestLoadTranscriptCorruptLines: deleted (tested dead JSONL code path) - TestLoadTranscriptPreferLongerSource: deleted (tested removed fallback) - Replaced with TestLoadTranscriptDBOnly (DB-only reads) - TestSessionStoreRewriteTranscript: fixture now creates DB session - test_gateway_retry_replaces_last_user_turn: fixture uses real DB --- gateway/session.py | 63 ++---- tests/gateway/test_retry_replacement.py | 8 +- tests/gateway/test_session.py | 186 ++---------------- .../gateway/test_session_dm_thread_seeding.py | 7 +- 4 files changed, 35 insertions(+), 229 deletions(-) diff --git a/gateway/session.py b/gateway/session.py index ee90726a8b..52cf68753c 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -1312,58 +1312,19 @@ class SessionStore: f.write(json.dumps(msg, ensure_ascii=False) + "\n") def load_transcript(self, session_id: str) -> List[Dict[str, Any]]: - """Load all messages from a session's transcript.""" - db_messages = [] - # Try SQLite first - if self._db: - try: - db_messages = self._db.get_messages_as_conversation(session_id) - except Exception as e: - logger.debug("Could not load messages from DB: %s", e) + """Load all messages from a session's transcript. - # Load legacy JSONL transcript (may contain more history than SQLite - # for sessions created before the DB layer was introduced). - transcript_path = self.get_transcript_path(session_id) - jsonl_messages = [] - if transcript_path.exists(): - try: - with open(transcript_path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if line: - try: - jsonl_messages.append(json.loads(line)) - except json.JSONDecodeError: - logger.warning( - "Skipping corrupt line in transcript %s: %s", - session_id, line[:120], - ) - except OSError as e: - # JSONL is the legacy compatibility store. If it becomes - # unreadable, keep gateway recovery working by falling back to - # SQLite rows loaded above (or [] when no DB exists). - logger.debug("Failed to read JSONL transcript for %s: %s", session_id, e) - - # Prefer whichever source has more messages. - # - # Background: when a session pre-dates SQLite storage (or when the DB - # layer was added while a long-lived session was already active), the - # first post-migration turn writes only the *new* messages to SQLite - # (because _flush_messages_to_session_db skips messages already in - # conversation_history, assuming they're persisted). On the *next* - # turn load_transcript returns those few SQLite rows and ignores the - # full JSONL history — the model sees a context of 1-4 messages instead - # of hundreds. Using the longer source prevents this silent truncation. - if len(jsonl_messages) > len(db_messages): - if db_messages: - logger.debug( - "Session %s: JSONL has %d messages vs SQLite %d — " - "using JSONL (legacy session not yet fully migrated)", - session_id, len(jsonl_messages), len(db_messages), - ) - return jsonl_messages - - return db_messages + state.db is the canonical store. The legacy JSONL fallback was removed + in spec 002 — pre-DB sessions on existing disks have already been + migrated (their DB row holds the full message history). + """ + if not self._db: + return [] + try: + return self._db.get_messages_as_conversation(session_id) + except Exception as e: + logger.debug("Could not load messages from DB: %s", e) + return [] def build_session_context( diff --git a/tests/gateway/test_retry_replacement.py b/tests/gateway/test_retry_replacement.py index e62979cc73..571485caac 100644 --- a/tests/gateway/test_retry_replacement.py +++ b/tests/gateway/test_retry_replacement.py @@ -1,6 +1,6 @@ """Regression tests for /retry replacement semantics.""" -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock import pytest @@ -13,12 +13,10 @@ from gateway.session import SessionStore @pytest.mark.asyncio async def test_gateway_retry_replaces_last_user_turn_in_transcript(tmp_path): config = GatewayConfig() - with patch("gateway.session.SessionStore._ensure_loaded"): - store = SessionStore(sessions_dir=tmp_path, config=config) - store._db = None - store._loaded = True + store = SessionStore(sessions_dir=tmp_path, config=config) session_id = "retry_session" + store._db.create_session(session_id=session_id, source="test") for msg in [ {"role": "session_meta", "tools": []}, {"role": "user", "content": "first question"}, diff --git a/tests/gateway/test_session.py b/tests/gateway/test_session.py index dcd6ef9020..7e5aa1787c 100644 --- a/tests/gateway/test_session.py +++ b/tests/gateway/test_session.py @@ -1,6 +1,4 @@ """Tests for gateway session management.""" - -import builtins import json import pytest from pathlib import Path @@ -503,19 +501,17 @@ class TestSenderPrefixWithBackfill: class TestSessionStoreRewriteTranscript: - """Regression: /retry and /undo must persist truncated history to disk.""" + """Regression: /retry and /undo must persist truncated history to DB.""" @pytest.fixture() def store(self, tmp_path): config = GatewayConfig() - with patch("gateway.session.SessionStore._ensure_loaded"): - s = SessionStore(sessions_dir=tmp_path, config=config) - s._db = None # no SQLite for these tests - s._loaded = True + s = SessionStore(sessions_dir=tmp_path, config=config) return s - def test_rewrite_replaces_jsonl(self, store, tmp_path): + def test_rewrite_replaces_transcript(self, store, tmp_path): session_id = "test_session_1" + store._db.create_session(session_id=session_id, source="test") # Write initial transcript for msg in [ {"role": "user", "content": "hello"}, @@ -538,6 +534,7 @@ class TestSessionStoreRewriteTranscript: def test_rewrite_with_empty_list(self, store): session_id = "test_session_2" + store._db.create_session(session_id=session_id, source="test") store.append_to_transcript(session_id, {"role": "user", "content": "hi"}) store.rewrite_transcript(session_id, []) @@ -546,171 +543,24 @@ class TestSessionStoreRewriteTranscript: assert reloaded == [] -class TestLoadTranscriptCorruptLines: - """Regression: corrupt JSONL lines (e.g. from mid-write crash) must be - skipped instead of crashing the entire transcript load. GH-1193.""" +class TestLoadTranscriptDBOnly: + """After spec 002, load_transcript reads only from state.db.""" - @pytest.fixture() - def store(self, tmp_path): + def test_db_only_returns_empty_for_nonexistent(self, tmp_path): config = GatewayConfig() - with patch("gateway.session.SessionStore._ensure_loaded"): - s = SessionStore(sessions_dir=tmp_path, config=config) - s._db = None - s._loaded = True - return s - - def test_corrupt_line_skipped(self, store, tmp_path): - session_id = "corrupt_test" - transcript_path = store.get_transcript_path(session_id) - transcript_path.parent.mkdir(parents=True, exist_ok=True) - with open(transcript_path, "w") as f: - f.write('{"role": "user", "content": "hello"}\n') - f.write('{"role": "assistant", "content": "hi th') # truncated - f.write("\n") - f.write('{"role": "user", "content": "goodbye"}\n') - - messages = store.load_transcript(session_id) - assert len(messages) == 2 - assert messages[0]["content"] == "hello" - assert messages[1]["content"] == "goodbye" - - def test_all_lines_corrupt_returns_empty(self, store, tmp_path): - session_id = "all_corrupt" - transcript_path = store.get_transcript_path(session_id) - transcript_path.parent.mkdir(parents=True, exist_ok=True) - with open(transcript_path, "w") as f: - f.write("not json at all\n") - f.write("{truncated\n") - - messages = store.load_transcript(session_id) - assert messages == [] - - def test_valid_transcript_unaffected(self, store, tmp_path): - session_id = "valid_test" - store.append_to_transcript(session_id, {"role": "user", "content": "a"}) - store.append_to_transcript(session_id, {"role": "assistant", "content": "b"}) - - messages = store.load_transcript(session_id) - assert len(messages) == 2 - assert messages[0]["content"] == "a" - assert messages[1]["content"] == "b" - - -class TestLoadTranscriptPreferLongerSource: - """Regression: load_transcript must return whichever source (SQLite or JSONL) - has more messages to prevent silent truncation. GH-3212.""" - - @pytest.fixture() - def store_with_db(self, tmp_path): - """SessionStore with both SQLite and JSONL active.""" - from hermes_state import SessionDB - - config = GatewayConfig() - with patch("gateway.session.SessionStore._ensure_loaded"): - s = SessionStore(sessions_dir=tmp_path, config=config) - s._db = SessionDB(db_path=tmp_path / "state.db") - s._loaded = True - return s - - def test_jsonl_longer_than_sqlite_returns_jsonl(self, store_with_db): - """Legacy session: JSONL has full history, SQLite has only recent turn.""" - sid = "legacy_session" - store_with_db._db.create_session(session_id=sid, source="gateway", model="m") - # JSONL has 10 messages (legacy history — written before SQLite existed) - for i in range(10): - role = "user" if i % 2 == 0 else "assistant" - store_with_db.append_to_transcript( - sid, {"role": role, "content": f"msg-{i}"}, skip_db=True, - ) - # SQLite has only 2 messages (recent turn after migration) - store_with_db._db.append_message(session_id=sid, role="user", content="new-q") - store_with_db._db.append_message(session_id=sid, role="assistant", content="new-a") - - result = store_with_db.load_transcript(sid) - assert len(result) == 10 - assert result[0]["content"] == "msg-0" - - def test_sqlite_longer_than_jsonl_returns_sqlite(self, store_with_db): - """Fully migrated session: SQLite has more (JSONL stopped growing).""" - sid = "migrated_session" - store_with_db._db.create_session(session_id=sid, source="gateway", model="m") - # JSONL has 2 old messages - store_with_db.append_to_transcript( - sid, {"role": "user", "content": "old-q"}, skip_db=True, - ) - store_with_db.append_to_transcript( - sid, {"role": "assistant", "content": "old-a"}, skip_db=True, - ) - # SQLite has 4 messages (superset after migration) - for i in range(4): - role = "user" if i % 2 == 0 else "assistant" - store_with_db._db.append_message(session_id=sid, role=role, content=f"db-{i}") - - result = store_with_db.load_transcript(sid) - assert len(result) == 4 - assert result[0]["content"] == "db-0" - - def test_sqlite_empty_falls_back_to_jsonl(self, store_with_db): - """No SQLite rows — falls back to JSONL (original behavior preserved).""" - sid = "no_db_rows" - store_with_db.append_to_transcript( - sid, {"role": "user", "content": "hello"}, skip_db=True, - ) - store_with_db.append_to_transcript( - sid, {"role": "assistant", "content": "hi"}, skip_db=True, - ) - - result = store_with_db.load_transcript(sid) - assert len(result) == 2 - assert result[0]["content"] == "hello" - - def test_both_empty_returns_empty(self, store_with_db): - """Neither source has data — returns empty list.""" - result = store_with_db.load_transcript("nonexistent") + store = SessionStore(sessions_dir=tmp_path, config=config) + result = store.load_transcript("nonexistent") assert result == [] - def test_equal_length_prefers_sqlite(self, store_with_db): - """When both have same count, SQLite wins (has richer fields like reasoning).""" - sid = "equal_session" - store_with_db._db.create_session(session_id=sid, source="gateway", model="m") - # Write 2 messages to JSONL only - store_with_db.append_to_transcript( - sid, {"role": "user", "content": "jsonl-q"}, skip_db=True, - ) - store_with_db.append_to_transcript( - sid, {"role": "assistant", "content": "jsonl-a"}, skip_db=True, - ) - # Write 2 different messages to SQLite only - store_with_db._db.append_message(session_id=sid, role="user", content="db-q") - store_with_db._db.append_message(session_id=sid, role="assistant", content="db-a") + def test_db_only_returns_messages(self, tmp_path): + config = GatewayConfig() + store = SessionStore(sessions_dir=tmp_path, config=config) + sid = "db_only_session" + store._db.create_session(session_id=sid, source="gateway", model="m") + store._db.append_message(session_id=sid, role="user", content="db-q") + store._db.append_message(session_id=sid, role="assistant", content="db-a") - result = store_with_db.load_transcript(sid) - assert len(result) == 2 - # Should be the SQLite version (equal count → prefers SQLite) - assert result[0]["content"] == "db-q" - - def test_unreadable_jsonl_returns_sqlite(self, store_with_db, monkeypatch): - """Unreadable legacy JSONL must not hide valid SQLite history.""" - sid = "unreadable_jsonl" - store_with_db._db.create_session(session_id=sid, source="gateway", model="m") - store_with_db._db.append_message(session_id=sid, role="user", content="db-q") - store_with_db._db.append_message(session_id=sid, role="assistant", content="db-a") - - transcript_path = store_with_db.get_transcript_path(sid) - transcript_path.parent.mkdir(parents=True, exist_ok=True) - transcript_path.write_text('{"role": "user", "content": "jsonl-q"}\n', encoding="utf-8") - - real_open = builtins.open - - def raise_for_transcript(path, *args, **kwargs): - mode = args[0] if args else kwargs.get("mode", "r") - if Path(path) == transcript_path and "r" in mode: - raise OSError("simulated unreadable transcript") - return real_open(path, *args, **kwargs) - - monkeypatch.setattr(builtins, "open", raise_for_transcript) - - result = store_with_db.load_transcript(sid) + result = store.load_transcript(sid) assert len(result) == 2 assert result[0]["content"] == "db-q" assert result[1]["content"] == "db-a" diff --git a/tests/gateway/test_session_dm_thread_seeding.py b/tests/gateway/test_session_dm_thread_seeding.py index ef9f3ebee8..8c52225bf2 100644 --- a/tests/gateway/test_session_dm_thread_seeding.py +++ b/tests/gateway/test_session_dm_thread_seeding.py @@ -23,12 +23,9 @@ from gateway.session import SessionSource, SessionStore, build_session_key @pytest.fixture() def store(tmp_path): - """SessionStore with no SQLite, for fast unit tests.""" + """SessionStore with SQLite — load_transcript reads from DB only.""" config = GatewayConfig() - with patch("gateway.session.SessionStore._ensure_loaded"): - s = SessionStore(sessions_dir=tmp_path, config=config) - s._db = None - s._loaded = True + s = SessionStore(sessions_dir=tmp_path, config=config) return s