nextcloud-mcp-server/nextcloud_mcp_server/vector/scanner.py
Chris Coutinho e575c8e57b feat(vector): Support multiple embedding models with auto-generated collection names
This PR enables safe switching between embedding models and multi-server
deployments by implementing auto-generated Qdrant collection names based on
deployment ID and model name.

## Problem

Previously, all deployments used a single hardcoded collection name
"nextcloud_content", which caused two critical issues:

1. **Dimension mismatches when switching models**: Changing
   OLLAMA_EMBEDDING_MODEL (e.g., nomic-embed-text at 768D → all-minilm at
   384D) would cause runtime errors as vectors couldn't be inserted into a
   collection with incompatible dimensions.

2. **Collection collisions in multi-server setups**: Multiple MCP servers
   sharing a single Qdrant instance would overwrite each other's data,
   making horizontal scaling impossible.

## Solution

### Auto-Generated Collection Naming

Collections are now automatically named using the pattern:
`{deployment-id}-{model-name}`

**Deployment ID**: Uses `OTEL_SERVICE_NAME` if it is configured and differs
from the default value; otherwise falls back to `hostname`, which suits
simple Docker deployments.

**Model Name**: From `OLLAMA_EMBEDDING_MODEL` with path separators sanitized.

**Examples**:
- `my-mcp-server-nomic-embed-text` (with OTEL_SERVICE_NAME=my-mcp-server)
- `mcp-container-all-minilm` (simple Docker, hostname=mcp-container)

**Override**: Users can still set `QDRANT_COLLECTION` explicitly to bypass
auto-generation for backward compatibility.
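
The naming rules above can be sketched roughly as follows. This is an illustration only, not the actual `get_collection_name()` implementation: the function names, the `DEFAULT_SERVICE_NAME` value, the fallback model, and the choice of `-` as the replacement character are all assumptions.

```python
import socket
from typing import Mapping, Optional

# Hypothetical placeholder: the commit message only says "default value".
DEFAULT_SERVICE_NAME = "nextcloud-mcp-server"


def sanitize_model_name(model: str) -> str:
    """Replace path separators so the model name fits in a collection name."""
    # Assumption: "/" is the separator of interest and "-" the replacement.
    return model.replace("/", "-")


def collection_name(env: Mapping[str, str], hostname: Optional[str] = None) -> str:
    """Derive a collection name from environment-style settings."""
    # An explicit override always wins (backward compatibility).
    explicit = env.get("QDRANT_COLLECTION")
    if explicit:
        return explicit

    # Prefer a non-default OTEL service name, else fall back to the hostname.
    service = env.get("OTEL_SERVICE_NAME")
    if service and service != DEFAULT_SERVICE_NAME:
        deployment_id = service
    else:
        deployment_id = hostname or socket.gethostname()

    # Assumption: fall back to nomic-embed-text when no model is configured.
    model = sanitize_model_name(env.get("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text"))
    return f"{deployment_id}-{model}"
```

With `OTEL_SERVICE_NAME=my-mcp-server` and `OLLAMA_EMBEDDING_MODEL=nomic-embed-text`, this sketch returns `my-mcp-server-nomic-embed-text`, matching the first example above.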

### Dimension Validation

Added startup validation that checks collection dimensions match the
embedding service. If a mismatch is detected, the server fails fast with a
clear error message explaining:
- Expected vs actual dimensions
- Likely cause (model change)
- Solutions (delete collection, use different name, or revert model)
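
A fail-fast check of this kind might look like the sketch below. It assumes the two dimensions have already been read from the collection and from the embedding service; `DimensionMismatchError` and `validate_dimensions` are hypothetical names, and the message wording is illustrative rather than the server's actual output.

```python
class DimensionMismatchError(RuntimeError):
    """Raised when a collection's vector size differs from the embedding size."""


def validate_dimensions(collection: str, collection_dim: int, embedding_dim: int) -> None:
    """Raise with an actionable message if the dimensions do not match."""
    if collection_dim == embedding_dim:
        return
    raise DimensionMismatchError(
        f"Collection '{collection}' stores {collection_dim}-dimensional vectors, "
        f"but the embedding service produces {embedding_dim}-dimensional vectors.\n"
        "Likely cause: the embedding model was changed.\n"
        "Solutions: delete the collection, use a different collection name, "
        "or revert to the previous model."
    )
```

Running such a check once at startup means a model change surfaces immediately instead of as an insert failure partway through indexing.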

### Improved Sampling Error Handling

Enhanced MCP sampling rejection handling to treat user rejections as normal
behavior rather than errors:

- **User rejections** ("rejected", "denied") → INFO log, no traceback
- **Unsupported clients** → INFO log, no traceback
- **Other MCP errors** → WARNING log, no traceback
- **Unexpected errors** → ERROR log WITH traceback

This aligns with the MCP specification where clients SHOULD prompt users for
approval/denial of sampling requests.
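
The log-level mapping above can be sketched as a small classifier. This is a hedged illustration: `McpError` here is a local stand-in for the MCP SDK's error type, `classify_sampling_failure` is a hypothetical helper, and detecting unsupported clients by substring is an assumption about how that case is recognised.

```python
import logging
from typing import Tuple


class McpError(Exception):
    """Local stand-in for the MCP SDK's error type (assumption)."""


def classify_sampling_failure(exc: Exception) -> Tuple[int, bool]:
    """Return (log level, include traceback) for a failed sampling request."""
    if isinstance(exc, McpError):
        message = str(exc).lower()
        # User rejections are normal behaviour, not errors.
        if "rejected" in message or "denied" in message:
            return logging.INFO, False
        # Assumption: unsupported clients are identified by message text.
        if "not supported" in message or "unsupported" in message:
            return logging.INFO, False
        # Any other MCP-level error: warn, but skip the traceback.
        return logging.WARNING, False
    # Anything else is unexpected and deserves a full traceback.
    return logging.ERROR, True
```

A caller could then write `level, tb = classify_sampling_failure(e)` followed by `logger.log(level, "Sampling failed: %s", e, exc_info=tb)`.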

## Changes

### Core Implementation

- **nextcloud_mcp_server/config.py**: Added `get_collection_name()` method
  with deployment ID detection and model name sanitization
- **nextcloud_mcp_server/vector/qdrant_client.py**: Dimension validation on
  collection open with helpful error messages
- **nextcloud_mcp_server/vector/{scanner,processor}.py**: Updated to use
  `get_collection_name()`
- **nextcloud_mcp_server/auth/userinfo_routes.py**: Vector sync status uses
  `get_collection_name()`
- **nextcloud_mcp_server/server/semantic.py**:
  - Updated semantic search tools to use `get_collection_name()`
  - Improved sampling rejection error handling (McpError vs Exception)

### Documentation

- **docs/semantic-search-architecture.md**: New comprehensive architecture
  document (557 lines) covering background sync, semantic search flow, RAG
  implementation, and deployment modes
- **docs/configuration.md**: Added detailed "Qdrant Collection Naming"
  section with examples and multi-server deployment guidance
- **docker-compose.yml**: Added comments explaining collection naming behavior
- **README.md**: Updated semantic search descriptions to clarify
  experimental status, Notes-only support, and infrastructure requirements

## Migration Guide

**For existing single-server deployments:**

Option 1 (Recommended): Use explicit collection name for continuity
```bash
QDRANT_COLLECTION=nextcloud_content  # Keep existing collection
```

Option 2: Allow auto-generation and re-embed
```bash
# Remove QDRANT_COLLECTION override
# New collection will be created based on deployment ID + model
# Requires re-embedding all documents (may take time)
```

**For new multi-server deployments:**

Set unique OTEL service names per server:
```bash
# Server 1
OTEL_SERVICE_NAME=mcp-prod
OLLAMA_EMBEDDING_MODEL=nomic-embed-text
# → Collection: "mcp-prod-nomic-embed-text"

# Server 2
OTEL_SERVICE_NAME=mcp-staging
OLLAMA_EMBEDDING_MODEL=nomic-embed-text
# → Collection: "mcp-staging-nomic-embed-text"
```

## Benefits

- **Safe model switching**: Each model gets its own collection, preventing
  dimension mismatch errors
- **Multi-server support**: Multiple MCP servers can share one Qdrant
  instance without conflicts
- **Clear ownership**: Collection names show which deployment and model owns
  the data
- **Better error messages**: Dimension validation provides actionable
  guidance
- **Backward compatible**: Existing deployments can continue using
  `QDRANT_COLLECTION` override

## Testing

Validated with:
- Single-server deployments (default hostname-based naming)
- Multi-server deployments (OTEL service name-based naming)
- Model switching scenarios (dimension validation)
- Collection override scenarios (backward compatibility)

Next steps: Testing various Ollama embedding models to investigate optimal
chunk sizes and performance characteristics.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 01:18:30 +01:00

234 lines
8.0 KiB
Python

"""Scanner task for vector database synchronization.

Periodically scans enabled users' content and queues changed documents for processing.
"""

import logging
import time
from dataclasses import dataclass

import anyio
from anyio.streams.memory import MemoryObjectSendStream
from qdrant_client.models import FieldCondition, Filter, MatchValue

from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client

logger = logging.getLogger(__name__)


@dataclass
class DocumentTask:
    """Document task for processing queue."""

    user_id: str
    doc_id: str
    doc_type: str  # "note", "file", "calendar"
    operation: str  # "index" or "delete"
    modified_at: int


# Track documents potentially deleted (grace period before actual deletion)
# Format: {(user_id, doc_id): first_missing_timestamp}
_potentially_deleted: dict[tuple[str, str], float] = {}


async def scanner_task(
    send_stream: MemoryObjectSendStream[DocumentTask],
    shutdown_event: anyio.Event,
    wake_event: anyio.Event,
    nc_client: NextcloudClient,
    user_id: str,
):
    """
    Periodic scanner that detects changed documents for enabled user.

    For BasicAuth mode, scans a single user with credentials available at runtime.

    Args:
        send_stream: Stream to send changed documents to processors
        shutdown_event: Event signaling shutdown
        wake_event: Event to trigger immediate scan
        nc_client: Authenticated Nextcloud client
        user_id: User to scan
    """
    logger.info(f"Scanner task started for user: {user_id}")
    settings = get_settings()

    async with send_stream:
        while not shutdown_event.is_set():
            try:
                # Scan user documents
                await scan_user_documents(
                    user_id=user_id,
                    send_stream=send_stream,
                    nc_client=nc_client,
                )
            except Exception as e:
                logger.error(f"Scanner error: {e}", exc_info=True)

            # Sleep until next interval or wake event
            try:
                with anyio.move_on_after(settings.vector_sync_scan_interval):
                    # Wait for wake event or shutdown (whichever comes first)
                    await wake_event.wait()
            except anyio.get_cancelled_exc_class():
                # Shutdown, exit loop
                break

    logger.info("Scanner task stopped - stream closed")


async def scan_user_documents(
    user_id: str,
    send_stream: MemoryObjectSendStream[DocumentTask],
    nc_client: NextcloudClient,
    initial_sync: bool = False,
):
    """
    Scan a single user's documents and send changes to processor stream.

    Args:
        user_id: User to scan
        send_stream: Stream to send changed documents to processors
        nc_client: Authenticated Nextcloud client
        initial_sync: If True, send all documents (first-time sync)
    """
    logger.debug(f"Scanning documents for user: {user_id}")

    # Fetch all notes from Nextcloud
    notes = [note async for note in nc_client.notes.get_all_notes()]
    logger.debug(f"Found {len(notes)} notes for {user_id}")

    if initial_sync:
        # Send everything on first sync
        for note in notes:
            # Handle missing 'modified' field (use 0 as fallback)
            modified_at = note.get("modified", 0)
            if modified_at == 0:
                logger.warning(
                    f"Note {note['id']} missing 'modified' field, using 0 as fallback"
                )
            await send_stream.send(
                DocumentTask(
                    user_id=user_id,
                    doc_id=str(note["id"]),
                    doc_type="note",
                    operation="index",
                    modified_at=modified_at,
                )
            )
        logger.info(f"Sent {len(notes)} documents for initial sync: {user_id}")
        return

    # Get indexed state from Qdrant
    qdrant_client = await get_qdrant_client()
    scroll_result = await qdrant_client.scroll(
        collection_name=get_settings().get_collection_name(),
        scroll_filter=Filter(
            must=[
                FieldCondition(key="user_id", match=MatchValue(value=user_id)),
                FieldCondition(key="doc_type", match=MatchValue(value="note")),
            ]
        ),
        with_payload=["doc_id", "indexed_at"],
        with_vectors=False,
        limit=10000,
    )
    indexed_docs = {
        point.payload["doc_id"]: point.payload["indexed_at"]
        for point in scroll_result[0]
    }
    logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")

    # Compare and queue changes
    queued = 0
    nextcloud_doc_ids = {str(note["id"]) for note in notes}

    for note in notes:
        doc_id = str(note["id"])
        indexed_at = indexed_docs.get(doc_id)

        # Handle missing 'modified' field (use 0 as fallback)
        modified_at = note.get("modified", 0)
        if modified_at == 0:
            logger.warning(
                f"Note {doc_id} missing 'modified' field, using 0 as fallback"
            )

        # If document reappeared, remove from potentially_deleted
        doc_key = (user_id, doc_id)
        if doc_key in _potentially_deleted:
            logger.debug(
                f"Document {doc_id} reappeared, removing from deletion grace period"
            )
            del _potentially_deleted[doc_key]

        # Send if never indexed or modified since last index
        if indexed_at is None or modified_at > indexed_at:
            await send_stream.send(
                DocumentTask(
                    user_id=user_id,
                    doc_id=doc_id,
                    doc_type="note",
                    operation="index",
                    modified_at=modified_at,
                )
            )
            queued += 1

    # Check for deleted documents (in Qdrant but not in Nextcloud)
    # Use grace period: only delete after 2 consecutive scans confirm absence
    settings = get_settings()
    grace_period = settings.vector_sync_scan_interval * 1.5  # Allow 1.5 scan intervals
    current_time = time.time()

    for doc_id in indexed_docs:
        if doc_id not in nextcloud_doc_ids:
            doc_key = (user_id, doc_id)

            if doc_key in _potentially_deleted:
                # Already marked as potentially deleted, check if grace period elapsed
                first_missing_time = _potentially_deleted[doc_key]
                time_missing = current_time - first_missing_time

                if time_missing >= grace_period:
                    # Grace period elapsed, send for deletion
                    logger.info(
                        f"Document {doc_id} missing for {time_missing:.1f}s "
                        f"(>{grace_period:.1f}s grace period), sending deletion"
                    )
                    await send_stream.send(
                        DocumentTask(
                            user_id=user_id,
                            doc_id=doc_id,
                            doc_type="note",
                            operation="delete",
                            modified_at=0,
                        )
                    )
                    queued += 1
                    # Remove from tracking after sending deletion
                    del _potentially_deleted[doc_key]
                else:
                    logger.debug(
                        f"Document {doc_id} still missing "
                        f"({time_missing:.1f}s/{grace_period:.1f}s grace period)"
                    )
            else:
                # First time missing, add to grace period tracking
                logger.debug(
                    f"Document {doc_id} missing for first time, starting grace period"
                )
                _potentially_deleted[doc_key] = current_time

    if queued > 0:
        logger.info(f"Sent {queued} documents for incremental sync: {user_id}")
    else:
        logger.debug(f"No changes detected for {user_id}")