Mirror Codex native subagents into task registry (#79512)

Merged via squash.

Prepared head SHA: 75bc96ad74
Co-authored-by: mbelinky <mbelinky@users.noreply.github.com>
Co-authored-by: marianobelinky <63976030+marianobelinky@users.noreply.github.com>
Reviewed-by: @pashpashpash
This commit is contained in:
Mariano
2026-05-11 20:52:41 +02:00
committed by GitHub
parent ba03d637ea
commit 8c75ed3eaa
14 changed files with 1145 additions and 12 deletions

View File

@@ -62,6 +62,7 @@ Docs: https://docs.openclaw.ai
- Plugin SDK: add bundled-plugin session actions, `sendSessionAttachment`, and Cron-backed `scheduleSessionTurn`/tag cleanup under the grouped session namespace. Replaces #75578/#75581/#75588 and part of #73384/#74483. Thanks @100yenadmin. - Plugin SDK: add bundled-plugin session actions, `sendSessionAttachment`, and Cron-backed `scheduleSessionTurn`/tag cleanup under the grouped session namespace. Replaces #75578/#75581/#75588 and part of #73384/#74483. Thanks @100yenadmin.
- Plugin SDK/media-understanding: add `extractStructuredWithModel(...)` plus the optional provider-side `extractStructured(...)` seam so trusted plugins can run bounded image-first structured extraction with optional supplemental text context through provider-owned runtimes such as Codex. - Plugin SDK/media-understanding: add `extractStructuredWithModel(...)` plus the optional provider-side `extractStructured(...)` seam so trusted plugins can run bounded image-first structured extraction with optional supplemental text context through provider-owned runtimes such as Codex.
- Exec approvals: add `tools.exec.commandHighlighting` so parser-derived command highlighting in approval prompts can be enabled globally or per agent. (#79348) Thanks @jesse-merhi. - Exec approvals: add `tools.exec.commandHighlighting` so parser-derived command highlighting in approval prompts can be enabled globally or per agent. (#79348) Thanks @jesse-merhi.
- Codex app-server: mirror native Codex subagent spawn lifecycle events into Task Registry so app-server child agents appear in task/status surfaces without relying on transcript text. (#79512) Thanks @mbelinky.
### Fixes ### Fixes

View File

@@ -1,2 +1,2 @@
19455aee06dd33e2679cfcd8075b10cce806069667097fd7e717aa641c262e51 plugin-sdk-api-baseline.json a79e7eca306cd4a0156699b0884aa0483b7eb1352158a7bebf806506ead41a66 plugin-sdk-api-baseline.json
ea6e0b36ab14977bed8dcf64118e58a8e58a76f41860c32055a73bcd04612826 plugin-sdk-api-baseline.jsonl ba307375a0714be1360fceb564e89d86724f8ad5a50a18377184254112c99128 plugin-sdk-api-baseline.jsonl

View File

@@ -20,6 +20,7 @@ import {
type CodexAppServerEventProjectorOptions, type CodexAppServerEventProjectorOptions,
type CodexAppServerToolTelemetry, type CodexAppServerToolTelemetry,
} from "./event-projector.js"; } from "./event-projector.js";
import { CodexNativeSubagentTaskMirror } from "./native-subagent-task-mirror.js";
import { rememberCodexRateLimits, resetCodexRateLimitCacheForTests } from "./rate-limit-cache.js"; import { rememberCodexRateLimits, resetCodexRateLimitCacheForTests } from "./rate-limit-cache.js";
import { createCodexTestModel } from "./test-support.js"; import { createCodexTestModel } from "./test-support.js";
@@ -643,6 +644,36 @@ describe("CodexAppServerEventProjector", () => {
expect(result.assistantTexts).toStrictEqual([]); expect(result.assistantTexts).toStrictEqual([]);
}); });
it("mirrors native subagent notifications before current-turn filtering", async () => {
const projector = await createProjector({
...(await createParams()),
sessionKey: "agent:main:main",
} as EmbeddedRunAttemptParams);
const mirrorSpy = vi.spyOn(CodexNativeSubagentTaskMirror.prototype, "handleNotification");
const notification = {
method: "item/completed",
params: {
threadId: THREAD_ID,
turnId: "child-turn",
item: {
type: "collabAgentToolCall",
tool: "spawnAgent",
senderThreadId: THREAD_ID,
receiverThreadIds: ["child-thread"],
agentsStates: {
"child-thread": { status: "completed", message: "done" },
},
},
},
} as ProjectorNotification;
await projector.handleNotification(notification);
expect(mirrorSpy).toHaveBeenCalledWith(notification);
const result = projector.buildResult(buildEmptyToolTelemetry());
expect(result.assistantTexts).toEqual([]);
});
it("ignores notifications that omit top-level thread and turn ids", async () => { it("ignores notifications that omit top-level thread and turn ids", async () => {
const projector = await createProjector(); const projector = await createProjector();

View File

@@ -20,6 +20,7 @@ import {
type ToolProgressDetailMode, type ToolProgressDetailMode,
} from "openclaw/plugin-sdk/agent-harness-runtime"; } from "openclaw/plugin-sdk/agent-harness-runtime";
import { emitTrustedDiagnosticEvent } from "openclaw/plugin-sdk/diagnostic-runtime"; import { emitTrustedDiagnosticEvent } from "openclaw/plugin-sdk/diagnostic-runtime";
import { CodexNativeSubagentTaskMirror } from "./native-subagent-task-mirror.js";
import { readCodexTurn } from "./protocol-validators.js"; import { readCodexTurn } from "./protocol-validators.js";
import { import {
isJsonObject, isJsonObject,
@@ -135,19 +136,34 @@ export class CodexAppServerEventProjector {
private guardianReviewCount = 0; private guardianReviewCount = 0;
private completedCompactionCount = 0; private completedCompactionCount = 0;
private latestRateLimits: JsonValue | undefined; private latestRateLimits: JsonValue | undefined;
private readonly nativeSubagentTaskMirror: CodexNativeSubagentTaskMirror;
constructor( constructor(
private readonly params: EmbeddedRunAttemptParams, private readonly params: EmbeddedRunAttemptParams,
private readonly threadId: string, private readonly threadId: string,
private readonly turnId: string, private readonly turnId: string,
private readonly options: CodexAppServerEventProjectorOptions = {}, private readonly options: CodexAppServerEventProjectorOptions = {},
) {} ) {
this.nativeSubagentTaskMirror = new CodexNativeSubagentTaskMirror({
parentThreadId: threadId,
requesterSessionKey: params.sessionKey,
agentId: params.agentId,
});
}
async handleNotification(notification: CodexServerNotification): Promise<void> { async handleNotification(notification: CodexServerNotification): Promise<void> {
const params = isJsonObject(notification.params) ? notification.params : undefined; const params = isJsonObject(notification.params) ? notification.params : undefined;
if (!params) { if (!params) {
return; return;
} }
try {
this.nativeSubagentTaskMirror.handleNotification(notification);
} catch (error) {
embeddedAgentLog.warn("Failed to mirror Codex native subagent lifecycle event", {
method: notification.method,
error: formatErrorMessage(error),
});
}
if (notification.method === "account/rateLimits/updated") { if (notification.method === "account/rateLimits/updated") {
this.latestRateLimits = params; this.latestRateLimits = params;
rememberCodexRateLimits(params); rememberCodexRateLimits(params);

View File

@@ -0,0 +1,380 @@
import { describe, expect, it, vi } from "vitest";
import {
codexNativeSubagentRunId,
CodexNativeSubagentTaskMirror,
type TaskLifecycleRuntime,
} from "./native-subagent-task-mirror.js";
function createRuntime() {
return {
createRunningTaskRun: vi.fn(),
recordTaskRunProgressByRunId: vi.fn(() => []),
finalizeTaskRunByRunId: vi.fn(() => []),
} as unknown as TaskLifecycleRuntime;
}
describe("CodexNativeSubagentTaskMirror", () => {
it("creates a silent task-registry task for a native Codex subagent thread", () => {
const runtime = createRuntime();
const mirror = new CodexNativeSubagentTaskMirror(
{
parentThreadId: "parent-thread",
requesterSessionKey: "agent:main:main",
agentId: "main",
now: () => 20_000,
},
runtime,
);
mirror.handleNotification({
method: "thread/started",
params: {
thread: {
id: "child-thread",
sessionId: "session-tree",
preview: "write the Madrid wine script",
createdAt: 10,
status: { type: "active", activeFlags: [] },
source: {
subAgent: {
thread_spawn: {
parent_thread_id: "parent-thread",
depth: 1,
agent_nickname: "Poincare",
agent_role: "worker",
},
},
},
},
},
});
expect(runtime.createRunningTaskRun).toHaveBeenCalledWith(
expect.objectContaining({
runtime: "subagent",
taskKind: "codex-native",
sourceId: "codex-thread:child-thread",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
agentId: "main",
runId: "codex-thread:child-thread",
label: "Poincare",
task: "write the Madrid wine script",
notifyPolicy: "silent",
deliveryStatus: "not_applicable",
startedAt: 10_000,
progressSummary: "Codex native subagent started.",
}),
);
expect(vi.mocked(runtime.createRunningTaskRun).mock.calls[0]?.[0]).not.toHaveProperty(
"childSessionKey",
);
expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith(
expect.objectContaining({
runId: "codex-thread:child-thread",
runtime: "subagent",
progressSummary: "Codex native subagent is active.",
}),
);
});
it("ignores subagent threads spawned by a different parent thread", () => {
const runtime = createRuntime();
const mirror = new CodexNativeSubagentTaskMirror(
{
parentThreadId: "parent-thread",
requesterSessionKey: "agent:main:main",
},
runtime,
);
mirror.handleNotification({
method: "thread/started",
params: {
thread: {
id: "other-child",
source: {
subAgent: {
thread_spawn: {
parent_thread_id: "other-parent",
depth: 1,
},
},
},
},
},
});
expect(runtime.createRunningTaskRun).not.toHaveBeenCalled();
expect(runtime.recordTaskRunProgressByRunId).not.toHaveBeenCalled();
expect(runtime.finalizeTaskRunByRunId).not.toHaveBeenCalled();
});
it("deduplicates repeated thread-started notifications for the same child thread", () => {
const runtime = createRuntime();
const mirror = new CodexNativeSubagentTaskMirror(
{
parentThreadId: "parent-thread",
requesterSessionKey: "agent:main:main",
},
runtime,
);
const notification = {
method: "thread/started",
params: {
thread: {
id: "child-thread",
source: {
subAgent: {
thread_spawn: {
parent_thread_id: "parent-thread",
depth: 1,
},
},
},
},
},
} as const;
mirror.handleNotification(notification);
mirror.handleNotification(notification);
expect(runtime.createRunningTaskRun).toHaveBeenCalledTimes(1);
});
it("maps Codex thread status changes onto the mirrored task run", () => {
const runtime = createRuntime();
const mirror = new CodexNativeSubagentTaskMirror(
{
parentThreadId: "parent-thread",
requesterSessionKey: "agent:main:main",
now: () => 30_000,
},
runtime,
);
mirror.handleNotification({
method: "thread/status/changed",
params: {
threadId: "child-thread",
status: { type: "idle" },
},
});
mirror.handleNotification({
method: "thread/status/changed",
params: {
threadId: "failed-child",
status: { type: "systemError" },
},
});
expect(runtime.finalizeTaskRunByRunId).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
runId: codexNativeSubagentRunId("child-thread"),
runtime: "subagent",
status: "succeeded",
terminalSummary: "Codex native subagent finished.",
}),
);
expect(runtime.finalizeTaskRunByRunId).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
runId: codexNativeSubagentRunId("failed-child"),
runtime: "subagent",
status: "failed",
terminalSummary: "Codex native subagent failed.",
}),
);
});
it("creates and updates tasks from Codex collab agent item state", () => {
const runtime = createRuntime();
const mirror = new CodexNativeSubagentTaskMirror(
{
parentThreadId: "parent-thread",
requesterSessionKey: "agent:main:main",
now: () => 40_000,
},
runtime,
);
mirror.handleNotification({
method: "item/completed",
params: {
item: {
type: "collabAgentToolCall",
tool: "spawnAgent",
senderThreadId: "parent-thread",
receiverThreadIds: ["child-thread"],
prompt: "write the proof file",
agentsStates: {
"child-thread": {
status: "pendingInit",
message: null,
},
},
},
},
});
mirror.handleNotification({
method: "item/completed",
params: {
item: {
type: "collabAgentToolCall",
tool: "wait",
senderThreadId: "parent-thread",
receiverThreadIds: [],
agentsStates: {
"child-thread": {
status: "completed",
message: "done",
},
},
},
},
});
expect(runtime.createRunningTaskRun).toHaveBeenCalledWith(
expect.objectContaining({
runtime: "subagent",
taskKind: "codex-native",
sourceId: "codex-thread:child-thread",
requesterSessionKey: "agent:main:main",
ownerKey: "agent:main:main",
scopeKind: "session",
runId: "codex-thread:child-thread",
label: "Codex subagent",
task: "write the proof file",
notifyPolicy: "silent",
deliveryStatus: "not_applicable",
}),
);
expect(vi.mocked(runtime.createRunningTaskRun).mock.calls[0]?.[0]).not.toHaveProperty(
"childSessionKey",
);
expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith(
expect.objectContaining({
runId: "codex-thread:child-thread",
runtime: "subagent",
progressSummary: "Codex native subagent is initializing.",
}),
);
expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith(
expect.objectContaining({
runId: "codex-thread:child-thread",
runtime: "subagent",
status: "succeeded",
terminalSummary: "done",
}),
);
});
it("preserves a completed collab agent message when the thread later goes idle", () => {
const runtime = createRuntime();
const mirror = new CodexNativeSubagentTaskMirror(
{
parentThreadId: "parent-thread",
requesterSessionKey: "agent:main:main",
now: () => 50_000,
},
runtime,
);
mirror.handleNotification({
method: "item/completed",
params: {
item: {
type: "collabAgentToolCall",
tool: "spawnAgent",
senderThreadId: "parent-thread",
receiverThreadIds: ["child-thread"],
prompt: "write the proof file",
agentsStates: {
"child-thread": {
status: "completed",
message: "No user task is specified.",
},
},
},
},
});
mirror.handleNotification({
method: "thread/status/changed",
params: {
threadId: "child-thread",
status: { type: "idle" },
},
});
expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledTimes(1);
expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith(
expect.objectContaining({
runId: "codex-thread:child-thread",
status: "succeeded",
terminalSummary: "No user task is specified.",
}),
);
});
it("normalizes collab agent status spelling from alternate event surfaces", () => {
const runtime = createRuntime();
const mirror = new CodexNativeSubagentTaskMirror(
{
parentThreadId: "parent-thread",
requesterSessionKey: "agent:main:main",
now: () => 60_000,
},
runtime,
);
mirror.handleNotification({
method: "item/completed",
params: {
item: {
type: "collabAgentToolCall",
tool: "spawnAgent",
senderThreadId: "parent-thread",
receiverThreadIds: ["child-thread"],
agentsStates: {
"child-thread": {
status: "pending_init",
message: null,
},
},
},
},
});
mirror.handleNotification({
method: "item/completed",
params: {
item: {
type: "collabAgentToolCall",
tool: "wait",
senderThreadId: "parent-thread",
agentsStates: {
"child-thread": {
status: "success",
message: "done",
},
},
},
},
});
expect(runtime.recordTaskRunProgressByRunId).toHaveBeenCalledWith(
expect.objectContaining({
runId: "codex-thread:child-thread",
progressSummary: "Codex native subagent is initializing.",
}),
);
expect(runtime.finalizeTaskRunByRunId).toHaveBeenCalledWith(
expect.objectContaining({
runId: "codex-thread:child-thread",
status: "succeeded",
terminalSummary: "done",
}),
);
});
});

View File

@@ -0,0 +1,417 @@
import {
createRunningTaskRun,
finalizeTaskRunByRunId,
recordTaskRunProgressByRunId,
} from "openclaw/plugin-sdk/codex-native-task-runtime";
import type {
CodexServerNotification,
CodexSessionSource,
CodexSubAgentThreadSpawnSource,
CodexThread,
CodexThreadStartedNotification,
CodexThreadStatus,
CodexThreadStatusChangedNotification,
JsonObject,
JsonValue,
} from "./protocol.js";
import { isJsonObject } from "./protocol.js";
const CODEX_NATIVE_SUBAGENT_RUNTIME = "subagent";
const CODEX_NATIVE_SUBAGENT_TASK_KIND = "codex-native";
export type TaskLifecycleRuntime = {
createRunningTaskRun: typeof createRunningTaskRun;
recordTaskRunProgressByRunId: typeof recordTaskRunProgressByRunId;
finalizeTaskRunByRunId: typeof finalizeTaskRunByRunId;
};
export type CodexNativeSubagentTaskMirrorParams = {
parentThreadId: string;
requesterSessionKey?: string;
agentId?: string;
now?: () => number;
};
const defaultRuntime: TaskLifecycleRuntime = {
createRunningTaskRun,
recordTaskRunProgressByRunId,
finalizeTaskRunByRunId,
};
export class CodexNativeSubagentTaskMirror {
private readonly mirroredThreadIds = new Set<string>();
private readonly terminalRunIds = new Set<string>();
private readonly now: () => number;
constructor(
private readonly params: CodexNativeSubagentTaskMirrorParams,
private readonly runtime: TaskLifecycleRuntime = defaultRuntime,
) {
this.now = params.now ?? Date.now;
}
handleNotification(notification: CodexServerNotification): void {
const params = isJsonObject(notification.params) ? notification.params : undefined;
if (!params) {
return;
}
if (notification.method === "thread/started") {
this.handleThreadStarted(params);
return;
}
if (notification.method === "thread/status/changed") {
this.handleThreadStatusChanged(params);
return;
}
if (notification.method === "item/started" || notification.method === "item/completed") {
this.handleCollabAgentItem(params);
}
}
private handleThreadStarted(params: JsonObject): void {
const notification = readThreadStartedNotification(params);
if (!notification) {
return;
}
const thread = notification.thread;
const spawn = readSubagentThreadSpawnSource(thread.source, this.params.parentThreadId);
if (!spawn) {
return;
}
const threadId = thread.id.trim();
if (!threadId || this.mirroredThreadIds.has(threadId)) {
return;
}
this.mirroredThreadIds.add(threadId);
const runId = codexNativeSubagentRunId(threadId);
const label =
trimOptional(spawn.agent_nickname) ??
trimOptional(thread.agentNickname) ??
trimOptional(spawn.agent_role) ??
trimOptional(thread.agentRole) ??
"Codex subagent";
const task =
trimOptional(thread.preview) ??
`Codex native subagent${label === "Codex subagent" ? "" : ` ${label}`}`;
const createdAt = secondsToMillis(thread.createdAt) ?? this.now();
this.runtime.createRunningTaskRun({
runtime: CODEX_NATIVE_SUBAGENT_RUNTIME,
taskKind: CODEX_NATIVE_SUBAGENT_TASK_KIND,
sourceId: runId,
requesterSessionKey: this.params.requesterSessionKey,
...(this.params.requesterSessionKey
? {
ownerKey: this.params.requesterSessionKey,
scopeKind: "session" as const,
}
: {}),
agentId: this.params.agentId,
runId,
label,
task,
notifyPolicy: "silent",
deliveryStatus: "not_applicable",
preferMetadata: true,
startedAt: createdAt,
lastEventAt: this.now(),
progressSummary: "Codex native subagent started.",
});
this.applyStatus(threadId, thread.status);
}
private handleThreadStatusChanged(params: JsonObject): void {
const notification = readThreadStatusChangedNotification(params);
if (!notification) {
return;
}
this.applyStatus(notification.threadId, notification.status);
}
private applyStatus(threadId: string, status: CodexThreadStatus | null | undefined): void {
const statusType = status?.type;
if (!statusType) {
return;
}
const runId = codexNativeSubagentRunId(threadId);
if (this.terminalRunIds.has(runId) && statusType !== "systemError") {
return;
}
const eventAt = this.now();
if (statusType === "active") {
this.runtime.recordTaskRunProgressByRunId({
runId,
runtime: CODEX_NATIVE_SUBAGENT_RUNTIME,
lastEventAt: eventAt,
progressSummary: "Codex native subagent is active.",
});
return;
}
if (statusType === "idle") {
this.terminalRunIds.add(runId);
this.runtime.finalizeTaskRunByRunId({
runId,
runtime: CODEX_NATIVE_SUBAGENT_RUNTIME,
status: "succeeded",
endedAt: eventAt,
lastEventAt: eventAt,
progressSummary: "Codex native subagent is idle.",
terminalSummary: "Codex native subagent finished.",
});
return;
}
if (statusType === "systemError") {
this.terminalRunIds.add(runId);
this.runtime.finalizeTaskRunByRunId({
runId,
runtime: CODEX_NATIVE_SUBAGENT_RUNTIME,
status: "failed",
endedAt: eventAt,
lastEventAt: eventAt,
error: "Codex app-server reported a system error for the native subagent thread.",
progressSummary: "Codex native subagent hit a system error.",
terminalSummary: "Codex native subagent failed.",
});
return;
}
if (statusType === "notLoaded") {
this.runtime.recordTaskRunProgressByRunId({
runId,
runtime: CODEX_NATIVE_SUBAGENT_RUNTIME,
lastEventAt: eventAt,
progressSummary: "Codex native subagent is not loaded.",
});
}
}
private handleCollabAgentItem(params: JsonObject): void {
const item = isJsonObject(params.item) ? params.item : undefined;
if (!item || readString(item, "type") !== "collabAgentToolCall") {
return;
}
if (readString(item, "senderThreadId") !== this.params.parentThreadId) {
return;
}
const receiverThreadIds = readStringArray(item.receiverThreadIds);
if (normalizeToolName(readString(item, "tool")) === "spawnagent") {
for (const receiverThreadId of receiverThreadIds) {
this.createTaskFromCollabSpawnItem(receiverThreadId, item);
}
}
const agentsStates = readAgentsStates(item.agentsStates);
for (const [threadId, state] of agentsStates) {
this.applyCollabAgentStatus(threadId, state.status, state.message);
}
}
private createTaskFromCollabSpawnItem(threadId: string, item: JsonObject): void {
const normalizedThreadId = threadId.trim();
if (!normalizedThreadId || this.mirroredThreadIds.has(normalizedThreadId)) {
return;
}
this.mirroredThreadIds.add(normalizedThreadId);
const prompt = trimOptional(readString(item, "prompt"));
const runId = codexNativeSubagentRunId(normalizedThreadId);
const createdAt = this.now();
this.runtime.createRunningTaskRun({
runtime: CODEX_NATIVE_SUBAGENT_RUNTIME,
taskKind: CODEX_NATIVE_SUBAGENT_TASK_KIND,
sourceId: runId,
requesterSessionKey: this.params.requesterSessionKey,
...(this.params.requesterSessionKey
? {
ownerKey: this.params.requesterSessionKey,
scopeKind: "session" as const,
}
: {}),
agentId: this.params.agentId,
runId,
label: "Codex subagent",
task: prompt ?? "Codex native subagent",
notifyPolicy: "silent",
deliveryStatus: "not_applicable",
preferMetadata: true,
startedAt: createdAt,
lastEventAt: createdAt,
progressSummary: "Codex native subagent spawned.",
});
}
private applyCollabAgentStatus(
threadId: string,
status: string | undefined,
message: string | null | undefined,
): void {
const normalizedStatus = normalizeAgentStateStatus(status);
if (!normalizedStatus) {
return;
}
const runId = codexNativeSubagentRunId(threadId);
const eventAt = this.now();
if (normalizedStatus === "pendingInit" || normalizedStatus === "running") {
this.runtime.recordTaskRunProgressByRunId({
runId,
runtime: CODEX_NATIVE_SUBAGENT_RUNTIME,
lastEventAt: eventAt,
progressSummary:
trimOptional(message) ??
(normalizedStatus === "pendingInit"
? "Codex native subagent is initializing."
: "Codex native subagent is running."),
});
return;
}
if (normalizedStatus === "completed") {
this.terminalRunIds.add(runId);
this.runtime.finalizeTaskRunByRunId({
runId,
runtime: CODEX_NATIVE_SUBAGENT_RUNTIME,
status: "succeeded",
endedAt: eventAt,
lastEventAt: eventAt,
progressSummary: trimOptional(message) ?? "Codex native subagent completed.",
terminalSummary: trimOptional(message) ?? "Codex native subagent finished.",
});
return;
}
this.terminalRunIds.add(runId);
this.runtime.finalizeTaskRunByRunId({
runId,
runtime: CODEX_NATIVE_SUBAGENT_RUNTIME,
status:
normalizedStatus === "interrupted" || normalizedStatus === "shutdown"
? "cancelled"
: "failed",
endedAt: eventAt,
lastEventAt: eventAt,
error: trimOptional(message) ?? `Codex native subagent status: ${normalizedStatus}`,
progressSummary: trimOptional(message) ?? `Codex native subagent ${normalizedStatus}.`,
terminalSummary: trimOptional(message) ?? "Codex native subagent did not complete.",
});
}
}
export function codexNativeSubagentRunId(threadId: string): string {
return `codex-thread:${threadId.trim()}`;
}
export function readSubagentThreadSpawnSource(
source: CodexSessionSource | null | undefined,
parentThreadId: string,
): CodexSubAgentThreadSpawnSource | undefined {
if (!source || typeof source !== "object" || !("subAgent" in source)) {
return undefined;
}
const subAgent = source.subAgent;
if (!subAgent || typeof subAgent !== "object" || !("thread_spawn" in subAgent)) {
return undefined;
}
const spawn = subAgent.thread_spawn;
if (!spawn || typeof spawn !== "object") {
return undefined;
}
return spawn.parent_thread_id === parentThreadId ? spawn : undefined;
}
function readThreadStartedNotification(
params: JsonObject,
): CodexThreadStartedNotification | undefined {
const thread = params.thread;
if (!isJsonObject(thread) || typeof thread.id !== "string") {
return undefined;
}
return { thread: thread as CodexThread };
}
function readThreadStatusChangedNotification(
params: JsonObject,
): CodexThreadStatusChangedNotification | undefined {
if (typeof params.threadId !== "string") {
return undefined;
}
const status = params.status;
if (!isJsonObject(status) || !isCodexThreadStatusType(status.type)) {
return undefined;
}
return {
threadId: params.threadId,
status: status as CodexThreadStatus,
};
}
function isCodexThreadStatusType(value: unknown): value is CodexThreadStatus["type"] {
return value === "notLoaded" || value === "idle" || value === "systemError" || value === "active";
}
function readAgentsStates(
value: JsonValue | undefined,
): Map<string, { status?: string; message?: string | null }> {
const states = new Map<string, { status?: string; message?: string | null }>();
if (!isJsonObject(value)) {
return states;
}
for (const [threadId, rawState] of Object.entries(value)) {
if (!isJsonObject(rawState)) {
continue;
}
const status = readString(rawState, "status");
const message = readNullableString(rawState, "message");
states.set(threadId, { status, message });
}
return states;
}
function readStringArray(value: JsonValue | undefined): string[] {
if (!Array.isArray(value)) {
return [];
}
return value.filter((entry): entry is string => typeof entry === "string" && entry.trim() !== "");
}
function readString(value: JsonObject, key: string): string | undefined {
const entry = value[key];
return typeof entry === "string" ? entry : undefined;
}
function readNullableString(value: JsonObject, key: string): string | null | undefined {
const entry = value[key];
return typeof entry === "string" || entry === null ? entry : undefined;
}
function normalizeToolName(value: string | undefined): string | undefined {
return value?.replace(/[^a-z0-9]/giu, "").toLowerCase();
}
function normalizeAgentStateStatus(value: string | undefined): string | undefined {
const key = value?.replace(/[^a-z0-9]/giu, "").toLowerCase();
if (!key) {
return undefined;
}
if (key === "pendinginit") {
return "pendingInit";
}
if (key === "inprogress" || key === "running") {
return "running";
}
if (key === "completed" || key === "succeeded" || key === "success") {
return "completed";
}
if (key === "interrupted" || key === "cancelled" || key === "canceled" || key === "shutdown") {
return key === "shutdown" ? "shutdown" : "interrupted";
}
if (key === "failed" || key === "error" || key === "systemerror") {
return "failed";
}
return value?.trim();
}
function secondsToMillis(value: number | null | undefined): number | undefined {
if (typeof value !== "number" || !Number.isFinite(value)) {
return undefined;
}
return value * 1000;
}
function trimOptional(value: string | null | undefined): string | undefined {
const trimmed = value?.trim();
return trimmed ? trimmed : undefined;
}

View File

@@ -166,7 +166,54 @@ export type CodexThread = {
id: string; id: string;
sessionId?: string; sessionId?: string;
name?: string | null; name?: string | null;
preview?: string | null;
createdAt?: number | null;
updatedAt?: number | null;
status?: CodexThreadStatus | null;
cwd?: string | null; cwd?: string | null;
source?: CodexSessionSource | null;
threadSource?: string | null;
agentNickname?: string | null;
agentRole?: string | null;
};
export type CodexThreadStatus =
| { type: "notLoaded" }
| { type: "idle" }
| { type: "systemError" }
| { type: "active"; activeFlags?: string[] };
export type CodexSubAgentThreadSpawnSource = {
parent_thread_id: string;
depth?: number;
agent_path?: string | null;
agent_nickname?: string | null;
agent_role?: string | null;
};
export type CodexSubAgentSource =
| "review"
| "compact"
| "memory_consolidation"
| { thread_spawn: CodexSubAgentThreadSpawnSource }
| { other: string };
export type CodexSessionSource =
| "cli"
| "vscode"
| "exec"
| "appServer"
| "unknown"
| { custom: string }
| { subAgent: CodexSubAgentSource };
export type CodexThreadStartedNotification = {
thread: CodexThread;
};
export type CodexThreadStatusChangedNotification = {
threadId: string;
status: CodexThreadStatus;
}; };
export type CodexThreadItem = { export type CodexThreadItem = {

View File

@@ -1,4 +1,5 @@
[ [
"codex-native-task-runtime",
"qa-channel", "qa-channel",
"qa-channel-protocol", "qa-channel-protocol",
"qa-lab", "qa-lab",

View File

@@ -0,0 +1,10 @@
// Private helper surface for the bundled Codex plugin. This is intentionally
// local-only so Codex can mirror app-server native subagents into OpenClaw's
// task registry without promoting detached task mutation helpers to the public
// plugin SDK.
export {
createRunningTaskRun,
finalizeTaskRunByRunId,
recordTaskRunProgressByRunId,
} from "../tasks/detached-task-runtime.js";

View File

@@ -443,6 +443,30 @@ describe("plugin-sdk root alias", () => {
expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/nested/path"); expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/nested/path");
}); });
it("keeps non-QA private local-only plugin-sdk subpaths out of the CJS root alias", () => {
const packageRoot = path.dirname(path.dirname(path.dirname(rootAliasPath)));
const sourceCodexNativeTaskRuntimePath = path.join(
packageRoot,
"src",
"plugin-sdk",
"codex-native-task-runtime.ts",
);
const sourceQaRuntimePath = path.join(packageRoot, "src", "plugin-sdk", "qa-runtime.ts");
const lazyModule = loadRootAliasWithStubs({
privateLocalOnlySubpaths: ["codex-native-task-runtime", "qa-runtime"],
existingPaths: [sourceCodexNativeTaskRuntimePath, sourceQaRuntimePath],
monolithicExports: {
slowHelper: (): string => "loaded",
},
});
expect((lazyModule.moduleExports.slowHelper as () => string)()).toBe("loaded");
const aliasMap = (lazyModule.createJitiOptions.at(-1)?.alias ?? {}) as Record<string, string>;
expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/codex-native-task-runtime");
expect(aliasMap).not.toHaveProperty("@openclaw/plugin-sdk/codex-native-task-runtime");
expect(aliasMap).not.toHaveProperty("openclaw/plugin-sdk/qa-runtime");
});
it("builds source plugin-sdk subpath aliases through the wider source extension family", () => { it("builds source plugin-sdk subpath aliases through the wider source extension family", () => {
const packageRoot = path.dirname(path.dirname(path.dirname(rootAliasPath))); const packageRoot = path.dirname(path.dirname(path.dirname(rootAliasPath)));
const lazyModule = loadRootAliasWithStubs({ const lazyModule = loadRootAliasWithStubs({

View File

@@ -631,6 +631,51 @@ describe("plugin sdk alias helpers", () => {
expect(subpaths).toEqual(["core", "qa-channel", "qa-channel-protocol", "qa-lab", "qa-runtime"]); expect(subpaths).toEqual(["core", "qa-channel", "qa-channel-protocol", "qa-lab", "qa-runtime"]);
}); });
it("adds the non-QA private Codex task runtime subpath only for bundled Codex", () => {
const fixture = createPluginSdkAliasFixture({
packageExports: {
"./plugin-sdk/core": { default: "./dist/plugin-sdk/core.js" },
},
});
fs.writeFileSync(
path.join(fixture.root, "scripts", "lib", "plugin-sdk-private-local-only-subpaths.json"),
JSON.stringify(["codex-native-task-runtime", "qa-runtime"], null, 2),
"utf-8",
);
fs.writeFileSync(
path.join(fixture.root, "src", "plugin-sdk", "codex-native-task-runtime.ts"),
"export const codexNativeTaskRuntime = true;\n",
"utf-8",
);
fs.writeFileSync(
path.join(fixture.root, "src", "plugin-sdk", "qa-runtime.ts"),
"export const qaRuntime = true;\n",
"utf-8",
);
const sourceCodexEntry = writePluginEntry(
fixture.root,
bundledPluginFile("codex", "src/index.ts"),
);
const sourceOtherEntry = writePluginEntry(
fixture.root,
bundledPluginFile("demo", "src/index.ts"),
);
const codexSubpaths = withEnv({ OPENCLAW_ENABLE_PRIVATE_QA_CLI: undefined }, () =>
listPluginSdkExportedSubpaths({
modulePath: sourceCodexEntry,
}),
);
const otherSubpaths = withEnv({ OPENCLAW_ENABLE_PRIVATE_QA_CLI: undefined }, () =>
listPluginSdkExportedSubpaths({
modulePath: sourceOtherEntry,
}),
);
expect(codexSubpaths).toEqual(["codex-native-task-runtime", "core"]);
expect(otherSubpaths).toEqual(["core"]);
});
it("does not reuse a non-private cached subpath list after private qa gets enabled", () => { it("does not reuse a non-private cached subpath list after private qa gets enabled", () => {
const fixture = createPluginSdkAliasFixture({ const fixture = createPluginSdkAliasFixture({
packageExports: { packageExports: {
@@ -799,6 +844,60 @@ describe("plugin sdk alias helpers", () => {
); );
}); });
it("aliases non-QA private plugin-sdk subpaths for bundled runtime source loading", () => {
const fixture = createPluginSdkAliasFixture({
packageExports: {
"./plugin-sdk/core": { default: "./dist/plugin-sdk/core.js" },
},
});
const sourceRootAlias = path.join(fixture.root, "src", "plugin-sdk", "root-alias.cjs");
const sourceCodexNativeTaskRuntimePath = path.join(
fixture.root,
"src",
"plugin-sdk",
"codex-native-task-runtime.ts",
);
const sourceQaRuntimePath = path.join(fixture.root, "src", "plugin-sdk", "qa-runtime.ts");
fs.writeFileSync(sourceRootAlias, "module.exports = {};\n", "utf-8");
fs.writeFileSync(
path.join(fixture.root, "scripts", "lib", "plugin-sdk-private-local-only-subpaths.json"),
JSON.stringify(["codex-native-task-runtime", "qa-runtime"], null, 2),
"utf-8",
);
fs.writeFileSync(
sourceCodexNativeTaskRuntimePath,
"export const codexNativeTaskRuntime = true;\n",
"utf-8",
);
fs.writeFileSync(sourceQaRuntimePath, "export const qaRuntime = true;\n", "utf-8");
const sourcePluginEntry = writePluginEntry(
fixture.root,
bundledPluginFile("codex", "src/index.ts"),
);
const sourceOtherPluginEntry = writePluginEntry(
fixture.root,
bundledPluginFile("demo", "src/index.ts"),
);
const aliases = withEnv(
{ OPENCLAW_ENABLE_PRIVATE_QA_CLI: undefined, NODE_ENV: undefined },
() => buildPluginLoaderAliasMap(sourcePluginEntry),
);
const otherAliases = withEnv(
{ OPENCLAW_ENABLE_PRIVATE_QA_CLI: undefined, NODE_ENV: undefined },
() => buildPluginLoaderAliasMap(sourceOtherPluginEntry),
);
expect(fs.realpathSync(aliases["openclaw/plugin-sdk"] ?? "")).toBe(
fs.realpathSync(sourceRootAlias),
);
expect(fs.realpathSync(aliases["openclaw/plugin-sdk/codex-native-task-runtime"] ?? "")).toBe(
fs.realpathSync(sourceCodexNativeTaskRuntimePath),
);
expect(aliases["openclaw/plugin-sdk/qa-runtime"]).toBeUndefined();
expect(otherAliases["openclaw/plugin-sdk/codex-native-task-runtime"]).toBeUndefined();
});
it("applies explicit dist resolution to plugin-sdk subpath aliases too", () => { it("applies explicit dist resolution to plugin-sdk subpath aliases too", () => {
const { fixture, distRootAlias, distChannelRuntimePath } = createPluginSdkAliasTargetFixture(); const { fixture, distRootAlias, distChannelRuntimePath } = createPluginSdkAliasTargetFixture();
const sourcePluginEntry = writePluginEntry( const sourcePluginEntry = writePluginEntry(

View File

@@ -265,6 +265,7 @@ const cachedPluginSdkScopedAliasMaps = new PluginLruCache<Record<string, string>
MAX_PLUGIN_LOADER_ALIAS_CACHE_ENTRIES, MAX_PLUGIN_LOADER_ALIAS_CACHE_ENTRIES,
); );
const PLUGIN_SDK_PACKAGE_NAMES = ["openclaw/plugin-sdk", "@openclaw/plugin-sdk"] as const; const PLUGIN_SDK_PACKAGE_NAMES = ["openclaw/plugin-sdk", "@openclaw/plugin-sdk"] as const;
const CODEX_NATIVE_TASK_RUNTIME_PLUGIN_SDK_SUBPATH = "codex-native-task-runtime";
const PLUGIN_SDK_SOURCE_CANDIDATE_EXTENSIONS = [ const PLUGIN_SDK_SOURCE_CANDIDATE_EXTENSIONS = [
".ts", ".ts",
".mts", ".mts",
@@ -453,6 +454,34 @@ function shouldIncludePrivateLocalOnlyPluginSdkSubpaths() {
return process.env.OPENCLAW_ENABLE_PRIVATE_QA_CLI === "1"; return process.env.OPENCLAW_ENABLE_PRIVATE_QA_CLI === "1";
} }
function isBundledCodexPluginModulePath(params: { packageRoot: string; modulePath: string }) {
const normalizedModulePath = path.resolve(params.modulePath);
const roots = [
path.join(params.packageRoot, "extensions", "codex"),
path.join(params.packageRoot, "dist", "extensions", "codex"),
path.join(params.packageRoot, "dist-runtime", "extensions", "codex"),
];
return roots.some(
(root) =>
normalizedModulePath === root || normalizedModulePath.startsWith(`${root}${path.sep}`),
);
}
function shouldIncludePrivateLocalOnlyPluginSdkSubpath(params: {
packageRoot: string;
modulePath: string;
subpath: string;
}) {
return (
shouldIncludePrivateLocalOnlyPluginSdkSubpaths() ||
(params.subpath === CODEX_NATIVE_TASK_RUNTIME_PLUGIN_SDK_SUBPATH &&
isBundledCodexPluginModulePath({
packageRoot: params.packageRoot,
modulePath: params.modulePath,
}))
);
}
function hasPluginSdkSubpathArtifact(packageRoot: string, subpath: string) { function hasPluginSdkSubpathArtifact(packageRoot: string, subpath: string) {
const distPath = path.join(packageRoot, "dist", "plugin-sdk", `${subpath}.js`); const distPath = path.join(packageRoot, "dist", "plugin-sdk", `${subpath}.js`);
if (isUsableDistPluginSdkArtifact(distPath)) { if (isUsableDistPluginSdkArtifact(distPath)) {
@@ -478,12 +507,14 @@ function listDistPluginSdkArtifactSubpaths(packageRoot: string): Set<string> {
} }
} }
function listPrivateLocalOnlyPluginSdkSubpaths(packageRoot: string): string[] { function listPrivateLocalOnlyPluginSdkSubpaths(params: {
if (!shouldIncludePrivateLocalOnlyPluginSdkSubpaths()) { packageRoot: string;
return []; modulePath: string;
} }): string[] {
return readPrivateLocalOnlyPluginSdkSubpaths(packageRoot).filter((subpath) => return readPrivateLocalOnlyPluginSdkSubpaths(params.packageRoot).filter(
hasPluginSdkSubpathArtifact(packageRoot, subpath), (subpath) =>
shouldIncludePrivateLocalOnlyPluginSdkSubpath({ ...params, subpath }) &&
hasPluginSdkSubpathArtifact(params.packageRoot, subpath),
); );
} }
@@ -504,7 +535,8 @@ export function listPluginSdkExportedSubpaths(
if (!packageRoot) { if (!packageRoot) {
return []; return [];
} }
const cacheKey = `${packageRoot}::privateQa=${shouldIncludePrivateLocalOnlyPluginSdkSubpaths() ? "1" : "0"}`; const includeCodexPrivateRuntime = isBundledCodexPluginModulePath({ packageRoot, modulePath });
const cacheKey = `${packageRoot}::privateQa=${shouldIncludePrivateLocalOnlyPluginSdkSubpaths() ? "1" : "0"}::codexPrivate=${includeCodexPrivateRuntime ? "1" : "0"}`;
const cached = cachedPluginSdkExportedSubpaths.get(cacheKey); const cached = cachedPluginSdkExportedSubpaths.get(cacheKey);
if (cached) { if (cached) {
return cached; return cached;
@@ -512,7 +544,7 @@ export function listPluginSdkExportedSubpaths(
const subpaths = [ const subpaths = [
...new Set([ ...new Set([
...(readPluginSdkSubpathsFromPackageRoot(packageRoot) ?? []), ...(readPluginSdkSubpathsFromPackageRoot(packageRoot) ?? []),
...listPrivateLocalOnlyPluginSdkSubpaths(packageRoot), ...listPrivateLocalOnlyPluginSdkSubpaths({ packageRoot, modulePath }),
]), ]),
].toSorted(); ].toSorted();
cachedPluginSdkExportedSubpaths.set(cacheKey, subpaths); cachedPluginSdkExportedSubpaths.set(cacheKey, subpaths);
@@ -541,7 +573,8 @@ export function resolvePluginSdkScopedAliasMap(
isProduction: process.env.NODE_ENV === "production", isProduction: process.env.NODE_ENV === "production",
pluginSdkResolution: params.pluginSdkResolution, pluginSdkResolution: params.pluginSdkResolution,
}); });
const cacheKey = `${packageRoot}::${orderedKinds.join(",")}::privateQa=${shouldIncludePrivateLocalOnlyPluginSdkSubpaths() ? "1" : "0"}`; const includeCodexPrivateRuntime = isBundledCodexPluginModulePath({ packageRoot, modulePath });
const cacheKey = `${packageRoot}::${orderedKinds.join(",")}::privateQa=${shouldIncludePrivateLocalOnlyPluginSdkSubpaths() ? "1" : "0"}::codexPrivate=${includeCodexPrivateRuntime ? "1" : "0"}`;
const cached = cachedPluginSdkScopedAliasMaps.get(cacheKey); const cached = cachedPluginSdkScopedAliasMaps.get(cacheKey);
if (cached) { if (cached) {
return cached; return cached;

View File

@@ -1765,6 +1765,43 @@ describe("task-registry", () => {
}); });
}); });
it("does not mark codex-native subagent tasks lost when they have no OpenClaw child session", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const now = Date.now();
const task = createTaskRecord({
runtime: "subagent",
taskKind: "codex-native",
ownerKey: "agent:main:main",
scopeKind: "session",
sourceId: "codex-thread:child-thread",
runId: "codex-thread:child-thread",
task: "Codex native child",
status: "running",
deliveryStatus: "not_applicable",
notifyPolicy: "silent",
});
setTaskTimingById({
taskId: task.taskId,
lastEventAt: now - 10 * 60_000,
});
expect(await runTaskRegistryMaintenance()).toEqual({
reconciled: 0,
recovered: 0,
cleanupStamped: 0,
pruned: 0,
});
expect(getTaskById(task.taskId)).toMatchObject({
status: "running",
taskKind: "codex-native",
runId: "codex-thread:child-thread",
});
});
});
it("closes terminal parent-owned one-shot ACP sessions during maintenance", async () => { it("closes terminal parent-owned one-shot ACP sessions during maintenance", async () => {
await withTaskRegistryTempDir(async (root) => { await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root; process.env.OPENCLAW_STATE_DIR = root;
@@ -3012,4 +3049,39 @@ describe("task-registry", () => {
}); });
}); });
}); });
it("does not route codex-native task cancellation through OpenClaw subagent sessions", async () => {
await withTaskRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
const task = createTaskRecord({
runtime: "subagent",
taskKind: "codex-native",
ownerKey: "agent:main:main",
scopeKind: "session",
sourceId: "codex-thread:child-thread",
runId: "codex-thread:child-thread",
task: "Codex native child",
status: "running",
deliveryStatus: "not_applicable",
notifyPolicy: "silent",
});
const result = await cancelTaskById({
cfg: {} as never,
taskId: task.taskId,
});
expect(result).toMatchObject({
found: true,
cancelled: false,
reason: "Task has no cancellable child session.",
task: expect.objectContaining({
taskId: task.taskId,
status: "running",
}),
});
expect(hoisted.killSubagentRunAdminMock).not.toHaveBeenCalled();
});
});
}); });

View File

@@ -283,6 +283,8 @@ function buildUnifiedDistEntries(): Record<string, string> {
...dockerE2eHarnessEntries, ...dockerE2eHarnessEntries,
// Internal compat artifact for the root-alias.cjs lazy loader. // Internal compat artifact for the root-alias.cjs lazy loader.
"plugin-sdk/compat": "src/plugin-sdk/compat.ts", "plugin-sdk/compat": "src/plugin-sdk/compat.ts",
// Private bundled Codex helper for app-server native subagent task mirroring.
"plugin-sdk/codex-native-task-runtime": "src/plugin-sdk/codex-native-task-runtime.ts",
...Object.fromEntries( ...Object.fromEntries(
Object.entries(buildPluginSdkEntrySources()).map(([entry, source]) => [ Object.entries(buildPluginSdkEntrySources()).map(([entry, source]) => [
`plugin-sdk/${entry}`, `plugin-sdk/${entry}`,