nextcloud-mcp-server/nextcloud_mcp_server/vector/processor.py
Chris Coutinho cb39b3fca4 feat(vector): Add configurable chunk size and overlap for document embedding
Enable users to tune document chunking parameters to match their embedding
model and content type by adding DOCUMENT_CHUNK_SIZE and DOCUMENT_CHUNK_OVERLAP
environment variables.

- **config.py**: Added `document_chunk_size` (default: 512) and
  `document_chunk_overlap` (default: 50) configuration fields with validation:
  - Ensures overlap < chunk_size
  - Warns if chunk_size < 100 words
  - Prevents negative overlap values

- **processor.py**: Updated DocumentChunker instantiation to use config
  settings instead of hardcoded values (lines 174-177)

- **tests/unit/test_config.py**: Added TestChunkConfigValidation class with
  9 tests covering:
  - Default values
  - Valid configurations
  - Validation errors (overlap >= chunk_size, negative overlap)
  - Warning for small chunk sizes
  - Environment variable loading

- **docs/configuration.md**: Added comprehensive "Document Chunking
  Configuration" section with:
  - Chunk size selection guidance (256-384 vs 512 vs 768-1024 words)
  - Overlap recommendations (10-20% of chunk size)
  - Configuration examples for different use cases
  - Added env vars to reference table

- **docs/semantic-search-architecture.md**: Added "Document Chunking Strategy"
  section with:
  - Chunking process explanation
  - Example showing sliding window behavior
  - Search behavior with chunks
  - Tuning recommendations

- **env.sample**: Added complete "Semantic Search & Vector Sync Configuration"
  section with:
  - Vector sync settings
  - Qdrant configuration (3 modes)
  - Ollama embedding service
  - Document chunking configuration

- **docker-compose.yml**: Added commented examples for DOCUMENT_CHUNK_SIZE and
  DOCUMENT_CHUNK_OVERLAP with usage notes

```bash
DOCUMENT_CHUNK_SIZE=512

DOCUMENT_CHUNK_OVERLAP=50
```

1. `overlap` must be less than `chunk_size`
2. `overlap` cannot be negative
3. Warning issued if `chunk_size` < 100 words
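
The validation rules above can be sketched roughly as follows. This is a minimal illustration, not the code in config.py: the `ChunkSettings` dataclass and the `load_chunk_settings` helper are hypothetical names, and only the two environment variables, their defaults, and the three rules are taken from this commit.

```python
import logging
import os
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class ChunkSettings:  # hypothetical name, not the project's settings class
    chunk_size: int = 512  # words per chunk (default from this commit)
    overlap: int = 50  # words shared between consecutive chunks

    def __post_init__(self) -> None:
        # Rule 2: overlap cannot be negative
        if self.overlap < 0:
            raise ValueError(f"overlap must be >= 0, got {self.overlap}")
        # Rule 1: overlap must be less than chunk_size
        if self.overlap >= self.chunk_size:
            raise ValueError(
                f"overlap ({self.overlap}) must be less than "
                f"chunk_size ({self.chunk_size})"
            )
        # Rule 3: small chunks are allowed but trigger a warning
        if self.chunk_size < 100:
            logger.warning("chunk_size %d is below 100 words", self.chunk_size)


def load_chunk_settings(env=None) -> ChunkSettings:
    """Build settings from a mapping of environment variables (hypothetical helper)."""
    env = os.environ if env is None else env
    return ChunkSettings(
        chunk_size=int(env.get("DOCUMENT_CHUNK_SIZE", "512")),
        overlap=int(env.get("DOCUMENT_CHUNK_OVERLAP", "50")),
    )


settings = load_chunk_settings(
    {"DOCUMENT_CHUNK_SIZE": "1024", "DOCUMENT_CHUNK_OVERLAP": "100"}
)
print(settings)  # ChunkSettings(chunk_size=1024, overlap=100)
```

Failing fast in `__post_init__` means a bad combination such as `DOCUMENT_CHUNK_OVERLAP=600` with the default chunk size is rejected at startup instead of surfacing later during indexing.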

**Precise matching** (small notes, specific queries):
```bash
DOCUMENT_CHUNK_SIZE=256
DOCUMENT_CHUNK_OVERLAP=25
```

**Balanced** (default, general purpose):
```bash
DOCUMENT_CHUNK_SIZE=512
DOCUMENT_CHUNK_OVERLAP=50
```

**Contextual** (long documents, broader topics):
```bash
DOCUMENT_CHUNK_SIZE=1024
DOCUMENT_CHUNK_OVERLAP=100
```
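
To get a feel for how the presets above change the number of chunks per document, here is a minimal word-based sliding-window sketch. It assumes each chunk starts `chunk_size - overlap` words after the previous one; `chunk_words` is a hypothetical helper written for illustration and may differ from the actual `DocumentChunker` implementation.

```python
def chunk_words(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Split text into word windows that share `overlap` words (illustrative only)."""
    if overlap < 0 or overlap >= chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    words = text.split()
    if not words:
        return []
    step = chunk_size - overlap  # distance between consecutive chunk starts
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start : start + chunk_size]))
        # Stop once a window has reached the end of the document
        if start + chunk_size >= len(words):
            break
    return chunks


# A synthetic 1200-word document: w0 w1 ... w1199
text = " ".join(f"w{i}" for i in range(1200))
for size, overlap in [(256, 25), (512, 50), (1024, 100)]:
    print(size, overlap, len(chunk_words(text, size, overlap)))
# prints 6, 3 and 2 chunks for the three presets, in that order
```

Smaller chunks yield more vectors per document and more embedding calls, which is the cost side of the precise-matching preset.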

- **User control** - Tune chunking to match embedding model capabilities
- **Experimentation** - Test different chunk sizes for optimal results
- **Model alignment** - Match chunk size to embedding context window
- **Backward compatible** - Defaults maintain existing behavior
- **Well validated** - Comprehensive tests prevent misconfiguration

All 22 config validation tests pass (9 new tests for chunking):
- Default values work correctly
- Validation prevents invalid configurations
- Environment variables load properly
- Warning system works as expected

With configurable chunk sizes, users can now experiment with different Ollama
embedding models and tune chunk parameters for optimal semantic search quality.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-10 02:47:57 +01:00


"""Processor task for vector database synchronization.
Processes documents from stream: fetches content, generates embeddings, stores in Qdrant.
"""
import logging
import time
import uuid

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream
from httpx import HTTPStatusError
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct

from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_embedding_service
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
from nextcloud_mcp_server.vector.scanner import DocumentTask

logger = logging.getLogger(__name__)


async def processor_task(
    worker_id: int,
    receive_stream: MemoryObjectReceiveStream[DocumentTask],
    shutdown_event: anyio.Event,
    nc_client: NextcloudClient,
    user_id: str,
):
    """
    Process documents from stream concurrently.

    Each processor task runs in a loop:
    1. Receive document from stream (with timeout)
    2. Fetch content from Nextcloud
    3. Tokenize and chunk text
    4. Generate embeddings (I/O bound - external API)
    5. Upload vectors to Qdrant

    Multiple processors run concurrently for I/O parallelism.

    Args:
        worker_id: Worker identifier for logging
        receive_stream: Stream to receive documents from
        shutdown_event: Event signaling shutdown
        nc_client: Authenticated Nextcloud client
        user_id: User being processed
    """
    logger.info(f"Processor {worker_id} started")

    while not shutdown_event.is_set():
        # Reset per iteration so the generic handler below never reads an
        # unbound or stale task when receive() itself raises.
        doc_task = None
        try:
            # Get document with timeout (allows checking shutdown)
            with anyio.fail_after(1.0):
                doc_task = await receive_stream.receive()

            # Process document
            await process_document(doc_task, nc_client)

        except TimeoutError:
            # No documents available, continue
            continue
        except anyio.EndOfStream:
            # Scanner finished and closed stream, exit gracefully
            logger.info(f"Processor {worker_id}: Scanner finished, exiting")
            break
        except Exception as e:
            doc_label = (
                f"{doc_task.doc_type}_{doc_task.doc_id}"
                if doc_task is not None
                else "<no document received>"
            )
            logger.error(
                f"Processor {worker_id} error processing {doc_label}: {e}",
                exc_info=True,
            )
            # Continue to next document (no task_done() needed with streams)

    logger.info(f"Processor {worker_id} stopped")


async def process_document(doc_task: DocumentTask, nc_client: NextcloudClient):
    """
    Process a single document: fetch, tokenize, embed, store in Qdrant.

    Implements retry logic with exponential backoff for transient failures.

    Args:
        doc_task: Document task to process
        nc_client: Authenticated Nextcloud client
    """
    logger.debug(
        f"Processing {doc_task.doc_type}_{doc_task.doc_id} "
        f"for {doc_task.user_id} ({doc_task.operation})"
    )

    qdrant_client = await get_qdrant_client()
    settings = get_settings()

    # Handle deletion
    if doc_task.operation == "delete":
        await qdrant_client.delete(
            collection_name=settings.get_collection_name(),
            points_selector=Filter(
                must=[
                    FieldCondition(
                        key="user_id",
                        match=MatchValue(value=doc_task.user_id),
                    ),
                    FieldCondition(
                        key="doc_id",
                        match=MatchValue(value=doc_task.doc_id),
                    ),
                    FieldCondition(
                        key="doc_type",
                        match=MatchValue(value=doc_task.doc_type),
                    ),
                ]
            ),
        )
        logger.info(
            f"Deleted {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id}"
        )
        return

    # Handle indexing with retry
    max_retries = 3
    retry_delay = 1.0

    for attempt in range(max_retries):
        try:
            await _index_document(doc_task, nc_client, qdrant_client)
            return  # Success

        except (HTTPStatusError, Exception) as e:
            if attempt < max_retries - 1:
                logger.warning(
                    f"Retry {attempt + 1}/{max_retries} for "
                    f"{doc_task.doc_type}_{doc_task.doc_id}: {e}"
                )
                await anyio.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
            else:
                logger.error(
                    f"Failed to index {doc_task.doc_type}_{doc_task.doc_id} "
                    f"after {max_retries} retries: {e}"
                )
                raise


async def _index_document(
    doc_task: DocumentTask, nc_client: NextcloudClient, qdrant_client
):
    """
    Index a single document (called by process_document with retry).

    Args:
        doc_task: Document task to index
        nc_client: Authenticated Nextcloud client
        qdrant_client: Qdrant client instance
    """
    settings = get_settings()

    # Fetch document content
    if doc_task.doc_type == "note":
        document = await nc_client.notes.get_note(int(doc_task.doc_id))
        content = f"{document['title']}\n\n{document['content']}"
        title = document["title"]
        etag = document.get("etag", "")
    else:
        raise ValueError(f"Unsupported doc_type: {doc_task.doc_type}")

    # Tokenize and chunk (using configured chunk size and overlap)
    chunker = DocumentChunker(
        chunk_size=settings.document_chunk_size,
        overlap=settings.document_chunk_overlap,
    )
    chunks = chunker.chunk_text(content)

    # Generate embeddings (I/O bound - external API call)
    embedding_service = get_embedding_service()
    embeddings = await embedding_service.embed_batch(chunks)

    # Prepare Qdrant points
    indexed_at = int(time.time())
    points = []
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
        # Generate deterministic UUID for point ID
        # Using uuid5 with DNS namespace and combining doc info
        point_name = f"{doc_task.doc_type}:{doc_task.doc_id}:chunk:{i}"
        point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, point_name))

        points.append(
            PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "user_id": doc_task.user_id,
                    "doc_id": doc_task.doc_id,
                    "doc_type": doc_task.doc_type,
                    "title": title,
                    "excerpt": chunk[:200],
                    "indexed_at": indexed_at,
                    "modified_at": doc_task.modified_at,
                    "etag": etag,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                },
            )
        )

    # Upsert to Qdrant
    await qdrant_client.upsert(
        collection_name=settings.get_collection_name(),
        points=points,
        wait=True,
    )

    logger.info(
        f"Indexed {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id} "
        f"({len(chunks)} chunks)"
    )