From ddf18a7f9cc46fff060bce067e401be80ee5dd30 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Wed, 20 May 2026 22:55:54 -0400 Subject: [PATCH] test(server): port event SSE tests to it.instance + testEffectShared (#28569) --- packages/opencode/src/bus/index.ts | 3 + packages/opencode/test/lib/effect.ts | 34 +++- .../server/httpapi-event-diagnostics.test.ts | 97 ++++------ .../test/server/httpapi-event.test.ts | 166 ++++++++---------- 4 files changed, 139 insertions(+), 161 deletions(-) diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 3dfec40135..b5f4320bda 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -5,6 +5,7 @@ import { BusEvent } from "./bus-event" import { GlobalBus } from "./global" import { InstanceState } from "@/effect/instance-state" import { makeRuntime } from "@/effect/run-service" +import { serviceUse } from "@/effect/service-use" import { Identifier } from "@/id/id" import type { InstanceContext } from "@/project/instance-context" import { InstanceRef } from "@/effect/instance-ref" @@ -56,6 +57,8 @@ export interface Interface { export class Service extends Context.Service()("@opencode/Bus") {} +export const use = serviceUse(Service) + export const layer = Layer.effect( Service, Effect.gen(function* () { diff --git a/packages/opencode/test/lib/effect.ts b/packages/opencode/test/lib/effect.ts index 6ad2838fca..f1fa5b9624 100644 --- a/packages/opencode/test/lib/effect.ts +++ b/packages/opencode/test/lib/effect.ts @@ -1,8 +1,9 @@ import { test, type TestOptions } from "bun:test" import { Cause, Duration, Effect, Exit, Layer } from "effect" -import type * as Scope from "effect/Scope" +import * as Scope from "effect/Scope" import * as TestClock from "effect/testing/TestClock" import * as TestConsole from "effect/testing/TestConsole" +import { memoMap } from "@opencode-ai/core/effect/memo-map" import type { Config } from "@/config/config" import { TestInstance, withTmpdirInstance } from "../fixture/fixture" @@ -24,7 +25,9 @@ function instanceArgs( const body = (value: Body) => Effect.suspend(() => (typeof value === "function" ? value() : value)) -const run = (value: Body, layer: Layer.Layer) => +type Runner = (value: Body, layer: Layer.Layer) => Promise + +const isolatedRun: Runner = (value, layer) => Effect.gen(function* () { const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit) if (Exit.isFailure(exit)) { @@ -35,7 +38,25 @@ const run = (value: Body, layer: Layer.Layer return yield* exit }).pipe(Effect.runPromise) -const make = (testLayer: Layer.Layer, liveLayer: Layer.Layer) => { +// Builds the test layer through the shared process-wide memoMap so cached +// services (Bus, Session, …) match Server.Default's instances. Use for tests +// that publish to an in-process HTTP server and need pub/sub identity with +// the server's handlers. +const sharedRun: Runner = (value, layer) => + Effect.gen(function* () { + const scope = yield* Scope.make() + const ctx = yield* Layer.buildWithMemoMap(layer, memoMap, scope) + const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(ctx), Effect.exit) + yield* Scope.close(scope, Exit.void) + if (Exit.isFailure(exit)) { + for (const err of Cause.prettyErrors(exit.cause)) { + yield* Effect.logError(err) + } + } + return yield* exit + }).pipe(Effect.runPromise) + +const make = (testLayer: Layer.Layer, liveLayer: Layer.Layer, run: Runner = isolatedRun) => { const effect = (name: string, value: Body, opts?: number | TestOptions) => test(name, () => run(value, testLayer), opts) @@ -110,6 +131,13 @@ export const it = make(testEnv, liveEnv) export const testEffect = (layer: Layer.Layer) => make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv)) +// Variant of `testEffect` that builds the test layer through the shared +// process-wide memoMap so services like Bus/Session resolve to the same +// instances Server.Default uses. Use when a test needs pub/sub identity with +// an in-process HTTP server — most tests should stick with `testEffect`. +export const testEffectShared = (layer: Layer.Layer) => + make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv), sharedRun) + export const awaitWithTimeout = ( self: Effect.Effect, message: string, diff --git a/packages/opencode/test/server/httpapi-event-diagnostics.test.ts b/packages/opencode/test/server/httpapi-event-diagnostics.test.ts index c3ad3e0584..66bd0bcedd 100644 --- a/packages/opencode/test/server/httpapi-event-diagnostics.test.ts +++ b/packages/opencode/test/server/httpapi-event-diagnostics.test.ts @@ -1,53 +1,52 @@ // Diagnostic suite for /event SSE delivery. // // Each test isolates ONE variable in the publisher chain while keeping the -// subscriber path constant (raw `app().request` reading the SSE body — no SDK -// consumer involvement). The pass/fail pattern across tests tells us where the -// bug lives: +// subscriber path constant (in-process HttpApi via Server.Default reading the +// SSE body). The pass/fail pattern across tests tells us where the bug lives: // -// D1 (baseline): publish via Bus.Service.use via AppRuntime — mirror of the -// existing httpapi-event.test.ts test 3. Confirms /event SSE delivery -// works for a SOME publish path. +// D1 (baseline): publish via Bus.use.publish — mirror of httpapi-event.test.ts +// test 3. Confirms /event SSE delivery works for SOME publish path. // -// D2: publish N times in quick succession via Bus.Service.use. If the bus +// D2: publish N times in quick succession via Bus.use.publish. If the bus // subscription is acquired correctly there should be no message loss. // -// D3: publish via SyncEvent.use.run via AppRuntime — exercises the same path -// the HTTP handlers use (Session.updatePart → sync.run → bus.publish) -// without the HTTP roundtrip. Tells us whether the sync path itself can -// deliver in-process. +// D3: publish via SyncEvent.use.run — exercises the same path the HTTP +// handlers use (Session.updatePart → sync.run → bus.publish) without +// the HTTP roundtrip. Tells us whether the sync path itself can deliver +// in-process. // -// D4: publish via SyncEvent.use.run from a fresh `Effect.provide` scope -// (mimicking what happens if a handler's layer was scoped per-request). +// D4: publish via SyncEvent.use.run; subscriber is an in-process Bus +// callback. Confirms pub/sub identity end-to-end without /event SSE. // -// D5: in-process Bus.Service callback subscriber AND raw /event SSE subscriber +// D5: in-process Bus callback subscriber AND raw /event SSE subscriber // receive the same publish. If both receive: no bug. If only the // callback receives: the /event handler has an acquisition race. +// +// D6: same as D5 but the callback subscriber is attached AFTER /event SSE +// subscription is established. Order-of-setup variable. import { afterEach, describe, expect } from "bun:test" -import { Deferred, Effect, Schema } from "effect" +import { Deferred, Effect, Layer, Schema } from "effect" import * as Log from "@opencode-ai/core/util/log" import { Bus } from "../../src/bus" -import { type AppServices, AppRuntime } from "../../src/effect/app-runtime" -import { InstanceRef } from "../../src/effect/instance-ref" -import { Server } from "../../src/server/server" import { Event as ServerEvent } from "../../src/server/event" +import { Server } from "../../src/server/server" import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event" import { MessageV2 } from "../../src/session/message-v2" import { MessageID, PartID, SessionID } from "../../src/session/schema" import { SyncEvent } from "../../src/sync" import { resetDatabase } from "../fixture/db" import { disposeAllInstances, TestInstance } from "../fixture/fixture" -import { it } from "../lib/effect" +import { testEffectShared } from "../lib/effect" void Log.init({ print: false }) -const EventData = Schema.Struct({ +const SseEvent = Schema.Struct({ id: Schema.optional(Schema.String), type: Schema.String, properties: Schema.Record(Schema.String, Schema.Any), }) -type SseEvent = Schema.Schema.Type +type SseEvent = Schema.Schema.Type type BusEvent = { type: string; properties: unknown } afterEach(async () => { @@ -55,30 +54,21 @@ afterEach(async () => { await resetDatabase() }) -const inApp = (eff: Effect.Effect) => - Effect.gen(function* () { - const ctx = yield* InstanceRef - if (!ctx) return yield* Effect.die("InstanceRef not provided in test scope") - return yield* Effect.promise(() => AppRuntime.runPromise(eff.pipe(Effect.provideService(InstanceRef, ctx)))) - }) +const it = testEffectShared(Layer.mergeAll(Bus.defaultLayer, SyncEvent.defaultLayer)) -const publishConnected = inApp(Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {}))) +const publishConnected = Bus.use.publish(ServerEvent.Connected, {}) const publishPartUpdated = (partID: ReturnType) => { const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) - return inApp( - SyncEvent.use.run(MessageV2.Event.PartUpdated, { - sessionID, - part: { id: partID, sessionID, messageID: MessageID.ascending(), type: "text", text: "diag" }, - time: Date.now(), - }), - ) + return SyncEvent.use.run(MessageV2.Event.PartUpdated, { + sessionID, + part: { id: partID, sessionID, messageID: MessageID.ascending(), type: "text", text: "diag" }, + time: Date.now(), + }) } const subscribeAllCallback = (handler: (event: BusEvent) => void) => - Effect.acquireRelease(inApp(Bus.Service.use((svc) => svc.subscribeAllCallback(handler))), (dispose) => - Effect.sync(() => dispose()), - ) + Effect.acquireRelease(Bus.use.subscribeAllCallback(handler), (dispose) => Effect.sync(() => dispose())) const openEventStream = (directory: string) => Effect.gen(function* () { @@ -99,7 +89,7 @@ function decodeFrame(value: Uint8Array): SseEvent[] { .split(/\n\n+/) .map((part) => part.trim()) .filter((part) => part.length > 0) - .map((part) => Schema.decodeUnknownSync(EventData)(JSON.parse(part.replace(/^data: /, "")))) + .map((part) => Schema.decodeUnknownSync(SseEvent)(JSON.parse(part.replace(/^data: /, "")))) } const readNextEvent = (reader: ReadableStreamDefaultReader) => @@ -112,7 +102,7 @@ const readNextEvent = (reader: ReadableStreamDefaultReader) => if (result.done || !result.value) return Effect.fail(new Error("event stream closed")) const frames = decodeFrame(result.value) if (frames.length === 0) return Effect.fail(new Error("empty SSE frame")) - return Effect.succeed(frames[0]) + return Effect.succeed(frames[0]!) }), ) @@ -172,7 +162,7 @@ describe("/event SSE delivery diagnostics", () => { // The critical test. If D1 passes but this fails, the bus-identity fix is // incomplete OR the sync.run publish path doesn't reach the same bus - // /event subscribes to, even within the same AppRuntime. + // /event subscribes to, even when both share the memoMap. it.instance( "D3: delivers a SyncEvent published via SyncEvent.use.run after server.connected", () => @@ -198,7 +188,7 @@ describe("/event SSE delivery diagnostics", () => { // D4: ensure the publish reaches an in-process Bus subscriber too. Confirms // pub/sub identity end-to-end without involving /event SSE. it.instance( - "D4: SyncEvent.use.run publish reaches an in-process Bus.Service.use callback", + "D4: SyncEvent.use.run publish reaches an in-process Bus callback", () => Effect.gen(function* () { const received = yield* Deferred.make() @@ -255,29 +245,6 @@ describe("/event SSE delivery diagnostics", () => { { git: true, config: { formatter: false, lsp: false } }, ) - // D7: like D5 but the "second subscriber" is a NO-OP AppRuntime.runPromise - // call (no PubSub.subscribe). If D7 passes, the specific subscribeAllCallback - // is what breaks SSE — not arbitrary AppRuntime usage. If D7 fails, anything - // running through AppRuntime concurrently with /event SSE breaks delivery. - it.instance( - "D7: SSE receives sync.run publish even with concurrent no-op AppRuntime activity", - () => - Effect.gen(function* () { - const { directory } = yield* TestInstance - yield* inApp(Effect.void) - - const reader = yield* openEventStream(directory) - expect((yield* readNextEvent(reader)).type).toBe("server.connected") - - const partID = PartID.ascending() - yield* publishPartUpdated(partID) - - const collected = yield* collectUntilEvent(reader, isPartUpdated) - expect(collected.find(isPartUpdated)).toBeDefined() - }), - { git: true, config: { formatter: false, lsp: false } }, - ) - // D6: same as D5 but the callback subscriber is attached AFTER /event SSE // subscription is established. If D5 fails and D6 passes, the order of // subscriber setup is the determining factor. diff --git a/packages/opencode/test/server/httpapi-event.test.ts b/packages/opencode/test/server/httpapi-event.test.ts index fcf7b59ff3..44d421ea0a 100644 --- a/packages/opencode/test/server/httpapi-event.test.ts +++ b/packages/opencode/test/server/httpapi-event.test.ts @@ -1,121 +1,101 @@ -import { afterEach, describe, expect, test } from "bun:test" +import { afterEach, describe, expect } from "bun:test" +import { Effect, Schema } from "effect" +import * as Log from "@opencode-ai/core/util/log" import { Bus } from "../../src/bus" -import { AppRuntime } from "../../src/effect/app-runtime" -import { InstanceRef } from "../../src/effect/instance-ref" +import { Event as ServerEvent } from "../../src/server/event" import { Server } from "../../src/server/server" import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event" -import { Event as ServerEvent } from "../../src/server/event" -import * as Log from "@opencode-ai/core/util/log" -import { Effect, Schema } from "effect" import { resetDatabase } from "../fixture/db" -import { disposeAllInstances, reloadTestInstance, tmpdir } from "../fixture/fixture" +import { disposeAllInstances, TestInstance } from "../fixture/fixture" +import { testEffectShared } from "../lib/effect" void Log.init({ print: false }) -function app() { - return Server.Default().app -} - const EventData = Schema.Struct({ id: Schema.optional(Schema.String), type: Schema.String, properties: Schema.Record(Schema.String, Schema.Any), }) -async function readChunk(reader: ReadableStreamDefaultReader) { - let timeout: ReturnType | undefined - try { - return await Promise.race([ - reader.read(), - new Promise((_, reject) => { - timeout = setTimeout(() => reject(new Error("timed out waiting for event")), 5_000) +const readEvent = (reader: ReadableStreamDefaultReader) => + Effect.gen(function* () { + const result = yield* Effect.promise(() => reader.read()).pipe( + Effect.timeoutOrElse({ + duration: "5 seconds", + orElse: () => Effect.fail(new Error("timed out waiting for event")), }), - ]) - } finally { - if (timeout) clearTimeout(timeout) - } -} + ) + if (result.done || !result.value) return yield* Effect.fail(new Error("event stream closed")) + return Schema.decodeUnknownSync(EventData)( + JSON.parse(new TextDecoder().decode(result.value).replace(/^data: /, "")), + ) + }) -async function readFirstEvent(response: Response) { - if (!response.body) throw new Error("missing response body") - const reader = response.body.getReader() - try { - return await readEvent(reader) - } finally { - await reader.cancel() - } -} - -async function readEvent(reader: ReadableStreamDefaultReader) { - const result = await readChunk(reader) - if (result.done || !result.value) throw new Error("event stream closed") - return Schema.decodeUnknownSync(EventData)(JSON.parse(new TextDecoder().decode(result.value).replace(/^data: /, ""))) -} - -async function readStatusWithin(reader: ReadableStreamDefaultReader, delay: number) { - let timeout: ReturnType | undefined - try { - return await Promise.race([ - reader.read().then((result) => (result.done ? "closed" : "event")), - new Promise<"open">((resolve) => { - timeout = setTimeout(() => resolve("open"), delay) - }), - ]) - } finally { - if (timeout) clearTimeout(timeout) - } -} +const openEventStream = (directory: string) => + Effect.gen(function* () { + const response = yield* Effect.promise(async () => + Server.Default().app.request(EventPaths.event, { headers: { "x-opencode-directory": directory } }), + ) + if (!response.body) return yield* Effect.die("missing SSE response body") + const reader = response.body.getReader() + yield* Effect.addFinalizer(() => Effect.promise(() => reader.cancel().catch(() => undefined))) + return { response, reader } + }) afterEach(async () => { await disposeAllInstances() await resetDatabase() }) +const it = testEffectShared(Bus.defaultLayer) + describe("event HttpApi", () => { - test("serves event stream", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) + it.instance( + "serves event stream", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + const { response, reader } = yield* openEventStream(directory) - expect(response.status).toBe(200) - expect(response.headers.get("content-type")).toContain("text/event-stream") - expect(response.headers.get("cache-control")).toBe("no-cache, no-transform") - expect(response.headers.get("x-accel-buffering")).toBe("no") - expect(response.headers.get("x-content-type-options")).toBe("nosniff") - expect(await readFirstEvent(response)).toMatchObject({ type: "server.connected", properties: {} }) - }) + expect(response.status).toBe(200) + expect(response.headers.get("content-type")).toContain("text/event-stream") + expect(response.headers.get("cache-control")).toBe("no-cache, no-transform") + expect(response.headers.get("x-accel-buffering")).toBe("no") + expect(response.headers.get("x-content-type-options")).toBe("nosniff") + expect(yield* readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} }) + }), + { git: true, config: { formatter: false, lsp: false } }, + ) - test("keeps the event stream open after the initial event", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) - if (!response.body) throw new Error("missing response body") + it.instance( + "keeps the event stream open after the initial event", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + const { reader } = yield* openEventStream(directory) + expect(yield* readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} }) - const reader = response.body.getReader() - try { - expect(await readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} }) - expect(await readStatusWithin(reader, 250)).toBe("open") - } finally { - await reader.cancel() - } - }) + // If no second event arrives within 250ms, the stream is still open. + const status = yield* Effect.promise(() => reader.read()).pipe( + Effect.map((result) => (result.done ? ("closed" as const) : ("event" as const))), + Effect.timeoutOrElse({ duration: "250 millis", orElse: () => Effect.succeed("open" as const) }), + ) + expect(status).toBe("open") + }), + { git: true, config: { formatter: false, lsp: false } }, + ) - test("delivers instance bus events after the initial event", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) - if (!response.body) throw new Error("missing response body") + it.instance( + "delivers instance bus events after the initial event", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + const { reader } = yield* openEventStream(directory) + expect(yield* readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} }) - const reader = response.body.getReader() - try { - expect(await readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} }) - - const next = readEvent(reader) - const ctx = await reloadTestInstance({ directory: tmp.path }) - await AppRuntime.runPromise( - Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe(Effect.provideService(InstanceRef, ctx)), - ) - - expect(await next).toMatchObject({ type: "server.connected", properties: {} }) - } finally { - await reader.cancel() - } - }) + yield* Bus.use.publish(ServerEvent.Connected, {}) + expect(yield* readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} }) + }), + { git: true, config: { formatter: false, lsp: false } }, + ) })