fix(azure): Drain split provider stream frames (#80927)

Merged via squash.

Prepared head SHA: 03a7e1fec3
Co-authored-by: galiniliev <5711535+galiniliev@users.noreply.github.com>
Reviewed-by: @galiniliev
Author: Galin Iliev
Date: 2026-05-12 09:49:51 -07:00
Committed-by: GitHub
parent 8d823a98c0
commit 4b28312bd8
7 changed files with 283 additions and 36 deletions


@@ -35,6 +35,7 @@ Docs: https://docs.openclaw.ai
- Control UI/sessions: nest subagent sessions under their parent session in the session picker dropdown using a visual `└─ ` prefix, making the parent-child relationship clear. Fixes #77628. (#78623) Thanks @chinar-amrutkar.
- Auto-reply: surface a visible error when the configured model backend fails and fallback produces no visible reply, while preserving intentional silent turns and side-effect-only deliveries. (#80917) Thanks @dutifulbob.
- Agents/exec: skip redundant heartbeat wake-ups for subagent session exec completions, preventing spurious LLM invocations on parent sessions. Fixes #66748. (#66749) Thanks @ggzeng.
- Provider streams: keep OpenAI-compatible SSE and JSON fallback streams draining across split chunks and fail Azure Responses streams with a bounded first-event diagnostic instead of stalling. Refs #80926. (#80927) Thanks @galiniliev and @CaptainTimon.
### Changes

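For context on the "split chunks" wording in the entry above: an SSE event only becomes visible to the parser once its terminating blank line arrives, so a single network read can end mid-frame. A minimal illustration follows; the chunk contents mirror the tests added later in this commit, and the variable names are hypothetical:

// Hypothetical illustration: one SSE event arriving split across three reads.
// No individual chunk contains a parser-visible event; only their concatenation does.
const chunks = [
  "event: response.created\n", // field line, but no terminating blank line yet
  'data: {"ok"',               // partial data line
  ': true}\n\n',               // rest of the data line plus the "\n\n" frame boundary
];
const frame = chunks.join(""); // 'event: response.created\ndata: {"ok": true}\n\n' (now complete)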

@@ -1,3 +1,4 @@
import { stringEnum } from "openclaw/plugin-sdk/channel-actions";
import {
listMemoryCorpusSupplements,
resolveMemorySearchConfig,
@@ -31,23 +32,14 @@ export const MemorySearchSchema = Type.Object({
query: Type.String(),
maxResults: Type.Optional(Type.Number()),
minScore: Type.Optional(Type.Number()),
corpus: Type.Optional(
Type.Union([
Type.Literal("memory"),
Type.Literal("wiki"),
Type.Literal("all"),
Type.Literal("sessions"),
]),
),
corpus: Type.Optional(stringEnum(["memory", "wiki", "all", "sessions"])),
});
export const MemoryGetSchema = Type.Object({
path: Type.String(),
from: Type.Optional(Type.Number()),
lines: Type.Optional(Type.Number()),
corpus: Type.Optional(
Type.Union([Type.Literal("memory"), Type.Literal("wiki"), Type.Literal("all")]),
),
corpus: Type.Optional(stringEnum(["memory", "wiki", "all"])),
});
function resolveMemoryToolContext(options: MemoryToolOptions) {


@@ -7,6 +7,7 @@ import {
setMemorySearchImpl,
} from "./memory-tool-manager-mock.js";
import { createMemorySearchTool } from "./tools.js";
import { MemoryGetSchema, MemorySearchSchema } from "./tools.shared.js";
import {
asOpenClawConfig,
createMemorySearchToolOrThrow,
@@ -33,6 +34,24 @@ vi.mock("openclaw/plugin-sdk/session-transcript-hit", async (importOriginal) =>
};
});
describe("memory tool schemas", () => {
it("uses flat corpus enums for provider tool compatibility", () => {
const searchCorpus = MemorySearchSchema.properties.corpus as {
anyOf?: unknown;
enum?: unknown;
};
const getCorpus = MemoryGetSchema.properties.corpus as {
anyOf?: unknown;
enum?: unknown;
};
expect(searchCorpus.anyOf).toBeUndefined();
expect(searchCorpus.enum).toEqual(["memory", "wiki", "all", "sessions"]);
expect(getCorpus.anyOf).toBeUndefined();
expect(getCorpus.enum).toEqual(["memory", "wiki", "all"]);
});
});
describe("memory_search unavailable payloads", () => {
beforeEach(() => {
resetMemoryToolMockState({ searchImpl: async () => [] });

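The schema change above (and the test covering it) trades nested literal unions for a flat string enum, so provider tool schemas avoid `anyOf`. A rough sketch of the two JSON Schema shapes involved; the exact TypeBox output may differ in detail, and `stringEnum` is the project helper imported in the diff:

// Approximate shape from the old Type.Union([Type.Literal("memory"), ...]) form:
const unionShape = {
  anyOf: [{ const: "memory" }, { const: "wiki" }, { const: "all" }, { const: "sessions" }],
};
// Shape the test above asserts for stringEnum([...]): a flat enum with no anyOf.
const flatShape = {
  type: "string",
  enum: ["memory", "wiki", "all", "sessions"],
};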

@@ -22,6 +22,7 @@ import {
import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js";
type OpenAICompletionsOutput = Parameters<typeof __testing.processOpenAICompletionsStream>[1];
type OpenAIResponsesOutput = Parameters<typeof __testing.processResponsesStream>[1];
type CapturedStreamEvent = { type?: string; delta?: string };
@@ -60,6 +61,54 @@ function createAssistantOutput(model: Model<"openai-completions">): OpenAIComple
};
}
function createResponsesAssistantOutput(
model: Model<"azure-openai-responses">,
): OpenAIResponsesOutput {
return {
role: "assistant" as const,
content: [],
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "stop",
timestamp: Date.now(),
};
}
function createAzureResponsesModel(): Model<"azure-openai-responses"> {
return {
id: "gpt-5.4-pro",
name: "GPT-5.4 Pro",
api: "azure-openai-responses",
provider: "azure-openai-responses-devdiv",
baseUrl: "https://example.openai.azure.com/openai/responses",
reasoning: true,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 200_000,
maxTokens: 8192,
};
}
function neverYieldsStream(): AsyncIterable<unknown> {
return {
[Symbol.asyncIterator]() {
return {
next: async () => await new Promise<IteratorResult<unknown>>(() => undefined),
return: async () => ({ done: true, value: undefined }),
};
},
};
}
async function* streamChunks(chunks: readonly unknown[]): AsyncGenerator<never> {
for (const chunk of chunks) {
yield chunk as never;
@@ -78,6 +127,19 @@ function expectRecordFields(record: unknown, expected: Record<string, unknown>)
}
describe("openai transport stream", () => {
it("fails Azure Responses streams when headers arrive but no first event follows", async () => {
const model = createAzureResponsesModel();
await expect(
__testing.processResponsesStream(
neverYieldsStream(),
createResponsesAssistantOutput(model),
{ push: vi.fn() },
model,
{ firstEventTimeoutMs: 1 },
),
).rejects.toThrow(/did not deliver a first event within 1ms after HTTP streaming headers/);
});
it("summarizes model payload tools with full names when requested", () => {
const previous = process.env.OPENCLAW_DEBUG_MODEL_PAYLOAD;
process.env.OPENCLAW_DEBUG_MODEL_PAYLOAD = "tools";


@@ -70,6 +70,7 @@ import { mergeTransportMetadata, sanitizeTransportPayloadText } from "./transpor
const DEFAULT_AZURE_OPENAI_API_VERSION = "2024-12-01-preview";
const OPENAI_CODEX_RESPONSES_EMPTY_INPUT_TEXT = " ";
const GEMINI_THOUGHT_SIGNATURE_VALIDATOR_SKIP = "skip_thought_signature_validator";
const AZURE_RESPONSES_FIRST_EVENT_TIMEOUT_MS = 30_000;
const log = createSubsystemLogger("openai-transport");
type ReplayableResponseOutputMessage = Omit<ResponseOutputMessage, "id"> & { id?: string };
@@ -649,6 +650,61 @@ function resolveOpenAIStrictToolFlagWithDiagnostics(
return strict;
}
function createResponsesFirstEventTimeoutError(model: Model<Api>, timeoutMs: number): Error {
return new Error(
`Azure OpenAI Responses stream did not deliver a first event within ${timeoutMs}ms after HTTP streaming headers. ` +
`provider=${model.provider} model=${model.id}. ` +
"The provider may be stalled while parsing the tool payload; retry with a smaller tool surface or enable OPENCLAW_DEBUG_MODEL_PAYLOAD=tools to inspect exposed tools.",
);
}
function withResponsesFirstEventTimeout(
openaiStream: AsyncIterable<unknown>,
model: Model<Api>,
timeoutMs: number | undefined,
): AsyncIterable<unknown> {
if (timeoutMs === undefined || timeoutMs <= 0 || !Number.isFinite(timeoutMs)) {
return openaiStream;
}
return {
async *[Symbol.asyncIterator]() {
const iterator = openaiStream[Symbol.asyncIterator]();
let timer: ReturnType<typeof setTimeout> | undefined;
const clear = () => {
if (timer) {
clearTimeout(timer);
timer = undefined;
}
};
try {
const first = await new Promise<IteratorResult<unknown>>((resolve, reject) => {
timer = setTimeout(
() => reject(createResponsesFirstEventTimeoutError(model, timeoutMs)),
timeoutMs,
);
iterator.next().then(resolve, reject);
}).finally(clear);
if (first.done) {
return;
}
yield first.value;
for (;;) {
const next = await iterator.next();
if (next.done) {
return;
}
yield next.value;
}
} catch (error) {
void iterator.return?.().catch(() => undefined);
throw error;
} finally {
clear();
}
},
};
}
async function processResponsesStream(
openaiStream: AsyncIterable<unknown>,
output: MutableAssistantOutput,
@@ -660,6 +716,7 @@ async function processResponsesStream(
usage: MutableAssistantOutput["usage"],
serviceTier?: ResponseCreateParamsStreaming["service_tier"],
) => void;
firstEventTimeoutMs?: number;
},
) {
let currentItem: Record<string, unknown> | null = null;
@@ -669,7 +726,12 @@ async function processResponsesStream(
const eventTypes = new Map<string, number>();
const sseDebugMode = resolveModelSseDebugMode();
const blockIndex = () => output.content.length - 1;
for await (const rawEvent of openaiStream) {
const guardedStream = withResponsesFirstEventTimeout(
openaiStream,
model,
options?.firstEventTimeoutMs,
);
for await (const rawEvent of guardedStream) {
const event = rawEvent as Record<string, unknown>;
const type = stringifyUnknown(event.type);
eventCount += 1;
@@ -1421,7 +1483,9 @@ export function createAzureOpenAIResponsesTransportStreamFn(): StreamFn {
`elapsedMs=${Date.now() - requestStartedAt}`,
);
stream.push({ type: "start", partial: output as never });
await processResponsesStream(responseStream, output, stream, model);
await processResponsesStream(responseStream, output, stream, model, {
firstEventTimeoutMs: AZURE_RESPONSES_FIRST_EVENT_TIMEOUT_MS,
});
if (options?.signal?.aborted) {
throw new Error("Request was aborted");
}
@@ -2455,7 +2519,9 @@ export const __testing = {
sanitizeOpenAICodexResponsesParams,
buildOpenAICompletionsClientConfig,
processOpenAICompletionsStream,
processResponsesStream,
formatModelTransportDebugBaseUrl,
summarizeResponsesPayload,
summarizeResponsesTools,
withResponsesFirstEventTimeout,
};

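A rough usage sketch of the wrapper above, for intuition. The real call site threads the timeout through processResponsesStream's options in the createAzureOpenAIResponsesTransportStreamFn hunk; the consuming loop and function name here are illustrative only:

// Illustrative only: the timer guards the first event; once it arrives, the rest of
// the stream is consumed without a timer, and a stalled stream rejects with the bounded
// diagnostic from createResponsesFirstEventTimeoutError instead of hanging.
async function consumeGuarded(events: AsyncIterable<unknown>, model: Model<Api>) {
  const guarded = withResponsesFirstEventTimeout(events, model, AZURE_RESPONSES_FIRST_EVENT_TIMEOUT_MS);
  for await (const event of guarded) {
    void event; // placeholder for normal Responses event handling
  }
}

Guarding only the first next() call keeps steady-state streaming free of timer overhead while still catching the "headers arrived, then nothing" stall the diagnostic describes.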

@@ -440,6 +440,53 @@ describe("buildGuardedModelFetch", () => {
expect(items).toEqual([{ ok: true }]);
});
it("continues reading until split SSE frames produce a parser-visible event", async () => {
const encoder = new TextEncoder();
let pulls = 0;
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response(
new ReadableStream({
pull(controller) {
pulls += 1;
if (pulls === 1) {
controller.enqueue(encoder.encode("event: response.created\n"));
return;
}
if (pulls === 2) {
controller.enqueue(encoder.encode('data: {"ok"'));
return;
}
if (pulls === 3) {
controller.enqueue(encoder.encode(": true}\n\n"));
return;
}
controller.close();
},
}),
{ headers: { "content-type": "text/event-stream" } },
),
finalUrl: "https://api.openai.com/v1/responses",
release: vi.fn(async () => undefined),
});
const model = {
id: "moonshotai/kimi-k2.6",
provider: "openrouter",
api: "openai-completions",
baseUrl: "https://openrouter.ai/api/v1",
} as unknown as Model<"openai-completions">;
const response = await buildGuardedModelFetch(model)(
"https://openrouter.ai/api/v1/chat/completions",
{ method: "POST" },
);
const items = [];
for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
items.push(item);
}
expect(items).toEqual([{ ok: true }]);
});
it("synthesizes SSE frames for JSON bodies returned to streaming OpenAI SDK requests", async () => {
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response(' {"ok": true} ', {
@@ -499,6 +546,54 @@ describe("buildGuardedModelFetch", () => {
expect(response.headers.get("content-type")).toBe("application/json");
});
it("continues reading split JSON bodies before synthesizing streaming SSE frames", async () => {
const encoder = new TextEncoder();
let pulls = 0;
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response(
new ReadableStream({
pull(controller) {
pulls += 1;
if (pulls === 1) {
controller.enqueue(encoder.encode('{"ok"'));
return;
}
if (pulls === 2) {
controller.enqueue(encoder.encode(": true}"));
return;
}
controller.close();
},
}),
{ headers: { "content-type": "application/json; charset=utf-8" } },
),
finalUrl: "https://openrouter.ai/api/v1/chat/completions",
release: vi.fn(async () => undefined),
});
const model = {
id: "moonshotai/kimi-k2.6",
provider: "openrouter",
api: "openai-completions",
baseUrl: "https://openrouter.ai/api/v1",
} as unknown as Model<"openai-completions">;
const response = await buildGuardedModelFetch(model)(
"https://openrouter.ai/api/v1/chat/completions",
{
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ model: "moonshotai/kimi-k2.6", stream: true }),
},
);
const items = [];
for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
items.push(item);
}
expect(response.headers.get("content-type")).toContain("text/event-stream");
expect(items).toEqual([{ ok: true }]);
});
it("preserves JSON bodies when the request is not streaming", async () => {
fetchWithSsrFGuardMock.mockResolvedValue({
response: new Response('{"ok": true}', {

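For the JSON fallback path exercised in the tests above: when a streaming request comes back as a plain JSON body, the sanitizer buffers the whole body (even across split reads) and synthesizes a single SSE frame plus a terminator for the SDK parser. A rough restatement of the output, assuming the split body from the test:

// Rough restatement, not the project's code: buffer every body chunk, then emit
// one "data: <body>" frame and a [DONE] terminator once the upstream ends.
const bodyChunks = ['{"ok"', ": true}"];       // split JSON body, as in the test above
const body = bodyChunks.join("").trim();       // '{"ok": true}'
const frames = [`data: ${body}\n\n`, "data: [DONE]\n\n"]; // what the SSE parser receives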

@@ -77,18 +77,20 @@ function sanitizeOpenAISdkSseResponse(
},
async pull(controller) {
try {
const chunk = await reader?.read();
if (!chunk || chunk.done) {
buffer += decoder.decode();
const data = buffer.trim();
if (data) {
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
for (;;) {
const chunk = await reader?.read();
if (!chunk || chunk.done) {
buffer += decoder.decode();
const data = buffer.trim();
if (data) {
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
}
controller.enqueue(encoder.encode("data: [DONE]\n\n"));
controller.close();
return;
}
controller.enqueue(encoder.encode("data: [DONE]\n\n"));
controller.close();
return;
buffer += decoder.decode(chunk.value, { stream: true });
}
buffer += decoder.decode(chunk.value, { stream: true });
} catch (error) {
controller.error(error);
}
@@ -118,12 +120,13 @@ function sanitizeOpenAISdkSseResponse(
const enqueueSanitized = (
controller: ReadableStreamDefaultController<Uint8Array>,
text: string,
) => {
): number => {
let enqueued = 0;
buffer += text;
for (;;) {
const boundary = findSseEventBoundary(buffer);
if (!boundary) {
return;
return enqueued;
}
const block = buffer.slice(0, boundary.index);
const separator = buffer.slice(boundary.index, boundary.index + boundary.length);
@@ -132,6 +135,7 @@ function sanitizeOpenAISdkSseResponse(
// messages. Drop those malformed keepalive-style blocks before it parses.
if (hasReadableSseData(block)) {
controller.enqueue(encoder.encode(`${block}${separator}`));
enqueued += 1;
}
}
};
@@ -142,20 +146,28 @@ function sanitizeOpenAISdkSseResponse(
},
async pull(controller) {
try {
const chunk = await reader?.read();
if (!chunk || chunk.done) {
const tail = decoder.decode();
if (tail) {
enqueueSanitized(controller, tail);
for (;;) {
const chunk = await reader?.read();
if (!chunk || chunk.done) {
const tail = decoder.decode();
if (tail) {
enqueueSanitized(controller, tail);
}
if (buffer && hasReadableSseData(buffer)) {
controller.enqueue(encoder.encode(buffer));
}
buffer = "";
controller.close();
return;
}
if (buffer && hasReadableSseData(buffer)) {
controller.enqueue(encoder.encode(buffer));
const enqueued = enqueueSanitized(
controller,
decoder.decode(chunk.value, { stream: true }),
);
if (enqueued > 0) {
return;
}
buffer = "";
controller.close();
return;
}
enqueueSanitized(controller, decoder.decode(chunk.value, { stream: true }));
} catch (error) {
controller.error(error);
}
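Since the hunks above interleave the old single-read pull with the new looping version, here is a condensed, standalone sketch of the drained behavior. The helper name and signature are assumptions for illustration; the real implementation lives in sanitizeOpenAISdkSseResponse and additionally filters frames through hasReadableSseData:

// Standalone sketch (assumed helper, not the project's API): re-chunk a byte stream
// so every emitted chunk ends on an SSE frame boundary ("\n\n"), which is the property
// the drain loops above restore for the OpenAI SDK's parser.
function rechunkAtSseFrames(source: ReadableStream<Uint8Array>): ReadableStream<Uint8Array> {
  const decoder = new TextDecoder();
  const encoder = new TextEncoder();
  const reader = source.getReader();
  let buffer = "";
  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      for (;;) {
        const chunk = await reader.read();
        if (chunk.done) {
          buffer += decoder.decode(); // flush any trailing multi-byte sequence
          if (buffer.trim()) controller.enqueue(encoder.encode(buffer)); // emit the tail
          controller.close();
          return;
        }
        buffer += decoder.decode(chunk.value, { stream: true });
        const boundary = buffer.lastIndexOf("\n\n");
        if (boundary === -1) continue; // still mid-frame: keep reading within this pull
        controller.enqueue(encoder.encode(buffer.slice(0, boundary + 2))); // complete frame(s)
        buffer = buffer.slice(boundary + 2); // keep the partial remainder buffered
        return; // at least one parser-visible event delivered for this pull
      }
    },
    cancel() {
      void reader.cancel();
    },
  });
}

The important contract is that a pull never resolves having forwarded only a partial frame, which is exactly what the split-chunk regression tests in this commit exercise.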