fix(whatsapp): recover group inbound after reconnect churn

Repair WhatsApp group inbound recovery after repeated reconnect churn while keeping the fallback scoped to reconnect metadata.

Canonical issue: #66920. Related evidence: #7433, #63855, #70856.

Thanks to legonhilltech-jpg, octopuslabs-fl, Kanorin-chan, and stuswan for the reports and reproduction details.
Vincent Koc
2026-04-29 03:54:18 -07:00
committed by GitHub
parent 4f73baf7d7
commit 21a92ea0f6
9 changed files with 269 additions and 29 deletions


@@ -132,6 +132,7 @@ Docs: https://docs.openclaw.ai
- Local model prompt caching: keep stable Project Context above volatile channel/session prompt guidance and stop embedding current channel names in the message tool description, so Ollama, MLX, llama.cpp, and other prefix-cache backends avoid avoidable full prompt reprocessing across channel turns. Fixes #40256; supersedes #40296. Thanks @rhclaw and @sriram369.
- Gateway/OpenAI-compatible API: guard provider policy lookup against runtime providers with non-array `models` values, so `/v1/chat/completions` no longer fails with `provider?.models?.some is not a function`. Fixes #66744; carries forward #66761. Thanks @MightyMoud, @MukundaKatta.
- WhatsApp/Web: pass explicit Baileys socket timings into every WhatsApp Web socket and expose `web.whatsapp.*` keepalive, connect, and query timeout settings so unstable networks can avoid repeated 408 disconnect and opening-handshake timeout loops. Fixes #56365. (#73580) Thanks @velvet-shark.
- WhatsApp/Web: recover recently active listeners when a post-408 reconnect keeps receiving transport frames but stops delivering app messages, while keeping group metadata fallback off Baileys sends. Fixes #63855 and #66920; refs #7433, #67986, #70856, #60007, and #72621. Thanks @legonhilltech-jpg, @octopuslabs-fl, @Kanorin-chan, and @stuswan.
- Channels/Telegram: persist native command metadata on target sessions so topic, helper, and ACP-bound slash commands keep their session metadata attached to the routed conversation. (#57548) Thanks @GaosCode.
- Channels/native commands: keep validated native slash command replies visible in group chats while preserving explicit owner allowlists for command authorization. (#73672) Thanks @obviyus.
- Pairing/doctor: bootstrap `commands.ownerAllowFrom` from the first approved DM pairing when no command owner exists, and have doctor explain missing owners so privileged slash commands are not accidentally unusable after onboarding. Thanks @pashpashpash.


@@ -151,7 +151,7 @@ OpenClaw recommends running WhatsApp on a separate number when possible. (The ch
## Runtime model
- Gateway owns the WhatsApp socket and reconnect loop.
- The reconnect watchdog uses WhatsApp Web transport activity, not only inbound app-message volume, so a quiet linked-device session is not restarted solely because nobody has sent a message recently. A longer application-silence cap still forces a reconnect if transport frames keep arriving but no application messages are handled for the watchdog window.
- The reconnect watchdog uses WhatsApp Web transport activity, not only inbound app-message volume, so a quiet linked-device session is not restarted solely because nobody has sent a message recently. A longer application-silence cap still forces a reconnect if transport frames keep arriving but no application messages are handled for the watchdog window; after a transient reconnect for a recently active session, that application-silence check uses the normal message timeout for the first recovery window.
- Baileys socket timings are explicit under `web.whatsapp.*`: `keepAliveIntervalMs` controls WhatsApp Web application pings, `connectTimeoutMs` controls the opening handshake timeout, and `defaultQueryTimeoutMs` controls Baileys query timeouts.
- Outbound sends require an active WhatsApp listener for the target account.
- Status and broadcast chats are ignored (`@status`, `@broadcast`).


@@ -513,6 +513,51 @@ describe("web auto-reply connection", () => {
}
});
it("recovers a post-408 listener when transport frames continue but app delivery stays silent", async () => {
vi.useFakeTimers();
try {
const { scripted, controller, run } = await startWatchdogScenario({
monitorWebChannel,
});
scripted.resolveClose(0, {
status: 408,
isLoggedOut: false,
error: "status=408 Request Time-out",
});
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(2);
},
{ timeout: 250, interval: 2 },
);
const reconnectedSocket = getLastWebAutoReplySessionSocket();
for (let elapsedMs = 0; elapsedMs < 45; elapsedMs += 5) {
reconnectedSocket.ws.emit("frame");
await vi.advanceTimersByTimeAsync(5);
}
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBeGreaterThanOrEqual(3);
},
{ timeout: 250, interval: 2 },
);
controller.abort();
scripted.resolveClose(scripted.getListenerCount() - 1, {
status: 499,
isLoggedOut: false,
error: "aborted",
});
await Promise.resolve();
await run;
} finally {
vi.useRealTimers();
}
});
it("gives a reconnected listener a fresh watchdog window", async () => {
vi.useFakeTimers();
try {


@@ -20,7 +20,7 @@ import {
WhatsAppConnectionController,
type ManagedWhatsAppListener,
} from "../connection-controller.js";
import { attachWebInboxToSocket } from "../inbound/monitor.js";
import { attachWebInboxToSocket, type WhatsAppGroupMetadataCache } from "../inbound/monitor.js";
import {
newConnectionId,
resolveHeartbeatSeconds,
@@ -202,6 +202,7 @@ export async function monitorWebChannel(
}>
>();
const groupMemberNames = new Map<string, Map<string, string>>();
const groupMetadataCache: WhatsAppGroupMetadataCache = new Map();
const echoTracker = createEchoTracker({ maxItems: 100, logVerbose });
const sleep =
@@ -305,6 +306,7 @@ export async function monitorWebChannel(
shouldRetryDisconnect: () => !sigintStop && controller.shouldRetryDisconnect(),
disconnectRetryPolicy: reconnectPolicy,
disconnectRetryAbortSignal: controller.getDisconnectRetryAbortSignal(),
groupMetadataCache,
onMessage: async (msg: WebInboundMsg) => {
const inboundAt = Date.now();
controller.noteInbound(inboundAt);


@@ -45,6 +45,7 @@ export type WhatsAppLiveConnection = {
handledMessages: number;
unregisterUnhandled: (() => void) | null;
unregisterTransportActivity: (() => void) | null;
openedAfterRecentInbound: boolean;
backgroundTasks: Set<Promise<unknown>>;
closePromise: Promise<WebListenerCloseReason>;
resolveClose: (reason: WebListenerCloseReason) => void;
@@ -97,6 +98,7 @@ function createLiveConnection(params: {
connectionId: string;
sock: WASocket;
listener: ManagedWhatsAppListener;
openedAfterRecentInbound: boolean;
}): WhatsAppLiveConnection {
let closeResolved = false;
let resolveClosePromise = (_reason: WebListenerCloseReason) => {};
@@ -122,6 +124,7 @@ function createLiveConnection(params: {
handledMessages: 0,
unregisterUnhandled: null,
unregisterTransportActivity: null,
openedAfterRecentInbound: params.openedAfterRecentInbound,
backgroundTasks: new Set<Promise<unknown>>(),
closePromise,
resolveClose: resolveClosePromise,
@@ -259,6 +262,7 @@ export class WhatsAppConnectionController {
private current: WhatsAppLiveConnection | null = null;
private reconnectAttempts = 0;
private lastHandledInboundAt: number | null = null;
constructor(params: {
accountId: string;
@@ -334,6 +338,8 @@ export class WhatsAppConnectionController {
this.current.handledMessages += 1;
this.current.lastInboundAt = timestamp;
this.current.lastTransportActivityAt = timestamp;
this.current.openedAfterRecentInbound = false;
this.lastHandledInboundAt = timestamp;
}
noteTransportActivity(timestamp = Date.now()): void {
@@ -397,6 +403,7 @@ export class WhatsAppConnectionController {
connectionId: params.connectionId,
sock,
listener: placeholderListener,
openedAfterRecentInbound: this.isOpeningAfterRecentInbound(),
});
const listener = await params.createListener({ sock, connection });
connection.listener = listener;
@@ -602,10 +609,10 @@ export class WhatsAppConnectionController {
const transportStaleForMs = now - connection.lastTransportActivityAt;
const appBaselineAt = connection.lastInboundAt ?? connection.startedAt;
const appSilentForMs = now - appBaselineAt;
if (
transportStaleForMs <= this.transportTimeoutMs &&
appSilentForMs <= this.appSilenceTimeoutMs
) {
const appSilenceTimeoutMs = connection.openedAfterRecentInbound
? this.messageTimeoutMs
: this.appSilenceTimeoutMs;
if (transportStaleForMs <= this.transportTimeoutMs && appSilentForMs <= appSilenceTimeoutMs) {
return;
}
const snapshot = this.getCurrentSnapshot(connection);
@@ -639,6 +646,13 @@ export class WhatsAppConnectionController {
};
}
private isOpeningAfterRecentInbound(): boolean {
if (this.reconnectAttempts <= 0 || this.lastHandledInboundAt === null) {
return false;
}
return Date.now() - this.lastHandledInboundAt <= this.appSilenceTimeoutMs;
}
private stopDisconnectRetries(): void {
if (!this.disconnectRetryController.signal.aborted) {
this.disconnectRetryController.abort();
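
The lifecycle of the `openedAfterRecentInbound` flag in the hunks above can be illustrated in isolation. The sketch below is an assumption-laden model, not the controller itself: `RecoveryWindowTracker` and `noteReconnectAttempt` are hypothetical names, and how the real controller increments or resets `reconnectAttempts` is not shown in this diff.

```typescript
// Hypothetical stand-in for the controller's recovery-window bookkeeping.
class RecoveryWindowTracker {
  private readonly appSilenceTimeoutMs: number;
  private reconnectAttempts = 0;
  private lastHandledInboundAt: number | null = null;
  private openedAfterRecentInbound = false;

  constructor(appSilenceTimeoutMs: number) {
    this.appSilenceTimeoutMs = appSilenceTimeoutMs;
  }

  // Assumption: called once per reconnect attempt before the new socket opens.
  noteReconnectAttempt(): void {
    this.reconnectAttempts += 1;
  }

  // Mirrors isOpeningAfterRecentInbound(): only a reconnect that follows recent
  // handled inbound traffic starts in the recovery window.
  openConnection(now: number): boolean {
    this.openedAfterRecentInbound =
      this.reconnectAttempts > 0 &&
      this.lastHandledInboundAt !== null &&
      now - this.lastHandledInboundAt <= this.appSilenceTimeoutMs;
    return this.openedAfterRecentInbound;
  }

  // The first handled message proves delivery works again and ends the window.
  noteInbound(now: number): void {
    this.lastHandledInboundAt = now;
    this.openedAfterRecentInbound = false;
  }

  isInRecoveryWindow(): boolean {
    return this.openedAfterRecentInbound;
  }
}
```

Keeping `lastHandledInboundAt` on the tracker rather than on the per-socket state is what lets the timestamp survive a reconnect, which matches the controller-level field added in this commit.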


@@ -2,6 +2,7 @@ import type {
AnyMessageContent,
MiscMessageGenerationOptions,
proto,
GroupMetadata,
WAMessage,
WASocket,
} from "@whiskeysockets/baileys";
@@ -45,6 +46,53 @@ import type { WebInboundMessage, WebListenerCloseReason } from "./types.js";
const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401;
const RECONNECT_IN_PROGRESS_ERROR = "no active socket - reconnection in progress";
const GROUP_META_TTL_MS = 5 * 60 * 1000; // 5 minutes
export const WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES = 500;
export type WhatsAppGroupMetadataCacheEntry = {
subject?: string;
expires: number;
};
export type WhatsAppGroupMetadataCache = Map<string, WhatsAppGroupMetadataCacheEntry>;
type LocalGroupMetadataCacheEntry = WhatsAppGroupMetadataCacheEntry & {
participants?: string[];
};
function rememberGroupMetadataCacheEntry<T extends WhatsAppGroupMetadataCacheEntry>(
cache: Map<string, T>,
jid: string,
entry: T,
): void {
if (cache.has(jid)) {
cache.delete(jid);
}
cache.set(jid, entry);
while (cache.size > WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES) {
const oldest = cache.keys().next();
if (oldest.done) {
break;
}
cache.delete(oldest.value);
}
}
function readGroupMetadataCacheEntry<T extends WhatsAppGroupMetadataCacheEntry>(
cache: Map<string, T>,
jid: string,
): T | null {
const entry = cache.get(jid);
if (!entry) {
return null;
}
if (entry.expires <= Date.now()) {
cache.delete(jid);
return null;
}
cache.delete(jid);
cache.set(jid, entry);
return entry;
}
function logWhatsAppVerbose(enabled: boolean | undefined, message: string) {
if (!enabled) {
@@ -98,6 +146,8 @@ export type MonitorWebInboxOptions = {
};
/** Abort in-flight reconnect waits when shutdown becomes terminal. */
disconnectRetryAbortSignal?: AbortSignal;
/** Shared group metadata cache used only for inbound metadata fallback after fetch failures. */
groupMetadataCache?: WhatsAppGroupMetadataCache;
};
export async function attachWebInboxToSocket(
@@ -234,11 +284,8 @@ export async function attachWebInboxToSocket(
inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`);
},
});
const groupMetaCache = new Map<
string,
{ subject?: string; participants?: string[]; expires: number }
>();
const GROUP_META_TTL_MS = 5 * 60 * 1000; // 5 minutes
const groupMetadataCache = options.groupMetadataCache ?? new Map();
const groupMetaCache = new Map<string, LocalGroupMetadataCacheEntry>();
const lidLookup = sock.signalRepository?.lidMapping;
const resolveInboundJid = async (jid: string | null | undefined): Promise<string | null> =>
@@ -306,30 +353,54 @@ export async function attachWebInboxToSocket(
}
};
const summarizeGroupMeta = async (meta: GroupMetadata) => {
const participants =
(
await Promise.all(
meta.participants?.map(async (p) => {
const mapped = await resolveInboundJid(p.id);
return mapped ?? p.id;
}) ?? [],
)
).filter(Boolean) ?? [];
return {
subject: meta.subject,
participants,
expires: Date.now() + GROUP_META_TTL_MS,
};
};
const summarizeGroupMetaForReconnectCache = (
meta: GroupMetadata,
): WhatsAppGroupMetadataCacheEntry => ({
subject: meta.subject,
expires: Date.now() + GROUP_META_TTL_MS,
});
const getGroupMeta = async (jid: string) => {
const cached = groupMetaCache.get(jid);
if (cached && cached.expires > Date.now()) {
const cached = readGroupMetadataCacheEntry(groupMetaCache, jid);
if (cached) {
return cached;
}
try {
const meta = await sock.groupMetadata(jid);
const participants =
(
await Promise.all(
meta.participants?.map(async (p) => {
const mapped = await resolveInboundJid(p.id);
return mapped ?? p.id;
}) ?? [],
)
).filter(Boolean) ?? [];
const entry = {
subject: meta.subject,
participants,
expires: Date.now() + GROUP_META_TTL_MS,
};
groupMetaCache.set(jid, entry);
const entry = await summarizeGroupMeta(meta);
rememberGroupMetadataCacheEntry(groupMetadataCache, jid, {
subject: entry.subject,
expires: entry.expires,
});
rememberGroupMetadataCacheEntry(groupMetaCache, jid, entry);
return entry;
} catch (err) {
const hydrated = readGroupMetadataCacheEntry(groupMetadataCache, jid);
if (hydrated) {
rememberGroupMetadataCacheEntry(groupMetaCache, jid, hydrated);
logWhatsAppVerbose(
options.verbose,
`Using cached group metadata for ${jid} after fetch failure: ${String(err)}`,
);
return hydrated;
}
logWhatsAppVerbose(
options.verbose,
`Failed to fetch group metadata for ${jid}: ${String(err)}`,
@@ -733,6 +804,15 @@ export async function attachWebInboxToSocket(
void (async () => {
try {
const groups = await sock.groupFetchAllParticipating();
for (const [jid, meta] of Object.entries(groups ?? {})) {
if (meta) {
rememberGroupMetadataCacheEntry(
groupMetadataCache,
jid,
summarizeGroupMetaForReconnectCache(meta),
);
}
}
logWhatsAppVerbose(
options.verbose,
`Hydrated ${Object.keys(groups ?? {}).length} participating groups on connect`,
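
The bounded cache helpers introduced in this file rely on `Map` preserving insertion order: deleting and re-inserting a key moves it to the "most recent" end, so the first key is always the least recently used. The following is a minimal standalone sketch of that technique. The names `CacheEntry`, `remember`, and `read` are illustrative, and unlike the helpers in the diff it takes `maxEntries` and `now` as parameters to make the behavior easy to test.

```typescript
// Illustrative entry shape; the diff's entries also carry `subject` and `expires`.
type CacheEntry = { subject?: string; expires: number };

// Insert or refresh an entry, then evict least recently used keys over the limit.
function remember<T extends CacheEntry>(
  cache: Map<string, T>,
  key: string,
  entry: T,
  maxEntries: number,
): void {
  cache.delete(key); // Re-inserting moves the key to the most recent position.
  cache.set(key, entry);
  while (cache.size > maxEntries) {
    const oldest = cache.keys().next();
    if (oldest.done) {
      break;
    }
    cache.delete(oldest.value);
  }
}

// Return a live entry and mark it recently used; drop it if its TTL has passed.
function read<T extends CacheEntry>(cache: Map<string, T>, key: string, now: number): T | null {
  const entry = cache.get(key);
  if (!entry) {
    return null;
  }
  if (entry.expires <= now) {
    cache.delete(key);
    return null;
  }
  cache.delete(key);
  cache.set(key, entry);
  return entry;
}
```

Because reads also refresh recency, a group that keeps receiving messages stays cached while idle groups are evicted first once the entry limit is reached.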


@@ -3,6 +3,7 @@ import path from "node:path";
import "./monitor-inbox.test-harness.js";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { WhatsAppRetryableInboundError } from "./inbound/dedupe.js";
import { WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES } from "./inbound/monitor.js";
import {
type InboxMonitorOptions,
InboxOnMessage,
@@ -215,6 +216,93 @@ describe("web monitor inbox", () => {
await listener.close();
});
it("keeps group inbound alive with cached metadata after reconnect-time metadata fetch failures", async () => {
const groupMetadataCache: NonNullable<InboxMonitorOptions["groupMetadataCache"]> = new Map();
const onMessage = vi.fn(async (_msg: Parameters<InboxOnMessage>[0]) => {
return;
});
const firstSock = getSock();
firstSock.groupFetchAllParticipating.mockResolvedValueOnce({
"123@g.us": {
id: "123@g.us",
subject: "Recovered Group",
owner: undefined,
participants: [{ id: "444@s.whatsapp.net" }],
},
});
const first = await startInboxMonitor(onMessage as InboxOnMessage, {
groupMetadataCache,
});
await vi.waitFor(() => {
expect(groupMetadataCache.get("123@g.us")?.subject).toBe("Recovered Group");
});
expect(
(groupMetadataCache.get("123@g.us") as Record<string, unknown>)?.participants,
).toBeUndefined();
await first.listener.close();
const second = await startInboxMonitor(onMessage as InboxOnMessage, {
groupMetadataCache,
});
second.sock.groupMetadata.mockRejectedValueOnce(new Error("408 timed out"));
second.sock.ev.emit(
"messages.upsert",
buildNotifyMessageUpsert({
id: nextMessageId("group-reconnect-cache"),
remoteJid: "123@g.us",
participant: "444@s.whatsapp.net",
text: "ping",
timestamp: 1_700_000_000,
}),
);
await waitForMessageCalls(onMessage, 1);
expect(onMessage).toHaveBeenCalledWith(
expect.objectContaining({
body: "ping",
from: "123@g.us",
groupSubject: "Recovered Group",
senderE164: "+444",
chatType: "group",
}),
);
expect(onMessage.mock.calls[0]?.[0].groupParticipants).toBeUndefined();
await second.listener.close();
});
it("bounds cached group metadata kept across reconnects", async () => {
const groupMetadataCache: NonNullable<InboxMonitorOptions["groupMetadataCache"]> = new Map();
const groups = Object.fromEntries(
Array.from({ length: WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES + 2 }, (_, index) => [
`${index}@g.us`,
{
id: `${index}@g.us`,
subject: `Group ${index}`,
owner: undefined,
participants: [],
},
]),
);
const sock = getSock();
sock.groupFetchAllParticipating.mockResolvedValueOnce(groups);
const { listener } = await startInboxMonitor(vi.fn(async () => {}) as InboxOnMessage, {
groupMetadataCache,
});
await vi.waitFor(() => {
expect(groupMetadataCache.size).toBe(WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES);
});
expect(groupMetadataCache.has("0@g.us")).toBe(false);
expect(groupMetadataCache.has(`${WHATSAPP_GROUP_METADATA_CACHE_MAX_ENTRIES + 1}@g.us`)).toBe(
true,
);
await listener.close();
});
it("does not block inbound listeners while group hydration is pending", async () => {
let resolveHydration!: () => void;
const sock = getSock();


@@ -38,6 +38,7 @@ export type MockSock = {
sendPresenceUpdate: AnyMockFn;
sendMessage: AnyMockFn;
readMessages: AnyMockFn;
groupMetadata: AnyMockFn;
groupFetchAllParticipating: AnyMockFn;
updateMediaMessage: AnyMockFn;
logger: Record<string, unknown>;
@@ -110,6 +111,12 @@ function createMockSock(): MockSock {
sendPresenceUpdate: createResolvedMock(),
sendMessage: createResolvedMock(),
readMessages: createResolvedMock(),
groupMetadata: vi.fn().mockImplementation(async (jid: string) => ({
id: jid,
subject: "Test Group",
owner: undefined,
participants: [],
})),
groupFetchAllParticipating: vi.fn().mockResolvedValue({}),
updateMediaMessage: vi.fn(),
logger: {},


@@ -130,7 +130,10 @@ async function printTerminalQr(qr: string): Promise<void> {
export async function createWaSocket(
printQr: boolean,
verbose: boolean,
opts: { authDir?: string; onQr?: (qr: string) => void } & WhatsAppSocketTimingOptions = {},
opts: {
authDir?: string;
onQr?: (qr: string) => void;
} & WhatsAppSocketTimingOptions = {},
): Promise<ReturnType<typeof makeWASocket>> {
const baseLogger = getChildLogger(
{ module: "baileys" },