DocsGPT/tests/test_cache.py
Alex e351f45d88 Feat notification system (#2472)
* feat: SSE notification system

Adds a per-user SSE pipe (GET /api/events) plus a per-message
chat-stream reconnect endpoint (GET /api/messages/<id>/events).

Backend substrate:
- application/events/ — durable journal (Redis Streams) + live
  pub/sub for user-scoped events, with publish_user_event() as
  the worker-side entrypoint.
- application/streaming/ — broadcast_channel for pub/sub fanout
  and event_replay for the per-message snapshot+tail path.
- application/storage/db/repositories/message_events.py +
  alembic 0007 — Postgres journal for chat-stream events.
- application/worker.py — ingest/reingest/remote/connector/
  attachment/mcp_oauth tasks publish queued/progress/completed/
  failed envelopes alongside their existing status updates.

Frontend client:
- frontend/src/events/ — connect/reconnect, Last-Event-ID cursor,
  backoff with jitter. Each tab runs its own connection; no
  cross-tab dedup (future work).
- frontend/src/notifications/ — recentEvents ring, cursor
  tracking, tool-approval toast.
- frontend/src/upload/uploadSlice.ts — extraReducers for
  source.ingest.* and attachment.* events.
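The "backoff with jitter" reconnect policy mentioned for frontend/src/events/ can be illustrated as capped exponential backoff with full jitter. This is a minimal Python sketch of that technique, not the TypeScript client. The function name, the 1 s base and the 30 s cap are assumptions.

```python
import random
from typing import Optional


def reconnect_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                    rng: Optional[random.Random] = None) -> float:
    """Seconds to wait before reconnect attempt `attempt` (0-based).

    Hypothetical helper: capped exponential backoff with "full jitter", i.e. a
    uniform draw from [0, min(cap, base * 2**attempt)].
    """
    if attempt < 0:
        raise ValueError("attempt must be >= 0")
    # Clamp the exponent so very large attempt counts cannot overflow a float.
    ceiling = min(cap, base * (2 ** min(attempt, 30)))
    # Randomising over the whole interval keeps many tabs from retrying in lockstep.
    source = rng if rng is not None else random
    return source.uniform(0.0, ceiling)
```

Because each tab runs its own connection, the jitter is what spreads a burst of simultaneous disconnects (for example, a server restart) over time instead of producing a synchronized reconnect storm.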

Coverage: 132 SSE tests across events substrate, replay, journal,
routes, and worker publishes.

* refactor(attachments): remove polling, SSE-only

frontend/src/components/MessageInput.tsx no longer runs a 2s
setInterval against getTaskStatus for every processing
attachment. The attachment.* SSE reducers in uploadSlice.ts are
now the sole driver of attachment state transitions.

* feat(connector): consume source.ingest.* SSE, remove polling

frontend/src/components/ConnectorTree.tsx now mirrors FileTree's
slice-walking pattern: it watches notifications.recentEvents
for source.ingest.{completed,failed} envelopes matching the
sync's source id, and no longer polls /task_status every 2s.

* refactor(source-ingest): remove polling, SSE-only

frontend/src/upload/Upload.tsx and
frontend/src/components/FileTree.tsx no longer run getTaskStatus
polling fallbacks. The source.ingest.* SSE reducers in
uploadSlice.ts and FileTree's slice walk are now the sole
drivers of upload/reingest state transitions.

* refactor(mcp-oauth): carry authorization_url in SSE, remove polling

application/worker.py::mcp_oauth now publishes
authorization_url on the mcp.oauth.awaiting_redirect envelope.
frontend/src/modals/MCPServerModal.tsx consumes it from SSE
instead of polling /oauth_status/<task_id> every 1s.

The URL is generated inside DocsGPTOAuth.redirect_handler when
the FastMCP client triggers OAuth. The worker now plumbs a
publish callback through tool_config -> MCPTool -> DocsGPTOAuth
so the awaiting_redirect publish fires from inside the handler
at the exact point the URL becomes known. The legacy Redis
mcp_oauth_status setex writes and the GET
/api/mcp_server/oauth_status/<task_id> endpoint are kept as
belt-and-suspenders; nothing in the frontend reads them now.

* feat(source-ingest): plumb limited flag through SSE for token-cap UX

application/worker.py::ingest_worker and remote_worker now publish
``limited: bool`` on the source.ingest.completed envelope.
uploadSlice routes ``payload.limited === true`` to a failed status
with a ``tokenLimitReached`` flag, and UploadToast surfaces the
translated tokenLimit i18n string. No worker code path sets
limited=true today; this is a forward-looking contract so when
token-cap detection lands, the UX is already wired.

* refactor(mcp-oauth): read status from SSE journal, drop polling endpoint

MCPOAuthManager.get_oauth_status now walks the per-user SSE Streams
journal (user:{user_id}:stream) for the latest mcp.oauth.* envelope
matching the task id, returning the status string derived from the
event type suffix and the payload fields. The worker is the single
source of truth — its publish_user_event calls write the same
record the SSE client receives live.
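The journal walk described above (newest-first entries, first `mcp.oauth.*` envelope whose task id matches, status taken from the event-type suffix) might look roughly like this. The sketch assumes each entry is `(stream_id, fields)` with the envelope JSON stored under a hypothetical `fields["event"]` key. The real field layout and return shape of `get_oauth_status` are not shown in this commit.

```python
import json
from typing import Iterable, Optional, Tuple

OAUTH_PREFIX = "mcp.oauth."


def latest_oauth_status(entries: Iterable[Tuple[str, dict]],
                        task_id: str) -> Optional[dict]:
    """Return {**payload, "status": suffix} for the newest matching envelope.

    `entries` is assumed to be newest-first, as XREVRANGE returns them.
    """
    for _stream_id, fields in entries:
        try:
            envelope = json.loads(fields["event"])
        except (KeyError, TypeError, ValueError):
            continue  # skip entries that don't hold a JSON envelope
        if not isinstance(envelope, dict):
            continue
        event_type = envelope.get("type") or ""
        payload = envelope.get("payload")
        if not isinstance(payload, dict):
            payload = {}
        if not event_type.startswith(OAUTH_PREFIX):
            continue
        if payload.get("task_id") != task_id:
            continue
        # e.g. "mcp.oauth.awaiting_redirect" -> "awaiting_redirect"
        return {**payload, "status": event_type[len(OAUTH_PREFIX):]}
    return None
```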

Removed:
- /api/mcp_server/oauth_status/<task_id> route in
  application/api/user/tools/mcp.py
- mcp_oauth_status worker function and mcp_oauth_status_task Celery
  wrapper
- All mcp_oauth_status:{task_id} Redis setex writes (4 in mcp_oauth,
  2 in DocsGPTOAuth.redirect_handler / callback_handler)
- The update_status closure in mcp_oauth that wrote the polling
  payload

Tests updated:
- get_oauth_status now takes (task_id, user_id); new coverage walks
  a fake xrevrange response for the completed envelope, the no-match
  case, and a Redis-down case
- Removed TestMCPOAuthStatus route tests and TestMcpOauthStatusTask
  celery-wrapper test
- Removed the two oauth_status methods from the integration runner

mcp_oauth:auth_url/state/code/error Redis keys remain — they are
the OAuth flow's own state (not the dropped polling payload).

* chore(mcp-oauth): delete orphaned getMCPOAuthStatus client

The /api/mcp_server/oauth_status/<task_id> endpoint was removed in
the prior commit; the corresponding userService method and the
MCP_OAUTH_STATUS endpoint constant had no remaining callers in the
frontend, so they're deleted along with it.

* fix(events): drop live publish when journal write fails

application/events/publisher.py returned an envelope to live
pubsub subscribers even when the XADD to the durable journal
failed. The envelope had no ``id`` field, which bypassed the SSE
route's dedup floor and broke ``Last-Event-ID`` semantics for any
reconnecting client.

Best-effort delivery means dropping consistently, not delivering
inconsistent state. Now: if the journal write fails the publisher
returns None and skips the live publish entirely.
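The journal-first ordering can be sketched with the Redis calls injected as plain callables. Here `journal_append` stands in for XADD and `live_publish` stands in for PUBLISH. Both names, and `publish_envelope` itself, are hypothetical; this is not the code in application/events/publisher.py.

```python
import json
import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def publish_envelope(envelope: dict,
                     journal_append: Callable[[str], str],
                     live_publish: Callable[[str], None]) -> Optional[str]:
    """Journal first, then publish live; return the journal id or None."""
    try:
        body = json.dumps(envelope)
    except (TypeError, ValueError):
        logger.warning("envelope not serialisable; dropping")
        return None
    try:
        entry_id = journal_append(body)
    except Exception:
        # No durable id means no dedup floor and no Last-Event-ID resume,
        # so skip the live publish instead of sending an id-less frame.
        logger.warning("journal write failed; skipping live publish")
        return None
    try:
        live_publish(json.dumps({**envelope, "id": entry_id}))
    except Exception:
        # Live delivery is best-effort; the journal still holds the event.
        logger.warning("live publish failed; event remains in journal")
    return entry_id
```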

* fix(notifications): dedupe sseEventReceived against immediate dupes

Snapshot replay + live tail can both deliver the same id when the
live pubsub frame and the replay XRANGE overlap. The route's own
dedup floor catches the common case, but consumers walking
``recentEvents`` (FileTree, ConnectorTree, MCPServerModal,
ToolApprovalToast) would otherwise act on the same envelope
twice when a duplicate slipped through.

Belt-and-suspenders: short-circuit when the most recent id in
the ring matches the incoming one.
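The short-circuit can be modelled as a bounded ring that compares only against its newest entry. This is a Python stand-in for the frontend slice. The class name and the default capacity of 100 are assumptions.

```python
from collections import deque
from typing import Deque, List, Optional


class RecentEvents:
    """Bounded ring of recent envelopes that drops immediate duplicates."""

    def __init__(self, capacity: int = 100) -> None:
        self._ring: Deque[dict] = deque(maxlen=capacity)

    def receive(self, envelope: dict) -> bool:
        """Append `envelope` unless its id equals the newest one; True if kept."""
        event_id: Optional[str] = envelope.get("id")
        if event_id is not None and self._ring and self._ring[-1].get("id") == event_id:
            return False  # replay/live overlap delivered the same id twice in a row
        self._ring.append(envelope)  # deque(maxlen=...) evicts the oldest entry
        return True

    def ids(self) -> List[Optional[str]]:
        return [e.get("id") for e in self._ring]
```

Only the newest id is compared. A duplicate that arrives after some other event still gets through, which matches the "immediate dupes" scope; the route's dedup floor is meant to cover the rest.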

* fix(events): skip replay budget INCR when no snapshot work possible

_allow_replay incremented the per-user counter on every
/api/events GET, including no-op connects from a fresh client
with no cursor against an empty backlog. React StrictMode dev
double-mounts plus a few tabs trivially tripped the default
30-per-60s budget on idle reconnects.

XLEN pre-check: when last_event_id is None and the user stream
is empty, the connect can't do snapshot work — return True
without INCR. Cursor-bearing connects still INCR unconditionally
(probing the cursor's relationship to stream contents would
require a redundant XRANGE).
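A minimal sketch of the XLEN pre-check in front of a fixed-window INCR budget follows. The `user:{user_id}:stream` key comes from this commit. The `replay_budget:{user_id}` key name and the tiny in-memory client are assumptions, so the example runs without Redis.

```python
from typing import Dict, Optional


class FakeRedis:
    """In-memory stand-in exposing only xlen/incr/expire."""

    def __init__(self, stream_lengths: Optional[Dict[str, int]] = None) -> None:
        self.streams = dict(stream_lengths or {})
        self.counters: Dict[str, int] = {}
        self.ttls: Dict[str, int] = {}

    def xlen(self, key: str) -> int:
        return self.streams.get(key, 0)

    def incr(self, key: str) -> int:
        self.counters[key] = self.counters.get(key, 0) + 1
        return self.counters[key]

    def expire(self, key: str, seconds: int) -> bool:
        self.ttls[key] = seconds
        return True


def allow_replay(r, user_id: str, last_event_id: Optional[str],
                 limit: int = 30, window_s: int = 60) -> bool:
    # No cursor and an empty journal: the connect cannot do snapshot work,
    # so it should not spend replay budget.
    if last_event_id is None and r.xlen(f"user:{user_id}:stream") == 0:
        return True
    budget_key = f"replay_budget:{user_id}"  # hypothetical key name
    count = r.incr(budget_key)
    if count == 1:
        r.expire(budget_key, window_s)  # the window starts on the first hit
    return count <= limit
```

INCR followed by a separate EXPIRE is not atomic. A production version may prefer a pipeline or a Lua script so a crash between the two calls cannot leave a counter without a TTL.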

* fix(streaming): tighten journal contract + recover from seq collisions

Two related fixes to application/streaming/message_journal.py.

1. record_event now rejects non-dict payloads at the gate. The
   live path (base.py::_emit) wrapped non-dicts as
   {"value": payload}; the replay path in event_replay synthesized
   {"type": event_type}. A reconnecting client would receive a
   different envelope than the one originally streamed. Now both
   paths see byte-identical envelopes because non-dicts can't be
   journaled at all. The corresponding event_replay fallback is
   replaced with a warn-and-skip for any legacy rows.

2. record_event handles IntegrityError on (message_id, sequence_no)
   collisions by reading latest_sequence_no and retrying once with
   latest+1. The most likely cause is a stale seq seed on a
   continuation retry where the route read MAX(seq) from a
   separate connection before another writer committed past it.
   Previously the error was swallowed and the event silently
   dropped from the journal; now it lands at the next available
   seq. The live pubsub publish uses the materialised seq so the
   journal row and the live frame agree.
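Both rules can be sketched against an in-memory repository that enforces a unique `(message_id, sequence_no)` pair. `IntegrityError` here is a local stand-in for the database driver's exception. Raising `TypeError` for non-dict payloads is an assumption; the real wrapper may log and return instead.

```python
from typing import Any, Dict, Tuple


class IntegrityError(Exception):
    """Stand-in for a unique-constraint violation."""


class InMemoryEventRepo:
    """Hypothetical repo with a unique (message_id, sequence_no) constraint."""

    def __init__(self) -> None:
        self.rows: Dict[Tuple[str, int], dict] = {}

    def record(self, message_id: str, sequence_no: int, payload: dict) -> None:
        key = (message_id, sequence_no)
        if key in self.rows:
            raise IntegrityError(f"duplicate {key}")
        self.rows[key] = payload

    def latest_sequence_no(self, message_id: str) -> int:
        return max((seq for (mid, seq) in self.rows if mid == message_id), default=-1)


def record_event(repo, message_id: str, sequence_no: int, payload: Any) -> int:
    """Journal one event; return the sequence number it actually landed at."""
    if not isinstance(payload, dict):
        # Non-dicts are refused so live and replay paths see identical envelopes.
        raise TypeError("journal payloads must be dicts")
    try:
        repo.record(message_id, sequence_no, payload)
        return sequence_no
    except IntegrityError:
        # Stale seq seed: another writer committed past us. Retry exactly once.
        retry_seq = repo.latest_sequence_no(message_id) + 1
        repo.record(message_id, retry_seq, payload)
        return retry_seq
```

The caller would use the returned seq for the live pubsub frame, which keeps the journal row and the live frame in agreement.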

* perf(streaming): batch message_events INSERTs per stream

complete_stream previously opened a fresh db_session() per yielded
event, doing one Postgres INSERT + commit per chunk on the WSGI
thread. Streaming answers emit hundreds of answer chunks per response,
so the route was paying one PG roundtrip per chunk, each serialized
on commit latency.

New BatchedJournalWriter in application/streaming/message_journal.py
accumulates rows per stream and flushes on three triggers:
- size: buffer reaches 16 entries
- time: 100ms elapsed since the last flush
- lifecycle: close() at end-of-stream

Live pubsub publishes still fire synchronously per record(), so
subscribers see events in real time — only the durable journal write
is amortized. On bulk INSERT IntegrityError the writer falls back to
per-row record() with the existing seq+1 retry so a single colliding
seq doesn't drop the rest of the batch.

complete_stream wires journal_writer.close() into every exit path
(happy end, tool-approval-paused end, GeneratorExit, error handler)
so the terminal event is committed before the generator returns —
otherwise a reconnecting client could snapshot up to the last flush
boundary and live-tail waiting for an end that's still in memory.

Repository gets bulk_record() — one SQLAlchemy executemany INSERT
for the bulk path. All-or-nothing on collision (Postgres aborts the
whole batch); the writer's per-row fallback handles recovery.
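The three flush triggers and the per-row fallback can be sketched as a small buffered writer with an injectable clock. `bulk_insert` and `insert_one` stand in for `bulk_record()` and the per-row `record()` path. The class name is hypothetical. The real writer is described as reacting to `IntegrityError` specifically, whereas this sketch catches any exception from the bulk call.

```python
import time
from typing import Callable, List, Tuple

Row = Tuple[str, int, dict]  # (message_id, sequence_no, payload)


class BatchedJournalWriterSketch:
    """Buffers rows; flushes at max_rows, after max_age_s, or on close()."""

    def __init__(self, bulk_insert: Callable[[List[Row]], None],
                 insert_one: Callable[[Row], None],
                 max_rows: int = 16, max_age_s: float = 0.1,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._bulk_insert = bulk_insert
        self._insert_one = insert_one
        self._max_rows = max_rows
        self._max_age_s = max_age_s
        self._clock = clock
        self._buffer: List[Row] = []
        self._last_flush = clock()

    def record(self, row: Row) -> None:
        # A live pub/sub publish would fire here synchronously; only the
        # durable write is batched.
        self._buffer.append(row)
        if (len(self._buffer) >= self._max_rows
                or self._clock() - self._last_flush >= self._max_age_s):
            self.flush()

    def flush(self) -> None:
        rows, self._buffer = self._buffer, []
        self._last_flush = self._clock()
        if not rows:
            return
        try:
            self._bulk_insert(rows)  # all-or-nothing
        except Exception:
            # One colliding row aborts the batch; retry row by row so the rest
            # still land (insert_one is assumed to own seq-collision recovery).
            for row in rows:
                self._insert_one(row)

    def close(self) -> None:
        self.flush()  # the terminal event must be durable before the stream ends
```

In this sketch the 100 ms trigger is checked only when a row arrives, so a stream that goes quiet keeps its tail buffered until the next `record()` or `close()`. That is one reason every exit path needs to call `close()`.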

* chore(upload): drop dead UploadTask.lastEventAt field

The lastEventAt field on UploadTask had no remaining consumers — the
matching Attachment.lastEventAt was cleaned up earlier. Remove the
field declaration and the slice write site.

* chore(frontend): drop orphaned getTaskStatus client

After the polling-removal sweep no caller in frontend/src/ references
userService.getTaskStatus or endpoints.USER.TASK_STATUS. The backend
route /api/task_status itself stays — agents, webhooks, e2e specs,
and the public docs still depend on it.

* docs(repo): remove stale planning docs from repo root

notification-channel-design.md, plan.md, and reminder-tool-design.md
were leftover Claude planning artifacts from the SSE substrate work
that landed accidentally. CLAUDE.md prohibits creating planning docs
unless asked — delete them.

* docs(message-events): clarify repo vs wrapper payload contract

MessageEventsRepository.record accepts any JSONB-compatible value; the
streaming wrapper record_event tightens this to dicts only because the
live and replay paths reconstruct non-dict payloads differently. Spell
the split out so the next reader of the repo method doesn't assume the
wrapper's contract applies here.

* refactor(events): raise on malformed stream id instead of lex fallback

stream_id_compare's lex-fallback branch was a footgun: a malformed id
that sorts lex-greater than a real one would pin live-tail dedup
forever, dropping every subsequent legitimate event silently. Both
current callers in application/api/events/routes.py pre-validate
inputs against _STREAM_ID_RE before calling, so changing the function
to raise ValueError is a no-op on the happy path and turns the future-
caller footgun into a loud failure.
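A Redis stream id has the form `<milliseconds>-<sequence>`, and both parts must be compared numerically. A minimal sketch of a comparator that raises instead of falling back to string order follows. The regex is an assumption and is not the `_STREAM_ID_RE` from routes.py.

```python
import re
from typing import Tuple

_ID_RE = re.compile(r"([0-9]+)-([0-9]+)")  # hypothetical; matched with fullmatch


def _parse(stream_id: str) -> Tuple[int, int]:
    m = _ID_RE.fullmatch(stream_id) if isinstance(stream_id, str) else None
    if m is None:
        raise ValueError(f"malformed stream id: {stream_id!r}")
    return int(m.group(1)), int(m.group(2))


def stream_id_compare(a: str, b: str) -> int:
    """Return -1, 0 or 1 by numeric (ms, seq) order; ValueError if malformed."""
    pa, pb = _parse(a), _parse(b)
    return (pa > pb) - (pa < pb)
```

The lexical trap is easy to hit. `"1-2" > "1-10"` is true as strings but false as ids, and a garbage string such as `"zzz"` would outrank every real id.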

* test(tasks): cover cleanup_message_events task body

Adds skipped-when-no-POSTGRES_URI and happy-path coverage for the
Celery janitor. The skipped path returns the documented short-circuit
shape without touching the repo. The happy path seeds a backdated
row, runs the task against the pg_conn fixture, and asserts the
retention window's row is deleted while in-window rows survive.
Mirrors the TestCleanupPendingToolState pattern.

* fix(notifications): treat /c/new as no current conversation

useMatch('/c/:conversationId') treats the literal URL /c/new as a
real conversation id, so the toast suppression check confused
'user is on /c/new' with 'user is on the conversation needing
approval'. Explicit guard: when the matched id is 'new', fall
through to the no-match case so approval toasts still surface.
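The guard amounts to normalising the matched route param before comparing it with the conversation that needs approval. This is a Python model of that logic, not the React code. Both function names are hypothetical.

```python
from typing import Optional


def current_conversation_id(matched_id: Optional[str]) -> Optional[str]:
    """Treat the literal /c/new route as 'no current conversation'."""
    if matched_id is None or matched_id == "new":
        return None
    return matched_id


def should_show_approval_toast(matched_id: Optional[str],
                               approval_conversation_id: str) -> bool:
    # Suppress the toast only when the user is already viewing that conversation.
    return current_conversation_id(matched_id) != approval_conversation_id
```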

* docs(events): enumerate publish_user_event None-return paths

The function returns Optional[str] today, with None conflating five
distinct outcomes (missing args / push disabled / unserialisable /
Redis down / XADD failed). Every current call site is fire-and-
forget and ignores the return, so the right move is to document the
five cases rather than promote to an enum return — keeps the API
small while making the diagnostic surface (logs) obvious. If a
future caller needs to react differently per reason, promote then.

* refactor(sources): move source-id derivation out of worker module

application/api/user/sources/upload.py imported _derive_source_id
from application.worker — pulling the entire Celery worker module
into the API process at import time just for a two-line helper.

Move DOCSGPT_INGEST_NAMESPACE and the derivation function to a
new application/storage/db/source_ids.py module that both layers
can import without that dependency edge. worker.py re-exports the
old names (_derive_source_id, DOCSGPT_INGEST_NAMESPACE) for
backward-compatible imports from tests and any other in-tree
callers; new code should import from the new module directly.

* fix(cache): enable Redis health_check_interval to surface half-open TCP

Without health_check_interval, a half-open TCP socket (NAT silently
dropped state, ELB idle-close) can leave pubsub.get_message hanging
past the SSE generator's keepalive cadence — the kernel never
surfaces the dead socket because no payload is in flight. Setting
health_check_interval=10 makes redis-py ping every 10s when
otherwise idle, so the next get_message after the dead window
raises and the SSE loop falls into its reconnect path instead of
silently freezing on the user.
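This entry and the `get_redis_instance` tests in this file pin down several behaviours:

- The instance is memoised.
- A `ValueError` (bad URL) stops further attempts.
- A connection error allows a retry on the next call.
- `from_url` is called with `socket_connect_timeout=2` and `health_check_interval=10`.

A sketch consistent with those behaviours follows. The class name is hypothetical, and `redis.Redis.from_url` is injected as `connect` so the example runs without a Redis server. It is not application/cache.py.

```python
from typing import Any, Callable, Optional


class RedisFactory:
    """Memoised client factory; `connect` stands in for redis.Redis.from_url."""

    def __init__(self, connect: Callable[..., Any], url: str) -> None:
        self._connect = connect
        self._url = url
        self._instance: Optional[Any] = None
        self._creation_failed = False

    def get(self) -> Optional[Any]:
        if self._instance is not None:
            return self._instance
        if self._creation_failed:
            return None  # a malformed URL will not fix itself; stop retrying
        try:
            self._instance = self._connect(
                self._url,
                socket_connect_timeout=2,
                # redis-py PINGs a connection that has sat idle longer than this
                # many seconds, so a half-open socket surfaces as an error.
                health_check_interval=10,
            )
        except ValueError:
            self._creation_failed = True  # permanent: bad URL
        except Exception:
            pass  # transient (e.g. connection refused): retry on the next call
        return self._instance
```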

* chore(events): rename attachment.processing.progress to attachment.progress

The event-type taxonomy was inconsistent: source ingest emits
source.ingest.progress (three segments) while attachments emitted
attachment.processing.progress (four segments). Drops the
.processing. infix for parity. Worker publish sites, the slice
reducer's match, and the worker tests all flip together.

No external consumers — the event type is purely internal between
the publisher and the in-tab slice; safe to rename in one commit.

* feat: events cleanup

* fix: better docs

* fix: e2e tests
2026-05-15 12:23:31 +01:00

534 lines
17 KiB
Python

import json
from unittest.mock import MagicMock, patch

import pytest

from application.cache import (
    gen_cache,
    gen_cache_key,
    get_redis_instance,
    stream_cache,
)
from application.utils import get_hash


@pytest.mark.unit
def test_make_gen_cache_key():
    messages = [
        {"role": "user", "content": "test_user_message"},
        {"role": "system", "content": "test_system_message"},
    ]
    model = "test_docgpt"
    tools = None

    messages_str = json.dumps(messages)
    tools_str = json.dumps(tools) if tools else ""
    expected_combined = f"{model}_{messages_str}_{tools_str}"
    expected_hash = get_hash(expected_combined)
    cache_key = gen_cache_key(messages, model=model, tools=None)
    assert cache_key == expected_hash


@pytest.mark.unit
def test_gen_cache_key_invalid_message_format():
    with pytest.raises(ValueError, match="All messages must be dictionaries."):
        gen_cache_key("This is not a list", model="docgpt", tools=None)


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_gen_cache_hit(mock_make_redis):
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance
    mock_redis_instance.get.return_value = b"cached_result"

    @gen_cache
    def mock_function(self, model, messages, stream, tools):
        return "new_result"

    messages = [{"role": "user", "content": "test_user_message"}]
    model = "test_docgpt"
    result = mock_function(None, model, messages, stream=False, tools=None)
    assert result == "cached_result"
    mock_redis_instance.get.assert_called_once()
    mock_redis_instance.set.assert_not_called()


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_gen_cache_miss(mock_make_redis):
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance
    mock_redis_instance.get.return_value = None

    @gen_cache
    def mock_function(self, model, messages, stream, tools):
        return "new_result"

    messages = [
        {"role": "user", "content": "test_user_message"},
        {"role": "system", "content": "test_system_message"},
    ]
    model = "test_docgpt"
    result = mock_function(None, model, messages, stream=False, tools=None)
    assert result == "new_result"
    mock_redis_instance.get.assert_called_once()


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_stream_cache_hit(mock_make_redis):
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance
    cached_chunk = json.dumps(["chunk1", "chunk2"]).encode("utf-8")
    mock_redis_instance.get.return_value = cached_chunk

    @stream_cache
    def mock_function(self, model, messages, stream, tools):
        yield "new_chunk"

    messages = [{"role": "user", "content": "test_user_message"}]
    model = "test_docgpt"
    result = list(mock_function(None, model, messages, stream=True, tools=None))
    assert result == ["chunk1", "chunk2"]
    mock_redis_instance.get.assert_called_once()
    mock_redis_instance.set.assert_not_called()


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_stream_cache_miss(mock_make_redis):
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance
    mock_redis_instance.get.return_value = None

    @stream_cache
    def mock_function(self, model, messages, stream, tools):
        yield "new_chunk"

    messages = [
        {"role": "user", "content": "This is the context"},
        {"role": "system", "content": "Some other message"},
        {"role": "user", "content": "What is the answer?"},
    ]
    model = "test_docgpt"
    result = list(mock_function(None, model, messages, stream=True, tools=None))
    assert result == ["new_chunk"]
    mock_redis_instance.get.assert_called_once()
    mock_redis_instance.set.assert_called_once()


# ── get_redis_instance ──────────────────────────────────────────────────────


@pytest.mark.unit
class TestGetRedisInstance:
    def setup_method(self):
        """Reset module-level redis state between tests."""
        import application.cache as cache_mod

        cache_mod._redis_instance = None
        cache_mod._redis_creation_failed = False

    def teardown_method(self):
        import application.cache as cache_mod

        cache_mod._redis_instance = None
        cache_mod._redis_creation_failed = False

    @patch("application.cache.redis.Redis.from_url")
    @patch("application.cache.settings")
    def test_creates_redis_instance(self, mock_settings, mock_from_url):
        mock_settings.CACHE_REDIS_URL = "redis://localhost:6379/0"
        mock_instance = MagicMock()
        mock_from_url.return_value = mock_instance

        result = get_redis_instance()

        assert result is mock_instance
        mock_from_url.assert_called_once_with(
            "redis://localhost:6379/0",
            socket_connect_timeout=2,
            health_check_interval=10,
        )

    @patch("application.cache.redis.Redis.from_url")
    @patch("application.cache.settings")
    def test_returns_cached_instance(self, mock_settings, mock_from_url):
        mock_settings.CACHE_REDIS_URL = "redis://localhost:6379/0"
        mock_instance = MagicMock()
        mock_from_url.return_value = mock_instance

        result1 = get_redis_instance()
        result2 = get_redis_instance()

        assert result1 is result2
        assert mock_from_url.call_count == 1

    @patch("application.cache.redis.Redis.from_url")
    @patch("application.cache.settings")
    def test_value_error_stops_retries(self, mock_settings, mock_from_url):
        import application.cache as cache_mod

        mock_settings.CACHE_REDIS_URL = "invalid://url"
        mock_from_url.side_effect = ValueError("Invalid Redis URL")

        result = get_redis_instance()
        assert result is None
        assert cache_mod._redis_creation_failed is True

        # Subsequent calls should not retry
        mock_from_url.reset_mock()
        result2 = get_redis_instance()
        assert result2 is None
        mock_from_url.assert_not_called()

    @patch("application.cache.redis.Redis.from_url")
    @patch("application.cache.settings")
    def test_connection_error_allows_retries(self, mock_settings, mock_from_url):
        import application.cache as cache_mod
        import redis as redis_mod

        mock_settings.CACHE_REDIS_URL = "redis://unreachable:6379/0"
        mock_from_url.side_effect = redis_mod.ConnectionError("Connection refused")

        result = get_redis_instance()
        assert result is None
        assert cache_mod._redis_creation_failed is False

        # Subsequent calls should retry
        mock_from_url.side_effect = None
        mock_from_url.return_value = MagicMock()
        result2 = get_redis_instance()
        assert result2 is not None


# ── gen_cache_key edge cases ────────────────────────────────────────────────


@pytest.mark.unit
def test_gen_cache_key_with_tools():
    messages = [{"role": "user", "content": "test"}]
    tools = [{"type": "function", "function": {"name": "test"}}]
    key = gen_cache_key(messages, model="docgpt", tools=tools)
    assert isinstance(key, str)
    assert len(key) == 32


@pytest.mark.unit
def test_gen_cache_key_default_model():
    messages = [{"role": "user", "content": "test"}]
    key = gen_cache_key(messages)
    assert isinstance(key, str)
    assert len(key) == 32


@pytest.mark.unit
def test_gen_cache_key_deterministic():
    messages = [{"role": "user", "content": "test"}]
    key1 = gen_cache_key(messages, model="m1")
    key2 = gen_cache_key(messages, model="m1")
    assert key1 == key2


@pytest.mark.unit
def test_gen_cache_key_different_models():
    messages = [{"role": "user", "content": "test"}]
    key1 = gen_cache_key(messages, model="m1")
    key2 = gen_cache_key(messages, model="m2")
    assert key1 != key2


# ── gen_cache with tools bypass ─────────────────────────────────────────────


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_gen_cache_bypasses_when_tools_provided(mock_make_redis):
    """When tools are provided, caching is bypassed."""
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance

    @gen_cache
    def mock_function(self, model, messages, stream, tools):
        return "direct_result"

    messages = [{"role": "user", "content": "test"}]
    tools = [{"type": "function"}]
    result = mock_function(None, "model", messages, stream=False, tools=tools)
    assert result == "direct_result"
    mock_redis_instance.get.assert_not_called()


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_gen_cache_no_redis(mock_make_redis):
    """When redis is unavailable, function runs without caching."""
    mock_make_redis.return_value = None

    @gen_cache
    def mock_function(self, model, messages, stream, tools):
        return "no_cache_result"

    messages = [{"role": "user", "content": "test"}]
    result = mock_function(None, "model", messages, stream=False, tools=None)
    assert result == "no_cache_result"


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_gen_cache_redis_get_error(mock_make_redis):
    """When redis.get raises, function falls through gracefully."""
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance
    mock_redis_instance.get.side_effect = Exception("Redis error")

    @gen_cache
    def mock_function(self, model, messages, stream, tools):
        return "fallback_result"

    messages = [{"role": "user", "content": "test"}]
    result = mock_function(None, "model", messages, stream=False, tools=None)
    assert result == "fallback_result"


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_gen_cache_redis_set_error(mock_make_redis):
    """When redis.set raises, the result is still returned."""
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance
    mock_redis_instance.get.return_value = None
    mock_redis_instance.set.side_effect = Exception("Redis write error")

    @gen_cache
    def mock_function(self, model, messages, stream, tools):
        return "result_str"

    messages = [{"role": "user", "content": "test"}]
    result = mock_function(None, "model", messages, stream=False, tools=None)
    assert result == "result_str"


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_gen_cache_non_string_result_not_cached(mock_make_redis):
    """Non-string results should not be cached."""
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance
    mock_redis_instance.get.return_value = None

    @gen_cache
    def mock_function(self, model, messages, stream, tools):
        return {"key": "value"}  # not a string

    messages = [{"role": "user", "content": "test"}]
    result = mock_function(None, "model", messages, stream=False, tools=None)
    assert result == {"key": "value"}
    mock_redis_instance.set.assert_not_called()


# ── stream_cache edge cases ─────────────────────────────────────────────────


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_stream_cache_bypasses_when_tools_provided(mock_make_redis):
    """When tools are provided, streaming cache is bypassed."""
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance

    @stream_cache
    def mock_function(self, model, messages, stream, tools):
        yield "direct_chunk"

    messages = [{"role": "user", "content": "test"}]
    tools = [{"type": "function"}]
    result = list(mock_function(None, "model", messages, stream=True, tools=tools))
    assert result == ["direct_chunk"]
    mock_redis_instance.get.assert_not_called()


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_stream_cache_no_redis(mock_make_redis):
    """When redis is unavailable, streaming works without caching."""
    mock_make_redis.return_value = None

    @stream_cache
    def mock_function(self, model, messages, stream, tools):
        yield "chunk1"
        yield "chunk2"

    messages = [{"role": "user", "content": "test"}]
    result = list(mock_function(None, "model", messages, stream=True, tools=None))
    assert result == ["chunk1", "chunk2"]


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_stream_cache_redis_get_error(mock_make_redis):
    """When redis.get raises during stream, falls through gracefully."""
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance
    mock_redis_instance.get.side_effect = Exception("Redis error")

    @stream_cache
    def mock_function(self, model, messages, stream, tools):
        yield "fallback_chunk"

    messages = [{"role": "user", "content": "test"}]
    result = list(mock_function(None, "model", messages, stream=True, tools=None))
    assert result == ["fallback_chunk"]


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_stream_cache_redis_set_error(mock_make_redis):
    """When redis.set raises during stream save, chunks are still yielded."""
    mock_redis_instance = MagicMock()
    mock_make_redis.return_value = mock_redis_instance
    mock_redis_instance.get.return_value = None
    mock_redis_instance.set.side_effect = Exception("Redis write error")

    @stream_cache
    def mock_function(self, model, messages, stream, tools):
        yield "chunk"

    messages = [{"role": "user", "content": "test"}]
    result = list(mock_function(None, "model", messages, stream=True, tools=None))
    assert result == ["chunk"]


# =====================================================================
# Coverage gap tests (lines 86-89)
# =====================================================================


@pytest.mark.unit
@patch("application.cache.get_redis_instance")
def test_stream_cache_key_generation_failure_yields(mock_make_redis):
    """Cover lines 86-89: ValueError in gen_cache_key falls through to func."""
    mock_make_redis.return_value = None

    @stream_cache
    def mock_function(self, model, messages, stream, tools):
        yield "fallback_chunk"

    # Pass invalid messages (not dicts) to trigger ValueError in gen_cache_key
    messages = ["not_a_dict"]
    result = list(mock_function(None, "model", messages, stream=True, tools=None))
    assert result == ["fallback_chunk"]


# =====================================================================
# gen_cache_key with inline bytes (Google attachments)
# =====================================================================


@pytest.mark.unit
def test_gen_cache_key_handles_inline_bytes():
    """Image attachments arrive in messages as raw bytes (see
    GoogleLLM.prepare_messages_with_attachments). gen_cache_key must not
    crash on json.dumps of bytes."""
    msgs = [
        {
            "role": "user",
            "content": [{"file_bytes": b"\x00\x01\x02", "mime_type": "image/png"}],
        }
    ]
    key = gen_cache_key(msgs, model="x")
    assert isinstance(key, str)
    assert len(key) == 32


@pytest.mark.unit
def test_gen_cache_key_stable_for_same_bytes():
    """Two requests with identical image bytes must produce the same key
    — otherwise we'd never get cache hits on image-bearing prompts."""
    a = [
        {
            "role": "user",
            "content": [{"file_bytes": b"abc", "mime_type": "image/png"}],
        }
    ]
    b = [
        {
            "role": "user",
            "content": [{"file_bytes": b"abc", "mime_type": "image/png"}],
        }
    ]
    assert gen_cache_key(a, "m") == gen_cache_key(b, "m")


@pytest.mark.unit
def test_gen_cache_key_differs_for_different_bytes():
    """Different image bytes must produce different keys — otherwise two
    different images would collide in cache."""
    a = [
        {
            "role": "user",
            "content": [{"file_bytes": b"abc", "mime_type": "image/png"}],
        }
    ]
    b = [
        {
            "role": "user",
            "content": [{"file_bytes": b"xyz", "mime_type": "image/png"}],
        }
    ]
    assert gen_cache_key(a, "m") != gen_cache_key(b, "m")


@pytest.mark.unit
def test_gen_cache_key_handles_bytearray_and_memoryview():
    """The default helper covers all bytes-like types so refactors that
    swap bytes for bytearray/memoryview don't silently re-introduce the
    TypeError."""
    msgs_ba = [
        {
            "role": "user",
            "content": [
                {"file_bytes": bytearray(b"abc"), "mime_type": "image/png"}
            ],
        }
    ]
    msgs_mv = [
        {
            "role": "user",
            "content": [
                {"file_bytes": memoryview(b"abc"), "mime_type": "image/png"}
            ],
        }
    ]
    msgs_b = [
        {
            "role": "user",
            "content": [{"file_bytes": b"abc", "mime_type": "image/png"}],
        }
    ]
    # All three should hash the same content to the same key.
    assert gen_cache_key(msgs_ba, "m") == gen_cache_key(msgs_b, "m")
    assert gen_cache_key(msgs_mv, "m") == gen_cache_key(msgs_b, "m")
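The inline-bytes tests in this file place several requirements on `gen_cache_key`:

- It must accept `bytes`, `bytearray` and `memoryview`.
- Equal content must map to equal keys.
- Keys must be 32 characters long.
- It must still match `get_hash(f"{model}_{json.dumps(messages)}_{tools_str}")` for plain messages.

Below is a minimal sketch of one way to get there, using a `json.dumps` `default=` hook. It rests on three assumptions. `get_hash` is taken to be an MD5 hex digest, since the tests only show 32-character keys. Bytes are hashed with SHA-256 inside the hook. The `"docgpt"` default model is a guess. The function names are hypothetical, and application/cache.py's actual helper is not shown here.

```python
import hashlib
import json
from typing import Any, List, Optional


def _json_default(obj: Any) -> Any:
    """json.dumps hook: make bytes-like values serialisable (hypothetical encoding)."""
    if isinstance(obj, (bytes, bytearray, memoryview)):
        # bytes(obj) normalises all three types, so equal content -> equal key.
        return {"__bytes_sha256__": hashlib.sha256(bytes(obj)).hexdigest()}
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def gen_cache_key_sketch(messages: List[dict], model: str = "docgpt",
                         tools: Optional[list] = None) -> str:
    if not all(isinstance(m, dict) for m in messages):
        raise ValueError("All messages must be dictionaries.")
    # The hook only fires for non-JSON types, so plain messages serialise
    # exactly as json.dumps(messages) would.
    messages_str = json.dumps(messages, default=_json_default)
    tools_str = json.dumps(tools) if tools else ""
    combined = f"{model}_{messages_str}_{tools_str}"
    return hashlib.md5(combined.encode("utf-8")).hexdigest()  # assumed get_hash
```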