refactor(session): define explicit LLM stream events

Kit Langton
2026-05-09 22:19:29 -04:00
parent e1c1193f3e
commit 7acb326a5a
3 changed files with 166 additions and 8 deletions

View File

@@ -2,7 +2,16 @@ import { Provider } from "@/provider/provider"
import * as Log from "@opencode-ai/core/util/log"
import { Context, Effect, Layer, Record } from "effect"
import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
import {
streamText,
wrapLanguageModel,
type LanguageModelUsage,
type ModelMessage,
type ProviderMetadata as AiProviderMetadata,
type Tool,
tool,
jsonSchema,
} from "ai"
import { mergeDeep } from "remeda"
import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider"
import { ProviderTransform } from "@/provider/transform"
@@ -27,7 +36,48 @@ import * as OtelTracer from "@effect/opentelemetry/Tracer"
const log = Log.create({ service: "llm" })
export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX
type Result = Awaited<ReturnType<typeof streamText>>
export type ProviderMetadata = AiProviderMetadata
export type Usage = LanguageModelUsage
export type ToolOutput = {
title: string
metadata: Record<string, any>
output: string
attachments?: MessageV2.FilePart[]
}
export type Event =
| { type: "start" }
| { type: "reasoning-start"; id: string; providerMetadata?: ProviderMetadata }
| { type: "reasoning-delta"; id: string; text: string; providerMetadata?: ProviderMetadata }
| { type: "reasoning-end"; id: string; providerMetadata?: ProviderMetadata }
| { type: "tool-input-start"; id: string; toolName: string; providerExecuted?: boolean }
| { type: "tool-input-delta"; id: string; delta?: string }
| { type: "tool-input-end"; id: string }
| { type: "tool-call"; toolCallId: string; toolName: string; input: any; providerMetadata?: ProviderMetadata }
| { type: "tool-result"; toolCallId: string; output: ToolOutput }
| { type: "tool-error"; toolCallId: string; error: unknown }
| { type: "error"; error: unknown }
| { type: "start-step" }
| {
type: "finish-step"
finishReason: string
rawFinishReason?: string
usage: Usage
response?: unknown
providerMetadata?: ProviderMetadata
}
| { type: "text-start"; id?: string; providerMetadata?: ProviderMetadata }
| { type: "text-delta"; id?: string; text: string; providerMetadata?: ProviderMetadata }
| { type: "text-end"; id?: string; providerMetadata?: ProviderMetadata }
| {
type: "finish"
finishReason?: string
rawFinishReason?: string
usage?: Usage
totalUsage?: Usage
providerMetadata?: ProviderMetadata
}
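
Since Event is now a hand-written discriminated union on type, consumers can narrow it with a plain switch instead of leaning on AI SDK type inference. A minimal sketch, assuming only the variants defined above (the helper name is invented):

// Sketch: fold the streamed text deltas out of a batch of events.
const collectText = (events: Event[]): string => {
  let text = ""
  for (const event of events) {
    switch (event.type) {
      case "text-delta":
        text += event.text // narrowed: text is a string here
        break
      case "error":
        throw event.error instanceof Error ? event.error : new Error(String(event.error))
      default:
        break
    }
  }
  return text
}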
// Avoid re-instantiating remeda's deep merge types in this hot LLM path; the runtime behavior is still mergeDeep.
const mergeOptions = (target: Record<string, any>, source: Record<string, any> | undefined): Record<string, any> =>
@@ -52,8 +102,6 @@ export type StreamRequest = StreamInput & {
abort: AbortSignal
}
export type Event = Result["fullStream"] extends AsyncIterable<infer T> ? T : never
export interface Interface {
readonly stream: (input: StreamInput) => Stream.Stream<Event, unknown>
}
@@ -427,7 +475,9 @@ const live: Layer.Layer<
const result = yield* run({ ...input, abort: ctrl.signal })
return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e))))
return Stream.fromAsyncIterable(result.fullStream as AsyncIterable<Event>, (e) =>
e instanceof Error ? e : new Error(String(e)),
)
}),
),
)
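
The added cast is the one place where the AI SDK's runtime stream is asserted to match the hand-written union; Stream.fromAsyncIterable itself only needs an iterable and an error mapper. The same pattern in isolation (a sketch, not code from this commit):

import * as Stream from "effect/Stream"

// Sketch: lift any AsyncIterable into an effect Stream, normalizing
// thrown values to Error the same way the diff above does for fullStream.
const fromIterable = <A>(iterable: AsyncIterable<A>): Stream.Stream<A, Error> =>
  Stream.fromAsyncIterable(iterable, (e) => (e instanceof Error ? e : new Error(String(e))))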

View File

@@ -388,7 +388,7 @@ export const layer: Layer.Layer<
text: value.output.output,
},
...(value.output.attachments?.map((item: MessageV2.FilePart) => ({
type: "file",
type: "file" as const,
uri: item.url,
mime: item.mime,
name: item.filename,
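
The as const matters because the mapped object literal is not contextually typed here, so type: "file" widens to string and no longer matches a union member expecting the literal "file". The widening in isolation (toy types, not this codebase):

// Sketch: without as const the discriminant widens to string.
type FilePartLike = { type: "file"; uri: string }

const widened = { type: "file", uri: "a.txt" } // inferred as { type: string; uri: string }
const literal = { type: "file" as const, uri: "a.txt" } // inferred as { type: "file"; uri: string }

const ok: FilePartLike = literal
// const bad: FilePartLike = widened // error: string is not assignable to "file"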
@@ -578,7 +578,7 @@ export const layer: Layer.Layer<
return
default:
slog.info("unhandled", { event: value.type, value })
slog.info("unhandled", { event: (value as { type: string }).type, value })
return
}
})
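
The (value as { type: string }).type cast is needed because once every known variant is handled, value narrows to never in the default branch and .type stops typechecking. If logging unhandled variants were not wanted, the usual alternative is a never-based exhaustiveness check (sketch with toy types; not what this commit does):

// Sketch: compile-time exhaustiveness instead of a runtime "unhandled" log.
type Shape = { type: "circle"; r: number } | { type: "square"; side: number }

const area = (shape: Shape): number => {
  switch (shape.type) {
    case "circle":
      return Math.PI * shape.r ** 2
    case "square":
      return shape.side ** 2
    default: {
      // shape is never here; adding a Shape variant makes this a type error.
      const unreachable: never = shape
      return unreachable
    }
  }
}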

View File

@@ -1,6 +1,7 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import * as Stream from "effect/Stream"
import path from "path"
import type { Agent } from "../../src/agent/agent"
import { Agent as AgentSvc } from "../../src/agent/agent"
@@ -20,7 +21,7 @@ import { SessionSummary } from "../../src/session/summary"
import { Snapshot } from "../../src/snapshot"
import * as Log from "@opencode-ai/core/util/log"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { provideTmpdirServer } from "../fixture/fixture"
import { provideTmpdirServer, TestInstance } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { raw, reply, TestLLMServer } from "../lib/llm-server"
@@ -173,6 +174,37 @@ const env = Layer.mergeAll(
const it = testEffect(env)
function llmEvents() {
const queue: Array<LLM.Event[]> = []
return {
push(...events: LLM.Event[]) {
queue.push(events)
},
layer: Layer.succeed(
LLM.Service,
LLM.Service.of({
stream: () => Stream.make(...(queue.shift() ?? [])),
}),
),
}
}
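
Because stream does queue.shift() on every call, each push(...) scripts exactly one LLM turn, consumed in order; an unexpected extra turn just sees an empty stream. A hypothetical two-turn script (event payloads are illustrative):

// Sketch: the first stream() call replays the first batch, the second
// call the second batch. Not taken from the tests below.
const llm = llmEvents()
llm.push({ type: "start" }, { type: "text-delta", id: "t0", text: "first" }, { type: "finish", finishReason: "stop" })
llm.push({ type: "start" }, { type: "text-delta", id: "t0", text: "second" }, { type: "finish", finishReason: "stop" })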
const directLLM = llmEvents()
const directDeps = Layer.mergeAll(
Session.defaultLayer,
Snapshot.defaultLayer,
AgentSvc.defaultLayer,
Permission.defaultLayer,
Plugin.defaultLayer,
Config.defaultLayer,
directLLM.layer,
Provider.defaultLayer,
status,
).pipe(Layer.provideMerge(infra))
const directEnv = SessionProcessor.layer.pipe(Layer.provide(summary), Layer.provideMerge(directDeps))
const directIt = testEffect(directEnv)
const boot = Effect.fn("test.boot")(function* () {
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
@@ -184,6 +216,82 @@ const boot = Effect.fn("test.boot")(function* () {
// Tests
// ---------------------------------------------------------------------------
directIt.instance(
"session.processor effect tests consume explicit llm events",
Effect.gen(function* () {
const { directory: dir } = yield* TestInstance
const { processors, session, provider } = yield* boot()
directLLM.push(
{ type: "start" },
{ type: "start-step" },
{ type: "reasoning-start", id: "reason-0" },
{ type: "reasoning-delta", id: "reason-0", text: "think" },
{ type: "reasoning-end", id: "reason-0" },
{ type: "text-start", id: "text-0" },
{ type: "text-delta", id: "text-0", text: "hello" },
{ type: "text-end", id: "text-0" },
{
type: "finish-step",
finishReason: "stop",
usage: {
inputTokens: 1,
outputTokens: 1,
totalTokens: 2,
inputTokenDetails: {
noCacheTokens: undefined,
cacheReadTokens: undefined,
cacheWriteTokens: undefined,
},
outputTokenDetails: {
textTokens: undefined,
reasoningTokens: undefined,
},
},
},
{ type: "finish", finishReason: "stop" },
)
const chat = yield* session.create({})
const parent = yield* user(chat.id, "hi")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
})
const value = yield* handle.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
messages: [{ role: "user", content: "hi" }],
tools: {},
})
const parts = MessageV2.parts(msg.id)
const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning")
const text = parts.find((part): part is MessageV2.TextPart => part.type === "text")
const finish = parts.find((part): part is MessageV2.StepFinishPart => part.type === "step-finish")
expect(value).toBe("continue")
expect(reasoning?.text).toBe("think")
expect(text?.text).toBe("hello")
expect(finish?.reason).toBe("stop")
}),
{ git: true, config: providerCfg("http://localhost:1/v1") },
)
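
The same helper can script tool traffic, since the union pairs a tool-call with a tool-result carrying the ToolOutput shape from the first file. A hypothetical sequence, with the tool name, IDs, and payloads invented for illustration (finish-step omitted for brevity):

// Sketch only: a scripted tool round-trip over the explicit Event union.
directLLM.push(
  { type: "start" },
  { type: "start-step" },
  { type: "tool-input-start", id: "call-0", toolName: "read" },
  { type: "tool-input-delta", id: "call-0", delta: '{"path":"README.md"}' },
  { type: "tool-input-end", id: "call-0" },
  { type: "tool-call", toolCallId: "call-0", toolName: "read", input: { path: "README.md" } },
  { type: "tool-result", toolCallId: "call-0", output: { title: "README.md", metadata: {}, output: "# hello" } },
  { type: "finish", finishReason: "stop" },
)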
it.live("session.processor effect tests capture llm input cleanly", () =>
provideTmpdirServer(
({ dir, llm }) =>