From a14aef191b10819dcc130430f7800dc6932ba6f3 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 7 Apr 2026 10:00:14 +0100 Subject: [PATCH] test: speed up task registry tests --- src/tasks/task-registry.maintenance.ts | 92 +++++++-- src/tasks/task-registry.test.ts | 249 +++++++++++-------------- 2 files changed, 185 insertions(+), 156 deletions(-) diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index a49b0b0f8e9..4a0079ada64 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -33,6 +33,43 @@ let sweeper: NodeJS.Timeout | null = null; let deferredSweep: NodeJS.Timeout | null = null; let sweepInProgress = false; +type TaskRegistryMaintenanceRuntime = { + readAcpSessionEntry: typeof readAcpSessionEntry; + loadSessionStore: typeof loadSessionStore; + resolveStorePath: typeof resolveStorePath; + isCronJobActive: typeof isCronJobActive; + getAgentRunContext: typeof getAgentRunContext; + parseAgentSessionKey: typeof parseAgentSessionKey; + deleteTaskRecordById: typeof deleteTaskRecordById; + ensureTaskRegistryReady: typeof ensureTaskRegistryReady; + getTaskById: typeof getTaskById; + listTaskRecords: typeof listTaskRecords; + markTaskLostById: typeof markTaskLostById; + maybeDeliverTaskTerminalUpdate: typeof maybeDeliverTaskTerminalUpdate; + resolveTaskForLookupToken: typeof resolveTaskForLookupToken; + setTaskCleanupAfterById: typeof setTaskCleanupAfterById; +}; + +const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = { + readAcpSessionEntry, + loadSessionStore, + resolveStorePath, + isCronJobActive, + getAgentRunContext, + parseAgentSessionKey, + deleteTaskRecordById, + ensureTaskRegistryReady, + getTaskById, + listTaskRecords, + markTaskLostById, + maybeDeliverTaskTerminalUpdate, + resolveTaskForLookupToken, + setTaskCleanupAfterById, +}; + +let taskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = + defaultTaskRegistryMaintenanceRuntime; + export type TaskRegistryMaintenanceSummary = { reconciled: number; cleanupStamped: number; @@ -70,7 +107,7 @@ function hasActiveCliRun(task: TaskRecord): boolean { const candidateRunIds = [task.sourceId, task.runId]; for (const candidate of candidateRunIds) { const runId = candidate?.trim(); - if (runId && getAgentRunContext(runId)) { + if (runId && taskRegistryMaintenanceRuntime.getAgentRunContext(runId)) { return true; } } @@ -80,7 +117,7 @@ function hasActiveCliRun(task: TaskRecord): boolean { function hasBackingSession(task: TaskRecord): boolean { if (task.runtime === "cron") { const jobId = task.sourceId?.trim(); - return jobId ? isCronJobActive(jobId) : false; + return jobId ? taskRegistryMaintenanceRuntime.isCronJobActive(jobId) : false; } if (task.runtime === "cli" && hasActiveCliRun(task)) { @@ -92,7 +129,7 @@ function hasBackingSession(task: TaskRecord): boolean { return true; } if (task.runtime === "acp") { - const acpEntry = readAcpSessionEntry({ + const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({ sessionKey: childSessionKey, }); if (!acpEntry || acpEntry.storeReadFailed) { @@ -107,9 +144,9 @@ function hasBackingSession(task: TaskRecord): boolean { return false; } } - const agentId = parseAgentSessionKey(childSessionKey)?.agentId; - const storePath = resolveStorePath(undefined, { agentId }); - const store = loadSessionStore(storePath); + const agentId = taskRegistryMaintenanceRuntime.parseAgentSessionKey(childSessionKey)?.agentId; + const storePath = taskRegistryMaintenanceRuntime.resolveStorePath(undefined, { agentId }); + const store = taskRegistryMaintenanceRuntime.loadSessionStore(storePath); return Boolean(findSessionEntryByKey(store, childSessionKey)); } @@ -149,14 +186,14 @@ function resolveCleanupAfter(task: TaskRecord): number { function markTaskLost(task: TaskRecord, now: number): TaskRecord { const cleanupAfter = task.cleanupAfter ?? projectTaskLost(task, now).cleanupAfter; const updated = - markTaskLostById({ + taskRegistryMaintenanceRuntime.markTaskLostById({ taskId: task.taskId, endedAt: task.endedAt ?? now, lastEventAt: now, error: task.error ?? "backing session missing", cleanupAfter, }) ?? task; - void maybeDeliverTaskTerminalUpdate(updated.taskId); + void taskRegistryMaintenanceRuntime.maybeDeliverTaskTerminalUpdate(updated.taskId); return updated; } @@ -185,8 +222,10 @@ export function reconcileTaskRecordForOperatorInspection(task: TaskRecord): Task } export function reconcileInspectableTasks(): TaskRecord[] { - ensureTaskRegistryReady(); - return listTaskRecords().map((task) => reconcileTaskRecordForOperatorInspection(task)); + taskRegistryMaintenanceRuntime.ensureTaskRegistryReady(); + return taskRegistryMaintenanceRuntime + .listTaskRecords() + .map((task) => reconcileTaskRecordForOperatorInspection(task)); } export function getInspectableTaskRegistrySummary(): TaskRegistrySummary { @@ -199,18 +238,18 @@ export function getInspectableTaskAuditSummary(): TaskAuditSummary { } export function reconcileTaskLookupToken(token: string): TaskRecord | undefined { - ensureTaskRegistryReady(); - const task = resolveTaskForLookupToken(token); + taskRegistryMaintenanceRuntime.ensureTaskRegistryReady(); + const task = taskRegistryMaintenanceRuntime.resolveTaskForLookupToken(token); return task ? reconcileTaskRecordForOperatorInspection(task) : undefined; } export function previewTaskRegistryMaintenance(): TaskRegistryMaintenanceSummary { - ensureTaskRegistryReady(); + taskRegistryMaintenanceRuntime.ensureTaskRegistryReady(); const now = Date.now(); let reconciled = 0; let cleanupStamped = 0; let pruned = 0; - for (const task of listTaskRecords()) { + for (const task of taskRegistryMaintenanceRuntime.listTaskRecords()) { if (shouldMarkLost(task, now)) { reconciled += 1; continue; @@ -246,15 +285,15 @@ function startScheduledSweep() { } export async function runTaskRegistryMaintenance(): Promise { - ensureTaskRegistryReady(); + taskRegistryMaintenanceRuntime.ensureTaskRegistryReady(); const now = Date.now(); let reconciled = 0; let cleanupStamped = 0; let pruned = 0; - const tasks = listTaskRecords(); + const tasks = taskRegistryMaintenanceRuntime.listTaskRecords(); let processed = 0; for (const task of tasks) { - const current = getTaskById(task.taskId); + const current = taskRegistryMaintenanceRuntime.getTaskById(task.taskId); if (!current) { continue; } @@ -269,7 +308,10 @@ export async function runTaskRegistryMaintenance(): Promise { deferredSweep = null; startScheduledSweep(); @@ -326,6 +368,16 @@ export function stopTaskRegistryMaintenance() { export const stopTaskRegistryMaintenanceForTests = stopTaskRegistryMaintenance; +export function setTaskRegistryMaintenanceRuntimeForTests( + runtime: TaskRegistryMaintenanceRuntime, +): void { + taskRegistryMaintenanceRuntime = runtime; +} + +export function resetTaskRegistryMaintenanceRuntimeForTests(): void { + taskRegistryMaintenanceRuntime = defaultTaskRegistryMaintenanceRuntime; +} + export function getReconciledTaskById(taskId: string): TaskRecord | undefined { const task = getTaskById(taskId); return task ? reconcileTaskRecordForOperatorInspection(task) : undefined; diff --git a/src/tasks/task-registry.test.ts b/src/tasks/task-registry.test.ts index 9fbe49b0ab5..6163e629e05 100644 --- a/src/tasks/task-registry.test.ts +++ b/src/tasks/task-registry.test.ts @@ -1,4 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js"; import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js"; import { resetCronActiveJobsForTests } from "../cron/active-jobs.js"; import { @@ -11,10 +12,12 @@ import { resetHeartbeatWakeStateForTests, } from "../infra/heartbeat-wake.js"; import { peekSystemEvents, resetSystemEventsForTest } from "../infra/system-events.js"; +import type { ParsedAgentSessionKey } from "../routing/session-key.js"; import { withTempDir } from "../test-helpers/temp-dir.js"; import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js"; import { configureTaskFlowRegistryRuntime } from "./task-flow-registry.store.js"; import { + cancelTaskById, createTaskRecord, findLatestTaskForOwnerKey, findLatestTaskForRelatedSessionKey, @@ -41,8 +44,10 @@ import { import { getInspectableTaskAuditSummary, previewTaskRegistryMaintenance, + resetTaskRegistryMaintenanceRuntimeForTests, reconcileInspectableTasks, runTaskRegistryMaintenance, + setTaskRegistryMaintenanceRuntimeForTests, startTaskRegistryMaintenance, stopTaskRegistryMaintenanceForTests, sweepTaskRegistry, @@ -71,39 +76,23 @@ vi.mock("../agents/subagent-control.js", () => ({ killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params), })); -async function loadFreshTaskRegistryModulesForControlTest() { - vi.resetModules(); - vi.doMock("../acp/control-plane/manager.js", () => ({ - getAcpSessionManager: () => ({ - cancelSession: hoisted.cancelSessionMock, - }), - })); - vi.doMock("../agents/subagent-control.js", () => ({ - killSubagentRunAdmin: (params: unknown) => hoisted.killSubagentRunAdminMock(params), - })); - const registry = await import("./task-registry.js"); - registry.setTaskRegistryDeliveryRuntimeForTests({ - sendMessage: hoisted.sendMessageMock, - }); - return registry; -} - -async function loadFreshTaskRegistryMaintenanceModuleForTest(params: { +function configureTaskRegistryMaintenanceRuntimeForTest(params: { currentTasks: Map>; snapshotTasks: ReturnType[]; -}) { - vi.resetModules(); - vi.doMock("../acp/runtime/session-meta.js", () => ({ - readAcpSessionEntry: () => ({ entry: undefined, storeReadFailed: false }), - })); - vi.doMock("../config/sessions.js", () => ({ +}): void { + const emptyAcpEntry = { + cfg: {} as never, + storePath: "", + sessionKey: "", + storeSessionKey: "", + entry: undefined, + storeReadFailed: false, + } satisfies AcpSessionStoreEntry; + setTaskRegistryMaintenanceRuntimeForTests({ + readAcpSessionEntry: () => emptyAcpEntry, loadSessionStore: () => ({}), resolveStorePath: () => "", - })); - vi.doMock("../routing/session-key.js", () => ({ - parseAgentSessionKey: () => undefined, - })); - vi.doMock("./runtime-internal.js", () => ({ + parseAgentSessionKey: () => null as ParsedAgentSessionKey | null, deleteTaskRecordById: (taskId: string) => params.currentTasks.delete(taskId), ensureTaskRegistryReady: () => {}, getTaskById: (taskId: string) => params.currentTasks.get(taskId), @@ -130,7 +119,7 @@ async function loadFreshTaskRegistryMaintenanceModuleForTest(params: { params.currentTasks.set(patch.taskId, next); return next; }, - maybeDeliverTaskTerminalUpdate: () => false, + maybeDeliverTaskTerminalUpdate: async () => null, resolveTaskForLookupToken: () => undefined, setTaskCleanupAfterById: (patch: { taskId: string; cleanupAfter: number }) => { const current = params.currentTasks.get(patch.taskId); @@ -144,8 +133,7 @@ async function loadFreshTaskRegistryMaintenanceModuleForTest(params: { params.currentTasks.set(patch.taskId, next); return next; }, - })); - return await import("./task-registry.maintenance.js"); + }); } async function waitForAssertion(assertion: () => void, timeoutMs = 2_000, stepMs = 5) { @@ -229,6 +217,7 @@ describe("task-registry", () => { resetAgentRunContextForTest(); resetCronActiveJobsForTests(); resetTaskRegistryDeliveryRuntimeForTests(); + resetTaskRegistryMaintenanceRuntimeForTests(); resetTaskRegistryForTests({ persist: false }); resetTaskFlowRegistryForTests({ persist: false }); hoisted.sendMessageMock.mockReset(); @@ -1424,7 +1413,7 @@ describe("task-registry", () => { lastEventAt: now, }; const currentTasks = new Map([[snapshotTask.taskId, currentTask]]); - const { runTaskRegistryMaintenance } = await loadFreshTaskRegistryMaintenanceModuleForTest({ + configureTaskRegistryMaintenanceRuntimeForTest({ currentTasks, snapshotTasks: [staleTask], }); @@ -1464,7 +1453,7 @@ describe("task-registry", () => { cleanupAfter: now + 60_000, }; const currentTasks = new Map([[snapshotTask.taskId, currentTask]]); - const { sweepTaskRegistry } = await loadFreshTaskRegistryMaintenanceModuleForTest({ + configureTaskRegistryMaintenanceRuntimeForTest({ currentTasks, snapshotTasks: [staleTask], }); @@ -1775,122 +1764,110 @@ describe("task-registry", () => { }); it("cancels ACP-backed tasks through the ACP session manager", async () => { - await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { - const registry = await loadFreshTaskRegistryModulesForControlTest(); + await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; - registry.resetTaskRegistryForTests(); - try { - hoisted.cancelSessionMock.mockResolvedValue(undefined); + hoisted.cancelSessionMock.mockResolvedValue(undefined); - const task = registry.createTaskRecord({ - runtime: "acp", - ownerKey: "agent:main:main", - scopeKind: "session", - requesterOrigin: { + const task = createTaskRecord({ + runtime: "acp", + ownerKey: "agent:main:main", + scopeKind: "session", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:codex:acp:child", + runId: "run-cancel-acp", + task: "Investigate issue", + status: "running", + deliveryStatus: "pending", + }); + + const result = await cancelTaskById({ + cfg: {} as never, + taskId: task.taskId, + }); + + expect(hoisted.cancelSessionMock).toHaveBeenCalledWith( + expect.objectContaining({ + cfg: {}, + sessionKey: "agent:codex:acp:child", + reason: "task-cancel", + }), + ); + expect(result).toMatchObject({ + found: true, + cancelled: true, + task: expect.objectContaining({ + taskId: task.taskId, + status: "cancelled", + error: "Cancelled by operator.", + }), + }); + await waitForAssertion(() => + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ channel: "telegram", to: "telegram:123", - }, - childSessionKey: "agent:codex:acp:child", - runId: "run-cancel-acp", - task: "Investigate issue", - status: "running", - deliveryStatus: "pending", - }); - - const result = await registry.cancelTaskById({ - cfg: {} as never, - taskId: task.taskId, - }); - - expect(hoisted.cancelSessionMock).toHaveBeenCalledWith( - expect.objectContaining({ - cfg: {}, - sessionKey: "agent:codex:acp:child", - reason: "task-cancel", + content: "Background task cancelled: ACP background task (run run-canc).", }), - ); - expect(result).toMatchObject({ - found: true, - cancelled: true, - task: expect.objectContaining({ - taskId: task.taskId, - status: "cancelled", - error: "Cancelled by operator.", - }), - }); - await waitForAssertion(() => - expect(hoisted.sendMessageMock).toHaveBeenCalledWith( - expect.objectContaining({ - channel: "telegram", - to: "telegram:123", - content: "Background task cancelled: ACP background task (run run-canc).", - }), - ), - ); - } finally { - registry.resetTaskRegistryForTests(); - } + ), + ); }); }); it("cancels subagent-backed tasks through subagent control", async () => { - await withTempDir({ prefix: "openclaw-task-registry-" }, async (root) => { - const registry = await loadFreshTaskRegistryModulesForControlTest(); + await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; - registry.resetTaskRegistryForTests(); - try { - hoisted.killSubagentRunAdminMock.mockResolvedValue({ - found: true, - killed: true, - }); + hoisted.killSubagentRunAdminMock.mockResolvedValue({ + found: true, + killed: true, + }); - const task = registry.createTaskRecord({ - runtime: "subagent", - ownerKey: "agent:main:main", - scopeKind: "session", - requesterOrigin: { + const task = createTaskRecord({ + runtime: "subagent", + ownerKey: "agent:main:main", + scopeKind: "session", + requesterOrigin: { + channel: "telegram", + to: "telegram:123", + }, + childSessionKey: "agent:worker:subagent:child", + runId: "run-cancel-subagent", + task: "Investigate issue", + status: "running", + deliveryStatus: "pending", + }); + + const result = await cancelTaskById({ + cfg: {} as never, + taskId: task.taskId, + }); + + expect(hoisted.killSubagentRunAdminMock).toHaveBeenCalledWith( + expect.objectContaining({ + cfg: {}, + sessionKey: "agent:worker:subagent:child", + }), + ); + expect(result).toMatchObject({ + found: true, + cancelled: true, + task: expect.objectContaining({ + taskId: task.taskId, + status: "cancelled", + error: "Cancelled by operator.", + }), + }); + await waitForAssertion(() => + expect(hoisted.sendMessageMock).toHaveBeenCalledWith( + expect.objectContaining({ channel: "telegram", to: "telegram:123", - }, - childSessionKey: "agent:worker:subagent:child", - runId: "run-cancel-subagent", - task: "Investigate issue", - status: "running", - deliveryStatus: "pending", - }); - - const result = await registry.cancelTaskById({ - cfg: {} as never, - taskId: task.taskId, - }); - - expect(hoisted.killSubagentRunAdminMock).toHaveBeenCalledWith( - expect.objectContaining({ - cfg: {}, - sessionKey: "agent:worker:subagent:child", + content: "Background task cancelled: Subagent task (run run-canc).", }), - ); - expect(result).toMatchObject({ - found: true, - cancelled: true, - task: expect.objectContaining({ - taskId: task.taskId, - status: "cancelled", - error: "Cancelled by operator.", - }), - }); - await waitForAssertion(() => - expect(hoisted.sendMessageMock).toHaveBeenCalledWith( - expect.objectContaining({ - channel: "telegram", - to: "telegram:123", - content: "Background task cancelled: Subagent task (run run-canc).", - }), - ), - ); - } finally { - registry.resetTaskRegistryForTests(); - } + ), + ); }); }); });