"""WebUI bridge for Hermes persistent session goals.""" from __future__ import annotations import copy import logging import re import time from pathlib import Path from typing import Any, Dict, Optional logger = logging.getLogger(__name__) try: # Exposed as a module attribute so tests can monkeypatch it directly. from hermes_cli.goals import ( # type: ignore CONTINUATION_PROMPT_TEMPLATE, DEFAULT_MAX_TURNS, GoalManager as _NativeGoalManager, GoalState, judge_goal, ) except Exception: # pragma: no cover - depends on installed hermes-agent CONTINUATION_PROMPT_TEMPLATE = "" # type: ignore DEFAULT_MAX_TURNS = 20 # type: ignore _NativeGoalManager = None # type: ignore GoalState = None # type: ignore judge_goal = None # type: ignore GoalManager = _NativeGoalManager # type: ignore _DB_CACHE: dict[str, Any] = {} def _default_max_turns() -> int: """Return the configured /goal turn budget, defaulting to Hermes' 20 turns.""" try: from api import config as _config cfg = getattr(_config, "cfg", {}) or {} goals_cfg = cfg.get("goals", {}) if isinstance(cfg, dict) else {} if not isinstance(goals_cfg, dict): return int(DEFAULT_MAX_TURNS or 20) return max(1, int(goals_cfg.get("max_turns", DEFAULT_MAX_TURNS or 20) or 20)) except Exception: return int(DEFAULT_MAX_TURNS or 20) def _meta_key(session_id: str) -> str: return f"goal:{session_id}" def _profile_db(profile_home: str | Path): """Return a SessionDB pinned to *profile_home*, without reading HERMES_HOME. The upstream Hermes GoalManager persists through hermes_cli.goals.load_goal(), which resolves SessionDB from process-global HERMES_HOME. WebUI sessions are profile-scoped and can run concurrently, so the WebUI bridge uses an explicit state.db path whenever the caller provides the session's profile home. """ home = Path(profile_home).expanduser().resolve() key = str(home) cached = _DB_CACHE.get(key) if cached is not None: return cached try: from hermes_state import SessionDB # type: ignore db = SessionDB(db_path=home / "state.db") except Exception as exc: # pragma: no cover - import/env dependent logger.debug("GoalManager profile DB unavailable for %s: %s", home, exc) return None _DB_CACHE[key] = db return db class _ProfileGoalManager: """Small WebUI-local GoalManager adapter with explicit profile persistence.""" def __init__(self, session_id: str, *, profile_home: str | Path, default_max_turns: int = 20): if GoalState is None: raise RuntimeError("Hermes goal state unavailable") self.session_id = session_id self.profile_home = Path(profile_home).expanduser().resolve() self.default_max_turns = int(default_max_turns or DEFAULT_MAX_TURNS or 20) self._state = self._load() @property def state(self): return self._state def _load(self): db = _profile_db(self.profile_home) if db is None or not self.session_id: return None try: raw = db.get_meta(_meta_key(self.session_id)) except Exception as exc: logger.debug("GoalManager profile get_meta failed: %s", exc) return None if not raw: return None try: return GoalState.from_json(raw) # type: ignore[union-attr] except Exception as exc: logger.warning("GoalManager profile state parse failed for %s: %s", self.session_id, exc) return None def _save(self, state) -> None: db = _profile_db(self.profile_home) if db is None or not self.session_id or state is None: return try: db.set_meta(_meta_key(self.session_id), state.to_json()) except Exception as exc: logger.debug("GoalManager profile set_meta failed: %s", exc) def is_active(self) -> bool: return self._state is not None and self._state.status == "active" def has_goal(self) -> bool: return self._state is not None and self._state.status in ("active", "paused") def status_line(self) -> str: s = self._state if s is None or s.status in ("cleared",): return "No active goal. Set one with /goal ." turns = f"{s.turns_used}/{s.max_turns} turns" if s.status == "active": return f"⊙ Goal (active, {turns}): {s.goal}" if s.status == "paused": extra = f" — {s.paused_reason}" if s.paused_reason else "" return f"⏸ Goal (paused, {turns}{extra}): {s.goal}" if s.status == "done": return f"✓ Goal done ({turns}): {s.goal}" return f"Goal ({s.status}, {turns}): {s.goal}" def set(self, goal: str, *, max_turns: Optional[int] = None): goal = (goal or "").strip() if not goal: raise ValueError("goal text is empty") state = GoalState( # type: ignore[operator] goal=goal, status="active", turns_used=0, max_turns=int(max_turns) if max_turns else self.default_max_turns, created_at=time.time(), last_turn_at=0.0, ) self._state = state self._save(state) return state def pause(self, reason: str = "user-paused"): if not self._state: return None self._state.status = "paused" self._state.paused_reason = reason self._save(self._state) return self._state def resume(self, *, reset_budget: bool = True): if not self._state: return None self._state.status = "active" self._state.paused_reason = None if reset_budget: self._state.turns_used = 0 self._save(self._state) return self._state def clear(self) -> None: if self._state is None: return self._state.status = "cleared" self._save(self._state) self._state = None def evaluate_after_turn(self, last_response: str, *, user_initiated: bool = True) -> Dict[str, Any]: state = self._state if state is None or state.status != "active": return { "status": state.status if state else None, "should_continue": False, "continuation_prompt": None, "verdict": "inactive", "reason": "no active goal", "message": "", } state.turns_used += 1 state.last_turn_at = time.time() if judge_goal is None: verdict, reason = "continue", "goal judge unavailable" else: verdict, reason = judge_goal(state.goal, str(last_response or "")) state.last_verdict = verdict state.last_reason = reason if verdict == "done": state.status = "done" self._save(state) return { "status": "done", "should_continue": False, "continuation_prompt": None, "verdict": "done", "reason": reason, "message": f"✓ Goal achieved: {reason}", } if state.turns_used >= state.max_turns: state.status = "paused" state.paused_reason = f"turn budget exhausted ({state.turns_used}/{state.max_turns})" self._save(state) return { "status": "paused", "should_continue": False, "continuation_prompt": None, "verdict": "continue", "reason": reason, "message": ( f"⏸ Goal paused — {state.turns_used}/{state.max_turns} turns used. " "Use /goal resume to keep going, or /goal clear to stop." ), } self._save(state) return { "status": "active", "should_continue": True, "continuation_prompt": self.next_continuation_prompt(), "verdict": "continue", "reason": reason, "message": f"↻ Continuing toward goal ({state.turns_used}/{state.max_turns}): {reason}", } def next_continuation_prompt(self) -> Optional[str]: if not self._state or self._state.status != "active": return None return CONTINUATION_PROMPT_TEMPLATE.format(goal=self._state.goal) def _manager(session_id: str, *, profile_home: str | Path | None = None): if GoalManager is None: return None if profile_home and GoalManager is _NativeGoalManager and GoalState is not None: try: return _ProfileGoalManager( session_id=session_id, profile_home=profile_home, default_max_turns=_default_max_turns(), ) except Exception as exc: logger.debug("Profile-scoped GoalManager unavailable: %s", exc) return None return GoalManager(session_id=session_id, default_max_turns=_default_max_turns()) def _state_payload(state: Any) -> Optional[Dict[str, Any]]: if state is None: return None return { "goal": getattr(state, "goal", "") or "", "status": getattr(state, "status", "") or "", "turns_used": int(getattr(state, "turns_used", 0) or 0), "max_turns": int(getattr(state, "max_turns", 0) or 0), "last_verdict": getattr(state, "last_verdict", None), "last_reason": getattr(state, "last_reason", None), "paused_reason": getattr(state, "paused_reason", None), } def _payload( *, ok: bool = True, action: str, message: str, state: Any = None, error: str | None = None, kickoff_prompt: str | None = None, decision: Dict[str, Any] | None = None, message_key: str | None = None, message_args: list[Any] | None = None, ) -> Dict[str, Any]: body: Dict[str, Any] = { "ok": bool(ok), "action": action, "message": message, "goal": _state_payload(state), } if error: body["error"] = error if kickoff_prompt: body["kickoff_prompt"] = kickoff_prompt if decision is not None: body["decision"] = decision if message_key: body["message_key"] = message_key if message_args is not None: body["message_args"] = [a for a in message_args if a is not None] return body def _goal_status_payload(state: Any, *, default_message: str | None = None) -> Dict[str, Any]: """Build localized-status style payload fields from a goal state.""" if default_message is None: default_message = "No active goal. Set one with /goal ." if state is None: return {"message": default_message, "message_key": "goal_status_none"} status = str(getattr(state, "status", "") or "").strip() if status in ("cleared",): return {"message": default_message, "message_key": "goal_status_none"} turns_used = int(getattr(state, "turns_used", 0) or 0) max_turns = int(getattr(state, "max_turns", 0) or 0) goal = str(getattr(state, "goal", "") or "") if status == "active": return { "message": f"⊙ Goal (active, {turns_used}/{max_turns} turns): {goal}", "message_key": "goal_status_active", "message_args": [turns_used, max_turns, goal], } if status == "paused": reason = str(getattr(state, "paused_reason", "") or "") return { "message": f"⏸ Goal (paused, {turns_used}/{max_turns}{' — ' + reason if reason else ''}): {goal}", "message_key": "goal_status_paused", "message_args": [turns_used, max_turns, reason, goal], } if status == "done": return { "message": f"✓ Goal done ({turns_used}/{max_turns}): {goal}", "message_key": "goal_status_done", "message_args": [turns_used, max_turns, goal], } return { "message": f"Goal ({status}, {turns_used}/{max_turns}): {goal}", "message_args": [status, turns_used, max_turns, goal], } def _extract_goal_turns_from_message(message: str) -> tuple[int, int]: """Best-effort extraction for continuation messages like '(1/20)'.""" if not message: return 0, 0 match = re.search(r"\((\d+)\s*/\s*(\d+)\)", message) if not match: return 0, 0 try: return int(match.group(1)), int(match.group(2)) except Exception: return 0, 0 def _goal_decision_payload( decision: Dict[str, Any], state: Any, ) -> Dict[str, Any]: """Attach goal message i18n key/args to an evaluation decision.""" if not isinstance(decision, dict): return decision status = str(decision.get("status") or "").strip() reason = str(decision.get("reason") or "").strip() turns_used = int(getattr(state, "turns_used", 0) or 0) max_turns = int(getattr(state, "max_turns", 0) or 0) if (turns_used, max_turns) == (0, 0): turns_used, max_turns = _extract_goal_turns_from_message(str(decision.get("message") or "")) if status == "done": return { **decision, "message_key": "goal_achieved", "message_args": [reason], } if status == "paused": return { **decision, "message_key": "goal_paused_budget_exhausted", "message_args": [turns_used, max_turns], } if decision.get("should_continue"): return { **decision, "message_key": "goal_continuing", "message_args": [turns_used, max_turns, reason], } return decision def goal_state_snapshot(session_id: str, *, profile_home: str | Path | None = None) -> Any: """Return a deep copy of current goal state for rollback before kickoff.""" mgr = _manager(str(session_id or ""), profile_home=profile_home) if mgr is None: return None return copy.deepcopy(getattr(mgr, "state", None)) def restore_goal_state(session_id: str, snapshot: Any, *, profile_home: str | Path | None = None) -> None: """Restore a prior goal state after kickoff stream creation fails.""" mgr = _manager(str(session_id or ""), profile_home=profile_home) if mgr is None: return if snapshot is None: try: mgr.clear() except Exception: pass return if isinstance(mgr, _ProfileGoalManager): mgr._state = snapshot mgr._save(snapshot) return try: from hermes_cli.goals import save_goal # type: ignore save_goal(str(session_id or ""), snapshot) except Exception as exc: # pragma: no cover - native fallback only logger.debug("Goal state restore failed for %s: %s", session_id, exc) def goal_command_payload( session_id: str, args: str = "", *, stream_running: bool = False, profile_home: str | Path | None = None, ) -> Dict[str, Any]: """Return the WebUI response payload for a /goal command. Mirrors the gateway command semantics: - /goal or /goal status shows status - /goal pause pauses - /goal resume resumes without auto-starting a turn - /goal clear|stop|done clears - /goal sets a new active goal and returns kickoff_prompt so the caller can start the first normal user-role turn immediately. """ sid = str(session_id or "").strip() if not sid: return _payload(ok=False, action="error", error="missing_session", message="session_id required") mgr = _manager(sid, profile_home=profile_home) if mgr is None: return _payload(ok=False, action="error", error="unavailable", message="Goals unavailable on this session.") text = str(args or "").strip() lower = text.lower() if not text or lower == "status": state = getattr(mgr, "state", None) status_payload = _goal_status_payload(state) return _payload(action="status", state=state, **status_payload) if lower == "pause": state = mgr.pause(reason="user-paused") if state is None: return _payload( ok=False, action="pause", error="no_goal", message="No goal set.", message_key="goal_no_goal", ) return _payload( action="pause", message=f"⏸ Goal paused: {state.goal}", message_key="goal_paused", message_args=[str(state.goal)], state=state, ) if lower == "resume": state = mgr.resume() if state is None: return _payload( ok=False, action="resume", error="no_goal", message="No goal to resume.", message_key="goal_no_goal", ) return _payload( action="resume", message=( f"▶ Goal resumed: {state.goal}\n" "Send a new message, or type continue, to kick it off." ), message_key="goal_resumed", message_args=[str(state.goal)], state=state, ) if lower in ("clear", "stop", "done"): had = bool(mgr.has_goal()) mgr.clear() return _payload( action="clear", message="Goal cleared." if had else "No active goal.", message_key="goal_cleared" if had else "goal_no_goal", state=getattr(mgr, "state", None), ) if stream_running: return _payload( ok=False, action="set", error="agent_running", message=( "Agent is running — use /goal status / pause / clear mid-run, " "or /stop before setting a new goal." ), ) try: state = mgr.set(text) except ValueError as exc: return _payload(ok=False, action="set", error="invalid_goal", message=f"Invalid goal: {exc}") return _payload( action="set", message=( f"⊙ Goal set ({state.max_turns}-turn budget): {state.goal}\n" "I'll keep working until the goal is done, you pause/clear it, or the budget is exhausted.\n" "Controls: /goal status · /goal pause · /goal resume · /goal clear" ), message_key="goal_set", message_args=[state.max_turns, state.goal], state=state, kickoff_prompt=state.goal, ) def has_active_goal( session_id: str, *, profile_home: str | Path | None = None, ) -> bool: """Return True when the session has an active standing goal to evaluate.""" sid = str(session_id or "").strip() if not sid: return False mgr = _manager(sid, profile_home=profile_home) if mgr is None: return False try: return bool(mgr.is_active()) except Exception as exc: logger.debug("goal active-state check failed for session=%s: %s", sid, exc) return False def evaluate_goal_after_turn( session_id: str, last_response: str, *, user_initiated: bool = True, profile_home: str | Path | None = None, ) -> Dict[str, Any]: """Evaluate a completed turn against the standing goal, if any.""" sid = str(session_id or "").strip() if not sid: return { "status": None, "should_continue": False, "continuation_prompt": None, "verdict": "inactive", "reason": "missing session_id", "message": "", } mgr = _manager(sid, profile_home=profile_home) if mgr is None: return { "status": None, "should_continue": False, "continuation_prompt": None, "verdict": "inactive", "reason": "goals unavailable", "message": "", } try: if not mgr.is_active(): return { "status": getattr(getattr(mgr, "state", None), "status", None), "should_continue": False, "continuation_prompt": None, "verdict": "inactive", "reason": "no active goal", "message": "", } decision = mgr.evaluate_after_turn(str(last_response or ""), user_initiated=user_initiated) except Exception as exc: logger.debug("goal evaluation failed for session=%s: %s", sid, exc) return { "status": None, "should_continue": False, "continuation_prompt": None, "verdict": "error", "reason": f"goal evaluation failed: {type(exc).__name__}", "message": "", } if not isinstance(decision, dict): decision = {} decision.setdefault("should_continue", False) decision.setdefault("continuation_prompt", None) decision.setdefault("message", "") decision = dict(decision) decision = _goal_decision_payload(decision, getattr(mgr, "state", None)) return decision