diff --git a/packages/browseros-agent/apps/server/src/agent/mcp-builder.ts b/packages/browseros-agent/apps/server/src/agent/mcp-builder.ts index b0f08bae8..963464d3e 100644 --- a/packages/browseros-agent/apps/server/src/agent/mcp-builder.ts +++ b/packages/browseros-agent/apps/server/src/agent/mcp-builder.ts @@ -2,6 +2,7 @@ import { createMCPClient } from '@ai-sdk/mcp' import { TIMEOUTS } from '@browseros/shared/constants/timeouts' import type { BrowserContext } from '@browseros/shared/schemas/browser-context' import type { ToolSet } from 'ai' +import { klavisStrataCache } from '../api/services/klavis/strata-cache' import type { KlavisClient } from '../lib/clients/klavis/klavis-client' import { logger } from '../lib/logger' import { @@ -40,7 +41,8 @@ export async function buildMcpServerSpecs( deps.browserContext?.enabledMcpServers?.length ) { try { - const result = await deps.klavisClient.createStrata( + const result = await klavisStrataCache.getOrFetch( + deps.klavisClient, deps.browserosId, deps.browserContext.enabledMcpServers, ) diff --git a/packages/browseros-agent/apps/server/src/api/routes/klavis.ts b/packages/browseros-agent/apps/server/src/api/routes/klavis.ts index 2c43de610..8d432cc7e 100644 --- a/packages/browseros-agent/apps/server/src/api/routes/klavis.ts +++ b/packages/browseros-agent/apps/server/src/api/routes/klavis.ts @@ -10,6 +10,7 @@ import { z } from 'zod' import { KlavisClient } from '../../lib/clients/klavis/klavis-client' import { OAUTH_MCP_SERVERS } from '../../lib/clients/klavis/oauth-mcp-servers' import { logger } from '../../lib/logger' +import { klavisStrataCache } from '../services/klavis/strata-cache' const ServerNameSchema = z.object({ serverName: z.string().min(1), @@ -125,6 +126,7 @@ export function createKlavisRoutes(deps: KlavisRouteDeps) { logger.info('Adding server to strata', { serverName }) const result = await klavisClient.createStrata(browserosId, [serverName]) + klavisStrataCache.invalidate(browserosId) return c.json({ success: true, @@ -184,7 +186,17 @@ export function createKlavisRoutes(deps: KlavisRouteDeps) { logger.info('Removing server from strata', { serverName }) - await klavisClient.removeServer(browserosId, serverName) + // The chat hot path keys its cache by the user's full enabled set, + // so a single-server lookup here would always miss and immediately + // be cleared by invalidate() below — call createStrata directly + // to recover the strataId, mirroring the original removeServer flow. + const strata = await klavisClient.createStrata(browserosId, [ + serverName, + ]) + await klavisClient.deleteServersFromStrata(strata.strataId, [ + serverName, + ]) + klavisStrataCache.invalidate(browserosId) return c.json({ success: true, diff --git a/packages/browseros-agent/apps/server/src/api/routes/mcp.ts b/packages/browseros-agent/apps/server/src/api/routes/mcp.ts index 45dcc503b..1cca3a182 100644 --- a/packages/browseros-agent/apps/server/src/api/routes/mcp.ts +++ b/packages/browseros-agent/apps/server/src/api/routes/mcp.ts @@ -11,8 +11,8 @@ import { logger } from '../../lib/logger' import { metrics } from '../../lib/metrics' import { Sentry } from '../../lib/sentry' import type { ToolRegistry } from '../../tools/tool-registry' +import type { KlavisProxyHandle } from '../services/klavis/strata-proxy' import { createMcpServer } from '../services/mcp/mcp-server' -import type { KlavisProxyHandle } from '../services/mcp/register-klavis-mcp' import type { Env } from '../types' interface McpRouteDeps { diff --git a/packages/browseros-agent/apps/server/src/api/server.ts b/packages/browseros-agent/apps/server/src/api/server.ts index b4213fbf6..9172e413a 100644 --- a/packages/browseros-agent/apps/server/src/api/server.ts +++ b/packages/browseros-agent/apps/server/src/api/server.ts @@ -38,7 +38,7 @@ import { createStatusRoute } from './routes/status' import { connectKlavisProxy, type KlavisProxyHandle, -} from './services/mcp/register-klavis-mcp' +} from './services/klavis/strata-proxy' import type { Env, HttpServerConfig } from './types' import { defaultCorsConfig } from './utils/cors' diff --git a/packages/browseros-agent/apps/server/src/api/services/klavis/strata-cache.ts b/packages/browseros-agent/apps/server/src/api/services/klavis/strata-cache.ts new file mode 100644 index 000000000..3918d4a82 --- /dev/null +++ b/packages/browseros-agent/apps/server/src/api/services/klavis/strata-cache.ts @@ -0,0 +1,145 @@ +/** + * @license + * Copyright 2025 BrowserOS + * SPDX-License-Identifier: AGPL-3.0-or-later + * + * In-process cache for Klavis Strata `createStrata` responses. + * + * Conversation creation in `/chat` was blocking on a Worker-proxied + * `klavisClient.createStrata` round-trip every time the user had any + * managed Klavis app connected. This cache stores the (immutable) JSON + * metadata returned by `createStrata` so that subsequent chats with the + * same `(browserosId, enabled-server-set)` skip the round-trip entirely. + * + * It does NOT cache live MCP client connections — only URL/id metadata. + * Per-session MCP clients continue to be opened and closed by + * `AiSdkAgent.create` / `dispose` exactly as before, which makes the + * cache safe across concurrent chats by construction. + */ + +import type { + KlavisClient, + StrataCreateResponse, +} from '../../../lib/clients/klavis/klavis-client' +import { logger } from '../../../lib/logger' + +const DEFAULT_TTL_MS = 60 * 60 * 1000 + +interface CacheEntry { + strataServerUrl: string + strataId: string + addedServers: string[] + serverKey: string + expiresAt: number +} + +function normalizeServers(servers: readonly string[]): string { + return [...new Set(servers)].sort().join(',') +} + +function keyOf(browserosId: string, normalized: string): string { + // xxhash64 → 16 hex chars, fixed width. Birthday-bound collision risk + // for our scale (<10k entries) is ~5e-15; we additionally verify + // serverKey on read so collisions cannot affect correctness. + const hash = Bun.hash(normalized).toString(16).padStart(16, '0') + return `${browserosId}|${hash}` +} + +export class KlavisStrataCache { + private entries = new Map>() + + constructor(private ttlMs: number = DEFAULT_TTL_MS) {} + + async getOrFetch( + client: KlavisClient, + browserosId: string, + servers: readonly string[], + ): Promise { + const normalized = normalizeServers(servers) + const key = keyOf(browserosId, normalized) + const existing = this.entries.get(key) + + if (existing) { + const resolved = await existing.catch(() => null) + if ( + resolved && + resolved.serverKey === normalized && + Date.now() < resolved.expiresAt + ) { + logger.debug('Klavis strata cache hit', { key }) + return this.toResponse(resolved) + } + // Stale/collision/rejected — evict, but only if we're the rightful + // evictor (a racing caller may have already replaced this slot). + if (this.entries.get(key) === existing) { + this.entries.delete(key) + } + } + + logger.debug('Klavis strata cache miss', { + key, + serverCount: servers.length, + }) + const inflight = this.fetch(client, browserosId, servers, normalized) + this.entries.set(key, inflight) + + try { + return this.toResponse(await inflight) + } catch (err) { + // Identity-check: only drop OUR entry. A racing invalidate() may have + // already removed it, or a racing miss may have inserted a new one + // that we must not clobber. + if (this.entries.get(key) === inflight) { + this.entries.delete(key) + } + throw err + } + } + + invalidate(browserosId: string): void { + const prefix = `${browserosId}|` + let dropped = 0 + for (const key of this.entries.keys()) { + if (key.startsWith(prefix)) { + this.entries.delete(key) + dropped++ + } + } + if (dropped > 0) { + logger.debug('Klavis strata cache invalidated', { + browserosId: browserosId.slice(0, 12), + dropped, + }) + } + } + + clear(): void { + this.entries.clear() + } + + private async fetch( + client: KlavisClient, + browserosId: string, + servers: readonly string[], + normalized: string, + ): Promise { + const result = await client.createStrata(browserosId, [...servers]) + return { + strataServerUrl: result.strataServerUrl, + strataId: result.strataId, + addedServers: result.addedServers, + serverKey: normalized, + expiresAt: Date.now() + this.ttlMs, + } + } + + private toResponse(entry: CacheEntry): StrataCreateResponse { + return { + strataServerUrl: entry.strataServerUrl, + strataId: entry.strataId, + addedServers: entry.addedServers, + } + } +} + +export const klavisStrataCache = new KlavisStrataCache() diff --git a/packages/browseros-agent/apps/server/src/api/services/mcp/register-klavis-mcp.ts b/packages/browseros-agent/apps/server/src/api/services/klavis/strata-proxy.ts similarity index 97% rename from packages/browseros-agent/apps/server/src/api/services/mcp/register-klavis-mcp.ts rename to packages/browseros-agent/apps/server/src/api/services/klavis/strata-proxy.ts index c8e67c046..dda87238d 100644 --- a/packages/browseros-agent/apps/server/src/api/services/mcp/register-klavis-mcp.ts +++ b/packages/browseros-agent/apps/server/src/api/services/klavis/strata-proxy.ts @@ -14,6 +14,7 @@ import type { KlavisClient } from '../../../lib/clients/klavis/klavis-client' import { OAUTH_MCP_SERVERS } from '../../../lib/clients/klavis/oauth-mcp-servers' import { logger } from '../../../lib/logger' import { metrics } from '../../../lib/metrics' +import { klavisStrataCache } from './strata-cache' function withTimeout(promise: Promise, label: string): Promise { let timerId: ReturnType | undefined @@ -49,7 +50,8 @@ export async function connectKlavisProxy( // even unauthenticated ones (Klavis handles auth prompts on call) const allServers = OAUTH_MCP_SERVERS.map((s) => s.name) - const strata = await deps.klavisClient.createStrata( + const strata = await klavisStrataCache.getOrFetch( + deps.klavisClient, deps.browserosId, allServers, ) diff --git a/packages/browseros-agent/apps/server/src/api/services/mcp/mcp-server.ts b/packages/browseros-agent/apps/server/src/api/services/mcp/mcp-server.ts index e3daba2df..aab0d7177 100644 --- a/packages/browseros-agent/apps/server/src/api/services/mcp/mcp-server.ts +++ b/packages/browseros-agent/apps/server/src/api/services/mcp/mcp-server.ts @@ -8,11 +8,11 @@ import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js' import { SetLevelRequestSchema } from '@modelcontextprotocol/sdk/types.js' import type { Browser } from '../../../browser/browser' import type { ToolRegistry } from '../../../tools/tool-registry' -import { MCP_INSTRUCTIONS } from './mcp-prompt' import { type KlavisProxyHandle, registerKlavisTools, -} from './register-klavis-mcp' +} from '../klavis/strata-proxy' +import { MCP_INSTRUCTIONS } from './mcp-prompt' import { registerTools } from './register-mcp' export interface McpServiceDeps { diff --git a/packages/browseros-agent/apps/server/src/lib/clients/klavis/klavis-client.ts b/packages/browseros-agent/apps/server/src/lib/clients/klavis/klavis-client.ts index 6f05a392a..b0fb0b8cc 100644 --- a/packages/browseros-agent/apps/server/src/lib/clients/klavis/klavis-client.ts +++ b/packages/browseros-agent/apps/server/src/lib/clients/klavis/klavis-client.ts @@ -145,16 +145,14 @@ export class KlavisClient { }) } - /** - * Remove a server from a Strata instance - * Flow: createStrata(server) to get strataId → DELETE /strata/{strataId}/servers?servers=X - */ - async removeServer(userId: string, serverName: string): Promise { - // createStrata to get strataId (passing same server ensures it exists) - const strata = await this.createStrata(userId, [serverName]) + async deleteServersFromStrata( + strataId: string, + servers: string[], + ): Promise { + const query = servers.map(encodeURIComponent).join(',') await this.request( 'DELETE', - `/mcp-server/strata/${strata.strataId}/servers?servers=${encodeURIComponent(serverName)}`, + `/mcp-server/strata/${strataId}/servers?servers=${query}`, ) } } diff --git a/packages/browseros-agent/apps/server/tests/api/routes/klavis.test.ts b/packages/browseros-agent/apps/server/tests/api/routes/klavis.test.ts index bbe7a4f15..ef850cc19 100644 --- a/packages/browseros-agent/apps/server/tests/api/routes/klavis.test.ts +++ b/packages/browseros-agent/apps/server/tests/api/routes/klavis.test.ts @@ -3,12 +3,17 @@ * Copyright 2025 BrowserOS */ -import { afterEach, describe, it } from 'bun:test' +import { afterEach, beforeEach, describe, it } from 'bun:test' import assert from 'node:assert' import { createKlavisRoutes } from '../../../src/api/routes/klavis' +import { klavisStrataCache } from '../../../src/api/services/klavis/strata-cache' const originalFetch = globalThis.fetch +beforeEach(() => { + klavisStrataCache.clear() +}) + afterEach(() => { globalThis.fetch = originalFetch }) diff --git a/packages/browseros-agent/apps/server/tests/api/services/klavis/strata-cache.test.ts b/packages/browseros-agent/apps/server/tests/api/services/klavis/strata-cache.test.ts new file mode 100644 index 000000000..48b9ad2f2 --- /dev/null +++ b/packages/browseros-agent/apps/server/tests/api/services/klavis/strata-cache.test.ts @@ -0,0 +1,163 @@ +/** + * @license + * Copyright 2025 BrowserOS + */ + +import { describe, expect, it } from 'bun:test' +import { KlavisStrataCache } from '../../../../src/api/services/klavis/strata-cache' +import type { + KlavisClient, + StrataCreateResponse, +} from '../../../../src/lib/clients/klavis/klavis-client' + +class StubKlavisClient { + callCount = 0 + delayMs = 0 + shouldThrowOnce = false + lastServers: string[] | null = null + + async createStrata( + userId: string, + servers: string[], + ): Promise { + this.callCount++ + this.lastServers = servers + if (this.shouldThrowOnce) { + this.shouldThrowOnce = false + throw new Error('boom') + } + if (this.delayMs > 0) { + await new Promise((r) => setTimeout(r, this.delayMs)) + } + return { + strataServerUrl: `https://strata.test/${userId}/${servers.join('-')}`, + strataId: `strata_${userId}`, + addedServers: servers, + } + } +} + +const asClient = (stub: StubKlavisClient): KlavisClient => + stub as unknown as KlavisClient + +describe('KlavisStrataCache', () => { + it('cache hit returns the same value without re-calling the client', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + const a = await cache.getOrFetch(asClient(client), 'user1', ['Gmail']) + const b = await cache.getOrFetch(asClient(client), 'user1', ['Gmail']) + expect(client.callCount).toBe(1) + expect(a.strataServerUrl).toBe(b.strataServerUrl) + expect(a.strataId).toBe(b.strataId) + }) + + it('normalizes server order — [Gmail, Linear] === [Linear, Gmail]', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + await cache.getOrFetch(asClient(client), 'u', ['Gmail', 'Linear']) + await cache.getOrFetch(asClient(client), 'u', ['Linear', 'Gmail']) + expect(client.callCount).toBe(1) + }) + + it('dedupes duplicate server names within one call', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + await cache.getOrFetch(asClient(client), 'u', ['Gmail', 'Gmail']) + await cache.getOrFetch(asClient(client), 'u', ['Gmail']) + expect(client.callCount).toBe(1) + }) + + it('different user gets a separate cache entry', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + await cache.getOrFetch(asClient(client), 'userA', ['Gmail']) + await cache.getOrFetch(asClient(client), 'userB', ['Gmail']) + expect(client.callCount).toBe(2) + }) + + it('different server set (same user) gets a separate cache entry', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + await cache.getOrFetch(asClient(client), 'u', ['Gmail']) + await cache.getOrFetch(asClient(client), 'u', ['Gmail', 'Linear']) + expect(client.callCount).toBe(2) + }) + + it('concurrent misses share a single in-flight Promise', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + client.delayMs = 30 + const [a, b, c] = await Promise.all([ + cache.getOrFetch(asClient(client), 'u', ['Gmail']), + cache.getOrFetch(asClient(client), 'u', ['Gmail']), + cache.getOrFetch(asClient(client), 'u', ['Gmail']), + ]) + expect(client.callCount).toBe(1) + expect(a.strataId).toBe(b.strataId) + expect(b.strataId).toBe(c.strataId) + }) + + it('TTL expiry triggers a fresh fetch', async () => { + const cache = new KlavisStrataCache(10) // 10 ms TTL + const client = new StubKlavisClient() + await cache.getOrFetch(asClient(client), 'u', ['Gmail']) + await new Promise((r) => setTimeout(r, 25)) + await cache.getOrFetch(asClient(client), 'u', ['Gmail']) + expect(client.callCount).toBe(2) + }) + + it('invalidate(userA) drops only userA entries', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + await cache.getOrFetch(asClient(client), 'userA', ['Gmail']) + await cache.getOrFetch(asClient(client), 'userB', ['Gmail']) + cache.invalidate('userA') + await cache.getOrFetch(asClient(client), 'userA', ['Gmail']) + await cache.getOrFetch(asClient(client), 'userB', ['Gmail']) + expect(client.callCount).toBe(3) // userA: cold + cold, userB: cold + hit + }) + + it('invalidate while a fetch is in flight does not store the result', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + client.delayMs = 30 + const inflight = cache.getOrFetch(asClient(client), 'u', ['Gmail']) + cache.invalidate('u') + const result = await inflight + expect(result.strataId).toBe('strata_u') + // Next call should not see the post-invalidate write — must re-fetch. + await cache.getOrFetch(asClient(client), 'u', ['Gmail']) + expect(client.callCount).toBe(2) + }) + + it('rejected fetches do not poison the cache', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + client.shouldThrowOnce = true + await expect( + cache.getOrFetch(asClient(client), 'u', ['Gmail']), + ).rejects.toThrow('boom') + await cache.getOrFetch(asClient(client), 'u', ['Gmail']) + expect(client.callCount).toBe(2) + }) + + it('clear() drops all entries', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + await cache.getOrFetch(asClient(client), 'userA', ['Gmail']) + await cache.getOrFetch(asClient(client), 'userB', ['Linear']) + cache.clear() + await cache.getOrFetch(asClient(client), 'userA', ['Gmail']) + await cache.getOrFetch(asClient(client), 'userB', ['Linear']) + expect(client.callCount).toBe(4) + }) + + it('passes a defensive copy of the servers array to the client', async () => { + const cache = new KlavisStrataCache() + const client = new StubKlavisClient() + const input: readonly string[] = ['Gmail', 'Linear'] + await cache.getOrFetch(asClient(client), 'u', input) + expect(client.lastServers).not.toBe(input) + expect(client.lastServers).toEqual(['Gmail', 'Linear']) + }) +})