fix: expose active-run queue failure reasons

This commit is contained in:
Peter Steinberger
2026-05-10 13:04:23 +01:00
parent c3f817e0e0
commit 1ed50b0ced
21 changed files with 465 additions and 144 deletions

View File

@@ -25,6 +25,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Gateway/agents: keep structured reasons when active-run queueing fails and deprecate the legacy boolean queue helper, so steering and subagent wake diagnostics distinguish completed, non-streaming, and compacting runs. Fixes #80156. Thanks @markus-lassfolk.
- Agents/UI: compact exec and tool progress rows by hiding redundant shell tool names, replacing known workspace paths with short context markers, and preserving Discord trace scrubbing for compact command lines.
- ACPX: run and await the embedded ACP backend startup probe by default so the gateway `ready` signal no longer fires before the acpx runtime has either become usable or reported a probe failure; set `OPENCLAW_ACPX_RUNTIME_STARTUP_PROBE=0` to restore lazy startup. Fixes #79596. Thanks @bzelones.
- OpenAI-compatible models: strip prior assistant reasoning fields from replayed Chat Completions history by default, preventing oMLX/vLLM Qwen follow-up turns from rejecting or stalling on stale `reasoning` payloads. Fixes #46637. Thanks @zipzagster and @lexhoefsloot.

View File

