mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-24 18:50:15 +00:00
ad8e10304c
* fix: remove orphaned i18n keys from top-level LOCALES object
  Three Traditional Chinese translation keys (cmd_status, memory_saved, profile_delete_title) were placed outside any locale block between the en and ru blocks in static/i18n.js. They became top-level properties of the LOCALES object, causing them to appear as invalid language options in the Settings > Preferences dropdown. The correct translations already exist in the zh-Hant locale block.
  Fixes #1008
* fix: block stale SSE events from polluting new session's DOM
  - appendThinking(): guard with !S.session||!S.activeStreamId to drop events from a previous session's SSE stream during a session switch
  - appendLiveToolCard(): same guard for consistency
  - finalizeThinkingCard(): scroll thinking-card-body to top when scroll is pinned, so completed response is immediately visible
  - appendThinking(): auto-scroll thinking card body to bottom while streaming if user is watching (scroll pinned)
* Fix empty agent sessions in sidebar
* fix: resolve cron UI UX issues — icon ambiguity, toast overlap, running status
  Fixes #995 — three sub-issues in the Cron Jobs UI:
  1. Dual play icons ambiguous: Resume button now shows a distinct play+bar icon (play triangle + vertical line) instead of the identical triangle used by Run now.
  2. Toast notification overlapping header buttons: Added position:relative; z-index:10 to .main-view-header so it stacks above the fixed toast (z-index:100 within its layer).
  3. No running status after trigger: After triggering a job, the status badge immediately shows 'running…' with a CSS spinner animation, and polls the cron list every 3s (up to 30s) to refresh when the job completes.
  - Added cron_status_running i18n key in all 5 locales (en, es, de, ru, zh, zh-Hant)
  - Added .detail-badge.running CSS class with spinner animation
  - New functions: _setCronDetailStatus(), _startCronRunningPoll()
* fix(#1011): address review feedback — poll cleanup, badge persistence, 30s fallback
  - _clearCronDetail() now clears _cronRunningPoll interval on navigation
  - Poll re-applies 'running' badge after loadCrons() re-render (prevents flicker)
  - When poll ends (30s max), detail re-renders with actual status as fallback
* feat: create folder and add space directly from UI (#782)
  - After creating a folder via the file tree New folder button, offer to add it as a space via confirm dialog
  - Add Create folder if it doesnt exist checkbox in the New Space form
  - Backend: support create flag in /api/workspaces/add to mkdir before validation
  - i18n: 4 new keys (folder_add_as_space_title/msg/btn, workspace_auto_create_folder) in all 6 locales
* fix: validate workspace path before mkdir to prevent orphan directories
  Review feedback (critical): the previous code called mkdir() before validate_workspace_to_add(), which meant a rejected path (e.g. system dir) would leave an orphan directory on disk. New flow:
  1. Resolve path and check against blocked system roots BEFORE any mutation
  2. mkdir() only if path passes the blocklist check
  3. Full validation (exists, is_dir) after mkdir
  Also imports _workspace_blocked_roots for the pre-mutation blocklist check.
* fix(#1014): classify model-not-found errors with helpful message
  - Add model_not_found error type to streaming.py exception classifier
  - Detect 404, 'not found', 'does not exist', 'invalid model' patterns
  - Strip HTML tags from provider error messages (nginx 404 pages, etc.)
  - Add model_not_found branch to apperror handler in messages.js
  - Add i18n key model_not_found_label in all 6 locales
  - 15 tests covering detection, sanitization, frontend, and i18n
* feat(ui): add live TPS stat to header
  Adds a TPS (Tokens Per Second) chip to the right of the header title bar that updates live while AI output is streaming.
  Metering (api/metering.py)
  - Tracks per-session output + reasoning tokens via GlobalMeter singleton
  - Per-session TPS = total_tokens / elapsed_time
  - Global TPS = average of active sessions' TPS values
  - HIGH/LOW are max/min of global_tps snapshots over a 60-minute rolling window (only recorded when > 0, so idle periods are excluded)
  - Thread-safe with a single lock
  Metering events emitted from streaming.py
  - Throttled at 100ms from token/reasoning/tool callbacks so the display updates rapidly during fast token streams
  - 1Hz ticker as fallback for slow streams (exits when no active sessions)
  - Final stats emitted on stream end
  Routes (api/routes.py)
  - Removed POST /api/metering/interval endpoint (dynamic interval via focus/blur was replaced with simple always-1s-when-active approach)
  UI (static/messages.js, index.html, style.css)
  - TPS chip in titlebar: shows 'N.N t/s . N.N high . N.N low'
  - Default: '0.0 t/s . 0.0 high' when idle
  - Display updates on every metering SSE event (throttled to 100ms)
* feat: session restore speed + title gen reasoning hardening (#1025, #1026)
  PR #1025 (@franksong2702): Speed up large session restore paths
  - GET /api/session?messages=0 now parses only metadata before the messages array
  - Metadata-only loads no longer populate the full-session LRU cache
  - Frontend lazy fetch uses resolve_model=0 to avoid cold model-catalog lookup
  - Hard reload no longer waits for populateModelDropdown() before restoring session
  PR #1026 (@franksong2702): Harden auto title generation for reasoning models
  - Raises title-gen completion budget to 512 tokens (reasoning-safe)
  - Retries once with 1024 tokens on empty content / finish_reason:length
  - Applies retry to both auxiliary and active-agent fallback routes
  - Preserves underlying failure reason in title_status on local fallback
  Co-authored-by: Frank Song <franksong2702@gmail.com>
* feat: session attention indicators in right slot + last_message_at timestamps (#1024)
  PR #1024 (@franksong2702): Polish session attention indicators
  - Streaming spinners and unread dots now reuse the right-side actions slot
  - Running/unread rows hide timestamps; idle/read rows keep right-aligned timestamps
  - Date group carets point down when expanded, right when collapsed
  - Pinned group no longer repeats pinned-star icon per row
  - Running indicators appear immediately after send (local busy state while /api/sessions catches up)
  - Sidebar sorting/grouping/timestamps now prefer last_message_at (derived from last real message) so metadata-only saves don't make old sessions appear under Today
  Co-authored-by: Frank Song <franksong2702@gmail.com>
* docs: v0.50.207 release notes — 10 PRs, 2169 tests (+36)
---------
Co-authored-by: bergeouss <bergeouss@users.noreply.github.com>
Co-authored-by: Josh <josh@fyul.link>
Co-authored-by: Frank Song <franksong2702@gmail.com>
Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com>
793 lines
30 KiB
Python
"""Hermes Web UI -- Session model and in-memory session store."""
|
|
import collections
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import api.config as _cfg
|
|
from api.config import (
|
|
SESSION_DIR, SESSION_INDEX_FILE, SESSIONS, SESSIONS_MAX,
|
|
LOCK, STREAMS, STREAMS_LOCK, DEFAULT_WORKSPACE, DEFAULT_MODEL, PROJECTS_FILE, HOME,
|
|
get_effective_default_model,
|
|
)
|
|
from api.workspace import get_last_workspace
|
|
from api.agent_sessions import read_importable_agent_session_rows
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Stale temp-file cleanup
|
|
# ---------------------------------------------------------------------------
|
|
# Both Session.save() and _write_session_index() use the atomic-write pattern:
|
|
# write to <path>.tmp.<pid>.<tid> → os.replace() to final path
|
|
# If the process crashes between write and replace the .tmp file is left
|
|
# behind. Because the name embeds pid + tid, leftover files can never be
|
|
# reused by a different process/thread, so they are safe to remove on the
|
|
# next startup. _cleanup_stale_tmp_files() is called from the full-rebuild
|
|
# path of _write_session_index (i.e. at first index access / startup) and
|
|
# removes any *.tmp.* file whose mtime is older than one hour.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_STALE_TMP_AGE_SECONDS = 3600 # 1 hour
|
|
|
|
# Serializes index writers so concurrent Session.save() calls cannot race on
|
|
# stale baselines while still allowing LOCK to be released before disk I/O.
|
|
_INDEX_WRITE_LOCK = threading.RLock()
|
|
|
|
|
|
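# Illustrative sketch, not called anywhere in this module: the atomic-write
# pattern described above, reduced to a standalone helper. The name
# _sketch_atomic_write_text is hypothetical. It writes to a sibling
# "<stem>.tmp.<pid>.<tid>" file, fsyncs it, then os.replace()s it over the
# target, so a reader sees either the old file or the complete new one
# (os.replace is atomic on POSIX when both paths are on the same filesystem).
def _sketch_atomic_write_text(target, text):
    import os
    import threading
    from pathlib import Path

    target = Path(target)
    tmp = target.with_suffix(f'.tmp.{os.getpid()}.{threading.current_thread().ident}')
    try:
        with open(tmp, 'w', encoding='utf-8') as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())  # force the bytes to disk before the rename
        os.replace(tmp, target)   # swap the finished file into place
    except Exception:
        tmp.unlink(missing_ok=True)  # never leave a stray tmp file behind
        raise
    return target

# Usage: _sketch_atomic_write_text('/tmp/demo.json', '{"ok": true}')

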
def _cleanup_stale_tmp_files() -> None:
    """Best-effort removal of stale ``*.tmp.*`` files from SESSION_DIR.

    Only files whose mtime is older than ``_STALE_TMP_AGE_SECONDS`` are
    removed, so in-flight writes from a long-running sibling process are
    not disturbed. Errors are silently swallowed: this must never prevent
    startup.
    """
    cutoff = time.time() - _STALE_TMP_AGE_SECONDS
    try:
        for p in SESSION_DIR.glob('*.tmp.*'):
            try:
                if p.stat().st_mtime < cutoff:
                    p.unlink(missing_ok=True)
                    logger.debug("Cleaned up stale tmp file: %s", p.name)
            except OSError:
                pass  # best-effort
    except Exception:
        pass  # SESSION_DIR may not exist yet; that's fine


def _index_entry_exists(session_id: str, in_memory_ids=None) -> bool:
    """Return True if an index entry still has backing state.

    A session can legitimately exist either as a persisted JSON file or as an
    in-memory Session object that has not been flushed yet. This helper is used
    to prune stale `_index.json` rows left behind after session-id rotation or
    file removal. Pass *in_memory_ids* (a pre-computed set of SESSIONS keys)
    when checking many entries, so LOCK is not taken once per call.
    """
    if not session_id:
        return False
    if in_memory_ids is None:
        with LOCK:
            in_memory_ids = set(SESSIONS.keys())
    if session_id in in_memory_ids:
        return True
    p = SESSION_DIR / f'{session_id}.json'
    return p.exists()


def _write_session_index(updates=None):
    """Update the session index file.

    When *updates* is provided (a list of Session objects whose compact
    entries should be refreshed), the existing index is patched in place
    instead of reloading every session file; the index itself is still read
    and rewritten in full. When *updates* is None, or the index file does not
    exist yet, a full rebuild is performed (used on startup / first call).

    LOCK protects in-memory state snapshots and payload construction only;
    disk writes (write/flush/fsync/replace) always run outside LOCK.
    """
    _tmp = SESSION_INDEX_FILE.with_suffix(f'.tmp.{os.getpid()}.{threading.current_thread().ident}')

    with _INDEX_WRITE_LOCK:
        # Full-rebuild path: used on the first call or when the index doesn't exist yet.
        if updates is None or not SESSION_INDEX_FILE.exists():
            _cleanup_stale_tmp_files()  # best-effort sweep on startup / first call
            entries = []
            for p in SESSION_DIR.glob('*.json'):
                if p.name.startswith('_'):
                    continue
                try:
                    s = Session.load(p.stem)
                    if s:
                        entries.append(s.compact())
                except Exception:
                    logger.debug("Failed to load session from %s", p)

            with LOCK:
                existing_ids = {e.get('session_id') for e in entries}
                for s in SESSIONS.values():
                    if s.session_id not in existing_ids:
                        entries.append(s.compact())
                entries.sort(key=lambda s: s.get('updated_at', 0), reverse=True)
                _payload = json.dumps(entries, ensure_ascii=False, indent=2)

            try:
                with open(_tmp, 'w', encoding='utf-8') as f:
                    f.write(_payload)
                    f.flush()
                    os.fsync(f.fileno())
                os.replace(_tmp, SESSION_INDEX_FILE)
            except Exception:
                # Best-effort cleanup of the tmp file on failure
                try:
                    _tmp.unlink(missing_ok=True)
                except Exception:
                    pass
                raise
            return

        # Fast path: patch the existing index with the updated sessions.
        # This avoids loading every session file on every single save().
        _fallback = False
        try:
            with LOCK:
                existing = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
                in_memory_ids = set(SESSIONS.keys())

                # Collect on-disk IDs with a single glob instead of one
                # exists() call per index row while LOCK is held.
                on_disk_ids = {
                    p.stem
                    for p in SESSION_DIR.glob('*.json')
                    if not p.name.startswith('_')
                }

                # Drop rows whose session no longer exists in memory or on disk
                existing = [
                    e for e in existing
                    if (e.get('session_id') in in_memory_ids or e.get('session_id') in on_disk_ids)
                ]

                # Build lookup of updated entries
                updated_map = {s.session_id: s.compact() for s in updates}
                existing_ids = {e.get('session_id') for e in existing}
                # Add any updated entries not yet in the index
                for sid, entry in updated_map.items():
                    if sid not in existing_ids:
                        existing.append(entry)
                # Replace matching entries in place
                for i, e in enumerate(existing):
                    sid = e.get('session_id')
                    if sid in updated_map:
                        existing[i] = updated_map[sid]
                existing.sort(key=lambda s: s.get('updated_at', 0), reverse=True)
                _payload = json.dumps(existing, ensure_ascii=False, indent=2)

            try:
                with open(_tmp, 'w', encoding='utf-8') as f:
                    f.write(_payload)
                    f.flush()
                    os.fsync(f.fileno())
                os.replace(_tmp, SESSION_INDEX_FILE)
            except Exception:
                try:
                    _tmp.unlink(missing_ok=True)
                except Exception:
                    pass
                raise
        except Exception:
            _fallback = True

    if _fallback:
        # Corrupt or unreadable index (or a failed write): fall back to a full
        # rebuild, called outside LOCK to avoid deadlock.
        _write_session_index(updates=None)


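# Illustrative sketch, not called anywhere in this module: the fast-path index
# patch in _write_session_index(), rewritten as a pure function over plain
# dicts so the logic can be exercised without touching disk or LOCK. The name
# _sketch_patch_index is hypothetical, and *live_ids* stands in for the union
# of in-memory and on-disk session IDs. It prunes dead rows, replaces rows
# that have a fresher version, appends new rows, and sorts newest-first.
def _sketch_patch_index(existing, updated_rows, live_ids):
    updated_map = {row['session_id']: row for row in updated_rows}
    # Drop rows whose session no longer exists anywhere
    kept = [row for row in existing if row.get('session_id') in live_ids]
    # Swap in the fresher version of any updated row
    patched = [updated_map.get(row.get('session_id'), row) for row in kept]
    # Append updated rows that were not in the index yet
    present = {row.get('session_id') for row in patched}
    patched.extend(row for sid, row in updated_map.items() if sid not in present)
    patched.sort(key=lambda row: row.get('updated_at', 0), reverse=True)
    return patched

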
def _active_stream_ids():
    with STREAMS_LOCK:
        return set(STREAMS.keys())


def _is_streaming_session(active_stream_id, active_stream_ids):
    return bool(active_stream_id and active_stream_id in active_stream_ids)


def _session_sort_timestamp(session):
    """Sidebar sort key: the last real message time, falling back to updated_at."""
    if isinstance(session, dict):
        return session.get('last_message_at') or session.get('updated_at') or 0
    return _last_message_timestamp(getattr(session, 'messages', None)) or getattr(session, 'updated_at', 0) or 0


def _message_timestamp(message):
    """Return a message's ``_ts`` or ``timestamp`` as a float, or None if absent or invalid."""
    if not isinstance(message, dict):
        return None
    raw = message.get('_ts') or message.get('timestamp')
    try:
        return float(raw) if raw is not None else None
    except (TypeError, ValueError):
        return None


def _last_message_timestamp(messages):
    """Return the timestamp of the newest non-tool message that has one, else None."""
    if not isinstance(messages, list):
        return None
    for message in reversed(messages):
        if isinstance(message, dict) and message.get('role') == 'tool':
            continue
        ts = _message_timestamp(message)
        if ts:
            return ts
    return None


def _find_top_level_json_key(text, key):
    """Return the character offset of a top-level JSON object key, if present.

    *text* may be a truncated prefix of a JSON document. Bracket depth is
    tracked and string contents are skipped, so a matching string nested
    deeper, or used as a value, is ignored. Returns None if the key is not
    found or *text* ends inside a string.
    """
    depth = 0
    i = 0
    n = len(text)
    while i < n:
        ch = text[i]
        if ch == '"':
            start = i
            i += 1
            escaped = False
            chars = []
            while i < n:
                c = text[i]
                if escaped:
                    chars.append(c)
                    escaped = False
                elif c == '\\':
                    escaped = True
                elif c == '"':
                    break
                else:
                    chars.append(c)
                i += 1
            if i >= n:
                return None  # text ends inside a string
            if depth == 1 and ''.join(chars) == key:
                j = i + 1
                while j < n and text[j] in ' \t\r\n':
                    j += 1
                if j < n and text[j] == ':':
                    return start
        elif ch in '{[':
            depth += 1
        elif ch in '}]':
            depth -= 1
        i += 1
    return None


def _read_metadata_json_prefix(path, max_prefix_bytes=65536):
    """Read only the metadata portion before the top-level messages array.

    The file is read in 4096-character chunks until the top-level
    ``"messages"`` key is found; the text before it is returned, closed with
    ``}`` so it parses as a standalone JSON object. Returns None if the key is
    not found before end of file or within roughly *max_prefix_bytes* of UTF-8
    text; callers then fall back to a full load.
    """
    buf = ''
    with open(path, 'r', encoding='utf-8') as f:
        while len(buf.encode('utf-8')) < max_prefix_bytes:
            chunk = f.read(4096)
            if not chunk:
                return None
            buf += chunk
            messages_pos = _find_top_level_json_key(buf, 'messages')
            if messages_pos is None:
                continue
            prefix = buf[:messages_pos].rstrip()
            if prefix.endswith(','):
                prefix = prefix[:-1].rstrip()
            return f'{prefix}\n}}'
    return None


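# Illustrative sketch, not called anywhere in this module: a different way to
# extract the metadata that precedes the top-level "messages" key. Instead of
# the hand-written scanner above, it steps through top-level key/value pairs
# with the standard library's json.JSONDecoder.raw_decode(), which parses one
# JSON value starting at a given index. _sketch_metadata_before_key is a
# hypothetical name; it works on an in-memory string and returns None if the
# text is not a JSON object or is cut off before the stop key.
def _sketch_metadata_before_key(text, stop_key='messages'):
    import json

    decoder = json.JSONDecoder()
    ws = ' \t\r\n'

    def skip_ws(i):
        while text[i] in ws:
            i += 1
        return i

    try:
        i = skip_ws(0)
        if text[i] != '{':
            return None
        i += 1
        meta = {}
        while True:
            i = skip_ws(i)
            if text[i] == '}':
                return meta  # stop key never appeared
            key, i = decoder.raw_decode(text, i)
            if key == stop_key:
                return meta  # the (possibly huge) value is never decoded
            i = skip_ws(i)
            if text[i] != ':':
                return None
            value, i = decoder.raw_decode(text, skip_ws(i + 1))
            meta[key] = value
            i = skip_ws(i)
            if text[i] == ',':
                i += 1
    except (ValueError, IndexError):
        return None  # malformed or truncated input

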
def _lookup_index_message_count(session_id):
    """Return the indexed message count without loading the full session file."""
    try:
        entries = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
    except Exception:
        return None
    if not isinstance(entries, list):
        return None
    for entry in entries:
        if not isinstance(entry, dict) or entry.get('session_id') != session_id:
            continue
        count = entry.get('message_count')
        if isinstance(count, int) and count >= 0:
            return count
        try:
            count = int(count)
        except (TypeError, ValueError):
            return None
        return count if count >= 0 else None
    return None


class Session:
    def __init__(self, session_id: str=None, title: str='Untitled',
                 workspace=str(DEFAULT_WORKSPACE), model=DEFAULT_MODEL,
                 messages=None, created_at=None, updated_at=None,
                 tool_calls=None, pinned: bool=False, archived: bool=False,
                 project_id: str=None, profile=None,
                 input_tokens: int=0, output_tokens: int=0, estimated_cost=None,
                 personality=None,
                 active_stream_id: str=None,
                 pending_user_message: str=None,
                 pending_attachments=None,
                 pending_started_at=None,
                 compression_anchor_visible_idx=None,
                 compression_anchor_message_key=None,
                 **kwargs):
        # Any other keys present in a session file land in **kwargs and are
        # ignored; they are not stored on the instance.
        self.session_id = session_id or uuid.uuid4().hex[:12]
        self.title = title
        self.workspace = str(Path(workspace).expanduser().resolve())
        self.model = model
        self.messages = messages or []
        self.tool_calls = tool_calls or []
        self.created_at = created_at or time.time()
        self.updated_at = updated_at or time.time()
        self.pinned = bool(pinned)
        self.archived = bool(archived)
        self.project_id = project_id or None
        self.profile = profile
        self.input_tokens = input_tokens or 0
        self.output_tokens = output_tokens or 0
        self.estimated_cost = estimated_cost
        self.personality = personality
        self.active_stream_id = active_stream_id
        self.pending_user_message = pending_user_message
        self.pending_attachments = pending_attachments or []
        self.pending_started_at = pending_started_at
        self.compression_anchor_visible_idx = compression_anchor_visible_idx
        self.compression_anchor_message_key = compression_anchor_message_key
        self._metadata_message_count = None

    @property
    def path(self):
        return SESSION_DIR / f'{self.session_id}.json'

    def save(self, touch_updated_at: bool = True, skip_index: bool = False) -> None:
        """Write the session atomically (tmp file, fsync, os.replace) and,
        unless *skip_index* is True, patch its entry in the session index.
        """
        if touch_updated_at:
            self.updated_at = time.time()
        # Write metadata fields first so load_metadata_only() can read them
        # without parsing the full messages array (which may be 400KB+).
        # Fields are listed in the order they should appear in the JSON file.
        METADATA_FIELDS = [
            'session_id', 'title', 'workspace', 'model', 'created_at', 'updated_at',
            'pinned', 'archived', 'project_id', 'profile',
            'input_tokens', 'output_tokens', 'estimated_cost',
            'personality', 'active_stream_id',
            'pending_user_message', 'pending_attachments', 'pending_started_at',
            'compression_anchor_visible_idx', 'compression_anchor_message_key',
        ]
        meta = {k: getattr(self, k, None) for k in METADATA_FIELDS}
        meta['messages'] = self.messages
        meta['tool_calls'] = self.tool_calls
        # Fields not in METADATA_FIELDS (e.g. last_usage, message_count) go at the end
        extra = {k: v for k, v in self.__dict__.items()
                 if k not in METADATA_FIELDS and k not in ('messages', 'tool_calls')
                 and not k.startswith('_')}
        payload = json.dumps({**meta, **extra}, ensure_ascii=False, indent=2)
        tmp = self.path.with_suffix(f'.tmp.{os.getpid()}.{threading.current_thread().ident}')
        try:
            with open(tmp, 'w', encoding='utf-8') as f:
                f.write(payload)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, self.path)
        except Exception:
            try:
                tmp.unlink(missing_ok=True)
            except Exception:
                pass
            raise
        if not skip_index:
            _write_session_index(updates=[self])

    @classmethod
    def load(cls, sid):
        # Validate session ID format to prevent path traversal
        if not sid or not all(c in '0123456789abcdefghijklmnopqrstuvwxyz_' for c in sid):
            return None
        p = SESSION_DIR / f'{sid}.json'
        if not p.exists():
            return None
        return cls(**json.loads(p.read_text(encoding='utf-8')))

    @classmethod
    def load_metadata_only(cls, sid):
        """Load only the compact metadata fields, skipping the messages array.

        Session JSON files store the metadata fields (session_id, title, model,
        etc.) at the top level, before the large messages array (see save()).
        This reads only up to the top-level "messages" key and builds a Session
        with empty messages/tool_calls; the message count is taken from the
        index. Falls back to load() for legacy or unexpected file layouts.
        """
        if not sid or not all(c in '0123456789abcdefghijklmnopqrstuvwxyz_' for c in sid):
            return None
        p = SESSION_DIR / f'{sid}.json'
        if not p.exists():
            return None
        try:
            prefix = _read_metadata_json_prefix(p)
            if not prefix:
                return cls.load(sid)
            parsed = json.loads(prefix)
            needed = {'session_id', 'title', 'created_at', 'updated_at'}
            if not needed.issubset(parsed.keys()):
                return cls.load(sid)
            parsed['messages'] = []
            parsed['tool_calls'] = []
            session = cls(**parsed)
            session._metadata_message_count = _lookup_index_message_count(sid)
            return session
        except Exception:
            # Corrupt prefix or decode error: fall back to a full load
            return cls.load(sid)

    def compact(self, include_runtime=False, active_stream_ids=None) -> dict:
        """Return the lightweight dict used for index entries and sidebar rows.

        ``is_streaming`` is computed against *active_stream_ids* only when
        *include_runtime* is True; otherwise it is False.
        """
        active_stream_ids = active_stream_ids if active_stream_ids is not None else set()
        return {
            'session_id': self.session_id,
            'title': self.title,
            'workspace': self.workspace,
            'model': self.model,
            'message_count': (
                self._metadata_message_count
                if self._metadata_message_count is not None
                else len(self.messages)
            ),
            'created_at': self.created_at,
            'updated_at': self.updated_at,
            'last_message_at': _last_message_timestamp(self.messages) or self.updated_at,
            'pinned': self.pinned,
            'archived': self.archived,
            'project_id': self.project_id,
            'profile': self.profile,
            'input_tokens': self.input_tokens,
            'output_tokens': self.output_tokens,
            'estimated_cost': self.estimated_cost,
            'personality': self.personality,
            'compression_anchor_visible_idx': self.compression_anchor_visible_idx,
            'compression_anchor_message_key': self.compression_anchor_message_key,
            'active_stream_id': self.active_stream_id,
            'is_streaming': _is_streaming_session(
                self.active_stream_id, active_stream_ids
            ) if include_runtime else False,
        }


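# Illustrative sketch, not called anywhere in this module: why Session.save()
# builds its payload with the metadata keys first. json.dumps() emits keys in
# dict insertion order, so every metadata field lands before "messages" and a
# reader can stop at that key. _sketch_metadata_first_payload is a
# hypothetical helper; *fields* plays the role of METADATA_FIELDS.
def _sketch_metadata_first_payload(record, fields):
    import json

    ordered = {k: record.get(k) for k in fields}
    ordered['messages'] = record.get('messages', [])
    # Remaining keys go after the (potentially large) messages array
    ordered.update({k: v for k, v in record.items() if k not in ordered})
    return json.dumps(ordered, indent=2)

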
def get_session(sid, metadata_only=False):
    """Load a session, optionally with metadata only (skipping the messages array).

    Metadata-only loads intentionally do not populate the full-session cache;
    otherwise a later full load could return a compact object with an empty
    messages list. Use metadata_only=True when you only need compact()
    metadata and not the actual message history (e.g. for fast sidebar
    switching).

    Raises KeyError if the session cannot be found.
    """
    with LOCK:
        if sid in SESSIONS:
            SESSIONS.move_to_end(sid)  # LRU: mark as recently used
            return SESSIONS[sid]
    if metadata_only:
        s = Session.load_metadata_only(sid)
        if s:
            return s
    else:
        s = Session.load(sid)
        if s:
            with LOCK:
                # Another thread may have cached this session while we were
                # reading from disk; keep that instance so callers share it.
                s = SESSIONS.setdefault(sid, s)
                SESSIONS.move_to_end(sid)
                while len(SESSIONS) > SESSIONS_MAX:
                    SESSIONS.popitem(last=False)  # evict least recently used
            return s
    raise KeyError(sid)


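# Illustrative sketch, not called anywhere in this module: the LRU policy that
# get_session() and new_session() apply to SESSIONS (an OrderedDict capped at
# SESSIONS_MAX). A hit calls move_to_end() to mark the entry most recently
# used; an insert evicts from the front with popitem(last=False) once the cap
# is exceeded. _SketchLRU is a hypothetical class, not part of api.config.
class _SketchLRU:
    def __init__(self, max_items):
        from collections import OrderedDict

        self.max_items = max_items
        self._items = OrderedDict()

    def get(self, key):
        if key not in self._items:
            raise KeyError(key)
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def put(self, key, value):
        self._items[key] = value
        self._items.move_to_end(key)
        while len(self._items) > self.max_items:
            self._items.popitem(last=False)  # evict least recently used

    def keys(self):
        return list(self._items)

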
def new_session(workspace=None, model=None, profile=None):
    """Create a new in-memory session and persist it.

    *profile*: when supplied by the caller (e.g. from the request body sent
    by the active browser tab), it is used directly so that concurrent clients
    on different profiles don't fight over a shared process-global. If not
    supplied, we fall back to the process-level active profile (the pre-#798
    behaviour, preserved for calls that originate outside a request context).
    """
    if profile is None:
        # Fallback: read process-level global (single-client or startup path)
        try:
            from api.profiles import get_active_profile_name
            profile = get_active_profile_name()
        except ImportError:
            profile = None
    effective_model = model or get_effective_default_model()
    s = Session(
        workspace=workspace or get_last_workspace(),
        model=effective_model,
        profile=profile,
    )
    with LOCK:
        SESSIONS[s.session_id] = s
        SESSIONS.move_to_end(s.session_id)
        while len(SESSIONS) > SESSIONS_MAX:
            SESSIONS.popitem(last=False)
    s.save()
    return s


def all_sessions():
    """Return compact sidebar rows, pinned sessions first, then newest first.

    Reads the session index when available (avoiding a load of every session
    file) and falls back to a full directory scan if the index is missing or
    unreadable.
    """
    active_stream_ids = _active_stream_ids()
    # Phase C: read the index first to avoid loading every session file;
    # fall back to a full scan on failure.
    if SESSION_INDEX_FILE.exists():
        try:
            index = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
            with LOCK:
                in_memory_ids = set(SESSIONS.keys())
            # Drop rows whose session no longer exists in memory or on disk
            index = [
                s for s in index
                if _index_entry_exists(s.get('session_id'), in_memory_ids)
            ]
            # Backfill last_message_at for rows written before that field existed
            backfilled = []
            for i, s in enumerate(index):
                if 'last_message_at' not in s:
                    full = Session.load(s.get('session_id'))
                    if full:
                        index[i] = full.compact()
                        backfilled.append(full)
            if backfilled:
                try:
                    _write_session_index(updates=backfilled)
                except Exception:
                    logger.debug("Failed to persist last_message_at backfill")
            for s in index:
                s['is_streaming'] = _is_streaming_session(
                    s.get('active_stream_id'),
                    active_stream_ids,
                )
            # Overlay any in-memory sessions that may be newer than the index
            index_map = {s['session_id']: s for s in index}
            with LOCK:
                for s in SESSIONS.values():
                    index_map[s.session_id] = s.compact(
                        include_runtime=True,
                        active_stream_ids=active_stream_ids,
                    )
            result = sorted(index_map.values(), key=lambda s: (s.get('pinned', False), _session_sort_timestamp(s)), reverse=True)
            # Hide empty Untitled sessions from the UI (created by tests, page refreshes, etc.)
            # Exempt sessions younger than 60 s so a brand-new session stays visible (#789)
            _now = time.time()
            result = [s for s in result if not (
                s.get('title', 'Untitled') == 'Untitled'
                and s.get('message_count', 0) == 0
                and (_now - s.get('updated_at', _now)) > 60
            )]
            # Backfill: sessions created before Sprint 22 have no profile tag.
            # Attribute them to 'default' so the client profile filter works correctly.
            for s in result:
                if not s.get('profile'):
                    s['profile'] = 'default'
            return result
        except Exception:
            logger.debug("Failed to load session index, falling back to full scan")
    # Full scan fallback
    by_id = {}
    for p in SESSION_DIR.glob('*.json'):
        if p.name.startswith('_'):
            continue
        try:
            s = Session.load(p.stem)
            if s:
                by_id[s.session_id] = s
        except Exception:
            logger.debug("Failed to load session from %s", p)
    # In-memory sessions are at least as new as their on-disk copy, so they
    # win (matching the overlay in the index path above). Snapshot under LOCK.
    with LOCK:
        by_id.update(SESSIONS)
    out = sorted(by_id.values(), key=lambda s: (getattr(s, 'pinned', False), _session_sort_timestamp(s)), reverse=True)
    _now = time.time()
    result = [s.compact(include_runtime=True, active_stream_ids=active_stream_ids) for s in out if not (
        s.title == 'Untitled'
        and len(s.messages) == 0
        and (_now - s.updated_at) > 60
    )]
    for s in result:
        if not s.get('profile'):
            s['profile'] = 'default'
    return result


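# Illustrative sketch, not called anywhere in this module: the ordering and
# filtering that all_sessions() applies to index rows, as a pure function over
# dicts. _sketch_sidebar_rows is a hypothetical name; *now* is passed in so the
# 60-second grace period for brand-new empty sessions is deterministic.
def _sketch_sidebar_rows(rows, now):
    def sort_key(row):
        ts = row.get('last_message_at') or row.get('updated_at') or 0
        return (row.get('pinned', False), ts)

    ordered = sorted(rows, key=sort_key, reverse=True)  # pinned first, then newest
    visible = [
        row for row in ordered
        if not (
            row.get('title', 'Untitled') == 'Untitled'
            and row.get('message_count', 0) == 0
            and (now - row.get('updated_at', now)) > 60
        )
    ]
    # Rows without a profile are attributed to 'default' (returned as copies)
    return [{**row, 'profile': row.get('profile') or 'default'} for row in visible]

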
def title_from(messages, fallback: str='Untitled'):
    """Derive a session title from the first user message with non-empty text (max 64 chars)."""
    for m in messages:
        if m.get('role') == 'user':
            c = m.get('content', '')
            if isinstance(c, list):
                c = ' '.join(p.get('text', '') for p in c if isinstance(p, dict) and p.get('type') == 'text')
            text = str(c).strip()
            if text:
                return text[:64]
    return fallback


# ── Project helpers ──────────────────────────────────────────────────────────

def load_projects() -> list:
    """Load project list from disk. Returns list of project dicts."""
    if not PROJECTS_FILE.exists():
        return []
    try:
        return json.loads(PROJECTS_FILE.read_text(encoding='utf-8'))
    except Exception:
        return []


def save_projects(projects) -> None:
    """Write project list to disk."""
    PROJECTS_FILE.write_text(json.dumps(projects, ensure_ascii=False, indent=2), encoding='utf-8')


def import_cli_session(
    session_id: str,
    title: str,
    messages,
    model: str='unknown',
    profile=None,
    created_at=None,
    updated_at=None,
):
    """Create a new WebUI session populated with CLI messages.

    The session is saved without touching updated_at. Returns the Session object.
    """
    s = Session(
        session_id=session_id,
        title=title,
        workspace=get_last_workspace(),
        model=model,
        messages=messages,
        profile=profile,
        created_at=created_at,
        updated_at=updated_at,
    )
    s.save(touch_updated_at=False)
    return s


# ── CLI session bridge ──────────────────────────────────────────────────────

def get_cli_sessions() -> list:
    """Read CLI sessions from the agent's SQLite store and return them as
    dicts in a format the WebUI sidebar can render alongside local sessions.

    Returns an empty list if the SQLite DB is missing or any error occurs --
    the bridge is purely additive and never crashes the WebUI.
    """
    cli_sessions = []

    # Use the active WebUI profile's HERMES_HOME to find state.db.
    # The active profile is determined by what the user has selected in the UI
    # (stored in the server's runtime config). This means:
    #   - default profile -> ~/.hermes/state.db
    #   - named profile X -> ~/.hermes/profiles/X/state.db
    # We resolve the active profile's home directory rather than just using
    # HERMES_HOME (which is the server's launch profile, not necessarily the
    # active one after a profile switch).
    try:
        from api.profiles import get_active_hermes_home
        hermes_home = Path(get_active_hermes_home()).expanduser().resolve()
    except Exception:
        hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve()

    db_path = hermes_home / 'state.db'
    if not db_path.exists():
        return cli_sessions

    # Resolve the active profile name so imported sessions integrate
    # with the WebUI profile filter (available since Sprint 22).
    try:
        from api.profiles import get_active_profile_name
        _cli_profile = get_active_profile_name()
    except ImportError:
        _cli_profile = None  # older agent -- fall back to no profile

    try:
        for row in read_importable_agent_session_rows(db_path, limit=200, log=logger):
            sid = row['id']
            raw_ts = row['last_activity'] or row['started_at']
            # The CLI DB has no profile column, so every row is tagged with the
            # active profile; sidebar filtering then works either way.
            profile = _cli_profile

            _source = row['source'] or 'cli'
            _display_title = row['title'] or f'{_source.title()} Session'
            cli_sessions.append({
                'session_id': sid,
                'title': _display_title,
                'workspace': str(get_last_workspace()),
                'model': row['model'] or None,
                'message_count': row['message_count'] or row['actual_message_count'] or 0,
                'created_at': row['started_at'],
                'updated_at': raw_ts,
                'pinned': False,
                'archived': False,
                'project_id': None,
                'profile': profile,
                'source_tag': _source,
                'is_cli_session': True,
            })
    except Exception as _cli_err:
        # DB schema changed, locked, or corrupted: log a warning so admins can
        # diagnose, but still degrade gracefully (don't crash the WebUI).
        logger.warning(
            "get_cli_sessions() failed -- check state.db schema or path (%s): %s",
            db_path, _cli_err,
        )
        return []

    return cli_sessions


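# Illustrative sketch, not called anywhere in this module: the state.db layout
# that the comment in get_cli_sessions() describes, where the default profile
# keeps it at <base>/state.db and a named profile X at
# <base>/profiles/X/state.db. The real lookup goes through
# api.profiles.get_active_hermes_home(); this hypothetical helper only mirrors
# the documented layout, and treating None/''/'default' as the default
# profile is an assumption.
def _sketch_state_db_path(base, profile=None):
    from pathlib import Path

    base = Path(base).expanduser()
    if profile in (None, '', 'default'):
        return base / 'state.db'
    return base / 'profiles' / profile / 'state.db'

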
def get_cli_session_messages(sid) -> list:
    """Read messages for a single CLI session from the SQLite store.

    Returns a list of {role, content, timestamp} dicts ordered by timestamp,
    or an empty list on any error.
    """
    try:
        import sqlite3
        from contextlib import closing
    except ImportError:
        return []

    try:
        from api.profiles import get_active_hermes_home
        hermes_home = Path(get_active_hermes_home()).expanduser().resolve()
    except Exception:
        hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve()
    db_path = hermes_home / 'state.db'
    if not db_path.exists():
        return []

    try:
        # A sqlite3 connection used as a context manager only commits or rolls
        # back; closing() makes sure the connection is actually closed.
        with closing(sqlite3.connect(str(db_path))) as conn:
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()
            cur.execute("""
                SELECT role, content, timestamp
                FROM messages
                WHERE session_id = ?
                ORDER BY timestamp ASC
            """, (sid,))
            msgs = []
            for row in cur.fetchall():
                msgs.append({
                    'role': row['role'],
                    'content': row['content'],
                    'timestamp': row['timestamp'],
                })
    except Exception:
        return []
    return msgs


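# Illustrative sketch, not called anywhere in this module: the query pattern
# of get_cli_session_messages() run against an in-memory SQLite database. The
# minimal messages table created in _sketch_demo_read() is an assumption for
# the demo, not the agent's real schema. The session id is passed as a bound
# "?" parameter, and contextlib.closing() guarantees the connection is closed.
def _sketch_read_session_messages(conn, sid):
    import sqlite3

    conn.row_factory = sqlite3.Row  # rows become addressable by column name
    rows = conn.execute(
        "SELECT role, content, timestamp FROM messages "
        "WHERE session_id = ? ORDER BY timestamp ASC",
        (sid,),  # bound parameter, never formatted into the SQL string
    ).fetchall()
    return [{'role': r['role'], 'content': r['content'], 'timestamp': r['timestamp']} for r in rows]


def _sketch_demo_read():
    import sqlite3
    from contextlib import closing

    with closing(sqlite3.connect(':memory:')) as conn:
        conn.execute("CREATE TABLE messages (session_id TEXT, role TEXT, content TEXT, timestamp REAL)")
        conn.executemany(
            "INSERT INTO messages VALUES (?, ?, ?, ?)",
            [('s1', 'assistant', 'hi', 2.0), ('s1', 'user', 'hello', 1.0), ('s2', 'user', 'x', 0.5)],
        )
        return _sketch_read_session_messages(conn, 's1')

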
def delete_cli_session(sid) -> bool:
    """Delete a CLI session from state.db (messages + session row).

    Returns True if the session row was deleted, False if it was not found
    or an error occurred.
    """
    try:
        import sqlite3
        from contextlib import closing
    except ImportError:
        return False

    try:
        from api.profiles import get_active_hermes_home
        hermes_home = Path(get_active_hermes_home()).expanduser().resolve()
    except Exception:
        hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve()
    db_path = hermes_home / 'state.db'
    if not db_path.exists():
        return False

    try:
        # closing() releases the connection; the explicit commit() below
        # persists both deletes together.
        with closing(sqlite3.connect(str(db_path))) as conn:
            cur = conn.cursor()
            cur.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
            cur.execute("DELETE FROM sessions WHERE id = ?", (sid,))
            conn.commit()
            return cur.rowcount > 0
    except Exception:
        return False
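

# Illustrative sketch, not called anywhere in this module: the two-step delete
# in delete_cli_session() (message rows first, then the session row) inside a
# single transaction. The two-table schema built in _sketch_demo_delete() is a
# minimal stand-in assumed for the demo. "with conn:" commits both statements
# together, or rolls both back if either raises.
def _sketch_delete_session(conn, sid):
    with conn:  # one transaction: commit on success, rollback on error
        conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
        cur = conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
    return cur.rowcount > 0  # True only if a session row was removed


def _sketch_demo_delete():
    import sqlite3
    from contextlib import closing

    with closing(sqlite3.connect(':memory:')) as conn:
        conn.execute("CREATE TABLE sessions (id TEXT PRIMARY KEY)")
        conn.execute("CREATE TABLE messages (session_id TEXT, content TEXT)")
        conn.execute("INSERT INTO sessions VALUES ('s1')")
        conn.execute("INSERT INTO messages VALUES ('s1', 'hi')")
        conn.commit()
        deleted = _sketch_delete_session(conn, 's1')
        missing = _sketch_delete_session(conn, 's1')
        left = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
    return deleted, missing, left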