diff --git a/nextcloud_mcp_server/auth/viz_routes.py b/nextcloud_mcp_server/auth/viz_routes.py new file mode 100644 index 0000000..da2f0ab --- /dev/null +++ b/nextcloud_mcp_server/auth/viz_routes.py @@ -0,0 +1,581 @@ +"""Vector visualization routes for testing search algorithms. + +Provides a web UI for users to test different search algorithms on their own +indexed documents and visualize results in 2D space using PCA. + +All processing happens server-side following ADR-012: +- Search execution via shared search/algorithms.py +- PCA dimensionality reduction (768-dim → 2D) +- Only 2D coordinates + metadata sent to client +- Bandwidth-efficient (2 floats per doc vs 768) +""" + +import logging + +import numpy as np +from starlette.authentication import requires +from starlette.requests import Request +from starlette.responses import HTMLResponse, JSONResponse + +from nextcloud_mcp_server.config import get_settings +from nextcloud_mcp_server.search import ( + FuzzySearchAlgorithm, + HybridSearchAlgorithm, + KeywordSearchAlgorithm, + SemanticSearchAlgorithm, +) +from nextcloud_mcp_server.vector.pca import PCA +from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client + +logger = logging.getLogger(__name__) + + +@requires("authenticated", redirect="oauth_login") +async def vector_visualization_html(request: Request) -> HTMLResponse: + """Vector visualization page with search controls and interactive plot. + + Provides UI for testing search algorithms with real-time visualization. + Requires vector sync to be enabled. + + Args: + request: Starlette request object + + Returns: + HTML page with search interface + """ + settings = get_settings() + + if not settings.vector_sync_enabled: + return HTMLResponse( + """ +
+

Vector Visualization

+
+ Vector sync is not enabled. Set VECTOR_SYNC_ENABLED=true to use this feature. +
+
+ """ + ) + + # Get user info from session + user_info = request.session.get("user_info", {}) + username = user_info.get("preferred_username", "unknown") + + html_content = f""" + + + + + + Vector Visualization - Nextcloud MCP + + + + + + +
+
+

Vector Visualization

+
+ Testing search algorithms on your indexed documents. User: {username} +
+ +
+
+
+
+ + +
+ +
+ + +
+ +
+ +
+ + + +
+
+ + + +
+
+ + + +
+
+
+ +
+
+ + +
+ +
+ + +
+ +
+ +
+
+
+
+
+ +
+
+ Executing search and computing PCA projection... +
+
+
+ +
+

Search Results ()

+ +
+
def _parse_viz_search_params(query_params) -> dict:
    """Parse and validate the query parameters of a visualization search.

    Args:
        query_params: Mapping-like object exposing ``get(key, default)``
            (the request's query parameters).

    Returns:
        Dict with keys ``query``, ``algorithm``, ``limit``,
        ``score_threshold``, ``semantic_weight``, ``keyword_weight`` and
        ``fuzzy_weight``; numeric entries are already converted.

    Raises:
        ValueError: If a numeric parameter cannot be converted or ``limit``
            is smaller than 1.
    """
    try:
        parsed = {
            "query": query_params.get("query", ""),
            "algorithm": query_params.get("algorithm", "hybrid"),
            "limit": int(query_params.get("limit", "50")),
            "score_threshold": float(query_params.get("score_threshold", "0.7")),
            "semantic_weight": float(query_params.get("semantic_weight", "0.5")),
            "keyword_weight": float(query_params.get("keyword_weight", "0.3")),
            "fuzzy_weight": float(query_params.get("fuzzy_weight", "0.2")),
        }
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Invalid query parameter: {exc}") from exc

    if parsed["limit"] < 1:
        raise ValueError("limit must be >= 1")

    return parsed


def _serialize_search_results(search_results) -> list[dict]:
    """Convert search results into the JSON-serializable dicts sent to the UI."""
    return [
        {
            "id": r.id,
            "doc_type": r.doc_type,
            "title": r.title,
            "excerpt": r.excerpt,
            "score": r.score,
        }
        for r in search_results
    ]


