feat: implement ADR-009 - refactor semantic search to use generic semantic:read scope

This implements ADR-009, which documents the decision to use a generic
`semantic:read` OAuth scope instead of requiring all app-specific scopes
for semantic search functionality.

Changes:
- Created new `nextcloud_mcp_server/models/semantic.py` with semantic search models
  - SemanticSearchResult (with new doc_type field for multi-app support)
  - SemanticSearchResponse
  - SamplingSearchResponse
  - VectorSyncStatusResponse

- Created new `nextcloud_mcp_server/server/semantic.py` with semantic search tools
  - nc_semantic_search (renamed from nc_notes_semantic_search)
  - nc_semantic_search_answer (renamed from nc_notes_semantic_search_answer)
  - nc_get_vector_sync_status (renamed from nc_notes_get_vector_sync_status)
  - All tools now use @require_scopes("semantic:read") instead of "notes:read"

- Updated `nextcloud_mcp_server/server/notes.py`
  - Removed semantic search tools (moved to semantic.py)
  - Removed semantic search model imports
  - Removed unused MCP imports (ModelHint, ModelPreferences, etc.)

- Updated `nextcloud_mcp_server/models/notes.py`
  - Removed semantic search models (moved to semantic.py)

- Updated `nextcloud_mcp_server/app.py`
  - Import configure_semantic_tools
  - Register semantic tools when VECTOR_SYNC_ENABLED=true

- Updated `nextcloud_mcp_server/server/__init__.py`
  - Export configure_semantic_tools

- Updated tests
  - tests/integration/test_sampling.py: Use new tool names
  - tests/unit/test_response_models.py: Import from semantic.py, add doc_type field

Architecture:
- Semantic search is now a cross-app feature, not tied to Notes
- Uses dual-phase authorization: semantic:read scope + per-document verification
- Supports future multi-app indexing (notes, calendar, deck, files, contacts)

