Files
pocketpaw/ee/cloud/shared/chat_persistence.py
Rohit Kushwaha ff5bb0350a fix: resolve all 394 ruff lint errors
- Auto-fix 155 errors (import sorting, annotations, deprecated imports)
- Format 87 files with ruff format for line length compliance
- Fix 15 F401 unused imports (add __all__ for re-exports, remove truly unused)
- Fix 7 F841 unused variables (prefix with _)
- Fix 2 F821 undefined names (add missing imports)
- Fix 3 E402 module-level imports not at top
- Fix 2 UP042 str+Enum → StrEnum
- Fix 1 E712 == False comparison
- Fix remaining 51 E501 line-too-long in string literals and expressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 10:08:58 +05:30

186 lines
6.0 KiB
Python

"""Chat persistence bridge — saves runtime WebSocket messages to MongoDB.
Subscribes to the message bus outbound channel to persist agent responses.
User messages are persisted via save_user_message() called from the WS adapter.
This ensures all chat history is in MongoDB regardless of chat system.
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime
logger = logging.getLogger(__name__)
# Track active sessions: websocket chat_id → cloud session info
_active_sessions: dict[str, dict] = {}
# Accumulate streaming chunks per session
_stream_buffers: dict[str, str] = {}
def register_chat_persistence() -> None:
"""Subscribe to the message bus to persist outbound messages to MongoDB."""
try:
from pocketpaw.bus.queue import get_bus
bus = get_bus()
if bus is None:
logger.debug("Message bus not available, chat persistence not registered")
return
from pocketpaw.bus.events import Channel
bus.subscribe_outbound(Channel.WEBSOCKET, _on_outbound_message)
logger.info("Chat persistence bridge registered")
except Exception:
logger.debug("Failed to register chat persistence", exc_info=True)
async def save_user_message(chat_id: str, content: str) -> None:
"""Called by the WebSocket adapter to persist a user message."""
try:
session_info = await _ensure_cloud_session(chat_id)
if not session_info:
return
from ee.cloud.models.message import Message
msg = Message(
group=session_info["group_id"],
sender=session_info["user_id"],
sender_type="user",
content=content,
)
await msg.insert()
except Exception:
logger.debug("Failed to persist user message", exc_info=True)
async def _on_outbound_message(message) -> None:
"""Accumulate agent stream chunks and save final message to MongoDB."""
try:
chat_id = message.chat_id
if message.is_stream_chunk:
_stream_buffers[chat_id] = _stream_buffers.get(chat_id, "") + (message.content or "")
return
if message.is_stream_end:
full_text = _stream_buffers.pop(chat_id, "")
if not full_text.strip():
return
session_info = await _ensure_cloud_session(chat_id)
if not session_info:
return
from ee.cloud.models.message import Message
msg = Message(
group=session_info["group_id"],
sender=None,
sender_type="agent",
content=full_text,
)
await msg.insert()
# Touch session activity
from ee.cloud.models.session import Session
session_doc = await Session.find_one(Session.sessionId == f"websocket_{chat_id}")
if session_doc:
session_doc.lastActivity = datetime.now(UTC)
session_doc.messageCount += 1
await session_doc.save()
return
# Non-streaming content accumulation
if message.content and not message.is_stream_chunk:
_stream_buffers[chat_id] = _stream_buffers.get(chat_id, "") + (message.content or "")
except Exception:
logger.debug("Failed to persist outbound message", exc_info=True)
async def _ensure_cloud_session(chat_id: str) -> dict | None:
"""Find or create a cloud session + group for a runtime WebSocket chat."""
if chat_id in _active_sessions:
return _active_sessions[chat_id]
try:
from ee.cloud.models.group import Group
from ee.cloud.models.session import Session
session_id = f"websocket_{chat_id}"
# Check if session already exists
session = await Session.find_one(Session.sessionId == session_id)
if session and session.group:
info = {
"session_id": str(session.id),
"group_id": session.group,
"user_id": session.owner,
}
_active_sessions[chat_id] = info
return info
# No session yet — we need a workspace context
# Try to get the first available workspace
from ee.cloud.models.user import User
users = await User.find({"workspaces": {"$ne": []}}).limit(1).to_list()
if not users:
return None
user = users[0]
workspace_id = user.workspaces[0].workspace if user.workspaces else None
if not workspace_id:
return None
user_id = str(user.id)
# Create a runtime chat group if needed
if not session:
# Create group for this runtime session
group = Group(
workspace=workspace_id,
name="PocketPaw Chat",
type="dm",
members=[user_id],
owner=user_id,
)
await group.insert()
session = Session(
sessionId=session_id,
workspace=workspace_id,
owner=user_id,
title="PocketPaw Chat",
group=str(group.id),
)
await session.insert()
info = {"session_id": str(session.id), "group_id": str(group.id), "user_id": user_id}
else:
# Session exists but no group — create one
group = Group(
workspace=workspace_id,
name="PocketPaw Chat",
type="dm",
members=[user_id],
owner=user_id,
)
await group.insert()
session.group = str(group.id)
await session.save()
info = {"session_id": str(session.id), "group_id": str(group.id), "user_id": user_id}
_active_sessions[chat_id] = info
logger.info(
"Created cloud session for runtime chat: %s → group %s", session_id, info["group_id"]
)
return info
except Exception:
logger.debug("Failed to ensure cloud session for %s", chat_id, exc_info=True)
return None