diff --git a/packages/opencode/src/cli/cmd/run/runtime.ts b/packages/opencode/src/cli/cmd/run/runtime.ts index 882ff2e6c7..fd1708dc10 100644 --- a/packages/opencode/src/cli/cmd/run/runtime.ts +++ b/packages/opencode/src/cli/cmd/run/runtime.ts @@ -13,7 +13,9 @@ // local sessions, // 4. runs the prompt queue until the footer closes. import { createOpencodeClient } from "@opencode-ai/sdk/v2" +import { Effect } from "effect" import { Flag } from "@opencode-ai/core/flag/flag" +import { AppRuntime } from "@/effect/app-runtime" import { createRunDemo } from "./demo" import { resolveDiffStyle, resolveFooterKeybinds, resolveModelInfo, resolveSessionInfo } from "./runtime.boot" import { createRuntimeLifecycle } from "./runtime.lifecycle" @@ -70,9 +72,10 @@ type RunLocalInput = { demo?: RunInput["demo"] } +type StreamModule = Awaited type StreamState = { - mod: Awaited - handle: Awaited["createSessionTransport"]>> + mod: StreamModule + handle: import("./stream.transport").SessionTransport } type ResolvedSession = { @@ -485,15 +488,27 @@ async function runInteractiveRuntime(input: RunRuntimeInput): Promise { throw new Error("runtime closed") } - const handle = await mod.createSessionTransport({ - sdk: ctx.sdk, - directory: ctx.directory, - sessionID: state.sessionID, - thinking: input.thinking, - limits: () => state.limits, - footer, - trace: log, - }) + // Yield SessionTransport.Service inside Effect.gen and call svc.make. + // The handle holds its own internal scope; teardown happens via handle.close(). + const inner = await AppRuntime.runPromise( + Effect.gen(function* () { + const svc = yield* mod.SessionTransport.Service + return yield* svc.make({ + sdk: ctx.sdk, + directory: ctx.directory, + sessionID: state.sessionID, + thinking: input.thinking, + limits: () => state.limits, + footer, + trace: log, + }) + }).pipe(Effect.provide(mod.SessionTransport.layer)), + ) + const handle: import("./stream.transport").SessionTransport = { + runPromptTurn: (next) => AppRuntime.runPromise(inner.runPromptTurn(next)), + selectSubagent: (sessionID) => AppRuntime.runSync(inner.selectSubagent(sessionID)), + close: () => AppRuntime.runPromise(inner.close()), + } if (footer.isClosed) { await handle.close() throw new Error("runtime closed") diff --git a/packages/opencode/src/cli/cmd/run/stream.transport.ts b/packages/opencode/src/cli/cmd/run/stream.transport.ts index 22240ebf56..2be80e38bf 100644 --- a/packages/opencode/src/cli/cmd/run/stream.transport.ts +++ b/packages/opencode/src/cli/cmd/run/stream.transport.ts @@ -17,7 +17,7 @@ // delayed idle from an older turn cannot complete a newer busy turn. import type { Event, GlobalEvent, OpencodeClient } from "@opencode-ai/sdk/v2" import { Context, Deferred, Effect, Exit, Layer, Scope, Stream } from "effect" -import { makeRuntime } from "@/effect/run-service" +import { AppRuntime } from "@/effect/app-runtime" import { blockerStatus, bootstrapSessionData, @@ -61,7 +61,7 @@ type Trace = { write(type: string, data?: unknown): void } -type StreamInput = { +export type StreamInput = { sdk: OpencodeClient directory?: string sessionID: string @@ -89,12 +89,26 @@ export type SessionTurnInput = { signal?: AbortSignal } +export type TransportHandle = { + readonly runPromptTurn: (input: SessionTurnInput) => Effect.Effect + readonly selectSubagent: (sessionID: string | undefined) => Effect.Effect + readonly close: () => Effect.Effect +} + +/** Backward-compatible promise-shaped handle used by Promise-based callers. */ export type SessionTransport = { runPromptTurn(input: SessionTurnInput): Promise selectSubagent(sessionID: string | undefined): void close(): Promise } +export interface Interface { + /** Build a transport handle for the given input. Holds its own internal scope; tear down via `handle.close()`. */ + readonly make: (input: StreamInput) => Effect.Effect +} + +export class Service extends Context.Service()("@opencode/SessionTransport") {} + type State = { data: SessionData subagent: SubagentData @@ -107,14 +121,6 @@ type State = { blockers: Map } -type TransportService = { - readonly runPromptTurn: (input: SessionTurnInput) => Effect.Effect - readonly selectSubagent: (sessionID: string | undefined) => Effect.Effect - readonly close: () => Effect.Effect -} - -class Service extends Context.Service()("@opencode/RunStreamTransport") {} - function sid(event: Event): string | undefined { if (event.type === "message.updated") { return event.properties.sessionID @@ -369,679 +375,676 @@ function traceTabs(trace: Trace | undefined, prev: FooterSubagentTab[], next: Fo } } -function createLayer(input: StreamInput) { - return Layer.fresh( - Layer.effect( - Service, - Effect.gen(function* () { - const scope = yield* Scope.make() - const abort = yield* Scope.provide(scope)( - Effect.acquireRelease( - Effect.sync(() => new AbortController()), - (abort) => Effect.sync(() => abort.abort()), - ), - ) - let closed = false - let closeStream = () => {} - const halt = () => { - abort.abort() - } - const stop = () => { - input.signal?.removeEventListener("abort", halt) - abort.abort() - closeStream() - } - const closeScope = () => { - if (closed) { - return Effect.void - } +const make = Effect.fn("SessionTransport.make")(function* (input: StreamInput) { + const scope = yield* Scope.make() + const abort = yield* Scope.provide(scope)( + Effect.acquireRelease( + Effect.sync(() => new AbortController()), + (abort) => Effect.sync(() => abort.abort()), + ), + ) + let closed = false + let closeStream = () => {} + const halt = () => { + abort.abort() + } + const stop = () => { + input.signal?.removeEventListener("abort", halt) + abort.abort() + closeStream() + } + const closeScope = () => { + if (closed) { + return Effect.void + } - closed = true - stop() - return Scope.close(scope, Exit.void) - } + closed = true + stop() + return Scope.close(scope, Exit.void) + } - input.signal?.addEventListener("abort", halt, { once: true }) - yield* Effect.addFinalizer(() => closeScope()) + input.signal?.addEventListener("abort", halt, { once: true }) - const events = yield* Scope.provide(scope)( - Effect.acquireRelease( - Effect.promise(() => - input.sdk.global.event({ - signal: abort.signal, - }), - ), - (events) => - Effect.sync(() => { - void events.stream.return(undefined).catch(() => {}) - }), - ), - ) - closeStream = () => { + const events = yield* Scope.provide(scope)( + Effect.acquireRelease( + Effect.promise(() => + input.sdk.global.event({ + signal: abort.signal, + }), + ), + (events) => + Effect.sync(() => { void events.stream.return(undefined).catch(() => {}) - } - input.trace?.write("recv.subscribe", { - sessionID: input.sessionID, - }) + }), + ), + ) + closeStream = () => { + void events.stream.return(undefined).catch(() => {}) + } + input.trace?.write("recv.subscribe", { + sessionID: input.sessionID, + }) - const state: State = { - data: createSessionData(), - subagent: createSubagentData(), - tick: 0, - footerView: { type: "prompt" }, - blockerTick: 0, - blockers: new Map(), - } - const recovering = new Set() - const currentSubagentState = () => { - if (state.selectedSubagent && !state.subagent.tabs.has(state.selectedSubagent)) { - state.selectedSubagent = undefined - } + const state: State = { + data: createSessionData(), + subagent: createSubagentData(), + tick: 0, + footerView: { type: "prompt" }, + blockerTick: 0, + blockers: new Map(), + } + const recovering = new Set() + const currentSubagentState = () => { + if (state.selectedSubagent && !state.subagent.tabs.has(state.selectedSubagent)) { + state.selectedSubagent = undefined + } - return snapshotSelectedSubagentData(state.subagent, state.selectedSubagent) + return snapshotSelectedSubagentData(state.subagent, state.selectedSubagent) + } + + const seedBlocker = (id: string) => { + if (state.blockers.has(id)) { + return + } + + state.blockerTick += 1 + state.blockers.set(id, state.blockerTick) + } + + const trackBlocker = (event: Event) => { + if (event.type !== "permission.asked" && event.type !== "question.asked") { + return + } + + if (event.properties.sessionID !== input.sessionID && !state.subagent.tabs.has(event.properties.sessionID)) { + return + } + + seedBlocker(event.properties.id) + } + + const releaseBlocker = (event: Event) => { + if ( + event.type !== "permission.replied" && + event.type !== "question.replied" && + event.type !== "question.rejected" + ) { + return + } + + state.blockers.delete(event.properties.requestID) + } + + const syncFooter = (commits: StreamCommit[], patch?: FooterPatch, nextSubagent?: FooterSubagentState) => { + const current = pickView(state.data, state.subagent, state.blockers) + const footer = composeFooter({ + patch, + subagent: nextSubagent, + current, + previous: state.footerView, + }) + + if (commits.length === 0 && !footer) { + state.footerView = current + return + } + + input.trace?.write("reduce.output", { + commits, + footer: traceFooterOutput(footer), + }) + writeSessionOutput( + { + footer: input.footer, + trace: input.trace, + }, + { + commits, + footer, + }, + ) + state.footerView = current + } + + const recoverQuestion = Effect.fn("RunStreamTransport.recoverQuestion")(function* (partID: string) { + if (recovering.has(partID)) { + return + } + + recovering.add(partID) + try { + while (!closed && !abort.signal.aborted && !input.footer.isClosed) { + if (state.data.questions.length > 0 || !state.data.tools.has(partID)) { + return } - const seedBlocker = (id: string) => { - if (state.blockers.has(id)) { - return - } - - state.blockerTick += 1 - state.blockers.set(id, state.blockerTick) + const questions = yield* Effect.promise(() => input.sdk.question.list()).pipe( + Effect.map((item) => (item.data ?? []).filter((request) => request.sessionID === input.sessionID)), + Effect.orElseSucceed(() => []), + ) + if (state.data.questions.length > 0 || !state.data.tools.has(partID)) { + return } - const trackBlocker = (event: Event) => { - if (event.type !== "permission.asked" && event.type !== "question.asked") { - return - } - - if (event.properties.sessionID !== input.sessionID && !state.subagent.tabs.has(event.properties.sessionID)) { - return - } - - seedBlocker(event.properties.id) - } - - const releaseBlocker = (event: Event) => { - if ( - event.type !== "permission.replied" && - event.type !== "question.replied" && - event.type !== "question.rejected" - ) { - return - } - - state.blockers.delete(event.properties.requestID) - } - - const syncFooter = (commits: StreamCommit[], patch?: FooterPatch, nextSubagent?: FooterSubagentState) => { - const current = pickView(state.data, state.subagent, state.blockers) - const footer = composeFooter({ - patch, - subagent: nextSubagent, - current, - previous: state.footerView, - }) - - if (commits.length === 0 && !footer) { - state.footerView = current - return - } - - input.trace?.write("reduce.output", { - commits, - footer: traceFooterOutput(footer), - }) - writeSessionOutput( - { - footer: input.footer, - trace: input.trace, - }, - { - commits, - footer, - }, - ) - state.footerView = current - } - - const recoverQuestion = Effect.fn("RunStreamTransport.recoverQuestion")(function* (partID: string) { - if (recovering.has(partID)) { - return - } - - recovering.add(partID) - try { - while (!closed && !abort.signal.aborted && !input.footer.isClosed) { - if (state.data.questions.length > 0 || !state.data.tools.has(partID)) { - return - } - - const questions = yield* Effect.promise(() => input.sdk.question.list()).pipe( - Effect.map((item) => (item.data ?? []).filter((request) => request.sessionID === input.sessionID)), - Effect.orElseSucceed(() => []), - ) - if (state.data.questions.length > 0 || !state.data.tools.has(partID)) { - return - } - - if (questions.length > 0) { - bootstrapSessionData({ - data: state.data, - messages: [], - permissions: [], - questions, - }) - for (const request of questions) { - seedBlocker(request.id) - } - input.trace?.write("question.recover", { - sessionID: input.sessionID, - requests: questions.map((request) => request.id), - }) - syncFooter([]) - return - } - - yield* Effect.sleep("250 millis") - } - } finally { - recovering.delete(partID) - } - }) - - const messages = (sessionID: string, limit: number) => - Effect.promise(() => - input.sdk.session.messages({ - sessionID, - limit, - }), - ).pipe( - Effect.map((item) => item.data ?? []), - Effect.orElseSucceed(() => []), - ) - - const bootstrapSubagentHistory = Effect.fn("RunStreamTransport.bootstrapSubagentHistory")(function* ( - sessions: string[], - ) { - yield* Effect.forEach( - sessions, - (sessionID) => - messages(sessionID, SUBAGENT_CALL_BOOTSTRAP_LIMIT).pipe( - Effect.tap((messagesList) => - Effect.sync(() => { - if ( - !bootstrapSubagentCalls({ - data: state.subagent, - sessionID, - messages: messagesList, - thinking: input.thinking, - limits: input.limits(), - }) - ) { - return - } - - syncFooter([], undefined, currentSubagentState()) - }), - ), - ), - { - concurrency: 4, - discard: true, - }, - ) - }) - - const bootstrap = Effect.fn("RunStreamTransport.bootstrap")(function* () { - const [messagesList, children, permissions, questions] = yield* Effect.all( - [ - messages(input.sessionID, SUBAGENT_BOOTSTRAP_LIMIT), - Effect.promise(() => - input.sdk.session.children({ - sessionID: input.sessionID, - }), - ).pipe( - Effect.map((item) => item.data ?? []), - Effect.orElseSucceed(() => []), - ), - Effect.promise(() => input.sdk.permission.list()).pipe( - Effect.map((item) => item.data ?? []), - Effect.orElseSucceed(() => []), - ), - Effect.promise(() => input.sdk.question.list()).pipe( - Effect.map((item) => item.data ?? []), - Effect.orElseSucceed(() => []), - ), - ], - { - concurrency: "unbounded", - }, - ) - + if (questions.length > 0) { bootstrapSessionData({ data: state.data, - messages: messagesList, - permissions: permissions.filter((item) => item.sessionID === input.sessionID), - questions: questions.filter((item) => item.sessionID === input.sessionID), - }) - bootstrapSubagentData({ - data: state.subagent, - messages: messagesList, - children, - permissions, + messages: [], + permissions: [], questions, }) - - for (const request of [ - ...state.data.permissions, - ...listSubagentPermissions(state.subagent), - ...state.data.questions, - ...listSubagentQuestions(state.subagent), - ].sort((a, b) => a.id.localeCompare(b.id))) { + for (const request of questions) { seedBlocker(request.id) } + input.trace?.write("question.recover", { + sessionID: input.sessionID, + requests: questions.map((request) => request.id), + }) + syncFooter([]) + return + } - const snapshot = currentSubagentState() - traceTabs(input.trace, [], snapshot.tabs) - syncFooter([], undefined, snapshot) + yield* Effect.sleep("250 millis") + } + } finally { + recovering.delete(partID) + } + }) - const sessions = [...state.subagent.tabs.keys()] - if (sessions.length === 0) { + const messages = (sessionID: string, limit: number) => + Effect.promise(() => + input.sdk.session.messages({ + sessionID, + limit, + }), + ).pipe( + Effect.map((item) => item.data ?? []), + Effect.orElseSucceed(() => []), + ) + + const bootstrapSubagentHistory = Effect.fn("RunStreamTransport.bootstrapSubagentHistory")(function* ( + sessions: string[], + ) { + yield* Effect.forEach( + sessions, + (sessionID) => + messages(sessionID, SUBAGENT_CALL_BOOTSTRAP_LIMIT).pipe( + Effect.tap((messagesList) => + Effect.sync(() => { + if ( + !bootstrapSubagentCalls({ + data: state.subagent, + sessionID, + messages: messagesList, + thinking: input.thinking, + limits: input.limits(), + }) + ) { + return + } + + syncFooter([], undefined, currentSubagentState()) + }), + ), + ), + { + concurrency: 4, + discard: true, + }, + ) + }) + + const bootstrap = Effect.fn("RunStreamTransport.bootstrap")(function* () { + const [messagesList, children, permissions, questions] = yield* Effect.all( + [ + messages(input.sessionID, SUBAGENT_BOOTSTRAP_LIMIT), + Effect.promise(() => + input.sdk.session.children({ + sessionID: input.sessionID, + }), + ).pipe( + Effect.map((item) => item.data ?? []), + Effect.orElseSucceed(() => []), + ), + Effect.promise(() => input.sdk.permission.list()).pipe( + Effect.map((item) => item.data ?? []), + Effect.orElseSucceed(() => []), + ), + Effect.promise(() => input.sdk.question.list()).pipe( + Effect.map((item) => item.data ?? []), + Effect.orElseSucceed(() => []), + ), + ], + { + concurrency: "unbounded", + }, + ) + + bootstrapSessionData({ + data: state.data, + messages: messagesList, + permissions: permissions.filter((item) => item.sessionID === input.sessionID), + questions: questions.filter((item) => item.sessionID === input.sessionID), + }) + bootstrapSubagentData({ + data: state.subagent, + messages: messagesList, + children, + permissions, + questions, + }) + + for (const request of [ + ...state.data.permissions, + ...listSubagentPermissions(state.subagent), + ...state.data.questions, + ...listSubagentQuestions(state.subagent), + ].sort((a, b) => a.id.localeCompare(b.id))) { + seedBlocker(request.id) + } + + const snapshot = currentSubagentState() + traceTabs(input.trace, [], snapshot.tabs) + syncFooter([], undefined, snapshot) + + const sessions = [...state.subagent.tabs.keys()] + if (sessions.length === 0) { + return + } + + yield* bootstrapSubagentHistory(sessions).pipe( + Effect.forkIn(scope, { startImmediately: true }), + Effect.asVoid, + ) + }) + + const idle = Effect.fn("RunStreamTransport.idle")((fallback: boolean) => + Effect.promise(() => input.sdk.session.status()).pipe( + Effect.map((out) => { + const item = out.data?.[input.sessionID] + return !item || item.type === "idle" + }), + Effect.orElseSucceed(() => fallback), + ), + ) + + const fail = Effect.fn("RunStreamTransport.fail")(function* (error: unknown) { + if (state.fault) { + return + } + + state.fault = error + const next = state.wait + state.wait = undefined + if (!next) { + return + } + + yield* Deferred.fail(next.done, error).pipe(Effect.ignore) + }) + + const touch = (event: Event) => { + const next = state.wait + if (!next || !active(event, input.sessionID)) { + return + } + + next.live = true + } + + const complete = Effect.fn("RunStreamTransport.complete")(function* (next: Wait, fallback: boolean) { + if (state.wait !== next || !next.armed || !next.live) { + return + } + + if (!(yield* idle(fallback)) || state.wait !== next) { + return + } + + state.tick = next.tick + 1 + state.wait = undefined + yield* Deferred.succeed(next.done, undefined).pipe(Effect.ignore) + }) + + const mark = Effect.fn("RunStreamTransport.mark")(function* (event: Event) { + if ( + event.type !== "session.status" || + event.properties.sessionID !== input.sessionID || + event.properties.status.type !== "idle" + ) { + return + } + + const next = state.wait + if (!next) { + return + } + + yield* complete(next, true) + }) + + const poll = Effect.fn("RunStreamTransport.poll")(function* (next: Wait, signal: AbortSignal) { + while (state.wait === next && !signal.aborted && !input.footer.isClosed && !closed) { + yield* Effect.sleep("250 millis") + yield* complete(next, false) + } + }) + + const flush = (type: "turn.abort" | "turn.cancel") => { + const commits: StreamCommit[] = [] + flushInterrupted(state.data, commits) + syncFooter(commits) + input.trace?.write(type, { + sessionID: input.sessionID, + }) + } + + const watch = Effect.fn("RunStreamTransport.watch")(() => + Stream.fromAsyncIterable(events.stream, (error) => + error instanceof Error ? error : new Error(String(error)), + ).pipe( + Stream.takeUntil(() => input.footer.isClosed || abort.signal.aborted), + Stream.runForEach( + Effect.fn("RunStreamTransport.event")(function* (item: unknown) { + if (input.footer.isClosed) { + abort.abort() return } - yield* bootstrapSubagentHistory(sessions).pipe( - Effect.forkIn(scope, { startImmediately: true }), - Effect.asVoid, - ) - }) + if (isMatchingDisposeEvent(item, input.directory)) { + yield* fail(new Error("instance disposed")) + yield* closeScope() + return + } - const idle = Effect.fn("RunStreamTransport.idle")((fallback: boolean) => - Effect.promise(() => input.sdk.session.status()).pipe( - Effect.map((out) => { - const item = out.data?.[input.sessionID] - return !item || item.type === "idle" + const event = globalPayloadEvent(item) + if (!event) { + return + } + + const sessionID = sid(event) + if (sessionID !== input.sessionID && (!sessionID || !state.subagent.tabs.has(sessionID))) { + return + } + + input.trace?.write("recv.event", event) + trackBlocker(event) + + const prev = event.type === "message.part.updated" ? listSubagentTabs(state.subagent) : undefined + const next = reduceSessionData({ + data: state.data, + event, + sessionID: input.sessionID, + thinking: input.thinking, + limits: input.limits(), + }) + state.data = next.data + + if ( + event.type === "message.part.updated" && + event.properties.part.sessionID === input.sessionID && + event.properties.part.type === "tool" && + event.properties.part.tool === "question" && + event.properties.part.state.status === "running" && + state.data.questions.length === 0 + ) { + yield* recoverQuestion(event.properties.part.id).pipe( + Effect.forkIn(scope, { startImmediately: true }), + Effect.asVoid, + ) + } + + const changed = reduceSubagentData({ + data: state.subagent, + event, + sessionID: input.sessionID, + thinking: input.thinking, + limits: input.limits(), + }) + if (changed && prev) { + traceTabs(input.trace, prev, listSubagentTabs(state.subagent)) + } + releaseBlocker(event) + + syncFooter(next.commits, next.footer?.patch, changed ? currentSubagentState() : undefined) + + touch(event) + yield* mark(event) + }), + ), + Effect.catch((error) => (abort.signal.aborted ? Effect.void : fail(error))), + Effect.ensuring( + Effect.gen(function* () { + if (!abort.signal.aborted && !state.fault) { + yield* fail(new Error("global event stream closed")) + } + closeStream() + }), + ), + ), + ) + + yield* bootstrap() + yield* Scope.provide(scope)(watch().pipe(Effect.forkScoped)) + + const runPromptTurn = Effect.fn("RunStreamTransport.runPromptTurn")(function* (next: SessionTurnInput) { + if (closed || next.signal?.aborted || input.footer.isClosed) { + return + } + + if (state.fault) { + yield* Effect.fail(state.fault) + return + } + + if (state.wait) { + yield* Effect.fail(new Error("prompt already running")) + return + } + + const prev = listSubagentTabs(state.subagent) + if (clearFinishedSubagents(state.subagent)) { + const snapshot = currentSubagentState() + traceTabs(input.trace, prev, snapshot.tabs) + syncFooter([], undefined, snapshot) + } + + const item: Wait = { + tick: state.tick, + armed: false, + live: false, + done: yield* Deferred.make(), + } + state.wait = item + state.data.announced = false + + const turn = new AbortController() + const stop = () => { + turn.abort() + } + next.signal?.addEventListener("abort", stop, { once: true }) + abort.signal.addEventListener("abort", stop, { once: true }) + yield* poll(item, turn.signal).pipe(Effect.forkIn(scope, { startImmediately: true }), Effect.asVoid) + + const req = { + sessionID: input.sessionID, + agent: next.agent, + model: next.model, + variant: next.variant, + parts: [ + ...(next.includeFiles ? next.files : []), + { type: "text" as const, text: next.prompt.text }, + ...next.prompt.parts, + ], + } + const command = next.prompt.command + const send = command + ? Effect.sync(() => { + input.trace?.write("send.command", { sessionID: input.sessionID, command: command.name }) + }).pipe( + Effect.andThen( + Effect.promise(() => + input.sdk.session.command( + { + sessionID: input.sessionID, + agent: next.agent, + model: next.model ? `${next.model.providerID}/${next.model.modelID}` : undefined, + variant: next.variant, + command: command.name, + arguments: command.arguments, + parts: [ + ...(next.includeFiles ? next.files : []), + ...next.prompt.parts.filter( + (item): item is Extract => item.type === "file", + ), + ], + }, + { signal: turn.signal }, + ), + ).pipe( + Effect.tap(() => + Effect.sync(() => { + input.trace?.write("send.command.ok", { + sessionID: input.sessionID, + command: command.name, + }) + item.armed = true + item.live = true + }), + ), + Effect.flatMap(() => Deferred.succeed(item.done, undefined).pipe(Effect.ignore)), + Effect.catch((error) => Deferred.fail(item.done, error).pipe(Effect.ignore)), + Effect.forkIn(scope, { startImmediately: true }), + Effect.asVoid, + ), + ), + ) + : Effect.sync(() => { + input.trace?.write("send.prompt", req) + }).pipe( + Effect.andThen( + Effect.promise(() => + input.sdk.session.promptAsync(req, { + signal: turn.signal, + }), + ), + ), + Effect.tap(() => + Effect.sync(() => { + input.trace?.write("send.prompt.ok", { + sessionID: input.sessionID, + }) + item.armed = true }), - Effect.orElseSucceed(() => fallback), ), ) - const fail = Effect.fn("RunStreamTransport.fail")(function* (error: unknown) { - if (state.fault) { - return + yield* send.pipe( + Effect.flatMap(() => { + if (turn.signal.aborted || next.signal?.aborted || input.footer.isClosed || closed) { + if (state.wait === item) { + state.wait = undefined } - - state.fault = error - const next = state.wait - state.wait = undefined - if (!next) { - return - } - - yield* Deferred.fail(next.done, error).pipe(Effect.ignore) - }) - - const touch = (event: Event) => { - const next = state.wait - if (!next || !active(event, input.sessionID)) { - return - } - - next.live = true + flush("turn.abort") + return Effect.void } - const complete = Effect.fn("RunStreamTransport.complete")(function* (next: Wait, fallback: boolean) { - if (state.wait !== next || !next.armed || !next.live) { - return - } - - if (!(yield* idle(fallback)) || state.wait !== next) { - return - } - - state.tick = next.tick + 1 - state.wait = undefined - yield* Deferred.succeed(next.done, undefined).pipe(Effect.ignore) - }) - - const mark = Effect.fn("RunStreamTransport.mark")(function* (event: Event) { - if ( - event.type !== "session.status" || - event.properties.sessionID !== input.sessionID || - event.properties.status.type !== "idle" - ) { - return - } - - const next = state.wait - if (!next) { - return - } - - yield* complete(next, true) - }) - - const poll = Effect.fn("RunStreamTransport.poll")(function* (next: Wait, signal: AbortSignal) { - while (state.wait === next && !signal.aborted && !input.footer.isClosed && !closed) { - yield* Effect.sleep("250 millis") - yield* complete(next, false) - } - }) - - const flush = (type: "turn.abort" | "turn.cancel") => { - const commits: StreamCommit[] = [] - flushInterrupted(state.data, commits) - syncFooter(commits) - input.trace?.write(type, { - sessionID: input.sessionID, + if (!input.footer.isClosed && !state.data.announced) { + input.trace?.write("ui.patch", { + phase: "running", + status: "waiting for assistant", + }) + input.footer.event({ + type: "turn.wait", }) } - const watch = Effect.fn("RunStreamTransport.watch")(() => - Stream.fromAsyncIterable(events.stream, (error) => - error instanceof Error ? error : new Error(String(error)), - ).pipe( - Stream.takeUntil(() => input.footer.isClosed || abort.signal.aborted), - Stream.runForEach( - Effect.fn("RunStreamTransport.event")(function* (item: unknown) { - if (input.footer.isClosed) { - abort.abort() - return - } - - if (isMatchingDisposeEvent(item, input.directory)) { - yield* fail(new Error("instance disposed")) - yield* closeScope() - return - } - - const event = globalPayloadEvent(item) - if (!event) { - return - } - - const sessionID = sid(event) - if (sessionID !== input.sessionID && (!sessionID || !state.subagent.tabs.has(sessionID))) { - return - } - - input.trace?.write("recv.event", event) - trackBlocker(event) - - const prev = event.type === "message.part.updated" ? listSubagentTabs(state.subagent) : undefined - const next = reduceSessionData({ - data: state.data, - event, - sessionID: input.sessionID, - thinking: input.thinking, - limits: input.limits(), - }) - state.data = next.data - - if ( - event.type === "message.part.updated" && - event.properties.part.sessionID === input.sessionID && - event.properties.part.type === "tool" && - event.properties.part.tool === "question" && - event.properties.part.state.status === "running" && - state.data.questions.length === 0 - ) { - yield* recoverQuestion(event.properties.part.id).pipe( - Effect.forkIn(scope, { startImmediately: true }), - Effect.asVoid, - ) - } - - const changed = reduceSubagentData({ - data: state.subagent, - event, - sessionID: input.sessionID, - thinking: input.thinking, - limits: input.limits(), - }) - if (changed && prev) { - traceTabs(input.trace, prev, listSubagentTabs(state.subagent)) - } - releaseBlocker(event) - - syncFooter(next.commits, next.footer?.patch, changed ? currentSubagentState() : undefined) - - touch(event) - yield* mark(event) - }), - ), - Effect.catch((error) => (abort.signal.aborted ? Effect.void : fail(error))), - Effect.ensuring( - Effect.gen(function* () { - if (!abort.signal.aborted && !state.fault) { - yield* fail(new Error("global event stream closed")) - } - closeStream() - }), - ), - ), - ) - - yield* bootstrap() - yield* Scope.provide(scope)(watch().pipe(Effect.forkScoped)) - - const runPromptTurn = Effect.fn("RunStreamTransport.runPromptTurn")(function* (next: SessionTurnInput) { - if (closed || next.signal?.aborted || input.footer.isClosed) { - return + if (state.tick > item.tick) { + if (state.wait === item) { + state.wait = undefined } + return Effect.void + } - if (state.fault) { - yield* Effect.fail(state.fault) - return - } - - if (state.wait) { - yield* Effect.fail(new Error("prompt already running")) - return - } - - const prev = listSubagentTabs(state.subagent) - if (clearFinishedSubagents(state.subagent)) { - const snapshot = currentSubagentState() - traceTabs(input.trace, prev, snapshot.tabs) - syncFooter([], undefined, snapshot) - } - - const item: Wait = { - tick: state.tick, - armed: false, - live: false, - done: yield* Deferred.make(), - } - state.wait = item - state.data.announced = false - - const turn = new AbortController() - const stop = () => { - turn.abort() - } - next.signal?.addEventListener("abort", stop, { once: true }) - abort.signal.addEventListener("abort", stop, { once: true }) - yield* poll(item, turn.signal).pipe(Effect.forkIn(scope, { startImmediately: true }), Effect.asVoid) - - const req = { - sessionID: input.sessionID, - agent: next.agent, - model: next.model, - variant: next.variant, - parts: [ - ...(next.includeFiles ? next.files : []), - { type: "text" as const, text: next.prompt.text }, - ...next.prompt.parts, - ], - } - const command = next.prompt.command - const send = command - ? Effect.sync(() => { - input.trace?.write("send.command", { sessionID: input.sessionID, command: command.name }) - }).pipe( - Effect.andThen( - Effect.promise(() => - input.sdk.session.command( - { - sessionID: input.sessionID, - agent: next.agent, - model: next.model ? `${next.model.providerID}/${next.model.modelID}` : undefined, - variant: next.variant, - command: command.name, - arguments: command.arguments, - parts: [ - ...(next.includeFiles ? next.files : []), - ...next.prompt.parts.filter( - (item): item is Extract => item.type === "file", - ), - ], - }, - { signal: turn.signal }, - ), - ).pipe( - Effect.tap(() => - Effect.sync(() => { - input.trace?.write("send.command.ok", { - sessionID: input.sessionID, - command: command.name, - }) - item.armed = true - item.live = true - }), - ), - Effect.flatMap(() => Deferred.succeed(item.done, undefined).pipe(Effect.ignore)), - Effect.catch((error) => Deferred.fail(item.done, error).pipe(Effect.ignore)), - Effect.forkIn(scope, { startImmediately: true }), - Effect.asVoid, - ), - ), - ) - : Effect.sync(() => { - input.trace?.write("send.prompt", req) - }).pipe( - Effect.andThen( - Effect.promise(() => - input.sdk.session.promptAsync(req, { - signal: turn.signal, - }), - ), - ), - Effect.tap(() => - Effect.sync(() => { - input.trace?.write("send.prompt.ok", { - sessionID: input.sessionID, - }) - item.armed = true - }), - ), - ) - - yield* send.pipe( - Effect.flatMap(() => { - if (turn.signal.aborted || next.signal?.aborted || input.footer.isClosed || closed) { - if (state.wait === item) { - state.wait = undefined - } - flush("turn.abort") - return Effect.void - } - - if (!input.footer.isClosed && !state.data.announced) { - input.trace?.write("ui.patch", { - phase: "running", - status: "waiting for assistant", - }) - input.footer.event({ - type: "turn.wait", - }) - } - - if (state.tick > item.tick) { - if (state.wait === item) { - state.wait = undefined - } - return Effect.void - } - - return waitTurn(item.done, turn.signal).pipe( - Effect.flatMap((status) => - Effect.sync(() => { - if (state.wait === item) { - state.wait = undefined - } - - if (status === "abort") { - flush("turn.abort") - } - }), - ), - ) - }), - Effect.catch((error) => { + return waitTurn(item.done, turn.signal).pipe( + Effect.flatMap((status) => + Effect.sync(() => { if (state.wait === item) { state.wait = undefined } - const canceled = turn.signal.aborted || next.signal?.aborted === true || input.footer.isClosed || closed - if (canceled) { - flush("turn.cancel") - return Effect.void + if (status === "abort") { + flush("turn.abort") } - - if (error === state.fault) { - return Effect.fail(error) - } - - input.trace?.write("send.prompt.error", { - sessionID: input.sessionID, - error: formatUnknownError(error), - }) - return Effect.fail(error) }), - Effect.ensuring( - Effect.sync(() => { - input.trace?.write("turn.end", { - sessionID: input.sessionID, - }) - next.signal?.removeEventListener("abort", stop) - abort.signal.removeEventListener("abort", stop) - }), - ), - ) - return - }) - - const selectSubagent = Effect.fn("RunStreamTransport.selectSubagent")((sessionID: string | undefined) => - Effect.sync(() => { - if (closed) { - return - } - - const next = sessionID && state.subagent.tabs.has(sessionID) ? sessionID : undefined - if (state.selectedSubagent === next) { - return - } - - state.selectedSubagent = next - syncFooter([], undefined, currentSubagentState()) - }), + ), ) - - const close = Effect.fn("RunStreamTransport.close")(function* () { - yield* closeScope() - }) - - return Service.of({ - runPromptTurn, - selectSubagent, - close, - }) }), - ), + Effect.catch((error) => { + if (state.wait === item) { + state.wait = undefined + } + + const canceled = turn.signal.aborted || next.signal?.aborted === true || input.footer.isClosed || closed + if (canceled) { + flush("turn.cancel") + return Effect.void + } + + if (error === state.fault) { + return Effect.fail(error) + } + + input.trace?.write("send.prompt.error", { + sessionID: input.sessionID, + error: formatUnknownError(error), + }) + return Effect.fail(error) + }), + Effect.ensuring( + Effect.sync(() => { + input.trace?.write("turn.end", { + sessionID: input.sessionID, + }) + next.signal?.removeEventListener("abort", stop) + abort.signal.removeEventListener("abort", stop) + }), + ), + ) + return + }) + + const selectSubagent = Effect.fn("RunStreamTransport.selectSubagent")((sessionID: string | undefined) => + Effect.sync(() => { + if (closed) { + return + } + + const next = sessionID && state.subagent.tabs.has(sessionID) ? sessionID : undefined + if (state.selectedSubagent === next) { + return + } + + state.selectedSubagent = next + syncFooter([], undefined, currentSubagentState()) + }), ) -} + + const close = Effect.fn("RunStreamTransport.close")(function* () { + yield* closeScope() + }) + + const handle: TransportHandle = { + runPromptTurn, + selectSubagent, + close, + } + return handle +}) + +export const layer = Layer.succeed(Service, Service.of({ make })) + +export const defaultLayer = layer // Opens an SDK event subscription and returns a SessionTransport. // @@ -1052,13 +1055,23 @@ function createLayer(input: StreamInput) { // // The transport is single-turn: only one runPromptTurn() call can be active // at a time. The prompt queue enforces this from above. +// +// Promise-shaped wrapper for Promise-based callers (CLI runtime, tests). New +// Effect-style callers should yield `SessionTransport.Service` and call +// `svc.make(input)` directly inside their own scope. export async function createSessionTransport(input: StreamInput): Promise { - const runtime = makeRuntime(Service, createLayer(input)) - await runtime.runPromise(() => Effect.void) + const handle = await AppRuntime.runPromise( + Effect.gen(function* () { + const svc = yield* Service + return yield* svc.make(input) + }).pipe(Effect.provide(layer)), + ) return { - runPromptTurn: (next) => runtime.runPromise((svc) => svc.runPromptTurn(next)), - selectSubagent: (sessionID) => runtime.runSync((svc) => svc.selectSubagent(sessionID)), - close: () => runtime.runPromise((svc) => svc.close()), + runPromptTurn: (next) => AppRuntime.runPromise(handle.runPromptTurn(next)), + selectSubagent: (sessionID) => AppRuntime.runSync(handle.selectSubagent(sessionID)), + close: () => AppRuntime.runPromise(handle.close()), } } + +export * as SessionTransport from "./stream.transport"