diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 8c5af8bde6..be42bf807c 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -7,13 +7,12 @@ export namespace Bus { type Subscription = (event: any) => void const state = Instance.state(() => { - const subscriptions = new Map() + const subscriptions = new Map() - return { - subscriptions, - } - }, - ) + return { + subscriptions, + } + }) export type EventDefinition = ReturnType diff --git a/packages/opencode/src/project/project.ts b/packages/opencode/src/project/project.ts index cd928362a8..c61a5ca61d 100644 --- a/packages/opencode/src/project/project.ts +++ b/packages/opencode/src/project/project.ts @@ -2,7 +2,7 @@ import z from "zod" import { Filesystem } from "../util/filesystem" import path from "path" import { $ } from "bun" -import { StorageNext } from "../storage/storage-next" +import { Storage } from "../storage/storage" import { Log } from "../util/log" export namespace Project { @@ -37,7 +37,7 @@ export namespace Project { created: Date.now(), }, } - await StorageNext.write(["project", "global"], project) + await Storage.write(["project", "global"], project) return project } let worktree = path.dirname(git) @@ -69,7 +69,7 @@ export namespace Project { created: Date.now(), }, } - await StorageNext.write(["project", id], project) + await Storage.write(["project", id], project) return project } if (cache.has(directory)) { @@ -81,13 +81,13 @@ export namespace Project { } export async function setInitialized(projectID: string) { - await StorageNext.update(["project", projectID], (draft) => { + await Storage.update(["project", projectID], (draft) => { draft.time.initialized = Date.now() }) } export async function list() { - const keys = await StorageNext.list(["project"]) - return await Promise.all(keys.map((x) => StorageNext.read(x))) + const keys = await Storage.list(["project"]) + return await Promise.all(keys.map((x) => Storage.read(x))) } } diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 3acde74710..4d3a4d2d0f 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -31,7 +31,7 @@ import { ProviderTransform } from "../provider/transform" import type { ModelsDev } from "../provider/models" import { Share } from "../share/share" import { Snapshot } from "../snapshot" -import { StorageNext } from "../storage/storage-next" +import { Storage } from "../storage/storage" import { Log } from "../util/log" import { NamedError } from "../util/error" import { SystemPrompt } from "./system" @@ -187,7 +187,7 @@ export namespace Session { }, } log.info("created", result) - await StorageNext.write(["session", Instance.project.id, result.id], result) + await Storage.write(["session", Instance.project.id, result.id], result) const cfg = await Config.get() if (!result.parentID && (Flag.OPENCODE_AUTO_SHARE || cfg.share === "auto")) share(result.id) @@ -206,12 +206,12 @@ export namespace Session { } export async function get(id: string) { - const read = await StorageNext.read(["session", Instance.project.id, id]) + const read = await Storage.read(["session", Instance.project.id, id]) return read as Info } export async function getShare(id: string) { - return StorageNext.read(["share", id]) + return Storage.read(["share", id]) } export async function share(id: string) { @@ -228,7 +228,7 @@ export namespace Session { url: share.url, } }) - await StorageNext.write(["share", id], share) + await Storage.write(["share", id], share) await Share.sync("session/info/" + id, session) for (const msg of await messages(id)) { await Share.sync("session/message/" + id + "/" + msg.info.id, msg.info) @@ -242,7 +242,7 @@ export namespace Session { export async function unshare(id: string) { const share = await getShare(id) if (!share) return - await StorageNext.remove(["share", id]) + await Storage.remove(["share", id]) await update(id, (draft) => { draft.share = undefined }) @@ -251,7 +251,7 @@ export namespace Session { export async function update(id: string, editor: (session: Info) => void) { const project = Instance.project - const result = await StorageNext.update(["session", project.id, id], (draft) => { + const result = await Storage.update(["session", project.id, id], (draft) => { editor(draft) draft.time.updated = Date.now() }) @@ -266,8 +266,8 @@ export namespace Session { info: MessageV2.Info parts: MessageV2.Part[] }[] - for (const p of await StorageNext.list(["message", sessionID])) { - const read = await StorageNext.read(p) + for (const p of await Storage.list(["message", sessionID])) { + const read = await Storage.read(p) result.push({ info: read, parts: await getParts(read.id), @@ -279,15 +279,15 @@ export namespace Session { export async function getMessage(sessionID: string, messageID: string) { return { - info: await StorageNext.read(["message", sessionID, messageID]), + info: await Storage.read(["message", sessionID, messageID]), parts: await getParts(messageID), } } export async function getParts(messageID: string) { const result = [] as MessageV2.Part[] - for (const item of await StorageNext.list(["part", messageID])) { - const read = await StorageNext.read(item) + for (const item of await Storage.list(["part", messageID])) { + const read = await Storage.read(item) result.push(read) } result.sort((a, b) => (a.id > b.id ? 1 : -1)) @@ -296,16 +296,16 @@ export namespace Session { export async function* list() { const project = Instance.project - for (const item of await StorageNext.list(["session", project.id])) { - yield StorageNext.read(item) + for (const item of await Storage.list(["session", project.id])) { + yield Storage.read(item) } } export async function children(parentID: string) { const project = Instance.project const result = [] as Session.Info[] - for (const item of await StorageNext.list(["session", project.id])) { - const session = await StorageNext.read(item) + for (const item of await Storage.list(["session", project.id])) { + const session = await Storage.read(item) if (session.parentID !== parentID) continue result.push(session) } @@ -332,13 +332,13 @@ export namespace Session { await remove(child.id, false) } await unshare(sessionID).catch(() => {}) - for (const msg of await StorageNext.list(["message", sessionID])) { - for (const part of await StorageNext.list(["part", msg.at(-1)!])) { - await StorageNext.remove(part) + for (const msg of await Storage.list(["message", sessionID])) { + for (const part of await Storage.list(["part", msg.at(-1)!])) { + await Storage.remove(part) } - await StorageNext.remove(msg) + await Storage.remove(msg) } - await StorageNext.remove(["session", project.id, sessionID]) + await Storage.remove(["session", project.id, sessionID]) if (emitEvent) { Bus.publish(Event.Deleted, { info: session, @@ -350,14 +350,14 @@ export namespace Session { } async function updateMessage(msg: MessageV2.Info) { - await StorageNext.write(["message", msg.sessionID, msg.id], msg) + await Storage.write(["message", msg.sessionID, msg.id], msg) Bus.publish(MessageV2.Event.Updated, { info: msg, }) } async function updatePart(part: MessageV2.Part) { - await StorageNext.write(["part", part.messageID, part.id], part) + await Storage.write(["part", part.messageID, part.id], part) Bus.publish(MessageV2.Event.PartUpdated, { part, }) @@ -425,7 +425,7 @@ export namespace Session { const [preserve, remove] = splitWhen(msgs, (x) => x.info.id === messageID) msgs = preserve for (const msg of remove) { - await StorageNext.remove(["message", input.sessionID, msg.info.id]) + await Storage.remove(["message", input.sessionID, msg.info.id]) await Bus.publish(MessageV2.Event.Removed, { sessionID: input.sessionID, messageID: msg.info.id }) } const last = preserve.at(-1) @@ -434,7 +434,7 @@ export namespace Session { const [preserveParts, removeParts] = splitWhen(last.parts, (x) => x.id === partID) last.parts = preserveParts for (const part of removeParts) { - await StorageNext.remove(["part", last.info.id, part.id]) + await Storage.remove(["part", last.info.id, part.id]) await Bus.publish(MessageV2.Event.PartRemoved, { sessionID: input.sessionID, messageID: last.info.id, @@ -787,7 +787,7 @@ export namespace Session { await updateMessage(assistantMsg) await using _ = defer(async () => { if (assistantMsg.time.completed) return - await StorageNext.remove(["session", "message", input.sessionID, assistantMsg.id]) + await Storage.remove(["session", "message", input.sessionID, assistantMsg.id]) await Bus.publish(MessageV2.Event.Removed, { sessionID: input.sessionID, messageID: assistantMsg.id }) }) const tools: Record = {} diff --git a/packages/opencode/src/share/share.ts b/packages/opencode/src/share/share.ts index f907be956e..9df862d59a 100644 --- a/packages/opencode/src/share/share.ts +++ b/packages/opencode/src/share/share.ts @@ -1,7 +1,7 @@ import { Bus } from "../bus" import { Installation } from "../installation" import { Session } from "../session" -import { StorageNext } from "../storage/storage-next" +import { MessageV2 } from "../session/message-v2" import { Log } from "../util/log" export namespace Share { @@ -45,7 +45,25 @@ export namespace Share { }) } - export function init() {} + export function init() { + Bus.subscribe(Session.Event.Updated, async (evt) => { + await sync("session/info/" + evt.properties.info.id, evt.properties.info) + }) + Bus.subscribe(MessageV2.Event.Updated, async (evt) => { + await sync("session/message/" + evt.properties.info.sessionID + "/" + evt.properties.info.id, evt.properties.info) + }) + Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + await sync( + "session/part/" + + evt.properties.part.sessionID + + "/" + + evt.properties.part.messageID + + "/" + + evt.properties.part.id, + evt.properties.part, + ) + }) + } export const URL = process.env["OPENCODE_API"] ?? diff --git a/packages/opencode/src/storage/storage-next.ts b/packages/opencode/src/storage/storage-next.ts deleted file mode 100644 index 7992191a89..0000000000 --- a/packages/opencode/src/storage/storage-next.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { Log } from "../util/log" -import path from "path" -import fs from "fs/promises" -import { Global } from "../global" -import { lazy } from "../util/lazy" -import { Lock } from "../util/lock" - -export namespace StorageNext { - const log = Log.create({ service: "storage" }) - - type Migration = (dir: string) => Promise - - const MIGRATIONS: Migration[] = [] - - const state = lazy(async () => { - const dir = path.join(Global.Path.data, "storage") - const migration = await Bun.file(path.join(dir, "migration")) - .json() - .then((x) => parseInt(x)) - .catch(() => 0) - for (let index = migration; index < MIGRATIONS.length; index++) { - log.info("running migration", { index }) - const migration = MIGRATIONS[index] - await migration(dir) - await Bun.write(path.join(dir, "migration"), (index + 1).toString()) - } - return { - dir, - } - }) - - export async function remove(key: string[]) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - await fs.unlink(target).catch(() => {}) - } - - export async function read(key: string[]) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - using _ = await Lock.read(target) - return Bun.file(target).json() as Promise - } - - export async function update(key: string[], fn: (draft: T) => void) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - using _ = await Lock.write("storage") - const content = await Bun.file(target).json() - fn(content) - await Bun.write(target, JSON.stringify(content, null, 2)) - return content as T - } - - export async function write(key: string[], content: T) { - const dir = await state().then((x) => x.dir) - const target = path.join(dir, ...key) + ".json" - using _ = await Lock.write("storage") - await Bun.write(target, JSON.stringify(content, null, 2)) - } - - const glob = new Bun.Glob("**/*") - export async function list(prefix: string[]) { - const dir = await state().then((x) => x.dir) - try { - const result = await Array.fromAsync( - glob.scan({ - cwd: path.join(dir, ...prefix), - onlyFiles: true, - }), - ).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])) - result.sort() - return result - } catch { - return [] - } - } -} diff --git a/packages/opencode/src/storage/storage.ts b/packages/opencode/src/storage/storage.ts new file mode 100644 index 0000000000..9580b7feb3 --- /dev/null +++ b/packages/opencode/src/storage/storage.ts @@ -0,0 +1,167 @@ +import { Log } from "../util/log" +import path from "path" +import fs from "fs/promises" +import { Global } from "../global" +import { lazy } from "../util/lazy" +import { Lock } from "../util/lock" +import { $ } from "bun" + +export namespace Storage { + const log = Log.create({ service: "storage" }) + + type Migration = (dir: string) => Promise + + const MIGRATIONS: Migration[] = [ + async (dir) => { + const project = path.resolve(dir, "../project") + for await (const projectDir of new Bun.Glob("*").scan({ cwd: project, onlyFiles: false })) { + let projectID = projectDir + const fullProjectDir = path.join(project, projectDir) + let worktree = "/" + + if (projectID !== "global") { + for await (const msgFile of new Bun.Glob("storage/session/message/*/*.json").scan({ + cwd: path.join(project, projectDir), + absolute: true, + })) { + const json = await Bun.file(msgFile).json() + worktree = json.path?.root + if (worktree) break + } + if (!worktree) continue + const [id] = await $`git rev-list --max-parents=0 --all` + .quiet() + .nothrow() + .cwd(worktree) + .text() + .then((x) => + x + .split("\n") + .filter(Boolean) + .map((x) => x.trim()) + .toSorted(), + ) + if (!id) continue + projectID = id + + await Bun.write( + path.join(dir, "project", projectID + ".json"), + JSON.stringify({ + id, + vcs: "git", + worktree, + time: { + created: Date.now(), + initialized: Date.now(), + }, + }), + ) + + for await (const sessionFile of new Bun.Glob("storage/session/info/*.json").scan({ + cwd: fullProjectDir, + absolute: true, + })) { + const dest = path.join(dir, "session", projectID, path.basename(sessionFile)) + log.info("copying", { + sessionFile, + dest, + }) + const session = await Bun.file(sessionFile).json() + await Bun.write(dest, JSON.stringify(session)) + for await (const msgFile of new Bun.Glob(`storage/session/message/${session.id}/*.json`).scan({ + cwd: fullProjectDir, + absolute: true, + })) { + const dest = path.join(dir, "message", session.id, path.basename(msgFile)) + log.info("copying", { + msgFile, + dest, + }) + const message = await Bun.file(msgFile).json() + await Bun.write(dest, JSON.stringify(message)) + + for await (const partFile of new Bun.Glob(`storage/session/part/${session.id}/${message.id}/*.json`).scan( + { + cwd: fullProjectDir, + absolute: true, + }, + )) { + const dest = path.join(dir, "part", message.id, path.basename(partFile)) + const part = await Bun.file(partFile).json() + log.info("copying", { + partFile, + dest, + }) + await Bun.write(dest, JSON.stringify(part)) + } + } + } + } + } + }, + ] + + const state = lazy(async () => { + const dir = path.join(Global.Path.data, "storage") + const migration = await Bun.file(path.join(dir, "migration")) + .json() + .then((x) => parseInt(x)) + .catch(() => 0) + for (let index = migration; index < MIGRATIONS.length; index++) { + log.info("running migration", { index }) + const migration = MIGRATIONS[index] + await migration(dir) + await Bun.write(path.join(dir, "migration"), (index + 1).toString()) + } + return { + dir, + } + }) + + export async function remove(key: string[]) { + const dir = await state().then((x) => x.dir) + const target = path.join(dir, ...key) + ".json" + await fs.unlink(target).catch(() => {}) + } + + export async function read(key: string[]) { + const dir = await state().then((x) => x.dir) + const target = path.join(dir, ...key) + ".json" + using _ = await Lock.read(target) + return Bun.file(target).json() as Promise + } + + export async function update(key: string[], fn: (draft: T) => void) { + const dir = await state().then((x) => x.dir) + const target = path.join(dir, ...key) + ".json" + using _ = await Lock.write("storage") + const content = await Bun.file(target).json() + fn(content) + await Bun.write(target, JSON.stringify(content, null, 2)) + return content as T + } + + export async function write(key: string[], content: T) { + const dir = await state().then((x) => x.dir) + const target = path.join(dir, ...key) + ".json" + using _ = await Lock.write("storage") + await Bun.write(target, JSON.stringify(content, null, 2)) + } + + const glob = new Bun.Glob("**/*") + export async function list(prefix: string[]) { + const dir = await state().then((x) => x.dir) + try { + const result = await Array.fromAsync( + glob.scan({ + cwd: path.join(dir, ...prefix), + onlyFiles: true, + }), + ).then((results) => results.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])) + result.sort() + return result + } catch { + return [] + } + } +}