mirror of
https://github.com/pocketpaw/pocketpaw.git
synced 2026-05-21 01:04:57 +00:00
Remove /api/qr and /api/v1/qr from exempt_paths in auth middleware so the QR endpoint can no longer be hit without a valid session. Previously any network-reachable client could GET /api/qr, decode the PNG, and extract a fully valid 1-hour session token — a complete auth bypass (OWASP A01 — Broken Access Control). Changes: - Remove /api/qr and /api/v1/qr from exempt_paths in dashboard_auth.py - Reduce QR pairing token TTL from 1 hour to 60 seconds - Add ttl_seconds param to create_session_token() for short-lived tokens - Add audit log event on QR code generation - Update v1 QR endpoint (/api/v1/auth.py) with matching fix - Update tests: unauthenticated /api/qr now returns 401 - Update docs to reflect auth requirement Fixes #854
120 lines
4.8 KiB
Python
120 lines
4.8 KiB
Python
# E2E Tests for Security Features
|
|
# Created: 2026-02-06
|
|
#
|
|
# Tests WebSocket authentication changes:
|
|
# - Auth via first message (new preferred method)
|
|
# - Legacy query-param auth still works
|
|
# - Unauthenticated non-localhost connections are rejected
|
|
# - wss:// upgrade hint in frontend JS
|
|
#
|
|
# Run with: pytest tests/e2e/test_security.py -v --headed
|
|
|
|
|
|
from playwright.sync_api import Page, expect
|
|
|
|
|
|
class TestWebSocketAuth:
|
|
"""Tests for WebSocket authentication via first message."""
|
|
|
|
def test_websocket_connects_on_localhost(self, page: Page, dashboard_url: str):
|
|
"""Localhost connections should succeed without explicit auth."""
|
|
page.goto(dashboard_url)
|
|
|
|
# Wait for WebSocket connection — check for the "Connected" console log
|
|
# or the connection_info message that arrives on successful connect
|
|
page.wait_for_timeout(2000)
|
|
|
|
# The chat view should be functional (WebSocket connected)
|
|
chat_input = page.locator("textarea, input[type='text']").first
|
|
expect(chat_input).to_be_visible(timeout=5000)
|
|
|
|
def test_websocket_sends_auth_first_message(self, page: Page, dashboard_url: str):
|
|
"""Frontend should send authenticate action as first WebSocket message."""
|
|
# Collect console logs to verify auth message was sent
|
|
ws_logs = []
|
|
page.on("console", lambda msg: ws_logs.append(msg.text))
|
|
|
|
page.goto(dashboard_url)
|
|
page.wait_for_timeout(2000)
|
|
|
|
# Check that WS connected log appears (auth succeeded on localhost)
|
|
connected = any("[WS] Connected" in log for log in ws_logs)
|
|
assert connected, f"WebSocket did not connect. Logs: {ws_logs[:10]}"
|
|
|
|
def test_websocket_protocol_auto_detection(self, page: Page, dashboard_url: str):
|
|
"""Frontend JS should use ws:// for http:// and wss:// for https://."""
|
|
# Evaluate the frontend logic directly
|
|
result = page.evaluate("""
|
|
() => {
|
|
// Simulate what websocket.js does
|
|
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
|
return {
|
|
pageProtocol: window.location.protocol,
|
|
wsProtocol: protocol
|
|
};
|
|
}
|
|
""")
|
|
|
|
# Dashboard runs on http:// in tests, so ws:// is expected
|
|
assert result["pageProtocol"] == "http:"
|
|
assert result["wsProtocol"] == "ws:"
|
|
|
|
def test_no_token_in_websocket_url(self, page: Page, dashboard_url: str):
|
|
"""Token must NOT appear in WebSocket URL (moved to first message)."""
|
|
ws_urls = []
|
|
|
|
# Listen for WebSocket connections
|
|
page.on("websocket", lambda ws: ws_urls.append(ws.url))
|
|
|
|
page.goto(dashboard_url)
|
|
page.wait_for_timeout(2000)
|
|
|
|
# Verify at least one WebSocket connection was made
|
|
assert len(ws_urls) > 0, "No WebSocket connections observed"
|
|
|
|
# Verify no token in the URL
|
|
for url in ws_urls:
|
|
assert "token=" not in url, f"Token leaked in WebSocket URL: {url}"
|
|
|
|
def test_chat_works_after_auth(self, page: Page, dashboard_url: str):
|
|
"""After WebSocket auth, chat messages should flow correctly."""
|
|
page.goto(dashboard_url)
|
|
page.wait_for_timeout(2000)
|
|
|
|
# Find chat input and type a message
|
|
chat_input = page.locator("textarea, input[type='text']").first
|
|
if chat_input.is_visible():
|
|
chat_input.fill("Hello test")
|
|
chat_input.press("Enter")
|
|
|
|
# Wait for any response (stream_start, message, or error)
|
|
page.wait_for_timeout(2000)
|
|
|
|
# The message should appear in the chat area (even if agent errors out)
|
|
# We just need to verify the WebSocket round-trip works
|
|
page.locator("text=Hello test").first.wait_for(timeout=5000)
|
|
|
|
|
|
class TestSecurityHeaders:
|
|
"""Tests for security-related HTTP behavior."""
|
|
|
|
def test_api_requires_auth(self, page: Page, dashboard_url: str):
|
|
"""Protected API endpoints should return 401 without auth."""
|
|
response = page.request.get(f"{dashboard_url}/api/identity")
|
|
assert response.status == 401
|
|
|
|
def test_qr_endpoint_requires_auth(self, page: Page, dashboard_url: str):
|
|
"""QR endpoint must require authentication (fixes #854)."""
|
|
response = page.request.get(f"{dashboard_url}/api/qr")
|
|
assert response.status == 401
|
|
|
|
def test_static_files_accessible(self, page: Page, dashboard_url: str):
|
|
"""Static files should be accessible without auth."""
|
|
response = page.request.get(f"{dashboard_url}/static/js/app.js")
|
|
assert response.status == 200
|
|
|
|
def test_index_page_accessible(self, page: Page, dashboard_url: str):
|
|
"""Index page must load without auth (serves the HTML shell)."""
|
|
response = page.request.get(dashboard_url)
|
|
assert response.status == 200
|