diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 1290cc545f..2d78dc70e8 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -293,110 +293,12 @@ export const layer: Layer.Layer< return { title: value.name, metadata: value.result.type === "json" && isRecord(value.result.value) ? value.result.value : {}, - output: - typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""), + output: typeof value.result.value === "string" ? value.result.value : (JSON.stringify(value.result.value) ?? ""), } } const toolInput = (value: unknown): Record => (isRecord(value) ? value : { value }) - const finishStep = Effect.fn("SessionProcessor.finishStep")(function* ( - value: Extract, - ) { - const completedSnapshot = yield* snapshot.track() - yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning) - const usage = Session.getUsage({ - model: ctx.model, - usage: value.usage ?? new Usage({}), - metadata: value.providerMetadata, - }) - if (!ctx.assistantMessage.summary) { - // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { - yield* sync.run(SessionEvent.Step.Ended.Sync, { - sessionID: ctx.sessionID, - finish: value.reason, - cost: usage.cost, - tokens: usage.tokens, - snapshot: completedSnapshot, - timestamp: DateTime.makeUnsafe(Date.now()), - }) - } - } - ctx.assistantMessage.finish = value.reason - ctx.assistantMessage.cost += usage.cost - ctx.assistantMessage.tokens = usage.tokens - yield* session.updatePart({ - id: PartID.ascending(), - reason: value.reason, - snapshot: completedSnapshot, - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "step-finish", - tokens: usage.tokens, - cost: usage.cost, - }) - yield* session.updateMessage(ctx.assistantMessage) - if (ctx.snapshot) { - const patch = yield* snapshot.patch(ctx.snapshot) - if (patch.files.length) { - yield* session.updatePart({ - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.sessionID, - type: "patch", - hash: patch.hash, - files: patch.files, - }) - } - ctx.snapshot = undefined - } - yield* summary - .summarize({ - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.parentID, - }) - .pipe(Effect.ignore, Effect.forkIn(scope)) - if ( - !ctx.assistantMessage.summary && - isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) - ) { - ctx.needsCompaction = true - } - }) - - const finishText = Effect.fn("SessionProcessor.finishText")(function* ( - providerMetadata?: Extract["providerMetadata"], - ) { - if (!ctx.currentText) return - // oxlint-disable-next-line no-self-assign -- reactivity trigger - ctx.currentText.text = ctx.currentText.text - ctx.currentText.text = (yield* plugin.trigger( - "experimental.text.complete", - { - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.id, - partID: ctx.currentText.id, - }, - { text: ctx.currentText.text }, - )).text - if (!ctx.assistantMessage.summary) { - // TODO(v2): Temporary dual-write while migrating session messages to v2 events. - if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { - yield* sync.run(SessionEvent.Text.Ended.Sync, { - sessionID: ctx.sessionID, - text: ctx.currentText.text, - timestamp: DateTime.makeUnsafe(Date.now()), - }) - } - } - const end = Date.now() - ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end } - if (providerMetadata) ctx.currentText.metadata = providerMetadata - yield* session.updatePart(ctx.currentText) - ctx.currentText = undefined - }) - const handleEvent = Effect.fnUntraced(function* (value: StreamEvent) { switch (value.type) { case "request-start": @@ -679,7 +581,66 @@ export const layer: Layer.Layer< return case "step-finish": { - yield* finishStep(value) + const completedSnapshot = yield* snapshot.track() + yield* Effect.forEach(Object.keys(ctx.reasoningMap), finishReasoning) + const usage = Session.getUsage({ + model: ctx.model, + usage: value.usage ?? new Usage({}), + metadata: value.providerMetadata, + }) + if (!ctx.assistantMessage.summary) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Step.Ended.Sync, { + sessionID: ctx.sessionID, + finish: value.reason, + cost: usage.cost, + tokens: usage.tokens, + snapshot: completedSnapshot, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + } + ctx.assistantMessage.finish = value.reason + ctx.assistantMessage.cost += usage.cost + ctx.assistantMessage.tokens = usage.tokens + yield* session.updatePart({ + id: PartID.ascending(), + reason: value.reason, + snapshot: completedSnapshot, + messageID: ctx.assistantMessage.id, + sessionID: ctx.assistantMessage.sessionID, + type: "step-finish", + tokens: usage.tokens, + cost: usage.cost, + }) + yield* session.updateMessage(ctx.assistantMessage) + if (ctx.snapshot) { + const patch = yield* snapshot.patch(ctx.snapshot) + if (patch.files.length) { + yield* session.updatePart({ + id: PartID.ascending(), + messageID: ctx.assistantMessage.id, + sessionID: ctx.sessionID, + type: "patch", + hash: patch.hash, + files: patch.files, + }) + } + ctx.snapshot = undefined + } + yield* summary + .summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }) + .pipe(Effect.ignore, Effect.forkIn(scope)) + if ( + !ctx.assistantMessage.summary && + isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) + ) { + ctx.needsCompaction = true + } return } @@ -706,17 +667,7 @@ export const layer: Layer.Layer< return case "text-delta": - if (!ctx.currentText) { - ctx.currentText = { - id: PartID.ascending(), - messageID: ctx.assistantMessage.id, - sessionID: ctx.assistantMessage.sessionID, - type: "text", - text: "", - time: { start: Date.now() }, - } - yield* session.updatePart(ctx.currentText) - } + if (!ctx.currentText) return ctx.currentText.text += value.text yield* session.updatePartDelta({ sessionID: ctx.currentText.sessionID, @@ -728,13 +679,40 @@ export const layer: Layer.Layer< return case "text-end": - yield* finishText(value.providerMetadata) + if (!ctx.currentText) return + // oxlint-disable-next-line no-self-assign -- reactivity trigger + ctx.currentText.text = ctx.currentText.text + ctx.currentText.text = (yield* plugin.trigger( + "experimental.text.complete", + { + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.id, + partID: ctx.currentText.id, + }, + { text: ctx.currentText.text }, + )).text + if (!ctx.assistantMessage.summary) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + if (Flag.OPENCODE_EXPERIMENTAL_EVENT_SYSTEM) { + yield* sync.run(SessionEvent.Text.Ended.Sync, { + sessionID: ctx.sessionID, + text: ctx.currentText.text, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + } + { + const end = Date.now() + ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end } + } + if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata + yield* session.updatePart(ctx.currentText) + ctx.currentText = undefined return case "request-finish": - yield* finishText() - if (!ctx.assistantMessage.finish) yield* finishStep(value) return + } })