effect(core): add stdin option to AppProcess.run; migrate snapshot+clipboard (#27224)

This commit is contained in:
Kit Langton
2026-05-13 11:10:23 -04:00
committed by GitHub
parent 650f67a05a
commit ca723f1cbc
4 changed files with 150 additions and 118 deletions

View File

@@ -16,6 +16,7 @@ export interface RunOptions {
readonly maxErrorBytes?: number
readonly signal?: AbortSignal
readonly timeout?: Duration.Input
readonly stdin?: string | Uint8Array | Stream.Stream<Uint8Array, PlatformError>
}
export interface RunStreamOptions {
@@ -96,6 +97,15 @@ const waitForAbort = (signal: AbortSignal) =>
return Effect.sync(() => signal.removeEventListener("abort", onabort))
})
const normalizeStdin = (
input: string | Uint8Array | Stream.Stream<Uint8Array, PlatformError>,
): Stream.Stream<Uint8Array, PlatformError> =>
typeof input === "string"
? Stream.make(new TextEncoder().encode(input))
: input instanceof Uint8Array
? Stream.make(input)
: input
const collectStream = (stream: Stream.Stream<Uint8Array, PlatformError>, maxOutputBytes: number | undefined) =>
Stream.runFold(
stream,
@@ -119,7 +129,7 @@ export const layer = Layer.effect(
Effect.gen(function* () {
const spawner = yield* ChildProcessSpawner
const run = Effect.fn("AppProcess.run")(function* (command: ChildProcess.Command, options?: RunOptions) {
const runCommand = (command: ChildProcess.Command, options?: RunOptions) => {
const description = describeCommand(command)
const collect = Effect.scoped(
Effect.gen(function* () {
@@ -154,7 +164,22 @@ export const layer = Layer.effect(
),
)
: timed
return yield* aborted.pipe(Effect.catch((cause) => Effect.fail(wrapError(description, cause))))
return aborted.pipe(Effect.catch((cause) => Effect.fail(wrapError(description, cause))))
}
const run = Effect.fn("AppProcess.run")(function* (command: ChildProcess.Command, options?: RunOptions) {
if (options?.stdin === undefined) return yield* runCommand(command, options)
if (command._tag !== "StandardCommand") {
return yield* new AppProcessError({
command: describeCommand(command),
cause: new Error("stdin option only supports StandardCommand; received PipedCommand"),
})
}
const next = ChildProcess.make(command.command, command.args, {
...command.options,
stdin: normalizeStdin(options.stdin),
})
return yield* runCommand(next, options)
})
const runStream = (

View File

@@ -1,4 +1,6 @@
import { describe, expect } from "bun:test"
import { realpathSync } from "node:fs"
import { tmpdir } from "node:os"
import { Effect, Exit, Stream } from "effect"
import { ChildProcess } from "effect/unstable/process"
import { AppProcess } from "@opencode-ai/core/process"
@@ -123,6 +125,82 @@ describe("AppProcess", () => {
)
})
describe("run with stdin option", () => {
const echoStdin = "process.stdin.on('data', c => process.stdout.write(c))"
it.effect(
"feeds a string to stdin and returns it on stdout",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc.run(cmd("-e", echoStdin), { stdin: "hello" })
expect(result.exitCode).toBe(0)
expect(result.stdout.toString("utf8")).toBe("hello")
}),
)
it.effect(
"feeds a Uint8Array to stdin",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const bytes = new TextEncoder().encode("bytes")
const result = yield* svc.run(cmd("-e", echoStdin), { stdin: bytes })
expect(result.exitCode).toBe(0)
expect(result.stdout.toString("utf8")).toBe("bytes")
}),
)
it.effect(
"feeds a Stream of Uint8Array chunks to stdin",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const enc = new TextEncoder()
const stream = Stream.fromIterable([enc.encode("one"), enc.encode("-two"), enc.encode("-three")])
const result = yield* svc.run(cmd("-e", echoStdin), { stdin: stream })
expect(result.exitCode).toBe(0)
expect(result.stdout.toString("utf8")).toBe("one-two-three")
}),
)
it.effect(
"completes correctly with empty input",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc.run(cmd("-e", echoStdin), { stdin: "" })
expect(result.exitCode).toBe(0)
expect(result.stdout.toString("utf8")).toBe("")
}),
)
it.effect(
"carries existing Command options like env",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const script =
"process.stdout.write(process.env.FEED + ':'); process.stdin.on('data', c => process.stdout.write(c))"
const command = ChildProcess.make(NODE, ["-e", script], { env: { FEED: "envset" }, extendEnv: true })
const result = yield* svc.run(command, { stdin: "payload" })
expect(result.exitCode).toBe(0)
expect(result.stdout.toString("utf8")).toBe("envset:payload")
}),
)
it.effect(
"carries existing Command options like cwd",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const dir = realpathSync(tmpdir())
const script =
"process.stdout.write(process.cwd() + '|'); process.stdin.on('data', c => process.stdout.write(c))"
const command = ChildProcess.make(NODE, ["-e", script], { cwd: dir })
const result = yield* svc.run(command, { stdin: "ok" })
expect(result.exitCode).toBe(0)
const [cwd, stdin] = result.stdout.toString("utf8").split("|")
expect(realpathSync(cwd)).toBe(dir)
expect(stdin).toBe("ok")
}),
)
})
describe("runStream", () => {
it.live(
"emits lines incrementally and ends cleanly on exit 0",
@@ -136,11 +214,17 @@ describe("AppProcess", () => {
)
it.live(
"fails with AppProcessError when exit not in okExitCodes",
"okExitCodes determines whether a non-zero exit fails the stream",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const allowed = yield* svc
.runStream(cmd("-e", "console.log('only'); process.exit(1)"), { okExitCodes: [0, 1] })
.pipe(Stream.runCollect)
expect(Array.from(allowed)).toEqual(["only"])
const exit = yield* Effect.exit(
svc.runStream(cmd("-e", "console.log('a'); process.exit(2)"), { okExitCodes: [0] }).pipe(Stream.runCollect),
svc
.runStream(cmd("-e", "console.log('a'); process.exit(2)"), { okExitCodes: [0, 1] })
.pipe(Stream.runCollect),
)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
@@ -152,17 +236,6 @@ describe("AppProcess", () => {
}),
)
it.live(
"okExitCodes allowlist treats non-zero as success",
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const result = yield* svc
.runStream(cmd("-e", "console.log('only'); process.exit(1)"), { okExitCodes: [0, 1] })
.pipe(Stream.runCollect)
expect(Array.from(result)).toEqual(["only"])
}),
)
it.live(
"without okExitCodes, never fails on exit code",
Effect.gen(function* () {
@@ -177,12 +250,10 @@ describe("AppProcess", () => {
Effect.gen(function* () {
const svc = yield* AppProcess.Service
const controller = new AbortController()
setTimeout(() => controller.abort(), 50)
controller.abort()
const exit = yield* Effect.exit(
svc
.runStream(cmd("-e", "setInterval(() => console.log('tick'), 100); setTimeout(() => {}, 60_000)"), {
signal: controller.signal,
})
.runStream(cmd("-e", "setInterval(() => {}, 60_000)"), { signal: controller.signal })
.pipe(Stream.runCollect),
)
expect(Exit.isFailure(exit)).toBe(true)

View File

@@ -3,9 +3,21 @@ import { lazy } from "../../../../util/lazy.js"
import { tmpdir } from "os"
import path from "path"
import fs from "fs/promises"
import { Effect } from "effect"
import { ChildProcess } from "effect/unstable/process"
import { AppProcess } from "@opencode-ai/core/process"
import * as Filesystem from "../../../../util/filesystem"
import * as Process from "../../../../util/process"
const writeWithStdin = (cmd: string[], text: string): Promise<void> =>
Effect.runPromise(
AppProcess.Service.use((svc) => svc.run(ChildProcess.make(cmd[0]!, cmd.slice(1)), { stdin: text })).pipe(
Effect.provide(AppProcess.defaultLayer),
Effect.catch(() => Effect.void),
Effect.asVoid,
),
).catch(() => undefined)
// Lazy load which and clipboardy to avoid expensive execa/which/isexe chain at startup
const getWhich = lazy(async () => {
const { which } = await import("../../../../util/which")
@@ -125,49 +137,23 @@ const getCopyMethod = lazy(async () => {
if (os === "linux") {
if (process.env["WAYLAND_DISPLAY"] && which("wl-copy")) {
console.log("clipboard: using wl-copy")
return async (text: string) => {
const proc = Process.spawn(["wl-copy"], { stdin: "pipe", stdout: "ignore", stderr: "ignore" })
if (!proc.stdin) return
proc.stdin.write(text)
proc.stdin.end()
await proc.exited.catch(() => {})
}
return (text: string) => writeWithStdin(["wl-copy"], text)
}
if (which("xclip")) {
console.log("clipboard: using xclip")
return async (text: string) => {
const proc = Process.spawn(["xclip", "-selection", "clipboard"], {
stdin: "pipe",
stdout: "ignore",
stderr: "ignore",
})
if (!proc.stdin) return
proc.stdin.write(text)
proc.stdin.end()
await proc.exited.catch(() => {})
}
return (text: string) => writeWithStdin(["xclip", "-selection", "clipboard"], text)
}
if (which("xsel")) {
console.log("clipboard: using xsel")
return async (text: string) => {
const proc = Process.spawn(["xsel", "--clipboard", "--input"], {
stdin: "pipe",
stdout: "ignore",
stderr: "ignore",
})
if (!proc.stdin) return
proc.stdin.write(text)
proc.stdin.end()
await proc.exited.catch(() => {})
}
return (text: string) => writeWithStdin(["xsel", "--clipboard", "--input"], text)
}
}
if (os === "win32") {
console.log("clipboard: using powershell")
return async (text: string) => {
return (text: string) =>
// Pipe via stdin to avoid PowerShell string interpolation ($env:FOO, $(), etc.)
const proc = Process.spawn(
writeWithStdin(
[
"powershell.exe",
"-NonInteractive",
@@ -175,18 +161,8 @@ const getCopyMethod = lazy(async () => {
"-Command",
"[Console]::InputEncoding = [System.Text.Encoding]::UTF8; Set-Clipboard -Value ([Console]::In.ReadToEnd())",
],
{
stdin: "pipe",
stdout: "ignore",
stderr: "ignore",
},
text,
)
if (!proc.stdin) return
proc.stdin.write(text)
proc.stdin.end()
await proc.exited.catch(() => {})
}
}
console.log("clipboard: no native support")

View File

@@ -1,4 +1,4 @@
import { Cause, Duration, Effect, Layer, Schedule, Schema, Semaphore, Context, Stream } from "effect"
import { Cause, Duration, Effect, Layer, Schedule, Schema, Semaphore, Context } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { formatPatch, structuredPatch } from "diff"
import path from "path"
@@ -84,48 +84,13 @@ export const layer: Layer.Layer<Service, never, AppFileSystem.Service | AppProce
const args = (cmd: string[]) => ["--git-dir", state.gitdir, "--work-tree", state.worktree, ...cmd]
const enc = new TextEncoder()
const feed = (list: string[]) => Stream.make(enc.encode(list.join("\0") + "\0"))
const gitWithStdin = Effect.fnUntraced(
function* (
cmd: string[],
opts: { cwd?: string; env?: Record<string, string>; stdin: ChildProcess.CommandInput },
) {
// stdin-feed calls still need raw spawn — AppProcess.run does not yet
// expose a stdin Stream API. Tracked as future AppProcess helper.
const proc = ChildProcess.make("git", cmd, {
cwd: opts.cwd,
env: opts.env,
extendEnv: true,
stdin: opts.stdin,
})
const handle = yield* appProcess.spawn(proc)
const [text, stderr] = yield* Effect.all(
[Stream.mkString(Stream.decodeText(handle.stdout)), Stream.mkString(Stream.decodeText(handle.stderr))],
{ concurrency: 2 },
)
const code = yield* handle.exitCode
return { code, text, stderr } satisfies GitResult
},
Effect.scoped,
Effect.catch((err) =>
Effect.succeed({
code: ChildProcessSpawner.ExitCode(1),
text: "",
stderr: err instanceof Error ? err.message : String(err),
}),
),
)
const feed = (list: string[]) => list.join("\0") + "\0"
const git = Effect.fnUntraced(
function* (cmd: string[], opts?: { cwd?: string; env?: Record<string, string> }) {
function* (cmd: string[], opts?: { cwd?: string; env?: Record<string, string>; stdin?: string }) {
const result = yield* appProcess.run(
ChildProcess.make("git", cmd, {
cwd: opts?.cwd,
env: opts?.env,
extendEnv: true,
}),
ChildProcess.make("git", cmd, { cwd: opts?.cwd, env: opts?.env, extendEnv: true }),
{ stdin: opts?.stdin },
)
return {
code: ChildProcessSpawner.ExitCode(result.exitCode),
@@ -144,7 +109,7 @@ export const layer: Layer.Layer<Service, never, AppFileSystem.Service | AppProce
const ignore = Effect.fnUntraced(function* (files: string[]) {
if (!files.length) return new Set<string>()
const check = yield* gitWithStdin(
const check = yield* git(
[
...quote,
"--git-dir",
@@ -167,7 +132,7 @@ export const layer: Layer.Layer<Service, never, AppFileSystem.Service | AppProce
const drop = Effect.fnUntraced(function* (files: string[]) {
if (!files.length) return
yield* gitWithStdin(
yield* git(
[
...cfg,
...args(["rm", "--cached", "-f", "--ignore-unmatch", "--pathspec-from-file=-", "--pathspec-file-nul"]),
@@ -181,7 +146,7 @@ export const layer: Layer.Layer<Service, never, AppFileSystem.Service | AppProce
const stage = Effect.fnUntraced(function* (files: string[]) {
if (!files.length) return
const result = yield* gitWithStdin(
const result = yield* git(
[...cfg, ...args(["add", "--all", "--sparse", "--pathspec-from-file=-", "--pathspec-file-nul"])],
{
cwd: state.directory,
@@ -590,26 +555,21 @@ export const layer: Layer.Layer<Service, never, AppFileSystem.Service | AppProce
})
if (!refs.length) return new Map<string, { before: string; after: string }>()
// cat-file --batch is a stdin-feed call — kept on raw spawn
// until AppProcess.run exposes a stdin Stream API.
const proc = ChildProcess.make("git", [...cfg, ...args(["cat-file", "--batch"])], {
cwd: state.directory,
extendEnv: true,
stdin: Stream.make(new TextEncoder().encode(refs.map((item) => item.ref).join("\n") + "\n")),
})
const handle = yield* appProcess.spawn(proc)
const [out, err] = yield* Effect.all(
[Stream.mkUint8Array(handle.stdout), Stream.mkString(Stream.decodeText(handle.stderr))],
{ concurrency: 2 },
const batch = yield* appProcess.run(
ChildProcess.make("git", [...cfg, ...args(["cat-file", "--batch"])], {
cwd: state.directory,
extendEnv: true,
}),
{ stdin: refs.map((item) => item.ref).join("\n") + "\n" },
)
const code = yield* handle.exitCode
if (code !== 0) {
if (batch.exitCode !== 0) {
log.info("git cat-file --batch failed during snapshot diff, falling back to per-file git show", {
stderr: err,
stderr: batch.stderr.toString("utf8"),
refs: refs.length,
})
return
}
const out = batch.stdout
const fail = (msg: string, extra?: Record<string, string>) => {
log.info(msg, { ...extra, refs: refs.length })