refactor!: Make all search algorithms query Qdrant payload, not Nextcloud

BREAKING CHANGE: Search algorithms now require Qdrant to be populated.
Vector sync must be enabled and documents indexed for search to work.

- Keyword and fuzzy search now query Qdrant scroll API for title/excerpt
- Remove inefficient Nextcloud API fetching pattern
- Add optional Nextcloud verification for security
- Deduplicate by (doc_id, doc_type) tuple, keeping chunk_index=0
- Align with document processor pattern that already stores text in Qdrant
This commit is contained in:
Chris Coutinho
2025-11-15 01:56:41 +01:00
parent b5b03bfd78
commit 2a078093ed
3 changed files with 356 additions and 182 deletions
+153 -86
View File
@@ -1,14 +1,18 @@
"""Fuzzy search algorithm using character overlap matching."""
"""Fuzzy search algorithm using character overlap matching on Qdrant payload."""
import logging
from typing import Any
from httpx import HTTPStatusError
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.search.algorithms import (
NextcloudClientProtocol,
SearchAlgorithm,
SearchResult,
get_indexed_doc_types,
)
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -45,25 +49,24 @@ class FuzzySearchAlgorithm(SearchAlgorithm):
nextcloud_client: NextcloudClientProtocol | None = None,
**kwargs: Any,
) -> list[SearchResult]:
"""Execute fuzzy search using character overlap.
"""Execute fuzzy search using character overlap on Qdrant payload.
Queries Qdrant for all indexed documents, then scores based on character
overlap in title and excerpt fields. Only verifies access with Nextcloud
at the end for security.
Args:
query: Search query
user_id: User ID for filtering
limit: Maximum results to return
doc_type: Optional document type filter (currently only "note" supported)
nextcloud_client: NextcloudClient for fetching documents
doc_type: Optional document type filter (None = all types)
nextcloud_client: NextcloudClient for access verification (optional)
**kwargs: Additional parameters (threshold override)
Returns:
List of SearchResult objects ranked by character overlap score
Raises:
ValueError: If nextcloud_client not provided
"""
if not nextcloud_client:
raise ValueError("FuzzySearch requires nextcloud_client parameter")
settings = get_settings()
threshold = kwargs.get("threshold", self.threshold)
logger.info(
@@ -71,112 +74,176 @@ class FuzzySearchAlgorithm(SearchAlgorithm):
f"limit={limit}, threshold={threshold}, doc_type={doc_type}"
)
# Get available document types from Qdrant
indexed_types = await get_indexed_doc_types(user_id)
logger.debug(f"Indexed document types for user: {indexed_types}")
# Determine which types to search
# Build Qdrant filter
filter_conditions = [
FieldCondition(key="user_id", match=MatchValue(value=user_id))
]
if doc_type:
# Search specific type if requested
search_types = [doc_type] if doc_type in indexed_types else []
if not search_types:
logger.info(f"Doc type '{doc_type}' not indexed for user {user_id}")
return []
else:
# Search all indexed types
search_types = list(indexed_types)
filter_conditions.append(
FieldCondition(key="doc_type", match=MatchValue(value=doc_type))
)
# Fetch documents for each type and score them
all_documents = []
for dtype in search_types:
documents = await self._fetch_documents(nextcloud_client, dtype)
for doc in documents:
doc["_doc_type"] = dtype # Tag with type
all_documents.extend(documents)
# Scroll through Qdrant to get all matching documents
qdrant_client = await get_qdrant_client()
collection = settings.qdrant_collection
logger.debug(f"Fetched {len(all_documents)} total documents for fuzzy search")
all_points = []
offset = None
# Score and filter documents
# Scroll through all points matching filter
while True:
scroll_result, next_offset = await qdrant_client.scroll(
collection_name=collection,
scroll_filter=Filter(must=filter_conditions),
limit=100, # Batch size
offset=offset,
with_payload=["doc_id", "doc_type", "title", "excerpt", "chunk_index"],
with_vectors=False, # Don't need vectors
)
all_points.extend(scroll_result)
if next_offset is None:
break
offset = next_offset
logger.debug(f"Retrieved {len(all_points)} points from Qdrant for fuzzy search")
# Deduplicate by (doc_id, doc_type) - keep first chunk
seen_docs = {}
for point in all_points:
doc_id = int(point.payload["doc_id"])
dtype = point.payload.get("doc_type", "note")
doc_key = (doc_id, dtype)
chunk_idx = point.payload.get("chunk_index", 0)
if doc_key not in seen_docs or chunk_idx == 0:
seen_docs[doc_key] = point
logger.debug(f"Deduplicated to {len(seen_docs)} unique documents")
# Score each document based on fuzzy matches
scored_results = []
query_lower = query.lower()
for doc in all_documents:
dtype = doc.get("_doc_type", "note")
title = doc.get("title", "")
content = doc.get("content", "")
for doc_key, point in seen_docs.items():
doc_id, dtype = doc_key
title = point.payload.get("title", "")
excerpt = point.payload.get("excerpt", "")
# Check title match
title_score = self._calculate_char_overlap(query_lower, title.lower())
# Check content match
content_score = self._calculate_char_overlap(query_lower, content.lower())
# Check excerpt match
excerpt_score = self._calculate_char_overlap(query_lower, excerpt.lower())
# Use best score
best_score = max(title_score, content_score)
best_score = max(title_score, excerpt_score)
if best_score >= threshold:
# Extract excerpt based on which matched better
if title_score >= content_score:
excerpt = f"Title match: {title}"
else:
excerpt = self._extract_excerpt(content, max_length=200)
match_location = "title" if title_score >= excerpt_score else "excerpt"
scored_results.append(
SearchResult(
id=doc["id"],
doc_type=dtype,
title=title or "Untitled",
excerpt=excerpt,
score=best_score,
metadata={
"category": doc.get("category", ""),
"modified": doc.get("modified"),
"match_location": "title"
if title_score >= content_score
else "content",
},
)
{
"doc_id": doc_id,
"doc_type": dtype,
"title": title,
"excerpt": excerpt
if excerpt_score >= title_score
else f"Title match: {title}",
"score": best_score,
"match_location": match_location,
}
)
# Sort by score (descending) and limit
scored_results.sort(key=lambda x: x.score, reverse=True)
results = scored_results[:limit]
scored_results.sort(key=lambda x: x["score"], reverse=True)
top_results = scored_results[: limit * 2] # Get extra for access verification
logger.info(f"Fuzzy search returned {len(results)} matching notes")
if results:
# Verify access with Nextcloud (optional, for security)
final_results = []
if nextcloud_client:
for result in top_results:
verified = await self._verify_access(
nextcloud_client, result["doc_id"], result["doc_type"]
)
if verified:
final_results.append(
SearchResult(
id=result["doc_id"],
doc_type=result["doc_type"],
title=result["title"],
excerpt=result["excerpt"],
score=result["score"],
metadata={
**verified.get("metadata", {}),
"match_location": result["match_location"],
},
)
)
if len(final_results) >= limit:
break
else:
# No verification, return results directly
for result in top_results[:limit]:
final_results.append(
SearchResult(
id=result["doc_id"],
doc_type=result["doc_type"],
title=result["title"],
excerpt=result["excerpt"],
score=result["score"],
metadata={"match_location": result["match_location"]},
)
)
logger.info(f"Fuzzy search returned {len(final_results)} matching documents")
if final_results:
result_details = [
f"note_{r.id} (score={r.score:.3f}, title='{r.title}')"
for r in results[:5]
f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')"
for r in final_results[:5]
]
logger.debug(f"Top fuzzy results: {', '.join(result_details)}")
return results
return final_results
async def _fetch_documents(
self, nextcloud_client: NextcloudClientProtocol, doc_type: str
) -> list[dict[str, Any]]:
"""Fetch documents of a specific type from Nextcloud.
async def _verify_access(
self, nextcloud_client: NextcloudClientProtocol, doc_id: int, doc_type: str
) -> dict[str, Any] | None:
"""Verify user has access to a document via Nextcloud API.
Args:
nextcloud_client: Client for API access
doc_type: Document type to fetch ("note", "file", "calendar", etc.)
doc_id: Document ID
doc_type: Document type
Returns:
List of document dictionaries with at minimum: id, title, content
Dict with metadata if access verified, None otherwise
"""
if doc_type == "note":
return await nextcloud_client.notes.get_notes()
elif doc_type == "file":
# Future: fetch files when indexed
logger.info("File documents not yet supported for fuzzy search")
return []
elif doc_type == "calendar":
# Future: fetch calendar events when indexed
logger.info("Calendar documents not yet supported for fuzzy search")
return []
else:
logger.warning(f"Unknown document type '{doc_type}' for fuzzy search")
return []
try:
if doc_type == "note":
note = await nextcloud_client.notes.get_note(doc_id)
return {
"metadata": {
"category": note.get("category", ""),
"modified": note.get("modified"),
}
}
else:
logger.debug(
f"Skipping verification for {doc_type} {doc_id} (not implemented)"
)
return {"metadata": {}}
except HTTPStatusError as e:
if e.response.status_code in (403, 404):
logger.debug(
f"Access denied for {doc_type} {doc_id}: {e.response.status_code}"
)
return None
else:
logger.warning(
f"Error verifying {doc_type} {doc_id}: {e.response.status_code}"
)
return None
def _calculate_char_overlap(self, query: str, text: str) -> float:
"""Calculate character overlap ratio between query and text.
+161 -86
View File
@@ -1,14 +1,18 @@
"""Keyword search algorithm using token-based matching (ADR-001)."""
"""Keyword search algorithm using token-based matching on Qdrant payload (ADR-001)."""
import logging
from typing import Any
from httpx import HTTPStatusError
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.search.algorithms import (
NextcloudClientProtocol,
SearchAlgorithm,
SearchResult,
get_indexed_doc_types,
)
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -39,24 +43,24 @@ class KeywordSearchAlgorithm(SearchAlgorithm):
nextcloud_client: NextcloudClientProtocol | None = None,
**kwargs: Any,
) -> list[SearchResult]:
"""Execute keyword search using token matching.
"""Execute keyword search using token matching on Qdrant payload.
Queries Qdrant for all indexed documents, then scores based on token
matches in title and excerpt fields. Only verifies access with Nextcloud
at the end for security.
Args:
query: Search query to tokenize and match
user_id: User ID for filtering
limit: Maximum results to return
doc_type: Optional document type filter (currently only "note" supported)
nextcloud_client: NextcloudClient for fetching documents
doc_type: Optional document type filter (None = all types)
nextcloud_client: NextcloudClient for access verification (optional)
**kwargs: Additional parameters (unused)
Returns:
List of SearchResult objects ranked by keyword match score
Raises:
ValueError: If nextcloud_client not provided
"""
if not nextcloud_client:
raise ValueError("KeywordSearch requires nextcloud_client parameter")
settings = get_settings()
logger.info(
f"Keyword search: query='{query}', user={user_id}, "
@@ -67,102 +71,173 @@ class KeywordSearchAlgorithm(SearchAlgorithm):
query_tokens = self._process_query(query)
logger.debug(f"Query tokens: {query_tokens}")
# Get available document types from Qdrant
indexed_types = await get_indexed_doc_types(user_id)
logger.debug(f"Indexed document types for user: {indexed_types}")
# Determine which types to search
# Build Qdrant filter
filter_conditions = [
FieldCondition(key="user_id", match=MatchValue(value=user_id))
]
if doc_type:
# Search specific type if requested
search_types = [doc_type] if doc_type in indexed_types else []
if not search_types:
logger.info(f"Doc type '{doc_type}' not indexed for user {user_id}")
return []
else:
# Search all indexed types
search_types = list(indexed_types)
# Fetch documents for each type and score them
all_documents = []
for dtype in search_types:
documents = await self._fetch_documents(nextcloud_client, dtype)
for doc in documents:
doc["_doc_type"] = dtype # Tag with type
all_documents.extend(documents)
logger.debug(f"Fetched {len(all_documents)} total documents for keyword search")
# Score and filter documents
scored_results = []
for doc in all_documents:
dtype = doc.get("_doc_type", "note")
score = self._calculate_score(
query_tokens,
doc.get("title", ""),
doc.get("content", ""),
filter_conditions.append(
FieldCondition(key="doc_type", match=MatchValue(value=doc_type))
)
if score > 0: # Only include matches
# Extract excerpt with context
excerpt = self._extract_excerpt(
doc.get("content", ""),
query_tokens,
max_length=200,
)
# Scroll through Qdrant to get all matching documents
# We need title and excerpt from payload for token matching
qdrant_client = await get_qdrant_client()
collection = settings.qdrant_collection
all_points = []
offset = None
# Scroll through all points matching filter
while True:
scroll_result, next_offset = await qdrant_client.scroll(
collection_name=collection,
scroll_filter=Filter(must=filter_conditions),
limit=100, # Batch size
offset=offset,
with_payload=[
"doc_id",
"doc_type",
"title",
"excerpt",
"chunk_index",
"total_chunks",
],
with_vectors=False, # Don't need vectors for keyword search
)
all_points.extend(scroll_result)
if next_offset is None:
break
offset = next_offset
logger.debug(
f"Retrieved {len(all_points)} points from Qdrant for keyword search"
)
# Deduplicate by (doc_id, doc_type) - keep first chunk per document
seen_docs = {}
for point in all_points:
doc_id = int(point.payload["doc_id"])
dtype = point.payload.get("doc_type", "note")
doc_key = (doc_id, dtype)
# Keep first chunk (chunk_index=0) as it has the most relevant content
chunk_idx = point.payload.get("chunk_index", 0)
if doc_key not in seen_docs or chunk_idx == 0:
seen_docs[doc_key] = point
logger.debug(f"Deduplicated to {len(seen_docs)} unique documents")
# Score each document based on keyword matches
scored_results = []
for doc_key, point in seen_docs.items():
doc_id, dtype = doc_key
title = point.payload.get("title", "")
excerpt = point.payload.get("excerpt", "")
# Calculate keyword match score
score = self._calculate_score(query_tokens, title, excerpt)
if score > 0: # Only include matches
scored_results.append(
SearchResult(
id=doc["id"],
doc_type=dtype,
title=doc.get("title", "Untitled"),
excerpt=excerpt,
score=score,
metadata={
"category": doc.get("category", ""),
"modified": doc.get("modified"),
},
)
{
"doc_id": doc_id,
"doc_type": dtype,
"title": title,
"excerpt": excerpt,
"score": score,
}
)
# Sort by score (descending) and limit
scored_results.sort(key=lambda x: x.score, reverse=True)
results = scored_results[:limit]
scored_results.sort(key=lambda x: x["score"], reverse=True)
top_results = scored_results[: limit * 2] # Get extra for access verification
logger.info(f"Keyword search returned {len(results)} matching notes")
if results:
# Verify access with Nextcloud (optional, for security)
final_results = []
if nextcloud_client:
for result in top_results:
verified = await self._verify_access(
nextcloud_client, result["doc_id"], result["doc_type"]
)
if verified:
final_results.append(
SearchResult(
id=result["doc_id"],
doc_type=result["doc_type"],
title=result["title"],
excerpt=result["excerpt"],
score=result["score"],
metadata=verified.get("metadata", {}),
)
)
if len(final_results) >= limit:
break
else:
# No verification, return results directly
for result in top_results[:limit]:
final_results.append(
SearchResult(
id=result["doc_id"],
doc_type=result["doc_type"],
title=result["title"],
excerpt=result["excerpt"],
score=result["score"],
metadata={},
)
)
logger.info(f"Keyword search returned {len(final_results)} matching documents")
if final_results:
result_details = [
f"note_{r.id} (score={r.score:.3f}, title='{r.title}')"
for r in results[:5]
f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')"
for r in final_results[:5]
]
logger.debug(f"Top keyword results: {', '.join(result_details)}")
return results
return final_results
async def _fetch_documents(
self, nextcloud_client: NextcloudClientProtocol, doc_type: str
) -> list[dict[str, Any]]:
"""Fetch documents of a specific type from Nextcloud.
async def _verify_access(
self, nextcloud_client: NextcloudClientProtocol, doc_id: int, doc_type: str
) -> dict[str, Any] | None:
"""Verify user has access to a document via Nextcloud API.
Args:
nextcloud_client: Client for API access
doc_type: Document type to fetch ("note", "file", "calendar", etc.)
doc_id: Document ID
doc_type: Document type
Returns:
List of document dictionaries with at minimum: id, title, content
Dict with metadata if access verified, None otherwise
"""
if doc_type == "note":
return await nextcloud_client.notes.get_notes()
elif doc_type == "file":
# Future: fetch files when indexed
logger.info("File documents not yet supported for keyword search")
return []
elif doc_type == "calendar":
# Future: fetch calendar events when indexed
logger.info("Calendar documents not yet supported for keyword search")
return []
else:
logger.warning(f"Unknown document type '{doc_type}' for keyword search")
return []
try:
if doc_type == "note":
note = await nextcloud_client.notes.get_note(doc_id)
return {
"metadata": {
"category": note.get("category", ""),
"modified": note.get("modified"),
}
}
# Future: Add verification for other document types
else:
logger.debug(
f"Skipping verification for {doc_type} {doc_id} (not implemented)"
)
return {"metadata": {}}
except HTTPStatusError as e:
if e.response.status_code in (403, 404):
logger.debug(
f"Access denied for {doc_type} {doc_id}: {e.response.status_code}"
)
return None
else:
logger.warning(
f"Error verifying {doc_type} {doc_id}: {e.response.status_code}"
)
return None
def _process_query(self, query: str) -> list[str]:
"""Tokenize and normalize query.
+42 -10
View File
@@ -45,6 +45,7 @@ def configure_semantic_tools(mcp: FastMCP):
query: str,
ctx: Context,
limit: int = 10,
doc_types: list[str] | None = None,
score_threshold: float = 0.7,
algorithm: Literal["semantic", "keyword", "fuzzy", "hybrid"] = "hybrid",
semantic_weight: float = 0.5,
@@ -52,7 +53,7 @@ def configure_semantic_tools(mcp: FastMCP):
fuzzy_weight: float = 0.2,
) -> SemanticSearchResponse:
"""
Search Nextcloud content using configurable algorithms.
Search Nextcloud content using configurable algorithms with cross-app support.
Supports multiple search algorithms with client-configurable weighting:
- semantic: Vector similarity search (requires VECTOR_SYNC_ENABLED=true)
@@ -60,9 +61,13 @@ def configure_semantic_tools(mcp: FastMCP):
- fuzzy: Character overlap matching (typo-tolerant)
- hybrid: Combines all algorithms using Reciprocal Rank Fusion (default)
Document types are queried from the vector database to determine what's
actually indexed. Currently only "note" documents are fully supported.
Args:
query: Natural language search query
limit: Maximum number of results to return (default: 10)
doc_types: Document types to search (e.g., ["note", "file"]). None = search all indexed types (default)
score_threshold: Minimum similarity score for semantic/hybrid (0-1, default: 0.7)
algorithm: Search algorithm to use (default: "hybrid")
semantic_weight: Weight for semantic results in hybrid mode (default: 0.5)
@@ -116,15 +121,42 @@ def configure_semantic_tools(mcp: FastMCP):
ErrorData(code=-1, message=f"Unknown algorithm: {algorithm}")
)
# Execute search (currently limited to notes doc_type)
search_results = await search_algo.search(
query=query,
user_id=username,
limit=limit,
doc_type="note",
nextcloud_client=client,
score_threshold=score_threshold,
)
# Execute search across requested document types
# If doc_types is None, search all indexed types (cross-app search)
# If doc_types is a list, search only those types
all_results = []
if doc_types is None:
# Cross-app search: search all indexed types
# Pass None to search algorithm to let it query Qdrant for available types
search_results = await search_algo.search(
query=query,
user_id=username,
limit=limit,
doc_type=None, # Signal to search all types
nextcloud_client=client,
score_threshold=score_threshold,
)
all_results.extend(search_results)
else:
# Search specific document types
# For each requested type, execute search and combine results
for dtype in doc_types:
search_results = await search_algo.search(
query=query,
user_id=username,
limit=limit * 2, # Get extra for combining
doc_type=dtype,
nextcloud_client=client,
score_threshold=score_threshold,
)
all_results.extend(search_results)
# Sort combined results by score and limit
all_results.sort(key=lambda r: r.score, reverse=True)
all_results = all_results[:limit]
search_results = all_results
# Convert SearchResult objects to SemanticSearchResult for response
results = []