Compare commits

...

10 Commits

Author SHA1 Message Date
github-actions[bot] 852606ec8b bump: version 0.48.0 → 0.48.1 2025-11-23 03:03:55 +00:00
Chris Coutinho caae6922be Merge pull request #349 from cbcoutinho/feature/openai-provider-support
feature/openai provider support
2025-11-23 04:03:29 +01:00
Chris Coutinho fafeaf3d83 refactor: Move background tasks to server lifespan and deprecate SSE transport
- Move scanner/processor tasks from FastMCP session lifespan to Starlette
  server lifespan (correct architecture: background tasks run once at
  server level, not per-session)
- Change default CLI transport from SSE to streamable-http
- Remove SSE transport option from CLI (SSE is deprecated)
- Remove SSE client session factory from test fixtures
- Add tracing instrumentation to BM25 hybrid search operations for
  better observability

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 04:02:30 +01:00
Chris Coutinho 2ab8dad6a5 fix: Use WebDAV for tag creation and add LLM-as-a-judge for RAG tests
- Change create_tag() to use WebDAV POST instead of OCS API which
  returned 404 in some Nextcloud versions
- Add llm_judge() helper that evaluates system output against ground
  truth with simple TRUE/FALSE prompt
- Replace keyword-based assertions in RAG tests with LLM judge for
  more flexible semantic evaluation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 02:24:01 +01:00
Chris Coutinho 50216accde Merge pull request #348 from cbcoutinho/feature/openai-provider-support
feature/openai provider support
2025-11-23 01:56:49 +01:00
Chris Coutinho bf2fdac2d0 ci: Fix health endpoint 2025-11-23 01:56:17 +01:00
github-actions[bot] 626c4bf562 bump: version 0.47.0 → 0.48.0 2025-11-23 00:53:24 +00:00
Chris Coutinho a56b3f3d51 Merge pull request #347 from cbcoutinho/feature/openai-provider-support
feature/openai provider support
2025-11-23 01:52:55 +01:00
Chris Coutinho 2896fa1dc9 feat: Add tag management methods to WebDAV client
- Add get_file_info() to get file info including file ID via PROPFIND
- Add create_tag() to create system tags via OCS API
- Add get_or_create_tag() for idempotent tag creation
- Add assign_tag_to_file() to assign tags to files via WebDAV
- Add remove_tag_from_file() to remove tags from files

