fix(session): make message reads effectful (#27291)

This commit is contained in:
Shoubhit Dash
2026-05-13 14:40:12 +05:30
committed by GitHub
parent 4b041716fc
commit ccf93f3523
7 changed files with 60 additions and 99 deletions

View File

@@ -105,7 +105,7 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session",
}
const page = yield* SessionError.mapStorageNotFound(
MessageV2.pageEffect({
MessageV2.page({
sessionID: ctx.params.sessionID,
limit: ctx.query.limit,
before: ctx.query.before,
@@ -132,7 +132,7 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session",
params: { sessionID: SessionID; messageID: MessageID }
}) {
return yield* SessionError.mapStorageNotFound(
MessageV2.getEffect({ sessionID: ctx.params.sessionID, messageID: ctx.params.messageID }),
MessageV2.get({ sessionID: ctx.params.sessionID, messageID: ctx.params.messageID }),
)
})

View File

@@ -919,7 +919,11 @@ export function toModelMessages(
return Effect.runPromise(toModelMessagesEffect(input, model, options).pipe(Effect.provide(EffectLogger.layer)))
}
export function page(input: { sessionID: SessionID; limit: number; before?: string }) {
export const page = Effect.fn("MessageV2.page")(function* (input: {
sessionID: SessionID
limit: number
before?: string
}) {
const before = input.before ? cursor.decode(input.before) : undefined
const where = before
? and(eq(MessageTable.session_id, input.sessionID), older(before))
@@ -937,7 +941,7 @@ export function page(input: { sessionID: SessionID; limit: number; before?: stri
const row = Database.use((db) =>
db.select({ id: SessionTable.id }).from(SessionTable).where(eq(SessionTable.id, input.sessionID)).get(),
)
if (!row) throw new NotFoundError({ message: `Session not found: ${input.sessionID}` })
if (!row) return yield* new NotFoundError({ message: `Session not found: ${input.sessionID}` })
return {
items: [] as WithParts[],
more: false,
@@ -954,24 +958,19 @@ export function page(input: { sessionID: SessionID; limit: number; before?: stri
more,
cursor: more && tail ? cursor.encode({ id: tail.id, time: tail.time_created }) : undefined,
}
}
export const pageEffect = Effect.fn("MessageV2.pageEffect")(function* (input: {
sessionID: SessionID
limit: number
before?: string
}) {
return yield* Effect.try({
try: () => page(input),
catch: (error) => error,
}).pipe(Effect.catch((error) => (NotFoundError.isInstance(error) ? Effect.fail(error) : Effect.die(error))))
})
export function* stream(sessionID: SessionID) {
const size = 50
let before: string | undefined
while (true) {
const next = page({ sessionID, limit: size, before })
const next = Effect.runSync(
page({ sessionID, limit: size, before }).pipe(
Effect.catchIf(NotFoundError.isInstance, () =>
Effect.succeed({ items: [] as WithParts[], more: false, cursor: undefined }),
),
),
)
if (next.items.length === 0) break
for (let i = next.items.length - 1; i >= 0; i--) {
yield next.items[i]
@@ -996,7 +995,7 @@ export function parts(message_id: MessageID) {
)
}
export function get(input: { sessionID: SessionID; messageID: MessageID }): WithParts {
export const get = Effect.fn("MessageV2.get")(function* (input: { sessionID: SessionID; messageID: MessageID }) {
const row = Database.use((db) =>
db
.select()
@@ -1004,21 +1003,11 @@ export function get(input: { sessionID: SessionID; messageID: MessageID }): With
.where(and(eq(MessageTable.id, input.messageID), eq(MessageTable.session_id, input.sessionID)))
.get(),
)
if (!row) throw new NotFoundError({ message: `Message not found: ${input.messageID}` })
if (!row) return yield* new NotFoundError({ message: `Message not found: ${input.messageID}` })
return {
info: info(row),
parts: parts(input.messageID),
}
}
export const getEffect = Effect.fn("MessageV2.getEffect")(function* (input: {
sessionID: SessionID
messageID: MessageID
}) {
return yield* Effect.try({
try: () => get(input),
catch: (error) => error,
}).pipe(Effect.catch((error) => (NotFoundError.isInstance(error) ? Effect.fail(error) : Effect.die(error))))
})
export function filterCompacted(msgs: Iterable<WithParts>) {

View File

@@ -759,14 +759,14 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
const messages: Interface["messages"] = Effect.fn("Session.messages")(function* (input) {
if (input.limit) {
return (yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: input.limit })).items
return (yield* MessageV2.page({ sessionID: input.sessionID, limit: input.limit })).items
}
const size = 50
const result = [] as MessageV2.WithParts[]
let before: string | undefined
while (true) {
const page = yield* MessageV2.pageEffect({ sessionID: input.sessionID, limit: size, before })
const page = yield* MessageV2.page({ sessionID: input.sessionID, limit: size, before })
if (page.items.length === 0) break
for (let i = page.items.length - 1; i >= 0; i--) {
const item = page.items[i]
@@ -817,7 +817,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
const size = 50
let before: string | undefined
while (true) {
const page = yield* MessageV2.pageEffect({ sessionID, limit: size, before })
const page = yield* MessageV2.page({ sessionID, limit: size, before })
if (page.items.length === 0) break
for (let i = page.items.length - 1; i >= 0; i--) {
const item = page.items[i]

View File

@@ -86,7 +86,7 @@ export const TaskTool = Tool.define(
],
}))
const msg = yield* MessageV2.getEffect({ sessionID: ctx.sessionID, messageID: ctx.messageID }).pipe(Effect.orDie)
const msg = yield* MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID }).pipe(Effect.orDie)
if (msg.info.role !== "assistant") return yield* Effect.fail(new Error("Not an assistant message"))
const model = next.model ?? {

View File

@@ -12,20 +12,6 @@ void Log.init({ print: false })
const it = testEffect(SessionNs.defaultLayer)
function expectNotFound(fn: () => unknown, message: string) {
let thrown: unknown
try {
fn()
} catch (error) {
thrown = error
}
expect(thrown).toBeInstanceOf(NotFoundError)
if (thrown instanceof NotFoundError) {
expect(thrown._tag).toBe("NotFoundError")
expect(thrown.message).toBe(message)
}
}
const withSession = <A, E, R>(
fn: (input: { session: SessionNs.Interface; sessionID: SessionID }) => Effect.Effect<A, E, R>,
) =>
@@ -140,12 +126,12 @@ const addCompactionPart = Effect.fn("Test.addCompactionPart")(function* (
})
describe("MessageV2.page", () => {
it.instance("returns sync result", () =>
it.instance("returns page result", () =>
withSession(({ sessionID }) =>
Effect.gen(function* () {
yield* fill(sessionID, 2)
const result = MessageV2.page({ sessionID, limit: 10 })
const result = yield* MessageV2.page({ sessionID, limit: 10 })
expect(result).toBeDefined()
expect(result.items).toBeArray()
}),
@@ -157,18 +143,18 @@ describe("MessageV2.page", () => {
Effect.gen(function* () {
const ids = yield* fill(sessionID, 6)
const a = MessageV2.page({ sessionID, limit: 2 })
const a = yield* MessageV2.page({ sessionID, limit: 2 })
expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2))
expect(a.items.every((item) => item.parts.length === 1)).toBe(true)
expect(a.more).toBe(true)
expect(a.cursor).toBeTruthy()
const b = MessageV2.page({ sessionID, limit: 2, before: a.cursor! })
const b = yield* MessageV2.page({ sessionID, limit: 2, before: a.cursor! })
expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(-4, -2))
expect(b.more).toBe(true)
expect(b.cursor).toBeTruthy()
const c = MessageV2.page({ sessionID, limit: 2, before: b.cursor! })
const c = yield* MessageV2.page({ sessionID, limit: 2, before: b.cursor! })
expect(c.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2))
expect(c.more).toBe(false)
expect(c.cursor).toBeUndefined()
@@ -181,7 +167,7 @@ describe("MessageV2.page", () => {
Effect.gen(function* () {
const ids = yield* fill(sessionID, 4)
const result = MessageV2.page({ sessionID, limit: 4 })
const result = yield* MessageV2.page({ sessionID, limit: 4 })
expect(result.items.map((item) => item.info.id)).toEqual(ids)
}),
),
@@ -190,7 +176,7 @@ describe("MessageV2.page", () => {
it.instance("returns empty items for session with no messages", () =>
withSession(({ sessionID }) =>
Effect.gen(function* () {
const result = MessageV2.page({ sessionID, limit: 10 })
const result = yield* MessageV2.page({ sessionID, limit: 10 })
expect(result.items).toEqual([])
expect(result.more).toBe(false)
expect(result.cursor).toBeUndefined()
@@ -198,17 +184,10 @@ describe("MessageV2.page", () => {
),
)
it.instance("throws NotFoundError for non-existent session", () =>
it.instance("fails with NotFoundError for non-existent session", () =>
Effect.gen(function* () {
const fake = "non-existent-session" as SessionID
expectNotFound(() => MessageV2.page({ sessionID: fake, limit: 10 }), `Session not found: ${fake}`)
}),
)
it.instance("fails pageEffect with NotFoundError for non-existent session", () =>
Effect.gen(function* () {
const fake = "non-existent-session" as SessionID
const error = yield* Effect.flip(MessageV2.pageEffect({ sessionID: fake, limit: 10 }))
const error = yield* Effect.flip(MessageV2.page({ sessionID: fake, limit: 10 }))
expect(error).toBeInstanceOf(NotFoundError)
expect(error.message).toBe(`Session not found: ${fake}`)
}),
@@ -219,7 +198,7 @@ describe("MessageV2.page", () => {
Effect.gen(function* () {
const ids = yield* fill(sessionID, 3)
const result = MessageV2.page({ sessionID, limit: 3 })
const result = yield* MessageV2.page({ sessionID, limit: 3 })
expect(result.items.map((item) => item.info.id)).toEqual(ids)
expect(result.more).toBe(false)
expect(result.cursor).toBeUndefined()
@@ -232,7 +211,7 @@ describe("MessageV2.page", () => {
Effect.gen(function* () {
const ids = yield* fill(sessionID, 5)
const result = MessageV2.page({ sessionID, limit: 1 })
const result = yield* MessageV2.page({ sessionID, limit: 1 })
expect(result.items).toHaveLength(1)
expect(result.items[0].info.id).toBe(ids[ids.length - 1])
expect(result.more).toBe(true)
@@ -253,7 +232,7 @@ describe("MessageV2.page", () => {
text: "extra",
})
const result = MessageV2.page({ sessionID, limit: 10 })
const result = yield* MessageV2.page({ sessionID, limit: 10 })
expect(result.items).toHaveLength(1)
expect(result.items[0].parts).toHaveLength(2)
}),
@@ -265,8 +244,8 @@ describe("MessageV2.page", () => {
Effect.gen(function* () {
const ids = yield* fill(sessionID, 4, (i: number) => 1000.5 + i)
const a = MessageV2.page({ sessionID, limit: 2 })
const b = MessageV2.page({ sessionID, limit: 2, before: a.cursor! })
const a = yield* MessageV2.page({ sessionID, limit: 2 })
const b = yield* MessageV2.page({ sessionID, limit: 2, before: a.cursor! })
expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2))
expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2))
@@ -279,11 +258,11 @@ describe("MessageV2.page", () => {
Effect.gen(function* () {
const ids = yield* fill(sessionID, 4, () => 1000)
const a = MessageV2.page({ sessionID, limit: 2 })
const a = yield* MessageV2.page({ sessionID, limit: 2 })
expect(a.items.map((item) => item.info.id)).toEqual(ids.slice(-2))
expect(a.more).toBe(true)
const b = MessageV2.page({ sessionID, limit: 2, before: a.cursor! })
const b = yield* MessageV2.page({ sessionID, limit: 2, before: a.cursor! })
expect(b.items.map((item) => item.info.id)).toEqual(ids.slice(0, 2))
expect(b.more).toBe(false)
}),
@@ -298,8 +277,8 @@ describe("MessageV2.page", () => {
yield* fill(a.id, 3)
yield* fill(b.id, 2)
const resultA = MessageV2.page({ sessionID: a.id, limit: 10 })
const resultB = MessageV2.page({ sessionID: b.id, limit: 10 })
const resultA = yield* MessageV2.page({ sessionID: a.id, limit: 10 })
const resultB = yield* MessageV2.page({ sessionID: b.id, limit: 10 })
expect(resultA.items).toHaveLength(3)
expect(resultB.items).toHaveLength(2)
expect(resultA.items.every((item) => item.info.sessionID === a.id)).toBe(true)
@@ -315,7 +294,7 @@ describe("MessageV2.page", () => {
Effect.gen(function* () {
const ids = yield* fill(sessionID, 10)
const result = MessageV2.page({ sessionID, limit: 100 })
const result = yield* MessageV2.page({ sessionID, limit: 100 })
expect(result.items).toHaveLength(10)
expect(result.items.map((item) => item.info.id)).toEqual(ids)
expect(result.more).toBe(false)
@@ -482,7 +461,7 @@ describe("MessageV2.get", () => {
Effect.gen(function* () {
const [id] = yield* fill(sessionID, 1)
const result = MessageV2.get({ sessionID, messageID: id })
const result = yield* MessageV2.get({ sessionID, messageID: id })
expect(result.info.id).toBe(id)
expect(result.info.sessionID).toBe(sessionID)
expect(result.info.role).toBe("user")
@@ -492,20 +471,11 @@ describe("MessageV2.get", () => {
),
)
it.instance("throws NotFoundError for non-existent message", () =>
it.instance("fails with NotFoundError for non-existent message", () =>
withSession(({ sessionID }) =>
Effect.gen(function* () {
const messageID = MessageID.ascending()
expectNotFound(() => MessageV2.get({ sessionID, messageID }), `Message not found: ${messageID}`)
}),
),
)
it.instance("fails getEffect with NotFoundError for non-existent message", () =>
withSession(({ sessionID }) =>
Effect.gen(function* () {
const messageID = MessageID.ascending()
const error = yield* Effect.flip(MessageV2.getEffect({ sessionID, messageID }))
const error = yield* Effect.flip(MessageV2.get({ sessionID, messageID }))
expect(error).toBeInstanceOf(NotFoundError)
expect(error.message).toBe(`Message not found: ${messageID}`)
}),
@@ -519,8 +489,10 @@ describe("MessageV2.get", () => {
const b = yield* session.create({})
const [id] = yield* fill(a.id, 1)
expectNotFound(() => MessageV2.get({ sessionID: b.id, messageID: id }), `Message not found: ${id}`)
const result = MessageV2.get({ sessionID: a.id, messageID: id })
const error = yield* Effect.flip(MessageV2.get({ sessionID: b.id, messageID: id }))
expect(error).toBeInstanceOf(NotFoundError)
expect(error.message).toBe(`Message not found: ${id}`)
const result = yield* MessageV2.get({ sessionID: a.id, messageID: id })
expect(result.info.id).toBe(id)
yield* session.remove(a.id)
@@ -541,7 +513,7 @@ describe("MessageV2.get", () => {
text: "extra",
})
const result = MessageV2.get({ sessionID, messageID: id })
const result = yield* MessageV2.get({ sessionID, messageID: id })
expect(result.parts).toHaveLength(2)
}),
),
@@ -561,7 +533,7 @@ describe("MessageV2.get", () => {
text: "response",
})
const result = MessageV2.get({ sessionID, messageID: aid })
const result = yield* MessageV2.get({ sessionID, messageID: aid })
expect(result.info.role).toBe("assistant")
expect(result.parts).toHaveLength(1)
expect((result.parts[0] as MessageV2.TextPart).text).toBe("response")
@@ -574,7 +546,7 @@ describe("MessageV2.get", () => {
Effect.gen(function* () {
const id = yield* addUser(sessionID)
const result = MessageV2.get({ sessionID, messageID: id })
const result = yield* MessageV2.get({ sessionID, messageID: id })
expect(result.info.id).toBe(id)
expect(result.parts).toEqual([])
}),
@@ -1026,9 +998,9 @@ describe("MessageV2 consistency", () => {
Effect.gen(function* () {
yield* fill(sessionID, 3)
const paged = MessageV2.page({ sessionID, limit: 10 })
const paged = yield* MessageV2.page({ sessionID, limit: 10 })
for (const item of paged.items) {
const got = MessageV2.get({ sessionID, messageID: item.info.id as MessageID })
const got = yield* MessageV2.get({ sessionID, messageID: item.info.id as MessageID })
expect(got.info).toEqual(item.info)
expect(got.parts).toEqual(item.parts)
}
@@ -1041,7 +1013,7 @@ describe("MessageV2 consistency", () => {
Effect.gen(function* () {
const [id] = yield* fill(sessionID, 1)
const got = MessageV2.get({ sessionID, messageID: id })
const got = yield* MessageV2.get({ sessionID, messageID: id })
const standalone = MessageV2.parts(id)
expect(got.parts).toEqual(standalone)
}),
@@ -1058,7 +1030,7 @@ describe("MessageV2 consistency", () => {
const paged = [] as MessageV2.WithParts[]
let cursor: string | undefined
while (true) {
const result = MessageV2.page({ sessionID, limit: 3, before: cursor })
const result = yield* MessageV2.page({ sessionID, limit: 3, before: cursor })
for (let i = result.items.length - 1; i >= 0; i--) {
paged.push(result.items[i])
}

View File

@@ -767,7 +767,7 @@ it.live("session.processor effect tests record aborted errors and idle state", (
const exit = yield* Fiber.await(run)
yield* Effect.promise(() => seen.promise)
const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
const stored = yield* MessageV2.get({ sessionID: chat.id, messageID: msg.id })
const state = yield* sts.get(chat.id)
off()
@@ -829,7 +829,7 @@ it.live("session.processor effect tests mark interruptions aborted without manua
yield* Fiber.interrupt(run)
const exit = yield* Fiber.await(run)
const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
const stored = yield* MessageV2.get({ sessionID: chat.id, messageID: msg.id })
const state = yield* sts.get(chat.id)
expect(Exit.isFailure(exit)).toBe(true)

View File

@@ -1787,7 +1787,7 @@ it.instance(
if (msg.info.role !== "user") throw new Error("expected user message")
const stored = MessageV2.get({
const stored = yield* MessageV2.get({
sessionID: session.id,
messageID: msg.info.id,
})
@@ -1875,7 +1875,7 @@ it.instance(
parts: yield* prompt.resolvePromptParts("Use @docs for context"),
})
const stored = MessageV2.get({ sessionID: session.id, messageID: message.info.id })
const stored = yield* MessageV2.get({ sessionID: session.id, messageID: message.info.id })
const synthetic = stored.parts.filter(
(part): part is MessageV2.TextPart => part.type === "text" && part.synthetic === true,
)
@@ -1931,7 +1931,7 @@ it.instance(
],
})
const stored = MessageV2.get({ sessionID: session.id, messageID: message.info.id })
const stored = yield* MessageV2.get({ sessionID: session.id, messageID: message.info.id })
const synthetic = stored.parts.filter(
(part): part is MessageV2.TextPart => part.type === "text" && part.synthetic === true,
)
@@ -1991,7 +1991,7 @@ it.instance(
parts,
noReply: true,
})
const stored = MessageV2.get({ sessionID: session.id, messageID: message.info.id })
const stored = yield* MessageV2.get({ sessionID: session.id, messageID: message.info.id })
const textParts = stored.parts.filter((part) => part.type === "text")
const hasContent = textParts.some((part) => part.text.includes("special content"))
expect(hasContent).toBe(true)