From 03b60f7623946ad80f698319accc3b874930a63b Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 11 May 2026 19:42:38 -0400 Subject: [PATCH] fix(session): finish native request streams --- packages/opencode/src/session/processor.ts | 143 ++++++++++++--------- 1 file changed, 80 insertions(+), 63 deletions(-) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 2d78dc70e8..a207d4db74 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -293,12 +293,78 @@ export const layer: Layer.Layer< return { title: value.name, metadata: value.result.type === "json" && isRecord(value.result.value) ? value.result.value : {}, - output: typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""), + output: + typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""), } } const toolInput = (value: unknown): Record => (isRecord(value) ? value : { value }) + const finishStep = Effect.fn("SessionProcessor.finishStep")(function* ( + value: Extract, + ) { + const completedSnapshot = yield* snapshot.track() + yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning) + const usage = Session.getUsage({ + model: ctx.model, + usage: value.usage ?? new Usage({}), + metadata: value.providerMetadata, + }) + if (!ctx.assistantMessage.summary) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Step.Ended.Sync, { + sessionID: ctx.sessionID, + finish: value.reason, + cost: usage.cost, + tokens: usage.tokens, + snapshot: completedSnapshot, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + } + ctx.assistantMessage.finish = value.reason + ctx.assistantMessage.cost += usage.cost + ctx.assistantMessage.tokens = usage.tokens + yield* session.updatePart({ + id: PartID.ascending(), + reason: value.reason, + snapshot: completedSnapshot, + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, + }) + yield* session.updateMessage(ctx.assistantMessage) + if (ctx.snapshot) { + const patch = yield* snapshot.patch(ctx.snapshot) + if (patch.files.length) { + yield* session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + yield* summary + .summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }) + .pipe(Effect.ignore, Effect.forkIn(scope)) + if ( + !ctx.assistantMessage.summary && + isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) + ) { + ctx.needsCompaction = true + } + }) + const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) { switch (value.type) { case "request-start": @@ -581,66 +647,7 @@ export const layer: Layer.Layer< return case "step-finish": { - const completedSnapshot = yield* snapshot.track() - yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning) - const usage = Session.getUsage({ - model: ctx.model, - usage: value.usage ?? new Usage({}), - metadata: value.providerMetadata, - }) - if (!ctx.assistantMessage.summary) { - // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { - yield* sync.run(SessionEvent.Step.Ended.Sync, { - sessionID: ctx.sessionID, - finish: value.reason, - cost: usage.cost, - tokens: usage.tokens, - snapshot: completedSnapshot, - timestamp: DateTime.makeUnsafe(Date.now()), - }) - } - } - ctx.assistantMessage.finish = value.reason - ctx.assistantMessage.cost += usage.cost - ctx.assistantMessage.tokens = usage.tokens - yield* session.updatePart({ - id: PartID.ascending(), - reason: value.reason, - snapshot: completedSnapshot, - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "step-finish", - tokens: usage.tokens, - cost: usage.cost, - }) - yield* session.updateMessage(ctx.assistantMessage) - if (ctx.snapshot) { - const patch = yield* snapshot.patch(ctx.snapshot) - if (patch.files.length) { - yield* session.updatePart({ - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) - } - ctx.snapshot = undefined - } - yield* summary - .summarize({ - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.parentID, - }) - .pipe(Effect.ignore, Effect.forkIn(scope)) - if ( - !ctx.assistantMessage.summary && - isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) - ) { - ctx.needsCompaction = true - } + yield* finishStep(value) return } @@ -667,7 +674,17 @@ export const layer: Layer.Layer< return case "text-delta": - if (!ctx.currentText) return + if (!ctx.currentText) { + ctx.currentText = { + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "text", + text: "", + time: { start: Date.now() }, + } + yield* session.updatePart(ctx.currentText) + } ctx.currentText.text += value.text yield* session.updatePartDelta({ sessionID: ctx.currentText.sessionID, @@ -711,8 +728,8 @@ export const layer: Layer.Layer< return case "request-finish": + if (!ctx.assistantMessage.finish) yield* finishStep(value) return - } })