perf: fix vector viz search performance and visual encoding

This commit addresses critical performance issues with vector visualization
search (reducing time from 40s to ~2s) and improves result visualization
through better visual encoding.

## Performance Fixes

### 1. Fix blocking sleep in retry decorator (base.py:51)
- Changed `time.sleep(5)` to `await anyio.sleep(5)` in @retry_on_429
- Prevents entire event loop from freezing during rate limit retries
- Impact: Reduced search time from 22s to 16s initially

### 2. Add concurrency limiting for verification (verification.py:77-93)
- Added `anyio.Semaphore(20)` to limit concurrent HTTP requests
- Prevents connection pool exhaustion (RequestError) from 90+ simultaneous requests
- Fixes false filtering (was filtering 77/90 results incorrectly)
- Note: Semaphore still in code but verification removed from viz endpoint

### 3. Remove unnecessary verification from viz endpoint (viz_routes.py:483-486)
- Visualization only needs Qdrant metadata (title, excerpt), not full content
- Verification only required for sampling (LLM needs full note content)
- Impact: Reduced search time from 43.7s to ~2s (final fix)

### 4. Restore streaming scanner pattern (scanner.py)
- Process notes one-at-a-time using async generator
- Avoids loading all notes into memory

## Visualization Improvements

### 5. Result-relative score normalization (viz_routes.py:489-504)
- Normalize scores within result set: best=1.0, worst=0.0
- Removes arbitrary RRF normalization (theoretical max didn't make sense)
- Makes visual encoding meaningful regardless of algorithm scores

### 6. Power scaling for marker sizes (userinfo_routes.py:743)
- Changed from linear `8 + (score * 12)` to power `6 + (score² * 14)`
- Creates dramatic visual contrast: 0.0→6px, 0.5→9.5px, 1.0→20px
- Combined with opacity (0.2-1.0) for clear visual hierarchy

### 7. Multi-channel visual encoding (userinfo_routes.py:740-745)
- Size: Exponentially scaled with score²
- Opacity: Linear 0.2-1.0 (keeps all points visible)
- Color: Viridis gradient (blue→yellow)
- Effect: Top results are large/bright/opaque, context results small/dim/transparent

## Result
- Search time: 40s → ~2s (20x faster)
- Visual contrast: Subtle → dramatic (clear result hierarchy)
- No arbitrary cutoffs: All results visible, best naturally highlighted

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-16 07:01:35 +01:00
parent c8d9cc24e0
commit 137d1d6c75
5 changed files with 161 additions and 133 deletions
+15 -3
View File
@@ -725,6 +725,11 @@ async def user_info_html(request: Request) -> HTMLResponse:
}},
renderPlot(coordinates, results) {{
// Calculate score range for auto-scaling
const scores = results.map(r => r.score);
const minScore = Math.min(...scores);
const maxScore = Math.max(...scores);
const trace = {{
x: coordinates.map(c => c[0]),
y: coordinates.map(c => c[1]),
@@ -732,11 +737,18 @@ async def user_info_html(request: Request) -> HTMLResponse:
type: 'scatter',
text: results.map(r => `${{r.title}}<br>Score: ${{r.score.toFixed(3)}}`),
marker: {{
size: 8,
color: results.map(r => r.score),
// Multi-channel encoding: size + opacity + color for visual hierarchy
// Power scaling (score^2) amplifies visual differences dramatically
// score=0.0 → 6px, score=0.5 → 9.5px, score=1.0 → 20px
size: results.map(r => 6 + (Math.pow(r.score, 2) * 14)),
// Linear opacity scaling (0.2-1.0 range keeps all points visible)
opacity: results.map(r => 0.2 + (r.score * 0.8)),
// Color gradient shows score
color: scores,
colorscale: 'Viridis',
showscale: true,
colorbar: {{ title: 'Score' }},
colorbar: {{ title: 'Relative Score' }},
// Scores are normalized 0-1 within result set
cmin: 0,
cmax: 1
}}
+50 -48
View File
@@ -11,6 +11,7 @@ All processing happens server-side following ADR-012:
"""
import logging
import time
import numpy as np
from starlette.authentication import requires
@@ -381,56 +382,17 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
)
try:
# Start total request timer
request_start = time.perf_counter()
# Get authenticated HTTP client from session
# In BasicAuth mode: uses username/password from session
# In OAuth mode: uses access token from session
from nextcloud_mcp_server.auth.userinfo_routes import (
_get_authenticated_client_for_userinfo,
)
from nextcloud_mcp_server.client.notes import NotesClient
async with await _get_authenticated_client_for_userinfo(request) as http_client:
# Create NotesClient directly with authenticated HTTP client
notes_client = NotesClient(http_client, username)
# Wrap in a minimal client object for search algorithms
# This conforms to NextcloudClientProtocol but only implements notes
class MinimalNextcloudClient:
def __init__(self, notes_client, username):
self._notes = notes_client
self.username = username
@property
def notes(self):
return self._notes
@property
def webdav(self):
return None
@property
def calendar(self):
return None
@property
def contacts(self):
return None
@property
def deck(self):
return None
@property
def cookbook(self):
return None
@property
def tables(self):
return None
nextcloud_client = MinimalNextcloudClient(notes_client, username)
# Create search algorithm
async with await _get_authenticated_client_for_userinfo(request) as http_client: # noqa: F841
# Create search algorithm (no client needed - verification removed)
if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
elif algorithm == "keyword":
@@ -451,6 +413,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Execute search (supports cross-app when doc_types=None)
# Get unverified results with buffer for filtering
search_start = time.perf_counter()
all_results = []
if doc_types is None or len(doc_types) == 0:
# Cross-app search - search all indexed types
@@ -476,13 +439,28 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Sort by score before verification
all_results.sort(key=lambda r: r.score, reverse=True)
# Verify access for all results (deduplicates and filters)
from nextcloud_mcp_server.search.verification import verify_search_results
# No verification needed for visualization - we only need Qdrant metadata
# (title, excerpt, doc_type) which is already in search results.
# Verification is only needed for sampling (LLM needs full content).
search_results = all_results[:limit]
search_duration = time.perf_counter() - search_start
verified_results = await verify_search_results(
all_results, nextcloud_client
# Normalize scores relative to this result set for better visualization
# (best result = 1.0, worst result = 0.0 within THIS result set)
# This makes visual encoding meaningful regardless of RRF normalization
if search_results:
scores = [r.score for r in search_results]
min_score, max_score = min(scores), max(scores)
score_range = max_score - min_score if max_score > min_score else 1.0
logger.info(
f"Normalizing scores for viz: original range [{min_score:.3f}, {max_score:.3f}] "
f"→ [0.0, 1.0]"
)
search_results = verified_results[:limit]
# Rescale each result's score to 0-1 within this result set
for r in search_results:
r.score = (r.score - min_score) / score_range
if not search_results:
return JSONResponse(
@@ -495,6 +473,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
)
# Fetch vectors for matching results from Qdrant
vector_fetch_start = time.perf_counter()
qdrant_client = await get_qdrant_client()
doc_ids = [r.id for r in search_results]
@@ -534,6 +513,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Extract vectors
vectors = np.array([p.vector for p in points if p.vector is not None])
vector_fetch_duration = time.perf_counter() - vector_fetch_start
if len(vectors) < 2:
# Not enough points for PCA
@@ -556,8 +536,10 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
)
# Apply PCA dimensionality reduction (768-dim → 2D)
pca_start = time.perf_counter()
pca = PCA(n_components=2)
coords_2d = pca.fit_transform(vectors)
pca_duration = time.perf_counter() - pca_start
# After fit, these attributes are guaranteed to be set
assert pca.explained_variance_ratio_ is not None
@@ -590,6 +572,18 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
for r in search_results
]
# Calculate total request duration
total_duration = time.perf_counter() - request_start
# Log comprehensive timing metrics
logger.info(
f"Viz search timing: total={total_duration * 1000:.1f}ms, "
f"search={search_duration * 1000:.1f}ms ({search_duration / total_duration * 100:.1f}%), "
f"vector_fetch={vector_fetch_duration * 1000:.1f}ms ({vector_fetch_duration / total_duration * 100:.1f}%), "
f"pca={pca_duration * 1000:.1f}ms ({pca_duration / total_duration * 100:.1f}%), "
f"results={len(search_results)}, vectors={len(vectors)}"
)
return JSONResponse(
{
"success": True,
@@ -599,6 +593,14 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
"pc1": float(pca.explained_variance_ratio_[0]),
"pc2": float(pca.explained_variance_ratio_[1]),
},
"timing": {
"total_ms": round(total_duration * 1000, 2),
"search_ms": round(search_duration * 1000, 2),
"vector_fetch_ms": round(vector_fetch_duration * 1000, 2),
"pca_ms": round(pca_duration * 1000, 2),
"num_results": len(search_results),
"num_vectors": len(vectors),
},
}
)
+2 -1
View File
@@ -5,6 +5,7 @@ import time
from abc import ABC
from functools import wraps
import anyio
from httpx import AsyncClient, HTTPStatusError, RequestError, codes
from nextcloud_mcp_server.observability.metrics import (
@@ -47,7 +48,7 @@ def retry_on_429(func):
# Record retry metric (extract app name from args if available)
if len(args) > 0 and hasattr(args[0], "app_name"):
record_nextcloud_api_retry(app=args[0].app_name, reason="429")
time.sleep(5)
await anyio.sleep(5)
elif e.response.status_code == 404:
# 404 errors are often expected (e.g., checking if attachments exist)
# Log as debug instead of warning
+33 -22
View File
@@ -74,39 +74,50 @@ async def verify_search_results(
# Use list to maintain order (index-based storage)
verified_results = [None] * len(unique_results)
# Limit concurrent verifications to prevent connection pool exhaustion
# Without this, launching 90+ simultaneous HTTP requests overwhelms the
# connection pool, causing RequestError failures
max_concurrent = 20
semaphore = anyio.Semaphore(max_concurrent)
async def verify_one(index: int, result: SearchResult):
"""
Verify a single document and store result at index.
Uses semaphore to limit concurrent requests and prevent connection pool exhaustion.
Args:
index: Position in verified_results list
result: Search result to verify
"""
try:
if result.doc_type == "note":
# Fetch note to verify access and get fresh metadata
note = await nextcloud_client.notes.get_note(result.id)
# Update metadata with fresh data from Nextcloud
updated_metadata = {**(result.metadata or {}), **note}
verified_results[index] = replace(result, metadata=updated_metadata)
# TODO: Add verification for other doc types (calendar, deck, file, etc.)
else:
# For now, assume other types are accessible
# In production, add proper verification for each type
logger.debug(
f"No verification implemented for doc_type={result.doc_type}, "
"assuming accessible"
)
verified_results[index] = result
async with semaphore: # Limit concurrent verifications
try:
if result.doc_type == "note":
# Fetch note to verify access and get fresh metadata
note = await nextcloud_client.notes.get_note(result.id)
# Update metadata with fresh data from Nextcloud
updated_metadata = {**(result.metadata or {}), **note}
verified_results[index] = replace(result, metadata=updated_metadata)
# TODO: Add verification for other doc types (calendar, deck, file, etc.)
else:
# For now, assume other types are accessible
# In production, add proper verification for each type
logger.debug(
f"No verification implemented for doc_type={result.doc_type}, "
"assuming accessible"
)
verified_results[index] = result
except Exception as e:
# Document is inaccessible (403, 404, or other error)
# Log at debug level since this is expected for filtered results
logger.debug(f"Document {result.doc_type}/{result.id} not accessible: {e}")
verified_results[index] = None
except Exception as e:
# Document is inaccessible (403, 404, or other error)
# Log at debug level since this is expected for filtered results
logger.debug(
f"Document {result.doc_type}/{result.id} not accessible: {e}"
)
verified_results[index] = None
# Run all verifications in parallel using anyio task group
# This provides structured concurrency with automatic cancellation on errors
# Semaphore limits concurrency to prevent overwhelming connection pool
async with anyio.create_task_group() as tg:
for idx, result in enumerate(unique_results):
tg.start_soon(verify_one, idx, result)
+61 -59
View File
@@ -182,73 +182,43 @@ async def scan_user_documents(
f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer"
)
# Fetch all notes from Nextcloud
notes = [
note
async for note in nc_client.notes.get_all_notes(prune_before=prune_before)
]
logger.info(f"[SCAN-{scan_id}] Found {len(notes)} notes for {user_id}")
# Get indexed state from Qdrant first (for incremental sync)
indexed_docs = {}
if not initial_sync:
qdrant_client = await get_qdrant_client()
scroll_result = await qdrant_client.scroll(
collection_name=get_settings().get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_type", match=MatchValue(value="note")),
]
),
with_payload=["doc_id", "indexed_at"],
with_vectors=False,
limit=10000,
)
# Record documents scanned
record_vector_sync_scan(len(notes))
indexed_docs = {
point.payload["doc_id"]: point.payload["indexed_at"]
for point in scroll_result[0]
}
if initial_sync:
# Send everything on first sync
for note in notes:
modified_at = note.get("modified", 0)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=str(note["id"]),
doc_type="note",
operation="index",
modified_at=modified_at,
)
)
logger.info(f"Sent {len(notes)} documents for initial sync: {user_id}")
return
logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")
# Get indexed state from Qdrant
qdrant_client = await get_qdrant_client()
scroll_result = await qdrant_client.scroll(
collection_name=get_settings().get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_type", match=MatchValue(value="note")),
]
),
with_payload=["doc_id", "indexed_at"],
with_vectors=False,
limit=10000,
)
indexed_docs = {
point.payload["doc_id"]: point.payload["indexed_at"]
for point in scroll_result[0]
}
logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")
# Compare and queue changes
# Stream notes from Nextcloud and process immediately
note_count = 0
queued = 0
nextcloud_doc_ids = {str(note["id"]) for note in notes}
nextcloud_doc_ids = set()
for note in notes:
async for note in nc_client.notes.get_all_notes(prune_before=prune_before):
note_count += 1
doc_id = str(note["id"])
indexed_at = indexed_docs.get(doc_id)
nextcloud_doc_ids.add(doc_id)
modified_at = note.get("modified", 0)
# If document reappeared, remove from potentially_deleted
doc_key = (user_id, doc_id)
if doc_key in _potentially_deleted:
logger.debug(
f"Document {doc_id} reappeared, removing from deletion grace period"
)
del _potentially_deleted[doc_key]
# Send if never indexed or modified since last index
if indexed_at is None or modified_at > indexed_at:
if initial_sync:
# Send everything on first sync
await send_stream.send(
DocumentTask(
user_id=user_id,
@@ -259,6 +229,38 @@ async def scan_user_documents(
)
)
queued += 1
else:
# Incremental sync: compare with indexed state
indexed_at = indexed_docs.get(doc_id)
# If document reappeared, remove from potentially_deleted
doc_key = (user_id, doc_id)
if doc_key in _potentially_deleted:
logger.debug(
f"Document {doc_id} reappeared, removing from deletion grace period"
)
del _potentially_deleted[doc_key]
# Send if never indexed or modified since last index
if indexed_at is None or modified_at > indexed_at:
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=doc_id,
doc_type="note",
operation="index",
modified_at=modified_at,
)
)
queued += 1
# Log and record metrics after streaming
logger.info(f"[SCAN-{scan_id}] Found {note_count} notes for {user_id}")
record_vector_sync_scan(note_count)
if initial_sync:
logger.info(f"Sent {queued} documents for initial sync: {user_id}")
return
# Check for deleted documents (in Qdrant but not in Nextcloud)
# Use grace period: only delete after 2 consecutive scans confirm absence