mirror of
https://github.com/pocketpaw/pocketpaw.git
synced 2026-05-21 01:04:57 +00:00
* docs: add agent status API & CLI design

  Design for GET /api/v1/agent/status endpoint, SSE stream, and pocketpaw status CLI command. Exposes real-time agent state (idle/thinking/tool_running/streaming/error) for external integrations.

* docs: add agent status API implementation plan

  9-task plan covering StatusTracker, REST endpoint, SSE stream, CLI command, auth, tests, and lifecycle wiring.

* feat: add agent status API, SSE stream, and CLI command

  Adds a public status endpoint for external integrations (stream decks, LED indicators, desktop widgets) to monitor PocketPaw agent state.

  - GET /api/v1/agent/status returns global state (idle/active/degraded) and per-session breakdown (thinking/tool_running/streaming/error)
  - GET /api/v1/agent/status/stream pushes SSE events on state changes
  - pocketpaw status CLI with --json and --watch flags
  - Optional API key auth via POCKETPAW_STATUS_API_KEY / X-Status-Key
  - StatusTracker subscribes to bus events, no internal coupling
  - 22 tests covering state transitions, auth, and CLI formatting

* fix(tests): update router count to 24 and fix lint in test files

  Update test_v1_routers_count assertion from 23 to 24 for the new agent_status router. Reformat test files and fix UP038 lint error.

* fix: address review issues in agent status API

  - Cache status API key to avoid Settings.load() on every request
  - Add client disconnect detection in SSE stream loop
  - Fix wait_for_change race condition with version-based tracking
  - Wire response_model=AgentStatusResponse and fix schema alias mismatch
  - Move lifecycle registration from module scope to startup_event
  - Extract session title enrichment into dedicated method
  - Update tests for new caching and version tracking

* fix: emit agent_start/end events, fix SSE stream spamming, add docs

  - AgentLoop now emits agent_start before processing and agent_end on completion/error so StatusTracker actually tracks sessions
  - Skip redundant thinking notifications when state is already thinking
  - Deduplicate SSE stream using state fingerprints (ignores timing fields)
  - Increase SSE debounce from 200ms to 1s to coalesce rapid tool events
  - Add API docs for GET /agent/status and GET /agent/status/stream
135 lines
5.6 KiB
Python
135 lines
5.6 KiB
Python
"""Tests for StatusTracker."""
|
|
|
|
import pytest
|
|
|
|
from pocketpaw.bus.events import SystemEvent
|
|
from pocketpaw.status import StatusTracker
|
|
|
|
|
|
@pytest.fixture
def tracker():
    """Return a new StatusTracker constructed with ``max_concurrent=3``."""
    status_tracker = StatusTracker(max_concurrent=3)
    return status_tracker
