Compare commits

...

18 Commits

Author SHA1 Message Date
github-actions[bot] 626c4bf562 bump: version 0.47.0 → 0.48.0 2025-11-23 00:53:24 +00:00
Chris Coutinho a56b3f3d51 Merge pull request #347 from cbcoutinho/feature/openai-provider-support
feature/openai provider support
2025-11-23 01:52:55 +01:00
Chris Coutinho 2896fa1dc9 feat: Add tag management methods to WebDAV client
- Add get_file_info() to get file info including file ID via PROPFIND
- Add create_tag() to create system tags via OCS API
- Add get_or_create_tag() for idempotent tag creation
- Add assign_tag_to_file() to assign tags to files via WebDAV
- Add remove_tag_from_file() to remove tags from files

Also refactors RAG evaluation:
- Add indexed_manual_pdf fixture using existing nc_client/nc_mcp_client
- Remove manual tag creation steps from workflow (now handled by fixture)
- Add comprehensive unit tests for new WebDAV methods

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 01:51:42 +01:00
Chris Coutinho 04251401aa ci: Add permissions to github token 2025-11-23 01:26:22 +01:00
github-actions[bot] e86b6e83ae bump: version 0.46.2 → 0.47.0 2025-11-23 00:23:47 +00:00
Chris Coutinho 6f5e75da15 Merge pull request #346 from cbcoutinho/feature/openai-provider-support
feat: Add OpenAI provider support for embeddings and generation
2025-11-23 01:23:18 +01:00
Chris Coutinho b2742aab80 ci: Add RAG evaluation workflow with workflow_dispatch
Adds a manually-triggered GitHub Actions workflow for RAG evaluation:
- Builds Nextcloud User Manual PDF from documentation source
- Uploads PDF to Nextcloud via WebDAV
- Tags file with 'vector-index' for vector sync indexing
- Waits for vector sync to complete
- Runs RAG integration tests with OpenAI/GitHub Models API

Inputs:
- embedding_model: OpenAI embedding model (default: openai/text-embedding-3-small)
- generation_model: OpenAI generation model (default: openai/gpt-4o-mini)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 01:22:16 +01:00
Chris Coutinho 208365cd3d feat: Add OpenAI provider support for embeddings and generation
Adds OpenAI provider to the unified provider architecture (ADR-015),
supporting:
- OpenAI API (api.openai.com)
- GitHub Models API (models.github.ai/inference)
- OpenAI-compatible endpoints (Fireworks, Together, etc.)

Features:
- Embedding support with text-embedding-3-small/large models
- Text generation via chat completions API
- Automatic retry with exponential backoff for rate limits
- Provider auto-detection in registry (priority after Bedrock)

Environment variables:
- OPENAI_API_KEY: API key (required)
- OPENAI_BASE_URL: Base URL override (optional)
- OPENAI_EMBEDDING_MODEL: Embedding model (default: text-embedding-3-small)
- OPENAI_GENERATION_MODEL: Generation model (default: gpt-4o-mini)

Also adds:
- Integration tests for RAG pipeline with MCP sampling
- MCP client sampling support for integration tests
- Ground truth Q&A pairs for Nextcloud User Manual

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 00:33:32 +01:00
Chris Coutinho 26f679d86e Merge pull request #332 from cbcoutinho/renovate/docker.io-library-python-3.12-slim-trixie
chore(deps): update docker.io/library/python:3.12-slim-trixie docker digest to b43ff04
2025-11-23 00:29:07 +01:00
Chris Coutinho cf39a15db1 Merge pull request #345 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.11
2025-11-23 00:28:53 +01:00
renovate-bot-cbcoutinho[bot] 1f3c35f162 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.11 2025-11-22 23:04:43 +00:00
renovate-bot-cbcoutinho[bot] 2bccc3dad9 chore(deps): update docker.io/library/python:3.12-slim-trixie docker digest to b43ff04 2025-11-22 23:04:40 +00:00
github-actions[bot] 959cb8b21a bump: version 0.46.1 → 0.46.2 2025-11-22 21:02:53 +00:00
Chris Coutinho f8a2410a0a Merge pull request #344 from cbcoutinho/fix/smithery-json-response
fix(smithery): Enable JSON response format for scanner compatibility
2025-11-22 22:02:24 +01:00
Chris Coutinho 03b984d5a7 fix(smithery): Enable JSON response format for scanner compatibility
The Smithery scanner was reporting "0 tools" despite the server returning
valid tool definitions. Root cause: the server was returning SSE-formatted
responses (event: message\ndata: {...}) which the scanner couldn't parse.

Changes:
- Add json_response=True to FastMCP for Smithery stateless mode
- Clean up verbose docstring examples in semantic.py and webdav.py

The MCP spec allows both SSE and plain JSON responses for HTTP transport.
Setting json_response=True returns Content-Type: application/json with
plain JSON-RPC instead of text/event-stream with SSE format.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 22:01:18 +01:00
github-actions[bot] 57db18c6a3 bump: version 0.46.0 → 0.46.1 2025-11-22 18:54:11 +00:00
Chris Coutinho ea79e94842 Merge pull request #343 from cbcoutinho/fix/vector-viz-search
perf: Optimize vector viz search performance
2025-11-22 19:53:40 +01:00
Chris Coutinho b0612cfa0f perf: Optimize vector viz search performance
- Replace sequential Qdrant scroll calls with batch retrieve
  (50 HTTP requests → 1 request, ~50x faster vector fetch)

- Add point_id to SearchResult to enable batch retrieval by Qdrant point ID

- Reuse query embedding from search algorithm in viz_routes
  (eliminates redundant embedding call, saves ~30ms)

- Make BM25 encode() async with thread pool to avoid blocking event loop
  (~4.4s was blocking, now properly async)

- Run PCA computation in thread pool to avoid blocking event loop
  (~1.2s was blocking, now properly async)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-22 19:47:43 +01:00
