Files
pocketpaw/tests/test_api_chat.py
Rohit Kushwaha 0c3ebf1326 fix(ci): unblock dev-inherited CI failures
These failures already exist on `dev` HEAD — rolling the fixes into this
branch so the salvage PR can go green instead of waiting on a separate
cleanup PR.

- tests/test_api_chat.py — replace `asyncio.get_event_loop()` with
  `asyncio.new_event_loop()`. Python 3.12+ removed the implicit
  loop-creation behavior in the main thread, breaking all three
  `test_stream_*` cases with RuntimeError.
- ee/widget/projection.py — drop unused `field` from `dataclasses`
  import (F401).
- tests/test_dos_hardening.py, tests/test_logging_scrub.py — drop
  unused `pytest` imports (F401).
- src/pocketpaw/dashboard.py — remove two duplicated
  `lifespan`/`startup_event`/`shutdown_event` definitions (F811).
  The upper pair was shadowed by the lower redefinitions; the
  post-CORS duplicate was fully dead.
- src/pocketpaw/api/v1/auth.py — move `http_utils` import above
  `router = APIRouter(...)` to satisfy E402.
2026-04-22 06:13:22 +05:30

186 lines
6.6 KiB
Python

# Tests for API v1 chat router.
# Created: 2026-02-20
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pocketpaw.api.v1.chat import _active_streams, _APISessionBridge, router
@pytest.fixture
def test_app():
app = FastAPI()
app.include_router(router, prefix="/api/v1")
return app
@pytest.fixture
def client(test_app):
return TestClient(test_app)
class TestAPISessionBridge:
"""Tests for the _APISessionBridge class."""
@pytest.mark.asyncio
async def test_bridge_creation(self):
bridge = _APISessionBridge("test-chat-123")
assert bridge.chat_id == "test-chat-123"
assert bridge.queue is not None
@pytest.mark.asyncio
async def test_bridge_queue_put_get(self):
bridge = _APISessionBridge("test")
await bridge.queue.put({"event": "chunk", "data": {"content": "hello"}})
event = await bridge.queue.get()
assert event["event"] == "chunk"
assert event["data"]["content"] == "hello"
class TestChatStream:
"""Tests for POST /api/v1/chat/stream SSE endpoint."""
@patch("pocketpaw.api.v1.chat._send_message")
@patch("pocketpaw.api.v1.chat._APISessionBridge")
def test_stream_returns_sse(self, mock_bridge_cls, mock_send, client):
# Set up mock bridge
bridge = MagicMock()
q = asyncio.Queue()
bridge.queue = q
bridge.start = AsyncMock()
bridge.stop = AsyncMock()
mock_bridge_cls.return_value = bridge
mock_send.return_value = "api:test123"
# Pre-load events into the queue
async def _load():
await q.put({"event": "chunk", "data": {"content": "Hello "}})
await q.put({"event": "chunk", "data": {"content": "world"}})
await q.put({"event": "stream_end", "data": {"session_id": "api:test123", "usage": {}}})
asyncio.new_event_loop().run_until_complete(_load())
with client.stream(
"POST",
"/api/v1/chat/stream",
json={"content": "Hello"},
) as resp:
assert resp.status_code == 200
assert "text/event-stream" in resp.headers.get("content-type", "")
events = list(resp.iter_lines())
# Should have stream_start, chunks, and stream_end
event_text = "\n".join(events)
assert "stream_start" in event_text
class TestChatStop:
"""Tests for POST /api/v1/chat/stop."""
def test_stop_no_session_id(self, client):
resp = client.post("/api/v1/chat/stop")
assert resp.status_code == 400
def test_stop_nonexistent_session(self, client):
resp = client.post("/api/v1/chat/stop?session_id=nonexistent")
assert resp.status_code == 404
def test_stop_active_stream(self, client):
event = asyncio.Event()
_active_streams["test-sess"] = event
try:
resp = client.post("/api/v1/chat/stop?session_id=test-sess")
assert resp.status_code == 200
assert resp.json()["status"] == "ok"
assert event.is_set()
finally:
_active_streams.pop("test-sess", None)
class TestChatSend:
"""Tests for POST /api/v1/chat (non-streaming)."""
@patch("pocketpaw.api.v1.chat._send_message")
@patch("pocketpaw.api.v1.chat._APISessionBridge")
def test_send_returns_complete_response(self, mock_bridge_cls, mock_send, client):
bridge = MagicMock()
q = asyncio.Queue()
bridge.queue = q
bridge.chat_id = "api:test"
bridge.start = AsyncMock()
bridge.stop = AsyncMock()
mock_bridge_cls.return_value = bridge
mock_send.return_value = "api:test"
# Load events
async def _load():
await q.put({"event": "chunk", "data": {"content": "Hello "}})
await q.put({"event": "chunk", "data": {"content": "world!"}})
await q.put(
{"event": "stream_end", "data": {"session_id": "api:test", "usage": {"tokens": 10}}}
)
asyncio.new_event_loop().run_until_complete(_load())
resp = client.post("/api/v1/chat", json={"content": "Hi", "session_id": "api:test"})
assert resp.status_code == 200
data = resp.json()
assert data["content"] == "Hello world!"
assert data["session_id"] == "websocket_api:test"
def test_send_empty_content(self, client):
resp = client.post("/api/v1/chat", json={"content": ""})
assert resp.status_code == 422 # Pydantic validation
class TestSSEFormat:
"""Validate that SSE events follow the spec (event: <type>\\ndata: <json>\\n\\n)."""
@patch("pocketpaw.api.v1.chat._send_message")
@patch("pocketpaw.api.v1.chat._APISessionBridge")
def test_sse_events_are_valid_format(self, mock_bridge_cls, mock_send, client):
"""Each SSE event must have 'event:' and 'data:' lines with valid JSON data."""
import json as _json
bridge = MagicMock()
q = asyncio.Queue()
bridge.queue = q
bridge.start = AsyncMock()
bridge.stop = AsyncMock()
mock_bridge_cls.return_value = bridge
mock_send.return_value = "api:sse-test"
async def _load():
await q.put({"event": "chunk", "data": {"content": "hi"}})
await q.put(
{"event": "stream_end", "data": {"session_id": "api:sse-test", "usage": {}}}
)
asyncio.new_event_loop().run_until_complete(_load())
with client.stream("POST", "/api/v1/chat/stream", json={"content": "test"}) as resp:
assert resp.status_code == 200
raw = resp.read().decode()
# Split SSE events by double newlines
events = [e.strip() for e in raw.split("\n\n") if e.strip()]
assert len(events) >= 2, f"Expected at least 2 SSE events, got {len(events)}: {raw!r}"
for event_block in events:
lines = event_block.split("\n")
event_line = next((line for line in lines if line.startswith("event:")), None)
data_line = next((line for line in lines if line.startswith("data:")), None)
assert event_line is not None, f"Missing 'event:' line in SSE block: {event_block!r}"
assert data_line is not None, f"Missing 'data:' line in SSE block: {event_block!r}"
event_type = event_line.split(":", 1)[1].strip()
assert event_type, f"Empty event type in: {event_block!r}"
data_str = data_line.split(":", 1)[1].strip()
parsed = _json.loads(data_str)
assert isinstance(parsed, dict), f"SSE data must be a JSON object, got: {type(parsed)}"