feat(v2): add session message model conversion

This commit is contained in:
Dax Raad
2026-05-03 14:01:24 -04:00
parent c06af70ab0
commit a8b02aec38
8 changed files with 389 additions and 22 deletions

View File

@@ -143,6 +143,15 @@ export const { use: useSyncV2, provider: SyncProviderV2 } = createSimpleContext(
currentAssistant.snapshot = { ...currentAssistant.snapshot, end: event.properties.snapshot }
})
break
case "session.next.step.failed":
update(event.properties.sessionID, (draft) => {
const currentAssistant = activeAssistant(draft)
if (!currentAssistant) return
currentAssistant.time.completed = event.properties.timestamp
currentAssistant.finish = "error"
currentAssistant.error = event.properties.error
})
break
case "session.next.text.started":
update(event.properties.sessionID, (draft) => {
activeAssistant(draft)?.content.push({ type: "text", text: "" })
@@ -210,7 +219,7 @@ export const { use: useSyncV2, provider: SyncProviderV2 } = createSimpleContext(
match.time.completed = event.properties.timestamp
})
break
case "session.next.tool.error":
case "session.next.tool.failed":
update(event.properties.sessionID, (draft) => {
const match = latestTool(activeAssistant(draft), event.properties.callID)
if (match?.state.status !== "running") return

View File

@@ -405,7 +405,7 @@ export const layer: Layer.Layer<
case "tool-error": {
const toolCall = yield* readToolCall(value.toolCallId)
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Tool.Error.Sync, {
EventV2.run(SessionEvent.Tool.Failed.Sync, {
sessionID: ctx.sessionID,
callID: value.toolCallId,
error: {
@@ -650,6 +650,17 @@ export const layer: Layer.Layer<
yield* bus.publish(Session.Event.Error, { sessionID: ctx.sessionID, error })
return
}
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
EventV2.run(SessionEvent.Step.Failed.Sync, {
sessionID: ctx.sessionID,
error: {
type: error.name,
message: errorMessage(e),
},
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
ctx.assistantMessage.error = error
yield* bus.publish(Session.Event.Error, {
sessionID: ctx.assistantMessage.sessionID,

View File

@@ -161,6 +161,9 @@ export default [
SyncEvent.project(SessionEvent.Step.Ended.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.step.ended", data })
}),
SyncEvent.project(SessionEvent.Step.Failed.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.step.failed", data })
}),
SyncEvent.project(SessionEvent.Text.Started.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.text.started", data })
}),
@@ -181,8 +184,8 @@ export default [
SyncEvent.project(SessionEvent.Tool.Success.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.tool.success", data })
}),
SyncEvent.project(SessionEvent.Tool.Error.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.tool.error", data })
SyncEvent.project(SessionEvent.Tool.Failed.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.tool.failed", data })
}),
SyncEvent.project(SessionEvent.Reasoning.Started.Sync, (db, data, event) => {
update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.reasoning.started", data })

View File

@@ -22,6 +22,11 @@ const Base = {
sessionID: SessionID,
}
const Error = Schema.Struct({
type: Schema.String,
message: Schema.String,
})
export const AgentSwitched = EventV2.define({
type: "session.next.agent.switched",
aggregate: "sessionID",
@@ -128,6 +133,16 @@ export namespace Step {
},
})
export type Ended = Schema.Schema.Type<typeof Ended>
export const Failed = EventV2.define({
type: "session.next.step.failed",
aggregate: "sessionID",
schema: {
...Base,
error: Error,
},
})
export type Failed = Schema.Schema.Type<typeof Failed>
}
export namespace Text {
@@ -275,23 +290,20 @@ export namespace Tool {
})
export type Success = Schema.Schema.Type<typeof Success>
export const Error = EventV2.define({
type: "session.next.tool.error",
export const Failed = EventV2.define({
type: "session.next.tool.failed",
aggregate: "sessionID",
schema: {
...Base,
callID: Schema.String,
error: Schema.Struct({
type: Schema.String,
message: Schema.String,
}),
error: Error,
provider: Schema.Struct({
executed: Schema.Boolean,
metadata: Schema.Record(Schema.String, Schema.Unknown).pipe(Schema.optional),
}),
},
})
export type Error = Schema.Schema.Type<typeof Error>
export type Failed = Schema.Schema.Type<typeof Failed>
}
export const RetryError = Schema.Struct({
@@ -359,6 +371,7 @@ export const All = Schema.Union(
Shell.Ended,
Step.Started,
Step.Ended,
Step.Failed,
Text.Started,
Text.Delta,
Text.Ended,
@@ -368,7 +381,7 @@ export const All = Schema.Union(
Tool.Called,
Tool.Progress,
Tool.Success,
Tool.Error,
Tool.Failed,
Reasoning.Started,
Reasoning.Delta,
Reasoning.Ended,

View File

@@ -0,0 +1,285 @@
import { isMedia } from "@/util/media"
import type { Provider } from "@/provider/provider"
import { Effect } from "effect"
import { convertToModelMessages, type ModelMessage, type ProviderMetadata, type UIMessage } from "ai"
import * as EffectLogger from "@opencode-ai/core/effect/logger"
import { SessionMessage } from "./session-message"
import { ID } from "./event"
export const SYNTHETIC_ATTACHMENT_PROMPT = "Attached image(s) from tool result:"
type Attachment = {
mime: string
url: string
filename?: string
}
type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue | undefined }
function truncateToolOutput(text: string, maxChars?: number) {
if (!maxChars || text.length <= maxChars) return text
const omitted = text.length - maxChars
return `${text.slice(0, maxChars)}\n[Tool output truncated for compaction: omitted ${omitted} chars]`
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null
}
function isJsonValue(value: unknown): value is JsonValue {
if (value === null) return true
if (typeof value === "boolean" || typeof value === "number" || typeof value === "string") return true
if (Array.isArray(value)) return value.every(isJsonValue)
if (isRecord(value)) return Object.values(value).every((item) => item === undefined || isJsonValue(item))
return false
}
function outputText(content: SessionMessage.ToolStateCompleted["content"]) {
return content
.filter((item) => item.type === "text")
.map((item) => item.text)
.join("\n")
}
function outputAttachments(content: SessionMessage.ToolStateCompleted["content"]) {
return content
.filter((item) => item.type === "file")
.map((item) => ({
mime: item.mime,
url: item.uri,
filename: item.name,
}))
}
function providerMeta(metadata: Record<string, unknown> | undefined) {
if (!metadata) return undefined
const result: ProviderMetadata = Object.fromEntries(
Object.entries(metadata)
.filter(([key]) => key !== "providerExecuted")
.filter(
(entry): entry is [string, Record<string, JsonValue | undefined>] =>
isRecord(entry[1]) && Object.values(entry[1]).every((item) => item === undefined || isJsonValue(item)),
),
)
return Object.keys(result).length > 0 ? result : undefined
}
function toModelOutput(options: { output: unknown }) {
if (typeof options.output === "string") return { type: "text", value: options.output }
if (!isRecord(options.output)) return { type: "json", value: options.output as never }
const attachments = Array.isArray(options.output.attachments)
? options.output.attachments.filter(
(attachment): attachment is Attachment =>
isRecord(attachment) && typeof attachment.mime === "string" && typeof attachment.url === "string",
)
: []
const text = typeof options.output.text === "string" ? options.output.text : ""
return {
type: "content",
value: [
...(text ? [{ type: "text" as const, text }] : []),
...attachments
.filter((attachment) => attachment.url.startsWith("data:") && attachment.url.includes(","))
.map((attachment) => ({
type: "media" as const,
mediaType: attachment.mime,
data: attachment.url.slice(attachment.url.indexOf(",") + 1),
})),
],
}
}
function supportsMediaInToolResults(model: Provider.Model) {
if (model.api.npm === "@ai-sdk/anthropic") return true
if (model.api.npm === "@ai-sdk/openai") return true
if (model.api.npm === "@ai-sdk/amazon-bedrock") return true
if (model.api.npm === "@ai-sdk/google-vertex/anthropic") return true
if (model.api.npm === "@ai-sdk/google") return model.api.id.toLowerCase().includes("gemini-3")
return false
}
export const toModelMessagesEffect = Effect.fnUntraced(function* (
input: SessionMessage.Message[],
model: Provider.Model,
options?: { stripMedia?: boolean; toolOutputMaxChars?: number },
) {
const result: UIMessage[] = []
const toolNames = new Set<string>()
const supportsMedia = supportsMediaInToolResults(model)
for (const msg of input) {
if (msg.type === "user") {
const parts: UIMessage["parts"] = []
if (msg.text)
parts.push({
type: "text",
text: msg.text,
})
for (const file of msg.files ?? []) {
if (file.mime === "text/plain" || file.mime === "application/x-directory") continue
if (options?.stripMedia && isMedia(file.mime)) {
parts.push({
type: "text",
text: `[Attached ${file.mime}: ${file.name ?? "file"}]`,
})
continue
}
parts.push({
type: "file",
url: file.uri,
mediaType: file.mime,
filename: file.name,
})
}
if (parts.length > 0) result.push({ id: msg.id, role: "user", parts })
}
if (msg.type === "synthetic" && msg.text) {
result.push({
id: msg.id,
role: "user",
parts: [{ type: "text", text: msg.text }],
})
}
if (msg.type === "shell") {
result.push({
id: msg.id,
role: "user",
parts: [
{
type: "text",
text: `The following shell command was executed by the user:\n\n${msg.command}${
msg.output ? `\n\nOutput:\n${msg.output}` : ""
}`,
},
],
})
}
if (msg.type === "compaction" && msg.summary) {
result.push({
id: `${msg.id}-summary`,
role: "assistant",
parts: [{ type: "text", text: msg.summary }],
})
}
if (msg.type === "assistant") {
const differentModel = `${model.providerID}/${model.id}` !== `${msg.model.providerID}/${msg.model.id}`
if (msg.error && !msg.content.some((item) => item.type === "text" || item.type === "tool")) continue
const parts: UIMessage["parts"] = []
const media: Attachment[] = []
for (const content of msg.content) {
if (content.type === "text") {
parts.push({ type: "text", text: content.text })
continue
}
if (content.type === "reasoning") {
if (differentModel) {
if (content.text.trim().length > 0) parts.push({ type: "text", text: content.text })
continue
}
parts.push({ type: "reasoning", text: content.text })
continue
}
toolNames.add(content.name)
if (content.state.status === "completed") {
const output = content.time.pruned
? "[Old tool result content cleared]"
: truncateToolOutput(outputText(content.state.content), options?.toolOutputMaxChars)
const attachments =
content.time.pruned || options?.stripMedia
? []
: [
...outputAttachments(content.state.content),
...(content.state.attachments ?? []).map((attachment) => ({
mime: attachment.mime,
url: attachment.uri,
filename: attachment.name,
})),
]
const mediaAttachments = attachments.filter((attachment) => isMedia(attachment.mime))
if (!supportsMedia && mediaAttachments.length > 0) media.push(...mediaAttachments)
const finalAttachments = supportsMedia
? attachments
: attachments.filter((attachment) => !isMedia(attachment.mime))
parts.push({
type: `tool-${content.name}` as `tool-${string}`,
state: "output-available",
toolCallId: content.id,
input: content.state.input,
output: finalAttachments.length > 0 ? { text: output, attachments: finalAttachments } : output,
...(content.provider?.executed ? { providerExecuted: true } : {}),
...(differentModel ? {} : { callProviderMetadata: providerMeta(content.provider?.metadata) }),
})
continue
}
if (content.state.status === "error") {
parts.push({
type: `tool-${content.name}` as `tool-${string}`,
state: "output-error",
toolCallId: content.id,
input: content.state.input,
errorText: content.state.error.message,
...(content.provider?.executed ? { providerExecuted: true } : {}),
...(differentModel ? {} : { callProviderMetadata: providerMeta(content.provider?.metadata) }),
})
continue
}
parts.push({
type: `tool-${content.name}` as `tool-${string}`,
state: "output-error",
toolCallId: content.id,
input: content.state.status === "pending" ? {} : content.state.input,
errorText: "[Tool execution was interrupted]",
...(content.provider?.executed ? { providerExecuted: true } : {}),
...(differentModel ? {} : { callProviderMetadata: providerMeta(content.provider?.metadata) }),
})
}
if (parts.length > 0) {
result.push({ id: msg.id, role: "assistant", parts })
if (media.length > 0) {
result.push({
id: ID.create(),
role: "user",
parts: [
{ type: "text", text: SYNTHETIC_ATTACHMENT_PROMPT },
...media.map((attachment) => ({
type: "file" as const,
url: attachment.url,
mediaType: attachment.mime,
filename: attachment.filename,
})),
],
})
}
}
}
}
return yield* Effect.promise(() =>
convertToModelMessages(result, {
// @ts-expect-error convertToModelMessages only needs tools[name]?.toModelOutput here.
tools: Object.fromEntries(Array.from(toolNames).map((toolName) => [toolName, { toModelOutput }])),
}),
)
})
export function toModelMessages(
input: SessionMessage.Message[],
model: Provider.Model,
options?: { stripMedia?: boolean; toolOutputMaxChars?: number },
): Promise<ModelMessage[]> {
return Effect.runPromise(toModelMessagesEffect(input, model, options).pipe(Effect.provide(EffectLogger.layer)))
}
export * as SessionMessageModel from "./session-message-model"

View File

@@ -199,6 +199,17 @@ export function update<Result>(adapter: Adapter<Result>, event: SessionEvent.Eve
)
}
},
"session.next.step.failed": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {
draft.time.completed = event.data.timestamp
draft.finish = "error"
draft.error = event.data.error
}),
)
}
},
"session.next.text.started": () => {
if (currentAssistant) {
adapter.updateAssistant(
@@ -314,7 +325,7 @@ export function update<Result>(adapter: Adapter<Result>, event: SessionEvent.Eve
)
}
},
"session.next.tool.error": (event) => {
"session.next.tool.failed": (event) => {
if (currentAssistant) {
adapter.updateAssistant(
produce(currentAssistant, (draft) => {

View File

@@ -152,7 +152,7 @@ export class Assistant extends Schema.Class<Assistant>("Session.Message.Assistan
write: Schema.Finite,
}),
}).pipe(Schema.optional),
error: Schema.String.pipe(Schema.optional),
error: SessionEvent.Step.Failed.fields.data.fields.error.pipe(Schema.optional),
time: Schema.Struct({
created: V2Schema.DateTimeUtcFromMillis,
completed: V2Schema.DateTimeUtcFromMillis.pipe(Schema.optional),

View File

@@ -58,6 +58,7 @@ export type Event =
| EventSessionNextShellEnded
| EventSessionNextStepStarted
| EventSessionNextStepEnded
| EventSessionNextStepFailed
| EventSessionNextTextStarted
| EventSessionNextTextDelta
| EventSessionNextTextEnded
@@ -70,7 +71,7 @@ export type Event =
| EventSessionNextToolCalled
| EventSessionNextToolProgress
| EventSessionNextToolSuccess
| EventSessionNextToolError
| EventSessionNextToolFailed
| EventSessionNextRetried
| EventSessionNextCompactionStarted
| EventSessionNextCompactionDelta
@@ -823,6 +824,7 @@ export type GlobalEvent = {
| EventSessionNextShellEnded
| EventSessionNextStepStarted
| EventSessionNextStepEnded
| EventSessionNextStepFailed
| EventSessionNextTextStarted
| EventSessionNextTextDelta
| EventSessionNextTextEnded
@@ -835,7 +837,7 @@ export type GlobalEvent = {
| EventSessionNextToolCalled
| EventSessionNextToolProgress
| EventSessionNextToolSuccess
| EventSessionNextToolError
| EventSessionNextToolFailed
| EventSessionNextRetried
| EventSessionNextCompactionStarted
| EventSessionNextCompactionDelta
@@ -857,6 +859,7 @@ export type GlobalEvent = {
| SyncEventSessionNextShellEnded
| SyncEventSessionNextStepStarted
| SyncEventSessionNextStepEnded
| SyncEventSessionNextStepFailed
| SyncEventSessionNextTextStarted
| SyncEventSessionNextTextDelta
| SyncEventSessionNextTextEnded
@@ -869,7 +872,7 @@ export type GlobalEvent = {
| SyncEventSessionNextToolCalled
| SyncEventSessionNextToolProgress
| SyncEventSessionNextToolSuccess
| SyncEventSessionNextToolError
| SyncEventSessionNextToolFailed
| SyncEventSessionNextRetried
| SyncEventSessionNextCompactionStarted
| SyncEventSessionNextCompactionDelta
@@ -1973,6 +1976,22 @@ export type SyncEventSessionNextStepEnded = {
}
}
export type SyncEventSessionNextStepFailed = {
type: "sync"
name: "session.next.step.failed.1"
id: string
seq: number
aggregateID: "sessionID"
data: {
timestamp: number
sessionID: string
error: {
type: string
message: string
}
}
}
export type SyncEventSessionNextTextStarted = {
type: "sync"
name: "session.next.text.started.1"
@@ -2157,9 +2176,9 @@ export type SyncEventSessionNextToolSuccess = {
}
}
export type SyncEventSessionNextToolError = {
export type SyncEventSessionNextToolFailed = {
type: "sync"
name: "session.next.tool.error.1"
name: "session.next.tool.failed.1"
id: string
seq: number
aggregateID: "sessionID"
@@ -2710,6 +2729,19 @@ export type EventSessionNextStepEnded = {
}
}
export type EventSessionNextStepFailed = {
id: string
type: "session.next.step.failed"
properties: {
timestamp: number
sessionID: string
error: {
type: string
message: string
}
}
}
export type EventSessionNextTextStarted = {
id: string
type: "session.next.text.started"
@@ -2870,9 +2902,9 @@ export type EventSessionNextToolSuccess = {
}
}
export type EventSessionNextToolError = {
export type EventSessionNextToolFailed = {
id: string
type: "session.next.tool.error"
type: "session.next.tool.failed"
properties: {
timestamp: number
sessionID: string
@@ -3162,7 +3194,10 @@ export type SessionMessageAssistant = {
write: number
}
}
error?: string
error?: {
type: string
message: string
}
}
export type SessionMessageCompaction = {