fix: track diagnostic progress before stuck warnings

This commit is contained in:
Peter Steinberger
2026-05-02 00:35:10 +01:00
parent 42b7b2b924
commit 2d8d50d418
20 changed files with 289 additions and 36 deletions

View File

@@ -25,6 +25,7 @@ Docs: https://docs.openclaw.ai
- Channels/status reactions: remove stale non-terminal lifecycle reactions when a run reaches done or error, so Discord does not leave a permanent thinking emoji after completion. Fixes #75458. Thanks @davelutztx. - Channels/status reactions: remove stale non-terminal lifecycle reactions when a run reaches done or error, so Discord does not leave a permanent thinking emoji after completion. Fixes #75458. Thanks @davelutztx.
- Discord/doctor: migrate unsupported per-channel `agentId` entries under guild channel config into top-level `bindings[]` routes, so `openclaw doctor --fix` preserves the intended agent route instead of stripping it as an unknown key. Fixes #62455. Thanks @lobster-biscuit. - Discord/doctor: migrate unsupported per-channel `agentId` entries under guild channel config into top-level `bindings[]` routes, so `openclaw doctor --fix` preserves the intended agent route instead of stripping it as an unknown key. Fixes #62455. Thanks @lobster-biscuit.
- Gateway/config: log config health-state write failures instead of silently hiding config observe-recovery write errors. Thanks @sallyom. - Gateway/config: log config health-state write failures instead of silently hiding config observe-recovery write errors. Thanks @sallyom.
- Diagnostics: reset stuck-session timers on reply, tool, status, block, and ACP progress events, and back off repeated `session.stuck` diagnostics while a session remains unchanged. Supersedes #72010. Thanks @rubencu.
## 2026.4.30 ## 2026.4.30

View File

@@ -1,4 +1,4 @@
74530fefef9ed55cab302802bc0be413ec56929e73c12d4bf4f1e4d290813adc config-baseline.json 8bbb620e445cba64aa8a451cfc1a7142ac24e8c80088d74a2fc813ee9e221680 config-baseline.json
21db87c2ebec8844e20bf66ea474c08f3adab842234ff334870fe3e8d87995b4 config-baseline.core.json d145a87759d16d5f58873db337a25cb134ab25e776cd454812dca99bb9cb12a7 config-baseline.core.json
c401cd3450f1737bc92418cfea301d20b54b7fbef9e6049834acc01af338e538 config-baseline.channel.json c401cd3450f1737bc92418cfea301d20b54b7fbef9e6049834acc01af338e538 config-baseline.channel.json
7731a0b93cb335b56fac4c807447ba659fea51ea7a6cd844dc0ef5616669ee75 config-baseline.plugin.json 7731a0b93cb335b56fac4c807447ba659fea51ea7a6cd844dc0ef5616669ee75 config-baseline.plugin.json

View File

@@ -164,7 +164,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism
- `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides. - `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides.
- Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer. - Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer.
- Cron runtime: isolated agent-turn `timeoutSeconds` is owned by cron. The scheduler starts that timer when execution begins, aborts the underlying run at the configured deadline, then runs bounded cleanup before recording the timeout so a stale child session cannot keep the lane stuck. - Cron runtime: isolated agent-turn `timeoutSeconds` is owned by cron. The scheduler starts that timer when execution begins, aborts the underlying run at the configured deadline, then runs bounded cleanup before recording the timeout so a stale child session cannot keep the lane stuck.
- Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path releases the affected session lane so queued startup work can drain. - Session liveness diagnostics: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` classifies long `processing` sessions that have no observed reply, tool, status, block, or ACP progress. Active embedded runs, model calls, and tool calls report as `session.long_running`; active work with no recent progress reports as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path releases the affected session lane so queued startup work can drain. Repeated `session.stuck` diagnostics back off while the session remains unchanged.
- Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers.<id>.timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout. - Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers.<id>.timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout.
- Provider HTTP request timeout: `models.providers.<id>.timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout. - Provider HTTP request timeout: `models.providers.<id>.timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout.

View File

@@ -115,7 +115,7 @@ keys.
- If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining. - If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining.
- If you need queue depth, enable verbose logs and watch for queue timing lines. - If you need queue depth, enable verbose logs and watch for queue timing lines.
- Codex app-server runs that accept a turn and then stop emitting progress are interrupted by the Codex adapter so the active session lane can release instead of waiting for the outer run timeout. - Codex app-server runs that accept a turn and then stop emitting progress are interrupted by the Codex adapter so the active session lane can release instead of waiting for the outer run timeout.
- When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` are classified by current activity. Active work logs as `session.long_running`; active work with no recent progress logs as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path can release the affected session lane so queued work drains. - When diagnostics are enabled, sessions that remain in `processing` past `diagnostics.stuckSessionWarnMs` with no observed reply, tool, status, block, or ACP progress are classified by current activity. Active work logs as `session.long_running`; active work with no recent progress logs as `session.stalled`; `session.stuck` is reserved for stale session bookkeeping with no active work, and only that path can release the affected session lane so queued work drains. Repeated `session.stuck` diagnostics back off while the session remains unchanged.
## Related ## Related

