diff --git a/Dockerfile b/Dockerfile
index 1074519..9c78966 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -7,6 +7,7 @@ COPY --from=ghcr.io/astral-sh/uv:0.9.10@sha256:29bd45092ea8902c0bbb7f0a338f0494a
 # 2. sqlite for development with token db
 RUN apt update && apt install --no-install-recommends --no-install-suggests -y \
     git \
+    tesseract-ocr \
     sqlite3 && apt clean
 
 WORKDIR /app
@@ -17,5 +18,6 @@ RUN uv sync --locked --no-dev --no-editable --no-cache
 
 ENV PYTHONUNBUFFERED=1
 ENV VIRTUAL_ENV=/app/.venv
+ENV TESSDATA_PREFIX=/usr/share/tesseract-ocr/5/tessdata
 
 ENTRYPOINT ["/app/.venv/bin/nextcloud-mcp-server", "--host", "0.0.0.0"]
diff --git a/nextcloud_mcp_server/auth/viz_routes.py b/nextcloud_mcp_server/auth/viz_routes.py
index 20ae445..1567aff 100644
--- a/nextcloud_mcp_server/auth/viz_routes.py
+++ b/nextcloud_mcp_server/auth/viz_routes.py
@@ -27,6 +27,7 @@ from nextcloud_mcp_server.search import (
     SemanticSearchAlgorithm,
 )
 from nextcloud_mcp_server.vector.pca import PCA
+from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
 from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
 
 logger = logging.getLogger(__name__)
