effect(core): add AppProcess service (Phase 1) (#27178)

This commit is contained in:
Kit Langton
2026-05-12 21:24:53 -04:00
committed by GitHub
parent 31e2d72bf7
commit b6d3fa09bc
2 changed files with 417 additions and 0 deletions

View File

@@ -0,0 +1,207 @@
import { Context, Duration, Effect, Fiber, Layer, Schema, Stream } from "effect"
import type { PlatformError } from "effect/PlatformError"
import { ChildProcess } from "effect/unstable/process"
import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner"
import { CrossSpawnSpawner } from "./cross-spawn-spawner"
export class AppProcessError extends Schema.TaggedErrorClass<AppProcessError>()("AppProcessError", {
command: Schema.String,
exitCode: Schema.optional(Schema.Number),
stderr: Schema.optional(Schema.String),
cause: Schema.optional(Schema.Defect),
}) {}
export interface RunOptions {
readonly maxOutputBytes?: number
readonly maxErrorBytes?: number
readonly signal?: AbortSignal
readonly timeout?: Duration.Input
}
export interface RunStreamOptions {
readonly signal?: AbortSignal
readonly includeStderr?: boolean
readonly okExitCodes?: ReadonlyArray<number>
readonly maxErrorBytes?: number
}
export interface RunResult {
readonly command: string
readonly exitCode: number
readonly stdout: Buffer
readonly stderr: Buffer
readonly truncated: boolean
}
export type Interface = ChildProcessSpawner["Service"] & {
readonly run: (command: ChildProcess.Command, options?: RunOptions) => Effect.Effect<RunResult, AppProcessError>
readonly runStream: (
command: ChildProcess.Command,
options?: RunStreamOptions,
) => Stream.Stream<string, AppProcessError>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/AppProcess") {}
export const requireSuccess = (result: RunResult): Effect.Effect<RunResult, AppProcessError> =>
result.exitCode === 0
? Effect.succeed(result)
: Effect.fail(
new AppProcessError({
command: result.command,
exitCode: result.exitCode,
stderr: result.stderr.toString("utf8"),
}),
)
export const requireExitIn =
(codes: ReadonlyArray<number>) =>
(result: RunResult): Effect.Effect<RunResult, AppProcessError> =>
codes.includes(result.exitCode)
? Effect.succeed(result)
: Effect.fail(
new AppProcessError({
command: result.command,
exitCode: result.exitCode,
stderr: result.stderr.toString("utf8"),
}),
)
const describeCommand = (command: ChildProcess.Command): string => {
if (command._tag === "StandardCommand") {
return command.args.length ? `${command.command} ${command.args.join(" ")}` : command.command
}
return `${describeCommand(command.left)} | ${describeCommand(command.right)}`
}
const wrapError = (description: string, cause: unknown): AppProcessError =>
cause instanceof AppProcessError ? cause : new AppProcessError({ command: description, cause })
const abortError = (signal: AbortSignal): Error => {
const reason = signal.reason
if (reason instanceof Error) return reason
const err = new Error("Aborted")
err.name = "AbortError"
return err
}
const waitForAbort = (signal: AbortSignal) =>
Effect.callback<never, Error>((resume) => {
if (signal.aborted) {
resume(Effect.fail(abortError(signal)))
return
}
const onabort = () => resume(Effect.fail(abortError(signal)))
signal.addEventListener("abort", onabort, { once: true })
return Effect.sync(() => signal.removeEventListener("abort", onabort))
})
const collectStream = (stream: Stream.Stream<Uint8Array, PlatformError>, maxOutputBytes: number | undefined) =>
Stream.runFold(
stream,
() => ({ chunks: [] as Uint8Array[], bytes: 0, truncated: false }),
(acc, chunk) => {
if (maxOutputBytes === undefined) {
acc.chunks.push(chunk)
acc.bytes += chunk.length
return acc
}
const remaining = maxOutputBytes - acc.bytes
if (remaining > 0) acc.chunks.push(remaining >= chunk.length ? chunk : chunk.slice(0, remaining))
acc.bytes += chunk.length
acc.truncated = acc.truncated || acc.bytes > maxOutputBytes
return acc
},
).pipe(Effect.map((x) => ({ buffer: Buffer.concat(x.chunks), truncated: x.truncated })))
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const spawner = yield* ChildProcessSpawner
const run = Effect.fn("AppProcess.run")(function* (command: ChildProcess.Command, options?: RunOptions) {
const description = describeCommand(command)
const collect = Effect.scoped(
Effect.gen(function* () {
const handle = yield* spawner.spawn(command)
const [stdout, stderr, exitCode] = yield* Effect.all(
[
collectStream(handle.stdout, options?.maxOutputBytes),
collectStream(handle.stderr, options?.maxErrorBytes),
handle.exitCode,
],
{ concurrency: "unbounded" },
)
return {
command: description,
exitCode,
stdout: stdout.buffer,
stderr: stderr.buffer,
truncated: stdout.truncated,
} satisfies RunResult
}),
)
const timed = options?.timeout
? Effect.timeoutOrElse(collect, {
duration: options.timeout,
orElse: () =>
Effect.fail(new AppProcessError({ command: description, cause: new Error("Timed out") })),
})
: collect
const aborted = options?.signal
? timed.pipe(
Effect.raceFirst(
waitForAbort(options.signal).pipe(Effect.mapError((cause) => wrapError(description, cause))),
),
)
: timed
return yield* aborted.pipe(Effect.catch((cause) => Effect.fail(wrapError(description, cause))))
})
const runStream = (command: ChildProcess.Command, options?: RunStreamOptions): Stream.Stream<string, AppProcessError> => {
const description = describeCommand(command)
const okExitCodes = options?.okExitCodes
const built: Stream.Stream<string, AppProcessError | PlatformError> = Stream.unwrap(
Effect.gen(function* () {
const handle = yield* spawner.spawn(command)
const stderrFiber = yield* Effect.forkScoped(
collectStream(handle.stderr, options?.maxErrorBytes).pipe(
Effect.map((x) => x.buffer.toString("utf8")),
),
)
const source = options?.includeStderr === true ? handle.all : handle.stdout
const lines = source.pipe(
Stream.decodeText,
Stream.splitLines,
Stream.filter((line) => line.length > 0),
)
const tail = Stream.unwrap(
Effect.gen(function* () {
const code = yield* handle.exitCode
if (okExitCodes && okExitCodes.length > 0 && !okExitCodes.includes(code)) {
const stderr = yield* Fiber.join(stderrFiber)
return Stream.fail(new AppProcessError({ command: description, exitCode: code, stderr }))
}
return Stream.empty
}),
)
return Stream.concat(lines, tail) as Stream.Stream<string, AppProcessError | PlatformError>
}),
)
const mapped = built.pipe(
Stream.catch((cause): Stream.Stream<string, AppProcessError> => Stream.fail(wrapError(description, cause))),
)
if (!options?.signal) return mapped
const signal = options.signal
return mapped.pipe(
Stream.interruptWhen(waitForAbort(signal).pipe(Effect.mapError((cause) => wrapError(description, cause)))),
)
}
return Service.of({ ...spawner, run, runStream })
}),
)
export const defaultLayer = layer.pipe(Layer.provide(CrossSpawnSpawner.defaultLayer))
export * as AppProcess from "./process"

