feat: Implement unified search algorithm module

Creates shared search module with four algorithms implementing ADR-012:
- Semantic search (vector similarity via Qdrant)
- Keyword search (token-based matching from ADR-001)
- Fuzzy search (character overlap matching)
- Hybrid search (RRF fusion from ADR-003)

Architecture:
- Base SearchAlgorithm interface for consistent API
- SearchResult dataclass for unified result format
- All algorithms async and independently testable
- Proper logging and error handling throughout

Semantic Search (search/semantic.py):
- Extracted from server/semantic.py
- Vector similarity using Qdrant query_points
- Dual-phase authorization (vector filter + API verification)
- Deduplication of document chunks
- Configurable score threshold (default: 0.7)

Keyword Search (search/keyword.py):
- Implements ADR-001 token-based matching
- Title matches weighted 3x higher than content
- Case-insensitive token matching
- Relevance scoring with normalization
- Excerpt extraction with context

Fuzzy Search (search/fuzzy.py):
- Simple character overlap calculation
- Configurable threshold (default: 70%)
- Typo-tolerant matching
- Fast and dependency-free

Hybrid Search (search/hybrid.py):
- Reciprocal Rank Fusion (RRF) from ADR-003
- Parallel execution of sub-algorithms
- Configurable weights per algorithm
- RRF constant k=60 (standard value)
- Weight validation (must sum ≤1.0)

All algorithms:
- Share NextcloudClient for document access
- Support user_id filtering (multi-tenant)
- Support doc_type filtering (currently notes only)
- Return consistent SearchResult objects
- Properly formatted with ruff and type-checked

