From af7ada9a7a8a5dea595e6ebb6bb76f1d096c6bd0 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Fri, 10 Apr 2026 23:34:38 -0400 Subject: [PATCH] refactor(sync): migrate SyncEvent to service layer --- packages/opencode/src/sync/index.ts | 399 +++++++++++++++++----------- 1 file changed, 237 insertions(+), 162 deletions(-) diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index a409391915..f836b64a3d 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -1,6 +1,8 @@ import z from "zod" import type { ZodObject } from "zod" import { EventEmitter } from "events" +import { Context, Effect, Layer } from "effect" +import { makeRuntime } from "@/effect/run-service" import { Database, eq } from "@/storage/db" import { Bus as ProjectBus } from "@/bus" import { BusEvent } from "@/bus/bus-event" @@ -9,6 +11,8 @@ import { EventID } from "./schema" import { Flag } from "@/flag/flag" export namespace SyncEvent { + type Convert = (type: string, event: Event["data"]) => Promise> | Record + export type Definition = { type: string version: number @@ -30,38 +34,244 @@ export namespace SyncEvent { export type SerializedEvent = Event & { type: string } type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void + type Init = { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: Convert } + type Payload = { def: Definition; event: Event } export const registry = new Map() - let projectors: Map | undefined const versions = new Map() let frozen = false - let convertEvent: (type: string, event: Event["data"]) => Promise> | Record - const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>() - - export function reset() { - frozen = false - projectors = undefined - convertEvent = (_, data) => data + export interface Interface { + readonly reset: () => Effect.Effect + readonly init: (input: Init) => Effect.Effect + readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect + readonly run: ( + def: Def, + data: Event["data"], + options?: { publish?: boolean }, + ) => Effect.Effect + readonly remove: (aggregateID: string) => Effect.Effect + readonly subscribeAll: (handler: (event: Payload) => void) => Effect.Effect<() => void> + readonly payloads: () => z.ZodTypeAny } - export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) { - projectors = new Map(input.projectors) + export class Service extends Context.Service()("@opencode/SyncEvent") {} - // Install all the latest event defs to the bus. We only ever emit - // latest versions from code, and keep around old versions for - // replaying. Replaying does not go through the bus, and it - // simplifies the bus to only use unversioned latest events - for (let [type, version] of versions.entries()) { - let def = registry.get(versionedType(type, version))! + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + let projectors: Map | undefined + let convert: Convert = (_, data) => data as Record + const bus = new EventEmitter<{ event: [Payload] }>() - BusEvent.define(def.type, def.properties || def.schema) - } + const reset = Effect.fn("SyncEvent.reset")(() => + Effect.sync(() => { + frozen = false + projectors = undefined + convert = (_, data) => data as Record + }), + ) - // Freeze the system so it clearly errors if events are defined - // after `init` which would cause bugs - frozen = true - convertEvent = input.convertEvent || ((_, data) => data) + const init = Effect.fn("SyncEvent.init")((input: Init) => + Effect.sync(() => { + projectors = new Map(input.projectors) + + // Install all the latest event defs to the bus. We only ever emit + // latest versions from code, and keep around old versions for + // replaying. Replaying does not go through the bus, and it + // simplifies the bus to only use unversioned latest events + for (const [type, version] of versions.entries()) { + const def = registry.get(versionedType(type, version))! + BusEvent.define(def.type, def.properties || def.schema) + } + + // Freeze the system so it clearly errors if events are defined + // after `init` which would cause bugs + frozen = true + convert = input.convertEvent || ((_, data) => data as Record) + }), + ) + + const process = Effect.fn("SyncEvent.process")(function* ( + def: Def, + event: Event, + options: { publish: boolean }, + ) { + if (projectors == null) { + throw new Error("No projectors available. Call `SyncEvent.init` to install projectors") + } + + const projector = projectors.get(def) + if (!projector) { + throw new Error(`Projector not found for event: ${def.type}`) + } + + yield* Effect.sync(() => { + // idempotent: need to ignore any events already logged + Database.transaction((tx) => { + projector(tx, event.data) + + if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { + tx.insert(EventSequenceTable) + .values({ + aggregate_id: event.aggregateID, + seq: event.seq, + }) + .onConflictDoUpdate({ + target: EventSequenceTable.aggregate_id, + set: { seq: event.seq }, + }) + .run() + tx.insert(EventTable) + .values({ + id: event.id, + seq: event.seq, + aggregate_id: event.aggregateID, + type: versionedType(def.type, def.version), + data: event.data as Record, + }) + .run() + } + + Database.effect(() => { + bus.emit("event", { def, event }) + + if (!options.publish) return + + const result = convert(def.type, event.data) + if (result instanceof Promise) { + void result.then((data) => { + void ProjectBus.publish({ type: def.type, properties: def.schema }, data) + }) + return + } + + void ProjectBus.publish({ type: def.type, properties: def.schema }, result) + }) + }) + }) + }) + + const replay = Effect.fn("SyncEvent.replay")(function* (event: SerializedEvent, options?: { publish: boolean }) { + const def = registry.get(event.type) + if (!def) { + throw new Error(`Unknown event type: ${event.type}`) + } + + const row = Database.use((db) => + db + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) + .get(), + ) + + const latest = row?.seq ?? -1 + if (event.seq <= latest) return + + const expected = latest + 1 + if (event.seq !== expected) { + throw new Error( + `Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`, + ) + } + + yield* process(def, event, { publish: !!options?.publish }) + }) + + const run = Effect.fn("SyncEvent.run")(function* ( + def: Def, + data: Event["data"], + options?: { publish?: boolean }, + ) { + const agg = (data as Record)[def.aggregate] + if (agg == null) { + throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) + } + + if (def.version !== versions.get(def.type)) { + throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`) + } + + const publish = options?.publish ?? true + + yield* Effect.sync(() => { + // Note that this is an "immediate" transaction which is critical. + // We need to make sure we can safely read and write with nothing + // else changing the data from under us + Database.transaction( + (tx) => { + const id = EventID.ascending() + const row = tx + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, agg)) + .get() + const seq = row?.seq != null ? row.seq + 1 : 0 + + const event = { id, seq, aggregateID: agg, data } + Effect.runSync(process(def, event, { publish })) + }, + { + behavior: "immediate", + }, + ) + }) + }) + + const remove = Effect.fn("SyncEvent.remove")((aggregateID: string) => + Effect.sync(() => { + Database.transaction((tx) => { + tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() + tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() + }) + }), + ) + + const subscribeAll = Effect.fn("SyncEvent.subscribeAll")((handler: (event: Payload) => void) => + Effect.sync(() => { + bus.on("event", handler) + return () => bus.off("event", handler) + }), + ) + + function payloads() { + return z + .union( + registry + .entries() + .map(([type, def]) => { + return z + .object({ + type: z.literal(type), + aggregate: z.literal(def.aggregate), + data: def.schema, + }) + .meta({ + ref: "SyncEvent" + "." + def.type, + }) + }) + .toArray() as any, + ) + .meta({ + ref: "SyncEvent", + }) + } + + return Service.of({ reset, init, replay, run, remove, subscribeAll, payloads }) + }), + ) + + export const defaultLayer = layer + + const { runSync } = makeRuntime(Service, defaultLayer) + + export function reset() { + return runSync((svc) => svc.reset()) + } + + export function init(input: Init) { + return runSync((svc) => svc.init(input)) } export function versionedType(type: A): A @@ -102,63 +312,6 @@ export namespace SyncEvent { return [def, func as ProjectorFunc] } - function process(def: Def, event: Event, options: { publish: boolean }) { - if (projectors == null) { - throw new Error("No projectors available. Call `SyncEvent.init` to install projectors") - } - - const projector = projectors.get(def) - if (!projector) { - throw new Error(`Projector not found for event: ${def.type}`) - } - - // idempotent: need to ignore any events already logged - - Database.transaction((tx) => { - projector(tx, event.data) - - if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { - tx.insert(EventSequenceTable) - .values({ - aggregate_id: event.aggregateID, - seq: event.seq, - }) - .onConflictDoUpdate({ - target: EventSequenceTable.aggregate_id, - set: { seq: event.seq }, - }) - .run() - tx.insert(EventTable) - .values({ - id: event.id, - seq: event.seq, - aggregate_id: event.aggregateID, - type: versionedType(def.type, def.version), - data: event.data as Record, - }) - .run() - } - - Database.effect(() => { - Bus.emit("event", { - def, - event, - }) - - if (options?.publish) { - const result = convertEvent(def.type, event.data) - if (result instanceof Promise) { - result.then((data) => { - ProjectBus.publish({ type: def.type, properties: def.schema }, data) - }) - } else { - ProjectBus.publish({ type: def.type, properties: def.schema }, result) - } - } - }) - }) - } - // TODO: // // * Support applying multiple events at one time. One transaction, @@ -166,100 +319,22 @@ export namespace SyncEvent { // * when loading events from db, apply zod validation to ensure shape export function replay(event: SerializedEvent, options?: { publish: boolean }) { - const def = registry.get(event.type) - if (!def) { - throw new Error(`Unknown event type: ${event.type}`) - } - - const row = Database.use((db) => - db - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, event.aggregateID)) - .get(), - ) - - const latest = row?.seq ?? -1 - if (event.seq <= latest) { - return - } - - const expected = latest + 1 - if (event.seq !== expected) { - throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`) - } - - process(def, event, { publish: !!options?.publish }) + return runSync((svc) => svc.replay(event, options)) } export function run(def: Def, data: Event["data"], options?: { publish?: boolean }) { - const agg = (data as Record)[def.aggregate] - // This should never happen: we've enforced it via typescript in - // the definition - if (agg == null) { - throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`) - } - - if (def.version !== versions.get(def.type)) { - throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`) - } - - const { publish = true } = options || {} - - // Note that this is an "immediate" transaction which is critical. - // We need to make sure we can safely read and write with nothing - // else changing the data from under us - Database.transaction( - (tx) => { - const id = EventID.ascending() - const row = tx - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, agg)) - .get() - const seq = row?.seq != null ? row.seq + 1 : 0 - - const event = { id, seq, aggregateID: agg, data } - process(def, event, { publish }) - }, - { - behavior: "immediate", - }, - ) + return runSync((svc) => svc.run(def, data, options)) } export function remove(aggregateID: string) { - Database.transaction((tx) => { - tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run() - tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run() - }) + return runSync((svc) => svc.remove(aggregateID)) } export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) { - Bus.on("event", handler) - return () => Bus.off("event", handler) + return runSync((svc) => svc.subscribeAll(handler)) } export function payloads() { - return z - .union( - registry - .entries() - .map(([type, def]) => { - return z - .object({ - type: z.literal(type), - aggregate: z.literal(def.aggregate), - data: def.schema, - }) - .meta({ - ref: "SyncEvent" + "." + def.type, - }) - }) - .toArray() as any, - ) - .meta({ - ref: "SyncEvent", - }) + return runSync((svc) => Effect.sync(() => svc.payloads())) } }