Files
nextcloud-mcp-server/nextcloud_mcp_server/vector/processor.py
T
Chris Coutinho e575c8e57b feat(vector): Support multiple embedding models with auto-generated collection names
This PR enables safe switching between embedding models and multi-server
deployments by implementing auto-generated Qdrant collection names based on
deployment ID and model name.

## Problem

Previously, all deployments used a single hardcoded collection name
"nextcloud_content", which caused two critical issues:

1. **Dimension mismatches when switching models**: Changing
   OLLAMA_EMBEDDING_MODEL (e.g., nomic-embed-text at 768D → all-minilm at
   384D) would cause runtime errors as vectors couldn't be inserted into a
   collection with incompatible dimensions.

2. **Collection collisions in multi-server setups**: Multiple MCP servers
   sharing a single Qdrant instance would overwrite each other's data,
   making horizontal scaling impossible.

## Solution

### Auto-Generated Collection Naming

Collections are now automatically named using the pattern:
\`{deployment-id}-{model-name}\`

**Deployment ID**: Uses \`OTEL_SERVICE_NAME\` if configured (and not default
value), otherwise falls back to \`hostname\` for simple Docker deployments.

**Model Name**: From \`OLLAMA_EMBEDDING_MODEL\` with path separators sanitized.

**Examples**:
- \`my-mcp-server-nomic-embed-text\` (with OTEL_SERVICE_NAME=my-mcp-server)
- \`mcp-container-all-minilm\` (simple Docker, hostname=mcp-container)

**Override**: Users can still set \`QDRANT_COLLECTION\` explicitly to bypass
auto-generation for backward compatibility.

### Dimension Validation

Added startup validation that checks collection dimensions match the
embedding service. If a mismatch is detected, the server fails fast with a
clear error message explaining:
- Expected vs actual dimensions
- Likely cause (model change)
- Solutions (delete collection, use different name, or revert model)

### Improved Sampling Error Handling

Enhanced MCP sampling rejection handling to treat user rejections as normal
behavior rather than errors:

- **User rejections** ("rejected", "denied") → INFO log, no traceback
- **Unsupported clients** → INFO log, no traceback
- **Other MCP errors** → WARNING log, no traceback
- **Unexpected errors** → ERROR log WITH traceback

This aligns with the MCP specification where clients SHOULD prompt users for
approval/denial of sampling requests.

## Changes

### Core Implementation

- **nextcloud_mcp_server/config.py**: Added \`get_collection_name()\` method
  with deployment ID detection and model name sanitization
- **nextcloud_mcp_server/vector/qdrant_client.py**: Dimension validation on
  collection open with helpful error messages
- **nextcloud_mcp_server/vector/{scanner,processor}.py**: Updated to use
  \`get_collection_name()\`
- **nextcloud_mcp_server/auth/userinfo_routes.py**: Vector sync status uses
  \`get_collection_name()\`
- **nextcloud_mcp_server/server/semantic.py**:
  - Updated semantic search tools to use \`get_collection_name()\`
  - Improved sampling rejection error handling (McpError vs Exception)

### Documentation

- **docs/semantic-search-architecture.md**: New comprehensive architecture
  document (557 lines) covering background sync, semantic search flow, RAG
  implementation, and deployment modes
- **docs/configuration.md**: Added detailed "Qdrant Collection Naming"
  section with examples and multi-server deployment guidance
- **docker-compose.yml**: Added comments explaining collection naming behavior
- **README.md**: Updated semantic search descriptions to clarify
  experimental status, Notes-only support, and infrastructure requirements

## Migration Guide

**For existing single-server deployments:**

Option 1 (Recommended): Use explicit collection name for continuity
\`\`\`bash
QDRANT_COLLECTION=nextcloud_content  # Keep existing collection
\`\`\`

Option 2: Allow auto-generation and re-embed
\`\`\`bash
# Remove QDRANT_COLLECTION override
# New collection will be created based on deployment ID + model
# Requires re-embedding all documents (may take time)
\`\`\`

**For new multi-server deployments:**

Set unique OTEL service names per server:
\`\`\`bash
# Server 1
OTEL_SERVICE_NAME=mcp-prod
OLLAMA_EMBEDDING_MODEL=nomic-embed-text
# → Collection: "mcp-prod-nomic-embed-text"

# Server 2
OTEL_SERVICE_NAME=mcp-staging
OLLAMA_EMBEDDING_MODEL=nomic-embed-text
# → Collection: "mcp-staging-nomic-embed-text"
\`\`\`

## Benefits

 **Safe model switching**: Each model gets its own collection, preventing
   dimension mismatch errors
 **Multi-server support**: Multiple MCP servers can share one Qdrant
   instance without conflicts
 **Clear ownership**: Collection names show which deployment and model owns
   the data
 **Better error messages**: Dimension validation provides actionable
   guidance
 **Backward compatible**: Existing deployments can continue using
   \`QDRANT_COLLECTION\` override

## Testing

Validated with:
- Single-server deployments (default hostname-based naming)
- Multi-server deployments (OTEL service name-based naming)
- Model switching scenarios (dimension validation)
- Collection override scenarios (backward compatibility)

Next steps: Testing various Ollama embedding models to investigate optimal
chunk sizes and performance characteristics.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 01:18:30 +01:00

221 lines
7.1 KiB
Python

"""Processor task for vector database synchronization.
Processes documents from stream: fetches content, generates embeddings, stores in Qdrant.
"""
import logging
import time
import uuid
import anyio
from anyio.streams.memory import MemoryObjectReceiveStream
from httpx import HTTPStatusError
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_embedding_service
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
from nextcloud_mcp_server.vector.scanner import DocumentTask
logger = logging.getLogger(__name__)
async def processor_task(
worker_id: int,
receive_stream: MemoryObjectReceiveStream[DocumentTask],
shutdown_event: anyio.Event,
nc_client: NextcloudClient,
user_id: str,
):
"""
Process documents from stream concurrently.
Each processor task runs in a loop:
1. Receive document from stream (with timeout)
2. Fetch content from Nextcloud
3. Tokenize and chunk text
4. Generate embeddings (I/O bound - external API)
5. Upload vectors to Qdrant
Multiple processors run concurrently for I/O parallelism.
Args:
worker_id: Worker identifier for logging
receive_stream: Stream to receive documents from
shutdown_event: Event signaling shutdown
nc_client: Authenticated Nextcloud client
user_id: User being processed
"""
logger.info(f"Processor {worker_id} started")
while not shutdown_event.is_set():
try:
# Get document with timeout (allows checking shutdown)
with anyio.fail_after(1.0):
doc_task = await receive_stream.receive()
# Process document
await process_document(doc_task, nc_client)
except TimeoutError:
# No documents available, continue
continue
except anyio.EndOfStream:
# Scanner finished and closed stream, exit gracefully
logger.info(f"Processor {worker_id}: Scanner finished, exiting")
break
except Exception as e:
logger.error(
f"Processor {worker_id} error processing "
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}",
exc_info=True,
)
# Continue to next document (no task_done() needed with streams)
logger.info(f"Processor {worker_id} stopped")
async def process_document(doc_task: DocumentTask, nc_client: NextcloudClient):
"""
Process a single document: fetch, tokenize, embed, store in Qdrant.
Implements retry logic with exponential backoff for transient failures.
Args:
doc_task: Document task to process
nc_client: Authenticated Nextcloud client
"""
logger.debug(
f"Processing {doc_task.doc_type}_{doc_task.doc_id} "
f"for {doc_task.user_id} ({doc_task.operation})"
)
qdrant_client = await get_qdrant_client()
settings = get_settings()
# Handle deletion
if doc_task.operation == "delete":
await qdrant_client.delete(
collection_name=settings.get_collection_name(),
points_selector=Filter(
must=[
FieldCondition(
key="user_id",
match=MatchValue(value=doc_task.user_id),
),
FieldCondition(
key="doc_id",
match=MatchValue(value=doc_task.doc_id),
),
FieldCondition(
key="doc_type",
match=MatchValue(value=doc_task.doc_type),
),
]
),
)
logger.info(
f"Deleted {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id}"
)
return
# Handle indexing with retry
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
await _index_document(doc_task, nc_client, qdrant_client)
return # Success
except (HTTPStatusError, Exception) as e:
if attempt < max_retries - 1:
logger.warning(
f"Retry {attempt + 1}/{max_retries} for "
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}"
)
await anyio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
logger.error(
f"Failed to index {doc_task.doc_type}_{doc_task.doc_id} "
f"after {max_retries} retries: {e}"
)
raise
async def _index_document(
doc_task: DocumentTask, nc_client: NextcloudClient, qdrant_client
):
"""
Index a single document (called by process_document with retry).
Args:
doc_task: Document task to index
nc_client: Authenticated Nextcloud client
qdrant_client: Qdrant client instance
"""
settings = get_settings()
# Fetch document content
if doc_task.doc_type == "note":
document = await nc_client.notes.get_note(int(doc_task.doc_id))
content = f"{document['title']}\n\n{document['content']}"
title = document["title"]
etag = document.get("etag", "")
else:
raise ValueError(f"Unsupported doc_type: {doc_task.doc_type}")
# Tokenize and chunk
chunker = DocumentChunker(chunk_size=512, overlap=50)
chunks = chunker.chunk_text(content)
# Generate embeddings (I/O bound - external API call)
embedding_service = get_embedding_service()
embeddings = await embedding_service.embed_batch(chunks)
# Prepare Qdrant points
indexed_at = int(time.time())
points = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
# Generate deterministic UUID for point ID
# Using uuid5 with DNS namespace and combining doc info
point_name = f"{doc_task.doc_type}:{doc_task.doc_id}:chunk:{i}"
point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, point_name))
points.append(
PointStruct(
id=point_id,
vector=embedding,
payload={
"user_id": doc_task.user_id,
"doc_id": doc_task.doc_id,
"doc_type": doc_task.doc_type,
"title": title,
"excerpt": chunk[:200],
"indexed_at": indexed_at,
"modified_at": doc_task.modified_at,
"etag": etag,
"chunk_index": i,
"total_chunks": len(chunks),
},
)
)
# Upsert to Qdrant
await qdrant_client.upsert(
collection_name=settings.get_collection_name(),
points=points,
wait=True,
)
logger.info(
f"Indexed {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id} "
f"({len(chunks)} chunks)"
)