feat: Implement per-chunk vector visualization with context expansion

Major improvements to vector visualization page:
- Refactor PCA to display individual chunks instead of averaged documents
- Add context expansion module for fetching surrounding text from notes and PDFs
- Update deduplication to use (doc_id, doc_type, chunk_start, chunk_end) keys
- Fix Alpine.js rendering with chunk-specific keys including offsets
- Refactor authentication helper to return NextcloudClient for better reuse
- Add async context manager support to NextcloudClient

Technical details:
- viz_routes.py: Fetch specific chunk vectors instead of averaging per document
- context.py: New module supporting both notes and PDF text extraction via PyMuPDF
- search algorithms: Extract page_number, chunk_index, total_chunks from Qdrant
- vector-viz.js/html: Use chunk positions in expansion tracking keys

This enables users to see which specific chunks match their query
and view them with surrounding context in the PCA visualization.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-20 11:22:20 +01:00
parent b8010270c1
commit 327d843f64
10 changed files with 485 additions and 172 deletions
@@ -217,7 +217,7 @@ function vizApp() {
},
async toggleChunk(result) {
const resultKey = `${result.doc_type}_${result.id}`;
const resultKey = `${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`;
if (this.isChunkExpanded(resultKey)) {
delete this.expandedChunks[resultKey];
@@ -117,7 +117,7 @@
<template x-if="!loading && results.length > 0">
<div x-transition.opacity.duration.200ms>
<template x-for="result in results" :key="result.id">
<template x-for="result in results" :key="`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`">
<div style="padding: 12px; border-bottom: 1px solid #eee;">
<a :href="getNextcloudUrl(result)" target="_blank" style="font-weight: 500; color: #0066cc; text-decoration: none;">
<span x-text="result.title"></span>
@@ -134,22 +134,22 @@
<button
class="chunk-toggle-btn"
@click="toggleChunk(result)"
x-text="isChunkExpanded(`${result.doc_type}_${result.id}`) ? 'Hide Chunk' : 'Show Chunk'"
x-text="isChunkExpanded(`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`) ? 'Hide Chunk' : 'Show Chunk'"
></button>
</template>
<!-- Chunk context (expanded inline) -->
<template x-if="isChunkExpanded(`${result.doc_type}_${result.id}`)">
<template x-if="isChunkExpanded(`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`)">
<div class="chunk-context" x-transition.opacity.duration.200ms>
<template x-if="chunkLoading[`${result.doc_type}_${result.id}`]">
<template x-if="chunkLoading[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]">
<div style="color: #666; font-style: italic;">Loading chunk...</div>
</template>
<template x-if="!chunkLoading[`${result.doc_type}_${result.id}`]">
<template x-if="!chunkLoading[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]">
<div>
<template x-if="expandedChunks[`${result.doc_type}_${result.id}`]?.has_more_before">
<template x-if="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.has_more_before">
<span class="chunk-ellipsis">...</span>
</template>
<span class="chunk-text" x-text="expandedChunks[`${result.doc_type}_${result.id}`]?.before_context"></span><span class="chunk-matched" x-text="expandedChunks[`${result.doc_type}_${result.id}`]?.chunk_text"></span><span class="chunk-text" x-text="expandedChunks[`${result.doc_type}_${result.id}`]?.after_context"></span><template x-if="expandedChunks[`${result.doc_type}_${result.id}`]?.has_more_after">
<span class="chunk-text" x-text="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.before_context"></span><span class="chunk-matched" x-text="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.chunk_text"></span><span class="chunk-text" x-text="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.after_context"></span><template x-if="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.has_more_after">
<span class="chunk-ellipsis">...</span>
</template>
</div>
+28 -17
View File
@@ -18,6 +18,8 @@ from starlette.authentication import requires
from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
# Setup Jinja2 environment for templates
@@ -25,14 +27,20 @@ _template_dir = Path(__file__).parent / "templates"
_jinja_env = Environment(loader=FileSystemLoader(_template_dir))
async def _get_authenticated_client_for_userinfo(request: Request) -> httpx.AsyncClient:
"""Get an authenticated HTTP client for user info page operations.
async def _get_authenticated_client_for_userinfo(request: Request) -> NextcloudClient:
"""Get an authenticated Nextcloud client for user info page operations.
This is a shared helper for authenticated routes that need to access
Nextcloud APIs. It handles both BasicAuth and OAuth authentication modes.
Args:
request: Starlette request object
Returns:
Authenticated httpx.AsyncClient
Authenticated NextcloudClient
Raises:
RuntimeError: If credentials/session not configured
"""
oauth_ctx = getattr(request.app.state, "oauth_context", None)
@@ -45,11 +53,15 @@ async def _get_authenticated_client_for_userinfo(request: Request) -> httpx.Asyn
if not all([nextcloud_host, username, password]):
raise RuntimeError("BasicAuth credentials not configured")
assert nextcloud_host is not None # Type narrowing for type checker
return httpx.AsyncClient(
from httpx import BasicAuth
assert nextcloud_host is not None
assert username is not None
assert password is not None
return NextcloudClient(
base_url=nextcloud_host,
auth=(username, password),
timeout=30.0,
username=username,
auth=BasicAuth(username, password),
)
# OAuth mode - get token from session
@@ -64,15 +76,14 @@ async def _get_authenticated_client_for_userinfo(request: Request) -> httpx.Asyn
raise RuntimeError("No access token found in session")
access_token = token_data["access_token"]
username = token_data.get("username")
nextcloud_host = oauth_ctx.get("config", {}).get("nextcloud_host", "")
if not nextcloud_host:
raise RuntimeError("Nextcloud host not configured")
if not nextcloud_host or not username:
raise RuntimeError("Nextcloud host or username not configured")
return httpx.AsyncClient(
base_url=nextcloud_host,
headers={"Authorization": f"Bearer {access_token}"},
timeout=30.0,
return NextcloudClient.from_token(
base_url=nextcloud_host, token=access_token, username=username
)
@@ -423,10 +434,10 @@ async def user_info_html(request: Request) -> HTMLResponse:
try:
from nextcloud_mcp_server.auth.permissions import is_nextcloud_admin
# Get authenticated HTTP client
http_client = await _get_authenticated_client_for_userinfo(request)
is_admin = await is_nextcloud_admin(request, http_client)
await http_client.aclose()
# Get authenticated Nextcloud client
nc_client = await _get_authenticated_client_for_userinfo(request)
is_admin = await is_nextcloud_admin(request, nc_client._client)
await nc_client.close()
except Exception as e:
logger.warning(f"Failed to check admin status: {e}")
# Default to not admin if check fails
+116 -135
View File
@@ -138,7 +138,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
_get_authenticated_client_for_userinfo,
)
async with await _get_authenticated_client_for_userinfo(request) as http_client: # noqa: F841
async with await _get_authenticated_client_for_userinfo(request) as nc_client: # noqa: F841
# Create search algorithm (no client needed - verification removed)
if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
@@ -217,72 +217,75 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
}
)
# Fetch vectors for matching results from Qdrant
# Fetch vectors for specific matching chunks from Qdrant
vector_fetch_start = time.perf_counter()
qdrant_client = await get_qdrant_client()
doc_ids = [r.id for r in search_results]
# Retrieve vectors for the matching documents
from qdrant_client.models import FieldCondition, Filter, MatchAny
# Build filters for each specific chunk
from qdrant_client.models import FieldCondition, Filter, MatchValue
points_response = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
chunk_vectors_map = {} # Map (doc_id, chunk_start, chunk_end) -> vector
# Fetch vectors in batches by filtering on chunk-specific fields
for result in search_results:
chunk_start = result.chunk_start_offset
chunk_end = result.chunk_end_offset
# Build filter for this specific chunk
must_conditions = [
FieldCondition(
key="doc_id",
match=MatchValue(value=str(result.id)),
),
FieldCondition(
key="user_id",
match=MatchValue(value=username),
),
]
# Add chunk position filters if available
if chunk_start is not None:
must_conditions.append(
FieldCondition(
key="doc_id",
match=MatchAny(any=[str(doc_id) for doc_id in doc_ids]),
),
key="chunk_start_offset",
match=MatchValue(value=chunk_start),
)
)
if chunk_end is not None:
must_conditions.append(
FieldCondition(
key="user_id",
match={"value": username},
),
]
),
limit=len(doc_ids) * 2, # Account for multiple chunks per doc
with_vectors=["dense"], # Only fetch dense vectors for visualization
with_payload=["doc_id"], # Need doc_id to map vectors to results
)
key="chunk_end_offset",
match=MatchValue(value=chunk_end),
)
)
points = points_response[0]
if not points:
return JSONResponse(
{
"success": True,
"results": [],
"coordinates_2d": [],
"message": "No vectors found for results",
}
# Fetch this specific chunk vector
points_response = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(must=must_conditions),
limit=1, # Only need the first match
with_vectors=["dense"],
with_payload=False,
)
# Extract dense vectors and group by document
def extract_dense_vector(point):
if point.vector is None:
return None
# If named vectors (dict), extract "dense"
if isinstance(point.vector, dict):
return point.vector.get("dense")
# If unnamed vector (array), use directly
return point.vector
points = points_response[0]
if points:
# Extract dense vector
point = points[0]
if point.vector is not None:
# If named vectors (dict), extract "dense"
if isinstance(point.vector, dict):
vector = point.vector.get("dense")
else:
vector = point.vector
# Group chunk vectors by doc_id
from collections import defaultdict
doc_chunks = defaultdict(list)
for point in points:
if point.payload:
# doc_id can be int (for notes) or str (for files - file path)
# Keep original type instead of forcing to int
doc_id = point.payload.get("doc_id", 0)
vector = extract_dense_vector(point)
if vector is not None:
doc_chunks[doc_id].append(vector)
chunk_key = (result.id, chunk_start, chunk_end)
chunk_vectors_map[chunk_key] = vector
vector_fetch_duration = time.perf_counter() - vector_fetch_start
if len(doc_chunks) < 2:
# Not enough documents for PCA
if len(chunk_vectors_map) < 2:
# Not enough chunks for PCA
return JSONResponse(
{
"success": True,
@@ -298,15 +301,15 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
],
"coordinates_3d": [[0, 0, 0]] * len(search_results),
"query_coords": [0, 0, 0],
"message": "Not enough documents for PCA",
"message": "Not enough chunks for PCA",
}
)
# Detect embedding dimension from first available vector
embedding_dim = None
for chunks in doc_chunks.values():
if chunks:
embedding_dim = len(chunks[0])
for vector in chunk_vectors_map.values():
if vector is not None:
embedding_dim = len(vector)
break
if embedding_dim is None:
@@ -320,23 +323,21 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
logger.info(f"Detected embedding dimension: {embedding_dim}")
# Average chunk vectors per document to create document-level embeddings
# Maintain order of search_results for coordinate mapping
doc_vectors = []
# Build chunk vectors array in search_results order (1:1 mapping)
chunk_vectors = []
for result in search_results:
if result.id in doc_chunks:
# Average all chunk embeddings for this document
chunk_vectors = np.array(doc_chunks[result.id])
avg_vector = np.mean(chunk_vectors, axis=0)
doc_vectors.append(avg_vector)
logger.debug(f"Doc {result.id}: averaged {len(chunk_vectors)} chunks")
chunk_key = (result.id, result.chunk_start_offset, result.chunk_end_offset)
if chunk_key in chunk_vectors_map:
chunk_vectors.append(chunk_vectors_map[chunk_key])
else:
# Document not found in vectors (shouldn't happen)
logger.warning(f"Doc {result.id} not found in fetched vectors")
# Use zero vector as fallback with detected dimension
doc_vectors.append(np.zeros(embedding_dim))
# Chunk not found in vectors (shouldn't happen)
logger.warning(
f"Chunk {chunk_key} not found in fetched vectors, using zero vector"
)
# Use zero vector as fallback
chunk_vectors.append(np.zeros(embedding_dim))
doc_vectors = np.array(doc_vectors)
chunk_vectors = np.array(chunk_vectors)
# Generate query embedding for visualization
query_embed_start = time.perf_counter()
@@ -348,9 +349,9 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
logger.info(f"Generated query embedding (dimension={len(query_embedding)})")
# Combine query vector with document vectors for PCA
# Combine query vector with chunk vectors for PCA
# Query will be the last point in the array
all_vectors = np.vstack([doc_vectors, np.array([query_embedding])])
all_vectors = np.vstack([chunk_vectors, np.array([query_embedding])])
# Normalize vectors to unit length (L2 normalization)
# This is critical because Qdrant uses COSINE distance, which only measures
@@ -396,17 +397,12 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Replace NaN with 0 to allow JSON serialization
coords_3d = np.nan_to_num(coords_3d, nan=0.0)
# Split query coords from document coords
# Split query coords from chunk coords
# Round to 2 decimal places for cleaner display
query_coords_3d = [
round(float(x), 2) for x in coords_3d[-1]
] # Last point is query
doc_coords_3d = coords_3d[:-1] # All but last are documents
total_chunks = sum(len(chunks) for chunks in doc_chunks.values())
avg_chunks_per_doc = (
total_chunks / len(doc_vectors) if doc_vectors.size > 0 else 0
)
chunk_coords_3d = coords_3d[:-1] # All but last are chunks
logger.info(
f"PCA explained variance: PC1={pca.explained_variance_ratio_[0]:.3f}, "
@@ -414,13 +410,14 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
f"PC3={pca.explained_variance_ratio_[2]:.3f}"
)
logger.info(
f"Embedding stats: documents={len(doc_vectors)}, "
f"total_chunks={total_chunks}, avg_chunks_per_doc={avg_chunks_per_doc:.1f}, "
f"query_dim={len(query_embedding)}, doc_vector_dim={doc_vectors.shape[1] if doc_vectors.size > 0 else 0}"
f"Embedding stats: chunks={len(chunk_vectors)}, "
f"query_dim={len(query_embedding)}, chunk_vector_dim={chunk_vectors.shape[1] if chunk_vectors.size > 0 else 0}"
)
# Coordinates already match search_results order (1:1 mapping)
result_coords = [[round(float(x), 2) for x in coord] for coord in doc_coords_3d]
result_coords = [
[round(float(x), 2) for x in coord] for coord in chunk_coords_3d
]
# Build response
response_results = [
@@ -449,7 +446,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
f"vector_fetch={vector_fetch_duration * 1000:.1f}ms ({vector_fetch_duration / total_duration * 100:.1f}%), "
f"query_embed={query_embed_duration * 1000:.1f}ms ({query_embed_duration / total_duration * 100:.1f}%), "
f"pca={pca_duration * 1000:.1f}ms ({pca_duration / total_duration * 100:.1f}%), "
f"results={len(search_results)}, doc_vectors={len(doc_vectors)}"
f"results={len(search_results)}, chunk_vectors={len(chunk_vectors)}"
)
return JSONResponse(
@@ -470,7 +467,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
"query_embed_ms": round(query_embed_duration * 1000, 2),
"pca_ms": round(pca_duration * 1000, 2),
"num_results": len(search_results),
"num_doc_vectors": len(doc_vectors),
"num_chunk_vectors": len(chunk_vectors),
},
}
)
@@ -519,75 +516,59 @@ async def chunk_context_endpoint(request: Request) -> JSONResponse:
status_code=400,
)
# Type assertions - we validated these above
assert doc_type is not None
assert doc_id is not None
assert start_str is not None
assert end_str is not None
start = int(start_str)
end = int(end_str)
# Currently only support notes
if doc_type != "note":
return JSONResponse(
{"success": False, "error": f"Unsupported doc_type: {doc_type}"},
status_code=400,
)
# Get authenticated HTTP client and fetch note
# Get authenticated Nextcloud client
from nextcloud_mcp_server.auth.userinfo_routes import (
_get_authenticated_client_for_userinfo,
)
from nextcloud_mcp_server.client.notes import NotesClient
from nextcloud_mcp_server.search.context import get_chunk_with_context
# Get username from request auth
username = (
request.user.display_name
if hasattr(request.user, "display_name")
else "unknown"
)
# Use context expansion module to fetch chunk with surrounding context
async with await _get_authenticated_client_for_userinfo(request) as nc_client:
chunk_context = await get_chunk_with_context(
nc_client=nc_client,
user_id=request.user.display_name, # User ID from auth
doc_id=doc_id,
doc_type=doc_type,
chunk_start=start,
chunk_end=end,
context_chars=context_chars,
)
# Create notes client with authenticated HTTP client
http_client = await _get_authenticated_client_for_userinfo(request)
notes_client = NotesClient(http_client, username)
# Fetch full note content
note = await notes_client.get_note(int(doc_id))
full_content = f"{note['title']}\n\n{note['content']}"
# Validate offsets
if start < 0 or end > len(full_content) or start >= end:
# Check if context expansion succeeded
if chunk_context is None:
return JSONResponse(
{
"success": False,
"error": f"Invalid offsets: start={start}, end={end}, content_length={len(full_content)}",
"error": f"Failed to fetch chunk context for {doc_type} {doc_id}",
},
status_code=400,
status_code=404,
)
# Extract chunk
chunk_text = full_content[start:end]
# Extract context before and after
before_start = max(0, start - context_chars)
before_context = full_content[before_start:start]
after_end = min(len(full_content), end + context_chars)
after_context = full_content[end:after_end]
# Determine if there's more content
has_more_before = before_start > 0
has_more_after = after_end < len(full_content)
logger.info(
f"Fetched chunk context for {doc_type}_{doc_id}: "
f"chunk_len={len(chunk_text)}, before_len={len(before_context)}, "
f"after_len={len(after_context)}"
f"chunk_len={len(chunk_context.chunk_text)}, "
f"before_len={len(chunk_context.before_context)}, "
f"after_len={len(chunk_context.after_context)}"
)
# Return response compatible with frontend expectations
return JSONResponse(
{
"success": True,
"chunk_text": chunk_text,
"before_context": before_context,
"after_context": after_context,
"has_more_before": has_more_before,
"has_more_after": has_more_after,
"chunk_text": chunk_context.chunk_text,
"before_context": chunk_context.before_context,
"after_context": chunk_context.after_context,
"has_more_before": chunk_context.has_before_truncation,
"has_more_after": chunk_context.has_after_truncation,
}
)
+9
View File
@@ -190,6 +190,15 @@ class NextcloudClient:
"""Helper to get the base WebDAV path for the authenticated user."""
return f"/remote.php/dav/files/{self.username}"
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit - closes all clients."""
await self.close()
return False # Don't suppress exceptions
async def close(self):
"""Close the HTTP client and CalDAV client."""
await self._client.aclose()
+23
View File
@@ -35,6 +35,29 @@ class SemanticSearchResult(BaseModel):
chunk_end_offset: Optional[int] = Field(
default=None, description="Character position where chunk ends in document"
)
page_number: Optional[int] = Field(
default=None, description="Page number for PDF documents"
)
# Context expansion fields (optional, populated when include_context=True)
has_context_expansion: bool = Field(
default=False, description="Whether context expansion was performed"
)
marked_text: Optional[str] = Field(
default=None,
description="Full text with position markers around matched chunk",
)
before_context: Optional[str] = Field(
default=None, description="Text before the matched chunk"
)
after_context: Optional[str] = Field(
default=None, description="Text after the matched chunk"
)
has_before_truncation: Optional[bool] = Field(
default=None, description="Whether before_context was truncated"
)
has_after_truncation: Optional[bool] = Field(
default=None, description="Whether after_context was truncated"
)
class SemanticSearchResponse(BaseResponse):
@@ -133,6 +133,9 @@ class SearchResult:
metadata: Additional algorithm-specific metadata
chunk_start_offset: Character position where chunk starts (None if not available)
chunk_end_offset: Character position where chunk ends (None if not available)
page_number: Page number for PDF documents (None for other doc types)
chunk_index: Zero-based index of this chunk in the document
total_chunks: Total number of chunks in the document
"""
id: int
@@ -143,6 +146,9 @@ class SearchResult:
metadata: dict[str, Any] | None = None
chunk_start_offset: int | None = None
chunk_end_offset: int | None = None
page_number: int | None = None
chunk_index: int = 0
total_chunks: int = 1
def __post_init__(self):
"""Validate score is non-negative.
+15 -6
View File
@@ -72,6 +72,9 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
Returns unverified results from Qdrant. Access verification should be
performed separately at the final output stage using verify_search_results().
Deduplicates by (doc_id, doc_type, chunk_start_offset, chunk_end_offset)
to show multiple chunks from the same document while avoiding duplicate chunks.
Args:
query: Natural language or keyword search query
user_id: User ID for filtering
@@ -176,21 +179,24 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
f"Top 3 {self.fusion_name.upper()} fusion scores: {top_scores}"
)
# Deduplicate by (doc_id, doc_type) - multiple chunks per document
seen_docs = set()
# Deduplicate by (doc_id, doc_type, chunk_start, chunk_end)
# This allows multiple chunks from same doc, but removes duplicate chunks
seen_chunks = set()
results = []
for result in search_response.points:
# doc_id can be int (notes) or str (files - file paths)
doc_id = result.payload["doc_id"]
doc_type = result.payload.get("doc_type", "note")
doc_key = (doc_id, doc_type)
chunk_start = result.payload.get("chunk_start_offset")
chunk_end = result.payload.get("chunk_end_offset")
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
# Skip if we've already seen this document
if doc_key in seen_docs:
# Skip if we've already seen this exact chunk
if chunk_key in seen_chunks:
continue
seen_docs.add(doc_key)
seen_chunks.add(chunk_key)
# Return unverified results (verification happens at output stage)
results.append(
@@ -207,6 +213,9 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
},
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
)
)
+265
View File
@@ -0,0 +1,265 @@
"""Context expansion for search results.
Provides utilities to expand matched chunks with surrounding context and
position markers for better visualization and understanding of search results.
"""
import logging
from dataclasses import dataclass
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
@dataclass
class ChunkContext:
"""Expanded chunk with surrounding context and position markers.
Attributes:
chunk_text: The matched chunk text
before_context: Text before the chunk (up to context_chars)
after_context: Text after the chunk (up to context_chars)
chunk_start_offset: Character position where chunk starts in document
chunk_end_offset: Character position where chunk ends in document
page_number: Page number for PDFs (None for other doc types)
chunk_index: Zero-based chunk index (N in "chunk N of M")
total_chunks: Total number of chunks in document
marked_text: Full text with position markers around the chunk
has_before_truncation: True if before_context was truncated
has_after_truncation: True if after_context was truncated
"""
chunk_text: str
before_context: str
after_context: str
chunk_start_offset: int
chunk_end_offset: int
page_number: int | None
chunk_index: int
total_chunks: int
marked_text: str
has_before_truncation: bool
has_after_truncation: bool
async def get_chunk_with_context(
nc_client: NextcloudClient,
user_id: str,
doc_id: str | int,
doc_type: str,
chunk_start: int,
chunk_end: int,
page_number: int | None = None,
chunk_index: int = 0,
total_chunks: int = 1,
context_chars: int = 300,
) -> ChunkContext | None:
"""Fetch chunk with surrounding context from original document.
Retrieves the full document text and expands the matched chunk to include
surrounding context for better understanding. Inserts position markers
around the chunk for visualization.
Args:
nc_client: Authenticated Nextcloud client
user_id: User ID who owns the document
doc_id: Document ID (note ID or file path)
doc_type: Type of document ("note", "file", etc.)
chunk_start: Character offset where chunk starts
chunk_end: Character offset where chunk ends
page_number: Optional page number for PDFs
chunk_index: Zero-based chunk index in document
total_chunks: Total number of chunks in document
context_chars: Number of characters to include before/after chunk
Returns:
ChunkContext with expanded context and markers, or None if document
cannot be retrieved
"""
# Fetch full document text
full_text = await _fetch_document_text(nc_client, doc_id, doc_type)
if full_text is None:
logger.warning(
f"Could not fetch document text for {doc_type} {doc_id}, "
"skipping context expansion"
)
return None
# Validate offsets
if chunk_start < 0 or chunk_end > len(full_text) or chunk_start >= chunk_end:
logger.warning(
f"Invalid chunk offsets for {doc_type} {doc_id}: "
f"start={chunk_start}, end={chunk_end}, doc_len={len(full_text)}"
)
return None
# Extract chunk text
chunk_text = full_text[chunk_start:chunk_end]
# Calculate context boundaries
context_start = max(0, chunk_start - context_chars)
context_end = min(len(full_text), chunk_end + context_chars)
# Extract context
before_context = full_text[context_start:chunk_start]
after_context = full_text[chunk_end:context_end]
# Check for truncation
has_before_truncation = context_start > 0
has_after_truncation = context_end < len(full_text)
# Create marked text with position markers
marked_text = _insert_position_markers(
before_context=before_context,
chunk_text=chunk_text,
after_context=after_context,
page_number=page_number,
chunk_index=chunk_index,
total_chunks=total_chunks,
has_before_truncation=has_before_truncation,
has_after_truncation=has_after_truncation,
)
return ChunkContext(
chunk_text=chunk_text,
before_context=before_context,
after_context=after_context,
chunk_start_offset=chunk_start,
chunk_end_offset=chunk_end,
page_number=page_number,
chunk_index=chunk_index,
total_chunks=total_chunks,
marked_text=marked_text,
has_before_truncation=has_before_truncation,
has_after_truncation=has_after_truncation,
)
async def _fetch_document_text(
nc_client: NextcloudClient, doc_id: str | int, doc_type: str
) -> str | None:
"""Fetch full text content of a document.
Args:
nc_client: Authenticated Nextcloud client
doc_id: Document ID (note ID or file path)
doc_type: Type of document ("note", "file", etc.)
Returns:
Full document text, or None if document cannot be retrieved
"""
try:
if doc_type == "note":
# Fetch note by ID
note = await nc_client.notes.get_note(note_id=int(doc_id))
return note.get("content", "")
elif doc_type == "file":
# Fetch file content via WebDAV
try:
file_path = str(doc_id)
file_content, content_type = await nc_client.webdav.read_file(file_path)
# Check if it's a PDF (by content type or file extension)
is_pdf = (
content_type and "pdf" in content_type.lower()
) or file_path.lower().endswith(".pdf")
if is_pdf:
# Extract text from PDF using PyMuPDF
import fitz # PyMuPDF
logger.debug(f"Extracting text from PDF: {file_path}")
pdf_doc = fitz.open(stream=file_content, filetype="pdf")
text_parts = []
for page in pdf_doc:
text_parts.append(page.get_text())
pdf_doc.close()
full_text = "\n".join(text_parts)
logger.debug(
f"Extracted {len(full_text)} characters from "
f"{len(text_parts)} pages in {file_path}"
)
return full_text
else:
# Assume it's a text file, decode to string
logger.debug(f"Decoding text file: {file_path}")
return file_content.decode("utf-8", errors="replace")
except Exception as e:
logger.error(
f"Error fetching file content for {doc_id}: {e}", exc_info=True
)
return None
else:
logger.warning(f"Unsupported doc_type for context expansion: {doc_type}")
return None
except Exception as e:
logger.error(f"Error fetching document {doc_type} {doc_id}: {e}", exc_info=True)
return None
def _insert_position_markers(
before_context: str,
chunk_text: str,
after_context: str,
page_number: int | None,
chunk_index: int,
total_chunks: int,
has_before_truncation: bool,
has_after_truncation: bool,
) -> str:
"""Insert position markers around matched chunk.
Creates markdown-formatted text with visual markers indicating chunk
boundaries and metadata.
Args:
before_context: Text before chunk
chunk_text: The matched chunk
after_context: Text after chunk
page_number: Optional page number
chunk_index: Zero-based chunk index
total_chunks: Total chunks in document
has_before_truncation: Whether before_context is truncated
has_after_truncation: Whether after_context is truncated
Returns:
Formatted text with position markers
"""
# Build position metadata
position_parts = []
if page_number is not None:
position_parts.append(f"Page {page_number}")
position_parts.append(f"Chunk {chunk_index + 1} of {total_chunks}")
position_metadata = ", ".join(position_parts)
# Build marked text
parts = []
# Add truncation indicator for before context
if has_before_truncation:
parts.append("**[...]**\n\n")
# Add before context if present
if before_context:
parts.append(before_context)
# Add chunk start marker
parts.append(f"\n\n🔍 **MATCHED CHUNK START** ({position_metadata})\n\n")
# Add chunk text
parts.append(chunk_text)
# Add chunk end marker
parts.append("\n\n🔍 **MATCHED CHUNK END**\n\n")
# Add after context if present
if after_context:
parts.append(after_context)
# Add truncation indicator for after context
if has_after_truncation:
parts.append("\n\n**[...]**")
return "".join(parts)
+15 -6
View File
@@ -50,6 +50,9 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
Returns unverified results from Qdrant. Access verification should be
performed separately at the final output stage using verify_search_results().
Deduplicates by (doc_id, doc_type, chunk_start_offset, chunk_end_offset)
to show multiple chunks from the same document while avoiding duplicate chunks.
Args:
query: Natural language search query
user_id: User ID for filtering
@@ -123,21 +126,24 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
top_scores = [p.score for p in search_response.points[:3]]
logger.debug(f"Top 3 similarity scores: {top_scores}")
# Deduplicate by (doc_id, doc_type) - multiple chunks per document
seen_docs = set()
# Deduplicate by (doc_id, doc_type, chunk_start, chunk_end)
# This allows multiple chunks from same doc, but removes duplicate chunks
seen_chunks = set()
results = []
for result in search_response.points:
# doc_id can be int (notes) or str (files - file paths)
doc_id = result.payload["doc_id"]
doc_type = result.payload.get("doc_type", "note")
doc_key = (doc_id, doc_type)
chunk_start = result.payload.get("chunk_start_offset")
chunk_end = result.payload.get("chunk_end_offset")
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
# Skip if we've already seen this document
if doc_key in seen_docs:
# Skip if we've already seen this exact chunk
if chunk_key in seen_chunks:
continue
seen_docs.add(doc_key)
seen_chunks.add(chunk_key)
# Return unverified results (verification happens at output stage)
results.append(
@@ -153,6 +159,9 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
},
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
)
)