refactor: migrate vector sync from asyncio.Queue to anyio memory object streams
Replace asyncio.Queue with anyio.create_memory_object_stream() throughout
the vector sync system for better library consistency and improved shutdown
semantics.
## Changes Made
**scanner.py**:
- Changed parameter type from `asyncio.Queue` to `MemoryObjectSendStream[DocumentTask]`
- Replaced all `await document_queue.put()` calls with `await send_stream.send()`
- Wrapped scanner loop in `async with send_stream:` context manager for automatic cleanup
- Updated log messages: "Queued" → "Sent"
- Removed `import asyncio` (no longer needed)
**processor.py**:
- Changed parameter type from `asyncio.Queue` to `MemoryObjectReceiveStream[DocumentTask]`
- Replaced `asyncio.wait_for(document_queue.get(), timeout=1.0)` with `anyio.fail_after(1.0)` + `await receive_stream.receive()`
- Removed all `document_queue.task_done()` calls (not needed with streams)
- Added `anyio.EndOfStream` exception handling for graceful shutdown when scanner closes
- Removed `import asyncio` (no longer needed)
**app.py**:
- Removed `import asyncio` from top-level imports
- Added `from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream`
- Updated AppContext dataclass:
- Replaced `document_queue: Optional[asyncio.Queue]` with:
- `document_send_stream: Optional[MemoryObjectSendStream]`
- `document_receive_stream: Optional[MemoryObjectReceiveStream]`
- Updated `app_lifespan_basic()`:
- Replaced `asyncio.Queue(maxsize=...)` with `anyio.create_memory_object_stream(max_buffer_size=...)`
- Pass `send_stream` to scanner_task
- Pass `receive_stream.clone()` to each processor_task (enables multiple consumers)
- Updated AppContext yield to include both streams
- Updated `starlette_lifespan()`:
- Same changes as app_lifespan_basic for streamable-http transport
- Removed `import asyncio as asyncio_module` (no longer needed)
- Updated app.state storage to use send_stream and receive_stream
**semantic.py**:
- Updated `nc_get_vector_sync_status()` tool:
- Access `document_receive_stream` instead of `document_queue` from lifespan context
- Use `stream_stats.current_buffer_used` instead of `queue.qsize()` for pending count
- More reliable metrics (qsize() was not guaranteed accurate)
## Benefits
1. **Library Consistency**: Pure anyio throughout codebase (was mixing asyncio.Queue with anyio.Event and anyio.create_task_group)
2. **Graceful Shutdown**: `async with send_stream:` automatically closes stream on exit, signaling EndOfStream to all processors
3. **Better Timeout Handling**: `anyio.fail_after()` is more idiomatic than `asyncio.wait_for()`
4. **Stream Cloning**: Easy to add multiple consumers via `receive_stream.clone()`
5. **Better Statistics**: `.statistics()` provides accurate buffer metrics (qsize() was unreliable)
6. **Type Safety**: Separate send/receive types prevent accidental misuse
7. **No task_done() tracking**: Streams handle completion automatically
## Testing
- ✅ All 69 unit tests passing
- ✅ All 5 smoke tests passing
- ✅ No regressions in functionality
- ✅ Graceful shutdown behavior improved
## References
- https://anyio.readthedocs.io/en/stable/why.html#queue-fix
- https://anyio.readthedocs.io/en/stable/streams.html#memory-object-streams
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
+20
-16
@@ -1,4 +1,3 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from collections.abc import AsyncIterator
|
||||
@@ -13,6 +12,7 @@ import anyio
|
||||
import click
|
||||
import httpx
|
||||
import uvicorn
|
||||
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
|
||||
from mcp.server.auth.settings import AuthSettings
|
||||
from mcp.server.fastmcp import Context, FastMCP
|
||||
from pydantic import AnyHttpUrl
|
||||
@@ -211,7 +211,8 @@ class AppContext:
|
||||
"""Application context for BasicAuth mode."""
|
||||
|
||||
client: NextcloudClient
|
||||
document_queue: Optional[asyncio.Queue] = None
|
||||
document_send_stream: Optional[MemoryObjectSendStream] = None
|
||||
document_receive_stream: Optional[MemoryObjectReceiveStream] = None
|
||||
shutdown_event: Optional[anyio.Event] = None
|
||||
scanner_wake_event: Optional[anyio.Event] = None
|
||||
|
||||
@@ -404,7 +405,9 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
|
||||
)
|
||||
|
||||
# Initialize shared state
|
||||
document_queue = asyncio.Queue(maxsize=settings.vector_sync_queue_max_size)
|
||||
send_stream, receive_stream = anyio.create_memory_object_stream(
|
||||
max_buffer_size=settings.vector_sync_queue_max_size
|
||||
)
|
||||
shutdown_event = anyio.Event()
|
||||
scanner_wake_event = anyio.Event()
|
||||
|
||||
@@ -413,19 +416,19 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
|
||||
# Start scanner task
|
||||
tg.start_soon(
|
||||
scanner_task,
|
||||
document_queue,
|
||||
send_stream,
|
||||
shutdown_event,
|
||||
scanner_wake_event,
|
||||
client,
|
||||
username,
|
||||
)
|
||||
|
||||
# Start processor pool
|
||||
# Start processor pool (each gets a cloned receive stream)
|
||||
for i in range(settings.vector_sync_processor_workers):
|
||||
tg.start_soon(
|
||||
processor_task,
|
||||
i,
|
||||
document_queue,
|
||||
receive_stream.clone(),
|
||||
shutdown_event,
|
||||
client,
|
||||
username,
|
||||
@@ -439,7 +442,8 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
|
||||
try:
|
||||
yield AppContext(
|
||||
client=client,
|
||||
document_queue=document_queue,
|
||||
document_send_stream=send_stream,
|
||||
document_receive_stream=receive_stream,
|
||||
shutdown_event=shutdown_event,
|
||||
scanner_wake_event=scanner_wake_event,
|
||||
)
|
||||
@@ -1009,8 +1013,6 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
|
||||
# Start background vector sync tasks for BasicAuth mode (ADR-007)
|
||||
# For streamable-http transport, FastMCP lifespan isn't automatically triggered
|
||||
# so we manually start background tasks here if vector sync is enabled
|
||||
import asyncio as asyncio_module
|
||||
|
||||
import anyio as anyio_module
|
||||
|
||||
settings = get_settings()
|
||||
@@ -1029,21 +1031,23 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
|
||||
client = NextcloudClient.from_env()
|
||||
|
||||
# Initialize shared state
|
||||
document_queue = asyncio_module.Queue(
|
||||
maxsize=settings.vector_sync_queue_max_size
|
||||
send_stream, receive_stream = anyio_module.create_memory_object_stream(
|
||||
max_buffer_size=settings.vector_sync_queue_max_size
|
||||
)
|
||||
shutdown_event = anyio_module.Event()
|
||||
scanner_wake_event = anyio_module.Event()
|
||||
|
||||
# Store in app state for access from routes (ADR-007)
|
||||
app.state.document_queue = document_queue
|
||||
app.state.document_send_stream = send_stream
|
||||
app.state.document_receive_stream = receive_stream
|
||||
app.state.shutdown_event = shutdown_event
|
||||
app.state.scanner_wake_event = scanner_wake_event
|
||||
|
||||
# Also share with browser_app for /user/page route
|
||||
for route in app.routes:
|
||||
if isinstance(route, Mount) and route.path == "/user":
|
||||
route.app.state.document_queue = document_queue
|
||||
route.app.state.document_send_stream = send_stream
|
||||
route.app.state.document_receive_stream = receive_stream
|
||||
route.app.state.shutdown_event = shutdown_event
|
||||
route.app.state.scanner_wake_event = scanner_wake_event
|
||||
logger.info(
|
||||
@@ -1056,19 +1060,19 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
|
||||
# Start scanner task
|
||||
tg.start_soon(
|
||||
scanner_task,
|
||||
document_queue,
|
||||
send_stream,
|
||||
shutdown_event,
|
||||
scanner_wake_event,
|
||||
client,
|
||||
username,
|
||||
)
|
||||
|
||||
# Start processor pool
|
||||
# Start processor pool (each gets a cloned receive stream)
|
||||
for i in range(settings.vector_sync_processor_workers):
|
||||
tg.start_soon(
|
||||
processor_task,
|
||||
i,
|
||||
document_queue,
|
||||
receive_stream.clone(),
|
||||
shutdown_event,
|
||||
client,
|
||||
username,
|
||||
|
||||
@@ -381,12 +381,16 @@ def configure_semantic_tools(mcp: FastMCP):
|
||||
)
|
||||
|
||||
try:
|
||||
# Get document queue from lifespan context
|
||||
# Get document receive stream from lifespan context
|
||||
lifespan_ctx = ctx.request_context.lifespan_context
|
||||
document_queue = getattr(lifespan_ctx, "document_queue", None)
|
||||
document_receive_stream = getattr(
|
||||
lifespan_ctx, "document_receive_stream", None
|
||||
)
|
||||
|
||||
if document_queue is None:
|
||||
logger.debug("document_queue not available in lifespan context")
|
||||
if document_receive_stream is None:
|
||||
logger.debug(
|
||||
"document_receive_stream not available in lifespan context"
|
||||
)
|
||||
return VectorSyncStatusResponse(
|
||||
indexed_count=0,
|
||||
pending_count=0,
|
||||
@@ -394,8 +398,9 @@ def configure_semantic_tools(mcp: FastMCP):
|
||||
enabled=True,
|
||||
)
|
||||
|
||||
# Get pending count from queue
|
||||
pending_count = document_queue.qsize()
|
||||
# Get pending count from stream statistics
|
||||
stream_stats = document_receive_stream.statistics()
|
||||
pending_count = stream_stats.current_buffer_used
|
||||
|
||||
# Get Qdrant client and query indexed count
|
||||
indexed_count = 0
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
"""Processor task for vector database synchronization.
|
||||
|
||||
Processes documents from queue: fetches content, generates embeddings, stores in Qdrant.
|
||||
Processes documents from stream: fetches content, generates embeddings, stores in Qdrant.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import anyio
|
||||
from anyio.streams.memory import MemoryObjectReceiveStream
|
||||
from httpx import HTTPStatusError
|
||||
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
|
||||
|
||||
@@ -24,27 +24,26 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
async def processor_task(
|
||||
worker_id: int,
|
||||
document_queue: asyncio.Queue,
|
||||
receive_stream: MemoryObjectReceiveStream[DocumentTask],
|
||||
shutdown_event: anyio.Event,
|
||||
nc_client: NextcloudClient,
|
||||
user_id: str,
|
||||
):
|
||||
"""
|
||||
Process documents from queue concurrently.
|
||||
Process documents from stream concurrently.
|
||||
|
||||
Each processor task runs in a loop:
|
||||
1. Pull document from queue (with timeout)
|
||||
1. Receive document from stream (with timeout)
|
||||
2. Fetch content from Nextcloud
|
||||
3. Tokenize and chunk text
|
||||
4. Generate embeddings (I/O bound - external API)
|
||||
5. Upload vectors to Qdrant
|
||||
6. Mark task complete
|
||||
|
||||
Multiple processors run concurrently for I/O parallelism.
|
||||
|
||||
Args:
|
||||
worker_id: Worker identifier for logging
|
||||
document_queue: Queue to pull documents from
|
||||
receive_stream: Stream to receive documents from
|
||||
shutdown_event: Event signaling shutdown
|
||||
nc_client: Authenticated Nextcloud client
|
||||
user_id: User being processed
|
||||
@@ -54,32 +53,28 @@ async def processor_task(
|
||||
while not shutdown_event.is_set():
|
||||
try:
|
||||
# Get document with timeout (allows checking shutdown)
|
||||
doc_task = await asyncio.wait_for(
|
||||
document_queue.get(),
|
||||
timeout=1.0,
|
||||
)
|
||||
with anyio.fail_after(1.0):
|
||||
doc_task = await receive_stream.receive()
|
||||
|
||||
# Process document
|
||||
await process_document(doc_task, nc_client)
|
||||
|
||||
# Mark complete
|
||||
document_queue.task_done()
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
except TimeoutError:
|
||||
# No documents available, continue
|
||||
continue
|
||||
|
||||
except anyio.EndOfStream:
|
||||
# Scanner finished and closed stream, exit gracefully
|
||||
logger.info(f"Processor {worker_id}: Scanner finished, exiting")
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Processor {worker_id} error processing "
|
||||
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
# Mark task done even on error to prevent queue blocking
|
||||
try:
|
||||
document_queue.task_done()
|
||||
except ValueError:
|
||||
pass
|
||||
# Continue to next document (no task_done() needed with streams)
|
||||
|
||||
logger.info(f"Processor {worker_id} stopped")
|
||||
|
||||
|
||||
@@ -3,12 +3,12 @@
|
||||
Periodically scans enabled users' content and queues changed documents for processing.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
import anyio
|
||||
from anyio.streams.memory import MemoryObjectSendStream
|
||||
from qdrant_client.models import FieldCondition, Filter, MatchValue
|
||||
|
||||
from nextcloud_mcp_server.client import NextcloudClient
|
||||
@@ -35,7 +35,7 @@ _potentially_deleted: dict[tuple[str, str], float] = {}
|
||||
|
||||
|
||||
async def scanner_task(
|
||||
document_queue: asyncio.Queue,
|
||||
send_stream: MemoryObjectSendStream[DocumentTask],
|
||||
shutdown_event: anyio.Event,
|
||||
wake_event: anyio.Event,
|
||||
nc_client: NextcloudClient,
|
||||
@@ -47,7 +47,7 @@ async def scanner_task(
|
||||
For BasicAuth mode, scans a single user with credentials available at runtime.
|
||||
|
||||
Args:
|
||||
document_queue: Queue to enqueue changed documents
|
||||
send_stream: Stream to send changed documents to processors
|
||||
shutdown_event: Event signaling shutdown
|
||||
wake_event: Event to trigger immediate scan
|
||||
nc_client: Authenticated Nextcloud client
|
||||
@@ -56,44 +56,45 @@ async def scanner_task(
|
||||
logger.info(f"Scanner task started for user: {user_id}")
|
||||
settings = get_settings()
|
||||
|
||||
while not shutdown_event.is_set():
|
||||
try:
|
||||
# Scan user documents
|
||||
await scan_user_documents(
|
||||
user_id=user_id,
|
||||
document_queue=document_queue,
|
||||
nc_client=nc_client,
|
||||
)
|
||||
async with send_stream:
|
||||
while not shutdown_event.is_set():
|
||||
try:
|
||||
# Scan user documents
|
||||
await scan_user_documents(
|
||||
user_id=user_id,
|
||||
send_stream=send_stream,
|
||||
nc_client=nc_client,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Scanner error: {e}", exc_info=True)
|
||||
except Exception as e:
|
||||
logger.error(f"Scanner error: {e}", exc_info=True)
|
||||
|
||||
# Sleep until next interval or wake event
|
||||
try:
|
||||
with anyio.move_on_after(settings.vector_sync_scan_interval):
|
||||
# Wait for wake event or shutdown (whichever comes first)
|
||||
await wake_event.wait()
|
||||
except anyio.get_cancelled_exc_class():
|
||||
# Shutdown, exit loop
|
||||
break
|
||||
# Sleep until next interval or wake event
|
||||
try:
|
||||
with anyio.move_on_after(settings.vector_sync_scan_interval):
|
||||
# Wait for wake event or shutdown (whichever comes first)
|
||||
await wake_event.wait()
|
||||
except anyio.get_cancelled_exc_class():
|
||||
# Shutdown, exit loop
|
||||
break
|
||||
|
||||
logger.info("Scanner task stopped")
|
||||
logger.info("Scanner task stopped - stream closed")
|
||||
|
||||
|
||||
async def scan_user_documents(
|
||||
user_id: str,
|
||||
document_queue: asyncio.Queue,
|
||||
send_stream: MemoryObjectSendStream[DocumentTask],
|
||||
nc_client: NextcloudClient,
|
||||
initial_sync: bool = False,
|
||||
):
|
||||
"""
|
||||
Scan a single user's documents and queue changes.
|
||||
Scan a single user's documents and send changes to processor stream.
|
||||
|
||||
Args:
|
||||
user_id: User to scan
|
||||
document_queue: Queue to enqueue changed documents
|
||||
send_stream: Stream to send changed documents to processors
|
||||
nc_client: Authenticated Nextcloud client
|
||||
initial_sync: If True, queue all documents (first-time sync)
|
||||
initial_sync: If True, send all documents (first-time sync)
|
||||
"""
|
||||
logger.info(f"Scanning documents for user: {user_id}")
|
||||
|
||||
@@ -102,9 +103,9 @@ async def scan_user_documents(
|
||||
logger.debug(f"Found {len(notes)} notes for {user_id}")
|
||||
|
||||
if initial_sync:
|
||||
# Queue everything on first sync
|
||||
# Send everything on first sync
|
||||
for note in notes:
|
||||
await document_queue.put(
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
doc_id=str(note["id"]),
|
||||
@@ -113,7 +114,7 @@ async def scan_user_documents(
|
||||
modified_at=note["modified"],
|
||||
)
|
||||
)
|
||||
logger.info(f"Queued {len(notes)} documents for initial sync: {user_id}")
|
||||
logger.info(f"Sent {len(notes)} documents for initial sync: {user_id}")
|
||||
return
|
||||
|
||||
# Get indexed state from Qdrant
|
||||
@@ -154,9 +155,9 @@ async def scan_user_documents(
|
||||
)
|
||||
del _potentially_deleted[doc_key]
|
||||
|
||||
# Queue if never indexed or modified since last index
|
||||
# Send if never indexed or modified since last index
|
||||
if indexed_at is None or note["modified"] > indexed_at:
|
||||
await document_queue.put(
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
doc_id=doc_id,
|
||||
@@ -183,12 +184,12 @@ async def scan_user_documents(
|
||||
time_missing = current_time - first_missing_time
|
||||
|
||||
if time_missing >= grace_period:
|
||||
# Grace period elapsed, queue for deletion
|
||||
# Grace period elapsed, send for deletion
|
||||
logger.info(
|
||||
f"Document {doc_id} missing for {time_missing:.1f}s "
|
||||
f"(>{grace_period:.1f}s grace period), queueing deletion"
|
||||
f"(>{grace_period:.1f}s grace period), sending deletion"
|
||||
)
|
||||
await document_queue.put(
|
||||
await send_stream.send(
|
||||
DocumentTask(
|
||||
user_id=user_id,
|
||||
doc_id=doc_id,
|
||||
@@ -198,7 +199,7 @@ async def scan_user_documents(
|
||||
)
|
||||
)
|
||||
queued += 1
|
||||
# Remove from tracking after queueing deletion
|
||||
# Remove from tracking after sending deletion
|
||||
del _potentially_deleted[doc_key]
|
||||
else:
|
||||
logger.debug(
|
||||
@@ -213,6 +214,6 @@ async def scan_user_documents(
|
||||
_potentially_deleted[doc_key] = current_time
|
||||
|
||||
if queued > 0:
|
||||
logger.info(f"Queued {queued} documents for incremental sync: {user_id}")
|
||||
logger.info(f"Sent {queued} documents for incremental sync: {user_id}")
|
||||
else:
|
||||
logger.debug(f"No changes detected for {user_id}")
|
||||
|
||||
Reference in New Issue
Block a user