refactor(task): use background jobs

This commit is contained in:
Shoubhit Dash
2026-05-01 19:05:06 +05:30
parent 085fac7c2c
commit 2f919b8bc7
9 changed files with 94 additions and 28 deletions

View File

@@ -50,6 +50,7 @@ import { SessionShare } from "@/share/session"
import { SyncEvent } from "@/sync"
import { Npm } from "@opencode-ai/core/npm"
import { memoMap } from "@opencode-ai/core/effect/memo-map"
import { BackgroundJob } from "@/background/job"
export const AppLayer = Layer.mergeAll(
Npm.defaultLayer,
@@ -75,6 +76,7 @@ export const AppLayer = Layer.mergeAll(
Todo.defaultLayer,
Session.defaultLayer,
SessionStatus.defaultLayer,
BackgroundJob.defaultLayer,
SessionRunState.defaultLayer,
SessionProcessor.defaultLayer,
SessionCompaction.defaultLayer,

View File

@@ -118,9 +118,6 @@ export const layer = Layer.effect(
resolvePromptParts: (template: string) => resolvePromptParts(template),
prompt: (input: PromptInput) => prompt(input),
loop: (input: LoopInput) => loop(input),
fork: (effect: Effect.Effect<void, never, never>) => {
run.fork(effect)
},
} satisfies TaskPromptOps
})

View File

