From 4b026e9aa0990fd2576d5d083b4f85b7fb91a0f1 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sun, 9 Nov 2025 05:53:53 +0100 Subject: [PATCH] feat: implement ADR-009 - refactor semantic search to use generic semantic:read scope MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This implements ADR-009, which documents the decision to use a generic `semantic:read` OAuth scope instead of requiring all app-specific scopes for semantic search functionality. Changes: - Created new `nextcloud_mcp_server/models/semantic.py` with semantic search models - SemanticSearchResult (with new doc_type field for multi-app support) - SemanticSearchResponse (renamed from SemanticSearchNotesResponse) - SamplingSearchResponse - VectorSyncStatusResponse - Created new `nextcloud_mcp_server/server/semantic.py` with semantic search tools - nc_semantic_search (renamed from nc_notes_semantic_search) - nc_semantic_search_answer (renamed from nc_notes_semantic_search_answer) - nc_get_vector_sync_status (renamed from nc_notes_get_vector_sync_status) - All tools now use @require_scopes("semantic:read"); the two search tools previously required "notes:read" and the sync status tool previously required only "openid" - Updated `nextcloud_mcp_server/server/notes.py` - Removed semantic search tools (moved to semantic.py) - Removed semantic search model imports - Removed unused MCP imports (ModelHint, ModelPreferences, etc.) 
- Updated `nextcloud_mcp_server/models/notes.py` - Removed semantic search models (moved to semantic.py) - Updated `nextcloud_mcp_server/app.py` - Import configure_semantic_tools - Register semantic tools when VECTOR_SYNC_ENABLED=true - Updated `nextcloud_mcp_server/server/__init__.py` - Export configure_semantic_tools - Updated tests - tests/integration/test_sampling.py: Use new tool names - tests/unit/test_response_models.py: Import from semantic.py, add doc_type field Architecture: - Semantic search is now a cross-app feature, not tied to Notes - Uses dual-phase authorization: semantic:read scope + per-document verification - Supports future multi-app indexing (notes, calendar, deck, files, contacts) Test results: - All 69 unit tests passing - All 5 smoke tests passing 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- nextcloud_mcp_server/app.py | 9 + nextcloud_mcp_server/models/notes.py | 89 ----- nextcloud_mcp_server/models/semantic.py | 109 ++++++ nextcloud_mcp_server/server/__init__.py | 2 + nextcloud_mcp_server/server/notes.py | 410 +--------------------- nextcloud_mcp_server/server/semantic.py | 436 ++++++++++++++++++++++++ tests/integration/test_sampling.py | 26 +- tests/unit/test_response_models.py | 7 +- 8 files changed, 576 insertions(+), 512 deletions(-) create mode 100644 nextcloud_mcp_server/models/semantic.py create mode 100644 nextcloud_mcp_server/server/semantic.py diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 6cc31af..91c7755 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -45,6 +45,7 @@ from nextcloud_mcp_server.server import ( configure_cookbook_tools, configure_deck_tools, configure_notes_tools, + configure_semantic_tools, configure_sharing_tools, configure_tables_tools, configure_webdav_tools, @@ -871,6 +872,14 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): f"Unknown app: {app_name}. 
Available apps: {list(available_apps.keys())}" ) + # Register semantic search tools (cross-app feature) + settings = get_settings() + if settings.vector_sync_enabled: + logger.info("Configuring semantic search tools (vector sync enabled)") + configure_semantic_tools(mcp) + else: + logger.info("Skipping semantic search tools (VECTOR_SYNC_ENABLED not set)") + # Register OAuth provisioning tools (only when offline access is enabled) # With token exchange enabled (external IdP), provisioning is not needed for MCP operations enable_token_exchange = ( diff --git a/nextcloud_mcp_server/models/notes.py b/nextcloud_mcp_server/models/notes.py index 88bd221..9bdc627 100644 --- a/nextcloud_mcp_server/models/notes.py +++ b/nextcloud_mcp_server/models/notes.py @@ -37,18 +37,6 @@ class NoteSearchResult(BaseModel): score: Optional[float] = Field(None, description="Search relevance score") -class SemanticSearchResult(BaseModel): - """Model for semantic search results with additional metadata.""" - - id: int = Field(description="Note ID") - title: str = Field(description="Note title") - category: str = Field(default="", description="Note category") - excerpt: str = Field(description="Excerpt from matching chunk") - score: float = Field(description="Semantic similarity score (0-1)") - chunk_index: int = Field(description="Index of matching chunk in document") - total_chunks: int = Field(description="Total number of chunks in document") - - class NotesSettings(BaseModel): """Model for Notes app settings.""" @@ -95,80 +83,3 @@ class SearchNotesResponse(BaseResponse): results: List[NoteSearchResult] = Field(description="Search results") query: str = Field(description="The search query used") total_found: int = Field(description="Total number of notes found") - - -class SemanticSearchNotesResponse(BaseResponse): - """Response model for semantic search.""" - - results: List[SemanticSearchResult] = Field( - description="Semantic search results with similarity scores" - ) - query: str = 
Field(description="The search query used") - total_found: int = Field(description="Total number of notes found") - search_method: str = Field( - default="semantic", description="Search method used (semantic or hybrid)" - ) - - -class SamplingSearchResponse(BaseResponse): - """Response from semantic search with LLM-generated answer via MCP sampling. - - This response includes both a generated natural language answer (created by - the MCP client's LLM via sampling) and the source documents used to generate - that answer. Users can read the answer for quick information and review - sources for verification and deeper exploration. - - Attributes: - query: The original user query - generated_answer: Natural language answer generated by client's LLM - sources: List of semantic search results used as context - total_found: Total number of matching documents found - search_method: Always "semantic_sampling" for this response type - model_used: Name of model that generated the answer (e.g., "claude-3-5-sonnet") - stop_reason: Why generation stopped ("endTurn", "maxTokens", etc.) - """ - - query: str = Field(..., description="Original user query") - generated_answer: str = Field( - ..., description="LLM-generated answer based on retrieved documents" - ) - sources: List[SemanticSearchResult] = Field( - default_factory=list, - description="Source documents with excerpts and relevance scores", - ) - total_found: int = Field(..., description="Total matching documents") - search_method: str = Field( - default="semantic_sampling", description="Search method used" - ) - model_used: Optional[str] = Field( - default=None, description="Model that generated the answer" - ) - stop_reason: Optional[str] = Field( - default=None, description="Reason generation stopped" - ) - - -class VectorSyncStatusResponse(BaseResponse): - """Response for vector sync status. 
- - Provides information about the current state of vector sync, - including how many documents are indexed and how many are pending. - - Attributes: - indexed_count: Number of documents in Qdrant vector database - pending_count: Number of documents in processing queue - status: Current sync status ("idle" or "syncing") - enabled: Whether vector sync is enabled - """ - - indexed_count: int = Field( - default=0, description="Number of documents indexed in vector database" - ) - pending_count: int = Field( - default=0, description="Number of documents pending processing" - ) - status: str = Field( - default="disabled", - description='Sync status: "idle", "syncing", or "disabled"', - ) - enabled: bool = Field(default=False, description="Whether vector sync is enabled") diff --git a/nextcloud_mcp_server/models/semantic.py b/nextcloud_mcp_server/models/semantic.py new file mode 100644 index 0000000..b8233f0 --- /dev/null +++ b/nextcloud_mcp_server/models/semantic.py @@ -0,0 +1,109 @@ +"""Pydantic models for semantic search responses.""" + +from typing import List, Optional + +from pydantic import BaseModel, Field + +from .base import BaseResponse + + +class SemanticSearchResult(BaseModel): + """Model for semantic search results with additional metadata.""" + + id: int = Field(description="Document ID") + doc_type: str = Field( + description="Document type (note, calendar_event, deck_card, etc.)" + ) + title: str = Field(description="Document title") + category: str = Field( + default="", description="Document category (notes) or location (calendar)" + ) + excerpt: str = Field(description="Excerpt from matching chunk") + score: float = Field(description="Semantic similarity score (0-1)") + chunk_index: int = Field(description="Index of matching chunk in document") + total_chunks: int = Field(description="Total number of chunks in document") + + +class SemanticSearchResponse(BaseResponse): + """Response model for semantic search across all indexed Nextcloud apps.""" + + 
results: List[SemanticSearchResult] = Field( + description="Semantic search results with similarity scores" + ) + query: str = Field(description="The search query used") + total_found: int = Field(description="Total number of documents found") + search_method: str = Field( + default="semantic", description="Search method used (semantic or hybrid)" + ) + + +class SamplingSearchResponse(BaseResponse): + """Response from semantic search with LLM-generated answer via MCP sampling. + + This response includes both a generated natural language answer (created by + the MCP client's LLM via sampling) and the source documents used to generate + that answer. Users can read the answer for quick information and review + sources for verification and deeper exploration. + + Attributes: + query: The original user query + generated_answer: Natural language answer generated by client's LLM + sources: List of semantic search results used as context + total_found: Total number of matching documents found + search_method: Always "semantic_sampling" for this response type + model_used: Name of model that generated the answer (e.g., "claude-3-5-sonnet") + stop_reason: Why generation stopped ("endTurn", "maxTokens", etc.) + """ + + query: str = Field(..., description="Original user query") + generated_answer: str = Field( + ..., description="LLM-generated answer based on retrieved documents" + ) + sources: List[SemanticSearchResult] = Field( + default_factory=list, + description="Source documents with excerpts and relevance scores", + ) + total_found: int = Field(..., description="Total matching documents") + search_method: str = Field( + default="semantic_sampling", description="Search method used" + ) + model_used: Optional[str] = Field( + default=None, description="Model that generated the answer" + ) + stop_reason: Optional[str] = Field( + default=None, description="Reason generation stopped" + ) + + +class VectorSyncStatusResponse(BaseResponse): + """Response for vector sync status. 
+ + Provides information about the current state of vector sync, + including how many documents are indexed and how many are pending. + + Attributes: + indexed_count: Number of documents in Qdrant vector database + pending_count: Number of documents in processing queue + status: Current sync status ("idle" or "syncing") + enabled: Whether vector sync is enabled + """ + + indexed_count: int = Field( + default=0, description="Number of documents indexed in vector database" + ) + pending_count: int = Field( + default=0, description="Number of documents pending processing" + ) + status: str = Field( + default="disabled", + description='Sync status: "idle", "syncing", or "disabled"', + ) + enabled: bool = Field(default=False, description="Whether vector sync is enabled") + + +__all__ = [ + "SemanticSearchResult", + "SemanticSearchResponse", + "SamplingSearchResponse", + "VectorSyncStatusResponse", +] diff --git a/nextcloud_mcp_server/server/__init__.py b/nextcloud_mcp_server/server/__init__.py index 0a2c455..d1c4d52 100644 --- a/nextcloud_mcp_server/server/__init__.py +++ b/nextcloud_mcp_server/server/__init__.py @@ -3,6 +3,7 @@ from .contacts import configure_contacts_tools from .cookbook import configure_cookbook_tools from .deck import configure_deck_tools from .notes import configure_notes_tools +from .semantic import configure_semantic_tools from .sharing import configure_sharing_tools from .tables import configure_tables_tools from .webdav import configure_webdav_tools @@ -13,6 +14,7 @@ __all__ = [ "configure_cookbook_tools", "configure_deck_tools", "configure_notes_tools", + "configure_semantic_tools", "configure_sharing_tools", "configure_tables_tools", "configure_webdav_tools", diff --git a/nextcloud_mcp_server/server/notes.py b/nextcloud_mcp_server/server/notes.py index aa18716..17de067 100644 --- a/nextcloud_mcp_server/server/notes.py +++ b/nextcloud_mcp_server/server/notes.py @@ -3,13 +3,7 @@ import logging from httpx import HTTPStatusError, RequestError 
from mcp.server.fastmcp import Context, FastMCP from mcp.shared.exceptions import McpError -from mcp.types import ( - ErrorData, - ModelHint, - ModelPreferences, - SamplingMessage, - TextContent, -) +from mcp.types import ErrorData from nextcloud_mcp_server.auth import require_scopes from nextcloud_mcp_server.context import get_client @@ -20,12 +14,8 @@ from nextcloud_mcp_server.models.notes import ( Note, NoteSearchResult, NotesSettings, - SamplingSearchResponse, SearchNotesResponse, - SemanticSearchNotesResponse, - SemanticSearchResult, UpdateNoteResponse, - VectorSyncStatusResponse, ) logger = logging.getLogger(__name__) @@ -376,321 +366,6 @@ def configure_notes_tools(mcp: FastMCP): ) ) - @mcp.tool() - @require_scopes("notes:read") - async def nc_notes_semantic_search( - query: str, ctx: Context, limit: int = 10, score_threshold: float = 0.7 - ) -> SemanticSearchNotesResponse: - """ - Semantic search for notes using vector embeddings. - - Searches notes by meaning rather than exact keywords. Requires vector - database synchronization to be enabled (VECTOR_SYNC_ENABLED=true). - - Args: - query: Natural language search query - limit: Maximum number of results to return (default: 10) - score_threshold: Minimum similarity score (0-1, default: 0.7) - - Returns: - SemanticSearchNotesResponse with matching notes and similarity scores - """ - from qdrant_client.models import FieldCondition, Filter, MatchValue - - from nextcloud_mcp_server.config import get_settings - from nextcloud_mcp_server.embedding import get_embedding_service - from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client - - settings = get_settings() - - # Check if vector sync is enabled - if not settings.vector_sync_enabled: - raise McpError( - ErrorData( - code=-1, - message="Semantic search is not enabled. 
Set VECTOR_SYNC_ENABLED=true and ensure vector database is configured.", - ) - ) - - client = await get_client(ctx) - username = client.username - - try: - # Generate embedding for query - embedding_service = get_embedding_service() - query_embedding = await embedding_service.embed(query) - - # Search Qdrant with user filtering - qdrant_client = await get_qdrant_client() - search_response = await qdrant_client.query_points( - collection_name=settings.qdrant_collection, - query=query_embedding, - query_filter=Filter( - must=[ - FieldCondition( - key="user_id", - match=MatchValue(value=username), - ), - FieldCondition( - key="doc_type", - match=MatchValue(value="note"), - ), - ] - ), - limit=limit * 2, # Get extra for filtering - score_threshold=score_threshold, - with_payload=True, - with_vectors=False, # Don't return vectors to save bandwidth - ) - - # Deduplicate by note ID (multiple chunks per note) - seen_note_ids = set() - results = [] - - for result in search_response.points: - note_id = int(result.payload["doc_id"]) - - # Skip if we've already seen this note - if note_id in seen_note_ids: - continue - - seen_note_ids.add(note_id) - - # Verify access via Nextcloud API (dual-phase authorization) - try: - note = await client.notes.get_note(note_id) - - results.append( - SemanticSearchResult( - id=note_id, - title=result.payload["title"], - category=note.get("category", ""), - excerpt=result.payload["excerpt"], - score=result.score, - chunk_index=result.payload["chunk_index"], - total_chunks=result.payload["total_chunks"], - ) - ) - - if len(results) >= limit: - break - - except HTTPStatusError as e: - if e.response.status_code == 403: - # User lost access, skip this note - continue - elif e.response.status_code == 404: - # Note was deleted but not yet removed from vector DB - continue - else: - # Log other errors but continue processing - logger.warning( - f"Error verifying access to note {note_id}: {e.response.status_code}" - ) - continue - - return 
SemanticSearchNotesResponse( - results=results, - query=query, - total_found=len(results), - search_method="semantic", - ) - - except ValueError as e: - if "No embedding provider configured" in str(e): - raise McpError( - ErrorData( - code=-1, - message="Embedding service not configured. Set OLLAMA_BASE_URL environment variable.", - ) - ) - raise McpError(ErrorData(code=-1, message=f"Configuration error: {str(e)}")) - except RequestError as e: - raise McpError( - ErrorData(code=-1, message=f"Network error during search: {str(e)}") - ) - except Exception as e: - logger.error(f"Semantic search error: {e}", exc_info=True) - raise McpError( - ErrorData(code=-1, message=f"Semantic search failed: {str(e)}") - ) - - @mcp.tool() - @require_scopes("notes:read") - async def nc_notes_semantic_search_answer( - query: str, - ctx: Context, - limit: int = 5, - score_threshold: float = 0.7, - max_answer_tokens: int = 500, - ) -> SamplingSearchResponse: - """ - Semantic search with LLM-generated answer using MCP sampling. - - Retrieves relevant documents from Nextcloud Notes using vector similarity - search, then uses MCP sampling to request the client's LLM to generate - a natural language answer based on the retrieved context. - - This tool combines the power of semantic search (finding relevant content) - with LLM generation (synthesizing that content into coherent answers). The - generated answer includes citations to specific documents, allowing users - to verify claims and explore sources. - - The LLM generation happens client-side via MCP sampling. The MCP client - controls which model is used, who pays for it, and whether to prompt the - user for approval. This keeps the server simple (no LLM API keys needed) - while giving users full control over their LLM interactions. 
- - Args: - query: Natural language question to answer (e.g., "What are my project goals?") - ctx: MCP context for session access - limit: Maximum number of documents to retrieve (default: 5) - score_threshold: Minimum similarity score 0-1 (default: 0.7) - max_answer_tokens: Maximum tokens for generated answer (default: 500) - - Returns: - SamplingSearchResponse containing: - - generated_answer: Natural language answer with citations - - sources: List of documents with excerpts and relevance scores - - model_used: Which model generated the answer - - stop_reason: Why generation stopped - - Note: Requires MCP client to support sampling. If sampling is unavailable, - the tool gracefully degrades to returning documents with an explanation. - The client may prompt the user to approve the sampling request. - - Examples: - >>> # Query about project goals - >>> result = await nc_notes_semantic_search_answer( - ... query="What are my Q1 2025 project goals?", - ... ctx=ctx - ... ) - >>> print(result.generated_answer) - "Based on Document 1 (Project Kickoff) and Document 3 (Q1 Planning), - your main goals are: 1) Improve semantic search accuracy by 20%, - 2) Deploy new embedding model, 3) Reduce indexing latency..." - - >>> # Query about learning - >>> result = await nc_notes_semantic_search_answer( - ... query="What did I learn about Python async/await last month?", - ... ctx=ctx, - ... limit=10 - ... ) - >>> len(result.sources) # Up to 10 documents - 7 - """ - # 1. Retrieve relevant documents via existing semantic search - search_response = await nc_notes_semantic_search( - query=query, - ctx=ctx, - limit=limit, - score_threshold=score_threshold, - ) - - # 2. 
Handle no results case - don't waste a sampling call - if not search_response.results: - logger.debug(f"No documents found for query: {query}") - return SamplingSearchResponse( - query=query, - generated_answer="No relevant documents found in your Nextcloud Notes for this query.", - sources=[], - total_found=0, - search_method="semantic_sampling", - success=True, - ) - - # 3. Construct context from retrieved documents - context_parts = [] - for idx, result in enumerate(search_response.results, 1): - context_parts.append( - f"[Document {idx}]\n" - f"Title: {result.title}\n" - f"Category: {result.category}\n" - f"Excerpt: {result.excerpt}\n" - f"Relevance Score: {result.score:.2f}\n" - ) - - context = "\n".join(context_parts) - - # 4. Construct prompt - reuse user's query, add context and instructions - prompt = ( - f"{query}\n\n" - f"Here are relevant documents from Nextcloud Notes:\n\n" - f"{context}\n\n" - f"Based on the documents above, please provide a comprehensive answer. " - f"Cite the document numbers when referencing specific information." - ) - - logger.debug( - f"Requesting sampling for query: {query} " - f"({len(search_response.results)} documents retrieved)" - ) - - # 5. Request LLM completion via MCP sampling - try: - sampling_result = await ctx.session.create_message( - messages=[ - SamplingMessage( - role="user", - content=TextContent(type="text", text=prompt), - ) - ], - max_tokens=max_answer_tokens, - temperature=0.7, - model_preferences=ModelPreferences( - hints=[ModelHint(name="claude-3-5-sonnet")], - intelligencePriority=0.8, - speedPriority=0.5, - ), - include_context="thisServer", - ) - - # 6. 
Extract answer from sampling response - if sampling_result.content.type == "text": - generated_answer = sampling_result.content.text - else: - # Handle non-text responses (shouldn't happen for text prompts) - generated_answer = f"Received non-text response of type: {sampling_result.content.type}" - logger.warning( - f"Unexpected content type from sampling: {sampling_result.content.type}" - ) - - logger.info( - f"Sampling successful: model={sampling_result.model}, " - f"stop_reason={sampling_result.stopReason}" - ) - - return SamplingSearchResponse( - query=query, - generated_answer=generated_answer, - sources=search_response.results, - total_found=search_response.total_found, - search_method="semantic_sampling", - model_used=sampling_result.model, - stop_reason=sampling_result.stopReason, - success=True, - ) - - except Exception as e: - # Fallback: Return documents without generated answer - logger.warning( - f"Sampling failed ({type(e).__name__}: {e}), " - f"returning search results only" - ) - - return SamplingSearchResponse( - query=query, - generated_answer=( - f"[Sampling unavailable: {str(e)}]\n\n" - f"Found {search_response.total_found} relevant documents. " - f"Please review the sources below." - ), - sources=search_response.results, - total_found=search_response.total_found, - search_method="semantic_sampling_fallback", - success=True, - ) - @mcp.tool() @require_scopes("notes:write") async def nc_notes_delete_note(note_id: int, ctx: Context) -> DeleteNoteResponse: @@ -727,86 +402,3 @@ def configure_notes_tools(mcp: FastMCP): message=f"Failed to delete note {note_id}: server error ({e.response.status_code})", ) ) - - @mcp.tool() - @require_scopes("openid") - async def nc_notes_get_vector_sync_status(ctx: Context) -> VectorSyncStatusResponse: - """Get the current vector sync status. 
- - Returns information about the vector sync process, including: - - Number of documents indexed in the vector database - - Number of documents pending processing - - Current sync status (idle, syncing, or disabled) - - This is useful for determining when vector indexing is complete - after creating or updating notes. - """ - import os - - # Check if vector sync is enabled - vector_sync_enabled = ( - os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true" - ) - - if not vector_sync_enabled: - return VectorSyncStatusResponse( - indexed_count=0, - pending_count=0, - status="disabled", - enabled=False, - ) - - try: - # Get document queue from lifespan context - lifespan_ctx = ctx.request_context.lifespan_context - document_queue = getattr(lifespan_ctx, "document_queue", None) - - if document_queue is None: - logger.debug("document_queue not available in lifespan context") - return VectorSyncStatusResponse( - indexed_count=0, - pending_count=0, - status="unknown", - enabled=True, - ) - - # Get pending count from queue - pending_count = document_queue.qsize() - - # Get Qdrant client and query indexed count - indexed_count = 0 - try: - from nextcloud_mcp_server.config import get_settings - from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client - - settings = get_settings() - qdrant_client = await get_qdrant_client() - - # Count documents in collection - count_result = await qdrant_client.count( - collection_name=settings.qdrant_collection - ) - indexed_count = count_result.count - - except Exception as e: - logger.warning(f"Failed to query Qdrant for indexed count: {e}") - # Continue with indexed_count = 0 - - # Determine status - status = "syncing" if pending_count > 0 else "idle" - - return VectorSyncStatusResponse( - indexed_count=indexed_count, - pending_count=pending_count, - status=status, - enabled=True, - ) - - except Exception as e: - logger.error(f"Error getting vector sync status: {e}") - raise McpError( - ErrorData( - code=-1, - 
message=f"Failed to retrieve vector sync status: {str(e)}", - ) - ) diff --git a/nextcloud_mcp_server/server/semantic.py b/nextcloud_mcp_server/server/semantic.py new file mode 100644 index 0000000..7f644d4 --- /dev/null +++ b/nextcloud_mcp_server/server/semantic.py @@ -0,0 +1,436 @@ +"""Semantic search MCP tools using vector database.""" + +import logging + +from httpx import HTTPStatusError, RequestError +from mcp.server.fastmcp import Context, FastMCP +from mcp.shared.exceptions import McpError +from mcp.types import ( + ErrorData, + ModelHint, + ModelPreferences, + SamplingMessage, + TextContent, +) + +from nextcloud_mcp_server.auth import require_scopes +from nextcloud_mcp_server.context import get_client +from nextcloud_mcp_server.models.semantic import ( + SamplingSearchResponse, + SemanticSearchResponse, + SemanticSearchResult, + VectorSyncStatusResponse, +) + +logger = logging.getLogger(__name__) + + +def configure_semantic_tools(mcp: FastMCP): + """Configure semantic search tools for MCP server.""" + + @mcp.tool() + @require_scopes("semantic:read") + async def nc_semantic_search( + query: str, ctx: Context, limit: int = 10, score_threshold: float = 0.7 + ) -> SemanticSearchResponse: + """ + Semantic search across all indexed Nextcloud apps using vector embeddings. + + Searches documents by meaning rather than exact keywords across notes, calendar + events, deck cards, files, and contacts. Requires vector database synchronization + to be enabled (VECTOR_SYNC_ENABLED=true). 
+ + Args: + query: Natural language search query + limit: Maximum number of results to return (default: 10) + score_threshold: Minimum similarity score (0-1, default: 0.7) + + Returns: + SemanticSearchResponse with matching documents and similarity scores + """ + from qdrant_client.models import FieldCondition, Filter, MatchValue + + from nextcloud_mcp_server.config import get_settings + from nextcloud_mcp_server.embedding import get_embedding_service + from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client + + settings = get_settings() + + # Check if vector sync is enabled + if not settings.vector_sync_enabled: + raise McpError( + ErrorData( + code=-1, + message="Semantic search is not enabled. Set VECTOR_SYNC_ENABLED=true and ensure vector database is configured.", + ) + ) + + client = await get_client(ctx) + username = client.username + + try: + # Generate embedding for query + embedding_service = get_embedding_service() + query_embedding = await embedding_service.embed(query) + + # Search Qdrant with user filtering + # Note: Currently only searching notes (doc_type="note") + # Future: Remove doc_type filter to search all apps + qdrant_client = await get_qdrant_client() + search_response = await qdrant_client.query_points( + collection_name=settings.qdrant_collection, + query=query_embedding, + query_filter=Filter( + must=[ + FieldCondition( + key="user_id", + match=MatchValue(value=username), + ), + FieldCondition( + key="doc_type", + match=MatchValue(value="note"), + ), + ] + ), + limit=limit * 2, # Get extra for filtering + score_threshold=score_threshold, + with_payload=True, + with_vectors=False, # Don't return vectors to save bandwidth + ) + + # Deduplicate by document ID (multiple chunks per document) + seen_doc_ids = set() + results = [] + + for result in search_response.points: + doc_id = int(result.payload["doc_id"]) + doc_type = result.payload.get("doc_type", "note") + + # Skip if we've already seen this document + if doc_id in 
seen_doc_ids: + continue + + seen_doc_ids.add(doc_id) + + # Verify access via Nextcloud API (dual-phase authorization) + # Currently only supports notes, will be extended to other apps + if doc_type == "note": + try: + note = await client.notes.get_note(doc_id) + + results.append( + SemanticSearchResult( + id=doc_id, + doc_type="note", + title=result.payload["title"], + category=note.get("category", ""), + excerpt=result.payload["excerpt"], + score=result.score, + chunk_index=result.payload["chunk_index"], + total_chunks=result.payload["total_chunks"], + ) + ) + + if len(results) >= limit: + break + + except HTTPStatusError as e: + if e.response.status_code == 403: + # User lost access, skip this document + continue + elif e.response.status_code == 404: + # Document was deleted but not yet removed from vector DB + continue + else: + # Log other errors but continue processing + logger.warning( + f"Error verifying access to note {doc_id}: {e.response.status_code}" + ) + continue + + return SemanticSearchResponse( + results=results, + query=query, + total_found=len(results), + search_method="semantic", + ) + + except ValueError as e: + if "No embedding provider configured" in str(e): + raise McpError( + ErrorData( + code=-1, + message="Embedding service not configured. 
Set OLLAMA_BASE_URL environment variable.", + ) + ) + raise McpError(ErrorData(code=-1, message=f"Configuration error: {str(e)}")) + except RequestError as e: + raise McpError( + ErrorData(code=-1, message=f"Network error during search: {str(e)}") + ) + except Exception as e: + logger.error(f"Semantic search error: {e}", exc_info=True) + raise McpError( + ErrorData(code=-1, message=f"Semantic search failed: {str(e)}") + ) + + @mcp.tool() + @require_scopes("semantic:read") + async def nc_semantic_search_answer( + query: str, + ctx: Context, + limit: int = 5, + score_threshold: float = 0.7, + max_answer_tokens: int = 500, + ) -> SamplingSearchResponse: + """ + Semantic search with LLM-generated answer using MCP sampling. + + Retrieves relevant documents from indexed Nextcloud apps (notes, calendar, deck, + files, contacts) using vector similarity search, then uses MCP sampling to request + the client's LLM to generate a natural language answer based on the retrieved context. + + This tool combines the power of semantic search (finding relevant content across + all your Nextcloud apps) with LLM generation (synthesizing that content into + coherent answers). The generated answer includes citations to specific documents + with their types, allowing users to verify claims and explore sources. + + The LLM generation happens client-side via MCP sampling. The MCP client + controls which model is used, who pays for it, and whether to prompt the + user for approval. This keeps the server simple (no LLM API keys needed) + while giving users full control over their LLM interactions. + + Args: + query: Natural language question to answer (e.g., "What are my Q1 objectives?" 
or "When is my next dentist appointment?") + ctx: MCP context for session access + limit: Maximum number of documents to retrieve (default: 5) + score_threshold: Minimum similarity score 0-1 (default: 0.7) + max_answer_tokens: Maximum tokens for generated answer (default: 500) + + Returns: + SamplingSearchResponse containing: + - generated_answer: Natural language answer with citations + - sources: List of documents with excerpts and relevance scores + - model_used: Which model generated the answer + - stop_reason: Why generation stopped + + Note: Requires MCP client to support sampling. If sampling is unavailable, + the tool gracefully degrades to returning documents with an explanation. + The client may prompt the user to approve the sampling request. + + Examples: + >>> # Query about objectives across multiple apps + >>> result = await nc_semantic_search_answer( + ... query="What are my Q1 2025 project goals?", + ... ctx=ctx + ... ) + >>> print(result.generated_answer) + "Based on Document 1 (note: Project Kickoff), Document 2 (calendar event: + Q1 Planning Meeting), and Document 3 (deck card: Implement semantic search), + your main goals are: 1) Improve semantic search accuracy by 20%, + 2) Deploy new embedding model, 3) Reduce indexing latency..." + + >>> # Query about appointments + >>> result = await nc_semantic_search_answer( + ... query="When is my next dentist appointment?", + ... ctx=ctx, + ... limit=10 + ... ) + >>> len(result.sources) # Calendar events and related notes + 3 + """ + # 1. Retrieve relevant documents via existing semantic search + search_response = await nc_semantic_search( + query=query, + ctx=ctx, + limit=limit, + score_threshold=score_threshold, + ) + + # 2. 
Handle no results case - don't waste a sampling call + if not search_response.results: + logger.debug(f"No documents found for query: {query}") + return SamplingSearchResponse( + query=query, + generated_answer="No relevant documents found in your Nextcloud content for this query.", + sources=[], + total_found=0, + search_method="semantic_sampling", + success=True, + ) + + # 3. Construct context from retrieved documents + context_parts = [] + for idx, result in enumerate(search_response.results, 1): + context_parts.append( + f"[Document {idx}]\n" + f"Type: {result.doc_type}\n" + f"Title: {result.title}\n" + f"Category: {result.category}\n" + f"Excerpt: {result.excerpt}\n" + f"Relevance Score: {result.score:.2f}\n" + ) + + context = "\n".join(context_parts) + + # 4. Construct prompt - reuse user's query, add context and instructions + prompt = ( + f"{query}\n\n" + f"Here are relevant documents from Nextcloud (notes, calendar events, deck cards, files, contacts):\n\n" + f"{context}\n\n" + f"Based on the documents above, please provide a comprehensive answer. " + f"Cite the document numbers when referencing specific information." + ) + + logger.debug( + f"Requesting sampling for query: {query} " + f"({len(search_response.results)} documents retrieved)" + ) + + # 5. Request LLM completion via MCP sampling + try: + sampling_result = await ctx.session.create_message( + messages=[ + SamplingMessage( + role="user", + content=TextContent(type="text", text=prompt), + ) + ], + max_tokens=max_answer_tokens, + temperature=0.7, + model_preferences=ModelPreferences( + hints=[ModelHint(name="claude-3-5-sonnet")], + intelligencePriority=0.8, + speedPriority=0.5, + ), + include_context="thisServer", + ) + + # 6. 
Extract answer from sampling response + if sampling_result.content.type == "text": + generated_answer = sampling_result.content.text + else: + # Handle non-text responses (shouldn't happen for text prompts) + generated_answer = f"Received non-text response of type: {sampling_result.content.type}" + logger.warning( + f"Unexpected content type from sampling: {sampling_result.content.type}" + ) + + logger.info( + f"Sampling successful: model={sampling_result.model}, " + f"stop_reason={sampling_result.stopReason}" + ) + + return SamplingSearchResponse( + query=query, + generated_answer=generated_answer, + sources=search_response.results, + total_found=search_response.total_found, + search_method="semantic_sampling", + model_used=sampling_result.model, + stop_reason=sampling_result.stopReason, + success=True, + ) + + except Exception as e: + # Fallback: Return documents without generated answer + logger.warning( + f"Sampling failed ({type(e).__name__}: {e}), " + f"returning search results only" + ) + + return SamplingSearchResponse( + query=query, + generated_answer=( + f"[Sampling unavailable: {str(e)}]\n\n" + f"Found {search_response.total_found} relevant documents. " + f"Please review the sources below." + ), + sources=search_response.results, + total_found=search_response.total_found, + search_method="semantic_sampling_fallback", + success=True, + ) + + @mcp.tool() + @require_scopes("semantic:read") + async def nc_get_vector_sync_status(ctx: Context) -> VectorSyncStatusResponse: + """Get the current vector sync status. + + Returns information about the vector sync process, including: + - Number of documents indexed in the vector database + - Number of documents pending processing + - Current sync status (idle, syncing, or disabled) + + This is useful for determining when vector indexing is complete + after creating or updating content across all indexed apps. 
+ """ + import os + + # Check if vector sync is enabled + vector_sync_enabled = ( + os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true" + ) + + if not vector_sync_enabled: + return VectorSyncStatusResponse( + indexed_count=0, + pending_count=0, + status="disabled", + enabled=False, + ) + + try: + # Get document queue from lifespan context + lifespan_ctx = ctx.request_context.lifespan_context + document_queue = getattr(lifespan_ctx, "document_queue", None) + + if document_queue is None: + logger.debug("document_queue not available in lifespan context") + return VectorSyncStatusResponse( + indexed_count=0, + pending_count=0, + status="unknown", + enabled=True, + ) + + # Get pending count from queue + pending_count = document_queue.qsize() + + # Get Qdrant client and query indexed count + indexed_count = 0 + try: + from nextcloud_mcp_server.config import get_settings + from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client + + settings = get_settings() + qdrant_client = await get_qdrant_client() + + # Count documents in collection + count_result = await qdrant_client.count( + collection_name=settings.qdrant_collection + ) + indexed_count = count_result.count + + except Exception as e: + logger.warning(f"Failed to query Qdrant for indexed count: {e}") + # Continue with indexed_count = 0 + + # Determine status + status = "syncing" if pending_count > 0 else "idle" + + return VectorSyncStatusResponse( + indexed_count=indexed_count, + pending_count=pending_count, + status=status, + enabled=True, + ) + + except Exception as e: + logger.error(f"Error getting vector sync status: {e}") + raise McpError( + ErrorData( + code=-1, + message=f"Failed to retrieve vector sync status: {str(e)}", + ) + ) diff --git a/tests/integration/test_sampling.py b/tests/integration/test_sampling.py index c97739b..3a09165 100644 --- a/tests/integration/test_sampling.py +++ b/tests/integration/test_sampling.py @@ -1,6 +1,6 @@ """Integration tests for MCP sampling with 
semantic search. -These tests validate the nc_notes_semantic_search_answer tool which combines: +These tests validate the nc_semantic_search_answer tool which combines: 1. Semantic search to retrieve relevant documents 2. MCP sampling to generate natural language answers @@ -50,8 +50,8 @@ async def test_semantic_search_answer_successful_sampling( Flow: 1. Create test note with searchable content - 2. Wait for vector sync to complete using nc_notes_get_vector_sync_status - 3. Call nc_notes_semantic_search_answer + 2. Wait for vector sync to complete using nc_get_vector_sync_status + 3. Call nc_semantic_search_answer 4. Mock ctx.session.create_message to return answer 5. Verify response contains generated answer and sources """ @@ -59,7 +59,7 @@ async def test_semantic_search_answer_successful_sampling( import asyncio initial_sync = await nc_mcp_client.call_tool( - "nc_notes_get_vector_sync_status", arguments={} + "nc_get_vector_sync_status", arguments={} ) initial_indexed_count = initial_sync.structuredContent["indexed_count"] print(f"Initial indexed count: {initial_indexed_count}") @@ -88,7 +88,7 @@ Avoid blocking operations in async code.""", while waited < max_wait: sync_status = await nc_mcp_client.call_tool( - "nc_notes_get_vector_sync_status", arguments={} + "nc_get_vector_sync_status", arguments={} ) status_data = sync_status.structuredContent @@ -123,7 +123,7 @@ Avoid blocking operations in async code.""", # In a real integration test with MCP Inspector, this would be actual sampling call_result = await nc_mcp_client.call_tool( - "nc_notes_semantic_search_answer", + "nc_semantic_search_answer", arguments={ "query": "How do I use async in Python?", "limit": 5, @@ -169,7 +169,7 @@ async def test_semantic_search_answer_no_results(nc_mcp_client): 3. 
Verify no sampling call was made (no sources to base answer on) """ call_result = await nc_mcp_client.call_tool( - "nc_notes_semantic_search_answer", + "nc_semantic_search_answer", arguments={ "query": "quantum chromodynamics lattice QCD gluon propagator", "limit": 5, @@ -229,7 +229,7 @@ async def test_semantic_search_answer_with_limit(nc_mcp_client, temporary_note_f while waited < max_wait: sync_status = await nc_mcp_client.call_tool( - "nc_notes_get_vector_sync_status", arguments={} + "nc_get_vector_sync_status", arguments={} ) status_data = sync_status.structuredContent @@ -242,7 +242,7 @@ async def test_semantic_search_answer_with_limit(nc_mcp_client, temporary_note_f assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds" call_result = await nc_mcp_client.call_tool( - "nc_notes_semantic_search_answer", + "nc_semantic_search_answer", arguments={ "query": "async programming in Python", "limit": 2, @@ -286,7 +286,7 @@ async def test_semantic_search_answer_score_threshold( while waited < max_wait: sync_status = await nc_mcp_client.call_tool( - "nc_notes_get_vector_sync_status", arguments={} + "nc_get_vector_sync_status", arguments={} ) status_data = sync_status.structuredContent @@ -300,7 +300,7 @@ async def test_semantic_search_answer_score_threshold( # Query with exact match call_result = await nc_mcp_client.call_tool( - "nc_notes_semantic_search_answer", + "nc_semantic_search_answer", arguments={ "query": "widget manufacturing", "limit": 5, @@ -349,7 +349,7 @@ async def test_semantic_search_answer_max_tokens(nc_mcp_client, temporary_note_f while waited < max_wait: sync_status = await nc_mcp_client.call_tool( - "nc_notes_get_vector_sync_status", arguments={} + "nc_get_vector_sync_status", arguments={} ) status_data = sync_status.structuredContent @@ -362,7 +362,7 @@ async def test_semantic_search_answer_max_tokens(nc_mcp_client, temporary_note_f assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds" 
call_result = await nc_mcp_client.call_tool( - "nc_notes_semantic_search_answer", + "nc_semantic_search_answer", arguments={ "query": "document content", "limit": 5, diff --git a/tests/unit/test_response_models.py b/tests/unit/test_response_models.py index b70d163..bbe44dc 100644 --- a/tests/unit/test_response_models.py +++ b/tests/unit/test_response_models.py @@ -6,8 +6,10 @@ from nextcloud_mcp_server.models.notes import ( CreateNoteResponse, Note, NoteSearchResult, - SamplingSearchResponse, SearchNotesResponse, +) +from nextcloud_mcp_server.models.semantic import ( + SamplingSearchResponse, SemanticSearchResult, ) @@ -131,6 +133,7 @@ def test_sampling_search_response_with_answer(): sources = [ SemanticSearchResult( id=1, + doc_type="note", title="Python Guide", category="Development", excerpt="Use async/await for asynchronous programming", @@ -140,6 +143,7 @@ def test_sampling_search_response_with_answer(): ), SemanticSearchResult( id=2, + doc_type="note", title="Best Practices", category="Development", excerpt="Always use context managers with async operations", @@ -189,6 +193,7 @@ def test_sampling_search_response_fallback(): sources = [ SemanticSearchResult( id=1, + doc_type="note", title="Note 1", category="Work", excerpt="Some content",