fix(httpapi): provide instance context to event stream subscription

The /event SSE handler called bus.subscribeAll() which evaluates its
inner Effect (InstanceState.get → InstanceRef/Instance.current lookup)
inside the body-stream consumer fiber. That fiber does not carry the
request handler's ALS/Effect context, so the lookup failed and the
stream halted right after server.connected — no message deltas,
permission asks, or heartbeats could reach clients.

Capture InstanceState.context and InstanceState.workspaceID at handler
time and provide them to the subscription stream via Stream.provideService.

Fixes #27391.

Co-authored-by: James Long <longster@gmail.com>
This commit is contained in:
Aiden Cline
2026-05-13 20:16:20 -05:00
parent 3fc7486d15
commit 89c51a86bd
2 changed files with 42 additions and 3 deletions

View File

@@ -1,4 +1,8 @@
import { Bus } from "@/bus"
import type { WorkspaceID } from "@/control-plane/schema"
import { InstanceRef, WorkspaceRef } from "@/effect/instance-ref"
import { InstanceState } from "@/effect/instance-state"
import type { InstanceContext } from "@/project/instance"
import * as Log from "@opencode-ai/core/util/log"
import { Effect, Schema } from "effect"
import * as Stream from "effect/Stream"
@@ -39,8 +43,12 @@ function eventData(data: unknown): Sse.Event {
}
}
function eventResponse(bus: Bus.Interface) {
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
function eventResponse(bus: Bus.Interface, refs: { instance: InstanceContext; workspace?: WorkspaceID }) {
const events = bus.subscribeAll().pipe(
Stream.provideService(InstanceRef, refs.instance),
Stream.provideService(WorkspaceRef, refs.workspace),
Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type),
)
const heartbeat = Stream.tick("10 seconds").pipe(
Stream.drop(1),
Stream.map(() => ({ id: Bus.createID(), type: "server.heartbeat", properties: {} })),
@@ -72,7 +80,10 @@ export const eventHandlers = HttpApiBuilder.group(EventApi, "event", (handlers)
return handlers.handleRaw(
"subscribe",
Effect.fn("EventHttpApi.subscribe")(function* () {
return eventResponse(bus)
return eventResponse(bus, {
instance: yield* InstanceState.context,
workspace: yield* InstanceState.workspaceID,
})
}),
)
}),

View File

@@ -31,6 +31,18 @@ async function readFirstEvent(response: Response) {
}
}
async function readEvent(reader: ReadableStreamDefaultReader<Uint8Array>) {
const result = await Promise.race([
reader.read(),
new Promise<never>((_, reject) => setTimeout(() => reject(new Error("timed out waiting for event")), 5_000)),
])
return JSON.parse(new TextDecoder().decode(result.value).replace(/^data: /, "")) as {
id?: string
type: string
properties: Record<string, unknown>
}
}
afterEach(async () => {
await disposeAllInstances()
await resetDatabase()
@@ -56,4 +68,20 @@ describe("event HttpApi", () => {
expect(await readFirstEvent(response)).toMatchObject({ type: "server.connected", properties: {} })
})
test("keeps the event stream open after the initial event", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
if (!response.body) throw new Error("missing response body")
const reader = response.body.getReader()
expect(await readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} })
const next = await Promise.race([
reader.read().then((result) => (result.done ? "closed" : "event")),
new Promise<"open">((resolve) => setTimeout(() => resolve("open"), 250)),
])
await reader.cancel()
expect(next).toBe("open")
})
})