mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-22 03:45:23 +00:00
test(server): port event SSE tests to it.instance + testEffectShared (#28569)
This commit is contained in:
@@ -5,6 +5,7 @@ import { BusEvent } from "./bus-event"
|
||||
import { GlobalBus } from "./global"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { serviceUse } from "@/effect/service-use"
|
||||
import { Identifier } from "@/id/id"
|
||||
import type { InstanceContext } from "@/project/instance-context"
|
||||
import { InstanceRef } from "@/effect/instance-ref"
|
||||
@@ -56,6 +57,8 @@ export interface Interface {
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/Bus") {}
|
||||
|
||||
export const use = serviceUse(Service)
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import { test, type TestOptions } from "bun:test"
|
||||
import { Cause, Duration, Effect, Exit, Layer } from "effect"
|
||||
import type * as Scope from "effect/Scope"
|
||||
import * as Scope from "effect/Scope"
|
||||
import * as TestClock from "effect/testing/TestClock"
|
||||
import * as TestConsole from "effect/testing/TestConsole"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
import type { Config } from "@/config/config"
|
||||
import { TestInstance, withTmpdirInstance } from "../fixture/fixture"
|
||||
|
||||
@@ -24,7 +25,9 @@ function instanceArgs(
|
||||
|
||||
const body = <A, E, R>(value: Body<A, E, R>) => Effect.suspend(() => (typeof value === "function" ? value() : value))
|
||||
|
||||
const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2>) =>
|
||||
type Runner = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2>) => Promise<A>
|
||||
|
||||
const isolatedRun: Runner = (value, layer) =>
|
||||
Effect.gen(function* () {
|
||||
const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit)
|
||||
if (Exit.isFailure(exit)) {
|
||||
@@ -35,7 +38,25 @@ const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer
|
||||
return yield* exit
|
||||
}).pipe(Effect.runPromise)
|
||||
|
||||
const make = <R, E>(testLayer: Layer.Layer<R, E>, liveLayer: Layer.Layer<R, E>) => {
|
||||
// Builds the test layer through the shared process-wide memoMap so cached
|
||||
// services (Bus, Session, …) match Server.Default's instances. Use for tests
|
||||
// that publish to an in-process HTTP server and need pub/sub identity with
|
||||
// the server's handlers.
|
||||
const sharedRun: Runner = (value, layer) =>
|
||||
Effect.gen(function* () {
|
||||
const scope = yield* Scope.make()
|
||||
const ctx = yield* Layer.buildWithMemoMap(layer, memoMap, scope)
|
||||
const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(ctx), Effect.exit)
|
||||
yield* Scope.close(scope, Exit.void)
|
||||
if (Exit.isFailure(exit)) {
|
||||
for (const err of Cause.prettyErrors(exit.cause)) {
|
||||
yield* Effect.logError(err)
|
||||
}
|
||||
}
|
||||
return yield* exit
|
||||
}).pipe(Effect.runPromise)
|
||||
|
||||
const make = <R, E>(testLayer: Layer.Layer<R, E>, liveLayer: Layer.Layer<R, E>, run: Runner = isolatedRun) => {
|
||||
const effect = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
|
||||
test(name, () => run(value, testLayer), opts)
|
||||
|
||||
@@ -110,6 +131,13 @@ export const it = make(testEnv, liveEnv)
|
||||
export const testEffect = <R, E>(layer: Layer.Layer<R, E>) =>
|
||||
make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv))
|
||||
|
||||
// Variant of `testEffect` that builds the test layer through the shared
|
||||
// process-wide memoMap so services like Bus/Session resolve to the same
|
||||
// instances Server.Default uses. Use when a test needs pub/sub identity with
|
||||
// an in-process HTTP server — most tests should stick with `testEffect`.
|
||||
export const testEffectShared = <R, E>(layer: Layer.Layer<R, E>) =>
|
||||
make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv), sharedRun)
|
||||
|
||||
export const awaitWithTimeout = <A, E, R>(
|
||||
self: Effect.Effect<A, E, R>,
|
||||
message: string,
|
||||
|
||||
@@ -1,53 +1,52 @@
|
||||
// Diagnostic suite for /event SSE delivery.
|
||||
//
|
||||
// Each test isolates ONE variable in the publisher chain while keeping the
|
||||
// subscriber path constant (raw `app().request` reading the SSE body — no SDK
|
||||
// consumer involvement). The pass/fail pattern across tests tells us where the
|
||||
// bug lives:
|
||||
// subscriber path constant (in-process HttpApi via Server.Default reading the
|
||||
// SSE body). The pass/fail pattern across tests tells us where the bug lives:
|
||||
//
|
||||
// D1 (baseline): publish via Bus.Service.use via AppRuntime — mirror of the
|
||||
// existing httpapi-event.test.ts test 3. Confirms /event SSE delivery
|
||||
// works for a SOME publish path.
|
||||
// D1 (baseline): publish via Bus.use.publish — mirror of httpapi-event.test.ts
|
||||
// test 3. Confirms /event SSE delivery works for SOME publish path.
|
||||
//
|
||||
// D2: publish N times in quick succession via Bus.Service.use. If the bus
|
||||
// D2: publish N times in quick succession via Bus.use.publish. If the bus
|
||||
// subscription is acquired correctly there should be no message loss.
|
||||
//
|
||||
// D3: publish via SyncEvent.use.run via AppRuntime — exercises the same path
|
||||
// the HTTP handlers use (Session.updatePart → sync.run → bus.publish)
|
||||
// without the HTTP roundtrip. Tells us whether the sync path itself can
|
||||
// deliver in-process.
|
||||
// D3: publish via SyncEvent.use.run — exercises the same path the HTTP
|
||||
// handlers use (Session.updatePart → sync.run → bus.publish) without
|
||||
// the HTTP roundtrip. Tells us whether the sync path itself can deliver
|
||||
// in-process.
|
||||
//
|
||||
// D4: publish via SyncEvent.use.run from a fresh `Effect.provide` scope
|
||||
// (mimicking what happens if a handler's layer was scoped per-request).
|
||||
// D4: publish via SyncEvent.use.run; subscriber is an in-process Bus
|
||||
// callback. Confirms pub/sub identity end-to-end without /event SSE.
|
||||
//
|
||||
// D5: in-process Bus.Service callback subscriber AND raw /event SSE subscriber
|
||||
// D5: in-process Bus callback subscriber AND raw /event SSE subscriber
|
||||
// receive the same publish. If both receive: no bug. If only the
|
||||
// callback receives: the /event handler has an acquisition race.
|
||||
//
|
||||
// D6: same as D5 but the callback subscriber is attached AFTER /event SSE
|
||||
// subscription is established. Order-of-setup variable.
|
||||
import { afterEach, describe, expect } from "bun:test"
|
||||
import { Deferred, Effect, Schema } from "effect"
|
||||
import { Deferred, Effect, Layer, Schema } from "effect"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { type AppServices, AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { InstanceRef } from "../../src/effect/instance-ref"
|
||||
import { Server } from "../../src/server/server"
|
||||
import { Event as ServerEvent } from "../../src/server/event"
|
||||
import { Server } from "../../src/server/server"
|
||||
import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { MessageID, PartID, SessionID } from "../../src/session/schema"
|
||||
import { SyncEvent } from "../../src/sync"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import { disposeAllInstances, TestInstance } from "../fixture/fixture"
|
||||
import { it } from "../lib/effect"
|
||||
import { testEffectShared } from "../lib/effect"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
const EventData = Schema.Struct({
|
||||
const SseEvent = Schema.Struct({
|
||||
id: Schema.optional(Schema.String),
|
||||
type: Schema.String,
|
||||
properties: Schema.Record(Schema.String, Schema.Any),
|
||||
})
|
||||
|
||||
type SseEvent = Schema.Schema.Type<typeof EventData>
|
||||
type SseEvent = Schema.Schema.Type<typeof SseEvent>
|
||||
type BusEvent = { type: string; properties: unknown }
|
||||
|
||||
afterEach(async () => {
|
||||
@@ -55,30 +54,21 @@ afterEach(async () => {
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
const inApp = <A, E>(eff: Effect.Effect<A, E, AppServices>) =>
|
||||
Effect.gen(function* () {
|
||||
const ctx = yield* InstanceRef
|
||||
if (!ctx) return yield* Effect.die("InstanceRef not provided in test scope")
|
||||
return yield* Effect.promise(() => AppRuntime.runPromise(eff.pipe(Effect.provideService(InstanceRef, ctx))))
|
||||
})
|
||||
const it = testEffectShared(Layer.mergeAll(Bus.defaultLayer, SyncEvent.defaultLayer))
|
||||
|
||||
const publishConnected = inApp(Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})))
|
||||
const publishConnected = Bus.use.publish(ServerEvent.Connected, {})
|
||||
|
||||
const publishPartUpdated = (partID: ReturnType<typeof PartID.ascending>) => {
|
||||
const sessionID = SessionID.make(`ses_${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`)
|
||||
return inApp(
|
||||
SyncEvent.use.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: { id: partID, sessionID, messageID: MessageID.ascending(), type: "text", text: "diag" },
|
||||
time: Date.now(),
|
||||
}),
|
||||
)
|
||||
return SyncEvent.use.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID,
|
||||
part: { id: partID, sessionID, messageID: MessageID.ascending(), type: "text", text: "diag" },
|
||||
time: Date.now(),
|
||||
})
|
||||
}
|
||||
|
||||
const subscribeAllCallback = (handler: (event: BusEvent) => void) =>
|
||||
Effect.acquireRelease(inApp(Bus.Service.use((svc) => svc.subscribeAllCallback(handler))), (dispose) =>
|
||||
Effect.sync(() => dispose()),
|
||||
)
|
||||
Effect.acquireRelease(Bus.use.subscribeAllCallback(handler), (dispose) => Effect.sync(() => dispose()))
|
||||
|
||||
const openEventStream = (directory: string) =>
|
||||
Effect.gen(function* () {
|
||||
@@ -99,7 +89,7 @@ function decodeFrame(value: Uint8Array): SseEvent[] {
|
||||
.split(/\n\n+/)
|
||||
.map((part) => part.trim())
|
||||
.filter((part) => part.length > 0)
|
||||
.map((part) => Schema.decodeUnknownSync(EventData)(JSON.parse(part.replace(/^data: /, ""))))
|
||||
.map((part) => Schema.decodeUnknownSync(SseEvent)(JSON.parse(part.replace(/^data: /, ""))))
|
||||
}
|
||||
|
||||
const readNextEvent = (reader: ReadableStreamDefaultReader<Uint8Array>) =>
|
||||
@@ -112,7 +102,7 @@ const readNextEvent = (reader: ReadableStreamDefaultReader<Uint8Array>) =>
|
||||
if (result.done || !result.value) return Effect.fail(new Error("event stream closed"))
|
||||
const frames = decodeFrame(result.value)
|
||||
if (frames.length === 0) return Effect.fail(new Error("empty SSE frame"))
|
||||
return Effect.succeed(frames[0])
|
||||
return Effect.succeed(frames[0]!)
|
||||
}),
|
||||
)
|
||||
|
||||
@@ -172,7 +162,7 @@ describe("/event SSE delivery diagnostics", () => {
|
||||
|
||||
// The critical test. If D1 passes but this fails, the bus-identity fix is
|
||||
// incomplete OR the sync.run publish path doesn't reach the same bus
|
||||
// /event subscribes to, even within the same AppRuntime.
|
||||
// /event subscribes to, even when both share the memoMap.
|
||||
it.instance(
|
||||
"D3: delivers a SyncEvent published via SyncEvent.use.run after server.connected",
|
||||
() =>
|
||||
@@ -198,7 +188,7 @@ describe("/event SSE delivery diagnostics", () => {
|
||||
// D4: ensure the publish reaches an in-process Bus subscriber too. Confirms
|
||||
// pub/sub identity end-to-end without involving /event SSE.
|
||||
it.instance(
|
||||
"D4: SyncEvent.use.run publish reaches an in-process Bus.Service.use callback",
|
||||
"D4: SyncEvent.use.run publish reaches an in-process Bus callback",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const received = yield* Deferred.make<BusEvent>()
|
||||
@@ -255,29 +245,6 @@ describe("/event SSE delivery diagnostics", () => {
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
// D7: like D5 but the "second subscriber" is a NO-OP AppRuntime.runPromise
|
||||
// call (no PubSub.subscribe). If D7 passes, the specific subscribeAllCallback
|
||||
// is what breaks SSE — not arbitrary AppRuntime usage. If D7 fails, anything
|
||||
// running through AppRuntime concurrently with /event SSE breaks delivery.
|
||||
it.instance(
|
||||
"D7: SSE receives sync.run publish even with concurrent no-op AppRuntime activity",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
yield* inApp(Effect.void)
|
||||
|
||||
const reader = yield* openEventStream(directory)
|
||||
expect((yield* readNextEvent(reader)).type).toBe("server.connected")
|
||||
|
||||
const partID = PartID.ascending()
|
||||
yield* publishPartUpdated(partID)
|
||||
|
||||
const collected = yield* collectUntilEvent(reader, isPartUpdated)
|
||||
expect(collected.find(isPartUpdated)).toBeDefined()
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
// D6: same as D5 but the callback subscriber is attached AFTER /event SSE
|
||||
// subscription is established. If D5 fails and D6 passes, the order of
|
||||
// subscriber setup is the determining factor.
|
||||
|
||||
@@ -1,121 +1,101 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import { afterEach, describe, expect } from "bun:test"
|
||||
import { Effect, Schema } from "effect"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { InstanceRef } from "../../src/effect/instance-ref"
|
||||
import { Event as ServerEvent } from "../../src/server/event"
|
||||
import { Server } from "../../src/server/server"
|
||||
import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event"
|
||||
import { Event as ServerEvent } from "../../src/server/event"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { Effect, Schema } from "effect"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
import { disposeAllInstances, reloadTestInstance, tmpdir } from "../fixture/fixture"
|
||||
import { disposeAllInstances, TestInstance } from "../fixture/fixture"
|
||||
import { testEffectShared } from "../lib/effect"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
function app() {
|
||||
return Server.Default().app
|
||||
}
|
||||
|
||||
const EventData = Schema.Struct({
|
||||
id: Schema.optional(Schema.String),
|
||||
type: Schema.String,
|
||||
properties: Schema.Record(Schema.String, Schema.Any),
|
||||
})
|
||||
|
||||
async function readChunk(reader: ReadableStreamDefaultReader<Uint8Array>) {
|
||||
let timeout: ReturnType<typeof setTimeout> | undefined
|
||||
try {
|
||||
return await Promise.race([
|
||||
reader.read(),
|
||||
new Promise<never>((_, reject) => {
|
||||
timeout = setTimeout(() => reject(new Error("timed out waiting for event")), 5_000)
|
||||
const readEvent = (reader: ReadableStreamDefaultReader<Uint8Array>) =>
|
||||
Effect.gen(function* () {
|
||||
const result = yield* Effect.promise(() => reader.read()).pipe(
|
||||
Effect.timeoutOrElse({
|
||||
duration: "5 seconds",
|
||||
orElse: () => Effect.fail(new Error("timed out waiting for event")),
|
||||
}),
|
||||
])
|
||||
} finally {
|
||||
if (timeout) clearTimeout(timeout)
|
||||
}
|
||||
}
|
||||
)
|
||||
if (result.done || !result.value) return yield* Effect.fail(new Error("event stream closed"))
|
||||
return Schema.decodeUnknownSync(EventData)(
|
||||
JSON.parse(new TextDecoder().decode(result.value).replace(/^data: /, "")),
|
||||
)
|
||||
})
|
||||
|
||||
async function readFirstEvent(response: Response) {
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
return await readEvent(reader)
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
async function readEvent(reader: ReadableStreamDefaultReader<Uint8Array>) {
|
||||
const result = await readChunk(reader)
|
||||
if (result.done || !result.value) throw new Error("event stream closed")
|
||||
return Schema.decodeUnknownSync(EventData)(JSON.parse(new TextDecoder().decode(result.value).replace(/^data: /, "")))
|
||||
}
|
||||
|
||||
async function readStatusWithin(reader: ReadableStreamDefaultReader<Uint8Array>, delay: number) {
|
||||
let timeout: ReturnType<typeof setTimeout> | undefined
|
||||
try {
|
||||
return await Promise.race([
|
||||
reader.read().then((result) => (result.done ? "closed" : "event")),
|
||||
new Promise<"open">((resolve) => {
|
||||
timeout = setTimeout(() => resolve("open"), delay)
|
||||
}),
|
||||
])
|
||||
} finally {
|
||||
if (timeout) clearTimeout(timeout)
|
||||
}
|
||||
}
|
||||
const openEventStream = (directory: string) =>
|
||||
Effect.gen(function* () {
|
||||
const response = yield* Effect.promise(async () =>
|
||||
Server.Default().app.request(EventPaths.event, { headers: { "x-opencode-directory": directory } }),
|
||||
)
|
||||
if (!response.body) return yield* Effect.die("missing SSE response body")
|
||||
const reader = response.body.getReader()
|
||||
yield* Effect.addFinalizer(() => Effect.promise(() => reader.cancel().catch(() => undefined)))
|
||||
return { response, reader }
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await disposeAllInstances()
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
const it = testEffectShared(Bus.defaultLayer)
|
||||
|
||||
describe("event HttpApi", () => {
|
||||
test("serves event stream", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
it.instance(
|
||||
"serves event stream",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
const { response, reader } = yield* openEventStream(directory)
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
expect(response.headers.get("content-type")).toContain("text/event-stream")
|
||||
expect(response.headers.get("cache-control")).toBe("no-cache, no-transform")
|
||||
expect(response.headers.get("x-accel-buffering")).toBe("no")
|
||||
expect(response.headers.get("x-content-type-options")).toBe("nosniff")
|
||||
expect(await readFirstEvent(response)).toMatchObject({ type: "server.connected", properties: {} })
|
||||
})
|
||||
expect(response.status).toBe(200)
|
||||
expect(response.headers.get("content-type")).toContain("text/event-stream")
|
||||
expect(response.headers.get("cache-control")).toBe("no-cache, no-transform")
|
||||
expect(response.headers.get("x-accel-buffering")).toBe("no")
|
||||
expect(response.headers.get("x-content-type-options")).toBe("nosniff")
|
||||
expect(yield* readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} })
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
test("keeps the event stream open after the initial event", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
it.instance(
|
||||
"keeps the event stream open after the initial event",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
const { reader } = yield* openEventStream(directory)
|
||||
expect(yield* readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} })
|
||||
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
expect(await readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} })
|
||||
expect(await readStatusWithin(reader, 250)).toBe("open")
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
// If no second event arrives within 250ms, the stream is still open.
|
||||
const status = yield* Effect.promise(() => reader.read()).pipe(
|
||||
Effect.map((result) => (result.done ? ("closed" as const) : ("event" as const))),
|
||||
Effect.timeoutOrElse({ duration: "250 millis", orElse: () => Effect.succeed("open" as const) }),
|
||||
)
|
||||
expect(status).toBe("open")
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
|
||||
test("delivers instance bus events after the initial event", async () => {
|
||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
|
||||
if (!response.body) throw new Error("missing response body")
|
||||
it.instance(
|
||||
"delivers instance bus events after the initial event",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const { directory } = yield* TestInstance
|
||||
const { reader } = yield* openEventStream(directory)
|
||||
expect(yield* readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} })
|
||||
|
||||
const reader = response.body.getReader()
|
||||
try {
|
||||
expect(await readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} })
|
||||
|
||||
const next = readEvent(reader)
|
||||
const ctx = await reloadTestInstance({ directory: tmp.path })
|
||||
await AppRuntime.runPromise(
|
||||
Bus.Service.use((svc) => svc.publish(ServerEvent.Connected, {})).pipe(Effect.provideService(InstanceRef, ctx)),
|
||||
)
|
||||
|
||||
expect(await next).toMatchObject({ type: "server.connected", properties: {} })
|
||||
} finally {
|
||||
await reader.cancel()
|
||||
}
|
||||
})
|
||||
yield* Bus.use.publish(ServerEvent.Connected, {})
|
||||
expect(yield* readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} })
|
||||
}),
|
||||
{ git: true, config: { formatter: false, lsp: false } },
|
||||
)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user