From 402fa0e935dfcf9c442815d4ebe8065cf995dee7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 11 May 2026 15:53:22 +0100 Subject: [PATCH] fix: keep cron heartbeat wakes on session route --- src/cron/service/state.ts | 2 +- src/gateway/server-cron.test.ts | 114 ++++++++++++++++++++++++++++++++ src/gateway/server-cron.ts | 50 +++++++++++++- src/infra/heartbeat-wake.ts | 14 ++-- 4 files changed, 173 insertions(+), 7 deletions(-) diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index b384550e7aa..ffbd3ca0f04 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -86,7 +86,7 @@ export type CronServiceDeps = { agentId?: string; sessionKey?: string; /** Optional heartbeat config override (e.g. target: "last" for cron-triggered heartbeats). */ - heartbeat?: { target?: string }; + heartbeat?: HeartbeatWakeRequest["heartbeat"]; }) => Promise; /** * WakeMode=now: max time to wait for runHeartbeatOnce to stop returning diff --git a/src/gateway/server-cron.test.ts b/src/gateway/server-cron.test.ts index 70eb3c42443..470bb8ceebb 100644 --- a/src/gateway/server-cron.test.ts +++ b/src/gateway/server-cron.test.ts @@ -404,8 +404,122 @@ describe("buildGatewayCronService", () => { reason: "cron:test", agentId: "main", sessionKey: "agent:main:discord:channel:ops", + heartbeat: { target: "last", to: undefined, accountId: undefined }, + }); + } finally { + state.cron.stop(); + } + }); + + it("does not inherit explicit heartbeat destinations for direct target-last wakes", async () => { + const cfg = { + ...createCronConfig("server-cron-direct-heartbeat-route"), + agents: { + defaults: { + heartbeat: { + every: "1h", + prompt: "Default heartbeat prompt", + target: "none", + directPolicy: "block", + to: "telegram:dm", + accountId: "default", + }, + }, + }, + } as OpenClawConfig; + loadConfigMock.mockReturnValue(cfg); + + const state = buildGatewayCronService({ + cfg, + deps: {} as CliDeps, + broadcast: () => {}, + }); + try { + const cronDeps = ( + state.cron as unknown as { + state?: { + deps?: { + runHeartbeatOnce?: (opts?: { + agentId?: string; + sessionKey?: string | null; + reason?: string; + heartbeat?: { target?: string }; + }) => Promise; + }; + }; + } + ).state?.deps; + + await cronDeps?.runHeartbeatOnce?.({ + reason: "cron:test", + sessionKey: "telegram:group:123:topic:456", heartbeat: { target: "last" }, }); + + const call = requireRecord( + callArg(runHeartbeatOnceMock, 0, 0, "heartbeat run options"), + "heartbeat run options", + ); + expect(call.sessionKey).toBe("agent:main:telegram:group:123:topic:456"); + expect(call.heartbeat).toEqual({ + every: "1h", + prompt: "Default heartbeat prompt", + target: "last", + directPolicy: "block", + to: undefined, + accountId: undefined, + }); + } finally { + state.cron.stop(); + } + }); + + it("does not inherit explicit heartbeat destinations for queued target-last wakes", async () => { + const cfg = { + ...createCronConfig("server-cron-queued-heartbeat-route"), + agents: { + defaults: { + heartbeat: { + every: "1h", + prompt: "Default heartbeat prompt", + target: "none", + directPolicy: "block", + to: "telegram:dm", + accountId: "default", + }, + }, + }, + } as OpenClawConfig; + loadConfigMock.mockReturnValue(cfg); + + const state = buildGatewayCronService({ + cfg, + deps: {} as CliDeps, + broadcast: () => {}, + }); + try { + const job = await state.cron.add({ + name: "queued-heartbeat-route", + enabled: true, + schedule: { kind: "at", at: new Date(1).toISOString() }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + sessionKey: "telegram:group:123:topic:456", + payload: { kind: "systemEvent", text: "hello" }, + }); + + await state.cron.run(job.id, "force"); + + const call = requireRecord( + callArg(requestHeartbeatMock, 0, 0, "heartbeat request"), + "heartbeat request", + ); + expect(call.sessionKey).toBe("agent:main:telegram:group:123:topic:456"); + expect(call.heartbeat).toEqual({ + target: "last", + to: undefined, + accountId: undefined, + }); } finally { state.cron.stop(); } diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 22dd18a72f6..1893694a1c4 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -9,6 +9,7 @@ import { resolveAgentMainSessionKey, } from "../config/sessions.js"; import { resolveStorePath } from "../config/sessions/paths.js"; +import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; import { @@ -60,6 +61,25 @@ function pickDefined>( return result; } +function omitExplicitHeartbeatDestination( + heartbeat: AgentDefaultsConfig["heartbeat"] | undefined, +): AgentDefaultsConfig["heartbeat"] | undefined { + if (!heartbeat) { + return undefined; + } + return { + ...heartbeat, + to: undefined, + accountId: undefined, + }; +} + +function sanitizeCronHeartbeatOverride( + heartbeat: AgentDefaultsConfig["heartbeat"] | undefined, +): AgentDefaultsConfig["heartbeat"] | undefined { + return heartbeat?.target === "last" ? omitExplicitHeartbeatDestination(heartbeat) : heartbeat; +} + /** Map internal CronJob to the public plugin SDK shape. */ function toPluginCronJob(job: CronJob): PluginHookGatewayCronJob { return { @@ -203,6 +223,28 @@ export function buildGatewayCronService(params: { return { runtimeConfig, agentId, sessionKey }; }; + const resolveCronHeartbeatOverride = (params: { + runtimeConfig: OpenClawConfig; + agentId?: string; + heartbeat?: AgentDefaultsConfig["heartbeat"]; + }) => { + if (!params.heartbeat) { + return undefined; + } + const agentEntry = + params.agentId !== undefined + ? findAgentEntry(params.runtimeConfig, params.agentId) + : undefined; + const agentHeartbeat = + agentEntry && typeof agentEntry === "object" ? agentEntry.heartbeat : undefined; + const baseHeartbeat = { + ...params.runtimeConfig.agents?.defaults?.heartbeat, + ...agentHeartbeat, + }; + const heartbeatOverride = { ...baseHeartbeat, ...params.heartbeat }; + return sanitizeCronHeartbeatOverride(heartbeatOverride); + }; + const defaultAgentId = resolveDefaultAgentId(params.cfg); const runLogPrune = resolveCronRunLogPruneOptions(params.cfg.cron?.runLog); const resolveSessionStorePath = (agentId?: string) => @@ -257,7 +299,7 @@ export function buildGatewayCronService(params: { reason: opts?.reason, agentId, sessionKey, - heartbeat: opts?.heartbeat, + heartbeat: sanitizeCronHeartbeatOverride(opts?.heartbeat), }); }, runHeartbeatOnce: async (opts) => { @@ -269,7 +311,11 @@ export function buildGatewayCronService(params: { reason: opts?.reason, agentId, sessionKey, - heartbeat: opts?.heartbeat, + heartbeat: resolveCronHeartbeatOverride({ + runtimeConfig, + agentId, + heartbeat: opts?.heartbeat, + }), deps: { ...params.deps, runtime: defaultRuntime }, }); }, diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index 572421a2609..a3cbd6102a5 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -41,13 +41,19 @@ export type HeartbeatWakeSource = | "retry" | "other"; +export type HeartbeatWakeOverride = { + target?: string; + to?: string | undefined; + accountId?: string | undefined; +}; + export type HeartbeatWakeRequest = { source: HeartbeatWakeSource; intent: HeartbeatWakeIntent; reason?: string; agentId?: string; sessionKey?: string; - heartbeat?: { target?: string }; + heartbeat?: HeartbeatWakeOverride; }; export type HeartbeatWakeHandler = (opts: HeartbeatWakeRequest) => Promise; @@ -71,7 +77,7 @@ type PendingWakeReason = { requestedAt: number; agentId?: string; sessionKey?: string; - heartbeat?: { target?: string }; + heartbeat?: HeartbeatWakeOverride; }; let handler: HeartbeatWakeHandler | null = null; @@ -135,7 +141,7 @@ function queuePendingWakeReason(params: { requestedAt?: number; agentId?: string; sessionKey?: string; - heartbeat?: { target?: string }; + heartbeat?: HeartbeatWakeOverride; }) { const requestedAt = params.requestedAt ?? Date.now(); const normalizedReason = normalizeWakeReason(params.reason); @@ -312,7 +318,7 @@ export function requestHeartbeat(opts: { coalesceMs?: number; agentId?: string; sessionKey?: string; - heartbeat?: { target?: string }; + heartbeat?: HeartbeatWakeOverride; }) { queuePendingWakeReason({ source: opts.source,