View File

@@ -937,7 +937,7 @@ Notes:
- `enabled`: master toggle for instrumentation output (default: `true`). - `enabled`: master toggle for instrumentation output (default: `true`).
- `flags`: array of flag strings enabling targeted log output (supports wildcards like `"telegram.*"` or `"*"`). - `flags`: array of flag strings enabling targeted log output (supports wildcards like `"telegram.*"` or `"*"`).
- `stuckSessionWarnMs`: age threshold in ms for classifying long-running processing sessions as `session.long_running`, `session.stalled`, or `session.stuck`. - `stuckSessionWarnMs`: no-progress age threshold in ms for classifying long-running processing sessions as `session.long_running`, `session.stalled`, or `session.stuck`. Reply, tool, status, block, and ACP progress reset the timer; repeated `session.stuck` diagnostics back off while unchanged.
- `otel.enabled`: enables the OpenTelemetry export pipeline (default: `false`). For the full configuration, signal catalog, and privacy model, see [OpenTelemetry export](/gateway/opentelemetry). - `otel.enabled`: enables the OpenTelemetry export pipeline (default: `false`). For the full configuration, signal catalog, and privacy model, see [OpenTelemetry export](/gateway/opentelemetry).
- `otel.endpoint`: collector URL for OTel export. - `otel.endpoint`: collector URL for OTel export.
- `otel.tracesEndpoint` / `otel.metricsEndpoint` / `otel.logsEndpoint`: optional signal-specific OTLP endpoints. When set, they override `otel.endpoint` for that signal only. - `otel.tracesEndpoint` / `otel.metricsEndpoint` / `otel.logsEndpoint`: optional signal-specific OTLP endpoints. When set, they override `otel.endpoint` for that signal only.

View File

