feat(vector): add Deck card vector search with visualization support

Adds comprehensive vector search support for Nextcloud Deck cards,
including semantic search indexing, chunk preview in the vector viz UI,
and proper deep linking to cards.

**Vector Search Indexing**
- Add deck_card scanning in scanner.py (scan_deck_cards function)
- Index cards from non-archived, non-deleted boards
- Store metadata: board_id, board_title, stack_id, stack_title, card_type, duedate, owner
- Content structure: title + "\n\n" + description (matches indexing format)
- Incremental sync based on lastModified timestamp
- Deletion tracking with grace period

**Vector Visualization Support**
- Add deck_card handler in context.py for chunk preview expansion
- Include board_id in search result metadata (bm25_hybrid.py, semantic.py)
- Expose metadata in viz_routes.py JSON responses
- Update vector-viz.js to construct proper Deck URLs: /apps/deck/board/{board_id}/card/{card_id}
- Update vector_viz.html filter label from "Deck" to "Deck Cards"

**Bug Fixes**
- Skip soft-deleted boards (deletedAt > 0) to prevent 403 Forbidden errors
- Applies to scanner, processor, and context expansion code paths
- Deck API returns deleted boards but rejects stack access with 403

**Testing**
- Add integration tests in test_deck_vector_search.py:
  - test_deck_card_semantic_search: Filtered search with doc_type="deck_card"
  - test_deck_card_appears_in_cross_app_search: Cross-app search includes deck cards
  - test_deck_card_chunk_context: Chunk context fetching for viz preview

**Documentation**
- Update README.md: Add Deck cards to semantic search feature list
- Update semantic-search-architecture.md: Document deck_card support
- Update nc_semantic_search tool documentation

**Type Safety**
- Fix type narrowing for page_boundaries (could be None) using cast()
- Fix scanner.py payload None check for type safety

