mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-13 23:52:06 +00:00
Effectify remaining compaction process tests (#26776)
This commit is contained in:
@@ -1,8 +1,7 @@
|
||||
import { afterEach, describe, expect, mock, test } from "bun:test"
|
||||
import { APICallError } from "ai"
|
||||
import { Cause, Deferred, Effect, Exit, Layer, ManagedRuntime } from "effect"
|
||||
import { Cause, Deferred, Effect, Exit, Fiber, Layer } from "effect"
|
||||
import * as Stream from "effect/Stream"
|
||||
import z from "zod"
|
||||
import { Bus } from "../../src/bus"
|
||||
import { Config } from "@/config/config"
|
||||
import { Image } from "@/image/image"
|
||||
@@ -10,11 +9,10 @@ import { Agent } from "../../src/agent/agent"
|
||||
import { LLM } from "../../src/session/llm"
|
||||
import { SessionCompaction } from "../../src/session/compaction"
|
||||
import { Token } from "@/util/token"
|
||||
import { WithInstance } from "../../src/project/with-instance"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { Permission } from "../../src/permission"
|
||||
import { Plugin } from "../../src/plugin"
|
||||
import { provideTmpdirInstance, TestInstance, tmpdir } from "../fixture/fixture"
|
||||
import { provideTmpdirInstance, TestInstance } from "../fixture/fixture"
|
||||
import { Session as SessionNs } from "@/session/session"
|
||||
import { MessageV2 } from "../../src/session/message-v2"
|
||||
import { MessageID, PartID, SessionID } from "../../src/session/schema"
|
||||
@@ -32,26 +30,6 @@ import { TestConfig } from "../fixture/config"
|
||||
|
||||
void Log.init({ print: false })
|
||||
|
||||
function run<A, E>(fx: Effect.Effect<A, E, SessionNs.Service>) {
|
||||
return Effect.runPromise(fx.pipe(Effect.provide(SessionNs.defaultLayer)))
|
||||
}
|
||||
|
||||
const svc = {
|
||||
...SessionNs,
|
||||
create(input?: SessionNs.CreateInput) {
|
||||
return run(SessionNs.Service.use((svc) => svc.create(input)))
|
||||
},
|
||||
messages(input: z.output<typeof SessionNs.MessagesInput.zod>) {
|
||||
return run(SessionNs.Service.use((svc) => svc.messages(input)))
|
||||
},
|
||||
updateMessage<T extends MessageV2.Info>(msg: T) {
|
||||
return run(SessionNs.Service.use((svc) => svc.updateMessage(msg)))
|
||||
},
|
||||
updatePart<T extends MessageV2.Part>(part: T) {
|
||||
return run(SessionNs.Service.use((svc) => svc.updatePart(part)))
|
||||
},
|
||||
}
|
||||
|
||||
const summary = Layer.succeed(
|
||||
SessionSummary.Service,
|
||||
SessionSummary.Service.of({
|
||||
@@ -102,50 +80,6 @@ function createModel(opts: {
|
||||
|
||||
const wide = () => ProviderTest.fake({ model: createModel({ context: 100_000, output: 32_000 }) })
|
||||
|
||||
async function user(sessionID: SessionID, text: string) {
|
||||
const msg = await svc.updateMessage({
|
||||
id: MessageID.ascending(),
|
||||
role: "user",
|
||||
sessionID,
|
||||
agent: "build",
|
||||
model: ref,
|
||||
time: { created: Date.now() },
|
||||
})
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: msg.id,
|
||||
sessionID,
|
||||
type: "text",
|
||||
text,
|
||||
})
|
||||
return msg
|
||||
}
|
||||
|
||||
async function assistant(sessionID: SessionID, parentID: MessageID, root: string) {
|
||||
const msg: MessageV2.Assistant = {
|
||||
id: MessageID.ascending(),
|
||||
role: "assistant",
|
||||
sessionID,
|
||||
mode: "build",
|
||||
agent: "build",
|
||||
path: { cwd: root, root },
|
||||
cost: 0,
|
||||
tokens: {
|
||||
output: 0,
|
||||
input: 0,
|
||||
reasoning: 0,
|
||||
cache: { read: 0, write: 0 },
|
||||
},
|
||||
modelID: ref.modelID,
|
||||
providerID: ref.providerID,
|
||||
parentID,
|
||||
time: { created: Date.now() },
|
||||
finish: "end_turn",
|
||||
}
|
||||
await svc.updateMessage(msg)
|
||||
return msg
|
||||
}
|
||||
|
||||
function createUserMessage(sessionID: SessionID, text: string) {
|
||||
return Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
@@ -193,37 +127,40 @@ function createAssistantMessage(sessionID: SessionID, parentID: MessageID, root:
|
||||
)
|
||||
}
|
||||
|
||||
async function summaryAssistant(sessionID: SessionID, parentID: MessageID, root: string, text: string) {
|
||||
const msg: MessageV2.Assistant = {
|
||||
id: MessageID.ascending(),
|
||||
role: "assistant",
|
||||
sessionID,
|
||||
mode: "compaction",
|
||||
agent: "compaction",
|
||||
path: { cwd: root, root },
|
||||
cost: 0,
|
||||
tokens: {
|
||||
output: 0,
|
||||
input: 0,
|
||||
reasoning: 0,
|
||||
cache: { read: 0, write: 0 },
|
||||
},
|
||||
modelID: ref.modelID,
|
||||
providerID: ref.providerID,
|
||||
parentID,
|
||||
summary: true,
|
||||
time: { created: Date.now() },
|
||||
finish: "end_turn",
|
||||
}
|
||||
await svc.updateMessage(msg)
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: msg.id,
|
||||
sessionID,
|
||||
type: "text",
|
||||
text,
|
||||
})
|
||||
return msg
|
||||
function createSummaryAssistantMessage(sessionID: SessionID, parentID: MessageID, root: string, text: string) {
|
||||
return SessionNs.Service.use((ssn) =>
|
||||
Effect.gen(function* () {
|
||||
const msg = yield* ssn.updateMessage({
|
||||
id: MessageID.ascending(),
|
||||
role: "assistant",
|
||||
sessionID,
|
||||
mode: "compaction",
|
||||
agent: "compaction",
|
||||
path: { cwd: root, root },
|
||||
cost: 0,
|
||||
tokens: {
|
||||
output: 0,
|
||||
input: 0,
|
||||
reasoning: 0,
|
||||
cache: { read: 0, write: 0 },
|
||||
},
|
||||
modelID: ref.modelID,
|
||||
providerID: ref.providerID,
|
||||
parentID,
|
||||
summary: true,
|
||||
time: { created: Date.now() },
|
||||
finish: "end_turn",
|
||||
})
|
||||
yield* ssn.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: msg.id,
|
||||
sessionID,
|
||||
type: "text",
|
||||
text,
|
||||
})
|
||||
return msg
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
function createCompactionMarker(sessionID: SessionID) {
|
||||
@@ -248,10 +185,6 @@ function createCompactionMarker(sessionID: SessionID) {
|
||||
)
|
||||
}
|
||||
|
||||
async function createCompactionMarkerAsync(sessionID: SessionID) {
|
||||
return run(createCompactionMarker(sessionID))
|
||||
}
|
||||
|
||||
function fake(
|
||||
input: Parameters<SessionProcessorModule.SessionProcessor.Interface["create"]>[0],
|
||||
result: "continue" | "compact",
|
||||
@@ -283,26 +216,6 @@ function cfg(compaction?: Config.Info["compaction"]) {
|
||||
})
|
||||
}
|
||||
|
||||
function runtime(
|
||||
result: "continue" | "compact",
|
||||
plugin = Plugin.defaultLayer,
|
||||
provider = ProviderTest.fake(),
|
||||
config = Config.defaultLayer,
|
||||
) {
|
||||
const bus = Bus.layer
|
||||
return ManagedRuntime.make(
|
||||
Layer.mergeAll(SessionCompaction.layer, bus).pipe(
|
||||
Layer.provide(provider.layer),
|
||||
Layer.provide(SessionNs.defaultLayer),
|
||||
Layer.provide(layer(result)),
|
||||
Layer.provide(Agent.defaultLayer),
|
||||
Layer.provide(plugin),
|
||||
Layer.provide(bus),
|
||||
Layer.provide(config),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
const deps = Layer.mergeAll(
|
||||
wide().layer,
|
||||
layer("continue"),
|
||||
@@ -365,10 +278,6 @@ function readCompactionPart(sessionID: SessionID) {
|
||||
)
|
||||
}
|
||||
|
||||
async function lastCompactionPart(sessionID: SessionID) {
|
||||
return run(readCompactionPart(sessionID))
|
||||
}
|
||||
|
||||
function llm() {
|
||||
const queue: Array<
|
||||
Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
|
||||
@@ -391,29 +300,6 @@ function llm() {
|
||||
}
|
||||
}
|
||||
|
||||
function liveRuntime(layer: Layer.Layer<LLM.Service>, provider = ProviderTest.fake(), config = Config.defaultLayer) {
|
||||
const bus = Bus.layer
|
||||
const status = SessionStatus.layer.pipe(Layer.provide(bus))
|
||||
const processor = SessionProcessorModule.SessionProcessor.layer.pipe(
|
||||
Layer.provide(summary),
|
||||
Layer.provide(Image.defaultLayer),
|
||||
)
|
||||
return ManagedRuntime.make(
|
||||
Layer.mergeAll(SessionCompaction.layer.pipe(Layer.provide(processor)), processor, bus, status).pipe(
|
||||
Layer.provide(provider.layer),
|
||||
Layer.provide(SessionNs.defaultLayer),
|
||||
Layer.provide(Snapshot.defaultLayer),
|
||||
Layer.provide(layer),
|
||||
Layer.provide(Permission.defaultLayer),
|
||||
Layer.provide(Agent.defaultLayer),
|
||||
Layer.provide(Plugin.defaultLayer),
|
||||
Layer.provide(status),
|
||||
Layer.provide(bus),
|
||||
Layer.provide(config),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
function reply(
|
||||
text: string,
|
||||
capture?: (input: LLM.StreamInput) => void,
|
||||
@@ -469,23 +355,14 @@ function reply(
|
||||
}
|
||||
}
|
||||
|
||||
function wait(ms = 50) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms))
|
||||
}
|
||||
|
||||
function defer() {
|
||||
let resolve!: () => void
|
||||
const promise = new Promise<void>((done) => {
|
||||
resolve = done
|
||||
})
|
||||
return { promise, resolve }
|
||||
}
|
||||
|
||||
function plugin(ready: ReturnType<typeof defer>) {
|
||||
function plugin(ready: Deferred.Deferred<void>) {
|
||||
return Layer.mock(Plugin.Service)({
|
||||
trigger: <Name extends string, Input, Output>(name: Name, _input: Input, output: Output) => {
|
||||
if (name !== "experimental.session.compacting") return Effect.succeed(output)
|
||||
return Effect.sync(() => ready.resolve()).pipe(Effect.andThen(Effect.never), Effect.as(output))
|
||||
return Effect.sync(() => Deferred.doneUnsafe(ready, Effect.void)).pipe(
|
||||
Effect.andThen(Effect.never),
|
||||
Effect.as(output),
|
||||
)
|
||||
},
|
||||
list: () => Effect.succeed([]),
|
||||
init: () => Effect.void,
|
||||
@@ -1315,154 +1192,99 @@ describe("session.compaction.process", () => {
|
||||
}),
|
||||
)
|
||||
|
||||
test("stops quickly when aborted during retry backoff", async () => {
|
||||
const stub = llm()
|
||||
const ready = defer()
|
||||
stub.push(
|
||||
Stream.fromAsyncIterable(
|
||||
{
|
||||
async *[Symbol.asyncIterator]() {
|
||||
yield { type: "start" } as LLM.Event
|
||||
throw new APICallError({
|
||||
message: "boom",
|
||||
url: "https://example.com/v1/chat/completions",
|
||||
requestBodyValues: {},
|
||||
statusCode: 503,
|
||||
responseHeaders: { "retry-after-ms": "10000" },
|
||||
responseBody: '{"error":"boom"}',
|
||||
isRetryable: true,
|
||||
})
|
||||
itProcess.instance(
|
||||
"stops quickly when aborted during retry backoff",
|
||||
() => {
|
||||
const stub = llm()
|
||||
stub.push(
|
||||
Stream.fromAsyncIterable(
|
||||
{
|
||||
async *[Symbol.asyncIterator]() {
|
||||
yield { type: "start" } as LLM.Event
|
||||
throw new APICallError({
|
||||
message: "boom",
|
||||
url: "https://example.com/v1/chat/completions",
|
||||
requestBodyValues: {},
|
||||
statusCode: 503,
|
||||
responseHeaders: { "retry-after-ms": "10000" },
|
||||
responseBody: '{"error":"boom"}',
|
||||
isRetryable: true,
|
||||
})
|
||||
},
|
||||
},
|
||||
},
|
||||
(err) => err,
|
||||
),
|
||||
)
|
||||
(err) => err,
|
||||
),
|
||||
)
|
||||
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
const msg = await user(session.id, "hello")
|
||||
const msgs = await svc.messages({ sessionID: session.id })
|
||||
const abort = new AbortController()
|
||||
const rt = liveRuntime(stub.layer, wide())
|
||||
let off: (() => void) | undefined
|
||||
let run: Promise<"continue" | "stop"> | undefined
|
||||
try {
|
||||
off = await rt.runPromise(
|
||||
Bus.Service.use((svc) =>
|
||||
svc.subscribeCallback(SessionStatus.Event.Status, (evt) => {
|
||||
if (evt.properties.sessionID !== session.id) return
|
||||
if (evt.properties.status.type !== "retry") return
|
||||
ready.resolve()
|
||||
}),
|
||||
),
|
||||
)
|
||||
return Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
const bus = yield* Bus.Service
|
||||
const ready = yield* Deferred.make<void>()
|
||||
const session = yield* ssn.create({})
|
||||
const msg = yield* createUserMessage(session.id, "hello")
|
||||
const msgs = yield* ssn.messages({ sessionID: session.id })
|
||||
const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
|
||||
if (evt.properties.sessionID !== session.id) return
|
||||
if (evt.properties.status.type !== "retry") return
|
||||
Deferred.doneUnsafe(ready, Effect.void)
|
||||
})
|
||||
yield* Effect.addFinalizer(() => Effect.sync(off))
|
||||
|
||||
run = rt
|
||||
.runPromiseExit(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: msg.id,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
{ signal: abort.signal },
|
||||
)
|
||||
.then((exit) => {
|
||||
if (Exit.isFailure(exit)) {
|
||||
if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop"
|
||||
throw Cause.squash(exit.cause)
|
||||
}
|
||||
return exit.value
|
||||
})
|
||||
const fiber = yield* SessionCompaction.use
|
||||
.process({
|
||||
parentID: msg.id,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
|
||||
await Promise.race([
|
||||
ready.promise,
|
||||
wait(1000).then(() => {
|
||||
throw new Error("timed out waiting for retry status")
|
||||
}),
|
||||
])
|
||||
yield* Deferred.await(ready).pipe(Effect.timeout("1 second"))
|
||||
const start = Date.now()
|
||||
yield* Fiber.interrupt(fiber)
|
||||
const exit = yield* Fiber.await(fiber).pipe(Effect.timeout("250 millis"))
|
||||
|
||||
const start = Date.now()
|
||||
abort.abort()
|
||||
const result = await Promise.race([
|
||||
run.then((value) => ({ kind: "done" as const, value, ms: Date.now() - start })),
|
||||
wait(250).then(() => ({ kind: "timeout" as const })),
|
||||
])
|
||||
|
||||
expect(result.kind).toBe("done")
|
||||
if (result.kind === "done") {
|
||||
expect(result.value).toBe("stop")
|
||||
expect(result.ms).toBeLessThan(250)
|
||||
}
|
||||
} finally {
|
||||
off?.()
|
||||
abort.abort()
|
||||
await rt.dispose()
|
||||
await run?.catch(() => undefined)
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
if (Exit.isFailure(exit)) {
|
||||
expect(Cause.hasInterrupts(exit.cause)).toBe(true)
|
||||
expect(Date.now() - start).toBeLessThan(250)
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
}).pipe(Effect.provide(compactionProcessLayer({ llm: stub.layer })))
|
||||
},
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
test("does not leave a summary assistant when aborted before processor setup", async () => {
|
||||
const ready = defer()
|
||||
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
const msg = await user(session.id, "hello")
|
||||
const msgs = await svc.messages({ sessionID: session.id })
|
||||
const abort = new AbortController()
|
||||
const rt = runtime("continue", plugin(ready), wide())
|
||||
let run: Promise<"continue" | "stop"> | undefined
|
||||
try {
|
||||
run = rt
|
||||
.runPromiseExit(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: msg.id,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
{ signal: abort.signal },
|
||||
)
|
||||
.then((exit) => {
|
||||
if (Exit.isFailure(exit)) {
|
||||
if (Cause.hasInterrupts(exit.cause) && abort.signal.aborted) return "stop"
|
||||
throw Cause.squash(exit.cause)
|
||||
}
|
||||
return exit.value
|
||||
itProcess.instance(
|
||||
"does not leave a summary assistant when aborted before processor setup",
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const ready = yield* Deferred.make<void>()
|
||||
return yield* Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
const session = yield* ssn.create({})
|
||||
const msg = yield* createUserMessage(session.id, "hello")
|
||||
const msgs = yield* ssn.messages({ sessionID: session.id })
|
||||
const fiber = yield* SessionCompaction.use
|
||||
.process({
|
||||
parentID: msg.id,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
|
||||
await Promise.race([
|
||||
ready.promise,
|
||||
wait(1000).then(() => {
|
||||
throw new Error("timed out waiting for compaction hook")
|
||||
}),
|
||||
])
|
||||
yield* Deferred.await(ready).pipe(Effect.timeout("1 second"))
|
||||
yield* Fiber.interrupt(fiber)
|
||||
const exit = yield* Fiber.await(fiber).pipe(Effect.timeout("250 millis"))
|
||||
const all = yield* ssn.messages({ sessionID: session.id })
|
||||
|
||||
abort.abort()
|
||||
expect(await run).toBe("stop")
|
||||
|
||||
const all = await svc.messages({ sessionID: session.id })
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
if (Exit.isFailure(exit)) expect(Cause.hasInterrupts(exit.cause)).toBe(true)
|
||||
expect(all.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(false)
|
||||
} finally {
|
||||
abort.abort()
|
||||
await rt.dispose()
|
||||
await run?.catch(() => undefined)
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
}).pipe(Effect.provide(compactionProcessLayer({ plugin: plugin(ready) })))
|
||||
}),
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
itProcess.instance(
|
||||
"does not allow tool calls while generating the summary",
|
||||
@@ -1533,240 +1355,172 @@ describe("session.compaction.process", () => {
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
test("summarizes only the head while keeping recent tail out of summary input", async () => {
|
||||
const stub = llm()
|
||||
let captured = ""
|
||||
stub.push(
|
||||
reply("summary", (input) => {
|
||||
captured = JSON.stringify(input.messages)
|
||||
}),
|
||||
)
|
||||
itProcess.instance(
|
||||
"summarizes only the head while keeping recent tail out of summary input",
|
||||
() => {
|
||||
const stub = llm()
|
||||
let captured = ""
|
||||
stub.push(
|
||||
reply("summary", (input) => {
|
||||
captured = JSON.stringify(input.messages)
|
||||
}),
|
||||
)
|
||||
return Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
const session = yield* ssn.create({})
|
||||
yield* createUserMessage(session.id, "older context")
|
||||
yield* createUserMessage(session.id, "keep this turn")
|
||||
yield* createUserMessage(session.id, "and this one too")
|
||||
yield* createCompactionMarker(session.id)
|
||||
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await user(session.id, "older context")
|
||||
await user(session.id, "keep this turn")
|
||||
await user(session.id, "and this one too")
|
||||
await createCompactionMarkerAsync(session.id)
|
||||
const msgs = yield* ssn.messages({ sessionID: session.id })
|
||||
const parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
yield* SessionCompaction.use.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
})
|
||||
|
||||
const rt = liveRuntime(stub.layer, wide())
|
||||
try {
|
||||
const msgs = await svc.messages({ sessionID: session.id })
|
||||
const parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
expect(captured).toContain("older context")
|
||||
expect(captured).not.toContain("keep this turn")
|
||||
expect(captured).not.toContain("and this one too")
|
||||
expect(captured).not.toContain("What did we do so far?")
|
||||
}).pipe(Effect.provide(compactionProcessLayer({ llm: stub.layer })))
|
||||
},
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
expect(captured).toContain("older context")
|
||||
expect(captured).not.toContain("keep this turn")
|
||||
expect(captured).not.toContain("and this one too")
|
||||
expect(captured).not.toContain("What did we do so far?")
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
itProcess.instance(
|
||||
"anchors repeated compactions with the previous summary",
|
||||
() => {
|
||||
const stub = llm()
|
||||
let captured = ""
|
||||
stub.push(reply("summary one"))
|
||||
stub.push(
|
||||
reply("summary two", (input) => {
|
||||
captured = JSON.stringify(input.messages)
|
||||
}),
|
||||
)
|
||||
|
||||
test("anchors repeated compactions with the previous summary", async () => {
|
||||
const stub = llm()
|
||||
let captured = ""
|
||||
stub.push(reply("summary one"))
|
||||
stub.push(
|
||||
reply("summary two", (input) => {
|
||||
captured = JSON.stringify(input.messages)
|
||||
}),
|
||||
)
|
||||
return Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
const session = yield* ssn.create({})
|
||||
yield* createUserMessage(session.id, "older context")
|
||||
yield* createUserMessage(session.id, "keep this turn")
|
||||
yield* createCompactionMarker(session.id)
|
||||
|
||||
await using tmp = await tmpdir({ git: true })
|
||||
await WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await user(session.id, "older context")
|
||||
await user(session.id, "keep this turn")
|
||||
await createCompactionMarkerAsync(session.id)
|
||||
let msgs = yield* ssn.messages({ sessionID: session.id })
|
||||
let parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
|
||||
|
||||
const rt = liveRuntime(stub.layer, wide())
|
||||
try {
|
||||
let msgs = await svc.messages({ sessionID: session.id })
|
||||
let parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
yield* createUserMessage(session.id, "latest turn")
|
||||
yield* createCompactionMarker(session.id)
|
||||
|
||||
await user(session.id, "latest turn")
|
||||
await createCompactionMarkerAsync(session.id)
|
||||
msgs = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
|
||||
|
||||
msgs = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
expect(captured).toContain("<previous-summary>")
|
||||
expect(captured).toContain("summary one")
|
||||
expect(captured.match(/summary one/g)?.length).toBe(1)
|
||||
expect(captured).toContain("## Constraints & Preferences")
|
||||
expect(captured).toContain("## Progress")
|
||||
}).pipe(Effect.provide(compactionProcessLayer({ llm: stub.layer })))
|
||||
},
|
||||
{ git: true },
|
||||
)
|
||||
|
||||
expect(captured).toContain("<previous-summary>")
|
||||
expect(captured).toContain("summary one")
|
||||
expect(captured.match(/summary one/g)?.length).toBe(1)
|
||||
expect(captured).toContain("## Constraints & Preferences")
|
||||
expect(captured).toContain("## Progress")
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("keeps recent pre-compaction turns across repeated compactions", async () => {
|
||||
itProcess.instance("keeps recent pre-compaction turns across repeated compactions", () => {
|
||||
const stub = llm()
|
||||
stub.push(reply("summary one"))
|
||||
stub.push(reply("summary two"))
|
||||
await using tmp = await tmpdir()
|
||||
await WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
const u1 = await user(session.id, "one")
|
||||
const u2 = await user(session.id, "two")
|
||||
const u3 = await user(session.id, "three")
|
||||
await createCompactionMarkerAsync(session.id)
|
||||
|
||||
const rt = liveRuntime(stub.layer, wide(), cfg({ tail_turns: 2, preserve_recent_tokens: 10_000 }))
|
||||
try {
|
||||
let msgs = await svc.messages({ sessionID: session.id })
|
||||
let parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
return Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
const session = yield* ssn.create({})
|
||||
const u1 = yield* createUserMessage(session.id, "one")
|
||||
const u2 = yield* createUserMessage(session.id, "two")
|
||||
const u3 = yield* createUserMessage(session.id, "three")
|
||||
yield* createCompactionMarker(session.id)
|
||||
|
||||
const u4 = await user(session.id, "four")
|
||||
await createCompactionMarkerAsync(session.id)
|
||||
let msgs = yield* ssn.messages({ sessionID: session.id })
|
||||
let parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
|
||||
|
||||
msgs = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
const u4 = yield* createUserMessage(session.id, "four")
|
||||
yield* createCompactionMarker(session.id)
|
||||
|
||||
const filtered = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
const ids = filtered.map((msg) => msg.info.id)
|
||||
msgs = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
|
||||
|
||||
expect(ids).not.toContain(u1.id)
|
||||
expect(ids).not.toContain(u2.id)
|
||||
expect(ids).toContain(u3.id)
|
||||
expect(ids).toContain(u4.id)
|
||||
expect(filtered.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(true)
|
||||
expect(
|
||||
filtered.some((msg) => msg.info.role === "user" && msg.parts.some((part) => part.type === "compaction")),
|
||||
).toBe(true)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
const filtered = MessageV2.filterCompacted(MessageV2.stream(session.id))
|
||||
const ids = filtered.map((msg) => msg.info.id)
|
||||
|
||||
expect(ids).not.toContain(u1.id)
|
||||
expect(ids).not.toContain(u2.id)
|
||||
expect(ids).toContain(u3.id)
|
||||
expect(ids).toContain(u4.id)
|
||||
expect(filtered.some((msg) => msg.info.role === "assistant" && msg.info.summary)).toBe(true)
|
||||
expect(
|
||||
filtered.some((msg) => msg.info.role === "user" && msg.parts.some((part) => part.type === "compaction")),
|
||||
).toBe(true)
|
||||
}).pipe(
|
||||
Effect.provide(
|
||||
compactionProcessLayer({ llm: stub.layer, config: cfg({ tail_turns: 2, preserve_recent_tokens: 10_000 }) }),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
test("ignores previous summaries when sizing the retained tail", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
await WithInstance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const session = await svc.create({})
|
||||
await user(session.id, "older")
|
||||
const keep = await user(session.id, "keep this turn")
|
||||
const keepReply = await assistant(session.id, keep.id, tmp.path)
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: keepReply.id,
|
||||
sessionID: session.id,
|
||||
type: "text",
|
||||
text: "keep reply",
|
||||
})
|
||||
itProcess.instance(
|
||||
"ignores previous summaries when sizing the retained tail",
|
||||
Effect.gen(function* () {
|
||||
const ssn = yield* SessionNs.Service
|
||||
const test = yield* TestInstance
|
||||
const session = yield* ssn.create({})
|
||||
yield* createUserMessage(session.id, "older")
|
||||
const keep = yield* createUserMessage(session.id, "keep this turn")
|
||||
const keepReply = yield* createAssistantMessage(session.id, keep.id, test.directory)
|
||||
yield* ssn.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: keepReply.id,
|
||||
sessionID: session.id,
|
||||
type: "text",
|
||||
text: "keep reply",
|
||||
})
|
||||
|
||||
await createCompactionMarkerAsync(session.id)
|
||||
const firstCompaction = (await svc.messages({ sessionID: session.id })).at(-1)?.info.id
|
||||
expect(firstCompaction).toBeTruthy()
|
||||
await summaryAssistant(session.id, firstCompaction!, tmp.path, "summary ".repeat(800))
|
||||
yield* createCompactionMarker(session.id)
|
||||
const firstCompaction = (yield* ssn.messages({ sessionID: session.id })).at(-1)?.info.id
|
||||
expect(firstCompaction).toBeTruthy()
|
||||
yield* createSummaryAssistantMessage(session.id, firstCompaction!, test.directory, "summary ".repeat(800))
|
||||
|
||||
const recent = await user(session.id, "recent turn")
|
||||
const recentReply = await assistant(session.id, recent.id, tmp.path)
|
||||
await svc.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: recentReply.id,
|
||||
sessionID: session.id,
|
||||
type: "text",
|
||||
text: "recent reply",
|
||||
})
|
||||
const recent = yield* createUserMessage(session.id, "recent turn")
|
||||
const recentReply = yield* createAssistantMessage(session.id, recent.id, test.directory)
|
||||
yield* ssn.updatePart({
|
||||
id: PartID.ascending(),
|
||||
messageID: recentReply.id,
|
||||
sessionID: session.id,
|
||||
type: "text",
|
||||
text: "recent reply",
|
||||
})
|
||||
|
||||
await createCompactionMarkerAsync(session.id)
|
||||
yield* createCompactionMarker(session.id)
|
||||
const msgs = yield* ssn.messages({ sessionID: session.id })
|
||||
const parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
yield* SessionCompaction.use.process({ parentID: parent!, messages: msgs, sessionID: session.id, auto: false })
|
||||
|
||||
const rt = runtime("continue", Plugin.defaultLayer, wide(), cfg({ tail_turns: 2, preserve_recent_tokens: 500 }))
|
||||
try {
|
||||
const msgs = await svc.messages({ sessionID: session.id })
|
||||
const parent = msgs.at(-1)?.info.id
|
||||
expect(parent).toBeTruthy()
|
||||
await rt.runPromise(
|
||||
SessionCompaction.Service.use((svc) =>
|
||||
svc.process({
|
||||
parentID: parent!,
|
||||
messages: msgs,
|
||||
sessionID: session.id,
|
||||
auto: false,
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const part = await lastCompactionPart(session.id)
|
||||
expect(part?.type).toBe("compaction")
|
||||
expect(part?.tail_start_id).toBe(keep.id)
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
const part = yield* readCompactionPart(session.id)
|
||||
expect(part?.type).toBe("compaction")
|
||||
expect(part?.tail_start_id).toBe(keep.id)
|
||||
}).pipe(Effect.provide(compactionProcessLayer({ config: cfg({ tail_turns: 2, preserve_recent_tokens: 500 }) }))),
|
||||
)
|
||||
})
|
||||
|
||||
describe("util.token.estimate", () => {
|
||||
|
||||
Reference in New Issue
Block a user