diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index bbf79620c1..efed0a7071 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -1,19 +1,19 @@ import z from "zod" -import { setTimeout as sleep } from "node:timers/promises" -import { fn } from "@/util/fn" +import { AppFileSystem } from "@/filesystem" +import { makeRuntime } from "@/effect/run-service" import { Database, eq } from "@/storage/db" -import { Project } from "@/project/project" +import type { Project } from "@/project/project" import { BusEvent } from "@/bus/bus-event" import { GlobalBus } from "@/bus/global" import { SyncEvent } from "@/sync" import { Log } from "@/util/log" -import { Filesystem } from "@/util/filesystem" import { ProjectID } from "@/project/schema" import { WorkspaceTable } from "./workspace.sql" import { getAdaptor } from "./adaptors" import { WorkspaceInfo } from "./types" import { WorkspaceID } from "./schema" import { parseSSE } from "./sse" +import { Context, Effect, Layer, Scope } from "effect" export namespace Workspace { export const Info = WorkspaceInfo.meta({ @@ -56,174 +56,250 @@ export namespace Workspace { } } - const CreateInput = z.object({ + export const CreateInput = z.object({ id: WorkspaceID.zod.optional(), type: Info.shape.type, branch: Info.shape.branch, projectID: ProjectID.zod, extra: Info.shape.extra, }) - - export const create = fn(CreateInput, async (input) => { - const id = WorkspaceID.ascending(input.id) - const adaptor = await getAdaptor(input.type) - - const config = await adaptor.configure({ ...input, id, name: null, directory: null }) - - const info: Info = { - id, - type: config.type, - branch: config.branch ?? null, - name: config.name ?? null, - directory: config.directory ?? null, - extra: config.extra ?? null, - projectID: input.projectID, - } - - Database.use((db) => { - db.insert(WorkspaceTable) - .values({ - id: info.id, - type: info.type, - branch: info.branch, - name: info.name, - directory: info.directory, - extra: info.extra, - project_id: info.projectID, - }) - .run() - }) - - await adaptor.create(config) - - startSync(info) - - return info - }) - - export function list(project: Project.Info) { - const rows = Database.use((db) => - db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, project.id)).all(), - ) - const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id)) - for (const space of spaces) startSync(space) - return spaces - } - - export const get = fn(WorkspaceID.zod, async (id) => { - const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) - if (!row) return - const space = fromRow(row) - startSync(space) - return space - }) - - export const remove = fn(WorkspaceID.zod, async (id) => { - const row = Database.use((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) - if (row) { - stopSync(id) - - const info = fromRow(row) - const adaptor = await getAdaptor(row.type) - adaptor.remove(info) - Database.use((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run()) - return info - } - }) - - const connections = new Map() - const aborts = new Map() - - function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) { - const prev = connections.get(id) - if (prev?.status === status && prev?.error === error) return - const next = { workspaceID: id, status, error } - connections.set(id, next) - GlobalBus.emit("event", { - directory: "global", - workspace: id, - payload: { - type: Event.Status.type, - properties: next, - }, - }) - } - - export function status(): ConnectionStatus[] { - return [...connections.values()] - } + export type CreateInput = z.infer const log = Log.create({ service: "workspace-sync" }) - async function workspaceEventLoop(space: Info, signal: AbortSignal) { - log.info("starting sync: " + space.id) + export interface Interface { + readonly create: (input: CreateInput) => Effect.Effect + readonly list: (projectID: ProjectID) => Effect.Effect + readonly get: (id: WorkspaceID) => Effect.Effect + readonly remove: (id: WorkspaceID) => Effect.Effect + readonly status: () => Effect.Effect + } - while (!signal.aborted) { - log.info("connecting to sync: " + space.id) + export class Service extends Context.Service()("@opencode/Workspace") {} - setStatus(space.id, "connecting") - const adaptor = await getAdaptor(space.type) - const target = await adaptor.target(space) + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const fs = yield* AppFileSystem.Service + const scope = yield* Scope.Scope + const connections = new Map() + const aborts = new Map() - if (target.type === "local") return + yield* Effect.addFinalizer(() => + Effect.sync(() => { + for (const abort of aborts.values()) abort.abort() + }), + ) - const res = await fetch(target.url + "/sync/event", { method: "GET", signal }).catch((err: unknown) => { - setStatus(space.id, "error", String(err)) - return undefined + const db = (fn: (db: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => + Effect.sync(() => Database.use(fn)) + + const setStatus = Effect.fnUntraced(function* ( + id: WorkspaceID, + status: ConnectionStatus["status"], + error?: string, + ) { + const prev = connections.get(id) + if (prev?.status === status && prev?.error === error) return + const next = { workspaceID: id, status, error } + connections.set(id, next) + GlobalBus.emit("event", { + directory: "global", + workspace: id, + payload: { + type: Event.Status.type, + properties: next, + }, + }) }) - if (!res || !res.ok || !res.body) { - log.info("failed to connect to sync: " + res?.status) - setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response") - await sleep(1000) - continue - } - setStatus(space.id, "connected") - await parseSSE(res.body, signal, (evt) => { - const event = evt as SyncEvent.SerializedEvent + const stopSync = Effect.fnUntraced(function* (id: WorkspaceID) { + aborts.get(id)?.abort() + aborts.delete(id) + connections.delete(id) + }) - try { - if (!event.type.startsWith("server.")) { - SyncEvent.replay(event) + const workspaceEventLoop = Effect.fn("Workspace.workspaceEventLoop")(function* ( + space: Info, + signal: AbortSignal, + ) { + log.info("starting sync: " + space.id) + + while (!signal.aborted) { + log.info("connecting to sync: " + space.id) + + yield* setStatus(space.id, "connecting") + const adaptor = yield* Effect.promise(() => getAdaptor(space.type)).pipe(Effect.orDie) + const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))).pipe(Effect.orDie) + + if (target.type === "local") return + + const res = yield* Effect.tryPromise({ + try: () => fetch(target.url + "/sync/event", { method: "GET", signal }), + catch: (err) => String(err), + }).pipe( + Effect.catch((err) => setStatus(space.id, "error", err).pipe(Effect.as(undefined as Response | undefined))), + ) + + if (!res || !res.ok || !res.body) { + log.info("failed to connect to sync: " + res?.status) + yield* setStatus(space.id, "error", res ? `HTTP ${res.status}` : "no response") + yield* Effect.sleep("1 second") + continue } - } catch (err) { - log.warn("failed to replay sync event", { - workspaceID: space.id, - error: err, - }) + + const body = res.body + yield* setStatus(space.id, "connected") + yield* Effect.promise(() => + parseSSE(body, signal, (evt) => { + const event = evt as SyncEvent.SerializedEvent + + try { + if (!event.type.startsWith("server.")) { + SyncEvent.replay(event) + } + } catch (err) { + log.warn("failed to replay sync event", { + workspaceID: space.id, + error: err, + }) + } + }), + ).pipe(Effect.orDie) + + yield* setStatus(space.id, "disconnected") + log.info("disconnected to sync: " + space.id) + yield* Effect.sleep("250 millis") } }) - setStatus(space.id, "disconnected") - log.info("disconnected to sync: " + space.id) - await sleep(250) - } + + const startSync = Effect.fn("Workspace.startSync")(function* (space: Info) { + if (space.type === "worktree") { + yield* Effect.gen(function* () { + const exists = yield* fs.exists(space.directory!).pipe(Effect.orDie) + yield* setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist") + }).pipe(Effect.forkIn(scope)) + return + } + + if (aborts.has(space.id)) return + + const abort = new AbortController() + aborts.set(space.id, abort) + yield* setStatus(space.id, "disconnected") + yield* workspaceEventLoop(space, abort.signal).pipe( + Effect.catchCause((cause) => + Effect.gen(function* () { + yield* setStatus(space.id, "error", String(cause)) + yield* Effect.sync(() => { + log.warn("workspace sync listener failed", { + workspaceID: space.id, + error: cause, + }) + }) + }), + ), + Effect.forkIn(scope), + ) + }) + + const create = Effect.fn("Workspace.create")(function* (input: CreateInput) { + const id = WorkspaceID.ascending(input.id) + const adaptor = yield* Effect.promise(() => getAdaptor(input.type)).pipe(Effect.orDie) + const config = yield* Effect.promise(() => + Promise.resolve(adaptor.configure({ ...input, id, name: null, directory: null })), + ).pipe(Effect.orDie) + + const info: Info = { + id, + type: config.type, + branch: config.branch ?? null, + name: config.name ?? null, + directory: config.directory ?? null, + extra: config.extra ?? null, + projectID: input.projectID, + } + + yield* db((db) => { + db.insert(WorkspaceTable) + .values({ + id: info.id, + type: info.type, + branch: info.branch, + name: info.name, + directory: info.directory, + extra: info.extra, + project_id: info.projectID, + }) + .run() + }) + + yield* Effect.promise(() => adaptor.create(config)).pipe(Effect.orDie) + yield* startSync(info) + return info + }) + + const list = Effect.fn("Workspace.list")(function* (projectID: ProjectID) { + const rows = yield* db((db) => + db.select().from(WorkspaceTable).where(eq(WorkspaceTable.project_id, projectID)).all(), + ) + const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id)) + for (const space of spaces) { + yield* startSync(space) + } + return spaces + }) + + const get = Effect.fn("Workspace.get")(function* (id: WorkspaceID) { + const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) + if (!row) return + const space = fromRow(row) + yield* startSync(space) + return space + }) + + const remove = Effect.fn("Workspace.remove")(function* (id: WorkspaceID) { + const row = yield* db((db) => db.select().from(WorkspaceTable).where(eq(WorkspaceTable.id, id)).get()) + if (!row) return + + yield* stopSync(id) + + const info = fromRow(row) + const adaptor = yield* Effect.promise(() => getAdaptor(row.type)).pipe(Effect.orDie) + yield* Effect.sync(() => { + void adaptor.remove(info) + }) + yield* db((db) => db.delete(WorkspaceTable).where(eq(WorkspaceTable.id, id)).run()) + return info + }) + + const status = Effect.fn("Workspace.status")(() => Effect.succeed([...connections.values()])) + + return Service.of({ create, list, get, remove, status }) + }), + ) + + export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer)) + + const { runPromise } = makeRuntime(Service, defaultLayer) + + export async function create(input: CreateInput) { + return runPromise((svc) => svc.create(input)) } - function startSync(space: Info) { - if (space.type === "worktree") { - void Filesystem.exists(space.directory!).then((exists) => { - setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist") - }) - return - } - - if (aborts.has(space.id)) return - const abort = new AbortController() - aborts.set(space.id, abort) - setStatus(space.id, "disconnected") - - void workspaceEventLoop(space, abort.signal).catch((error) => { - setStatus(space.id, "error", String(error)) - log.warn("workspace sync listener failed", { - workspaceID: space.id, - error, - }) - }) + export async function list(project: Project.Info) { + return runPromise((svc) => svc.list(project.id)) } - function stopSync(id: WorkspaceID) { - aborts.get(id)?.abort() - aborts.delete(id) - connections.delete(id) + export async function get(id: WorkspaceID) { + return runPromise((svc) => svc.get(id)) + } + + export async function remove(id: WorkspaceID) { + return runPromise((svc) => svc.remove(id)) + } + + export async function status() { + return runPromise((svc) => svc.status()) } } diff --git a/packages/opencode/src/server/routes/workspace.ts b/packages/opencode/src/server/routes/workspace.ts index 4193216541..1c86cd1a61 100644 --- a/packages/opencode/src/server/routes/workspace.ts +++ b/packages/opencode/src/server/routes/workspace.ts @@ -28,7 +28,7 @@ export const WorkspaceRoutes = lazy(() => }), validator( "json", - Workspace.create.schema.omit({ + Workspace.CreateInput.omit({ projectID: true, }), ), @@ -59,7 +59,7 @@ export const WorkspaceRoutes = lazy(() => }, }), async (c) => { - return c.json(Workspace.list(Instance.project)) + return c.json(await Workspace.list(Instance.project)) }, ) .get( @@ -80,8 +80,8 @@ export const WorkspaceRoutes = lazy(() => }, }), async (c) => { - const ids = new Set(Workspace.list(Instance.project).map((item) => item.id)) - return c.json(Workspace.status().filter((item) => ids.has(item.workspaceID))) + const ids = new Set((await Workspace.list(Instance.project)).map((item) => item.id)) + return c.json((await Workspace.status()).filter((item) => ids.has(item.workspaceID))) }, ) .delete(