fix(telegram): use partial stream deltas

This commit is contained in:
Ayaan Zaidi
2026-05-10 08:02:48 +05:30
parent 512a26c29b
commit 1367ec7461
6 changed files with 106 additions and 67 deletions

View File

@@ -991,14 +991,21 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("coalesces delta-shaped partial fragments while preserving the first-preview debounce", async () => {
it("applies partial deltas while preserving the first-preview debounce", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Streaming " });
await replyOptions?.onPartialReply?.({ text: "previews " });
await replyOptions?.onPartialReply?.({
text: "are useful because they show progress.",
text: "Streaming ",
delta: "Streaming ",
});
await replyOptions?.onPartialReply?.({
text: "Streaming previews ",
delta: "previews ",
});
await replyOptions?.onPartialReply?.({
text: "Streaming previews are useful because they show progress.",
delta: "are useful because they show progress.",
});
await dispatcherOptions.deliver(
{ text: "Streaming previews are useful because they show progress." },
@@ -1032,13 +1039,43 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(deliverReplies).not.toHaveBeenCalled();
});
it("replaces non-prefix partial snapshots instead of appending them", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({
text: "Working...",
delta: "Working...",
});
await replyOptions?.onPartialReply?.({
text: "Done.",
delta: "",
replace: true,
});
await dispatcherOptions.deliver({ text: "Done." }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
telegramCfg: { streaming: { mode: "partial" } },
});
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Working...");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Done.");
expect(answerDraftStream.update).toHaveBeenLastCalledWith("Done.");
expect(deliverReplies).not.toHaveBeenCalled();
});
it("does not coalesce answer partial fragments with tool progress drafts", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await replyOptions?.onPartialReply?.({ text: "Done " });
await replyOptions?.onPartialReply?.({ text: "answer" });
await replyOptions?.onPartialReply?.({ text: "Done ", delta: "Done " });
await replyOptions?.onPartialReply?.({ text: "Done answer", delta: "answer" });
await dispatcherOptions.deliver({ text: "Done answer." }, { kind: "final" });
return { queuedFinal: true };
},

View File

