ACP: recover hung bound turns (#51816)

* ACP: add hung-turn starvation repro

* ACP: recover hung bound turns

* ACP: preserve timed-out session handles

---------

Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com>
This commit is contained in:
Bob
2026-03-21 20:54:30 +01:00
committed by GitHub
parent 5c05347d11
commit 8cac327c19
2 changed files with 454 additions and 21 deletions

View File

@@ -1,3 +1,4 @@
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
import type { OpenClawConfig } from "../../config/config.js";
import { logVerbose } from "../../globals.js";
import { normalizeAgentId } from "../../routing/session-key.js";
@@ -20,6 +21,7 @@ import type {
AcpRuntime,
AcpRuntimeCapabilities,
AcpRuntimeHandle,
AcpRuntimeSessionMode,
AcpRuntimeStatus,
} from "../runtime/types.js";
import { reconcileManagerRuntimeSessionIdentifiers } from "./manager.identity-reconcile.js";
@@ -70,6 +72,10 @@ import {
} from "./runtime-options.js";
import { SessionActorQueue } from "./session-actor-queue.js";
const ACP_TURN_TIMEOUT_GRACE_MS = 1_000;
const ACP_TURN_TIMEOUT_CLEANUP_GRACE_MS = 2_000;
const ACP_TURN_TIMEOUT_REASON = "turn-timeout";
export class AcpSessionManager {
private readonly actorQueue = new SessionActorQueue();
private readonly actorTailBySession = this.actorQueue.getTailMapForTesting();
@@ -621,6 +627,7 @@ export class AcpSessionManager {
let activeTurnStarted = false;
let sawTurnOutput = false;
let retryFreshHandle = false;
let skipPostTurnCleanup = false;
try {
const ensured = await this.ensureRuntimeHandle({
cfg: input.cfg,
@@ -667,26 +674,58 @@ export class AcpSessionManager {
input.signal && typeof AbortSignal.any === "function"
? AbortSignal.any([input.signal, internalAbortController.signal])
: internalAbortController.signal;
for await (const event of runtime.runTurn({
handle,
text: input.text,
attachments: input.attachments,
mode: input.mode,
requestId: input.requestId,
signal: combinedSignal,
})) {
if (event.type === "error") {
streamError = new AcpRuntimeError(
normalizeAcpErrorCode(event.code),
event.message?.trim() || "ACP turn failed before completion.",
);
} else if (event.type === "text_delta" || event.type === "tool_call") {
sawTurnOutput = true;
const eventGate = { open: true };
const turnPromise = (async () => {
for await (const event of runtime.runTurn({
handle,
text: input.text,
attachments: input.attachments,
mode: input.mode,
requestId: input.requestId,
signal: combinedSignal,
})) {
if (!eventGate.open) {
continue;
}
if (event.type === "error") {
streamError = new AcpRuntimeError(
normalizeAcpErrorCode(event.code),
event.message?.trim() || "ACP turn failed before completion.",
);
} else if (event.type === "text_delta" || event.type === "tool_call") {
sawTurnOutput = true;
}
if (input.onEvent) {
await input.onEvent(event);
}
}
if (input.onEvent) {
await input.onEvent(event);
if (eventGate.open && streamError) {
throw streamError;
}
}
})();
const turnTimeoutMs = this.resolveTurnTimeoutMs({
cfg: input.cfg,
meta,
});
const sessionMode = meta.mode;
await this.awaitTurnWithTimeout({
sessionKey,
turnPromise,
timeoutMs: turnTimeoutMs + ACP_TURN_TIMEOUT_GRACE_MS,
timeoutLabelMs: turnTimeoutMs,
onTimeout: async () => {
eventGate.open = false;
skipPostTurnCleanup = true;
if (!activeTurn) {
return;
}
await this.cleanupTimedOutTurn({
sessionKey,
activeTurn,
mode: sessionMode,
});
},
});
if (streamError) {
throw streamError;
}
@@ -735,7 +774,14 @@ export class AcpSessionManager {
if (activeTurn && this.activeTurnBySession.get(actorKey) === activeTurn) {
this.activeTurnBySession.delete(actorKey);
}
if (!retryFreshHandle && runtime && handle && meta && meta.mode !== "oneshot") {
if (
!retryFreshHandle &&
!skipPostTurnCleanup &&
runtime &&
handle &&
meta &&
meta.mode !== "oneshot"
) {
({ handle } = await this.reconcileRuntimeSessionIdentifiers({
cfg: input.cfg,
sessionKey,
@@ -745,7 +791,14 @@ export class AcpSessionManager {
failOnStatusError: false,
}));
}
if (!retryFreshHandle && runtime && handle && meta && meta.mode === "oneshot") {
if (
!retryFreshHandle &&
!skipPostTurnCleanup &&
runtime &&
handle &&
meta &&
meta.mode === "oneshot"
) {
try {
await runtime.close({
handle,
@@ -767,6 +820,191 @@ export class AcpSessionManager {
});
}
private resolveTurnTimeoutMs(params: { cfg: OpenClawConfig; meta: SessionAcpMeta }): number {
const runtimeTimeoutSeconds = resolveRuntimeOptionsFromMeta(params.meta).timeoutSeconds;
if (
typeof runtimeTimeoutSeconds === "number" &&
Number.isFinite(runtimeTimeoutSeconds) &&
runtimeTimeoutSeconds > 0
) {
return Math.max(1_000, Math.round(runtimeTimeoutSeconds * 1_000));
}
return resolveAgentTimeoutMs({
cfg: params.cfg,
minMs: 1_000,
});
}
private async awaitTurnWithTimeout<T>(params: {
sessionKey: string;
turnPromise: Promise<T>;
timeoutMs: number;
timeoutLabelMs: number;
onTimeout: () => Promise<void>;
}): Promise<T> {
const observedTurnPromise: Promise<
| {
kind: "value";
value: T;
}
| {
kind: "error";
error: unknown;
}
> = params.turnPromise.then(
(value) => ({
kind: "value" as const,
value,
}),
(error) => ({
kind: "error" as const,
error,
}),
);
if (params.timeoutMs <= 0) {
const outcome = await observedTurnPromise;
if (outcome.kind === "error") {
throw outcome.error;
}
return outcome.value;
}
const timeoutToken = Symbol("acp-turn-timeout");
let timer: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<typeof timeoutToken>((resolve) => {
timer = setTimeout(() => resolve(timeoutToken), params.timeoutMs);
timer.unref?.();
});
try {
const outcome = await Promise.race([observedTurnPromise, timeoutPromise]);
if (outcome === timeoutToken) {
void observedTurnPromise.then((lateOutcome) => {
if (lateOutcome.kind === "error") {
logVerbose(
`acp-manager: detached late turn error after timeout for ${params.sessionKey}: ${String(lateOutcome.error)}`,
);
}
});
await params.onTimeout();
throw new AcpRuntimeError(
"ACP_TURN_FAILED",
`ACP turn timed out after ${Math.max(1, Math.round(params.timeoutLabelMs / 1_000))}s.`,
);
}
if (outcome.kind === "error") {
throw outcome.error;
}
return outcome.value;
} finally {
if (timer) {
clearTimeout(timer);
}
}
}
private async cleanupTimedOutTurn(params: {
sessionKey: string;
activeTurn: ActiveTurnState;
mode: AcpRuntimeSessionMode;
}): Promise<void> {
params.activeTurn.abortController.abort();
if (!params.activeTurn.cancelPromise) {
params.activeTurn.cancelPromise = params.activeTurn.runtime.cancel({
handle: params.activeTurn.handle,
reason: ACP_TURN_TIMEOUT_REASON,
});
}
const cancelFinished = await this.awaitCleanupWithGrace({
sessionKey: params.sessionKey,
label: "cancel",
promise: params.activeTurn.cancelPromise,
});
if (params.mode !== "oneshot") {
return;
}
const closePromise = params.activeTurn.runtime.close({
handle: params.activeTurn.handle,
reason: ACP_TURN_TIMEOUT_REASON,
});
const closeFinished = await this.awaitCleanupWithGrace({
sessionKey: params.sessionKey,
label: "close",
promise: closePromise,
});
if (cancelFinished && closeFinished) {
this.clearCachedRuntimeStateIfHandleMatches({
sessionKey: params.sessionKey,
handle: params.activeTurn.handle,
});
return;
}
void Promise.allSettled([params.activeTurn.cancelPromise, closePromise]).then(() => {
this.clearCachedRuntimeStateIfHandleMatches({
sessionKey: params.sessionKey,
handle: params.activeTurn.handle,
});
});
}
private async awaitCleanupWithGrace(params: {
sessionKey: string;
label: "cancel" | "close";
promise: Promise<unknown>;
}): Promise<boolean> {
const observedCleanupPromise: Promise<
| {
kind: "done";
}
| {
kind: "error";
error: unknown;
}
> = params.promise.then(
() => ({
kind: "done" as const,
}),
(error) => ({
kind: "error" as const,
error,
}),
);
const timeoutToken = Symbol(`acp-timeout-${params.label}`);
let timer: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<typeof timeoutToken>((resolve) => {
timer = setTimeout(() => resolve(timeoutToken), ACP_TURN_TIMEOUT_CLEANUP_GRACE_MS);
timer.unref?.();
});
try {
const outcome = await Promise.race([observedCleanupPromise, timeoutPromise]);
if (outcome === timeoutToken) {
void observedCleanupPromise.then((lateOutcome) => {
if (lateOutcome.kind === "error") {
logVerbose(
`acp-manager: detached timed-out turn ${params.label} cleanup failed for ${params.sessionKey}: ${String(lateOutcome.error)}`,
);
}
});
logVerbose(
`acp-manager: timed-out turn ${params.label} cleanup exceeded ${ACP_TURN_TIMEOUT_CLEANUP_GRACE_MS}ms for ${params.sessionKey}`,
);
return false;
}
if (outcome.kind === "error") {
logVerbose(
`acp-manager: timed-out turn ${params.label} cleanup failed for ${params.sessionKey}: ${String(outcome.error)}`,
);
}
return true;
} finally {
if (timer) {
clearTimeout(timer);
}
}
}
async cancelSession(params: {
cfg: OpenClawConfig;
sessionKey: string;
@@ -1418,4 +1656,27 @@ export class AcpSessionManager {
private clearCachedRuntimeState(sessionKey: string): void {
this.runtimeCache.clear(normalizeActorKey(sessionKey));
}
private clearCachedRuntimeStateIfHandleMatches(params: {
sessionKey: string;
handle: AcpRuntimeHandle;
}): void {
const cached = this.getCachedRuntimeState(params.sessionKey);
if (!cached || !this.runtimeHandlesMatch(cached.handle, params.handle)) {
return;
}
this.clearCachedRuntimeState(params.sessionKey);
}
private runtimeHandlesMatch(a: AcpRuntimeHandle, b: AcpRuntimeHandle): boolean {
return (
a.sessionKey === b.sessionKey &&
a.backend === b.backend &&
a.runtimeSessionName === b.runtimeSessionName &&
(a.cwd ?? "") === (b.cwd ?? "") &&
(a.acpxRecordId ?? "") === (b.acpxRecordId ?? "") &&
(a.backendSessionId ?? "") === (b.backendSessionId ?? "") &&
(a.agentSessionId ?? "") === (b.agentSessionId ?? "")
);
}
}

View File

@@ -98,7 +98,7 @@ function createRuntime(): {
};
}
function readySessionMeta() {
function readySessionMeta(overrides: Partial<SessionAcpMeta> = {}): SessionAcpMeta {
return {
backend: "acpx",
agent: "codex",
@@ -106,6 +106,7 @@ function readySessionMeta() {
mode: "persistent" as const,
state: "idle" as const,
lastActivityAt: Date.now(),
...overrides,
};
}
@@ -270,6 +271,177 @@ describe("AcpSessionManager", () => {
expect(runtimeState.runTurn).toHaveBeenCalledTimes(2);
});
it("times out a hung persistent turn without closing the session and lets queued work continue", async () => {
vi.useFakeTimers();
try {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
hoisted.readAcpSessionEntryMock.mockReturnValue({
sessionKey: "agent:codex:acp:session-1",
storeSessionKey: "agent:codex:acp:session-1",
acp: readySessionMeta(),
});
let firstTurnStarted = false;
runtimeState.runTurn.mockImplementation(async function* (input: { requestId: string }) {
if (input.requestId === "r1") {
firstTurnStarted = true;
await new Promise(() => {});
}
yield { type: "done" as const };
});
const manager = new AcpSessionManager();
const cfg = {
...baseCfg,
agents: {
defaults: {
timeoutSeconds: 1,
},
},
} as OpenClawConfig;
const first = manager.runTurn({
cfg,
sessionKey: "agent:codex:acp:session-1",
text: "first",
mode: "prompt",
requestId: "r1",
});
void first.catch(() => undefined);
await vi.waitFor(() => {
expect(firstTurnStarted).toBe(true);
});
const second = manager.runTurn({
cfg,
sessionKey: "agent:codex:acp:session-1",
text: "second",
mode: "prompt",
requestId: "r2",
});
await vi.advanceTimersByTimeAsync(3_500);
await expect(first).rejects.toMatchObject({
code: "ACP_TURN_FAILED",
message: "ACP turn timed out after 1s.",
});
await expect(second).resolves.toBeUndefined();
expect(runtimeState.ensureSession).toHaveBeenCalledTimes(1);
expect(runtimeState.runTurn).toHaveBeenCalledTimes(2);
expect(runtimeState.cancel).toHaveBeenCalledWith(
expect.objectContaining({
reason: "turn-timeout",
}),
);
expect(runtimeState.close).not.toHaveBeenCalled();
expect(manager.getObservabilitySnapshot(cfg)).toMatchObject({
runtimeCache: {
activeSessions: 1,
},
turns: {
active: 0,
queueDepth: 0,
completed: 1,
failed: 1,
},
});
const states = extractStatesFromUpserts();
expect(states).toContain("error");
expect(states.at(-1)).toBe("idle");
} finally {
vi.useRealTimers();
}
});
it("keeps timed-out runtime handles counted until timeout cleanup finishes", async () => {
vi.useFakeTimers();
try {
const runtimeState = createRuntime();
runtimeState.cancel.mockImplementation(() => new Promise(() => {}));
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => {
const sessionKey = (paramsUnknown as { sessionKey?: string }).sessionKey ?? "";
return {
sessionKey,
storeSessionKey: sessionKey,
acp: {
...readySessionMeta(),
runtimeSessionName: `runtime:${sessionKey}`,
},
};
});
let firstTurnStarted = false;
runtimeState.runTurn.mockImplementation(async function* (input: { requestId: string }) {
if (input.requestId === "r1") {
firstTurnStarted = true;
await new Promise(() => {});
}
yield { type: "done" as const };
});
const manager = new AcpSessionManager();
const cfg = {
...baseCfg,
acp: {
...baseCfg.acp,
maxConcurrentSessions: 1,
},
agents: {
defaults: {
timeoutSeconds: 1,
},
},
} as OpenClawConfig;
const first = manager.runTurn({
cfg,
sessionKey: "agent:codex:acp:session-a",
text: "first",
mode: "prompt",
requestId: "r1",
});
void first.catch(() => undefined);
await vi.waitFor(() => {
expect(firstTurnStarted).toBe(true);
});
await vi.advanceTimersByTimeAsync(4_500);
await expect(first).rejects.toMatchObject({
code: "ACP_TURN_FAILED",
message: "ACP turn timed out after 1s.",
});
expect(manager.getObservabilitySnapshot(cfg).runtimeCache.activeSessions).toBe(1);
await expect(
manager.runTurn({
cfg,
sessionKey: "agent:codex:acp:session-b",
text: "second",
mode: "prompt",
requestId: "r2",
}),
).rejects.toMatchObject({
code: "ACP_SESSION_INIT_FAILED",
message: expect.stringContaining("max concurrent sessions"),
});
expect(runtimeState.ensureSession).toHaveBeenCalledTimes(1);
} finally {
vi.useRealTimers();
}
});
it("runs turns for different ACP sessions in parallel", async () => {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({