# Source: hermes-webui/api/run_journal.py
# Listing metadata: 2026-05-16 02:58:34 +00:00 · 285 lines · 9.8 KiB · Python
"""Append-only WebUI run event journal helpers.
This is the first #1925 journal/replay slice. It mirrors SSE events emitted by
the existing in-process streaming path without changing execution ownership.
"""
from __future__ import annotations
import json
import os
import re
import threading
import time
from pathlib import Path
from typing import Iterable
# Name of the directory, directly under the session directory, that holds one
# sub-directory of journal files per session (see _run_path).
RUN_JOURNAL_DIR_NAME = "_run_journal"
# Character allow-list applied to session/run ids before they are used as
# path components (see _validate_id).
_SAFE_ID_RE = re.compile(r"^[A-Za-z0-9_.-]+$")
# One lock per journal file, keyed by (parent directory, file name, pid);
# entries are created on demand by _lock_for.
_WRITER_LOCKS: dict[tuple[str, str, str], threading.Lock] = {}
# Serialises lookups and insertions in _WRITER_LOCKS.
_WRITER_LOCKS_GUARD = threading.Lock()
# The five event names that _terminal_state_for_event maps to a terminal state.
# NOTE(review): this set is not referenced by the code visible in this file;
# presumably it is imported elsewhere - confirm before removing.
_TERMINAL_SSE_EVENTS = {"done", "cancel", "apperror", "error", "stream_end"}
# Environment variable that selects the fsync policy, followed by its two
# recognised values. Unset or unrecognised values fall back to
# "terminal-only" (see _run_journal_fsync_mode).
_FSYNC_MODE_ENV = "HERMES_WEBUI_RUN_JOURNAL_FSYNC"
_FSYNC_MODE_EAGER = "eager"
_FSYNC_MODE_TERMINAL_ONLY = "terminal-only"
def _default_session_dir() -> Path:
    """Return ``api.models.SESSION_DIR`` wrapped in a ``Path``.

    The import happens at call time rather than at module load.
    NOTE(review): presumably this avoids an import cycle with ``api.models``
    - confirm before hoisting it to the top of the file.
    """
    from api.models import SESSION_DIR

    session_root = Path(SESSION_DIR)
    return session_root
def _validate_id(value: str, field: str) -> str:
    """Normalise and validate an id that will be used as a path component.

    The value is converted to ``str`` (a falsy value becomes ``""``) and
    surrounding whitespace is stripped before validation.

    Args:
        value: Candidate session or run id.
        field: Field name reported in the error message.

    Returns:
        The stripped id.

    Raises:
        ValueError: If the id is empty, contains ``/`` or ``\\``, contains a
            character outside ``_SAFE_ID_RE``, or consists only of dots.
    """
    cleaned = str(value or "").strip()
    if not cleaned or "/" in cleaned or "\\" in cleaned or not _SAFE_ID_RE.fullmatch(cleaned):
        raise ValueError(f"invalid {field}")
    # "." and ".." satisfy the character allow-list but are directory
    # references, not names: a session id of ".." would make _run_path resolve
    # to a file outside the journal directory. Reject any dots-only id.
    if not cleaned.strip("."):
        raise ValueError(f"invalid {field}")
    return cleaned
def _run_path(session_id: str, run_id: str, session_dir: Path | None = None) -> Path:
    """Build the journal file path for one run.

    Both ids are validated (session id first, then run id) before any path is
    assembled, so an invalid id raises ``ValueError`` from ``_validate_id``.
    The result is ``<session dir>/_run_journal/<session id>/<run id>.jsonl``,
    where the session dir is ``session_dir`` when given and the default
    session directory otherwise.
    """
    safe_session = _validate_id(session_id, "session_id")
    safe_run = _validate_id(run_id, "run_id")
    if session_dir is None:
        base = _default_session_dir()
    else:
        base = Path(session_dir)
    return base.joinpath(RUN_JOURNAL_DIR_NAME, safe_session, f"{safe_run}.jsonl")
def _lock_for(path: Path) -> threading.Lock:
    """Return the lock shared by every writer of ``path`` in this process.

    Locks live in ``_WRITER_LOCKS`` under a key built from the parent
    directory, the file name and the current pid. The registry itself is only
    touched while holding ``_WRITER_LOCKS_GUARD``. Entries are never removed
    by this function.
    """
    registry_key = (str(path.parent), path.name, str(os.getpid()))
    with _WRITER_LOCKS_GUARD:
        try:
            return _WRITER_LOCKS[registry_key]
        except KeyError:
            fresh = threading.Lock()
            _WRITER_LOCKS[registry_key] = fresh
            return fresh
