diff --git a/src/pocketpaw/dashboard_ws.py b/src/pocketpaw/dashboard_ws.py index a8b520e2..b540b743 100644 --- a/src/pocketpaw/dashboard_ws.py +++ b/src/pocketpaw/dashboard_ws.py @@ -168,13 +168,18 @@ async def websocket_handler( parts = resume_session.split("_", 1) if len(parts) == 2 and parts[0] == "websocket": raw_id = parts[1] - # Verify session file exists - session_file = ( - Path.home() / ".pocketpaw" / "memory" / "sessions" / f"{resume_session}.json" - ) - if session_file.exists(): - chat_id = raw_id - resumed = True + # Verify session file exists and path stays under sessions dir + sessions_dir = Path.home() / ".pocketpaw" / "memory" / "sessions" + session_file = sessions_dir / f"{resume_session}.json" + try: + session_file.resolve().relative_to(sessions_dir.resolve()) + except ValueError: + logger.warning("Path traversal attempt in resume_session: %s", resume_session) + resume_session = None # fall through to fresh session + else: + if session_file.exists(): + chat_id = raw_id + resumed = True await ws_adapter.register_connection(websocket, chat_id) diff --git a/tests/test_session_index.py b/tests/test_session_index.py index de46e626..8a2d7b34 100644 --- a/tests/test_session_index.py +++ b/tests/test_session_index.py @@ -362,6 +362,14 @@ class TestSessionsRESTEndpoints: class TestWebSocketSessionSwitching: """Test WebSocket switch_session and new_session handlers.""" + @pytest.fixture(autouse=True) + def _reset_rate_limiter(self): + """Reset WS rate limiter between tests to avoid false rate-limit failures.""" + from pocketpaw.security.rate_limiter import ws_limiter + + ws_limiter.cleanup() + ws_limiter._buckets.clear() + @pytest.fixture def client(self, _mock_auth): from fastapi.testclient import TestClient @@ -433,6 +441,83 @@ class TestWebSocketSessionSwitching: if session_file.exists(): session_file.unlink() + def test_websocket_resume_session_path_traversal_blocked(self, client): + """Path traversal in resume_session must be rejected (falls back to fresh session). + + The payload ``websocket_x/../../escaped`` produces: + sessions_dir / "websocket_x" / ".." / ".." / "escaped.json" + which resolves one level above sessions_dir. A decoy file is placed + at that location so the test would *fail* if the guard were removed + (the session would be resumed instead of rejected). + """ + # Place a decoy file where the traversal would land + sessions_dir = Path.home() / ".pocketpaw" / "memory" / "sessions" + sessions_dir.mkdir(parents=True, exist_ok=True) + escaped_file = sessions_dir.parent / "websocket_x/../../escaped.json" + escaped_target = escaped_file.resolve() + escaped_target.parent.mkdir(parents=True, exist_ok=True) + escaped_target.write_text( + json.dumps( + [ + { + "id": "1", + "role": "user", + "content": "leaked", + "timestamp": "2026-01-01T00:00:00", + } + ] + ) + ) + + traversal_key = "websocket_x/../../escaped" + try: + with client.websocket_connect(self._ws_url(f"resume_session={traversal_key}")) as ws: + conn_info = ws.receive_json() + assert conn_info["type"] == "connection_info" + # Should get a fresh session with a valid UUID, not the traversal path + session_id = conn_info["id"] + assert ".." not in session_id + assert session_id.startswith("websocket_") + raw_uuid = session_id.removeprefix("websocket_") + uuid.UUID(raw_uuid) # raises ValueError if not a valid UUID + finally: + escaped_target.unlink(missing_ok=True) + + def test_websocket_switch_session_path_traversal_blocked(self, client): + """Path traversal in switch_session must return empty history. + + Same strategy as above: a decoy file is placed at the escaped + target so the test fails without the guard. + """ + sessions_dir = Path.home() / ".pocketpaw" / "memory" / "sessions" + sessions_dir.mkdir(parents=True, exist_ok=True) + escaped_file = sessions_dir.parent / "websocket_x/../../escaped.json" + escaped_target = escaped_file.resolve() + escaped_target.parent.mkdir(parents=True, exist_ok=True) + escaped_target.write_text( + json.dumps( + [ + { + "id": "1", + "role": "user", + "content": "leaked", + "timestamp": "2026-01-01T00:00:00", + } + ] + ) + ) + + traversal_key = "websocket_x/../../escaped" + try: + with client.websocket_connect(self._ws_url()) as ws: + ws.receive_json() # connection_info + ws.send_json({"action": "switch_session", "session_id": traversal_key}) + data = ws.receive_json() + assert data["type"] == "session_history" + assert data["messages"] == [] + finally: + escaped_target.unlink(missing_ok=True) + # ========================================================================= # F1: FileMemoryStore.search_sessions