From ccf93f3523393b5226a1172ddaa85ac07a5c4b6d Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Wed, 13 May 2026 14:40:12 +0530 Subject: [PATCH] fix(session): make message reads effectful (#27291) --- .../instance/httpapi/handlers/session.ts | 4 +- packages/opencode/src/session/message-v2.ts | 41 +++----- packages/opencode/src/session/session.ts | 6 +- packages/opencode/src/tool/task.ts | 2 +- .../test/session/messages-pagination.test.ts | 94 +++++++------------ .../test/session/processor-effect.test.ts | 4 +- packages/opencode/test/session/prompt.test.ts | 8 +- 7 files changed, 60 insertions(+), 99 deletions(-) diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts index ebd0fcd4d8..40444385f5 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/session.ts @@ -105,7 +105,7 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session", } const page = yield* SessionError.mapStorageNotFound( - MessageV2.pageEffect({ + MessageV2.page({ sessionID: ctx.params.sessionID, limit: ctx.query.limit, before: ctx.query.before, @@ -132,7 +132,7 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session", params: { sessionID: SessionID; messageID: MessageID } }) { return yield* SessionError.mapStorageNotFound( - MessageV2.getEffect({ sessionID: ctx.params.sessionID, messageID: ctx.params.messageID }), + MessageV2.get({ sessionID: ctx.params.sessionID, messageID: ctx.params.messageID }), ) }) diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 10754298b2..869ef979f2 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -919,7 +919,11 @@ export function toModelMessages( return Effect.runPromise(toModelMessagesEffect(input, model, options).pipe(Effect.provide(EffectLogger.layer))) } -export function page(input: { sessionID: SessionID; limit: number; before?: string }) { +export const page = Effect.fn("MessageV2.page")(function* (input: { + sessionID: SessionID + limit: number + before?: string +}) { const before = input.before ? cursor.decode(input.before) : undefined const where = before ? and(eq(MessageTable.session_id, input.sessionID), older(before)) @@ -937,7 +941,7 @@ export function page(input: { sessionID: SessionID; limit: number; before?: stri const row = Database.use((db) => db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(), ) - if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` }) + if (!row) return yield* new NotFoundError({ message: `Session not found: ${input.sessionID}` }) return { items: [] as WithParts[], more: false, @@ -954,24 +958,19 @@ export function page(input: { sessionID: SessionID; limit: number; before?: stri more, cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined, } -} - -export const pageEffect = Effect.fn("MessageV2.pageEffect")(function* (input: { - sessionID: SessionID - limit: number - before?: string -}) { - return yield* Effect.try({ - try: () => page(input), - catch: (error) => error, - }).pipe(Effect.catch((error) => (NotFoundError.isInstance(error) ? Effect.fail(error) : Effect.die(error)))) }) export function* stream(sessionID: SessionID) { const size = 50 let before: string | undefined while (true) { - const next = page({ sessionID, limit: size, before }) + const next = Effect.runSync( + page({ sessionID, limit: size, before }).pipe( + Effect.catchIf(NotFoundError.isInstance, () => + Effect.succeed({ items: [] as WithParts[], more: false, cursor: undefined }), + ), + ), + ) if (next.items.length === 0) break for (let i = next.items.length - 1; i >= 0; i--) { yield next.items[i] @@ -996,7 +995,7 @@ export function parts(message_id: MessageID) { ) } -export function get(input: { sessionID: SessionID; messageID: MessageID }): WithParts { +export const get = Effect.fn("MessageV2.get")(function* (input: { sessionID: SessionID; messageID: MessageID }) { const row = Database.use((db) => db .select() @@ -1004,21 +1003,11 @@ export function get(input: { sessionID: SessionID; messageID: MessageID }): With .where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID))) .get(), ) - if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` }) + if (!row) return yield* new NotFoundError({ message: `Message not found: ${input.messageID}` }) return { info: info(row), parts: parts(input.messageID), } -} - -export const getEffect = Effect.fn("MessageV2.getEffect")(function* (input: { - sessionID: SessionID - messageID: MessageID -}) { - return yield* Effect.try({ - try: () => get(input), - catch: (error) => error, - }).pipe(Effect.catch((error) => (NotFoundError.isInstance(error) ? Effect.fail(error) : Effect.die(error)))) }) export function filterCompacted(msgs: Iterable) { diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index d82b2369e1..df173e895b 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -759,14 +759,14 @@ export const layer: Layer.Layer= 0; i--) { const item = page.items[i] @@ -817,7 +817,7 @@ export const layer: Layer.Layer= 0; i--) { const item = page.items[i] diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index d3572c1c4f..09bbaca262 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -86,7 +86,7 @@ export const TaskTool = Tool.define( ], })) - const msg = yield* MessageV2.getEffect({ sessionID: ctx.sessionID, messageID: ctx.messageID }).pipe(Effect.orDie) + const msg = yield* MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID }).pipe(Effect.orDie) if (msg.info.role !== "assistant") return yield* Effect.fail(new Error("Not an assistant message")) const model = next.model ?? { diff --git a/packages/opencode/test/session/messages-pagination.test.ts b/packages/opencode/test/session/messages-pagination.test.ts index 247f95d280..e558d07b50 100644 --- a/packages/opencode/test/session/messages-pagination.test.ts +++ b/packages/opencode/test/session/messages-pagination.test.ts @@ -12,20 +12,6 @@ void Log.init({ print: false }) const it = testEffect(SessionNs.defaultLayer) -function expectNotFound(fn: () => unknown, message: string) { - let thrown: unknown - try { - fn() - } catch (error) { - thrown = error - } - expect(thrown).toBeInstanceOf(NotFoundError) - if (thrown instanceof NotFoundError) { - expect(thrown._tag).toBe("NotFoundError") - expect(thrown.message).toBe(message) - } -} - const withSession = ( fn: (input: { session: SessionNs.Interface; sessionID: SessionID }) => Effect.Effect, ) => @@ -140,12 +126,12 @@ const addCompactionPart = Effect.fn("Test.addCompactionPart")(function* ( }) describe("MessageV2.page", () => { - it.instance("returns sync result", () => + it.instance("returns page result", () => withSession(({ sessionID }) => Effect.gen(function* () { yield* fill(sessionID, 2) - const result = MessageV2.page({ sessionID, limit: 10 }) + const result = yield* MessageV2.page({ sessionID, limit: 10 }) expect(result).toBeDefined() expect(result.items).toBeArray() }), @@ -157,18 +143,18 @@ describe("MessageV2.page", () => { Effect.gen(function* () { const ids = yield* fill(sessionID, 6) - const a = MessageV2.page({ sessionID, limit: 2 }) + const a = yield* MessageV2.page({ sessionID, limit: 2 }) expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) expect(a.items.every((item) => item.parts.length === 1)).toBe(true) expect(a.more).toBe(true) expect(a.cursor).toBeTruthy() - const b = MessageV2.page({ sessionID, limit: 2, before: a.cursor! }) + const b = yield* MessageV2.page({ sessionID, limit: 2, before: a.cursor! }) expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(-4, -2)) expect(b.more).toBe(true) expect(b.cursor).toBeTruthy() - const c = MessageV2.page({ sessionID, limit: 2, before: b.cursor! }) + const c = yield* MessageV2.page({ sessionID, limit: 2, before: b.cursor! }) expect(c.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) expect(c.more).toBe(false) expect(c.cursor).toBeUndefined() @@ -181,7 +167,7 @@ describe("MessageV2.page", () => { Effect.gen(function* () { const ids = yield* fill(sessionID, 4) - const result = MessageV2.page({ sessionID, limit: 4 }) + const result = yield* MessageV2.page({ sessionID, limit: 4 }) expect(result.items.map((item) => item.info.id)).toEqual(ids) }), ), @@ -190,7 +176,7 @@ describe("MessageV2.page", () => { it.instance("returns empty items for session with no messages", () => withSession(({ sessionID }) => Effect.gen(function* () { - const result = MessageV2.page({ sessionID, limit: 10 }) + const result = yield* MessageV2.page({ sessionID, limit: 10 }) expect(result.items).toEqual([]) expect(result.more).toBe(false) expect(result.cursor).toBeUndefined() @@ -198,17 +184,10 @@ describe("MessageV2.page", () => { ), ) - it.instance("throws NotFoundError for non-existent session", () => + it.instance("fails with NotFoundError for non-existent session", () => Effect.gen(function* () { const fake = "non-existent-session" as SessionID - expectNotFound(() => MessageV2.page({ sessionID: fake, limit: 10 }), `Session not found: ${fake}`) - }), - ) - - it.instance("fails pageEffect with NotFoundError for non-existent session", () => - Effect.gen(function* () { - const fake = "non-existent-session" as SessionID - const error = yield* Effect.flip(MessageV2.pageEffect({ sessionID: fake, limit: 10 })) + const error = yield* Effect.flip(MessageV2.page({ sessionID: fake, limit: 10 })) expect(error).toBeInstanceOf(NotFoundError) expect(error.message).toBe(`Session not found: ${fake}`) }), @@ -219,7 +198,7 @@ describe("MessageV2.page", () => { Effect.gen(function* () { const ids = yield* fill(sessionID, 3) - const result = MessageV2.page({ sessionID, limit: 3 }) + const result = yield* MessageV2.page({ sessionID, limit: 3 }) expect(result.items.map((item) => item.info.id)).toEqual(ids) expect(result.more).toBe(false) expect(result.cursor).toBeUndefined() @@ -232,7 +211,7 @@ describe("MessageV2.page", () => { Effect.gen(function* () { const ids = yield* fill(sessionID, 5) - const result = MessageV2.page({ sessionID, limit: 1 }) + const result = yield* MessageV2.page({ sessionID, limit: 1 }) expect(result.items).toHaveLength(1) expect(result.items[0].info.id).toBe(ids[ids.length - 1]) expect(result.more).toBe(true) @@ -253,7 +232,7 @@ describe("MessageV2.page", () => { text: "extra", }) - const result = MessageV2.page({ sessionID, limit: 10 }) + const result = yield* MessageV2.page({ sessionID, limit: 10 }) expect(result.items).toHaveLength(1) expect(result.items[0].parts).toHaveLength(2) }), @@ -265,8 +244,8 @@ describe("MessageV2.page", () => { Effect.gen(function* () { const ids = yield* fill(sessionID, 4, (i: number) => 1000.5 + i) - const a = MessageV2.page({ sessionID, limit: 2 }) - const b = MessageV2.page({ sessionID, limit: 2, before: a.cursor! }) + const a = yield* MessageV2.page({ sessionID, limit: 2 }) + const b = yield* MessageV2.page({ sessionID, limit: 2, before: a.cursor! }) expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) @@ -279,11 +258,11 @@ describe("MessageV2.page", () => { Effect.gen(function* () { const ids = yield* fill(sessionID, 4, () => 1000) - const a = MessageV2.page({ sessionID, limit: 2 }) + const a = yield* MessageV2.page({ sessionID, limit: 2 }) expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2)) expect(a.more).toBe(true) - const b = MessageV2.page({ sessionID, limit: 2, before: a.cursor! }) + const b = yield* MessageV2.page({ sessionID, limit: 2, before: a.cursor! }) expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2)) expect(b.more).toBe(false) }), @@ -298,8 +277,8 @@ describe("MessageV2.page", () => { yield* fill(a.id, 3) yield* fill(b.id, 2) - const resultA = MessageV2.page({ sessionID: a.id, limit: 10 }) - const resultB = MessageV2.page({ sessionID: b.id, limit: 10 }) + const resultA = yield* MessageV2.page({ sessionID: a.id, limit: 10 }) + const resultB = yield* MessageV2.page({ sessionID: b.id, limit: 10 }) expect(resultA.items).toHaveLength(3) expect(resultB.items).toHaveLength(2) expect(resultA.items.every((item) => item.info.sessionID === a.id)).toBe(true) @@ -315,7 +294,7 @@ describe("MessageV2.page", () => { Effect.gen(function* () { const ids = yield* fill(sessionID, 10) - const result = MessageV2.page({ sessionID, limit: 100 }) + const result = yield* MessageV2.page({ sessionID, limit: 100 }) expect(result.items).toHaveLength(10) expect(result.items.map((item) => item.info.id)).toEqual(ids) expect(result.more).toBe(false) @@ -482,7 +461,7 @@ describe("MessageV2.get", () => { Effect.gen(function* () { const [id] = yield* fill(sessionID, 1) - const result = MessageV2.get({ sessionID, messageID: id }) + const result = yield* MessageV2.get({ sessionID, messageID: id }) expect(result.info.id).toBe(id) expect(result.info.sessionID).toBe(sessionID) expect(result.info.role).toBe("user") @@ -492,20 +471,11 @@ describe("MessageV2.get", () => { ), ) - it.instance("throws NotFoundError for non-existent message", () => + it.instance("fails with NotFoundError for non-existent message", () => withSession(({ sessionID }) => Effect.gen(function* () { const messageID = MessageID.ascending() - expectNotFound(() => MessageV2.get({ sessionID, messageID }), `Message not found: ${messageID}`) - }), - ), - ) - - it.instance("fails getEffect with NotFoundError for non-existent message", () => - withSession(({ sessionID }) => - Effect.gen(function* () { - const messageID = MessageID.ascending() - const error = yield* Effect.flip(MessageV2.getEffect({ sessionID, messageID })) + const error = yield* Effect.flip(MessageV2.get({ sessionID, messageID })) expect(error).toBeInstanceOf(NotFoundError) expect(error.message).toBe(`Message not found: ${messageID}`) }), @@ -519,8 +489,10 @@ describe("MessageV2.get", () => { const b = yield* session.create({}) const [id] = yield* fill(a.id, 1) - expectNotFound(() => MessageV2.get({ sessionID: b.id, messageID: id }), `Message not found: ${id}`) - const result = MessageV2.get({ sessionID: a.id, messageID: id }) + const error = yield* Effect.flip(MessageV2.get({ sessionID: b.id, messageID: id })) + expect(error).toBeInstanceOf(NotFoundError) + expect(error.message).toBe(`Message not found: ${id}`) + const result = yield* MessageV2.get({ sessionID: a.id, messageID: id }) expect(result.info.id).toBe(id) yield* session.remove(a.id) @@ -541,7 +513,7 @@ describe("MessageV2.get", () => { text: "extra", }) - const result = MessageV2.get({ sessionID, messageID: id }) + const result = yield* MessageV2.get({ sessionID, messageID: id }) expect(result.parts).toHaveLength(2) }), ), @@ -561,7 +533,7 @@ describe("MessageV2.get", () => { text: "response", }) - const result = MessageV2.get({ sessionID, messageID: aid }) + const result = yield* MessageV2.get({ sessionID, messageID: aid }) expect(result.info.role).toBe("assistant") expect(result.parts).toHaveLength(1) expect((result.parts[0] as MessageV2.TextPart).text).toBe("response") @@ -574,7 +546,7 @@ describe("MessageV2.get", () => { Effect.gen(function* () { const id = yield* addUser(sessionID) - const result = MessageV2.get({ sessionID, messageID: id }) + const result = yield* MessageV2.get({ sessionID, messageID: id }) expect(result.info.id).toBe(id) expect(result.parts).toEqual([]) }), @@ -1026,9 +998,9 @@ describe("MessageV2 consistency", () => { Effect.gen(function* () { yield* fill(sessionID, 3) - const paged = MessageV2.page({ sessionID, limit: 10 }) + const paged = yield* MessageV2.page({ sessionID, limit: 10 }) for (const item of paged.items) { - const got = MessageV2.get({ sessionID, messageID: item.info.id as MessageID }) + const got = yield* MessageV2.get({ sessionID, messageID: item.info.id as MessageID }) expect(got.info).toEqual(item.info) expect(got.parts).toEqual(item.parts) } @@ -1041,7 +1013,7 @@ describe("MessageV2 consistency", () => { Effect.gen(function* () { const [id] = yield* fill(sessionID, 1) - const got = MessageV2.get({ sessionID, messageID: id }) + const got = yield* MessageV2.get({ sessionID, messageID: id }) const standalone = MessageV2.parts(id) expect(got.parts).toEqual(standalone) }), @@ -1058,7 +1030,7 @@ describe("MessageV2 consistency", () => { const paged = [] as MessageV2.WithParts[] let cursor: string | undefined while (true) { - const result = MessageV2.page({ sessionID, limit: 3, before: cursor }) + const result = yield* MessageV2.page({ sessionID, limit: 3, before: cursor }) for (let i = result.items.length - 1; i >= 0; i--) { paged.push(result.items[i]) } diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 56ff102430..fc5e73877d 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -767,7 +767,7 @@ it.live("session.processor effect tests record aborted errors and idle state", ( const exit = yield* Fiber.await(run) yield* Effect.promise(() => seen.promise) - const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id }) + const stored = yield* MessageV2.get({ sessionID: chat.id, messageID: msg.id }) const state = yield* sts.get(chat.id) off() @@ -829,7 +829,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua yield* Fiber.interrupt(run) const exit = yield* Fiber.await(run) - const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id }) + const stored = yield* MessageV2.get({ sessionID: chat.id, messageID: msg.id }) const state = yield* sts.get(chat.id) expect(Exit.isFailure(exit)).toBe(true) diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index e7791db308..427ce17e8f 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -1787,7 +1787,7 @@ it.instance( if (msg.info.role !== "user") throw new Error("expected user message") - const stored = MessageV2.get({ + const stored = yield* MessageV2.get({ sessionID: session.id, messageID: msg.info.id, }) @@ -1875,7 +1875,7 @@ it.instance( parts: yield* prompt.resolvePromptParts("Use @docs for context"), }) - const stored = MessageV2.get({ sessionID: session.id, messageID: message.info.id }) + const stored = yield* MessageV2.get({ sessionID: session.id, messageID: message.info.id }) const synthetic = stored.parts.filter( (part): part is MessageV2.TextPart => part.type === "text" && part.synthetic === true, ) @@ -1931,7 +1931,7 @@ it.instance( ], }) - const stored = MessageV2.get({ sessionID: session.id, messageID: message.info.id }) + const stored = yield* MessageV2.get({ sessionID: session.id, messageID: message.info.id }) const synthetic = stored.parts.filter( (part): part is MessageV2.TextPart => part.type === "text" && part.synthetic === true, ) @@ -1991,7 +1991,7 @@ it.instance( parts, noReply: true, }) - const stored = MessageV2.get({ sessionID: session.id, messageID: message.info.id }) + const stored = yield* MessageV2.get({ sessionID: session.id, messageID: message.info.id }) const textParts = stored.parts.filter((part) => part.type === "text") const hasContent = textParts.some((part) => part.text.includes("special content")) expect(hasContent).toBe(true)