@@ -233,6 +234,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
 
                 # Build filter for this specific chunk
                 must_conditions = [
+                    get_placeholder_filter(),  # Always exclude placeholders from user-facing queries
                     FieldCondition(
                         key="doc_id",
                         match=MatchValue(value=result.id),
diff --git a/nextcloud_mcp_server/search/algorithms.py b/nextcloud_mcp_server/search/algorithms.py
index 932a92b..c98ca33 100644
--- a/nextcloud_mcp_server/search/algorithms.py
+++ b/nextcloud_mcp_server/search/algorithms.py
@@ -83,6 +83,7 @@ async def get_indexed_doc_types(user_id: str) -> set[str]:
     from qdrant_client.models import FieldCondition, Filter, MatchValue
 
     from nextcloud_mcp_server.config import get_settings
+    from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
     from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
 
     logger = logging.getLogger(__name__)
@@ -97,7 +98,10 @@ async def get_indexed_doc_types(user_id: str) -> set[str]:
         scroll_results, _next_offset = await qdrant_client.scroll(
             collection_name=collection,
             scroll_filter=Filter(
-                must=[FieldCondition(key="user_id", match=MatchValue(value=user_id))]
+                must=[
+                    get_placeholder_filter(),  # Exclude placeholders from doc_type discovery
+                    FieldCondition(key="user_id", match=MatchValue(value=user_id)),
+                ]
             ),
             limit=1000,  # Sample size to discover types
             with_payload=["doc_type"],
diff --git a/nextcloud_mcp_server/search/bm25_hybrid.py b/nextcloud_mcp_server/search/bm25_hybrid.py
index 1c4336b..851d9e4 100644
--- a/nextcloud_mcp_server/search/bm25_hybrid.py
+++ b/nextcloud_mcp_server/search/bm25_hybrid.py
@@ -10,6 +10,7 @@ from nextcloud_mcp_server.config import get_settings
 from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
 from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
 from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
+from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
 from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
 
 logger = logging.getLogger(__name__)
@@ -112,10 +113,11 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
 
         # Build Qdrant filter
         filter_conditions = [
+            get_placeholder_filter(),  # Always exclude placeholders from user-facing queries
             FieldCondition(
                 key="user_id",
                 match=MatchValue(value=user_id),
-            )
+            ),
         ]
 
         # Add doc_type filter if specified
diff --git a/nextcloud_mcp_server/search/semantic.py b/nextcloud_mcp_server/search/semantic.py
index e1c52cf..f3c0ca7 100644
--- a/nextcloud_mcp_server/search/semantic.py
+++ b/nextcloud_mcp_server/search/semantic.py
@@ -9,6 +9,7 @@ from nextcloud_mcp_server.config import get_settings
 from nextcloud_mcp_server.embedding import get_embedding_service
 from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
 from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
+from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
 from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
 
 logger = logging.getLogger(__name__)
@@ -83,10 +84,11 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
 
         # Build Qdrant filter
         filter_conditions = [
+            get_placeholder_filter(),  # Always exclude placeholders from user-facing queries
             FieldCondition(
                 key="user_id",
                 match=MatchValue(value=user_id),
-            )
+            ),
         ]
 
         # Add doc_type filter if specified
diff --git a/nextcloud_mcp_server/vector/placeholder.py b/nextcloud_mcp_server/vector/placeholder.py
new file mode 100644
index 0000000..eac10c1
--- /dev/null
+++ b/nextcloud_mcp_server/vector/placeholder.py
@@ -0,0 +1,305 @@
+"""Placeholder point management for Qdrant state tracking.
+
+Placeholders are zero-vector points stored in Qdrant to track document processing
+state. They prevent duplicate work by marking documents as "in-flight" during the
+gap between scanner queuing and processor completion.
+
+Architecture:
+- Scanner writes placeholders when queuing documents for processing
+- Processor deletes placeholders and writes real vectors after processing
+- All user-facing queries exclude placeholders (must_not is_placeholder: True)
+
+Placeholders contain:
+- Zero vectors (dimension from embedding service)
+- is_placeholder: True flag (for filtering)
+- status: "pending", "processing", "completed", "failed"
+- modified_at, etag from source document
+- queued_at timestamp
+"""
+
+import logging
+import time
+import uuid
+
+from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
+
+from nextcloud_mcp_server.config import get_settings
+from nextcloud_mcp_server.embedding import get_embedding_service
+from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
+
+logger = logging.getLogger(__name__)
+
+
+def _generate_placeholder_id(doc_type: str, doc_id: str | int) -> str:
+    """Generate deterministic UUID for placeholder point.
+
+    Args:
+        doc_type: Document type (note, file, etc.)
+        doc_id: Document ID
+
+    Returns:
+        UUID string for point ID
+    """
+    point_name = f"{doc_type}:{doc_id}:placeholder"
+    return str(uuid.uuid5(uuid.NAMESPACE_DNS, point_name))
+
+
+async def write_placeholder_point(
+    doc_id: str | int,
+    doc_type: str,
+    user_id: str,
+    modified_at: int,
+    etag: str = "",
+    file_path: str | None = None,
+) -> None:
+    """Write a placeholder point to Qdrant to mark document as queued.
+
+    This should be called by the scanner BEFORE queuing a document for processing.
+    The placeholder prevents duplicate work if the scanner runs again before
+    processing completes.
+
+    Args:
+        doc_id: Document ID (int for notes/files)
+        doc_type: Document type (note, file, etc.)
+        user_id: User ID who owns the document
+        modified_at: Document modification timestamp
+        etag: Document ETag (if available)
+        file_path: File path (for files only)
+
+    Raises:
+        Exception: If Qdrant write fails
+    """
+    try:
+        qdrant_client = await get_qdrant_client()
+        settings = get_settings()
+        embedding_service = get_embedding_service()
+
+        # Get dimension dynamically (never hardcode)
+        dimension = embedding_service.get_dimension()
+
+        # Create zero vectors
+        zero_dense = [0.0] * dimension
+
+        # Generate deterministic point ID
+        point_id = _generate_placeholder_id(doc_type, doc_id)
+
+        # Build payload
+        payload = {
+            "user_id": user_id,
+            "doc_id": doc_id,
+            "doc_type": doc_type,
+            "is_placeholder": True,
+            "status": "pending",
+            "modified_at": modified_at,
+            "etag": etag,
+            "queued_at": int(time.time()),
+        }
+
+        # Add file_path for files
+        if doc_type == "file" and file_path:
+            payload["file_path"] = file_path
+
+        # Create placeholder point
+        point = PointStruct(
+            id=point_id,
+            vector={
+                # Sparse vector is omitted: None is not a valid named-vector value
+                "dense": zero_dense,
+            },
+            payload=payload,
+        )
+
+        # Upsert to Qdrant
+        await qdrant_client.upsert(
+            collection_name=settings.get_collection_name(),
+            points=[point],
+            wait=True,
+        )
+
+        logger.debug(
+            f"Wrote placeholder for {doc_type}_{doc_id} (user={user_id}, "
+            f"modified_at={modified_at})"
+        )
+
+    except Exception as e:
+        logger.error(
+            f"Failed to write placeholder for {doc_type}_{doc_id}: {e}",
+            exc_info=True,
+        )
+        raise
+
+
+async def query_document_metadata(
+    doc_id: str | int,
+    doc_type: str,
+    user_id: str,
+) -> dict | None:
+    """Query Qdrant for existing document entry (placeholder or real).
+
+    Returns the payload of the first matching point, which could be:
+    - A placeholder (is_placeholder: True)
+    - A real indexed document (is_placeholder: False or missing)
+    - None if document not in Qdrant
+
+    Args:
+        doc_id: Document ID
+        doc_type: Document type
+        user_id: User ID
+
+    Returns:
+        Payload dict if found, None otherwise
+    """
+    try:
+        qdrant_client = await get_qdrant_client()
+        settings = get_settings()
+
+        # Query for any entry matching doc_id, doc_type, user_id
+        scroll_result = await qdrant_client.scroll(
+            collection_name=settings.get_collection_name(),
+            scroll_filter=Filter(
+                must=[
+                    FieldCondition(key="user_id", match=MatchValue(value=user_id)),
+                    FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
+                    FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
+                ]
+            ),
+            limit=1,
+            with_payload=True,
+            with_vectors=False,
+        )
+
+        if scroll_result[0]:
+            point = scroll_result[0][0]
+            return dict(point.payload)
+
+        return None
+
+    except Exception as e:
+        logger.warning(f"Error querying document metadata for {doc_type}_{doc_id}: {e}")
+        return None
+
+
+async def delete_placeholder_point(
+    doc_id: str | int,
+    doc_type: str,
+    user_id: str,
+) -> None:
+    """Delete a placeholder point from Qdrant.
+
+    This should be called by the processor BEFORE writing real vectors.
+    We delete the placeholder to avoid duplicates, then write the real chunks.
+
+    Args:
+        doc_id: Document ID
+        doc_type: Document type
+        user_id: User ID
+
+    Raises:
+        Exception: If Qdrant delete fails
+    """
+    try:
+        qdrant_client = await get_qdrant_client()
+        settings = get_settings()
+
+        # Delete by filter; the is_placeholder condition leaves real chunks untouched
+        await qdrant_client.delete(
+            collection_name=settings.get_collection_name(),
+            points_selector=Filter(
+                must=[
+                    FieldCondition(key="user_id", match=MatchValue(value=user_id)),
+                    FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
+                    FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
+                    FieldCondition(key="is_placeholder", match=MatchValue(value=True)),
+                ]
+            ),
+        )
+
+        logger.debug(f"Deleted placeholder for {doc_type}_{doc_id} (user={user_id})")
+
+    except Exception as e:
+        logger.error(
+            f"Failed to delete placeholder for {doc_type}_{doc_id}: {e}",
+            exc_info=True,
+        )
+        raise
+
+
+async def update_placeholder_status(
+    doc_id: str | int,
+    doc_type: str,
+    user_id: str,
+    status: str,
+) -> None:
+    """Update the status field of a placeholder point.
+
+    Status values:
+    - "pending": Queued for processing
+    - "processing": Currently being processed
+    - "completed": Processing completed successfully
+    - "failed": Processing failed
+
+    Args:
+        doc_id: Document ID
+        doc_type: Document type
+        user_id: User ID
+        status: New status value
+
+    Note:
+        Qdrant errors are logged and swallowed rather than raised.
+    """
+    try:
+        qdrant_client = await get_qdrant_client()
+        settings = get_settings()
+
+        # Update payload using set_payload
+        await qdrant_client.set_payload(
+            collection_name=settings.get_collection_name(),
+            payload={"status": status},
+            points=Filter(
+                must=[
+                    FieldCondition(key="user_id", match=MatchValue(value=user_id)),
+                    FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
+                    FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
+                    FieldCondition(key="is_placeholder", match=MatchValue(value=True)),
+                ]
+            ),
+        )
+
+        logger.debug(
+            f"Updated placeholder status for {doc_type}_{doc_id} to '{status}' "
+            f"(user={user_id})"
+        )
+
+    except Exception as e:
+        logger.warning(
+            f"Failed to update placeholder status for {doc_type}_{doc_id}: {e}"
+        )
+        # Don't raise - status updates are non-critical
+
+
+def get_placeholder_filter() -> Filter:
+    """Get a filter condition to exclude placeholders from queries.
+
+    Add this to all user-facing search/visualization queries to ensure
+    placeholders are never returned to users.
+
+    The condition is expressed as ``must_not is_placeholder == True`` rather
+    than ``must is_placeholder == False`` so that real points whose payload
+    has no ``is_placeholder`` key at all are still returned.
+
+    Returns:
+        Nested Filter (usable as a condition) excluding is_placeholder: True
+
+    Example:
+        Filter(
+            must=[
+                get_placeholder_filter(),  # Exclude placeholders
+                FieldCondition(key="user_id", match=MatchValue(value=user_id)),
+            ]
+        )
+    """
+    return Filter(
+        must_not=[
+            FieldCondition(key="is_placeholder", match=MatchValue(value=True)),
+        ]
+    )
diff --git a/nextcloud_mcp_server/vector/processor.py b/nextcloud_mcp_server/vector/processor.py
index 74aed47..ab2bc55 100644
--- a/nextcloud_mcp_server/vector/processor.py
+++ b/nextcloud_mcp_server/vector/processor.py
@@ -23,6 +23,7 @@ from nextcloud_mcp_server.observability.metrics import (
 )
 from nextcloud_mcp_server.observability.tracing import trace_operation
 from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
+from nextcloud_mcp_server.vector.placeholder import delete_placeholder_point
 from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
 from nextcloud_mcp_server.vector.scanner import DocumentTask
 
@@ -418,6 +419,20 @@ async def _index_document(
             )
         )
 
