feat(signal): support container REST API

Adds container REST/WebSocket support for bbernhard/signal-cli-rest-api Signal deployments.

Closes #10240.

Thanks @Hua688.

Verification:
- pnpm exec oxfmt --check --threads=1 docs/channels/signal.md
- pnpm lint:extensions
- pnpm test extensions/signal
- pnpm tsgo:extensions && pnpm tsgo:test:extensions
- pnpm config:docs:check
- git diff --check
- CI checks on PR head 1d0a536ecd
- Crabbox/Testbox live Docker smoke tbx_01kr7h07shhcafxjc0ezfh946w / run 25614453516
This commit is contained in:
Hua Yang
2026-05-09 20:13:55 -07:00
committed by GitHub
parent 7af50ce47b
commit dff4a04c1f
22 changed files with 2721 additions and 26 deletions

View File

@@ -0,0 +1 @@
- Signal/container mode: add REST API support for bbernhard/signal-cli-rest-api containerized deployments via a unified adapter layer, with automatic mode detection and `channels.signal.apiMode` config. (#10240) Thanks @Hua688.

View File

@@ -1,4 +1,4 @@
7317136882cafd0cfd23146218b7bec4a65a2c08328f7a1f8e22ac04789ec35d config-baseline.json 31ba805dfaa665fe16735d7c2f3a5e9fb0c349aeb5e301881b5da65cee62b1f8 config-baseline.json
8e0f397eaeaa858f016a080e2585c69fa6744cdbb0e7f68d71cfc693971dbc8c config-baseline.core.json 5552e7871057e05d5699f90f536ce58e62d5b8cfc6020d3e7106be7915fed041 config-baseline.core.json
25c6e70d5b4925e07549072159ce4fcad45813fed12fa36a2f43d3568ca8dd96 config-baseline.channel.json 80f0f51caedf14dc2138d975b62852ff7c5cf085df1c734c9de279f5859a7eeb config-baseline.channel.json
af8a8e8616a0146ad989ff1bc0e8cf62c61a4d434dd67bbe7fe082c5c204fada config-baseline.plugin.json 3a08e5422ce1422d9e2b75feac7e44cdcd0c3dc1eea594f664bceec13cbe3f58 config-baseline.plugin.json

View File

@@ -1,17 +1,19 @@
--- ---
summary: "Signal support via signal-cli (JSON-RPC + SSE), setup paths, and number model" summary: "Signal support via signal-cli (native daemon or bbernhard container), setup paths, and number model"
read_when: read_when:
- Setting up Signal support - Setting up Signal support
- Debugging Signal send/receive - Debugging Signal send/receive
title: "Signal" title: "Signal"
--- ---
Status: external CLI integration. Gateway talks to `signal-cli` over HTTP JSON-RPC + SSE. Status: external CLI integration. Gateway talks to `signal-cli` over HTTP — either native daemon (JSON-RPC + SSE) or bbernhard/signal-cli-rest-api container (REST + WebSocket).
## Prerequisites ## Prerequisites
- OpenClaw installed on your server (Linux flow below tested on Ubuntu 24). - OpenClaw installed on your server (Linux flow below tested on Ubuntu 24).
- `signal-cli` available on the host where the gateway runs. - One of:
- `signal-cli` available on the host (native mode), **or**
- `bbernhard/signal-cli-rest-api` Docker container (container mode).
- A phone number that can receive one verification SMS (for SMS registration path). - A phone number that can receive one verification SMS (for SMS registration path).
- Browser access for Signal captcha (`signalcaptchas.org`) during registration. - Browser access for Signal captcha (`signalcaptchas.org`) during registration.
@@ -179,6 +181,63 @@ If you want to manage `signal-cli` yourself (slow JVM cold starts, container ini
This skips auto-spawn and the startup wait inside OpenClaw. For slow starts when auto-spawning, set `channels.signal.startupTimeoutMs`. This skips auto-spawn and the startup wait inside OpenClaw. For slow starts when auto-spawning, set `channels.signal.startupTimeoutMs`.
## Container mode (bbernhard/signal-cli-rest-api)
Instead of running `signal-cli` natively, you can use the [bbernhard/signal-cli-rest-api](https://github.com/bbernhard/signal-cli-rest-api) Docker container. This wraps `signal-cli` behind a REST API and WebSocket interface.
Requirements:
- The container **must** run with `MODE=json-rpc` for real-time message receiving.
- Register or link your Signal account inside the container before connecting OpenClaw.
Example `docker-compose.yml` service:
```yaml
signal-cli:
image: bbernhard/signal-cli-rest-api:latest
environment:
MODE: json-rpc
ports:
- "8080:8080"
volumes:
- signal-cli-data:/home/.local/share/signal-cli
```
OpenClaw config:
```json5
{
channels: {
signal: {
enabled: true,
account: "+15551234567",
httpUrl: "http://signal-cli:8080",
autoStart: false,
apiMode: "container", // or "auto" to detect automatically
},
},
}
```
The `apiMode` field controls which protocol OpenClaw uses:
| Value | Behavior |
| ------------- | ------------------------------------------------------------------------------------ |
| `"auto"` | (Default) Probes both transports; streaming validates container WebSocket receive |
| `"native"` | Force native signal-cli (JSON-RPC at `/api/v1/rpc`, SSE at `/api/v1/events`) |
| `"container"` | Force bbernhard container (REST at `/v2/send`, WebSocket at `/v1/receive/{account}`) |
When `apiMode` is `"auto"`, OpenClaw caches the detected mode for 30 seconds to avoid repeated probes. Container receive is only selected for streaming after `/v1/receive/{account}` upgrades to WebSocket, which requires `MODE=json-rpc`.
Container mode supports the same Signal channel operations as native mode where the container exposes matching APIs: sends, receives, attachments, typing indicators, read/viewed receipts, reactions, groups, and styled text. OpenClaw translates its native Signal RPC calls into the container's REST payloads, including `group.{base64(internal_id)}` group IDs and `text_mode: "styled"` for formatted text.
Operational notes:
- Use `autoStart: false` with container mode. OpenClaw should not spawn a native daemon when `apiMode: "container"` is selected.
- Use `MODE=json-rpc` for receiving. `MODE=normal` can make `/v1/about` look healthy, but `/v1/receive/{account}` does not WebSocket-upgrade, so OpenClaw will not select container receive streaming in `auto` mode.
- Set `apiMode: "container"` when you know the `httpUrl` points at bbernhard's REST API. Set `apiMode: "native"` when you know it points at native `signal-cli` JSON-RPC/SSE. Use `"auto"` when the deployment may vary.
- Container attachment downloads honor the same media byte limits as native mode. Oversized responses are rejected before being fully buffered when the server sends `Content-Length`, and while streaming otherwise.
## Access control (DMs + groups) ## Access control (DMs + groups)
DMs: DMs:
@@ -202,7 +261,8 @@ Groups:
## How it works (behavior) ## How it works (behavior)
- `signal-cli` runs as a daemon; the gateway reads events via SSE. - Native mode: `signal-cli` runs as a daemon; the gateway reads events via SSE.
- Container mode: the gateway sends via REST API and receives via WebSocket.
- Inbound messages are normalized into the shared channel envelope. - Inbound messages are normalized into the shared channel envelope.
- Replies always route back to the same number or group. - Replies always route back to the same number or group.
@@ -302,6 +362,7 @@ Full configuration: [Configuration](/gateway/configuration)
Provider options: Provider options:
- `channels.signal.enabled`: enable/disable channel startup. - `channels.signal.enabled`: enable/disable channel startup.
- `channels.signal.apiMode`: `auto | native | container` (default: auto). See [Container mode](#container-mode-bbernhardsignal-cli-rest-api).
- `channels.signal.account`: E.164 for the bot account. - `channels.signal.account`: E.164 for the bot account.
- `channels.signal.cliPath`: path to `signal-cli`. - `channels.signal.cliPath`: path to `signal-cli`.
- `channels.signal.httpUrl`: full daemon URL (overrides host/port). - `channels.signal.httpUrl`: full daemon URL (overrides host/port).

View File

@@ -4,6 +4,9 @@
"private": true, "private": true,
"description": "OpenClaw Signal channel plugin", "description": "OpenClaw Signal channel plugin",
"type": "module", "type": "module",
"dependencies": {
"ws": "^8.20.0"
},
"devDependencies": { "devDependencies": {
"@openclaw/plugin-sdk": "workspace:*" "@openclaw/plugin-sdk": "workspace:*"
}, },

View File

@@ -347,7 +347,9 @@ export const signalPlugin: ChannelPlugin<ResolvedSignalAccount, SignalProbe> =
probeAccount: async ({ account, timeoutMs }) => { probeAccount: async ({ account, timeoutMs }) => {
const baseUrl = account.baseUrl; const baseUrl = account.baseUrl;
const { probeSignal } = await loadSignalProbeModule(); const { probeSignal } = await loadSignalProbeModule();
return await probeSignal(baseUrl, timeoutMs); return await probeSignal(baseUrl, timeoutMs, {
apiMode: account.config?.apiMode ?? "auto",
});
}, },
formatCapabilitiesProbe: ({ probe }) => formatCapabilitiesProbe: ({ probe }) =>
probe?.version ? [{ text: `Signal daemon: ${probe.version}` }] : [], probe?.version ? [{ text: `Signal daemon: ${probe.version}` }] : [],

View File

@@ -0,0 +1,618 @@
import { describe, expect, it, vi, beforeEach } from "vitest";
import {
signalRpcRequest as signalRpcRequestImpl,
detectSignalApiMode,
signalCheck as signalCheckImpl,
streamSignalEvents as streamSignalEventsImpl,
fetchAttachment as fetchAttachmentImpl,
type SignalApiMode,
} from "./client-adapter.js";
import * as containerClientModule from "./client-container.js";
import * as nativeClientModule from "./client.js";
const mockNativeCheck = vi.fn();
const mockNativeRpcRequest = vi.fn();
const mockNativeStreamEvents = vi.fn();
const mockContainerCheck = vi.fn();
const mockContainerRpcRequest = vi.fn();
const mockContainerFetchAttachment = vi.fn();
const mockStreamContainerEvents = vi.fn();
let currentApiMode: SignalApiMode = "auto";
beforeEach(() => {
vi.spyOn(nativeClientModule, "signalCheck").mockImplementation(mockNativeCheck as any);
vi.spyOn(nativeClientModule, "signalRpcRequest").mockImplementation(mockNativeRpcRequest as any);
vi.spyOn(nativeClientModule, "streamSignalEvents").mockImplementation(
mockNativeStreamEvents as any,
);
vi.spyOn(containerClientModule, "containerCheck").mockImplementation(mockContainerCheck as any);
vi.spyOn(containerClientModule, "containerRpcRequest").mockImplementation(
mockContainerRpcRequest as any,
);
vi.spyOn(containerClientModule, "containerFetchAttachment").mockImplementation(
mockContainerFetchAttachment as any,
);
vi.spyOn(containerClientModule, "streamContainerEvents").mockImplementation(
mockStreamContainerEvents as any,
);
});
function setApiMode(mode: SignalApiMode) {
currentApiMode = mode;
}
function signalRpcRequest<T = unknown>(
method: string,
params: Record<string, unknown> | undefined,
opts: Parameters<typeof signalRpcRequestImpl>[2],
) {
return signalRpcRequestImpl<T>(method, params, { ...opts, apiMode: currentApiMode });
}
function signalCheck(baseUrl: string, timeoutMs?: number) {
return signalCheckImpl(baseUrl, timeoutMs, { apiMode: currentApiMode });
}
function streamSignalEvents(params: Parameters<typeof streamSignalEventsImpl>[0]) {
return streamSignalEventsImpl({ ...params, apiMode: currentApiMode });
}
function fetchAttachment(params: Parameters<typeof fetchAttachmentImpl>[0]) {
return fetchAttachmentImpl({ ...params, apiMode: currentApiMode });
}
describe("detectSignalApiMode", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("returns native when native endpoint responds", async () => {
mockNativeCheck.mockResolvedValue({ ok: true, status: 200 });
mockContainerCheck.mockResolvedValue({ ok: false, status: 404 });
const result = await detectSignalApiMode("http://localhost:8080");
expect(result).toBe("native");
});
it("returns container when only container endpoint responds", async () => {
mockNativeCheck.mockResolvedValue({ ok: false, status: 404 });
mockContainerCheck.mockResolvedValue({ ok: true, status: 200 });
const result = await detectSignalApiMode("http://localhost:8080");
expect(result).toBe("container");
});
it("prefers native when both endpoints respond", async () => {
mockNativeCheck.mockResolvedValue({ ok: true, status: 200 });
mockContainerCheck.mockResolvedValue({ ok: true, status: 200 });
const result = await detectSignalApiMode("http://localhost:8080");
expect(result).toBe("native");
});
it("throws error when neither endpoint responds", async () => {
mockNativeCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" });
mockContainerCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" });
await expect(detectSignalApiMode("http://localhost:8080")).rejects.toThrow(
"Signal API not reachable at http://localhost:8080",
);
});
it("handles exceptions from check functions", async () => {
mockNativeCheck.mockRejectedValue(new Error("Network error"));
mockContainerCheck.mockRejectedValue(new Error("Network error"));
await expect(detectSignalApiMode("http://localhost:8080")).rejects.toThrow(
"Signal API not reachable",
);
});
it("respects timeout parameter", async () => {
mockNativeCheck.mockResolvedValue({ ok: true, status: 200 });
mockContainerCheck.mockResolvedValue({ ok: false });
await detectSignalApiMode("http://localhost:8080", 5000);
expect(mockNativeCheck).toHaveBeenCalledWith("http://localhost:8080", 5000);
expect(mockContainerCheck).toHaveBeenCalledWith("http://localhost:8080", 5000);
});
it("requires a working container receive WebSocket when requested", async () => {
mockNativeCheck.mockResolvedValue({ ok: false, status: 404 });
mockContainerCheck.mockResolvedValue({ ok: true, status: 101 });
const result = await detectSignalApiMode("http://localhost:8080", 5000, {
account: "+14259798283",
requireContainerReceive: true,
});
expect(result).toBe("container");
expect(mockContainerCheck).toHaveBeenCalledWith("http://localhost:8080", 5000, "+14259798283");
});
it("does not select container receive mode without an account", async () => {
mockNativeCheck.mockResolvedValue({ ok: false, status: 404 });
await expect(
detectSignalApiMode("http://localhost:8080", 5000, {
requireContainerReceive: true,
}),
).rejects.toThrow("Signal API not reachable");
expect(mockContainerCheck).not.toHaveBeenCalled();
});
});
describe("signalRpcRequest", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("routes to native JSON-RPC for native mode", async () => {
mockNativeRpcRequest.mockResolvedValue({ timestamp: 1700000000000 });
const result = await signalRpcRequest(
"send",
{ message: "Hello", account: "+14259798283", recipient: ["+15550001111"] },
{ baseUrl: "http://localhost:8080" },
);
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockNativeRpcRequest).toHaveBeenCalledWith(
"send",
expect.objectContaining({ message: "Hello" }),
expect.objectContaining({ baseUrl: "http://localhost:8080" }),
);
expect(mockContainerRpcRequest).not.toHaveBeenCalled();
});
it("routes to container RPC for container mode", async () => {
setApiMode("container");
mockContainerRpcRequest.mockResolvedValue({ timestamp: 1700000000000 });
const result = await signalRpcRequest(
"send",
{ message: "Hello", account: "+14259798283", recipient: ["+15550001111"] },
{ baseUrl: "http://localhost:8080" },
);
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockContainerRpcRequest).toHaveBeenCalledWith(
"send",
expect.objectContaining({ message: "Hello" }),
expect.objectContaining({ baseUrl: "http://localhost:8080" }),
);
expect(mockNativeRpcRequest).not.toHaveBeenCalled();
});
it("uses explicit apiMode from the active config", async () => {
setApiMode("native");
mockContainerRpcRequest.mockResolvedValue({ timestamp: 1700000000000 });
const result = await signalRpcRequestImpl(
"send",
{ message: "Hello", account: "+14259798283", recipient: ["+15550001111"] },
{ baseUrl: "http://localhost:8080", apiMode: "container" },
);
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockContainerRpcRequest).toHaveBeenCalled();
expect(mockNativeRpcRequest).not.toHaveBeenCalled();
});
it("passes all RPC methods through to native", async () => {
mockNativeRpcRequest.mockResolvedValue({});
await signalRpcRequest(
"sendTyping",
{ account: "+1", recipient: ["+2"] },
{ baseUrl: "http://localhost:8080" },
);
expect(mockNativeRpcRequest).toHaveBeenCalledWith(
"sendTyping",
expect.anything(),
expect.anything(),
);
});
it("passes all RPC methods through to container", async () => {
setApiMode("container");
mockContainerRpcRequest.mockResolvedValue({});
await signalRpcRequest(
"sendReceipt",
{ account: "+1", recipient: ["+2"] },
{ baseUrl: "http://localhost:8080" },
);
expect(mockContainerRpcRequest).toHaveBeenCalledWith(
"sendReceipt",
expect.anything(),
expect.anything(),
);
});
});
describe("signalCheck", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("uses native check for native mode", async () => {
mockNativeCheck.mockResolvedValue({ ok: true, status: 200 });
const result = await signalCheck("http://localhost:8080");
expect(result).toEqual({ ok: true, status: 200 });
expect(mockNativeCheck).toHaveBeenCalledWith("http://localhost:8080", 10000);
expect(mockContainerCheck).not.toHaveBeenCalled();
});
it("uses container check for container mode", async () => {
setApiMode("container");
mockContainerCheck.mockResolvedValue({ ok: true, status: 200 });
const result = await signalCheck("http://localhost:8080");
expect(result).toEqual({ ok: true, status: 200 });
expect(mockContainerCheck).toHaveBeenCalledWith("http://localhost:8080", 10000);
expect(mockNativeCheck).not.toHaveBeenCalled();
});
it("respects timeout parameter", async () => {
mockNativeCheck.mockResolvedValue({ ok: true });
await signalCheck("http://localhost:8080", 5000);
expect(mockNativeCheck).toHaveBeenCalledWith("http://localhost:8080", 5000);
});
it("uses the caller timeout for auto detection", async () => {
setApiMode("auto");
mockNativeCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" });
mockContainerCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" });
await signalCheck("http://auto-timeout.local:8080", 1000);
expect(mockNativeCheck).toHaveBeenCalledWith("http://auto-timeout.local:8080", 1000);
expect(mockContainerCheck).toHaveBeenCalledWith("http://auto-timeout.local:8080", 1000);
});
it("returns a retryable failure when auto detection is not ready", async () => {
setApiMode("auto");
mockNativeCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" });
mockContainerCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" });
await expect(signalCheck("http://localhost:8080")).resolves.toEqual({
ok: false,
status: null,
error: "Signal API not reachable at http://localhost:8080",
});
});
});
describe("streamSignalEvents", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("uses native SSE for native mode", async () => {
mockNativeStreamEvents.mockResolvedValue(undefined);
const onEvent = vi.fn();
await streamSignalEvents({
baseUrl: "http://localhost:8080",
account: "+14259798283",
onEvent,
});
expect(mockNativeStreamEvents).toHaveBeenCalledWith(
expect.objectContaining({
baseUrl: "http://localhost:8080",
account: "+14259798283",
}),
);
expect(mockStreamContainerEvents).not.toHaveBeenCalled();
});
it("uses container WebSocket for container mode", async () => {
setApiMode("container");
mockStreamContainerEvents.mockResolvedValue(undefined);
const onEvent = vi.fn();
await streamSignalEvents({
baseUrl: "http://localhost:8080",
account: "+14259798283",
onEvent,
});
expect(mockStreamContainerEvents).toHaveBeenCalledWith(
expect.objectContaining({
baseUrl: "http://localhost:8080",
account: "+14259798283",
}),
);
expect(mockNativeStreamEvents).not.toHaveBeenCalled();
});
it("passes native SSE events through unchanged", async () => {
const payload = { envelope: { sourceNumber: "+1555000111" } };
mockNativeStreamEvents.mockImplementation(async (params) => {
params.onEvent({ event: "receive", data: JSON.stringify(payload) });
});
const events: unknown[] = [];
await streamSignalEvents({
baseUrl: "http://localhost:8080",
onEvent: (evt) => events.push(evt),
});
expect(events).toHaveLength(1);
expect(events[0]).toEqual({ event: "receive", data: JSON.stringify(payload) });
});
it("converts container events to SSE-like receive events", async () => {
setApiMode("container");
mockStreamContainerEvents.mockImplementation(async (params) => {
params.onEvent({ envelope: { sourceNumber: "+1555000111" } });
});
const events: unknown[] = [];
await streamSignalEvents({
baseUrl: "http://localhost:8080",
onEvent: (evt) => events.push(evt),
});
expect(events).toHaveLength(1);
expect(events[0]).toEqual({
event: "receive",
data: JSON.stringify({ envelope: { sourceNumber: "+1555000111" } }),
});
});
it("passes abort signal to underlying stream", async () => {
mockNativeStreamEvents.mockResolvedValue(undefined);
const abortController = new AbortController();
await streamSignalEvents({
baseUrl: "http://localhost:8080",
abortSignal: abortController.signal,
onEvent: vi.fn(),
});
expect(mockNativeStreamEvents).toHaveBeenCalledWith(
expect.objectContaining({
abortSignal: abortController.signal,
}),
);
});
it("forwards timeout to native SSE stream", async () => {
mockNativeStreamEvents.mockResolvedValue(undefined);
await streamSignalEvents({
baseUrl: "http://localhost:8080",
timeoutMs: 45000,
onEvent: vi.fn(),
});
expect(mockNativeStreamEvents).toHaveBeenCalledWith(
expect.objectContaining({
timeoutMs: 45000,
}),
);
});
it("uses a positive probe timeout while preserving zero stream timeout", async () => {
setApiMode("auto");
mockNativeCheck.mockResolvedValue({ ok: true, status: 200 });
mockContainerCheck.mockResolvedValue({ ok: false, status: 404 });
mockNativeStreamEvents.mockResolvedValue(undefined);
await streamSignalEvents({
baseUrl: "http://zero-timeout.local:8080",
account: "+14259798283",
timeoutMs: 0,
onEvent: vi.fn(),
});
expect(mockNativeCheck).toHaveBeenCalledWith("http://zero-timeout.local:8080", 10000);
expect(mockContainerCheck).toHaveBeenCalledWith(
"http://zero-timeout.local:8080",
10000,
"+14259798283",
);
expect(mockNativeStreamEvents).toHaveBeenCalledWith(
expect.objectContaining({
timeoutMs: 0,
}),
);
});
it("forwards timeout to container event stream", async () => {
setApiMode("container");
mockStreamContainerEvents.mockResolvedValue(undefined);
await streamSignalEvents({
baseUrl: "http://localhost:8080",
timeoutMs: 45000,
onEvent: vi.fn(),
});
expect(mockStreamContainerEvents).toHaveBeenCalledWith(
expect.objectContaining({
timeoutMs: 45000,
}),
);
});
it("revalidates an unvalidated cached container mode before streaming", async () => {
setApiMode("auto");
mockNativeCheck.mockResolvedValue({ ok: false, status: 404 });
mockContainerCheck
.mockResolvedValueOnce({ ok: true, status: 200 })
.mockResolvedValueOnce({ ok: true, status: 200 })
.mockResolvedValueOnce({
ok: false,
status: 200,
error: "Signal container receive endpoint did not upgrade to WebSocket (HTTP 200)",
});
await expect(signalCheck("http://auto-cache.local:8080")).resolves.toEqual({
ok: true,
status: 200,
});
await expect(
streamSignalEvents({
baseUrl: "http://auto-cache.local:8080",
account: "+14259798283",
onEvent: vi.fn(),
}),
).rejects.toThrow("Signal API not reachable at http://auto-cache.local:8080");
expect(mockStreamContainerEvents).not.toHaveBeenCalled();
expect(mockContainerCheck).toHaveBeenLastCalledWith(
"http://auto-cache.local:8080",
10000,
"+14259798283",
);
});
});
describe("fetchAttachment", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("uses native JSON-RPC for native mode with sender", async () => {
mockNativeRpcRequest.mockResolvedValue({ data: "base64data" });
const result = await fetchAttachment({
baseUrl: "http://localhost:8080",
account: "+14259798283",
attachmentId: "attachment-123",
sender: "+15550001111",
});
expect(result).toBeInstanceOf(Buffer);
expect(mockNativeRpcRequest).toHaveBeenCalledWith(
"getAttachment",
expect.objectContaining({
id: "attachment-123",
account: "+14259798283",
recipient: "+15550001111",
}),
expect.anything(),
);
});
it("uses container REST for container mode", async () => {
setApiMode("container");
const mockBuffer = Buffer.from([0x89, 0x50, 0x4e, 0x47]);
mockContainerFetchAttachment.mockResolvedValue(mockBuffer);
const result = await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
});
expect(result).toBe(mockBuffer);
expect(mockContainerFetchAttachment).toHaveBeenCalledWith(
"attachment-123",
expect.objectContaining({ baseUrl: "http://localhost:8080" }),
);
});
it("returns null for native mode without sender or groupId", async () => {
const result = await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
});
expect(result).toBeNull();
expect(mockNativeRpcRequest).not.toHaveBeenCalled();
});
it("uses groupId when provided for native mode", async () => {
mockNativeRpcRequest.mockResolvedValue({ data: "base64data" });
await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
groupId: "group-123",
});
expect(mockNativeRpcRequest).toHaveBeenCalledWith(
"getAttachment",
expect.objectContaining({ groupId: "group-123" }),
expect.anything(),
);
});
it("returns null when native RPC returns no data", async () => {
mockNativeRpcRequest.mockResolvedValue({});
const result = await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
sender: "+15550001111",
});
expect(result).toBeNull();
});
it("prefers groupId over sender when both provided", async () => {
mockNativeRpcRequest.mockResolvedValue({ data: "base64data" });
await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
sender: "+15550001111",
groupId: "group-123",
});
const callParams = mockNativeRpcRequest.mock.calls[0][1];
expect(callParams).toHaveProperty("groupId", "group-123");
expect(callParams).not.toHaveProperty("recipient");
});
it("passes timeout to container fetch", async () => {
setApiMode("container");
mockContainerFetchAttachment.mockResolvedValue(Buffer.from([]));
await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
timeoutMs: 60000,
});
expect(mockContainerFetchAttachment).toHaveBeenCalledWith(
"attachment-123",
expect.objectContaining({
timeoutMs: 60000,
}),
);
});
it("passes max response bytes to container fetch", async () => {
setApiMode("container");
mockContainerFetchAttachment.mockResolvedValue(Buffer.from([]));
await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
maxResponseBytes: 4096,
});
expect(mockContainerFetchAttachment).toHaveBeenCalledWith(
"attachment-123",
expect.objectContaining({
maxResponseBytes: 4096,
}),
);
});
});

