diff --git a/nextcloud_mcp_server/search/verification.py b/nextcloud_mcp_server/search/verification.py deleted file mode 100644 index 4f25482..0000000 --- a/nextcloud_mcp_server/search/verification.py +++ /dev/null @@ -1,133 +0,0 @@ -"""Access verification for search results. - -This module provides centralized verification of Nextcloud access permissions -for search results. Verification happens at the final output stage (MCP tool/viz endpoint) -rather than within individual search algorithms, preventing redundant API calls. - -Key benefits: -- Deduplication: Each document verified exactly once (even in hybrid mode) -- Parallel execution: All verifications run concurrently via anyio task groups -- Separation of concerns: Algorithms handle scoring, this module handles security -""" - -import logging -from dataclasses import replace -from typing import Protocol - -import anyio - -from nextcloud_mcp_server.search.algorithms import SearchResult - -logger = logging.getLogger(__name__) - - -class NextcloudClientProtocol(Protocol): - """Protocol for Nextcloud client with app-specific access.""" - - @property - def notes(self): - """Notes client for accessing notes API.""" - ... - - -async def verify_search_results( - results: list[SearchResult], - nextcloud_client: NextcloudClientProtocol, -) -> list[SearchResult]: - """ - Verify Nextcloud access for search results. - - Deduplicates by (doc_id, doc_type), verifies in parallel using anyio task groups, - and filters out inaccessible documents. Maintains original result ordering. - - Args: - results: Unverified search results from Qdrant - nextcloud_client: Nextcloud client for access checks - - Returns: - Verified and accessible results (same order as input) - - Example: - >>> unverified = await search_algo.search(query="test", limit=10) - >>> verified = await verify_search_results(unverified, client) - >>> # verified contains only documents user can access - """ - # Deduplicate by (doc_id, doc_type) while preserving order - # This is critical for hybrid search where same doc may appear in multiple algorithm results - seen = set() - unique_results = [] - for result in results: - key = (result.id, result.doc_type) - if key not in seen: - seen.add(key) - unique_results.append(result) - - if not unique_results: - return [] - - logger.debug( - f"Verifying access for {len(unique_results)} unique documents " - f"(from {len(results)} total results)" - ) - - # Verify all unique documents in parallel using anyio task group - # Use list to maintain order (index-based storage) - verified_results = [None] * len(unique_results) - - # Limit concurrent verifications to prevent connection pool exhaustion - # Without this, launching 90+ simultaneous HTTP requests overwhelms the - # connection pool, causing RequestError failures - max_concurrent = 20 - semaphore = anyio.Semaphore(max_concurrent) - - async def verify_one(index: int, result: SearchResult): - """ - Verify a single document and store result at index. - - Uses semaphore to limit concurrent requests and prevent connection pool exhaustion. - - Args: - index: Position in verified_results list - result: Search result to verify - """ - async with semaphore: # Limit concurrent verifications - try: - if result.doc_type == "note": - # Fetch note to verify access and get fresh metadata - note = await nextcloud_client.notes.get_note(result.id) - # Update metadata with fresh data from Nextcloud - updated_metadata = {**(result.metadata or {}), **note} - verified_results[index] = replace(result, metadata=updated_metadata) - # TODO: Add verification for other doc types (calendar, deck, file, etc.) - else: - # For now, assume other types are accessible - # In production, add proper verification for each type - logger.debug( - f"No verification implemented for doc_type={result.doc_type}, " - "assuming accessible" - ) - verified_results[index] = result - - except Exception as e: - # Document is inaccessible (403, 404, or other error) - # Log at debug level since this is expected for filtered results - logger.debug( - f"Document {result.doc_type}/{result.id} not accessible: {e}" - ) - verified_results[index] = None - - # Run all verifications in parallel using anyio task group - # Semaphore limits concurrency to prevent overwhelming connection pool - async with anyio.create_task_group() as tg: - for idx, result in enumerate(unique_results): - tg.start_soon(verify_one, idx, result) - - # Filter out None (inaccessible) and return verified results - accessible = [r for r in verified_results if r is not None] - - logger.debug( - f"Verification complete: {len(accessible)} accessible, " - f"{len(unique_results) - len(accessible)} filtered out" - ) - - return accessible diff --git a/nextcloud_mcp_server/server/semantic.py b/nextcloud_mcp_server/server/semantic.py index 51a72a0..2f8fde6 100644 --- a/nextcloud_mcp_server/server/semantic.py +++ b/nextcloud_mcp_server/server/semantic.py @@ -2,6 +2,7 @@ import logging +import anyio from httpx import RequestError from mcp.server.fastmcp import Context, FastMCP from mcp.shared.exceptions import McpError @@ -121,11 +122,18 @@ def configure_semantic_tools(mcp: FastMCP): # Sort combined results by score all_results.sort(key=lambda r: r.score, reverse=True) - # Verify access for all results (deduplicates and filters) - from nextcloud_mcp_server.search.verification import verify_search_results + # Deduplicate results (hybrid search may return same doc from dense + sparse) + # Qdrant already filters by user_id for multi-tenant isolation + # Sampling tool will verify access when fetching full content + seen = set() + unique_results = [] + for result in all_results: + key = (result.id, result.doc_type) + if key not in seen: + seen.add(key) + unique_results.append(result) - verified_results = await verify_search_results(all_results, client) - search_results = verified_results[:limit] # Final limit after verification + search_results = unique_results[:limit] # Final limit after deduplication # Convert SearchResult objects to SemanticSearchResult for response results = [] @@ -302,35 +310,55 @@ def configure_semantic_tools(mcp: FastMCP): success=True, ) - # 4. Fetch full content for notes to provide complete context to LLM - # Filter out inaccessible notes (deleted or permissions changed) + # 4. Fetch full content for notes in parallel (also verifies access) + # Use anyio task group for concurrent fetching with semaphore to prevent + # connection pool exhaustion client = await get_client(ctx) - accessible_results = [] - full_contents = [] # Full content for accessible notes + accessible_results = [None] * len(search_response.results) + full_contents = [None] * len(search_response.results) - for result in search_response.results: - if result.doc_type == "note": - try: - note = await client.notes.get_note(result.id) - # Note is accessible, store full content - accessible_results.append(result) - full_contents.append(note.get("content", "")) - logger.debug( - f"Fetched full content for note {result.id} " - f"(length: {len(full_contents[-1])} chars)" - ) - except Exception as e: - # Note might have been deleted or permissions changed - # Filter it out to avoid corrupting LLM with inaccessible data - logger.warning( - f"Failed to fetch full content for note {result.id}: {e}. " - f"Excluding from results." - ) - else: - # Non-note document types (future: calendar, deck, files) - # For now, keep them with excerpts - accessible_results.append(result) - full_contents.append(None) + # Limit concurrent requests to prevent connection pool exhaustion + max_concurrent = 20 + semaphore = anyio.Semaphore(max_concurrent) + + async def fetch_content(index: int, result: SemanticSearchResult): + """Fetch full content for a single document (parallel with semaphore).""" + async with semaphore: + if result.doc_type == "note": + try: + note = await client.notes.get_note(result.id) + # Note is accessible, store result and full content + content = note.get("content", "") + accessible_results[index] = result + full_contents[index] = content + logger.debug( + f"Fetched full content for note {result.id} " + f"(length: {len(content)} chars)" + ) + except Exception as e: + # Note might have been deleted or permissions changed + # Leave as None to filter out later + logger.debug( + f"Note {result.id} not accessible: {e}. " + f"Excluding from results." + ) + else: + # Non-note document types (future: calendar, deck, files) + # For now, keep them with excerpts + accessible_results[index] = result + # full_contents[index] remains None (will use excerpt) + + # Run all fetches in parallel using anyio task group + async with anyio.create_task_group() as tg: + for idx, result in enumerate(search_response.results): + tg.start_soon(fetch_content, idx, result) + + # Filter out None (inaccessible notes) while preserving order + final_pairs = [ + (r, c) for r, c in zip(accessible_results, full_contents) if r is not None + ] + accessible_results = [r for r, c in final_pairs] + full_contents = [c for r, c in final_pairs] # Check if we filtered out all results if not accessible_results: @@ -382,7 +410,6 @@ def configure_semantic_tools(mcp: FastMCP): ) # 6. Request LLM completion via MCP sampling with timeout - import anyio try: with anyio.fail_after(30):