diff --git a/packages/core/src/process.ts b/packages/core/src/process.ts new file mode 100644 index 0000000000..f855459b2c --- /dev/null +++ b/packages/core/src/process.ts @@ -0,0 +1,207 @@ +import { Context, Duration, Effect, Fiber, Layer, Schema, Stream } from "effect" +import type { PlatformError } from "effect/PlatformError" +import { ChildProcess } from "effect/unstable/process" +import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner" +import { CrossSpawnSpawner } from "./cross-spawn-spawner" + +export class AppProcessError extends Schema.TaggedErrorClass()("AppProcessError", { + command: Schema.String, + exitCode: Schema.optional(Schema.Number), + stderr: Schema.optional(Schema.String), + cause: Schema.optional(Schema.Defect), +}) {} + +export interface RunOptions { + readonly maxOutputBytes?: number + readonly maxErrorBytes?: number + readonly signal?: AbortSignal + readonly timeout?: Duration.Input +} + +export interface RunStreamOptions { + readonly signal?: AbortSignal + readonly includeStderr?: boolean + readonly okExitCodes?: ReadonlyArray + readonly maxErrorBytes?: number +} + +export interface RunResult { + readonly command: string + readonly exitCode: number + readonly stdout: Buffer + readonly stderr: Buffer + readonly truncated: boolean +} + +export type Interface = ChildProcessSpawner["Service"] & { + readonly run: (command: ChildProcess.Command, options?: RunOptions) => Effect.Effect + readonly runStream: ( + command: ChildProcess.Command, + options?: RunStreamOptions, + ) => Stream.Stream +} + +export class Service extends Context.Service()("@opencode/AppProcess") {} + +export const requireSuccess = (result: RunResult): Effect.Effect => + result.exitCode === 0 + ? Effect.succeed(result) + : Effect.fail( + new AppProcessError({ + command: result.command, + exitCode: result.exitCode, + stderr: result.stderr.toString("utf8"), + }), + ) + +export const requireExitIn = + (codes: ReadonlyArray) => + (result: RunResult): Effect.Effect => + codes.includes(result.exitCode) + ? Effect.succeed(result) + : Effect.fail( + new AppProcessError({ + command: result.command, + exitCode: result.exitCode, + stderr: result.stderr.toString("utf8"), + }), + ) + +const describeCommand = (command: ChildProcess.Command): string => { + if (command._tag === "StandardCommand") { + return command.args.length ? `${command.command} ${command.args.join(" ")}` : command.command + } + return `${describeCommand(command.left)} | ${describeCommand(command.right)}` +} + +const wrapError = (description: string, cause: unknown): AppProcessError => + cause instanceof AppProcessError ? cause : new AppProcessError({ command: description, cause }) + +const abortError = (signal: AbortSignal): Error => { + const reason = signal.reason + if (reason instanceof Error) return reason + const err = new Error("Aborted") + err.name = "AbortError" + return err +} + +const waitForAbort = (signal: AbortSignal) => + Effect.callback((resume) => { + if (signal.aborted) { + resume(Effect.fail(abortError(signal))) + return + } + const onabort = () => resume(Effect.fail(abortError(signal))) + signal.addEventListener("abort", onabort, { once: true }) + return Effect.sync(() => signal.removeEventListener("abort", onabort)) + }) + +const collectStream = (stream: Stream.Stream, maxOutputBytes: number | undefined) => + Stream.runFold( + stream, + () => ({ chunks: [] as Uint8Array[], bytes: 0, truncated: false }), + (acc, chunk) => { + if (maxOutputBytes === undefined) { + acc.chunks.push(chunk) + acc.bytes += chunk.length + return acc + } + const remaining = maxOutputBytes - acc.bytes + if (remaining > 0) acc.chunks.push(remaining >= chunk.length ? chunk : chunk.slice(0, remaining)) + acc.bytes += chunk.length + acc.truncated = acc.truncated || acc.bytes > maxOutputBytes + return acc + }, + ).pipe(Effect.map((x) => ({ buffer: Buffer.concat(x.chunks), truncated: x.truncated }))) + +export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const spawner = yield* ChildProcessSpawner + + const run = Effect.fn("AppProcess.run")(function* (command: ChildProcess.Command, options?: RunOptions) { + const description = describeCommand(command) + const collect = Effect.scoped( + Effect.gen(function* () { + const handle = yield* spawner.spawn(command) + const [stdout, stderr, exitCode] = yield* Effect.all( + [ + collectStream(handle.stdout, options?.maxOutputBytes), + collectStream(handle.stderr, options?.maxErrorBytes), + handle.exitCode, + ], + { concurrency: "unbounded" }, + ) + return { + command: description, + exitCode, + stdout: stdout.buffer, + stderr: stderr.buffer, + truncated: stdout.truncated, + } satisfies RunResult + }), + ) + const timed = options?.timeout + ? Effect.timeoutOrElse(collect, { + duration: options.timeout, + orElse: () => + Effect.fail(new AppProcessError({ command: description, cause: new Error("Timed out") })), + }) + : collect + const aborted = options?.signal + ? timed.pipe( + Effect.raceFirst( + waitForAbort(options.signal).pipe(Effect.mapError((cause) => wrapError(description, cause))), + ), + ) + : timed + return yield* aborted.pipe(Effect.catch((cause) => Effect.fail(wrapError(description, cause)))) + }) + + const runStream = (command: ChildProcess.Command, options?: RunStreamOptions): Stream.Stream => { + const description = describeCommand(command) + const okExitCodes = options?.okExitCodes + const built: Stream.Stream = Stream.unwrap( + Effect.gen(function* () { + const handle = yield* spawner.spawn(command) + const stderrFiber = yield* Effect.forkScoped( + collectStream(handle.stderr, options?.maxErrorBytes).pipe( + Effect.map((x) => x.buffer.toString("utf8")), + ), + ) + const source = options?.includeStderr === true ? handle.all : handle.stdout + const lines = source.pipe( + Stream.decodeText, + Stream.splitLines, + Stream.filter((line) => line.length > 0), + ) + const tail = Stream.unwrap( + Effect.gen(function* () { + const code = yield* handle.exitCode + if (okExitCodes && okExitCodes.length > 0 && !okExitCodes.includes(code)) { + const stderr = yield* Fiber.join(stderrFiber) + return Stream.fail(new AppProcessError({ command: description, exitCode: code, stderr })) + } + return Stream.empty + }), + ) + return Stream.concat(lines, tail) as Stream.Stream + }), + ) + const mapped = built.pipe( + Stream.catch((cause): Stream.Stream => Stream.fail(wrapError(description, cause))), + ) + if (!options?.signal) return mapped + const signal = options.signal + return mapped.pipe( + Stream.interruptWhen(waitForAbort(signal).pipe(Effect.mapError((cause) => wrapError(description, cause)))), + ) + } + + return Service.of({ ...spawner, run, runStream }) + }), +) + +export const defaultLayer = layer.pipe(Layer.provide(CrossSpawnSpawner.defaultLayer)) + +export * as AppProcess from "./process" diff --git a/packages/core/test/process/process.test.ts b/packages/core/test/process/process.test.ts new file mode 100644 index 0000000000..1b46c1f1e6 --- /dev/null +++ b/packages/core/test/process/process.test.ts @@ -0,0 +1,210 @@ +import { describe, expect } from "bun:test" +import { Effect, Exit, Stream } from "effect" +import { ChildProcess } from "effect/unstable/process" +import { AppProcess } from "@opencode-ai/core/process" +import { testEffect } from "../lib/effect" + +const it = testEffect(AppProcess.defaultLayer) + +const NODE = process.execPath +const cmd = (...args: string[]) => ChildProcess.make(NODE, args) + +describe("AppProcess", () => { + describe("run", () => { + it.effect( + "captures stdout and exit code zero", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const result = yield* svc.run(cmd("-e", "process.stdout.write('hi\\n')")) + expect(result.exitCode).toBe(0) + expect(result.stdout.toString("utf8")).toBe("hi\n") + expect(result.truncated).toBe(false) + }), + ) + + it.effect( + "non-zero exit returns RunResult; caller can require success", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const result = yield* svc.run(cmd("-e", "process.exit(1)")) + expect(result.exitCode).toBe(1) + }), + ) + + it.effect( + "requireSuccess fails on non-zero exit", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const exit = yield* Effect.exit(svc.run(cmd("-e", "process.exit(1)")).pipe(Effect.flatMap(AppProcess.requireSuccess))) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + const reason = exit.cause.reasons[0] + if (reason && reason._tag === "Fail") { + expect(reason.error).toBeInstanceOf(AppProcess.AppProcessError) + expect((reason.error as AppProcess.AppProcessError).exitCode).toBe(1) + } else { + throw new Error("expected fail reason") + } + } + }), + ) + + it.effect( + "requireSuccess succeeds on exit 0", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const result = yield* svc.run(cmd("-e", "process.exit(0)")).pipe(Effect.flatMap(AppProcess.requireSuccess)) + expect(result.exitCode).toBe(0) + }), + ) + + it.effect( + "requireExitIn allowlists multiple exit codes", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const requireZeroOrOne = AppProcess.requireExitIn([0, 1]) + const okZero = yield* svc.run(cmd("-e", "process.exit(0)")).pipe(Effect.flatMap(requireZeroOrOne)) + expect(okZero.exitCode).toBe(0) + const okOne = yield* svc.run(cmd("-e", "process.exit(1)")).pipe(Effect.flatMap(requireZeroOrOne)) + expect(okOne.exitCode).toBe(1) + const exit = yield* Effect.exit( + svc.run(cmd("-e", "process.exit(2)")).pipe(Effect.flatMap(requireZeroOrOne)), + ) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + const reason = exit.cause.reasons[0] + if (reason && reason._tag === "Fail") { + expect(reason.error).toBeInstanceOf(AppProcess.AppProcessError) + expect((reason.error as AppProcess.AppProcessError).exitCode).toBe(2) + } + } + }), + ) + + it.effect( + "truncates output when maxOutputBytes is set", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const result = yield* svc.run(cmd("-e", "process.stdout.write('0123456789')"), { maxOutputBytes: 5 }) + expect(result.exitCode).toBe(0) + expect(result.truncated).toBe(true) + expect(result.stdout.length).toBe(5) + expect(result.stdout.toString("utf8")).toBe("01234") + }), + ) + + it.effect( + "result includes command description", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const result = yield* svc.run(cmd("-e", "process.stdout.write('hi')")) + expect(result.command).toBe(`${NODE} -e process.stdout.write('hi')`) + }), + ) + }) + + describe("inherited platform methods", () => { + it.effect( + "string returns stdout as string", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const out = yield* svc.string(cmd("-e", "process.stdout.write('hi\\n')")) + expect(out).toBe("hi\n") + }), + ) + + it.effect( + "lines returns the platform's array of lines", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const out = yield* svc.lines(cmd("-e", "process.stdout.write('a\\nb\\n')")) + expect(Array.from(out)).toEqual(["a", "b"]) + }), + ) + }) + + describe("runStream", () => { + it.live( + "emits lines incrementally and ends cleanly on exit 0", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const result = yield* svc + .runStream(cmd("-e", "console.log('one'); console.log('two'); console.log('three')")) + .pipe(Stream.runCollect) + expect(Array.from(result)).toEqual(["one", "two", "three"]) + }), + ) + + it.live( + "fails with AppProcessError when exit not in okExitCodes", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const exit = yield* Effect.exit( + svc + .runStream(cmd("-e", "console.log('a'); process.exit(2)"), { okExitCodes: [0] }) + .pipe(Stream.runCollect), + ) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + const reason = exit.cause.reasons[0] + if (reason && reason._tag === "Fail") { + expect(reason.error).toBeInstanceOf(AppProcess.AppProcessError) + } + } + }), + ) + + it.live( + "okExitCodes allowlist treats non-zero as success", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const result = yield* svc + .runStream(cmd("-e", "console.log('only'); process.exit(1)"), { okExitCodes: [0, 1] }) + .pipe(Stream.runCollect) + expect(Array.from(result)).toEqual(["only"]) + }), + ) + + it.live( + "without okExitCodes, never fails on exit code", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const result = yield* svc + .runStream(cmd("-e", "console.log('only'); process.exit(7)")) + .pipe(Stream.runCollect) + expect(Array.from(result)).toEqual(["only"]) + }), + ) + + it.live( + "AbortSignal interrupts the stream", + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const controller = new AbortController() + setTimeout(() => controller.abort(), 50) + const exit = yield* Effect.exit( + svc + .runStream(cmd("-e", "setInterval(() => console.log('tick'), 100); setTimeout(() => {}, 60_000)"), { + signal: controller.signal, + }) + .pipe(Stream.runCollect), + ) + expect(Exit.isFailure(exit)).toBe(true) + }), + ) + }) + + describe("spawn (inherited)", () => { + it.live( + "returns the platform ChildProcessHandle for advanced use", + Effect.scoped( + Effect.gen(function* () { + const svc = yield* AppProcess.Service + const handle = yield* svc.spawn(cmd("-e", "setInterval(() => {}, 1_000)")) + expect(yield* handle.isRunning).toBe(true) + yield* handle.kill() + }), + ), + ) + }) +})