From f252afbdf676e9eab44bfc966c906141ff53ba50 Mon Sep 17 00:00:00 2001 From: Simon Klee Date: Mon, 11 May 2026 15:11:07 +0200 Subject: [PATCH] run: replay session history on interactive resume When resuming an interactive session with `run -i -s ` rebuild the visible session view from persisted message history during bootstrap, feeding synthetic events through the live reducer so future deltas continue from correct offsets without duplicates. Add --replay-limit N to cap bootstrap replay to the newest N messages for large sessions. --- packages/opencode/src/cli/cmd/run.ts | 17 + packages/opencode/src/cli/cmd/run/runtime.ts | 5 + .../src/cli/cmd/run/session-replay.ts | 188 ++++++++ .../src/cli/cmd/run/session.shared.ts | 4 +- .../src/cli/cmd/run/stream.transport.ts | 186 ++++++-- .../opencode/src/cli/cmd/run/subagent-data.ts | 21 +- packages/opencode/src/cli/cmd/run/types.ts | 1 + .../test/cli/run/scrollback.surface.test.ts | 44 ++ .../test/cli/run/session-replay.test.ts | 156 ++++++ .../test/cli/run/stream.transport.test.ts | 447 +++++++++++++++++- 10 files changed, 997 insertions(+), 72 deletions(-) create mode 100644 packages/opencode/src/cli/cmd/run/session-replay.ts create mode 100644 packages/opencode/test/cli/run/session-replay.test.ts diff --git a/packages/opencode/src/cli/cmd/run.ts b/packages/opencode/src/cli/cmd/run.ts index bca89c3cab..01e8dcdde5 100644 --- a/packages/opencode/src/cli/cmd/run.ts +++ b/packages/opencode/src/cli/cmd/run.ts @@ -212,6 +212,10 @@ export const RunCommand = effectCmd({ type: "boolean", describe: "show thinking blocks", }) + .option("replay-limit", { + type: "number", + describe: "cap interactive replay bootstrap to the newest N messages", + }) .option("interactive", { alias: ["i"], type: "boolean", @@ -261,6 +265,17 @@ export const RunCommand = effectCmd({ die("--interactive cannot be used with --format json") } + if (args["replay-limit"] !== undefined && !args.interactive) { + die("--replay-limit requires --interactive") + } + + if ( + args["replay-limit"] !== undefined && + (!Number.isInteger(args["replay-limit"]) || args["replay-limit"] <= 0) + ) { + die("--replay-limit must be a positive integer") + } + if (args.interactive && !process.stdout.isTTY) { die("--interactive requires a TTY stdout") } @@ -767,6 +782,7 @@ export const RunCommand = effectCmd({ sessionID, sessionTitle: sess.title, resume: Boolean(args.session || args.continue) && !args.fork, + replayLimit: args["replay-limit"], agent, model, variant: args.variant, @@ -802,6 +818,7 @@ export const RunCommand = effectCmd({ agent: args.agent, model, variant: args.variant, + replayLimit: args["replay-limit"], files, initialInput, thinking, diff --git a/packages/opencode/src/cli/cmd/run/runtime.ts b/packages/opencode/src/cli/cmd/run/runtime.ts index 882ff2e6c7..236417b8ab 100644 --- a/packages/opencode/src/cli/cmd/run/runtime.ts +++ b/packages/opencode/src/cli/cmd/run/runtime.ts @@ -51,6 +51,7 @@ type RunRuntimeInput = { files: RunInput["files"] initialInput?: string thinking: boolean + replayLimit?: number demo?: RunInput["demo"] } @@ -67,6 +68,7 @@ type RunLocalInput = { files: RunInput["files"] initialInput?: string thinking: boolean + replayLimit?: number demo?: RunInput["demo"] } @@ -490,6 +492,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { directory: ctx.directory, sessionID: state.sessionID, thinking: input.thinking, + replayLimit: input.replayLimit, limits: () => state.limits, footer, trace: log, @@ -722,6 +725,7 @@ export async function runInteractiveLocalMode(input: RunLocalInput): Promise { if (session) { @@ -774,6 +778,7 @@ export async function runInteractiveMode(input: RunInput & { createSession?: Cre files: input.files, initialInput: input.initialInput, thinking: input.thinking, + replayLimit: input.replayLimit, demo: input.demo, boot: async () => ({ sdk: input.sdk, diff --git a/packages/opencode/src/cli/cmd/run/session-replay.ts b/packages/opencode/src/cli/cmd/run/session-replay.ts new file mode 100644 index 0000000000..f43bff5bed --- /dev/null +++ b/packages/opencode/src/cli/cmd/run/session-replay.ts @@ -0,0 +1,188 @@ +import type { Event, PermissionRequest, QuestionRequest } from "@opencode-ai/sdk/v2" +import { bootstrapSessionData, createSessionData, reduceSessionData, type SessionData } from "./session-data" +import { messagePrompt, type SessionMessages } from "./session.shared" +import type { FooterPatch, StreamCommit } from "./types" + +type ReplayInput = { + messages: SessionMessages + permissions: PermissionRequest[] + questions: QuestionRequest[] + thinking: boolean + limits: Record +} + +export type SessionReplay = { + data: SessionData + commits: StreamCommit[] + patch?: FooterPatch +} + +type ReplayMessage = { + commits: StreamCommit[] + patch?: FooterPatch +} + +function apply(data: SessionData, event: Event, sessionID: string, thinking: boolean, limits: Record) { + return reduceSessionData({ + data, + event, + sessionID, + thinking, + limits, + }) +} + +function mergePatch(left: FooterPatch | undefined, right: FooterPatch | undefined) { + if (!left) { + return right + } + + if (!right) { + return left + } + + return { + ...left, + ...right, + } +} + +function active(data: SessionData) { + return data.part.size > 0 || data.tools.size > 0 +} + +function replayPatch(data: SessionData, patch: FooterPatch | undefined) { + if (active(data)) { + if (!patch) { + return { + phase: "running", + } satisfies FooterPatch + } + + return { + ...patch, + phase: "running", + } satisfies FooterPatch + } + + if (data.permissions.length > 0 || data.questions.length > 0) { + if (!patch) { + return { + phase: "idle", + } satisfies FooterPatch + } + + return { + ...patch, + phase: "idle", + } satisfies FooterPatch + } + + if (!patch) { + return undefined + } + + return { + ...patch, + phase: "idle", + status: "", + } satisfies FooterPatch +} + +function replayMessage( + data: SessionData, + message: SessionMessages[number], + thinking: boolean, + limits: Record, +): ReplayMessage { + if (message.info.role === "user") { + const prompt = messagePrompt(message) + if (!prompt.text.trim()) { + return { + commits: [], + } + } + + return { + commits: [ + { + kind: "user", + text: prompt.text, + phase: "start", + source: "system", + messageID: message.info.id, + }, + ], + } + } + + const commits: StreamCommit[] = [] + let patch: FooterPatch | undefined + + const info = apply( + data, + { + id: `bootstrap:message:${message.info.id}`, + type: "message.updated", + properties: { + sessionID: message.info.sessionID, + info: message.info, + }, + }, + message.info.sessionID, + thinking, + limits, + ) + commits.push(...info.commits) + patch = mergePatch(patch, info.footer?.patch) + + for (const part of message.parts) { + const next = apply( + data, + { + id: `bootstrap:part:${part.id}`, + type: "message.part.updated", + properties: { + sessionID: part.sessionID, + part, + time: 0, + }, + }, + message.info.sessionID, + thinking, + limits, + ) + patch = mergePatch(patch, next.footer?.patch) + commits.push(...next.commits) + } + + return { + commits, + patch, + } +} + +export function replaySession(input: ReplayInput): SessionReplay { + const data = createSessionData() + const commits: StreamCommit[] = [] + let patch: FooterPatch | undefined + + bootstrapSessionData({ + data, + messages: input.messages, + permissions: input.permissions, + questions: input.questions, + }) + + for (const message of input.messages) { + const next = replayMessage(data, message, input.thinking, input.limits) + commits.push(...next.commits) + patch = mergePatch(patch, next.patch) + } + + return { + data, + commits, + patch: replayPatch(data, patch), + } +} diff --git a/packages/opencode/src/cli/cmd/run/session.shared.ts b/packages/opencode/src/cli/cmd/run/session.shared.ts index 14e69da2cc..7dbce26efd 100644 --- a/packages/opencode/src/cli/cmd/run/session.shared.ts +++ b/packages/opencode/src/cli/cmd/run/session.shared.ts @@ -60,7 +60,7 @@ function fileSource( } } -function prompt(msg: SessionMessages[number]): RunPrompt { +export function messagePrompt(msg: SessionMessages[number]): RunPrompt { const parts: RunPrompt["parts"] = [] let text = msg.parts .filter((part): part is Extract => { @@ -135,7 +135,7 @@ function turn(msg: SessionMessages[number]): Turn | undefined { } return { - prompt: prompt(msg), + prompt: messagePrompt(msg), provider: msg.info.model.providerID, model: msg.info.model.modelID, variant: msg.info.model.variant, diff --git a/packages/opencode/src/cli/cmd/run/stream.transport.ts b/packages/opencode/src/cli/cmd/run/stream.transport.ts index 22240ebf56..efc208fc5b 100644 --- a/packages/opencode/src/cli/cmd/run/stream.transport.ts +++ b/packages/opencode/src/cli/cmd/run/stream.transport.ts @@ -27,6 +27,7 @@ import { reduceSessionData, type SessionData, } from "./session-data" +import { replaySession } from "./session-replay" import { bootstrapSubagentCalls, bootstrapSubagentData, @@ -66,6 +67,7 @@ type StreamInput = { directory?: string sessionID: string thinking: boolean + replayLimit?: number limits: () => Record footer: FooterApi trace?: Trace @@ -432,7 +434,12 @@ function createLayer(input: StreamInput) { blockerTick: 0, blockers: new Map(), } + let booting = true + const buffered: Event[] = [] + const replayedParts = new Set() const recovering = new Set() + const tracked = (sessionID: string | undefined) => + sessionID === input.sessionID || (!!sessionID && state.subagent.tabs.has(sessionID)) const currentSubagentState = () => { if (state.selectedSubagent && !state.subagent.tabs.has(state.selectedSubagent)) { state.selectedSubagent = undefined @@ -550,11 +557,11 @@ function createLayer(input: StreamInput) { } }) - const messages = (sessionID: string, limit: number) => + const messages = (sessionID: string, limit?: number) => Effect.promise(() => input.sdk.session.messages({ sessionID, - limit, + ...(typeof limit === "number" ? { limit } : {}), }), ).pipe( Effect.map((item) => item.data ?? []), @@ -596,7 +603,7 @@ function createLayer(input: StreamInput) { const bootstrap = Effect.fn("RunStreamTransport.bootstrap")(function* () { const [messagesList, children, permissions, questions] = yield* Effect.all( [ - messages(input.sessionID, SUBAGENT_BOOTSTRAP_LIMIT), + messages(input.sessionID, input.replayLimit), Effect.promise(() => input.sdk.session.children({ sessionID: input.sessionID, @@ -619,12 +626,22 @@ function createLayer(input: StreamInput) { }, ) - bootstrapSessionData({ - data: state.data, + const replay = replaySession({ messages: messagesList, permissions: permissions.filter((item) => item.sessionID === input.sessionID), questions: questions.filter((item) => item.sessionID === input.sessionID), + thinking: input.thinking, + limits: input.limits(), }) + state.data = replay.data + replayedParts.clear() + for (const [partID] of state.data.text) { + if (!state.data.part.has(partID)) { + continue + } + + replayedParts.add(partID) + } bootstrapSubagentData({ data: state.subagent, messages: messagesList, @@ -632,6 +649,7 @@ function createLayer(input: StreamInput) { permissions, questions, }) + clearFinishedSubagents(state.subagent) for (const request of [ ...state.data.permissions, @@ -642,9 +660,25 @@ function createLayer(input: StreamInput) { seedBlocker(request.id) } + const activeCommitIDs = new Set([...state.data.part.keys(), ...state.data.tools]) + for (const commit of replay.commits) { + input.trace?.write("ui.commit", commit) + input.footer.append(commit) + + if (commit.partID && activeCommitIDs.has(commit.partID)) { + continue + } + + yield* Effect.promise(() => input.footer.idle()).pipe(Effect.orElseSucceed(() => undefined)) + } + const snapshot = currentSubagentState() traceTabs(input.trace, [], snapshot.tabs) - syncFooter([], undefined, snapshot) + syncFooter([], replay.patch, snapshot) + yield* Effect.promise(() => input.footer.idle()).pipe(Effect.orElseSucceed(() => undefined)) + + booting = false + yield* drainBuffered() const sessions = [...state.subagent.tabs.keys()] if (sessions.length === 0) { @@ -738,6 +772,86 @@ function createLayer(input: StreamInput) { }) } + const applyEvent = Effect.fn("RunStreamTransport.applyEvent")(function* (event: Event) { + if (event.type === "message.part.delta" && event.properties.sessionID === input.sessionID) { + if (replayedParts.has(event.properties.partID)) { + const seen = state.data.text.get(event.properties.partID) ?? "" + if (seen.endsWith(event.properties.delta)) { + return + } + + replayedParts.delete(event.properties.partID) + } + } + + trackBlocker(event) + + const prev = event.type === "message.part.updated" ? listSubagentTabs(state.subagent) : undefined + const next = reduceSessionData({ + data: state.data, + event, + sessionID: input.sessionID, + thinking: input.thinking, + limits: input.limits(), + }) + state.data = next.data + + if ( + event.type === "message.part.updated" && + event.properties.part.sessionID === input.sessionID && + event.properties.part.type === "tool" && + event.properties.part.tool === "question" && + event.properties.part.state.status === "running" && + state.data.questions.length === 0 + ) { + yield* recoverQuestion(event.properties.part.id).pipe( + Effect.forkIn(scope, { startImmediately: true }), + Effect.asVoid, + ) + } + + const changed = reduceSubagentData({ + data: state.subagent, + event, + sessionID: input.sessionID, + thinking: input.thinking, + limits: input.limits(), + }) + if (changed && prev) { + traceTabs(input.trace, prev, listSubagentTabs(state.subagent)) + } + releaseBlocker(event) + + syncFooter(next.commits, next.footer?.patch, changed ? currentSubagentState() : undefined) + + touch(event) + yield* mark(event) + }) + + const drainBuffered = Effect.fn("RunStreamTransport.drainBuffered")(function* () { + let pending = buffered.splice(0) + while (pending.length > 0) { + const next: Event[] = [] + let changed = false + for (const event of pending) { + if (!tracked(sid(event))) { + next.push(event) + continue + } + + changed = true + yield* applyEvent(event) + } + + if (!changed) { + buffered.push(...next) + return + } + + pending = next + } + }) + const watch = Effect.fn("RunStreamTransport.watch")(() => Stream.fromAsyncIterable(events.stream, (error) => error instanceof Error ? error : new Error(String(error)), @@ -762,53 +876,25 @@ function createLayer(input: StreamInput) { } const sessionID = sid(event) - if (sessionID !== input.sessionID && (!sessionID || !state.subagent.tabs.has(sessionID))) { + if (booting) { + if (sessionID) { + input.trace?.write("recv.event", event) + buffered.push(event) + } + return + } + + if (!tracked(sessionID)) { + if (sessionID) { + input.trace?.write("recv.event", event) + buffered.push(event) + } return } input.trace?.write("recv.event", event) - trackBlocker(event) - - const prev = event.type === "message.part.updated" ? listSubagentTabs(state.subagent) : undefined - const next = reduceSessionData({ - data: state.data, - event, - sessionID: input.sessionID, - thinking: input.thinking, - limits: input.limits(), - }) - state.data = next.data - - if ( - event.type === "message.part.updated" && - event.properties.part.sessionID === input.sessionID && - event.properties.part.type === "tool" && - event.properties.part.tool === "question" && - event.properties.part.state.status === "running" && - state.data.questions.length === 0 - ) { - yield* recoverQuestion(event.properties.part.id).pipe( - Effect.forkIn(scope, { startImmediately: true }), - Effect.asVoid, - ) - } - - const changed = reduceSubagentData({ - data: state.subagent, - event, - sessionID: input.sessionID, - thinking: input.thinking, - limits: input.limits(), - }) - if (changed && prev) { - traceTabs(input.trace, prev, listSubagentTabs(state.subagent)) - } - releaseBlocker(event) - - syncFooter(next.commits, next.footer?.patch, changed ? currentSubagentState() : undefined) - - touch(event) - yield* mark(event) + yield* applyEvent(event) + yield* drainBuffered() }), ), Effect.catch((error) => (abort.signal.aborted ? Effect.void : fail(error))), @@ -823,8 +909,8 @@ function createLayer(input: StreamInput) { ), ) - yield* bootstrap() yield* Scope.provide(scope)(watch().pipe(Effect.forkScoped)) + yield* bootstrap() const runPromptTurn = Effect.fn("RunStreamTransport.runPromptTurn")(function* (next: SessionTurnInput) { if (closed || next.signal?.aborted || input.footer.isClosed) { diff --git a/packages/opencode/src/cli/cmd/run/subagent-data.ts b/packages/opencode/src/cli/cmd/run/subagent-data.ts index e9dcd6538a..ecd3def444 100644 --- a/packages/opencode/src/cli/cmd/run/subagent-data.ts +++ b/packages/opencode/src/cli/cmd/run/subagent-data.ts @@ -420,9 +420,26 @@ function ensureBlockerTab( title: string | undefined, kind: "permission" | "question", ) { - if (data.tabs.has(sessionID)) { + const current = data.tabs.get(sessionID) + if (current) { ensureDetail(data, sessionID) - return false + if (current.status !== "running") { + return false + } + + const next = { + ...current, + description: kind === "permission" ? "Pending permission" : "Pending question", + status: "running" as const, + title: current.title ?? title, + lastUpdatedAt: Date.now(), + } + if (sameSubagentTab(current, next)) { + return false + } + + data.tabs.set(sessionID, next) + return true } data.tabs.set(sessionID, { diff --git a/packages/opencode/src/cli/cmd/run/types.ts b/packages/opencode/src/cli/cmd/run/types.ts index 76ccc4c31e..0733d0e8b1 100644 --- a/packages/opencode/src/cli/cmd/run/types.ts +++ b/packages/opencode/src/cli/cmd/run/types.ts @@ -52,6 +52,7 @@ export type RunInput = { sessionID: string sessionTitle?: string resume?: boolean + replayLimit?: number agent: string | undefined model: PromptModel | undefined variant: string | undefined diff --git a/packages/opencode/test/cli/run/scrollback.surface.test.ts b/packages/opencode/test/cli/run/scrollback.surface.test.ts index 5b2966a5e1..c61de69f8c 100644 --- a/packages/opencode/test/cli/run/scrollback.surface.test.ts +++ b/packages/opencode/test/cli/run/scrollback.surface.test.ts @@ -96,6 +96,17 @@ function assistant(text: string, phase: StreamCommit["phase"] = "progress"): Str } } +function reasoning(text: string, phase: StreamCommit["phase"] = "progress"): StreamCommit { + return { + kind: "reasoning", + text, + phase, + source: "reasoning", + messageID: "msg-r-1", + partID: "part-r-1", + } +} + function user(text: string): StreamCommit { return { kind: "user", @@ -392,6 +403,39 @@ test("inserts spacers for new visible groups", async () => { } }) +test("renders replayed user, reasoning, and assistant output after completion", async () => { + const out = await setup() + + try { + const lines: string[] = [] + const take = () => { + const commits = claim(out.renderer) + try { + lines.push(...commits.flatMap((commit) => renderRows(commit).flatMap((row) => row.split("\n")))) + } finally { + destroy(commits) + } + } + + await out.scrollback.append(user("Hello you")) + take() + await out.scrollback.append(reasoning("Thinking: **Plan**\n\nSay hello.", "progress")) + await out.scrollback.complete() + take() + await out.scrollback.append(assistant("Hello.", "progress")) + await out.scrollback.complete() + take() + + const output = lines.join("\n") + expect(output).toContain("› Hello you") + expect(output).toContain("Thinking:") + expect(output).toContain("Plan") + expect(output).toContain("Hello.") + } finally { + out.scrollback.destroy() + } +}) + test("coalesces same-line tool progress into one snapshot", async () => { const out = await setup() diff --git a/packages/opencode/test/cli/run/session-replay.test.ts b/packages/opencode/test/cli/run/session-replay.test.ts new file mode 100644 index 0000000000..d22ba03f65 --- /dev/null +++ b/packages/opencode/test/cli/run/session-replay.test.ts @@ -0,0 +1,156 @@ +import { describe, expect, test } from "bun:test" +import { replaySession } from "@/cli/cmd/run/session-replay" +import type { SessionMessages } from "@/cli/cmd/run/session.shared" + +function userMessage(id: string, text: string): SessionMessages[number] { + return { + info: { + id, + sessionID: "session-1", + role: "user", + time: { + created: 1, + }, + agent: "build", + model: { + providerID: "openai", + modelID: "gpt-5", + }, + }, + parts: [ + { + id: `${id}-text`, + sessionID: "session-1", + messageID: id, + type: "text", + text, + }, + ], + } +} + +function assistantInfo(id: string) { + return { + id, + sessionID: "session-1", + role: "assistant" as const, + time: { + created: 2, + }, + parentID: "msg-user-1", + modelID: "gpt-5", + providerID: "openai", + mode: "chat", + agent: "build", + path: { + cwd: "/tmp", + root: "/tmp", + }, + cost: 0, + tokens: { + input: 1, + output: 1, + reasoning: 0, + cache: { + read: 0, + write: 0, + }, + }, + } +} + +function assistantMessage(id: string, text: string): SessionMessages[number] { + return { + info: assistantInfo(id), + parts: [ + { + id: `${id}-text`, + sessionID: "session-1", + messageID: id, + type: "text", + text, + time: { + start: 2, + end: 3, + }, + }, + ], + } +} + +function runningToolMessage(id: string): SessionMessages[number] { + return { + info: assistantInfo(id), + parts: [ + { + id: `${id}-tool`, + sessionID: "session-1", + messageID: id, + type: "tool", + callID: `${id}-call`, + tool: "bash", + state: { + status: "running", + input: { + command: "pwd", + }, + time: { + start: 2, + }, + }, + }, + ], + } +} + +describe("run session replay", () => { + test("replays persisted user and assistant history into scrollback commits", () => { + const out = replaySession({ + messages: [userMessage("msg-user-1", "Hello, whats the weather today?"), assistantMessage("msg-1", "What city or ZIP code should I check?")], + permissions: [], + questions: [], + thinking: true, + limits: {}, + }) + + expect(out.commits).toEqual([ + expect.objectContaining({ + kind: "user", + text: "Hello, whats the weather today?", + phase: "start", + source: "system", + messageID: "msg-user-1", + }), + expect.objectContaining({ + kind: "assistant", + text: "What city or ZIP code should I check?", + phase: "progress", + source: "assistant", + messageID: "msg-1", + }), + ]) + expect(out.patch).toEqual( + expect.objectContaining({ + phase: "idle", + status: "", + }), + ) + }) + + test("keeps the footer in a running state for resumed active tools", () => { + const out = replaySession({ + messages: [runningToolMessage("msg-1")], + permissions: [], + questions: [], + thinking: true, + limits: {}, + }) + + expect(out.patch).toEqual( + expect.objectContaining({ + phase: "running", + status: "running bash", + }), + ) + }) +}) diff --git a/packages/opencode/test/cli/run/stream.transport.test.ts b/packages/opencode/test/cli/run/stream.transport.test.ts index 3358ae774d..50bb456056 100644 --- a/packages/opencode/test/cli/run/stream.transport.test.ts +++ b/packages/opencode/test/cli/run/stream.transport.test.ts @@ -67,6 +67,22 @@ function idle(sessionID = "session-1") { } satisfies SdkEvent } +function retry(sessionID: string, attempt: number, message: string) { + return { + id: `evt-${sessionID}-retry-${attempt}`, + type: "session.status", + properties: { + sessionID, + status: { + type: "retry", + attempt, + message, + next: 1, + }, + }, + } satisfies SdkEvent +} + function assistant(id: string) { return { id: `evt-${id}`, @@ -290,12 +306,12 @@ function toolUpdated(part: SessionToolPart): SdkEvent { } } -function textDelta(messageID: string, partID: string, delta: string): SdkEvent { +function textDelta(messageID: string, partID: string, delta: string, sessionID = "session-1"): SdkEvent { return { id: `evt-${partID}-delta`, type: "message.part.delta", properties: { - sessionID: "session-1", + sessionID, messageID, partID, field: "text", @@ -331,6 +347,7 @@ function footer(fn?: (commit: StreamCommit) => void) { const commits: StreamCommit[] = [] const events: FooterEvent[] = [] let closed = false + let idleCalls = 0 const api: FooterApi = { get isClosed() { @@ -346,6 +363,7 @@ function footer(fn?: (commit: StreamCommit) => void) { fn?.(next) }, idle() { + idleCalls += 1 return Promise.resolve() }, close() { @@ -356,7 +374,7 @@ function footer(fn?: (commit: StreamCommit) => void) { }, } - return { api, commits, events } + return { api, commits, events, get idleCalls() { return idleCalls } } } function sdk( @@ -398,6 +416,296 @@ function sdk( } describe("run stream transport", () => { + test("replays persisted main-session history during bootstrap", async () => { + const src = eventFeed() + const ui = footer() + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async ({ sessionID }) => + sessionID === "session-1" + ? ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [ + { + ...textPart("text-1", "msg-1", "Hello."), + time: { + start: 1, + end: 2, + }, + }, + ], + }), + ]) + : ok([]), + }), + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + await waitFor(() => ui.commits.find((item) => item.kind === "assistant" && item.text === "Hello.")) + expect(ui.idleCalls).toBeGreaterThan(0) + } finally { + src.close() + await transport.close() + } + }) + + test("applies the configured replay message limit during bootstrap", async () => { + const src = eventFeed() + const ui = footer() + const seen: Array<{ sessionID: string; limit?: number }> = [] + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async ({ sessionID, limit }) => { + seen.push({ sessionID, limit }) + return ok( + sessionID === "session-1" + ? [ + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [ + { + ...textPart("text-1", "msg-1", "Hello."), + time: { + start: 1, + end: 2, + }, + }, + ], + }), + ] + : [], + ) + }, + }), + sessionID: "session-1", + thinking: true, + replayLimit: 1, + limits: () => ({}), + footer: ui.api, + }) + + try { + await waitFor(() => (ui.commits.length > 0 ? ui.commits : undefined)) + expect(seen[0]).toEqual({ sessionID: "session-1", limit: 1 }) + } finally { + src.close() + await transport.close() + } + }) + + test("skips buffered pre-bootstrap deltas already covered by replay history", async () => { + const src = eventFeed() + const ui = footer() + const gate = defer() + let transport: Awaited> | undefined + const task = createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async ({ sessionID }) => { + if (sessionID !== "session-1") { + return ok([]) + } + + await gate.promise + return ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [textPart("text-1", "msg-1", "Hello")], + }), + ]) + }, + }), + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + await Promise.resolve() + src.push(textDelta("msg-1", "text-1", "lo")) + gate.resolve() + transport = await task + + await waitFor(() => (ui.commits.length > 0 ? ui.commits : undefined)) + await Bun.sleep(20) + expect(ui.commits.filter((item) => item.kind === "assistant")).toEqual([ + expect.objectContaining({ + text: "Hello", + }), + ]) + } finally { + src.close() + await transport?.close() + } + }) + + test("applies buffered pre-bootstrap deltas not yet persisted", async () => { + const src = eventFeed() + const ui = footer() + const gate = defer() + let transport: Awaited> | undefined + const task = createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async ({ sessionID }) => { + if (sessionID !== "session-1") { + return ok([]) + } + + await gate.promise + return ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [textPart("text-1", "msg-1", "")], + }), + ]) + }, + }), + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + await Promise.resolve() + src.push(textDelta("msg-1", "text-1", "Hello")) + gate.resolve() + transport = await task + + await waitFor(() => (ui.commits.length > 0 ? ui.commits : undefined)) + await Bun.sleep(20) + expect(ui.commits.filter((item) => item.kind === "assistant")).toEqual([ + expect.objectContaining({ + text: "Hello", + }), + ]) + } finally { + src.close() + await transport?.close() + } + }) + + test("preserves running footer state for resumed active sessions", async () => { + const src = eventFeed() + const ui = footer() + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async ({ sessionID }) => + sessionID === "session-1" + ? ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [ + runningTool({ + sessionID: "session-1", + messageID: "msg-1", + id: "bash-1", + callID: "call-1", + tool: "bash", + body: { + command: "pwd", + }, + }), + ], + }), + ]) + : ok([]), + }), + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + const patch = await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.patch") + return item?.type === "stream.patch" ? item.patch : undefined + }) + + expect(patch).toEqual( + expect.objectContaining({ + phase: "running", + status: "running bash", + }), + ) + } finally { + src.close() + await transport.close() + } + }) + + test("drops completed historical subagent tabs during bootstrap", async () => { + const src = eventFeed() + const ui = footer() + const transport = await createSessionTransport({ + sdk: sdk({ + stream: src.stream, + messages: async ({ sessionID }) => { + if (sessionID !== "session-1") { + return ok([]) + } + + return ok([ + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [ + completedTool({ + sessionID: "session-1", + messageID: "msg-1", + id: "task-1", + callID: "call-1", + tool: "task", + body: { + description: "Explore run folder", + subagent_type: "explore", + }, + metadata: { + sessionId: "child-1", + }, + }), + ], + }), + ]) + }, + children: async () => ok([child("child-1")]), + }), + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + const state = await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + return item?.type === "stream.subagent" ? item.state : undefined + }) + + expect(state.tabs).toEqual([]) + expect(state.details).toEqual({}) + } finally { + src.close() + await transport.close() + } + }) + test("bootstraps child tabs and resumed blocker input", async () => { const src = eventFeed() const ui = footer() @@ -487,7 +795,7 @@ describe("run stream transport", () => { expect.objectContaining({ sessionID: "child-1", label: "Explore", - description: "Explore run folder", + description: "Pending permission", status: "running", }), ]) @@ -565,16 +873,16 @@ describe("run stream transport", () => { messages: async ({ sessionID }) => { if (sessionID === "session-1") { return ok([ - assistantMessage({ - sessionID: "session-1", - id: "msg-1", - parts: [ - completedTool({ - sessionID: "session-1", - messageID: "msg-1", - id: "task-1", - callID: "call-1", - tool: "task", + assistantMessage({ + sessionID: "session-1", + id: "msg-1", + parts: [ + runningTool({ + sessionID: "session-1", + messageID: "msg-1", + id: "task-1", + callID: "call-1", + tool: "task", body: { description: "Explore run.ts", subagent_type: "explore", @@ -582,10 +890,10 @@ describe("run stream transport", () => { metadata: { sessionId: "child-1", }, - }), - ], - }), - ]) + }), + ], + }), + ]) } return sessionID === "child-1" @@ -711,6 +1019,109 @@ describe("run stream transport", () => { } }) + test("replays child events buffered during bootstrap once the tab is known", async () => { + const global = globalFeed() + const ui = footer() + const gate = defer() + let transport: Awaited> | undefined + const task = createSessionTransport({ + sdk: sdk({ + globalStream: global.stream, + messages: async ({ sessionID }) => { + if (sessionID !== "session-1") { + return ok([]) + } + + await gate.promise + return ok([]) + }, + children: async () => ok([]), + }), + sessionID: "session-1", + thinking: true, + limits: () => ({}), + footer: ui.api, + }) + + try { + await Promise.resolve() + global.push(globalEvent(retry("child-1", 1, "retry child"))) + global.push( + globalEvent({ + id: "evt-child-message", + type: "message.updated", + properties: { + sessionID: "child-1", + info: assistantMessage({ + sessionID: "child-1", + id: "msg-child-1", + parts: [], + }).info, + }, + }), + ) + global.push(globalEvent(textUpdated(textPart("txt-child-1", "msg-child-1", "", "child-1")))) + global.push(globalEvent(textDelta("msg-child-1", "txt-child-1", "Hello", "child-1"))) + global.push( + globalEvent( + toolUpdated( + runningTool({ + sessionID: "session-1", + messageID: "msg-1", + id: "task-1", + callID: "call-1", + tool: "task", + body: { + description: "Explore run.ts", + subagent_type: "explore", + }, + metadata: { + sessionId: "child-1", + }, + }), + ), + ), + ) + gate.resolve() + transport = await task + + await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + return item?.type === "stream.subagent" && item.state.tabs.some((tab) => tab.sessionID === "child-1") + ? item + : undefined + }) + + transport.selectSubagent("child-1") + + const detail = await waitFor(() => { + const item = ui.events.findLast((event) => event.type === "stream.subagent") + const next = item?.type === "stream.subagent" ? item.state.details["child-1"] : undefined + return next?.commits.some((commit) => commit.kind === "error" && commit.text === "retry child") && + next.commits.some((commit) => commit.kind === "assistant" && commit.text === "Hello") + ? next + : undefined + }) + + expect(detail).toEqual({ + sessionID: "child-1", + commits: expect.arrayContaining([ + expect.objectContaining({ + kind: "error", + text: "retry child", + }), + expect.objectContaining({ + kind: "assistant", + text: "Hello", + }), + ]), + }) + } finally { + global.close() + await transport?.close() + } + }) + test("streams selected subagent output from global events while it is running", async () => { const global = globalFeed() const ui = footer()