feat(pocket-specialist): adapter-pattern dispatch + agent-mode

The specialist's create flow now goes through a small adapter layer in
``ee/agent/pocket_specialist/adapters.py`` so the existing subagent
pipeline and a new agent-mode flow can sit behind one entry point. The
config knob is ``pocket_specialist_mode``; default ``subagent``
preserves today's behavior bit-for-bit.

What's new
----------

- ``pocket_specialist_mode: Literal["subagent","agent"]`` setting.
  ``subagent`` (default) = isolated backend running the specialist's
  own model; ``agent`` = two-call protocol where the calling chat agent
  is the spec drafter.

- ``adapters.py`` introduces:
  * ``SpecialistCreateAdapter`` Protocol — the single dispatch interface.
  * ``SubagentAdapter`` — thin wrapper around the renamed
    ``runtime._run_subagent_pipeline`` (the historical body). Tests
    confirm 1:1 delegation.
  * ``AgentModeAdapter`` — two-call protocol. First call (no
    ``input.spec``) returns ``{action:"draft_kit", draft_kit:{...}}``
    with the structural plan echoed back, a rippleSpec shape reminder,
    starter widget kinds, and instructions. Second call (with
    ``spec=<draft>``) skips the LLM and calls
    ``make_persist_pocket_tool`` directly for validate-and-persist.
    Reuses the same redraft-on-warnings semantics the subagent flow
    has, just driven by the chat agent instead of an LLM subprocess.
  * ``pick_adapter(mode)`` — dispatch with safe fallback to subagent
    on unknown modes (logs a warning).

- DTOs extended additively:
  * ``PocketSpecialistCreateInput.spec`` (optional dict) for the
    second-call payload.
  * ``PocketSpecialistCreateOutput.draft_kit`` (optional dict) and a
    new ``draft_kit`` literal in ``action``.

- Tool surfaces (mcp_tool, tool.py BaseTool, cli_tool) all forward
  ``spec`` through. In OpenAI Agents (strict-schema mode) the wire
  shape is a JSON-serialized string normalized by
  ``tool._normalize_spec`` — needed because strict mode rejects
  ``additionalProperties: True`` on object types. MCP and CLI keep the
  dict shape since they're lenient.
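
A minimal sketch of the two-call protocol from the calling chat agent's side, under stated assumptions: ``fake_create`` is a hypothetical stand-in that only imitates the response shapes listed above (``action``, ``draft_kit``, ``warnings``), and its validation rule is invented for illustration. It is not the real tool or the real manifest validator.

```python
from typing import Any, Optional


def fake_create(brief: str, spec: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Hypothetical stand-in for pocket_specialist__create in agent mode."""
    if spec is None:
        # First call: no spec yet, so return a draft kit instead of a pocket.
        return {
            "ok": False,
            "action": "draft_kit",
            "draft_kit": {"starter_widget_kinds": ["flex", "stat"]},
            "warnings": [],
        }
    # Second call: validate the caller-drafted spec (toy rule, assumed).
    if "type" not in spec:
        return {"ok": False, "action": "failed", "warnings": ["root node needs a type"]}
    return {"ok": True, "action": "created", "pocket": {"id": "p1"}, "warnings": []}


def create_pocket(brief: str, max_redrafts: int = 2) -> dict[str, Any]:
    """Drive the protocol: fetch the kit, draft, then redraft on warnings."""
    first = fake_create(brief)
    kinds = first["draft_kit"]["starter_widget_kinds"]

    # The chat agent would draft with its own model. This first draft omits
    # "type" on purpose so the redraft path runs once.
    draft: dict[str, Any] = {"props": {}, "children": []}
    result = fake_create(brief, spec=draft)
    for _ in range(max_redrafts):
        if result["ok"]:
            break
        draft = {"type": kinds[0], **draft}  # address the warning, then retry
        result = fake_create(brief, spec=draft)
    return result


result = create_pocket("A pocket for tracking cat moods")
```

The loop bound ``max_redrafts`` is an assumption of this sketch; the commit leaves the number of redraft attempts to the chat agent.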

Tests
-----

- 14 new tests in ``tests/ee/agent/test_pocket_specialist/test_adapters.py``
  cover dispatch (pick_adapter routing + unknown-mode fallback),
  SubagentAdapter delegation, AgentModeAdapter draft-kit content,
  persist-via-tool happy path, redraft-on-warnings, persist-exception
  handling, ``target_pocket_id``→extended action, and the
  "no backend spawned in agent mode" invariant.

- 143 tests pass across the pocket-specialist suite plus
  tool_bridge_deep_agents, mcp_claude_sdk, tools_cli_cloud. No
  regressions from the strict-schema fix.

Backward compat
---------------

Default ``pocket_specialist_mode="subagent"`` keeps the old behavior
unchanged. New ``spec`` arg is optional everywhere. New ``draft_kit``
field defaults to None. Operators opt into agent mode by setting
``POCKETPAW_POCKET_SPECIALIST_MODE=agent``.
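
A stdlib-only sketch of the opt-in, assuming nothing about the project's ``Settings`` class beyond the variable name and the default stated above. ``read_mode`` is a hypothetical helper; the fallback for unknown values is borrowed from what ``pick_adapter`` does, and the real settings loader may handle invalid values differently.

```python
import os
from typing import Mapping, Optional

VALID_MODES = ("subagent", "agent")
ENV_KEY = "POCKETPAW_POCKET_SPECIALIST_MODE"


def read_mode(env: Optional[Mapping[str, str]] = None) -> str:
    """Resolve the specialist mode, defaulting to the historical subagent flow."""
    source = os.environ if env is None else env
    raw = source.get(ENV_KEY, "subagent")
    # Assumption: unknown values degrade to the default rather than raising.
    return raw if raw in VALID_MODES else "subagent"


default_mode = read_mode({})                       # variable unset
opted_in = read_mode({ENV_KEY: "agent"})           # operator opted in
stale = read_mode({ENV_KEY: "not-a-real-mode"})    # stale config value
```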
prakashUXtech
2026-05-14 00:13:24 +05:30
parent 526ba0c108
commit fd00fe430e
8 changed files with 907 additions and 8 deletions

@@ -363,11 +363,43 @@ pocket_specialist_max_validation_retries: int = Field(
"with remaining warnings."
),
)
pocket_specialist_mode: Literal["subagent", "agent"] = Field(
default="subagent",
description=(
"Which adapter handles create. ``subagent`` (default) spawns "
"an isolated backend running the specialist's own model — the "
"historical flow. ``agent`` uses a two-call protocol: first "
"call returns a draft kit; the calling chat agent drafts the "
"rippleSpec inline using its own LLM and calls back with "
"``spec=<draft>``. Agent mode ignores backend + model settings."
),
)
```
Env vars: `POCKETPAW_POCKET_SPECIALIST_BACKEND`,
`POCKETPAW_POCKET_SPECIALIST_MODEL`,
`POCKETPAW_POCKET_SPECIALIST_MAX_VALIDATION_RETRIES`.
`POCKETPAW_POCKET_SPECIALIST_MAX_VALIDATION_RETRIES`,
`POCKETPAW_POCKET_SPECIALIST_MODE`.
### Adapter dispatch (added 2026-05-14)
`run_specialist` is a thin dispatch shim — the real work lives in one of
two adapters in `ee/agent/pocket_specialist/adapters.py`:
| Adapter | Triggered by | What it does |
|---|---|---|
| `SubagentAdapter` | `pocket_specialist_mode="subagent"` (default) | Wraps the historical `_run_subagent_pipeline` — spawns an isolated backend with the specialist's own model, runs the agent loop, persists. |
| `AgentModeAdapter` | `pocket_specialist_mode="agent"` | Two-call protocol. First call (no `spec` arg) returns `{action: "draft_kit", draft_kit: {...}}` with the structural plan echoed back, a rippleSpec shape reminder, a starter list of widget kinds, and instructions. The chat agent drafts the spec using its own LLM and calls again with `spec=<draft>`. The second call skips the LLM and goes straight to `make_persist_pocket_tool` for validate-and-persist. |
`pick_adapter(mode)` is the dispatch function; unknown modes fall back to
`SubagentAdapter` with a warning log. To add a new mode (e.g., a remote
spec service), implement `SpecialistCreateAdapter` and wire a branch
into `pick_adapter`.
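
As a rough sketch of what adding a mode could look like: `RemoteAdapter` and the `"remote"` mode string below are hypothetical, the adapters return placeholder dicts instead of `PocketSpecialistCreateOutput`, and `settings` is typed as `Any` so the snippet runs on its own.

```python
import asyncio
import logging
from typing import Any, Protocol

logger = logging.getLogger(__name__)


class SpecialistCreateAdapter(Protocol):
    async def create(
        self, input: Any, *, workspace_id: str, user_id: str, settings: Any
    ) -> Any: ...


class SubagentAdapter:
    async def create(self, input, *, workspace_id, user_id, settings):
        return {"backend_used": "subagent"}  # placeholder for the real pipeline


class RemoteAdapter:
    """Hypothetical third mode that would call a hosted spec service."""

    async def create(self, input, *, workspace_id, user_id, settings):
        return {"backend_used": "remote"}  # placeholder, no network call made


def pick_adapter(mode: str) -> SpecialistCreateAdapter:
    if mode == "remote":  # the new branch
        return RemoteAdapter()
    if mode != "subagent":
        logger.warning("Unknown pocket_specialist_mode=%r, using subagent.", mode)
    return SubagentAdapter()


out = asyncio.run(
    pick_adapter("remote").create({}, workspace_id="w1", user_id="u1", settings=None)
)
fallback = asyncio.run(
    pick_adapter("typo").create({}, workspace_id="w1", user_id="u1", settings=None)
)
```

The `Literal["subagent", "agent"]` annotation on the setting would presumably need the new value as well; that step is not shown here.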
The MCP/CLI/BaseTool surfaces all pass `spec` through transparently —
in OpenAI Agents (strict-schema mode) the wire shape is a
JSON-serialized string normalized by `tool._normalize_spec`; in MCP and
CLI the wire shape is a dict.
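
The two wire shapes can be illustrated with a simplified restatement of the normalizer. `normalize_spec` below is a sketch with logging omitted, not the `tool._normalize_spec` source, and the sample `draft` is made up.

```python
import json
from typing import Any, Optional


def normalize_spec(spec: Any) -> Optional[dict[str, Any]]:
    """Funnel dict and JSON-string inputs into a dict, or None if unusable."""
    if spec is None:
        return None
    if isinstance(spec, dict):
        return spec  # lenient path: already a dict
    if isinstance(spec, str):
        text = spec.strip()
        if not text:
            return None
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            return None  # unparseable string is dropped
        # Only a JSON object is a usable spec; lists and scalars are dropped.
        return parsed if isinstance(parsed, dict) else None
    return None


draft = {"type": "flex", "props": {"direction": "column"}, "children": []}
from_dict = normalize_spec(draft)                # MCP / CLI shape
from_string = normalize_spec(json.dumps(draft))  # strict-schema shape
```

Both calls yield the same dict, which is why the downstream handler does not need to know which surface the request came through.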
## Specialist-internal tools

@@ -0,0 +1,338 @@
# ee/agent/pocket_specialist/adapters.py
# Created: 2026-05-14 — split the ``pocket_specialist__create`` dispatch
# into two mode-specific adapters. Moves the historical subagent flow
# into ``SubagentAdapter`` and introduces ``AgentModeAdapter`` for the
# new two-call protocol where the calling chat agent drafts the
# rippleSpec inline using its own LLM and the specialist only runs
# validate-and-persist on the returned draft.
"""Mode-specific adapters for the pocket specialist's create endpoint.
The MCP tool handler (``mcp_tool._create_handler``) doesn't know — and
shouldn't care — whether the specialist is spawning a subagent or
piggybacking on the chat agent. It calls one of these adapters via
``pick_adapter(settings.pocket_specialist_mode)`` and gets a uniform
``PocketSpecialistCreateOutput`` back.
Adding a new mode (e.g., ``remote`` calling a hosted spec service):
implement the ``SpecialistCreateAdapter`` protocol and wire a branch
into ``pick_adapter`` at the bottom of this file.
"""
from __future__ import annotations
import logging
import time
from typing import Any, Protocol
from pocketpaw.config import Settings
logger = logging.getLogger(__name__)
# A small, hand-curated starter list of widget kinds the chat agent can
# reach for in agent-mode drafts. NOT exhaustive — the manifest is the
# source of truth and the chat agent should use the
# ``mcp__pocketpaw_pocket__get_widget_spec`` tool to look up props for
# any kind it wants to use. Listing these here keeps the kit response
# small while still giving the chat agent a productive starting set.
_STARTER_WIDGET_KINDS: tuple[str, ...] = (
"flex",
"grid",
"stat",
"chart",
"table",
"text",
"button",
"badge",
"progress",
"kanban",
)
class SpecialistCreateAdapter(Protocol):
"""Dispatch interface for ``pocket_specialist__create`` request shapes.
Implementations decide HOW the rippleSpec gets drafted (subagent,
chat-agent inline, remote service, …). They all return the same
``PocketSpecialistCreateOutput`` shape so the MCP tool handler and
the chat agent don't branch on mode."""
async def create(
self,
input: Any,
*,
workspace_id: str,
user_id: str,
settings: Settings,
) -> Any:
...
class SubagentAdapter:
"""Spawn an isolated backend that runs the specialist's own LLM.
Wraps the historical flow in ``runtime._run_subagent_pipeline``.
The runtime keeps that function as the implementation — this
adapter is the dispatch shim. Importing inside ``create`` avoids
a circular import between ``adapters`` and ``runtime``.
"""
async def create(
self,
input: Any,
*,
workspace_id: str,
user_id: str,
settings: Settings,
) -> Any:
from ee.agent.pocket_specialist.runtime import _run_subagent_pipeline
return await _run_subagent_pipeline(
input,
workspace_id=workspace_id,
user_id=user_id,
settings=settings,
)
class AgentModeAdapter:
"""Two-call protocol — the calling chat agent IS the specialist.
First call (``input.spec is None``): return a draft kit (structural
plan echo + rippleSpec shape reminder + widget hint list + next-
step instructions). The chat agent then drafts the rippleSpec in
its own context using its own model.
Second call (``input.spec`` populated): skip the LLM draft phase
and go straight to validate-and-persist using the same
``make_persist_pocket_tool`` the subagent flow uses internally.
No backend is spawned in either call. ``pocket_specialist_backend``
and ``pocket_specialist_model`` are ignored entirely; the chat
agent's already-running model carries the spec-drafting cost.
"""
async def create(
self,
input: Any,
*,
workspace_id: str,
user_id: str,
settings: Settings,
) -> Any:
started = time.monotonic()
if input.spec is None:
return _draft_kit_response(input, started=started)
return await _validate_and_persist(
input,
workspace_id=workspace_id,
user_id=user_id,
settings=settings,
started=started,
)
# ---------------------------------------------------------------------------
# Agent-mode internals
# ---------------------------------------------------------------------------
def _draft_kit_response(input: Any, *, started: float) -> Any:
"""Build the first-call response: enough scaffolding for the chat
agent to draft a rippleSpec inline, without copying the full ~12k-
token specialist prompt into the chat agent's context.
The chat agent already has ``mcp__pocketpaw_pocket__get_widget_spec``
available — the kit tells it to use that for widget props on
demand rather than inlining the manifest here.
"""
from ee.agent.pocket_specialist.runtime import PocketSpecialistCreateOutput
hints_dict: dict[str, Any] = (
input.hints.model_dump(exclude_none=True) if input.hints else {}
)
kit: dict[str, Any] = {
"structural_plan": hints_dict,
"ripple_spec_shape": (
"A rippleSpec is a JSON tree: the root is typically a "
"``{type: 'flex', props: {direction, gap, padding}, children: [...]}`` "
"or a ``{type: 'grid', props: {columns, gap}, children: [...]}``. "
"Every node has ``type`` (the widget kind) and ``props`` (a flat "
"dict of allowed props for that kind). Containers add a "
"``children`` array of nested nodes. Mock data for stat/chart/"
"table widgets goes directly in props (e.g., ``chart.data`` is a "
"``[{label, value}]`` list)."
),
"starter_widget_kinds": list(_STARTER_WIDGET_KINDS),
"next_step": (
"Draft a rippleSpec for the structural plan above. Use your own "
"model — no subagent will be spawned. When ready, call "
"``pocket_specialist__create`` again with the same brief AND "
"``spec=<your drafted ripple spec>``. The tool will validate "
"against the widget manifest and persist the pocket. If "
"validation returns warnings, the response carries them and you "
"can call again with a corrected spec."
),
"lookup_tool": (
"Use ``mcp__pocketpaw_pocket__get_widget_spec`` to fetch allowed "
"props for any widget kind before drafting. Use "
"``mcp__pocketpaw_pocket__list_pockets`` to see existing pockets "
"in the workspace."
),
}
duration_ms = int((time.monotonic() - started) * 1000)
logger.info(
"[pocket-specialist] agent-mode draft kit returned (hints_keys=%s "
"starter_kinds=%d duration=%dms)",
sorted(hints_dict.keys()),
len(_STARTER_WIDGET_KINDS),
duration_ms,
)
return PocketSpecialistCreateOutput(
ok=False,
action="draft_kit",
pocket=None,
warnings=[],
error=None,
duration_ms=duration_ms,
backend_used="agent_mode",
draft_kit=kit,
)
async def _validate_and_persist(
input: Any,
*,
workspace_id: str,
user_id: str,
settings: Settings,
started: float,
) -> Any:
"""Second-call path: run the spec through the same persist tool the
subagent uses internally. No LLM in this step — the chat agent
already did the drafting.
Reuses ``make_persist_pocket_tool`` so the validation rules, the
redraft-on-warnings semantics, and the side-channel capture dict
behave exactly like the subagent flow does. On validation warnings
the chat agent gets the warnings back and can call once more with
a corrected spec — mirroring the subagent's internal retry loop.
"""
from ee.agent.pocket_specialist.runtime import PocketSpecialistCreateOutput
from ee.agent.pocket_specialist.tools import make_persist_pocket_tool
persist_capture: dict[str, Any] = {}
tool = make_persist_pocket_tool(
workspace_id=workspace_id,
user_id=user_id,
capture=persist_capture,
max_validation_retries=settings.pocket_specialist_max_validation_retries,
)
hints = input.hints
tool_args: dict[str, Any] = {
"ripple_spec": input.spec,
"name": getattr(hints, "name", None),
"description": getattr(hints, "description", None),
"icon": getattr(hints, "icon", None),
"color": getattr(hints, "color", None),
"target_pocket_id": getattr(hints, "target_pocket_id", None),
}
try:
await tool.ainvoke(tool_args)
except Exception as exc: # noqa: BLE001
duration_ms = int((time.monotonic() - started) * 1000)
logger.warning(
"[pocket-specialist] agent-mode persist raised "
"(workspace=%s duration=%dms): %s",
workspace_id,
duration_ms,
exc,
)
return PocketSpecialistCreateOutput(
ok=False,
action="failed",
pocket=None,
warnings=list(persist_capture.get("warnings", [])),
error=f"persist failed: {exc}",
duration_ms=duration_ms,
backend_used="agent_mode",
)
captured_pocket: dict[str, Any] | None = persist_capture.get("pocket")
captured_warnings: list[str] = list(persist_capture.get("warnings", []))
duration_ms = int((time.monotonic() - started) * 1000)
if captured_pocket is None:
# ``make_persist_pocket_tool`` short-circuits without saving when
# the manifest validator returns warnings and the retry budget
# is unspent. The chat agent should redraft and call again.
logger.info(
"[pocket-specialist] agent-mode redraft required "
"(warnings=%d duration=%dms)",
len(captured_warnings),
duration_ms,
)
return PocketSpecialistCreateOutput(
ok=False,
action="failed",
pocket=None,
warnings=captured_warnings,
error=(
"Spec validation produced warnings — redraft required. "
"Address each warning and call pocket_specialist__create "
"again with the corrected spec."
),
duration_ms=duration_ms,
backend_used="agent_mode",
)
action: str = "extended" if hints and hints.target_pocket_id else "created"
logger.info(
"[pocket-specialist] agent-mode complete: pocket_id=%s action=%s "
"duration=%dms warnings=%d",
captured_pocket.get("id", ""),
action,
duration_ms,
len(captured_warnings),
)
return PocketSpecialistCreateOutput(
ok=True,
action=action, # type: ignore[arg-type]
pocket=captured_pocket,
warnings=captured_warnings,
duration_ms=duration_ms,
backend_used="agent_mode",
)
def pick_adapter(mode: str) -> SpecialistCreateAdapter:
"""Pick the create adapter for the configured specialist mode.
Unknown modes fall through to the historical subagent adapter so a
stale config never bricks a deployed instance — the operator sees
the warning in logs and can correct the value.
"""
if mode == "agent":
return AgentModeAdapter()
if mode != "subagent":
logger.warning(
"Unknown pocket_specialist_mode=%r — falling back to subagent. "
"Valid values: 'subagent', 'agent'.",
mode,
)
return SubagentAdapter()
__all__ = [
"AgentModeAdapter",
"SpecialistCreateAdapter",
"SubagentAdapter",
"pick_adapter",
]

@@ -50,6 +50,7 @@ async def _cloud_pocket_specialist_create(args: dict[str, Any]) -> dict[str, Any
"""
brief = args.get("brief", "")
raw_hints = args.get("hints")
raw_spec = args.get("spec")
workspace_id = (
current_workspace_id()
@@ -71,7 +72,11 @@ async def _cloud_pocket_specialist_create(args: dict[str, Any]) -> dict[str, Any
hints = PocketSpecialistHints(**raw_hints) if raw_hints else None
try:
payload = PocketSpecialistCreateInput(brief=brief, hints=hints)
payload = PocketSpecialistCreateInput(
brief=brief,
hints=hints,
spec=raw_spec if isinstance(raw_spec, dict) else None,
)
except Exception as exc: # noqa: BLE001 — pydantic ValidationError surfaces here
return {"ok": False, "error": f"invalid input: {exc}"}

@@ -65,7 +65,12 @@ async def _create_handler(args: dict[str, Any]) -> dict[str, Any]:
raw_hints = args.get("hints")
hints = PocketSpecialistHints(**raw_hints) if raw_hints else None
payload = PocketSpecialistCreateInput(brief=args.get("brief", ""), hints=hints)
raw_spec = args.get("spec")
payload = PocketSpecialistCreateInput(
brief=args.get("brief", ""),
hints=hints,
spec=raw_spec if isinstance(raw_spec, dict) else None,
)
try:
out = await run_specialist(
@@ -242,6 +247,19 @@ def build_pocket_specialist_server() -> Any:
},
"additionalProperties": False,
},
"spec": {
"type": "object",
"description": (
"Agent-mode second call: a pre-drafted rippleSpec "
"from the chat agent. The specialist validates it "
"against the widget manifest and persists. Omit on "
"the first call (in agent mode you'll get back "
"``action='draft_kit'`` with instructions). In "
"subagent mode this argument is ignored — the "
"spawned specialist drafts its own spec."
),
"additionalProperties": True,
},
},
"required": ["brief"],
"additionalProperties": False,

@@ -99,16 +99,33 @@ class PocketSpecialistHints(BaseModel):
class PocketSpecialistCreateInput(BaseModel):
brief: str = Field(..., min_length=10, max_length=4000)
hints: PocketSpecialistHints | None = None
spec: dict[str, Any] | None = Field(
default=None,
description=(
"Pre-drafted rippleSpec for agent-mode's second call. When set, "
"the specialist skips its own LLM draft phase and goes straight "
"to validate-and-persist. Ignored in subagent mode."
),
)
class PocketSpecialistCreateOutput(BaseModel):
ok: bool
action: Literal["created", "extended", "failed"]
action: Literal["created", "extended", "failed", "draft_kit"]
pocket: dict[str, Any] | None = None
warnings: list[str] = Field(default_factory=list)
error: str | None = None
duration_ms: int
backend_used: str
draft_kit: dict[str, Any] | None = Field(
default=None,
description=(
"Agent-mode first-call payload: structural plan echo, rippleSpec "
"shape reminder, starter widget kinds, and instructions for the "
"calling chat agent to draft a rippleSpec and call back with "
"``spec=<draft>``. None in subagent mode and on the agent-mode second call."
),
)
async def run_specialist(
@@ -118,13 +135,44 @@ async def run_specialist(
user_id: str,
settings: Settings,
) -> PocketSpecialistCreateOutput:
"""Run the pocket specialist end-to-end.
"""Entry point — pick the adapter for ``settings.pocket_specialist_mode``
and delegate.
Two adapters live in ``adapters.py``. The default ``subagent`` mode
runs the historical pipeline below (an isolated backend with the
specialist's own model). The ``agent`` mode short-circuits the
backend spawn and hands a draft kit back to the calling chat agent
so it can draft the rippleSpec inline using its own LLM.
Signature is the public contract — call sites in ``mcp_tool``,
``cli_tool``, and ``tool`` rely on it being adapter-agnostic.
"""
from ee.agent.pocket_specialist.adapters import pick_adapter
adapter = pick_adapter(settings.pocket_specialist_mode)
return await adapter.create(
input, workspace_id=workspace_id, user_id=user_id, settings=settings
)
async def _run_subagent_pipeline(
input: PocketSpecialistCreateInput,
*,
workspace_id: str,
user_id: str,
settings: Settings,
) -> PocketSpecialistCreateOutput:
"""Subagent-mode pipeline (historical flow).
Builds an isolated backend, attaches the three internal tools, runs the
agent loop, captures the persist_pocket result, and emits status events
along the way. Always returns a persisted pocket - the safety-net
fallback (Task 8) covers the rare case where the LLM finishes without
calling persist_pocket.
Invoked by ``SubagentAdapter.create`` — kept private so the only
entry point remains ``run_specialist`` (which dispatches via
``pick_adapter``).
"""
started = time.monotonic()
backend_name = settings.pocket_specialist_backend

@@ -50,6 +50,16 @@ class PocketSpecialistHintsModel(BaseModel):
class PocketSpecialistArgs(BaseModel):
brief: str = Field(..., min_length=10, max_length=4000)
hints: PocketSpecialistHintsModel | None = None
spec: str | dict[str, Any] | None = Field(
default=None,
description=(
"Agent-mode second-call argument: a pre-drafted rippleSpec the "
"calling chat agent produced after receiving the draft kit. "
"Accepted as a dict (MCP path) or a JSON-serialized string "
"(OpenAI Agents / strict-schema path) — the handler normalizes "
"before delegating. Ignored in subagent mode."
),
)
_PARAMS_JSON_SCHEMA: dict[str, Any] = {
@@ -78,6 +88,18 @@ _PARAMS_JSON_SCHEMA: dict[str, Any] = {
"Optional caller-supplied overrides for fields the user named explicitly."
),
},
"spec": {
"type": "string",
"description": (
"Agent-mode second-call: a JSON-serialized rippleSpec the "
"chat agent drafted after receiving the draft kit on the "
"first call. The specialist parses it, validates against "
"the widget manifest, and persists. Pass as a string (e.g., "
"``json.dumps(spec)``) so the schema stays strict-mode-"
"compatible across all backends. Omit on the first call in "
"agent mode and on every call in subagent mode."
),
},
},
"required": ["brief"],
"additionalProperties": False,
@@ -106,8 +128,9 @@ class PocketSpecialistTool(BaseTool):
async def execute(self, **params: Any) -> str:
brief = params.get("brief", "")
hints = params.get("hints")
spec = _normalize_spec(params.get("spec"))
normalized = _normalize_hints(hints)
return await _run_handler(brief, normalized)
return await _run_handler(brief, normalized, spec=spec)
def _normalize_hints(hints: Any) -> dict[str, Any] | None:
@@ -139,13 +162,47 @@ def _normalize_hints(hints: Any) -> dict[str, Any] | None:
return None
async def _run_handler(brief: str, hints: dict[str, Any] | None) -> str:
def _normalize_spec(spec: Any) -> dict[str, Any] | None:
"""Accept ``spec`` as dict, JSON-serialized string, or None.
Strict-schema-mode backends (OpenAI Agents) require ``spec`` to be a
string in the tool schema; the lenient MCP path passes it as a dict.
Both wire shapes funnel through this helper so ``_run_handler``
always sees a dict-or-None.
"""
if spec is None:
return None
if isinstance(spec, dict):
return spec
if isinstance(spec, str):
text = spec.strip()
if not text:
return None
try:
parsed = json.loads(text)
except (json.JSONDecodeError, TypeError):
logger.warning("pocket_specialist: dropped unparseable spec string")
return None
return parsed if isinstance(parsed, dict) else None
logger.warning("pocket_specialist: dropped spec of unsupported type %s", type(spec).__name__)
return None
async def _run_handler(
brief: str,
hints: dict[str, Any] | None,
*,
spec: dict[str, Any] | None = None,
) -> str:
"""Dispatch to ``run_specialist`` and serialize the result as JSON.
Reads workspace_id / user_id from the per-stream ContextVars. Returns
an ``{"ok": False, "error": ...}`` envelope (JSON-encoded) when
identity is missing or the run raises — the calling agent surfaces the
string back to the user.
``spec`` carries the agent-mode second-call payload (a pre-drafted
rippleSpec). Forwarded as-is; subagent mode ignores it.
"""
from ee.agent.pocket_specialist.runtime import (
PocketSpecialistCreateInput,
@@ -170,7 +227,7 @@ async def _run_handler(brief: str, hints: dict[str, Any] | None) -> str:
parsed_hints = PocketSpecialistHints(**hints) if hints else None
try:
payload = PocketSpecialistCreateInput(brief=brief, hints=parsed_hints)
payload = PocketSpecialistCreateInput(brief=brief, hints=parsed_hints, spec=spec)
except Exception as exc: # pydantic ValidationError lands here
return json.dumps({"ok": False, "error": f"invalid input: {exc}"})

@@ -342,6 +342,20 @@ class Settings(BaseSettings):
"remaining warnings. Specialist always persists; this only bounds revision."
),
)
pocket_specialist_mode: Literal["subagent", "agent"] = Field(
default="subagent",
description=(
"Which adapter handles ``pocket_specialist__create`` calls. "
"``subagent`` (default) spawns an isolated backend running the "
"specialist's own model, the historical flow. ``agent`` uses a "
"two-call protocol: the first call returns a draft kit (structural "
"plan + rippleSpec shape reminder + starter widget kinds); the chat agent "
"drafts the rippleSpec inline using its own model and calls back "
"with ``spec=<draft>`` for validate-and-persist. ``agent`` mode "
"ignores ``pocket_specialist_backend`` and ``pocket_specialist_model`` "
"entirely — the chat agent's runtime is the LLM."
),
)
deep_agents_skills: list[str] = Field(
default_factory=list,
description=(

@@ -0,0 +1,387 @@
# tests/ee/agent/test_pocket_specialist/test_adapters.py
# Created: 2026-05-14 (feat/pocket-specialist-agent-mode) — covers the new
# mode-dispatch layer in ee/agent/pocket_specialist/adapters.py.
# SubagentAdapter wraps the historical pipeline; AgentModeAdapter
# implements the two-call protocol (draft kit on first call, validate-
# and-persist on second). pick_adapter() routes by setting.
"""Tests for ``ee.agent.pocket_specialist.adapters``.
Adapter behavior is mocked end-to-end so we don't need a real backend
or a real Mongo: ``SubagentAdapter`` is tested by patching
``_run_subagent_pipeline`` and asserting delegation; ``AgentModeAdapter``
is tested against a patched ``make_persist_pocket_tool`` factory that
mimics the capture-dict mutation the real tool performs.
"""
from __future__ import annotations
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from ee.agent.pocket_specialist.adapters import (
AgentModeAdapter,
SubagentAdapter,
pick_adapter,
)
from ee.agent.pocket_specialist.runtime import (
PocketSpecialistCreateInput,
PocketSpecialistCreateOutput,
PocketSpecialistHints,
)
from pocketpaw.config import Settings
_LEAKY_ENV_KEYS = (
"POCKETPAW_POCKET_SPECIALIST_BACKEND",
"POCKETPAW_POCKET_SPECIALIST_MODEL",
"POCKETPAW_POCKET_SPECIALIST_MAX_VALIDATION_RETRIES",
"POCKETPAW_POCKET_SPECIALIST_MODE",
"POCKETPAW_DEEP_AGENTS_MODEL",
"POCKETPAW_CLAUDE_SDK_MODEL",
"POCKETPAW_LANGCHAIN_REACT_MODEL",
)
@pytest.fixture(autouse=True)
def _isolate_env(monkeypatch):
"""Strip every env var that might smuggle a real operator override
into these tests."""
for key in _LEAKY_ENV_KEYS:
monkeypatch.delenv(key, raising=False)
@pytest.fixture
def settings() -> Settings:
return Settings(_env_file=None)
@pytest.fixture
def agent_settings() -> Settings:
return Settings(_env_file=None, pocket_specialist_mode="agent")
def _persist_factory_stub(pocket: dict[str, Any] | None, warnings: list[str] | None = None):
"""Mirror tests/test_runtime.py's _persist_factory_stub: the real
tool factory captures the persisted pocket into the supplied dict.
Here we let the test pass a None pocket to simulate the validation-
redraft branch (no save)."""
def _stub(
*,
workspace_id: str,
user_id: str,
capture: dict[str, Any] | None = None,
max_validation_retries: int = 3,
):
async def _ainvoke(args: dict[str, Any]) -> dict[str, Any]:
if capture is not None:
if warnings:
capture["warnings"] = list(warnings)
if pocket is not None:
capture["pocket"] = pocket
return {"ok": pocket is not None}
tool = MagicMock()
tool.ainvoke = AsyncMock(side_effect=_ainvoke)
return tool
return _stub
# ---------------------------------------------------------------------------
# pick_adapter
# ---------------------------------------------------------------------------
class TestPickAdapter:
def test_subagent_mode_returns_subagent_adapter(self) -> None:
assert isinstance(pick_adapter("subagent"), SubagentAdapter)
def test_agent_mode_returns_agent_mode_adapter(self) -> None:
assert isinstance(pick_adapter("agent"), AgentModeAdapter)
def test_unknown_mode_falls_back_to_subagent(self, caplog) -> None:
"""A stale config value shouldn't brick a deployment — log + degrade."""
with caplog.at_level("WARNING"):
adapter = pick_adapter("not-a-real-mode")
assert isinstance(adapter, SubagentAdapter)
assert any("not-a-real-mode" in rec.message for rec in caplog.records)
# ---------------------------------------------------------------------------
# SubagentAdapter
# ---------------------------------------------------------------------------
class TestSubagentAdapter:
@pytest.mark.asyncio
async def test_delegates_to_run_subagent_pipeline(self, settings) -> None:
expected = PocketSpecialistCreateOutput(
ok=True,
action="created",
pocket={"id": "p1"},
duration_ms=10,
backend_used="deep_agents",
)
with patch(
"ee.agent.pocket_specialist.runtime._run_subagent_pipeline",
new=AsyncMock(return_value=expected),
) as mock_pipeline:
adapter = SubagentAdapter()
payload = PocketSpecialistCreateInput(brief="brief enough to clear minlen")
out = await adapter.create(
payload, workspace_id="w1", user_id="u1", settings=settings
)
assert out is expected
mock_pipeline.assert_awaited_once_with(
payload, workspace_id="w1", user_id="u1", settings=settings
)
# ---------------------------------------------------------------------------
# AgentModeAdapter
# ---------------------------------------------------------------------------
class TestAgentModeAdapterDraftKit:
@pytest.mark.asyncio
async def test_first_call_no_spec_returns_draft_kit(self, agent_settings) -> None:
adapter = AgentModeAdapter()
hints = PocketSpecialistHints(
name="Cat Tracker",
color="#F59E0B",
purpose="Track cat moods",
focal_widget="stat",
)
payload = PocketSpecialistCreateInput(
brief="A whimsical pocket for tracking cat moods", hints=hints
)
out = await adapter.create(
payload, workspace_id="w1", user_id="u1", settings=agent_settings
)
assert out.ok is False
assert out.action == "draft_kit"
assert out.pocket is None
assert out.backend_used == "agent_mode"
assert out.draft_kit is not None
# Kit echoes the structural plan from hints so the chat agent
# has it in fresh context, not just system-prompt-buried.
plan = out.draft_kit["structural_plan"]
assert plan["name"] == "Cat Tracker"
assert plan["focal_widget"] == "stat"
# Kit includes shape reminder + widget kinds + next step instructions.
assert "ripple_spec_shape" in out.draft_kit
assert "starter_widget_kinds" in out.draft_kit
assert "spec=" in out.draft_kit["next_step"]
@pytest.mark.asyncio
async def test_first_call_no_hints_returns_empty_plan(self, agent_settings) -> None:
"""No hints → empty structural plan, but the rest of the kit is intact."""
adapter = AgentModeAdapter()
payload = PocketSpecialistCreateInput(brief="brief enough to clear minlen")
out = await adapter.create(
payload, workspace_id="w1", user_id="u1", settings=agent_settings
)
assert out.action == "draft_kit"
assert out.draft_kit is not None
assert out.draft_kit["structural_plan"] == {}
assert isinstance(out.draft_kit["starter_widget_kinds"], list)
assert len(out.draft_kit["starter_widget_kinds"]) > 0
@pytest.mark.asyncio
async def test_no_backend_spawned_on_draft_kit(self, agent_settings) -> None:
"""Agent mode must not spin up an isolated backend for the kit
— the chat agent's own model is the one drafting."""
with patch(
"pocketpaw.agents.router.AgentRouter.create_isolated_backend"
) as mock_backend:
adapter = AgentModeAdapter()
payload = PocketSpecialistCreateInput(brief="brief enough to clear minlen")
await adapter.create(
payload, workspace_id="w1", user_id="u1", settings=agent_settings
)
mock_backend.assert_not_called()


class TestAgentModeAdapterPersist:
    @pytest.mark.asyncio
    async def test_second_call_with_spec_validates_and_persists(
        self, agent_settings
    ) -> None:
        """The second call (input.spec set) goes through the same
        persist tool the subagent flow uses, with no LLM and no backend."""
        persisted = {"id": "p-new", "name": "Cats"}
        with patch(
            "ee.agent.pocket_specialist.tools.make_persist_pocket_tool",
            new=MagicMock(side_effect=_persist_factory_stub(persisted)),
        ) as mock_factory:
            adapter = AgentModeAdapter()
            payload = PocketSpecialistCreateInput(
                brief="brief enough to clear minlen",
                hints=PocketSpecialistHints(name="Cats", color="#F59E0B"),
                spec={"type": "flex", "children": []},
            )
            out = await adapter.create(
                payload, workspace_id="w1", user_id="u1", settings=agent_settings
            )
        assert out.ok is True
        assert out.action == "created"
        assert out.pocket == persisted
        assert out.backend_used == "agent_mode"
        assert out.draft_kit is None
        mock_factory.assert_called_once()

    @pytest.mark.asyncio
    async def test_second_call_with_target_pocket_id_marks_extended(
        self, agent_settings
    ) -> None:
        """A target_pocket_id hint makes the persist call report
        action=extended instead of created."""
        persisted = {"id": "p-existing", "name": "Cats"}
        with patch(
            "ee.agent.pocket_specialist.tools.make_persist_pocket_tool",
            new=MagicMock(side_effect=_persist_factory_stub(persisted)),
        ):
            adapter = AgentModeAdapter()
            payload = PocketSpecialistCreateInput(
                brief="brief enough to clear minlen",
                hints=PocketSpecialistHints(target_pocket_id="p-existing"),
                spec={"type": "flex", "children": []},
            )
            out = await adapter.create(
                payload, workspace_id="w1", user_id="u1", settings=agent_settings
            )
        assert out.action == "extended"

    @pytest.mark.asyncio
    async def test_validation_warnings_surface_for_redraft(
        self, agent_settings
    ) -> None:
        """When the persist tool refuses to save (warnings present, retry
        budget unspent), the adapter returns the warnings + action=failed
        so the chat agent can redraft and call again."""
        with patch(
            "ee.agent.pocket_specialist.tools.make_persist_pocket_tool",
            new=MagicMock(
                side_effect=_persist_factory_stub(
                    None, warnings=["chart.xKey is not a valid prop"]
                )
            ),
        ):
            adapter = AgentModeAdapter()
            payload = PocketSpecialistCreateInput(
                brief="brief enough to clear minlen",
                spec={"type": "chart", "props": {"xKey": "nope"}},
            )
            out = await adapter.create(
                payload, workspace_id="w1", user_id="u1", settings=agent_settings
            )
        assert out.ok is False
        assert out.action == "failed"
        assert out.pocket is None
        assert "chart.xKey" in out.warnings[0]
        assert "redraft" in (out.error or "").lower()

    @pytest.mark.asyncio
    async def test_persist_exception_returns_failed(self, agent_settings) -> None:
        """If the persist tool raises (transport error, etc.) the
        adapter surfaces the exception in the error field rather than
        bubbling up to the chat agent as an unhandled traceback."""

        def _exploding_factory(**kwargs):
            tool = MagicMock()
            tool.ainvoke = AsyncMock(side_effect=RuntimeError("mongo down"))
            return tool

        with patch(
            "ee.agent.pocket_specialist.tools.make_persist_pocket_tool",
            new=MagicMock(side_effect=_exploding_factory),
        ):
            adapter = AgentModeAdapter()
            payload = PocketSpecialistCreateInput(
                brief="brief enough to clear minlen",
                spec={"type": "flex"},
            )
            out = await adapter.create(
                payload, workspace_id="w1", user_id="u1", settings=agent_settings
            )
        assert out.ok is False
        assert out.action == "failed"
        assert "mongo down" in (out.error or "")

    @pytest.mark.asyncio
    async def test_no_backend_spawned_on_persist(self, agent_settings) -> None:
        """The persist call must not spin up an isolated backend either."""
        with patch(
            "pocketpaw.agents.router.AgentRouter.create_isolated_backend"
        ) as mock_backend, patch(
            "ee.agent.pocket_specialist.tools.make_persist_pocket_tool",
            new=MagicMock(side_effect=_persist_factory_stub({"id": "p1"})),
        ):
            adapter = AgentModeAdapter()
            payload = PocketSpecialistCreateInput(
                brief="brief enough to clear minlen",
                spec={"type": "flex"},
            )
            await adapter.create(
                payload, workspace_id="w1", user_id="u1", settings=agent_settings
            )
        mock_backend.assert_not_called()


# ---------------------------------------------------------------------------
# Dispatch integration: run_specialist picks the right adapter
# ---------------------------------------------------------------------------


class TestRunSpecialistDispatch:
    @pytest.mark.asyncio
    async def test_run_specialist_uses_subagent_adapter_by_default(self) -> None:
        """The public entry point honors settings.pocket_specialist_mode."""
        from ee.agent.pocket_specialist.runtime import run_specialist

        s = Settings(_env_file=None)  # default mode = subagent
        sentinel = PocketSpecialistCreateOutput(
            ok=True,
            action="created",
            pocket={"id": "p1"},
            duration_ms=5,
            backend_used="deep_agents",
        )
        with patch(
            "ee.agent.pocket_specialist.runtime._run_subagent_pipeline",
            new=AsyncMock(return_value=sentinel),
        ) as mock_pipeline:
            out = await run_specialist(
                PocketSpecialistCreateInput(brief="brief enough to clear minlen"),
                workspace_id="w1",
                user_id="u1",
                settings=s,
            )
        assert out is sentinel
        mock_pipeline.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_run_specialist_uses_agent_mode_adapter_when_set(self) -> None:
        """With pocket_specialist_mode="agent", the first call returns the
        draft kit and never touches the subagent pipeline."""
        from ee.agent.pocket_specialist.runtime import run_specialist

        s = Settings(_env_file=None, pocket_specialist_mode="agent")
        with patch(
            "ee.agent.pocket_specialist.runtime._run_subagent_pipeline",
            new=AsyncMock(),
        ) as mock_pipeline:
            out = await run_specialist(
                PocketSpecialistCreateInput(brief="brief enough to clear minlen"),
                workspace_id="w1",
                user_id="u1",
                settings=s,
            )
        # Subagent pipeline is bypassed entirely in agent mode.
        mock_pipeline.assert_not_called()
        assert out.action == "draft_kit"
        assert out.backend_used == "agent_mode"