Files
nextcloud-mcp-server/nextcloud_mcp_server/search/bm25_hybrid.py
T
Chris Coutinho 13b2d0048c feat: Implement Qdrant placeholder state management
Introduces a placeholder-based state tracking system to prevent duplicate
document processing during the gap between scanner queuing and processor
completion.

**Key Changes:**

1. **Placeholder Helper Functions** (`vector/placeholder.py`):
   - `write_placeholder_point()` - Creates zero-vector placeholder when queuing
   - `query_document_metadata()` - Queries for existing entry (placeholder or real)
   - `delete_placeholder_point()` - Removes placeholder before writing real vectors
   - `get_placeholder_filter()` - Filters placeholders from user-facing queries

2. **Scanner Updates** (`vector/scanner.py`):
   - Replace `indexed_at` comparison with `modified_at` comparison
   - Write placeholder before queuing each document
   - Query per-document metadata instead of bulk-querying indexed_at
   - Fixes bug where files were resubmitted every scan cycle

3. **Processor Updates** (`vector/processor.py`):
   - Delete placeholder before upserting real vectors
   - Ensures no duplicate points in Qdrant

4. **Query Filters** (all search files):
   - Add `get_placeholder_filter()` to all user-facing queries
   - Ensures placeholders never appear in search results or visualizations
   - Applied to: bm25_hybrid.py, semantic.py, viz_routes.py, algorithms.py

**Architecture:**
- Placeholders use zero vectors with dimension from embedding service
- Payload includes `is_placeholder: True` flag for filtering
- Status field tracks: "pending", "processing", "completed", "failed"
- Deterministic UUIDs using uuid5 for consistent point IDs

**Impact:**
- Eliminates duplicate processing of same documents
- Fixes race condition where long-running documents get queued multiple times
- Prevents scanner from resubmitting files every scan cycle
- Maintains clean separation between in-flight and indexed documents

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 15:04:00 +01:00

236 lines
9.2 KiB
Python

