mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-13 23:52:06 +00:00
test(server): migrate session messages to Effect runner (#27234)
This commit is contained in:
@@ -1,191 +1,179 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import { afterEach, describe, expect } from "bun:test"
|
||||
import { Effect } from "effect"
|
||||
import { WithInstance } from "../../src/project/with-instance"
|
||||
import { Server } from "../../src/server/server"
|
||||
import { Session as SessionNs } from "@/session/session"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { ModelID, ProviderID } from "../../src/provider/schema"
|
||||
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
|
||||
import { disposeAllInstances, TestInstance } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
function run<A, E>(fx: Effect.Effect<A, E, SessionNs.Service>) {
|
||||
return Effect.runPromise(fx.pipe(Effect.provide(SessionNs.defaultLayer)))
|
||||
}
|
||||
const it = testEffect(SessionNs.defaultLayer)
|
||||
|
||||
const svc = {
|
||||
...SessionNs,
|
||||
create(input?: SessionNs.CreateInput) {
|
||||
return run(SessionNs.Service.use((svc) => svc.create(input)))
|
||||
},
|
||||
remove(id: SessionID) {
|
||||
return run(SessionNs.Service.use((svc) => svc.remove(id)))
|
||||
},
|
||||
updateMessage<T extends MessageV2.Info>(msg: T) {
|
||||
return run(SessionNs.Service.use((svc) => svc.updateMessage(msg)))
|
||||
},
|
||||
updatePart<T extends MessageV2.Part>(part: T) {
|
||||
return run(SessionNs.Service.use((svc) => svc.updatePart(part)))
|
||||
},
|
||||
const model = {
|
||||
providerID: ProviderID.make("test"),
|
||||
modelID: ModelID.make("test"),
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
await disposeAllInstances()
|
||||
})
|
||||
|
||||
async function withoutWatcher<T>(fn: () => Promise<T>) {
|
||||
if (process.platform !== "win32") return fn()
|
||||
const prev = process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER
|
||||
process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = "true"
|
||||
try {
|
||||
return await fn()
|
||||
} finally {
|
||||
if (prev === undefined) delete process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER
|
||||
else process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = prev
|
||||
}
|
||||
const withoutWatcher = <A, E, R>(effect: Effect.Effect<A, E, R>) => {
|
||||
if (process.platform !== "win32") return effect
|
||||
return Effect.acquireUseRelease(
|
||||
Effect.sync(() => {
|
||||
const previous = process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER
|
||||
process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = "true"
|
||||
return previous
|
||||
}),
|
||||
() => effect,
|
||||
(previous) =>
|
||||
Effect.sync(() => {
|
||||
if (previous === undefined) delete process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER
|
||||
else process.env.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = previous
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) {
|
||||
const ids = [] as MessageID[]
|
||||
for (let i = 0; i < count; i++) {
|
||||
const id = MessageID.ascending()
|
||||
ids.push(id)
|
||||
await svc.updateMessage({
|
||||
id,
|
||||
sessionID,
|
||||
role: "user",
|
||||
time: { created: time(i) },
|
||||
agent: "test",
|
||||
model: { providerID: "test", modelID: "test" },
|
||||
tools: {},
|
||||
mode: "",
|
||||
} as unknown as MessageV2.Info)
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID,
|
||||
messageID: id,
|
||||
type: "text",
|
||||
text: `m${i}`,
|
||||
})
|
||||
}
|
||||
return ids
|
||||
const sessionScoped = Effect.acquireRelease(
|
||||
SessionNs.Service.use((svc) => svc.create({})),
|
||||
(session) => SessionNs.Service.use((svc) => svc.remove(session.id)).pipe(Effect.ignore),
|
||||
)
|
||||
|
||||
const fill = Effect.fn("SessionMessagesTest.fill")(function* (
|
||||
sessionID: SessionID,
|
||||
count: number,
|
||||
time = (i: number) => Date.now() + i,
|
||||
) {
|
||||
const session = yield* SessionNs.Service
|
||||
return yield* Effect.forEach(
|
||||
Array.from({ length: count }, (_, i) => i),
|
||||
(i) =>
|
||||
Effect.gen(function* () {
|
||||
const id = MessageID.ascending()
|
||||
yield* session.updateMessage({
|
||||
id,
|
||||
sessionID,
|
||||
role: "user",
|
||||
time: { created: time(i) },
|
||||
agent: "test",
|
||||
model,
|
||||
tools: {},
|
||||
} satisfies MessageV2.User)
|
||||
yield* session.updatePart({
|
||||
id: PartID.ascending(),
|
||||
sessionID,
|
||||
messageID: id,
|
||||
type: "text",
|
||||
text: `m${i}`,
|
||||
} satisfies MessageV2.TextPart)
|
||||
return id
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
function request(path: string) {
|
||||
return Effect.promise(() => Promise.resolve(Server.Default().app.request(path)))
|
||||
}
|
||||
|
||||
function json<T>(response: Response) {
|
||||
return Effect.promise(() => response.json() as Promise<T>)
|
||||
}
|
||||
|
||||
describe("session messages endpoint", () => {
|
||||
test("returns cursor headers for older pages", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await withoutWatcher(() =>
|
||||
WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
const ids = await fill(session.id, 5)
|
||||
const app = Server.Default().app
|
||||
it.instance(
|
||||
"returns cursor headers for older pages",
|
||||
withoutWatcher(
|
||||
Effect.gen(function* () {
|
||||
const session = yield* sessionScoped
|
||||
const ids = yield* fill(session.id, 5)
|
||||
|
||||
const a = await app.request(`/session/${session.id}/message?limit=2`)
|
||||
expect(a.status).toBe(200)
|
||||
const aBody = (await a.json()) as MessageV2.WithParts[]
|
||||
expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2))
|
||||
const cursor = a.headers.get("x-next-cursor")
|
||||
expect(cursor).toBeTruthy()
|
||||
expect(a.headers.get("link")).toContain('rel="next"')
|
||||
const a = yield* request(`/session/${session.id}/message?limit=2`)
|
||||
expect(a.status).toBe(200)
|
||||
const aBody = yield* json<MessageV2.WithParts[]>(a)
|
||||
expect(aBody.map((item) => item.info.id)).toEqual(ids.slice(-2))
|
||||
const cursor = a.headers.get("x-next-cursor")
|
||||
expect(cursor).toBeTruthy()
|
||||
expect(a.headers.get("link")).toContain('rel="next"')
|
||||
|
||||
const b = await app.request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`)
|
||||
expect(b.status).toBe(200)
|
||||
const bBody = (await b.json()) as MessageV2.WithParts[]
|
||||
expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2))
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
const b = yield* request(`/session/${session.id}/message?limit=2&before=${encodeURIComponent(cursor!)}`)
|
||||
expect(b.status).toBe(200)
|
||||
const bBody = yield* json<MessageV2.WithParts[]>(b)
|
||||
expect(bBody.map((item) => item.info.id)).toEqual(ids.slice(-4, -2))
|
||||
}),
|
||||
)
|
||||
})
|
||||
),
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
test("keeps full-history responses when limit is omitted", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await withoutWatcher(() =>
|
||||
WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
const ids = await fill(session.id, 3)
|
||||
const app = Server.Default().app
|
||||
it.instance(
|
||||
"keeps full-history responses when limit is omitted",
|
||||
withoutWatcher(
|
||||
Effect.gen(function* () {
|
||||
const session = yield* sessionScoped
|
||||
const ids = yield* fill(session.id, 3)
|
||||
|
||||
const res = await app.request(`/session/${session.id}/message`)
|
||||
expect(res.status).toBe(200)
|
||||
const body = (await res.json()) as MessageV2.WithParts[]
|
||||
expect(body.map((item) => item.info.id)).toEqual(ids)
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
const res = yield* request(`/session/${session.id}/message`)
|
||||
expect(res.status).toBe(200)
|
||||
const body = yield* json<MessageV2.WithParts[]>(res)
|
||||
expect(body.map((item) => item.info.id)).toEqual(ids)
|
||||
}),
|
||||
)
|
||||
})
|
||||
),
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
test("rejects invalid cursors and missing sessions", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await withoutWatcher(() =>
|
||||
WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
const app = Server.Default().app
|
||||
it.instance(
|
||||
"rejects invalid cursors and missing sessions",
|
||||
withoutWatcher(
|
||||
Effect.gen(function* () {
|
||||
const session = yield* sessionScoped
|
||||
|
||||
const bad = await app.request(`/session/${session.id}/message?limit=2&before=bad`)
|
||||
expect(bad.status).toBe(400)
|
||||
const bad = yield* request(`/session/${session.id}/message?limit=2&before=bad`)
|
||||
expect(bad.status).toBe(400)
|
||||
|
||||
const miss = await app.request(`/session/ses_missing/message?limit=2`)
|
||||
expect(miss.status).toBe(404)
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
const miss = yield* request(`/session/ses_missing/message?limit=2`)
|
||||
expect(miss.status).toBe(404)
|
||||
}),
|
||||
)
|
||||
})
|
||||
),
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
test("does not truncate large legacy limit requests", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await withoutWatcher(() =>
|
||||
WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await fill(session.id, 520)
|
||||
const app = Server.Default().app
|
||||
it.instance(
|
||||
"does not truncate large legacy limit requests",
|
||||
withoutWatcher(
|
||||
Effect.gen(function* () {
|
||||
const session = yield* sessionScoped
|
||||
yield* fill(session.id, 520)
|
||||
|
||||
const res = await app.request(`/session/${session.id}/message?limit=510`)
|
||||
expect(res.status).toBe(200)
|
||||
const body = (await res.json()) as MessageV2.WithParts[]
|
||||
expect(body).toHaveLength(510)
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
const res = yield* request(`/session/${session.id}/message?limit=510`)
|
||||
expect(res.status).toBe(200)
|
||||
const body = yield* json<MessageV2.WithParts[]>(res)
|
||||
expect(body).toHaveLength(510)
|
||||
}),
|
||||
)
|
||||
})
|
||||
),
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
test("accepts directory query used by workspace routing", async () => {
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await withoutWatcher(() =>
|
||||
WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await fill(session.id, 1)
|
||||
const app = Server.Default().app
|
||||
it.instance(
|
||||
"accepts directory query used by workspace routing",
|
||||
withoutWatcher(
|
||||
Effect.gen(function* () {
|
||||
const tmp = yield* TestInstance
|
||||
const session = yield* sessionScoped
|
||||
yield* fill(session.id, 1)
|
||||
|
||||
const res = await app.request(
|
||||
`/session/${session.id}/message?limit=80&directory=${encodeURIComponent(tmp.path)}`,
|
||||
)
|
||||
expect(res.status).toBe(200)
|
||||
const body = await res.json()
|
||||
expect(Array.isArray(body)).toBe(true)
|
||||
expect(body).toHaveLength(1)
|
||||
|
||||
await svc.remove(session.id)
|
||||
},
|
||||
const res = yield* request(
|
||||
`/session/${session.id}/message?limit=80&directory=${encodeURIComponent(tmp.directory)}`,
|
||||
)
|
||||
expect(res.status).toBe(200)
|
||||
const body = yield* json<unknown[]>(res)
|
||||
expect(Array.isArray(body)).toBe(true)
|
||||
expect(body).toHaveLength(1)
|
||||
}),
|
||||
)
|
||||
})
|
||||
),
|
||||
{ git: true },
|
||||
)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user