Emit LLM stream lifecycle events (#26971)

Kit Langton
2026-05-11 21:31:48 -04:00
committed by GitHub
parent e5aa5161f2
commit 8030a6c187
14 changed files with 560 additions and 196 deletions
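
Before this change, providers emitted bare text/reasoning deltas and a single request-finish. Each streaming parser now threads a shared Lifecycle state machine that wraps those deltas in a uniform envelope: step-start, paired block start/end events, step-finish, then request-finish. As a sketch of the contract consumers can now rely on (event shapes taken from the updated test expectations below; usage and metadata fields elided for brevity):

const normalizedSequence = [
  { type: "step-start", index: 0 },
  { type: "text-start", id: "text-0" },
  { type: "text-delta", id: "text-0", text: "Hello" },
  { type: "text-delta", id: "text-0", text: "!" },
  { type: "text-end", id: "text-0" },
  { type: "step-finish", index: 0, reason: "stop" },
  { type: "request-finish", reason: "stop" },
]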

View File

@@ -17,6 +17,7 @@ import {
} from "../schema"
import { JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
import * as Cache from "./utils/cache"
import { Lifecycle } from "./utils/lifecycle"
import { ToolStream } from "./utils/tool-stream"
const ADAPTER = "anthropic-messages"
@@ -190,6 +191,7 @@ type AnthropicEvent = Schema.Schema.Type<typeof AnthropicEvent>
interface ParserState {
readonly tools: ToolStream.State<number>
readonly usage?: Usage
readonly lifecycle: Lifecycle.State
}
const invalid = ProviderShared.invalidRequest
@@ -500,37 +502,45 @@ const onContentBlockStart = (state: ParserState, event: AnthropicEvent): StepRes
if (!block) return [state, NO_EVENTS]
if ((block.type === "tool_use" || block.type === "server_tool_use") && event.index !== undefined) {
const events: LLMEvent[] = []
const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
return [
{
...state,
lifecycle,
tools: ToolStream.start(state.tools, event.index, {
id: block.id ?? String(event.index),
name: block.name ?? "",
providerExecuted: block.type === "server_tool_use",
}),
},
NO_EVENTS,
[...events, LLMEvent.toolInputStart({ id: block.id ?? String(event.index), name: block.name ?? "" })],
]
}
if (block.type === "text" && block.text) {
return [state, [LLMEvent.textDelta({ id: `text-${event.index ?? 0}`, text: block.text })]]
const events: LLMEvent[] = []
return [
{ ...state, lifecycle: Lifecycle.textDelta(state.lifecycle, events, `text-${event.index ?? 0}`, block.text) },
events,
]
}
if (block.type === "thinking" && block.thinking) {
const events: LLMEvent[] = []
return [
state,
[
LLMEvent.reasoningDelta({
id: `reasoning-${event.index ?? 0}`,
text: block.thinking,
}),
],
{
...state,
lifecycle: Lifecycle.reasoningDelta(state.lifecycle, events, `reasoning-${event.index ?? 0}`, block.thinking),
},
events,
]
}
const result = serverToolResultEvent(block)
return [state, result ? [result] : NO_EVENTS]
if (!result) return [state, NO_EVENTS]
const events: LLMEvent[] = []
return [{ ...state, lifecycle: Lifecycle.stepStart(state.lifecycle, events) }, [...events, result]]
}
const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(function* (
@@ -540,25 +550,37 @@ const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(f
const delta = event.delta
if (delta?.type === "text_delta" && delta.text) {
return [state, [LLMEvent.textDelta({ id: `text-${event.index ?? 0}`, text: delta.text })]] satisfies StepResult
const events: LLMEvent[] = []
return [
{ ...state, lifecycle: Lifecycle.textDelta(state.lifecycle, events, `text-${event.index ?? 0}`, delta.text) },
events,
] satisfies StepResult
}
if (delta?.type === "thinking_delta" && delta.thinking) {
const events: LLMEvent[] = []
return [
state,
[LLMEvent.reasoningDelta({ id: `reasoning-${event.index ?? 0}`, text: delta.thinking })],
{
...state,
lifecycle: Lifecycle.reasoningDelta(state.lifecycle, events, `reasoning-${event.index ?? 0}`, delta.thinking),
},
events,
] satisfies StepResult
}
if (delta?.type === "signature_delta" && delta.signature) {
const events: LLMEvent[] = []
return [
state,
[
LLMEvent.reasoningEnd({
id: `reasoning-${event.index ?? 0}`,
providerMetadata: anthropicMetadata({ signature: delta.signature }),
}),
],
{
...state,
lifecycle: Lifecycle.reasoningEnd(
state.lifecycle,
events,
`reasoning-${event.index ?? 0}`,
anthropicMetadata({ signature: delta.signature }),
),
},
events,
] satisfies StepResult
}
@@ -572,7 +594,10 @@ const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(f
"Anthropic Messages tool argument delta is missing its tool call",
)
if (ToolStream.isError(result)) return yield* result
return [{ ...state, tools: result.tools }, result.event ? [result.event] : NO_EVENTS] satisfies StepResult
const events: LLMEvent[] = []
const lifecycle = result.events.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
events.push(...result.events)
return [{ ...state, lifecycle, tools: result.tools }, events] satisfies StepResult
}
return [state, NO_EVENTS] satisfies StepResult
@@ -584,23 +609,30 @@ const onContentBlockStop = Effect.fn("AnthropicMessages.onContentBlockStop")(fun
) {
if (event.index === undefined) return [state, NO_EVENTS] satisfies StepResult
const result = yield* ToolStream.finish(ADAPTER, state.tools, event.index)
return [{ ...state, tools: result.tools }, result.event ? [result.event] : NO_EVENTS] satisfies StepResult
const events: LLMEvent[] = []
const resultEvents = result.events ?? []
const lifecycle = resultEvents.length
? Lifecycle.stepStart(state.lifecycle, events)
: Lifecycle.reasoningEnd(
Lifecycle.textEnd(state.lifecycle, events, `text-${event.index}`),
events,
`reasoning-${event.index}`,
)
events.push(...resultEvents)
return [{ ...state, lifecycle, tools: result.tools }, events] satisfies StepResult
})
const onMessageDelta = (state: ParserState, event: AnthropicEvent): StepResult => {
const usage = mergeUsage(state.usage, mapUsage(event.usage))
return [
{ ...state, usage },
[
LLMEvent.requestFinish({
reason: mapFinishReason(event.delta?.stop_reason),
usage,
providerMetadata: event.delta?.stop_sequence
? anthropicMetadata({ stopSequence: event.delta.stop_sequence })
: undefined,
}),
],
]
const events: LLMEvent[] = []
const lifecycle = Lifecycle.finish(state.lifecycle, events, {
reason: mapFinishReason(event.delta?.stop_reason),
usage,
providerMetadata: event.delta?.stop_sequence
? anthropicMetadata({ stopSequence: event.delta.stop_sequence })
: undefined,
})
return [{ ...state, lifecycle, usage }, events]
}
const onError = (state: ParserState, event: AnthropicEvent): StepResult => [
@@ -634,7 +666,7 @@ export const protocol = Protocol.make({
},
stream: {
event: Protocol.jsonEvent(AnthropicEvent),
initial: () => ({ tools: ToolStream.empty<number>() }),
initial: () => ({ tools: ToolStream.empty<number>(), lifecycle: Lifecycle.initial() }),
step,
},
})

View File

@@ -17,6 +17,7 @@ import { JsonObject, optionalArray, ProviderShared } from "./shared"
import { BedrockAuth, type Credentials as BedrockCredentials } from "./utils/bedrock-auth"
import { BedrockCache } from "./utils/bedrock-cache"
import { BedrockMedia } from "./utils/bedrock-media"
import { Lifecycle } from "./utils/lifecycle"
import { ToolStream } from "./utils/tool-stream"
const ADAPTER = "bedrock-converse"
@@ -420,45 +421,64 @@ interface ParserState {
// `metadata` (carries usage). Hold the terminal event in state so `onHalt`
// can emit exactly one finish after both chunks have had a chance to arrive.
readonly pendingFinish: { readonly reason: FinishReason; readonly usage?: Usage } | undefined
readonly hasToolCalls: boolean
readonly lifecycle: Lifecycle.State
}
const step = (state: ParserState, event: BedrockEvent) =>
Effect.gen(function* () {
if (event.contentBlockStart?.start?.toolUse) {
const index = event.contentBlockStart.contentBlockIndex
const events: LLMEvent[] = []
const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
return [
{
...state,
lifecycle,
tools: ToolStream.start(state.tools, index, {
id: event.contentBlockStart.start.toolUse.toolUseId,
name: event.contentBlockStart.start.toolUse.name,
}),
},
[],
[
...events,
LLMEvent.toolInputStart({
id: event.contentBlockStart.start.toolUse.toolUseId,
name: event.contentBlockStart.start.toolUse.name,
}),
],
] as const
}
if (event.contentBlockDelta?.delta?.text) {
const events: LLMEvent[] = []
return [
state,
[
LLMEvent.textDelta({
id: `text-${event.contentBlockDelta.contentBlockIndex}`,
text: event.contentBlockDelta.delta.text,
}),
],
{
...state,
lifecycle: Lifecycle.textDelta(
state.lifecycle,
events,
`text-${event.contentBlockDelta.contentBlockIndex}`,
event.contentBlockDelta.delta.text,
),
},
events,
] as const
}
if (event.contentBlockDelta?.delta?.reasoningContent?.text) {
const events: LLMEvent[] = []
return [
state,
[
LLMEvent.reasoningDelta({
id: `reasoning-${event.contentBlockDelta.contentBlockIndex}`,
text: event.contentBlockDelta.delta.reasoningContent.text,
}),
],
{
...state,
lifecycle: Lifecycle.reasoningDelta(
state.lifecycle,
events,
`reasoning-${event.contentBlockDelta.contentBlockIndex}`,
event.contentBlockDelta.delta.reasoningContent.text,
),
},
events,
] as const
}
@@ -472,12 +492,33 @@ const step = (state: ParserState, event: BedrockEvent) =>
"Bedrock Converse tool delta is missing its tool call",
)
if (ToolStream.isError(result)) return yield* result
return [{ ...state, tools: result.tools }, result.event ? [result.event] : []] as const
const events: LLMEvent[] = []
const lifecycle = result.events.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
events.push(...result.events)
return [{ ...state, lifecycle, tools: result.tools }, events] as const
}
if (event.contentBlockStop) {
const result = yield* ToolStream.finish(ADAPTER, state.tools, event.contentBlockStop.contentBlockIndex)
return [{ ...state, tools: result.tools }, result.event ? [result.event] : []] as const
const events: LLMEvent[] = []
const resultEvents = result.events ?? []
const lifecycle = resultEvents.length
? Lifecycle.stepStart(state.lifecycle, events)
: Lifecycle.reasoningEnd(
Lifecycle.textEnd(state.lifecycle, events, `text-${event.contentBlockStop.contentBlockIndex}`),
events,
`reasoning-${event.contentBlockStop.contentBlockIndex}`,
)
events.push(...resultEvents)
return [
{
...state,
hasToolCalls: resultEvents.some(LLMEvent.is.toolCall) ? true : state.hasToolCalls,
lifecycle,
tools: result.tools,
},
events,
] as const
}
if (event.messageStop) {
@@ -517,7 +558,15 @@ const framing = BedrockEventStream.framing(ADAPTER)
const onHalt = (state: ParserState): ReadonlyArray<LLMEvent> =>
state.pendingFinish
? [LLMEvent.requestFinish({ reason: state.pendingFinish.reason, usage: state.pendingFinish.usage })]
? (() => {
const events: LLMEvent[] = []
Lifecycle.finish(state.lifecycle, events, {
reason:
state.pendingFinish.reason === "stop" && state.hasToolCalls ? "tool-calls" : state.pendingFinish.reason,
usage: state.pendingFinish.usage,
})
return events
})()
: []
// =============================================================================
@@ -535,7 +584,12 @@ export const protocol = Protocol.make({
},
stream: {
event: BedrockEvent,
initial: () => ({ tools: ToolStream.empty<number>(), pendingFinish: undefined }),
initial: () => ({
tools: ToolStream.empty<number>(),
pendingFinish: undefined,
hasToolCalls: false,
lifecycle: Lifecycle.initial(),
}),
step,
onHalt,
},

View File

@@ -16,6 +16,7 @@ import {
} from "../schema"
import { JsonObject, optionalArray, ProviderShared } from "./shared"
import { GeminiToolSchema } from "./utils/gemini-tool-schema"
import { Lifecycle } from "./utils/lifecycle"
const ADAPTER = "gemini"
export const DEFAULT_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
@@ -134,10 +135,9 @@ interface ParserState {
readonly hasToolCalls: boolean
readonly nextToolCallId: number
readonly usage?: Usage
readonly lifecycle: Lifecycle.State
}
const invalid = ProviderShared.invalidRequest
const mediaData = ProviderShared.mediaBytes
// =============================================================================
@@ -324,7 +324,14 @@ const mapFinishReason = (finishReason: string | undefined, hasToolCalls: boolean
const finish = (state: ParserState): ReadonlyArray<LLMEvent> =>
state.finishReason || state.usage
? [LLMEvent.requestFinish({ reason: mapFinishReason(state.finishReason, state.hasToolCalls), usage: state.usage })]
? (() => {
const events: LLMEvent[] = []
Lifecycle.finish(state.lifecycle, events, {
reason: mapFinishReason(state.finishReason, state.hasToolCalls),
usage: state.usage,
})
return events
})()
: []
const step = (state: ParserState, event: GeminiEvent) => {
@@ -341,21 +348,21 @@ const step = (state: ParserState, event: GeminiEvent) => {
const events: LLMEvent[] = []
let hasToolCalls = nextState.hasToolCalls
let lifecycle = nextState.lifecycle
let nextToolCallId = nextState.nextToolCallId
for (const part of candidate.content.parts) {
if ("text" in part && part.text.length > 0) {
events.push(
part.thought
? LLMEvent.reasoningDelta({ id: "reasoning-0", text: part.text })
: LLMEvent.textDelta({ id: "text-0", text: part.text }),
)
lifecycle = part.thought
? Lifecycle.reasoningDelta(lifecycle, events, "reasoning-0", part.text)
: Lifecycle.textDelta(lifecycle, events, "text-0", part.text)
continue
}
if ("functionCall" in part) {
const input = part.functionCall.args
const id = `tool_${nextToolCallId++}`
lifecycle = Lifecycle.stepStart(lifecycle, events)
events.push(LLMEvent.toolCall({ id, name: part.functionCall.name, input }))
hasToolCalls = true
}
@@ -365,6 +372,7 @@ const step = (state: ParserState, event: GeminiEvent) => {
{
...nextState,
hasToolCalls,
lifecycle,
nextToolCallId,
finishReason: candidate.finishReason ?? nextState.finishReason,
},
@@ -388,7 +396,7 @@ export const protocol = Protocol.make({
},
stream: {
event: Protocol.jsonEvent(GeminiEvent),
initial: () => ({ hasToolCalls: false, nextToolCallId: 0 }),
initial: () => ({ hasToolCalls: false, nextToolCallId: 0, lifecycle: Lifecycle.initial() }),
step,
onHalt: finish,
},

View File

@@ -16,6 +16,7 @@ import {
} from "../schema"
import { isRecord, JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
import { OpenAIOptions } from "./utils/openai-options"
import { Lifecycle } from "./utils/lifecycle"
import { ToolStream } from "./utils/tool-stream"
const ADAPTER = "openai-chat"
@@ -147,6 +148,7 @@ interface ParserState {
readonly toolCallEvents: ReadonlyArray<LLMEvent>
readonly usage?: Usage
readonly finishReason?: FinishReason
readonly lifecycle: Lifecycle.State
}
const invalid = ProviderShared.invalidRequest
@@ -321,7 +323,9 @@ const step = (state: ParserState, event: OpenAIChatEvent) =>
const toolDeltas = delta?.tool_calls ?? []
let tools = state.tools
if (delta?.content) events.push(LLMEvent.textDelta({ id: "text-0", text: delta.content }))
let lifecycle = state.lifecycle
if (delta?.content) lifecycle = Lifecycle.textDelta(lifecycle, events, "text-0", delta.content)
for (const tool of toolDeltas) {
const result = ToolStream.appendOrStart(
@@ -333,7 +337,8 @@ const step = (state: ParserState, event: OpenAIChatEvent) =>
)
if (ToolStream.isError(result)) return yield* result
tools = result.tools
if (result.event) events.push(result.event)
if (result.events.length) lifecycle = Lifecycle.stepStart(lifecycle, events)
events.push(...result.events)
}
// Finalize accumulated tool inputs eagerly when finish_reason arrives so
@@ -349,15 +354,20 @@ const step = (state: ParserState, event: OpenAIChatEvent) =>
toolCallEvents: finished?.events ?? state.toolCallEvents,
usage,
finishReason,
lifecycle,
},
events,
] as const
})
const finishEvents = (state: ParserState): ReadonlyArray<LLMEvent> => {
const events: LLMEvent[] = []
const hasToolCalls = state.toolCallEvents.length > 0
const reason = state.finishReason === "stop" && hasToolCalls ? "tool-calls" : state.finishReason
return [...state.toolCallEvents, ...(reason ? [LLMEvent.requestFinish({ reason, usage: state.usage })] : [])]
const lifecycle = state.toolCallEvents.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
events.push(...state.toolCallEvents)
if (reason) Lifecycle.finish(lifecycle, events, { reason, usage: state.usage })
return events
}
// =============================================================================
@@ -377,7 +387,7 @@ export const protocol = Protocol.make({
},
stream: {
event: Protocol.jsonEvent(OpenAIChatEvent),
initial: () => ({ tools: ToolStream.empty<number>(), toolCallEvents: [] }),
initial: () => ({ tools: ToolStream.empty<number>(), toolCallEvents: [], lifecycle: Lifecycle.initial() }),
step,
onHalt: finishEvents,
},

View File

@@ -17,6 +17,7 @@ import {
} from "../schema"
import { JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
import { OpenAIOptions } from "./utils/openai-options"
import { Lifecycle } from "./utils/lifecycle"
import { ToolStream } from "./utils/tool-stream"
const ADAPTER = "openai-responses"
@@ -165,6 +166,7 @@ type OpenAIResponsesEvent = Schema.Schema.Type<typeof OpenAIResponsesEvent>
interface ParserState {
readonly tools: ToolStream.State<string>
readonly hasFunctionCall: boolean
readonly lifecycle: Lifecycle.State
}
const invalid = ProviderShared.invalidRequest
@@ -385,23 +387,32 @@ const TERMINAL_TYPES = new Set(["response.completed", "response.incomplete", "re
const onOutputTextDelta = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
if (!event.delta) return [state, NO_EVENTS]
return [state, [LLMEvent.textDelta({ id: event.item_id ?? "text-0", text: event.delta })]]
const events: LLMEvent[] = []
return [
{ ...state, lifecycle: Lifecycle.textDelta(state.lifecycle, events, event.item_id ?? "text-0", event.delta) },
events,
]
}
const onOutputItemAdded = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
const item = event.item
if (item?.type !== "function_call" || !item.id) return [state, NO_EVENTS]
const providerMetadata = openaiMetadata({ itemId: item.id })
const events: LLMEvent[] = []
const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
return [
{
...state,
lifecycle,
hasFunctionCall: state.hasFunctionCall,
tools: ToolStream.start(state.tools, item.id, {
id: item.call_id ?? item.id,
name: item.name ?? "",
input: item.arguments ?? "",
providerMetadata: openaiMetadata({ itemId: item.id }),
providerMetadata,
}),
},
NO_EVENTS,
[...events, LLMEvent.toolInputStart({ id: item.call_id ?? item.id, name: item.name ?? "", providerMetadata })],
]
}
@@ -418,10 +429,10 @@ const onFunctionCallArgumentsDelta = Effect.fn("OpenAIResponses.onFunctionCallAr
"OpenAI Responses tool argument delta is missing its tool call",
)
if (ToolStream.isError(result)) return yield* result
return [
{ hasFunctionCall: state.hasFunctionCall, tools: result.tools },
result.event ? [result.event] : NO_EVENTS,
] satisfies StepResult
const events: LLMEvent[] = []
const lifecycle = result.events.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
events.push(...result.events)
return [{ ...state, lifecycle, tools: result.tools }, events] satisfies StepResult
})
const onOutputItemDone = Effect.fn("OpenAIResponses.onOutputItemDone")(function* (
@@ -440,33 +451,46 @@ const onOutputItemDone = Effect.fn("OpenAIResponses.onOutputItemDone")(function*
item.arguments === undefined
? yield* ToolStream.finish(ADAPTER, tools, item.id)
: yield* ToolStream.finishWithInput(ADAPTER, tools, item.id, item.arguments)
const events: LLMEvent[] = []
const resultEvents = result.events ?? []
const lifecycle = resultEvents.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
events.push(...resultEvents)
return [
{ hasFunctionCall: result.event ? true : state.hasFunctionCall, tools: result.tools },
result.event ? [result.event] : NO_EVENTS,
{
...state,
lifecycle,
hasFunctionCall: resultEvents.some(LLMEvent.is.toolCall) ? true : state.hasFunctionCall,
tools: result.tools,
},
events,
] satisfies StepResult
}
if (isHostedToolItem(item)) return [state, hostedToolEvents(item)] satisfies StepResult
if (isHostedToolItem(item)) {
const events: LLMEvent[] = []
const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
events.push(...hostedToolEvents(item))
return [{ ...state, lifecycle }, events] satisfies StepResult
}
return [state, NO_EVENTS] satisfies StepResult
})
const onResponseFinish = (state: ParserState, event: OpenAIResponsesEvent): StepResult => [
state,
[
LLMEvent.requestFinish({
reason: mapFinishReason(event, state.hasFunctionCall),
usage: mapUsage(event.response?.usage),
providerMetadata:
event.response?.id || event.response?.service_tier
? openaiMetadata({
responseId: event.response.id,
serviceTier: event.response.service_tier,
})
: undefined,
}),
],
]
const onResponseFinish = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
const events: LLMEvent[] = []
const lifecycle = Lifecycle.finish(state.lifecycle, events, {
reason: mapFinishReason(event, state.hasFunctionCall),
usage: mapUsage(event.response?.usage),
providerMetadata:
event.response?.id || event.response?.service_tier
? openaiMetadata({
responseId: event.response.id,
serviceTier: event.response.service_tier,
})
: undefined,
})
return [{ ...state, lifecycle }, events]
}
const onResponseFailed = (state: ParserState, event: OpenAIResponsesEvent): StepResult => [
state,
@@ -506,7 +530,7 @@ export const protocol = Protocol.make({
},
stream: {
event: Protocol.jsonEvent(OpenAIResponsesEvent),
initial: () => ({ hasFunctionCall: false, tools: ToolStream.empty<string>() }),
initial: () => ({ hasFunctionCall: false, tools: ToolStream.empty<string>(), lifecycle: Lifecycle.initial() }),
step,
terminal: (event) => TERMINAL_TYPES.has(event.type),
},

View File

@@ -0,0 +1,88 @@
import { LLMEvent, type FinishReason, type ProviderMetadata, type Usage } from "../../schema"
export interface State {
readonly stepStarted: boolean
readonly text: ReadonlySet<string>
readonly reasoning: ReadonlySet<string>
}
export const initial = (): State => ({ stepStarted: false, text: new Set(), reasoning: new Set() })
export const stepStart = (state: State, events: LLMEvent[]): State => {
if (state.stepStarted) return state
events.push(LLMEvent.stepStart({ index: 0 }))
return { ...state, stepStarted: true }
}
export const textDelta = (state: State, events: LLMEvent[], id: string, text: string): State => {
const stepped = stepStart(state, events)
if (stepped.text.has(id)) {
events.push(LLMEvent.textDelta({ id, text }))
return stepped
}
events.push(LLMEvent.textStart({ id }), LLMEvent.textDelta({ id, text }))
return { ...stepped, text: new Set([...stepped.text, id]) }
}
export const reasoningDelta = (state: State, events: LLMEvent[], id: string, text: string): State => {
const stepped = stepStart(state, events)
if (stepped.reasoning.has(id)) {
events.push(LLMEvent.reasoningDelta({ id, text }))
return stepped
}
events.push(LLMEvent.reasoningStart({ id }), LLMEvent.reasoningDelta({ id, text }))
return { ...stepped, reasoning: new Set([...stepped.reasoning, id]) }
}
export const reasoningEnd = (
state: State,
events: LLMEvent[],
id: string,
providerMetadata?: ProviderMetadata,
): State => {
if (!state.reasoning.has(id)) return state
const stepped = stepStart(state, events)
events.push(LLMEvent.reasoningEnd({ id, providerMetadata }))
const reasoning = new Set(stepped.reasoning)
reasoning.delete(id)
return { ...stepped, reasoning }
}
export const textEnd = (state: State, events: LLMEvent[], id: string, providerMetadata?: ProviderMetadata): State => {
if (!state.text.has(id)) return state
const stepped = stepStart(state, events)
events.push(LLMEvent.textEnd({ id, providerMetadata }))
const text = new Set(stepped.text)
text.delete(id)
return { ...stepped, text }
}
const closeOpenBlocks = (state: State, events: LLMEvent[]): State => {
for (const id of state.reasoning) events.push(LLMEvent.reasoningEnd({ id }))
for (const id of state.text) events.push(LLMEvent.textEnd({ id }))
return { ...state, text: new Set(), reasoning: new Set() }
}
export const finish = (
state: State,
events: LLMEvent[],
input: {
readonly reason: FinishReason
readonly usage?: Usage
readonly providerMetadata?: ProviderMetadata
},
): State => {
const stepped = closeOpenBlocks(stepStart(state, events), events)
events.push(
LLMEvent.stepFinish({
index: 0,
reason: input.reason,
usage: input.usage,
providerMetadata: input.providerMetadata,
}),
LLMEvent.requestFinish(input),
)
return { ...stepped, stepStarted: false }
}
export * as Lifecycle from "./lifecycle"
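
Lifecycle is an out-parameter API: each helper may push step-start and block start/end events into the `events` array before the delta itself, and returns the next immutable State. A minimal sketch of threading it through a parser, mirroring the adapters above (the block id is illustrative):

import { LLMEvent } from "../../schema"
import { Lifecycle } from "./lifecycle"

const events: LLMEvent[] = []
let state = Lifecycle.initial()
// First delta for an unseen id also emits step-start and text-start.
state = Lifecycle.textDelta(state, events, "text-0", "Hello")
// Later deltas for a known id emit only the text-delta itself.
state = Lifecycle.textDelta(state, events, "text-0", "!")
// finish closes any still-open blocks, then emits step-finish and request-finish.
Lifecycle.finish(state, events, { reason: "stop" })
// events: step-start, text-start, text-delta, text-delta, text-end,
// step-finish, request-finish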

View File

@@ -1,5 +1,5 @@
import { Effect } from "effect"
import { LLMError, LLMEvent, type ProviderMetadata, type ToolCall, type ToolInputDelta } from "../../schema"
import { LLMError, LLMEvent, type ProviderMetadata, type ToolCall } from "../../schema"
import { eventError, parseToolInput, type ToolAccumulator } from "../shared"
type StreamKey = string | number
@@ -27,13 +27,13 @@ export type State<K extends StreamKey> = Partial<Record<K, PendingTool>>
/**
* Result of adding argument text to one pending tool call. It returns both the
* next `tools` state and the updated `tool` because parsers often need the
* current id/name immediately. `event` is present only when new text arrived;
* metadata-only deltas update identity without emitting `tool-input-delta`.
* current id/name immediately. `events` contains lifecycle and delta events
* produced by the append; metadata-only deltas update identity without output.
*/
export interface AppendOutcome<K extends StreamKey> {
readonly tools: State<K>
readonly tool: PendingTool
readonly event?: ToolInputDelta
readonly events: ReadonlyArray<LLMEvent>
}
/** Create empty accumulator state for one provider stream. */
@@ -49,7 +49,14 @@ const withoutTool = <K extends StreamKey>(tools: State<K>, key: K): State<K> =>
return next
}
const inputDelta = (tool: PendingTool, text: string): ToolInputDelta =>
const inputStart = (tool: PendingTool) =>
LLMEvent.toolInputStart({
id: tool.id,
name: tool.name,
providerMetadata: tool.providerMetadata,
})
const inputDelta = (tool: PendingTool, text: string) =>
LLMEvent.toolInputDelta({
id: tool.id,
name: tool.name,
@@ -76,11 +83,16 @@ const appendTool = <K extends StreamKey>(
key: K,
tool: PendingTool,
text: string,
): AppendOutcome<K> => ({
tools: withTool(tools, key, tool),
tool,
event: text.length === 0 ? undefined : inputDelta(tool, text),
})
): AppendOutcome<K> => {
const events: LLMEvent[] = []
if (!tools[key]) events.push(inputStart(tool))
if (text.length > 0) events.push(inputDelta(tool, text))
return {
tools: withTool(tools, key, tool),
tool,
events,
}
}
export const isError = <K extends StreamKey>(result: AppendOutcome<K> | LLMError): result is LLMError =>
result instanceof LLMError
@@ -121,7 +133,8 @@ export const appendOrStart = <K extends StreamKey>(
providerExecuted: current?.providerExecuted,
providerMetadata: current?.providerMetadata,
}
if (current && delta.text.length === 0 && current.id === id && current.name === name) return { tools, tool: current }
if (current && delta.text.length === 0 && current.id === id && current.name === name)
return { tools, tool: current, events: [] }
return appendTool(tools, key, tool, delta.text)
}
@@ -139,7 +152,7 @@ export const appendExisting = <K extends StreamKey>(
): AppendOutcome<K> | LLMError => {
const current = tools[key]
if (!current) return eventError(route, missingToolMessage)
if (text.length === 0) return { tools, tool: current }
if (text.length === 0) return { tools, tool: current, events: [] }
return appendTool(tools, key, { ...current, input: `${current.input}${text}` }, text)
}
@@ -152,7 +165,13 @@ export const finish = <K extends StreamKey>(route: string, tools: State<K>, key:
Effect.gen(function* () {
const tool = tools[key]
if (!tool) return { tools }
return { tools: withoutTool(tools, key), event: yield* toolCall(route, tool) }
return {
tools: withoutTool(tools, key),
events: [
LLMEvent.toolInputEnd({ id: tool.id, name: tool.name, providerMetadata: tool.providerMetadata }),
yield* toolCall(route, tool),
],
}
})
/**
@@ -164,7 +183,13 @@ export const finishWithInput = <K extends StreamKey>(route: string, tools: State
Effect.gen(function* () {
const tool = tools[key]
if (!tool) return { tools }
return { tools: withoutTool(tools, key), event: yield* toolCall(route, tool, input) }
return {
tools: withoutTool(tools, key),
events: [
LLMEvent.toolInputEnd({ id: tool.id, name: tool.name, providerMetadata: tool.providerMetadata }),
yield* toolCall(route, tool, input),
],
}
})
/**
@@ -179,7 +204,14 @@ export const finishAll = <K extends StreamKey>(route: string, tools: State<K>) =
)
return {
tools: empty<K>(),
events: yield* Effect.forEach(pending, (tool) => toolCall(route, tool)),
events: yield* Effect.forEach(pending, (tool) =>
toolCall(route, tool).pipe(
Effect.map((call) => [
LLMEvent.toolInputEnd({ id: tool.id, name: tool.name, providerMetadata: tool.providerMetadata }),
call,
]),
),
).pipe(Effect.map((events) => events.flat())),
}
})
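
The net API change: AppendOutcome's optional `event` becomes an `events` array, so the first chunk for a new key carries its own tool-input-start, and the finish helpers bracket the final tool-call with tool-input-end. One streamed tool call now looks like this end to end, per the updated ToolStream test below (ids and payloads copied from it):

const observedToolEvents = [
  { type: "tool-input-start", id: "call_1", name: "lookup" },
  { type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' },
  { type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' },
  { type: "tool-input-end", id: "call_1", name: "lookup" },
  { type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } },
]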

View File

@@ -154,8 +154,8 @@ const accumulate = (state: StepState, event: LLMEvent) => {
)
return
}
if (event.type === "request-finish") {
state.finishReason = event.reason
if (event.type === "step-finish" || event.type === "request-finish") {
state.finishReason = event.reason === "stop" && state.toolCalls.length > 0 ? "tool-calls" : event.reason
}
}
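
Since step-finish now carries the same reason payload as request-finish, the accumulator treats either event as terminal and normalizes a provider-reported "stop" to "tool-calls" whenever tool calls were collected. The rule, extracted as an illustrative helper (not the actual implementation):

const normalizeFinishReason = (reason: string, toolCallCount: number): string =>
  reason === "stop" && toolCallCount > 0 ? "tool-calls" : reason

// normalizeFinishReason("stop", 1)   => "tool-calls"
// normalizeFinishReason("length", 0) => "length"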

View File

@@ -146,24 +146,46 @@ describe("Anthropic Messages route", () => {
tools: [{ name: "lookup", description: "Lookup data", inputSchema: { type: "object" } }],
}),
).pipe(Effect.provide(fixedResponse(body)))
const usage = new Usage({
inputTokens: 5,
outputTokens: 1,
nonCachedInputTokens: 5,
cacheReadInputTokens: undefined,
cacheWriteInputTokens: undefined,
totalTokens: 6,
providerMetadata: { anthropic: { input_tokens: 5, output_tokens: 1 } },
})
expect(response.toolCalls).toEqual([
{ type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } },
{
type: "tool-call",
id: "call_1",
name: "lookup",
input: { query: "weather" },
providerExecuted: undefined,
providerMetadata: undefined,
},
])
expect(response.events).toEqual([
{ type: "step-start", index: 0 },
{ type: "tool-input-start", id: "call_1", name: "lookup" },
{ type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' },
{ type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' },
{ type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } },
{ type: "tool-input-end", id: "call_1", name: "lookup", providerMetadata: undefined },
{
type: "tool-call",
id: "call_1",
name: "lookup",
input: { query: "weather" },
providerExecuted: undefined,
providerMetadata: undefined,
},
{ type: "step-finish", index: 0, reason: "tool-calls", usage, providerMetadata: undefined },
{
type: "request-finish",
reason: "tool-calls",
usage: new Usage({
inputTokens: 5,
outputTokens: 1,
nonCachedInputTokens: 5,
totalTokens: 6,
providerMetadata: { anthropic: { input_tokens: 5, output_tokens: 1 } },
}),
providerMetadata: undefined,
usage,
},
])
}),

View File

@@ -204,30 +204,37 @@ describe("Gemini route", () => {
reasoningTokens: 1,
totalTokens: 7,
})
const usage = new Usage({
inputTokens: 5,
outputTokens: 3,
nonCachedInputTokens: 4,
cacheReadInputTokens: 1,
reasoningTokens: 1,
totalTokens: 7,
providerMetadata: {
google: {
promptTokenCount: 5,
candidatesTokenCount: 2,
totalTokenCount: 7,
thoughtsTokenCount: 1,
cachedContentTokenCount: 1,
},
},
})
expect(response.events).toEqual([
{ type: "step-start", index: 0 },
{ type: "reasoning-start", id: "reasoning-0" },
{ type: "reasoning-delta", id: "reasoning-0", text: "thinking" },
{ type: "text-start", id: "text-0" },
{ type: "text-delta", id: "text-0", text: "Hello" },
{ type: "text-delta", id: "text-0", text: "!" },
{ type: "reasoning-end", id: "reasoning-0" },
{ type: "text-end", id: "text-0" },
{ type: "step-finish", index: 0, reason: "stop", usage, providerMetadata: undefined },
{
type: "request-finish",
reason: "stop",
usage: new Usage({
inputTokens: 5,
outputTokens: 3,
nonCachedInputTokens: 4,
cacheReadInputTokens: 1,
reasoningTokens: 1,
totalTokens: 7,
providerMetadata: {
google: {
promptTokenCount: 5,
candidatesTokenCount: 2,
totalTokenCount: 7,
thoughtsTokenCount: 1,
cachedContentTokenCount: 1,
},
},
}),
usage,
},
])
}),
@@ -252,22 +259,41 @@ describe("Gemini route", () => {
tools: [{ name: "lookup", description: "Lookup data", inputSchema: { type: "object" } }],
}),
).pipe(Effect.provide(fixedResponse(body)))
const usage = new Usage({
inputTokens: 5,
outputTokens: 1,
nonCachedInputTokens: 5,
cacheReadInputTokens: undefined,
reasoningTokens: undefined,
totalTokens: 6,
providerMetadata: { google: { promptTokenCount: 5, candidatesTokenCount: 1 } },
})
expect(response.toolCalls).toEqual([
{ type: "tool-call", id: "tool_0", name: "lookup", input: { query: "weather" } },
{
type: "tool-call",
id: "tool_0",
name: "lookup",
input: { query: "weather" },
providerExecuted: undefined,
providerMetadata: undefined,
},
])
expect(response.events).toEqual([
{ type: "tool-call", id: "tool_0", name: "lookup", input: { query: "weather" } },
{ type: "step-start", index: 0 },
{
type: "tool-call",
id: "tool_0",
name: "lookup",
input: { query: "weather" },
providerExecuted: undefined,
providerMetadata: undefined,
},
{ type: "step-finish", index: 0, reason: "tool-calls", usage, providerMetadata: undefined },
{
type: "request-finish",
reason: "tool-calls",
usage: new Usage({
inputTokens: 5,
outputTokens: 1,
nonCachedInputTokens: 5,
totalTokens: 6,
providerMetadata: { google: { promptTokenCount: 5, candidatesTokenCount: 1 } },
}),
usage,
},
])
}),
@@ -318,8 +344,10 @@ describe("Gemini route", () => {
),
)
expect(length.events).toEqual([{ type: "request-finish", reason: "length" }])
expect(filtered.events).toEqual([{ type: "request-finish", reason: "content-filter" }])
expect(length.events.map((event) => event.type)).toEqual(["step-start", "step-finish", "request-finish"])
expect(length.events.at(-1)).toMatchObject({ type: "request-finish", reason: "length" })
expect(filtered.events.map((event) => event.type)).toEqual(["step-start", "step-finish", "request-finish"])
expect(filtered.events.at(-1)).toMatchObject({ type: "request-finish", reason: "content-filter" })
}),
)

View File

@@ -222,31 +222,36 @@ describe("OpenAI Chat route", () => {
}),
)
const response = yield* LLMClient.generate(request).pipe(Effect.provide(fixedResponse(body)))
const usage = new Usage({
inputTokens: 5,
outputTokens: 2,
nonCachedInputTokens: 4,
cacheReadInputTokens: 1,
reasoningTokens: 0,
totalTokens: 7,
providerMetadata: {
openai: {
prompt_tokens: 5,
completion_tokens: 2,
total_tokens: 7,
prompt_tokens_details: { cached_tokens: 1 },
completion_tokens_details: { reasoning_tokens: 0 },
},
},
})
expect(response.text).toBe("Hello!")
expect(response.events).toEqual([
{ type: "step-start", index: 0 },
{ type: "text-start", id: "text-0" },
{ type: "text-delta", id: "text-0", text: "Hello" },
{ type: "text-delta", id: "text-0", text: "!" },
{ type: "text-end", id: "text-0" },
{ type: "step-finish", index: 0, reason: "stop", usage, providerMetadata: undefined },
{
type: "request-finish",
reason: "stop",
usage: new Usage({
inputTokens: 5,
outputTokens: 2,
nonCachedInputTokens: 4,
cacheReadInputTokens: 1,
reasoningTokens: 0,
totalTokens: 7,
providerMetadata: {
openai: {
prompt_tokens: 5,
completion_tokens: 2,
total_tokens: 7,
prompt_tokens_details: { cached_tokens: 1 },
completion_tokens_details: { reasoning_tokens: 0 },
},
},
}),
usage,
},
])
}),
@@ -269,9 +274,20 @@ describe("OpenAI Chat route", () => {
).pipe(Effect.provide(fixedResponse(body)))
expect(response.events).toEqual([
{ type: "step-start", index: 0 },
{ type: "tool-input-start", id: "call_1", name: "lookup", providerMetadata: undefined },
{ type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' },
{ type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' },
{ type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } },
{ type: "tool-input-end", id: "call_1", name: "lookup", providerMetadata: undefined },
{
type: "tool-call",
id: "call_1",
name: "lookup",
input: { query: "weather" },
providerExecuted: undefined,
providerMetadata: undefined,
},
{ type: "step-finish", index: 0, reason: "tool-calls", usage: undefined, providerMetadata: undefined },
{ type: "request-finish", reason: "tool-calls", usage: undefined },
])
}),
@@ -293,6 +309,8 @@ describe("OpenAI Chat route", () => {
).pipe(Effect.provide(fixedResponse(body)))
expect(response.events).toEqual([
{ type: "step-start", index: 0 },
{ type: "tool-input-start", id: "call_1", name: "lookup", providerMetadata: undefined },
{ type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' },
{ type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' },
])
@@ -352,7 +370,7 @@ describe("OpenAI Chat route", () => {
const events = Array.from(
yield* LLMClient.stream(request).pipe(Stream.take(1), Stream.runCollect, Effect.provide(fixedResponse(body))),
)
expect(events.map((event) => event.type)).toEqual(["text-delta"])
expect(events.map((event) => event.type)).toEqual(["step-start"])
}),
)
})

View File

@@ -333,32 +333,43 @@ describe("OpenAI Responses route", () => {
},
)
const response = yield* LLMClient.generate(request).pipe(Effect.provide(fixedResponse(body)))
const usage = new Usage({
inputTokens: 5,
outputTokens: 2,
nonCachedInputTokens: 4,
cacheReadInputTokens: 1,
reasoningTokens: 0,
totalTokens: 7,
providerMetadata: {
openai: {
input_tokens: 5,
output_tokens: 2,
total_tokens: 7,
input_tokens_details: { cached_tokens: 1 },
output_tokens_details: { reasoning_tokens: 0 },
},
},
})
expect(response.text).toBe("Hello!")
expect(response.events).toEqual([
{ type: "step-start", index: 0 },
{ type: "text-start", id: "msg_1" },
{ type: "text-delta", id: "msg_1", text: "Hello" },
{ type: "text-delta", id: "msg_1", text: "!" },
{ type: "text-end", id: "msg_1" },
{
type: "step-finish",
index: 0,
reason: "stop",
providerMetadata: { openai: { responseId: "resp_1", serviceTier: "default" } },
usage,
},
{
type: "request-finish",
reason: "stop",
providerMetadata: { openai: { responseId: "resp_1", serviceTier: "default" } },
usage: new Usage({
inputTokens: 5,
outputTokens: 2,
nonCachedInputTokens: 4,
cacheReadInputTokens: 1,
reasoningTokens: 0,
totalTokens: 7,
providerMetadata: {
openai: {
input_tokens: 5,
output_tokens: 2,
total_tokens: 7,
input_tokens_details: { cached_tokens: 1 },
output_tokens_details: { reasoning_tokens: 0 },
},
},
}),
usage,
},
])
}),
@@ -390,8 +401,24 @@ describe("OpenAI Responses route", () => {
tools: [{ name: "lookup", description: "Lookup data", inputSchema: { type: "object" } }],
}),
).pipe(Effect.provide(fixedResponse(body)))
const usage = new Usage({
inputTokens: 5,
outputTokens: 1,
nonCachedInputTokens: 5,
cacheReadInputTokens: undefined,
reasoningTokens: undefined,
totalTokens: 6,
providerMetadata: { openai: { input_tokens: 5, output_tokens: 1 } },
})
expect(response.events).toEqual([
{ type: "step-start", index: 0 },
{
type: "tool-input-start",
id: "call_1",
name: "lookup",
providerMetadata: { openai: { itemId: "item_1" } },
},
{
type: "tool-input-delta",
id: "call_1",
@@ -404,23 +431,26 @@ describe("OpenAI Responses route", () => {
name: "lookup",
text: ':"weather"}',
},
{
type: "tool-input-end",
id: "call_1",
name: "lookup",
providerMetadata: { openai: { itemId: "item_1" } },
},
{
type: "tool-call",
id: "call_1",
name: "lookup",
input: { query: "weather" },
providerExecuted: undefined,
providerMetadata: { openai: { itemId: "item_1" } },
},
{ type: "step-finish", index: 0, reason: "tool-calls", usage, providerMetadata: undefined },
{
type: "request-finish",
reason: "tool-calls",
usage: new Usage({
inputTokens: 5,
outputTokens: 1,
nonCachedInputTokens: 5,
totalTokens: 6,
providerMetadata: { openai: { input_tokens: 5, output_tokens: 1 } },
}),
providerMetadata: undefined,
usage,
},
])
}),

View File

@@ -313,7 +313,14 @@ describe("LLMClient tools", () => {
),
)
expect(events.map((event) => event.type)).toEqual(["text-delta", "request-finish"])
expect(events.map((event) => event.type)).toEqual([
"step-start",
"text-start",
"text-delta",
"text-end",
"step-finish",
"request-finish",
])
expect(LLMResponse.text({ events })).toBe("Done.")
}),
)

View File

@@ -21,11 +21,17 @@ describe("ToolStream", () => {
if (ToolStream.isError(second)) return yield* second
const finished = yield* ToolStream.finish(ADAPTER, second.tools, 0)
expect(first.event).toEqual({ type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' })
expect(second.event).toEqual({ type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' })
expect(first.events).toEqual([
{ type: "tool-input-start", id: "call_1", name: "lookup" },
{ type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' },
])
expect(second.events).toEqual([{ type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' }])
expect(finished).toEqual({
tools: {},
event: { type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } },
events: [
{ type: "tool-input-end", id: "call_1", name: "lookup" },
{ type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } },
],
})
}),
)
@@ -50,7 +56,10 @@ describe("ToolStream", () => {
expect(finished).toEqual({
tools: {},
event: { type: "tool-call", id: "call_1", name: "lookup", input: { query: "final" } },
events: [
{ type: "tool-input-end", id: "call_1", name: "lookup" },
{ type: "tool-call", id: "call_1", name: "lookup", input: { query: "final" } },
],
})
}),
)
@@ -73,7 +82,9 @@ describe("ToolStream", () => {
expect(finished).toEqual({
tools: {},
events: [
{ type: "tool-input-end", id: "call_1", name: "lookup" },
{ type: "tool-call", id: "call_1", name: "lookup", input: {} },
{ type: "tool-input-end", id: "call_2", name: "web_search" },
{
type: "tool-call",
id: "call_2",