Files
hermes-webui/api/models.py
T
nesquena-hermes 6c343aff84 v0.50.210: gpt-5.5, cron titles, agent cache, bfcache fix, onboarding fix, mermaid CSP, PWA auth (#1056)
* feat(models): add gpt-5.5 to openai, openai-codex, copilot catalogs

Adds GPT-5.5 and GPT-5.5 Mini entries to the static _PROVIDER_MODELS
catalog so they appear in the model picker for the openai, openai-codex,
and copilot providers.

Signed-off-by: Pix (PiClaw, claude-opus-4-7) via Hermes Agent

* fix(models): add gpt-5.5-mini to copilot provider catalog

* fix(renderer): suppress Mermaid Google Fonts CSP violation via fontFamily inherit (#1044)

Mermaid's built-in 'dark' and 'default' themes inject an @import for
fonts.googleapis.com/Manrope into every generated SVG. The CSP style-src
only allows cdn.jsdelivr.net, so this request is blocked on every diagram
render, filling the console with CSP errors.

Fix: pass fontFamily:'inherit' (and fontSize:'14px') in the themeVariables
block of mermaid.initialize() in renderMermaidBlocks(). This suppresses
Mermaid's external font import and uses the page's existing font stack.

Avoids adding fonts.googleapis.com to the CSP — no new external dependency,
no font FOUT, consistent with the rest of the UI typography.

3 regression tests added in tests/test_1044_mermaid_csp_font.py.
2215/2215 tests passing.

* fix(onboarding): non-standard provider/path cluster (#1029)

* fix(bfcache): restore full layout on tab/session restore — rail, topbar, panels (#1045)

The pageshow handler added for #822 only cleared the session search filter
and re-rendered the session list. This left the rest of the layout chrome
(topbar, rail icons, workspace panel, resize handles, gateway SSE) in the
stale bfcache DOM state, causing a broken layout (oversized search icon,
uninitialized rail) that required a hard refresh to fix.

Fix: extend the pageshow handler to re-run the full set of layout sync calls
that the boot IIFE runs on a fresh page load:

  syncTopbar()              — restores model chip, title, topbar state
  syncWorkspacePanelState() — restores workspace panel open/closed
  _initResizePanels()       — reattaches panel resize drag listeners
  startGatewaySSE()         — reconnects the gateway SSE watcher
                              (bfcache-persisted connections are dead)

All four calls are typeof-guarded for safe degradation if a helper is not
yet defined. The existing #822 fixes (sessionSearch clear +
renderSessionListFromCache) are preserved unchanged.

loadSession() is intentionally NOT re-called — it would cause message
flicker; the sync calls above are sufficient to restore visual state.

7 regression tests added in tests/test_1045_bfcache_layout_restore.py.
2219/2219 tests passing.

* fix(bfcache): also close open dropdowns on bfcache restore (#1045)

Additional symptom noted in issue #1045: bfcache freezes the DOM including
any open dropdown/popover state. The thinking-level selector (and other
composer dropdowns) left open when navigating away would appear open without
user interaction on tab restore.

Extend the pageshow handler to call all four named close functions before
the layout sync:
  closeModelDropdown()     — composer model selector
  closeReasoningDropdown() — thinking/reasoning effort selector
  closeWsDropdown()        — workspace chip dropdown
  closeProfileDropdown()   — profile switcher dropdown

All calls are typeof-guarded, matching the style of the layout sync calls
already in the handler.

2 new tests (9 total in test_1045_bfcache_layout_restore.py):
- pageshow closes all four named dropdowns
- dropdown closes appear before layout sync calls (clean state first)

2221/2221 tests passing.

* fix(bfcache): remove _initResizePanels() — bfcache preserves listeners

* fix(bfcache): remove _initResizePanels from pageshow — bfcache preserves listeners; update test

* fix(sessions): use cron job name as session title when available (#1032)

* fix(test): add id column to messages table in cron title test fixture

* fix(merge): inject cron title lookup into read_importable loop, remove stale sqlite3 block

* fix(pwa): redirect to /login client-side on 401 — fixes iOS PWA auth expiry trap (#1038)

When an auth session expires, the server returns a 302→/login for page
requests. In a normal browser this works fine, but in an iOS PWA running
in standalone mode the redirect navigates out of the PWA shell into Safari,
leaving the app permanently stuck on 'Authentication required' with no
recovery path.

Fix: intercept 401 responses client-side before surfacing any error.

- workspace.js api(): check res.status===401 first; call
  window.location.href='/login' and return immediately (no throw)
- ui.js: add _redirectIfUnauth() helper; wire into all direct fetch()
  calls that bypass api() — api/models, api/models/live, api/upload

All fetch paths that could receive a 401 now redirect cleanly within
the PWA frame rather than opening Safari.

6 regression tests added in tests/test_1038_pwa_auth_redirect.py.
2175/2175 tests passing.

* fix(pwa): preserve current URL in ?next= param on 401 redirect

* fix(test): update 401-redirect assertion to accept ?next= URL format

* feat(pwa): add _safeNextPath() to login.js so ?next= param is honored after re-login

Addresses reviewer suggestion: the ?next= URL set on 401 redirect was ignored by
the login success handler (always redirected to ./). _safeNextPath() validates and
returns the ?next= param with open-redirect guards: rejects non-path-absolute inputs,
// protocol-relative URLs, backslash variants, and control characters.
4 new regression tests added.

* Implement session agent cache for AIAgent reuse

Added session agent cache to reuse AIAgent across messages.

* Implement agent caching for session management

* Implement session agent eviction on session deletion

Added session agent eviction to prevent turn count leakage in recycled sessions.

* docs: v0.50.210 release notes — 7 PRs, 2239 tests (+27)

* docs(changelog): drop stale [Unreleased] entries duplicated by v0.50.210

Three entries in the [Unreleased] section are duplicates of items now
listed under v0.50.210:

  - Mermaid CSP font fix (#1044)        → v0.50.210 / Mermaid Google Fonts CSP
  - bfcache layout restore (#1045)      → v0.50.210 / bfcache layout and dropdown restore
  - iOS PWA auth redirect (#1038)       → v0.50.210 / Login redirects back to original URL

The original drafts landed in [Unreleased] when individual PRs (#1047,
#1048, #1043) were approved; the v0.50.210 release-notes commit then
added the same items under the version section without removing the
[Unreleased] copies. Drop the duplicates so users reading the CHANGELOG
don't see the same fix listed twice.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Signed-off-by: Pix (PiClaw, claude-opus-4-7) via Hermes Agent
Co-authored-by: Pix (Hermes) <aliceisjustplaying@users.noreply.github.com>
Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com>
Co-authored-by: qxxaa <mrhanoi@outlook.com>
Co-authored-by: Nathan Esquenazi <nesquena@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 15:47:44 -07:00

811 lines
31 KiB
Python

"""Hermes Web UI -- Session model and in-memory session store."""
import collections
import json
import logging
import os
import threading
import time
import uuid
from pathlib import Path
import api.config as _cfg
from api.config import (
SESSION_DIR, SESSION_INDEX_FILE, SESSIONS, SESSIONS_MAX,
LOCK, STREAMS, STREAMS_LOCK, DEFAULT_WORKSPACE, DEFAULT_MODEL, PROJECTS_FILE, HOME,
get_effective_default_model,
)
from api.workspace import get_last_workspace
from api.agent_sessions import read_importable_agent_session_rows
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Stale temp-file cleanup
# ---------------------------------------------------------------------------
# Both Session.save() and _write_session_index() use the atomic-write pattern:
# write to <path>.tmp.<pid>.<tid> → os.replace() to final path
# If the process crashes between write and replace the .tmp file is left
# behind. Because the name embeds pid + tid, leftover files can never be
# reused by a different process/thread, so they are safe to remove on the
# next startup. _cleanup_stale_tmp_files() is called from the full-rebuild
# path of _write_session_index (i.e. at first index access / startup) and
# removes any *.tmp.* file whose mtime is older than one hour.
# ---------------------------------------------------------------------------
_STALE_TMP_AGE_SECONDS = 3600 # 1 hour
# Serializes index writers so concurrent Session.save() calls cannot race on
# stale baselines while still allowing LOCK to be released before disk I/O.
_INDEX_WRITE_LOCK = threading.RLock()
def _cleanup_stale_tmp_files() -> None:
"""Best-effort removal of stale ``*.tmp.*`` files from SESSION_DIR.
Only files whose mtime is older than ``_STALE_TMP_AGE_SECONDS`` are
removed so that in-flight writes from a long-running sibling process
are not disturbed. Errors are logged and swallowed — this must never
prevent startup.
"""
cutoff = time.time() - _STALE_TMP_AGE_SECONDS
try:
for p in SESSION_DIR.glob('*.tmp.*'):
try:
if p.stat().st_mtime < cutoff:
p.unlink(missing_ok=True)
logger.debug("Cleaned up stale tmp file: %s", p.name)
except OSError:
pass # best-effort
except Exception:
pass # SESSION_DIR may not exist yet; that's fine
def _index_entry_exists(session_id: str, in_memory_ids=None) -> bool:
"""Return True if an index entry still has backing state.
A session can legitimately exist either as a persisted JSON file or as an
in-memory Session object that has not been flushed yet. This helper is used
to prune stale `_index.json` rows left behind after session-id rotation or
file removal.
"""
if not session_id:
return False
if in_memory_ids is None:
with LOCK:
in_memory_ids = set(SESSIONS.keys())
if session_id in in_memory_ids:
return True
p = SESSION_DIR / f'{session_id}.json'
return p.exists()
def _write_session_index(updates=None):
"""Update the session index file.
When *updates* is provided (a list of Session objects whose compact
entries should be refreshed), this does a targeted in-place update of
the existing index — O(1) for single-session changes. When *updates*
is None, a full rebuild is performed (used on startup / first call).
LOCK protects in-memory state snapshots and payload construction only;
disk I/O (write/flush/fsync/replace) always runs outside LOCK.
"""
_tmp = SESSION_INDEX_FILE.with_suffix(f'.tmp.{os.getpid()}.{threading.current_thread().ident}')
with _INDEX_WRITE_LOCK:
# Lazy full-rebuild path — used when index doesn't exist yet.
if updates is None or not SESSION_INDEX_FILE.exists():
_cleanup_stale_tmp_files() # best-effort sweep on startup / first call
entries = []
for p in SESSION_DIR.glob('*.json'):
if p.name.startswith('_'):
continue
try:
s = Session.load(p.stem)
if s:
entries.append(s.compact())
except Exception:
logger.debug("Failed to load session from %s", p)
with LOCK:
existing_ids = {e.get('session_id') for e in entries}
for s in SESSIONS.values():
if s.session_id not in existing_ids:
entries.append(s.compact())
entries.sort(key=lambda s: s.get('updated_at', 0), reverse=True)
_payload = json.dumps(entries, ensure_ascii=False, indent=2)
try:
with open(_tmp, 'w', encoding='utf-8') as f:
f.write(_payload)
f.flush()
os.fsync(f.fileno())
os.replace(_tmp, SESSION_INDEX_FILE)
except Exception:
# Best-effort cleanup of stale tmp on failure
try:
_tmp.unlink(missing_ok=True)
except Exception:
pass
raise
return
# Fast path: patch existing index with updated sessions.
# This avoids loading every session file on every single save().
_fallback = False
try:
with LOCK:
existing = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
in_memory_ids = set(SESSIONS.keys())
# Avoid N filesystem exists() checks under LOCK by collecting
# on-disk IDs once.
on_disk_ids = {
p.stem
for p in SESSION_DIR.glob('*.json')
if not p.name.startswith('_')
}
existing = [
e for e in existing
if (e.get('session_id') in in_memory_ids or e.get('session_id') in on_disk_ids)
]
# Build lookup of updated entries
updated_map = {s.session_id: s.compact() for s in updates}
existing_ids = {e.get('session_id') for e in existing}
# Add any updated entries not yet in the index
for sid, entry in updated_map.items():
if sid not in existing_ids:
existing.append(entry)
# Replace matching entries in-place
for i, e in enumerate(existing):
sid = e.get('session_id')
if sid in updated_map:
existing[i] = updated_map[sid]
existing.sort(key=lambda s: s.get('updated_at', 0), reverse=True)
_payload = json.dumps(existing, ensure_ascii=False, indent=2)
try:
with open(_tmp, 'w', encoding='utf-8') as f:
f.write(_payload)
f.flush()
os.fsync(f.fileno())
os.replace(_tmp, SESSION_INDEX_FILE)
except Exception:
try:
_tmp.unlink(missing_ok=True)
except Exception:
pass
raise
except Exception:
_fallback = True
if _fallback:
# Corrupt or missing index — fall back to full rebuild (called outside LOCK to avoid deadlock)
_write_session_index(updates=None)
def _active_stream_ids():
with STREAMS_LOCK:
return set(STREAMS.keys())
def _is_streaming_session(active_stream_id, active_stream_ids):
return bool(active_stream_id and active_stream_id in active_stream_ids)
def _session_sort_timestamp(session):
if isinstance(session, dict):
return session.get('last_message_at') or session.get('updated_at') or 0
return _last_message_timestamp(getattr(session, 'messages', None)) or getattr(session, 'updated_at', 0) or 0
def _message_timestamp(message):
if not isinstance(message, dict):
return None
raw = message.get('_ts') or message.get('timestamp')
try:
return float(raw) if raw is not None else None
except (TypeError, ValueError):
return None
def _last_message_timestamp(messages):
if not isinstance(messages, list):
return None
for message in reversed(messages):
if isinstance(message, dict) and message.get('role') == 'tool':
continue
ts = _message_timestamp(message)
if ts:
return ts
return None
def _find_top_level_json_key(text, key):
"""Return the byte offset of a top-level JSON object key, if present."""
depth = 0
i = 0
n = len(text)
while i < n:
ch = text[i]
if ch == '"':
start = i
i += 1
escaped = False
chars = []
while i < n:
c = text[i]
if escaped:
chars.append(c)
escaped = False
elif c == '\\':
escaped = True
elif c == '"':
break
else:
chars.append(c)
i += 1
if i >= n:
return None
if depth == 1 and ''.join(chars) == key:
j = i + 1
while j < n and text[j] in ' \t\r\n':
j += 1
if j < n and text[j] == ':':
return start
elif ch in '{[':
depth += 1
elif ch in '}]':
depth -= 1
i += 1
return None
def _read_metadata_json_prefix(path, max_prefix_bytes=65536):
"""Read only the metadata portion before the top-level messages array."""
buf = ''
with open(path, 'r', encoding='utf-8') as f:
while len(buf.encode('utf-8')) < max_prefix_bytes:
chunk = f.read(4096)
if not chunk:
return None
buf += chunk
messages_pos = _find_top_level_json_key(buf, 'messages')
if messages_pos is None:
continue
prefix = buf[:messages_pos].rstrip()
if prefix.endswith(','):
prefix = prefix[:-1].rstrip()
return f'{prefix}\n}}'
return None
def _lookup_index_message_count(session_id):
"""Return the indexed message count without loading the full session file."""
try:
entries = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
except Exception:
return None
if not isinstance(entries, list):
return None
for entry in entries:
if entry.get('session_id') != session_id:
continue
count = entry.get('message_count')
if isinstance(count, int) and count >= 0:
return count
try:
count = int(count)
except (TypeError, ValueError):
return None
return count if count >= 0 else None
return None
class Session:
def __init__(self, session_id: str=None, title: str='Untitled',
workspace=str(DEFAULT_WORKSPACE), model=DEFAULT_MODEL,
messages=None, created_at=None, updated_at=None,
tool_calls=None, pinned: bool=False, archived: bool=False,
project_id: str=None, profile=None,
input_tokens: int=0, output_tokens: int=0, estimated_cost=None,
personality=None,
active_stream_id: str=None,
pending_user_message: str=None,
pending_attachments=None,
pending_started_at=None,
compression_anchor_visible_idx=None,
compression_anchor_message_key=None,
**kwargs):
self.session_id = session_id or uuid.uuid4().hex[:12]
self.title = title
self.workspace = str(Path(workspace).expanduser().resolve())
self.model = model
self.messages = messages or []
self.tool_calls = tool_calls or []
self.created_at = created_at or time.time()
self.updated_at = updated_at or time.time()
self.pinned = bool(pinned)
self.archived = bool(archived)
self.project_id = project_id or None
self.profile = profile
self.input_tokens = input_tokens or 0
self.output_tokens = output_tokens or 0
self.estimated_cost = estimated_cost
self.personality = personality
self.active_stream_id = active_stream_id
self.pending_user_message = pending_user_message
self.pending_attachments = pending_attachments or []
self.pending_started_at = pending_started_at
self.compression_anchor_visible_idx = compression_anchor_visible_idx
self.compression_anchor_message_key = compression_anchor_message_key
self._metadata_message_count = None
@property
def path(self):
return SESSION_DIR / f'{self.session_id}.json'
def save(self, touch_updated_at: bool = True, skip_index: bool = False) -> None:
if touch_updated_at:
self.updated_at = time.time()
# Write metadata fields first so load_metadata_only() can read them
# without parsing the full messages array (which may be 400KB+).
# Fields are listed in the order they should appear in the JSON file.
METADATA_FIELDS = [
'session_id', 'title', 'workspace', 'model', 'created_at', 'updated_at',
'pinned', 'archived', 'project_id', 'profile',
'input_tokens', 'output_tokens', 'estimated_cost',
'personality', 'active_stream_id',
'pending_user_message', 'pending_attachments', 'pending_started_at',
'compression_anchor_visible_idx', 'compression_anchor_message_key',
]
meta = {k: getattr(self, k, None) for k in METADATA_FIELDS}
meta['messages'] = self.messages
meta['tool_calls'] = self.tool_calls
# Fields not in METADATA_FIELDS (e.g. last_usage, message_count) go at the end
extra = {k: v for k, v in self.__dict__.items()
if k not in METADATA_FIELDS and k not in ('messages', 'tool_calls')
and not k.startswith('_')}
payload = json.dumps({**meta, **extra}, ensure_ascii=False, indent=2)
tmp = self.path.with_suffix(f'.tmp.{os.getpid()}.{threading.current_thread().ident}')
try:
with open(tmp, 'w', encoding='utf-8') as f:
f.write(payload)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, self.path)
except Exception:
try:
tmp.unlink(missing_ok=True)
except Exception:
pass
raise
if not skip_index:
_write_session_index(updates=[self])
@classmethod
def load(cls, sid):
# Validate session ID format to prevent path traversal
if not sid or not all(c in '0123456789abcdefghijklmnopqrstuvwxyz_' for c in sid):
return None
p = SESSION_DIR / f'{sid}.json'
if not p.exists():
return None
return cls(**json.loads(p.read_text(encoding='utf-8')))
@classmethod
def load_metadata_only(cls, sid):
"""Load only the compact metadata fields, skipping the messages array.
Session JSON files have metadata fields (session_id, title, model, etc.)
at the top level, before the large messages array. Read only up to the
top-level "messages" field and synthesize a small metadata-only object.
Falls back to load() for legacy or unexpected file layouts.
"""
if not sid or not all(c in '0123456789abcdefghijklmnopqrstuvwxyz_' for c in sid):
return None
p = SESSION_DIR / f'{sid}.json'
if not p.exists():
return None
try:
prefix = _read_metadata_json_prefix(p)
if not prefix:
return cls.load(sid)
parsed = json.loads(prefix)
needed = {'session_id', 'title', 'created_at', 'updated_at'}
if not needed.issubset(parsed.keys()):
return cls.load(sid)
parsed['messages'] = []
parsed['tool_calls'] = []
session = cls(**parsed)
session._metadata_message_count = _lookup_index_message_count(sid)
return session
except Exception:
# Corrupt prefix or decode error — fall back to full load
return cls.load(sid)
def compact(self, include_runtime=False, active_stream_ids=None) -> dict:
active_stream_ids = active_stream_ids if active_stream_ids is not None else set()
return {
'session_id': self.session_id,
'title': self.title,
'workspace': self.workspace,
'model': self.model,
'message_count': (
self._metadata_message_count
if self._metadata_message_count is not None
else len(self.messages)
),
'created_at': self.created_at,
'updated_at': self.updated_at,
'last_message_at': _last_message_timestamp(self.messages) or self.updated_at,
'pinned': self.pinned,
'archived': self.archived,
'project_id': self.project_id,
'profile': self.profile,
'input_tokens': self.input_tokens,
'output_tokens': self.output_tokens,
'estimated_cost': self.estimated_cost,
'personality': self.personality,
'compression_anchor_visible_idx': self.compression_anchor_visible_idx,
'compression_anchor_message_key': self.compression_anchor_message_key,
'active_stream_id': self.active_stream_id,
'is_streaming': _is_streaming_session(
self.active_stream_id, active_stream_ids
) if include_runtime else False,
}
def get_session(sid, metadata_only=False):
"""Load a session, optionally with metadata only (skipping the messages array).
Metadata-only loads intentionally do not populate the full-session cache.
Otherwise a later full load could return a compact object with an empty
messages list. Use this when you only need compact() metadata and not the
actual message history (e.g., for fast sidebar switching).
"""
with LOCK:
if sid in SESSIONS:
SESSIONS.move_to_end(sid) # LRU: mark as recently used
return SESSIONS[sid]
if metadata_only:
s = Session.load_metadata_only(sid)
if s:
return s
else:
s = Session.load(sid)
if s:
with LOCK:
SESSIONS[sid] = s
SESSIONS.move_to_end(sid)
while len(SESSIONS) > SESSIONS_MAX:
SESSIONS.popitem(last=False) # evict least recently used
return s
raise KeyError(sid)
def new_session(workspace=None, model=None, profile=None):
"""Create a new in-memory session and persist it.
*profile* — when supplied by the caller (e.g. from the request body sent
by the active browser tab), it is used directly so that concurrent clients
on different profiles don't fight over a shared process-global. If not
supplied, we fall back to the process-level active profile (the pre-#798
behaviour, preserved for calls that originate outside a request context).
"""
if profile is None:
# Fallback: read process-level global (single-client or startup path)
try:
from api.profiles import get_active_profile_name
profile = get_active_profile_name()
except ImportError:
profile = None
effective_model = model or get_effective_default_model()
s = Session(
workspace=workspace or get_last_workspace(),
model=effective_model,
profile=profile,
)
with LOCK:
SESSIONS[s.session_id] = s
SESSIONS.move_to_end(s.session_id)
while len(SESSIONS) > SESSIONS_MAX:
SESSIONS.popitem(last=False)
s.save()
return s
def all_sessions():
active_stream_ids = _active_stream_ids()
# Phase C: try index first for O(1) read; fall back to full scan
if SESSION_INDEX_FILE.exists():
try:
index = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
index = [
s for s in index
if _index_entry_exists(s.get('session_id'))
]
backfilled = []
for i, s in enumerate(index):
if 'last_message_at' not in s:
full = Session.load(s.get('session_id'))
if full:
index[i] = full.compact()
backfilled.append(full)
if backfilled:
try:
_write_session_index(updates=backfilled)
except Exception:
logger.debug("Failed to persist last_message_at backfill")
for s in index:
s['is_streaming'] = _is_streaming_session(
s.get('active_stream_id'),
active_stream_ids,
)
# Overlay any in-memory sessions that may be newer than the index
index_map = {s['session_id']: s for s in index}
with LOCK:
for s in SESSIONS.values():
index_map[s.session_id] = s.compact(
include_runtime=True,
active_stream_ids=active_stream_ids,
)
result = sorted(index_map.values(), key=lambda s: (s.get('pinned', False), _session_sort_timestamp(s)), reverse=True)
# Hide empty Untitled sessions from the UI (created by tests, page refreshes, etc.)
# Exempt sessions younger than 60 s so a brand-new session stays visible (#789)
_now = time.time()
result = [s for s in result if not (
s.get('title', 'Untitled') == 'Untitled'
and s.get('message_count', 0) == 0
and (_now - s.get('updated_at', _now)) > 60
)]
# Backfill: sessions created before Sprint 22 have no profile tag.
# Attribute them to 'default' so the client profile filter works correctly.
for s in result:
if not s.get('profile'):
s['profile'] = 'default'
return result
except Exception:
logger.debug("Failed to load session index, falling back to full scan")
# Full scan fallback
out = []
for p in SESSION_DIR.glob('*.json'):
if p.name.startswith('_'): continue
try:
s = Session.load(p.stem)
if s: out.append(s)
except Exception:
logger.debug("Failed to load session from %s", p)
for s in SESSIONS.values():
if all(s.session_id != x.session_id for x in out): out.append(s)
out.sort(key=lambda s: (getattr(s, 'pinned', False), _session_sort_timestamp(s)), reverse=True)
_now = time.time()
result = [s.compact(include_runtime=True, active_stream_ids=active_stream_ids) for s in out if not (
s.title == 'Untitled'
and len(s.messages) == 0
and (_now - s.updated_at) > 60
)]
for s in result:
if not s.get('profile'):
s['profile'] = 'default'
return result
def title_from(messages, fallback: str='Untitled'):
"""Derive a session title from the first user message."""
for m in messages:
if m.get('role') == 'user':
c = m.get('content', '')
if isinstance(c, list):
c = ' '.join(p.get('text', '') for p in c if isinstance(p, dict) and p.get('type') == 'text')
text = str(c).strip()
if text:
return text[:64]
return fallback
# ── Project helpers ──────────────────────────────────────────────────────────
def load_projects() -> list:
"""Load project list from disk. Returns list of project dicts."""
if not PROJECTS_FILE.exists():
return []
try:
return json.loads(PROJECTS_FILE.read_text(encoding='utf-8'))
except Exception:
return []
def save_projects(projects) -> None:
"""Write project list to disk."""
PROJECTS_FILE.write_text(json.dumps(projects, ensure_ascii=False, indent=2), encoding='utf-8')
def import_cli_session(
session_id: str,
title: str,
messages,
model: str='unknown',
profile=None,
created_at=None,
updated_at=None,
):
"""Create a new WebUI session populated with CLI messages.
Returns the Session object.
"""
s = Session(
session_id=session_id,
title=title,
workspace=get_last_workspace(),
model=model,
messages=messages,
profile=profile,
created_at=created_at,
updated_at=updated_at,
)
s.save(touch_updated_at=False)
return s
# ── CLI session bridge ──────────────────────────────────────────────────────
def get_cli_sessions() -> list:
"""Read CLI sessions from the agent's SQLite store and return them as
dicts in a format the WebUI sidebar can render alongside local sessions.
Returns empty list if the SQLite DB is missing or any error occurs -- the
bridge is purely additive and never crashes the WebUI.
"""
import os
cli_sessions = []
# Use the active WebUI profile's HERMES_HOME to find state.db.
# The active profile is determined by what the user has selected in the UI
# (stored in the server's runtime config). This means:
# - default profile -> ~/.hermes/state.db
# - named profile X -> ~/.hermes/profiles/X/state.db
# We resolve the active profile's home directory rather than just using
# HERMES_HOME (which is the server's launch profile, not necessarily the
# active one after a profile switch).
try:
from api.profiles import get_active_hermes_home
hermes_home = Path(get_active_hermes_home()).expanduser().resolve()
except Exception:
hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve()
db_path = hermes_home / 'state.db'
if not db_path.exists():
return cli_sessions
# Try to resolve the active CLI profile so imported sessions integrate
# with the WebUI profile filter (available since Sprint 22).
try:
from api.profiles import get_active_profile_name
_cli_profile = get_active_profile_name()
except ImportError:
_cli_profile = None # older agent -- fall back to no profile
try:
for row in read_importable_agent_session_rows(db_path, limit=200, log=logger):
sid = row['id']
raw_ts = row['last_activity'] or row['started_at']
# Prefer the CLI session's own profile from the DB; fall back to
# the active CLI profile so sidebar filtering works either way.
profile = _cli_profile # CLI DB has no profile column; use active profile
_source = row['source'] or 'cli'
_title = row['title']
if not _title and _source == 'cron' and sid.startswith('cron_'):
# Extract job_id from session ID (cron_{job_id}_{timestamp})
# and look up the human-friendly job name from jobs.json
parts = sid.split('_')
if len(parts) >= 3:
_job_id = parts[1]
try:
_jobs_path = hermes_home / 'cron' / 'jobs.json'
if _jobs_path.exists():
import json as _json
_jobs_data = _json.loads(_jobs_path.read_text())
for _j in _jobs_data.get('jobs', []):
if _j.get('id') == _job_id:
_title = _j.get('name') or _title
break
except Exception:
pass # degrade gracefully
_display_title = _title or f'{_source.title()} Session'
cli_sessions.append({
'session_id': sid,
'title': _display_title,
'workspace': str(get_last_workspace()),
'model': row['model'] or None,
'message_count': row['message_count'] or row['actual_message_count'] or 0,
'created_at': row['started_at'],
'updated_at': raw_ts,
'pinned': False,
'archived': False,
'project_id': None,
'profile': profile,
'source_tag': _source,
'is_cli_session': True,
})
except Exception as _cli_err:
# DB schema changed, locked, or corrupted -- log warning so admins can diagnose.
# Still degrade gracefully (don't crash the WebUI).
import logging as _logging
_logging.getLogger(__name__).warning(
"get_cli_sessions() failed — check state.db schema or path (%s): %s",
db_path, _cli_err,
)
return []
return cli_sessions
def get_cli_session_messages(sid) -> list:
"""Read messages for a single CLI session from the SQLite store.
Returns a list of {role, content, timestamp} dicts.
Returns empty list on any error.
"""
import os
try:
import sqlite3
except ImportError:
return []
try:
from api.profiles import get_active_hermes_home
hermes_home = Path(get_active_hermes_home()).expanduser().resolve()
except Exception:
hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve()
db_path = hermes_home / 'state.db'
if not db_path.exists():
return []
try:
with sqlite3.connect(str(db_path)) as conn:
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute("""
SELECT role, content, timestamp
FROM messages
WHERE session_id = ?
ORDER BY timestamp ASC
""", (sid,))
msgs = []
for row in cur.fetchall():
msgs.append({
'role': row['role'],
'content': row['content'],
'timestamp': row['timestamp'],
})
except Exception:
return []
return msgs
def delete_cli_session(sid) -> bool:
"""Delete a CLI session from state.db (messages + session row).
Returns True if deleted, False if not found or error.
"""
import os
try:
import sqlite3
except ImportError:
return False
try:
from api.profiles import get_active_hermes_home
hermes_home = Path(get_active_hermes_home()).expanduser().resolve()
except Exception:
hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve()
db_path = hermes_home / 'state.db'
if not db_path.exists():
return False
try:
with sqlite3.connect(str(db_path)) as conn:
cur = conn.cursor()
cur.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
cur.execute("DELETE FROM sessions WHERE id = ?", (sid,))
conn.commit()
return cur.rowcount > 0
except Exception:
return False