Files
pocketpaw/tests/test_simulation_clock.py
Ragini Pandey 2bdb07308c feat: add SimulationClock primitive for tick-synchronized multi-agent execution
Add a lightweight discrete tick-based clock for synchronized multi-agent
simulation (issue #633). Agents all act on the same tick, world updates,
then the clock advances to the next tick.

- SimulationClock: advance(), elapsed(), reset(), wait_for_tick(),
  record_snapshot(), get_snapshots(), get_snapshot_at()
- TickSnapshot: per-tick world state record for replay/analysis
- DependencyScheduler: optional tick-synchronized dispatch mode via
  run_tick_synchronized() that dispatches all ready tasks per tick,
  waits for completion, records snapshots, then advances the clock
- MCTaskExecutor: injects simulation_tick into task prompt when present
  in task metadata
- Exported SimulationClock and TickSnapshot from deep_work package
- 17 new tests covering clock, snapshots, scheduler integration, and
  prompt injection
2026-03-26 02:06:52 +05:30

305 lines
11 KiB
Python

# Tests for SimulationClock and tick-synchronized scheduler dispatch.
# Created: 2026-03-26
#
# Covers:
# - SimulationClock: advance, elapsed, reset, wait_for_tick, snapshots
# - TickSnapshot: to_dict / from_dict round-trip
# - DependencyScheduler.run_tick_synchronized: tick-stepped dispatch with snapshots
# - MCTaskExecutor._build_task_prompt: simulation_tick metadata injection
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pocketpaw.deep_work.clock import SimulationClock, TickSnapshot
from pocketpaw.deep_work.scheduler import DependencyScheduler
from pocketpaw.mission_control.models import Task, TaskPriority, TaskStatus
# ============================================================================
# SimulationClock
# ============================================================================
class TestSimulationClock:
async def test_initial_tick_is_zero(self):
clock = SimulationClock()
assert clock.current_tick == 0
assert clock.elapsed() == 0
async def test_advance_increments_tick(self):
clock = SimulationClock()
new_tick = await clock.advance()
assert new_tick == 1
assert clock.current_tick == 1
assert clock.elapsed() == 1
async def test_advance_multiple_times(self):
clock = SimulationClock()
for expected in range(1, 6):
tick = await clock.advance()
assert tick == expected
assert clock.current_tick == 5
async def test_reset_clears_tick_and_snapshots(self):
clock = SimulationClock()
await clock.advance()
await clock.advance()
clock.record_snapshot(TickSnapshot(tick=1, task_states={"t1": "done"}))
clock.reset()
assert clock.current_tick == 0
assert clock.get_snapshots() == []
async def test_record_and_get_snapshots(self):
clock = SimulationClock()
snap1 = TickSnapshot(tick=0, task_states={"t1": "inbox"})
snap2 = TickSnapshot(tick=1, task_states={"t1": "done"})
clock.record_snapshot(snap1)
clock.record_snapshot(snap2)
assert len(clock.get_snapshots()) == 2
assert clock.get_snapshots()[0].tick == 0
assert clock.get_snapshots()[1].tick == 1
async def test_get_snapshot_at(self):
clock = SimulationClock()
snap = TickSnapshot(tick=3, task_states={"t1": "done"})
clock.record_snapshot(snap)
assert clock.get_snapshot_at(3) is not None
assert clock.get_snapshot_at(3).tick == 3
assert clock.get_snapshot_at(99) is None
async def test_get_snapshots_returns_copy(self):
"""Mutating the returned list should not affect internal state."""
clock = SimulationClock()
clock.record_snapshot(TickSnapshot(tick=0))
snaps = clock.get_snapshots()
snaps.clear()
assert len(clock.get_snapshots()) == 1
# ============================================================================
# TickSnapshot
# ============================================================================
class TestTickSnapshot:
def test_to_dict(self):
snap = TickSnapshot(tick=5, task_states={"t1": "done"}, metadata={"key": "val"})
d = snap.to_dict()
assert d == {"tick": 5, "task_states": {"t1": "done"}, "metadata": {"key": "val"}}
def test_from_dict(self):
data = {"tick": 3, "task_states": {"t2": "inbox"}, "metadata": {"x": 1}}
snap = TickSnapshot.from_dict(data)
assert snap.tick == 3
assert snap.task_states == {"t2": "inbox"}
assert snap.metadata == {"x": 1}
def test_round_trip(self):
original = TickSnapshot(tick=7, task_states={"a": "done", "b": "inbox"}, metadata={"n": 2})
restored = TickSnapshot.from_dict(original.to_dict())
assert restored.tick == original.tick
assert restored.task_states == original.task_states
assert restored.metadata == original.metadata
def test_from_dict_defaults(self):
snap = TickSnapshot.from_dict({})
assert snap.tick == 0
assert snap.task_states == {}
assert snap.metadata == {}
# ============================================================================
# DependencyScheduler — tick-synchronized dispatch
# ============================================================================
def _make_task(
task_id: str,
status: TaskStatus = TaskStatus.INBOX,
project_id: str = "proj-1",
blocked_by: list[str] | None = None,
task_type: str = "agent",
assignee_ids: list[str] | None = None,
title: str = "",
) -> Task:
return Task(
id=task_id,
title=title or f"Task {task_id}",
status=status,
project_id=project_id,
blocked_by=blocked_by or [],
task_type=task_type,
assignee_ids=assignee_ids or [],
)
@pytest.fixture
def mock_manager():
manager = AsyncMock()
manager.list_tasks = AsyncMock(return_value=[])
manager.get_project_tasks = AsyncMock(return_value=[])
manager.get_task = AsyncMock(return_value=None)
manager.get_project = AsyncMock(return_value=None)
manager.update_project = AsyncMock()
manager.save_task = AsyncMock()
return manager
@pytest.fixture
def mock_executor():
executor = AsyncMock()
executor.execute_task_background = AsyncMock()
executor.is_task_running = MagicMock(return_value=False)
return executor
@pytest.fixture
def clock():
return SimulationClock()
class TestRunTickSynchronized:
async def test_raises_without_clock(self, mock_manager, mock_executor):
scheduler = DependencyScheduler(mock_manager, mock_executor)
with pytest.raises(RuntimeError, match="SimulationClock"):
await scheduler.run_tick_synchronized("proj-1")
async def test_empty_project_returns_no_snapshots(self, mock_manager, mock_executor, clock):
"""A project with no ready tasks yields no snapshots."""
mock_manager.get_project_tasks.return_value = []
scheduler = DependencyScheduler(mock_manager, mock_executor, clock=clock)
snapshots = await scheduler.run_tick_synchronized("proj-1")
assert snapshots == []
assert clock.current_tick == 0
async def test_single_tick_dispatches_all_ready_tasks(self, mock_manager, mock_executor, clock):
"""All unblocked tasks dispatch in one tick, then clock advances."""
t1 = _make_task("t1", assignee_ids=["a1"])
t2 = _make_task("t2", assignee_ids=["a2"])
call_count = 0
async def fake_get_project_tasks(pid):
nonlocal call_count
call_count += 1
if call_count <= 2:
# First call: get_ready_tasks returns t1+t2
# Second call: snapshot recording
return [t1, t2]
# After dispatch, tasks are done
t1_done = _make_task("t1", status=TaskStatus.DONE, assignee_ids=["a1"])
t2_done = _make_task("t2", status=TaskStatus.DONE, assignee_ids=["a2"])
return [t1_done, t2_done]
mock_manager.get_project_tasks.side_effect = fake_get_project_tasks
mock_manager.get_task.side_effect = lambda tid: {
"t1": _make_task("t1", status=TaskStatus.INBOX, assignee_ids=["a1"]),
"t2": _make_task("t2", status=TaskStatus.INBOX, assignee_ids=["a2"]),
}.get(tid)
from pocketpaw.deep_work.models import Project, ProjectStatus
mock_manager.get_project.return_value = Project(id="proj-1", status=ProjectStatus.EXECUTING)
scheduler = DependencyScheduler(mock_manager, mock_executor, clock=clock)
snapshots = await scheduler.run_tick_synchronized("proj-1")
# Tasks should have been dispatched
assert mock_executor.execute_task_background.call_count == 2
# simulation_tick metadata should be stamped
assert t1.metadata.get("simulation_tick") == 0
assert t2.metadata.get("simulation_tick") == 0
# Clock should have advanced at least once
assert clock.current_tick >= 1
assert len(snapshots) >= 1
async def test_tick_metadata_is_stamped_on_task(self, mock_manager, mock_executor, clock):
"""Tasks dispatched in tick-synchronized mode get simulation_tick in metadata."""
t1 = _make_task("t1", assignee_ids=["a1"])
first_call = True
async def fake_get_project_tasks(pid):
nonlocal first_call
if first_call:
first_call = False
return [t1]
return [_make_task("t1", status=TaskStatus.DONE, assignee_ids=["a1"])]
mock_manager.get_project_tasks.side_effect = fake_get_project_tasks
mock_manager.get_task.return_value = _make_task(
"t1", status=TaskStatus.INBOX, assignee_ids=["a1"]
)
mock_manager.get_project.return_value = MagicMock(status="executing", title="Test")
scheduler = DependencyScheduler(mock_manager, mock_executor, clock=clock)
await scheduler.run_tick_synchronized("proj-1")
# save_task should have been called with the tick stamped
mock_manager.save_task.assert_called()
saved_task = mock_manager.save_task.call_args[0][0]
assert saved_task.metadata["simulation_tick"] == 0
# ============================================================================
# MCTaskExecutor — simulation_tick in prompt
# ============================================================================
class TestSimulationTickInPrompt:
async def test_prompt_includes_simulation_tick(self):
"""When task metadata has simulation_tick, it appears in the prompt."""
from pocketpaw.mission_control.executor import MCTaskExecutor
executor = MCTaskExecutor()
# Use a task with no project_id to avoid project-context lookups
task = Task(
id="t1",
title="Research competitors",
priority=TaskPriority.MEDIUM,
metadata={"simulation_tick": 3},
)
agent = MagicMock()
agent.name = "Agent-1"
agent.role = "researcher"
agent.description = ""
agent.specialties = []
agent.backend = "claude_agent_sdk"
with patch("pocketpaw.mission_control.executor.get_mission_control_manager") as mock_mgr:
mock_mgr.return_value = AsyncMock()
prompt = await executor._build_task_prompt(task, agent)
assert "**Simulation Tick:** 3" in prompt
async def test_prompt_excludes_simulation_tick_when_absent(self):
"""When task metadata has no simulation_tick, prompt has no tick line."""
from pocketpaw.mission_control.executor import MCTaskExecutor
executor = MCTaskExecutor()
task = Task(
id="t1",
title="Research competitors",
priority=TaskPriority.MEDIUM,
)
agent = MagicMock()
agent.name = "Agent-1"
agent.role = "researcher"
agent.description = ""
agent.specialties = []
agent.backend = "claude_agent_sdk"
with patch("pocketpaw.mission_control.executor.get_mission_control_manager") as mock_mgr:
mock_mgr.return_value = AsyncMock()
prompt = await executor._build_task_prompt(task, agent)
assert "Simulation Tick" not in prompt