From 7acb326a5a755d16fda95a5ec80be597eddb1004 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Sat, 9 May 2026 22:19:29 -0400 Subject: [PATCH] refactor(session): define explicit LLM stream events --- packages/opencode/src/session/llm.ts | 60 +++++++++- packages/opencode/src/session/processor.ts | 4 +- .../test/session/processor-effect.test.ts | 110 +++++++++++++++++- 3 files changed, 166 insertions(+), 8 deletions(-) diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index c7990d1b35..8f8b093d77 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -2,7 +2,16 @@ import { Provider } from "@/provider/provider" import * as Log from "@opencode-ai/core/util/log" import { Context, Effect, Layer, Record } from "effect" import * as Stream from "effect/Stream" -import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai" +import { + streamText, + wrapLanguageModel, + type LanguageModelUsage, + type ModelMessage, + type ProviderMetadata as AiProviderMetadata, + type Tool, + tool, + jsonSchema, +} from "ai" import { mergeDeep } from "remeda" import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider" import { ProviderTransform } from "@/provider/transform" @@ -27,7 +36,48 @@ import * as OtelTracer from "@effect/opentelemetry/Tracer" const log = Log.create({ service: "llm" }) export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX -type Result = Awaited> +export type ProviderMetadata = AiProviderMetadata +export type Usage = LanguageModelUsage + +export type ToolOutput = { + title: string + metadata: Record + output: string + attachments?: MessageV2.FilePart[] +} + +export type Event = + | { type: "start" } + | { type: "reasoning-start"; id: string; providerMetadata?: ProviderMetadata } + | { type: "reasoning-delta"; id: string; text: string; providerMetadata?: ProviderMetadata } + | { type: "reasoning-end"; id: string; providerMetadata?: ProviderMetadata } + | { type: "tool-input-start"; id: string; toolName: string; providerExecuted?: boolean } + | { type: "tool-input-delta"; id: string; delta?: string } + | { type: "tool-input-end"; id: string } + | { type: "tool-call"; toolCallId: string; toolName: string; input: any; providerMetadata?: ProviderMetadata } + | { type: "tool-result"; toolCallId: string; output: ToolOutput } + | { type: "tool-error"; toolCallId: string; error: unknown } + | { type: "error"; error: unknown } + | { type: "start-step" } + | { + type: "finish-step" + finishReason: string + rawFinishReason?: string + usage: Usage + response?: unknown + providerMetadata?: ProviderMetadata + } + | { type: "text-start"; id?: string; providerMetadata?: ProviderMetadata } + | { type: "text-delta"; id?: string; text: string; providerMetadata?: ProviderMetadata } + | { type: "text-end"; id?: string; providerMetadata?: ProviderMetadata } + | { + type: "finish" + finishReason?: string + rawFinishReason?: string + usage?: Usage + totalUsage?: Usage + providerMetadata?: ProviderMetadata + } // Avoid re-instantiating remeda's deep merge types in this hot LLM path; the runtime behavior is still mergeDeep. const mergeOptions = (target: Record, source: Record | undefined): Record => @@ -52,8 +102,6 @@ export type StreamRequest = StreamInput & { abort: AbortSignal } -export type Event = Result["fullStream"] extends AsyncIterable ? T : never - export interface Interface { readonly stream: (input: StreamInput) => Stream.Stream } @@ -427,7 +475,9 @@ const live: Layer.Layer< const result = yield* run({ ...input, abort: ctrl.signal }) - return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e)))) + return Stream.fromAsyncIterable(result.fullStream as AsyncIterable, (e) => + e instanceof Error ? e : new Error(String(e)), + ) }), ), ) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 6e84db16e2..c4ba1be14d 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -388,7 +388,7 @@ export const layer: Layer.Layer< text: value.output.output, }, ...(value.output.attachments?.map((item: MessageV2.FilePart) => ({ - type: "file", + type: "file" as const, uri: item.url, mime: item.mime, name: item.filename, @@ -578,7 +578,7 @@ export const layer: Layer.Layer< return default: - slog.info("unhandled", { event: value.type, value }) + slog.info("unhandled", { event: (value as { type: string }).type, value }) return } }) diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 226bab9864..a0ca6f91f5 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -1,6 +1,7 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect } from "bun:test" import { Cause, Effect, Exit, Fiber, Layer } from "effect" +import * as Stream from "effect/Stream" import path from "path" import type { Agent } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent" @@ -20,7 +21,7 @@ import { SessionSummary } from "../../src/session/summary" import { Snapshot } from "../../src/snapshot" import * as Log from "@opencode-ai/core/util/log" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" -import { provideTmpdirServer } from "../fixture/fixture" +import { provideTmpdirServer, TestInstance } from "../fixture/fixture" import { testEffect } from "../lib/effect" import { raw, reply, TestLLMServer } from "../lib/llm-server" @@ -173,6 +174,37 @@ const env = Layer.mergeAll( const it = testEffect(env) +function llmEvents() { + const queue: Array = [] + + return { + push(...events: LLM.Event[]) { + queue.push(events) + }, + layer: Layer.succeed( + LLM.Service, + LLM.Service.of({ + stream: () => Stream.make(...(queue.shift() ?? [])), + }), + ), + } +} + +const directLLM = llmEvents() +const directDeps = Layer.mergeAll( + Session.defaultLayer, + Snapshot.defaultLayer, + AgentSvc.defaultLayer, + Permission.defaultLayer, + Plugin.defaultLayer, + Config.defaultLayer, + directLLM.layer, + Provider.defaultLayer, + status, +).pipe(Layer.provideMerge(infra)) +const directEnv = SessionProcessor.layer.pipe(Layer.provide(summary), Layer.provideMerge(directDeps)) +const directIt = testEffect(directEnv) + const boot = Effect.fn("test.boot")(function* () { const processors = yield* SessionProcessor.Service const session = yield* Session.Service @@ -184,6 +216,82 @@ const boot = Effect.fn("test.boot")(function* () { // Tests // --------------------------------------------------------------------------- +directIt.instance( + "session.processor effect tests consume explicit llm events", + Effect.gen(function* () { + const { directory: dir } = yield* TestInstance + const { processors, session, provider } = yield* boot() + + directLLM.push( + { type: "start" }, + { type: "start-step" }, + { type: "reasoning-start", id: "reason-0" }, + { type: "reasoning-delta", id: "reason-0", text: "think" }, + { type: "reasoning-end", id: "reason-0" }, + { type: "text-start", id: "text-0" }, + { type: "text-delta", id: "text-0", text: "hello" }, + { type: "text-end", id: "text-0" }, + { + type: "finish-step", + finishReason: "stop", + usage: { + inputTokens: 1, + outputTokens: 1, + totalTokens: 2, + inputTokenDetails: { + noCacheTokens: undefined, + cacheReadTokens: undefined, + cacheWriteTokens: undefined, + }, + outputTokenDetails: { + textTokens: undefined, + reasoningTokens: undefined, + }, + }, + }, + { type: "finish", finishReason: "stop" }, + ) + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "hi") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "hi" }], + tools: {}, + }) + + const parts = MessageV2.parts(msg.id) + const reasoning = parts.find((part): part is MessageV2.ReasoningPart => part.type === "reasoning") + const text = parts.find((part): part is MessageV2.TextPart => part.type === "text") + const finish = parts.find((part): part is MessageV2.StepFinishPart => part.type === "step-finish") + + expect(value).toBe("continue") + expect(reasoning?.text).toBe("think") + expect(text?.text).toBe("hello") + expect(finish?.reason).toBe("stop") + }), + { git: true, config: providerCfg("http://localhost:1/v1") }, +) + it.live("session.processor effect tests capture llm input cleanly", () => provideTmpdirServer( ({ dir, llm }) =>