refactor(core): make SyncEvent schema-first

Store sync event definitions as Effect Schema and derive Zod only at the bus and OpenAPI edges. Bridge the remaining Zod-native session payloads through Schema annotations so the sync layer no longer needs mixed-schema definition helpers.
This commit is contained in:
Kit Langton
2026-04-23 10:23:51 -04:00
parent 96039bdb83
commit 25c30e06d8
4 changed files with 76 additions and 67 deletions

View File

@@ -1,4 +1,4 @@
import z from "zod"
import { Schema } from "effect"
import sessionProjectors from "../session/projectors"
import { SyncEvent } from "@/sync"
import { Session } from "@/session"
@@ -10,7 +10,7 @@ export function initProjectors() {
projectors: sessionProjectors,
convertEvent: (type, data) => {
if (type === "session.updated") {
const id = (data as z.infer<typeof Session.Event.Updated.schema>).sessionID
const id = (data as Schema.Schema.Type<typeof Session.Event.Updated.schema>).sessionID
const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get())
if (!row) return data

View File

@@ -28,7 +28,7 @@ import type { Provider } from "@/provider"
import { Permission } from "@/permission"
import { Global } from "@/global"
import { Effect, Layer, Option, Context, Schema, Types } from "effect"
import { zod, zodObject } from "@/util/effect-zod"
import { ZodOverride, zod, zodObject } from "@/util/effect-zod"
import { withStatics } from "@/util/schema"
const log = Log.create({ service: "session" })
@@ -215,40 +215,50 @@ export const MessagesInput = Schema.Struct({
limit: Schema.optional(Schema.Number),
}).pipe(withStatics((s) => ({ zod: zod(s) })))
function schemaFromZod<T extends z.ZodTypeAny>(value: T) {
return Schema.declare((input): input is z.output<T> => value.safeParse(input).success).annotate({
[ZodOverride]: value,
})
}
const SessionUpdateInfoSchema = schemaFromZod(
updateSchema(zodObject(Info)).extend({
share: updateSchema(zodObject(Share)).optional(),
time: updateSchema(zodObject(Time)).optional(),
}),
)
export const Event = {
Created: SyncEvent.define({
type: "session.created",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
}),
schema: {
sessionID: SessionID,
info: Info,
},
}),
Updated: SyncEvent.define({
type: "session.updated",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: updateSchema(zodObject(Info)).extend({
share: updateSchema(zodObject(Share)).optional(),
time: updateSchema(zodObject(Time)).optional(),
}),
}),
busSchema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
}),
schema: {
sessionID: SessionID,
info: SessionUpdateInfoSchema,
},
busSchema: {
sessionID: SessionID,
info: Info,
},
}),
Deleted: SyncEvent.define({
type: "session.deleted",
version: 1,
aggregate: "sessionID",
schema: z.object({
sessionID: SessionID.zod,
info: Info.zod,
}),
schema: {
sessionID: SessionID,
info: Info,
},
}),
Diff: BusEvent.define(
"session.diff",
@@ -394,7 +404,7 @@ export interface Interface {
export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
type Patch = z.infer<typeof Event.Updated.schema>["info"]
type Patch = Schema.Schema.Type<typeof Event.Updated.schema>["info"]
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))

View File

