Stage 288: PR #1571 — cron profile isolation (closes #1573) by @kowenhaoai — APPROVED + reviewer fix + post-review tightening

This commit is contained in:
Hermes Bot
2026-05-03 22:37:43 +00:00
3 changed files with 424 additions and 15 deletions
+129
View File
@@ -139,6 +139,135 @@ def get_active_hermes_home() -> Path:
# ── Cron-call profile isolation (issue: Scheduled jobs ignored active profile) ─
# `cron.jobs` reads HERMES_HOME from os.environ (process-global) at function-
# call time. That bypasses our per-request thread-local profile, so the
# `/api/crons*` endpoints always returned the process-default profile's jobs.
# This context manager swaps HERMES_HOME (and the cached module-level constants
# in cron.jobs) for the duration of a cron call, serialized by a lock so
# concurrent requests from different profiles don't race on the global env var.
#
# Thread-safety note on os.environ mutation:
# CPython's os.environ assignment is GIL-protected at the bytecode level, but
# multi-step read-modify-write sequences (snapshot prev → assign new → restore
# on exit) are NOT atomic without explicit serialization. The _cron_env_lock
# below makes the entire context-manager body run-to-completion serially, so
# all webui access to HERMES_HOME goes through one thread at a time. Any
# subprocess.Popen() call inside `run_job` inherits the env at fork time,
# which is also under the lock — so child processes always see a consistent
# (own-profile) HERMES_HOME, never a half-swapped state.
_cron_env_lock = threading.Lock()
class cron_profile_context_for_home:
"""Context manager that pins HERMES_HOME to an explicit profile home path.
Use this variant from worker threads that don't have TLS context (e.g. the
background thread started by /api/crons/run). The HTTP-side variant below
resolves the home via TLS.
"""
def __init__(self, home: Path):
self._home = Path(home)
def __enter__(self):
_cron_env_lock.acquire()
try:
self._prev_env = os.environ.get('HERMES_HOME')
os.environ['HERMES_HOME'] = str(self._home)
# Re-patch cron.jobs module-level constants (see main context manager
# below for the rationale).
self._prev_cj = None
try:
import cron.jobs as _cj
self._prev_cj = (_cj.HERMES_DIR, _cj.CRON_DIR, _cj.JOBS_FILE, _cj.OUTPUT_DIR)
_cj.HERMES_DIR = self._home
_cj.CRON_DIR = self._home / 'cron'
_cj.JOBS_FILE = _cj.CRON_DIR / 'jobs.json'
_cj.OUTPUT_DIR = _cj.CRON_DIR / 'output'
except (ImportError, AttributeError):
logger.debug("cron_profile_context_for_home: cron.jobs unavailable")
except Exception:
_cron_env_lock.release()
raise
return self
def __exit__(self, exc_type, exc_val, exc_tb):
try:
if self._prev_env is None:
os.environ.pop('HERMES_HOME', None)
else:
os.environ['HERMES_HOME'] = self._prev_env
if self._prev_cj is not None:
try:
import cron.jobs as _cj
_cj.HERMES_DIR, _cj.CRON_DIR, _cj.JOBS_FILE, _cj.OUTPUT_DIR = self._prev_cj
except (ImportError, AttributeError):
pass
finally:
_cron_env_lock.release()
return False
class cron_profile_context:
"""Context manager that pins HERMES_HOME to the TLS-active profile.
Usage:
with cron_profile_context():
from cron.jobs import list_jobs
jobs = list_jobs(include_disabled=True)
Serializes cron API calls across profiles (cron API is low-frequency;
serialization cost is negligible compared to correctness).
"""
def __enter__(self):
_cron_env_lock.acquire()
try:
self._prev_env = os.environ.get('HERMES_HOME')
home = get_active_hermes_home()
os.environ['HERMES_HOME'] = str(home)
# Re-patch cron.jobs module-level constants. They are snapshot at
# import time (line 68-71 of cron/jobs.py) and don't participate in
# the module's __getattr__ lazy path, so env-var alone is not enough
# for callers that reference the module constants directly.
self._prev_cj = None
try:
import cron.jobs as _cj
self._prev_cj = (_cj.HERMES_DIR, _cj.CRON_DIR, _cj.JOBS_FILE, _cj.OUTPUT_DIR)
_cj.HERMES_DIR = home
_cj.CRON_DIR = home / 'cron'
_cj.JOBS_FILE = _cj.CRON_DIR / 'jobs.json'
_cj.OUTPUT_DIR = _cj.CRON_DIR / 'output'
except (ImportError, AttributeError):
logger.debug("cron_profile_context: cron.jobs unavailable; env-var only")
except Exception:
_cron_env_lock.release()
raise
return self
def __exit__(self, exc_type, exc_val, exc_tb):
try:
# Restore env var
if self._prev_env is None:
os.environ.pop('HERMES_HOME', None)
else:
os.environ['HERMES_HOME'] = self._prev_env
# Restore cron.jobs module constants
if self._prev_cj is not None:
try:
import cron.jobs as _cj
_cj.HERMES_DIR, _cj.CRON_DIR, _cj.JOBS_FILE, _cj.OUTPUT_DIR = self._prev_cj
except (ImportError, AttributeError):
pass
finally:
_cron_env_lock.release()
return False
def get_hermes_home_for_profile(name: str) -> Path:
"""Return the HERMES_HOME Path for *name* without mutating any process state.
+96 -15
View File
@@ -180,12 +180,33 @@ def _cron_output_content_window(text: str, limit: int = _CRON_OUTPUT_CONTENT_LIM
return text[-limit:]
def _run_cron_tracked(job):
"""Wrapper that tracks running state around cron.scheduler.run_job."""
def _run_cron_tracked(job, profile_home=None):
"""Wrapper that tracks running state around cron.scheduler.run_job.
``profile_home`` pins HERMES_HOME for this worker thread so output files
and run metadata land in the profile that triggered the run, not the
process-global default. Captured at dispatch time because the thread runs
after the HTTP request (and its TLS profile) has already been cleared.
"""
from cron.scheduler import run_job # import here — runs inside a worker thread
from cron.jobs import mark_job_run, save_job_output
job_id = job.get("id", "")
# Pin HERMES_HOME for the duration of this thread using a dedicated
# context manager variant that accepts the profile home directly
# (threads have no TLS, so get_active_hermes_home() can't resolve).
ctx = None
if profile_home is not None:
try:
from api.profiles import cron_profile_context_for_home
ctx = cron_profile_context_for_home(profile_home)
ctx.__enter__()
except Exception:
logger.exception("Failed to pin profile %s for cron run", profile_home)
ctx = None
try:
success, output, final_response, error = run_job(job)
save_job_output(job_id, output)
@@ -204,6 +225,11 @@ def _run_cron_tracked(job):
except Exception:
logger.debug("Failed to mark manual cron run failure for %s", job_id)
finally:
if ctx is not None:
try:
ctx.__exit__(None, None, None)
except Exception:
logger.debug("Failed to release cron_profile_context for %s", job_id)
_mark_cron_done(job_id)
_PROVIDER_ALIASES = {
@@ -2177,25 +2203,45 @@ def handle_get(handler, parsed) -> bool:
return # SSE handled, no JSON response
# ── Cron API (GET) ──
# All cron handlers touch cron.jobs which resolves HERMES_HOME from
# os.environ (process-global) at call time. Wrap in cron_profile_context
# so the TLS-active profile's jobs.json is read, not the process default.
if parsed.path == "/api/crons":
from cron.jobs import list_jobs
from api.profiles import cron_profile_context
return j(handler, {"jobs": list_jobs(include_disabled=True)})
with cron_profile_context():
return j(handler, {"jobs": list_jobs(include_disabled=True)})
if parsed.path == "/api/crons/output":
return _handle_cron_output(handler, parsed)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_output(handler, parsed)
if parsed.path == "/api/crons/history":
return _handle_cron_history(handler, parsed)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_history(handler, parsed)
if parsed.path == "/api/crons/run":
return _handle_cron_run_detail(handler, parsed)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_run_detail(handler, parsed)
if parsed.path == "/api/crons/recent":
return _handle_cron_recent(handler, parsed)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_recent(handler, parsed)
if parsed.path == "/api/crons/status":
return _handle_cron_status(handler, parsed)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_status(handler, parsed)
# ── Skills API (GET) ──
if parsed.path == "/api/skills":
@@ -2892,23 +2938,43 @@ def handle_post(handler, parsed) -> bool:
return _handle_terminal_close(handler, body)
# ── Cron API (POST) ──
# See GET-side comment above: wrap in cron_profile_context so writes go
# to the TLS-active profile's jobs.json instead of the process default.
if parsed.path == "/api/crons/create":
return _handle_cron_create(handler, body)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_create(handler, body)
if parsed.path == "/api/crons/update":
return _handle_cron_update(handler, body)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_update(handler, body)
if parsed.path == "/api/crons/delete":
return _handle_cron_delete(handler, body)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_delete(handler, body)
if parsed.path == "/api/crons/run":
return _handle_cron_run(handler, body)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_run(handler, body)
if parsed.path == "/api/crons/pause":
return _handle_cron_pause(handler, body)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_pause(handler, body)
if parsed.path == "/api/crons/resume":
return _handle_cron_resume(handler, body)
from api.profiles import cron_profile_context
with cron_profile_context():
return _handle_cron_resume(handler, body)
# ── File ops (POST) ──
if parsed.path == "/api/file/delete":
@@ -5166,7 +5232,22 @@ def _handle_cron_run(handler, body):
return j(handler, {"ok": False, "job_id": job_id, "status": "already_running",
"elapsed": round(elapsed, 1)})
_mark_cron_running(job_id)
threading.Thread(target=_run_cron_tracked, args=(job,), daemon=True).start()
# Capture the TLS-active profile home now — the thread runs after the
# request finishes, so TLS is gone by then.
#
# Resolve directly without a try/except: get_active_hermes_home() does
# in-memory dict reads + a single Path.is_dir() stat, so the only way
# it could raise from inside a request handler is if api.profiles
# itself partially failed to import (in which case we'd already be
# 500-ing the whole request). A silent fallback to None here would
# re-introduce the exact bug #1573 fixes — the worker thread would
# run unpinned against the process-global HERMES_HOME — so we'd
# rather let any unexpected exception 500 the request than corrupt
# cross-profile state.
from api.profiles import get_active_hermes_home
_profile_home = get_active_hermes_home()
threading.Thread(target=_run_cron_tracked, args=(job, _profile_home), daemon=True).start()
return j(handler, {"ok": True, "job_id": job_id, "status": "running"})
@@ -0,0 +1,199 @@
"""Regression test: /api/crons must read jobs.json from the *active profile*.
Before the fix, `cron.jobs.list_jobs()` resolved HERMES_HOME from os.environ
at call time, ignoring the WebUI's per-request thread-local profile. So the
Scheduled Jobs panel showed the process-default profile's jobs regardless of
which profile the user had selected in the cookie.
This test writes two distinct jobs.json files (default + a named profile),
then verifies `cron_profile_context` pins the cron.jobs call to the named
profile's file.
"""
import json
import os
import pathlib
import sys
import threading
from unittest import mock
import pytest
# Ensure both repos are importable.
WEBUI_ROOT = pathlib.Path(__file__).resolve().parent.parent
AGENT_ROOT = pathlib.Path(os.environ.get("HERMES_AGENT_ROOT", pathlib.Path.home() / "hermes-agent"))
for p in (str(WEBUI_ROOT), str(AGENT_ROOT)):
if p not in sys.path:
sys.path.insert(0, p)
def _write_jobs(home: pathlib.Path, jobs: list):
cron_dir = home / "cron"
cron_dir.mkdir(parents=True, exist_ok=True)
(cron_dir / "jobs.json").write_text(
json.dumps({"jobs": jobs}), encoding="utf-8"
)
def test_cron_profile_context_pins_profile_home(tmp_path, monkeypatch):
"""The context manager should swap cron.jobs to read from the named profile."""
pytest.importorskip("cron.jobs") # auto-skip when hermes-agent is unavailable
default_home = tmp_path / "default_home"
meow_home = tmp_path / "default_home" / "profiles" / "meow"
_write_jobs(default_home, [{"id": "d1", "name": "default-job"}])
_write_jobs(meow_home, [{"id": "m1", "name": "meow-job"}])
# Point base at default_home; HERMES_HOME env starts at default.
monkeypatch.setenv("HERMES_HOME", str(default_home))
from api import profiles as p
monkeypatch.setattr(p, "_DEFAULT_HERMES_HOME", default_home)
# Baseline: no context → default profile.
from cron.jobs import list_jobs
# Force cron.jobs to re-evaluate its cached constants for this test run.
import cron.jobs as _cj
_cj.HERMES_DIR = default_home
_cj.CRON_DIR = default_home / "cron"
_cj.JOBS_FILE = _cj.CRON_DIR / "jobs.json"
_cj.OUTPUT_DIR = _cj.CRON_DIR / "output"
jobs_before = list_jobs(include_disabled=True)
assert any(j["id"] == "d1" for j in jobs_before), \
f"Expected default-profile job before entering context, got {jobs_before}"
# Simulate a request with TLS profile = 'meow'.
p.set_request_profile("meow")
try:
with p.cron_profile_context():
jobs_inside = list_jobs(include_disabled=True)
assert any(j["id"] == "m1" for j in jobs_inside), \
f"Expected meow-profile job inside context, got {jobs_inside}"
assert not any(j["id"] == "d1" for j in jobs_inside), \
"Default-profile job leaked into meow context"
finally:
p.clear_request_profile()
# After the context exits, we should be back to default.
jobs_after = list_jobs(include_disabled=True)
assert any(j["id"] == "d1" for j in jobs_after), \
f"Expected default-profile job after exiting context, got {jobs_after}"
def test_cron_profile_context_for_home_pins_explicit_home(tmp_path):
"""Thread variant: pin by explicit path (no TLS)."""
pytest.importorskip("cron.jobs") # auto-skip when hermes-agent is unavailable
home_a = tmp_path / "a"
home_b = tmp_path / "b"
_write_jobs(home_a, [{"id": "a1", "name": "A"}])
_write_jobs(home_b, [{"id": "b1", "name": "B"}])
# Start with env pointing at A.
prev = os.environ.get("HERMES_HOME")
os.environ["HERMES_HOME"] = str(home_a)
try:
import cron.jobs as _cj
_cj.HERMES_DIR = home_a
_cj.CRON_DIR = home_a / "cron"
_cj.JOBS_FILE = _cj.CRON_DIR / "jobs.json"
_cj.OUTPUT_DIR = _cj.CRON_DIR / "output"
from cron.jobs import list_jobs
from api.profiles import cron_profile_context_for_home
assert any(j["id"] == "a1" for j in list_jobs(include_disabled=True))
with cron_profile_context_for_home(home_b):
jobs_inside = list_jobs(include_disabled=True)
assert any(j["id"] == "b1" for j in jobs_inside), jobs_inside
assert not any(j["id"] == "a1" for j in jobs_inside), jobs_inside
# Restored to A.
assert any(j["id"] == "a1" for j in list_jobs(include_disabled=True))
finally:
if prev is None:
os.environ.pop("HERMES_HOME", None)
else:
os.environ["HERMES_HOME"] = prev
def test_cron_profile_context_serializes_concurrent_access(tmp_path):
"""The lock must prevent concurrent contexts from interleaving."""
from api.profiles import cron_profile_context_for_home
home_a = tmp_path / "a"
home_b = tmp_path / "b"
home_a.mkdir()
home_b.mkdir()
# Ensure the context lock is released between tests.
from api import profiles as p
assert not p._cron_env_lock.locked(), \
"Lock leaked from a previous test"
observed = []
barrier = threading.Barrier(2)
def worker(home, tag):
barrier.wait()
with cron_profile_context_for_home(home):
observed.append(("enter", tag, os.environ["HERMES_HOME"]))
# If serialization works, the partner thread cannot be inside
# its own context at this moment.
observed.append(("exit", tag))
t1 = threading.Thread(target=worker, args=(home_a, "A"))
t2 = threading.Thread(target=worker, args=(home_b, "B"))
t1.start(); t2.start()
t1.join(); t2.join()
# Every enter must be immediately followed by its matching exit (no
# interleaving), because the lock serializes the two contexts.
assert len(observed) == 4
first, second, third, fourth = observed
assert first[0] == "enter" and second[0] == "exit" and first[1] == second[1]
assert third[0] == "enter" and fourth[0] == "exit" and third[1] == fourth[1]
def test_cron_run_does_not_silently_swallow_profile_resolution_errors():
"""_handle_cron_run must NOT silently fall through to profile_home=None
when get_active_hermes_home() raises.
A silent fallback would re-introduce the exact bug #1573 fixes — the
worker thread would run unpinned against the process-global HERMES_HOME,
silently corrupting cross-profile state. We'd rather 500 the request
than risk that, since get_active_hermes_home() raising at all from
inside a request handler means api.profiles is in a state we shouldn't
be making cron decisions in.
Source-level assertion to catch any future re-introduction of the
over-broad except clause.
"""
from pathlib import Path
src = (Path(__file__).resolve().parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
# Locate _handle_cron_run definition; assert the spawn block does NOT
# wrap get_active_hermes_home() in a bare except that falls back to None.
idx = src.find("def _handle_cron_run(handler, body):")
assert idx != -1, "_handle_cron_run not found"
body = src[idx : idx + 4000]
# The spawn site must call get_active_hermes_home() unguarded (no
# try/except around it specifically), because a silent fallback to None
# is exactly what would re-introduce #1573.
spawn_idx = body.find("threading.Thread(target=_run_cron_tracked")
assert spawn_idx != -1, "thread spawn not found in _handle_cron_run"
# Look at the 1500 chars before the spawn — should NOT contain the
# `_profile_home = None` fallback pattern.
pre_spawn = body[max(0, spawn_idx - 1500) : spawn_idx]
assert "_profile_home = None" not in pre_spawn, (
"_handle_cron_run silently falls back to _profile_home=None when "
"get_active_hermes_home() raises. That re-introduces bug #1573 — "
"the worker thread would run unpinned against the process-global "
"HERMES_HOME. Let the exception propagate (500 the request) rather "
"than corrupt cross-profile state silently."
)