Compare commits

...

60 Commits

Author SHA1 Message Date
smithery-ai[bot] d4871fe9c5 Update README 2025-11-22 23:31:10 +00:00
Chris Coutinho 26f679d86e Merge pull request #332 from cbcoutinho/renovate/docker.io-library-python-3.12-slim-trixie
chore(deps): update docker.io/library/python:3.12-slim-trixie docker digest to b43ff04
2025-11-23 00:29:07 +01:00
Chris Coutinho cf39a15db1 Merge pull request #345 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.11
2025-11-23 00:28:53 +01:00
renovate-bot-cbcoutinho[bot] 1f3c35f162 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.11 2025-11-22 23:04:43 +00:00
renovate-bot-cbcoutinho[bot] 2bccc3dad9 chore(deps): update docker.io/library/python:3.12-slim-trixie docker digest to b43ff04 2025-11-22 23:04:40 +00:00
github-actions[bot] 959cb8b21a bump: version 0.46.1 → 0.46.2 2025-11-22 21:02:53 +00:00
Chris Coutinho f8a2410a0a Merge pull request #344 from cbcoutinho/fix/smithery-json-response
fix(smithery): Enable JSON response format for scanner compatibility
2025-11-22 22:02:24 +01:00
Chris Coutinho 03b984d5a7 fix(smithery): Enable JSON response format for scanner compatibility
The Smithery scanner was reporting "0 tools" despite the server returning
valid tool definitions. Root cause: the server was returning SSE-formatted
responses (event: message\ndata: {...}) which the scanner couldn't parse.

Changes:
- Add json_response=True to FastMCP for Smithery stateless mode
- Clean up verbose docstring examples in semantic.py and webdav.py

The MCP spec allows both SSE and plain JSON responses for HTTP transport.
Setting json_response=True returns Content-Type: application/json with
plain JSON-RPC instead of text/event-stream with SSE format.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 22:01:18 +01:00
github-actions[bot] 57db18c6a3 bump: version 0.46.0 → 0.46.1 2025-11-22 18:54:11 +00:00
Chris Coutinho ea79e94842 Merge pull request #343 from cbcoutinho/fix/vector-viz-search
perf: Optimize vector viz search performance
2025-11-22 19:53:40 +01:00
Chris Coutinho b0612cfa0f perf: Optimize vector viz search performance
- Replace sequential Qdrant scroll calls with batch retrieve
  (50 HTTP requests → 1 request, ~50x faster vector fetch)

- Add point_id to SearchResult to enable batch retrieval by Qdrant point ID

- Reuse query embedding from search algorithm in viz_routes
  (eliminates redundant embedding call, saves ~30ms)

- Make BM25 encode() async with thread pool to avoid blocking event loop
  (~4.4s was blocking, now properly async)

- Run PCA computation in thread pool to avoid blocking event loop
  (~1.2s was blocking, now properly async)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 19:47:43 +01:00
github-actions[bot] 4e61d73da5 bump: version 0.45.0 → 0.46.0 2025-11-22 18:40:24 +00:00
Chris Coutinho 3b41776110 Merge pull request #342 from cbcoutinho/feature/smithery
feat: Add Smithery stateless deployment support (ADR-016)
2025-11-22 19:39:53 +01:00
Chris Coutinho 3e3d38696c docs(smithery): Make Smithery the primary Quick Start option
Reorganize README to promote Smithery as the fastest way to get started:
- Quick Start now features Smithery one-click deployment
- Docker instructions moved to separate "Docker (Self-Hosted)" section
- Added note about Smithery's stateless mode limitations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 19:38:11 +01:00
Chris Coutinho 7b22e5be0f build: add smithery image to docker compose 2025-11-22 19:06:25 +01:00
Chris Coutinho 39fba49cfe fix(smithery): Add JSON Schema metadata to mcp-config endpoint
Add proper JSON Schema metadata fields per Smithery documentation:
- $schema: JSON Schema draft-07
- $id: Schema identifier URL
- title: Human-readable title
- description: Schema description
- x-query-style: "flat" (no nested objects in our schema)
- additionalProperties: false

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 18:28:05 +01:00
Chris Coutinho 706a15f0bc fix(smithery): Use container runtime pattern for config discovery
ADR-016: For container runtime deployment, Smithery does not auto-generate
the .well-known/mcp-config endpoint like it does for Python CLI runtime.

Changes:
- Remove [tool.smithery] from pyproject.toml (not used in container mode)
- Remove smithery_server.py (Python CLI runtime specific)
- Add .well-known/mcp-config endpoint to return JSON Schema config
- Add SmitheryConfigMiddleware to extract config from URL query params
- Use ContextVar to pass session config to tool handlers

The container runtime passes config as URL query parameters to /mcp:
  GET /mcp?nextcloud_url=...&username=...&app_password=...

Tested:
- All 164 unit tests passing
- Docker container builds successfully
- .well-known/mcp-config returns valid JSON Schema
- Health endpoints working

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 18:22:55 +01:00
Chris Coutinho b8dc413b73 feat: Add Smithery CLI deployment support
- Add smithery package as dependency
- Create smithery_server.py with @smithery.server() decorator
- Add SmitheryConfigSchema for session config (nextcloud_url, username, app_password)
- Add [tool.smithery] section to pyproject.toml
- Remove manual .well-known/mcp-config endpoint (Smithery handles this)

Smithery CLI will automatically:
- Extract config schema from the decorated function
- Handle session config parsing from query parameters
- Make config accessible via ctx.session_config in tools

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 18:05:33 +01:00
Chris Coutinho 8d29ce0122 fix: Add Smithery lifespan and auth mode detection
- Add SmitheryAppContext dataclass for stateless mode
- Add app_lifespan_smithery() with minimal lifespan (no shared state)
- Update is_oauth_mode() to detect Smithery mode and return BasicAuth
- Use Smithery lifespan when SMITHERY_DEPLOYMENT=true
- Add .well-known/mcp-config endpoint for config discovery
- Skip document processors in Smithery mode (not enabled)

Fixes startup issues in Smithery mode where missing env credentials
would incorrectly trigger OAuth mode.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 17:48:53 +01:00
Chris Coutinho a272e7cbab build: Fix Dockerfile.smithery 2025-11-22 17:35:16 +01:00
Chris Coutinho ce55b239e2 build: Fix Dockerfile.smithery 2025-11-22 17:33:12 +01:00
Chris Coutinho 432ab73741 build: Add missing deps 2025-11-22 17:32:20 +01:00
Chris Coutinho f93d650992 feat: Implement ADR-016 Smithery stateless deployment mode
Adds support for Smithery hosted deployment with stateless operation:

- Add DeploymentMode enum with SELF_HOSTED and SMITHERY_STATELESS modes
- Add get_deployment_mode() to detect mode from SMITHERY_DEPLOYMENT env var
- Update get_client() to create per-request clients from session config
- Add conditional tool registration (skip semantic search in Smithery mode)
- Add conditional /app admin UI mounting (skip in Smithery mode)
- Create smithery.yaml with configSchema for user credentials
- Create Dockerfile.smithery for minimal stateless container
- Create smithery_main.py entrypoint for Smithery deployment

In Smithery mode:
- Users provide nextcloud_url, username, app_password via session config
- Each request creates a fresh NextcloudClient (no state between requests)
- Semantic search tools are disabled (no vector database)
- Admin UI (/app) is disabled (no webhooks, vector viz)

All existing self-hosted functionality remains unchanged.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 17:30:42 +01:00
github-actions[bot] f9da19d1a1 bump: version 0.44.1 → 0.45.0 2025-11-22 16:14:35 +00:00
Chris Coutinho d2b6a26fe4 Merge pull request #341 from cbcoutinho/fix/async-await-and-pdf-metadata
fix: Async/await patterns, PDF metadata, and vector visualization improvements
2025-11-22 17:14:06 +01:00
Chris Coutinho 482ef89a73 docs: Add ADR-016 for Smithery stateless deployment
Add architecture decision record for supporting Smithery-hosted MCP
server in a stateless mode for multi-user public Nextcloud instances.

Key decisions:
- New SMITHERY_STATELESS deployment mode alongside SELF_HOSTED
- Session-based configuration (nextcloud_url, username, app_password)
- Feature subset excluding semantic search and background sync
- Admin UI (/app) excluded in Smithery mode
- Per-request client creation from session config

This enables users to try the MCP server without self-hosting
infrastructure while supporting multiple Nextcloud instances.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 17:13:18 +01:00
Chris Coutinho 34fd17ba55 fix: Use alpha_composite for proper RGBA highlight blending
Drawing directly with ImageDraw on RGBA mode doesn't blend alpha
properly. Use Image.alpha_composite() with a transparent overlay
to achieve correct semi-transparent highlight fills.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 17:04:29 +01:00
Chris Coutinho 8baa07db84 fix: Remove pymupdf.layout.activate() to fix page_chunks behavior
pymupdf.layout.activate() causes pymupdf4llm.to_markdown() to ignore the
page_chunks=True option, returning a single string instead of list[dict].
This broke per-page chunking needed for semantic search indexing.

See: https://github.com/pymupdf/pymupdf4llm/issues/323

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 16:58:35 +01:00
Chris Coutinho ba8a53803a refactor: Simplify PDF text extraction with single to_markdown call
Replace parallel per-page extraction with single to_markdown(page_chunks=True)
call. This is more efficient as pymupdf4llm can optimize internally for
full-document processing instead of making N separate calls for N pages.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 03:52:02 +01:00
Chris Coutinho 31fade9730 perf: Optimize PDF processing with parallel extraction and single-render highlights
Phase 1 - PDF Highlighting Optimization:
- Render each page ONCE instead of once per chunk (N chunks = 1 render, not N)
- Use PIL to draw bounding boxes on copied base images (fast) instead of
  re-rendering page via pymupdf (slow)
- Add _find_chunk_bbox() to extract bbox without modifying page

Phase 2 - Parallel Page Extraction:
- Use anyio task group with run_sync() for parallel page extraction
- Each page extracted in separate thread via anyio.to_thread.run_sync()
- Event loop stays responsive during extraction
- Remove obsolete _process_sync() method

Expected improvement: 30-50% reduction in total PDF processing time.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 03:11:56 +01:00
Chris Coutinho fffe483c02 fix: Centralize PDF processing and generate separate images per chunk
Previously, pymupdf4llm.to_markdown() was called twice - once in
PyMuPDFProcessor during indexing and again in PDFHighlighter during
visualization. Different image path lengths caused different character
offsets, leading to highlighted pages not matching their chunks.

Also fixed issue where all chunks on the same page showed all highlights
instead of just their own highlight. Now restores original page contents
between chunks using xref stream caching.

Changes:
- Add PDFHighlighter class requiring pre-computed page_boundaries and
  full_text from document processor (no fallback extraction)
- Pass pre-computed data from processor to highlighter
- Extract page-relative portion of chunk text for cross-page chunks
- Add bounding box highlighting using text anchor search
- Run highlight generation in parallel with embedding/BM25
- Cache and restore page contents to isolate highlights per chunk

Results: Highlighting success rate improved from 51% to 95% (121/128).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 02:46:30 +01:00
Chris Coutinho 8c79993280 Merge pull request #334 from cbcoutinho/renovate/docker.io-library-redis-alpine
chore(deps): update docker.io/library/redis:alpine docker digest to 6cbef35
2025-11-21 14:24:54 +01:00
Chris Coutinho 8a0672a6be Merge pull request #339 from cbcoutinho/renovate/astral-sh-setup-uv-7.x
chore(deps): update astral-sh/setup-uv action to v7.1.4
2025-11-21 14:24:42 +01:00
Chris Coutinho 395f798ee2 Merge pull request #340 from cbcoutinho/renovate/ollama-1.x
chore(deps): update helm release ollama to v1.35.0
2025-11-21 14:24:26 +01:00
renovate-bot-cbcoutinho[bot] debff75221 chore(deps): update helm release ollama to v1.35.0 2025-11-21 11:09:18 +00:00
renovate-bot-cbcoutinho[bot] 4bf0a6c22e chore(deps): update astral-sh/setup-uv action to v7.1.4 2025-11-21 11:08:53 +00:00
Chris Coutinho fb025821cb Merge pull request #335 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.11
2025-11-21 09:45:31 +01:00
Chris Coutinho ff880fd4c9 Merge pull request #338 from cbcoutinho/renovate/docker.io-library-nextcloud-32.x
chore(deps): update docker.io/library/nextcloud docker tag to v32.0.2
2025-11-21 09:34:20 +01:00
renovate-bot-cbcoutinho[bot] 03495d901d chore(deps): update docker.io/library/nextcloud docker tag to v32.0.2 2025-11-21 05:14:28 +00:00
github-actions[bot] 798958f20a bump: version 0.44.0 → 0.44.1 2025-11-21 00:39:23 +00:00
Chris Coutinho 699295c5be Merge pull request #336 from cbcoutinho/renovate/mcp-1.x
fix(deps): update dependency mcp to >=1.22,<1.23
2025-11-21 01:38:50 +01:00
Chris Coutinho a62a007c87 feat: Add context expansion to semantic search with chunk overlap removal
Implements optional context expansion for semantic search results that
fetches adjacent chunks (N-1 and N+1) from Qdrant to provide before/after
context. Removes configurable chunk overlap (default 200 chars) to avoid
duplicate text appearing in both context and excerpt.

Key changes:
- Add include_context and context_chars parameters to nc_semantic_search
  and nc_semantic_search_answer tools
- Implement Qdrant cache fast path for chunk retrieval (avoids re-fetching
  and re-parsing documents, especially important for PDFs)
- Add _get_chunk_by_index_from_qdrant() to fetch adjacent chunks
- Remove chunk overlap from before_context (last N chars) and after_context
  (first N chars) to prevent duplicate text
- Fetch context in parallel with anyio.Semaphore (max 20 concurrent)
- Pass through page_number from SearchResult to SemanticSearchResult
- Remove document-level deduplication (keep chunk-level dedup from algorithm)

Context expansion is opt-in via include_context=true parameter. When enabled:
- Populates has_context_expansion, marked_text, before_context, after_context
- Adds truncation flags when context exceeds context_chars limit
- Falls back to document fetch for legacy data with truncated excerpts

Related: nextcloud_mcp_server/search/context.py:87-382,
         nextcloud_mcp_server/server/semantic.py:161-255
2025-11-21 01:02:22 +01:00
renovate-bot-cbcoutinho[bot] d4fc1de80d fix(deps): update dependency mcp to >=1.22,<1.23 2025-11-20 23:11:11 +00:00
renovate-bot-cbcoutinho[bot] 0902b5653f chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.11 2025-11-20 23:10:47 +00:00
renovate-bot-cbcoutinho[bot] 0b6a02075c chore(deps): update docker.io/library/redis:alpine docker digest to 6cbef35 2025-11-20 23:10:43 +00:00
Chris Coutinho 7880a8de30 Merge pull request #333 from cbcoutinho/renovate/actions-checkout-6.x
chore(deps): update actions/checkout action to v6
2025-11-20 20:17:21 +01:00
renovate-bot-cbcoutinho[bot] 2abedd6b4b chore(deps): update actions/checkout action to v6 2025-11-20 17:12:30 +00:00
Chris Coutinho 5a251a99e6 fix: Set is_placeholder=False in processor to fix search filtering
The processor was not setting is_placeholder field when writing real
document chunks to Qdrant. This caused the placeholder filter to exclude
all documents (since None != False), resulting in 0 search results.