@@ -48,6 +48,7 @@ import { Agent } from "../agent/agent"
import { Skill } from "../skill"
import { Permission } from "@/permission"
import { SessionStatus } from "@/session/status"
import { BackgroundJob } from "@/background/job"
const log = Log.create({ service: "tool.registry" })
@@ -86,6 +87,7 @@ export const layer: Layer.Layer<
| Instruction.Service
| AppFileSystem.Service
| Bus.Service
| BackgroundJob.Service
| HttpClient.HttpClient
| ChildProcessSpawner
| Ripgrep.Service
@@ -343,6 +345,7 @@ export const defaultLayer = Layer.suspend(() =>
Layer.provide(Instruction.defaultLayer),
Layer.provide(AppFileSystem.defaultLayer),
Layer.provide(Bus.layer),
Layer.provide(BackgroundJob.defaultLayer),
Layer.provide(FetchHttpClient.layer),
Layer.provide(Format.defaultLayer),
Layer.provide(CrossSpawnSpawner.defaultLayer),

View File

@@ -10,13 +10,13 @@ import { SessionStatus } from "@/session/status"
import { TuiEvent } from "@/cli/cmd/tui/event"
import { Cause, Effect, Option, Schema } from "effect"
import { Config } from "@/config/config"
import { BackgroundJob } from "@/background/job"
export interface TaskPromptOps {
cancel(sessionID: SessionID): void
resolvePromptParts(template: string): Effect.Effect<SessionPrompt.PromptInput["parts"]>
prompt(input: SessionPrompt.PromptInput): Effect.Effect<MessageV2.WithParts>
loop(input: SessionPrompt.LoopInput): Effect.Effect<MessageV2.WithParts>
fork(effect: Effect.Effect<void, never, never>): void
}
const id = "task"
@@ -80,6 +80,7 @@ export const TaskTool = Tool.define(
const config = yield* Config.Service
const sessions = yield* Session.Service
const status = yield* SessionStatus.Service
const jobs = yield* BackgroundJob.Service
const run = Effect.fn(
"TaskTool.execute",
@@ -236,17 +237,28 @@ export const TaskTool = Tool.define(
yield* continueIfIdle({ userID: message.info.id, state })
})
ops.fork(
runTask().pipe(
yield* jobs.start({
id: nextSession.id,
type: id,
title: params.description,
metadata: {
parentSessionID: ctx.sessionID,
sessionID: nextSession.id,
subagent: next.name,
},
run: runTask().pipe(
Effect.matchCauseEffect({
onSuccess: (text) => inject("completed", text),
onFailure: (cause) =>
inject("error", errorText(Cause.squash(cause))).pipe(Effect.catchCause(() => Effect.void)),
onSuccess: (text) => inject("completed", text).pipe(Effect.as(text)),
onFailure: (cause) => {
const text = errorText(Cause.squash(cause))
return inject("error", text).pipe(
Effect.catchCause(() => Effect.void),
Effect.andThen(Effect.failCause(cause)),
)
},
}),
Effect.catchCause(() => Effect.void),
Effect.asVoid,
),
)
})
return {
title: params.description,

View File

@@ -6,6 +6,7 @@ import { MessageV2 } from "@/session/message-v2"
import { SessionStatus } from "@/session/status"
import { PositiveInt } from "@/util/schema"
import { Effect, Option, Schema } from "effect"
import { BackgroundJob } from "@/background/job"
const DEFAULT_TIMEOUT = 60_000
const POLL_MS = 300
@@ -34,11 +35,31 @@ function errorText(error: NonNullable<MessageV2.Assistant["error"]>) {
return error.name
}
function jobResult(job: BackgroundJob.Info): InspectResult {
if (job.status === "running") {
return {
state: "running",
text: "Task is still running.",
}
}
if (job.status === "completed") {
return {
state: "completed",
text: job.output ?? "",
}
}
return {
state: "error",
text: job.error ?? `Task ${job.status}.`,
}
}
export const TaskStatusTool = Tool.define(
"task_status",
Effect.gen(function* () {
const sessions = yield* Session.Service
const status = yield* SessionStatus.Service
const jobs = yield* BackgroundJob.Service
const inspect: (taskID: SessionID) => Effect.Effect<InspectResult> = Effect.fn("TaskStatusTool.inspect")(function* (
taskID: SessionID,
@@ -119,6 +140,30 @@ export const TaskStatusTool = Tool.define(
)(function* (params: Schema.Schema.Type<typeof Parameters>, _ctx: Tool.Context) {
yield* sessions.get(params.task_id)
const job = yield* jobs.get(params.task_id)
const waitedJob =
job && params.wait === true
? yield* jobs.wait({ id: params.task_id, timeout: params.timeout_ms ?? DEFAULT_TIMEOUT })
: { info: job, timedOut: false }
if (waitedJob.info) {
const result = jobResult(waitedJob.info)
return {
title: "Task status",
metadata: {
task_id: params.task_id,
state: result.state,
timed_out: waitedJob.timedOut,
},
output: format({
taskID: params.task_id,
state: result.state,
text: waitedJob.timedOut
? `Timed out after ${params.timeout_ms ?? DEFAULT_TIMEOUT}ms while waiting for task completion.`
: result.text,
}),
}
}
const waited =
params.wait === true
? yield* waitForTerminal(params.task_id, params.timeout_ms ?? DEFAULT_TIMEOUT)

View File

@@ -41,6 +41,7 @@ import * as Log from "@opencode-ai/core/util/log"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Ripgrep } from "../../src/file/ripgrep"
import { Format } from "../../src/format"
import { BackgroundJob } from "@/background/job"
import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { reply, TestLLMServer } from "../lib/llm-server"
@@ -177,6 +178,7 @@ function makeHttp() {
Layer.provide(CrossSpawnSpawner.defaultLayer),
Layer.provide(Ripgrep.defaultLayer),
Layer.provide(Format.defaultLayer),
Layer.provide(BackgroundJob.defaultLayer),
Layer.provideMerge(todo),
Layer.provideMerge(question),
Layer.provideMerge(deps),

View File

@@ -55,6 +55,7 @@ import { AppFileSystem } from "@opencode-ai/core/filesystem"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Ripgrep } from "../../src/file/ripgrep"
import { Format } from "../../src/format"
import { BackgroundJob } from "@/background/job"
void Log.init({ print: false })
@@ -130,6 +131,7 @@ function makeHttp() {
Layer.provide(CrossSpawnSpawner.defaultLayer),
Layer.provide(Ripgrep.defaultLayer),
Layer.provide(Format.defaultLayer),
Layer.provide(BackgroundJob.defaultLayer),
Layer.provideMerge(todo),
Layer.provideMerge(question),
Layer.provideMerge(deps),

View File

@@ -1,5 +1,5 @@
import { afterEach, describe, expect } from "bun:test"
import { Effect, Layer } from "effect"
import { Deferred, Effect, Layer } from "effect"
import { Agent } from "../../src/agent/agent"
import { Bus } from "../../src/bus"
import { Config } from "@/config/config"
@@ -14,6 +14,7 @@ import { ModelID, ProviderID } from "../../src/provider/schema"
import { TaskTool, type TaskPromptOps } from "../../src/tool/task"
import { Truncate } from "@/tool/truncate"
import { ToolRegistry } from "@/tool/registry"
import { BackgroundJob } from "@/background/job"
import { provideTmpdirInstance } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
@@ -34,6 +35,7 @@ const it = testEffect(
CrossSpawnSpawner.defaultLayer,
Session.defaultLayer,
SessionStatus.defaultLayer,
BackgroundJob.defaultLayer,
Truncate.defaultLayer,
ToolRegistry.defaultLayer,
),
@@ -68,13 +70,17 @@ const seed = Effect.fn("TaskToolTest.seed")(function* (title = "Pinned") {
return { chat, assistant }
})
function stubOps(session: Session.Interface, opts?: { onPrompt?: (input: SessionPrompt.PromptInput) => void; text?: string }): TaskPromptOps {
function stubOps(
session: Session.Interface,
opts?: { onPrompt?: (input: SessionPrompt.PromptInput) => void; text?: string; wait?: Effect.Effect<void> },
): TaskPromptOps {
return {
cancel() {},
resolvePromptParts: (template) => Effect.succeed([{ type: "text" as const, text: template }]),
prompt: (input) =>
Effect.gen(function* () {
opts?.onPrompt?.(input)
if (opts?.wait) yield* opts.wait
const userID = input.messageID ?? MessageID.ascending()
const user: MessageV2.User = {
id: userID,
@@ -120,7 +126,6 @@ function stubOps(session: Session.Interface, opts?: { onPrompt?: (input: Session
opts?.text ?? "done",
),
),
fork() {},
}
}
@@ -438,10 +443,11 @@ describe("tool.task", () => {
provideTmpdirInstance(() =>
Effect.gen(function* () {
const sessions = yield* Session.Service
const jobs = yield* BackgroundJob.Service
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
const forks: Effect.Effect<void, never, never>[] = []
const latch = yield* Deferred.make<void>()
const result = yield* def.execute(
{
@@ -456,12 +462,7 @@ describe("tool.task", () => {
agent: "build",
abort: new AbortController().signal,
extra: {
promptOps: {
...stubOps(sessions),
fork(effect) {
forks.push(effect)
},
} satisfies TaskPromptOps,
promptOps: stubOps(sessions, { wait: Deferred.await(latch) }),
},
messages: [],
metadata: () => Effect.void,
@@ -473,7 +474,10 @@ describe("tool.task", () => {
expect(result.metadata.background).toBe(true)
expect(result.output).toContain(`task_id: ${result.metadata.sessionId}`)
expect(result.output).toContain("state: running")
expect(forks).toHaveLength(1)
expect((yield* jobs.get(result.metadata.sessionId))?.status).toBe("running")
yield* Deferred.succeed(latch, undefined)
expect((yield* jobs.wait({ id: result.metadata.sessionId })).info?.status).toBe("completed")
}),
),
)
@@ -482,10 +486,10 @@ describe("tool.task", () => {
provideTmpdirInstance(() =>
Effect.gen(function* () {
const sessions = yield* Session.Service
const jobs = yield* BackgroundJob.Service
const { chat, assistant } = yield* seed()
const tool = yield* TaskTool
const def = yield* tool.init()
const forks: Effect.Effect<void, never, never>[] = []
const loops: string[] = []
const result = yield* def.execute(
@@ -518,9 +522,6 @@ describe("tool.task", () => {
),
)
},
fork(effect) {
forks.push(effect)
},
} satisfies TaskPromptOps,
},
messages: [],
@@ -529,7 +530,7 @@ describe("tool.task", () => {
},
)
yield* forks[0]!
expect((yield* jobs.wait({ id: result.metadata.sessionId })).info?.status).toBe("completed")
const parent = yield* sessions.findMessage(chat.id, (msg) => msg.info.role === "user")
expect(parent._tag).toBe("Some")

View File

@@ -10,6 +10,7 @@ import { SessionStatus } from "../../src/session/status"
import { TaskStatusTool } from "../../src/tool/task_status"
import { Truncate } from "@/tool/truncate"
import { ModelID, ProviderID } from "../../src/provider/schema"
import { BackgroundJob } from "@/background/job"
import { provideTmpdirInstance } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
@@ -28,6 +29,7 @@ const it = testEffect(
CrossSpawnSpawner.defaultLayer,
Session.defaultLayer,
SessionStatus.defaultLayer,
BackgroundJob.defaultLayer,
Truncate.defaultLayer,
),
)