Resolves vector search for Deck cards across indexing, search, and visualization.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-12-13 23:51:18 +01:00
parent 6e3f9f6e79
commit 20404cf3f2
12 changed files with 622 additions and 30 deletions
+83 -5
View File
@@ -6,6 +6,7 @@ Processes documents from stream: fetches content, generates embeddings, stores i
import logging
import time
import uuid
from typing import Any, cast
import anyio
from anyio.abc import TaskStatus
@@ -311,6 +312,64 @@ async def _index_document(
file_path = None
content_bytes = None
content_type = None
elif doc_task.doc_type == "deck_card":
# Fetch card from Deck API
# Note: We need board_id and stack_id to fetch the card
# For now, we'll need to get all boards and find the card
# This is not optimal, but Deck API requires board_id and stack_id
boards = await nc_client.deck.get_boards()
card_found = False
for board in boards:
if card_found:
break
# Skip deleted boards (soft delete: deletedAt > 0)
if board.deletedAt > 0:
continue
stacks = await nc_client.deck.get_stacks(board.id)
for stack in stacks:
if card_found:
break
if stack.cards:
for card in stack.cards:
if card.id == int(doc_task.doc_id):
# Build content from card title and description
content_parts = [card.title]
if card.description:
content_parts.append(card.description)
content = "\n\n".join(content_parts)
title = card.title
# Store deck-specific metadata
file_metadata = {
"board_id": board.id,
"board_title": board.title,
"stack_id": stack.id,
"stack_title": stack.title,
"card_type": card.type,
"duedate": (
card.duedate.isoformat()
if card.duedate
else None
),
"archived": card.archived,
"owner": (
card.owner.uid
if hasattr(card.owner, "uid")
else str(card.owner)
),
}
etag = card.etag or ""
file_path = None
content_bytes = None
content_type = None
card_found = True
break
if not card_found:
raise ValueError(
f"Deck card {doc_task.doc_id} not found in any board/stack"
)
elif doc_task.doc_type == "file":
# For files, doc_id is now the numeric file ID, file_path comes from DocumentTask
if not doc_task.file_path:
@@ -399,14 +458,16 @@ async def _index_document(
# Assign page numbers to chunks if page boundaries are available (PDFs)
page_boundaries = file_metadata.get("page_boundaries")
if doc_task.doc_type == "file" and page_boundaries is not None:
# Type narrowing: page_boundaries is guaranteed to be list[dict] here
page_boundaries_list = cast(list[dict[str, Any]], page_boundaries)
with trace_operation(
"vector_sync.assign_page_numbers",
attributes={
"vector_sync.chunk_count": len(chunks),
"vector_sync.page_count": len(page_boundaries),
"vector_sync.page_count": len(page_boundaries_list),
},
):
assign_page_numbers(chunks, page_boundaries)
assign_page_numbers(chunks, page_boundaries_list)
# Diagnostic: Verify page number assignment
assigned_count = sum(1 for c in chunks if c.page_number is not None)
@@ -429,8 +490,8 @@ async def _index_document(
f"Text length: {len(content)}, "
f"Chunks: {len(chunks)}, "
f"Chunk offset range: [{chunks[0].start_offset}:{chunks[-1].end_offset}], "
f"Page boundaries: {len(page_boundaries)} pages, "
f"First boundary: {page_boundaries[0] if page_boundaries else 'None'}"
f"Page boundaries: {len(page_boundaries_list)} pages, "
f"First boundary: {page_boundaries_list[0] if page_boundaries_list else 'None'}"
)
# Extract chunk texts for embedding
@@ -504,6 +565,9 @@ async def _index_document(
logger.warning("No page boundaries available, skipping highlighting")
return
# Type narrowing: page_boundaries is guaranteed to be list[dict] here
page_boundaries_list = cast(list[dict[str, Any]], page_boundaries)
logger.info(
f"Batch generating highlighted page images for {len(chunk_data)} PDF chunks"
)
@@ -514,7 +578,7 @@ async def _index_document(
lambda: PDFHighlighter.highlight_chunks_batch(
pdf_bytes=content_bytes,
chunks=chunk_data,
page_boundaries=page_boundaries,
page_boundaries=page_boundaries_list,
full_text=content,
color="yellow",
zoom=2.0,
@@ -623,6 +687,20 @@ async def _index_document(
if doc_task.doc_type == "news_item"
else {}
),
# Deck card-specific metadata
**(
{
"board_id": file_metadata.get("board_id"),
"board_title": file_metadata.get("board_title"),
"stack_id": file_metadata.get("stack_id"),
"stack_title": file_metadata.get("stack_title"),
"card_type": file_metadata.get("card_type"),
"duedate": file_metadata.get("duedate"),
"owner": file_metadata.get("owner"),
}
if doc_task.doc_type == "deck_card"
else {}
),
# Highlighted page image (PDF only)
**(
{
+216 -3
View File
@@ -79,9 +79,11 @@ async def get_last_indexed_timestamp(user_id: str) -> int | None:
if scroll_result[0]:
timestamps = [
point.payload.get("indexed_at", 0) for point in scroll_result[0]
point.payload.get("indexed_at", 0)
for point in scroll_result[0]
if point.payload is not None
]
max_timestamp = max(timestamps)
max_timestamp = max(timestamps) if timestamps else 0
logger.info(
f"Max indexed_at: {max_timestamp}, timestamps sample: {timestamps[:3]}"
)
@@ -564,9 +566,23 @@ async def scan_user_documents(
except Exception as e:
logger.warning(f"Failed to scan news items for {user_id}: {e}")
# Scan Deck cards
deck_queued = 0
try:
deck_queued = await scan_deck_cards(
user_id=user_id,
send_stream=send_stream,
nc_client=nc_client,
initial_sync=initial_sync,
scan_id=scan_id,
)
queued += deck_queued
except Exception as e:
logger.warning(f"Failed to scan deck cards for {user_id}: {e}")
if queued > 0:
logger.info(
f"Sent {queued} documents ({file_queued} files, {news_queued} news items) for incremental sync: {user_id}"
f"Sent {queued} documents ({file_queued} files, {news_queued} news items, {deck_queued} deck cards) for incremental sync: {user_id}"
)
else:
logger.debug(f"No changes detected for {user_id}")
@@ -753,3 +769,200 @@ async def scan_news_items(
_potentially_deleted[doc_key] = current_time
return queued
async def scan_deck_cards(
user_id: str,
send_stream: MemoryObjectSendStream[DocumentTask],
nc_client: NextcloudClient,
initial_sync: bool,
scan_id: int,
) -> int:
"""
Scan user's Deck cards and queue changed cards for indexing.
Indexes cards from all non-archived boards and stacks.
Args:
user_id: User to scan
send_stream: Stream to send changed documents to processors
nc_client: Authenticated Nextcloud client
initial_sync: If True, send all documents (first-time sync)
scan_id: Scan identifier for logging
Returns:
Number of cards queued for processing
"""
settings = get_settings()
queued = 0
# Get indexed deck card IDs from Qdrant (for deletion tracking)
indexed_card_ids: set[str] = set()
if not initial_sync:
qdrant_client = await get_qdrant_client()
scroll_result = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_type", match=MatchValue(value="deck_card")),
]
),
with_payload=["doc_id"],
with_vectors=False,
limit=10000,
)
indexed_card_ids = {
point.payload["doc_id"]
for point in (scroll_result[0] or [])
if point.payload is not None
}
logger.debug(f"Found {len(indexed_card_ids)} indexed deck cards in Qdrant")
# Fetch all boards
boards = await nc_client.deck.get_boards()
logger.debug(f"[SCAN-{scan_id}] Found {len(boards)} deck boards")
card_count = 0
nextcloud_card_ids: set[str] = set()
# Iterate through boards
for board in boards:
# Skip archived boards
if board.archived:
continue
# Skip deleted boards (soft delete: deletedAt > 0)
if board.deletedAt > 0:
logger.debug(f"[SCAN-{scan_id}] Skipping deleted board {board.id}")
continue
# Get stacks for this board
stacks = await nc_client.deck.get_stacks(board.id)
# Iterate through stacks
for stack in stacks:
# Skip if stack has no cards
if not stack.cards:
continue
# Iterate through cards in stack
for card in stack.cards:
# Skip archived cards
if card.archived:
continue
card_count += 1
doc_id = str(card.id)
nextcloud_card_ids.add(doc_id)
# Use lastModified timestamp if available
modified_at = card.lastModified or 0
if initial_sync:
# Send everything on first sync - write placeholder first
await write_placeholder_point(
doc_id=doc_id,
doc_type="deck_card",
user_id=user_id,
modified_at=modified_at,
)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=doc_id,
doc_type="deck_card",
operation="index",
modified_at=modified_at,
)
)
queued += 1
else:
# Incremental sync: check if card exists and compare modified_at
doc_key = (user_id, doc_id)
if doc_key in _potentially_deleted:
logger.debug(
f"Deck card {doc_id} reappeared, removing from deletion grace period"
)
del _potentially_deleted[doc_key]
# Query Qdrant for existing entry
existing_metadata = await query_document_metadata(
doc_id=doc_id, doc_type="deck_card", user_id=user_id
)
needs_indexing = False
if existing_metadata is None:
needs_indexing = True
elif existing_metadata.get("modified_at", 0) < modified_at:
needs_indexing = True
elif existing_metadata.get("is_placeholder", False):
queued_at = existing_metadata.get("queued_at", 0)
placeholder_age = time.time() - queued_at
stale_threshold = settings.vector_sync_scan_interval * 5
if placeholder_age > stale_threshold:
logger.debug(
f"Found stale placeholder for deck card {doc_id} "
f"(age={placeholder_age:.1f}s), requeuing"
)
needs_indexing = True
if needs_indexing:
await write_placeholder_point(
doc_id=doc_id,
doc_type="deck_card",
user_id=user_id,
modified_at=modified_at,
)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=doc_id,
doc_type="deck_card",
operation="index",
modified_at=modified_at,
)
)
queued += 1
logger.info(
f"[SCAN-{scan_id}] Found {card_count} deck cards (non-archived) for {user_id}"
)
record_vector_sync_scan(card_count)
# Check for deleted cards (not initial sync)
if not initial_sync:
grace_period = settings.vector_sync_scan_interval * 1.5
current_time = time.time()
for doc_id in indexed_card_ids:
if doc_id not in nextcloud_card_ids:
doc_key = (user_id, doc_id)
if doc_key in _potentially_deleted:
first_missing_time = _potentially_deleted[doc_key]
time_missing = current_time - first_missing_time
if time_missing >= grace_period:
logger.info(
f"Deck card {doc_id} missing for {time_missing:.1f}s "
f"(>{grace_period:.1f}s grace period), sending deletion"
)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=doc_id,
doc_type="deck_card",
operation="delete",
modified_at=0,
)
)
queued += 1
del _potentially_deleted[doc_key]
else:
logger.debug(
f"Deck card {doc_id} missing for first time, starting grace period"
)
_potentially_deleted[doc_key] = current_time
return queued