@@ -110,49 +110,22 @@ const silentReplyDispatchLogger = createSubsystemLogger("telegram/silent-reply-d
/** Minimum chars before sending first streaming message (improves push notification UX) */
const DRAFT_MIN_INITIAL_CHARS = 30;
/**
 * Concatenates `fragment` onto `previous`, deduplicating the longest suffix
 * of `previous` that is also a prefix of `fragment` (e.g. "ab" + "bc" -> "abc").
 * Falls back to plain concatenation when no overlap exists.
 */
function appendWithOverlap(previous: string, fragment: string): string {
  // Scan from the largest possible overlap down to zero; stop at the first match.
  let overlapLength = Math.min(previous.length, fragment.length);
  while (overlapLength > 0 && !previous.endsWith(fragment.slice(0, overlapLength))) {
    overlapLength -= 1;
  }
  return previous + fragment.slice(overlapLength);
}
/**
 * One partial-text update for a streaming draft lane.
 * `text` is the full snapshot so far; when `delta` is present it is appended
 * to the previous draft text instead, unless `replace` is set, in which case
 * `text` supersedes the previous draft outright.
 */
type DraftPartialTextUpdate = {
  /** Full snapshot of the streamed text so far. */
  text: string;
  /** Newly produced suffix since the previous partial, if delta-shaped. */
  delta?: string;
  /** When set, `text` replaces the previous draft rather than extending it. */
  replace?: true;
};
/**
 * Heuristic: decide whether `text` looks like an incremental continuation
 * fragment that should be appended to `previous`, as opposed to a full
 * snapshot that should replace it.
 */
function looksLikeDraftDeltaFragment(previous: string, text: string): boolean {
  // Nothing to compare: empty inputs are never treated as deltas.
  if (!previous || !text) {
    return false;
  }
  // Prefix relationships in either direction indicate snapshot-style updates.
  const snapshotLike = text.startsWith(previous) || previous.startsWith(text);
  if (snapshotLike) {
    return false;
  }
  const startsWithWhitespace = /^\s/.test(text);
  const previousIsShort = previous.length < DRAFT_MIN_INITIAL_CHARS;
  const continuesAfterTrailingSpace =
    /\s$/.test(previous) && text.length <= DRAFT_MIN_INITIAL_CHARS;
  if (startsWithWhitespace || previousIsShort || continuesAfterTrailingSpace) {
    return true;
  }
  // Otherwise only short fragments (relative to the accumulated text) count.
  return text.length <= Math.max(16, Math.floor(previous.length / 2));
}
function resolveDraftPartialText(previous: string, text: string): string | undefined {
if (!previous) {
return text;
}
if (text === previous) {
function resolveDraftPartialText(
previous: string,
update: DraftPartialTextUpdate,
): string | undefined {
const nextText =
update.replace || update.delta === undefined ? update.text : `${previous}${update.delta}`;
if (nextText === previous) {
return undefined;
}
if (text.startsWith(previous)) {
return text;
}
if (previous.startsWith(text) && text.length < previous.length) {
return undefined;
}
return looksLikeDraftDeltaFragment(previous, text) ? appendWithOverlap(previous, text) : text;
return nextText;
}
async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) {
@@ -714,23 +687,36 @@ export const dispatchTelegramMessage = async ({
});
return draftLaneEventQueue;
};
type SplitLaneSegment = { lane: LaneName; text: string };
type SplitLaneSegment = { lane: LaneName; update: DraftPartialTextUpdate };
type SplitLaneSegmentsResult = {
segments: SplitLaneSegment[];
suppressedReasoningOnly: boolean;
};
const splitTextIntoLaneSegments = (
text?: string,
update: { text?: string; delta?: string; replace?: true },
isReasoning?: boolean,
): SplitLaneSegmentsResult => {
const split = splitTelegramReasoningText(text, isReasoning);
const split = splitTelegramReasoningText(update.text, isReasoning);
const splitSegments: Array<{ lane: LaneName; text: string }> = [];
const useDelta = !update.replace && update.delta !== undefined;
const segments: SplitLaneSegment[] = [];
const suppressReasoning = resolvedReasoningLevel === "off";
if (split.reasoningText && !suppressReasoning) {
segments.push({ lane: "reasoning", text: split.reasoningText });
splitSegments.push({ lane: "reasoning", text: split.reasoningText });
}
if (split.answerText) {
segments.push({ lane: "answer", text: split.answerText });
splitSegments.push({ lane: "answer", text: split.answerText });
}
for (const segment of splitSegments) {
const canApplyDelta = useDelta && splitSegments.length === 1;
segments.push({
lane: segment.lane,
update: {
text: segment.text,
...(canApplyDelta ? { delta: update.delta } : {}),
...(update.replace ? { replace: true } : {}),
},
});
}
return {
segments,
@@ -761,13 +747,13 @@ export const dispatchTelegramMessage = async ({
}
await rotateLaneForNewMessage(answerLane);
};
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
const updateDraftFromPartial = (lane: DraftLaneState, update: DraftPartialTextUpdate) => {
const laneStream = lane.stream;
if (!laneStream || !text) {
if (!laneStream || !update.text) {
return;
}
const previousText = lane === answerLane ? lastAnswerPartialText : lane.lastPartialText;
const nextText = resolveDraftPartialText(previousText, text);
const nextText = resolveDraftPartialText(previousText, update);
if (!nextText) {
return;
}
@@ -786,8 +772,11 @@ export const dispatchTelegramMessage = async ({
lane.lastPartialText = nextText;
laneStream.update(nextText);
};
const ingestDraftLaneSegments = async (text: string | undefined, isReasoning?: boolean) => {
const split = splitTextIntoLaneSegments(text, isReasoning);
const ingestDraftLaneSegments = async (
update: { text?: string; delta?: string; replace?: true },
isReasoning?: boolean,
) => {
const split = splitTextIntoLaneSegments(update, isReasoning);
for (const segment of split.segments) {
if (segment.lane === "answer") {
await prepareAnswerLaneForText();
@@ -796,7 +785,7 @@ export const dispatchTelegramMessage = async ({
reasoningStepState.noteReasoningHint();
reasoningStepState.noteReasoningDelivered();
}
updateDraftFromPartial(lanes[segment.lane], segment.text);
updateDraftFromPartial(lanes[segment.lane], segment.update);
}
};
const flushDraftLane = async (lane: DraftLaneState) => {
@@ -1207,7 +1196,10 @@ export const dispatchTelegramMessage = async ({
| { buttons?: TelegramInlineButtons }
| undefined
)?.buttons;
const split = splitTextIntoLaneSegments(payload.text, payload.isReasoning);
const split = splitTextIntoLaneSegments(
{ text: payload.text },
payload.isReasoning,
);
const segments = split.segments;
const reply = resolveSendableOutboundReplyParts(payload);
const _hasMedia = reply.hasMedia;
@@ -1241,7 +1233,7 @@ export const dispatchTelegramMessage = async ({
) {
reasoningStepState.bufferFinalAnswer({
payload,
text: segment.text,
text: segment.update.text,
bufferedGeneration: replyFenceGeneration,
});
continue;
@@ -1253,10 +1245,10 @@ export const dispatchTelegramMessage = async ({
streamMode === "progress" &&
segment.lane === "answer" &&
info.kind === "final"
? await deliverProgressModeFinalAnswer(payload, segment.text)
? await deliverProgressModeFinalAnswer(payload, segment.update.text)
: await deliverLaneText({
laneName: segment.lane,
text: segment.text,
text: segment.update.text,
payload,
infoKind: info.kind,
buttons: telegramButtons,
@@ -1351,7 +1343,7 @@ export const dispatchTelegramMessage = async ({
answerLane.stream || reasoningLane.stream
? (payload) =>
enqueueDraftLaneEvent(async () => {
await ingestDraftLaneSegments(payload.text);
await ingestDraftLaneSegments(payload);
})
: undefined,
onReasoningStream: reasoningLane.stream
@@ -1362,7 +1354,7 @@ export const dispatchTelegramMessage = async ({
resetDraftLaneState(reasoningLane);
splitReasoningOnNextStream = false;
}
await ingestDraftLaneSegments(payload.text, true);
await ingestDraftLaneSegments(payload, true);
})
: undefined,
onAssistantMessageStart: answerLane.stream

View File

@@ -1,6 +1,9 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { ImageContent } from "@mariozechner/pi-ai";
import type { SourceReplyDeliveryMode } from "../../../auto-reply/get-reply-options.types.js";
import type {
PartialReplyPayload,
SourceReplyDeliveryMode,
} from "../../../auto-reply/get-reply-options.types.js";
import type { ReplyPayload } from "../../../auto-reply/reply-payload.js";
import type { ReplyOperation } from "../../../auto-reply/reply/reply-run-registry.js";
import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../../../auto-reply/thinking.js";
@@ -151,7 +154,7 @@ export type RunEmbeddedPiAgentParams = {
replyOperation?: ReplyOperation;
shouldEmitToolResult?: () => boolean;
shouldEmitToolOutput?: () => boolean;
onPartialReply?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
onPartialReply?: (payload: PartialReplyPayload) => void | Promise<void>;
onAssistantMessageStart?: () => void | Promise<void>;
onBlockReply?: (payload: BlockReplyPayload) => void | Promise<void>;
onBlockReplyFlush?: () => void | Promise<void>;

View File

@@ -1,4 +1,5 @@
import type { AgentSession } from "@mariozechner/pi-coding-agent";
import type { PartialReplyPayload } from "../auto-reply/get-reply-options.types.js";
import type { ReplyPayload } from "../auto-reply/reply-payload.js";
import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../auto-reply/thinking.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
@@ -38,7 +39,7 @@ export type SubscribeEmbeddedPiSessionParams = {
onBlockReplyFlush?: () => void | Promise<void>;
blockReplyBreak?: "text_end" | "message_end";
blockReplyChunking?: BlockReplyChunking;
onPartialReply?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
onPartialReply?: (payload: PartialReplyPayload) => void | Promise<void>;
onAssistantMessageStart?: () => void | Promise<void>;
onAgentEvent?: (evt: {
stream: string;

View File

@@ -31,6 +31,11 @@ export type ReplyThreadingPolicy = {
export type SourceReplyDeliveryMode = "automatic" | "message_tool_only";
/**
 * Payload passed to `onPartialReply` while a reply is streaming.
 * `text` carries the full snapshot of streamed text so far; `delta`, when
 * present, is the newly appended suffix; `replace` marks a snapshot that
 * supersedes the previous partial text instead of extending it.
 */
export type PartialReplyPayload = Pick<ReplyPayload, "text" | "mediaUrls"> & {
  /** Newly appended suffix since the previous partial update. */
  delta?: string;
  /** When set, `text` replaces the prior partial text rather than extending it. */
  replace?: true;
};
export type GetReplyOptions = {
/** Override run id for agent events (defaults to random UUID). */
runId?: string;
@@ -72,7 +77,7 @@ export type GetReplyOptions = {
* channel to surface progress via its own streaming/edit UX.
*/
suppressDefaultToolProgressMessages?: boolean;
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
onPartialReply?: (payload: PartialReplyPayload) => Promise<void> | void;
onReasoningStream?: (payload: ReplyPayload) => Promise<void> | void;
/** Called when a thinking/reasoning block ends. */
onReasoningEnd?: () => Promise<void> | void;

View File

@@ -1,6 +1,7 @@
export type {
BlockReplyContext,
GetReplyOptions,
PartialReplyPayload,
ReplyThreadingPolicy,
TypingPolicy,
} from "./get-reply-options.types.js";