diff --git a/nextcloud_mcp_server/auth/userinfo_routes.py b/nextcloud_mcp_server/auth/userinfo_routes.py index ca33e2b..113c524 100644 --- a/nextcloud_mcp_server/auth/userinfo_routes.py +++ b/nextcloud_mcp_server/auth/userinfo_routes.py @@ -725,6 +725,11 @@ async def user_info_html(request: Request) -> HTMLResponse: }}, renderPlot(coordinates, results) {{ + // Calculate score range for auto-scaling + const scores = results.map(r => r.score); + const minScore = Math.min(...scores); + const maxScore = Math.max(...scores); + const trace = {{ x: coordinates.map(c => c[0]), y: coordinates.map(c => c[1]), @@ -732,11 +737,18 @@ async def user_info_html(request: Request) -> HTMLResponse: type: 'scatter', text: results.map(r => `${{r.title}}
Score: ${{r.score.toFixed(3)}}`), marker: {{ - size: 8, - color: results.map(r => r.score), + // Multi-channel encoding: size + opacity + color for visual hierarchy + // Power scaling (score^2) amplifies visual differences dramatically + // score=0.0 → 6px, score=0.5 → 9.5px, score=1.0 → 20px + size: results.map(r => 6 + (Math.pow(r.score, 2) * 14)), + // Linear opacity scaling (0.2-1.0 range keeps all points visible) + opacity: results.map(r => 0.2 + (r.score * 0.8)), + // Color gradient shows score + color: scores, colorscale: 'Viridis', showscale: true, - colorbar: {{ title: 'Score' }}, + colorbar: {{ title: 'Relative Score' }}, + // Scores are normalized 0-1 within result set cmin: 0, cmax: 1 }} diff --git a/nextcloud_mcp_server/auth/viz_routes.py b/nextcloud_mcp_server/auth/viz_routes.py index 20eaf1c..ef5a163 100644 --- a/nextcloud_mcp_server/auth/viz_routes.py +++ b/nextcloud_mcp_server/auth/viz_routes.py @@ -11,6 +11,7 @@ All processing happens server-side following ADR-012: """ import logging +import time import numpy as np from starlette.authentication import requires @@ -381,56 +382,17 @@ async def vector_visualization_search(request: Request) -> JSONResponse: ) try: + # Start total request timer + request_start = time.perf_counter() # Get authenticated HTTP client from session # In BasicAuth mode: uses username/password from session # In OAuth mode: uses access token from session from nextcloud_mcp_server.auth.userinfo_routes import ( _get_authenticated_client_for_userinfo, ) - from nextcloud_mcp_server.client.notes import NotesClient - async with await _get_authenticated_client_for_userinfo(request) as http_client: - # Create NotesClient directly with authenticated HTTP client - notes_client = NotesClient(http_client, username) - - # Wrap in a minimal client object for search algorithms - # This conforms to NextcloudClientProtocol but only implements notes - class MinimalNextcloudClient: - def __init__(self, notes_client, username): - self._notes = notes_client - self.username = username - - @property - def notes(self): - return self._notes - - @property - def webdav(self): - return None - - @property - def calendar(self): - return None - - @property - def contacts(self): - return None - - @property - def deck(self): - return None - - @property - def cookbook(self): - return None - - @property - def tables(self): - return None - - nextcloud_client = MinimalNextcloudClient(notes_client, username) - - # Create search algorithm + async with await _get_authenticated_client_for_userinfo(request) as http_client: # noqa: F841 + # Create search algorithm (no client needed - verification removed) if algorithm == "semantic": search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold) elif algorithm == "keyword": @@ -451,6 +413,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse: # Execute search (supports cross-app when doc_types=None) # Get unverified results with buffer for filtering + search_start = time.perf_counter() all_results = [] if doc_types is None or len(doc_types) == 0: # Cross-app search - search all indexed types @@ -476,13 +439,28 @@ async def vector_visualization_search(request: Request) -> JSONResponse: # Sort by score before verification all_results.sort(key=lambda r: r.score, reverse=True) - # Verify access for all results (deduplicates and filters) - from nextcloud_mcp_server.search.verification import verify_search_results + # No verification needed for visualization - we only need Qdrant metadata + # (title, excerpt, doc_type) which is already in search results. + # Verification is only needed for sampling (LLM needs full content). + search_results = all_results[:limit] + search_duration = time.perf_counter() - search_start - verified_results = await verify_search_results( - all_results, nextcloud_client + # Normalize scores relative to this result set for better visualization + # (best result = 1.0, worst result = 0.0 within THIS result set) + # This makes visual encoding meaningful regardless of RRF normalization + if search_results: + scores = [r.score for r in search_results] + min_score, max_score = min(scores), max(scores) + score_range = max_score - min_score if max_score > min_score else 1.0 + + logger.info( + f"Normalizing scores for viz: original range [{min_score:.3f}, {max_score:.3f}] " + f"→ [0.0, 1.0]" ) - search_results = verified_results[:limit] + + # Rescale each result's score to 0-1 within this result set + for r in search_results: + r.score = (r.score - min_score) / score_range if not search_results: return JSONResponse( @@ -495,6 +473,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse: ) # Fetch vectors for matching results from Qdrant + vector_fetch_start = time.perf_counter() qdrant_client = await get_qdrant_client() doc_ids = [r.id for r in search_results] @@ -534,6 +513,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse: # Extract vectors vectors = np.array([p.vector for p in points if p.vector is not None]) + vector_fetch_duration = time.perf_counter() - vector_fetch_start if len(vectors) < 2: # Not enough points for PCA @@ -556,8 +536,10 @@ async def vector_visualization_search(request: Request) -> JSONResponse: ) # Apply PCA dimensionality reduction (768-dim → 2D) + pca_start = time.perf_counter() pca = PCA(n_components=2) coords_2d = pca.fit_transform(vectors) + pca_duration = time.perf_counter() - pca_start # After fit, these attributes are guaranteed to be set assert pca.explained_variance_ratio_ is not None @@ -590,6 +572,18 @@ async def vector_visualization_search(request: Request) -> JSONResponse: for r in search_results ] + # Calculate total request duration + total_duration = time.perf_counter() - request_start + + # Log comprehensive timing metrics + logger.info( + f"Viz search timing: total={total_duration * 1000:.1f}ms, " + f"search={search_duration * 1000:.1f}ms ({search_duration / total_duration * 100:.1f}%), " + f"vector_fetch={vector_fetch_duration * 1000:.1f}ms ({vector_fetch_duration / total_duration * 100:.1f}%), " + f"pca={pca_duration * 1000:.1f}ms ({pca_duration / total_duration * 100:.1f}%), " + f"results={len(search_results)}, vectors={len(vectors)}" + ) + return JSONResponse( { "success": True, @@ -599,6 +593,14 @@ async def vector_visualization_search(request: Request) -> JSONResponse: "pc1": float(pca.explained_variance_ratio_[0]), "pc2": float(pca.explained_variance_ratio_[1]), }, + "timing": { + "total_ms": round(total_duration * 1000, 2), + "search_ms": round(search_duration * 1000, 2), + "vector_fetch_ms": round(vector_fetch_duration * 1000, 2), + "pca_ms": round(pca_duration * 1000, 2), + "num_results": len(search_results), + "num_vectors": len(vectors), + }, } ) diff --git a/nextcloud_mcp_server/client/base.py b/nextcloud_mcp_server/client/base.py index 7ca2278..7e6a196 100644 --- a/nextcloud_mcp_server/client/base.py +++ b/nextcloud_mcp_server/client/base.py @@ -5,6 +5,7 @@ import time from abc import ABC from functools import wraps +import anyio from httpx import AsyncClient, HTTPStatusError, RequestError, codes from nextcloud_mcp_server.observability.metrics import ( @@ -47,7 +48,7 @@ def retry_on_429(func): # Record retry metric (extract app name from args if available) if len(args) > 0 and hasattr(args[0], "app_name"): record_nextcloud_api_retry(app=args[0].app_name, reason="429") - time.sleep(5) + await anyio.sleep(5) elif e.response.status_code == 404: # 404 errors are often expected (e.g., checking if attachments exist) # Log as debug instead of warning diff --git a/nextcloud_mcp_server/search/verification.py b/nextcloud_mcp_server/search/verification.py index ea64621..4f25482 100644 --- a/nextcloud_mcp_server/search/verification.py +++ b/nextcloud_mcp_server/search/verification.py @@ -74,39 +74,50 @@ async def verify_search_results( # Use list to maintain order (index-based storage) verified_results = [None] * len(unique_results) + # Limit concurrent verifications to prevent connection pool exhaustion + # Without this, launching 90+ simultaneous HTTP requests overwhelms the + # connection pool, causing RequestError failures + max_concurrent = 20 + semaphore = anyio.Semaphore(max_concurrent) + async def verify_one(index: int, result: SearchResult): """ Verify a single document and store result at index. + Uses semaphore to limit concurrent requests and prevent connection pool exhaustion. + Args: index: Position in verified_results list result: Search result to verify """ - try: - if result.doc_type == "note": - # Fetch note to verify access and get fresh metadata - note = await nextcloud_client.notes.get_note(result.id) - # Update metadata with fresh data from Nextcloud - updated_metadata = {**(result.metadata or {}), **note} - verified_results[index] = replace(result, metadata=updated_metadata) - # TODO: Add verification for other doc types (calendar, deck, file, etc.) - else: - # For now, assume other types are accessible - # In production, add proper verification for each type - logger.debug( - f"No verification implemented for doc_type={result.doc_type}, " - "assuming accessible" - ) - verified_results[index] = result + async with semaphore: # Limit concurrent verifications + try: + if result.doc_type == "note": + # Fetch note to verify access and get fresh metadata + note = await nextcloud_client.notes.get_note(result.id) + # Update metadata with fresh data from Nextcloud + updated_metadata = {**(result.metadata or {}), **note} + verified_results[index] = replace(result, metadata=updated_metadata) + # TODO: Add verification for other doc types (calendar, deck, file, etc.) + else: + # For now, assume other types are accessible + # In production, add proper verification for each type + logger.debug( + f"No verification implemented for doc_type={result.doc_type}, " + "assuming accessible" + ) + verified_results[index] = result - except Exception as e: - # Document is inaccessible (403, 404, or other error) - # Log at debug level since this is expected for filtered results - logger.debug(f"Document {result.doc_type}/{result.id} not accessible: {e}") - verified_results[index] = None + except Exception as e: + # Document is inaccessible (403, 404, or other error) + # Log at debug level since this is expected for filtered results + logger.debug( + f"Document {result.doc_type}/{result.id} not accessible: {e}" + ) + verified_results[index] = None # Run all verifications in parallel using anyio task group - # This provides structured concurrency with automatic cancellation on errors + # Semaphore limits concurrency to prevent overwhelming connection pool async with anyio.create_task_group() as tg: for idx, result in enumerate(unique_results): tg.start_soon(verify_one, idx, result) diff --git a/nextcloud_mcp_server/vector/scanner.py b/nextcloud_mcp_server/vector/scanner.py index 27f7d86..9d45a8f 100644 --- a/nextcloud_mcp_server/vector/scanner.py +++ b/nextcloud_mcp_server/vector/scanner.py @@ -182,73 +182,43 @@ async def scan_user_documents( f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer" ) - # Fetch all notes from Nextcloud - notes = [ - note - async for note in nc_client.notes.get_all_notes(prune_before=prune_before) - ] - logger.info(f"[SCAN-{scan_id}] Found {len(notes)} notes for {user_id}") + # Get indexed state from Qdrant first (for incremental sync) + indexed_docs = {} + if not initial_sync: + qdrant_client = await get_qdrant_client() + scroll_result = await qdrant_client.scroll( + collection_name=get_settings().get_collection_name(), + scroll_filter=Filter( + must=[ + FieldCondition(key="user_id", match=MatchValue(value=user_id)), + FieldCondition(key="doc_type", match=MatchValue(value="note")), + ] + ), + with_payload=["doc_id", "indexed_at"], + with_vectors=False, + limit=10000, + ) - # Record documents scanned - record_vector_sync_scan(len(notes)) + indexed_docs = { + point.payload["doc_id"]: point.payload["indexed_at"] + for point in scroll_result[0] + } - if initial_sync: - # Send everything on first sync - for note in notes: - modified_at = note.get("modified", 0) - await send_stream.send( - DocumentTask( - user_id=user_id, - doc_id=str(note["id"]), - doc_type="note", - operation="index", - modified_at=modified_at, - ) - ) - logger.info(f"Sent {len(notes)} documents for initial sync: {user_id}") - return + logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant") - # Get indexed state from Qdrant - qdrant_client = await get_qdrant_client() - scroll_result = await qdrant_client.scroll( - collection_name=get_settings().get_collection_name(), - scroll_filter=Filter( - must=[ - FieldCondition(key="user_id", match=MatchValue(value=user_id)), - FieldCondition(key="doc_type", match=MatchValue(value="note")), - ] - ), - with_payload=["doc_id", "indexed_at"], - with_vectors=False, - limit=10000, - ) - - indexed_docs = { - point.payload["doc_id"]: point.payload["indexed_at"] - for point in scroll_result[0] - } - - logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant") - - # Compare and queue changes + # Stream notes from Nextcloud and process immediately + note_count = 0 queued = 0 - nextcloud_doc_ids = {str(note["id"]) for note in notes} + nextcloud_doc_ids = set() - for note in notes: + async for note in nc_client.notes.get_all_notes(prune_before=prune_before): + note_count += 1 doc_id = str(note["id"]) - indexed_at = indexed_docs.get(doc_id) + nextcloud_doc_ids.add(doc_id) modified_at = note.get("modified", 0) - # If document reappeared, remove from potentially_deleted - doc_key = (user_id, doc_id) - if doc_key in _potentially_deleted: - logger.debug( - f"Document {doc_id} reappeared, removing from deletion grace period" - ) - del _potentially_deleted[doc_key] - - # Send if never indexed or modified since last index - if indexed_at is None or modified_at > indexed_at: + if initial_sync: + # Send everything on first sync await send_stream.send( DocumentTask( user_id=user_id, @@ -259,6 +229,38 @@ async def scan_user_documents( ) ) queued += 1 + else: + # Incremental sync: compare with indexed state + indexed_at = indexed_docs.get(doc_id) + + # If document reappeared, remove from potentially_deleted + doc_key = (user_id, doc_id) + if doc_key in _potentially_deleted: + logger.debug( + f"Document {doc_id} reappeared, removing from deletion grace period" + ) + del _potentially_deleted[doc_key] + + # Send if never indexed or modified since last index + if indexed_at is None or modified_at > indexed_at: + await send_stream.send( + DocumentTask( + user_id=user_id, + doc_id=doc_id, + doc_type="note", + operation="index", + modified_at=modified_at, + ) + ) + queued += 1 + + # Log and record metrics after streaming + logger.info(f"[SCAN-{scan_id}] Found {note_count} notes for {user_id}") + record_vector_sync_scan(note_count) + + if initial_sync: + logger.info(f"Sent {queued} documents for initial sync: {user_id}") + return # Check for deleted documents (in Qdrant but not in Nextcloud) # Use grace period: only delete after 2 consecutive scans confirm absence