Files
BrowserOS/packages/browseros-agent/apps/agent/entrypoints/app/agent-command/useAgentConversation.ts
Dani Akash 8712f89f18 feat(agents): durable per-agent chat message queue + composer Stop (#880)
* feat(agents): durable per-agent chat message queue + composer Stop button

* fix(agents): tighten queue UI — smaller Stop, drop empty indicator, live drain attach

User feedback round 1 on the message-queue UX:

1) The Stop button matched the send/voice mics at h-10 w-10 with a
   solid destructive fill, which read as alarming. Shrunk to h-8 w-8,
   ghost variant with a soft destructive/10 background, smaller
   filled square glyph. Reads as a calm 'stop' affordance instead of
   a panic button.

2) The QueueItem's leading <QueueItemIndicator> dot was decorative
   only — no state, no interaction. Dropped it from QueuePanel along
   with the import; queue items now render as a clean preview line
   with the trailing X remove action.

3) When the server drained the queue and started the next turn, the
   chat panel didn't pick up the live stream until the user
   navigated away and back. The hook's resume effect previously
   only fired on agent change, not on listing-observed activeTurnId
   change. Surface activeTurnId from useHarnessAgents into
   useAgentConversation; effect now re-runs when the id changes,
   calls /chat/active, and attaches to the new turn — so a queued
   message starts streaming the moment the server drain pops it.

* fix(agents): don't reset streaming state from the resume effect's no-op paths

The Stop button was disappearing while the agent was actively
streaming, even though events were still flowing into the chat. Root
cause: the resume effect's `finally` block reset `streaming`,
`turnIdRef`, and `lastSeqRef` unconditionally — including on the
early-return paths (no active turn, or another mechanism already
owns the stream).

Sequence that triggered it:
  1) User sends a message → send() sets streamAbortRef + streaming=true
     and starts consuming the SSE.
  2) User enqueues another message → enqueue mutation invalidates the
     listing query.
  3) Listing refetches with the live activeTurnId → the resume
     effect re-fires (deps include activeTurnIdDep).
  4) attemptResume hits `if (streamAbortRef.current) return` because
     send() owns it.
  5) The finally clause fires anyway and calls setStreaming(false),
     clobbering the live state set by send(). The SSE consumer keeps
     running (refs are intact) so text keeps streaming, but the React
     flag is wrong, so the Stop button gates off.

Fix: track whether *this* run actually started a stream
(`weStartedStream`). The finally only resets state when it does.
Early-return / no-active-turn paths now leave streaming/turnIdRef/
lastSeqRef alone for whoever does own them.

Also widens the Stop button's visibility (`canStop` prop on
ConversationInput) so it stays steady across the brief gap between
turns when a queue drain is mid-flight; the parent computes
`streaming || activeTurnId !== null || queue.length > 0`. The
visibility widening is independent of the streaming-state fix above
— both are now in place.

* revert: drop canStop widening — Stop only shows while streaming

Reverts the canStop prop on ConversationInput and the OR-with-queue
visibility from AgentCommandConversation. Stop is gated solely on
`streaming` again. Between turns (queue draining) the button stays
hidden — only the actively-streaming turn is interruptible from the
composer, which matches what the user actually expects.

* fix(agents): persist the kicking-off prompt on active turns so the resume placeholder isn't empty

