Compare commits

...

41 Commits

Author SHA1 Message Date
smithery-ai[bot] 0b5783c106 Update README 2025-11-26 17:27:49 +00:00
Chris Coutinho 1b1667bc2b Merge pull request #357 from cbcoutinho/renovate/shivammathur-setup-php-digest
chore(deps): update shivammathur/setup-php digest to 44454db
2025-11-26 18:25:06 +01:00
Chris Coutinho c2b4bf9c67 Merge pull request #358 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.13
2025-11-26 18:24:46 +01:00
Chris Coutinho 0845fefe6c Merge pull request #359 from cbcoutinho/renovate/qdrant-1.x
chore(deps): update helm release qdrant to v1.16.1
2025-11-26 18:24:34 +01:00
renovate-bot-cbcoutinho[bot] d911556a84 chore(deps): update helm release qdrant to v1.16.1 2025-11-26 17:04:52 +00:00
renovate-bot-cbcoutinho[bot] 38be8d9401 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.13 2025-11-26 17:04:31 +00:00
renovate-bot-cbcoutinho[bot] 9f3190f62a chore(deps): update shivammathur/setup-php digest to 44454db 2025-11-26 17:04:26 +00:00
Chris Coutinho 41aeb7e0f2 Merge pull request #356 from cbcoutinho/renovate/quay.io-keycloak-keycloak-26.x
chore(deps): update quay.io/keycloak/keycloak docker tag to v26.4.6
2025-11-26 00:50:25 +01:00
renovate-bot-cbcoutinho[bot] f8e67519e1 chore(deps): update quay.io/keycloak/keycloak docker tag to v26.4.6 2025-11-25 23:06:05 +00:00
Chris Coutinho 4279dcba1e Merge pull request #354 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.12
2025-11-25 18:19:32 +01:00
Chris Coutinho be7e3d6b56 Merge pull request #355 from cbcoutinho/renovate/qdrant-qdrant-1.x
chore(deps): update qdrant/qdrant docker tag to v1.16.1
2025-11-25 18:19:07 +01:00
renovate-bot-cbcoutinho[bot] 41e128190b chore(deps): update qdrant/qdrant docker tag to v1.16.1 2025-11-25 17:06:22 +00:00
renovate-bot-cbcoutinho[bot] ba869ccde5 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.12 2025-11-25 17:06:11 +00:00
Chris Coutinho 27fe066b23 Merge pull request #353 from cbcoutinho/renovate/docker.io-library-nextcloud-32.0.2
chore(deps): update docker.io/library/nextcloud:32.0.2 docker digest to 8cb1dc8
2025-11-23 19:41:19 +01:00
renovate-bot-cbcoutinho[bot] e94b8ff714 chore(deps): update docker.io/library/nextcloud:32.0.2 docker digest to 8cb1dc8 2025-11-23 17:04:03 +00:00
github-actions[bot] e3a6894904 bump: version 0.48.3 → 0.48.4 2025-11-23 16:40:06 +00:00
Chris Coutinho 92b97bda00 fix: Add rate limit retry logic to OpenAI provider
Add exponential backoff retry handling for OpenAI API rate limits
(429 errors). This is needed for GitHub Models API which has stricter
rate limits than standard OpenAI API.

- Add retry_on_rate_limit decorator with exponential backoff
- Max 5 retries with delays: 2s → 4s → 8s → 16s → 32s
- Apply to embed(), _embed_batch_request(), and generate() methods

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 17:24:48 +01:00
Chris Coutinho d5c6039296 ci: Update rag pipeline 2025-11-23 16:33:39 +01:00
Chris Coutinho 3fa13c8bfd ci: Update rag pipeline 2025-11-23 16:12:37 +01:00
Chris Coutinho 9d306b71fa ci: Fix pytest path 2025-11-23 15:43:45 +01:00
Chris Coutinho 38a936c120 Merge pull request #352 from cbcoutinho/renovate/major-github-artifact-actions
chore(deps): update actions/upload-artifact action to v5
2025-11-23 12:43:43 +01:00
renovate-bot-cbcoutinho[bot] 0b2d449ffa chore(deps): update actions/upload-artifact action to v5 2025-11-23 05:04:36 +00:00
Chris Coutinho d881373dce ci: Remove third_party from app mounts 2025-11-23 05:48:17 +01:00
github-actions[bot] 9ade4c65f3 bump: version 0.48.2 → 0.48.3 2025-11-23 04:44:17 +00:00
Chris Coutinho 5c73b85f65 fix: Increase MCP sampling timeout to 5 minutes for slower LLMs
- Increase sampling timeout from 30s to 300s in semantic.py to accommodate
  slower local LLMs like Ollama
- Refactor RAG integration tests to support multiple providers (ollama,
  openai, anthropic, bedrock)
- Remove unnecessary embedding_provider fixture since MCP server handles
  embeddings internally
- Add --provider flag via tests/integration/conftest.py
- Add provider_fixtures.py with factory functions for generation providers

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 05:43:48 +01:00
github-actions[bot] f5764c01fc bump: version 0.48.1 → 0.48.2 2025-11-23 03:25:23 +00:00
Chris Coutinho 8c7c2a4407 Merge pull request #350 from cbcoutinho/feature/openai-provider-support
feature/openai provider support
2025-11-23 04:24:55 +01:00
Chris Coutinho 978de5e9a4 Merge branch 'master' into feature/openai-provider-support 2025-11-23 04:23:50 +01:00
Chris Coutinho 4e9859117c fix: Share vector sync state with FastMCP session lifespan via module singleton
The refactor in fafeaf3 moved background tasks to Starlette server lifespan
but broke nc_get_vector_sync_status because it still looked for streams in
FastMCP's AppContext (lifespan_context).

Add VectorSyncState module singleton to bridge the lifespans:
- starlette_lifespan sets the singleton when starting background tasks
- app_lifespan_basic reads from singleton and includes in AppContext
- MCP tools can now access document_receive_stream for pending count

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 04:20:47 +01:00
Chris Coutinho a134a0fc08 fix: Share vector sync state with FastMCP session lifespan via module singleton
The refactor in fafeaf3 moved background tasks to Starlette server lifespan
but broke nc_get_vector_sync_status because it still looked for streams in
FastMCP's AppContext (lifespan_context).

Add VectorSyncState module singleton to bridge the lifespans:
- starlette_lifespan sets the singleton when starting background tasks
- app_lifespan_basic reads from singleton and includes in AppContext
- MCP tools can now access document_receive_stream for pending count

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 04:20:09 +01:00
Chris Coutinho 6df58af0c3 ci: Decrease polling interval to 5s 2025-11-23 04:09:37 +01:00
github-actions[bot] 852606ec8b bump: version 0.48.0 → 0.48.1 2025-11-23 03:03:55 +00:00
Chris Coutinho caae6922be Merge pull request #349 from cbcoutinho/feature/openai-provider-support
feature/openai provider support
2025-11-23 04:03:29 +01:00
Chris Coutinho fafeaf3d83 refactor: Move background tasks to server lifespan and deprecate SSE transport
- Move scanner/processor tasks from FastMCP session lifespan to Starlette
  server lifespan (correct architecture: background tasks run once at
  server level, not per-session)
- Change default CLI transport from SSE to streamable-http
- Remove SSE transport option from CLI (SSE is deprecated)
- Remove SSE client session factory from test fixtures
- Add tracing instrumentation to BM25 hybrid search operations for
  better observability

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 04:02:30 +01:00
Chris Coutinho 2ab8dad6a5 fix: Use WebDAV for tag creation and add LLM-as-a-judge for RAG tests
- Change create_tag() to use WebDAV POST instead of OCS API which
  returned 404 in some Nextcloud versions
- Add llm_judge() helper that evaluates system output against ground
  truth with simple TRUE/FALSE prompt
- Replace keyword-based assertions in RAG tests with LLM judge for
  more flexible semantic evaluation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 02:24:01 +01:00
Chris Coutinho 50216accde Merge pull request #348 from cbcoutinho/feature/openai-provider-support
feature/openai provider support
2025-11-23 01:56:49 +01:00
Chris Coutinho bf2fdac2d0 ci: Fix health endpoint 2025-11-23 01:56:17 +01:00
github-actions[bot] 626c4bf562 bump: version 0.47.0 → 0.48.0 2025-11-23 00:53:24 +00:00
Chris Coutinho a56b3f3d51 Merge pull request #347 from cbcoutinho/feature/openai-provider-support
feature/openai provider support
2025-11-23 01:52:55 +01:00
Chris Coutinho 2896fa1dc9 feat: Add tag management methods to WebDAV client
- Add get_file_info() to get file info including file ID via PROPFIND
- Add create_tag() to create system tags via OCS API
- Add get_or_create_tag() for idempotent tag creation
- Add assign_tag_to_file() to assign tags to files via WebDAV
- Add remove_tag_from_file() to remove tags from files

