feat: Add context expansion to semantic search with chunk overlap removal

Implements optional context expansion for semantic search results that
fetches adjacent chunks (N-1 and N+1) from Qdrant to provide before/after
context. Removes configurable chunk overlap (default 200 chars) to avoid
duplicate text appearing in both context and excerpt.

Key changes:
- Add include_context and context_chars parameters to nc_semantic_search
  and nc_semantic_search_answer tools
- Implement Qdrant cache fast path for chunk retrieval (avoids re-fetching
  and re-parsing documents, especially important for PDFs)
- Add _get_chunk_by_index_from_qdrant() to fetch adjacent chunks
- Remove chunk overlap from before_context (last N chars) and after_context
  (first N chars) to prevent duplicate text
- Fetch context in parallel with anyio.Semaphore (max 20 concurrent)
- Pass through page_number from SearchResult to SemanticSearchResult
- Remove document-level deduplication (keep chunk-level dedup from algorithm)

Context expansion is opt-in via include_context=true parameter. When enabled:
- Populates has_context_expansion, marked_text, before_context, after_context
- Adds truncation flags when context exceeds context_chars limit
- Falls back to document fetch for legacy data with truncated excerpts

Related: nextcloud_mcp_server/search/context.py:87-382,
         nextcloud_mcp_server/server/semantic.py:161-255
