From a62a007c8773fdfa44d8ed1bceafd1e73b907975 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Fri, 21 Nov 2025 01:02:22 +0100 Subject: [PATCH] feat: Add context expansion to semantic search with chunk overlap removal Implements optional context expansion for semantic search results that fetches adjacent chunks (N-1 and N+1) from Qdrant to provide before/after context. Removes configurable chunk overlap (default 200 chars) to avoid duplicate text appearing in both context and excerpt. Key changes: - Add include_context and context_chars parameters to nc_semantic_search and nc_semantic_search_answer tools - Implement Qdrant cache fast path for chunk retrieval (avoids re-fetching and re-parsing documents, especially important for PDFs) - Add _get_chunk_by_index_from_qdrant() to fetch adjacent chunks - Remove chunk overlap from before_context (last N chars) and after_context (first N chars) to prevent duplicate text - Fetch context in parallel with anyio.Semaphore (max 20 concurrent) - Pass through page_number from SearchResult to SemanticSearchResult - Remove document-level deduplication (keep chunk-level dedup from algorithm) Context expansion is opt-in via include_context=true parameter. When enabled: - Populates has_context_expansion, marked_text, before_context, after_context - Adds truncation flags when context exceeds context_chars limit - Falls back to document fetch for legacy data with truncated excerpts Related: nextcloud_mcp_server/search/context.py:87-382, nextcloud_mcp_server/server/semantic.py:161-255 --- .../auth/templates/vector_viz.html | 3 +- .../document_processors/pymupdf.py | 2 +- nextcloud_mcp_server/search/context.py | 248 +++++++++++++++++- nextcloud_mcp_server/server/semantic.py | 123 ++++++++- nextcloud_mcp_server/vector/processor.py | 2 +- 5 files changed, 359 insertions(+), 19 deletions(-) diff --git a/nextcloud_mcp_server/auth/templates/vector_viz.html b/nextcloud_mcp_server/auth/templates/vector_viz.html index 214b582..de91f2f 100644 --- a/nextcloud_mcp_server/auth/templates/vector_viz.html +++ b/nextcloud_mcp_server/auth/templates/vector_viz.html @@ -122,7 +122,8 @@ -
+
Raw Score: (% relative) | diff --git a/nextcloud_mcp_server/document_processors/pymupdf.py b/nextcloud_mcp_server/document_processors/pymupdf.py index 8d5c7e6..4b006c4 100644 --- a/nextcloud_mcp_server/document_processors/pymupdf.py +++ b/nextcloud_mcp_server/document_processors/pymupdf.py @@ -8,12 +8,12 @@ from typing import Any, Optional import pymupdf import pymupdf.layout -import pymupdf4llm from .base import DocumentProcessor, ProcessingResult, ProcessorError # Activate layout analysis for better text extraction pymupdf.layout.activate() +import pymupdf4llm # noqa logger = logging.getLogger(__name__) diff --git a/nextcloud_mcp_server/search/context.py b/nextcloud_mcp_server/search/context.py index 4477e43..eda4d1e 100644 --- a/nextcloud_mcp_server/search/context.py +++ b/nextcloud_mcp_server/search/context.py @@ -12,6 +12,141 @@ from nextcloud_mcp_server.client import NextcloudClient logger = logging.getLogger(__name__) +async def _get_chunk_from_qdrant( + user_id: str, doc_id: int, doc_type: str, chunk_start: int, chunk_end: int +) -> str | None: + """Retrieve full chunk text from Qdrant payload. + + This avoids re-fetching and re-parsing documents by using the cached + chunk content already stored in Qdrant. + + Args: + user_id: User ID who owns the document + doc_id: Document ID + doc_type: Document type (e.g., "note", "file") + chunk_start: Character offset where chunk starts + chunk_end: Character offset where chunk ends + + Returns: + Full chunk text from Qdrant excerpt field, or None if not found + """ + try: + from qdrant_client.models import FieldCondition, Filter, MatchValue + + from nextcloud_mcp_server.config import get_settings + from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client + + qdrant_client = await get_qdrant_client() + settings = get_settings() + + # Query for the specific chunk + scroll_result = await qdrant_client.scroll( + collection_name=settings.get_collection_name(), + scroll_filter=Filter( + must=[ + FieldCondition(key="user_id", match=MatchValue(value=user_id)), + FieldCondition(key="doc_id", match=MatchValue(value=doc_id)), + FieldCondition(key="doc_type", match=MatchValue(value=doc_type)), + FieldCondition( + key="chunk_start_offset", match=MatchValue(value=chunk_start) + ), + FieldCondition( + key="chunk_end_offset", match=MatchValue(value=chunk_end) + ), + ] + ), + limit=1, + with_payload=["excerpt"], + with_vectors=False, + ) + + if scroll_result[0]: + point = scroll_result[0][0] + excerpt = point.payload.get("excerpt") + if excerpt: + logger.debug( + f"Retrieved chunk from Qdrant for {doc_type} {doc_id}: " + f"{len(excerpt)} chars" + ) + return str(excerpt) + + logger.debug( + f"Chunk not found in Qdrant for {doc_type} {doc_id}, " + f"chunk [{chunk_start}:{chunk_end}]. Will fall back to document fetch." + ) + return None + + except Exception as e: + logger.error( + f"Error querying Qdrant for chunk: {e}. Falling back to document fetch.", + exc_info=True, + ) + return None + + +async def _get_chunk_by_index_from_qdrant( + user_id: str, doc_id: int, doc_type: str, chunk_index: int +) -> str | None: + """Retrieve chunk text by chunk_index from Qdrant payload. + + Used to fetch adjacent chunks for context expansion. + + Args: + user_id: User ID who owns the document + doc_id: Document ID + doc_type: Document type (e.g., "note", "file") + chunk_index: Zero-based chunk index in document + + Returns: + Full chunk text from Qdrant excerpt field, or None if not found + """ + try: + from qdrant_client.models import FieldCondition, Filter, MatchValue + + from nextcloud_mcp_server.config import get_settings + from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client + + qdrant_client = await get_qdrant_client() + settings = get_settings() + + # Query for chunk by index + scroll_result = await qdrant_client.scroll( + collection_name=settings.get_collection_name(), + scroll_filter=Filter( + must=[ + FieldCondition(key="user_id", match=MatchValue(value=user_id)), + FieldCondition(key="doc_id", match=MatchValue(value=doc_id)), + FieldCondition(key="doc_type", match=MatchValue(value=doc_type)), + FieldCondition( + key="chunk_index", match=MatchValue(value=chunk_index) + ), + ] + ), + limit=1, + with_payload=["excerpt"], + with_vectors=False, + ) + + if scroll_result[0]: + point = scroll_result[0][0] + excerpt = point.payload.get("excerpt") + if excerpt: + logger.debug( + f"Retrieved adjacent chunk {chunk_index} from Qdrant for " + f"{doc_type} {doc_id}: {len(excerpt)} chars" + ) + return str(excerpt) + + return None + + except Exception as e: + logger.debug( + f"Could not retrieve adjacent chunk {chunk_index} for " + f"{doc_type} {doc_id}: {e}" + ) + return None + + async def _get_file_path_from_qdrant( user_id: str, file_id: int, chunk_start: int, chunk_end: int ) -> str | None: @@ -117,11 +252,11 @@ async def get_chunk_with_context( total_chunks: int = 1, context_chars: int = 300, ) -> ChunkContext | None: - """Fetch chunk with surrounding context from original document. + """Fetch chunk with surrounding context. - Retrieves the full document text and expands the matched chunk to include - surrounding context for better understanding. Inserts position markers - around the chunk for visualization. + First tries to retrieve the chunk from Qdrant (fast, cached). If that fails + (e.g., legacy data with truncated excerpts), falls back to fetching and + parsing the full document (slower, for PDFs especially). Args: nc_client: Authenticated Nextcloud client @@ -139,6 +274,111 @@ async def get_chunk_with_context( ChunkContext with expanded context and markers, or None if document cannot be retrieved """ + # Convert doc_id to int for Qdrant query + doc_id_int = ( + int(doc_id) + if isinstance(doc_id, str) and doc_id.isdigit() + else (doc_id if isinstance(doc_id, int) else None) + ) + + # Try to get chunk from Qdrant first (fast path) + if doc_id_int is not None: + chunk_text = await _get_chunk_from_qdrant( + user_id, doc_id_int, doc_type, chunk_start, chunk_end + ) + if chunk_text: + logger.info( + f"Retrieved chunk from Qdrant cache for {doc_type} {doc_id} " + f"(avoids document re-fetch/re-parse)" + ) + + # Fetch adjacent chunks for context expansion + # Get chunk overlap from config to remove duplicate text + from nextcloud_mcp_server.config import get_settings + + settings = get_settings() + chunk_overlap = settings.document_chunk_overlap + + before_context = "" + after_context = "" + has_before_truncation = False + has_after_truncation = False + + # Fetch previous chunk if not first chunk + if chunk_index > 0: + before_chunk = await _get_chunk_by_index_from_qdrant( + user_id, doc_id_int, doc_type, chunk_index - 1 + ) + if before_chunk: + # Remove overlap: the last chunk_overlap chars of previous chunk + # overlap with the first chunk_overlap chars of current chunk + before_context = ( + before_chunk[:-chunk_overlap] + if len(before_chunk) > chunk_overlap + else "" + ) + # Truncate if requested context_chars < remaining length + if before_context and len(before_context) > context_chars: + before_context = before_context[-context_chars:] + has_before_truncation = True + else: + # Could not fetch previous chunk, but we're not at start + has_before_truncation = True + + # Fetch next chunk if not last chunk + if chunk_index < total_chunks - 1: + after_chunk = await _get_chunk_by_index_from_qdrant( + user_id, doc_id_int, doc_type, chunk_index + 1 + ) + if after_chunk: + # Remove overlap: the first chunk_overlap chars of next chunk + # overlap with the last chunk_overlap chars of current chunk + after_context = ( + after_chunk[chunk_overlap:] + if len(after_chunk) > chunk_overlap + else "" + ) + # Truncate if requested context_chars < remaining length + if after_context and len(after_context) > context_chars: + after_context = after_context[:context_chars] + has_after_truncation = True + else: + # Could not fetch next chunk, but we're not at end + has_after_truncation = True + + marked_text = _insert_position_markers( + before_context=before_context, + chunk_text=chunk_text, + after_context=after_context, + page_number=page_number, + chunk_index=chunk_index, + total_chunks=total_chunks, + has_before_truncation=has_before_truncation, + has_after_truncation=has_after_truncation, + ) + return ChunkContext( + chunk_text=chunk_text, + before_context=before_context, + after_context=after_context, + chunk_start_offset=chunk_start, + chunk_end_offset=chunk_end, + page_number=page_number, + chunk_index=chunk_index, + total_chunks=total_chunks, + marked_text=marked_text, + has_before_truncation=has_before_truncation, + has_after_truncation=has_after_truncation, + ) + + # Fallback: Fetch full document and extract chunk with context + # This path is taken for: + # 1. Legacy data with truncated excerpts in Qdrant + # 2. Failed Qdrant queries + logger.info( + f"Falling back to document fetch for {doc_type} {doc_id} " + f"(Qdrant cache miss, possibly legacy data)" + ) + # For files, retrieve file_path from Qdrant payload resolved_doc_id = doc_id if doc_type == "file" and isinstance(doc_id, int): diff --git a/nextcloud_mcp_server/server/semantic.py b/nextcloud_mcp_server/server/semantic.py index 0ff76da..65f0a97 100644 --- a/nextcloud_mcp_server/server/semantic.py +++ b/nextcloud_mcp_server/server/semantic.py @@ -26,6 +26,7 @@ from nextcloud_mcp_server.observability.metrics import ( instrument_tool, ) from nextcloud_mcp_server.search.bm25_hybrid import BM25HybridSearchAlgorithm +from nextcloud_mcp_server.search.context import get_chunk_with_context logger = logging.getLogger(__name__) @@ -43,6 +44,8 @@ def configure_semantic_tools(mcp: FastMCP): doc_types: list[str] | None = None, score_threshold: float = 0.0, fusion: str = "rrf", + include_context: bool = False, + context_chars: int = 300, ) -> SemanticSearchResponse: """ Search Nextcloud content using BM25 hybrid search with cross-app support. @@ -66,6 +69,8 @@ def configure_semantic_tools(mcp: FastMCP): fusion: Fusion algorithm: "rrf" (Reciprocal Rank Fusion, default) or "dbsf" (Distribution-Based Score Fusion) RRF: Good general-purpose fusion using reciprocal ranks DBSF: Uses distribution-based normalization, may better balance different score ranges + include_context: Whether to expand results with surrounding context (default: False) + context_chars: Number of characters to include before/after matched chunk (default: 300) Returns: SemanticSearchResponse with matching documents ranked by fusion scores @@ -128,18 +133,16 @@ def configure_semantic_tools(mcp: FastMCP): # Sort combined results by score all_results.sort(key=lambda r: r.score, reverse=True) - # Deduplicate results (hybrid search may return same doc from dense + sparse) - # Qdrant already filters by user_id for multi-tenant isolation - # Sampling tool will verify access when fetching full content - seen = set() - unique_results = [] - for result in all_results: - key = (result.id, result.doc_type) - if key not in seen: - seen.add(key) - unique_results.append(result) - - search_results = unique_results[:limit] # Final limit after deduplication + # Note: BM25HybridSearchAlgorithm already deduplicates at chunk level + # (doc_id, doc_type, chunk_start, chunk_end), which allows multiple + # chunks from the same document while preventing duplicate chunks. + # No additional deduplication needed here - multiple chunks per document + # are valuable for RAG contexts. + # Qdrant already filters by user_id for multi-tenant isolation. + # Sampling tool will verify access when fetching full content. + search_results = all_results[ + :limit + ] # Final limit after chunk-level dedup in algorithm # Convert SearchResult objects to SemanticSearchResult for response results = [] @@ -160,9 +163,99 @@ def configure_semantic_tools(mcp: FastMCP): else 1, chunk_start_offset=r.chunk_start_offset, chunk_end_offset=r.chunk_end_offset, + page_number=r.page_number, ) ) + # Expand results with surrounding context if requested + if include_context and results: + logger.info( + f"Expanding {len(results)} results with context " + f"(context_chars={context_chars})" + ) + + # Fetch context for all results in parallel + # Limit concurrent requests to prevent connection pool exhaustion + max_concurrent = 20 + semaphore = anyio.Semaphore(max_concurrent) + expanded_results = [None] * len(results) + + async def fetch_context(index: int, result: SemanticSearchResult): + """Fetch context for a single result (parallel with semaphore).""" + async with semaphore: + # Only expand if we have valid chunk offsets + if ( + result.chunk_start_offset is None + or result.chunk_end_offset is None + ): + # Keep result as-is without context expansion + expanded_results[index] = result + return + + try: + chunk_context = await get_chunk_with_context( + nc_client=client, + user_id=username, + doc_id=result.id, + doc_type=result.doc_type, + chunk_start=result.chunk_start_offset, + chunk_end=result.chunk_end_offset, + page_number=result.page_number, + chunk_index=result.chunk_index, + total_chunks=result.total_chunks, + context_chars=context_chars, + ) + + if chunk_context: + # Create new result with context fields populated + expanded_results[index] = SemanticSearchResult( + id=result.id, + doc_type=result.doc_type, + title=result.title, + category=result.category, + excerpt=result.excerpt, + score=result.score, + chunk_index=result.chunk_index, + total_chunks=result.total_chunks, + chunk_start_offset=result.chunk_start_offset, + chunk_end_offset=result.chunk_end_offset, + page_number=result.page_number, + # Context expansion fields + has_context_expansion=True, + marked_text=chunk_context.marked_text, + before_context=chunk_context.before_context, + after_context=chunk_context.after_context, + has_before_truncation=chunk_context.has_before_truncation, + has_after_truncation=chunk_context.has_after_truncation, + ) + logger.debug( + f"Expanded context for {result.doc_type} {result.id}" + ) + else: + # Context expansion failed, keep original result + expanded_results[index] = result + logger.debug( + f"Failed to expand context for {result.doc_type} {result.id}, " + "keeping original result" + ) + except Exception as e: + # Context expansion failed, keep original result + expanded_results[index] = result + logger.warning( + f"Error expanding context for {result.doc_type} {result.id}: {e}" + ) + + # Run all context fetches in parallel using anyio task group + async with anyio.create_task_group() as tg: + for idx, result in enumerate(results): + tg.start_soon(fetch_context, idx, result) + + # Replace results with expanded versions + results = [r for r in expanded_results if r is not None] + logger.info( + f"Context expansion completed: {len(results)} results with context" + ) + logger.info(f"Returning {len(results)} results from BM25 hybrid search") return SemanticSearchResponse( @@ -202,6 +295,8 @@ def configure_semantic_tools(mcp: FastMCP): score_threshold: float = 0.7, max_answer_tokens: int = 500, fusion: str = "rrf", + include_context: bool = False, + context_chars: int = 300, ) -> SamplingSearchResponse: """ Semantic search with LLM-generated answer using MCP sampling. @@ -227,6 +322,8 @@ def configure_semantic_tools(mcp: FastMCP): score_threshold: Minimum similarity score 0-1 (default: 0.7) max_answer_tokens: Maximum tokens for generated answer (default: 500) fusion: Fusion algorithm: "rrf" (Reciprocal Rank Fusion, default) or "dbsf" (Distribution-Based Score Fusion) + include_context: Whether to expand results with surrounding context (default: False) + context_chars: Number of characters to include before/after matched chunk (default: 300) Returns: SamplingSearchResponse containing: @@ -267,6 +364,8 @@ def configure_semantic_tools(mcp: FastMCP): limit=limit, score_threshold=score_threshold, fusion=fusion, + include_context=include_context, + context_chars=context_chars, ) # 2. Handle no results case - don't waste a sampling call diff --git a/nextcloud_mcp_server/vector/processor.py b/nextcloud_mcp_server/vector/processor.py index d3e8607..6f3fc5c 100644 --- a/nextcloud_mcp_server/vector/processor.py +++ b/nextcloud_mcp_server/vector/processor.py @@ -391,7 +391,7 @@ async def _index_document( "doc_type": doc_task.doc_type, "is_placeholder": False, # Real indexed document (not placeholder) "title": title, - "excerpt": chunk.text[:200], + "excerpt": chunk.text, # Full chunk text (up to chunk_size, default 2048 chars) "indexed_at": indexed_at, "modified_at": doc_task.modified_at, "etag": etag,