Files
pocketpaw/tests/test_status_tracker.py
Rohit Kushwaha 5adeae1f9c feat: agent status API, SSE stream, and CLI command (#586)
* docs: add agent status API & CLI design

Design for GET /api/v1/agent/status endpoint, SSE stream, and
pocketpaw status CLI command. Exposes real-time agent state
(idle/thinking/tool_running/streaming/error) for external integrations.

* docs: add agent status API implementation plan

9-task plan covering StatusTracker, REST endpoint, SSE stream,
CLI command, auth, tests, and lifecycle wiring.

* feat: add agent status API, SSE stream, and CLI command

Adds a public status endpoint for external integrations (stream decks,
LED indicators, desktop widgets) to monitor PocketPaw agent state.

- GET /api/v1/agent/status returns global state (idle/active/degraded)
  and per-session breakdown (thinking/tool_running/streaming/error)
- GET /api/v1/agent/status/stream pushes SSE events on state changes
- pocketpaw status CLI with --json and --watch flags
- Optional API key auth via POCKETPAW_STATUS_API_KEY / X-Status-Key
- StatusTracker subscribes to bus events, no internal coupling
- 22 tests covering state transitions, auth, and CLI formatting

* fix(tests): update router count to 24 and fix lint in test files

Update test_v1_routers_count assertion from 23 to 24 for the new
agent_status router. Reformat test files and fix UP038 lint error.

* fix: address review issues in agent status API

- Cache status API key to avoid Settings.load() on every request
- Add client disconnect detection in SSE stream loop
- Fix wait_for_change race condition with version-based tracking
- Wire response_model=AgentStatusResponse and fix schema alias mismatch
- Move lifecycle registration from module scope to startup_event
- Extract session title enrichment into dedicated method
- Update tests for new caching and version tracking

* fix: emit agent_start/end events, fix SSE stream spamming, add docs

- AgentLoop now emits agent_start before processing and agent_end on
  completion/error so StatusTracker actually tracks sessions
- Skip redundant thinking notifications when state is already thinking
- Deduplicate SSE stream using state fingerprints (ignores timing fields)
- Increase SSE debounce from 200ms to 1s to coalesce rapid tool events
- Add API docs for GET /agent/status and GET /agent/status/stream
2026-03-14 10:14:12 +05:30

135 lines
5.6 KiB
Python

"""Tests for StatusTracker."""
import pytest
from pocketpaw.bus.events import SystemEvent
from pocketpaw.status import StatusTracker
@pytest.fixture
def tracker():
    """Provide a fresh ``StatusTracker`` built with ``max_concurrent=3``."""
    status_tracker = StatusTracker(max_concurrent=3)
    return status_tracker
class TestStatusTracker:
    """Drive ``StatusTracker`` through ``_on_event`` and inspect ``snapshot()``."""

    @staticmethod
    async def _send(tracker, event_type, **payload):
        """Feed a single ``SystemEvent`` to *tracker*.

        The keyword arguments are passed through unchanged as the event's
        ``data`` dict (an empty dict when none are given).
        """
        await tracker._on_event(SystemEvent(event_type=event_type, data=payload))

    async def test_idle_by_default(self, tracker):
        """A tracker that has seen no events is idle with no sessions."""
        status = tracker.snapshot()
        overall = status["global"]
        assert overall["state"] == "idle"
        assert overall["active_sessions"] == 0
        assert status["sessions"] == []

    async def test_agent_start_creates_session(self, tracker):
        """``agent_start`` registers a session and splits its key on the colon."""
        await self._send(tracker, "agent_start", session_key="websocket:abc")

        status = tracker.snapshot()
        overall = status["global"]
        assert overall["state"] == "active"
        assert overall["active_sessions"] == 1

        session = status["sessions"][0]
        assert session["session_key"] == "websocket:abc"
        assert session["channel"] == "websocket"
        assert session["session_id"] == "abc"

    async def test_thinking_state(self, tracker):
        """A ``thinking`` event puts the session in the ``thinking`` state."""
        for kind in ("agent_start", "thinking"):
            await self._send(tracker, kind, session_key="ws:1")

        status = tracker.snapshot()
        assert status["sessions"][0]["state"] == "thinking"

    async def test_tool_running_state(self, tracker):
        """``tool_start`` marks the session ``tool_running`` and records the tool name."""
        await self._send(tracker, "agent_start", session_key="ws:1")
        await self._send(tracker, "tool_start", session_key="ws:1", name="bash")

        session = tracker.snapshot()["sessions"][0]
        assert session["state"] == "tool_running"
        assert session["tool_name"] == "bash"

    async def test_tool_result_transitions_to_streaming(self, tracker):
        """``tool_result`` after ``tool_start`` yields ``streaming`` with no tool name."""
        await self._send(tracker, "agent_start", session_key="ws:1")
        await self._send(tracker, "tool_start", session_key="ws:1", name="bash")
        await self._send(tracker, "tool_result", session_key="ws:1")

        session = tracker.snapshot()["sessions"][0]
        assert session["state"] == "streaming"
        assert session["tool_name"] is None

    async def test_error_state_sets_degraded(self, tracker):
        """An ``error`` event degrades the global state and stores the message."""
        await self._send(tracker, "agent_start", session_key="ws:1")
        await self._send(tracker, "error", session_key="ws:1", message="Rate limit")

        status = tracker.snapshot()
        assert status["global"]["state"] == "degraded"

        session = status["sessions"][0]
        assert session["state"] == "error"
        assert session["error_message"] == "Rate limit"

    async def test_agent_end_removes_session(self, tracker):
        """``agent_end`` drops the session and the tracker returns to idle."""
        for kind in ("agent_start", "agent_end"):
            await self._send(tracker, kind, session_key="ws:1")

        status = tracker.snapshot()
        assert status["global"]["state"] == "idle"
        assert status["sessions"] == []

    async def test_waiting_for_user_state(self, tracker):
        """``ask_user_question`` puts the session in ``waiting_for_user``."""
        await self._send(tracker, "agent_start", session_key="ws:1")
        await self._send(tracker, "ask_user_question", session_key="ws:1")

        status = tracker.snapshot()
        assert status["sessions"][0]["state"] == "waiting_for_user"

    async def test_token_usage_accumulates(self, tracker):
        """Successive ``token_usage`` events are summed per session."""
        await self._send(tracker, "agent_start", session_key="ws:1")
        await self._send(tracker, "token_usage", session_key="ws:1", input=100, output=50)
        await self._send(tracker, "token_usage", session_key="ws:1", input=200, output=80)

        status = tracker.snapshot()
        expected_usage = {"input": 300, "output": 130}
        assert status["sessions"][0]["token_usage"] == expected_usage

    async def test_max_concurrent_in_snapshot(self, tracker):
        """The snapshot reports the ``max_concurrent`` value given at construction."""
        overall = tracker.snapshot()["global"]
        assert overall["max_concurrent"] == 3

    async def test_ignores_events_without_session_key(self, tracker):
        """An event whose data has no ``session_key`` leaves the tracker idle."""
        await self._send(tracker, "thinking")

        status = tracker.snapshot()
        assert status["global"]["state"] == "idle"

    async def test_multiple_sessions(self, tracker):
        """Two ``agent_start`` events with different keys yield two sessions."""
        for key in ("ws:1", "discord:2"):
            await self._send(tracker, "agent_start", session_key=key)

        status = tracker.snapshot()
        assert status["global"]["active_sessions"] == 2
        assert len(status["sessions"]) == 2

    async def test_degraded_with_mixed_states(self, tracker):
        """One errored session out of two degrades the global state; both still count."""
        for key in ("ws:1", "ws:2"):
            await self._send(tracker, "agent_start", session_key=key)
        await self._send(tracker, "error", session_key="ws:1", message="fail")

        overall = tracker.snapshot()["global"]
        assert overall["state"] == "degraded"
        assert overall["active_sessions"] == 2