View File

@@ -0,0 +1,274 @@
/**
* Signal client adapter - unified interface for both native signal-cli and bbernhard container.
*
* This adapter provides a single API that routes to the appropriate implementation
* based on the configured API mode. Exports mirror client.ts names so consumers
* only need to change their import path.
*/
import {
containerCheck,
containerRpcRequest,
streamContainerEvents,
containerFetchAttachment,
} from "./client-container.js";
import type { SignalRpcOptions } from "./client.js";
import {
signalCheck as nativeCheck,
signalRpcRequest as nativeRpcRequest,
streamSignalEvents as nativeStreamEvents,
} from "./client.js";
const DEFAULT_TIMEOUT_MS = 10_000;
const MODE_CACHE_TTL_MS = 30_000;
export type SignalSseEvent = {
event?: string;
data?: string;
};
export type SignalApiMode = "native" | "container" | "auto";
// Re-export the options type so consumers can import it from the adapter.
export type { SignalRpcOptions } from "./client.js";
// Cache auto-detected modes per baseUrl to avoid repeated network probes.
const detectedModeCache = new Map<
string,
{ mode: "native" | "container"; expiresAt: number; receiveAccount?: string }
>();
function resolveConfiguredApiMode(configured?: SignalApiMode): SignalApiMode {
if (configured === "native" || configured === "container") {
return configured;
}
return "auto";
}
function formatErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
function resolveAutoProbeTimeoutMs(timeoutMs: number | undefined): number {
return typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs > 0
? timeoutMs
: DEFAULT_TIMEOUT_MS;
}
async function resolveAutoApiMode(
baseUrl: string,
timeoutMs = DEFAULT_TIMEOUT_MS,
options: { account?: string; requireContainerReceive?: boolean } = {},
): Promise<"native" | "container"> {
const cached = detectedModeCache.get(baseUrl);
if (cached && cached.expiresAt > Date.now()) {
if (
cached.mode !== "container" ||
!options.requireContainerReceive ||
cached.receiveAccount === options.account
) {
return cached.mode;
}
}
const detected = await detectSignalApiMode(baseUrl, timeoutMs, options);
detectedModeCache.set(baseUrl, {
mode: detected,
expiresAt: Date.now() + MODE_CACHE_TTL_MS,
...(detected === "container" && options.requireContainerReceive && options.account
? { receiveAccount: options.account }
: {}),
});
return detected;
}
async function resolveApiModeForOperation(params: {
baseUrl: string;
accountId?: string;
account?: string;
requireContainerReceive?: boolean;
timeoutMs?: number;
apiMode?: SignalApiMode;
}): Promise<"native" | "container"> {
const configured = resolveConfiguredApiMode(params.apiMode);
if (configured === "native" || configured === "container") {
return configured;
}
return resolveAutoApiMode(params.baseUrl, params.timeoutMs ?? DEFAULT_TIMEOUT_MS, {
account: params.account,
requireContainerReceive: params.requireContainerReceive,
});
}
/**
* Detect which Signal API mode is available by probing endpoints.
* First endpoint to respond OK wins.
*/
export async function detectSignalApiMode(
baseUrl: string,
timeoutMs = DEFAULT_TIMEOUT_MS,
options: { account?: string; requireContainerReceive?: boolean } = {},
): Promise<"native" | "container"> {
const nativePromise = nativeCheck(baseUrl, timeoutMs).then((r) =>
r.ok ? ("native" as const) : Promise.reject(new Error("native not ok")),
);
const containerAccount = options.requireContainerReceive ? options.account?.trim() : undefined;
const containerPromise = containerAccount
? containerCheck(baseUrl, timeoutMs, containerAccount).then((r) =>
r.ok ? ("container" as const) : Promise.reject(new Error("container not ok")),
)
: options.requireContainerReceive
? Promise.reject(new Error("container receive account required"))
: containerCheck(baseUrl, timeoutMs).then((r) =>
r.ok ? ("container" as const) : Promise.reject(new Error("container not ok")),
);
try {
return await Promise.any([nativePromise, containerPromise]);
} catch {
throw new Error(`Signal API not reachable at ${baseUrl}`);
}
}
/**
* Drop-in replacement for native signalRpcRequest.
* Routes to native JSON-RPC or container REST based on config.
*/
export async function signalRpcRequest<T = unknown>(
method: string,
params: Record<string, unknown> | undefined,
opts: SignalRpcOptions & { accountId?: string; apiMode?: SignalApiMode },
): Promise<T> {
const mode = await resolveApiModeForOperation({
baseUrl: opts.baseUrl,
accountId: opts.accountId,
account: typeof params?.account === "string" ? params.account : undefined,
timeoutMs: opts.timeoutMs,
apiMode: opts.apiMode,
});
if (mode === "native") {
return nativeRpcRequest<T>(method, params, opts);
}
return containerRpcRequest<T>(method, params, opts);
}
/**
* Drop-in replacement for native signalCheck.
*/
export async function signalCheck(
baseUrl: string,
timeoutMs = DEFAULT_TIMEOUT_MS,
options: { apiMode?: SignalApiMode } = {},
): Promise<{ ok: boolean; status?: number | null; error?: string | null }> {
const configured = resolveConfiguredApiMode(options.apiMode);
const mode =
configured === "auto"
? await resolveAutoApiMode(baseUrl, timeoutMs).catch((error: unknown) => {
return { ok: false, status: null, error: formatErrorMessage(error) } as const;
})
: configured;
if (typeof mode !== "string") {
return mode;
}
if (mode === "container") {
return containerCheck(baseUrl, timeoutMs);
}
return nativeCheck(baseUrl, timeoutMs);
}
/**
* Drop-in replacement for native streamSignalEvents.
* Container mode uses WebSocket; native uses SSE.
*/
export async function streamSignalEvents(params: {
baseUrl: string;
account?: string;
accountId?: string;
abortSignal?: AbortSignal;
timeoutMs?: number;
onEvent: (event: SignalSseEvent) => void;
logger?: { log?: (msg: string) => void; error?: (msg: string) => void };
apiMode?: SignalApiMode;
}): Promise<void> {
const mode = await resolveApiModeForOperation({
baseUrl: params.baseUrl,
accountId: params.accountId,
account: params.account,
requireContainerReceive: true,
timeoutMs: resolveAutoProbeTimeoutMs(params.timeoutMs),
apiMode: params.apiMode,
});
if (mode === "container") {
return streamContainerEvents({
baseUrl: params.baseUrl,
account: params.account,
abortSignal: params.abortSignal,
timeoutMs: params.timeoutMs,
onEvent: (event) => params.onEvent({ event: "receive", data: JSON.stringify(event) }),
logger: params.logger,
});
}
return nativeStreamEvents({
baseUrl: params.baseUrl,
account: params.account,
abortSignal: params.abortSignal,
timeoutMs: params.timeoutMs,
onEvent: (event) => params.onEvent(event),
});
}
/**
* Fetch attachment, routing to native or container implementation.
*/
export async function fetchAttachment(params: {
baseUrl: string;
account?: string;
accountId?: string;
attachmentId: string;
sender?: string;
groupId?: string;
timeoutMs?: number;
maxResponseBytes?: number;
apiMode?: SignalApiMode;
}): Promise<Buffer | null> {
const mode = await resolveApiModeForOperation({
baseUrl: params.baseUrl,
accountId: params.accountId,
account: params.account,
timeoutMs: params.timeoutMs,
apiMode: params.apiMode,
});
if (mode === "container") {
return containerFetchAttachment(params.attachmentId, {
baseUrl: params.baseUrl,
timeoutMs: params.timeoutMs,
maxResponseBytes: params.maxResponseBytes,
});
}
const rpcParams: Record<string, unknown> = {
id: params.attachmentId,
};
if (params.account) {
rpcParams.account = params.account;
}
if (params.groupId) {
rpcParams.groupId = params.groupId;
} else if (params.sender) {
rpcParams.recipient = params.sender;
} else {
return null;
}
const result = await nativeRpcRequest<{ data?: string }>("getAttachment", rpcParams, {
baseUrl: params.baseUrl,
timeoutMs: params.timeoutMs,
maxResponseBytes: params.maxResponseBytes,
});
if (!result?.data) {
return null;
}
return Buffer.from(result.data, "base64");
}

