Files
nesquena-hermes 9c69b646ff feat(commands): /background, /btw slash commands + undo button + reasoning chip
Rebased onto master after #931 (aux title routing) to resolve streaming.py conflict.
All changes from both PRs are cleanly integrated.

2088 tests passing (2065 master + 23 from #931).

Co-authored-by: bergeouss <bergeouss@gmail.com>
2026-04-24 01:24:51 +00:00

88 lines
2.9 KiB
Python

"""Background and ephemeral task tracking for /background and /btw commands."""
from __future__ import annotations
import logging
import threading
import time
from typing import Any
logger = logging.getLogger(__name__)
_lock = threading.Lock()
# parent_session_id -> list of task dicts
_BACKGROUND_TASKS: dict[str, list[dict[str, Any]]] = {}
# btw ephemeral session tracking: parent_sid -> {ephemeral_sid, stream_id, question}
_BTW_TRACKING: dict[str, dict[str, Any]] = {}
def track_background(parent_sid: str, bg_sid: str, stream_id: str,
task_id: str, prompt: str) -> None:
with _lock:
_BACKGROUND_TASKS.setdefault(parent_sid, []).append({
"task_id": task_id,
"bg_session_id": bg_sid,
"stream_id": stream_id,
"prompt": prompt,
"status": "running",
"started_at": time.time(),
"answer": None,
"completed_at": None,
})
def track_btw(parent_sid: str, ephemeral_sid: str, stream_id: str,
question: str) -> None:
with _lock:
_BTW_TRACKING[parent_sid] = {
"ephemeral_session_id": ephemeral_sid,
"stream_id": stream_id,
"question": question,
}
def complete_background(parent_sid: str, task_id: str, answer: str) -> None:
with _lock:
for t in _BACKGROUND_TASKS.get(parent_sid, []):
if t["task_id"] == task_id and t["status"] == "running":
t["status"] = "done"
t["answer"] = answer
t["completed_at"] = time.time()
break
def get_results(parent_sid: str) -> list[dict[str, Any]]:
"""Return completed background task results and remove only the done ones
from tracking. Tasks still in ``status="running"`` MUST stay in the list
so that ``complete_background()`` can still find them when the worker
thread finishes — otherwise the first poll during a long-running task
silently drops it and the result is lost forever.
"""
with _lock:
tasks = _BACKGROUND_TASKS.get(parent_sid, [])
done = [t for t in tasks if t["status"] == "done"]
still_running = [t for t in tasks if t["status"] != "done"]
if still_running:
_BACKGROUND_TASKS[parent_sid] = still_running
else:
_BACKGROUND_TASKS.pop(parent_sid, None)
return [{
"task_id": t["task_id"],
"prompt": t["prompt"],
"answer": t["answer"],
"completed_at": t["completed_at"],
} for t in done]
def get_background_tasks(parent_sid: str) -> list[dict[str, Any]]:
"""Return all background tasks (running and done) for a parent session."""
with _lock:
return list(_BACKGROUND_TASKS.get(parent_sid, []))
def cleanup_btw(parent_sid: str) -> dict[str, Any] | None:
"""Remove and return btw tracking for a parent session."""
with _lock:
return _BTW_TRACKING.pop(parent_sid, None)