Test results:
- All 69 unit tests passing
- All 5 smoke tests passing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-09 05:53:53 +01:00
parent 31799ffd9a
commit 4b026e9aa0
8 changed files with 576 additions and 512 deletions
+9
View File
@@ -45,6 +45,7 @@ from nextcloud_mcp_server.server import (
configure_cookbook_tools,
configure_deck_tools,
configure_notes_tools,
configure_semantic_tools,
configure_sharing_tools,
configure_tables_tools,
configure_webdav_tools,
@@ -871,6 +872,14 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
f"Unknown app: {app_name}. Available apps: {list(available_apps.keys())}"
)
# Register semantic search tools (cross-app feature)
settings = get_settings()
if settings.vector_sync_enabled:
logger.info("Configuring semantic search tools (vector sync enabled)")
configure_semantic_tools(mcp)
else:
logger.info("Skipping semantic search tools (VECTOR_SYNC_ENABLED not set)")
# Register OAuth provisioning tools (only when offline access is enabled)
# With token exchange enabled (external IdP), provisioning is not needed for MCP operations
enable_token_exchange = (
-89
View File
@@ -37,18 +37,6 @@ class NoteSearchResult(BaseModel):
score: Optional[float] = Field(None, description="Search relevance score")
class SemanticSearchResult(BaseModel):
"""Model for semantic search results with additional metadata."""
id: int = Field(description="Note ID")
title: str = Field(description="Note title")
category: str = Field(default="", description="Note category")
excerpt: str = Field(description="Excerpt from matching chunk")
score: float = Field(description="Semantic similarity score (0-1)")
chunk_index: int = Field(description="Index of matching chunk in document")
total_chunks: int = Field(description="Total number of chunks in document")
class NotesSettings(BaseModel):
"""Model for Notes app settings."""
@@ -95,80 +83,3 @@ class SearchNotesResponse(BaseResponse):
results: List[NoteSearchResult] = Field(description="Search results")
query: str = Field(description="The search query used")
total_found: int = Field(description="Total number of notes found")
class SemanticSearchNotesResponse(BaseResponse):
"""Response model for semantic search."""
results: List[SemanticSearchResult] = Field(
description="Semantic search results with similarity scores"
)
query: str = Field(description="The search query used")
total_found: int = Field(description="Total number of notes found")
search_method: str = Field(
default="semantic", description="Search method used (semantic or hybrid)"
)
class SamplingSearchResponse(BaseResponse):
"""Response from semantic search with LLM-generated answer via MCP sampling.
This response includes both a generated natural language answer (created by
the MCP client's LLM via sampling) and the source documents used to generate
that answer. Users can read the answer for quick information and review
sources for verification and deeper exploration.
Attributes:
query: The original user query
generated_answer: Natural language answer generated by client's LLM
sources: List of semantic search results used as context
total_found: Total number of matching documents found
search_method: Always "semantic_sampling" for this response type
model_used: Name of model that generated the answer (e.g., "claude-3-5-sonnet")
stop_reason: Why generation stopped ("endTurn", "maxTokens", etc.)
"""
query: str = Field(..., description="Original user query")
generated_answer: str = Field(
..., description="LLM-generated answer based on retrieved documents"
)
sources: List[SemanticSearchResult] = Field(
default_factory=list,
description="Source documents with excerpts and relevance scores",
)
total_found: int = Field(..., description="Total matching documents")
search_method: str = Field(
default="semantic_sampling", description="Search method used"
)
model_used: Optional[str] = Field(
default=None, description="Model that generated the answer"
)
stop_reason: Optional[str] = Field(
default=None, description="Reason generation stopped"
)
class VectorSyncStatusResponse(BaseResponse):
"""Response for vector sync status.
Provides information about the current state of vector sync,
including how many documents are indexed and how many are pending.
Attributes:
indexed_count: Number of documents in Qdrant vector database
pending_count: Number of documents in processing queue
status: Current sync status ("idle" or "syncing")
enabled: Whether vector sync is enabled
"""
indexed_count: int = Field(
default=0, description="Number of documents indexed in vector database"
)
pending_count: int = Field(
default=0, description="Number of documents pending processing"
)
status: str = Field(
default="disabled",
description='Sync status: "idle", "syncing", or "disabled"',
)
enabled: bool = Field(default=False, description="Whether vector sync is enabled")
+109
View File
@@ -0,0 +1,109 @@
"""Pydantic models for semantic search responses."""
from typing import List, Optional
from pydantic import BaseModel, Field
from .base import BaseResponse
class SemanticSearchResult(BaseModel):
"""Model for semantic search results with additional metadata."""
id: int = Field(description="Document ID")
doc_type: str = Field(
description="Document type (note, calendar_event, deck_card, etc.)"
)
title: str = Field(description="Document title")
category: str = Field(
default="", description="Document category (notes) or location (calendar)"
)
excerpt: str = Field(description="Excerpt from matching chunk")
score: float = Field(description="Semantic similarity score (0-1)")
chunk_index: int = Field(description="Index of matching chunk in document")
total_chunks: int = Field(description="Total number of chunks in document")
class SemanticSearchResponse(BaseResponse):
"""Response model for semantic search across all indexed Nextcloud apps."""
results: List[SemanticSearchResult] = Field(
description="Semantic search results with similarity scores"
)
query: str = Field(description="The search query used")
total_found: int = Field(description="Total number of documents found")
search_method: str = Field(
default="semantic", description="Search method used (semantic or hybrid)"
)
class SamplingSearchResponse(BaseResponse):
"""Response from semantic search with LLM-generated answer via MCP sampling.
This response includes both a generated natural language answer (created by
the MCP client's LLM via sampling) and the source documents used to generate
that answer. Users can read the answer for quick information and review
sources for verification and deeper exploration.
Attributes:
query: The original user query
generated_answer: Natural language answer generated by client's LLM
sources: List of semantic search results used as context
total_found: Total number of matching documents found
search_method: Always "semantic_sampling" for this response type
model_used: Name of model that generated the answer (e.g., "claude-3-5-sonnet")
stop_reason: Why generation stopped ("endTurn", "maxTokens", etc.)
"""
query: str = Field(..., description="Original user query")
generated_answer: str = Field(
..., description="LLM-generated answer based on retrieved documents"
)
sources: List[SemanticSearchResult] = Field(
default_factory=list,
description="Source documents with excerpts and relevance scores",
)
total_found: int = Field(..., description="Total matching documents")
search_method: str = Field(
default="semantic_sampling", description="Search method used"
)
model_used: Optional[str] = Field(
default=None, description="Model that generated the answer"
)
stop_reason: Optional[str] = Field(
default=None, description="Reason generation stopped"
)
class VectorSyncStatusResponse(BaseResponse):
"""Response for vector sync status.
Provides information about the current state of vector sync,
including how many documents are indexed and how many are pending.
Attributes:
indexed_count: Number of documents in Qdrant vector database
pending_count: Number of documents in processing queue
status: Current sync status ("idle" or "syncing")
enabled: Whether vector sync is enabled
"""
indexed_count: int = Field(
default=0, description="Number of documents indexed in vector database"
)
pending_count: int = Field(
default=0, description="Number of documents pending processing"
)
status: str = Field(
default="disabled",
description='Sync status: "idle", "syncing", or "disabled"',
)
enabled: bool = Field(default=False, description="Whether vector sync is enabled")
__all__ = [
"SemanticSearchResult",
"SemanticSearchResponse",
"SamplingSearchResponse",
"VectorSyncStatusResponse",
]
+2
View File
@@ -3,6 +3,7 @@ from .contacts import configure_contacts_tools
from .cookbook import configure_cookbook_tools
from .deck import configure_deck_tools
from .notes import configure_notes_tools
from .semantic import configure_semantic_tools
from .sharing import configure_sharing_tools
from .tables import configure_tables_tools
from .webdav import configure_webdav_tools
@@ -13,6 +14,7 @@ __all__ = [
"configure_cookbook_tools",
"configure_deck_tools",
"configure_notes_tools",
"configure_semantic_tools",
"configure_sharing_tools",
"configure_tables_tools",
"configure_webdav_tools",
+1 -409
View File
@@ -3,13 +3,7 @@ import logging
from httpx import HTTPStatusError, RequestError
from mcp.server.fastmcp import Context, FastMCP
from mcp.shared.exceptions import McpError
from mcp.types import (
ErrorData,
ModelHint,
ModelPreferences,
SamplingMessage,
TextContent,
)
from mcp.types import ErrorData
from nextcloud_mcp_server.auth import require_scopes
from nextcloud_mcp_server.context import get_client
@@ -20,12 +14,8 @@ from nextcloud_mcp_server.models.notes import (
Note,
NoteSearchResult,
NotesSettings,
SamplingSearchResponse,
SearchNotesResponse,
SemanticSearchNotesResponse,
SemanticSearchResult,
UpdateNoteResponse,
VectorSyncStatusResponse,
)
logger = logging.getLogger(__name__)
@@ -376,321 +366,6 @@ def configure_notes_tools(mcp: FastMCP):
)
)
@mcp.tool()
@require_scopes("notes:read")
async def nc_notes_semantic_search(
query: str, ctx: Context, limit: int = 10, score_threshold: float = 0.7
) -> SemanticSearchNotesResponse:
"""
Semantic search for notes using vector embeddings.
Searches notes by meaning rather than exact keywords. Requires vector
database synchronization to be enabled (VECTOR_SYNC_ENABLED=true).
Args:
query: Natural language search query
limit: Maximum number of results to return (default: 10)
score_threshold: Minimum similarity score (0-1, default: 0.7)
Returns:
SemanticSearchNotesResponse with matching notes and similarity scores
"""
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_embedding_service
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
settings = get_settings()
# Check if vector sync is enabled
if not settings.vector_sync_enabled:
raise McpError(
ErrorData(
code=-1,
message="Semantic search is not enabled. Set VECTOR_SYNC_ENABLED=true and ensure vector database is configured.",
)
)
client = await get_client(ctx)
username = client.username
try:
# Generate embedding for query
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
# Search Qdrant with user filtering
qdrant_client = await get_qdrant_client()
search_response = await qdrant_client.query_points(
collection_name=settings.qdrant_collection,
query=query_embedding,
query_filter=Filter(
must=[
FieldCondition(
key="user_id",
match=MatchValue(value=username),
),
FieldCondition(
key="doc_type",
match=MatchValue(value="note"),
),
]
),
limit=limit * 2, # Get extra for filtering
score_threshold=score_threshold,
with_payload=True,
with_vectors=False, # Don't return vectors to save bandwidth
)
# Deduplicate by note ID (multiple chunks per note)
seen_note_ids = set()
results = []
for result in search_response.points:
note_id = int(result.payload["doc_id"])
# Skip if we've already seen this note
if note_id in seen_note_ids:
continue
seen_note_ids.add(note_id)
# Verify access via Nextcloud API (dual-phase authorization)
try:
note = await client.notes.get_note(note_id)
results.append(
SemanticSearchResult(
id=note_id,
title=result.payload["title"],
category=note.get("category", ""),
excerpt=result.payload["excerpt"],
score=result.score,
chunk_index=result.payload["chunk_index"],
total_chunks=result.payload["total_chunks"],
)
)
if len(results) >= limit:
break
except HTTPStatusError as e:
if e.response.status_code == 403:
# User lost access, skip this note
continue
elif e.response.status_code == 404:
# Note was deleted but not yet removed from vector DB
continue
else:
# Log other errors but continue processing
logger.warning(
f"Error verifying access to note {note_id}: {e.response.status_code}"
)
continue
return SemanticSearchNotesResponse(
results=results,
query=query,
total_found=len(results),
search_method="semantic",
)
except ValueError as e:
if "No embedding provider configured" in str(e):
raise McpError(
ErrorData(
code=-1,
message="Embedding service not configured. Set OLLAMA_BASE_URL environment variable.",
)
)
raise McpError(ErrorData(code=-1, message=f"Configuration error: {str(e)}"))
except RequestError as e:
raise McpError(
ErrorData(code=-1, message=f"Network error during search: {str(e)}")
)
except Exception as e:
logger.error(f"Semantic search error: {e}", exc_info=True)
raise McpError(
ErrorData(code=-1, message=f"Semantic search failed: {str(e)}")
)
@mcp.tool()
@require_scopes("notes:read")
async def nc_notes_semantic_search_answer(
query: str,
ctx: Context,
limit: int = 5,
score_threshold: float = 0.7,
max_answer_tokens: int = 500,
) -> SamplingSearchResponse:
"""
Semantic search with LLM-generated answer using MCP sampling.
Retrieves relevant documents from Nextcloud Notes using vector similarity
search, then uses MCP sampling to request the client's LLM to generate
a natural language answer based on the retrieved context.
This tool combines the power of semantic search (finding relevant content)
with LLM generation (synthesizing that content into coherent answers). The
generated answer includes citations to specific documents, allowing users
to verify claims and explore sources.
The LLM generation happens client-side via MCP sampling. The MCP client
controls which model is used, who pays for it, and whether to prompt the
user for approval. This keeps the server simple (no LLM API keys needed)
while giving users full control over their LLM interactions.
Args:
query: Natural language question to answer (e.g., "What are my project goals?")
ctx: MCP context for session access
limit: Maximum number of documents to retrieve (default: 5)
score_threshold: Minimum similarity score 0-1 (default: 0.7)
max_answer_tokens: Maximum tokens for generated answer (default: 500)
Returns:
SamplingSearchResponse containing:
- generated_answer: Natural language answer with citations
- sources: List of documents with excerpts and relevance scores
- model_used: Which model generated the answer
- stop_reason: Why generation stopped
Note: Requires MCP client to support sampling. If sampling is unavailable,
the tool gracefully degrades to returning documents with an explanation.
The client may prompt the user to approve the sampling request.
Examples:
>>> # Query about project goals
>>> result = await nc_notes_semantic_search_answer(
... query="What are my Q1 2025 project goals?",
... ctx=ctx
... )
>>> print(result.generated_answer)
"Based on Document 1 (Project Kickoff) and Document 3 (Q1 Planning),
your main goals are: 1) Improve semantic search accuracy by 20%,
2) Deploy new embedding model, 3) Reduce indexing latency..."
>>> # Query about learning
>>> result = await nc_notes_semantic_search_answer(
... query="What did I learn about Python async/await last month?",
... ctx=ctx,
... limit=10
... )
>>> len(result.sources) # Up to 10 documents
7
"""
# 1. Retrieve relevant documents via existing semantic search
search_response = await nc_notes_semantic_search(
query=query,
ctx=ctx,
limit=limit,
score_threshold=score_threshold,
)
# 2. Handle no results case - don't waste a sampling call
if not search_response.results:
logger.debug(f"No documents found for query: {query}")
return SamplingSearchResponse(
query=query,
generated_answer="No relevant documents found in your Nextcloud Notes for this query.",
sources=[],
total_found=0,
search_method="semantic_sampling",
success=True,
)
# 3. Construct context from retrieved documents
context_parts = []
for idx, result in enumerate(search_response.results, 1):
context_parts.append(
f"[Document {idx}]\n"
f"Title: {result.title}\n"
f"Category: {result.category}\n"
f"Excerpt: {result.excerpt}\n"
f"Relevance Score: {result.score:.2f}\n"
)
context = "\n".join(context_parts)
# 4. Construct prompt - reuse user's query, add context and instructions
prompt = (
f"{query}\n\n"
f"Here are relevant documents from Nextcloud Notes:\n\n"
f"{context}\n\n"
f"Based on the documents above, please provide a comprehensive answer. "
f"Cite the document numbers when referencing specific information."
)
logger.debug(
f"Requesting sampling for query: {query} "
f"({len(search_response.results)} documents retrieved)"
)
# 5. Request LLM completion via MCP sampling
try:
sampling_result = await ctx.session.create_message(
messages=[
SamplingMessage(
role="user",
content=TextContent(type="text", text=prompt),
)
],
max_tokens=max_answer_tokens,
temperature=0.7,
model_preferences=ModelPreferences(
hints=[ModelHint(name="claude-3-5-sonnet")],
intelligencePriority=0.8,
speedPriority=0.5,
),
include_context="thisServer",
)
# 6. Extract answer from sampling response
if sampling_result.content.type == "text":
generated_answer = sampling_result.content.text
else:
# Handle non-text responses (shouldn't happen for text prompts)
generated_answer = f"Received non-text response of type: {sampling_result.content.type}"
logger.warning(
f"Unexpected content type from sampling: {sampling_result.content.type}"
)
logger.info(
f"Sampling successful: model={sampling_result.model}, "
f"stop_reason={sampling_result.stopReason}"
)
return SamplingSearchResponse(
query=query,
generated_answer=generated_answer,
sources=search_response.results,
total_found=search_response.total_found,
search_method="semantic_sampling",
model_used=sampling_result.model,
stop_reason=sampling_result.stopReason,
success=True,
)
except Exception as e:
# Fallback: Return documents without generated answer
logger.warning(
f"Sampling failed ({type(e).__name__}: {e}), "
f"returning search results only"
)
return SamplingSearchResponse(
query=query,
generated_answer=(
f"[Sampling unavailable: {str(e)}]\n\n"
f"Found {search_response.total_found} relevant documents. "
f"Please review the sources below."
),
sources=search_response.results,
total_found=search_response.total_found,
search_method="semantic_sampling_fallback",
success=True,
)
@mcp.tool()
@require_scopes("notes:write")
async def nc_notes_delete_note(note_id: int, ctx: Context) -> DeleteNoteResponse:
@@ -727,86 +402,3 @@ def configure_notes_tools(mcp: FastMCP):
message=f"Failed to delete note {note_id}: server error ({e.response.status_code})",
)
)
@mcp.tool()
@require_scopes("openid")
async def nc_notes_get_vector_sync_status(ctx: Context) -> VectorSyncStatusResponse:
"""Get the current vector sync status.
Returns information about the vector sync process, including:
- Number of documents indexed in the vector database
- Number of documents pending processing
- Current sync status (idle, syncing, or disabled)
This is useful for determining when vector indexing is complete
after creating or updating notes.
"""
import os
# Check if vector sync is enabled
vector_sync_enabled = (
os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true"
)
if not vector_sync_enabled:
return VectorSyncStatusResponse(
indexed_count=0,
pending_count=0,
status="disabled",
enabled=False,
)
try:
# Get document queue from lifespan context
lifespan_ctx = ctx.request_context.lifespan_context
document_queue = getattr(lifespan_ctx, "document_queue", None)
if document_queue is None:
logger.debug("document_queue not available in lifespan context")
return VectorSyncStatusResponse(
indexed_count=0,
pending_count=0,
status="unknown",
enabled=True,
)
# Get pending count from queue
pending_count = document_queue.qsize()
# Get Qdrant client and query indexed count
indexed_count = 0
try:
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
settings = get_settings()
qdrant_client = await get_qdrant_client()
# Count documents in collection
count_result = await qdrant_client.count(
collection_name=settings.qdrant_collection
)
indexed_count = count_result.count
except Exception as e:
logger.warning(f"Failed to query Qdrant for indexed count: {e}")
# Continue with indexed_count = 0
# Determine status
status = "syncing" if pending_count > 0 else "idle"
return VectorSyncStatusResponse(
indexed_count=indexed_count,
pending_count=pending_count,
status=status,
enabled=True,
)
except Exception as e:
logger.error(f"Error getting vector sync status: {e}")
raise McpError(
ErrorData(
code=-1,
message=f"Failed to retrieve vector sync status: {str(e)}",
)
)
+436
View File
@@ -0,0 +1,436 @@
"""Semantic search MCP tools using vector database."""
import logging
from httpx import HTTPStatusError, RequestError
from mcp.server.fastmcp import Context, FastMCP
from mcp.shared.exceptions import McpError
from mcp.types import (
ErrorData,
ModelHint,
ModelPreferences,
SamplingMessage,
TextContent,
)
from nextcloud_mcp_server.auth import require_scopes
from nextcloud_mcp_server.context import get_client
from nextcloud_mcp_server.models.semantic import (
SamplingSearchResponse,
SemanticSearchResponse,
SemanticSearchResult,
VectorSyncStatusResponse,
)
logger = logging.getLogger(__name__)
def configure_semantic_tools(mcp: FastMCP):
"""Configure semantic search tools for MCP server."""
@mcp.tool()
@require_scopes("semantic:read")
async def nc_semantic_search(
query: str, ctx: Context, limit: int = 10, score_threshold: float = 0.7
) -> SemanticSearchResponse:
"""
Semantic search across all indexed Nextcloud apps using vector embeddings.
Searches documents by meaning rather than exact keywords across notes, calendar
events, deck cards, files, and contacts. Requires vector database synchronization
to be enabled (VECTOR_SYNC_ENABLED=true).
Args:
query: Natural language search query
limit: Maximum number of results to return (default: 10)
score_threshold: Minimum similarity score (0-1, default: 0.7)
Returns:
SemanticSearchResponse with matching documents and similarity scores
"""
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_embedding_service
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
settings = get_settings()
# Check if vector sync is enabled
if not settings.vector_sync_enabled:
raise McpError(
ErrorData(
code=-1,
message="Semantic search is not enabled. Set VECTOR_SYNC_ENABLED=true and ensure vector database is configured.",
)
)
client = await get_client(ctx)
username = client.username
try:
# Generate embedding for query
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
# Search Qdrant with user filtering
# Note: Currently only searching notes (doc_type="note")
# Future: Remove doc_type filter to search all apps
qdrant_client = await get_qdrant_client()
search_response = await qdrant_client.query_points(
collection_name=settings.qdrant_collection,
query=query_embedding,
query_filter=Filter(
must=[
FieldCondition(
key="user_id",
match=MatchValue(value=username),
),
FieldCondition(
key="doc_type",
match=MatchValue(value="note"),
),
]
),
limit=limit * 2, # Get extra for filtering
score_threshold=score_threshold,
with_payload=True,
with_vectors=False, # Don't return vectors to save bandwidth
)
# Deduplicate by document ID (multiple chunks per document)
seen_doc_ids = set()
results = []
for result in search_response.points:
doc_id = int(result.payload["doc_id"])
doc_type = result.payload.get("doc_type", "note")
# Skip if we've already seen this document
if doc_id in seen_doc_ids:
continue
seen_doc_ids.add(doc_id)
# Verify access via Nextcloud API (dual-phase authorization)
# Currently only supports notes, will be extended to other apps
if doc_type == "note":
try:
note = await client.notes.get_note(doc_id)
results.append(
SemanticSearchResult(
id=doc_id,
doc_type="note",
title=result.payload["title"],
category=note.get("category", ""),
excerpt=result.payload["excerpt"],
score=result.score,
chunk_index=result.payload["chunk_index"],
total_chunks=result.payload["total_chunks"],
)
)
if len(results) >= limit:
break
except HTTPStatusError as e:
if e.response.status_code == 403:
# User lost access, skip this document
continue
elif e.response.status_code == 404:
# Document was deleted but not yet removed from vector DB
continue
else:
# Log other errors but continue processing
logger.warning(
f"Error verifying access to note {doc_id}: {e.response.status_code}"
)
continue
return SemanticSearchResponse(
results=results,
query=query,
total_found=len(results),
search_method="semantic",
)
except ValueError as e:
if "No embedding provider configured" in str(e):
raise McpError(
ErrorData(
code=-1,
message="Embedding service not configured. Set OLLAMA_BASE_URL environment variable.",
)
)
raise McpError(ErrorData(code=-1, message=f"Configuration error: {str(e)}"))
except RequestError as e:
raise McpError(
ErrorData(code=-1, message=f"Network error during search: {str(e)}")
)
except Exception as e:
logger.error(f"Semantic search error: {e}", exc_info=True)
raise McpError(
ErrorData(code=-1, message=f"Semantic search failed: {str(e)}")
)
@mcp.tool()
@require_scopes("semantic:read")
async def nc_semantic_search_answer(
query: str,
ctx: Context,
limit: int = 5,
score_threshold: float = 0.7,
max_answer_tokens: int = 500,
) -> SamplingSearchResponse:
"""
Semantic search with LLM-generated answer using MCP sampling.
Retrieves relevant documents from indexed Nextcloud apps (notes, calendar, deck,
files, contacts) using vector similarity search, then uses MCP sampling to request
the client's LLM to generate a natural language answer based on the retrieved context.
This tool combines the power of semantic search (finding relevant content across
all your Nextcloud apps) with LLM generation (synthesizing that content into
coherent answers). The generated answer includes citations to specific documents
with their types, allowing users to verify claims and explore sources.
The LLM generation happens client-side via MCP sampling. The MCP client
controls which model is used, who pays for it, and whether to prompt the
user for approval. This keeps the server simple (no LLM API keys needed)
while giving users full control over their LLM interactions.
Args:
query: Natural language question to answer (e.g., "What are my Q1 objectives?" or "When is my next dentist appointment?")
ctx: MCP context for session access
limit: Maximum number of documents to retrieve (default: 5)
score_threshold: Minimum similarity score 0-1 (default: 0.7)
max_answer_tokens: Maximum tokens for generated answer (default: 500)
Returns:
SamplingSearchResponse containing:
- generated_answer: Natural language answer with citations
- sources: List of documents with excerpts and relevance scores
- model_used: Which model generated the answer
- stop_reason: Why generation stopped
Note: Requires MCP client to support sampling. If sampling is unavailable,
the tool gracefully degrades to returning documents with an explanation.
The client may prompt the user to approve the sampling request.
Examples:
>>> # Query about objectives across multiple apps
>>> result = await nc_semantic_search_answer(
... query="What are my Q1 2025 project goals?",
... ctx=ctx
... )
>>> print(result.generated_answer)
"Based on Document 1 (note: Project Kickoff), Document 2 (calendar event:
Q1 Planning Meeting), and Document 3 (deck card: Implement semantic search),
your main goals are: 1) Improve semantic search accuracy by 20%,
2) Deploy new embedding model, 3) Reduce indexing latency..."
>>> # Query about appointments
>>> result = await nc_semantic_search_answer(
... query="When is my next dentist appointment?",
... ctx=ctx,
... limit=10
... )
>>> len(result.sources) # Calendar events and related notes
3
"""
# 1. Retrieve relevant documents via existing semantic search
search_response = await nc_semantic_search(
query=query,
ctx=ctx,
limit=limit,
score_threshold=score_threshold,
)
# 2. Handle no results case - don't waste a sampling call
if not search_response.results:
logger.debug(f"No documents found for query: {query}")
return SamplingSearchResponse(
query=query,
generated_answer="No relevant documents found in your Nextcloud content for this query.",
sources=[],
total_found=0,
search_method="semantic_sampling",
success=True,
)
# 3. Construct context from retrieved documents
context_parts = []
for idx, result in enumerate(search_response.results, 1):
context_parts.append(
f"[Document {idx}]\n"
f"Type: {result.doc_type}\n"
f"Title: {result.title}\n"
f"Category: {result.category}\n"
f"Excerpt: {result.excerpt}\n"
f"Relevance Score: {result.score:.2f}\n"
)
context = "\n".join(context_parts)
# 4. Construct prompt - reuse user's query, add context and instructions
prompt = (
f"{query}\n\n"
f"Here are relevant documents from Nextcloud (notes, calendar events, deck cards, files, contacts):\n\n"
f"{context}\n\n"
f"Based on the documents above, please provide a comprehensive answer. "
f"Cite the document numbers when referencing specific information."
)
logger.debug(
f"Requesting sampling for query: {query} "
f"({len(search_response.results)} documents retrieved)"
)
# 5. Request LLM completion via MCP sampling
try:
sampling_result = await ctx.session.create_message(
messages=[
SamplingMessage(
role="user",
content=TextContent(type="text", text=prompt),
)
],
max_tokens=max_answer_tokens,
temperature=0.7,
model_preferences=ModelPreferences(
hints=[ModelHint(name="claude-3-5-sonnet")],
intelligencePriority=0.8,
speedPriority=0.5,
),
include_context="thisServer",
)
# 6. Extract answer from sampling response
if sampling_result.content.type == "text":
generated_answer = sampling_result.content.text
else:
# Handle non-text responses (shouldn't happen for text prompts)
generated_answer = f"Received non-text response of type: {sampling_result.content.type}"
logger.warning(
f"Unexpected content type from sampling: {sampling_result.content.type}"
)
logger.info(
f"Sampling successful: model={sampling_result.model}, "
f"stop_reason={sampling_result.stopReason}"
)
return SamplingSearchResponse(
query=query,
generated_answer=generated_answer,
sources=search_response.results,
total_found=search_response.total_found,
search_method="semantic_sampling",
model_used=sampling_result.model,
stop_reason=sampling_result.stopReason,
success=True,
)
except Exception as e:
# Fallback: Return documents without generated answer
logger.warning(
f"Sampling failed ({type(e).__name__}: {e}), "
f"returning search results only"
)
return SamplingSearchResponse(
query=query,
generated_answer=(
f"[Sampling unavailable: {str(e)}]\n\n"
f"Found {search_response.total_found} relevant documents. "
f"Please review the sources below."
),
sources=search_response.results,
total_found=search_response.total_found,
search_method="semantic_sampling_fallback",
success=True,
)
@mcp.tool()
@require_scopes("semantic:read")
async def nc_get_vector_sync_status(ctx: Context) -> VectorSyncStatusResponse:
"""Get the current vector sync status.
Returns information about the vector sync process, including:
- Number of documents indexed in the vector database
- Number of documents pending processing
- Current sync status (idle, syncing, or disabled)
This is useful for determining when vector indexing is complete
after creating or updating content across all indexed apps.
"""
import os
# Check if vector sync is enabled
vector_sync_enabled = (
os.getenv("VECTOR_SYNC_ENABLED", "false").lower() == "true"
)
if not vector_sync_enabled:
return VectorSyncStatusResponse(
indexed_count=0,
pending_count=0,
status="disabled",
enabled=False,
)
try:
# Get document queue from lifespan context
lifespan_ctx = ctx.request_context.lifespan_context
document_queue = getattr(lifespan_ctx, "document_queue", None)
if document_queue is None:
logger.debug("document_queue not available in lifespan context")
return VectorSyncStatusResponse(
indexed_count=0,
pending_count=0,
status="unknown",
enabled=True,
)
# Get pending count from queue
pending_count = document_queue.qsize()
# Get Qdrant client and query indexed count
indexed_count = 0
try:
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
settings = get_settings()
qdrant_client = await get_qdrant_client()
# Count documents in collection
count_result = await qdrant_client.count(
collection_name=settings.qdrant_collection
)
indexed_count = count_result.count
except Exception as e:
logger.warning(f"Failed to query Qdrant for indexed count: {e}")
# Continue with indexed_count = 0
# Determine status
status = "syncing" if pending_count > 0 else "idle"
return VectorSyncStatusResponse(
indexed_count=indexed_count,
pending_count=pending_count,
status=status,
enabled=True,
)
except Exception as e:
logger.error(f"Error getting vector sync status: {e}")
raise McpError(
ErrorData(
code=-1,
message=f"Failed to retrieve vector sync status: {str(e)}",
)
)
+13 -13
View File
@@ -1,6 +1,6 @@
"""Integration tests for MCP sampling with semantic search.
These tests validate the nc_notes_semantic_search_answer tool which combines:
These tests validate the nc_semantic_search_answer tool which combines:
1. Semantic search to retrieve relevant documents
2. MCP sampling to generate natural language answers
@@ -50,8 +50,8 @@ async def test_semantic_search_answer_successful_sampling(
Flow:
1. Create test note with searchable content
2. Wait for vector sync to complete using nc_notes_get_vector_sync_status
3. Call nc_notes_semantic_search_answer
2. Wait for vector sync to complete using nc_get_vector_sync_status
3. Call nc_semantic_search_answer
4. Mock ctx.session.create_message to return answer
5. Verify response contains generated answer and sources
"""
@@ -59,7 +59,7 @@ async def test_semantic_search_answer_successful_sampling(
import asyncio
initial_sync = await nc_mcp_client.call_tool(
"nc_notes_get_vector_sync_status", arguments={}
"nc_get_vector_sync_status", arguments={}
)
initial_indexed_count = initial_sync.structuredContent["indexed_count"]
print(f"Initial indexed count: {initial_indexed_count}")
@@ -88,7 +88,7 @@ Avoid blocking operations in async code.""",
while waited < max_wait:
sync_status = await nc_mcp_client.call_tool(
"nc_notes_get_vector_sync_status", arguments={}
"nc_get_vector_sync_status", arguments={}
)
status_data = sync_status.structuredContent
@@ -123,7 +123,7 @@ Avoid blocking operations in async code.""",
# In a real integration test with MCP Inspector, this would be actual sampling
call_result = await nc_mcp_client.call_tool(
"nc_notes_semantic_search_answer",
"nc_semantic_search_answer",
arguments={
"query": "How do I use async in Python?",
"limit": 5,
@@ -169,7 +169,7 @@ async def test_semantic_search_answer_no_results(nc_mcp_client):
3. Verify no sampling call was made (no sources to base answer on)
"""
call_result = await nc_mcp_client.call_tool(
"nc_notes_semantic_search_answer",
"nc_semantic_search_answer",
arguments={
"query": "quantum chromodynamics lattice QCD gluon propagator",
"limit": 5,
@@ -229,7 +229,7 @@ async def test_semantic_search_answer_with_limit(nc_mcp_client, temporary_note_f
while waited < max_wait:
sync_status = await nc_mcp_client.call_tool(
"nc_notes_get_vector_sync_status", arguments={}
"nc_get_vector_sync_status", arguments={}
)
status_data = sync_status.structuredContent
@@ -242,7 +242,7 @@ async def test_semantic_search_answer_with_limit(nc_mcp_client, temporary_note_f
assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds"
call_result = await nc_mcp_client.call_tool(
"nc_notes_semantic_search_answer",
"nc_semantic_search_answer",
arguments={
"query": "async programming in Python",
"limit": 2,
@@ -286,7 +286,7 @@ async def test_semantic_search_answer_score_threshold(
while waited < max_wait:
sync_status = await nc_mcp_client.call_tool(
"nc_notes_get_vector_sync_status", arguments={}
"nc_get_vector_sync_status", arguments={}
)
status_data = sync_status.structuredContent
@@ -300,7 +300,7 @@ async def test_semantic_search_answer_score_threshold(
# Query with exact match
call_result = await nc_mcp_client.call_tool(
"nc_notes_semantic_search_answer",
"nc_semantic_search_answer",
arguments={
"query": "widget manufacturing",
"limit": 5,
@@ -349,7 +349,7 @@ async def test_semantic_search_answer_max_tokens(nc_mcp_client, temporary_note_f
while waited < max_wait:
sync_status = await nc_mcp_client.call_tool(
"nc_notes_get_vector_sync_status", arguments={}
"nc_get_vector_sync_status", arguments={}
)
status_data = sync_status.structuredContent
@@ -362,7 +362,7 @@ async def test_semantic_search_answer_max_tokens(nc_mcp_client, temporary_note_f
assert waited < max_wait, f"Vector sync did not complete within {max_wait} seconds"
call_result = await nc_mcp_client.call_tool(
"nc_notes_semantic_search_answer",
"nc_semantic_search_answer",
arguments={
"query": "document content",
"limit": 5,
+6 -1
View File
@@ -6,8 +6,10 @@ from nextcloud_mcp_server.models.notes import (
CreateNoteResponse,
Note,
NoteSearchResult,
SamplingSearchResponse,
SearchNotesResponse,
)
from nextcloud_mcp_server.models.semantic import (
SamplingSearchResponse,
SemanticSearchResult,
)
@@ -131,6 +133,7 @@ def test_sampling_search_response_with_answer():
sources = [
SemanticSearchResult(
id=1,
doc_type="note",
title="Python Guide",
category="Development",
excerpt="Use async/await for asynchronous programming",
@@ -140,6 +143,7 @@ def test_sampling_search_response_with_answer():
),
SemanticSearchResult(
id=2,
doc_type="note",
title="Best Practices",
category="Development",
excerpt="Always use context managers with async operations",
@@ -189,6 +193,7 @@ def test_sampling_search_response_fallback():
sources = [
SemanticSearchResult(
id=1,
doc_type="note",
title="Note 1",
category="Work",
excerpt="Some content",