mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-13 15:47:28 +00:00
fix(tasks): unify completion delivery routing
Route group/channel task, subagent, and media completions through one requester-session delivery policy, including plugin legacy session keys. Also keeps current Zalo lifecycle test typing green on CI after the latest main changes. Co-authored-by: Merlin <merlin@funcracker.net>
This commit is contained in:
@@ -79,6 +79,7 @@ Skills own workflows; root owns hard policy and routing.
|
||||
- PR create: real body required. Include Summary + Verification; mention refs, behavior, and proof.
|
||||
- PR artifacts/screenshots: attach to PR/comment/external artifact store. Do not commit `.github/pr-assets`.
|
||||
- CI polling: exact SHA, relevant checks only, minimal fields. Skip routine noise (`Auto response`, `Labeler`, docs agents, performance/stale). Logs only after failure/completion or concrete need.
|
||||
- Maintainers: ignore `Real behavior proof` failures that only say PR body lacks real after-fix evidence.
|
||||
- `/landpr`: use `~/.codex/prompts/landpr.md`; do not idle on `auto-response` or `check-docs`.
|
||||
|
||||
## Code
|
||||
|
||||
@@ -40,6 +40,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Auth/Claude CLI: persist fresher managed external CLI OAuth credentials back to `auth-profiles.json`, preventing stale `anthropic:claude-cli` profiles from repeatedly bootstrapping and flooding debug logs. Fixes #80129. Thanks @Caulderein.
|
||||
- Context: render `/context map` only from actual run context and persist Codex app-server run reports without counting deferred tool-search schemas as prompt-loaded tool schemas.
|
||||
- Codex app-server: report Codex-native tool execution to diagnostics so long-running native `bash`, web, file, and MCP tools no longer look like stale embedded runs to the watchdog. (#80217)
|
||||
- Tasks: route group and channel task completions through the requester session so the parent agent can send the visible summary instead of stopping at a generic task-status line. Fixes #77251. (#77365) Thanks @funmerlin.
|
||||
- Telegram: preserve blank lines between manually indented bullet blocks and following numbered sections in rendered replies. Fixes #76998. Thanks @evgyur.
|
||||
- Slack: pass configured agent identity through draft preview sends so partial streaming replies keep custom username/avatar on the initial Slack message. Fixes #38235. (#38237) Thanks @lacymorrow.
|
||||
- Slack: support `allowBots: "mentions"` for bot-authored messages that mention the receiving bot, matching the documented Discord-style mode without accepting every bot message. Fixes #43587. (#43588) Thanks @raw34.
|
||||
|
||||
@@ -162,7 +162,7 @@ Agent run completion is authoritative for active task records. A successful deta
|
||||
|
||||
When a task reaches a terminal state, OpenClaw notifies you. There are two delivery paths:
|
||||
|
||||
**Direct delivery** - if the task has a channel target (the `requesterOrigin`), the completion message goes straight to that channel (Telegram, Discord, Slack, etc.). For subagent completions, OpenClaw also preserves bound thread/topic routing when available and can fill a missing `to` / account from the requester session's stored route (`lastChannel` / `lastTo` / `lastAccountId`) before giving up on direct delivery.
|
||||
**Direct delivery** - if the task has a channel target (the `requesterOrigin`), the completion message goes straight to that channel (Telegram, Discord, Slack, etc.). Group and channel task completions are instead routed through the requester session so the parent agent can write the visible reply. For subagent completions, OpenClaw also preserves bound thread/topic routing when available and can fill a missing `to` / account from the requester session's stored route (`lastChannel` / `lastTo` / `lastAccountId`) before giving up on direct delivery.
|
||||
|
||||
**Session-queued delivery** - if direct delivery fails or no origin is set, the update is queued as a system event in the requester's session and surfaces on the next heartbeat.
|
||||
|
||||
|
||||
@@ -84,11 +84,15 @@ describe("Zalo pairing lifecycle", () => {
|
||||
meta: { name: "Unauthorized User" },
|
||||
});
|
||||
expect(sendMessageMock).toHaveBeenCalledTimes(1);
|
||||
const pairingTextCall = sendMessageMock.mock.calls[0];
|
||||
expect(pairingTextCall?.[0]).toBe("zalo-token");
|
||||
expect(pairingTextCall?.[1]?.chat_id).toBe("dm-pairing-1");
|
||||
expect(pairingTextCall?.[1]?.text).toContain("PAIRCODE");
|
||||
expect(pairingTextCall?.[2]).toBeUndefined();
|
||||
const [sendToken, sendPayload, sendOptions] = sendMessageMock.mock.calls[0] as [
|
||||
string,
|
||||
{ chat_id?: string; text?: string },
|
||||
unknown,
|
||||
];
|
||||
expect(sendToken).toBe("zalo-token");
|
||||
expect(sendPayload.chat_id).toBe("dm-pairing-1");
|
||||
expect(sendPayload.text).toContain("PAIRCODE");
|
||||
expect(sendOptions).toBeUndefined();
|
||||
} finally {
|
||||
await monitor.stop();
|
||||
}
|
||||
|
||||
@@ -16,7 +16,9 @@ import {
|
||||
|
||||
describe("Zalo reply-once lifecycle", () => {
|
||||
const finalizeInboundContextMock = vi.fn((ctx: Record<string, unknown>) => ctx);
|
||||
const recordInboundSessionMock = vi.fn(async () => undefined);
|
||||
const recordInboundSessionMock = vi.fn(
|
||||
async (_input: { sessionKey?: string; ctx?: Record<string, unknown> }) => undefined,
|
||||
);
|
||||
const resolveAgentRouteMock = vi.fn(() => ({
|
||||
agentId: "main",
|
||||
channel: "zalo",
|
||||
@@ -101,14 +103,15 @@ describe("Zalo reply-once lifecycle", () => {
|
||||
expect(recordArgs?.ctx?.To).toBe("zalo:dm-chat-1");
|
||||
expect(recordArgs?.ctx?.MessageSid).toContain("zalo-replay-");
|
||||
expect(sendMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(sendMessageMock).toHaveBeenCalledWith(
|
||||
"zalo-token",
|
||||
{
|
||||
chat_id: "dm-chat-1",
|
||||
text: "zalo reply once",
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
const [sendToken, sendPayload, sendOptions] = sendMessageMock.mock.calls[0] as [
|
||||
string,
|
||||
{ chat_id?: string; text?: string },
|
||||
unknown,
|
||||
];
|
||||
expect(sendToken).toBe("zalo-token");
|
||||
expect(sendPayload.chat_id).toBe("dm-chat-1");
|
||||
expect(sendPayload.text).toBe("zalo reply once");
|
||||
expect(sendOptions).toBeUndefined();
|
||||
} finally {
|
||||
await monitor.stop();
|
||||
}
|
||||
|
||||
@@ -235,6 +235,13 @@ async function deliverSlackChannelAnnouncement(params: {
|
||||
sessionId: string;
|
||||
expectsCompletionMessage: boolean;
|
||||
directIdempotencyKey: string;
|
||||
requesterSessionKey?: string;
|
||||
requesterOrigin?: {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
completionDirectOrigin?: {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
@@ -265,14 +272,14 @@ async function deliverSlackChannelAnnouncement(params: {
|
||||
});
|
||||
|
||||
return deliverSubagentAnnouncement({
|
||||
requesterSessionKey: "agent:main:slack:channel:C123",
|
||||
targetRequesterSessionKey: "agent:main:slack:channel:C123",
|
||||
requesterSessionKey: params.requesterSessionKey ?? "agent:main:slack:channel:C123",
|
||||
targetRequesterSessionKey: params.requesterSessionKey ?? "agent:main:slack:channel:C123",
|
||||
triggerMessage: "child done",
|
||||
steerMessage: "child done",
|
||||
requesterOrigin: origin,
|
||||
requesterSessionOrigin: origin,
|
||||
completionDirectOrigin: params.completionDirectOrigin ?? origin,
|
||||
directOrigin: origin,
|
||||
requesterOrigin: params.requesterOrigin ?? origin,
|
||||
requesterSessionOrigin: params.requesterOrigin ?? origin,
|
||||
completionDirectOrigin: params.completionDirectOrigin ?? params.requesterOrigin ?? origin,
|
||||
directOrigin: params.requesterOrigin ?? origin,
|
||||
requesterIsSubagent: false,
|
||||
expectsCompletionMessage: params.expectsCompletionMessage,
|
||||
bestEffortDeliver: true,
|
||||
@@ -1241,6 +1248,74 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
|
||||
expect(sendMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it.each([
|
||||
{
|
||||
name: "legacy Discord channel",
|
||||
requesterSessionKey: "agent:main:discord:guild-123:channel-456",
|
||||
origin: { channel: "discord", to: "channel:456", accountId: "acct-1" },
|
||||
},
|
||||
{
|
||||
name: "legacy WhatsApp group",
|
||||
requesterSessionKey: "agent:main:whatsapp:123@g.us",
|
||||
origin: { channel: "whatsapp", to: "123@g.us", accountId: "acct-1" },
|
||||
},
|
||||
])(
|
||||
"requires message-tool delivery for generated media completions in $name sessions",
|
||||
async ({ requesterSessionKey, origin }) => {
|
||||
const callGateway = createGatewayMock({
|
||||
result: {
|
||||
payloads: [
|
||||
{
|
||||
text: "The track is ready.",
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
const sendMessage = createSendMessageMock();
|
||||
const result = await deliverSlackChannelAnnouncement({
|
||||
callGateway,
|
||||
sendMessage,
|
||||
sessionId: "requester-session-legacy-group",
|
||||
isActive: false,
|
||||
expectsCompletionMessage: true,
|
||||
directIdempotencyKey: `announce-legacy-media-message-tool-${origin.channel}`,
|
||||
requesterSessionKey,
|
||||
requesterOrigin: origin,
|
||||
sourceTool: "music_generate",
|
||||
internalEvents: [
|
||||
{
|
||||
type: "task_completion",
|
||||
source: "music_generation",
|
||||
childSessionKey: "music_generate:task-123",
|
||||
childSessionId: "task-123",
|
||||
announceType: "music generation task",
|
||||
taskLabel: "night-drive synthwave",
|
||||
status: "ok",
|
||||
statusLabel: "completed successfully",
|
||||
result: "Generated 1 track.\nMEDIA:/tmp/generated-night-drive.mp3",
|
||||
mediaUrls: ["/tmp/generated-night-drive.mp3"],
|
||||
replyInstruction:
|
||||
"Tell the user the music is ready. If visible source delivery requires the message tool, send it there with the generated media attached.",
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
expectRecordFields(result, {
|
||||
delivered: false,
|
||||
path: "direct",
|
||||
error: "completion agent did not deliver through the message tool",
|
||||
});
|
||||
expectGatewayAgentParams(callGateway, {
|
||||
deliver: false,
|
||||
channel: origin.channel,
|
||||
accountId: "acct-1",
|
||||
to: origin.to,
|
||||
threadId: undefined,
|
||||
});
|
||||
expect(sendMessage).not.toHaveBeenCalled();
|
||||
},
|
||||
);
|
||||
|
||||
it("does not fallback for generated media group completions when message tool evidence exists", async () => {
|
||||
const callGateway = createGatewayMock({
|
||||
result: {
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
import { normalizeChatType } from "../channels/chat-type.js";
|
||||
import { completionRequiresMessageToolDelivery } from "../auto-reply/reply/completion-delivery-policy.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import type { ConversationRef } from "../infra/outbound/session-binding-service.js";
|
||||
import { stringifyRouteThreadId } from "../plugin-sdk/channel-route.js";
|
||||
import { normalizeAccountId } from "../routing/session-key.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { deriveSessionChatTypeFromKey } from "../sessions/session-chat-type-shared.js";
|
||||
import { isCronSessionKey } from "../sessions/session-key-utils.js";
|
||||
import { isNonTerminalAgentRunStatus } from "../shared/agent-run-status.js";
|
||||
import { normalizeOptionalLowercaseString } from "../shared/string-coerce.js";
|
||||
@@ -585,61 +584,6 @@ function isGatewayAgentRunPending(response: unknown): boolean {
|
||||
return isNonTerminalAgentRunStatus(status);
|
||||
}
|
||||
|
||||
function inferCompletionChatType(params: {
|
||||
requesterSessionKey: string;
|
||||
targetRequesterSessionKey: string;
|
||||
requesterEntry?: {
|
||||
chatType?: string | null;
|
||||
origin?: { chatType?: string | null };
|
||||
};
|
||||
directOrigin?: DeliveryContext;
|
||||
requesterSessionOrigin?: DeliveryContext;
|
||||
}): "direct" | "group" | "channel" | "unknown" {
|
||||
const explicit = normalizeChatType(
|
||||
params.requesterEntry?.chatType ?? params.requesterEntry?.origin?.chatType ?? undefined,
|
||||
);
|
||||
if (explicit) {
|
||||
return explicit;
|
||||
}
|
||||
for (const key of [params.targetRequesterSessionKey, params.requesterSessionKey]) {
|
||||
const derived = deriveSessionChatTypeFromKey(key);
|
||||
if (derived !== "unknown") {
|
||||
return derived;
|
||||
}
|
||||
}
|
||||
const target = params.directOrigin?.to ?? params.requesterSessionOrigin?.to;
|
||||
if (target?.startsWith("group:")) {
|
||||
return "group";
|
||||
}
|
||||
if (target?.startsWith("channel:")) {
|
||||
return "channel";
|
||||
}
|
||||
if (target?.startsWith("dm:")) {
|
||||
return "direct";
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
function completionRequiresMessageToolDelivery(params: {
|
||||
cfg: OpenClawConfig;
|
||||
requesterSessionKey: string;
|
||||
targetRequesterSessionKey: string;
|
||||
requesterEntry?: {
|
||||
chatType?: string | null;
|
||||
origin?: { chatType?: string | null };
|
||||
};
|
||||
directOrigin?: DeliveryContext;
|
||||
requesterSessionOrigin?: DeliveryContext;
|
||||
}): boolean {
|
||||
const chatType = inferCompletionChatType(params);
|
||||
if (chatType === "group" || chatType === "channel") {
|
||||
const configuredMode =
|
||||
params.cfg.messages?.groupChat?.visibleReplies ?? params.cfg.messages?.visibleReplies;
|
||||
return configuredMode !== "automatic";
|
||||
}
|
||||
return params.cfg.messages?.visibleReplies === "message_tool";
|
||||
}
|
||||
|
||||
function stripNonDeliverableChannelForCompletionOrigin(
|
||||
context?: DeliveryContext,
|
||||
): DeliveryContext | undefined {
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import crypto from "node:crypto";
|
||||
import { completionRequiresMessageToolDelivery } from "../../auto-reply/reply/completion-delivery-policy.js";
|
||||
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { clearAgentRunContext, registerAgentRunContext } from "../../infra/agent-events.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { deriveSessionChatTypeFromKey } from "../../sessions/session-chat-type-shared.js";
|
||||
import {
|
||||
completeTaskRunByRunId,
|
||||
createRunningTaskRun,
|
||||
@@ -245,39 +245,6 @@ function buildMediaGenerationReplyInstruction(params: {
|
||||
].join(" ");
|
||||
}
|
||||
|
||||
function inferMediaGenerationCompletionChatType(
|
||||
handle: MediaGenerationTaskHandle,
|
||||
): "direct" | "group" | "channel" | "unknown" {
|
||||
const sessionKeyChatType = deriveSessionChatTypeFromKey(handle.requesterSessionKey);
|
||||
if (sessionKeyChatType !== "unknown") {
|
||||
return sessionKeyChatType;
|
||||
}
|
||||
const to = handle.requesterOrigin?.to?.trim().toLowerCase();
|
||||
if (to?.startsWith("group:")) {
|
||||
return "group";
|
||||
}
|
||||
if (to?.startsWith("channel:")) {
|
||||
return "channel";
|
||||
}
|
||||
if (to?.startsWith("dm:") || to?.startsWith("direct:")) {
|
||||
return "direct";
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
function mediaGenerationCompletionRequiresMessageToolDelivery(params: {
|
||||
config?: OpenClawConfig;
|
||||
handle: MediaGenerationTaskHandle;
|
||||
}): boolean {
|
||||
const chatType = inferMediaGenerationCompletionChatType(params.handle);
|
||||
if (chatType === "group" || chatType === "channel") {
|
||||
const configuredMode =
|
||||
params.config?.messages?.groupChat?.visibleReplies ?? params.config?.messages?.visibleReplies;
|
||||
return configuredMode !== "automatic";
|
||||
}
|
||||
return params.config?.messages?.visibleReplies === "message_tool";
|
||||
}
|
||||
|
||||
async function wakeMediaGenerationTaskCompletion(params: {
|
||||
config?: OpenClawConfig;
|
||||
handle: MediaGenerationTaskHandle | null;
|
||||
@@ -311,9 +278,10 @@ async function wakeMediaGenerationTaskCompletion(params: {
|
||||
replyInstruction: buildMediaGenerationReplyInstruction({
|
||||
status: params.status,
|
||||
completionLabel: params.completionLabel,
|
||||
requiresMessageToolDelivery: mediaGenerationCompletionRequiresMessageToolDelivery({
|
||||
config: params.config,
|
||||
handle: params.handle,
|
||||
requiresMessageToolDelivery: completionRequiresMessageToolDelivery({
|
||||
cfg: params.config ?? {},
|
||||
requesterSessionKey: params.handle.requesterSessionKey,
|
||||
directOrigin: params.handle.requesterOrigin,
|
||||
}),
|
||||
}),
|
||||
},
|
||||
|
||||
@@ -139,6 +139,33 @@ describe("music generate background helpers", () => {
|
||||
expectReplyInstructionContains("Do not put MEDIA: lines only in your final answer");
|
||||
});
|
||||
|
||||
it.each(["agent:main:discord:guild-123:channel-456", "agent:main:whatsapp:123@g.us"])(
|
||||
"warns legacy group/channel completion agents for %s",
|
||||
async (requesterSessionKey) => {
|
||||
announceDeliveryMocks.deliverSubagentAnnouncement.mockResolvedValue({
|
||||
delivered: true,
|
||||
path: "direct",
|
||||
});
|
||||
const completion = createMediaCompletionFixture({
|
||||
runId: "tool:music_generate:abc",
|
||||
taskLabel: "night-drive synthwave",
|
||||
result: "Generated 1 track.\nMEDIA:/tmp/generated-night-drive.mp3",
|
||||
mediaUrls: ["/tmp/generated-night-drive.mp3"],
|
||||
});
|
||||
|
||||
await wakeMusicGenerationTaskCompletion({
|
||||
...completion,
|
||||
handle: {
|
||||
...completion.handle,
|
||||
requesterSessionKey,
|
||||
},
|
||||
});
|
||||
|
||||
expectReplyInstructionContains("the user will NOT see your normal assistant final reply");
|
||||
expectReplyInstructionContains("Do not put MEDIA: lines only in your final answer");
|
||||
},
|
||||
);
|
||||
|
||||
it("queues a completion event when direct send is enabled globally", async () => {
|
||||
taskDeliveryRuntimeMocks.sendMessage.mockResolvedValue({
|
||||
channel: "discord",
|
||||
|
||||
110
src/auto-reply/reply/completion-delivery-policy.test.ts
Normal file
110
src/auto-reply/reply/completion-delivery-policy.test.ts
Normal file
@@ -0,0 +1,110 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
completionRequiresMessageToolDelivery,
|
||||
resolveCompletionChatType,
|
||||
shouldRouteCompletionThroughRequesterSession,
|
||||
} from "./completion-delivery-policy.js";
|
||||
|
||||
describe("completion delivery policy", () => {
|
||||
it.each([
|
||||
{
|
||||
name: "canonical group key",
|
||||
requesterSessionKey: "agent:main:telegram:group:-100123",
|
||||
expected: "group",
|
||||
},
|
||||
{
|
||||
name: "canonical channel key",
|
||||
requesterSessionKey: "agent:main:slack:channel:C123",
|
||||
expected: "channel",
|
||||
},
|
||||
{
|
||||
name: "canonical direct key",
|
||||
requesterSessionKey: "agent:main:discord:dm:U123",
|
||||
expected: "direct",
|
||||
},
|
||||
{
|
||||
name: "legacy Discord guild channel key",
|
||||
requesterSessionKey: "agent:main:discord:guild-123:channel-456",
|
||||
expected: "channel",
|
||||
},
|
||||
{
|
||||
name: "legacy WhatsApp group key",
|
||||
requesterSessionKey: "agent:main:whatsapp:123@g.us",
|
||||
expected: "group",
|
||||
},
|
||||
])("infers $name", ({ requesterSessionKey, expected }) => {
|
||||
expect(resolveCompletionChatType({ requesterSessionKey })).toBe(expected);
|
||||
});
|
||||
|
||||
it("prefers explicit session chat type over key inference", () => {
|
||||
expect(
|
||||
resolveCompletionChatType({
|
||||
requesterSessionKey: "agent:main:slack:channel:C123",
|
||||
requesterEntry: { chatType: "direct" },
|
||||
}),
|
||||
).toBe("direct");
|
||||
});
|
||||
|
||||
it.each([
|
||||
{ to: "group:ops", expected: "group" },
|
||||
{ to: "channel:C123", expected: "channel" },
|
||||
{ to: "thread:171.222", expected: "channel" },
|
||||
{ to: "dm:U123", expected: "direct" },
|
||||
{ to: "direct:U123", expected: "direct" },
|
||||
{ to: "user:U123", expected: "direct" },
|
||||
] as const)("falls back to origin target prefix $to", ({ to, expected }) => {
|
||||
expect(
|
||||
resolveCompletionChatType({
|
||||
requesterSessionKey: "agent:main:opaque:unknown-target",
|
||||
directOrigin: { channel: "test", to },
|
||||
}),
|
||||
).toBe(expected);
|
||||
});
|
||||
|
||||
it("requires message-tool delivery for group and channel completions by default", () => {
|
||||
expect(
|
||||
completionRequiresMessageToolDelivery({
|
||||
cfg: {},
|
||||
requesterSessionKey: "agent:main:whatsapp:123@g.us",
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
completionRequiresMessageToolDelivery({
|
||||
cfg: {},
|
||||
requesterSessionKey: "agent:main:discord:guild-123:channel-456",
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("honors automatic group visible-reply config", () => {
|
||||
expect(
|
||||
completionRequiresMessageToolDelivery({
|
||||
cfg: { messages: { groupChat: { visibleReplies: "automatic" } } },
|
||||
requesterSessionKey: "agent:main:slack:channel:C123",
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("requires message-tool delivery for direct completions only when globally configured", () => {
|
||||
expect(
|
||||
completionRequiresMessageToolDelivery({
|
||||
cfg: {},
|
||||
requesterSessionKey: "agent:main:discord:dm:U123",
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
completionRequiresMessageToolDelivery({
|
||||
cfg: { messages: { visibleReplies: "message_tool" } },
|
||||
requesterSessionKey: "agent:main:discord:dm:U123",
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("routes group and channel task completions through the requester session", () => {
|
||||
expect(shouldRouteCompletionThroughRequesterSession("agent:main:whatsapp:123@g.us")).toBe(true);
|
||||
expect(
|
||||
shouldRouteCompletionThroughRequesterSession("agent:main:discord:guild-123:channel-456"),
|
||||
).toBe(true);
|
||||
expect(shouldRouteCompletionThroughRequesterSession("agent:main:discord:dm:U123")).toBe(false);
|
||||
});
|
||||
});
|
||||
86
src/auto-reply/reply/completion-delivery-policy.ts
Normal file
86
src/auto-reply/reply/completion-delivery-policy.ts
Normal file
@@ -0,0 +1,86 @@
|
||||
import { normalizeChatType, type ChatType } from "../../channels/chat-type.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { deriveSessionChatType } from "../../sessions/session-chat-type.js";
|
||||
import type { DeliveryContext } from "../../utils/delivery-context.types.js";
|
||||
import { resolveSourceReplyDeliveryMode } from "./source-reply-delivery-mode.js";
|
||||
|
||||
export type CompletionChatType = ChatType | "unknown";
|
||||
|
||||
export type CompletionDeliverySessionEntry = {
|
||||
chatType?: string | null;
|
||||
origin?: { chatType?: string | null } | null;
|
||||
};
|
||||
|
||||
export function resolveCompletionChatType(params: {
|
||||
requesterSessionKey?: string | null;
|
||||
targetRequesterSessionKey?: string | null;
|
||||
requesterEntry?: CompletionDeliverySessionEntry;
|
||||
directOrigin?: DeliveryContext;
|
||||
requesterSessionOrigin?: DeliveryContext;
|
||||
}): CompletionChatType {
|
||||
const explicit = normalizeChatType(
|
||||
params.requesterEntry?.chatType ?? params.requesterEntry?.origin?.chatType ?? undefined,
|
||||
);
|
||||
if (explicit) {
|
||||
return explicit;
|
||||
}
|
||||
|
||||
for (const key of [params.targetRequesterSessionKey, params.requesterSessionKey]) {
|
||||
const derived = deriveSessionChatType(key);
|
||||
if (derived !== "unknown") {
|
||||
return derived;
|
||||
}
|
||||
}
|
||||
|
||||
return inferCompletionChatTypeFromTarget(
|
||||
params.directOrigin?.to ?? params.requesterSessionOrigin?.to,
|
||||
);
|
||||
}
|
||||
|
||||
export function completionRequiresMessageToolDelivery(params: {
|
||||
cfg: OpenClawConfig;
|
||||
requesterSessionKey?: string | null;
|
||||
targetRequesterSessionKey?: string | null;
|
||||
requesterEntry?: CompletionDeliverySessionEntry;
|
||||
directOrigin?: DeliveryContext;
|
||||
requesterSessionOrigin?: DeliveryContext;
|
||||
messageToolAvailable?: boolean;
|
||||
}): boolean {
|
||||
return (
|
||||
resolveSourceReplyDeliveryMode({
|
||||
cfg: params.cfg,
|
||||
ctx: {
|
||||
ChatType: resolveCompletionChatType(params),
|
||||
},
|
||||
messageToolAvailable: params.messageToolAvailable,
|
||||
}) === "message_tool_only"
|
||||
);
|
||||
}
|
||||
|
||||
export function shouldRouteCompletionThroughRequesterSession(
|
||||
sessionKey: string | undefined | null,
|
||||
): boolean {
|
||||
const chatType = deriveSessionChatType(sessionKey);
|
||||
return chatType === "group" || chatType === "channel";
|
||||
}
|
||||
|
||||
function inferCompletionChatTypeFromTarget(to: string | undefined): CompletionChatType {
|
||||
const normalized = to?.trim().toLowerCase();
|
||||
if (!normalized) {
|
||||
return "unknown";
|
||||
}
|
||||
if (normalized.startsWith("group:")) {
|
||||
return "group";
|
||||
}
|
||||
if (normalized.startsWith("channel:") || normalized.startsWith("thread:")) {
|
||||
return "channel";
|
||||
}
|
||||
if (
|
||||
normalized.startsWith("dm:") ||
|
||||
normalized.startsWith("direct:") ||
|
||||
normalized.startsWith("user:")
|
||||
) {
|
||||
return "direct";
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
@@ -1,29 +1,65 @@
|
||||
import { iterateBootstrapChannelPlugins } from "../channels/plugins/bootstrap-registry.js";
|
||||
import { getBootstrapChannelPlugin } from "../channels/plugins/bootstrap-registry.js";
|
||||
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
|
||||
import {
|
||||
deriveSessionChatTypeFromKey,
|
||||
type SessionKeyChatType,
|
||||
} from "./session-chat-type-shared.js";
|
||||
import { parseAgentSessionKey } from "./session-key-utils.js";
|
||||
|
||||
export {
|
||||
deriveSessionChatTypeFromKey,
|
||||
type SessionKeyChatType,
|
||||
} from "./session-chat-type-shared.js";
|
||||
|
||||
type LegacySessionChatTypeDeriver = NonNullable<
|
||||
NonNullable<ReturnType<typeof getBootstrapChannelPlugin>>["messaging"]
|
||||
>["deriveLegacySessionChatType"];
|
||||
|
||||
function resolveScopedSessionKey(sessionKey: string | undefined | null): string {
|
||||
const raw = normalizeLowercaseStringOrEmpty(sessionKey);
|
||||
if (!raw) {
|
||||
return "";
|
||||
}
|
||||
return parseAgentSessionKey(raw)?.rest ?? raw;
|
||||
}
|
||||
|
||||
function collectLegacyChatTypeCandidatePluginIds(scopedSessionKey: string): string[] {
|
||||
const ids = new Set<string>();
|
||||
const firstToken = scopedSessionKey.split(":").find(Boolean);
|
||||
if (firstToken) {
|
||||
ids.add(firstToken);
|
||||
}
|
||||
if (scopedSessionKey.includes("@g.us")) {
|
||||
ids.add("whatsapp");
|
||||
}
|
||||
return Array.from(ids);
|
||||
}
|
||||
|
||||
function derivePluginLegacySessionChatType(
|
||||
scopedSessionKey: string,
|
||||
deriveLegacySessionChatType: LegacySessionChatTypeDeriver,
|
||||
): SessionKeyChatType | undefined {
|
||||
if (!deriveLegacySessionChatType) {
|
||||
return undefined;
|
||||
}
|
||||
return deriveLegacySessionChatType(scopedSessionKey);
|
||||
}
|
||||
|
||||
export function deriveSessionChatType(sessionKey: string | undefined | null): SessionKeyChatType {
|
||||
const builtInType = deriveSessionChatTypeFromKey(sessionKey);
|
||||
if (builtInType !== "unknown") {
|
||||
return builtInType;
|
||||
}
|
||||
|
||||
return deriveSessionChatTypeFromKey(
|
||||
sessionKey,
|
||||
Array.from(iterateBootstrapChannelPlugins())
|
||||
.map((plugin) => plugin.messaging?.deriveLegacySessionChatType)
|
||||
.filter(
|
||||
(
|
||||
deriveLegacySessionChatType,
|
||||
): deriveLegacySessionChatType is NonNullable<typeof deriveLegacySessionChatType> =>
|
||||
Boolean(deriveLegacySessionChatType),
|
||||
),
|
||||
);
|
||||
const scopedSessionKey = resolveScopedSessionKey(sessionKey);
|
||||
for (const pluginId of collectLegacyChatTypeCandidatePluginIds(scopedSessionKey)) {
|
||||
const derived = derivePluginLegacySessionChatType(
|
||||
scopedSessionKey,
|
||||
getBootstrapChannelPlugin(pluginId)?.messaging?.deriveLegacySessionChatType,
|
||||
);
|
||||
if (derived) {
|
||||
return derived;
|
||||
}
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
@@ -920,6 +920,87 @@ describe("task-registry", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it.each([
|
||||
{
|
||||
id: "channel",
|
||||
name: "room channel",
|
||||
ownerKey: "agent:main:guildchat:channel:123",
|
||||
target: "guildchat:channel:123",
|
||||
},
|
||||
{
|
||||
id: "group",
|
||||
name: "group",
|
||||
ownerKey: "agent:main:guildchat:group:123",
|
||||
target: "guildchat:group:123",
|
||||
},
|
||||
{
|
||||
id: "topic",
|
||||
name: "group topic",
|
||||
ownerKey: "agent:main:guildchat:group:-100123:topic:42",
|
||||
target: "guildchat:group:-100123:topic:42",
|
||||
},
|
||||
{
|
||||
id: "discord-legacy-channel",
|
||||
name: "legacy Discord channel",
|
||||
ownerKey: "agent:main:discord:guild-123:channel-456",
|
||||
target: "guildchat:channel:456",
|
||||
},
|
||||
{
|
||||
id: "whatsapp-legacy-group",
|
||||
name: "legacy WhatsApp group",
|
||||
ownerKey: "agent:main:whatsapp:123@g.us",
|
||||
target: "guildchat:group:123@g.us",
|
||||
},
|
||||
])("routes $name ACP completion through the parent session", async ({ id, ownerKey, target }) => {
|
||||
await withTaskRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
resetTaskRegistryForTests();
|
||||
const runId = `run-group-terminal-${id}`;
|
||||
hoisted.sendMessageMock.mockResolvedValue({
|
||||
channel: "guildchat",
|
||||
to: target,
|
||||
via: "direct",
|
||||
});
|
||||
|
||||
createTaskRecord({
|
||||
runtime: "acp",
|
||||
ownerKey,
|
||||
scopeKind: "session",
|
||||
requesterOrigin: {
|
||||
channel: "guildchat",
|
||||
to: target,
|
||||
},
|
||||
childSessionKey: "agent:main:acp:child",
|
||||
runId,
|
||||
task: "Investigate issue",
|
||||
status: "running",
|
||||
deliveryStatus: "pending",
|
||||
startedAt: 100,
|
||||
});
|
||||
|
||||
emitAgentEvent({
|
||||
runId,
|
||||
stream: "lifecycle",
|
||||
data: {
|
||||
phase: "end",
|
||||
endedAt: 250,
|
||||
},
|
||||
});
|
||||
|
||||
await waitForAssertion(() =>
|
||||
expect(findTaskByRunId(runId)).toMatchObject({
|
||||
status: "succeeded",
|
||||
deliveryStatus: "session_queued",
|
||||
}),
|
||||
);
|
||||
expect(hoisted.sendMessageMock).not.toHaveBeenCalled();
|
||||
expect(peekSystemEvents(ownerKey)).toEqual([
|
||||
expect.stringContaining("Background task done: ACP background task"),
|
||||
]);
|
||||
expect(hasPendingHeartbeatWake()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
it("records delivery failure and queues a session fallback when direct delivery misses", async () => {
|
||||
await withTaskRegistryTempDir(async (root) => {
|
||||
process.env.OPENCLAW_STATE_DIR = root;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import crypto from "node:crypto";
|
||||
import { createRequire } from "node:module";
|
||||
import { shouldRouteCompletionThroughRequesterSession } from "../auto-reply/reply/completion-delivery-policy.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
@@ -1055,7 +1056,11 @@ function getTaskDeliveryState(taskId: string): TaskDeliveryState | undefined {
|
||||
}
|
||||
|
||||
function canDeliverTaskToRequesterOrigin(task: TaskRecord): boolean {
|
||||
const origin = resolveTaskDeliveryOwner(task).requesterOrigin;
|
||||
const owner = resolveTaskDeliveryOwner(task);
|
||||
if (shouldRouteCompletionThroughRequesterSession(owner.sessionKey)) {
|
||||
return false;
|
||||
}
|
||||
const origin = owner.requesterOrigin;
|
||||
const channel = origin?.channel?.trim();
|
||||
const to = origin?.to?.trim();
|
||||
return Boolean(channel && to && isDeliverableMessageChannel(channel));
|
||||
|
||||
Reference in New Issue
Block a user