@@ -1,5 +1,4 @@
import z from "zod"
import type { ZodObject } from "zod"
import { Schema, Types } from "effect"
import { Database, eq } from "@/storage"
import { GlobalBus } from "@/bus/global"
@@ -12,22 +11,34 @@ import { EventID } from "./schema"
import { Flag } from "@/flag/flag"
import { zod } from "@/util/effect-zod"
type StructLike<Fields extends Schema.Struct.Fields> = Fields | Schema.Struct<Fields>
export type Definition = {
type: string
version: number
aggregate: string
schema: z.ZodObject
// This is temporary and only exists for compatibility with bus
// event definitions
properties: z.ZodObject
schema: Schema.Top
busSchema: Schema.Top
properties: z.ZodTypeAny
}
type MutableType<S extends Schema.Top> = Types.DeepMutable<Schema.Schema.Type<S>>
type DefinedEvent<Type extends string, Agg extends string, SchemaDef extends Schema.Top, BusDef extends Schema.Top> = Definition & {
type: Type
aggregate: Agg
schema: SchemaDef
busSchema: BusDef
properties: z.ZodType<MutableType<BusDef>>
}
type Data<Def extends Definition> = MutableType<Def["schema"]>
export type Event<Def extends Definition = Definition> = {
id: string
seq: number
aggregateID: string
data: z.infer<Def["schema"]>
data: Data<Def>
}
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
@@ -56,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co
for (let [type, version] of versions.entries()) {
let def = registry.get(versionedType(type, version))!
BusEvent.define(def.type, def.properties || def.schema)
BusEvent.define(def.type, def.properties)
}
// Freeze the system so it clearly errors if events are defined
@@ -71,58 +82,46 @@ export function versionedType(type: string, version?: number) {
return version ? `${type}.${version}` : type
}
type SchemaLike<Agg extends string> =
| ZodObject<Record<Agg, z.ZodType<string>>>
| Schema.Struct<Record<Agg, Schema.Top>>
function struct<Fields extends Schema.Struct.Fields>(value: StructLike<Fields>) {
return (Schema.isSchema(value) ? value : Schema.Struct(value as Fields)) as Schema.Struct<Fields>
}
type BusSchemaLike = ZodObject | Schema.Struct<Schema.Struct.Fields>
type Mutable<T> = Types.DeepMutable<T>
type ToZodObject<S> = S extends Schema.Top
? z.ZodObject<{ [K in keyof Mutable<Schema.Schema.Type<S>>]: z.ZodType<Mutable<Schema.Schema.Type<S>>[K]> }>
: S
/**
* Define a sync event. Accepts either a Zod schema or an Effect Schema for
* both `schema` and `busSchema`. Effect Schemas are converted to Zod via the
* `effect-zod` walker since the sync pipeline uses Zod for validation and
* JSON Schema generation.
*/
export function define<
Type extends string,
Agg extends string,
S extends SchemaLike<Agg>,
B extends BusSchemaLike = S,
>(input: { type: Type; version: number; aggregate: Agg; schema: S; busSchema?: B }) {
Fields extends Schema.Struct.Fields & Record<Agg, Schema.Top>,
BusFields extends Schema.Struct.Fields = Fields,
>(input: {
type: Type
version: number
aggregate: Agg
schema: StructLike<Fields>
busSchema?: StructLike<BusFields>
}): DefinedEvent<Type, Agg, Schema.Struct<Fields>, Schema.Struct<BusFields>> {
if (frozen) {
throw new Error("Error defining sync event: sync system has been frozen")
}
const schema = toZodObject(input.schema) as ToZodObject<S>
const properties = (input.busSchema ? toZodObject(input.busSchema) : schema) as ToZodObject<B>
const schema = struct(input.schema)
const busSchema = (input.busSchema ? struct(input.busSchema) : schema) as Schema.Struct<BusFields>
const properties = zod(busSchema) as unknown as z.ZodType<MutableType<typeof busSchema>>
const def = {
const def: DefinedEvent<Type, Agg, typeof schema, typeof busSchema> = {
type: input.type,
version: input.version,
aggregate: input.aggregate,
schema,
busSchema,
properties,
}
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
registry.set(versionedType(def.type, def.version), def as unknown as Definition)
registry.set(versionedType(def.type, def.version), def)
return def
}
function toZodObject(value: ZodObject | Schema.Top): z.ZodObject {
if (Schema.isSchema(value)) {
return zod(value as Schema.Top) as unknown as z.ZodObject
}
return value as z.ZodObject
}
export function project<Def extends Definition>(
def: Def,
func: (db: Database.TxOrDb, data: Event<Def>["data"]) => void,
@@ -172,10 +171,10 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
const result = convertEvent(def.type, event.data)
if (result instanceof Promise) {
void result.then((data) => {
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
void ProjectBus.publish({ type: def.type, properties: def.properties }, data)
})
} else {
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
void ProjectBus.publish({ type: def.type, properties: def.properties }, result)
}
GlobalBus.emit("event", {
@@ -295,7 +294,7 @@ export function payloads() {
id: z.string(),
seq: z.number(),
aggregateID: z.literal(def.aggregate),
data: def.schema,
data: zod(def.schema),
})
.meta({
ref: `SyncEvent.${def.type}`,

View File

@@ -44,13 +44,13 @@ describe("SyncEvent", () => {
type: "item.created",
version: 1,
aggregate: "id",
schema: z.object({ id: z.string(), name: z.string() }),
schema: { id: Schema.String, name: Schema.String },
})
const Sent = SyncEvent.define({
type: "item.sent",
version: 1,
aggregate: "item_id",
schema: z.object({ item_id: z.string(), to: z.string() }),
schema: { item_id: Schema.String, to: Schema.String },
})
SyncEvent.init({
@@ -139,7 +139,7 @@ describe("SyncEvent", () => {
type: "item.effect.created",
version: 1,
aggregate: "id",
schema: Schema.Struct({ id: Schema.String, name: Schema.String }),
schema: { id: Schema.String, name: Schema.String },
})
SyncEvent.init({