fix: keep cron heartbeat wakes on session route

This commit is contained in:
Peter Steinberger
2026-05-11 15:53:22 +01:00
parent 7b28a52676
commit 402fa0e935
4 changed files with 173 additions and 7 deletions

View File

@@ -86,7 +86,7 @@ export type CronServiceDeps = {
agentId?: string;
sessionKey?: string;
/** Optional heartbeat config override (e.g. target: "last" for cron-triggered heartbeats). */
heartbeat?: { target?: string };
heartbeat?: HeartbeatWakeRequest["heartbeat"];
}) => Promise<HeartbeatRunResult>;
/**
* WakeMode=now: max time to wait for runHeartbeatOnce to stop returning

View File

@@ -404,8 +404,122 @@ describe("buildGatewayCronService", () => {
reason: "cron:test",
agentId: "main",
sessionKey: "agent:main:discord:channel:ops",
heartbeat: { target: "last", to: undefined, accountId: undefined },
});
} finally {
state.cron.stop();
}
});
it("does not inherit explicit heartbeat destinations for direct target-last wakes", async () => {
const cfg = {
...createCronConfig("server-cron-direct-heartbeat-route"),
agents: {
defaults: {
heartbeat: {
every: "1h",
prompt: "Default heartbeat prompt",
target: "none",
directPolicy: "block",
to: "telegram:dm",
accountId: "default",
},
},
},
} as OpenClawConfig;
loadConfigMock.mockReturnValue(cfg);
const state = buildGatewayCronService({
cfg,
deps: {} as CliDeps,
broadcast: () => {},
});
try {
const cronDeps = (
state.cron as unknown as {
state?: {
deps?: {
runHeartbeatOnce?: (opts?: {
agentId?: string;
sessionKey?: string | null;
reason?: string;
heartbeat?: { target?: string };
}) => Promise<unknown>;
};
};
}
).state?.deps;
await cronDeps?.runHeartbeatOnce?.({
reason: "cron:test",
sessionKey: "telegram:group:123:topic:456",
heartbeat: { target: "last" },
});
const call = requireRecord(
callArg(runHeartbeatOnceMock, 0, 0, "heartbeat run options"),
"heartbeat run options",
);
expect(call.sessionKey).toBe("agent:main:telegram:group:123:topic:456");
expect(call.heartbeat).toEqual({
every: "1h",
prompt: "Default heartbeat prompt",
target: "last",
directPolicy: "block",
to: undefined,
accountId: undefined,
});
} finally {
state.cron.stop();
}
});
it("does not inherit explicit heartbeat destinations for queued target-last wakes", async () => {
const cfg = {
...createCronConfig("server-cron-queued-heartbeat-route"),
agents: {
defaults: {
heartbeat: {
every: "1h",
prompt: "Default heartbeat prompt",
target: "none",
directPolicy: "block",
to: "telegram:dm",
accountId: "default",
},
},
},
} as OpenClawConfig;
loadConfigMock.mockReturnValue(cfg);
const state = buildGatewayCronService({
cfg,
deps: {} as CliDeps,
broadcast: () => {},
});
try {
const job = await state.cron.add({
name: "queued-heartbeat-route",
enabled: true,
schedule: { kind: "at", at: new Date(1).toISOString() },
sessionTarget: "main",
wakeMode: "next-heartbeat",
sessionKey: "telegram:group:123:topic:456",
payload: { kind: "systemEvent", text: "hello" },
});
await state.cron.run(job.id, "force");
const call = requireRecord(
callArg(requestHeartbeatMock, 0, 0, "heartbeat request"),
"heartbeat request",
);
expect(call.sessionKey).toBe("agent:main:telegram:group:123:topic:456");
expect(call.heartbeat).toEqual({
target: "last",
to: undefined,
accountId: undefined,
});
} finally {
state.cron.stop();
}

View File

@@ -9,6 +9,7 @@ import {
resolveAgentMainSessionKey,
} from "../config/sessions.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import {
@@ -60,6 +61,25 @@ function pickDefined<T extends Record<string, unknown>>(
return result;
}
function omitExplicitHeartbeatDestination(
heartbeat: AgentDefaultsConfig["heartbeat"] | undefined,
): AgentDefaultsConfig["heartbeat"] | undefined {
if (!heartbeat) {
return undefined;
}
return {
...heartbeat,
to: undefined,
accountId: undefined,
};
}
function sanitizeCronHeartbeatOverride(
heartbeat: AgentDefaultsConfig["heartbeat"] | undefined,
): AgentDefaultsConfig["heartbeat"] | undefined {
return heartbeat?.target === "last" ? omitExplicitHeartbeatDestination(heartbeat) : heartbeat;
}
/** Map internal CronJob to the public plugin SDK shape. */
function toPluginCronJob(job: CronJob): PluginHookGatewayCronJob {
return {
@@ -203,6 +223,28 @@ export function buildGatewayCronService(params: {
return { runtimeConfig, agentId, sessionKey };
};
const resolveCronHeartbeatOverride = (params: {
runtimeConfig: OpenClawConfig;
agentId?: string;
heartbeat?: AgentDefaultsConfig["heartbeat"];
}) => {
if (!params.heartbeat) {
return undefined;
}
const agentEntry =
params.agentId !== undefined
? findAgentEntry(params.runtimeConfig, params.agentId)
: undefined;
const agentHeartbeat =
agentEntry && typeof agentEntry === "object" ? agentEntry.heartbeat : undefined;
const baseHeartbeat = {
...params.runtimeConfig.agents?.defaults?.heartbeat,
...agentHeartbeat,
};
const heartbeatOverride = { ...baseHeartbeat, ...params.heartbeat };
return sanitizeCronHeartbeatOverride(heartbeatOverride);
};
const defaultAgentId = resolveDefaultAgentId(params.cfg);
const runLogPrune = resolveCronRunLogPruneOptions(params.cfg.cron?.runLog);
const resolveSessionStorePath = (agentId?: string) =>
@@ -257,7 +299,7 @@ export function buildGatewayCronService(params: {
reason: opts?.reason,
agentId,
sessionKey,
heartbeat: opts?.heartbeat,
heartbeat: sanitizeCronHeartbeatOverride(opts?.heartbeat),
});
},
runHeartbeatOnce: async (opts) => {
@@ -269,7 +311,11 @@ export function buildGatewayCronService(params: {
reason: opts?.reason,
agentId,
sessionKey,
heartbeat: opts?.heartbeat,
heartbeat: resolveCronHeartbeatOverride({
runtimeConfig,
agentId,
heartbeat: opts?.heartbeat,
}),
deps: { ...params.deps, runtime: defaultRuntime },
});
},

View File

@@ -41,13 +41,19 @@ export type HeartbeatWakeSource =
| "retry"
| "other";
export type HeartbeatWakeOverride = {
target?: string;
to?: string | undefined;
accountId?: string | undefined;
};
export type HeartbeatWakeRequest = {
source: HeartbeatWakeSource;
intent: HeartbeatWakeIntent;
reason?: string;
agentId?: string;
sessionKey?: string;
heartbeat?: { target?: string };
heartbeat?: HeartbeatWakeOverride;
};
export type HeartbeatWakeHandler = (opts: HeartbeatWakeRequest) => Promise<HeartbeatRunResult>;
@@ -71,7 +77,7 @@ type PendingWakeReason = {
requestedAt: number;
agentId?: string;
sessionKey?: string;
heartbeat?: { target?: string };
heartbeat?: HeartbeatWakeOverride;
};
let handler: HeartbeatWakeHandler | null = null;
@@ -135,7 +141,7 @@ function queuePendingWakeReason(params: {
requestedAt?: number;
agentId?: string;
sessionKey?: string;
heartbeat?: { target?: string };
heartbeat?: HeartbeatWakeOverride;
}) {
const requestedAt = params.requestedAt ?? Date.now();
const normalizedReason = normalizeWakeReason(params.reason);
@@ -312,7 +318,7 @@ export function requestHeartbeat(opts: {
coalesceMs?: number;
agentId?: string;
sessionKey?: string;
heartbeat?: { target?: string };
heartbeat?: HeartbeatWakeOverride;
}) {
queuePendingWakeReason({
source: opts.source,