def _read_jsonl(path: Path) -> tuple[list[dict], list[dict]]:
    """Load a journal file, separating usable records from unusable lines.

    Args:
        path: Journal file to read. A missing file is treated as an empty
            journal.

    Returns:
        ``(events, malformed)``. ``events`` holds every line that parsed to a
        JSON object, in file order. ``malformed`` holds one
        ``{"line": <1-based line number>, "raw": <line text>}`` entry for each
        non-blank line that was not valid JSON or parsed to something other
        than an object.
    """
    events: list[dict] = []
    malformed: list[dict] = []
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return events, malformed
    # Records are separated by "\n" only. str.splitlines() would also break on
    # U+0085, U+2028 and U+2029, which json.dumps(..., ensure_ascii=False)
    # writes unescaped inside string values - tearing one valid record into
    # several malformed fragments.
    for line_no, raw in enumerate(text.split("\n"), start=1):
        if not raw.strip():
            continue
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            malformed.append({"line": line_no, "raw": raw})
            continue
        if isinstance(parsed, dict):
            events.append(parsed)
        else:
            # Valid JSON, but not an event object (e.g. a bare list or number).
            malformed.append({"line": line_no, "raw": raw})
    return events, malformed
def _next_seq(path: Path) -> int:
    """Return the sequence number the next event in ``path`` should use.

    The journal is re-read on every call. Only events whose ``seq`` is an
    ``int`` are considered; the result is one more than the largest of them,
    or ``1`` when there is none (including when the file does not exist).
    """
    records, _ = _read_jsonl(path)
    recorded = [
        int(seq or 0)
        for seq in (record.get("seq") for record in records)
        if isinstance(seq, int)
    ]
    if not recorded:
        return 1
    return max(recorded) + 1
def _terminal_state_for_event(event_name: str, payload) -> str | None:
    """Map an SSE event to the terminal state it represents, if any.

    Returns:
        ``"completed"`` for ``done`` / ``stream_end``;
        ``"interrupted-by-user"`` for ``cancel``;
        for ``apperror`` / ``error`` the result depends on the payload's
        ``type`` (trimmed, case-insensitive; only read when the payload is a
        dict): ``cancelled`` / ``canceled`` give ``"interrupted-by-user"``,
        ``interrupted`` gives ``"interrupted-by-crash"``, anything else gives
        ``"errored"``;
        ``None`` for every other event name.
    """
    kind = str(event_name or "")
    if kind in ("done", "stream_end"):
        return "completed"
    if kind == "cancel":
        return "interrupted-by-user"
    if kind not in ("apperror", "error"):
        return None

    # Error events: refine the state using the payload's declared error type.
    reason = ""
    if isinstance(payload, dict):
        reason = str(payload.get("type") or "").strip().lower()
    by_reason = {
        "cancelled": "interrupted-by-user",
        "canceled": "interrupted-by-user",
        "interrupted": "interrupted-by-crash",
    }
    return by_reason.get(reason, "errored")
def _run_journal_fsync_mode() -> str:
    """Return the active fsync policy: ``"eager"`` or ``"terminal-only"``.

    The policy is read from the environment on every call. The value is
    trimmed and lower-cased; anything other than the two recognised modes
    (including an unset variable) yields ``"terminal-only"``.
    """
    configured = os.environ.get(_FSYNC_MODE_ENV, _FSYNC_MODE_TERMINAL_ONLY)
    normalized = str(configured or "").strip().lower()
    if normalized == _FSYNC_MODE_EAGER:
        return _FSYNC_MODE_EAGER
    # Both the explicit "terminal-only" value and every unrecognised value
    # resolve to the same default.
    return _FSYNC_MODE_TERMINAL_ONLY
def _should_fsync_event(terminal_state: str | None) -> bool:
    """Decide whether the event just written must be fsynced.

    Every event is fsynced in ``"eager"`` mode; otherwise only events that
    carry a terminal state are.
    """
    return _run_journal_fsync_mode() == _FSYNC_MODE_EAGER or bool(terminal_state)
def _fsync_parent_dir(path: Path) -> None:
    """Best-effort fsync of the directory that contains ``path``.

    Any ``OSError`` raised while opening, syncing or closing the directory is
    ignored. ``os.O_DIRECTORY`` is used when the ``os`` module provides it and
    ``0`` is passed otherwise.
    """
    open_flags = getattr(os, "O_DIRECTORY", 0)
    try:
        handle = os.open(path.parent, open_flags)
    except OSError:
        return
    try:
        try:
            os.fsync(handle)
        finally:
            # Always release the descriptor, even when the sync itself failed.
            os.close(handle)
    except OSError:
        return
