fix(session): add typed message lookup wrappers (#27269)

This commit is contained in:
Shoubhit Dash
2026-05-13 13:04:07 +05:30
committed by GitHub
parent e9a29e4908
commit fed043a1ad
3 changed files with 49 additions and 10 deletions

View File

@@ -14,7 +14,6 @@ import { SessionStatus } from "@/session/status"
import { SessionSummary } from "@/session/summary"
import { Todo } from "@/session/todo"
import { MessageID, PartID, SessionID } from "@/session/schema"
import { NotFoundError } from "@/storage/storage"
import { NamedError } from "@opencode-ai/core/util/error"
import { Cause, Effect, Option, Schema, Scope } from "effect"
import * as Stream from "effect/Stream"
@@ -105,11 +104,13 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session",
return yield* session.messages({ sessionID: ctx.params.sessionID })
}
const page = MessageV2.page({
sessionID: ctx.params.sessionID,
limit: ctx.query.limit,
before: ctx.query.before,
})
const page = yield* SessionError.mapStorageNotFound(
MessageV2.pageEffect({
sessionID: ctx.params.sessionID,
limit: ctx.query.limit,
before: ctx.query.before,
}),
)
if (!page.cursor) return page.items
const request = yield* HttpServerRequest.HttpServerRequest
@@ -131,10 +132,7 @@ export const sessionHandlers = HttpApiBuilder.group(InstanceHttpApi, "session",
params: { sessionID: SessionID; messageID: MessageID }
}) {
return yield* SessionError.mapStorageNotFound(
Effect.try({
try: () => MessageV2.get({ sessionID: ctx.params.sessionID, messageID: ctx.params.messageID }),
catch: (error) => error,
}).pipe(Effect.catch((error) => (NotFoundError.isInstance(error) ? Effect.fail(error) : Effect.die(error)))),
MessageV2.getEffect({ sessionID: ctx.params.sessionID, messageID: ctx.params.messageID }),
)
})

View File

@@ -956,6 +956,17 @@ export function page(input: { sessionID: SessionID; limit: number; before?: stri
}
}
export const pageEffect = Effect.fn("MessageV2.pageEffect")(function* (input: {
sessionID: SessionID
limit: number
before?: string
}) {
return yield* Effect.try({
try: () => page(input),
catch: (error) => error,
}).pipe(Effect.catch((error) => (NotFoundError.isInstance(error) ? Effect.fail(error) : Effect.die(error))))
})
export function* stream(sessionID: SessionID) {
const size = 50
let before: string | undefined
@@ -1000,6 +1011,16 @@ export function get(input: { sessionID: SessionID; messageID: MessageID }): With
}
}
export const getEffect = Effect.fn("MessageV2.getEffect")(function* (input: {
sessionID: SessionID
messageID: MessageID
}) {
return yield* Effect.try({
try: () => get(input),
catch: (error) => error,
}).pipe(Effect.catch((error) => (NotFoundError.isInstance(error) ? Effect.fail(error) : Effect.die(error))))
})
export function filterCompacted(msgs: Iterable<WithParts>) {
const result = [] as WithParts[]
const completed = new Set<string>()

View File

@@ -205,6 +205,15 @@ describe("MessageV2.page", () => {
}),
)
it.instance("fails pageEffect with NotFoundError for non-existent session", () =>
Effect.gen(function* () {
const fake = "non-existent-session" as SessionID
const error = yield* Effect.flip(MessageV2.pageEffect({ sessionID: fake, limit: 10 }))
expect(error).toBeInstanceOf(NotFoundError)
expect(error.message).toBe(`Session not found: ${fake}`)
}),
)
it.instance("handles exact limit boundary", () =>
withSession(({ sessionID }) =>
Effect.gen(function* () {
@@ -492,6 +501,17 @@ describe("MessageV2.get", () => {
),
)
it.instance("fails getEffect with NotFoundError for non-existent message", () =>
withSession(({ sessionID }) =>
Effect.gen(function* () {
const messageID = MessageID.ascending()
const error = yield* Effect.flip(MessageV2.getEffect({ sessionID, messageID }))
expect(error).toBeInstanceOf(NotFoundError)
expect(error.message).toBe(`Message not found: ${messageID}`)
}),
),
)
it.instance("scopes by session id", () =>
Effect.gen(function* () {
const session = yield* SessionNs.Service