+    # Delete placeholder before writing real vectors
+    # This prevents duplicates and cleans up the placeholder state
+    try:
+        await delete_placeholder_point(
+            doc_id=doc_task.doc_id,
+            doc_type=doc_task.doc_type,
+            user_id=doc_task.user_id,
+        )
+    except Exception as e:
+        # Log but don't fail indexing if placeholder deletion fails
+        logger.warning(
+            f"Failed to delete placeholder for {doc_task.doc_type}_{doc_task.doc_id}: {e}"
+        )
+
     # Upsert to Qdrant
     await qdrant_client.upsert(
         collection_name=settings.get_collection_name(),
diff --git a/nextcloud_mcp_server/vector/scanner.py b/nextcloud_mcp_server/vector/scanner.py
index a41277e..c33fc23 100644
--- a/nextcloud_mcp_server/vector/scanner.py
+++ b/nextcloud_mcp_server/vector/scanner.py
@@ -17,6 +17,10 @@ from nextcloud_mcp_server.client import NextcloudClient
 from nextcloud_mcp_server.config import get_settings
 from nextcloud_mcp_server.observability.metrics import record_vector_sync_scan
 from nextcloud_mcp_server.observability.tracing import trace_operation
+from nextcloud_mcp_server.vector.placeholder import (
+    query_document_metadata,
+    write_placeholder_point,
+)
 from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
 
 logger = logging.getLogger(__name__)
@@ -184,8 +188,9 @@ async def scan_user_documents(
                 f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer"
             )
 
-        # Get indexed state from Qdrant first (for incremental sync)
-        indexed_docs = {}
+        # For deletion tracking, get all doc_ids in Qdrant (for incremental sync)
+        # Note: We no longer bulk-query indexed_at, instead check per-document
+        indexed_doc_ids = set()
         if not initial_sync:
             qdrant_client = await get_qdrant_client()
             scroll_result = await qdrant_client.scroll(
@@ -196,17 +201,14 @@ async def scan_user_documents(
                         FieldCondition(key="doc_type", match=MatchValue(value="note")),
                     ]
                 ),
-                with_payload=["doc_id", "indexed_at"],
+                with_payload=["doc_id"],
                 with_vectors=False,
                 limit=10000,
             )
 