@requires("authenticated", redirect="oauth_login")
async def vector_visualization_search(request: Request) -> JSONResponse:
    """Execute server-side search and return 2D coordinates + results.

    All processing happens server-side:
    1. Execute search via shared algorithm module
    2. Fetch matching vectors from Qdrant
    3. Apply PCA reduction to 2D
    4. Return coordinates + metadata only

    ``coordinates_2d[i]`` always belongs to ``results[i]``. A result for
    which no vector was retrieved is placed at ``[0.0, 0.0]``, the same
    placeholder used when there are too few vectors for PCA.

    Args:
        request: Starlette request with query parameters ``query``,
            ``algorithm``, ``limit``, ``score_threshold``,
            ``semantic_weight``, ``keyword_weight`` and ``fuzzy_weight``.

    Returns:
        JSON response with ``success``, ``results`` and ``coordinates_2d``
        (plus ``pca_variance`` when PCA ran, or ``message`` otherwise).
        Status 400 for disabled vector sync, malformed parameters or an
        unknown algorithm; 401 when the session has no username; 500 on
        unexpected errors.
    """
    settings = get_settings()

    if not settings.vector_sync_enabled:
        return JSONResponse(
            {"success": False, "error": "Vector sync not enabled"},
            status_code=400,
        )

    # Get user info
    user_info = request.session.get("user_info", {})
    username = user_info.get("preferred_username")

    if not username:
        return JSONResponse(
            {"success": False, "error": "User not authenticated"},
            status_code=401,
        )

    # Malformed numeric parameters are a client error, not a server crash.
    try:
        params = _parse_viz_search_params(request.query_params)
    except ValueError as exc:
        return JSONResponse(
            {"success": False, "error": str(exc)},
            status_code=400,
        )

    query = params["query"]
    algorithm = params["algorithm"]
    limit = params["limit"]
    score_threshold = params["score_threshold"]

    logger.info(
        "Viz search: user=%s, query='%s', algorithm=%s, limit=%s",
        username,
        query,
        algorithm,
        limit,
    )

    try:
        # Imported locally, as in the original implementation.
        from qdrant_client.models import FieldCondition, Filter, MatchAny

        from nextcloud_mcp_server.auth.userinfo_routes import (
            _get_authenticated_client_for_userinfo,
        )
        from nextcloud_mcp_server.client.notes import NotesClient

        # Pick the algorithm before opening any connection so an unknown
        # name is rejected without doing I/O.
        if algorithm == "semantic":
            search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
        elif algorithm == "keyword":
            search_algo = KeywordSearchAlgorithm()
        elif algorithm == "fuzzy":
            search_algo = FuzzySearchAlgorithm()
        elif algorithm == "hybrid":
            search_algo = HybridSearchAlgorithm(
                semantic_weight=params["semantic_weight"],
                keyword_weight=params["keyword_weight"],
                fuzzy_weight=params["fuzzy_weight"],
            )
        else:
            return JSONResponse(
                {"success": False, "error": f"Unknown algorithm: {algorithm}"},
                status_code=400,
            )

        # Get authenticated HTTP client from session
        # In BasicAuth mode: uses username/password from session
        # In OAuth mode: uses access token from session
        async with await _get_authenticated_client_for_userinfo(
            request
        ) as http_client:
            # Create NotesClient directly with authenticated HTTP client
            notes_client = NotesClient(http_client, username)

            # Wrap in a minimal client object for search algorithms.
            # Only notes are implemented; every other app accessor is None.
            class MinimalNextcloudClient:
                def __init__(self, notes_client, username):
                    self._notes = notes_client
                    self.username = username

                @property
                def notes(self):
                    return self._notes

                @property
                def webdav(self):
                    return None

                @property
                def calendar(self):
                    return None

                @property
                def contacts(self):
                    return None

                @property
                def deck(self):
                    return None

                @property
                def cookbook(self):
                    return None

                @property
                def tables(self):
                    return None

            nextcloud_client = MinimalNextcloudClient(notes_client, username)

            # Execute search
            search_results = await search_algo.search(
                query=query,
                user_id=username,
                limit=limit,
                doc_type="note",
                nextcloud_client=nextcloud_client,
                score_threshold=score_threshold,
            )

            if not search_results:
                return JSONResponse(
                    {
                        "success": True,
                        "results": [],
                        "coordinates_2d": [],
                        "message": "No results found",
                    }
                )

            # Document ids are compared as strings everywhere, matching the
            # string ids used in the Qdrant filter below.
            doc_keys = [str(r.id) for r in search_results]

            # Fetch vectors for matching results from Qdrant
            qdrant = await get_qdrant_client()

            # NOTE(review): a single page of 2x the result count may not
            # cover every document when documents have many chunks.
            points_response = await qdrant.scroll(
                collection_name=settings.get_collection_name(),
                scroll_filter=Filter(
                    must=[
                        FieldCondition(
                            key="doc_id",
                            match=MatchAny(any=doc_keys),
                        ),
                        FieldCondition(
                            key="user_id",
                            match={"value": username},
                        ),
                    ]
                ),
                limit=len(doc_keys) * 2,  # Account for multiple chunks per doc
                with_vectors=True,
                # doc_id is needed to map each vector back to its result;
                # without payload no coordinate could ever be assigned.
                with_payload=["doc_id"],
            )

            raw_points = points_response[0]

            if not raw_points:
                return JSONResponse(
                    {
                        "success": True,
                        "results": [],
                        "coordinates_2d": [],
                        "message": "No vectors found for results",
                    }
                )

            # Drop vector-less points up front so that points and PCA rows
            # stay index-aligned.
            points = [p for p in raw_points if p.vector is not None]
            response_results = _serialize_search_results(search_results)

            if len(points) < 2:
                # Not enough points for PCA
                return JSONResponse(
                    {
                        "success": True,
                        "results": response_results,
                        "coordinates_2d": [[0, 0]] * len(search_results),
                        "message": "Not enough vectors for PCA",
                    }
                )

            # Apply PCA dimensionality reduction to 2D
            vectors = np.array([p.vector for p in points])
            pca = PCA(n_components=2)
            coords_2d = pca.fit_transform(vectors)

            # After fit, these attributes are guaranteed to be set
            assert pca.explained_variance_ratio_ is not None
            pc1 = float(pca.explained_variance_ratio_[0])
            pc2 = float(pca.explained_variance_ratio_[1])

            logger.info("PCA explained variance: PC1=%.3f, PC2=%.3f", pc1, pc2)

            # Use the first chunk seen for each document as its position.
            coords_by_doc: dict[str, list] = {}
            for point, coord in zip(points, coords_2d):
                doc_key = (point.payload or {}).get("doc_id")
                if doc_key is not None:
                    coords_by_doc.setdefault(str(doc_key), coord.tolist())

            # Emit coordinates in result order so both lists line up.
            result_coords = [
                coords_by_doc.get(doc_key, [0.0, 0.0]) for doc_key in doc_keys
            ]

            return JSONResponse(
                {
                    "success": True,
                    "results": response_results,
                    "coordinates_2d": result_coords,
                    "pca_variance": {"pc1": pc1, "pc2": pc2},
                }
            )

    except Exception as e:
        # Top-level boundary of the endpoint: log with traceback and report
        # the failure as JSON instead of propagating.
        logger.error(f"Viz search error: {e}", exc_info=True)
        return JSONResponse(
            {"success": False, "error": str(e)},
            status_code=500,
        )
