Files
BrowserOS/packages/browseros-agent/apps/agent/lib/sse.ts
Dani Akash a228c278c6 feat(agents): background-resilient chat — turns survive tab disconnect (#863)
* feat(agents): decouple chat turn lifecycle from SSE response

Introduce a per-process ActiveTurnRegistry that owns each agent turn's
lifecycle and a ring-buffered event stream, so chat tabs that close,
refresh, or navigate away no longer cancel the in-flight turn. New
endpoints:

  POST   /agents/:id/chat          starts a turn (now returns 409 when
                                   one is already running, with the
                                   active turnId for attaching)
  GET    /agents/:id/chat/active   reports the running turn for a UI
                                   that just mounted
  GET    /agents/:id/chat/stream   subscribes to a turn; supports
                                   Last-Event-ID resume via per-event
                                   seq ids
  POST   /agents/:id/chat/cancel   explicit cancel — fetch abort no
                                   longer affects the underlying turn

The chat hook now captures X-Turn-Id, tracks lastSeq from SSE id lines,
re-attaches on mount when the server still has an active turn, and
routes Stop through the cancel endpoint. The runtime call uses the
registry's per-turn AbortController instead of the HTTP request signal,
which is the core decoupling that lets turns outlive their initiator.

* feat(agents): add ActiveTurnRegistry primitive backing the new chat lifecycle

The previous commit referenced these files in tests and the harness
service but global gitignore swallowed them on the first add.

The registry owns the per-turn ring buffer (drop-oldest, terminal frame
preserved), the per-turn AbortController, and subscriber fan-out used
by /chat/stream resume.
2026-04-29 21:01:06 +05:30

118 lines
3.0 KiB
TypeScript

function isAbortError(error: unknown): boolean {
return error instanceof DOMException && error.name === 'AbortError'
}
export interface ParsedSSEEvent<T> {
data: T
/** Numeric `id:` line on the same SSE event, if any. */
seq?: number
}
export function parseSSELines<T>(buffer: string): {
events: ParsedSSEEvent<T>[]
remainder: string
} {
// SSE events are separated by blank lines. Buffer lines until we hit
// a blank, then assemble each event. Lines we recognise: `id: <n>`
// and `data: <payload>`. Everything else is ignored.
const events: ParsedSSEEvent<T>[] = []
const lines = buffer.split('\n')
// Find the last blank-line boundary; everything after it is the
// remainder (next event partially received).
let lastBoundary = -1
for (let i = lines.length - 1; i >= 0; i--) {
if (lines[i] === '') {
lastBoundary = i
break
}
}
const completeLines = lastBoundary >= 0 ? lines.slice(0, lastBoundary) : []
const remainder =
lastBoundary >= 0 ? lines.slice(lastBoundary + 1).join('\n') : buffer
let currentSeq: number | undefined
let currentData: string | null = null
const flush = () => {
if (currentData != null && currentData !== '[DONE]') {
try {
events.push({
data: JSON.parse(currentData) as T,
seq: currentSeq,
})
} catch {
// ignore
}
}
currentSeq = undefined
currentData = null
}
for (const line of completeLines) {
if (line === '') {
flush()
continue
}
if (line.startsWith('id: ')) {
const n = Number.parseInt(line.slice(4).trim(), 10)
if (Number.isFinite(n)) currentSeq = n
continue
}
if (line.startsWith('data: ')) {
currentData = line.slice(6)
}
}
// Catch a complete trailing event with no terminating blank line —
// shouldn't happen in well-formed SSE, but be tolerant.
flush()
return { events, remainder }
}
export async function consumeSSEStream<T>(
response: Response,
onEvent: (event: T, meta: { seq?: number }) => void,
signal?: AbortSignal,
): Promise<void> {
const reader = response.body?.getReader()
if (!reader) return
const decoder = new TextDecoder()
let buffer = ''
const abortReader = () => {
void reader.cancel()
}
signal?.addEventListener('abort', abortReader, { once: true })
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const { events, remainder } = parseSSELines<T>(buffer)
buffer = remainder
for (const event of events) {
onEvent(event.data, { seq: event.seq })
}
}
} catch (error) {
if (signal?.aborted || isAbortError(error)) return
throw error
} finally {
signal?.removeEventListener('abort', abortReader)
const trailing = decoder.decode()
if (trailing) {
buffer += trailing
}
if (buffer) {
const { events } = parseSSELines<T>(buffer)
for (const event of events) {
onEvent(event.data, { seq: event.seq })
}
}
}
}