"""BM25 hybrid search algorithm using Qdrant native RRF fusion."""
import logging
from typing import Any
from qdrant_client import models
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
class BM25HybridSearchAlgorithm(SearchAlgorithm):
"""
Hybrid search combining dense semantic vectors with BM25 sparse vectors.
Uses Qdrant's native Reciprocal Rank Fusion (RRF) to automatically merge
results from both dense (semantic) and sparse (BM25 keyword) searches.
This provides the best of both worlds: semantic understanding for conceptual
queries and precise keyword matching for specific terms, acronyms, and codes.
The fusion happens efficiently in the database using the prefetch mechanism,
eliminating the need for application-layer result merging.
"""
def __init__(self, score_threshold: float = 0.0, fusion: str = "rrf"):
"""
Initialize BM25 hybrid search algorithm.
Args:
score_threshold: Minimum fusion score (0-1, default: 0.0 to allow fusion scoring)
Note: Both RRF and DBSF produce normalized scores
fusion: Fusion algorithm to use: "rrf" (Reciprocal Rank Fusion, default)
or "dbsf" (Distribution-Based Score Fusion)
Raises:
ValueError: If fusion is not "rrf" or "dbsf"
"""
if fusion not in ("rrf", "dbsf"):
raise ValueError(
f"Invalid fusion algorithm '{fusion}'. Must be 'rrf' or 'dbsf'"
)
self.score_threshold = score_threshold
self.fusion = models.Fusion.RRF if fusion == "rrf" else models.Fusion.DBSF
self.fusion_name = fusion
@property
def name(self) -> str:
return "bm25_hybrid"
@property
def requires_vector_db(self) -> bool:
return True
async def search(
self,
query: str,
user_id: str,
limit: int = 10,
doc_type: str | None = None,
**kwargs: Any,
) -> list[SearchResult]:
"""
Execute hybrid search using dense + sparse vectors with native RRF fusion.
Returns unverified results from Qdrant. Access verification should be
performed separately at the final output stage using verify_search_results().
Deduplicates by (doc_id, doc_type, chunk_start_offset, chunk_end_offset)
to show multiple chunks from the same document while avoiding duplicate chunks.
Args:
query: Natural language or keyword search query
user_id: User ID for filtering
limit: Maximum results to return
doc_type: Optional document type filter
**kwargs: Additional parameters (score_threshold override)
Returns:
List of unverified SearchResult objects ranked by RRF fusion score
Raises:
McpError: If vector sync is not enabled or search fails
"""
settings = get_settings()
score_threshold = kwargs.get("score_threshold", self.score_threshold)
logger.info(
f"BM25 hybrid search: query='{query}', user={user_id}, "
f"limit={limit}, score_threshold={score_threshold}, doc_type={doc_type}, "
f"fusion={self.fusion_name}"
)
# Generate dense embedding for semantic search
embedding_service = get_embedding_service()
dense_embedding = await embedding_service.embed(query)
logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})")
# Generate sparse embedding for BM25 keyword search
bm25_service = get_bm25_service()
sparse_embedding = bm25_service.encode(query)
logger.debug(
f"Generated sparse embedding "
f"({len(sparse_embedding['indices'])} non-zero terms)"
)
# Build Qdrant filter
filter_conditions = [
get_placeholder_filter(), # Always exclude placeholders from user-facing queries
FieldCondition(
key="user_id",
match=MatchValue(value=user_id),
),
]
# Add doc_type filter if specified
if doc_type:
filter_conditions.append(
FieldCondition(
key="doc_type",
match=MatchValue(value=doc_type),
)
)
query_filter = Filter(must=filter_conditions)
# Execute hybrid search with Qdrant native RRF fusion
qdrant_client = await get_qdrant_client()
try:
# Use prefetch to run both dense and sparse searches
# Qdrant will automatically merge results using RRF
search_response = await qdrant_client.query_points(
collection_name=settings.get_collection_name(),
prefetch=[
# Dense semantic search
models.Prefetch(
query=dense_embedding,
using="dense",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
# Sparse BM25 search
models.Prefetch(
query=models.SparseVector(
indices=sparse_embedding["indices"],
values=sparse_embedding["values"],
),
using="sparse",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
],
# Fusion query (RRF or DBSF based on initialization)
query=models.FusionQuery(fusion=self.fusion),
limit=limit * 2, # Get extra for deduplication
score_threshold=score_threshold,
with_payload=True,
with_vectors=False, # Don't return vectors to save bandwidth
)
record_qdrant_operation("search", "success")
except Exception:
record_qdrant_operation("search", "error")
raise
logger.info(
f"Qdrant {self.fusion_name.upper()} fusion returned {len(search_response.points)} results "
f"(before deduplication)"
)
if search_response.points:
# Log top 3 fusion scores to help with threshold tuning
top_scores = [p.score for p in search_response.points[:3]]
logger.debug(
f"Top 3 {self.fusion_name.upper()} fusion scores: {top_scores}"
)
# Deduplicate by (doc_id, doc_type, chunk_start, chunk_end)
# This allows multiple chunks from same doc, but removes duplicate chunks
seen_chunks = set()
results = []
for result in search_response.points:
# doc_id can be int (notes) or str (files - file paths)
doc_id = result.payload["doc_id"]
doc_type = result.payload.get("doc_type", "note")
chunk_start = result.payload.get("chunk_start_offset")
chunk_end = result.payload.get("chunk_end_offset")
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
# Skip if we've already seen this exact chunk
if chunk_key in seen_chunks:
continue
seen_chunks.add(chunk_key)
# Return unverified results (verification happens at output stage)
results.append(
SearchResult(
id=doc_id,
doc_type=doc_type,
title=result.payload.get("title", "Untitled"),
excerpt=result.payload.get("excerpt", ""),
score=result.score, # Fusion score (RRF or DBSF)
metadata={
"chunk_index": result.payload.get("chunk_index"),
"total_chunks": result.payload.get("total_chunks"),
"search_method": f"bm25_hybrid_{self.fusion_name}",
},
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
)
)
if len(results) >= limit:
break
logger.info(f"Returning {len(results)} unverified results after deduplication")
if results:
result_details = [
f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')"
for r in results[:5] # Show top 5
]
logger.debug(f"Top results: {', '.join(result_details)}")
return results