AO-07: Configurable Concurrency Management and AO-06: History Anchoring

Author: larchanka
Date: 2026-02-22 22:31:57 +01:00
Committed by: Mikhail Larchanka
Parent: 2d05e67ac7
Commit: 6b90d4fef9
5 changed files with 232 additions and 9 deletions


@@ -0,0 +1,11 @@
# AO-06 History Anchoring for Synthetic Tasks

## Context
Autonomous tasks should have access to relevant conversation history.

## Proposed Changes
- [ ] Ensure `runTaskPipeline` correctly picks up history for `chatId` when triggered by cron.
- [ ] Index the results of autonomous tasks into the shared session memory (sketched below).

## Verification
- Ask a follow-up question in Telegram about an autonomous task's result and confirm the context is visible.
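A rough sketch of what these bullets imply, shaped after this commit's `runTaskPipeline(chatId, userId, goal, conversationId, initialTaskId)` call. Note that `sessionMemory.index` is a hypothetical stand-in; the shared session memory API is not shown in this commit:

// Hypothetical sketch, not project code: assumes access to the orchestrator's
// (private) runTaskPipeline, and `sessionMemory.index` is an assumed name for
// the shared session memory API.
async function runAnchoredAutonomousTask(chatIdNum: number, userIdNum: number, query: string, taskId: string): Promise<void> {
  // Cron triggers pass String(chatIdNum) as the conversationId, so the
  // pipeline can resolve the same history a human-triggered task would see.
  const result = await orchestrator.runTaskPipeline(chatIdNum, userIdNum, query, String(chatIdNum), taskId);
  // AO-06: index the result so later Telegram follow-ups can reference it.
  await sessionMemory.index(String(chatIdNum), result);
}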


@@ -0,0 +1,14 @@
# AO-07 Configurable Concurrency Management

## Context
Ensure the system doesn't overwhelm local resources during peak autonomous activity.

## Proposed Changes
- [ ] Add `maxConcurrentTasks` to `config.json` (default: 1; see the sketch below).
- [ ] Implement a `TaskQueue` in the Orchestrator.
- [ ] Treat `0` as unlimited concurrency.
- [ ] Ensure human goals are prioritized over, or at least balanced with, synthetic goals.

## Verification
- Set concurrency to 1, trigger multiple cron jobs simultaneously, and verify they execute sequentially.
- Set concurrency to 0 and verify all jobs start simultaneously.
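For reference, a minimal sketch of the resulting `config.json` entry; the key name and default are taken from the config diff at the end of this commit, and every other key is omitted:

{
  "maxConcurrentTasks": 1
}

The `MAX_CONCURRENT_TASKS` environment variable overrides the file value via `mergeEnv`, with `0` meaning unlimited concurrency.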


@@ -0,0 +1,128 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { Orchestrator } from '../orchestrator';
import { getConfig } from '../../shared/config';

// Mock dependencies
vi.mock('../../shared/config', () => ({
  getConfig: vi.fn(),
}));

vi.mock('../../utils/console-logger', () => ({
  ConsoleLogger: {
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
    ipc: vi.fn(),
    success: vi.fn(),
  },
}));

// We need to mock the adapter and services that Orchestrator initializes in its constructor
vi.mock('../../adapters/ollama-adapter', () => ({
  OllamaAdapter: vi.fn().mockImplementation(function () { return {}; }),
}));

vi.mock('../model-router', () => ({
  ModelRouter: vi.fn().mockImplementation(function () { return {}; }),
}));

vi.mock('../../services/model-manager', () => ({
  ModelManagerService: vi.fn().mockImplementation(function () { return {}; }),
}));

describe('Orchestrator Concurrency', () => {
  let orchestrator: Orchestrator;

  beforeEach(() => {
    vi.clearAllMocks();
    (getConfig as any).mockReturnValue({
      maxConcurrentTasks: 1,
      ollama: { baseUrl: 'http://localhost:11434', timeoutMs: 60000 },
      modelRouter: { modelTiers: {} },
    });
    orchestrator = new Orchestrator();
  });

  it('should process human tasks sequentially when limit is 1', async () => {
    const runTaskPipelineSpy = vi.spyOn(orchestrator as any, 'runTaskPipeline');

    // Controlled promise to simulate long running task
    let resolveTask1: (value: unknown) => void;
    const task1Promise = new Promise((resolve) => { resolveTask1 = resolve; });
    runTaskPipelineSpy.mockImplementationOnce(() => task1Promise);
    runTaskPipelineSpy.mockImplementationOnce(() => Promise.resolve());

    // Enqueue two human tasks (priority 1)
    (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'task1', priority: 1 });
    (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'task2', priority: 1 });

    // Verify only task 1 started
    expect(runTaskPipelineSpy).toHaveBeenCalledTimes(1);
    expect(runTaskPipelineSpy).toHaveBeenCalledWith(1, 1, 'task1', undefined, undefined);

    // Complete task 1
    resolveTask1!(null);
    await new Promise(process.nextTick); // Let promises settle

    // Verify task 2 started
    expect(runTaskPipelineSpy).toHaveBeenCalledTimes(2);
    expect(runTaskPipelineSpy).toHaveBeenLastCalledWith(1, 1, 'task2', undefined, undefined);
  });

  it('should prioritize human tasks over synthetic tasks', async () => {
    const runTaskPipelineSpy = vi.spyOn(orchestrator as any, 'runTaskPipeline');

    // Controlled promise to simulate long running task
    let resolveTask1: (value: unknown) => void;
    const task1Promise = new Promise((resolve) => { resolveTask1 = resolve; });
    runTaskPipelineSpy.mockImplementationOnce(() => task1Promise);
    runTaskPipelineSpy.mockImplementation(() => Promise.resolve());

    // 1. Start a slow synthetic task (priority 0)
    (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'synthetic1', priority: 0 });
    // 2. Enqueue another synthetic task
    (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'synthetic2', priority: 0 });
    // 3. Enqueue a human task (priority 1)
    (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'human1', priority: 1 });

    // Verify only synthetic1 started
    expect(runTaskPipelineSpy).toHaveBeenCalledTimes(1);
    expect(runTaskPipelineSpy).toHaveBeenCalledWith(1, 1, 'synthetic1', undefined, undefined);

    // Controlled promise to simulate long running task 2 (human)
    let resolveTask2: (value: unknown) => void;
    const task2Promise = new Promise((resolve) => { resolveTask2 = resolve; });
    runTaskPipelineSpy.mockImplementationOnce(() => task2Promise);

    // Complete synthetic1
    resolveTask1!(null);
    await new Promise(process.nextTick);

    // Verify human1 started next (pushed to front)
    expect(runTaskPipelineSpy).toHaveBeenCalledTimes(2);
    expect(runTaskPipelineSpy).toHaveBeenCalledWith(1, 1, 'human1', undefined, undefined);

    // Complete human1
    resolveTask2!(null);
    await new Promise(process.nextTick);

    expect(runTaskPipelineSpy).toHaveBeenCalledTimes(3);
    expect(runTaskPipelineSpy).toHaveBeenCalledWith(1, 1, 'synthetic2', undefined, undefined);
  });

  it('should run immediately if limit is 0', async () => {
    (getConfig as any).mockReturnValue({
      maxConcurrentTasks: 0,
    });
    const runTaskPipelineSpy = vi.spyOn(orchestrator as any, 'runTaskPipeline').mockResolvedValue(null);

    (orchestrator as any).enqueueTask({ chatId: 1, userId: 1, goal: 'task1', priority: 1 });
    (orchestrator as any).enqueueTask({ chatId: 2, userId: 2, goal: 'task2', priority: 1 });

    expect(runTaskPipelineSpy).toHaveBeenCalledTimes(2);
  });
});
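A side note on the settling idiom above: `process.nextTick` is passed directly as the Promise executor. This relies on standard Node semantics, not project code:

// The executor receives (resolve, reject), so `new Promise(process.nextTick)`
// schedules `resolve` to run on the next tick. Awaiting it yields long enough
// for already-queued promise callbacks, such as the .finally() that drains the
// task queue, to run before the test's next assertion.
await new Promise(process.nextTick);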


@@ -57,6 +57,17 @@ export class Orchestrator {
   private readonly pending = new Map<string, { resolve: PendingResolve; reject: PendingReject }>();
   private readonly modelManager: ModelManagerService;
 
+  // Concurrency & Queueing
+  private taskQueue: Array<{
+    chatId: number;
+    userId: number;
+    goal: string;
+    conversationId?: string | undefined;
+    initialTaskId?: string | undefined;
+    priority: number;
+  }> = [];
+  private activeTaskCount = 0;
+
   constructor() {
     const ollama = new OllamaAdapter();
     const modelRouter = new ModelRouter();
@@ -264,20 +275,70 @@ export class Orchestrator {
       const userId = payload.userId as number | undefined;
       if (goal != null && chatId != null) {
         const conversationId = payload.conversationId as string | undefined;
-        this.runTaskPipeline(chatId, userId ?? 0, goal, conversationId).catch((err) => {
-          ConsoleLogger.error("core", "Task pipeline error", err instanceof Error ? err : String(err), envelope);
-          this.sendToTelegram(chatId, `Error: ${err instanceof Error ? err.message : String(err)}`);
-        });
+        // Route to task queue (priority 1 for human)
+        this.enqueueTask({
+          chatId,
+          userId: userId ?? 0,
+          goal,
+          conversationId,
+          priority: 1
+        });
       }
       return;
     }
 
     // FP-10: Handle file ingest from Telegram adapter
     if (type === "file.ingest" && fromProcess === "telegram-adapter") {
-      const p = (payload as unknown) as FileIngestPayload;
+      const p = payload as unknown as FileIngestPayload;
       this.handleFileIngest(p).catch((err) => {
         ConsoleLogger.error("core", "File ingest error", err instanceof Error ? err : String(err), envelope);
         this.sendToTelegram(p.chatId, `File processing error: ${err instanceof Error ? err.message : String(err)}`);
       });
       return;
     }
   }
 
+  private enqueueTask(task: {
+    chatId: number;
+    userId: number;
+    goal: string;
+    conversationId?: string | undefined;
+    initialTaskId?: string | undefined;
+    priority: number;
+  }): void {
+    const limit = getConfig().maxConcurrentTasks;
+    if (limit === 0) {
+      // Infinite concurrency
+      this.runTaskPipeline(task.chatId, task.userId, task.goal, task.conversationId, task.initialTaskId);
+      return;
+    }
+
+    // Insert into queue with priority (Human priority 1 > Synthetic priority 0)
+    if (task.priority > 0) {
+      // Human goals go to the front
+      this.taskQueue.unshift(task);
+    } else {
+      // Synthetic goals go to the back
+      this.taskQueue.push(task);
+    }
+    this.processQueue();
+  }
+
+  private processQueue(): void {
+    const limit = getConfig().maxConcurrentTasks;
+    if (limit === 0) return; // Managed by direct execution in enqueueTask
+
+    while (this.activeTaskCount < limit && this.taskQueue.length > 0) {
+      const task = this.taskQueue.shift()!;
+      this.activeTaskCount++;
+      this.runTaskPipeline(task.chatId, task.userId, task.goal, task.conversationId, task.initialTaskId)
+        .finally(() => {
+          this.activeTaskCount--;
+          this.processQueue();
+        });
+    }
+  }
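Two observations on the queue as committed. Because human tasks are unshifted onto the front, several queued human tasks run newest-first (LIFO) among themselves, while synthetic tasks remain FIFO at the back. Also, the direct-call paths this commit replaces attached a `.catch` that logged the failure and messaged Telegram; both the `limit === 0` fast path and the `.finally`-only chain above drop that, so a rejected `runTaskPipeline` now surfaces as an unhandled rejection. A hedged sketch of restoring per-task reporting inside `processQueue`, assuming `ConsoleLogger.error` tolerates omitting the trailing envelope argument:

this.runTaskPipeline(task.chatId, task.userId, task.goal, task.conversationId, task.initialTaskId)
  .catch((err) => {
    // Sketch only: mirrors the reporting the pre-queue code paths performed.
    ConsoleLogger.error("core", "Task pipeline error", err instanceof Error ? err : String(err));
    this.sendToTelegram(task.chatId, `Error: ${err instanceof Error ? err.message : String(err)}`);
  })
  .finally(() => {
    this.activeTaskCount--;
    this.processQueue();
  });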
@@ -801,10 +862,14 @@ export class Orchestrator {
     ConsoleLogger.info("core", `Triggering autonomous AI task: "${query}" for chatId ${chatIdNum} (taskId: ${taskId})`);
 
-    // Route to task pipeline
-    this.runTaskPipeline(chatIdNum, userIdNum, query, String(chatIdNum), taskId).catch((err) => {
-      ConsoleLogger.error("core", "Autonomous task pipeline error", err instanceof Error ? err : String(err), envelope);
-      this.sendToTelegram(chatIdNum, `Autonomous Task Failed: ${err instanceof Error ? err.message : String(err)}`);
-    });
+    // Route to task queue (priority 0 for synthetic)
+    this.enqueueTask({
+      chatId: chatIdNum,
+      userId: userIdNum,
+      goal: query,
+      conversationId: String(chatIdNum),
+      initialTaskId: taskId,
+      priority: 0
+    });
   }
@@ -1075,4 +1140,6 @@ function main(): void {
   });
 }
 
-main();
+if (typeof require !== 'undefined' && require.main === module) {
+  main();
+}
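The `require.main` guard is presumably what lets the new vitest suite `import { Orchestrator }` without booting the whole process: under a direct `node` launch `require.main === module` is true and `main()` runs, while under a test import (or an ESM context where `require` is undefined) it stays dormant.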


@@ -133,6 +133,7 @@ export interface AppConfig {
   skills: SkillsConfig;
   whisper: WhisperConfig;
   fileProcessor: FileProcessorConfig;
+  maxConcurrentTasks: number;
 }
 
 const DEFAULT_CONFIG: AppConfig = {
@@ -200,6 +201,7 @@ const DEFAULT_CONFIG: AppConfig = {
     ocrModel: "glm-ocr:q8_0",
     ocrEnabled: true,
   },
+  maxConcurrentTasks: 1,
 };
 
 function loadConfigFile(): Partial<AppConfig> {
@@ -282,6 +284,7 @@ function mergeEnv(config: AppConfig): AppConfig {
       ocrModel: process.env.FILE_PROCESSOR_OCR_MODEL ?? config.fileProcessor.ocrModel,
       ocrEnabled: process.env.FILE_PROCESSOR_OCR_ENABLED === "false" ? false : config.fileProcessor.ocrEnabled,
     },
+    // Checked explicitly rather than via `Number(...) || fallback`, which
+    // would silently drop an explicit 0 (0 means unlimited concurrency).
+    maxConcurrentTasks: process.env.MAX_CONCURRENT_TASKS
+      ? Number(process.env.MAX_CONCURRENT_TASKS)
+      : config.maxConcurrentTasks,
   };
 }