From 1ed50b0ced62ce2707ae50b65d7bacb087007b1b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 10 May 2026 13:04:23 +0100 Subject: [PATCH] fix: expose active-run queue failure reasons --- CHANGELOG.md | 1 + .../codex/src/app-server/run-attempt.test.ts | 40 ++++++---- src/agents/pi-embedded-runner.ts | 1 + src/agents/pi-embedded-runner/runs.test.ts | 47 ++++++++++-- src/agents/pi-embedded-runner/runs.ts | 68 +++++++++++++++-- .../sessions-yield.orchestration.test.ts | 7 +- src/agents/pi-embedded.ts | 1 + .../subagent-announce-delivery.runtime.ts | 3 +- src/agents/subagent-announce-delivery.test.ts | 73 ++++++++++++++----- src/agents/subagent-announce-delivery.ts | 67 ++++++++++------- .../subagent-announce.format.e2e.test.ts | 42 ++++++----- src/agents/subagent-announce.test-support.ts | 11 ++- src/agents/subagent-announce.test.ts | 34 ++++++--- src/agents/subagent-announce.timeout.test.ts | 8 +- .../reply/agent-runner.media-paths.test.ts | 54 +++++++++++--- src/auto-reply/reply/agent-runner.ts | 15 +++- .../reply/commands-steer.runtime.ts | 3 +- src/auto-reply/reply/commands-steer.test.ts | 55 ++++++++++++-- src/auto-reply/reply/commands-steer.ts | 25 +++++-- src/plugin-sdk/agent-harness-runtime.ts | 26 +++++-- .../trigger-handling-test-harness.ts | 28 ++++++- 21 files changed, 465 insertions(+), 144 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 648ff6ce903..404967660c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Gateway/agents: keep structured reasons when active-run queueing fails and deprecate the legacy boolean queue helper, so steering and subagent wake diagnostics distinguish completed, non-streaming, and compacting runs. Fixes #80156. Thanks @markus-lassfolk. - Agents/UI: compact exec and tool progress rows by hiding redundant shell tool names, replacing known workspace paths with short context markers, and preserving Discord trace scrubbing for compact command lines. - ACPX: run and await the embedded ACP backend startup probe by default so the gateway `ready` signal no longer fires before the acpx runtime has either become usable or reported a probe failure; set `OPENCLAW_ACPX_RUNTIME_STARTUP_PROBE=0` to restore lazy startup. Fixes #79596. Thanks @bzelones. - OpenAI-compatible models: strip prior assistant reasoning fields from replayed Chat Completions history by default, preventing oMLX/vLLM Qwen follow-up turns from rejecting or stalling on stale `reasoning` payloads. Fixes #46637. Thanks @zipzagster and @lexhoefsloot. diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index a1f85836d65..222efe68f6a 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -7,7 +7,6 @@ import { embeddedAgentLog, nativeHookRelayTesting, onAgentEvent, - queueAgentHarnessMessage, resetAgentEventsForTest, type AgentEventPayload, type EmbeddedRunAttemptParams, @@ -18,6 +17,13 @@ import { } from "openclaw/plugin-sdk/hook-runtime"; import { createMockPluginRegistry } from "openclaw/plugin-sdk/plugin-test-runtime"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { queueEmbeddedPiMessageWithOutcome } from "../../../../src/agents/pi-embedded-runner/runs.js"; + +function queueActiveRunMessageForTest( + ...args: Parameters +): boolean { + return queueEmbeddedPiMessageWithOutcome(...args).queued; +} import { CODEX_GPT5_BEHAVIOR_CONTRACT } from "../../prompt-overlay.js"; import { buildCodexAppInventoryCacheKey, @@ -1177,7 +1183,7 @@ describe("runCodexAppServerAttempt", () => { }), { interval: 1 }, ); - expect(queueAgentHarnessMessage("session-1", "after timeout")).toBe(false); + expect(queueActiveRunMessageForTest("session-1", "after timeout")).toBe(false); }); it("does not count account rate-limit updates as turn completion activity", async () => { @@ -1486,7 +1492,7 @@ describe("runCodexAppServerAttempt", () => { }), { interval: 1 }, ); - expect(queueAgentHarnessMessage("session-1", "after silent turn")).toBe(false); + expect(queueActiveRunMessageForTest("session-1", "after silent turn")).toBe(false); }); it("applies before_prompt_build to Codex developer instructions and turn input", async () => { @@ -2332,7 +2338,7 @@ describe("runCodexAppServerAttempt", () => { ); await waitForMethod("turn/start"); - expect(queueAgentHarnessMessage("session-1", "more context", { debounceMs: 1 })).toBe(true); + expect(queueActiveRunMessageForTest("session-1", "more context", { debounceMs: 1 })).toBe(true); await vi.waitFor(() => expect(requests.map((entry) => entry.method)).toContain("turn/steer"), { interval: 1, }); @@ -2377,8 +2383,8 @@ describe("runCodexAppServerAttempt", () => { ); await waitForMethod("turn/start"); - expect(queueAgentHarnessMessage("session-1", "first", { debounceMs: 5 })).toBe(true); - expect(queueAgentHarnessMessage("session-1", "second", { debounceMs: 5 })).toBe(true); + expect(queueActiveRunMessageForTest("session-1", "first", { debounceMs: 5 })).toBe(true); + expect(queueActiveRunMessageForTest("session-1", "second", { debounceMs: 5 })).toBe(true); await vi.waitFor( () => @@ -2410,7 +2416,9 @@ describe("runCodexAppServerAttempt", () => { ); await waitForMethod("turn/start"); - expect(queueAgentHarnessMessage("session-1", "late steer", { debounceMs: 30_000 })).toBe(true); + expect(queueActiveRunMessageForTest("session-1", "late steer", { debounceMs: 30_000 })).toBe( + true, + ); await completeTurn({ threadId: "thread-1", turnId: "turn-1" }); await run; @@ -2435,12 +2443,12 @@ describe("runCodexAppServerAttempt", () => { ); await waitForMethod("turn/start"); - expect(queueAgentHarnessMessage("session-1", "first", { steeringMode: "one-at-a-time" })).toBe( - true, - ); - expect(queueAgentHarnessMessage("session-1", "second", { steeringMode: "one-at-a-time" })).toBe( - true, - ); + expect( + queueActiveRunMessageForTest("session-1", "first", { steeringMode: "one-at-a-time" }), + ).toBe(true); + expect( + queueActiveRunMessageForTest("session-1", "second", { steeringMode: "one-at-a-time" }), + ).toBe(true); await vi.waitFor( () => @@ -2540,7 +2548,7 @@ describe("runCodexAppServerAttempt", () => { }); await vi.waitFor(() => expect(params.onBlockReply).toHaveBeenCalledTimes(1), { interval: 1 }); - expect(queueAgentHarnessMessage("session-1", "2")).toBe(true); + expect(queueActiveRunMessageForTest("session-1", "2")).toBe(true); await expect(response).resolves.toEqual({ answers: { mode: { answers: ["Deep"] } }, }); @@ -3388,7 +3396,7 @@ describe("runCodexAppServerAttempt", () => { await expect(runCodexAppServerAttempt(params, { startupTimeoutFloorMs: 1 })).rejects.toThrow( "codex app-server startup timed out", ); - expect(queueAgentHarnessMessage("session-1", "after timeout")).toBe(false); + expect(queueActiveRunMessageForTest("session-1", "after timeout")).toBe(false); }); it("passes the selected auth profile into app-server startup", async () => { @@ -3450,7 +3458,7 @@ describe("runCodexAppServerAttempt", () => { params.timeoutMs = 1; await expect(runCodexAppServerAttempt(params)).rejects.toThrow("turn/start timed out"); - expect(queueAgentHarnessMessage("session-1", "after timeout")).toBe(false); + expect(queueActiveRunMessageForTest("session-1", "after timeout")).toBe(false); }); it("keeps extended history enabled when resuming a bound Codex thread", async () => { diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 7f861a9b855..f99860655bc 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -19,6 +19,7 @@ export { isEmbeddedPiRunStreaming as isEmbeddedAgentRunStreaming, queueEmbeddedPiMessage, queueEmbeddedPiMessage as queueEmbeddedAgentMessage, + queueEmbeddedPiMessageWithOutcome, resolveActiveEmbeddedRunSessionId, resolveActiveEmbeddedRunSessionId as resolveActiveEmbeddedAgentRunSessionId, waitForEmbeddedPiRunEnd, diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts index 3b8e61f2344..c7be417edc3 100644 --- a/src/agents/pi-embedded-runner/runs.test.ts +++ b/src/agents/pi-embedded-runner/runs.test.ts @@ -8,7 +8,8 @@ import { consumeEmbeddedRunModelSwitch, getActiveEmbeddedRunSnapshot, isEmbeddedPiRunHandleActive, - queueEmbeddedPiMessage, + formatEmbeddedPiQueueFailureSummary, + queueEmbeddedPiMessageWithOutcome, requestEmbeddedRunModelSwitch, resolveActiveEmbeddedRunHandleSessionId, setActiveEmbeddedRun, @@ -19,12 +20,12 @@ import { type RunHandle = Parameters[1]; function createRunHandle( - overrides: { isCompacting?: boolean; abort?: () => void } = {}, + overrides: { isCompacting?: boolean; isStreaming?: boolean; abort?: () => void } = {}, ): RunHandle { const abort = overrides.abort ?? (() => {}); return { queueMessage: async () => {}, - isStreaming: () => true, + isStreaming: () => overrides.isStreaming ?? true, isCompacting: () => overrides.isCompacting ?? false, abort, }; @@ -75,7 +76,9 @@ describe("pi-embedded runner run registry", () => { }); expect( - queueEmbeddedPiMessage("session-steer", "continue", { steeringMode: "one-at-a-time" }), + queueEmbeddedPiMessageWithOutcome("session-steer", "continue", { + steeringMode: "one-at-a-time", + }).queued, ).toBe(true); expect(queueMessage).toHaveBeenCalledWith("continue", { steeringMode: "one-at-a-time" }); @@ -88,11 +91,45 @@ describe("pi-embedded runner run registry", () => { queueMessage, }); - expect(queueEmbeddedPiMessage("session-default-steer", "continue")).toBe(true); + expect(queueEmbeddedPiMessageWithOutcome("session-default-steer", "continue").queued).toBe( + true, + ); expect(queueMessage).toHaveBeenCalledWith("continue", { steeringMode: "all" }); }); + it("returns a structured no-active-run queue failure", () => { + const outcome = queueEmbeddedPiMessageWithOutcome("session-missing", "continue"); + + expect(outcome).toEqual({ + queued: false, + sessionId: "session-missing", + reason: "no_active_run", + gatewayHealth: "live", + }); + expect(formatEmbeddedPiQueueFailureSummary(outcome)).toBe( + "queue_message_failed reason=no_active_run sessionId=session-missing gatewayHealth=live", + ); + }); + + it("returns structured queue failures for inactive active-run states", () => { + setActiveEmbeddedRun("session-not-streaming", createRunHandle({ isStreaming: false })); + setActiveEmbeddedRun("session-compacting", createRunHandle({ isCompacting: true })); + + expect(queueEmbeddedPiMessageWithOutcome("session-not-streaming", "continue")).toEqual({ + queued: false, + sessionId: "session-not-streaming", + reason: "not_streaming", + gatewayHealth: "live", + }); + expect(queueEmbeddedPiMessageWithOutcome("session-compacting", "continue")).toEqual({ + queued: false, + sessionId: "session-compacting", + reason: "compacting", + gatewayHealth: "live", + }); + }); + it("force-clears an aborted run that does not drain", async () => { vi.useFakeTimers(); try { diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index b354baa8562..70470cc2b02 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -40,6 +40,43 @@ export { type EmbeddedRunModelSwitchRequest, } from "./run-state.js"; +export type EmbeddedPiQueueFailureReason = "no_active_run" | "not_streaming" | "compacting"; + +export type EmbeddedPiQueueMessageOutcome = + | { + queued: true; + sessionId: string; + target: "embedded_run" | "reply_run"; + gatewayHealth: "live"; + } + | { + queued: false; + sessionId: string; + reason: EmbeddedPiQueueFailureReason; + gatewayHealth: "live"; + }; + +function createQueueFailureOutcome( + sessionId: string, + reason: EmbeddedPiQueueFailureReason, +): EmbeddedPiQueueMessageOutcome { + return { + queued: false, + sessionId, + reason, + gatewayHealth: "live", + }; +} + +export function formatEmbeddedPiQueueFailureSummary( + outcome: EmbeddedPiQueueMessageOutcome, +): string | undefined { + if (outcome.queued) { + return undefined; + } + return `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=${outcome.gatewayHealth}`; +} + function setActiveRunSessionKey(sessionKey: string | undefined, sessionId: string): void { const normalizedSessionKey = sessionKey?.trim(); if (!normalizedSessionKey) { @@ -63,32 +100,53 @@ function clearActiveRunSessionKeys(sessionId: string, sessionKey?: string): void } } +/** + * @deprecated Use queueEmbeddedPiMessageWithOutcome so callers preserve failure reasons. + */ export function queueEmbeddedPiMessage( sessionId: string, text: string, options?: EmbeddedPiQueueMessageOptions, ): boolean { + return queueEmbeddedPiMessageWithOutcome(sessionId, text, options).queued; +} + +export function queueEmbeddedPiMessageWithOutcome( + sessionId: string, + text: string, + options?: EmbeddedPiQueueMessageOptions, +): EmbeddedPiQueueMessageOutcome { const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); if (!handle) { const queuedReplyRunMessage = queueReplyRunMessage(sessionId, text); if (queuedReplyRunMessage) { logMessageQueued({ sessionId, source: "pi-embedded-runner" }); - return true; + return { + queued: true, + sessionId, + target: "reply_run", + gatewayHealth: "live", + }; } diag.debug(`queue message failed: sessionId=${sessionId} reason=no_active_run`); - return false; + return createQueueFailureOutcome(sessionId, "no_active_run"); } if (!handle.isStreaming()) { diag.debug(`queue message failed: sessionId=${sessionId} reason=not_streaming`); - return false; + return createQueueFailureOutcome(sessionId, "not_streaming"); } if (handle.isCompacting()) { diag.debug(`queue message failed: sessionId=${sessionId} reason=compacting`); - return false; + return createQueueFailureOutcome(sessionId, "compacting"); } logMessageQueued({ sessionId, source: "pi-embedded-runner" }); void handle.queueMessage(text, options ?? { steeringMode: "all" }); - return true; + return { + queued: true, + sessionId, + target: "embedded_run", + gatewayHealth: "live", + }; } /** diff --git a/src/agents/pi-embedded-runner/sessions-yield.orchestration.test.ts b/src/agents/pi-embedded-runner/sessions-yield.orchestration.test.ts index dd898a8ed18..f28d55693c9 100644 --- a/src/agents/pi-embedded-runner/sessions-yield.orchestration.test.ts +++ b/src/agents/pi-embedded-runner/sessions-yield.orchestration.test.ts @@ -11,7 +11,7 @@ import { mockedRunEmbeddedAttempt, overflowBaseRunParams, } from "./run.overflow-compaction.harness.js"; -import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./runs.js"; +import { isEmbeddedPiRunActive, queueEmbeddedPiMessageWithOutcome } from "./runs.js"; let runEmbeddedPiAgent: typeof import("./run.js").runEmbeddedPiAgent; @@ -53,7 +53,10 @@ describe("sessions_yield orchestration", () => { expect(isEmbeddedPiRunActive(sessionId)).toBe(false); // 4. Steer would fail (message delivery must take direct path, not steer) - expect(queueEmbeddedPiMessage(sessionId, "subagent result")).toBe(false); + expect(queueEmbeddedPiMessageWithOutcome(sessionId, "subagent result")).toMatchObject({ + queued: false, + reason: "no_active_run", + }); }); it("clientToolCalls takes precedence over yieldDetected", async () => { diff --git a/src/agents/pi-embedded.ts b/src/agents/pi-embedded.ts index 6d98cdcb860..74595b3d229 100644 --- a/src/agents/pi-embedded.ts +++ b/src/agents/pi-embedded.ts @@ -20,6 +20,7 @@ export { isEmbeddedPiRunStreaming, queueEmbeddedAgentMessage, queueEmbeddedPiMessage, + queueEmbeddedPiMessageWithOutcome, resolveActiveEmbeddedAgentRunSessionId, resolveActiveEmbeddedRunSessionId, resolveEmbeddedSessionLane, diff --git a/src/agents/subagent-announce-delivery.runtime.ts b/src/agents/subagent-announce-delivery.runtime.ts index 582a03c0e8c..928e17b6f75 100644 --- a/src/agents/subagent-announce-delivery.runtime.ts +++ b/src/agents/subagent-announce-delivery.runtime.ts @@ -16,7 +16,8 @@ export { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-rout export { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js"; export { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; export { + formatEmbeddedPiQueueFailureSummary, isEmbeddedPiRunActive, - queueEmbeddedPiMessage, + queueEmbeddedPiMessageWithOutcome, resolveActiveEmbeddedRunSessionId, } from "./pi-embedded-runner/runs.js"; diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index 5654f23f8f0..081e193985b 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -4,6 +4,10 @@ import { registerSessionBindingAdapter, } from "../infra/outbound/session-binding-service.js"; import type { AgentInternalEvent } from "./internal-events.js"; +import type { + EmbeddedPiQueueMessageOptions, + EmbeddedPiQueueMessageOutcome, +} from "./pi-embedded-runner/runs.js"; import { __testing, deliverSubagentAnnouncement, @@ -43,6 +47,32 @@ function createSendMessageMock() { })) as unknown as typeof runtimeSendMessage; } +type QueueEmbeddedPiMessageWithOutcome = ( + sessionId: string, + message: string, + options?: EmbeddedPiQueueMessageOptions, +) => EmbeddedPiQueueMessageOutcome; + +function createQueueOutcomeMock( + queued: boolean, +): ReturnType> { + return vi.fn((sessionId: string) => + queued + ? { + queued: true, + sessionId, + target: "embedded_run", + gatewayHealth: "live", + } + : { + queued: false, + sessionId, + reason: "not_streaming", + gatewayHealth: "live", + }, + ); +} + const longChildCompletionOutput = [ "34/34 tests pass, clean build. Now docker repro:", "Root cause: the requester's announce delivery accepted a prefix-only assistant payload as delivered.", @@ -85,7 +115,7 @@ async function deliverSlackThreadAnnouncement(params: { sessionId: string; expectsCompletionMessage: boolean; directIdempotencyKey: string; - queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean; + queueEmbeddedPiMessageWithOutcome?: QueueEmbeddedPiMessageWithOutcome; sendMessage?: typeof runtimeSendMessage; internalEvents?: AgentInternalEvent[]; sourceTool?: string; @@ -97,8 +127,8 @@ async function deliverSlackThreadAnnouncement(params: { isActive: params.isActive, }), getRuntimeConfig: () => ({}) as never, - ...(params.queueEmbeddedPiMessage - ? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage } + ...(params.queueEmbeddedPiMessageWithOutcome + ? { queueEmbeddedPiMessageWithOutcome: params.queueEmbeddedPiMessageWithOutcome } : {}), }); @@ -163,7 +193,7 @@ async function deliverTelegramDirectMessageCompletion(params: { sendMessage?: typeof runtimeSendMessage; internalEvents?: AgentInternalEvent[]; isActive?: boolean; - queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean; + queueEmbeddedPiMessageWithOutcome?: QueueEmbeddedPiMessageWithOutcome; }) { const origin = { channel: "telegram", @@ -177,8 +207,8 @@ async function deliverTelegramDirectMessageCompletion(params: { isActive: params.isActive === true, }), getRuntimeConfig: () => ({}) as never, - ...(params.queueEmbeddedPiMessage - ? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage } + ...(params.queueEmbeddedPiMessageWithOutcome + ? { queueEmbeddedPiMessageWithOutcome: params.queueEmbeddedPiMessageWithOutcome } : {}), }); @@ -211,7 +241,7 @@ async function deliverSlackChannelAnnouncement(params: { accountId?: string; threadId?: string | number; }; - queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean; + queueEmbeddedPiMessageWithOutcome?: QueueEmbeddedPiMessageWithOutcome; sendMessage?: typeof runtimeSendMessage; internalEvents?: AgentInternalEvent[]; sourceTool?: string; @@ -229,8 +259,8 @@ async function deliverSlackChannelAnnouncement(params: { isActive: params.isActive, }), getRuntimeConfig: () => ({}) as never, - ...(params.queueEmbeddedPiMessage - ? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage } + ...(params.queueEmbeddedPiMessageWithOutcome + ? { queueEmbeddedPiMessageWithOutcome: params.queueEmbeddedPiMessageWithOutcome } : {}), }); @@ -500,24 +530,28 @@ describe("deliverSubagentAnnouncement queued delivery", () => { describe("deliverSubagentAnnouncement completion delivery", () => { it("keeps completion announces session-internal while preserving route context for active requesters", async () => { const callGateway = createGatewayMock(); - const queueEmbeddedPiMessage = vi.fn(() => true); + const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeMock(true); const result = await deliverSlackThreadAnnouncement({ callGateway, sessionId: "requester-session-1", isActive: true, expectsCompletionMessage: true, directIdempotencyKey: "announce-1", - queueEmbeddedPiMessage, + queueEmbeddedPiMessageWithOutcome, }); expectRecordFields(result, { delivered: true, path: "steered", }); - expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-1", "child done", { - steeringMode: "all", - debounceMs: 500, - }); + expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith( + "requester-session-1", + "child done", + { + steeringMode: "all", + debounceMs: 500, + }, + ); expect(callGateway).not.toHaveBeenCalled(); }); @@ -894,12 +928,12 @@ describe("deliverSubagentAnnouncement completion delivery", () => { it("queues when an active Telegram requester cannot be woken directly", async () => { const callGateway = createGatewayMock(); const sendMessage = createSendMessageMock(); - const queueEmbeddedPiMessage = vi.fn(() => false); + const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeMock(false); const result = await deliverTelegramDirectMessageCompletion({ callGateway, sendMessage, isActive: true, - queueEmbeddedPiMessage, + queueEmbeddedPiMessageWithOutcome, internalEvents: [ { type: "task_completion", @@ -924,7 +958,8 @@ describe("deliverSubagentAnnouncement completion delivery", () => { phase: "direct-primary", delivered: false, path: "direct", - error: "active requester session could not be woken", + error: + "active requester session could not be woken: queue_message_failed reason=not_streaming sessionId=requester-session-telegram gatewayHealth=live", }, { phase: "queue-fallback", @@ -934,7 +969,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => { }, ], }); - expect(queueEmbeddedPiMessage).toHaveBeenCalledWith( + expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith( "requester-session-telegram", "child done", { diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index f253566a5c2..847c41149df 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -27,15 +27,17 @@ import { hasMessagingToolDeliveryEvidence, hasVisibleAgentPayload, } from "./pi-embedded-runner/delivery-evidence.js"; +import type { EmbeddedPiQueueMessageOptions } from "./pi-embedded-runner/run-state.js"; import { callGateway, createBoundDeliveryRouter, getGlobalHookRunner, isEmbeddedPiRunActive, getRuntimeConfig, + formatEmbeddedPiQueueFailureSummary, isSteeringQueueMode, loadSessionStore, - queueEmbeddedPiMessage, + queueEmbeddedPiMessageWithOutcome, resolvePiSteeringModeForQueueMode, resolveActiveEmbeddedRunSessionId, resolveAgentIdFromSessionKey, @@ -65,7 +67,7 @@ type SubagentAnnounceDeliveryDeps = { sessionId?: string; isActive: boolean; }; - queueEmbeddedPiMessage: typeof queueEmbeddedPiMessage; + queueEmbeddedPiMessageWithOutcome: typeof queueEmbeddedPiMessageWithOutcome; }; const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = { @@ -80,12 +82,28 @@ const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = { isActive: Boolean(sessionId && isEmbeddedPiRunActive(sessionId)), }; }, - queueEmbeddedPiMessage, + queueEmbeddedPiMessageWithOutcome, }; let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = defaultSubagentAnnounceDeliveryDeps; +function resolveQueueEmbeddedPiMessageOutcome( + sessionId: string, + text: string, + options?: EmbeddedPiQueueMessageOptions, +): ReturnType { + return subagentAnnounceDeliveryDeps.queueEmbeddedPiMessageWithOutcome(sessionId, text, options); +} + +function formatQueueWakeFailureError( + fallback: string, + outcome: ReturnType, +): string { + const summary = formatEmbeddedPiQueueFailureSummary(outcome); + return summary ? `${fallback}: ${summary}` : fallback; +} + function resolveBoundConversationOrigin(params: { bindingConversation: ConversationRef & { parentConversationId?: string }; requesterConversation?: ConversationRef; @@ -488,15 +506,11 @@ async function maybeQueueSubagentAnnounce(params: { const shouldSteer = isSteeringQueueMode(queueSettings.mode); if (shouldSteer) { - const steered = subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage( - sessionId, - params.steerMessage, - { - steeringMode: resolvePiSteeringModeForQueueMode(queueSettings.mode), - ...(queueSettings.debounceMs !== undefined ? { debounceMs: queueSettings.debounceMs } : {}), - }, - ); - if (steered) { + const queueOutcome = resolveQueueEmbeddedPiMessageOutcome(sessionId, params.steerMessage, { + steeringMode: resolvePiSteeringModeForQueueMode(queueSettings.mode), + ...(queueSettings.debounceMs !== undefined ? { debounceMs: queueSettings.debounceMs } : {}), + }); + if (queueOutcome.queued) { return "steered"; } } @@ -728,19 +742,17 @@ async function sendSubagentAnnounceDirectly(params: { sessionEntry: requesterEntry, }); if (params.expectsCompletionMessage && requesterActivity.sessionId) { - const woke = requesterActivity.sessionId - ? subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage( - requesterActivity.sessionId, - params.triggerMessage, - { - steeringMode: "all", - ...(requesterQueueSettings.debounceMs !== undefined - ? { debounceMs: requesterQueueSettings.debounceMs } - : {}), - }, - ) - : false; - if (woke) { + const wakeOutcome = resolveQueueEmbeddedPiMessageOutcome( + requesterActivity.sessionId, + params.triggerMessage, + { + steeringMode: "all", + ...(requesterQueueSettings.debounceMs !== undefined + ? { debounceMs: requesterQueueSettings.debounceMs } + : {}), + }, + ); + if (wakeOutcome.queued) { return { delivered: true, path: "steered", @@ -753,7 +765,10 @@ async function sendSubagentAnnounceDirectly(params: { return { delivered: false, path: "direct", - error: "active requester session could not be woken", + error: formatQueueWakeFailureError( + "active requester session could not be woken", + wakeOutcome, + ), }; } } diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index a5b36f515d6..fdba02d1a46 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -125,7 +125,10 @@ const getGlobalHookRunnerSpy = vi.spyOn(hookRunnerGlobal, "getGlobalHookRunner") const readLatestAssistantReplySpy = vi.spyOn(agentStep, "readLatestAssistantReply"); const isEmbeddedPiRunActiveSpy = vi.spyOn(piEmbedded, "isEmbeddedPiRunActive"); const isEmbeddedPiRunStreamingSpy = vi.spyOn(piEmbedded, "isEmbeddedPiRunStreaming"); -const queueEmbeddedPiMessageSpy = vi.spyOn(piEmbedded, "queueEmbeddedPiMessage"); +const queueEmbeddedPiMessageWithOutcomeSpy = vi.spyOn( + piEmbedded, + "queueEmbeddedPiMessageWithOutcome", +); const waitForEmbeddedPiRunEndSpy = vi.spyOn(piEmbedded, "waitForEmbeddedPiRunEnd"); const readLatestAssistantReplyMock = vi.fn( async (_sessionKey?: string): Promise => "raw subagent reply", @@ -136,20 +139,21 @@ const embeddedPiRunActiveMock = vi.fn( const embeddedPiRunStreamingMock = vi.fn( (_sessionId: string) => false, ); -const queueEmbeddedPiMessageMock = vi.fn( - ( - _sessionId: string, - _text: string, - _options?: Parameters[2], - ) => false, -); +const queueEmbeddedPiMessageWithOutcomeMock = vi.fn< + typeof piEmbedded.queueEmbeddedPiMessageWithOutcome +>((sessionId: string) => ({ + queued: false, + sessionId, + reason: "not_streaming", + gatewayHealth: "live", +})); const waitForEmbeddedPiRunEndMock = vi.fn( async (_sessionId: string, _timeoutMs?: number) => true, ); const embeddedRunMock = { isEmbeddedPiRunActive: embeddedPiRunActiveMock, isEmbeddedPiRunStreaming: embeddedPiRunStreamingMock, - queueEmbeddedPiMessage: queueEmbeddedPiMessageMock, + queueEmbeddedPiMessageWithOutcome: queueEmbeddedPiMessageWithOutcomeMock, waitForEmbeddedPiRunEnd: waitForEmbeddedPiRunEndMock, }; const { subagentRegistryMock } = vi.hoisted(() => ({ @@ -375,11 +379,8 @@ describe("subagent announce formatting", () => { isActive: Boolean(sessionId && embeddedRunMock.isEmbeddedPiRunActive(sessionId)), }; }, - queueEmbeddedPiMessage: ( - sessionId: string, - text: string, - options?: Parameters[2], - ) => embeddedRunMock.queueEmbeddedPiMessage(sessionId, text, options), + queueEmbeddedPiMessageWithOutcome: (sessionId, text, options) => + embeddedRunMock.queueEmbeddedPiMessageWithOutcome(sessionId, text, options), }); subagentAnnounceTesting.setDepsForTest({ callGateway: async >( @@ -405,10 +406,10 @@ describe("subagent announce formatting", () => { isEmbeddedPiRunStreamingSpy .mockReset() .mockImplementation((sessionId) => embeddedRunMock.isEmbeddedPiRunStreaming(sessionId)); - queueEmbeddedPiMessageSpy + queueEmbeddedPiMessageWithOutcomeSpy .mockReset() .mockImplementation((sessionId, text, options) => - embeddedRunMock.queueEmbeddedPiMessage(sessionId, text, options), + embeddedRunMock.queueEmbeddedPiMessageWithOutcome(sessionId, text, options), ); waitForEmbeddedPiRunEndSpy .mockReset() @@ -418,7 +419,14 @@ describe("subagent announce formatting", () => { ); embeddedRunMock.isEmbeddedPiRunActive.mockClear().mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockClear().mockReturnValue(false); - embeddedRunMock.queueEmbeddedPiMessage.mockClear().mockReturnValue(false); + embeddedRunMock.queueEmbeddedPiMessageWithOutcome + .mockClear() + .mockImplementation((sessionId) => ({ + queued: false, + sessionId, + reason: "not_streaming", + gatewayHealth: "live", + })); embeddedRunMock.waitForEmbeddedPiRunEnd.mockClear().mockResolvedValue(true); subagentRegistryMock.isSubagentSessionRunActive.mockClear().mockReturnValue(true); subagentRegistryMock.shouldIgnorePostCompletionAnnounceForSession diff --git a/src/agents/subagent-announce.test-support.ts b/src/agents/subagent-announce.test-support.ts index de9a7858f9d..dda9937cfb5 100644 --- a/src/agents/subagent-announce.test-support.ts +++ b/src/agents/subagent-announce.test-support.ts @@ -1,6 +1,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; import type { callGateway } from "../gateway/call.js"; import type { EmbeddedPiQueueMessageOptions } from "./pi-embedded-runner/run-state.js"; +import type { EmbeddedPiQueueMessageOutcome } from "./pi-embedded-runner/runs.js"; type DeliveryRuntimeMockOptions = { callGateway: (request: unknown) => Promise; @@ -10,11 +11,11 @@ type DeliveryRuntimeMockOptions = { resolveMainSessionKey: (cfg: unknown) => string; resolveStorePath: (store: unknown, options: unknown) => string; isEmbeddedPiRunActive: (sessionId: string) => boolean; - queueEmbeddedPiMessage: ( + queueEmbeddedPiMessageWithOutcome: ( sessionId: string, text: string, options?: EmbeddedPiQueueMessageOptions, - ) => boolean; + ) => EmbeddedPiQueueMessageOutcome; hasHooks?: () => boolean; }; @@ -58,7 +59,11 @@ export function createSubagentAnnounceDeliveryRuntimeMock(options: DeliveryRunti resolveMainSessionKey: options.resolveMainSessionKey, resolveStorePath: options.resolveStorePath, isEmbeddedPiRunActive: options.isEmbeddedPiRunActive, - queueEmbeddedPiMessage: options.queueEmbeddedPiMessage, + queueEmbeddedPiMessageWithOutcome: options.queueEmbeddedPiMessageWithOutcome, + formatEmbeddedPiQueueFailureSummary: (outcome: { reason?: string; sessionId?: string }) => + outcome.reason && outcome.sessionId + ? `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=live` + : undefined, isSteeringQueueMode: (mode: string) => mode === "steer" || mode === "queue" || mode === "steer-backlog", resolvePiSteeringModeForQueueMode: (mode: string) => diff --git a/src/agents/subagent-announce.test.ts b/src/agents/subagent-announce.test.ts index 821d162cd4e..16cf964dd1e 100644 --- a/src/agents/subagent-announce.test.ts +++ b/src/agents/subagent-announce.test.ts @@ -1,4 +1,5 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { EmbeddedPiQueueMessageOutcome } from "./pi-embedded-runner/runs.js"; import { createSubagentAnnounceDeliveryRuntimeMock } from "./subagent-announce.test-support.js"; type AgentCallRequest = { method?: string; params?: Record }; @@ -14,8 +15,13 @@ const resolveStorePathMock = vi.fn((_store: unknown, _options: unknown) => "/tmp const resolveMainSessionKeyMock = vi.fn((_cfg: unknown) => "agent:main:main"); const readLatestAssistantReplyMock = vi.fn(async (_params?: unknown) => "raw subagent reply"); const isEmbeddedPiRunActiveMock = vi.fn((_sessionId: string) => false); -const queueEmbeddedPiMessageMock = vi.fn( - (_sessionId: string, _text: string, _options?: unknown) => false, +const queueEmbeddedPiMessageWithOutcomeMock = vi.fn( + (sessionId: string, _text: string, _options?: unknown): EmbeddedPiQueueMessageOutcome => ({ + queued: false, + sessionId, + reason: "not_streaming" as const, + gatewayHealth: "live" as const, + }), ); const waitForEmbeddedPiRunEndMock = vi.fn(async (_sessionId: string, _timeoutMs?: number) => true); let mockConfig: ReturnType<(typeof import("../config/config.js"))["getRuntimeConfig"]> = { @@ -43,8 +49,6 @@ vi.mock("./subagent-announce.runtime.js", () => ({ isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId), getRuntimeConfig: () => mockConfig, loadSessionStore: (storePath: string) => loadSessionStoreMock(storePath), - queueEmbeddedPiMessage: (sessionId: string, text: string, options?: unknown) => - queueEmbeddedPiMessageMock(sessionId, text, options), resolveAgentIdFromSessionKey: (sessionKey: string) => resolveAgentIdFromSessionKeyMock(sessionKey), resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg), @@ -67,8 +71,8 @@ vi.mock("./subagent-announce-delivery.runtime.js", () => resolveMainSessionKey: (cfg: unknown) => resolveMainSessionKeyMock(cfg), resolveStorePath: (store: unknown, options: unknown) => resolveStorePathMock(store, options), isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId), - queueEmbeddedPiMessage: (sessionId: string, text: string, options?: unknown) => - queueEmbeddedPiMessageMock(sessionId, text, options), + queueEmbeddedPiMessageWithOutcome: (sessionId: string, text: string, options?: unknown) => + queueEmbeddedPiMessageWithOutcomeMock(sessionId, text, options), }), ); @@ -100,7 +104,7 @@ vi.mock("./subagent-announce-delivery.js", () => ({ params.requesterSessionOrigin?.channel; if (sessionId && queueChannel === "discord" && isEmbeddedPiRunActiveMock(sessionId)) { - queueEmbeddedPiMessageMock( + queueEmbeddedPiMessageWithOutcomeMock( sessionId, `[Internal task completion event]\n${params.triggerMessage}`, { steeringMode: "all" }, @@ -229,7 +233,12 @@ describe("subagent announce seam flow", () => { resolveMainSessionKeyMock.mockReset().mockImplementation(() => "agent:main:main"); readLatestAssistantReplyMock.mockReset().mockResolvedValue("raw subagent reply"); isEmbeddedPiRunActiveMock.mockReset().mockReturnValue(false); - queueEmbeddedPiMessageMock.mockReset().mockReturnValue(false); + queueEmbeddedPiMessageWithOutcomeMock.mockReset().mockImplementation((sessionId: string) => ({ + queued: false, + sessionId, + reason: "not_streaming", + gatewayHealth: "live", + })); waitForEmbeddedPiRunEndMock.mockReset().mockResolvedValue(true); mockConfig = { session: { @@ -338,7 +347,12 @@ describe("subagent announce seam flow", () => { }, })); isEmbeddedPiRunActiveMock.mockReturnValue(true); - queueEmbeddedPiMessageMock.mockReturnValue(true); + queueEmbeddedPiMessageWithOutcomeMock.mockImplementation((sessionId: string) => ({ + queued: true, + sessionId, + target: "embedded_run", + gatewayHealth: "live", + })); const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: "agent:main:subagent:test", @@ -355,7 +369,7 @@ describe("subagent announce seam flow", () => { }); expect(didAnnounce).toBe(true); - expect(queueEmbeddedPiMessageMock).toHaveBeenCalledWith( + expect(queueEmbeddedPiMessageWithOutcomeMock).toHaveBeenCalledWith( "session-origin-provider-steer", expect.stringContaining("[Internal task completion event]"), { steeringMode: "all" }, diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 633c28b7c3b..5776f4c2245 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -88,7 +88,12 @@ vi.mock("./subagent-announce-delivery.runtime.js", () => resolveMainSessionKey: () => "agent:main:main", resolveStorePath: () => "/tmp/sessions-main.json", isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId), - queueEmbeddedPiMessage: () => false, + queueEmbeddedPiMessageWithOutcome: (sessionId: string) => ({ + queued: false, + sessionId, + reason: "not_streaming", + gatewayHealth: "live", + }), }), ); vi.mock("./subagent-announce-delivery.js", () => ({ @@ -176,7 +181,6 @@ vi.mock("./subagent-announce.runtime.js", () => ({ resolveStorePath: () => "/tmp/sessions-main.json", resolveMainSessionKey: () => "agent:main:main", isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId), - queueEmbeddedPiMessage: (_sessionId: string, _text: string) => false, waitForEmbeddedPiRunEnd: (sessionId: string, timeoutMs?: number) => waitForEmbeddedPiRunEndMock(sessionId, timeoutMs), })); diff --git a/src/auto-reply/reply/agent-runner.media-paths.test.ts b/src/auto-reply/reply/agent-runner.media-paths.test.ts index 3f93e1454c2..1bc475e20ed 100644 --- a/src/auto-reply/reply/agent-runner.media-paths.test.ts +++ b/src/auto-reply/reply/agent-runner.media-paths.test.ts @@ -1,5 +1,6 @@ import path from "node:path"; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { EmbeddedPiQueueMessageOutcome } from "../../agents/pi-embedded-runner/runs.js"; import type { TemplateContext } from "../templating.js"; import type { FollowupRun, QueueSettings } from "./queue.js"; import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js"; @@ -10,7 +11,14 @@ const abortEmbeddedPiRunMock = vi.fn(); const compactEmbeddedPiSessionMock = vi.fn(); const isEmbeddedPiRunActiveMock = vi.fn(() => false); const isEmbeddedPiRunStreamingMock = vi.fn(() => false); -const queueEmbeddedPiMessageMock = vi.fn(() => false); +const queueEmbeddedPiMessageWithOutcomeMock = vi.fn( + (sessionId: string, _text: string, _options?: unknown): EmbeddedPiQueueMessageOutcome => ({ + queued: false, + sessionId, + reason: "not_streaming", + gatewayHealth: "live", + }), +); const resolveEmbeddedSessionLaneMock = vi.fn(); const waitForEmbeddedPiRunEndMock = vi.fn(); const enqueueFollowupRunMock = vi.fn(); @@ -36,14 +44,18 @@ vi.mock("../../agents/pi-embedded.js", () => ({ compactEmbeddedPiSession: compactEmbeddedPiSessionMock, isEmbeddedPiRunActive: isEmbeddedPiRunActiveMock, isEmbeddedPiRunStreaming: isEmbeddedPiRunStreamingMock, - queueEmbeddedPiMessage: queueEmbeddedPiMessageMock, + queueEmbeddedPiMessageWithOutcome: queueEmbeddedPiMessageWithOutcomeMock, resolveEmbeddedSessionLane: resolveEmbeddedSessionLaneMock, runEmbeddedPiAgent: runEmbeddedPiAgentMock, waitForEmbeddedPiRunEnd: waitForEmbeddedPiRunEndMock, })); vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({ - queueEmbeddedPiMessage: queueEmbeddedPiMessageMock, + formatEmbeddedPiQueueFailureSummary: (outcome: { reason?: string; sessionId?: string }) => + outcome.reason && outcome.sessionId + ? `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=live` + : undefined, + queueEmbeddedPiMessageWithOutcome: queueEmbeddedPiMessageWithOutcomeMock, })); vi.mock("./queue.js", () => ({ @@ -135,8 +147,13 @@ describe("runReplyAgent media path normalization", () => { isEmbeddedPiRunActiveMock.mockReturnValue(false); isEmbeddedPiRunStreamingMock.mockReset(); isEmbeddedPiRunStreamingMock.mockReturnValue(false); - queueEmbeddedPiMessageMock.mockReset(); - queueEmbeddedPiMessageMock.mockReturnValue(false); + queueEmbeddedPiMessageWithOutcomeMock.mockReset(); + queueEmbeddedPiMessageWithOutcomeMock.mockImplementation((sessionId: string) => ({ + queued: false, + sessionId, + reason: "not_streaming", + gatewayHealth: "live", + })); resolveEmbeddedSessionLaneMock.mockReset(); waitForEmbeddedPiRunEndMock.mockReset(); enqueueFollowupRunMock.mockReset(); @@ -204,7 +221,12 @@ describe("runReplyAgent media path normalization", () => { }); it("maps steer queue modes to Pi steering drain modes", async () => { - queueEmbeddedPiMessageMock.mockReturnValue(true); + queueEmbeddedPiMessageWithOutcomeMock.mockImplementation((sessionId: string) => ({ + queued: true, + sessionId, + target: "embedded_run", + gatewayHealth: "live", + })); await runReplyAgent( makeRunReplyAgentParams({ @@ -214,9 +236,13 @@ describe("runReplyAgent media path normalization", () => { }), ); - expect(queueEmbeddedPiMessageMock).toHaveBeenLastCalledWith("session", "generate chart", { - steeringMode: "all", - }); + expect(queueEmbeddedPiMessageWithOutcomeMock).toHaveBeenLastCalledWith( + "session", + "generate chart", + { + steeringMode: "all", + }, + ); await runReplyAgent( makeRunReplyAgentParams({ @@ -226,9 +252,13 @@ describe("runReplyAgent media path normalization", () => { }), ); - expect(queueEmbeddedPiMessageMock).toHaveBeenLastCalledWith("session", "generate chart", { - steeringMode: "one-at-a-time", - }); + expect(queueEmbeddedPiMessageWithOutcomeMock).toHaveBeenLastCalledWith( + "session", + "generate chart", + { + steeringMode: "one-at-a-time", + }, + ); }); it("shares one media cache between block accumulation and final payload delivery", async () => { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index b37cab18dd9..33edc8b26e7 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -4,7 +4,10 @@ import { resolveContextTokensForModel } from "../../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS } from "../../agents/defaults.js"; import { resolveModelAuthMode } from "../../agents/model-auth.js"; import { isCliProvider } from "../../agents/model-selection.js"; -import { queueEmbeddedPiMessage } from "../../agents/pi-embedded-runner/runs.js"; +import { + formatEmbeddedPiQueueFailureSummary, + queueEmbeddedPiMessageWithOutcome, +} from "../../agents/pi-embedded-runner/runs.js"; import { deriveContextPromptTokens, hasNonzeroUsage, normalizeUsage } from "../../agents/usage.js"; import { enqueueCommitmentExtraction } from "../../commitments/runtime.js"; import type { OpenClawConfig } from "../../config/config.js"; @@ -1044,15 +1047,21 @@ export async function runReplyAgent(params: { const steerSessionId = (sessionKey ? replyRunRegistry.resolveSessionId(sessionKey) : undefined) ?? followupRun.run.sessionId; - const steered = queueEmbeddedPiMessage(steerSessionId, followupRun.prompt, { + const steerOutcome = queueEmbeddedPiMessageWithOutcome(steerSessionId, followupRun.prompt, { steeringMode: resolvePiSteeringModeForQueueMode(resolvedQueue.mode), ...(resolvedQueue.debounceMs !== undefined ? { debounceMs: resolvedQueue.debounceMs } : {}), }); - if (steered && !effectiveShouldFollowup) { + if (steerOutcome.queued && !effectiveShouldFollowup) { await touchActiveSessionEntry(); typing.cleanup(); return undefined; } + if (!steerOutcome.queued) { + const summary = formatEmbeddedPiQueueFailureSummary(steerOutcome); + if (summary) { + logVerbose(`reply queue steering failed: ${summary}`); + } + } } const activeRunQueueAction = resolveActiveRunQueueAction({ diff --git a/src/auto-reply/reply/commands-steer.runtime.ts b/src/auto-reply/reply/commands-steer.runtime.ts index f23c4bd4083..ae6d202db65 100644 --- a/src/auto-reply/reply/commands-steer.runtime.ts +++ b/src/auto-reply/reply/commands-steer.runtime.ts @@ -1,5 +1,6 @@ export { + formatEmbeddedPiQueueFailureSummary, isEmbeddedPiRunActive, - queueEmbeddedPiMessage, + queueEmbeddedPiMessageWithOutcome, resolveActiveEmbeddedRunSessionId, } from "../../agents/pi-embedded-runner/runs.js"; diff --git a/src/auto-reply/reply/commands-steer.test.ts b/src/auto-reply/reply/commands-steer.test.ts index eb3967a5a78..3578cbeba2e 100644 --- a/src/auto-reply/reply/commands-steer.test.ts +++ b/src/auto-reply/reply/commands-steer.test.ts @@ -3,8 +3,9 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { buildCommandTestParams } from "./commands.test-harness.js"; const steerRuntimeMocks = vi.hoisted(() => ({ + formatEmbeddedPiQueueFailureSummary: vi.fn(), isEmbeddedPiRunActive: vi.fn(), - queueEmbeddedPiMessage: vi.fn(), + queueEmbeddedPiMessageWithOutcome: vi.fn(), resolveActiveEmbeddedRunSessionId: vi.fn(), })); @@ -23,8 +24,18 @@ function buildParams(commandBody: string) { describe("handleSteerCommand", () => { beforeEach(() => { + steerRuntimeMocks.formatEmbeddedPiQueueFailureSummary + .mockReset() + .mockReturnValue( + "queue_message_failed reason=not_streaming sessionId=session-active gatewayHealth=live", + ); steerRuntimeMocks.isEmbeddedPiRunActive.mockReset().mockReturnValue(false); - steerRuntimeMocks.queueEmbeddedPiMessage.mockReset().mockReturnValue(true); + steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome.mockReset().mockReturnValue({ + queued: true, + sessionId: "session-active", + target: "embedded_run", + gatewayHealth: "live", + }); steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockReset().mockReturnValue(undefined); }); @@ -40,7 +51,7 @@ describe("handleSteerCommand", () => { expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenCalledWith( "agent:main:main", ); - expect(steerRuntimeMocks.queueEmbeddedPiMessage).toHaveBeenCalledWith( + expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith( "session-active", "keep going", { @@ -63,7 +74,7 @@ describe("handleSteerCommand", () => { expect(steerRuntimeMocks.resolveActiveEmbeddedRunSessionId).toHaveBeenCalledWith( "agent:main:discord:direct:target", ); - expect(steerRuntimeMocks.queueEmbeddedPiMessage).toHaveBeenCalledWith( + expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith( "session-target", "check the target", { @@ -85,7 +96,7 @@ describe("handleSteerCommand", () => { "agent:main:main", ); expect(steerRuntimeMocks.isEmbeddedPiRunActive).toHaveBeenCalledWith("stored-session-id"); - expect(steerRuntimeMocks.queueEmbeddedPiMessage).toHaveBeenCalledWith( + expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith( "stored-session-id", "continue from state", { @@ -102,7 +113,7 @@ describe("handleSteerCommand", () => { shouldContinue: false, reply: { text: "Usage: /steer " }, }); - expect(steerRuntimeMocks.queueEmbeddedPiMessage).not.toHaveBeenCalled(); + expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).not.toHaveBeenCalled(); }); it("does not start a new run when no current session run is active", async () => { @@ -112,12 +123,17 @@ describe("handleSteerCommand", () => { shouldContinue: false, reply: { text: "⚠️ No active run to steer in this session." }, }); - expect(steerRuntimeMocks.queueEmbeddedPiMessage).not.toHaveBeenCalled(); + expect(steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome).not.toHaveBeenCalled(); }); it("reports when the active run rejects steering injection", async () => { steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("session-active"); - steerRuntimeMocks.queueEmbeddedPiMessage.mockReturnValue(false); + steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome.mockReturnValue({ + queued: false, + sessionId: "session-active", + reason: "not_streaming", + gatewayHealth: "live", + }); const result = await handleSteerCommand(buildParams("/steer keep going"), true); @@ -125,5 +141,28 @@ describe("handleSteerCommand", () => { shouldContinue: false, reply: { text: "⚠️ Current run is active but not accepting steering right now." }, }); + expect(steerRuntimeMocks.formatEmbeddedPiQueueFailureSummary).toHaveBeenCalledWith({ + queued: false, + sessionId: "session-active", + reason: "not_streaming", + gatewayHealth: "live", + }); + }); + + it("reports compacting runs distinctly", async () => { + steerRuntimeMocks.resolveActiveEmbeddedRunSessionId.mockReturnValue("session-active"); + steerRuntimeMocks.queueEmbeddedPiMessageWithOutcome.mockReturnValue({ + queued: false, + sessionId: "session-active", + reason: "compacting", + gatewayHealth: "live", + }); + + const result = await handleSteerCommand(buildParams("/steer keep going"), true); + + expect(result).toEqual({ + shouldContinue: false, + reply: { text: "⚠️ Current run is compacting; retry after compaction finishes." }, + }); }); }); diff --git a/src/auto-reply/reply/commands-steer.ts b/src/auto-reply/reply/commands-steer.ts index 57a3412aafb..008232593d8 100644 --- a/src/auto-reply/reply/commands-steer.ts +++ b/src/auto-reply/reply/commands-steer.ts @@ -7,14 +7,28 @@ import { logVerbose } from "../../globals.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { rejectUnauthorizedCommand } from "./command-gates.js"; import { + formatEmbeddedPiQueueFailureSummary, isEmbeddedPiRunActive, - queueEmbeddedPiMessage, + queueEmbeddedPiMessageWithOutcome, resolveActiveEmbeddedRunSessionId, } from "./commands-steer.runtime.js"; import type { CommandHandler, HandleCommandsParams } from "./commands-types.js"; const STEER_USAGE = "Usage: /steer "; +function formatSteerQueueFailureReply(reason: string): string { + if (reason === "no_active_run") { + return "⚠️ This session no longer has an active run to steer."; + } + if (reason === "not_streaming") { + return "⚠️ Current run is active but not accepting steering right now."; + } + if (reason === "compacting") { + return "⚠️ Current run is compacting; retry after compaction finishes."; + } + return "⚠️ Current run is active but not accepting steering right now."; +} + function parseSteerMessage(raw: string): string | null { const match = raw.trim().match(/^\/(?:steer|tell)(?:\s+([\s\S]*))?$/i); if (!match) { @@ -97,15 +111,16 @@ export const handleSteerCommand: CommandHandler = async (params, allowTextComman return { shouldContinue: false, reply: { text: "⚠️ No active run to steer in this session." } }; } - const steered = queueEmbeddedPiMessage(sessionId, message, { + const queueOutcome = queueEmbeddedPiMessageWithOutcome(sessionId, message, { steeringMode: "all", debounceMs: 0, }); - if (!steered) { - logVerbose(`steer: active session ${sessionId} rejected steering injection`); + if (!queueOutcome.queued) { + const summary = formatEmbeddedPiQueueFailureSummary(queueOutcome); + logVerbose(`steer: active session ${sessionId} rejected steering injection: ${summary}`); return { shouldContinue: false, - reply: { text: "⚠️ Current run is active but not accepting steering right now." }, + reply: { text: formatSteerQueueFailureReply(queueOutcome.reason) }, }; } diff --git a/src/plugin-sdk/agent-harness-runtime.ts b/src/plugin-sdk/agent-harness-runtime.ts index 82a2acfb6a4..f82f3c45e4f 100644 --- a/src/plugin-sdk/agent-harness-runtime.ts +++ b/src/plugin-sdk/agent-harness-runtime.ts @@ -3,6 +3,13 @@ // register quickly inside gateway startup and Docker e2e runs. import type { EmbeddedRunAttemptResult } from "../agents/pi-embedded-runner/run/types.js"; +import { + abortEmbeddedPiRun, + clearActiveEmbeddedRun, + queueEmbeddedPiMessageWithOutcome, + setActiveEmbeddedRun, + type EmbeddedPiQueueMessageOptions, +} from "../agents/pi-embedded-runner/runs.js"; import { formatToolDetail, resolveToolDisplay } from "../agents/tool-display.js"; import { redactToolDetail } from "../logging/redact.js"; import { truncateUtf16Safe } from "../utils.js"; @@ -96,12 +103,19 @@ export { resolveModelAuthMode } from "../agents/model-auth.js"; export { supportsModelTools } from "../agents/model-tool-support.js"; export { resolveAttemptSpawnWorkspaceDir } from "../agents/pi-embedded-runner/run/attempt.thread-helpers.js"; export { buildEmbeddedAttemptToolRunContext } from "../agents/pi-embedded-runner/run/attempt.tool-run-context.js"; -export { - abortEmbeddedPiRun as abortAgentHarnessRun, - clearActiveEmbeddedRun, - queueEmbeddedPiMessage as queueAgentHarnessMessage, - setActiveEmbeddedRun, -} from "../agents/pi-embedded-runner/runs.js"; +export { abortEmbeddedPiRun as abortAgentHarnessRun, clearActiveEmbeddedRun, setActiveEmbeddedRun }; + +/** + * @deprecated Active-run queueing is an internal runtime concern. Use current + * runtime hooks instead of steering a harness through this legacy boolean API. + */ +export function queueAgentHarnessMessage( + sessionId: string, + text: string, + options?: EmbeddedPiQueueMessageOptions, +): boolean { + return queueEmbeddedPiMessageWithOutcome(sessionId, text, options).queued; +} export { disposeRegisteredAgentHarnesses } from "../agents/harness/registry.js"; export { logAgentRuntimeToolDiagnostics, diff --git a/test/helpers/auto-reply/trigger-handling-test-harness.ts b/test/helpers/auto-reply/trigger-handling-test-harness.ts index 6600ad43d6c..6acd3f637f8 100644 --- a/test/helpers/auto-reply/trigger-handling-test-harness.ts +++ b/test/helpers/auto-reply/trigger-handling-test-harness.ts @@ -4,6 +4,7 @@ import os from "node:os"; import { join } from "node:path"; import { afterAll, afterEach, beforeAll, expect, vi } from "vitest"; import { clearRuntimeAuthProfileStoreSnapshots } from "../../../src/agents/auth-profiles.js"; +import type { EmbeddedPiQueueMessageOutcome } from "../../../src/agents/pi-embedded-runner/runs.js"; import { withFastReplyConfig } from "../../../src/auto-reply/reply/get-reply-fast-path.js"; import type { OpenClawConfig } from "../../../src/config/types.openclaw.js"; @@ -24,7 +25,14 @@ const piEmbeddedMocks = getSharedMocks("openclaw.trigger-handling.pi-embedded-mo abortEmbeddedPiRun: vi.fn().mockReturnValue(false), compactEmbeddedPiSession: vi.fn(), runEmbeddedPiAgent: vi.fn(), - queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + queueEmbeddedPiMessageWithOutcome: vi.fn( + (sessionId: string, _text?: string, _options?: unknown): EmbeddedPiQueueMessageOutcome => ({ + queued: false, + sessionId, + reason: "not_streaming", + gatewayHealth: "live", + }), + ), resolveActiveEmbeddedRunSessionId: vi.fn().mockReturnValue(undefined), isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), @@ -48,7 +56,8 @@ const installPiEmbeddedMock = () => compactEmbeddedPiSession: (...args: unknown[]) => piEmbeddedMocks.compactEmbeddedPiSession(...args), runEmbeddedPiAgent: (...args: unknown[]) => piEmbeddedMocks.runEmbeddedPiAgent(...args), - queueEmbeddedPiMessage: (...args: unknown[]) => piEmbeddedMocks.queueEmbeddedPiMessage(...args), + queueEmbeddedPiMessageWithOutcome: (sessionId: string, text: string, options?: unknown) => + piEmbeddedMocks.queueEmbeddedPiMessageWithOutcome(sessionId, text, options), resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`, resolveActiveEmbeddedRunSessionId: (...args: unknown[]) => piEmbeddedMocks.resolveActiveEmbeddedRunSessionId(...args), @@ -61,6 +70,12 @@ installPiEmbeddedMock(); vi.doMock("../../../src/agents/pi-embedded-runner/runs.js", () => ({ abortEmbeddedPiRun: (...args: unknown[]) => piEmbeddedMocks.abortEmbeddedPiRun(...args), + formatEmbeddedPiQueueFailureSummary: (outcome: { reason?: string; sessionId?: string }) => + outcome.reason && outcome.sessionId + ? `queue_message_failed reason=${outcome.reason} sessionId=${outcome.sessionId} gatewayHealth=live` + : undefined, + queueEmbeddedPiMessageWithOutcome: (sessionId: string, text: string, options?: unknown) => + piEmbeddedMocks.queueEmbeddedPiMessageWithOutcome(sessionId, text, options), })); const providerUsageMocks = vi.hoisted(() => ({ @@ -246,7 +261,14 @@ export async function withTempHome(fn: (home: string) => Promise): Promise piEmbeddedMocks.runEmbeddedPiAgent.mockReset(); piEmbeddedMocks.abortEmbeddedPiRun.mockReset().mockReturnValue(false); piEmbeddedMocks.compactEmbeddedPiSession.mockReset(); - piEmbeddedMocks.queueEmbeddedPiMessage.mockReset().mockReturnValue(false); + piEmbeddedMocks.queueEmbeddedPiMessageWithOutcome + .mockReset() + .mockImplementation((sessionId: string) => ({ + queued: false, + sessionId, + reason: "not_streaming", + gatewayHealth: "live", + })); piEmbeddedMocks.isEmbeddedPiRunActive.mockReset().mockReturnValue(false); piEmbeddedMocks.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false); modelFallbackMocks.runWithModelFallback.mockClear();