@@ -7,7 +7,6 @@ import {
embeddedAgentLog,
nativeHookRelayTesting,
onAgentEvent,
queueAgentHarnessMessage,
resetAgentEventsForTest,
type AgentEventPayload,
type EmbeddedRunAttemptParams,
@@ -18,6 +17,13 @@ import {
} from "openclaw/plugin-sdk/hook-runtime";
import { createMockPluginRegistry } from "openclaw/plugin-sdk/plugin-test-runtime";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { queueEmbeddedPiMessageWithOutcome } from "../../../../src/agents/pi-embedded-runner/runs.js";
function queueActiveRunMessageForTest(
...args: Parameters<typeof queueEmbeddedPiMessageWithOutcome>
): boolean {
return queueEmbeddedPiMessageWithOutcome(...args).queued;
}
import { CODEX_GPT5_BEHAVIOR_CONTRACT } from "../../prompt-overlay.js";
import {
buildCodexAppInventoryCacheKey,
@@ -1177,7 +1183,7 @@ describe("runCodexAppServerAttempt", () => {
}),
{ interval: 1 },
);
expect(queueAgentHarnessMessage("session-1", "after timeout")).toBe(false);
expect(queueActiveRunMessageForTest("session-1", "after timeout")).toBe(false);
});
it("does not count account rate-limit updates as turn completion activity", async () => {
@@ -1486,7 +1492,7 @@ describe("runCodexAppServerAttempt", () => {
}),
{ interval: 1 },
);
expect(queueAgentHarnessMessage("session-1", "after silent turn")).toBe(false);
expect(queueActiveRunMessageForTest("session-1", "after silent turn")).toBe(false);
});
it("applies before_prompt_build to Codex developer instructions and turn input", async () => {
@@ -2332,7 +2338,7 @@ describe("runCodexAppServerAttempt", () => {
);
await waitForMethod("turn/start");
expect(queueAgentHarnessMessage("session-1", "more context", { debounceMs: 1 })).toBe(true);
expect(queueActiveRunMessageForTest("session-1", "more context", { debounceMs: 1 })).toBe(true);
await vi.waitFor(() => expect(requests.map((entry) => entry.method)).toContain("turn/steer"), {
interval: 1,
});
@@ -2377,8 +2383,8 @@ describe("runCodexAppServerAttempt", () => {
);
await waitForMethod("turn/start");
expect(queueAgentHarnessMessage("session-1", "first", { debounceMs: 5 })).toBe(true);
expect(queueAgentHarnessMessage("session-1", "second", { debounceMs: 5 })).toBe(true);
expect(queueActiveRunMessageForTest("session-1", "first", { debounceMs: 5 })).toBe(true);
expect(queueActiveRunMessageForTest("session-1", "second", { debounceMs: 5 })).toBe(true);
await vi.waitFor(
() =>
@@ -2410,7 +2416,9 @@ describe("runCodexAppServerAttempt", () => {
);
await waitForMethod("turn/start");
expect(queueAgentHarnessMessage("session-1", "late steer", { debounceMs: 30_000 })).toBe(true);
expect(queueActiveRunMessageForTest("session-1", "late steer", { debounceMs: 30_000 })).toBe(
true,
);
await completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
@@ -2435,12 +2443,12 @@ describe("runCodexAppServerAttempt", () => {
);
await waitForMethod("turn/start");
expect(queueAgentHarnessMessage("session-1", "first", { steeringMode: "one-at-a-time" })).toBe(
true,
);
expect(queueAgentHarnessMessage("session-1", "second", { steeringMode: "one-at-a-time" })).toBe(
true,
);
expect(
queueActiveRunMessageForTest("session-1", "first", { steeringMode: "one-at-a-time" }),
).toBe(true);
expect(
queueActiveRunMessageForTest("session-1", "second", { steeringMode: "one-at-a-time" }),
).toBe(true);
await vi.waitFor(
() =>
@@ -2540,7 +2548,7 @@ describe("runCodexAppServerAttempt", () => {
});
await vi.waitFor(() => expect(params.onBlockReply).toHaveBeenCalledTimes(1), { interval: 1 });
expect(queueAgentHarnessMessage("session-1", "2")).toBe(true);
expect(queueActiveRunMessageForTest("session-1", "2")).toBe(true);
await expect(response).resolves.toEqual({
answers: { mode: { answers: ["Deep"] } },
});
@@ -3388,7 +3396,7 @@ describe("runCodexAppServerAttempt", () => {
await expect(runCodexAppServerAttempt(params, { startupTimeoutFloorMs: 1 })).rejects.toThrow(
"codex app-server startup timed out",
);
expect(queueAgentHarnessMessage("session-1", "after timeout")).toBe(false);
expect(queueActiveRunMessageForTest("session-1", "after timeout")).toBe(false);
});
it("passes the selected auth profile into app-server startup", async () => {
@@ -3450,7 +3458,7 @@ describe("runCodexAppServerAttempt", () => {
params.timeoutMs = 1;
await expect(runCodexAppServerAttempt(params)).rejects.toThrow("turn/start timed out");
expect(queueAgentHarnessMessage("session-1", "after timeout")).toBe(false);
expect(queueActiveRunMessageForTest("session-1", "after timeout")).toBe(false);
});
it("keeps extended history enabled when resuming a bound Codex thread", async () => {

View File

@@ -19,6 +19,7 @@ export {
isEmbeddedPiRunStreaming as isEmbeddedAgentRunStreaming,
queueEmbeddedPiMessage,
queueEmbeddedPiMessage as queueEmbeddedAgentMessage,
queueEmbeddedPiMessageWithOutcome,
resolveActiveEmbeddedRunSessionId,
resolveActiveEmbeddedRunSessionId as resolveActiveEmbeddedAgentRunSessionId,
waitForEmbeddedPiRunEnd,

View File

@@ -8,7 +8,8 @@ import {
consumeEmbeddedRunModelSwitch,
getActiveEmbeddedRunSnapshot,
isEmbeddedPiRunHandleActive,
queueEmbeddedPiMessage,
formatEmbeddedPiQueueFailureSummary,
queueEmbeddedPiMessageWithOutcome,
requestEmbeddedRunModelSwitch,
resolveActiveEmbeddedRunHandleSessionId,
setActiveEmbeddedRun,
@@ -19,12 +20,12 @@ import {
type RunHandle = Parameters<typeof setActiveEmbeddedRun>[1];
function createRunHandle(
overrides: { isCompacting?: boolean; abort?: () => void } = {},
overrides: { isCompacting?: boolean; isStreaming?: boolean; abort?: () => void } = {},
): RunHandle {
const abort = overrides.abort ?? (() => {});
return {
queueMessage: async () => {},
isStreaming: () => true,
isStreaming: () => overrides.isStreaming ?? true,
isCompacting: () => overrides.isCompacting ?? false,
abort,
};
@@ -75,7 +76,9 @@ describe("pi-embedded runner run registry", () => {
});
expect(
queueEmbeddedPiMessage("session-steer", "continue", { steeringMode: "one-at-a-time" }),
queueEmbeddedPiMessageWithOutcome("session-steer", "continue", {
steeringMode: "one-at-a-time",
}).queued,
).toBe(true);
expect(queueMessage).toHaveBeenCalledWith("continue", { steeringMode: "one-at-a-time" });
@@ -88,11 +91,45 @@ describe("pi-embedded runner run registry", () => {
queueMessage,
});
expect(queueEmbeddedPiMessage("session-default-steer", "continue")).toBe(true);
expect(queueEmbeddedPiMessageWithOutcome("session-default-steer", "continue").queued).toBe(
true,
);
expect(queueMessage).toHaveBeenCalledWith("continue", { steeringMode: "all" });
});
it("returns a structured no-active-run queue failure", () => {
const outcome = queueEmbeddedPiMessageWithOutcome("session-missing", "continue");
expect(outcome).toEqual({
queued: false,
sessionId: "session-missing",
reason: "no_active_run",
gatewayHealth: "live",
});
expect(formatEmbeddedPiQueueFailureSummary(outcome)).toBe(
"queue_message_failed reason=no_active_run sessionId=session-missing gatewayHealth=live",
);
});
it("returns structured queue failures for inactive active-run states", () => {
setActiveEmbeddedRun("session-not-streaming", createRunHandle({ isStreaming: false }));
setActiveEmbeddedRun("session-compacting", createRunHandle({ isCompacting: true }));
expect(queueEmbeddedPiMessageWithOutcome("session-not-streaming", "continue")).toEqual({
queued: false,
sessionId: "session-not-streaming",
reason: "not_streaming",
gatewayHealth: "live",
});
expect(queueEmbeddedPiMessageWithOutcome("session-compacting", "continue")).toEqual({
queued: false,
sessionId: "session-compacting",
reason: "compacting",
gatewayHealth: "live",
});
});
it("force-clears an aborted run that does not drain", async () => {
vi.useFakeTimers();
try {

View File

@@ -40,6 +40,43 @@ export {
type EmbeddedRunModelSwitchRequest,
} from "./run-state.js";
export type EmbeddedPiQueueFailureReason = "no_active_run" | "not_streaming" | "compacting";
export type EmbeddedPiQueueMessageOutcome =
| {
queued: true;
sessionId: string;
target: "embedded_run" | "reply_run";
gatewayHealth: "live";
}
| {
queued: false;
sessionId: string;
reason: EmbeddedPiQueueFailureReason;
gatewayHealth: "live";
};
function createQueueFailureOutcome(
sessionId: string,
reason: EmbeddedPiQueueFailureReason,
): EmbeddedPiQueueMessageOutcome {
return {
queued: false,
sessionId,
reason,
gatewayHealth: "live",
};
}
export function formatEmbeddedPiQueueFailureSummary(
outcome: EmbeddedPiQueueMessageOutcome,
): string | undefined {
if (outcome.queued) {
return undefined;
}
return `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=${outcome.gatewayHealth}`;
}
function setActiveRunSessionKey(sessionKey: string | undefined, sessionId: string): void {
const normalizedSessionKey = sessionKey?.trim();
if (!normalizedSessionKey) {
@@ -63,32 +100,53 @@ function clearActiveRunSessionKeys(sessionId: string, sessionKey?: string): void
}
}
/**
* @deprecated Use queueEmbeddedPiMessageWithOutcome so callers preserve failure reasons.
*/
export function queueEmbeddedPiMessage(
sessionId: string,
text: string,
options?: EmbeddedPiQueueMessageOptions,
): boolean {
return queueEmbeddedPiMessageWithOutcome(sessionId, text, options).queued;
}
export function queueEmbeddedPiMessageWithOutcome(
sessionId: string,
text: string,
options?: EmbeddedPiQueueMessageOptions,
): EmbeddedPiQueueMessageOutcome {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) {
const queuedReplyRunMessage = queueReplyRunMessage(sessionId, text);
if (queuedReplyRunMessage) {
logMessageQueued({ sessionId, source: "pi-embedded-runner" });
return true;
return {
queued: true,
sessionId,
target: "reply_run",
gatewayHealth: "live",
};
}
diag.debug(`queue message failed: sessionId=${sessionId} reason=no_active_run`);
return false;
return createQueueFailureOutcome(sessionId, "no_active_run");
}
if (!handle.isStreaming()) {
diag.debug(`queue message failed: sessionId=${sessionId} reason=not_streaming`);
return false;
return createQueueFailureOutcome(sessionId, "not_streaming");
}
if (handle.isCompacting()) {
diag.debug(`queue message failed: sessionId=${sessionId} reason=compacting`);
return false;
return createQueueFailureOutcome(sessionId, "compacting");
}
logMessageQueued({ sessionId, source: "pi-embedded-runner" });
void handle.queueMessage(text, options ?? { steeringMode: "all" });
return true;
return {
queued: true,
sessionId,
target: "embedded_run",
gatewayHealth: "live",
};
}
/**

View File

@@ -11,7 +11,7 @@ import {
mockedRunEmbeddedAttempt,
overflowBaseRunParams,
} from "./run.overflow-compaction.harness.js";
import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./runs.js";
import { isEmbeddedPiRunActive, queueEmbeddedPiMessageWithOutcome } from "./runs.js";
let runEmbeddedPiAgent: typeof import("./run.js").runEmbeddedPiAgent;
@@ -53,7 +53,10 @@ describe("sessions_yield orchestration", () => {
expect(isEmbeddedPiRunActive(sessionId)).toBe(false);
// 4. Steer would fail (message delivery must take direct path, not steer)
expect(queueEmbeddedPiMessage(sessionId, "subagent result")).toBe(false);
expect(queueEmbeddedPiMessageWithOutcome(sessionId, "subagent result")).toMatchObject({
queued: false,
reason: "no_active_run",
});
});
it("clientToolCalls takes precedence over yieldDetected", async () => {

View File

@@ -20,6 +20,7 @@ export {
isEmbeddedPiRunStreaming,
queueEmbeddedAgentMessage,
queueEmbeddedPiMessage,
queueEmbeddedPiMessageWithOutcome,
resolveActiveEmbeddedAgentRunSessionId,
resolveActiveEmbeddedRunSessionId,
resolveEmbeddedSessionLane,

View File

@@ -16,7 +16,8 @@ export { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-rout
export { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
export { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
export {
formatEmbeddedPiQueueFailureSummary,
isEmbeddedPiRunActive,
queueEmbeddedPiMessage,
queueEmbeddedPiMessageWithOutcome,
resolveActiveEmbeddedRunSessionId,
} from "./pi-embedded-runner/runs.js";

View File

@@ -4,6 +4,10 @@ import {
registerSessionBindingAdapter,
} from "../infra/outbound/session-binding-service.js";
import type { AgentInternalEvent } from "./internal-events.js";
import type {
EmbeddedPiQueueMessageOptions,
EmbeddedPiQueueMessageOutcome,
} from "./pi-embedded-runner/runs.js";
import {
__testing,
deliverSubagentAnnouncement,
@@ -43,6 +47,32 @@ function createSendMessageMock() {
})) as unknown as typeof runtimeSendMessage;
}
type QueueEmbeddedPiMessageWithOutcome = (
sessionId: string,
message: string,
options?: EmbeddedPiQueueMessageOptions,
) => EmbeddedPiQueueMessageOutcome;
function createQueueOutcomeMock(
queued: boolean,
): ReturnType<typeof vi.fn<QueueEmbeddedPiMessageWithOutcome>> {
return vi.fn((sessionId: string) =>
queued
? {
queued: true,
sessionId,
target: "embedded_run",
gatewayHealth: "live",
}
: {
queued: false,
sessionId,
reason: "not_streaming",
gatewayHealth: "live",
},
);
}
const longChildCompletionOutput = [
"34/34 tests pass, clean build. Now docker repro:",
"Root cause: the requester's announce delivery accepted a prefix-only assistant payload as delivered.",
@@ -85,7 +115,7 @@ async function deliverSlackThreadAnnouncement(params: {
sessionId: string;
expectsCompletionMessage: boolean;
directIdempotencyKey: string;
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
queueEmbeddedPiMessageWithOutcome?: QueueEmbeddedPiMessageWithOutcome;
sendMessage?: typeof runtimeSendMessage;
internalEvents?: AgentInternalEvent[];
sourceTool?: string;
@@ -97,8 +127,8 @@ async function deliverSlackThreadAnnouncement(params: {
isActive: params.isActive,
}),
getRuntimeConfig: () => ({}) as never,
...(params.queueEmbeddedPiMessage
? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage }
...(params.queueEmbeddedPiMessageWithOutcome
? { queueEmbeddedPiMessageWithOutcome: params.queueEmbeddedPiMessageWithOutcome }
: {}),
});
@@ -163,7 +193,7 @@ async function deliverTelegramDirectMessageCompletion(params: {
sendMessage?: typeof runtimeSendMessage;
internalEvents?: AgentInternalEvent[];
isActive?: boolean;
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
queueEmbeddedPiMessageWithOutcome?: QueueEmbeddedPiMessageWithOutcome;
}) {
const origin = {
channel: "telegram",
@@ -177,8 +207,8 @@ async function deliverTelegramDirectMessageCompletion(params: {
isActive: params.isActive === true,
}),
getRuntimeConfig: () => ({}) as never,
...(params.queueEmbeddedPiMessage
? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage }
...(params.queueEmbeddedPiMessageWithOutcome
? { queueEmbeddedPiMessageWithOutcome: params.queueEmbeddedPiMessageWithOutcome }
: {}),
});
@@ -211,7 +241,7 @@ async function deliverSlackChannelAnnouncement(params: {
accountId?: string;
threadId?: string | number;
};
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
queueEmbeddedPiMessageWithOutcome?: QueueEmbeddedPiMessageWithOutcome;
sendMessage?: typeof runtimeSendMessage;
internalEvents?: AgentInternalEvent[];
sourceTool?: string;
@@ -229,8 +259,8 @@ async function deliverSlackChannelAnnouncement(params: {
isActive: params.isActive,
}),
getRuntimeConfig: () => ({}) as never,
...(params.queueEmbeddedPiMessage
? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage }
...(params.queueEmbeddedPiMessageWithOutcome
? { queueEmbeddedPiMessageWithOutcome: params.queueEmbeddedPiMessageWithOutcome }
: {}),
});
@@ -500,24 +530,28 @@ describe("deliverSubagentAnnouncement queued delivery", () => {
describe("deliverSubagentAnnouncement completion delivery", () => {
it("keeps completion announces session-internal while preserving route context for active requesters", async () => {
const callGateway = createGatewayMock();
const queueEmbeddedPiMessage = vi.fn(() => true);
const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeMock(true);
const result = await deliverSlackThreadAnnouncement({
callGateway,
sessionId: "requester-session-1",
isActive: true,
expectsCompletionMessage: true,
directIdempotencyKey: "announce-1",
queueEmbeddedPiMessage,
queueEmbeddedPiMessageWithOutcome,
});
expectRecordFields(result, {
delivered: true,
path: "steered",
});
expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-1", "child done", {
steeringMode: "all",
debounceMs: 500,
});
expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith(
"requester-session-1",
"child done",
{
steeringMode: "all",
debounceMs: 500,
},
);
expect(callGateway).not.toHaveBeenCalled();
});
@@ -894,12 +928,12 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
it("queues when an active Telegram requester cannot be woken directly", async () => {
const callGateway = createGatewayMock();
const sendMessage = createSendMessageMock();
const queueEmbeddedPiMessage = vi.fn(() => false);
const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeMock(false);
const result = await deliverTelegramDirectMessageCompletion({
callGateway,
sendMessage,
isActive: true,
queueEmbeddedPiMessage,
queueEmbeddedPiMessageWithOutcome,
internalEvents: [
{
type: "task_completion",
@@ -924,7 +958,8 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
phase: "direct-primary",
delivered: false,
path: "direct",
error: "active requester session could not be woken",
error:
"active requester session could not be woken: queue_message_failed reason=not_streaming sessionId=requester-session-telegram gatewayHealth=live",
},
{
phase: "queue-fallback",
@@ -934,7 +969,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
},
],
});
expect(queueEmbeddedPiMessage).toHaveBeenCalledWith(
expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith(
"requester-session-telegram",
"child done",
{

View File

@@ -27,15 +27,17 @@ import {
hasMessagingToolDeliveryEvidence,
hasVisibleAgentPayload,
} from "./pi-embedded-runner/delivery-evidence.js";
import type { EmbeddedPiQueueMessageOptions } from "./pi-embedded-runner/run-state.js";
import {
callGateway,
createBoundDeliveryRouter,
getGlobalHookRunner,
isEmbeddedPiRunActive,
getRuntimeConfig,
formatEmbeddedPiQueueFailureSummary,
isSteeringQueueMode,
loadSessionStore,
queueEmbeddedPiMessage,
queueEmbeddedPiMessageWithOutcome,
resolvePiSteeringModeForQueueMode,
resolveActiveEmbeddedRunSessionId,
resolveAgentIdFromSessionKey,
@@ -65,7 +67,7 @@ type SubagentAnnounceDeliveryDeps = {
sessionId?: string;
isActive: boolean;
};
queueEmbeddedPiMessage: typeof queueEmbeddedPiMessage;
queueEmbeddedPiMessageWithOutcome: typeof queueEmbeddedPiMessageWithOutcome;
};
const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
@@ -80,12 +82,28 @@ const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
isActive: Boolean(sessionId && isEmbeddedPiRunActive(sessionId)),
};
},
queueEmbeddedPiMessage,
queueEmbeddedPiMessageWithOutcome,
};
let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps =
defaultSubagentAnnounceDeliveryDeps;
function resolveQueueEmbeddedPiMessageOutcome(
sessionId: string,
text: string,
options?: EmbeddedPiQueueMessageOptions,
): ReturnType<typeof queueEmbeddedPiMessageWithOutcome> {
return subagentAnnounceDeliveryDeps.queueEmbeddedPiMessageWithOutcome(sessionId, text, options);
}
function formatQueueWakeFailureError(
fallback: string,
outcome: ReturnType<typeof queueEmbeddedPiMessageWithOutcome>,
): string {
const summary = formatEmbeddedPiQueueFailureSummary(outcome);
return summary ? `${fallback}: ${summary}` : fallback;
}
function resolveBoundConversationOrigin(params: {
bindingConversation: ConversationRef & { parentConversationId?: string };
requesterConversation?: ConversationRef;
@@ -488,15 +506,11 @@ async function maybeQueueSubagentAnnounce(params: {
const shouldSteer = isSteeringQueueMode(queueSettings.mode);
if (shouldSteer) {
const steered = subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage(
sessionId,
params.steerMessage,
{
steeringMode: resolvePiSteeringModeForQueueMode(queueSettings.mode),
...(queueSettings.debounceMs !== undefined ? { debounceMs: queueSettings.debounceMs } : {}),
},
);
if (steered) {
const queueOutcome = resolveQueueEmbeddedPiMessageOutcome(sessionId, params.steerMessage, {
steeringMode: resolvePiSteeringModeForQueueMode(queueSettings.mode),
...(queueSettings.debounceMs !== undefined ? { debounceMs: queueSettings.debounceMs } : {}),
});
if (queueOutcome.queued) {
return "steered";
}
}
@@ -728,19 +742,17 @@ async function sendSubagentAnnounceDirectly(params: {
sessionEntry: requesterEntry,
});
if (params.expectsCompletionMessage && requesterActivity.sessionId) {
const woke = requesterActivity.sessionId
? subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage(
requesterActivity.sessionId,
params.triggerMessage,
{
steeringMode: "all",
...(requesterQueueSettings.debounceMs !== undefined
? { debounceMs: requesterQueueSettings.debounceMs }
: {}),
},
)
: false;
if (woke) {
const wakeOutcome = resolveQueueEmbeddedPiMessageOutcome(
requesterActivity.sessionId,
params.triggerMessage,
{
steeringMode: "all",
...(requesterQueueSettings.debounceMs !== undefined
? { debounceMs: requesterQueueSettings.debounceMs }
: {}),
},
);
if (wakeOutcome.queued) {
return {
delivered: true,
path: "steered",
@@ -753,7 +765,10 @@ async function sendSubagentAnnounceDirectly(params: {
return {
delivered: false,
path: "direct",
error: "active requester session could not be woken",
error: formatQueueWakeFailureError(
"active requester session could not be woken",
wakeOutcome,
),
};
}
}

View File

@@ -125,7 +125,10 @@ const getGlobalHookRunnerSpy = vi.spyOn(hookRunnerGlobal, "getGlobalHookRunner")
const readLatestAssistantReplySpy = vi.spyOn(agentStep, "readLatestAssistantReply");
const isEmbeddedPiRunActiveSpy = vi.spyOn(piEmbedded, "isEmbeddedPiRunActive");
const isEmbeddedPiRunStreamingSpy = vi.spyOn(piEmbedded, "isEmbeddedPiRunStreaming");
const queueEmbeddedPiMessageSpy = vi.spyOn(piEmbedded, "queueEmbeddedPiMessage");
const queueEmbeddedPiMessageWithOutcomeSpy = vi.spyOn(
piEmbedded,
"queueEmbeddedPiMessageWithOutcome",
);
const waitForEmbeddedPiRunEndSpy = vi.spyOn(piEmbedded, "waitForEmbeddedPiRunEnd");
const readLatestAssistantReplyMock = vi.fn(
async (_sessionKey?: string): Promise<string | undefined> => "raw subagent reply",
@@ -136,20 +139,21 @@ const embeddedPiRunActiveMock = vi.fn<typeof piEmbedded.isEmbeddedPiRunActive>(
const embeddedPiRunStreamingMock = vi.fn<typeof piEmbedded.isEmbeddedPiRunStreaming>(
(_sessionId: string) => false,
);
const queueEmbeddedPiMessageMock = vi.fn<typeof piEmbedded.queueEmbeddedPiMessage>(
(
_sessionId: string,
_text: string,
_options?: Parameters<typeof piEmbedded.queueEmbeddedPiMessage>[2],
) => false,
);
const queueEmbeddedPiMessageWithOutcomeMock = vi.fn<
typeof piEmbedded.queueEmbeddedPiMessageWithOutcome
>((sessionId: string) => ({
queued: false,
sessionId,
reason: "not_streaming",
gatewayHealth: "live",
}));
const waitForEmbeddedPiRunEndMock = vi.fn<typeof piEmbedded.waitForEmbeddedPiRunEnd>(
async (_sessionId: string, _timeoutMs?: number) => true,
);
const embeddedRunMock = {
isEmbeddedPiRunActive: embeddedPiRunActiveMock,
isEmbeddedPiRunStreaming: embeddedPiRunStreamingMock,
queueEmbeddedPiMessage: queueEmbeddedPiMessageMock,
queueEmbeddedPiMessageWithOutcome: queueEmbeddedPiMessageWithOutcomeMock,
waitForEmbeddedPiRunEnd: waitForEmbeddedPiRunEndMock,
};
const { subagentRegistryMock } = vi.hoisted(() => ({
@@ -375,11 +379,8 @@ describe("subagent announce formatting", () => {
isActive: Boolean(sessionId && embeddedRunMock.isEmbeddedPiRunActive(sessionId)),
};
},
queueEmbeddedPiMessage: (
sessionId: string,
text: string,
options?: Parameters<typeof piEmbedded.queueEmbeddedPiMessage>[2],
) => embeddedRunMock.queueEmbeddedPiMessage(sessionId, text, options),
queueEmbeddedPiMessageWithOutcome: (sessionId, text, options) =>
embeddedRunMock.queueEmbeddedPiMessageWithOutcome(sessionId, text, options),
});
subagentAnnounceTesting.setDepsForTest({
callGateway: async <T = Record<string, unknown>>(
@@ -405,10 +406,10 @@ describe("subagent announce formatting", () => {
isEmbeddedPiRunStreamingSpy
.mockReset()
.mockImplementation((sessionId) => embeddedRunMock.isEmbeddedPiRunStreaming(sessionId));
queueEmbeddedPiMessageSpy
queueEmbeddedPiMessageWithOutcomeSpy
.mockReset()
.mockImplementation((sessionId, text, options) =>
embeddedRunMock.queueEmbeddedPiMessage(sessionId, text, options),
embeddedRunMock.queueEmbeddedPiMessageWithOutcome(sessionId, text, options),
);
waitForEmbeddedPiRunEndSpy
.mockReset()
@@ -418,7 +419,14 @@ describe("subagent announce formatting", () => {
);
embeddedRunMock.isEmbeddedPiRunActive.mockClear().mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockClear().mockReturnValue(false);
embeddedRunMock.queueEmbeddedPiMessage.mockClear().mockReturnValue(false);
embeddedRunMock.queueEmbeddedPiMessageWithOutcome
.mockClear()
.mockImplementation((sessionId) => ({
queued: false,
sessionId,
reason: "not_streaming",
gatewayHealth: "live",
}));
embeddedRunMock.waitForEmbeddedPiRunEnd.mockClear().mockResolvedValue(true);
subagentRegistryMock.isSubagentSessionRunActive.mockClear().mockReturnValue(true);
subagentRegistryMock.shouldIgnorePostCompletionAnnounceForSession

View File

@@ -1,6 +1,7 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { callGateway } from "../gateway/call.js";
import type { EmbeddedPiQueueMessageOptions } from "./pi-embedded-runner/run-state.js";
import type { EmbeddedPiQueueMessageOutcome } from "./pi-embedded-runner/runs.js";
type DeliveryRuntimeMockOptions = {
callGateway: (request: unknown) => Promise<unknown>;
@@ -10,11 +11,11 @@ type DeliveryRuntimeMockOptions = {
resolveMainSessionKey: (cfg: unknown) => string;
resolveStorePath: (store: unknown, options: unknown) => string;
isEmbeddedPiRunActive: (sessionId: string) => boolean;
queueEmbeddedPiMessage: (
queueEmbeddedPiMessageWithOutcome: (
sessionId: string,
text: string,
options?: EmbeddedPiQueueMessageOptions,
) => boolean;
) => EmbeddedPiQueueMessageOutcome;
hasHooks?: () => boolean;
};
@@ -58,7 +59,11 @@ export function createSubagentAnnounceDeliveryRuntimeMock(options: DeliveryRunti
resolveMainSessionKey: options.resolveMainSessionKey,
resolveStorePath: options.resolveStorePath,
isEmbeddedPiRunActive: options.isEmbeddedPiRunActive,
queueEmbeddedPiMessage: options.queueEmbeddedPiMessage,
queueEmbeddedPiMessageWithOutcome: options.queueEmbeddedPiMessageWithOutcome,
formatEmbeddedPiQueueFailureSummary: (outcome: { reason?: string; sessionId?: string }) =>
outcome.reason && outcome.sessionId
? `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=live`
: undefined,
isSteeringQueueMode: (mode: string) =>
mode === "steer" || mode === "queue" || mode === "steer-backlog",
resolvePiSteeringModeForQueueMode: (mode: string) =>

View File

@@ -1,4 +1,5 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { EmbeddedPiQueueMessageOutcome } from "./pi-embedded-runner/runs.js";
import { createSubagentAnnounceDeliveryRuntimeMock } from "./subagent-announce.test-support.js";
type AgentCallRequest = { method?: string; params?: Record<string, unknown> };
@@ -14,8 +15,13 @@ const resolveStorePathMock = vi.fn((_store: unknown, _options: unknown) => "/tmp
const resolveMainSessionKeyMock = vi.fn((_cfg: unknown) => "agent:main:main");
const readLatestAssistantReplyMock = vi.fn(async (_params?: unknown) => "raw subagent reply");
const isEmbeddedPiRunActiveMock = vi.fn((_sessionId: string) => false);
const queueEmbeddedPiMessageMock = vi.fn(
(_sessionId: string, _text: string, _options?: unknown) => false,
const queueEmbeddedPiMessageWithOutcomeMock = vi.fn(
(sessionId: string, _text: string, _options?: unknown): EmbeddedPiQueueMessageOutcome => ({
queued: false,
sessionId,
reason: "not_streaming" as const,
gatewayHealth: "live" as const,
}),
);
const waitForEmbeddedPiRunEndMock = vi.fn(async (_sessionId: string, _timeoutMs?: number) => true);
let mockConfig: ReturnType<(typeof import("../config/config.js"))["getRuntimeConfig"]> = {
@@ -43,8 +49,6 @@ vi.mock("./subagent-announce.runtime.js", () => ({
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
getRuntimeConfig: () => mockConfig,
loadSessionStore: (storePath: string) => loadSessionStoreMock(storePath),
queueEmbeddedPiMessage: (sessionId: string, text: string, options?: unknown) =>
queueEmbeddedPiMessageMock(sessionId, text, options),
resolveAgentIdFromSessionKey: (sessionKey: string) =>
resolveAgentIdFromSessionKeyMock(sessionKey),
resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg),
@@ -67,8 +71,8 @@ vi.mock("./subagent-announce-delivery.runtime.js", () =>
resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg),
resolveStorePath: (store: unknown, options: unknown) => resolveStorePathMock(store, options),
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
queueEmbeddedPiMessage: (sessionId: string, text: string, options?: unknown) =>
queueEmbeddedPiMessageMock(sessionId, text, options),
queueEmbeddedPiMessageWithOutcome: (sessionId: string, text: string, options?: unknown) =>
queueEmbeddedPiMessageWithOutcomeMock(sessionId, text, options),
}),
);
@@ -100,7 +104,7 @@ vi.mock("./subagent-announce-delivery.js", () => ({
params.requesterSessionOrigin?.channel;
if (sessionId && queueChannel === "discord" && isEmbeddedPiRunActiveMock(sessionId)) {
queueEmbeddedPiMessageMock(
queueEmbeddedPiMessageWithOutcomeMock(
sessionId,
`[Internal task completion event]\n${params.triggerMessage}`,
{ steeringMode: "all" },
@@ -229,7 +233,12 @@ describe("subagent announce seam flow", () => {
resolveMainSessionKeyMock.mockReset().mockImplementation(() => "agent:main:main");
readLatestAssistantReplyMock.mockReset().mockResolvedValue("raw subagent reply");
isEmbeddedPiRunActiveMock.mockReset().mockReturnValue(false);
queueEmbeddedPiMessageMock.mockReset().mockReturnValue(false);
queueEmbeddedPiMessageWithOutcomeMock.mockReset().mockImplementation((sessionId: string) => ({
queued: false,
sessionId,
reason: "not_streaming",
gatewayHealth: "live",
}));
waitForEmbeddedPiRunEndMock.mockReset().mockResolvedValue(true);
mockConfig = {
session: {
@@ -338,7 +347,12 @@ describe("subagent announce seam flow", () => {
},
}));
isEmbeddedPiRunActiveMock.mockReturnValue(true);
queueEmbeddedPiMessageMock.mockReturnValue(true);
queueEmbeddedPiMessageWithOutcomeMock.mockImplementation((sessionId: string) => ({
queued: true,
sessionId,
target: "embedded_run",
gatewayHealth: "live",
}));
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
@@ -355,7 +369,7 @@ describe("subagent announce seam flow", () => {
});
expect(didAnnounce).toBe(true);
expect(queueEmbeddedPiMessageMock).toHaveBeenCalledWith(
expect(queueEmbeddedPiMessageWithOutcomeMock).toHaveBeenCalledWith(
"session-origin-provider-steer",
expect.stringContaining("[Internal task completion event]"),
{ steeringMode: "all" },

View File

@@ -88,7 +88,12 @@ vi.mock("./subagent-announce-delivery.runtime.js", () =>
resolveMainSessionKey: () => "agent:main:main",
resolveStorePath: () => "/tmp/sessions-main.json",
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
queueEmbeddedPiMessage: () => false,
queueEmbeddedPiMessageWithOutcome: (sessionId: string) => ({
queued: false,
sessionId,
reason: "not_streaming",
gatewayHealth: "live",
}),
}),
);
vi.mock("./subagent-announce-delivery.js", () => ({
@@ -176,7 +181,6 @@ vi.mock("./subagent-announce.runtime.js", () => ({
resolveStorePath: () => "/tmp/sessions-main.json",
resolveMainSessionKey: () => "agent:main:main",
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
queueEmbeddedPiMessage: (_sessionId: string, _text: string) => false,
waitForEmbeddedPiRunEnd: (sessionId: string, timeoutMs?: number) =>
waitForEmbeddedPiRunEndMock(sessionId, timeoutMs),
}));

View File

@@ -1,5 +1,6 @@
import path from "node:path";
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import type { EmbeddedPiQueueMessageOutcome } from "../../agents/pi-embedded-runner/runs.js";
import type { TemplateContext } from "../templating.js";
import type { FollowupRun, QueueSettings } from "./queue.js";
import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js";
@@ -10,7 +11,14 @@ const abortEmbeddedPiRunMock = vi.fn();
const compactEmbeddedPiSessionMock = vi.fn();
const isEmbeddedPiRunActiveMock = vi.fn(() => false);
const isEmbeddedPiRunStreamingMock = vi.fn(() => false);
const queueEmbeddedPiMessageMock = vi.fn(() => false);
const queueEmbeddedPiMessageWithOutcomeMock = vi.fn(
(sessionId: string, _text: string, _options?: unknown): EmbeddedPiQueueMessageOutcome => ({
queued: false,
sessionId,
reason: "not_streaming",
gatewayHealth: "live",
}),
);
const resolveEmbeddedSessionLaneMock = vi.fn();
const waitForEmbeddedPiRunEndMock = vi.fn();
const enqueueFollowupRunMock = vi.fn();
@@ -36,14 +44,18 @@ vi.mock("../../agents/pi-embedded.js", () => ({
compactEmbeddedPiSession: compactEmbeddedPiSessionMock,
isEmbeddedPiRunActive: isEmbeddedPiRunActiveMock,
isEmbeddedPiRunStreaming: isEmbeddedPiRunStreamingMock,
queueEmbeddedPiMessage: queueEmbeddedPiMessageMock,
queueEmbeddedPiMessageWithOutcome: queueEmbeddedPiMessageWithOutcomeMock,
resolveEmbeddedSessionLane: resolveEmbeddedSessionLaneMock,
runEmbeddedPiAgent: runEmbeddedPiAgentMock,
waitForEmbeddedPiRunEnd: waitForEmbeddedPiRunEndMock,
}));
vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({
queueEmbeddedPiMessage: queueEmbeddedPiMessageMock,
formatEmbeddedPiQueueFailureSummary: (outcome: { reason?: string; sessionId?: string }) =>
outcome.reason && outcome.sessionId
? `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=live`
: undefined,
queueEmbeddedPiMessageWithOutcome: queueEmbeddedPiMessageWithOutcomeMock,
}));
vi.mock("./queue.js", () => ({
@@ -135,8 +147,13 @@ describe("runReplyAgent media path normalization", () => {
isEmbeddedPiRunActiveMock.mockReturnValue(false);
isEmbeddedPiRunStreamingMock.mockReset();
isEmbeddedPiRunStreamingMock.mockReturnValue(false);
queueEmbeddedPiMessageMock.mockReset();
queueEmbeddedPiMessageMock.mockReturnValue(false);
queueEmbeddedPiMessageWithOutcomeMock.mockReset();
queueEmbeddedPiMessageWithOutcomeMock.mockImplementation((sessionId: string) => ({
queued: false,
sessionId,
reason: "not_streaming",
gatewayHealth: "live",
}));
resolveEmbeddedSessionLaneMock.mockReset();
waitForEmbeddedPiRunEndMock.mockReset();
enqueueFollowupRunMock.mockReset();
@@ -204,7 +221,12 @@ describe("runReplyAgent media path normalization", () => {
});
it("maps steer queue modes to Pi steering drain modes", async () => {
queueEmbeddedPiMessageMock.mockReturnValue(true);
queueEmbeddedPiMessageWithOutcomeMock.mockImplementation((sessionId: string) => ({
queued: true,
sessionId,
target: "embedded_run",
gatewayHealth: "live",
}));
await runReplyAgent(
makeRunReplyAgentParams({
@@ -214,9 +236,13 @@ describe("runReplyAgent media path normalization", () => {
}),
);
expect(queueEmbeddedPiMessageMock).toHaveBeenLastCalledWith("session", "generate chart", {
steeringMode: "all",
});
expect(queueEmbeddedPiMessageWithOutcomeMock).toHaveBeenLastCalledWith(
"session",
"generate chart",
{
steeringMode: "all",
},
);
await runReplyAgent(
makeRunReplyAgentParams({
@@ -226,9 +252,13 @@ describe("runReplyAgent media path normalization", () => {
}),
);
expect(queueEmbeddedPiMessageMock).toHaveBeenLastCalledWith("session", "generate chart", {
steeringMode: "one-at-a-time",
});
expect(queueEmbeddedPiMessageWithOutcomeMock).toHaveBeenLastCalledWith(
"session",
"generate chart",
{
steeringMode: "one-at-a-time",
},
);
});
it("shares one media cache between block accumulation and final payload delivery", async () => {

View File

@@ -4,7 +4,10 @@ import { resolveContextTokensForModel } from "../../agents/context.js";
import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js";
import { resolveModelAuthMode } from "../../agents/model-auth.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { queueEmbeddedPiMessage } from "../../agents/pi-embedded-runner/runs.js";
import {
formatEmbeddedPiQueueFailureSummary,
queueEmbeddedPiMessageWithOutcome,
} from "../../agents/pi-embedded-runner/runs.js";
import { deriveContextPromptTokens, hasNonzeroUsage, normalizeUsage } from "../../agents/usage.js";
import { enqueueCommitmentExtraction } from "../../commitments/runtime.js";
import type { OpenClawConfig } from "../../config/config.js";
@@ -1044,15 +1047,21 @@ export async function runReplyAgent(params: {
const steerSessionId =
(sessionKey ? replyRunRegistry.resolveSessionId(sessionKey) : undefined) ??
followupRun.run.sessionId;
const steered = queueEmbeddedPiMessage(steerSessionId, followupRun.prompt, {
const steerOutcome = queueEmbeddedPiMessageWithOutcome(steerSessionId, followupRun.prompt, {
steeringMode: resolvePiSteeringModeForQueueMode(resolvedQueue.mode),
...(resolvedQueue.debounceMs !== undefined ? { debounceMs: resolvedQueue.debounceMs } : {}),
});
if (steered && !effectiveShouldFollowup) {
if (steerOutcome.queued && !effectiveShouldFollowup) {
await touchActiveSessionEntry();
typing.cleanup();
return undefined;
}
if (!steerOutcome.queued) {
const summary = formatEmbeddedPiQueueFailureSummary(steerOutcome);
if (summary) {
logVerbose(`reply queue steering failed: ${summary}`);
}
}
}
const activeRunQueueAction = resolveActiveRunQueueAction({

View File

@@ -1,5 +1,6 @@
export {
formatEmbeddedPiQueueFailureSummary,
isEmbeddedPiRunActive,
queueEmbeddedPiMessage,
queueEmbeddedPiMessageWithOutcome,
resolveActiveEmbeddedRunSessionId,
} from "../../agents/pi-embedded-runner/runs.js";

View File

@@ -3,8 +3,9 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { buildCommandTestParams } from "./commands.test-harness.js";
const steerRuntimeMocks = vi.hoisted(() => ({
formatEmbeddedPiQueueFailureSummary: vi.fn(),
isEmbeddedPiRunActive: vi.fn(),
queueEmbeddedPiMessage: vi.fn(),
queueEmbeddedPiMessageWithOutcome: vi.fn(),
resolveActiveEmbeddedRunSessionId: vi.fn(),
}));
@@ -23,8 +24,18 @@ function buildParams(commandBody: string) {
describe("handleSteerCommand", () => {
beforeEach(() => {
steerRuntimeMocks.formatEmbeddedPiQueueFailureSummary
.mockReset()
.mockReturnValue(
"queue_message_failed reason=not_streaming sessionId=session-active gatewayHealth=live",
);
steerRuntimeMocks.isEmbeddedPiRunActive.mockReset().mockReturnValue(false);
steerRuntimeMocks.queueEmbeddedPiMessage.mockReset().mockReturnValue(true);
steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome.mockReset().mockReturnValue({
queued: true,
sessionId: "session-active",
target: "embedded_run",
gatewayHealth: "live",
});
steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockReset().mockReturnValue(undefined);
});
@@ -40,7 +51,7 @@ describe("handleSteerCommand", () => {
expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenCalledWith(
"agent:main:main",
);
expect(steerRuntimeMocks.queueEmbeddedPiMessage).toHaveBeenCalledWith(
expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith(
"session-active",
"keep going",
{
@@ -63,7 +74,7 @@ describe("handleSteerCommand", () => {
expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenCalledWith(
"agent:main:discord:direct:target",
);
expect(steerRuntimeMocks.queueEmbeddedPiMessage).toHaveBeenCalledWith(
expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith(
"session-target",
"check the target",
{
@@ -85,7 +96,7 @@ describe("handleSteerCommand", () => {
"agent:main:main",
);
expect(steerRuntimeMocks.isEmbeddedPiRunActive).toHaveBeenCalledWith("stored-session-id");
expect(steerRuntimeMocks.queueEmbeddedPiMessage).toHaveBeenCalledWith(
expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith(
"stored-session-id",
"continue from state",
{
@@ -102,7 +113,7 @@ describe("handleSteerCommand", () => {
shouldContinue: false,
reply: { text: "Usage: /steer <message>" },
});
expect(steerRuntimeMocks.queueEmbeddedPiMessage).not.toHaveBeenCalled();
expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).not.toHaveBeenCalled();
});
it("does not start a new run when no current session run is active", async () => {
@@ -112,12 +123,17 @@ describe("handleSteerCommand", () => {
shouldContinue: false,
reply: { text: "⚠️ No active run to steer in this session." },
});
expect(steerRuntimeMocks.queueEmbeddedPiMessage).not.toHaveBeenCalled();
expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).not.toHaveBeenCalled();
});
it("reports when the active run rejects steering injection", async () => {
steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("session-active");
steerRuntimeMocks.queueEmbeddedPiMessage.mockReturnValue(false);
steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome.mockReturnValue({
queued: false,
sessionId: "session-active",
reason: "not_streaming",
gatewayHealth: "live",
});
const result = await handleSteerCommand(buildParams("/steer keep going"), true);
@@ -125,5 +141,28 @@ describe("handleSteerCommand", () => {
shouldContinue: false,
reply: { text: "⚠️ Current run is active but not accepting steering right now." },
});
expect(steerRuntimeMocks.formatEmbeddedPiQueueFailureSummary).toHaveBeenCalledWith({
queued: false,
sessionId: "session-active",
reason: "not_streaming",
gatewayHealth: "live",
});
});
it("reports compacting runs distinctly", async () => {
steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("session-active");
steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome.mockReturnValue({
queued: false,
sessionId: "session-active",
reason: "compacting",
gatewayHealth: "live",
});
const result = await handleSteerCommand(buildParams("/steer keep going"), true);
expect(result).toEqual({
shouldContinue: false,
reply: { text: "⚠️ Current run is compacting; retry after compaction finishes." },
});
});
});

View File

@@ -7,14 +7,28 @@ import { logVerbose } from "../../globals.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { rejectUnauthorizedCommand } from "./command-gates.js";
import {
formatEmbeddedPiQueueFailureSummary,
isEmbeddedPiRunActive,
queueEmbeddedPiMessage,
queueEmbeddedPiMessageWithOutcome,
resolveActiveEmbeddedRunSessionId,
} from "./commands-steer.runtime.js";
import type { CommandHandler, HandleCommandsParams } from "./commands-types.js";
const STEER_USAGE = "Usage: /steer <message>";
function formatSteerQueueFailureReply(reason: string): string {
if (reason === "no_active_run") {
return "⚠️ This session no longer has an active run to steer.";
}
if (reason === "not_streaming") {
return "⚠️ Current run is active but not accepting steering right now.";
}
if (reason === "compacting") {
return "⚠️ Current run is compacting; retry after compaction finishes.";
}
return "⚠️ Current run is active but not accepting steering right now.";
}
function parseSteerMessage(raw: string): string | null {
const match = raw.trim().match(/^\/(?:steer|tell)(?:\s+([\s\S]*))?$/i);
if (!match) {
@@ -97,15 +111,16 @@ export const handleSteerCommand: CommandHandler = async (params, allowTextComman
return { shouldContinue: false, reply: { text: "⚠️ No active run to steer in this session." } };
}
const steered = queueEmbeddedPiMessage(sessionId, message, {
const queueOutcome = queueEmbeddedPiMessageWithOutcome(sessionId, message, {
steeringMode: "all",
debounceMs: 0,
});
if (!steered) {
logVerbose(`steer: active session ${sessionId} rejected steering injection`);
if (!queueOutcome.queued) {
const summary = formatEmbeddedPiQueueFailureSummary(queueOutcome);
logVerbose(`steer: active session ${sessionId} rejected steering injection: ${summary}`);
return {
shouldContinue: false,
reply: { text: "⚠️ Current run is active but not accepting steering right now." },
reply: { text: formatSteerQueueFailureReply(queueOutcome.reason) },
};
}

View File

@@ -3,6 +3,13 @@
// register quickly inside gateway startup and Docker e2e runs.
import type { EmbeddedRunAttemptResult } from "../agents/pi-embedded-runner/run/types.js";
import {
abortEmbeddedPiRun,
clearActiveEmbeddedRun,
queueEmbeddedPiMessageWithOutcome,
setActiveEmbeddedRun,
type EmbeddedPiQueueMessageOptions,
} from "../agents/pi-embedded-runner/runs.js";
import { formatToolDetail, resolveToolDisplay } from "../agents/tool-display.js";
import { redactToolDetail } from "../logging/redact.js";
import { truncateUtf16Safe } from "../utils.js";
@@ -96,12 +103,19 @@ export { resolveModelAuthMode } from "../agents/model-auth.js";
export { supportsModelTools } from "../agents/model-tool-support.js";
export { resolveAttemptSpawnWorkspaceDir } from "../agents/pi-embedded-runner/run/attempt.thread-helpers.js";
export { buildEmbeddedAttemptToolRunContext } from "../agents/pi-embedded-runner/run/attempt.tool-run-context.js";
export {
abortEmbeddedPiRun as abortAgentHarnessRun,
clearActiveEmbeddedRun,
queueEmbeddedPiMessage as queueAgentHarnessMessage,
setActiveEmbeddedRun,
} from "../agents/pi-embedded-runner/runs.js";
export { abortEmbeddedPiRun as abortAgentHarnessRun, clearActiveEmbeddedRun, setActiveEmbeddedRun };
/**
* @deprecated Active-run queueing is an internal runtime concern. Use current
* runtime hooks instead of steering a harness through this legacy boolean API.
*/
export function queueAgentHarnessMessage(
sessionId: string,
text: string,
options?: EmbeddedPiQueueMessageOptions,
): boolean {
return queueEmbeddedPiMessageWithOutcome(sessionId, text, options).queued;
}
export { disposeRegisteredAgentHarnesses } from "../agents/harness/registry.js";
export {
logAgentRuntimeToolDiagnostics,

View File

@@ -4,6 +4,7 @@ import os from "node:os";
import { join } from "node:path";
import { afterAll, afterEach, beforeAll, expect, vi } from "vitest";
import { clearRuntimeAuthProfileStoreSnapshots } from "../../../src/agents/auth-profiles.js";
import type { EmbeddedPiQueueMessageOutcome } from "../../../src/agents/pi-embedded-runner/runs.js";
import { withFastReplyConfig } from "../../../src/auto-reply/reply/get-reply-fast-path.js";
import type { OpenClawConfig } from "../../../src/config/types.openclaw.js";
@@ -24,7 +25,14 @@ const piEmbeddedMocks = getSharedMocks("openclaw.trigger-handling.pi-embedded-mo
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
compactEmbeddedPiSession: vi.fn(),
runEmbeddedPiAgent: vi.fn(),
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
queueEmbeddedPiMessageWithOutcome: vi.fn(
(sessionId: string, _text?: string, _options?: unknown): EmbeddedPiQueueMessageOutcome => ({
queued: false,
sessionId,
reason: "not_streaming",
gatewayHealth: "live",
}),
),
resolveActiveEmbeddedRunSessionId: vi.fn().mockReturnValue(undefined),
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
@@ -48,7 +56,8 @@ const installPiEmbeddedMock = () =>
compactEmbeddedPiSession: (...args: unknown[]) =>
piEmbeddedMocks.compactEmbeddedPiSession(...args),
runEmbeddedPiAgent: (...args: unknown[]) => piEmbeddedMocks.runEmbeddedPiAgent(...args),
queueEmbeddedPiMessage: (...args: unknown[]) => piEmbeddedMocks.queueEmbeddedPiMessage(...args),
queueEmbeddedPiMessageWithOutcome: (sessionId: string, text: string, options?: unknown) =>
piEmbeddedMocks.queueEmbeddedPiMessageWithOutcome(sessionId, text, options),
resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
resolveActiveEmbeddedRunSessionId: (...args: unknown[]) =>
piEmbeddedMocks.resolveActiveEmbeddedRunSessionId(...args),
@@ -61,6 +70,12 @@ installPiEmbeddedMock();
vi.doMock("../../../src/agents/pi-embedded-runner/runs.js", () => ({
abortEmbeddedPiRun: (...args: unknown[]) => piEmbeddedMocks.abortEmbeddedPiRun(...args),
formatEmbeddedPiQueueFailureSummary: (outcome: { reason?: string; sessionId?: string }) =>
outcome.reason && outcome.sessionId
? `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=live`
: undefined,
queueEmbeddedPiMessageWithOutcome: (sessionId: string, text: string, options?: unknown) =>
piEmbeddedMocks.queueEmbeddedPiMessageWithOutcome(sessionId, text, options),
}));
const providerUsageMocks = vi.hoisted(() => ({
@@ -246,7 +261,14 @@ export async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise
piEmbeddedMocks.runEmbeddedPiAgent.mockReset();
piEmbeddedMocks.abortEmbeddedPiRun.mockReset().mockReturnValue(false);
piEmbeddedMocks.compactEmbeddedPiSession.mockReset();
piEmbeddedMocks.queueEmbeddedPiMessage.mockReset().mockReturnValue(false);
piEmbeddedMocks.queueEmbeddedPiMessageWithOutcome
.mockReset()
.mockImplementation((sessionId: string) => ({
queued: false,
sessionId,
reason: "not_streaming",
gatewayHealth: "live",
}));
piEmbeddedMocks.isEmbeddedPiRunActive.mockReset().mockReturnValue(false);
piEmbeddedMocks.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false);
modelFallbackMocks.runWithModelFallback.mockClear();