From 2f919b8bc73bc2bfd8424854d7740196a8340d37 Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Fri, 1 May 2026 19:05:06 +0530 Subject: [PATCH] refactor(task): use background jobs --- packages/opencode/src/effect/app-runtime.ts | 2 + packages/opencode/src/session/prompt.ts | 3 -- packages/opencode/src/tool/registry.ts | 3 ++ packages/opencode/src/tool/task.ts | 30 +++++++++---- packages/opencode/src/tool/task_status.ts | 45 +++++++++++++++++++ packages/opencode/test/session/prompt.test.ts | 2 + .../test/session/snapshot-tool-race.test.ts | 2 + packages/opencode/test/tool/task.test.ts | 33 +++++++------- .../opencode/test/tool/task_status.test.ts | 2 + 9 files changed, 94 insertions(+), 28 deletions(-) diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts index 06969ff9d1..a69b2e00ad 100644 --- a/packages/opencode/src/effect/app-runtime.ts +++ b/packages/opencode/src/effect/app-runtime.ts @@ -50,6 +50,7 @@ import { SessionShare } from "@/share/session" import { SyncEvent } from "@/sync" import { Npm } from "@opencode-ai/core/npm" import { memoMap } from "@opencode-ai/core/effect/memo-map" +import { BackgroundJob } from "@/background/job" export const AppLayer = Layer.mergeAll( Npm.defaultLayer, @@ -75,6 +76,7 @@ export const AppLayer = Layer.mergeAll( Todo.defaultLayer, Session.defaultLayer, SessionStatus.defaultLayer, + BackgroundJob.defaultLayer, SessionRunState.defaultLayer, SessionProcessor.defaultLayer, SessionCompaction.defaultLayer, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 8fafc10761..e57d867e09 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -118,9 +118,6 @@ export const layer = Layer.effect( resolvePromptParts: (template: string) => resolvePromptParts(template), prompt: (input: PromptInput) => prompt(input), loop: (input: LoopInput) => loop(input), - fork: (effect: Effect.Effect) => { - run.fork(effect) - }, } satisfies TaskPromptOps }) diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts index 148c9de4e6..fd57101d1b 100644 --- a/packages/opencode/src/tool/registry.ts +++ b/packages/opencode/src/tool/registry.ts @@ -48,6 +48,7 @@ import { Agent } from "../agent/agent" import { Skill } from "../skill" import { Permission } from "@/permission" import { SessionStatus } from "@/session/status" +import { BackgroundJob } from "@/background/job" const log = Log.create({ service: "tool.registry" }) @@ -86,6 +87,7 @@ export const layer: Layer.Layer< | Instruction.Service | AppFileSystem.Service | Bus.Service + | BackgroundJob.Service | HttpClient.HttpClient | ChildProcessSpawner | Ripgrep.Service @@ -343,6 +345,7 @@ export const defaultLayer = Layer.suspend(() => Layer.provide(Instruction.defaultLayer), Layer.provide(AppFileSystem.defaultLayer), Layer.provide(Bus.layer), + Layer.provide(BackgroundJob.defaultLayer), Layer.provide(FetchHttpClient.layer), Layer.provide(Format.defaultLayer), Layer.provide(CrossSpawnSpawner.defaultLayer), diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index 061321b6b0..8bd01f88af 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -10,13 +10,13 @@ import { SessionStatus } from "@/session/status" import { TuiEvent } from "@/cli/cmd/tui/event" import { Cause, Effect, Option, Schema } from "effect" import { Config } from "@/config/config" +import { BackgroundJob } from "@/background/job" export interface TaskPromptOps { cancel(sessionID: SessionID): void resolvePromptParts(template: string): Effect.Effect prompt(input: SessionPrompt.PromptInput): Effect.Effect loop(input: SessionPrompt.LoopInput): Effect.Effect - fork(effect: Effect.Effect): void } const id = "task" @@ -80,6 +80,7 @@ export const TaskTool = Tool.define( const config = yield* Config.Service const sessions = yield* Session.Service const status = yield* SessionStatus.Service + const jobs = yield* BackgroundJob.Service const run = Effect.fn( "TaskTool.execute", @@ -236,17 +237,28 @@ export const TaskTool = Tool.define( yield* continueIfIdle({ userID: message.info.id, state }) }) - ops.fork( - runTask().pipe( + yield* jobs.start({ + id: nextSession.id, + type: id, + title: params.description, + metadata: { + parentSessionID: ctx.sessionID, + sessionID: nextSession.id, + subagent: next.name, + }, + run: runTask().pipe( Effect.matchCauseEffect({ - onSuccess: (text) => inject("completed", text), - onFailure: (cause) => - inject("error", errorText(Cause.squash(cause))).pipe(Effect.catchCause(() => Effect.void)), + onSuccess: (text) => inject("completed", text).pipe(Effect.as(text)), + onFailure: (cause) => { + const text = errorText(Cause.squash(cause)) + return inject("error", text).pipe( + Effect.catchCause(() => Effect.void), + Effect.andThen(Effect.failCause(cause)), + ) + }, }), - Effect.catchCause(() => Effect.void), - Effect.asVoid, ), - ) + }) return { title: params.description, diff --git a/packages/opencode/src/tool/task_status.ts b/packages/opencode/src/tool/task_status.ts index f49994d10c..90240d8220 100644 --- a/packages/opencode/src/tool/task_status.ts +++ b/packages/opencode/src/tool/task_status.ts @@ -6,6 +6,7 @@ import { MessageV2 } from "@/session/message-v2" import { SessionStatus } from "@/session/status" import { PositiveInt } from "@/util/schema" import { Effect, Option, Schema } from "effect" +import { BackgroundJob } from "@/background/job" const DEFAULT_TIMEOUT = 60_000 const POLL_MS = 300 @@ -34,11 +35,31 @@ function errorText(error: NonNullable) { return error.name } +function jobResult(job: BackgroundJob.Info): InspectResult { + if (job.status === "running") { + return { + state: "running", + text: "Task is still running.", + } + } + if (job.status === "completed") { + return { + state: "completed", + text: job.output ?? "", + } + } + return { + state: "error", + text: job.error ?? `Task ${job.status}.`, + } +} + export const TaskStatusTool = Tool.define( "task_status", Effect.gen(function* () { const sessions = yield* Session.Service const status = yield* SessionStatus.Service + const jobs = yield* BackgroundJob.Service const inspect: (taskID: SessionID) => Effect.Effect = Effect.fn("TaskStatusTool.inspect")(function* ( taskID: SessionID, @@ -119,6 +140,30 @@ export const TaskStatusTool = Tool.define( )(function* (params: Schema.Schema.Type, _ctx: Tool.Context) { yield* sessions.get(params.task_id) + const job = yield* jobs.get(params.task_id) + const waitedJob = + job && params.wait === true + ? yield* jobs.wait({ id: params.task_id, timeout: params.timeout_ms ?? DEFAULT_TIMEOUT }) + : { info: job, timedOut: false } + if (waitedJob.info) { + const result = jobResult(waitedJob.info) + return { + title: "Task status", + metadata: { + task_id: params.task_id, + state: result.state, + timed_out: waitedJob.timedOut, + }, + output: format({ + taskID: params.task_id, + state: result.state, + text: waitedJob.timedOut + ? `Timed out after ${params.timeout_ms ?? DEFAULT_TIMEOUT}ms while waiting for task completion.` + : result.text, + }), + } + } + const waited = params.wait === true ? yield* waitForTerminal(params.task_id, params.timeout_ms ?? DEFAULT_TIMEOUT) diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 5330569401..221357dc84 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -41,6 +41,7 @@ import * as Log from "@opencode-ai/core/util/log" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { Ripgrep } from "../../src/file/ripgrep" import { Format } from "../../src/format" +import { BackgroundJob } from "@/background/job" import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" import { reply, TestLLMServer } from "../lib/llm-server" @@ -177,6 +178,7 @@ function makeHttp() { Layer.provide(CrossSpawnSpawner.defaultLayer), Layer.provide(Ripgrep.defaultLayer), Layer.provide(Format.defaultLayer), + Layer.provide(BackgroundJob.defaultLayer), Layer.provideMerge(todo), Layer.provideMerge(question), Layer.provideMerge(deps), diff --git a/packages/opencode/test/session/snapshot-tool-race.test.ts b/packages/opencode/test/session/snapshot-tool-race.test.ts index ab5a3ab7ed..20c2925bae 100644 --- a/packages/opencode/test/session/snapshot-tool-race.test.ts +++ b/packages/opencode/test/session/snapshot-tool-race.test.ts @@ -55,6 +55,7 @@ import { AppFileSystem } from "@opencode-ai/core/filesystem" import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { Ripgrep } from "../../src/file/ripgrep" import { Format } from "../../src/format" +import { BackgroundJob } from "@/background/job" void Log.init({ print: false }) @@ -130,6 +131,7 @@ function makeHttp() { Layer.provide(CrossSpawnSpawner.defaultLayer), Layer.provide(Ripgrep.defaultLayer), Layer.provide(Format.defaultLayer), + Layer.provide(BackgroundJob.defaultLayer), Layer.provideMerge(todo), Layer.provideMerge(question), Layer.provideMerge(deps), diff --git a/packages/opencode/test/tool/task.test.ts b/packages/opencode/test/tool/task.test.ts index 7f80da1dd5..6267d8b506 100644 --- a/packages/opencode/test/tool/task.test.ts +++ b/packages/opencode/test/tool/task.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect } from "bun:test" -import { Effect, Layer } from "effect" +import { Deferred, Effect, Layer } from "effect" import { Agent } from "../../src/agent/agent" import { Bus } from "../../src/bus" import { Config } from "@/config/config" @@ -14,6 +14,7 @@ import { ModelID, ProviderID } from "../../src/provider/schema" import { TaskTool, type TaskPromptOps } from "../../src/tool/task" import { Truncate } from "@/tool/truncate" import { ToolRegistry } from "@/tool/registry" +import { BackgroundJob } from "@/background/job" import { provideTmpdirInstance } from "../fixture/fixture" import { testEffect } from "../lib/effect" @@ -34,6 +35,7 @@ const it = testEffect( CrossSpawnSpawner.defaultLayer, Session.defaultLayer, SessionStatus.defaultLayer, + BackgroundJob.defaultLayer, Truncate.defaultLayer, ToolRegistry.defaultLayer, ), @@ -68,13 +70,17 @@ const seed = Effect.fn("TaskToolTest.seed")(function* (title = "Pinned") { return { chat, assistant } }) -function stubOps(session: Session.Interface, opts?: { onPrompt?: (input: SessionPrompt.PromptInput) => void; text?: string }): TaskPromptOps { +function stubOps( + session: Session.Interface, + opts?: { onPrompt?: (input: SessionPrompt.PromptInput) => void; text?: string; wait?: Effect.Effect }, +): TaskPromptOps { return { cancel() {}, resolvePromptParts: (template) => Effect.succeed([{ type: "text" as const, text: template }]), prompt: (input) => Effect.gen(function* () { opts?.onPrompt?.(input) + if (opts?.wait) yield* opts.wait const userID = input.messageID ?? MessageID.ascending() const user: MessageV2.User = { id: userID, @@ -120,7 +126,6 @@ function stubOps(session: Session.Interface, opts?: { onPrompt?: (input: Session opts?.text ?? "done", ), ), - fork() {}, } } @@ -438,10 +443,11 @@ describe("tool.task", () => { provideTmpdirInstance(() => Effect.gen(function* () { const sessions = yield* Session.Service + const jobs = yield* BackgroundJob.Service const { chat, assistant } = yield* seed() const tool = yield* TaskTool const def = yield* tool.init() - const forks: Effect.Effect[] = [] + const latch = yield* Deferred.make() const result = yield* def.execute( { @@ -456,12 +462,7 @@ describe("tool.task", () => { agent: "build", abort: new AbortController().signal, extra: { - promptOps: { - ...stubOps(sessions), - fork(effect) { - forks.push(effect) - }, - } satisfies TaskPromptOps, + promptOps: stubOps(sessions, { wait: Deferred.await(latch) }), }, messages: [], metadata: () => Effect.void, @@ -473,7 +474,10 @@ describe("tool.task", () => { expect(result.metadata.background).toBe(true) expect(result.output).toContain(`task_id: ${result.metadata.sessionId}`) expect(result.output).toContain("state: running") - expect(forks).toHaveLength(1) + expect((yield* jobs.get(result.metadata.sessionId))?.status).toBe("running") + + yield* Deferred.succeed(latch, undefined) + expect((yield* jobs.wait({ id: result.metadata.sessionId })).info?.status).toBe("completed") }), ), ) @@ -482,10 +486,10 @@ describe("tool.task", () => { provideTmpdirInstance(() => Effect.gen(function* () { const sessions = yield* Session.Service + const jobs = yield* BackgroundJob.Service const { chat, assistant } = yield* seed() const tool = yield* TaskTool const def = yield* tool.init() - const forks: Effect.Effect[] = [] const loops: string[] = [] const result = yield* def.execute( @@ -518,9 +522,6 @@ describe("tool.task", () => { ), ) }, - fork(effect) { - forks.push(effect) - }, } satisfies TaskPromptOps, }, messages: [], @@ -529,7 +530,7 @@ describe("tool.task", () => { }, ) - yield* forks[0]! + expect((yield* jobs.wait({ id: result.metadata.sessionId })).info?.status).toBe("completed") const parent = yield* sessions.findMessage(chat.id, (msg) => msg.info.role === "user") expect(parent._tag).toBe("Some") diff --git a/packages/opencode/test/tool/task_status.test.ts b/packages/opencode/test/tool/task_status.test.ts index 89bf49622d..cffe35cf18 100644 --- a/packages/opencode/test/tool/task_status.test.ts +++ b/packages/opencode/test/tool/task_status.test.ts @@ -10,6 +10,7 @@ import { SessionStatus } from "../../src/session/status" import { TaskStatusTool } from "../../src/tool/task_status" import { Truncate } from "@/tool/truncate" import { ModelID, ProviderID } from "../../src/provider/schema" +import { BackgroundJob } from "@/background/job" import { provideTmpdirInstance } from "../fixture/fixture" import { testEffect } from "../lib/effect" @@ -28,6 +29,7 @@ const it = testEffect( CrossSpawnSpawner.defaultLayer, Session.defaultLayer, SessionStatus.defaultLayer, + BackgroundJob.defaultLayer, Truncate.defaultLayer, ), )