mirror of
https://github.com/browseros-ai/BrowserOS.git
synced 2026-05-13 15:46:22 +00:00
feat(server): cache klavis createStrata to unblock /chat hot path (#654)
* feat(server): cache klavis createStrata to unblock /chat hot path
Conversation creation in /chat was blocking on a Worker-proxied
klavisClient.createStrata round-trip every time the user had any
managed Klavis app connected. The 5s KLAVIS_TIMEOUT_MS in the
ai-worker proxy existed specifically to bound this latency, but
the same cap also caused user-visible 504s on /klavis/servers/remove
since Strata DELETE operations routinely take >5s. Without caching
we couldn't raise the timeout without regressing chat creation.
This adds an in-process cache for Strata createStrata responses,
keyed by (browserosId, hashed sorted-server-set) and gated by a 1h
TTL. The cache stores only immutable JSON metadata (strataServerUrl,
strataId, addedServers); per-session MCP clients continue to be
opened and disposed by AiSdkAgent exactly as before, which keeps
the cache concurrency-safe by construction.
Cache invalidation has two layers: (a) the cache key embeds the
server set, so adding/removing apps naturally produces a different
key; (b) POST /klavis/servers/add and DELETE /klavis/servers/remove
explicitly call invalidate(browserosId) after their underlying
Klavis API call succeeds, as defense-in-depth.
Other changes:
- Consolidates klavis-related services into a new
apps/server/src/api/services/klavis/ directory; moves
register-klavis-mcp.ts -> strata-proxy.ts and adds strata-cache.ts
there. lib/clients/klavis/ stays unchanged.
- Refactors KlavisClient.removeServer into a low-level
deleteServersFromStrata(strataId, servers) primitive. The
cache-lookup + delete + invalidate orchestration moves up into
routes/klavis.ts where it belongs, eliminating the lib->api
layering inversion the original removeServer would have introduced.
- Uses Bun.hash (xxhash64) for fixed-width 16-hex-char keys, with
serverKey verified on read to make collision risk strictly zero.
- Dedupes concurrent fetches via in-flight Promise sharing, with
identity-checks before delete to avoid races between invalidate()
and a racing replacement insert.
Follow-up (separate PR): bump KLAVIS_TIMEOUT_MS to 30000 in
ai-worker/wrangler.toml so /klavis/servers/remove stops 504-ing.
* fix: address greptile review comments for klavis strata cache
- Drop dead `invalidated` field on InflightEntry. It was added to
support a "discard post-resolution if invalidated" check that I
later replaced with identity-checked deletes during self-review,
but I forgot to remove the field and the misleading comment
referencing it. Simplify Map<string, InflightEntry> to plain
Map<string, Promise<CacheEntry>>.
- Lower cache miss log from info to debug. Misses fire on every new
conversation; matching the existing debug-level for hits.
- Stop routing the /klavis/servers/remove handler through
klavisStrataCache.getOrFetch. The chat hot path keys its cache by
the user's full enabled-server set (e.g. hash('Gmail,Linear')),
so a single-server lookup here (hash('Gmail')) is guaranteed to
miss, write a spurious entry, and then have it immediately
cleared by invalidate() on the next line. Call createStrata
directly to recover the strataId, mirroring the original
removeServer flow.
This commit is contained in:
@@ -2,6 +2,7 @@ import { createMCPClient } from '@ai-sdk/mcp'
|
||||
import { TIMEOUTS } from '@browseros/shared/constants/timeouts'
|
||||
import type { BrowserContext } from '@browseros/shared/schemas/browser-context'
|
||||
import type { ToolSet } from 'ai'
|
||||
import { klavisStrataCache } from '../api/services/klavis/strata-cache'
|
||||
import type { KlavisClient } from '../lib/clients/klavis/klavis-client'
|
||||
import { logger } from '../lib/logger'
|
||||
import {
|
||||
@@ -40,7 +41,8 @@ export async function buildMcpServerSpecs(
|
||||
deps.browserContext?.enabledMcpServers?.length
|
||||
) {
|
||||
try {
|
||||
const result = await deps.klavisClient.createStrata(
|
||||
const result = await klavisStrataCache.getOrFetch(
|
||||
deps.klavisClient,
|
||||
deps.browserosId,
|
||||
deps.browserContext.enabledMcpServers,
|
||||
)
|
||||
|
||||
@@ -10,6 +10,7 @@ import { z } from 'zod'
|
||||
import { KlavisClient } from '../../lib/clients/klavis/klavis-client'
|
||||
import { OAUTH_MCP_SERVERS } from '../../lib/clients/klavis/oauth-mcp-servers'
|
||||
import { logger } from '../../lib/logger'
|
||||
import { klavisStrataCache } from '../services/klavis/strata-cache'
|
||||
|
||||
const ServerNameSchema = z.object({
|
||||
serverName: z.string().min(1),
|
||||
@@ -125,6 +126,7 @@ export function createKlavisRoutes(deps: KlavisRouteDeps) {
|
||||
logger.info('Adding server to strata', { serverName })
|
||||
|
||||
const result = await klavisClient.createStrata(browserosId, [serverName])
|
||||
klavisStrataCache.invalidate(browserosId)
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
@@ -184,7 +186,17 @@ export function createKlavisRoutes(deps: KlavisRouteDeps) {
|
||||
|
||||
logger.info('Removing server from strata', { serverName })
|
||||
|
||||
await klavisClient.removeServer(browserosId, serverName)
|
||||
// The chat hot path keys its cache by the user's full enabled set,
|
||||
// so a single-server lookup here would always miss and immediately
|
||||
// be cleared by invalidate() below — call createStrata directly
|
||||
// to recover the strataId, mirroring the original removeServer flow.
|
||||
const strata = await klavisClient.createStrata(browserosId, [
|
||||
serverName,
|
||||
])
|
||||
await klavisClient.deleteServersFromStrata(strata.strataId, [
|
||||
serverName,
|
||||
])
|
||||
klavisStrataCache.invalidate(browserosId)
|
||||
|
||||
return c.json({
|
||||
success: true,
|
||||
|
||||
@@ -11,8 +11,8 @@ import { logger } from '../../lib/logger'
|
||||
import { metrics } from '../../lib/metrics'
|
||||
import { Sentry } from '../../lib/sentry'
|
||||
import type { ToolRegistry } from '../../tools/tool-registry'
|
||||
import type { KlavisProxyHandle } from '../services/klavis/strata-proxy'
|
||||
import { createMcpServer } from '../services/mcp/mcp-server'
|
||||
import type { KlavisProxyHandle } from '../services/mcp/register-klavis-mcp'
|
||||
import type { Env } from '../types'
|
||||
|
||||
interface McpRouteDeps {
|
||||
|
||||
@@ -38,7 +38,7 @@ import { createStatusRoute } from './routes/status'
|
||||
import {
|
||||
connectKlavisProxy,
|
||||
type KlavisProxyHandle,
|
||||
} from './services/mcp/register-klavis-mcp'
|
||||
} from './services/klavis/strata-proxy'
|
||||
import type { Env, HttpServerConfig } from './types'
|
||||
import { defaultCorsConfig } from './utils/cors'
|
||||
|
||||
|
||||
@@ -0,0 +1,145 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2025 BrowserOS
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*
|
||||
* In-process cache for Klavis Strata `createStrata` responses.
|
||||
*
|
||||
* Conversation creation in `/chat` was blocking on a Worker-proxied
|
||||
* `klavisClient.createStrata` round-trip every time the user had any
|
||||
* managed Klavis app connected. This cache stores the (immutable) JSON
|
||||
* metadata returned by `createStrata` so that subsequent chats with the
|
||||
* same `(browserosId, enabled-server-set)` skip the round-trip entirely.
|
||||
*
|
||||
* It does NOT cache live MCP client connections — only URL/id metadata.
|
||||
* Per-session MCP clients continue to be opened and closed by
|
||||
* `AiSdkAgent.create` / `dispose` exactly as before, which makes the
|
||||
* cache safe across concurrent chats by construction.
|
||||
*/
|
||||
|
||||
import type {
|
||||
KlavisClient,
|
||||
StrataCreateResponse,
|
||||
} from '../../../lib/clients/klavis/klavis-client'
|
||||
import { logger } from '../../../lib/logger'
|
||||
|
||||
const DEFAULT_TTL_MS = 60 * 60 * 1000
|
||||
|
||||
interface CacheEntry {
|
||||
strataServerUrl: string
|
||||
strataId: string
|
||||
addedServers: string[]
|
||||
serverKey: string
|
||||
expiresAt: number
|
||||
}
|
||||
|
||||
function normalizeServers(servers: readonly string[]): string {
|
||||
return [...new Set(servers)].sort().join(',')
|
||||
}
|
||||
|
||||
function keyOf(browserosId: string, normalized: string): string {
|
||||
// xxhash64 → 16 hex chars, fixed width. Birthday-bound collision risk
|
||||
// for our scale (<10k entries) is ~5e-15; we additionally verify
|
||||
// serverKey on read so collisions cannot affect correctness.
|
||||
const hash = Bun.hash(normalized).toString(16).padStart(16, '0')
|
||||
return `${browserosId}|${hash}`
|
||||
}
|
||||
|
||||
export class KlavisStrataCache {
|
||||
private entries = new Map<string, Promise<CacheEntry>>()
|
||||
|
||||
constructor(private ttlMs: number = DEFAULT_TTL_MS) {}
|
||||
|
||||
async getOrFetch(
|
||||
client: KlavisClient,
|
||||
browserosId: string,
|
||||
servers: readonly string[],
|
||||
): Promise<StrataCreateResponse> {
|
||||
const normalized = normalizeServers(servers)
|
||||
const key = keyOf(browserosId, normalized)
|
||||
const existing = this.entries.get(key)
|
||||
|
||||
if (existing) {
|
||||
const resolved = await existing.catch(() => null)
|
||||
if (
|
||||
resolved &&
|
||||
resolved.serverKey === normalized &&
|
||||
Date.now() < resolved.expiresAt
|
||||
) {
|
||||
logger.debug('Klavis strata cache hit', { key })
|
||||
return this.toResponse(resolved)
|
||||
}
|
||||
// Stale/collision/rejected — evict, but only if we're the rightful
|
||||
// evictor (a racing caller may have already replaced this slot).
|
||||
if (this.entries.get(key) === existing) {
|
||||
this.entries.delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug('Klavis strata cache miss', {
|
||||
key,
|
||||
serverCount: servers.length,
|
||||
})
|
||||
const inflight = this.fetch(client, browserosId, servers, normalized)
|
||||
this.entries.set(key, inflight)
|
||||
|
||||
try {
|
||||
return this.toResponse(await inflight)
|
||||
} catch (err) {
|
||||
// Identity-check: only drop OUR entry. A racing invalidate() may have
|
||||
// already removed it, or a racing miss may have inserted a new one
|
||||
// that we must not clobber.
|
||||
if (this.entries.get(key) === inflight) {
|
||||
this.entries.delete(key)
|
||||
}
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
invalidate(browserosId: string): void {
|
||||
const prefix = `${browserosId}|`
|
||||
let dropped = 0
|
||||
for (const key of this.entries.keys()) {
|
||||
if (key.startsWith(prefix)) {
|
||||
this.entries.delete(key)
|
||||
dropped++
|
||||
}
|
||||
}
|
||||
if (dropped > 0) {
|
||||
logger.debug('Klavis strata cache invalidated', {
|
||||
browserosId: browserosId.slice(0, 12),
|
||||
dropped,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
clear(): void {
|
||||
this.entries.clear()
|
||||
}
|
||||
|
||||
private async fetch(
|
||||
client: KlavisClient,
|
||||
browserosId: string,
|
||||
servers: readonly string[],
|
||||
normalized: string,
|
||||
): Promise<CacheEntry> {
|
||||
const result = await client.createStrata(browserosId, [...servers])
|
||||
return {
|
||||
strataServerUrl: result.strataServerUrl,
|
||||
strataId: result.strataId,
|
||||
addedServers: result.addedServers,
|
||||
serverKey: normalized,
|
||||
expiresAt: Date.now() + this.ttlMs,
|
||||
}
|
||||
}
|
||||
|
||||
private toResponse(entry: CacheEntry): StrataCreateResponse {
|
||||
return {
|
||||
strataServerUrl: entry.strataServerUrl,
|
||||
strataId: entry.strataId,
|
||||
addedServers: entry.addedServers,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const klavisStrataCache = new KlavisStrataCache()
|
||||
@@ -14,6 +14,7 @@ import type { KlavisClient } from '../../../lib/clients/klavis/klavis-client'
|
||||
import { OAUTH_MCP_SERVERS } from '../../../lib/clients/klavis/oauth-mcp-servers'
|
||||
import { logger } from '../../../lib/logger'
|
||||
import { metrics } from '../../../lib/metrics'
|
||||
import { klavisStrataCache } from './strata-cache'
|
||||
|
||||
function withTimeout<T>(promise: Promise<T>, label: string): Promise<T> {
|
||||
let timerId: ReturnType<typeof setTimeout> | undefined
|
||||
@@ -49,7 +50,8 @@ export async function connectKlavisProxy(
|
||||
// even unauthenticated ones (Klavis handles auth prompts on call)
|
||||
const allServers = OAUTH_MCP_SERVERS.map((s) => s.name)
|
||||
|
||||
const strata = await deps.klavisClient.createStrata(
|
||||
const strata = await klavisStrataCache.getOrFetch(
|
||||
deps.klavisClient,
|
||||
deps.browserosId,
|
||||
allServers,
|
||||
)
|
||||
@@ -8,11 +8,11 @@ import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
|
||||
import { SetLevelRequestSchema } from '@modelcontextprotocol/sdk/types.js'
|
||||
import type { Browser } from '../../../browser/browser'
|
||||
import type { ToolRegistry } from '../../../tools/tool-registry'
|
||||
import { MCP_INSTRUCTIONS } from './mcp-prompt'
|
||||
import {
|
||||
type KlavisProxyHandle,
|
||||
registerKlavisTools,
|
||||
} from './register-klavis-mcp'
|
||||
} from '../klavis/strata-proxy'
|
||||
import { MCP_INSTRUCTIONS } from './mcp-prompt'
|
||||
import { registerTools } from './register-mcp'
|
||||
|
||||
export interface McpServiceDeps {
|
||||
|
||||
@@ -145,16 +145,14 @@ export class KlavisClient {
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a server from a Strata instance
|
||||
* Flow: createStrata(server) to get strataId → DELETE /strata/{strataId}/servers?servers=X
|
||||
*/
|
||||
async removeServer(userId: string, serverName: string): Promise<void> {
|
||||
// createStrata to get strataId (passing same server ensures it exists)
|
||||
const strata = await this.createStrata(userId, [serverName])
|
||||
async deleteServersFromStrata(
|
||||
strataId: string,
|
||||
servers: string[],
|
||||
): Promise<void> {
|
||||
const query = servers.map(encodeURIComponent).join(',')
|
||||
await this.request(
|
||||
'DELETE',
|
||||
`/mcp-server/strata/${strata.strataId}/servers?servers=${encodeURIComponent(serverName)}`,
|
||||
`/mcp-server/strata/${strataId}/servers?servers=${query}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,12 +3,17 @@
|
||||
* Copyright 2025 BrowserOS
|
||||
*/
|
||||
|
||||
import { afterEach, describe, it } from 'bun:test'
|
||||
import { afterEach, beforeEach, describe, it } from 'bun:test'
|
||||
import assert from 'node:assert'
|
||||
import { createKlavisRoutes } from '../../../src/api/routes/klavis'
|
||||
import { klavisStrataCache } from '../../../src/api/services/klavis/strata-cache'
|
||||
|
||||
const originalFetch = globalThis.fetch
|
||||
|
||||
beforeEach(() => {
|
||||
klavisStrataCache.clear()
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
globalThis.fetch = originalFetch
|
||||
})
|
||||
|
||||
@@ -0,0 +1,163 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2025 BrowserOS
|
||||
*/
|
||||
|
||||
import { describe, expect, it } from 'bun:test'
|
||||
import { KlavisStrataCache } from '../../../../src/api/services/klavis/strata-cache'
|
||||
import type {
|
||||
KlavisClient,
|
||||
StrataCreateResponse,
|
||||
} from '../../../../src/lib/clients/klavis/klavis-client'
|
||||
|
||||
class StubKlavisClient {
|
||||
callCount = 0
|
||||
delayMs = 0
|
||||
shouldThrowOnce = false
|
||||
lastServers: string[] | null = null
|
||||
|
||||
async createStrata(
|
||||
userId: string,
|
||||
servers: string[],
|
||||
): Promise<StrataCreateResponse> {
|
||||
this.callCount++
|
||||
this.lastServers = servers
|
||||
if (this.shouldThrowOnce) {
|
||||
this.shouldThrowOnce = false
|
||||
throw new Error('boom')
|
||||
}
|
||||
if (this.delayMs > 0) {
|
||||
await new Promise((r) => setTimeout(r, this.delayMs))
|
||||
}
|
||||
return {
|
||||
strataServerUrl: `https://strata.test/${userId}/${servers.join('-')}`,
|
||||
strataId: `strata_${userId}`,
|
||||
addedServers: servers,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const asClient = (stub: StubKlavisClient): KlavisClient =>
|
||||
stub as unknown as KlavisClient
|
||||
|
||||
describe('KlavisStrataCache', () => {
|
||||
it('cache hit returns the same value without re-calling the client', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
const a = await cache.getOrFetch(asClient(client), 'user1', ['Gmail'])
|
||||
const b = await cache.getOrFetch(asClient(client), 'user1', ['Gmail'])
|
||||
expect(client.callCount).toBe(1)
|
||||
expect(a.strataServerUrl).toBe(b.strataServerUrl)
|
||||
expect(a.strataId).toBe(b.strataId)
|
||||
})
|
||||
|
||||
it('normalizes server order — [Gmail, Linear] === [Linear, Gmail]', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Gmail', 'Linear'])
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Linear', 'Gmail'])
|
||||
expect(client.callCount).toBe(1)
|
||||
})
|
||||
|
||||
it('dedupes duplicate server names within one call', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Gmail', 'Gmail'])
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Gmail'])
|
||||
expect(client.callCount).toBe(1)
|
||||
})
|
||||
|
||||
it('different user gets a separate cache entry', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
await cache.getOrFetch(asClient(client), 'userA', ['Gmail'])
|
||||
await cache.getOrFetch(asClient(client), 'userB', ['Gmail'])
|
||||
expect(client.callCount).toBe(2)
|
||||
})
|
||||
|
||||
it('different server set (same user) gets a separate cache entry', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Gmail'])
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Gmail', 'Linear'])
|
||||
expect(client.callCount).toBe(2)
|
||||
})
|
||||
|
||||
it('concurrent misses share a single in-flight Promise', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
client.delayMs = 30
|
||||
const [a, b, c] = await Promise.all([
|
||||
cache.getOrFetch(asClient(client), 'u', ['Gmail']),
|
||||
cache.getOrFetch(asClient(client), 'u', ['Gmail']),
|
||||
cache.getOrFetch(asClient(client), 'u', ['Gmail']),
|
||||
])
|
||||
expect(client.callCount).toBe(1)
|
||||
expect(a.strataId).toBe(b.strataId)
|
||||
expect(b.strataId).toBe(c.strataId)
|
||||
})
|
||||
|
||||
it('TTL expiry triggers a fresh fetch', async () => {
|
||||
const cache = new KlavisStrataCache(10) // 10 ms TTL
|
||||
const client = new StubKlavisClient()
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Gmail'])
|
||||
await new Promise((r) => setTimeout(r, 25))
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Gmail'])
|
||||
expect(client.callCount).toBe(2)
|
||||
})
|
||||
|
||||
it('invalidate(userA) drops only userA entries', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
await cache.getOrFetch(asClient(client), 'userA', ['Gmail'])
|
||||
await cache.getOrFetch(asClient(client), 'userB', ['Gmail'])
|
||||
cache.invalidate('userA')
|
||||
await cache.getOrFetch(asClient(client), 'userA', ['Gmail'])
|
||||
await cache.getOrFetch(asClient(client), 'userB', ['Gmail'])
|
||||
expect(client.callCount).toBe(3) // userA: cold + cold, userB: cold + hit
|
||||
})
|
||||
|
||||
it('invalidate while a fetch is in flight does not store the result', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
client.delayMs = 30
|
||||
const inflight = cache.getOrFetch(asClient(client), 'u', ['Gmail'])
|
||||
cache.invalidate('u')
|
||||
const result = await inflight
|
||||
expect(result.strataId).toBe('strata_u')
|
||||
// Next call should not see the post-invalidate write — must re-fetch.
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Gmail'])
|
||||
expect(client.callCount).toBe(2)
|
||||
})
|
||||
|
||||
it('rejected fetches do not poison the cache', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
client.shouldThrowOnce = true
|
||||
await expect(
|
||||
cache.getOrFetch(asClient(client), 'u', ['Gmail']),
|
||||
).rejects.toThrow('boom')
|
||||
await cache.getOrFetch(asClient(client), 'u', ['Gmail'])
|
||||
expect(client.callCount).toBe(2)
|
||||
})
|
||||
|
||||
it('clear() drops all entries', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
await cache.getOrFetch(asClient(client), 'userA', ['Gmail'])
|
||||
await cache.getOrFetch(asClient(client), 'userB', ['Linear'])
|
||||
cache.clear()
|
||||
await cache.getOrFetch(asClient(client), 'userA', ['Gmail'])
|
||||
await cache.getOrFetch(asClient(client), 'userB', ['Linear'])
|
||||
expect(client.callCount).toBe(4)
|
||||
})
|
||||
|
||||
it('passes a defensive copy of the servers array to the client', async () => {
|
||||
const cache = new KlavisStrataCache()
|
||||
const client = new StubKlavisClient()
|
||||
const input: readonly string[] = ['Gmail', 'Linear']
|
||||
await cache.getOrFetch(asClient(client), 'u', input)
|
||||
expect(client.lastServers).not.toBe(input)
|
||||
expect(client.lastServers).toEqual(['Gmail', 'Linear'])
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user