fix(session): rely on native LLM lifecycle events

This commit is contained in:
Kit Langton
2026-05-11 21:33:52 -04:00
parent 4c75b463e7
commit a6e8ec4b35

View File

@@ -293,110 +293,12 @@ export const layer: Layer.Layer<
return {
title: value.name,
metadata: value.result.type === "json" && isRecord(value.result.value) ? value.result.value : {},
output:
typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""),
output: typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""),
}
}
const toolInput = (value: unknown): Record<string, any> => (isRecord(value) ? value : { value })
const finishStep = Effect.fn("SessionProcessor.finishStep")(function* (
value: Extract<StreamEvent, { type: "step-finish" | "request-finish" }>,
) {
const completedSnapshot = yield* snapshot.track()
yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning)
const usage = Session.getUsage({
model: ctx.model,
usage: value.usage ?? new Usage({}),
metadata: value.providerMetadata,
})
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
yield* sync.run(SessionEvent.Step.Ended.Sync, {
sessionID: ctx.sessionID,
finish: value.reason,
cost: usage.cost,
tokens: usage.tokens,
snapshot: completedSnapshot,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
}
ctx.assistantMessage.finish = value.reason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
yield* session.updatePart({
id: PartID.ascending(),
reason: value.reason,
snapshot: completedSnapshot,
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
yield* session.updateMessage(ctx.assistantMessage)
if (ctx.snapshot) {
const patch = yield* snapshot.patch(ctx.snapshot)
if (patch.files.length) {
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
ctx.snapshot = undefined
}
yield* summary
.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
})
.pipe(Effect.ignore, Effect.forkIn(scope))
if (
!ctx.assistantMessage.summary &&
isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
) {
ctx.needsCompaction = true
}
})
const finishText = Effect.fn("SessionProcessor.finishText")(function* (
providerMetadata?: Extract<StreamEvent, { type: "text-end" }>["providerMetadata"],
) {
if (!ctx.currentText) return
// oxlint-disable-next-line no-self-assign -- reactivity trigger
ctx.currentText.text = ctx.currentText.text
ctx.currentText.text = (yield* plugin.trigger(
"experimental.text.complete",
{
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.id,
partID: ctx.currentText.id,
},
{ text: ctx.currentText.text },
)).text
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
yield* sync.run(SessionEvent.Text.Ended.Sync, {
sessionID: ctx.sessionID,
text: ctx.currentText.text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
}
const end = Date.now()
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
if (providerMetadata) ctx.currentText.metadata = providerMetadata
yield* session.updatePart(ctx.currentText)
ctx.currentText = undefined
})
const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) {
switch (value.type) {
case "request-start":
@@ -679,7 +581,66 @@ export const layer: Layer.Layer<
return
case "step-finish": {
yield* finishStep(value)
const completedSnapshot = yield* snapshot.track()
yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning)
const usage = Session.getUsage({
model: ctx.model,
usage: value.usage ?? new Usage({}),
metadata: value.providerMetadata,
})
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
yield* sync.run(SessionEvent.Step.Ended.Sync, {
sessionID: ctx.sessionID,
finish: value.reason,
cost: usage.cost,
tokens: usage.tokens,
snapshot: completedSnapshot,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
}
ctx.assistantMessage.finish = value.reason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
yield* session.updatePart({
id: PartID.ascending(),
reason: value.reason,
snapshot: completedSnapshot,
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
yield* session.updateMessage(ctx.assistantMessage)
if (ctx.snapshot) {
const patch = yield* snapshot.patch(ctx.snapshot)
if (patch.files.length) {
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
ctx.snapshot = undefined
}
yield* summary
.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
})
.pipe(Effect.ignore, Effect.forkIn(scope))
if (
!ctx.assistantMessage.summary &&
isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
) {
ctx.needsCompaction = true
}
return
}
@@ -706,17 +667,7 @@ export const layer: Layer.Layer<
return
case "text-delta":
if (!ctx.currentText) {
ctx.currentText = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "text",
text: "",
time: { start: Date.now() },
}
yield* session.updatePart(ctx.currentText)
}
if (!ctx.currentText) return
ctx.currentText.text += value.text
yield* session.updatePartDelta({
sessionID: ctx.currentText.sessionID,
@@ -728,13 +679,40 @@ export const layer: Layer.Layer<
return
case "text-end":
yield* finishText(value.providerMetadata)
if (!ctx.currentText) return
// oxlint-disable-next-line no-self-assign -- reactivity trigger
ctx.currentText.text = ctx.currentText.text
ctx.currentText.text = (yield* plugin.trigger(
"experimental.text.complete",
{
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.id,
partID: ctx.currentText.id,
},
{ text: ctx.currentText.text },
)).text
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
yield* sync.run(SessionEvent.Text.Ended.Sync, {
sessionID: ctx.sessionID,
text: ctx.currentText.text,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
}
{
const end = Date.now()
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
}
if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
yield* session.updatePart(ctx.currentText)
ctx.currentText = undefined
return
case "request-finish":
yield* finishText()
if (!ctx.assistantMessage.finish) yield* finishStep(value)
return
}
})