72232f937a
Replace asyncio.Queue with anyio.create_memory_object_stream() throughout
the vector sync system for better library consistency and improved shutdown
semantics.
## Changes Made
**scanner.py**:
- Changed parameter type from `asyncio.Queue` to `MemoryObjectSendStream[DocumentTask]`
- Replaced all `await document_queue.put()` calls with `await send_stream.send()`
- Wrapped scanner loop in `async with send_stream:` context manager for automatic cleanup
- Updated log messages: "Queued" → "Sent"
- Removed `import asyncio` (no longer needed)
**processor.py**:
- Changed parameter type from `asyncio.Queue` to `MemoryObjectReceiveStream[DocumentTask]`
- Replaced `asyncio.wait_for(document_queue.get(), timeout=1.0)` with `anyio.fail_after(1.0)` + `await receive_stream.receive()`
- Removed all `document_queue.task_done()` calls (not needed with streams)
- Added `anyio.EndOfStream` exception handling for graceful shutdown when scanner closes
- Removed `import asyncio` (no longer needed)
**app.py**:
- Removed `import asyncio` from top-level imports
- Added `from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream`
- Updated AppContext dataclass:
- Replaced `document_queue: Optional[asyncio.Queue]` with:
- `document_send_stream: Optional[MemoryObjectSendStream]`
- `document_receive_stream: Optional[MemoryObjectReceiveStream]`
- Updated `app_lifespan_basic()`:
- Replaced `asyncio.Queue(maxsize=...)` with `anyio.create_memory_object_stream(max_buffer_size=...)`
- Pass `send_stream` to scanner_task
- Pass `receive_stream.clone()` to each processor_task (enables multiple consumers)
- Updated AppContext yield to include both streams
- Updated `starlette_lifespan()`:
- Same changes as app_lifespan_basic for streamable-http transport
- Removed `import asyncio as asyncio_module` (no longer needed)
- Updated app.state storage to use send_stream and receive_stream
**semantic.py**:
- Updated `nc_get_vector_sync_status()` tool:
- Access `document_receive_stream` instead of `document_queue` from lifespan context
- Use `stream_stats.current_buffer_used` instead of `queue.qsize()` for pending count
- More reliable metrics (qsize() was not guaranteed accurate)
## Benefits
1. **Library Consistency**: Pure anyio throughout codebase (was mixing asyncio.Queue with anyio.Event and anyio.create_task_group)
2. **Graceful Shutdown**: `async with send_stream:` automatically closes stream on exit, signaling EndOfStream to all processors
3. **Better Timeout Handling**: `anyio.fail_after()` is more idiomatic than `asyncio.wait_for()`
4. **Stream Cloning**: Easy to add multiple consumers via `receive_stream.clone()`
5. **Better Statistics**: `.statistics()` provides accurate buffer metrics (qsize() was unreliable)
6. **Type Safety**: Separate send/receive types prevent accidental misuse
7. **No task_done() tracking**: Streams handle completion automatically
## Testing
- ✅ All 69 unit tests passing
- ✅ All 5 smoke tests passing
- ✅ No regressions in functionality
- ✅ Graceful shutdown behavior improved
## References
- https://anyio.readthedocs.io/en/stable/why.html#queue-fix
- https://anyio.readthedocs.io/en/stable/streams.html#memory-object-streams
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
221 lines
7.1 KiB
Python
221 lines
7.1 KiB
Python
"""Processor task for vector database synchronization.
|
|
|
|
Processes documents from stream: fetches content, generates embeddings, stores in Qdrant.
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
import uuid
|
|
|
|
import anyio
|
|
from anyio.streams.memory import MemoryObjectReceiveStream
|
|
from httpx import HTTPStatusError
|
|
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
|
|
|
|
from nextcloud_mcp_server.client import NextcloudClient
|
|
from nextcloud_mcp_server.config import get_settings
|
|
from nextcloud_mcp_server.embedding import get_embedding_service
|
|
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
|
|
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
|
from nextcloud_mcp_server.vector.scanner import DocumentTask
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def processor_task(
|
|
worker_id: int,
|
|
receive_stream: MemoryObjectReceiveStream[DocumentTask],
|
|
shutdown_event: anyio.Event,
|
|
nc_client: NextcloudClient,
|
|
user_id: str,
|
|
):
|
|
"""
|
|
Process documents from stream concurrently.
|
|
|
|
Each processor task runs in a loop:
|
|
1. Receive document from stream (with timeout)
|
|
2. Fetch content from Nextcloud
|
|
3. Tokenize and chunk text
|
|
4. Generate embeddings (I/O bound - external API)
|
|
5. Upload vectors to Qdrant
|
|
|
|
Multiple processors run concurrently for I/O parallelism.
|
|
|
|
Args:
|
|
worker_id: Worker identifier for logging
|
|
receive_stream: Stream to receive documents from
|
|
shutdown_event: Event signaling shutdown
|
|
nc_client: Authenticated Nextcloud client
|
|
user_id: User being processed
|
|
"""
|
|
logger.info(f"Processor {worker_id} started")
|
|
|
|
while not shutdown_event.is_set():
|
|
try:
|
|
# Get document with timeout (allows checking shutdown)
|
|
with anyio.fail_after(1.0):
|
|
doc_task = await receive_stream.receive()
|
|
|
|
# Process document
|
|
await process_document(doc_task, nc_client)
|
|
|
|
except TimeoutError:
|
|
# No documents available, continue
|
|
continue
|
|
|
|
except anyio.EndOfStream:
|
|
# Scanner finished and closed stream, exit gracefully
|
|
logger.info(f"Processor {worker_id}: Scanner finished, exiting")
|
|
break
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Processor {worker_id} error processing "
|
|
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}",
|
|
exc_info=True,
|
|
)
|
|
# Continue to next document (no task_done() needed with streams)
|
|
|
|
logger.info(f"Processor {worker_id} stopped")
|
|
|
|
|
|
async def process_document(doc_task: DocumentTask, nc_client: NextcloudClient):
|
|
"""
|
|
Process a single document: fetch, tokenize, embed, store in Qdrant.
|
|
|
|
Implements retry logic with exponential backoff for transient failures.
|
|
|
|
Args:
|
|
doc_task: Document task to process
|
|
nc_client: Authenticated Nextcloud client
|
|
"""
|
|
logger.debug(
|
|
f"Processing {doc_task.doc_type}_{doc_task.doc_id} "
|
|
f"for {doc_task.user_id} ({doc_task.operation})"
|
|
)
|
|
|
|
qdrant_client = await get_qdrant_client()
|
|
settings = get_settings()
|
|
|
|
# Handle deletion
|
|
if doc_task.operation == "delete":
|
|
await qdrant_client.delete(
|
|
collection_name=settings.qdrant_collection,
|
|
points_selector=Filter(
|
|
must=[
|
|
FieldCondition(
|
|
key="user_id",
|
|
match=MatchValue(value=doc_task.user_id),
|
|
),
|
|
FieldCondition(
|
|
key="doc_id",
|
|
match=MatchValue(value=doc_task.doc_id),
|
|
),
|
|
FieldCondition(
|
|
key="doc_type",
|
|
match=MatchValue(value=doc_task.doc_type),
|
|
),
|
|
]
|
|
),
|
|
)
|
|
logger.info(
|
|
f"Deleted {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id}"
|
|
)
|
|
return
|
|
|
|
# Handle indexing with retry
|
|
max_retries = 3
|
|
retry_delay = 1.0
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
await _index_document(doc_task, nc_client, qdrant_client)
|
|
return # Success
|
|
|
|
except (HTTPStatusError, Exception) as e:
|
|
if attempt < max_retries - 1:
|
|
logger.warning(
|
|
f"Retry {attempt + 1}/{max_retries} for "
|
|
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}"
|
|
)
|
|
await anyio.sleep(retry_delay)
|
|
retry_delay *= 2 # Exponential backoff
|
|
else:
|
|
logger.error(
|
|
f"Failed to index {doc_task.doc_type}_{doc_task.doc_id} "
|
|
f"after {max_retries} retries: {e}"
|
|
)
|
|
raise
|
|
|
|
|
|
async def _index_document(
|
|
doc_task: DocumentTask, nc_client: NextcloudClient, qdrant_client
|
|
):
|
|
"""
|
|
Index a single document (called by process_document with retry).
|
|
|
|
Args:
|
|
doc_task: Document task to index
|
|
nc_client: Authenticated Nextcloud client
|
|
qdrant_client: Qdrant client instance
|
|
"""
|
|
settings = get_settings()
|
|
|
|
# Fetch document content
|
|
if doc_task.doc_type == "note":
|
|
document = await nc_client.notes.get_note(int(doc_task.doc_id))
|
|
content = f"{document['title']}\n\n{document['content']}"
|
|
title = document["title"]
|
|
etag = document.get("etag", "")
|
|
else:
|
|
raise ValueError(f"Unsupported doc_type: {doc_task.doc_type}")
|
|
|
|
# Tokenize and chunk
|
|
chunker = DocumentChunker(chunk_size=512, overlap=50)
|
|
chunks = chunker.chunk_text(content)
|
|
|
|
# Generate embeddings (I/O bound - external API call)
|
|
embedding_service = get_embedding_service()
|
|
embeddings = await embedding_service.embed_batch(chunks)
|
|
|
|
# Prepare Qdrant points
|
|
indexed_at = int(time.time())
|
|
points = []
|
|
|
|
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
|
|
# Generate deterministic UUID for point ID
|
|
# Using uuid5 with DNS namespace and combining doc info
|
|
point_name = f"{doc_task.doc_type}:{doc_task.doc_id}:chunk:{i}"
|
|
point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, point_name))
|
|
|
|
points.append(
|
|
PointStruct(
|
|
id=point_id,
|
|
vector=embedding,
|
|
payload={
|
|
"user_id": doc_task.user_id,
|
|
"doc_id": doc_task.doc_id,
|
|
"doc_type": doc_task.doc_type,
|
|
"title": title,
|
|
"excerpt": chunk[:200],
|
|
"indexed_at": indexed_at,
|
|
"modified_at": doc_task.modified_at,
|
|
"etag": etag,
|
|
"chunk_index": i,
|
|
"total_chunks": len(chunks),
|
|
},
|
|
)
|
|
)
|
|
|
|
# Upsert to Qdrant
|
|
await qdrant_client.upsert(
|
|
collection_name=settings.qdrant_collection,
|
|
points=points,
|
|
wait=True,
|
|
)
|
|
|
|
logger.info(
|
|
f"Indexed {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id} "
|
|
f"({len(chunks)} chunks)"
|
|
)
|