29 changed files with 1966 additions and 123 deletions
+113
View File
@@ -0,0 +1,113 @@
name: RAG Evaluation
on:
workflow_dispatch:
inputs:
manual_path:
description: 'Path to Nextcloud User Manual PDF in Nextcloud'
required: false
default: 'Nextcloud Manual.pdf'
embedding_model:
description: 'OpenAI embedding model'
required: false
default: 'openai/text-embedding-3-small'
generation_model:
description: 'OpenAI generation model'
required: false
default: 'openai/gpt-4o-mini'
jobs:
rag-evaluation:
runs-on: ubuntu-latest
timeout-minutes: 30
permissions:
models: read
steps:
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: 'true'
###### Required to build OIDC App ######
- name: Set up php 8.4
uses: shivammathur/setup-php@bf6b4fbd49ca58e4608c9c89fba0b8d90bd2a39f # v2
with:
php-version: 8.4
coverage: none
- name: Install OIDC app composer dependencies
run: |
cd third_party/oidc
composer install --no-dev
###### Required to build OIDC App ######
- name: Run docker compose with vector sync
uses: hoverkraft-tech/compose-action@3846bcd61da338e9eaaf83e7ed0234a12b099b72 # v2.4.1
with:
compose-file: "./docker-compose.yml"
up-flags: "--build"
env:
# Override MCP container environment for OpenAI + vector sync
VECTOR_SYNC_ENABLED: "true"
VECTOR_SYNC_SCAN_INTERVAL: "30"
OPENAI_API_KEY: ${{ secrets.GITHUB_TOKEN }}
OPENAI_BASE_URL: "https://models.github.ai/inference"
OPENAI_EMBEDDING_MODEL: ${{ inputs.embedding_model }}
OPENAI_GENERATION_MODEL: ${{ inputs.generation_model }}
- name: Install the latest version of uv
uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4
- name: Wait for Nextcloud to be ready
run: |
echo "Waiting for Nextcloud..."
max_attempts=60
attempt=0
until curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8080/ocs/v2.php/apps/serverinfo/api/v1/info | grep -q "401"; do
attempt=$((attempt + 1))
if [ $attempt -ge $max_attempts ]; then
echo "Service did not become ready in time."
exit 1
fi
echo "Attempt $attempt/$max_attempts: Service not ready, sleeping for 5 seconds..."
sleep 5
done
echo "Nextcloud is ready."
- name: Wait for MCP server to be ready
run: |
echo "Waiting for MCP server..."
max_attempts=30
attempt=0
until curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8000/health | grep -q "200"; do
attempt=$((attempt + 1))
if [ $attempt -ge $max_attempts ]; then
echo "MCP server did not become ready in time."
exit 1
fi
echo "Attempt $attempt/$max_attempts: MCP not ready, sleeping for 2 seconds..."
sleep 2
done
echo "MCP server is ready."
- name: Run RAG evaluation tests
env:
NEXTCLOUD_HOST: "http://localhost:8080"
NEXTCLOUD_USERNAME: "admin"
NEXTCLOUD_PASSWORD: "admin"
RAG_MANUAL_PATH: ${{ inputs.manual_path }}
OPENAI_API_KEY: ${{ secrets.GITHUB_TOKEN }}
OPENAI_BASE_URL: "https://models.github.ai/inference"
OPENAI_EMBEDDING_MODEL: ${{ inputs.embedding_model }}
OPENAI_GENERATION_MODEL: ${{ inputs.generation_model }}
run: |
uv run pytest tests/integration/test_rag_openai.py -v --log-cli-level=INFO
- name: Upload test results
if: always()
uses: actions/upload-artifact@v4
with:
name: rag-evaluation-results
path: |
pytest-results.xml
retention-days: 30
+24
View File
@@ -1,3 +1,27 @@
## v0.48.0 (2025-11-23)
### Feat
- Add tag management methods to WebDAV client
## v0.47.0 (2025-11-23)
### Feat
- Add OpenAI provider support for embeddings and generation
## v0.46.2 (2025-11-22)
### Fix
- **smithery**: Enable JSON response format for scanner compatibility
## v0.46.1 (2025-11-22)
### Perf
- Optimize vector viz search performance
## v0.46.0 (2025-11-22)
### Feat
+1 -1
View File
@@ -1,4 +1,4 @@
FROM docker.io/library/python:3.12-slim-trixie@sha256:2e683fc3e18a248aa23b8022f2a3474b072b04fb851efe9b49f6b516a8944939
FROM docker.io/library/python:3.12-slim-trixie@sha256:b43ff04d5df04ad5cabb80890b7ef74e8410e3395b19af970dcd52d7a4bff921
COPY --from=ghcr.io/astral-sh/uv:0.9.11@sha256:5aa820129de0a600924f166aec9cb51613b15b68f1dcd2a02f31a500d2ede568 /uv /uvx /bin/
+2 -2
View File
@@ -12,12 +12,12 @@
# - Per-session app password authentication
# - Multi-user support via Smithery session config
FROM docker.io/library/python:3.12-slim-trixie@sha256:2e683fc3e18a248aa23b8022f2a3474b072b04fb851efe9b49f6b516a8944939
FROM docker.io/library/python:3.12-slim-trixie@sha256:b43ff04d5df04ad5cabb80890b7ef74e8410e3395b19af970dcd52d7a4bff921
WORKDIR /app
# Install uv for fast dependency management
COPY --from=ghcr.io/astral-sh/uv:0.9.10@sha256:29bd45092ea8902c0bbb7f0a338f0494a382b1f4b18355df5be270ade679ff1d /uv /uvx /bin/
COPY --from=ghcr.io/astral-sh/uv:0.9.11@sha256:5aa820129de0a600924f166aec9cb51613b15b68f1dcd2a02f31a500d2ede568 /uv /uvx /bin/
# Install dependencies
# 1. git (required for caldav dependency from git)
+2 -2
View File
@@ -2,8 +2,8 @@ apiVersion: v2
name: nextcloud-mcp-server
description: A Helm chart for Nextcloud MCP Server - enables AI assistants to interact with Nextcloud
type: application
version: 0.46.0
appVersion: "0.46.0"
version: 0.48.0
appVersion: "0.48.0"
keywords:
- nextcloud
- mcp
+5 -1
View File
@@ -1072,7 +1072,11 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
# ADR-016: Use Smithery lifespan for stateless mode, BasicAuth otherwise
if deployment_mode == DeploymentMode.SMITHERY_STATELESS:
logger.info("Configuring MCP server for Smithery stateless mode")
mcp = FastMCP("Nextcloud MCP", lifespan=app_lifespan_smithery)
# json_response=True returns plain JSON-RPC instead of SSE format,
# required for Smithery scanner compatibility
mcp = FastMCP(
"Nextcloud MCP", lifespan=app_lifespan_smithery, json_response=True
)
else:
logger.info("Configuring MCP server for BasicAuth mode")
mcp = FastMCP("Nextcloud MCP", lifespan=app_lifespan_basic)
+44 -57
View File
@@ -218,71 +218,41 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
}
)
# Fetch vectors for specific matching chunks from Qdrant
# Fetch vectors for specific matching chunks from Qdrant using batch retrieve
vector_fetch_start = time.perf_counter()
qdrant_client = await get_qdrant_client()
# Build filters for each specific chunk
from qdrant_client.models import FieldCondition, Filter, MatchValue
chunk_vectors_map = {} # Map (doc_id, chunk_start, chunk_end) -> vector
# Fetch vectors in batches by filtering on chunk-specific fields
for result in search_results:
chunk_start = result.chunk_start_offset
chunk_end = result.chunk_end_offset
# Collect point IDs from search results for batch retrieval
# point_id is the Qdrant internal ID returned by search algorithms
point_ids = [r.point_id for r in search_results if r.point_id]
# Build filter for this specific chunk
must_conditions = [
get_placeholder_filter(), # Always exclude placeholders from user-facing queries
FieldCondition(
key="doc_id",
match=MatchValue(value=result.id),
),
FieldCondition(
key="user_id",
match=MatchValue(value=username),
),
]
# Add chunk position filters if available
if chunk_start is not None:
must_conditions.append(
FieldCondition(
key="chunk_start_offset",
match=MatchValue(value=chunk_start),
)
)
if chunk_end is not None:
must_conditions.append(
FieldCondition(
key="chunk_end_offset",
match=MatchValue(value=chunk_end),
)
)
# Fetch this specific chunk vector
points_response = await qdrant_client.scroll(
if point_ids:
# Single batch retrieve call instead of N sequential scroll calls
# This is ~50x faster for 50 results (1 HTTP request vs 50)
points_response = await qdrant_client.retrieve(
collection_name=settings.get_collection_name(),
scroll_filter=Filter(must=must_conditions),
limit=1, # Only need the first match
ids=point_ids,
with_vectors=["dense"],
with_payload=False,
with_payload=["doc_id", "chunk_start_offset", "chunk_end_offset"],
)
points = points_response[0]
if points:
# Extract dense vector
point = points[0]
# Build chunk_vectors_map from batch response
for point in points_response:
if point.vector is not None:
# If named vectors (dict), extract "dense"
# Extract dense vector (handle both named and unnamed vectors)
if isinstance(point.vector, dict):
vector = point.vector.get("dense")
else:
vector = point.vector
chunk_key = (result.id, chunk_start, chunk_end)
chunk_vectors_map[chunk_key] = vector
if vector is not None and point.payload:
doc_id = point.payload.get("doc_id")
chunk_start = point.payload.get("chunk_start_offset")
chunk_end = point.payload.get("chunk_end_offset")
chunk_key = (doc_id, chunk_start, chunk_end)
chunk_vectors_map[chunk_key] = vector
vector_fetch_duration = time.perf_counter() - vector_fetch_start
@@ -341,16 +311,23 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
chunk_vectors = np.array(chunk_vectors)
# Generate query embedding for visualization
# Reuse query embedding from search algorithm (avoids redundant embedding call)
query_embed_start = time.perf_counter()
from nextcloud_mcp_server.embedding.service import get_embedding_service
if search_algo.query_embedding is not None:
query_embedding = search_algo.query_embedding
logger.info(
f"Reusing query embedding from search algorithm "
f"(dimension={len(query_embedding)})"
)
else:
# Fallback: generate embedding if not available from search
from nextcloud_mcp_server.embedding.service import get_embedding_service
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
logger.info(f"Generated query embedding (dimension={len(query_embedding)})")
query_embed_duration = time.perf_counter() - query_embed_start
logger.info(f"Generated query embedding (dimension={len(query_embedding)})")
# Combine query vector with chunk vectors for PCA
# Query will be the last point in the array
all_vectors = np.vstack([chunk_vectors, np.array([query_embedding])])
@@ -380,9 +357,19 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
)
# Apply PCA dimensionality reduction (768-dim → 3D) on normalized vectors
# Run in thread pool to avoid blocking the event loop (CPU-bound)
pca_start = time.perf_counter()
pca = PCA(n_components=3)
coords_3d = pca.fit_transform(all_vectors_normalized)
def _compute_pca(vectors: np.ndarray) -> tuple[np.ndarray, PCA]:
pca = PCA(n_components=3)
coords = pca.fit_transform(vectors)
return coords, pca
import anyio
coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: _compute_pca(all_vectors_normalized)
)
pca_duration = time.perf_counter() - pca_start
# After fit, these attributes are guaranteed to be set
+230
View File
@@ -1295,3 +1295,233 @@ class WebDAVClient(BaseNextcloudClient):
logger.debug(f"Found {len(files)} files with tag ID {tag_id}")
return files
async def get_file_info(self, path: str) -> dict[str, Any] | None:
"""Get file info including file ID via WebDAV PROPFIND.
Args:
path: Path to the file (relative to user's files directory)
Returns:
File info dictionary with id, name, size, content_type, etc.
Returns None if file not found.
"""
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
propfind_body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:prop>
<oc:fileid/>
<d:displayname/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getlastmodified/>
<d:getetag/>
<d:resourcetype/>
</d:prop>
</d:propfind>"""
try:
response = await self._client.request(
"PROPFIND",
webdav_path,
headers={"Depth": "0"},
content=propfind_body,
)
response.raise_for_status()
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"File not found: {path}")
return None
raise
# Parse XML response
root = ET.fromstring(response.content)
ns = {
"d": "DAV:",
"oc": "http://owncloud.org/ns",
}
response_elem = root.find("d:response", ns)
if response_elem is None:
return None
propstat = response_elem.find("d:propstat", ns)
if propstat is None:
return None
prop = propstat.find("d:prop", ns)
if prop is None:
return None
# Extract properties
fileid_elem = prop.find("oc:fileid", ns)
displayname_elem = prop.find("d:displayname", ns)
contentlength_elem = prop.find("d:getcontentlength", ns)
contenttype_elem = prop.find("d:getcontenttype", ns)
lastmodified_elem = prop.find("d:getlastmodified", ns)
etag_elem = prop.find("d:getetag", ns)
resourcetype_elem = prop.find("d:resourcetype", ns)
is_directory = (
resourcetype_elem is not None
and resourcetype_elem.find("d:collection", ns) is not None
)
file_info = {
"id": int(fileid_elem.text) if fileid_elem is not None else None,
"path": path,
"name": displayname_elem.text
if displayname_elem is not None
else path.split("/")[-1],
"size": int(contentlength_elem.text)
if contentlength_elem is not None and contentlength_elem.text
else 0,
"content_type": contenttype_elem.text
if contenttype_elem is not None
else "",
"last_modified": lastmodified_elem.text
if lastmodified_elem is not None
else None,
"etag": etag_elem.text.strip('"')
if etag_elem is not None and etag_elem.text
else None,
"is_directory": is_directory,
}
logger.debug(f"Got file info for '{path}': id={file_info['id']}")
return file_info
async def create_tag(
self,
name: str,
user_visible: bool = True,
user_assignable: bool = True,
) -> dict[str, Any]:
"""Create a system tag via OCS API.
Args:
name: Name of the tag to create
user_visible: Whether the tag is visible to users
user_assignable: Whether users can assign this tag
Returns:
Tag dictionary with id, name, userVisible, userAssignable
Raises:
HTTPStatusError: If tag creation fails (409 if already exists)
"""
response = await self._client.post(
"/ocs/v2.php/apps/systemtags/api/v1/tags",
headers={
"OCS-APIRequest": "true",
"Content-Type": "application/json",
},
json={
"name": name,
"userVisible": user_visible,
"userAssignable": user_assignable,
},
)
response.raise_for_status()
# Parse OCS response
data = response.json()
ocs_data = data.get("ocs", {}).get("data", {})
tag_info = {
"id": ocs_data.get("id"),
"name": ocs_data.get("name", name),
"userVisible": ocs_data.get("userVisible", user_visible),
"userAssignable": ocs_data.get("userAssignable", user_assignable),
}
logger.info(f"Created tag '{name}' with ID {tag_info['id']}")
return tag_info
async def get_or_create_tag(
self,
name: str,
user_visible: bool = True,
user_assignable: bool = True,
) -> dict[str, Any]:
"""Get a tag by name, creating it if it doesn't exist.
Args:
name: Name of the tag
user_visible: Whether the tag is visible to users (for creation)
user_assignable: Whether users can assign this tag (for creation)
Returns:
Tag dictionary with id, name, userVisible, userAssignable
"""
# First try to get existing tag
existing_tag = await self.get_tag_by_name(name)
if existing_tag:
logger.debug(f"Tag '{name}' already exists with ID {existing_tag['id']}")
return existing_tag
# Create new tag
try:
return await self.create_tag(name, user_visible, user_assignable)
except HTTPStatusError as e:
if e.response.status_code == 409:
# Tag was created between our check and creation, fetch it
existing_tag = await self.get_tag_by_name(name)
if existing_tag:
return existing_tag
raise
async def assign_tag_to_file(self, file_id: int, tag_id: int) -> bool:
"""Assign a system tag to a file.
Args:
file_id: Numeric file ID
tag_id: Numeric tag ID
Returns:
True if tag was assigned successfully (or already assigned)
Raises:
HTTPStatusError: If tag assignment fails
"""
response = await self._client.request(
"PUT",
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
headers={"Content-Length": "0"},
content=b"",
)
# 201 = Created (new assignment), 409 = Conflict (already assigned)
if response.status_code in (201, 409):
logger.info(f"Tagged file {file_id} with tag {tag_id}")
return True
response.raise_for_status()
return True
async def remove_tag_from_file(self, file_id: int, tag_id: int) -> bool:
"""Remove a system tag from a file.
Args:
file_id: Numeric file ID
tag_id: Numeric tag ID
Returns:
True if tag was removed successfully (or wasn't assigned)
Raises:
HTTPStatusError: If tag removal fails
"""
response = await self._client.request(
"DELETE",
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
)
# 204 = No Content (removed), 404 = Not Found (wasn't assigned)
if response.status_code in (204, 404):
logger.info(f"Removed tag {tag_id} from file {file_id}")
return True
response.raise_for_status()
return True
+38 -3
View File
@@ -217,6 +217,11 @@ class Settings:
ollama_embedding_model: str = "nomic-embed-text"
ollama_verify_ssl: bool = True
# OpenAI settings (for embeddings)
openai_api_key: Optional[str] = None
openai_base_url: Optional[str] = None
openai_embedding_model: str = "text-embedding-3-small"
# Document chunking settings (for vector embeddings)
document_chunk_size: int = 2048 # Characters per chunk
document_chunk_overlap: int = 200 # Overlapping characters between chunks
@@ -275,6 +280,29 @@ class Settings:
f"DOCUMENT_CHUNK_OVERLAP ({self.document_chunk_overlap}) cannot be negative."
)
def get_embedding_model_name(self) -> str:
"""
Get the active embedding model name based on provider priority.
Priority order (same as ProviderRegistry):
1. OpenAI - if OPENAI_API_KEY is set
2. Ollama - if OLLAMA_BASE_URL is set
3. Simple - fallback (returns "simple-384")
Returns:
Active embedding model name
"""
# Check OpenAI first (higher priority than Ollama in registry)
if self.openai_api_key:
return self.openai_embedding_model
# Check Ollama
if self.ollama_base_url:
return self.ollama_embedding_model
# Fallback to simple provider indicator
return "simple-384"
def get_collection_name(self) -> str:
"""
Get Qdrant collection name.
@@ -290,8 +318,9 @@ class Settings:
Format: {deployment-id}-{model-name}
Examples:
- "my-deployment-nomic-embed-text" (OTEL_SERVICE_NAME set)
- "mcp-container-all-minilm" (hostname fallback)
- "my-deployment-nomic-embed-text" (Ollama)
- "my-deployment-text-embedding-3-small" (OpenAI)
- "mcp-container-openai-text-embedding-3-small" (hostname fallback)
Returns:
Collection name string
@@ -311,7 +340,7 @@ class Settings:
# Sanitize deployment ID and model name
deployment_id = deployment_id.lower().replace(" ", "-").replace("_", "-")
model_name = self.ollama_embedding_model.replace("/", "-").replace(":", "-")
model_name = self.get_embedding_model_name().replace("/", "-").replace(":", "-")
return f"{deployment_id}-{model_name}"
@@ -371,6 +400,12 @@ def get_settings() -> Settings:
ollama_base_url=os.getenv("OLLAMA_BASE_URL"),
ollama_embedding_model=os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text"),
ollama_verify_ssl=os.getenv("OLLAMA_VERIFY_SSL", "true").lower() == "true",
# OpenAI settings
openai_api_key=os.getenv("OPENAI_API_KEY"),
openai_base_url=os.getenv("OPENAI_BASE_URL"),
openai_embedding_model=os.getenv(
"OPENAI_EMBEDDING_MODEL", "text-embedding-3-small"
),
# Document chunking settings
document_chunk_size=int(os.getenv("DOCUMENT_CHUNK_SIZE", "2048")),
document_chunk_overlap=int(os.getenv("DOCUMENT_CHUNK_OVERLAP", "200")),
@@ -37,7 +37,9 @@ class BM25SparseEmbeddingProvider:
def encode(self, text: str) -> dict[str, Any]:
"""
Generate BM25 sparse embedding for a single text.
Generate BM25 sparse embedding for a single text (synchronous).
Note: For async contexts, prefer encode_async() to avoid blocking the event loop.
Args:
text: Input text to encode
@@ -53,6 +55,23 @@ class BM25SparseEmbeddingProvider:
"values": sparse_embedding.values.tolist(),
}
async def encode_async(self, text: str) -> dict[str, Any]:
"""
Generate BM25 sparse embedding for a single text (async).
Runs CPU-bound BM25 encoding in thread pool to avoid blocking the event loop.
Args:
text: Input text to encode
Returns:
Dictionary with 'indices' and 'values' keys for Qdrant sparse vector
"""
import anyio
# Run CPU-bound BM25 encoding in thread pool
return await anyio.to_thread.run_sync(lambda: self.encode(text)) # type: ignore[attr-defined]
async def encode_batch(self, texts: list[str]) -> list[dict[str, Any]]:
"""
Generate BM25 sparse embeddings for multiple texts (batched).
@@ -4,12 +4,14 @@ from .anthropic import AnthropicProvider
from .base import Provider
from .bedrock import BedrockProvider
from .ollama import OllamaProvider
from .openai import OpenAIProvider
from .registry import get_provider, reset_provider
from .simple import SimpleProvider
__all__ = [
"Provider",
"OllamaProvider",
"OpenAIProvider",
"AnthropicProvider",
"SimpleProvider",
"BedrockProvider",
+227
View File
@@ -0,0 +1,227 @@
"""Unified OpenAI provider for embeddings and text generation.
Supports:
- OpenAI's standard API
- GitHub Models API (models.github.ai)
- Any OpenAI-compatible API via base_url override
"""
import logging
from openai import AsyncOpenAI
from .base import Provider
logger = logging.getLogger(__name__)
# Well-known embedding dimensions for OpenAI models
OPENAI_EMBEDDING_DIMENSIONS: dict[str, int] = {
"text-embedding-3-small": 1536,
"text-embedding-3-large": 3072,
"text-embedding-ada-002": 1536,
# GitHub Models API uses openai/ prefix
"openai/text-embedding-3-small": 1536,
"openai/text-embedding-3-large": 3072,
}
class OpenAIProvider(Provider):
"""
OpenAI provider supporting both embeddings and text generation.
Works with:
- OpenAI's standard API (api.openai.com)
- GitHub Models API (models.github.ai)
- Any OpenAI-compatible API (via base_url)
"""
def __init__(
self,
api_key: str,
base_url: str | None = None,
embedding_model: str | None = None,
generation_model: str | None = None,
timeout: float = 120.0,
):
"""
Initialize OpenAI provider.
Args:
api_key: OpenAI API key (or GITHUB_TOKEN for GitHub Models)
base_url: Base URL override (e.g., "https://models.github.ai/inference")
embedding_model: Model for embeddings (e.g., "text-embedding-3-small").
None disables embeddings.
generation_model: Model for text generation (e.g., "gpt-4o-mini").
None disables generation.
timeout: HTTP timeout in seconds (default: 120)
"""
self.embedding_model = embedding_model
self.generation_model = generation_model
self._dimension: int | None = None
# Initialize async client
self.client = AsyncOpenAI(
api_key=api_key,
base_url=base_url,
timeout=timeout,
)
# Try to get known dimension without API call
if embedding_model and embedding_model in OPENAI_EMBEDDING_DIMENSIONS:
self._dimension = OPENAI_EMBEDDING_DIMENSIONS[embedding_model]
logger.info(
f"Initialized OpenAI provider: base_url={base_url or 'default'} "
f"(embedding_model={embedding_model}, generation_model={generation_model}, "
f"dimension={self._dimension})"
)
@property
def supports_embeddings(self) -> bool:
"""Whether this provider supports embedding generation."""
return self.embedding_model is not None
@property
def supports_generation(self) -> bool:
"""Whether this provider supports text generation."""
return self.generation_model is not None
async def embed(self, text: str) -> list[float]:
"""
Generate embedding vector for text.
Args:
text: Input text to embed
Returns:
Vector embedding as list of floats
Raises:
NotImplementedError: If embeddings not enabled (no embedding_model)
"""
if not self.supports_embeddings:
raise NotImplementedError(
"Embedding not supported - no embedding_model configured"
)
response = await self.client.embeddings.create(
input=text,
model=self.embedding_model,
)
embedding = response.data[0].embedding
# Update dimension if not set
if self._dimension is None:
self._dimension = len(embedding)
logger.info(
f"Detected embedding dimension: {self._dimension} "
f"for model {self.embedding_model}"
)
return embedding
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
"""
Generate embeddings for multiple texts using OpenAI's batch API.
OpenAI supports up to 2048 inputs per request.
Args:
texts: List of texts to embed
Returns:
List of vector embeddings
Raises:
NotImplementedError: If embeddings not enabled (no embedding_model)
"""
if not self.supports_embeddings:
raise NotImplementedError(
"Embedding not supported - no embedding_model configured"
)
if not texts:
return []
# OpenAI supports batches up to 2048, but use smaller batches for safety
batch_size = 100
all_embeddings: list[list[float]] = []
for i in range(0, len(texts), batch_size):
batch = texts[i : i + batch_size]
response = await self.client.embeddings.create(
input=batch,
model=self.embedding_model,
)
# Sort by index to maintain order
sorted_data = sorted(response.data, key=lambda x: x.index)
batch_embeddings = [item.embedding for item in sorted_data]
all_embeddings.extend(batch_embeddings)
# Update dimension if not set
if self._dimension is None and batch_embeddings:
self._dimension = len(batch_embeddings[0])
logger.info(
f"Detected embedding dimension: {self._dimension} "
f"for model {self.embedding_model}"
)
return all_embeddings
def get_dimension(self) -> int:
"""
Get embedding dimension.
Returns:
Vector dimension for the configured embedding model
Raises:
NotImplementedError: If embeddings not enabled (no embedding_model)
RuntimeError: If dimension not detected yet (call embed first)
"""
if not self.supports_embeddings:
raise NotImplementedError(
"Embedding not supported - no embedding_model configured"
)
if self._dimension is None:
raise RuntimeError(
f"Embedding dimension not detected yet for model {self.embedding_model}. "
"Call embed() first or use a known model."
)
return self._dimension
async def generate(self, prompt: str, max_tokens: int = 500) -> str:
"""
Generate text from a prompt.
Args:
prompt: The prompt to generate from
max_tokens: Maximum tokens to generate
Returns:
Generated text
Raises:
NotImplementedError: If generation not enabled (no generation_model)
"""
if not self.supports_generation:
raise NotImplementedError(
"Text generation not supported - no generation_model configured"
)
response = await self.client.chat.completions.create(
model=self.generation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=max_tokens,
temperature=0.7,
)
return response.choices[0].message.content or ""
async def close(self) -> None:
"""Close HTTP client."""
await self.client.close()
+38 -8
View File
@@ -6,6 +6,7 @@ import os
from .base import Provider
from .bedrock import BedrockProvider
from .ollama import OllamaProvider
from .openai import OpenAIProvider
from .simple import SimpleProvider
logger = logging.getLogger(__name__)
@@ -17,8 +18,9 @@ class ProviderRegistry:
Checks environment variables in priority order and creates appropriate provider:
1. Bedrock (AWS_REGION + BEDROCK_*_MODEL)
2. Ollama (OLLAMA_BASE_URL)
3. Simple (fallback for testing/development)
2. OpenAI (OPENAI_API_KEY)
3. Ollama (OLLAMA_BASE_URL)
4. Simple (fallback for testing/development)
"""
@staticmethod
@@ -28,8 +30,9 @@ class ProviderRegistry:
Priority order:
1. Bedrock - if AWS_REGION or BEDROCK_EMBEDDING_MODEL is set
2. Ollama - if OLLAMA_BASE_URL is set
3. Simple - fallback for testing/development
2. OpenAI - if OPENAI_API_KEY is set
3. Ollama - if OLLAMA_BASE_URL is set
4. Simple - fallback for testing/development
Returns:
Provider instance
@@ -42,6 +45,12 @@ class ProviderRegistry:
- BEDROCK_EMBEDDING_MODEL: Model ID for embeddings (e.g., "amazon.titan-embed-text-v2:0")
- BEDROCK_GENERATION_MODEL: Model ID for text generation (e.g., "anthropic.claude-3-sonnet-20240229-v1:0")
OpenAI:
- OPENAI_API_KEY: OpenAI API key (or GITHUB_TOKEN for GitHub Models)
- OPENAI_BASE_URL: Base URL override (e.g., "https://models.github.ai/inference")
- OPENAI_EMBEDDING_MODEL: Model for embeddings (default: "text-embedding-3-small")
- OPENAI_GENERATION_MODEL: Model for text generation (e.g., "gpt-4o-mini")
Ollama:
- OLLAMA_BASE_URL: Ollama API base URL (e.g., "http://localhost:11434")
- OLLAMA_EMBEDDING_MODEL: Model for embeddings (default: "nomic-embed-text")
@@ -70,7 +79,28 @@ class ProviderRegistry:
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)
# 2. Check for Ollama
# 2. Check for OpenAI
openai_api_key = os.getenv("OPENAI_API_KEY")
if openai_api_key:
base_url = os.getenv("OPENAI_BASE_URL")
embedding_model = os.getenv(
"OPENAI_EMBEDDING_MODEL", "text-embedding-3-small"
)
generation_model = os.getenv("OPENAI_GENERATION_MODEL")
logger.info(
f"Using OpenAI provider: base_url={base_url or 'default'}, "
f"embedding_model={embedding_model}, "
f"generation_model={generation_model}"
)
return OpenAIProvider(
api_key=openai_api_key,
base_url=base_url,
embedding_model=embedding_model,
generation_model=generation_model,
)
# 3. Check for Ollama (local LLM)
ollama_url = os.getenv("OLLAMA_BASE_URL")
if ollama_url:
embedding_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")
@@ -89,12 +119,12 @@ class ProviderRegistry:
verify_ssl=verify_ssl,
)
# 3. Fallback to Simple provider for development/testing
# 4. Fallback to Simple provider for development/testing
dimension = int(os.getenv("SIMPLE_EMBEDDING_DIMENSION", "384"))
logger.warning(
"No provider configured (AWS_REGION, OLLAMA_BASE_URL not set). "
"No provider configured (AWS_REGION, OPENAI_API_KEY, OLLAMA_BASE_URL not set). "
"Using SimpleProvider for testing/development. "
"For production, configure Bedrock or Ollama."
"For production, configure Bedrock, OpenAI, or Ollama."
)
return SimpleProvider(dimension=dimension)
@@ -140,6 +140,7 @@ class SearchResult:
page_number: Page number for PDF documents (None for other doc types)
chunk_index: Zero-based index of this chunk in the document
total_chunks: Total number of chunks in the document
point_id: Qdrant point ID for batch vector retrieval (None if not from Qdrant)
"""
id: int
@@ -153,6 +154,7 @@ class SearchResult:
page_number: int | None = None
chunk_index: int = 0
total_chunks: int = 1
point_id: str | None = None
def __post_init__(self):
"""Validate score is non-negative.
@@ -172,8 +174,15 @@ class SearchAlgorithm(ABC):
All search algorithms must implement the search() method with consistent
interface, allowing them to be used interchangeably.
Attributes:
query_embedding: The query embedding generated during the last search.
Available after search() completes for algorithms that use embeddings.
Can be reused by callers to avoid redundant embedding generation.
"""
query_embedding: list[float] | None = None
@abstractmethod
async def search(
self,
+4 -1
View File
@@ -101,11 +101,13 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
# Generate dense embedding for semantic search
embedding_service = get_embedding_service()
dense_embedding = await embedding_service.embed(query)
# Store for reuse by callers (e.g., viz_routes PCA visualization)
self.query_embedding = dense_embedding
logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})")
# Generate sparse embedding for BM25 keyword search
bm25_service = get_bm25_service()
sparse_embedding = bm25_service.encode(query)
sparse_embedding = await bm25_service.encode_async(query)
logger.debug(
f"Generated sparse embedding "
f"({len(sparse_embedding['indices'])} non-zero terms)"
@@ -218,6 +220,7 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval
)
)
+3
View File
@@ -78,6 +78,8 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
# Generate embedding for query
embedding_service = get_embedding_service()
query_embedding = await embedding_service.embed(query)
# Store for reuse by callers (e.g., viz_routes PCA visualization)
self.query_embedding = query_embedding
logger.debug(
f"Generated embedding for query (dimension={len(query_embedding)})"
)
@@ -164,6 +166,7 @@ class SemanticSearchAlgorithm(SearchAlgorithm):
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval
)
)
-21
View File
@@ -335,27 +335,6 @@ def configure_semantic_tools(mcp: FastMCP):
Note: Requires MCP client to support sampling. If sampling is unavailable,
the tool gracefully degrades to returning documents with an explanation.
The client may prompt the user to approve the sampling request.
Examples:
>>> # Query about objectives across multiple apps
>>> result = await nc_semantic_search_answer(
... query="What are my Q1 2025 project goals?",
... ctx=ctx
... )
>>> print(result.generated_answer)
"Based on Document 1 (note: Project Kickoff), Document 2 (calendar event:
Q1 Planning Meeting), and Document 3 (deck card: Implement semantic search),
your main goals are: 1) Improve semantic search accuracy by 20%,
2) Deploy new embedding model, 3) Reduce indexing latency..."
>>> # Query about appointments
>>> result = await nc_semantic_search_answer(
... query="When is my next dentist appointment?",
... ctx=ctx,
... limit=10
... )
>>> len(result.sources) # Calendar events and related notes
3
"""
# 1. Retrieve relevant documents via existing semantic search
search_response = await nc_semantic_search(
-14
View File
@@ -64,20 +64,6 @@ def configure_webdav_tools(mcp: FastMCP):
- Text files are decoded to UTF-8
- Documents (PDF, DOCX, etc.) are parsed and text is extracted
- Other binary files are base64 encoded
Examples:
# Read a text file
result = await nc_webdav_read_file("Documents/readme.txt")
logger.info(result['content']) # Decoded text content
# Read a PDF document (automatically parsed)
result = await nc_webdav_read_file("Documents/report.pdf")
logger.info(result['content']) # Extracted text from PDF
logger.info(result['parsing_metadata']) # Document parsing info
# Read a binary file
result = await nc_webdav_read_file("Images/photo.jpg")
logger.info(result['encoding']) # 'base64'
"""
client = await get_client(ctx)
content, content_type = await client.webdav.read_file(path)
+7 -5
View File
@@ -93,27 +93,29 @@ async def get_qdrant_client() -> AsyncQdrantClient:
# Validate dimension matches
if actual_dimension != expected_dimension:
embedding_model = settings.get_embedding_model_name()
raise ValueError(
f"Dimension mismatch for collection '{collection_name}':\n"
f" Expected: {expected_dimension} (from embedding model '{settings.ollama_embedding_model}')\n"
f" Expected: {expected_dimension} (from embedding model '{embedding_model}')\n"
f" Found: {actual_dimension}\n"
f"This usually means you changed the embedding model.\n"
f"Solutions:\n"
f" 1. Delete the old collection: Collection will be recreated with new dimensions\n"
f" 2. Set QDRANT_COLLECTION to use a different collection name\n"
f" 3. Revert OLLAMA_EMBEDDING_MODEL to the original model"
f" 3. Revert to the original embedding model"
)
logger.info(
f"Using existing Qdrant collection: {collection_name} "
f"(dimension={actual_dimension}, model={settings.ollama_embedding_model})"
f"(dimension={actual_dimension}, model={settings.get_embedding_model_name()})"
)
else:
# Collection doesn't exist - create it
embedding_model = settings.get_embedding_model_name()
logger.info(
f"Collection '{collection_name}' not found, creating with "
f"dimension={expected_dimension}, model={settings.ollama_embedding_model}..."
f"dimension={expected_dimension}, model={embedding_model}..."
)
await _qdrant_client.create_collection(
collection_name=collection_name,
@@ -134,7 +136,7 @@ async def get_qdrant_client() -> AsyncQdrantClient:
logger.info(
f"Created Qdrant collection: {collection_name}\n"
f" Dense vector dimension: {expected_dimension}\n"
f" Dense embedding model: {settings.ollama_embedding_model}\n"
f" Dense embedding model: {embedding_model}\n"
f" Sparse vectors: BM25 (for hybrid search)\n"
f" Distance: COSINE\n"
f"Background sync will index all documents with dense + sparse vectors."
+2 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.46.0"
version = "0.48.0"
description = "Model Context Protocol (MCP) server for Nextcloud integration - enables AI assistants to interact with Nextcloud data"
authors = [
{name = "Chris Coutinho", email = "chris@coutinho.io"}
@@ -39,6 +39,7 @@ dependencies = [
"pymupdf>=1.26.6",
"pymupdf4llm>=0.2.2",
"pymupdf-layout>=1.26.6",
"openai>=2.8.1",
]
classifiers = [
"Development Status :: 4 - Beta",
+7 -1
View File
@@ -114,6 +114,7 @@ async def create_mcp_client_session(
token: str | None = None,
client_name: str = "MCP",
elicitation_callback: Any = None,
sampling_callback: Any = None,
) -> AsyncGenerator[ClientSession, Any]:
"""
Factory function to create an MCP client session with proper lifecycle management.
@@ -133,6 +134,8 @@ async def create_mcp_client_session(
client_name: Client name for logging (e.g., "OAuth MCP (Playwright)")
elicitation_callback: Optional callback for handling elicitation requests.
Should match signature: async def callback(context: RequestContext, params: ElicitRequestParams) -> ElicitResult | ErrorData
sampling_callback: Optional callback for handling sampling (LLM generation) requests.
Should match signature: async def callback(context: RequestContext, params: CreateMessageRequestParams) -> CreateMessageResult | ErrorData
Yields:
Initialized MCP ClientSession
@@ -156,7 +159,10 @@ async def create_mcp_client_session(
_,
):
async with ClientSession(
read_stream, write_stream, elicitation_callback=elicitation_callback
read_stream,
write_stream,
elicitation_callback=elicitation_callback,
sampling_callback=sampling_callback,
) as session:
await session.initialize()
logger.info(f"{client_name} client session initialized successfully")
@@ -0,0 +1,37 @@
[
{
"id": "nc-manual-001",
"query": "What is two-factor authentication and how does it protect my Nextcloud account?",
"ground_truth": "Two-factor authentication (2FA) protects your Nextcloud account by requiring two different proofs of identity - something you know (like a password) and something you have (like a code from your phone). The first factor is typically a password, and the second can be a text message or code generated on your phone.",
"expected_topics": ["two-factor authentication", "2FA", "password", "security"],
"difficulty": "easy"
},
{
"id": "nc-manual-002",
"query": "How do file quotas work in Nextcloud when sharing files?",
"ground_truth": "When you share files with other users, the shared files count against the original share owner's quota. When you share a folder and allow others to upload files, all uploaded and edited files count against your quota. Re-shared files still count against the original share owner's quota. Deleted files in trash don't count against quotas until trash exceeds 50% of quota.",
"expected_topics": ["quota", "sharing", "files", "storage"],
"difficulty": "medium"
},
{
"id": "nc-manual-003",
"query": "How do I install the Nextcloud desktop sync client on Linux?",
"ground_truth": "Linux users must follow instructions on the download page to add the appropriate repository for their Linux distribution, install the signing key, and use their package managers to install the desktop sync client. Linux users also need a password manager enabled, such as GNOME Keyring or KWallet, so the sync client can login automatically.",
"expected_topics": ["Linux", "desktop client", "installation", "package manager", "GNOME Keyring", "KWallet"],
"difficulty": "medium"
},
{
"id": "nc-manual-004",
"query": "What are the system requirements for the Nextcloud desktop client on Windows?",
"ground_truth": "The Nextcloud desktop sync client requires Windows 10 or later, 64-bits only.",
"expected_topics": ["Windows", "system requirements", "desktop client"],
"difficulty": "easy"
},
{
"id": "nc-manual-005",
"query": "How do I use client applications with two-factor authentication enabled?",
"ground_truth": "Once you have enabled 2FA, your clients will no longer be able to connect with just your password unless they also support two-factor authentication. To solve this, you should generate device-specific passwords for them. This is managed through the connected browsers and devices settings.",
"expected_topics": ["2FA", "client applications", "device-specific passwords", "app passwords"],
"difficulty": "medium"
}
]
+94
View File
@@ -0,0 +1,94 @@
"""MCP sampling support for integration tests.
This module provides utilities to enable real LLM-based sampling in integration tests
using OpenAI or GitHub Models API.
"""
import logging
from typing import Any
from mcp import types
from mcp.client.session import ClientSession, RequestContext
from nextcloud_mcp_server.providers.openai import OpenAIProvider
logger = logging.getLogger(__name__)
def create_openai_sampling_callback(provider: OpenAIProvider):
"""Factory to create a sampling callback using OpenAI provider.
The callback conforms to MCP's SamplingFnT protocol and can be passed
to ClientSession for handling sampling requests from the server.
Args:
provider: OpenAIProvider instance configured with a generation model
Returns:
Async callback function for MCP sampling
Example:
```python
provider = OpenAIProvider(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
generation_model="gpt-4o-mini",
)
callback = create_openai_sampling_callback(provider)
async for session in create_mcp_client_session(
url="http://localhost:8000/mcp",
sampling_callback=callback,
):
# Session now supports sampling
pass
```
"""
async def sampling_callback(
context: RequestContext[ClientSession, Any],
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult | types.ErrorData:
"""Handle sampling requests using OpenAI provider."""
logger.debug(f"Sampling callback invoked with {len(params.messages)} messages")
# Extract messages and build prompt
messages_text = []
for msg in params.messages:
if hasattr(msg.content, "text"):
role_prefix = "User" if msg.role == "user" else "Assistant"
messages_text.append(f"{role_prefix}: {msg.content.text}")
prompt = "\n\n".join(messages_text)
# Add system prompt if provided
if params.systemPrompt:
prompt = f"System: {params.systemPrompt}\n\n{prompt}"
logger.debug(f"Generating response for prompt ({len(prompt)} chars)")
try:
# Generate response using OpenAI provider
# Note: temperature is hardcoded in the provider at 0.7
response = await provider.generate(
prompt=prompt,
max_tokens=params.maxTokens,
)
model_name = provider.generation_model or "unknown"
logger.info(f"Sampling completed: {len(response)} chars from {model_name}")
return types.CreateMessageResult(
role="assistant",
content=types.TextContent(type="text", text=response),
model=model_name,
stopReason="endTurn",
)
except Exception as e:
logger.error(f"OpenAI generation failed: {e}")
return types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"OpenAI generation failed: {e!s}",
)
return sampling_callback
+390
View File
@@ -0,0 +1,390 @@
"""Integration tests for RAG pipeline with OpenAI/GitHub Models API.
These tests validate the complete semantic search and MCP sampling flow using:
1. OpenAI embeddings for semantic search
2. MCP sampling for answer generation
3. Pre-indexed Nextcloud User Manual as the knowledge base
Environment Variables:
OPENAI_API_KEY: OpenAI API key or GitHub token for models.github.ai
OPENAI_BASE_URL: Base URL override (e.g., "https://models.github.ai/inference")
OPENAI_EMBEDDING_MODEL: Embedding model (default: "text-embedding-3-small")
OPENAI_GENERATION_MODEL: Generation model for sampling (default: "gpt-4o-mini")
RAG_MANUAL_PATH: Path to manual PDF in Nextcloud (default: "Nextcloud_User_Manual.pdf")
For GitHub CI, set:
OPENAI_API_KEY: ${{ secrets.GITHUB_TOKEN }}
OPENAI_BASE_URL: https://models.github.ai/inference
OPENAI_EMBEDDING_MODEL: openai/text-embedding-3-small
OPENAI_GENERATION_MODEL: openai/gpt-4o-mini
Prerequisites:
- Nextcloud User Manual PDF uploaded to Nextcloud
- VECTOR_SYNC_ENABLED=true on the MCP server
"""
import json
import logging
import os
from pathlib import Path
from typing import Any, AsyncGenerator
import anyio
import pytest
from mcp import ClientSession
from nextcloud_mcp_server.providers.openai import OpenAIProvider
from tests.conftest import create_mcp_client_session
from tests.integration.sampling_support import create_openai_sampling_callback
logger = logging.getLogger(__name__)
# Default path to the Nextcloud User Manual PDF
DEFAULT_MANUAL_PATH = "Nextcloud Manual.pdf"
# Skip all tests if OpenAI API key not configured
pytestmark = [
pytest.mark.integration,
pytest.mark.skipif(
not os.getenv("OPENAI_API_KEY"),
reason="OPENAI_API_KEY not set - skipping OpenAI RAG tests",
),
]
# Ground truth fixture path
FIXTURES_DIR = Path(__file__).parent / "fixtures"
GROUND_TRUTH_FILE = FIXTURES_DIR / "nextcloud_manual_ground_truth.json"
@pytest.fixture(scope="module")
def ground_truth_qa():
"""Load ground truth Q&A pairs for the Nextcloud manual."""
if not GROUND_TRUTH_FILE.exists():
pytest.skip(f"Ground truth file not found: {GROUND_TRUTH_FILE}")
with open(GROUND_TRUTH_FILE) as f:
return json.load(f)
@pytest.fixture(scope="module")
async def indexed_manual_pdf(nc_client, nc_mcp_client):
"""Ensure the Nextcloud User Manual PDF is tagged and indexed for vector search.
This fixture:
1. Gets file info for the manual PDF
2. Creates/gets the 'vector-index' tag
3. Assigns the tag to the file
4. Waits for vector sync to complete indexing
Environment Variables:
RAG_MANUAL_PATH: Path to manual PDF in Nextcloud (default: Nextcloud Manual.pdf)
"""
manual_path = os.getenv("RAG_MANUAL_PATH", DEFAULT_MANUAL_PATH)
logger.info(f"Setting up indexed manual PDF: {manual_path}")
# Get file info to verify file exists and get file ID
file_info = await nc_client.webdav.get_file_info(manual_path)
if not file_info:
pytest.skip(f"Manual PDF not found at '{manual_path}'")
file_id = file_info["id"]
logger.info(f"Found manual PDF: {manual_path} (file_id={file_id})")
# Create or get the vector-index tag
tag = await nc_client.webdav.get_or_create_tag("vector-index")
tag_id = tag["id"]
logger.info(f"Using tag 'vector-index' (tag_id={tag_id})")
# Assign tag to file
await nc_client.webdav.assign_tag_to_file(file_id, tag_id)
logger.info(f"Tagged file {file_id} with vector-index tag")
# Wait for vector sync to complete indexing
max_attempts = 60
poll_interval = 10
logger.info("Waiting for vector sync to index the manual...")
for attempt in range(1, max_attempts + 1):
try:
# Call the MCP tool via the existing client session
result = await nc_mcp_client.call_tool(
"nc_get_vector_sync_status",
arguments={},
)
if not result.isError:
content = result.structuredContent or {}
indexed = content.get("indexed_count", 0)
pending = content.get("pending_count", 1)
logger.info(
f"Attempt {attempt}/{max_attempts}: "
f"indexed={indexed}, pending={pending}"
)
if indexed > 0 and pending == 0:
logger.info(
f"Vector indexing complete: {indexed} documents indexed"
)
break
except Exception as e:
logger.warning(f"Attempt {attempt}: Error checking status: {e}")
if attempt < max_attempts:
await anyio.sleep(poll_interval)
else:
logger.warning(
f"Vector indexing may not be complete after {max_attempts} attempts"
)
yield {
"path": manual_path,
"file_id": file_id,
"tag_id": tag_id,
}
@pytest.fixture(scope="module")
async def openai_provider():
"""OpenAI provider configured from environment (embeddings only)."""
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL")
embedding_model = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small")
provider = OpenAIProvider(
api_key=api_key,
base_url=base_url,
embedding_model=embedding_model,
generation_model=None, # Embeddings only
)
yield provider
await provider.close()
@pytest.fixture(scope="module")
async def openai_generation_provider():
"""OpenAI provider configured for text generation (for sampling callback)."""
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL")
generation_model = os.getenv("OPENAI_GENERATION_MODEL", "gpt-4o-mini")
# For GitHub Models API, use the prefixed model name
if base_url and "models.github.ai" in base_url:
if not generation_model.startswith("openai/"):
generation_model = f"openai/{generation_model}"
provider = OpenAIProvider(
api_key=api_key,
base_url=base_url,
embedding_model=None, # Generation only
generation_model=generation_model,
)
yield provider
await provider.close()
@pytest.fixture(scope="module")
async def nc_mcp_client_with_sampling(
anyio_backend, openai_generation_provider
) -> AsyncGenerator[ClientSession, Any]:
"""MCP client with OpenAI-based sampling support.
This fixture creates an MCP client that can handle sampling requests
from the server using OpenAI for text generation.
"""
sampling_callback = create_openai_sampling_callback(openai_generation_provider)
async for session in create_mcp_client_session(
url="http://localhost:8000/mcp",
client_name="OpenAI Sampling MCP",
sampling_callback=sampling_callback,
):
yield session
async def test_openai_embeddings_work(openai_provider: OpenAIProvider):
"""Test that OpenAI embeddings can be generated."""
embedding = await openai_provider.embed("test query about Nextcloud")
assert isinstance(embedding, list)
assert len(embedding) > 0
assert all(isinstance(x, float) for x in embedding)
# OpenAI embedding dimensions: 1536 (small) or 3072 (large)
assert len(embedding) in [1536, 3072]
async def test_semantic_search_retrieval(
nc_mcp_client, ground_truth_qa, indexed_manual_pdf
):
"""Test that semantic search retrieves relevant documents from the manual.
This tests the retrieval component of RAG - ensuring that queries
return relevant chunks from the indexed Nextcloud User Manual.
"""
# Use first query from ground truth
test_case = ground_truth_qa[0] # 2FA question
query = test_case["query"]
expected_topics = test_case["expected_topics"]
# Perform semantic search via MCP tool
result = await nc_mcp_client.call_tool(
"nc_semantic_search",
arguments={
"query": query,
"limit": 5,
"score_threshold": 0.0,
},
)
assert result.isError is False, f"Tool call failed: {result}"
data = result.structuredContent
# Verify we got results
assert data["success"] is True
assert data["total_found"] > 0, f"No results for query: {query}"
assert len(data["results"]) > 0
# Check that at least one result contains expected topic keywords
all_excerpts = " ".join([r["excerpt"].lower() for r in data["results"]])
topic_found = any(topic.lower() in all_excerpts for topic in expected_topics)
assert topic_found, (
f"Expected topics {expected_topics} not found in results for query: {query}"
)
async def test_semantic_search_answer_with_sampling(
nc_mcp_client_with_sampling, ground_truth_qa, indexed_manual_pdf
):
"""Test semantic search with MCP sampling for answer generation.
This tests the full RAG pipeline:
1. Semantic search retrieves relevant documents
2. MCP sampling generates an answer from the retrieved context
3. OpenAI generates the answer via the sampling callback
Uses nc_mcp_client_with_sampling which has OpenAI-based sampling enabled.
"""
# Use the 2FA question - has clear expected answer
test_case = ground_truth_qa[0]
query = test_case["query"]
result = await nc_mcp_client_with_sampling.call_tool(
"nc_semantic_search_answer",
arguments={
"query": query,
"limit": 5,
"score_threshold": 0.0,
"max_answer_tokens": 300,
},
)
assert result.isError is False, f"Tool call failed: {result}"
data = result.structuredContent
# Verify response structure
assert data["success"] is True
assert "query" in data
assert "generated_answer" in data
assert "sources" in data
assert "search_method" in data
# Check for either successful sampling or graceful fallback
fallback_methods = {
"semantic_sampling_unsupported",
"semantic_sampling_user_declined",
"semantic_sampling_timeout",
"semantic_sampling_mcp_error",
"semantic_sampling_fallback",
}
if data["search_method"] in fallback_methods:
# Fallback mode - verify sources still returned
assert len(data["sources"]) > 0, "Expected sources even in fallback mode"
pytest.skip(
f"MCP sampling not available (method: {data['search_method']}), "
f"but retrieval succeeded with {len(data['sources'])} sources"
)
else:
# Successful sampling - verify answer quality
assert data["search_method"] == "semantic_sampling"
assert data["generated_answer"] is not None
assert len(data["generated_answer"]) > 50 # Non-trivial answer
# Check answer contains relevant content
answer_lower = data["generated_answer"].lower()
assert any(
keyword in answer_lower
for keyword in ["two-factor", "2fa", "authentication", "password"]
), f"Answer doesn't seem relevant to query: {data['generated_answer'][:200]}"
@pytest.mark.parametrize(
"qa_index,min_expected_results",
[
(0, 1), # 2FA question
(1, 1), # File quotas question
(2, 1), # Linux installation question
(3, 1), # Windows requirements question
(4, 1), # Client apps with 2FA question
],
)
async def test_retrieval_quality_all_queries(
nc_mcp_client, ground_truth_qa, indexed_manual_pdf, qa_index, min_expected_results
):
"""Test retrieval quality for all ground truth queries.
Validates that each query returns at least the minimum expected
number of relevant results from the Nextcloud manual.
"""
if qa_index >= len(ground_truth_qa):
pytest.skip(f"Ground truth index {qa_index} not available")
test_case = ground_truth_qa[qa_index]
query = test_case["query"]
result = await nc_mcp_client.call_tool(
"nc_semantic_search",
arguments={
"query": query,
"limit": 5,
"score_threshold": 0.0,
},
)
assert result.isError is False
data = result.structuredContent
assert data["total_found"] >= min_expected_results, (
f"Query '{query}' returned {data['total_found']} results, "
f"expected at least {min_expected_results}"
)
async def test_no_results_for_unrelated_query(nc_mcp_client, indexed_manual_pdf):
"""Test that completely unrelated queries return low/no scores.
The Nextcloud manual shouldn't have relevant content for
quantum physics queries.
"""
result = await nc_mcp_client.call_tool(
"nc_semantic_search",
arguments={
"query": "quantum entanglement hadron collider particle physics",
"limit": 5,
"score_threshold": 0.5, # Higher threshold to filter irrelevant
},
)
assert result.isError is False
data = result.structuredContent
# Should have few or no high-scoring results
# Low score threshold means we might get some results, but they should be low quality
if data["total_found"] > 0:
# If results exist, they should have low scores
max_score = max(r["score"] for r in data["results"])
assert max_score < 0.8, f"Unexpected high score {max_score} for unrelated query"
+26 -4
View File
@@ -3,8 +3,8 @@
DEPRECATED: This module is maintained for backward compatibility with RAG evaluation tests.
New code should use nextcloud_mcp_server.providers directly.
Supports Ollama (local), Anthropic (cloud), and Bedrock (AWS) providers for both ground truth
generation and evaluation.
Supports Ollama (local), Anthropic (cloud), Bedrock (AWS), and OpenAI (cloud) providers
for both ground truth generation and evaluation.
"""
import os
@@ -13,6 +13,7 @@ from nextcloud_mcp_server.providers import (
AnthropicProvider,
BedrockProvider,
OllamaProvider,
OpenAIProvider,
Provider,
)
@@ -25,11 +26,14 @@ def create_llm_provider(
anthropic_model: str | None = None,
bedrock_region: str | None = None,
bedrock_model: str | None = None,
openai_api_key: str | None = None,
openai_base_url: str | None = None,
openai_model: str | None = None,
) -> Provider:
"""Create an LLM provider from environment variables or arguments.
Args:
provider: Provider type ('ollama', 'anthropic', or 'bedrock').
provider: Provider type ('ollama', 'anthropic', 'bedrock', or 'openai').
Defaults to RAG_EVAL_PROVIDER env var or 'ollama'
ollama_base_url: Ollama base URL. Defaults to RAG_EVAL_OLLAMA_BASE_URL or 'http://localhost:11434'
ollama_model: Ollama model. Defaults to RAG_EVAL_OLLAMA_MODEL or 'llama3.2:1b'
@@ -38,6 +42,9 @@ def create_llm_provider(
bedrock_region: AWS region. Defaults to RAG_EVAL_BEDROCK_REGION or AWS_REGION env var
bedrock_model: Bedrock model ID. Defaults to RAG_EVAL_BEDROCK_MODEL or
'anthropic.claude-3-sonnet-20240229-v1:0'
openai_api_key: OpenAI API key. Defaults to OPENAI_API_KEY env var
openai_base_url: OpenAI base URL. Defaults to OPENAI_BASE_URL (for GitHub Models API)
openai_model: OpenAI model. Defaults to OPENAI_GENERATION_MODEL or 'gpt-4o-mini'
Returns:
Provider instance
@@ -83,7 +90,22 @@ def create_llm_provider(
region_name=region, embedding_model=None, generation_model=model
)
elif provider == "openai":
api_key = openai_api_key or os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError(
"OpenAI API key required. Set OPENAI_API_KEY environment variable."
)
base_url = openai_base_url or os.environ.get("OPENAI_BASE_URL")
model = openai_model or os.environ.get("OPENAI_GENERATION_MODEL", "gpt-4o-mini")
return OpenAIProvider(
api_key=api_key,
base_url=base_url,
embedding_model=None,
generation_model=model,
)
else:
raise ValueError(
f"Invalid provider: {provider}. Must be 'ollama', 'anthropic', or 'bedrock'."
f"Invalid provider: {provider}. Must be 'ollama', 'anthropic', 'bedrock', or 'openai'."
)
+241
View File
@@ -117,3 +117,244 @@ def test_parse_search_response_with_empty_tags(mocker):
assert len(results) == 1
assert "tags" in results[0]
assert results[0]["tags"] == []
@pytest.mark.unit
async def test_get_file_info_returns_file_details(mocker):
"""Test that get_file_info returns file info including file ID."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock PROPFIND response
mock_response = AsyncMock()
mock_response.status_code = 207
mock_response.content = b"""<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:response>
<d:href>/remote.php/dav/files/testuser/Documents/test.pdf</d:href>
<d:propstat>
<d:prop>
<oc:fileid>12345</oc:fileid>
<d:displayname>test.pdf</d:displayname>
<d:getcontentlength>1024</d:getcontentlength>
<d:getcontenttype>application/pdf</d:getcontenttype>
<d:getlastmodified>Sat, 01 Jan 2025 00:00:00 GMT</d:getlastmodified>
<d:getetag>"abc123"</d:getetag>
<d:resourcetype/>
</d:prop>
</d:propstat>
</d:response>
</d:multistatus>"""
mock_response.raise_for_status = mocker.Mock()
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call get_file_info
result = await client.get_file_info("Documents/test.pdf")
# Verify result
assert result is not None
assert result["id"] == 12345
assert result["name"] == "test.pdf"
assert result["path"] == "Documents/test.pdf"
assert result["content_type"] == "application/pdf"
assert result["size"] == 1024
assert result["etag"] == "abc123"
assert result["is_directory"] is False
@pytest.mark.unit
async def test_get_file_info_returns_none_for_missing_file(mocker):
"""Test that get_file_info returns None for missing files."""
from httpx import HTTPStatusError, Response
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 404 response
mock_response = mocker.Mock(spec=Response)
mock_response.status_code = 404
mock_http_client.request = AsyncMock(
side_effect=HTTPStatusError(
"Not Found", request=mocker.Mock(), response=mock_response
)
)
# Call get_file_info
result = await client.get_file_info("nonexistent.pdf")
# Verify result is None
assert result is None
@pytest.mark.unit
async def test_create_tag_creates_system_tag(mocker):
"""Test that create_tag creates a system tag via OCS API."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock OCS response
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json = mocker.Mock(
return_value={
"ocs": {
"data": {
"id": 42,
"name": "vector-index",
"userVisible": True,
"userAssignable": True,
}
}
}
)
mock_response.raise_for_status = mocker.Mock()
mock_http_client.post = AsyncMock(return_value=mock_response)
# Call create_tag
result = await client.create_tag("vector-index")
# Verify result
assert result["id"] == 42
assert result["name"] == "vector-index"
assert result["userVisible"] is True
assert result["userAssignable"] is True
# Verify API call
mock_http_client.post.assert_called_once()
call_args = mock_http_client.post.call_args
assert call_args[0][0] == "/ocs/v2.php/apps/systemtags/api/v1/tags"
assert call_args[1]["json"]["name"] == "vector-index"
@pytest.mark.unit
async def test_get_or_create_tag_returns_existing_tag(mocker):
"""Test that get_or_create_tag returns existing tag without creating."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock existing tag
mocker.patch.object(
client,
"get_tag_by_name",
return_value={"id": 42, "name": "vector-index", "userVisible": True},
)
mock_create = mocker.patch.object(client, "create_tag")
# Call get_or_create_tag
result = await client.get_or_create_tag("vector-index")
# Verify existing tag returned without creating
assert result["id"] == 42
mock_create.assert_not_called()
@pytest.mark.unit
async def test_get_or_create_tag_creates_new_tag(mocker):
"""Test that get_or_create_tag creates tag when not found."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock no existing tag
mocker.patch.object(client, "get_tag_by_name", return_value=None)
mocker.patch.object(
client,
"create_tag",
return_value={"id": 42, "name": "vector-index", "userVisible": True},
)
# Call get_or_create_tag
result = await client.get_or_create_tag("vector-index")
# Verify tag was created
assert result["id"] == 42
client.create_tag.assert_called_once_with("vector-index", True, True)
@pytest.mark.unit
async def test_assign_tag_to_file_success(mocker):
"""Test that assign_tag_to_file assigns tag via WebDAV."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 201 Created response
mock_response = AsyncMock()
mock_response.status_code = 201
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call assign_tag_to_file
result = await client.assign_tag_to_file(12345, 42)
# Verify result
assert result is True
# Verify API call
mock_http_client.request.assert_called_once()
call_args = mock_http_client.request.call_args
assert call_args[0][0] == "PUT"
assert "/systemtags-relations/files/12345/42" in call_args[0][1]
@pytest.mark.unit
async def test_assign_tag_to_file_already_assigned(mocker):
"""Test that assign_tag_to_file handles already assigned (409) gracefully."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 409 Conflict response (already assigned)
mock_response = AsyncMock()
mock_response.status_code = 409
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call assign_tag_to_file
result = await client.assign_tag_to_file(12345, 42)
# Verify result (should succeed even with 409)
assert result is True
@pytest.mark.unit
async def test_remove_tag_from_file_success(mocker):
"""Test that remove_tag_from_file removes tag via WebDAV."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 204 No Content response
mock_response = AsyncMock()
mock_response.status_code = 204
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call remove_tag_from_file
result = await client.remove_tag_from_file(12345, 42)
# Verify result
assert result is True
# Verify API call
mock_http_client.request.assert_called_once()
call_args = mock_http_client.request.call_args
assert call_args[0][0] == "DELETE"
assert "/systemtags-relations/files/12345/42" in call_args[0][1]
@pytest.mark.unit
async def test_remove_tag_from_file_not_assigned(mocker):
"""Test that remove_tag_from_file handles not assigned (404) gracefully."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 404 Not Found response (tag wasn't assigned)
mock_response = AsyncMock()
mock_response.status_code = 404
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call remove_tag_from_file
result = await client.remove_tag_from_file(12345, 42)
# Verify result (should succeed even with 404)
assert result is True
+292
View File
@@ -0,0 +1,292 @@
"""Unit tests for OpenAI provider."""
from unittest.mock import AsyncMock, MagicMock
import pytest
from nextcloud_mcp_server.providers.openai import (
OPENAI_EMBEDDING_DIMENSIONS,
OpenAIProvider,
)
@pytest.fixture
def mock_openai_client(mocker):
"""Mock OpenAI AsyncClient."""
mock_client = MagicMock()
mock_client.embeddings = MagicMock()
mock_client.chat = MagicMock()
mock_client.chat.completions = MagicMock()
mock_client.close = AsyncMock()
mocker.patch(
"nextcloud_mcp_server.providers.openai.AsyncOpenAI", return_value=mock_client
)
return mock_client
@pytest.mark.unit
async def test_openai_embedding(mock_openai_client):
"""Test OpenAI embedding with text-embedding-3-small."""
# Mock response
mock_embedding_data = MagicMock()
mock_embedding_data.embedding = [0.1, 0.2, 0.3]
mock_embedding_data.index = 0
mock_response = MagicMock()
mock_response.data = [mock_embedding_data]
mock_openai_client.embeddings.create = AsyncMock(return_value=mock_response)
# Create provider
provider = OpenAIProvider(
api_key="test-key",
embedding_model="text-embedding-3-small",
generation_model=None,
)
# Test embedding
embedding = await provider.embed("test text")
assert embedding == [0.1, 0.2, 0.3]
mock_openai_client.embeddings.create.assert_called_once_with(
input="test text",
model="text-embedding-3-small",
)
@pytest.mark.unit
async def test_openai_embedding_batch(mock_openai_client):
"""Test OpenAI batch embedding."""
# Mock response
mock_embedding_data_1 = MagicMock()
mock_embedding_data_1.embedding = [0.1, 0.2, 0.3]
mock_embedding_data_1.index = 0
mock_embedding_data_2 = MagicMock()
mock_embedding_data_2.embedding = [0.4, 0.5, 0.6]
mock_embedding_data_2.index = 1
mock_response = MagicMock()
mock_response.data = [mock_embedding_data_1, mock_embedding_data_2]
mock_openai_client.embeddings.create = AsyncMock(return_value=mock_response)
# Create provider
provider = OpenAIProvider(
api_key="test-key",
embedding_model="text-embedding-3-small",
generation_model=None,
)
# Test batch embedding
embeddings = await provider.embed_batch(["text1", "text2"])
assert len(embeddings) == 2
assert embeddings[0] == [0.1, 0.2, 0.3]
assert embeddings[1] == [0.4, 0.5, 0.6]
mock_openai_client.embeddings.create.assert_called_once_with(
input=["text1", "text2"],
model="text-embedding-3-small",
)
@pytest.mark.unit
async def test_openai_generation(mock_openai_client):
"""Test OpenAI text generation."""
# Mock response
mock_choice = MagicMock()
mock_choice.message.content = "Generated response"
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_response)
# Create provider
provider = OpenAIProvider(
api_key="test-key",
embedding_model=None,
generation_model="gpt-4o-mini",
)
# Test generation
text = await provider.generate("test prompt", max_tokens=100)
assert text == "Generated response"
mock_openai_client.chat.completions.create.assert_called_once_with(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "test prompt"}],
max_tokens=100,
temperature=0.7,
)
@pytest.mark.unit
async def test_openai_both_capabilities(mock_openai_client):
"""Test OpenAI with both embedding and generation models."""
# Mock embedding response
mock_embedding_data = MagicMock()
mock_embedding_data.embedding = [0.1, 0.2]
mock_embedding_data.index = 0
mock_embed_response = MagicMock()
mock_embed_response.data = [mock_embedding_data]
mock_openai_client.embeddings.create = AsyncMock(return_value=mock_embed_response)
# Mock generation response
mock_choice = MagicMock()
mock_choice.message.content = "Response"
mock_gen_response = MagicMock()
mock_gen_response.choices = [mock_choice]
mock_openai_client.chat.completions.create = AsyncMock(
return_value=mock_gen_response
)
# Create provider with both models
provider = OpenAIProvider(
api_key="test-key",
embedding_model="text-embedding-3-small",
generation_model="gpt-4o-mini",
)
assert provider.supports_embeddings is True
assert provider.supports_generation is True
# Test both capabilities
embedding = await provider.embed("test")
assert embedding == [0.1, 0.2]
text = await provider.generate("test")
assert text == "Response"
@pytest.mark.unit
async def test_openai_no_embeddings():
"""Test OpenAI provider with no embedding model raises error."""
provider = OpenAIProvider(
api_key="test-key",
embedding_model=None,
generation_model="gpt-4o-mini",
)
assert provider.supports_embeddings is False
with pytest.raises(NotImplementedError, match="no embedding_model configured"):
await provider.embed("test")
with pytest.raises(NotImplementedError, match="no embedding_model configured"):
await provider.embed_batch(["test"])
with pytest.raises(NotImplementedError, match="no embedding_model configured"):
provider.get_dimension()
@pytest.mark.unit
async def test_openai_no_generation():
"""Test OpenAI provider with no generation model raises error."""
provider = OpenAIProvider(
api_key="test-key",
embedding_model="text-embedding-3-small",
generation_model=None,
)
assert provider.supports_generation is False
with pytest.raises(NotImplementedError, match="no generation_model configured"):
await provider.generate("test")
@pytest.mark.unit
async def test_openai_known_dimension():
"""Test dimension detection for known OpenAI models."""
provider = OpenAIProvider(
api_key="test-key",
embedding_model="text-embedding-3-small",
)
# Known model should have dimension set from lookup table
assert provider.get_dimension() == 1536
@pytest.mark.unit
async def test_openai_unknown_dimension_detected(mock_openai_client):
"""Test dimension detection for unknown model via API call."""
# Mock response with specific dimension
mock_embedding_data = MagicMock()
mock_embedding_data.embedding = [0.1] * 768
mock_embedding_data.index = 0
mock_response = MagicMock()
mock_response.data = [mock_embedding_data]
mock_openai_client.embeddings.create = AsyncMock(return_value=mock_response)
provider = OpenAIProvider(
api_key="test-key",
embedding_model="custom-embedding-model",
)
# Dimension not known yet for custom model
with pytest.raises(RuntimeError, match="not detected yet"):
provider.get_dimension()
# Detect dimension via embed call
await provider.embed("test")
# Now dimension should be available
assert provider.get_dimension() == 768
@pytest.mark.unit
async def test_openai_github_models_api(mock_openai_client):
"""Test OpenAI provider with GitHub Models API configuration."""
# Mock response
mock_embedding_data = MagicMock()
mock_embedding_data.embedding = [0.1, 0.2, 0.3]
mock_embedding_data.index = 0
mock_response = MagicMock()
mock_response.data = [mock_embedding_data]
mock_openai_client.embeddings.create = AsyncMock(return_value=mock_response)
# Create provider with GitHub Models configuration
provider = OpenAIProvider(
api_key="ghp_test_token",
base_url="https://models.github.ai/inference",
embedding_model="openai/text-embedding-3-small",
generation_model=None,
)
# Known dimension for GitHub Models prefixed model
assert (
provider.get_dimension()
== OPENAI_EMBEDDING_DIMENSIONS["openai/text-embedding-3-small"]
)
# Test embedding
embedding = await provider.embed("test text")
assert embedding == [0.1, 0.2, 0.3]
@pytest.mark.unit
async def test_openai_empty_batch():
"""Test OpenAI batch embedding with empty list."""
provider = OpenAIProvider(
api_key="test-key",
embedding_model="text-embedding-3-small",
)
embeddings = await provider.embed_batch([])
assert embeddings == []
@pytest.mark.unit
async def test_openai_close(mock_openai_client):
"""Test OpenAI client close."""
provider = OpenAIProvider(
api_key="test-key",
embedding_model="text-embedding-3-small",
)
await provider.close()
mock_openai_client.close.assert_called_once()
+86
View File
@@ -259,3 +259,89 @@ class TestChunkConfigValidation:
match="DOCUMENT_CHUNK_OVERLAP .* must be less than DOCUMENT_CHUNK_SIZE",
):
get_settings()
class TestEmbeddingModelName:
"""Test get_embedding_model_name() method."""
def test_openai_takes_priority(self):
"""Test that OpenAI model is returned when OPENAI_API_KEY is set."""
settings = Settings(
openai_api_key="test-key",
openai_embedding_model="text-embedding-3-large",
ollama_base_url="http://ollama:11434",
ollama_embedding_model="nomic-embed-text",
)
assert settings.get_embedding_model_name() == "text-embedding-3-large"
def test_ollama_used_when_no_openai(self):
"""Test that Ollama model is returned when no OpenAI configured."""
settings = Settings(
ollama_base_url="http://ollama:11434",
ollama_embedding_model="all-minilm",
)
assert settings.get_embedding_model_name() == "all-minilm"
def test_simple_fallback(self):
"""Test fallback to simple provider when nothing configured."""
settings = Settings()
assert settings.get_embedding_model_name() == "simple-384"
@patch.dict(
os.environ,
{
"OPENAI_API_KEY": "test-openai-key",
"OPENAI_EMBEDDING_MODEL": "openai/text-embedding-3-small",
},
clear=True,
)
def test_get_settings_openai_model(self):
"""Test get_settings() loads OpenAI embedding model."""
settings = get_settings()
assert settings.openai_api_key == "test-openai-key"
assert settings.openai_embedding_model == "openai/text-embedding-3-small"
assert settings.get_embedding_model_name() == "openai/text-embedding-3-small"
class TestCollectionNameWithProviders:
"""Test get_collection_name() with different providers."""
def test_collection_name_with_openai(self):
"""Test collection name uses OpenAI model when configured."""
settings = Settings(
openai_api_key="test-key",
openai_embedding_model="text-embedding-3-small",
otel_service_name="my-deployment",
)
assert settings.get_collection_name() == "my-deployment-text-embedding-3-small"
def test_collection_name_with_github_models(self):
"""Test collection name sanitizes GitHub Models prefix."""
settings = Settings(
openai_api_key="ghp_test",
openai_embedding_model="openai/text-embedding-3-small",
otel_service_name="my-deployment",
)
# Slashes should be replaced with dashes
assert (
settings.get_collection_name()
== "my-deployment-openai-text-embedding-3-small"
)
def test_collection_name_with_ollama(self):
"""Test collection name uses Ollama model when no OpenAI."""
settings = Settings(
ollama_base_url="http://ollama:11434",
ollama_embedding_model="nomic-embed-text",
otel_service_name="my-deployment",
)
assert settings.get_collection_name() == "my-deployment-nomic-embed-text"
def test_collection_name_explicit_override(self):
"""Test explicit QDRANT_COLLECTION overrides auto-generation."""
settings = Settings(
qdrant_collection="custom-collection",
openai_api_key="test-key",
openai_embedding_model="text-embedding-3-large",
)
assert settings.get_collection_name() == "custom-collection"
Generated
+22 -1
View File
@@ -1936,7 +1936,7 @@ wheels = [
[[package]]
name = "nextcloud-mcp-server"
version = "0.46.0"
version = "0.48.0"
source = { editable = "." }
dependencies = [
{ name = "aiosqlite" },
@@ -1951,6 +1951,7 @@ dependencies = [
{ name = "jinja2" },
{ name = "langchain-text-splitters" },
{ name = "mcp", extra = ["cli"] },
{ name = "openai" },
{ name = "opentelemetry-api" },
{ name = "opentelemetry-exporter-otlp-proto-grpc" },
{ name = "opentelemetry-instrumentation-asgi" },
@@ -1999,6 +2000,7 @@ requires-dist = [
{ name = "jinja2", specifier = ">=3.1.6" },
{ name = "langchain-text-splitters", specifier = ">=1.0.0" },
{ name = "mcp", extras = ["cli"], specifier = ">=1.22,<1.23" },
{ name = "openai", specifier = ">=2.8.1" },
{ name = "opentelemetry-api", specifier = ">=1.28.2" },
{ name = "opentelemetry-exporter-otlp-proto-grpc", specifier = ">=1.28.2" },
{ name = "opentelemetry-instrumentation-asgi", specifier = ">=0.49b2" },
@@ -2146,6 +2148,25 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b6/ca/862b1e7a639460f0ca25fd5b6135fb42cf9deea86d398a92e44dfda2279d/onnxruntime-1.23.2-cp313-cp313t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e2b9233c4947907fd1818d0e581c049c41ccc39b2856cc942ff6d26317cee145", size = 17394184, upload-time = "2025-10-22T03:47:08.127Z" },
]
[[package]]
name = "openai"
version = "2.8.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "distro" },
{ name = "httpx" },
{ name = "jiter" },
{ name = "pydantic" },
{ name = "sniffio" },
{ name = "tqdm" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d5/e4/42591e356f1d53c568418dc7e30dcda7be31dd5a4d570bca22acb0525862/openai-2.8.1.tar.gz", hash = "sha256:cb1b79eef6e809f6da326a7ef6038719e35aa944c42d081807bfa1be8060f15f", size = 602490, upload-time = "2025-11-17T22:39:59.549Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/55/4f/dbc0c124c40cb390508a82770fb9f6e3ed162560181a85089191a851c59a/openai-2.8.1-py3-none-any.whl", hash = "sha256:c6c3b5a04994734386e8dad3c00a393f56d3b68a27cd2e8acae91a59e4122463", size = 1022688, upload-time = "2025-11-17T22:39:57.675Z" },
]
[[package]]
name = "opentelemetry-api"
version = "1.38.0"