mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-21 11:26:39 +00:00
test(server): port event-diagnostics to Effect runner (#28520)
This commit is contained in:
@@ -23,27 +23,24 @@
|
||||
// D5: in-process Bus.Service callback subscriber AND raw /event SSE subscriber
|
||||
// receive the same publish. If both receive: no bug. If only the
|
||||
// callback receives: the /event handler has an acquisition race.
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import { afterEach, describe, expect } from "bun:test"
|
||||
import { Deferred, Effect, Schema } from "effect"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { type AppServices, AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { InstanceRef } from "../../src/effect/instance-ref"
|
||||
import { Server } from "../../src/server/server"
|
||||
import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event"
|
||||
import { Event as ServerEvent } from "../../src/server/event"
|
||||
import { SyncEvent } from "../../src/sync"
|
||||
import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { MessageID, PartID, SessionID } from "../../src/session/schema"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { Effect, Schema } from "effect"
|
||||
import { SyncEvent } from "../../src/sync"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import { disposeAllInstances, reloadTestInstance, tmpdir } from "../fixture/fixture"
|
||||
import { disposeAllInstances, TestInstance } from "../fixture/fixture"
|
||||
import { it } from "../lib/effect"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
function app() {
|
||||
return Server.Default().app
|
||||
}
|
||||
|
||||
const EventData = Schema.Struct({
|
||||
id: Schema.optional(Schema.String),
|
||||
type: Schema.String,
|
||||
@@ -51,170 +48,153 @@ const EventData = Schema.Struct({
|
||||
})
|
||||
|
||||
type SseEvent = Schema.Schema.Type<typeof EventData>
|
||||
|
||||
async function readChunk(reader: ReadableStreamDefaultReader<Uint8Array>, timeoutMs = 3_000) {
|
||||
let timeout: ReturnType<typeof setTimeout> | undefined
|
||||
try {
|
||||
return await Promise.race([
|
||||
reader.read(),
|
||||
new Promise<never>((_, reject) => {
|
||||
timeout = setTimeout(() => reject(new Error(`timed out after ${timeoutMs}ms`)), timeoutMs)
|
||||
}),
|
||||
])
|
||||
} finally {
|
||||
if (timeout) clearTimeout(timeout)
|
||||
}
|
||||
}
|
||||
|
||||
const textDecoder = new TextDecoder()
|
||||
|
||||
function decodeFrame(value: Uint8Array): SseEvent[] {
|
||||
// SSE frames are separated by blank lines and each starts with "data: ".
|
||||
// For our happy-path tests one chunk == one frame, but be defensive.
|
||||
const text = textDecoder.decode(value)
|
||||
return text
|
||||
.split(/\n\n+/)
|
||||
.map((part) => part.trim())
|
||||
.filter((part) => part.length > 0)
|
||||
.map((part) => {
|
||||
const payload = part.replace(/^data: /, "")
|
||||
return Schema.decodeUnknownSync(EventData)(JSON.parse(payload))
|
||||
})
|
||||
}
|
||||
|
||||
async function readNextEvent(reader: ReadableStreamDefaultReader<Uint8Array>, timeoutMs = 3_000): Promise<SseEvent> {
|
||||
const result = await readChunk(reader, timeoutMs)
|
||||
if (result.done || !result.value) throw new Error("event stream closed")
|
||||
const frames = decodeFrame(result.value)
|
||||
if (frames.length === 0) throw new Error("empty SSE frame")
|
||||
return frames[0]
|
||||
}
|
||||
|
||||
async function collectUntil(
|
||||
reader: ReadableStreamDefaultReader<Uint8Array>,
|
||||
predicate: (event: SseEvent) => boolean,
|
||||
timeoutMs = 3_000,
|
||||
): Promise<SseEvent[]> {
|
||||
const events: SseEvent[] = []
|
||||
const deadline = Date.now() + timeoutMs
|
||||
while (Date.now() < deadline) {
|
||||
const remaining = deadline - Date.now()
|
||||
const result = await readChunk(reader, remaining).catch((cause) => {
|
||||
throw new Error(`collectUntil timed out after ${events.length} events: ${cause}`)
|
||||
})
|
||||
if (result.done || !result.value) throw new Error("event stream closed mid-collect")
|
||||
for (const event of decodeFrame(result.value)) {
|
||||
events.push(event)
|
||||
if (predicate(event)) return events
|
||||
}
|
||||
}
|
||||
throw new Error(`collectUntil deadline exceeded; collected ${events.length}: ${JSON.stringify(events)}`)
|
||||
}
|
||||
type BusEvent = { type: string; properties: unknown }
|
||||
|
||||
afterEach(async () => {
|
||||
await disposeAllInstances()
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
const inApp = <A, E>(eff: Effect.Effect<A, E, AppServices>) =>
|
||||
Effect.flatMap(InstanceRef, (ctx) =>
|
||||
ctx
|
||||
? Effect.promise(() => AppRuntime.runPromise(eff.pipe(Effect.provideService(InstanceRef, ctx))))
|
||||
: Effect.die("InstanceRef not provided in test scope"),
|
||||
)
|
||||
|
||||
const publishConnected = inApp(Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})))
|
||||
|
||||
const publishPartUpdated = (partID: ReturnType<typeof PartID.ascending>) => {
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
return inApp(
|
||||
SyncEvent.use.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: { id: partID, sessionID, messageID: MessageID.ascending(), type: "text", text: "diag" },
|
||||
time: Date.now(),
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
const subscribeAllCallback = (handler: (event: BusEvent) => void) =>
|
||||
Effect.acquireRelease(
|
||||
inApp(Bus.Service.use((svc) => svc.subscribeAllCallback(handler))),
|
||||
(dispose) => Effect.sync(dispose),
|
||||
)
|
||||
|
||||
const openEventStream = (directory: string) =>
|
||||
Effect.gen(function* () {
|
||||
const response = yield* Effect.promise(async () =>
|
||||
Server.Default().app.request(EventPaths.event, { headers: { "x-opencode-directory": directory } }),
|
||||
)
|
||||
if (!response.body) return yield* Effect.die("missing SSE response body")
|
||||
const reader = response.body.getReader()
|
||||
yield* Effect.addFinalizer(() => Effect.promise(() => reader.cancel().catch(() => undefined)))
|
||||
return reader
|
||||
})
|
||||
|
||||
const decoder = new TextDecoder()
|
||||
|
||||
function decodeFrame(value: Uint8Array): SseEvent[] {
|
||||
return decoder
|
||||
.decode(value)
|
||||
.split(/\n\n+/)
|
||||
.map((part) => part.trim())
|
||||
.filter((part) => part.length > 0)
|
||||
.map((part) => Schema.decodeUnknownSync(EventData)(JSON.parse(part.replace(/^data: /, ""))))
|
||||
}
|
||||
|
||||
const readNextEvent = (reader: ReadableStreamDefaultReader<Uint8Array>) =>
|
||||
Effect.promise(() => reader.read()).pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration: "3 seconds",
|
||||
orElse: () => Effect.fail(new Error("timed out reading SSE chunk")),
|
||||
}),
|
||||
Effect.flatMap((result) => {
|
||||
if (result.done || !result.value) return Effect.fail(new Error("event stream closed"))
|
||||
const frames = decodeFrame(result.value)
|
||||
if (frames.length === 0) return Effect.fail(new Error("empty SSE frame"))
|
||||
return Effect.succeed(frames[0]!)
|
||||
}),
|
||||
)
|
||||
|
||||
const collectUntilEvent = (
|
||||
reader: ReadableStreamDefaultReader<Uint8Array>,
|
||||
predicate: (event: SseEvent) => boolean,
|
||||
) =>
|
||||
Effect.gen(function* () {
|
||||
const events: SseEvent[] = []
|
||||
while (true) {
|
||||
const event = yield* readNextEvent(reader)
|
||||
events.push(event)
|
||||
if (predicate(event)) return events
|
||||
}
|
||||
}).pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration: "4 seconds",
|
||||
orElse: () => Effect.fail(new Error("collectUntil deadline exceeded")),
|
||||
}),
|
||||
)
|
||||
|
||||
const isPartUpdated = (event: { type: string }) => event.type === MessageV2.Event.PartUpdated.type
|
||||
|
||||
describe("/event SSE delivery diagnostics", () => {
|
||||
// Sanity: baseline same as httpapi-event.test.ts test 3 (already known to pass)
|
||||
// but explicit about timing — publish happens with NO wait after reading
|
||||
// server.connected. If this fails we have a deeper problem than just sync.
|
||||
test("D1: delivers a single bus event published right after server.connected", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
it.instance(
|
||||
"D1: delivers a single bus event published right after server.connected",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
const reader = yield* openEventStream(directory)
|
||||
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
// NO wait — publish immediately
|
||||
await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const next = await readNextEvent(reader)
|
||||
expect(next.type).toBe("server.connected") // ServerEvent.Connected.type === "server.connected"
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
expect((yield* readNextEvent(reader)).type).toBe("server.connected")
|
||||
yield* publishConnected
|
||||
expect((yield* readNextEvent(reader)).type).toBe("server.connected")
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
// If D1 passes but D2 fails, we have a queue-drain or partial-loss issue.
|
||||
test("D2: delivers all N bus events published in rapid succession", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
it.instance(
|
||||
"D2: delivers all N bus events published in rapid succession",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
const reader = yield* openEventStream(directory)
|
||||
expect((yield* readNextEvent(reader)).type).toBe("server.connected")
|
||||
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
const N = 5
|
||||
for (let i = 0; i < N; i++) {
|
||||
await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe(
|
||||
Effect.provideService(InstanceRef, ctx),
|
||||
),
|
||||
)
|
||||
}
|
||||
const N = 5
|
||||
yield* Effect.replicateEffect(publishConnected, N)
|
||||
|
||||
const received: SseEvent[] = []
|
||||
for (let i = 0; i < N; i++) {
|
||||
received.push(await readNextEvent(reader))
|
||||
}
|
||||
expect(received).toHaveLength(N)
|
||||
for (const event of received) expect(event.type).toBe("server.connected")
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
const received = yield* Effect.replicateEffect(readNextEvent(reader), N)
|
||||
expect(received).toHaveLength(N)
|
||||
for (const event of received) expect(event.type).toBe("server.connected")
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
// The critical test. If D1 passes but this fails, the bus-identity fix is
|
||||
// incomplete OR the sync.run publish path doesn't reach the same bus
|
||||
// /event subscribes to, even within the same AppRuntime.
|
||||
test("D3: delivers a SyncEvent published via SyncEvent.use.run after server.connected", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
it.instance(
|
||||
"D3: delivers a SyncEvent published via SyncEvent.use.run after server.connected",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
const reader = yield* openEventStream(directory)
|
||||
expect((yield* readNextEvent(reader)).type).toBe("server.connected")
|
||||
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = {
|
||||
id: partID,
|
||||
sessionID,
|
||||
messageID,
|
||||
type: "text",
|
||||
text: "diag",
|
||||
}
|
||||
const partID = PartID.ascending()
|
||||
yield* publishPartUpdated(partID)
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const collected = await collectUntil(reader, (event) => event.type === MessageV2.Event.PartUpdated.type, 4_000)
|
||||
const updated = collected.find((event) => event.type === MessageV2.Event.PartUpdated.type)
|
||||
expect(updated).toBeDefined()
|
||||
expect((updated as any).properties.part.id).toBe(partID)
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
const collected = yield* collectUntilEvent(reader, isPartUpdated)
|
||||
const updated = collected.find(isPartUpdated)
|
||||
expect(updated).toBeDefined()
|
||||
expect((updated as SseEvent).properties.part.id).toBe(partID)
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
// If D3 passes but D5 (the SDK E2E in httpapi-sdk.test.ts) fails, then the
|
||||
// bug is specifically in the cross-request / cross-fiber HTTP path, not in
|
||||
@@ -222,222 +202,116 @@ describe("/event SSE delivery diagnostics", () => {
|
||||
//
|
||||
// D4: ensure the publish reaches an in-process Bus subscriber too. Confirms
|
||||
// pub/sub identity end-to-end without involving /event SSE.
|
||||
test("D4: SyncEvent.use.run publish reaches an in-process Bus.Service.use callback", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
it.instance(
|
||||
"D4: SyncEvent.use.run publish reaches an in-process Bus.Service.use callback",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const received = yield* Deferred.make<BusEvent>()
|
||||
yield* subscribeAllCallback((event) => {
|
||||
if (isPartUpdated(event)) Deferred.doneUnsafe(received, Effect.succeed(event))
|
||||
})
|
||||
|
||||
let resolveReceived: (event: { id: string; type: string; properties: unknown }) => void
|
||||
const received = new Promise<{ id: string; type: string; properties: unknown }>(
|
||||
(resolve) => (resolveReceived = resolve as typeof resolveReceived),
|
||||
)
|
||||
const partID = PartID.ascending()
|
||||
yield* publishPartUpdated(partID)
|
||||
|
||||
const dispose = await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) =>
|
||||
svc.subscribeAllCallback((event) => {
|
||||
if (event.type === MessageV2.Event.PartUpdated.type) resolveReceived(event)
|
||||
}),
|
||||
).pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
try {
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d4" }
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const event = await Promise.race([
|
||||
received,
|
||||
new Promise<never>((_, reject) => setTimeout(() => reject(new Error("D4 timed out")), 3_000)),
|
||||
])
|
||||
expect(event.type).toBe(MessageV2.Event.PartUpdated.type)
|
||||
expect((event.properties as any).part.id).toBe(partID)
|
||||
} finally {
|
||||
dispose()
|
||||
}
|
||||
})
|
||||
const event = yield* Deferred.await(received).pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration: "3 seconds",
|
||||
orElse: () => Effect.fail(new Error("D4 timed out waiting for callback")),
|
||||
}),
|
||||
)
|
||||
expect(event.type).toBe(MessageV2.Event.PartUpdated.type)
|
||||
expect((event.properties as { part: { id: string } }).part.id).toBe(partID)
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
// D5: BOTH subscribers attached simultaneously. Trigger ONE publish via
|
||||
// SyncEvent.use.run. Both subscribers should receive it. If only one does
|
||||
// we know exactly which side of the chain is failing.
|
||||
test("D5: same SyncEvent.use.run publish reaches BOTH /event SSE and in-process callback", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
it.instance(
|
||||
"D5: same SyncEvent.use.run publish reaches BOTH /event SSE and in-process callback",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
const callbackReceived = yield* Deferred.make<BusEvent>()
|
||||
yield* subscribeAllCallback((event) => {
|
||||
if (isPartUpdated(event)) Deferred.doneUnsafe(callbackReceived, Effect.succeed(event))
|
||||
})
|
||||
const reader = yield* openEventStream(directory)
|
||||
expect((yield* readNextEvent(reader)).type).toBe("server.connected")
|
||||
|
||||
// In-process callback subscriber
|
||||
let resolveCallback: (event: { type: string; properties: unknown }) => void
|
||||
const callbackReceived = new Promise<{ type: string; properties: unknown }>(
|
||||
(resolve) => (resolveCallback = resolve as typeof resolveCallback),
|
||||
)
|
||||
const dispose = await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) =>
|
||||
svc.subscribeAllCallback((event) => {
|
||||
if (event.type === MessageV2.Event.PartUpdated.type) resolveCallback(event)
|
||||
}),
|
||||
).pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
const partID = PartID.ascending()
|
||||
yield* publishPartUpdated(partID)
|
||||
|
||||
// SSE subscriber via raw HTTP
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
const sseSaw = yield* collectUntilEvent(reader, isPartUpdated).pipe(
|
||||
Effect.map((events) => events.some(isPartUpdated)),
|
||||
Effect.catch(() => Effect.succeed(false)),
|
||||
)
|
||||
const callbackSaw = yield* Deferred.await(callbackReceived).pipe(
|
||||
Effect.timeoutOrElse({ duration: "1 second", orElse: () => Effect.succeed(undefined) }),
|
||||
Effect.map((event) => event !== undefined),
|
||||
)
|
||||
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d5" }
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const sseCollected = await collectUntil(
|
||||
reader,
|
||||
(event) => event.type === MessageV2.Event.PartUpdated.type,
|
||||
4_000,
|
||||
).catch((err) => err as Error)
|
||||
const callbackResult = await Promise.race([
|
||||
callbackReceived,
|
||||
new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1_000)),
|
||||
])
|
||||
|
||||
const sseSaw =
|
||||
Array.isArray(sseCollected) && sseCollected.some((event) => event.type === MessageV2.Event.PartUpdated.type)
|
||||
const callbackSaw = callbackResult !== "timeout"
|
||||
|
||||
// Both should see it. The reason we use a single assert with the boolean
|
||||
// pair is so the test failure message tells us exactly which side broke.
|
||||
expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true })
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
dispose()
|
||||
}
|
||||
})
|
||||
// Single assert with the boolean pair so the failure message tells us
|
||||
// exactly which side broke.
|
||||
expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true })
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
// D7: like D5 but the "second subscriber" is a NO-OP AppRuntime.runPromise
|
||||
// call (no PubSub.subscribe). If D7 passes, the specific subscribeAllCallback
|
||||
// is what breaks SSE — not arbitrary AppRuntime usage. If D7 fails, anything
|
||||
// running through AppRuntime concurrently with /event SSE breaks delivery.
|
||||
test("D7: SSE receives sync.run publish even with concurrent no-op AppRuntime activity", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
it.instance(
|
||||
"D7: SSE receives sync.run publish even with concurrent no-op AppRuntime activity",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
yield* inApp(Effect.void)
|
||||
|
||||
// No-op: just touches the runtime, no bus interaction
|
||||
await AppRuntime.runPromise(Effect.void)
|
||||
const reader = yield* openEventStream(directory)
|
||||
expect((yield* readNextEvent(reader)).type).toBe("server.connected")
|
||||
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
const partID = PartID.ascending()
|
||||
yield* publishPartUpdated(partID)
|
||||
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d7" }
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
const collected = await collectUntil(reader, (event) => event.type === MessageV2.Event.PartUpdated.type, 4_000)
|
||||
const updated = collected.find((event) => event.type === MessageV2.Event.PartUpdated.type)
|
||||
expect(updated).toBeDefined()
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
const collected = yield* collectUntilEvent(reader, isPartUpdated)
|
||||
expect(collected.find(isPartUpdated)).toBeDefined()
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
// D6: same as D5 but the callback subscriber is attached AFTER /event SSE
|
||||
// subscription is established. If D5 fails and D6 passes, the order of
|
||||
// subscriber setup is the determining factor.
|
||||
test("D6: /event SSE receives sync.run publish when callback is attached AFTER /event opens", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
it.instance(
|
||||
"D6: /event SSE receives sync.run publish when callback is attached AFTER /event opens",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
const reader = yield* openEventStream(directory)
|
||||
expect((yield* readNextEvent(reader)).type).toBe("server.connected")
|
||||
|
||||
// Open SSE FIRST
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
const callbackReceived = yield* Deferred.make<BusEvent>()
|
||||
yield* subscribeAllCallback((event) => {
|
||||
if (isPartUpdated(event)) Deferred.doneUnsafe(callbackReceived, Effect.succeed(event))
|
||||
})
|
||||
|
||||
try {
|
||||
const first = await readNextEvent(reader)
|
||||
expect(first.type).toBe("server.connected")
|
||||
|
||||
// THEN attach callback subscriber
|
||||
let resolveCallback: (event: { type: string; properties: unknown }) => void
|
||||
const callbackReceived = new Promise<{ type: string; properties: unknown }>(
|
||||
(resolve) => (resolveCallback = resolve as typeof resolveCallback),
|
||||
)
|
||||
const dispose = await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) =>
|
||||
svc.subscribeAllCallback((event) => {
|
||||
if (event.type === MessageV2.Event.PartUpdated.type) resolveCallback(event)
|
||||
}),
|
||||
).pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
try {
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
const messageID = MessageID.ascending()
|
||||
const partID = PartID.ascending()
|
||||
const part: MessageV2.Part = { id: partID, sessionID, messageID, type: "text", text: "diag-d6" }
|
||||
yield* publishPartUpdated(partID)
|
||||
|
||||
await AppRuntime.runPromise(
|
||||
SyncEvent.use
|
||||
.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: structuredClone(part) as MessageV2.Part,
|
||||
time: Date.now(),
|
||||
})
|
||||
.pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
const sseSaw = yield* collectUntilEvent(reader, isPartUpdated).pipe(
|
||||
Effect.map((events) => events.some(isPartUpdated)),
|
||||
Effect.catch(() => Effect.succeed(false)),
|
||||
)
|
||||
const callbackSaw = yield* Deferred.await(callbackReceived).pipe(
|
||||
Effect.timeoutOrElse({ duration: "1 second", orElse: () => Effect.succeed(undefined) }),
|
||||
Effect.map((event) => event !== undefined),
|
||||
)
|
||||
|
||||
const sseCollected = await collectUntil(
|
||||
reader,
|
||||
(event) => event.type === MessageV2.Event.PartUpdated.type,
|
||||
4_000,
|
||||
).catch((err) => err as Error)
|
||||
const callbackResult = await Promise.race([
|
||||
callbackReceived,
|
||||
new Promise<"timeout">((resolve) => setTimeout(() => resolve("timeout"), 1_000)),
|
||||
])
|
||||
|
||||
const sseSaw =
|
||||
Array.isArray(sseCollected) && sseCollected.some((event) => event.type === MessageV2.Event.PartUpdated.type)
|
||||
const callbackSaw = callbackResult !== "timeout"
|
||||
expect({ sseSaw, callbackSaw }).toEqual({ sseSaw: true, callbackSaw: true })
|
||||
} finally {
|
||||
dispose()
|
||||
}
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user