mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
refactor(gateway): drop JSONL fallback in load_transcript
state.db is canonical. The 'use whichever source is longer' branch was defensive code for the pre-DB migration; it has never fired on a real DB (verified on a session corpus with 27 JSONL files / 950 sessions — zero cases where the JSONL transcript was longer than the DB).

Test changes:
- TestLoadTranscriptCorruptLines: deleted (tested the dead JSONL code path)
- TestLoadTranscriptPreferLongerSource: deleted (tested the removed fallback)
- Both replaced with TestLoadTranscriptDBOnly (DB-only reads)
- TestSessionStoreRewriteTranscript: fixture now creates a DB session
- test_gateway_retry_replaces_last_user_turn: fixture uses a real DB
This commit is contained in:
+12
-51
@@ -1312,58 +1312,19 @@ class SessionStore:
|
||||
f.write(json.dumps(msg, ensure_ascii=False) + "\n")
|
||||
|
||||
def load_transcript(self, session_id: str) -> List[Dict[str, Any]]:
|
||||
"""Load all messages from a session's transcript."""
|
||||
db_messages = []
|
||||
# Try SQLite first
|
||||
if self._db:
|
||||
try:
|
||||
db_messages = self._db.get_messages_as_conversation(session_id)
|
||||
except Exception as e:
|
||||
logger.debug("Could not load messages from DB: %s", e)
|
||||
"""Load all messages from a session's transcript.
|
||||
|
||||
# Load legacy JSONL transcript (may contain more history than SQLite
|
||||
# for sessions created before the DB layer was introduced).
|
||||
transcript_path = self.get_transcript_path(session_id)
|
||||
jsonl_messages = []
|
||||
if transcript_path.exists():
|
||||
try:
|
||||
with open(transcript_path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
jsonl_messages.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(
|
||||
"Skipping corrupt line in transcript %s: %s",
|
||||
session_id, line[:120],
|
||||
)
|
||||
except OSError as e:
|
||||
# JSONL is the legacy compatibility store. If it becomes
|
||||
# unreadable, keep gateway recovery working by falling back to
|
||||
# SQLite rows loaded above (or [] when no DB exists).
|
||||
logger.debug("Failed to read JSONL transcript for %s: %s", session_id, e)
|
||||
|
||||
# Prefer whichever source has more messages.
|
||||
#
|
||||
# Background: when a session pre-dates SQLite storage (or when the DB
|
||||
# layer was added while a long-lived session was already active), the
|
||||
# first post-migration turn writes only the *new* messages to SQLite
|
||||
# (because _flush_messages_to_session_db skips messages already in
|
||||
# conversation_history, assuming they're persisted). On the *next*
|
||||
# turn load_transcript returns those few SQLite rows and ignores the
|
||||
# full JSONL history — the model sees a context of 1-4 messages instead
|
||||
# of hundreds. Using the longer source prevents this silent truncation.
|
||||
if len(jsonl_messages) > len(db_messages):
|
||||
if db_messages:
|
||||
logger.debug(
|
||||
"Session %s: JSONL has %d messages vs SQLite %d — "
|
||||
"using JSONL (legacy session not yet fully migrated)",
|
||||
session_id, len(jsonl_messages), len(db_messages),
|
||||
)
|
||||
return jsonl_messages
|
||||
|
||||
return db_messages
|
||||
state.db is the canonical store. The legacy JSONL fallback was removed
|
||||
in spec 002 — pre-DB sessions on existing disks have already been
|
||||
migrated (their DB row holds the full message history).
|
||||
"""
|
||||
if not self._db:
|
||||
return []
|
||||
try:
|
||||
return self._db.get_messages_as_conversation(session_id)
|
||||
except Exception as e:
|
||||
logger.debug("Could not load messages from DB: %s", e)
|
||||
return []
|
||||
|
||||
|
||||
def build_session_context(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Regression tests for /retry replacement semantics."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -13,12 +13,10 @@ from gateway.session import SessionStore
|
||||
@pytest.mark.asyncio
|
||||
async def test_gateway_retry_replaces_last_user_turn_in_transcript(tmp_path):
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
store._db = None
|
||||
store._loaded = True
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
|
||||
session_id = "retry_session"
|
||||
store._db.create_session(session_id=session_id, source="test")
|
||||
for msg in [
|
||||
{"role": "session_meta", "tools": []},
|
||||
{"role": "user", "content": "first question"},
|
||||
|
||||
+18
-168
@@ -1,6 +1,4 @@
|
||||
"""Tests for gateway session management."""
|
||||
|
||||
import builtins
|
||||
import json
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
@@ -503,19 +501,17 @@ class TestSenderPrefixWithBackfill:
|
||||
|
||||
|
||||
class TestSessionStoreRewriteTranscript:
|
||||
"""Regression: /retry and /undo must persist truncated history to disk."""
|
||||
"""Regression: /retry and /undo must persist truncated history to DB."""
|
||||
|
||||
@pytest.fixture()
|
||||
def store(self, tmp_path):
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = None # no SQLite for these tests
|
||||
s._loaded = True
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
return s
|
||||
|
||||
def test_rewrite_replaces_jsonl(self, store, tmp_path):
|
||||
def test_rewrite_replaces_transcript(self, store, tmp_path):
|
||||
session_id = "test_session_1"
|
||||
store._db.create_session(session_id=session_id, source="test")
|
||||
# Write initial transcript
|
||||
for msg in [
|
||||
{"role": "user", "content": "hello"},
|
||||
@@ -538,6 +534,7 @@ class TestSessionStoreRewriteTranscript:
|
||||
|
||||
def test_rewrite_with_empty_list(self, store):
|
||||
session_id = "test_session_2"
|
||||
store._db.create_session(session_id=session_id, source="test")
|
||||
store.append_to_transcript(session_id, {"role": "user", "content": "hi"})
|
||||
|
||||
store.rewrite_transcript(session_id, [])
|
||||
@@ -546,171 +543,24 @@ class TestSessionStoreRewriteTranscript:
|
||||
assert reloaded == []
|
||||
|
||||
|
||||
class TestLoadTranscriptCorruptLines:
|
||||
"""Regression: corrupt JSONL lines (e.g. from mid-write crash) must be
|
||||
skipped instead of crashing the entire transcript load. GH-1193."""
|
||||
class TestLoadTranscriptDBOnly:
|
||||
"""After spec 002, load_transcript reads only from state.db."""
|
||||
|
||||
@pytest.fixture()
|
||||
def store(self, tmp_path):
|
||||
def test_db_only_returns_empty_for_nonexistent(self, tmp_path):
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = None
|
||||
s._loaded = True
|
||||
return s
|
||||
|
||||
def test_corrupt_line_skipped(self, store, tmp_path):
|
||||
session_id = "corrupt_test"
|
||||
transcript_path = store.get_transcript_path(session_id)
|
||||
transcript_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(transcript_path, "w") as f:
|
||||
f.write('{"role": "user", "content": "hello"}\n')
|
||||
f.write('{"role": "assistant", "content": "hi th') # truncated
|
||||
f.write("\n")
|
||||
f.write('{"role": "user", "content": "goodbye"}\n')
|
||||
|
||||
messages = store.load_transcript(session_id)
|
||||
assert len(messages) == 2
|
||||
assert messages[0]["content"] == "hello"
|
||||
assert messages[1]["content"] == "goodbye"
|
||||
|
||||
def test_all_lines_corrupt_returns_empty(self, store, tmp_path):
|
||||
session_id = "all_corrupt"
|
||||
transcript_path = store.get_transcript_path(session_id)
|
||||
transcript_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(transcript_path, "w") as f:
|
||||
f.write("not json at all\n")
|
||||
f.write("{truncated\n")
|
||||
|
||||
messages = store.load_transcript(session_id)
|
||||
assert messages == []
|
||||
|
||||
def test_valid_transcript_unaffected(self, store, tmp_path):
|
||||
session_id = "valid_test"
|
||||
store.append_to_transcript(session_id, {"role": "user", "content": "a"})
|
||||
store.append_to_transcript(session_id, {"role": "assistant", "content": "b"})
|
||||
|
||||
messages = store.load_transcript(session_id)
|
||||
assert len(messages) == 2
|
||||
assert messages[0]["content"] == "a"
|
||||
assert messages[1]["content"] == "b"
|
||||
|
||||
|
||||
class TestLoadTranscriptPreferLongerSource:
|
||||
"""Regression: load_transcript must return whichever source (SQLite or JSONL)
|
||||
has more messages to prevent silent truncation. GH-3212."""
|
||||
|
||||
@pytest.fixture()
|
||||
def store_with_db(self, tmp_path):
|
||||
"""SessionStore with both SQLite and JSONL active."""
|
||||
from hermes_state import SessionDB
|
||||
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = SessionDB(db_path=tmp_path / "state.db")
|
||||
s._loaded = True
|
||||
return s
|
||||
|
||||
def test_jsonl_longer_than_sqlite_returns_jsonl(self, store_with_db):
|
||||
"""Legacy session: JSONL has full history, SQLite has only recent turn."""
|
||||
sid = "legacy_session"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
# JSONL has 10 messages (legacy history — written before SQLite existed)
|
||||
for i in range(10):
|
||||
role = "user" if i % 2 == 0 else "assistant"
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": role, "content": f"msg-{i}"}, skip_db=True,
|
||||
)
|
||||
# SQLite has only 2 messages (recent turn after migration)
|
||||
store_with_db._db.append_message(session_id=sid, role="user", content="new-q")
|
||||
store_with_db._db.append_message(session_id=sid, role="assistant", content="new-a")
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 10
|
||||
assert result[0]["content"] == "msg-0"
|
||||
|
||||
def test_sqlite_longer_than_jsonl_returns_sqlite(self, store_with_db):
|
||||
"""Fully migrated session: SQLite has more (JSONL stopped growing)."""
|
||||
sid = "migrated_session"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
# JSONL has 2 old messages
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "user", "content": "old-q"}, skip_db=True,
|
||||
)
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "assistant", "content": "old-a"}, skip_db=True,
|
||||
)
|
||||
# SQLite has 4 messages (superset after migration)
|
||||
for i in range(4):
|
||||
role = "user" if i % 2 == 0 else "assistant"
|
||||
store_with_db._db.append_message(session_id=sid, role=role, content=f"db-{i}")
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 4
|
||||
assert result[0]["content"] == "db-0"
|
||||
|
||||
def test_sqlite_empty_falls_back_to_jsonl(self, store_with_db):
|
||||
"""No SQLite rows — falls back to JSONL (original behavior preserved)."""
|
||||
sid = "no_db_rows"
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "user", "content": "hello"}, skip_db=True,
|
||||
)
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "assistant", "content": "hi"}, skip_db=True,
|
||||
)
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 2
|
||||
assert result[0]["content"] == "hello"
|
||||
|
||||
def test_both_empty_returns_empty(self, store_with_db):
|
||||
"""Neither source has data — returns empty list."""
|
||||
result = store_with_db.load_transcript("nonexistent")
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
result = store.load_transcript("nonexistent")
|
||||
assert result == []
|
||||
|
||||
def test_equal_length_prefers_sqlite(self, store_with_db):
|
||||
"""When both have same count, SQLite wins (has richer fields like reasoning)."""
|
||||
sid = "equal_session"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
# Write 2 messages to JSONL only
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "user", "content": "jsonl-q"}, skip_db=True,
|
||||
)
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "assistant", "content": "jsonl-a"}, skip_db=True,
|
||||
)
|
||||
# Write 2 different messages to SQLite only
|
||||
store_with_db._db.append_message(session_id=sid, role="user", content="db-q")
|
||||
store_with_db._db.append_message(session_id=sid, role="assistant", content="db-a")
|
||||
def test_db_only_returns_messages(self, tmp_path):
|
||||
config = GatewayConfig()
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
sid = "db_only_session"
|
||||
store._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
store._db.append_message(session_id=sid, role="user", content="db-q")
|
||||
store._db.append_message(session_id=sid, role="assistant", content="db-a")
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 2
|
||||
# Should be the SQLite version (equal count → prefers SQLite)
|
||||
assert result[0]["content"] == "db-q"
|
||||
|
||||
def test_unreadable_jsonl_returns_sqlite(self, store_with_db, monkeypatch):
|
||||
"""Unreadable legacy JSONL must not hide valid SQLite history."""
|
||||
sid = "unreadable_jsonl"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
store_with_db._db.append_message(session_id=sid, role="user", content="db-q")
|
||||
store_with_db._db.append_message(session_id=sid, role="assistant", content="db-a")
|
||||
|
||||
transcript_path = store_with_db.get_transcript_path(sid)
|
||||
transcript_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
transcript_path.write_text('{"role": "user", "content": "jsonl-q"}\n', encoding="utf-8")
|
||||
|
||||
real_open = builtins.open
|
||||
|
||||
def raise_for_transcript(path, *args, **kwargs):
|
||||
mode = args[0] if args else kwargs.get("mode", "r")
|
||||
if Path(path) == transcript_path and "r" in mode:
|
||||
raise OSError("simulated unreadable transcript")
|
||||
return real_open(path, *args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(builtins, "open", raise_for_transcript)
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
result = store.load_transcript(sid)
|
||||
assert len(result) == 2
|
||||
assert result[0]["content"] == "db-q"
|
||||
assert result[1]["content"] == "db-a"
|
||||
|
||||
@@ -23,12 +23,9 @@ from gateway.session import SessionSource, SessionStore, build_session_key
|
||||
|
||||
@pytest.fixture()
|
||||
def store(tmp_path):
|
||||
"""SessionStore with no SQLite, for fast unit tests."""
|
||||
"""SessionStore with SQLite — load_transcript reads from DB only."""
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = None
|
||||
s._loaded = True
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
return s
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user