mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-22 10:20:14 +00:00
609 lines
21 KiB
Python
609 lines
21 KiB
Python
"""WebUI bridge for Hermes persistent session goals."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import copy
|
|
import logging
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try: # Exposed as a module attribute so tests can monkeypatch it directly.
|
|
from hermes_cli.goals import ( # type: ignore
|
|
CONTINUATION_PROMPT_TEMPLATE,
|
|
DEFAULT_MAX_TURNS,
|
|
GoalManager as _NativeGoalManager,
|
|
GoalState,
|
|
judge_goal,
|
|
)
|
|
except Exception: # pragma: no cover - depends on installed hermes-agent
|
|
CONTINUATION_PROMPT_TEMPLATE = "" # type: ignore
|
|
DEFAULT_MAX_TURNS = 20 # type: ignore
|
|
_NativeGoalManager = None # type: ignore
|
|
GoalState = None # type: ignore
|
|
judge_goal = None # type: ignore
|
|
|
|
GoalManager = _NativeGoalManager # type: ignore
|
|
|
|
_DB_CACHE: dict[str, Any] = {}
|
|
|
|
|
|
def _default_max_turns() -> int:
|
|
"""Return the configured /goal turn budget, defaulting to Hermes' 20 turns."""
|
|
try:
|
|
from api import config as _config
|
|
|
|
cfg = getattr(_config, "cfg", {}) or {}
|
|
goals_cfg = cfg.get("goals", {}) if isinstance(cfg, dict) else {}
|
|
if not isinstance(goals_cfg, dict):
|
|
return int(DEFAULT_MAX_TURNS or 20)
|
|
return max(1, int(goals_cfg.get("max_turns", DEFAULT_MAX_TURNS or 20) or 20))
|
|
except Exception:
|
|
return int(DEFAULT_MAX_TURNS or 20)
|
|
|
|
|
|
def _meta_key(session_id: str) -> str:
|
|
return f"goal:{session_id}"
|
|
|
|
|
|
def _profile_db(profile_home: str | Path):
|
|
"""Return a SessionDB pinned to *profile_home*, without reading HERMES_HOME.
|
|
|
|
The upstream Hermes GoalManager persists through hermes_cli.goals.load_goal(),
|
|
which resolves SessionDB from process-global HERMES_HOME. WebUI sessions are
|
|
profile-scoped and can run concurrently, so the WebUI bridge uses an explicit
|
|
state.db path whenever the caller provides the session's profile home.
|
|
"""
|
|
home = Path(profile_home).expanduser().resolve()
|
|
key = str(home)
|
|
cached = _DB_CACHE.get(key)
|
|
if cached is not None:
|
|
return cached
|
|
try:
|
|
from hermes_state import SessionDB # type: ignore
|
|
|
|
db = SessionDB(db_path=home / "state.db")
|
|
except Exception as exc: # pragma: no cover - import/env dependent
|
|
logger.debug("GoalManager profile DB unavailable for %s: %s", home, exc)
|
|
return None
|
|
_DB_CACHE[key] = db
|
|
return db
|
|
|
|
|
|
class _ProfileGoalManager:
|
|
"""Small WebUI-local GoalManager adapter with explicit profile persistence."""
|
|
|
|
def __init__(self, session_id: str, *, profile_home: str | Path, default_max_turns: int = 20):
|
|
if GoalState is None:
|
|
raise RuntimeError("Hermes goal state unavailable")
|
|
self.session_id = session_id
|
|
self.profile_home = Path(profile_home).expanduser().resolve()
|
|
self.default_max_turns = int(default_max_turns or DEFAULT_MAX_TURNS or 20)
|
|
self._state = self._load()
|
|
|
|
@property
|
|
def state(self):
|
|
return self._state
|
|
|
|
def _load(self):
|
|
db = _profile_db(self.profile_home)
|
|
if db is None or not self.session_id:
|
|
return None
|
|
try:
|
|
raw = db.get_meta(_meta_key(self.session_id))
|
|
except Exception as exc:
|
|
logger.debug("GoalManager profile get_meta failed: %s", exc)
|
|
return None
|
|
if not raw:
|
|
return None
|
|
try:
|
|
return GoalState.from_json(raw) # type: ignore[union-attr]
|
|
except Exception as exc:
|
|
logger.warning("GoalManager profile state parse failed for %s: %s", self.session_id, exc)
|
|
return None
|
|
|
|
def _save(self, state) -> None:
|
|
db = _profile_db(self.profile_home)
|
|
if db is None or not self.session_id or state is None:
|
|
return
|
|
try:
|
|
db.set_meta(_meta_key(self.session_id), state.to_json())
|
|
except Exception as exc:
|
|
logger.debug("GoalManager profile set_meta failed: %s", exc)
|
|
|
|
def is_active(self) -> bool:
|
|
return self._state is not None and self._state.status == "active"
|
|
|
|
def has_goal(self) -> bool:
|
|
return self._state is not None and self._state.status in ("active", "paused")
|
|
|
|
def status_line(self) -> str:
|
|
s = self._state
|
|
if s is None or s.status in ("cleared",):
|
|
return "No active goal. Set one with /goal <text>."
|
|
turns = f"{s.turns_used}/{s.max_turns} turns"
|
|
if s.status == "active":
|
|
return f"⊙ Goal (active, {turns}): {s.goal}"
|
|
if s.status == "paused":
|
|
extra = f" — {s.paused_reason}" if s.paused_reason else ""
|
|
return f"⏸ Goal (paused, {turns}{extra}): {s.goal}"
|
|
if s.status == "done":
|
|
return f"✓ Goal done ({turns}): {s.goal}"
|
|
return f"Goal ({s.status}, {turns}): {s.goal}"
|
|
|
|
def set(self, goal: str, *, max_turns: Optional[int] = None):
|
|
goal = (goal or "").strip()
|
|
if not goal:
|
|
raise ValueError("goal text is empty")
|
|
state = GoalState( # type: ignore[operator]
|
|
goal=goal,
|
|
status="active",
|
|
turns_used=0,
|
|
max_turns=int(max_turns) if max_turns else self.default_max_turns,
|
|
created_at=time.time(),
|
|
last_turn_at=0.0,
|
|
)
|
|
self._state = state
|
|
self._save(state)
|
|
return state
|
|
|
|
def pause(self, reason: str = "user-paused"):
|
|
if not self._state:
|
|
return None
|
|
self._state.status = "paused"
|
|
self._state.paused_reason = reason
|
|
self._save(self._state)
|
|
return self._state
|
|
|
|
def resume(self, *, reset_budget: bool = True):
|
|
if not self._state:
|
|
return None
|
|
self._state.status = "active"
|
|
self._state.paused_reason = None
|
|
if reset_budget:
|
|
self._state.turns_used = 0
|
|
self._save(self._state)
|
|
return self._state
|
|
|
|
def clear(self) -> None:
|
|
if self._state is None:
|
|
return
|
|
self._state.status = "cleared"
|
|
self._save(self._state)
|
|
self._state = None
|
|
|
|
def evaluate_after_turn(self, last_response: str, *, user_initiated: bool = True) -> Dict[str, Any]:
|
|
state = self._state
|
|
if state is None or state.status != "active":
|
|
return {
|
|
"status": state.status if state else None,
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "inactive",
|
|
"reason": "no active goal",
|
|
"message": "",
|
|
}
|
|
|
|
state.turns_used += 1
|
|
state.last_turn_at = time.time()
|
|
|
|
if judge_goal is None:
|
|
verdict, reason = "continue", "goal judge unavailable"
|
|
else:
|
|
verdict, reason = judge_goal(state.goal, str(last_response or ""))
|
|
state.last_verdict = verdict
|
|
state.last_reason = reason
|
|
|
|
if verdict == "done":
|
|
state.status = "done"
|
|
self._save(state)
|
|
return {
|
|
"status": "done",
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "done",
|
|
"reason": reason,
|
|
"message": f"✓ Goal achieved: {reason}",
|
|
}
|
|
|
|
if state.turns_used >= state.max_turns:
|
|
state.status = "paused"
|
|
state.paused_reason = f"turn budget exhausted ({state.turns_used}/{state.max_turns})"
|
|
self._save(state)
|
|
return {
|
|
"status": "paused",
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "continue",
|
|
"reason": reason,
|
|
"message": (
|
|
f"⏸ Goal paused — {state.turns_used}/{state.max_turns} turns used. "
|
|
"Use /goal resume to keep going, or /goal clear to stop."
|
|
),
|
|
}
|
|
|
|
self._save(state)
|
|
return {
|
|
"status": "active",
|
|
"should_continue": True,
|
|
"continuation_prompt": self.next_continuation_prompt(),
|
|
"verdict": "continue",
|
|
"reason": reason,
|
|
"message": f"↻ Continuing toward goal ({state.turns_used}/{state.max_turns}): {reason}",
|
|
}
|
|
|
|
def next_continuation_prompt(self) -> Optional[str]:
|
|
if not self._state or self._state.status != "active":
|
|
return None
|
|
return CONTINUATION_PROMPT_TEMPLATE.format(goal=self._state.goal)
|
|
|
|
|
|
def _manager(session_id: str, *, profile_home: str | Path | None = None):
|
|
if GoalManager is None:
|
|
return None
|
|
if profile_home and GoalManager is _NativeGoalManager and GoalState is not None:
|
|
try:
|
|
return _ProfileGoalManager(
|
|
session_id=session_id,
|
|
profile_home=profile_home,
|
|
default_max_turns=_default_max_turns(),
|
|
)
|
|
except Exception as exc:
|
|
logger.debug("Profile-scoped GoalManager unavailable: %s", exc)
|
|
return None
|
|
return GoalManager(session_id=session_id, default_max_turns=_default_max_turns())
|
|
|
|
|
|
def _state_payload(state: Any) -> Optional[Dict[str, Any]]:
|
|
if state is None:
|
|
return None
|
|
return {
|
|
"goal": getattr(state, "goal", "") or "",
|
|
"status": getattr(state, "status", "") or "",
|
|
"turns_used": int(getattr(state, "turns_used", 0) or 0),
|
|
"max_turns": int(getattr(state, "max_turns", 0) or 0),
|
|
"last_verdict": getattr(state, "last_verdict", None),
|
|
"last_reason": getattr(state, "last_reason", None),
|
|
"paused_reason": getattr(state, "paused_reason", None),
|
|
}
|
|
|
|
|
|
def _payload(
|
|
*,
|
|
ok: bool = True,
|
|
action: str,
|
|
message: str,
|
|
state: Any = None,
|
|
error: str | None = None,
|
|
kickoff_prompt: str | None = None,
|
|
decision: Dict[str, Any] | None = None,
|
|
message_key: str | None = None,
|
|
message_args: list[Any] | None = None,
|
|
) -> Dict[str, Any]:
|
|
body: Dict[str, Any] = {
|
|
"ok": bool(ok),
|
|
"action": action,
|
|
"message": message,
|
|
"goal": _state_payload(state),
|
|
}
|
|
if error:
|
|
body["error"] = error
|
|
if kickoff_prompt:
|
|
body["kickoff_prompt"] = kickoff_prompt
|
|
if decision is not None:
|
|
body["decision"] = decision
|
|
if message_key:
|
|
body["message_key"] = message_key
|
|
if message_args is not None:
|
|
body["message_args"] = [a for a in message_args if a is not None]
|
|
return body
|
|
|
|
|
|
def _goal_status_payload(state: Any, *, default_message: str | None = None) -> Dict[str, Any]:
|
|
"""Build localized-status style payload fields from a goal state."""
|
|
if default_message is None:
|
|
default_message = "No active goal. Set one with /goal <text>."
|
|
if state is None:
|
|
return {"message": default_message, "message_key": "goal_status_none"}
|
|
status = str(getattr(state, "status", "") or "").strip()
|
|
if status in ("cleared",):
|
|
return {"message": default_message, "message_key": "goal_status_none"}
|
|
turns_used = int(getattr(state, "turns_used", 0) or 0)
|
|
max_turns = int(getattr(state, "max_turns", 0) or 0)
|
|
goal = str(getattr(state, "goal", "") or "")
|
|
if status == "active":
|
|
return {
|
|
"message": f"⊙ Goal (active, {turns_used}/{max_turns} turns): {goal}",
|
|
"message_key": "goal_status_active",
|
|
"message_args": [turns_used, max_turns, goal],
|
|
}
|
|
if status == "paused":
|
|
reason = str(getattr(state, "paused_reason", "") or "")
|
|
return {
|
|
"message": f"⏸ Goal (paused, {turns_used}/{max_turns}{' — ' + reason if reason else ''}): {goal}",
|
|
"message_key": "goal_status_paused",
|
|
"message_args": [turns_used, max_turns, reason, goal],
|
|
}
|
|
if status == "done":
|
|
return {
|
|
"message": f"✓ Goal done ({turns_used}/{max_turns}): {goal}",
|
|
"message_key": "goal_status_done",
|
|
"message_args": [turns_used, max_turns, goal],
|
|
}
|
|
return {
|
|
"message": f"Goal ({status}, {turns_used}/{max_turns}): {goal}",
|
|
"message_args": [status, turns_used, max_turns, goal],
|
|
}
|
|
|
|
|
|
def _extract_goal_turns_from_message(message: str) -> tuple[int, int]:
|
|
"""Best-effort extraction for continuation messages like '(1/20)'."""
|
|
if not message:
|
|
return 0, 0
|
|
match = re.search(r"\((\d+)\s*/\s*(\d+)\)", message)
|
|
if not match:
|
|
return 0, 0
|
|
try:
|
|
return int(match.group(1)), int(match.group(2))
|
|
except Exception:
|
|
return 0, 0
|
|
|
|
|
|
def _goal_decision_payload(
|
|
decision: Dict[str, Any],
|
|
state: Any,
|
|
) -> Dict[str, Any]:
|
|
"""Attach goal message i18n key/args to an evaluation decision."""
|
|
if not isinstance(decision, dict):
|
|
return decision
|
|
status = str(decision.get("status") or "").strip()
|
|
reason = str(decision.get("reason") or "").strip()
|
|
turns_used = int(getattr(state, "turns_used", 0) or 0)
|
|
max_turns = int(getattr(state, "max_turns", 0) or 0)
|
|
if (turns_used, max_turns) == (0, 0):
|
|
turns_used, max_turns = _extract_goal_turns_from_message(str(decision.get("message") or ""))
|
|
|
|
if status == "done":
|
|
return {
|
|
**decision,
|
|
"message_key": "goal_achieved",
|
|
"message_args": [reason],
|
|
}
|
|
if status == "paused":
|
|
return {
|
|
**decision,
|
|
"message_key": "goal_paused_budget_exhausted",
|
|
"message_args": [turns_used, max_turns],
|
|
}
|
|
if decision.get("should_continue"):
|
|
return {
|
|
**decision,
|
|
"message_key": "goal_continuing",
|
|
"message_args": [turns_used, max_turns, reason],
|
|
}
|
|
return decision
|
|
|
|
|
|
def goal_state_snapshot(session_id: str, *, profile_home: str | Path | None = None) -> Any:
|
|
"""Return a deep copy of current goal state for rollback before kickoff."""
|
|
mgr = _manager(str(session_id or ""), profile_home=profile_home)
|
|
if mgr is None:
|
|
return None
|
|
return copy.deepcopy(getattr(mgr, "state", None))
|
|
|
|
|
|
def restore_goal_state(session_id: str, snapshot: Any, *, profile_home: str | Path | None = None) -> None:
|
|
"""Restore a prior goal state after kickoff stream creation fails."""
|
|
mgr = _manager(str(session_id or ""), profile_home=profile_home)
|
|
if mgr is None:
|
|
return
|
|
if snapshot is None:
|
|
try:
|
|
mgr.clear()
|
|
except Exception:
|
|
pass
|
|
return
|
|
if isinstance(mgr, _ProfileGoalManager):
|
|
mgr._state = snapshot
|
|
mgr._save(snapshot)
|
|
return
|
|
try:
|
|
from hermes_cli.goals import save_goal # type: ignore
|
|
|
|
save_goal(str(session_id or ""), snapshot)
|
|
except Exception as exc: # pragma: no cover - native fallback only
|
|
logger.debug("Goal state restore failed for %s: %s", session_id, exc)
|
|
|
|
|
|
def goal_command_payload(
|
|
session_id: str,
|
|
args: str = "",
|
|
*,
|
|
stream_running: bool = False,
|
|
profile_home: str | Path | None = None,
|
|
) -> Dict[str, Any]:
|
|
"""Return the WebUI response payload for a /goal command.
|
|
|
|
Mirrors the gateway command semantics:
|
|
- /goal or /goal status shows status
|
|
- /goal pause pauses
|
|
- /goal resume resumes without auto-starting a turn
|
|
- /goal clear|stop|done clears
|
|
- /goal <text> sets a new active goal and returns kickoff_prompt so the
|
|
caller can start the first normal user-role turn immediately.
|
|
"""
|
|
sid = str(session_id or "").strip()
|
|
if not sid:
|
|
return _payload(ok=False, action="error", error="missing_session", message="session_id required")
|
|
|
|
mgr = _manager(sid, profile_home=profile_home)
|
|
if mgr is None:
|
|
return _payload(ok=False, action="error", error="unavailable", message="Goals unavailable on this session.")
|
|
|
|
text = str(args or "").strip()
|
|
lower = text.lower()
|
|
|
|
if not text or lower == "status":
|
|
state = getattr(mgr, "state", None)
|
|
status_payload = _goal_status_payload(state)
|
|
return _payload(action="status", state=state, **status_payload)
|
|
|
|
if lower == "pause":
|
|
state = mgr.pause(reason="user-paused")
|
|
if state is None:
|
|
return _payload(
|
|
ok=False,
|
|
action="pause",
|
|
error="no_goal",
|
|
message="No goal set.",
|
|
message_key="goal_no_goal",
|
|
)
|
|
return _payload(
|
|
action="pause",
|
|
message=f"⏸ Goal paused: {state.goal}",
|
|
message_key="goal_paused",
|
|
message_args=[str(state.goal)],
|
|
state=state,
|
|
)
|
|
|
|
if lower == "resume":
|
|
state = mgr.resume()
|
|
if state is None:
|
|
return _payload(
|
|
ok=False,
|
|
action="resume",
|
|
error="no_goal",
|
|
message="No goal to resume.",
|
|
message_key="goal_no_goal",
|
|
)
|
|
return _payload(
|
|
action="resume",
|
|
message=(
|
|
f"▶ Goal resumed: {state.goal}\n"
|
|
"Send a new message, or type continue, to kick it off."
|
|
),
|
|
message_key="goal_resumed",
|
|
message_args=[str(state.goal)],
|
|
state=state,
|
|
)
|
|
|
|
if lower in ("clear", "stop", "done"):
|
|
had = bool(mgr.has_goal())
|
|
mgr.clear()
|
|
return _payload(
|
|
action="clear",
|
|
message="Goal cleared." if had else "No active goal.",
|
|
message_key="goal_cleared" if had else "goal_no_goal",
|
|
state=getattr(mgr, "state", None),
|
|
)
|
|
|
|
if stream_running:
|
|
return _payload(
|
|
ok=False,
|
|
action="set",
|
|
error="agent_running",
|
|
message=(
|
|
"Agent is running — use /goal status / pause / clear mid-run, "
|
|
"or /stop before setting a new goal."
|
|
),
|
|
)
|
|
|
|
try:
|
|
state = mgr.set(text)
|
|
except ValueError as exc:
|
|
return _payload(ok=False, action="set", error="invalid_goal", message=f"Invalid goal: {exc}")
|
|
|
|
return _payload(
|
|
action="set",
|
|
message=(
|
|
f"⊙ Goal set ({state.max_turns}-turn budget): {state.goal}\n"
|
|
"I'll keep working until the goal is done, you pause/clear it, or the budget is exhausted.\n"
|
|
"Controls: /goal status · /goal pause · /goal resume · /goal clear"
|
|
),
|
|
message_key="goal_set",
|
|
message_args=[state.max_turns, state.goal],
|
|
state=state,
|
|
kickoff_prompt=state.goal,
|
|
)
|
|
|
|
|
|
def has_active_goal(
|
|
session_id: str,
|
|
*,
|
|
profile_home: str | Path | None = None,
|
|
) -> bool:
|
|
"""Return True when the session has an active standing goal to evaluate."""
|
|
sid = str(session_id or "").strip()
|
|
if not sid:
|
|
return False
|
|
mgr = _manager(sid, profile_home=profile_home)
|
|
if mgr is None:
|
|
return False
|
|
try:
|
|
return bool(mgr.is_active())
|
|
except Exception as exc:
|
|
logger.debug("goal active-state check failed for session=%s: %s", sid, exc)
|
|
return False
|
|
|
|
|
|
def evaluate_goal_after_turn(
|
|
session_id: str,
|
|
last_response: str,
|
|
*,
|
|
user_initiated: bool = True,
|
|
profile_home: str | Path | None = None,
|
|
) -> Dict[str, Any]:
|
|
"""Evaluate a completed turn against the standing goal, if any."""
|
|
sid = str(session_id or "").strip()
|
|
if not sid:
|
|
return {
|
|
"status": None,
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "inactive",
|
|
"reason": "missing session_id",
|
|
"message": "",
|
|
}
|
|
mgr = _manager(sid, profile_home=profile_home)
|
|
if mgr is None:
|
|
return {
|
|
"status": None,
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "inactive",
|
|
"reason": "goals unavailable",
|
|
"message": "",
|
|
}
|
|
try:
|
|
if not mgr.is_active():
|
|
return {
|
|
"status": getattr(getattr(mgr, "state", None), "status", None),
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "inactive",
|
|
"reason": "no active goal",
|
|
"message": "",
|
|
}
|
|
decision = mgr.evaluate_after_turn(str(last_response or ""), user_initiated=user_initiated)
|
|
except Exception as exc:
|
|
logger.debug("goal evaluation failed for session=%s: %s", sid, exc)
|
|
return {
|
|
"status": None,
|
|
"should_continue": False,
|
|
"continuation_prompt": None,
|
|
"verdict": "error",
|
|
"reason": f"goal evaluation failed: {type(exc).__name__}",
|
|
"message": "",
|
|
}
|
|
if not isinstance(decision, dict):
|
|
decision = {}
|
|
decision.setdefault("should_continue", False)
|
|
decision.setdefault("continuation_prompt", None)
|
|
decision.setdefault("message", "")
|
|
decision = dict(decision)
|
|
decision = _goal_decision_payload(decision, getattr(mgr, "state", None))
|
|
return decision
|