mirror of
https://github.com/browseros-ai/BrowserOS.git
synced 2026-05-21 04:45:12 +00:00
* feat(agents): decouple chat turn lifecycle from SSE response
Introduce a per-process ActiveTurnRegistry that owns each agent turn's
lifecycle and a ring-buffered event stream, so chat tabs that close,
refresh, or navigate away no longer cancel the in-flight turn. New
endpoints:
POST /agents/:id/chat starts a turn (now returns 409 when
one is already running, with the
active turnId for attaching)
GET /agents/:id/chat/active reports the running turn for a UI
that just mounted
GET /agents/:id/chat/stream subscribes to a turn; supports
Last-Event-ID resume via per-event
seq ids
POST /agents/:id/chat/cancel explicit cancel — fetch abort no
longer affects the underlying turn
The chat hook now captures X-Turn-Id, tracks lastSeq from SSE id lines,
re-attaches on mount when the server still has an active turn, and
routes Stop through the cancel endpoint. The runtime call uses the
registry's per-turn AbortController instead of the HTTP request signal,
which is the core decoupling that lets turns outlive their initiator.
* feat(agents): add ActiveTurnRegistry primitive backing the new chat lifecycle
The previous commit referenced these files in tests and the harness
service but global gitignore swallowed them on the first add.
The registry owns the per-turn ring buffer (drop-oldest, terminal frame
preserved), the per-turn AbortController, and subscriber fan-out used
by /chat/stream resume.
118 lines
3.0 KiB
TypeScript
118 lines
3.0 KiB
TypeScript
function isAbortError(error: unknown): boolean {
|
|
return error instanceof DOMException && error.name === 'AbortError'
|
|
}
|
|
|
|
export interface ParsedSSEEvent<T> {
|
|
data: T
|
|
/** Numeric `id:` line on the same SSE event, if any. */
|
|
seq?: number
|
|
}
|
|
|
|
export function parseSSELines<T>(buffer: string): {
|
|
events: ParsedSSEEvent<T>[]
|
|
remainder: string
|
|
} {
|
|
// SSE events are separated by blank lines. Buffer lines until we hit
|
|
// a blank, then assemble each event. Lines we recognise: `id: <n>`
|
|
// and `data: <payload>`. Everything else is ignored.
|
|
const events: ParsedSSEEvent<T>[] = []
|
|
const lines = buffer.split('\n')
|
|
// Find the last blank-line boundary; everything after it is the
|
|
// remainder (next event partially received).
|
|
let lastBoundary = -1
|
|
for (let i = lines.length - 1; i >= 0; i--) {
|
|
if (lines[i] === '') {
|
|
lastBoundary = i
|
|
break
|
|
}
|
|
}
|
|
const completeLines = lastBoundary >= 0 ? lines.slice(0, lastBoundary) : []
|
|
const remainder =
|
|
lastBoundary >= 0 ? lines.slice(lastBoundary + 1).join('\n') : buffer
|
|
|
|
let currentSeq: number | undefined
|
|
let currentData: string | null = null
|
|
const flush = () => {
|
|
if (currentData != null && currentData !== '[DONE]') {
|
|
try {
|
|
events.push({
|
|
data: JSON.parse(currentData) as T,
|
|
seq: currentSeq,
|
|
})
|
|
} catch {
|
|
// ignore
|
|
}
|
|
}
|
|
currentSeq = undefined
|
|
currentData = null
|
|
}
|
|
|
|
for (const line of completeLines) {
|
|
if (line === '') {
|
|
flush()
|
|
continue
|
|
}
|
|
if (line.startsWith('id: ')) {
|
|
const n = Number.parseInt(line.slice(4).trim(), 10)
|
|
if (Number.isFinite(n)) currentSeq = n
|
|
continue
|
|
}
|
|
if (line.startsWith('data: ')) {
|
|
currentData = line.slice(6)
|
|
}
|
|
}
|
|
// Catch a complete trailing event with no terminating blank line —
|
|
// shouldn't happen in well-formed SSE, but be tolerant.
|
|
flush()
|
|
|
|
return { events, remainder }
|
|
}
|
|
|
|
export async function consumeSSEStream<T>(
|
|
response: Response,
|
|
onEvent: (event: T, meta: { seq?: number }) => void,
|
|
signal?: AbortSignal,
|
|
): Promise<void> {
|
|
const reader = response.body?.getReader()
|
|
if (!reader) return
|
|
|
|
const decoder = new TextDecoder()
|
|
let buffer = ''
|
|
|
|
const abortReader = () => {
|
|
void reader.cancel()
|
|
}
|
|
|
|
signal?.addEventListener('abort', abortReader, { once: true })
|
|
|
|
try {
|
|
while (true) {
|
|
const { done, value } = await reader.read()
|
|
if (done) break
|
|
|
|
buffer += decoder.decode(value, { stream: true })
|
|
const { events, remainder } = parseSSELines<T>(buffer)
|
|
buffer = remainder
|
|
|
|
for (const event of events) {
|
|
onEvent(event.data, { seq: event.seq })
|
|
}
|
|
}
|
|
} catch (error) {
|
|
if (signal?.aborted || isAbortError(error)) return
|
|
throw error
|
|
} finally {
|
|
signal?.removeEventListener('abort', abortReader)
|
|
const trailing = decoder.decode()
|
|
if (trailing) {
|
|
buffer += trailing
|
|
}
|
|
if (buffer) {
|
|
const { events } = parseSSELines<T>(buffer)
|
|
for (const event of events) {
|
|
onEvent(event.data, { seq: event.seq })
|
|
}
|
|
}
|
|
}
|
|
}
|