Files
moltbot/extensions/openrouter/stream.ts
2026-05-11 03:41:00 +01:00

222 lines
6.8 KiB
TypeScript

import type { StreamFn } from "@earendil-works/pi-agent-core";
import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry";
import { OPENROUTER_THINKING_STREAM_HOOKS } from "openclaw/plugin-sdk/provider-stream-family";
import {
createDeepSeekV4OpenAICompatibleThinkingWrapper,
type DeepSeekV4ReasoningEffort,
type DeepSeekV4ThinkingLevel,
createPayloadPatchStreamWrapper,
} from "openclaw/plugin-sdk/provider-stream-shared";
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
import { isOpenRouterDeepSeekV4ModelId } from "./models.js";
import {
isOpenRouterProxyReasoningUnsupportedModel,
normalizeOpenRouterBaseUrl,
OPENROUTER_BASE_URL,
} from "./provider-catalog.js";
const log = createSubsystemLogger("openrouter-stream");
function readString(value: unknown): string | undefined {
return typeof value === "string" ? value.trim() : undefined;
}
function isOpenRouterAnthropicModelId(modelId: unknown): boolean {
const normalized = readString(modelId)?.toLowerCase();
return (
normalized?.startsWith("anthropic/") === true ||
normalized?.startsWith("openrouter/anthropic/") === true
);
}
function isVerifiedOpenRouterRoute(model: Parameters<StreamFn>[0]): boolean {
const provider = readString(model.provider)?.toLowerCase();
const baseUrl = readString(model.baseUrl);
if (baseUrl) {
return normalizeOpenRouterBaseUrl(baseUrl) === OPENROUTER_BASE_URL;
}
return provider === "openrouter";
}
function shouldPatchAnthropicOpenRouterPayload(model: Parameters<StreamFn>[0]): boolean {
const api = readString(model.api);
return (
(api === undefined || api === "openai-completions") &&
isOpenRouterAnthropicModelId(model.id) &&
isVerifiedOpenRouterRoute(model)
);
}
function shouldPatchDeepSeekV4OpenRouterPayload(model: Parameters<StreamFn>[0]): boolean {
const api = readString(model.api);
return (
(api === undefined || api === "openai-completions") &&
isOpenRouterDeepSeekV4ModelId(model.id) &&
isVerifiedOpenRouterRoute(model)
);
}
function resolveOpenRouterDeepSeekV4ReasoningEffort(
thinkingLevel: DeepSeekV4ThinkingLevel,
): DeepSeekV4ReasoningEffort {
switch (thinkingLevel) {
case "minimal":
case "low":
case "medium":
case "high":
case "xhigh":
return thinkingLevel;
case "max":
return "xhigh";
case "adaptive":
return "medium";
case "off":
case undefined:
return "high";
}
return "high";
}
function isEnabledReasoningValue(value: unknown): boolean {
if (value === undefined || value === null || value === false) {
return false;
}
if (typeof value === "string") {
const normalized = value.trim().toLowerCase();
return normalized !== "" && normalized !== "off" && normalized !== "none";
}
return true;
}
function isOpenRouterReasoningPayloadEnabled(payload: Record<string, unknown>): boolean {
return (
isEnabledReasoningValue(payload.reasoning) || isEnabledReasoningValue(payload.reasoning_effort)
);
}
function assistantMessageHasAnthropicToolUse(message: Record<string, unknown>): boolean {
if (Array.isArray(message.tool_calls) && message.tool_calls.length > 0) {
return true;
}
const content = message.content;
if (!Array.isArray(content)) {
return false;
}
return content.some(
(block) =>
block &&
typeof block === "object" &&
((block as { type?: unknown }).type === "tool_use" ||
(block as { type?: unknown }).type === "toolCall"),
);
}
function stripTrailingAssistantPrefillMessages(payload: Record<string, unknown>): number {
if (!Array.isArray(payload.messages)) {
return 0;
}
let stripped = 0;
while (payload.messages.length > 0) {
const finalMessage = payload.messages[payload.messages.length - 1];
if (!finalMessage || typeof finalMessage !== "object") {
break;
}
const message = finalMessage as Record<string, unknown>;
if (message.role !== "assistant" || assistantMessageHasAnthropicToolUse(message)) {
break;
}
payload.messages.pop();
stripped += 1;
}
return stripped;
}
function injectOpenRouterRouting(
baseStreamFn: StreamFn | undefined,
providerRouting?: Record<string, unknown>,
): StreamFn | undefined {
if (!providerRouting) {
return baseStreamFn;
}
return (model, context, options) =>
(
baseStreamFn ??
((nextModel) => {
throw new Error(
`OpenRouter routing wrapper requires an underlying streamFn for ${nextModel.id}.`,
);
})
)(
{
...model,
compat: { ...model.compat, openRouterRouting: providerRouting },
} as typeof model,
context,
options,
);
}
function createOpenRouterAnthropicPrefillWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
return createPayloadPatchStreamWrapper(
baseStreamFn,
({ payload }) => {
if (!isOpenRouterReasoningPayloadEnabled(payload)) {
return;
}
const stripped = stripTrailingAssistantPrefillMessages(payload);
if (stripped > 0) {
log.warn(
`removed ${stripped} trailing assistant prefill message${stripped === 1 ? "" : "s"} because OpenRouter-routed Anthropic reasoning requires conversations to end with a user turn`,
);
}
},
{
shouldPatch: ({ model }) => shouldPatchAnthropicOpenRouterPayload(model),
},
);
}
function createOpenRouterDeepSeekV4ThinkingWrapper(
baseStreamFn: StreamFn | undefined,
thinkingLevel: ProviderWrapStreamFnContext["thinkingLevel"],
): StreamFn | undefined {
return createDeepSeekV4OpenAICompatibleThinkingWrapper({
baseStreamFn,
thinkingLevel,
shouldPatchModel: shouldPatchDeepSeekV4OpenRouterPayload,
resolveReasoningEffort: resolveOpenRouterDeepSeekV4ReasoningEffort,
});
}
export function wrapOpenRouterProviderStream(
ctx: ProviderWrapStreamFnContext,
): StreamFn | null | undefined {
const providerRouting =
ctx.extraParams?.provider != null && typeof ctx.extraParams.provider === "object"
? (ctx.extraParams.provider as Record<string, unknown>)
: undefined;
const routedStreamFn = providerRouting
? injectOpenRouterRouting(ctx.streamFn, providerRouting)
: ctx.streamFn;
const wrapStreamFn = OPENROUTER_THINKING_STREAM_HOOKS.wrapStreamFn ?? undefined;
if (!wrapStreamFn) {
return createOpenRouterAnthropicPrefillWrapper(
createOpenRouterDeepSeekV4ThinkingWrapper(routedStreamFn, ctx.thinkingLevel),
);
}
const wrappedStreamFn =
wrapStreamFn({
...ctx,
streamFn: routedStreamFn,
thinkingLevel: isOpenRouterProxyReasoningUnsupportedModel(ctx.modelId)
? undefined
: ctx.thinkingLevel,
}) ?? undefined;
return createOpenRouterAnthropicPrefillWrapper(
createOpenRouterDeepSeekV4ThinkingWrapper(wrappedStreamFn, ctx.thinkingLevel),
);
}