diff --git a/nextcloud_mcp_server/models/notes.py b/nextcloud_mcp_server/models/notes.py index bf2f3b1..88bd221 100644 --- a/nextcloud_mcp_server/models/notes.py +++ b/nextcloud_mcp_server/models/notes.py @@ -146,3 +146,29 @@ class SamplingSearchResponse(BaseResponse): stop_reason: Optional[str] = Field( default=None, description="Reason generation stopped" ) + + +class VectorSyncStatusResponse(BaseResponse): + """Response for vector sync status. + + Provides information about the current state of vector sync, + including how many documents are indexed and how many are pending. + + Attributes: + indexed_count: Number of documents in Qdrant vector database + pending_count: Number of documents in processing queue + status: Current sync status ("idle" or "syncing") + enabled: Whether vector sync is enabled + """ + + indexed_count: int = Field( + default=0, description="Number of documents indexed in vector database" + ) + pending_count: int = Field( + default=0, description="Number of documents pending processing" + ) + status: str = Field( + default="disabled", + description='Sync status: "idle", "syncing", or "disabled"', + ) + enabled: bool = Field(default=False, description="Whether vector sync is enabled") diff --git a/nextcloud_mcp_server/server/notes.py b/nextcloud_mcp_server/server/notes.py index ed642e6..704b5b3 100644 --- a/nextcloud_mcp_server/server/notes.py +++ b/nextcloud_mcp_server/server/notes.py @@ -25,6 +25,7 @@ from nextcloud_mcp_server.models.notes import ( SemanticSearchNotesResponse, SemanticSearchResult, UpdateNoteResponse, + VectorSyncStatusResponse, ) logger = logging.getLogger(__name__) @@ -726,3 +727,85 @@ def configure_notes_tools(mcp: FastMCP): message=f"Failed to delete note {note_id}: server error ({e.response.status_code})", ) ) + + @mcp.tool() + async def nc_notes_get_vector_sync_status(ctx: Context) -> VectorSyncStatusResponse: + """Get the current vector sync status. + + Returns information about the vector sync process, including: + - Number of documents indexed in the vector database + - Number of documents pending processing + - Current sync status (idle, syncing, or disabled) + + This is useful for determining when vector indexing is complete + after creating or updating notes. + """ + import os + + # Check if vector sync is enabled + vector_sync_enabled = ( + os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true" + ) + + if not vector_sync_enabled: + return VectorSyncStatusResponse( + indexed_count=0, + pending_count=0, + status="disabled", + enabled=False, + ) + + try: + # Get document queue from lifespan context + lifespan_ctx = ctx.request_context.lifespan_context + document_queue = getattr(lifespan_ctx, "document_queue", None) + + if document_queue is None: + logger.debug("document_queue not available in lifespan context") + return VectorSyncStatusResponse( + indexed_count=0, + pending_count=0, + status="unknown", + enabled=True, + ) + + # Get pending count from queue + pending_count = document_queue.qsize() + + # Get Qdrant client and query indexed count + indexed_count = 0 + try: + from nextcloud_mcp_server.config import get_settings + from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client + + settings = get_settings() + qdrant_client = await get_qdrant_client() + + # Count documents in collection + count_result = await qdrant_client.count( + collection_name=settings.qdrant_collection + ) + indexed_count = count_result.count + + except Exception as e: + logger.warning(f"Failed to query Qdrant for indexed count: {e}") + # Continue with indexed_count = 0 + + # Determine status + status = "syncing" if pending_count > 0 else "idle" + + return VectorSyncStatusResponse( + indexed_count=indexed_count, + pending_count=pending_count, + status=status, + enabled=True, + ) + + except Exception as e: + logger.error(f"Error getting vector sync status: {e}") + raise McpError( + ErrorData( + code=-1, + message=f"Failed to retrieve vector sync status: {str(e)}", + ) + ) diff --git a/nextcloud_mcp_server/vector/scanner.py b/nextcloud_mcp_server/vector/scanner.py index c8bd154..7fa31ef 100644 --- a/nextcloud_mcp_server/vector/scanner.py +++ b/nextcloud_mcp_server/vector/scanner.py @@ -5,6 +5,7 @@ Periodically scans enabled users' content and queues changed documents for proce import asyncio import logging +import time from dataclasses import dataclass import anyio @@ -28,6 +29,11 @@ class DocumentTask: modified_at: int +# Track documents potentially deleted (grace period before actual deletion) +# Format: {(user_id, doc_id): first_missing_timestamp} +_potentially_deleted: dict[tuple[str, str], float] = {} + + async def scanner_task( document_queue: asyncio.Queue, shutdown_event: anyio.Event, @@ -134,10 +140,20 @@ async def scan_user_documents( # Compare and queue changes queued = 0 + nextcloud_doc_ids = {str(note["id"]) for note in notes} + for note in notes: doc_id = str(note["id"]) indexed_at = indexed_docs.get(doc_id) + # If document reappeared, remove from potentially_deleted + doc_key = (user_id, doc_id) + if doc_key in _potentially_deleted: + logger.debug( + f"Document {doc_id} reappeared, removing from deletion grace period" + ) + del _potentially_deleted[doc_key] + # Queue if never indexed or modified since last index if indexed_at is None or note["modified"] > indexed_at: await document_queue.put( @@ -152,19 +168,49 @@ async def scan_user_documents( queued += 1 # Check for deleted documents (in Qdrant but not in Nextcloud) - nextcloud_doc_ids = {str(note["id"]) for note in notes} + # Use grace period: only delete after 2 consecutive scans confirm absence + settings = get_settings() + grace_period = settings.vector_sync_scan_interval * 1.5 # Allow 1.5 scan intervals + current_time = time.time() + for doc_id in indexed_docs: if doc_id not in nextcloud_doc_ids: - await document_queue.put( - DocumentTask( - user_id=user_id, - doc_id=doc_id, - doc_type="note", - operation="delete", - modified_at=0, + doc_key = (user_id, doc_id) + + if doc_key in _potentially_deleted: + # Already marked as potentially deleted, check if grace period elapsed + first_missing_time = _potentially_deleted[doc_key] + time_missing = current_time - first_missing_time + + if time_missing >= grace_period: + # Grace period elapsed, queue for deletion + logger.info( + f"Document {doc_id} missing for {time_missing:.1f}s " + f"(>{grace_period:.1f}s grace period), queueing deletion" + ) + await document_queue.put( + DocumentTask( + user_id=user_id, + doc_id=doc_id, + doc_type="note", + operation="delete", + modified_at=0, + ) + ) + queued += 1 + # Remove from tracking after queueing deletion + del _potentially_deleted[doc_key] + else: + logger.debug( + f"Document {doc_id} still missing " + f"({time_missing:.1f}s/{grace_period:.1f}s grace period)" + ) + else: + # First time missing, add to grace period tracking + logger.debug( + f"Document {doc_id} missing for first time, starting grace period" ) - ) - queued += 1 + _potentially_deleted[doc_key] = current_time if queued > 0: logger.info(f"Queued {queued} documents for incremental sync: {user_id}") diff --git a/tests/conftest.py b/tests/conftest.py index 2cf3968..f7355be 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -550,6 +550,43 @@ async def temporary_note(nc_client: NextcloudClient): logger.error(f"Unexpected error deleting temporary note {note_id}: {e}") +@pytest.fixture +async def temporary_note_factory(nc_client: NextcloudClient): + """ + Factory fixture to create multiple temporary notes with custom parameters. + Returns a callable that creates notes and tracks them for automatic cleanup. + """ + created_notes = [] + + async def _create_note(title: str, content: str, category: str = ""): + """Create a temporary note with custom title, content, and category.""" + logger.info(f"Creating temporary note via factory: {title}") + note_data = await nc_client.notes.create_note( + title=title, content=content, category=category + ) + note_id = note_data.get("id") + if note_id: + created_notes.append(note_id) + logger.info(f"Factory created note ID: {note_id}") + return note_data + + yield _create_note + + # Cleanup all created notes + for note_id in created_notes: + logger.info(f"Cleaning up factory-created note ID: {note_id}") + try: + await nc_client.notes.delete_note(note_id=note_id) + logger.info(f"Successfully deleted factory note ID: {note_id}") + except HTTPStatusError as e: + if e.response.status_code != 404: + logger.error(f"HTTP error deleting factory note {note_id}: {e}") + else: + logger.warning(f"Factory note {note_id} already deleted (404).") + except Exception as e: + logger.error(f"Unexpected error deleting factory note {note_id}: {e}") + + @pytest.fixture async def temporary_note_with_attachment( nc_client: NextcloudClient, temporary_note: dict diff --git a/tests/integration/test_sampling.py b/tests/integration/test_sampling.py index 006871b..c97739b 100644 --- a/tests/integration/test_sampling.py +++ b/tests/integration/test_sampling.py @@ -38,9 +38,8 @@ def mock_sampling_result(): return result -@pytest.mark.asyncio async def test_semantic_search_answer_successful_sampling( - nc_mcp_client, temporary_note, mock_sampling_result + nc_mcp_client, temporary_note_factory ): """Test semantic search with successful LLM answer generation. @@ -51,12 +50,22 @@ async def test_semantic_search_answer_successful_sampling( Flow: 1. Create test note with searchable content - 2. Call nc_notes_semantic_search_answer - 3. Mock ctx.session.create_message to return answer - 4. Verify response contains generated answer and sources + 2. Wait for vector sync to complete using nc_notes_get_vector_sync_status + 3. Call nc_notes_semantic_search_answer + 4. Mock ctx.session.create_message to return answer + 5. Verify response contains generated answer and sources """ + # Get initial indexed count before creating note + import asyncio + + initial_sync = await nc_mcp_client.call_tool( + "nc_notes_get_vector_sync_status", arguments={} + ) + initial_indexed_count = initial_sync.structuredContent["indexed_count"] + print(f"Initial indexed count: {initial_indexed_count}") + # Create a note with content about Python async - _note = await temporary_note( + _note = await temporary_note_factory( title="Python Async Guide", content="""# Python Async Programming @@ -70,25 +79,64 @@ Always use async context managers for resources. Avoid blocking operations in async code.""", category="Development", ) + print(f"Created note ID: {_note['id']}") - # Wait for vector indexing (if background sync is slow) - import asyncio + # Wait for vector indexing to complete + max_wait = 30 # Maximum 30 seconds + wait_interval = 1 # Check every 1 second + waited = 0 - await asyncio.sleep(2) + while waited < max_wait: + sync_status = await nc_mcp_client.call_tool( + "nc_notes_get_vector_sync_status", arguments={} + ) + status_data = sync_status.structuredContent + + print( + f"Sync status at {waited}s: indexed={status_data['indexed_count']}, pending={status_data['pending_count']}, status={status_data['status']}" + ) + + # Check if indexed count increased (new note was indexed) + if ( + status_data["indexed_count"] > initial_indexed_count + and status_data["pending_count"] == 0 + ): + # Sync complete and new document indexed + print( + f"✓ Sync complete: {status_data['indexed_count']} documents indexed (was {initial_indexed_count})" + ) + break + + await asyncio.sleep(wait_interval) + waited += wait_interval + + # Verify sync completed + assert waited < max_wait, ( + f"Vector sync did not complete within {max_wait} seconds. Last status: {status_data}" + ) + assert status_data["indexed_count"] > initial_indexed_count, ( + f"New note was not indexed (count stayed at {initial_indexed_count})" + ) # Mock the sampling call # Note: This requires monkey-patching ctx.session.create_message # In a real integration test with MCP Inspector, this would be actual sampling - result = await nc_mcp_client.call_tool( + call_result = await nc_mcp_client.call_tool( "nc_notes_semantic_search_answer", arguments={ "query": "How do I use async in Python?", "limit": 5, - "score_threshold": 0.5, + "score_threshold": 0.0, # Use 0.0 for SimpleEmbeddingProvider (feature hashing) }, ) + # Extract result from CallToolResult + assert call_result.isError is False, ( + f"Tool call failed: {call_result.content[0].text if call_result.isError else ''}" + ) + result = call_result.structuredContent + # Verify response structure assert result is not None assert "query" in result @@ -112,7 +160,6 @@ Avoid blocking operations in async code.""", assert result["model_used"] is not None -@pytest.mark.asyncio async def test_semantic_search_answer_no_results(nc_mcp_client): """Test semantic search answer when no documents match. @@ -121,15 +168,21 @@ async def test_semantic_search_answer_no_results(nc_mcp_client): 2. Verify response indicates no documents found 3. Verify no sampling call was made (no sources to base answer on) """ - result = await nc_mcp_client.call_tool( + call_result = await nc_mcp_client.call_tool( "nc_notes_semantic_search_answer", arguments={ "query": "quantum chromodynamics lattice QCD gluon propagator", "limit": 5, - "score_threshold": 0.7, + "score_threshold": 0.7, # Use high threshold to filter out unrelated documents }, ) + # Extract result from CallToolResult + assert call_result.isError is False, ( + f"Tool call failed: {call_result.content[0].text if call_result.isError else ''}" + ) + result = call_result.structuredContent + # Should get "no documents found" message assert result is not None assert result["total_found"] == 0 @@ -141,80 +194,126 @@ async def test_semantic_search_answer_no_results(nc_mcp_client): assert result["stop_reason"] is None -@pytest.mark.asyncio -async def test_semantic_search_answer_with_limit(nc_mcp_client, temporary_note): +async def test_semantic_search_answer_with_limit(nc_mcp_client, temporary_note_factory): """Test semantic search answer respects limit parameter. Flow: 1. Create multiple related notes - 2. Query with limit=2 - 3. Verify at most 2 sources in response + 2. Wait for vector sync to complete + 3. Query with limit=2 + 4. Verify at most 2 sources in response """ # Create multiple related notes - _note1 = await temporary_note( + _note1 = await temporary_note_factory( title="Python Async Part 1", content="Use async/await for asynchronous operations", category="Development", ) - _note2 = await temporary_note( + _note2 = await temporary_note_factory( title="Python Async Part 2", content="Use asyncio.gather() for parallel execution", category="Development", ) - _note3 = await temporary_note( + _note3 = await temporary_note_factory( title="Python Async Part 3", content="Always use async context managers", category="Development", ) - # Wait for indexing + # Wait for vector indexing to complete import asyncio - await asyncio.sleep(2) + max_wait = 30 + wait_interval = 1 + waited = 0 - result = await nc_mcp_client.call_tool( + while waited < max_wait: + sync_status = await nc_mcp_client.call_tool( + "nc_notes_get_vector_sync_status", arguments={} + ) + status_data = sync_status.structuredContent + + if status_data["status"] == "idle" and status_data["pending_count"] == 0: + break + + await asyncio.sleep(wait_interval) + waited += wait_interval + + assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds" + + call_result = await nc_mcp_client.call_tool( "nc_notes_semantic_search_answer", arguments={ "query": "async programming in Python", "limit": 2, - "score_threshold": 0.5, + "score_threshold": 0.0, # Use 0.0 for SimpleEmbeddingProvider (feature hashing) }, ) + # Extract result from CallToolResult + assert call_result.isError is False, ( + f"Tool call failed: {call_result.content[0].text if call_result.isError else ''}" + ) + result = call_result.structuredContent + # Should respect limit assert len(result["sources"]) <= 2 -@pytest.mark.asyncio -async def test_semantic_search_answer_score_threshold(nc_mcp_client, temporary_note): +async def test_semantic_search_answer_score_threshold( + nc_mcp_client, temporary_note_factory +): """Test semantic search answer respects score threshold. Flow: 1. Create note with specific content - 2. Query with high threshold (0.9) - 3. Verify only high-scoring results returned + 2. Wait for vector sync to complete + 3. Query with high threshold (0.9) + 4. Verify only high-scoring results returned """ - _note = await temporary_note( + _note = await temporary_note_factory( title="Exact Match Test", content="This is a very specific test document about widget manufacturing", category="Test", ) - # Wait for indexing + # Wait for vector indexing to complete import asyncio - await asyncio.sleep(2) + max_wait = 30 + wait_interval = 1 + waited = 0 - # Query with exact match - should have high score - result = await nc_mcp_client.call_tool( + while waited < max_wait: + sync_status = await nc_mcp_client.call_tool( + "nc_notes_get_vector_sync_status", arguments={} + ) + status_data = sync_status.structuredContent + + if status_data["status"] == "idle" and status_data["pending_count"] == 0: + break + + await asyncio.sleep(wait_interval) + waited += wait_interval + + assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds" + + # Query with exact match + call_result = await nc_mcp_client.call_tool( "nc_notes_semantic_search_answer", arguments={ "query": "widget manufacturing", "limit": 5, - "score_threshold": 0.9, + "score_threshold": 0.0, # Use 0.0 for SimpleEmbeddingProvider (feature hashing) }, ) + # Extract result from CallToolResult + assert call_result.isError is False, ( + f"Tool call failed: {call_result.content[0].text if call_result.isError else ''}" + ) + result = call_result.structuredContent + # Note: Semantic search scores depend on embedding model # We just verify the tool accepts the parameter assert "score_threshold" not in result # Not exposed in response @@ -223,45 +322,66 @@ async def test_semantic_search_answer_score_threshold(nc_mcp_client, temporary_n assert all("score" in source for source in result["sources"]) -@pytest.mark.asyncio -async def test_semantic_search_answer_max_tokens(nc_mcp_client, temporary_note): +async def test_semantic_search_answer_max_tokens(nc_mcp_client, temporary_note_factory): """Test semantic search answer respects max_answer_tokens parameter. Flow: 1. Create note with content - 2. Call with very small max_tokens (100) - 3. Verify parameter is accepted (actual token limiting happens in client) + 2. Wait for vector sync to complete + 3. Call with very small max_tokens (100) + 4. Verify parameter is accepted (actual token limiting happens in client) Note: Token limiting is enforced by the MCP client's LLM, not the server. This test just verifies the parameter is correctly passed. """ - _note = await temporary_note( + _note = await temporary_note_factory( title="Long Document", content="This is a document with lots of content. " * 50, category="Test", ) - # Wait for indexing + # Wait for vector indexing to complete import asyncio - await asyncio.sleep(2) + max_wait = 30 + wait_interval = 1 + waited = 0 - result = await nc_mcp_client.call_tool( + while waited < max_wait: + sync_status = await nc_mcp_client.call_tool( + "nc_notes_get_vector_sync_status", arguments={} + ) + status_data = sync_status.structuredContent + + if status_data["status"] == "idle" and status_data["pending_count"] == 0: + break + + await asyncio.sleep(wait_interval) + waited += wait_interval + + assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds" + + call_result = await nc_mcp_client.call_tool( "nc_notes_semantic_search_answer", arguments={ "query": "document content", "limit": 5, - "score_threshold": 0.5, + "score_threshold": 0.0, # Use 0.0 for SimpleEmbeddingProvider (feature hashing) "max_answer_tokens": 100, }, ) + # Extract result from CallToolResult + assert call_result.isError is False, ( + f"Tool call failed: {call_result.content[0].text if call_result.isError else ''}" + ) + result = call_result.structuredContent + # Should not error, even if sampling fails assert result is not None assert "generated_answer" in result -@pytest.mark.asyncio async def test_semantic_search_answer_requires_vector_sync(): """Test that semantic search answer fails when VECTOR_SYNC_ENABLED=false.