feat(AO-05): implement synthetic goal ingestion with unique taskId

This commit is contained in:
larchanka
2026-02-22 22:23:15 +01:00
parent f0267f9932
commit 7e337216f4
2 changed files with 126 additions and 2 deletions

View File

@@ -0,0 +1,88 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { Orchestrator } from "../orchestrator.js";
import { randomUUID } from "node:crypto";
import { PROTOCOL_VERSION } from "../../shared/protocol.js";
// Mock dependencies
vi.mock("../../shared/config.js", () => ({
getConfig: () => ({
ollama: { baseUrl: "http://localhost:11434" },
modelRouter: { plannerComplexity: "small" },
taskMemory: { dbPath: ":memory:" },
cron: { dbPath: ":memory:" }
})
}));
const mockConsoleLogger = vi.hoisted(() => ({
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
ipc: vi.fn(),
processEvent: vi.fn()
}));
vi.mock("../../utils/console-logger.js", () => ({
ConsoleLogger: mockConsoleLogger
}));
// We need to partially mock Orchestrator to avoid spawning real children in this test
describe("Orchestrator Autonomous Ingestion", () => {
let orchestrator: any;
beforeEach(() => {
orchestrator = new Orchestrator();
// Mock runTaskPipeline to just resolve
orchestrator.runTaskPipeline = vi.fn().mockResolvedValue(undefined);
orchestrator.sendToTelegram = vi.fn();
vi.clearAllMocks();
});
it("should handle event.cron.ai_query and route to pipeline with a new taskId", () => {
const query = "What is the time?";
const chatId = 12345;
const userId = 67890;
const envelope = {
id: randomUUID(),
timestamp: Date.now(),
from: "cron-manager",
to: "core",
type: "event.cron.ai_query",
version: PROTOCOL_VERSION,
payload: {
query,
chatId,
userId
}
};
orchestrator.handleCronAIQueryEvent(envelope);
expect(orchestrator.runTaskPipeline).toHaveBeenCalledWith(
chatId,
userId,
query,
String(chatId),
expect.any(String) // taskId
);
});
it("should log a warning if query or chatId is missing", () => {
const envelope = {
id: randomUUID(),
from: "cron-manager",
type: "event.cron.ai_query",
payload: { chatId: 123 } // missing query
};
orchestrator.handleCronAIQueryEvent(envelope as any);
expect(mockConsoleLogger.warn).toHaveBeenCalledWith(
"core",
expect.stringContaining("missing query"),
expect.anything()
);
expect(orchestrator.runTaskPipeline).not.toHaveBeenCalled();
});
});

View File

@@ -151,6 +151,12 @@ export class Orchestrator {
}
return;
}
// Handle cron AI query events from cron-manager
if (fromProcess === "cron-manager" && envelope.type === "event.cron.ai_query") {
this.handleCronAIQueryEvent(envelope);
return;
}
// Handle cron reminder events from cron-manager
if (fromProcess === "cron-manager" && envelope.type === "event.cron.completed") {
this.handleCronReminderEvent(envelope);
@@ -277,7 +283,13 @@ export class Orchestrator {
private static readonly MAX_PLAN_RETRIES = 2; // 3 attempts total (0, 1, 2)
private async runTaskPipeline(chatId: number, userId: number, goal: string, conversationId?: string): Promise<void> {
private async runTaskPipeline(
chatId: number,
userId: number,
goal: string,
conversationId?: string,
initialTaskId?: string
): Promise<void> {
const planner = this.children.get("planner");
const taskMemory = this.children.get("task-memory");
const executor = this.children.get("executor");
@@ -330,7 +342,7 @@ export class Orchestrator {
}
for (let attempt = 0; attempt <= Orchestrator.MAX_PLAN_RETRIES; attempt++) {
const taskId = randomUUID();
const taskId = (attempt === 0 && initialTaskId) ? initialTaskId : randomUUID();
const isRetry = attempt > 0;
if (isRetry) {
this.sendToTelegram(chatId, "Re-planning with error feedback...", true);
@@ -772,6 +784,30 @@ export class Orchestrator {
this.sendToTelegram(chatIdNum, formattedMessage);
}
private handleCronAIQueryEvent(envelope: Envelope): void {
const payload = envelope.payload as Record<string, unknown>;
const query = payload.query as string | undefined;
const chatId = payload.chatId as number | string | undefined;
const userId = payload.userId as number | string | undefined;
if (!query || !chatId) {
ConsoleLogger.warn("core", "event.cron.ai_query missing query or chatId", envelope);
return;
}
const chatIdNum = typeof chatId === "string" ? parseInt(chatId, 10) : chatId;
const userIdNum = typeof userId === "string" ? parseInt(userId, 10) : (userId ?? 0);
const taskId = randomUUID();
ConsoleLogger.info("core", `Triggering autonomous AI task: "${query}" for chatId ${chatIdNum} (taskId: ${taskId})`);
// Route to task pipeline
this.runTaskPipeline(chatIdNum, userIdNum, query, String(chatIdNum), taskId).catch((err) => {
ConsoleLogger.error("core", "Autonomous task pipeline error", err instanceof Error ? err : String(err), envelope);
this.sendToTelegram(chatIdNum, `Autonomous Task Failed: ${err instanceof Error ? err.message : String(err)}`);
});
}
private async handleListReminders(chatId: number, _request: Envelope): Promise<void> {
ConsoleLogger.info("core", `handleListReminders called for chatId: ${chatId}`);
const cronManager = this.children.get("cron-manager");