|
|
|
|
|
|
class TestStatusTracker:
    """Feed SystemEvents into a StatusTracker and check the resulting snapshots."""

    @staticmethod
    async def _emit(tracker, event_type, **data):
        """Pass one SystemEvent of ``event_type`` to the tracker's event handler.

        The keyword arguments are forwarded, unchanged, as the event's ``data`` dict.
        """
        await tracker._on_event(SystemEvent(event_type=event_type, data=data))

    async def test_idle_by_default(self, tracker):
        """With no events received, the snapshot is idle and lists no sessions."""
        status = tracker.snapshot()
        overall = status["global"]
        assert overall["state"] == "idle"
        assert overall["active_sessions"] == 0
        assert status["sessions"] == []

    async def test_agent_start_creates_session(self, tracker):
        """``agent_start`` adds a session whose key is exposed as channel and id."""
        await self._emit(tracker, "agent_start", session_key="websocket:abc")
        status = tracker.snapshot()
        overall = status["global"]
        assert overall["state"] == "active"
        assert overall["active_sessions"] == 1
        session = status["sessions"][0]
        assert session["session_key"] == "websocket:abc"
        assert session["channel"] == "websocket"
        assert session["session_id"] == "abc"

    async def test_thinking_state(self, tracker):
        """A ``thinking`` event after ``agent_start`` puts the session in ``thinking``."""
        await self._emit(tracker, "agent_start", session_key="ws:1")
        await self._emit(tracker, "thinking", session_key="ws:1")
        status = tracker.snapshot()
        assert status["sessions"][0]["state"] == "thinking"

    async def test_tool_running_state(self, tracker):
        """``tool_start`` reports ``tool_running`` together with the tool's name."""
        await self._emit(tracker, "agent_start", session_key="ws:1")
        await self._emit(tracker, "tool_start", session_key="ws:1", name="bash")
        status = tracker.snapshot()
        session = status["sessions"][0]
        assert session["state"] == "tool_running"
        assert session["tool_name"] == "bash"

    async def test_tool_result_transitions_to_streaming(self, tracker):
        """``tool_result`` moves the session to ``streaming`` and clears the tool name."""
        await self._emit(tracker, "agent_start", session_key="ws:1")
        await self._emit(tracker, "tool_start", session_key="ws:1", name="bash")
        await self._emit(tracker, "tool_result", session_key="ws:1")
        status = tracker.snapshot()
        session = status["sessions"][0]
        assert session["state"] == "streaming"
        assert session["tool_name"] is None

    async def test_error_state_sets_degraded(self, tracker):
        """An ``error`` event marks the session as errored and the global state degraded."""
        await self._emit(tracker, "agent_start", session_key="ws:1")
        await self._emit(tracker, "error", session_key="ws:1", message="Rate limit")
        status = tracker.snapshot()
        assert status["global"]["state"] == "degraded"
        session = status["sessions"][0]
        assert session["state"] == "error"
        assert session["error_message"] == "Rate limit"

    async def test_agent_end_removes_session(self, tracker):
        """``agent_end`` drops the session, returning the tracker to idle."""
        await self._emit(tracker, "agent_start", session_key="ws:1")
        await self._emit(tracker, "agent_end", session_key="ws:1")
        status = tracker.snapshot()
        assert status["global"]["state"] == "idle"
        assert status["sessions"] == []

    async def test_waiting_for_user_state(self, tracker):
        """``ask_user_question`` puts the session in ``waiting_for_user``."""
        await self._emit(tracker, "agent_start", session_key="ws:1")
        await self._emit(tracker, "ask_user_question", session_key="ws:1")
        status = tracker.snapshot()
        assert status["sessions"][0]["state"] == "waiting_for_user"

    async def test_token_usage_accumulates(self, tracker):
        """Successive ``token_usage`` events are summed per session."""
        await self._emit(tracker, "agent_start", session_key="ws:1")
        for tokens_in, tokens_out in ((100, 50), (200, 80)):
            await self._emit(
                tracker,
                "token_usage",
                session_key="ws:1",
                input=tokens_in,
                output=tokens_out,
            )
        status = tracker.snapshot()
        assert status["sessions"][0]["token_usage"] == {"input": 300, "output": 130}

    async def test_max_concurrent_in_snapshot(self, tracker):
        """The snapshot echoes the ``max_concurrent`` value the fixture passed in."""
        status = tracker.snapshot()
        assert status["global"]["max_concurrent"] == 3

    async def test_ignores_events_without_session_key(self, tracker):
        """An event whose data has no ``session_key`` leaves the tracker idle."""
        await self._emit(tracker, "thinking")
        status = tracker.snapshot()
        assert status["global"]["state"] == "idle"

    async def test_multiple_sessions(self, tracker):
        """Two ``agent_start`` events with distinct keys yield two sessions."""
        for key in ("ws:1", "discord:2"):
            await self._emit(tracker, "agent_start", session_key=key)
        status = tracker.snapshot()
        assert status["global"]["active_sessions"] == 2
        assert len(status["sessions"]) == 2

    async def test_degraded_with_mixed_states(self, tracker):
        """One errored session among two degrades the global state; both still count."""
        for key in ("ws:1", "ws:2"):
            await self._emit(tracker, "agent_start", session_key=key)
        await self._emit(tracker, "error", session_key="ws:1", message="fail")
        status = tracker.snapshot()
        overall = status["global"]
        assert overall["state"] == "degraded"
        assert overall["active_sessions"] == 2
|