run: replay session history on interactive resume

When resuming an interactive session with `run -i -s <id>` rebuild
the visible session view from persisted message history during
bootstrap, feeding synthetic events through the live reducer so
future deltas continue from correct offsets without duplicates.

Add --replay-limit N to cap bootstrap replay to the newest N
messages for large sessions.
This commit is contained in:
Simon Klee
2026-05-11 15:11:07 +02:00
parent c933504d9c
commit f252afbdf6
10 changed files with 997 additions and 72 deletions

View File

@@ -212,6 +212,10 @@ export const RunCommand = effectCmd({
type: "boolean",
describe: "show thinking blocks",
})
.option("replay-limit", {
type: "number",
describe: "cap interactive replay bootstrap to the newest N messages",
})
.option("interactive", {
alias: ["i"],
type: "boolean",
@@ -261,6 +265,17 @@ export const RunCommand = effectCmd({
die("--interactive cannot be used with --format json")
}
if (args["replay-limit"] !== undefined && !args.interactive) {
die("--replay-limit requires --interactive")
}
if (
args["replay-limit"] !== undefined &&
(!Number.isInteger(args["replay-limit"]) || args["replay-limit"] <= 0)
) {
die("--replay-limit must be a positive integer")
}
if (args.interactive && !process.stdout.isTTY) {
die("--interactive requires a TTY stdout")
}
@@ -767,6 +782,7 @@ export const RunCommand = effectCmd({
sessionID,
sessionTitle: sess.title,
resume: Boolean(args.session || args.continue) && !args.fork,
replayLimit: args["replay-limit"],
agent,
model,
variant: args.variant,
@@ -802,6 +818,7 @@ export const RunCommand = effectCmd({
agent: args.agent,
model,
variant: args.variant,
replayLimit: args["replay-limit"],
files,
initialInput,
thinking,

View File

@@ -51,6 +51,7 @@ type RunRuntimeInput = {
files: RunInput["files"]
initialInput?: string
thinking: boolean
replayLimit?: number
demo?: RunInput["demo"]
}
@@ -67,6 +68,7 @@ type RunLocalInput = {
files: RunInput["files"]
initialInput?: string
thinking: boolean
replayLimit?: number
demo?: RunInput["demo"]
}
@@ -490,6 +492,7 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise<void> {
directory: ctx.directory,
sessionID: state.sessionID,
thinking: input.thinking,
replayLimit: input.replayLimit,
limits: () => state.limits,
footer,
trace: log,
@@ -722,6 +725,7 @@ export async function runInteractiveLocalMode(input: RunLocalInput): Promise<voi
files: input.files,
initialInput: input.initialInput,
thinking: input.thinking,
replayLimit: input.replayLimit,
demo: input.demo,
resolveSession: () => {
if (session) {
@@ -774,6 +778,7 @@ export async function runInteractiveMode(input: RunInput & { createSession?: Cre
files: input.files,
initialInput: input.initialInput,
thinking: input.thinking,
replayLimit: input.replayLimit,
demo: input.demo,
boot: async () => ({
sdk: input.sdk,

View File

@@ -0,0 +1,188 @@
import type { Event, PermissionRequest, QuestionRequest } from "@opencode-ai/sdk/v2"
import { bootstrapSessionData, createSessionData, reduceSessionData, type SessionData } from "./session-data"
import { messagePrompt, type SessionMessages } from "./session.shared"
import type { FooterPatch, StreamCommit } from "./types"
type ReplayInput = {
messages: SessionMessages
permissions: PermissionRequest[]
questions: QuestionRequest[]
thinking: boolean
limits: Record<string, number>
}
export type SessionReplay = {
data: SessionData
commits: StreamCommit[]
patch?: FooterPatch
}
type ReplayMessage = {
commits: StreamCommit[]
patch?: FooterPatch
}
function apply(data: SessionData, event: Event, sessionID: string, thinking: boolean, limits: Record<string, number>) {
return reduceSessionData({
data,
event,
sessionID,
thinking,
limits,
})
}
function mergePatch(left: FooterPatch | undefined, right: FooterPatch | undefined) {
if (!left) {
return right
}
if (!right) {
return left
}
return {
...left,
...right,
}
}
function active(data: SessionData) {
return data.part.size > 0 || data.tools.size > 0
}
function replayPatch(data: SessionData, patch: FooterPatch | undefined) {
if (active(data)) {
if (!patch) {
return {
phase: "running",
} satisfies FooterPatch
}
return {
...patch,
phase: "running",
} satisfies FooterPatch
}
if (data.permissions.length > 0 || data.questions.length > 0) {
if (!patch) {
return {
phase: "idle",
} satisfies FooterPatch
}
return {
...patch,
phase: "idle",
} satisfies FooterPatch
}
if (!patch) {
return undefined
}
return {
...patch,
phase: "idle",
status: "",
} satisfies FooterPatch
}
function replayMessage(
data: SessionData,
message: SessionMessages[number],
thinking: boolean,
limits: Record<string, number>,
): ReplayMessage {
if (message.info.role === "user") {
const prompt = messagePrompt(message)
if (!prompt.text.trim()) {
return {
commits: [],
}
}
return {
commits: [
{
kind: "user",
text: prompt.text,
phase: "start",
source: "system",
messageID: message.info.id,
},
],
}
}
const commits: StreamCommit[] = []
let patch: FooterPatch | undefined
const info = apply(
data,
{
id: `bootstrap:message:${message.info.id}`,
type: "message.updated",
properties: {
sessionID: message.info.sessionID,
info: message.info,
},
},
message.info.sessionID,
thinking,
limits,
)
commits.push(...info.commits)
patch = mergePatch(patch, info.footer?.patch)
for (const part of message.parts) {
const next = apply(
data,
{
id: `bootstrap:part:${part.id}`,
type: "message.part.updated",
properties: {
sessionID: part.sessionID,
part,
time: 0,
},
},
message.info.sessionID,
thinking,
limits,
)
patch = mergePatch(patch, next.footer?.patch)
commits.push(...next.commits)
}
return {
commits,
patch,
}
}
export function replaySession(input: ReplayInput): SessionReplay {
const data = createSessionData()
const commits: StreamCommit[] = []
let patch: FooterPatch | undefined
bootstrapSessionData({
data,
messages: input.messages,
permissions: input.permissions,
questions: input.questions,
})
for (const message of input.messages) {
const next = replayMessage(data, message, input.thinking, input.limits)
commits.push(...next.commits)
patch = mergePatch(patch, next.patch)
}
return {
data,
commits,
patch: replayPatch(data, patch),
}
}

View File

@@ -60,7 +60,7 @@ function fileSource(
}
}
function prompt(msg: SessionMessages[number]): RunPrompt {
export function messagePrompt(msg: SessionMessages[number]): RunPrompt {
const parts: RunPrompt["parts"] = []
let text = msg.parts
.filter((part): part is Extract<SessionMessages[number]["parts"][number], { type: "text" }> => {
@@ -135,7 +135,7 @@ function turn(msg: SessionMessages[number]): Turn | undefined {
}
return {
prompt: prompt(msg),
prompt: messagePrompt(msg),
provider: msg.info.model.providerID,
model: msg.info.model.modelID,
variant: msg.info.model.variant,

View File

@@ -27,6 +27,7 @@ import {
reduceSessionData,
type SessionData,
} from "./session-data"
import { replaySession } from "./session-replay"
import {
bootstrapSubagentCalls,
bootstrapSubagentData,
@@ -66,6 +67,7 @@ type StreamInput = {
directory?: string
sessionID: string
thinking: boolean
replayLimit?: number
limits: () => Record<string, number>
footer: FooterApi
trace?: Trace
@@ -432,7 +434,12 @@ function createLayer(input: StreamInput) {
blockerTick: 0,
blockers: new Map(),
}
let booting = true
const buffered: Event[] = []
const replayedParts = new Set<string>()
const recovering = new Set<string>()
const tracked = (sessionID: string | undefined) =>
sessionID === input.sessionID || (!!sessionID && state.subagent.tabs.has(sessionID))
const currentSubagentState = () => {
if (state.selectedSubagent && !state.subagent.tabs.has(state.selectedSubagent)) {
state.selectedSubagent = undefined
@@ -550,11 +557,11 @@ function createLayer(input: StreamInput) {
}
})
const messages = (sessionID: string, limit: number) =>
const messages = (sessionID: string, limit?: number) =>
Effect.promise(() =>
input.sdk.session.messages({
sessionID,
limit,
...(typeof limit === "number" ? { limit } : {}),
}),
).pipe(
Effect.map((item) => item.data ?? []),
@@ -596,7 +603,7 @@ function createLayer(input: StreamInput) {
const bootstrap = Effect.fn("RunStreamTransport.bootstrap")(function* () {
const [messagesList, children, permissions, questions] = yield* Effect.all(
[
messages(input.sessionID, SUBAGENT_BOOTSTRAP_LIMIT),
messages(input.sessionID, input.replayLimit),
Effect.promise(() =>
input.sdk.session.children({
sessionID: input.sessionID,
@@ -619,12 +626,22 @@ function createLayer(input: StreamInput) {
},
)
bootstrapSessionData({
data: state.data,
const replay = replaySession({
messages: messagesList,
permissions: permissions.filter((item) => item.sessionID === input.sessionID),
questions: questions.filter((item) => item.sessionID === input.sessionID),
thinking: input.thinking,
limits: input.limits(),
})
state.data = replay.data
replayedParts.clear()
for (const [partID] of state.data.text) {
if (!state.data.part.has(partID)) {
continue
}
replayedParts.add(partID)
}
bootstrapSubagentData({
data: state.subagent,
messages: messagesList,
@@ -632,6 +649,7 @@ function createLayer(input: StreamInput) {
permissions,
questions,
})
clearFinishedSubagents(state.subagent)
for (const request of [
...state.data.permissions,
@@ -642,9 +660,25 @@ function createLayer(input: StreamInput) {
seedBlocker(request.id)
}
const activeCommitIDs = new Set([...state.data.part.keys(), ...state.data.tools])
for (const commit of replay.commits) {
input.trace?.write("ui.commit", commit)
input.footer.append(commit)
if (commit.partID && activeCommitIDs.has(commit.partID)) {
continue
}
yield* Effect.promise(() => input.footer.idle()).pipe(Effect.orElseSucceed(() => undefined))
}
const snapshot = currentSubagentState()
traceTabs(input.trace, [], snapshot.tabs)
syncFooter([], undefined, snapshot)
syncFooter([], replay.patch, snapshot)
yield* Effect.promise(() => input.footer.idle()).pipe(Effect.orElseSucceed(() => undefined))
booting = false
yield* drainBuffered()
const sessions = [...state.subagent.tabs.keys()]
if (sessions.length === 0) {
@@ -738,6 +772,86 @@ function createLayer(input: StreamInput) {
})
}
const applyEvent = Effect.fn("RunStreamTransport.applyEvent")(function* (event: Event) {
if (event.type === "message.part.delta" && event.properties.sessionID === input.sessionID) {
if (replayedParts.has(event.properties.partID)) {
const seen = state.data.text.get(event.properties.partID) ?? ""
if (seen.endsWith(event.properties.delta)) {
return
}
replayedParts.delete(event.properties.partID)
}
}
trackBlocker(event)
const prev = event.type === "message.part.updated" ? listSubagentTabs(state.subagent) : undefined
const next = reduceSessionData({
data: state.data,
event,
sessionID: input.sessionID,
thinking: input.thinking,
limits: input.limits(),
})
state.data = next.data
if (
event.type === "message.part.updated" &&
event.properties.part.sessionID === input.sessionID &&
event.properties.part.type === "tool" &&
event.properties.part.tool === "question" &&
event.properties.part.state.status === "running" &&
state.data.questions.length === 0
) {
yield* recoverQuestion(event.properties.part.id).pipe(
Effect.forkIn(scope, { startImmediately: true }),
Effect.asVoid,
)
}
const changed = reduceSubagentData({
data: state.subagent,
event,
sessionID: input.sessionID,
thinking: input.thinking,
limits: input.limits(),
})
if (changed && prev) {
traceTabs(input.trace, prev, listSubagentTabs(state.subagent))
}
releaseBlocker(event)
syncFooter(next.commits, next.footer?.patch, changed ? currentSubagentState() : undefined)
touch(event)
yield* mark(event)
})
const drainBuffered = Effect.fn("RunStreamTransport.drainBuffered")(function* () {
let pending = buffered.splice(0)
while (pending.length > 0) {
const next: Event[] = []
let changed = false
for (const event of pending) {
if (!tracked(sid(event))) {
next.push(event)
continue
}
changed = true
yield* applyEvent(event)
}
if (!changed) {
buffered.push(...next)
return
}
pending = next
}
})
const watch = Effect.fn("RunStreamTransport.watch")(() =>
Stream.fromAsyncIterable(events.stream, (error) =>
error instanceof Error ? error : new Error(String(error)),
@@ -762,53 +876,25 @@ function createLayer(input: StreamInput) {
}
const sessionID = sid(event)
if (sessionID !== input.sessionID && (!sessionID || !state.subagent.tabs.has(sessionID))) {
if (booting) {
if (sessionID) {
input.trace?.write("recv.event", event)
buffered.push(event)
}
return
}
if (!tracked(sessionID)) {
if (sessionID) {
input.trace?.write("recv.event", event)
buffered.push(event)
}
return
}
input.trace?.write("recv.event", event)
trackBlocker(event)
const prev = event.type === "message.part.updated" ? listSubagentTabs(state.subagent) : undefined
const next = reduceSessionData({
data: state.data,
event,
sessionID: input.sessionID,
thinking: input.thinking,
limits: input.limits(),
})
state.data = next.data
if (
event.type === "message.part.updated" &&
event.properties.part.sessionID === input.sessionID &&
event.properties.part.type === "tool" &&
event.properties.part.tool === "question" &&
event.properties.part.state.status === "running" &&
state.data.questions.length === 0
) {
yield* recoverQuestion(event.properties.part.id).pipe(
Effect.forkIn(scope, { startImmediately: true }),
Effect.asVoid,
)
}
const changed = reduceSubagentData({
data: state.subagent,
event,
sessionID: input.sessionID,
thinking: input.thinking,
limits: input.limits(),
})
if (changed && prev) {
traceTabs(input.trace, prev, listSubagentTabs(state.subagent))
}
releaseBlocker(event)
syncFooter(next.commits, next.footer?.patch, changed ? currentSubagentState() : undefined)
touch(event)
yield* mark(event)
yield* applyEvent(event)
yield* drainBuffered()
}),
),
Effect.catch((error) => (abort.signal.aborted ? Effect.void : fail(error))),
@@ -823,8 +909,8 @@ function createLayer(input: StreamInput) {
),
)
yield* bootstrap()
yield* Scope.provide(scope)(watch().pipe(Effect.forkScoped))
yield* bootstrap()
const runPromptTurn = Effect.fn("RunStreamTransport.runPromptTurn")(function* (next: SessionTurnInput) {
if (closed || next.signal?.aborted || input.footer.isClosed) {

View File

@@ -420,9 +420,26 @@ function ensureBlockerTab(
title: string | undefined,
kind: "permission" | "question",
) {
if (data.tabs.has(sessionID)) {
const current = data.tabs.get(sessionID)
if (current) {
ensureDetail(data, sessionID)
return false
if (current.status !== "running") {
return false
}
const next = {
...current,
description: kind === "permission" ? "Pending permission" : "Pending question",
status: "running" as const,
title: current.title ?? title,
lastUpdatedAt: Date.now(),
}
if (sameSubagentTab(current, next)) {
return false
}
data.tabs.set(sessionID, next)
return true
}
data.tabs.set(sessionID, {

View File

@@ -52,6 +52,7 @@ export type RunInput = {
sessionID: string
sessionTitle?: string
resume?: boolean
replayLimit?: number
agent: string | undefined
model: PromptModel | undefined
variant: string | undefined

View File

@@ -96,6 +96,17 @@ function assistant(text: string, phase: StreamCommit["phase"] = "progress"): Str
}
}
function reasoning(text: string, phase: StreamCommit["phase"] = "progress"): StreamCommit {
return {
kind: "reasoning",
text,
phase,
source: "reasoning",
messageID: "msg-r-1",
partID: "part-r-1",
}
}
function user(text: string): StreamCommit {
return {
kind: "user",
@@ -392,6 +403,39 @@ test("inserts spacers for new visible groups", async () => {
}
})
test("renders replayed user, reasoning, and assistant output after completion", async () => {
const out = await setup()
try {
const lines: string[] = []
const take = () => {
const commits = claim(out.renderer)
try {
lines.push(...commits.flatMap((commit) => renderRows(commit).flatMap((row) => row.split("\n"))))
} finally {
destroy(commits)
}
}
await out.scrollback.append(user("Hello you"))
take()
await out.scrollback.append(reasoning("Thinking: **Plan**\n\nSay hello.", "progress"))
await out.scrollback.complete()
take()
await out.scrollback.append(assistant("Hello.", "progress"))
await out.scrollback.complete()
take()
const output = lines.join("\n")
expect(output).toContain(" Hello you")
expect(output).toContain("Thinking:")
expect(output).toContain("Plan")
expect(output).toContain("Hello.")
} finally {
out.scrollback.destroy()
}
})
test("coalesces same-line tool progress into one snapshot", async () => {
const out = await setup()

View File

@@ -0,0 +1,156 @@
import { describe, expect, test } from "bun:test"
import { replaySession } from "@/cli/cmd/run/session-replay"
import type { SessionMessages } from "@/cli/cmd/run/session.shared"
function userMessage(id: string, text: string): SessionMessages[number] {
return {
info: {
id,
sessionID: "session-1",
role: "user",
time: {
created: 1,
},
agent: "build",
model: {
providerID: "openai",
modelID: "gpt-5",
},
},
parts: [
{
id: `${id}-text`,
sessionID: "session-1",
messageID: id,
type: "text",
text,
},
],
}
}
function assistantInfo(id: string) {
return {
id,
sessionID: "session-1",
role: "assistant" as const,
time: {
created: 2,
},
parentID: "msg-user-1",
modelID: "gpt-5",
providerID: "openai",
mode: "chat",
agent: "build",
path: {
cwd: "/tmp",
root: "/tmp",
},
cost: 0,
tokens: {
input: 1,
output: 1,
reasoning: 0,
cache: {
read: 0,
write: 0,
},
},
}
}
function assistantMessage(id: string, text: string): SessionMessages[number] {
return {
info: assistantInfo(id),
parts: [
{
id: `${id}-text`,
sessionID: "session-1",
messageID: id,
type: "text",
text,
time: {
start: 2,
end: 3,
},
},
],
}
}
function runningToolMessage(id: string): SessionMessages[number] {
return {
info: assistantInfo(id),
parts: [
{
id: `${id}-tool`,
sessionID: "session-1",
messageID: id,
type: "tool",
callID: `${id}-call`,
tool: "bash",
state: {
status: "running",
input: {
command: "pwd",
},
time: {
start: 2,
},
},
},
],
}
}
describe("run session replay", () => {
test("replays persisted user and assistant history into scrollback commits", () => {
const out = replaySession({
messages: [userMessage("msg-user-1", "Hello, whats the weather today?"), assistantMessage("msg-1", "What city or ZIP code should I check?")],
permissions: [],
questions: [],
thinking: true,
limits: {},
})
expect(out.commits).toEqual([
expect.objectContaining({
kind: "user",
text: "Hello, whats the weather today?",
phase: "start",
source: "system",
messageID: "msg-user-1",
}),
expect.objectContaining({
kind: "assistant",
text: "What city or ZIP code should I check?",
phase: "progress",
source: "assistant",
messageID: "msg-1",
}),
])
expect(out.patch).toEqual(
expect.objectContaining({
phase: "idle",
status: "",
}),
)
})
test("keeps the footer in a running state for resumed active tools", () => {
const out = replaySession({
messages: [runningToolMessage("msg-1")],
permissions: [],
questions: [],
thinking: true,
limits: {},
})
expect(out.patch).toEqual(
expect.objectContaining({
phase: "running",
status: "running bash",
}),
)
})
})

View File

@@ -67,6 +67,22 @@ function idle(sessionID = "session-1") {
} satisfies SdkEvent
}
function retry(sessionID: string, attempt: number, message: string) {
return {
id: `evt-${sessionID}-retry-${attempt}`,
type: "session.status",
properties: {
sessionID,
status: {
type: "retry",
attempt,
message,
next: 1,
},
},
} satisfies SdkEvent
}
function assistant(id: string) {
return {
id: `evt-${id}`,
@@ -290,12 +306,12 @@ function toolUpdated(part: SessionToolPart): SdkEvent {
}
}
function textDelta(messageID: string, partID: string, delta: string): SdkEvent {
function textDelta(messageID: string, partID: string, delta: string, sessionID = "session-1"): SdkEvent {
return {
id: `evt-${partID}-delta`,
type: "message.part.delta",
properties: {
sessionID: "session-1",
sessionID,
messageID,
partID,
field: "text",
@@ -331,6 +347,7 @@ function footer(fn?: (commit: StreamCommit) => void) {
const commits: StreamCommit[] = []
const events: FooterEvent[] = []
let closed = false
let idleCalls = 0
const api: FooterApi = {
get isClosed() {
@@ -346,6 +363,7 @@ function footer(fn?: (commit: StreamCommit) => void) {
fn?.(next)
},
idle() {
idleCalls += 1
return Promise.resolve()
},
close() {
@@ -356,7 +374,7 @@ function footer(fn?: (commit: StreamCommit) => void) {
},
}
return { api, commits, events }
return { api, commits, events, get idleCalls() { return idleCalls } }
}
function sdk(
@@ -398,6 +416,296 @@ function sdk(
}
describe("run stream transport", () => {
test("replays persisted main-session history during bootstrap", async () => {
const src = eventFeed()
const ui = footer()
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async ({ sessionID }) =>
sessionID === "session-1"
? ok([
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [
{
...textPart("text-1", "msg-1", "Hello."),
time: {
start: 1,
end: 2,
},
},
],
}),
])
: ok([]),
}),
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
await waitFor(() => ui.commits.find((item) => item.kind === "assistant" && item.text === "Hello."))
expect(ui.idleCalls).toBeGreaterThan(0)
} finally {
src.close()
await transport.close()
}
})
test("applies the configured replay message limit during bootstrap", async () => {
const src = eventFeed()
const ui = footer()
const seen: Array<{ sessionID: string; limit?: number }> = []
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async ({ sessionID, limit }) => {
seen.push({ sessionID, limit })
return ok(
sessionID === "session-1"
? [
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [
{
...textPart("text-1", "msg-1", "Hello."),
time: {
start: 1,
end: 2,
},
},
],
}),
]
: [],
)
},
}),
sessionID: "session-1",
thinking: true,
replayLimit: 1,
limits: () => ({}),
footer: ui.api,
})
try {
await waitFor(() => (ui.commits.length > 0 ? ui.commits : undefined))
expect(seen[0]).toEqual({ sessionID: "session-1", limit: 1 })
} finally {
src.close()
await transport.close()
}
})
test("skips buffered pre-bootstrap deltas already covered by replay history", async () => {
const src = eventFeed()
const ui = footer()
const gate = defer<void>()
let transport: Awaited<ReturnType<typeof createSessionTransport>> | undefined
const task = createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async ({ sessionID }) => {
if (sessionID !== "session-1") {
return ok([])
}
await gate.promise
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [textPart("text-1", "msg-1", "Hello")],
}),
])
},
}),
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
await Promise.resolve()
src.push(textDelta("msg-1", "text-1", "lo"))
gate.resolve()
transport = await task
await waitFor(() => (ui.commits.length > 0 ? ui.commits : undefined))
await Bun.sleep(20)
expect(ui.commits.filter((item) => item.kind === "assistant")).toEqual([
expect.objectContaining({
text: "Hello",
}),
])
} finally {
src.close()
await transport?.close()
}
})
test("applies buffered pre-bootstrap deltas not yet persisted", async () => {
const src = eventFeed()
const ui = footer()
const gate = defer<void>()
let transport: Awaited<ReturnType<typeof createSessionTransport>> | undefined
const task = createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async ({ sessionID }) => {
if (sessionID !== "session-1") {
return ok([])
}
await gate.promise
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [textPart("text-1", "msg-1", "")],
}),
])
},
}),
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
await Promise.resolve()
src.push(textDelta("msg-1", "text-1", "Hello"))
gate.resolve()
transport = await task
await waitFor(() => (ui.commits.length > 0 ? ui.commits : undefined))
await Bun.sleep(20)
expect(ui.commits.filter((item) => item.kind === "assistant")).toEqual([
expect.objectContaining({
text: "Hello",
}),
])
} finally {
src.close()
await transport?.close()
}
})
test("preserves running footer state for resumed active sessions", async () => {
const src = eventFeed()
const ui = footer()
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async ({ sessionID }) =>
sessionID === "session-1"
? ok([
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [
runningTool({
sessionID: "session-1",
messageID: "msg-1",
id: "bash-1",
callID: "call-1",
tool: "bash",
body: {
command: "pwd",
},
}),
],
}),
])
: ok([]),
}),
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
const patch = await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.patch")
return item?.type === "stream.patch" ? item.patch : undefined
})
expect(patch).toEqual(
expect.objectContaining({
phase: "running",
status: "running bash",
}),
)
} finally {
src.close()
await transport.close()
}
})
test("drops completed historical subagent tabs during bootstrap", async () => {
const src = eventFeed()
const ui = footer()
const transport = await createSessionTransport({
sdk: sdk({
stream: src.stream,
messages: async ({ sessionID }) => {
if (sessionID !== "session-1") {
return ok([])
}
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [
completedTool({
sessionID: "session-1",
messageID: "msg-1",
id: "task-1",
callID: "call-1",
tool: "task",
body: {
description: "Explore run folder",
subagent_type: "explore",
},
metadata: {
sessionId: "child-1",
},
}),
],
}),
])
},
children: async () => ok([child("child-1")]),
}),
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
const state = await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
return item?.type === "stream.subagent" ? item.state : undefined
})
expect(state.tabs).toEqual([])
expect(state.details).toEqual({})
} finally {
src.close()
await transport.close()
}
})
test("bootstraps child tabs and resumed blocker input", async () => {
const src = eventFeed()
const ui = footer()
@@ -487,7 +795,7 @@ describe("run stream transport", () => {
expect.objectContaining({
sessionID: "child-1",
label: "Explore",
description: "Explore run folder",
description: "Pending permission",
status: "running",
}),
])
@@ -565,16 +873,16 @@ describe("run stream transport", () => {
messages: async ({ sessionID }) => {
if (sessionID === "session-1") {
return ok([
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [
completedTool({
sessionID: "session-1",
messageID: "msg-1",
id: "task-1",
callID: "call-1",
tool: "task",
assistantMessage({
sessionID: "session-1",
id: "msg-1",
parts: [
runningTool({
sessionID: "session-1",
messageID: "msg-1",
id: "task-1",
callID: "call-1",
tool: "task",
body: {
description: "Explore run.ts",
subagent_type: "explore",
@@ -582,10 +890,10 @@ describe("run stream transport", () => {
metadata: {
sessionId: "child-1",
},
}),
],
}),
])
}),
],
}),
])
}
return sessionID === "child-1"
@@ -711,6 +1019,109 @@ describe("run stream transport", () => {
}
})
test("replays child events buffered during bootstrap once the tab is known", async () => {
const global = globalFeed()
const ui = footer()
const gate = defer<void>()
let transport: Awaited<ReturnType<typeof createSessionTransport>> | undefined
const task = createSessionTransport({
sdk: sdk({
globalStream: global.stream,
messages: async ({ sessionID }) => {
if (sessionID !== "session-1") {
return ok([])
}
await gate.promise
return ok([])
},
children: async () => ok([]),
}),
sessionID: "session-1",
thinking: true,
limits: () => ({}),
footer: ui.api,
})
try {
await Promise.resolve()
global.push(globalEvent(retry("child-1", 1, "retry child")))
global.push(
globalEvent({
id: "evt-child-message",
type: "message.updated",
properties: {
sessionID: "child-1",
info: assistantMessage({
sessionID: "child-1",
id: "msg-child-1",
parts: [],
}).info,
},
}),
)
global.push(globalEvent(textUpdated(textPart("txt-child-1", "msg-child-1", "", "child-1"))))
global.push(globalEvent(textDelta("msg-child-1", "txt-child-1", "Hello", "child-1")))
global.push(
globalEvent(
toolUpdated(
runningTool({
sessionID: "session-1",
messageID: "msg-1",
id: "task-1",
callID: "call-1",
tool: "task",
body: {
description: "Explore run.ts",
subagent_type: "explore",
},
metadata: {
sessionId: "child-1",
},
}),
),
),
)
gate.resolve()
transport = await task
await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
return item?.type === "stream.subagent" && item.state.tabs.some((tab) => tab.sessionID === "child-1")
? item
: undefined
})
transport.selectSubagent("child-1")
const detail = await waitFor(() => {
const item = ui.events.findLast((event) => event.type === "stream.subagent")
const next = item?.type === "stream.subagent" ? item.state.details["child-1"] : undefined
return next?.commits.some((commit) => commit.kind === "error" && commit.text === "retry child") &&
next.commits.some((commit) => commit.kind === "assistant" && commit.text === "Hello")
? next
: undefined
})
expect(detail).toEqual({
sessionID: "child-1",
commits: expect.arrayContaining([
expect.objectContaining({
kind: "error",
text: "retry child",
}),
expect.objectContaining({
kind: "assistant",
text: "Hello",
}),
]),
})
} finally {
global.close()
await transport?.close()
}
})
test("streams selected subagent output from global events while it is running", async () => {
const global = globalFeed()
const ui = footer()