mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-13 15:47:28 +00:00
fix(agents): reclaim reported stale session locks
This commit is contained in:
@@ -9,6 +9,7 @@ import type {
|
||||
PluginHookBeforePromptBuildResult,
|
||||
} from "../../plugins/types.js";
|
||||
import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js";
|
||||
import { clearAgentHarnesses, registerAgentHarness } from "../harness/registry.js";
|
||||
import type { FailoverReason } from "../pi-embedded-helpers/types.js";
|
||||
import type { buildEmbeddedRunPayloads } from "./run/payloads.js";
|
||||
import type { EmbeddedRunAttemptResult } from "./run/types.js";
|
||||
@@ -234,6 +235,17 @@ export const overflowBaseRunParams = {
|
||||
} as const;
|
||||
|
||||
export function resetRunOverflowCompactionHarnessMocks(): void {
|
||||
clearAgentHarnesses();
|
||||
registerAgentHarness({
|
||||
id: "codex",
|
||||
label: "Codex",
|
||||
supports: (ctx) =>
|
||||
ctx.provider === "codex" || ctx.provider === "openai-codex"
|
||||
? { supported: true, priority: 100 }
|
||||
: { supported: false },
|
||||
runAttempt: async (params) => await mockedRunEmbeddedAttempt(params),
|
||||
});
|
||||
|
||||
mockedGlobalHookRunner.hasHooks.mockReset();
|
||||
mockedGlobalHookRunner.hasHooks.mockReturnValue(false);
|
||||
mockedGlobalHookRunner.runBeforeAgentReply.mockReset();
|
||||
|
||||
@@ -62,6 +62,10 @@ type LockInspectionDetails = Pick<
|
||||
|
||||
const SESSION_LOCKS = createFileLockManager("openclaw.session-write-lock");
|
||||
|
||||
function isFileLockError(error: unknown, code: string): boolean {
|
||||
return (error as { code?: unknown } | null)?.code === code;
|
||||
}
|
||||
|
||||
export type SessionWriteLockAcquireTimeoutConfig = {
|
||||
session?: {
|
||||
writeLock?: {
|
||||
@@ -368,6 +372,33 @@ async function shouldReclaimContendedLockFile(
|
||||
}
|
||||
}
|
||||
|
||||
function sessionLockHeldByThisProcess(normalizedSessionFile: string): boolean {
|
||||
return SESSION_LOCKS.heldEntries().some(
|
||||
(entry) => entry.normalizedTargetPath === normalizedSessionFile,
|
||||
);
|
||||
}
|
||||
|
||||
async function removeReportedStaleLockIfStillStale(params: {
|
||||
lockPath: string;
|
||||
normalizedSessionFile: string;
|
||||
staleMs: number;
|
||||
}): Promise<boolean> {
|
||||
const nowMs = Date.now();
|
||||
const payload = await readLockPayload(params.lockPath);
|
||||
const inspected = inspectLockPayloadForSession({
|
||||
payload,
|
||||
staleMs: params.staleMs,
|
||||
nowMs,
|
||||
heldByThisProcess: sessionLockHeldByThisProcess(params.normalizedSessionFile),
|
||||
reclaimLockWithoutStarttime: true,
|
||||
});
|
||||
if (!(await shouldReclaimContendedLockFile(params.lockPath, inspected, params.staleMs, nowMs))) {
|
||||
return false;
|
||||
}
|
||||
await fs.rm(params.lockPath, { force: true });
|
||||
return true;
|
||||
}
|
||||
|
||||
function shouldTreatAsOrphanSelfLock(params: {
|
||||
payload: LockFilePayload | null;
|
||||
heldByThisProcess: boolean;
|
||||
@@ -502,42 +533,56 @@ export async function acquireSessionWriteLock(params: {
|
||||
const normalizedSessionFile = await resolveNormalizedSessionFile(sessionFile);
|
||||
const lockPath = `${normalizedSessionFile}.lock`;
|
||||
await fs.mkdir(sessionDir, { recursive: true });
|
||||
try {
|
||||
const lock = await SESSION_LOCKS.acquire(sessionFile, {
|
||||
staleMs,
|
||||
timeoutMs,
|
||||
retry: { minTimeout: 50, maxTimeout: 1000, factor: 1 },
|
||||
allowReentrant,
|
||||
metadata: { maxHoldMs },
|
||||
payload: () => {
|
||||
const createdAt = new Date().toISOString();
|
||||
const starttime = getProcessStartTime(process.pid);
|
||||
const lockPayload: LockFilePayload = { pid: process.pid, createdAt };
|
||||
if (starttime !== null) {
|
||||
lockPayload.starttime = starttime;
|
||||
while (true) {
|
||||
try {
|
||||
const lock = await SESSION_LOCKS.acquire(sessionFile, {
|
||||
staleMs,
|
||||
timeoutMs,
|
||||
retry: { minTimeout: 50, maxTimeout: 1000, factor: 1 },
|
||||
allowReentrant,
|
||||
metadata: { maxHoldMs },
|
||||
payload: () => {
|
||||
const createdAt = new Date().toISOString();
|
||||
const starttime = getProcessStartTime(process.pid);
|
||||
const lockPayload: LockFilePayload = { pid: process.pid, createdAt };
|
||||
if (starttime !== null) {
|
||||
lockPayload.starttime = starttime;
|
||||
}
|
||||
return lockPayload as Record<string, unknown>;
|
||||
},
|
||||
shouldReclaim: async ({ payload, nowMs, heldByThisProcess }) => {
|
||||
const inspected = inspectLockPayloadForSession({
|
||||
payload: payload as LockFilePayload | null,
|
||||
staleMs,
|
||||
nowMs,
|
||||
heldByThisProcess,
|
||||
reclaimLockWithoutStarttime: true,
|
||||
});
|
||||
return await shouldReclaimContendedLockFile(lockPath, inspected, staleMs, nowMs);
|
||||
},
|
||||
});
|
||||
return { release: lock.release };
|
||||
} catch (err) {
|
||||
if (isFileLockError(err, "file_lock_stale")) {
|
||||
const staleLockPath = (err as { lockPath?: string }).lockPath ?? lockPath;
|
||||
if (
|
||||
await removeReportedStaleLockIfStillStale({
|
||||
lockPath: staleLockPath,
|
||||
normalizedSessionFile,
|
||||
staleMs,
|
||||
})
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
return lockPayload as Record<string, unknown>;
|
||||
},
|
||||
shouldReclaim: async ({ payload, nowMs, heldByThisProcess }) => {
|
||||
const inspected = inspectLockPayloadForSession({
|
||||
payload: payload as LockFilePayload | null,
|
||||
staleMs,
|
||||
nowMs,
|
||||
heldByThisProcess,
|
||||
reclaimLockWithoutStarttime: true,
|
||||
});
|
||||
return await shouldReclaimContendedLockFile(lockPath, inspected, staleMs, nowMs);
|
||||
},
|
||||
});
|
||||
return { release: lock.release };
|
||||
} catch (err) {
|
||||
if ((err as { code?: unknown }).code !== "file_lock_timeout") {
|
||||
throw err;
|
||||
}
|
||||
if (!isFileLockError(err, "file_lock_timeout")) {
|
||||
throw err;
|
||||
}
|
||||
const timeoutLockPath = (err as { lockPath?: string }).lockPath ?? lockPath;
|
||||
const payload = await readLockPayload(timeoutLockPath);
|
||||
const owner = typeof payload?.pid === "number" ? `pid=${payload.pid}` : "unknown";
|
||||
throw new SessionWriteLockTimeoutError({ timeoutMs, owner, lockPath: timeoutLockPath });
|
||||
}
|
||||
const timeoutLockPath = (err as { lockPath?: string }).lockPath ?? lockPath;
|
||||
const payload = await readLockPayload(timeoutLockPath);
|
||||
const owner = typeof payload?.pid === "number" ? `pid=${payload.pid}` : "unknown";
|
||||
throw new SessionWriteLockTimeoutError({ timeoutMs, owner, lockPath: timeoutLockPath });
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user