refactor: simplify instance store concurrency

This commit is contained in:
Kit Langton
2026-05-01 12:04:13 -04:00
parent c565bd54e2
commit 8a63cbe79c
2 changed files with 261 additions and 73 deletions

View File

@@ -3,9 +3,7 @@ import { WorkspaceContext } from "@/control-plane/workspace-context"
import { disposeInstance } from "@/effect/instance-registry"
import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@opencode-ai/core/filesystem"
import * as Log from "@opencode-ai/core/util/log"
import { Context, Effect, Layer } from "effect"
import { iife } from "@/util/iife"
import { Context, Deferred, Effect, Exit, Layer, Scope } from "effect"
import { context, type InstanceContext } from "./instance-context"
import * as Project from "./project"
@@ -25,13 +23,18 @@ export interface Interface {
export class Service extends Context.Service<Service, Interface>()("@opencode/InstanceStore") {}
interface Entry {
readonly deferred: Deferred.Deferred<InstanceContext>
}
export const layer: Layer.Layer<Service, never, Project.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const project = yield* Project.Service
const cache = new Map<string, Promise<InstanceContext>>()
const scope = yield* Scope.Scope
const cache = new Map<string, Entry>()
const disposal = {
all: undefined as Promise<void> | undefined,
all: undefined as Deferred.Deferred<void> | undefined,
}
const boot = Effect.fn("InstanceStore.boot")(function* (input: LoadInput & { directory: string }) {
@@ -54,91 +57,128 @@ export const layer: Layer.Layer<Service, never, Project.Service> = Layer.effect(
return ctx
})
function track(directory: string, next: Promise<InstanceContext>) {
const task = next.catch((error) => {
if (cache.get(directory) === task) cache.delete(directory)
throw error
const removeEntry = (directory: string, entry: Entry) =>
Effect.sync(() => {
if (cache.get(directory) !== entry) return false
cache.delete(directory)
return true
})
cache.set(directory, task)
return task
}
const completeLoad = Effect.fnUntraced(function* (directory: string, input: LoadInput, entry: Entry) {
const exit = yield* Effect.exit(boot({ ...input, directory }))
if (Exit.isFailure(exit)) yield* removeEntry(directory, entry)
yield* Deferred.done(entry.deferred, exit).pipe(Effect.asVoid)
})
const emitDisposed = (input: { directory: string; project?: string }) =>
Effect.sync(() =>
GlobalBus.emit("event", {
directory: input.directory,
project: input.project,
workspace: WorkspaceContext.workspaceID,
payload: {
type: "server.instance.disposed",
properties: {
directory: input.directory,
},
},
}),
)
const disposeContext = Effect.fn("InstanceStore.disposeContext")(function* (ctx: InstanceContext) {
yield* Effect.logInfo("disposing instance", { directory: ctx.directory })
yield* Effect.promise(() => disposeInstance(ctx.directory))
yield* emitDisposed({ directory: ctx.directory, project: ctx.project.id })
})
const disposeEntry = Effect.fnUntraced(function* (directory: string, entry: Entry, ctx: InstanceContext) {
if (cache.get(directory) !== entry) return false
yield* disposeContext(ctx)
if (cache.get(directory) !== entry) return false
cache.delete(directory)
return true
})
const load = Effect.fn("InstanceStore.load")(function* (input: LoadInput) {
const directory = AppFileSystem.resolve(input.directory)
const existing = cache.get(directory)
if (existing) return yield* Effect.promise(() => existing)
return yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const existing = cache.get(directory)
if (existing) return yield* restore(Deferred.await(existing.deferred))
Log.Default.info("creating instance", { directory })
return yield* Effect.promise(() => track(directory, Effect.runPromise(boot({ ...input, directory }))))
const entry: Entry = { deferred: Deferred.makeUnsafe<InstanceContext>() }
cache.set(directory, entry)
yield* Effect.gen(function* () {
yield* Effect.logInfo("creating instance", { directory })
yield* completeLoad(directory, input, entry)
}).pipe(Effect.forkIn(scope, { startImmediately: true }))
return yield* restore(Deferred.await(entry.deferred))
}),
)
})
const reload = Effect.fn("InstanceStore.reload")(function* (input: LoadInput) {
const directory = AppFileSystem.resolve(input.directory)
Log.Default.info("reloading instance", { directory })
yield* Effect.promise(() => disposeInstance(directory))
cache.delete(directory)
const next = track(directory, Effect.runPromise(boot({ ...input, directory })))
GlobalBus.emit("event", {
directory,
project: input.project?.id,
workspace: WorkspaceContext.workspaceID,
payload: {
type: "server.instance.disposed",
properties: {
directory,
},
},
})
return yield* Effect.promise(() => next)
return yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const previous = cache.get(directory)
const entry: Entry = { deferred: Deferred.makeUnsafe<InstanceContext>() }
cache.set(directory, entry)
yield* Effect.gen(function* () {
yield* Effect.logInfo("reloading instance", { directory })
if (previous) yield* Deferred.await(previous.deferred).pipe(Effect.exit, Effect.asVoid)
yield* Effect.promise(() => disposeInstance(directory))
yield* emitDisposed({ directory, project: input.project?.id })
yield* completeLoad(directory, input, entry)
}).pipe(Effect.forkIn(scope, { startImmediately: true }))
return yield* restore(Deferred.await(entry.deferred))
}),
)
})
const dispose = Effect.fn("InstanceStore.dispose")(function* (ctx: InstanceContext) {
Log.Default.info("disposing instance", { directory: ctx.directory })
yield* Effect.promise(() => disposeInstance(ctx.directory))
cache.delete(ctx.directory)
const entry = cache.get(ctx.directory)
if (!entry) return yield* disposeContext(ctx)
GlobalBus.emit("event", {
directory: ctx.directory,
project: ctx.project.id,
workspace: WorkspaceContext.workspaceID,
payload: {
type: "server.instance.disposed",
properties: {
directory: ctx.directory,
},
},
})
const exit = yield* Deferred.await(entry.deferred).pipe(Effect.exit)
if (Exit.isFailure(exit)) return yield* removeEntry(ctx.directory, entry).pipe(Effect.asVoid)
if (exit.value !== ctx) return
yield* disposeEntry(ctx.directory, entry, ctx).pipe(Effect.asVoid)
})
const disposeAll = Effect.fn("InstanceStore.disposeAll")(function* () {
if (disposal.all) return yield* Effect.promise(() => disposal.all!)
return yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const existing = disposal.all
if (existing) return yield* restore(Deferred.await(existing))
disposal.all = iife(async () => {
Log.Default.info("disposing all instances")
const entries = [...cache.entries()]
for (const [key, value] of entries) {
if (cache.get(key) !== value) continue
const ctx = await value.catch((error) => {
Log.Default.warn("instance dispose failed", { key, error })
return undefined
})
if (!ctx) {
if (cache.get(key) === value) cache.delete(key)
continue
const done = Deferred.makeUnsafe<void>()
const entries = [...cache.entries()]
disposal.all = done
const exit = yield* Effect.gen(function* () {
yield* Effect.logInfo("disposing all instances")
yield* Effect.forEach(
entries,
(item) =>
Effect.gen(function* () {
const exit = yield* Deferred.await(item[1].deferred).pipe(Effect.exit)
if (Exit.isFailure(exit)) {
yield* Effect.logWarning("instance dispose failed", { key: item[0], cause: exit.cause })
yield* removeEntry(item[0], item[1])
return
}
yield* disposeEntry(item[0], item[1], exit.value)
}),
{ discard: true },
)
}).pipe(Effect.exit)
yield* Deferred.done(done, exit).pipe(Effect.asVoid)
if (disposal.all === done) {
disposal.all = undefined
}
if (cache.get(key) !== value) continue
await Effect.runPromise(dispose(ctx))
}
}).finally(() => {
disposal.all = undefined
})
return yield* Effect.promise(() => disposal.all!)
return yield* restore(Deferred.await(done))
}),
)
})
yield* Effect.addFinalizer(() => disposeAll().pipe(Effect.ignore))

