From 45360e5e0bba73b40b4c982df778d60902cf4576 Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Mon, 4 May 2026 20:42:32 +0530 Subject: [PATCH] fix(task): handle running background resumes --- packages/opencode/src/tool/task.ts | 24 ++- packages/opencode/test/tool/task.test.ts | 183 +++++++++++++++++++++-- 2 files changed, 190 insertions(+), 17 deletions(-) diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index 43c02107dd..17eaa5e86e 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -8,7 +8,7 @@ import { Agent } from "../agent/agent" import type { SessionPrompt } from "../session/prompt" import { SessionStatus } from "@/session/status" import { TuiEvent } from "@/cli/cmd/tui/event" -import { Cause, Effect, Option, Schema } from "effect" +import { Cause, Effect, Option, Schema, Scope, Stream } from "effect" import { Config } from "@/config/config" import { BackgroundJob } from "@/background/job" import { Flag } from "@opencode-ai/core/flag/flag" @@ -82,6 +82,7 @@ export const TaskTool = Tool.define( const sessions = yield* Session.Service const status = yield* SessionStatus.Service const jobs = yield* BackgroundJob.Service + const scope = yield* Scope.Scope const run = Effect.fn( "TaskTool.execute", @@ -163,6 +164,9 @@ export const TaskTool = Tool.define( if (background && !Flag.OPENCODE_EXPERIMENTAL) { return yield* Effect.fail(new Error("Background tasks require OPENCODE_EXPERIMENTAL=true")) } + if ((yield* jobs.get(nextSession.id))?.status === "running") { + return yield* Effect.fail(new Error(`Task ${nextSession.id} is already running`)) + } const metadata = { sessionId: nextSession.id, @@ -198,11 +202,21 @@ export const TaskTool = Tool.define( return result.parts.findLast((item) => item.type === "text")?.text ?? "" }) - const continueIfIdle = Effect.fn("TaskTool.continueIfIdle")(function* (input: { + const resumeParent: (input: { userID: MessageID state: "completed" | "error" - }) { - if ((yield* status.get(ctx.sessionID)).type !== "idle") return + attempts?: number + }) => Effect.Effect = Effect.fn("TaskTool.resumeParent")(function* (input) { + if ((yield* status.get(ctx.sessionID)).type !== "idle") { + if ((input.attempts ?? 0) >= 60) return + yield* bus.subscribe(SessionStatus.Event.Idle).pipe( + Stream.filter((event) => event.properties.sessionID === ctx.sessionID), + Stream.take(1), + Stream.runDrain, + Effect.timeoutOption("1 second"), + ) + return yield* resumeParent({ ...input, attempts: (input.attempts ?? 0) + 1 }) + } const latest = yield* sessions.findMessage(ctx.sessionID, (item) => item.info.role === "user") if (Option.isNone(latest)) return if (latest.value.info.id !== input.userID) return @@ -238,7 +252,7 @@ export const TaskTool = Tool.define( }, ], }) - yield* continueIfIdle({ userID: message.info.id, state }) + yield* resumeParent({ userID: message.info.id, state }).pipe(Effect.ignore, Effect.forkIn(scope)) }) yield* jobs.start({ diff --git a/packages/opencode/test/tool/task.test.ts b/packages/opencode/test/tool/task.test.ts index 63ccc78bd1..5209ea2e6c 100644 --- a/packages/opencode/test/tool/task.test.ts +++ b/packages/opencode/test/tool/task.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect } from "bun:test" -import { Deferred, Effect, Layer } from "effect" +import { Cause, Deferred, Effect, Exit, Layer } from "effect" import { Agent } from "../../src/agent/agent" import { Bus } from "../../src/bus" import { Config } from "@/config/config" @@ -497,6 +497,82 @@ describe("tool.task", () => { const tool = yield* TaskTool const def = yield* tool.init() const loops: string[] = [] + const resumed = yield* Deferred.make() + + const result = yield* def.execute( + { + description: "inspect bug", + prompt: "look into the cache key path", + subagent_type: "general", + background: true, + }, + { + sessionID: chat.id, + messageID: assistant.id, + agent: "build", + abort: new AbortController().signal, + extra: { + promptOps: { + ...stubOps(sessions, { text: "background done" }), + loop(input) { + loops.push(input.sessionID) + return Deferred.succeed(resumed, undefined).pipe( + Effect.andThen( + Effect.sync(() => + reply( + { + sessionID: input.sessionID, + messageID: MessageID.ascending(), + agent: "build", + model: ref, + parts: [], + }, + "looped", + ), + ), + ), + ) + }, + } satisfies TaskPromptOps, + }, + messages: [], + metadata: () => Effect.void, + ask: () => Effect.void, + }, + ) + + expect((yield* jobs.wait({ id: result.metadata.sessionId })).info?.status).toBe("completed") + yield* Deferred.await(resumed).pipe(Effect.timeout("1 second")) + + const parent = yield* sessions.findMessage(chat.id, (msg) => msg.info.role === "user") + expect(parent._tag).toBe("Some") + if (parent._tag !== "Some") return + expect(parent.value.parts.find((part) => part.type === "text")?.text).toContain("Background task completed") + expect(parent.value.parts.find((part) => part.type === "text")?.text).toContain("background done") + expect(loops).toEqual([chat.id]) + + const child = yield* sessions.findMessage(result.metadata.sessionId, (msg) => msg.info.role === "assistant") + expect(child._tag).toBe("Some") + if (child._tag !== "Some") return + expect(child.value.parts.find((part) => part.type === "text")?.text).toBe("background done") + }), + ), + ) + + it.live("background task resumes parent after it becomes idle", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + Flag.OPENCODE_EXPERIMENTAL = true + const sessions = yield* Session.Service + const status = yield* SessionStatus.Service + const jobs = yield* BackgroundJob.Service + const { chat, assistant } = yield* seed() + const tool = yield* TaskTool + const def = yield* tool.init() + const loops: string[] = [] + const resumed = yield* Deferred.make() + + yield* status.set(chat.id, { type: "busy" }) const result = yield* def.execute( { @@ -526,7 +602,7 @@ describe("tool.task", () => { }, "looped", ), - ) + ).pipe(Effect.tap(() => Deferred.succeed(resumed, undefined))) }, } satisfies TaskPromptOps, }, @@ -537,18 +613,101 @@ describe("tool.task", () => { ) expect((yield* jobs.wait({ id: result.metadata.sessionId })).info?.status).toBe("completed") - - const parent = yield* sessions.findMessage(chat.id, (msg) => msg.info.role === "user") - expect(parent._tag).toBe("Some") - if (parent._tag !== "Some") return - expect(parent.value.parts.find((part) => part.type === "text")?.text).toContain("Background task completed") - expect(parent.value.parts.find((part) => part.type === "text")?.text).toContain("background done") + expect(loops).toEqual([]) + yield* status.set(chat.id, { type: "idle" }) + yield* Deferred.await(resumed).pipe(Effect.timeout("1 second")) expect(loops).toEqual([chat.id]) + }), + ), + ) - const child = yield* sessions.findMessage(result.metadata.sessionId, (msg) => msg.info.role === "assistant") - expect(child._tag).toBe("Some") - if (child._tag !== "Some") return - expect(child.value.parts.find((part) => part.type === "text")?.text).toBe("background done") + it.live("background resume fails while task is already running", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + Flag.OPENCODE_EXPERIMENTAL = true + const sessions = yield* Session.Service + const { chat, assistant } = yield* seed() + const tool = yield* TaskTool + const def = yield* tool.init() + const latch = yield* Deferred.make() + + const result = yield* def.execute( + { + description: "inspect bug", + prompt: "look into the cache key path", + subagent_type: "general", + background: true, + }, + { + sessionID: chat.id, + messageID: assistant.id, + agent: "build", + abort: new AbortController().signal, + extra: { + promptOps: stubOps(sessions, { wait: Deferred.await(latch) }), + }, + messages: [], + metadata: () => Effect.void, + ask: () => Effect.void, + }, + ) + + const exit = yield* def + .execute( + { + description: "inspect bug again", + prompt: "second prompt", + subagent_type: "general", + task_id: result.metadata.sessionId, + background: true, + }, + { + sessionID: chat.id, + messageID: assistant.id, + agent: "build", + abort: new AbortController().signal, + extra: { promptOps: stubOps(sessions) }, + messages: [], + metadata: () => Effect.void, + ask: () => Effect.void, + }, + ) + .pipe(Effect.exit) + + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + const error = Cause.squash(exit.cause) + expect(error instanceof Error ? error.message : String(error)).toContain("is already running") + } + + const foregroundExit = yield* def + .execute( + { + description: "inspect bug again", + prompt: "second prompt", + subagent_type: "general", + task_id: result.metadata.sessionId, + }, + { + sessionID: chat.id, + messageID: assistant.id, + agent: "build", + abort: new AbortController().signal, + extra: { promptOps: stubOps(sessions) }, + messages: [], + metadata: () => Effect.void, + ask: () => Effect.void, + }, + ) + .pipe(Effect.exit) + + expect(Exit.isFailure(foregroundExit)).toBe(true) + if (Exit.isFailure(foregroundExit)) { + const error = Cause.squash(foregroundExit.cause) + expect(error instanceof Error ? error.message : String(error)).toContain("is already running") + } + + yield* Deferred.succeed(latch, undefined) }), ), )