mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-13 15:47:28 +00:00
feat(gateway): add SDK task ledger RPCs (#74847)
Adds Gateway task ledger RPCs and SDK methods for listing, fetching, and cancelling durable background tasks. Includes protocol schemas/scopes, generated Swift models, docs, and tests. Public task summary text is sanitized before SDK exposure.
This commit is contained in:
@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Telegram/Feishu: honor configured per-agent and global `reasoningDefault` values when deciding whether channel reasoning previews should stream or stay hidden, addressing the preview-default part of #73182. Thanks @anagnorisis2peripeteia.
|
||||
- Docker: run the runtime image under `tini` so long-lived containers reap orphaned child processes and forward signals correctly. (#77885) Thanks @VintageAyu.
|
||||
- Logging/redaction: redact quoted HTTP client secret fields and auth/cookie headers in shared log and formatted error output. Related #71211 and #65623. (#75033) Thanks @liaoandi.
|
||||
- Gateway/SDK: document and stabilize the task ledger RPC surface for `tasks.list`, `tasks.get`, and `tasks.cancel`, including generated Swift model typing for optional task summaries. Thanks @BunsDev.
|
||||
- Google/Gemini: normalize retired `google/gemini-3-pro-preview` and `google-gemini-cli/gemini-3-pro-preview` selections to `google/gemini-3.1-pro-preview` before they are written to model config.
|
||||
- Google/Gemini: emit canonical `google/gemini-3.1-pro-preview` ids from configured provider catalog rows so model list and selection paths can test Gemini 3.1 instead of retired Gemini 3 Pro.
|
||||
- Google/Gemini: normalize nested proxy-provider catalog ids like `google/gemini-3-pro-preview` to `google/gemini-3.1-pro-preview`, so Kilo-style configured catalogs test Gemini 3.1 instead of the retired Gemini 3 Pro id.
|
||||
|
||||
@@ -165,14 +165,14 @@ public struct ResponseFrame: Codable, Sendable {
|
||||
public let id: String
|
||||
public let ok: Bool
|
||||
public let payload: AnyCodable?
|
||||
public let error: [String: AnyCodable]?
|
||||
public let error: ErrorShape?
|
||||
|
||||
public init(
|
||||
type: String,
|
||||
id: String,
|
||||
ok: Bool,
|
||||
payload: AnyCodable?,
|
||||
error: [String: AnyCodable]?)
|
||||
error: ErrorShape?)
|
||||
{
|
||||
self.type = type
|
||||
self.id = id
|
||||
@@ -195,14 +195,14 @@ public struct EventFrame: Codable, Sendable {
|
||||
public let event: String
|
||||
public let payload: AnyCodable?
|
||||
public let seq: Int?
|
||||
public let stateversion: [String: AnyCodable]?
|
||||
public let stateversion: StateVersion?
|
||||
|
||||
public init(
|
||||
type: String,
|
||||
event: String,
|
||||
payload: AnyCodable?,
|
||||
seq: Int?,
|
||||
stateversion: [String: AnyCodable]?)
|
||||
stateversion: StateVersion?)
|
||||
{
|
||||
self.type = type
|
||||
self.event = event
|
||||
@@ -2288,6 +2288,220 @@ public struct SessionsUsageParams: Codable, Sendable {
|
||||
}
|
||||
}
|
||||
|
||||
public struct TaskSummary: Codable, Sendable {
|
||||
public let id: String
|
||||
public let kind: String?
|
||||
public let runtime: String?
|
||||
public let status: AnyCodable
|
||||
public let title: String?
|
||||
public let agentid: String?
|
||||
public let sessionkey: String?
|
||||
public let childsessionkey: String?
|
||||
public let ownerkey: String?
|
||||
public let runid: String?
|
||||
public let taskid: String?
|
||||
public let flowid: String?
|
||||
public let parenttaskid: String?
|
||||
public let sourceid: String?
|
||||
public let createdat: AnyCodable?
|
||||
public let updatedat: AnyCodable?
|
||||
public let startedat: AnyCodable?
|
||||
public let endedat: AnyCodable?
|
||||
public let progresssummary: String?
|
||||
public let terminalsummary: String?
|
||||
public let error: String?
|
||||
|
||||
public init(
|
||||
id: String,
|
||||
kind: String?,
|
||||
runtime: String?,
|
||||
status: AnyCodable,
|
||||
title: String?,
|
||||
agentid: String?,
|
||||
sessionkey: String?,
|
||||
childsessionkey: String?,
|
||||
ownerkey: String?,
|
||||
runid: String?,
|
||||
taskid: String?,
|
||||
flowid: String?,
|
||||
parenttaskid: String?,
|
||||
sourceid: String?,
|
||||
createdat: AnyCodable?,
|
||||
updatedat: AnyCodable?,
|
||||
startedat: AnyCodable?,
|
||||
endedat: AnyCodable?,
|
||||
progresssummary: String?,
|
||||
terminalsummary: String?,
|
||||
error: String?)
|
||||
{
|
||||
self.id = id
|
||||
self.kind = kind
|
||||
self.runtime = runtime
|
||||
self.status = status
|
||||
self.title = title
|
||||
self.agentid = agentid
|
||||
self.sessionkey = sessionkey
|
||||
self.childsessionkey = childsessionkey
|
||||
self.ownerkey = ownerkey
|
||||
self.runid = runid
|
||||
self.taskid = taskid
|
||||
self.flowid = flowid
|
||||
self.parenttaskid = parenttaskid
|
||||
self.sourceid = sourceid
|
||||
self.createdat = createdat
|
||||
self.updatedat = updatedat
|
||||
self.startedat = startedat
|
||||
self.endedat = endedat
|
||||
self.progresssummary = progresssummary
|
||||
self.terminalsummary = terminalsummary
|
||||
self.error = error
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case id
|
||||
case kind
|
||||
case runtime
|
||||
case status
|
||||
case title
|
||||
case agentid = "agentId"
|
||||
case sessionkey = "sessionKey"
|
||||
case childsessionkey = "childSessionKey"
|
||||
case ownerkey = "ownerKey"
|
||||
case runid = "runId"
|
||||
case taskid = "taskId"
|
||||
case flowid = "flowId"
|
||||
case parenttaskid = "parentTaskId"
|
||||
case sourceid = "sourceId"
|
||||
case createdat = "createdAt"
|
||||
case updatedat = "updatedAt"
|
||||
case startedat = "startedAt"
|
||||
case endedat = "endedAt"
|
||||
case progresssummary = "progressSummary"
|
||||
case terminalsummary = "terminalSummary"
|
||||
case error
|
||||
}
|
||||
}
|
||||
|
||||
public struct TasksListParams: Codable, Sendable {
|
||||
public let status: AnyCodable?
|
||||
public let agentid: String?
|
||||
public let sessionkey: String?
|
||||
public let limit: Int?
|
||||
public let cursor: String?
|
||||
|
||||
public init(
|
||||
status: AnyCodable?,
|
||||
agentid: String?,
|
||||
sessionkey: String?,
|
||||
limit: Int?,
|
||||
cursor: String?)
|
||||
{
|
||||
self.status = status
|
||||
self.agentid = agentid
|
||||
self.sessionkey = sessionkey
|
||||
self.limit = limit
|
||||
self.cursor = cursor
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case status
|
||||
case agentid = "agentId"
|
||||
case sessionkey = "sessionKey"
|
||||
case limit
|
||||
case cursor
|
||||
}
|
||||
}
|
||||
|
||||
public struct TasksListResult: Codable, Sendable {
|
||||
public let tasks: [TaskSummary]
|
||||
public let nextcursor: String?
|
||||
|
||||
public init(
|
||||
tasks: [TaskSummary],
|
||||
nextcursor: String?)
|
||||
{
|
||||
self.tasks = tasks
|
||||
self.nextcursor = nextcursor
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case tasks
|
||||
case nextcursor = "nextCursor"
|
||||
}
|
||||
}
|
||||
|
||||
public struct TasksGetParams: Codable, Sendable {
|
||||
public let taskid: String
|
||||
|
||||
public init(
|
||||
taskid: String)
|
||||
{
|
||||
self.taskid = taskid
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case taskid = "taskId"
|
||||
}
|
||||
}
|
||||
|
||||
public struct TasksGetResult: Codable, Sendable {
|
||||
public let task: TaskSummary
|
||||
|
||||
public init(
|
||||
task: TaskSummary)
|
||||
{
|
||||
self.task = task
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case task
|
||||
}
|
||||
}
|
||||
|
||||
public struct TasksCancelParams: Codable, Sendable {
|
||||
public let taskid: String
|
||||
public let reason: String?
|
||||
|
||||
public init(
|
||||
taskid: String,
|
||||
reason: String?)
|
||||
{
|
||||
self.taskid = taskid
|
||||
self.reason = reason
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case taskid = "taskId"
|
||||
case reason
|
||||
}
|
||||
}
|
||||
|
||||
public struct TasksCancelResult: Codable, Sendable {
|
||||
public let found: Bool
|
||||
public let cancelled: Bool
|
||||
public let reason: String?
|
||||
public let task: TaskSummary?
|
||||
|
||||
public init(
|
||||
found: Bool,
|
||||
cancelled: Bool,
|
||||
reason: String?,
|
||||
task: TaskSummary?)
|
||||
{
|
||||
self.found = found
|
||||
self.cancelled = cancelled
|
||||
self.reason = reason
|
||||
self.task = task
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case found
|
||||
case cancelled
|
||||
case reason
|
||||
case task
|
||||
}
|
||||
}
|
||||
|
||||
public struct ConfigGetParams: Codable, Sendable {}
|
||||
|
||||
public struct ConfigSetParams: Codable, Sendable {
|
||||
@@ -2564,13 +2778,13 @@ public struct WizardStep: Codable, Sendable {
|
||||
|
||||
public struct WizardNextResult: Codable, Sendable {
|
||||
public let done: Bool
|
||||
public let step: [String: AnyCodable]?
|
||||
public let step: WizardStep?
|
||||
public let status: AnyCodable?
|
||||
public let error: String?
|
||||
|
||||
public init(
|
||||
done: Bool,
|
||||
step: [String: AnyCodable]?,
|
||||
step: WizardStep?,
|
||||
status: AnyCodable?,
|
||||
error: String?)
|
||||
{
|
||||
@@ -2591,14 +2805,14 @@ public struct WizardNextResult: Codable, Sendable {
|
||||
public struct WizardStartResult: Codable, Sendable {
|
||||
public let sessionid: String
|
||||
public let done: Bool
|
||||
public let step: [String: AnyCodable]?
|
||||
public let step: WizardStep?
|
||||
public let status: AnyCodable?
|
||||
public let error: String?
|
||||
|
||||
public init(
|
||||
sessionid: String,
|
||||
done: Bool,
|
||||
step: [String: AnyCodable]?,
|
||||
step: WizardStep?,
|
||||
status: AnyCodable?,
|
||||
error: String?)
|
||||
{
|
||||
@@ -4541,7 +4755,7 @@ public struct ToolsInvokeResult: Codable, Sendable {
|
||||
public let requiresapproval: Bool?
|
||||
public let approvalid: String?
|
||||
public let source: AnyCodable?
|
||||
public let error: [String: AnyCodable]?
|
||||
public let error: ToolsInvokeError?
|
||||
|
||||
public init(
|
||||
ok: Bool,
|
||||
@@ -4550,7 +4764,7 @@ public struct ToolsInvokeResult: Codable, Sendable {
|
||||
requiresapproval: Bool?,
|
||||
approvalid: String?,
|
||||
source: AnyCodable?,
|
||||
error: [String: AnyCodable]?)
|
||||
error: ToolsInvokeError?)
|
||||
{
|
||||
self.ok = ok
|
||||
self.toolname = toolname
|
||||
|
||||
@@ -37,6 +37,7 @@ resources.
|
||||
| `Run.cancel()` | Ready | Calls `sessions.abort` by run id, with session key when available. |
|
||||
| `oc.sessions` | Ready | Creates, resolves, sends to, patches, compacts, and gets session handles. |
|
||||
| `Session.send()` | Ready | Calls `sessions.send` and returns a `Run`. |
|
||||
| `oc.tasks` | Ready | Lists, reads, and cancels Gateway task ledger entries. |
|
||||
| `oc.models` | Ready | Calls `models.list` and the current `models.authStatus` status RPC. |
|
||||
| `oc.tools` | Ready | Lists, scopes, and invokes Gateway tools through the policy pipeline. |
|
||||
| `oc.artifacts` | Ready | Lists, gets, and downloads Gateway transcript artifacts. |
|
||||
@@ -50,7 +51,9 @@ The SDK also exports the core types used by those surfaces:
|
||||
`OpenClawEventType`, `GatewayEvent`, `OpenClawTransport`,
|
||||
`GatewayRequestOptions`, `SessionCreateParams`, `SessionSendParams`,
|
||||
`ArtifactSummary`, `ArtifactQuery`, `ArtifactsListResult`,
|
||||
`ArtifactsGetResult`, `ArtifactsDownloadResult`, `RuntimeSelection`,
|
||||
`ArtifactsGetResult`, `ArtifactsDownloadResult`,
|
||||
`TaskSummary`, `TaskStatus`, `TasksListParams`, `TasksListResult`,
|
||||
`TasksGetResult`, `TasksCancelResult`, `RuntimeSelection`,
|
||||
`EnvironmentSelection`, `WorkspaceSelection`, `ApprovalMode`, and related
|
||||
result types.
|
||||
|
||||
@@ -254,6 +257,14 @@ const approvals = await oc.approvals.list();
|
||||
await oc.approvals.respond("approval-id", { decision: "approve" });
|
||||
```
|
||||
|
||||
Task helpers use the durable task ledger that also backs `openclaw tasks`:
|
||||
|
||||
```typescript
|
||||
const tasks = await oc.tasks.list({ status: "running", sessionKey: "agent:main:main" });
|
||||
const task = await oc.tasks.get(tasks.tasks[0].id);
|
||||
await oc.tasks.cancel(task.task.id, { reason: "user stopped task" });
|
||||
```
|
||||
|
||||
Environment helpers expose read-only Gateway-local and node discovery:
|
||||
|
||||
```typescript
|
||||
@@ -268,10 +279,6 @@ pretend Gateway RPCs exist. These calls currently throw explicit unsupported
|
||||
errors:
|
||||
|
||||
```typescript
|
||||
await oc.tasks.list();
|
||||
await oc.tasks.get("task-id");
|
||||
await oc.tasks.cancel("task-id");
|
||||
|
||||
await oc.environments.create({});
|
||||
await oc.environments.delete("environment-id");
|
||||
```
|
||||
|
||||
@@ -411,6 +411,7 @@ enumeration of `src/gateway/server-methods/*.ts`.
|
||||
- `agents.list` returns configured agent entries, including effective model and runtime metadata.
|
||||
- `agents.create`, `agents.update`, and `agents.delete` manage agent records and workspace wiring.
|
||||
- `agents.files.list`, `agents.files.get`, and `agents.files.set` manage the bootstrap workspace files exposed for an agent.
|
||||
- `tasks.list`, `tasks.get`, and `tasks.cancel` expose the Gateway task ledger to SDK and operator clients.
|
||||
- `artifacts.list`, `artifacts.get`, and `artifacts.download` expose transcript-derived artifact summaries and downloads for an explicit `sessionKey`, `runId`, or `taskId` scope. Run and task queries resolve the owning session server-side and only return transcript media with matching provenance; unsafe or local URL sources return unsupported downloads instead of fetching server-side.
|
||||
- `environments.list` and `environments.status` expose read-only Gateway-local and node environment discovery for SDK clients.
|
||||
- `agent.identity.get` returns the effective assistant identity for an agent or session.
|
||||
@@ -499,6 +500,34 @@ enumeration of `src/gateway/server-methods/*.ts`.
|
||||
- Nodes may call `skills.bins` to fetch the current list of skill executables
|
||||
for auto-allow checks.
|
||||
|
||||
### Task ledger RPCs
|
||||
|
||||
Operator clients may inspect and cancel Gateway background task records through
|
||||
the task ledger RPCs. These methods return sanitized task summaries, not raw
|
||||
runtime state.
|
||||
|
||||
- `tasks.list` requires `operator.read`.
|
||||
- Params: optional `status` (`"queued"`, `"running"`, `"completed"`,
|
||||
`"failed"`, `"cancelled"`, or `"timed_out"`) or an array of those statuses,
|
||||
optional `agentId`, optional `sessionKey`, optional `limit` from `1` to
|
||||
`500`, and optional string `cursor`.
|
||||
- Result: `{ "tasks": TaskSummary[], "nextCursor"?: string }`.
|
||||
- `tasks.get` requires `operator.read`.
|
||||
- Params: `{ "taskId": string }`.
|
||||
- Result: `{ "task": TaskSummary }`.
|
||||
- Missing task ids return the Gateway not-found error shape.
|
||||
- `tasks.cancel` requires `operator.write`.
|
||||
- Params: `{ "taskId": string, "reason"?: string }`.
|
||||
- Result:
|
||||
`{ "found": boolean, "cancelled": boolean, "reason"?: string, "task"?: TaskSummary }`.
|
||||
- `found` reports whether the ledger had a matching task. `cancelled`
|
||||
reports whether the runtime accepted or recorded cancellation.
|
||||
|
||||
`TaskSummary` includes `id`, `status`, and optional metadata such as `kind`,
|
||||
`runtime`, `title`, `agentId`, `sessionKey`, `childSessionKey`, `ownerKey`,
|
||||
`runId`, `taskId`, `flowId`, `parentTaskId`, `sourceId`, timestamps, progress,
|
||||
terminal summary, and sanitized error text.
|
||||
|
||||
### Operator helper methods
|
||||
|
||||
- Operators may call `commands.list` (`operator.read`) to fetch the runtime
|
||||
|
||||
@@ -49,9 +49,9 @@ oc.runs.events(runId, { after });
|
||||
oc.runs.wait(runId);
|
||||
oc.runs.cancel(runId);
|
||||
|
||||
oc.tasks.list(); // future API: current SDK throws unsupported
|
||||
oc.tasks.get(taskId); // future API: current SDK throws unsupported
|
||||
oc.tasks.cancel(taskId); // future API: current SDK throws unsupported
|
||||
oc.tasks.list({ status: "running" });
|
||||
oc.tasks.get(taskId);
|
||||
oc.tasks.cancel(taskId, { reason });
|
||||
oc.tasks.events(taskId, { after }); // future API
|
||||
|
||||
oc.models.list();
|
||||
|
||||
@@ -20,6 +20,10 @@ import type {
|
||||
SessionCreateParams,
|
||||
SessionSendParams,
|
||||
SessionTarget,
|
||||
TasksCancelResult,
|
||||
TasksGetResult,
|
||||
TasksListParams,
|
||||
TasksListResult,
|
||||
ToolInvokeParams,
|
||||
ToolInvokeResult,
|
||||
} from "./types.js";
|
||||
@@ -725,19 +729,19 @@ export class TasksNamespace extends RpcNamespace {
|
||||
super(client, "tasks");
|
||||
}
|
||||
|
||||
async list(params?: unknown): Promise<unknown> {
|
||||
void params;
|
||||
return unsupportedGatewayApi("oc.tasks.list");
|
||||
async list(params?: TasksListParams): Promise<TasksListResult> {
|
||||
return await this.call("list", params);
|
||||
}
|
||||
|
||||
async get(taskId: string): Promise<unknown> {
|
||||
void taskId;
|
||||
return unsupportedGatewayApi("oc.tasks.get");
|
||||
async get(taskId: string): Promise<TasksGetResult> {
|
||||
return await this.call("get", { taskId });
|
||||
}
|
||||
|
||||
async cancel(taskId: string): Promise<unknown> {
|
||||
void taskId;
|
||||
return unsupportedGatewayApi("oc.tasks.cancel");
|
||||
async cancel(taskId: string, options?: { reason?: string }): Promise<TasksCancelResult> {
|
||||
return await this.call("cancel", {
|
||||
taskId,
|
||||
...(options?.reason ? { reason: options.reason } : {}),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -114,6 +114,9 @@ async function createFakeGateway(port = 0): Promise<FakeGateway> {
|
||||
"sessions.patch",
|
||||
"sessions.resolve",
|
||||
"sessions.send",
|
||||
"tasks.cancel",
|
||||
"tasks.get",
|
||||
"tasks.list",
|
||||
"tools.catalog",
|
||||
"tools.effective",
|
||||
"tools.invoke",
|
||||
@@ -250,6 +253,44 @@ async function createFakeGateway(port = 0): Promise<FakeGateway> {
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.method === "tasks.list") {
|
||||
reply({
|
||||
tasks: [
|
||||
{
|
||||
id: "task-sdk-e2e",
|
||||
status: "running",
|
||||
title: "SDK task",
|
||||
runId: "run-sdk-e2e",
|
||||
sessionKey: "sdk-session",
|
||||
},
|
||||
],
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.method === "tasks.get") {
|
||||
reply({
|
||||
task: {
|
||||
id: (frame.params as { taskId?: string } | undefined)?.taskId ?? "task-sdk-e2e",
|
||||
status: "running",
|
||||
title: "SDK task",
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.method === "tasks.cancel") {
|
||||
reply({
|
||||
found: true,
|
||||
cancelled: true,
|
||||
task: {
|
||||
id: (frame.params as { taskId?: string } | undefined)?.taskId ?? "task-sdk-e2e",
|
||||
status: "cancelled",
|
||||
},
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.method === "models.list") {
|
||||
reply({ models: [{ id: "gpt-5.4" }] });
|
||||
return;
|
||||
@@ -427,6 +468,16 @@ describe("OpenClaw SDK websocket e2e", () => {
|
||||
method: "sessions.compact",
|
||||
});
|
||||
|
||||
await expect(oc.tasks.list({ status: "running" })).resolves.toMatchObject({
|
||||
tasks: [{ id: "task-sdk-e2e" }],
|
||||
});
|
||||
await expect(oc.tasks.get("task-sdk-e2e")).resolves.toMatchObject({
|
||||
task: { id: "task-sdk-e2e" },
|
||||
});
|
||||
await expect(oc.tasks.cancel("task-sdk-e2e")).resolves.toMatchObject({
|
||||
cancelled: true,
|
||||
});
|
||||
|
||||
await expect(oc.models.list()).resolves.toMatchObject({ models: [{ id: "gpt-5.4" }] });
|
||||
await expect(oc.models.status({ probe: false })).resolves.toMatchObject({ providers: [] });
|
||||
await expect(oc.tools.list()).resolves.toMatchObject({ tools: [{ name: "shell" }] });
|
||||
@@ -455,6 +506,9 @@ describe("OpenClaw SDK websocket e2e", () => {
|
||||
"sessions.abort",
|
||||
"sessions.patch",
|
||||
"sessions.compact",
|
||||
"tasks.list",
|
||||
"tasks.get",
|
||||
"tasks.cancel",
|
||||
"models.list",
|
||||
"models.authStatus",
|
||||
"tools.catalog",
|
||||
|
||||
@@ -334,15 +334,6 @@ describe("OpenClaw SDK", () => {
|
||||
const transport = new FakeTransport({});
|
||||
const oc = new OpenClaw({ transport });
|
||||
|
||||
await expect(oc.tasks.list()).rejects.toThrow(
|
||||
"oc.tasks.list is not supported by the current OpenClaw Gateway yet",
|
||||
);
|
||||
await expect(oc.tasks.get("task_123")).rejects.toThrow(
|
||||
"oc.tasks.get is not supported by the current OpenClaw Gateway yet",
|
||||
);
|
||||
await expect(oc.tasks.cancel("task_123")).rejects.toThrow(
|
||||
"oc.tasks.cancel is not supported by the current OpenClaw Gateway yet",
|
||||
);
|
||||
await expect(oc.environments.create({ provider: "testbox" })).rejects.toThrow(
|
||||
"oc.environments.create is not supported by the current OpenClaw Gateway yet",
|
||||
);
|
||||
@@ -381,6 +372,70 @@ describe("OpenClaw SDK", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("calls task ledger Gateway methods", async () => {
|
||||
const transport = new FakeTransport({
|
||||
"tasks.list": {
|
||||
tasks: [
|
||||
{
|
||||
id: "task_123",
|
||||
status: "running",
|
||||
title: "Investigate issue",
|
||||
runId: "run_123",
|
||||
sessionKey: "agent:main:main",
|
||||
},
|
||||
],
|
||||
},
|
||||
"tasks.get": {
|
||||
task: {
|
||||
id: "task_123",
|
||||
status: "running",
|
||||
title: "Investigate issue",
|
||||
},
|
||||
},
|
||||
"tasks.cancel": {
|
||||
found: true,
|
||||
cancelled: true,
|
||||
task: {
|
||||
id: "task_123",
|
||||
status: "cancelled",
|
||||
},
|
||||
},
|
||||
});
|
||||
const oc = new OpenClaw({ transport });
|
||||
|
||||
await expect(
|
||||
oc.tasks.list({ status: "running", agentId: "main", sessionKey: "agent:main:main" }),
|
||||
).resolves.toMatchObject({ tasks: [{ id: "task_123", status: "running" }] });
|
||||
await expect(oc.tasks.get("task_123")).resolves.toMatchObject({
|
||||
task: { id: "task_123" },
|
||||
});
|
||||
await expect(
|
||||
oc.tasks.cancel("task_123", { reason: "user stopped task" }),
|
||||
).resolves.toMatchObject({
|
||||
found: true,
|
||||
cancelled: true,
|
||||
task: { status: "cancelled" },
|
||||
});
|
||||
|
||||
expect(transport.calls).toEqual([
|
||||
{
|
||||
method: "tasks.list",
|
||||
params: { status: "running", agentId: "main", sessionKey: "agent:main:main" },
|
||||
options: undefined,
|
||||
},
|
||||
{
|
||||
method: "tasks.get",
|
||||
params: { taskId: "task_123" },
|
||||
options: undefined,
|
||||
},
|
||||
{
|
||||
method: "tasks.cancel",
|
||||
params: { taskId: "task_123", reason: "user stopped task" },
|
||||
options: undefined,
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("lists and reads environment status through current Gateway methods", async () => {
|
||||
const gatewayEnvironment = {
|
||||
id: "gateway",
|
||||
|
||||
@@ -44,6 +44,12 @@ export type {
|
||||
SessionCreateParams,
|
||||
SessionSendParams,
|
||||
SessionTarget,
|
||||
TaskStatus,
|
||||
TaskSummary,
|
||||
TasksCancelResult,
|
||||
TasksGetResult,
|
||||
TasksListParams,
|
||||
TasksListResult,
|
||||
ToolInvokeParams,
|
||||
ToolInvokeResult,
|
||||
WorkspaceSelection,
|
||||
|
||||
@@ -120,6 +120,56 @@ export type ArtifactsDownloadResult = {
|
||||
url?: string;
|
||||
};
|
||||
|
||||
export type TaskStatus = "queued" | "running" | "completed" | "failed" | "cancelled" | "timed_out";
|
||||
|
||||
export type TaskSummary = {
|
||||
id: string;
|
||||
taskId?: string;
|
||||
kind?: string;
|
||||
runtime?: string;
|
||||
status: TaskStatus;
|
||||
title?: string;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
childSessionKey?: string;
|
||||
ownerKey?: string;
|
||||
runId?: string;
|
||||
flowId?: string;
|
||||
parentTaskId?: string;
|
||||
sourceId?: string;
|
||||
createdAt?: RunTimestamp;
|
||||
updatedAt?: RunTimestamp;
|
||||
startedAt?: RunTimestamp;
|
||||
endedAt?: RunTimestamp;
|
||||
progressSummary?: string;
|
||||
terminalSummary?: string;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export type TasksListParams = {
|
||||
status?: TaskStatus | TaskStatus[];
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
limit?: number;
|
||||
cursor?: string;
|
||||
};
|
||||
|
||||
export type TasksListResult = {
|
||||
tasks: TaskSummary[];
|
||||
nextCursor?: string;
|
||||
};
|
||||
|
||||
export type TasksGetResult = {
|
||||
task: TaskSummary;
|
||||
};
|
||||
|
||||
export type TasksCancelResult = {
|
||||
found: boolean;
|
||||
cancelled: boolean;
|
||||
reason?: string;
|
||||
task?: TaskSummary;
|
||||
};
|
||||
|
||||
export type SDKError = {
|
||||
code?: string;
|
||||
message: string;
|
||||
|
||||
@@ -175,7 +175,7 @@ function emitStruct(name: string, schema: JsonSchema): string {
|
||||
const codingKeys: string[] = [];
|
||||
for (const [key, propSchema] of Object.entries(props)) {
|
||||
const propName = safeName(key);
|
||||
const propType = swiftType(propSchema, required.has(key));
|
||||
const propType = swiftType(propSchema, required.has(key), true);
|
||||
lines.push(` public let ${propName}: ${propType}`);
|
||||
if (propName !== key) {
|
||||
codingKeys.push(` case ${propName} = "${key}"`);
|
||||
@@ -189,7 +189,7 @@ function emitStruct(name: string, schema: JsonSchema): string {
|
||||
.map(([key, prop]) => {
|
||||
const propName = safeName(key);
|
||||
const req = required.has(key);
|
||||
return ` ${propName}: ${swiftType(prop, true)}${req ? "" : "?"}`;
|
||||
return ` ${propName}: ${swiftType(prop, true, true)}${req ? "" : "?"}`;
|
||||
})
|
||||
.join(",\n") +
|
||||
")\n" +
|
||||
|
||||
@@ -29,10 +29,13 @@ afterEach(() => {
|
||||
describe("method scope resolution", () => {
|
||||
it.each([
|
||||
["sessions.resolve", ["operator.read"]],
|
||||
["tasks.list", ["operator.read"]],
|
||||
["tasks.get", ["operator.read"]],
|
||||
["config.schema.lookup", ["operator.read"]],
|
||||
["sessions.create", ["operator.write"]],
|
||||
["sessions.send", ["operator.write"]],
|
||||
["sessions.abort", ["operator.write"]],
|
||||
["tasks.cancel", ["operator.write"]],
|
||||
["tools.invoke", ["operator.write"]],
|
||||
["sessions.messages.subscribe", ["operator.read"]],
|
||||
["sessions.messages.unsubscribe", ["operator.read"]],
|
||||
|
||||
@@ -86,6 +86,8 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"models.authStatus",
|
||||
"tools.catalog",
|
||||
"tools.effective",
|
||||
"tasks.list",
|
||||
"tasks.get",
|
||||
"plugins.uiDescriptors",
|
||||
"agents.list",
|
||||
"agent.identity.get",
|
||||
@@ -165,6 +167,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"sessions.send",
|
||||
"sessions.steer",
|
||||
"sessions.abort",
|
||||
"tasks.cancel",
|
||||
"sessions.compaction.branch",
|
||||
"doctor.memory.backfillDreamDiary",
|
||||
"doctor.memory.resetDreamDiary",
|
||||
|
||||
@@ -6,6 +6,8 @@ import {
|
||||
validateModelsListParams,
|
||||
validateNodeEventResult,
|
||||
validateNodePresenceAlivePayload,
|
||||
validateTasksCancelParams,
|
||||
validateTasksListParams,
|
||||
validateTalkConfigResult,
|
||||
validateTalkEvent,
|
||||
validateTalkClientCreateParams,
|
||||
@@ -457,6 +459,25 @@ describe("validateModelsListParams", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("validateTasksListParams", () => {
|
||||
it("accepts SDK task ledger filters", () => {
|
||||
expect(
|
||||
validateTasksListParams({
|
||||
status: ["running", "completed"],
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
limit: 50,
|
||||
cursor: "100",
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects internal task statuses and unknown fields", () => {
|
||||
expect(validateTasksListParams({ status: "succeeded" })).toBe(false);
|
||||
expect(validateTasksCancelParams({ taskId: "task-1", force: true })).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("validateNodePresenceAlivePayload", () => {
|
||||
it("accepts a closed trigger and known metadata fields", () => {
|
||||
expect(
|
||||
|
||||
@@ -328,6 +328,20 @@ import {
|
||||
SessionsSendParamsSchema,
|
||||
type SessionsUsageParams,
|
||||
SessionsUsageParamsSchema,
|
||||
type TaskSummary,
|
||||
TaskSummarySchema,
|
||||
type TasksCancelParams,
|
||||
TasksCancelParamsSchema,
|
||||
type TasksCancelResult,
|
||||
TasksCancelResultSchema,
|
||||
type TasksGetParams,
|
||||
TasksGetParamsSchema,
|
||||
type TasksGetResult,
|
||||
TasksGetResultSchema,
|
||||
type TasksListParams,
|
||||
TasksListParamsSchema,
|
||||
type TasksListResult,
|
||||
TasksListResultSchema,
|
||||
type ShutdownEvent,
|
||||
ShutdownEventSchema,
|
||||
type SkillsBinsParams,
|
||||
@@ -540,6 +554,9 @@ export const validateSessionsCompactionRestoreParams = ajv.compile<SessionsCompa
|
||||
);
|
||||
export const validateSessionsUsageParams =
|
||||
ajv.compile<SessionsUsageParams>(SessionsUsageParamsSchema);
|
||||
export const validateTasksListParams = ajv.compile<TasksListParams>(TasksListParamsSchema);
|
||||
export const validateTasksGetParams = ajv.compile<TasksGetParams>(TasksGetParamsSchema);
|
||||
export const validateTasksCancelParams = ajv.compile<TasksCancelParams>(TasksCancelParamsSchema);
|
||||
export const validateConfigGetParams = ajv.compile<ConfigGetParams>(ConfigGetParamsSchema);
|
||||
export const validateConfigSetParams = ajv.compile<ConfigSetParams>(ConfigSetParamsSchema);
|
||||
export const validateConfigApplyParams = ajv.compile<ConfigApplyParams>(ConfigApplyParamsSchema);
|
||||
@@ -804,6 +821,13 @@ export {
|
||||
ArtifactsListParamsSchema,
|
||||
ArtifactsGetParamsSchema,
|
||||
ArtifactsDownloadParamsSchema,
|
||||
TaskSummarySchema,
|
||||
TasksListParamsSchema,
|
||||
TasksListResultSchema,
|
||||
TasksGetParamsSchema,
|
||||
TasksGetResultSchema,
|
||||
TasksCancelParamsSchema,
|
||||
TasksCancelResultSchema,
|
||||
ConfigGetParamsSchema,
|
||||
ConfigSetParamsSchema,
|
||||
ConfigApplyParamsSchema,
|
||||
@@ -1049,6 +1073,13 @@ export type {
|
||||
SessionsDeleteParams,
|
||||
SessionsCompactParams,
|
||||
SessionsUsageParams,
|
||||
TaskSummary,
|
||||
TasksListParams,
|
||||
TasksListResult,
|
||||
TasksGetParams,
|
||||
TasksGetResult,
|
||||
TasksCancelParams,
|
||||
TasksCancelResult,
|
||||
CronJob,
|
||||
CronListParams,
|
||||
CronStatusParams,
|
||||
|
||||
@@ -17,6 +17,7 @@ export * from "./schema/push.js";
|
||||
export * from "./schema/secrets.js";
|
||||
export * from "./schema/sessions.js";
|
||||
export * from "./schema/snapshot.js";
|
||||
export * from "./schema/tasks.js";
|
||||
export * from "./schema/types.js";
|
||||
export * from "./schema/plugin-approvals.js";
|
||||
export * from "./schema/plugins.js";
|
||||
|
||||
@@ -238,6 +238,15 @@ import {
|
||||
SessionsUsageParamsSchema,
|
||||
} from "./sessions.js";
|
||||
import { PresenceEntrySchema, SnapshotSchema, StateVersionSchema } from "./snapshot.js";
|
||||
import {
|
||||
TasksCancelParamsSchema,
|
||||
TasksCancelResultSchema,
|
||||
TasksGetParamsSchema,
|
||||
TasksGetResultSchema,
|
||||
TasksListParamsSchema,
|
||||
TasksListResultSchema,
|
||||
TaskSummarySchema,
|
||||
} from "./tasks.js";
|
||||
import {
|
||||
WizardCancelParamsSchema,
|
||||
WizardNextParamsSchema,
|
||||
@@ -328,6 +337,13 @@ export const ProtocolSchemas = {
|
||||
SessionsDeleteParams: SessionsDeleteParamsSchema,
|
||||
SessionsCompactParams: SessionsCompactParamsSchema,
|
||||
SessionsUsageParams: SessionsUsageParamsSchema,
|
||||
TaskSummary: TaskSummarySchema,
|
||||
TasksListParams: TasksListParamsSchema,
|
||||
TasksListResult: TasksListResultSchema,
|
||||
TasksGetParams: TasksGetParamsSchema,
|
||||
TasksGetResult: TasksGetResultSchema,
|
||||
TasksCancelParams: TasksCancelParamsSchema,
|
||||
TasksCancelResult: TasksCancelResultSchema,
|
||||
ConfigGetParams: ConfigGetParamsSchema,
|
||||
ConfigSetParams: ConfigSetParamsSchema,
|
||||
ConfigApplyParams: ConfigApplyParamsSchema,
|
||||
|
||||
91
src/gateway/protocol/schema/tasks.ts
Normal file
91
src/gateway/protocol/schema/tasks.ts
Normal file
@@ -0,0 +1,91 @@
|
||||
import { Type } from "typebox";
|
||||
import { NonEmptyString } from "./primitives.js";
|
||||
|
||||
export const TaskLedgerStatusSchema = Type.Union([
|
||||
Type.Literal("queued"),
|
||||
Type.Literal("running"),
|
||||
Type.Literal("completed"),
|
||||
Type.Literal("failed"),
|
||||
Type.Literal("cancelled"),
|
||||
Type.Literal("timed_out"),
|
||||
]);
|
||||
|
||||
const TimestampSchema = Type.Union([Type.String(), Type.Integer({ minimum: 0 })]);
|
||||
|
||||
export const TaskSummarySchema = Type.Object(
|
||||
{
|
||||
id: NonEmptyString,
|
||||
kind: Type.Optional(Type.String()),
|
||||
runtime: Type.Optional(Type.String()),
|
||||
status: TaskLedgerStatusSchema,
|
||||
title: Type.Optional(Type.String()),
|
||||
agentId: Type.Optional(Type.String()),
|
||||
sessionKey: Type.Optional(Type.String()),
|
||||
childSessionKey: Type.Optional(Type.String()),
|
||||
ownerKey: Type.Optional(Type.String()),
|
||||
runId: Type.Optional(Type.String()),
|
||||
taskId: Type.Optional(Type.String()),
|
||||
flowId: Type.Optional(Type.String()),
|
||||
parentTaskId: Type.Optional(Type.String()),
|
||||
sourceId: Type.Optional(Type.String()),
|
||||
createdAt: Type.Optional(TimestampSchema),
|
||||
updatedAt: Type.Optional(TimestampSchema),
|
||||
startedAt: Type.Optional(TimestampSchema),
|
||||
endedAt: Type.Optional(TimestampSchema),
|
||||
progressSummary: Type.Optional(Type.String()),
|
||||
terminalSummary: Type.Optional(Type.String()),
|
||||
error: Type.Optional(Type.String()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const TasksListParamsSchema = Type.Object(
|
||||
{
|
||||
status: Type.Optional(Type.Union([TaskLedgerStatusSchema, Type.Array(TaskLedgerStatusSchema)])),
|
||||
agentId: Type.Optional(NonEmptyString),
|
||||
sessionKey: Type.Optional(NonEmptyString),
|
||||
limit: Type.Optional(Type.Integer({ minimum: 1, maximum: 500 })),
|
||||
cursor: Type.Optional(Type.String()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const TasksListResultSchema = Type.Object(
|
||||
{
|
||||
tasks: Type.Array(TaskSummarySchema),
|
||||
nextCursor: Type.Optional(Type.String()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const TasksGetParamsSchema = Type.Object(
|
||||
{
|
||||
taskId: NonEmptyString,
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const TasksGetResultSchema = Type.Object(
|
||||
{
|
||||
task: TaskSummarySchema,
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const TasksCancelParamsSchema = Type.Object(
|
||||
{
|
||||
taskId: NonEmptyString,
|
||||
reason: Type.Optional(Type.String()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const TasksCancelResultSchema = Type.Object(
|
||||
{
|
||||
found: Type.Boolean(),
|
||||
cancelled: Type.Boolean(),
|
||||
reason: Type.Optional(Type.String()),
|
||||
task: Type.Optional(TaskSummarySchema),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
@@ -75,6 +75,13 @@ export type SessionsResetParams = SchemaType<"SessionsResetParams">;
|
||||
export type SessionsDeleteParams = SchemaType<"SessionsDeleteParams">;
|
||||
export type SessionsCompactParams = SchemaType<"SessionsCompactParams">;
|
||||
export type SessionsUsageParams = SchemaType<"SessionsUsageParams">;
|
||||
export type TaskSummary = SchemaType<"TaskSummary">;
|
||||
export type TasksListParams = SchemaType<"TasksListParams">;
|
||||
export type TasksListResult = SchemaType<"TasksListResult">;
|
||||
export type TasksGetParams = SchemaType<"TasksGetParams">;
|
||||
export type TasksGetResult = SchemaType<"TasksGetResult">;
|
||||
export type TasksCancelParams = SchemaType<"TasksCancelParams">;
|
||||
export type TasksCancelResult = SchemaType<"TasksCancelResult">;
|
||||
export type ConfigGetParams = SchemaType<"ConfigGetParams">;
|
||||
export type ConfigSetParams = SchemaType<"ConfigSetParams">;
|
||||
export type ConfigApplyParams = SchemaType<"ConfigApplyParams">;
|
||||
|
||||
@@ -77,6 +77,9 @@ const BASE_METHODS = [
|
||||
"tools.catalog",
|
||||
"tools.effective",
|
||||
"tools.invoke",
|
||||
"tasks.list",
|
||||
"tasks.get",
|
||||
"tasks.cancel",
|
||||
"environments.list",
|
||||
"environments.status",
|
||||
"agents.list",
|
||||
|
||||
@@ -37,6 +37,7 @@ import { sessionsHandlers } from "./server-methods/sessions.js";
|
||||
import { skillsHandlers } from "./server-methods/skills.js";
|
||||
import { systemHandlers } from "./server-methods/system.js";
|
||||
import { talkHandlers } from "./server-methods/talk.js";
|
||||
import { tasksHandlers } from "./server-methods/tasks.js";
|
||||
import { toolsCatalogHandlers } from "./server-methods/tools-catalog.js";
|
||||
import { toolsEffectiveHandlers } from "./server-methods/tools-effective.js";
|
||||
import { toolsInvokeHandlers } from "./server-methods/tools-invoke.js";
|
||||
@@ -107,6 +108,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = {
|
||||
...configHandlers,
|
||||
...wizardHandlers,
|
||||
...talkHandlers,
|
||||
...tasksHandlers,
|
||||
...toolsCatalogHandlers,
|
||||
...toolsEffectiveHandlers,
|
||||
...toolsInvokeHandlers,
|
||||
|
||||
221
src/gateway/server-methods/tasks.test.ts
Normal file
221
src/gateway/server-methods/tasks.test.ts
Normal file
@@ -0,0 +1,221 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
createTaskRecord,
|
||||
markTaskTerminalById,
|
||||
recordTaskProgressByRunId,
|
||||
resetTaskRegistryForTests,
|
||||
} from "../../tasks/runtime-internal.js";
|
||||
import { tasksHandlers } from "./tasks.js";
|
||||
import type { RespondFn } from "./types.js";
|
||||
|
||||
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
|
||||
|
||||
let stateDir: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gateway-tasks-"));
|
||||
process.env.OPENCLAW_STATE_DIR = stateDir;
|
||||
resetTaskRegistryForTests();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
resetTaskRegistryForTests();
|
||||
if (ORIGINAL_STATE_DIR === undefined) {
|
||||
delete process.env.OPENCLAW_STATE_DIR;
|
||||
} else {
|
||||
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
|
||||
}
|
||||
await fs.rm(stateDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
function captureRespond() {
|
||||
const calls: Parameters<RespondFn>[] = [];
|
||||
const respond: RespondFn = (...args) => {
|
||||
calls.push(args);
|
||||
};
|
||||
return { calls, respond };
|
||||
}
|
||||
|
||||
function createContext() {
|
||||
return {
|
||||
getRuntimeConfig: () => ({}),
|
||||
} as never;
|
||||
}
|
||||
|
||||
describe("tasks gateway handlers", () => {
|
||||
it("lists task summaries with SDK-facing statuses and filters", async () => {
|
||||
const running = createTaskRecord({
|
||||
runtime: "subagent",
|
||||
taskKind: "investigation",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
childSessionKey: "agent:worker:subagent:child",
|
||||
agentId: "main",
|
||||
runId: "run-running",
|
||||
task: "Investigate issue",
|
||||
status: "running",
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
createTaskRecord({
|
||||
runtime: "cli",
|
||||
requesterSessionKey: "agent:other:main",
|
||||
ownerKey: "agent:other:main",
|
||||
scopeKind: "session",
|
||||
runId: "run-other",
|
||||
task: "Other task",
|
||||
status: "running",
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
|
||||
const { calls, respond } = captureRespond();
|
||||
await tasksHandlers["tasks.list"]({
|
||||
req: { type: "req", id: "req-1", method: "tasks.list" },
|
||||
params: {
|
||||
status: "running",
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
},
|
||||
respond,
|
||||
context: createContext(),
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(calls[0]?.[0]).toBe(true);
|
||||
expect(calls[0]?.[1]).toMatchObject({
|
||||
tasks: [
|
||||
{
|
||||
id: running.taskId,
|
||||
taskId: running.taskId,
|
||||
kind: "investigation",
|
||||
runtime: "subagent",
|
||||
status: "running",
|
||||
title: "Investigate issue",
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
childSessionKey: "agent:worker:subagent:child",
|
||||
runId: "run-running",
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
it("gets completed tasks with stable completed status", async () => {
|
||||
const task = createTaskRecord({
|
||||
runtime: "cli",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
runId: "run-completed",
|
||||
task: "Done task",
|
||||
status: "succeeded",
|
||||
deliveryStatus: "not_applicable",
|
||||
});
|
||||
|
||||
const { calls, respond } = captureRespond();
|
||||
await tasksHandlers["tasks.get"]({
|
||||
req: { type: "req", id: "req-2", method: "tasks.get" },
|
||||
params: { taskId: task.taskId },
|
||||
respond,
|
||||
context: createContext(),
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(calls[0]?.[0]).toBe(true);
|
||||
expect(calls[0]?.[1]).toMatchObject({
|
||||
task: {
|
||||
id: task.taskId,
|
||||
status: "completed",
|
||||
title: "Done task",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("sanitizes task text before exposing SDK summaries", async () => {
|
||||
const task = createTaskRecord({
|
||||
runtime: "cli",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
runId: "run-sanitized",
|
||||
label:
|
||||
"Compile artifact\nOpenClaw runtime context (internal): Keep internal details private.",
|
||||
task: "Compile artifact",
|
||||
status: "running",
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
recordTaskProgressByRunId({
|
||||
runId: "run-sanitized",
|
||||
progressSummary:
|
||||
"Bundling output\nOpenClaw runtime context (internal): Keep internal details private.",
|
||||
});
|
||||
markTaskTerminalById({
|
||||
taskId: task.taskId,
|
||||
status: "failed",
|
||||
endedAt: Date.now(),
|
||||
terminalSummary:
|
||||
"Failed after build\nOpenClaw runtime context (internal): Keep internal details private.",
|
||||
error: "Tool failed\nOpenClaw runtime context (internal): Keep internal details private.",
|
||||
});
|
||||
|
||||
const { calls, respond } = captureRespond();
|
||||
await tasksHandlers["tasks.get"]({
|
||||
req: { type: "req", id: "req-sanitized", method: "tasks.get" },
|
||||
params: { taskId: task.taskId },
|
||||
respond,
|
||||
context: createContext(),
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(calls[0]?.[0]).toBe(true);
|
||||
expect(calls[0]?.[1]).toMatchObject({
|
||||
task: {
|
||||
id: task.taskId,
|
||||
title: "Compile artifact",
|
||||
terminalSummary: "Failed after build",
|
||||
error: "Tool failed",
|
||||
},
|
||||
});
|
||||
expect(JSON.stringify(calls[0]?.[1])).not.toContain("OpenClaw runtime context");
|
||||
});
|
||||
|
||||
it("cancels running task records and returns the updated task", async () => {
|
||||
const task = createTaskRecord({
|
||||
runtime: "cli",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
ownerKey: "agent:main:main",
|
||||
scopeKind: "session",
|
||||
runId: "run-cancel",
|
||||
task: "Cancelable task",
|
||||
status: "running",
|
||||
deliveryStatus: "pending",
|
||||
});
|
||||
|
||||
const { calls, respond } = captureRespond();
|
||||
await tasksHandlers["tasks.cancel"]({
|
||||
req: { type: "req", id: "req-3", method: "tasks.cancel" },
|
||||
params: { taskId: task.taskId, reason: "user stopped task" },
|
||||
respond,
|
||||
context: createContext(),
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(calls[0]?.[0]).toBe(true);
|
||||
expect(calls[0]?.[1]).toMatchObject({
|
||||
found: true,
|
||||
cancelled: true,
|
||||
task: {
|
||||
id: task.taskId,
|
||||
status: "cancelled",
|
||||
error: "user stopped task",
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
224
src/gateway/server-methods/tasks.ts
Normal file
224
src/gateway/server-methods/tasks.ts
Normal file
@@ -0,0 +1,224 @@
|
||||
import { parseAgentSessionKey } from "../../routing/session-key.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
import { cancelDetachedTaskRunById } from "../../tasks/detached-task-runtime.js";
|
||||
import { getTaskById, listTaskRecords } from "../../tasks/runtime-internal.js";
|
||||
import type { TaskRecord, TaskStatus } from "../../tasks/task-registry.types.js";
|
||||
import {
|
||||
TASK_STATUS_DETAIL_MAX_CHARS,
|
||||
formatTaskStatusTitle,
|
||||
sanitizeTaskStatusText,
|
||||
} from "../../tasks/task-status.js";
|
||||
import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
formatValidationErrors,
|
||||
type TaskSummary,
|
||||
type TasksListParams,
|
||||
validateTasksCancelParams,
|
||||
validateTasksGetParams,
|
||||
validateTasksListParams,
|
||||
} from "../protocol/index.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
const DEFAULT_TASKS_LIST_LIMIT = 100;
|
||||
const MAX_TASKS_LIST_LIMIT = 500;
|
||||
|
||||
type TaskLedgerStatus = TaskSummary["status"];
|
||||
|
||||
const TASK_STATUS_TO_LEDGER_STATUS: Record<TaskStatus, TaskLedgerStatus> = {
|
||||
queued: "queued",
|
||||
running: "running",
|
||||
succeeded: "completed",
|
||||
failed: "failed",
|
||||
timed_out: "timed_out",
|
||||
cancelled: "cancelled",
|
||||
lost: "failed",
|
||||
};
|
||||
|
||||
const LEDGER_STATUS_TO_TASK_STATUSES: Record<TaskLedgerStatus, TaskStatus[]> = {
|
||||
queued: ["queued"],
|
||||
running: ["running"],
|
||||
completed: ["succeeded"],
|
||||
failed: ["failed", "lost"],
|
||||
timed_out: ["timed_out"],
|
||||
cancelled: ["cancelled"],
|
||||
};
|
||||
|
||||
function taskUpdatedAt(task: TaskRecord): number {
|
||||
return task.lastEventAt ?? task.endedAt ?? task.startedAt ?? task.createdAt;
|
||||
}
|
||||
|
||||
function sanitizeOptionalTaskText(
|
||||
value: unknown,
|
||||
opts?: { errorContext?: boolean },
|
||||
): string | undefined {
|
||||
const sanitized = sanitizeTaskStatusText(value, {
|
||||
errorContext: opts?.errorContext,
|
||||
maxChars: TASK_STATUS_DETAIL_MAX_CHARS,
|
||||
});
|
||||
return sanitized || undefined;
|
||||
}
|
||||
|
||||
function mapTaskSummary(task: TaskRecord): TaskSummary {
|
||||
const progressSummary = sanitizeOptionalTaskText(task.progressSummary);
|
||||
const terminalSummary = sanitizeOptionalTaskText(task.terminalSummary, { errorContext: true });
|
||||
const error = sanitizeOptionalTaskText(task.error, { errorContext: true });
|
||||
return {
|
||||
id: task.taskId,
|
||||
taskId: task.taskId,
|
||||
kind: task.taskKind ?? task.runtime,
|
||||
runtime: task.runtime,
|
||||
status: TASK_STATUS_TO_LEDGER_STATUS[task.status],
|
||||
title: formatTaskStatusTitle(task),
|
||||
...(task.agentId ? { agentId: task.agentId } : {}),
|
||||
sessionKey: task.requesterSessionKey,
|
||||
...(task.childSessionKey ? { childSessionKey: task.childSessionKey } : {}),
|
||||
ownerKey: task.ownerKey,
|
||||
...(task.runId ? { runId: task.runId } : {}),
|
||||
...(task.parentFlowId ? { flowId: task.parentFlowId } : {}),
|
||||
...(task.parentTaskId ? { parentTaskId: task.parentTaskId } : {}),
|
||||
...(task.sourceId ? { sourceId: task.sourceId } : {}),
|
||||
createdAt: task.createdAt,
|
||||
updatedAt: taskUpdatedAt(task),
|
||||
...(task.startedAt !== undefined ? { startedAt: task.startedAt } : {}),
|
||||
...(task.endedAt !== undefined ? { endedAt: task.endedAt } : {}),
|
||||
...(progressSummary ? { progressSummary } : {}),
|
||||
...(terminalSummary ? { terminalSummary } : {}),
|
||||
...(error ? { error } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeTaskStatusFilter(status: TasksListParams["status"]): Set<TaskStatus> | null {
|
||||
if (!status) {
|
||||
return null;
|
||||
}
|
||||
const statuses = Array.isArray(status) ? status : [status];
|
||||
return new Set(statuses.flatMap((value) => LEDGER_STATUS_TO_TASK_STATUSES[value] ?? []));
|
||||
}
|
||||
|
||||
function taskMatchesSession(task: TaskRecord, sessionKey: string | undefined): boolean {
|
||||
const normalized = normalizeOptionalString(sessionKey);
|
||||
if (!normalized) {
|
||||
return true;
|
||||
}
|
||||
return [task.requesterSessionKey, task.childSessionKey, task.ownerKey].some(
|
||||
(candidate) => normalizeOptionalString(candidate) === normalized,
|
||||
);
|
||||
}
|
||||
|
||||
function taskMatchesAgent(task: TaskRecord, agentId: string | undefined): boolean {
|
||||
const normalized = normalizeOptionalString(agentId);
|
||||
if (!normalized) {
|
||||
return true;
|
||||
}
|
||||
if (normalizeOptionalString(task.agentId) === normalized) {
|
||||
return true;
|
||||
}
|
||||
return [task.requesterSessionKey, task.childSessionKey, task.ownerKey].some(
|
||||
(candidate) => parseAgentSessionKey(candidate)?.agentId === normalized,
|
||||
);
|
||||
}
|
||||
|
||||
function parseCursor(cursor: string | undefined): number | null {
|
||||
if (!cursor) {
|
||||
return 0;
|
||||
}
|
||||
if (!/^\d+$/.test(cursor.trim())) {
|
||||
return null;
|
||||
}
|
||||
const parsed = Number(cursor);
|
||||
return Number.isSafeInteger(parsed) ? parsed : null;
|
||||
}
|
||||
|
||||
export const tasksHandlers: GatewayRequestHandlers = {
|
||||
"tasks.list": ({ params, respond }) => {
|
||||
if (!validateTasksListParams(params)) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
`invalid tasks.list params: ${formatValidationErrors(validateTasksListParams.errors)}`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const cursor = parseCursor(params.cursor);
|
||||
if (cursor === null) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(ErrorCodes.INVALID_REQUEST, "invalid tasks.list cursor"),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const statusFilter = normalizeTaskStatusFilter(params.status);
|
||||
const limit = Math.min(params.limit ?? DEFAULT_TASKS_LIST_LIMIT, MAX_TASKS_LIST_LIMIT);
|
||||
const filtered = listTaskRecords().filter((task) => {
|
||||
if (statusFilter && !statusFilter.has(task.status)) {
|
||||
return false;
|
||||
}
|
||||
return taskMatchesAgent(task, params.agentId) && taskMatchesSession(task, params.sessionKey);
|
||||
});
|
||||
const page = filtered.slice(cursor, cursor + limit);
|
||||
const nextOffset = cursor + page.length;
|
||||
respond(true, {
|
||||
tasks: page.map((task) => mapTaskSummary(task)),
|
||||
...(nextOffset < filtered.length ? { nextCursor: String(nextOffset) } : {}),
|
||||
});
|
||||
},
|
||||
"tasks.get": ({ params, respond }) => {
|
||||
if (!validateTasksGetParams(params)) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
`invalid tasks.get params: ${formatValidationErrors(validateTasksGetParams.errors)}`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const taskId = params.taskId;
|
||||
const task = getTaskById(taskId);
|
||||
if (!task) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(ErrorCodes.INVALID_REQUEST, `task not found: ${taskId}`),
|
||||
);
|
||||
return;
|
||||
}
|
||||
respond(true, { task: mapTaskSummary(task) });
|
||||
},
|
||||
"tasks.cancel": async ({ params, respond, context }) => {
|
||||
if (!validateTasksCancelParams(params)) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
`invalid tasks.cancel params: ${formatValidationErrors(validateTasksCancelParams.errors)}`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const taskId = params.taskId;
|
||||
const reason = normalizeOptionalString(params.reason);
|
||||
const result = await cancelDetachedTaskRunById({
|
||||
cfg: context.getRuntimeConfig(),
|
||||
taskId,
|
||||
...(reason ? { reason } : {}),
|
||||
});
|
||||
respond(true, {
|
||||
found: result.found,
|
||||
cancelled: result.cancelled,
|
||||
...(result.reason ? { reason: result.reason } : {}),
|
||||
...(result.task ? { task: mapTaskSummary(result.task) } : {}),
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
export const __test = {
|
||||
mapTaskSummary,
|
||||
};
|
||||
@@ -102,6 +102,7 @@ export type DetachedTaskDeliveryStatusParams = {
|
||||
export type DetachedTaskCancelParams = {
|
||||
cfg: OpenClawConfig;
|
||||
taskId: string;
|
||||
reason?: string;
|
||||
};
|
||||
|
||||
export type DetachedTaskCancelResult = {
|
||||
|
||||
@@ -1858,6 +1858,7 @@ export function linkTaskToFlowById(params: { taskId: string; flowId: string }):
|
||||
export async function cancelTaskById(params: {
|
||||
cfg: OpenClawConfig;
|
||||
taskId: string;
|
||||
reason?: string;
|
||||
}): Promise<{ found: boolean; cancelled: boolean; reason?: string; task?: TaskRecord }> {
|
||||
ensureTaskRegistryReady();
|
||||
const task = tasks.get(params.taskId.trim());
|
||||
@@ -1894,7 +1895,7 @@ export async function cancelTaskById(params: {
|
||||
await getAcpSessionManager().cancelSession({
|
||||
cfg: params.cfg,
|
||||
sessionKey: childSessionKey,
|
||||
reason: "task-cancel",
|
||||
reason: params.reason?.trim() || "task-cancel",
|
||||
});
|
||||
} else if (task.runtime === "subagent") {
|
||||
const { killSubagentRunAdmin } = await loadTaskRegistryControlRuntime();
|
||||
@@ -1923,7 +1924,7 @@ export async function cancelTaskById(params: {
|
||||
status: "cancelled",
|
||||
endedAt: Date.now(),
|
||||
lastEventAt: Date.now(),
|
||||
error: "Cancelled by operator.",
|
||||
error: params.reason?.trim() || "Cancelled by operator.",
|
||||
});
|
||||
if (updated) {
|
||||
void maybeDeliverTaskTerminalUpdate(updated.taskId);
|
||||
|
||||
Reference in New Issue
Block a user