Now explicitly sets is_placeholder: False in payload when writing real
indexed chunks, allowing search filters to correctly distinguish between
placeholders and real documents.
2025-11-20 17:15:19 +01:00
Chris Coutinho 25ef33de7f feat: Use Ollama native batch API in embed_batch()
- Switch from sequential loop to /api/embed batch endpoint
- Use 'input' array parameter instead of individual 'prompt' requests
- Process in chunks of 32 to avoid quality degradation (issue #6262)
- Reduces HTTP overhead: 128 texts = 4 requests instead of 128
- Maintains backward compatibility with embed() for single embeddings

Ref: ollama/ollama#6262
2025-11-20 16:50:13 +01:00
Chris Coutinho ec2c274cd9 fix: Increase placeholder staleness threshold to 5x scan interval
- Changed from 2x (120s) to 5x (300s) scan interval
- Large PDFs take 3-4 minutes to process, need longer threshold
- Prevents premature requeuing of in-flight documents
2025-11-20 15:36:49 +01:00
Chris Coutinho 47f0b3db9a fix: Add placeholder staleness check to prevent duplicate processing
- Only requeue documents if placeholder is older than 2x scan interval (120s default)
- Prevents scanner from immediately requeuing in-flight documents
- Fixes issue where PDFs were being reprocessed every 60 seconds
- Staleness check applied to both notes and files scanning logic
2025-11-20 15:30:10 +01:00
Chris Coutinho 233de3508f fix: Use empty SparseVector instead of None for placeholders
Qdrant validation rejects None for sparse vectors in named vector dicts.
Use models.SparseVector(indices=[], values=[]) instead to create valid
empty sparse vectors for placeholder points.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 15:15:10 +01:00
Chris Coutinho 13b2d0048c feat: Implement Qdrant placeholder state management
Introduces a placeholder-based state tracking system to prevent duplicate
document processing during the gap between scanner queuing and processor
completion.

**Key Changes:**

1. **Placeholder Helper Functions** (`vector/placeholder.py`):
   - `write_placeholder_point()` - Creates zero-vector placeholder when queuing
   - `query_document_metadata()` - Queries for existing entry (placeholder or real)
   - `delete_placeholder_point()` - Removes placeholder before writing real vectors
   - `get_placeholder_filter()` - Filters placeholders from user-facing queries

2. **Scanner Updates** (`vector/scanner.py`):
   - Replace `indexed_at` comparison with `modified_at` comparison
   - Write placeholder before queuing each document
   - Query per-document metadata instead of bulk-querying indexed_at
   - Fixes bug where files were resubmitted every scan cycle

3. **Processor Updates** (`vector/processor.py`):
   - Delete placeholder before upserting real vectors
   - Ensures no duplicate points in Qdrant

4. **Query Filters** (all search files):
   - Add `get_placeholder_filter()` to all user-facing queries
   - Ensures placeholders never appear in search results or visualizations
   - Applied to: bm25_hybrid.py, semantic.py, viz_routes.py, algorithms.py

**Architecture:**
- Placeholders use zero vectors with dimension from embedding service
- Payload includes `is_placeholder: True` flag for filtering
- Status field tracks: "pending", "processing", "completed", "failed"
- Deterministic UUIDs using uuid5 for consistent point IDs

**Impact:**
- Eliminates duplicate processing of same documents
- Fixes race condition where long-running documents get queued multiple times
- Prevents scanner from resubmitting files every scan cycle
- Maintains clean separation between in-flight and indexed documents

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 15:04:00 +01:00
Chris Coutinho 944dd760ca fix: Return empty array instead of null for query_coords when no results
When vector visualization search returns zero results, the code was returning
query_coords: null, which caused JavaScript error "can't access property 0,
queryCoords is null" when the frontend tried to access the array.

Changed to return empty array [] to match expected type and prevent crash.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 14:18:02 +01:00
Chris Coutinho d67aa6ae5c fix: Align PDF text extraction between indexing and context expansion
This commit fixes two critical issues with PDF processing:

1. **Text extraction mismatch (context expansion bug)**:
   - Indexing used pymupdf4llm.to_markdown() producing markdown text
   - Context expansion used page.get_text() producing plain text
   - Different text formats caused character offset misalignment
   - Search would find correct chunk, but expansion showed wrong section
   - Fixed by making context.py use pymupdf4llm.to_markdown() consistently

2. **Diagnostic logging for page number assignment**:
   - Added logging to verify page_boundaries exist in metadata
   - Added logging to verify assign_page_numbers() assigns values
   - Helps diagnose why page numbers show as null in search results

3. **mime_type storage bug**:
   - Fixed incorrect field reference in processor.py:405
   - Was using file_metadata.get("content_type", "")
   - Should use content_type from WebDAV response

Changes:
- nextcloud_mcp_server/search/context.py: Use pymupdf4llm.to_markdown()
  for PDF text extraction to match indexing method
- nextcloud_mcp_server/vector/processor.py: Add diagnostic logging for
  page boundaries and assignment, fix mime_type storage
- tests/unit/client/test_webdav.py: Fix import sorting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 13:57:50 +01:00
Chris Coutinho f1a5fac1b9 fix: Update models and viz to use int-only doc_id
- algorithms.py: Revert SearchResult.id to int (all docs use int IDs now)
- semantic.py: Revert SemanticSearchResult.id to int, remove Union import
- viz_routes.py: Remove str() conversion when querying doc_id from Qdrant
- viz_routes.py: Convert doc_id from query param to int in chunk context

Fixes vector visualization which was collapsing all chunks to a single
point because Qdrant queries were failing to match doc_id (string vs int).
2025-11-20 12:32:27 +01:00
Chris Coutinho d0691d5aa0 feat: Switch files to use numeric IDs with file_path resolution
- scanner.py: Use file_info['id'] as doc_id instead of file_path
- scanner.py: Pass file_path in DocumentTask for content retrieval
- processor.py: Store file_path in Qdrant payload for later lookup
- context.py: Add _get_file_path_from_qdrant() to resolve file_id → file_path
- context.py: Update get_chunk_with_context() to handle file ID resolution

This makes the system resilient to file renames since file IDs are stable
identifiers in Nextcloud, while file paths can change.
2025-11-20 12:00:47 +01:00
Chris Coutinho f1610bbd2e fix: Reconstruct full content for notes to match indexed offsets
Notes are indexed as "{title}\n\n{content}" in processor.py but were
being retrieved as just content during chunk expansion, causing
chunk_start_offset and chunk_end_offset to be misaligned.

This fix reconstructs the full content structure when fetching notes
for chunk expansion, ensuring the displayed chunks match the excerpts
shown in search results.

Fixes chunk/excerpt mismatch reported in vector visualization.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 11:33:12 +01:00
Chris Coutinho 327d843f64 feat: Implement per-chunk vector visualization with context expansion
Major improvements to vector visualization page:
- Refactor PCA to display individual chunks instead of averaged documents
- Add context expansion module for fetching surrounding text from notes and PDFs
- Update deduplication to use (doc_id, doc_type, chunk_start, chunk_end) keys
- Fix Alpine.js rendering with chunk-specific keys including offsets
- Refactor authentication helper to return NextcloudClient for better reuse
- Add async context manager support to NextcloudClient

Technical details:
- viz_routes.py: Fetch specific chunk vectors instead of averaging per document
- context.py: New module supporting both notes and PDF text extraction via PyMuPDF
- search algorithms: Extract page_number, chunk_index, total_chunks from Qdrant
- vector-viz.js/html: Use chunk positions in expansion tracking keys

This enables users to see which specific chunks match their query
and view them with surrounding context in the PCA visualization.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 11:22:20 +01:00
Chris Coutinho b8010270c1 fix: Add async/await, PDF metadata, and type safety fixes
This commit addresses multiple issues with async operations, PDF metadata
extraction, and type safety in document processing and search.

## Async/Await Fixes
- processor.py:259 - Added await for chunker.chunk_text(content)
- processor.py:270 - Added await for bm25_service.encode_batch(chunk_texts)
- tests/unit/test_document_chunker.py - Converted all 12 test methods to async

## PDF Metadata Enhancement
- pymupdf.py:143 - Added file_size metadata extraction
- pymupdf.py:145-206 - Refactored to extract text page-by-page
  - Manually loop through pages instead of using page_chunks=True
  - Generate page_boundaries metadata for precise page tracking
  - Works around pymupdf.layout.activate() breaking page_chunks=True
- processor.py:32-66 - Added assign_page_numbers() helper function
  - Assigns page numbers to chunks based on overlap with page boundaries
  - Handles chunks spanning multiple pages
- processor.py:298-300 - Call assign_page_numbers() for PDF files

## Type Safety Fixes
- bm25_hybrid.py:184 - Removed int() conversion of doc_id
- semantic.py:131 - Removed int() conversion of doc_id
- viz_routes.py:275 - Removed int() conversion of doc_id
- Added comments documenting that doc_id can be int (notes) or str (file paths)

## Testing
- All 18 tests passing (12 unit + 6 integration)
- No type errors in modified files
- Container logs show successful processing
- Vector viz searches working correctly

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 02:37:07 +01:00
48 changed files with 5443 additions and 437 deletions
+1 -1
View File
@@ -15,7 +15,7 @@ jobs:
packages: write
steps:
- name: Check out
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6
with:
fetch-depth: 0
token: "${{ secrets.PERSONAL_ACCESS_TOKEN }}"
+1 -1
View File
@@ -12,7 +12,7 @@ jobs:
packages: write
steps:
- name: Checkout repository
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6
- name: Docker meta
id: meta
+1 -1
View File
@@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6
with:
fetch-depth: 0
+2 -2
View File
@@ -18,9 +18,9 @@ jobs:
contents: read
steps:
- name: Checkout
uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6
- name: Install uv
uses: astral-sh/setup-uv@5a7eac68fb9809dea845d802897dc5c723910fa3 # v7.1.3
uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4
- name: Install Python 3.11
run: uv python install 3.11
- name: Build
+4 -4
View File
@@ -9,9 +9,9 @@ jobs:
linting:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
- name: Install the latest version of uv
uses: astral-sh/setup-uv@5a7eac68fb9809dea845d802897dc5c723910fa3 # v7.1.3
uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4
- name: Check format
run: |
uv run --frozen ruff format --diff
@@ -27,7 +27,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: 'true'
@@ -56,7 +56,7 @@ jobs:
up-flags: "--build"
- name: Install the latest version of uv
uses: astral-sh/setup-uv@5a7eac68fb9809dea845d802897dc5c723910fa3 # v7.1.3
uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4
- name: Install Playwright dependencies
run: |
+64
View File
@@ -1,3 +1,67 @@
## v0.46.2 (2025-11-22)
### Fix
- **smithery**: Enable JSON response format for scanner compatibility
## v0.46.1 (2025-11-22)
### Perf
- Optimize vector viz search performance
## v0.46.0 (2025-11-22)
### Feat
- Add Smithery CLI deployment support
- Implement ADR-016 Smithery stateless deployment mode
### Fix
- **smithery**: Add JSON Schema metadata to mcp-config endpoint
- **smithery**: Use container runtime pattern for config discovery
- Add Smithery lifespan and auth mode detection
## v0.45.0 (2025-11-22)
### Feat
- Add context expansion to semantic search with chunk overlap removal
- Use Ollama native batch API in embed_batch()
- Implement Qdrant placeholder state management
- Switch files to use numeric IDs with file_path resolution
- Implement per-chunk vector visualization with context expansion
### Fix
- Use alpha_composite for proper RGBA highlight blending
- Remove pymupdf.layout.activate() to fix page_chunks behavior
- Centralize PDF processing and generate separate images per chunk
- Set is_placeholder=False in processor to fix search filtering
- Increase placeholder staleness threshold to 5x scan interval
- Add placeholder staleness check to prevent duplicate processing
- Use empty SparseVector instead of None for placeholders
- Return empty array instead of null for query_coords when no results
- Align PDF text extraction between indexing and context expansion
- Update models and viz to use int-only doc_id
- Reconstruct full content for notes to match indexed offsets
- Add async/await, PDF metadata, and type safety fixes
### Refactor
- Simplify PDF text extraction with single to_markdown call
### Perf
- Optimize PDF processing with parallel extraction and single-render highlights
## v0.44.1 (2025-11-21)
### Fix
- **deps**: update dependency mcp to >=1.22,<1.23
## v0.44.0 (2025-11-19)
### Feat
+5 -2
View File
@@ -1,12 +1,13 @@
FROM docker.io/library/python:3.12-slim-trixie@sha256:2e683fc3e18a248aa23b8022f2a3474b072b04fb851efe9b49f6b516a8944939
FROM docker.io/library/python:3.12-slim-trixie@sha256:b43ff04d5df04ad5cabb80890b7ef74e8410e3395b19af970dcd52d7a4bff921
COPY --from=ghcr.io/astral-sh/uv:0.9.10@sha256:29bd45092ea8902c0bbb7f0a338f0494a382b1f4b18355df5be270ade679ff1d /uv /uvx /bin/
COPY --from=ghcr.io/astral-sh/uv:0.9.11@sha256:5aa820129de0a600924f166aec9cb51613b15b68f1dcd2a02f31a500d2ede568 /uv /uvx /bin/
# Install dependencies
# 1. git (required for caldav dependency from git)
# 2. sqlite for development with token db
RUN apt update && apt install --no-install-recommends --no-install-suggests -y \
git \
tesseract-ocr \
sqlite3 && apt clean
WORKDIR /app
@@ -17,5 +18,7 @@ RUN uv sync --locked --no-dev --no-editable --no-cache
ENV PYTHONUNBUFFERED=1
ENV VIRTUAL_ENV=/app/.venv
ENV PATH=/app/.vnev/bin:$PATH
ENV TESSDATA_PREFIX=/usr/share/tesseract-ocr/5/tessdata
ENTRYPOINT ["/app/.venv/bin/nextcloud-mcp-server", "--host", "0.0.0.0"]
+44
View File
@@ -0,0 +1,44 @@
# Dockerfile for Smithery stateless deployment
# ADR-016: Stateless mode for multi-user public Nextcloud instances
#
# This image excludes:
# - Vector database dependencies (qdrant-client)
# - Background sync workers
# - Admin UI routes (/app)
# - Semantic search tools
#
# Features included:
# - Core Nextcloud tools (notes, calendar, contacts, files, deck, tables, cookbook)
# - Per-session app password authentication
# - Multi-user support via Smithery session config
FROM docker.io/library/python:3.12-slim-trixie@sha256:b43ff04d5df04ad5cabb80890b7ef74e8410e3395b19af970dcd52d7a4bff921
WORKDIR /app
# Install uv for fast dependency management
COPY --from=ghcr.io/astral-sh/uv:0.9.11@sha256:5aa820129de0a600924f166aec9cb51613b15b68f1dcd2a02f31a500d2ede568 /uv /uvx /bin/
# Install dependencies
# 1. git (required for caldav dependency from git)
# 2. sqlite for development with token db
RUN apt update && apt install --no-install-recommends --no-install-suggests -y \
git
# Copy project files
COPY . .
RUN uv sync --locked --no-dev --no-editable --no-cache
# Set Smithery mode environment variables
ENV SMITHERY_DEPLOYMENT=true
ENV VECTOR_SYNC_ENABLED=false
# Smithery sets PORT=8081 by default
EXPOSE 8081
# Health check endpoint
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD uv run python -c "import httpx; httpx.get('http://localhost:${PORT:-8081}/health/live').raise_for_status()"
CMD ["/app/.venv/bin/smithery-main"]
+18 -3
View File
@@ -1,9 +1,11 @@
```markdown
<p align="center">
<img src="astrolabe.svg" alt="Nextcloud MCP Server" width="128" height="128">
</p>
# Nextcloud MCP Server
[![smithery badge](https://smithery.ai/badge/@cbcoutinho/nextcloud-mcp-server)](https://smithery.ai/server/@cbcoutinho/nextcloud-mcp-server)
[![Docker Image](https://img.shields.io/badge/docker-ghcr.io/cbcoutinho/nextcloud--mcp--server-blue)](https://github.com/cbcoutinho/nextcloud-mcp-server/pkgs/container/nextcloud-mcp-server)
**A production-ready MCP server that connects AI assistants to your Nextcloud instance.**
@@ -17,7 +19,20 @@ This is a **dedicated standalone MCP server** designed for external MCP clients
## Quick Start
Get up and running in 60 seconds using Docker:
The fastest way to get started is via [Smithery](https://smithery.ai/server/@cbcoutinho/nextcloud-mcp-server) - no Docker or self-hosting required:
1. Visit the [Smithery marketplace page](https://smithery.ai/server/@cbcoutinho/nextcloud-mcp-server)
2. Click "Deploy" and configure:
- **Nextcloud URL**: Your Nextcloud instance (e.g., `https://cloud.example.com`)
- **Username**: Your Nextcloud username
- **App Password**: Generate one in Nextcloud → Settings → Security → Devices & sessions
> [!NOTE]
> Smithery runs in stateless mode without semantic search. For full features, use [Docker](#docker-self-hosted) or see [ADR-016](docs/ADR-016-smithery-stateless-deployment.md).
## Docker (Self-Hosted)
For full features including semantic search, run with Docker:
```bash
# 1. Create a minimal configuration
@@ -37,12 +52,11 @@ curl http://127.0.0.1:8000/health/ready
# 4. Connect to the endpoint
http://127.0.0.1:8000/sse
# 4. Or with --transport streamable-http
# Or with --transport streamable-http
http://127.0.0.1:8000/mcp
```
**Next Steps:**
- Create an app password in Nextcloud: Settings → Security → Devices & sessions
- Connect your MCP client (Claude Desktop, IDEs, `mcp dev`, etc.)
- See [docs/installation.md](docs/installation.md) for other deployment options (local, Kubernetes)
@@ -210,3 +224,4 @@ This project is licensed under the AGPL-3.0 License. See [LICENSE](./LICENSE) fo
- [Model Context Protocol](https://github.com/modelcontextprotocol)
- [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk)
- [Nextcloud](https://nextcloud.com/)
```
+3 -3
View File
@@ -4,6 +4,6 @@ dependencies:
version: 1.16.0
- name: ollama
repository: https://otwld.github.io/ollama-helm
version: 1.34.0
digest: sha256:9dfb8d6e3d5488f669d4c37f3a766213b598ff3de2aead2c734789736c7835b4
generated: "2025-11-17T17:08:48.055530019Z"
version: 1.35.0
digest: sha256:da8db198b12ce0252df220fabb297cfe69186edb8e67952c52e05de778189b92
generated: "2025-11-21T11:09:07.997781541Z"
+3 -3
View File
@@ -2,8 +2,8 @@ apiVersion: v2
name: nextcloud-mcp-server
description: A Helm chart for Nextcloud MCP Server - enables AI assistants to interact with Nextcloud
type: application
version: 0.44.0
appVersion: "0.44.0"
version: 0.46.2
appVersion: "0.46.2"
keywords:
- nextcloud
- mcp
@@ -31,6 +31,6 @@ dependencies:
repository: https://qdrant.github.io/qdrant-helm
condition: qdrant.networkMode.deploySubchart
- name: ollama
version: "1.34.0"
version: "1.35.0"
repository: https://otwld.github.io/ollama-helm
condition: ollama.enabled
+22 -2
View File
@@ -17,11 +17,11 @@ services:
# Note: Redis is an external service. You can find more information about the configuration here:
# https://hub.docker.com/_/redis
redis:
image: docker.io/library/redis:alpine@sha256:5013e94192ef18a5d8368179c7522e5300f9265cc339cadac76c7b93303a2752
image: docker.io/library/redis:alpine@sha256:6cbef353e480a8a6e7f10ec545f13d7d3fa85a212cdcc5ffaf5a1c818b9d3798
restart: always
app:
image: docker.io/library/nextcloud:32.0.1@sha256:d572839eeb693026d72a0c6aa48076df0bb8930797ea321e604936ef7189d06e
image: docker.io/library/nextcloud:32.0.2@sha256:ac08482d73ffd85d94069ba291bbd5fb39a70ff21502030a2e3e2d89a7246a48
restart: always
ports:
- 0.0.0.0:8080:80
@@ -224,6 +224,26 @@ services:
- keycloak-tokens:/app/data
- keycloak-oauth-storage:/app/.oauth
# Smithery stateless deployment mode (ADR-016)
# Test with: docker compose --profile smithery up smithery
# Then: curl http://localhost:8081/.well-known/mcp-config
smithery:
build:
context: .
dockerfile: Dockerfile.smithery
restart: always
depends_on:
app:
condition: service_healthy
ports:
- 127.0.0.1:8081:8081
environment:
- SMITHERY_DEPLOYMENT=true
- VECTOR_SYNC_ENABLED=false
- PORT=8081
profiles:
- smithery
qdrant:
image: qdrant/qdrant:v1.16.0@sha256:1005201498cf927d835383d0f918b17d8c9da7db58550f169f694455e42d78f4
restart: always
@@ -0,0 +1,492 @@
# ADR-016: Smithery Stateless Deployment for Multi-User Public Nextcloud Instances
**Status:** Proposed
**Date:** 2025-01-22
**Deciders:** Development Team
**Related:** ADR-004 (OAuth), ADR-007 (Background Vector Sync), ADR-015 (Unified Provider)
## Context
[Smithery](https://smithery.ai) is a hosting platform and marketplace for MCP servers that provides:
- **Discovery**: Marketplace listing for MCP servers
- **Hosting**: Containerized deployment with auto-scaling
- **Authentication UI**: OAuth flow presentation for users
- **Session Configuration**: Per-user settings passed via URL parameters
- **Observability**: Usage logs and monitoring
### Current Architecture Limitations
The current nextcloud-mcp-server architecture assumes a **self-hosted deployment** with:
1. **Persistent Infrastructure**
- Qdrant vector database for semantic search
- Background sync worker for content indexing
- Refresh token storage for offline access
2. **Single-Tenant Configuration**
- Environment variables configure one Nextcloud instance
- `NEXTCLOUD_HOST`, `NEXTCLOUD_USERNAME`, `NEXTCLOUD_PASSWORD`
- Or OAuth with a single IdP
3. **Stateful Operations**
- Vector sync maintains index state across requests
- Token storage persists between sessions
### Smithery Hosting Constraints
Smithery-hosted containers are **stateless by design**:
- No persistent storage between requests
- No background workers or cron jobs
- No databases (Qdrant, Redis, etc.)
- Containers may be recycled at any time
- Configuration passed per-session via URL parameters
### Opportunity
Many users have **publicly accessible Nextcloud instances** and want to:
1. Try the MCP server without self-hosting infrastructure
2. Connect multiple users to different Nextcloud instances
3. Use basic Nextcloud tools without semantic search
4. Benefit from Smithery's discovery and OAuth UI
## Decision
Implement a **stateless deployment mode** for Smithery that:
1. **Disables stateful features** (vector sync, semantic search)
2. **Creates clients per-session** from Smithery configuration
3. **Supports multiple Nextcloud instances** via session config
4. **Provides a useful subset of tools** that work without infrastructure
### Architecture
```
┌─────────────────────────────────────────────────────────────────────────┐
│ Smithery-Hosted Stateless Mode │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ MCP Client Smithery │
│ (Cursor, Claude) Infrastructure │
│ │ │ │
│ │ 1. Connect │ │
│ ├───────────────────────────►│ │
│ │ │ │
│ │ 2. Config UI │ │
│ │◄───────────────────────────┤ User enters: │
│ │ (Smithery presents) │ - nextcloud_url │
│ │ │ - auth_mode (basic/oauth) │
│ │ │ - credentials │
│ │ 3. Tool call │ │
│ ├───────────────────────────►│ │
│ │ + session config │ │
│ │ │ │
│ │ ┌───────┴───────┐ │
│ │ │ MCP Server │ │
│ │ │ Container │ │
│ │ │ │ │
│ │ │ 4. Create │ │
│ │ │ client │ │
│ │ │ from │ │
│ │ │ config │ │
│ │ │ │ │ │
│ │ │ ▼ │ │
│ │ │ 5. Call │ │
│ │ │ Nextcloud │───────► User's Nextcloud │
│ │ │ API │ Instance │
│ │ │ │ │ │
│ │ │ ▼ │ │
│ │ 6. Response │ Return result │ │
│ │◄───────────────────┤ │ │
│ │ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
### Session Configuration Schema
```python
from pydantic import BaseModel, Field
class SmitheryConfigSchema(BaseModel):
"""Configuration schema for Smithery session."""
# Required: Nextcloud instance
nextcloud_url: str = Field(
...,
description="Your Nextcloud instance URL (e.g., https://cloud.example.com)"
)
# Authentication mode
auth_mode: str = Field(
"app_password",
description="Authentication method: 'app_password' or 'oauth'"
)
# App Password authentication (recommended for Smithery)
username: str | None = Field(
None,
description="Nextcloud username (required for app_password auth)"
)
app_password: str | None = Field(
None,
description="Nextcloud app password (Settings → Security → App passwords)"
)
# OAuth authentication (advanced)
# When auth_mode='oauth', Smithery handles the OAuth flow
# and passes the access token automatically
```
### Feature Matrix
| Feature | Self-Hosted | Smithery Stateless |
|---------|-------------|-------------------|
| **Notes** | | |
| List/Search notes | ✓ | ✓ |
| Get/Create/Update notes | ✓ | ✓ |
| Semantic search | ✓ | ✗ |
| **Calendar** | | |
| List calendars | ✓ | ✓ |
| Get/Create events | ✓ | ✓ |
| **Contacts** | | |
| List address books | ✓ | ✓ |
| Search/Get contacts | ✓ | ✓ |
| **Files (WebDAV)** | | |
| List/Download files | ✓ | ✓ |
| Upload files | ✓ | ✓ |
| Search files | ✓ | ✓ (keyword only) |
| **Deck** | | |
| List boards/cards | ✓ | ✓ |
| Create/Update cards | ✓ | ✓ |
| **Tables** | | |
| List/Query tables | ✓ | ✓ |
| Create/Update rows | ✓ | ✓ |
| **Cookbook** | | |
| List/Get recipes | ✓ | ✓ |
| **Semantic Search** | | |
| Vector search | ✓ | ✗ |
| RAG answers | ✓ | ✗ |
| **Background Sync** | | |
| Auto-indexing | ✓ | ✗ |
| Webhook sync | ✓ | ✗ |
| **Admin UI (`/app`)** | | |
| Vector sync status | ✓ | ✗ |
| Vector visualization | ✓ | ✗ |
| Webhook management | ✓ | ✗ |
| Session management | ✓ | ✗ |
### Implementation
#### 1. Deployment Mode Detection
```python
# nextcloud_mcp_server/config.py
class DeploymentMode(Enum):
SELF_HOSTED = "self_hosted" # Full features, env-based config
SMITHERY_STATELESS = "smithery" # Stateless, session-based config
def get_deployment_mode() -> DeploymentMode:
"""Detect deployment mode from environment."""
if os.getenv("SMITHERY_DEPLOYMENT") == "true":
return DeploymentMode.SMITHERY_STATELESS
return DeploymentMode.SELF_HOSTED
```
#### 2. Session-Based Client Factory
```python
# nextcloud_mcp_server/context.py
async def get_client(ctx: Context) -> NextcloudClient:
"""Get NextcloudClient - from session config or environment."""
mode = get_deployment_mode()
if mode == DeploymentMode.SMITHERY_STATELESS:
# Create client from Smithery session config
config = ctx.session_config
if not config:
raise McpError("Session configuration required")
return NextcloudClient(
base_url=config.nextcloud_url,
username=config.username,
password=config.app_password,
)
else:
# Existing behavior: from environment or OAuth context
return await _get_client_from_context(ctx)
```
#### 3. Conditional Tool Registration
```python
# nextcloud_mcp_server/app.py
def create_mcp_server(mode: DeploymentMode) -> FastMCP:
"""Create MCP server with mode-appropriate tools."""
mcp = FastMCP("Nextcloud MCP")
# Always register core tools
configure_notes_tools(mcp)
configure_calendar_tools(mcp)
configure_contacts_tools(mcp)
configure_webdav_tools(mcp)
configure_deck_tools(mcp)
configure_tables_tools(mcp)
configure_cookbook_tools(mcp)
# Only register stateful tools in self-hosted mode
if mode == DeploymentMode.SELF_HOSTED:
configure_semantic_tools(mcp) # Requires Qdrant
register_oauth_tools(mcp) # Requires token storage
return mcp
```
#### 4. Exclude Admin UI Routes
The `/app` admin UI should **not be installed** in Smithery mode because:
- **Vector sync status** - No vector sync in stateless mode
- **Vector visualization** - No Qdrant to visualize
- **Webhook management** - No webhook sync without background workers
- **Session management** - No persistent sessions to manage
```python
# nextcloud_mcp_server/app.py
def create_app(mode: DeploymentMode) -> Starlette:
"""Create Starlette app with mode-appropriate routes."""
routes = [
Route("/health/live", health_live, methods=["GET"]),
Route("/health/ready", health_ready, methods=["GET"]),
]
# Only mount admin UI in self-hosted mode
if mode == DeploymentMode.SELF_HOSTED:
browser_app = create_browser_app()
routes.append(
Route("/app", lambda r: RedirectResponse("/app/", status_code=307))
)
routes.append(Mount("/app", app=browser_app))
logger.info("Admin UI mounted at /app")
else:
logger.info("Admin UI disabled in Smithery stateless mode")
# Mount FastMCP at root
mcp_app = create_mcp_server(mode).streamable_http_app()
routes.append(Mount("/", app=mcp_app))
return Starlette(routes=routes, lifespan=starlette_lifespan)
```
**Endpoints by Mode:**
| Endpoint | Self-Hosted | Smithery |
|----------|-------------|----------|
| `/mcp` | ✓ | ✓ |
| `/health/live` | ✓ | ✓ |
| `/health/ready` | ✓ | ✓ |
| `/.well-known/mcp-config` | ✓ | ✓ |
| `/app` | ✓ | ✗ |
| `/app/vector-sync/status` | ✓ | ✗ |
| `/app/vector-viz` | ✓ | ✗ |
| `/app/webhooks` | ✓ | ✗ |
#### 5. Smithery Integration Files
**smithery.yaml:**
```yaml
runtime: "container"
build:
dockerfile: "Dockerfile.smithery"
dockerBuildPath: "."
startCommand:
type: "http"
configSchema:
type: "object"
required: ["nextcloud_url", "username", "app_password"]
properties:
nextcloud_url:
type: "string"
title: "Nextcloud URL"
description: "Your Nextcloud instance URL (e.g., https://cloud.example.com)"
username:
type: "string"
title: "Username"
description: "Your Nextcloud username"
app_password:
type: "string"
title: "App Password"
description: "Generate at Settings → Security → App passwords"
exampleConfig:
nextcloud_url: "https://cloud.example.com"
username: "alice"
app_password: "xxxxx-xxxxx-xxxxx-xxxxx-xxxxx"
```
**Dockerfile.smithery:**
```dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv
# Copy project files
COPY pyproject.toml uv.lock ./
COPY nextcloud_mcp_server ./nextcloud_mcp_server
# Install dependencies (without vector/semantic extras)
RUN uv sync --frozen --no-dev
# Set Smithery mode
ENV SMITHERY_DEPLOYMENT=true
ENV VECTOR_SYNC_ENABLED=false
# Smithery sets PORT=8081
EXPOSE 8081
CMD ["uv", "run", "python", "-m", "nextcloud_mcp_server.smithery_main"]
```
**nextcloud_mcp_server/smithery_main.py:**
```python
"""Smithery-specific entrypoint for stateless deployment."""
import os
import uvicorn
from starlette.middleware.cors import CORSMiddleware
from nextcloud_mcp_server.app import create_mcp_server
from nextcloud_mcp_server.config import DeploymentMode
def main():
# Force stateless mode
os.environ["SMITHERY_DEPLOYMENT"] = "true"
os.environ["VECTOR_SYNC_ENABLED"] = "false"
mcp = create_mcp_server(DeploymentMode.SMITHERY_STATELESS)
app = mcp.streamable_http_app()
# Add CORS for browser-based clients
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["GET", "POST", "OPTIONS"],
allow_headers=["*"],
expose_headers=["mcp-session-id", "mcp-protocol-version"],
)
# Smithery sets PORT environment variable
port = int(os.environ.get("PORT", 8081))
uvicorn.run(app, host="0.0.0.0", port=port)
if __name__ == "__main__":
main()
```
### Security Considerations
1. **App Passwords over User Passwords**
- Smithery config encourages app passwords (revocable, scoped)
- Documentation guides users to create dedicated app passwords
- App passwords can be revoked without changing main password
2. **HTTPS Required**
- `nextcloud_url` must be HTTPS for production use
- Validation rejects HTTP URLs in Smithery mode
3. **No Credential Storage**
- Credentials exist only for request duration
- No server-side persistence of user credentials
- Smithery handles secure config transmission
4. **Scope Limitation**
- Stateless mode cannot access offline_access
- No background operations on user's behalf
- Clear user expectation: tools work during session only
### Migration Path
Users can start with Smithery stateless mode and migrate to self-hosted:
1. **Try on Smithery** → Basic tools, no setup
2. **Self-host for semantic search** → Add Qdrant, enable vector sync
3. **Full deployment** → Background sync, webhooks, multi-user OAuth
## Consequences
### Positive
1. **Lower barrier to entry** - Users can try without infrastructure
2. **Multi-user support** - Each session connects to different Nextcloud
3. **Smithery ecosystem** - Discovery, observability, OAuth UI
4. **Clear feature tiers** - Stateless (simple) vs self-hosted (full)
### Negative
1. **No semantic search** - Key differentiator unavailable on Smithery
2. **Per-request auth** - Credentials sent with each request
3. **No offline access** - Cannot perform background operations
4. **Maintenance burden** - Two deployment modes to support
### Neutral
1. **Feature subset** - May encourage users to self-host for full features
2. **Documentation needs** - Clear guidance on mode differences required
## Alternatives Considered
### 1. External MCP Only
**Approach:** Only support self-hosted external MCP registration on Smithery.
**Rejected because:**
- Higher barrier to entry for new users
- Misses opportunity for Smithery marketplace visibility
- Users want to try before committing to infrastructure
### 2. Embedded Vector DB (SQLite-vec)
**Approach:** Use SQLite with vector extensions for per-request indexing.
**Rejected because:**
- No persistence between requests anyway
- Indexing latency too high for synchronous requests
- Complexity without benefit in stateless context
### 3. External Vector DB Service
**Approach:** Connect to Pinecone/Weaviate Cloud from Smithery container.
**Rejected because:**
- Adds external dependency and cost
- Per-user collections require complex multi-tenancy
- Sync still impossible without background workers
### 4. Hybrid: Smithery + User's Qdrant
**Approach:** User provides their own Qdrant URL in session config.
**Considered for future:**
- Could enable semantic search for advanced users
- Adds complexity to session config
- Sync still requires external trigger (manual or webhook)
## References
- [Smithery Documentation](https://smithery.ai/docs)
- [Smithery Session Configuration](https://smithery.ai/docs/build/session-config)
- [Smithery External MCPs](https://smithery.ai/docs/build/external)
- [MCP Streamable HTTP Transport](https://modelcontextprotocol.io/docs/concepts/transports)
- [Nextcloud App Passwords](https://docs.nextcloud.com/server/latest/user_manual/en/session_management.html#app-passwords)
+300 -78
View File
@@ -3,6 +3,7 @@ import os
import time
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack, asynccontextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional
@@ -25,6 +26,8 @@ from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse, RedirectResponse
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles
from starlette.types import ASGIApp, Receive, Send
from starlette.types import Scope as StarletteScope
from nextcloud_mcp_server.auth import (
InsufficientScopeError,
@@ -36,6 +39,8 @@ from nextcloud_mcp_server.auth import (
from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import (
DeploymentMode,
get_deployment_mode,
get_document_processor_config,
get_settings,
)
@@ -122,6 +127,26 @@ def initialize_document_processors():
except Exception as e:
logger.warning(f"Failed to register Tesseract processor: {e}")
# Register PyMuPDF processor (high priority, local, no API required)
if "pymupdf" in config["processors"]:
pymupdf_config = config["processors"]["pymupdf"]
try:
from nextcloud_mcp_server.document_processors.pymupdf import (
PyMuPDFProcessor,
)
processor = PyMuPDFProcessor(
extract_images=pymupdf_config.get("extract_images", True),
image_dir=pymupdf_config.get("image_dir"),
)
registry.register(processor, priority=15) # Higher than unstructured
logger.info(
f"Registered PyMuPDF processor: extract_images={pymupdf_config.get('extract_images', True)}"
)
registered_count += 1
except Exception as e:
logger.warning(f"Failed to register PyMuPDF processor: {e}")
# Register custom processor
if "custom" in config["processors"]:
custom_config = config["processors"]["custom"]
@@ -244,17 +269,160 @@ class OAuthAppContext:
)
@dataclass
class SmitheryAppContext:
"""Application context for Smithery stateless mode.
ADR-016: No shared client - clients created per-request from session config.
"""
pass # No shared state needed - everything comes from session config
# ADR-016: Smithery config schema for container runtime
# This schema is served at /.well-known/mcp-config for Smithery discovery
# See: https://smithery.ai/docs/build/session-config
SMITHERY_CONFIG_SCHEMA = {
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://server.smithery.ai/nextcloud-mcp-server/.well-known/mcp-config",
"title": "Nextcloud MCP Server Configuration",
"description": "Configuration for connecting to your Nextcloud instance via app password authentication",
"x-query-style": "flat", # Our schema has no nested objects, so flat style works
"type": "object",
"required": ["nextcloud_url", "username", "app_password"],
"properties": {
"nextcloud_url": {
"type": "string",
"title": "Nextcloud URL",
"description": "Your Nextcloud instance URL (e.g., https://cloud.example.com). Must be publicly accessible.",
"pattern": "^https?://.+",
},
"username": {
"type": "string",
"title": "Username",
"description": "Your Nextcloud username",
"minLength": 1,
},
"app_password": {
"type": "string",
"title": "App Password",
"description": "Nextcloud app password. Generate at Settings > Security > App passwords. Do NOT use your main password.",
"minLength": 1,
},
},
"additionalProperties": False,
}
# ADR-016: Context variable to hold Smithery session config per-request
# This is set by SmitheryConfigMiddleware and accessed in context.py
_smithery_session_config: ContextVar[dict[str, str] | None] = ContextVar(
"smithery_session_config"
)
_smithery_session_config.set(None) # Set initial value
def get_smithery_session_config() -> dict | None:
"""Get the current Smithery session config from context variable.
Used by context.py to access config extracted from URL query parameters.
"""
return _smithery_session_config.get()
class SmitheryConfigMiddleware:
"""Middleware to extract Smithery config from URL query parameters.
ADR-016: For container runtime, Smithery passes configuration as URL query
parameters to the /mcp endpoint. This middleware extracts those parameters
and stores them in a context variable for access in tools.
Configuration parameters:
- nextcloud_url: Nextcloud instance URL
- username: Nextcloud username
- app_password: Nextcloud app password
The extracted config is stored in a ContextVar and can be accessed via
get_smithery_session_config() in context.py.
"""
def __init__(self, app: ASGIApp):
self.app = app
async def __call__(
self, scope: StarletteScope, receive: Receive, send: Send
) -> None:
if scope["type"] == "http":
# Extract config from query parameters
from urllib.parse import parse_qs
query_string = scope.get("query_string", b"").decode("utf-8")
params = parse_qs(query_string)
# Build session config from query parameters
# Smithery uses dot notation for nested objects, but our schema is flat
session_config = {}
for key in ["nextcloud_url", "username", "app_password"]:
if key in params:
# parse_qs returns lists, take first value
session_config[key] = params[key][0]
# Store in context variable for access by context.py
if session_config:
_smithery_session_config.set(session_config)
logger.debug(
f"Smithery config extracted: nextcloud_url={session_config.get('nextcloud_url')}, "
f"username={session_config.get('username')}"
)
try:
await self.app(scope, receive, send)
finally:
# Clear context variable after request
_smithery_session_config.set(None)
@asynccontextmanager
async def app_lifespan_smithery(server: FastMCP) -> AsyncIterator[SmitheryAppContext]:
"""
Manage application lifecycle for Smithery stateless mode.
ADR-016: Minimal lifespan with no shared state.
- No shared Nextcloud client (created per-request from session config)
- No vector sync (disabled in Smithery mode)
- No persistent storage (stateless deployment)
- No document processors (not enabled in Smithery mode)
"""
logger.info("Starting MCP server in Smithery stateless mode")
logger.info("Clients will be created per-request from session config")
try:
yield SmitheryAppContext()
finally:
logger.info("Shutting down Smithery stateless mode")
def is_oauth_mode() -> bool:
"""
Determine if OAuth mode should be used.
OAuth mode is enabled when:
- NEXTCLOUD_USERNAME and NEXTCLOUD_PASSWORD are NOT set
- AND we are NOT in Smithery stateless mode
- Or explicitly enabled via configuration
Returns:
True if OAuth mode, False if BasicAuth mode
"""
# ADR-016: Smithery stateless mode uses per-request BasicAuth from session config
# It's not OAuth mode even though env credentials aren't set
deployment_mode = get_deployment_mode()
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
logger.info(
"BasicAuth mode (Smithery stateless - credentials from session config)"
)
return False
username = os.getenv("NEXTCLOUD_USERNAME")
password = os.getenv("NEXTCLOUD_PASSWORD")
@@ -838,8 +1006,9 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
"OpenTelemetry tracing disabled (set OTEL_EXPORTER_OTLP_ENDPOINT to enable)"
)
# Determine authentication mode
# Determine authentication mode and deployment mode
oauth_enabled = is_oauth_mode()
deployment_mode = get_deployment_mode()
if oauth_enabled:
logger.info("Configuring MCP server for OAuth mode")
@@ -900,8 +1069,17 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
auth=auth_settings,
)
else:
logger.info("Configuring MCP server for BasicAuth mode")
mcp = FastMCP("Nextcloud MCP", lifespan=app_lifespan_basic)
# ADR-016: Use Smithery lifespan for stateless mode, BasicAuth otherwise
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
logger.info("Configuring MCP server for Smithery stateless mode")
# json_response=True returns plain JSON-RPC instead of SSE format,
# required for Smithery scanner compatibility
mcp = FastMCP(
"Nextcloud MCP", lifespan=app_lifespan_smithery, json_response=True
)
else:
logger.info("Configuring MCP server for BasicAuth mode")
mcp = FastMCP("Nextcloud MCP", lifespan=app_lifespan_basic)
@mcp.resource("nc://capabilities")
async def nc_get_capabilities():
@@ -937,8 +1115,12 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
)
# Register semantic search tools (cross-app feature)
# ADR-016: Skip in Smithery stateless mode (no vector database)
settings = get_settings()
if settings.vector_sync_enabled:
deployment_mode = get_deployment_mode()
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
logger.info("Skipping semantic search tools (Smithery stateless mode)")
elif settings.vector_sync_enabled:
logger.info("Configuring semantic search tools (vector sync enabled)")
configure_semantic_tools(mcp)
else:
@@ -1341,6 +1523,26 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
)
logger.info("Test webhook endpoint enabled: /webhooks/nextcloud")
# ADR-016: Add Smithery well-known config endpoint for container runtime discovery
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
def smithery_mcp_config(request):
"""Smithery MCP configuration endpoint.
Returns JSON Schema for Smithery's configuration UI.
This endpoint is required for Smithery container runtime discovery.
"""
return JSONResponse(SMITHERY_CONFIG_SCHEMA)
routes.append(
Route(
"/.well-known/mcp-config",
smithery_mcp_config,
methods=["GET"],
)
)
logger.info("Smithery config endpoint enabled: /.well-known/mcp-config")
# Note: Metrics endpoint is NOT exposed on main HTTP port for security reasons.
# Metrics are served on dedicated port via setup_metrics() (default: 9090)
@@ -1471,85 +1673,98 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
)
# Add user info routes (available in both BasicAuth and OAuth modes)
# These require session authentication, so we wrap them in a separate app
from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend
from nextcloud_mcp_server.auth.userinfo_routes import (
revoke_session,
user_info_html,
vector_sync_status_fragment,
)
from nextcloud_mcp_server.auth.viz_routes import (
chunk_context_endpoint,
vector_visualization_html,
vector_visualization_search,
)
from nextcloud_mcp_server.auth.webhook_routes import (
disable_webhook_preset,
enable_webhook_preset,
webhook_management_pane,
)
# Create a separate Starlette app for browser routes that need session auth
# This prevents SessionAuthBackend from interfering with FastMCP's OAuth
browser_routes = [
Route("/", user_info_html, methods=["GET"]), # /app → user info with all tabs
Route(
"/revoke", revoke_session, methods=["POST"], name="revoke_session_endpoint"
), # /app/revoke → revoke_session
# Vector sync status fragment (htmx polling)
Route(
"/vector-sync/status",
# ADR-016: Skip /app admin UI in Smithery stateless mode (no vector sync, webhooks)
if deployment_mode != DeploymentMode.SMITHERY_STATELESS:
# These require session authentication, so we wrap them in a separate app
from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend
from nextcloud_mcp_server.auth.userinfo_routes import (
revoke_session,
user_info_html,
vector_sync_status_fragment,
methods=["GET"],
), # /app/vector-sync/status
# Vector visualization routes
Route(
"/vector-viz", vector_visualization_html, methods=["GET"]
), # /app/vector-viz
Route(
"/vector-viz/search",
vector_visualization_search,
methods=["GET"],
), # /app/vector-viz/search
Route(
"/chunk-context",
chunk_context_endpoint,
methods=["GET"],
), # /app/chunk-context
# Webhook management routes (admin-only)
Route("/webhooks", webhook_management_pane, methods=["GET"]), # /app/webhooks
Route(
"/webhooks/enable/{preset_id:str}", enable_webhook_preset, methods=["POST"]
),
Route(
"/webhooks/disable/{preset_id:str}",
disable_webhook_preset,
methods=["DELETE"],
),
]
# Add static files mount if directory exists
static_dir = os.path.join(os.path.dirname(__file__), "auth", "static")
if os.path.isdir(static_dir):
browser_routes.append(
Mount("/static", StaticFiles(directory=static_dir), name="static")
)
logger.info(f"Mounted static files from {static_dir}")
from nextcloud_mcp_server.auth.viz_routes import (
chunk_context_endpoint,
vector_visualization_html,
vector_visualization_search,
)
from nextcloud_mcp_server.auth.webhook_routes import (
disable_webhook_preset,
enable_webhook_preset,
webhook_management_pane,
)
browser_app = Starlette(routes=browser_routes)
browser_app.add_middleware(
AuthenticationMiddleware, # type: ignore[invalid-argument-type]
backend=SessionAuthBackend(oauth_enabled=oauth_enabled),
)
# Create a separate Starlette app for browser routes that need session auth
# This prevents SessionAuthBackend from interfering with FastMCP's OAuth
browser_routes = [
Route(
"/", user_info_html, methods=["GET"]
), # /app → user info with all tabs
Route(
"/revoke",
revoke_session,
methods=["POST"],
name="revoke_session_endpoint",
), # /app/revoke → revoke_session
# Vector sync status fragment (htmx polling)
Route(
"/vector-sync/status",
vector_sync_status_fragment,
methods=["GET"],
), # /app/vector-sync/status
# Vector visualization routes
Route(
"/vector-viz", vector_visualization_html, methods=["GET"]
), # /app/vector-viz
Route(
"/vector-viz/search",
vector_visualization_search,
methods=["GET"],
), # /app/vector-viz/search
Route(
"/chunk-context",
chunk_context_endpoint,
methods=["GET"],
), # /app/chunk-context
# Webhook management routes (admin-only)
Route(
"/webhooks", webhook_management_pane, methods=["GET"]
), # /app/webhooks
Route(
"/webhooks/enable/{preset_id:str}",
enable_webhook_preset,
methods=["POST"],
),
Route(
"/webhooks/disable/{preset_id:str}",
disable_webhook_preset,
methods=["DELETE"],
),
]
# Add redirect from /app to /app/ (Starlette requires trailing slash for mounted apps)
routes.append(
Route("/app", lambda request: RedirectResponse("/app/", status_code=307))
)
# Add static files mount if directory exists
static_dir = os.path.join(os.path.dirname(__file__), "auth", "static")
if os.path.isdir(static_dir):
browser_routes.append(
Mount("/static", StaticFiles(directory=static_dir), name="static")
)
logger.info(f"Mounted static files from {static_dir}")
# Mount browser app at /app (webapp and admin routes)
routes.append(Mount("/app", app=browser_app))
logger.info("App routes with session auth: /app, /app/webhooks, /app/revoke")
browser_app = Starlette(routes=browser_routes)
browser_app.add_middleware(
AuthenticationMiddleware, # type: ignore[invalid-argument-type]
backend=SessionAuthBackend(oauth_enabled=oauth_enabled),
)
# Add redirect from /app to /app/ (Starlette requires trailing slash for mounted apps)
routes.append(
Route("/app", lambda request: RedirectResponse("/app/", status_code=307))
)
# Mount browser app at /app (webapp and admin routes)
routes.append(Mount("/app", app=browser_app))
logger.info("App routes with session auth: /app, /app/webhooks, /app/revoke")
else:
logger.info("Admin UI (/app) disabled in Smithery stateless mode")
# Mount FastMCP at root last (catch-all, handles OAuth via token_verifier)
routes.append(Mount("/", app=mcp_app))
@@ -1669,4 +1884,11 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
logger.info("WWW-Authenticate scope challenge handler enabled")
# ADR-016: Apply SmitheryConfigMiddleware in Smithery stateless mode
# This must be the outermost middleware to extract config from URL query parameters
# before any other middleware processes the request
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
app = SmitheryConfigMiddleware(app)
logger.info("SmitheryConfigMiddleware enabled for query parameter config")
return app
@@ -190,3 +190,30 @@
color: var(--color-text-maxcontrast);
font-style: italic;
}
/* PDF highlighted image styles */
.chunk-image-container {
margin-bottom: 16px;
border: 1px solid var(--color-border);
border-radius: var(--border-radius);
overflow: hidden;
background: #fff;
}
.chunk-image-header {
background: var(--color-background-dark);
padding: 8px 12px;
font-size: 12px;
font-weight: 500;
color: var(--color-text-maxcontrast);
border-bottom: 1px solid var(--color-border);
font-family: var(--font-face);
}
.chunk-highlighted-image {
display: block;
max-width: 100%;
height: auto;
cursor: zoom-in;
}
.chunk-highlighted-image:hover {
opacity: 0.95;
}
@@ -217,7 +217,7 @@ function vizApp() {
},
async toggleChunk(result) {
const resultKey = `${result.doc_type}_${result.id}`;
const resultKey = `${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`;
if (this.isChunkExpanded(resultKey)) {
delete this.expandedChunks[resultKey];
@@ -117,12 +117,13 @@
<template x-if="!loading && results.length > 0">
<div x-transition.opacity.duration.200ms>
<template x-for="result in results" :key="result.id">
<template x-for="result in results" :key="`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`">
<div style="padding: 12px; border-bottom: 1px solid #eee;">
<a :href="getNextcloudUrl(result)" target="_blank" style="font-weight: 500; color: #0066cc; text-decoration: none;">
<span x-text="result.title"></span>
</a>
<div style="font-size: 14px; color: #666; margin-top: 4px;" x-text="result.excerpt"></div>
<div style="font-size: 14px; color: #666; margin-top: 4px;"
x-text="result.excerpt.length > 200 ? result.excerpt.substring(0, 200) + '...' : result.excerpt"></div>
<div style="font-size: 12px; color: #999; margin-top: 4px;">
Raw Score: <span x-text="result.original_score.toFixed(3)"></span>
(<span x-text="(result.score * 100).toFixed(0)"></span>% relative) |
@@ -134,22 +135,36 @@
<button
class="chunk-toggle-btn"
@click="toggleChunk(result)"
x-text="isChunkExpanded(`${result.doc_type}_${result.id}`) ? 'Hide Chunk' : 'Show Chunk'"
x-text="isChunkExpanded(`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`) ? 'Hide Chunk' : 'Show Chunk'"
></button>
</template>
<!-- Chunk context (expanded inline) -->
<template x-if="isChunkExpanded(`${result.doc_type}_${result.id}`)">
<template x-if="isChunkExpanded(`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`)">
<div class="chunk-context" x-transition.opacity.duration.200ms>
<template x-if="chunkLoading[`${result.doc_type}_${result.id}`]">
<template x-if="chunkLoading[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]">
<div style="color: #666; font-style: italic;">Loading chunk...</div>
</template>
<template x-if="!chunkLoading[`${result.doc_type}_${result.id}`]">
<template x-if="!chunkLoading[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]">
<div>
<template x-if="expandedChunks[`${result.doc_type}_${result.id}`]?.has_more_before">
<!-- Highlighted page image for PDFs -->
<template x-if="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.highlighted_page_image">
<div class="chunk-image-container">
<div class="chunk-image-header">
<span>Page <span x-text="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.page_number"></span></span>
</div>
<img
:src="'data:image/png;base64,' + expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.highlighted_page_image"
:alt="'Page ' + expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.page_number"
class="chunk-highlighted-image"
/>
</div>
</template>
<!-- Text context -->
<template x-if="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.has_more_before">
<span class="chunk-ellipsis">...</span>
</template>
<span class="chunk-text" x-text="expandedChunks[`${result.doc_type}_${result.id}`]?.before_context"></span><span class="chunk-matched" x-text="expandedChunks[`${result.doc_type}_${result.id}`]?.chunk_text"></span><span class="chunk-text" x-text="expandedChunks[`${result.doc_type}_${result.id}`]?.after_context"></span><template x-if="expandedChunks[`${result.doc_type}_${result.id}`]?.has_more_after">
<span class="chunk-text" x-text="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.before_context"></span><span class="chunk-matched" x-text="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.chunk_text"></span><span class="chunk-text" x-text="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.after_context"></span><template x-if="expandedChunks[`${result.doc_type}_${result.id}_${result.chunk_start_offset || 0}`]?.has_more_after">
<span class="chunk-ellipsis">...</span>
</template>
</div>
+28 -17
View File
@@ -18,6 +18,8 @@ from starlette.authentication import requires
from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
# Setup Jinja2 environment for templates
@@ -25,14 +27,20 @@ _template_dir = Path(__file__).parent / "templates"
_jinja_env = Environment(loader=FileSystemLoader(_template_dir))
async def _get_authenticated_client_for_userinfo(request: Request) -> httpx.AsyncClient:
"""Get an authenticated HTTP client for user info page operations.
async def _get_authenticated_client_for_userinfo(request: Request) -> NextcloudClient:
"""Get an authenticated Nextcloud client for user info page operations.
This is a shared helper for authenticated routes that need to access
Nextcloud APIs. It handles both BasicAuth and OAuth authentication modes.
Args:
request: Starlette request object
Returns:
Authenticated httpx.AsyncClient
Authenticated NextcloudClient
Raises:
RuntimeError: If credentials/session not configured
"""
oauth_ctx = getattr(request.app.state, "oauth_context", None)
@@ -45,11 +53,15 @@ async def _get_authenticated_client_for_userinfo(request: Request) -> httpx.Asyn
if not all([nextcloud_host, username, password]):
raise RuntimeError("BasicAuth credentials not configured")
assert nextcloud_host is not None # Type narrowing for type checker
return httpx.AsyncClient(
from httpx import BasicAuth
assert nextcloud_host is not None
assert username is not None
assert password is not None
return NextcloudClient(
base_url=nextcloud_host,
auth=(username, password),
timeout=30.0,
username=username,
auth=BasicAuth(username, password),
)
# OAuth mode - get token from session
@@ -64,15 +76,14 @@ async def _get_authenticated_client_for_userinfo(request: Request) -> httpx.Asyn
raise RuntimeError("No access token found in session")
access_token = token_data["access_token"]
username = token_data.get("username")
nextcloud_host = oauth_ctx.get("config", {}).get("nextcloud_host", "")
if not nextcloud_host:
raise RuntimeError("Nextcloud host not configured")
if not nextcloud_host or not username:
raise RuntimeError("Nextcloud host or username not configured")
return httpx.AsyncClient(
base_url=nextcloud_host,
headers={"Authorization": f"Bearer {access_token}"},
timeout=30.0,
return NextcloudClient.from_token(
base_url=nextcloud_host, token=access_token, username=username
)
@@ -423,10 +434,10 @@ async def user_info_html(request: Request) -> HTMLResponse:
try:
from nextcloud_mcp_server.auth.permissions import is_nextcloud_admin
# Get authenticated HTTP client
http_client = await _get_authenticated_client_for_userinfo(request)
is_admin = await is_nextcloud_admin(request, http_client)
await http_client.aclose()
# Get authenticated Nextcloud client
nc_client = await _get_authenticated_client_for_userinfo(request)
is_admin = await is_nextcloud_admin(request, nc_client._client)
await nc_client.close()
except Exception as e:
logger.warning(f"Failed to check admin status: {e}")
# Default to not admin if check fails
+178 -149
View File
@@ -27,6 +27,7 @@ from nextcloud_mcp_server.search import (
SemanticSearchAlgorithm,
)
from nextcloud_mcp_server.vector.pca import PCA
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -138,7 +139,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
_get_authenticated_client_for_userinfo,
)
async with await _get_authenticated_client_for_userinfo(request) as http_client: # noqa: F841
async with await _get_authenticated_client_for_userinfo(request) as nc_client: # noqa: F841
# Create search algorithm (no client needed - verification removed)
if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
@@ -212,75 +213,51 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
"success": True,
"results": [],
"coordinates_3d": [],
"query_coords": None,
"query_coords": [],
"message": "No results found",
}
)
# Fetch vectors for matching results from Qdrant
# Fetch vectors for specific matching chunks from Qdrant using batch retrieve
vector_fetch_start = time.perf_counter()
qdrant_client = await get_qdrant_client()
doc_ids = [r.id for r in search_results]
# Retrieve vectors for the matching documents
from qdrant_client.models import FieldCondition, Filter, MatchAny
chunk_vectors_map = {} # Map (doc_id, chunk_start, chunk_end) -> vector
points_response = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(
key="doc_id",
match=MatchAny(any=[str(doc_id) for doc_id in doc_ids]),
),
FieldCondition(
key="user_id",
match={"value": username},
),
]
),
limit=len(doc_ids) * 2, # Account for multiple chunks per doc
with_vectors=["dense"], # Only fetch dense vectors for visualization
with_payload=["doc_id"], # Need doc_id to map vectors to results
)
# Collect point IDs from search results for batch retrieval
# point_id is the Qdrant internal ID returned by search algorithms
point_ids = [r.point_id for r in search_results if r.point_id]
points = points_response[0]
if not points:
return JSONResponse(
{
"success": True,
"results": [],
"coordinates_2d": [],
"message": "No vectors found for results",
}
if point_ids:
# Single batch retrieve call instead of N sequential scroll calls
# This is ~50x faster for 50 results (1 HTTP request vs 50)
points_response = await qdrant_client.retrieve(
collection_name=settings.get_collection_name(),
ids=point_ids,
with_vectors=["dense"],
with_payload=["doc_id", "chunk_start_offset", "chunk_end_offset"],
)
# Extract dense vectors and group by document
def extract_dense_vector(point):
if point.vector is None:
return None
# If named vectors (dict), extract "dense"
if isinstance(point.vector, dict):
return point.vector.get("dense")
# If unnamed vector (array), use directly
return point.vector
# Build chunk_vectors_map from batch response
for point in points_response:
if point.vector is not None:
# Extract dense vector (handle both named and unnamed vectors)
if isinstance(point.vector, dict):
vector = point.vector.get("dense")
else:
vector = point.vector
# Group chunk vectors by doc_id
from collections import defaultdict
doc_chunks = defaultdict(list)
for point in points:
if point.payload:
doc_id = int(point.payload.get("doc_id", 0))
vector = extract_dense_vector(point)
if vector is not None:
doc_chunks[doc_id].append(vector)
if vector is not None and point.payload:
doc_id = point.payload.get("doc_id")
chunk_start = point.payload.get("chunk_start_offset")
chunk_end = point.payload.get("chunk_end_offset")
chunk_key = (doc_id, chunk_start, chunk_end)
chunk_vectors_map[chunk_key] = vector
vector_fetch_duration = time.perf_counter() - vector_fetch_start
if len(doc_chunks) < 2:
# Not enough documents for PCA
if len(chunk_vectors_map) < 2:
# Not enough chunks for PCA
return JSONResponse(
{
"success": True,
@@ -296,15 +273,15 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
],
"coordinates_3d": [[0, 0, 0]] * len(search_results),
"query_coords": [0, 0, 0],
"message": "Not enough documents for PCA",
"message": "Not enough chunks for PCA",
}
)
# Detect embedding dimension from first available vector
embedding_dim = None
for chunks in doc_chunks.values():
if chunks:
embedding_dim = len(chunks[0])
for vector in chunk_vectors_map.values():
if vector is not None:
embedding_dim = len(vector)
break
if embedding_dim is None:
@@ -318,37 +295,42 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
logger.info(f"Detected embedding dimension: {embedding_dim}")
# Average chunk vectors per document to create document-level embeddings
# Maintain order of search_results for coordinate mapping
doc_vectors = []
# Build chunk vectors array in search_results order (1:1 mapping)
chunk_vectors = []
for result in search_results:
if result.id in doc_chunks:
# Average all chunk embeddings for this document
chunk_vectors = np.array(doc_chunks[result.id])
avg_vector = np.mean(chunk_vectors, axis=0)
doc_vectors.append(avg_vector)
logger.debug(f"Doc {result.id}: averaged {len(chunk_vectors)} chunks")
chunk_key = (result.id, result.chunk_start_offset, result.chunk_end_offset)
if chunk_key in chunk_vectors_map:
chunk_vectors.append(chunk_vectors_map[chunk_key])
else:
# Document not found in vectors (shouldn't happen)
logger.warning(f"Doc {result.id} not found in fetched vectors")
# Use zero vector as fallback with detected dimension
doc_vectors.append(np.zeros(embedding_dim))
# Chunk not found in vectors (shouldn't happen)
logger.warning(
f"Chunk {chunk_key} not found in fetched vectors, using zero vector"
)
# Use zero vector as fallback
chunk_vectors.append(np.zeros(embedding_dim))
doc_vectors = np.array(doc_vectors)
chunk_vectors = np.array(chunk_vectors)
# Generate query embedding for visualization
# Reuse query embedding from search algorithm (avoids redundant embedding call)
query_embed_start = time.perf_counter()
from nextcloud_mcp_server.embedding.service import get_embedding_service
if search_algo.query_embedding is not None:
query_embedding = search_algo.query_embedding
logger.info(
f"Reusing query embedding from search algorithm "
f"(dimension={len(query_embedding)})"
)
else:
# Fallback: generate embedding if not available from search
from nextcloud_mcp_server.embedding.service import get_embedding_service
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
logger.info(f"Generated query embedding (dimension={len(query_embedding)})")
query_embed_duration = time.perf_counter() - query_embed_start
logger.info(f"Generated query embedding (dimension={len(query_embedding)})")
# Combine query vector with document vectors for PCA
# Combine query vector with chunk vectors for PCA
# Query will be the last point in the array
all_vectors = np.vstack([doc_vectors, np.array([query_embedding])])
all_vectors = np.vstack([chunk_vectors, np.array([query_embedding])])
# Normalize vectors to unit length (L2 normalization)
# This is critical because Qdrant uses COSINE distance, which only measures
@@ -375,9 +357,19 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
)
# Apply PCA dimensionality reduction (768-dim → 3D) on normalized vectors
# Run in thread pool to avoid blocking the event loop (CPU-bound)
pca_start = time.perf_counter()
pca = PCA(n_components=3)
coords_3d = pca.fit_transform(all_vectors_normalized)
def _compute_pca(vectors: np.ndarray) -> tuple[np.ndarray, PCA]:
pca = PCA(n_components=3)
coords = pca.fit_transform(vectors)
return coords, pca
import anyio
coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: _compute_pca(all_vectors_normalized)
)
pca_duration = time.perf_counter() - pca_start
# After fit, these attributes are guaranteed to be set
@@ -394,17 +386,12 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Replace NaN with 0 to allow JSON serialization
coords_3d = np.nan_to_num(coords_3d, nan=0.0)
# Split query coords from document coords
# Split query coords from chunk coords
# Round to 2 decimal places for cleaner display
query_coords_3d = [
round(float(x), 2) for x in coords_3d[-1]
] # Last point is query
doc_coords_3d = coords_3d[:-1] # All but last are documents
total_chunks = sum(len(chunks) for chunks in doc_chunks.values())
avg_chunks_per_doc = (
total_chunks / len(doc_vectors) if doc_vectors.size > 0 else 0
)
chunk_coords_3d = coords_3d[:-1] # All but last are chunks
logger.info(
f"PCA explained variance: PC1={pca.explained_variance_ratio_[0]:.3f}, "
@@ -412,13 +399,14 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
f"PC3={pca.explained_variance_ratio_[2]:.3f}"
)
logger.info(
f"Embedding stats: documents={len(doc_vectors)}, "
f"total_chunks={total_chunks}, avg_chunks_per_doc={avg_chunks_per_doc:.1f}, "
f"query_dim={len(query_embedding)}, doc_vector_dim={doc_vectors.shape[1] if doc_vectors.size > 0 else 0}"
f"Embedding stats: chunks={len(chunk_vectors)}, "
f"query_dim={len(query_embedding)}, chunk_vector_dim={chunk_vectors.shape[1] if chunk_vectors.size > 0 else 0}"
)
# Coordinates already match search_results order (1:1 mapping)
result_coords = [[round(float(x), 2) for x in coord] for coord in doc_coords_3d]
result_coords = [
[round(float(x), 2) for x in coord] for coord in chunk_coords_3d
]
# Build response
response_results = [
@@ -447,7 +435,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
f"vector_fetch={vector_fetch_duration * 1000:.1f}ms ({vector_fetch_duration / total_duration * 100:.1f}%), "
f"query_embed={query_embed_duration * 1000:.1f}ms ({query_embed_duration / total_duration * 100:.1f}%), "
f"pca={pca_duration * 1000:.1f}ms ({pca_duration / total_duration * 100:.1f}%), "
f"results={len(search_results)}, doc_vectors={len(doc_vectors)}"
f"results={len(search_results)}, chunk_vectors={len(chunk_vectors)}"
)
return JSONResponse(
@@ -468,7 +456,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
"query_embed_ms": round(query_embed_duration * 1000, 2),
"pca_ms": round(pca_duration * 1000, 2),
"num_results": len(search_results),
"num_doc_vectors": len(doc_vectors),
"num_chunk_vectors": len(chunk_vectors),
},
}
)
@@ -517,77 +505,118 @@ async def chunk_context_endpoint(request: Request) -> JSONResponse:
status_code=400,
)
# Type assertions - we validated these above
assert doc_type is not None
assert doc_id is not None
assert start_str is not None
assert end_str is not None
start = int(start_str)
end = int(end_str)
# Convert doc_id to int (all document types use int IDs)
doc_id_int = int(doc_id)
# Currently only support notes
if doc_type != "note":
return JSONResponse(
{"success": False, "error": f"Unsupported doc_type: {doc_type}"},
status_code=400,
)
# Get authenticated HTTP client and fetch note
# Get authenticated Nextcloud client
from nextcloud_mcp_server.auth.userinfo_routes import (
_get_authenticated_client_for_userinfo,
)
from nextcloud_mcp_server.client.notes import NotesClient
from nextcloud_mcp_server.search.context import get_chunk_with_context
# Get username from request auth
username = (
request.user.display_name
if hasattr(request.user, "display_name")
else "unknown"
)
# Use context expansion module to fetch chunk with surrounding context
async with await _get_authenticated_client_for_userinfo(request) as nc_client:
chunk_context = await get_chunk_with_context(
nc_client=nc_client,
user_id=request.user.display_name, # User ID from auth
doc_id=doc_id_int,
doc_type=doc_type,
chunk_start=start,
chunk_end=end,
context_chars=context_chars,
)
# Create notes client with authenticated HTTP client
http_client = await _get_authenticated_client_for_userinfo(request)
notes_client = NotesClient(http_client, username)
# Fetch full note content
note = await notes_client.get_note(int(doc_id))
full_content = f"{note['title']}\n\n{note['content']}"
# Validate offsets
if start < 0 or end > len(full_content) or start >= end:
# Check if context expansion succeeded
if chunk_context is None:
return JSONResponse(
{
"success": False,
"error": f"Invalid offsets: start={start}, end={end}, content_length={len(full_content)}",
"error": f"Failed to fetch chunk context for {doc_type} {doc_id}",
},
status_code=400,
status_code=404,
)
# Extract chunk
chunk_text = full_content[start:end]
# Extract context before and after
before_start = max(0, start - context_chars)
before_context = full_content[before_start:start]
after_end = min(len(full_content), end + context_chars)
after_context = full_content[end:after_end]
# Determine if there's more content
has_more_before = before_start > 0
has_more_after = after_end < len(full_content)
logger.info(
f"Fetched chunk context for {doc_type}_{doc_id}: "
f"chunk_len={len(chunk_text)}, before_len={len(before_context)}, "
f"after_len={len(after_context)}"
f"chunk_len={len(chunk_context.chunk_text)}, "
f"before_len={len(chunk_context.before_context)}, "
f"after_len={len(chunk_context.after_context)}"
)
return JSONResponse(
{
"success": True,
"chunk_text": chunk_text,
"before_context": before_context,
"after_context": after_context,
"has_more_before": has_more_before,
"has_more_after": has_more_after,
}
)
# For PDF files, also fetch the highlighted page image from Qdrant
highlighted_page_image = None
page_number = None
if doc_type == "file":
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
settings = get_settings()
qdrant_client = await get_qdrant_client()
username = request.user.display_name
# Query for this specific chunk's highlighted image
points_response = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
get_placeholder_filter(),
FieldCondition(
key="doc_id", match=MatchValue(value=doc_id_int)
),
FieldCondition(
key="user_id", match=MatchValue(value=username)
),
FieldCondition(
key="chunk_start_offset", match=MatchValue(value=start)
),
FieldCondition(
key="chunk_end_offset", match=MatchValue(value=end)
),
]
),
limit=1,
with_vectors=False,
with_payload=["highlighted_page_image", "page_number"],
)
points = points_response[0]
if points and points[0].payload:
highlighted_page_image = points[0].payload.get(
"highlighted_page_image"
)
page_number = points[0].payload.get("page_number")
if highlighted_page_image:
logger.info(
f"Found highlighted image for chunk: "
f"page={page_number}, image_size={len(highlighted_page_image)}"
)
except Exception as e:
logger.warning(f"Failed to fetch highlighted image: {e}")
# Return response compatible with frontend expectations
response_data: dict = {
"success": True,
"chunk_text": chunk_context.chunk_text,
"before_context": chunk_context.before_context,
"after_context": chunk_context.after_context,
"has_more_before": chunk_context.has_before_truncation,
"has_more_after": chunk_context.has_after_truncation,
}
# Add image data if available
if highlighted_page_image:
response_data["highlighted_page_image"] = highlighted_page_image
response_data["page_number"] = page_number
return JSONResponse(response_data)
except ValueError as e:
logger.error(f"Invalid parameter format: {e}")
+65
View File
@@ -130,10 +130,75 @@ class NextcloudClient:
all_notes = self.notes.get_all_notes()
return await self._notes_search.search_notes(all_notes, query)
async def find_files_by_tag(
self, tag_name: str, mime_type_filter: str | None = None
) -> list[dict]:
"""Find files by system tag name, optionally filtered by MIME type.
This method coordinates tag lookup and file retrieval via WebDAV:
1. Look up the tag ID by name
2. Get all files with that tag (via REPORT with full metadata)
3. Optionally filter by MIME type
Args:
tag_name: Name of the system tag to search for (e.g., "vector-index")
mime_type_filter: Optional MIME type filter (e.g., "application/pdf")
Returns:
List of file dictionaries with WebDAV properties (path, size, content_type, etc.)
Raises:
RuntimeError: If tag lookup or file query fails
Examples:
# Find all files with "vector-index" tag
files = await nc_client.find_files_by_tag("vector-index")
# Find only PDFs with the tag
pdfs = await nc_client.find_files_by_tag("vector-index", "application/pdf")
"""
# Look up tag by name using WebDAV
tag = await self.webdav.get_tag_by_name(tag_name)
if not tag:
logger.debug(f"Tag '{tag_name}' not found, returning empty list")
return []
# Get files with this tag (returns full file info from REPORT)
files = await self.webdav.get_files_by_tag(tag["id"])
if not files:
logger.debug(f"No files found with tag '{tag_name}'")
return []
logger.debug(f"Found {len(files)} files with tag '{tag_name}'")
# Apply MIME type filter if specified
if mime_type_filter:
filtered_files = [
f
for f in files
if f.get("content_type", "").startswith(mime_type_filter)
]
logger.info(
f"Returning {len(filtered_files)} files with tag '{tag_name}' (filtered by {mime_type_filter})"
)
return filtered_files
logger.info(f"Returning {len(files)} files with tag '{tag_name}'")
return files
def _get_webdav_base_path(self) -> str:
"""Helper to get the base WebDAV path for the authenticated user."""
return f"/remote.php/dav/files/{self.username}"
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit - closes all clients."""
await self.close()
return False # Don't suppress exceptions
async def close(self):
"""Close the HTTP client and CalDAV client."""
await self._client.aclose()
+347
View File
@@ -821,6 +821,20 @@ class WebDAVClient(BaseNextcloudClient):
item["file_id"] = int(value) if value else None
elif tag == "favorite":
item["is_favorite"] = value == "1"
elif tag == "tags":
# Tags can be comma-separated or have multiple child elements
if value:
# Handle comma-separated tags
item["tags"] = [
t.strip() for t in value.split(",") if t.strip()
]
else:
# Check for child tag elements (alternative format)
tag_elements = child.findall(".//{http://owncloud.org/ns}tag")
if tag_elements:
item["tags"] = [t.text for t in tag_elements if t.text]
else:
item["tags"] = []
elif tag == "permissions":
item["permissions"] = value
elif tag == "size":
@@ -948,3 +962,336 @@ class WebDAVClient(BaseNextcloudClient):
properties=properties,
limit=limit,
)
async def find_by_tag(
self, tag_name: str, scope: str = "", limit: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Find files by tag name.
DEPRECATED: Use NextcloudClient.find_files_by_tag() instead, which uses
the proper OCS Tags API rather than WebDAV SEARCH.
Args:
tag_name: Tag to filter by (e.g., "vector-index")
scope: Directory path to search in (empty string for user root)
limit: Maximum number of results to return
Returns:
List of files/directories with the specified tag
Examples:
# Find all files tagged with "vector-index"
results = await find_by_tag("vector-index")
# Find tagged files in a specific folder
results = await find_by_tag("vector-index", scope="Documents")
"""
# Use LIKE for tag matching since tags can be comma-separated
where_conditions = f"""
<d:like>
<d:prop>
<oc:tags/>
</d:prop>
<d:literal>%{tag_name}%</d:literal>
</d:like>
"""
# Request tag property along with standard properties
properties = [
"displayname",
"getcontentlength",
"getcontenttype",
"getlastmodified",
"resourcetype",
"getetag",
"fileid",
"tags",
]
return await self.search_files(
scope=scope,
where_conditions=where_conditions,
properties=properties,
limit=limit,
)
async def _get_file_info_by_id(self, file_id: int) -> Dict[str, Any]:
"""Get file information by Nextcloud file ID using WebDAV.
Args:
file_id: Nextcloud internal file ID
Returns:
File information dictionary with path, size, content_type, etc.
Raises:
HTTPStatusError: If file not found or request fails
"""
# Nextcloud allows accessing files by ID via special meta endpoint
meta_path = f"/remote.php/dav/meta/{file_id}/"
propfind_body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:prop>
<d:displayname/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getlastmodified/>
<d:resourcetype/>
<d:getetag/>
<oc:fileid/>
</d:prop>
</d:propfind>"""
headers = {"Depth": "0", "Content-Type": "text/xml", "OCS-APIRequest": "true"}
response = await self._make_request(
"PROPFIND", meta_path, content=propfind_body, headers=headers
)
response.raise_for_status()
# Parse the XML response
root = ET.fromstring(response.content)
responses = root.findall(".//{DAV:}response")
if not responses:
raise RuntimeError(f"File ID {file_id} not found")
response_elem = responses[0]
href = response_elem.find(".//{DAV:}href")
if href is None:
raise RuntimeError(f"No href in response for file ID {file_id}")
propstat = response_elem.find(".//{DAV:}propstat")
if propstat is None:
raise RuntimeError(f"No propstat for file ID {file_id}")
prop = propstat.find(".//{DAV:}prop")
if prop is None:
raise RuntimeError(f"No prop for file ID {file_id}")
# Extract file path from displayname or construct from file ID
displayname_elem = prop.find(".//{DAV:}displayname")
name = (
displayname_elem.text if displayname_elem is not None else f"file_{file_id}"
)
# Get file properties
size_elem = prop.find(".//{DAV:}getcontentlength")
size = int(size_elem.text) if size_elem is not None and size_elem.text else 0
content_type_elem = prop.find(".//{DAV:}getcontenttype")
content_type = content_type_elem.text if content_type_elem is not None else None
modified_elem = prop.find(".//{DAV:}getlastmodified")
modified = modified_elem.text if modified_elem is not None else None
etag_elem = prop.find(".//{DAV:}getetag")
etag = (
etag_elem.text.strip('"')
if etag_elem is not None and etag_elem.text
else None
)
# Check if it's a directory
resourcetype = prop.find(".//{DAV:}resourcetype")
is_directory = (
resourcetype is not None
and resourcetype.find(".//{DAV:}collection") is not None
)
# Try to get actual file path - meta endpoint doesn't give us the real path
# so we'll construct a reasonable path from the name
# The calling code in NextcloudClient will have the context to determine the actual path
file_info = {
"name": name,
"path": f"/{name}", # Placeholder - caller should use WebDAV to get real path if needed
"size": size,
"content_type": content_type,
"last_modified": modified,
"etag": etag,
"is_directory": is_directory,
"file_id": file_id,
}
logger.debug(f"Retrieved file info for ID {file_id}: {name}")
return file_info
async def get_tag_by_name(self, tag_name: str) -> dict[str, Any] | None:
"""Get a system tag by its name via WebDAV.
Args:
tag_name: Name of the tag to find (case-sensitive)
Returns:
Tag dictionary if found, None otherwise
"""
# Use WebDAV PROPFIND to list all systemtags
propfind_body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:prop>
<oc:id/>
<oc:display-name/>
<oc:user-visible/>
<oc:user-assignable/>
</d:prop>
</d:propfind>"""
response = await self._client.request(
"PROPFIND",
"/remote.php/dav/systemtags/",
headers={"Depth": "1"},
content=propfind_body,
)
response.raise_for_status()
# Parse XML response
root = ET.fromstring(response.content)
ns = {
"d": "DAV:",
"oc": "http://owncloud.org/ns",
}
for response_elem in root.findall("d:response", ns):
href = response_elem.find("d:href", ns)
if href is None or href.text == "/remote.php/dav/systemtags/":
# Skip the collection itself
continue
propstat = response_elem.find("d:propstat", ns)
if propstat is None:
continue
prop = propstat.find("d:prop", ns)
if prop is None:
continue
# Extract tag properties
tag_id_elem = prop.find("oc:id", ns)
display_name_elem = prop.find("oc:display-name", ns)
user_visible_elem = prop.find("oc:user-visible", ns)
user_assignable_elem = prop.find("oc:user-assignable", ns)
if display_name_elem is not None and display_name_elem.text == tag_name:
tag_info = {
"id": int(tag_id_elem.text) if tag_id_elem is not None else None,
"name": display_name_elem.text,
"userVisible": user_visible_elem.text.lower() == "true"
if user_visible_elem is not None
else True,
"userAssignable": user_assignable_elem.text.lower() == "true"
if user_assignable_elem is not None
else True,
}
logger.debug(f"Found tag '{tag_name}' with ID {tag_info['id']}")
return tag_info
logger.debug(f"Tag '{tag_name}' not found")
return None
async def get_files_by_tag(self, tag_id: int) -> list[dict[str, Any]]:
"""Get all files tagged with a specific system tag via WebDAV REPORT.
Args:
tag_id: Numeric ID of the tag
Returns:
List of file info dictionaries with path, size, content_type, etc.
"""
# Use WebDAV REPORT method with systemtag filter, requesting all properties
report_body = f"""<?xml version="1.0"?>
<oc:filter-files xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
<d:prop>
<oc:fileid/>
<d:displayname/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getlastmodified/>
<d:getetag/>
</d:prop>
<oc:filter-rules>
<oc:systemtag>{tag_id}</oc:systemtag>
</oc:filter-rules>
</oc:filter-files>"""
response = await self._client.request(
"REPORT",
f"{self._get_webdav_base_path()}/",
content=report_body,
)
response.raise_for_status()
# Parse XML response
root = ET.fromstring(response.content)
ns = {
"d": "DAV:",
"oc": "http://owncloud.org/ns",
}
files = []
for response_elem in root.findall("d:response", ns):
# Extract href (file path)
href_elem = response_elem.find("d:href", ns)
if href_elem is None or not href_elem.text:
continue
propstat = response_elem.find("d:propstat", ns)
if propstat is None:
continue
prop = propstat.find("d:prop", ns)
if prop is None:
continue
# Extract all properties
fileid_elem = prop.find("oc:fileid", ns)
displayname_elem = prop.find("d:displayname", ns)
contentlength_elem = prop.find("d:getcontentlength", ns)
contenttype_elem = prop.find("d:getcontenttype", ns)
lastmodified_elem = prop.find("d:getlastmodified", ns)
etag_elem = prop.find("d:getetag", ns)
if fileid_elem is None or not fileid_elem.text:
continue
# Decode href path and extract the file path
from urllib.parse import unquote
href_path = unquote(href_elem.text)
# Remove WebDAV prefix to get user-relative path
webdav_prefix = f"/remote.php/dav/files/{self.username}/"
file_path = href_path.replace(webdav_prefix, "/")
# Parse last modified timestamp
last_modified_timestamp = None
if lastmodified_elem is not None and lastmodified_elem.text:
from email.utils import parsedate_to_datetime
try:
dt = parsedate_to_datetime(lastmodified_elem.text)
last_modified_timestamp = int(dt.timestamp())
except Exception:
pass
file_info = {
"id": int(fileid_elem.text),
"path": file_path,
"name": displayname_elem.text
if displayname_elem is not None
else file_path.split("/")[-1],
"size": int(contentlength_elem.text)
if contentlength_elem is not None and contentlength_elem.text
else 0,
"content_type": contenttype_elem.text
if contenttype_elem is not None
else "",
"last_modified": lastmodified_elem.text
if lastmodified_elem is not None
else None,
"last_modified_timestamp": last_modified_timestamp,
"etag": etag_elem.text if etag_elem is not None else None,
}
files.append(file_info)
logger.debug(f"Found {len(files)} files with tag ID {tag_id}")
return files
+37
View File
@@ -2,8 +2,37 @@ import logging
import logging.config
import os
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional
class DeploymentMode(Enum):
"""Deployment mode for the MCP server.
SELF_HOSTED: Full features, environment-based configuration.
Supports vector sync, semantic search, admin UI.
SMITHERY_STATELESS: Stateless mode for Smithery hosting.
Session-based configuration, no persistent storage.
Excludes semantic search, vector sync, admin UI.
"""
SELF_HOSTED = "self_hosted"
SMITHERY_STATELESS = "smithery"
def get_deployment_mode() -> DeploymentMode:
"""Detect deployment mode from environment.
Returns:
DeploymentMode.SMITHERY_STATELESS if SMITHERY_DEPLOYMENT=true,
otherwise DeploymentMode.SELF_HOSTED (default).
"""
if os.getenv("SMITHERY_DEPLOYMENT", "false").lower() == "true":
return DeploymentMode.SMITHERY_STATELESS
return DeploymentMode.SELF_HOSTED
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
@@ -102,6 +131,14 @@ def get_document_processor_config() -> dict[str, Any]:
"lang": os.getenv("TESSERACT_LANG", "eng"),
}
# PyMuPDF configuration (local PDF processing)
if os.getenv("ENABLE_PYMUPDF", "true").lower() == "true": # Enabled by default
config["processors"]["pymupdf"] = {
"extract_images": os.getenv("PYMUPDF_EXTRACT_IMAGES", "true").lower()
== "true",
"image_dir": os.getenv("PYMUPDF_IMAGE_DIR"), # None = use temp directory
}
# Custom processor (via HTTP API)
if os.getenv("ENABLE_CUSTOM_PROCESSOR", "false").lower() == "true":
custom_url = os.getenv("CUSTOM_PROCESSOR_URL")
+110 -8
View File
@@ -1,21 +1,37 @@
"""Helper functions for accessing context in MCP tools."""
import logging
from httpx import BasicAuth
from mcp.server.fastmcp import Context
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.config import (
DeploymentMode,
get_deployment_mode,
get_settings,
)
logger = logging.getLogger(__name__)
async def get_client(ctx: Context) -> NextcloudClient:
"""
Get the appropriate Nextcloud client based on authentication mode.
ADR-005 compliant implementation supporting two modes:
1. BasicAuth mode: Returns shared client from lifespan context
2. Multi-audience mode (ENABLE_TOKEN_EXCHANGE=false, default):
Token already contains both MCP and Nextcloud audiences - use directly
3. Token exchange mode (ENABLE_TOKEN_EXCHANGE=true):
Exchange MCP token for Nextcloud token via RFC 8693
ADR-016 compliant implementation supporting three deployment modes:
1. Smithery stateless mode (SMITHERY_DEPLOYMENT=true):
Create client from session configuration (nextcloud_url, username, app_password)
No persistent state - client created per-request from Smithery session config.
2. BasicAuth mode: Returns shared client from lifespan context
3. OAuth mode:
a. Multi-audience mode (ENABLE_TOKEN_EXCHANGE=false, default):
Token already contains both MCP and Nextcloud audiences - use directly
b. Token exchange mode (ENABLE_TOKEN_EXCHANGE=true):
Exchange MCP token for Nextcloud token via RFC 8693
SECURITY: Token passthrough has been REMOVED. All OAuth modes validate
proper token audiences per MCP Security Best Practices specification.
@@ -24,7 +40,7 @@ async def get_client(ctx: Context) -> NextcloudClient:
by the MCP server via @require_scopes decorator, not by the IdP.
This function automatically detects the authentication mode by checking
the type of the lifespan context.
the deployment mode and type of the lifespan context.
Args:
ctx: MCP request context
@@ -34,6 +50,7 @@ async def get_client(ctx: Context) -> NextcloudClient:
Raises:
AttributeError: If context doesn't contain expected data
ValueError: If Smithery mode but session config is missing required fields
Example:
```python
@@ -43,6 +60,12 @@ async def get_client(ctx: Context) -> NextcloudClient:
return await client.capabilities()
```
"""
deployment_mode = get_deployment_mode()
# ADR-016: Smithery stateless mode - create client from session config
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
return _get_client_from_session_config(ctx)
settings = get_settings()
lifespan_ctx = ctx.request_context.lifespan_context
@@ -75,3 +98,82 @@ async def get_client(ctx: Context) -> NextcloudClient:
f"Lifespan context does not have 'client' or 'nextcloud_host' attribute. "
f"Type: {type(lifespan_ctx)}"
)
def _get_client_from_session_config(ctx: Context) -> NextcloudClient:
"""
Create NextcloudClient from Smithery session configuration.
ADR-016: In Smithery stateless mode, each request includes session config
with the user's Nextcloud credentials. This function creates a fresh client
for each request - no state is persisted between requests.
For container runtime, config is extracted from URL query parameters by
SmitheryConfigMiddleware and stored in a context variable.
Expected session config fields (from Smithery configSchema):
- nextcloud_url: str - Nextcloud instance URL (required)
- username: str - Nextcloud username (required)
- app_password: str - Nextcloud app password (required)
Args:
ctx: MCP request context (not used directly for Smithery config)
Returns:
NextcloudClient configured with session credentials
Raises:
ValueError: If required session config fields are missing
"""
# ADR-016: Get session config from context variable (set by SmitheryConfigMiddleware)
from nextcloud_mcp_server.app import get_smithery_session_config
session_config = get_smithery_session_config()
if session_config is None:
raise ValueError(
"Session configuration required in Smithery mode. "
"Ensure nextcloud_url, username, and app_password are provided as URL query parameters."
)
# Extract required fields - config is always a dict from SmitheryConfigMiddleware
nextcloud_url = session_config.get("nextcloud_url")
username = session_config.get("username")
app_password = session_config.get("app_password")
# Validate required fields
missing_fields = []
if not nextcloud_url:
missing_fields.append("nextcloud_url")
if not username:
missing_fields.append("username")
if not app_password:
missing_fields.append("app_password")
if missing_fields:
raise ValueError(
f"Missing required session config fields: {', '.join(missing_fields)}. "
f"Configure these in the Smithery connection settings."
)
# Type assertions after validation (for type checker)
# These are guaranteed to be str after the missing_fields check above
assert nextcloud_url is not None
assert username is not None
assert app_password is not None
# Validate URL format
if not nextcloud_url.startswith(("http://", "https://")):
raise ValueError(
f"Invalid nextcloud_url: {nextcloud_url}. "
f"Must start with http:// or https://"
)
logger.debug(f"Creating Smithery client for {nextcloud_url} as {username}")
# Create client with session credentials using BasicAuth
return NextcloudClient(
base_url=nextcloud_url,
username=username,
auth=BasicAuth(username, app_password),
)
@@ -1,12 +1,18 @@
"""Document processing plugins for extracting text from various file formats."""
from .base import DocumentProcessor, ProcessingResult, ProcessorError
from .pymupdf import PyMuPDFProcessor
from .registry import ProcessorRegistry, get_registry
# Register processors at module initialization
_registry = get_registry()
_registry.register(PyMuPDFProcessor(), priority=10)
__all__ = [
"DocumentProcessor",
"ProcessingResult",
"ProcessorError",
"ProcessorRegistry",
"get_registry",
"PyMuPDFProcessor",
]
@@ -0,0 +1,253 @@
"""Document processor using PyMuPDF (fitz) library."""
import logging
import pathlib
import tempfile
from collections.abc import Awaitable, Callable
from typing import Any, Optional
# NOTE: Do NOT call pymupdf.layout.activate() here!
# It changes the behavior of pymupdf4llm.to_markdown() when page_chunks=True,
# causing it to return a string instead of a list[dict].
# See: https://github.com/pymupdf/pymupdf4llm/issues/323
import pymupdf
import pymupdf4llm
from .base import DocumentProcessor, ProcessingResult, ProcessorError
logger = logging.getLogger(__name__)
class PyMuPDFProcessor(DocumentProcessor):
"""Document processor using PyMuPDF library for PDF processing.
PyMuPDF (fitz) is a fast, local PDF processing library that extracts text,
metadata, and images without requiring external API calls.
Features:
- Fast text extraction with layout preservation
- PDF metadata extraction (title, author, creation date, page count)
- Image extraction for future multimodal support
- Page number tracking for precise citations
"""
SUPPORTED_TYPES = {
"application/pdf",
}
def __init__(
self,
extract_images: bool = True,
image_dir: Optional[str | pathlib.Path] = None,
):
"""Initialize PyMuPDF processor.
Args:
extract_images: Whether to extract embedded images from PDFs
image_dir: Directory to store extracted images (defaults to temp directory)
"""
self.extract_images = extract_images
if image_dir is None:
self.image_dir = pathlib.Path(tempfile.gettempdir()) / "pdf-images"
else:
self.image_dir = pathlib.Path(image_dir)
# Create image directory if it doesn't exist
if self.extract_images:
self.image_dir.mkdir(exist_ok=True, parents=True)
logger.info(
f"Initialized PyMuPDFProcessor with image extraction to {self.image_dir}"
)
else:
logger.info("Initialized PyMuPDFProcessor without image extraction")
@property
def name(self) -> str:
return "pymupdf"
@property
def supported_mime_types(self) -> set[str]:
return self.SUPPORTED_TYPES
async def process(
self,
content: bytes,
content_type: str,
filename: Optional[str] = None,
options: Optional[dict[str, Any]] = None,
progress_callback: Optional[
Callable[[float, Optional[float], Optional[str]], Awaitable[None]]
] = None,
) -> ProcessingResult:
"""Process a PDF document and extract text, metadata, and images.
Args:
content: PDF document bytes
content_type: MIME type (should be application/pdf)
filename: Optional filename for better error messages
options: Processing options (currently unused)
progress_callback: Optional callback for progress updates
Returns:
ProcessingResult with extracted text and metadata
Raises:
ProcessorError: If PDF processing fails
"""
import anyio
try:
if progress_callback:
await progress_callback(0, 100, "Opening PDF document")
# Open document and extract metadata in thread
doc = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: pymupdf.open("pdf", content)
)
metadata = self._extract_metadata(doc, filename)
metadata["file_size"] = len(content)
page_count = doc.page_count
if progress_callback:
await progress_callback(10, 100, f"Extracting {page_count} pages")
# Prepare image directory if needed
pdf_image_dir = None
if self.extract_images:
pdf_id = filename.replace("/", "_") if filename else "unknown"
pdf_image_dir = self.image_dir / pdf_id
pdf_image_dir.mkdir(exist_ok=True, parents=True)
# Extract all pages in a single call with page_chunks=True
def do_extract() -> list[dict[str, Any]]:
# When page_chunks=True, to_markdown returns list[dict] not str
return pymupdf4llm.to_markdown( # type: ignore[return-value]
doc,
write_images=self.extract_images,
image_path=pdf_image_dir if self.extract_images else None,
page_chunks=True,
)
page_chunks: list[dict[str, Any]] = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
do_extract
)
if progress_callback:
await progress_callback(90, 100, "Building result")
# Extract page texts and build boundaries from chunks
page_texts: list[str] = []
page_boundaries: list[dict[str, Any]] = []
current_offset = 0
for chunk in page_chunks:
text = chunk.get("text", "")
page_num = chunk.get("metadata", {}).get("page", len(page_texts) + 1)
page_texts.append(text)
page_boundaries.append(
{
"page": page_num,
"start_offset": current_offset,
"end_offset": current_offset + len(text),
}
)
current_offset += len(text)
# Collect image paths
image_paths = []
if pdf_image_dir and pdf_image_dir.exists():
image_paths = [str(p) for p in pdf_image_dir.glob("*")]
# Build final text and metadata
md_text = "".join(page_texts)
metadata["has_images"] = len(image_paths) > 0
if image_paths:
metadata["image_count"] = len(image_paths)
metadata["image_paths"] = image_paths
metadata["page_boundaries"] = page_boundaries
# Close document
doc.close()
if progress_callback:
await progress_callback(100, 100, "Processing complete")
logger.info(
f"Successfully processed PDF {filename or '<bytes>'}: "
f"{metadata['page_count']} pages, {len(md_text)} chars, "
f"{metadata.get('image_count', 0)} images"
)
return ProcessingResult(
text=md_text,
metadata=metadata,
processor=self.name,
success=True,
)
except Exception as e:
error_msg = f"Failed to process PDF {filename or '<bytes>'}: {e}"
logger.error(error_msg, exc_info=True)
raise ProcessorError(error_msg) from e
def _extract_metadata(
self, doc: pymupdf.Document, filename: Optional[str]
) -> dict[str, Any]:
"""Extract metadata from PDF document.
Args:
doc: Opened PyMuPDF document
filename: Optional filename
Returns:
Dictionary with PDF metadata
"""
metadata: dict[str, Any] = {}
# Basic document info
metadata["page_count"] = doc.page_count
metadata["format"] = "PDF 1." + str(
doc.pdf_version() if hasattr(doc, "pdf_version") else "?" # type: ignore[call-non-callable]
)
if filename:
metadata["filename"] = filename
# Extract PDF metadata dictionary
pdf_metadata = doc.metadata
if pdf_metadata:
# Standard PDF metadata fields
if pdf_metadata.get("title"):
metadata["title"] = pdf_metadata["title"]
if pdf_metadata.get("author"):
metadata["author"] = pdf_metadata["author"]
if pdf_metadata.get("subject"):
metadata["subject"] = pdf_metadata["subject"]
if pdf_metadata.get("keywords"):
metadata["keywords"] = pdf_metadata["keywords"]
if pdf_metadata.get("creator"):
metadata["creator"] = pdf_metadata["creator"]
if pdf_metadata.get("producer"):
metadata["producer"] = pdf_metadata["producer"]
if pdf_metadata.get("creationDate"):
metadata["creation_date"] = pdf_metadata["creationDate"]
if pdf_metadata.get("modDate"):
metadata["modification_date"] = pdf_metadata["modDate"]
return metadata
async def health_check(self) -> bool:
"""Check if PyMuPDF is available and working.
Returns:
True if processor is ready to use
"""
try:
# Try to create a simple PDF in memory
test_doc = pymupdf.open()
test_doc.close()
return True
except Exception as e:
logger.error(f"PyMuPDF health check failed: {e}")
return False
@@ -37,7 +37,9 @@ class BM25SparseEmbeddingProvider:
def encode(self, text: str) -> dict[str, Any]:
"""
Generate BM25 sparse embedding for a single text.
Generate BM25 sparse embedding for a single text (synchronous).
Note: For async contexts, prefer encode_async() to avoid blocking the event loop.
Args:
text: Input text to encode
@@ -53,7 +55,24 @@ class BM25SparseEmbeddingProvider:
"values": sparse_embedding.values.tolist(),
}
def encode_batch(self, texts: list[str]) -> list[dict[str, Any]]:
async def encode_async(self, text: str) -> dict[str, Any]:
"""
Generate BM25 sparse embedding for a single text (async).
Runs CPU-bound BM25 encoding in thread pool to avoid blocking the event loop.
Args:
text: Input text to encode
Returns:
Dictionary with 'indices' and 'values' keys for Qdrant sparse vector
"""
import anyio
# Run CPU-bound BM25 encoding in thread pool
return await anyio.to_thread.run_sync(lambda: self.encode(text)) # type: ignore[attr-defined]
async def encode_batch(self, texts: list[str]) -> list[dict[str, Any]]:
"""
Generate BM25 sparse embeddings for multiple texts (batched).
@@ -63,7 +82,12 @@ class BM25SparseEmbeddingProvider:
Returns:
List of dictionaries with 'indices' and 'values' for each text
"""
sparse_embeddings = list(self.model.embed(texts))
import anyio
# Run CPU-bound BM25 encoding in thread pool to avoid blocking event loop
sparse_embeddings = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: list(self.model.embed(texts))
)
return [
{
+24 -1
View File
@@ -10,7 +10,7 @@ from .base import BaseResponse
class SemanticSearchResult(BaseModel):
"""Model for semantic search results with additional metadata."""
id: int = Field(description="Document ID")
id: int = Field(description="Document ID (int for all document types)")
doc_type: str = Field(
description="Document type (note, calendar_event, deck_card, etc.)"
)
@@ -35,6 +35,29 @@ class SemanticSearchResult(BaseModel):
chunk_end_offset: Optional[int] = Field(
default=None, description="Character position where chunk ends in document"
)
page_number: Optional[int] = Field(
default=None, description="Page number for PDF documents"
)
# Context expansion fields (optional, populated when include_context=True)
has_context_expansion: bool = Field(
default=False, description="Whether context expansion was performed"
)
marked_text: Optional[str] = Field(
default=None,
description="Full text with position markers around matched chunk",
)
before_context: Optional[str] = Field(
default=None, description="Text before the matched chunk"
)
after_context: Optional[str] = Field(
default=None, description="Text after the matched chunk"
)
has_before_truncation: Optional[bool] = Field(
default=None, description="Whether before_context was truncated"
)
has_after_truncation: Optional[bool] = Field(
default=None, description="Whether after_context was truncated"
)
class SemanticSearchResponse(BaseResponse):
@@ -37,7 +37,7 @@ class HealthCheckFilter(logging.Filter):
"""
# Check if the log message contains health check endpoints
message = record.getMessage()
return not any(
health_check = any(
endpoint in message
for endpoint in [
"/health/live",
@@ -47,6 +47,8 @@ class HealthCheckFilter(logging.Filter):
]
)
return not health_check
class TraceContextFormatter(JsonFormatter):
"""
+21 -8
View File
@@ -92,14 +92,21 @@ class OllamaProvider(Provider):
response.raise_for_status()
return response.json()["embedding"]
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
async def embed_batch(
self, texts: list[str], batch_size: int = 32
) -> list[list[float]]:
"""
Generate embeddings for multiple texts (batched requests).
Generate embeddings for multiple texts using Ollama's batch API.
Note: Ollama doesn't have native batch API, so we send requests sequentially.
Uses /api/embed endpoint with array input for efficient batch processing.
Conservative batch size (32) prevents quality degradation observed in
Ollama issue #6262 with larger batches.
Note: Ollama processes batches serially, not in parallel.
Args:
texts: List of texts to embed
batch_size: Maximum texts per batch (default: 32)
Returns:
List of vector embeddings
@@ -112,11 +119,17 @@ class OllamaProvider(Provider):
"Embedding not supported - no embedding_model configured"
)
embeddings = []
for text in texts:
embedding = await self.embed(text)
embeddings.append(embedding)
return embeddings
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i : i + batch_size]
response = await self.client.post(
f"{self.base_url}/api/embed",
json={"model": self.embedding_model, "input": batch},
)
response.raise_for_status()
all_embeddings.extend(response.json()["embeddings"])
return all_embeddings
async def _detect_dimension(self):
"""
+21 -2
View File
@@ -83,6 +83,7 @@ async def get_indexed_doc_types(user_id: str) -> set[str]:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -97,7 +98,10 @@ async def get_indexed_doc_types(user_id: str) -> set[str]:
scroll_results, _next_offset = await qdrant_client.scroll(
collection_name=collection,
scroll_filter=Filter(
must=[FieldCondition(key="user_id", match=MatchValue(value=user_id))]
must=[
get_placeholder_filter(), # Exclude placeholders from doc_type discovery
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
]
),
limit=1000, # Sample size to discover types
with_payload=["doc_type"],
@@ -123,7 +127,7 @@ class SearchResult:
"""A single search result with metadata and score.
Attributes:
id: Document ID
id: Document ID (int for all document types)
doc_type: Document type (note, file, calendar, contact, etc.)
title: Document title
excerpt: Content excerpt showing match context
@@ -133,6 +137,10 @@ class SearchResult:
metadata: Additional algorithm-specific metadata
chunk_start_offset: Character position where chunk starts (None if not available)
chunk_end_offset: Character position where chunk ends (None if not available)
page_number: Page number for PDF documents (None for other doc types)
chunk_index: Zero-based index of this chunk in the document
total_chunks: Total number of chunks in the document
point_id: Qdrant point ID for batch vector retrieval (None if not from Qdrant)
"""
id: int
@@ -143,6 +151,10 @@ class SearchResult:
metadata: dict[str, Any] | None = None
chunk_start_offset: int | None = None
chunk_end_offset: int | None = None
page_number: int | None = None
chunk_index: int = 0
total_chunks: int = 1
point_id: str | None = None
def __post_init__(self):
"""Validate score is non-negative.
@@ -162,8 +174,15 @@ class SearchAlgorithm(ABC):
All search algorithms must implement the search() method with consistent
interface, allowing them to be used interchangeably.
Attributes:
query_embedding: The query embedding generated during the last search.
Available after search() completes for algorithms that use embeddings.
Can be reused by callers to avoid redundant embedding generation.
"""
query_embedding: list[float] | None = None
@abstractmethod
async def search(
self,
+24 -9
View File
@@ -10,6 +10,7 @@ from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -72,6 +73,9 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
Returns unverified results from Qdrant. Access verification should be
performed separately at the final output stage using verify_search_results().
Deduplicates by (doc_id, doc_type, chunk_start_offset, chunk_end_offset)
to show multiple chunks from the same document while avoiding duplicate chunks.
Args:
query: Natural language or keyword search query
user_id: User ID for filtering
@@ -97,11 +101,13 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
# Generate dense embedding for semantic search
embedding_service = get_embedding_service()
dense_embedding = await embedding_service.embed(query)
# Store for reuse by callers (e.g., viz_routes PCA visualization)
self.query_embedding = dense_embedding
logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})")
# Generate sparse embedding for BM25 keyword search
bm25_service = get_bm25_service()
sparse_embedding = bm25_service.encode(query)
sparse_embedding = await bm25_service.encode_async(query)
logger.debug(
f"Generated sparse embedding "
f"({len(sparse_embedding['indices'])} non-zero terms)"
@@ -109,10 +115,11 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
# Build Qdrant filter
filter_conditions = [
get_placeholder_filter(), # Always exclude placeholders from user-facing queries
FieldCondition(
key="user_id",
match=MatchValue(value=user_id),
)
),
]
# Add doc_type filter if specified
@@ -176,20 +183,24 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
f"Top 3 {self.fusion_name.upper()} fusion scores: {top_scores}"
)
# Deduplicate by (doc_id, doc_type) - multiple chunks per document
seen_docs = set()
# Deduplicate by (doc_id, doc_type, chunk_start, chunk_end)
# This allows multiple chunks from same doc, but removes duplicate chunks
seen_chunks = set()
results = []
for result in search_response.points:
doc_id = int(result.payload["doc_id"])
# doc_id can be int (notes) or str (files - file paths)
doc_id = result.payload["doc_id"]
doc_type = result.payload.get("doc_type", "note")
doc_key = (doc_id, doc_type)
chunk_start = result.payload.get("chunk_start_offset")
chunk_end = result.payload.get("chunk_end_offset")
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
# Skip if we've already seen this document
if doc_key in seen_docs:
# Skip if we've already seen this exact chunk
if chunk_key in seen_chunks:
continue
seen_docs.add(doc_key)
seen_chunks.add(chunk_key)
# Return unverified results (verification happens at output stage)
results.append(
@@ -206,6 +217,10 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
},
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval
)
)
+598
View File
@@ -0,0 +1,598 @@
"""Context expansion for search results.
Provides utilities to expand matched chunks with surrounding context and
position markers for better visualization and understanding of search results.
"""
import logging
from dataclasses import dataclass
from nextcloud_mcp_server.client import NextcloudClient
logger = logging.getLogger(__name__)
async def _get_chunk_from_qdrant(
user_id: str, doc_id: int, doc_type: str, chunk_start: int, chunk_end: int
) -> str | None:
"""Retrieve full chunk text from Qdrant payload.
This avoids re-fetching and re-parsing documents by using the cached
chunk content already stored in Qdrant.
Args:
user_id: User ID who owns the document
doc_id: Document ID
doc_type: Document type (e.g., "note", "file")
chunk_start: Character offset where chunk starts
chunk_end: Character offset where chunk ends
Returns:
Full chunk text from Qdrant excerpt field, or None if not found
"""
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
qdrant_client = await get_qdrant_client()
settings = get_settings()
# Query for the specific chunk
scroll_result = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
FieldCondition(
key="chunk_start_offset", match=MatchValue(value=chunk_start)
),
FieldCondition(
key="chunk_end_offset", match=MatchValue(value=chunk_end)
),
]
),
limit=1,
with_payload=["excerpt"],
with_vectors=False,
)
if scroll_result[0]:
point = scroll_result[0][0]
excerpt = point.payload.get("excerpt")
if excerpt:
logger.debug(
f"Retrieved chunk from Qdrant for {doc_type} {doc_id}: "
f"{len(excerpt)} chars"
)
return str(excerpt)
logger.debug(
f"Chunk not found in Qdrant for {doc_type} {doc_id}, "
f"chunk [{chunk_start}:{chunk_end}]. Will fall back to document fetch."
)
return None
except Exception as e:
logger.error(
f"Error querying Qdrant for chunk: {e}. Falling back to document fetch.",
exc_info=True,
)
return None
async def _get_chunk_by_index_from_qdrant(
user_id: str, doc_id: int, doc_type: str, chunk_index: int
) -> str | None:
"""Retrieve chunk text by chunk_index from Qdrant payload.
Used to fetch adjacent chunks for context expansion.
Args:
user_id: User ID who owns the document
doc_id: Document ID
doc_type: Document type (e.g., "note", "file")
chunk_index: Zero-based chunk index in document
Returns:
Full chunk text from Qdrant excerpt field, or None if not found
"""
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
qdrant_client = await get_qdrant_client()
settings = get_settings()
# Query for chunk by index
scroll_result = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
FieldCondition(
key="chunk_index", match=MatchValue(value=chunk_index)
),
]
),
limit=1,
with_payload=["excerpt"],
with_vectors=False,
)
if scroll_result[0]:
point = scroll_result[0][0]
excerpt = point.payload.get("excerpt")
if excerpt:
logger.debug(
f"Retrieved adjacent chunk {chunk_index} from Qdrant for "
f"{doc_type} {doc_id}: {len(excerpt)} chars"
)
return str(excerpt)
return None
except Exception as e:
logger.debug(
f"Could not retrieve adjacent chunk {chunk_index} for "
f"{doc_type} {doc_id}: {e}"
)
return None
async def _get_file_path_from_qdrant(
user_id: str, file_id: int, chunk_start: int, chunk_end: int
) -> str | None:
"""Resolve file_id to file_path by querying Qdrant payload.
Args:
user_id: User ID who owns the file
file_id: Numeric file ID
chunk_start: Character offset where chunk starts
chunk_end: Character offset where chunk ends
Returns:
File path string, or None if not found in Qdrant
"""
try:
from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
qdrant_client = await get_qdrant_client()
settings = get_settings()
# Query for the specific chunk
scroll_result = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_id", match=MatchValue(value=file_id)),
FieldCondition(key="doc_type", match=MatchValue(value="file")),
FieldCondition(
key="chunk_start_offset", match=MatchValue(value=chunk_start)
),
FieldCondition(
key="chunk_end_offset", match=MatchValue(value=chunk_end)
),
]
),
limit=1,
with_payload=["file_path"],
with_vectors=False,
)
if scroll_result[0]:
point = scroll_result[0][0]
file_path = point.payload.get("file_path")
if file_path:
logger.debug(f"Resolved file_id {file_id} to file_path {file_path}")
return str(file_path)
logger.warning(
f"Could not find file_path in Qdrant for file_id {file_id}, "
f"chunk [{chunk_start}:{chunk_end}]"
)
return None
except Exception as e:
logger.error(f"Error querying Qdrant for file_path: {e}", exc_info=True)
return None
@dataclass
class ChunkContext:
"""Expanded chunk with surrounding context and position markers.
Attributes:
chunk_text: The matched chunk text
before_context: Text before the chunk (up to context_chars)
after_context: Text after the chunk (up to context_chars)
chunk_start_offset: Character position where chunk starts in document
chunk_end_offset: Character position where chunk ends in document
page_number: Page number for PDFs (None for other doc types)
chunk_index: Zero-based chunk index (N in "chunk N of M")
total_chunks: Total number of chunks in document
marked_text: Full text with position markers around the chunk
has_before_truncation: True if before_context was truncated
has_after_truncation: True if after_context was truncated
"""
chunk_text: str
before_context: str
after_context: str
chunk_start_offset: int
chunk_end_offset: int
page_number: int | None
chunk_index: int
total_chunks: int
marked_text: str
has_before_truncation: bool
has_after_truncation: bool
async def get_chunk_with_context(
nc_client: NextcloudClient,
user_id: str,
doc_id: str | int,
doc_type: str,
chunk_start: int,
chunk_end: int,
page_number: int | None = None,
chunk_index: int = 0,
total_chunks: int = 1,
context_chars: int = 300,
) -> ChunkContext | None:
"""Fetch chunk with surrounding context.
First tries to retrieve the chunk from Qdrant (fast, cached). If that fails
(e.g., legacy data with truncated excerpts), falls back to fetching and
parsing the full document (slower, for PDFs especially).
Args:
nc_client: Authenticated Nextcloud client
user_id: User ID who owns the document
doc_id: Document ID (int for notes/files)
doc_type: Type of document ("note", "file", etc.)
chunk_start: Character offset where chunk starts
chunk_end: Character offset where chunk ends
page_number: Optional page number for PDFs
chunk_index: Zero-based chunk index in document
total_chunks: Total number of chunks in document
context_chars: Number of characters to include before/after chunk
Returns:
ChunkContext with expanded context and markers, or None if document
cannot be retrieved
"""
# Convert doc_id to int for Qdrant query
doc_id_int = (
int(doc_id)
if isinstance(doc_id, str) and doc_id.isdigit()
else (doc_id if isinstance(doc_id, int) else None)
)
# Try to get chunk from Qdrant first (fast path)
if doc_id_int is not None:
chunk_text = await _get_chunk_from_qdrant(
user_id, doc_id_int, doc_type, chunk_start, chunk_end
)
if chunk_text:
logger.info(
f"Retrieved chunk from Qdrant cache for {doc_type} {doc_id} "
f"(avoids document re-fetch/re-parse)"
)
# Fetch adjacent chunks for context expansion
# Get chunk overlap from config to remove duplicate text
from nextcloud_mcp_server.config import get_settings
settings = get_settings()
chunk_overlap = settings.document_chunk_overlap
before_context = ""
after_context = ""
has_before_truncation = False
has_after_truncation = False
# Fetch previous chunk if not first chunk
if chunk_index > 0:
before_chunk = await _get_chunk_by_index_from_qdrant(
user_id, doc_id_int, doc_type, chunk_index - 1
)
if before_chunk:
# Remove overlap: the last chunk_overlap chars of previous chunk
# overlap with the first chunk_overlap chars of current chunk
before_context = (
before_chunk[:-chunk_overlap]
if len(before_chunk) > chunk_overlap
else ""
)
# Truncate if requested context_chars < remaining length
if before_context and len(before_context) > context_chars:
before_context = before_context[-context_chars:]
has_before_truncation = True
else:
# Could not fetch previous chunk, but we're not at start
has_before_truncation = True
# Fetch next chunk if not last chunk
if chunk_index < total_chunks - 1:
after_chunk = await _get_chunk_by_index_from_qdrant(
user_id, doc_id_int, doc_type, chunk_index + 1
)
if after_chunk:
# Remove overlap: the first chunk_overlap chars of next chunk
# overlap with the last chunk_overlap chars of current chunk
after_context = (
after_chunk[chunk_overlap:]
if len(after_chunk) > chunk_overlap
else ""
)
# Truncate if requested context_chars < remaining length
if after_context and len(after_context) > context_chars:
after_context = after_context[:context_chars]
has_after_truncation = True
else:
# Could not fetch next chunk, but we're not at end
has_after_truncation = True
marked_text = _insert_position_markers(
before_context=before_context,
chunk_text=chunk_text,
after_context=after_context,
page_number=page_number,
chunk_index=chunk_index,
total_chunks=total_chunks,
has_before_truncation=has_before_truncation,
has_after_truncation=has_after_truncation,
)
return ChunkContext(
chunk_text=chunk_text,
before_context=before_context,
after_context=after_context,
chunk_start_offset=chunk_start,
chunk_end_offset=chunk_end,
page_number=page_number,
chunk_index=chunk_index,
total_chunks=total_chunks,
marked_text=marked_text,
has_before_truncation=has_before_truncation,
has_after_truncation=has_after_truncation,
)
# Fallback: Fetch full document and extract chunk with context
# This path is taken for:
# 1. Legacy data with truncated excerpts in Qdrant
# 2. Failed Qdrant queries
logger.info(
f"Falling back to document fetch for {doc_type} {doc_id} "
f"(Qdrant cache miss, possibly legacy data)"
)
# For files, retrieve file_path from Qdrant payload
resolved_doc_id = doc_id
if doc_type == "file" and isinstance(doc_id, int):
file_path = await _get_file_path_from_qdrant(
user_id, doc_id, chunk_start, chunk_end
)
if not file_path:
logger.warning(
f"Could not resolve file_id {doc_id} to file_path from Qdrant"
)
return None
resolved_doc_id = file_path
logger.debug(f"Resolved file_id {doc_id} to file_path {file_path}")
# Fetch full document text
full_text = await _fetch_document_text(nc_client, resolved_doc_id, doc_type)
if full_text is None:
logger.warning(
f"Could not fetch document text for {doc_type} {doc_id}, "
"skipping context expansion"
)
return None
# Validate offsets
if chunk_start < 0 or chunk_end > len(full_text) or chunk_start >= chunk_end:
logger.warning(
f"Invalid chunk offsets for {doc_type} {doc_id}: "
f"start={chunk_start}, end={chunk_end}, doc_len={len(full_text)}"
)
return None
# Extract chunk text
chunk_text = full_text[chunk_start:chunk_end]
# Calculate context boundaries
context_start = max(0, chunk_start - context_chars)
context_end = min(len(full_text), chunk_end + context_chars)
# Extract context
before_context = full_text[context_start:chunk_start]
after_context = full_text[chunk_end:context_end]
# Check for truncation
has_before_truncation = context_start > 0
has_after_truncation = context_end < len(full_text)
# Create marked text with position markers
marked_text = _insert_position_markers(
before_context=before_context,
chunk_text=chunk_text,
after_context=after_context,
page_number=page_number,
chunk_index=chunk_index,
total_chunks=total_chunks,
has_before_truncation=has_before_truncation,
has_after_truncation=has_after_truncation,
)
return ChunkContext(
chunk_text=chunk_text,
before_context=before_context,
after_context=after_context,
chunk_start_offset=chunk_start,
chunk_end_offset=chunk_end,
page_number=page_number,
chunk_index=chunk_index,
total_chunks=total_chunks,
marked_text=marked_text,
has_before_truncation=has_before_truncation,
has_after_truncation=has_after_truncation,
)
async def _fetch_document_text(
nc_client: NextcloudClient, doc_id: str | int, doc_type: str
) -> str | None:
"""Fetch full text content of a document.
Args:
nc_client: Authenticated Nextcloud client
doc_id: Document ID (note ID or file path)
doc_type: Type of document ("note", "file", etc.)
Returns:
Full document text, or None if document cannot be retrieved
"""
try:
if doc_type == "note":
# Fetch note by ID
note = await nc_client.notes.get_note(note_id=int(doc_id))
# Reconstruct full content as indexed: title + "\n\n" + content
# This ensures chunk offsets align with indexed content structure
title = note.get("title", "")
content = note.get("content", "")
return f"{title}\n\n{content}"
elif doc_type == "file":
# Fetch file content via WebDAV
try:
file_path = str(doc_id)
file_content, content_type = await nc_client.webdav.read_file(file_path)
# Check if it's a PDF (by content type or file extension)
is_pdf = (
content_type and "pdf" in content_type.lower()
) or file_path.lower().endswith(".pdf")
if is_pdf:
# Extract text from PDF using PyMuPDF
# IMPORTANT: Use pymupdf4llm.to_markdown() to match indexing extraction
# This ensures character offsets align between indexed chunks and retrieval
import pymupdf
import pymupdf4llm
logger.debug(f"Extracting text from PDF: {file_path}")
pdf_doc = pymupdf.open(stream=file_content, filetype="pdf")
text_parts = []
# Extract each page as markdown (same as indexing)
for page_num in range(pdf_doc.page_count):
page_md = pymupdf4llm.to_markdown(
pdf_doc,
pages=[page_num],
write_images=False, # Don't need images for context
page_chunks=False,
)
text_parts.append(page_md)
pdf_doc.close()
# Join pages (no separator - matches indexing)
full_text = "".join(text_parts)
logger.debug(
f"Extracted {len(full_text)} characters from "
f"{pdf_doc.page_count} pages in {file_path}"
)
return full_text
else:
# Assume it's a text file, decode to string
logger.debug(f"Decoding text file: {file_path}")
return file_content.decode("utf-8", errors="replace")
except Exception as e:
logger.error(
f"Error fetching file content for {doc_id}: {e}", exc_info=True
)
return None
else:
logger.warning(f"Unsupported doc_type for context expansion: {doc_type}")
return None
except Exception as e:
logger.error(f"Error fetching document {doc_type} {doc_id}: {e}", exc_info=True)
return None
def _insert_position_markers(
before_context: str,
chunk_text: str,
after_context: str,
page_number: int | None,
chunk_index: int,
total_chunks: int,
has_before_truncation: bool,
has_after_truncation: bool,
) -> str:
"""Insert position markers around matched chunk.
Creates markdown-formatted text with visual markers indicating chunk
boundaries and metadata.
Args:
before_context: Text before chunk
chunk_text: The matched chunk
after_context: Text after chunk
page_number: Optional page number
chunk_index: Zero-based chunk index
total_chunks: Total chunks in document
has_before_truncation: Whether before_context is truncated
has_after_truncation: Whether after_context is truncated
Returns:
Formatted text with position markers
"""
# Build position metadata
position_parts = []
if page_number is not None:
position_parts.append(f"Page {page_number}")
position_parts.append(f"Chunk {chunk_index + 1} of {total_chunks}")
position_metadata = ", ".join(position_parts)
# Build marked text
parts = []
# Add truncation indicator for before context
if has_before_truncation:
parts.append("**[...]**\n\n")
# Add before context if present
if before_context:
parts.append(before_context)
# Add chunk start marker
parts.append(f"\n\n🔍 **MATCHED CHUNK START** ({position_metadata})\n\n")
# Add chunk text
parts.append(chunk_text)
# Add chunk end marker
parts.append("\n\n🔍 **MATCHED CHUNK END**\n\n")
# Add after context if present
if after_context:
parts.append(after_context)
# Add truncation indicator for after context
if has_after_truncation:
parts.append("\n\n**[...]**")
return "".join(parts)
@@ -0,0 +1,907 @@
"""PDF chunk highlighting utilities for vector visualization.
This module provides utilities to generate highlighted page images showing
matched chunks and their context from semantic search results.
The highlighting uses character offsets to precisely locate chunks within
PDF documents, ensuring accurate highlighting even when text formatting
varies between indexing and rendering.
"""
import logging
import re
from typing import Optional
import pymupdf
import pymupdf4llm
logger = logging.getLogger(__name__)
class PDFHighlighter:
"""Generate highlighted page images from PDF chunks."""
# Color definitions (RGB, 0-1 range)
COLORS = {
"yellow": [1, 1, 0],
"red": [1, 0, 0],
"green": [0, 1, 0],
"blue": [0, 0, 1],
"orange": [1, 0.5, 0],
"pink": [1, 0, 1],
"gray": [0.7, 0.7, 0.7],
"light_blue": [0.7, 0.9, 1.0],
"light_green": [0.7, 1.0, 0.7],
}
@staticmethod
def strip_markdown(text: str) -> str:
"""Remove markdown formatting to improve search accuracy.
Args:
text: Text with potential markdown formatting
Returns:
Plain text with markdown removed
"""
# Remove bold/italic markers
text = re.sub(r"\*\*(.+?)\*\*", r"\1", text)
text = re.sub(r"\*(.+?)\*", r"\1", text)
text = re.sub(r"__(.+?)__", r"\1", text)
text = re.sub(r"_(.+?)_", r"\1", text)
# Remove headers
text = re.sub(r"^#+\s+", "", text, flags=re.MULTILINE)
# Remove inline code
text = re.sub(r"`(.+?)`", r"\1", text)
return text.strip()
@staticmethod
def extract_pdf_text_with_boundaries(
pdf_doc: pymupdf.Document,
) -> tuple[str, list[dict]]:
"""Extract full document text with page boundary tracking.
Uses pymupdf4llm.to_markdown() for consistency with indexing.
IMPORTANT: Must use write_images=True to match PyMuPDFProcessor behavior!
Even though we don't need the images, we need the image references in the
markdown text to maintain consistent character offsets with indexing.
Args:
pdf_doc: Open PyMuPDF document
Returns:
Tuple of (full_text, page_boundaries) where page_boundaries is a list of:
{"page": 1, "start_offset": 0, "end_offset": 1234}
"""
import tempfile
from pathlib import Path
page_boundaries = []
text_parts = []
current_offset = 0
# Use temp directory for image output (images are discarded after extraction)
temp_dir = Path(tempfile.mkdtemp(prefix="pdf_highlight_"))
for page_idx in range(pdf_doc.page_count):
page_md = pymupdf4llm.to_markdown(
pdf_doc,
pages=[page_idx],
write_images=True, # Must match indexing! Otherwise offsets misalign
image_path=temp_dir,
page_chunks=False,
)
page_boundaries.append(
{
"page": page_idx + 1, # 1-indexed
"start_offset": current_offset,
"end_offset": current_offset + len(page_md),
}
)
text_parts.append(page_md)
current_offset += len(page_md)
full_text = "".join(text_parts)
# Clean up temp directory and extracted images
import shutil
try:
shutil.rmtree(temp_dir)
except Exception as e:
logger.warning(f"Failed to clean up temp directory {temp_dir}: {e}")
return full_text, page_boundaries
@staticmethod
def find_chunk_page(
chunk_start_offset: int,
chunk_end_offset: int,
page_boundaries: list[dict],
) -> Optional[dict]:
"""Find which page contains the most of a given chunk.
Args:
chunk_start_offset: Chunk start position in full document
chunk_end_offset: Chunk end position in full document
page_boundaries: Page boundary list from extract_pdf_text_with_boundaries()
Returns:
Dict with keys: page_num, overlap_chars, page_relative_start, page_relative_end
or None if chunk not found on any page
"""
chunk_pages = []
for boundary in page_boundaries:
page_start = boundary["start_offset"]
page_end = boundary["end_offset"]
# Check if chunk overlaps with this page
if chunk_start_offset < page_end and chunk_end_offset > page_start:
overlap_start = max(chunk_start_offset, page_start)
overlap_end = min(chunk_end_offset, page_end)
overlap_chars = overlap_end - overlap_start
chunk_pages.append(
{
"page_num": boundary["page"],
"overlap_chars": overlap_chars,
"page_relative_start": overlap_start - page_start,
"page_relative_end": overlap_end - page_start,
}
)
if not chunk_pages:
return None
# Return page with maximum overlap
return max(chunk_pages, key=lambda p: p["overlap_chars"])
@staticmethod
def highlight_chunk_by_word_positions(
page: pymupdf.Page,
chunk_text: str,
color: str = "yellow",
search_region: tuple[float, float, float, float] | None = None,
) -> int:
"""Highlight chunk using word-position matching.
This method matches words from the chunk to their positions on the PDF page,
avoiding text search mismatches between markdown-formatted text and raw PDF text.
Args:
page: PyMuPDF page object
chunk_text: Text to highlight (may contain markdown)
color: Color name from COLORS dict
search_region: Optional (x0, y0, x1, y1) bounding box to constrain search.
If provided, only words within this region are considered.
Returns:
Number of highlight rectangles added
"""
# Tokenize chunk into words (alphanumeric only, lowercase)
chunk_words = re.findall(
r"\w+", PDFHighlighter.strip_markdown(chunk_text).lower()
)
if not chunk_words:
logger.warning("No words found in chunk text")
return 0
# Get all words from page with positions
# Format: (x0, y0, x1, y1, "word", block_no, line_no, word_no)
try:
page_words = page.get_text("words")
except Exception as e:
logger.error(f"Failed to extract words from page: {e}")
return 0
if not page_words:
logger.warning("No words found on page")
return 0
# Filter words by search region if provided
if search_region:
rx0, ry0, rx1, ry1 = search_region
# Allow some tolerance (10 points) for words near region boundary
tolerance = 10
page_words = [
w
for w in page_words
if (
w[0] >= rx0 - tolerance
and w[2] <= rx1 + tolerance
and w[1] >= ry0 - tolerance
and w[3] <= ry1 + tolerance
)
]
logger.debug(
f"Filtered to {len(page_words)} words in region "
f"({rx0:.0f}, {ry0:.0f}, {rx1:.0f}, {ry1:.0f})"
)
if not page_words:
logger.warning("No words found in search region")
return 0
# Find matching word sequence - use FIRST match, not longest
# This ensures we highlight the actual chunk location, not similar text elsewhere
matches = []
# Build a simple word-to-positions index for the first few chunk words
# to find candidate starting positions
first_chunk_word = chunk_words[0] if chunk_words else ""
candidate_starts = []
for i, pw in enumerate(page_words):
page_word = pw[4].lower()
# Check if this could be the start of the chunk
if (
first_chunk_word == page_word
or first_chunk_word in page_word
or page_word in first_chunk_word
):
candidate_starts.append(i)
# Try each candidate start position and take the FIRST good match
for start_pos in candidate_starts:
current_matches = []
chunk_idx = 0
skip_count = 0
max_skips = 3 # Allow some formatting differences
for page_idx in range(start_pos, len(page_words)):
if chunk_idx >= len(chunk_words):
break
page_word = page_words[page_idx][4].lower()
chunk_word = chunk_words[chunk_idx]
# Check for match (allow partial matches for flexibility)
if (
chunk_word == page_word
or chunk_word in page_word
or page_word in chunk_word
):
current_matches.append(page_words[page_idx])
chunk_idx += 1
skip_count = 0
elif skip_count < max_skips:
# Allow skipping some words (formatting, punctuation)
skip_count += 1
continue
else:
break
# Accept if we matched at least 50% of chunk words
if len(current_matches) >= len(chunk_words) * 0.5:
matches = current_matches
logger.debug(
f"Found match at position {start_pos}: "
f"{len(matches)}/{len(chunk_words)} words"
)
break # Take FIRST match, not best/longest
if not matches:
logger.debug(f"No word matches found (chunk has {len(chunk_words)} words)")
return 0
logger.debug(
f"Matched {len(matches)} words out of {len(chunk_words)} chunk words"
)
# Build rectangles from matched words
rects = [pymupdf.Rect(w[0], w[1], w[2], w[3]) for w in matches]
# Check if matches are contiguous (not scattered across the page)
# Scattered matches indicate false positives from common words
if len(rects) > 1:
# Sort by vertical position then horizontal
sorted_matches = sorted(matches, key=lambda w: (round(w[1]), w[0]))
# Check for large vertical gaps (more than ~2 lines apart)
# A typical line height is 12-20 points
max_line_gap = 50 # Points - allows for ~2-3 lines gap
prev_y = sorted_matches[0][1]
large_gaps = 0
for match in sorted_matches[1:]:
y_gap = match[1] - prev_y
if y_gap > max_line_gap:
large_gaps += 1
prev_y = match[1]
# If matches are scattered (many large gaps), reject this match
# A chunk should be mostly contiguous text
if large_gaps > len(matches) * 0.3: # More than 30% have gaps
logger.debug(
f"Rejecting scattered matches: {large_gaps} large gaps "
f"out of {len(matches)} matches"
)
return 0
# Merge adjacent rectangles on the same line for cleaner highlighting
merged_rects = []
sorted_rects = sorted(rects, key=lambda r: (round(r.y0), r.x0))
current_rect = None
for rect in sorted_rects:
if current_rect is None:
current_rect = rect
elif abs(rect.y0 - current_rect.y0) < 5: # Same line (within 5 points)
current_rect = current_rect | rect # Union
else:
merged_rects.append(current_rect)
current_rect = rect
if current_rect:
merged_rects.append(current_rect)
# Add highlights
rgb = PDFHighlighter.COLORS.get(color, PDFHighlighter.COLORS["yellow"])
for rect in merged_rects:
highlight = page.add_highlight_annot(rect)
highlight.set_colors({"stroke": rgb})
highlight.set_info(
content="Chunk from semantic search",
title="PDF Highlighter (word-position)",
)
highlight.update()
return len(merged_rects)
@staticmethod
def find_unique_phrase(
text: str, min_len: int = 30, max_len: int = 80
) -> str | None:
"""Find a relatively unique phrase from text for location search.
Looks for phrases that are likely to be unique on the page:
- Prefers phrases with numbers or special terms
- Avoids very common words
Args:
text: Source text to extract phrase from
min_len: Minimum phrase length
max_len: Maximum phrase length
Returns:
A phrase likely to be unique, or None if not found
"""
clean_text = PDFHighlighter.strip_markdown(text).strip()
if not clean_text:
return None
# Try first sentence (often unique due to context)
sentences = re.split(r"[.!?]\s+", clean_text)
for sentence in sentences:
sentence = sentence.strip()
if min_len <= len(sentence) <= max_len:
return sentence
elif len(sentence) > max_len:
return sentence[:max_len]
# Fallback: first N chars
if len(clean_text) >= min_len:
return clean_text[:max_len]
return clean_text if clean_text else None
@staticmethod
def _find_chunk_bbox(
page: pymupdf.Page,
chunk_text: str,
page_relative_start: int,
page_relative_end: int,
page_text_length: int,
) -> tuple[float, float, float, float] | None:
"""Find bounding box for a chunk without modifying the page.
Returns (x0, y0, x1, y1) in page coordinates, or None if not found.
"""
page_rect = page.rect
# Strip markdown for searching
search_text = PDFHighlighter.strip_markdown(chunk_text)
# Try to find chunk location using text search
anchor_rect = None
search_phrases = []
# Build search phrases from chunk text
sentences = re.split(r"[.!?]\s+", search_text)
for sentence in sentences[:3]:
sentence = sentence.strip()
if len(sentence) >= 20:
search_phrases.append(sentence[:80])
if len(sentence) >= 40:
search_phrases.append(sentence[:40])
# Also try first N characters
if len(search_text) >= 30:
search_phrases.append(search_text[:60])
search_phrases.append(search_text[:30])
for phrase in search_phrases:
if not phrase:
continue
rects = page.search_for(phrase.strip())
if rects:
anchor_rect = rects[0]
break
if not anchor_rect:
return None
# Calculate chunk height based on character count
chunk_chars = len(search_text)
estimated_lines = max(1, chunk_chars / 60)
estimated_height = estimated_lines * 14
# Build bounding box
return (
page_rect.x0 + 30, # Left margin
anchor_rect.y0 - 5, # Start slightly above anchor
page_rect.x1 - 30, # Right margin
min(anchor_rect.y0 + estimated_height + 10, page_rect.y1 - 30),
)
@staticmethod
def highlight_chunk_on_page(
page: pymupdf.Page,
chunk_text: str,
color: str = "yellow",
page_relative_start: int | None = None,
page_relative_end: int | None = None,
page_text_length: int | None = None,
) -> int:
"""Add bounding box highlight to a PDF page for the given chunk text.
Uses text search to find the chunk's location on the page, then draws
a bounding box around that region. Falls back to character offset estimation
if text search fails.
Args:
page: PyMuPDF page object
chunk_text: Text to highlight (may contain markdown)
color: Color name from COLORS dict
page_relative_start: Character offset where chunk starts on page (optional)
page_relative_end: Character offset where chunk ends on page (optional)
page_text_length: Total character length of page text (optional)
Returns:
Number of highlights added (1 for bounding box, 0 if failed)
"""
page_rect = page.rect
rgb = PDFHighlighter.COLORS.get(color, PDFHighlighter.COLORS["yellow"])
# Strip markdown for searching
search_text = PDFHighlighter.strip_markdown(chunk_text)
# Try to find chunk location using text search
# Search for progressively shorter phrases until we find a match
anchor_rect = None
search_phrases = []
# Build search phrases from chunk text
sentences = re.split(r"[.!?]\s+", search_text)
for sentence in sentences[:3]: # Try first 3 sentences
sentence = sentence.strip()
if len(sentence) >= 20:
search_phrases.append(sentence[:80])
if len(sentence) >= 40:
search_phrases.append(sentence[:40])
# Also try first N characters
if len(search_text) >= 30:
search_phrases.append(search_text[:60])
search_phrases.append(search_text[:30])
for phrase in search_phrases:
if not phrase:
continue
rects = page.search_for(phrase.strip())
if rects:
anchor_rect = rects[0] # Use first match
logger.debug(f"Found chunk anchor using phrase: '{phrase[:30]}...'")
break
if not anchor_rect:
page_num = page.number + 1 if page.number is not None else "unknown"
logger.warning(f"Could not find chunk text on page {page_num}")
return 0
# Calculate chunk height based on character count
# Estimate ~15 chars per line, ~12pt line height
chunk_chars = len(search_text)
estimated_lines = max(1, chunk_chars / 60) # ~60 chars per line typical
estimated_height = estimated_lines * 14 # ~14pt per line
# Build bounding box starting from anchor
chunk_rect = pymupdf.Rect(
page_rect.x0 + 30, # Left margin
anchor_rect.y0 - 5, # Start slightly above anchor
page_rect.x1 - 30, # Right margin
min(
anchor_rect.y0 + estimated_height + 10, page_rect.y1 - 30
), # Estimated bottom
)
# Draw a visible rectangle around the chunk region
shape = page.new_shape()
shape.draw_rect(chunk_rect)
shape.finish(
color=rgb, # Border color
fill=None, # No fill (transparent)
width=2.5, # Border width
dashes="[4 2]", # Dashed line
)
shape.commit()
# Add semi-transparent fill for visibility
fill_shape = page.new_shape()
fill_shape.draw_rect(chunk_rect)
fill_shape.finish(
color=None, # No border
fill=[1, 1, 0.7], # Light yellow fill
fill_opacity=0.15, # Very transparent
)
fill_shape.commit()
logger.debug(
f"Added bounding box at y={chunk_rect.y0:.0f}-{chunk_rect.y1:.0f} "
f"(estimated {estimated_lines:.1f} lines)"
)
return 1
@staticmethod
def highlight_chunk(
pdf_bytes: bytes,
chunk_start_offset: int,
chunk_end_offset: int,
stored_page_number: Optional[int] = None,
color: str = "yellow",
zoom: float = 2.0,
) -> Optional[tuple[bytes, int, int]]:
"""Generate PNG image of PDF page with highlighted chunk.
This is the main entry point for highlighting. It:
1. Extracts document text with page boundaries
2. Finds which page contains the chunk
3. Extracts chunk text using character offsets
4. Highlights the chunk on the page
5. Renders page to PNG
Args:
pdf_bytes: PDF file bytes
chunk_start_offset: Chunk start position (document-level)
chunk_end_offset: Chunk end position (document-level)
stored_page_number: Page number from metadata (optional, for validation)
color: Highlight color name
zoom: Rendering zoom factor (2.0 = 144 DPI)
Returns:
Tuple of (png_bytes, page_number, highlight_count) or None if failed
"""
import tempfile
from pathlib import Path
temp_pdf_path = None
try:
# Write PDF to temp file with consistent name "pdf.pdf"
# This ensures image references match indexing (e.g., pdf-0001.png)
# Different temp filenames would cause different markdown text lengths!
temp_dir = Path(tempfile.mkdtemp(prefix="pdf_highlight_"))
temp_pdf_path = temp_dir / "pdf.pdf"
temp_pdf_path.write_bytes(pdf_bytes)
# Open PDF from temp file
doc = pymupdf.open(temp_pdf_path)
# Extract text with page boundaries
full_text, page_boundaries = (
PDFHighlighter.extract_pdf_text_with_boundaries(doc)
)
# Find which page contains the chunk
chunk_page_info = PDFHighlighter.find_chunk_page(
chunk_start_offset, chunk_end_offset, page_boundaries
)
if not chunk_page_info:
logger.error("Chunk not found on any page")
doc.close()
return None
page_num = chunk_page_info["page_num"]
# Log if page differs from stored metadata
if stored_page_number and stored_page_number != page_num:
logger.info(
f"Chunk primarily on page {page_num}, metadata says {stored_page_number}"
)
# Extract page text
page_boundary = page_boundaries[page_num - 1]
page_start = page_boundary["start_offset"]
page_end = page_boundary["end_offset"]
page_text = full_text[page_start:page_end]
# Extract chunk text using page-relative offsets
page_relative_start = chunk_page_info["page_relative_start"]
page_relative_end = chunk_page_info["page_relative_end"]
chunk_text = page_text[page_relative_start:page_relative_end]
# Calculate page text length for region estimation
page_text_length = page_end - page_start
logger.debug(
f"Extracted {len(chunk_text)} chars on page {page_num} "
f"(offsets {page_relative_start}-{page_relative_end} of {page_text_length})"
)
# Get page and add highlights
page = doc[page_num - 1]
highlight_count = PDFHighlighter.highlight_chunk_on_page(
page,
chunk_text,
color,
page_relative_start=page_relative_start,
page_relative_end=page_relative_end,
page_text_length=page_text_length,
)
if highlight_count == 0:
logger.warning("No highlights added")
doc.close()
return None
# Render page to PNG
mat = pymupdf.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat, alpha=False)
png_bytes = pix.tobytes("png")
doc.close()
logger.info(
f"Generated {len(png_bytes):,} byte image with {highlight_count} highlights"
)
return (png_bytes, page_num, highlight_count)
except Exception as e:
logger.error(f"Error highlighting chunk: {e}", exc_info=True)
return None
finally:
# Clean up temp directory and PDF file
if temp_pdf_path and temp_pdf_path.parent.exists():
try:
import shutil
shutil.rmtree(temp_pdf_path.parent)
except Exception as e:
logger.warning(
f"Failed to delete temp directory {temp_pdf_path.parent}: {e}"
)
@staticmethod
def highlight_chunks_batch(
pdf_bytes: bytes,
chunks: list[tuple[int, int, int, int | None, str]],
page_boundaries: list[dict],
full_text: str,
color: str = "yellow",
zoom: float = 2.0,
) -> dict[int, tuple[bytes, int, int]]:
"""Generate highlighted images for multiple chunks.
Opens PDF once for rendering, uses pre-computed page boundaries from the
document processor. This ensures consistent character offsets between
chunking and highlighting.
Args:
pdf_bytes: PDF file bytes
chunks: List of (chunk_index, start_offset, end_offset, stored_page_number, chunk_text)
The chunk_index is used as the key in the returned dict.
chunk_text is the actual text content of the chunk.
page_boundaries: Pre-computed page boundaries from document processor.
Each entry: {"page": 1, "start_offset": 0, "end_offset": 1234}
full_text: Full document text for extracting page-relative portions.
color: Highlight color name
zoom: Rendering zoom factor (2.0 = 144 DPI)
Returns:
Dict mapping chunk_index to (png_bytes, page_number, highlight_count)
Chunks that fail to highlight are omitted from the result.
"""
import shutil
import tempfile
from collections import defaultdict
from pathlib import Path
results: dict[int, tuple[bytes, int, int]] = {}
if not chunks:
return results
temp_pdf_path = None
try:
# Write PDF to temp file
temp_dir = Path(tempfile.mkdtemp(prefix="pdf_highlight_batch_"))
temp_pdf_path = temp_dir / "pdf.pdf"
temp_pdf_path.write_bytes(pdf_bytes)
# Open PDF once (only for rendering, not text extraction)
doc = pymupdf.open(temp_pdf_path)
logger.debug(
f"Batch highlighting: {len(chunks)} chunks, "
f"{len(page_boundaries)} pages"
)
# Group chunks by their target page for efficient rendering
# We'll render each page only once with all its highlights
chunks_by_page: dict[int, list[tuple[int, dict, str]]] = defaultdict(list)
for chunk_tuple in chunks:
# Unpack chunk tuple - chunk_text is now passed directly
chunk_index, start_offset, end_offset, stored_page_num, chunk_text = (
chunk_tuple
)
# Find which page contains this chunk
chunk_page_info = PDFHighlighter.find_chunk_page(
start_offset, end_offset, page_boundaries
)
if not chunk_page_info:
logger.warning(f"Chunk {chunk_index}: not found on any page")
continue
page_num = chunk_page_info["page_num"]
# Log if page differs from stored metadata
if stored_page_num and stored_page_num != page_num:
logger.debug(
f"Chunk {chunk_index}: found on page {page_num}, "
f"metadata says {stored_page_num}"
)
# Extract page-relative portion of chunk text
# This is critical for cross-page chunks where the start
# of the chunk might be on a different page
page_boundary = page_boundaries[page_num - 1]
page_start = page_boundary["start_offset"]
page_end = page_boundary["end_offset"]
page_text_length = page_end - page_start
# Calculate what portion of the chunk appears on this page
chunk_start_on_page = max(start_offset, page_start)
chunk_end_on_page = min(end_offset, page_end)
# Extract just the text that appears on this page
page_relative_text = full_text[chunk_start_on_page:chunk_end_on_page]
chunks_by_page[page_num].append(
(chunk_index, chunk_page_info, page_relative_text, page_text_length)
)
logger.debug(
f"Chunks distributed across {len(chunks_by_page)} unique pages"
)
# OPTIMIZATION: Render each page ONCE, then draw highlights using PIL
# This avoids expensive page.get_pixmap() calls per chunk
from io import BytesIO
from PIL import Image, ImageDraw
# PIL color for bounding box (RGB tuple)
rgb = PDFHighlighter.COLORS.get(color, PDFHighlighter.COLORS["yellow"])
pil_color = tuple(int(c * 255) for c in rgb)
fill_color = (255, 255, 178, 38) # Light yellow with alpha
for page_num, page_chunks in chunks_by_page.items():
page = doc[page_num - 1]
# Render page ONCE to get base image (most expensive operation)
mat = pymupdf.Matrix(zoom, zoom)
base_pix = page.get_pixmap(matrix=mat, alpha=False)
base_png = base_pix.tobytes("png")
# Convert to PIL Image for fast highlight drawing
base_image = Image.open(BytesIO(base_png)).convert("RGBA")
page_rect = page.rect
logger.debug(
f"Page {page_num}: rendered once, processing {len(page_chunks)} chunks"
)
for (
chunk_index,
chunk_page_info,
chunk_text,
page_text_length,
) in page_chunks:
try:
# Find chunk bounding box using text search
bbox = PDFHighlighter._find_chunk_bbox(
page,
chunk_text,
chunk_page_info["page_relative_start"],
chunk_page_info["page_relative_end"],
page_text_length,
)
if bbox is None:
logger.warning(f"Chunk {chunk_index}: could not find bbox")
continue
# Copy base image for this chunk
chunk_image = base_image.copy()
# Scale bbox coordinates to pixmap coordinates
scale_x = base_pix.width / page_rect.width
scale_y = base_pix.height / page_rect.height
pil_bbox = (
int(bbox[0] * scale_x),
int(bbox[1] * scale_y),
int(bbox[2] * scale_x),
int(bbox[3] * scale_y),
)
# Create transparent overlay for fill (proper alpha blending)
overlay = Image.new("RGBA", chunk_image.size, (0, 0, 0, 0))
overlay_draw = ImageDraw.Draw(overlay)
overlay_draw.rectangle(pil_bbox, fill=fill_color)
# Alpha composite the overlay onto the chunk image
chunk_image = Image.alpha_composite(chunk_image, overlay)
# Draw border on top (solid, not transparent)
border_draw = ImageDraw.Draw(chunk_image)
border_draw.rectangle(pil_bbox, outline=pil_color, width=3)
# Convert back to PNG bytes
output = BytesIO()
chunk_image.convert("RGB").save(output, format="PNG")
png_bytes = output.getvalue()
results[chunk_index] = (png_bytes, page_num, 1)
logger.debug(
f"Chunk {chunk_index}: {len(png_bytes):,} bytes, "
f"page {page_num}, bbox {pil_bbox}"
)
except Exception as e:
logger.error(f"Chunk {chunk_index}: error - {e}")
continue
doc.close()
logger.info(
f"Batch highlighted {len(results)}/{len(chunks)} chunks successfully"
)
return results
except Exception as e:
logger.error(f"Error in batch highlighting: {e}", exc_info=True)
return results
finally:
# Clean up temp directory
if temp_pdf_path and temp_pdf_path.parent.exists():
try:
shutil.rmtree(temp_pdf_path.parent)
except Exception as e:
logger.warning(f"Failed to clean up temp dir: {e}")
+23 -8
View File
@@ -9,6 +9,7 @@ from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_embedding_service
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -50,6 +51,9 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
Returns unverified results from Qdrant. Access verification should be
performed separately at the final output stage using verify_search_results().
Deduplicates by (doc_id, doc_type, chunk_start_offset, chunk_end_offset)
to show multiple chunks from the same document while avoiding duplicate chunks.
Args:
query: Natural language search query
user_id: User ID for filtering
@@ -74,16 +78,19 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
# Generate embedding for query
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
# Store for reuse by callers (e.g., viz_routes PCA visualization)
self.query_embedding = query_embedding
logger.debug(
f"Generated embedding for query (dimension={len(query_embedding)})"
)
# Build Qdrant filter
filter_conditions = [
get_placeholder_filter(), # Always exclude placeholders from user-facing queries
FieldCondition(
key="user_id",
match=MatchValue(value=user_id),
)
),
]
# Add doc_type filter if specified
@@ -123,20 +130,24 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
top_scores = [p.score for p in search_response.points[:3]]
logger.debug(f"Top 3 similarity scores: {top_scores}")
# Deduplicate by (doc_id, doc_type) - multiple chunks per document
seen_docs = set()
# Deduplicate by (doc_id, doc_type, chunk_start, chunk_end)
# This allows multiple chunks from same doc, but removes duplicate chunks
seen_chunks = set()
results = []
for result in search_response.points:
doc_id = int(result.payload["doc_id"])
# doc_id can be int (notes) or str (files - file paths)
doc_id = result.payload["doc_id"]
doc_type = result.payload.get("doc_type", "note")
doc_key = (doc_id, doc_type)
chunk_start = result.payload.get("chunk_start_offset")
chunk_end = result.payload.get("chunk_end_offset")
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
# Skip if we've already seen this document
if doc_key in seen_docs:
# Skip if we've already seen this exact chunk
if chunk_key in seen_chunks:
continue
seen_docs.add(doc_key)
seen_chunks.add(chunk_key)
# Return unverified results (verification happens at output stage)
results.append(
@@ -152,6 +163,10 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
},
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval
)
)
+111 -33
View File
@@ -26,6 +26,7 @@ from nextcloud_mcp_server.observability.metrics import (
instrument_tool,
)
from nextcloud_mcp_server.search.bm25_hybrid import BM25HybridSearchAlgorithm
from nextcloud_mcp_server.search.context import get_chunk_with_context
logger = logging.getLogger(__name__)
@@ -43,6 +44,8 @@ def configure_semantic_tools(mcp: FastMCP):
doc_types: list[str] | None = None,
score_threshold: float = 0.0,
fusion: str = "rrf",
include_context: bool = False,
context_chars: int = 300,
) -> SemanticSearchResponse:
"""
Search Nextcloud content using BM25 hybrid search with cross-app support.
@@ -66,6 +69,8 @@ def configure_semantic_tools(mcp: FastMCP):
fusion: Fusion algorithm: "rrf" (Reciprocal Rank Fusion, default) or "dbsf" (Distribution-Based Score Fusion)
RRF: Good general-purpose fusion using reciprocal ranks
DBSF: Uses distribution-based normalization, may better balance different score ranges
include_context: Whether to expand results with surrounding context (default: False)
context_chars: Number of characters to include before/after matched chunk (default: 300)
Returns:
SemanticSearchResponse with matching documents ranked by fusion scores
@@ -128,18 +133,16 @@ def configure_semantic_tools(mcp: FastMCP):
# Sort combined results by score
all_results.sort(key=lambda r: r.score, reverse=True)
# Deduplicate results (hybrid search may return same doc from dense + sparse)
# Qdrant already filters by user_id for multi-tenant isolation
# Sampling tool will verify access when fetching full content
seen = set()
unique_results = []
for result in all_results:
key = (result.id, result.doc_type)
if key not in seen:
seen.add(key)
unique_results.append(result)
search_results = unique_results[:limit] # Final limit after deduplication
# Note: BM25HybridSearchAlgorithm already deduplicates at chunk level
# (doc_id, doc_type, chunk_start, chunk_end), which allows multiple
# chunks from the same document while preventing duplicate chunks.
# No additional deduplication needed here - multiple chunks per document
# are valuable for RAG contexts.
# Qdrant already filters by user_id for multi-tenant isolation.
# Sampling tool will verify access when fetching full content.
search_results = all_results[
:limit
] # Final limit after chunk-level dedup in algorithm
# Convert SearchResult objects to SemanticSearchResult for response
results = []
@@ -160,9 +163,99 @@ def configure_semantic_tools(mcp: FastMCP):
else 1,
chunk_start_offset=r.chunk_start_offset,
chunk_end_offset=r.chunk_end_offset,
page_number=r.page_number,
)
)
# Expand results with surrounding context if requested
if include_context and results:
logger.info(
f"Expanding {len(results)} results with context "
f"(context_chars={context_chars})"
)
# Fetch context for all results in parallel
# Limit concurrent requests to prevent connection pool exhaustion
max_concurrent = 20
semaphore = anyio.Semaphore(max_concurrent)
expanded_results = [None] * len(results)
async def fetch_context(index: int, result: SemanticSearchResult):
"""Fetch context for a single result (parallel with semaphore)."""
async with semaphore:
# Only expand if we have valid chunk offsets
if (
result.chunk_start_offset is None
or result.chunk_end_offset is None
):
# Keep result as-is without context expansion
expanded_results[index] = result
return
try:
chunk_context = await get_chunk_with_context(
nc_client=client,
user_id=username,
doc_id=result.id,
doc_type=result.doc_type,
chunk_start=result.chunk_start_offset,
chunk_end=result.chunk_end_offset,
page_number=result.page_number,
chunk_index=result.chunk_index,
total_chunks=result.total_chunks,
context_chars=context_chars,
)
if chunk_context:
# Create new result with context fields populated
expanded_results[index] = SemanticSearchResult(
id=result.id,
doc_type=result.doc_type,
title=result.title,
category=result.category,
excerpt=result.excerpt,
score=result.score,
chunk_index=result.chunk_index,
total_chunks=result.total_chunks,
chunk_start_offset=result.chunk_start_offset,
chunk_end_offset=result.chunk_end_offset,
page_number=result.page_number,
# Context expansion fields
has_context_expansion=True,
marked_text=chunk_context.marked_text,
before_context=chunk_context.before_context,
after_context=chunk_context.after_context,
has_before_truncation=chunk_context.has_before_truncation,
has_after_truncation=chunk_context.has_after_truncation,
)
logger.debug(
f"Expanded context for {result.doc_type} {result.id}"
)
else:
# Context expansion failed, keep original result
expanded_results[index] = result
logger.debug(
f"Failed to expand context for {result.doc_type} {result.id}, "
"keeping original result"
)
except Exception as e:
# Context expansion failed, keep original result
expanded_results[index] = result
logger.warning(
f"Error expanding context for {result.doc_type} {result.id}: {e}"
)
# Run all context fetches in parallel using anyio task group
async with anyio.create_task_group() as tg:
for idx, result in enumerate(results):
tg.start_soon(fetch_context, idx, result)
# Replace results with expanded versions
results = [r for r in expanded_results if r is not None]
logger.info(
f"Context expansion completed: {len(results)} results with context"
)
logger.info(f"Returning {len(results)} results from BM25 hybrid search")
return SemanticSearchResponse(
@@ -202,6 +295,8 @@ def configure_semantic_tools(mcp: FastMCP):
score_threshold: float = 0.7,
max_answer_tokens: int = 500,
fusion: str = "rrf",
include_context: bool = False,
context_chars: int = 300,
) -> SamplingSearchResponse:
"""
Semantic search with LLM-generated answer using MCP sampling.
@@ -227,6 +322,8 @@ def configure_semantic_tools(mcp: FastMCP):
score_threshold: Minimum similarity score 0-1 (default: 0.7)
max_answer_tokens: Maximum tokens for generated answer (default: 500)
fusion: Fusion algorithm: "rrf" (Reciprocal Rank Fusion, default) or "dbsf" (Distribution-Based Score Fusion)
include_context: Whether to expand results with surrounding context (default: False)
context_chars: Number of characters to include before/after matched chunk (default: 300)
Returns:
SamplingSearchResponse containing:
@@ -238,27 +335,6 @@ def configure_semantic_tools(mcp: FastMCP):
Note: Requires MCP client to support sampling. If sampling is unavailable,
the tool gracefully degrades to returning documents with an explanation.
The client may prompt the user to approve the sampling request.
Examples:
>>> # Query about objectives across multiple apps
>>> result = await nc_semantic_search_answer(
... query="What are my Q1 2025 project goals?",
... ctx=ctx
... )
>>> print(result.generated_answer)
"Based on Document 1 (note: Project Kickoff), Document 2 (calendar event:
Q1 Planning Meeting), and Document 3 (deck card: Implement semantic search),
your main goals are: 1) Improve semantic search accuracy by 20%,
2) Deploy new embedding model, 3) Reduce indexing latency..."
>>> # Query about appointments
>>> result = await nc_semantic_search_answer(
... query="When is my next dentist appointment?",
... ctx=ctx,
... limit=10
... )
>>> len(result.sources) # Calendar events and related notes
3
"""
# 1. Retrieve relevant documents via existing semantic search
search_response = await nc_semantic_search(
@@ -267,6 +343,8 @@ def configure_semantic_tools(mcp: FastMCP):
limit=limit,
score_threshold=score_threshold,
fusion=fusion,
include_context=include_context,
context_chars=context_chars,
)
# 2. Handle no results case - don't waste a sampling call
-14
View File
@@ -64,20 +64,6 @@ def configure_webdav_tools(mcp: FastMCP):
- Text files are decoded to UTF-8
- Documents (PDF, DOCX, etc.) are parsed and text is extracted
- Other binary files are base64 encoded
Examples:
# Read a text file
result = await nc_webdav_read_file("Documents/readme.txt")
logger.info(result['content']) # Decoded text content
# Read a PDF document (automatically parsed)
result = await nc_webdav_read_file("Documents/report.pdf")
logger.info(result['content']) # Extracted text from PDF
logger.info(result['parsing_metadata']) # Document parsing info
# Read a binary file
result = await nc_webdav_read_file("Images/photo.jpg")
logger.info(result['encoding']) # 'base64'
"""
client = await get_client(ctx)
content, content_type = await client.webdav.read_file(path)
+60
View File
@@ -0,0 +1,60 @@
"""Smithery-specific entrypoint for stateless deployment.
ADR-016: This entrypoint is used when deploying on Smithery's hosting platform.
It configures the server for stateless operation with per-session authentication.
Features disabled in Smithery mode:
- Vector sync / semantic search (no persistent storage)
- Admin UI at /app (no webhooks, no vector viz)
- OAuth provisioning tools (no token storage)
Features enabled:
- Core Nextcloud tools (notes, calendar, contacts, files, deck, tables, cookbook)
- Per-session app password authentication via Smithery configSchema
- Health check endpoints (/health/live, /health/ready)
"""
import logging
import os
import uvicorn
from nextcloud_mcp_server.config import setup_logging
logger = logging.getLogger(__name__)
def main():
"""Start the MCP server in Smithery stateless mode."""
# Setup logging first
setup_logging()
# Force stateless mode environment variables
os.environ["SMITHERY_DEPLOYMENT"] = "true"
os.environ["VECTOR_SYNC_ENABLED"] = "false"
logger.info("Starting Nextcloud MCP Server in Smithery stateless mode")
# Import app after setting environment variables
from nextcloud_mcp_server.app import get_app
# Create the app with streamable-http transport (required for Smithery)
app = get_app(transport="streamable-http")
# Smithery sets PORT environment variable
port = int(os.environ.get("PORT", 8081))
logger.info(f"Listening on port {port}")
uvicorn.run(
app,
host="0.0.0.0",
port=port,
log_level="info",
# Disable access log for cleaner output
access_log=False,
)
if __name__ == "__main__":
main()
@@ -15,6 +15,8 @@ class ChunkWithPosition:
text: str
start_offset: int # Character position where chunk starts
end_offset: int # Character position where chunk ends (exclusive)
page_number: int | None = None # Page number for PDF chunks (optional)
metadata: dict | None = None # Additional processor-specific metadata (optional)
class DocumentChunker:
@@ -50,7 +52,7 @@ class DocumentChunker:
strip_whitespace=True,
)
def chunk_text(self, content: str) -> list[ChunkWithPosition]:
async def chunk_text(self, content: str) -> list[ChunkWithPosition]:
"""
Split text into overlapping chunks with position tracking.
@@ -66,12 +68,17 @@ class DocumentChunker:
Returns:
List of chunks with their character positions in the original content
"""
import anyio
# Handle empty content - return single empty chunk for backward compatibility
if not content:
return [ChunkWithPosition(text="", start_offset=0, end_offset=0)]
# Use LangChain to create documents with position tracking
docs = self.splitter.create_documents([content])
# Run CPU-bound text splitting in thread pool to avoid blocking event loop
docs = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
self.splitter.create_documents,
[content],
)
# Convert LangChain Documents to ChunkWithPosition objects
chunks = [
+306
View File
@@ -0,0 +1,306 @@
"""Placeholder point management for Qdrant state tracking.
Placeholders are zero-vector points stored in Qdrant to track document processing
state. They prevent duplicate work by marking documents as "in-flight" during the
gap between scanner queuing and processor completion.
Architecture:
- Scanner writes placeholders when queuing documents for processing
- Processor deletes placeholders and writes real vectors after processing
- All user-facing queries filter out placeholders (is_placeholder: False)
Placeholders contain:
- Zero vectors (dimension from embedding service)
- is_placeholder: True flag (for filtering)
- status: "pending", "processing", "completed", "failed"
- modified_at, etag from source document
- queued_at timestamp
"""
import logging
import time
import uuid
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_embedding_service
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
def _generate_placeholder_id(doc_type: str, doc_id: str | int) -> str:
"""Generate deterministic UUID for placeholder point.
Args:
doc_type: Document type (note, file, etc.)
doc_id: Document ID
Returns:
UUID string for point ID
"""
point_name = f"{doc_type}:{doc_id}:placeholder"
return str(uuid.uuid5(uuid.NAMESPACE_DNS, point_name))
async def write_placeholder_point(
doc_id: str | int,
doc_type: str,
user_id: str,
modified_at: int,
etag: str = "",
file_path: str | None = None,
) -> None:
"""Write a placeholder point to Qdrant to mark document as queued.
This should be called by the scanner BEFORE queuing a document for processing.
The placeholder prevents duplicate work if the scanner runs again before
processing completes.
Args:
doc_id: Document ID (int for notes/files)
doc_type: Document type (note, file, etc.)
user_id: User ID who owns the document
modified_at: Document modification timestamp
etag: Document ETag (if available)
file_path: File path (for files only)
Raises:
Exception: If Qdrant write fails
"""
try:
qdrant_client = await get_qdrant_client()
settings = get_settings()
embedding_service = get_embedding_service()
# Get dimension dynamically (never hardcode)
dimension = embedding_service.get_dimension()
# Create zero vectors
zero_dense = [0.0] * dimension
# Create empty sparse vector for placeholders
# Use models.SparseVector with empty indices/values
from qdrant_client import models
empty_sparse = models.SparseVector(indices=[], values=[])
# Generate deterministic point ID
point_id = _generate_placeholder_id(doc_type, doc_id)
# Build payload
payload = {
"user_id": user_id,
"doc_id": doc_id,
"doc_type": doc_type,
"is_placeholder": True,
"status": "pending",
"modified_at": modified_at,
"etag": etag,
"queued_at": int(time.time()),
}
# Add file_path for files
if doc_type == "file" and file_path:
payload["file_path"] = file_path
# Create placeholder point
point = PointStruct(
id=point_id,
vector={
"dense": zero_dense,
"sparse": empty_sparse, # Empty sparse vector for placeholders
},
payload=payload,
)
# Upsert to Qdrant
await qdrant_client.upsert(
collection_name=settings.get_collection_name(),
points=[point],
wait=True,
)
logger.debug(
f"Wrote placeholder for {doc_type}_{doc_id} (user={user_id}, "
f"modified_at={modified_at})"
)
except Exception as e:
logger.error(
f"Failed to write placeholder for {doc_type}_{doc_id}: {e}",
exc_info=True,
)
raise
async def query_document_metadata(
doc_id: str | int,
doc_type: str,
user_id: str,
) -> dict | None:
"""Query Qdrant for existing document entry (placeholder or real).
Returns the payload of the first matching point, which could be:
- A placeholder (is_placeholder: True)
- A real indexed document (is_placeholder: False or missing)
- None if document not in Qdrant
Args:
doc_id: Document ID
doc_type: Document type
user_id: User ID
Returns:
Payload dict if found, None otherwise
"""
try:
qdrant_client = await get_qdrant_client()
settings = get_settings()
# Query for any entry matching doc_id, doc_type, user_id
scroll_result = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
]
),
limit=1,
with_payload=True,
with_vectors=False,
)
if scroll_result[0]:
point = scroll_result[0][0]
return dict(point.payload)
return None
except Exception as e:
logger.warning(f"Error querying document metadata for {doc_type}_{doc_id}: {e}")
return None
async def delete_placeholder_point(
doc_id: str | int,
doc_type: str,
user_id: str,
) -> None:
"""Delete a placeholder point from Qdrant.
This should be called by the processor BEFORE writing real vectors.
We delete the placeholder to avoid duplicates, then write the real chunks.
Args:
doc_id: Document ID
doc_type: Document type
user_id: User ID
Raises:
Exception: If Qdrant delete fails
"""
try:
qdrant_client = await get_qdrant_client()
settings = get_settings()
# Delete by filter (in case there are multiple chunks from old indexing)
await qdrant_client.delete(
collection_name=settings.get_collection_name(),
points_selector=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
FieldCondition(key="is_placeholder", match=MatchValue(value=True)),
]
),
)
logger.debug(f"Deleted placeholder for {doc_type}_{doc_id} (user={user_id})")
except Exception as e:
logger.error(
f"Failed to delete placeholder for {doc_type}_{doc_id}: {e}",
exc_info=True,
)
raise
async def update_placeholder_status(
doc_id: str | int,
doc_type: str,
user_id: str,
status: str,
) -> None:
"""Update the status field of a placeholder point.
Status values:
- "pending": Queued for processing
- "processing": Currently being processed
- "completed": Processing completed successfully
- "failed": Processing failed
Args:
doc_id: Document ID
doc_type: Document type
user_id: User ID
status: New status value
Raises:
Exception: If Qdrant update fails
"""
try:
qdrant_client = await get_qdrant_client()
settings = get_settings()
# Update payload using set_payload
await qdrant_client.set_payload(
collection_name=settings.get_collection_name(),
payload={"status": status},
points=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_id", match=MatchValue(value=doc_id)),
FieldCondition(key="doc_type", match=MatchValue(value=doc_type)),
FieldCondition(key="is_placeholder", match=MatchValue(value=True)),
]
),
)
logger.debug(
f"Updated placeholder status for {doc_type}_{doc_id} to '{status}' "
f"(user={user_id})"
)
except Exception as e:
logger.warning(
f"Failed to update placeholder status for {doc_type}_{doc_id}: {e}"
)
# Don't raise - status updates are non-critical
def get_placeholder_filter() -> FieldCondition:
"""Get a filter condition to exclude placeholders from queries.
Add this to all user-facing search/visualization queries to ensure
placeholders are never returned to users.
Returns:
FieldCondition that filters out is_placeholder: True
Example:
Filter(
must=[
get_placeholder_filter(), # Exclude placeholders
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
]
)
"""
return FieldCondition(
key="is_placeholder",
match=MatchValue(value=False),
)
+356 -25
View File
@@ -23,12 +23,50 @@ from nextcloud_mcp_server.observability.metrics import (
)
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.vector.document_chunker import DocumentChunker
from nextcloud_mcp_server.vector.placeholder import delete_placeholder_point
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
from nextcloud_mcp_server.vector.scanner import DocumentTask
logger = logging.getLogger(__name__)
def assign_page_numbers(chunks, page_boundaries):
"""Assign page numbers to chunks based on page boundaries.
Each chunk gets the page number where most of its content appears.
For chunks spanning multiple pages, assigns the page containing the
majority of the chunk's characters.
Args:
chunks: List of ChunkWithPosition objects
page_boundaries: List of dicts with {page, start_offset, end_offset}
Returns:
None (modifies chunks in place)
"""
if not page_boundaries:
return
for chunk in chunks:
# Find which page(s) this chunk overlaps with
max_overlap = 0
assigned_page = None
for boundary in page_boundaries:
# Calculate overlap between chunk and page
overlap_start = max(chunk.start_offset, boundary["start_offset"])
overlap_end = min(chunk.end_offset, boundary["end_offset"])
overlap = max(0, overlap_end - overlap_start)
# Assign to page with maximum overlap
if overlap > max_overlap:
max_overlap = overlap
assigned_page = boundary["page"]
if assigned_page is not None:
chunk.page_number = assigned_page
async def processor_task(
worker_id: int,
receive_stream: MemoryObjectReceiveStream[DocumentTask],
@@ -218,31 +256,265 @@ async def _index_document(
settings = get_settings()
# Fetch document content
if doc_task.doc_type == "note":
document = await nc_client.notes.get_note(int(doc_task.doc_id))
content = f"{document['title']}\n\n{document['content']}"
title = document["title"]
etag = document.get("etag", "")
else:
raise ValueError(f"Unsupported doc_type: {doc_task.doc_type}")
with trace_operation(
"vector_sync.fetch_content",
attributes={
"vector_sync.doc_type": doc_task.doc_type,
"vector_sync.doc_id": doc_task.doc_id,
},
):
if doc_task.doc_type == "note":
document = await nc_client.notes.get_note(int(doc_task.doc_id))
content = f"{document['title']}\n\n{document['content']}"
title = document["title"]
etag = document.get("etag", "")
file_metadata = {} # No file-specific metadata for notes
file_path = None # Notes don't have file paths
content_bytes = None # Notes don't have binary content
content_type = None
elif doc_task.doc_type == "file":
# For files, doc_id is now the numeric file ID, file_path comes from DocumentTask
if not doc_task.file_path:
raise ValueError(
f"File path required for file indexing but not provided (file_id={doc_task.doc_id})"
)
file_path = doc_task.file_path
# Read file content via WebDAV
content_bytes, content_type = await nc_client.webdav.read_file(file_path)
else:
raise ValueError(f"Unsupported doc_type: {doc_task.doc_type}")
# Process file content (text extraction)
if doc_task.doc_type == "file":
# Type narrowing: content_bytes and content_type are set for files
assert content_bytes is not None
assert content_type is not None
assert file_path is not None
with trace_operation(
"vector_sync.document_process",
attributes={
"vector_sync.content_type": content_type,
"vector_sync.file_size": len(content_bytes),
},
):
# Use document processor registry to extract text
from nextcloud_mcp_server.document_processors import get_registry
registry = get_registry()
try:
result = await registry.process(
content=content_bytes,
content_type=content_type,
filename=file_path,
)
content = result.text
file_metadata = result.metadata
title = file_metadata.get("title") or file_path.split("/")[-1]
etag = "" # WebDAV read_file doesn't return etag
# Diagnostic: Log page boundary information if available
if "page_boundaries" in file_metadata:
page_boundaries = file_metadata["page_boundaries"]
logger.info(
f"Page boundaries for {file_path}: "
f"{len(page_boundaries)} pages, text length: {len(content)}"
)
# Log first 3 page boundaries for debugging
for boundary in page_boundaries[:3]:
logger.debug(
f" Page {boundary['page']}: "
f"offsets [{boundary['start_offset']}:{boundary['end_offset']}]"
)
# Verify last boundary matches text length
if page_boundaries:
last_boundary = page_boundaries[-1]
if last_boundary["end_offset"] != len(content):
logger.warning(
f"Text length mismatch: content={len(content)}, "
f"last_boundary_end={last_boundary['end_offset']}"
)
else:
logger.debug(f"No page_boundaries in metadata for {file_path}")
except Exception as e:
logger.error(f"Failed to process file {file_path}: {e}")
raise
# Tokenize and chunk (using configured chunk size and overlap)
chunker = DocumentChunker(
chunk_size=settings.document_chunk_size,
overlap=settings.document_chunk_overlap,
)
chunks = chunker.chunk_text(content)
with trace_operation(
"vector_sync.chunk_text",
attributes={
"vector_sync.input_chars": len(content),
"vector_sync.chunk_size": settings.document_chunk_size,
"vector_sync.overlap": settings.document_chunk_overlap,
},
):
chunker = DocumentChunker(
chunk_size=settings.document_chunk_size,
overlap=settings.document_chunk_overlap,
)
chunks = await chunker.chunk_text(content)
# Assign page numbers to chunks if page boundaries are available (PDFs)
if doc_task.doc_type == "file" and "page_boundaries" in file_metadata:
with trace_operation(
"vector_sync.assign_page_numbers",
attributes={
"vector_sync.chunk_count": len(chunks),
"vector_sync.page_count": len(file_metadata["page_boundaries"]),
},
):
assign_page_numbers(chunks, file_metadata["page_boundaries"])
# Diagnostic: Verify page number assignment
assigned_count = sum(1 for c in chunks if c.page_number is not None)
logger.info(
f"Assigned page numbers to {assigned_count}/{len(chunks)} chunks "
f"for {file_path}"
)
# Log first 3 chunks to see their page assignments
for i, chunk in enumerate(chunks[:3]):
logger.debug(
f" Chunk {i}: page={chunk.page_number}, "
f"offsets=[{chunk.start_offset}:{chunk.end_offset}]"
)
# Warning if NO page numbers were assigned
if assigned_count == 0:
logger.warning(
f"NO page numbers assigned! "
f"Text length: {len(content)}, "
f"Chunks: {len(chunks)}, "
f"Chunk offset range: [{chunks[0].start_offset}:{chunks[-1].end_offset}], "
f"Page boundaries: {len(file_metadata['page_boundaries'])} pages, "
f"First boundary: {file_metadata['page_boundaries'][0] if file_metadata['page_boundaries'] else 'None'}"
)
# Extract chunk texts for embedding
chunk_texts = [chunk.text for chunk in chunks]
# Generate dense embeddings (I/O bound - external API call)
embedding_service = get_embedding_service()
dense_embeddings = await embedding_service.embed_batch(chunk_texts)
# Initialize results containers
dense_embeddings: list = []
sparse_embeddings: list = []
chunk_images: dict[int, dict] = {}
# Generate sparse embeddings (BM25 for keyword matching)
bm25_service = get_bm25_service()
sparse_embeddings = bm25_service.encode_batch(chunk_texts)
# Determine if we need PDF highlighting
is_pdf = doc_task.doc_type == "file" and content_type == "application/pdf"
# Define async tasks for parallel execution
async def generate_dense_embeddings():
"""Generate dense embeddings (I/O bound - external API call)."""
nonlocal dense_embeddings
with trace_operation(
"vector_sync.embed_dense",
attributes={
"vector_sync.chunk_count": len(chunk_texts),
"vector_sync.total_chars": sum(len(t) for t in chunk_texts),
},
):
embedding_service = get_embedding_service()
dense_embeddings = await embedding_service.embed_batch(chunk_texts)
async def generate_sparse_embeddings():
"""Generate sparse embeddings (BM25 for keyword matching)."""
nonlocal sparse_embeddings
with trace_operation(
"vector_sync.embed_sparse",
attributes={
"vector_sync.chunk_count": len(chunk_texts),
},
):
bm25_service = get_bm25_service()
sparse_embeddings = await bm25_service.encode_batch(chunk_texts)
async def generate_highlights():
"""Generate highlighted page images for PDF chunks (CPU-bound)."""
nonlocal chunk_images
if not is_pdf:
return
# Type narrowing: content_bytes is set for PDF files
assert content_bytes is not None
with trace_operation(
"vector_sync.generate_highlights",
attributes={
"vector_sync.chunk_count": len(chunks),
"vector_sync.pdf_size": len(content_bytes),
},
):
import base64
from nextcloud_mcp_server.search.pdf_highlighter import PDFHighlighter
# Build chunk data for batch processing
# Format: (chunk_index, start_offset, end_offset, page_number, chunk_text)
chunk_data: list[tuple[int, int, int, int | None, str]] = [
(i, chunk.start_offset, chunk.end_offset, chunk.page_number, chunk.text)
for i, chunk in enumerate(chunks)
if chunk.page_number is not None
]
# Get pre-computed page boundaries from document processor
page_boundaries = file_metadata.get("page_boundaries")
if not page_boundaries:
logger.warning("No page boundaries available, skipping highlighting")
return
logger.info(
f"Batch generating highlighted page images for {len(chunk_data)} PDF chunks"
)
# Run CPU-bound highlighting in thread pool
# Pass pre-computed page boundaries and full text to avoid re-processing the PDF
batch_results = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: PDFHighlighter.highlight_chunks_batch(
pdf_bytes=content_bytes,
chunks=chunk_data,
page_boundaries=page_boundaries,
full_text=content,
color="yellow",
zoom=2.0,
)
)
# Convert results to storage format
for chunk_index, (
png_bytes,
actual_page_num,
highlight_count,
) in batch_results.items():
image_base64 = base64.b64encode(png_bytes).decode("utf-8")
chunk_images[chunk_index] = {
"image": image_base64,
"page": actual_page_num,
"highlights": highlight_count,
"size": len(png_bytes),
}
logger.info(
f"Generated {len(chunk_images)}/{len(chunks)} highlighted page images "
f"(avg {sum(img['size'] for img in chunk_images.values()) // max(len(chunk_images), 1):,} bytes)"
)
# Run all embedding/highlighting operations in parallel
# - Dense embeddings: I/O bound (API call)
# - Sparse embeddings: CPU bound (local BM25)
# - Highlighting: CPU bound (PyMuPDF rendering, runs in thread pool)
with trace_operation(
"vector_sync.parallel_processing",
attributes={
"vector_sync.is_pdf": is_pdf,
"vector_sync.chunk_count": len(chunks),
},
):
async with anyio.create_task_group() as tg:
tg.start_soon(generate_dense_embeddings)
tg.start_soon(generate_sparse_embeddings)
tg.start_soon(generate_highlights)
# Prepare Qdrant points
indexed_at = int(time.time())
@@ -267,8 +539,9 @@ async def _index_document(
"user_id": doc_task.user_id,
"doc_id": doc_task.doc_id,
"doc_type": doc_task.doc_type,
"is_placeholder": False, # Real indexed document (not placeholder)
"title": title,
"excerpt": chunk.text[:200],
"excerpt": chunk.text, # Full chunk text (up to chunk_size, default 2048 chars)
"indexed_at": indexed_at,
"modified_at": doc_task.modified_at,
"etag": etag,
@@ -277,16 +550,74 @@ async def _index_document(
"chunk_start_offset": chunk.start_offset,
"chunk_end_offset": chunk.end_offset,
"metadata_version": 2, # v2 includes position metadata
# File-specific metadata (PDF, etc.)
**(
{
"file_path": file_path, # Store file path for retrieval
"mime_type": content_type, # From WebDAV response
"file_size": file_metadata.get("file_size"),
"page_number": chunk.page_number,
"page_count": file_metadata.get("page_count"),
"author": file_metadata.get("author"),
"creation_date": file_metadata.get("creation_date"),
"has_images": file_metadata.get("has_images", False),
"image_count": file_metadata.get("image_count", 0),
}
if doc_task.doc_type == "file"
else {}
),
# Highlighted page image (PDF only)
**(
{
"highlighted_page_image": chunk_images[i]["image"],
"highlighted_page_number": chunk_images[i]["page"],
"highlight_count": chunk_images[i]["highlights"],
}
if i in chunk_images
else {}
),
},
)
)
# Upsert to Qdrant
await qdrant_client.upsert(
collection_name=settings.get_collection_name(),
points=points,
wait=True,
)
# Delete placeholder before writing real vectors
# This prevents duplicates and cleans up the placeholder state
try:
await delete_placeholder_point(
doc_id=doc_task.doc_id,
doc_type=doc_task.doc_type,
user_id=doc_task.user_id,
)
except Exception as e:
# Log but don't fail indexing if placeholder deletion fails
logger.warning(
f"Failed to delete placeholder for {doc_task.doc_type}_{doc_task.doc_id}: {e}"
)
# Upsert to Qdrant in batches to avoid timeout with large payloads
# Each batch is limited to avoid WriteTimeout when sending large image payloads
BATCH_SIZE = 10 # ~2MB per batch with images
with trace_operation(
"vector_sync.qdrant_upsert",
attributes={
"vector_sync.point_count": len(points),
"vector_sync.collection": settings.get_collection_name(),
"vector_sync.images_count": len(chunk_images),
"vector_sync.batch_size": BATCH_SIZE,
},
):
for batch_start in range(0, len(points), BATCH_SIZE):
batch_end = min(batch_start + BATCH_SIZE, len(points))
batch = points[batch_start:batch_end]
await qdrant_client.upsert(
collection_name=settings.get_collection_name(),
points=batch,
wait=True,
)
if batch_end < len(points):
logger.debug(
f"Upserted batch {batch_start // BATCH_SIZE + 1}/{(len(points) + BATCH_SIZE - 1) // BATCH_SIZE}"
)
logger.info(
f"Indexed {doc_task.doc_type}_{doc_task.doc_id} for {doc_task.user_id} "
+253 -16
View File
@@ -4,6 +4,7 @@ Periodically scans enabled users' content and queues changed documents for proce
"""
import logging
import os
import time
from dataclasses import dataclass
@@ -16,6 +17,10 @@ from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.observability.metrics import record_vector_sync_scan
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.vector.placeholder import (
query_document_metadata,
write_placeholder_point,
)
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
logger = logging.getLogger(__name__)
@@ -26,10 +31,11 @@ class DocumentTask:
"""Document task for processing queue."""
user_id: str
doc_id: str
doc_id: int | str # int for files/notes, str for legacy
doc_type: str # "note", "file", "calendar"
operation: str # "index" or "delete"
modified_at: int
file_path: str | None = None # File path for files (when doc_id is file_id)
# Track documents potentially deleted (grace period before actual deletion)
@@ -182,8 +188,9 @@ async def scan_user_documents(
f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer"
)
# Get indexed state from Qdrant first (for incremental sync)
indexed_docs = {}
# For deletion tracking, get all doc_ids in Qdrant (for incremental sync)
# Note: We no longer bulk-query indexed_at, instead check per-document
indexed_doc_ids = set()
if not initial_sync:
qdrant_client = await get_qdrant_client()
scroll_result = await qdrant_client.scroll(
@@ -194,17 +201,14 @@ async def scan_user_documents(
FieldCondition(key="doc_type", match=MatchValue(value="note")),
]
),
with_payload=["doc_id", "indexed_at"],
with_payload=["doc_id"],
with_vectors=False,
limit=10000,
)
indexed_docs = {
point.payload["doc_id"]: point.payload["indexed_at"]
for point in scroll_result[0]
}
indexed_doc_ids = {point.payload["doc_id"] for point in scroll_result[0]}
logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")
logger.debug(f"Found {len(indexed_doc_ids)} indexed documents in Qdrant")
# Stream notes from Nextcloud and process immediately
note_count = 0
@@ -218,7 +222,14 @@ async def scan_user_documents(
modified_at = note.get("modified", 0)
if initial_sync:
# Send everything on first sync
# Send everything on first sync - write placeholder first
await write_placeholder_point(
doc_id=doc_id,
doc_type="note",
user_id=user_id,
modified_at=modified_at,
etag=note.get("etag", ""),
)
await send_stream.send(
DocumentTask(
user_id=user_id,
@@ -230,9 +241,7 @@ async def scan_user_documents(
)
queued += 1
else:
# Incremental sync: compare with indexed state
indexed_at = indexed_docs.get(doc_id)
# Incremental sync: check if document exists and compare modified_at
# If document reappeared, remove from potentially_deleted
doc_key = (user_id, doc_id)
if doc_key in _potentially_deleted:
@@ -241,8 +250,48 @@ async def scan_user_documents(
)
del _potentially_deleted[doc_key]
# Query Qdrant for existing entry (placeholder or real)
existing_metadata = await query_document_metadata(
doc_id=doc_id, doc_type="note", user_id=user_id
)
# Send if never indexed or modified since last index
if indexed_at is None or modified_at > indexed_at:
# Compare against stored modified_at (not indexed_at!)
needs_indexing = False
if existing_metadata is None:
# Never seen before
needs_indexing = True
elif existing_metadata.get("modified_at", 0) < modified_at:
# Document modified since last indexing
needs_indexing = True
elif existing_metadata.get("is_placeholder", False):
# Placeholder exists - check if it's stale (processing may have failed)
# Only requeue if placeholder is older than 5x scan interval
# (Large PDFs can take 3-4 minutes to process)
queued_at = existing_metadata.get("queued_at", 0)
placeholder_age = time.time() - queued_at
stale_threshold = get_settings().vector_sync_scan_interval * 5
if placeholder_age > stale_threshold:
logger.debug(
f"Found stale placeholder for note {doc_id} "
f"(age={placeholder_age:.1f}s), requeuing"
)
needs_indexing = True
else:
logger.debug(
f"Skipping note {doc_id} with recent placeholder "
f"(age={placeholder_age:.1f}s < {stale_threshold:.1f}s)"
)
if needs_indexing:
# Write placeholder before queuing
await write_placeholder_point(
doc_id=doc_id,
doc_type="note",
user_id=user_id,
modified_at=modified_at,
etag=note.get("etag", ""),
)
await send_stream.send(
DocumentTask(
user_id=user_id,
@@ -270,7 +319,7 @@ async def scan_user_documents(
) # Allow 1.5 scan intervals
current_time = time.time()
for doc_id in indexed_docs:
for doc_id in indexed_doc_ids:
if doc_id not in nextcloud_doc_ids:
doc_key = (user_id, doc_id)
@@ -309,7 +358,195 @@ async def scan_user_documents(
)
_potentially_deleted[doc_key] = current_time
# Scan tagged PDF files (after notes)
# Get indexed file IDs from Qdrant (for deletion tracking)
indexed_file_ids = set()
if not initial_sync:
file_scroll_result = await qdrant_client.scroll(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_type", match=MatchValue(value="file")),
]
),
limit=10000, # Reasonable limit for file count
with_payload=["doc_id"],
with_vectors=False,
)
indexed_file_ids = {
point.payload["doc_id"] for point in file_scroll_result[0]
}
logger.debug(f"Found {len(indexed_file_ids)} indexed files in Qdrant")
# Scan for tagged PDF files
file_count = 0
file_queued = 0
nextcloud_file_ids = set()
try:
# Find files with vector-index tag using OCS Tags API
settings = get_settings()
tag_name = os.getenv("VECTOR_SYNC_PDF_TAG", "vector-index")
# Use NextcloudClient.find_files_by_tag() which uses proper OCS API
# and filters by PDF MIME type
tagged_files = await nc_client.find_files_by_tag(
tag_name, mime_type_filter="application/pdf"
)
for file_info in tagged_files:
# Files are already filtered by MIME type in find_files_by_tag()
file_count += 1
file_id = file_info["id"] # Use numeric file ID, not path
file_path = file_info["path"] # Keep path for logging
nextcloud_file_ids.add(file_id)
# Use last_modified timestamp if available, otherwise use current time
modified_at = file_info.get("last_modified_timestamp", int(time.time()))
if isinstance(file_info.get("last_modified"), str):
# Parse RFC 2822 date format if needed
from email.utils import parsedate_to_datetime
try:
dt = parsedate_to_datetime(file_info["last_modified"])
modified_at = int(dt.timestamp())
except (ValueError, KeyError):
pass
if initial_sync:
# Send everything on first sync - write placeholder first
await write_placeholder_point(
doc_id=file_id,
doc_type="file",
user_id=user_id,
modified_at=modified_at,
file_path=file_path,
)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=file_id, # Use numeric file ID
doc_type="file",
operation="index",
modified_at=modified_at,
file_path=file_path, # Pass file path for content retrieval
)
)
file_queued += 1
else:
# Incremental sync: check if file exists and compare modified_at
# If file reappeared, remove from potentially_deleted
file_key = (user_id, file_id)
if file_key in _potentially_deleted:
logger.debug(
f"File {file_path} (ID: {file_id}) reappeared, removing from deletion grace period"
)
del _potentially_deleted[file_key]
# Query Qdrant for existing entry (placeholder or real)
existing_metadata = await query_document_metadata(
doc_id=file_id, doc_type="file", user_id=user_id
)
# Send if never indexed or modified since last index
# Compare against stored modified_at (not indexed_at!)
needs_indexing = False
if existing_metadata is None:
# Never seen before
needs_indexing = True
elif existing_metadata.get("modified_at", 0) < modified_at:
# File modified since last indexing
needs_indexing = True
elif existing_metadata.get("is_placeholder", False):
# Placeholder exists - check if it's stale (processing may have failed)
# Only requeue if placeholder is older than 5x scan interval
# (Large PDFs can take 3-4 minutes to process)
queued_at = existing_metadata.get("queued_at", 0)
placeholder_age = time.time() - queued_at
stale_threshold = get_settings().vector_sync_scan_interval * 5
if placeholder_age > stale_threshold:
logger.debug(
f"Found stale placeholder for file {file_path} (ID: {file_id}) "
f"(age={placeholder_age:.1f}s), requeuing"
)
needs_indexing = True
else:
logger.debug(
f"Skipping file {file_path} (ID: {file_id}) with recent placeholder "
f"(age={placeholder_age:.1f}s < {stale_threshold:.1f}s)"
)
if needs_indexing:
# Write placeholder before queuing
await write_placeholder_point(
doc_id=file_id,
doc_type="file",
user_id=user_id,
modified_at=modified_at,
file_path=file_path,
)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=file_id, # Use numeric file ID
doc_type="file",
operation="index",
modified_at=modified_at,
file_path=file_path, # Pass file path for content retrieval
)
)
file_queued += 1
logger.info(
f"[SCAN-{scan_id}] Found {file_count} tagged PDFs for {user_id}"
)
record_vector_sync_scan(file_count)
# Check for deleted files (not initial sync)
if not initial_sync:
for file_id in indexed_file_ids:
if file_id not in nextcloud_file_ids:
file_key = (user_id, file_id)
if file_key in _potentially_deleted:
# Check if grace period elapsed
first_missing_time = _potentially_deleted[file_key]
time_missing = current_time - first_missing_time
if time_missing >= grace_period:
# Grace period elapsed, send for deletion
logger.info(
f"File ID {file_id} missing for {time_missing:.1f}s "
f"(>{grace_period:.1f}s grace period), sending deletion"
)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=file_id, # Use numeric file ID
doc_type="file",
operation="delete",
modified_at=0,
)
)
file_queued += 1
del _potentially_deleted[file_key]
else:
# First time missing, add to grace period tracking
logger.debug(
f"File ID {file_id} missing for first time, starting grace period"
)
_potentially_deleted[file_key] = current_time
except Exception as e:
logger.warning(f"Failed to scan tagged files for {user_id}: {e}")
queued += file_queued
if queued > 0:
logger.info(f"Sent {queued} documents for incremental sync: {user_id}")
logger.info(
f"Sent {queued} documents ({file_queued} files) for incremental sync: {user_id}"
)
else:
logger.debug(f"No changes detected for {user_id}")
+6 -2
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.44.0"
version = "0.46.2"
description = "Model Context Protocol (MCP) server for Nextcloud integration - enables AI assistants to interact with Nextcloud data"
authors = [
{name = "Chris Coutinho", email = "chris@coutinho.io"}
@@ -10,7 +10,7 @@ license = {text = "AGPL-3.0-only"}
requires-python = ">=3.11"
keywords = ["nextcloud", "mcp", "model-context-protocol", "llm", "ai", "claude", "webdav", "caldav", "carddav"]
dependencies = [
"mcp[cli] (>=1.21,<1.22)",
"mcp[cli] (>=1.22,<1.23)",
"httpx (>=0.28.1,<0.29.0)",
"pillow (>=10.3.0,<12.0.0)", # Compatible with fastembed
"icalendar (>=6.0.0,<7.0.0)",
@@ -36,6 +36,9 @@ dependencies = [
"python-json-logger>=3.2.0", # Structured JSON logging
"jinja2>=3.1.6",
"langchain-text-splitters>=1.0.0",
"pymupdf>=1.26.6",
"pymupdf4llm>=0.2.2",
"pymupdf-layout>=1.26.6",
]
classifiers = [
"Development Status :: 4 - Beta",
@@ -123,6 +126,7 @@ dev = [
[project.scripts]
nextcloud-mcp-server = "nextcloud_mcp_server.cli:run"
smithery-main = "nextcloud_mcp_server.smithery_main:main"
[[tool.uv.index]]
name = "testpypi"
+38
View File
@@ -0,0 +1,38 @@
# Smithery configuration for Nextcloud MCP Server
# See: https://smithery.ai/docs/build/configuration
# ADR-016: Stateless deployment mode for multi-user public Nextcloud instances
runtime: "container"
build:
dockerfile: "Dockerfile.smithery"
dockerBuildPath: "."
startCommand:
type: "http"
configSchema:
type: "object"
required:
- "nextcloud_url"
- "username"
- "app_password"
properties:
nextcloud_url:
type: "string"
title: "Nextcloud URL"
description: "Your Nextcloud instance URL (e.g., https://cloud.example.com). Must be publicly accessible."
pattern: "^https?://.+"
username:
type: "string"
title: "Username"
description: "Your Nextcloud username"
minLength: 1
app_password:
type: "string"
title: "App Password"
description: "Nextcloud app password. Generate at Settings > Security > App passwords. Do NOT use your main password."
minLength: 1
exampleConfig:
nextcloud_url: "https://cloud.example.com"
username: "alice"
app_password: "xxxxx-xxxxx-xxxxx-xxxxx-xxxxx"
+361
View File
@@ -0,0 +1,361 @@
"""Integration tests for PDF document indexing and semantic search.
These tests validate the complete PDF processing flow:
1. Process PDF with PyMuPDFProcessor
2. Chunk extracted text with page numbers
3. Index chunks into Qdrant with metadata
4. Perform semantic search on PDF content
5. Verify page numbers and metadata are preserved
"""
import pymupdf
import pytest
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams
from nextcloud_mcp_server.document_processors.pymupdf import PyMuPDFProcessor
from nextcloud_mcp_server.embedding import SimpleEmbeddingProvider
from nextcloud_mcp_server.vector.document_chunker import (
ChunkWithPosition,
RecursiveCharacterTextSplitter,
)
pytestmark = pytest.mark.integration
def create_test_pdf() -> bytes:
"""Create a small test PDF with multiple pages."""
doc = pymupdf.open()
# Page 1: Introduction
page1 = doc.new_page(width=595, height=842) # A4 size
page1.insert_text(
(50, 50),
"Nextcloud Administration Guide\n\n"
"Chapter 1: Introduction\n\n"
"Nextcloud is a self-hosted file sharing and collaboration platform. "
"It provides secure file storage, sharing, and synchronization across devices. "
"This guide covers installation, configuration, and maintenance of Nextcloud.",
)
# Page 2: Installation
page2 = doc.new_page(width=595, height=842)
page2.insert_text(
(50, 50),
"Chapter 2: Installation\n\n"
"System Requirements:\n"
"- PHP 8.0 or higher\n"
"- MySQL 8.0 or MariaDB 10.5\n"
"- Apache or Nginx web server\n\n"
"Installation steps:\n"
"1. Download Nextcloud package\n"
"2. Extract to web server directory\n"
"3. Configure database connection\n"
"4. Run installation wizard",
)
# Page 3: Configuration
page3 = doc.new_page(width=595, height=842)
page3.insert_text(
(50, 50),
"Chapter 3: Configuration\n\n"
"Database Configuration:\n"
"Edit config/config.php to set database parameters. "
"Configure database host, username, password, and database name. "
"For optimal performance, use MySQL or MariaDB.\n\n"
"Security Settings:\n"
"Enable HTTPS, configure trusted domains, and set up firewall rules.",
)
# Convert to bytes
pdf_bytes = doc.tobytes()
doc.close()
return pdf_bytes
@pytest.fixture
async def simple_embedding_provider():
"""Simple in-process embedding provider for testing."""
return SimpleEmbeddingProvider(dimension=384)
@pytest.fixture
async def qdrant_test_client():
"""Qdrant client for testing (in-memory)."""
client = AsyncQdrantClient(":memory:")
yield client
await client.close()
@pytest.fixture
async def test_collection(qdrant_test_client: AsyncQdrantClient):
"""Create test collection in Qdrant."""
collection_name = "test_pdf_indexing"
# Create collection
await qdrant_test_client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
)
yield collection_name
# Cleanup
try:
await qdrant_test_client.delete_collection(collection_name)
except Exception:
pass
@pytest.fixture
def pymupdf_processor():
"""PyMuPDF processor for testing (without image extraction)."""
return PyMuPDFProcessor(extract_images=False)
async def test_pymupdf_processor_extracts_text_and_metadata(pymupdf_processor):
"""Test PyMuPDF processor extracts text and metadata from PDF."""
pdf_bytes = create_test_pdf()
result = await pymupdf_processor.process(
content=pdf_bytes,
content_type="application/pdf",
filename="test-admin-guide.pdf",
)
# Verify result structure
assert result.success is True
assert result.processor == "pymupdf"
assert result.text is not None
assert len(result.text) > 0
# Verify extracted text contains expected content
assert "Nextcloud Administration Guide" in result.text
assert "Chapter 1: Introduction" in result.text
assert "Chapter 2: Installation" in result.text
assert "Chapter 3: Configuration" in result.text
assert "PHP 8.0 or higher" in result.text
assert "MySQL" in result.text
# Verify metadata
assert result.metadata is not None
assert result.metadata["page_count"] == 3
assert result.metadata["filename"] == "test-admin-guide.pdf"
assert "format" in result.metadata
async def test_document_chunker_preserves_page_numbers():
"""Test that document chunker can handle chunks with page number metadata."""
# Create chunks with page numbers
chunks = [
ChunkWithPosition(
text="Chapter 1 content on page 1",
start_offset=0,
end_offset=28,
page_number=1,
),
ChunkWithPosition(
text="Chapter 2 content on page 2",
start_offset=29,
end_offset=57,
page_number=2,
),
ChunkWithPosition(
text="Chapter 3 content on page 3",
start_offset=58,
end_offset=86,
page_number=3,
),
]
# Verify page numbers are preserved
assert chunks[0].page_number == 1
assert chunks[1].page_number == 2
assert chunks[2].page_number == 3
async def test_pdf_indexing_and_search_flow(
pymupdf_processor: PyMuPDFProcessor,
qdrant_test_client: AsyncQdrantClient,
test_collection: str,
simple_embedding_provider: SimpleEmbeddingProvider,
):
"""Test complete PDF indexing and semantic search flow."""
# Step 1: Process PDF with PyMuPDF
pdf_bytes = create_test_pdf()
result = await pymupdf_processor.process(
content=pdf_bytes,
content_type="application/pdf",
filename="/Documents/admin-guide.pdf",
)
assert result.success is True
assert result.metadata["page_count"] == 3
# Step 2: Chunk the extracted text
# Note: In real implementation, we'd track which chunk came from which page
# For this test, we'll simulate by creating chunks manually
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_text(result.text)
assert len(chunks) > 0
# Step 3: Index chunks into Qdrant with PDF metadata
points = []
for idx, chunk_text in enumerate(chunks):
embedding = await simple_embedding_provider.embed(chunk_text)
# Simulate page number assignment (in real implementation, this would be tracked)
# For simplicity, assign page based on content
page_number = 1
if "Chapter 2" in chunk_text or "Installation" in chunk_text:
page_number = 2
elif "Chapter 3" in chunk_text or "Configuration" in chunk_text:
page_number = 3
points.append(
PointStruct(
id=idx,
vector=embedding,
payload={
"user_id": "admin",
"doc_id": "/Documents/admin-guide.pdf",
"doc_type": "file",
"title": "Nextcloud Administration Guide",
"file_path": "/Documents/admin-guide.pdf",
"mime_type": "application/pdf",
"page_number": page_number,
"page_count": result.metadata["page_count"],
"chunk_index": idx,
"excerpt": chunk_text[:200],
},
)
)
await qdrant_test_client.upsert(
collection_name=test_collection, points=points, wait=True
)
# Step 4: Perform semantic search for installation instructions
query = "how to install Nextcloud system requirements"
query_embedding = await simple_embedding_provider.embed(query)
response = await qdrant_test_client.query_points(
collection_name=test_collection,
query=query_embedding,
limit=3,
score_threshold=0.0,
)
# Verify search results
assert len(response.points) > 0
# Top result should be from installation chapter (page 2)
top_result = response.points[0]
assert top_result.payload["doc_type"] == "file"
assert top_result.payload["file_path"] == "/Documents/admin-guide.pdf"
assert (
"Installation" in top_result.payload["excerpt"]
or top_result.payload["page_number"] == 2
)
# Verify page number is preserved
assert top_result.payload["page_number"] in [1, 2, 3]
assert top_result.payload["page_count"] == 3
# Step 5: Search for configuration
query = "database configuration settings MySQL"
query_embedding = await simple_embedding_provider.embed(query)
response = await qdrant_test_client.query_points(
collection_name=test_collection,
query=query_embedding,
limit=3,
score_threshold=0.0,
)
assert len(response.points) > 0
# Should find configuration chapter (page 3)
found_config = any(
"Configuration" in r.payload["excerpt"] or r.payload["page_number"] == 3
for r in response.points[:2]
)
assert found_config
async def test_pdf_search_with_filters(
pymupdf_processor: PyMuPDFProcessor,
qdrant_test_client: AsyncQdrantClient,
test_collection: str,
simple_embedding_provider: SimpleEmbeddingProvider,
):
"""Test PDF search with metadata filters."""
from qdrant_client.models import FieldCondition, Filter, MatchValue
# Process and index PDF
pdf_bytes = create_test_pdf()
result = await pymupdf_processor.process(
content=pdf_bytes,
content_type="application/pdf",
filename="/Documents/admin-guide.pdf",
)
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_text(result.text)
# Index with metadata
points = []
for idx, chunk_text in enumerate(chunks):
embedding = await simple_embedding_provider.embed(chunk_text)
points.append(
PointStruct(
id=idx,
vector=embedding,
payload={
"user_id": "admin",
"doc_id": "/Documents/admin-guide.pdf",
"doc_type": "file",
"mime_type": "application/pdf",
"excerpt": chunk_text[:200],
},
)
)
await qdrant_test_client.upsert(
collection_name=test_collection, points=points, wait=True
)
# Search with filter for PDFs only
query = "Nextcloud installation"
query_embedding = await simple_embedding_provider.embed(query)
response = await qdrant_test_client.query_points(
collection_name=test_collection,
query=query_embedding,
query_filter=Filter(
must=[FieldCondition(key="doc_type", match=MatchValue(value="file"))]
),
limit=3,
)
# All results should be from file documents
assert len(response.points) > 0
for result in response.points:
assert result.payload["doc_type"] == "file"
assert result.payload["mime_type"] == "application/pdf"
async def test_pymupdf_health_check(pymupdf_processor: PyMuPDFProcessor):
"""Test PyMuPDF processor health check."""
is_healthy = await pymupdf_processor.health_check()
assert is_healthy is True
async def test_pymupdf_supports_pdf_mime_type(pymupdf_processor: PyMuPDFProcessor):
"""Test PyMuPDF processor declares PDF support."""
assert "application/pdf" in pymupdf_processor.supported_mime_types
assert pymupdf_processor.name == "pymupdf"
+119
View File
@@ -0,0 +1,119 @@
"""Unit tests for WebDAV client."""
from unittest.mock import AsyncMock
import pytest
from nextcloud_mcp_server.client.webdav import WebDAVClient
@pytest.mark.unit
async def test_find_by_tag_calls_search_files(mocker):
"""Test that find_by_tag constructs correct search query."""
# Create mock HTTP client
mock_http_client = AsyncMock()
# Create WebDAVClient instance
client = WebDAVClient(mock_http_client, "testuser")
# Mock the search_files method to avoid actual HTTP calls
mock_search_files = mocker.patch.object(client, "search_files", return_value=[])
# Call find_by_tag
await client.find_by_tag("vector-index")
# Verify search_files was called with correct parameters
mock_search_files.assert_called_once()
call_args = mock_search_files.call_args
# Check that the where_conditions contains the tag name
assert "vector-index" in call_args.kwargs["where_conditions"]
assert "<oc:tags/>" in call_args.kwargs["where_conditions"]
assert "<d:like>" in call_args.kwargs["where_conditions"]
# Check that tags property is requested
assert "tags" in call_args.kwargs["properties"]
@pytest.mark.unit
async def test_find_by_tag_with_scope_and_limit(mocker):
"""Test find_by_tag passes scope and limit parameters."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
mock_search_files = mocker.patch.object(client, "search_files", return_value=[])
# Call with scope and limit
await client.find_by_tag("test-tag", scope="Documents", limit=10)
# Verify parameters were passed through
call_args = mock_search_files.call_args
assert call_args.kwargs["scope"] == "Documents"
assert call_args.kwargs["limit"] == 10
@pytest.mark.unit
def test_parse_search_response_with_tags(mocker):
"""Test that _parse_search_response correctly parses tags."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock XML response with tags (comma-separated format)
xml_content = b"""<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:response>
<d:href>/remote.php/dav/files/testuser/Documents/test.pdf</d:href>
<d:propstat>
<d:prop>
<d:displayname>test.pdf</d:displayname>
<d:getcontenttype>application/pdf</d:getcontenttype>
<d:getcontentlength>1024</d:getcontentlength>
<d:getetag>"abc123"</d:getetag>
<oc:fileid>12345</oc:fileid>
<oc:tags>vector-index,important</oc:tags>
<d:resourcetype/>
</d:prop>
</d:propstat>
</d:response>
</d:multistatus>"""
# Parse the response
results = client._parse_search_response(xml_content, scope="Documents")
# Verify tags were parsed correctly
assert len(results) == 1
assert "tags" in results[0]
assert results[0]["tags"] == ["vector-index", "important"]
assert results[0]["name"] == "test.pdf"
assert results[0]["content_type"] == "application/pdf"
@pytest.mark.unit
def test_parse_search_response_with_empty_tags(mocker):
"""Test that _parse_search_response handles files without tags."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock XML response without tags
xml_content = b"""<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:response>
<d:href>/remote.php/dav/files/testuser/Documents/test.txt</d:href>
<d:propstat>
<d:prop>
<d:displayname>test.txt</d:displayname>
<d:getcontenttype>text/plain</d:getcontenttype>
<oc:tags/>
<d:resourcetype/>
</d:prop>
</d:propstat>
</d:response>
</d:multistatus>"""
# Parse the response
results = client._parse_search_response(xml_content, scope="Documents")
# Verify tags field is empty list
assert len(results) == 1
assert "tags" in results[0]
assert results[0]["tags"] == []
+24 -24
View File
@@ -9,12 +9,12 @@ from nextcloud_mcp_server.vector.document_chunker import (
class TestDocumentChunkerPositions:
"""Test suite for DocumentChunker position tracking functionality."""
def test_single_chunk_simple_text(self):
async def test_single_chunk_simple_text(self):
"""Test that single-chunk documents return correct positions."""
chunker = DocumentChunker(chunk_size=2048, overlap=200)
content = "This is a short document."
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
assert len(chunks) == 1
assert isinstance(chunks[0], ChunkWithPosition)
@@ -22,7 +22,7 @@ class TestDocumentChunkerPositions:
assert chunks[0].start_offset == 0
assert chunks[0].end_offset == len(content)
def test_multiple_chunks_positions(self):
async def test_multiple_chunks_positions(self):
"""Test that multi-chunk documents have correct positions."""
# Use small chunk size to force multiple chunks
chunker = DocumentChunker(chunk_size=50, overlap=10)
@@ -34,7 +34,7 @@ class TestDocumentChunkerPositions:
"This is the fourth sentence adding more context."
)
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
# Verify we got multiple chunks
assert len(chunks) > 1
@@ -61,12 +61,12 @@ class TestDocumentChunkerPositions:
extracted = content[chunk.start_offset : chunk.end_offset]
assert extracted == chunk.text
def test_chunk_positions_with_whitespace(self):
async def test_chunk_positions_with_whitespace(self):
"""Test position tracking with various whitespace."""
chunker = DocumentChunker(chunk_size=30, overlap=5)
content = "First sentence here. Second sentence.\n\nThird sentence.\tFourth sentence."
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
# Verify positions correctly handle whitespace
for chunk in chunks:
@@ -75,19 +75,19 @@ class TestDocumentChunkerPositions:
# LangChain strips whitespace by default
assert len(chunk.text.strip()) > 0
def test_empty_content(self):
async def test_empty_content(self):
"""Test that empty content returns empty chunk."""
chunker = DocumentChunker(chunk_size=2048, overlap=200)
content = ""
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
assert len(chunks) == 1
assert chunks[0].text == ""
assert chunks[0].start_offset == 0
assert chunks[0].end_offset == 0
def test_chunk_overlap_positions(self):
async def test_chunk_overlap_positions(self):
"""Test that overlapping chunks have correct positions."""
chunker = DocumentChunker(chunk_size=50, overlap=15)
content = (
@@ -97,7 +97,7 @@ class TestDocumentChunkerPositions:
"This is sentence four adding details."
)
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
# Verify overlap exists if we have multiple chunks
if len(chunks) > 1:
@@ -112,14 +112,14 @@ class TestDocumentChunkerPositions:
# With overlap, next chunk may start before current ends
assert next_chunk.start_offset <= current_chunk.end_offset
def test_unicode_content_positions(self):
async def test_unicode_content_positions(self):
"""Test position tracking with Unicode characters."""
chunker = DocumentChunker(chunk_size=50, overlap=10)
content = (
"Hello 世界. こんにちは there. мир Привет world. שלום مرحبا 你好 friend."
)
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
# Verify all chunks extract correctly
for chunk in chunks:
@@ -131,7 +131,7 @@ class TestDocumentChunkerPositions:
assert chunks[0].start_offset == 0
assert chunks[0].end_offset == len(content)
def test_realistic_note_content(self):
async def test_realistic_note_content(self):
"""Test with realistic note content similar to Nextcloud Notes."""
chunker = DocumentChunker(chunk_size=200, overlap=50)
content = """My Project Notes
@@ -152,7 +152,7 @@ position tracking for each chunk.
This allows us to highlight the exact chunk that matched a search query,
which builds trust in the RAG system."""
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
# Should have multiple chunks
assert len(chunks) > 1
@@ -168,7 +168,7 @@ which builds trust in the RAG system."""
assert chunk.end_offset <= len(content)
assert chunk.start_offset < chunk.end_offset
def test_semantic_boundary_preservation(self):
async def test_semantic_boundary_preservation(self):
"""Test that LangChain creates semantically coherent chunks."""
chunker = DocumentChunker(chunk_size=100, overlap=20)
content = (
@@ -178,7 +178,7 @@ which builds trust in the RAG system."""
"Fourth sentence ends."
)
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
# Verify all chunks are extractable using their positions
for chunk in chunks:
@@ -193,7 +193,7 @@ which builds trust in the RAG system."""
assert chunk.end_offset <= len(content)
assert chunk.start_offset < chunk.end_offset
def test_paragraph_boundary_preservation(self):
async def test_paragraph_boundary_preservation(self):
"""Test that LangChain preserves paragraph boundaries."""
chunker = DocumentChunker(chunk_size=80, overlap=15)
content = """First paragraph here.
@@ -204,7 +204,7 @@ Third paragraph here.
Fourth paragraph here."""
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
# LangChain should prefer splitting at paragraph boundaries (\n\n)
# Verify we got multiple chunks
@@ -215,7 +215,7 @@ Fourth paragraph here."""
extracted = content[chunk.start_offset : chunk.end_offset]
assert extracted == chunk.text
def test_default_parameters(self):
async def test_default_parameters(self):
"""Test that default parameters work correctly."""
chunker = DocumentChunker() # Use defaults: 2048 chars, 200 overlap
@@ -224,14 +224,14 @@ Fourth paragraph here."""
"This is a short note with a few sentences. It should fit in one chunk."
)
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
assert len(chunks) == 1
assert chunks[0].text == content
assert chunks[0].start_offset == 0
assert chunks[0].end_offset == len(content)
def test_large_document_chunking(self):
async def test_large_document_chunking(self):
"""Test chunking of a large document."""
chunker = DocumentChunker(chunk_size=100, overlap=20)
@@ -244,7 +244,7 @@ Fourth paragraph here."""
]
content = "\n\n".join(paragraphs)
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
# Should create multiple chunks
assert len(chunks) > 1
@@ -261,12 +261,12 @@ Fourth paragraph here."""
assert chunks[0].start_offset == 0
assert chunks[-1].end_offset == len(content)
def test_position_tracking_with_overlap(self):
async def test_position_tracking_with_overlap(self):
"""Test that position tracking works correctly with overlap."""
chunker = DocumentChunker(chunk_size=50, overlap=15)
content = "A" * 25 + ". " + "B" * 25 + ". " + "C" * 25 + ". " + "D" * 25 + "."
chunks = chunker.chunk_text(content)
chunks = await chunker.chunk_text(content)
if len(chunks) > 1:
# Verify overlap creates correct positions
+41
View File
@@ -0,0 +1,41 @@
import logging
import pathlib
import anyio
import pymupdf
import pymupdf.layout
from nextcloud_mcp_server.client import NextcloudClient
pymupdf.layout.activate()
import pymupdf4llm # noqa: E402
client = NextcloudClient.from_env()
logger = logging.getLogger(__name__)
TMP_DIR = pathlib.Path("/tmp/tmp-images")
TMP_DIR.mkdir(exist_ok=True, parents=True)
async def print_markdown(filename):
content, _ = await client.webdav.read_file(filename)
doc = pymupdf.open("pdf", content)
md_text = pymupdf4llm.to_markdown(doc, write_images=True, image_path=str(TMP_DIR))
print(md_text)
async def run1():
response = await client.webdav.find_by_type("application/pdf")
# print(response)
for file in response:
await print_markdown(file["path"])
async def run():
tags = await client.tags.get_all_tags()
print(tags)
if __name__ == "__main__":
logging.basicConfig(level="INFO")
anyio.run(run)
Generated
+75 -5
View File
@@ -1645,7 +1645,7 @@ wheels = [
[[package]]
name = "mcp"
version = "1.21.1"
version = "1.22.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
@@ -1663,9 +1663,9 @@ dependencies = [
{ name = "typing-inspection" },
{ name = "uvicorn", marker = "sys_platform != 'emscripten'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f7/25/4df633e7574254ada574822db2245bbee424725d1b01bccae10bf128794e/mcp-1.21.1.tar.gz", hash = "sha256:540e6ac4b12b085c43f14879fde04cbdb10148a09ea9492ff82d8c7ba651a302", size = 469071, upload-time = "2025-11-13T20:33:46.139Z" }
sdist = { url = "https://files.pythonhosted.org/packages/a3/a2/c5ec0ab38b35ade2ae49a90fada718fbc76811dc5aa1760414c6aaa6b08a/mcp-1.22.0.tar.gz", hash = "sha256:769b9ac90ed42134375b19e777a2858ca300f95f2e800982b3e2be62dfc0ba01", size = 471788, upload-time = "2025-11-20T20:11:28.095Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/49/af/01fb42df59ad15925ffc1e2e609adafddd3ac4572f606faae0dc8b55ba0c/mcp-1.21.1-py3-none-any.whl", hash = "sha256:dd35abe36d68530a8a1291daa25d50276d8731e545c0434d6e250a3700dd2a6d", size = 174852, upload-time = "2025-11-13T20:33:44.502Z" },
{ url = "https://files.pythonhosted.org/packages/a9/bb/711099f9c6bb52770f56e56401cdfb10da5b67029f701e0df29362df4c8e/mcp-1.22.0-py3-none-any.whl", hash = "sha256:bed758e24df1ed6846989c909ba4e3df339a27b4f30f1b8b627862a4bade4e98", size = 175489, upload-time = "2025-11-20T20:11:26.542Z" },
]
[package.optional-dependencies]
@@ -1925,9 +1925,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/6c/28/dd72947e59a6a8c856448a5e74da6201cb5502ddff644fbc790e4bd40b9a/multiprocess-0.70.18-py39-none-any.whl", hash = "sha256:e78ca805a72b1b810c690b6b4cc32579eba34f403094bbbae962b7b5bf9dfcb8", size = 133478, upload-time = "2025-04-17T03:11:26.253Z" },
]
[[package]]
name = "networkx"
version = "3.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/6c/4f/ccdb8ad3a38e583f214547fd2f7ff1fc160c43a75af88e6aec213404b96a/networkx-3.5.tar.gz", hash = "sha256:d4c6f9cf81f52d69230866796b82afbccdec3db7ae4fbd1b65ea750feed50037", size = 2471065, upload-time = "2025-05-29T11:35:07.804Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/eb/8d/776adee7bbf76365fdd7f2552710282c79a4ead5d2a46408c9043a2b70ba/networkx-3.5-py3-none-any.whl", hash = "sha256:0030d386a9a06dee3565298b4a734b68589749a544acbb6c412dc9e2489ec6ec", size = 2034406, upload-time = "2025-05-29T11:35:04.961Z" },
]
[[package]]
name = "nextcloud-mcp-server"
version = "0.44.0"
version = "0.46.2"
source = { editable = "." }
dependencies = [
{ name = "aiosqlite" },
@@ -1952,6 +1961,9 @@ dependencies = [
{ name = "prometheus-client" },
{ name = "pydantic" },
{ name = "pyjwt", extra = ["crypto"] },
{ name = "pymupdf" },
{ name = "pymupdf-layout" },
{ name = "pymupdf4llm" },
{ name = "python-json-logger" },
{ name = "pythonvcard4" },
{ name = "qdrant-client" },
@@ -1986,7 +1998,7 @@ requires-dist = [
{ name = "icalendar", specifier = ">=6.0.0,<7.0.0" },
{ name = "jinja2", specifier = ">=3.1.6" },
{ name = "langchain-text-splitters", specifier = ">=1.0.0" },
{ name = "mcp", extras = ["cli"], specifier = ">=1.21,<1.22" },
{ name = "mcp", extras = ["cli"], specifier = ">=1.22,<1.23" },
{ name = "opentelemetry-api", specifier = ">=1.28.2" },
{ name = "opentelemetry-exporter-otlp-proto-grpc", specifier = ">=1.28.2" },
{ name = "opentelemetry-instrumentation-asgi", specifier = ">=0.49b2" },
@@ -1997,6 +2009,9 @@ requires-dist = [
{ name = "prometheus-client", specifier = ">=0.21.0" },
{ name = "pydantic", specifier = ">=2.11.4" },
{ name = "pyjwt", extras = ["crypto"], specifier = ">=2.8.0" },
{ name = "pymupdf", specifier = ">=1.26.6" },
{ name = "pymupdf-layout", specifier = ">=1.26.6" },
{ name = "pymupdf4llm", specifier = ">=0.2.2" },
{ name = "python-json-logger", specifier = ">=3.2.0" },
{ name = "pythonvcard4", specifier = ">=0.2.0" },
{ name = "qdrant-client", specifier = ">=1.7.0" },
@@ -2969,6 +2984,52 @@ crypto = [
{ name = "cryptography" },
]
[[package]]
name = "pymupdf"
version = "1.26.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ec/d7/a6f0e03a117fa2ad79c4b898203bb212b17804f92558a6a339298faca7bb/pymupdf-1.26.6.tar.gz", hash = "sha256:a2b4531cd4ab36d6f1f794bb6d3c33b49bda22f36d58bb1f3e81cbc10183bd2b", size = 84322494, upload-time = "2025-11-05T15:20:46.786Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9e/5c/dec354eee5fe4966c715f33818ed4193e0e6c986cf8484de35b6c167fb8e/pymupdf-1.26.6-cp310-abi3-macosx_10_9_x86_64.whl", hash = "sha256:e46f320a136ad55e5219e8f0f4061bdf3e4c12b126d2740d5a49f73fae7ea176", size = 23178988, upload-time = "2025-11-05T14:31:19.834Z" },
{ url = "https://files.pythonhosted.org/packages/ec/a0/11adb742d18142bd623556cd3b5d64649816decc5eafd30efc9498657e76/pymupdf-1.26.6-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:6844cd2396553c0fa06de4869d5d5ecb1260e6fc3b9d85abe8fa35f14dd9d688", size = 22469764, upload-time = "2025-11-05T14:32:34.654Z" },
{ url = "https://files.pythonhosted.org/packages/e4/c8/377cf20e31f58d4c243bfcf2d3cb7466d5b97003b10b9f1161f11eb4a994/pymupdf-1.26.6-cp310-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:617ba69e02c44f0da1c0e039ea4a26cf630849fd570e169c71daeb8ac52a81d6", size = 23502227, upload-time = "2025-11-06T11:03:56.934Z" },
{ url = "https://files.pythonhosted.org/packages/4f/bf/6e02e3d84b32c137c71a0a3dcdba8f2f6e9950619a3bc272245c7c06a051/pymupdf-1.26.6-cp310-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:7777d0b7124c2ebc94849536b6a1fb85d158df3b9d873935e63036559391534c", size = 24115381, upload-time = "2025-11-05T14:33:54.338Z" },
{ url = "https://files.pythonhosted.org/packages/ab/9d/30f7fcb3776bfedde66c06297960debe4883b1667294a1ee9426c942e94d/pymupdf-1.26.6-cp310-abi3-win32.whl", hash = "sha256:8f3ef05befc90ca6bb0f12983200a7048d5bff3e1c1edef1bb3de60b32cb5274", size = 17203613, upload-time = "2025-11-05T17:19:47.494Z" },
{ url = "https://files.pythonhosted.org/packages/f9/e8/989f4eaa369c7166dc24f0eaa3023f13788c40ff1b96701f7047421554a8/pymupdf-1.26.6-cp310-abi3-win_amd64.whl", hash = "sha256:ce02ca96ed0d1acfd00331a4d41a34c98584d034155b06fd4ec0f051718de7ba", size = 18405680, upload-time = "2025-11-05T14:34:48.672Z" },
]
[[package]]
name = "pymupdf-layout"
version = "1.26.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "networkx" },
{ name = "numpy" },
{ name = "onnxruntime" },
{ name = "pymupdf" },
{ name = "pyyaml" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/70/86/31f8d05b36ebf43cca88d5c6415de46eb748e487b618a589671a610be8c8/pymupdf_layout-1.26.6-cp310-abi3-macosx_10_9_x86_64.whl", hash = "sha256:d632f83208db8b24600eb8ac54d3135fab6ab1f251a38fa6061e7470e81b9481", size = 12727222, upload-time = "2025-11-05T14:35:44.367Z" },
{ url = "https://files.pythonhosted.org/packages/ff/d3/0e52d7d1e2f975843f5354ac3b210a98471b690105efc332d3c285bd794b/pymupdf_layout-1.26.6-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:f1d45f72ec08ef7f644928487e7a067df6df63172d682d0bb05158896d0d9c71", size = 12725266, upload-time = "2025-11-05T14:36:50.727Z" },
{ url = "https://files.pythonhosted.org/packages/ae/49/ad1a5edccc45477493d6a53a41df7620d6147febb897c3dd8354f413e154/pymupdf_layout-1.26.6-cp310-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:0561b9485a6ac1a40bb1e2ec7a1648aa64e4be56dab2f39182b11a69e3e43024", size = 12732580, upload-time = "2025-11-06T11:04:09.065Z" },
{ url = "https://files.pythonhosted.org/packages/a7/bd/3e049b359dd0c3a101ae915484b87ff73bfdedfb24a924e0a8e6783b33f3/pymupdf_layout-1.26.6-cp310-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:ee8e2bfed12d4b6421b27a1f89837ac09d8bc3f783f79670db397ec24614bf3d", size = 12732539, upload-time = "2025-11-05T14:38:01.244Z" },
{ url = "https://files.pythonhosted.org/packages/f8/7a/69078bf16669f8361360321ea6bede4cbfede35bf3f4ca5842a7c2387825/pymupdf_layout-1.26.6-cp310-abi3-win_amd64.whl", hash = "sha256:2305aac24fd6e12217afaaea8ec95be297be9b250b6077a3f4e92f7f9beeaf92", size = 12734904, upload-time = "2025-11-05T14:39:05.83Z" },
]
[[package]]
name = "pymupdf4llm"
version = "0.2.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pymupdf" },
{ name = "tabulate" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ec/26/e1226c5329d0c901cd42649e4e8d7544636524c31e95a84f4dcf7c25731d/pymupdf4llm-0.2.2.tar.gz", hash = "sha256:d8dee8451e31ec39daf691687403bf2a98ac7e7b8709400a4e13a582eab835c6", size = 59501, upload-time = "2025-11-17T11:10:20.204Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/26/23/08be1528f3ccb8c245e9a7b247255d6853a8e162b1451f4888f2006c52f0/pymupdf4llm-0.2.2-py3-none-any.whl", hash = "sha256:e7777d083f5f7c7daa804c3423804c309a7e096d682773c01e9dd4bb060f4a56", size = 62063, upload-time = "2025-11-17T11:10:22.452Z" },
]
[[package]]
name = "pyreadline3"
version = "3.5.4"
@@ -3553,6 +3614,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a2/09/77d55d46fd61b4a135c444fc97158ef34a095e5681d0a6c10b75bf356191/sympy-1.14.0-py3-none-any.whl", hash = "sha256:e091cc3e99d2141a0ba2847328f5479b05d94a6635cb96148ccb3f34671bd8f5", size = 6299353, upload-time = "2025-04-27T18:04:59.103Z" },
]
[[package]]
name = "tabulate"
version = "0.9.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ec/fe/802052aecb21e3797b8f7902564ab6ea0d60ff8ca23952079064155d1ae1/tabulate-0.9.0.tar.gz", hash = "sha256:0095b12bf5966de529c0feb1fa08671671b3368eec77d7ef7ab114be2c068b3c", size = 81090, upload-time = "2022-10-06T17:21:48.54Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/40/44/4a5f08c96eb108af5cb50b41f76142f0afa346dfa99d5296fe7202a11854/tabulate-0.9.0-py3-none-any.whl", hash = "sha256:024ca478df22e9340661486f85298cff5f6dcdba14f3813e8830015b9ed1948f", size = 35252, upload-time = "2022-10-06T17:21:44.262Z" },
]
[[package]]
name = "tenacity"
version = "9.1.2"