diff --git a/packages/opencode/src/server/projectors.ts b/packages/opencode/src/server/projectors.ts index cfecce5265..c32e08693e 100644 --- a/packages/opencode/src/server/projectors.ts +++ b/packages/opencode/src/server/projectors.ts @@ -1,4 +1,4 @@ -import z from "zod" +import { Schema } from "effect" import sessionProjectors from "../session/projectors" import { SyncEvent } from "@/sync" import { Session } from "@/session" @@ -10,7 +10,7 @@ export function initProjectors() { projectors: sessionProjectors, convertEvent: (type, data) => { if (type === "session.updated") { - const id = (data as z.infer).sessionID + const id = (data as Schema.Schema.Type).sessionID const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get()) if (!row) return data diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index d2bdbccb7d..46513801be 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -28,7 +28,7 @@ import type { Provider } from "@/provider" import { Permission } from "@/permission" import { Global } from "@/global" import { Effect, Layer, Option, Context, Schema, Types } from "effect" -import { zod, zodObject } from "@/util/effect-zod" +import { ZodOverride, zod, zodObject } from "@/util/effect-zod" import { withStatics } from "@/util/schema" const log = Log.create({ service: "session" }) @@ -215,40 +215,50 @@ export const MessagesInput = Schema.Struct({ limit: Schema.optional(Schema.Number), }).pipe(withStatics((s) => ({ zod: zod(s) }))) +function schemaFromZod(value: T) { + return Schema.declare((input): input is z.output => value.safeParse(input).success).annotate({ + [ZodOverride]: value, + }) +} + +const SessionUpdateInfoSchema = schemaFromZod( + updateSchema(zodObject(Info)).extend({ + share: updateSchema(zodObject(Share)).optional(), + time: updateSchema(zodObject(Time)).optional(), + }), +) + export const Event = { Created: SyncEvent.define({ type: "session.created", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: { + sessionID: SessionID, + info: Info, + }, }), Updated: SyncEvent.define({ type: "session.updated", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: updateSchema(zodObject(Info)).extend({ - share: updateSchema(zodObject(Share)).optional(), - time: updateSchema(zodObject(Time)).optional(), - }), - }), - busSchema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: { + sessionID: SessionID, + info: SessionUpdateInfoSchema, + }, + busSchema: { + sessionID: SessionID, + info: Info, + }, }), Deleted: SyncEvent.define({ type: "session.deleted", version: 1, aggregate: "sessionID", - schema: z.object({ - sessionID: SessionID.zod, - info: Info.zod, - }), + schema: { + sessionID: SessionID, + info: Info, + }, }), Diff: BusEvent.define( "session.diff", @@ -394,7 +404,7 @@ export interface Interface { export class Service extends Context.Service()("@opencode/Session") {} -type Patch = z.infer["info"] +type Patch = Schema.Schema.Type["info"] const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => Effect.sync(() => Database.use(fn)) diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index fdedff0366..2248ed15d2 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -1,5 +1,4 @@ import z from "zod" -import type { ZodObject } from "zod" import { Schema, Types } from "effect" import { Database, eq } from "@/storage" import { GlobalBus } from "@/bus/global" @@ -12,22 +11,34 @@ import { EventID } from "./schema" import { Flag } from "@/flag/flag" import { zod } from "@/util/effect-zod" +type StructLike = Fields | Schema.Struct + export type Definition = { type: string version: number aggregate: string - schema: z.ZodObject - - // This is temporary and only exists for compatibility with bus - // event definitions - properties: z.ZodObject + schema: Schema.Top + busSchema: Schema.Top + properties: z.ZodTypeAny } +type MutableType = Types.DeepMutable> + +type DefinedEvent = Definition & { + type: Type + aggregate: Agg + schema: SchemaDef + busSchema: BusDef + properties: z.ZodType> +} + +type Data = MutableType + export type Event = { id: string seq: number aggregateID: string - data: z.infer + data: Data } export type SerializedEvent = Event & { type: string } @@ -56,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co for (let [type, version] of versions.entries()) { let def = registry.get(versionedType(type, version))! - BusEvent.define(def.type, def.properties || def.schema) + BusEvent.define(def.type, def.properties) } // Freeze the system so it clearly errors if events are defined @@ -71,58 +82,46 @@ export function versionedType(type: string, version?: number) { return version ? `${type}.${version}` : type } -type SchemaLike = - | ZodObject>> - | Schema.Struct> +function struct(value: StructLike) { + return (Schema.isSchema(value) ? value : Schema.Struct(value as Fields)) as Schema.Struct +} -type BusSchemaLike = ZodObject | Schema.Struct - -type Mutable = Types.DeepMutable -type ToZodObject = S extends Schema.Top - ? z.ZodObject<{ [K in keyof Mutable>]: z.ZodType>[K]> }> - : S - -/** - * Define a sync event. Accepts either a Zod schema or an Effect Schema for - * both `schema` and `busSchema`. Effect Schemas are converted to Zod via the - * `effect-zod` walker since the sync pipeline uses Zod for validation and - * JSON Schema generation. - */ export function define< Type extends string, Agg extends string, - S extends SchemaLike, - B extends BusSchemaLike = S, ->(input: { type: Type; version: number; aggregate: Agg; schema: S; busSchema?: B }) { + Fields extends Schema.Struct.Fields & Record, + BusFields extends Schema.Struct.Fields = Fields, +>(input: { + type: Type + version: number + aggregate: Agg + schema: StructLike + busSchema?: StructLike +}): DefinedEvent, Schema.Struct> { if (frozen) { throw new Error("Error defining sync event: sync system has been frozen") } - const schema = toZodObject(input.schema) as ToZodObject - const properties = (input.busSchema ? toZodObject(input.busSchema) : schema) as ToZodObject + const schema = struct(input.schema) + const busSchema = (input.busSchema ? struct(input.busSchema) : schema) as Schema.Struct + const properties = zod(busSchema) as unknown as z.ZodType> - const def = { + const def: DefinedEvent = { type: input.type, version: input.version, aggregate: input.aggregate, schema, + busSchema, properties, } versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0)) - registry.set(versionedType(def.type, def.version), def as unknown as Definition) + registry.set(versionedType(def.type, def.version), def) return def } -function toZodObject(value: ZodObject | Schema.Top): z.ZodObject { - if (Schema.isSchema(value)) { - return zod(value as Schema.Top) as unknown as z.ZodObject - } - return value as z.ZodObject -} - export function project( def: Def, func: (db: Database.TxOrDb, data: Event["data"]) => void, @@ -172,10 +171,10 @@ function process(def: Def, event: Event, options: { const result = convertEvent(def.type, event.data) if (result instanceof Promise) { void result.then((data) => { - void ProjectBus.publish({ type: def.type, properties: def.schema }, data) + void ProjectBus.publish({ type: def.type, properties: def.properties }, data) }) } else { - void ProjectBus.publish({ type: def.type, properties: def.schema }, result) + void ProjectBus.publish({ type: def.type, properties: def.properties }, result) } GlobalBus.emit("event", { @@ -295,7 +294,7 @@ export function payloads() { id: z.string(), seq: z.number(), aggregateID: z.literal(def.aggregate), - data: def.schema, + data: zod(def.schema), }) .meta({ ref: `SyncEvent.${def.type}`, diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index 81c78132ec..9c9ff30818 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -44,13 +44,13 @@ describe("SyncEvent", () => { type: "item.created", version: 1, aggregate: "id", - schema: z.object({ id: z.string(), name: z.string() }), + schema: { id: Schema.String, name: Schema.String }, }) const Sent = SyncEvent.define({ type: "item.sent", version: 1, aggregate: "item_id", - schema: z.object({ item_id: z.string(), to: z.string() }), + schema: { item_id: Schema.String, to: Schema.String }, }) SyncEvent.init({ @@ -139,7 +139,7 @@ describe("SyncEvent", () => { type: "item.effect.created", version: 1, aggregate: "id", - schema: Schema.Struct({ id: Schema.String, name: Schema.String }), + schema: { id: Schema.String, name: Schema.String }, }) SyncEvent.init({