mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-13 23:52:06 +00:00
Stabilize session event tests (#27117)
This commit is contained in:
@@ -1,186 +1,174 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import path from "path"
|
||||
import { describe, expect } from "bun:test"
|
||||
import { Deferred, Effect, Exit, Layer } from "effect"
|
||||
import { Session as SessionNs } from "@/session/session"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { GlobalBus, type GlobalEvent } from "../../src/bus/global"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { WithInstance } from "../../src/project/with-instance"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
|
||||
import { AppRuntime } from "../../src/effect/app-runtime"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { provideInstance, tmpdirScoped } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
|
||||
const projectRoot = path.join(__dirname, "../..")
|
||||
void Log.init({ print: false })
|
||||
|
||||
function create(input?: SessionNs.CreateInput) {
|
||||
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create(input)))
|
||||
}
|
||||
const it = testEffect(Layer.mergeAll(SessionNs.defaultLayer, CrossSpawnSpawner.defaultLayer))
|
||||
|
||||
function get(id: SessionID) {
|
||||
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(id)))
|
||||
}
|
||||
const awaitDeferred = <T>(deferred: Deferred.Deferred<T>, message: string) =>
|
||||
Effect.race(
|
||||
Deferred.await(deferred),
|
||||
Effect.sleep("2 seconds").pipe(Effect.flatMap(() => Effect.fail(new Error(message)))),
|
||||
)
|
||||
|
||||
function remove(id: SessionID) {
|
||||
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.remove(id)))
|
||||
}
|
||||
const remove = (id: SessionID) => SessionNs.Service.use((svc) => svc.remove(id))
|
||||
|
||||
function updateMessage<T extends MessageV2.Info>(msg: T) {
|
||||
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updateMessage(msg)))
|
||||
}
|
||||
|
||||
function updatePart<T extends MessageV2.Part>(part: T) {
|
||||
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updatePart(part)))
|
||||
const subscribeGlobal = (type: string, callback: (event: NonNullable<GlobalEvent["payload"]>) => void) => {
|
||||
const listener = (event: GlobalEvent) => {
|
||||
if (event.payload?.type === type) callback(event.payload)
|
||||
}
|
||||
GlobalBus.on("event", listener)
|
||||
return () => GlobalBus.off("event", listener)
|
||||
}
|
||||
|
||||
describe("session.created event", () => {
|
||||
test("should emit session.created event when session is created", async () => {
|
||||
await WithInstance.provide({
|
||||
directory: projectRoot,
|
||||
fn: async () => {
|
||||
let eventReceived = false
|
||||
let receivedInfo: SessionNs.Info | undefined
|
||||
it.instance("should emit session.created event when session is created", () =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* SessionNs.Service
|
||||
const received = yield* Deferred.make<SessionNs.Info>()
|
||||
|
||||
const unsub = Bus.subscribe(SessionNs.Event.Created, (event) => {
|
||||
eventReceived = true
|
||||
receivedInfo = event.properties.info as SessionNs.Info
|
||||
})
|
||||
const unsub = subscribeGlobal(SessionNs.Event.Created.type, (event) => {
|
||||
Deferred.doneUnsafe(received, Effect.succeed(event.properties.info as SessionNs.Info))
|
||||
})
|
||||
yield* Effect.addFinalizer(() => Effect.sync(unsub))
|
||||
|
||||
const info = await create({})
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
unsub()
|
||||
const info = yield* session.create({})
|
||||
const receivedInfo = yield* awaitDeferred(received, "timed out waiting for session.created")
|
||||
|
||||
expect(eventReceived).toBe(true)
|
||||
expect(receivedInfo).toBeDefined()
|
||||
expect(receivedInfo?.id).toBe(info.id)
|
||||
expect(receivedInfo?.projectID).toBe(info.projectID)
|
||||
expect(receivedInfo?.directory).toBe(info.directory)
|
||||
expect(receivedInfo?.path).toBe(info.path)
|
||||
expect(receivedInfo?.title).toBe(info.title)
|
||||
expect(receivedInfo.id).toBe(info.id)
|
||||
expect(receivedInfo.projectID).toBe(info.projectID)
|
||||
expect(receivedInfo.directory).toBe(info.directory)
|
||||
expect(receivedInfo.path).toBe(info.path)
|
||||
expect(receivedInfo.title).toBe(info.title)
|
||||
|
||||
await remove(info.id)
|
||||
},
|
||||
})
|
||||
})
|
||||
yield* session.remove(info.id)
|
||||
}),
|
||||
)
|
||||
|
||||
test("session.created event should be emitted before session.updated", async () => {
|
||||
await WithInstance.provide({
|
||||
directory: projectRoot,
|
||||
fn: async () => {
|
||||
const events: string[] = []
|
||||
it.instance("session.created event should be emitted before session.updated", () =>
|
||||
Effect.gen(function* () {
|
||||
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) return
|
||||
|
||||
const unsubCreated = Bus.subscribe(SessionNs.Event.Created, () => {
|
||||
events.push("created")
|
||||
})
|
||||
const session = yield* SessionNs.Service
|
||||
const events: string[] = []
|
||||
const received = yield* Deferred.make<string[]>()
|
||||
const push = (event: string) => {
|
||||
events.push(event)
|
||||
if (events.includes("created") && events.includes("updated")) {
|
||||
Deferred.doneUnsafe(received, Effect.succeed(events))
|
||||
}
|
||||
}
|
||||
|
||||
const unsubUpdated = Bus.subscribe(SessionNs.Event.Updated, () => {
|
||||
events.push("updated")
|
||||
})
|
||||
const unsubCreated = subscribeGlobal(SessionNs.Event.Created.type, () => {
|
||||
push("created")
|
||||
})
|
||||
yield* Effect.addFinalizer(() => Effect.sync(unsubCreated))
|
||||
|
||||
const info = await create({})
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
unsubCreated()
|
||||
unsubUpdated()
|
||||
const unsubUpdated = subscribeGlobal(SessionNs.Event.Updated.type, () => {
|
||||
push("updated")
|
||||
})
|
||||
yield* Effect.addFinalizer(() => Effect.sync(unsubUpdated))
|
||||
|
||||
expect(events).toContain("created")
|
||||
expect(events).toContain("updated")
|
||||
expect(events.indexOf("created")).toBeLessThan(events.indexOf("updated"))
|
||||
const info = yield* session.create({})
|
||||
const receivedEvents = yield* awaitDeferred(received, "timed out waiting for session created/updated events")
|
||||
|
||||
await remove(info.id)
|
||||
},
|
||||
})
|
||||
})
|
||||
expect(receivedEvents).toContain("created")
|
||||
expect(receivedEvents).toContain("updated")
|
||||
expect(receivedEvents.indexOf("created")).toBeLessThan(receivedEvents.indexOf("updated"))
|
||||
|
||||
yield* session.remove(info.id)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe("step-finish token propagation via Bus event", () => {
|
||||
test(
|
||||
it.instance(
|
||||
"non-zero tokens propagate through PartUpdated event",
|
||||
async () => {
|
||||
await WithInstance.provide({
|
||||
directory: projectRoot,
|
||||
fn: async () => {
|
||||
const info = await create({})
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* SessionNs.Service
|
||||
const info = yield* session.create({})
|
||||
|
||||
const messageID = MessageID.ascending()
|
||||
await updateMessage({
|
||||
id: messageID,
|
||||
sessionID: info.id,
|
||||
role: "user",
|
||||
time: { created: Date.now() },
|
||||
agent: "user",
|
||||
model: { providerID: "test", modelID: "test" },
|
||||
tools: {},
|
||||
mode: "",
|
||||
} as unknown as MessageV2.Info)
|
||||
const messageID = MessageID.ascending()
|
||||
yield* session.updateMessage({
|
||||
id: messageID,
|
||||
sessionID: info.id,
|
||||
role: "user",
|
||||
time: { created: Date.now() },
|
||||
agent: "user",
|
||||
model: { providerID: "test", modelID: "test" },
|
||||
tools: {},
|
||||
mode: "",
|
||||
} as unknown as MessageV2.Info)
|
||||
|
||||
// Bus subscribers receive readonly Schema.Type payloads; `MessageV2.Part`
|
||||
// is the mutable domain type. Cast bridges the two — safe because the
|
||||
// test only reads the value afterwards.
|
||||
let received: MessageV2.Part | undefined
|
||||
const unsub = Bus.subscribe(MessageV2.Event.PartUpdated, (event) => {
|
||||
received = event.properties.part as MessageV2.Part
|
||||
})
|
||||
// Bus subscribers receive readonly Schema.Type payloads; `MessageV2.Part`
|
||||
// is the mutable domain type. Cast bridges the two — safe because the
|
||||
// test only reads the value afterwards.
|
||||
const received = yield* Deferred.make<MessageV2.Part>()
|
||||
const unsub = subscribeGlobal(MessageV2.Event.PartUpdated.type, (event) => {
|
||||
Deferred.doneUnsafe(received, Effect.succeed(event.properties.part as MessageV2.Part))
|
||||
})
|
||||
yield* Effect.addFinalizer(() => Effect.sync(unsub))
|
||||
|
||||
const tokens = {
|
||||
total: 1500,
|
||||
input: 500,
|
||||
output: 800,
|
||||
reasoning: 200,
|
||||
cache: { read: 100, write: 50 },
|
||||
}
|
||||
const tokens = {
|
||||
total: 1500,
|
||||
input: 500,
|
||||
output: 800,
|
||||
reasoning: 200,
|
||||
cache: { read: 100, write: 50 },
|
||||
}
|
||||
|
||||
const partInput = {
|
||||
id: PartID.ascending(),
|
||||
messageID,
|
||||
sessionID: info.id,
|
||||
type: "step-finish" as const,
|
||||
reason: "stop",
|
||||
cost: 0.005,
|
||||
tokens,
|
||||
}
|
||||
const partInput = {
|
||||
id: PartID.ascending(),
|
||||
messageID,
|
||||
sessionID: info.id,
|
||||
type: "step-finish" as const,
|
||||
reason: "stop",
|
||||
cost: 0.005,
|
||||
tokens,
|
||||
}
|
||||
|
||||
await updatePart(partInput)
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
yield* session.updatePart(partInput)
|
||||
const receivedPart = yield* awaitDeferred(received, "timed out waiting for message.part.updated")
|
||||
|
||||
expect(received).toBeDefined()
|
||||
expect(received!.type).toBe("step-finish")
|
||||
const finish = received as MessageV2.StepFinishPart
|
||||
expect(finish.tokens.input).toBe(500)
|
||||
expect(finish.tokens.output).toBe(800)
|
||||
expect(finish.tokens.reasoning).toBe(200)
|
||||
expect(finish.tokens.total).toBe(1500)
|
||||
expect(finish.tokens.cache.read).toBe(100)
|
||||
expect(finish.tokens.cache.write).toBe(50)
|
||||
expect(finish.cost).toBe(0.005)
|
||||
expect(received).not.toBe(partInput)
|
||||
expect(receivedPart.type).toBe("step-finish")
|
||||
const finish = receivedPart as MessageV2.StepFinishPart
|
||||
expect(finish.tokens.input).toBe(500)
|
||||
expect(finish.tokens.output).toBe(800)
|
||||
expect(finish.tokens.reasoning).toBe(200)
|
||||
expect(finish.tokens.total).toBe(1500)
|
||||
expect(finish.tokens.cache.read).toBe(100)
|
||||
expect(finish.tokens.cache.write).toBe(50)
|
||||
expect(finish.cost).toBe(0.005)
|
||||
expect(receivedPart).not.toBe(partInput)
|
||||
|
||||
unsub()
|
||||
await remove(info.id)
|
||||
},
|
||||
})
|
||||
},
|
||||
yield* session.remove(info.id)
|
||||
}),
|
||||
{ timeout: 30000 },
|
||||
)
|
||||
})
|
||||
|
||||
describe("Session", () => {
|
||||
test("remove works without an instance", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
it.live("remove works without an instance", () =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* SessionNs.Service
|
||||
const dir = yield* tmpdirScoped({ git: true })
|
||||
const info = yield* provideInstance(dir)(session.create({ title: "remove-without-instance" }))
|
||||
|
||||
const info = await WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: () => create({ title: "remove-without-instance" }),
|
||||
})
|
||||
const removeExit = yield* remove(info.id).pipe(Effect.exit)
|
||||
expect(Exit.isSuccess(removeExit)).toBe(true)
|
||||
|
||||
await expect(async () => {
|
||||
await remove(info.id)
|
||||
}).not.toThrow()
|
||||
|
||||
let missing = false
|
||||
await get(info.id).catch(() => {
|
||||
missing = true
|
||||
})
|
||||
|
||||
expect(missing).toBe(true)
|
||||
})
|
||||
const getExit = yield* session.get(info.id).pipe(Effect.exit)
|
||||
expect(Exit.isFailure(getExit)).toBe(true)
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user