From 1367ec74616ae9f2c836ba874ba79b578799bd5a Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sun, 10 May 2026 08:02:48 +0530 Subject: [PATCH] fix(telegram): use partial stream deltas --- .../telegram/src/bot-message-dispatch.test.ts | 49 +++++++- .../telegram/src/bot-message-dispatch.ts | 106 ++++++++---------- src/agents/pi-embedded-runner/run/params.ts | 7 +- src/agents/pi-embedded-subscribe.types.ts | 3 +- src/auto-reply/get-reply-options.types.ts | 7 +- src/auto-reply/types.ts | 1 + 6 files changed, 106 insertions(+), 67 deletions(-) diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index d4cd4f1ef50..2fd2f01b7cd 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -991,14 +991,21 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(editMessageTelegram).not.toHaveBeenCalled(); }); - it("coalesces delta-shaped partial fragments while preserving the first-preview debounce", async () => { + it("applies partial deltas while preserving the first-preview debounce", async () => { const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Streaming " }); - await replyOptions?.onPartialReply?.({ text: "previews " }); await replyOptions?.onPartialReply?.({ - text: "are useful because they show progress.", + text: "Streaming ", + delta: "Streaming ", + }); + await replyOptions?.onPartialReply?.({ + text: "Streaming previews ", + delta: "previews ", + }); + await replyOptions?.onPartialReply?.({ + text: "Streaming previews are useful because they show progress.", + delta: "are useful because they show progress.", }); await dispatcherOptions.deliver( { text: "Streaming previews are useful because they show progress." }, @@ -1032,13 +1039,43 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(deliverReplies).not.toHaveBeenCalled(); }); + it("replaces non-prefix partial snapshots instead of appending them", async () => { + const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ + text: "Working...", + delta: "Working...", + }); + await replyOptions?.onPartialReply?.({ + text: "Done.", + delta: "", + replace: true, + }); + await dispatcherOptions.deliver({ text: "Done." }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + streamMode: "partial", + telegramCfg: { streaming: { mode: "partial" } }, + }); + + expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Working..."); + expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Done."); + expect(answerDraftStream.update).toHaveBeenLastCalledWith("Done."); + expect(deliverReplies).not.toHaveBeenCalled(); + }); + it("does not coalesce answer partial fragments with tool progress drafts", async () => { const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { await replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); - await replyOptions?.onPartialReply?.({ text: "Done " }); - await replyOptions?.onPartialReply?.({ text: "answer" }); + await replyOptions?.onPartialReply?.({ text: "Done ", delta: "Done " }); + await replyOptions?.onPartialReply?.({ text: "Done answer", delta: "answer" }); await dispatcherOptions.deliver({ text: "Done answer." }, { kind: "final" }); return { queuedFinal: true }; }, diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 0f5609c8432..f7f07f7b349 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -110,49 +110,22 @@ const silentReplyDispatchLogger = createSubsystemLogger("telegram/silent-reply-d /** Minimum chars before sending first streaming message (improves push notification UX) */ const DRAFT_MIN_INITIAL_CHARS = 30; -function appendWithOverlap(previous: string, fragment: string): string { - const maxOverlap = Math.min(previous.length, fragment.length); - for (let overlap = maxOverlap; overlap > 0; overlap -= 1) { - if (previous.endsWith(fragment.slice(0, overlap))) { - return `${previous}${fragment.slice(overlap)}`; - } - } - return `${previous}${fragment}`; -} +type DraftPartialTextUpdate = { + text: string; + delta?: string; + replace?: true; +}; -function looksLikeDraftDeltaFragment(previous: string, text: string): boolean { - if (!previous || !text) { - return false; - } - if (text.startsWith(previous) || previous.startsWith(text)) { - return false; - } - if (/^\s/.test(text)) { - return true; - } - if (previous.length < DRAFT_MIN_INITIAL_CHARS) { - return true; - } - if (/\s$/.test(previous) && text.length <= DRAFT_MIN_INITIAL_CHARS) { - return true; - } - return text.length <= Math.max(16, Math.floor(previous.length / 2)); -} - -function resolveDraftPartialText(previous: string, text: string): string | undefined { - if (!previous) { - return text; - } - if (text === previous) { +function resolveDraftPartialText( + previous: string, + update: DraftPartialTextUpdate, +): string | undefined { + const nextText = + update.replace || update.delta === undefined ? update.text : `${previous}${update.delta}`; + if (nextText === previous) { return undefined; } - if (text.startsWith(previous)) { - return text; - } - if (previous.startsWith(text) && text.length < previous.length) { - return undefined; - } - return looksLikeDraftDeltaFragment(previous, text) ? appendWithOverlap(previous, text) : text; + return nextText; } async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) { @@ -714,23 +687,36 @@ export const dispatchTelegramMessage = async ({ }); return draftLaneEventQueue; }; - type SplitLaneSegment = { lane: LaneName; text: string }; + type SplitLaneSegment = { lane: LaneName; update: DraftPartialTextUpdate }; type SplitLaneSegmentsResult = { segments: SplitLaneSegment[]; suppressedReasoningOnly: boolean; }; const splitTextIntoLaneSegments = ( - text?: string, + update: { text?: string; delta?: string; replace?: true }, isReasoning?: boolean, ): SplitLaneSegmentsResult => { - const split = splitTelegramReasoningText(text, isReasoning); + const split = splitTelegramReasoningText(update.text, isReasoning); + const splitSegments: Array<{ lane: LaneName; text: string }> = []; + const useDelta = !update.replace && update.delta !== undefined; const segments: SplitLaneSegment[] = []; const suppressReasoning = resolvedReasoningLevel === "off"; if (split.reasoningText && !suppressReasoning) { - segments.push({ lane: "reasoning", text: split.reasoningText }); + splitSegments.push({ lane: "reasoning", text: split.reasoningText }); } if (split.answerText) { - segments.push({ lane: "answer", text: split.answerText }); + splitSegments.push({ lane: "answer", text: split.answerText }); + } + for (const segment of splitSegments) { + const canApplyDelta = useDelta && splitSegments.length === 1; + segments.push({ + lane: segment.lane, + update: { + text: segment.text, + ...(canApplyDelta ? { delta: update.delta } : {}), + ...(update.replace ? { replace: true } : {}), + }, + }); } return { segments, @@ -761,13 +747,13 @@ export const dispatchTelegramMessage = async ({ } await rotateLaneForNewMessage(answerLane); }; - const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => { + const updateDraftFromPartial = (lane: DraftLaneState, update: DraftPartialTextUpdate) => { const laneStream = lane.stream; - if (!laneStream || !text) { + if (!laneStream || !update.text) { return; } const previousText = lane === answerLane ? lastAnswerPartialText : lane.lastPartialText; - const nextText = resolveDraftPartialText(previousText, text); + const nextText = resolveDraftPartialText(previousText, update); if (!nextText) { return; } @@ -786,8 +772,11 @@ export const dispatchTelegramMessage = async ({ lane.lastPartialText = nextText; laneStream.update(nextText); }; - const ingestDraftLaneSegments = async (text: string | undefined, isReasoning?: boolean) => { - const split = splitTextIntoLaneSegments(text, isReasoning); + const ingestDraftLaneSegments = async ( + update: { text?: string; delta?: string; replace?: true }, + isReasoning?: boolean, + ) => { + const split = splitTextIntoLaneSegments(update, isReasoning); for (const segment of split.segments) { if (segment.lane === "answer") { await prepareAnswerLaneForText(); @@ -796,7 +785,7 @@ export const dispatchTelegramMessage = async ({ reasoningStepState.noteReasoningHint(); reasoningStepState.noteReasoningDelivered(); } - updateDraftFromPartial(lanes[segment.lane], segment.text); + updateDraftFromPartial(lanes[segment.lane], segment.update); } }; const flushDraftLane = async (lane: DraftLaneState) => { @@ -1207,7 +1196,10 @@ export const dispatchTelegramMessage = async ({ | { buttons?: TelegramInlineButtons } | undefined )?.buttons; - const split = splitTextIntoLaneSegments(payload.text, payload.isReasoning); + const split = splitTextIntoLaneSegments( + { text: payload.text }, + payload.isReasoning, + ); const segments = split.segments; const reply = resolveSendableOutboundReplyParts(payload); const _hasMedia = reply.hasMedia; @@ -1241,7 +1233,7 @@ export const dispatchTelegramMessage = async ({ ) { reasoningStepState.bufferFinalAnswer({ payload, - text: segment.text, + text: segment.update.text, bufferedGeneration: replyFenceGeneration, }); continue; @@ -1253,10 +1245,10 @@ export const dispatchTelegramMessage = async ({ streamMode === "progress" && segment.lane === "answer" && info.kind === "final" - ? await deliverProgressModeFinalAnswer(payload, segment.text) + ? await deliverProgressModeFinalAnswer(payload, segment.update.text) : await deliverLaneText({ laneName: segment.lane, - text: segment.text, + text: segment.update.text, payload, infoKind: info.kind, buttons: telegramButtons, @@ -1351,7 +1343,7 @@ export const dispatchTelegramMessage = async ({ answerLane.stream || reasoningLane.stream ? (payload) => enqueueDraftLaneEvent(async () => { - await ingestDraftLaneSegments(payload.text); + await ingestDraftLaneSegments(payload); }) : undefined, onReasoningStream: reasoningLane.stream @@ -1362,7 +1354,7 @@ export const dispatchTelegramMessage = async ({ resetDraftLaneState(reasoningLane); splitReasoningOnNextStream = false; } - await ingestDraftLaneSegments(payload.text, true); + await ingestDraftLaneSegments(payload, true); }) : undefined, onAssistantMessageStart: answerLane.stream diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index dd1517f35ed..ed7a9dc5e30 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -1,6 +1,9 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { ImageContent } from "@mariozechner/pi-ai"; -import type { SourceReplyDeliveryMode } from "../../../auto-reply/get-reply-options.types.js"; +import type { + PartialReplyPayload, + SourceReplyDeliveryMode, +} from "../../../auto-reply/get-reply-options.types.js"; import type { ReplyPayload } from "../../../auto-reply/reply-payload.js"; import type { ReplyOperation } from "../../../auto-reply/reply/reply-run-registry.js"; import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../../../auto-reply/thinking.js"; @@ -151,7 +154,7 @@ export type RunEmbeddedPiAgentParams = { replyOperation?: ReplyOperation; shouldEmitToolResult?: () => boolean; shouldEmitToolOutput?: () => boolean; - onPartialReply?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; + onPartialReply?: (payload: PartialReplyPayload) => void | Promise; onAssistantMessageStart?: () => void | Promise; onBlockReply?: (payload: BlockReplyPayload) => void | Promise; onBlockReplyFlush?: () => void | Promise; diff --git a/src/agents/pi-embedded-subscribe.types.ts b/src/agents/pi-embedded-subscribe.types.ts index e6899461878..3abb563fda7 100644 --- a/src/agents/pi-embedded-subscribe.types.ts +++ b/src/agents/pi-embedded-subscribe.types.ts @@ -1,4 +1,5 @@ import type { AgentSession } from "@mariozechner/pi-coding-agent"; +import type { PartialReplyPayload } from "../auto-reply/get-reply-options.types.js"; import type { ReplyPayload } from "../auto-reply/reply-payload.js"; import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../auto-reply/thinking.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; @@ -38,7 +39,7 @@ export type SubscribeEmbeddedPiSessionParams = { onBlockReplyFlush?: () => void | Promise; blockReplyBreak?: "text_end" | "message_end"; blockReplyChunking?: BlockReplyChunking; - onPartialReply?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; + onPartialReply?: (payload: PartialReplyPayload) => void | Promise; onAssistantMessageStart?: () => void | Promise; onAgentEvent?: (evt: { stream: string; diff --git a/src/auto-reply/get-reply-options.types.ts b/src/auto-reply/get-reply-options.types.ts index 625d70f6504..8471e6b2a96 100644 --- a/src/auto-reply/get-reply-options.types.ts +++ b/src/auto-reply/get-reply-options.types.ts @@ -31,6 +31,11 @@ export type ReplyThreadingPolicy = { export type SourceReplyDeliveryMode = "automatic" | "message_tool_only"; +export type PartialReplyPayload = Pick & { + delta?: string; + replace?: true; +}; + export type GetReplyOptions = { /** Override run id for agent events (defaults to random UUID). */ runId?: string; @@ -72,7 +77,7 @@ export type GetReplyOptions = { * channel to surface progress via its own streaming/edit UX. */ suppressDefaultToolProgressMessages?: boolean; - onPartialReply?: (payload: ReplyPayload) => Promise | void; + onPartialReply?: (payload: PartialReplyPayload) => Promise | void; onReasoningStream?: (payload: ReplyPayload) => Promise | void; /** Called when a thinking/reasoning block ends. */ onReasoningEnd?: () => Promise | void; diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 711e7b5d50f..00505f153a1 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -1,6 +1,7 @@ export type { BlockReplyContext, GetReplyOptions, + PartialReplyPayload, ReplyThreadingPolicy, TypingPolicy, } from "./get-reply-options.types.js";