mirror of
https://github.com/pocketpaw/pocketpaw.git
synced 2026-05-13 21:21:53 +00:00
- Added TraceStore class for managing trace data with daily JSONL partitioning. - Implemented methods for appending, retrieving, and cleaning up traces. - Introduced helper functions for parsing timestamps and calculating trace costs. - Created API endpoints for accessing trace data and analytics. feat(api): Add budget and analytics API endpoints - Implemented budget status and override management routes. - Added analytics endpoints for cost, performance, usage, and health metrics. - Created tests for budget and analytics API functionality. test(traces): Add comprehensive tests for trace storage and API - Developed unit tests for trace storage helpers and integration tests for trace propagation. - Added tests for budget and analytics API endpoints to ensure correct behavior. - Included tests for trace collector event aggregation and lifecycle management.
1499 lines
53 KiB
Python
1499 lines
53 KiB
Python
# Tests for Unified Agent Loop
# Updated for AgentEvent-based architecture (no more dict chunks)
|
|
|
|
import asyncio
|
|
import logging
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from pocketpaw.agents.loop import AgentLoop
|
|
from pocketpaw.agents.protocol import AgentEvent
|
|
from pocketpaw.bus import Channel, InboundMessage
|
|
|
|
|
|
@pytest.fixture
def mock_bus():
    """Provide a stand-in message bus whose async entry points are awaitable mocks."""
    fake_bus = MagicMock()
    # Each coroutine-style method gets its own AsyncMock so call counts stay separate.
    for method_name in ("consume_inbound", "publish_outbound", "publish_system"):
        setattr(fake_bus, method_name, AsyncMock())
    return fake_bus
|
|
|
|
|
|
@pytest.fixture
def mock_memory():
    """Provide a stand-in memory manager with awaitable session/history methods."""
    fake_memory = MagicMock()
    fake_memory.configure_mock(
        add_to_session=AsyncMock(),
        # Both history lookups report an empty conversation by default.
        get_session_history=AsyncMock(return_value=[]),
        get_compacted_history=AsyncMock(return_value=[]),
        # Session keys resolve to themselves.
        resolve_session_key=AsyncMock(side_effect=lambda k: k),
    )
    return fake_memory
