diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a759e1efe5..ca57ef3cc65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ Docs: https://docs.openclaw.ai - Plugin SDK: add bundled-plugin session actions, `sendSessionAttachment`, and Cron-backed `scheduleSessionTurn`/tag cleanup under the grouped session namespace. Replaces #75578/#75581/#75588 and part of #73384/#74483. Thanks @100yenadmin. - Plugin SDK/media-understanding: add `extractStructuredWithModel(...)` plus the optional provider-side `extractStructured(...)` seam so trusted plugins can run bounded image-first structured extraction with optional supplemental text context through provider-owned runtimes such as Codex. - Exec approvals: add `tools.exec.commandHighlighting` so parser-derived command highlighting in approval prompts can be enabled globally or per agent. (#79348) Thanks @jesse-merhi. +- Codex app-server: mirror native Codex subagent spawn lifecycle events into Task Registry so app-server child agents appear in task/status surfaces without relying on transcript text. (#79512) Thanks @mbelinky. ### Fixes diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index dbc60115afc..6a5f80d5606 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -19455aee06dd33e2679cfcd8075b10cce806069667097fd7e717aa641c262e51 plugin-sdk-api-baseline.json -ea6e0b36ab14977bed8dcf64118e58a8e58a76f41860c32055a73bcd04612826 plugin-sdk-api-baseline.jsonl +a79e7eca306cd4a0156699b0884aa0483b7eb1352158a7bebf806506ead41a66 plugin-sdk-api-baseline.json +ba307375a0714be1360fceb564e89d86724f8ad5a50a18377184254112c99128 plugin-sdk-api-baseline.jsonl diff --git a/extensions/codex/src/app-server/event-projector.test.ts b/extensions/codex/src/app-server/event-projector.test.ts index a8e27137200..bc101506cf8 100644 --- a/extensions/codex/src/app-server/event-projector.test.ts +++ b/extensions/codex/src/app-server/event-projector.test.ts @@ -20,6 +20,7 @@ import { type CodexAppServerEventProjectorOptions, type CodexAppServerToolTelemetry, } from "./event-projector.js"; +import { CodexNativeSubagentTaskMirror } from "./native-subagent-task-mirror.js"; import { rememberCodexRateLimits, resetCodexRateLimitCacheForTests } from "./rate-limit-cache.js"; import { createCodexTestModel } from "./test-support.js"; @@ -643,6 +644,36 @@ describe("CodexAppServerEventProjector", () => { expect(result.assistantTexts).toStrictEqual([]); }); + it("mirrors native subagent notifications before current-turn filtering", async () => { + const projector = await createProjector({ + ...(await createParams()), + sessionKey: "agent:main:main", + } as EmbeddedRunAttemptParams); + const mirrorSpy = vi.spyOn(CodexNativeSubagentTaskMirror.prototype, "handleNotification"); + const notification = { + method: "item/completed", + params: { + threadId: THREAD_ID, + turnId: "child-turn", + item: { + type: "collabAgentToolCall", + tool: "spawnAgent", + senderThreadId: THREAD_ID, + receiverThreadIds: ["child-thread"], + agentsStates: { + "child-thread": { status: "completed", message: "done" }, + }, + }, + }, + } as ProjectorNotification; + + await projector.handleNotification(notification); + + expect(mirrorSpy).toHaveBeenCalledWith(notification); + const result = projector.buildResult(buildEmptyToolTelemetry()); + expect(result.assistantTexts).toEqual([]); + }); + it("ignores notifications that omit top-level thread and turn ids", async () => { const projector = await createProjector(); diff --git a/extensions/codex/src/app-server/event-projector.ts b/extensions/codex/src/app-server/event-projector.ts index 6ba10ca888d..8e66f747148 100644 --- a/extensions/codex/src/app-server/event-projector.ts +++ b/extensions/codex/src/app-server/event-projector.ts @@ -20,6 +20,7 @@ import { type ToolProgressDetailMode, } from "openclaw/plugin-sdk/agent-harness-runtime"; import { emitTrustedDiagnosticEvent } from "openclaw/plugin-sdk/diagnostic-runtime"; +import { CodexNativeSubagentTaskMirror } from "./native-subagent-task-mirror.js"; import { readCodexTurn } from "./protocol-validators.js"; import { isJsonObject, @@ -135,19 +136,34 @@ export class CodexAppServerEventProjector { private guardianReviewCount = 0; private completedCompactionCount = 0; private latestRateLimits: JsonValue | undefined; + private readonly nativeSubagentTaskMirror: CodexNativeSubagentTaskMirror; constructor( private readonly params: EmbeddedRunAttemptParams, private readonly threadId: string, private readonly turnId: string, private readonly options: CodexAppServerEventProjectorOptions = {}, - ) {} + ) { + this.nativeSubagentTaskMirror = new CodexNativeSubagentTaskMirror({ + parentThreadId: threadId, + requesterSessionKey: params.sessionKey, + agentId: params.agentId, + }); + } async handleNotification(notification: CodexServerNotification): Promise { const params = isJsonObject(notification.params) ? notification.params : undefined; if (!params) { return; } + try { + this.nativeSubagentTaskMirror.handleNotification(notification); + } catch (error) { + embeddedAgentLog.warn("Failed to mirror Codex native subagent lifecycle event", { + method: notification.method, + error: formatErrorMessage(error), + }); + } if (notification.method === "account/rateLimits/updated") { this.latestRateLimits = params; rememberCodexRateLimits(params); diff --git a/extensions/codex/src/app-server/native-subagent-task-mirror.test.ts b/extensions/codex/src/app-server/native-subagent-task-mirror.test.ts new file mode 100644 index 00000000000..2ab689b838d --- /dev/null +++ b/extensions/codex/src/app-server/native-subagent-task-mirror.test.ts @@ -0,0 +1,380 @@ +import { describe, expect, it, vi } from "vitest"; +import { + codexNativeSubagentRunId, + CodexNativeSubagentTaskMirror, + type TaskLifecycleRuntime, +} from "./native-subagent-task-mirror.js"; + +function createRuntime() { + return { + createRunningTaskRun: vi.fn(), + recordTaskRunProgressByRunId: vi.fn(() => []), + finalizeTaskRunByRunId: vi.fn(() => []), + } as unknown as TaskLifecycleRuntime; +} + +describe("CodexNativeSubagentTaskMirror", () => { + it("creates a silent task-registry task for a native Codex subagent thread", () => { + const runtime = createRuntime(); + const mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + agentId: "main", + now: () => 20_000, + }, + runtime, + ); + + mirror.handleNotification({ + method: "thread/started", + params: { + thread: { + id: "child-thread", + sessionId: "session-tree", + preview: "write the Madrid wine script", + createdAt: 10, + status: { type: "active", activeFlags: [] }, + source: { + subAgent: { + thread_spawn: { + parent_thread_id: "parent-thread", + depth: 1, + agent_nickname: "Poincare", + agent_role: "worker", + }, + }, + }, + }, + }, + }); + + expect(runtime.createRunningTaskRun).toHaveBeenCalledWith( + expect.objectContaining({ + runtime: "subagent", + taskKind: "codex-native", + sourceId: "codex-thread:child-thread", + requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", + agentId: "main", + runId: "codex-thread:child-thread", + label: "Poincare", + task: "write the Madrid wine script", + notifyPolicy: "silent", + deliveryStatus: "not_applicable", + startedAt: 10_000, + progressSummary: "Codex native subagent started.", + }), + ); + expect(vi.mocked(runtime.createRunningTaskRun).mock.calls[0]?.[0]).not.toHaveProperty( + "childSessionKey", + ); + expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + runtime: "subagent", + progressSummary: "Codex native subagent is active.", + }), + ); + }); + + it("ignores subagent threads spawned by a different parent thread", () => { + const runtime = createRuntime(); + const mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + }, + runtime, + ); + + mirror.handleNotification({ + method: "thread/started", + params: { + thread: { + id: "other-child", + source: { + subAgent: { + thread_spawn: { + parent_thread_id: "other-parent", + depth: 1, + }, + }, + }, + }, + }, + }); + + expect(runtime.createRunningTaskRun).not.toHaveBeenCalled(); + expect(runtime.recordTaskRunProgressByRunId).not.toHaveBeenCalled(); + expect(runtime.finalizeTaskRunByRunId).not.toHaveBeenCalled(); + }); + + it("deduplicates repeated thread-started notifications for the same child thread", () => { + const runtime = createRuntime(); + const mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + }, + runtime, + ); + const notification = { + method: "thread/started", + params: { + thread: { + id: "child-thread", + source: { + subAgent: { + thread_spawn: { + parent_thread_id: "parent-thread", + depth: 1, + }, + }, + }, + }, + }, + } as const; + + mirror.handleNotification(notification); + mirror.handleNotification(notification); + + expect(runtime.createRunningTaskRun).toHaveBeenCalledTimes(1); + }); + + it("maps Codex thread status changes onto the mirrored task run", () => { + const runtime = createRuntime(); + const mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + now: () => 30_000, + }, + runtime, + ); + + mirror.handleNotification({ + method: "thread/status/changed", + params: { + threadId: "child-thread", + status: { type: "idle" }, + }, + }); + mirror.handleNotification({ + method: "thread/status/changed", + params: { + threadId: "failed-child", + status: { type: "systemError" }, + }, + }); + + expect(runtime.finalizeTaskRunByRunId).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + runId: codexNativeSubagentRunId("child-thread"), + runtime: "subagent", + status: "succeeded", + terminalSummary: "Codex native subagent finished.", + }), + ); + expect(runtime.finalizeTaskRunByRunId).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + runId: codexNativeSubagentRunId("failed-child"), + runtime: "subagent", + status: "failed", + terminalSummary: "Codex native subagent failed.", + }), + ); + }); + + it("creates and updates tasks from Codex collab agent item state", () => { + const runtime = createRuntime(); + const mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + now: () => 40_000, + }, + runtime, + ); + + mirror.handleNotification({ + method: "item/completed", + params: { + item: { + type: "collabAgentToolCall", + tool: "spawnAgent", + senderThreadId: "parent-thread", + receiverThreadIds: ["child-thread"], + prompt: "write the proof file", + agentsStates: { + "child-thread": { + status: "pendingInit", + message: null, + }, + }, + }, + }, + }); + mirror.handleNotification({ + method: "item/completed", + params: { + item: { + type: "collabAgentToolCall", + tool: "wait", + senderThreadId: "parent-thread", + receiverThreadIds: [], + agentsStates: { + "child-thread": { + status: "completed", + message: "done", + }, + }, + }, + }, + }); + + expect(runtime.createRunningTaskRun).toHaveBeenCalledWith( + expect.objectContaining({ + runtime: "subagent", + taskKind: "codex-native", + sourceId: "codex-thread:child-thread", + requesterSessionKey: "agent:main:main", + ownerKey: "agent:main:main", + scopeKind: "session", + runId: "codex-thread:child-thread", + label: "Codex subagent", + task: "write the proof file", + notifyPolicy: "silent", + deliveryStatus: "not_applicable", + }), + ); + expect(vi.mocked(runtime.createRunningTaskRun).mock.calls[0]?.[0]).not.toHaveProperty( + "childSessionKey", + ); + expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + runtime: "subagent", + progressSummary: "Codex native subagent is initializing.", + }), + ); + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + runtime: "subagent", + status: "succeeded", + terminalSummary: "done", + }), + ); + }); + + it("preserves a completed collab agent message when the thread later goes idle", () => { + const runtime = createRuntime(); + const mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + now: () => 50_000, + }, + runtime, + ); + + mirror.handleNotification({ + method: "item/completed", + params: { + item: { + type: "collabAgentToolCall", + tool: "spawnAgent", + senderThreadId: "parent-thread", + receiverThreadIds: ["child-thread"], + prompt: "write the proof file", + agentsStates: { + "child-thread": { + status: "completed", + message: "No user task is specified.", + }, + }, + }, + }, + }); + mirror.handleNotification({ + method: "thread/status/changed", + params: { + threadId: "child-thread", + status: { type: "idle" }, + }, + }); + + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledTimes(1); + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + status: "succeeded", + terminalSummary: "No user task is specified.", + }), + ); + }); + + it("normalizes collab agent status spelling from alternate event surfaces", () => { + const runtime = createRuntime(); + const mirror = new CodexNativeSubagentTaskMirror( + { + parentThreadId: "parent-thread", + requesterSessionKey: "agent:main:main", + now: () => 60_000, + }, + runtime, + ); + + mirror.handleNotification({ + method: "item/completed", + params: { + item: { + type: "collabAgentToolCall", + tool: "spawnAgent", + senderThreadId: "parent-thread", + receiverThreadIds: ["child-thread"], + agentsStates: { + "child-thread": { + status: "pending_init", + message: null, + }, + }, + }, + }, + }); + mirror.handleNotification({ + method: "item/completed", + params: { + item: { + type: "collabAgentToolCall", + tool: "wait", + senderThreadId: "parent-thread", + agentsStates: { + "child-thread": { + status: "success", + message: "done", + }, + }, + }, + }, + }); + + expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + progressSummary: "Codex native subagent is initializing.", + }), + ); + expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "codex-thread:child-thread", + status: "succeeded", + terminalSummary: "done", + }), + ); + }); +}); diff --git a/extensions/codex/src/app-server/native-subagent-task-mirror.ts b/extensions/codex/src/app-server/native-subagent-task-mirror.ts new file mode 100644 index 00000000000..41051b14d84 --- /dev/null +++ b/extensions/codex/src/app-server/native-subagent-task-mirror.ts @@ -0,0 +1,417 @@ +import { + createRunningTaskRun, + finalizeTaskRunByRunId, + recordTaskRunProgressByRunId, +} from "openclaw/plugin-sdk/codex-native-task-runtime"; +import type { + CodexServerNotification, + CodexSessionSource, + CodexSubAgentThreadSpawnSource, + CodexThread, + CodexThreadStartedNotification, + CodexThreadStatus, + CodexThreadStatusChangedNotification, + JsonObject, + JsonValue, +} from "./protocol.js"; +import { isJsonObject } from "./protocol.js"; + +const CODEX_NATIVE_SUBAGENT_RUNTIME = "subagent"; +const CODEX_NATIVE_SUBAGENT_TASK_KIND = "codex-native"; + +export type TaskLifecycleRuntime = { + createRunningTaskRun: typeof createRunningTaskRun; + recordTaskRunProgressByRunId: typeof recordTaskRunProgressByRunId; + finalizeTaskRunByRunId: typeof finalizeTaskRunByRunId; +}; + +export type CodexNativeSubagentTaskMirrorParams = { + parentThreadId: string; + requesterSessionKey?: string; + agentId?: string; + now?: () => number; +}; + +const defaultRuntime: TaskLifecycleRuntime = { + createRunningTaskRun, + recordTaskRunProgressByRunId, + finalizeTaskRunByRunId, +}; + +export class CodexNativeSubagentTaskMirror { + private readonly mirroredThreadIds = new Set(); + private readonly terminalRunIds = new Set(); + private readonly now: () => number; + + constructor( + private readonly params: CodexNativeSubagentTaskMirrorParams, + private readonly runtime: TaskLifecycleRuntime = defaultRuntime, + ) { + this.now = params.now ?? Date.now; + } + + handleNotification(notification: CodexServerNotification): void { + const params = isJsonObject(notification.params) ? notification.params : undefined; + if (!params) { + return; + } + if (notification.method === "thread/started") { + this.handleThreadStarted(params); + return; + } + if (notification.method === "thread/status/changed") { + this.handleThreadStatusChanged(params); + return; + } + if (notification.method === "item/started" || notification.method === "item/completed") { + this.handleCollabAgentItem(params); + } + } + + private handleThreadStarted(params: JsonObject): void { + const notification = readThreadStartedNotification(params); + if (!notification) { + return; + } + const thread = notification.thread; + const spawn = readSubagentThreadSpawnSource(thread.source, this.params.parentThreadId); + if (!spawn) { + return; + } + const threadId = thread.id.trim(); + if (!threadId || this.mirroredThreadIds.has(threadId)) { + return; + } + this.mirroredThreadIds.add(threadId); + const runId = codexNativeSubagentRunId(threadId); + const label = + trimOptional(spawn.agent_nickname) ?? + trimOptional(thread.agentNickname) ?? + trimOptional(spawn.agent_role) ?? + trimOptional(thread.agentRole) ?? + "Codex subagent"; + const task = + trimOptional(thread.preview) ?? + `Codex native subagent${label === "Codex subagent" ? "" : ` ${label}`}`; + const createdAt = secondsToMillis(thread.createdAt) ?? this.now(); + this.runtime.createRunningTaskRun({ + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + taskKind: CODEX_NATIVE_SUBAGENT_TASK_KIND, + sourceId: runId, + requesterSessionKey: this.params.requesterSessionKey, + ...(this.params.requesterSessionKey + ? { + ownerKey: this.params.requesterSessionKey, + scopeKind: "session" as const, + } + : {}), + agentId: this.params.agentId, + runId, + label, + task, + notifyPolicy: "silent", + deliveryStatus: "not_applicable", + preferMetadata: true, + startedAt: createdAt, + lastEventAt: this.now(), + progressSummary: "Codex native subagent started.", + }); + this.applyStatus(threadId, thread.status); + } + + private handleThreadStatusChanged(params: JsonObject): void { + const notification = readThreadStatusChangedNotification(params); + if (!notification) { + return; + } + this.applyStatus(notification.threadId, notification.status); + } + + private applyStatus(threadId: string, status: CodexThreadStatus | null | undefined): void { + const statusType = status?.type; + if (!statusType) { + return; + } + const runId = codexNativeSubagentRunId(threadId); + if (this.terminalRunIds.has(runId) && statusType !== "systemError") { + return; + } + const eventAt = this.now(); + if (statusType === "active") { + this.runtime.recordTaskRunProgressByRunId({ + runId, + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + lastEventAt: eventAt, + progressSummary: "Codex native subagent is active.", + }); + return; + } + if (statusType === "idle") { + this.terminalRunIds.add(runId); + this.runtime.finalizeTaskRunByRunId({ + runId, + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + status: "succeeded", + endedAt: eventAt, + lastEventAt: eventAt, + progressSummary: "Codex native subagent is idle.", + terminalSummary: "Codex native subagent finished.", + }); + return; + } + if (statusType === "systemError") { + this.terminalRunIds.add(runId); + this.runtime.finalizeTaskRunByRunId({ + runId, + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + status: "failed", + endedAt: eventAt, + lastEventAt: eventAt, + error: "Codex app-server reported a system error for the native subagent thread.", + progressSummary: "Codex native subagent hit a system error.", + terminalSummary: "Codex native subagent failed.", + }); + return; + } + if (statusType === "notLoaded") { + this.runtime.recordTaskRunProgressByRunId({ + runId, + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + lastEventAt: eventAt, + progressSummary: "Codex native subagent is not loaded.", + }); + } + } + + private handleCollabAgentItem(params: JsonObject): void { + const item = isJsonObject(params.item) ? params.item : undefined; + if (!item || readString(item, "type") !== "collabAgentToolCall") { + return; + } + if (readString(item, "senderThreadId") !== this.params.parentThreadId) { + return; + } + const receiverThreadIds = readStringArray(item.receiverThreadIds); + if (normalizeToolName(readString(item, "tool")) === "spawnagent") { + for (const receiverThreadId of receiverThreadIds) { + this.createTaskFromCollabSpawnItem(receiverThreadId, item); + } + } + const agentsStates = readAgentsStates(item.agentsStates); + for (const [threadId, state] of agentsStates) { + this.applyCollabAgentStatus(threadId, state.status, state.message); + } + } + + private createTaskFromCollabSpawnItem(threadId: string, item: JsonObject): void { + const normalizedThreadId = threadId.trim(); + if (!normalizedThreadId || this.mirroredThreadIds.has(normalizedThreadId)) { + return; + } + this.mirroredThreadIds.add(normalizedThreadId); + const prompt = trimOptional(readString(item, "prompt")); + const runId = codexNativeSubagentRunId(normalizedThreadId); + const createdAt = this.now(); + this.runtime.createRunningTaskRun({ + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + taskKind: CODEX_NATIVE_SUBAGENT_TASK_KIND, + sourceId: runId, + requesterSessionKey: this.params.requesterSessionKey, + ...(this.params.requesterSessionKey + ? { + ownerKey: this.params.requesterSessionKey, + scopeKind: "session" as const, + } + : {}), + agentId: this.params.agentId, + runId, + label: "Codex subagent", + task: prompt ?? "Codex native subagent", + notifyPolicy: "silent", + deliveryStatus: "not_applicable", + preferMetadata: true, + startedAt: createdAt, + lastEventAt: createdAt, + progressSummary: "Codex native subagent spawned.", + }); + } + + private applyCollabAgentStatus( + threadId: string, + status: string | undefined, + message: string | null | undefined, + ): void { + const normalizedStatus = normalizeAgentStateStatus(status); + if (!normalizedStatus) { + return; + } + const runId = codexNativeSubagentRunId(threadId); + const eventAt = this.now(); + if (normalizedStatus === "pendingInit" || normalizedStatus === "running") { + this.runtime.recordTaskRunProgressByRunId({ + runId, + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + lastEventAt: eventAt, + progressSummary: + trimOptional(message) ?? + (normalizedStatus === "pendingInit" + ? "Codex native subagent is initializing." + : "Codex native subagent is running."), + }); + return; + } + if (normalizedStatus === "completed") { + this.terminalRunIds.add(runId); + this.runtime.finalizeTaskRunByRunId({ + runId, + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + status: "succeeded", + endedAt: eventAt, + lastEventAt: eventAt, + progressSummary: trimOptional(message) ?? "Codex native subagent completed.", + terminalSummary: trimOptional(message) ?? "Codex native subagent finished.", + }); + return; + } + this.terminalRunIds.add(runId); + this.runtime.finalizeTaskRunByRunId({ + runId, + runtime: CODEX_NATIVE_SUBAGENT_RUNTIME, + status: + normalizedStatus === "interrupted" || normalizedStatus === "shutdown" + ? "cancelled" + : "failed", + endedAt: eventAt, + lastEventAt: eventAt, + error: trimOptional(message) ?? `Codex native subagent status: ${normalizedStatus}`, + progressSummary: trimOptional(message) ?? `Codex native subagent ${normalizedStatus}.`, + terminalSummary: trimOptional(message) ?? "Codex native subagent did not complete.", + }); + } +} + +export function codexNativeSubagentRunId(threadId: string): string { + return `codex-thread:${threadId.trim()}`; +} + +export function readSubagentThreadSpawnSource( + source: CodexSessionSource | null | undefined, + parentThreadId: string, +): CodexSubAgentThreadSpawnSource | undefined { + if (!source || typeof source !== "object" || !("subAgent" in source)) { + return undefined; + } + const subAgent = source.subAgent; + if (!subAgent || typeof subAgent !== "object" || !("thread_spawn" in subAgent)) { + return undefined; + } + const spawn = subAgent.thread_spawn; + if (!spawn || typeof spawn !== "object") { + return undefined; + } + return spawn.parent_thread_id === parentThreadId ? spawn : undefined; +} + +function readThreadStartedNotification( + params: JsonObject, +): CodexThreadStartedNotification | undefined { + const thread = params.thread; + if (!isJsonObject(thread) || typeof thread.id !== "string") { + return undefined; + } + return { thread: thread as CodexThread }; +} + +function readThreadStatusChangedNotification( + params: JsonObject, +): CodexThreadStatusChangedNotification | undefined { + if (typeof params.threadId !== "string") { + return undefined; + } + const status = params.status; + if (!isJsonObject(status) || !isCodexThreadStatusType(status.type)) { + return undefined; + } + return { + threadId: params.threadId, + status: status as CodexThreadStatus, + }; +} + +function isCodexThreadStatusType(value: unknown): value is CodexThreadStatus["type"] { + return value === "notLoaded" || value === "idle" || value === "systemError" || value === "active"; +} + +function readAgentsStates( + value: JsonValue | undefined, +): Map { + const states = new Map(); + if (!isJsonObject(value)) { + return states; + } + for (const [threadId, rawState] of Object.entries(value)) { + if (!isJsonObject(rawState)) { + continue; + } + const status = readString(rawState, "status"); + const message = readNullableString(rawState, "message"); + states.set(threadId, { status, message }); + } + return states; +} + +function readStringArray(value: JsonValue | undefined): string[] { + if (!Array.isArray(value)) { + return []; + } + return value.filter((entry): entry is string => typeof entry === "string" && entry.trim() !== ""); +} + +function readString(value: JsonObject, key: string): string | undefined { + const entry = value[key]; + return typeof entry === "string" ? entry : undefined; +} + +function readNullableString(value: JsonObject, key: string): string | null | undefined { + const entry = value[key]; + return typeof entry === "string" || entry === null ? entry : undefined; +} + +function normalizeToolName(value: string | undefined): string | undefined { + return value?.replace(/[^a-z0-9]/giu, "").toLowerCase(); +} + +function normalizeAgentStateStatus(value: string | undefined): string | undefined { + const key = value?.replace(/[^a-z0-9]/giu, "").toLowerCase(); + if (!key) { + return undefined; + } + if (key === "pendinginit") { + return "pendingInit"; + } + if (key === "inprogress" || key === "running") { + return "running"; + } + if (key === "completed" || key === "succeeded" || key === "success") { + return "completed"; + } + if (key === "interrupted" || key === "cancelled" || key === "canceled" || key === "shutdown") { + return key === "shutdown" ? "shutdown" : "interrupted"; + } + if (key === "failed" || key === "error" || key === "systemerror") { + return "failed"; + } + return value?.trim(); +} + +function secondsToMillis(value: number | null | undefined): number | undefined { + if (typeof value !== "number" || !Number.isFinite(value)) { + return undefined; + } + return value * 1000; +} + +function trimOptional(value: string | null | undefined): string | undefined { + const trimmed = value?.trim(); + return trimmed ? trimmed : undefined; +} diff --git a/extensions/codex/src/app-server/protocol.ts b/extensions/codex/src/app-server/protocol.ts index e9a476973f8..58b58a7a16c 100644 --- a/extensions/codex/src/app-server/protocol.ts +++ b/extensions/codex/src/app-server/protocol.ts @@ -166,7 +166,54 @@ export type CodexThread = { id: string; sessionId?: string; name?: string | null; + preview?: string | null; + createdAt?: number | null; + updatedAt?: number | null; + status?: CodexThreadStatus | null; cwd?: string | null; + source?: CodexSessionSource | null; + threadSource?: string | null; + agentNickname?: string | null; + agentRole?: string | null; +}; + +export type CodexThreadStatus = + | { type: "notLoaded" } + | { type: "idle" } + | { type: "systemError" } + | { type: "active"; activeFlags?: string[] }; + +export type CodexSubAgentThreadSpawnSource = { + parent_thread_id: string; + depth?: number; + agent_path?: string | null; + agent_nickname?: string | null; + agent_role?: string | null; +}; + +export type CodexSubAgentSource = + | "review" + | "compact" + | "memory_consolidation" + | { thread_spawn: CodexSubAgentThreadSpawnSource } + | { other: string }; + +export type CodexSessionSource = + | "cli" + | "vscode" + | "exec" + | "appServer" + | "unknown" + | { custom: string } + | { subAgent: CodexSubAgentSource }; + +export type CodexThreadStartedNotification = { + thread: CodexThread; +}; + +export type CodexThreadStatusChangedNotification = { + threadId: string; + status: CodexThreadStatus; }; export type CodexThreadItem = { diff --git a/scripts/lib/plugin-sdk-private-local-only-subpaths.json b/scripts/lib/plugin-sdk-private-local-only-subpaths.json index 0c6ea22241f..91e92dd8870 100644 --- a/scripts/lib/plugin-sdk-private-local-only-subpaths.json +++ b/scripts/lib/plugin-sdk-private-local-only-subpaths.json @@ -1,4 +1,5 @@ [ + "codex-native-task-runtime", "qa-channel", "qa-channel-protocol", "qa-lab", diff --git a/src/plugin-sdk/codex-native-task-runtime.ts b/src/plugin-sdk/codex-native-task-runtime.ts new file mode 100644 index 00000000000..046fcf18045 --- /dev/null +++ b/src/plugin-sdk/codex-native-task-runtime.ts @@ -0,0 +1,10 @@ +// Private helper surface for the bundled Codex plugin. This is intentionally +// local-only so Codex can mirror app-server native subagents into OpenClaw's +// task registry without promoting detached task mutation helpers to the public +// plugin SDK. + +export { + createRunningTaskRun, + finalizeTaskRunByRunId, + recordTaskRunProgressByRunId, +} from "../tasks/detached-task-runtime.js"; diff --git a/src/plugins/contracts/plugin-sdk-root-alias.test.ts b/src/plugins/contracts/plugin-sdk-root-alias.test.ts index b9c58e2525e..4c0d1d7ca64 100644 --- a/src/plugins/contracts/plugin-sdk-root-alias.test.ts +++ b/src/plugins/contracts/plugin-sdk-root-alias.test.ts @@ -443,6 +443,30 @@ describe("plugin-sdk root alias", () => { expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/nested/path"); }); + it("keeps non-QA private local-only plugin-sdk subpaths out of the CJS root alias", () => { + const packageRoot = path.dirname(path.dirname(path.dirname(rootAliasPath))); + const sourceCodexNativeTaskRuntimePath = path.join( + packageRoot, + "src", + "plugin-sdk", + "codex-native-task-runtime.ts", + ); + const sourceQaRuntimePath = path.join(packageRoot, "src", "plugin-sdk", "qa-runtime.ts"); + const lazyModule = loadRootAliasWithStubs({ + privateLocalOnlySubpaths: ["codex-native-task-runtime", "qa-runtime"], + existingPaths: [sourceCodexNativeTaskRuntimePath, sourceQaRuntimePath], + monolithicExports: { + slowHelper: (): string => "loaded", + }, + }); + + expect((lazyModule.moduleExports.slowHelper as () => string)()).toBe("loaded"); + const aliasMap = (lazyModule.createJitiOptions.at(-1)?.alias ?? {}) as Record; + expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/codex-native-task-runtime"); + expect(aliasMap).not.toHaveProperty("@openclaw/plugin-sdk/codex-native-task-runtime"); + expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/qa-runtime"); + }); + it("builds source plugin-sdk subpath aliases through the wider source extension family", () => { const packageRoot = path.dirname(path.dirname(path.dirname(rootAliasPath))); const lazyModule = loadRootAliasWithStubs({ diff --git a/src/plugins/sdk-alias.test.ts b/src/plugins/sdk-alias.test.ts index 1885a76472c..0ac128acd11 100644 --- a/src/plugins/sdk-alias.test.ts +++ b/src/plugins/sdk-alias.test.ts @@ -631,6 +631,51 @@ describe("plugin sdk alias helpers", () => { expect(subpaths).toEqual(["core", "qa-channel", "qa-channel-protocol", "qa-lab", "qa-runtime"]); }); + it("adds the non-QA private Codex task runtime subpath only for bundled Codex", () => { + const fixture = createPluginSdkAliasFixture({ + packageExports: { + "./plugin-sdk/core": { default: "./dist/plugin-sdk/core.js" }, + }, + }); + fs.writeFileSync( + path.join(fixture.root, "scripts", "lib", "plugin-sdk-private-local-only-subpaths.json"), + JSON.stringify(["codex-native-task-runtime", "qa-runtime"], null, 2), + "utf-8", + ); + fs.writeFileSync( + path.join(fixture.root, "src", "plugin-sdk", "codex-native-task-runtime.ts"), + "export const codexNativeTaskRuntime = true;\n", + "utf-8", + ); + fs.writeFileSync( + path.join(fixture.root, "src", "plugin-sdk", "qa-runtime.ts"), + "export const qaRuntime = true;\n", + "utf-8", + ); + const sourceCodexEntry = writePluginEntry( + fixture.root, + bundledPluginFile("codex", "src/index.ts"), + ); + const sourceOtherEntry = writePluginEntry( + fixture.root, + bundledPluginFile("demo", "src/index.ts"), + ); + + const codexSubpaths = withEnv({ OPENCLAW_ENABLE_PRIVATE_QA_CLI: undefined }, () => + listPluginSdkExportedSubpaths({ + modulePath: sourceCodexEntry, + }), + ); + const otherSubpaths = withEnv({ OPENCLAW_ENABLE_PRIVATE_QA_CLI: undefined }, () => + listPluginSdkExportedSubpaths({ + modulePath: sourceOtherEntry, + }), + ); + + expect(codexSubpaths).toEqual(["codex-native-task-runtime", "core"]); + expect(otherSubpaths).toEqual(["core"]); + }); + it("does not reuse a non-private cached subpath list after private qa gets enabled", () => { const fixture = createPluginSdkAliasFixture({ packageExports: { @@ -799,6 +844,60 @@ describe("plugin sdk alias helpers", () => { ); }); + it("aliases non-QA private plugin-sdk subpaths for bundled runtime source loading", () => { + const fixture = createPluginSdkAliasFixture({ + packageExports: { + "./plugin-sdk/core": { default: "./dist/plugin-sdk/core.js" }, + }, + }); + const sourceRootAlias = path.join(fixture.root, "src", "plugin-sdk", "root-alias.cjs"); + const sourceCodexNativeTaskRuntimePath = path.join( + fixture.root, + "src", + "plugin-sdk", + "codex-native-task-runtime.ts", + ); + const sourceQaRuntimePath = path.join(fixture.root, "src", "plugin-sdk", "qa-runtime.ts"); + fs.writeFileSync(sourceRootAlias, "module.exports = {};\n", "utf-8"); + fs.writeFileSync( + path.join(fixture.root, "scripts", "lib", "plugin-sdk-private-local-only-subpaths.json"), + JSON.stringify(["codex-native-task-runtime", "qa-runtime"], null, 2), + "utf-8", + ); + fs.writeFileSync( + sourceCodexNativeTaskRuntimePath, + "export const codexNativeTaskRuntime = true;\n", + "utf-8", + ); + fs.writeFileSync(sourceQaRuntimePath, "export const qaRuntime = true;\n", "utf-8"); + const sourcePluginEntry = writePluginEntry( + fixture.root, + bundledPluginFile("codex", "src/index.ts"), + ); + const sourceOtherPluginEntry = writePluginEntry( + fixture.root, + bundledPluginFile("demo", "src/index.ts"), + ); + + const aliases = withEnv( + { OPENCLAW_ENABLE_PRIVATE_QA_CLI: undefined, NODE_ENV: undefined }, + () => buildPluginLoaderAliasMap(sourcePluginEntry), + ); + const otherAliases = withEnv( + { OPENCLAW_ENABLE_PRIVATE_QA_CLI: undefined, NODE_ENV: undefined }, + () => buildPluginLoaderAliasMap(sourceOtherPluginEntry), + ); + + expect(fs.realpathSync(aliases["openclaw/plugin-sdk"] ?? "")).toBe( + fs.realpathSync(sourceRootAlias), + ); + expect(fs.realpathSync(aliases["openclaw/plugin-sdk/codex-native-task-runtime"] ?? "")).toBe( + fs.realpathSync(sourceCodexNativeTaskRuntimePath), + ); + expect(aliases["openclaw/plugin-sdk/qa-runtime"]).toBeUndefined(); + expect(otherAliases["openclaw/plugin-sdk/codex-native-task-runtime"]).toBeUndefined(); + }); + it("applies explicit dist resolution to plugin-sdk subpath aliases too", () => { const { fixture, distRootAlias, distChannelRuntimePath } = createPluginSdkAliasTargetFixture(); const sourcePluginEntry = writePluginEntry( diff --git a/src/plugins/sdk-alias.ts b/src/plugins/sdk-alias.ts index 78a5ab75e3d..e82457370c9 100644 --- a/src/plugins/sdk-alias.ts +++ b/src/plugins/sdk-alias.ts @@ -265,6 +265,7 @@ const cachedPluginSdkScopedAliasMaps = new PluginLruCache MAX_PLUGIN_LOADER_ALIAS_CACHE_ENTRIES, ); const PLUGIN_SDK_PACKAGE_NAMES = ["openclaw/plugin-sdk", "@openclaw/plugin-sdk"] as const; +const CODEX_NATIVE_TASK_RUNTIME_PLUGIN_SDK_SUBPATH = "codex-native-task-runtime"; const PLUGIN_SDK_SOURCE_CANDIDATE_EXTENSIONS = [ ".ts", ".mts", @@ -453,6 +454,34 @@ function shouldIncludePrivateLocalOnlyPluginSdkSubpaths() { return process.env.OPENCLAW_ENABLE_PRIVATE_QA_CLI === "1"; } +function isBundledCodexPluginModulePath(params: { packageRoot: string; modulePath: string }) { + const normalizedModulePath = path.resolve(params.modulePath); + const roots = [ + path.join(params.packageRoot, "extensions", "codex"), + path.join(params.packageRoot, "dist", "extensions", "codex"), + path.join(params.packageRoot, "dist-runtime", "extensions", "codex"), + ]; + return roots.some( + (root) => + normalizedModulePath === root || normalizedModulePath.startsWith(`${root}${path.sep}`), + ); +} + +function shouldIncludePrivateLocalOnlyPluginSdkSubpath(params: { + packageRoot: string; + modulePath: string; + subpath: string; +}) { + return ( + shouldIncludePrivateLocalOnlyPluginSdkSubpaths() || + (params.subpath === CODEX_NATIVE_TASK_RUNTIME_PLUGIN_SDK_SUBPATH && + isBundledCodexPluginModulePath({ + packageRoot: params.packageRoot, + modulePath: params.modulePath, + })) + ); +} + function hasPluginSdkSubpathArtifact(packageRoot: string, subpath: string) { const distPath = path.join(packageRoot, "dist", "plugin-sdk", `${subpath}.js`); if (isUsableDistPluginSdkArtifact(distPath)) { @@ -478,12 +507,14 @@ function listDistPluginSdkArtifactSubpaths(packageRoot: string): Set { } } -function listPrivateLocalOnlyPluginSdkSubpaths(packageRoot: string): string[] { - if (!shouldIncludePrivateLocalOnlyPluginSdkSubpaths()) { - return []; - } - return readPrivateLocalOnlyPluginSdkSubpaths(packageRoot).filter((subpath) => - hasPluginSdkSubpathArtifact(packageRoot, subpath), +function listPrivateLocalOnlyPluginSdkSubpaths(params: { + packageRoot: string; + modulePath: string; +}): string[] { + return readPrivateLocalOnlyPluginSdkSubpaths(params.packageRoot).filter( + (subpath) => + shouldIncludePrivateLocalOnlyPluginSdkSubpath({ ...params, subpath }) && + hasPluginSdkSubpathArtifact(params.packageRoot, subpath), ); } @@ -504,7 +535,8 @@ export function listPluginSdkExportedSubpaths( if (!packageRoot) { return []; } - const cacheKey = `${packageRoot}::privateQa=${shouldIncludePrivateLocalOnlyPluginSdkSubpaths() ? "1" : "0"}`; + const includeCodexPrivateRuntime = isBundledCodexPluginModulePath({ packageRoot, modulePath }); + const cacheKey = `${packageRoot}::privateQa=${shouldIncludePrivateLocalOnlyPluginSdkSubpaths() ? "1" : "0"}::codexPrivate=${includeCodexPrivateRuntime ? "1" : "0"}`; const cached = cachedPluginSdkExportedSubpaths.get(cacheKey); if (cached) { return cached; @@ -512,7 +544,7 @@ export function listPluginSdkExportedSubpaths( const subpaths = [ ...new Set([ ...(readPluginSdkSubpathsFromPackageRoot(packageRoot) ?? []), - ...listPrivateLocalOnlyPluginSdkSubpaths(packageRoot), + ...listPrivateLocalOnlyPluginSdkSubpaths({ packageRoot, modulePath }), ]), ].toSorted(); cachedPluginSdkExportedSubpaths.set(cacheKey, subpaths); @@ -541,7 +573,8 @@ export function resolvePluginSdkScopedAliasMap( isProduction: process.env.NODE_ENV === "production", pluginSdkResolution: params.pluginSdkResolution, }); - const cacheKey = `${packageRoot}::${orderedKinds.join(",")}::privateQa=${shouldIncludePrivateLocalOnlyPluginSdkSubpaths() ? "1" : "0"}`; + const includeCodexPrivateRuntime = isBundledCodexPluginModulePath({ packageRoot, modulePath }); + const cacheKey = `${packageRoot}::${orderedKinds.join(",")}::privateQa=${shouldIncludePrivateLocalOnlyPluginSdkSubpaths() ? "1" : "0"}::codexPrivate=${includeCodexPrivateRuntime ? "1" : "0"}`; const cached = cachedPluginSdkScopedAliasMaps.get(cacheKey); if (cached) { return cached; diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 717b51f885d..d6a0a109b22 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -1765,6 +1765,43 @@ describe("task-registry", () => { }); }); + it("does not mark codex-native subagent tasks lost when they have no OpenClaw child session", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + const now = Date.now(); + + const task = createTaskRecord({ + runtime: "subagent", + taskKind: "codex-native", + ownerKey: "agent:main:main", + scopeKind: "session", + sourceId: "codex-thread:child-thread", + runId: "codex-thread:child-thread", + task: "Codex native child", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + }); + setTaskTimingById({ + taskId: task.taskId, + lastEventAt: now - 10 * 60_000, + }); + + expect(await runTaskRegistryMaintenance()).toEqual({ + reconciled: 0, + recovered: 0, + cleanupStamped: 0, + pruned: 0, + }); + expect(getTaskById(task.taskId)).toMatchObject({ + status: "running", + taskKind: "codex-native", + runId: "codex-thread:child-thread", + }); + }); + }); + it("closes terminal parent-owned one-shot ACP sessions during maintenance", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; @@ -3012,4 +3049,39 @@ describe("task-registry", () => { }); }); }); + + it("does not route codex-native task cancellation through OpenClaw subagent sessions", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + const task = createTaskRecord({ + runtime: "subagent", + taskKind: "codex-native", + ownerKey: "agent:main:main", + scopeKind: "session", + sourceId: "codex-thread:child-thread", + runId: "codex-thread:child-thread", + task: "Codex native child", + status: "running", + deliveryStatus: "not_applicable", + notifyPolicy: "silent", + }); + + const result = await cancelTaskById({ + cfg: {} as never, + taskId: task.taskId, + }); + + expect(result).toMatchObject({ + found: true, + cancelled: false, + reason: "Task has no cancellable child session.", + task: expect.objectContaining({ + taskId: task.taskId, + status: "running", + }), + }); + expect(hoisted.killSubagentRunAdminMock).not.toHaveBeenCalled(); + }); + }); }); diff --git a/tsdown.config.ts b/tsdown.config.ts index d0b215c826c..cc962ce9b67 100644 --- a/tsdown.config.ts +++ b/tsdown.config.ts @@ -283,6 +283,8 @@ function buildUnifiedDistEntries(): Record { ...dockerE2eHarnessEntries, // Internal compat artifact for the root-alias.cjs lazy loader. "plugin-sdk/compat": "src/plugin-sdk/compat.ts", + // Private bundled Codex helper for app-server native subagent task mirroring. + "plugin-sdk/codex-native-task-runtime": "src/plugin-sdk/codex-native-task-runtime.ts", ...Object.fromEntries( Object.entries(buildPluginSdkEntrySources()).map(([entry, source]) => [ `plugin-sdk/${entry}`,