3aa7128f45
Track character offsets (start_offset, end_offset) for each chunk in vector database metadata, enabling precise chunk highlighting in visualization pane. Changes: - processor.py: Store chunk_start_offset and chunk_end_offset in Qdrant metadata - processor.py: Added metadata_version=2 to indicate position tracking support - search/semantic.py: Return chunk positions from search results - server/semantic.py: Expose chunk positions in API responses (SemanticSearchResult) Enables viz pane to: 1. Display exact matched chunk with surrounding context 2. Highlight the precise portion of text that matched the query 3. Build user trust by showing what the RAG system actually retrieved Position tracking uses ChunkWithPosition dataclass from document_chunker.py which provides character-accurate offsets in the original document. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
295 lines
10 KiB
Python
295 lines
10 KiB
Python
"""Processor task for vector database synchronization.
|
|
|
|
Processes documents from stream: fetches content, generates embeddings, stores in Qdrant.
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
import uuid
|
|
|
|
import anyio
|
|
from anyio.abc import TaskStatus
|
|
from anyio.streams.memory import MemoryObjectReceiveStream
|
|
from httpx import HTTPStatusError
|
|
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
|
|
|
|
from nextcloud_mcp_server.client import NextcloudClient
|
|
from nextcloud_mcp_server.config import get_settings
|
|
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
|
|
from nextcloud_mcp_server.observability.metrics import (
|
|
record_qdrant_operation,
|
|
record_vector_sync_processing,
|
|
update_vector_sync_queue_size,
|
|
)
|
|
from nextcloud_mcp_server.observability.tracing import trace_operation
|
|
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
|
|
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
|
from nextcloud_mcp_server.vector.scanner import DocumentTask
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def processor_task(
|
|
worker_id: int,
|
|
receive_stream: MemoryObjectReceiveStream[DocumentTask],
|
|
shutdown_event: anyio.Event,
|
|
nc_client: NextcloudClient,
|
|
user_id: str,
|
|
*,
|
|
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
|
|
):
|
|
"""
|
|
Process documents from stream concurrently.
|
|
|
|
Each processor task runs in a loop:
|
|
1. Receive document from stream (with timeout)
|
|
2. Fetch content from Nextcloud
|
|
3. Tokenize and chunk text
|
|
4. Generate embeddings (I/O bound - external API)
|
|
5. Upload vectors to Qdrant
|
|
|
|
Multiple processors run concurrently for I/O parallelism.
|
|
|
|
Args:
|
|
worker_id: Worker identifier for logging
|
|
receive_stream: Stream to receive documents from
|
|
shutdown_event: Event signaling shutdown
|
|
nc_client: Authenticated Nextcloud client
|
|
user_id: User being processed
|
|
task_status: Status object for signaling task readiness
|
|
"""
|
|
logger.info(f"Processor {worker_id} started")
|
|
|
|
# Signal that the task has started and is ready
|
|
task_status.started()
|
|
|
|
while not shutdown_event.is_set():
|
|
try:
|
|
# Get document with timeout (allows checking shutdown)
|
|
with anyio.fail_after(1.0):
|
|
doc_task = await receive_stream.receive()
|
|
|
|
# Update queue size metric after receiving
|
|
stream_stats = receive_stream.statistics()
|
|
update_vector_sync_queue_size(stream_stats.current_buffer_used)
|
|
|
|
# Process document
|
|
await process_document(doc_task, nc_client)
|
|
|
|
# Update queue size metric after processing
|
|
stream_stats = receive_stream.statistics()
|
|
update_vector_sync_queue_size(stream_stats.current_buffer_used)
|
|
|
|
except TimeoutError:
|
|
# No documents available, update metric to show empty queue
|
|
stream_stats = receive_stream.statistics()
|
|
update_vector_sync_queue_size(stream_stats.current_buffer_used)
|
|
continue
|
|
|
|
except anyio.EndOfStream:
|
|
# Scanner finished and closed stream, exit gracefully
|
|
logger.info(f"Processor {worker_id}: Scanner finished, exiting")
|
|
break
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Processor {worker_id} error processing "
|
|
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}",
|
|
exc_info=True,
|
|
)
|
|
# Continue to next document (no task_done() needed with streams)
|
|
|
|
logger.info(f"Processor {worker_id} stopped")
|
|
|
|
|
|
async def process_document(doc_task: DocumentTask, nc_client: NextcloudClient):
|
|
"""
|
|
Process a single document: fetch, tokenize, embed, store in Qdrant.
|
|
|
|
Implements retry logic with exponential backoff for transient failures.
|
|
|
|
Args:
|
|
doc_task: Document task to process
|
|
nc_client: Authenticated Nextcloud client
|
|
"""
|
|
start_time = time.time()
|
|
|
|
logger.debug(
|
|
f"Processing {doc_task.doc_type}_{doc_task.doc_id} "
|
|
f"for {doc_task.user_id} ({doc_task.operation})"
|
|
)
|
|
|
|
with trace_operation(
|
|
"vector_sync.process_document",
|
|
attributes={
|
|
"vector_sync.operation": "process",
|
|
"vector_sync.user_id": doc_task.user_id,
|
|
"vector_sync.doc_id": doc_task.doc_id,
|
|
"vector_sync.doc_type": doc_task.doc_type,
|
|
"vector_sync.doc_operation": doc_task.operation,
|
|
},
|
|
):
|
|
try:
|
|
qdrant_client = await get_qdrant_client()
|
|
settings = get_settings()
|
|
|
|
# Handle deletion
|
|
if doc_task.operation == "delete":
|
|
await qdrant_client.delete(
|
|
collection_name=settings.get_collection_name(),
|
|
points_selector=Filter(
|
|
must=[
|
|
FieldCondition(
|
|
key="user_id",
|
|
match=MatchValue(value=doc_task.user_id),
|
|
),
|
|
FieldCondition(
|
|
key="doc_id",
|
|
match=MatchValue(value=doc_task.doc_id),
|
|
),
|
|
FieldCondition(
|
|
key="doc_type",
|
|
match=MatchValue(value=doc_task.doc_type),
|
|
),
|
|
]
|
|
),
|
|
)
|
|
logger.info(
|
|
f"Deleted {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id}"
|
|
)
|
|
|
|
# Record successful deletion metrics
|
|
duration = time.time() - start_time
|
|
record_qdrant_operation("delete", "success")
|
|
record_vector_sync_processing(duration, "success")
|
|
return
|
|
|
|
# Handle indexing with retry
|
|
max_retries = 3
|
|
retry_delay = 1.0
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
await _index_document(doc_task, nc_client, qdrant_client)
|
|
|
|
# Record successful processing metrics
|
|
duration = time.time() - start_time
|
|
record_qdrant_operation("upsert", "success")
|
|
record_vector_sync_processing(duration, "success")
|
|
return # Success
|
|
|
|
except (HTTPStatusError, Exception) as e:
|
|
if attempt < max_retries - 1:
|
|
logger.warning(
|
|
f"Retry {attempt + 1}/{max_retries} for "
|
|
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}"
|
|
)
|
|
await anyio.sleep(retry_delay)
|
|
retry_delay *= 2 # Exponential backoff
|
|
else:
|
|
logger.error(
|
|
f"Failed to index {doc_task.doc_type}_{doc_task.doc_id} "
|
|
f"after {max_retries} retries: {e}"
|
|
)
|
|
# Record failed processing metrics
|
|
duration = time.time() - start_time
|
|
record_qdrant_operation("upsert", "error")
|
|
record_vector_sync_processing(duration, "error")
|
|
raise
|
|
|
|
except Exception:
|
|
# Catch any other unexpected errors
|
|
duration = time.time() - start_time
|
|
record_vector_sync_processing(duration, "error")
|
|
raise
|
|
|
|
|
|
async def _index_document(
|
|
doc_task: DocumentTask, nc_client: NextcloudClient, qdrant_client
|
|
):
|
|
"""
|
|
Index a single document (called by process_document with retry).
|
|
|
|
Args:
|
|
doc_task: Document task to index
|
|
nc_client: Authenticated Nextcloud client
|
|
qdrant_client: Qdrant client instance
|
|
"""
|
|
settings = get_settings()
|
|
|
|
# Fetch document content
|
|
if doc_task.doc_type == "note":
|
|
document = await nc_client.notes.get_note(int(doc_task.doc_id))
|
|
content = f"{document['title']}\n\n{document['content']}"
|
|
title = document["title"]
|
|
etag = document.get("etag", "")
|
|
else:
|
|
raise ValueError(f"Unsupported doc_type: {doc_task.doc_type}")
|
|
|
|
# Tokenize and chunk (using configured chunk size and overlap)
|
|
chunker = DocumentChunker(
|
|
chunk_size=settings.document_chunk_size,
|
|
overlap=settings.document_chunk_overlap,
|
|
)
|
|
chunks = chunker.chunk_text(content)
|
|
|
|
# Extract chunk texts for embedding
|
|
chunk_texts = [chunk.text for chunk in chunks]
|
|
|
|
# Generate dense embeddings (I/O bound - external API call)
|
|
embedding_service = get_embedding_service()
|
|
dense_embeddings = await embedding_service.embed_batch(chunk_texts)
|
|
|
|
# Generate sparse embeddings (BM25 for keyword matching)
|
|
bm25_service = get_bm25_service()
|
|
sparse_embeddings = bm25_service.encode_batch(chunk_texts)
|
|
|
|
# Prepare Qdrant points
|
|
indexed_at = int(time.time())
|
|
points = []
|
|
|
|
for i, (chunk, dense_emb, sparse_emb) in enumerate(
|
|
zip(chunks, dense_embeddings, sparse_embeddings)
|
|
):
|
|
# Generate deterministic UUID for point ID
|
|
# Using uuid5 with DNS namespace and combining doc info
|
|
point_name = f"{doc_task.doc_type}:{doc_task.doc_id}:chunk:{i}"
|
|
point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, point_name))
|
|
|
|
points.append(
|
|
PointStruct(
|
|
id=point_id,
|
|
vector={
|
|
"dense": dense_emb,
|
|
"sparse": sparse_emb,
|
|
},
|
|
payload={
|
|
"user_id": doc_task.user_id,
|
|
"doc_id": doc_task.doc_id,
|
|
"doc_type": doc_task.doc_type,
|
|
"title": title,
|
|
"excerpt": chunk.text[:200],
|
|
"indexed_at": indexed_at,
|
|
"modified_at": doc_task.modified_at,
|
|
"etag": etag,
|
|
"chunk_index": i,
|
|
"total_chunks": len(chunks),
|
|
"chunk_start_offset": chunk.start_offset,
|
|
"chunk_end_offset": chunk.end_offset,
|
|
"metadata_version": 2, # v2 includes position metadata
|
|
},
|
|
)
|
|
)
|
|
|
|
# Upsert to Qdrant
|
|
await qdrant_client.upsert(
|
|
collection_name=settings.get_collection_name(),
|
|
points=points,
|
|
wait=True,
|
|
)
|
|
|
|
logger.info(
|
|
f"Indexed {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id} "
|
|
f"({len(chunks)} chunks)"
|
|
)
|