Also refactors RAG evaluation:
- Add indexed_manual_pdf fixture using existing nc_client/nc_mcp_client
- Remove manual tag creation steps from workflow (now handled by fixture)
- Add comprehensive unit tests for new WebDAV methods

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 01:51:42 +01:00
Chris Coutinho 04251401aa ci: Add permissions to github token 2025-11-23 01:26:22 +01:00
27 changed files with 1735 additions and 949 deletions
+22 -188
View File
@@ -3,6 +3,10 @@ name: RAG Evaluation
on:
workflow_dispatch:
inputs:
manual_path:
description: 'Path to Nextcloud User Manual PDF in Nextcloud'
required: false
default: 'Nextcloud Manual.pdf'
embedding_model:
description: 'OpenAI embedding model'
required: false
@@ -15,66 +19,27 @@ on:
jobs:
rag-evaluation:
runs-on: ubuntu-latest
timeout-minutes: 45
timeout-minutes: 30
permissions:
models: read
steps:
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: 'true'
- name: Clone Nextcloud documentation
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
repository: 'nextcloud/documentation'
path: 'nextcloud-docs'
- name: Install Sphinx and LaTeX dependencies
run: |
sudo apt-get update
sudo apt-get install -y \
python3-sphinx \
python3-pip \
latexmk \
texlive-latex-recommended \
texlive-latex-extra \
texlive-fonts-recommended \
texlive-fonts-extra
- name: Build User Manual PDF
run: |
cd nextcloud-docs/user_manual
pip3 install -r ../requirements.txt
make latexpdf
ls -la _build/latex/
cp _build/latex/NextcloudUserManual.pdf ../../Nextcloud_User_Manual.pdf
echo "PDF built successfully"
###### Required to build OIDC App ######
- name: Set up php 8.4
uses: shivammathur/setup-php@bf6b4fbd49ca58e4608c9c89fba0b8d90bd2a39f # v2
with:
php-version: 8.4
coverage: none
- name: Install OIDC app composer dependencies
run: |
cd third_party/oidc
composer install --no-dev
###### Required to build OIDC App ######
- name: Run docker compose with vector sync
uses: hoverkraft-tech/compose-action@3846bcd61da338e9eaaf83e7ed0234a12b099b72 # v2.4.1
with:
compose-file: "./docker-compose.yml"
compose-file: |
./docker-compose.yml
./docker-compose.ci.yml
up-flags: "--build"
env:
# Override MCP container environment for OpenAI + vector sync
VECTOR_SYNC_ENABLED: "true"
VECTOR_SYNC_SCAN_INTERVAL: "30"
# Environment variables passed to docker-compose.ci.yml
OPENAI_API_KEY: ${{ secrets.GITHUB_TOKEN }}
OPENAI_BASE_URL: "https://models.github.ai/inference"
OPENAI_EMBEDDING_MODEL: ${{ inputs.embedding_model }}
OPENAI_GENERATION_MODEL: ${{ inputs.generation_model }}
VECTOR_SYNC_SCAN_INTERVAL: "5"
- name: Install the latest version of uv
uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4
@@ -100,7 +65,7 @@ jobs:
echo "Waiting for MCP server..."
max_attempts=30
attempt=0
until curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8000/health | grep -q "200"; do
until curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8000/health/live | grep -q "200"; do
attempt=$((attempt + 1))
if [ $attempt -ge $max_attempts ]; then
echo "MCP server did not become ready in time."
@@ -111,159 +76,28 @@ jobs:
done
echo "MCP server is ready."
- name: Upload User Manual PDF to Nextcloud
run: |
echo "Uploading Nextcloud_User_Manual.pdf to Nextcloud..."
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -u admin:admin \
-X PUT \
-T Nextcloud_User_Manual.pdf \
"http://localhost:8080/remote.php/dav/files/admin/Nextcloud_User_Manual.pdf")
if [ "$HTTP_CODE" = "201" ] || [ "$HTTP_CODE" = "204" ]; then
echo "PDF uploaded successfully (HTTP $HTTP_CODE)"
else
echo "Failed to upload PDF (HTTP $HTTP_CODE)"
exit 1
fi
- name: Create vector-index tag
id: create_tag
run: |
# Create the tag using OCS API
echo "Creating vector-index tag..."
RESPONSE=$(curl -s -u admin:admin \
-X POST \
-H 'Content-Type: application/json' \
-H 'OCS-APIRequest: true' \
-d '{"name":"vector-index","userVisible":true,"userAssignable":true}' \
"http://localhost:8080/ocs/v2.php/apps/systemtags/api/v1/tags")
echo "Create tag response: $RESPONSE"
# Get tag ID from response or lookup
TAG_ID=$(echo "$RESPONSE" | grep -oP '(?<="id":)[0-9]+' | head -1 || echo "")
if [ -z "$TAG_ID" ]; then
echo "Tag may already exist, looking it up..."
TAG_ID=$(curl -s -u admin:admin \
-X PROPFIND \
-H 'Content-Type: application/xml' \
-d '<?xml version="1.0"?><d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"><d:prop><oc:id/><oc:display-name/></d:prop></d:propfind>' \
http://localhost:8080/remote.php/dav/systemtags/ \
| grep -B2 "vector-index" | grep -oP '(?<=<oc:id>)[0-9]+(?=</oc:id>)' | head -1 || echo "")
fi
if [ -z "$TAG_ID" ]; then
echo "ERROR: Could not create or find vector-index tag"
exit 1
fi
echo "Tag ID: $TAG_ID"
echo "tag_id=$TAG_ID" >> $GITHUB_OUTPUT
- name: Get file ID of uploaded PDF
id: get_file_id
run: |
echo "Getting file ID for Nextcloud_User_Manual.pdf..."
# Get file ID using PROPFIND
FILE_ID=$(curl -s -u admin:admin \
-X PROPFIND \
-H 'Content-Type: application/xml' \
-d '<?xml version="1.0"?><d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"><d:prop><oc:fileid/></d:prop></d:propfind>' \
"http://localhost:8080/remote.php/dav/files/admin/Nextcloud_User_Manual.pdf" \
| grep -oP '(?<=<oc:fileid>)[0-9]+(?=</oc:fileid>)' || echo "")
if [ -z "$FILE_ID" ]; then
echo "ERROR: Could not find file ID"
exit 1
fi
echo "Found file ID: $FILE_ID"
echo "file_id=$FILE_ID" >> $GITHUB_OUTPUT
- name: Tag file with vector-index
env:
FILE_ID: ${{ steps.get_file_id.outputs.file_id }}
TAG_ID: ${{ steps.create_tag.outputs.tag_id }}
run: |
echo "Tagging file $FILE_ID with tag $TAG_ID..."
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -u admin:admin \
-X PUT \
-H 'Content-Type: application/json' \
-H 'Content-Length: 0' \
"http://localhost:8080/remote.php/dav/systemtags-relations/files/$FILE_ID/$TAG_ID")
if [ "$HTTP_CODE" = "201" ] || [ "$HTTP_CODE" = "409" ]; then
echo "File tagged successfully (HTTP $HTTP_CODE)"
else
echo "Failed to tag file (HTTP $HTTP_CODE)"
exit 1
fi
- name: Wait for vector sync to complete indexing
env:
NEXTCLOUD_HOST: "http://localhost:8080"
NEXTCLOUD_USERNAME: "admin"
NEXTCLOUD_PASSWORD: "admin"
run: |
echo "Waiting for vector sync to index the manual..."
max_attempts=60
attempt=0
# Wait for initial scan to pick up the file
sleep 10
while [ $attempt -lt $max_attempts ]; do
attempt=$((attempt + 1))
# Check vector sync status via MCP
STATUS=$(curl -s http://localhost:8000/health || echo "{}")
echo "Attempt $attempt/$max_attempts: $STATUS"
# Also check indexed count via semantic search
# If we get results, indexing is done
RESULT=$(curl -s -X POST http://localhost:8000/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"nc_get_vector_sync_status","arguments":{}}}' \
2>/dev/null || echo "{}")
echo "Vector sync status: $RESULT"
# Check if pending is 0 and indexed > 0
INDEXED=$(echo "$RESULT" | jq -r '.result.structuredContent.indexed // 0' 2>/dev/null || echo "0")
PENDING=$(echo "$RESULT" | jq -r '.result.structuredContent.pending // 1' 2>/dev/null || echo "1")
echo "Indexed: $INDEXED, Pending: $PENDING"
if [ "$INDEXED" -gt "0" ] && [ "$PENDING" -eq "0" ]; then
echo "Indexing complete! $INDEXED documents indexed."
break
fi
sleep 10
done
if [ $attempt -ge $max_attempts ]; then
echo "WARNING: Indexing may not be complete, proceeding anyway..."
fi
- name: Run RAG evaluation tests
env:
NEXTCLOUD_HOST: "http://localhost:8080"
NEXTCLOUD_USERNAME: "admin"
NEXTCLOUD_PASSWORD: "admin"
RAG_MANUAL_PATH: ${{ inputs.manual_path }}
OPENAI_API_KEY: ${{ secrets.GITHUB_TOKEN }}
OPENAI_BASE_URL: "https://models.github.ai/inference"
OPENAI_EMBEDDING_MODEL: ${{ inputs.embedding_model }}
OPENAI_GENERATION_MODEL: ${{ inputs.generation_model }}
run: |
uv run pytest tests/integration/test_rag_openai.py -v --log-cli-level=INFO
uv run pytest tests/integration/test_rag.py -v --log-cli-level=INFO --provider openai
- name: Capture MCP container logs
if: always()
run: |
echo "=== MCP Container Logs ==="
docker compose logs mcp --tail=500
- name: Upload test results
if: always()
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5
with:
name: rag-evaluation-results
path: |
+1 -1
View File
@@ -35,7 +35,7 @@ jobs:
###### Required to build OIDC App ######
- name: Set up php 8.4
uses: shivammathur/setup-php@bf6b4fbd49ca58e4608c9c89fba0b8d90bd2a39f # v2
uses: shivammathur/setup-php@44454db4f0199b8b9685a5d763dc37cbf79108e1 # v2
with:
php-version: 8.4
coverage: none
+35
View File
@@ -1,3 +1,38 @@
## v0.48.4 (2025-11-23)
### Fix
- Add rate limit retry logic to OpenAI provider
## v0.48.3 (2025-11-23)
### Fix
- Increase MCP sampling timeout to 5 minutes for slower LLMs
## v0.48.2 (2025-11-23)
### Fix
- Share vector sync state with FastMCP session lifespan via module singleton
- Share vector sync state with FastMCP session lifespan via module singleton
## v0.48.1 (2025-11-23)
### Fix
- Use WebDAV for tag creation and add LLM-as-a-judge for RAG tests
### Refactor
- Move background tasks to server lifespan and deprecate SSE transport
## v0.48.0 (2025-11-23)
### Feat
- Add tag management methods to WebDAV client
## v0.47.0 (2025-11-23)
### Feat
+1 -1
View File
@@ -1,6 +1,6 @@
FROM docker.io/library/python:3.12-slim-trixie@sha256:b43ff04d5df04ad5cabb80890b7ef74e8410e3395b19af970dcd52d7a4bff921
COPY --from=ghcr.io/astral-sh/uv:0.9.11@sha256:5aa820129de0a600924f166aec9cb51613b15b68f1dcd2a02f31a500d2ede568 /uv /uvx /bin/
COPY --from=ghcr.io/astral-sh/uv:0.9.13@sha256:f07d1bf7b1fb4b983eed2b31320e25a2a76625bdf83d5ff0208fe105d4d8d2f5 /uv /uvx /bin/
# Install dependencies
# 1. git (required for caldav dependency from git)
+1 -1
View File
@@ -17,7 +17,7 @@ FROM docker.io/library/python:3.12-slim-trixie@sha256:b43ff04d5df04ad5cabb80890b
WORKDIR /app
# Install uv for fast dependency management
COPY --from=ghcr.io/astral-sh/uv:0.9.11@sha256:5aa820129de0a600924f166aec9cb51613b15b68f1dcd2a02f31a500d2ede568 /uv /uvx /bin/
COPY --from=ghcr.io/astral-sh/uv:0.9.13@sha256:f07d1bf7b1fb4b983eed2b31320e25a2a76625bdf83d5ff0208fe105d4d8d2f5 /uv /uvx /bin/
# Install dependencies
# 1. git (required for caldav dependency from git)
+3 -1
View File
@@ -1,11 +1,12 @@
```markdown
<p align="center">
<img src="astrolabe.svg" alt="Nextcloud MCP Server" width="128" height="128">
</p>
# Nextcloud MCP Server
[![Docker Image](https://img.shields.io/badge/docker-ghcr.io/cbcoutinho/nextcloud--mcp--server-blue)](https://github.com/cbcoutinho/nextcloud-mcp-server/pkgs/container/nextcloud-mcp-server)
[![smithery badge](https://smithery.ai/badge/@cbcoutinho/nextcloud-mcp-server)](https://smithery.ai/server/@cbcoutinho/nextcloud-mcp-server)
[![Docker Image](https://img.shields.io/badge/docker-ghcr.io/cbcoutinho/nextcloud--mcp--server-blue)](https://github.com/cbcoutinho/nextcloud-mcp-server/pkgs/container/nextcloud-mcp-server)
**A production-ready MCP server that connects AI assistants to your Nextcloud instance.**
@@ -223,3 +224,4 @@ This project is licensed under the AGPL-3.0 License. See [LICENSE](./LICENSE) fo
- [Model Context Protocol](https://github.com/modelcontextprotocol)
- [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk)
- [Nextcloud](https://nextcloud.com/)
```
+3 -3
View File
@@ -1,9 +1,9 @@
dependencies:
- name: qdrant
repository: https://qdrant.github.io/qdrant-helm
version: 1.16.0
version: 1.16.1
- name: ollama
repository: https://otwld.github.io/ollama-helm
version: 1.35.0
digest: sha256:da8db198b12ce0252df220fabb297cfe69186edb8e67952c52e05de778189b92
generated: "2025-11-21T11:09:07.997781541Z"
digest: sha256:b6889ef1eb8d339cbc046db8b39b0fca5df14aa7db4f800b8486db82e1df9e13
generated: "2025-11-26T17:04:46.314130537Z"
+3 -3
View File
@@ -2,8 +2,8 @@ apiVersion: v2
name: nextcloud-mcp-server
description: A Helm chart for Nextcloud MCP Server - enables AI assistants to interact with Nextcloud
type: application
version: 0.47.0
appVersion: "0.47.0"
version: 0.48.4
appVersion: "0.48.4"
keywords:
- nextcloud
- mcp
@@ -27,7 +27,7 @@ annotations:
grafana_dashboard_folder: "Nextcloud MCP"
dependencies:
- name: qdrant
version: "1.16.0"
version: "1.16.1"
repository: https://qdrant.github.io/qdrant-helm
condition: qdrant.networkMode.deploySubchart
- name: ollama
+25
View File
@@ -0,0 +1,25 @@
# CI-specific overrides for RAG evaluation pipeline
# This file is used by the rag-evaluation.yml workflow to configure the MCP
# container with OpenAI/GitHub Models API for vector embeddings.
#
# Usage:
# docker compose -f docker-compose.yml -f docker-compose.ci.yml up
#
# Environment variables (set in CI workflow):
# OPENAI_API_KEY - API key for embeddings (GitHub Models uses GITHUB_TOKEN)
# OPENAI_BASE_URL - API endpoint (e.g., https://models.github.ai/inference)
# OPENAI_EMBEDDING_MODEL - Model name (e.g., openai/text-embedding-3-small)
# OPENAI_GENERATION_MODEL - Model name for generation (e.g., openai/gpt-4o-mini)
services:
mcp:
environment:
# OpenAI provider configuration (required for CI vector sync)
- OPENAI_API_KEY=${OPENAI_API_KEY}
- OPENAI_BASE_URL=${OPENAI_BASE_URL:-https://models.github.ai/inference}
- OPENAI_EMBEDDING_MODEL=${OPENAI_EMBEDDING_MODEL:-openai/text-embedding-3-small}
- OPENAI_GENERATION_MODEL=${OPENAI_GENERATION_MODEL:-openai/gpt-4o-mini}
# Faster sync for CI
- VECTOR_SYNC_SCAN_INTERVAL=${VECTOR_SYNC_SCAN_INTERVAL:-5}
# Enable document processing for PDF parsing
- ENABLE_DOCUMENT_PROCESSING=true
+4 -4
View File
@@ -21,7 +21,7 @@ services:
restart: always
app:
image: docker.io/library/nextcloud:32.0.2@sha256:ac08482d73ffd85d94069ba291bbd5fb39a70ff21502030a2e3e2d89a7246a48
image: docker.io/library/nextcloud:32.0.2@sha256:8cb1dc8c26944115469dd22f4965d2ed35bab9cf8c48d2bb052c8e9f83821ded
restart: always
ports:
- 0.0.0.0:8080:80
@@ -34,7 +34,7 @@ services:
- ./app-hooks:/docker-entrypoint-hooks.d:ro
# Mount OIDC development directory outside /var/www/html to avoid rsync conflicts
# The post-installation hook will register /opt/apps as an additional app directory
- ./third_party:/opt/apps:ro
#- ./third_party:/opt/apps:ro
environment:
- NEXTCLOUD_TRUSTED_DOMAINS=app
- NEXTCLOUD_ADMIN_USER=admin
@@ -158,7 +158,7 @@ services:
- oauth-tokens:/app/data
keycloak:
image: quay.io/keycloak/keycloak:26.4.5@sha256:653852bfdea2be6e958b9e90a976eff1c6de34edd55f2f679bdc48ef16bc528e
image: quay.io/keycloak/keycloak:26.4.6@sha256:d0d4037f17521a7f06137afd5a0eecb1f977f4ade773ae7755f1ee82cad8a576
command:
- "start-dev"
- "--import-realm"
@@ -245,7 +245,7 @@ services:
- smithery
qdrant:
image: qdrant/qdrant:v1.16.0@sha256:1005201498cf927d835383d0f918b17d8c9da7db58550f169f694455e42d78f4
image: qdrant/qdrant:v1.16.1@sha256:db1c735496dfa982ef27576a17b624e48e6b46a140bcdc2ac34e39d186204ef5
restart: always
ports:
- 127.0.0.1:6333:6333 # REST API
+188 -243
View File
@@ -243,6 +243,25 @@ def validate_pkce_support(discovery: dict, discovery_url: str) -> None:
click.echo(f"✓ PKCE support validated: {code_challenge_methods}")
@dataclass
class VectorSyncState:
"""
Module-level state for vector sync background tasks.
This singleton bridges the Starlette server lifespan (where background tasks run)
and FastMCP session lifespans (where MCP tools need access to the streams).
"""
document_send_stream: Optional[MemoryObjectSendStream] = None
document_receive_stream: Optional[MemoryObjectReceiveStream] = None
shutdown_event: Optional[anyio.Event] = None
scanner_wake_event: Optional[anyio.Event] = None
# Module-level singleton for vector sync state
_vector_sync_state = VectorSyncState()
@dataclass
class AppContext:
"""Application context for BasicAuth mode."""
@@ -555,15 +574,15 @@ async def load_oauth_client_credentials(
@asynccontextmanager
async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
"""
Manage application lifecycle for BasicAuth mode.
Manage application lifecycle for BasicAuth mode (FastMCP session lifespan).
Creates a single Nextcloud client with basic authentication
that is shared across all requests.
that is shared across all requests within a session.
If vector sync is enabled (VECTOR_SYNC_ENABLED=true), also starts
background tasks for automatic document indexing (ADR-007).
Note: Background tasks (scanner, processor) are started at server level
in starlette_lifespan, not here. This lifespan runs per-session.
"""
logger.info("Starting MCP server in BasicAuth mode")
logger.info("Starting MCP session in BasicAuth mode")
logger.info("Creating Nextcloud client with BasicAuth")
client = NextcloudClient.from_env()
@@ -579,91 +598,20 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
# Initialize document processors
initialize_document_processors()
settings = get_settings()
# Check if vector sync is enabled
if settings.vector_sync_enabled:
logger.info("Vector sync enabled - starting background tasks")
# Get username from environment for BasicAuth mode
username = os.getenv("NEXTCLOUD_USERNAME")
if not username:
raise ValueError(
"NEXTCLOUD_USERNAME is required for vector sync in BasicAuth mode"
)
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
# Initialize shared state
send_stream, receive_stream = anyio.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
# Yield client context - scanner runs at server level (starlette_lifespan)
# Include vector sync state from module singleton (set by starlette_lifespan)
try:
yield AppContext(
client=client,
storage=storage,
document_send_stream=_vector_sync_state.document_send_stream,
document_receive_stream=_vector_sync_state.document_receive_stream,
shutdown_event=_vector_sync_state.shutdown_event,
scanner_wake_event=_vector_sync_state.scanner_wake_event,
)
shutdown_event = anyio.Event()
scanner_wake_event = anyio.Event()
# Start background tasks using anyio TaskGroup
async with anyio.create_task_group() as tg:
# Start scanner task
await tg.start(
scanner_task,
send_stream,
shutdown_event,
scanner_wake_event,
client,
username,
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
processor_task,
i,
receive_stream.clone(),
shutdown_event,
client,
username,
)
logger.info(
f"Background sync tasks started: 1 scanner + {settings.vector_sync_processor_workers} processors"
)
# Yield with background tasks running
try:
yield AppContext(
client=client,
storage=storage,
document_send_stream=send_stream,
document_receive_stream=receive_stream,
shutdown_event=shutdown_event,
scanner_wake_event=scanner_wake_event,
)
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
# TaskGroup automatically cancels all tasks on exit
logger.info("Background sync tasks stopped")
await client.close()
else:
# No vector sync - simple lifecycle
try:
yield AppContext(client=client, storage=storage)
finally:
logger.info("Shutting down BasicAuth mode")
await client.close()
finally:
logger.info("Shutting down BasicAuth session")
await client.close()
async def setup_oauth_config():
@@ -979,7 +927,7 @@ async def setup_oauth_config():
)
def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = None):
# Initialize observability (logging will be configured by uvicorn)
settings = get_settings()
@@ -1197,180 +1145,177 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
"Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)"
)
if transport == "sse":
mcp_app = mcp.sse_app()
starlette_lifespan = None
elif transport in ("http", "streamable-http"):
mcp_app = mcp.streamable_http_app()
mcp_app = mcp.streamable_http_app()
@asynccontextmanager
async def starlette_lifespan(app: Starlette):
# Set OAuth context for OAuth login routes (ADR-004)
if oauth_enabled:
# Prepare OAuth config from setup_oauth_config closure variables
mcp_server_url = os.getenv(
"NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000"
)
nextcloud_resource_uri = os.getenv(
"NEXTCLOUD_RESOURCE_URI", nextcloud_host
)
discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{nextcloud_host}/.well-known/openid-configuration",
)
scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "")
@asynccontextmanager
async def starlette_lifespan(app: Starlette):
# Set OAuth context for OAuth login routes (ADR-004)
if oauth_enabled:
# Prepare OAuth config from setup_oauth_config closure variables
mcp_server_url = os.getenv(
"NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000"
)
nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host)
discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{nextcloud_host}/.well-known/openid-configuration",
)
scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "")
oauth_context_dict = {
"storage": refresh_token_storage,
"oauth_client": oauth_client,
"token_verifier": token_verifier, # For querying IdP userinfo endpoint
"config": {
"mcp_server_url": mcp_server_url,
"discovery_url": discovery_url,
"client_id": client_id, # From setup_oauth_config (DCR or static)
"client_secret": client_secret, # From setup_oauth_config (DCR or static)
"scopes": scopes,
"nextcloud_host": nextcloud_host,
"nextcloud_resource_uri": nextcloud_resource_uri,
"oauth_provider": oauth_provider,
},
}
app.state.oauth_context = oauth_context_dict
oauth_context_dict = {
"storage": refresh_token_storage,
"oauth_client": oauth_client,
"token_verifier": token_verifier, # For querying IdP userinfo endpoint
"config": {
"mcp_server_url": mcp_server_url,
"discovery_url": discovery_url,
"client_id": client_id, # From setup_oauth_config (DCR or static)
"client_secret": client_secret, # From setup_oauth_config (DCR or static)
"scopes": scopes,
"nextcloud_host": nextcloud_host,
"nextcloud_resource_uri": nextcloud_resource_uri,
"oauth_provider": oauth_provider,
},
}
app.state.oauth_context = oauth_context_dict
# Also set oauth_context on browser_app for session authentication
# browser_app is in the same function scope (defined later in create_app)
# We need to find it in the mounted routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.oauth_context = oauth_context_dict
logger.info(
"OAuth context shared with browser_app for session auth"
)
break
logger.info(
f"OAuth context initialized for login routes (client_id={client_id[:16]}...)"
)
else:
# BasicAuth mode - share storage with browser_app for webhook management
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = RefreshTokenStorage.from_env()
await storage.initialize()
app.state.storage = storage
# Also share with browser_app for webhook routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.storage = storage
logger.info(
"Storage shared with browser_app for webhook management"
)
break
# Start background vector sync tasks for BasicAuth mode (ADR-007)
# For streamable-http transport, FastMCP lifespan isn't automatically triggered
# so we manually start background tasks here if vector sync is enabled
import anyio as anyio_module
settings = get_settings()
if not oauth_enabled and settings.vector_sync_enabled:
logger.info("Starting background vector sync tasks for BasicAuth mode")
# Get username from environment
username = os.getenv("NEXTCLOUD_USERNAME")
if not username:
raise ValueError(
"NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode"
# Also set oauth_context on browser_app for session authentication
# browser_app is in the same function scope (defined later in create_app)
# We need to find it in the mounted routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.oauth_context = oauth_context_dict
logger.info(
"OAuth context shared with browser_app for session auth"
)
break
# Get Nextcloud client from MCP app context
# Create client since we're outside FastMCP lifespan
client = NextcloudClient.from_env()
logger.info(
f"OAuth context initialized for login routes (client_id={client_id[:16]}...)"
)
else:
# BasicAuth mode - share storage with browser_app for webhook management
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
storage = RefreshTokenStorage.from_env()
await storage.initialize()
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
app.state.storage = storage
# Initialize shared state
send_stream, receive_stream = anyio_module.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
# Also share with browser_app for webhook routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.storage = storage
logger.info(
"Storage shared with browser_app for webhook management"
)
break
# Start background vector sync tasks for BasicAuth mode (ADR-007)
# Scanner runs at server-level (once), not per-session
import anyio as anyio_module
settings = get_settings()
if not oauth_enabled and settings.vector_sync_enabled:
logger.info("Starting background vector sync tasks for BasicAuth mode")
# Get username from environment
username = os.getenv("NEXTCLOUD_USERNAME")
if not username:
raise ValueError(
"NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode"
)
shutdown_event = anyio_module.Event()
scanner_wake_event = anyio_module.Event()
# Store in app state for access from routes (ADR-007)
app.state.document_send_stream = send_stream
app.state.document_receive_stream = receive_stream
app.state.shutdown_event = shutdown_event
app.state.scanner_wake_event = scanner_wake_event
# Create client for vector sync (server-level, not per-session)
client = NextcloudClient.from_env()
# Also share with browser_app for /app route
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.document_send_stream = send_stream
route.app.state.document_receive_stream = receive_stream
route.app.state.shutdown_event = shutdown_event
route.app.state.scanner_wake_event = scanner_wake_event
logger.info(
"Vector sync state shared with browser_app for /app"
)
break
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
# Start background tasks using anyio TaskGroup
async with anyio_module.create_task_group() as tg:
# Start scanner task
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
# Initialize shared state
send_stream, receive_stream = anyio_module.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
)
shutdown_event = anyio_module.Event()
scanner_wake_event = anyio_module.Event()
# Store in app state for access from routes (ADR-007)
app.state.document_send_stream = send_stream
app.state.document_receive_stream = receive_stream
app.state.shutdown_event = shutdown_event
app.state.scanner_wake_event = scanner_wake_event
# Also store in module singleton for FastMCP session lifespans
_vector_sync_state.document_send_stream = send_stream
_vector_sync_state.document_receive_stream = receive_stream
_vector_sync_state.shutdown_event = shutdown_event
_vector_sync_state.scanner_wake_event = scanner_wake_event
logger.info("Vector sync state stored in module singleton")
# Also share with browser_app for /app route
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.document_send_stream = send_stream
route.app.state.document_receive_stream = receive_stream
route.app.state.shutdown_event = shutdown_event
route.app.state.scanner_wake_event = scanner_wake_event
logger.info("Vector sync state shared with browser_app for /app")
break
# Start background tasks using anyio TaskGroup
async with anyio_module.create_task_group() as tg:
# Start scanner task
await tg.start(
scanner_task,
send_stream,
shutdown_event,
scanner_wake_event,
client,
username,
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
scanner_task,
send_stream,
processor_task,
i,
receive_stream.clone(),
shutdown_event,
scanner_wake_event,
client,
username,
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
processor_task,
i,
receive_stream.clone(),
shutdown_event,
client,
username,
)
logger.info(
f"Background sync tasks started: 1 scanner + "
f"{settings.vector_sync_processor_workers} processors"
)
logger.info(
f"Background sync tasks started: 1 scanner + "
f"{settings.vector_sync_processor_workers} processors"
)
# Run MCP session manager and yield
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
try:
yield
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
await client.close()
# TaskGroup automatically cancels all tasks on exit
else:
# No vector sync - just run MCP session manager
# Run MCP session manager and yield
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
yield
try:
yield
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
await client.close()
# TaskGroup automatically cancels all tasks on exit
else:
# No vector sync - just run MCP session manager
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
yield
# Health check endpoints for Kubernetes probes
def health_live(request):
+74 -37
View File
@@ -22,6 +22,7 @@ from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.search import (
BM25HybridSearchAlgorithm,
SemanticSearchAlgorithm,
@@ -139,7 +140,10 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
_get_authenticated_client_for_userinfo,
)
async with await _get_authenticated_client_for_userinfo(request) as nc_client: # noqa: F841
with trace_operation("vector_viz.get_auth_client"):
auth_client_ctx = await _get_authenticated_client_for_userinfo(request)
async with auth_client_ctx as nc_client: # noqa: F841
# Create search algorithm (no client needed - verification removed)
if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
@@ -159,24 +163,40 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
all_results = []
if doc_types is None or len(doc_types) == 0:
# Cross-app search - search all indexed types
unverified_results = await search_algo.search(
query=query,
user_id=username,
limit=limit * 2, # Buffer for verification filtering
doc_type=None, # Search all types
score_threshold=score_threshold,
)
all_results.extend(unverified_results)
else:
# Search each document type and combine
for doc_type in doc_types:
with trace_operation(
"vector_viz.search_execute",
attributes={
"search.algorithm": algorithm,
"search.limit": limit * 2,
"search.doc_type": "all",
},
):
unverified_results = await search_algo.search(
query=query,
user_id=username,
limit=limit * 2, # Buffer for verification filtering
doc_type=doc_type,
doc_type=None, # Search all types
score_threshold=score_threshold,
)
all_results.extend(unverified_results)
else:
# Search each document type and combine
for doc_type in doc_types:
with trace_operation(
"vector_viz.search_execute",
attributes={
"search.algorithm": algorithm,
"search.limit": limit * 2,
"search.doc_type": doc_type,
},
):
unverified_results = await search_algo.search(
query=query,
user_id=username,
limit=limit * 2, # Buffer for verification filtering
doc_type=doc_type,
score_threshold=score_threshold,
)
all_results.extend(unverified_results)
# Sort by score before verification
all_results.sort(key=lambda r: r.score, reverse=True)
@@ -190,22 +210,26 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Store original scores and normalize for visualization
# (best result = 1.0, worst result = 0.0 within THIS result set)
# This makes visual encoding meaningful regardless of RRF normalization
if search_results:
scores = [r.score for r in search_results]
min_score, max_score = min(scores), max(scores)
score_range = max_score - min_score if max_score > min_score else 1.0
with trace_operation(
"vector_viz.score_normalize",
attributes={"normalize.num_results": len(search_results)},
):
if search_results:
scores = [r.score for r in search_results]
min_score, max_score = min(scores), max(scores)
score_range = max_score - min_score if max_score > min_score else 1.0
logger.info(
f"Normalizing scores for viz: original range [{min_score:.3f}, {max_score:.3f}] "
f"→ [0.0, 1.0]"
)
logger.info(
f"Normalizing scores for viz: original range [{min_score:.3f}, {max_score:.3f}] "
f"→ [0.0, 1.0]"
)
# Store original score and rescale to 0-1 for visualization
for r in search_results:
# Store original score before normalization
r.original_score = r.score
# Rescale for visual encoding
r.score = (r.score - min_score) / score_range
# Store original score and rescale to 0-1 for visualization
for r in search_results:
# Store original score before normalization
r.original_score = r.score
# Rescale for visual encoding
r.score = (r.score - min_score) / score_range
if not search_results:
return JSONResponse(
@@ -220,7 +244,9 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Fetch vectors for specific matching chunks from Qdrant using batch retrieve
vector_fetch_start = time.perf_counter()
qdrant_client = await get_qdrant_client()
with trace_operation("vector_viz.get_qdrant_client"):
qdrant_client = await get_qdrant_client()
chunk_vectors_map = {} # Map (doc_id, chunk_start, chunk_end) -> vector
@@ -231,12 +257,16 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
if point_ids:
# Single batch retrieve call instead of N sequential scroll calls
# This is ~50x faster for 50 results (1 HTTP request vs 50)
points_response = await qdrant_client.retrieve(
collection_name=settings.get_collection_name(),
ids=point_ids,
with_vectors=["dense"],
with_payload=["doc_id", "chunk_start_offset", "chunk_end_offset"],
)
with trace_operation(
"vector_viz.vector_retrieve",
attributes={"retrieve.num_points": len(point_ids)},
):
points_response = await qdrant_client.retrieve(
collection_name=settings.get_collection_name(),
ids=point_ids,
with_vectors=["dense"],
with_payload=["doc_id", "chunk_start_offset", "chunk_end_offset"],
)
# Build chunk_vectors_map from batch response
for point in points_response:
@@ -367,9 +397,16 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
import anyio
coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: _compute_pca(all_vectors_normalized)
)
with trace_operation(
"vector_viz.pca_compute",
attributes={
"pca.num_vectors": len(all_vectors_normalized),
"pca.embedding_dim": embedding_dim,
},
):
coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: _compute_pca(all_vectors_normalized)
)
pca_duration = time.perf_counter() - pca_start
# After fit, these attributes are guaranteed to be set
+2 -2
View File
@@ -29,9 +29,9 @@ from .app import get_app
@click.option(
"--transport",
"-t",
default="sse",
default="streamable-http",
show_default=True,
type=click.Choice(["sse", "streamable-http", "http"]),
type=click.Choice(["streamable-http", "http"]),
help="MCP transport protocol",
)
@click.option(
+234
View File
@@ -1295,3 +1295,237 @@ class WebDAVClient(BaseNextcloudClient):
logger.debug(f"Found {len(files)} files with tag ID {tag_id}")
return files
async def get_file_info(self, path: str) -> dict[str, Any] | None:
"""Get file info including file ID via WebDAV PROPFIND.
Args:
path: Path to the file (relative to user's files directory)
Returns:
File info dictionary with id, name, size, content_type, etc.
Returns None if file not found.
"""
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
propfind_body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:prop>
<oc:fileid/>
<d:displayname/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getlastmodified/>
<d:getetag/>
<d:resourcetype/>
</d:prop>
</d:propfind>"""
try:
response = await self._client.request(
"PROPFIND",
webdav_path,
headers={"Depth": "0"},
content=propfind_body,
)
response.raise_for_status()
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"File not found: {path}")
return None
raise
# Parse XML response
root = ET.fromstring(response.content)
ns = {
"d": "DAV:",
"oc": "http://owncloud.org/ns",
}
response_elem = root.find("d:response", ns)
if response_elem is None:
return None
propstat = response_elem.find("d:propstat", ns)
if propstat is None:
return None
prop = propstat.find("d:prop", ns)
if prop is None:
return None
# Extract properties
fileid_elem = prop.find("oc:fileid", ns)
displayname_elem = prop.find("d:displayname", ns)
contentlength_elem = prop.find("d:getcontentlength", ns)
contenttype_elem = prop.find("d:getcontenttype", ns)
lastmodified_elem = prop.find("d:getlastmodified", ns)
etag_elem = prop.find("d:getetag", ns)
resourcetype_elem = prop.find("d:resourcetype", ns)
is_directory = (
resourcetype_elem is not None
and resourcetype_elem.find("d:collection", ns) is not None
)
file_info = {
"id": int(fileid_elem.text) if fileid_elem is not None else None,
"path": path,
"name": displayname_elem.text
if displayname_elem is not None
else path.split("/")[-1],
"size": int(contentlength_elem.text)
if contentlength_elem is not None and contentlength_elem.text
else 0,
"content_type": contenttype_elem.text
if contenttype_elem is not None
else "",
"last_modified": lastmodified_elem.text
if lastmodified_elem is not None
else None,
"etag": etag_elem.text.strip('"')
if etag_elem is not None and etag_elem.text
else None,
"is_directory": is_directory,
}
logger.debug(f"Got file info for '{path}': id={file_info['id']}")
return file_info
async def create_tag(
self,
name: str,
user_visible: bool = True,
user_assignable: bool = True,
) -> dict[str, Any]:
"""Create a system tag via WebDAV.
Args:
name: Name of the tag to create
user_visible: Whether the tag is visible to users
user_assignable: Whether users can assign this tag
Returns:
Tag dictionary with id, name, userVisible, userAssignable
Raises:
HTTPStatusError: If tag creation fails (409 if already exists)
"""
# Use WebDAV POST with JSON body to create tag
response = await self._client.post(
"/remote.php/dav/systemtags/",
headers={"Content-Type": "application/json"},
json={
"name": name,
"userVisible": user_visible,
"userAssignable": user_assignable,
},
)
response.raise_for_status()
# Extract tag ID from Content-Location header (e.g., /remote.php/dav/systemtags/42)
content_location = response.headers.get("Content-Location", "")
tag_id = None
if content_location:
# Extract the numeric ID from the path
try:
tag_id = int(content_location.rstrip("/").split("/")[-1])
except (ValueError, IndexError):
pass
tag_info = {
"id": tag_id,
"name": name,
"userVisible": user_visible,
"userAssignable": user_assignable,
}
logger.info(f"Created tag '{name}' with ID {tag_info['id']}")
return tag_info
async def get_or_create_tag(
self,
name: str,
user_visible: bool = True,
user_assignable: bool = True,
) -> dict[str, Any]:
"""Get a tag by name, creating it if it doesn't exist.
Args:
name: Name of the tag
user_visible: Whether the tag is visible to users (for creation)
user_assignable: Whether users can assign this tag (for creation)
Returns:
Tag dictionary with id, name, userVisible, userAssignable
"""
# First try to get existing tag
existing_tag = await self.get_tag_by_name(name)
if existing_tag:
logger.debug(f"Tag '{name}' already exists with ID {existing_tag['id']}")
return existing_tag
# Create new tag
try:
return await self.create_tag(name, user_visible, user_assignable)
except HTTPStatusError as e:
if e.response.status_code == 409:
# Tag was created between our check and creation, fetch it
existing_tag = await self.get_tag_by_name(name)
if existing_tag:
return existing_tag
raise
async def assign_tag_to_file(self, file_id: int, tag_id: int) -> bool:
"""Assign a system tag to a file.
Args:
file_id: Numeric file ID
tag_id: Numeric tag ID
Returns:
True if tag was assigned successfully (or already assigned)
Raises:
HTTPStatusError: If tag assignment fails
"""
response = await self._client.request(
"PUT",
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
headers={"Content-Length": "0"},
content=b"",
)
# 201 = Created (new assignment), 409 = Conflict (already assigned)
if response.status_code in (201, 409):
logger.info(f"Tagged file {file_id} with tag {tag_id}")
return True
response.raise_for_status()
return True
async def remove_tag_from_file(self, file_id: int, tag_id: int) -> bool:
"""Remove a system tag from a file.
Args:
file_id: Numeric file ID
tag_id: Numeric tag ID
Returns:
True if tag was removed successfully (or wasn't assigned)
Raises:
HTTPStatusError: If tag removal fails
"""
response = await self._client.request(
"DELETE",
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
)
# 204 = No Content (removed), 404 = Not Found (wasn't assigned)
if response.status_code in (204, 404):
logger.info(f"Removed tag {tag_id} from file {file_id}")
return True
response.raise_for_status()
return True
+6 -4
View File
@@ -17,18 +17,20 @@ class AnthropicProvider(Provider):
Note: Anthropic doesn't provide embedding models, only text generation.
"""
def __init__(self, api_key: str, model: str = "claude-3-5-sonnet-20241022"):
def __init__(
self, api_key: str, generation_model: str = "claude-3-5-sonnet-20241022"
):
"""
Initialize Anthropic provider.
Args:
api_key: Anthropic API key
model: Model name (e.g., "claude-3-5-sonnet-20241022")
generation_model: Model name (e.g., "claude-3-5-sonnet-20241022")
"""
self.client = AsyncAnthropic(api_key=api_key)
self.model = model
self.model = generation_model
logger.info(f"Initialized Anthropic provider (model={model})")
logger.info(f"Initialized Anthropic provider (model={self.model})")
@property
def supports_embeddings(self) -> bool:
+51 -9
View File
@@ -7,13 +7,48 @@ Supports:
"""
import logging
from functools import wraps
from openai import AsyncOpenAI
import anyio
from openai import AsyncOpenAI, RateLimitError
from .base import Provider
logger = logging.getLogger(__name__)
# Rate limit retry configuration
MAX_RETRIES = 5
INITIAL_RETRY_DELAY = 2.0 # seconds
MAX_RETRY_DELAY = 60.0 # seconds
def retry_on_rate_limit(func):
"""Decorator to retry on OpenAI rate limit errors with exponential backoff."""
@wraps(func)
async def wrapper(*args, **kwargs):
retry_delay = INITIAL_RETRY_DELAY
last_error: Exception | None = None
for attempt in range(1, MAX_RETRIES + 1):
try:
return await func(*args, **kwargs)
except RateLimitError as e:
last_error = e
if attempt < MAX_RETRIES:
logger.warning(
f"Rate limit hit (attempt {attempt}/{MAX_RETRIES}), "
f"retrying in {retry_delay:.1f}s..."
)
await anyio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY)
logger.error(f"Rate limit exceeded after {MAX_RETRIES} attempts")
raise last_error # type: ignore[misc]
return wrapper
# Well-known embedding dimensions for OpenAI models
OPENAI_EMBEDDING_DIMENSIONS: dict[str, int] = {
"text-embedding-3-small": 1536,
@@ -86,6 +121,7 @@ class OpenAIProvider(Provider):
"""Whether this provider supports text generation."""
return self.generation_model is not None
@retry_on_rate_limit
async def embed(self, text: str) -> list[float]:
"""
Generate embedding vector for text.
@@ -151,14 +187,8 @@ class OpenAIProvider(Provider):
for i in range(0, len(texts), batch_size):
batch = texts[i : i + batch_size]
response = await self.client.embeddings.create(
input=batch,
model=self.embedding_model,
)
# Sort by index to maintain order
sorted_data = sorted(response.data, key=lambda x: x.index)
batch_embeddings = [item.embedding for item in sorted_data]
# Use helper method with retry logic for each batch
batch_embeddings = await self._embed_batch_request(batch)
all_embeddings.extend(batch_embeddings)
# Update dimension if not set
@@ -171,6 +201,17 @@ class OpenAIProvider(Provider):
return all_embeddings
@retry_on_rate_limit
async def _embed_batch_request(self, batch: list[str]) -> list[list[float]]:
"""Make a single batch embedding request with retry logic."""
response = await self.client.embeddings.create(
input=batch,
model=self.embedding_model,
)
# Sort by index to maintain order
sorted_data = sorted(response.data, key=lambda x: x.index)
return [item.embedding for item in sorted_data]
def get_dimension(self) -> int:
"""
Get embedding dimension.
@@ -194,6 +235,7 @@ class OpenAIProvider(Provider):
)
return self._dimension
@retry_on_rate_limit
async def generate(self, prompt: str, max_tokens: int = 500) -> str:
"""
Generate text from a prompt.
+82 -67
View File
@@ -9,6 +9,7 @@ from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
@@ -99,15 +100,19 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
)
# Generate dense embedding for semantic search
embedding_service = get_embedding_service()
dense_embedding = await embedding_service.embed(query)
with trace_operation("search.get_embedding_service"):
embedding_service = get_embedding_service()
with trace_operation("search.dense_embedding"):
dense_embedding = await embedding_service.embed(query)
# Store for reuse by callers (e.g., viz_routes PCA visualization)
self.query_embedding = dense_embedding
logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})")
# Generate sparse embedding for BM25 keyword search
bm25_service = get_bm25_service()
sparse_embedding = await bm25_service.encode_async(query)
with trace_operation("search.get_bm25_service"):
bm25_service = get_bm25_service()
with trace_operation("search.sparse_embedding_bm25"):
sparse_embedding = await bm25_service.encode_async(query)
logger.debug(
f"Generated sparse embedding "
f"({len(sparse_embedding['indices'])} non-zero terms)"
@@ -134,38 +139,44 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
query_filter = Filter(must=filter_conditions)
# Execute hybrid search with Qdrant native RRF fusion
qdrant_client = await get_qdrant_client()
with trace_operation("search.get_qdrant_client"):
qdrant_client = await get_qdrant_client()
try:
# Use prefetch to run both dense and sparse searches
# Qdrant will automatically merge results using RRF
search_response = await qdrant_client.query_points(
collection_name=settings.get_collection_name(),
prefetch=[
# Dense semantic search
models.Prefetch(
query=dense_embedding,
using="dense",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
# Sparse BM25 search
models.Prefetch(
query=models.SparseVector(
indices=sparse_embedding["indices"],
values=sparse_embedding["values"],
with trace_operation(
"search.qdrant_query",
attributes={"query.limit": limit * 2, "query.fusion": self.fusion_name},
):
search_response = await qdrant_client.query_points(
collection_name=settings.get_collection_name(),
prefetch=[
# Dense semantic search
models.Prefetch(
query=dense_embedding,
using="dense",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
using="sparse",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
],
# Fusion query (RRF or DBSF based on initialization)
query=models.FusionQuery(fusion=self.fusion),
limit=limit * 2, # Get extra for deduplication
score_threshold=score_threshold,
with_payload=True,
with_vectors=False, # Don't return vectors to save bandwidth
)
# Sparse BM25 search
models.Prefetch(
query=models.SparseVector(
indices=sparse_embedding["indices"],
values=sparse_embedding["values"],
),
using="sparse",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
],
# Fusion query (RRF or DBSF based on initialization)
query=models.FusionQuery(fusion=self.fusion),
limit=limit * 2, # Get extra for deduplication
score_threshold=score_threshold,
with_payload=True,
with_vectors=False, # Don't return vectors to save bandwidth
)
record_qdrant_operation("search", "success")
except Exception:
record_qdrant_operation("search", "error")
@@ -185,47 +196,51 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
# Deduplicate by (doc_id, doc_type, chunk_start, chunk_end)
# This allows multiple chunks from same doc, but removes duplicate chunks
seen_chunks = set()
results = []
with trace_operation(
"search.deduplicate",
attributes={"dedupe.num_points": len(search_response.points)},
):
seen_chunks = set()
results = []
for result in search_response.points:
# doc_id can be int (notes) or str (files - file paths)
doc_id = result.payload["doc_id"]
doc_type = result.payload.get("doc_type", "note")
chunk_start = result.payload.get("chunk_start_offset")
chunk_end = result.payload.get("chunk_end_offset")
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
for result in search_response.points:
# doc_id can be int (notes) or str (files - file paths)
doc_id = result.payload["doc_id"]
doc_type = result.payload.get("doc_type", "note")
chunk_start = result.payload.get("chunk_start_offset")
chunk_end = result.payload.get("chunk_end_offset")
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
# Skip if we've already seen this exact chunk
if chunk_key in seen_chunks:
continue
# Skip if we've already seen this exact chunk
if chunk_key in seen_chunks:
continue
seen_chunks.add(chunk_key)
seen_chunks.add(chunk_key)
# Return unverified results (verification happens at output stage)
results.append(
SearchResult(
id=doc_id,
doc_type=doc_type,
title=result.payload.get("title", "Untitled"),
excerpt=result.payload.get("excerpt", ""),
score=result.score, # Fusion score (RRF or DBSF)
metadata={
"chunk_index": result.payload.get("chunk_index"),
"total_chunks": result.payload.get("total_chunks"),
"search_method": f"bm25_hybrid_{self.fusion_name}",
},
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval
# Return unverified results (verification happens at output stage)
results.append(
SearchResult(
id=doc_id,
doc_type=doc_type,
title=result.payload.get("title", "Untitled"),
excerpt=result.payload.get("excerpt", ""),
score=result.score, # Fusion score (RRF or DBSF)
metadata={
"chunk_index": result.payload.get("chunk_index"),
"total_chunks": result.payload.get("total_chunks"),
"search_method": f"bm25_hybrid_{self.fusion_name}",
},
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval
)
)
)
if len(results) >= limit:
break
if len(results) >= limit:
break
logger.info(f"Returning {len(results)} unverified results after deduplication")
if results:
+14 -5
View File
@@ -499,9 +499,11 @@ def configure_semantic_tools(mcp: FastMCP):
)
# 6. Request LLM completion via MCP sampling with timeout
# Note: 5 minute timeout to accommodate slower local LLMs (e.g., Ollama)
sampling_timeout_seconds = 300
try:
with anyio.fail_after(30):
with anyio.fail_after(sampling_timeout_seconds):
sampling_result = await ctx.session.create_message(
messages=[
SamplingMessage(
@@ -548,14 +550,14 @@ def configure_semantic_tools(mcp: FastMCP):
except TimeoutError:
logger.warning(
f"Sampling request timed out after 30 seconds for query: '{query}', "
f"Sampling request timed out after {sampling_timeout_seconds} seconds for query: '{query}', "
f"returning search results only"
)
return SamplingSearchResponse(
query=query,
generated_answer=(
f"[Sampling request timed out]\n\n"
f"The answer generation took too long (>30s). "
f"The answer generation took too long (>{sampling_timeout_seconds}s). "
f"Found {len(accessible_results)} relevant documents. "
f"Please review the sources below or try a simpler query."
),
@@ -675,15 +677,22 @@ def configure_semantic_tools(mcp: FastMCP):
# Get Qdrant client and query indexed count
indexed_count = 0
try:
from qdrant_client.models import Filter
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.vector.placeholder import (
get_placeholder_filter,
)
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
settings = get_settings()
qdrant_client = await get_qdrant_client()
# Count documents in collection
# Count documents in collection, excluding placeholders
# Placeholders are zero-vector points used to track processing state
count_result = await qdrant_client.count(
collection_name=settings.get_collection_name()
collection_name=settings.get_collection_name(),
count_filter=Filter(must=[get_placeholder_filter()]),
)
indexed_count = count_result.count
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.47.0"
version = "0.48.4"
description = "Model Context Protocol (MCP) server for Nextcloud integration - enables AI assistants to interact with Nextcloud data"
authors = [
{name = "Chris Coutinho", email = "chris@coutinho.io"}
+1 -55
View File
@@ -9,7 +9,6 @@ import pytest
from httpx import HTTPStatusError
from mcp import ClientSession
from mcp.client.session import RequestContext
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import ElicitRequestParams, ElicitResult, ErrorData
@@ -172,51 +171,6 @@ async def create_mcp_client_session(
logger.debug(f"{client_name} client session cleaned up successfully")
async def create_mcp_client_session_sse(
url: str,
token: str | None = None,
client_name: str = "MCP",
elicitation_callback: Any = None,
) -> AsyncGenerator[ClientSession, Any]:
"""
Factory function to create an MCP client session using SSE transport.
Similar to create_mcp_client_session but uses SSE transport instead of streamable-http.
Uses native async context managers to ensure correct LIFO cleanup order.
Args:
url: MCP server URL (e.g., "http://localhost:8000/sse")
token: Optional OAuth access token for Bearer authentication
client_name: Client name for logging (e.g., "Basic MCP (SSE)")
elicitation_callback: Optional callback for handling elicitation requests
Yields:
Initialized MCP ClientSession
Note:
SSE transport is being deprecated in favor of streamable-http.
This function exists for compatibility testing only.
"""
logger.info(f"Creating SSE client for {client_name}")
# Prepare headers with OAuth token if provided
headers = {"Authorization": f"Bearer {token}"} if token else None
# Use native async with - Python ensures LIFO cleanup
# Cleanup order will be: ClientSession.__aexit__ -> sse_client.__aexit__
# Note: sse_client yields only (read_stream, write_stream), not 3 values like streamablehttp_client
async with sse_client(url, headers=headers) as (read_stream, write_stream):
async with ClientSession(
read_stream, write_stream, elicitation_callback=elicitation_callback
) as session:
await session.initialize()
logger.info(f"{client_name} client session initialized successfully")
yield session
# Cleanup happens automatically in LIFO order - no exception suppression needed
logger.debug(f"{client_name} client session cleaned up successfully")
@pytest.fixture(scope="session")
async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]:
"""
@@ -255,18 +209,10 @@ async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]:
@pytest.fixture(scope="session")
async def nc_mcp_client(anyio_backend) -> AsyncGenerator[ClientSession, Any]:
"""
Fixture to create an MCP client session for integration tests using SSE transport.
Fixture to create an MCP client session for integration tests using streamable-http.
Uses anyio pytest plugin for proper async fixture handling.
Note: SSE transport is being deprecated. This fixture uses SSE for compatibility testing.
"""
# async for session in create_mcp_client_session_sse(
# url="http://localhost:8000/sse", client_name="Basic MCP (SSE)"
# ):
# yield session
async for session in create_mcp_client_session(
url="http://localhost:8000/mcp",
client_name="Basic MCP (HTTP)",
+26
View File
@@ -0,0 +1,26 @@
"""Pytest configuration for integration tests.
This conftest.py provides hooks and fixtures specific to integration tests,
including the --provider flag for RAG tests.
"""
# Valid provider names
VALID_PROVIDERS = ["openai", "ollama", "anthropic", "bedrock"]
def pytest_addoption(parser):
"""Add --provider command line option for RAG tests."""
parser.addoption(
"--provider",
action="store",
default=None,
choices=VALID_PROVIDERS,
help="LLM provider for RAG tests: openai, ollama, anthropic, bedrock",
)
def pytest_configure(config):
"""Configure custom markers."""
config.addinivalue_line(
"markers", "rag: mark test as RAG integration test (requires --provider flag)"
)
+264
View File
@@ -0,0 +1,264 @@
"""Provider fixtures for integration tests.
This module provides pytest fixtures that configure LLM providers based on
an explicit --provider flag. Supports OpenAI, Ollama, Anthropic, and Bedrock.
Usage:
pytest tests/integration/test_rag.py --provider=openai
pytest tests/integration/test_rag.py --provider=ollama
pytest tests/integration/test_rag.py --provider=anthropic
pytest tests/integration/test_rag.py --provider=bedrock
Environment Variables by Provider:
OpenAI:
OPENAI_API_KEY: API key (required)
OPENAI_BASE_URL: Base URL override (e.g., "https://models.github.ai/inference")
OPENAI_EMBEDDING_MODEL: Embedding model (default: "text-embedding-3-small")
OPENAI_GENERATION_MODEL: Generation model (default: "gpt-4o-mini")
Ollama:
OLLAMA_BASE_URL: API URL (required, e.g., "http://localhost:11434")
OLLAMA_EMBEDDING_MODEL: Embedding model (default: "nomic-embed-text")
OLLAMA_GENERATION_MODEL: Generation model (default: "llama3.2:1b")
Anthropic:
ANTHROPIC_API_KEY: API key (required)
ANTHROPIC_GENERATION_MODEL: Model (default: "claude-3-haiku-20240307")
Bedrock:
AWS_REGION: AWS region (required)
BEDROCK_EMBEDDING_MODEL: Embedding model ID
BEDROCK_GENERATION_MODEL: Generation model ID
"""
import logging
import os
from typing import AsyncGenerator
import pytest
from nextcloud_mcp_server.providers.base import Provider
logger = logging.getLogger(__name__)
# Valid provider names (must match conftest.py)
VALID_PROVIDERS = ["openai", "ollama", "anthropic", "bedrock"]
async def create_generation_provider(provider_name: str) -> Provider:
"""Create a provider configured for text generation.
Args:
provider_name: One of "openai", "ollama", "anthropic", "bedrock"
Returns:
Provider instance configured for generation
Raises:
ValueError: If provider_name is invalid or required env vars missing
"""
if provider_name == "openai":
from nextcloud_mcp_server.providers.openai import OpenAIProvider
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY environment variable required")
base_url = os.getenv("OPENAI_BASE_URL")
generation_model = os.getenv("OPENAI_GENERATION_MODEL", "gpt-4o-mini")
# GitHub Models API requires model name prefix
if base_url and "models.github.ai" in base_url:
if not generation_model.startswith("openai/"):
generation_model = f"openai/{generation_model}"
provider = OpenAIProvider(
api_key=api_key,
base_url=base_url,
embedding_model=None, # Generation only
generation_model=generation_model,
)
logger.info(f"Created OpenAI generation provider: model={generation_model}")
return provider
elif provider_name == "ollama":
from nextcloud_mcp_server.providers.ollama import OllamaProvider
base_url = os.getenv("OLLAMA_BASE_URL")
if not base_url:
raise ValueError("OLLAMA_BASE_URL environment variable required")
generation_model = os.getenv("OLLAMA_GENERATION_MODEL", "llama3.2:1b")
provider = OllamaProvider(
base_url=base_url,
embedding_model=None, # Generation only
generation_model=generation_model,
)
logger.info(f"Created Ollama generation provider: model={generation_model}")
return provider
elif provider_name == "anthropic":
from nextcloud_mcp_server.providers.anthropic import AnthropicProvider
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY environment variable required")
generation_model = os.getenv(
"ANTHROPIC_GENERATION_MODEL", "claude-3-haiku-20240307"
)
provider = AnthropicProvider(
api_key=api_key,
generation_model=generation_model,
)
logger.info(f"Created Anthropic generation provider: model={generation_model}")
return provider
elif provider_name == "bedrock":
from nextcloud_mcp_server.providers.bedrock import BedrockProvider
region = os.getenv("AWS_REGION")
if not region:
raise ValueError("AWS_REGION environment variable required")
generation_model = os.getenv("BEDROCK_GENERATION_MODEL")
if not generation_model:
raise ValueError("BEDROCK_GENERATION_MODEL environment variable required")
provider = BedrockProvider(
region=region,
embedding_model=None, # Generation only
generation_model=generation_model,
)
logger.info(f"Created Bedrock generation provider: model={generation_model}")
return provider
else:
raise ValueError(f"Unknown provider: {provider_name}. Valid: {VALID_PROVIDERS}")
async def create_embedding_provider(provider_name: str) -> Provider:
"""Create a provider configured for embeddings.
Args:
provider_name: One of "openai", "ollama", "bedrock"
(Anthropic does not support embeddings)
Returns:
Provider instance configured for embeddings
Raises:
ValueError: If provider_name is invalid, doesn't support embeddings,
or required env vars missing
"""
if provider_name == "anthropic":
raise ValueError("Anthropic does not support embeddings")
if provider_name == "openai":
from nextcloud_mcp_server.providers.openai import OpenAIProvider
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY environment variable required")
base_url = os.getenv("OPENAI_BASE_URL")
embedding_model = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small")
# GitHub Models API requires model name prefix
if base_url and "models.github.ai" in base_url:
if not embedding_model.startswith("openai/"):
embedding_model = f"openai/{embedding_model}"
provider = OpenAIProvider(
api_key=api_key,
base_url=base_url,
embedding_model=embedding_model,
generation_model=None, # Embeddings only
)
logger.info(f"Created OpenAI embedding provider: model={embedding_model}")
return provider
elif provider_name == "ollama":
from nextcloud_mcp_server.providers.ollama import OllamaProvider
base_url = os.getenv("OLLAMA_BASE_URL")
if not base_url:
raise ValueError("OLLAMA_BASE_URL environment variable required")
embedding_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")
provider = OllamaProvider(
base_url=base_url,
embedding_model=embedding_model,
generation_model=None, # Embeddings only
)
logger.info(f"Created Ollama embedding provider: model={embedding_model}")
return provider
elif provider_name == "bedrock":
from nextcloud_mcp_server.providers.bedrock import BedrockProvider
region = os.getenv("AWS_REGION")
if not region:
raise ValueError("AWS_REGION environment variable required")
embedding_model = os.getenv("BEDROCK_EMBEDDING_MODEL")
if not embedding_model:
raise ValueError("BEDROCK_EMBEDDING_MODEL environment variable required")
provider = BedrockProvider(
region=region,
embedding_model=embedding_model,
generation_model=None, # Embeddings only
)
logger.info(f"Created Bedrock embedding provider: model={embedding_model}")
return provider
else:
raise ValueError(f"Unknown provider: {provider_name}. Valid: {VALID_PROVIDERS}")
# =============================================================================
# Pytest Fixtures
# =============================================================================
@pytest.fixture(scope="module")
def provider_name(request) -> str:
"""Get the provider name from --provider flag.
Raises pytest.skip if --provider not specified.
"""
name = request.config.getoption("--provider")
if not name:
pytest.skip("--provider flag required (openai, ollama, anthropic, bedrock)")
return name
@pytest.fixture(scope="module")
async def generation_provider(provider_name: str) -> AsyncGenerator[Provider, None]:
"""Fixture providing a generation-capable provider.
Requires --provider flag to be set.
"""
provider = await create_generation_provider(provider_name)
yield provider
await provider.close()
@pytest.fixture(scope="module")
async def embedding_provider(provider_name: str) -> AsyncGenerator[Provider, None]:
"""Fixture providing an embedding-capable provider.
Requires --provider flag to be set.
Note: Anthropic does not support embeddings - test will fail if used.
"""
if provider_name == "anthropic":
pytest.skip("Anthropic does not support embeddings")
provider = await create_embedding_provider(provider_name)
yield provider
await provider.close()
+49 -23
View File
@@ -1,7 +1,7 @@
"""MCP sampling support for integration tests.
This module provides utilities to enable real LLM-based sampling in integration tests
using OpenAI or GitHub Models API.
using any provider that supports text generation (OpenAI, Ollama, Anthropic, Bedrock).
"""
import logging
@@ -10,46 +10,58 @@ from typing import Any
from mcp import types
from mcp.client.session import ClientSession, RequestContext
from nextcloud_mcp_server.providers.openai import OpenAIProvider
from nextcloud_mcp_server.providers.base import Provider
logger = logging.getLogger(__name__)
def create_openai_sampling_callback(provider: OpenAIProvider):
"""Factory to create a sampling callback using OpenAI provider.
def create_sampling_callback(provider: Provider):
"""Factory to create a sampling callback using any generation-capable provider.
The callback conforms to MCP's SamplingFnT protocol and can be passed
to ClientSession for handling sampling requests from the server.
Args:
provider: OpenAIProvider instance configured with a generation model
provider: Any Provider instance that supports generation
(supports_generation=True)
Returns:
Async callback function for MCP sampling
Raises:
ValueError: If provider doesn't support generation
Example:
```python
provider = OpenAIProvider(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
generation_model="gpt-4o-mini",
)
callback = create_openai_sampling_callback(provider)
from nextcloud_mcp_server.providers import get_provider
async for session in create_mcp_client_session(
url="http://localhost:8000/mcp",
sampling_callback=callback,
):
# Session now supports sampling
pass
provider = get_provider() # Auto-detect from environment
if provider.supports_generation:
callback = create_sampling_callback(provider)
async for session in create_mcp_client_session(
url="http://localhost:8000/mcp",
sampling_callback=callback,
):
# Session now supports sampling
pass
```
"""
if not provider.supports_generation:
raise ValueError(
f"Provider {provider.__class__.__name__} does not support generation"
)
# Get model name for logging (provider-specific attribute)
model_name = (
getattr(provider, "generation_model", None) or provider.__class__.__name__
)
async def sampling_callback(
context: RequestContext[ClientSession, Any],
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult | types.ErrorData:
"""Handle sampling requests using OpenAI provider."""
"""Handle sampling requests using the configured provider."""
logger.debug(f"Sampling callback invoked with {len(params.messages)} messages")
# Extract messages and build prompt
@@ -68,14 +80,13 @@ def create_openai_sampling_callback(provider: OpenAIProvider):
logger.debug(f"Generating response for prompt ({len(prompt)} chars)")
try:
# Generate response using OpenAI provider
# Note: temperature is hardcoded in the provider at 0.7
# Generate response using provider
# Note: temperature is typically hardcoded in providers at 0.7
response = await provider.generate(
prompt=prompt,
max_tokens=params.maxTokens,
)
model_name = provider.generation_model or "unknown"
logger.info(f"Sampling completed: {len(response)} chars from {model_name}")
return types.CreateMessageResult(
@@ -85,10 +96,25 @@ def create_openai_sampling_callback(provider: OpenAIProvider):
stopReason="endTurn",
)
except Exception as e:
logger.error(f"OpenAI generation failed: {e}")
logger.error(f"Generation failed ({provider.__class__.__name__}): {e}")
return types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"OpenAI generation failed: {e!s}",
message=f"Generation failed: {e!s}",
)
return sampling_callback
def create_openai_sampling_callback(provider: "Provider"):
"""Factory to create a sampling callback using OpenAI provider.
This is a backward-compatible wrapper around create_sampling_callback().
Prefer using create_sampling_callback() directly for new code.
Args:
provider: OpenAIProvider instance configured with a generation model
Returns:
Async callback function for MCP sampling
"""
return create_sampling_callback(provider)
+403
View File
@@ -0,0 +1,403 @@
"""Integration tests for RAG pipeline with multiple LLM providers.
These tests validate the complete semantic search and MCP sampling flow using:
1. MCP server's built-in semantic search (embeddings handled server-side)
2. MCP sampling for answer generation (any generation-capable provider)
3. Pre-indexed Nextcloud User Manual as the knowledge base
Usage:
# Run with OpenAI (including GitHub Models API)
OPENAI_API_KEY=... pytest tests/integration/test_rag.py --provider=openai -v
# Run with Ollama
OLLAMA_BASE_URL=http://localhost:11434 OLLAMA_GENERATION_MODEL=llama3.2:1b \\
pytest tests/integration/test_rag.py --provider=ollama -v
# Run with Anthropic
ANTHROPIC_API_KEY=... pytest tests/integration/test_rag.py --provider=anthropic -v
# Run with AWS Bedrock
AWS_REGION=us-east-1 BEDROCK_GENERATION_MODEL=... \\
pytest tests/integration/test_rag.py --provider=bedrock -v
Environment Variables:
See tests/integration/provider_fixtures.py for provider-specific configuration.
RAG_MANUAL_PATH: Path to manual PDF in Nextcloud (default: "Nextcloud Manual.pdf")
Prerequisites:
- Nextcloud User Manual PDF uploaded to Nextcloud
- VECTOR_SYNC_ENABLED=true on the MCP server
- Provider-specific environment variables set
"""
import json
import logging
import os
from pathlib import Path
from typing import Any, AsyncGenerator
import anyio
import pytest
from mcp import ClientSession
from nextcloud_mcp_server.providers.base import Provider
from tests.conftest import create_mcp_client_session
from tests.integration.provider_fixtures import create_generation_provider
from tests.integration.sampling_support import create_sampling_callback
logger = logging.getLogger(__name__)
# Default path to the Nextcloud User Manual PDF
DEFAULT_MANUAL_PATH = "Nextcloud Manual.pdf"
async def llm_judge(
provider: Provider,
ground_truth: str,
system_output: str,
) -> bool:
"""Use LLM to judge if system output aligns with ground truth.
Args:
provider: Any provider with generation capability
ground_truth: The expected/reference answer
system_output: The system's actual output to evaluate
Returns:
True if output aligns with ground truth, False otherwise
"""
prompt = f"""GROUND TRUTH: {ground_truth}
SYSTEM OUTPUT: {system_output}
Does the system output contain the key facts from the ground truth?
Answer: TRUE or FALSE"""
logger.info("Received ground truth: %s", ground_truth)
logger.info("Received system output: %s", system_output)
response = await provider.generate(prompt, max_tokens=10)
logger.info("LLM Judge response: %s", response)
return "TRUE" in response.upper()
# Mark all tests as integration tests
pytestmark = [
pytest.mark.integration,
pytest.mark.rag,
]
# Ground truth fixture path
FIXTURES_DIR = Path(__file__).parent / "fixtures"
GROUND_TRUTH_FILE = FIXTURES_DIR / "nextcloud_manual_ground_truth.json"
@pytest.fixture(scope="module")
def ground_truth_qa():
"""Load ground truth Q&A pairs for the Nextcloud manual."""
if not GROUND_TRUTH_FILE.exists():
pytest.skip(f"Ground truth file not found: {GROUND_TRUTH_FILE}")
with open(GROUND_TRUTH_FILE) as f:
return json.load(f)
@pytest.fixture(scope="module")
async def indexed_manual_pdf(nc_client, nc_mcp_client):
"""Ensure the Nextcloud User Manual PDF is tagged and indexed for vector search.
This fixture:
1. Gets file info for the manual PDF
2. Creates/gets the 'vector-index' tag
3. Assigns the tag to the file
4. Waits for vector sync to complete indexing
Environment Variables:
RAG_MANUAL_PATH: Path to manual PDF in Nextcloud (default: Nextcloud Manual.pdf)
"""
manual_path = os.getenv("RAG_MANUAL_PATH", DEFAULT_MANUAL_PATH)
logger.info(f"Setting up indexed manual PDF: {manual_path}")
# Get file info to verify file exists and get file ID
file_info = await nc_client.webdav.get_file_info(manual_path)
if not file_info:
pytest.skip(f"Manual PDF not found at '{manual_path}'")
file_id = file_info["id"]
logger.info(f"Found manual PDF: {manual_path} (file_id={file_id})")
# Create or get the vector-index tag
tag = await nc_client.webdav.get_or_create_tag("vector-index")
tag_id = tag["id"]
logger.info(f"Using tag 'vector-index' (tag_id={tag_id})")
# Assign tag to file
await nc_client.webdav.assign_tag_to_file(file_id, tag_id)
logger.info(f"Tagged file {file_id} with vector-index tag")
# Wait for vector sync to complete indexing
max_attempts = 60
poll_interval = 10
logger.info("Waiting for vector sync to index the manual...")
for attempt in range(1, max_attempts + 1):
try:
# Call the MCP tool via the existing client session
result = await nc_mcp_client.call_tool(
"nc_get_vector_sync_status",
arguments={},
)
if not result.isError:
content = result.structuredContent or {}
indexed = content.get("indexed_count", 0)
pending = content.get("pending_count", 1)
logger.info(
f"Attempt {attempt}/{max_attempts}: "
f"indexed={indexed}, pending={pending}"
)
if indexed > 0 and pending == 0:
logger.info(
f"Vector indexing complete: {indexed} documents indexed"
)
break
except Exception as e:
logger.warning(f"Attempt {attempt}: Error checking status: {e}")
if attempt < max_attempts:
await anyio.sleep(poll_interval)
else:
logger.warning(
f"Vector indexing may not be complete after {max_attempts} attempts"
)
yield {
"path": manual_path,
"file_id": file_id,
"tag_id": tag_id,
}
@pytest.fixture(scope="module")
def provider_name(request) -> str:
"""Get the provider name from --provider flag.
Raises pytest.skip if --provider not specified.
"""
name = request.config.getoption("--provider")
if not name:
pytest.skip("--provider flag required (openai, ollama, anthropic, bedrock)")
return name
@pytest.fixture(scope="module")
async def generation_provider(provider_name: str) -> AsyncGenerator[Provider, None]:
"""Provider configured for text generation.
Requires --provider flag to be set.
"""
provider = await create_generation_provider(provider_name)
yield provider
await provider.close()
@pytest.fixture(scope="module")
async def nc_mcp_client_with_sampling(
anyio_backend, generation_provider, provider_name
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client with sampling support using the specified provider.
This fixture creates an MCP client that can handle sampling requests
from the server using the configured generation provider.
"""
sampling_callback = create_sampling_callback(generation_provider)
async for session in create_mcp_client_session(
url="http://localhost:8000/mcp",
client_name=f"Sampling MCP ({provider_name})",
sampling_callback=sampling_callback,
):
yield session
async def test_semantic_search_retrieval(
nc_mcp_client, ground_truth_qa, indexed_manual_pdf, generation_provider
):
"""Test that semantic search retrieves relevant documents from the manual.
This tests the retrieval component of RAG - ensuring that queries
return relevant chunks from the indexed Nextcloud User Manual.
"""
# Use first query from ground truth
test_case = ground_truth_qa[0] # 2FA question
query = test_case["query"]
# Perform semantic search via MCP tool
result = await nc_mcp_client.call_tool(
"nc_semantic_search",
arguments={
"query": query,
"limit": 5,
"score_threshold": 0.0,
},
)
assert result.isError is False, f"Tool call failed: {result}"
data = result.structuredContent
# Verify we got results
assert data["success"] is True
assert data["total_found"] > 0, f"No results for query: {query}"
assert len(data["results"]) > 0
# Use LLM judge to evaluate if excerpts are relevant to ground truth
all_excerpts = " ".join([r["excerpt"] for r in data["results"]])
is_relevant = await llm_judge(
generation_provider,
test_case["ground_truth"],
all_excerpts,
)
assert is_relevant, f"LLM judge: excerpts not relevant to query: {query}"
async def test_semantic_search_answer_with_sampling(
nc_mcp_client_with_sampling,
ground_truth_qa,
indexed_manual_pdf,
generation_provider,
):
"""Test semantic search with MCP sampling for answer generation.
This tests the full RAG pipeline:
1. Semantic search retrieves relevant documents
2. MCP sampling generates an answer from the retrieved context
3. Provider generates the answer via the sampling callback
Uses nc_mcp_client_with_sampling which has sampling enabled.
"""
# Use the 2FA question - has clear expected answer
test_case = ground_truth_qa[0]
query = test_case["query"]
result = await nc_mcp_client_with_sampling.call_tool(
"nc_semantic_search_answer",
arguments={
"query": query,
"limit": 5,
"score_threshold": 0.0,
"max_answer_tokens": 300,
},
)
assert result.isError is False, f"Tool call failed: {result}"
data = result.structuredContent
# Verify response structure
assert data["success"] is True
assert "query" in data
assert "generated_answer" in data
assert "sources" in data
assert "search_method" in data
# Check for either successful sampling or graceful fallback
fallback_methods = {
"semantic_sampling_unsupported",
"semantic_sampling_user_declined",
"semantic_sampling_timeout",
"semantic_sampling_mcp_error",
"semantic_sampling_fallback",
}
if data["search_method"] in fallback_methods:
# Fallback mode - verify sources still returned
assert len(data["sources"]) > 0, "Expected sources even in fallback mode"
pytest.skip(
f"MCP sampling not available (method: {data['search_method']}), "
f"but retrieval succeeded with {len(data['sources'])} sources"
)
else:
# Successful sampling - verify answer quality
assert data["search_method"] == "semantic_sampling"
assert data["generated_answer"] is not None
assert len(data["generated_answer"]) > 50 # Non-trivial answer
# Use LLM judge to evaluate answer relevance
is_relevant = await llm_judge(
generation_provider,
test_case["ground_truth"],
data["generated_answer"],
)
assert is_relevant, f"LLM judge: answer not relevant to query: {query}"
@pytest.mark.parametrize(
"qa_index,min_expected_results",
[
(0, 1), # 2FA question
(1, 1), # File quotas question
(2, 1), # Linux installation question
(3, 1), # Windows requirements question
(4, 1), # Client apps with 2FA question
],
)
async def test_retrieval_quality_all_queries(
nc_mcp_client, ground_truth_qa, indexed_manual_pdf, qa_index, min_expected_results
):
"""Test retrieval quality for all ground truth queries.
Validates that each query returns at least the minimum expected
number of relevant results from the Nextcloud manual.
"""
if qa_index >= len(ground_truth_qa):
pytest.skip(f"Ground truth index {qa_index} not available")
test_case = ground_truth_qa[qa_index]
query = test_case["query"]
result = await nc_mcp_client.call_tool(
"nc_semantic_search",
arguments={
"query": query,
"limit": 5,
"score_threshold": 0.0,
},
)
assert result.isError is False
data = result.structuredContent
assert data["total_found"] >= min_expected_results, (
f"Query '{query}' returned {data['total_found']} results, "
f"expected at least {min_expected_results}"
)
async def test_no_results_for_unrelated_query(nc_mcp_client, indexed_manual_pdf):
"""Test that completely unrelated queries return low/no scores.
The Nextcloud manual shouldn't have relevant content for
quantum physics queries.
"""
result = await nc_mcp_client.call_tool(
"nc_semantic_search",
arguments={
"query": "quantum entanglement hadron collider particle physics",
"limit": 5,
"score_threshold": 0.5, # Higher threshold to filter irrelevant
},
)
assert result.isError is False
data = result.structuredContent
# Should have few or no high-scoring results
# Low score threshold means we might get some results, but they should be low quality
if data["total_found"] > 0:
# If results exist, they should have low scores
max_score = max(r["score"] for r in data["results"])
assert max_score < 0.8, f"Unexpected high score {max_score} for unrelated query"
-300
View File
@@ -1,300 +0,0 @@
"""Integration tests for RAG pipeline with OpenAI/GitHub Models API.
These tests validate the complete semantic search and MCP sampling flow using:
1. OpenAI embeddings for semantic search
2. MCP sampling for answer generation
3. Pre-indexed Nextcloud User Manual as the knowledge base
Environment Variables:
OPENAI_API_KEY: OpenAI API key or GitHub token for models.github.ai
OPENAI_BASE_URL: Base URL override (e.g., "https://models.github.ai/inference")
OPENAI_EMBEDDING_MODEL: Embedding model (default: "text-embedding-3-small")
OPENAI_GENERATION_MODEL: Generation model for sampling (default: "gpt-4o-mini")
For GitHub CI, set:
OPENAI_API_KEY: ${{ secrets.GITHUB_TOKEN }}
OPENAI_BASE_URL: https://models.github.ai/inference
OPENAI_EMBEDDING_MODEL: openai/text-embedding-3-small
OPENAI_GENERATION_MODEL: openai/gpt-4o-mini
Prerequisites:
- Nextcloud User Manual indexed in Qdrant (via vector sync)
- VECTOR_SYNC_ENABLED=true on the MCP server
"""
import json
import os
from pathlib import Path
from typing import Any, AsyncGenerator
import pytest
from mcp import ClientSession
from nextcloud_mcp_server.providers.openai import OpenAIProvider
from tests.conftest import create_mcp_client_session
from tests.integration.sampling_support import create_openai_sampling_callback
# Skip all tests if OpenAI API key not configured
pytestmark = [
pytest.mark.integration,
pytest.mark.skipif(
not os.getenv("OPENAI_API_KEY"),
reason="OPENAI_API_KEY not set - skipping OpenAI RAG tests",
),
]
# Ground truth fixture path
FIXTURES_DIR = Path(__file__).parent / "fixtures"
GROUND_TRUTH_FILE = FIXTURES_DIR / "nextcloud_manual_ground_truth.json"
@pytest.fixture(scope="module")
def ground_truth_qa():
"""Load ground truth Q&A pairs for the Nextcloud manual."""
if not GROUND_TRUTH_FILE.exists():
pytest.skip(f"Ground truth file not found: {GROUND_TRUTH_FILE}")
with open(GROUND_TRUTH_FILE) as f:
return json.load(f)
@pytest.fixture(scope="module")
async def openai_provider():
"""OpenAI provider configured from environment (embeddings only)."""
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL")
embedding_model = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small")
provider = OpenAIProvider(
api_key=api_key,
base_url=base_url,
embedding_model=embedding_model,
generation_model=None, # Embeddings only
)
yield provider
await provider.close()
@pytest.fixture(scope="module")
async def openai_generation_provider():
"""OpenAI provider configured for text generation (for sampling callback)."""
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL")
generation_model = os.getenv("OPENAI_GENERATION_MODEL", "gpt-4o-mini")
# For GitHub Models API, use the prefixed model name
if base_url and "models.github.ai" in base_url:
if not generation_model.startswith("openai/"):
generation_model = f"openai/{generation_model}"
provider = OpenAIProvider(
api_key=api_key,
base_url=base_url,
embedding_model=None, # Generation only
generation_model=generation_model,
)
yield provider
await provider.close()
@pytest.fixture(scope="module")
async def nc_mcp_client_with_sampling(
anyio_backend, openai_generation_provider
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client with OpenAI-based sampling support.
This fixture creates an MCP client that can handle sampling requests
from the server using OpenAI for text generation.
"""
sampling_callback = create_openai_sampling_callback(openai_generation_provider)
async for session in create_mcp_client_session(
url="http://localhost:8000/mcp",
client_name="OpenAI Sampling MCP",
sampling_callback=sampling_callback,
):
yield session
async def test_openai_embeddings_work(openai_provider: OpenAIProvider):
"""Test that OpenAI embeddings can be generated."""
embedding = await openai_provider.embed("test query about Nextcloud")
assert isinstance(embedding, list)
assert len(embedding) > 0
assert all(isinstance(x, float) for x in embedding)
# OpenAI embedding dimensions: 1536 (small) or 3072 (large)
assert len(embedding) in [1536, 3072]
async def test_semantic_search_retrieval(nc_mcp_client, ground_truth_qa):
"""Test that semantic search retrieves relevant documents from the manual.
This tests the retrieval component of RAG - ensuring that queries
return relevant chunks from the indexed Nextcloud User Manual.
"""
# Use first query from ground truth
test_case = ground_truth_qa[0] # 2FA question
query = test_case["query"]
expected_topics = test_case["expected_topics"]
# Perform semantic search via MCP tool
result = await nc_mcp_client.call_tool(
"nc_semantic_search",
arguments={
"query": query,
"limit": 5,
"score_threshold": 0.0,
},
)
assert result.isError is False, f"Tool call failed: {result}"
data = result.structuredContent
# Verify we got results
assert data["success"] is True
assert data["total_found"] > 0, f"No results for query: {query}"
assert len(data["results"]) > 0
# Check that at least one result contains expected topic keywords
all_excerpts = " ".join([r["excerpt"].lower() for r in data["results"]])
topic_found = any(topic.lower() in all_excerpts for topic in expected_topics)
assert topic_found, (
f"Expected topics {expected_topics} not found in results for query: {query}"
)
async def test_semantic_search_answer_with_sampling(
nc_mcp_client_with_sampling, ground_truth_qa
):
"""Test semantic search with MCP sampling for answer generation.
This tests the full RAG pipeline:
1. Semantic search retrieves relevant documents
2. MCP sampling generates an answer from the retrieved context
3. OpenAI generates the answer via the sampling callback
Uses nc_mcp_client_with_sampling which has OpenAI-based sampling enabled.
"""
# Use the 2FA question - has clear expected answer
test_case = ground_truth_qa[0]
query = test_case["query"]
result = await nc_mcp_client_with_sampling.call_tool(
"nc_semantic_search_answer",
arguments={
"query": query,
"limit": 5,
"score_threshold": 0.0,
"max_answer_tokens": 300,
},
)
assert result.isError is False, f"Tool call failed: {result}"
data = result.structuredContent
# Verify response structure
assert data["success"] is True
assert "query" in data
assert "generated_answer" in data
assert "sources" in data
assert "search_method" in data
# Check for either successful sampling or graceful fallback
fallback_methods = {
"semantic_sampling_unsupported",
"semantic_sampling_user_declined",
"semantic_sampling_timeout",
"semantic_sampling_mcp_error",
"semantic_sampling_fallback",
}
if data["search_method"] in fallback_methods:
# Fallback mode - verify sources still returned
assert len(data["sources"]) > 0, "Expected sources even in fallback mode"
pytest.skip(
f"MCP sampling not available (method: {data['search_method']}), "
f"but retrieval succeeded with {len(data['sources'])} sources"
)
else:
# Successful sampling - verify answer quality
assert data["search_method"] == "semantic_sampling"
assert data["generated_answer"] is not None
assert len(data["generated_answer"]) > 50 # Non-trivial answer
# Check answer contains relevant content
answer_lower = data["generated_answer"].lower()
assert any(
keyword in answer_lower
for keyword in ["two-factor", "2fa", "authentication", "password"]
), f"Answer doesn't seem relevant to query: {data['generated_answer'][:200]}"
@pytest.mark.parametrize(
"qa_index,min_expected_results",
[
(0, 1), # 2FA question
(1, 1), # File quotas question
(2, 1), # Linux installation question
(3, 1), # Windows requirements question
(4, 1), # Client apps with 2FA question
],
)
async def test_retrieval_quality_all_queries(
nc_mcp_client, ground_truth_qa, qa_index, min_expected_results
):
"""Test retrieval quality for all ground truth queries.
Validates that each query returns at least the minimum expected
number of relevant results from the Nextcloud manual.
"""
if qa_index >= len(ground_truth_qa):
pytest.skip(f"Ground truth index {qa_index} not available")
test_case = ground_truth_qa[qa_index]
query = test_case["query"]
result = await nc_mcp_client.call_tool(
"nc_semantic_search",
arguments={
"query": query,
"limit": 5,
"score_threshold": 0.0,
},
)
assert result.isError is False
data = result.structuredContent
assert data["total_found"] >= min_expected_results, (
f"Query '{query}' returned {data['total_found']} results, "
f"expected at least {min_expected_results}"
)
async def test_no_results_for_unrelated_query(nc_mcp_client):
"""Test that completely unrelated queries return low/no scores.
The Nextcloud manual shouldn't have relevant content for
quantum physics queries.
"""
result = await nc_mcp_client.call_tool(
"nc_semantic_search",
arguments={
"query": "quantum entanglement hadron collider particle physics",
"limit": 5,
"score_threshold": 0.5, # Higher threshold to filter irrelevant
},
)
assert result.isError is False
data = result.structuredContent
# Should have few or no high-scoring results
# Low score threshold means we might get some results, but they should be low quality
if data["total_found"] > 0:
# If results exist, they should have low scores
max_score = max(r["score"] for r in data["results"])
assert max_score < 0.8, f"Unexpected high score {max_score} for unrelated query"
+241
View File
@@ -117,3 +117,244 @@ def test_parse_search_response_with_empty_tags(mocker):
assert len(results) == 1
assert "tags" in results[0]
assert results[0]["tags"] == []
@pytest.mark.unit
async def test_get_file_info_returns_file_details(mocker):
"""Test that get_file_info returns file info including file ID."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock PROPFIND response
mock_response = AsyncMock()
mock_response.status_code = 207
mock_response.content = b"""<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:response>
<d:href>/remote.php/dav/files/testuser/Documents/test.pdf</d:href>
<d:propstat>
<d:prop>
<oc:fileid>12345</oc:fileid>
<d:displayname>test.pdf</d:displayname>
<d:getcontentlength>1024</d:getcontentlength>
<d:getcontenttype>application/pdf</d:getcontenttype>
<d:getlastmodified>Sat, 01 Jan 2025 00:00:00 GMT</d:getlastmodified>
<d:getetag>"abc123"</d:getetag>
<d:resourcetype/>
</d:prop>
</d:propstat>
</d:response>
</d:multistatus>"""
mock_response.raise_for_status = mocker.Mock()
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call get_file_info
result = await client.get_file_info("Documents/test.pdf")
# Verify result
assert result is not None
assert result["id"] == 12345
assert result["name"] == "test.pdf"
assert result["path"] == "Documents/test.pdf"
assert result["content_type"] == "application/pdf"
assert result["size"] == 1024
assert result["etag"] == "abc123"
assert result["is_directory"] is False
@pytest.mark.unit
async def test_get_file_info_returns_none_for_missing_file(mocker):
"""Test that get_file_info returns None for missing files."""
from httpx import HTTPStatusError, Response
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 404 response
mock_response = mocker.Mock(spec=Response)
mock_response.status_code = 404
mock_http_client.request = AsyncMock(
side_effect=HTTPStatusError(
"Not Found", request=mocker.Mock(), response=mock_response
)
)
# Call get_file_info
result = await client.get_file_info("nonexistent.pdf")
# Verify result is None
assert result is None
@pytest.mark.unit
async def test_create_tag_creates_system_tag(mocker):
"""Test that create_tag creates a system tag via OCS API."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock OCS response
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json = mocker.Mock(
return_value={
"ocs": {
"data": {
"id": 42,
"name": "vector-index",
"userVisible": True,
"userAssignable": True,
}
}
}
)
mock_response.raise_for_status = mocker.Mock()
mock_http_client.post = AsyncMock(return_value=mock_response)
# Call create_tag
result = await client.create_tag("vector-index")
# Verify result
assert result["id"] == 42
assert result["name"] == "vector-index"
assert result["userVisible"] is True
assert result["userAssignable"] is True
# Verify API call
mock_http_client.post.assert_called_once()
call_args = mock_http_client.post.call_args
assert call_args[0][0] == "/ocs/v2.php/apps/systemtags/api/v1/tags"
assert call_args[1]["json"]["name"] == "vector-index"
@pytest.mark.unit
async def test_get_or_create_tag_returns_existing_tag(mocker):
"""Test that get_or_create_tag returns existing tag without creating."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock existing tag
mocker.patch.object(
client,
"get_tag_by_name",
return_value={"id": 42, "name": "vector-index", "userVisible": True},
)
mock_create = mocker.patch.object(client, "create_tag")
# Call get_or_create_tag
result = await client.get_or_create_tag("vector-index")
# Verify existing tag returned without creating
assert result["id"] == 42
mock_create.assert_not_called()
@pytest.mark.unit
async def test_get_or_create_tag_creates_new_tag(mocker):
"""Test that get_or_create_tag creates tag when not found."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock no existing tag
mocker.patch.object(client, "get_tag_by_name", return_value=None)
mocker.patch.object(
client,
"create_tag",
return_value={"id": 42, "name": "vector-index", "userVisible": True},
)
# Call get_or_create_tag
result = await client.get_or_create_tag("vector-index")
# Verify tag was created
assert result["id"] == 42
client.create_tag.assert_called_once_with("vector-index", True, True)
@pytest.mark.unit
async def test_assign_tag_to_file_success(mocker):
"""Test that assign_tag_to_file assigns tag via WebDAV."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 201 Created response
mock_response = AsyncMock()
mock_response.status_code = 201
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call assign_tag_to_file
result = await client.assign_tag_to_file(12345, 42)
# Verify result
assert result is True
# Verify API call
mock_http_client.request.assert_called_once()
call_args = mock_http_client.request.call_args
assert call_args[0][0] == "PUT"
assert "/systemtags-relations/files/12345/42" in call_args[0][1]
@pytest.mark.unit
async def test_assign_tag_to_file_already_assigned(mocker):
"""Test that assign_tag_to_file handles already assigned (409) gracefully."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 409 Conflict response (already assigned)
mock_response = AsyncMock()
mock_response.status_code = 409
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call assign_tag_to_file
result = await client.assign_tag_to_file(12345, 42)
# Verify result (should succeed even with 409)
assert result is True
@pytest.mark.unit
async def test_remove_tag_from_file_success(mocker):
"""Test that remove_tag_from_file removes tag via WebDAV."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 204 No Content response
mock_response = AsyncMock()
mock_response.status_code = 204
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call remove_tag_from_file
result = await client.remove_tag_from_file(12345, 42)
# Verify result
assert result is True
# Verify API call
mock_http_client.request.assert_called_once()
call_args = mock_http_client.request.call_args
assert call_args[0][0] == "DELETE"
assert "/systemtags-relations/files/12345/42" in call_args[0][1]
@pytest.mark.unit
async def test_remove_tag_from_file_not_assigned(mocker):
"""Test that remove_tag_from_file handles not assigned (404) gracefully."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 404 Not Found response (tag wasn't assigned)
mock_response = AsyncMock()
mock_response.status_code = 404
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call remove_tag_from_file
result = await client.remove_tag_from_file(12345, 42)
# Verify result (should succeed even with 404)
assert result is True
Generated
+1 -1
View File
@@ -1936,7 +1936,7 @@ wheels = [
[[package]]
name = "nextcloud-mcp-server"
version = "0.47.0"
version = "0.48.4"
source = { editable = "." }
dependencies = [
{ name = "aiosqlite" },