def append_run_event(
    session_id: str,
    run_id: str,
    event_name: str,
    payload=None,
    *,
    session_dir: Path | None = None,
    seq: int | None = None,
    created_at: float | None = None,
) -> dict:
    """Append one event to a run's journal file and return the stored record.

    Args:
        session_id: Session that owns the run; validated by ``_run_path``.
        run_id: Run (stream) identifier; validated by ``_run_path``.
        event_name: SSE event name; surrounding whitespace is stripped.
        payload: Event payload; ``None`` is stored as ``{}``.
        session_dir: Session directory override; the default session
            directory is used when omitted.
        seq: Sequence number to record. When omitted, the next number is
            derived by re-reading the journal.
        created_at: Timestamp to record; defaults to ``time.time()``.

    Returns:
        The event dict exactly as serialised to the journal.

    Raises:
        ValueError: If an id is invalid or ``event_name`` is blank.
    """
    journal_path = _run_path(session_id, run_id, session_dir=session_dir)
    if payload is None:
        payload = {}
    event_name = str(event_name or "").strip()
    if not event_name:
        raise ValueError("event_name is required")

    with _lock_for(journal_path):
        if seq is None:
            assigned_seq = _next_seq(journal_path)
        else:
            assigned_seq = int(seq)
        terminal_state = _terminal_state_for_event(event_name, payload)
        stamp = created_at if created_at is not None else time.time()
        record = {
            "version": 1,
            "event_id": f"{run_id}:{assigned_seq}",
            "seq": assigned_seq,
            "run_id": str(run_id),
            "session_id": str(session_id),
            "event": event_name,
            "type": event_name,
            "created_at": float(stamp),
            "terminal": bool(terminal_state),
            "terminal_state": terminal_state,
            "payload": payload,
        }

        journal_path.parent.mkdir(parents=True, exist_ok=True)
        # Remember whether this call creates the file so that the new
        # directory entry can be synced afterwards.
        is_new_file = not journal_path.exists()
        serialized = json.dumps(record, ensure_ascii=False, separators=(",", ":")) + "\n"
        descriptor = os.open(journal_path, os.O_CREAT | os.O_APPEND | os.O_WRONLY, 0o600)
        with os.fdopen(descriptor, "a", encoding="utf-8") as stream:
            stream.write(serialized)
            stream.flush()
            if _should_fsync_event(terminal_state):
                os.fsync(stream.fileno())
        if is_new_file:
            _fsync_parent_dir(journal_path)
    return record
class RunJournalWriter:
    """Journal writer bound to a single session/run pair.

    The next sequence number is read from the journal once, at construction,
    and then tracked in memory, so appends do not re-read the file.
    """

    def __init__(self, session_id: str, run_id: str, *, session_dir: Path | None = None):
        """Validate the ids, resolve the journal path and load the next sequence number.

        Raises:
            ValueError: If either id fails ``_validate_id``.
        """
        self.session_id = _validate_id(session_id, "session_id")
        self.run_id = _validate_id(run_id, "run_id")
        self.session_dir = None if session_dir is None else Path(session_dir)
        self._path = _run_path(self.session_id, self.run_id, session_dir=self.session_dir)
        self._lock = _lock_for(self._path)
        with self._lock:
            self._next_seq = _next_seq(self._path)

    def append_sse_event(self, event_name: str, payload=None) -> dict:
        """Reserve the next sequence number and append the event to the journal.

        Returns the record produced by ``append_run_event``.
        """
        with self._lock:
            reserved = self._next_seq
            self._next_seq = reserved + 1
        # The lock is released before writing: append_run_event takes the
        # lock for this same path itself, and threading.Lock is not reentrant.
        # NOTE(review): ``payload or {}`` replaces every falsy payload ("", 0,
        # [], False) with {}, while append_run_event only replaces None -
        # confirm this is intended.
        return append_run_event(
            self.session_id,
            self.run_id,
            event_name,
            payload or {},
            session_dir=self.session_dir,
            seq=reserved,
        )
def read_run_events(
    session_id: str,
    run_id: str,
    *,
    after_seq: int | None = None,
    session_dir: Path | None = None,
) -> dict:
    """Read a run's journal, optionally keeping only events after a sequence number.

    Args:
        session_id: Session that owns the run.
        run_id: Run (stream) identifier.
        after_seq: When given, only events whose ``seq`` is strictly greater
            are returned; an event without a ``seq`` counts as ``0``.
        session_dir: Session directory override.

    Returns:
        A dict with ``session_id``, ``run_id``, ``events`` and ``malformed``.
        The ``malformed`` list is never filtered by ``after_seq``.

    Raises:
        ValueError: If either id is invalid.
    """
    journal_path = _run_path(session_id, run_id, session_dir=session_dir)
    events, malformed = _read_jsonl(journal_path)
    if after_seq is not None:
        newer = []
        for record in events:
            if int(record.get("seq") or 0) > int(after_seq):
                newer.append(record)
        events = newer
    result = {
        "session_id": str(session_id),
        "run_id": str(run_id),
        "events": events,
        "malformed": malformed,
    }
    return result