When a queued message drained and started a new turn, the chat
panel's resume effect staged a placeholder turn with userText: ''
because the hook had no way to know what message kicked off the
turn — only the agent-side stream was visible, and the user bubble
above it was blank until the user navigated away and back (at which
point the session record's history loaded normally).

Fix: ActiveTurnRegistry.register now accepts an optional `prompt`
that's stashed on the turn and surfaced via describe() / the
ActiveTurnInfo response. AgentHarnessService.startTurn passes the
incoming message into register. /chat/active returns it. The chat
hook's resume effect uses active.prompt as the placeholder
turn's userText, so the user bubble shows the queued message text
the moment streaming begins. Falls back to '' for older clients
that haven't been refetched yet.

* fix(agents): always release streamAbortRef on resume cleanup, even when cancelled

Greptile P1 follow-up. The previous `weStartedStream` guard correctly
stopped the resume effect's no-op early-returns from clobbering an
in-flight `send()` stream — but it also stopped a *cancelled*
mid-stream resume from clearing its own `streamAbortRef`. When the
cleanup fires (e.g. the 5s listing poll captures a new queue-drain
turn id while the SSE for the prior turn is still finishing), the
next effect run hits the `if (streamAbortRef.current) return` guard
against the now-aborted controller and never reattaches, leaving
`streaming === true` with no live stream until the user navigates
away.

Split the finally block: always release `streamAbortRef` when we
owned the controller (so the next run can take over), but only
reset the streaming flag / turn id / lastSeq on a clean exit (the
new run will set those itself, so resetting on cancel would just
flicker).
2026-04-30 18:26:56 +05:30

449 lines
14 KiB
TypeScript

import { useEffect, useRef, useState } from 'react'
import {
type AgentHarnessStreamEvent,
attachToHarnessTurn,
cancelHarnessTurn,
chatWithHarnessAgent,
fetchActiveHarnessTurn,
} from '@/entrypoints/app/agents/useAgents'
import type { OpenClawChatHistoryMessage } from '@/entrypoints/app/agents/useOpenClaw'
import type {
AgentConversationTurn,
AssistantPart,
ToolEntry,
UserAttachmentPreview,
} from '@/lib/agent-conversations/types'
import type { ServerAttachmentPayload } from '@/lib/attachments'
import { consumeSSEStream } from '@/lib/sse'
import { buildToolLabel } from '@/lib/tool-labels'
import { mapAgentHarnessToolStatus } from './agent-stream-events'
export interface SendInput {
text: string
attachments?: ServerAttachmentPayload[]
// Optional preview metadata used to render the optimistic user turn.
// Built by the composer at staging time; the server only sees the
// payload array.
attachmentPreviews?: UserAttachmentPreview[]
}
interface UseAgentConversationOptions {
// The hook always speaks to the harness chat path now; the OpenClaw
// legacy /claw/agents/:id/chat surface was removed in Step 12. The
// option remains for forward-compatibility.
runtime?: 'agent-harness'
sessionKey?: string | null
history?: OpenClawChatHistoryMessage[]
onComplete?: () => void
onSessionKeyChange?: (sessionKey: string) => void
/**
* Server-side active turn id, surfaced via the listing query. When
* this changes from null/<id> to a different non-null id while we
* aren't already streaming (e.g. the server just popped a queued
* message and started a new turn), the hook reattaches via
* /chat/active so the chat panel picks up the live stream without
* waiting for a remount.
*/
activeTurnId?: string | null
}
export function useAgentConversation(
agentId: string,
options: UseAgentConversationOptions = {},
) {
const [turns, setTurns] = useState<AgentConversationTurn[]>([])
const [streaming, setStreaming] = useState(false)
const sessionKeyRef = useRef(options.sessionKey ?? '')
const historyRef = useRef<OpenClawChatHistoryMessage[]>(options.history ?? [])
const textAccRef = useRef('')
const thinkAccRef = useRef('')
const streamAbortRef = useRef<AbortController | null>(null)
const onCompleteRef = useRef(options.onComplete)
const onSessionKeyChangeRef = useRef(options.onSessionKeyChange)
// Per-turn resume bookkeeping. `turnId` is captured from the response
// header; `lastSeq` advances with every SSE event so a reconnect can
// resume via Last-Event-ID.
const turnIdRef = useRef<string | null>(null)
const lastSeqRef = useRef<number | null>(null)
useEffect(() => {
sessionKeyRef.current = options.sessionKey ?? ''
}, [options.sessionKey])
useEffect(() => {
historyRef.current = options.history ?? []
}, [options.history])
useEffect(() => {
onCompleteRef.current = options.onComplete
}, [options.onComplete])
useEffect(() => {
onSessionKeyChangeRef.current = options.onSessionKeyChange
}, [options.onSessionKeyChange])
useEffect(() => {
return () => {
streamAbortRef.current?.abort()
}
}, [])
// Indirection for the resume effect below: lets it call the latest
// event handler without re-subscribing on every render.
const processEventRef = useRef<(event: AgentHarnessStreamEvent) => void>(
() => {},
)
const updateCurrentTurnParts = (
updater: (parts: AssistantPart[]) => AssistantPart[],
) => {
setTurns((prev) => {
const last = prev[prev.length - 1]
if (!last) return prev
return [...prev.slice(0, -1), { ...last, parts: updater(last.parts) }]
})
}
const appendTextDelta = (delta: string) => {
textAccRef.current += delta
const text = textAccRef.current
updateCurrentTurnParts((parts) => {
const last = parts[parts.length - 1]
if (last?.kind === 'text') {
return [...parts.slice(0, -1), { ...last, text }]
}
return [...parts, { kind: 'text', text }]
})
}
const appendThinkingDelta = (delta: string) => {
thinkAccRef.current += delta
const text = thinkAccRef.current
updateCurrentTurnParts((parts) => {
const idx = parts.findIndex((p) => p.kind === 'thinking' && !p.done)
if (idx >= 0) {
return [
...parts.slice(0, idx),
{ ...parts[idx], text, done: false },
...parts.slice(idx + 1),
]
}
return [...parts, { kind: 'thinking', text, done: false }]
})
}
const appendErrorText = (message: string) => {
updateCurrentTurnParts((parts) => [
...parts,
{ kind: 'text', text: `Error: ${message}` },
])
}
const markCurrentTurnDone = () => {
updateCurrentTurnParts((parts) =>
parts.map((part) =>
part.kind === 'thinking' ? { ...part, done: true } : part,
),
)
setTurns((prev) => {
const last = prev[prev.length - 1]
if (!last) return prev
return [...prev.slice(0, -1), { ...last, done: true }]
})
}
const upsertAgentHarnessTool = (event: AgentHarnessStreamEvent) => {
if (event.type !== 'tool_call') return
const rawName = event.title || event.rawType || 'tool call'
const { label, subject } = buildToolLabel(
rawName,
event.text ? { description: event.text } : undefined,
)
const tool: ToolEntry = {
id: event.id ?? crypto.randomUUID(),
name: rawName,
label,
subject,
status: mapAgentHarnessToolStatus(event.status),
}
updateCurrentTurnParts((parts) => {
for (let i = parts.length - 1; i >= 0; i--) {
const part = parts[i]
if (
part.kind === 'tool-batch' &&
part.tools.some((existing) => existing.id === tool.id)
) {
const tools = part.tools.map((existing) =>
existing.id === tool.id ? { ...existing, ...tool } : existing,
)
return [
...parts.slice(0, i),
{ ...part, tools },
...parts.slice(i + 1),
]
}
}
const last = parts[parts.length - 1]
if (last?.kind === 'tool-batch') {
return [
...parts.slice(0, -1),
{ ...last, tools: [...last.tools, tool] },
]
}
return [...parts, { kind: 'tool-batch', tools: [tool] }]
})
}
const processAgentHarnessStreamEvent = (event: AgentHarnessStreamEvent) => {
switch (event.type) {
case 'text_delta':
if (event.stream === 'thought') {
appendThinkingDelta(event.text)
} else {
appendTextDelta(event.text)
}
break
case 'tool_call':
upsertAgentHarnessTool(event)
break
case 'done':
markCurrentTurnDone()
break
case 'error':
appendErrorText(event.message)
break
case 'status':
break
}
}
processEventRef.current = processAgentHarnessStreamEvent
const activeTurnIdDep = options.activeTurnId ?? null
// On mount, on agent change, and whenever the listing reports a
// *new* active turn id, check whether the server has an in-flight
// turn for this agent and reattach to it. This catches three
// cases at once: the chat resilience flow (tab close/reopen),
// navigation between agents, AND queue drain (the server starts a
// new turn from a queued message → activeTurnId flips → attach).
useEffect(() => {
let cancelled = false
const abortController = new AbortController()
// Reference the dep inside the body so biome's exhaustive-deps
// rule sees it consumed; the value is just an "any non-null
// active turn id" trigger — the actual id we attach to comes
// from the fresh fetchActiveHarnessTurn call below.
void activeTurnIdDep
const attemptResume = async () => {
// Track whether *we* started a stream in this run. When the
// early-return paths fire (no active turn, or a `send()` /
// earlier resume already owns `streamAbortRef`), the finally
// block must NOT touch streaming/turnIdRef/lastSeqRef —
// otherwise we clobber the in-flight stream's state and the
// Stop button drops out mid-turn while events keep arriving.
let weStartedStream = false
try {
const active = await fetchActiveHarnessTurn(agentId)
if (cancelled || !active || active.status !== 'running') return
if (streamAbortRef.current) return // someone else already owns the stream
// Stage a placeholder turn so the streamed events have a row
// to render into. The server now persists the kicking-off
// prompt on the active turn, so we render it as the user
// bubble immediately — no empty-bubble flicker when a queued
// message starts running.
setTurns((prev) => [
...prev,
{
id: crypto.randomUUID(),
userText: active.prompt ?? '',
parts: [],
done: false,
timestamp: active.startedAt,
},
])
textAccRef.current = ''
thinkAccRef.current = ''
turnIdRef.current = active.turnId
lastSeqRef.current = null
streamAbortRef.current = abortController
setStreaming(true)
weStartedStream = true
const response = await attachToHarnessTurn(agentId, {
turnId: active.turnId,
signal: abortController.signal,
})
if (!response.ok) return
await consumeSSEStream<AgentHarnessStreamEvent>(
response,
(event, meta) => {
if (typeof meta.seq === 'number') lastSeqRef.current = meta.seq
processEventRef.current(event)
},
abortController.signal,
)
} catch {
// Resume is best-effort; transient errors fall back to the
// user starting a new turn manually.
} finally {
// Always release `streamAbortRef` if we owned it — even when
// the effect was cancelled mid-stream (a listing poll
// captured the next queue-drain turn id, for example). If we
// don't, the next effect run hits `if (streamAbortRef.current)
// return` against our now-aborted controller and never
// reattaches, leaving `streaming === true` with no live stream.
if (weStartedStream && streamAbortRef.current === abortController) {
streamAbortRef.current = null
}
// The other state (streaming flag, turn id, lastSeq) is the
// *current run's* lifecycle: only reset it on a clean exit.
// When `cancelled` is true the next run will set these
// itself, so resetting here would only cause a brief flicker.
if (!cancelled && weStartedStream) {
turnIdRef.current = null
lastSeqRef.current = null
setStreaming(false)
}
}
}
void attemptResume()
return () => {
cancelled = true
abortController.abort()
}
}, [agentId, activeTurnIdDep])
const send = async (input: string | SendInput) => {
const normalized: SendInput =
typeof input === 'string' ? { text: input } : input
const trimmed = normalized.text.trim()
const attachments = normalized.attachments ?? []
if (streaming) return
if (!trimmed && attachments.length === 0) return
const turn: AgentConversationTurn = {
id: crypto.randomUUID(),
userText: trimmed,
userAttachments:
normalized.attachmentPreviews &&
normalized.attachmentPreviews.length > 0
? normalized.attachmentPreviews
: undefined,
parts: [],
done: false,
timestamp: Date.now(),
}
setTurns((prev) => [...prev, turn])
setStreaming(true)
textAccRef.current = ''
thinkAccRef.current = ''
const abortController = new AbortController()
streamAbortRef.current = abortController
try {
let response = await chatWithHarnessAgent(
agentId,
trimmed,
abortController.signal,
attachments,
)
// 409 means the server already has an active turn for this
// agent (e.g. a previous tab kicked one off and we're a fresh
// mount that missed the resume window). Attach to it instead of
// double-sending.
if (response.status === 409) {
const body = (await response.json()) as { turnId?: string }
if (body.turnId) {
response = await attachToHarnessTurn(agentId, {
turnId: body.turnId,
signal: abortController.signal,
})
}
}
const responseSessionKey =
response.headers.get('X-Session-Key') ??
response.headers.get('X-Session-Id')
if (responseSessionKey) {
sessionKeyRef.current = responseSessionKey
onSessionKeyChangeRef.current?.(responseSessionKey)
}
const responseTurnId = response.headers.get('X-Turn-Id')
if (responseTurnId) {
turnIdRef.current = responseTurnId
lastSeqRef.current = null
}
if (!response.ok) {
const err = await response.text()
updateCurrentTurnParts((parts) => [
...parts,
{ kind: 'text', text: `Error: ${err}` },
])
return
}
await consumeSSEStream<AgentHarnessStreamEvent>(
response,
(event, meta) => {
if (typeof meta.seq === 'number') lastSeqRef.current = meta.seq
processAgentHarnessStreamEvent(event)
},
abortController.signal,
)
} catch (err) {
if (abortController.signal.aborted) return
const msg = err instanceof Error ? err.message : String(err)
updateCurrentTurnParts((parts) => [
...parts,
{ kind: 'text', text: `Error: ${msg}` },
])
} finally {
if (streamAbortRef.current === abortController) {
streamAbortRef.current = null
}
turnIdRef.current = null
lastSeqRef.current = null
onCompleteRef.current?.()
setStreaming(false)
}
}
/**
* Stop button. The fetch abort only detaches *this* SSE subscriber
* now — the underlying turn would otherwise keep running on the
* server. So we explicitly cancel via the new endpoint, then unwind
* the local stream.
*/
const stop = async () => {
const turnId = turnIdRef.current ?? undefined
streamAbortRef.current?.abort()
streamAbortRef.current = null
try {
await cancelHarnessTurn(agentId, {
turnId,
reason: 'user pressed stop',
})
} catch {
// Best-effort — UI already aborted.
}
}
const resetConversation = () => {
void stop()
setTurns([])
setStreaming(false)
}
return {
turns,
streaming,
sessionKey: sessionKeyRef.current,
send,
stop,
resetConversation,
}
}