View File

@@ -0,0 +1,922 @@
import * as fetchModule from "openclaw/plugin-sdk/fetch-runtime";
import { describe, expect, it, vi, beforeEach } from "vitest";
import {
containerCheck,
containerRestRequest,
containerSendMessage,
containerSendTyping,
containerSendReceipt,
containerFetchAttachment,
containerRpcRequest,
containerSendReaction,
containerRemoveReaction,
streamContainerEvents,
} from "./client-container.js";
// spyOn approach works with vitest forks pool for cross-directory imports
const mockFetch = vi.fn();
const wsMockState = vi.hoisted(() => ({
behavior: "close" as "close" | "open" | "error" | "unexpected-response",
urls: [] as string[],
}));
beforeEach(() => {
vi.spyOn(fetchModule, "resolveFetch").mockReturnValue(mockFetch as unknown as typeof fetch);
wsMockState.behavior = "close";
wsMockState.urls = [];
});
// Minimal WebSocket mock for connection-log assertions.
vi.mock("ws", () => ({
default: class MockWebSocket {
private handlers = new Map<string, Array<(...args: unknown[]) => void>>();
constructor(url: string | URL) {
wsMockState.urls.push(String(url));
setTimeout(() => {
if (wsMockState.behavior === "open") {
this.emit("open");
} else if (wsMockState.behavior === "error") {
this.emit("error", new Error("WebSocket failed"));
} else if (wsMockState.behavior === "unexpected-response") {
this.emit("unexpected-response", {}, { statusCode: 200, statusMessage: "OK" });
} else {
this.emit("close", 1000, Buffer.from("done"));
}
}, 0);
}
on(event: string, callback: (...args: unknown[]) => void) {
const handlers = this.handlers.get(event) ?? [];
handlers.push(callback);
this.handlers.set(event, handlers);
return this;
}
once(event: string, callback: (...args: unknown[]) => void) {
const onceCallback = (...args: unknown[]) => {
this.handlers.set(
event,
(this.handlers.get(event) ?? []).filter((handler) => handler !== onceCallback),
);
callback(...args);
};
return this.on(event, onceCallback);
}
close() {
this.emit("close", 1000, Buffer.from("done"));
}
terminate() {}
private emit(event: string, ...args: unknown[]) {
for (const handler of this.handlers.get(event) ?? []) {
handler(...args);
}
}
},
}));
describe("containerCheck", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("returns ok:true when /v1/about returns 200", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
});
const result = await containerCheck("http://localhost:8080");
expect(result).toEqual({ ok: true, status: 200, error: null });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/about",
expect.objectContaining({ method: "GET" }),
);
});
it("returns ok:false when /v1/about returns 404", async () => {
mockFetch.mockResolvedValue({
ok: false,
status: 404,
});
const result = await containerCheck("http://localhost:8080");
expect(result).toEqual({ ok: false, status: 404, error: "HTTP 404" });
});
it("returns ok:false with error message on fetch failure", async () => {
mockFetch.mockRejectedValue(new Error("Network error"));
const result = await containerCheck("http://localhost:8080");
expect(result).toEqual({ ok: false, status: null, error: "Network error" });
});
it("normalizes base URL by removing trailing slash", async () => {
mockFetch.mockResolvedValue({ ok: true, status: 200 });
await containerCheck("http://localhost:8080/");
expect(mockFetch).toHaveBeenCalledWith("http://localhost:8080/v1/about", expect.anything());
});
it("adds http:// prefix when missing", async () => {
mockFetch.mockResolvedValue({ ok: true, status: 200 });
await containerCheck("localhost:8080");
expect(mockFetch).toHaveBeenCalledWith("http://localhost:8080/v1/about", expect.anything());
});
it("validates the receive WebSocket when an account is provided", async () => {
wsMockState.behavior = "open";
mockFetch.mockResolvedValue({ ok: true, status: 200 });
const result = await containerCheck("http://localhost:8080", 1000, "+14259798283");
expect(result).toEqual({ ok: true, status: 101, error: null });
expect(wsMockState.urls).toEqual(["ws://localhost:8080/v1/receive/%2B14259798283"]);
});
it("rejects container receive endpoints that do not upgrade to WebSocket", async () => {
wsMockState.behavior = "unexpected-response";
mockFetch.mockResolvedValue({ ok: true, status: 200 });
const result = await containerCheck("http://localhost:8080", 1000, "+14259798283");
expect(result).toEqual({
ok: false,
status: 200,
error: "Signal container receive endpoint did not upgrade to WebSocket (HTTP 200)",
});
});
});
describe("containerRestRequest", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("makes GET request with correct endpoint", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({ version: "1.0" }),
});
const result = await containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080" });
expect(result).toEqual({ version: "1.0" });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/about",
expect.objectContaining({
method: "GET",
headers: { "Content-Type": "application/json" },
}),
);
});
it("makes POST request with body", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 201,
text: async () => "",
});
await containerRestRequest("/v2/send", { baseUrl: "http://localhost:8080" }, "POST", {
message: "test",
number: "+1234567890",
recipients: ["+1234567890"],
});
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v2/send",
expect.objectContaining({
method: "POST",
body: JSON.stringify({
message: "test",
number: "+1234567890",
recipients: ["+1234567890"],
}),
}),
);
});
it("parses 201 response bodies", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 201,
text: async () => JSON.stringify({ timestamp: 1700000000000 }),
});
const result = await containerRestRequest(
"/v2/send",
{ baseUrl: "http://localhost:8080" },
"POST",
);
expect(result).toEqual({ timestamp: 1700000000000 });
});
it("returns undefined for 204 status", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
const result = await containerRestRequest(
"/v1/typing-indicator/+1234567890",
{ baseUrl: "http://localhost:8080" },
"PUT",
);
expect(result).toBeUndefined();
});
it("throws error on non-ok response", async () => {
mockFetch.mockResolvedValue({
ok: false,
status: 500,
statusText: "Internal Server Error",
text: async () => "Server error details",
});
await expect(
containerRestRequest("/v2/send", { baseUrl: "http://localhost:8080" }, "POST"),
).rejects.toThrow("Signal REST 500: Server error details");
});
it("handles empty response body", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => "",
});
const result = await containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080" });
expect(result).toBeUndefined();
});
it("respects custom timeout by using abort signal", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => "{}",
});
await containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080", timeoutMs: 5000 });
// The timeout is enforced via AbortController, so we verify the call was made with a signal
expect(mockFetch).toHaveBeenCalled();
const callArgs = mockFetch.mock.calls[0];
expect(callArgs[1].signal).toBeDefined();
});
});
describe("containerSendMessage", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("sends message to recipients", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({ timestamp: "1700000000000" }),
});
const result = await containerSendMessage({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipients: ["+15550001111"],
message: "Hello world",
});
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v2/send",
expect.objectContaining({
method: "POST",
body: JSON.stringify({
message: "Hello world",
number: "+14259798283",
recipients: ["+15550001111"],
}),
}),
);
});
it("normalizes invalid send timestamps before returning", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({ timestamp: "not-a-number" }),
});
await expect(
containerSendMessage({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipients: ["+15550001111"],
message: "Hello world",
}),
).rejects.toThrow("Signal REST send returned invalid timestamp");
});
it("uses container styled text mode when styles are provided", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerSendMessage({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipients: ["+15550001111"],
message: "Bold text",
textStyles: [{ start: 0, length: 4, style: "BOLD" }],
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body).toMatchObject({
message: "**Bold** text",
text_mode: "styled",
});
expect(body).not.toHaveProperty("text_style");
});
it("escapes unstyled formatting markers in styled container messages", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerSendMessage({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipients: ["+15550001111"],
message: "Bold * not italic",
textStyles: [{ start: 0, length: 4, style: "BOLD" }],
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.message).toBe("**Bold** \\* not italic");
});
it("preserves literal backslashes in styled container messages", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerSendMessage({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipients: ["+15550001111"],
message: "Bold C:\\Temp\\file and /foo\\bar/",
textStyles: [{ start: 0, length: 4, style: "BOLD" }],
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.message).toBe("**Bold** C:\\Temp\\file and /foo\\bar/");
});
it("includes attachments as base64 data URIs", async () => {
const fs = await import("node:fs/promises");
const os = await import("node:os");
const path = await import("node:path");
// Create a temp file with known content
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "signal-test-"));
const tmpFile = path.join(tmpDir, "test-image.jpg");
const content = Buffer.from([0xff, 0xd8, 0xff, 0xe0]); // JPEG magic bytes
await fs.writeFile(tmpFile, content);
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerSendMessage({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipients: ["+15550001111"],
message: "Photo",
attachments: [tmpFile],
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.attachments).toBeUndefined();
expect(body.base64_attachments).toBeDefined();
expect(body.base64_attachments).toHaveLength(1);
expect(body.base64_attachments[0]).toMatch(
/^data:image\/jpeg;filename=test-image\.jpg;base64,/,
);
// Cleanup
await fs.rm(tmpDir, { recursive: true });
});
});
describe("containerSendTyping", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("sends typing indicator with PUT", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
const result = await containerSendTyping({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
});
expect(result).toBe(true);
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/typing-indicator/%2B14259798283",
expect.objectContaining({
method: "PUT",
body: JSON.stringify({ recipient: "+15550001111" }),
}),
);
});
it("stops typing indicator with DELETE", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
await containerSendTyping({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
stop: true,
});
expect(mockFetch).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ method: "DELETE" }),
);
});
});
describe("containerRpcRequest typing", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("formats group ids for typing indicators", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
await containerRpcRequest(
"sendTyping",
{
account: "+14259798283",
groupId: "group-123",
},
{ baseUrl: "http://localhost:8080" },
);
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.recipient).toBe("group.Z3JvdXAtMTIz");
});
});
describe("containerSendReceipt", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("sends read receipt", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
const result = await containerSendReceipt({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
timestamp: 1700000000000,
});
expect(result).toBe(true);
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/receipts/%2B14259798283",
expect.objectContaining({
method: "POST",
body: JSON.stringify({
recipient: "+15550001111",
timestamp: 1700000000000,
receipt_type: "read",
}),
}),
);
});
it("sends viewed receipt when type specified", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
await containerSendReceipt({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
timestamp: 1700000000000,
type: "viewed",
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.receipt_type).toBe("viewed");
});
});
describe("containerFetchAttachment", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("fetches attachment binary", async () => {
const binaryData = new Uint8Array([0x89, 0x50, 0x4e, 0x47]); // PNG header
mockFetch.mockResolvedValue({
ok: true,
status: 200,
arrayBuffer: async () => binaryData.buffer,
});
const result = await containerFetchAttachment("attachment-123", {
baseUrl: "http://localhost:8080",
});
expect(result).toBeInstanceOf(Buffer);
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/attachments/attachment-123",
expect.objectContaining({ method: "GET" }),
);
});
it("returns null on non-ok response", async () => {
mockFetch.mockResolvedValue({
ok: false,
status: 404,
});
const result = await containerFetchAttachment("attachment-123", {
baseUrl: "http://localhost:8080",
});
expect(result).toBeNull();
});
it("encodes attachment ID in URL", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
arrayBuffer: async () => new ArrayBuffer(0),
});
await containerFetchAttachment("path/with/slashes", {
baseUrl: "http://localhost:8080",
});
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/attachments/path%2Fwith%2Fslashes",
expect.anything(),
);
});
it("rejects attachments above the content-length cap", async () => {
const arrayBuffer = vi.fn();
mockFetch.mockResolvedValue({
ok: true,
status: 200,
headers: new Headers({ "content-length": "5" }),
arrayBuffer,
});
await expect(
containerFetchAttachment("attachment-123", {
baseUrl: "http://localhost:8080",
maxResponseBytes: 4,
}),
).rejects.toThrow("Signal REST attachment exceeded size limit");
expect(arrayBuffer).not.toHaveBeenCalled();
});
it("rejects streamed attachments that exceed the response cap", async () => {
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue(new Uint8Array([4, 5]));
controller.close();
},
});
mockFetch.mockResolvedValue({
ok: true,
status: 200,
headers: new Headers(),
body: stream,
});
await expect(
containerFetchAttachment("attachment-123", {
baseUrl: "http://localhost:8080",
maxResponseBytes: 4,
}),
).rejects.toThrow("Signal REST attachment exceeded size limit");
});
});
describe("normalizeBaseUrl edge cases", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("throws error for empty base URL", async () => {
await expect(containerCheck("")).rejects.toThrow("Signal base URL is required");
});
it("throws error for whitespace-only base URL", async () => {
await expect(containerCheck(" ")).rejects.toThrow("Signal base URL is required");
});
it("handles https URLs", async () => {
mockFetch.mockResolvedValue({ ok: true, status: 200 });
await containerCheck("https://signal.example.com");
expect(mockFetch).toHaveBeenCalledWith(
"https://signal.example.com/v1/about",
expect.anything(),
);
});
it("handles URLs with ports", async () => {
mockFetch.mockResolvedValue({ ok: true, status: 200 });
await containerCheck("http://192.168.1.100:9922");
expect(mockFetch).toHaveBeenCalledWith("http://192.168.1.100:9922/v1/about", expect.anything());
});
it("rejects base URLs with credentials", async () => {
await expect(containerCheck("http://user:pass@localhost:8080")).rejects.toThrow(
"Signal base URL must not include credentials",
);
});
});
describe("containerRestRequest edge cases", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("handles DELETE method", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
await containerRestRequest(
"/v1/some-resource/123",
{ baseUrl: "http://localhost:8080" },
"DELETE",
);
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/some-resource/123",
expect.objectContaining({ method: "DELETE" }),
);
});
it("handles error response with empty body", async () => {
mockFetch.mockResolvedValue({
ok: false,
status: 500,
statusText: "Internal Server Error",
text: async () => "",
});
await expect(
containerRestRequest("/v2/send", { baseUrl: "http://localhost:8080" }, "POST"),
).rejects.toThrow("Signal REST 500: Internal Server Error");
});
it("handles JSON parse errors gracefully", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => "not-valid-json",
});
await expect(
containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080" }),
).rejects.toThrow();
});
});
describe("streamContainerEvents", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("redacts the account from the connection log", async () => {
const log = vi.fn();
await streamContainerEvents({
baseUrl: "http://localhost:8080",
account: "+14259798283",
onEvent: vi.fn(),
logger: { log },
});
expect(log).toHaveBeenCalledWith(
"[signal-ws] connecting to ws://localhost:8080/v1/receive/<redacted>",
);
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("+14259798283"));
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("%2B14259798283"));
});
it("removes the abort listener when the stream closes", async () => {
const abortController = new AbortController();
const addEventListener = vi.spyOn(abortController.signal, "addEventListener");
const removeEventListener = vi.spyOn(abortController.signal, "removeEventListener");
await streamContainerEvents({
baseUrl: "http://localhost:8080",
account: "+14259798283",
abortSignal: abortController.signal,
onEvent: vi.fn(),
});
const abortHandler = addEventListener.mock.calls.find((call) => call[0] === "abort")?.[1];
expect(abortHandler).toBeTypeOf("function");
expect(removeEventListener).toHaveBeenCalledWith("abort", abortHandler);
});
});
describe("containerSendReaction", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("sends reaction to recipient", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({ timestamp: 1700000000000 }),
});
const result = await containerSendReaction({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
emoji: "👍",
targetAuthor: "+15550001111",
targetTimestamp: 1699999999999,
});
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/reactions/%2B14259798283",
expect.objectContaining({
method: "POST",
body: JSON.stringify({
recipient: "+15550001111",
reaction: "👍",
target_author: "+15550001111",
timestamp: 1699999999999,
}),
}),
);
});
it("includes group_id when provided", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerSendReaction({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
emoji: "❤️",
targetAuthor: "+15550001111",
targetTimestamp: 1699999999999,
groupId: "group-123",
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.group_id).toBe("group-123");
});
});
describe("containerRpcRequest reactions", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("routes group reactions to the formatted group recipient", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerRpcRequest(
"sendReaction",
{
account: "+14259798283",
recipients: ["uuid:author-uuid"],
groupIds: ["group-123"],
emoji: "👍",
targetAuthor: "uuid:author-uuid",
targetTimestamp: 1699999999999,
},
{ baseUrl: "http://localhost:8080" },
);
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.recipient).toBe("group.Z3JvdXAtMTIz");
expect(body.group_id).toBe("group.Z3JvdXAtMTIz");
expect(body.target_author).toBe("author-uuid");
});
});
describe("containerRemoveReaction", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("removes reaction with DELETE", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({ timestamp: 1700000000000 }),
});
const result = await containerRemoveReaction({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
emoji: "👍",
targetAuthor: "+15550001111",
targetTimestamp: 1699999999999,
});
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/reactions/%2B14259798283",
expect.objectContaining({
method: "DELETE",
body: JSON.stringify({
recipient: "+15550001111",
reaction: "👍",
target_author: "+15550001111",
timestamp: 1699999999999,
}),
}),
);
});
it("includes group_id when provided", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerRemoveReaction({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
emoji: "❤️",
targetAuthor: "+15550001111",
targetTimestamp: 1699999999999,
groupId: "group-123",
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.group_id).toBe("group-123");
});
});

