Add SyncEvent service

This commit is contained in:
Kit Langton
2026-04-30 16:09:14 -04:00
parent 3250b814ce
commit ec98b656fd
10 changed files with 223 additions and 149 deletions

View File

@@ -169,6 +169,7 @@ export const layer = Layer.effect(
const auth = yield* Auth.Service
const session = yield* Session.Service
const http = yield* HttpClient.HttpClient
const sync = yield* SyncEvent.Service
const connections = new Map<WorkspaceID, ConnectionStatus>()
const syncFibers = yield* FiberMap.make<WorkspaceID, void, SyncLoopError>()
@@ -307,25 +308,30 @@ export const layer = Layer.effect(
events: events.length,
})
yield* Effect.sync(() =>
WorkspaceContext.provide({
yield* Effect.promise(async () => {
await WorkspaceContext.provide({
workspaceID: space.id,
fn: () => {
for (const event of events) {
SyncEvent.replay(
{
id: event.id,
aggregateID: event.aggregate_id,
seq: event.seq,
type: event.type,
data: event.data,
},
{ publish: true },
)
}
async fn() {
await Effect.runPromise(
Effect.forEach(
events,
(event) =>
sync.replay(
{
id: event.id,
aggregateID: event.aggregate_id,
seq: event.seq,
type: event.type,
data: event.data,
},
{ publish: true },
),
{ discard: true },
),
)
},
}),
)
})
})
})
const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) {
@@ -361,16 +367,28 @@ export const layer = Layer.effect(
setStatus(space.id, "connected")
yield* parseSSE(stream, (evt) =>
Effect.sync(() => {
Effect.gen(function* () {
if (!evt || typeof evt !== "object" || !("payload" in evt)) return
const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
if (payload.type === "server.heartbeat") return
if (payload.type === "sync" && payload.syncEvent) {
const failed = yield* sync.replay(payload.syncEvent).pipe(
Effect.as(false),
Effect.catchCause((error) =>
Effect.sync(() => {
log.info("failed to replay global event", {
workspaceID: space.id,
error,
})
return true
}),
),
)
if (failed) return
}
try {
if (!evt || typeof evt !== "object" || !("payload" in evt)) return
const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
if (payload.type === "server.heartbeat") return
if (payload.type === "sync" && payload.syncEvent) {
SyncEvent.replay(payload.syncEvent)
}
const event = evt as { directory?: string; project?: string; payload: unknown }
GlobalBus.emit("event", {
directory: event.directory,
@@ -378,10 +396,10 @@ export const layer = Layer.effect(
workspace: space.id,
payload: event.payload,
})
} catch (err) {
} catch (error) {
log.info("failed to replay global event", {
workspaceID: space.id,
error: err,
error,
})
}
}),
@@ -516,14 +534,12 @@ export const layer = Layer.effect(
const adaptor = getAdaptor(space.projectID, space.type)
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space)))
yield* Effect.sync(() =>
SyncEvent.run(Session.Event.Updated, {
sessionID: input.sessionID,
info: {
workspaceID: input.workspaceID,
},
}),
)
yield* sync.run(Session.Event.Updated, {
sessionID: input.sessionID,
info: {
workspaceID: input.workspaceID,
},
})
const rows = yield* db((db) =>
db
@@ -593,7 +609,7 @@ export const layer = Layer.effect(
})
if (target.type === "local") {
SyncEvent.replayAll(events)
yield* sync.replayAll(events)
log.info("session restore batch replayed locally", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
@@ -812,6 +828,7 @@ export const layer = Layer.effect(
export const defaultLayer = layer.pipe(
Layer.provide(Auth.defaultLayer),
Layer.provide(Session.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
Layer.provide(FetchHttpClient.layer),
)

View File

@@ -47,6 +47,7 @@ import { Pty } from "@/pty"
import { Installation } from "@/installation"
import { ShareNext } from "@/share/share-next"
import { SessionShare } from "@/share/session"
import { SyncEvent } from "@/sync"
import { Npm } from "@opencode-ai/core/npm"
import { memoMap } from "@opencode-ai/core/effect/memo-map"
@@ -97,6 +98,7 @@ export const AppLayer = Layer.mergeAll(
Installation.defaultLayer,
ShareNext.defaultLayer,
SessionShare.defaultLayer,
SyncEvent.defaultLayer,
).pipe(Layer.provideMerge(Observability.layer))
const rt = ManagedRuntime.make(AppLayer, { memoMap })

View File

@@ -21,6 +21,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
Effect.gen(function* () {
const workspace = yield* Workspace.Service
const scope = yield* Scope.Scope
const sync = yield* SyncEvent.Service
const start = Effect.fn("SyncHttpApi.start")(function* () {
yield* workspace
@@ -45,7 +46,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
last: events.at(-1)?.seq,
directory: ctx.payload.directory,
})
SyncEvent.replayAll(events)
yield* sync.replayAll(events)
log.info("sync replay complete", {
sessionID: source,
events: events.length,

View File

@@ -31,6 +31,7 @@ import { SessionSummary } from "@/session/summary"
import { Todo } from "@/session/todo"
import { SessionShare } from "@/share/session"
import { Skill } from "@/skill"
import { SyncEvent } from "@/sync"
import { ToolRegistry } from "@/tool/registry"
import { lazy } from "@/util/lazy"
import { Vcs } from "@/project/vcs"
@@ -147,6 +148,7 @@ export const routes = Layer.mergeAll(rootApiRoutes, instanceRoutes).pipe(
SessionRunState.defaultLayer,
SessionStatus.defaultLayer,
SessionSummary.defaultLayer,
SyncEvent.defaultLayer,
Skill.defaultLayer,
Todo.defaultLayer,
ToolRegistry.defaultLayer,

View File

@@ -94,7 +94,7 @@ export const SyncRoutes = lazy(() =>
last: events.at(-1)?.seq,
directory: body.directory,
})
SyncEvent.replayAll(events)
await AppRuntime.runPromise(SyncEvent.Service.use((sync) => sync.replayAll(events)))
log.info("sync replay complete", {
sessionID: source,

View File

@@ -38,6 +38,7 @@ export const layer = Layer.effect(
const bus = yield* Bus.Service
const summary = yield* SessionSummary.Service
const state = yield* SessionRunState.Service
const sync = yield* SyncEvent.Service
const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
yield* state.assertNotBusy(input.sessionID)
@@ -121,7 +122,7 @@ export const layer = Layer.effect(
remove.push(msg)
}
for (const msg of remove) {
SyncEvent.run(MessageV2.Event.Removed, {
yield* sync.run(MessageV2.Event.Removed, {
sessionID,
messageID: msg.info.id,
})
@@ -133,7 +134,7 @@ export const layer = Layer.effect(
const removeParts = target.parts.slice(idx)
target.parts = target.parts.slice(0, idx)
for (const part of removeParts) {
SyncEvent.run(MessageV2.Event.PartRemoved, {
yield* sync.run(MessageV2.Event.PartRemoved, {
sessionID,
messageID: target.info.id,
partID: part.id,
@@ -156,6 +157,7 @@ export const defaultLayer = Layer.suspend(() =>
Layer.provide(Storage.defaultLayer),
Layer.provide(Bus.layer),
Layer.provide(SessionSummary.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
),
)

View File

@@ -443,11 +443,12 @@ export type Patch = Types.DeepMutable<SyncEvent.Event<typeof Event.Updated>["dat
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> = Layer.effect(
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const storage = yield* Storage.Service
const sync = yield* SyncEvent.Service
const createNext = Effect.fn("Session.createNext")(function* (input: {
id?: SessionID
@@ -477,7 +478,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
}
log.info("created", result)
yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
yield* sync.run(Event.Created, { sessionID: result.id, info: result })
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
// This only exist for backwards compatibility. We should not be
@@ -525,10 +526,8 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
Effect.catchCause(() => Effect.succeed(false)),
)
yield* Effect.sync(() => {
SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
SyncEvent.remove(sessionID)
})
yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
yield* sync.remove(sessionID)
} catch (e) {
log.error(e)
}
@@ -536,19 +535,17 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
return msg
}).pipe(Effect.withSpan("Session.updateMessage"))
const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
}),
)
yield* sync.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
})
return part
}).pipe(Effect.withSpan("Session.updatePart"))
@@ -635,8 +632,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
return session
})
const patch = (sessionID: SessionID, info: Patch) =>
Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info }))
const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info })
const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
yield* patch(sessionID, { time: { updated: Date.now() } })
@@ -693,12 +689,10 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
sessionID: SessionID
messageID: MessageID
}) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
}),
)
yield* sync.run(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
})
return input.messageID
})
@@ -707,13 +701,11 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
messageID: MessageID
partID: PartID
}) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
}),
)
yield* sync.run(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
})
return input.partID
})
@@ -764,7 +756,11 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
}),
)
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer))
export const defaultLayer = layer.pipe(
Layer.provide(Bus.layer),
Layer.provide(Storage.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
)
export function* list(input?: {
directory?: string

View File

@@ -21,20 +21,19 @@ export const layer = Layer.effect(
const session = yield* Session.Service
const shareNext = yield* ShareNext.Service
const scope = yield* Scope.Scope
const sync = yield* SyncEvent.Service
const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
const conf = yield* cfg.get()
if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
const result = yield* shareNext.create(sessionID)
yield* Effect.sync(() =>
SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
)
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } })
return result
})
const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
yield* shareNext.remove(sessionID)
yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })
})
const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) {
@@ -54,6 +53,7 @@ export const defaultLayer = layer.pipe(
Layer.provide(ShareNext.defaultLayer),
Layer.provide(Session.defaultLayer),
Layer.provide(Config.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
)
export * as SessionShare from "./session"

View File

@@ -9,7 +9,7 @@ import { EventSequenceTable, EventTable } from "./event.sql"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { EventID } from "./schema"
import { Flag } from "@opencode-ai/core/flag/flag"
import { Schema as EffectSchema } from "effect"
import { Context, Effect, Layer, Schema as EffectSchema } from "effect"
import { zodObject } from "@/util/effect-zod"
import type { DeepMutable } from "@/util/schema"
@@ -46,6 +46,31 @@ export type SerializedEvent<Def extends Definition = Definition> = Event<Def> &
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise<unknown>
export interface Interface {
readonly run: <Def extends Definition>(
def: Def,
data: Event<Def>["data"],
options?: { publish?: boolean },
) => Effect.Effect<void>
readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect<void>
readonly replayAll: (events: SerializedEvent[], options?: { publish: boolean }) => Effect.Effect<string | undefined>
readonly remove: (aggregateID: string) => Effect.Effect<void>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
export const layer = Layer.succeed(
Service,
Service.of({
run: (def, data, options) => Effect.sync(() => run(def, data, options)),
replay: (event, options) => Effect.sync(() => replay(event, options)),
replayAll: (events, options) => Effect.sync(() => replayAll(events, options)),
remove: (aggregateID) => Effect.sync(() => remove(aggregateID)),
}),
)
export const defaultLayer = layer
export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()

View File

@@ -1,6 +1,6 @@
import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
import { tmpdir } from "../fixture/fixture"
import { Schema } from "effect"
import { Effect, Schema } from "effect"
import { Bus } from "../../src/bus"
import { Instance } from "../../src/project/instance"
import { SyncEvent } from "../../src/sync"
@@ -35,6 +35,21 @@ function withInstance(fn: () => void | Promise<void>) {
}
}
function runSyncEvent<A>(fn: (sync: SyncEvent.Interface) => Effect.Effect<A>) {
return Effect.runPromise(SyncEvent.Service.use(fn).pipe(Effect.provide(SyncEvent.defaultLayer)))
}
async function expectRejects(input: Promise<unknown>, pattern: RegExp) {
try {
await input
} catch (error) {
if (!(error instanceof Error)) throw error
expect(error.message).toMatch(pattern)
return
}
throw new Error("Expected promise to reject")
}
describe("SyncEvent", () => {
function setup() {
SyncEvent.reset()
@@ -67,9 +82,9 @@ describe("SyncEvent", () => {
describe("run", () => {
test(
"inserts event row",
withInstance(() => {
withInstance(async () => {
const { Created } = setup()
SyncEvent.run(Created, { id: "evt_1", name: "first" })
await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "first" }))
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].type).toBe("item.created.1")
@@ -79,10 +94,10 @@ describe("SyncEvent", () => {
test(
"increments seq per aggregate",
withInstance(() => {
withInstance(async () => {
const { Created } = setup()
SyncEvent.run(Created, { id: "evt_1", name: "first" })
SyncEvent.run(Created, { id: "evt_1", name: "second" })
await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "first" }))
await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "second" }))
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(2)
expect(rows[1].seq).toBe(rows[0].seq + 1)
@@ -91,9 +106,9 @@ describe("SyncEvent", () => {
test(
"uses custom aggregate field from agg()",
withInstance(() => {
withInstance(async () => {
const { Sent } = setup()
SyncEvent.run(Sent, { item_id: "evt_1", to: "james" })
await runSyncEvent((sync) => sync.run(Sent, { item_id: "evt_1", to: "james" }))
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe("evt_1")
@@ -115,7 +130,7 @@ describe("SyncEvent", () => {
})
})
SyncEvent.run(Created, { id: "evt_1", name: "test" })
await runSyncEvent((sync) => sync.run(Created, { id: "evt_1", name: "test" }))
await received
expect(events).toHaveLength(1)
@@ -133,15 +148,17 @@ describe("SyncEvent", () => {
describe("replay", () => {
test(
"inserts event from external payload",
withInstance(() => {
withInstance(async () => {
const id = Identifier.descending("message")
SyncEvent.replay({
id: "evt_1",
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "replayed" },
})
await runSyncEvent((sync) =>
sync.replay({
id: "evt_1",
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "replayed" },
}),
)
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe(id)
@@ -150,81 +167,93 @@ describe("SyncEvent", () => {
test(
"throws on sequence mismatch",
withInstance(() => {
withInstance(async () => {
const id = Identifier.descending("message")
SyncEvent.replay({
id: "evt_1",
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "first" },
})
expect(() =>
SyncEvent.replay({
await runSyncEvent((sync) =>
sync.replay({
id: "evt_1",
type: "item.created.1",
seq: 5,
seq: 0,
aggregateID: id,
data: { id, name: "bad" },
data: { id, name: "first" },
}),
).toThrow(/Sequence mismatch/)
)
await expectRejects(
runSyncEvent((sync) =>
sync.replay({
id: "evt_1",
type: "item.created.1",
seq: 5,
aggregateID: id,
data: { id, name: "bad" },
}),
),
/Sequence mismatch/,
)
}),
)
test(
"throws on unknown event type",
withInstance(() => {
expect(() =>
SyncEvent.replay({
id: "evt_1",
type: "unknown.event.1",
seq: 0,
aggregateID: "x",
data: {},
}),
).toThrow(/Unknown event type/)
withInstance(async () => {
await expectRejects(
runSyncEvent((sync) =>
sync.replay({
id: "evt_1",
type: "unknown.event.1",
seq: 0,
aggregateID: "x",
data: {},
}),
),
/Unknown event type/,
)
}),
)
test(
"replayAll accepts later chunks after the first batch",
withInstance(() => {
withInstance(async () => {
const { Created } = setup()
const id = Identifier.descending("message")
const one = SyncEvent.replayAll([
{
id: "evt_1",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 0,
aggregateID: id,
data: { id, name: "first" },
},
{
id: "evt_2",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 1,
aggregateID: id,
data: { id, name: "second" },
},
])
const one = await runSyncEvent((sync) =>
sync.replayAll([
{
id: "evt_1",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 0,
aggregateID: id,
data: { id, name: "first" },
},
{
id: "evt_2",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 1,
aggregateID: id,
data: { id, name: "second" },
},
]),
)
const two = SyncEvent.replayAll([
{
id: "evt_3",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 2,
aggregateID: id,
data: { id, name: "third" },
},
{
id: "evt_4",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 3,
aggregateID: id,
data: { id, name: "fourth" },
},
])
const two = await runSyncEvent((sync) =>
sync.replayAll([
{
id: "evt_3",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 2,
aggregateID: id,
data: { id, name: "third" },
},
{
id: "evt_4",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 3,
aggregateID: id,
data: { id, name: "fourth" },
},
]),
)
expect(one).toBe(id)
expect(two).toBe(id)