diff --git a/packages/opencode/test/server/httpapi-event-diagnostics.test.ts b/packages/opencode/test/server/httpapi-event-diagnostics.test.ts index eb1751a350..17297c356c 100644 --- a/packages/opencode/test/server/httpapi-event-diagnostics.test.ts +++ b/packages/opencode/test/server/httpapi-event-diagnostics.test.ts @@ -23,27 +23,24 @@ // D5: in-process Bus.Service callback subscriber AND raw /event SSE subscriber // receive the same publish. If both receive: no bug. If only the // callback receives: the /event handler has an acquisition race. -import { afterEach, describe, expect, test } from "bun:test" +import { afterEach, describe, expect } from "bun:test" +import { Deferred, Effect, Schema } from "effect" +import * as Log from "@opencode-ai/core/util/log" import { Bus } from "../../src/bus" -import { AppRuntime } from "../../src/effect/app-runtime" +import { type AppServices, AppRuntime } from "../../src/effect/app-runtime" import { InstanceRef } from "../../src/effect/instance-ref" import { Server } from "../../src/server/server" -import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event" import { Event as ServerEvent } from "../../src/server/event" -import { SyncEvent } from "../../src/sync" +import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event" import { MessageV2 } from "../../src/session/message-v2" import { MessageID, PartID, SessionID } from "../../src/session/schema" -import * as Log from "@opencode-ai/core/util/log" -import { Effect, Schema } from "effect" +import { SyncEvent } from "../../src/sync" import { resetDatabase } from "../fixture/db" -import { disposeAllInstances, reloadTestInstance, tmpdir } from "../fixture/fixture" +import { disposeAllInstances, TestInstance } from "../fixture/fixture" +import { it } from "../lib/effect" void Log.init({ print: false }) -function app() { - return Server.Default().app -} - const EventData = Schema.Struct({ id: Schema.optional(Schema.String), type: Schema.String, @@ -51,170 +48,153 @@ const EventData = Schema.Struct({ }) type SseEvent = Schema.Schema.Type - -async function readChunk(reader: ReadableStreamDefaultReader, timeoutMs = 3_000) { - let timeout: ReturnType | undefined - try { - return await Promise.race([ - reader.read(), - new Promise((_, reject) => { - timeout = setTimeout(() => reject(new Error(`timed out after ${timeoutMs}ms`)), timeoutMs) - }), - ]) - } finally { - if (timeout) clearTimeout(timeout) - } -} - -const textDecoder = new TextDecoder() - -function decodeFrame(value: Uint8Array): SseEvent[] { - // SSE frames are separated by blank lines and each starts with "data: ". - // For our happy-path tests one chunk == one frame, but be defensive. - const text = textDecoder.decode(value) - return text - .split(/\n\n+/) - .map((part) => part.trim()) - .filter((part) => part.length > 0) - .map((part) => { - const payload = part.replace(/^data: /, "") - return Schema.decodeUnknownSync(EventData)(JSON.parse(payload)) - }) -} - -async function readNextEvent(reader: ReadableStreamDefaultReader, timeoutMs = 3_000): Promise { - const result = await readChunk(reader, timeoutMs) - if (result.done || !result.value) throw new Error("event stream closed") - const frames = decodeFrame(result.value) - if (frames.length === 0) throw new Error("empty SSE frame") - return frames[0] -} - -async function collectUntil( - reader: ReadableStreamDefaultReader, - predicate: (event: SseEvent) => boolean, - timeoutMs = 3_000, -): Promise { - const events: SseEvent[] = [] - const deadline = Date.now() + timeoutMs - while (Date.now() < deadline) { - const remaining = deadline - Date.now() - const result = await readChunk(reader, remaining).catch((cause) => { - throw new Error(`collectUntil timed out after ${events.length} events: ${cause}`) - }) - if (result.done || !result.value) throw new Error("event stream closed mid-collect") - for (const event of decodeFrame(result.value)) { - events.push(event) - if (predicate(event)) return events - } - } - throw new Error(`collectUntil deadline exceeded; collected ${events.length}: ${JSON.stringify(events)}`) -} +type BusEvent = { type: string; properties: unknown } afterEach(async () => { await disposeAllInstances() await resetDatabase() }) +const inApp = (eff: Effect.Effect) => + Effect.flatMap(InstanceRef, (ctx) => + ctx + ? Effect.promise(() => AppRuntime.runPromise(eff.pipe(Effect.provideService(InstanceRef, ctx)))) + : Effect.die("InstanceRef not provided in test scope"), + ) + +const publishConnected = inApp(Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {}))) + +const publishPartUpdated = (partID: ReturnType) => { + const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) + return inApp( + SyncEvent.use.run(MessageV2.Event.PartUpdated, { + sessionID, + part: { id: partID, sessionID, messageID: MessageID.ascending(), type: "text", text: "diag" }, + time: Date.now(), + }), + ) +} + +const subscribeAllCallback = (handler: (event: BusEvent) => void) => + Effect.acquireRelease( + inApp(Bus.Service.use((svc) => svc.subscribeAllCallback(handler))), + (dispose) => Effect.sync(dispose), + ) + +const openEventStream = (directory: string) => + Effect.gen(function* () { + const response = yield* Effect.promise(async () => + Server.Default().app.request(EventPaths.event, { headers: { "x-opencode-directory": directory } }), + ) + if (!response.body) return yield* Effect.die("missing SSE response body") + const reader = response.body.getReader() + yield* Effect.addFinalizer(() => Effect.promise(() => reader.cancel().catch(() => undefined))) + return reader + }) + +const decoder = new TextDecoder() + +function decodeFrame(value: Uint8Array): SseEvent[] { + return decoder + .decode(value) + .split(/\n\n+/) + .map((part) => part.trim()) + .filter((part) => part.length > 0) + .map((part) => Schema.decodeUnknownSync(EventData)(JSON.parse(part.replace(/^data: /, "")))) +} + +const readNextEvent = (reader: ReadableStreamDefaultReader) => + Effect.promise(() => reader.read()).pipe( + Effect.timeoutOrElse({ + duration: "3 seconds", + orElse: () => Effect.fail(new Error("timed out reading SSE chunk")), + }), + Effect.flatMap((result) => { + if (result.done || !result.value) return Effect.fail(new Error("event stream closed")) + const frames = decodeFrame(result.value) + if (frames.length === 0) return Effect.fail(new Error("empty SSE frame")) + return Effect.succeed(frames[0]!) + }), + ) + +const collectUntilEvent = ( + reader: ReadableStreamDefaultReader, + predicate: (event: SseEvent) => boolean, +) => + Effect.gen(function* () { + const events: SseEvent[] = [] + while (true) { + const event = yield* readNextEvent(reader) + events.push(event) + if (predicate(event)) return events + } + }).pipe( + Effect.timeoutOrElse({ + duration: "4 seconds", + orElse: () => Effect.fail(new Error("collectUntil deadline exceeded")), + }), + ) + +const isPartUpdated = (event: { type: string }) => event.type === MessageV2.Event.PartUpdated.type + describe("/event SSE delivery diagnostics", () => { // Sanity: baseline same as httpapi-event.test.ts test 3 (already known to pass) // but explicit about timing — publish happens with NO wait after reading // server.connected. If this fails we have a deeper problem than just sync. - test("D1: delivers a single bus event published right after server.connected", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) - if (!response.body) throw new Error("missing response body") - const reader = response.body.getReader() - try { - const first = await readNextEvent(reader) - expect(first.type).toBe("server.connected") + it.instance( + "D1: delivers a single bus event published right after server.connected", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + const reader = yield* openEventStream(directory) - const ctx = await reloadTestInstance({ directory: tmp.path }) - // NO wait — publish immediately - await AppRuntime.runPromise( - Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe(Effect.provideService(InstanceRef, ctx)), - ) - - const next = await readNextEvent(reader) - expect(next.type).toBe("server.connected") // ServerEvent.Connected.type === "server.connected" - } finally { - await reader.cancel() - } - }) + expect((yield* readNextEvent(reader)).type).toBe("server.connected") + yield* publishConnected + expect((yield* readNextEvent(reader)).type).toBe("server.connected") + }), + { git: true, config: { formatter: false, lsp: false } }, + ) // If D1 passes but D2 fails, we have a queue-drain or partial-loss issue. - test("D2: delivers all N bus events published in rapid succession", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) - if (!response.body) throw new Error("missing response body") - const reader = response.body.getReader() - try { - const first = await readNextEvent(reader) - expect(first.type).toBe("server.connected") + it.instance( + "D2: delivers all N bus events published in rapid succession", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + const reader = yield* openEventStream(directory) + expect((yield* readNextEvent(reader)).type).toBe("server.connected") - const ctx = await reloadTestInstance({ directory: tmp.path }) - const N = 5 - for (let i = 0; i < N; i++) { - await AppRuntime.runPromise( - Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe( - Effect.provideService(InstanceRef, ctx), - ), - ) - } + const N = 5 + yield* Effect.replicateEffect(publishConnected, N) - const received: SseEvent[] = [] - for (let i = 0; i < N; i++) { - received.push(await readNextEvent(reader)) - } - expect(received).toHaveLength(N) - for (const event of received) expect(event.type).toBe("server.connected") - } finally { - await reader.cancel() - } - }) + const received = yield* Effect.replicateEffect(readNextEvent(reader), N) + expect(received).toHaveLength(N) + for (const event of received) expect(event.type).toBe("server.connected") + }), + { git: true, config: { formatter: false, lsp: false } }, + ) // The critical test. If D1 passes but this fails, the bus-identity fix is // incomplete OR the sync.run publish path doesn't reach the same bus // /event subscribes to, even within the same AppRuntime. - test("D3: delivers a SyncEvent published via SyncEvent.use.run after server.connected", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) - if (!response.body) throw new Error("missing response body") - const reader = response.body.getReader() - try { - const first = await readNextEvent(reader) - expect(first.type).toBe("server.connected") + it.instance( + "D3: delivers a SyncEvent published via SyncEvent.use.run after server.connected", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + const reader = yield* openEventStream(directory) + expect((yield* readNextEvent(reader)).type).toBe("server.connected") - const ctx = await reloadTestInstance({ directory: tmp.path }) - const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) - const messageID = MessageID.ascending() - const partID = PartID.ascending() - const part: MessageV2.Part = { - id: partID, - sessionID, - messageID, - type: "text", - text: "diag", - } + const partID = PartID.ascending() + yield* publishPartUpdated(partID) - await AppRuntime.runPromise( - SyncEvent.use - .run(MessageV2.Event.PartUpdated, { - sessionID, - part: structuredClone(part) as MessageV2.Part, - time: Date.now(), - }) - .pipe(Effect.provideService(InstanceRef, ctx)), - ) - - const collected = await collectUntil(reader, (event) => event.type === MessageV2.Event.PartUpdated.type, 4_000) - const updated = collected.find((event) => event.type === MessageV2.Event.PartUpdated.type) - expect(updated).toBeDefined() - expect((updated as any).properties.part.id).toBe(partID) - } finally { - await reader.cancel() - } - }) + const collected = yield* collectUntilEvent(reader, isPartUpdated) + const updated = collected.find(isPartUpdated) + expect(updated).toBeDefined() + expect((updated as SseEvent).properties.part.id).toBe(partID) + }), + { git: true, config: { formatter: false, lsp: false } }, + ) // If D3 passes but D5 (the SDK E2E in httpapi-sdk.test.ts) fails, then the // bug is specifically in the cross-request / cross-fiber HTTP path, not in @@ -222,222 +202,116 @@ describe("/event SSE delivery diagnostics", () => { // // D4: ensure the publish reaches an in-process Bus subscriber too. Confirms // pub/sub identity end-to-end without involving /event SSE. - test("D4: SyncEvent.use.run publish reaches an in-process Bus.Service.use callback", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const ctx = await reloadTestInstance({ directory: tmp.path }) + it.instance( + "D4: SyncEvent.use.run publish reaches an in-process Bus.Service.use callback", + () => + Effect.gen(function* () { + const received = yield* Deferred.make() + yield* subscribeAllCallback((event) => { + if (isPartUpdated(event)) Deferred.doneUnsafe(received, Effect.succeed(event)) + }) - let resolveReceived: (event: { id: string; type: string; properties: unknown }) => void - const received = new Promise<{ id: string; type: string; properties: unknown }>( - (resolve) => (resolveReceived = resolve as typeof resolveReceived), - ) + const partID = PartID.ascending() + yield* publishPartUpdated(partID) - const dispose = await AppRuntime.runPromise( - Bus.Service.use((svc) => - svc.subscribeAllCallback((event) => { - if (event.type === MessageV2.Event.PartUpdated.type) resolveReceived(event) - }), - ).pipe(Effect.provideService(InstanceRef, ctx)), - ) - - try { - const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) - const messageID = MessageID.ascending() - const partID = PartID.ascending() - const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d4" } - - await AppRuntime.runPromise( - SyncEvent.use - .run(MessageV2.Event.PartUpdated, { - sessionID, - part: structuredClone(part) as MessageV2.Part, - time: Date.now(), - }) - .pipe(Effect.provideService(InstanceRef, ctx)), - ) - - const event = await Promise.race([ - received, - new Promise((_, reject) => setTimeout(() => reject(new Error("D4 timed out")), 3_000)), - ]) - expect(event.type).toBe(MessageV2.Event.PartUpdated.type) - expect((event.properties as any).part.id).toBe(partID) - } finally { - dispose() - } - }) + const event = yield* Deferred.await(received).pipe( + Effect.timeoutOrElse({ + duration: "3 seconds", + orElse: () => Effect.fail(new Error("D4 timed out waiting for callback")), + }), + ) + expect(event.type).toBe(MessageV2.Event.PartUpdated.type) + expect((event.properties as { part: { id: string } }).part.id).toBe(partID) + }), + { git: true, config: { formatter: false, lsp: false } }, + ) // D5: BOTH subscribers attached simultaneously. Trigger ONE publish via // SyncEvent.use.run. Both subscribers should receive it. If only one does // we know exactly which side of the chain is failing. - test("D5: same SyncEvent.use.run publish reaches BOTH /event SSE and in-process callback", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const ctx = await reloadTestInstance({ directory: tmp.path }) + it.instance( + "D5: same SyncEvent.use.run publish reaches BOTH /event SSE and in-process callback", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + const callbackReceived = yield* Deferred.make() + yield* subscribeAllCallback((event) => { + if (isPartUpdated(event)) Deferred.doneUnsafe(callbackReceived, Effect.succeed(event)) + }) + const reader = yield* openEventStream(directory) + expect((yield* readNextEvent(reader)).type).toBe("server.connected") - // In-process callback subscriber - let resolveCallback: (event: { type: string; properties: unknown }) => void - const callbackReceived = new Promise<{ type: string; properties: unknown }>( - (resolve) => (resolveCallback = resolve as typeof resolveCallback), - ) - const dispose = await AppRuntime.runPromise( - Bus.Service.use((svc) => - svc.subscribeAllCallback((event) => { - if (event.type === MessageV2.Event.PartUpdated.type) resolveCallback(event) - }), - ).pipe(Effect.provideService(InstanceRef, ctx)), - ) + const partID = PartID.ascending() + yield* publishPartUpdated(partID) - // SSE subscriber via raw HTTP - const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) - if (!response.body) throw new Error("missing response body") - const reader = response.body.getReader() + const sseSaw = yield* collectUntilEvent(reader, isPartUpdated).pipe( + Effect.map((events) => events.some(isPartUpdated)), + Effect.catch(() => Effect.succeed(false)), + ) + const callbackSaw = yield* Deferred.await(callbackReceived).pipe( + Effect.timeoutOrElse({ duration: "1 second", orElse: () => Effect.succeed(undefined) }), + Effect.map((event) => event !== undefined), + ) - try { - const first = await readNextEvent(reader) - expect(first.type).toBe("server.connected") - - const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) - const messageID = MessageID.ascending() - const partID = PartID.ascending() - const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d5" } - - await AppRuntime.runPromise( - SyncEvent.use - .run(MessageV2.Event.PartUpdated, { - sessionID, - part: structuredClone(part) as MessageV2.Part, - time: Date.now(), - }) - .pipe(Effect.provideService(InstanceRef, ctx)), - ) - - const sseCollected = await collectUntil( - reader, - (event) => event.type === MessageV2.Event.PartUpdated.type, - 4_000, - ).catch((err) => err as Error) - const callbackResult = await Promise.race([ - callbackReceived, - new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1_000)), - ]) - - const sseSaw = - Array.isArray(sseCollected) && sseCollected.some((event) => event.type === MessageV2.Event.PartUpdated.type) - const callbackSaw = callbackResult !== "timeout" - - // Both should see it. The reason we use a single assert with the boolean - // pair is so the test failure message tells us exactly which side broke. - expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true }) - } finally { - await reader.cancel() - dispose() - } - }) + // Single assert with the boolean pair so the failure message tells us + // exactly which side broke. + expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true }) + }), + { git: true, config: { formatter: false, lsp: false } }, + ) // D7: like D5 but the "second subscriber" is a NO-OP AppRuntime.runPromise // call (no PubSub.subscribe). If D7 passes, the specific subscribeAllCallback // is what breaks SSE — not arbitrary AppRuntime usage. If D7 fails, anything // running through AppRuntime concurrently with /event SSE breaks delivery. - test("D7: SSE receives sync.run publish even with concurrent no-op AppRuntime activity", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const ctx = await reloadTestInstance({ directory: tmp.path }) + it.instance( + "D7: SSE receives sync.run publish even with concurrent no-op AppRuntime activity", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + yield* inApp(Effect.void) - // No-op: just touches the runtime, no bus interaction - await AppRuntime.runPromise(Effect.void) + const reader = yield* openEventStream(directory) + expect((yield* readNextEvent(reader)).type).toBe("server.connected") - const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) - if (!response.body) throw new Error("missing response body") - const reader = response.body.getReader() - try { - const first = await readNextEvent(reader) - expect(first.type).toBe("server.connected") + const partID = PartID.ascending() + yield* publishPartUpdated(partID) - const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) - const messageID = MessageID.ascending() - const partID = PartID.ascending() - const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d7" } - - await AppRuntime.runPromise( - SyncEvent.use - .run(MessageV2.Event.PartUpdated, { - sessionID, - part: structuredClone(part) as MessageV2.Part, - time: Date.now(), - }) - .pipe(Effect.provideService(InstanceRef, ctx)), - ) - - const collected = await collectUntil(reader, (event) => event.type === MessageV2.Event.PartUpdated.type, 4_000) - const updated = collected.find((event) => event.type === MessageV2.Event.PartUpdated.type) - expect(updated).toBeDefined() - } finally { - await reader.cancel() - } - }) + const collected = yield* collectUntilEvent(reader, isPartUpdated) + expect(collected.find(isPartUpdated)).toBeDefined() + }), + { git: true, config: { formatter: false, lsp: false } }, + ) // D6: same as D5 but the callback subscriber is attached AFTER /event SSE // subscription is established. If D5 fails and D6 passes, the order of // subscriber setup is the determining factor. - test("D6: /event SSE receives sync.run publish when callback is attached AFTER /event opens", async () => { - await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } }) - const ctx = await reloadTestInstance({ directory: tmp.path }) + it.instance( + "D6: /event SSE receives sync.run publish when callback is attached AFTER /event opens", + () => + Effect.gen(function* () { + const { directory } = yield* TestInstance + const reader = yield* openEventStream(directory) + expect((yield* readNextEvent(reader)).type).toBe("server.connected") - // Open SSE FIRST - const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } }) - if (!response.body) throw new Error("missing response body") - const reader = response.body.getReader() + const callbackReceived = yield* Deferred.make() + yield* subscribeAllCallback((event) => { + if (isPartUpdated(event)) Deferred.doneUnsafe(callbackReceived, Effect.succeed(event)) + }) - try { - const first = await readNextEvent(reader) - expect(first.type).toBe("server.connected") - - // THEN attach callback subscriber - let resolveCallback: (event: { type: string; properties: unknown }) => void - const callbackReceived = new Promise<{ type: string; properties: unknown }>( - (resolve) => (resolveCallback = resolve as typeof resolveCallback), - ) - const dispose = await AppRuntime.runPromise( - Bus.Service.use((svc) => - svc.subscribeAllCallback((event) => { - if (event.type === MessageV2.Event.PartUpdated.type) resolveCallback(event) - }), - ).pipe(Effect.provideService(InstanceRef, ctx)), - ) - - try { - const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`) - const messageID = MessageID.ascending() const partID = PartID.ascending() - const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d6" } + yield* publishPartUpdated(partID) - await AppRuntime.runPromise( - SyncEvent.use - .run(MessageV2.Event.PartUpdated, { - sessionID, - part: structuredClone(part) as MessageV2.Part, - time: Date.now(), - }) - .pipe(Effect.provideService(InstanceRef, ctx)), + const sseSaw = yield* collectUntilEvent(reader, isPartUpdated).pipe( + Effect.map((events) => events.some(isPartUpdated)), + Effect.catch(() => Effect.succeed(false)), + ) + const callbackSaw = yield* Deferred.await(callbackReceived).pipe( + Effect.timeoutOrElse({ duration: "1 second", orElse: () => Effect.succeed(undefined) }), + Effect.map((event) => event !== undefined), ) - - const sseCollected = await collectUntil( - reader, - (event) => event.type === MessageV2.Event.PartUpdated.type, - 4_000, - ).catch((err) => err as Error) - const callbackResult = await Promise.race([ - callbackReceived, - new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1_000)), - ]) - - const sseSaw = - Array.isArray(sseCollected) && sseCollected.some((event) => event.type === MessageV2.Event.PartUpdated.type) - const callbackSaw = callbackResult !== "timeout" expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true }) - } finally { - dispose() - } - } finally { - await reader.cancel() - } - }) + }), + { git: true, config: { formatter: false, lsp: false } }, + ) })