[FI] Fix path traversal in resume_session on WebSocket connect (#516)

* fix(security): add path traversal check to resume_session on WS connect

The switch_session action in dashboard_ws.py validates that the session
file path stays under the sessions directory using
session_file.resolve().relative_to(sessions_dir.resolve()). However,
the resume_session code path on initial WebSocket connect was missing
this same check — it constructed a file path from the untrusted
resume_session query parameter and only checked session_file.exists().

An attacker could craft a resume_session value like
'websocket_../../<path>' to escape the sessions directory and
potentially read arbitrary JSON files or pollute session state.

This commit adds the same resolve().relative_to() validation to the
resume_session path, logging a warning and falling back to a fresh
session when traversal is detected.

Also adds tests for path traversal rejection in both resume_session
and switch_session, and fixes rate limiter interference in WS tests.

* test: assert valid UUID format in resume_session traversal test

Address review feedback — verify the returned session_id is a valid
UUID after path traversal rejection, providing a stronger guarantee
that a fresh session was created.

* test: use escaping traversal payloads with decoy files

Address review feedback — the previous payload websocket_../../etc/passwd
did not actually escape sessions_dir because 'websocket_..' is a single
directory name. Switched to websocket_x/../../escaped which creates a
real path segment that walks above sessions_dir. Both tests now place a
decoy JSON file at the escaped target so they would fail if the guard
were removed.
This commit is contained in:
Divyanshu Sharma
2026-03-10 20:59:05 +05:30
committed by GitHub
parent 9247164e89
commit 11fa864bb0
2 changed files with 97 additions and 7 deletions

View File

@@ -168,13 +168,18 @@ async def websocket_handler(
parts = resume_session.split("_", 1)
if len(parts) == 2 and parts[0] == "websocket":
raw_id = parts[1]
# Verify session file exists
session_file = (
Path.home() / ".pocketpaw" / "memory" / "sessions" / f"{resume_session}.json"
)
if session_file.exists():
chat_id = raw_id
resumed = True
# Verify session file exists and path stays under sessions dir
sessions_dir = Path.home() / ".pocketpaw" / "memory" / "sessions"
session_file = sessions_dir / f"{resume_session}.json"
try:
session_file.resolve().relative_to(sessions_dir.resolve())
except ValueError:
logger.warning("Path traversal attempt in resume_session: %s", resume_session)
resume_session = None # fall through to fresh session
else:
if session_file.exists():
chat_id = raw_id
resumed = True
await ws_adapter.register_connection(websocket, chat_id)

View File

@@ -362,6 +362,14 @@ class TestSessionsRESTEndpoints:
class TestWebSocketSessionSwitching:
"""Test WebSocket switch_session and new_session handlers."""
@pytest.fixture(autouse=True)
def _reset_rate_limiter(self):
"""Reset WS rate limiter between tests to avoid false rate-limit failures."""
from pocketpaw.security.rate_limiter import ws_limiter
ws_limiter.cleanup()
ws_limiter._buckets.clear()
@pytest.fixture
def client(self, _mock_auth):
from fastapi.testclient import TestClient
@@ -433,6 +441,83 @@ class TestWebSocketSessionSwitching:
if session_file.exists():
session_file.unlink()
def test_websocket_resume_session_path_traversal_blocked(self, client):
"""Path traversal in resume_session must be rejected (falls back to fresh session).
The payload ``websocket_x/../../escaped`` produces:
sessions_dir / "websocket_x" / ".." / ".." / "escaped.json"
which resolves one level above sessions_dir. A decoy file is placed
at that location so the test would *fail* if the guard were removed
(the session would be resumed instead of rejected).
"""
# Place a decoy file where the traversal would land
sessions_dir = Path.home() / ".pocketpaw" / "memory" / "sessions"
sessions_dir.mkdir(parents=True, exist_ok=True)
escaped_file = sessions_dir.parent / "websocket_x/../../escaped.json"
escaped_target = escaped_file.resolve()
escaped_target.parent.mkdir(parents=True, exist_ok=True)
escaped_target.write_text(
json.dumps(
[
{
"id": "1",
"role": "user",
"content": "leaked",
"timestamp": "2026-01-01T00:00:00",
}
]
)
)
traversal_key = "websocket_x/../../escaped"
try:
with client.websocket_connect(self._ws_url(f"resume_session={traversal_key}")) as ws:
conn_info = ws.receive_json()
assert conn_info["type"] == "connection_info"
# Should get a fresh session with a valid UUID, not the traversal path
session_id = conn_info["id"]
assert ".." not in session_id
assert session_id.startswith("websocket_")
raw_uuid = session_id.removeprefix("websocket_")
uuid.UUID(raw_uuid) # raises ValueError if not a valid UUID
finally:
escaped_target.unlink(missing_ok=True)
def test_websocket_switch_session_path_traversal_blocked(self, client):
"""Path traversal in switch_session must return empty history.
Same strategy as above: a decoy file is placed at the escaped
target so the test fails without the guard.
"""
sessions_dir = Path.home() / ".pocketpaw" / "memory" / "sessions"
sessions_dir.mkdir(parents=True, exist_ok=True)
escaped_file = sessions_dir.parent / "websocket_x/../../escaped.json"
escaped_target = escaped_file.resolve()
escaped_target.parent.mkdir(parents=True, exist_ok=True)
escaped_target.write_text(
json.dumps(
[
{
"id": "1",
"role": "user",
"content": "leaked",
"timestamp": "2026-01-01T00:00:00",
}
]
)
)
traversal_key = "websocket_x/../../escaped"
try:
with client.websocket_connect(self._ws_url()) as ws:
ws.receive_json() # connection_info
ws.send_json({"action": "switch_session", "session_id": traversal_key})
data = ws.receive_json()
assert data["type"] == "session_history"
assert data["messages"] == []
finally:
escaped_target.unlink(missing_ok=True)
# =========================================================================
# F1: FileMemoryStore.search_sessions