"""Vector visualization routes for testing search algorithms.

Provides a web UI for users to test different search algorithms on their own
indexed documents and visualize results in 3D space using PCA.

All processing happens server-side following ADR-012:
- Search execution via shared search/algorithms.py
- Query embedding generation
- PCA dimensionality reduction (768-dim → 3D)
- Only 3D coordinates + metadata sent to client
- Bandwidth-efficient (3 floats per doc vs 768)
"""

import logging
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple

import numpy as np
from jinja2 import Environment, FileSystemLoader, select_autoescape
from starlette.authentication import requires
from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse

from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.search import (
    BM25HybridSearchAlgorithm,
    SemanticSearchAlgorithm,
)
from nextcloud_mcp_server.vector.pca import PCA
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client

logger = logging.getLogger(__name__)

# Setup Jinja2 environment for templates.
# Autoescaping is enabled because the user's display name is interpolated
# into the rendered HTML page.
_template_dir = Path(__file__).parent / "templates"
_jinja_env = Environment(
    loader=FileSystemLoader(_template_dir),
    autoescape=select_autoescape(["html", "xml"]),
)

# Body served when vector sync is disabled.
# NOTE(review): this text carries no HTML markup even though it is returned
# through HTMLResponse; the tags may have been lost upstream - confirm.
_SYNC_DISABLED_PAGE = (
    "\n\nVector Visualization\n\n"
    "Vector sync is not enabled. "
    "Set VECTOR_SYNC_ENABLED=true to use this feature.\n"
)

# Norms below this threshold are treated as zero during L2 normalization.
_ZERO_NORM_EPSILON = 1e-10


def _parse_doc_types(raw: str) -> Optional[List[str]]:
    """Split a comma-separated doc_types parameter into a clean list.

    Blank entries (e.g. from a trailing comma) are dropped.

    Returns:
        The list of requested document types, or None when no type was
        requested (meaning: search every indexed type).
    """
    doc_types = [part.strip() for part in raw.split(",") if part.strip()]
    return doc_types or None


def _normalize_scores(scores: Sequence[float]) -> List[float]:
    """Rescale scores to [0, 1] within this result set (best = 1.0).

    When every score is identical (including the single-result case) all
    results are equally "best", so each one is mapped to 1.0.
    """
    if not scores:
        return []
    lowest, highest = min(scores), max(scores)
    spread = highest - lowest
    if not spread > 0:
        return [1.0] * len(scores)
    return [(score - lowest) / spread for score in scores]


def _extract_dense_vector(point: Any) -> Any:
    """Return the dense vector of a Qdrant point, or None if it has none.

    Handles both named vectors (a dict holding a "dense" entry) and a single
    unnamed vector stored directly on the point.
    """
    if point.vector is None:
        return None
    if isinstance(point.vector, dict):
        return point.vector.get("dense")
    return point.vector


def _group_vectors_by_doc(points: Sequence[Any]) -> Dict[str, List[Any]]:
    """Group chunk vectors by the string form of their payload doc_id.

    Keys are normalized to str so that lookups succeed whether the payload
    and the search results carry the id as an int or as a string. Points
    without a payload, without a doc_id, or without a dense vector are
    skipped.
    """
    grouped: Dict[str, List[Any]] = defaultdict(list)
    for point in points:
        payload = point.payload
        if not payload or payload.get("doc_id") is None:
            continue
        vector = _extract_dense_vector(point)
        if vector is not None:
            grouped[str(payload["doc_id"])].append(vector)
    return dict(grouped)


