Merge branch 'rag-evaluation'
This commit is contained in:
@@ -1,133 +0,0 @@
|
||||
"""Access verification for search results.
|
||||
|
||||
This module provides centralized verification of Nextcloud access permissions
|
||||
for search results. Verification happens at the final output stage (MCP tool/viz endpoint)
|
||||
rather than within individual search algorithms, preventing redundant API calls.
|
||||
|
||||
Key benefits:
|
||||
- Deduplication: Each document verified exactly once (even in hybrid mode)
|
||||
- Parallel execution: All verifications run concurrently via anyio task groups
|
||||
- Separation of concerns: Algorithms handle scoring, this module handles security
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import replace
|
||||
from typing import Protocol
|
||||
|
||||
import anyio
|
||||
|
||||
from nextcloud_mcp_server.search.algorithms import SearchResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NextcloudClientProtocol(Protocol):
|
||||
"""Protocol for Nextcloud client with app-specific access."""
|
||||
|
||||
@property
|
||||
def notes(self):
|
||||
"""Notes client for accessing notes API."""
|
||||
...
|
||||
|
||||
|
||||
async def verify_search_results(
|
||||
results: list[SearchResult],
|
||||
nextcloud_client: NextcloudClientProtocol,
|
||||
) -> list[SearchResult]:
|
||||
"""
|
||||
Verify Nextcloud access for search results.
|
||||
|
||||
Deduplicates by (doc_id, doc_type), verifies in parallel using anyio task groups,
|
||||
and filters out inaccessible documents. Maintains original result ordering.
|
||||
|
||||
Args:
|
||||
results: Unverified search results from Qdrant
|
||||
nextcloud_client: Nextcloud client for access checks
|
||||
|
||||
Returns:
|
||||
Verified and accessible results (same order as input)
|
||||
|
||||
Example:
|
||||
>>> unverified = await search_algo.search(query="test", limit=10)
|
||||
>>> verified = await verify_search_results(unverified, client)
|
||||
>>> # verified contains only documents user can access
|
||||
"""
|
||||
# Deduplicate by (doc_id, doc_type) while preserving order
|
||||
# This is critical for hybrid search where same doc may appear in multiple algorithm results
|
||||
seen = set()
|
||||
unique_results = []
|
||||
for result in results:
|
||||
key = (result.id, result.doc_type)
|
||||
if key not in seen:
|
||||
seen.add(key)
|
||||
unique_results.append(result)
|
||||
|
||||
if not unique_results:
|
||||
return []
|
||||
|
||||
logger.debug(
|
||||
f"Verifying access for {len(unique_results)} unique documents "
|
||||
f"(from {len(results)} total results)"
|
||||
)
|
||||
|
||||
# Verify all unique documents in parallel using anyio task group
|
||||
# Use list to maintain order (index-based storage)
|
||||
verified_results = [None] * len(unique_results)
|
||||
|
||||
# Limit concurrent verifications to prevent connection pool exhaustion
|
||||
# Without this, launching 90+ simultaneous HTTP requests overwhelms the
|
||||
# connection pool, causing RequestError failures
|
||||
max_concurrent = 20
|
||||
semaphore = anyio.Semaphore(max_concurrent)
|
||||
|
||||
async def verify_one(index: int, result: SearchResult):
|
||||
"""
|
||||
Verify a single document and store result at index.
|
||||
|
||||
Uses semaphore to limit concurrent requests and prevent connection pool exhaustion.
|
||||
|
||||
Args:
|
||||
index: Position in verified_results list
|
||||
result: Search result to verify
|
||||
"""
|
||||
async with semaphore: # Limit concurrent verifications
|
||||
try:
|
||||
if result.doc_type == "note":
|
||||
# Fetch note to verify access and get fresh metadata
|
||||
note = await nextcloud_client.notes.get_note(result.id)
|
||||
# Update metadata with fresh data from Nextcloud
|
||||
updated_metadata = {**(result.metadata or {}), **note}
|
||||
verified_results[index] = replace(result, metadata=updated_metadata)
|
||||
# TODO: Add verification for other doc types (calendar, deck, file, etc.)
|
||||
else:
|
||||
# For now, assume other types are accessible
|
||||
# In production, add proper verification for each type
|
||||
logger.debug(
|
||||
f"No verification implemented for doc_type={result.doc_type}, "
|
||||
"assuming accessible"
|
||||
)
|
||||
verified_results[index] = result
|
||||
|
||||
except Exception as e:
|
||||
# Document is inaccessible (403, 404, or other error)
|
||||
# Log at debug level since this is expected for filtered results
|
||||
logger.debug(
|
||||
f"Document {result.doc_type}/{result.id} not accessible: {e}"
|
||||
)
|
||||
verified_results[index] = None
|
||||
|
||||
# Run all verifications in parallel using anyio task group
|
||||
# Semaphore limits concurrency to prevent overwhelming connection pool
|
||||
async with anyio.create_task_group() as tg:
|
||||
for idx, result in enumerate(unique_results):
|
||||
tg.start_soon(verify_one, idx, result)
|
||||
|
||||
# Filter out None (inaccessible) and return verified results
|
||||
accessible = [r for r in verified_results if r is not None]
|
||||
|
||||
logger.debug(
|
||||
f"Verification complete: {len(accessible)} accessible, "
|
||||
f"{len(unique_results) - len(accessible)} filtered out"
|
||||
)
|
||||
|
||||
return accessible
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import logging
|
||||
|
||||
import anyio
|
||||
from httpx import RequestError
|
||||
from mcp.server.fastmcp import Context, FastMCP
|
||||
from mcp.shared.exceptions import McpError
|
||||
@@ -121,11 +122,18 @@ def configure_semantic_tools(mcp: FastMCP):
|
||||
# Sort combined results by score
|
||||
all_results.sort(key=lambda r: r.score, reverse=True)
|
||||
|
||||
# Verify access for all results (deduplicates and filters)
|
||||
from nextcloud_mcp_server.search.verification import verify_search_results
|
||||
# Deduplicate results (hybrid search may return same doc from dense + sparse)
|
||||
# Qdrant already filters by user_id for multi-tenant isolation
|
||||
# Sampling tool will verify access when fetching full content
|
||||
seen = set()
|
||||
unique_results = []
|
||||
for result in all_results:
|
||||
key = (result.id, result.doc_type)
|
||||
if key not in seen:
|
||||
seen.add(key)
|
||||
unique_results.append(result)
|
||||
|
||||
verified_results = await verify_search_results(all_results, client)
|
||||
search_results = verified_results[:limit] # Final limit after verification
|
||||
search_results = unique_results[:limit] # Final limit after deduplication
|
||||
|
||||
# Convert SearchResult objects to SemanticSearchResult for response
|
||||
results = []
|
||||
@@ -302,35 +310,55 @@ def configure_semantic_tools(mcp: FastMCP):
|
||||
success=True,
|
||||
)
|
||||
|
||||
# 4. Fetch full content for notes to provide complete context to LLM
|
||||
# Filter out inaccessible notes (deleted or permissions changed)
|
||||
# 4. Fetch full content for notes in parallel (also verifies access)
|
||||
# Use anyio task group for concurrent fetching with semaphore to prevent
|
||||
# connection pool exhaustion
|
||||
client = await get_client(ctx)
|
||||
accessible_results = []
|
||||
full_contents = [] # Full content for accessible notes
|
||||
accessible_results = [None] * len(search_response.results)
|
||||
full_contents = [None] * len(search_response.results)
|
||||
|
||||
for result in search_response.results:
|
||||
if result.doc_type == "note":
|
||||
try:
|
||||
note = await client.notes.get_note(result.id)
|
||||
# Note is accessible, store full content
|
||||
accessible_results.append(result)
|
||||
full_contents.append(note.get("content", ""))
|
||||
logger.debug(
|
||||
f"Fetched full content for note {result.id} "
|
||||
f"(length: {len(full_contents[-1])} chars)"
|
||||
)
|
||||
except Exception as e:
|
||||
# Note might have been deleted or permissions changed
|
||||
# Filter it out to avoid corrupting LLM with inaccessible data
|
||||
logger.warning(
|
||||
f"Failed to fetch full content for note {result.id}: {e}. "
|
||||
f"Excluding from results."
|
||||
)
|
||||
else:
|
||||
# Non-note document types (future: calendar, deck, files)
|
||||
# For now, keep them with excerpts
|
||||
accessible_results.append(result)
|
||||
full_contents.append(None)
|
||||
# Limit concurrent requests to prevent connection pool exhaustion
|
||||
max_concurrent = 20
|
||||
semaphore = anyio.Semaphore(max_concurrent)
|
||||
|
||||
async def fetch_content(index: int, result: SemanticSearchResult):
|
||||
"""Fetch full content for a single document (parallel with semaphore)."""
|
||||
async with semaphore:
|
||||
if result.doc_type == "note":
|
||||
try:
|
||||
note = await client.notes.get_note(result.id)
|
||||
# Note is accessible, store result and full content
|
||||
content = note.get("content", "")
|
||||
accessible_results[index] = result
|
||||
full_contents[index] = content
|
||||
logger.debug(
|
||||
f"Fetched full content for note {result.id} "
|
||||
f"(length: {len(content)} chars)"
|
||||
)
|
||||
except Exception as e:
|
||||
# Note might have been deleted or permissions changed
|
||||
# Leave as None to filter out later
|
||||
logger.debug(
|
||||
f"Note {result.id} not accessible: {e}. "
|
||||
f"Excluding from results."
|
||||
)
|
||||
else:
|
||||
# Non-note document types (future: calendar, deck, files)
|
||||
# For now, keep them with excerpts
|
||||
accessible_results[index] = result
|
||||
# full_contents[index] remains None (will use excerpt)
|
||||
|
||||
# Run all fetches in parallel using anyio task group
|
||||
async with anyio.create_task_group() as tg:
|
||||
for idx, result in enumerate(search_response.results):
|
||||
tg.start_soon(fetch_content, idx, result)
|
||||
|
||||
# Filter out None (inaccessible notes) while preserving order
|
||||
final_pairs = [
|
||||
(r, c) for r, c in zip(accessible_results, full_contents) if r is not None
|
||||
]
|
||||
accessible_results = [r for r, c in final_pairs]
|
||||
full_contents = [c for r, c in final_pairs]
|
||||
|
||||
# Check if we filtered out all results
|
||||
if not accessible_results:
|
||||
@@ -382,7 +410,6 @@ def configure_semantic_tools(mcp: FastMCP):
|
||||
)
|
||||
|
||||
# 6. Request LLM completion via MCP sampling with timeout
|
||||
import anyio
|
||||
|
||||
try:
|
||||
with anyio.fail_after(30):
|
||||
|
||||
Reference in New Issue
Block a user