mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-18 10:07:58 +00:00
fix(opencode): own effect sqlite lifecycle in layers
This commit is contained in:
@@ -52,6 +52,7 @@ import { SessionShare } from "@/share/session"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { Npm } from "@opencode-ai/core/npm"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
import { lazy } from "@/util/lazy"
|
||||
|
||||
export const AppLayer = Layer.mergeAll(
|
||||
Npm.defaultLayer,
|
||||
@@ -127,5 +128,7 @@ export const AppRuntime: Runtime = {
|
||||
runCallback(effect) {
|
||||
return rt.runCallback(wrap(effect))
|
||||
},
|
||||
dispose: () => rt.dispose(),
|
||||
async dispose() {
|
||||
await rt.dispose()
|
||||
},
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import { Vcs } from "@/project/vcs"
|
||||
import { Snapshot } from "@/snapshot"
|
||||
import { Bus } from "@/bus"
|
||||
import { Config } from "@/config/config"
|
||||
import { lazy } from "@/util/lazy"
|
||||
import * as Observability from "@opencode-ai/core/effect/observability"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
|
||||
@@ -26,4 +27,20 @@ export const BootstrapLayer = Layer.mergeAll(
|
||||
Bus.defaultLayer,
|
||||
).pipe(Layer.provide(Observability.layer))
|
||||
|
||||
export const BootstrapRuntime = ManagedRuntime.make(BootstrapLayer, { memoMap })
|
||||
const rt = lazy(() => ManagedRuntime.make(BootstrapLayer, { memoMap }))
|
||||
type Runtime = Pick<ReturnType<typeof rt>, "runPromise" | "dispose">
|
||||
|
||||
export const BootstrapRuntime: Runtime = {
|
||||
runPromise(effect, options) {
|
||||
return rt().runPromise(effect, options)
|
||||
},
|
||||
async dispose() {
|
||||
const current = rt.peek()
|
||||
if (!current) return
|
||||
try {
|
||||
await current.dispose()
|
||||
} finally {
|
||||
if (rt.peek() === current) rt.reset()
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1,24 +1,21 @@
|
||||
import { Database } from "@/storage/db"
|
||||
import * as StorageSchema from "@/storage/schema"
|
||||
import { Context, Layer } from "effect"
|
||||
import { Context, Effect, Layer } from "effect"
|
||||
import { drizzle, type EffectSQLiteDatabase } from "@opencode-ai/effect-drizzle-sqlite"
|
||||
|
||||
const schema = { ...StorageSchema }
|
||||
|
||||
export class Service extends Context.Service<Service, EffectSQLiteDatabase<typeof schema>>()("@opencode/DatabaseEffect") {}
|
||||
|
||||
export const layer = Layer.sync(Service, () => {
|
||||
let current: EffectSQLiteDatabase<typeof schema> | undefined
|
||||
|
||||
return new Proxy({} as EffectSQLiteDatabase<typeof schema>, {
|
||||
get(_target, property) {
|
||||
const client = Database.Client().$client
|
||||
if (current?.$client !== client) current = drizzle({ client, schema })
|
||||
|
||||
const value = Reflect.get(current, property)
|
||||
return typeof value === "function" ? value.bind(current) : value
|
||||
},
|
||||
})
|
||||
})
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.acquireRelease(
|
||||
Effect.sync(() => {
|
||||
const lease = Database.acquire()
|
||||
return { lease, db: drizzle({ client: lease.client.$client, schema }) }
|
||||
}),
|
||||
(value) => Effect.sync(() => value.lease.release()),
|
||||
).pipe(Effect.map((value) => value.db)),
|
||||
)
|
||||
|
||||
export * as DatabaseEffect from "./db-effect"
|
||||
|
||||
@@ -44,7 +44,7 @@ export const Path = iife(() => {
|
||||
|
||||
export type Transaction = SQLiteTransaction<"sync", void>
|
||||
|
||||
type Client = SQLiteBunDatabase
|
||||
export type Client = SQLiteBunDatabase
|
||||
|
||||
type Journal = { sql: string; timestamp: number; name: string }[]
|
||||
|
||||
@@ -88,7 +88,7 @@ function migrations(dir: string): Journal {
|
||||
return sql.sort((a, b) => a.timestamp - b.timestamp)
|
||||
}
|
||||
|
||||
export const Client = lazy(() => {
|
||||
export function open() {
|
||||
log.info("opening database", { path: Path })
|
||||
|
||||
const db = init(Path)
|
||||
@@ -119,12 +119,42 @@ export const Client = lazy(() => {
|
||||
}
|
||||
|
||||
return db
|
||||
})
|
||||
}
|
||||
|
||||
export function close() {
|
||||
if (!Client.loaded()) return
|
||||
Client().$client.close()
|
||||
Client.reset()
|
||||
export const Client = lazy(open)
|
||||
|
||||
let layerRefs = 0
|
||||
let layerOwner: Client | undefined
|
||||
|
||||
export function acquire() {
|
||||
const owner = Client.peek() === undefined
|
||||
const client = Client()
|
||||
if (owner) layerOwner = client
|
||||
layerRefs++
|
||||
|
||||
let released = false
|
||||
return {
|
||||
client,
|
||||
release() {
|
||||
if (released) return
|
||||
released = true
|
||||
layerRefs--
|
||||
if (layerRefs === 0 && layerOwner === client) {
|
||||
layerOwner = undefined
|
||||
close(client)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
export function close(client = Client.peek()) {
|
||||
if (!client) return
|
||||
client.$client.close()
|
||||
if (Client.peek() === client) {
|
||||
layerRefs = 0
|
||||
layerOwner = undefined
|
||||
Client.reset()
|
||||
}
|
||||
}
|
||||
|
||||
export type TxOrDb = Transaction | Client
|
||||
@@ -140,7 +170,8 @@ export function use<T>(callback: (trx: TxOrDb) => T): T {
|
||||
} catch (err) {
|
||||
if (err instanceof LocalContext.NotFound) {
|
||||
const effects: (() => void | Promise<void>)[] = []
|
||||
const result = ctx.provide({ effects, tx: Client() }, () => callback(Client()))
|
||||
const client = Client()
|
||||
const result = ctx.provide({ effects, tx: client }, () => callback(client))
|
||||
for (const effect of effects) effect()
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -15,6 +15,6 @@ export function lazy<T>(fn: () => T) {
|
||||
}
|
||||
|
||||
result.loaded = () => loaded
|
||||
|
||||
result.peek = () => (loaded ? value : undefined)
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -131,7 +131,7 @@ describe("ShareNext", () => {
|
||||
Effect.gen(function* () {
|
||||
yield* seed("https://control.example.com", "org-1")
|
||||
|
||||
const req = yield* ShareNext.Service.use((svc) => svc.request()).pipe(Effect.provide(live(none)))
|
||||
const req = yield* ShareNext.Service.use((svc) => svc.request())
|
||||
|
||||
expect(req.api.create).toBe("/api/shares")
|
||||
expect(req.api.sync("shr_123")).toBe("/api/shares/shr_123/sync")
|
||||
@@ -142,33 +142,33 @@ describe("ShareNext", () => {
|
||||
authorization: "Bearer st_test_token",
|
||||
"x-org-id": "org-1",
|
||||
})
|
||||
}),
|
||||
}).pipe(Effect.provide(wired(none))),
|
||||
),
|
||||
)
|
||||
|
||||
it.live("create posts share, persists it, and returns the result", () =>
|
||||
provideTmpdirInstance(
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* Session.Service.use((svc) => svc.create({ title: "test" }))
|
||||
const seen: HttpClientRequest.HttpClientRequest[] = []
|
||||
const client = HttpClient.make((req) => {
|
||||
seen.push(req)
|
||||
if (req.url.endsWith("/api/share")) {
|
||||
return Effect.succeed(
|
||||
json(req, {
|
||||
id: "shr_abc",
|
||||
url: "https://legacy-share.example.com/share/abc",
|
||||
secret: "sec_123",
|
||||
}),
|
||||
)
|
||||
}
|
||||
return Effect.succeed(json(req, { ok: true }))
|
||||
})
|
||||
() => {
|
||||
const seen: HttpClientRequest.HttpClientRequest[] = []
|
||||
const client = HttpClient.make((req) => {
|
||||
seen.push(req)
|
||||
if (req.url.endsWith("/api/share")) {
|
||||
return Effect.succeed(
|
||||
json(req, {
|
||||
id: "shr_abc",
|
||||
url: "https://legacy-share.example.com/share/abc",
|
||||
secret: "sec_123",
|
||||
}),
|
||||
)
|
||||
}
|
||||
return Effect.succeed(json(req, { ok: true }))
|
||||
})
|
||||
|
||||
const result = yield* ShareNext.Service.use((svc) => svc.create(session.id)).pipe(
|
||||
Effect.provide(live(client)),
|
||||
)
|
||||
return Effect.gen(function* () {
|
||||
const sessions = yield* Session.Service
|
||||
const shareNext = yield* ShareNext.Service
|
||||
const session = yield* sessions.create({ title: "test" })
|
||||
const result = yield* shareNext.create(session.id)
|
||||
|
||||
expect(result.id).toBe("shr_abc")
|
||||
expect(result.url).toBe("https://legacy-share.example.com/share/abc")
|
||||
@@ -182,60 +182,61 @@ describe("ShareNext", () => {
|
||||
expect(seen).toHaveLength(1)
|
||||
expect(seen[0].method).toBe("POST")
|
||||
expect(seen[0].url).toBe("https://legacy-share.example.com/api/share")
|
||||
}),
|
||||
}).pipe(Effect.provide(wired(client)))
|
||||
},
|
||||
{ config: { enterprise: { url: "https://legacy-share.example.com" } } },
|
||||
),
|
||||
)
|
||||
|
||||
it.live("remove deletes the persisted share and calls the delete endpoint", () =>
|
||||
provideTmpdirInstance(
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* Session.Service.use((svc) => svc.create({ title: "test" }))
|
||||
const seen: HttpClientRequest.HttpClientRequest[] = []
|
||||
const client = HttpClient.make((req) => {
|
||||
seen.push(req)
|
||||
if (req.method === "POST") {
|
||||
return Effect.succeed(
|
||||
json(req, {
|
||||
id: "shr_abc",
|
||||
url: "https://legacy-share.example.com/share/abc",
|
||||
secret: "sec_123",
|
||||
}),
|
||||
)
|
||||
}
|
||||
return Effect.succeed(HttpClientResponse.fromWeb(req, new Response(null, { status: 200 })))
|
||||
})
|
||||
|
||||
yield* Effect.gen(function* () {
|
||||
yield* ShareNext.Service.use((svc) => svc.create(session.id))
|
||||
yield* ShareNext.Service.use((svc) => svc.remove(session.id))
|
||||
}).pipe(Effect.provide(live(client)))
|
||||
() => {
|
||||
const seen: HttpClientRequest.HttpClientRequest[] = []
|
||||
const client = HttpClient.make((req) => {
|
||||
seen.push(req)
|
||||
if (req.method === "POST") {
|
||||
return Effect.succeed(
|
||||
json(req, {
|
||||
id: "shr_abc",
|
||||
url: "https://legacy-share.example.com/share/abc",
|
||||
secret: "sec_123",
|
||||
}),
|
||||
)
|
||||
}
|
||||
return Effect.succeed(HttpClientResponse.fromWeb(req, new Response(null, { status: 200 })))
|
||||
})
|
||||
|
||||
return Effect.gen(function* () {
|
||||
const sessions = yield* Session.Service
|
||||
const shareNext = yield* ShareNext.Service
|
||||
const session = yield* sessions.create({ title: "test" })
|
||||
yield* shareNext.create(session.id)
|
||||
yield* shareNext.remove(session.id)
|
||||
expect(share(session.id)).toBeUndefined()
|
||||
expect(seen.map((req) => [req.method, req.url])).toEqual([
|
||||
["POST", "https://legacy-share.example.com/api/share"],
|
||||
["DELETE", "https://legacy-share.example.com/api/share/shr_abc"],
|
||||
])
|
||||
}),
|
||||
}).pipe(Effect.provide(wired(client)))
|
||||
},
|
||||
{ config: { enterprise: { url: "https://legacy-share.example.com" } } },
|
||||
),
|
||||
)
|
||||
|
||||
it.live("create fails on a non-ok response and does not persist a share", () =>
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* Session.Service.use((svc) => svc.create({ title: "test" }))
|
||||
const client = HttpClient.make((req) => Effect.succeed(json(req, { error: "bad" }, 500)))
|
||||
provideTmpdirInstance(() => {
|
||||
const client = HttpClient.make((req) => Effect.succeed(json(req, { error: "bad" }, 500)))
|
||||
|
||||
const exit = yield* ShareNext.Service.use((svc) => Effect.exit(svc.create(session.id))).pipe(
|
||||
Effect.provide(live(client)),
|
||||
)
|
||||
return Effect.gen(function* () {
|
||||
const sessions = yield* Session.Service
|
||||
const shareNext = yield* ShareNext.Service
|
||||
const session = yield* sessions.create({ title: "test" })
|
||||
const exit = yield* Effect.exit(shareNext.create(session.id))
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
expect(share(session.id)).toBeUndefined()
|
||||
}),
|
||||
),
|
||||
}).pipe(Effect.provide(wired(client)))
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("ShareNext coalesces rapid diff events into one delayed sync with latest data", () =>
|
||||
|
||||
Reference in New Issue
Block a user