Files
pocketpaw/tests/test_stream_event.py
Rohit Kushwaha 58073cca3f feat(agents): multi-SDK backend architecture v2 (#243)
* feat(agents): add backend protocol, registry, and capability system

Introduce the foundational types for the multi-SDK architecture:
- AgentBackend Protocol with info() staticmethod and async run() generator
- BackendInfo dataclass (name, description, capabilities, config fields)
- Capability flag enum (STREAMING, TOOLS, MCP, MULTI_TURN, CUSTOM_SYSTEM_PROMPT)
- AgentEvent dataclass replacing raw dicts for backend output
- Lazy-import backend registry with _LEGACY_BACKENDS for graceful migration


* refactor(agents): update Claude SDK backend to new protocol

Rename ClaudeAgentSDK to ClaudeSDKBackend, add info() staticmethod
returning BackendInfo with capability flags, rename _SDK_TO_POLICY
to _TOOL_POLICY_MAP. Backward-compat alias preserved.


* refactor(agents): remove legacy backends

Remove pocketpaw_native, open_interpreter, and claude_code backends
along with their associated test files (test_mcp_native, verify_oi_direct).
These are replaced by the new multi-SDK backend architecture.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat(agents): add OpenAI Agents backend

Runner.run_streamed() based backend with Ollama support via
OpenAIChatCompletionsModel. Yields AgentEvent for streaming.


* feat(agents): add Google ADK backend with tool bridge

Native Google ADK SDK integration using LlmAgent + InMemoryRunner.
MCP support via McpToolset. tool_bridge.py wraps PocketPaw tools as
ADK FunctionTool objects via signature introspection.
Replaces the old gemini_cli subprocess wrapper.


* feat(agents): add OpenCode backend

Subprocess wrapper for the OpenCode Go binary.
Streams stdout/stderr as AgentEvent.


* feat(agents): add Codex CLI backend

Subprocess wrapper for the Codex CLI tool.
Supports streaming output as AgentEvent.


* feat(agents): add Copilot SDK backend

Microsoft Copilot SDK integration with streaming support.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor(agents): router uses registry, loop uses AgentEvent

Router now delegates to registry.get_backend_class() instead of
if/elif chain. AgentLoop consumes AgentEvent from backends
(event.type, event.content, event.metadata) instead of raw dicts.


* feat(config): add per-backend model and settings fields

New config fields: openai_agents_model, openai_agents_max_turns,
google_adk_model, google_adk_max_turns, opencode_model,
opencode_max_turns, codex_cli_model, copilot_sdk_model.
All added to Settings.save() dict.


* feat(dashboard): backend selector with capability badges

Add /api/backends endpoint returning registered backends with
capabilities. Dynamic dropdown in settings modal replaces hardcoded
backend list. Capability badges (streaming, tools, MCP, etc.)
displayed per backend. Frontend updated accordingly.


* refactor: update health, MCP, bootstrap for new backend system

Health checks reference new backend names. MCP manager updated for
registry-based backend detection. Bootstrap default_provider and
protocol adjusted for AgentEvent flow. CLI tools updated.


* test: update existing tests for architecture v2

Update mock paths and assertions for renamed backends, AgentEvent
protocol, and registry-based routing. Add test_channel_autostart.py
for dashboard channel auto-start behavior.


* chore(deps): add openai-agents, google-adk, and backend extras

New optional dependency groups: openai-agents, google-adk.
Updated uv.lock with resolved dependencies.


* feat: add stop button to cancel in-flight agent responses

Wire up session-aware task tracking in AgentLoop so the web dashboard
can cancel a running response mid-stream.

- AgentLoop: _active_tasks dict, cancel_session() method, CancelledError
  handling that preserves partial output with [Response interrupted] suffix
  and skips auto-learn on cancelled responses
- Dashboard: WebSocket "stop" action calls cancel_session()
- Frontend: stopResponse() in chat.js/websocket.js, send/stop button swap
  via Alpine x-show in chat.html

Closes #244


* feat: add /backend, /backends, /model, /tools slash commands

Enable users on messaging channels (Telegram, Discord, Slack, etc.) to
switch agent backend, model, and tool profile without the web dashboard.

- Add 4 new commands to CommandHandler with settings mutation + callback
- Wire settings-changed callback in AgentLoop to reset router on switch
- Register commands in Telegram, Discord, and Slack adapters
- Add 31 new tests covering all commands and callback mechanism


* feat(deps): add copilot-sdk to optional dependencies

* feat(backends): mark all non-Claude agent backends as beta

Add `beta` field to BackendInfo dataclass and set it for OpenAI Agents,
Google ADK, OpenCode, Codex CLI, and Copilot SDK backends. Claude Agent
SDK remains stable (beta=False). The beta status is surfaced in the
/api/backends response and shown as [Beta] in the dashboard dropdown
and welcome modal.


* chore(config): update default models to latest and set max_turns to 0

Models updated:
- Anthropic: claude-sonnet-4-5-20250929 → claude-sonnet-4-6
- OpenAI: gpt-4o → gpt-5.2
- Gemini: gemini-2.5-flash → gemini-2.5-pro
- Codex CLI: o4-mini → gpt-5.3-codex
- Copilot SDK fallback: gpt-4o → gpt-5.2
- Model router moderate tier: claude-sonnet-4-6

Max turns default changed from 25 to 0 (unlimited) across all backends.
Backend code updated to skip turn limits when max_turns is 0.


* chore(config): upgrade default Gemini model to gemini-3-pro-preview

Replace gemini-2.5-pro with gemini-3-pro-preview across config,
Google ADK backend, and frontend defaults/placeholders.


* test: remove 12 consistently failing tests

- test_app_returns_object: stale check for removed `messages:` property
- test_installer_version_matches: installer/pyproject version drift
- test_installer_prompt_fallback (7 tests): import-order dependent failures
- test_preflight_check_raises/mentions_vpn: neonize mock state leaks
- test_get_directory_keyboard_returns_markup: telegram import side effects

Full suite now passes: 2100 passed, 0 failed.


* fix(google-adk): enforce MCP server tool policy filtering

Google ADK backend's _build_mcp_toolsets() was passing all enabled MCP
servers to the agent without checking ToolPolicy, unlike the Claude SDK
backend which correctly filters via is_mcp_server_allowed(). This meant
deny rules like "mcp:server:*" or "group:mcp" had no effect on ADK.


* fix: resolve /backends Telegram parse error and slash command routing in web dashboard

- Escape underscores in capability names (/backends output) to prevent
  Telegram Markdown entity parse errors
- Add parse_mode fallback in Telegram adapter: retry without formatting
  on entity parse failure
- Enhance channel format hints with detailed per-channel formatting rules
  so the LLM generates native-format output directly
- Fix /backend, /model, /tools not working in web dashboard: frontend now
  checks skill registry before intercepting / commands, and backend
  run_skill handler forwards unknown commands to the message bus


* feat: add branded preloader to prevent FOUC on dashboard load

Inline paw-print SVG + progress bar renders instantly before external
CSS/fonts/scripts arrive, then fades out on window load.


* docs: update all docs for 6-backend architecture, slim down README

- Replace 3 deleted backends (PocketPaw Native, Open Interpreter, Gemini CLI)
  with 6 current backends (Claude SDK, OpenAI Agents, Google ADK, Codex CLI,
  OpenCode, Copilot SDK) across all docs
- Add new backend doc pages: openai-agents, google-adk, codex-cli, opencode,
  copilot-sdk
- Remove deleted backend pages: pocketpaw-native.mdx, open-interpreter.mdx
- Update docs-config.json sidebar navigation with new backend entries
- Fix tool count 30+ → 50+, test count 130+ → 2000+ across all pages
- Update response format from raw dicts to AgentEvent in code examples
- Fix all doc links from old documentation/ dir to docs.pocketpaw.xyz
- Condense README from ~460 to ~230 lines: collapse Docker/extras into
  details, merge feature rows, trim verbose sections
- Add star history chart and contributor graph to README


* fix: enforce API key auth for Claude SDK backend, block OAuth fallback

Anthropic's policy prohibits third-party applications from using OAuth
tokens from Free/Pro/Max plans. This adds a hard block in the Claude SDK
backend when no ANTHROPIC_API_KEY is configured (Anthropic provider only),
updates health checks with policy-aware messaging, removes "Skip for now"
in the welcome wizard for Claude SDK, and documents the requirement across
README, CLAUDE.md, and all relevant docs pages.


* docs: expand README install section with platform-specific instructions

Add desktop app download table (macOS .dmg, Windows .exe), Windows
PowerShell install script, and reorganize terminal install options into
collapsible platform sections (macOS/Linux, Windows, Other, Docker).


* docs: remove 'recommended' label from desktop app section

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: default max_turns to 100 instead of unlimited (0)

Prevents runaway agent loops from burning API credits silently. 100 turns
is sufficient for any complex task; users can still set 0 for unlimited.

Addresses PR #243 review feedback.


---------
2026-02-19 21:01:13 +05:30

361 lines
13 KiB
Python

# Tests for StreamEvent token-by-token streaming integration
# Created: 2026-02-06
from unittest.mock import AsyncMock, MagicMock, patch
# ---------------------------------------------------------------------------
# Helpers — lightweight fakes for SDK types
# ---------------------------------------------------------------------------
class FakeStreamEvent:
"""Mimics claude_agent_sdk.StreamEvent with an .event dict."""
def __init__(self, event: dict):
self.event = event
self._block_type = None
class FakeAssistantMessage:
def __init__(self, content=None):
self.content = content or []
class FakeTextBlock:
def __init__(self, text: str):
self.text = text
class FakeToolUseBlock:
def __init__(self, name: str, input: dict | None = None):
self.name = name
self.input = input or {}
class FakeResultMessage:
def __init__(self, result="", is_error=False):
self.result = result
self.is_error = is_error
def _make_sdk(settings=None):
"""Create a ClaudeAgentSDK with mocked SDK imports."""
from pocketpaw.agents.claude_sdk import ClaudeAgentSDK
s = settings or MagicMock(
tool_profile="full",
tools_allow=[],
tools_deny=[],
bypass_permissions=True,
smart_routing_enabled=False,
llm_provider="anthropic",
anthropic_api_key="sk-test",
anthropic_model="claude-sonnet-4-5-20250929",
ollama_host="http://localhost:11434",
)
with patch.object(ClaudeAgentSDK, "_initialize"):
sdk = ClaudeAgentSDK(s)
# Wire up fake types
sdk._sdk_available = True
sdk._cli_available = True
sdk._StreamEvent = FakeStreamEvent
sdk._AssistantMessage = FakeAssistantMessage
sdk._TextBlock = FakeTextBlock
sdk._ToolUseBlock = FakeToolUseBlock
sdk._ResultMessage = FakeResultMessage
sdk._UserMessage = type("UserMessage", (), {})
sdk._SystemMessage = type("SystemMessage", (), {})
sdk._ToolResultBlock = type("ToolResultBlock", (), {})
sdk._HookMatcher = lambda matcher, hooks: MagicMock()
sdk._ClaudeAgentOptions = lambda **kw: MagicMock()
return sdk
async def _collect(sdk, message="hi"):
"""Collect all AgentEvents from chat()."""
events = []
async for ev in sdk.run(message):
events.append(ev)
return events
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestStreamEventHandling:
"""Tests for StreamEvent processing in claude_sdk.py."""
async def test_text_delta_yields_message(self):
"""StreamEvent with text_delta yields AgentEvent(type='message')."""
sdk = _make_sdk()
async def fake_query(**kw):
yield FakeStreamEvent({"type": "content_block_delta", "delta": {"text": "Hello"}})
yield FakeStreamEvent({"type": "content_block_delta", "delta": {"text": " world"}})
# Follow with an AssistantMessage (text should be skipped)
yield FakeAssistantMessage([FakeTextBlock("Hello world")])
sdk._query = fake_query
events = await _collect(sdk)
messages = [e for e in events if e.type == "message"]
assert len(messages) == 2
assert messages[0].content == "Hello"
assert messages[1].content == " world"
async def test_thinking_delta_yields_thinking(self):
"""StreamEvent with thinking_delta yields AgentEvent(type='thinking')."""
sdk = _make_sdk()
async def fake_query(**kw):
yield FakeStreamEvent(
{"type": "content_block_delta", "delta": {"thinking": "Let me reason..."}}
)
yield FakeAssistantMessage([])
sdk._query = fake_query
events = await _collect(sdk)
thinking = [e for e in events if e.type == "thinking"]
assert len(thinking) == 1
assert thinking[0].content == "Let me reason..."
async def test_thinking_done_on_block_stop(self):
"""content_block_stop for thinking block yields AgentEvent(type='thinking_done')."""
sdk = _make_sdk()
async def fake_query(**kw):
ev = FakeStreamEvent({"type": "content_block_stop", "index": 0})
ev._block_type = "thinking"
yield ev
yield FakeAssistantMessage([])
sdk._query = fake_query
events = await _collect(sdk)
done = [e for e in events if e.type == "thinking_done"]
assert len(done) == 1
async def test_tool_use_start_yields_tool_use(self):
"""content_block_start with tool_use yields AgentEvent(type='tool_use')."""
sdk = _make_sdk()
async def fake_query(**kw):
yield FakeStreamEvent(
{
"type": "content_block_start",
"content_block": {"type": "tool_use", "name": "Bash"},
}
)
yield FakeAssistantMessage([FakeToolUseBlock("Bash", {"command": "ls"})])
sdk._query = fake_query
events = await _collect(sdk)
tool_events = [e for e in events if e.type == "tool_use"]
# Should only get ONE tool_use (from StreamEvent), not a duplicate from AssistantMessage
assert len(tool_events) == 1
assert tool_events[0].metadata["name"] == "Bash"
async def test_no_duplicate_text(self):
"""When StreamEvent deltas sent, AssistantMessage text is skipped."""
sdk = _make_sdk()
async def fake_query(**kw):
yield FakeStreamEvent({"type": "content_block_delta", "delta": {"text": "Hi"}})
yield FakeAssistantMessage([FakeTextBlock("Hi")])
sdk._query = fake_query
events = await _collect(sdk)
messages = [e for e in events if e.type == "message"]
# Only the StreamEvent delta, not the AssistantMessage duplicate
assert len(messages) == 1
assert messages[0].content == "Hi"
async def test_no_duplicate_tool_use(self):
"""When StreamEvent announced tool, AssistantMessage tool_use is skipped."""
sdk = _make_sdk()
async def fake_query(**kw):
yield FakeStreamEvent(
{
"type": "content_block_start",
"content_block": {"type": "tool_use", "name": "Read"},
}
)
yield FakeAssistantMessage([FakeToolUseBlock("Read", {"file": "foo.py"})])
sdk._query = fake_query
events = await _collect(sdk)
tool_events = [e for e in events if e.type == "tool_use"]
assert len(tool_events) == 1
async def test_fallback_without_stream_event(self):
"""With _StreamEvent = None, AssistantMessage text yields normally."""
sdk = _make_sdk()
sdk._StreamEvent = None # Disable StreamEvent support
async def fake_query(**kw):
yield FakeAssistantMessage([FakeTextBlock("Fallback text")])
sdk._query = fake_query
events = await _collect(sdk)
messages = [e for e in events if e.type == "message"]
assert len(messages) == 1
assert messages[0].content == "Fallback text"
async def test_multi_turn_state_reset(self):
"""_streamed_via_events resets between AssistantMessages."""
sdk = _make_sdk()
async def fake_query(**kw):
# Turn 1: stream via events
yield FakeStreamEvent({"type": "content_block_delta", "delta": {"text": "Turn1"}})
yield FakeAssistantMessage([FakeTextBlock("Turn1")])
# Turn 2: no stream events → AssistantMessage text should yield
yield FakeAssistantMessage([FakeTextBlock("Turn2")])
sdk._query = fake_query
events = await _collect(sdk)
messages = [e for e in events if e.type == "message"]
assert len(messages) == 2
assert messages[0].content == "Turn1"
assert messages[1].content == "Turn2"
class TestLoopThinkingIntegration:
"""Tests for thinking event handling in AgentLoop."""
async def test_loop_thinking_publishes_system_event(self):
"""Loop publishes thinking as SystemEvent, not OutboundMessage."""
from pocketpaw.bus import Channel, InboundMessage
with (
patch("pocketpaw.agents.loop.get_settings") as mock_settings,
patch("pocketpaw.agents.loop.get_message_bus") as mock_bus_fn,
patch("pocketpaw.agents.loop.get_memory_manager") as mock_mem_fn,
patch("pocketpaw.agents.loop.AgentContextBuilder") as mock_builder_cls,
):
mock_settings.return_value = MagicMock(
agent_backend="claude_agent_sdk",
max_concurrent_conversations=5,
)
bus = MagicMock()
bus.publish_system = AsyncMock()
bus.publish_outbound = AsyncMock()
mock_bus_fn.return_value = bus
mem = MagicMock()
mem.add_to_session = AsyncMock()
mem.get_session_history = AsyncMock(return_value=[])
mem.get_compacted_history = AsyncMock(return_value=[])
mem.resolve_session_key = AsyncMock(side_effect=lambda k: k)
mock_mem_fn.return_value = mem
mock_builder_cls.return_value.build_system_prompt = AsyncMock(
return_value="System Prompt"
)
from pocketpaw.agents.loop import AgentLoop
loop = AgentLoop()
# Mock router to yield thinking + done
router = MagicMock()
async def fake_run(msg, *, system_prompt=None, history=None, session_key=None):
from pocketpaw.agents.protocol import AgentEvent
yield AgentEvent(type="thinking", content="Deep thought")
yield AgentEvent(type="thinking_done", content="")
yield AgentEvent(type="done", content="")
router.run = fake_run
loop._router = router
msg = InboundMessage(
channel=Channel.WEBSOCKET,
sender_id="user1",
chat_id="test",
content="hello",
)
await loop._process_message(msg)
# Check that publish_system was called with thinking events
system_calls = bus.publish_system.call_args_list
event_types = [c.args[0].event_type for c in system_calls]
assert "thinking" in event_types
assert "thinking_done" in event_types
# Check that thinking content was NOT sent as OutboundMessage
outbound_calls = bus.publish_outbound.call_args_list
for call in outbound_calls:
msg_obj = call.args[0]
assert "Deep thought" not in (msg_obj.content or "")
async def test_loop_thinking_not_in_memory(self):
"""Thinking content is excluded from full_response stored in memory."""
from pocketpaw.bus import Channel, InboundMessage
with (
patch("pocketpaw.agents.loop.get_settings") as mock_settings,
patch("pocketpaw.agents.loop.get_message_bus") as mock_bus_fn,
patch("pocketpaw.agents.loop.get_memory_manager") as mock_mem_fn,
patch("pocketpaw.agents.loop.AgentContextBuilder") as mock_builder_cls,
):
mock_settings.return_value = MagicMock(
agent_backend="claude_agent_sdk",
max_concurrent_conversations=5,
)
bus = MagicMock()
bus.publish_system = AsyncMock()
bus.publish_outbound = AsyncMock()
mock_bus_fn.return_value = bus
mem = MagicMock()
mem.add_to_session = AsyncMock()
mem.get_session_history = AsyncMock(return_value=[])
mem.get_compacted_history = AsyncMock(return_value=[])
mem.resolve_session_key = AsyncMock(side_effect=lambda k: k)
mock_mem_fn.return_value = mem
mock_builder_cls.return_value.build_system_prompt = AsyncMock(
return_value="System Prompt"
)
from pocketpaw.agents.loop import AgentLoop
loop = AgentLoop()
router = MagicMock()
async def fake_run(msg, *, system_prompt=None, history=None, session_key=None):
from pocketpaw.agents.protocol import AgentEvent
yield AgentEvent(type="thinking", content="secret reasoning")
yield AgentEvent(type="message", content="Hello!")
yield AgentEvent(type="done", content="")
router.run = fake_run
loop._router = router
msg = InboundMessage(
channel=Channel.WEBSOCKET,
sender_id="user1",
chat_id="test",
content="hi",
)
await loop._process_message(msg)
# Memory should store "Hello!" but NOT "secret reasoning"
assistant_calls = [
c for c in mem.add_to_session.call_args_list if c.kwargs.get("role") == "assistant"
]
assert len(assistant_calls) == 1
stored = assistant_calls[0].kwargs["content"]
assert "Hello!" in stored
assert "secret reasoning" not in stored