diff --git a/packages/opencode/src/project/instance-store.ts b/packages/opencode/src/project/instance-store.ts index a1bdeffe05..e7c0f657c1 100644 --- a/packages/opencode/src/project/instance-store.ts +++ b/packages/opencode/src/project/instance-store.ts @@ -3,9 +3,7 @@ import { WorkspaceContext } from "@/control-plane/workspace-context" import { disposeInstance } from "@/effect/instance-registry" import { makeRuntime } from "@/effect/run-service" import { AppFileSystem } from "@opencode-ai/core/filesystem" -import * as Log from "@opencode-ai/core/util/log" -import { Context, Effect, Layer } from "effect" -import { iife } from "@/util/iife" +import { Context, Deferred, Effect, Exit, Layer, Scope } from "effect" import { context, type InstanceContext } from "./instance-context" import * as Project from "./project" @@ -25,13 +23,18 @@ export interface Interface { export class Service extends Context.Service()("@opencode/InstanceStore") {} +interface Entry { + readonly deferred: Deferred.Deferred +} + export const layer: Layer.Layer = Layer.effect( Service, Effect.gen(function* () { const project = yield* Project.Service - const cache = new Map>() + const scope = yield* Scope.Scope + const cache = new Map() const disposal = { - all: undefined as Promise | undefined, + all: undefined as Deferred.Deferred | undefined, } const boot = Effect.fn("InstanceStore.boot")(function* (input: LoadInput & { directory: string }) { @@ -54,91 +57,128 @@ export const layer: Layer.Layer = Layer.effect( return ctx }) - function track(directory: string, next: Promise) { - const task = next.catch((error) => { - if (cache.get(directory) === task) cache.delete(directory) - throw error + const removeEntry = (directory: string, entry: Entry) => + Effect.sync(() => { + if (cache.get(directory) !== entry) return false + cache.delete(directory) + return true }) - cache.set(directory, task) - return task - } + + const completeLoad = Effect.fnUntraced(function* (directory: string, input: LoadInput, entry: Entry) { + const exit = yield* Effect.exit(boot({ ...input, directory })) + if (Exit.isFailure(exit)) yield* removeEntry(directory, entry) + yield* Deferred.done(entry.deferred, exit).pipe(Effect.asVoid) + }) + + const emitDisposed = (input: { directory: string; project?: string }) => + Effect.sync(() => + GlobalBus.emit("event", { + directory: input.directory, + project: input.project, + workspace: WorkspaceContext.workspaceID, + payload: { + type: "server.instance.disposed", + properties: { + directory: input.directory, + }, + }, + }), + ) + + const disposeContext = Effect.fn("InstanceStore.disposeContext")(function* (ctx: InstanceContext) { + yield* Effect.logInfo("disposing instance", { directory: ctx.directory }) + yield* Effect.promise(() => disposeInstance(ctx.directory)) + yield* emitDisposed({ directory: ctx.directory, project: ctx.project.id }) + }) + + const disposeEntry = Effect.fnUntraced(function* (directory: string, entry: Entry, ctx: InstanceContext) { + if (cache.get(directory) !== entry) return false + yield* disposeContext(ctx) + if (cache.get(directory) !== entry) return false + cache.delete(directory) + return true + }) const load = Effect.fn("InstanceStore.load")(function* (input: LoadInput) { const directory = AppFileSystem.resolve(input.directory) - const existing = cache.get(directory) - if (existing) return yield* Effect.promise(() => existing) + return yield* Effect.uninterruptibleMask((restore) => + Effect.gen(function* () { + const existing = cache.get(directory) + if (existing) return yield* restore(Deferred.await(existing.deferred)) - Log.Default.info("creating instance", { directory }) - return yield* Effect.promise(() => track(directory, Effect.runPromise(boot({ ...input, directory })))) + const entry: Entry = { deferred: Deferred.makeUnsafe() } + cache.set(directory, entry) + yield* Effect.gen(function* () { + yield* Effect.logInfo("creating instance", { directory }) + yield* completeLoad(directory, input, entry) + }).pipe(Effect.forkIn(scope, { startImmediately: true })) + return yield* restore(Deferred.await(entry.deferred)) + }), + ) }) const reload = Effect.fn("InstanceStore.reload")(function* (input: LoadInput) { const directory = AppFileSystem.resolve(input.directory) - Log.Default.info("reloading instance", { directory }) - yield* Effect.promise(() => disposeInstance(directory)) - cache.delete(directory) - const next = track(directory, Effect.runPromise(boot({ ...input, directory }))) - - GlobalBus.emit("event", { - directory, - project: input.project?.id, - workspace: WorkspaceContext.workspaceID, - payload: { - type: "server.instance.disposed", - properties: { - directory, - }, - }, - }) - - return yield* Effect.promise(() => next) + return yield* Effect.uninterruptibleMask((restore) => + Effect.gen(function* () { + const previous = cache.get(directory) + const entry: Entry = { deferred: Deferred.makeUnsafe() } + cache.set(directory, entry) + yield* Effect.gen(function* () { + yield* Effect.logInfo("reloading instance", { directory }) + if (previous) yield* Deferred.await(previous.deferred).pipe(Effect.exit, Effect.asVoid) + yield* Effect.promise(() => disposeInstance(directory)) + yield* emitDisposed({ directory, project: input.project?.id }) + yield* completeLoad(directory, input, entry) + }).pipe(Effect.forkIn(scope, { startImmediately: true })) + return yield* restore(Deferred.await(entry.deferred)) + }), + ) }) const dispose = Effect.fn("InstanceStore.dispose")(function* (ctx: InstanceContext) { - Log.Default.info("disposing instance", { directory: ctx.directory }) - yield* Effect.promise(() => disposeInstance(ctx.directory)) - cache.delete(ctx.directory) + const entry = cache.get(ctx.directory) + if (!entry) return yield* disposeContext(ctx) - GlobalBus.emit("event", { - directory: ctx.directory, - project: ctx.project.id, - workspace: WorkspaceContext.workspaceID, - payload: { - type: "server.instance.disposed", - properties: { - directory: ctx.directory, - }, - }, - }) + const exit = yield* Deferred.await(entry.deferred).pipe(Effect.exit) + if (Exit.isFailure(exit)) return yield* removeEntry(ctx.directory, entry).pipe(Effect.asVoid) + if (exit.value !== ctx) return + yield* disposeEntry(ctx.directory, entry, ctx).pipe(Effect.asVoid) }) const disposeAll = Effect.fn("InstanceStore.disposeAll")(function* () { - if (disposal.all) return yield* Effect.promise(() => disposal.all!) + return yield* Effect.uninterruptibleMask((restore) => + Effect.gen(function* () { + const existing = disposal.all + if (existing) return yield* restore(Deferred.await(existing)) - disposal.all = iife(async () => { - Log.Default.info("disposing all instances") - const entries = [...cache.entries()] - for (const [key, value] of entries) { - if (cache.get(key) !== value) continue - - const ctx = await value.catch((error) => { - Log.Default.warn("instance dispose failed", { key, error }) - return undefined - }) - - if (!ctx) { - if (cache.get(key) === value) cache.delete(key) - continue + const done = Deferred.makeUnsafe() + const entries = [...cache.entries()] + disposal.all = done + const exit = yield* Effect.gen(function* () { + yield* Effect.logInfo("disposing all instances") + yield* Effect.forEach( + entries, + (item) => + Effect.gen(function* () { + const exit = yield* Deferred.await(item[1].deferred).pipe(Effect.exit) + if (Exit.isFailure(exit)) { + yield* Effect.logWarning("instance dispose failed", { key: item[0], cause: exit.cause }) + yield* removeEntry(item[0], item[1]) + return + } + yield* disposeEntry(item[0], item[1], exit.value) + }), + { discard: true }, + ) + }).pipe(Effect.exit) + yield* Deferred.done(done, exit).pipe(Effect.asVoid) + if (disposal.all === done) { + disposal.all = undefined } - - if (cache.get(key) !== value) continue - await Effect.runPromise(dispose(ctx)) - } - }).finally(() => { - disposal.all = undefined - }) - - return yield* Effect.promise(() => disposal.all!) + return yield* restore(Deferred.await(done)) + }), + ) }) yield* Effect.addFinalizer(() => disposeAll().pipe(Effect.ignore)) diff --git a/packages/opencode/test/project/instance.test.ts b/packages/opencode/test/project/instance.test.ts index 29d93555fe..064c23383c 100644 --- a/packages/opencode/test/project/instance.test.ts +++ b/packages/opencode/test/project/instance.test.ts @@ -1,6 +1,7 @@ import { afterEach, describe, expect } from "bun:test" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" -import { Effect, Layer } from "effect" +import { Effect, Fiber, Layer } from "effect" +import { registerDisposer } from "../../src/effect/instance-registry" import { Instance } from "../../src/project/instance" import { InstanceStore } from "../../src/project/instance-store" import { tmpdirScoped } from "../fixture/fixture" @@ -67,6 +68,153 @@ describe("InstanceStore", () => { }), ) + it.live("dedupes concurrent loads while init is in flight", () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped({ git: true }) + const store = yield* InstanceStore.Service + const started = Promise.withResolvers() + const release = Promise.withResolvers() + let initialized = 0 + + const first = yield* store + .load({ + directory: dir, + init: async () => { + initialized++ + started.resolve() + await release.promise + }, + }) + .pipe(Effect.forkScoped) + + yield* Effect.promise(() => started.promise) + + const second = yield* store + .load({ + directory: dir, + init: async () => { + initialized++ + }, + }) + .pipe(Effect.forkScoped) + + expect(initialized).toBe(1) + release.resolve() + + const [firstCtx, secondCtx] = yield* Effect.all([Fiber.join(first), Fiber.join(second)]) + expect(secondCtx).toBe(firstCtx) + expect(initialized).toBe(1) + }), + ) + + it.live("removes failed loads from the cache", () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped({ git: true }) + const store = yield* InstanceStore.Service + let attempts = 0 + + const failed = yield* store + .load({ + directory: dir, + init: async () => { + attempts++ + throw new Error("init failed") + }, + }) + .pipe( + Effect.as(false), + Effect.catchCause(() => Effect.succeed(true)), + ) + + expect(failed).toBe(true) + + const ctx = yield* store.load({ + directory: dir, + init: async () => { + attempts++ + }, + }) + + expect(ctx.directory).toBe(dir) + expect(attempts).toBe(2) + }), + ) + + it.live("reload replaces the cached context", () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped({ git: true }) + const store = yield* InstanceStore.Service + + const first = yield* store.load({ directory: dir }) + const second = yield* store.reload({ directory: dir }) + const cached = yield* store.load({ directory: dir }) + + expect(second).not.toBe(first) + expect(cached).toBe(second) + }), + ) + + it.live("stale dispose does not delete an in-flight reload", () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped({ git: true }) + const store = yield* InstanceStore.Service + const reloading = Promise.withResolvers() + const releaseReload = Promise.withResolvers() + const disposed: Array = [] + const off = registerDisposer(async (directory) => { + disposed.push(directory) + }) + yield* Effect.addFinalizer(() => Effect.sync(off)) + + const first = yield* store.load({ directory: dir }) + const reload = yield* store + .reload({ + directory: dir, + init: async () => { + reloading.resolve() + await releaseReload.promise + }, + }) + .pipe(Effect.forkScoped) + + yield* Effect.promise(() => reloading.promise) + const staleDispose = yield* store.dispose(first).pipe(Effect.forkScoped) + releaseReload.resolve() + + const second = yield* Fiber.join(reload) + yield* Fiber.join(staleDispose) + + expect(disposed).toEqual([dir]) + expect(yield* store.load({ directory: dir })).toBe(second) + }), + ) + + it.live("dedupes concurrent disposeAll calls", () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped({ git: true }) + const store = yield* InstanceStore.Service + const disposing = Promise.withResolvers() + const releaseDispose = Promise.withResolvers() + const disposed: Array = [] + const off = registerDisposer(async (directory) => { + disposed.push(directory) + disposing.resolve() + await releaseDispose.promise + }) + yield* Effect.addFinalizer(() => Effect.sync(off)) + + yield* store.load({ directory: dir }) + const first = yield* store.disposeAll().pipe(Effect.forkScoped) + yield* Effect.promise(() => disposing.promise) + const second = yield* store.disposeAll().pipe(Effect.forkScoped) + + expect(disposed).toEqual([dir]) + releaseDispose.resolve() + yield* Effect.all([Fiber.join(first), Fiber.join(second)]) + expect(disposed).toEqual([dir]) + }), + ) + it.live("keeps Instance.provide as the legacy ALS wrapper", () => Effect.gen(function* () { const dir = yield* tmpdirScoped({ git: true })