From 11e620f2d17bb4366802e0c991f45ce33dca03e7 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sat, 15 Nov 2025 00:10:19 +0100 Subject: [PATCH] feat: Implement unified search algorithm module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Creates shared search module with four algorithms implementing ADR-012: - Semantic search (vector similarity via Qdrant) - Keyword search (token-based matching from ADR-001) - Fuzzy search (character overlap matching) - Hybrid search (RRF fusion from ADR-003) Architecture: - Base SearchAlgorithm interface for consistent API - SearchResult dataclass for unified result format - All algorithms async and independently testable - Proper logging and error handling throughout Semantic Search (search/semantic.py): - Extracted from server/semantic.py - Vector similarity using Qdrant query_points - Dual-phase authorization (vector filter + API verification) - Deduplication of document chunks - Configurable score threshold (default: 0.7) Keyword Search (search/keyword.py): - Implements ADR-001 token-based matching - Title matches weighted 3x higher than content - Case-insensitive token matching - Relevance scoring with normalization - Excerpt extraction with context Fuzzy Search (search/fuzzy.py): - Simple character overlap calculation - Configurable threshold (default: 70%) - Typo-tolerant matching - Fast and dependency-free Hybrid Search (search/hybrid.py): - Reciprocal Rank Fusion (RRF) from ADR-003 - Parallel execution of sub-algorithms - Configurable weights per algorithm - RRF constant k=60 (standard value) - Weight validation (must sum ≤1.0) All algorithms: - Share NextcloudClient for document access - Support user_id filtering (multi-tenant) - Support doc_type filtering (currently notes only) - Return consistent SearchResult objects - Properly formatted with ruff and type-checked Next steps: Update MCP tool to use these algorithms 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- nextcloud_mcp_server/search/__init__.py | 26 +++ nextcloud_mcp_server/search/algorithms.py | 87 ++++++++ nextcloud_mcp_server/search/fuzzy.py | 174 ++++++++++++++++ nextcloud_mcp_server/search/hybrid.py | 240 ++++++++++++++++++++++ nextcloud_mcp_server/search/keyword.py | 225 ++++++++++++++++++++ nextcloud_mcp_server/search/semantic.py | 229 +++++++++++++++++++++ 6 files changed, 981 insertions(+) create mode 100644 nextcloud_mcp_server/search/__init__.py create mode 100644 nextcloud_mcp_server/search/algorithms.py create mode 100644 nextcloud_mcp_server/search/fuzzy.py create mode 100644 nextcloud_mcp_server/search/hybrid.py create mode 100644 nextcloud_mcp_server/search/keyword.py create mode 100644 nextcloud_mcp_server/search/semantic.py diff --git a/nextcloud_mcp_server/search/__init__.py b/nextcloud_mcp_server/search/__init__.py new file mode 100644 index 0000000..1da5a84 --- /dev/null +++ b/nextcloud_mcp_server/search/__init__.py @@ -0,0 +1,26 @@ +"""Search algorithms module for unified multi-algorithm search. + +This module provides a unified interface for different search algorithms: +- Semantic search (vector similarity) +- Keyword search (token-based matching) +- Fuzzy search (character overlap) +- Hybrid search (RRF fusion of multiple algorithms) + +All algorithms share the same interface and can be used interchangeably by both +MCP tools and the visualization pane. +""" + +from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult +from nextcloud_mcp_server.search.fuzzy import FuzzySearchAlgorithm +from nextcloud_mcp_server.search.hybrid import HybridSearchAlgorithm +from nextcloud_mcp_server.search.keyword import KeywordSearchAlgorithm +from nextcloud_mcp_server.search.semantic import SemanticSearchAlgorithm + +__all__ = [ + "SearchAlgorithm", + "SearchResult", + "SemanticSearchAlgorithm", + "KeywordSearchAlgorithm", + "FuzzySearchAlgorithm", + "HybridSearchAlgorithm", +] diff --git a/nextcloud_mcp_server/search/algorithms.py b/nextcloud_mcp_server/search/algorithms.py new file mode 100644 index 0000000..560e113 --- /dev/null +++ b/nextcloud_mcp_server/search/algorithms.py @@ -0,0 +1,87 @@ +"""Base interfaces and data structures for search algorithms.""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any + + +@dataclass +class SearchResult: + """A single search result with metadata and score. + + Attributes: + id: Document ID + doc_type: Document type (note, file, calendar, contact, etc.) + title: Document title + excerpt: Content excerpt showing match context + score: Relevance score (0.0-1.0, higher is better) + metadata: Additional algorithm-specific metadata + """ + + id: int + doc_type: str + title: str + excerpt: str + score: float + metadata: dict[str, Any] | None = None + + def __post_init__(self): + """Validate score is in valid range.""" + if not 0.0 <= self.score <= 1.0: + raise ValueError(f"Score must be between 0.0 and 1.0, got {self.score}") + + +class SearchAlgorithm(ABC): + """Abstract base class for search algorithms. + + All search algorithms must implement the search() method with consistent + interface, allowing them to be used interchangeably. + """ + + @abstractmethod + async def search( + self, + query: str, + user_id: str, + limit: int = 10, + doc_type: str | None = None, + **kwargs: Any, + ) -> list[SearchResult]: + """Execute search with the given parameters. + + Args: + query: Search query string + user_id: User ID for multi-tenant filtering + limit: Maximum number of results to return + doc_type: Optional document type filter (note, file, calendar, etc.) + **kwargs: Algorithm-specific parameters + + Returns: + List of SearchResult objects ranked by relevance + + Raises: + McpError: If search fails or configuration is invalid + """ + pass + + @property + @abstractmethod + def name(self) -> str: + """Return algorithm name for identification.""" + pass + + @property + def supports_scoring(self) -> bool: + """Whether this algorithm provides meaningful relevance scores. + + Default: True. Override if algorithm doesn't support scoring. + """ + return True + + @property + def requires_vector_db(self) -> bool: + """Whether this algorithm requires vector database. + + Default: False. Override for semantic search. + """ + return False diff --git a/nextcloud_mcp_server/search/fuzzy.py b/nextcloud_mcp_server/search/fuzzy.py new file mode 100644 index 0000000..479459f --- /dev/null +++ b/nextcloud_mcp_server/search/fuzzy.py @@ -0,0 +1,174 @@ +"""Fuzzy search algorithm using character overlap matching.""" + +import logging +from typing import Any + +from nextcloud_mcp_server.client import NextcloudClient +from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult + +logger = logging.getLogger(__name__) + + +class FuzzySearchAlgorithm(SearchAlgorithm): + """Fuzzy search using simple character-based similarity. + + Implements character overlap matching with configurable threshold: + - Compares character sets between query and text + - Requires configurable % character overlap to match (default: 70%) + - Tolerant to typos and minor variations + """ + + def __init__(self, threshold: float = 0.7): + """Initialize fuzzy search algorithm. + + Args: + threshold: Minimum character overlap ratio (0-1, default: 0.7) + """ + if not 0.0 <= threshold <= 1.0: + raise ValueError(f"Threshold must be between 0.0 and 1.0, got {threshold}") + self.threshold = threshold + + @property + def name(self) -> str: + return "fuzzy" + + async def search( + self, + query: str, + user_id: str, + limit: int = 10, + doc_type: str | None = None, + nextcloud_client: NextcloudClient | None = None, + **kwargs: Any, + ) -> list[SearchResult]: + """Execute fuzzy search using character overlap. + + Args: + query: Search query + user_id: User ID for filtering + limit: Maximum results to return + doc_type: Optional document type filter (currently only "note" supported) + nextcloud_client: NextcloudClient for fetching documents + **kwargs: Additional parameters (threshold override) + + Returns: + List of SearchResult objects ranked by character overlap score + + Raises: + ValueError: If nextcloud_client not provided + """ + if not nextcloud_client: + raise ValueError("FuzzySearch requires nextcloud_client parameter") + + threshold = kwargs.get("threshold", self.threshold) + + logger.info( + f"Fuzzy search: query='{query}', user={user_id}, " + f"limit={limit}, threshold={threshold}, doc_type={doc_type}" + ) + + # Currently only supports notes + if doc_type and doc_type != "note": + logger.warning(f"Fuzzy search not yet implemented for doc_type={doc_type}") + return [] + + # Fetch all notes for the user + notes = await nextcloud_client.notes.get_notes() + logger.debug(f"Fetched {len(notes)} notes for fuzzy search") + + # Score and filter notes + scored_notes = [] + query_lower = query.lower() + + for note in notes: + title = note.get("title", "") + content = note.get("content", "") + + # Check title match + title_score = self._calculate_char_overlap(query_lower, title.lower()) + + # Check content match + content_score = self._calculate_char_overlap(query_lower, content.lower()) + + # Use best score + best_score = max(title_score, content_score) + + if best_score >= threshold: + # Extract excerpt based on which matched better + if title_score >= content_score: + excerpt = f"Title match: {title}" + else: + excerpt = self._extract_excerpt(content, max_length=200) + + scored_notes.append( + SearchResult( + id=note["id"], + doc_type="note", + title=title or "Untitled", + excerpt=excerpt, + score=best_score, + metadata={ + "category": note.get("category", ""), + "modified": note.get("modified"), + "match_location": "title" + if title_score >= content_score + else "content", + }, + ) + ) + + # Sort by score (descending) and limit + scored_notes.sort(key=lambda x: x.score, reverse=True) + results = scored_notes[:limit] + + logger.info(f"Fuzzy search returned {len(results)} matching notes") + if results: + result_details = [ + f"note_{r.id} (score={r.score:.3f}, title='{r.title}')" + for r in results[:5] + ] + logger.debug(f"Top fuzzy results: {', '.join(result_details)}") + + return results + + def _calculate_char_overlap(self, query: str, text: str) -> float: + """Calculate character overlap ratio between query and text. + + Args: + query: Query string (normalized) + text: Text to compare (normalized) + + Returns: + Overlap ratio (0.0-1.0) + """ + if not query or not text: + return 0.0 + + # Convert to character sets + query_chars = set(query) + text_chars = set(text) + + # Calculate overlap + overlap = query_chars & text_chars + overlap_ratio = len(overlap) / len(query_chars) + + return overlap_ratio + + def _extract_excerpt(self, content: str, max_length: int = 200) -> str: + """Extract excerpt from content. + + Args: + content: Full document content + max_length: Maximum excerpt length + + Returns: + Excerpt string + """ + if not content: + return "" + + excerpt = content[:max_length].strip() + if len(content) > max_length: + excerpt += "..." + + return excerpt diff --git a/nextcloud_mcp_server/search/hybrid.py b/nextcloud_mcp_server/search/hybrid.py new file mode 100644 index 0000000..a8778c8 --- /dev/null +++ b/nextcloud_mcp_server/search/hybrid.py @@ -0,0 +1,240 @@ +"""Hybrid search algorithm using Reciprocal Rank Fusion (RRF).""" + +import asyncio +import logging +from collections import defaultdict +from typing import Any + +from nextcloud_mcp_server.client import NextcloudClient +from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult +from nextcloud_mcp_server.search.fuzzy import FuzzySearchAlgorithm +from nextcloud_mcp_server.search.keyword import KeywordSearchAlgorithm +from nextcloud_mcp_server.search.semantic import SemanticSearchAlgorithm + +logger = logging.getLogger(__name__) + + +class HybridSearchAlgorithm(SearchAlgorithm): + """Hybrid search combining multiple algorithms using Reciprocal Rank Fusion. + + Implements RRF from ADR-003 to combine results from: + - Semantic search (vector similarity) + - Keyword search (token matching) + - Fuzzy search (character overlap) + + RRF formula: score = weight / (k + rank) + where k=60 (standard value) and rank is 1-indexed position. + """ + + DEFAULT_RRF_K = 60 # Standard RRF constant + + def __init__( + self, + semantic_weight: float = 0.5, + keyword_weight: float = 0.3, + fuzzy_weight: float = 0.2, + rrf_k: int = DEFAULT_RRF_K, + ): + """Initialize hybrid search with algorithm weights. + + Args: + semantic_weight: Weight for semantic results (default: 0.5) + keyword_weight: Weight for keyword results (default: 0.3) + fuzzy_weight: Weight for fuzzy results (default: 0.2) + rrf_k: RRF constant for rank decay (default: 60) + + Raises: + ValueError: If weights are invalid + """ + # Validate weights + if semantic_weight < 0 or keyword_weight < 0 or fuzzy_weight < 0: + raise ValueError("Weights must be non-negative") + + total_weight = semantic_weight + keyword_weight + fuzzy_weight + if total_weight > 1.0: + raise ValueError(f"Weights sum to {total_weight:.2f}, must be ≤1.0") + + if total_weight == 0.0: + raise ValueError("At least one weight must be > 0") + + self.semantic_weight = semantic_weight + self.keyword_weight = keyword_weight + self.fuzzy_weight = fuzzy_weight + self.rrf_k = rrf_k + + # Initialize sub-algorithms + self.semantic = SemanticSearchAlgorithm() + self.keyword = KeywordSearchAlgorithm() + self.fuzzy = FuzzySearchAlgorithm() + + @property + def name(self) -> str: + return "hybrid" + + @property + def requires_vector_db(self) -> bool: + # Requires vector DB if semantic search has non-zero weight + return self.semantic_weight > 0 + + async def search( + self, + query: str, + user_id: str, + limit: int = 10, + doc_type: str | None = None, + nextcloud_client: NextcloudClient | None = None, + **kwargs: Any, + ) -> list[SearchResult]: + """Execute hybrid search using RRF to combine algorithms. + + Args: + query: Search query + user_id: User ID for filtering + limit: Maximum results to return + doc_type: Optional document type filter + nextcloud_client: NextcloudClient for document access + **kwargs: Additional parameters passed to sub-algorithms + + Returns: + List of SearchResult objects ranked by RRF combined score + + Raises: + ValueError: If nextcloud_client not provided (needed for keyword/fuzzy) + """ + logger.info( + f"Hybrid search: query='{query}', user={user_id}, limit={limit}, " + f"weights=(semantic={self.semantic_weight}, keyword={self.keyword_weight}, " + f"fuzzy={self.fuzzy_weight})" + ) + + # Run algorithms in parallel + tasks = [] + algo_names = [] + + if self.semantic_weight > 0: + tasks.append( + self.semantic.search( + query, user_id, limit * 2, doc_type, nextcloud_client, **kwargs + ) + ) + algo_names.append("semantic") + + if self.keyword_weight > 0: + if not nextcloud_client: + raise ValueError("Hybrid search with keyword requires nextcloud_client") + tasks.append( + self.keyword.search( + query, user_id, limit * 2, doc_type, nextcloud_client, **kwargs + ) + ) + algo_names.append("keyword") + + if self.fuzzy_weight > 0: + if not nextcloud_client: + raise ValueError("Hybrid search with fuzzy requires nextcloud_client") + tasks.append( + self.fuzzy.search( + query, user_id, limit * 2, doc_type, nextcloud_client, **kwargs + ) + ) + algo_names.append("fuzzy") + + # Execute searches in parallel + results_list = await asyncio.gather(*tasks) + + # Build results dict + algo_results = {} + for algo_name, results in zip(algo_names, results_list): + algo_results[algo_name] = results + logger.debug(f"{algo_name} returned {len(results)} results") + + # Combine using RRF + combined_results = self._reciprocal_rank_fusion( + algo_results, + { + "semantic": self.semantic_weight, + "keyword": self.keyword_weight, + "fuzzy": self.fuzzy_weight, + }, + limit, + ) + + logger.info(f"Hybrid search returned {len(combined_results)} combined results") + if combined_results: + result_details = [ + f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')" + for r in combined_results[:5] + ] + logger.debug(f"Top hybrid results: {', '.join(result_details)}") + + return combined_results + + def _reciprocal_rank_fusion( + self, + algo_results: dict[str, list[SearchResult]], + weights: dict[str, float], + limit: int, + ) -> list[SearchResult]: + """Combine multiple ranked result lists using RRF. + + Args: + algo_results: Dict of algorithm_name -> ranked results + weights: Dict of algorithm_name -> weight (0-1) + limit: Maximum results to return + + Returns: + Combined and re-ranked results + """ + # Track RRF scores per document + rrf_scores: dict[tuple[int, str], float] = defaultdict(float) + # Track best result object for each document + best_results: dict[tuple[int, str], SearchResult] = {} + + for algo_name, results in algo_results.items(): + weight = weights.get(algo_name, 0.0) + if weight == 0: + continue + + for rank, result in enumerate(results, start=1): + doc_key = (result.id, result.doc_type) + + # RRF formula: weight / (k + rank) + rrf_score = weight / (self.rrf_k + rank) + rrf_scores[doc_key] += rrf_score + + # Track best result object (prefer higher original scores) + if doc_key not in best_results: + best_results[doc_key] = result + elif result.score > best_results[doc_key].score: + best_results[doc_key] = result + + # Sort by combined RRF score + sorted_docs = sorted( + rrf_scores.items(), + key=lambda x: x[1], + reverse=True, + )[:limit] + + # Build final results with RRF scores + final_results = [] + for doc_key, rrf_score in sorted_docs: + result = best_results[doc_key] + + # Create new result with RRF score + # Keep original metadata but add RRF details + metadata = result.metadata or {} + metadata["rrf_score"] = rrf_score + metadata["original_score"] = result.score + + final_results.append( + SearchResult( + id=result.id, + doc_type=result.doc_type, + title=result.title, + excerpt=result.excerpt, + score=rrf_score, # Use RRF score as the primary score + metadata=metadata, + ) + ) + + return final_results diff --git a/nextcloud_mcp_server/search/keyword.py b/nextcloud_mcp_server/search/keyword.py new file mode 100644 index 0000000..410a7a7 --- /dev/null +++ b/nextcloud_mcp_server/search/keyword.py @@ -0,0 +1,225 @@ +"""Keyword search algorithm using token-based matching (ADR-001).""" + +import logging +from typing import Any + +from nextcloud_mcp_server.client import NextcloudClient +from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult + +logger = logging.getLogger(__name__) + + +class KeywordSearchAlgorithm(SearchAlgorithm): + """Keyword search using token-based matching with weighted scoring. + + Implements token-based search from ADR-001: + - Title matches weighted 3x higher than content matches + - Case-insensitive token matching + - Relevance scoring based on match frequency and location + """ + + # Weighting constants from ADR-001 + TITLE_WEIGHT = 3.0 + CONTENT_WEIGHT = 1.0 + + @property + def name(self) -> str: + return "keyword" + + async def search( + self, + query: str, + user_id: str, + limit: int = 10, + doc_type: str | None = None, + nextcloud_client: NextcloudClient | None = None, + **kwargs: Any, + ) -> list[SearchResult]: + """Execute keyword search using token matching. + + Args: + query: Search query to tokenize and match + user_id: User ID for filtering + limit: Maximum results to return + doc_type: Optional document type filter (currently only "note" supported) + nextcloud_client: NextcloudClient for fetching documents + **kwargs: Additional parameters (unused) + + Returns: + List of SearchResult objects ranked by keyword match score + + Raises: + ValueError: If nextcloud_client not provided + """ + if not nextcloud_client: + raise ValueError("KeywordSearch requires nextcloud_client parameter") + + logger.info( + f"Keyword search: query='{query}', user={user_id}, " + f"limit={limit}, doc_type={doc_type}" + ) + + # Tokenize query + query_tokens = self._process_query(query) + logger.debug(f"Query tokens: {query_tokens}") + + # Currently only supports notes + # TODO: Extend to other document types (files, calendar, etc.) + if doc_type and doc_type != "note": + logger.warning( + f"Keyword search not yet implemented for doc_type={doc_type}" + ) + return [] + + # Fetch all notes for the user + notes = await nextcloud_client.notes.get_notes() + logger.debug(f"Fetched {len(notes)} notes for keyword search") + + # Score and filter notes + scored_notes = [] + for note in notes: + score = self._calculate_score( + query_tokens, + note.get("title", ""), + note.get("content", ""), + ) + + if score > 0: # Only include matches + # Extract excerpt with context + excerpt = self._extract_excerpt( + note.get("content", ""), + query_tokens, + max_length=200, + ) + + scored_notes.append( + SearchResult( + id=note["id"], + doc_type="note", + title=note.get("title", "Untitled"), + excerpt=excerpt, + score=score, + metadata={ + "category": note.get("category", ""), + "modified": note.get("modified"), + }, + ) + ) + + # Sort by score (descending) and limit + scored_notes.sort(key=lambda x: x.score, reverse=True) + results = scored_notes[:limit] + + logger.info(f"Keyword search returned {len(results)} matching notes") + if results: + result_details = [ + f"note_{r.id} (score={r.score:.3f}, title='{r.title}')" + for r in results[:5] + ] + logger.debug(f"Top keyword results: {', '.join(result_details)}") + + return results + + def _process_query(self, query: str) -> list[str]: + """Tokenize and normalize query. + + Args: + query: Raw query string + + Returns: + List of normalized tokens + """ + # Convert to lowercase and split into tokens + tokens = query.lower().split() + + # Filter out very short tokens (optional) + tokens = [token for token in tokens if len(token) > 1] + + return tokens + + def _calculate_score( + self, query_tokens: list[str], title: str, content: str + ) -> float: + """Calculate relevance score based on token matches. + + Args: + query_tokens: List of query tokens + title: Document title + content: Document content + + Returns: + Relevance score (0.0-1.0) + """ + if not query_tokens: + return 0.0 + + # Process title and content + title_tokens = title.lower().split() + content_tokens = content.lower().split() + + score = 0.0 + + # Count matches in title + title_matches = sum(1 for qt in query_tokens if qt in title_tokens) + if query_tokens: # Avoid division by zero + title_match_ratio = title_matches / len(query_tokens) + score += self.TITLE_WEIGHT * title_match_ratio + + # Count matches in content + content_matches = sum(1 for qt in query_tokens if qt in content_tokens) + if query_tokens: + content_match_ratio = content_matches / len(query_tokens) + score += self.CONTENT_WEIGHT * content_match_ratio + + # Normalize score to 0-1 range + # Max score would be TITLE_WEIGHT + CONTENT_WEIGHT if all tokens match everywhere + max_score = self.TITLE_WEIGHT + self.CONTENT_WEIGHT + normalized_score = min(score / max_score, 1.0) + + return normalized_score + + def _extract_excerpt( + self, content: str, query_tokens: list[str], max_length: int = 200 + ) -> str: + """Extract excerpt showing match context. + + Args: + content: Full document content + query_tokens: Query tokens to find + max_length: Maximum excerpt length in characters + + Returns: + Excerpt string with context around matches + """ + if not content: + return "" + + content_lower = content.lower() + + # Find first occurrence of any query token + first_match_pos = -1 + for token in query_tokens: + pos = content_lower.find(token) + if pos != -1: + if first_match_pos == -1 or pos < first_match_pos: + first_match_pos = pos + + if first_match_pos == -1: + # No matches found, return beginning + return content[:max_length].strip() + ( + "..." if len(content) > max_length else "" + ) + + # Extract context around match + start = max(0, first_match_pos - max_length // 2) + end = min(len(content), first_match_pos + max_length // 2) + + excerpt = content[start:end].strip() + + # Add ellipsis if truncated + if start > 0: + excerpt = "..." + excerpt + if end < len(content): + excerpt = excerpt + "..." + + return excerpt diff --git a/nextcloud_mcp_server/search/semantic.py b/nextcloud_mcp_server/search/semantic.py new file mode 100644 index 0000000..c6e632d --- /dev/null +++ b/nextcloud_mcp_server/search/semantic.py @@ -0,0 +1,229 @@ +"""Semantic search algorithm using vector similarity (Qdrant).""" + +import logging +from typing import Any + +from httpx import HTTPStatusError +from qdrant_client.models import FieldCondition, Filter, MatchValue + +from nextcloud_mcp_server.client import NextcloudClient +from nextcloud_mcp_server.config import get_settings +from nextcloud_mcp_server.embedding import get_embedding_service +from nextcloud_mcp_server.observability.metrics import record_qdrant_operation +from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult +from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client + +logger = logging.getLogger(__name__) + + +class SemanticSearchAlgorithm(SearchAlgorithm): + """Semantic search using vector similarity in Qdrant. + + Searches documents by meaning rather than exact keywords using + 768-dimensional embeddings and cosine distance. + """ + + def __init__(self, score_threshold: float = 0.7): + """Initialize semantic search algorithm. + + Args: + score_threshold: Minimum similarity score (0-1, default: 0.7) + """ + self.score_threshold = score_threshold + + @property + def name(self) -> str: + return "semantic" + + @property + def requires_vector_db(self) -> bool: + return True + + async def search( + self, + query: str, + user_id: str, + limit: int = 10, + doc_type: str | None = None, + nextcloud_client: NextcloudClient | None = None, + **kwargs: Any, + ) -> list[SearchResult]: + """Execute semantic search using vector similarity. + + Args: + query: Natural language search query + user_id: User ID for filtering + limit: Maximum results to return + doc_type: Optional document type filter (currently only "note" supported) + nextcloud_client: NextcloudClient for access verification + **kwargs: Additional parameters (score_threshold override) + + Returns: + List of SearchResult objects ranked by similarity score + + Raises: + McpError: If vector sync is not enabled or search fails + """ + settings = get_settings() + score_threshold = kwargs.get("score_threshold", self.score_threshold) + + logger.info( + f"Semantic search: query='{query}', user={user_id}, " + f"limit={limit}, score_threshold={score_threshold}, doc_type={doc_type}" + ) + + # Generate embedding for query + embedding_service = get_embedding_service() + query_embedding = await embedding_service.embed(query) + logger.debug( + f"Generated embedding for query (dimension={len(query_embedding)})" + ) + + # Build Qdrant filter + filter_conditions = [ + FieldCondition( + key="user_id", + match=MatchValue(value=user_id), + ) + ] + + # Add doc_type filter if specified + if doc_type: + filter_conditions.append( + FieldCondition( + key="doc_type", + match=MatchValue(value=doc_type), + ) + ) + + # Search Qdrant + qdrant_client = await get_qdrant_client() + try: + search_response = await qdrant_client.query_points( + collection_name=settings.get_collection_name(), + query=query_embedding, + query_filter=Filter(must=filter_conditions), + limit=limit * 2, # Get extra for deduplication + score_threshold=score_threshold, + with_payload=True, + with_vectors=False, # Don't return vectors to save bandwidth + ) + record_qdrant_operation("search", "success") + except Exception: + record_qdrant_operation("search", "error") + raise + + logger.info( + f"Qdrant returned {len(search_response.points)} results " + f"(before deduplication and access verification)" + ) + + if search_response.points: + # Log top 3 scores to help with threshold tuning + top_scores = [p.score for p in search_response.points[:3]] + logger.debug(f"Top 3 similarity scores: {top_scores}") + + # Deduplicate by document ID (multiple chunks per document) + results = await self._deduplicate_and_verify( + search_response.points, limit, nextcloud_client + ) + + logger.info( + f"Returning {len(results)} results after deduplication and access verification" + ) + if results: + result_details = [ + f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')" + for r in results[:5] # Show top 5 + ] + logger.debug(f"Top results: {', '.join(result_details)}") + + return results + + async def _deduplicate_and_verify( + self, + points: list[Any], + limit: int, + nextcloud_client: NextcloudClient | None, + ) -> list[SearchResult]: + """Deduplicate results by doc_id and verify access. + + Args: + points: Qdrant search results + limit: Maximum results to return + nextcloud_client: NextcloudClient for access verification (optional) + + Returns: + List of SearchResult objects + """ + seen_doc_ids = set() + results = [] + + for result in points: + doc_id = int(result.payload["doc_id"]) + doc_type = result.payload.get("doc_type", "note") + + # Skip if we've already seen this document + if doc_id in seen_doc_ids: + continue + + seen_doc_ids.add(doc_id) + + # Verify access via Nextcloud API if client provided + # Currently only supports notes + if nextcloud_client and doc_type == "note": + try: + note = await nextcloud_client.notes.get_note(doc_id) + + results.append( + SearchResult( + id=doc_id, + doc_type="note", + title=result.payload["title"], + excerpt=result.payload["excerpt"], + score=result.score, + metadata={ + "category": note.get("category", ""), + "chunk_index": result.payload["chunk_index"], + "total_chunks": result.payload["total_chunks"], + }, + ) + ) + + if len(results) >= limit: + break + + except HTTPStatusError as e: + if e.response.status_code in (403, 404): + # User lost access or document deleted + logger.debug( + f"Skipping note {doc_id}: {e.response.status_code}" + ) + continue + else: + # Log other errors but continue processing + logger.warning( + f"Error verifying access to note {doc_id}: " + f"{e.response.status_code}" + ) + continue + else: + # No access verification, return result directly + results.append( + SearchResult( + id=doc_id, + doc_type=doc_type, + title=result.payload["title"], + excerpt=result.payload["excerpt"], + score=result.score, + metadata={ + "chunk_index": result.payload.get("chunk_index"), + "total_chunks": result.payload.get("total_chunks"), + }, + ) + ) + + if len(results) >= limit: + break + + return results