feat: add SimulationClock primitive for tick-synchronized multi-agent execution

Add a lightweight discrete tick-based clock for synchronized multi-agent
simulation (issue #633). Agents all act on the same tick, world updates,
then the clock advances to the next tick.

- SimulationClock: advance(), elapsed(), reset(), wait_for_tick(),
  record_snapshot(), get_snapshots(), get_snapshot_at()
- TickSnapshot: per-tick world state record for replay/analysis
- DependencyScheduler: optional tick-synchronized dispatch mode via
  run_tick_synchronized() that dispatches all ready tasks per tick,
  waits for completion, records snapshots, then advances the clock
- MCTaskExecutor: injects simulation_tick into task prompt when present
  in task metadata
- Exported SimulationClock and TickSnapshot from deep_work package
- 17 new tests covering clock, snapshots, scheduler integration, and
  prompt injection
This commit is contained in:
Ragini Pandey
2026-03-26 02:06:52 +05:30
parent c84c9bf29f
commit 2bdb07308c
5 changed files with 521 additions and 1 deletions

View File

@@ -4,6 +4,7 @@
# Added PawKitConfig export. Retry/timeout/output fields propagated through.
# Updated: 2026-02-18 — Added GoalParser and GoalAnalysis exports.
# Updated: 2026-02-12 — Added executor integration, public API functions.
# Updated: 2026-03-26 — Added SimulationClock and TickSnapshot exports (issue #633).
#
# Provides a singleton DeepWorkSession and convenience functions for
# starting and managing Deep Work projects.
@@ -20,6 +21,7 @@
import logging
from pocketpaw.deep_work.clock import SimulationClock, TickSnapshot
from pocketpaw.deep_work.goal_parser import GoalAnalysis, GoalParser
from pocketpaw.deep_work.models import (
AgentSpec,
@@ -38,7 +40,9 @@ __all__ = [
"PlannerResult",
"Project",
"ProjectStatus",
"SimulationClock",
"TaskSpec",
"TickSnapshot",
"get_deep_work_session",
"reset_deep_work_session",
"parse_goal",

View File

@@ -0,0 +1,130 @@
# Simulation Clock — Discrete tick-based time for synchronized multi-agent execution.
# Created: 2026-03-26
#
# Provides a lightweight clock that simulation PawKits can use for discrete
# time-stepped execution instead of wall-clock time. Agents all act on tick N,
# world updates, then tick N+1.
#
# See: https://github.com/pocketpaw/pocketpaw/issues/633
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class TickSnapshot:
    """Record of world state at a specific tick.

    Snapshots are meant to be treated as read-only records. The dataclass
    itself is not frozen, so to keep a recorded snapshot from being altered
    through a serialized copy (or the other way round), ``to_dict`` and
    ``from_dict`` shallow-copy the ``task_states`` and ``metadata`` mappings
    instead of sharing them with the caller.

    Attributes:
        tick: The tick number this snapshot was taken at.
        task_states: Mapping of task_id to status string at this tick.
        metadata: Arbitrary extra data captured at this tick.
    """

    tick: int
    task_states: dict[str, str] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dictionary for JSON serialization.

        Returns:
            A new dict with keys ``tick``, ``task_states`` and ``metadata``.
            The two mappings are shallow copies, so adding or replacing keys
            in the result does not touch this snapshot.
        """
        return {
            "tick": self.tick,
            "task_states": dict(self.task_states),
            "metadata": dict(self.metadata),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> TickSnapshot:
        """Create a snapshot from a dictionary produced by ``to_dict``.

        Args:
            data: Mapping with optional ``tick``, ``task_states`` and
                ``metadata`` keys. A missing ``tick`` defaults to 0; missing
                or ``None`` mappings default to empty dicts.

        Returns:
            A new snapshot holding its own copies of the two mappings.
        """
        return cls(
            tick=data.get("tick", 0),
            # `or {}` also covers an explicit None (e.g. a JSON null).
            task_states=dict(data.get("task_states") or {}),
            metadata=dict(data.get("metadata") or {}),
        )
class SimulationClock:
    """Integer tick counter that lets simulated agents run in lock-step.

    The intended rhythm is: every agent acts on the current tick, the world
    state is updated, and only then is the counter moved forward. Using this
    instead of wall-clock time keeps a simulation deterministic and lets it
    run as fast as the work allows.

    Usage::

        clock = SimulationClock()
        while not done:
            # dispatch all ready tasks for this tick
            tick = await clock.advance()
            # world updates happen here

    Attributes:
        current_tick: The current tick number (starts at 0).
    """

    def __init__(self) -> None:
        self._snapshots: list[TickSnapshot] = []
        # Pulsed on every advance() to wake coroutines in wait_for_tick().
        self._tick_event: asyncio.Event = asyncio.Event()
        self._current_tick: int = 0

    @property
    def current_tick(self) -> int:
        """The tick the clock is currently on."""
        return self._current_tick

    async def advance(self) -> int:
        """Move the clock forward by exactly one tick.

        Returns:
            The tick number after the increment.
        """
        next_tick = self._current_tick + 1
        self._current_tick = next_tick
        # Pulse the event: set() releases coroutines already waiting, and the
        # immediate clear() makes later waiters wait for the next advance.
        pulse = self._tick_event
        pulse.set()
        pulse.clear()
        logger.debug("SimulationClock advanced to tick %d", next_tick)
        return next_tick

    def elapsed(self) -> int:
        """Number of ticks since the start; identical to ``current_tick``."""
        return self._current_tick

    def reset(self) -> None:
        """Put the clock back on tick 0 and drop every recorded snapshot."""
        del self._snapshots[:]
        self._current_tick = 0
        logger.debug("SimulationClock reset")

    async def wait_for_tick(self, tick: int) -> None:
        """Suspend the caller until the clock is at or past ``tick``.

        Returns immediately when the clock has already reached ``tick``.

        Args:
            tick: The tick number to wait for.
        """
        while True:
            if not self._current_tick < tick:
                return
            # One advance may not be enough, so re-check after every wake-up.
            self._tick_event.clear()
            await self._tick_event.wait()

    def record_snapshot(self, snapshot: TickSnapshot) -> None:
        """Append a world-state snapshot to the clock's history.

        Args:
            snapshot: The snapshot to record.
        """
        self._snapshots.append(snapshot)

    def get_snapshots(self) -> list[TickSnapshot]:
        """Return a new list holding every recorded snapshot, oldest first."""
        return [*self._snapshots]

    def get_snapshot_at(self, tick: int) -> TickSnapshot | None:
        """Look up the first recorded snapshot whose tick equals ``tick``.

        Args:
            tick: The tick number to look up.

        Returns:
            The earliest-recorded matching snapshot, or None if none matches.
        """
        matches = (snap for snap in self._snapshots if snap.tick == tick)
        return next(matches, None)

View File

@@ -3,19 +3,28 @@
# Updated: 2026-02-12 — Treat SKIPPED status same as DONE for blocker resolution
# and project completion checks.
# Optimized: use get_project_tasks (no 100-limit), concurrent dispatch.
# Updated: 2026-03-26 — Added optional SimulationClock integration for tick-synchronized
# dispatch mode (issue #633).
#
# Key features:
# - get_ready_tasks: finds tasks with all blockers satisfied (DONE or SKIPPED)
# - on_task_completed: auto-dispatches newly unblocked tasks
# - validate_graph: cycle detection via Kahn's algorithm (works with Task and TaskSpec)
# - get_execution_order: groups tasks by dependency level (works with Task and TaskSpec)
# - run_tick_synchronized: dispatch all ready tasks per tick, wait, then advance clock
from __future__ import annotations
import asyncio
import logging
from collections import deque
from typing import TYPE_CHECKING
from pocketpaw.mission_control.models import Task, TaskStatus, now_iso
if TYPE_CHECKING:
from pocketpaw.deep_work.clock import SimulationClock, TickSnapshot
logger = logging.getLogger(__name__)
@@ -40,17 +49,19 @@ class DependencyScheduler:
is responsible for wiring on_task_completed to the appropriate bus events.
"""
def __init__(self, manager, executor, human_router=None):
def __init__(self, manager, executor, human_router=None, clock: SimulationClock | None = None):
"""Initialize the scheduler.
Stores the collaborators on the instance; no other work is done here.
Args:
manager: MissionControlManager instance (list_tasks, get_task, etc.)
executor: MCTaskExecutor instance (execute_task_background)
human_router: Optional human notification router (notify_human_task, notify_review_task)
clock: Optional SimulationClock for tick-synchronized dispatch mode.
When left as None, run_tick_synchronized raises RuntimeError.
"""
self.manager = manager
self.executor = executor
self.human_router = human_router
# Only consulted by run_tick_synchronized; None means that mode is unavailable.
self.clock = clock
async def get_ready_tasks(self, project_id: str) -> list[Task]:
"""Return tasks in project where all blockers are satisfied.
@@ -273,3 +284,69 @@ class DependencyScheduler:
current_level = next_level
return levels
# ------------------------------------------------------------------
# Tick-synchronized dispatch (SimulationClock integration, issue #633)
# ------------------------------------------------------------------
async def run_tick_synchronized(self, project_id: str) -> list[TickSnapshot]:
    """Execute a project in tick-synchronized mode.

    Each tick:

    1. Gather all ready (unblocked) tasks.
    2. Stamp ``simulation_tick`` into each task's metadata and save it.
    3. Dispatch them concurrently and await every dispatch coroutine.
    4. Record a :class:`TickSnapshot` of all project task states.
    5. Advance the clock.

    Repeats until no more tasks are ready or the project completion check
    reports done. Requires ``self.clock`` to be set.

    Args:
        project_id: The project to execute.

    Returns:
        The :class:`TickSnapshot` objects recorded during this call, in tick
        order. Snapshots already stored on the clock before the call remain
        available through ``self.clock.get_snapshots()`` but are not
        included here.

    Raises:
        RuntimeError: If no SimulationClock is attached.
    """
    if self.clock is None:
        raise RuntimeError(
            "run_tick_synchronized requires a SimulationClock — "
            "pass clock= to DependencyScheduler.__init__"
        )

    # Imported here because the module-level import is TYPE_CHECKING-only.
    from pocketpaw.deep_work.clock import TickSnapshot

    # Collected locally so a reused clock's earlier history is not returned.
    recorded: list[TickSnapshot] = []

    while True:
        ready = await self.get_ready_tasks(project_id)
        if not ready:
            break

        # Read the tick once so the value stamped on the tasks and the value
        # stored in the snapshot cannot drift apart within one iteration.
        tick = self.clock.current_tick

        # Stamp each task with the current tick
        for task in ready:
            task.metadata["simulation_tick"] = tick
            await self.manager.save_task(task)

        # Dispatch all ready tasks and wait for the dispatch calls to return
        await asyncio.gather(*(self._dispatch_task(t) for t in ready))

        # Record snapshot
        project_tasks = await self.manager.get_project_tasks(project_id)
        snapshot = TickSnapshot(
            tick=tick,
            task_states={t.id: t.status.value for t in project_tasks},
        )
        self.clock.record_snapshot(snapshot)
        recorded.append(snapshot)

        # Advance clock
        await self.clock.advance()

        # Check project completion
        if await self.check_project_completion(project_id):
            break

    return recorded

View File

@@ -707,6 +707,11 @@ class MCTaskExecutor:
if task.description:
prompt_parts.append(f"**Description:** {task.description}")
# Inject simulation tick context when running in tick-synchronized mode
sim_tick = task.metadata.get("simulation_tick")
if sim_tick is not None:
prompt_parts.append(f"**Simulation Tick:** {sim_tick}")
prompt_parts.extend(
[
f"**Priority:** {task.priority.value}",

View File

@@ -0,0 +1,304 @@
# Tests for SimulationClock and tick-synchronized scheduler dispatch.
# Created: 2026-03-26
#
# Covers:
# - SimulationClock: advance, elapsed, reset, wait_for_tick, snapshots
# - TickSnapshot: to_dict / from_dict round-trip
# - DependencyScheduler.run_tick_synchronized: tick-stepped dispatch with snapshots
# - MCTaskExecutor._build_task_prompt: simulation_tick metadata injection
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pocketpaw.deep_work.clock import SimulationClock, TickSnapshot
from pocketpaw.deep_work.scheduler import DependencyScheduler
from pocketpaw.mission_control.models import Task, TaskPriority, TaskStatus
# ============================================================================
# SimulationClock
# ============================================================================
class TestSimulationClock:
    """Unit tests for SimulationClock: ticking, reset, waiting and snapshots."""

    async def test_initial_tick_is_zero(self):
        clock = SimulationClock()
        assert clock.current_tick == 0
        assert clock.elapsed() == 0

    async def test_advance_increments_tick(self):
        clock = SimulationClock()
        new_tick = await clock.advance()
        assert new_tick == 1
        assert clock.current_tick == 1
        assert clock.elapsed() == 1

    async def test_advance_multiple_times(self):
        clock = SimulationClock()
        for expected in range(1, 6):
            tick = await clock.advance()
            assert tick == expected
        assert clock.current_tick == 5

    async def test_reset_clears_tick_and_snapshots(self):
        clock = SimulationClock()
        await clock.advance()
        await clock.advance()
        clock.record_snapshot(TickSnapshot(tick=1, task_states={"t1": "done"}))
        clock.reset()
        assert clock.current_tick == 0
        assert clock.get_snapshots() == []

    async def test_wait_for_tick_returns_immediately_when_reached(self):
        """Waiting for the current or an earlier tick must not block."""
        import asyncio

        clock = SimulationClock()
        await clock.advance()
        # The timeout turns an unexpected hang into a test failure.
        await asyncio.wait_for(clock.wait_for_tick(1), timeout=1)
        await asyncio.wait_for(clock.wait_for_tick(0), timeout=1)

    async def test_wait_for_tick_blocks_until_clock_reaches_tick(self):
        """A waiter stays pending until the clock has advanced far enough."""
        import asyncio

        clock = SimulationClock()
        waiter = asyncio.ensure_future(clock.wait_for_tick(2))

        # Yield once so the waiter starts and parks on the clock.
        await asyncio.sleep(0)
        assert not waiter.done()

        # One advance wakes the waiter, but tick 1 is still short of 2.
        await clock.advance()
        await asyncio.sleep(0)
        assert not waiter.done()

        await clock.advance()
        await asyncio.wait_for(waiter, timeout=1)
        assert clock.current_tick == 2

    async def test_record_and_get_snapshots(self):
        clock = SimulationClock()
        snap1 = TickSnapshot(tick=0, task_states={"t1": "inbox"})
        snap2 = TickSnapshot(tick=1, task_states={"t1": "done"})
        clock.record_snapshot(snap1)
        clock.record_snapshot(snap2)
        assert len(clock.get_snapshots()) == 2
        assert clock.get_snapshots()[0].tick == 0
        assert clock.get_snapshots()[1].tick == 1

    async def test_get_snapshot_at(self):
        clock = SimulationClock()
        snap = TickSnapshot(tick=3, task_states={"t1": "done"})
        clock.record_snapshot(snap)
        assert clock.get_snapshot_at(3) is not None
        assert clock.get_snapshot_at(3).tick == 3
        assert clock.get_snapshot_at(99) is None

    async def test_get_snapshots_returns_copy(self):
        """Mutating the returned list should not affect internal state."""
        clock = SimulationClock()
        clock.record_snapshot(TickSnapshot(tick=0))
        snaps = clock.get_snapshots()
        snaps.clear()
        assert len(clock.get_snapshots()) == 1
# ============================================================================
# TickSnapshot
# ============================================================================
class TestTickSnapshot:
    """Serialization tests for TickSnapshot (to_dict / from_dict)."""

    def test_to_dict(self):
        expected = {"tick": 5, "task_states": {"t1": "done"}, "metadata": {"key": "val"}}
        snapshot = TickSnapshot(tick=5, task_states={"t1": "done"}, metadata={"key": "val"})
        assert snapshot.to_dict() == expected

    def test_from_dict(self):
        payload = {"tick": 3, "task_states": {"t2": "inbox"}, "metadata": {"x": 1}}
        rebuilt = TickSnapshot.from_dict(payload)
        assert rebuilt.tick == 3
        assert rebuilt.task_states == {"t2": "inbox"}
        assert rebuilt.metadata == {"x": 1}

    def test_round_trip(self):
        source = TickSnapshot(tick=7, task_states={"a": "done", "b": "inbox"}, metadata={"n": 2})
        copy = TickSnapshot.from_dict(source.to_dict())
        # Every field must survive serialization unchanged.
        for attr in ("tick", "task_states", "metadata"):
            assert getattr(copy, attr) == getattr(source, attr)

    def test_from_dict_defaults(self):
        bare = TickSnapshot.from_dict({})
        assert bare.tick == 0
        assert bare.task_states == {}
        assert bare.metadata == {}
# ============================================================================
# DependencyScheduler — tick-synchronized dispatch
# ============================================================================
def _make_task(
    task_id: str,
    status: TaskStatus = TaskStatus.INBOX,
    project_id: str = "proj-1",
    blocked_by: list[str] | None = None,
    task_type: str = "agent",
    assignee_ids: list[str] | None = None,
    title: str = "",
) -> Task:
    """Build a Task for tests, filling in defaults for omitted fields.

    A falsy ``title`` becomes ``"Task <task_id>"``; falsy ``blocked_by`` and
    ``assignee_ids`` become fresh empty lists.
    """
    resolved_title = title if title else f"Task {task_id}"
    blockers = blocked_by if blocked_by else []
    assignees = assignee_ids if assignee_ids else []
    return Task(
        id=task_id,
        title=resolved_title,
        status=status,
        project_id=project_id,
        blocked_by=blockers,
        task_type=task_type,
        assignee_ids=assignees,
    )
@pytest.fixture
def mock_manager():
    """Provide an AsyncMock manager whose query methods return empty results."""
    stub = AsyncMock()
    # Query methods get a canned "nothing found" return value.
    canned_results = {
        "list_tasks": [],
        "get_project_tasks": [],
        "get_task": None,
        "get_project": None,
    }
    for method_name, result in canned_results.items():
        setattr(stub, method_name, AsyncMock(return_value=result))
    # Write methods only need to be awaitable and recordable.
    stub.update_project = AsyncMock()
    stub.save_task = AsyncMock()
    return stub
@pytest.fixture
def mock_executor():
    """Provide an AsyncMock executor that reports no task as running."""
    stub = AsyncMock()
    # Synchronous on the real executor, hence a plain MagicMock.
    stub.is_task_running = MagicMock(return_value=False)
    stub.execute_task_background = AsyncMock()
    return stub
@pytest.fixture
def clock():
    """Provide a brand-new SimulationClock (tick 0, no snapshots) per test."""
    fresh_clock = SimulationClock()
    return fresh_clock
class TestRunTickSynchronized:
    """Tests for DependencyScheduler.run_tick_synchronized with mocked collaborators."""

    async def test_raises_without_clock(self, mock_manager, mock_executor):
        clockless = DependencyScheduler(mock_manager, mock_executor)
        with pytest.raises(RuntimeError, match="SimulationClock"):
            await clockless.run_tick_synchronized("proj-1")

    async def test_empty_project_returns_no_snapshots(self, mock_manager, mock_executor, clock):
        """A project with no ready tasks yields no snapshots."""
        mock_manager.get_project_tasks.return_value = []
        scheduler = DependencyScheduler(mock_manager, mock_executor, clock=clock)

        result = await scheduler.run_tick_synchronized("proj-1")

        assert result == []
        assert clock.current_tick == 0

    async def test_single_tick_dispatches_all_ready_tasks(self, mock_manager, mock_executor, clock):
        """All unblocked tasks dispatch in one tick, then clock advances."""
        from pocketpaw.deep_work.models import Project, ProjectStatus

        owners = {"t1": "a1", "t2": "a2"}
        t1 = _make_task("t1", assignee_ids=["a1"])
        t2 = _make_task("t2", assignee_ids=["a2"])
        seen_calls = []

        async def fake_get_project_tasks(pid):
            seen_calls.append(pid)
            if len(seen_calls) > 2:
                # After dispatch, tasks are done
                return [
                    _make_task(tid, status=TaskStatus.DONE, assignee_ids=[agent])
                    for tid, agent in owners.items()
                ]
            # First call: get_ready_tasks returns t1+t2
            # Second call: snapshot recording
            return [t1, t2]

        def fake_get_task(tid):
            if tid not in owners:
                return None
            return _make_task(tid, status=TaskStatus.INBOX, assignee_ids=[owners[tid]])

        mock_manager.get_project_tasks.side_effect = fake_get_project_tasks
        mock_manager.get_task.side_effect = fake_get_task
        mock_manager.get_project.return_value = Project(id="proj-1", status=ProjectStatus.EXECUTING)

        scheduler = DependencyScheduler(mock_manager, mock_executor, clock=clock)
        snapshots = await scheduler.run_tick_synchronized("proj-1")

        # Tasks should have been dispatched
        assert mock_executor.execute_task_background.call_count == 2
        # simulation_tick metadata should be stamped
        for stamped in (t1, t2):
            assert stamped.metadata.get("simulation_tick") == 0
        # Clock should have advanced at least once
        assert clock.current_tick >= 1
        assert len(snapshots) >= 1

    async def test_tick_metadata_is_stamped_on_task(self, mock_manager, mock_executor, clock):
        """Tasks dispatched in tick-synchronized mode get simulation_tick in metadata."""
        t1 = _make_task("t1", assignee_ids=["a1"])
        # Only the very first lookup sees the not-yet-run task.
        initial_answers = iter([[t1]])

        async def fake_get_project_tasks(pid):
            answer = next(initial_answers, None)
            if answer is not None:
                return answer
            return [_make_task("t1", status=TaskStatus.DONE, assignee_ids=["a1"])]

        mock_manager.get_project_tasks.side_effect = fake_get_project_tasks
        mock_manager.get_task.return_value = _make_task(
            "t1", status=TaskStatus.INBOX, assignee_ids=["a1"]
        )
        mock_manager.get_project.return_value = MagicMock(status="executing", title="Test")

        scheduler = DependencyScheduler(mock_manager, mock_executor, clock=clock)
        await scheduler.run_tick_synchronized("proj-1")

        # save_task should have been called with the tick stamped
        mock_manager.save_task.assert_called()
        saved_task = mock_manager.save_task.call_args.args[0]
        assert saved_task.metadata["simulation_tick"] == 0
# ============================================================================
# MCTaskExecutor — simulation_tick in prompt
# ============================================================================
class TestSimulationTickInPrompt:
    """Tests that MCTaskExecutor._build_task_prompt surfaces simulation_tick."""

    @staticmethod
    def _stub_agent():
        """Return a MagicMock agent with the attributes these tests set."""
        agent = MagicMock(
            role="researcher",
            description="",
            specialties=[],
            backend="claude_agent_sdk",
        )
        # `name` is special in the Mock constructor, so assign it afterwards.
        agent.name = "Agent-1"
        return agent

    @classmethod
    async def _render_prompt(cls, task):
        """Build the prompt for ``task`` with the mission-control manager patched."""
        from pocketpaw.mission_control.executor import MCTaskExecutor

        executor = MCTaskExecutor()
        agent = cls._stub_agent()
        with patch("pocketpaw.mission_control.executor.get_mission_control_manager") as mock_mgr:
            mock_mgr.return_value = AsyncMock()
            return await executor._build_task_prompt(task, agent)

    async def test_prompt_includes_simulation_tick(self):
        """When task metadata has simulation_tick, it appears in the prompt."""
        # Use a task with no project_id to avoid project-context lookups
        stamped_task = Task(
            id="t1",
            title="Research competitors",
            priority=TaskPriority.MEDIUM,
            metadata={"simulation_tick": 3},
        )
        prompt = await self._render_prompt(stamped_task)
        assert "**Simulation Tick:** 3" in prompt

    async def test_prompt_excludes_simulation_tick_when_absent(self):
        """When task metadata has no simulation_tick, prompt has no tick line."""
        plain_task = Task(
            id="t1",
            title="Research competitors",
            priority=TaskPriority.MEDIUM,
        )
        prompt = await self._render_prompt(plain_task)
        assert "Simulation Tick" not in prompt