fix(session): finish native request streams

This commit is contained in:
Kit Langton
2026-05-11 19:42:38 -04:00
parent a878036f63
commit 03b60f7623

View File

@@ -293,12 +293,78 @@ export const layer: Layer.Layer<
return {
title: value.name,
metadata: value.result.type === "json" && isRecord(value.result.value) ? value.result.value : {},
output: typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""),
output:
typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""),
}
}
const toolInput = (value: unknown): Record<string, any> => (isRecord(value) ? value : { value })
const finishStep = Effect.fn("SessionProcessor.finishStep")(function* (
value: Extract<StreamEvent, { type: "step-finish" | "request-finish" }>,
) {
const completedSnapshot = yield* snapshot.track()
yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning)
const usage = Session.getUsage({
model: ctx.model,
usage: value.usage ?? new Usage({}),
metadata: value.providerMetadata,
})
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
yield* sync.run(SessionEvent.Step.Ended.Sync, {
sessionID: ctx.sessionID,
finish: value.reason,
cost: usage.cost,
tokens: usage.tokens,
snapshot: completedSnapshot,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
}
ctx.assistantMessage.finish = value.reason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
yield* session.updatePart({
id: PartID.ascending(),
reason: value.reason,
snapshot: completedSnapshot,
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
yield* session.updateMessage(ctx.assistantMessage)
if (ctx.snapshot) {
const patch = yield* snapshot.patch(ctx.snapshot)
if (patch.files.length) {
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
ctx.snapshot = undefined
}
yield* summary
.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
})
.pipe(Effect.ignore, Effect.forkIn(scope))
if (
!ctx.assistantMessage.summary &&
isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
) {
ctx.needsCompaction = true
}
})
const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) {
switch (value.type) {
case "request-start":
@@ -581,66 +647,7 @@ export const layer: Layer.Layer<
return
case "step-finish": {
const completedSnapshot = yield* snapshot.track()
yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning)
const usage = Session.getUsage({
model: ctx.model,
usage: value.usage ?? new Usage({}),
metadata: value.providerMetadata,
})
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) {
yield* sync.run(SessionEvent.Step.Ended.Sync, {
sessionID: ctx.sessionID,
finish: value.reason,
cost: usage.cost,
tokens: usage.tokens,
snapshot: completedSnapshot,
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
}
ctx.assistantMessage.finish = value.reason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
yield* session.updatePart({
id: PartID.ascending(),
reason: value.reason,
snapshot: completedSnapshot,
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "step-finish",
tokens: usage.tokens,
cost: usage.cost,
})
yield* session.updateMessage(ctx.assistantMessage)
if (ctx.snapshot) {
const patch = yield* snapshot.patch(ctx.snapshot)
if (patch.files.length) {
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.sessionID,
type: "patch",
hash: patch.hash,
files: patch.files,
})
}
ctx.snapshot = undefined
}
yield* summary
.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
})
.pipe(Effect.ignore, Effect.forkIn(scope))
if (
!ctx.assistantMessage.summary &&
isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
) {
ctx.needsCompaction = true
}
yield* finishStep(value)
return
}
@@ -667,7 +674,17 @@ export const layer: Layer.Layer<
return
case "text-delta":
if (!ctx.currentText) return
if (!ctx.currentText) {
ctx.currentText = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "text",
text: "",
time: { start: Date.now() },
}
yield* session.updatePart(ctx.currentText)
}
ctx.currentText.text += value.text
yield* session.updatePartDelta({
sessionID: ctx.currentText.sessionID,
@@ -711,8 +728,8 @@ export const layer: Layer.Layer<
return
case "request-finish":
if (!ctx.assistantMessage.finish) yield* finishStep(value)
return
}
})