From cb354932422caf220aa5dfcf487c9335196b4e82 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 18 May 2026 11:38:05 -0400 Subject: [PATCH] fix(bus): acquire PubSub subscription eagerly to close /event race (#27959) --- packages/opencode/src/bus/index.ts | 50 +- packages/opencode/src/plugin/index.ts | 2 +- packages/opencode/src/project/project.ts | 2 +- packages/opencode/src/project/vcs.ts | 2 +- .../routes/instance/httpapi/handlers/event.ts | 9 +- packages/opencode/src/share/share-next.ts | 18 +- packages/opencode/test/bus/bus-effect.test.ts | 98 +++- .../server/httpapi-event-diagnostics.test.ts | 453 ++++++++++++++++++ .../opencode/test/server/httpapi-sdk.test.ts | 68 ++- 9 files changed, 659 insertions(+), 43 deletions(-) create mode 100644 packages/opencode/test/server/httpapi-event-diagnostics.test.ts diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 5c4685f5ec..3dfec40135 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -37,8 +37,16 @@ export interface Interface { properties: BusProperties, options?: { id?: string }, ) => Effect.Effect - readonly subscribe: (def: D) => Stream.Stream> - readonly subscribeAll: () => Stream.Stream + // subscribe / subscribeAll are eager: the underlying PubSub subscription is + // acquired in the caller's Scope at `yield*` time. Any publish after the + // yield is delivered, even if stream consumption starts later. The previous + // Stream-returning shape acquired the subscription lazily on first pull, + // opening a race window during which publishes were lost — see + // test/bus/bus-effect.test.ts RACE tests. + readonly subscribe: ( + def: D, + ) => Effect.Effect>, never, Scope.Scope> + readonly subscribeAll: () => Effect.Effect, never, Scope.Scope> readonly subscribeCallback: ( def: D, callback: (event: Payload) => unknown, @@ -109,26 +117,26 @@ export const layer = Layer.effect( }) } - function subscribe(def: D): Stream.Stream> { - log.info("subscribing", { type: def.type }) - return Stream.unwrap( - Effect.gen(function* () { - const s = yield* InstanceState.get(state) - const ps = yield* getOrCreate(s, def) - return Stream.fromPubSub(ps) - }), - ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type })))) - } + const subscribe = ( + def: D, + ): Effect.Effect>, never, Scope.Scope> => + Effect.gen(function* () { + log.info("subscribing", { type: def.type }) + const s = yield* InstanceState.get(state) + const ps = yield* getOrCreate(s, def) + const subscription = yield* PubSub.subscribe(ps) + yield* Effect.addFinalizer(() => Effect.sync(() => log.info("unsubscribing", { type: def.type }))) + return Stream.fromSubscription(subscription) + }) - function subscribeAll(): Stream.Stream { - log.info("subscribing", { type: "*" }) - return Stream.unwrap( - Effect.gen(function* () { - const s = yield* InstanceState.get(state) - return Stream.fromPubSub(s.wildcard) - }), - ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" })))) - } + const subscribeAll = (): Effect.Effect, never, Scope.Scope> => + Effect.gen(function* () { + log.info("subscribing", { type: "*" }) + const s = yield* InstanceState.get(state) + const subscription = yield* PubSub.subscribe(s.wildcard) + yield* Effect.addFinalizer(() => Effect.sync(() => log.info("unsubscribing", { type: "*" }))) + return Stream.fromSubscription(subscription) + }) function on(pubsub: PubSub.PubSub, type: string, callback: (event: T) => unknown) { return Effect.gen(function* () { diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index e87f6db238..a9a5067677 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -243,7 +243,7 @@ export const layer = Layer.effect( } // Subscribe to bus events, fiber interrupted when scope closes - yield* bus.subscribeAll().pipe( + yield* (yield* bus.subscribeAll()).pipe( Stream.runForEach((input) => Effect.sync(() => { for (const hook of hooks) { diff --git a/packages/opencode/src/project/project.ts b/packages/opencode/src/project/project.ts index 399412cdce..5107fde3e4 100644 --- a/packages/opencode/src/project/project.ts +++ b/packages/opencode/src/project/project.ts @@ -425,7 +425,7 @@ export const layer: Layer.Layer< const initState = yield* InstanceState.make( Effect.fn("Project.initState")(function* (ctx) { - yield* bus.subscribe(Command.Event.Executed).pipe( + yield* (yield* bus.subscribe(Command.Event.Executed)).pipe( Stream.runForEach((payload) => payload.properties.name === Command.Default.INIT ? setInitialized(ctx.project.id) : Effect.void, ), diff --git a/packages/opencode/src/project/vcs.ts b/packages/opencode/src/project/vcs.ts index 5a477e02b3..a454cddbbb 100644 --- a/packages/opencode/src/project/vcs.ts +++ b/packages/opencode/src/project/vcs.ts @@ -298,7 +298,7 @@ export const layer: Layer.Layer = Lay const value = { current, root } log.info("initialized", { branch: value.current, default_branch: value.root?.name }) - yield* bus.subscribe(FileWatcher.Event.Updated).pipe( + yield* (yield* bus.subscribe(FileWatcher.Event.Updated)).pipe( Stream.filter((evt) => evt.properties.file.endsWith("HEAD")), Stream.runForEach((_evt) => Effect.gen(function* () { diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts index c0bcbc82c0..e770a7cfba 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/event.ts @@ -20,10 +20,11 @@ function eventData(data: unknown): Sse.Event { function eventResponse(bus: Bus.Interface) { return Effect.gen(function* () { - const context = yield* Effect.context() - - const events = bus.subscribeAll().pipe( - Stream.provideContext(context), + // Subscribe eagerly: the bus subscription is acquired in the request scope + // at this yield, so any publish from now on is queued for the body-pump + // fiber to drain — closing the race where Stream.concat(server.connected, + // lazy-subscribe) used to drop publishes in the prefix-consume window. + const events = (yield* bus.subscribeAll()).pipe( Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type), ) const heartbeat = Stream.tick("10 seconds").pipe( diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index a386211dc6..f5e0e654a9 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -168,16 +168,20 @@ export const layer = Layer.effect( fn: (evt: { properties: any }) => Effect.Effect, ) => bus.subscribe(def as never).pipe( - Stream.runForEach((evt) => - fn(evt).pipe( - Effect.catchCause((cause) => - Effect.sync(() => { - log.error("share subscriber failed", { type: def.type, cause }) - }), + Effect.flatMap((stream) => + stream.pipe( + Stream.runForEach((evt) => + fn(evt).pipe( + Effect.catchCause((cause) => + Effect.sync(() => { + log.error("share subscriber failed", { type: def.type, cause }) + }), + ), + ), ), + Effect.forkScoped, ), ), - Effect.forkScoped, ) yield* watch(Session.Event.Updated, (evt) => diff --git a/packages/opencode/test/bus/bus-effect.test.ts b/packages/opencode/test/bus/bus-effect.test.ts index 0dc58ecdae..dfe653dd10 100644 --- a/packages/opencode/test/bus/bus-effect.test.ts +++ b/packages/opencode/test/bus/bus-effect.test.ts @@ -44,7 +44,7 @@ describe("Bus (Effect-native)", () => { const done = yield* Deferred.make() const ready = yield* Latch.make() - yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + yield* Stream.runForEach(yield* bus.subscribe(TestEvent.Ping), (evt) => Effect.gen(function* () { if (evt.properties.value < 0) { yield* ready.open @@ -71,7 +71,7 @@ describe("Bus (Effect-native)", () => { const done = yield* Deferred.make() const ready = yield* Latch.make() - yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + yield* Stream.runForEach(yield* bus.subscribe(TestEvent.Ping), (evt) => Effect.gen(function* () { if (evt.properties.value < 0) { yield* ready.open @@ -98,7 +98,7 @@ describe("Bus (Effect-native)", () => { const done = yield* Deferred.make() const ready = yield* Latch.make() - yield* Stream.runForEach(bus.subscribeAll(), (evt) => + yield* Stream.runForEach(yield* bus.subscribeAll(), (evt) => Effect.gen(function* () { if (evt.type === TestEvent.Warmup.type) { yield* ready.open @@ -129,7 +129,7 @@ describe("Bus (Effect-native)", () => { const readyA = yield* Latch.make() const readyB = yield* Latch.make() - yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + yield* Stream.runForEach(yield* bus.subscribe(TestEvent.Ping), (evt) => Effect.gen(function* () { if (evt.properties.value < 0) { yield* readyA.open @@ -140,7 +140,7 @@ describe("Bus (Effect-native)", () => { }), ).pipe(Effect.forkScoped) - yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) => + yield* Stream.runForEach(yield* bus.subscribe(TestEvent.Ping), (evt) => Effect.gen(function* () { if (evt.properties.value < 0) { yield* readyB.open @@ -162,6 +162,92 @@ describe("Bus (Effect-native)", () => { }), ) + // RACE 1: eager subscription means publishing immediately after yield* + // bus.subscribe is delivered. Regression for the old lazy `Stream.unwrap` + // shape where PubSub.subscribe ran on first pull and missed any publish + // in the hand-off window. + it.instance("eager subscribe: publish after yield* is delivered without consumer-activation race", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const stream = yield* bus.subscribe(TestEvent.Ping) + + // Hand-off window: subscription is alive (we yielded). Publish goes + // straight into the subscription queue, even with no consumer running. + yield* bus.publish(TestEvent.Ping, { value: 99 }) + + const collected = yield* stream.pipe( + Stream.take(1), + Stream.runCollect, + Effect.timeout("400 millis"), + Effect.option, + ) + + expect(collected._tag).toBe("Some") + if (collected._tag === "Some") { + const arr = Array.from(collected.value) + expect(arr[0].properties.value).toBe(99) + } + }), + ) + + // RACE 2: same property for subscribeAll. + it.instance("eager subscribeAll: publish after yield* is delivered", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const stream = yield* bus.subscribeAll() + + yield* bus.publish(TestEvent.Ping, { value: 42 }) + + const collected = yield* stream.pipe( + Stream.take(1), + Stream.runCollect, + Effect.timeout("400 millis"), + Effect.option, + ) + + expect(collected._tag).toBe("Some") + if (collected._tag === "Some") { + const arr = Array.from(collected.value) + expect(arr[0].type).toBe(TestEvent.Ping.type) + } + }), + ) + + // RACE 3: the /event-handler shape exactly. With eager subscription, the + // bus subscription is alive before Stream.concat ever starts. Publishes + // during the prefix consumption window are queued and delivered. + it.instance("eager subscribe: Stream.concat(initial, subscribe) delivers publish during prefix", () => + Effect.gen(function* () { + const bus = yield* Bus.Service + const sawInitial = yield* Deferred.make() + const sawPublish = yield* Deferred.make() + + type Frame = { marker?: "initial"; value?: number } + const subscriptionStream = yield* bus.subscribe(TestEvent.Ping) + const handlerStream: Stream.Stream = Stream.make({ marker: "initial" } as Frame).pipe( + Stream.concat(subscriptionStream.pipe(Stream.map((evt): Frame => ({ value: evt.properties.value })))), + ) + + yield* Stream.runForEach(handlerStream, (frame) => + Effect.gen(function* () { + if (frame.marker === "initial") { + Deferred.doneUnsafe(sawInitial, Effect.void) + return + } + if (frame.value !== undefined) Deferred.doneUnsafe(sawPublish, Effect.succeed(frame.value)) + }), + ).pipe(Effect.forkScoped) + + yield* Deferred.await(sawInitial).pipe(Effect.timeout("1 second")) + + yield* bus.publish(TestEvent.Ping, { value: 7 }) + + const got = yield* Deferred.await(sawPublish).pipe(Effect.timeout("1 second"), Effect.option) + expect(got._tag).toBe("Some") + if (got._tag === "Some") expect(got.value).toBe(7) + }), + ) + it.live("subscribeAll stream sees InstanceDisposed on disposal", () => Effect.gen(function* () { const dir = yield* tmpdirScoped() @@ -174,7 +260,7 @@ describe("Bus (Effect-native)", () => { yield* Effect.gen(function* () { const bus = yield* Bus.Service - yield* Stream.runForEach(bus.subscribeAll(), (evt) => + yield* Stream.runForEach(yield* bus.subscribeAll(), (evt) => Effect.gen(function* () { if (evt.type === TestEvent.Warmup.type) { yield* ready.open diff --git a/packages/opencode/test/server/httpapi-event-diagnostics.test.ts b/packages/opencode/test/server/httpapi-event-diagnostics.test.ts new file mode 100644 index 0000000000..5967583993 --- /dev/null +++ b/packages/opencode/test/server/httpapi-event-diagnostics.test.ts @@ -0,0 +1,453 @@ +// Diagnostic suite for /event SSE delivery. +// +// Each test isolates ONE variable in the publisher chain while keeping the +// subscriber path constant (raw `app().request` reading the SSE body — no SDK +// consumer involvement). The pass/fail pattern across tests tells us where the +// bug lives: +// +// D1 (baseline): publish via Bus.Service.use via AppRuntime — mirror of the +// existing httpapi-event.test.ts test 3. Confirms /event SSE delivery +// works for a SOME publish path. +// +// D2: publish N times in quick succession via Bus.Service.use. If the bus +// subscription is acquired correctly there should be no message loss. +// +// D3: publish via SyncEvent.use.run via AppRuntime — exercises the same path +// the HTTP handlers use (Session.updatePart → sync.run → bus.publish) +// without the HTTP roundtrip. Tells us whether the sync path itself can +// deliver in-process. +// +// D4: publish via SyncEvent.use.run from a fresh `Effect.provide` scope +// (mimicking what happens if a handler's layer was scoped per-request). +// +// D5: in-process Bus.Service callback subscriber AND raw /event SSE subscriber +// receive the same publish. If both receive: no bug. If only the +// callback receives: the /event handler has an acquisition race. +import { afterEach, describe, expect, test } from "bun:test" +import { Bus } from "../../src/bus" +import { AppRuntime } from "../../src/effect/app-runtime" +import { InstanceRef } from "../../src/effect/instance-ref" +import { Server } from "../../src/server/server" +import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event" +import { Event as ServerEvent } from "../../src/server/event" +import { SyncEvent } from "../../src/sync" +import { MessageV2 } from "../../src/session/message-v2" +import { MessageID, PartID, SessionID } from "../../src/session/schema" +import * as Log from "@opencode-ai/core/util/log" +import { Effect, Schema } from "effect" +import { resetDatabase } from "../fixture/db" +import { disposeAllInstances, reloadTestInstance, tmpdir } from "../fixture/fixture" + +void Log.init({ print: false }) + +function app() { + return Server.Default().app +} + +const EventData = Schema.Struct({ + id: Schema.optional(Schema.String), + type: Schema.String, + properties: Schema.Record(Schema.String, Schema.Any), +}) + +type SseEvent = Schema.Schema.Type + +async function readChunk(reader: ReadableStreamDefaultReader, timeoutMs = 3_000) { + let timeout: ReturnType | undefined + try { + return await Promise.race([ + reader.read(), + new Promise((_, reject) => { + timeout = setTimeout(() => reject(new Error(`timed out after ${timeoutMs}ms`)), timeoutMs) + }), + ]) + } finally { + if (timeout) clearTimeout(timeout) + } +} + +const textDecoder = new TextDecoder() + +function decodeFrame(value: Uint8Array): SseEvent[] { + // SSE frames are separated by blank lines and each starts with "data: ". + // For our happy-path tests one chunk == one frame, but be defensive. + const text = textDecoder.decode(value) + return text + .split(/\n\n+/) + .map((part) => part.trim()) + .filter((part) => part.length > 0) + .map((part) => { + const payload = part.replace(/^data: /, "") + return Schema.decodeUnknownSync(EventData)(JSON.parse(payload)) + }) +} + +async function readNextEvent(reader: ReadableStreamDefaultReader, timeoutMs = 3_000): Promise { + const result = await readChunk(reader, timeoutMs) + if (result.done || !result.value) throw new Error("event stream closed") + const frames = decodeFrame(result.value) + if (frames.length === 0) throw new Error("empty SSE frame") + return frames[0] +} + +async function collectUntil( + reader: ReadableStreamDefaultReader, + predicate: (event: SseEvent) => boolean, + timeoutMs = 3_000, +): Promise { + const events: SseEvent[] = [] + const deadline = Date.now() + timeoutMs + while (Date.now() < deadline) { + const remaining = deadline - Date.now() + const result = await readChunk(reader, remaining).catch((cause) => { + throw new Error(`collectUntil timed out after ${events.length} events: ${cause}`) + }) + if (result.done || !result.value) throw new Error("event stream closed mid-collect") + for (const event of decodeFrame(result.value)) { + events.push(event) + if (predicate(event)) return events + } + } + throw new Error(`collectUntil deadline exceeded; collected ${events.length}: ${JSON.stringify(events)}`) +} + +afterEach(async () => { + await disposeAllInstances() + await resetDatabase() +}) + +describe("/event SSE delivery diagnostics", () => { + // Sanity: baseline same as httpapi-event.test.ts test 3 (already known to pass) + // but explicit about timing — publish happens with NO wait after reading + // server.connected. If this fails we have a deeper problem than just sync. + test("D1: delivers a single bus event published right after server.connected", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) + if (!response.body) throw new Error("missing response body") + const reader = response.body.getReader() + try { + const first = await readNextEvent(reader) + expect(first.type).toBe("server.connected") + + const ctx = await reloadTestInstance({ directory: tmp.path }) + // NO wait — publish immediately + await AppRuntime.runPromise( + Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe(Effect.provideService(InstanceRef, ctx)), + ) + + const next = await readNextEvent(reader) + expect(next.type).toBe("server.connected") // ServerEvent.Connected.type === "server.connected" + } finally { + await reader.cancel() + } + }) + + // If D1 passes but D2 fails, we have a queue-drain or partial-loss issue. + test("D2: delivers all N bus events published in rapid succession", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) + if (!response.body) throw new Error("missing response body") + const reader = response.body.getReader() + try { + const first = await readNextEvent(reader) + expect(first.type).toBe("server.connected") + + const ctx = await reloadTestInstance({ directory: tmp.path }) + const N = 5 + for (let i = 0; i < N; i++) { + await AppRuntime.runPromise( + Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe( + Effect.provideService(InstanceRef, ctx), + ), + ) + } + + const received: SseEvent[] = [] + for (let i = 0; i < N; i++) { + received.push(await readNextEvent(reader)) + } + expect(received).toHaveLength(N) + for (const event of received) expect(event.type).toBe("server.connected") + } finally { + await reader.cancel() + } + }) + + // The critical test. If D1 passes but this fails, the bus-identity fix is + // incomplete OR the sync.run publish path doesn't reach the same bus + // /event subscribes to, even within the same AppRuntime. + test("D3: delivers a SyncEvent published via SyncEvent.use.run after server.connected", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) + if (!response.body) throw new Error("missing response body") + const reader = response.body.getReader() + try { + const first = await readNextEvent(reader) + expect(first.type).toBe("server.connected") + + const ctx = await reloadTestInstance({ directory: tmp.path }) + const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) + const messageID = MessageID.ascending() + const partID = PartID.ascending() + const part: MessageV2.Part = { + id: partID, + sessionID, + messageID, + type: "text", + text: "diag", + } + + await AppRuntime.runPromise( + SyncEvent.use + .run(MessageV2.Event.PartUpdated, { + sessionID, + part: structuredClone(part) as MessageV2.Part, + time: Date.now(), + }) + .pipe(Effect.provideService(InstanceRef, ctx)), + ) + + const collected = await collectUntil( + reader, + (event) => event.type === MessageV2.Event.PartUpdated.type, + 4_000, + ) + const updated = collected.find((event) => event.type === MessageV2.Event.PartUpdated.type) + expect(updated).toBeDefined() + expect((updated as any).properties.part.id).toBe(partID) + } finally { + await reader.cancel() + } + }) + + // If D3 passes but D5 (the SDK E2E in httpapi-sdk.test.ts) fails, then the + // bug is specifically in the cross-request / cross-fiber HTTP path, not in + // the publish itself. If D3 also fails, the publish chain is broken. + // + // D4: ensure the publish reaches an in-process Bus subscriber too. Confirms + // pub/sub identity end-to-end without involving /event SSE. + test("D4: SyncEvent.use.run publish reaches an in-process Bus.Service.use callback", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const ctx = await reloadTestInstance({ directory: tmp.path }) + + let resolveReceived: (event: { id: string; type: string; properties: unknown }) => void + const received = new Promise<{ id: string; type: string; properties: unknown }>( + (resolve) => (resolveReceived = resolve as typeof resolveReceived), + ) + + const dispose = await AppRuntime.runPromise( + Bus.Service.use((svc) => + svc.subscribeAllCallback((event) => { + if (event.type === MessageV2.Event.PartUpdated.type) resolveReceived(event) + }), + ).pipe(Effect.provideService(InstanceRef, ctx)), + ) + + try { + const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) + const messageID = MessageID.ascending() + const partID = PartID.ascending() + const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d4" } + + await AppRuntime.runPromise( + SyncEvent.use + .run(MessageV2.Event.PartUpdated, { + sessionID, + part: structuredClone(part) as MessageV2.Part, + time: Date.now(), + }) + .pipe(Effect.provideService(InstanceRef, ctx)), + ) + + const event = await Promise.race([ + received, + new Promise((_, reject) => setTimeout(() => reject(new Error("D4 timed out")), 3_000)), + ]) + expect(event.type).toBe(MessageV2.Event.PartUpdated.type) + expect((event.properties as any).part.id).toBe(partID) + } finally { + dispose() + } + }) + + // D5: BOTH subscribers attached simultaneously. Trigger ONE publish via + // SyncEvent.use.run. Both subscribers should receive it. If only one does + // we know exactly which side of the chain is failing. + test("D5: same SyncEvent.use.run publish reaches BOTH /event SSE and in-process callback", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const ctx = await reloadTestInstance({ directory: tmp.path }) + + // In-process callback subscriber + let resolveCallback: (event: { type: string; properties: unknown }) => void + const callbackReceived = new Promise<{ type: string; properties: unknown }>( + (resolve) => (resolveCallback = resolve as typeof resolveCallback), + ) + const dispose = await AppRuntime.runPromise( + Bus.Service.use((svc) => + svc.subscribeAllCallback((event) => { + if (event.type === MessageV2.Event.PartUpdated.type) resolveCallback(event) + }), + ).pipe(Effect.provideService(InstanceRef, ctx)), + ) + + // SSE subscriber via raw HTTP + const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) + if (!response.body) throw new Error("missing response body") + const reader = response.body.getReader() + + try { + const first = await readNextEvent(reader) + expect(first.type).toBe("server.connected") + + const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) + const messageID = MessageID.ascending() + const partID = PartID.ascending() + const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d5" } + + await AppRuntime.runPromise( + SyncEvent.use + .run(MessageV2.Event.PartUpdated, { + sessionID, + part: structuredClone(part) as MessageV2.Part, + time: Date.now(), + }) + .pipe(Effect.provideService(InstanceRef, ctx)), + ) + + const sseCollected = await collectUntil( + reader, + (event) => event.type === MessageV2.Event.PartUpdated.type, + 4_000, + ).catch((err) => err as Error) + const callbackResult = await Promise.race([ + callbackReceived, + new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1_000)), + ]) + + const sseSaw = + Array.isArray(sseCollected) && + sseCollected.some((event) => event.type === MessageV2.Event.PartUpdated.type) + const callbackSaw = callbackResult !== "timeout" + + // Both should see it. The reason we use a single assert with the boolean + // pair is so the test failure message tells us exactly which side broke. + expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true }) + } finally { + await reader.cancel() + dispose() + } + }) + + // D7: like D5 but the "second subscriber" is a NO-OP AppRuntime.runPromise + // call (no PubSub.subscribe). If D7 passes, the specific subscribeAllCallback + // is what breaks SSE — not arbitrary AppRuntime usage. If D7 fails, anything + // running through AppRuntime concurrently with /event SSE breaks delivery. + test("D7: SSE receives sync.run publish even with concurrent no-op AppRuntime activity", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const ctx = await reloadTestInstance({ directory: tmp.path }) + + // No-op: just touches the runtime, no bus interaction + await AppRuntime.runPromise(Effect.void) + + const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) + if (!response.body) throw new Error("missing response body") + const reader = response.body.getReader() + try { + const first = await readNextEvent(reader) + expect(first.type).toBe("server.connected") + + const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) + const messageID = MessageID.ascending() + const partID = PartID.ascending() + const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d7" } + + await AppRuntime.runPromise( + SyncEvent.use + .run(MessageV2.Event.PartUpdated, { + sessionID, + part: structuredClone(part) as MessageV2.Part, + time: Date.now(), + }) + .pipe(Effect.provideService(InstanceRef, ctx)), + ) + + const collected = await collectUntil( + reader, + (event) => event.type === MessageV2.Event.PartUpdated.type, + 4_000, + ) + const updated = collected.find((event) => event.type === MessageV2.Event.PartUpdated.type) + expect(updated).toBeDefined() + } finally { + await reader.cancel() + } + }) + + // D6: same as D5 but the callback subscriber is attached AFTER /event SSE + // subscription is established. If D5 fails and D6 passes, the order of + // subscriber setup is the determining factor. + test("D6: /event SSE receives sync.run publish when callback is attached AFTER /event opens", async () => { + await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) + const ctx = await reloadTestInstance({ directory: tmp.path }) + + // Open SSE FIRST + const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) + if (!response.body) throw new Error("missing response body") + const reader = response.body.getReader() + + try { + const first = await readNextEvent(reader) + expect(first.type).toBe("server.connected") + + // THEN attach callback subscriber + let resolveCallback: (event: { type: string; properties: unknown }) => void + const callbackReceived = new Promise<{ type: string; properties: unknown }>( + (resolve) => (resolveCallback = resolve as typeof resolveCallback), + ) + const dispose = await AppRuntime.runPromise( + Bus.Service.use((svc) => + svc.subscribeAllCallback((event) => { + if (event.type === MessageV2.Event.PartUpdated.type) resolveCallback(event) + }), + ).pipe(Effect.provideService(InstanceRef, ctx)), + ) + + try { + const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) + const messageID = MessageID.ascending() + const partID = PartID.ascending() + const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d6" } + + await AppRuntime.runPromise( + SyncEvent.use + .run(MessageV2.Event.PartUpdated, { + sessionID, + part: structuredClone(part) as MessageV2.Part, + time: Date.now(), + }) + .pipe(Effect.provideService(InstanceRef, ctx)), + ) + + const sseCollected = await collectUntil( + reader, + (event) => event.type === MessageV2.Event.PartUpdated.type, + 4_000, + ).catch((err) => err as Error) + const callbackResult = await Promise.race([ + callbackReceived, + new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1_000)), + ]) + + const sseSaw = + Array.isArray(sseCollected) && + sseCollected.some((event) => event.type === MessageV2.Event.PartUpdated.type) + const callbackSaw = callbackResult !== "timeout" + expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true }) + } finally { + dispose() + } + } finally { + await reader.cancel() + } + }) +}) diff --git a/packages/opencode/test/server/httpapi-sdk.test.ts b/packages/opencode/test/server/httpapi-sdk.test.ts index 4a11be61ec..9e6533a99c 100644 --- a/packages/opencode/test/server/httpapi-sdk.test.ts +++ b/packages/opencode/test/server/httpapi-sdk.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect } from "bun:test" -import { ConfigProvider, Effect, Layer } from "effect" +import { ConfigProvider, Deferred, Effect, Layer } from "effect" import type * as Scope from "effect/Scope" import { HttpRouter } from "effect/unstable/http" import { ChildProcessSpawner } from "effect/unstable/process" @@ -22,7 +22,7 @@ import { TestLLMServer } from "../lib/llm-server" import path from "path" import { resetDatabase } from "../fixture/db" import { disposeAllInstances, TestInstance, tmpdirScoped } from "../fixture/fixture" -import { testEffect } from "../lib/effect" +import { awaitWithTimeout, testEffect } from "../lib/effect" const noopBootstrap = Layer.succeed(InstanceBootstrap.Service, InstanceBootstrap.Service.of({ run: Effect.void })) const it = testEffect( @@ -671,6 +671,70 @@ describe("HttpApi SDK", () => { ), ) + // Regression: SyncEvent must publish on the same ProjectBus the /event handler +// subscribes to, AND the /event stream must forward handler ALS/context into the +// body-pump fiber. Drives the full SDK → /event → Session.updatePart → sync.run → +// bus.publish → SDK subscriber path. Goes red if either the publisher uses a +// different bus instance (Bug 2 / pre-#27825) or the stream loses context (Bug 1 / +// pre-#27425). + serverPathParity("streams sync-backed part updates to /event subscribers", (serverPath) => + withStandardProject(serverPath, ({ sdk, directory }) => + Effect.gen(function* () { + const session = yield* capture(() => sdk.session.create({ title: "sync-backed part event" })) + const sessionID = String(record(session.data).id) + const seeded = yield* seedMessage(directory, sessionID) + + const controller = new AbortController() + yield* Effect.addFinalizer(() => Effect.sync(() => controller.abort())) + const events = yield* call(() => sdk.event.subscribe(undefined, { signal: controller.signal })) + yield* Effect.addFinalizer(() => + call(async () => void (await events.stream.return?.(undefined))).pipe(Effect.ignore), + ) + + const ready = yield* Deferred.make() + const received = yield* Deferred.make() + + yield* call(async () => { + for await (const event of events.stream) { + const payload = record(event).payload ?? event + const type = record(payload).type + if (type === "server.connected") { + Deferred.doneUnsafe(ready, Effect.void) + continue + } + if (type === MessageV2.Event.PartUpdated.type) { + Deferred.doneUnsafe(received, Effect.succeed(payload)) + return + } + } + }).pipe(Effect.forkScoped) + + yield* awaitWithTimeout(Deferred.await(ready), "timed out waiting for /event server.connected", "2 seconds") + + const updated = yield* capture(() => + sdk.part.update({ + sessionID, + messageID: seeded.message.id, + partID: seeded.part.id, + part: { ...seeded.part, text: "updated via sync" } as NonNullable< + Parameters[0]["part"] + >, + }), + ) + expect(updated.status).toBe(200) + + const event = yield* awaitWithTimeout( + Deferred.await(received), + "timed out waiting for message.part.updated bus payload over /event", + "5 seconds", + ) + const properties = record(record(event).properties) + expect(record(properties.part)).toMatchObject({ id: seeded.part.id, type: "text" }) + return { type: record(event).type, partType: record(properties.part).type } + }), + ), + ) + serverPathParity("matches generated SDK prompt no-reply routes", (serverPath) => withStandardProject(serverPath, ({ sdk }) => Effect.gen(function* () {