ClawFlow: add runtime substrate (#58336)

Merged via squash.

Prepared head SHA: 6a6158179e
Reviewed-by: @mbelinky
This commit is contained in:
Mariano
2026-03-31 13:58:29 +02:00
committed by GitHub
parent f2d4089ca2
commit 607076d164
20 changed files with 1031 additions and 28 deletions

View File

@@ -156,6 +156,7 @@ Docs: https://docs.openclaw.ai
- Pairing: enforce pending request limits per account instead of per shared channel queue, so one account's outstanding pairing challenges no longer block new pairing on other accounts. Thanks @smaeljaish771 and @vincentkoc.
- Exec approvals: unwrap `caffeinate` and `sandbox-exec` before persisting allow-always trust so later shell payload changes still require a fresh approval. Thanks @tdjackey and @vincentkoc.
- Matrix/DM threads: keep strict unnamed fresh-invite rooms promotable even when Matrix omits the optional direct hint, preserve repair-failed local DM promotions while still revalidating later room metadata, and keep both bound and thread-isolated Matrix sessions reporting the correct route policy. (#58099) Thanks @gumadeiras.
- ClawFlow: add a small flow runtime substrate for authoring layers with persisted wait targets and output bags, plus bundled skills/Lobster examples and richer `flows show` / `doctor` recovery hints for multi-task flow state. (#58336) Thanks @mbelinky.
## 2026.3.28

View File

@@ -45,6 +45,51 @@ ClawFlow sits above that ledger:
For a single detached run, the flow can be a one-task flow. For more structured work, ClawFlow can keep multiple task runs under the same job.
## Runtime substrate
ClawFlow is the runtime substrate, not a workflow language.
It owns:
- the flow id
- the owner session and return context
- waiting state
- small persisted outputs
- finish, fail, cancel, and blocked state
It does **not** own branching or business logic. Put that in the authoring layer that sits above it:
- Lobster
- acpx
- plain TypeScript helpers
- bundled skills
In practice, authoring layers target a small runtime surface:
- `createFlow(...)`
- `runTaskInFlow(...)`
- `setFlowWaiting(...)`
- `setFlowOutput(...)`
- `appendFlowOutput(...)`
- `emitFlowUpdate(...)`
- `resumeFlow(...)`
- `finishFlow(...)`
- `failFlow(...)`
That keeps flow ownership and return-to-thread behavior in core without forcing a single DSL on top of it.
## Authoring pattern
The intended shape is linear:
1. Create one flow for the job.
2. Run one detached task under that flow.
3. Wait for the child task or outside event.
4. Resume the flow in the caller.
5. Spawn the next child task or finish.
ClawFlow persists the minimal state needed to resume that job: the current step, the task it is waiting on, and a small output bag for handoff between steps.
## CLI surface
The flow CLI is intentionally small:
@@ -53,6 +98,8 @@ The flow CLI is intentionally small:
- `openclaw flows show <lookup>` shows one flow and its linked tasks
- `openclaw flows cancel <lookup>` cancels the flow and any active child tasks
`flows show` also surfaces the current wait target and any stored output keys, which is often enough to answer "what is this job waiting on?" without digging into every child task.
The lookup token accepts either a flow id or the owner session key.
## Related

View File

@@ -37,7 +37,7 @@ openclaw flows show <lookup>
openclaw flows show <lookup> --json
```
The output includes the flow status, current step, blocked summary when present, and linked tasks.
The output includes the flow status, current step, wait target, blocked summary when present, stored output keys, and linked tasks.
### `flows cancel`

View File

@@ -10,6 +10,8 @@ read_when:
Lobster is a workflow shell that lets OpenClaw run multi-step tool sequences as a single, deterministic operation with explicit approval checkpoints.
Lobster is one authoring layer above [ClawFlow](/automation/clawflow). Lobster can decide the step logic, but ClawFlow still owns the job identity, owner context, and how detached work returns to the original conversation.
## Hook
Your assistant can build the tools that manage itself. Ask for a workflow, and 30 minutes later you have a CLI plus pipelines that run as one call. Lobster is the missing piece: deterministic pipelines, explicit approvals, and resumable state.

View File

@@ -0,0 +1,62 @@
---
name: clawflow-inbox-triage
description: Example ClawFlow authoring pattern for inbox triage. Use when messages need different treatment based on intent, with some routes notifying immediately, some waiting on outside answers, and others rolling into a later summary.
metadata: { "openclaw": { "emoji": "📥" } }
---
# ClawFlow inbox triage
This is a concrete example of how to think about ClawFlow without turning the core runtime into a DSL.
## Goal
Triage inbox items with one owner flow:
- business → post to Slack and wait for reply
- personal → notify the owner now
- everything else → keep for end-of-day summary
## Pattern
1. Create one flow for the inbox batch.
2. Run one detached task to classify new items.
3. Resume the flow when classification completes.
4. Route each item in the calling logic.
5. Persist only the summary bucket and the current wait target.
## Suggested flow outputs
- `business_threads`
- `personal_items`
- `eod_summary`
## Minimal runtime calls
```ts
const flow = createFlow({
ownerSessionKey,
goal: "triage inbox",
});
runTaskInFlow({
flowId: flow.flowId,
runtime: "acp",
task: "Classify inbox messages",
currentStep: "wait_for_classification",
});
resumeFlow({
flowId: flow.flowId,
currentStep: "route_items",
});
appendFlowOutput({
flowId: flow.flowId,
key: "eod_summary",
value: { subject: "Newsletter", route: "later" },
});
```
## Related example
- `skills/clawflow/examples/inbox-triage.lobster`

76
skills/clawflow/SKILL.md Normal file
View File

@@ -0,0 +1,76 @@
---
name: clawflow
description: Use when work should span one or more detached tasks but still behave like one job with a single owner context. ClawFlow is the runtime substrate under authoring layers like Lobster, acpx, or plain code. Keep conditional logic in the caller; use ClawFlow for flow identity, waiting, outputs, and user-facing emergence.
metadata: { "openclaw": { "emoji": "🪝" } }
---
# ClawFlow
Use ClawFlow when a job needs to outlive one prompt or one detached run, but you still want one owner session, one thread context, and one place to inspect or resume the work.
## When to use it
- Multi-step background work with one owner
- Work that waits on detached ACP or subagent tasks
- Jobs that may need to emit one clear update back to the owner
- Jobs that need a small persisted output bag between steps
## What ClawFlow owns
- flow identity
- owner session and return context
- waiting state
- small persisted outputs
- finish, fail, cancel, and blocked state
It does **not** own branching or business logic. Put that in Lobster, acpx, or the calling code.
## Runtime pattern
1. `createFlow(...)`
2. `runTaskInFlow(...)`
3. `setFlowWaiting(...)` or `setFlowOutput(...)`
4. `resumeFlow(...)`
5. `emitFlowUpdate(...)` only when needed
6. `finishFlow(...)` or `failFlow(...)`
## Example shape
```ts
const flow = createFlow({
ownerSessionKey,
goal: "triage inbox",
});
const classify = runTaskInFlow({
flowId: flow.flowId,
runtime: "acp",
task: "Classify inbox messages",
currentStep: "wait_for_classification",
});
resumeFlow({
flowId: flow.flowId,
currentStep: "route_results",
});
setFlowOutput({
flowId: flow.flowId,
key: "classification",
value: { route: "business" },
});
```
## Keep conditionals above the runtime
Use the flow runtime for state and task linkage. Keep decisions in the authoring layer:
- `business` → post to Slack and wait
- `personal` → notify the owner now
- `later` → append to an end-of-day summary bucket
## Examples
- See `skills/clawflow/examples/inbox-triage.lobster`
- See `skills/clawflow/examples/pr-intake.lobster`
- See `skills/clawflow-inbox-triage/SKILL.md` for a concrete routing pattern

View File

@@ -0,0 +1,33 @@
# Illustrative Lobster authoring example for a ClawFlow-style inbox triage job.
# Swap the placeholder commands for your own tools or scripts.
name: inbox-triage
steps:
- id: fetch
command: gog.gmail.search --query 'newer_than:1d' --max 20
- id: classify
command: >-
openclaw.invoke --tool llm-task --action json --args-json
'{"prompt":"Classify each inbox item as business, personal, or later. Return one JSON object per item with route and summary.","thinking":"low","schema":{"type":"object","properties":{"items":{"type":"array"}},"required":["items"],"additionalProperties":false}}'
stdin: $fetch.stdout
- id: post_business
command: slack-route --bucket business
stdin: $classify.stdout
condition: $classify.json.items[0].route == "business"
- id: wait_for_business_reply
command: echo '{"status":"waiting","reason":"slack_reply"}'
condition: $classify.json.items[0].route == "business"
- id: notify_personal
command: >-
openclaw.invoke --tool message --action send --args-json
'{"provider":"telegram","to":"owner-thread","content":"Personal inbox item needs attention."}'
condition: $classify.json.items[0].route == "personal"
- id: stash_for_eod
command: summary-append --bucket eod
stdin: $classify.stdout
condition: $classify.json.items[0].route == "later"

View File

@@ -0,0 +1,32 @@
# Illustrative Lobster authoring example for a ClawFlow-style PR intake lane.
# Replace the placeholder commands with repo-specific tooling.
name: pr-intake
steps:
- id: fetch
command: gh pr list --repo owner/repo --state open --json number,title,body,headRefName
- id: classify
command: >-
openclaw.invoke --tool llm-task --action json --args-json
'{"prompt":"Classify each PR as close, request_changes, refactor, or maintainer_review. Return intent and recommended next action.","thinking":"low","schema":{"type":"object","properties":{"items":{"type":"array"}},"required":["items"],"additionalProperties":false}}'
stdin: $fetch.stdout
- id: close_low_signal
command: pr-close-low-signal
stdin: $classify.stdout
condition: $classify.json.items[0].nextAction == "close"
- id: request_changes
command: pr-request-changes
stdin: $classify.stdout
condition: $classify.json.items[0].nextAction == "request_changes"
- id: refactor_branch
command: pr-refactor-branch
stdin: $classify.stdout
condition: $classify.json.items[0].nextAction == "refactor"
- id: escalate
command: echo '{"status":"notify","target":"maintainer"}'
condition: $classify.json.items[0].nextAction == "maintainer_review"

View File

@@ -182,6 +182,7 @@ describe("noteWorkspaceStatus", () => {
status: "waiting",
notifyPolicy: "done_only",
goal: "Process PRs",
waitingOnTaskId: "task-wait-missing",
createdAt: 10,
updatedAt: 20,
},
@@ -210,7 +211,9 @@ describe("noteWorkspaceStatus", () => {
const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "ClawFlow recovery");
expect(recoveryCalls).toHaveLength(1);
const body = String(recoveryCalls[0]?.[0]);
expect(body).toContain("flow-orphaned: waiting linear flow has no linked tasks");
expect(body).toContain(
"flow-orphaned: waiting flow points at missing task task-wait-missing",
);
expect(body).toContain("flow-blocked: blocked flow points at missing task task-missing");
expect(body).toContain("openclaw flows show <flow-id>");
expect(body).toContain("openclaw flows cancel <flow-id>");

View File

@@ -12,20 +12,32 @@ function noteFlowRecoveryHints() {
const suspicious = listFlowRecords().flatMap((flow) => {
const tasks = listTasksForFlowId(flow.flowId);
const findings: string[] = [];
const missingWaitingTask =
flow.shape === "linear" &&
flow.status === "waiting" &&
flow.waitingOnTaskId &&
!tasks.some((task) => task.taskId === flow.waitingOnTaskId);
const missingBlockedTask =
flow.status === "blocked" &&
flow.blockedTaskId &&
!tasks.some((task) => task.taskId === flow.blockedTaskId);
if (
flow.shape === "linear" &&
(flow.status === "running" || flow.status === "waiting" || flow.status === "blocked") &&
tasks.length === 0
tasks.length === 0 &&
!missingWaitingTask &&
!missingBlockedTask
) {
findings.push(
`${flow.flowId}: ${flow.status} linear flow has no linked tasks; inspect or cancel it manually.`,
);
}
if (
flow.status === "blocked" &&
flow.blockedTaskId &&
!tasks.some((task) => task.taskId === flow.blockedTaskId)
) {
if (missingWaitingTask) {
findings.push(
`${flow.flowId}: waiting flow points at missing task ${flow.waitingOnTaskId}; inspect or cancel it manually.`,
);
}
if (missingBlockedTask) {
findings.push(
`${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`,
);

View File

@@ -46,6 +46,10 @@ const flowFixture = {
notifyPolicy: "done_only",
goal: "Process related PRs",
currentStep: "wait_for",
waitingOnTaskId: "task-12345678",
outputs: {
bucket: ["personal"],
},
createdAt: Date.parse("2026-03-31T10:00:00.000Z"),
updatedAt: Date.parse("2026-03-31T10:05:00.000Z"),
} as const;
@@ -109,7 +113,7 @@ describe("flows commands", () => {
await flowsListCommand({}, runtime);
expect(runtimeLogs[0]).toContain("Flows: 1");
expect(runtimeLogs[1]).toContain("Flow pressure: 0 active · 0 blocked · 1 total");
expect(runtimeLogs[1]).toContain("Flow pressure: 1 active · 0 blocked · 1 total");
expect(runtimeLogs.join("\n")).toContain("Process related PRs");
expect(runtimeLogs.join("\n")).toContain("1 active/2 total");
});
@@ -122,6 +126,8 @@ describe("flows commands", () => {
expect(runtimeLogs.join("\n")).toContain("shape: linear");
expect(runtimeLogs.join("\n")).toContain("currentStep: wait_for");
expect(runtimeLogs.join("\n")).toContain("waitingOnTaskId: task-12345678");
expect(runtimeLogs.join("\n")).toContain("outputKeys: bucket");
expect(runtimeLogs.join("\n")).toContain("tasks: 2 total · 1 active · 0 issues");
expect(runtimeLogs.join("\n")).toContain("task-12345678 running run-12345678 Review PR");
});

View File

@@ -78,7 +78,7 @@ function formatFlowRows(flows: FlowRecord[], rich: boolean) {
function formatFlowListSummary(flows: FlowRecord[]) {
const active = flows.filter(
(flow) => flow.status === "queued" || flow.status === "running",
(flow) => flow.status === "queued" || flow.status === "running" || flow.status === "waiting",
).length;
const blocked = flows.filter((flow) => flow.status === "blocked").length;
return `${active} active · ${blocked} blocked · ${flows.length} total`;
@@ -167,6 +167,10 @@ export async function flowsShowCommand(
`ownerSessionKey: ${flow.ownerSessionKey}`,
`goal: ${flow.goal}`,
`currentStep: ${flow.currentStep ?? "n/a"}`,
`waitingOnTaskId: ${flow.waitingOnTaskId ?? "n/a"}`,
`outputKeys: ${
flow.outputs ? Object.keys(flow.outputs).toSorted().join(", ") || "n/a" : "n/a"
}`,
`blockedTaskId: ${flow.blockedTaskId ?? "n/a"}`,
`blockedSummary: ${flow.blockedSummary ?? "n/a"}`,
`createdAt: ${new Date(flow.createdAt).toISOString()}`,

View File

@@ -4,7 +4,7 @@ import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DeliveryContext } from "../utils/delivery-context.js";
import { resolveFlowRegistryDir, resolveFlowRegistrySqlitePath } from "./flow-registry.paths.js";
import type { FlowRegistryStoreSnapshot } from "./flow-registry.store.js";
import type { FlowRecord, FlowShape } from "./flow-registry.types.js";
import type { FlowOutputBag, FlowRecord, FlowShape } from "./flow-registry.types.js";
type FlowRegistryRow = {
flow_id: string;
@@ -15,6 +15,8 @@ type FlowRegistryRow = {
notify_policy: FlowRecord["notifyPolicy"];
goal: string;
current_step: string | null;
waiting_on_task_id: string | null;
outputs_json: string | null;
blocked_task_id: string | null;
blocked_summary: string | null;
created_at: number | bigint;
@@ -65,6 +67,7 @@ function parseJsonValue<T>(raw: string | null): T | undefined {
function rowToFlowRecord(row: FlowRegistryRow): FlowRecord {
const endedAt = normalizeNumber(row.ended_at);
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
const outputs = parseJsonValue<FlowOutputBag>(row.outputs_json);
return {
flowId: row.flow_id,
shape: row.shape === "linear" ? "linear" : "single_task",
@@ -74,6 +77,8 @@ function rowToFlowRecord(row: FlowRegistryRow): FlowRecord {
notifyPolicy: row.notify_policy,
goal: row.goal,
...(row.current_step ? { currentStep: row.current_step } : {}),
...(row.waiting_on_task_id ? { waitingOnTaskId: row.waiting_on_task_id } : {}),
...(outputs ? { outputs } : {}),
...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}),
...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}),
createdAt: normalizeNumber(row.created_at) ?? 0,
@@ -92,6 +97,8 @@ function bindFlowRecord(record: FlowRecord) {
notify_policy: record.notifyPolicy,
goal: record.goal,
current_step: record.currentStep ?? null,
waiting_on_task_id: record.waitingOnTaskId ?? null,
outputs_json: serializeJson(record.outputs),
blocked_task_id: record.blockedTaskId ?? null,
blocked_summary: record.blockedSummary ?? null,
created_at: record.createdAt,
@@ -112,6 +119,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements {
notify_policy,
goal,
current_step,
waiting_on_task_id,
outputs_json,
blocked_task_id,
blocked_summary,
created_at,
@@ -130,6 +139,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements {
notify_policy,
goal,
current_step,
waiting_on_task_id,
outputs_json,
blocked_task_id,
blocked_summary,
created_at,
@@ -144,6 +155,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements {
@notify_policy,
@goal,
@current_step,
@waiting_on_task_id,
@outputs_json,
@blocked_task_id,
@blocked_summary,
@created_at,
@@ -158,6 +171,8 @@ function createStatements(db: DatabaseSync): FlowRegistryStatements {
notify_policy = excluded.notify_policy,
goal = excluded.goal,
current_step = excluded.current_step,
waiting_on_task_id = excluded.waiting_on_task_id,
outputs_json = excluded.outputs_json,
blocked_task_id = excluded.blocked_task_id,
blocked_summary = excluded.blocked_summary,
created_at = excluded.created_at,
@@ -180,6 +195,8 @@ function ensureSchema(db: DatabaseSync) {
notify_policy TEXT NOT NULL,
goal TEXT NOT NULL,
current_step TEXT,
waiting_on_task_id TEXT,
outputs_json TEXT,
blocked_task_id TEXT,
blocked_summary TEXT,
created_at INTEGER NOT NULL,
@@ -188,6 +205,8 @@ function ensureSchema(db: DatabaseSync) {
);
`);
ensureColumn(db, "flow_runs", "shape", "TEXT");
ensureColumn(db, "flow_runs", "waiting_on_task_id", "TEXT");
ensureColumn(db, "flow_runs", "outputs_json", "TEXT");
ensureColumn(db, "flow_runs", "blocked_task_id", "TEXT");
ensureColumn(db, "flow_runs", "blocked_summary", "TEXT");
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`);

View File

@@ -15,6 +15,10 @@ function createStoredFlow(): FlowRecord {
notifyPolicy: "done_only",
goal: "Restored flow",
currentStep: "spawn_task",
waitingOnTaskId: "task-waiting",
outputs: {
bucket: ["business"],
},
blockedTaskId: "task-restored",
blockedSummary: "Writable session required.",
createdAt: 100,
@@ -64,6 +68,10 @@ describe("flow-registry store runtime", () => {
flowId: "flow-restored",
shape: "linear",
goal: "Restored flow",
waitingOnTaskId: "task-waiting",
outputs: {
bucket: ["business"],
},
blockedTaskId: "task-restored",
blockedSummary: "Writable session required.",
});
@@ -94,6 +102,10 @@ describe("flow-registry store runtime", () => {
goal: "Persisted flow",
status: "waiting",
currentStep: "ask_user",
waitingOnTaskId: "task-restored",
outputs: {
bucket: ["personal"],
},
});
resetFlowRegistryForTests({ persist: false });
@@ -103,6 +115,10 @@ describe("flow-registry store runtime", () => {
shape: "linear",
status: "waiting",
currentStep: "ask_user",
waitingOnTaskId: "task-restored",
outputs: {
bucket: ["personal"],
},
});
});
});

View File

@@ -61,11 +61,19 @@ describe("flow-registry", () => {
const updated = updateFlowRecordById(created.flowId, {
status: "waiting",
currentStep: "ask_user",
waitingOnTaskId: "task-123",
outputs: {
bucket: ["personal"],
},
});
expect(updated).toMatchObject({
flowId: created.flowId,
status: "waiting",
currentStep: "ask_user",
waitingOnTaskId: "task-123",
outputs: {
bucket: ["personal"],
},
});
expect(listFlowRecords()).toEqual([

View File

@@ -1,15 +1,23 @@
import crypto from "node:crypto";
import { getFlowRegistryStore, resetFlowRegistryRuntimeForTests } from "./flow-registry.store.js";
import type { FlowRecord, FlowShape, FlowStatus } from "./flow-registry.types.js";
import type { FlowOutputBag, FlowRecord, FlowShape, FlowStatus } from "./flow-registry.types.js";
import type { TaskNotifyPolicy, TaskRecord } from "./task-registry.types.js";
const flows = new Map<string, FlowRecord>();
let restoreAttempted = false;
function cloneFlowOutputs(outputs: FlowOutputBag | undefined): FlowOutputBag | undefined {
if (!outputs) {
return undefined;
}
return JSON.parse(JSON.stringify(outputs)) as FlowOutputBag;
}
function cloneFlowRecord(record: FlowRecord): FlowRecord {
return {
...record,
...(record.requesterOrigin ? { requesterOrigin: { ...record.requesterOrigin } } : {}),
...(record.outputs ? { outputs: cloneFlowOutputs(record.outputs) } : {}),
};
}
@@ -38,25 +46,16 @@ function resolveFlowBlockedSummary(
return task.terminalSummary?.trim() || task.progressSummary?.trim() || undefined;
}
type FlowRecordPatch = Partial<
Omit<
Pick<
FlowRecord,
| "status"
| "notifyPolicy"
| "goal"
| "currentStep"
| "blockedTaskId"
| "blockedSummary"
| "updatedAt"
| "endedAt"
>,
"currentStep" | "blockedTaskId" | "blockedSummary" | "endedAt"
>
> & {
type FlowRecordPatch = {
status?: FlowStatus;
notifyPolicy?: TaskNotifyPolicy;
goal?: string;
currentStep?: string | null;
waitingOnTaskId?: string | null;
outputs?: FlowOutputBag | null;
blockedTaskId?: string | null;
blockedSummary?: string | null;
updatedAt?: number;
endedAt?: number | null;
};
@@ -125,6 +124,8 @@ export function createFlowRecord(params: {
notifyPolicy?: TaskNotifyPolicy;
goal: string;
currentStep?: string;
waitingOnTaskId?: string;
outputs?: FlowOutputBag;
blockedTaskId?: string;
blockedSummary?: string;
createdAt?: number;
@@ -142,6 +143,8 @@ export function createFlowRecord(params: {
notifyPolicy: ensureNotifyPolicy(params.notifyPolicy),
goal: params.goal,
currentStep: params.currentStep?.trim() || undefined,
waitingOnTaskId: params.waitingOnTaskId?.trim() || undefined,
outputs: cloneFlowOutputs(params.outputs),
blockedTaskId: params.blockedTaskId?.trim() || undefined,
blockedSummary: params.blockedSummary?.trim() || undefined,
createdAt: now,
@@ -212,6 +215,14 @@ export function updateFlowRecordById(flowId: string, patch: FlowRecordPatch): Fl
patch.currentStep === undefined
? current.currentStep
: patch.currentStep?.trim() || undefined,
waitingOnTaskId:
patch.waitingOnTaskId === undefined
? current.waitingOnTaskId
: patch.waitingOnTaskId?.trim() || undefined,
outputs:
patch.outputs === undefined
? cloneFlowOutputs(current.outputs)
: (cloneFlowOutputs(patch.outputs ?? undefined) ?? undefined),
blockedTaskId:
patch.blockedTaskId === undefined
? current.blockedTaskId

View File

@@ -3,6 +3,16 @@ import type { TaskNotifyPolicy } from "./task-registry.types.js";
export type FlowShape = "single_task" | "linear";
export type FlowOutputValue =
| null
| boolean
| number
| string
| FlowOutputValue[]
| { [key: string]: FlowOutputValue };
export type FlowOutputBag = Record<string, FlowOutputValue>;
export type FlowStatus =
| "queued"
| "running"
@@ -22,6 +32,8 @@ export type FlowRecord = {
notifyPolicy: TaskNotifyPolicy;
goal: string;
currentStep?: string;
waitingOnTaskId?: string;
outputs?: FlowOutputBag;
blockedTaskId?: string;
blockedSummary?: string;
createdAt: number;

View File

@@ -0,0 +1,281 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { getFlowById, resetFlowRegistryForTests, updateFlowRecordById } from "./flow-registry.js";
import {
appendFlowOutput,
createFlow,
emitFlowUpdate,
failFlow,
finishFlow,
resumeFlow,
runTaskInFlow,
setFlowOutput,
} from "./flow-runtime.js";
import { listTasksForFlowId, resetTaskRegistryForTests } from "./task-registry.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
const mocks = vi.hoisted(() => ({
sendMessageMock: vi.fn(),
enqueueSystemEventMock: vi.fn(),
requestHeartbeatNowMock: vi.fn(),
}));
vi.mock("./task-registry-delivery-runtime.js", () => ({
sendMessage: (...args: unknown[]) => mocks.sendMessageMock(...args),
}));
vi.mock("../infra/system-events.js", () => ({
enqueueSystemEvent: (...args: unknown[]) => mocks.enqueueSystemEventMock(...args),
}));
vi.mock("../infra/heartbeat-wake.js", () => ({
requestHeartbeatNow: (...args: unknown[]) => mocks.requestHeartbeatNowMock(...args),
}));
vi.mock("../infra/agent-events.js", () => ({
onAgentEvent: () => () => {},
}));
vi.mock("../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
cancelSession: vi.fn(),
}),
}));
vi.mock("../agents/subagent-control.js", () => ({
killSubagentRunAdmin: vi.fn(),
}));
async function withFlowRuntimeStateDir(run: (root: string) => Promise<void>): Promise<void> {
await withTempDir({ prefix: "openclaw-flow-runtime-" }, async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskRegistryForTests();
resetFlowRegistryForTests();
try {
await run(root);
} finally {
resetTaskRegistryForTests();
resetFlowRegistryForTests();
}
});
}
describe("flow-runtime", () => {
afterEach(() => {
if (ORIGINAL_STATE_DIR === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = ORIGINAL_STATE_DIR;
}
resetTaskRegistryForTests();
resetFlowRegistryForTests();
mocks.sendMessageMock.mockReset();
mocks.enqueueSystemEventMock.mockReset();
mocks.requestHeartbeatNowMock.mockReset();
});
it("runs a child task under a linear flow and marks the flow as waiting on it", async () => {
await withFlowRuntimeStateDir(async () => {
const flow = createFlow({
ownerSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
},
goal: "Triage inbox",
});
const started = runTaskInFlow({
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-runtime-1",
task: "Classify inbox messages",
currentStep: "wait_for_classification",
});
expect(started.task).toMatchObject({
requesterSessionKey: "agent:main:main",
parentFlowId: flow.flowId,
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-runtime-1",
status: "queued",
});
expect(started.flow).toMatchObject({
flowId: flow.flowId,
status: "waiting",
currentStep: "wait_for_classification",
waitingOnTaskId: started.task.taskId,
});
expect(listTasksForFlowId(flow.flowId)).toHaveLength(1);
});
});
it("stores outputs and waiting metadata across sqlite restore", async () => {
await withFlowRuntimeStateDir(async () => {
const flow = createFlow({
ownerSessionKey: "agent:main:main",
goal: "Inbox routing",
});
const started = runTaskInFlow({
flowId: flow.flowId,
runtime: "subagent",
childSessionKey: "agent:codex:subagent:child",
runId: "run-flow-runtime-restore",
task: "Bucket messages",
});
setFlowOutput({
flowId: flow.flowId,
key: "classification",
value: {
business: 1,
personal: 2,
},
});
appendFlowOutput({
flowId: flow.flowId,
key: "eod_summary",
value: {
subject: "Newsletter",
},
});
resetTaskRegistryForTests({ persist: false });
resetFlowRegistryForTests({ persist: false });
expect(getFlowById(flow.flowId)).toMatchObject({
flowId: flow.flowId,
status: "waiting",
waitingOnTaskId: started.task.taskId,
outputs: {
classification: {
business: 1,
personal: 2,
},
eod_summary: [
{
subject: "Newsletter",
},
],
},
});
});
});
it("reopens a blocked flow with resume and marks terminal states with finish/fail", async () => {
await withFlowRuntimeStateDir(async () => {
const flow = createFlow({
ownerSessionKey: "agent:main:main",
goal: "Review inbox",
});
const started = runTaskInFlow({
flowId: flow.flowId,
runtime: "acp",
childSessionKey: "agent:codex:acp:child",
runId: "run-flow-runtime-reopen",
task: "Review inbox",
});
updateFlowRecordById(flow.flowId, {
status: "blocked",
blockedTaskId: started.task.taskId,
blockedSummary: "Need auth.",
endedAt: 120,
});
expect(resumeFlow({ flowId: flow.flowId, currentStep: "retry_auth" })).toMatchObject({
flowId: flow.flowId,
status: "running",
currentStep: "retry_auth",
});
expect(getFlowById(flow.flowId)?.blockedTaskId).toBeUndefined();
expect(getFlowById(flow.flowId)?.waitingOnTaskId).toBeUndefined();
expect(getFlowById(flow.flowId)?.endedAt).toBeUndefined();
expect(
finishFlow({ flowId: flow.flowId, currentStep: "finish", endedAt: 200 }),
).toMatchObject({
flowId: flow.flowId,
status: "succeeded",
currentStep: "finish",
endedAt: 200,
});
const failed = createFlow({
ownerSessionKey: "agent:main:main",
goal: "Failing flow",
});
expect(failFlow({ flowId: failed.flowId, currentStep: "abort", endedAt: 300 })).toMatchObject(
{
flowId: failed.flowId,
status: "failed",
currentStep: "abort",
endedAt: 300,
},
);
});
});
it("delivers explicit flow updates through the flow owner context when possible", async () => {
await withFlowRuntimeStateDir(async () => {
const flow = createFlow({
ownerSessionKey: "agent:main:main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
threadId: "42",
},
goal: "Inbox routing",
});
const result = await emitFlowUpdate({
flowId: flow.flowId,
content: "Personal message needs your attention.",
eventKey: "personal-alert",
});
expect(result.delivery).toBe("direct");
expect(mocks.sendMessageMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "telegram:123",
threadId: "42",
content: "Personal message needs your attention.",
idempotencyKey: `flow:${flow.flowId}:update:personal-alert`,
mirror: expect.objectContaining({
sessionKey: "agent:main:main",
}),
}),
);
});
});
it("falls back to session-queued flow updates when direct delivery is unavailable", async () => {
await withFlowRuntimeStateDir(async () => {
const flow = createFlow({
ownerSessionKey: "agent:main:main",
goal: "Inbox routing",
});
const result = await emitFlowUpdate({
flowId: flow.flowId,
content: "Business email sent to Slack and waiting for reply.",
});
expect(result.delivery).toBe("session_queued");
expect(mocks.enqueueSystemEventMock).toHaveBeenCalledWith(
"Business email sent to Slack and waiting for reply.",
expect.objectContaining({
sessionKey: "agent:main:main",
contextKey: `flow:${flow.flowId}`,
}),
);
expect(mocks.requestHeartbeatNowMock).toHaveBeenCalledWith({
reason: "clawflow-update",
sessionKey: "agent:main:main",
});
});
});
});

377
src/tasks/flow-runtime.ts Normal file
View File

@@ -0,0 +1,377 @@
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import { createFlowRecord, getFlowById, updateFlowRecordById } from "./flow-registry.js";
import type { FlowOutputBag, FlowOutputValue, FlowRecord } from "./flow-registry.types.js";
import { createQueuedTaskRun, createRunningTaskRun } from "./task-executor.js";
import { listTasksForFlowId } from "./task-registry.js";
import type {
TaskDeliveryStatus,
TaskNotifyPolicy,
TaskRecord,
TaskRuntime,
} from "./task-registry.types.js";
let deliveryRuntimePromise: Promise<typeof import("./task-registry-delivery-runtime.js")> | null =
null;
type FlowTaskLaunch = "queued" | "running";
export type FlowUpdateDelivery = "direct" | "session_queued" | "parent_missing" | "failed";
function loadFlowDeliveryRuntime() {
deliveryRuntimePromise ??= import("./task-registry-delivery-runtime.js");
return deliveryRuntimePromise;
}
function requireFlow(flowId: string): FlowRecord {
const flow = getFlowById(flowId);
if (!flow) {
throw new Error(`Flow not found: ${flowId}`);
}
return flow;
}
function requireLinearFlow(flowId: string): FlowRecord {
const flow = requireFlow(flowId);
if (flow.shape !== "linear") {
throw new Error(`Flow is not linear: ${flowId}`);
}
return flow;
}
function cloneOutputValue<T extends FlowOutputValue>(value: T): T {
return JSON.parse(JSON.stringify(value)) as T;
}
function updateRequiredFlow(
flowId: string,
patch: Parameters<typeof updateFlowRecordById>[1],
): FlowRecord {
const updated = updateFlowRecordById(flowId, patch);
if (!updated) {
throw new Error(`Flow not found: ${flowId}`);
}
return updated;
}
function resolveFlowOutputs(flow: FlowRecord): FlowOutputBag {
return flow.outputs ? cloneOutputValue(flow.outputs) : {};
}
function canDeliverFlowToRequesterOrigin(flow: FlowRecord): boolean {
const channel = flow.requesterOrigin?.channel?.trim();
const to = flow.requesterOrigin?.to?.trim();
return Boolean(channel && to && isDeliverableMessageChannel(channel));
}
export function createFlow(params: {
ownerSessionKey: string;
requesterOrigin?: FlowRecord["requesterOrigin"];
goal: string;
notifyPolicy?: TaskNotifyPolicy;
currentStep?: string;
createdAt?: number;
updatedAt?: number;
}): FlowRecord {
return createFlowRecord({
shape: "linear",
ownerSessionKey: params.ownerSessionKey,
requesterOrigin: params.requesterOrigin,
goal: params.goal,
notifyPolicy: params.notifyPolicy,
currentStep: params.currentStep,
status: "queued",
createdAt: params.createdAt,
updatedAt: params.updatedAt,
});
}
export function runTaskInFlow(params: {
flowId: string;
runtime: TaskRuntime;
sourceId?: string;
childSessionKey?: string;
parentTaskId?: string;
agentId?: string;
runId?: string;
label?: string;
task: string;
preferMetadata?: boolean;
notifyPolicy?: TaskNotifyPolicy;
deliveryStatus?: TaskDeliveryStatus;
launch?: FlowTaskLaunch;
startedAt?: number;
lastEventAt?: number;
progressSummary?: string | null;
currentStep?: string;
}): { flow: FlowRecord; task: TaskRecord } {
const flow = requireLinearFlow(params.flowId);
const launch = params.launch ?? "queued";
const task =
launch === "running"
? createRunningTaskRun({
runtime: params.runtime,
sourceId: params.sourceId,
requesterSessionKey: flow.ownerSessionKey,
requesterOrigin: flow.requesterOrigin,
parentFlowId: flow.flowId,
childSessionKey: params.childSessionKey,
parentTaskId: params.parentTaskId,
agentId: params.agentId,
runId: params.runId,
label: params.label,
task: params.task,
preferMetadata: params.preferMetadata,
notifyPolicy: params.notifyPolicy ?? flow.notifyPolicy,
deliveryStatus: params.deliveryStatus,
startedAt: params.startedAt,
lastEventAt: params.lastEventAt,
progressSummary: params.progressSummary,
})
: createQueuedTaskRun({
runtime: params.runtime,
sourceId: params.sourceId,
requesterSessionKey: flow.ownerSessionKey,
requesterOrigin: flow.requesterOrigin,
parentFlowId: flow.flowId,
childSessionKey: params.childSessionKey,
parentTaskId: params.parentTaskId,
agentId: params.agentId,
runId: params.runId,
label: params.label,
task: params.task,
preferMetadata: params.preferMetadata,
notifyPolicy: params.notifyPolicy ?? flow.notifyPolicy,
deliveryStatus: params.deliveryStatus,
});
return {
task,
flow: updateRequiredFlow(flow.flowId, {
status: "waiting",
currentStep: params.currentStep ?? flow.currentStep ?? "wait_for_task",
waitingOnTaskId: task.taskId,
blockedTaskId: null,
blockedSummary: null,
endedAt: null,
updatedAt: task.lastEventAt ?? task.startedAt ?? Date.now(),
}),
};
}
export function setFlowWaiting(params: {
flowId: string;
currentStep?: string | null;
waitingOnTaskId?: string | null;
updatedAt?: number;
}): FlowRecord {
const flow = requireLinearFlow(params.flowId);
if (params.waitingOnTaskId?.trim()) {
const waitingOnTaskId = params.waitingOnTaskId.trim();
const linkedTaskIds = new Set(listTasksForFlowId(flow.flowId).map((task) => task.taskId));
if (!linkedTaskIds.has(waitingOnTaskId)) {
throw new Error(`Flow ${flow.flowId} is not linked to task ${waitingOnTaskId}`);
}
}
return updateRequiredFlow(flow.flowId, {
status: "waiting",
currentStep: params.currentStep,
waitingOnTaskId: params.waitingOnTaskId,
endedAt: null,
updatedAt: params.updatedAt ?? Date.now(),
});
}
export function setFlowOutput(params: {
flowId: string;
key: string;
value: FlowOutputValue;
updatedAt?: number;
}): FlowRecord {
const flow = requireLinearFlow(params.flowId);
const key = params.key.trim();
if (!key) {
throw new Error("Flow output key is required.");
}
const outputs = resolveFlowOutputs(flow);
outputs[key] = cloneOutputValue(params.value);
return updateRequiredFlow(flow.flowId, {
outputs,
updatedAt: params.updatedAt ?? Date.now(),
});
}
export function appendFlowOutput(params: {
flowId: string;
key: string;
value: FlowOutputValue;
updatedAt?: number;
}): FlowRecord {
const flow = requireLinearFlow(params.flowId);
const key = params.key.trim();
if (!key) {
throw new Error("Flow output key is required.");
}
const outputs = resolveFlowOutputs(flow);
const nextValue = cloneOutputValue(params.value);
const current = outputs[key];
if (current === undefined) {
outputs[key] = [nextValue];
} else if (Array.isArray(current)) {
outputs[key] = [...current, nextValue];
} else {
throw new Error(`Flow output ${key} is not an array.`);
}
return updateRequiredFlow(flow.flowId, {
outputs,
updatedAt: params.updatedAt ?? Date.now(),
});
}
export function resumeFlow(params: {
flowId: string;
currentStep?: string | null;
updatedAt?: number;
}): FlowRecord {
const flow = requireLinearFlow(params.flowId);
return updateRequiredFlow(flow.flowId, {
status: "running",
currentStep: params.currentStep,
waitingOnTaskId: null,
blockedTaskId: null,
blockedSummary: null,
endedAt: null,
updatedAt: params.updatedAt ?? Date.now(),
});
}
export function finishFlow(params: {
flowId: string;
currentStep?: string | null;
updatedAt?: number;
endedAt?: number;
}): FlowRecord {
const flow = requireLinearFlow(params.flowId);
const endedAt = params.endedAt ?? params.updatedAt ?? Date.now();
return updateRequiredFlow(flow.flowId, {
status: "succeeded",
currentStep: params.currentStep,
waitingOnTaskId: null,
blockedTaskId: null,
blockedSummary: null,
updatedAt: params.updatedAt ?? endedAt,
endedAt,
});
}
export function failFlow(params: {
flowId: string;
currentStep?: string | null;
updatedAt?: number;
endedAt?: number;
}): FlowRecord {
const flow = requireLinearFlow(params.flowId);
const endedAt = params.endedAt ?? params.updatedAt ?? Date.now();
return updateRequiredFlow(flow.flowId, {
status: "failed",
currentStep: params.currentStep,
waitingOnTaskId: null,
blockedTaskId: null,
blockedSummary: null,
updatedAt: params.updatedAt ?? endedAt,
endedAt,
});
}
export async function emitFlowUpdate(params: {
flowId: string;
content: string;
eventKey?: string;
currentStep?: string | null;
updatedAt?: number;
}): Promise<{ flow: FlowRecord; delivery: FlowUpdateDelivery }> {
const flow = requireFlow(params.flowId);
const content = params.content.trim();
if (!content) {
throw new Error("Flow update content is required.");
}
const ownerSessionKey = flow.ownerSessionKey.trim();
const updatedAt = params.updatedAt ?? Date.now();
const updatedFlow = updateRequiredFlow(flow.flowId, {
currentStep: params.currentStep,
updatedAt,
});
if (!ownerSessionKey) {
return {
flow: updatedFlow,
delivery: "parent_missing",
};
}
if (!canDeliverFlowToRequesterOrigin(updatedFlow)) {
try {
enqueueSystemEvent(content, {
sessionKey: ownerSessionKey,
contextKey: `flow:${updatedFlow.flowId}`,
deliveryContext: updatedFlow.requesterOrigin,
});
requestHeartbeatNow({
reason: "clawflow-update",
sessionKey: ownerSessionKey,
});
return {
flow: updatedFlow,
delivery: "session_queued",
};
} catch {
return {
flow: updatedFlow,
delivery: "failed",
};
}
}
try {
const requesterAgentId = parseAgentSessionKey(ownerSessionKey)?.agentId;
const idempotencyKey = `flow:${updatedFlow.flowId}:update:${params.eventKey?.trim() || updatedAt}`;
const { sendMessage } = await loadFlowDeliveryRuntime();
await sendMessage({
channel: updatedFlow.requesterOrigin?.channel,
to: updatedFlow.requesterOrigin?.to ?? "",
accountId: updatedFlow.requesterOrigin?.accountId,
threadId: updatedFlow.requesterOrigin?.threadId,
content,
agentId: requesterAgentId,
idempotencyKey,
mirror: {
sessionKey: ownerSessionKey,
agentId: requesterAgentId,
idempotencyKey,
},
});
return {
flow: updatedFlow,
delivery: "direct",
};
} catch {
try {
enqueueSystemEvent(content, {
sessionKey: ownerSessionKey,
contextKey: `flow:${updatedFlow.flowId}`,
deliveryContext: updatedFlow.requesterOrigin,
});
requestHeartbeatNow({
reason: "clawflow-update",
sessionKey: ownerSessionKey,
});
return {
flow: updatedFlow,
delivery: "session_queued",
};
} catch {
return {
flow: updatedFlow,
delivery: "failed",
};
}
}
}

View File

@@ -12,6 +12,7 @@ const ALLOWED_IMPORTERS = new Set([
"commands/doctor-workspace-status.ts",
"commands/flows.ts",
"commands/tasks.ts",
"tasks/flow-runtime.ts",
"tasks/task-executor.ts",
"tasks/task-registry.maintenance.ts",
]);