|
|
|
|
|
|
@pytest.fixture
def mock_router():
    """Provide a fake AgentRouter whose ``run`` streams a fixed AgentEvent script.

    The script is two message chunks, a tool_use/tool_result pair for
    ``test_tool``, and a closing ``done`` event.
    """

    async def _scripted_run(message, *, system_prompt=None, history=None, session_key=None):
        script = (
            AgentEvent(type="message", content="Hello "),
            AgentEvent(type="message", content="world!"),
            AgentEvent(
                type="tool_use",
                content="Using test_tool...",
                metadata={"name": "test_tool", "input": {}},
            ),
            AgentEvent(
                type="tool_result",
                content="Tool completed",
                metadata={"name": "test_tool"},
            ),
            AgentEvent(type="done", content=""),
        )
        for event in script:
            yield event

    fake_router = MagicMock()
    fake_router.run = _scripted_run
    fake_router.stop = AsyncMock()
    return fake_router
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_agent_loop_process_message(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
    mock_router,
):
    """Processing one inbound message records to memory and publishes on the bus."""
    # Point every collaborator lookup in the loop module at the fixtures.
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory
    mock_router_cls.return_value = mock_router
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    fake_settings = MagicMock(agent_backend="claude_agent_sdk", max_concurrent_conversations=5)

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
    ):
        settings_cls.load.return_value = fake_settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Hello",
        )
        await agent_loop._process_message(inbound)

        mock_memory.add_to_session.assert_called()
        # At least two outbound publishes and one system event are expected.
        assert mock_bus.publish_outbound.call_count >= 2
        assert mock_bus.publish_system.call_count >= 1
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@pytest.mark.asyncio
async def test_agent_loop_reset_router(
    mock_builder_cls, mock_get_memory, mock_get_bus, mock_bus, mock_memory
):
    """A new loop has no router, and reset_router() leaves ``_router`` as None."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    fake_settings = MagicMock(agent_backend="claude_agent_sdk", max_concurrent_conversations=5)

    with patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings):
        agent_loop = AgentLoop()

        # No router has been created yet at construction time.
        assert agent_loop._router is None
        agent_loop.reset_router()
        assert agent_loop._router is None
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_agent_loop_handles_error(
    mock_router_cls, mock_builder_cls, mock_get_memory, mock_get_bus, mock_bus, mock_memory
):
    """An ``error`` event from the router is handled and a system event is published."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    async def _failing_run(message, *, system_prompt=None, history=None, session_key=None):
        # The router reports a failure and then finishes the stream.
        for kind, text in (("error", "Something went wrong"), ("done", "")):
            yield AgentEvent(type=kind, content=text)

    mock_router_cls.return_value = MagicMock(run=_failing_run)
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    fake_settings = MagicMock(agent_backend="claude_agent_sdk", max_concurrent_conversations=5)

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
    ):
        settings_cls.load.return_value = fake_settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Hello",
        )

        await agent_loop._process_message(inbound)
        mock_bus.publish_system.assert_called()
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@pytest.mark.asyncio
async def test_kill_audit_failure_is_logged(
    mock_builder_cls, mock_get_memory, mock_get_bus, mock_bus, mock_memory, caplog
):
    """/kill should continue even if audit logging fails, while surfacing logs.

    The bus hands the loop a single ``/kill`` message and then stops it; the
    audit logger is rigged to raise. The test expects an ERROR-level log record
    about the failed audit write and at least two outbound publishes.
    """
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    with patch("pocketpaw.agents.loop.get_settings") as mock_settings:
        settings = MagicMock()
        settings.agent_backend = "claude_agent_sdk"
        settings.max_concurrent_conversations = 5
        mock_settings.return_value = settings

        with patch("pocketpaw.agents.loop.Settings") as mock_settings_cls:
            mock_settings_cls.load.return_value = settings
            loop = AgentLoop()

            msg = InboundMessage(
                channel=Channel.CLI,
                sender_id="user1",
                chat_id="chat1",
                content="/kill",
            )

            consume_calls = 0

            async def _consume_once(timeout=1.0):
                # First poll delivers the /kill message; any later poll stops
                # the loop so ``_loop()`` returns instead of spinning forever.
                nonlocal consume_calls
                consume_calls += 1
                if consume_calls == 1:
                    return msg
                loop._running = False
                return None

            mock_bus.consume_inbound = AsyncMock(side_effect=_consume_once)

            # Every audit write fails.
            mock_audit_logger = MagicMock()
            mock_audit_logger.log.side_effect = RuntimeError("audit write failed")

            with patch(
                "pocketpaw.security.audit.get_audit_logger",
                return_value=mock_audit_logger,
            ):
                loop._running = True
                with caplog.at_level(logging.ERROR):
                    await loop._loop()

            assert any(
                "Failed to write audit log for /kill action" in rec.message
                for rec in caplog.records
            )
            # Reply + stream_end should still be sent
            assert mock_bus.publish_outbound.call_count >= 2
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_recent_file_tracker_failures_are_logged(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
    mock_router,
    caplog,
):
    """Tool tracking errors should be visible in logs, not silently swallowed.

    The recent-files tracker is replaced with a mock whose ``record_tool_use``
    raises; the test expects a log record mentioning the failure.
    """
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory
    mock_router_cls.return_value = mock_router

    mock_builder_instance = mock_builder_cls.return_value
    mock_builder_instance.build_system_prompt = AsyncMock(return_value="System Prompt")

    with patch("pocketpaw.agents.loop.get_settings") as mock_settings:
        settings = MagicMock()
        settings.agent_backend = "claude_agent_sdk"
        settings.max_concurrent_conversations = 5
        # Explicit False values: a bare MagicMock attribute would be truthy.
        settings.injection_scan_enabled = False
        settings.injection_scan_llm = False
        settings.pii_scan_enabled = False
        settings.pii_scan_memory = False
        settings.welcome_hint_enabled = False
        mock_settings.return_value = settings

        with patch("pocketpaw.agents.loop.Settings") as mock_settings_cls:
            mock_settings_cls.load.return_value = settings
            loop = AgentLoop()

            msg = InboundMessage(
                channel=Channel.CLI,
                sender_id="user1",
                chat_id="chat1",
                content="Run a tool",
            )

            # Every attempt to record a tool use fails.
            tracker = MagicMock()
            tracker.record_tool_use.side_effect = RuntimeError("tracker failed")

            with patch("pocketpaw.agents.loop.get_recent_files_tracker", return_value=tracker):
                # Capture from DEBUG upward so the record is seen at any level.
                with caplog.at_level(logging.DEBUG):
                    await loop._process_message(msg)

            assert any(
                "Failed to record recent file tracker event for tool" in rec.message
                for rec in caplog.records
            )
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_agent_loop_emits_tool_events(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
    mock_router,
):
    """Router tool_use/tool_result events show up as system events on the bus."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory
    mock_router_cls.return_value = mock_router
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    fake_settings = MagicMock(agent_backend="claude_agent_sdk", max_concurrent_conversations=5)

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
    ):
        settings_cls.load.return_value = fake_settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Run a tool",
        )
        await agent_loop._process_message(inbound)

        # The first positional argument of each publish_system call is the event.
        published_kinds = [
            recorded.args[0].event_type for recorded in mock_bus.publish_system.call_args_list
        ]
        for expected_kind in ("thinking", "tool_start", "tool_result"):
            assert expected_kind in published_kinds
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_agent_loop_builds_context_and_passes_to_router(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
):
    """The built system prompt and the compacted history are handed to router.run()."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    captured_kwargs = {}

    async def _recording_run(message, *, system_prompt=None, history=None, session_key=None):
        # Remember what the loop passed in, then answer with a minimal stream.
        captured_kwargs.update(system_prompt=system_prompt, history=history)
        yield AgentEvent(type="message", content="OK")
        yield AgentEvent(type="done", content="")

    mock_router_cls.return_value = MagicMock(run=_recording_run, stop=AsyncMock())

    builder = mock_builder_cls.return_value
    builder.build_system_prompt = AsyncMock(
        return_value="You are PocketPaw with identity and memory."
    )
    builder.bootstrap.get_context = AsyncMock(
        return_value=MagicMock(to_identity_block=lambda: "<identity>Test</identity>")
    )

    session_history = [
        {"role": "user", "content": "previous question"},
        {"role": "assistant", "content": "previous answer"},
    ]
    mock_memory.get_compacted_history = AsyncMock(return_value=session_history)
    # The stored session is empty for this test.
    mock_memory._store.get_session = AsyncMock(return_value=[])

    fake_settings = MagicMock(agent_backend="claude_agent_sdk", max_concurrent_conversations=5)

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
    ):
        settings_cls.load.return_value = fake_settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="What did I ask before?",
        )
        await agent_loop._process_message(inbound)

        builder.build_system_prompt.assert_called_once()
        mock_memory.get_compacted_history.assert_called_once()
        assert captured_kwargs["system_prompt"] == "You are PocketPaw with identity and memory."
        assert captured_kwargs["history"] == session_history
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@pytest.mark.asyncio
async def test_agent_loop_handles_error_before_router_init(
    mock_builder_cls, mock_get_memory, mock_get_bus, mock_bus, mock_memory
):
    """A failure before the router exists must be reported, not raise UnboundLocalError.

    Regression test for issue #333: when an exception happens before the router
    is initialized, the error handler must not hit UnboundLocalError while
    trying to call router.stop().
    """
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    # Fail in add_to_session, which this test relies on running before router setup.
    mock_memory.add_to_session = AsyncMock(
        side_effect=RuntimeError("Simulated memory failure before router init")
    )
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    fake_settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
        injection_scan_enabled=False,
    )

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
        # Health engine is patched to avoid import errors.
        patch("pocketpaw.health.get_health_engine"),
    ):
        settings_cls.load.return_value = fake_settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="This should fail",
        )

        # redact_output becomes a pass-through to avoid import/dependency issues.
        with patch("pocketpaw.agents.loop.redact_output", side_effect=lambda x: x):
            # Must complete without raising UnboundLocalError.
            await agent_loop._process_message(inbound)

        # The error has to reach the outbound channel.
        assert mock_bus.publish_outbound.call_count >= 1

        outbound_texts = (
            recorded.args[0].content for recorded in mock_bus.publish_outbound.call_args_list
        )
        error_text = next(
            (text for text in outbound_texts if text and "an error occurred" in text.lower()),
            None,
        )
        assert error_text is not None, "Error message should be published to outbound channel"
        assert "simulated memory failure" in error_text.lower()
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_identity_reinforcement_appended_on_long_conversations(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
):
    """With a 5-message session, the identity block is re-injected into the prompt."""
    from pocketpaw.bootstrap.protocol import BootstrapContext

    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    seen: dict = {}

    async def _recording_run(message, *, system_prompt=None, history=None, session_key=None):
        seen["system_prompt"] = system_prompt
        yield AgentEvent(type="message", content="OK")
        yield AgentEvent(type="done", content="")

    mock_router_cls.return_value = MagicMock(run=_recording_run, stop=AsyncMock())

    # Bootstrap context that supplies the identity to re-inject.
    bootstrap_context = BootstrapContext(
        name="TestAgent",
        identity="You are a test agent",
        soul="Test soul",
        style="Test style",
        user_profile="Test profile",
    )
    builder = mock_builder_cls.return_value
    builder.build_system_prompt = AsyncMock(return_value="<identity>You are PocketPaw</identity>")
    builder.bootstrap = MagicMock(get_context=AsyncMock(return_value=bootstrap_context))

    # Five stored messages: the count this test uses to trigger reinforcement.
    mock_memory._store.get_session = AsyncMock(
        return_value=[MagicMock(role="user", content=f"msg {i}") for i in range(5)]
    )
    mock_memory.get_compacted_history = AsyncMock(
        return_value=[{"role": "user", "content": f"message {i}"} for i in range(5)]
    )

    fake_settings = MagicMock(agent_backend="claude_agent_sdk", max_concurrent_conversations=5)

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
    ):
        settings_cls.load.return_value = fake_settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Keep going",
        )
        await agent_loop._process_message(inbound)

        # The prompt must carry the original block plus the re-injected one.
        prompt = seen.get("system_prompt", "")
        assert "<identity>" in prompt
        assert "TestAgent" in prompt  # comes from the re-injected identity
        assert prompt.count("<identity>") >= 2
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_identity_reinforcement_not_appended_on_short_conversations(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
):
    """With a 3-message session, no extra identity block is added to the prompt."""
    from pocketpaw.bootstrap.protocol import BootstrapContext

    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    seen: dict = {}

    async def _recording_run(message, *, system_prompt=None, history=None, session_key=None):
        seen["system_prompt"] = system_prompt
        yield AgentEvent(type="message", content="OK")
        yield AgentEvent(type="done", content="")

    mock_router_cls.return_value = MagicMock(run=_recording_run, stop=AsyncMock())

    # Bootstrap context is available but should not be used for re-injection here.
    bootstrap_context = BootstrapContext(
        name="TestAgent",
        identity="You are a test agent",
        soul="Test soul",
        style="Test style",
        user_profile="Test profile",
    )
    builder = mock_builder_cls.return_value
    builder.build_system_prompt = AsyncMock(return_value="<identity>You are PocketPaw</identity>")
    builder.bootstrap = MagicMock(get_context=AsyncMock(return_value=bootstrap_context))

    # Three stored messages: not a multiple of 5.
    mock_memory._store.get_session = AsyncMock(
        return_value=[MagicMock(role="user", content=f"msg {i}") for i in range(3)]
    )
    mock_memory.get_compacted_history = AsyncMock(
        return_value=[{"role": "user", "content": f"message {i}"} for i in range(3)]
    )

    fake_settings = MagicMock(agent_backend="claude_agent_sdk", max_concurrent_conversations=5)

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
    ):
        settings_cls.load.return_value = fake_settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Hello",
        )
        await agent_loop._process_message(inbound)

        # Only the block produced by build_system_prompt may be present.
        prompt = seen.get("system_prompt", "")
        assert prompt.count("<identity>") == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Session-lock GC tests (regression for memory-leak bug)
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_gc_removes_stale_locks():
    """One GC pass evicts an idle, unheld lock past its TTL and keeps a fresh one.

    A lock is stale once it has been idle for longer than ``_SESSION_LOCK_TTL``.
    """
    import time

    from pocketpaw.agents.loop import _SESSION_LOCK_TTL, AgentLoop

    fake_settings = MagicMock(max_concurrent_conversations=5, agent_backend="claude_agent_sdk")

    with (
        patch("pocketpaw.agents.loop.get_message_bus"),
        patch("pocketpaw.agents.loop.get_memory_manager"),
        patch("pocketpaw.agents.loop.AgentContextBuilder"),
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
    ):
        agent_loop = AgentLoop()

        stale_key = "session:stale"
        fresh_key = "session:fresh"
        now = time.monotonic()
        # The stale entry was last used just over one TTL ago; the fresh one is used "now".
        last_used_by_key = {stale_key: now - (_SESSION_LOCK_TTL + 1), fresh_key: now}
        for key, last_used in last_used_by_key.items():
            agent_loop._session_locks[key] = asyncio.Lock()
            agent_loop._session_lock_last_used[key] = last_used

        # Replacement for asyncio.sleep: the first call returns immediately so
        # the GC body runs once; the second raises CancelledError to end the loop.
        observed_sleeps = []

        async def _instant_sleep(delay):
            observed_sleeps.append(delay)
            if len(observed_sleeps) >= 2:
                raise asyncio.CancelledError

        with patch("asyncio.sleep", side_effect=_instant_sleep):
            try:
                await agent_loop._gc_session_locks()
            except asyncio.CancelledError:
                pass

        assert stale_key not in agent_loop._session_locks, "Stale lock should be evicted"
        assert (
            stale_key not in agent_loop._session_lock_last_used
        ), "Stale timestamp should be evicted"
        assert fresh_key in agent_loop._session_locks, "Fresh lock must not be evicted"
        assert (
            fresh_key in agent_loop._session_lock_last_used
        ), "Fresh timestamp must not be evicted"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_gc_skips_acquired_locks():
    """A lock that is currently held survives a GC pass even after its TTL has expired."""
    import time

    from pocketpaw.agents.loop import _SESSION_LOCK_TTL, AgentLoop

    fake_settings = MagicMock(max_concurrent_conversations=5, agent_backend="claude_agent_sdk")

    with (
        patch("pocketpaw.agents.loop.get_message_bus"),
        patch("pocketpaw.agents.loop.get_memory_manager"),
        patch("pocketpaw.agents.loop.AgentContextBuilder"),
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
    ):
        agent_loop = AgentLoop()

        held_key = "session:held"
        held_lock = asyncio.Lock()
        await held_lock.acquire()  # held from here on, so GC must leave it alone
        agent_loop._session_locks[held_key] = held_lock
        # Expired by age: last used just over one TTL ago.
        agent_loop._session_lock_last_used[held_key] = time.monotonic() - (_SESSION_LOCK_TTL + 1)

        # First sleep returns at once; the second ends the GC loop via CancelledError.
        observed_sleeps = []

        async def _instant_sleep(delay):
            observed_sleeps.append(delay)
            if len(observed_sleeps) >= 2:
                raise asyncio.CancelledError

        with patch("asyncio.sleep", side_effect=_instant_sleep):
            try:
                await agent_loop._gc_session_locks()
            except asyncio.CancelledError:
                pass

        assert held_key in agent_loop._session_locks, "Held lock must never be evicted by GC"
        held_lock.release()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_stop_cancels_gc_task():
    """stop() must cancel the GC background task so it does not outlive the loop.

    Besides checking that ``_lock_gc_task`` is cleared, the test keeps its own
    reference to the task and verifies that the task really ended up cancelled;
    clearing the attribute alone would leave the task running.
    """
    from pocketpaw.agents.loop import AgentLoop

    with (
        patch("pocketpaw.agents.loop.get_message_bus"),
        patch("pocketpaw.agents.loop.get_memory_manager"),
        patch("pocketpaw.agents.loop.AgentContextBuilder"),
        patch("pocketpaw.agents.loop.get_settings") as mock_get_settings,
        patch("pocketpaw.agents.loop.Settings") as mock_settings_cls,
    ):
        settings = MagicMock()
        settings.max_concurrent_conversations = 5
        settings.agent_backend = "claude_agent_sdk"
        mock_get_settings.return_value = settings
        mock_settings_cls.load.return_value = settings

        loop = AgentLoop()

        # Simulate a live GC task by creating one manually
        async def _noop():
            await asyncio.sleep(9999)

        gc_task = asyncio.create_task(_noop())
        loop._lock_gc_task = gc_task

        try:
            await loop.stop()

            assert loop._lock_gc_task is None, "stop() must clear _lock_gc_task"

            # A cancelled task only reaches its final state once the event loop
            # has run it again, so wait briefly before inspecting it.
            await asyncio.wait([gc_task], timeout=1)
            assert gc_task.cancelled(), "stop() must cancel the GC task"
        finally:
            # Never leave the 9999-second sleeper behind if an assertion fails.
            gc_task.cancel()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Auto-TTS tests (voice reply feature)
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_auto_tts_triggered_by_voice_message(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
):
    """A voice inbound message gets a synthesized audio reply attached at stream end."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    async def _text_only_run(message, *, system_prompt=None, history=None, session_key=None):
        # The agent answers in plain text and never uses the text_to_speech tool.
        yield AgentEvent(type="message", content="I heard your voice message!")
        yield AgentEvent(type="done", content="")

    mock_router_cls.return_value = MagicMock(run=_text_only_run, stop=AsyncMock())
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    fake_settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
        voice_reply_enabled=True,
        injection_scan_enabled=False,
        pii_scan_enabled=False,
        welcome_hint_enabled=False,
    )
    # synthesize_speech is replaced by a mock that returns a fake audio path.
    fake_tts = AsyncMock(return_value="/tmp/tts_12345678.mp3")

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
        patch("pocketpaw.tools.builtin.voice.synthesize_speech", fake_tts),
    ):
        settings_cls.load.return_value = fake_settings

        agent_loop = AgentLoop()
        # The .ogg attachment is what makes this a voice message.
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="[voice message]",
            media=["/tmp/voice_input.ogg"],
        )
        await agent_loop._process_message(inbound)

        # TTS must have been invoked exactly once, with the agent's reply text.
        fake_tts.assert_called_once_with("I heard your voice message!")

        # The single stream-end outbound message must carry the generated audio.
        stream_end_messages = [
            recorded.args[0]
            for recorded in mock_bus.publish_outbound.call_args_list
            if recorded.args[0].is_stream_end
        ]
        assert len(stream_end_messages) == 1
        assert "/tmp/tts_12345678.mp3" in stream_end_messages[0].media
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_auto_tts_skipped_when_agent_already_sent_audio(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
):
    """No auto-TTS happens when a text_to_speech tool result already carries media."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    async def _run_with_audio(message, *, system_prompt=None, history=None, session_key=None):
        yield AgentEvent(type="message", content="Here's my voice reply")
        # Tool result carrying a media tag: the agent generated audio itself.
        yield AgentEvent(
            type="tool_result",
            content="<!-- media:/tmp/agent_tts.mp3 -->",
            metadata={"name": "text_to_speech"},
        )
        yield AgentEvent(type="done", content="")

    mock_router_cls.return_value = MagicMock(run=_run_with_audio, stop=AsyncMock())
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    fake_settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
        voice_reply_enabled=True,
        injection_scan_enabled=False,
        pii_scan_enabled=False,
        welcome_hint_enabled=False,
    )
    fake_tts = AsyncMock()

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=fake_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
        patch("pocketpaw.tools.builtin.voice.synthesize_speech", fake_tts),
    ):
        settings_cls.load.return_value = fake_settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="[voice]",
            media=["/tmp/voice.ogg"],
        )
        await agent_loop._process_message(inbound)

        # Audio was already provided by the agent, so synthesis must be skipped.
        fake_tts.assert_not_called()
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_auto_tts_disabled_by_setting(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
):
    """With voice_reply_enabled=False, a voice message must not trigger speech synthesis."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    async def mock_run(message, *, system_prompt=None, history=None, session_key=None):
        yield AgentEvent(type="message", content="Response")
        yield AgentEvent(type="done", content="")

    mock_router_cls.return_value = MagicMock(run=mock_run, stop=AsyncMock())
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
        voice_reply_enabled=False,  # the feature under test is switched off
        injection_scan_enabled=False,
        pii_scan_enabled=False,
        welcome_hint_enabled=False,
    )
    mock_tts = AsyncMock()

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
        patch("pocketpaw.tools.builtin.voice.synthesize_speech", mock_tts),
    ):
        settings_cls.load.return_value = settings

        agent_loop = AgentLoop()
        voice_message = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="[voice]",
            media=["/tmp/voice.ogg"],
        )

        await agent_loop._process_message(voice_message)

        # synthesize_speech should NOT be called (feature disabled)
        mock_tts.assert_not_called()
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_auto_tts_handles_synthesis_failure_gracefully(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
):
    """A synthesize_speech error must not propagate; stream-end is still sent, without audio."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    async def mock_run(message, *, system_prompt=None, history=None, session_key=None):
        yield AgentEvent(type="message", content="Response text")
        yield AgentEvent(type="done", content="")

    mock_router_cls.return_value = MagicMock(run=mock_run, stop=AsyncMock())
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
        voice_reply_enabled=True,
        injection_scan_enabled=False,
        pii_scan_enabled=False,
        welcome_hint_enabled=False,
    )
    # Every synthesis attempt blows up, simulating an unavailable TTS service.
    mock_tts = AsyncMock(side_effect=RuntimeError("TTS service unavailable"))

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
        patch("pocketpaw.tools.builtin.voice.synthesize_speech", mock_tts),
    ):
        settings_cls.load.return_value = settings

        agent_loop = AgentLoop()
        voice_message = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="[voice]",
            media=["/tmp/voice.wav"],
        )

        # Must return normally: the test fails if the TTS error escapes.
        await agent_loop._process_message(voice_message)

        mock_tts.assert_called_once()

        # Exactly one stream-end message is published...
        published = [call.args[0] for call in mock_bus.publish_outbound.call_args_list]
        stream_end_messages = [message for message in published if message.is_stream_end]
        assert len(stream_end_messages) == 1
        # ...and it carries no media because synthesis failed.
        assert len(stream_end_messages[0].media) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PR #658 reviewer suggestions: surface swallowed exceptions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_loop_with_settings(mock_get_bus, mock_get_memory, mock_builder_cls):
    """Construct an AgentLoop while get_settings / Settings.load return a stock mock.

    The three parameters are accepted but not read in this body. The settings
    patches are active only while AgentLoop() is being constructed.
    """
    from pocketpaw.agents.loop import AgentLoop

    stock_settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
    )
    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=stock_settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
    ):
        settings_cls.load.return_value = stock_settings
        return AgentLoop()
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_agents_md_discovery_failure_is_silently_logged(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
    mock_router,
    caplog,
):
    """AGENTS.md discovery failure must be caught and logged, not crash the loop.

    AgentsMdLoader.find_and_load is patched to raise; the test then checks that
    a real stream-end message was published and that the failure was logged
    (records are captured from DEBUG level upwards).
    """
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory
    mock_router_cls.return_value = mock_router

    mock_builder_instance = mock_builder_cls.return_value
    mock_builder_instance.build_system_prompt = AsyncMock(return_value="System Prompt")

    with (
        patch("pocketpaw.agents.loop.get_settings") as mock_settings,
        patch("pocketpaw.agents.loop.Settings") as mock_settings_cls,
        patch(
            "pocketpaw.agents_md.AgentsMdLoader.find_and_load",
            side_effect=RuntimeError("disk error"),
        ),
    ):
        settings = MagicMock()
        settings.agent_backend = "claude_agent_sdk"
        settings.max_concurrent_conversations = 5
        mock_settings.return_value = settings
        mock_settings_cls.load.return_value = settings

        loop = AgentLoop()
        msg = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Hello",
        )

        with caplog.at_level(logging.DEBUG, logger="pocketpaw.agents.loop"):
            await loop._process_message(msg)

    # The loop must complete (publish stream-end) despite the AGENTS.md error.
    # Inspect the attribute itself: the field name can appear in the repr of
    # any outbound message, so a substring match on str(call) cannot tell a
    # stream-end message apart from an ordinary one.
    published = [call.args[0] for call in mock_bus.publish_outbound.call_args_list]
    assert any(getattr(message, "is_stream_end", False) is True for message in published), (
        "Loop must still publish stream-end even when AGENTS.md discovery raises"
    )
    assert any("AGENTS.md discovery failed" in r.getMessage() for r in caplog.records), (
        "AGENTS.md failure must be logged"
    )
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_token_metrics_persist_failure_is_logged_at_debug(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
    caplog,
):
    """A failing usage tracker must be caught and logged, not re-raised.

    The router emits one token_usage event while get_usage_tracker is patched
    to raise; the test then checks that a real stream-end message was published
    and that the failure was logged (records are captured from DEBUG upwards).
    """
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    token_router = MagicMock()

    async def mock_run_with_token_usage(
        message, *, system_prompt=None, history=None, session_key=None
    ):
        yield AgentEvent(
            type="token_usage",
            content="",
            metadata={
                "backend": "claude_agent_sdk",
                "model": "claude-3-haiku",
                "input_tokens": 10,
                "output_tokens": 5,
                "cached_input_tokens": 0,
                "total_cost_usd": 0.001,
            },
        )
        yield AgentEvent(type="done", content="")

    token_router.run = mock_run_with_token_usage
    token_router.stop = AsyncMock()
    mock_router_cls.return_value = token_router

    mock_builder_instance = mock_builder_cls.return_value
    mock_builder_instance.build_system_prompt = AsyncMock(return_value="System Prompt")

    with (
        patch("pocketpaw.agents.loop.get_settings") as mock_settings,
        patch("pocketpaw.agents.loop.Settings") as mock_settings_cls,
        patch(
            "pocketpaw.usage_tracker.get_usage_tracker",
            side_effect=RuntimeError("tracker unavailable"),
        ),
    ):
        settings = MagicMock()
        settings.agent_backend = "claude_agent_sdk"
        settings.max_concurrent_conversations = 5
        mock_settings.return_value = settings
        mock_settings_cls.load.return_value = settings

        loop = AgentLoop()
        msg = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Tokens please",
        )

        with caplog.at_level(logging.DEBUG, logger="pocketpaw.agents.loop"):
            await loop._process_message(msg)

    # Loop must still complete. Inspect the attribute itself: the field name
    # can appear in the repr of any outbound message, so a substring match on
    # str(call) cannot tell a stream-end message apart from an ordinary one.
    published = [call.args[0] for call in mock_bus.publish_outbound.call_args_list]
    assert any(getattr(message, "is_stream_end", False) is True for message in published), (
        "Loop must publish stream-end even when usage tracker raises"
    )
    assert any("token usage metrics" in r.getMessage() for r in caplog.records), (
        "Token metrics failure must be logged"
    )
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_budget_exhaustion_blocks_before_router_run(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
):
    """An exhausted budget must stop processing before AgentRouter is ever instantiated."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory
    mock_router_cls.return_value = MagicMock()
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    # Reported spend (12.0) is above the configured monthly budget (10.0).
    tracker = MagicMock()
    tracker.get_summary.return_value = {"total_cost_usd": 12.0}

    settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
        injection_scan_enabled=False,
        pii_scan_enabled=False,
        pii_scan_memory=False,
        welcome_hint_enabled=False,
        budget_monthly_usd=10.0,
        budget_warning_threshold=0.8,
        budget_auto_pause=True,
        budget_reset_day=1,
        budget_paused=False,
        budget_override_usd=None,
        budget_override_reason="",
        budget_override_expires_at=None,
    )

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
        patch("pocketpaw.agents.loop.usage_tracker_module.get_usage_tracker", return_value=tracker),
    ):
        settings_cls.load.return_value = settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Hello",
        )

        await agent_loop._process_message(inbound)

    # The router class itself was never called, so no backend was created.
    mock_router_cls.assert_not_called()

    published = [call.args[0] for call in mock_bus.publish_outbound.call_args_list]
    texts = [message.content for message in published if hasattr(message, "content")]
    assert any("Budget exhausted" in text for text in texts)
    assert any(message.is_stream_end is True for message in published)
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_budget_warning_event_emitted_on_threshold_cross(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
):
    """A token_usage event that pushes spend past the warning threshold emits one budget_warning."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    async def mock_run_with_token_usage(
        message, *, system_prompt=None, history=None, session_key=None
    ):
        yield AgentEvent(
            type="token_usage",
            content="",
            metadata={
                "backend": "claude_agent_sdk",
                "model": "claude-3-haiku",
                "input_tokens": 100,
                "output_tokens": 50,
                "cached_input_tokens": 0,
                "total_cost_usd": 1.5,
            },
        )
        yield AgentEvent(type="done", content="")

    mock_router_cls.return_value = MagicMock(run=mock_run_with_token_usage, stop=AsyncMock())
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    class MutableTracker:
        """Stand-in usage tracker whose running total grows as costs are recorded."""

        def __init__(self, initial_total: float) -> None:
            self.total = initial_total

        def get_summary(self, since: str | None = None) -> dict[str, float]:
            # `since` is accepted but has no effect on the reported total.
            return {"total_cost_usd": self.total}

        def record(self, *, total_cost_usd=None, **kwargs) -> None:
            # Only the cost is accumulated; all other keyword arguments are dropped.
            self.total += float(total_cost_usd or 0.0)

    # 7.0 is below the 80% warning line of a 10.0 budget; adding 1.5 crosses it.
    tracker = MutableTracker(initial_total=7.0)

    settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
        injection_scan_enabled=False,
        pii_scan_enabled=False,
        pii_scan_memory=False,
        welcome_hint_enabled=False,
        file_jail_path=".",
        compaction_recent_window=20,
        compaction_char_budget=30000,
        compaction_summary_chars=1000,
        compaction_llm_summarize=False,
        tool_profile="full",
        budget_monthly_usd=10.0,
        budget_warning_threshold=0.8,
        budget_auto_pause=True,
        budget_reset_day=1,
        budget_paused=False,
        budget_override_usd=None,
        budget_override_reason="",
        budget_override_expires_at=None,
    )

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
        patch("pocketpaw.agents.loop.usage_tracker_module.get_usage_tracker", return_value=tracker),
    ):
        settings_cls.load.return_value = settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="hello",
        )

        await agent_loop._process_message(inbound)

    system_events = (call.args[0] for call in mock_bus.publish_system.call_args_list)
    budget_events = [
        event for event in system_events if getattr(event, "event_type", "") == "budget_warning"
    ]
    assert len(budget_events) == 1
    assert budget_events[0].data["session_key"] == "cli:chat1"
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_health_engine_persist_failure_logged_as_warning(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
    caplog,
):
    """If get_health_engine raises while handling a router crash, a WARNING must be logged."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    # The router blows up on first iteration so the loop enters its error handling.
    async def mock_run_boom(message, *, system_prompt=None, history=None, session_key=None):
        raise RuntimeError("simulated router crash")
        yield  # unreachable; its presence makes this an async generator

    mock_router_cls.return_value = MagicMock(run=mock_run_boom, stop=AsyncMock())
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
    )

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
        patch(
            "pocketpaw.health.get_health_engine",
            side_effect=RuntimeError("health engine down"),
        ),
    ):
        settings_cls.load.return_value = settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Crash me",
        )

        with caplog.at_level(logging.WARNING, logger="pocketpaw.agents.loop"):
            await agent_loop._process_message(inbound)

    warning_messages = [
        record.message for record in caplog.records if record.levelno >= logging.WARNING
    ]
    assert any("health engine" in text for text in warning_messages), (
        "Health engine persist failure must be logged at WARNING"
    )
|
|
|
|
|
|
@patch("pocketpaw.agents.loop.get_message_bus")
@patch("pocketpaw.agents.loop.get_memory_manager")
@patch("pocketpaw.agents.loop.AgentContextBuilder")
@patch("pocketpaw.agents.loop.AgentRouter")
@pytest.mark.asyncio
async def test_router_stop_failure_logged_as_warning(
    mock_router_cls,
    mock_builder_cls,
    mock_get_memory,
    mock_get_bus,
    mock_bus,
    mock_memory,
    caplog,
):
    """A router.stop() error raised during error handling must surface as a WARNING log."""
    mock_get_bus.return_value = mock_bus
    mock_get_memory.return_value = mock_memory

    # run() fails immediately, and the subsequent stop() fails as well.
    async def mock_run_boom(message, *, system_prompt=None, history=None, session_key=None):
        raise RuntimeError("router processing error")
        yield  # unreachable; its presence makes this an async generator

    mock_router_cls.return_value = MagicMock(
        run=mock_run_boom,
        stop=AsyncMock(side_effect=OSError("stop failed")),
    )
    mock_builder_cls.return_value.build_system_prompt = AsyncMock(return_value="System Prompt")

    settings = MagicMock(
        agent_backend="claude_agent_sdk",
        max_concurrent_conversations=5,
    )

    with (
        patch("pocketpaw.agents.loop.get_settings", return_value=settings),
        patch("pocketpaw.agents.loop.Settings") as settings_cls,
    ):
        settings_cls.load.return_value = settings

        agent_loop = AgentLoop()
        inbound = InboundMessage(
            channel=Channel.CLI,
            sender_id="user1",
            chat_id="chat1",
            content="Crash me",
        )

        with caplog.at_level(logging.WARNING, logger="pocketpaw.agents.loop"):
            await agent_loop._process_message(inbound)

    warning_messages = [
        record.message for record in caplog.records if record.levelno >= logging.WARNING
    ]
    # NOTE(review): any WARNING+ record mentioning "router" or "stop" satisfies this,
    # which may include the log for the run() failure itself — confirm against the
    # loop's actual log text and tighten if possible.
    assert any("router" in m.lower() or "stop" in m.lower() for m in warning_messages), (
        "router.stop() failure must be logged at WARNING level"
    )
|