diff --git a/_board/TASKS/AO-06_HISTORY_ANCHORING.md b/_board/TASKS/AO-06_HISTORY_ANCHORING.md new file mode 100644 index 0000000..48a8ac6 --- /dev/null +++ b/_board/TASKS/AO-06_HISTORY_ANCHORING.md @@ -0,0 +1,11 @@ +# AO-06 History Anchoring for Synthetic Tasks + +## Context +Autonomous tasks should have access to relevant conversation history. + +## Proposed Changes +- [ ] Ensure `runTaskPipeline` correctly picks up history for `chatId` when triggered by cron. +- [ ] Index the results of autonomous tasks into the shared session memory. + +## Verification +- Ask a follow-up question in Telegram about an autonomous task's result to confirm context visibility. diff --git a/_board/TASKS/AO-07_CONCURRENCY_MANAGEMENT.md b/_board/TASKS/AO-07_CONCURRENCY_MANAGEMENT.md new file mode 100644 index 0000000..f6457ff --- /dev/null +++ b/_board/TASKS/AO-07_CONCURRENCY_MANAGEMENT.md @@ -0,0 +1,14 @@ +# AO-07 Configurable Concurrency Management + +## Context +Ensure the system doesn't overwhelm local resources during peak autonomous activity. + +## Proposed Changes +- [ ] Add `maxConcurrentTasks` to `config.json` (Default: 1). +- [ ] Implement a `TaskQueue` in Orchestrator. +- [ ] Implement logic where `0` means infinite concurrency. +- [ ] Ensure human goals are prioritized or balanced with synthetic goals. + +## Verification +- Set concurrency to 1. Trigger multiple cron jobs simultaneously. Verify they execute sequentially. +- Set concurrency to 0. Verify they start simultaneously. diff --git a/src/core/__tests__/orchestrator-concurrency.test.ts b/src/core/__tests__/orchestrator-concurrency.test.ts new file mode 100644 index 0000000..02cf84e --- /dev/null +++ b/src/core/__tests__/orchestrator-concurrency.test.ts @@ -0,0 +1,128 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { Orchestrator } from '../orchestrator'; +import { getConfig } from '../../shared/config'; + +// Mock dependencies +vi.mock('../../shared/config', () => ({ + getConfig: vi.fn(), +})); + +vi.mock('../../utils/console-logger', () => ({ + ConsoleLogger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + ipc: vi.fn(), + success: vi.fn(), + }, +})); + +// We need to mock the adapter and services that Orchestrator initializes in its constructor +vi.mock('../../adapters/ollama-adapter', () => ({ + OllamaAdapter: vi.fn().mockImplementation(function () { return {}; }), +})); + +vi.mock('../model-router', () => ({ + ModelRouter: vi.fn().mockImplementation(function () { return {}; }), +})); + +vi.mock('../../services/model-manager', () => ({ + ModelManagerService: vi.fn().mockImplementation(function () { return {}; }), +})); + +describe('Orchestrator Concurrency', () => { + let orchestrator: Orchestrator; + + beforeEach(() => { + vi.clearAllMocks(); + (getConfig as any).mockReturnValue({ + maxConcurrentTasks: 1, + ollama: { baseUrl: 'http://localhost:11434', timeoutMs: 60000 }, + modelRouter: { modelTiers: {} }, + }); + orchestrator = new Orchestrator(); + }); + + it('should process human tasks sequentially when limit is 1', async () => { + const runTaskPipelineSpy = vi.spyOn(orchestrator as any, 'runTaskPipeline'); + + // Controlled promise to simulate long running task + let resolveTask1: (value: unknown) => void; + const task1Promise = new Promise((resolve) => { resolveTask1 = resolve; }); + + runTaskPipelineSpy.mockImplementationOnce(() => task1Promise); + runTaskPipelineSpy.mockImplementationOnce(() => Promise.resolve()); + + // Enqueue two human tasks (priority 1) + (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'task1', priority: 1 }); + (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'task2', priority: 1 }); + + // Verify only task 1 started + expect(runTaskPipelineSpy).toHaveBeenCalledTimes(1); + expect(runTaskPipelineSpy).toHaveBeenCalledWith(1, 1, 'task1', undefined, undefined); + + // Complete task 1 + resolveTask1!(null); + await new Promise(process.nextTick); // Let promises settle + + // Verify task 2 started + expect(runTaskPipelineSpy).toHaveBeenCalledTimes(2); + expect(runTaskPipelineSpy).toHaveBeenLastCalledWith(1, 1, 'task2', undefined, undefined); + }); + + it('should prioritize human tasks over synthetic tasks', async () => { + const runTaskPipelineSpy = vi.spyOn(orchestrator as any, 'runTaskPipeline'); + + // Controlled promise to simulate long running task + let resolveTask1: (value: unknown) => void; + const task1Promise = new Promise((resolve) => { resolveTask1 = resolve; }); + + runTaskPipelineSpy.mockImplementationOnce(() => task1Promise); + runTaskPipelineSpy.mockImplementation(() => Promise.resolve()); + + // 1. Start a slow synthetic task (priority 0) + (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'synthetic1', priority: 0 }); + + // 2. Enqueue another synthetic task + (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'synthetic2', priority: 0 }); + + // 3. Enqueue a human task (priority 1) + (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'human1', priority: 1 }); + + // Verify only synthetic1 started + expect(runTaskPipelineSpy).toHaveBeenCalledTimes(1); + expect(runTaskPipelineSpy).toHaveBeenCalledWith(1, 1, 'synthetic1', undefined, undefined); + + // Controlled promise to simulate long running task 2 (human) + let resolveTask2: (value: unknown) => void; + const task2Promise = new Promise((resolve) => { resolveTask2 = resolve; }); + runTaskPipelineSpy.mockImplementationOnce(() => task2Promise); + + // Complete synthetic1 + resolveTask1!(null); + await new Promise(process.nextTick); + + // Verify human1 started next (pushed to front) + expect(runTaskPipelineSpy).toHaveBeenCalledTimes(2); + expect(runTaskPipelineSpy).toHaveBeenCalledWith(1, 1, 'human1', undefined, undefined); + + // Complete human1 + resolveTask2!(null); + await new Promise(process.nextTick); + expect(runTaskPipelineSpy).toHaveBeenCalledTimes(3); + expect(runTaskPipelineSpy).toHaveBeenCalledWith(1, 1, 'synthetic2', undefined, undefined); + }); + + it('should run immediately if limit is 0', async () => { + (getConfig as any).mockReturnValue({ + maxConcurrentTasks: 0, + }); + + const runTaskPipelineSpy = vi.spyOn(orchestrator as any, 'runTaskPipeline').mockResolvedValue(null); + + (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'task1', priority: 1 }); + (orchestrator as any).enqueueTask({ chatId: 2, userId: 2, goal: 'task2', priority: 1 }); + + expect(runTaskPipelineSpy).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/core/orchestrator.ts b/src/core/orchestrator.ts index a8b4fb8..d998bd2 100644 --- a/src/core/orchestrator.ts +++ b/src/core/orchestrator.ts @@ -57,6 +57,17 @@ export class Orchestrator { private readonly pending = new Map(); private readonly modelManager: ModelManagerService; + // Concurrency & Queueing + private taskQueue: Array<{ + chatId: number; + userId: number; + goal: string; + conversationId?: string | undefined; + initialTaskId?: string | undefined; + priority: number; + }> = []; + private activeTaskCount = 0; + constructor() { const ollama = new OllamaAdapter(); const modelRouter = new ModelRouter(); @@ -264,20 +275,70 @@ export class Orchestrator { const userId = payload.userId as number | undefined; if (goal != null && chatId != null) { const conversationId = payload.conversationId as string | undefined; - this.runTaskPipeline(chatId, userId ?? 0, goal, conversationId).catch((err) => { - ConsoleLogger.error("core", "Task pipeline error", err instanceof Error ? err : String(err), envelope); - this.sendToTelegram(chatId, `Error: ${err instanceof Error ? err.message : String(err)}`); + // Route to task queue (priority 1 for human) + this.enqueueTask({ + chatId, + userId: userId ?? 0, + goal, + conversationId, + priority: 1 }); } + return; } // FP-10: Handle file ingest from Telegram adapter if (type === "file.ingest" && fromProcess === "telegram-adapter") { - const p = (payload as unknown) as FileIngestPayload; + const p = payload as unknown as FileIngestPayload; this.handleFileIngest(p).catch((err) => { ConsoleLogger.error("core", "File ingest error", err instanceof Error ? err : String(err), envelope); this.sendToTelegram(p.chatId, `File processing error: ${err instanceof Error ? err.message : String(err)}`); }); + return; + } + } + + private enqueueTask(task: { + chatId: number; + userId: number; + goal: string; + conversationId?: string | undefined; + initialTaskId?: string | undefined; + priority: number; + }): void { + const limit = getConfig().maxConcurrentTasks; + + if (limit === 0) { + // Infinite concurrency + this.runTaskPipeline(task.chatId, task.userId, task.goal, task.conversationId, task.initialTaskId); + return; + } + + // Insert into queue with priority (Human priority 1 > Synthetic priority 0) + if (task.priority > 0) { + // Human goals go to the front + this.taskQueue.unshift(task); + } else { + // Synthetic goals go to the back + this.taskQueue.push(task); + } + + this.processQueue(); + } + + private processQueue(): void { + const limit = getConfig().maxConcurrentTasks; + if (limit === 0) return; // Managed by direct execution in enqueueTask + + while (this.activeTaskCount < limit && this.taskQueue.length > 0) { + const task = this.taskQueue.shift()!; + this.activeTaskCount++; + + this.runTaskPipeline(task.chatId, task.userId, task.goal, task.conversationId, task.initialTaskId) + .finally(() => { + this.activeTaskCount--; + this.processQueue(); + }); } } @@ -801,10 +862,14 @@ export class Orchestrator { ConsoleLogger.info("core", `Triggering autonomous AI task: "${query}" for chatId ${chatIdNum} (taskId: ${taskId})`); - // Route to task pipeline - this.runTaskPipeline(chatIdNum, userIdNum, query, String(chatIdNum), taskId).catch((err) => { - ConsoleLogger.error("core", "Autonomous task pipeline error", err instanceof Error ? err : String(err), envelope); - this.sendToTelegram(chatIdNum, `Autonomous Task Failed: ${err instanceof Error ? err.message : String(err)}`); + // Route to task queue (priority 0 for synthetic) + this.enqueueTask({ + chatId: chatIdNum, + userId: userIdNum, + goal: query, + conversationId: String(chatIdNum), + initialTaskId: taskId, + priority: 0 }); } @@ -1075,4 +1140,6 @@ function main(): void { }); } -main(); +if (typeof require !== 'undefined' && require.main === module) { + main(); +} diff --git a/src/shared/config.ts b/src/shared/config.ts index f970d6f..5935608 100644 --- a/src/shared/config.ts +++ b/src/shared/config.ts @@ -133,6 +133,7 @@ export interface AppConfig { skills: SkillsConfig; whisper: WhisperConfig; fileProcessor: FileProcessorConfig; + maxConcurrentTasks: number; } const DEFAULT_CONFIG: AppConfig = { @@ -200,6 +201,7 @@ const DEFAULT_CONFIG: AppConfig = { ocrModel: "glm-ocr:q8_0", ocrEnabled: true, }, + maxConcurrentTasks: 1, }; function loadConfigFile(): Partial { @@ -282,6 +284,7 @@ function mergeEnv(config: AppConfig): AppConfig { ocrModel: process.env.FILE_PROCESSOR_OCR_MODEL ?? config.fileProcessor.ocrModel, ocrEnabled: process.env.FILE_PROCESSOR_OCR_ENABLED === "false" ? false : config.fileProcessor.ocrEnabled, }, + maxConcurrentTasks: Number(process.env.MAX_CONCURRENT_TASKS) || config.maxConcurrentTasks, }; }