5a251a99e6
The processor was not setting is_placeholder field when writing real document chunks to Qdrant. This caused the placeholder filter to exclude all documents (since None != False), resulting in 0 search results. Now explicitly sets is_placeholder: False in payload when writing real indexed chunks, allowing search filters to correctly distinguish between placeholders and real documents.
448 lines
17 KiB
Python
448 lines
17 KiB
Python
"""Processor task for vector database synchronization.
|
|
|
|
Processes documents from stream: fetches content, generates embeddings, stores in Qdrant.
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
import uuid
|
|
|
|
import anyio
|
|
from anyio.abc import TaskStatus
|
|
from anyio.streams.memory import MemoryObjectReceiveStream
|
|
from httpx import HTTPStatusError
|
|
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
|
|
|
|
from nextcloud_mcp_server.client import NextcloudClient
|
|
from nextcloud_mcp_server.config import get_settings
|
|
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
|
|
from nextcloud_mcp_server.observability.metrics import (
|
|
record_qdrant_operation,
|
|
record_vector_sync_processing,
|
|
update_vector_sync_queue_size,
|
|
)
|
|
from nextcloud_mcp_server.observability.tracing import trace_operation
|
|
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
|
|
from nextcloud_mcp_server.vector.placeholder import delete_placeholder_point
|
|
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
|
from nextcloud_mcp_server.vector.scanner import DocumentTask
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def assign_page_numbers(chunks, page_boundaries):
|
|
"""Assign page numbers to chunks based on page boundaries.
|
|
|
|
Each chunk gets the page number where most of its content appears.
|
|
For chunks spanning multiple pages, assigns the page containing the
|
|
majority of the chunk's characters.
|
|
|
|
Args:
|
|
chunks: List of ChunkWithPosition objects
|
|
page_boundaries: List of dicts with {page, start_offset, end_offset}
|
|
|
|
Returns:
|
|
None (modifies chunks in place)
|
|
"""
|
|
if not page_boundaries:
|
|
return
|
|
|
|
for chunk in chunks:
|
|
# Find which page(s) this chunk overlaps with
|
|
max_overlap = 0
|
|
assigned_page = None
|
|
|
|
for boundary in page_boundaries:
|
|
# Calculate overlap between chunk and page
|
|
overlap_start = max(chunk.start_offset, boundary["start_offset"])
|
|
overlap_end = min(chunk.end_offset, boundary["end_offset"])
|
|
overlap = max(0, overlap_end - overlap_start)
|
|
|
|
# Assign to page with maximum overlap
|
|
if overlap > max_overlap:
|
|
max_overlap = overlap
|
|
assigned_page = boundary["page"]
|
|
|
|
if assigned_page is not None:
|
|
chunk.page_number = assigned_page
|
|
|
|
|
|
async def processor_task(
|
|
worker_id: int,
|
|
receive_stream: MemoryObjectReceiveStream[DocumentTask],
|
|
shutdown_event: anyio.Event,
|
|
nc_client: NextcloudClient,
|
|
user_id: str,
|
|
*,
|
|
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
|
|
):
|
|
"""
|
|
Process documents from stream concurrently.
|
|
|
|
Each processor task runs in a loop:
|
|
1. Receive document from stream (with timeout)
|
|
2. Fetch content from Nextcloud
|
|
3. Tokenize and chunk text
|
|
4. Generate embeddings (I/O bound - external API)
|
|
5. Upload vectors to Qdrant
|
|
|
|
Multiple processors run concurrently for I/O parallelism.
|
|
|
|
Args:
|
|
worker_id: Worker identifier for logging
|
|
receive_stream: Stream to receive documents from
|
|
shutdown_event: Event signaling shutdown
|
|
nc_client: Authenticated Nextcloud client
|
|
user_id: User being processed
|
|
task_status: Status object for signaling task readiness
|
|
"""
|
|
logger.info(f"Processor {worker_id} started")
|
|
|
|
# Signal that the task has started and is ready
|
|
task_status.started()
|
|
|
|
while not shutdown_event.is_set():
|
|
try:
|
|
# Get document with timeout (allows checking shutdown)
|
|
with anyio.fail_after(1.0):
|
|
doc_task = await receive_stream.receive()
|
|
|
|
# Update queue size metric after receiving
|
|
stream_stats = receive_stream.statistics()
|
|
update_vector_sync_queue_size(stream_stats.current_buffer_used)
|
|
|
|
# Process document
|
|
await process_document(doc_task, nc_client)
|
|
|
|
# Update queue size metric after processing
|
|
stream_stats = receive_stream.statistics()
|
|
update_vector_sync_queue_size(stream_stats.current_buffer_used)
|
|
|
|
except TimeoutError:
|
|
# No documents available, update metric to show empty queue
|
|
stream_stats = receive_stream.statistics()
|
|
update_vector_sync_queue_size(stream_stats.current_buffer_used)
|
|
continue
|
|
|
|
except anyio.EndOfStream:
|
|
# Scanner finished and closed stream, exit gracefully
|
|
logger.info(f"Processor {worker_id}: Scanner finished, exiting")
|
|
break
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Processor {worker_id} error processing "
|
|
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}",
|
|
exc_info=True,
|
|
)
|
|
# Continue to next document (no task_done() needed with streams)
|
|
|
|
logger.info(f"Processor {worker_id} stopped")
|
|
|
|
|
|
async def process_document(doc_task: DocumentTask, nc_client: NextcloudClient):
|
|
"""
|
|
Process a single document: fetch, tokenize, embed, store in Qdrant.
|
|
|
|
Implements retry logic with exponential backoff for transient failures.
|
|
|
|
Args:
|
|
doc_task: Document task to process
|
|
nc_client: Authenticated Nextcloud client
|
|
"""
|
|
start_time = time.time()
|
|
|
|
logger.debug(
|
|
f"Processing {doc_task.doc_type}_{doc_task.doc_id} "
|
|
f"for {doc_task.user_id} ({doc_task.operation})"
|
|
)
|
|
|
|
with trace_operation(
|
|
"vector_sync.process_document",
|
|
attributes={
|
|
"vector_sync.operation": "process",
|
|
"vector_sync.user_id": doc_task.user_id,
|
|
"vector_sync.doc_id": doc_task.doc_id,
|
|
"vector_sync.doc_type": doc_task.doc_type,
|
|
"vector_sync.doc_operation": doc_task.operation,
|
|
},
|
|
):
|
|
try:
|
|
qdrant_client = await get_qdrant_client()
|
|
settings = get_settings()
|
|
|
|
# Handle deletion
|
|
if doc_task.operation == "delete":
|
|
await qdrant_client.delete(
|
|
collection_name=settings.get_collection_name(),
|
|
points_selector=Filter(
|
|
must=[
|
|
FieldCondition(
|
|
key="user_id",
|
|
match=MatchValue(value=doc_task.user_id),
|
|
),
|
|
FieldCondition(
|
|
key="doc_id",
|
|
match=MatchValue(value=doc_task.doc_id),
|
|
),
|
|
FieldCondition(
|
|
key="doc_type",
|
|
match=MatchValue(value=doc_task.doc_type),
|
|
),
|
|
]
|
|
),
|
|
)
|
|
logger.info(
|
|
f"Deleted {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id}"
|
|
)
|
|
|
|
# Record successful deletion metrics
|
|
duration = time.time() - start_time
|
|
record_qdrant_operation("delete", "success")
|
|
record_vector_sync_processing(duration, "success")
|
|
return
|
|
|
|
# Handle indexing with retry
|
|
max_retries = 3
|
|
retry_delay = 1.0
|
|
|
|
for attempt in range(max_retries):
|
|
try:
|
|
await _index_document(doc_task, nc_client, qdrant_client)
|
|
|
|
# Record successful processing metrics
|
|
duration = time.time() - start_time
|
|
record_qdrant_operation("upsert", "success")
|
|
record_vector_sync_processing(duration, "success")
|
|
return # Success
|
|
|
|
except (HTTPStatusError, Exception) as e:
|
|
if attempt < max_retries - 1:
|
|
logger.warning(
|
|
f"Retry {attempt + 1}/{max_retries} for "
|
|
f"{doc_task.doc_type}_{doc_task.doc_id}: {e}"
|
|
)
|
|
await anyio.sleep(retry_delay)
|
|
retry_delay *= 2 # Exponential backoff
|
|
else:
|
|
logger.error(
|
|
f"Failed to index {doc_task.doc_type}_{doc_task.doc_id} "
|
|
f"after {max_retries} retries: {e}"
|
|
)
|
|
# Record failed processing metrics
|
|
duration = time.time() - start_time
|
|
record_qdrant_operation("upsert", "error")
|
|
record_vector_sync_processing(duration, "error")
|
|
raise
|
|
|
|
except Exception:
|
|
# Catch any other unexpected errors
|
|
duration = time.time() - start_time
|
|
record_vector_sync_processing(duration, "error")
|
|
raise
|
|
|
|
|
|
async def _index_document(
|
|
doc_task: DocumentTask, nc_client: NextcloudClient, qdrant_client
|
|
):
|
|
"""
|
|
Index a single document (called by process_document with retry).
|
|
|
|
Args:
|
|
doc_task: Document task to index
|
|
nc_client: Authenticated Nextcloud client
|
|
qdrant_client: Qdrant client instance
|
|
"""
|
|
settings = get_settings()
|
|
|
|
# Fetch document content
|
|
if doc_task.doc_type == "note":
|
|
document = await nc_client.notes.get_note(int(doc_task.doc_id))
|
|
content = f"{document['title']}\n\n{document['content']}"
|
|
title = document["title"]
|
|
etag = document.get("etag", "")
|
|
file_metadata = {} # No file-specific metadata for notes
|
|
file_path = None # Notes don't have file paths
|
|
elif doc_task.doc_type == "file":
|
|
# For files, doc_id is now the numeric file ID, file_path comes from DocumentTask
|
|
if not doc_task.file_path:
|
|
raise ValueError(
|
|
f"File path required for file indexing but not provided (file_id={doc_task.doc_id})"
|
|
)
|
|
file_path = doc_task.file_path
|
|
|
|
# Read file content via WebDAV
|
|
content_bytes, content_type = await nc_client.webdav.read_file(file_path)
|
|
|
|
# Use document processor registry to extract text
|
|
from nextcloud_mcp_server.document_processors import get_registry
|
|
|
|
registry = get_registry()
|
|
|
|
try:
|
|
result = await registry.process(
|
|
content=content_bytes,
|
|
content_type=content_type,
|
|
filename=file_path,
|
|
)
|
|
content = result.text
|
|
file_metadata = result.metadata
|
|
title = file_metadata.get("title") or file_path.split("/")[-1]
|
|
etag = "" # WebDAV read_file doesn't return etag
|
|
|
|
# Diagnostic: Log page boundary information if available
|
|
if "page_boundaries" in file_metadata:
|
|
page_boundaries = file_metadata["page_boundaries"]
|
|
logger.info(
|
|
f"Page boundaries for {file_path}: "
|
|
f"{len(page_boundaries)} pages, text length: {len(content)}"
|
|
)
|
|
# Log first 3 page boundaries for debugging
|
|
for boundary in page_boundaries[:3]:
|
|
logger.debug(
|
|
f" Page {boundary['page']}: "
|
|
f"offsets [{boundary['start_offset']}:{boundary['end_offset']}]"
|
|
)
|
|
# Verify last boundary matches text length
|
|
if page_boundaries:
|
|
last_boundary = page_boundaries[-1]
|
|
if last_boundary["end_offset"] != len(content):
|
|
logger.warning(
|
|
f"Text length mismatch: content={len(content)}, "
|
|
f"last_boundary_end={last_boundary['end_offset']}"
|
|
)
|
|
else:
|
|
logger.debug(f"No page_boundaries in metadata for {file_path}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to process file {file_path}: {e}")
|
|
raise
|
|
else:
|
|
raise ValueError(f"Unsupported doc_type: {doc_task.doc_type}")
|
|
|
|
# Tokenize and chunk (using configured chunk size and overlap)
|
|
chunker = DocumentChunker(
|
|
chunk_size=settings.document_chunk_size,
|
|
overlap=settings.document_chunk_overlap,
|
|
)
|
|
chunks = await chunker.chunk_text(content)
|
|
|
|
# Assign page numbers to chunks if page boundaries are available (PDFs)
|
|
if doc_task.doc_type == "file" and "page_boundaries" in file_metadata:
|
|
assign_page_numbers(chunks, file_metadata["page_boundaries"])
|
|
|
|
# Diagnostic: Verify page number assignment
|
|
assigned_count = sum(1 for c in chunks if c.page_number is not None)
|
|
logger.info(
|
|
f"Assigned page numbers to {assigned_count}/{len(chunks)} chunks "
|
|
f"for {file_path}"
|
|
)
|
|
|
|
# Log first 3 chunks to see their page assignments
|
|
for i, chunk in enumerate(chunks[:3]):
|
|
logger.debug(
|
|
f" Chunk {i}: page={chunk.page_number}, "
|
|
f"offsets=[{chunk.start_offset}:{chunk.end_offset}]"
|
|
)
|
|
|
|
# Warning if NO page numbers were assigned
|
|
if assigned_count == 0:
|
|
logger.warning(
|
|
f"NO page numbers assigned! "
|
|
f"Text length: {len(content)}, "
|
|
f"Chunks: {len(chunks)}, "
|
|
f"Chunk offset range: [{chunks[0].start_offset}:{chunks[-1].end_offset}], "
|
|
f"Page boundaries: {len(file_metadata['page_boundaries'])} pages, "
|
|
f"First boundary: {file_metadata['page_boundaries'][0] if file_metadata['page_boundaries'] else 'None'}"
|
|
)
|
|
|
|
# Extract chunk texts for embedding
|
|
chunk_texts = [chunk.text for chunk in chunks]
|
|
|
|
# Generate dense embeddings (I/O bound - external API call)
|
|
embedding_service = get_embedding_service()
|
|
dense_embeddings = await embedding_service.embed_batch(chunk_texts)
|
|
|
|
# Generate sparse embeddings (BM25 for keyword matching)
|
|
bm25_service = get_bm25_service()
|
|
sparse_embeddings = await bm25_service.encode_batch(chunk_texts)
|
|
|
|
# Prepare Qdrant points
|
|
indexed_at = int(time.time())
|
|
points = []
|
|
|
|
for i, (chunk, dense_emb, sparse_emb) in enumerate(
|
|
zip(chunks, dense_embeddings, sparse_embeddings)
|
|
):
|
|
# Generate deterministic UUID for point ID
|
|
# Using uuid5 with DNS namespace and combining doc info
|
|
point_name = f"{doc_task.doc_type}:{doc_task.doc_id}:chunk:{i}"
|
|
point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, point_name))
|
|
|
|
points.append(
|
|
PointStruct(
|
|
id=point_id,
|
|
vector={
|
|
"dense": dense_emb,
|
|
"sparse": sparse_emb,
|
|
},
|
|
payload={
|
|
"user_id": doc_task.user_id,
|
|
"doc_id": doc_task.doc_id,
|
|
"doc_type": doc_task.doc_type,
|
|
"is_placeholder": False, # Real indexed document (not placeholder)
|
|
"title": title,
|
|
"excerpt": chunk.text[:200],
|
|
"indexed_at": indexed_at,
|
|
"modified_at": doc_task.modified_at,
|
|
"etag": etag,
|
|
"chunk_index": i,
|
|
"total_chunks": len(chunks),
|
|
"chunk_start_offset": chunk.start_offset,
|
|
"chunk_end_offset": chunk.end_offset,
|
|
"metadata_version": 2, # v2 includes position metadata
|
|
# File-specific metadata (PDF, etc.)
|
|
**(
|
|
{
|
|
"file_path": file_path, # Store file path for retrieval
|
|
"mime_type": content_type, # From WebDAV response
|
|
"file_size": file_metadata.get("file_size"),
|
|
"page_number": chunk.page_number,
|
|
"page_count": file_metadata.get("page_count"),
|
|
"author": file_metadata.get("author"),
|
|
"creation_date": file_metadata.get("creation_date"),
|
|
"has_images": file_metadata.get("has_images", False),
|
|
"image_count": file_metadata.get("image_count", 0),
|
|
}
|
|
if doc_task.doc_type == "file"
|
|
else {}
|
|
),
|
|
},
|
|
)
|
|
)
|
|
|
|
# Delete placeholder before writing real vectors
|
|
# This prevents duplicates and cleans up the placeholder state
|
|
try:
|
|
await delete_placeholder_point(
|
|
doc_id=doc_task.doc_id,
|
|
doc_type=doc_task.doc_type,
|
|
user_id=doc_task.user_id,
|
|
)
|
|
except Exception as e:
|
|
# Log but don't fail indexing if placeholder deletion fails
|
|
logger.warning(
|
|
f"Failed to delete placeholder for {doc_task.doc_type}_{doc_task.doc_id}: {e}"
|
|
)
|
|
|
|
# Upsert to Qdrant
|
|
await qdrant_client.upsert(
|
|
collection_name=settings.get_collection_name(),
|
|
points=points,
|
|
wait=True,
|
|
)
|
|
|
|
logger.info(
|
|
f"Indexed {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id} "
|
|
f"({len(chunks)} chunks)"
|
|
)
|