diff --git a/src/plugin/core/streaming/transformer.test.ts b/src/plugin/core/streaming/transformer.test.ts new file mode 100644 index 0000000..a02c53a --- /dev/null +++ b/src/plugin/core/streaming/transformer.test.ts @@ -0,0 +1,136 @@ +import { describe, expect, it, vi } from "vitest"; +import { + transformStreamingPayload, + deduplicateThinkingText, + cacheThinkingSignaturesFromResponse, + createThoughtBuffer, +} from "./transformer"; +import { createSignatureStore } from "../../stores/signature-store"; + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function geminiResponse(parts: unknown[]) { + return { candidates: [{ content: { role: "model", parts } }] }; +} + +function thinkingPart(text: string) { + return { thought: true, text }; +} + +function textPart(text: string) { + return { text }; +} + +// ─── transformStreamingPayload ──────────────────────────────────────────────── + +describe("transformStreamingPayload", () => { + it("passes non-data lines through unchanged", () => { + const line = "event: message"; + expect(transformStreamingPayload(line)).toBe(line); + }); + + it("passes empty data line unchanged", () => { + expect(transformStreamingPayload("data: ")).toBe("data: "); + }); + + it("passes invalid JSON data line unchanged", () => { + expect(transformStreamingPayload("data: {not json}")).toBe("data: {not json}"); + }); + + it("passes data without response field unchanged", () => { + const line = `data: ${JSON.stringify({ candidates: [] })}`; + expect(transformStreamingPayload(line)).toBe(line); + }); + + it("applies transformThinkingParts to response field", () => { + const inner = { type: "thinking", text: "reasoning" }; + const payload = { response: inner }; + const transform = vi.fn().mockReturnValue({ type: "redacted_thinking" }); + const result = transformStreamingPayload(`data: ${JSON.stringify(payload)}`, transform); + expect(transform).toHaveBeenCalledWith(inner); + expect(result).toContain("redacted_thinking"); + }); +}); + +// ─── deduplicateThinkingText — Gemini candidates ────────────────────────────── + +describe("deduplicateThinkingText", () => { + it("returns null input unchanged", () => { + expect(deduplicateThinkingText(null, createThoughtBuffer())).toBeNull(); + }); + + it("passes non-thinking parts through", () => { + const buf = createThoughtBuffer(); + const resp = geminiResponse([textPart("hello")]); + const result = deduplicateThinkingText(resp, buf) as typeof resp; + expect(result.candidates[0].content.parts).toEqual([textPart("hello")]); + }); + + it("emits full thinking text on first call", () => { + const buf = createThoughtBuffer(); + const result = deduplicateThinkingText(geminiResponse([thinkingPart("hello")]), buf) as any; + expect(result.candidates[0].content.parts[0].text).toBe("hello"); + }); + + it("emits only the new delta on subsequent call with extended text", () => { + const buf = createThoughtBuffer(); + deduplicateThinkingText(geminiResponse([thinkingPart("alpha")]), buf); + const result = deduplicateThinkingText(geminiResponse([thinkingPart("alphabeta")]), buf) as any; + expect(result.candidates[0].content.parts[0].text).toBe("beta"); + }); + + it("filters out duplicate thinking when hash set is provided", () => { + const buf = createThoughtBuffer(); + const seen = new Set(); + const resp = geminiResponse([thinkingPart("same")]); + deduplicateThinkingText(resp, buf, seen); + const result2 = deduplicateThinkingText(resp, buf, seen) as any; + const parts = result2.candidates[0].content.parts; + expect(parts.some((p: any) => p.thought === true)).toBe(false); + }); +}); + +// ─── cacheThinkingSignaturesFromResponse ────────────────────────────────────── + +describe("cacheThinkingSignaturesFromResponse", () => { + it("accumulates thinking text in the thought buffer", () => { + const store = createSignatureStore(); + const buf = createThoughtBuffer(); + cacheThinkingSignaturesFromResponse(geminiResponse([thinkingPart("my thoughts")]), "k", store, buf); + expect(buf.get(0)).toBe("my thoughts"); + }); + + it("fires onCacheSignature with session key, text, and signature", () => { + const store = createSignatureStore(); + const buf = createThoughtBuffer(); + const onSig = vi.fn(); + cacheThinkingSignaturesFromResponse( + geminiResponse([thinkingPart("reasoning"), { thoughtSignature: "sig-1" }]), + "sess", + store, + buf, + onSig, + ); + expect(onSig).toHaveBeenCalledWith("sess", "reasoning", "sig-1"); + }); + + it("stores result in signatureStore keyed by session key", () => { + const store = createSignatureStore(); + const buf = createThoughtBuffer(); + cacheThinkingSignaturesFromResponse( + geminiResponse([thinkingPart("t"), { thoughtSignature: "sig-2" }]), + "session-a", + store, + buf, + ); + expect(store.get("session-a")).toEqual({ text: "t", signature: "sig-2" }); + }); + + it("skips firing onCacheSignature when no thinking text was accumulated", () => { + const store = createSignatureStore(); + const buf = createThoughtBuffer(); + const onSig = vi.fn(); + cacheThinkingSignaturesFromResponse(geminiResponse([{ thoughtSignature: "sig" }]), "k", store, buf, onSig); + expect(onSig).not.toHaveBeenCalled(); + }); +}); diff --git a/src/plugin/core/streaming/transformer.ts b/src/plugin/core/streaming/transformer.ts index 389eb14..cf2f283 100644 --- a/src/plugin/core/streaming/transformer.ts +++ b/src/plugin/core/streaming/transformer.ts @@ -62,22 +62,22 @@ export function deduplicateThinkingText( ): unknown { if (!response || typeof response !== 'object') return response; - const resp = response as Record; + const responseRecord = response as Record; - if (Array.isArray(resp.candidates)) { - const newCandidates = resp.candidates.map((candidate: unknown, index: number) => { - const cand = candidate as Record | null; - if (!cand?.content) return candidate; + if (Array.isArray(responseRecord.candidates)) { + const newCandidates = responseRecord.candidates.map((candidate: unknown, index: number) => { + const candidateRecord = candidate as Record | null; + if (!candidateRecord?.content) return candidate; - const content = cand.content as Record; + const content = candidateRecord.content as Record; if (!Array.isArray(content.parts)) return candidate; const newParts = content.parts.map((part: unknown) => { - const p = part as Record; - + const partRecord = part as Record; + // Handle image data - save to disk and return file path - if (p.inlineData) { - const inlineData = p.inlineData as Record; + if (partRecord.inlineData) { + const inlineData = partRecord.inlineData as Record; const result = processImageData({ mimeType: inlineData.mimeType as string | undefined, data: inlineData.data as string | undefined, @@ -86,10 +86,10 @@ export function deduplicateThinkingText( return { text: result }; } } - - if (p.thought === true || p.type === 'thinking') { - const fullText = (p.text || p.thinking || '') as string; - + + if (partRecord.thought === true || partRecord.type === 'thinking') { + const fullText = (partRecord.text || partRecord.thinking || '') as string; + if (displayedThinkingHashes) { const hash = hashString(fullText); if (displayedThinkingHashes.has(hash)) { @@ -106,7 +106,7 @@ export function deduplicateThinkingText( sentBuffer.set(index, fullText); if (delta) { - return { ...p, text: delta, thinking: delta }; + return { ...partRecord, text: delta, thinking: delta }; } return null; } @@ -117,24 +117,24 @@ export function deduplicateThinkingText( return part; }); - const filteredParts = newParts.filter((p) => p !== null); + const filteredParts = newParts.filter((item) => item !== null); return { - ...cand, + ...candidateRecord, content: { ...content, parts: filteredParts }, }; }); - return { ...resp, candidates: newCandidates }; + return { ...responseRecord, candidates: newCandidates }; } - if (Array.isArray(resp.content)) { + if (Array.isArray(responseRecord.content)) { let thinkingIndex = 0; - const newContent = resp.content.map((block: unknown) => { - const b = block as Record | null; - if (b?.type === 'thinking') { - const fullText = (b.thinking || b.text || '') as string; - + const newContent = responseRecord.content.map((block: unknown) => { + const blockRecord = block as Record | null; + if (blockRecord?.type === 'thinking') { + const fullText = (blockRecord.thinking || blockRecord.text || '') as string; + if (displayedThinkingHashes) { const hash = hashString(fullText); if (displayedThinkingHashes.has(hash)) { @@ -153,7 +153,7 @@ export function deduplicateThinkingText( thinkingIndex++; if (delta) { - return { ...b, thinking: delta, text: delta }; + return { ...blockRecord, thinking: delta, text: delta }; } return null; } @@ -165,8 +165,8 @@ export function deduplicateThinkingText( return block; }); - const filteredContent = newContent.filter((b) => b !== null); - return { ...resp, content: filteredContent }; + const filteredContent = newContent.filter((item) => item !== null); + return { ...responseRecord, content: filteredContent }; } return response; @@ -232,29 +232,29 @@ export function cacheThinkingSignaturesFromResponse( ): void { if (!response || typeof response !== 'object') return; - const resp = response as Record; + const responseRecord = response as Record; - if (Array.isArray(resp.candidates)) { - resp.candidates.forEach((candidate: unknown, index: number) => { - const cand = candidate as Record | null; - if (!cand?.content) return; - const content = cand.content as Record; + if (Array.isArray(responseRecord.candidates)) { + responseRecord.candidates.forEach((candidate: unknown, index: number) => { + const candidateRecord = candidate as Record | null; + if (!candidateRecord?.content) return; + const content = candidateRecord.content as Record; if (!Array.isArray(content.parts)) return; content.parts.forEach((part: unknown) => { - const p = part as Record; - if (p.thought === true || p.type === 'thinking') { - const text = (p.text || p.thinking || '') as string; + const partRecord = part as Record; + if (partRecord.thought === true || partRecord.type === 'thinking') { + const text = (partRecord.text || partRecord.thinking || '') as string; if (text) { const current = thoughtBuffer.get(index) ?? ''; thoughtBuffer.set(index, current + text); } } - if (p.thoughtSignature) { + if (partRecord.thoughtSignature) { const fullText = thoughtBuffer.get(index) ?? ''; if (fullText) { - const signature = p.thoughtSignature as string; + const signature = partRecord.thoughtSignature as string; onCacheSignature?.(signatureSessionKey, fullText, signature); signatureStore.set(signatureSessionKey, { text: fullText, signature }); } @@ -263,23 +263,23 @@ export function cacheThinkingSignaturesFromResponse( }); } - if (Array.isArray(resp.content)) { + if (Array.isArray(responseRecord.content)) { // Use thoughtBuffer to accumulate thinking text across SSE events // Claude streams thinking content and signature in separate events const CLAUDE_BUFFER_KEY = 0; // Use index 0 for Claude's single-stream content - resp.content.forEach((block: unknown) => { - const b = block as Record | null; - if (b?.type === 'thinking') { - const text = (b.thinking || b.text || '') as string; + responseRecord.content.forEach((block: unknown) => { + const blockRecord = block as Record | null; + if (blockRecord?.type === 'thinking') { + const text = (blockRecord.thinking || blockRecord.text || '') as string; if (text) { const current = thoughtBuffer.get(CLAUDE_BUFFER_KEY) ?? ''; thoughtBuffer.set(CLAUDE_BUFFER_KEY, current + text); } } - if (b?.signature) { + if (blockRecord?.signature) { const fullText = thoughtBuffer.get(CLAUDE_BUFFER_KEY) ?? ''; if (fullText) { - const signature = b.signature as string; + const signature = blockRecord.signature as string; onCacheSignature?.(signatureSessionKey, fullText, signature); signatureStore.set(signatureSessionKey, { text: fullText, signature }); }