mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-13 23:52:06 +00:00
refactor(http-recorder): Redactor + Recorder seams, README (#26636)
This commit is contained in:
214
packages/http-recorder/README.md
Normal file
214
packages/http-recorder/README.md
Normal file
@@ -0,0 +1,214 @@
|
||||
# @opencode-ai/http-recorder
|
||||
|
||||
Record and replay HTTP and WebSocket traffic for Effect's `HttpClient`. Tests
|
||||
exercise real request shapes against deterministic, version-controlled
|
||||
cassettes — no manual mocks, no flakes from upstream drift.
|
||||
|
||||
## Install
|
||||
|
||||
Internal package; depended on as `@opencode-ai/http-recorder` from another
|
||||
workspace package.
|
||||
|
||||
```ts
|
||||
import { HttpRecorder } from "@opencode-ai/http-recorder"
|
||||
```
|
||||
|
||||
## Quickstart
|
||||
|
||||
Provide `cassetteLayer(name)` in place of (or layered over) your `HttpClient`.
|
||||
The first run records to `test/fixtures/recordings/<name>.json`; subsequent
|
||||
runs replay from it.
|
||||
|
||||
```ts
|
||||
import { Effect } from "effect"
|
||||
import { HttpClient, HttpClientRequest } from "effect/unstable/http"
|
||||
import { HttpRecorder } from "@opencode-ai/http-recorder"
|
||||
|
||||
const program = Effect.gen(function* () {
|
||||
const http = yield* HttpClient.HttpClient
|
||||
const response = yield* http.execute(HttpClientRequest.get("https://api.example.com/users/1"))
|
||||
return yield* response.json
|
||||
})
|
||||
|
||||
// Replay (default). Fails if the cassette is missing.
|
||||
Effect.runPromise(program.pipe(Effect.provide(HttpRecorder.cassetteLayer("users/get-one"))))
|
||||
|
||||
// Record. Hits the upstream and writes the cassette.
|
||||
Effect.runPromise(
|
||||
program.pipe(Effect.provide(HttpRecorder.cassetteLayer("users/get-one", { mode: "record" }))),
|
||||
)
|
||||
```
|
||||
|
||||
Set the mode from the environment in your test setup:
|
||||
|
||||
```ts
|
||||
HttpRecorder.cassetteLayer("users/get-one", {
|
||||
mode: process.env.RECORD === "true" ? "record" : "replay",
|
||||
})
|
||||
```
|
||||
|
||||
## Modes
|
||||
|
||||
| Mode | Behavior |
|
||||
| ------------- | -------------------------------------------------------------------- |
|
||||
| `replay` | Default. Match the request to a recorded interaction; error if none. |
|
||||
| `record` | Execute upstream, append the interaction, write the cassette. |
|
||||
| `passthrough` | Bypass the recorder entirely — just call upstream. |
|
||||
|
||||
## Cassette format
|
||||
|
||||
A cassette is JSON at `test/fixtures/recordings/<name>.json`:
|
||||
|
||||
```json
|
||||
{
|
||||
"version": 1,
|
||||
"metadata": { "name": "users/get-one", "recordedAt": "2026-05-09T..." },
|
||||
"interactions": [
|
||||
{
|
||||
"transport": "http",
|
||||
"request": { "method": "GET", "url": "...", "headers": {...}, "body": "" },
|
||||
"response": { "status": 200, "headers": {...}, "body": "..." }
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Cassettes are normal source files — review them, diff them, commit them.
|
||||
|
||||
## Request matching
|
||||
|
||||
By default, requests match on canonicalized method, URL, headers, and JSON
|
||||
body (object keys sorted). Two dispatch strategies are available:
|
||||
|
||||
- **`match`** (default) — find the first recorded interaction whose request
|
||||
matches the incoming request. Same request twice returns the same response.
|
||||
- **`sequential`** — return interactions in the order they were recorded,
|
||||
validating each one matches as the cursor advances. Use for ordered flows
|
||||
where the same URL is hit multiple times with meaningful state changes
|
||||
(pagination, retries, polling).
|
||||
|
||||
```ts
|
||||
HttpRecorder.cassetteLayer("flow/poll-until-done", { dispatch: "sequential" })
|
||||
```
|
||||
|
||||
Supply your own matcher via `match: (incoming, recorded) => boolean` for
|
||||
custom equivalence (e.g. ignoring a timestamp field in the body).
|
||||
|
||||
## Redaction & secret safety
|
||||
|
||||
Cassettes get checked in, so the recorder is aggressive about not letting
|
||||
secrets escape. Redaction is configured by composing a `Redactor`:
|
||||
|
||||
```ts
|
||||
import { HttpRecorder, Redactor } from "@opencode-ai/http-recorder"
|
||||
|
||||
HttpRecorder.cassetteLayer("anthropic/messages", {
|
||||
mode: process.env.RECORD === "true" ? "record" : "replay",
|
||||
redactor: Redactor.defaults({
|
||||
requestHeaders: { allow: ["content-type", "anthropic-version"] },
|
||||
url: { transform: (url) => url.replace(/\/accounts\/[^/]+/, "/accounts/{account}") },
|
||||
body: (parsed) => ({ ...(parsed as object), user_id: "{user}" }),
|
||||
}),
|
||||
})
|
||||
```
|
||||
|
||||
`Redactor.defaults({ … })` composes the four built-in redactors with your
|
||||
overrides. For full control, build the stack yourself:
|
||||
|
||||
```ts
|
||||
const redactor = Redactor.compose(
|
||||
Redactor.requestHeaders({ allow: ["content-type", "x-custom"] }),
|
||||
Redactor.responseHeaders(),
|
||||
Redactor.url({ query: ["session-id"] }),
|
||||
Redactor.body((parsed) => /* … */),
|
||||
)
|
||||
```
|
||||
|
||||
What each layer does:
|
||||
|
||||
- **`requestHeaders` / `responseHeaders`** — strip headers to a small
|
||||
allow-list (request default: `content-type`, `accept`, `openai-beta`;
|
||||
response default: `content-type`). Sensitive headers within the
|
||||
allow-list (`authorization`, `cookie`, API-key headers, AWS/GCP tokens,
|
||||
…) are replaced with `[REDACTED]`.
|
||||
- **`url`** — query parameters matching common secret names (`api_key`,
|
||||
`token`, `signature`, AWS signing params, …) are replaced with
|
||||
`[REDACTED]`. URL user/password are replaced. `transform` runs after
|
||||
built-in redaction for path-level scrubbing.
|
||||
- **`body`** — receives the parsed JSON request body and returns a redacted
|
||||
version. No-op for non-JSON bodies.
|
||||
|
||||
After assembling the cassette, the recorder scans every string for known
|
||||
secret patterns (Bearer tokens, `sk-…`, `sk-ant-…`, Google `AIza…` keys,
|
||||
AWS access keys, GitHub tokens, PEM blocks) and for values matching any
|
||||
environment variable named like a credential. If anything is found, the
|
||||
cassette is **not written** and the request fails with `UnsafeCassetteError`
|
||||
listing what was detected.
|
||||
|
||||
## WebSocket recording
|
||||
|
||||
WebSocket support records the open frame plus client/server message
|
||||
streams. It uses the shared `Cassette.Service`, so HTTP and WS interactions
|
||||
can live in the same cassette.
|
||||
|
||||
```ts
|
||||
import { HttpRecorder } from "@opencode-ai/http-recorder"
|
||||
import { Effect } from "effect"
|
||||
|
||||
const program = Effect.gen(function* () {
|
||||
const cassette = yield* HttpRecorder.Cassette.Service
|
||||
const executor = yield* HttpRecorder.makeWebSocketExecutor({
|
||||
name: "ws/subscribe",
|
||||
mode: process.env.RECORD === "true" ? "record" : "replay",
|
||||
cassette,
|
||||
live: liveExecutor,
|
||||
})
|
||||
// use executor.open(...)
|
||||
})
|
||||
```
|
||||
|
||||
## Inspecting cassettes programmatically
|
||||
|
||||
`Cassette.Service` exposes `read`, `write`, `append`, `exists`, `list`, and
|
||||
`scan` (re-running the secret detector over an existing cassette). Useful
|
||||
for CI checks:
|
||||
|
||||
```ts
|
||||
import { HttpRecorder } from "@opencode-ai/http-recorder"
|
||||
import { Effect } from "effect"
|
||||
|
||||
const audit = Effect.gen(function* () {
|
||||
const cassettes = yield* HttpRecorder.Cassette.Service
|
||||
const findings = yield* Effect.forEach(yield* cassettes.list(), (entry) =>
|
||||
cassettes.read(entry.name).pipe(Effect.map((c) => ({ entry, findings: cassettes.scan(c) }))),
|
||||
)
|
||||
return findings.filter((r) => r.findings.length > 0)
|
||||
})
|
||||
```
|
||||
|
||||
## Options reference
|
||||
|
||||
```ts
|
||||
type RecordReplayOptions = {
|
||||
mode?: "record" | "replay" | "passthrough" // default: "replay"
|
||||
directory?: string // default: <cwd>/test/fixtures/recordings
|
||||
metadata?: Record<string, unknown> // merged into cassette.metadata
|
||||
redactor?: Redactor // default: Redactor.defaults()
|
||||
dispatch?: "match" | "sequential" // default: "match"
|
||||
match?: (incoming, recorded) => boolean // custom matcher
|
||||
}
|
||||
```
|
||||
|
||||
## Layout
|
||||
|
||||
| File | Purpose |
|
||||
| -------------- | -------------------------------------------------------------------------------- |
|
||||
| `effect.ts` | `cassetteLayer` / `recordingLayer` — the `HttpClient` adapter. |
|
||||
| `websocket.ts` | `makeWebSocketExecutor` — WebSocket record/replay. |
|
||||
| `cassette.ts` | `Cassette.Service` — reads/writes cassette files, accumulates state. |
|
||||
| `recorder.ts` | Shared transport plumbing: `UnsafeCassetteError`, `appendOrFail`, `ReplayState`. |
|
||||
| `redactor.ts` | Composable `Redactor` — headers, url, body redaction. |
|
||||
| `redaction.ts` | Lower-level header/URL primitives + secret pattern detection. |
|
||||
| `schema.ts` | Effect Schema definitions for the cassette JSON format. |
|
||||
| `storage.ts` | Path resolution, JSON encode/decode, sync existence check. |
|
||||
| `matching.ts` | Request matcher, canonicalization, dispatch strategies, mismatch diagnostics. |
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Context, Effect, FileSystem, Layer, PlatformError, Ref } from "effect"
|
||||
import { Context, Effect, FileSystem, Layer, PlatformError } from "effect"
|
||||
import * as path from "node:path"
|
||||
import { cassetteSecretFindings, type SecretFinding } from "./redaction"
|
||||
import { cassetteSecretFindings, secretFindings, type SecretFinding } from "./redaction"
|
||||
import type { Cassette, CassetteMetadata, Interaction } from "./schema"
|
||||
import { cassetteFor, cassettePath, DEFAULT_RECORDINGS_DIR, formatCassette, parseCassette } from "./storage"
|
||||
|
||||
@@ -37,10 +37,18 @@ export const layer = (options: { readonly directory?: string } = {}) =>
|
||||
Effect.gen(function* () {
|
||||
const fileSystem = yield* FileSystem.FileSystem
|
||||
const directory = options.directory ?? DEFAULT_RECORDINGS_DIR
|
||||
const recorded = yield* Ref.make(new Map<string, ReadonlyArray<Interaction>>())
|
||||
const recorded = new Map<string, { interactions: Interaction[]; findings: SecretFinding[] }>()
|
||||
const directoriesEnsured = new Set<string>()
|
||||
|
||||
const pathFor = (name: string) => cassettePath(name, directory)
|
||||
|
||||
const ensureDirectory = Effect.fn("Cassette.ensureDirectory")(function* (name: string) {
|
||||
const dir = path.dirname(pathFor(name))
|
||||
if (directoriesEnsured.has(dir)) return
|
||||
yield* fileSystem.makeDirectory(dir, { recursive: true })
|
||||
directoriesEnsured.add(dir)
|
||||
})
|
||||
|
||||
const walk = (directory: string): Effect.Effect<ReadonlyArray<string>, PlatformError.PlatformError> =>
|
||||
Effect.gen(function* () {
|
||||
const entries = yield* fileSystem
|
||||
@@ -61,7 +69,7 @@ export const layer = (options: { readonly directory?: string } = {}) =>
|
||||
})
|
||||
|
||||
const write = Effect.fn("Cassette.write")(function* (name: string, cassette: Cassette) {
|
||||
yield* fileSystem.makeDirectory(path.dirname(pathFor(name)), { recursive: true })
|
||||
yield* ensureDirectory(name)
|
||||
yield* fileSystem.writeFileString(pathFor(name), formatCassette(cassette))
|
||||
})
|
||||
|
||||
@@ -70,11 +78,12 @@ export const layer = (options: { readonly directory?: string } = {}) =>
|
||||
interaction: Interaction,
|
||||
metadata: CassetteMetadata | undefined,
|
||||
) {
|
||||
const interactions = yield* Ref.updateAndGet(recorded, (previous) =>
|
||||
new Map(previous).set(name, [...(previous.get(name) ?? []), interaction]),
|
||||
)
|
||||
const cassette = cassetteFor(name, interactions.get(name) ?? [], metadata)
|
||||
const findings = cassetteSecretFindings(cassette)
|
||||
const entry = recorded.get(name) ?? { interactions: [], findings: [] }
|
||||
entry.interactions.push(interaction)
|
||||
entry.findings.push(...secretFindings(interaction))
|
||||
recorded.set(name, entry)
|
||||
const cassette = cassetteFor(name, entry.interactions, metadata)
|
||||
const findings = [...entry.findings, ...secretFindings(cassette.metadata ?? {})]
|
||||
if (findings.length === 0) yield* write(name, cassette)
|
||||
return { cassette, findings }
|
||||
})
|
||||
@@ -103,6 +112,3 @@ export const layer = (options: { readonly directory?: string } = {}) =>
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer()
|
||||
|
||||
export * as Cassette from "./cassette"
|
||||
|
||||
@@ -1,95 +0,0 @@
|
||||
import { Option } from "effect"
|
||||
import { Headers, HttpBody, HttpClientRequest, UrlParams } from "effect/unstable/http"
|
||||
import { decodeJson } from "./matching"
|
||||
import { REDACTED, redactUrl, secretFindings } from "./redaction"
|
||||
import { httpInteractions, type Cassette, type RequestSnapshot } from "./schema"
|
||||
|
||||
const safeText = (value: unknown) => {
|
||||
if (value === undefined) return "undefined"
|
||||
if (secretFindings(value).length > 0) return JSON.stringify(REDACTED)
|
||||
const text = typeof value === "string" ? JSON.stringify(value) : JSON.stringify(value)
|
||||
if (!text) return String(value)
|
||||
return text.length > 300 ? `${text.slice(0, 300)}...` : text
|
||||
}
|
||||
|
||||
const jsonBody = (body: string) => Option.getOrUndefined(decodeJson(body))
|
||||
|
||||
const valueDiffs = (expected: unknown, received: unknown, base = "$", limit = 8): ReadonlyArray<string> => {
|
||||
if (Object.is(expected, received)) return []
|
||||
if (
|
||||
expected &&
|
||||
received &&
|
||||
typeof expected === "object" &&
|
||||
typeof received === "object" &&
|
||||
!Array.isArray(expected) &&
|
||||
!Array.isArray(received)
|
||||
) {
|
||||
return [...new Set([...Object.keys(expected), ...Object.keys(received)])]
|
||||
.toSorted()
|
||||
.flatMap((key) =>
|
||||
valueDiffs(
|
||||
(expected as Record<string, unknown>)[key],
|
||||
(received as Record<string, unknown>)[key],
|
||||
`${base}.${key}`,
|
||||
limit,
|
||||
),
|
||||
)
|
||||
.slice(0, limit)
|
||||
}
|
||||
if (Array.isArray(expected) && Array.isArray(received)) {
|
||||
return Array.from({ length: Math.max(expected.length, received.length) }, (_, index) => index)
|
||||
.flatMap((index) => valueDiffs(expected[index], received[index], `${base}[${index}]`, limit))
|
||||
.slice(0, limit)
|
||||
}
|
||||
return [`${base} expected ${safeText(expected)}, received ${safeText(received)}`]
|
||||
}
|
||||
|
||||
const headerDiffs = (expected: Record<string, string>, received: Record<string, string>) =>
|
||||
[...new Set([...Object.keys(expected), ...Object.keys(received)])].toSorted().flatMap((key) => {
|
||||
if (expected[key] === received[key]) return []
|
||||
if (expected[key] === undefined) return [` ${key} unexpected ${safeText(received[key])}`]
|
||||
if (received[key] === undefined) return [` ${key} missing expected ${safeText(expected[key])}`]
|
||||
return [` ${key} expected ${safeText(expected[key])}, received ${safeText(received[key])}`]
|
||||
})
|
||||
|
||||
export const requestDiff = (expected: RequestSnapshot, received: RequestSnapshot) => {
|
||||
const lines = []
|
||||
if (expected.method !== received.method) {
|
||||
lines.push("method:", ` expected ${expected.method}, received ${received.method}`)
|
||||
}
|
||||
if (expected.url !== received.url) {
|
||||
lines.push("url:", ` expected ${expected.url}`, ` received ${received.url}`)
|
||||
}
|
||||
const headers = headerDiffs(expected.headers, received.headers)
|
||||
if (headers.length > 0) lines.push("headers:", ...headers.slice(0, 8))
|
||||
const expectedBody = jsonBody(expected.body)
|
||||
const receivedBody = jsonBody(received.body)
|
||||
const body =
|
||||
expectedBody !== undefined && receivedBody !== undefined
|
||||
? valueDiffs(expectedBody, receivedBody).map((line) => ` ${line}`)
|
||||
: expected.body === received.body
|
||||
? []
|
||||
: [` expected ${safeText(expected.body)}, received ${safeText(received.body)}`]
|
||||
if (body.length > 0) lines.push("body:", ...body)
|
||||
return lines
|
||||
}
|
||||
|
||||
export const mismatchDetail = (cassette: Cassette, incoming: RequestSnapshot) => {
|
||||
const interactions = httpInteractions(cassette)
|
||||
if (interactions.length === 0) return "cassette has no recorded HTTP interactions"
|
||||
const ranked = interactions
|
||||
.map((interaction, index) => ({ index, lines: requestDiff(interaction.request, incoming) }))
|
||||
.toSorted((a, b) => a.lines.length - b.lines.length || a.index - b.index)
|
||||
const best = ranked[0]
|
||||
return ["no recorded interaction matched", `closest interaction: #${best.index + 1}`, ...best.lines].join("\n")
|
||||
}
|
||||
|
||||
export const redactedErrorRequest = (request: HttpClientRequest.HttpClientRequest) =>
|
||||
HttpClientRequest.makeWith(
|
||||
request.method,
|
||||
redactUrl(request.url),
|
||||
UrlParams.empty,
|
||||
Option.none(),
|
||||
Headers.empty,
|
||||
HttpBody.empty,
|
||||
)
|
||||
@@ -1,26 +1,21 @@
|
||||
import { NodeFileSystem } from "@effect/platform-node"
|
||||
import { Effect, Layer, Option, Ref } from "effect"
|
||||
import { Effect, Layer, Option } from "effect"
|
||||
import {
|
||||
FetchHttpClient,
|
||||
Headers,
|
||||
HttpBody,
|
||||
HttpClient,
|
||||
HttpClientError,
|
||||
HttpClientRequest,
|
||||
HttpClientResponse,
|
||||
UrlParams,
|
||||
} from "effect/unstable/http"
|
||||
import { redactedErrorRequest, mismatchDetail, requestDiff } from "./diff"
|
||||
import { defaultMatcher, decodeJson, type RequestMatcher } from "./matching"
|
||||
import { redactHeaders, redactUrl, type SecretFinding } from "./redaction"
|
||||
import {
|
||||
httpInteractions,
|
||||
type Cassette,
|
||||
type CassetteMetadata,
|
||||
type HttpInteraction,
|
||||
type ResponseSnapshot,
|
||||
} from "./schema"
|
||||
import * as CassetteService from "./cassette"
|
||||
|
||||
export const DEFAULT_REQUEST_HEADERS: ReadonlyArray<string> = ["content-type", "accept", "openai-beta"]
|
||||
const DEFAULT_RESPONSE_HEADERS: ReadonlyArray<string> = ["content-type"]
|
||||
import { defaultMatcher, selectMatch, selectSequential, type RequestMatcher } from "./matching"
|
||||
import { appendOrFail, makeReplayState } from "./recorder"
|
||||
import { defaults, type Redactor } from "./redactor"
|
||||
import { redactUrl } from "./redaction"
|
||||
import { httpInteractions, type CassetteMetadata, type HttpInteraction, type ResponseSnapshot } from "./schema"
|
||||
|
||||
export type RecordReplayMode = "record" | "replay" | "passthrough"
|
||||
|
||||
@@ -28,35 +23,15 @@ export interface RecordReplayOptions {
|
||||
readonly mode?: RecordReplayMode
|
||||
readonly directory?: string
|
||||
readonly metadata?: CassetteMetadata
|
||||
readonly redact?: {
|
||||
readonly headers?: ReadonlyArray<string>
|
||||
readonly query?: ReadonlyArray<string>
|
||||
readonly url?: (url: string) => string
|
||||
}
|
||||
readonly requestHeaders?: ReadonlyArray<string>
|
||||
readonly responseHeaders?: ReadonlyArray<string>
|
||||
readonly redactBody?: (body: unknown) => unknown
|
||||
readonly redactor?: Redactor
|
||||
readonly dispatch?: "match" | "sequential"
|
||||
readonly match?: RequestMatcher
|
||||
}
|
||||
|
||||
const responseHeaders = (
|
||||
response: HttpClientResponse.HttpClientResponse,
|
||||
allow: ReadonlyArray<string>,
|
||||
redact: ReadonlyArray<string> | undefined,
|
||||
) => {
|
||||
const merged = redactHeaders(response.headers as Record<string, string>, allow, redact)
|
||||
if (!merged["content-type"]) merged["content-type"] = "text/event-stream"
|
||||
return merged
|
||||
}
|
||||
|
||||
const BINARY_CONTENT_TYPES: ReadonlyArray<string> = ["vnd.amazon.eventstream", "octet-stream"]
|
||||
|
||||
const isBinaryContentType = (contentType: string | undefined) => {
|
||||
if (!contentType) return false
|
||||
const lower = contentType.toLowerCase()
|
||||
return BINARY_CONTENT_TYPES.some((token) => lower.includes(token))
|
||||
}
|
||||
const isBinaryContentType = (contentType: string | undefined) =>
|
||||
contentType !== undefined && BINARY_CONTENT_TYPES.some((token) => contentType.toLowerCase().includes(token))
|
||||
|
||||
const captureResponseBody = (response: HttpClientResponse.HttpClientResponse, contentType: string | undefined) =>
|
||||
isBinaryContentType(contentType)
|
||||
@@ -68,34 +43,12 @@ const captureResponseBody = (response: HttpClientResponse.HttpClientResponse, co
|
||||
const decodeResponseBody = (snapshot: ResponseSnapshot) =>
|
||||
snapshot.bodyEncoding === "base64" ? Buffer.from(snapshot.body, "base64") : snapshot.body
|
||||
|
||||
const fixtureMissing = (request: HttpClientRequest.HttpClientRequest, name: string) =>
|
||||
new HttpClientError.HttpClientError({
|
||||
reason: new HttpClientError.TransportError({
|
||||
request: redactedErrorRequest(request),
|
||||
description: `Fixture "${name}" not found. Run with RECORD=true to create it.`,
|
||||
}),
|
||||
})
|
||||
export const redactedErrorRequest = (request: HttpClientRequest.HttpClientRequest) =>
|
||||
HttpClientRequest.makeWith(request.method, redactUrl(request.url), UrlParams.empty, Option.none(), Headers.empty, HttpBody.empty)
|
||||
|
||||
const fixtureMismatch = (request: HttpClientRequest.HttpClientRequest, name: string, detail: string) =>
|
||||
const transportError = (request: HttpClientRequest.HttpClientRequest, description: string) =>
|
||||
new HttpClientError.HttpClientError({
|
||||
reason: new HttpClientError.TransportError({
|
||||
request: redactedErrorRequest(request),
|
||||
description: `Fixture "${name}" does not match the current request: ${detail}. Run with RECORD=true to update it.`,
|
||||
}),
|
||||
})
|
||||
|
||||
const unsafeCassette = (
|
||||
request: HttpClientRequest.HttpClientRequest,
|
||||
name: string,
|
||||
findings: ReadonlyArray<SecretFinding>,
|
||||
) =>
|
||||
new HttpClientError.HttpClientError({
|
||||
reason: new HttpClientError.TransportError({
|
||||
request: redactedErrorRequest(request),
|
||||
description: `Refusing to write cassette "${name}" because it contains possible secrets: ${findings
|
||||
.map((item) => `${item.path} (${item.reason})`)
|
||||
.join(", ")}`,
|
||||
}),
|
||||
reason: new HttpClientError.TransportError({ request: redactedErrorRequest(request), description }),
|
||||
})
|
||||
|
||||
export const recordingLayer = (
|
||||
@@ -107,61 +60,21 @@ export const recordingLayer = (
|
||||
Effect.gen(function* () {
|
||||
const upstream = yield* HttpClient.HttpClient
|
||||
const cassetteService = yield* CassetteService.Service
|
||||
const requestHeadersAllow = options.requestHeaders ?? DEFAULT_REQUEST_HEADERS
|
||||
const responseHeadersAllow = options.responseHeaders ?? DEFAULT_RESPONSE_HEADERS
|
||||
const redactor = options.redactor ?? defaults()
|
||||
const match = options.match ?? defaultMatcher
|
||||
const mode = options.mode ?? "replay"
|
||||
const sequential = options.dispatch === "sequential"
|
||||
const replay = yield* Ref.make<Cassette | undefined>(undefined)
|
||||
const cursor = yield* Ref.make(0)
|
||||
const replay = yield* makeReplayState(cassetteService, name, httpInteractions)
|
||||
|
||||
const snapshotRequest = (request: HttpClientRequest.HttpClientRequest) =>
|
||||
Effect.gen(function* () {
|
||||
const web = yield* HttpClientRequest.toWeb(request).pipe(Effect.orDie)
|
||||
const raw = yield* Effect.promise(() => web.text())
|
||||
const body = options.redactBody
|
||||
? Option.match(decodeJson(raw), {
|
||||
onNone: () => raw,
|
||||
onSome: (parsed) => JSON.stringify(options.redactBody?.(parsed)),
|
||||
})
|
||||
: raw
|
||||
return {
|
||||
return redactor.request({
|
||||
method: web.method,
|
||||
url: redactUrl(web.url, options.redact?.query, options.redact?.url),
|
||||
headers: redactHeaders(
|
||||
Object.fromEntries(web.headers.entries()),
|
||||
requestHeadersAllow,
|
||||
options.redact?.headers,
|
||||
),
|
||||
body,
|
||||
}
|
||||
})
|
||||
|
||||
const selectInteraction = (cassette: Cassette, incoming: HttpInteraction["request"]) =>
|
||||
Effect.gen(function* () {
|
||||
const interactions = httpInteractions(cassette)
|
||||
if (sequential) {
|
||||
const index = yield* Ref.get(cursor)
|
||||
const interaction = interactions[index]
|
||||
if (!interaction)
|
||||
return { interaction, detail: `interaction ${index + 1} of ${interactions.length} not recorded` }
|
||||
if (!match(incoming, interaction.request)) {
|
||||
return { interaction: undefined, detail: requestDiff(interaction.request, incoming).join("\n") }
|
||||
}
|
||||
yield* Ref.update(cursor, (n) => n + 1)
|
||||
return { interaction, detail: "" }
|
||||
}
|
||||
const interaction = interactions.find((candidate) => match(incoming, candidate.request))
|
||||
return { interaction, detail: interaction ? "" : mismatchDetail(cassette, incoming) }
|
||||
})
|
||||
|
||||
const loadReplay = (request: HttpClientRequest.HttpClientRequest) =>
|
||||
Effect.gen(function* () {
|
||||
const cached = yield* Ref.get(replay)
|
||||
if (cached) return cached
|
||||
const cassette = yield* cassetteService.read(name).pipe(Effect.mapError(() => fixtureMissing(request, name)))
|
||||
yield* Ref.set(replay, cassette)
|
||||
return cassette
|
||||
url: web.url,
|
||||
headers: Object.fromEntries(web.headers.entries()),
|
||||
body: yield* Effect.promise(() => web.text()),
|
||||
})
|
||||
})
|
||||
|
||||
return HttpClient.make((request) => {
|
||||
@@ -169,18 +82,21 @@ export const recordingLayer = (
|
||||
|
||||
if (mode === "record") {
|
||||
return Effect.gen(function* () {
|
||||
const currentRequest = yield* snapshotRequest(request)
|
||||
const incoming = yield* snapshotRequest(request)
|
||||
const response = yield* upstream.execute(request)
|
||||
const headers = responseHeaders(response, responseHeadersAllow, options.redact?.headers)
|
||||
const captured = yield* captureResponseBody(response, headers["content-type"])
|
||||
const captured = yield* captureResponseBody(response, response.headers["content-type"])
|
||||
const interaction: HttpInteraction = {
|
||||
transport: "http",
|
||||
request: currentRequest,
|
||||
response: { status: response.status, headers, ...captured },
|
||||
request: incoming,
|
||||
response: redactor.response({
|
||||
status: response.status,
|
||||
headers: response.headers as Record<string, string>,
|
||||
...captured,
|
||||
}),
|
||||
}
|
||||
const result = yield* cassetteService.append(name, interaction, options.metadata).pipe(Effect.orDie)
|
||||
const findings = result.findings
|
||||
if (findings.length > 0) return yield* unsafeCassette(request, name, findings)
|
||||
yield* appendOrFail(cassetteService, name, interaction, options.metadata).pipe(
|
||||
Effect.catchTag("UnsafeCassetteError", (error) => Effect.fail(transportError(request, error.message))),
|
||||
)
|
||||
return HttpClientResponse.fromWeb(
|
||||
request,
|
||||
new Response(decodeResponseBody(interaction.response), interaction.response),
|
||||
@@ -189,14 +105,21 @@ export const recordingLayer = (
|
||||
}
|
||||
|
||||
return Effect.gen(function* () {
|
||||
const cassette = yield* loadReplay(request)
|
||||
const incoming = yield* snapshotRequest(request)
|
||||
const { interaction, detail } = yield* selectInteraction(cassette, incoming)
|
||||
if (!interaction) return yield* fixtureMismatch(request, name, detail)
|
||||
|
||||
const interactions = yield* replay.load.pipe(
|
||||
Effect.mapError(() => transportError(request, `Fixture "${name}" not found.`)),
|
||||
)
|
||||
const result = sequential
|
||||
? selectSequential(interactions, incoming, match, yield* replay.cursor)
|
||||
: selectMatch(interactions, incoming, match)
|
||||
if (!result.interaction)
|
||||
return yield* Effect.fail(
|
||||
transportError(request, `Fixture "${name}" does not match the current request: ${result.detail}.`),
|
||||
)
|
||||
if (sequential) yield* replay.advance
|
||||
return HttpClientResponse.fromWeb(
|
||||
request,
|
||||
new Response(decodeResponseBody(interaction.response), interaction.response),
|
||||
new Response(decodeResponseBody(result.interaction.response), result.interaction.response),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,10 +1,18 @@
|
||||
export * from "./schema"
|
||||
export * from "./redaction"
|
||||
export * from "./matching"
|
||||
export * from "./diff"
|
||||
export * from "./storage"
|
||||
export * from "./websocket"
|
||||
export * from "./effect"
|
||||
export type { CassetteMetadata, HttpInteraction, Interaction, RequestSnapshot, ResponseSnapshot, WebSocketFrame, WebSocketInteraction } from "./schema"
|
||||
export { hasCassetteSync } from "./storage"
|
||||
export { defaultMatcher, type RequestMatcher } from "./matching"
|
||||
export { cassetteSecretFindings, redactHeaders, redactUrl, type SecretFinding } from "./redaction"
|
||||
export { UnsafeCassetteError } from "./recorder"
|
||||
export { cassetteLayer, recordingLayer, type RecordReplayMode, type RecordReplayOptions } from "./effect"
|
||||
export {
|
||||
makeWebSocketExecutor,
|
||||
type WebSocketConnection,
|
||||
type WebSocketExecutor,
|
||||
type WebSocketRecordReplayOptions,
|
||||
type WebSocketRequest,
|
||||
} from "./websocket"
|
||||
|
||||
export * as Cassette from "./cassette"
|
||||
export * as Redactor from "./redactor"
|
||||
|
||||
export * as HttpRecorder from "."
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { Option, Schema } from "effect"
|
||||
import type { RequestSnapshot } from "./schema"
|
||||
import { REDACTED, secretFindings } from "./redaction"
|
||||
import type { HttpInteraction, RequestSnapshot } from "./schema"
|
||||
|
||||
const JsonValue = Schema.fromJsonString(Schema.Unknown)
|
||||
export const decodeJson = Schema.decodeUnknownOption(JsonValue)
|
||||
@@ -34,3 +35,90 @@ export const canonicalSnapshot = (snapshot: RequestSnapshot): string =>
|
||||
|
||||
export const defaultMatcher: RequestMatcher = (incoming, recorded) =>
|
||||
canonicalSnapshot(incoming) === canonicalSnapshot(recorded)
|
||||
|
||||
const safeText = (value: unknown) => {
|
||||
if (value === undefined) return "undefined"
|
||||
if (secretFindings(value).length > 0) return JSON.stringify(REDACTED)
|
||||
const text = JSON.stringify(value)
|
||||
if (!text) return String(value)
|
||||
return text.length > 300 ? `${text.slice(0, 300)}...` : text
|
||||
}
|
||||
|
||||
const jsonBody = (body: string) => Option.getOrUndefined(decodeJson(body))
|
||||
|
||||
const valueDiffs = (expected: unknown, received: unknown, base = "$", limit = 8): ReadonlyArray<string> => {
|
||||
if (Object.is(expected, received)) return []
|
||||
if (isRecord(expected) && isRecord(received)) {
|
||||
return [...new Set([...Object.keys(expected), ...Object.keys(received)])]
|
||||
.toSorted()
|
||||
.flatMap((key) => valueDiffs(expected[key], received[key], `${base}.${key}`, limit))
|
||||
.slice(0, limit)
|
||||
}
|
||||
if (Array.isArray(expected) && Array.isArray(received)) {
|
||||
return Array.from({ length: Math.max(expected.length, received.length) }, (_, index) => index)
|
||||
.flatMap((index) => valueDiffs(expected[index], received[index], `${base}[${index}]`, limit))
|
||||
.slice(0, limit)
|
||||
}
|
||||
return [`${base} expected ${safeText(expected)}, received ${safeText(received)}`]
|
||||
}
|
||||
|
||||
const headerDiffs = (expected: Record<string, string>, received: Record<string, string>) =>
|
||||
[...new Set([...Object.keys(expected), ...Object.keys(received)])].toSorted().flatMap((key) => {
|
||||
if (expected[key] === received[key]) return []
|
||||
if (expected[key] === undefined) return [` ${key} unexpected ${safeText(received[key])}`]
|
||||
if (received[key] === undefined) return [` ${key} missing expected ${safeText(expected[key])}`]
|
||||
return [` ${key} expected ${safeText(expected[key])}, received ${safeText(received[key])}`]
|
||||
})
|
||||
|
||||
export const requestDiff = (expected: RequestSnapshot, received: RequestSnapshot): ReadonlyArray<string> => {
|
||||
const lines: string[] = []
|
||||
if (expected.method !== received.method) {
|
||||
lines.push("method:", ` expected ${expected.method}, received ${received.method}`)
|
||||
}
|
||||
if (expected.url !== received.url) {
|
||||
lines.push("url:", ` expected ${expected.url}`, ` received ${received.url}`)
|
||||
}
|
||||
const headers = headerDiffs(expected.headers, received.headers)
|
||||
if (headers.length > 0) lines.push("headers:", ...headers.slice(0, 8))
|
||||
const expectedBody = jsonBody(expected.body)
|
||||
const receivedBody = jsonBody(received.body)
|
||||
const body =
|
||||
expectedBody !== undefined && receivedBody !== undefined
|
||||
? valueDiffs(expectedBody, receivedBody).map((line) => ` ${line}`)
|
||||
: expected.body === received.body
|
||||
? []
|
||||
: [` expected ${safeText(expected.body)}, received ${safeText(received.body)}`]
|
||||
if (body.length > 0) lines.push("body:", ...body)
|
||||
return lines
|
||||
}
|
||||
|
||||
export const mismatchDetail = (interactions: ReadonlyArray<HttpInteraction>, incoming: RequestSnapshot): string => {
|
||||
if (interactions.length === 0) return "cassette has no recorded HTTP interactions"
|
||||
const ranked = interactions
|
||||
.map((interaction, index) => ({ index, lines: requestDiff(interaction.request, incoming) }))
|
||||
.toSorted((a, b) => a.lines.length - b.lines.length || a.index - b.index)
|
||||
const best = ranked[0]
|
||||
return ["no recorded interaction matched", `closest interaction: #${best.index + 1}`, ...best.lines].join("\n")
|
||||
}
|
||||
|
||||
export const selectMatch = (
|
||||
interactions: ReadonlyArray<HttpInteraction>,
|
||||
incoming: RequestSnapshot,
|
||||
match: RequestMatcher,
|
||||
): { readonly interaction: HttpInteraction | undefined; readonly detail: string } => {
|
||||
const interaction = interactions.find((candidate) => match(incoming, candidate.request))
|
||||
return { interaction, detail: interaction ? "" : mismatchDetail(interactions, incoming) }
|
||||
}
|
||||
|
||||
export const selectSequential = (
|
||||
interactions: ReadonlyArray<HttpInteraction>,
|
||||
incoming: RequestSnapshot,
|
||||
match: RequestMatcher,
|
||||
index: number,
|
||||
): { readonly interaction: HttpInteraction | undefined; readonly detail: string } => {
|
||||
const interaction = interactions[index]
|
||||
if (!interaction) return { interaction, detail: `interaction ${index + 1} of ${interactions.length} not recorded` }
|
||||
if (!match(incoming, interaction.request))
|
||||
return { interaction: undefined, detail: requestDiff(interaction.request, incoming).join("\n") }
|
||||
return { interaction, detail: "" }
|
||||
}
|
||||
|
||||
59
packages/http-recorder/src/recorder.ts
Normal file
59
packages/http-recorder/src/recorder.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import { Effect, PlatformError, Ref, Scope } from "effect"
|
||||
import type * as CassetteService from "./cassette"
|
||||
import type { SecretFinding } from "./redaction"
|
||||
import type { Cassette, CassetteMetadata, Interaction } from "./schema"
|
||||
|
||||
export class UnsafeCassetteError extends Error {
|
||||
readonly _tag = "UnsafeCassetteError"
|
||||
constructor(
|
||||
readonly cassetteName: string,
|
||||
readonly findings: ReadonlyArray<SecretFinding>,
|
||||
) {
|
||||
super(
|
||||
`Refusing to write cassette "${cassetteName}" because it contains possible secrets: ${findings
|
||||
.map((finding) => `${finding.path} (${finding.reason})`)
|
||||
.join(", ")}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
export const appendOrFail = (
|
||||
cassette: CassetteService.Interface,
|
||||
name: string,
|
||||
interaction: Interaction,
|
||||
metadata: CassetteMetadata | undefined,
|
||||
): Effect.Effect<Cassette, UnsafeCassetteError> =>
|
||||
cassette.append(name, interaction, metadata).pipe(
|
||||
Effect.orDie,
|
||||
Effect.flatMap(({ cassette: result, findings }) =>
|
||||
findings.length === 0 ? Effect.succeed(result) : Effect.fail(new UnsafeCassetteError(name, findings)),
|
||||
),
|
||||
)
|
||||
|
||||
export interface ReplayState<T> {
|
||||
readonly load: Effect.Effect<ReadonlyArray<T>, PlatformError.PlatformError>
|
||||
readonly cursor: Effect.Effect<number>
|
||||
readonly advance: Effect.Effect<void>
|
||||
}
|
||||
|
||||
export const makeReplayState = <T>(
|
||||
cassette: CassetteService.Interface,
|
||||
name: string,
|
||||
project: (cassette: Cassette) => ReadonlyArray<T>,
|
||||
): Effect.Effect<ReplayState<T>, never, Scope.Scope> =>
|
||||
Effect.gen(function* () {
|
||||
const load = yield* Effect.cached(cassette.read(name).pipe(Effect.map(project)))
|
||||
const position = yield* Ref.make(0)
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.gen(function* () {
|
||||
const used = yield* Ref.get(position)
|
||||
if (used === 0) return
|
||||
const interactions = yield* load.pipe(Effect.orDie)
|
||||
if (used < interactions.length)
|
||||
yield* Effect.die(new Error(`Unused recorded interactions in ${name}: used ${used} of ${interactions.length}`))
|
||||
}),
|
||||
)
|
||||
|
||||
return { load, cursor: Ref.get(position), advance: Ref.update(position, (n) => n + 1) }
|
||||
})
|
||||
76
packages/http-recorder/src/redactor.ts
Normal file
76
packages/http-recorder/src/redactor.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import { Option } from "effect"
|
||||
import { decodeJson } from "./matching"
|
||||
import { redactHeaders, redactUrl } from "./redaction"
|
||||
import type { RequestSnapshot, ResponseSnapshot } from "./schema"
|
||||
|
||||
export const DEFAULT_REQUEST_HEADERS: ReadonlyArray<string> = ["content-type", "accept", "openai-beta"]
|
||||
export const DEFAULT_RESPONSE_HEADERS: ReadonlyArray<string> = ["content-type"]
|
||||
|
||||
const identity = <T>(value: T) => value
|
||||
|
||||
export interface Redactor {
|
||||
readonly request: (snapshot: RequestSnapshot) => RequestSnapshot
|
||||
readonly response: (snapshot: ResponseSnapshot) => ResponseSnapshot
|
||||
}
|
||||
|
||||
export const compose = (...redactors: ReadonlyArray<Partial<Redactor>>): Redactor => {
|
||||
const requests = redactors.map((r) => r.request).filter((fn): fn is Redactor["request"] => fn !== undefined)
|
||||
const responses = redactors.map((r) => r.response).filter((fn): fn is Redactor["response"] => fn !== undefined)
|
||||
return {
|
||||
request: requests.length === 0 ? identity : (snapshot) => requests.reduce((acc, fn) => fn(acc), snapshot),
|
||||
response: responses.length === 0 ? identity : (snapshot) => responses.reduce((acc, fn) => fn(acc), snapshot),
|
||||
}
|
||||
}
|
||||
|
||||
export interface HeaderOptions {
|
||||
readonly allow?: ReadonlyArray<string>
|
||||
readonly redact?: ReadonlyArray<string>
|
||||
}
|
||||
|
||||
export const requestHeaders = (options: HeaderOptions = {}): Partial<Redactor> => ({
|
||||
request: (snapshot) => ({
|
||||
...snapshot,
|
||||
headers: redactHeaders(snapshot.headers, options.allow ?? DEFAULT_REQUEST_HEADERS, options.redact),
|
||||
}),
|
||||
})
|
||||
|
||||
export const responseHeaders = (options: HeaderOptions = {}): Partial<Redactor> => ({
|
||||
response: (snapshot) => ({
|
||||
...snapshot,
|
||||
headers: redactHeaders(snapshot.headers, options.allow ?? DEFAULT_RESPONSE_HEADERS, options.redact),
|
||||
}),
|
||||
})
|
||||
|
||||
export interface UrlOptions {
|
||||
readonly query?: ReadonlyArray<string>
|
||||
readonly transform?: (url: string) => string
|
||||
}
|
||||
|
||||
export const url = (options: UrlOptions = {}): Partial<Redactor> => ({
|
||||
request: (snapshot) => ({ ...snapshot, url: redactUrl(snapshot.url, options.query, options.transform) }),
|
||||
})
|
||||
|
||||
export const body = (transform: (parsed: unknown) => unknown): Partial<Redactor> => ({
|
||||
request: (snapshot) => ({
|
||||
...snapshot,
|
||||
body: Option.match(decodeJson(snapshot.body), {
|
||||
onNone: () => snapshot.body,
|
||||
onSome: (parsed) => JSON.stringify(transform(parsed)),
|
||||
}),
|
||||
}),
|
||||
})
|
||||
|
||||
export interface DefaultRedactorOverrides {
|
||||
readonly requestHeaders?: HeaderOptions
|
||||
readonly responseHeaders?: HeaderOptions
|
||||
readonly url?: UrlOptions
|
||||
readonly body?: (parsed: unknown) => unknown
|
||||
}
|
||||
|
||||
export const defaults = (overrides: DefaultRedactorOverrides = {}): Redactor =>
|
||||
compose(
|
||||
requestHeaders(overrides.requestHeaders),
|
||||
responseHeaders(overrides.responseHeaders),
|
||||
url(overrides.url),
|
||||
...(overrides.body ? [body(overrides.body)] : []),
|
||||
)
|
||||
@@ -7,19 +7,13 @@ export const DEFAULT_RECORDINGS_DIR = path.resolve(process.cwd(), "test", "fixtu
|
||||
|
||||
export const cassettePath = (name: string, directory = DEFAULT_RECORDINGS_DIR) => path.join(directory, `${name}.json`)
|
||||
|
||||
export const metadataFor = (name: string, metadata: CassetteMetadata | undefined): CassetteMetadata => ({
|
||||
name,
|
||||
recordedAt: new Date().toISOString(),
|
||||
...(metadata ?? {}),
|
||||
})
|
||||
|
||||
export const cassetteFor = (
|
||||
name: string,
|
||||
interactions: ReadonlyArray<Interaction>,
|
||||
metadata: CassetteMetadata | undefined,
|
||||
): Cassette => ({
|
||||
version: 1,
|
||||
metadata: metadataFor(name, metadata),
|
||||
metadata: { name, recordedAt: new Date().toISOString(), ...(metadata ?? {}) },
|
||||
interactions,
|
||||
})
|
||||
|
||||
|
||||
@@ -2,10 +2,9 @@ import { Effect, Option, Ref, Scope, Stream } from "effect"
|
||||
import type { Headers } from "effect/unstable/http"
|
||||
import * as CassetteService from "./cassette"
|
||||
import { canonicalizeJson, decodeJson } from "./matching"
|
||||
import { redactHeaders, redactUrl, type SecretFinding } from "./redaction"
|
||||
import { webSocketInteractions, type CassetteMetadata, type WebSocketFrame, type WebSocketInteraction } from "./schema"
|
||||
|
||||
export const DEFAULT_WEBSOCKET_REQUEST_HEADERS: ReadonlyArray<string> = ["content-type", "accept", "openai-beta"]
|
||||
import { appendOrFail, makeReplayState } from "./recorder"
|
||||
import { defaults, type Redactor } from "./redactor"
|
||||
import { webSocketInteractions, type CassetteMetadata, type WebSocketFrame } from "./schema"
|
||||
|
||||
export interface WebSocketRequest {
|
||||
readonly url: string
|
||||
@@ -28,63 +27,32 @@ export interface WebSocketRecordReplayOptions<E> {
|
||||
readonly metadata?: CassetteMetadata
|
||||
readonly cassette: CassetteService.Interface
|
||||
readonly live: WebSocketExecutor<E>
|
||||
readonly redact?: {
|
||||
readonly headers?: ReadonlyArray<string>
|
||||
readonly query?: ReadonlyArray<string>
|
||||
readonly url?: (url: string) => string
|
||||
}
|
||||
readonly requestHeaders?: ReadonlyArray<string>
|
||||
readonly redactor?: Redactor
|
||||
readonly compareClientMessagesAsJson?: boolean
|
||||
}
|
||||
|
||||
const headersRecord = (headers: Headers.Headers) =>
|
||||
const headersRecord = (headers: Headers.Headers): Record<string, string> =>
|
||||
Object.fromEntries(
|
||||
Object.entries(headers as Record<string, unknown>)
|
||||
.filter((entry): entry is [string, string] => typeof entry[1] === "string")
|
||||
.toSorted(([a], [b]) => a.localeCompare(b)),
|
||||
Object.entries(headers as Record<string, unknown>).filter(
|
||||
(entry): entry is [string, string] => typeof entry[1] === "string",
|
||||
),
|
||||
)
|
||||
|
||||
const openSnapshot = (
|
||||
request: WebSocketRequest,
|
||||
options: Pick<WebSocketRecordReplayOptions<never>, "redact" | "requestHeaders"> = {},
|
||||
) => ({
|
||||
url: redactUrl(request.url, options.redact?.query, options.redact?.url),
|
||||
headers: redactHeaders(
|
||||
headersRecord(request.headers),
|
||||
options.requestHeaders ?? DEFAULT_WEBSOCKET_REQUEST_HEADERS,
|
||||
options.redact?.headers,
|
||||
),
|
||||
})
|
||||
|
||||
const textFrame = (body: string): WebSocketFrame => ({ kind: "text", body })
|
||||
|
||||
const frameText = (frame: WebSocketFrame) => {
|
||||
if (frame.kind === "text") return frame.body
|
||||
return new TextDecoder().decode(Buffer.from(frame.body, "base64"))
|
||||
}
|
||||
|
||||
const frameMessage = (frame: WebSocketFrame) =>
|
||||
frame.kind === "text" ? frame.body : new Uint8Array(Buffer.from(frame.body, "base64"))
|
||||
|
||||
const receivedFrame = (message: string | Uint8Array): WebSocketFrame =>
|
||||
const encodeFrame = (message: string | Uint8Array): WebSocketFrame =>
|
||||
typeof message === "string"
|
||||
? textFrame(message)
|
||||
? { kind: "text", body: message }
|
||||
: { kind: "binary", body: Buffer.from(message).toString("base64"), bodyEncoding: "base64" }
|
||||
|
||||
const unsafeCassette = (name: string, findings: ReadonlyArray<SecretFinding>) =>
|
||||
new Error(
|
||||
`Refusing to write WebSocket cassette "${name}" because it contains possible secrets: ${findings
|
||||
.map((item) => `${item.path} (${item.reason})`)
|
||||
.join(", ")}`,
|
||||
)
|
||||
const decodeFrameMessage = (frame: WebSocketFrame): string | Uint8Array =>
|
||||
frame.kind === "text" ? frame.body : new Uint8Array(Buffer.from(frame.body, "base64"))
|
||||
|
||||
const mismatch = (message: string, actual: unknown, expected: unknown) =>
|
||||
new Error(`${message}: expected ${JSON.stringify(expected)}, received ${JSON.stringify(actual)}`)
|
||||
const decodeFrameText = (frame: WebSocketFrame) =>
|
||||
frame.kind === "text" ? frame.body : new TextDecoder().decode(Buffer.from(frame.body, "base64"))
|
||||
|
||||
const assertEqual = (message: string, actual: unknown, expected: unknown) =>
|
||||
Effect.sync(() => {
|
||||
if (JSON.stringify(actual) === JSON.stringify(expected)) return
|
||||
throw mismatch(message, actual, expected)
|
||||
throw new Error(`${message}: expected ${JSON.stringify(expected)}, received ${JSON.stringify(actual)}`)
|
||||
})
|
||||
|
||||
const jsonOrText = (value: string) => Option.match(decodeJson(value), { onNone: () => value, onSome: canonicalizeJson })
|
||||
@@ -94,7 +62,7 @@ const compareClientMessage = (actual: string, expected: WebSocketFrame | undefin
|
||||
return Effect.sync(() => {
|
||||
throw new Error(`Unexpected WebSocket client frame ${index + 1}: ${actual}`)
|
||||
})
|
||||
const expectedText = frameText(expected)
|
||||
const expectedText = decodeFrameText(expected)
|
||||
if (!asJson) return assertEqual(`WebSocket client frame ${index + 1}`, actual, expectedText)
|
||||
return assertEqual(`WebSocket client JSON frame ${index + 1}`, jsonOrText(actual), jsonOrText(expectedText))
|
||||
}
|
||||
@@ -104,6 +72,16 @@ export const makeWebSocketExecutor = <E>(
|
||||
): Effect.Effect<WebSocketExecutor<E>, never, Scope.Scope> =>
|
||||
Effect.gen(function* () {
|
||||
const mode = options.mode ?? "replay"
|
||||
const redactor = options.redactor ?? defaults()
|
||||
const openSnapshot = (request: WebSocketRequest) => {
|
||||
const redacted = redactor.request({
|
||||
method: "GET",
|
||||
url: request.url,
|
||||
headers: headersRecord(request.headers),
|
||||
body: "",
|
||||
})
|
||||
return { url: redacted.url, headers: redacted.headers }
|
||||
}
|
||||
|
||||
if (mode === "passthrough") return options.live
|
||||
|
||||
@@ -118,21 +96,21 @@ export const makeWebSocketExecutor = <E>(
|
||||
const closeOnce = Effect.gen(function* () {
|
||||
if (yield* Ref.getAndSet(closed, true)) return
|
||||
yield* connection.close
|
||||
const result = yield* options.cassette
|
||||
.append(
|
||||
options.name,
|
||||
{ transport: "websocket", open: openSnapshot(request, options), client, server },
|
||||
options.metadata,
|
||||
)
|
||||
.pipe(Effect.orDie)
|
||||
if (result.findings.length > 0) yield* Effect.die(unsafeCassette(options.name, result.findings))
|
||||
yield* appendOrFail(
|
||||
options.cassette,
|
||||
options.name,
|
||||
{ transport: "websocket", open: openSnapshot(request), client, server },
|
||||
options.metadata,
|
||||
).pipe(Effect.orDie)
|
||||
})
|
||||
return {
|
||||
sendText: (message: string) =>
|
||||
connection.sendText(message).pipe(Effect.tap(() => Effect.sync(() => client.push(textFrame(message))))),
|
||||
sendText: (message) =>
|
||||
connection
|
||||
.sendText(message)
|
||||
.pipe(Effect.tap(() => Effect.sync(() => client.push(encodeFrame(message))))),
|
||||
messages: connection.messages.pipe(
|
||||
Stream.map((message) => {
|
||||
server.push(receivedFrame(message))
|
||||
server.push(encodeFrame(message))
|
||||
return message
|
||||
}),
|
||||
),
|
||||
@@ -142,44 +120,20 @@ export const makeWebSocketExecutor = <E>(
|
||||
}
|
||||
}
|
||||
|
||||
const replay = yield* Ref.make<{ readonly interactions: ReadonlyArray<WebSocketInteraction> } | undefined>(
|
||||
undefined,
|
||||
)
|
||||
const cursor = yield* Ref.make(0)
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.gen(function* () {
|
||||
const input = yield* Ref.get(replay)
|
||||
if (!input) return
|
||||
yield* assertEqual(
|
||||
`Unused recorded WebSocket interactions in ${options.name}`,
|
||||
yield* Ref.get(cursor),
|
||||
input.interactions.length,
|
||||
)
|
||||
}),
|
||||
)
|
||||
|
||||
const loadReplay = Effect.fn("WebSocketRecorder.loadReplay")(function* () {
|
||||
const cached = yield* Ref.get(replay)
|
||||
if (cached) return cached
|
||||
const input = {
|
||||
interactions: webSocketInteractions(yield* options.cassette.read(options.name).pipe(Effect.orDie)),
|
||||
}
|
||||
yield* Ref.set(replay, input)
|
||||
return input
|
||||
})
|
||||
const replay = yield* makeReplayState(options.cassette, options.name, webSocketInteractions)
|
||||
|
||||
return {
|
||||
open: (request) => {
|
||||
return Effect.gen(function* () {
|
||||
const input = yield* loadReplay()
|
||||
const index = yield* Ref.getAndUpdate(cursor, (value) => value + 1)
|
||||
const interaction = input.interactions[index]
|
||||
open: (request) =>
|
||||
Effect.gen(function* () {
|
||||
const interactions = yield* replay.load.pipe(Effect.orDie)
|
||||
const index = yield* replay.cursor
|
||||
const interaction = interactions[index]
|
||||
if (!interaction) return yield* Effect.die(new Error(`No recorded WebSocket interaction for ${request.url}`))
|
||||
yield* assertEqual(`WebSocket open frame ${index + 1}`, openSnapshot(request, options), interaction.open)
|
||||
yield* replay.advance
|
||||
yield* assertEqual(`WebSocket open frame ${index + 1}`, openSnapshot(request), interaction.open)
|
||||
const messageIndex = yield* Ref.make(0)
|
||||
return {
|
||||
sendText: (message: string) =>
|
||||
sendText: (message) =>
|
||||
Effect.gen(function* () {
|
||||
const current = yield* Ref.getAndUpdate(messageIndex, (value) => value + 1)
|
||||
yield* compareClientMessage(
|
||||
@@ -189,7 +143,7 @@ export const makeWebSocketExecutor = <E>(
|
||||
options.compareClientMessagesAsJson === true,
|
||||
)
|
||||
}),
|
||||
messages: Stream.fromIterable(interaction.server).pipe(Stream.map(frameMessage)),
|
||||
messages: Stream.fromIterable(interaction.server).pipe(Stream.map(decodeFrameMessage)),
|
||||
close: Effect.gen(function* () {
|
||||
yield* assertEqual(
|
||||
`WebSocket client frame count for interaction ${index + 1}`,
|
||||
@@ -198,7 +152,6 @@ export const makeWebSocketExecutor = <E>(
|
||||
)
|
||||
}),
|
||||
}
|
||||
})
|
||||
},
|
||||
}),
|
||||
}
|
||||
})
|
||||
|
||||
@@ -6,7 +6,8 @@ import * as fs from "node:fs"
|
||||
import * as os from "node:os"
|
||||
import * as path from "node:path"
|
||||
import { HttpRecorder } from "../src"
|
||||
import { redactedErrorRequest } from "../src/diff"
|
||||
import { redactedErrorRequest } from "../src/effect"
|
||||
import { cassetteFor, formatCassette, parseCassette } from "../src/storage"
|
||||
|
||||
const post = (url: string, body: object) =>
|
||||
Effect.gen(function* () {
|
||||
@@ -145,7 +146,7 @@ describe("http-recorder", () => {
|
||||
})
|
||||
|
||||
test("formats websocket cassettes with shared metadata", () => {
|
||||
const cassette = HttpRecorder.cassetteFor(
|
||||
const cassette = cassetteFor(
|
||||
"websocket/basic",
|
||||
[
|
||||
{
|
||||
@@ -159,7 +160,7 @@ describe("http-recorder", () => {
|
||||
)
|
||||
|
||||
expect(cassette.metadata).toMatchObject({ name: "websocket/basic", provider: "openai" })
|
||||
expect(HttpRecorder.parseCassette(HttpRecorder.formatCassette(cassette))).toEqual(cassette)
|
||||
expect(parseCassette(formatCassette(cassette))).toEqual(cassette)
|
||||
})
|
||||
|
||||
test("replays websocket interactions from the shared cassette service", async () => {
|
||||
@@ -168,7 +169,7 @@ describe("http-recorder", () => {
|
||||
const cassette = yield* HttpRecorder.Cassette.Service
|
||||
yield* cassette.write(
|
||||
"websocket/replay",
|
||||
HttpRecorder.cassetteFor(
|
||||
cassetteFor(
|
||||
"websocket/replay",
|
||||
[
|
||||
{
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { Redactor } from "@opencode-ai/http-recorder"
|
||||
import { describe, expect } from "bun:test"
|
||||
import { Effect } from "effect"
|
||||
import { LLM, LLMError } from "../../src"
|
||||
@@ -30,7 +31,7 @@ const recorded = recordedTests({
|
||||
provider: "anthropic",
|
||||
protocol: "anthropic-messages",
|
||||
requires: ["ANTHROPIC_API_KEY"],
|
||||
options: { requestHeaders: ["content-type", "anthropic-version"] },
|
||||
options: { redactor: Redactor.defaults({ requestHeaders: { allow: ["content-type", "anthropic-version"] } }) },
|
||||
})
|
||||
|
||||
describe("Anthropic Messages sad-path recorded", () => {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { Redactor } from "@opencode-ai/http-recorder"
|
||||
import * as AnthropicMessages from "../../src/protocols/anthropic-messages"
|
||||
import * as Gemini from "../../src/protocols/gemini"
|
||||
import * as OpenAIChat from "../../src/protocols/openai-chat"
|
||||
@@ -66,7 +67,7 @@ const redactCloudflareURL = (url: string) =>
|
||||
.replace(/\/v1\/[^/]+\/[^/]+\/compat\//, "/v1/{account}/{gateway}/compat/")
|
||||
|
||||
const cloudflareOptions = {
|
||||
redact: { url: redactCloudflareURL },
|
||||
redactor: Redactor.defaults({ url: { transform: redactCloudflareURL } }),
|
||||
}
|
||||
|
||||
describeRecordedGoldenScenarios([
|
||||
@@ -102,7 +103,7 @@ describeRecordedGoldenScenarios([
|
||||
prefix: "anthropic-messages",
|
||||
model: anthropicHaiku,
|
||||
requires: ["ANTHROPIC_API_KEY"],
|
||||
options: { requestHeaders: ["content-type", "anthropic-version"] },
|
||||
options: { redactor: Redactor.defaults({ requestHeaders: { allow: ["content-type", "anthropic-version"] } }) },
|
||||
scenarios: ["text", "tool-call"],
|
||||
},
|
||||
{
|
||||
@@ -111,7 +112,7 @@ describeRecordedGoldenScenarios([
|
||||
model: anthropicOpus,
|
||||
requires: ["ANTHROPIC_API_KEY"],
|
||||
tags: ["flagship"],
|
||||
options: { requestHeaders: ["content-type", "anthropic-version"] },
|
||||
options: { redactor: Redactor.defaults({ requestHeaders: { allow: ["content-type", "anthropic-version"] } }) },
|
||||
scenarios: [{ id: "tool-loop", temperature: false }],
|
||||
},
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user