Fix runner cancel completion (#27115)

This commit is contained in:
Kit Langton
2026-05-12 14:22:56 -04:00
committed by GitHub
parent 3974520742
commit 822eec0d62
2 changed files with 23 additions and 18 deletions

View File

@@ -181,7 +181,7 @@ export const make = <A, E = never>(
return [
Effect.gen(function* () {
yield* Fiber.interrupt(st.run.fiber)
yield* Deferred.await(st.run.done).pipe(Effect.exit, Effect.asVoid)
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
yield* idleIfCurrent()
}),
{ _tag: "Idle" } as const,

View File

@@ -3,6 +3,11 @@ import { Deferred, Effect, Exit, Fiber, Latch, Ref, Scope } from "effect"
import { Runner } from "@/effect/runner"
import { it } from "../lib/effect"
const waitForState = <A, E>(runner: Runner.Runner<A, E>, tag: Runner.State<A, E>["_tag"]) =>
Effect.gen(function* () {
while (runner.state._tag !== tag) yield* Effect.yieldNow
}).pipe(Effect.timeout("1 second"))
describe("Runner", () => {
// --- ensureRunning semantics ---
@@ -152,7 +157,7 @@ describe("Runner", () => {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Running")
yield* runner.cancel
@@ -169,9 +174,9 @@ describe("Runner", () => {
const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
const a = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Running")
const b = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* Effect.yieldNow
yield* runner.cancel
@@ -189,7 +194,7 @@ describe("Runner", () => {
const s = yield* Scope.Scope
const runner = Runner.make<string>(s)
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Running")
yield* runner.cancel
yield* Fiber.await(fiber)
@@ -215,7 +220,7 @@ describe("Runner", () => {
)
const a = yield* runner.ensureRunning(first).pipe(Effect.exit, Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Running")
const stop = yield* runner.cancel.pipe(Effect.forkChild)
yield* Deferred.await(hit).pipe(Effect.timeout("250 millis"))
@@ -293,7 +298,7 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Shell")
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
@@ -314,7 +319,7 @@ describe("Runner", () => {
})
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Shell")
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
@@ -333,7 +338,7 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Shell")
const stop = yield* runner.cancel.pipe(Effect.forkChild)
const stopExit = yield* Fiber.await(stop).pipe(Effect.timeout("250 millis"))
@@ -380,11 +385,11 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Shell")
expect(runner.state._tag).toBe("Shell")
const run = yield* runner.ensureRunning(Effect.succeed("run-result")).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "ShellThenRun")
expect(runner.state._tag).toBe("ShellThenRun")
yield* Deferred.succeed(gate, undefined)
@@ -406,7 +411,7 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Shell")
const work = Effect.gen(function* () {
yield* Ref.update(calls, (n) => n + 1)
@@ -414,7 +419,7 @@ describe("Runner", () => {
})
const a = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
const b = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "ShellThenRun")
yield* Deferred.succeed(gate, undefined)
yield* Fiber.await(sh)
@@ -433,10 +438,10 @@ describe("Runner", () => {
const runner = Runner.make<string>(s)
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Shell")
const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "ShellThenRun")
expect(runner.state._tag).toBe("ShellThenRun")
yield* runner.cancel
@@ -472,7 +477,7 @@ describe("Runner", () => {
onIdle: Ref.update(count, (n) => n + 1),
})
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Running")
yield* runner.cancel
yield* Fiber.await(fiber)
expect(yield* Ref.get(count)).toBeGreaterThanOrEqual(1)
@@ -502,7 +507,7 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()
const fiber = yield* runner.ensureRunning(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Running")
expect(runner.busy).toBe(true)
yield* Deferred.succeed(gate, undefined)
@@ -519,7 +524,7 @@ describe("Runner", () => {
const gate = yield* Deferred.make<void>()
const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
yield* Effect.sleep("10 millis")
yield* waitForState(runner, "Shell")
expect(runner.busy).toBe(true)
yield* Deferred.succeed(gate, undefined)