From 40dee4453ad4c2a1f7a60752a880312b5bc8b96c Mon Sep 17 00:00:00 2001 From: MrHuangJser Date: Fri, 27 Mar 2026 10:50:32 +0800 Subject: [PATCH] feat(cursor): auto-migrate sessions to healthy account on quota exhaustion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a Cursor account's quota is exhausted, sessions bound to it can now seamlessly continue on a different account: Layer 1 — Checkpoint decoupling: Key checkpoints by conversationId (not authID:conversationId). Store authID inside savedCheckpoint. On lookup, if auth changed, discard the stale checkpoint and flatten conversation history into userText. Layer 2 — Cross-account session cleanup: When a request arrives for a conversation whose session belongs to a different (now-exhausted) auth, close the old H2 stream and remove the stale session to free resources. Layer 3 — H2Stream.Err() exposure: New Err() method on H2Stream so callers can inspect RST_STREAM, GOAWAY, or other stream-level errors after closure. Layer 4 — processH2SessionFrames error propagation: Returns error instead of bare return. Connect EndStream errors (quota, rate limit) are now propagated instead of being logged and swallowed. Layer 5 — Pre-response transparent retry: If the stream fails before any data is sent to the client, return an error to the conductor so it retries with a different auth — fully transparent to the client. Layer 6 — Post-response error logging: If the stream fails after data was already sent, log a warning. The conductor's existing cooldown mechanism ensures the next request routes to a healthy account. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/auth/cursor/proto/h2stream.go | 4 + internal/runtime/executor/cursor_executor.go | 147 ++++++++++++++++--- 2 files changed, 128 insertions(+), 23 deletions(-) diff --git a/internal/auth/cursor/proto/h2stream.go b/internal/auth/cursor/proto/h2stream.go index be3f7905..45b5baf7 100644 --- a/internal/auth/cursor/proto/h2stream.go +++ b/internal/auth/cursor/proto/h2stream.go @@ -205,6 +205,10 @@ func (s *H2Stream) Data() <-chan []byte { return s.dataCh } // Done returns a channel closed when the stream ends. func (s *H2Stream) Done() <-chan struct{} { return s.doneCh } +// Err returns the error (if any) that caused the stream to close. +// Returns nil for a clean shutdown (EOF / StreamEnded). +func (s *H2Stream) Err() error { return s.err } + // Close tears down the connection. func (s *H2Stream) Close() { s.conn.Close() diff --git a/internal/runtime/executor/cursor_executor.go b/internal/runtime/executor/cursor_executor.go index 67987e7f..2f34ee05 100644 --- a/internal/runtime/executor/cursor_executor.go +++ b/internal/runtime/executor/cursor_executor.go @@ -51,6 +51,7 @@ type CursorExecutor struct { type savedCheckpoint struct { data []byte // raw ConversationStateStructure protobuf bytes blobStore map[string][]byte // blobs referenced by the checkpoint + authID string // auth that produced this checkpoint (checkpoint is auth-specific) updatedAt time.Time } @@ -126,6 +127,19 @@ func (e *CursorExecutor) cleanupLoop() { } } +// findSessionByConversationLocked searches for a session matching the given +// conversationId regardless of authID. Used to find and clean up stale sessions +// from a previous auth after quota failover. Caller must hold e.mu. +func (e *CursorExecutor) findSessionByConversationLocked(convId string) string { + suffix := ":" + convId + for k := range e.sessions { + if strings.HasSuffix(k, suffix) { + return k + } + } + return "" +} + // PrepareRequest implements ProviderExecutor (for HttpRequest support). func (e *CursorExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error { token := cursorAccessToken(auth) @@ -250,7 +264,7 @@ func (e *CursorExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r // Collect full text from streaming response var fullText strings.Builder - processH2SessionFrames(sessionCtx, stream, params.BlobStore, nil, + if streamErr := processH2SessionFrames(sessionCtx, stream, params.BlobStore, nil, func(text string, isThinking bool) { fullText.WriteString(text) }, @@ -258,7 +272,9 @@ func (e *CursorExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r nil, nil, // tokenUsage - non-streaming nil, // onCheckpoint - non-streaming doesn't persist - ) + ); streamErr != nil && fullText.Len() == 0 { + return resp, fmt.Errorf("cursor: stream error: %w", streamErr) + } id := "chatcmpl-" + uuid.New().String()[:28] created := time.Now().Unix() @@ -324,9 +340,10 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A authID := auth.ID // e.g. "cursor.json" or "cursor-account2.json" log.Debugf("cursor: conversationId=%s authID=%s", conversationId, authID) - // Include authID in keys for multi-account isolation + // Session key includes authID (H2 stream is auth-specific, not transferable). + // Checkpoint key uses conversationId only — allows detecting auth migration. sessionKey := authID + ":" + conversationId - checkpointKey := sessionKey // same isolation + checkpointKey := conversationId needsTranslate := from.String() != "" && from.String() != "openai" // Check if we can resume an existing session with tool results @@ -336,6 +353,20 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A if hasSession { delete(e.sessions, sessionKey) } + // If no session found for current auth, check for stale sessions from + // a different auth on the same conversation (quota failover scenario). + // Clean them up since the H2 stream belongs to the old account. + if !hasSession { + if oldKey := e.findSessionByConversationLocked(conversationId); oldKey != "" { + oldSession := e.sessions[oldKey] + log.Infof("cursor: cleaning up stale session from auth %s for conv=%s (auth migrated to %s)", oldSession.authID, conversationId, authID) + oldSession.cancel() + if oldSession.stream != nil { + oldSession.stream.Close() + } + delete(e.sessions, oldKey) + } + } e.mu.Unlock() if hasSession && session.stream != nil && session.authID == authID { @@ -347,23 +378,33 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A } } - // Clean up any stale session for this key + // Clean up any stale session for this key (or from a previous auth on same conversation) e.mu.Lock() if old, ok := e.sessions[sessionKey]; ok { old.cancel() delete(e.sessions, sessionKey) + } else if oldKey := e.findSessionByConversationLocked(conversationId); oldKey != "" { + old := e.sessions[oldKey] + old.cancel() + if old.stream != nil { + old.stream.Close() + } + delete(e.sessions, oldKey) } e.mu.Unlock() - // Look up saved checkpoint for this conversation + account + // Look up saved checkpoint for this conversation (keyed by conversationId only). + // Checkpoint is auth-specific: if auth changed (e.g. quota exhaustion failover), + // the old checkpoint is useless on the new account — discard and flatten. e.mu.Lock() saved, hasCheckpoint := e.checkpoints[checkpointKey] e.mu.Unlock() params := buildRunRequestParams(parsed, conversationId) - if hasCheckpoint && saved.data != nil { - log.Debugf("cursor: using saved checkpoint (%d bytes) for key=%s", len(saved.data), checkpointKey) + if hasCheckpoint && saved.data != nil && saved.authID == authID { + // Same auth — use checkpoint normally + log.Debugf("cursor: using saved checkpoint (%d bytes) for conv=%s auth=%s", len(saved.data), checkpointKey, authID) params.RawCheckpoint = saved.data // Merge saved blobStore into params if params.BlobStore == nil { @@ -374,6 +415,17 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A params.BlobStore[k] = v } } + } else if hasCheckpoint && saved.data != nil && saved.authID != authID { + // Auth changed (quota failover) — checkpoint is not portable across accounts. + // Discard and flatten conversation history into userText. + log.Infof("cursor: auth migrated (%s → %s) for conv=%s, discarding checkpoint and flattening context", saved.authID, authID, checkpointKey) + e.mu.Lock() + delete(e.checkpoints, checkpointKey) + e.mu.Unlock() + if len(parsed.ToolResults) > 0 || len(parsed.Turns) > 0 { + flattenConversationIntoUserText(parsed) + params = buildRunRequestParams(parsed, conversationId) + } } else if len(parsed.ToolResults) > 0 || len(parsed.Turns) > 0 { // Fallback: no checkpoint available (cold resume / proxy restart). // Flatten the full conversation history (including tool interactions) into userText. @@ -458,6 +510,21 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A } } + // Pre-response error detection for transparent failover: + // If the stream fails before any chunk is emitted (e.g. quota exceeded), + // ExecuteStream returns an error so the conductor retries with a different auth. + streamErrCh := make(chan error, 1) + firstChunkSent := make(chan struct{}, 1) // buffered: goroutine won't block signaling + + origEmitToOut := emitToOut + emitToOut = func(chunk cliproxyexecutor.StreamChunk) { + select { + case firstChunkSent <- struct{}{}: + default: + } + origEmitToOut(chunk) + } + go func() { var resumeOutCh chan cliproxyexecutor.StreamChunk _ = resumeOutCh @@ -466,7 +533,7 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A usage := &cursorTokenUsage{} usage.setInputEstimate(len(payload)) - processH2SessionFrames(sessionCtx, stream, params.BlobStore, params.McpTools, + streamErr := processH2SessionFrames(sessionCtx, stream, params.BlobStore, params.McpTools, func(text string, isThinking bool) { if isThinking { if !thinkingActive { @@ -537,19 +604,43 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A toolResultCh, usage, func(cpData []byte) { - // Save checkpoint for this conversation + // Save checkpoint keyed by conversationId, tagged with authID for migration detection e.mu.Lock() e.checkpoints[checkpointKey] = &savedCheckpoint{ data: cpData, blobStore: params.BlobStore, + authID: authID, updatedAt: time.Now(), } e.mu.Unlock() - log.Debugf("cursor: saved checkpoint (%d bytes) for key=%s", len(cpData), checkpointKey) + log.Debugf("cursor: saved checkpoint (%d bytes) for conv=%s auth=%s", len(cpData), checkpointKey, authID) }, ) - // processH2SessionFrames returned — stream is done + // processH2SessionFrames returned — stream is done. + // Check if error happened before any chunks were emitted. + if streamErr != nil { + select { + case <-firstChunkSent: + // Chunks were already sent to client — can't transparently retry. + // Next request will failover via conductor's cooldown mechanism. + log.Warnf("cursor: stream error after data sent (auth=%s conv=%s): %v", authID, conversationId, streamErr) + default: + // No data sent yet — propagate error for transparent conductor retry. + log.Warnf("cursor: stream error before data sent (auth=%s conv=%s): %v — signaling retry", authID, conversationId, streamErr) + streamErrCh <- streamErr + outMu.Lock() + if currentOut != nil { + close(currentOut) + currentOut = nil + } + outMu.Unlock() + sessionCancel() + stream.Close() + return + } + } + if thinkingActive { sendChunkSwitchable(`{"content":""}`, "") } @@ -584,7 +675,16 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A stream.Close() }() - return &cliproxyexecutor.StreamResult{Chunks: chunks}, nil + // Wait for either the first chunk or a pre-response error. + // If the stream fails before emitting any data (e.g. quota exceeded), + // return an error so the conductor retries with a different auth. + select { + case streamErr := <-streamErrCh: + return nil, fmt.Errorf("cursor: stream failed before response: %w", streamErr) + case <-firstChunkSent: + // Data started flowing — return stream to client + return &cliproxyexecutor.StreamResult{Chunks: chunks}, nil + } } // resumeWithToolResults injects tool results into the running processH2SessionFrames @@ -701,7 +801,7 @@ func processH2SessionFrames( toolResultCh <-chan []toolResultInfo, // nil for no tool result injection; non-nil to wait for results tokenUsage *cursorTokenUsage, // tracks accumulated token usage (may be nil) onCheckpoint func(data []byte), // called when server sends conversation_checkpoint_update -) { +) error { var buf bytes.Buffer rejectReason := "Tool not available in this environment. Use the MCP tools provided instead." log.Debugf("cursor: processH2SessionFrames started for streamID=%s, waiting for data...", stream.ID()) @@ -709,11 +809,11 @@ func processH2SessionFrames( select { case <-ctx.Done(): log.Debugf("cursor: processH2SessionFrames exiting: context done") - return + return ctx.Err() case data, ok := <-stream.Data(): if !ok { log.Debugf("cursor: processH2SessionFrames[%s]: exiting: stream data channel closed", stream.ID()) - return + return stream.Err() // may be RST_STREAM, GOAWAY, or nil for clean close } // Log first 20 bytes of raw data for debugging previewLen := min(20, len(data)) @@ -740,6 +840,7 @@ func processH2SessionFrames( if flags&cursorproto.ConnectEndStreamFlag != 0 { if err := cursorproto.ParseConnectEndStream(payload); err != nil { log.Warnf("cursor: connect end stream error: %v", err) + return err // propagate server-side errors (quota, rate limit, etc.) } continue } @@ -765,7 +866,7 @@ func processH2SessionFrames( case cursorproto.ServerMsgTurnEnded: log.Debugf("cursor: TurnEnded received, stream will finish") - return + return nil // clean completion case cursorproto.ServerMsgHeartbeat: // Server heartbeat, ignore silently @@ -818,7 +919,7 @@ func processH2SessionFrames( onMcpExec(pending) if toolResultCh == nil { - return + return nil } // Inline mode: wait for tool result while handling KV/heartbeat @@ -828,16 +929,16 @@ func processH2SessionFrames( for { select { case <-ctx.Done(): - return + return ctx.Err() case results, ok := <-toolResultCh: if !ok { - return + return nil } toolResults = results break waitLoop case waitData, ok := <-stream.Data(): if !ok { - return + return stream.Err() } buf.Write(waitData) for { @@ -875,7 +976,7 @@ func processH2SessionFrames( } } case <-stream.Done(): - return + return stream.Err() } } @@ -916,7 +1017,7 @@ func processH2SessionFrames( case <-stream.Done(): log.Debugf("cursor: processH2SessionFrames exiting: stream done") - return + return stream.Err() } } }