fix: schedule task show errors (#191)

* fix: add debug logging for start:dev

* feat: use eventsource-parser for schedule tasks

* fix: remove reasoning traces, minor UI updates for schedule task

* fix: bug with textdelta
This commit is contained in:
Nikhil
2026-01-08 18:16:05 -08:00
committed by GitHub
parent eb15382825
commit 3525dc9026
8 changed files with 179 additions and 75 deletions

View File

@@ -1,6 +1,5 @@
import dayjs from 'dayjs'
import duration from 'dayjs/plugin/duration'
import DOMPurify from 'dompurify'
import {
AlertCircle,
Check,
@@ -9,8 +8,7 @@ import {
Loader2,
XCircle,
} from 'lucide-react'
import { marked } from 'marked'
import { type FC, useMemo, useState } from 'react'
import { type FC, useState } from 'react'
import { Button } from '@/components/ui/button'
import {
Dialog,
@@ -21,6 +19,7 @@ import {
} from '@/components/ui/dialog'
import { ScrollArea } from '@/components/ui/scroll-area'
import type { ScheduledJobRun } from '@/lib/schedules/scheduleTypes'
import { MessageResponse } from './message'
dayjs.extend(duration)
@@ -50,12 +49,6 @@ export const RunResultDialog: FC<RunResultDialogProps> = ({
}) => {
const [copied, setCopied] = useState(false)
const renderedContent = useMemo(() => {
if (!run?.result) return null
const html = marked.parse(run.result, { async: false }) as string
return DOMPurify.sanitize(html)
}, [run?.result])
const handleCopy = async () => {
if (!run?.result) return
await navigator.clipboard.writeText(run.result)
@@ -94,12 +87,10 @@ export const RunResultDialog: FC<RunResultDialogProps> = ({
</div>
<p className="text-destructive text-sm">{run.result}</p>
</div>
) : renderedContent ? (
<div
className="prose prose-sm dark:prose-invert max-w-none rounded-lg border border-border bg-muted/50 p-4"
// biome-ignore lint/security/noDangerouslySetInnerHtml: renderedContent is sanitized with DOMPurify
dangerouslySetInnerHTML={{ __html: renderedContent }}
/>
) : run.result ? (
<div className="prose prose-sm dark:prose-invert max-w-none rounded-lg border border-border bg-muted/50 p-4">
<MessageResponse>{run.result}</MessageResponse>
</div>
) : (
<div className="rounded-lg border border-border bg-muted/50 p-4 text-muted-foreground text-sm">
No result available

View File

@@ -138,12 +138,17 @@ export const scheduledJobRuns = async () => {
status: 'completed',
completedAt: new Date().toISOString(),
result: response.text,
finalResult: response.finalResult,
executionLog: response.executionLog,
toolCalls: response.toolCalls,
})
} catch (e) {
const errorMessage = e instanceof Error ? e.message : String(e)
await updateJobRun(jobRun.id, {
status: 'failed',
completedAt: new Date().toISOString(),
result: e instanceof Error ? e.message : String(e),
result: errorMessage,
error: errorMessage,
})
} finally {
await updateJobLastRunAt(jobId)

View File

@@ -1,3 +1,4 @@
import { createParser, type EventSourceMessage } from 'eventsource-parser'
import type { ChatMode } from '@/entrypoints/sidepanel/index/chatTypes'
import { getAgentServerUrl } from '@/lib/browseros/helpers'
import {
@@ -8,6 +9,7 @@ import type { LlmProviderConfig } from '@/lib/llm-providers/types'
import { mcpServerStorage } from '@/lib/mcp/mcpServerStorage'
import { personalizationStorage } from '../personalization/personalizationStorage'
import { scheduleSystemPrompt } from './scheduleSystemPrompt'
import type { ToolCallExecution } from './scheduleTypes'
interface ActiveTab {
id?: number
@@ -26,36 +28,38 @@ interface ChatServerRequest {
interface ChatServerResponse {
text: string
conversationId: string
finalResult: string
executionLog: string
toolCalls: ToolCallExecution[]
}
interface StreamEvent {
type: string
delta?: string
errorText?: string
interface ParsedStreamResult {
fullText: string
finalResult: string
executionLog: string
toolCalls: ToolCallExecution[]
error: string | null
}
interface StreamState {
result: string
streamError: string | null
}
type UIMessageEvent =
| { type: 'text-delta'; id: string; delta: string }
| {
type: 'tool-input-available'
toolCallId: string
toolName: string
input: unknown
}
| { type: 'tool-output-available'; toolCallId: string; output: unknown }
| { type: 'tool-output-error'; toolCallId: string; errorText: string }
| { type: 'error'; errorText: string }
function processStreamEvent(event: StreamEvent, state: StreamState): void {
if (event.type === 'text-delta' && event.delta) {
state.result += event.delta
} else if (event.type === 'error' && event.errorText) {
state.streamError = event.errorText
}
}
function tryParseStreamLine(line: string, state: StreamState): void {
if (!line.startsWith('data: ')) return
const data = line.slice(6)
if (data === '[DONE]') return
try {
processStreamEvent(JSON.parse(data), state)
} catch {
// Ignore JSON parse errors for malformed chunks
}
interface StreamParseState {
fullText: string
currentStepText: string
lastTextBeforeToolCall: string
executionSteps: string[]
toolCallsMap: Map<string, ToolCallExecution>
error: string | null
}
const getDefaultProvider = async (): Promise<LlmProviderConfig | null> => {
@@ -134,37 +138,124 @@ export async function getChatServerResponse(
)
}
const text = await parseSSEStream(response)
const parsed = await parseUIMessageStream(response)
return { text, conversationId }
}
async function parseSSEStream(response: Response): Promise<string> {
const reader = response.body?.getReader()
if (!reader) throw new Error('Response body is not readable')
const decoder = new TextDecoder()
const state: StreamState = { result: '', streamError: null }
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
tryParseStreamLine(line, state)
}
}
tryParseStreamLine(buffer, state)
} finally {
reader.releaseLock()
if (parsed.error) {
throw new Error(parsed.error)
}
if (state.streamError) throw new Error(state.streamError)
return state.result
return {
text: parsed.fullText,
conversationId,
finalResult: parsed.finalResult,
executionLog: parsed.executionLog,
toolCalls: parsed.toolCalls,
}
}
function processEvent(event: UIMessageEvent, state: StreamParseState): void {
if (event.type === 'text-delta') {
const text = event.delta
state.fullText += text
state.currentStepText += text
state.lastTextBeforeToolCall += text
} else if (event.type === 'tool-input-available') {
const toolCall: ToolCallExecution = {
id: event.toolCallId,
name: event.toolName,
input: event.input,
timestamp: new Date().toISOString(),
}
state.toolCallsMap.set(event.toolCallId, toolCall)
if (state.currentStepText.trim()) {
state.executionSteps.push(state.currentStepText.trim())
state.currentStepText = ''
}
} else if (event.type === 'tool-output-available') {
const existingCall = state.toolCallsMap.get(event.toolCallId)
if (existingCall) {
existingCall.output = event.output
}
} else if (event.type === 'tool-output-error') {
const existingCall = state.toolCallsMap.get(event.toolCallId)
if (existingCall) {
existingCall.error = event.errorText
}
} else if (event.type === 'error') {
state.error = event.errorText
}
}
async function parseUIMessageStream(
response: Response,
): Promise<ParsedStreamResult> {
if (!response.body) {
throw new Error('Response body is not readable')
}
const state: StreamParseState = {
fullText: '',
currentStepText: '',
lastTextBeforeToolCall: '',
executionSteps: [],
toolCallsMap: new Map(),
error: null,
}
const parser = createParser({
onEvent(event: EventSourceMessage) {
if (event.data === '[DONE]') return
try {
const parsedEvent = JSON.parse(event.data) as UIMessageEvent
processEvent(parsedEvent, state)
} catch {
// Ignore invalid JSON events
}
},
})
try {
const reader = response.body.getReader()
const decoder = new TextDecoder()
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
parser.feed(chunk)
}
const finalResult = state.currentStepText.trim()
? state.currentStepText.trim()
: state.lastTextBeforeToolCall.trim()
const allSteps = [...state.executionSteps]
if (finalResult) {
allSteps.push(finalResult)
}
return {
fullText: state.fullText,
finalResult,
executionLog: allSteps.join('\n\n'),
toolCalls: Array.from(state.toolCallsMap.values()),
error: state.error,
}
} catch (error) {
return {
fullText: state.fullText,
finalResult: '',
executionLog: state.executionSteps.join('\n\n'),
toolCalls: Array.from(state.toolCallsMap.values()),
error:
error instanceof Error
? error.message
: String(error || 'Unknown error'),
}
}
}

View File

@@ -10,6 +10,15 @@ export interface ScheduledJob {
lastRunAt?: string
}
export interface ToolCallExecution {
id: string
name: string
input: unknown
output?: unknown
error?: string
timestamp: string
}
export interface ScheduledJobRun {
id: string
jobId: string
@@ -17,4 +26,8 @@ export interface ScheduledJobRun {
completedAt?: string
status: 'running' | 'completed' | 'failed'
result?: string
finalResult?: string
executionLog?: string
toolCalls?: ToolCallExecution[]
error?: string
}

View File

@@ -47,13 +47,12 @@
"clsx": "^2.1.1",
"cmdk": "^1.1.1",
"dayjs": "^1.11.19",
"dompurify": "^3.3.1",
"downshift": "^9.0.10",
"embla-carousel-react": "^8.6.0",
"es-toolkit": "^1.42.0",
"eventsource-parser": "^3.0.6",
"klavis": "^2.15.0",
"lucide-react": "^0.554.0",
"marked": "^17.0.1",
"motion": "^12.23.24",
"nanoid": "^5.1.6",
"next-themes": "^0.4.6",

View File

@@ -119,8 +119,11 @@ export class Logger implements LoggerInterface {
private fileLogger: pino.Logger | null = null
private level: LogLevel
constructor(level: LogLevel = 'info') {
this.level = level
constructor(level?: LogLevel) {
this.level =
level ||
(process.env.LOG_LEVEL as LogLevel | undefined) ||
(isDev ? 'debug' : 'info')
this.consoleLogger = this.createConsoleLogger()
}

View File

@@ -55,6 +55,7 @@
"downshift": "^9.0.10",
"embla-carousel-react": "^8.6.0",
"es-toolkit": "^1.42.0",
"eventsource-parser": "^3.0.6",
"klavis": "^2.15.0",
"lucide-react": "^0.554.0",
"marked": "^17.0.1",

View File

@@ -100,6 +100,7 @@ function createEnvWithMutablePorts(
): NodeJS.ProcessEnv {
return {
...process.env,
NODE_ENV: 'development',
BROWSEROS_CDP_PORT: String(ports.cdp),
BROWSEROS_SERVER_PORT: String(ports.server),
BROWSEROS_EXTENSION_PORT: String(ports.extension),