fix(task): handle running background resumes

This commit is contained in:
Shoubhit Dash
2026-05-04 20:42:32 +05:30
parent 227b8c668b
commit 45360e5e0b
2 changed files with 190 additions and 17 deletions

View File

@@ -8,7 +8,7 @@ import { Agent } from "../agent/agent"
import type { SessionPrompt } from "../session/prompt"
import { SessionStatus } from "@/session/status"
import { TuiEvent } from "@/cli/cmd/tui/event"
import { Cause, Effect, Option, Schema } from "effect"
import { Cause, Effect, Option, Schema, Scope, Stream } from "effect"
import { Config } from "@/config/config"
import { BackgroundJob } from "@/background/job"
import { Flag } from "@opencode-ai/core/flag/flag"
@@ -82,6 +82,7 @@ export const TaskTool = Tool.define(
const sessions = yield* Session.Service
const status = yield* SessionStatus.Service
const jobs = yield* BackgroundJob.Service
const scope = yield* Scope.Scope
const run = Effect.fn(
"TaskTool.execute",
@@ -163,6 +164,9 @@ export const TaskTool = Tool.define(
if (background && !Flag.OPENCODE_EXPERIMENTAL) {
return yield* Effect.fail(new Error("Background tasks require OPENCODE_EXPERIMENTAL=true"))
}
if ((yield* jobs.get(nextSession.id))?.status === "running") {
return yield* Effect.fail(new Error(`Task ${nextSession.id} is already running`))
}
const metadata = {
sessionId: nextSession.id,
@@ -198,11 +202,21 @@ export const TaskTool = Tool.define(
return result.parts.findLast((item) => item.type === "text")?.text ?? ""
})
const continueIfIdle = Effect.fn("TaskTool.continueIfIdle")(function* (input: {
const resumeParent: (input: {
userID: MessageID
state: "completed" | "error"
}) {
if ((yield* status.get(ctx.sessionID)).type !== "idle") return
attempts?: number
}) => Effect.Effect<void> = Effect.fn("TaskTool.resumeParent")(function* (input) {
if ((yield* status.get(ctx.sessionID)).type !== "idle") {
if ((input.attempts ?? 0) >= 60) return
yield* bus.subscribe(SessionStatus.Event.Idle).pipe(
Stream.filter((event) => event.properties.sessionID === ctx.sessionID),
Stream.take(1),
Stream.runDrain,
Effect.timeoutOption("1 second"),
)
return yield* resumeParent({ ...input, attempts: (input.attempts ?? 0) + 1 })
}
const latest = yield* sessions.findMessage(ctx.sessionID, (item) => item.info.role === "user")
if (Option.isNone(latest)) return
if (latest.value.info.id !== input.userID) return
@@ -238,7 +252,7 @@ export const TaskTool = Tool.define(
},
],
})
yield* continueIfIdle({ userID: message.info.id, state })
yield* resumeParent({ userID: message.info.id, state }).pipe(Effect.ignore, Effect.forkIn(scope))
})
yield* jobs.start({

View File

@@ -1,5 +1,5 @@
import { afterEach, describe, expect } from "bun:test"
import { Deferred, Effect, Layer } from "effect"
import { Cause, Deferred, Effect, Exit, Layer } from "effect"
import { Agent } from "../../src/agent/agent"
import { Bus } from "../../src/bus"
import { Config } from "@/config/config"
@@ -497,6 +497,82 @@ describe("tool.task", () => {
const tool = yield* TaskTool
const def = yield* tool.init()
const loops: string[] = []
const resumed = yield* Deferred.make<void>()
const result = yield* def.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
background: true,
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: {
promptOps: {
...stubOps(sessions, { text: "background done" }),
loop(input) {
loops.push(input.sessionID)
return Deferred.succeed(resumed, undefined).pipe(
Effect.andThen(
Effect.sync(() =>
reply(
{
sessionID: input.sessionID,
messageID: MessageID.ascending(),
agent: "build",
model: ref,
parts: [],
},
"looped",
),
),
),
)
},
} satisfies TaskPromptOps,
},
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
expect((yield* jobs.wait({ id: result.metadata.sessionId })).info?.status).toBe("completed")
yield* Deferred.await(resumed).pipe(Effect.timeout("1 second"))
const parent = yield* sessions.findMessage(chat.id, (msg) => msg.info.role === "user")
expect(parent._tag).toBe("Some")
if (parent._tag !== "Some") return
expect(parent.value.parts.find((part) => part.type === "text")?.text).toContain("Background task completed")
expect(parent.value.parts.find((part) => part.type === "text")?.text).toContain("background done")
expect(loops).toEqual([chat.id])
const child = yield* sessions.findMessage(result.metadata.sessionId, (msg) => msg.info.role === "assistant")
expect(child._tag).toBe("Some")
if (child._tag !== "Some") return
expect(child.value.parts.find((part) => part.type === "text")?.text).toBe("background done")
}),
),
)
it.live("background task resumes parent after it becomes idle", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
Flag.OPENCODE_EXPERIMENTAL = true
const sessions = yield* Session.Service
const status = yield* SessionStatus.Service
const jobs = yield* BackgroundJob.Service
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
const loops: string[] = []
const resumed = yield* Deferred.make<void>()
yield* status.set(chat.id, { type: "busy" })
const result = yield* def.execute(
{
@@ -526,7 +602,7 @@ describe("tool.task", () => {
},
"looped",
),
)
).pipe(Effect.tap(() => Deferred.succeed(resumed, undefined)))
},
} satisfies TaskPromptOps,
},
@@ -537,18 +613,101 @@ describe("tool.task", () => {
)
expect((yield* jobs.wait({ id: result.metadata.sessionId })).info?.status).toBe("completed")
const parent = yield* sessions.findMessage(chat.id, (msg) => msg.info.role === "user")
expect(parent._tag).toBe("Some")
if (parent._tag !== "Some") return
expect(parent.value.parts.find((part) => part.type === "text")?.text).toContain("Background task completed")
expect(parent.value.parts.find((part) => part.type === "text")?.text).toContain("background done")
expect(loops).toEqual([])
yield* status.set(chat.id, { type: "idle" })
yield* Deferred.await(resumed).pipe(Effect.timeout("1 second"))
expect(loops).toEqual([chat.id])
}),
),
)
const child = yield* sessions.findMessage(result.metadata.sessionId, (msg) => msg.info.role === "assistant")
expect(child._tag).toBe("Some")
if (child._tag !== "Some") return
expect(child.value.parts.find((part) => part.type === "text")?.text).toBe("background done")
it.live("background resume fails while task is already running", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
Flag.OPENCODE_EXPERIMENTAL = true
const sessions = yield* Session.Service
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
const latch = yield* Deferred.make<void>()
const result = yield* def.execute(
{
description: "inspect bug",
prompt: "look into the cache key path",
subagent_type: "general",
background: true,
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: {
promptOps: stubOps(sessions, { wait: Deferred.await(latch) }),
},
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
const exit = yield* def
.execute(
{
description: "inspect bug again",
prompt: "second prompt",
subagent_type: "general",
task_id: result.metadata.sessionId,
background: true,
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: { promptOps: stubOps(sessions) },
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
.pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
const error = Cause.squash(exit.cause)
expect(error instanceof Error ? error.message : String(error)).toContain("is already running")
}
const foregroundExit = yield* def
.execute(
{
description: "inspect bug again",
prompt: "second prompt",
subagent_type: "general",
task_id: result.metadata.sessionId,
},
{
sessionID: chat.id,
messageID: assistant.id,
agent: "build",
abort: new AbortController().signal,
extra: { promptOps: stubOps(sessions) },
messages: [],
metadata: () => Effect.void,
ask: () => Effect.void,
},
)
.pipe(Effect.exit)
expect(Exit.isFailure(foregroundExit)).toBe(true)
if (Exit.isFailure(foregroundExit)) {
const error = Cause.squash(foregroundExit.cause)
expect(error instanceof Error ? error.message : String(error)).toContain("is already running")
}
yield* Deferred.succeed(latch, undefined)
}),
),
)