This commit is contained in:
Chris Coutinho
2025-11-21 01:02:22 +01:00
parent 5a251a99e6
commit a62a007c87
5 changed files with 359 additions and 19 deletions
+111 -12
View File
@@ -26,6 +26,7 @@ from nextcloud_mcp_server.observability.metrics import (
instrument_tool,
)
from nextcloud_mcp_server.search.bm25_hybrid import BM25HybridSearchAlgorithm
from nextcloud_mcp_server.search.context import get_chunk_with_context
logger = logging.getLogger(__name__)
@@ -43,6 +44,8 @@ def configure_semantic_tools(mcp: FastMCP):
doc_types: list[str] | None = None,
score_threshold: float = 0.0,
fusion: str = "rrf",
include_context: bool = False,
context_chars: int = 300,
) -> SemanticSearchResponse:
"""
Search Nextcloud content using BM25 hybrid search with cross-app support.
@@ -66,6 +69,8 @@ def configure_semantic_tools(mcp: FastMCP):
fusion: Fusion algorithm: "rrf" (Reciprocal Rank Fusion, default) or "dbsf" (Distribution-Based Score Fusion)
RRF: Good general-purpose fusion using reciprocal ranks
DBSF: Uses distribution-based normalization, may better balance different score ranges
include_context: Whether to expand results with surrounding context (default: False)
context_chars: Number of characters to include before/after matched chunk (default: 300)
Returns:
SemanticSearchResponse with matching documents ranked by fusion scores
@@ -128,18 +133,16 @@ def configure_semantic_tools(mcp: FastMCP):
# Sort combined results by score
all_results.sort(key=lambda r: r.score, reverse=True)
# Deduplicate results (hybrid search may return same doc from dense + sparse)
# Qdrant already filters by user_id for multi-tenant isolation
# Sampling tool will verify access when fetching full content
seen = set()
unique_results = []
for result in all_results:
key = (result.id, result.doc_type)
if key not in seen:
seen.add(key)
unique_results.append(result)
search_results = unique_results[:limit] # Final limit after deduplication
# Note: BM25HybridSearchAlgorithm already deduplicates at chunk level
# (doc_id, doc_type, chunk_start, chunk_end), which allows multiple
# chunks from the same document while preventing duplicate chunks.
# No additional deduplication needed here - multiple chunks per document
# are valuable for RAG contexts.
# Qdrant already filters by user_id for multi-tenant isolation.
# Sampling tool will verify access when fetching full content.
search_results = all_results[
:limit
] # Final limit after chunk-level dedup in algorithm
# Convert SearchResult objects to SemanticSearchResult for response
results = []
@@ -160,9 +163,99 @@ def configure_semantic_tools(mcp: FastMCP):
else 1,
chunk_start_offset=r.chunk_start_offset,
chunk_end_offset=r.chunk_end_offset,
page_number=r.page_number,
)
)
# Expand results with surrounding context if requested
if include_context and results:
logger.info(
f"Expanding {len(results)} results with context "
f"(context_chars={context_chars})"
)
# Fetch context for all results in parallel
# Limit concurrent requests to prevent connection pool exhaustion
max_concurrent = 20
semaphore = anyio.Semaphore(max_concurrent)
expanded_results = [None] * len(results)
async def fetch_context(index: int, result: SemanticSearchResult):
"""Fetch context for a single result (parallel with semaphore)."""
async with semaphore:
# Only expand if we have valid chunk offsets
if (
result.chunk_start_offset is None
or result.chunk_end_offset is None
):
# Keep result as-is without context expansion
expanded_results[index] = result
return
try:
chunk_context = await get_chunk_with_context(
nc_client=client,
user_id=username,
doc_id=result.id,
doc_type=result.doc_type,
chunk_start=result.chunk_start_offset,
chunk_end=result.chunk_end_offset,
page_number=result.page_number,
chunk_index=result.chunk_index,
total_chunks=result.total_chunks,
context_chars=context_chars,
)
if chunk_context:
# Create new result with context fields populated
expanded_results[index] = SemanticSearchResult(
id=result.id,
doc_type=result.doc_type,
title=result.title,
category=result.category,
excerpt=result.excerpt,
score=result.score,
chunk_index=result.chunk_index,
total_chunks=result.total_chunks,
chunk_start_offset=result.chunk_start_offset,
chunk_end_offset=result.chunk_end_offset,
page_number=result.page_number,
# Context expansion fields
has_context_expansion=True,
marked_text=chunk_context.marked_text,
before_context=chunk_context.before_context,
after_context=chunk_context.after_context,
has_before_truncation=chunk_context.has_before_truncation,
has_after_truncation=chunk_context.has_after_truncation,
)
logger.debug(
f"Expanded context for {result.doc_type} {result.id}"
)
else:
# Context expansion failed, keep original result
expanded_results[index] = result
logger.debug(
f"Failed to expand context for {result.doc_type} {result.id}, "
"keeping original result"
)
except Exception as e:
# Context expansion failed, keep original result
expanded_results[index] = result
logger.warning(
f"Error expanding context for {result.doc_type} {result.id}: {e}"
)
# Run all context fetches in parallel using anyio task group
async with anyio.create_task_group() as tg:
for idx, result in enumerate(results):
tg.start_soon(fetch_context, idx, result)
# Replace results with expanded versions
results = [r for r in expanded_results if r is not None]
logger.info(
f"Context expansion completed: {len(results)} results with context"
)
logger.info(f"Returning {len(results)} results from BM25 hybrid search")
return SemanticSearchResponse(
@@ -202,6 +295,8 @@ def configure_semantic_tools(mcp: FastMCP):
score_threshold: float = 0.7,
max_answer_tokens: int = 500,
fusion: str = "rrf",
include_context: bool = False,
context_chars: int = 300,
) -> SamplingSearchResponse:
"""
Semantic search with LLM-generated answer using MCP sampling.
@@ -227,6 +322,8 @@ def configure_semantic_tools(mcp: FastMCP):
score_threshold: Minimum similarity score 0-1 (default: 0.7)
max_answer_tokens: Maximum tokens for generated answer (default: 500)
fusion: Fusion algorithm: "rrf" (Reciprocal Rank Fusion, default) or "dbsf" (Distribution-Based Score Fusion)
include_context: Whether to expand results with surrounding context (default: False)
context_chars: Number of characters to include before/after matched chunk (default: 300)
Returns:
SamplingSearchResponse containing:
@@ -267,6 +364,8 @@ def configure_semantic_tools(mcp: FastMCP):
limit=limit,
score_threshold=score_threshold,
fusion=fusion,
include_context=include_context,
context_chars=context_chars,
)
# 2. Handle no results case - don't waste a sampling call