From ec98b656fdd9e7a8ae53bb8fd6272b5a80773add Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 30 Apr 2026 16:09:14 -0400 Subject: [PATCH] Add SyncEvent service --- .../opencode/src/control-plane/workspace.ts | 91 +++++---- packages/opencode/src/effect/app-runtime.ts | 2 + .../routes/instance/httpapi/handlers/sync.ts | 3 +- .../server/routes/instance/httpapi/server.ts | 2 + .../src/server/routes/instance/sync.ts | 2 +- packages/opencode/src/session/revert.ts | 6 +- packages/opencode/src/session/session.ts | 56 +++--- packages/opencode/src/share/session.ts | 8 +- packages/opencode/src/sync/index.ts | 27 ++- packages/opencode/test/sync/index.test.ts | 175 ++++++++++-------- 10 files changed, 223 insertions(+), 149 deletions(-) diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts index fe8046ba9c..7f9d078bb7 100644 --- a/packages/opencode/src/control-plane/workspace.ts +++ b/packages/opencode/src/control-plane/workspace.ts @@ -169,6 +169,7 @@ export const layer = Layer.effect( const auth = yield* Auth.Service const session = yield* Session.Service const http = yield* HttpClient.HttpClient + const sync = yield* SyncEvent.Service const connections = new Map() const syncFibers = yield* FiberMap.make() @@ -307,25 +308,30 @@ export const layer = Layer.effect( events: events.length, }) - yield* Effect.sync(() => - WorkspaceContext.provide({ + yield* Effect.promise(async () => { + await WorkspaceContext.provide({ workspaceID: space.id, - fn: () => { - for (const event of events) { - SyncEvent.replay( - { - id: event.id, - aggregateID: event.aggregate_id, - seq: event.seq, - type: event.type, - data: event.data, - }, - { publish: true }, - ) - } + async fn() { + await Effect.runPromise( + Effect.forEach( + events, + (event) => + sync.replay( + { + id: event.id, + aggregateID: event.aggregate_id, + seq: event.seq, + type: event.type, + data: event.data, + }, + { publish: true }, + ), + { discard: true }, + ), + ) }, - }), - ) + }) + }) }) const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) { @@ -361,16 +367,28 @@ export const layer = Layer.effect( setStatus(space.id, "connected") yield* parseSSE(stream, (evt) => - Effect.sync(() => { + Effect.gen(function* () { + if (!evt || typeof evt !== "object" || !("payload" in evt)) return + const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent } + if (payload.type === "server.heartbeat") return + + if (payload.type === "sync" && payload.syncEvent) { + const failed = yield* sync.replay(payload.syncEvent).pipe( + Effect.as(false), + Effect.catchCause((error) => + Effect.sync(() => { + log.info("failed to replay global event", { + workspaceID: space.id, + error, + }) + return true + }), + ), + ) + if (failed) return + } + try { - if (!evt || typeof evt !== "object" || !("payload" in evt)) return - const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent } - if (payload.type === "server.heartbeat") return - - if (payload.type === "sync" && payload.syncEvent) { - SyncEvent.replay(payload.syncEvent) - } - const event = evt as { directory?: string; project?: string; payload: unknown } GlobalBus.emit("event", { directory: event.directory, @@ -378,10 +396,10 @@ export const layer = Layer.effect( workspace: space.id, payload: event.payload, }) - } catch (err) { + } catch (error) { log.info("failed to replay global event", { workspaceID: space.id, - error: err, + error, }) } }), @@ -516,14 +534,12 @@ export const layer = Layer.effect( const adaptor = getAdaptor(space.projectID, space.type) const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space))) - yield* Effect.sync(() => - SyncEvent.run(Session.Event.Updated, { - sessionID: input.sessionID, - info: { - workspaceID: input.workspaceID, - }, - }), - ) + yield* sync.run(Session.Event.Updated, { + sessionID: input.sessionID, + info: { + workspaceID: input.workspaceID, + }, + }) const rows = yield* db((db) => db @@ -593,7 +609,7 @@ export const layer = Layer.effect( }) if (target.type === "local") { - SyncEvent.replayAll(events) + yield* sync.replayAll(events) log.info("session restore batch replayed locally", { workspaceID: input.workspaceID, sessionID: input.sessionID, @@ -812,6 +828,7 @@ export const layer = Layer.effect( export const defaultLayer = layer.pipe( Layer.provide(Auth.defaultLayer), Layer.provide(Session.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), Layer.provide(FetchHttpClient.layer), ) diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts index 84be170688..06969ff9d1 100644 --- a/packages/opencode/src/effect/app-runtime.ts +++ b/packages/opencode/src/effect/app-runtime.ts @@ -47,6 +47,7 @@ import { Pty } from "@/pty" import { Installation } from "@/installation" import { ShareNext } from "@/share/share-next" import { SessionShare } from "@/share/session" +import { SyncEvent } from "@/sync" import { Npm } from "@opencode-ai/core/npm" import { memoMap } from "@opencode-ai/core/effect/memo-map" @@ -97,6 +98,7 @@ export const AppLayer = Layer.mergeAll( Installation.defaultLayer, ShareNext.defaultLayer, SessionShare.defaultLayer, + SyncEvent.defaultLayer, ).pipe(Layer.provideMerge(Observability.layer)) const rt = ManagedRuntime.make(AppLayer, { memoMap }) diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts index fbe1249939..f4a2f315cd 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts @@ -21,6 +21,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl Effect.gen(function* () { const workspace = yield* Workspace.Service const scope = yield* Scope.Scope + const sync = yield* SyncEvent.Service const start = Effect.fn("SyncHttpApi.start")(function* () { yield* workspace @@ -45,7 +46,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl last: events.at(-1)?.seq, directory: ctx.payload.directory, }) - SyncEvent.replayAll(events) + yield* sync.replayAll(events) log.info("sync replay complete", { sessionID: source, events: events.length, diff --git a/packages/opencode/src/server/routes/instance/httpapi/server.ts b/packages/opencode/src/server/routes/instance/httpapi/server.ts index 62fa18743a..f53ddb3ec5 100644 --- a/packages/opencode/src/server/routes/instance/httpapi/server.ts +++ b/packages/opencode/src/server/routes/instance/httpapi/server.ts @@ -31,6 +31,7 @@ import { SessionSummary } from "@/session/summary" import { Todo } from "@/session/todo" import { SessionShare } from "@/share/session" import { Skill } from "@/skill" +import { SyncEvent } from "@/sync" import { ToolRegistry } from "@/tool/registry" import { lazy } from "@/util/lazy" import { Vcs } from "@/project/vcs" @@ -147,6 +148,7 @@ export const routes = Layer.mergeAll(rootApiRoutes, instanceRoutes).pipe( SessionRunState.defaultLayer, SessionStatus.defaultLayer, SessionSummary.defaultLayer, + SyncEvent.defaultLayer, Skill.defaultLayer, Todo.defaultLayer, ToolRegistry.defaultLayer, diff --git a/packages/opencode/src/server/routes/instance/sync.ts b/packages/opencode/src/server/routes/instance/sync.ts index bb816ecc42..636739a2c8 100644 --- a/packages/opencode/src/server/routes/instance/sync.ts +++ b/packages/opencode/src/server/routes/instance/sync.ts @@ -94,7 +94,7 @@ export const SyncRoutes = lazy(() => last: events.at(-1)?.seq, directory: body.directory, }) - SyncEvent.replayAll(events) + await AppRuntime.runPromise(SyncEvent.Service.use((sync) => sync.replayAll(events))) log.info("sync replay complete", { sessionID: source, diff --git a/packages/opencode/src/session/revert.ts b/packages/opencode/src/session/revert.ts index da9952ccb2..58d69a2040 100644 --- a/packages/opencode/src/session/revert.ts +++ b/packages/opencode/src/session/revert.ts @@ -38,6 +38,7 @@ export const layer = Layer.effect( const bus = yield* Bus.Service const summary = yield* SessionSummary.Service const state = yield* SessionRunState.Service + const sync = yield* SyncEvent.Service const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) { yield* state.assertNotBusy(input.sessionID) @@ -121,7 +122,7 @@ export const layer = Layer.effect( remove.push(msg) } for (const msg of remove) { - SyncEvent.run(MessageV2.Event.Removed, { + yield* sync.run(MessageV2.Event.Removed, { sessionID, messageID: msg.info.id, }) @@ -133,7 +134,7 @@ export const layer = Layer.effect( const removeParts = target.parts.slice(idx) target.parts = target.parts.slice(0, idx) for (const part of removeParts) { - SyncEvent.run(MessageV2.Event.PartRemoved, { + yield* sync.run(MessageV2.Event.PartRemoved, { sessionID, messageID: target.info.id, partID: part.id, @@ -156,6 +157,7 @@ export const defaultLayer = Layer.suspend(() => Layer.provide(Storage.defaultLayer), Layer.provide(Bus.layer), Layer.provide(SessionSummary.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), ), ) diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index 72c4d241eb..5534976e39 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -443,11 +443,12 @@ export type Patch = Types.DeepMutable["dat const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => Effect.sync(() => Database.use(fn)) -export const layer: Layer.Layer = Layer.effect( +export const layer: Layer.Layer = Layer.effect( Service, Effect.gen(function* () { const bus = yield* Bus.Service const storage = yield* Storage.Service + const sync = yield* SyncEvent.Service const createNext = Effect.fn("Session.createNext")(function* (input: { id?: SessionID @@ -477,7 +478,7 @@ export const layer: Layer.Layer = } log.info("created", result) - yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result })) + yield* sync.run(Event.Created, { sessionID: result.id, info: result }) if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) { // This only exist for backwards compatibility. We should not be @@ -525,10 +526,8 @@ export const layer: Layer.Layer = Effect.catchCause(() => Effect.succeed(false)), ) - yield* Effect.sync(() => { - SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance }) - SyncEvent.remove(sessionID) - }) + yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance }) + yield* sync.remove(sessionID) } catch (e) { log.error(e) } @@ -536,19 +535,17 @@ export const layer: Layer.Layer = const updateMessage = (msg: T): Effect.Effect => Effect.gen(function* () { - yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })) + yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }) return msg }).pipe(Effect.withSpan("Session.updateMessage")) const updatePart = (part: T): Effect.Effect => Effect.gen(function* () { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.PartUpdated, { - sessionID: part.sessionID, - part: structuredClone(part), - time: Date.now(), - }), - ) + yield* sync.run(MessageV2.Event.PartUpdated, { + sessionID: part.sessionID, + part: structuredClone(part), + time: Date.now(), + }) return part }).pipe(Effect.withSpan("Session.updatePart")) @@ -635,8 +632,7 @@ export const layer: Layer.Layer = return session }) - const patch = (sessionID: SessionID, info: Patch) => - Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info })) + const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info }) const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) { yield* patch(sessionID, { time: { updated: Date.now() } }) @@ -693,12 +689,10 @@ export const layer: Layer.Layer = sessionID: SessionID messageID: MessageID }) { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.Removed, { - sessionID: input.sessionID, - messageID: input.messageID, - }), - ) + yield* sync.run(MessageV2.Event.Removed, { + sessionID: input.sessionID, + messageID: input.messageID, + }) return input.messageID }) @@ -707,13 +701,11 @@ export const layer: Layer.Layer = messageID: MessageID partID: PartID }) { - yield* Effect.sync(() => - SyncEvent.run(MessageV2.Event.PartRemoved, { - sessionID: input.sessionID, - messageID: input.messageID, - partID: input.partID, - }), - ) + yield* sync.run(MessageV2.Event.PartRemoved, { + sessionID: input.sessionID, + messageID: input.messageID, + partID: input.partID, + }) return input.partID }) @@ -764,7 +756,11 @@ export const layer: Layer.Layer = }), ) -export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer)) +export const defaultLayer = layer.pipe( + Layer.provide(Bus.layer), + Layer.provide(Storage.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), +) export function* list(input?: { directory?: string diff --git a/packages/opencode/src/share/session.ts b/packages/opencode/src/share/session.ts index 99e46a0092..7e4de204ed 100644 --- a/packages/opencode/src/share/session.ts +++ b/packages/opencode/src/share/session.ts @@ -21,20 +21,19 @@ export const layer = Layer.effect( const session = yield* Session.Service const shareNext = yield* ShareNext.Service const scope = yield* Scope.Scope + const sync = yield* SyncEvent.Service const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) { const conf = yield* cfg.get() if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration") const result = yield* shareNext.create(sessionID) - yield* Effect.sync(() => - SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }), - ) + yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }) return result }) const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) { yield* shareNext.remove(sessionID) - yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })) + yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }) }) const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) { @@ -54,6 +53,7 @@ export const defaultLayer = layer.pipe( Layer.provide(ShareNext.defaultLayer), Layer.provide(Session.defaultLayer), Layer.provide(Config.defaultLayer), + Layer.provide(SyncEvent.defaultLayer), ) export * as SessionShare from "./session" diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 67bc9b9e7c..e57eec4e82 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -9,7 +9,7 @@ import { EventSequenceTable, EventTable } from "./event.sql" import { WorkspaceContext } from "@/control-plane/workspace-context" import { EventID } from "./schema" import { Flag } from "@opencode-ai/core/flag/flag" -import { Schema as EffectSchema } from "effect" +import { Context, Effect, Layer, Schema as EffectSchema } from "effect" import { zodObject } from "@/util/effect-zod" import type { DeepMutable } from "@/util/schema" @@ -46,6 +46,31 @@ export type SerializedEvent = Event & type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise +export interface Interface { + readonly run: ( + def: Def, + data: Event["data"], + options?: { publish?: boolean }, + ) => Effect.Effect + readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect + readonly replayAll: (events: SerializedEvent[], options?: { publish: boolean }) => Effect.Effect + readonly remove: (aggregateID: string) => Effect.Effect +} + +export class Service extends Context.Service()("@opencode/SyncEvent") {} + +export const layer = Layer.succeed( + Service, + Service.of({ + run: (def, data, options) => Effect.sync(() => run(def, data, options)), + replay: (event, options) => Effect.sync(() => replay(event, options)), + replayAll: (events, options) => Effect.sync(() => replayAll(events, options)), + remove: (aggregateID) => Effect.sync(() => remove(aggregateID)), + }), +) + +export const defaultLayer = layer + export const registry = new Map() let projectors: Map | undefined const versions = new Map() diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index 32a08715ca..160d7b02dc 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test" import { tmpdir } from "../fixture/fixture" -import { Schema } from "effect" +import { Effect, Schema } from "effect" import { Bus } from "../../src/bus" import { Instance } from "../../src/project/instance" import { SyncEvent } from "../../src/sync" @@ -35,6 +35,21 @@ function withInstance(fn: () => void | Promise) { } } +function runSyncEvent(fn: (sync: SyncEvent.Interface) => Effect.Effect) { + return Effect.runPromise(SyncEvent.Service.use(fn).pipe(Effect.provide(SyncEvent.defaultLayer))) +} + +async function expectRejects(input: Promise, pattern: RegExp) { + try { + await input + } catch (error) { + if (!(error instanceof Error)) throw error + expect(error.message).toMatch(pattern) + return + } + throw new Error("Expected promise to reject") +} + describe("SyncEvent", () => { function setup() { SyncEvent.reset() @@ -67,9 +82,9 @@ describe("SyncEvent", () => { describe("run", () => { test( "inserts event row", - withInstance(() => { + withInstance(async () => { const { Created } = setup() - SyncEvent.run(Created, { id: "evt_1", name: "first" }) + await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "first" })) const rows = Database.use((db) => db.select().from(EventTable).all()) expect(rows).toHaveLength(1) expect(rows[0].type).toBe("item.created.1") @@ -79,10 +94,10 @@ describe("SyncEvent", () => { test( "increments seq per aggregate", - withInstance(() => { + withInstance(async () => { const { Created } = setup() - SyncEvent.run(Created, { id: "evt_1", name: "first" }) - SyncEvent.run(Created, { id: "evt_1", name: "second" }) + await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "first" })) + await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "second" })) const rows = Database.use((db) => db.select().from(EventTable).all()) expect(rows).toHaveLength(2) expect(rows[1].seq).toBe(rows[0].seq + 1) @@ -91,9 +106,9 @@ describe("SyncEvent", () => { test( "uses custom aggregate field from agg()", - withInstance(() => { + withInstance(async () => { const { Sent } = setup() - SyncEvent.run(Sent, { item_id: "evt_1", to: "james" }) + await runSyncEvent((sync) => sync.run(Sent, { item_id: "evt_1", to: "james" })) const rows = Database.use((db) => db.select().from(EventTable).all()) expect(rows).toHaveLength(1) expect(rows[0].aggregate_id).toBe("evt_1") @@ -115,7 +130,7 @@ describe("SyncEvent", () => { }) }) - SyncEvent.run(Created, { id: "evt_1", name: "test" }) + await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "test" })) await received expect(events).toHaveLength(1) @@ -133,15 +148,17 @@ describe("SyncEvent", () => { describe("replay", () => { test( "inserts event from external payload", - withInstance(() => { + withInstance(async () => { const id = Identifier.descending("message") - SyncEvent.replay({ - id: "evt_1", - type: "item.created.1", - seq: 0, - aggregateID: id, - data: { id, name: "replayed" }, - }) + await runSyncEvent((sync) => + sync.replay({ + id: "evt_1", + type: "item.created.1", + seq: 0, + aggregateID: id, + data: { id, name: "replayed" }, + }), + ) const rows = Database.use((db) => db.select().from(EventTable).all()) expect(rows).toHaveLength(1) expect(rows[0].aggregate_id).toBe(id) @@ -150,81 +167,93 @@ describe("SyncEvent", () => { test( "throws on sequence mismatch", - withInstance(() => { + withInstance(async () => { const id = Identifier.descending("message") - SyncEvent.replay({ - id: "evt_1", - type: "item.created.1", - seq: 0, - aggregateID: id, - data: { id, name: "first" }, - }) - expect(() => - SyncEvent.replay({ + await runSyncEvent((sync) => + sync.replay({ id: "evt_1", type: "item.created.1", - seq: 5, + seq: 0, aggregateID: id, - data: { id, name: "bad" }, + data: { id, name: "first" }, }), - ).toThrow(/Sequence mismatch/) + ) + await expectRejects( + runSyncEvent((sync) => + sync.replay({ + id: "evt_1", + type: "item.created.1", + seq: 5, + aggregateID: id, + data: { id, name: "bad" }, + }), + ), + /Sequence mismatch/, + ) }), ) test( "throws on unknown event type", - withInstance(() => { - expect(() => - SyncEvent.replay({ - id: "evt_1", - type: "unknown.event.1", - seq: 0, - aggregateID: "x", - data: {}, - }), - ).toThrow(/Unknown event type/) + withInstance(async () => { + await expectRejects( + runSyncEvent((sync) => + sync.replay({ + id: "evt_1", + type: "unknown.event.1", + seq: 0, + aggregateID: "x", + data: {}, + }), + ), + /Unknown event type/, + ) }), ) test( "replayAll accepts later chunks after the first batch", - withInstance(() => { + withInstance(async () => { const { Created } = setup() const id = Identifier.descending("message") - const one = SyncEvent.replayAll([ - { - id: "evt_1", - type: SyncEvent.versionedType(Created.type, Created.version), - seq: 0, - aggregateID: id, - data: { id, name: "first" }, - }, - { - id: "evt_2", - type: SyncEvent.versionedType(Created.type, Created.version), - seq: 1, - aggregateID: id, - data: { id, name: "second" }, - }, - ]) + const one = await runSyncEvent((sync) => + sync.replayAll([ + { + id: "evt_1", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 0, + aggregateID: id, + data: { id, name: "first" }, + }, + { + id: "evt_2", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 1, + aggregateID: id, + data: { id, name: "second" }, + }, + ]), + ) - const two = SyncEvent.replayAll([ - { - id: "evt_3", - type: SyncEvent.versionedType(Created.type, Created.version), - seq: 2, - aggregateID: id, - data: { id, name: "third" }, - }, - { - id: "evt_4", - type: SyncEvent.versionedType(Created.type, Created.version), - seq: 3, - aggregateID: id, - data: { id, name: "fourth" }, - }, - ]) + const two = await runSyncEvent((sync) => + sync.replayAll([ + { + id: "evt_3", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 2, + aggregateID: id, + data: { id, name: "third" }, + }, + { + id: "evt_4", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 3, + aggregateID: id, + data: { id, name: "fourth" }, + }, + ]), + ) expect(one).toBe(id) expect(two).toBe(id)