Files
nextcloud-mcp-server/nextcloud_mcp_server/search/context.py
T
Chris Coutinho f1610bbd2e fix: Reconstruct full content for notes to match indexed offsets
Notes are indexed as "{title}\n\n{content}" in processor.py but were
being retrieved as just content during chunk expansion, causing
chunk_start_offset and chunk_end_offset to be misaligned.

This fix reconstructs the full content structure when fetching notes
for chunk expansion, ensuring the displayed chunks match the excerpts
shown in search results.

Fixes chunk/excerpt mismatch reported in vector visualization.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 11:33:12 +01:00

270 lines
9.1 KiB
Python

"""Context expansion for search results.
Provides utilities to expand matched chunks with surrounding context and
position markers for better visualization and understanding of search results.
"""
import logging
from dataclasses import dataclass
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
@dataclass
class ChunkContext:
"""Expanded chunk with surrounding context and position markers.
Attributes:
chunk_text: The matched chunk text
before_context: Text before the chunk (up to context_chars)
after_context: Text after the chunk (up to context_chars)
chunk_start_offset: Character position where chunk starts in document
chunk_end_offset: Character position where chunk ends in document
page_number: Page number for PDFs (None for other doc types)
chunk_index: Zero-based chunk index (N in "chunk N of M")
total_chunks: Total number of chunks in document
marked_text: Full text with position markers around the chunk
has_before_truncation: True if before_context was truncated
has_after_truncation: True if after_context was truncated
"""
chunk_text: str
before_context: str
after_context: str
chunk_start_offset: int
chunk_end_offset: int
page_number: int | None
chunk_index: int
total_chunks: int
marked_text: str
has_before_truncation: bool
has_after_truncation: bool
async def get_chunk_with_context(
nc_client: NextcloudClient,
user_id: str,
doc_id: str | int,
doc_type: str,
chunk_start: int,
chunk_end: int,
page_number: int | None = None,
chunk_index: int = 0,
total_chunks: int = 1,
context_chars: int = 300,
) -> ChunkContext | None:
"""Fetch chunk with surrounding context from original document.
Retrieves the full document text and expands the matched chunk to include
surrounding context for better understanding. Inserts position markers
around the chunk for visualization.
Args:
nc_client: Authenticated Nextcloud client
user_id: User ID who owns the document
doc_id: Document ID (note ID or file path)
doc_type: Type of document ("note", "file", etc.)
chunk_start: Character offset where chunk starts
chunk_end: Character offset where chunk ends
page_number: Optional page number for PDFs
chunk_index: Zero-based chunk index in document
total_chunks: Total number of chunks in document
context_chars: Number of characters to include before/after chunk
Returns:
ChunkContext with expanded context and markers, or None if document
cannot be retrieved
"""
# Fetch full document text
full_text = await _fetch_document_text(nc_client, doc_id, doc_type)
if full_text is None:
logger.warning(
f"Could not fetch document text for {doc_type} {doc_id}, "
"skipping context expansion"
)
return None
# Validate offsets
if chunk_start < 0 or chunk_end > len(full_text) or chunk_start >= chunk_end:
logger.warning(
f"Invalid chunk offsets for {doc_type} {doc_id}: "
f"start={chunk_start}, end={chunk_end}, doc_len={len(full_text)}"
)
return None
# Extract chunk text
chunk_text = full_text[chunk_start:chunk_end]
# Calculate context boundaries
context_start = max(0, chunk_start - context_chars)
context_end = min(len(full_text), chunk_end + context_chars)
# Extract context
before_context = full_text[context_start:chunk_start]
after_context = full_text[chunk_end:context_end]
# Check for truncation
has_before_truncation = context_start > 0
has_after_truncation = context_end < len(full_text)
# Create marked text with position markers
marked_text = _insert_position_markers(
before_context=before_context,
chunk_text=chunk_text,
after_context=after_context,
page_number=page_number,
chunk_index=chunk_index,
total_chunks=total_chunks,
has_before_truncation=has_before_truncation,
has_after_truncation=has_after_truncation,
)
return ChunkContext(
chunk_text=chunk_text,
before_context=before_context,
after_context=after_context,
chunk_start_offset=chunk_start,
chunk_end_offset=chunk_end,
page_number=page_number,
chunk_index=chunk_index,
total_chunks=total_chunks,
marked_text=marked_text,
has_before_truncation=has_before_truncation,
has_after_truncation=has_after_truncation,
)
async def _fetch_document_text(
nc_client: NextcloudClient, doc_id: str | int, doc_type: str
) -> str | None:
"""Fetch full text content of a document.
Args:
nc_client: Authenticated Nextcloud client
doc_id: Document ID (note ID or file path)
doc_type: Type of document ("note", "file", etc.)
Returns:
Full document text, or None if document cannot be retrieved
"""
try:
if doc_type == "note":
# Fetch note by ID
note = await nc_client.notes.get_note(note_id=int(doc_id))
# Reconstruct full content as indexed: title + "\n\n" + content
# This ensures chunk offsets align with indexed content structure
title = note.get("title", "")
content = note.get("content", "")
return f"{title}\n\n{content}"
elif doc_type == "file":
# Fetch file content via WebDAV
try:
file_path = str(doc_id)
file_content, content_type = await nc_client.webdav.read_file(file_path)
# Check if it's a PDF (by content type or file extension)
is_pdf = (
content_type and "pdf" in content_type.lower()
) or file_path.lower().endswith(".pdf")
if is_pdf:
# Extract text from PDF using PyMuPDF
import fitz # PyMuPDF
logger.debug(f"Extracting text from PDF: {file_path}")
pdf_doc = fitz.open(stream=file_content, filetype="pdf")
text_parts = []
for page in pdf_doc:
text_parts.append(page.get_text())
pdf_doc.close()
full_text = "\n".join(text_parts)
logger.debug(
f"Extracted {len(full_text)} characters from "
f"{len(text_parts)} pages in {file_path}"
)
return full_text
else:
# Assume it's a text file, decode to string
logger.debug(f"Decoding text file: {file_path}")
return file_content.decode("utf-8", errors="replace")
except Exception as e:
logger.error(
f"Error fetching file content for {doc_id}: {e}", exc_info=True
)
return None
else:
logger.warning(f"Unsupported doc_type for context expansion: {doc_type}")
return None
except Exception as e:
logger.error(f"Error fetching document {doc_type} {doc_id}: {e}", exc_info=True)
return None
def _insert_position_markers(
before_context: str,
chunk_text: str,
after_context: str,
page_number: int | None,
chunk_index: int,
total_chunks: int,
has_before_truncation: bool,
has_after_truncation: bool,
) -> str:
"""Insert position markers around matched chunk.
Creates markdown-formatted text with visual markers indicating chunk
boundaries and metadata.
Args:
before_context: Text before chunk
chunk_text: The matched chunk
after_context: Text after chunk
page_number: Optional page number
chunk_index: Zero-based chunk index
total_chunks: Total chunks in document
has_before_truncation: Whether before_context is truncated
has_after_truncation: Whether after_context is truncated
Returns:
Formatted text with position markers
"""
# Build position metadata
position_parts = []
if page_number is not None:
position_parts.append(f"Page {page_number}")
position_parts.append(f"Chunk {chunk_index + 1} of {total_chunks}")
position_metadata = ", ".join(position_parts)
# Build marked text
parts = []
# Add truncation indicator for before context
if has_before_truncation:
parts.append("**[...]**\n\n")
# Add before context if present
if before_context:
parts.append(before_context)
# Add chunk start marker
parts.append(f"\n\n🔍 **MATCHED CHUNK START** ({position_metadata})\n\n")
# Add chunk text
parts.append(chunk_text)
# Add chunk end marker
parts.append("\n\n🔍 **MATCHED CHUNK END**\n\n")
# Add after context if present
if after_context:
parts.append(after_context)
# Add truncation indicator for after context
if has_after_truncation:
parts.append("\n\n**[...]**")
return "".join(parts)