-            indexed_docs = {
-                point.payload["doc_id"]: point.payload["indexed_at"]
-                for point in scroll_result[0]
-            }
+            indexed_doc_ids = {point.payload["doc_id"] for point in scroll_result[0]}
 
-            logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")
+            logger.debug(f"Found {len(indexed_doc_ids)} indexed documents in Qdrant")
 
         # Stream notes from Nextcloud and process immediately
         note_count = 0
@@ -220,7 +222,14 @@ async def scan_user_documents(
             modified_at = note.get("modified", 0)
 
             if initial_sync:
-                # Send everything on first sync
+                # Send everything on first sync - write placeholder first
+                await write_placeholder_point(
+                    doc_id=doc_id,
+                    doc_type="note",
+                    user_id=user_id,
+                    modified_at=modified_at,
+                    etag=note.get("etag", ""),
+                )
                 await send_stream.send(
                     DocumentTask(
                         user_id=user_id,
@@ -232,9 +241,7 @@ async def scan_user_documents(
                 )
                 queued += 1
             else:
-                # Incremental sync: compare with indexed state
-                indexed_at = indexed_docs.get(doc_id)
-
+                # Incremental sync: check if document exists and compare modified_at
                 # If document reappeared, remove from potentially_deleted
                 doc_key = (user_id, doc_id)
                 if doc_key in _potentially_deleted:
@@ -243,8 +250,36 @@ async def scan_user_documents(
                     )
                     del _potentially_deleted[doc_key]
 
+                # Query Qdrant for existing entry (placeholder or real)
+                existing_metadata = await query_document_metadata(
+                    doc_id=doc_id, doc_type="note", user_id=user_id
+                )
+
                 # Send if never indexed or modified since last index
-                if indexed_at is None or modified_at > indexed_at:
+                # Compare against stored modified_at (not indexed_at!)
+                needs_indexing = False
+                if existing_metadata is None:
+                    # Never seen before
+                    needs_indexing = True
+                elif existing_metadata.get("modified_at", 0) < modified_at:
+                    # Document modified since last indexing
+                    needs_indexing = True
+                elif existing_metadata.get("is_placeholder", False):
+                    # Placeholder exists but processing may have failed - requeue
+                    logger.debug(
+                        f"Found existing placeholder for note {doc_id}, requeuing"
+                    )
+                    needs_indexing = True
+
+                if needs_indexing:
+                    # Write placeholder before queuing
+                    await write_placeholder_point(
+                        doc_id=doc_id,
+                        doc_type="note",
+                        user_id=user_id,
+                        modified_at=modified_at,
+                        etag=note.get("etag", ""),
+                    )
                     await send_stream.send(
                         DocumentTask(
                             user_id=user_id,
@@ -272,7 +307,7 @@ async def scan_user_documents(
             )  # Allow 1.5 scan intervals
             current_time = time.time()
 
-            for doc_id in indexed_docs:
+            for doc_id in indexed_doc_ids:
                 if doc_id not in nextcloud_doc_ids:
                     doc_key = (user_id, doc_id)
 
@@ -312,8 +347,8 @@ async def scan_user_documents(
                         _potentially_deleted[doc_key] = current_time
 
         # Scan tagged PDF files (after notes)
-        # Get indexed files from Qdrant (separate query for doc_type="file")
-        indexed_files = {}
+        # Get indexed file IDs from Qdrant (for deletion tracking)
+        indexed_file_ids = set()
         if not initial_sync:
             file_scroll_result = await qdrant_client.scroll(
                 collection_name=settings.get_collection_name(),
@@ -324,16 +359,15 @@ async def scan_user_documents(
                     ]
                 ),
                 limit=10000,  # Reasonable limit for file count
-                with_payload=["doc_id", "indexed_at"],
+                with_payload=["doc_id"],
                 with_vectors=False,
             )
 
-            indexed_files = {
-                point.payload["doc_id"]: point.payload["indexed_at"]
-                for point in file_scroll_result[0]
+            indexed_file_ids = {
+                point.payload["doc_id"] for point in file_scroll_result[0]
             }
 
-            logger.debug(f"Found {len(indexed_files)} indexed files in Qdrant")
+            logger.debug(f"Found {len(indexed_file_ids)} indexed files in Qdrant")
 
         # Scan for tagged PDF files
         file_count = 0
@@ -370,7 +404,14 @@ async def scan_user_documents(
                         pass
 
                 if initial_sync:
-                    # Send everything on first sync
+                    # Send everything on first sync - write placeholder first
+                    await write_placeholder_point(
+                        doc_id=file_id,
+                        doc_type="file",
+                        user_id=user_id,
+                        modified_at=modified_at,
+                        file_path=file_path,
+                    )
                     await send_stream.send(
                         DocumentTask(
                             user_id=user_id,
@@ -383,9 +424,7 @@ async def scan_user_documents(
                     )
                     file_queued += 1
                 else:
-                    # Incremental sync: compare with indexed state
-                    indexed_at = indexed_files.get(file_id)
-
+                    # Incremental sync: check if file exists and compare modified_at
                     # If file reappeared, remove from potentially_deleted
                     file_key = (user_id, file_id)
                     if file_key in _potentially_deleted:
@@ -394,8 +433,36 @@ async def scan_user_documents(
                         )
                         del _potentially_deleted[file_key]
 
+                    # Query Qdrant for existing entry (placeholder or real)
+                    existing_metadata = await query_document_metadata(
+                        doc_id=file_id, doc_type="file", user_id=user_id
+                    )
+
                     # Send if never indexed or modified since last index
-                    if indexed_at is None or modified_at > indexed_at:
+                    # Compare against stored modified_at (not indexed_at!)
+                    needs_indexing = False
+                    if existing_metadata is None:
+                        # Never seen before
+                        needs_indexing = True
+                    elif existing_metadata.get("modified_at", 0) < modified_at:
+                        # File modified since last indexing
+                        needs_indexing = True
+                    elif existing_metadata.get("is_placeholder", False):
+                        # Placeholder exists but processing may have failed - requeue
+                        logger.debug(
+                            f"Found existing placeholder for file {file_path} (ID: {file_id}), requeuing"
+                        )
+                        needs_indexing = True
+
+                    if needs_indexing:
+                        # Write placeholder before queuing
+                        await write_placeholder_point(
+                            doc_id=file_id,
+                            doc_type="file",
+                            user_id=user_id,
+                            modified_at=modified_at,
+                            file_path=file_path,
+                        )
                         await send_stream.send(
                             DocumentTask(
                                 user_id=user_id,
@@ -415,7 +482,7 @@ async def scan_user_documents(
 
             # Check for deleted files (not initial sync)
             if not initial_sync:
-                for file_id in indexed_files:
+                for file_id in indexed_file_ids:
                     if file_id not in nextcloud_file_ids:
                         file_key = (user_id, file_id)
 