From 4b28312bd8ec25b358c52cfc02ff88fb0f6f09e5 Mon Sep 17 00:00:00 2001
From: Galin Iliev
Date: Tue, 12 May 2026 09:49:51 -0700
Subject: [PATCH] fix(azure): Drain split provider stream frames (#80927)

Merged via squash. Prepared head SHA: 03a7e1fec3886924d5685004c9cc8d91b2e0dabd

Co-authored-by: galiniliev <5711535+galiniliev@users.noreply.github.com>
Reviewed-by: @galiniliev
---
 CHANGELOG.md                                |  1 +
 extensions/memory-core/src/tools.shared.ts  | 14 +--
 extensions/memory-core/src/tools.test.ts    | 19 +++++
 src/agents/openai-transport-stream.test.ts  | 62 ++++++++++++++
 src/agents/openai-transport-stream.ts       | 70 ++++++++++++++-
 src/agents/provider-transport-fetch.test.ts | 95 +++++++++++++++++++++
 src/agents/provider-transport-fetch.ts      | 58 ++++++++-----
 7 files changed, 283 insertions(+), 36 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fb10aebfa2d..0a5d7ad7a99 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -35,6 +35,7 @@ Docs: https://docs.openclaw.ai
 - Control UI/sessions: nest subagent sessions under their parent session in the session picker dropdown using a visual `└─ ` prefix, making the parent-child relationship clear. Fixes #77628. (#78623) Thanks @chinar-amrutkar.
 - Auto-reply: surface a visible error when the configured model backend fails and fallback produces no visible reply, while preserving intentional silent turns and side-effect-only deliveries. (#80917) Thanks @dutifulbob.
 - Agents/exec: skip redundant heartbeat wake-ups for subagent session exec completions, preventing spurious LLM invocations on parent sessions. Fixes #66748. (#66749) Thanks @ggzeng.
+- Provider streams: keep OpenAI-compatible SSE and JSON fallback streams draining across split chunks and fail Azure Responses streams with a bounded first-event diagnostic instead of stalling. Refs #80926. (#80927) Thanks @galiniliev and @CaptainTimon.
 ### Changes

diff --git a/extensions/memory-core/src/tools.shared.ts b/extensions/memory-core/src/tools.shared.ts
index e70bd44fd6b..1c6b58ec159 100644
--- a/extensions/memory-core/src/tools.shared.ts
+++ b/extensions/memory-core/src/tools.shared.ts
@@ -1,3 +1,4 @@
+import { stringEnum } from "openclaw/plugin-sdk/channel-actions";
 import {
   listMemoryCorpusSupplements,
   resolveMemorySearchConfig,
@@ -31,23 +32,14 @@ export const MemorySearchSchema = Type.Object({
   query: Type.String(),
   maxResults: Type.Optional(Type.Number()),
   minScore: Type.Optional(Type.Number()),
-  corpus: Type.Optional(
-    Type.Union([
-      Type.Literal("memory"),
-      Type.Literal("wiki"),
-      Type.Literal("all"),
-      Type.Literal("sessions"),
-    ]),
-  ),
+  corpus: Type.Optional(stringEnum(["memory", "wiki", "all", "sessions"])),
 });

 export const MemoryGetSchema = Type.Object({
   path: Type.String(),
   from: Type.Optional(Type.Number()),
   lines: Type.Optional(Type.Number()),
-  corpus: Type.Optional(
-    Type.Union([Type.Literal("memory"), Type.Literal("wiki"), Type.Literal("all")]),
-  ),
+  corpus: Type.Optional(stringEnum(["memory", "wiki", "all"])),
 });

 function resolveMemoryToolContext(options: MemoryToolOptions) {
diff --git a/extensions/memory-core/src/tools.test.ts b/extensions/memory-core/src/tools.test.ts
index 397db74964a..c43bde9a1af 100644
--- a/extensions/memory-core/src/tools.test.ts
+++ b/extensions/memory-core/src/tools.test.ts
@@ -7,6 +7,7 @@ import {
   setMemorySearchImpl,
 } from "./memory-tool-manager-mock.js";
 import { createMemorySearchTool } from "./tools.js";
+import { MemoryGetSchema, MemorySearchSchema } from "./tools.shared.js";
 import {
   asOpenClawConfig,
   createMemorySearchToolOrThrow,
@@ -33,6 +34,24 @@ vi.mock("openclaw/plugin-sdk/session-transcript-hit", async (importOriginal) => {
   };
 });

+describe("memory tool schemas", () => {
+  it("uses flat corpus enums for provider tool compatibility", () => {
+    const searchCorpus = MemorySearchSchema.properties.corpus as {
+      anyOf?: unknown;
+      enum?: unknown;
+    };
+    const getCorpus = MemoryGetSchema.properties.corpus as {
+      anyOf?: unknown;
+      enum?: unknown;
+    };
+
+    expect(searchCorpus.anyOf).toBeUndefined();
+    expect(searchCorpus.enum).toEqual(["memory", "wiki", "all", "sessions"]);
+    expect(getCorpus.anyOf).toBeUndefined();
+    expect(getCorpus.enum).toEqual(["memory", "wiki", "all"]);
+  });
+});
+
 describe("memory_search unavailable payloads", () => {
   beforeEach(() => {
     resetMemoryToolMockState({ searchImpl: async () => [] });
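Review note (not part of the patch): a Type.Union of Type.Literal entries serializes to a JSON Schema `anyOf`, which some OpenAI-compatible providers reject or mishandle in tool parameter schemas; a flat `enum` keyword is the widely supported encoding, and the new test above pins exactly that shape. `stringEnum` itself lives in the plugin SDK and is not shown in this diff; a minimal TypeBox-style sketch of what it presumably does (helper name and typing here are assumptions, not the SDK's actual code):

    import { Type } from "@sinclair/typebox";

    // Sketch: emit { type: "string", enum: [...] } directly instead of an
    // anyOf of Type.Literal(...) entries, so provider tool schemas stay flat.
    function stringEnumSketch<const T extends readonly string[]>(values: T) {
      return Type.Unsafe<T[number]>({ type: "string", enum: [...values] });
    }

    // Usage: the schema accepts exactly one of the listed strings, typed as their union.
    const corpus = stringEnumSketch(["memory", "wiki", "all", "sessions"]);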
diff --git a/src/agents/openai-transport-stream.test.ts b/src/agents/openai-transport-stream.test.ts
index b05a6c5bfd7..3b9fc53c79e 100644
--- a/src/agents/openai-transport-stream.test.ts
+++ b/src/agents/openai-transport-stream.test.ts
@@ -22,6 +22,7 @@ import {
 import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js";

 type OpenAICompletionsOutput = Parameters<typeof __testing.processOpenAICompletionsStream>[1];
+type OpenAIResponsesOutput = Parameters<typeof __testing.processResponsesStream>[1];

 type CapturedStreamEvent = { type?: string; delta?: string };
@@ -60,6 +61,54 @@ function createAssistantOutput(model: Model<"openai-completions">): OpenAIComple
   };
 }

+function createResponsesAssistantOutput(
+  model: Model<"azure-openai-responses">,
+): OpenAIResponsesOutput {
+  return {
+    role: "assistant" as const,
+    content: [],
+    api: model.api,
+    provider: model.provider,
+    model: model.id,
+    usage: {
+      input: 0,
+      output: 0,
+      cacheRead: 0,
+      cacheWrite: 0,
+      totalTokens: 0,
+      cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
+    },
+    stopReason: "stop",
+    timestamp: Date.now(),
+  };
+}
+
+function createAzureResponsesModel(): Model<"azure-openai-responses"> {
+  return {
+    id: "gpt-5.4-pro",
+    name: "GPT-5.4 Pro",
+    api: "azure-openai-responses",
+    provider: "azure-openai-responses-devdiv",
+    baseUrl: "https://example.openai.azure.com/openai/responses",
+    reasoning: true,
+    input: ["text"],
+    cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
+    contextWindow: 200_000,
+    maxTokens: 8192,
+  };
+}
+
+function neverYieldsStream(): AsyncIterable<unknown> {
+  return {
+    [Symbol.asyncIterator]() {
+      return {
+        next: async () => await new Promise<IteratorResult<unknown>>(() => undefined),
+        return: async () => ({ done: true, value: undefined }),
+      };
+    },
+  };
+}
+
 async function* streamChunks(chunks: readonly unknown[]): AsyncGenerator {
   for (const chunk of chunks) {
     yield chunk as never;
@@ -78,6 +127,19 @@ function expectRecordFields(record: unknown, expected: Record<string, unknown>)
 }

 describe("openai transport stream", () => {
+  it("fails Azure Responses streams when headers arrive but no first event follows", async () => {
+    const model = createAzureResponsesModel();
+    await expect(
+      __testing.processResponsesStream(
+        neverYieldsStream(),
+        createResponsesAssistantOutput(model),
+        { push: vi.fn() },
+        model,
+        { firstEventTimeoutMs: 1 },
+      ),
+    ).rejects.toThrow(/did not deliver a first event within 1ms after HTTP streaming headers/);
+  });
+
   it("summarizes model payload tools with full names when requested", () => {
     const previous = process.env.OPENCLAW_DEBUG_MODEL_PAYLOAD;
     process.env.OPENCLAW_DEBUG_MODEL_PAYLOAD = "tools";
diff --git a/src/agents/openai-transport-stream.ts b/src/agents/openai-transport-stream.ts
index f9cbefa5527..393c3a81aa3 100644
--- a/src/agents/openai-transport-stream.ts
+++ b/src/agents/openai-transport-stream.ts
@@ -70,6 +70,7 @@ import { mergeTransportMetadata, sanitizeTransportPayloadText } from "./transpor
 const DEFAULT_AZURE_OPENAI_API_VERSION = "2024-12-01-preview";
 const OPENAI_CODEX_RESPONSES_EMPTY_INPUT_TEXT = " ";
 const GEMINI_THOUGHT_SIGNATURE_VALIDATOR_SKIP = "skip_thought_signature_validator";
+const AZURE_RESPONSES_FIRST_EVENT_TIMEOUT_MS = 30_000;
 const log = createSubsystemLogger("openai-transport");

 type ReplayableResponseOutputMessage = Omit<ResponseOutputMessage, "id"> & { id?: string };
@@ -649,6 +650,61 @@ function resolveOpenAIStrictToolFlagWithDiagnostics(
   return strict;
 }

+function createResponsesFirstEventTimeoutError(model: Model, timeoutMs: number): Error {
+  return new Error(
+    `Azure OpenAI Responses stream did not deliver a first event within ${timeoutMs}ms after HTTP streaming headers. ` +
+      `provider=${model.provider} model=${model.id}. ` +
+      "The provider may be stalled while parsing the tool payload; retry with a smaller tool surface or enable OPENCLAW_DEBUG_MODEL_PAYLOAD=tools to inspect exposed tools.",
+  );
+}
+
+function withResponsesFirstEventTimeout(
+  openaiStream: AsyncIterable<unknown>,
+  model: Model,
+  timeoutMs: number | undefined,
+): AsyncIterable<unknown> {
+  if (timeoutMs === undefined || timeoutMs <= 0 || !Number.isFinite(timeoutMs)) {
+    return openaiStream;
+  }
+  return {
+    async *[Symbol.asyncIterator]() {
+      const iterator = openaiStream[Symbol.asyncIterator]();
+      let timer: ReturnType<typeof setTimeout> | undefined;
+      const clear = () => {
+        if (timer) {
+          clearTimeout(timer);
+          timer = undefined;
+        }
+      };
+      try {
+        const first = await new Promise<IteratorResult<unknown>>((resolve, reject) => {
+          timer = setTimeout(
+            () => reject(createResponsesFirstEventTimeoutError(model, timeoutMs)),
+            timeoutMs,
+          );
+          iterator.next().then(resolve, reject);
+        }).finally(clear);
+        if (first.done) {
+          return;
+        }
+        yield first.value;
+        for (;;) {
+          const next = await iterator.next();
+          if (next.done) {
+            return;
+          }
+          yield next.value;
+        }
+      } catch (error) {
+        void iterator.return?.().catch(() => undefined);
+        throw error;
+      } finally {
+        clear();
+      }
+    },
+  };
+}
+
 async function processResponsesStream(
   openaiStream: AsyncIterable<unknown>,
   output: MutableAssistantOutput,
@@ -660,6 +716,7 @@
       usage: MutableAssistantOutput["usage"],
       serviceTier?: ResponseCreateParamsStreaming["service_tier"],
     ) => void;
+    firstEventTimeoutMs?: number;
   },
 ) {
   let currentItem: Record<string, unknown> | null = null;
@@ -669,7 +726,12 @@
   const eventTypes = new Map();
   const sseDebugMode = resolveModelSseDebugMode();
   const blockIndex = () => output.content.length - 1;
-  for await (const rawEvent of openaiStream) {
+  const guardedStream = withResponsesFirstEventTimeout(
+    openaiStream,
+    model,
+    options?.firstEventTimeoutMs,
+  );
+  for await (const rawEvent of guardedStream) {
     const event = rawEvent as Record<string, unknown>;
     const type = stringifyUnknown(event.type);
     eventCount += 1;
@@ -1421,7 +1483,9 @@ export function createAzureOpenAIResponsesTransportStreamFn(): StreamFn {
       `elapsedMs=${Date.now() - requestStartedAt}`,
     );
     stream.push({ type: "start", partial: output as never });
-    await processResponsesStream(responseStream, output, stream, model);
+    await processResponsesStream(responseStream, output, stream, model, {
+      firstEventTimeoutMs: AZURE_RESPONSES_FIRST_EVENT_TIMEOUT_MS,
+    });
     if (options?.signal?.aborted) {
       throw new Error("Request was aborted");
     }
@@ -2455,7 +2519,9 @@ export const __testing = {
   sanitizeOpenAICodexResponsesParams,
   buildOpenAICompletionsClientConfig,
   processOpenAICompletionsStream,
+  processResponsesStream,
   formatModelTransportDebugBaseUrl,
   summarizeResponsesPayload,
   summarizeResponsesTools,
+  withResponsesFirstEventTimeout,
 };
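Review note (not part of the patch): withResponsesFirstEventTimeout only races the first next() against the timer; once one event has arrived, the remaining events are awaited with no deadline, so slow-but-alive streams are unaffected. The same pattern, stripped to a standalone sketch with generic types rather than the patch's exact signatures:

    // Sketch: race the first iterator.next() against a timer, then pass the
    // rest of the stream through untouched.
    async function* firstEventTimeout<T>(
      source: AsyncIterable<T>,
      timeoutMs: number,
      makeError: () => Error,
    ): AsyncGenerator<T> {
      const iterator = source[Symbol.asyncIterator]();
      let timer: ReturnType<typeof setTimeout> | undefined;
      try {
        // Only the first event has a deadline; clear the timer as soon as it settles.
        const first = await new Promise<IteratorResult<T>>((resolve, reject) => {
          timer = setTimeout(() => reject(makeError()), timeoutMs);
          iterator.next().then(resolve, reject);
        }).finally(() => clearTimeout(timer));
        if (first.done) return;
        yield first.value;
        for (;;) {
          const next = await iterator.next(); // no deadline after the first event
          if (next.done) return;
          yield next.value;
        }
      } catch (error) {
        // On timeout or upstream failure, give the source a chance to clean up.
        void iterator.return?.().catch(() => undefined);
        throw error;
      }
    }

    // Usage: for await (const ev of firstEventTimeout(sse, 30_000, () => new Error("stalled"))) { ... }

Clearing the timer as soon as the race settles matters: a first event that arrives quickly must not leave a 30-second timer alive behind it.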
diff --git a/src/agents/provider-transport-fetch.test.ts b/src/agents/provider-transport-fetch.test.ts
index 2e182c1b329..7396b781af1 100644
--- a/src/agents/provider-transport-fetch.test.ts
+++ b/src/agents/provider-transport-fetch.test.ts
@@ -440,6 +440,53 @@ describe("buildGuardedModelFetch", () => {
     expect(items).toEqual([{ ok: true }]);
   });

+  it("continues reading until split SSE frames produce a parser-visible event", async () => {
+    const encoder = new TextEncoder();
+    let pulls = 0;
+    fetchWithSsrFGuardMock.mockResolvedValue({
+      response: new Response(
+        new ReadableStream({
+          pull(controller) {
+            pulls += 1;
+            if (pulls === 1) {
+              controller.enqueue(encoder.encode("event: response.created\n"));
+              return;
+            }
+            if (pulls === 2) {
+              controller.enqueue(encoder.encode('data: {"ok"'));
+              return;
+            }
+            if (pulls === 3) {
+              controller.enqueue(encoder.encode(": true}\n\n"));
+              return;
+            }
+            controller.close();
+          },
+        }),
+        { headers: { "content-type": "text/event-stream" } },
+      ),
+      finalUrl: "https://api.openai.com/v1/responses",
+      release: vi.fn(async () => undefined),
+    });
+    const model = {
+      id: "moonshotai/kimi-k2.6",
+      provider: "openrouter",
+      api: "openai-completions",
+      baseUrl: "https://openrouter.ai/api/v1",
+    } as unknown as Model<"openai-completions">;
+
+    const response = await buildGuardedModelFetch(model)(
+      "https://openrouter.ai/api/v1/chat/completions",
+      { method: "POST" },
+    );
+    const items = [];
+    for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
+      items.push(item);
+    }
+
+    expect(items).toEqual([{ ok: true }]);
+  });
+
   it("synthesizes SSE frames for JSON bodies returned to streaming OpenAI SDK requests", async () => {
     fetchWithSsrFGuardMock.mockResolvedValue({
       response: new Response(' {"ok": true} ', {
@@ -499,6 +546,54 @@ describe("buildGuardedModelFetch", () => {
     expect(response.headers.get("content-type")).toBe("application/json");
   });

+  it("continues reading split JSON bodies before synthesizing streaming SSE frames", async () => {
+    const encoder = new TextEncoder();
+    let pulls = 0;
+    fetchWithSsrFGuardMock.mockResolvedValue({
+      response: new Response(
+        new ReadableStream({
+          pull(controller) {
+            pulls += 1;
+            if (pulls === 1) {
+              controller.enqueue(encoder.encode('{"ok"'));
+              return;
+            }
+            if (pulls === 2) {
+              controller.enqueue(encoder.encode(": true}"));
+              return;
+            }
+            controller.close();
+          },
+        }),
+        { headers: { "content-type": "application/json; charset=utf-8" } },
+      ),
+      finalUrl: "https://openrouter.ai/api/v1/chat/completions",
+      release: vi.fn(async () => undefined),
+    });
+    const model = {
+      id: "moonshotai/kimi-k2.6",
+      provider: "openrouter",
+      api: "openai-completions",
+      baseUrl: "https://openrouter.ai/api/v1",
+    } as unknown as Model<"openai-completions">;
+
+    const response = await buildGuardedModelFetch(model)(
+      "https://openrouter.ai/api/v1/chat/completions",
+      {
+        method: "POST",
+        headers: { "content-type": "application/json" },
+        body: JSON.stringify({ model: "moonshotai/kimi-k2.6", stream: true }),
+      },
+    );
+    const items = [];
+    for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
+      items.push(item);
+    }
+
+    expect(response.headers.get("content-type")).toContain("text/event-stream");
+    expect(items).toEqual([{ ok: true }]);
+  });
+
   it("preserves JSON bodies when the request is not streaming", async () => {
     fetchWithSsrFGuardMock.mockResolvedValue({
       response: new Response('{"ok": true}', {
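Review note (not part of the patch): both tests above deliberately end a read() mid-frame. An SSE event only becomes dispatchable once a blank line terminates it, so 'data: {"ok"' alone can never become parser-visible no matter how long the consumer waits. The boundary scan the sanitizer's buffering relies on is not part of this diff (findSseEventBoundary is defined elsewhere); a plausible sketch of its contract, assumed rather than copied from the real implementation:

    // Sketch: an SSE event ends at the first empty line. Cover the common line
    // terminators; anything after the last boundary stays buffered for later reads.
    function findSseEventBoundarySketch(buffer: string): { index: number; length: number } | null {
      for (let i = 0; i < buffer.length; i += 1) {
        if (buffer.startsWith("\r\n\r\n", i)) return { index: i, length: 4 };
        if (buffer.startsWith("\n\n", i)) return { index: i, length: 2 };
        if (buffer.startsWith("\r\r", i)) return { index: i, length: 2 };
      }
      return null;
    }

    // 'data: {"ok"' → null (keep buffering); 'data: {"ok": true}\n\n' → { index: 18, length: 2 }.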
diff --git a/src/agents/provider-transport-fetch.ts b/src/agents/provider-transport-fetch.ts
index bbf38be7c55..e40adf12dab 100644
--- a/src/agents/provider-transport-fetch.ts
+++ b/src/agents/provider-transport-fetch.ts
@@ -77,18 +77,20 @@ function sanitizeOpenAISdkSseResponse(
     },
     async pull(controller) {
       try {
-        const chunk = await reader?.read();
-        if (!chunk || chunk.done) {
-          buffer += decoder.decode();
-          const data = buffer.trim();
-          if (data) {
-            controller.enqueue(encoder.encode(`data: ${data}\n\n`));
+        for (;;) {
+          const chunk = await reader?.read();
+          if (!chunk || chunk.done) {
+            buffer += decoder.decode();
+            const data = buffer.trim();
+            if (data) {
+              controller.enqueue(encoder.encode(`data: ${data}\n\n`));
+            }
+            controller.enqueue(encoder.encode("data: [DONE]\n\n"));
+            controller.close();
+            return;
           }
-          controller.enqueue(encoder.encode("data: [DONE]\n\n"));
-          controller.close();
-          return;
+          buffer += decoder.decode(chunk.value, { stream: true });
         }
-        buffer += decoder.decode(chunk.value, { stream: true });
       } catch (error) {
         controller.error(error);
       }
@@ -118,12 +120,13 @@ function sanitizeOpenAISdkSseResponse(
   const enqueueSanitized = (
     controller: ReadableStreamDefaultController<Uint8Array>,
     text: string,
-  ) => {
+  ): number => {
+    let enqueued = 0;
     buffer += text;
     for (;;) {
       const boundary = findSseEventBoundary(buffer);
       if (!boundary) {
-        return;
+        return enqueued;
       }
       const block = buffer.slice(0, boundary.index);
       const separator = buffer.slice(boundary.index, boundary.index + boundary.length);
@@ -132,6 +135,7 @@ function sanitizeOpenAISdkSseResponse(
       // messages. Drop those malformed keepalive-style blocks before it parses.
       if (hasReadableSseData(block)) {
         controller.enqueue(encoder.encode(`${block}${separator}`));
+        enqueued += 1;
       }
     }
   };
@@ -142,20 +146,28 @@ function sanitizeOpenAISdkSseResponse(
     },
     async pull(controller) {
       try {
-        const chunk = await reader?.read();
-        if (!chunk || chunk.done) {
-          const tail = decoder.decode();
-          if (tail) {
-            enqueueSanitized(controller, tail);
+        for (;;) {
+          const chunk = await reader?.read();
+          if (!chunk || chunk.done) {
+            const tail = decoder.decode();
+            if (tail) {
+              enqueueSanitized(controller, tail);
+            }
+            if (buffer && hasReadableSseData(buffer)) {
+              controller.enqueue(encoder.encode(buffer));
+            }
+            buffer = "";
+            controller.close();
+            return;
           }
-          if (buffer && hasReadableSseData(buffer)) {
-            controller.enqueue(encoder.encode(buffer));
+          const enqueued = enqueueSanitized(
+            controller,
+            decoder.decode(chunk.value, { stream: true }),
+          );
+          if (enqueued > 0) {
+            return;
           }
-          buffer = "";
-          controller.close();
-          return;
         }
-        enqueueSanitized(controller, decoder.decode(chunk.value, { stream: true }));
       } catch (error) {
         controller.error(error);
       }
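Review note (not part of the patch): the shared fix in both pull handlers is a contract change: keep reading upstream until at least one frame has been enqueued or the source ends, instead of returning after a single read() that may only have grown the buffer. Restated as a standalone sketch, simplified from and not identical to the patch's code:

    // Sketch: build a pull handler that guarantees visible progress per pull.
    // splitFrames returns the complete frames found so far plus the unfinished tail.
    function makeDrainingPull(
      reader: ReadableStreamDefaultReader<Uint8Array>,
      splitFrames: (buffered: string) => { frames: string[]; rest: string },
    ) {
      const decoder = new TextDecoder();
      const encoder = new TextEncoder();
      let buffer = "";
      return async (controller: ReadableStreamDefaultController<Uint8Array>) => {
        for (;;) {
          const chunk = await reader.read();
          if (chunk.done) {
            buffer += decoder.decode(); // flush any dangling multi-byte sequence
            if (buffer.trim()) controller.enqueue(encoder.encode(buffer));
            controller.close();
            return;
          }
          buffer += decoder.decode(chunk.value, { stream: true });
          const { frames, rest } = splitFrames(buffer);
          buffer = rest;
          for (const frame of frames) controller.enqueue(encoder.encode(frame));
          if (frames.length > 0) return; // progress made; yield until the next pull
        }
      };
    }

The JSON fallback path gets the same guarantee from its loop: it only stops buffering at end of input, at which point the whole body is emitted as a single data: frame followed by data: [DONE].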