fix(effect-drizzle-sqlite): support pipeable transactions

This commit is contained in:
Kit Langton
2026-04-27 22:18:44 -04:00
parent 89efce865d
commit e4ae265d8f
2 changed files with 68 additions and 39 deletions

View File

@@ -31,10 +31,10 @@ export type EffectSQLiteDatabase<
TRelations extends AnyRelations = EmptyRelations,
> = SQLiteBunDatabase<TSchema, TRelations> & {
readonly $client: Database
readonly withTransaction: <A, E>(
transaction: (tx: SQLiteTransaction<"sync", void, TSchema, TRelations>) => Effect.Effect<A, E>,
readonly withTransaction: <A, E, R>(
effect: Effect.Effect<A, E, R>,
config?: SQLiteTransactionConfig,
) => Effect.Effect<A, E>
) => Effect.Effect<A, E, R>
}
export type MakeConfig<
@@ -176,33 +176,48 @@ const attachTransaction = <
TSchema extends Record<string, unknown> = Record<string, never>,
TRelations extends AnyRelations = EmptyRelations,
>(db: SQLiteBunDatabase<TSchema, TRelations> & { readonly $client: Database }): EffectSQLiteDatabase<TSchema, TRelations> => {
const runTransaction = db.transaction.bind(db) as (
transaction: (tx: SQLiteTransaction<"sync", void, TSchema, TRelations>) => unknown,
config?: SQLiteTransactionConfig,
) => unknown
return Object.assign(db, {
withTransaction: <A, E>(
transaction: (tx: SQLiteTransaction<"sync", void, TSchema, TRelations>) => Effect.Effect<A, E>,
const txStack: Array<SQLiteTransaction<"sync", void, TSchema, TRelations>> = []
const current = () => txStack.at(-1) ?? db
const runTransaction = (target: SQLiteBunDatabase<TSchema, TRelations> | SQLiteTransaction<"sync", void, TSchema, TRelations>) =>
target.transaction.bind(target) as (
transaction: (tx: SQLiteTransaction<"sync", void, TSchema, TRelations>) => unknown,
config?: SQLiteTransactionConfig,
) =>
Effect.sync(
() =>
runTransaction(
(tx) =>
Exit.match(Effect.runSyncExit(transaction(tx)), {
onSuccess: (value) => value,
onFailure: (cause) => {
throw new TransactionFailure(cause)
},
}),
config,
) as A,
).pipe(
Effect.catchDefect((defect) =>
defect instanceof TransactionFailure ? Effect.failCause(defect.effectCause as Cause.Cause<E>) : Effect.die(defect),
) => unknown
const withTransaction = <A, E, R>(
effect: Effect.Effect<A, E, R>,
config?: SQLiteTransactionConfig,
): Effect.Effect<A, E, R> =>
Effect.context<R>().pipe(
Effect.flatMap((context) =>
Effect.sync(
() =>
runTransaction(current())((tx) => {
txStack.push(tx)
try {
const exit = Effect.runSyncExit(Effect.provideContext(effect, context))
if (Exit.isSuccess(exit)) return exit.value
throw new TransactionFailure(exit.cause)
} finally {
txStack.pop()
}
}, config) as A,
).pipe(
Effect.catchDefect((defect) =>
defect instanceof TransactionFailure ? Effect.failCause(defect.effectCause as Cause.Cause<E>) : Effect.die(defect),
),
),
),
)
return new Proxy(db, {
get(_target, property) {
if (property === "withTransaction") return withTransaction
if (property === "$client") return db.$client
const value = Reflect.get(current(), property)
return typeof value === "function" ? value.bind(current()) : value
},
}) as EffectSQLiteDatabase<TSchema, TRelations>
}

View File

@@ -104,22 +104,18 @@ describe("effect drizzle sqlite", () => {
testEffect("runs synchronous Effect programs inside transactions", () =>
Effect.gen(function* () {
yield* db.withTransaction((tx) =>
Effect.gen(function* () {
yield* tx.insert(users).values({ id: 1, name: "Ada" })
return yield* tx.select().from(users)
}),
)
yield* Effect.gen(function* () {
yield* db.insert(users).values({ id: 1, name: "Ada" })
return yield* db.select().from(users)
}).pipe(db.withTransaction)
expect(yield* db.select().from(users)).toEqual([{ id: 1, name: "Ada" }])
const exit = yield* Effect.exit(
db.withTransaction((tx) =>
Effect.gen(function* () {
yield* tx.insert(users).values({ id: 2, name: "Grace" })
return yield* Effect.fail("rollback")
}),
),
Effect.gen(function* () {
yield* db.insert(users).values({ id: 2, name: "Grace" })
return yield* Effect.fail("rollback")
}).pipe(db.withTransaction),
)
expect(Exit.isFailure(exit)).toBe(true)
@@ -127,6 +123,24 @@ describe("effect drizzle sqlite", () => {
}),
)
testEffect("supports pipeable transactions using the same database service", () =>
Effect.gen(function* () {
const exit = yield* Effect.gen(function* () {
yield* db.insert(users).values({ id: 1, name: "Ada" })
return yield* Effect.fail("rollback")
}).pipe(db.withTransaction, Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
expect(yield* db.select().from(users)).toEqual([])
yield* Effect.gen(function* () {
yield* db.insert(users).values({ id: 2, name: "Grace" })
}).pipe(db.withTransaction)
expect(yield* db.select().from(users)).toEqual([{ id: 2, name: "Grace" }])
}),
)
testEffect("wraps query failures with query text and parameters", () =>
Effect.gen(function* () {
const exit = yield* Effect.exit(db.insert(posts).values({ id: 1, user_id: 404, title: "Missing" }))