3f06e2ee77
Fixed 8 type checker errors across the codebase:
- vector/scanner.py: Handle None scroll results with null-safe iteration
- search/{bm25_hybrid,semantic}.py: Add None checks for result.payload
- auth/{unified_verifier,webhook_routes}.py: Assert non-None auth credentials
- client/webdav.py: Add None checks before int() conversions
- providers/openai.py: Assert embedding_model is not None
- search/algorithms.py: Explicitly type doc_types set and cast values
- observability/logging_config.py: Match parent class signature (log_data)
Also fixed test_create_tag_creates_system_tag to match WebDAV implementation
(was testing OCS API endpoint, now tests correct WebDAV endpoint with
Content-Location header).
Type checker: 0 errors (down from 8), 20 warnings (ignored)
Tests: All 192 unit tests passing
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
256 lines
10 KiB
Python
256 lines
10 KiB
Python
"""BM25 hybrid search algorithm using Qdrant native RRF fusion."""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from qdrant_client import models
|
|
from qdrant_client.models import FieldCondition, Filter, MatchValue
|
|
|
|
from nextcloud_mcp_server.config import get_settings
|
|
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
|
|
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
|
|
from nextcloud_mcp_server.observability.tracing import trace_operation
|
|
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
|
|
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
|
|
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BM25HybridSearchAlgorithm(SearchAlgorithm):
|
|
"""
|
|
Hybrid search combining dense semantic vectors with BM25 sparse vectors.
|
|
|
|
Uses Qdrant's native Reciprocal Rank Fusion (RRF) to automatically merge
|
|
results from both dense (semantic) and sparse (BM25 keyword) searches.
|
|
This provides the best of both worlds: semantic understanding for conceptual
|
|
queries and precise keyword matching for specific terms, acronyms, and codes.
|
|
|
|
The fusion happens efficiently in the database using the prefetch mechanism,
|
|
eliminating the need for application-layer result merging.
|
|
"""
|
|
|
|
def __init__(self, score_threshold: float = 0.0, fusion: str = "rrf"):
|
|
"""
|
|
Initialize BM25 hybrid search algorithm.
|
|
|
|
Args:
|
|
score_threshold: Minimum fusion score (0-1, default: 0.0 to allow fusion scoring)
|
|
Note: Both RRF and DBSF produce normalized scores
|
|
fusion: Fusion algorithm to use: "rrf" (Reciprocal Rank Fusion, default)
|
|
or "dbsf" (Distribution-Based Score Fusion)
|
|
|
|
Raises:
|
|
ValueError: If fusion is not "rrf" or "dbsf"
|
|
"""
|
|
if fusion not in ("rrf", "dbsf"):
|
|
raise ValueError(
|
|
f"Invalid fusion algorithm '{fusion}'. Must be 'rrf' or 'dbsf'"
|
|
)
|
|
|
|
self.score_threshold = score_threshold
|
|
self.fusion = models.Fusion.RRF if fusion == "rrf" else models.Fusion.DBSF
|
|
self.fusion_name = fusion
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "bm25_hybrid"
|
|
|
|
@property
|
|
def requires_vector_db(self) -> bool:
|
|
return True
|
|
|
|
async def search(
|
|
self,
|
|
query: str,
|
|
user_id: str,
|
|
limit: int = 10,
|
|
doc_type: str | None = None,
|
|
**kwargs: Any,
|
|
) -> list[SearchResult]:
|
|
"""
|
|
Execute hybrid search using dense + sparse vectors with native RRF fusion.
|
|
|
|
Returns unverified results from Qdrant. Access verification should be
|
|
performed separately at the final output stage using verify_search_results().
|
|
|
|
Deduplicates by (doc_id, doc_type, chunk_start_offset, chunk_end_offset)
|
|
to show multiple chunks from the same document while avoiding duplicate chunks.
|
|
|
|
Args:
|
|
query: Natural language or keyword search query
|
|
user_id: User ID for filtering
|
|
limit: Maximum results to return
|
|
doc_type: Optional document type filter
|
|
**kwargs: Additional parameters (score_threshold override)
|
|
|
|
Returns:
|
|
List of unverified SearchResult objects ranked by RRF fusion score
|
|
|
|
Raises:
|
|
McpError: If vector sync is not enabled or search fails
|
|
"""
|
|
settings = get_settings()
|
|
score_threshold = kwargs.get("score_threshold", self.score_threshold)
|
|
|
|
logger.info(
|
|
f"BM25 hybrid search: query='{query}', user={user_id}, "
|
|
f"limit={limit}, score_threshold={score_threshold}, doc_type={doc_type}, "
|
|
f"fusion={self.fusion_name}"
|
|
)
|
|
|
|
# Generate dense embedding for semantic search
|
|
with trace_operation("search.get_embedding_service"):
|
|
embedding_service = get_embedding_service()
|
|
with trace_operation("search.dense_embedding"):
|
|
dense_embedding = await embedding_service.embed(query)
|
|
# Store for reuse by callers (e.g., viz_routes PCA visualization)
|
|
self.query_embedding = dense_embedding
|
|
logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})")
|
|
|
|
# Generate sparse embedding for BM25 keyword search
|
|
with trace_operation("search.get_bm25_service"):
|
|
bm25_service = get_bm25_service()
|
|
with trace_operation("search.sparse_embedding_bm25"):
|
|
sparse_embedding = await bm25_service.encode_async(query)
|
|
logger.debug(
|
|
f"Generated sparse embedding "
|
|
f"({len(sparse_embedding['indices'])} non-zero terms)"
|
|
)
|
|
|
|
# Build Qdrant filter
|
|
filter_conditions = [
|
|
get_placeholder_filter(), # Always exclude placeholders from user-facing queries
|
|
FieldCondition(
|
|
key="user_id",
|
|
match=MatchValue(value=user_id),
|
|
),
|
|
]
|
|
|
|
# Add doc_type filter if specified
|
|
if doc_type:
|
|
filter_conditions.append(
|
|
FieldCondition(
|
|
key="doc_type",
|
|
match=MatchValue(value=doc_type),
|
|
)
|
|
)
|
|
|
|
query_filter = Filter(must=filter_conditions)
|
|
|
|
# Execute hybrid search with Qdrant native RRF fusion
|
|
with trace_operation("search.get_qdrant_client"):
|
|
qdrant_client = await get_qdrant_client()
|
|
|
|
try:
|
|
# Use prefetch to run both dense and sparse searches
|
|
# Qdrant will automatically merge results using RRF
|
|
with trace_operation(
|
|
"search.qdrant_query",
|
|
attributes={"query.limit": limit * 2, "query.fusion": self.fusion_name},
|
|
):
|
|
search_response = await qdrant_client.query_points(
|
|
collection_name=settings.get_collection_name(),
|
|
prefetch=[
|
|
# Dense semantic search
|
|
models.Prefetch(
|
|
query=dense_embedding,
|
|
using="dense",
|
|
limit=limit * 2, # Get extra for deduplication
|
|
filter=query_filter,
|
|
),
|
|
# Sparse BM25 search
|
|
models.Prefetch(
|
|
query=models.SparseVector(
|
|
indices=sparse_embedding["indices"],
|
|
values=sparse_embedding["values"],
|
|
),
|
|
using="sparse",
|
|
limit=limit * 2, # Get extra for deduplication
|
|
filter=query_filter,
|
|
),
|
|
],
|
|
# Fusion query (RRF or DBSF based on initialization)
|
|
query=models.FusionQuery(fusion=self.fusion),
|
|
limit=limit * 2, # Get extra for deduplication
|
|
score_threshold=score_threshold,
|
|
with_payload=True,
|
|
with_vectors=False, # Don't return vectors to save bandwidth
|
|
)
|
|
record_qdrant_operation("search", "success")
|
|
except Exception:
|
|
record_qdrant_operation("search", "error")
|
|
raise
|
|
|
|
logger.info(
|
|
f"Qdrant {self.fusion_name.upper()} fusion returned {len(search_response.points)} results "
|
|
f"(before deduplication)"
|
|
)
|
|
|
|
if search_response.points:
|
|
# Log top 3 fusion scores to help with threshold tuning
|
|
top_scores = [p.score for p in search_response.points[:3]]
|
|
logger.debug(
|
|
f"Top 3 {self.fusion_name.upper()} fusion scores: {top_scores}"
|
|
)
|
|
|
|
# Deduplicate by (doc_id, doc_type, chunk_start, chunk_end)
|
|
# This allows multiple chunks from same doc, but removes duplicate chunks
|
|
with trace_operation(
|
|
"search.deduplicate",
|
|
attributes={"dedupe.num_points": len(search_response.points)},
|
|
):
|
|
seen_chunks = set()
|
|
results = []
|
|
|
|
for result in search_response.points:
|
|
if result.payload is None:
|
|
continue
|
|
# doc_id can be int (notes) or str (files - file paths)
|
|
doc_id = result.payload["doc_id"]
|
|
doc_type = result.payload.get("doc_type", "note")
|
|
chunk_start = result.payload.get("chunk_start_offset")
|
|
chunk_end = result.payload.get("chunk_end_offset")
|
|
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
|
|
|
|
# Skip if we've already seen this exact chunk
|
|
if chunk_key in seen_chunks:
|
|
continue
|
|
|
|
seen_chunks.add(chunk_key)
|
|
|
|
# Return unverified results (verification happens at output stage)
|
|
results.append(
|
|
SearchResult(
|
|
id=doc_id,
|
|
doc_type=doc_type,
|
|
title=result.payload.get("title", "Untitled"),
|
|
excerpt=result.payload.get("excerpt", ""),
|
|
score=result.score, # Fusion score (RRF or DBSF)
|
|
metadata={
|
|
"chunk_index": result.payload.get("chunk_index"),
|
|
"total_chunks": result.payload.get("total_chunks"),
|
|
"search_method": f"bm25_hybrid_{self.fusion_name}",
|
|
},
|
|
chunk_start_offset=result.payload.get("chunk_start_offset"),
|
|
chunk_end_offset=result.payload.get("chunk_end_offset"),
|
|
page_number=result.payload.get("page_number"),
|
|
chunk_index=result.payload.get("chunk_index", 0),
|
|
total_chunks=result.payload.get("total_chunks", 1),
|
|
point_id=str(result.id), # Qdrant point ID for batch retrieval
|
|
)
|
|
)
|
|
|
|
if len(results) >= limit:
|
|
break
|
|
|
|
logger.info(f"Returning {len(results)} unverified results after deduplication")
|
|
if results:
|
|
result_details = [
|
|
f"{r.doc_type}_{r.id} (score={r.score:.3f}, title='{r.title}')"
|
|
for r in results[:5] # Show top 5
|
|
]
|
|
logger.debug(f"Top results: {', '.join(result_details)}")
|
|
|
|
return results
|