View File

@@ -0,0 +1,210 @@
import { describe, expect } from "bun:test"
import { Effect, Exit, Stream } from "effect"
import { ChildProcess } from "effect/unstable/process"
import { AppProcess } from "@opencode-ai/core/process"
import { testEffect } from "../lib/effect"
const it = testEffect(AppProcess.defaultLayer)
const NODE = process.execPath
const cmd = (...args: string[]) => ChildProcess.make(NODE, args)
describe("AppProcess", () => {
describe("run", () => {
it.effect(
"captures stdout and exit code zero",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc.run(cmd("-e", "process.stdout.write('hi\\n')"))
expect(result.exitCode).toBe(0)
expect(result.stdout.toString("utf8")).toBe("hi\n")
expect(result.truncated).toBe(false)
}),
)
it.effect(
"non-zero exit returns RunResult; caller can require success",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc.run(cmd("-e", "process.exit(1)"))
expect(result.exitCode).toBe(1)
}),
)
it.effect(
"requireSuccess fails on non-zero exit",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const exit = yield* Effect.exit(svc.run(cmd("-e", "process.exit(1)")).pipe(Effect.flatMap(AppProcess.requireSuccess)))
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
const reason = exit.cause.reasons[0]
if (reason && reason._tag === "Fail") {
expect(reason.error).toBeInstanceOf(AppProcess.AppProcessError)
expect((reason.error as AppProcess.AppProcessError).exitCode).toBe(1)
} else {
throw new Error("expected fail reason")
}
}
}),
)
it.effect(
"requireSuccess succeeds on exit 0",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc.run(cmd("-e", "process.exit(0)")).pipe(Effect.flatMap(AppProcess.requireSuccess))
expect(result.exitCode).toBe(0)
}),
)
it.effect(
"requireExitIn allowlists multiple exit codes",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const requireZeroOrOne = AppProcess.requireExitIn([0, 1])
const okZero = yield* svc.run(cmd("-e", "process.exit(0)")).pipe(Effect.flatMap(requireZeroOrOne))
expect(okZero.exitCode).toBe(0)
const okOne = yield* svc.run(cmd("-e", "process.exit(1)")).pipe(Effect.flatMap(requireZeroOrOne))
expect(okOne.exitCode).toBe(1)
const exit = yield* Effect.exit(
svc.run(cmd("-e", "process.exit(2)")).pipe(Effect.flatMap(requireZeroOrOne)),
)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
const reason = exit.cause.reasons[0]
if (reason && reason._tag === "Fail") {
expect(reason.error).toBeInstanceOf(AppProcess.AppProcessError)
expect((reason.error as AppProcess.AppProcessError).exitCode).toBe(2)
}
}
}),
)
it.effect(
"truncates output when maxOutputBytes is set",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc.run(cmd("-e", "process.stdout.write('0123456789')"), { maxOutputBytes: 5 })
expect(result.exitCode).toBe(0)
expect(result.truncated).toBe(true)
expect(result.stdout.length).toBe(5)
expect(result.stdout.toString("utf8")).toBe("01234")
}),
)
it.effect(
"result includes command description",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc.run(cmd("-e", "process.stdout.write('hi')"))
expect(result.command).toBe(`${NODE} -e process.stdout.write('hi')`)
}),
)
})
describe("inherited platform methods", () => {
it.effect(
"string returns stdout as string",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const out = yield* svc.string(cmd("-e", "process.stdout.write('hi\\n')"))
expect(out).toBe("hi\n")
}),
)
it.effect(
"lines returns the platform's array of lines",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const out = yield* svc.lines(cmd("-e", "process.stdout.write('a\\nb\\n')"))
expect(Array.from(out)).toEqual(["a", "b"])
}),
)
})
describe("runStream", () => {
it.live(
"emits lines incrementally and ends cleanly on exit 0",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc
.runStream(cmd("-e", "console.log('one'); console.log('two'); console.log('three')"))
.pipe(Stream.runCollect)
expect(Array.from(result)).toEqual(["one", "two", "three"])
}),
)
it.live(
"fails with AppProcessError when exit not in okExitCodes",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const exit = yield* Effect.exit(
svc
.runStream(cmd("-e", "console.log('a'); process.exit(2)"), { okExitCodes: [0] })
.pipe(Stream.runCollect),
)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
const reason = exit.cause.reasons[0]
if (reason && reason._tag === "Fail") {
expect(reason.error).toBeInstanceOf(AppProcess.AppProcessError)
}
}
}),
)
it.live(
"okExitCodes allowlist treats non-zero as success",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc
.runStream(cmd("-e", "console.log('only'); process.exit(1)"), { okExitCodes: [0, 1] })
.pipe(Stream.runCollect)
expect(Array.from(result)).toEqual(["only"])
}),
)
it.live(
"without okExitCodes, never fails on exit code",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc
.runStream(cmd("-e", "console.log('only'); process.exit(7)"))
.pipe(Stream.runCollect)
expect(Array.from(result)).toEqual(["only"])
}),
)
it.live(
"AbortSignal interrupts the stream",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const controller = new AbortController()
setTimeout(() => controller.abort(), 50)
const exit = yield* Effect.exit(
svc
.runStream(cmd("-e", "setInterval(() => console.log('tick'), 100); setTimeout(() => {}, 60_000)"), {
signal: controller.signal,
})
.pipe(Stream.runCollect),
)
expect(Exit.isFailure(exit)).toBe(true)
}),
)
})
describe("spawn (inherited)", () => {
it.live(
"returns the platform ChildProcessHandle for advanced use",
Effect.scoped(
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const handle = yield* svc.spawn(cmd("-e", "setInterval(() => {}, 1_000)"))
expect(yield* handle.isRunning).toBe(true)
yield* handle.kill()
}),
),
)
})
})