From 46daede10c55288220dab68e61e90b7ca56f1eb7 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Tue, 12 May 2026 23:25:52 -0400 Subject: [PATCH] test(pty): migrate output isolation to Effect runner (#27235) --- .../test/pty/pty-output-isolation.test.ts | 271 +++++++++--------- 1 file changed, 143 insertions(+), 128 deletions(-) diff --git a/packages/opencode/test/pty/pty-output-isolation.test.ts b/packages/opencode/test/pty/pty-output-isolation.test.ts index 662042b64c..0fa710f02a 100644 --- a/packages/opencode/test/pty/pty-output-isolation.test.ts +++ b/packages/opencode/test/pty/pty-output-isolation.test.ts @@ -1,147 +1,162 @@ -import { describe, expect, test } from "bun:test" -import { AppRuntime } from "../../src/effect/app-runtime" -import { Effect } from "effect" -import { Instance } from "../../src/project/instance" -import { WithInstance } from "../../src/project/with-instance" +import { describe, expect } from "bun:test" +import { Bus } from "../../src/bus" +import { Config } from "../../src/config/config" +import { Plugin } from "../../src/plugin" import { Pty } from "../../src/pty" -import { tmpdir } from "../fixture/fixture" -import { setTimeout as sleep } from "node:timers/promises" +import { Duration, Effect, Layer, Queue } from "effect" +import { testEffect } from "../lib/effect" + +type Socket = Parameters[1] + +const it = testEffect( + Pty.layer.pipe( + Layer.provideMerge(Bus.layer), + Layer.provideMerge(Config.defaultLayer), + Layer.provideMerge(Plugin.defaultLayer), + ), +) +const ptyTest = process.platform === "win32" ? it.instance.skip : it.instance + +const createPty = Effect.fn("PtyOutputIsolationTest.createPty")(function* (input: Pty.CreateInput) { + const pty = yield* Pty.Service + return yield* Effect.acquireRelease(pty.create(input), (info) => pty.remove(info.id).pipe(Effect.ignore)) +}) + +const decodeOutput = (data: string | Uint8Array | ArrayBuffer) => + typeof data === "string" + ? data + : Buffer.from(data instanceof Uint8Array ? data : new Uint8Array(data)).toString("utf8") + +const makeSocket = Effect.fn("PtyOutputIsolationTest.makeSocket")(function* (data: unknown) { + const output = yield* Queue.unbounded() + const chunks: string[] = [] + const socket: Socket = { + readyState: 1, + data, + send: (data) => { + const text = decodeOutput(data) + chunks.push(text) + Queue.offerUnsafe(output, text) + }, + close: () => { + // no-op (simulate abrupt drop) + }, + } + + return { socket, output, chunks } +}) + +const waitForOutput = (output: Queue.Queue, text: string, duration: Duration.Input = "5 seconds") => + Effect.gen(function* () { + let received = "" + while (!received.includes(text)) { + received += yield* Queue.take(output) + } + return received + }).pipe( + Effect.timeoutOrElse({ + duration, + orElse: () => Effect.fail(new Error(`timeout waiting for output containing ${JSON.stringify(text)}`)), + }), + ) + +const waitForLeakedOutput = (output: Queue.Queue, text: string) => + Effect.gen(function* () { + let received = "" + while (!received.includes(text)) { + received += yield* Queue.take(output) + } + return received + }).pipe( + Effect.timeoutOrElse({ + duration: "100 millis", + orElse: () => Effect.succeed(undefined), + }), + ) describe("pty", () => { - test("does not leak output when websocket objects are reused", async () => { - await using dir = await tmpdir({ git: true }) + ptyTest( + "does not leak output when websocket objects are reused", + () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const a = yield* createPty({ command: "cat", title: "a" }) + const b = yield* createPty({ command: "cat", title: "b" }) + const connectionA = yield* makeSocket({ events: { connection: "a" } }) + const connectionB = { events: { connection: "b" } } - await WithInstance.provide({ - directory: dir.path, - fn: () => - AppRuntime.runPromise( - Effect.gen(function* () { - const pty = yield* Pty.Service - const a = yield* pty.create({ command: "cat", title: "a" }) - const b = yield* pty.create({ command: "cat", title: "b" }) - try { - const outA: string[] = [] - const outB: string[] = [] + yield* pty.connect(a.id, connectionA.socket) - const ws = { - readyState: 1, - data: { events: { connection: "a" } }, - send: (data: unknown) => { - outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - }, - close: () => { - // no-op (simulate abrupt drop) - }, - } + const outBQueue = yield* Queue.unbounded() + const outB: string[] = [] + connectionA.socket.data = connectionB + connectionA.socket.send = (data) => { + const text = decodeOutput(data) + outB.push(text) + Queue.offerUnsafe(outBQueue, text) + } + yield* pty.connect(b.id, connectionA.socket) - yield* pty.connect(a.id, ws as any) + connectionA.chunks.length = 0 + outB.length = 0 - ws.data = { events: { connection: "b" } } - ws.send = (data: unknown) => { - outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - } - yield* pty.connect(b.id, ws as any) + yield* pty.write(a.id, "AAA\n") + const verifyA = yield* makeSocket({ events: { connection: "verify-a" } }) + yield* pty.connect(a.id, verifyA.socket) + yield* waitForOutput(verifyA.output, "AAA") - outA.length = 0 - outB.length = 0 + expect(outB.join("")).not.toContain("AAA") + expect(yield* waitForLeakedOutput(outBQueue, "AAA")).toBeUndefined() + }), + { git: true }, + ) - yield* pty.write(a.id, "AAA\n") - yield* Effect.promise(() => sleep(100)) + ptyTest( + "does not leak output when Bun recycles websocket objects before re-connect", + () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const a = yield* createPty({ command: "cat", title: "a" }) + const outA = yield* makeSocket({ events: { connection: "a" } }) + const outB = yield* Queue.unbounded() - expect(outB.join("")).not.toContain("AAA") - } finally { - yield* pty.remove(a.id) - yield* pty.remove(b.id) - } - }), - ), - }) - }) + yield* pty.connect(a.id, outA.socket) + outA.chunks.length = 0 - test("does not leak output when Bun recycles websocket objects before re-connect", async () => { - await using dir = await tmpdir({ git: true }) + const connectionB = { events: { connection: "b" } } + outA.socket.data = connectionB + outA.socket.send = (data) => { + Queue.offerUnsafe(outB, decodeOutput(data)) + } - await WithInstance.provide({ - directory: dir.path, - fn: () => - AppRuntime.runPromise( - Effect.gen(function* () { - const pty = yield* Pty.Service - const a = yield* pty.create({ command: "cat", title: "a" }) - try { - const outA: string[] = [] - const outB: string[] = [] + yield* pty.write(a.id, "AAA\n") + const verifyA = yield* makeSocket({ events: { connection: "verify-a" } }) + yield* pty.connect(a.id, verifyA.socket) + yield* waitForOutput(verifyA.output, "AAA") - const ws = { - readyState: 1, - data: { events: { connection: "a" } }, - send: (data: unknown) => { - outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - }, - close: () => { - // no-op (simulate abrupt drop) - }, - } + expect(yield* waitForLeakedOutput(outB, "AAA")).toBeUndefined() + }), + { git: true }, + ) - yield* pty.connect(a.id, ws as any) - outA.length = 0 + ptyTest( + "treats in-place socket data mutation as the same connection", + () => + Effect.gen(function* () { + const pty = yield* Pty.Service + const a = yield* createPty({ command: "cat", title: "a" }) + const ctx = { connId: 1 } + const out = yield* makeSocket(ctx) - ws.data = { events: { connection: "b" } } - ws.send = (data: unknown) => { - outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - } + yield* pty.connect(a.id, out.socket) + out.chunks.length = 0 - yield* pty.write(a.id, "AAA\n") - yield* Effect.promise(() => sleep(100)) + ctx.connId = 2 - expect(outB.join("")).not.toContain("AAA") - } finally { - yield* pty.remove(a.id) - } - }), - ), - }) - }) + yield* pty.write(a.id, "AAA\n") - test("treats in-place socket data mutation as the same connection", async () => { - await using dir = await tmpdir({ git: true }) - - await WithInstance.provide({ - directory: dir.path, - fn: () => - AppRuntime.runPromise( - Effect.gen(function* () { - const pty = yield* Pty.Service - const a = yield* pty.create({ command: "cat", title: "a" }) - try { - const out: string[] = [] - - const ctx = { connId: 1 } - const ws = { - readyState: 1, - data: ctx, - send: (data: unknown) => { - out.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - }, - close: () => { - // no-op - }, - } - - yield* pty.connect(a.id, ws as any) - out.length = 0 - - ctx.connId = 2 - - yield* pty.write(a.id, "AAA\n") - yield* Effect.promise(() => sleep(100)) - - expect(out.join("")).toContain("AAA") - } finally { - yield* pty.remove(a.id) - } - }), - ), - }) - }) + expect(yield* waitForOutput(out.output, "AAA")).toContain("AAA") + }), + { git: true }, + ) })