@@ -5,7 +5,10 @@ import { createAcpTestConfig as createCfg } from "./test-fixtures/acp-runtime.js
type Delivery = { kind: string; text?: string }; type Delivery = { kind: string; text?: string };
function createProjectorHarness(cfgOverrides?: Parameters<typeof createCfg>[0]) { function createProjectorHarness(
cfgOverrides?: Parameters<typeof createCfg>[0],
opts?: { onProgress?: () => void },
) {
const deliveries: Delivery[] = []; const deliveries: Delivery[] = [];
const projector = createAcpReplyProjector({ const projector = createAcpReplyProjector({
cfg: createCfg(cfgOverrides), cfg: createCfg(cfgOverrides),
@@ -14,6 +17,7 @@ function createProjectorHarness(cfgOverrides?: Parameters<typeof createCfg>[0])
deliveries.push({ kind, text: payload.text }); deliveries.push({ kind, text: payload.text });
return true; return true;
}, },
onProgress: opts?.onProgress,
}); });
return { deliveries, projector }; return { deliveries, projector };
} }
@@ -175,6 +179,28 @@ async function runHiddenBoundaryCase(params: {
} }
describe("createAcpReplyProjector", () => { describe("createAcpReplyProjector", () => {
it("reports progress for ACP runtime events before delivery filtering", async () => {
const onProgress = vi.fn();
const { projector } = createProjectorHarness(undefined, { onProgress });
await projector.onEvent({
type: "text_delta",
stream: "thought",
text: "hidden reasoning",
tag: "agent_message_chunk",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "tool-1",
status: "in_progress",
title: "Run command",
text: "Run command",
});
expect(onProgress).toHaveBeenCalledTimes(2);
});
it("coalesces text deltas into bounded block chunks", async () => { it("coalesces text deltas into bounded block chunks", async () => {
const { deliveries, projector } = createProjectorHarness(); const { deliveries, projector } = createProjectorHarness();

View File

@@ -173,6 +173,7 @@ export function createAcpReplyProjector(params: {
payload: ReplyPayload, payload: ReplyPayload,
meta?: AcpProjectedDeliveryMeta, meta?: AcpProjectedDeliveryMeta,
) => Promise<boolean>; ) => Promise<boolean>;
onProgress?: () => void;
provider?: string; provider?: string;
accountId?: string; accountId?: string;
}): AcpReplyProjector { }): AcpReplyProjector {
@@ -403,6 +404,7 @@ export function createAcpReplyProjector(params: {
}; };
const onEvent = async (event: AcpRuntimeEvent): Promise<void> => { const onEvent = async (event: AcpRuntimeEvent): Promise<void> => {
params.onProgress?.();
if (event.type === "text_delta") { if (event.type === "text_delta") {
if (event.stream && event.stream !== "output") { if (event.stream && event.stream !== "output") {
return; return;

View File

@@ -74,6 +74,10 @@ const mediaUnderstandingMocks = vi.hoisted(() => ({
applyMediaUnderstanding: vi.fn(async (_params: unknown) => undefined), applyMediaUnderstanding: vi.fn(async (_params: unknown) => undefined),
})); }));
const diagnosticMocks = vi.hoisted(() => ({
markDiagnosticSessionProgress: vi.fn(),
}));
const sessionMetaMocks = vi.hoisted(() => ({ const sessionMetaMocks = vi.hoisted(() => ({
readAcpSessionEntry: vi.fn< readAcpSessionEntry: vi.fn<
(params: { sessionKey: string; cfg?: OpenClawConfig }) => AcpSessionStoreEntry | null (params: { sessionKey: string; cfg?: OpenClawConfig }) => AcpSessionStoreEntry | null
@@ -168,6 +172,10 @@ vi.mock("./dispatch-acp-session.runtime.js", () => ({
sessionMetaMocks.readAcpSessionEntry(params), sessionMetaMocks.readAcpSessionEntry(params),
})); }));
vi.mock("../../logging/diagnostic.js", () => ({
markDiagnosticSessionProgress: diagnosticMocks.markDiagnosticSessionProgress,
}));
vi.mock("./dispatch-acp-transcript.runtime.js", () => ({ vi.mock("./dispatch-acp-transcript.runtime.js", () => ({
persistAcpDispatchTranscript: (params: unknown) => persistAcpDispatchTranscript: (params: unknown) =>
transcriptMocks.persistAcpDispatchTranscript(params), transcriptMocks.persistAcpDispatchTranscript(params),
@@ -374,6 +382,7 @@ describe("tryDispatchAcpReply", () => {
ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" });
mediaUnderstandingMocks.applyMediaUnderstanding.mockReset(); mediaUnderstandingMocks.applyMediaUnderstanding.mockReset();
mediaUnderstandingMocks.applyMediaUnderstanding.mockResolvedValue(undefined); mediaUnderstandingMocks.applyMediaUnderstanding.mockResolvedValue(undefined);
diagnosticMocks.markDiagnosticSessionProgress.mockReset();
sessionMetaMocks.readAcpSessionEntry.mockReset(); sessionMetaMocks.readAcpSessionEntry.mockReset();
sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null); sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null);
transcriptMocks.persistAcpDispatchTranscript.mockClear(); transcriptMocks.persistAcpDispatchTranscript.mockClear();
@@ -545,6 +554,18 @@ describe("tryDispatchAcpReply", () => {
expect(onReplyStart).toHaveBeenCalledTimes(1); expect(onReplyStart).toHaveBeenCalledTimes(1);
}); });
it("does not mark ACP diagnostic progress when diagnostics are disabled", async () => {
setReadyAcpResolution();
mockVisibleTextTurn();
await runDispatch({
bodyForAgent: "visible",
cfg: createAcpTestConfig({ diagnostics: { enabled: false } }),
});
expect(diagnosticMocks.markDiagnosticSessionProgress).not.toHaveBeenCalled();
});
it("does not start reply lifecycle for empty ACP prompt", async () => { it("does not start reply lifecycle for empty ACP prompt", async () => {
setReadyAcpResolution(); setReadyAcpResolution();
const onReplyStart = vi.fn(); const onReplyStart = vi.fn();

View File

@@ -11,9 +11,11 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js";
import type { TtsAutoMode } from "../../config/types.tts.js"; import type { TtsAutoMode } from "../../config/types.tts.js";
import { logVerbose } from "../../globals.js"; import { logVerbose } from "../../globals.js";
import { emitAgentEvent } from "../../infra/agent-events.js"; import { emitAgentEvent } from "../../infra/agent-events.js";
import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
import { formatErrorMessage } from "../../infra/errors.js"; import { formatErrorMessage } from "../../infra/errors.js";
import { generateSecureUuid } from "../../infra/secure-random.js"; import { generateSecureUuid } from "../../infra/secure-random.js";
import { prefixSystemMessage } from "../../infra/system-message.js"; import { prefixSystemMessage } from "../../infra/system-message.js";
import { markDiagnosticSessionProgress } from "../../logging/diagnostic.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { import {
normalizeLowercaseStringOrEmpty, normalizeLowercaseStringOrEmpty,
@@ -342,6 +344,23 @@ export async function tryDispatchAcpReply(params: {
} }
const canonicalSessionKey = acpResolution.sessionKey; const canonicalSessionKey = acpResolution.sessionKey;
const acpAgentId = resolveAgentIdFromSessionKey(canonicalSessionKey); const acpAgentId = resolveAgentIdFromSessionKey(canonicalSessionKey);
const progressSessionKeys = isDiagnosticsEnabled(params.cfg)
? Array.from(
new Set(
[params.ctx.SessionKey, sessionKey, canonicalSessionKey]
.map((key) => normalizeOptionalString(key))
.filter((key): key is string => Boolean(key)),
),
)
: [];
const markAcpProgress =
progressSessionKeys.length > 0
? () => {
for (const key of progressSessionKeys) {
markDiagnosticSessionProgress({ sessionKey: key });
}
}
: undefined;
let queuedFinal = false; let queuedFinal = false;
const delivery = createAcpDispatchDeliveryCoordinator({ const delivery = createAcpDispatchDeliveryCoordinator({
@@ -401,6 +420,7 @@ export async function tryDispatchAcpReply(params: {
cfg: params.cfg, cfg: params.cfg,
shouldSendToolSummaries: params.shouldSendToolSummaries, shouldSendToolSummaries: params.shouldSendToolSummaries,
deliver: delivery.deliver, deliver: delivery.deliver,
onProgress: markAcpProgress,
provider: params.ctx.Surface ?? params.ctx.Provider, provider: params.ctx.Surface ?? params.ctx.Provider,
accountId: effectiveDispatchAccountId, accountId: effectiveDispatchAccountId,
}); });

View File

@@ -178,6 +178,7 @@ describe("dispatchReplyFromConfig ACP abort", () => {
diagnosticMocks.logMessageQueued.mockReset(); diagnosticMocks.logMessageQueued.mockReset();
diagnosticMocks.logMessageProcessed.mockReset(); diagnosticMocks.logMessageProcessed.mockReset();
diagnosticMocks.logSessionStateChange.mockReset(); diagnosticMocks.logSessionStateChange.mockReset();
diagnosticMocks.markDiagnosticSessionProgress.mockReset();
agentEventMocks.emitAgentEvent.mockReset(); agentEventMocks.emitAgentEvent.mockReset();
agentEventMocks.onAgentEvent.mockReset().mockImplementation(() => () => {}); agentEventMocks.onAgentEvent.mockReset().mockImplementation(() => () => {});
setNoAbort(); setNoAbort();

View File

@@ -84,6 +84,7 @@ describe("dispatchReplyFromConfig reply_dispatch hook", () => {
diagnosticMocks.logMessageQueued.mockReset(); diagnosticMocks.logMessageQueued.mockReset();
diagnosticMocks.logMessageProcessed.mockReset(); diagnosticMocks.logMessageProcessed.mockReset();
diagnosticMocks.logSessionStateChange.mockReset(); diagnosticMocks.logSessionStateChange.mockReset();
diagnosticMocks.markDiagnosticSessionProgress.mockReset();
runtimePluginMocks.ensureRuntimePluginsLoaded.mockReset(); runtimePluginMocks.ensureRuntimePluginsLoaded.mockReset();
resetPluginTtsAndThreadMocks(); resetPluginTtsAndThreadMocks();
}); });

View File

@@ -28,6 +28,7 @@ const diagnosticMocks = vi.hoisted(() => ({
logMessageQueued: vi.fn(), logMessageQueued: vi.fn(),
logMessageProcessed: vi.fn(), logMessageProcessed: vi.fn(),
logSessionStateChange: vi.fn(), logSessionStateChange: vi.fn(),
markDiagnosticSessionProgress: vi.fn(),
})); }));
const hookMocks = vi.hoisted(() => ({ const hookMocks = vi.hoisted(() => ({
registry: { registry: {
@@ -177,6 +178,7 @@ vi.mock("../../logging/diagnostic.js", () => ({
logMessageQueued: diagnosticMocks.logMessageQueued, logMessageQueued: diagnosticMocks.logMessageQueued,
logMessageProcessed: diagnosticMocks.logMessageProcessed, logMessageProcessed: diagnosticMocks.logMessageProcessed,
logSessionStateChange: diagnosticMocks.logSessionStateChange, logSessionStateChange: diagnosticMocks.logSessionStateChange,
markDiagnosticSessionProgress: diagnosticMocks.markDiagnosticSessionProgress,
})); }));
vi.mock("../../config/sessions/thread-info.js", () => ({ vi.mock("../../config/sessions/thread-info.js", () => ({
parseSessionThreadInfo: (sessionKey: string | undefined) => parseSessionThreadInfo: (sessionKey: string | undefined) =>

View File

@@ -42,6 +42,7 @@ const diagnosticMocks = vi.hoisted(() => ({
logMessageQueued: vi.fn(), logMessageQueued: vi.fn(),
logMessageProcessed: vi.fn(), logMessageProcessed: vi.fn(),
logSessionStateChange: vi.fn(), logSessionStateChange: vi.fn(),
markDiagnosticSessionProgress: vi.fn(),
})); }));
const hookMocks = vi.hoisted(() => ({ const hookMocks = vi.hoisted(() => ({
registry: { registry: {
@@ -343,6 +344,7 @@ vi.mock("../../logging/diagnostic.js", () => ({
logMessageQueued: diagnosticMocks.logMessageQueued, logMessageQueued: diagnosticMocks.logMessageQueued,
logMessageProcessed: diagnosticMocks.logMessageProcessed, logMessageProcessed: diagnosticMocks.logMessageProcessed,
logSessionStateChange: diagnosticMocks.logSessionStateChange, logSessionStateChange: diagnosticMocks.logSessionStateChange,
markDiagnosticSessionProgress: diagnosticMocks.markDiagnosticSessionProgress,
})); }));
vi.mock("../../config/sessions/thread-info.js", () => ({ vi.mock("../../config/sessions/thread-info.js", () => ({
parseSessionThreadInfo: (sessionKey: string | undefined) => parseSessionThreadInfo: (sessionKey: string | undefined) =>
@@ -749,6 +751,7 @@ describe("dispatchReplyFromConfig", () => {
diagnosticMocks.logMessageQueued.mockClear(); diagnosticMocks.logMessageQueued.mockClear();
diagnosticMocks.logMessageProcessed.mockClear(); diagnosticMocks.logMessageProcessed.mockClear();
diagnosticMocks.logSessionStateChange.mockClear(); diagnosticMocks.logSessionStateChange.mockClear();
diagnosticMocks.markDiagnosticSessionProgress.mockClear();
hookMocks.runner.hasHooks.mockClear(); hookMocks.runner.hasHooks.mockClear();
hookMocks.runner.hasHooks.mockImplementation( hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "reply_dispatch", (hookName?: string) => hookName === "reply_dispatch",
@@ -2959,6 +2962,89 @@ describe("dispatchReplyFromConfig", () => {
); );
}); });
it("marks diagnostic progress for real reply events but not reply start callbacks", async () => {
setNoAbort();
const cfg = { diagnostics: { enabled: true } } as OpenClawConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "slack",
Surface: "slack",
SessionKey: "agent:main:main",
To: "slack:C123",
});
const onReplyStart = vi.fn(async () => {});
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
): Promise<ReplyPayload> => {
await opts?.onReplyStart?.();
await opts?.onToolResult?.({ text: "tool progress" });
return { text: "hi" };
};
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyOptions: { onReplyStart },
replyResolver,
});
expect(onReplyStart).toHaveBeenCalledTimes(1);
expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledTimes(1);
expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledWith({
sessionKey: "agent:main:main",
});
});
it("keeps diagnostic progress when source progress callbacks are suppressed", async () => {
setNoAbort();
const cfg = { diagnostics: { enabled: true } } as OpenClawConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
ChatType: "channel",
SessionKey: "agent:main:discord:channel:C1",
To: "discord:channel:C1",
});
const callbacks = {
toolStart: vi.fn(async () => {}),
itemEvent: vi.fn(async () => {}),
commandOutput: vi.fn(async () => {}),
};
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
): Promise<ReplyPayload> => {
await opts?.onToolStart?.({ name: "lookup" });
await opts?.onItemEvent?.({ progressText: "working" });
await opts?.onCommandOutput?.({ output: "line", status: "running" });
return { text: "hi" };
};
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher,
replyOptions: {
sourceReplyDeliveryMode: "message_tool_only",
onToolStart: callbacks.toolStart,
onItemEvent: callbacks.itemEvent,
onCommandOutput: callbacks.commandOutput,
},
replyResolver,
});
expect(callbacks.toolStart).not.toHaveBeenCalled();
expect(callbacks.itemEvent).not.toHaveBeenCalled();
expect(callbacks.commandOutput).not.toHaveBeenCalled();
expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledTimes(3);
expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledWith({
sessionKey: "agent:main:discord:channel:C1",
});
});
it("routes plugin-owned bindings to the owning plugin before generic inbound claim broadcast", async () => { it("routes plugin-owned bindings to the owning plugin before generic inbound claim broadcast", async () => {
setNoAbort(); setNoAbort();
hookMocks.runner.hasHooks.mockImplementation( hookMocks.runner.hasHooks.mockImplementation(

View File

@@ -45,6 +45,7 @@ import {
logMessageProcessed, logMessageProcessed,
logMessageQueued, logMessageQueued,
logSessionStateChange, logSessionStateChange,
markDiagnosticSessionProgress,
} from "../../logging/diagnostic.js"; } from "../../logging/diagnostic.js";
import { import {
buildPluginBindingDeclinedText, buildPluginBindingDeclinedText,
@@ -411,6 +412,15 @@ export async function dispatchReplyFromConfig(
const boundAcpDispatchSessionKey = resolveBoundAcpDispatchSessionKey({ ctx, cfg }); const boundAcpDispatchSessionKey = resolveBoundAcpDispatchSessionKey({ ctx, cfg });
const acpDispatchSessionKey = const acpDispatchSessionKey =
boundAcpDispatchSessionKey ?? initialSessionStoreEntry.sessionKey ?? sessionKey; boundAcpDispatchSessionKey ?? initialSessionStoreEntry.sessionKey ?? sessionKey;
const markProgress = () => {
if (!canTrackSession || !sessionKey) {
return;
}
markDiagnosticSessionProgress({ sessionKey });
if (acpDispatchSessionKey && acpDispatchSessionKey !== sessionKey) {
markDiagnosticSessionProgress({ sessionKey: acpDispatchSessionKey });
}
};
const sessionStoreEntry = boundAcpDispatchSessionKey const sessionStoreEntry = boundAcpDispatchSessionKey
? resolveSessionStoreLookup({ ...ctx, SessionKey: boundAcpDispatchSessionKey }, cfg) ? resolveSessionStoreLookup({ ...ctx, SessionKey: boundAcpDispatchSessionKey }, cfg)
: initialSessionStoreEntry; : initialSessionStoreEntry;
@@ -1190,6 +1200,19 @@ export async function dispatchReplyFromConfig(
const onPlanUpdateFromReplyOptions = params.replyOptions?.onPlanUpdate; const onPlanUpdateFromReplyOptions = params.replyOptions?.onPlanUpdate;
const onApprovalEventFromReplyOptions = params.replyOptions?.onApprovalEvent; const onApprovalEventFromReplyOptions = params.replyOptions?.onApprovalEvent;
const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary; const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary;
const wrapProgressCallback = <Args extends unknown[]>(
callback: ((...args: Args) => Promise<void> | void) | undefined,
): ((...args: Args) => Promise<void>) | undefined => {
if (!callback && (!suppressAutomaticSourceDelivery || !canTrackSession)) {
return undefined;
}
return async (...args: Args) => {
markProgress();
if (!suppressAutomaticSourceDelivery) {
await callback?.(...args);
}
};
};
const replyResolver = const replyResolver =
params.replyResolver ?? (await loadGetReplyFromConfigRuntime()).getReplyFromConfig; params.replyResolver ?? (await loadGetReplyFromConfigRuntime()).getReplyFromConfig;
@@ -1203,33 +1226,18 @@ export async function dispatchReplyFromConfig(
sourceReplyDeliveryMode, sourceReplyDeliveryMode,
typingPolicy: typing.typingPolicy, typingPolicy: typing.typingPolicy,
suppressTyping: typing.suppressTyping, suppressTyping: typing.suppressTyping,
onPartialReply: suppressAutomaticSourceDelivery onPartialReply: wrapProgressCallback(params.replyOptions?.onPartialReply),
? undefined onReasoningStream: wrapProgressCallback(params.replyOptions?.onReasoningStream),
: params.replyOptions?.onPartialReply, onReasoningEnd: wrapProgressCallback(params.replyOptions?.onReasoningEnd),
onReasoningStream: suppressAutomaticSourceDelivery onAssistantMessageStart: wrapProgressCallback(params.replyOptions?.onAssistantMessageStart),
? undefined onBlockReplyQueued: wrapProgressCallback(params.replyOptions?.onBlockReplyQueued),
: params.replyOptions?.onReasoningStream, onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart),
onReasoningEnd: suppressAutomaticSourceDelivery onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent),
? undefined onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput),
: params.replyOptions?.onReasoningEnd, onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart),
onAssistantMessageStart: suppressAutomaticSourceDelivery onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd),
? undefined
: params.replyOptions?.onAssistantMessageStart,
onBlockReplyQueued: suppressAutomaticSourceDelivery
? undefined
: params.replyOptions?.onBlockReplyQueued,
onToolStart: suppressAutomaticSourceDelivery ? undefined : params.replyOptions?.onToolStart,
onItemEvent: suppressAutomaticSourceDelivery ? undefined : params.replyOptions?.onItemEvent,
onCommandOutput: suppressAutomaticSourceDelivery
? undefined
: params.replyOptions?.onCommandOutput,
onCompactionStart: suppressAutomaticSourceDelivery
? undefined
: params.replyOptions?.onCompactionStart,
onCompactionEnd: suppressAutomaticSourceDelivery
? undefined
: params.replyOptions?.onCompactionEnd,
onToolResult: (payload: ReplyPayload) => { onToolResult: (payload: ReplyPayload) => {
markProgress();
const run = async () => { const run = async () => {
markInboundDedupeReplayUnsafe(); markInboundDedupeReplayUnsafe();
if (!suppressAutomaticSourceDelivery) { if (!suppressAutomaticSourceDelivery) {
@@ -1277,6 +1285,7 @@ export async function dispatchReplyFromConfig(
return run(); return run();
}, },
onPlanUpdate: async (payload) => { onPlanUpdate: async (payload) => {
markProgress();
markInboundDedupeReplayUnsafe(); markInboundDedupeReplayUnsafe();
if (!suppressAutomaticSourceDelivery) { if (!suppressAutomaticSourceDelivery) {
await onPlanUpdateFromReplyOptions?.(payload); await onPlanUpdateFromReplyOptions?.(payload);
@@ -1287,6 +1296,7 @@ export async function dispatchReplyFromConfig(
await sendPlanUpdate({ explanation: payload.explanation, steps: payload.steps }); await sendPlanUpdate({ explanation: payload.explanation, steps: payload.steps });
}, },
onApprovalEvent: async (payload) => { onApprovalEvent: async (payload) => {
markProgress();
markInboundDedupeReplayUnsafe(); markInboundDedupeReplayUnsafe();
if (!suppressAutomaticSourceDelivery) { if (!suppressAutomaticSourceDelivery) {
await onApprovalEventFromReplyOptions?.(payload); await onApprovalEventFromReplyOptions?.(payload);
@@ -1305,6 +1315,7 @@ export async function dispatchReplyFromConfig(
await maybeSendWorkingStatus(label); await maybeSendWorkingStatus(label);
}, },
onPatchSummary: async (payload) => { onPatchSummary: async (payload) => {
markProgress();
markInboundDedupeReplayUnsafe(); markInboundDedupeReplayUnsafe();
if (!suppressAutomaticSourceDelivery) { if (!suppressAutomaticSourceDelivery) {
await onPatchSummaryFromReplyOptions?.(payload); await onPatchSummaryFromReplyOptions?.(payload);
@@ -1319,6 +1330,7 @@ export async function dispatchReplyFromConfig(
await maybeSendWorkingStatus(label); await maybeSendWorkingStatus(label);
}, },
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => { onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => {
markProgress();
const run = async () => { const run = async () => {
if ( if (
payload.isReasoning !== true && payload.isReasoning !== true &&

View File

@@ -149,7 +149,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
maximum: 9007199254740991, maximum: 9007199254740991,
title: "Session Liveness Threshold (ms)", title: "Session Liveness Threshold (ms)",
description: description:
"Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.", "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.",
}, },
otel: { otel: {
type: "object", type: "object",
@@ -24485,7 +24485,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
}, },
"diagnostics.stuckSessionWarnMs": { "diagnostics.stuckSessionWarnMs": {
label: "Session Liveness Threshold (ms)", label: "Session Liveness Threshold (ms)",
help: "Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.", help: "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.",
tags: ["observability", "storage"], tags: ["observability", "storage"],
}, },
"diagnostics.otel.enabled": { "diagnostics.otel.enabled": {

View File

@@ -585,7 +585,7 @@ export const FIELD_HELP: Record<string, string> = {
"diagnostics.enabled": "diagnostics.enabled":
"Master toggle for diagnostics instrumentation output in logs and telemetry wiring paths. Defaults to enabled; set false only in tightly constrained environments.", "Master toggle for diagnostics instrumentation output in logs and telemetry wiring paths. Defaults to enabled; set false only in tightly constrained environments.",
"diagnostics.stuckSessionWarnMs": "diagnostics.stuckSessionWarnMs":
"Age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Increase for long multi-tool turns; decrease for faster stale-session detection.", "No-progress age threshold in milliseconds for classifying long processing sessions as long-running, stalled, or stuck. Reply, tool, status, block, and ACP progress reset the timer; repeated stuck diagnostics back off while unchanged.",
"diagnostics.otel.enabled": "diagnostics.otel.enabled":
"Enables OpenTelemetry export pipeline for traces, metrics, and logs based on configured endpoint/protocol settings. Keep disabled unless your collector endpoint and auth are fully configured.", "Enables OpenTelemetry export pipeline for traces, metrics, and logs based on configured endpoint/protocol settings. Keep disabled unless your collector endpoint and auth are fully configured.",
"diagnostics.otel.endpoint": "diagnostics.otel.endpoint":

View File

@@ -276,7 +276,7 @@ export type DiagnosticsConfig = {
enabled?: boolean; enabled?: boolean;
/** Optional ad-hoc diagnostics flags (e.g. "telegram.http"). */ /** Optional ad-hoc diagnostics flags (e.g. "telegram.http"). */
flags?: string[]; flags?: string[];
/** Threshold in ms before a processing session logs "stuck session" diagnostics. */ /** Threshold in ms before a processing session with no observed progress logs diagnostics. */
stuckSessionWarnMs?: number; stuckSessionWarnMs?: number;
otel?: DiagnosticsOtelConfig; otel?: DiagnosticsOtelConfig;
cacheTrace?: DiagnosticsCacheTraceConfig; cacheTrace?: DiagnosticsCacheTraceConfig;

View File

@@ -4,6 +4,7 @@ export type SessionState = {
sessionId?: string; sessionId?: string;
sessionKey?: string; sessionKey?: string;
lastActivity: number; lastActivity: number;
lastStuckWarnAgeMs?: number;
state: SessionStateValue; state: SessionStateValue;
queueDepth: number; queueDepth: number;
toolCallHistory?: ToolCallRecord[]; toolCallHistory?: ToolCallRecord[];

View File

@@ -25,6 +25,7 @@ import {
import { import {
logSessionStateChange, logSessionStateChange,
logMessageQueued, logMessageQueued,
markDiagnosticSessionProgress,
resetDiagnosticStateForTest, resetDiagnosticStateForTest,
resolveStuckSessionWarnMs, resolveStuckSessionWarnMs,
startDiagnosticHeartbeat, startDiagnosticHeartbeat,
@@ -202,6 +203,63 @@ describe("stuck session diagnostics threshold", () => {
}); });
}); });
it("does not warn while a processing session continues reporting progress", () => {
const events: DiagnosticEventPayload[] = [];
const unsubscribe = onDiagnosticEvent((event) => {
events.push(event);
});
try {
startDiagnosticHeartbeat({
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
},
});
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
vi.advanceTimersByTime(45_000);
markDiagnosticSessionProgress({ sessionId: "s1", sessionKey: "main" });
vi.advanceTimersByTime(16_000);
} finally {
unsubscribe();
}
expect(events.filter((event) => event.type === "session.stuck")).toHaveLength(0);
expect(events.filter((event) => event.type === "session.stalled")).toHaveLength(0);
expect(events.filter((event) => event.type === "session.long_running")).toHaveLength(0);
});
it("backs off repeated stuck warnings while a session remains unchanged", () => {
const events: Array<{ ageMs?: number }> = [];
const recoverStuckSession = vi.fn();
const unsubscribe = onDiagnosticEvent((event) => {
if (event.type === "session.stuck") {
events.push({ ageMs: event.ageMs });
}
});
try {
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
},
},
{ recoverStuckSession },
);
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
vi.advanceTimersByTime(91_000);
expect(events).toHaveLength(1);
expect(recoverStuckSession).toHaveBeenCalledTimes(1);
vi.advanceTimersByTime(30_000);
} finally {
unsubscribe();
}
expect(events.map((event) => event.ageMs)).toEqual([60_000, 120_000]);
expect(recoverStuckSession).toHaveBeenCalledTimes(2);
});
it("reports active sessions as stalled instead of stuck when active work stops progressing", () => { it("reports active sessions as stalled instead of stuck when active work stops progressing", () => {
const events: DiagnosticEventPayload[] = []; const events: DiagnosticEventPayload[] = [];
const recoverStuckSession = vi.fn(); const recoverStuckSession = vi.fn();

View File

@@ -416,6 +416,7 @@ export function logMessageQueued(params: {
const state = getDiagnosticSessionState(params); const state = getDiagnosticSessionState(params);
state.queueDepth += 1; state.queueDepth += 1;
state.lastActivity = Date.now(); state.lastActivity = Date.now();
state.lastStuckWarnAgeMs = undefined;
if (diag.isEnabled("debug")) { if (diag.isEnabled("debug")) {
diag.debug( diag.debug(
`message queued: sessionId=${state.sessionId ?? "unknown"} sessionKey=${ `message queued: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
@@ -494,6 +495,7 @@ export function logSessionStateChange(
const prevState = state.state; const prevState = state.state;
state.state = params.state; state.state = params.state;
state.lastActivity = Date.now(); state.lastActivity = Date.now();
state.lastStuckWarnAgeMs = undefined;
if (params.state === "idle") { if (params.state === "idle") {
state.queueDepth = Math.max(0, state.queueDepth - 1); state.queueDepth = Math.max(0, state.queueDepth - 1);
} }
@@ -518,6 +520,16 @@ export function logSessionStateChange(
markActivity(); markActivity();
} }
export function markDiagnosticSessionProgress(params: SessionRef) {
if (!areDiagnosticsEnabledForProcess()) {
return;
}
const state = getDiagnosticSessionState(params);
state.lastActivity = Date.now();
state.lastStuckWarnAgeMs = undefined;
markActivity();
}
function sessionAttentionFields(params: { function sessionAttentionFields(params: {
classification: SessionAttentionClassification; classification: SessionAttentionClassification;
activity: DiagnosticSessionActivitySnapshot; activity: DiagnosticSessionActivitySnapshot;
@@ -562,6 +574,16 @@ export function logSessionAttention(
activity, activity,
staleMs: params.thresholdMs, staleMs: params.thresholdMs,
}); });
if (classification.eventType === "session.stuck") {
const nextWarnAgeMs =
state.lastStuckWarnAgeMs === undefined
? params.thresholdMs
: Math.max(state.lastStuckWarnAgeMs + params.thresholdMs, state.lastStuckWarnAgeMs * 2);
if (params.ageMs < nextWarnAgeMs) {
return undefined;
}
state.lastStuckWarnAgeMs = params.ageMs;
}
const label = const label =
classification.eventType === "session.stuck" classification.eventType === "session.stuck"
? "stuck session" ? "stuck session"