diff --git a/packages/opencode/src/session/run-state.ts b/packages/opencode/src/session/run-state.ts index 9d4986f174..2983521a9e 100644 --- a/packages/opencode/src/session/run-state.ts +++ b/packages/opencode/src/session/run-state.ts @@ -1,5 +1,6 @@ import { InstanceState } from "@/effect/instance-state" import { Runner } from "@/effect/runner" +import { BackgroundJob } from "@/background/job" import { Effect, Latch, Layer, Scope, Context } from "effect" import * as Session from "./session" import { MessageV2 } from "./message-v2" @@ -27,6 +28,7 @@ export class Service extends Context.Service()("@opencode/Se export const layer = Layer.effect( Service, Effect.gen(function* () { + const background = yield* BackgroundJob.Service const status = yield* SessionStatus.Service const state = yield* InstanceState.make( @@ -75,6 +77,7 @@ export const layer = Layer.effect( }) const cancel = Effect.fn("SessionRunState.cancel")(function* (sessionID: SessionID) { + yield* cancelBackgroundJobs(background, sessionID) const data = yield* InstanceState.get(state) const existing = data.runners.get(sessionID) if (!existing || !existing.busy) { @@ -105,6 +108,40 @@ export const layer = Layer.effect( }), ) -export const defaultLayer = layer.pipe(Layer.provide(SessionStatus.defaultLayer)) +export const defaultLayer = layer.pipe(Layer.provide(BackgroundJob.defaultLayer), Layer.provide(SessionStatus.defaultLayer)) + +const cancelBackgroundJobs = Effect.fn("SessionRunState.cancelBackgroundJobs")(function* ( + background: BackgroundJob.Interface, + sessionID: SessionID, +) { + const jobs = yield* background.list() + const pending = new Set([sessionID]) + const cancelled = new Set() + const matches = (job: BackgroundJob.Info) => { + if (job.status !== "running") return false + if (cancelled.has(job.id)) return false + if (pending.has(job.id)) return true + if (typeof job.metadata?.sessionId === "string" && pending.has(job.metadata.sessionId)) return true + return typeof job.metadata?.parentSessionId === "string" && pending.has(job.metadata.parentSessionId) + } + let batch = jobs.filter(matches) + while (batch.length > 0) { + yield* Effect.forEach( + batch, + (job) => + background.cancel(job.id).pipe( + Effect.tap(() => + Effect.sync(() => { + cancelled.add(job.id) + pending.add(job.id) + if (typeof job.metadata?.sessionId === "string") pending.add(job.metadata.sessionId) + }), + ), + ), + { concurrency: "unbounded", discard: true }, + ) + batch = jobs.filter(matches) + } +}) export * as SessionRunState from "./run-state" diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts index eff027579a..2a6a9dc65c 100644 --- a/packages/opencode/src/session/session.ts +++ b/packages/opencode/src/session/session.ts @@ -1,5 +1,6 @@ import { Slug } from "@opencode-ai/core/util/slug" import path from "path" +import { BackgroundJob } from "@/background/job" import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" import { Decimal } from "decimal.js" @@ -503,9 +504,10 @@ export type Patch = Types.DeepMutable["dat const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => Effect.sync(() => Database.use(fn)) -export const layer: Layer.Layer = Layer.effect( +export const layer: Layer.Layer = Layer.effect( Service, Effect.gen(function* () { + const background = yield* BackgroundJob.Service const bus = yield* Bus.Service const storage = yield* Storage.Service const sync = yield* SyncEvent.Service @@ -583,6 +585,7 @@ export const layer: Layer.Layer { + if (job.status !== "running") return false + if (job.id === sessionID) return true + if (job.metadata?.sessionId === sessionID) return true + return job.metadata?.parentSessionId === sessionID + }), + (job) => background.cancel(job.id), + { concurrency: "unbounded", discard: true }, + ) +}) + function* listByProject( input: ListInput & { projectID: ProjectID diff --git a/packages/opencode/test/session/prompt.test.ts b/packages/opencode/test/session/prompt.test.ts index 3821954945..07615ca5d6 100644 --- a/packages/opencode/test/session/prompt.test.ts +++ b/packages/opencode/test/session/prompt.test.ts @@ -7,6 +7,7 @@ import path from "path" import { fileURLToPath, pathToFileURL } from "url" import { NamedError } from "@opencode-ai/core/util/error" import { Agent as AgentSvc } from "../../src/agent/agent" +import { BackgroundJob } from "@/background/job" import { Bus } from "../../src/bus" import { Command } from "../../src/command" import { Config } from "@/config/config" @@ -175,6 +176,7 @@ function makeHttp() { lsp, mcp, AppFileSystem.defaultLayer, + BackgroundJob.defaultLayer, status, SyncEvent.defaultLayer, ).pipe(Layer.provideMerge(infra)) diff --git a/packages/opencode/test/session/snapshot-tool-race.test.ts b/packages/opencode/test/session/snapshot-tool-race.test.ts index 8640612e98..292f821c42 100644 --- a/packages/opencode/test/session/snapshot-tool-race.test.ts +++ b/packages/opencode/test/session/snapshot-tool-race.test.ts @@ -30,6 +30,7 @@ import { TestLLMServer } from "../lib/llm-server" // Same layer setup as prompt-effect.test.ts import { NodeFileSystem } from "@effect/platform-node" import { Agent as AgentSvc } from "../../src/agent/agent" +import { BackgroundJob } from "@/background/job" import { Git } from "../../src/git" import { Bus } from "../../src/bus" import { Command } from "../../src/command" @@ -124,6 +125,7 @@ function makeHttp() { lsp, mcp, AppFileSystem.defaultLayer, + BackgroundJob.defaultLayer, status, SyncEvent.defaultLayer, ).pipe(Layer.provideMerge(infra))