"""Custom PCA implementation for dimensionality reduction.

Implements Principal Component Analysis without scikit-learn dependency.
Used for reducing high-dimensional embeddings (768-dim) to 2D for visualization.
"""

import logging

import numpy as np

logger = logging.getLogger(__name__)


class PCA:
    """Principal Component Analysis for dimensionality reduction.

    Simple implementation that finds principal components via eigendecomposition
    of the covariance matrix. Suitable for small-to-medium datasets.

    Attributes:
        n_components: Number of principal components to keep
        mean_: Mean of training data (set during fit)
        components_: Principal components (eigenvectors), one per row
        explained_variance_: Variance explained by each component
        explained_variance_ratio_: Fraction of total variance explained
    """

    def __init__(self, n_components: int = 2):
        """Initialize PCA.

        Args:
            n_components: Number of components to keep (default: 2)

        Raises:
            ValueError: If n_components is smaller than 1
        """
        if n_components < 1:
            raise ValueError(f"n_components must be >= 1, got {n_components}")

        self.n_components = n_components
        self.mean_: np.ndarray | None = None
        self.components_: np.ndarray | None = None
        self.explained_variance_: np.ndarray | None = None
        self.explained_variance_ratio_: np.ndarray | None = None

    def fit(self, X: np.ndarray) -> "PCA":
        """Fit PCA model to data.

        Args:
            X: Training data of shape (n_samples, n_features)

        Returns:
            self (for method chaining)

        Raises:
            ValueError: If X is not 2D, has fewer features than
                n_components, or has fewer than 2 samples
        """
        X = np.asarray(X, dtype=float)

        if X.ndim != 2:
            raise ValueError(f"X must be 2D array, got shape {X.shape}")

        n_samples, n_features = X.shape

        if n_features < self.n_components:
            raise ValueError(
                f"n_components={self.n_components} > n_features={n_features}"
            )

        # The sample covariance divides by (n_samples - 1); with a single
        # sample it is undefined and would fill the model with NaN.
        if n_samples < 2:
            raise ValueError(f"PCA requires at least 2 samples, got {n_samples}")

        # Center data
        self.mean_ = np.mean(X, axis=0)
        X_centered = X - self.mean_

        # Sample covariance of the features, shape (n_features, n_features).
        # np.cov returns a 0-d array for a single feature, which eigh
        # rejects, so force it back to 2D.
        cov = np.atleast_2d(np.cov(X_centered, rowvar=False))

        # Eigendecomposition (eigh: the covariance matrix is symmetric)
        eigenvalues, eigenvectors = np.linalg.eigh(cov)

        # Sort by eigenvalue (descending)
        order = np.argsort(eigenvalues)[::-1]
        # Round-off can produce tiny negative eigenvalues; a variance
        # cannot be negative, so clamp them to zero.
        eigenvalues = np.clip(eigenvalues[order], 0.0, None)
        eigenvectors = eigenvectors[:, order]

        # Keep top n_components (stored as rows)
        self.components_ = eigenvectors[:, : self.n_components].T
        self.explained_variance_ = eigenvalues[: self.n_components]

        # Calculate explained variance ratio
        total_variance = np.sum(eigenvalues)
        if total_variance > 0:
            self.explained_variance_ratio_ = self.explained_variance_ / total_variance
        else:
            # All samples identical: nothing to explain.
            self.explained_variance_ratio_ = np.zeros(self.n_components)

        logger.debug(
            f"PCA fit: {n_samples} samples, {n_features} features → "
            f"{self.n_components} components, "
            f"explained variance: {self.explained_variance_ratio_}"
        )

        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Transform data to principal component space.

        Args:
            X: Data to transform of shape (n_samples, n_features)

        Returns:
            Transformed data of shape (n_samples, n_components)

        Raises:
            ValueError: If PCA not fitted yet, X is not 2D, or X has a
                different number of features than the training data
        """
        if self.mean_ is None or self.components_ is None:
            raise ValueError("PCA not fitted yet. Call fit() first.")

        X = np.asarray(X, dtype=float)

        if X.ndim != 2:
            raise ValueError(f"X must be 2D array, got shape {X.shape}")

        # Checked explicitly: a single-column X would otherwise broadcast
        # against the training mean and silently yield wrong projections.
        expected_features = self.mean_.shape[0]
        if X.shape[1] != expected_features:
            raise ValueError(
                f"X has {X.shape[1]} features, expected {expected_features}"
            )

        # Center using training mean
        X_centered = X - self.mean_

        # Project onto principal components
        return np.dot(X_centered, self.components_.T)

    def fit_transform(self, X: np.ndarray) -> np.ndarray:
        """Fit PCA model and transform data in one step.

        Args:
            X: Training data of shape (n_samples, n_features)

        Returns:
            Transformed data of shape (n_samples, n_components)
        """
        self.fit(X)
        return self.transform(X)