Files
hermes-webui/tests/test_sprint3.py
T
nesquena-hermes 7189416969 fix: batch v0.50.234-235 — XSS hardening, workspace validation, profile switch fixes (#1206)
fix: batch v0.50.234-235 — XSS hardening, workspace validation, profile switch fixes

v0.50.235 (#1203 — profile switch workspace/model/chip, 3 bugs + flaky test):
- switch_profile now reads target profile's workspace directly (thread-local bypass)
- invalidate_models_cache() after profile switch (model dropdown staleness)
- syncTopbar() updates chip before early-return (no-session path)

v0.50.234 (#1201/#1205 — XSS hardening + workspace security):
- renderMd() full HTML attribute sanitizer replacing tag-name-only allowlist
- Delegated image lightbox (removes all inline onclick)
- macOS /etc → /private/etc symlink bypass fixed
- /System /Library added to blocked workspace roots
- Legacy /api/chat workspace trust gap closed

Both PRs independently reviewed. 2787/2787 tests. QA harness 20/20 + 11/11 API checks.

Co-authored-by: Brendan Schmid <bschmidy10@Wilson.bschmidy10>
Co-authored-by: Nathan Esquenazi <nesquena@gmail.com>
2026-04-27 21:39:30 -07:00

225 lines
8.3 KiB
Python

"""Sprint 3 tests: cron API, skills API, memory API, input validation."""
import json, uuid, urllib.request, urllib.error
from tests._pytest_port import BASE
def get(path):
with urllib.request.urlopen(BASE + path, timeout=10) as r:
return json.loads(r.read()), r.status
def post(path, body=None):
data = json.dumps(body or {}).encode()
req = urllib.request.Request(BASE + path, data=data, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=10) as r:
return json.loads(r.read()), r.status
except urllib.error.HTTPError as e:
return json.loads(e.read()), e.code
def make_session_tracked(created_list, ws=None):
"""Create a session and register it with the cleanup fixture."""
import pathlib as _pathlib
body = {}
if ws: body["workspace"] = str(ws)
d, _ = post("/api/session/new", body)
sid = d["session"]["session_id"]
created_list.append(sid)
return sid, _pathlib.Path(d["session"]["workspace"])
def test_crons_list():
data, status = get("/api/crons")
assert status == 200
assert "jobs" in data
def test_crons_list_has_required_fields():
data, _ = get("/api/crons")
if not data["jobs"]: return
job = data["jobs"][0]
for field in ("id", "name", "prompt", "enabled", "schedule_display"):
assert field in job
def test_crons_output_requires_job_id():
try:
get("/api/crons/output")
assert False
except urllib.error.HTTPError as e:
assert e.code == 400
def test_crons_output_real_job():
data, _ = get("/api/crons")
if not data["jobs"]: return
job_id = data["jobs"][0]["id"]
out, status = get(f"/api/crons/output?job_id={job_id}&limit=3")
assert status == 200
assert "outputs" in out
def test_crons_pause_requires_job_id():
result, status = post("/api/crons/pause", {})
assert status in (400, 404)
def test_crons_resume_requires_job_id():
result, status = post("/api/crons/resume", {})
assert status in (400, 404)
def test_crons_run_nonexistent():
result, status = post("/api/crons/run", {"job_id": "doesnotexist999"})
assert status == 404
def test_skills_list():
data, status = get("/api/skills")
assert status == 200
assert len(data["skills"]) > 0
def test_skills_list_has_required_fields():
data, _ = get("/api/skills")
skill = data["skills"][0]
assert "name" in skill and "description" in skill
def test_skills_content_known():
data, status = get("/api/skills/content?name=dogfood")
assert status == 200
assert len(data["content"]) > 0
def test_skills_content_requires_name():
try:
get("/api/skills/content")
assert False
except urllib.error.HTTPError as e:
assert e.code == 400
def test_skills_search_returns_subset():
data, _ = get("/api/skills")
assert len(data["skills"]) > 5
def test_memory_returns_both_files():
data, status = get("/api/memory")
assert status == 200
assert "memory" in data and "user" in data
def test_memory_content_is_string():
data, _ = get("/api/memory")
assert isinstance(data["memory"], str)
assert isinstance(data["user"], str)
def test_memory_has_mtime():
data, _ = get("/api/memory")
assert "memory_mtime" in data and "user_mtime" in data
def test_session_update_requires_session_id():
result, status = post("/api/session/update", {"model": "openai/gpt-5.4-mini"})
assert status == 400
def test_session_delete_requires_session_id():
result, status = post("/api/session/delete", {})
assert status == 400
def test_session_delete_rejects_absolute_path_payload(tmp_path):
victim = tmp_path / "victim.json"
victim.write_text("TOPSECRET", encoding="utf-8")
result, status = post("/api/session/delete", {"session_id": str(victim.with_suffix(""))})
assert status == 400
assert victim.exists(), "absolute-path payload must not delete arbitrary files"
def test_session_delete_rejects_traversal_payload(tmp_path):
victim = tmp_path / "outside.json"
victim.write_text("TOPSECRET", encoding="utf-8")
traversal = f"../../../../{victim.with_suffix('').as_posix().lstrip('/')}"
result, status = post("/api/session/delete", {"session_id": traversal})
assert status == 400
assert victim.exists(), "traversal payload must not delete arbitrary files"
def test_chat_start_requires_session_id():
result, status = post("/api/chat/start", {"message": "hello"})
assert status == 400
def test_chat_start_requires_message(cleanup_test_sessions):
sid, _ = make_session_tracked(cleanup_test_sessions)
result, status = post("/api/chat/start", {"session_id": sid, "message": ""})
assert status == 400
def test_session_update_unknown_id_returns_404():
result, status = post("/api/session/update", {"session_id": "nosuchsession", "model": "openai/gpt-5.4-mini"})
assert status == 404
def test_session_update_rejects_workspace_outside_trusted_root(tmp_path):
d, _ = post("/api/session/new", {})
sid = d["session"]["session_id"]
outside = tmp_path / "outside"
outside.mkdir(parents=True, exist_ok=True)
result, status = post("/api/session/update", {"session_id": sid, "workspace": str(outside)})
assert status == 400
assert "outside" in result.get("error", "").lower()
def test_chat_start_rejects_workspace_outside_trusted_root(tmp_path):
d, _ = post("/api/session/new", {})
sid = d["session"]["session_id"]
outside = tmp_path / "outside-chat"
outside.mkdir(parents=True, exist_ok=True)
result, status = post("/api/chat/start", {"session_id": sid, "message": "hello", "workspace": str(outside)})
assert status == 400
assert "outside" in result.get("error", "").lower()
def test_workspace_add_allows_external_valid_paths(tmp_path):
"""Adding a path outside home is now allowed when the user explicitly provides it.
The strict trust check (resolve_trusted_workspace) is only applied when *using*
an existing workspace, not when registering a new one (validate_workspace_to_add)."""
outside = tmp_path / "outside-add"
outside.mkdir(parents=True, exist_ok=True)
result, status = post("/api/workspaces/add", {"path": str(outside), "name": "Outside"})
# Explicit registration of an external path is now allowed
assert status == 200, f"Expected 200, got {status}: {result}"
# Verify it was actually saved
wss_result, ws_status = get("/api/workspaces")
paths = [w["path"] for w in wss_result.get("workspaces", [])]
assert str(outside.resolve()) in paths
def test_workspace_add_rejects_system_paths():
"""System paths (/, /etc, /sys) are always rejected even with the relaxed add validation."""
for path in ("/etc", "/private/etc"):
_, status = post("/api/workspaces/add", {"path": path, "name": "System"})
assert status == 400, f"{path} should be rejected"
def test_legacy_chat_rejects_workspace_outside_trusted_root(tmp_path):
"""Legacy /api/chat must use the same trusted workspace validation as /api/chat/start."""
d, _ = post("/api/session/new", {})
sid = d["session"]["session_id"]
outside = tmp_path / "outside-legacy-chat"
outside.mkdir(parents=True, exist_ok=True)
result, status = post("/api/chat", {"session_id": sid, "message": "hello", "workspace": str(outside)})
assert status == 400
assert "outside" in result.get("error", "").lower()
def test_session_new_rejects_workspace_outside_trusted_root(tmp_path):
outside = tmp_path / "outside-new"
outside.mkdir(parents=True, exist_ok=True)
result, status = post("/api/session/new", {"workspace": str(outside)})
assert status == 400
assert "outside" in result.get("error", "").lower()
def test_session_search_returns_matches(cleanup_test_sessions):
sid, _ = make_session_tracked(cleanup_test_sessions)
post("/api/session/rename", {"session_id": sid, "title": f"unique-s3-{sid}"})
data, status = get(f"/api/sessions/search?q=unique-s3-{sid}")
assert status == 200
sids = [s["session_id"] for s in data["sessions"]]
assert sid in sids
def test_session_search_empty_query_returns_all():
data, status = get("/api/sessions/search?q=")
assert status == 200 and "sessions" in data
def test_session_search_no_results():
data, status = get("/api/sessions/search?q=zzznomatchzzz9999")
assert status == 200 and data["sessions"] == []