fix(session): cancel background agents

This commit is contained in:
Shoubhit Dash
2026-05-12 20:51:31 +05:30
parent 0e214c1a0c
commit ec87f26085
4 changed files with 64 additions and 2 deletions

View File

@@ -1,5 +1,6 @@
import { InstanceState } from "@/effect/instance-state"
import { Runner } from "@/effect/runner"
import { BackgroundJob } from "@/background/job"
import { Effect, Latch, Layer, Scope, Context } from "effect"
import * as Session from "./session"
import { MessageV2 } from "./message-v2"
@@ -27,6 +28,7 @@ export class Service extends Context.Service<Service, Interface>()("@opencode/Se
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const background = yield* BackgroundJob.Service
const status = yield* SessionStatus.Service
const state = yield* InstanceState.make(
@@ -75,6 +77,7 @@ export const layer = Layer.effect(
})
const cancel = Effect.fn("SessionRunState.cancel")(function* (sessionID: SessionID) {
yield* cancelBackgroundJobs(background, sessionID)
const data = yield* InstanceState.get(state)
const existing = data.runners.get(sessionID)
if (!existing || !existing.busy) {
@@ -105,6 +108,40 @@ export const layer = Layer.effect(
}),
)
export const defaultLayer = layer.pipe(Layer.provide(SessionStatus.defaultLayer))
export const defaultLayer = layer.pipe(Layer.provide(BackgroundJob.defaultLayer), Layer.provide(SessionStatus.defaultLayer))
const cancelBackgroundJobs = Effect.fn("SessionRunState.cancelBackgroundJobs")(function* (
background: BackgroundJob.Interface,
sessionID: SessionID,
) {
const jobs = yield* background.list()
const pending = new Set<string>([sessionID])
const cancelled = new Set<string>()
const matches = (job: BackgroundJob.Info) => {
if (job.status !== "running") return false
if (cancelled.has(job.id)) return false
if (pending.has(job.id)) return true
if (typeof job.metadata?.sessionId === "string" && pending.has(job.metadata.sessionId)) return true
return typeof job.metadata?.parentSessionId === "string" && pending.has(job.metadata.parentSessionId)
}
let batch = jobs.filter(matches)
while (batch.length > 0) {
yield* Effect.forEach(
batch,
(job) =>
background.cancel(job.id).pipe(
Effect.tap(() =>
Effect.sync(() => {
cancelled.add(job.id)
pending.add(job.id)
if (typeof job.metadata?.sessionId === "string") pending.add(job.metadata.sessionId)
}),
),
),
{ concurrency: "unbounded", discard: true },
)
batch = jobs.filter(matches)
}
})
export * as SessionRunState from "./run-state"

View File

@@ -1,5 +1,6 @@
import { Slug } from "@opencode-ai/core/util/slug"
import path from "path"
import { BackgroundJob } from "@/background/job"
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { Decimal } from "decimal.js"
@@ -503,9 +504,10 @@ export type Patch = Types.DeepMutable<SyncEvent.Event<typeof Event.Updated>["dat
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
export const layer: Layer.Layer<Service, never, BackgroundJob.Service | Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const background = yield* BackgroundJob.Service
const bus = yield* Bus.Service
const storage = yield* Storage.Service
const sync = yield* SyncEvent.Service
@@ -583,6 +585,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
const remove: Interface["remove"] = Effect.fnUntraced(function* (sessionID: SessionID) {
const session = yield* get(sessionID)
try {
yield* cancelBackgroundJobs(background, sessionID)
const kids = yield* children(sessionID)
for (const child of kids) {
yield* remove(child.id)
@@ -833,11 +836,29 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service |
)
export const defaultLayer = layer.pipe(
Layer.provide(BackgroundJob.defaultLayer),
Layer.provide(Bus.layer),
Layer.provide(Storage.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
)
const cancelBackgroundJobs = Effect.fn("Session.cancelBackgroundJobs")(function* (
background: BackgroundJob.Interface,
sessionID: SessionID,
) {
const jobs = yield* background.list()
yield* Effect.forEach(
jobs.filter((job) => {
if (job.status !== "running") return false
if (job.id === sessionID) return true
if (job.metadata?.sessionId === sessionID) return true
return job.metadata?.parentSessionId === sessionID
}),
(job) => background.cancel(job.id),
{ concurrency: "unbounded", discard: true },
)
})
function* listByProject(
input: ListInput & {
projectID: ProjectID

View File

@@ -7,6 +7,7 @@ import path from "path"
import { fileURLToPath, pathToFileURL } from "url"
import { NamedError } from "@opencode-ai/core/util/error"
import { Agent as AgentSvc } from "../../src/agent/agent"
import { BackgroundJob } from "@/background/job"
import { Bus } from "../../src/bus"
import { Command } from "../../src/command"
import { Config } from "@/config/config"
@@ -175,6 +176,7 @@ function makeHttp() {
lsp,
mcp,
AppFileSystem.defaultLayer,
BackgroundJob.defaultLayer,
status,
SyncEvent.defaultLayer,
).pipe(Layer.provideMerge(infra))

View File

@@ -30,6 +30,7 @@ import { TestLLMServer } from "../lib/llm-server"
// Same layer setup as prompt-effect.test.ts
import { NodeFileSystem } from "@effect/platform-node"
import { Agent as AgentSvc } from "../../src/agent/agent"
import { BackgroundJob } from "@/background/job"
import { Git } from "../../src/git"
import { Bus } from "../../src/bus"
import { Command } from "../../src/command"
@@ -124,6 +125,7 @@ function makeHttp() {
lsp,
mcp,
AppFileSystem.defaultLayer,
BackgroundJob.defaultLayer,
status,
SyncEvent.defaultLayer,
).pipe(Layer.provideMerge(infra))