Also refactors RAG evaluation:
- Add indexed_manual_pdf fixture using existing nc_client/nc_mcp_client
- Remove manual tag creation steps from workflow (now handled by fixture)
- Add comprehensive unit tests for new WebDAV methods

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 01:51:42 +01:00
Chris Coutinho 04251401aa ci: Add permissions to github token 2025-11-23 01:26:22 +01:00
13 changed files with 958 additions and 593 deletions
+9 -167
View File
@@ -3,6 +3,10 @@ name: RAG Evaluation
on:
workflow_dispatch:
inputs:
manual_path:
description: 'Path to Nextcloud User Manual PDF in Nextcloud'
required: false
default: 'Nextcloud Manual.pdf'
embedding_model:
description: 'OpenAI embedding model'
required: false
@@ -15,40 +19,15 @@ on:
jobs:
rag-evaluation:
runs-on: ubuntu-latest
timeout-minutes: 45
timeout-minutes: 30
permissions:
models: read
steps:
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
submodules: 'true'
- name: Clone Nextcloud documentation
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
repository: 'nextcloud/documentation'
path: 'nextcloud-docs'
- name: Install Sphinx and LaTeX dependencies
run: |
sudo apt-get update
sudo apt-get install -y \
python3-sphinx \
python3-pip \
latexmk \
texlive-latex-recommended \
texlive-latex-extra \
texlive-fonts-recommended \
texlive-fonts-extra
- name: Build User Manual PDF
run: |
cd nextcloud-docs/user_manual
pip3 install -r ../requirements.txt
make latexpdf
ls -la _build/latex/
cp _build/latex/NextcloudUserManual.pdf ../../Nextcloud_User_Manual.pdf
echo "PDF built successfully"
###### Required to build OIDC App ######
- name: Set up php 8.4
uses: shivammathur/setup-php@bf6b4fbd49ca58e4608c9c89fba0b8d90bd2a39f # v2
@@ -100,7 +79,7 @@ jobs:
echo "Waiting for MCP server..."
max_attempts=30
attempt=0
until curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8000/health | grep -q "200"; do
until curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8000/health/live | grep -q "200"; do
attempt=$((attempt + 1))
if [ $attempt -ge $max_attempts ]; then
echo "MCP server did not become ready in time."
@@ -111,149 +90,12 @@ jobs:
done
echo "MCP server is ready."
- name: Upload User Manual PDF to Nextcloud
run: |
echo "Uploading Nextcloud_User_Manual.pdf to Nextcloud..."
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -u admin:admin \
-X PUT \
-T Nextcloud_User_Manual.pdf \
"http://localhost:8080/remote.php/dav/files/admin/Nextcloud_User_Manual.pdf")
if [ "$HTTP_CODE" = "201" ] || [ "$HTTP_CODE" = "204" ]; then
echo "PDF uploaded successfully (HTTP $HTTP_CODE)"
else
echo "Failed to upload PDF (HTTP $HTTP_CODE)"
exit 1
fi
- name: Create vector-index tag
id: create_tag
run: |
# Create the tag using OCS API
echo "Creating vector-index tag..."
RESPONSE=$(curl -s -u admin:admin \
-X POST \
-H 'Content-Type: application/json' \
-H 'OCS-APIRequest: true' \
-d '{"name":"vector-index","userVisible":true,"userAssignable":true}' \
"http://localhost:8080/ocs/v2.php/apps/systemtags/api/v1/tags")
echo "Create tag response: $RESPONSE"
# Get tag ID from response or lookup
TAG_ID=$(echo "$RESPONSE" | grep -oP '(?<="id":)[0-9]+' | head -1 || echo "")
if [ -z "$TAG_ID" ]; then
echo "Tag may already exist, looking it up..."
TAG_ID=$(curl -s -u admin:admin \
-X PROPFIND \
-H 'Content-Type: application/xml' \
-d '<?xml version="1.0"?><d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"><d:prop><oc:id/><oc:display-name/></d:prop></d:propfind>' \
http://localhost:8080/remote.php/dav/systemtags/ \
| grep -B2 "vector-index" | grep -oP '(?<=<oc:id>)[0-9]+(?=</oc:id>)' | head -1 || echo "")
fi
if [ -z "$TAG_ID" ]; then
echo "ERROR: Could not create or find vector-index tag"
exit 1
fi
echo "Tag ID: $TAG_ID"
echo "tag_id=$TAG_ID" >> $GITHUB_OUTPUT
- name: Get file ID of uploaded PDF
id: get_file_id
run: |
echo "Getting file ID for Nextcloud_User_Manual.pdf..."
# Get file ID using PROPFIND
FILE_ID=$(curl -s -u admin:admin \
-X PROPFIND \
-H 'Content-Type: application/xml' \
-d '<?xml version="1.0"?><d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"><d:prop><oc:fileid/></d:prop></d:propfind>' \
"http://localhost:8080/remote.php/dav/files/admin/Nextcloud_User_Manual.pdf" \
| grep -oP '(?<=<oc:fileid>)[0-9]+(?=</oc:fileid>)' || echo "")
if [ -z "$FILE_ID" ]; then
echo "ERROR: Could not find file ID"
exit 1
fi
echo "Found file ID: $FILE_ID"
echo "file_id=$FILE_ID" >> $GITHUB_OUTPUT
- name: Tag file with vector-index
env:
FILE_ID: ${{ steps.get_file_id.outputs.file_id }}
TAG_ID: ${{ steps.create_tag.outputs.tag_id }}
run: |
echo "Tagging file $FILE_ID with tag $TAG_ID..."
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -u admin:admin \
-X PUT \
-H 'Content-Type: application/json' \
-H 'Content-Length: 0' \
"http://localhost:8080/remote.php/dav/systemtags-relations/files/$FILE_ID/$TAG_ID")
if [ "$HTTP_CODE" = "201" ] || [ "$HTTP_CODE" = "409" ]; then
echo "File tagged successfully (HTTP $HTTP_CODE)"
else
echo "Failed to tag file (HTTP $HTTP_CODE)"
exit 1
fi
- name: Wait for vector sync to complete indexing
env:
NEXTCLOUD_HOST: "http://localhost:8080"
NEXTCLOUD_USERNAME: "admin"
NEXTCLOUD_PASSWORD: "admin"
run: |
echo "Waiting for vector sync to index the manual..."
max_attempts=60
attempt=0
# Wait for initial scan to pick up the file
sleep 10
while [ $attempt -lt $max_attempts ]; do
attempt=$((attempt + 1))
# Check vector sync status via MCP
STATUS=$(curl -s http://localhost:8000/health || echo "{}")
echo "Attempt $attempt/$max_attempts: $STATUS"
# Also check indexed count via semantic search
# If we get results, indexing is done
RESULT=$(curl -s -X POST http://localhost:8000/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"nc_get_vector_sync_status","arguments":{}}}' \
2>/dev/null || echo "{}")
echo "Vector sync status: $RESULT"
# Check if pending is 0 and indexed > 0
INDEXED=$(echo "$RESULT" | jq -r '.result.structuredContent.indexed // 0' 2>/dev/null || echo "0")
PENDING=$(echo "$RESULT" | jq -r '.result.structuredContent.pending // 1' 2>/dev/null || echo "1")
echo "Indexed: $INDEXED, Pending: $PENDING"
if [ "$INDEXED" -gt "0" ] && [ "$PENDING" -eq "0" ]; then
echo "Indexing complete! $INDEXED documents indexed."
break
fi
sleep 10
done
if [ $attempt -ge $max_attempts ]; then
echo "WARNING: Indexing may not be complete, proceeding anyway..."
fi
- name: Run RAG evaluation tests
env:
NEXTCLOUD_HOST: "http://localhost:8080"
NEXTCLOUD_USERNAME: "admin"
NEXTCLOUD_PASSWORD: "admin"
RAG_MANUAL_PATH: ${{ inputs.manual_path }}
OPENAI_API_KEY: ${{ secrets.GITHUB_TOKEN }}
OPENAI_BASE_URL: "https://models.github.ai/inference"
OPENAI_EMBEDDING_MODEL: ${{ inputs.embedding_model }}
+16
View File
@@ -1,3 +1,19 @@
## v0.48.1 (2025-11-23)
### Fix
- Use WebDAV for tag creation and add LLM-as-a-judge for RAG tests
### Refactor
- Move background tasks to server lifespan and deprecate SSE transport
## v0.48.0 (2025-11-23)
### Feat
- Add tag management methods to WebDAV client
## v0.47.0 (2025-11-23)
### Feat
+2 -2
View File
@@ -2,8 +2,8 @@ apiVersion: v2
name: nextcloud-mcp-server
description: A Helm chart for Nextcloud MCP Server - enables AI assistants to interact with Nextcloud
type: application
version: 0.47.0
appVersion: "0.47.0"
version: 0.48.1
appVersion: "0.48.1"
keywords:
- nextcloud
- mcp
+155 -244
View File
@@ -555,15 +555,15 @@ async def load_oauth_client_credentials(
@asynccontextmanager
async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
"""
Manage application lifecycle for BasicAuth mode.
Manage application lifecycle for BasicAuth mode (FastMCP session lifespan).
Creates a single Nextcloud client with basic authentication
that is shared across all requests.
that is shared across all requests within a session.
If vector sync is enabled (VECTOR_SYNC_ENABLED=true), also starts
background tasks for automatic document indexing (ADR-007).
Note: Background tasks (scanner, processor) are started at server level
in starlette_lifespan, not here. This lifespan runs per-session.
"""
logger.info("Starting MCP server in BasicAuth mode")
logger.info("Starting MCP session in BasicAuth mode")
logger.info("Creating Nextcloud client with BasicAuth")
client = NextcloudClient.from_env()
@@ -579,91 +579,12 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
# Initialize document processors
initialize_document_processors()
settings = get_settings()
# Check if vector sync is enabled
if settings.vector_sync_enabled:
logger.info("Vector sync enabled - starting background tasks")
# Get username from environment for BasicAuth mode
username = os.getenv("NEXTCLOUD_USERNAME")
if not username:
raise ValueError(
"NEXTCLOUD_USERNAME is required for vector sync in BasicAuth mode"
)
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
# Initialize shared state
send_stream, receive_stream = anyio.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
)
shutdown_event = anyio.Event()
scanner_wake_event = anyio.Event()
# Start background tasks using anyio TaskGroup
async with anyio.create_task_group() as tg:
# Start scanner task
await tg.start(
scanner_task,
send_stream,
shutdown_event,
scanner_wake_event,
client,
username,
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
processor_task,
i,
receive_stream.clone(),
shutdown_event,
client,
username,
)
logger.info(
f"Background sync tasks started: 1 scanner + {settings.vector_sync_processor_workers} processors"
)
# Yield with background tasks running
try:
yield AppContext(
client=client,
storage=storage,
document_send_stream=send_stream,
document_receive_stream=receive_stream,
shutdown_event=shutdown_event,
scanner_wake_event=scanner_wake_event,
)
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
# TaskGroup automatically cancels all tasks on exit
logger.info("Background sync tasks stopped")
await client.close()
else:
# No vector sync - simple lifecycle
try:
yield AppContext(client=client, storage=storage)
finally:
logger.info("Shutting down BasicAuth mode")
await client.close()
# Yield client context - scanner runs at server level (starlette_lifespan)
try:
yield AppContext(client=client, storage=storage)
finally:
logger.info("Shutting down BasicAuth session")
await client.close()
async def setup_oauth_config():
@@ -979,7 +900,7 @@ async def setup_oauth_config():
)
def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = None):
# Initialize observability (logging will be configured by uvicorn)
settings = get_settings()
@@ -1197,180 +1118,170 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
"Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)"
)
if transport == "sse":
mcp_app = mcp.sse_app()
starlette_lifespan = None
elif transport in ("http", "streamable-http"):
mcp_app = mcp.streamable_http_app()
mcp_app = mcp.streamable_http_app()
@asynccontextmanager
async def starlette_lifespan(app: Starlette):
# Set OAuth context for OAuth login routes (ADR-004)
if oauth_enabled:
# Prepare OAuth config from setup_oauth_config closure variables
mcp_server_url = os.getenv(
"NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000"
)
nextcloud_resource_uri = os.getenv(
"NEXTCLOUD_RESOURCE_URI", nextcloud_host
)
discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{nextcloud_host}/.well-known/openid-configuration",
)
scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "")
@asynccontextmanager
async def starlette_lifespan(app: Starlette):
# Set OAuth context for OAuth login routes (ADR-004)
if oauth_enabled:
# Prepare OAuth config from setup_oauth_config closure variables
mcp_server_url = os.getenv(
"NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000"
)
nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host)
discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{nextcloud_host}/.well-known/openid-configuration",
)
scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "")
oauth_context_dict = {
"storage": refresh_token_storage,
"oauth_client": oauth_client,
"token_verifier": token_verifier, # For querying IdP userinfo endpoint
"config": {
"mcp_server_url": mcp_server_url,
"discovery_url": discovery_url,
"client_id": client_id, # From setup_oauth_config (DCR or static)
"client_secret": client_secret, # From setup_oauth_config (DCR or static)
"scopes": scopes,
"nextcloud_host": nextcloud_host,
"nextcloud_resource_uri": nextcloud_resource_uri,
"oauth_provider": oauth_provider,
},
}
app.state.oauth_context = oauth_context_dict
oauth_context_dict = {
"storage": refresh_token_storage,
"oauth_client": oauth_client,
"token_verifier": token_verifier, # For querying IdP userinfo endpoint
"config": {
"mcp_server_url": mcp_server_url,
"discovery_url": discovery_url,
"client_id": client_id, # From setup_oauth_config (DCR or static)
"client_secret": client_secret, # From setup_oauth_config (DCR or static)
"scopes": scopes,
"nextcloud_host": nextcloud_host,
"nextcloud_resource_uri": nextcloud_resource_uri,
"oauth_provider": oauth_provider,
},
}
app.state.oauth_context = oauth_context_dict
# Also set oauth_context on browser_app for session authentication
# browser_app is in the same function scope (defined later in create_app)
# We need to find it in the mounted routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.oauth_context = oauth_context_dict
logger.info(
"OAuth context shared with browser_app for session auth"
)
break
logger.info(
f"OAuth context initialized for login routes (client_id={client_id[:16]}...)"
)
else:
# BasicAuth mode - share storage with browser_app for webhook management
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
storage = RefreshTokenStorage.from_env()
await storage.initialize()
app.state.storage = storage
# Also share with browser_app for webhook routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.storage = storage
logger.info(
"Storage shared with browser_app for webhook management"
)
break
# Start background vector sync tasks for BasicAuth mode (ADR-007)
# For streamable-http transport, FastMCP lifespan isn't automatically triggered
# so we manually start background tasks here if vector sync is enabled
import anyio as anyio_module
settings = get_settings()
if not oauth_enabled and settings.vector_sync_enabled:
logger.info("Starting background vector sync tasks for BasicAuth mode")
# Get username from environment
username = os.getenv("NEXTCLOUD_USERNAME")
if not username:
raise ValueError(
"NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode"
# Also set oauth_context on browser_app for session authentication
# browser_app is in the same function scope (defined later in create_app)
# We need to find it in the mounted routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.oauth_context = oauth_context_dict
logger.info(
"OAuth context shared with browser_app for session auth"
)
break
# Get Nextcloud client from MCP app context
# Create client since we're outside FastMCP lifespan
client = NextcloudClient.from_env()
logger.info(
f"OAuth context initialized for login routes (client_id={client_id[:16]}...)"
)
else:
# BasicAuth mode - share storage with browser_app for webhook management
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
storage = RefreshTokenStorage.from_env()
await storage.initialize()
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
app.state.storage = storage
# Initialize shared state
send_stream, receive_stream = anyio_module.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
# Also share with browser_app for webhook routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.storage = storage
logger.info(
"Storage shared with browser_app for webhook management"
)
break
# Start background vector sync tasks for BasicAuth mode (ADR-007)
# Scanner runs at server-level (once), not per-session
import anyio as anyio_module
settings = get_settings()
if not oauth_enabled and settings.vector_sync_enabled:
logger.info("Starting background vector sync tasks for BasicAuth mode")
# Get username from environment
username = os.getenv("NEXTCLOUD_USERNAME")
if not username:
raise ValueError(
"NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode"
)
shutdown_event = anyio_module.Event()
scanner_wake_event = anyio_module.Event()
# Store in app state for access from routes (ADR-007)
app.state.document_send_stream = send_stream
app.state.document_receive_stream = receive_stream
app.state.shutdown_event = shutdown_event
app.state.scanner_wake_event = scanner_wake_event
# Create client for vector sync (server-level, not per-session)
client = NextcloudClient.from_env()
# Also share with browser_app for /app route
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.document_send_stream = send_stream
route.app.state.document_receive_stream = receive_stream
route.app.state.shutdown_event = shutdown_event
route.app.state.scanner_wake_event = scanner_wake_event
logger.info(
"Vector sync state shared with browser_app for /app"
)
break
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
# Start background tasks using anyio TaskGroup
async with anyio_module.create_task_group() as tg:
# Start scanner task
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
# Initialize shared state
send_stream, receive_stream = anyio_module.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
)
shutdown_event = anyio_module.Event()
scanner_wake_event = anyio_module.Event()
# Store in app state for access from routes (ADR-007)
app.state.document_send_stream = send_stream
app.state.document_receive_stream = receive_stream
app.state.shutdown_event = shutdown_event
app.state.scanner_wake_event = scanner_wake_event
# Also share with browser_app for /app route
for route in app.routes:
if isinstance(route, Mount) and route.path == "/app":
route.app.state.document_send_stream = send_stream
route.app.state.document_receive_stream = receive_stream
route.app.state.shutdown_event = shutdown_event
route.app.state.scanner_wake_event = scanner_wake_event
logger.info("Vector sync state shared with browser_app for /app")
break
# Start background tasks using anyio TaskGroup
async with anyio_module.create_task_group() as tg:
# Start scanner task
await tg.start(
scanner_task,
send_stream,
shutdown_event,
scanner_wake_event,
client,
username,
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
scanner_task,
send_stream,
processor_task,
i,
receive_stream.clone(),
shutdown_event,
scanner_wake_event,
client,
username,
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
processor_task,
i,
receive_stream.clone(),
shutdown_event,
client,
username,
)
logger.info(
f"Background sync tasks started: 1 scanner + "
f"{settings.vector_sync_processor_workers} processors"
)
logger.info(
f"Background sync tasks started: 1 scanner + "
f"{settings.vector_sync_processor_workers} processors"
)
# Run MCP session manager and yield
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
try:
yield
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
await client.close()
# TaskGroup automatically cancels all tasks on exit
else:
# No vector sync - just run MCP session manager
# Run MCP session manager and yield
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
yield
try:
yield
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
await client.close()
# TaskGroup automatically cancels all tasks on exit
else:
# No vector sync - just run MCP session manager
async with AsyncExitStack() as stack:
await stack.enter_async_context(mcp.session_manager.run())
yield
# Health check endpoints for Kubernetes probes
def health_live(request):
+74 -37
View File
@@ -22,6 +22,7 @@ from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.search import (
BM25HybridSearchAlgorithm,
SemanticSearchAlgorithm,
@@ -139,7 +140,10 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
_get_authenticated_client_for_userinfo,
)
async with await _get_authenticated_client_for_userinfo(request) as nc_client: # noqa: F841
with trace_operation("vector_viz.get_auth_client"):
auth_client_ctx = await _get_authenticated_client_for_userinfo(request)
async with auth_client_ctx as nc_client: # noqa: F841
# Create search algorithm (no client needed - verification removed)
if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
@@ -159,24 +163,40 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
all_results = []
if doc_types is None or len(doc_types) == 0:
# Cross-app search - search all indexed types
unverified_results = await search_algo.search(
query=query,
user_id=username,
limit=limit * 2, # Buffer for verification filtering
doc_type=None, # Search all types
score_threshold=score_threshold,
)
all_results.extend(unverified_results)
else:
# Search each document type and combine
for doc_type in doc_types:
with trace_operation(
"vector_viz.search_execute",
attributes={
"search.algorithm": algorithm,
"search.limit": limit * 2,
"search.doc_type": "all",
},
):
unverified_results = await search_algo.search(
query=query,
user_id=username,
limit=limit * 2, # Buffer for verification filtering
doc_type=doc_type,
doc_type=None, # Search all types
score_threshold=score_threshold,
)
all_results.extend(unverified_results)
else:
# Search each document type and combine
for doc_type in doc_types:
with trace_operation(
"vector_viz.search_execute",
attributes={
"search.algorithm": algorithm,
"search.limit": limit * 2,
"search.doc_type": doc_type,
},
):
unverified_results = await search_algo.search(
query=query,
user_id=username,
limit=limit * 2, # Buffer for verification filtering
doc_type=doc_type,
score_threshold=score_threshold,
)
all_results.extend(unverified_results)
# Sort by score before verification
all_results.sort(key=lambda r: r.score, reverse=True)
@@ -190,22 +210,26 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Store original scores and normalize for visualization
# (best result = 1.0, worst result = 0.0 within THIS result set)
# This makes visual encoding meaningful regardless of RRF normalization
if search_results:
scores = [r.score for r in search_results]
min_score, max_score = min(scores), max(scores)
score_range = max_score - min_score if max_score > min_score else 1.0
with trace_operation(
"vector_viz.score_normalize",
attributes={"normalize.num_results": len(search_results)},
):
if search_results:
scores = [r.score for r in search_results]
min_score, max_score = min(scores), max(scores)
score_range = max_score - min_score if max_score > min_score else 1.0
logger.info(
f"Normalizing scores for viz: original range [{min_score:.3f}, {max_score:.3f}] "
f"→ [0.0, 1.0]"
)
logger.info(
f"Normalizing scores for viz: original range [{min_score:.3f}, {max_score:.3f}] "
f"→ [0.0, 1.0]"
)
# Store original score and rescale to 0-1 for visualization
for r in search_results:
# Store original score before normalization
r.original_score = r.score
# Rescale for visual encoding
r.score = (r.score - min_score) / score_range
# Store original score and rescale to 0-1 for visualization
for r in search_results:
# Store original score before normalization
r.original_score = r.score
# Rescale for visual encoding
r.score = (r.score - min_score) / score_range
if not search_results:
return JSONResponse(
@@ -220,7 +244,9 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Fetch vectors for specific matching chunks from Qdrant using batch retrieve
vector_fetch_start = time.perf_counter()
qdrant_client = await get_qdrant_client()
with trace_operation("vector_viz.get_qdrant_client"):
qdrant_client = await get_qdrant_client()
chunk_vectors_map = {} # Map (doc_id, chunk_start, chunk_end) -> vector
@@ -231,12 +257,16 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
if point_ids:
# Single batch retrieve call instead of N sequential scroll calls
# This is ~50x faster for 50 results (1 HTTP request vs 50)
points_response = await qdrant_client.retrieve(
collection_name=settings.get_collection_name(),
ids=point_ids,
with_vectors=["dense"],
with_payload=["doc_id", "chunk_start_offset", "chunk_end_offset"],
)
with trace_operation(
"vector_viz.vector_retrieve",
attributes={"retrieve.num_points": len(point_ids)},
):
points_response = await qdrant_client.retrieve(
collection_name=settings.get_collection_name(),
ids=point_ids,
with_vectors=["dense"],
with_payload=["doc_id", "chunk_start_offset", "chunk_end_offset"],
)
# Build chunk_vectors_map from batch response
for point in points_response:
@@ -367,9 +397,16 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
import anyio
coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: _compute_pca(all_vectors_normalized)
)
with trace_operation(
"vector_viz.pca_compute",
attributes={
"pca.num_vectors": len(all_vectors_normalized),
"pca.embedding_dim": embedding_dim,
},
):
coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: _compute_pca(all_vectors_normalized)
)
pca_duration = time.perf_counter() - pca_start
# After fit, these attributes are guaranteed to be set
+2 -2
View File
@@ -29,9 +29,9 @@ from .app import get_app
@click.option(
"--transport",
"-t",
default="sse",
default="streamable-http",
show_default=True,
type=click.Choice(["sse", "streamable-http", "http"]),
type=click.Choice(["streamable-http", "http"]),
help="MCP transport protocol",
)
@click.option(
+234
View File
@@ -1295,3 +1295,237 @@ class WebDAVClient(BaseNextcloudClient):
logger.debug(f"Found {len(files)} files with tag ID {tag_id}")
return files
async def get_file_info(self, path: str) -> dict[str, Any] | None:
"""Get file info including file ID via WebDAV PROPFIND.
Args:
path: Path to the file (relative to user's files directory)
Returns:
File info dictionary with id, name, size, content_type, etc.
Returns None if file not found.
"""
webdav_path = f"{self._get_webdav_base_path()}/{path.lstrip('/')}"
propfind_body = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:prop>
<oc:fileid/>
<d:displayname/>
<d:getcontentlength/>
<d:getcontenttype/>
<d:getlastmodified/>
<d:getetag/>
<d:resourcetype/>
</d:prop>
</d:propfind>"""
try:
response = await self._client.request(
"PROPFIND",
webdav_path,
headers={"Depth": "0"},
content=propfind_body,
)
response.raise_for_status()
except HTTPStatusError as e:
if e.response.status_code == 404:
logger.debug(f"File not found: {path}")
return None
raise
# Parse XML response
root = ET.fromstring(response.content)
ns = {
"d": "DAV:",
"oc": "http://owncloud.org/ns",
}
response_elem = root.find("d:response", ns)
if response_elem is None:
return None
propstat = response_elem.find("d:propstat", ns)
if propstat is None:
return None
prop = propstat.find("d:prop", ns)
if prop is None:
return None
# Extract properties
fileid_elem = prop.find("oc:fileid", ns)
displayname_elem = prop.find("d:displayname", ns)
contentlength_elem = prop.find("d:getcontentlength", ns)
contenttype_elem = prop.find("d:getcontenttype", ns)
lastmodified_elem = prop.find("d:getlastmodified", ns)
etag_elem = prop.find("d:getetag", ns)
resourcetype_elem = prop.find("d:resourcetype", ns)
is_directory = (
resourcetype_elem is not None
and resourcetype_elem.find("d:collection", ns) is not None
)
file_info = {
"id": int(fileid_elem.text) if fileid_elem is not None else None,
"path": path,
"name": displayname_elem.text
if displayname_elem is not None
else path.split("/")[-1],
"size": int(contentlength_elem.text)
if contentlength_elem is not None and contentlength_elem.text
else 0,
"content_type": contenttype_elem.text
if contenttype_elem is not None
else "",
"last_modified": lastmodified_elem.text
if lastmodified_elem is not None
else None,
"etag": etag_elem.text.strip('"')
if etag_elem is not None and etag_elem.text
else None,
"is_directory": is_directory,
}
logger.debug(f"Got file info for '{path}': id={file_info['id']}")
return file_info
async def create_tag(
self,
name: str,
user_visible: bool = True,
user_assignable: bool = True,
) -> dict[str, Any]:
"""Create a system tag via WebDAV.
Args:
name: Name of the tag to create
user_visible: Whether the tag is visible to users
user_assignable: Whether users can assign this tag
Returns:
Tag dictionary with id, name, userVisible, userAssignable
Raises:
HTTPStatusError: If tag creation fails (409 if already exists)
"""
# Use WebDAV POST with JSON body to create tag
response = await self._client.post(
"/remote.php/dav/systemtags/",
headers={"Content-Type": "application/json"},
json={
"name": name,
"userVisible": user_visible,
"userAssignable": user_assignable,
},
)
response.raise_for_status()
# Extract tag ID from Content-Location header (e.g., /remote.php/dav/systemtags/42)
content_location = response.headers.get("Content-Location", "")
tag_id = None
if content_location:
# Extract the numeric ID from the path
try:
tag_id = int(content_location.rstrip("/").split("/")[-1])
except (ValueError, IndexError):
pass
tag_info = {
"id": tag_id,
"name": name,
"userVisible": user_visible,
"userAssignable": user_assignable,
}
logger.info(f"Created tag '{name}' with ID {tag_info['id']}")
return tag_info
async def get_or_create_tag(
self,
name: str,
user_visible: bool = True,
user_assignable: bool = True,
) -> dict[str, Any]:
"""Get a tag by name, creating it if it doesn't exist.
Args:
name: Name of the tag
user_visible: Whether the tag is visible to users (for creation)
user_assignable: Whether users can assign this tag (for creation)
Returns:
Tag dictionary with id, name, userVisible, userAssignable
"""
# First try to get existing tag
existing_tag = await self.get_tag_by_name(name)
if existing_tag:
logger.debug(f"Tag '{name}' already exists with ID {existing_tag['id']}")
return existing_tag
# Create new tag
try:
return await self.create_tag(name, user_visible, user_assignable)
except HTTPStatusError as e:
if e.response.status_code == 409:
# Tag was created between our check and creation, fetch it
existing_tag = await self.get_tag_by_name(name)
if existing_tag:
return existing_tag
raise
async def assign_tag_to_file(self, file_id: int, tag_id: int) -> bool:
"""Assign a system tag to a file.
Args:
file_id: Numeric file ID
tag_id: Numeric tag ID
Returns:
True if tag was assigned successfully (or already assigned)
Raises:
HTTPStatusError: If tag assignment fails
"""
response = await self._client.request(
"PUT",
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
headers={"Content-Length": "0"},
content=b"",
)
# 201 = Created (new assignment), 409 = Conflict (already assigned)
if response.status_code in (201, 409):
logger.info(f"Tagged file {file_id} with tag {tag_id}")
return True
response.raise_for_status()
return True
async def remove_tag_from_file(self, file_id: int, tag_id: int) -> bool:
"""Remove a system tag from a file.
Args:
file_id: Numeric file ID
tag_id: Numeric tag ID
Returns:
True if tag was removed successfully (or wasn't assigned)
Raises:
HTTPStatusError: If tag removal fails
"""
response = await self._client.request(
"DELETE",
f"/remote.php/dav/systemtags-relations/files/{file_id}/{tag_id}",
)
# 204 = No Content (removed), 404 = Not Found (wasn't assigned)
if response.status_code in (204, 404):
logger.info(f"Removed tag {tag_id} from file {file_id}")
return True
response.raise_for_status()
return True
+82 -67
View File
@@ -9,6 +9,7 @@ from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
@@ -99,15 +100,19 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
)
# Generate dense embedding for semantic search
embedding_service = get_embedding_service()
dense_embedding = await embedding_service.embed(query)
with trace_operation("search.get_embedding_service"):
embedding_service = get_embedding_service()
with trace_operation("search.dense_embedding"):
dense_embedding = await embedding_service.embed(query)
# Store for reuse by callers (e.g., viz_routes PCA visualization)
self.query_embedding = dense_embedding
logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})")
# Generate sparse embedding for BM25 keyword search
bm25_service = get_bm25_service()
sparse_embedding = await bm25_service.encode_async(query)
with trace_operation("search.get_bm25_service"):
bm25_service = get_bm25_service()
with trace_operation("search.sparse_embedding_bm25"):
sparse_embedding = await bm25_service.encode_async(query)
logger.debug(
f"Generated sparse embedding "
f"({len(sparse_embedding['indices'])} non-zero terms)"
@@ -134,38 +139,44 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
query_filter = Filter(must=filter_conditions)
# Execute hybrid search with Qdrant native RRF fusion
qdrant_client = await get_qdrant_client()
with trace_operation("search.get_qdrant_client"):
qdrant_client = await get_qdrant_client()
try:
# Use prefetch to run both dense and sparse searches
# Qdrant will automatically merge results using RRF
search_response = await qdrant_client.query_points(
collection_name=settings.get_collection_name(),
prefetch=[
# Dense semantic search
models.Prefetch(
query=dense_embedding,
using="dense",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
# Sparse BM25 search
models.Prefetch(
query=models.SparseVector(
indices=sparse_embedding["indices"],
values=sparse_embedding["values"],
with trace_operation(
"search.qdrant_query",
attributes={"query.limit": limit * 2, "query.fusion": self.fusion_name},
):
search_response = await qdrant_client.query_points(
collection_name=settings.get_collection_name(),
prefetch=[
# Dense semantic search
models.Prefetch(
query=dense_embedding,
using="dense",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
using="sparse",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
],
# Fusion query (RRF or DBSF based on initialization)
query=models.FusionQuery(fusion=self.fusion),
limit=limit * 2, # Get extra for deduplication
score_threshold=score_threshold,
with_payload=True,
with_vectors=False, # Don't return vectors to save bandwidth
)
# Sparse BM25 search
models.Prefetch(
query=models.SparseVector(
indices=sparse_embedding["indices"],
values=sparse_embedding["values"],
),
using="sparse",
limit=limit * 2, # Get extra for deduplication
filter=query_filter,
),
],
# Fusion query (RRF or DBSF based on initialization)
query=models.FusionQuery(fusion=self.fusion),
limit=limit * 2, # Get extra for deduplication
score_threshold=score_threshold,
with_payload=True,
with_vectors=False, # Don't return vectors to save bandwidth
)
record_qdrant_operation("search", "success")
except Exception:
record_qdrant_operation("search", "error")
@@ -185,47 +196,51 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
# Deduplicate by (doc_id, doc_type, chunk_start, chunk_end)
# This allows multiple chunks from same doc, but removes duplicate chunks
seen_chunks = set()
results = []
with trace_operation(
"search.deduplicate",
attributes={"dedupe.num_points": len(search_response.points)},
):
seen_chunks = set()
results = []
for result in search_response.points:
# doc_id can be int (notes) or str (files - file paths)
doc_id = result.payload["doc_id"]
doc_type = result.payload.get("doc_type", "note")
chunk_start = result.payload.get("chunk_start_offset")
chunk_end = result.payload.get("chunk_end_offset")
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
for result in search_response.points:
# doc_id can be int (notes) or str (files - file paths)
doc_id = result.payload["doc_id"]
doc_type = result.payload.get("doc_type", "note")
chunk_start = result.payload.get("chunk_start_offset")
chunk_end = result.payload.get("chunk_end_offset")
chunk_key = (doc_id, doc_type, chunk_start, chunk_end)
# Skip if we've already seen this exact chunk
if chunk_key in seen_chunks:
continue
# Skip if we've already seen this exact chunk
if chunk_key in seen_chunks:
continue
seen_chunks.add(chunk_key)
seen_chunks.add(chunk_key)
# Return unverified results (verification happens at output stage)
results.append(
SearchResult(
id=doc_id,
doc_type=doc_type,
title=result.payload.get("title", "Untitled"),
excerpt=result.payload.get("excerpt", ""),
score=result.score, # Fusion score (RRF or DBSF)
metadata={
"chunk_index": result.payload.get("chunk_index"),
"total_chunks": result.payload.get("total_chunks"),
"search_method": f"bm25_hybrid_{self.fusion_name}",
},
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval
# Return unverified results (verification happens at output stage)
results.append(
SearchResult(
id=doc_id,
doc_type=doc_type,
title=result.payload.get("title", "Untitled"),
excerpt=result.payload.get("excerpt", ""),
score=result.score, # Fusion score (RRF or DBSF)
metadata={
"chunk_index": result.payload.get("chunk_index"),
"total_chunks": result.payload.get("total_chunks"),
"search_method": f"bm25_hybrid_{self.fusion_name}",
},
chunk_start_offset=result.payload.get("chunk_start_offset"),
chunk_end_offset=result.payload.get("chunk_end_offset"),
page_number=result.payload.get("page_number"),
chunk_index=result.payload.get("chunk_index", 0),
total_chunks=result.payload.get("total_chunks", 1),
point_id=str(result.id), # Qdrant point ID for batch retrieval
)
)
)
if len(results) >= limit:
break
if len(results) >= limit:
break
logger.info(f"Returning {len(results)} unverified results after deduplication")
if results:
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.47.0"
version = "0.48.1"
description = "Model Context Protocol (MCP) server for Nextcloud integration - enables AI assistants to interact with Nextcloud data"
authors = [
{name = "Chris Coutinho", email = "chris@coutinho.io"}
+1 -55
View File
@@ -9,7 +9,6 @@ import pytest
from httpx import HTTPStatusError
from mcp import ClientSession
from mcp.client.session import RequestContext
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import ElicitRequestParams, ElicitResult, ErrorData
@@ -172,51 +171,6 @@ async def create_mcp_client_session(
logger.debug(f"{client_name} client session cleaned up successfully")
async def create_mcp_client_session_sse(
url: str,
token: str | None = None,
client_name: str = "MCP",
elicitation_callback: Any = None,
) -> AsyncGenerator[ClientSession, Any]:
"""
Factory function to create an MCP client session using SSE transport.
Similar to create_mcp_client_session but uses SSE transport instead of streamable-http.
Uses native async context managers to ensure correct LIFO cleanup order.
Args:
url: MCP server URL (e.g., "http://localhost:8000/sse")
token: Optional OAuth access token for Bearer authentication
client_name: Client name for logging (e.g., "Basic MCP (SSE)")
elicitation_callback: Optional callback for handling elicitation requests
Yields:
Initialized MCP ClientSession
Note:
SSE transport is being deprecated in favor of streamable-http.
This function exists for compatibility testing only.
"""
logger.info(f"Creating SSE client for {client_name}")
# Prepare headers with OAuth token if provided
headers = {"Authorization": f"Bearer {token}"} if token else None
# Use native async with - Python ensures LIFO cleanup
# Cleanup order will be: ClientSession.__aexit__ -> sse_client.__aexit__
# Note: sse_client yields only (read_stream, write_stream), not 3 values like streamablehttp_client
async with sse_client(url, headers=headers) as (read_stream, write_stream):
async with ClientSession(
read_stream, write_stream, elicitation_callback=elicitation_callback
) as session:
await session.initialize()
logger.info(f"{client_name} client session initialized successfully")
yield session
# Cleanup happens automatically in LIFO order - no exception suppression needed
logger.debug(f"{client_name} client session cleaned up successfully")
@pytest.fixture(scope="session")
async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]:
"""
@@ -255,18 +209,10 @@ async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]:
@pytest.fixture(scope="session")
async def nc_mcp_client(anyio_backend) -> AsyncGenerator[ClientSession, Any]:
"""
Fixture to create an MCP client session for integration tests using SSE transport.
Fixture to create an MCP client session for integration tests using streamable-http.
Uses anyio pytest plugin for proper async fixture handling.
Note: SSE transport is being deprecated. This fixture uses SSE for compatibility testing.
"""
# async for session in create_mcp_client_session_sse(
# url="http://localhost:8000/sse", client_name="Basic MCP (SSE)"
# ):
# yield session
async for session in create_mcp_client_session(
url="http://localhost:8000/mcp",
client_name="Basic MCP (HTTP)",
+140 -17
View File
@@ -10,6 +10,7 @@ Environment Variables:
OPENAI_BASE_URL: Base URL override (e.g., "https://models.github.ai/inference")
OPENAI_EMBEDDING_MODEL: Embedding model (default: "text-embedding-3-small")
OPENAI_GENERATION_MODEL: Generation model for sampling (default: "gpt-4o-mini")
RAG_MANUAL_PATH: Path to manual PDF in Nextcloud (default: "Nextcloud_User_Manual.pdf")
For GitHub CI, set:
OPENAI_API_KEY: ${{ secrets.GITHUB_TOKEN }}
@@ -18,15 +19,17 @@ For GitHub CI, set:
OPENAI_GENERATION_MODEL: openai/gpt-4o-mini
Prerequisites:
- Nextcloud User Manual indexed in Qdrant (via vector sync)
- Nextcloud User Manual PDF uploaded to Nextcloud
- VECTOR_SYNC_ENABLED=true on the MCP server
"""
import json
import logging
import os
from pathlib import Path
from typing import Any, AsyncGenerator
import anyio
import pytest
from mcp import ClientSession
@@ -34,6 +37,39 @@ from nextcloud_mcp_server.providers.openai import OpenAIProvider
from tests.conftest import create_mcp_client_session
from tests.integration.sampling_support import create_openai_sampling_callback
logger = logging.getLogger(__name__)
# Default path to the Nextcloud User Manual PDF
DEFAULT_MANUAL_PATH = "Nextcloud Manual.pdf"
async def llm_judge(
provider: "OpenAIProvider",
ground_truth: str,
system_output: str,
) -> bool:
"""Use LLM to judge if system output aligns with ground truth.
Args:
provider: OpenAI provider with generation capability
ground_truth: The expected/reference answer
system_output: The system's actual output to evaluate
Returns:
True if output aligns with ground truth, False otherwise
"""
prompt = f"""GROUND TRUTH: {ground_truth}
SYSTEM OUTPUT: {system_output}
Does the system output contain the key facts from the ground truth?
Answer: TRUE or FALSE"""
response = await provider.generate(prompt, max_tokens=10)
return "TRUE" in response.upper()
# Skip all tests if OpenAI API key not configured
pytestmark = [
pytest.mark.integration,
@@ -58,6 +94,86 @@ def ground_truth_qa():
return json.load(f)
@pytest.fixture(scope="module")
async def indexed_manual_pdf(nc_client, nc_mcp_client):
"""Ensure the Nextcloud User Manual PDF is tagged and indexed for vector search.
This fixture:
1. Gets file info for the manual PDF
2. Creates/gets the 'vector-index' tag
3. Assigns the tag to the file
4. Waits for vector sync to complete indexing
Environment Variables:
RAG_MANUAL_PATH: Path to manual PDF in Nextcloud (default: Nextcloud Manual.pdf)
"""
manual_path = os.getenv("RAG_MANUAL_PATH", DEFAULT_MANUAL_PATH)
logger.info(f"Setting up indexed manual PDF: {manual_path}")
# Get file info to verify file exists and get file ID
file_info = await nc_client.webdav.get_file_info(manual_path)
if not file_info:
pytest.skip(f"Manual PDF not found at '{manual_path}'")
file_id = file_info["id"]
logger.info(f"Found manual PDF: {manual_path} (file_id={file_id})")
# Create or get the vector-index tag
tag = await nc_client.webdav.get_or_create_tag("vector-index")
tag_id = tag["id"]
logger.info(f"Using tag 'vector-index' (tag_id={tag_id})")
# Assign tag to file
await nc_client.webdav.assign_tag_to_file(file_id, tag_id)
logger.info(f"Tagged file {file_id} with vector-index tag")
# Wait for vector sync to complete indexing
max_attempts = 60
poll_interval = 10
logger.info("Waiting for vector sync to index the manual...")
for attempt in range(1, max_attempts + 1):
try:
# Call the MCP tool via the existing client session
result = await nc_mcp_client.call_tool(
"nc_get_vector_sync_status",
arguments={},
)
if not result.isError:
content = result.structuredContent or {}
indexed = content.get("indexed_count", 0)
pending = content.get("pending_count", 1)
logger.info(
f"Attempt {attempt}/{max_attempts}: "
f"indexed={indexed}, pending={pending}"
)
if indexed > 0 and pending == 0:
logger.info(
f"Vector indexing complete: {indexed} documents indexed"
)
break
except Exception as e:
logger.warning(f"Attempt {attempt}: Error checking status: {e}")
if attempt < max_attempts:
await anyio.sleep(poll_interval)
else:
logger.warning(
f"Vector indexing may not be complete after {max_attempts} attempts"
)
yield {
"path": manual_path,
"file_id": file_id,
"tag_id": tag_id,
}
@pytest.fixture(scope="module")
async def openai_provider():
"""OpenAI provider configured from environment (embeddings only)."""
@@ -129,7 +245,9 @@ async def test_openai_embeddings_work(openai_provider: OpenAIProvider):
assert len(embedding) in [1536, 3072]
async def test_semantic_search_retrieval(nc_mcp_client, ground_truth_qa):
async def test_semantic_search_retrieval(
nc_mcp_client, ground_truth_qa, indexed_manual_pdf, openai_generation_provider
):
"""Test that semantic search retrieves relevant documents from the manual.
This tests the retrieval component of RAG - ensuring that queries
@@ -138,7 +256,6 @@ async def test_semantic_search_retrieval(nc_mcp_client, ground_truth_qa):
# Use first query from ground truth
test_case = ground_truth_qa[0] # 2FA question
query = test_case["query"]
expected_topics = test_case["expected_topics"]
# Perform semantic search via MCP tool
result = await nc_mcp_client.call_tool(
@@ -158,16 +275,21 @@ async def test_semantic_search_retrieval(nc_mcp_client, ground_truth_qa):
assert data["total_found"] > 0, f"No results for query: {query}"
assert len(data["results"]) > 0
# Check that at least one result contains expected topic keywords
all_excerpts = " ".join([r["excerpt"].lower() for r in data["results"]])
topic_found = any(topic.lower() in all_excerpts for topic in expected_topics)
assert topic_found, (
f"Expected topics {expected_topics} not found in results for query: {query}"
# Use LLM judge to evaluate if excerpts are relevant to ground truth
all_excerpts = " ".join([r["excerpt"] for r in data["results"]])
is_relevant = await llm_judge(
openai_generation_provider,
test_case["ground_truth"],
all_excerpts,
)
assert is_relevant, f"LLM judge: excerpts not relevant to query: {query}"
async def test_semantic_search_answer_with_sampling(
nc_mcp_client_with_sampling, ground_truth_qa
nc_mcp_client_with_sampling,
ground_truth_qa,
indexed_manual_pdf,
openai_generation_provider,
):
"""Test semantic search with MCP sampling for answer generation.
@@ -224,12 +346,13 @@ async def test_semantic_search_answer_with_sampling(
assert data["generated_answer"] is not None
assert len(data["generated_answer"]) > 50 # Non-trivial answer
# Check answer contains relevant content
answer_lower = data["generated_answer"].lower()
assert any(
keyword in answer_lower
for keyword in ["two-factor", "2fa", "authentication", "password"]
), f"Answer doesn't seem relevant to query: {data['generated_answer'][:200]}"
# Use LLM judge to evaluate answer relevance
is_relevant = await llm_judge(
openai_generation_provider,
test_case["ground_truth"],
data["generated_answer"],
)
assert is_relevant, f"LLM judge: answer not relevant to query: {query}"
@pytest.mark.parametrize(
@@ -243,7 +366,7 @@ async def test_semantic_search_answer_with_sampling(
],
)
async def test_retrieval_quality_all_queries(
nc_mcp_client, ground_truth_qa, qa_index, min_expected_results
nc_mcp_client, ground_truth_qa, indexed_manual_pdf, qa_index, min_expected_results
):
"""Test retrieval quality for all ground truth queries.
@@ -274,7 +397,7 @@ async def test_retrieval_quality_all_queries(
)
async def test_no_results_for_unrelated_query(nc_mcp_client):
async def test_no_results_for_unrelated_query(nc_mcp_client, indexed_manual_pdf):
"""Test that completely unrelated queries return low/no scores.
The Nextcloud manual shouldn't have relevant content for
+241
View File
@@ -117,3 +117,244 @@ def test_parse_search_response_with_empty_tags(mocker):
assert len(results) == 1
assert "tags" in results[0]
assert results[0]["tags"] == []
@pytest.mark.unit
async def test_get_file_info_returns_file_details(mocker):
"""Test that get_file_info returns file info including file ID."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock PROPFIND response
mock_response = AsyncMock()
mock_response.status_code = 207
mock_response.content = b"""<?xml version="1.0"?>
<d:multistatus xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
<d:response>
<d:href>/remote.php/dav/files/testuser/Documents/test.pdf</d:href>
<d:propstat>
<d:prop>
<oc:fileid>12345</oc:fileid>
<d:displayname>test.pdf</d:displayname>
<d:getcontentlength>1024</d:getcontentlength>
<d:getcontenttype>application/pdf</d:getcontenttype>
<d:getlastmodified>Sat, 01 Jan 2025 00:00:00 GMT</d:getlastmodified>
<d:getetag>"abc123"</d:getetag>
<d:resourcetype/>
</d:prop>
</d:propstat>
</d:response>
</d:multistatus>"""
mock_response.raise_for_status = mocker.Mock()
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call get_file_info
result = await client.get_file_info("Documents/test.pdf")
# Verify result
assert result is not None
assert result["id"] == 12345
assert result["name"] == "test.pdf"
assert result["path"] == "Documents/test.pdf"
assert result["content_type"] == "application/pdf"
assert result["size"] == 1024
assert result["etag"] == "abc123"
assert result["is_directory"] is False
@pytest.mark.unit
async def test_get_file_info_returns_none_for_missing_file(mocker):
"""Test that get_file_info returns None for missing files."""
from httpx import HTTPStatusError, Response
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 404 response
mock_response = mocker.Mock(spec=Response)
mock_response.status_code = 404
mock_http_client.request = AsyncMock(
side_effect=HTTPStatusError(
"Not Found", request=mocker.Mock(), response=mock_response
)
)
# Call get_file_info
result = await client.get_file_info("nonexistent.pdf")
# Verify result is None
assert result is None
@pytest.mark.unit
async def test_create_tag_creates_system_tag(mocker):
"""Test that create_tag creates a system tag via OCS API."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock OCS response
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json = mocker.Mock(
return_value={
"ocs": {
"data": {
"id": 42,
"name": "vector-index",
"userVisible": True,
"userAssignable": True,
}
}
}
)
mock_response.raise_for_status = mocker.Mock()
mock_http_client.post = AsyncMock(return_value=mock_response)
# Call create_tag
result = await client.create_tag("vector-index")
# Verify result
assert result["id"] == 42
assert result["name"] == "vector-index"
assert result["userVisible"] is True
assert result["userAssignable"] is True
# Verify API call
mock_http_client.post.assert_called_once()
call_args = mock_http_client.post.call_args
assert call_args[0][0] == "/ocs/v2.php/apps/systemtags/api/v1/tags"
assert call_args[1]["json"]["name"] == "vector-index"
@pytest.mark.unit
async def test_get_or_create_tag_returns_existing_tag(mocker):
"""Test that get_or_create_tag returns existing tag without creating."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock existing tag
mocker.patch.object(
client,
"get_tag_by_name",
return_value={"id": 42, "name": "vector-index", "userVisible": True},
)
mock_create = mocker.patch.object(client, "create_tag")
# Call get_or_create_tag
result = await client.get_or_create_tag("vector-index")
# Verify existing tag returned without creating
assert result["id"] == 42
mock_create.assert_not_called()
@pytest.mark.unit
async def test_get_or_create_tag_creates_new_tag(mocker):
"""Test that get_or_create_tag creates tag when not found."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock no existing tag
mocker.patch.object(client, "get_tag_by_name", return_value=None)
mocker.patch.object(
client,
"create_tag",
return_value={"id": 42, "name": "vector-index", "userVisible": True},
)
# Call get_or_create_tag
result = await client.get_or_create_tag("vector-index")
# Verify tag was created
assert result["id"] == 42
client.create_tag.assert_called_once_with("vector-index", True, True)
@pytest.mark.unit
async def test_assign_tag_to_file_success(mocker):
"""Test that assign_tag_to_file assigns tag via WebDAV."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 201 Created response
mock_response = AsyncMock()
mock_response.status_code = 201
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call assign_tag_to_file
result = await client.assign_tag_to_file(12345, 42)
# Verify result
assert result is True
# Verify API call
mock_http_client.request.assert_called_once()
call_args = mock_http_client.request.call_args
assert call_args[0][0] == "PUT"
assert "/systemtags-relations/files/12345/42" in call_args[0][1]
@pytest.mark.unit
async def test_assign_tag_to_file_already_assigned(mocker):
"""Test that assign_tag_to_file handles already assigned (409) gracefully."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 409 Conflict response (already assigned)
mock_response = AsyncMock()
mock_response.status_code = 409
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call assign_tag_to_file
result = await client.assign_tag_to_file(12345, 42)
# Verify result (should succeed even with 409)
assert result is True
@pytest.mark.unit
async def test_remove_tag_from_file_success(mocker):
"""Test that remove_tag_from_file removes tag via WebDAV."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 204 No Content response
mock_response = AsyncMock()
mock_response.status_code = 204
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call remove_tag_from_file
result = await client.remove_tag_from_file(12345, 42)
# Verify result
assert result is True
# Verify API call
mock_http_client.request.assert_called_once()
call_args = mock_http_client.request.call_args
assert call_args[0][0] == "DELETE"
assert "/systemtags-relations/files/12345/42" in call_args[0][1]
@pytest.mark.unit
async def test_remove_tag_from_file_not_assigned(mocker):
"""Test that remove_tag_from_file handles not assigned (404) gracefully."""
mock_http_client = AsyncMock()
client = WebDAVClient(mock_http_client, "testuser")
# Mock 404 Not Found response (tag wasn't assigned)
mock_response = AsyncMock()
mock_response.status_code = 404
mock_http_client.request = AsyncMock(return_value=mock_response)
# Call remove_tag_from_file
result = await client.remove_tag_from_file(12345, 42)
# Verify result (should succeed even with 404)
assert result is True
Generated
+1 -1
View File
@@ -1936,7 +1936,7 @@ wheels = [
[[package]]
name = "nextcloud-mcp-server"
version = "0.47.0"
version = "0.48.1"
source = { editable = "." }
dependencies = [
{ name = "aiosqlite" },