Next steps: Update MCP tool to use these algorithms

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-15 00:10:19 +01:00
parent 56bd85c0f7
commit 11e620f2d1
6 changed files with 981 additions and 0 deletions
+26
View File
@@ -0,0 +1,26 @@
"""Search algorithms module for unified multi-algorithm search.
This module provides a unified interface for different search algorithms:
- Semantic search (vector similarity)
- Keyword search (token-based matching)
- Fuzzy search (character overlap)
- Hybrid search (RRF fusion of multiple algorithms)
All algorithms share the same interface and can be used interchangeably by both
MCP tools and the visualization pane.
"""
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.search.fuzzy import FuzzySearchAlgorithm
from nextcloud_mcp_server.search.hybrid import HybridSearchAlgorithm
from nextcloud_mcp_server.search.keyword import KeywordSearchAlgorithm
from nextcloud_mcp_server.search.semantic import SemanticSearchAlgorithm
__all__ = [
"SearchAlgorithm",
"SearchResult",
"SemanticSearchAlgorithm",
"KeywordSearchAlgorithm",
"FuzzySearchAlgorithm",
"HybridSearchAlgorithm",
]
+87
View File
@@ -0,0 +1,87 @@
"""Base interfaces and data structures for search algorithms."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
@dataclass
class SearchResult:
"""A single search result with metadata and score.
Attributes:
id: Document ID
doc_type: Document type (note, file, calendar, contact, etc.)
title: Document title
excerpt: Content excerpt showing match context
score: Relevance score (0.0-1.0, higher is better)
metadata: Additional algorithm-specific metadata
"""
id: int
doc_type: str
title: str
excerpt: str
score: float
metadata: dict[str, Any] | None = None
def __post_init__(self):
"""Validate score is in valid range."""
if not 0.0 <= self.score <= 1.0:
raise ValueError(f"Score must be between 0.0 and 1.0, got {self.score}")
class SearchAlgorithm(ABC):
"""Abstract base class for search algorithms.
All search algorithms must implement the search() method with consistent
interface, allowing them to be used interchangeably.
"""
@abstractmethod
async def search(
self,
query: str,
user_id: str,
limit: int = 10,
doc_type: str | None = None,
**kwargs: Any,
) -> list[SearchResult]:
"""Execute search with the given parameters.
Args:
query: Search query string
user_id: User ID for multi-tenant filtering
limit: Maximum number of results to return
doc_type: Optional document type filter (note, file, calendar, etc.)
**kwargs: Algorithm-specific parameters
Returns:
List of SearchResult objects ranked by relevance
Raises:
McpError: If search fails or configuration is invalid
"""
pass
@property
@abstractmethod
def name(self) -> str:
"""Return algorithm name for identification."""
pass
@property
def supports_scoring(self) -> bool:
"""Whether this algorithm provides meaningful relevance scores.
Default: True. Override if algorithm doesn't support scoring.
"""
return True
@property
def requires_vector_db(self) -> bool:
"""Whether this algorithm requires vector database.
Default: False. Override for semantic search.
"""
return False
+174
View File
@@ -0,0 +1,174 @@
"""Fuzzy search algorithm using character overlap matching."""
import logging
from typing import Any
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
logger = logging.getLogger(__name__)
class FuzzySearchAlgorithm(SearchAlgorithm):
"""Fuzzy search using simple character-based similarity.
Implements character overlap matching with configurable threshold:
- Compares character sets between query and text
- Requires configurable % character overlap to match (default: 70%)
- Tolerant to typos and minor variations
"""
def __init__(self, threshold: float = 0.7):
"""Initialize fuzzy search algorithm.
Args:
threshold: Minimum character overlap ratio (0-1, default: 0.7)
"""
if not 0.0 <= threshold <= 1.0:
raise ValueError(f"Threshold must be between 0.0 and 1.0, got {threshold}")
self.threshold = threshold
@property
def name(self) -> str:
return "fuzzy"
async def search(
self,
query: str,
user_id: str,
limit: int = 10,
doc_type: str | None = None,
nextcloud_client: NextcloudClient | None = None,
**kwargs: Any,
) -> list[SearchResult]:
"""Execute fuzzy search using character overlap.
Args:
query: Search query
user_id: User ID for filtering
limit: Maximum results to return
doc_type: Optional document type filter (currently only "note" supported)
nextcloud_client: NextcloudClient for fetching documents
**kwargs: Additional parameters (threshold override)
Returns:
List of SearchResult objects ranked by character overlap score
Raises:
ValueError: If nextcloud_client not provided
"""
if not nextcloud_client:
raise ValueError("FuzzySearch requires nextcloud_client parameter")
threshold = kwargs.get("threshold", self.threshold)
logger.info(
f"Fuzzy search: query='{query}', user={user_id}, "
f"limit={limit}, threshold={threshold}, doc_type={doc_type}"
)
# Currently only supports notes
if doc_type and doc_type != "note":
logger.warning(f"Fuzzy search not yet implemented for doc_type={doc_type}")
return []
# Fetch all notes for the user
notes = await nextcloud_client.notes.get_notes()
logger.debug(f"Fetched {len(notes)} notes for fuzzy search")
# Score and filter notes
scored_notes = []
query_lower = query.lower()
for note in notes:
title = note.get("title", "")
content = note.get("content", "")
# Check title match
title_score = self._calculate_char_overlap(query_lower, title.lower())
# Check content match
content_score = self._calculate_char_overlap(query_lower, content.lower())
# Use best score
best_score = max(title_score, content_score)
if best_score >= threshold:
# Extract excerpt based on which matched better
if title_score >= content_score:
excerpt = f"Title match: {title}"
else:
excerpt = self._extract_excerpt(content, max_length=200)
scored_notes.append(
SearchResult(
id=note["id"],
doc_type="note",
title=title or "Untitled",
excerpt=excerpt,
score=best_score,
metadata={
"category": note.get("category", ""),
"modified": note.get("modified"),
"match_location": "title"
if title_score >= content_score
else "content",
},
)
)
# Sort by score (descending) and limit
scored_notes.sort(key=lambda x: x.score, reverse=True)
results = scored_notes[:limit]
logger.info(f"Fuzzy search returned {len(results)} matching notes")
if results:
result_details = [
f"note_{r.id} (score={r.score:.3f}, title='{r.title}')"
for r in results[:5]
]
logger.debug(f"Top fuzzy results: {', '.join(result_details)}")
return results
def _calculate_char_overlap(self, query: str, text: str) -> float:
"""Calculate character overlap ratio between query and text.
Args:
query: Query string (normalized)
text: Text to compare (normalized)
Returns:
Overlap ratio (0.0-1.0)
"""
if not query or not text:
return 0.0
# Convert to character sets
query_chars = set(query)
text_chars = set(text)
# Calculate overlap
overlap = query_chars & text_chars
overlap_ratio = len(overlap) / len(query_chars)
return overlap_ratio
def _extract_excerpt(self, content: str, max_length: int = 200) -> str:
"""Extract excerpt from content.
Args:
content: Full document content
max_length: Maximum excerpt length
Returns:
Excerpt string
"""
if not content:
return ""
excerpt = content[:max_length].strip()
if len(content) > max_length:
excerpt += "..."
return excerpt
+240
View File
@@ -0,0 +1,240 @@
"""Hybrid search algorithm using Reciprocal Rank Fusion (RRF)."""
import asyncio
import logging
from collections import defaultdict
from typing import Any
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.search.fuzzy import FuzzySearchAlgorithm
from nextcloud_mcp_server.search.keyword import KeywordSearchAlgorithm
from nextcloud_mcp_server.search.semantic import SemanticSearchAlgorithm
logger = logging.getLogger(__name__)
class HybridSearchAlgorithm(SearchAlgorithm):
"""Hybrid search combining multiple algorithms using Reciprocal Rank Fusion.
Implements RRF from ADR-003 to combine results from:
- Semantic search (vector similarity)
- Keyword search (token matching)
- Fuzzy search (character overlap)
RRF formula: score = weight / (k + rank)
where k=60 (standard value) and rank is 1-indexed position.
"""
DEFAULT_RRF_K = 60 # Standard RRF constant
def __init__(
self,
semantic_weight: float = 0.5,
keyword_weight: float = 0.3,
fuzzy_weight: float = 0.2,
rrf_k: int = DEFAULT_RRF_K,
):
"""Initialize hybrid search with algorithm weights.
Args:
semantic_weight: Weight for semantic results (default: 0.5)
keyword_weight: Weight for keyword results (default: 0.3)
fuzzy_weight: Weight for fuzzy results (default: 0.2)
rrf_k: RRF constant for rank decay (default: 60)
Raises:
ValueError: If weights are invalid
"""
# Validate weights
if semantic_weight < 0 or keyword_weight < 0 or fuzzy_weight < 0:
raise ValueError("Weights must be non-negative")
total_weight = semantic_weight + keyword_weight + fuzzy_weight
if total_weight > 1.0:
raise ValueError(f"Weights sum to {total_weight:.2f}, must be ≤1.0")
if total_weight == 0.0:
raise ValueError("At least one weight must be > 0")
self.semantic_weight = semantic_weight
self.keyword_weight = keyword_weight
self.fuzzy_weight = fuzzy_weight
self.rrf_k = rrf_k
# Initialize sub-algorithms
self.semantic = SemanticSearchAlgorithm()
self.keyword = KeywordSearchAlgorithm()
self.fuzzy = FuzzySearchAlgorithm()
@property
def name(self) -> str:
return "hybrid"
@property
def requires_vector_db(self) -> bool:
# Requires vector DB if semantic search has non-zero weight
return self.semantic_weight > 0
async def search(
self,
query: str,
user_id: str,
limit: int = 10,
doc_type: str | None = None,
nextcloud_client: NextcloudClient | None = None,
**kwargs: Any,
) -> list[SearchResult]:
"""Execute hybrid search using RRF to combine algorithms.
Args:
query: Search query
user_id: User ID for filtering
limit: Maximum results to return
doc_type: Optional document type filter
nextcloud_client: NextcloudClient for document access
**kwargs: Additional parameters passed to sub-algorithms
Returns:
List of SearchResult objects ranked by RRF combined score
Raises:
ValueError: If nextcloud_client not provided (needed for keyword/fuzzy)
"""
logger.info(
f"Hybrid search: query='{query}', user={user_id}, limit={limit}, "
f"weights=(semantic={self.semantic_weight}, keyword={self.keyword_weight}, "
f"fuzzy={self.fuzzy_weight})"
)
# Run algorithms in parallel
tasks = []
algo_names = []
if self.semantic_weight > 0:
tasks.append(
self.semantic.search(
query, user_id, limit * 2, doc_type, nextcloud_client, **kwargs
)
)
algo_names.append("semantic")
if self.keyword_weight > 0:
if not nextcloud_client:
raise ValueError("Hybrid search with keyword requires nextcloud_client")
tasks.append(
self.keyword.search(
query, user_id, limit * 2, doc_type, nextcloud_client, **kwargs
)
)
algo_names.append("keyword")
if self.fuzzy_weight > 0:
if not nextcloud_client:
raise ValueError("Hybrid search with fuzzy requires nextcloud_client")
tasks.append(
self.fuzzy.search(
query, user_id, limit * 2, doc_type, nextcloud_client, **kwargs
)
)
algo_names.append("fuzzy")
# Execute searches in parallel
results_list = await asyncio.gather(*tasks)
# Build results dict
algo_results = {}
for algo_name, results in zip(algo_names, results_list):
algo_results[algo_name] = results
logger.debug(f"{algo_name} returned {len(results)} results")
# Combine using RRF
combined_results = self._reciprocal_rank_fusion(
algo_results,
{
"semantic": self.semantic_weight,
"keyword": self.keyword_weight,
"fuzzy": self.fuzzy_weight,
},
limit,
)
logger.info(f"Hybrid search returned {len(combined_results)} combined results")
if combined_results:
result_details = [
f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')"
for r in combined_results[:5]
]
logger.debug(f"Top hybrid results: {', '.join(result_details)}")
return combined_results
def _reciprocal_rank_fusion(
self,
algo_results: dict[str, list[SearchResult]],
weights: dict[str, float],
limit: int,
) -> list[SearchResult]:
"""Combine multiple ranked result lists using RRF.
Args:
algo_results: Dict of algorithm_name -> ranked results
weights: Dict of algorithm_name -> weight (0-1)
limit: Maximum results to return
Returns:
Combined and re-ranked results
"""
# Track RRF scores per document
rrf_scores: dict[tuple[int, str], float] = defaultdict(float)
# Track best result object for each document
best_results: dict[tuple[int, str], SearchResult] = {}
for algo_name, results in algo_results.items():
weight = weights.get(algo_name, 0.0)
if weight == 0:
continue
for rank, result in enumerate(results, start=1):
doc_key = (result.id, result.doc_type)
# RRF formula: weight / (k + rank)
rrf_score = weight / (self.rrf_k + rank)
rrf_scores[doc_key] += rrf_score
# Track best result object (prefer higher original scores)
if doc_key not in best_results:
best_results[doc_key] = result
elif result.score > best_results[doc_key].score:
best_results[doc_key] = result
# Sort by combined RRF score
sorted_docs = sorted(
rrf_scores.items(),
key=lambda x: x[1],
reverse=True,
)[:limit]
# Build final results with RRF scores
final_results = []
for doc_key, rrf_score in sorted_docs:
result = best_results[doc_key]
# Create new result with RRF score
# Keep original metadata but add RRF details
metadata = result.metadata or {}
metadata["rrf_score"] = rrf_score
metadata["original_score"] = result.score
final_results.append(
SearchResult(
id=result.id,
doc_type=result.doc_type,
title=result.title,
excerpt=result.excerpt,
score=rrf_score, # Use RRF score as the primary score
metadata=metadata,
)
)
return final_results
+225
View File
@@ -0,0 +1,225 @@
"""Keyword search algorithm using token-based matching (ADR-001)."""
import logging
from typing import Any
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
logger = logging.getLogger(__name__)
class KeywordSearchAlgorithm(SearchAlgorithm):
"""Keyword search using token-based matching with weighted scoring.
Implements token-based search from ADR-001:
- Title matches weighted 3x higher than content matches
- Case-insensitive token matching
- Relevance scoring based on match frequency and location
"""
# Weighting constants from ADR-001
TITLE_WEIGHT = 3.0
CONTENT_WEIGHT = 1.0
@property
def name(self) -> str:
return "keyword"
async def search(
self,
query: str,
user_id: str,
limit: int = 10,
doc_type: str | None = None,
nextcloud_client: NextcloudClient | None = None,
**kwargs: Any,
) -> list[SearchResult]:
"""Execute keyword search using token matching.
Args:
query: Search query to tokenize and match
user_id: User ID for filtering
limit: Maximum results to return
doc_type: Optional document type filter (currently only "note" supported)
nextcloud_client: NextcloudClient for fetching documents
**kwargs: Additional parameters (unused)
Returns:
List of SearchResult objects ranked by keyword match score
Raises:
ValueError: If nextcloud_client not provided
"""
if not nextcloud_client:
raise ValueError("KeywordSearch requires nextcloud_client parameter")
logger.info(
f"Keyword search: query='{query}', user={user_id}, "
f"limit={limit}, doc_type={doc_type}"
)
# Tokenize query
query_tokens = self._process_query(query)
logger.debug(f"Query tokens: {query_tokens}")
# Currently only supports notes
# TODO: Extend to other document types (files, calendar, etc.)
if doc_type and doc_type != "note":
logger.warning(
f"Keyword search not yet implemented for doc_type={doc_type}"
)
return []
# Fetch all notes for the user
notes = await nextcloud_client.notes.get_notes()
logger.debug(f"Fetched {len(notes)} notes for keyword search")
# Score and filter notes
scored_notes = []
for note in notes:
score = self._calculate_score(
query_tokens,
note.get("title", ""),
note.get("content", ""),
)
if score > 0: # Only include matches
# Extract excerpt with context
excerpt = self._extract_excerpt(
note.get("content", ""),
query_tokens,
max_length=200,
)
scored_notes.append(
SearchResult(
id=note["id"],
doc_type="note",
title=note.get("title", "Untitled"),
excerpt=excerpt,
score=score,
metadata={
"category": note.get("category", ""),
"modified": note.get("modified"),
},
)
)
# Sort by score (descending) and limit
scored_notes.sort(key=lambda x: x.score, reverse=True)
results = scored_notes[:limit]
logger.info(f"Keyword search returned {len(results)} matching notes")
if results:
result_details = [
f"note_{r.id} (score={r.score:.3f}, title='{r.title}')"
for r in results[:5]
]
logger.debug(f"Top keyword results: {', '.join(result_details)}")
return results
def _process_query(self, query: str) -> list[str]:
"""Tokenize and normalize query.
Args:
query: Raw query string
Returns:
List of normalized tokens
"""
# Convert to lowercase and split into tokens
tokens = query.lower().split()
# Filter out very short tokens (optional)
tokens = [token for token in tokens if len(token) > 1]
return tokens
def _calculate_score(
self, query_tokens: list[str], title: str, content: str
) -> float:
"""Calculate relevance score based on token matches.
Args:
query_tokens: List of query tokens
title: Document title
content: Document content
Returns:
Relevance score (0.0-1.0)
"""
if not query_tokens:
return 0.0
# Process title and content
title_tokens = title.lower().split()
content_tokens = content.lower().split()
score = 0.0
# Count matches in title
title_matches = sum(1 for qt in query_tokens if qt in title_tokens)
if query_tokens: # Avoid division by zero
title_match_ratio = title_matches / len(query_tokens)
score += self.TITLE_WEIGHT * title_match_ratio
# Count matches in content
content_matches = sum(1 for qt in query_tokens if qt in content_tokens)
if query_tokens:
content_match_ratio = content_matches / len(query_tokens)
score += self.CONTENT_WEIGHT * content_match_ratio
# Normalize score to 0-1 range
# Max score would be TITLE_WEIGHT + CONTENT_WEIGHT if all tokens match everywhere
max_score = self.TITLE_WEIGHT + self.CONTENT_WEIGHT
normalized_score = min(score / max_score, 1.0)
return normalized_score
def _extract_excerpt(
self, content: str, query_tokens: list[str], max_length: int = 200
) -> str:
"""Extract excerpt showing match context.
Args:
content: Full document content
query_tokens: Query tokens to find
max_length: Maximum excerpt length in characters
Returns:
Excerpt string with context around matches
"""
if not content:
return ""
content_lower = content.lower()
# Find first occurrence of any query token
first_match_pos = -1
for token in query_tokens:
pos = content_lower.find(token)
if pos != -1:
if first_match_pos == -1 or pos < first_match_pos:
first_match_pos = pos
if first_match_pos == -1:
# No matches found, return beginning
return content[:max_length].strip() + (
"..." if len(content) > max_length else ""
)
# Extract context around match
start = max(0, first_match_pos - max_length // 2)
end = min(len(content), first_match_pos + max_length // 2)
excerpt = content[start:end].strip()
# Add ellipsis if truncated
if start > 0:
excerpt = "..." + excerpt
if end < len(content):
excerpt = excerpt + "..."
return excerpt
+229
View File
@@ -0,0 +1,229 @@
"""Semantic search algorithm using vector similarity (Qdrant)."""
import logging
from typing import Any
from httpx import HTTPStatusError
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_embedding_service
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
class SemanticSearchAlgorithm(SearchAlgorithm):
"""Semantic search using vector similarity in Qdrant.
Searches documents by meaning rather than exact keywords using
768-dimensional embeddings and cosine distance.
"""
def __init__(self, score_threshold: float = 0.7):
"""Initialize semantic search algorithm.
Args:
score_threshold: Minimum similarity score (0-1, default: 0.7)
"""
self.score_threshold = score_threshold
@property
def name(self) -> str:
return "semantic"
@property
def requires_vector_db(self) -> bool:
return True
async def search(
self,
query: str,
user_id: str,
limit: int = 10,
doc_type: str | None = None,
nextcloud_client: NextcloudClient | None = None,
**kwargs: Any,
) -> list[SearchResult]:
"""Execute semantic search using vector similarity.
Args:
query: Natural language search query
user_id: User ID for filtering
limit: Maximum results to return
doc_type: Optional document type filter (currently only "note" supported)
nextcloud_client: NextcloudClient for access verification
**kwargs: Additional parameters (score_threshold override)
Returns:
List of SearchResult objects ranked by similarity score
Raises:
McpError: If vector sync is not enabled or search fails
"""
settings = get_settings()
score_threshold = kwargs.get("score_threshold", self.score_threshold)
logger.info(
f"Semantic search: query='{query}', user={user_id}, "
f"limit={limit}, score_threshold={score_threshold}, doc_type={doc_type}"
)
# Generate embedding for query
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
logger.debug(
f"Generated embedding for query (dimension={len(query_embedding)})"
)
# Build Qdrant filter
filter_conditions = [
FieldCondition(
key="user_id",
match=MatchValue(value=user_id),
)
]
# Add doc_type filter if specified
if doc_type:
filter_conditions.append(
FieldCondition(
key="doc_type",
match=MatchValue(value=doc_type),
)
)
# Search Qdrant
qdrant_client = await get_qdrant_client()
try:
search_response = await qdrant_client.query_points(
collection_name=settings.get_collection_name(),
query=query_embedding,
query_filter=Filter(must=filter_conditions),
limit=limit * 2, # Get extra for deduplication
score_threshold=score_threshold,
with_payload=True,
with_vectors=False, # Don't return vectors to save bandwidth
)
record_qdrant_operation("search", "success")
except Exception:
record_qdrant_operation("search", "error")
raise
logger.info(
f"Qdrant returned {len(search_response.points)} results "
f"(before deduplication and access verification)"
)
if search_response.points:
# Log top 3 scores to help with threshold tuning
top_scores = [p.score for p in search_response.points[:3]]
logger.debug(f"Top 3 similarity scores: {top_scores}")
# Deduplicate by document ID (multiple chunks per document)
results = await self._deduplicate_and_verify(
search_response.points, limit, nextcloud_client
)
logger.info(
f"Returning {len(results)} results after deduplication and access verification"
)
if results:
result_details = [
f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')"
for r in results[:5] # Show top 5
]
logger.debug(f"Top results: {', '.join(result_details)}")
return results
async def _deduplicate_and_verify(
self,
points: list[Any],
limit: int,
nextcloud_client: NextcloudClient | None,
) -> list[SearchResult]:
"""Deduplicate results by doc_id and verify access.
Args:
points: Qdrant search results
limit: Maximum results to return
nextcloud_client: NextcloudClient for access verification (optional)
Returns:
List of SearchResult objects
"""
seen_doc_ids = set()
results = []
for result in points:
doc_id = int(result.payload["doc_id"])
doc_type = result.payload.get("doc_type", "note")
# Skip if we've already seen this document
if doc_id in seen_doc_ids:
continue
seen_doc_ids.add(doc_id)
# Verify access via Nextcloud API if client provided
# Currently only supports notes
if nextcloud_client and doc_type == "note":
try:
note = await nextcloud_client.notes.get_note(doc_id)
results.append(
SearchResult(
id=doc_id,
doc_type="note",
title=result.payload["title"],
excerpt=result.payload["excerpt"],
score=result.score,
metadata={
"category": note.get("category", ""),
"chunk_index": result.payload["chunk_index"],
"total_chunks": result.payload["total_chunks"],
},
)
)
if len(results) >= limit:
break
except HTTPStatusError as e:
if e.response.status_code in (403, 404):
# User lost access or document deleted
logger.debug(
f"Skipping note {doc_id}: {e.response.status_code}"
)
continue
else:
# Log other errors but continue processing
logger.warning(
f"Error verifying access to note {doc_id}: "
f"{e.response.status_code}"
)
continue
else:
# No access verification, return result directly
results.append(
SearchResult(
id=doc_id,
doc_type=doc_type,
title=result.payload["title"],
excerpt=result.payload["excerpt"],
score=result.score,
metadata={
"chunk_index": result.payload.get("chunk_index"),
"total_chunks": result.payload.get("total_chunks"),
},
)
)
if len(results) >= limit:
break
return results