def _build_document_matrix(
    search_results: Sequence[Any],
    doc_chunks: Dict[str, List[Any]],
    embedding_dim: int,
) -> np.ndarray:
    """Average chunk vectors per document, in search-result order.

    Row i corresponds to search_results[i]. A result with no fetched chunk
    vectors gets a zero vector of length embedding_dim so the 1:1 mapping
    between results and rows is preserved.
    """
    rows = []
    for result in search_results:
        chunk_vectors = doc_chunks.get(str(result.id))
        if chunk_vectors:
            rows.append(np.mean(np.array(chunk_vectors), axis=0))
            logger.debug(f"Doc {result.id}: averaged {len(chunk_vectors)} chunks")
        else:
            logger.warning(f"Doc {result.id} not found in fetched vectors")
            rows.append(np.zeros(embedding_dim))
    return np.array(rows)


def _l2_normalize_rows(vectors: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Scale each row to unit length.

    Rows whose norm is below _ZERO_NORM_EPSILON are divided by the epsilon
    instead, which avoids division by zero.

    Returns:
        (normalized_vectors, norms) where norms has shape (n, 1) and holds
        the divisor actually used for each row.
    """
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    zero_norm_mask = norms[:, 0] < _ZERO_NORM_EPSILON
    if zero_norm_mask.any():
        zero_indices = np.where(zero_norm_mask)[0]
        logger.warning(
            f"Found {zero_norm_mask.sum()} zero-norm vectors at indices {zero_indices.tolist()}. "
            "Replacing with small epsilon to avoid division by zero."
        )
        norms[zero_norm_mask] = _ZERO_NORM_EPSILON
    return vectors / norms, norms


def _serialize_results(
    search_results: Sequence[Any],
    normalized_scores: Sequence[float],
    original_scores: Sequence[float],
) -> List[Dict[str, Any]]:
    """Build the JSON-ready result list sent to the client.

    "score" is the per-result-set normalized value used for visual encoding;
    "original_score" is the raw score reported by the search algorithm.
    """
    return [
        {
            "id": r.id,
            "doc_type": r.doc_type,
            "title": r.title,
            "excerpt": r.excerpt,
            "score": normalized,
            "original_score": original,
            "chunk_start_offset": r.chunk_start_offset,
            "chunk_end_offset": r.chunk_end_offset,
        }
        for r, normalized, original in zip(
            search_results, normalized_scores, original_scores
        )
    ]


async def _run_search(
    search_algo: Any,
    *,
    query: str,
    username: str,
    limit: int,
    doc_types: Optional[List[str]],
    score_threshold: float,
) -> List[Any]:
    """Run the search for every requested doc type and keep the top `limit`.

    With doc_types=None a single cross-app search (doc_type=None) is issued.
    Each search asks for limit * 2 hits; the combined hits are sorted by
    descending score and truncated to `limit`.
    """
    hits: List[Any] = []
    for doc_type in doc_types or [None]:
        hits.extend(
            await search_algo.search(
                query=query,
                user_id=username,
                limit=limit * 2,
                doc_type=doc_type,
                score_threshold=score_threshold,
            )
        )
    hits.sort(key=lambda r: r.score, reverse=True)
    return hits[:limit]


@requires("authenticated", redirect="oauth_login")
async def vector_visualization_html(request: Request) -> HTMLResponse:
    """Vector visualization page with search controls and interactive plot.

    Provides UI for testing search algorithms with real-time visualization.
    Requires vector sync to be enabled; otherwise a short notice is returned
    instead of the page.

    Args:
        request: Starlette request object

    Returns:
        HTML page with search interface
    """
    settings = get_settings()
    if not settings.vector_sync_enabled:
        return HTMLResponse(_SYNC_DISABLED_PAGE)

    # Get user info from auth context
    username = getattr(request.user, "display_name", "unknown")

    # Load and render template
    template = _jinja_env.get_template("vector_viz.html")
    return HTMLResponse(content=template.render(username=username))


@requires("authenticated", redirect="oauth_login")
async def vector_visualization_search(request: Request) -> JSONResponse:
    """Execute server-side search and return 3D coordinates + results.

    All processing happens server-side:
    1. Execute search via shared algorithm module
    2. Generate query embedding
    3. Fetch matching vectors from Qdrant
    4. Apply PCA reduction (768-dim → 3D) to query + documents
    5. Return coordinates + metadata only

    Query parameters:
        query: Search text (default: "")
        algorithm: "semantic" or "bm25_hybrid" (default: "bm25_hybrid")
        limit: Maximum number of results, must be >= 1 (default: 50)
        score_threshold: Minimum score passed to the algorithm (default: 0.0)
        fusion: Fusion method for bm25_hybrid (default: "rrf")
        doc_types: Comma-separated document types; empty = all types

    Args:
        request: Starlette request with query parameters

    Returns:
        JSON response with coordinates_3d and results (including query point).
        Status 400 for disabled vector sync, malformed parameters or an
        unknown algorithm; 401 when no username is available; 500 on any
        unexpected failure.
    """
    settings = get_settings()
    if not settings.vector_sync_enabled:
        return JSONResponse(
            {"success": False, "error": "Vector sync not enabled"},
            status_code=400,
        )

    # Get user info from auth context
    username = getattr(request.user, "display_name", None)
    if not username:
        return JSONResponse(
            {"success": False, "error": "User not authenticated"},
            status_code=401,
        )

    # Parse query parameters; malformed numbers are a client error (400),
    # not a server error.
    params = request.query_params
    query = params.get("query", "")
    algorithm = params.get("algorithm", "bm25_hybrid")
    fusion = params.get("fusion", "rrf")  # Default to RRF
    doc_types = _parse_doc_types(params.get("doc_types", ""))
    try:
        limit = int(params.get("limit", "50"))
        score_threshold = float(params.get("score_threshold", "0.0"))
        if limit < 1:
            raise ValueError("limit must be a positive integer")
    except ValueError as e:
        logger.warning(f"Invalid parameter format: {e}")
        return JSONResponse(
            {"success": False, "error": f"Invalid parameter format: {e}"},
            status_code=400,
        )

    logger.info(
        f"Viz search: user={username}, query='{query}', "
        f"algorithm={algorithm}, fusion={fusion}, limit={limit}, doc_types={doc_types}"
    )

    try:
        # Start total request timer
        request_start = time.perf_counter()

        # Imported lazily, as in the rest of this module.
        from nextcloud_mcp_server.auth.userinfo_routes import (
            _get_authenticated_client_for_userinfo,
        )

        # The client itself is not used by the search (verification was
        # removed); it is still acquired and released around the request.
        # NOTE(review): presumably this doubles as a session check - confirm
        # before removing.
        async with await _get_authenticated_client_for_userinfo(request):
            if algorithm == "semantic":
                search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
            elif algorithm == "bm25_hybrid":
                search_algo = BM25HybridSearchAlgorithm(
                    score_threshold=score_threshold, fusion=fusion
                )
            else:
                return JSONResponse(
                    {"success": False, "error": f"Unknown algorithm: {algorithm}"},
                    status_code=400,
                )

            # Execute search (cross-app when doc_types is None). No
            # verification is needed for visualization: the metadata used
            # here (title, excerpt, doc_type) is already in the results.
            search_start = time.perf_counter()
            search_results = await _run_search(
                search_algo,
                query=query,
                username=username,
                limit=limit,
                doc_types=doc_types,
                score_threshold=score_threshold,
            )
            search_duration = time.perf_counter() - search_start

            if not search_results:
                return JSONResponse(
                    {
                        "success": True,
                        "results": [],
                        "coordinates_3d": [],
                        "query_coords": None,
                        "message": "No results found",
                    }
                )

            # Keep raw scores and a 0-1 rescaled copy for visual encoding.
            # Scores live in parallel lists so the result objects returned by
            # the search module are never mutated.
            original_scores = [float(r.score) for r in search_results]
            normalized_scores = _normalize_scores(original_scores)
            logger.info(
                f"Normalizing scores for viz: original range [{min(original_scores):.3f}, {max(original_scores):.3f}] "
                f"→ [0.0, 1.0]"
            )

            # Fetch vectors for matching results from Qdrant
            vector_fetch_start = time.perf_counter()
            qdrant_client = await get_qdrant_client()
            doc_ids = [r.id for r in search_results]

            from qdrant_client.models import FieldCondition, Filter, MatchAny

            # NOTE(review): the scroll is capped at two points per requested
            # document and is not paginated, so documents with many chunks
            # can crowd out others; those fall back to a zero vector below.
            points_response = await qdrant_client.scroll(
                collection_name=settings.get_collection_name(),
                scroll_filter=Filter(
                    must=[
                        FieldCondition(
                            key="doc_id",
                            match=MatchAny(any=[str(doc_id) for doc_id in doc_ids]),
                        ),
                        FieldCondition(
                            key="user_id",
                            match={"value": username},
                        ),
                    ]
                ),
                limit=len(doc_ids) * 2,  # Account for multiple chunks per doc
                with_vectors=["dense"],  # Only fetch dense vectors for visualization
                with_payload=["doc_id"],  # Need doc_id to map vectors to results
            )
            points = points_response[0]

            if not points:
                # Same response shape as the other "nothing to plot" case.
                return JSONResponse(
                    {
                        "success": True,
                        "results": [],
                        "coordinates_3d": [],
                        "query_coords": None,
                        "message": "No vectors found for results",
                    }
                )

            doc_chunks = _group_vectors_by_doc(points)
            vector_fetch_duration = time.perf_counter() - vector_fetch_start

            response_results = _serialize_results(
                search_results, normalized_scores, original_scores
            )

            if len(doc_chunks) < 2:
                # Not enough documents for PCA: place everything at the origin.
                return JSONResponse(
                    {
                        "success": True,
                        "results": response_results,
                        "coordinates_3d": [[0, 0, 0]] * len(search_results),
                        "query_coords": [0, 0, 0],
                        "message": "Not enough documents for PCA",
                    }
                )

            # Every group holds at least one vector, so the first one gives
            # the embedding dimension.
            embedding_dim = len(next(iter(doc_chunks.values()))[0])
            logger.info(f"Detected embedding dimension: {embedding_dim}")

            # Document-level embeddings, one row per search result.
            doc_vectors = _build_document_matrix(
                search_results, doc_chunks, embedding_dim
            )

            # Generate query embedding for visualization
            query_embed_start = time.perf_counter()
            from nextcloud_mcp_server.embedding.service import get_embedding_service

            embedding_service = get_embedding_service()
            query_embedding = await embedding_service.embed(query)
            query_embed_duration = time.perf_counter() - query_embed_start
            logger.info(
                f"Generated query embedding (dimension={len(query_embedding)})"
            )

            # Query is appended as the LAST row so it shares the PCA space.
            all_vectors = np.vstack([doc_vectors, np.array([query_embedding])])

            # Normalize to unit length: Qdrant ranks by cosine distance
            # (direction only) while PCA works on Euclidean geometry; on unit
            # vectors the two agree.
            all_vectors_normalized, norms = _l2_normalize_rows(all_vectors)
            logger.info(
                f"Normalized vectors: query_norm={norms[-1][0]:.3f}, "
                f"doc_norm_range=[{norms[:-1].min():.3f}, {norms[:-1].max():.3f}]"
            )

            # Apply PCA dimensionality reduction (768-dim → 3D)
            pca_start = time.perf_counter()
            pca = PCA(n_components=3)
            coords_3d = pca.fit_transform(all_vectors_normalized)
            pca_duration = time.perf_counter() - pca_start

            if pca.explained_variance_ratio_ is None:
                raise RuntimeError("PCA did not report explained variance after fit")

            # Non-finite values cannot be JSON-serialized; zero them out.
            bad_mask = ~np.isfinite(coords_3d)
            if bad_mask.any():
                bad_rows = np.where(bad_mask.any(axis=1))[0]
                logger.error(
                    f"Found non-finite values in PCA output at {len(bad_rows)} points: {bad_rows.tolist()[:10]}. "
                    "Replacing them with 0.0 to prevent JSON serialization error."
                )
                coords_3d = np.nan_to_num(coords_3d, nan=0.0, posinf=0.0, neginf=0.0)

            variance_ratio = np.nan_to_num(
                np.asarray(pca.explained_variance_ratio_, dtype=float),
                nan=0.0,
                posinf=0.0,
                neginf=0.0,
            )

            # Split query coords (last row) from document coords.
            # Round to 2 decimal places for cleaner display.
            query_coords_3d = [round(float(x), 2) for x in coords_3d[-1]]
            doc_coords_3d = coords_3d[:-1]

            total_chunks = sum(len(chunks) for chunks in doc_chunks.values())
            avg_chunks_per_doc = (
                total_chunks / len(doc_vectors) if doc_vectors.size > 0 else 0
            )

            logger.info(
                f"PCA explained variance: PC1={variance_ratio[0]:.3f}, "
                f"PC2={variance_ratio[1]:.3f}, "
                f"PC3={variance_ratio[2]:.3f}"
            )
            logger.info(
                f"Embedding stats: documents={len(doc_vectors)}, "
                f"total_chunks={total_chunks}, avg_chunks_per_doc={avg_chunks_per_doc:.1f}, "
                f"query_dim={len(query_embedding)}, doc_vector_dim={doc_vectors.shape[1] if doc_vectors.size > 0 else 0}"
            )

            # Coordinates already match search_results order (1:1 mapping)
            result_coords = [
                [round(float(x), 2) for x in coord] for coord in doc_coords_3d
            ]

            # Calculate total request duration
            total_duration = time.perf_counter() - request_start

            # Log comprehensive timing metrics
            logger.info(
                f"Viz search timing: total={total_duration * 1000:.1f}ms, "
                f"search={search_duration * 1000:.1f}ms ({search_duration / total_duration * 100:.1f}%), "
                f"vector_fetch={vector_fetch_duration * 1000:.1f}ms ({vector_fetch_duration / total_duration * 100:.1f}%), "
                f"query_embed={query_embed_duration * 1000:.1f}ms ({query_embed_duration / total_duration * 100:.1f}%), "
                f"pca={pca_duration * 1000:.1f}ms ({pca_duration / total_duration * 100:.1f}%), "
                f"results={len(search_results)}, doc_vectors={len(doc_vectors)}"
            )

            return JSONResponse(
                {
                    "success": True,
                    "results": response_results,
                    "coordinates_3d": result_coords,
                    "query_coords": query_coords_3d,
                    "pca_variance": {
                        "pc1": float(variance_ratio[0]),
                        "pc2": float(variance_ratio[1]),
                        "pc3": float(variance_ratio[2]),
                    },
                    "timing": {
                        "total_ms": round(total_duration * 1000, 2),
                        "search_ms": round(search_duration * 1000, 2),
                        "vector_fetch_ms": round(vector_fetch_duration * 1000, 2),
                        "query_embed_ms": round(query_embed_duration * 1000, 2),
                        "pca_ms": round(pca_duration * 1000, 2),
                        "num_results": len(search_results),
                        "num_doc_vectors": len(doc_vectors),
                    },
                }
            )

    except Exception as e:
        # Top-level boundary: log with traceback and report a 500.
        logger.exception(f"Viz search error: {e}")
        return JSONResponse(
            {"success": False, "error": str(e)},
            status_code=500,
        )


@requires("authenticated", redirect="oauth_login")
async def chunk_context_endpoint(request: Request) -> JSONResponse:
    """Fetch chunk text with surrounding context for visualization.

    This endpoint retrieves the matched chunk along with surrounding text
    to provide context for the search result. Used by the viz pane to
    display chunks inline.

    Query parameters:
        doc_type: Document type (e.g., "note"); only "note" is supported
        doc_id: Document ID
        start: Chunk start offset (character position)
        end: Chunk end offset (character position)
        context: Characters of context before/after (default: 500,
            must not be negative)

    Returns:
        JSON with chunk_text, before_context, after_context, and flags.
        Status 400 for missing/malformed parameters, an unsupported doc_type
        or offsets outside the document; 500 on any unexpected failure.
    """
    params = request.query_params
    doc_type = params.get("doc_type")
    doc_id = params.get("doc_id")
    start_str = params.get("start")
    end_str = params.get("end")

    # Only parameter parsing is guarded by the ValueError handler, so that
    # ValueErrors raised further down (e.g. while decoding the note) are not
    # misreported as a client-side parameter problem.
    try:
        context_chars = int(params.get("context", "500"))
        if context_chars < 0:
            raise ValueError("context must be non-negative")

        # Validate required parameters
        if not all([doc_type, doc_id, start_str, end_str]):
            return JSONResponse(
                {
                    "success": False,
                    "error": "Missing required parameters: doc_type, doc_id, start, end",
                },
                status_code=400,
            )

        start = int(start_str)
        end = int(end_str)

        # Currently only support notes
        if doc_type != "note":
            return JSONResponse(
                {"success": False, "error": f"Unsupported doc_type: {doc_type}"},
                status_code=400,
            )

        # Parsed before any network call so a bad id fails fast.
        note_id = int(doc_id)
    except ValueError as e:
        logger.warning(f"Invalid parameter format: {e}")
        return JSONResponse(
            {"success": False, "error": f"Invalid parameter format: {e}"},
            status_code=400,
        )

    try:
        # Imported lazily, as in the rest of this module.
        from nextcloud_mcp_server.auth.userinfo_routes import (
            _get_authenticated_client_for_userinfo,
        )
        from nextcloud_mcp_server.client.notes import NotesClient

        # Get username from request auth
        username = getattr(request.user, "display_name", "unknown")

        # The authenticated client is released as soon as the note is
        # fetched, mirroring how the search endpoint manages it.
        async with await _get_authenticated_client_for_userinfo(
            request
        ) as http_client:
            notes_client = NotesClient(http_client, username)
            note = await notes_client.get_note(note_id)

        # Offsets are interpreted against "title, blank line, content".
        full_content = f"{note['title']}\n\n{note['content']}"

        # Validate offsets
        if start < 0 or end > len(full_content) or start >= end:
            return JSONResponse(
                {
                    "success": False,
                    "error": f"Invalid offsets: start={start}, end={end}, content_length={len(full_content)}",
                },
                status_code=400,
            )

        # Extract chunk
        chunk_text = full_content[start:end]

        # Extract context before and after, clamped to the document bounds
        before_start = max(0, start - context_chars)
        before_context = full_content[before_start:start]
        after_end = min(len(full_content), end + context_chars)
        after_context = full_content[end:after_end]

        # Determine if there's more content beyond the returned context
        has_more_before = before_start > 0
        has_more_after = after_end < len(full_content)

        logger.info(
            f"Fetched chunk context for {doc_type}_{doc_id}: "
            f"chunk_len={len(chunk_text)}, before_len={len(before_context)}, "
            f"after_len={len(after_context)}"
        )

        return JSONResponse(
            {
                "success": True,
                "chunk_text": chunk_text,
                "before_context": before_context,
                "after_context": after_context,
                "has_more_before": has_more_before,
                "has_more_after": has_more_after,
            }
        )

    except Exception as e:
        # Top-level boundary: log with traceback and report a 500.
        logger.exception(f"Chunk context error: {e}")
        return JSONResponse(
            {"success": False, "error": str(e)},
            status_code=500,
        )