def _summary_from_events(session_id: str, run_id: str, events: Iterable[dict]) -> dict:
    """Condense a run's events into a status summary.

    Non-dict items are ignored. The terminal state comes from the last
    terminal event that is not ``stream_end``; if every terminal event is a
    ``stream_end``, the last of those is used. Without any terminal event the
    state is ``"running"`` when there are events and ``"unknown"`` when there
    are none. ``last_*`` fields describe the final event in iteration order.
    """
    records = [item for item in events if isinstance(item, dict)]
    latest = records[-1] if records else {}

    last_terminal = None
    last_decisive = None
    for record in records:
        if not record.get("terminal"):
            continue
        last_terminal = record
        if record.get("event") != "stream_end":
            last_decisive = record
    chosen = last_decisive if last_decisive is not None else last_terminal

    if chosen:
        status = chosen.get("terminal_state")
    elif records:
        status = "running"
    else:
        status = "unknown"

    return {
        "session_id": str(session_id),
        "run_id": str(run_id),
        "stream_id": str(run_id),
        "event_count": len(records),
        "last_seq": int(latest.get("seq") or 0),
        "last_event_id": latest.get("event_id"),
        "terminal": bool(chosen),
        "terminal_state": status,
        "last_event": latest.get("event"),
    }
def latest_run_summary(session_id: str, run_id: str, *, session_dir: Path | None = None) -> dict:
    """Read a run's full journal and return its status summary.

    Raises:
        ValueError: If either id is invalid.
    """
    journal = read_run_events(session_id, run_id, session_dir=session_dir)
    recorded = journal.get("events") or []
    return _summary_from_events(session_id, run_id, recorded)
def find_run_summary(run_id: str, *, session_dir: Path | None = None) -> dict | None:
    """Locate a run's journal without knowing its session and summarise it.

    Every session directory under the journal root is searched for
    ``<run id>.jsonl``. The summary of the first match is returned, with the
    session id taken from the containing directory name and an extra
    ``"path"`` key holding the journal file path. Returns ``None`` when no
    journal matches.

    NOTE(review): if the same run id exists under several sessions, which one
    is returned depends on glob ordering - confirm run ids are unique.

    Raises:
        ValueError: If ``run_id`` is invalid.
    """
    rid = _validate_id(run_id, "run_id")
    base = _default_session_dir() if session_dir is None else Path(session_dir)
    candidates = (base / RUN_JOURNAL_DIR_NAME).glob(f"*/{rid}.jsonl")
    hit = next(iter(candidates), None)
    if hit is None:
        return None
    recorded, _ = _read_jsonl(hit)
    summary = _summary_from_events(hit.parent.name, rid, recorded)
    summary["path"] = str(hit)
    return summary
def stale_interrupted_event(
    session_id: str,
    run_id: str,
    *,
    after_seq: int | None = None,
    session_dir: Path | None = None,
) -> dict | None:
    """Build a synthetic terminal event for a run whose journal never finished.

    The event is only constructed and returned; nothing is written to the
    journal.

    Args:
        session_id: Session that owns the run.
        run_id: Run (stream) identifier.
        after_seq: When given, the event is suppressed if its sequence number
            would not be greater than this value.
        session_dir: Session directory override, forwarded to
            ``latest_run_summary`` like the other journal readers in this
            module accept. Defaults to the default session directory.

    Returns:
        An ``apperror`` event dict marked ``"synthetic": True`` with terminal
        state ``"stale-from-restart"`` and a sequence number one past the last
        journaled event, or ``None`` when the run already has a terminal
        event, has no journaled events, or is filtered out by ``after_seq``.

    Raises:
        ValueError: If either id is invalid.
    """
    summary = latest_run_summary(session_id, run_id, session_dir=session_dir)
    if summary.get("terminal") or not summary.get("event_count"):
        return None
    seq = int(summary.get("last_seq") or 0) + 1
    if after_seq is not None and seq <= int(after_seq):
        return None
    payload = {
        "type": "interrupted",
        "message": "WebUI restarted or lost the live worker before this run finished.",
        "hint": "The transcript was restored to the last journaled event. Start a new turn if you still need the task to continue.",
        "session_id": session_id,
        "stream_id": run_id,
        "journal_last_seq": summary.get("last_seq"),
    }
    return {
        "version": 1,
        "event_id": f"{run_id}:{seq}",
        "seq": seq,
        "run_id": run_id,
        "session_id": session_id,
        "event": "apperror",
        "type": "apperror",
        "created_at": time.time(),
        "terminal": True,
        "terminal_state": "stale-from-restart",
        "payload": payload,
        "synthetic": True,
    }