From 085fac7c2c41ce773a4a2690e05d015782ca250d Mon Sep 17 00:00:00 2001 From: Shoubhit Dash Date: Fri, 1 May 2026 19:04:56 +0530 Subject: [PATCH] feat(background): add job service --- packages/opencode/src/background/job.ts | 173 ++++++++++++++++++ packages/opencode/src/id/id.ts | 1 + packages/opencode/test/background/job.test.ts | 49 +++++ 3 files changed, 223 insertions(+) create mode 100644 packages/opencode/src/background/job.ts create mode 100644 packages/opencode/test/background/job.test.ts diff --git a/packages/opencode/src/background/job.ts b/packages/opencode/src/background/job.ts new file mode 100644 index 0000000000..5603fb733a --- /dev/null +++ b/packages/opencode/src/background/job.ts @@ -0,0 +1,173 @@ +import { InstanceState } from "@/effect/instance-state" +import { Identifier } from "@/id/id" +import { Cause, Deferred, Effect, Fiber, Layer, Scope, Context } from "effect" + +export type Status = "running" | "completed" | "error" | "cancelled" + +export type Info = { + id: string + type: string + title?: string + status: Status + started_at: number + completed_at?: number + output?: string + error?: string + metadata?: Record +} + +type Active = { + info: Info + done: Deferred.Deferred + fiber?: Fiber.Fiber +} + +type State = { + jobs: Map + scope: Scope.Scope +} + +export type StartInput = { + id?: string + type: string + title?: string + metadata?: Record + run: Effect.Effect +} + +export type WaitInput = { + id: string + timeout?: number +} + +export type WaitResult = { + info?: Info + timedOut: boolean +} + +export interface Interface { + readonly list: () => Effect.Effect + readonly get: (id: string) => Effect.Effect + readonly start: (input: StartInput) => Effect.Effect + readonly wait: (input: WaitInput) => Effect.Effect + readonly cancel: (id: string) => Effect.Effect +} + +export class Service extends Context.Service()("@opencode/BackgroundJob") {} + +function snapshot(job: Active): Info { + return { + ...job.info, + ...(job.info.metadata ? { metadata: { ...job.info.metadata } } : {}), + } +} + +function errorText(error: unknown) { + if (error instanceof Error) return error.message + return String(error) +} + +export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const state = yield* InstanceState.make( + Effect.fn("BackgroundJob.state")(function* () { + return { + jobs: new Map(), + scope: yield* Scope.Scope, + } + }), + ) + + const finish = Effect.fn("BackgroundJob.finish")(function* ( + job: Active, + status: Exclude, + data?: { output?: string; error?: string }, + ) { + if (job.info.status !== "running") return snapshot(job) + job.info.status = status + job.info.completed_at = Date.now() + if (data?.output !== undefined) job.info.output = data.output + if (data?.error !== undefined) job.info.error = data.error + job.fiber = undefined + const info = snapshot(job) + yield* Deferred.succeed(job.done, info).pipe(Effect.ignore) + return info + }) + + const list: Interface["list"] = Effect.fn("BackgroundJob.list")(function* () { + const s = yield* InstanceState.get(state) + return Array.from(s.jobs.values()) + .map(snapshot) + .toSorted((a, b) => a.started_at - b.started_at) + }) + + const get: Interface["get"] = Effect.fn("BackgroundJob.get")(function* (id) { + const s = yield* InstanceState.get(state) + const job = s.jobs.get(id) + if (!job) return + return snapshot(job) + }) + + const start: Interface["start"] = Effect.fn("BackgroundJob.start")(function* (input) { + const s = yield* InstanceState.get(state) + const id = input.id ?? Identifier.ascending("job") + const existing = s.jobs.get(id) + if (existing?.info.status === "running") return snapshot(existing) + + const job: Active = { + info: { + id, + type: input.type, + title: input.title, + status: "running", + started_at: Date.now(), + metadata: input.metadata, + }, + done: yield* Deferred.make(), + } + s.jobs.set(id, job) + job.fiber = yield* input.run.pipe( + Effect.matchCauseEffect({ + onSuccess: (output) => finish(job, "completed", { output }), + onFailure: (cause) => + finish(job, Cause.hasInterruptsOnly(cause) ? "cancelled" : "error", { + error: errorText(Cause.squash(cause)), + }), + }), + Effect.asVoid, + Effect.forkIn(s.scope), + ) + return snapshot(job) + }) + + const wait: Interface["wait"] = Effect.fn("BackgroundJob.wait")(function* (input) { + const s = yield* InstanceState.get(state) + const job = s.jobs.get(input.id) + if (!job) return { timedOut: false } + if (job.info.status !== "running") return { info: snapshot(job), timedOut: false } + if (!input.timeout) return { info: yield* Deferred.await(job.done), timedOut: false } + return yield* Effect.raceAll([ + Deferred.await(job.done).pipe(Effect.map((info) => ({ info, timedOut: false }))), + Effect.sleep(input.timeout).pipe(Effect.as({ info: snapshot(job), timedOut: true })), + ]) + }) + + const cancel: Interface["cancel"] = Effect.fn("BackgroundJob.cancel")(function* (id) { + const s = yield* InstanceState.get(state) + const job = s.jobs.get(id) + if (!job) return + if (job.info.status !== "running") return snapshot(job) + const fiber = job.fiber + const info = yield* finish(job, "cancelled") + if (fiber) yield* Fiber.interrupt(fiber).pipe(Effect.ignore) + return info + }) + + return Service.of({ list, get, start, wait, cancel }) + }), +) + +export const defaultLayer = layer + +export * as BackgroundJob from "./job" diff --git a/packages/opencode/src/id/id.ts b/packages/opencode/src/id/id.ts index 46c210fa5d..737c9a3446 100644 --- a/packages/opencode/src/id/id.ts +++ b/packages/opencode/src/id/id.ts @@ -2,6 +2,7 @@ import z from "zod" import { randomBytes } from "crypto" const prefixes = { + job: "job", event: "evt", session: "ses", message: "msg", diff --git a/packages/opencode/test/background/job.test.ts b/packages/opencode/test/background/job.test.ts new file mode 100644 index 0000000000..6601042295 --- /dev/null +++ b/packages/opencode/test/background/job.test.ts @@ -0,0 +1,49 @@ +import { describe, expect } from "bun:test" +import { Deferred, Effect, Layer } from "effect" +import { BackgroundJob } from "@/background/job" +import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" +import { provideTmpdirInstance } from "../fixture/fixture" +import { testEffect } from "../lib/effect" + +const it = testEffect(Layer.mergeAll(BackgroundJob.defaultLayer, CrossSpawnSpawner.defaultLayer)) + +describe("background.job", () => { + it.live("tracks started jobs through completion", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const latch = yield* Deferred.make() + const job = yield* jobs.start({ + type: "test", + title: "test job", + run: Deferred.await(latch).pipe(Effect.as("done")), + }) + + expect(job.status).toBe("running") + yield* Deferred.succeed(latch, undefined) + const done = yield* jobs.wait({ id: job.id }) + + expect(done.info?.status).toBe("completed") + expect(done.info?.output).toBe("done") + expect((yield* jobs.list()).map((item) => item.id)).toEqual([job.id]) + }), + ), + ) + + it.live("can cancel running jobs", () => + provideTmpdirInstance(() => + Effect.gen(function* () { + const jobs = yield* BackgroundJob.Service + const latch = yield* Deferred.make() + const job = yield* jobs.start({ + type: "test", + run: Deferred.await(latch).pipe(Effect.as("done")), + }) + + const cancelled = yield* jobs.cancel(job.id) + + expect(cancelled?.status).toBe("cancelled") + }), + ), + ) +})