3464b21845
Fix false-positive validation error where DBSF (Distribution-Based Score Fusion) correctly produces scores > 1.0 but SearchResult validation incorrectly rejected them. **Root Cause**: SearchResult.__post_init__() enforced scores in [0.0, 1.0] range, but DBSF sums normalized scores from multiple retrieval systems (dense semantic + sparse BM25), resulting in scores like 1.55 when both systems strongly agree a document is relevant. **Changes**: - Relaxed validation to allow any score ≥ 0.0 (algorithms.py:147-157) - Updated SearchResult and SemanticSearchResult documentation to explain score ranges for RRF ([0.0, 1.0]) vs DBSF (unbounded) - Added comprehensive test coverage for both fusion methods - Added DBSF fusion option to vector visualization UI - Updated viz routes and vizApp() to support fusion parameter selection **Testing**: All 157 unit tests pass, type checking passes, ruff passes Fixes error: "Configuration error: Score must be between 0.0 and 1.0, got 1.1528953" 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
224 lines
8.4 KiB
Python
224 lines
8.4 KiB
Python
"""BM25 hybrid search algorithm using Qdrant native RRF fusion."""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from qdrant_client import models
|
|
from qdrant_client.models import FieldCondition, Filter, MatchValue
|
|
|
|
from nextcloud_mcp_server.config import get_settings
|
|
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
|
|
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
|
|
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
|
|
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BM25HybridSearchAlgorithm(SearchAlgorithm):
|
|
"""
|
|
Hybrid search combining dense semantic vectors with BM25 sparse vectors.
|
|
|
|
Uses Qdrant's native Reciprocal Rank Fusion (RRF) to automatically merge
|
|
results from both dense (semantic) and sparse (BM25 keyword) searches.
|
|
This provides the best of both worlds: semantic understanding for conceptual
|
|
queries and precise keyword matching for specific terms, acronyms, and codes.
|
|
|
|
The fusion happens efficiently in the database using the prefetch mechanism,
|
|
eliminating the need for application-layer result merging.
|
|
"""
|
|
|
|
def __init__(self, score_threshold: float = 0.0, fusion: str = "rrf"):
|
|
"""
|
|
Initialize BM25 hybrid search algorithm.
|
|
|
|
Args:
|
|
score_threshold: Minimum fusion score (0-1, default: 0.0 to allow fusion scoring)
|
|
Note: Both RRF and DBSF produce normalized scores
|
|
fusion: Fusion algorithm to use: "rrf" (Reciprocal Rank Fusion, default)
|
|
or "dbsf" (Distribution-Based Score Fusion)
|
|
|
|
Raises:
|
|
ValueError: If fusion is not "rrf" or "dbsf"
|
|
"""
|
|
if fusion not in ("rrf", "dbsf"):
|
|
raise ValueError(
|
|
f"Invalid fusion algorithm '{fusion}'. Must be 'rrf' or 'dbsf'"
|
|
)
|
|
|
|
self.score_threshold = score_threshold
|
|
self.fusion = models.Fusion.RRF if fusion == "rrf" else models.Fusion.DBSF
|
|
self.fusion_name = fusion
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "bm25_hybrid"
|
|
|
|
@property
|
|
def requires_vector_db(self) -> bool:
|
|
return True
|
|
|
|
async def search(
|
|
self,
|
|
query: str,
|
|
user_id: str,
|
|
limit: int = 10,
|
|
doc_type: str | None = None,
|
|
**kwargs: Any,
|
|
) -> list[SearchResult]:
|
|
"""
|
|
Execute hybrid search using dense + sparse vectors with native RRF fusion.
|
|
|
|
Returns unverified results from Qdrant. Access verification should be
|
|
performed separately at the final output stage using verify_search_results().
|
|
|
|
Args:
|
|
query: Natural language or keyword search query
|
|
user_id: User ID for filtering
|
|
limit: Maximum results to return
|
|
doc_type: Optional document type filter
|
|
**kwargs: Additional parameters (score_threshold override)
|
|
|
|
Returns:
|
|
List of unverified SearchResult objects ranked by RRF fusion score
|
|
|
|
Raises:
|
|
McpError: If vector sync is not enabled or search fails
|
|
"""
|
|
settings = get_settings()
|
|
score_threshold = kwargs.get("score_threshold", self.score_threshold)
|
|
|
|
logger.info(
|
|
f"BM25 hybrid search: query='{query}', user={user_id}, "
|
|
f"limit={limit}, score_threshold={score_threshold}, doc_type={doc_type}, "
|
|
f"fusion={self.fusion_name}"
|
|
)
|
|
|
|
# Generate dense embedding for semantic search
|
|
embedding_service = get_embedding_service()
|
|
dense_embedding = await embedding_service.embed(query)
|
|
logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})")
|
|
|
|
# Generate sparse embedding for BM25 keyword search
|
|
bm25_service = get_bm25_service()
|
|
sparse_embedding = bm25_service.encode(query)
|
|
logger.debug(
|
|
f"Generated sparse embedding "
|
|
f"({len(sparse_embedding['indices'])} non-zero terms)"
|
|
)
|
|
|
|
# Build Qdrant filter
|
|
filter_conditions = [
|
|
FieldCondition(
|
|
key="user_id",
|
|
match=MatchValue(value=user_id),
|
|
)
|
|
]
|
|
|
|
# Add doc_type filter if specified
|
|
if doc_type:
|
|
filter_conditions.append(
|
|
FieldCondition(
|
|
key="doc_type",
|
|
match=MatchValue(value=doc_type),
|
|
)
|
|
)
|
|
|
|
query_filter = Filter(must=filter_conditions)
|
|
|
|
# Execute hybrid search with Qdrant native RRF fusion
|
|
qdrant_client = await get_qdrant_client()
|
|
try:
|
|
# Use prefetch to run both dense and sparse searches
|
|
# Qdrant will automatically merge results using RRF
|
|
search_response = await qdrant_client.query_points(
|
|
collection_name=settings.get_collection_name(),
|
|
prefetch=[
|
|
# Dense semantic search
|
|
models.Prefetch(
|
|
query=dense_embedding,
|
|
using="dense",
|
|
limit=limit * 2, # Get extra for deduplication
|
|
filter=query_filter,
|
|
),
|
|
# Sparse BM25 search
|
|
models.Prefetch(
|
|
query=models.SparseVector(
|
|
indices=sparse_embedding["indices"],
|
|
values=sparse_embedding["values"],
|
|
),
|
|
using="sparse",
|
|
limit=limit * 2, # Get extra for deduplication
|
|
filter=query_filter,
|
|
),
|
|
],
|
|
# Fusion query (RRF or DBSF based on initialization)
|
|
query=models.FusionQuery(fusion=self.fusion),
|
|
limit=limit * 2, # Get extra for deduplication
|
|
score_threshold=score_threshold,
|
|
with_payload=True,
|
|
with_vectors=False, # Don't return vectors to save bandwidth
|
|
)
|
|
record_qdrant_operation("search", "success")
|
|
except Exception:
|
|
record_qdrant_operation("search", "error")
|
|
raise
|
|
|
|
logger.info(
|
|
f"Qdrant {self.fusion_name.upper()} fusion returned {len(search_response.points)} results "
|
|
f"(before deduplication)"
|
|
)
|
|
|
|
if search_response.points:
|
|
# Log top 3 fusion scores to help with threshold tuning
|
|
top_scores = [p.score for p in search_response.points[:3]]
|
|
logger.debug(
|
|
f"Top 3 {self.fusion_name.upper()} fusion scores: {top_scores}"
|
|
)
|
|
|
|
# Deduplicate by (doc_id, doc_type) - multiple chunks per document
|
|
seen_docs = set()
|
|
results = []
|
|
|
|
for result in search_response.points:
|
|
doc_id = int(result.payload["doc_id"])
|
|
doc_type = result.payload.get("doc_type", "note")
|
|
doc_key = (doc_id, doc_type)
|
|
|
|
# Skip if we've already seen this document
|
|
if doc_key in seen_docs:
|
|
continue
|
|
|
|
seen_docs.add(doc_key)
|
|
|
|
# Return unverified results (verification happens at output stage)
|
|
results.append(
|
|
SearchResult(
|
|
id=doc_id,
|
|
doc_type=doc_type,
|
|
title=result.payload.get("title", "Untitled"),
|
|
excerpt=result.payload.get("excerpt", ""),
|
|
score=result.score, # Fusion score (RRF or DBSF)
|
|
metadata={
|
|
"chunk_index": result.payload.get("chunk_index"),
|
|
"total_chunks": result.payload.get("total_chunks"),
|
|
"search_method": f"bm25_hybrid_{self.fusion_name}",
|
|
},
|
|
chunk_start_offset=result.payload.get("chunk_start_offset"),
|
|
chunk_end_offset=result.payload.get("chunk_end_offset"),
|
|
)
|
|
)
|
|
|
|
if len(results) >= limit:
|
|
break
|
|
|
|
logger.info(f"Returning {len(results)} unverified results after deduplication")
|
|
if results:
|
|
result_details = [
|
|
f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')"
|
|
for r in results[:5] # Show top 5
|
|
]
|
|
logger.debug(f"Top results: {', '.join(result_details)}")
|
|
|
|
return results
|