View File

@@ -0,0 +1,745 @@
/**
* Signal client for bbernhard/signal-cli-rest-api container.
* Uses WebSocket for receiving messages and REST API for sending.
*
* This is a separate implementation from client.ts (native signal-cli)
* to keep the two modes cleanly isolated.
*/
import fs from "node:fs/promises";
import nodePath from "node:path";
import { resolveFetch } from "openclaw/plugin-sdk/fetch-runtime";
import { detectMime } from "openclaw/plugin-sdk/media-runtime";
import WebSocket from "ws";
export type ContainerRpcOptions = {
baseUrl: string;
timeoutMs?: number;
maxResponseBytes?: number;
};
export type ContainerWebSocketMessage = {
envelope?: {
syncMessage?: unknown;
dataMessage?: {
message?: string;
groupInfo?: { groupId?: string; groupName?: string };
attachments?: Array<{
id?: string;
contentType?: string;
filename?: string;
size?: number;
}>;
quote?: { text?: string };
reaction?: unknown;
};
editMessage?: { dataMessage?: unknown };
reactionMessage?: unknown;
sourceNumber?: string;
sourceUuid?: string;
sourceName?: string;
timestamp?: number;
};
exception?: { message?: string };
};
const DEFAULT_TIMEOUT_MS = 10_000;
const DEFAULT_ATTACHMENT_RESPONSE_MAX_BYTES = 1_048_576;
const CONTAINER_TEXT_STYLE_MARKERS: Record<string, string> = {
BOLD: "**",
ITALIC: "*",
STRIKETHROUGH: "~",
MONOSPACE: "`",
SPOILER: "||",
};
function normalizeBaseUrl(url: string): string {
const trimmed = url.trim();
if (!trimmed) {
throw new Error("Signal base URL is required");
}
const withProtocol = /^https?:\/\//i.test(trimmed) ? trimmed : `http://${trimmed}`;
const parsed = new URL(withProtocol);
if (parsed.protocol !== "http:" && parsed.protocol !== "https:") {
throw new Error(`Signal base URL unsupported protocol: ${parsed.protocol}`);
}
if (parsed.username || parsed.password) {
throw new Error("Signal base URL must not include credentials");
}
const pathname = parsed.pathname === "/" ? "" : parsed.pathname.replace(/\/+$/, "");
return `${parsed.protocol}//${parsed.host}${pathname}`;
}
async function fetchWithTimeout(url: string, init: RequestInit, timeoutMs: number) {
const fetchImpl = resolveFetch();
if (!fetchImpl) {
throw new Error("fetch is not available");
}
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
try {
return await fetchImpl(url, { ...init, signal: controller.signal });
} finally {
clearTimeout(timer);
}
}
function normalizeMaxResponseBytes(value: number | undefined): number {
if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) {
return DEFAULT_ATTACHMENT_RESPONSE_MAX_BYTES;
}
return Math.floor(value);
}
function readContentLength(res: Response): number | undefined {
const raw = res.headers?.get("content-length");
if (!raw) {
return undefined;
}
const parsed = Number(raw);
return Number.isFinite(parsed) && parsed >= 0 ? parsed : undefined;
}
async function readCappedResponseBuffer(res: Response, maxResponseBytes: number): Promise<Buffer> {
const contentLength = readContentLength(res);
if (contentLength !== undefined && contentLength > maxResponseBytes) {
throw new Error("Signal REST attachment exceeded size limit");
}
const reader = res.body?.getReader();
if (!reader) {
const arrayBuffer = await res.arrayBuffer();
if (arrayBuffer.byteLength > maxResponseBytes) {
throw new Error("Signal REST attachment exceeded size limit");
}
return Buffer.from(arrayBuffer);
}
const chunks: Buffer[] = [];
let totalBytes = 0;
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
const chunk = Buffer.from(value ?? new Uint8Array());
totalBytes += chunk.byteLength;
if (totalBytes > maxResponseBytes) {
await reader.cancel().catch(() => {});
throw new Error("Signal REST attachment exceeded size limit");
}
chunks.push(chunk);
}
} finally {
reader.releaseLock();
}
return Buffer.concat(chunks);
}
/**
* Check if bbernhard container REST API is available.
*/
export async function containerCheck(
baseUrl: string,
timeoutMs = DEFAULT_TIMEOUT_MS,
account?: string,
): Promise<{ ok: boolean; status?: number | null; error?: string | null }> {
const normalized = normalizeBaseUrl(baseUrl);
try {
const res = await fetchWithTimeout(`${normalized}/v1/about`, { method: "GET" }, timeoutMs);
if (!res.ok) {
return { ok: false, status: res.status, error: `HTTP ${res.status}` };
}
const receiveAccount = account?.trim();
if (receiveAccount) {
return await containerReceiveCheck(normalized, receiveAccount, timeoutMs);
}
return { ok: true, status: res.status, error: null };
} catch (err) {
return {
ok: false,
status: null,
error: err instanceof Error ? err.message : String(err),
};
}
}
function containerReceiveCheck(
normalizedBaseUrl: string,
account: string,
timeoutMs: number,
): Promise<{ ok: boolean; status?: number | null; error?: string | null }> {
const wsUrl = `${normalizedBaseUrl.replace(/^http/, "ws")}/v1/receive/${encodeURIComponent(account)}`;
return new Promise((resolve) => {
let settled = false;
let ws: WebSocket | undefined;
const timer = setTimeout(() => {
settle({ ok: false, status: null, error: "Signal container receive WebSocket timed out" });
ws?.terminate();
}, timeoutMs);
timer.unref?.();
const settle = (result: { ok: boolean; status?: number | null; error?: string | null }) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
resolve(result);
};
try {
ws = new WebSocket(wsUrl);
} catch (err) {
settle({
ok: false,
status: null,
error: err instanceof Error ? err.message : String(err),
});
return;
}
ws.once("open", () => {
settle({ ok: true, status: 101, error: null });
ws?.close();
});
ws.once("unexpected-response", (_request, response) => {
settle({
ok: false,
status: response.statusCode ?? null,
error: `Signal container receive endpoint did not upgrade to WebSocket (HTTP ${
response.statusCode ?? "unknown"
})`,
});
ws?.terminate();
});
ws.once("error", (err) => {
settle({
ok: false,
status: null,
error: err instanceof Error ? err.message : String(err),
});
});
});
}
/**
* Make a REST API request to bbernhard container.
*/
export async function containerRestRequest<T = unknown>(
endpoint: string,
opts: ContainerRpcOptions,
method: "GET" | "POST" | "PUT" | "DELETE" = "GET",
body?: unknown,
): Promise<T> {
const baseUrl = normalizeBaseUrl(opts.baseUrl);
const url = `${baseUrl}${endpoint}`;
const init: RequestInit = {
method,
headers: { "Content-Type": "application/json" },
};
if (body) {
init.body = JSON.stringify(body);
}
const res = await fetchWithTimeout(url, init, opts.timeoutMs ?? DEFAULT_TIMEOUT_MS);
if (res.status === 204) {
return undefined as T;
}
if (!res.ok) {
const errorText = await res.text().catch(() => "");
throw new Error(`Signal REST ${res.status}: ${errorText || res.statusText}`);
}
const text = await res.text();
if (!text) {
return undefined as T;
}
return JSON.parse(text) as T;
}
/**
* Fetch attachment binary from bbernhard container.
*/
export async function containerFetchAttachment(
attachmentId: string,
opts: ContainerRpcOptions,
): Promise<Buffer | null> {
const baseUrl = normalizeBaseUrl(opts.baseUrl);
const url = `${baseUrl}/v1/attachments/${encodeURIComponent(attachmentId)}`;
const res = await fetchWithTimeout(url, { method: "GET" }, opts.timeoutMs ?? DEFAULT_TIMEOUT_MS);
if (!res.ok) {
return null;
}
return readCappedResponseBuffer(res, normalizeMaxResponseBytes(opts.maxResponseBytes));
}
/**
* Stream messages using WebSocket from bbernhard container.
* The Promise resolves when the connection closes (for any reason).
* The caller (runSignalLoopAdapter) is responsible for reconnection.
*/
export async function streamContainerEvents(params: {
baseUrl: string;
account?: string;
abortSignal?: AbortSignal;
timeoutMs?: number;
onEvent: (event: ContainerWebSocketMessage) => void;
logger?: { log?: (msg: string) => void; error?: (msg: string) => void };
}): Promise<void> {
const normalized = normalizeBaseUrl(params.baseUrl);
const wsUrl = `${normalized.replace(/^http/, "ws")}/v1/receive/${encodeURIComponent(params.account ?? "")}`;
const redactedWsUrl = `${normalized.replace(/^http/, "ws")}/v1/receive/<redacted>`;
const log = params.logger?.log ?? (() => {});
const logError = params.logger?.error ?? (() => {});
log(`[signal-ws] connecting to ${redactedWsUrl}`);
return new Promise((resolve, reject) => {
let ws: WebSocket;
let resolved = false;
let abortHandler: (() => void) | undefined;
const cleanup = () => {
if (resolved) {
return;
}
resolved = true;
if (abortHandler) {
params.abortSignal?.removeEventListener("abort", abortHandler);
abortHandler = undefined;
}
};
try {
ws = new WebSocket(wsUrl);
} catch (err) {
logError(
`[signal-ws] failed to create WebSocket: ${err instanceof Error ? err.message : String(err)}`,
);
reject(err);
return;
}
ws.on("open", () => {
log("[signal-ws] connected");
});
ws.on("message", (data: Buffer) => {
try {
const text = data.toString();
const envelope = JSON.parse(text) as ContainerWebSocketMessage;
if (envelope) {
params.onEvent(envelope);
}
} catch (err) {
logError(`[signal-ws] parse error: ${err instanceof Error ? err.message : String(err)}`);
}
});
ws.on("error", (err) => {
logError(`[signal-ws] error: ${err instanceof Error ? err.message : String(err)}`);
// Don't resolve here - the close event will fire next
});
ws.on("close", (code, reason) => {
const reasonStr = reason?.toString() || "no reason";
log(`[signal-ws] closed (code=${code}, reason=${reasonStr})`);
cleanup();
resolve(); // Let the outer loop handle reconnection
});
ws.on("ping", () => {
log("[signal-ws] ping received");
});
ws.on("pong", () => {
log("[signal-ws] pong received");
});
if (params.abortSignal) {
abortHandler = () => {
log("[signal-ws] aborted, closing connection");
cleanup();
ws.close();
resolve();
};
params.abortSignal.addEventListener("abort", abortHandler, { once: true });
}
});
}
/**
* Convert local file paths to base64 data URIs for the container REST API.
* The bbernhard container /v2/send only accepts `base64_attachments` (not file paths).
*/
async function filesToBase64DataUris(filePaths: string[]): Promise<string[]> {
const results: string[] = [];
for (const filePath of filePaths) {
const buffer = await fs.readFile(filePath);
const mime = (await detectMime({ buffer, filePath })) ?? "application/octet-stream";
const filename = nodePath.basename(filePath);
const b64 = buffer.toString("base64");
results.push(`data:${mime};filename=${filename};base64,${b64}`);
}
return results;
}
function escapeContainerStyledText(text: string): string {
return text.replace(/[*~`|]/g, (char) => `\\${char}`);
}
function renderContainerStyledText(
text: string,
styles: Array<{ start: number; length: number; style: string }>,
): string {
const spans = styles
.map((style) => {
const marker = CONTAINER_TEXT_STYLE_MARKERS[style.style];
if (!marker) {
return null;
}
const start = Math.max(0, Math.min(style.start, text.length));
const end = Math.max(start, Math.min(style.start + style.length, text.length));
if (end <= start) {
return null;
}
return { start, end, marker };
})
.filter((span): span is { start: number; end: number; marker: string } => span !== null);
if (spans.length === 0) {
return text;
}
const positions = [
...new Set([0, text.length, ...spans.flatMap((span) => [span.start, span.end])]),
].toSorted((a, b) => a - b);
let rendered = "";
for (let i = 0; i < positions.length; i += 1) {
const pos = positions[i];
for (const span of spans
.filter((candidate) => candidate.end === pos)
.toSorted((a, b) => b.start - a.start)) {
rendered += span.marker;
}
for (const span of spans
.filter((candidate) => candidate.start === pos)
.toSorted((a, b) => b.end - a.end)) {
rendered += span.marker;
}
const next = positions[i + 1];
if (next !== undefined && next > pos) {
rendered += escapeContainerStyledText(text.slice(pos, next));
}
}
return rendered;
}
function parseContainerSendTimestamp(raw: unknown): number | undefined {
if (raw == null) {
return undefined;
}
const timestamp =
typeof raw === "number" ? raw : typeof raw === "string" ? Number(raw) : Number.NaN;
if (!Number.isFinite(timestamp)) {
throw new Error("Signal REST send returned invalid timestamp");
}
return timestamp;
}
/**
* Send message via bbernhard container REST API.
*/
export async function containerSendMessage(params: {
baseUrl: string;
account: string;
recipients: string[];
message: string;
textStyles?: Array<{ start: number; length: number; style: string }>;
attachments?: string[];
timeoutMs?: number;
}): Promise<{ timestamp?: number }> {
const payload: Record<string, unknown> = {
message: params.message,
number: params.account,
recipients: params.recipients,
};
if (params.textStyles && params.textStyles.length > 0) {
payload.message = renderContainerStyledText(params.message, params.textStyles);
payload["text_mode"] = "styled";
}
if (params.attachments && params.attachments.length > 0) {
// Container API only accepts base64-encoded attachments, not file paths.
payload.base64_attachments = await filesToBase64DataUris(params.attachments);
}
const result = await containerRestRequest<{ timestamp?: unknown }>(
"/v2/send",
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
"POST",
payload,
);
const timestamp = parseContainerSendTimestamp(result?.timestamp);
return timestamp === undefined ? {} : { timestamp };
}
/**
* Send typing indicator via bbernhard container REST API.
*/
export async function containerSendTyping(params: {
baseUrl: string;
account: string;
recipient: string;
stop?: boolean;
timeoutMs?: number;
}): Promise<boolean> {
const method = params.stop ? "DELETE" : "PUT";
await containerRestRequest(
`/v1/typing-indicator/${encodeURIComponent(params.account)}`,
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
method,
{ recipient: params.recipient },
);
return true;
}
/**
* Send read receipt via bbernhard container REST API.
*/
export async function containerSendReceipt(params: {
baseUrl: string;
account: string;
recipient: string;
timestamp: number;
type?: "read" | "viewed";
timeoutMs?: number;
}): Promise<boolean> {
await containerRestRequest(
`/v1/receipts/${encodeURIComponent(params.account)}`,
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
"POST",
{
recipient: params.recipient,
timestamp: params.timestamp,
receipt_type: params.type ?? "read",
},
);
return true;
}
/**
* Send a reaction to a message via bbernhard container REST API.
*/
export async function containerSendReaction(params: {
baseUrl: string;
account: string;
recipient: string;
emoji: string;
targetAuthor: string;
targetTimestamp: number;
groupId?: string;
timeoutMs?: number;
}): Promise<{ timestamp?: number }> {
const payload: Record<string, unknown> = {
recipient: params.recipient,
reaction: params.emoji,
target_author: params.targetAuthor,
timestamp: params.targetTimestamp,
};
if (params.groupId) {
payload.group_id = params.groupId;
}
const result = await containerRestRequest<{ timestamp?: number }>(
`/v1/reactions/${encodeURIComponent(params.account)}`,
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
"POST",
payload,
);
return result ?? {};
}
/**
* Remove a reaction from a message via bbernhard container REST API.
*/
export async function containerRemoveReaction(params: {
baseUrl: string;
account: string;
recipient: string;
emoji: string;
targetAuthor: string;
targetTimestamp: number;
groupId?: string;
timeoutMs?: number;
}): Promise<{ timestamp?: number }> {
const payload: Record<string, unknown> = {
recipient: params.recipient,
reaction: params.emoji,
target_author: params.targetAuthor,
timestamp: params.targetTimestamp,
};
if (params.groupId) {
payload.group_id = params.groupId;
}
const result = await containerRestRequest<{ timestamp?: number }>(
`/v1/reactions/${encodeURIComponent(params.account)}`,
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
"DELETE",
payload,
);
return result ?? {};
}
/**
* Strip the "uuid:" prefix that native signal-cli accepts but the container API rejects.
*/
function stripUuidPrefix(id: string): string {
return id.startsWith("uuid:") ? id.slice(5) : id;
}
/**
* Convert a group internal_id to the container-expected format.
* The bbernhard container expects groups as "group.{base64(internal_id)}".
*/
function formatGroupIdForContainer(groupId: string): string {
if (groupId.startsWith("group.")) {
return groupId;
}
return `group.${Buffer.from(groupId).toString("base64")}`;
}
/**
* Drop-in replacement for native signalRpcRequest that translates
* JSON-RPC method + params into the equivalent container REST API calls.
* This keeps all container protocol details (uuid: stripping, group ID
* formatting, base64 attachments, text-style conversion) isolated here.
*/
export async function containerRpcRequest<T = unknown>(
method: string,
params: Record<string, unknown> | undefined,
opts: ContainerRpcOptions,
): Promise<T> {
const p = params ?? {};
switch (method) {
case "send": {
const recipients = ((p.recipient as string[] | undefined) ?? []).map(stripUuidPrefix);
const usernames = ((p.username as string[] | undefined) ?? []).map(stripUuidPrefix);
const groupId = p.groupId as string | undefined;
const formattedGroupId = groupId ? formatGroupIdForContainer(groupId) : undefined;
const finalRecipients =
recipients.length > 0
? recipients
: usernames.length > 0
? usernames
: formattedGroupId
? [formattedGroupId]
: [];
const textStylesRaw = p["text-style"] as string[] | undefined;
const textStyles = textStylesRaw?.map((s) => {
const [start, length, style] = s.split(":");
return { start: Number(start), length: Number(length), style };
});
const result = await containerSendMessage({
baseUrl: opts.baseUrl,
account: (p.account as string) ?? "",
recipients: finalRecipients,
message: (p.message as string) ?? "",
textStyles,
attachments: p.attachments as string[] | undefined,
timeoutMs: opts.timeoutMs,
});
return result as T;
}
case "sendTyping": {
const recipient = stripUuidPrefix(
(p.recipient as string[] | undefined)?.[0] ??
((p.groupId as string | undefined) ? formatGroupIdForContainer(p.groupId as string) : ""),
);
await containerSendTyping({
baseUrl: opts.baseUrl,
account: (p.account as string) ?? "",
recipient,
stop: p.stop as boolean | undefined,
timeoutMs: opts.timeoutMs,
});
return undefined as T;
}
case "sendReceipt": {
const recipient = stripUuidPrefix((p.recipient as string[] | undefined)?.[0] ?? "");
await containerSendReceipt({
baseUrl: opts.baseUrl,
account: (p.account as string) ?? "",
recipient,
timestamp: p.targetTimestamp as number,
type: p.type as "read" | "viewed" | undefined,
timeoutMs: opts.timeoutMs,
});
return undefined as T;
}
case "sendReaction": {
const recipient = stripUuidPrefix((p.recipients as string[] | undefined)?.[0] ?? "");
const groupId = (p.groupIds as string[] | undefined)?.[0] ?? undefined;
const formattedGroupId = groupId ? formatGroupIdForContainer(groupId) : undefined;
// Container API uses `recipient` for both DMs and groups.
// For groups, pass the formatted group ID as recipient.
const effectiveRecipient = formattedGroupId || recipient || "";
const reactionParams = {
baseUrl: opts.baseUrl,
account: (p.account as string) ?? "",
recipient: effectiveRecipient,
emoji: (p.emoji as string) ?? "",
targetAuthor: stripUuidPrefix((p.targetAuthor as string) ?? recipient),
targetTimestamp: p.targetTimestamp as number,
groupId: formattedGroupId,
timeoutMs: opts.timeoutMs,
};
const fn = p.remove ? containerRemoveReaction : containerSendReaction;
return (await fn(reactionParams)) as T;
}
case "getAttachment": {
const attachmentId = p.id as string;
const buffer = await containerFetchAttachment(attachmentId, {
baseUrl: opts.baseUrl,
timeoutMs: opts.timeoutMs,
maxResponseBytes: opts.maxResponseBytes,
});
// Convert to native format: { data: base64String }
if (!buffer) {
return { data: undefined } as T;
}
return { data: buffer.toString("base64") } as T;
}
case "version": {
const result = await containerRestRequest<{ versions?: string[]; build?: number }>(
"/v1/about",
{ baseUrl: opts.baseUrl, timeoutMs: opts.timeoutMs },
);
return result as T;
}
default:
throw new Error(`Unsupported container RPC method: ${method}`);
}
}

View File

@@ -71,6 +71,24 @@ describe("signal groups schema", () => {
}); });
}); });
it("accepts channel apiMode", () => {
for (const apiMode of ["auto", "native", "container"]) {
expectValidSignalConfig({ apiMode });
}
});
it("rejects per-account apiMode", () => {
const issues = expectInvalidSignalConfig({
accounts: {
primary: {
apiMode: "container",
},
},
});
expect(issues.some((issue) => issue.path.join(".") === "accounts.primary")).toBe(true);
});
it("accepts top-level group overrides", () => { it("accepts top-level group overrides", () => {
expectValidSignalConfig({ expectValidSignalConfig({
groups: { groups: {

View File

@@ -6,7 +6,7 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { createPluginSetupWizardStatus } from "openclaw/plugin-sdk/plugin-test-runtime"; import { createPluginSetupWizardStatus } from "openclaw/plugin-sdk/plugin-test-runtime";
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it, vi } from "vitest";
import { signalPlugin } from "./channel.js"; import { signalPlugin } from "./channel.js";
import * as clientModule from "./client.js"; import * as clientModule from "./client-adapter.js";
import { classifySignalCliLogLine } from "./daemon.js"; import { classifySignalCliLogLine } from "./daemon.js";
import { import {
looksLikeUuid, looksLikeUuid,

View File

@@ -188,8 +188,14 @@ vi.mock("./client.js", () => ({
signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args), signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args),
})); }));
vi.mock("./daemon.js", async () => { vi.mock("./client-adapter.js", () => ({
const actual = await vi.importActual<typeof import("./daemon.js")>("./daemon.js"); streamSignalEvents: (...args: unknown[]) => streamMock(...args),
signalCheck: (...args: unknown[]) => signalCheckMock(...args),
signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args),
}));
vi.mock("./daemon.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./daemon.js")>();
return { return {
...actual, ...actual,
spawnSignalDaemon: (...args: unknown[]) => spawnSignalDaemonMock(...args), spawnSignalDaemon: (...args: unknown[]) => spawnSignalDaemonMock(...args),

View File

@@ -34,7 +34,7 @@ import {
} from "openclaw/plugin-sdk/text-runtime"; } from "openclaw/plugin-sdk/text-runtime";
import { waitForTransportReady } from "openclaw/plugin-sdk/transport-ready-runtime"; import { waitForTransportReady } from "openclaw/plugin-sdk/transport-ready-runtime";
import { resolveSignalAccount } from "./accounts.js"; import { resolveSignalAccount } from "./accounts.js";
import { signalCheck, signalRpcRequest } from "./client.js"; import { signalRpcRequest, signalCheck } from "./client-adapter.js";
import { formatSignalDaemonExit, spawnSignalDaemon, type SignalDaemonHandle } from "./daemon.js"; import { formatSignalDaemonExit, spawnSignalDaemon, type SignalDaemonHandle } from "./daemon.js";
import { isSignalSenderAllowed, type resolveSignalSender } from "./identity.js"; import { isSignalSenderAllowed, type resolveSignalSender } from "./identity.js";
import { createSignalEventHandler } from "./monitor/event-handler.js"; import { createSignalEventHandler } from "./monitor/event-handler.js";
@@ -272,6 +272,7 @@ function deriveSignalAttachmentRpcMaxResponseBytes(maxBytes: number): number | u
async function fetchAttachment(params: { async function fetchAttachment(params: {
baseUrl: string; baseUrl: string;
account?: string; account?: string;
apiMode?: "native" | "container" | "auto";
attachment: SignalAttachment; attachment: SignalAttachment;
sender?: string; sender?: string;
groupId?: string; groupId?: string;
@@ -303,6 +304,7 @@ async function fetchAttachment(params: {
const result = await signalRpcRequest<{ data?: string }>("getAttachment", rpcParams, { const result = await signalRpcRequest<{ data?: string }>("getAttachment", rpcParams, {
baseUrl: params.baseUrl, baseUrl: params.baseUrl,
maxResponseBytes: deriveSignalAttachmentRpcMaxResponseBytes(params.maxBytes), maxResponseBytes: deriveSignalAttachmentRpcMaxResponseBytes(params.maxBytes),
apiMode: params.apiMode,
}); });
if (!result?.data) { if (!result?.data) {
return null; return null;
@@ -422,6 +424,7 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
const waitForTransportReadyFn = opts.waitForTransportReady ?? waitForTransportReady; const waitForTransportReadyFn = opts.waitForTransportReady ?? waitForTransportReady;
const autoStart = opts.autoStart ?? accountInfo.config.autoStart ?? !accountInfo.config.httpUrl; const autoStart = opts.autoStart ?? accountInfo.config.autoStart ?? !accountInfo.config.httpUrl;
const configuredApiMode = cfg.channels?.signal?.apiMode ?? "auto";
const startupTimeoutMs = Math.min( const startupTimeoutMs = Math.min(
120_000, 120_000,
Math.max(1_000, opts.startupTimeoutMs ?? accountInfo.config.startupTimeoutMs ?? 30_000), Math.max(1_000, opts.startupTimeoutMs ?? accountInfo.config.startupTimeoutMs ?? 30_000),
@@ -430,6 +433,12 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
const daemonLifecycle = createSignalDaemonLifecycle({ abortSignal: opts.abortSignal }); const daemonLifecycle = createSignalDaemonLifecycle({ abortSignal: opts.abortSignal });
let daemonHandle: SignalDaemonHandle | null = null; let daemonHandle: SignalDaemonHandle | null = null;
if (autoStart && configuredApiMode === "container") {
throw new Error(
"channels.signal.autoStart=true is incompatible with channels.signal.apiMode=container",
);
}
if (autoStart) { if (autoStart) {
const cliPath = opts.cliPath ?? accountInfo.config.cliPath ?? "signal-cli"; const cliPath = opts.cliPath ?? accountInfo.config.cliPath ?? "signal-cli";
const httpHost = opts.httpHost ?? accountInfo.config.httpHost ?? "127.0.0.1"; const httpHost = opts.httpHost ?? accountInfo.config.httpHost ?? "127.0.0.1";
@@ -491,7 +500,7 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
ignoreAttachments, ignoreAttachments,
sendReadReceipts, sendReadReceipts,
readReceiptsViaDaemon, readReceiptsViaDaemon,
fetchAttachment, fetchAttachment: (params) => fetchAttachment({ ...params, apiMode: configuredApiMode }),
deliverReplies: (params) => deliverReplies({ ...params, cfg, chunkMode }), deliverReplies: (params) => deliverReplies({ ...params, cfg, chunkMode }),
resolveSignalReactionTargets, resolveSignalReactionTargets,
isSignalReactionMessage, isSignalReactionMessage,
@@ -506,6 +515,7 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
runtime, runtime,
// signal-cli can keep the SSE event endpoint idle until the next inbound event. // signal-cli can keep the SSE event endpoint idle until the next inbound event.
timeoutMs: 0, timeoutMs: 0,
apiMode: configuredApiMode,
policy: opts.reconnectPolicy, policy: opts.reconnectPolicy,
onEvent: (event) => { onEvent: (event) => {
void handleEvent(event).catch((err) => { void handleEvent(event).catch((err) => {

View File

@@ -1,6 +1,6 @@
import type { BaseProbeResult } from "openclaw/plugin-sdk/channel-contract"; import type { BaseProbeResult } from "openclaw/plugin-sdk/channel-contract";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { signalCheck, signalRpcRequest } from "./client.js"; import { type SignalApiMode, signalCheck, signalRpcRequest } from "./client-adapter.js";
export type SignalProbe = BaseProbeResult & { export type SignalProbe = BaseProbeResult & {
status?: number | null; status?: number | null;
@@ -21,7 +21,11 @@ function parseSignalVersion(value: unknown): string | null {
return null; return null;
} }
export async function probeSignal(baseUrl: string, timeoutMs: number): Promise<SignalProbe> { export async function probeSignal(
baseUrl: string,
timeoutMs: number,
options: { apiMode?: SignalApiMode } = {},
): Promise<SignalProbe> {
const started = Date.now(); const started = Date.now();
const result: SignalProbe = { const result: SignalProbe = {
ok: false, ok: false,
@@ -30,7 +34,8 @@ export async function probeSignal(baseUrl: string, timeoutMs: number): Promise<S
elapsedMs: 0, elapsedMs: 0,
version: null, version: null,
}; };
const check = await signalCheck(baseUrl, timeoutMs); const apiMode = options.apiMode ?? "native";
const check = await signalCheck(baseUrl, timeoutMs, { apiMode });
if (!check.ok) { if (!check.ok) {
return { return {
...result, ...result,
@@ -43,6 +48,7 @@ export async function probeSignal(baseUrl: string, timeoutMs: number): Promise<S
const version = await signalRpcRequest("version", undefined, { const version = await signalRpcRequest("version", undefined, {
baseUrl, baseUrl,
timeoutMs, timeoutMs,
apiMode,
}); });
result.version = parseSignalVersion(version); result.version = parseSignalVersion(version);
} catch (err) { } catch (err) {

View File

@@ -22,7 +22,7 @@ vi.mock("./accounts.js", () => ({
}), }),
})); }));
vi.mock("./client.js", () => ({ vi.mock("./client-adapter.js", () => ({
signalRpcRequest: (...args: unknown[]) => rpcMock(...args), signalRpcRequest: (...args: unknown[]) => rpcMock(...args),
})); }));

View File

@@ -6,7 +6,7 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { requireRuntimeConfig } from "openclaw/plugin-sdk/plugin-config-runtime"; import { requireRuntimeConfig } from "openclaw/plugin-sdk/plugin-config-runtime";
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
import { resolveSignalAccount } from "./accounts.js"; import { resolveSignalAccount } from "./accounts.js";
import { signalRpcRequest } from "./client.js"; import { signalRpcRequest } from "./client-adapter.js";
import { resolveSignalRpcContext } from "./rpc-context.js"; import { resolveSignalRpcContext } from "./rpc-context.js";
export type SignalReactionOpts = { export type SignalReactionOpts = {
@@ -79,6 +79,7 @@ async function sendReactionSignalCore(params: {
errors: SignalReactionErrorMessages; errors: SignalReactionErrorMessages;
}): Promise<SignalReactionResult> { }): Promise<SignalReactionResult> {
const cfg = requireRuntimeConfig(params.opts.cfg, "Signal reactions"); const cfg = requireRuntimeConfig(params.opts.cfg, "Signal reactions");
const apiMode = cfg.channels?.signal?.apiMode;
const accountInfo = resolveSignalAccount({ const accountInfo = resolveSignalAccount({
cfg, cfg,
accountId: params.opts.accountId, accountId: params.opts.accountId,
@@ -126,6 +127,7 @@ async function sendReactionSignalCore(params: {
const result = await signalRpcRequest<{ timestamp?: number }>("sendReaction", requestParams, { const result = await signalRpcRequest<{ timestamp?: number }>("sendReaction", requestParams, {
baseUrl, baseUrl,
timeoutMs: params.opts.timeoutMs, timeoutMs: params.opts.timeoutMs,
apiMode,
}); });
return { return {

View File

@@ -5,7 +5,7 @@ const resolveOutboundAttachmentFromUrlMock = vi.hoisted(() =>
vi.fn(async (_params: unknown) => ({ path: "/tmp/image.png", contentType: "image/png" })), vi.fn(async (_params: unknown) => ({ path: "/tmp/image.png", contentType: "image/png" })),
); );
vi.mock("./client.js", () => ({ vi.mock("./client-adapter.js", () => ({
signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args), signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args),
})); }));

View File

@@ -11,7 +11,7 @@ import { resolveOutboundAttachmentFromUrl } from "openclaw/plugin-sdk/media-runt
import { requireRuntimeConfig } from "openclaw/plugin-sdk/plugin-config-runtime"; import { requireRuntimeConfig } from "openclaw/plugin-sdk/plugin-config-runtime";
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
import { resolveSignalAccount } from "./accounts.js"; import { resolveSignalAccount } from "./accounts.js";
import { signalRpcRequest } from "./client.js"; import { signalRpcRequest } from "./client-adapter.js";
import { markdownToSignalText, type SignalTextStyleRange } from "./format.js"; import { markdownToSignalText, type SignalTextStyleRange } from "./format.js";
import { resolveSignalRpcContext } from "./rpc-context.js"; import { resolveSignalRpcContext } from "./rpc-context.js";
@@ -172,6 +172,7 @@ export async function sendMessageSignal(
opts: SignalSendOpts, opts: SignalSendOpts,
): Promise<SignalSendResult> { ): Promise<SignalSendResult> {
const cfg = requireRuntimeConfig(opts.cfg, "Signal send"); const cfg = requireRuntimeConfig(opts.cfg, "Signal send");
const apiMode = cfg.channels?.signal?.apiMode;
const accountInfo = resolveSignalAccount({ const accountInfo = resolveSignalAccount({
cfg, cfg,
accountId: opts.accountId, accountId: opts.accountId,
@@ -256,6 +257,7 @@ export async function sendMessageSignal(
const result = await signalRpcRequest<{ timestamp?: number }>("send", params, { const result = await signalRpcRequest<{ timestamp?: number }>("send", params, {
baseUrl, baseUrl,
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
apiMode,
}); });
const timestamp = result?.timestamp; const timestamp = result?.timestamp;
const messageId = timestamp ? String(timestamp) : "unknown"; const messageId = timestamp ? String(timestamp) : "unknown";
@@ -276,6 +278,7 @@ export async function sendTypingSignal(
opts: SignalRpcOpts & { stop?: boolean }, opts: SignalRpcOpts & { stop?: boolean },
): Promise<boolean> { ): Promise<boolean> {
const accountInfo = await resolveSignalRpcAccountInfo(opts); const accountInfo = await resolveSignalRpcAccountInfo(opts);
const cfg = requireRuntimeConfig(opts.cfg, "Signal typing");
const { baseUrl, account } = resolveSignalRpcContext(opts, accountInfo); const { baseUrl, account } = resolveSignalRpcContext(opts, accountInfo);
const targetParams = buildTargetParams(parseTarget(to), { const targetParams = buildTargetParams(parseTarget(to), {
recipient: true, recipient: true,
@@ -294,6 +297,7 @@ export async function sendTypingSignal(
await signalRpcRequest("sendTyping", params, { await signalRpcRequest("sendTyping", params, {
baseUrl, baseUrl,
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
apiMode: cfg.channels?.signal?.apiMode,
}); });
return true; return true;
} }
@@ -307,6 +311,7 @@ export async function sendReadReceiptSignal(
return false; return false;
} }
const accountInfo = await resolveSignalRpcAccountInfo(opts); const accountInfo = await resolveSignalRpcAccountInfo(opts);
const cfg = requireRuntimeConfig(opts.cfg, "Signal read receipt");
const { baseUrl, account } = resolveSignalRpcContext(opts, accountInfo); const { baseUrl, account } = resolveSignalRpcContext(opts, accountInfo);
const targetParams = buildTargetParams(parseTarget(to), { const targetParams = buildTargetParams(parseTarget(to), {
recipient: true, recipient: true,
@@ -325,6 +330,7 @@ export async function sendReadReceiptSignal(
await signalRpcRequest("sendReceipt", params, { await signalRpcRequest("sendReceipt", params, {
baseUrl, baseUrl,
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
apiMode: cfg.channels?.signal?.apiMode,
}); });
return true; return true;
} }

View File

@@ -6,7 +6,7 @@ import {
type BackoffPolicy, type BackoffPolicy,
type RuntimeEnv, type RuntimeEnv,
} from "openclaw/plugin-sdk/runtime-env"; } from "openclaw/plugin-sdk/runtime-env";
import { type SignalSseEvent, streamSignalEvents } from "./client.js"; import { type SignalApiMode, type SignalSseEvent, streamSignalEvents } from "./client-adapter.js";
const DEFAULT_RECONNECT_POLICY: BackoffPolicy = { const DEFAULT_RECONNECT_POLICY: BackoffPolicy = {
initialMs: 1_000, initialMs: 1_000,
@@ -22,6 +22,7 @@ type RunSignalSseLoopParams = {
runtime: RuntimeEnv; runtime: RuntimeEnv;
onEvent: (event: SignalSseEvent) => void; onEvent: (event: SignalSseEvent) => void;
timeoutMs?: number; timeoutMs?: number;
apiMode?: SignalApiMode;
policy?: Partial<BackoffPolicy>; policy?: Partial<BackoffPolicy>;
}; };
@@ -32,6 +33,7 @@ export async function runSignalSseLoop({
runtime, runtime,
onEvent, onEvent,
timeoutMs, timeoutMs,
apiMode,
policy, policy,
}: RunSignalSseLoopParams) { }: RunSignalSseLoopParams) {
const reconnectPolicy = { const reconnectPolicy = {
@@ -57,26 +59,31 @@ export async function runSignalSseLoop({
account, account,
abortSignal, abortSignal,
timeoutMs, timeoutMs,
onEvent: (event) => { apiMode,
onEvent: (event: SignalSseEvent) => {
reconnectAttempts = 0; reconnectAttempts = 0;
onEvent(event); onEvent(event);
}, },
logger: {
log: runtime.log,
error: runtime.error,
},
}); });
if (abortSignal?.aborted) { if (abortSignal?.aborted) {
return; return;
} }
reconnectAttempts += 1; reconnectAttempts += 1;
const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts); const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts);
logReconnectVerbose(`Signal SSE stream ended, reconnecting in ${delayMs / 1000}s...`); logReconnectVerbose(`Signal stream ended, reconnecting in ${delayMs / 1000}s...`);
await sleepWithAbort(delayMs, abortSignal); await sleepWithAbort(delayMs, abortSignal);
} catch (err) { } catch (err) {
if (abortSignal?.aborted) { if (abortSignal?.aborted) {
return; return;
} }
runtime.error?.(`Signal SSE stream error: ${String(err)}`); runtime.error?.(`Signal stream error: ${String(err)}`);
reconnectAttempts += 1; reconnectAttempts += 1;
const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts); const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts);
runtime.log?.(`Signal SSE connection lost, reconnecting in ${delayMs / 1000}s...`); runtime.log?.(`Signal connection lost, reconnecting in ${delayMs / 1000}s...`);
try { try {
await sleepWithAbort(delayMs, abortSignal); await sleepWithAbort(delayMs, abortSignal);
} catch (sleepErr) { } catch (sleepErr) {

4
pnpm-lock.yaml generated
View File

@@ -1335,6 +1335,10 @@ importers:
version: link:../../packages/plugin-sdk version: link:../../packages/plugin-sdk
extensions/signal: extensions/signal:
dependencies:
ws:
specifier: ^8.20.0
version: 8.20.0
devDependencies: devDependencies:
'@openclaw/plugin-sdk': '@openclaw/plugin-sdk':
specifier: workspace:* specifier: workspace:*

View File

@@ -3,6 +3,7 @@ import type { GroupToolPolicyBySenderConfig, GroupToolPolicyConfig } from "./typ
export type SignalReactionNotificationMode = "off" | "own" | "all" | "allowlist"; export type SignalReactionNotificationMode = "off" | "own" | "all" | "allowlist";
export type SignalReactionLevel = "off" | "ack" | "minimal" | "extensive"; export type SignalReactionLevel = "off" | "ack" | "minimal" | "extensive";
export type SignalApiMode = "auto" | "native" | "container";
export type SignalGroupConfig = { export type SignalGroupConfig = {
requireMention?: boolean; requireMention?: boolean;
@@ -57,6 +58,14 @@ export type SignalAccountConfig = CommonChannelMessagingConfig & {
}; };
export type SignalConfig = { export type SignalConfig = {
/**
* Signal API mode (channel-global):
* - "auto" (default): Auto-detect based on available endpoints
* - "native": Use native signal-cli with JSON-RPC + SSE (/api/v1/rpc, /api/v1/events)
* - "container": Use bbernhard/signal-cli-rest-api with REST + WebSocket (/v2/send, /v1/receive/{account}).
* Requires the container to run with MODE=json-rpc for real-time message receiving.
*/
apiMode?: SignalApiMode;
/** Optional per-account Signal configuration (multi-account). */ /** Optional per-account Signal configuration (multi-account). */
accounts?: Record<string, SignalAccountConfig>; accounts?: Record<string, SignalAccountConfig>;
/** Optional default account id when multiple accounts are configured. */ /** Optional default account id when multiple accounts are configured. */

View File

@@ -1203,6 +1203,7 @@ export const SignalAccountSchemaBase = z
export const SignalAccountSchema = SignalAccountSchemaBase; export const SignalAccountSchema = SignalAccountSchemaBase;
export const SignalConfigSchema = SignalAccountSchemaBase.extend({ export const SignalConfigSchema = SignalAccountSchemaBase.extend({
apiMode: z.enum(["auto", "native", "container"]).optional(),
accounts: z.record(z.string(), SignalAccountSchema.optional()).optional(), accounts: z.record(z.string(), SignalAccountSchema.optional()).optional(),
defaultAccount: z.string().optional(), defaultAccount: z.string().optional(),
}).superRefine((value, ctx) => { }).superRefine((value, ctx) => {