View File

@@ -1,6 +1,7 @@
import { afterEach, describe, expect } from "bun:test"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Effect, Layer } from "effect"
import { Effect, Fiber, Layer } from "effect"
import { registerDisposer } from "../../src/effect/instance-registry"
import { Instance } from "../../src/project/instance"
import { InstanceStore } from "../../src/project/instance-store"
import { tmpdirScoped } from "../fixture/fixture"
@@ -67,6 +68,153 @@ describe("InstanceStore", () => {
}),
)
it.live("dedupes concurrent loads while init is in flight", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true })
const store = yield* InstanceStore.Service
const started = Promise.withResolvers<void>()
const release = Promise.withResolvers<void>()
let initialized = 0
const first = yield* store
.load({
directory: dir,
init: async () => {
initialized++
started.resolve()
await release.promise
},
})
.pipe(Effect.forkScoped)
yield* Effect.promise(() => started.promise)
const second = yield* store
.load({
directory: dir,
init: async () => {
initialized++
},
})
.pipe(Effect.forkScoped)
expect(initialized).toBe(1)
release.resolve()
const [firstCtx, secondCtx] = yield* Effect.all([Fiber.join(first), Fiber.join(second)])
expect(secondCtx).toBe(firstCtx)
expect(initialized).toBe(1)
}),
)
it.live("removes failed loads from the cache", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true })
const store = yield* InstanceStore.Service
let attempts = 0
const failed = yield* store
.load({
directory: dir,
init: async () => {
attempts++
throw new Error("init failed")
},
})
.pipe(
Effect.as(false),
Effect.catchCause(() => Effect.succeed(true)),
)
expect(failed).toBe(true)
const ctx = yield* store.load({
directory: dir,
init: async () => {
attempts++
},
})
expect(ctx.directory).toBe(dir)
expect(attempts).toBe(2)
}),
)
it.live("reload replaces the cached context", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true })
const store = yield* InstanceStore.Service
const first = yield* store.load({ directory: dir })
const second = yield* store.reload({ directory: dir })
const cached = yield* store.load({ directory: dir })
expect(second).not.toBe(first)
expect(cached).toBe(second)
}),
)
it.live("stale dispose does not delete an in-flight reload", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true })
const store = yield* InstanceStore.Service
const reloading = Promise.withResolvers<void>()
const releaseReload = Promise.withResolvers<void>()
const disposed: Array<string> = []
const off = registerDisposer(async (directory) => {
disposed.push(directory)
})
yield* Effect.addFinalizer(() => Effect.sync(off))
const first = yield* store.load({ directory: dir })
const reload = yield* store
.reload({
directory: dir,
init: async () => {
reloading.resolve()
await releaseReload.promise
},
})
.pipe(Effect.forkScoped)
yield* Effect.promise(() => reloading.promise)
const staleDispose = yield* store.dispose(first).pipe(Effect.forkScoped)
releaseReload.resolve()
const second = yield* Fiber.join(reload)
yield* Fiber.join(staleDispose)
expect(disposed).toEqual([dir])
expect(yield* store.load({ directory: dir })).toBe(second)
}),
)
it.live("dedupes concurrent disposeAll calls", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true })
const store = yield* InstanceStore.Service
const disposing = Promise.withResolvers<void>()
const releaseDispose = Promise.withResolvers<void>()
const disposed: Array<string> = []
const off = registerDisposer(async (directory) => {
disposed.push(directory)
disposing.resolve()
await releaseDispose.promise
})
yield* Effect.addFinalizer(() => Effect.sync(off))
yield* store.load({ directory: dir })
const first = yield* store.disposeAll().pipe(Effect.forkScoped)
yield* Effect.promise(() => disposing.promise)
const second = yield* store.disposeAll().pipe(Effect.forkScoped)
expect(disposed).toEqual([dir])
releaseDispose.resolve()
yield* Effect.all([Fiber.join(first), Fiber.join(second)])
expect(disposed).toEqual([dir])
}),
)
it.live("keeps Instance.provide as the legacy ALS wrapper", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true })