Merge origin/master into feature/bm25

Resolved conflicts:
- viz_routes.py: Kept bm25's extract_dense_vector() function for robust vector handling
- hybrid.py: Removed (bm25 uses native Qdrant RRF fusion instead)
- uv.lock: Regenerated after accepting master's dependencies

This merge brings in:
- RAG evaluation framework (ADR-013)
- Performance optimizations (double-fetch elimination)
- Migration from asyncio to anyio
- OpenTelemetry tracing improvements
- Notes app enhancements

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-16 11:52:40 +01:00
32 changed files with 3180 additions and 303 deletions
+1 -1
View File
@@ -20,7 +20,7 @@ jobs:
fetch-depth: 0
token: "${{ secrets.PERSONAL_ACCESS_TOKEN }}"
- name: Create bump and changelog
uses: commitizen-tools/commitizen-action@5b0848cd060263e24602d1eba03710e056ef7711 # 0.24.0
uses: commitizen-tools/commitizen-action@9615e7be1cf341393c52e865ebbdaa0712176d81 # 0.25.0
with:
github_token: ${{ secrets.PERSONAL_ACCESS_TOKEN }}
changelog_increment_filename: body.md
+3
View File
@@ -13,3 +13,6 @@ docker-compose.override.yml
# Generated by pytest used to login users
.nextcloud_oauth_*.json
.playwright-mcp/
# RAG Evaluation
tests/rag_evaluation/fixtures/
+28
View File
@@ -1,3 +1,31 @@
## v0.38.0 (2025-11-16)
### Feat
- add concurrent uploads and --force flag to upload command
- implement RAG evaluation framework with CLI tooling
### Fix
- download qrels from BEIR ZIP instead of HuggingFace
### Refactor
- migrate asyncio to anyio for consistent structured concurrency
- replace httpx client with NextcloudClient in upload command
### Perf
- Eliminate double-fetching in semantic search sampling
- fix vector viz search performance and visual encoding
- make note deletion concurrent in upload --force
## v0.37.0 (2025-11-16)
### Feat
- Add OpenTelemetry tracing to @instrument_tool decorator
## v0.36.0 (2025-11-15)
### BREAKING CHANGE
+5 -3
View File
@@ -5,11 +5,13 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Coding Conventions
### async/await Patterns
- **Use anyio + asyncio hybrid** - Both libraries are available
- **Use anyio for all async operations** - Provides structured concurrency
- pytest runs in `anyio` mode (`anyio_mode = "auto"` in pyproject.toml)
- asyncio used in auth modules (refresh_token_storage.py, token_exchange.py, token_broker.py)
- anyio used in calendar.py, client_registration.py, app.py
- Use `anyio.create_task_group()` for concurrent execution (NOT `asyncio.gather()`)
- Use `anyio.Lock()` for synchronization primitives (NOT `asyncio.Lock()`)
- Use `anyio.run()` for entry points (NOT `asyncio.run()`)
- Prefer standard async/await syntax without explicit library imports when possible
- Examples: app.py, search/hybrid.py, search/verification.py, auth/token_broker.py
### Type Hints
- **Use Python 3.10+ union syntax**: `str | None` instead of `Optional[str]`
+2 -2
View File
@@ -2,8 +2,8 @@ apiVersion: v2
name: nextcloud-mcp-server
description: A Helm chart for Nextcloud MCP Server - enables AI assistants to interact with Nextcloud
type: application
version: 0.36.0
appVersion: "0.36.0"
version: 0.38.0
appVersion: "0.38.0"
keywords:
- nextcloud
- mcp
+5 -3
View File
@@ -70,11 +70,13 @@ services:
mcp:
build: .
restart: always
command: ["--transport", "streamable-http"]
depends_on:
app:
condition: service_healthy
ports:
- 127.0.0.1:8000:8000
- 127.0.0.1:9090:9090
volumes:
- mcp-data:/app/data
environment:
@@ -85,7 +87,7 @@ services:
# Vector sync configuration (ADR-007)
- VECTOR_SYNC_ENABLED=true
- VECTOR_SYNC_SCAN_INTERVAL=10
- VECTOR_SYNC_SCAN_INTERVAL=60
- VECTOR_SYNC_PROCESSOR_WORKERS=1
#- LOG_FORMAT=json
@@ -193,8 +195,8 @@ services:
# Provider auto-detected from OIDC_DISCOVERY_URL issuer
# Using internal Docker hostname for discovery to get consistent issuer
- OIDC_DISCOVERY_URL=http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration
- OIDC_CLIENT_ID=nextcloud-mcp-server
- OIDC_CLIENT_SECRET=mcp-secret-change-in-production
- NEXTCLOUD_OIDC_CLIENT_ID=nextcloud-mcp-server
- NEXTCLOUD_OIDC_CLIENT_SECRET=mcp-secret-change-in-production
- OIDC_JWKS_URI=http://keycloak:8080/realms/nextcloud-mcp/protocol/openid-connect/certs
# Nextcloud API endpoint (for accessing APIs with validated token)
+254
View File
@@ -0,0 +1,254 @@
## ADR-013: RAG Evaluation Testing Framework
**Status:** Proposed
**Date:** 2025-11-15
### Context
The `nc_semantic_search_answer` tool implements a Retrieval-Augmented Generation (RAG) system where:
1. **Retrieval**: Vector sync pipeline indexes Nextcloud documents (notes, calendar, contacts, etc.) into a vector database
2. **Generation**: MCP client's LLM synthesizes answers from retrieved documents via MCP sampling (ADR-008)
We need a testing framework to evaluate RAG system performance and identify whether failures occur in retrieval (wrong documents found) or generation (poor answer quality). This framework must use industry-standard evaluation methodologies while remaining practical to implement and maintain.
To establish a baseline, we will use the **BeIR/nfcorpus** dataset (medical/biomedical corpus) with ~5,000 documents and established query/answer pairs.
Homepage: https://www.cl.uni-heidelberg.de/statnlpgroup/nfcorpus/
Download: https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/nfcorpus.zip
### Decision
We will implement a **two-part evaluation framework** that independently tests retrieval and generation quality using pytest fixtures.
#### In Scope
**1. Retrieval Evaluation**
Tests the vector sync/embedding pipeline's ability to find relevant documents.
- **Metric: Context Recall** (Did we retrieve documents containing the answer?)
- **Evaluation method**: Heuristic - Check if ground-truth document IDs appear in top-k retrieval results
- **Test**: Query → Semantic search → Assert expected doc IDs present
**2. Generation Evaluation**
Tests the MCP client LLM's ability to synthesize correct answers from retrieved context.
- **Metric: Answer Correctness** (Is the generated answer factually correct?)
- **Evaluation method**: LLM-as-judge - Compare RAG answer against ground-truth answer
- **Test**: Query → `nc_semantic_search_answer` → LLM evaluates answer vs. ground truth (binary true/false)
#### Out of Scope (Initial Implementation)
- **Context Relevance/Precision**: Measuring irrelevant documents in retrieval results
- **Faithfulness/Groundedness**: Detecting hallucinations not supported by retrieved context
- **Answer Relevance**: Whether answer addresses the specific question asked
- **Out-of-Scope Handling**: Testing "I don't know" responses when answer isn't in context
- **Continuous benchmarking**: Automated tracking of metric trends over time
- **Custom domain datasets**: Production-specific test data (medical corpus used initially)
These remain valuable for future iterations but add complexity beyond our initial goals.
#### Implementation
**Test Structure**
Location: `tests/rag_evaluation/`
- `test_retrieval_quality.py` - Retrieval evaluation tests
- `test_generation_quality.py` - Generation evaluation tests
- `conftest.py` - Fixtures for test data, MCP clients, and evaluation LLMs
**Required Pytest Fixtures**
1. **`nfcorpus_test_data`** (session-scoped)
- Downloads/caches BeIR nfcorpus dataset at runtime
- Loads 5 pre-selected test queries with:
- Query text
- Pre-generated ground-truth answer (from `tests/rag_evaluation/fixtures/ground_truth.json`)
- Expected document IDs (from qrels with score=2)
- Uploads all corpus documents as notes in test Nextcloud instance
- Triggers vector sync to index documents
- Waits for indexing completion
- Returns test case data structure
2. **`mcp_sampling_client`** (session-scoped)
- Creates MCP client that supports sampling
- Configurable LLM provider (ollama or anthropic) via environment:
- `RAG_EVAL_PROVIDER=ollama` (default) or `anthropic`
- `RAG_EVAL_OLLAMA_BASE_URL=http://localhost:11434`
- `RAG_EVAL_OLLAMA_MODEL=llama3.1:8b`
- `RAG_EVAL_ANTHROPIC_API_KEY=sk-...`
- `RAG_EVAL_ANTHROPIC_MODEL=claude-3-5-sonnet-20241022`
- Returns configured MCP client fixture
3. **`evaluation_llm`** (session-scoped)
- Separate LLM instance for evaluation (independent from MCP client)
- Same provider configuration as `mcp_sampling_client`
- Returns callable: `async def evaluate(prompt: str) -> str`
**Test Implementation Examples**
```python
# tests/rag_evaluation/test_retrieval_quality.py
async def test_retrieval_recall(nc_client, nfcorpus_test_data):
"""Test that semantic search retrieves documents containing the answer."""
for test_case in nfcorpus_test_data:
# Perform semantic search (retrieval only, no generation)
results = await nc_client.notes.semantic_search(
query=test_case.query,
limit=10
)
retrieved_doc_ids = {r.document_id for r in results}
expected_doc_ids = set(test_case.expected_document_ids)
# Context Recall: Are expected documents in top-k results?
recall = len(expected_doc_ids & retrieved_doc_ids) / len(expected_doc_ids)
assert recall >= 0.8, f"Recall {recall} below threshold for query: {test_case.query}"
# tests/rag_evaluation/test_generation_quality.py
async def test_answer_correctness(mcp_sampling_client, evaluation_llm, nfcorpus_test_data):
"""Test that RAG system generates factually correct answers."""
for test_case in nfcorpus_test_data:
# Execute full RAG pipeline (retrieval + generation)
result = await mcp_sampling_client.call_tool(
"nc_semantic_search_answer",
arguments={"query": test_case.query, "limit": 5}
)
rag_answer = result["generated_answer"]
# LLM-as-judge evaluation
evaluation_prompt = f"""Compare these two answers and respond with only TRUE or FALSE.
Question: {test_case.query}
Generated Answer: {rag_answer}
Ground Truth Answer: {test_case.ground_truth}
Are these answers semantically equivalent (do they convey the same factual information)?
Respond with only: TRUE or FALSE"""
evaluation_result = await evaluation_llm(evaluation_prompt)
assert evaluation_result.strip().upper() == "TRUE", \
f"Answer mismatch for query: {test_case.query}\nGot: {rag_answer}\nExpected: {test_case.ground_truth}"
```
**Dataset Integration**
The BeIR nfcorpus dataset structure:
- **corpus.jsonl**: 3,633 medical/biomedical documents (articles from PubMed)
- **queries.jsonl**: 3,237 queries (questions)
- **qrels/*.tsv**: Relevance judgments mapping query IDs to document IDs with scores (2=highly relevant, 1=somewhat relevant)
**Important**: The dataset provides relevance judgments (which documents answer which queries) but does NOT include ground truth answers. We must generate synthetic ground truth offline.
**Selected Test Queries** (5 diverse candidates):
1. **PLAIN-2630**: "Alkylphenol Endocrine Disruptors and Allergies" (5 words, 21 highly relevant docs)
2. **PLAIN-2660**: "How Long to Detox From Fish Before Pregnancy?" (8 words, 20 highly relevant docs)
3. **PLAIN-2510**: "Coffee and Artery Function" (4 words, 16 highly relevant docs)
4. **PLAIN-2430**: "Preventing Brain Loss with B Vitamins?" (6 words, 15 highly relevant docs)
5. **PLAIN-2690**: "Chronic Headaches and Pork Tapeworms" (5 words, 14 highly relevant docs)
**Ground Truth Generation** (offline, pre-test):
Ground truth answers will be generated offline using a script that:
1. Loads nfcorpus dataset
2. For each selected query, extracts top 3-5 highly relevant documents
3. Uses an LLM (ollama/anthropic) to synthesize a reference answer
4. Stores ground truth in `tests/rag_evaluation/fixtures/ground_truth.json`
```python
# tools/generate_rag_ground_truth.py
async def generate_ground_truth(query: str, relevant_docs: List[dict], llm: LLMProvider) -> str:
"""Generate synthetic ground truth answer from highly relevant documents."""
context = "\n\n".join([
f"Document {i+1}:\nTitle: {doc['title']}\n{doc['text']}"
for i, doc in enumerate(relevant_docs[:5])
])
prompt = f"""Based on the following documents, provide a comprehensive answer to this question:
Question: {query}
{context}
Provide a factual, well-structured answer that synthesizes information from the documents.
Focus on accuracy and completeness."""
return await llm.generate(prompt, max_tokens=500)
```
**Dataset Loading at Test Runtime** (in `nfcorpus_test_data` fixture):
1. Download nfcorpus dataset (cached in pytest temp directory)
2. Load corpus, queries, and qrels (relevance judgments)
3. Load pre-generated ground truth from `tests/rag_evaluation/fixtures/ground_truth.json`
4. Upload all corpus documents as Nextcloud notes
5. Trigger vector sync to index documents
6. Wait for indexing completion
7. Return test cases with query, ground truth, and expected doc IDs
**LLM Provider Abstraction**
```python
# tests/rag_evaluation/llm_providers.py
class LLMProvider(Protocol):
async def generate(self, prompt: str, max_tokens: int = 100) -> str: ...
class OllamaProvider:
def __init__(self, base_url: str, model: str):
self.base_url = base_url
self.model = model
async def generate(self, prompt: str, max_tokens: int = 100) -> str:
# Use httpx to call Ollama API
...
class AnthropicProvider:
def __init__(self, api_key: str, model: str):
self.client = anthropic.AsyncAnthropic(api_key=api_key)
self.model = model
async def generate(self, prompt: str, max_tokens: int = 100) -> str:
message = await self.client.messages.create(
model=self.model,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}]
)
return message.content[0].text
```
### Consequences
**Positive:**
* **Actionable debugging**: Separate retrieval/generation tests pinpoint failure location
* **Industry-standard metrics**: Context Recall and Answer Correctness are recognized RAG evaluation metrics
* **Simple initial implementation**: Binary LLM evaluation (true/false) is straightforward to implement and interpret
* **Extensible framework**: Easy to add more metrics (faithfulness, relevance) later
* **Standardized benchmark**: nfcorpus provides objective comparison against published RAG systems
* **Hybrid evaluation**: Combines efficiency (heuristics for retrieval) with quality (LLM-as-judge for generation)
* **Provider flexibility**: Supports both local (Ollama) and cloud (Anthropic) LLM evaluation
**Negative:**
* **Medical domain bias**: nfcorpus is medical/biomedical content, may not represent production use cases (personal notes, calendar events, etc.)
* **Manual test execution**: Tests require external LLM access and are not integrated into CI pipeline
* **Limited initial coverage**: Starting with only 5 queries provides limited statistical confidence
* **Evaluation cost**: LLM-as-judge for generation evaluation incurs API costs (Anthropic) or requires local inference (Ollama)
* **Single metric per component**: Initial scope tests only one metric per component, missing other important quality dimensions
* **Synthetic ground truth**: Ground truth answers are LLM-generated, not human-validated, which may introduce evaluation bias
* **Large corpus upload**: Uploading 3,633 documents at test runtime may be slow; caching strategy needed
**Future Work:**
* Expand to 50-100 queries for statistical significance
* Add custom test dataset with production-representative documents (meeting notes, task lists, etc.)
* Implement additional metrics (faithfulness, context relevance, answer relevance)
* Create automated benchmarking dashboard to track metric trends
* Test multi-hop reasoning (synthesis questions requiring multiple documents)
* Evaluate out-of-scope handling ("I don't know" responses)
+1 -1
View File
@@ -243,7 +243,7 @@ If you see cardinality warnings:
The observability stack integrates at multiple layers:
1. **HTTP Layer**: `ObservabilityMiddleware` tracks all HTTP requests
2. **MCP Layer**: Tools use `@trace_mcp_tool` for span creation
2. **MCP Layer**: Tools use `@instrument_tool` for automatic metrics and trace span creation
3. **Client Layer**: `BaseNextcloudClient` tracks all API calls
4. **OAuth Layer**: Token operations are traced and metered
5. **Background Tasks**: Vector sync operations emit metrics/traces
+4 -4
View File
@@ -446,7 +446,7 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
# Start background tasks using anyio TaskGroup
async with anyio.create_task_group() as tg:
# Start scanner task
tg.start_soon(
await tg.start(
scanner_task,
send_stream,
shutdown_event,
@@ -457,7 +457,7 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
tg.start_soon(
await tg.start(
processor_task,
i,
receive_stream.clone(),
@@ -1147,7 +1147,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
# Start background tasks using anyio TaskGroup
async with anyio_module.create_task_group() as tg:
# Start scanner task
tg.start_soon(
await tg.start(
scanner_task,
send_stream,
shutdown_event,
@@ -1158,7 +1158,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
tg.start_soon(
await tg.start(
processor_task,
i,
receive_stream.clone(),
+2 -2
View File
@@ -1310,7 +1310,7 @@ async def generate_encryption_key() -> str:
# Example usage
if __name__ == "__main__":
import asyncio
import anyio
async def main():
# Generate a key for testing
@@ -1318,4 +1318,4 @@ if __name__ == "__main__":
print(f"Generated encryption key: {key}")
print(f"Set this in your environment: export TOKEN_ENCRYPTION_KEY='{key}'")
asyncio.run(main())
anyio.run(main)
+2 -2
View File
@@ -14,11 +14,11 @@ The Token Broker provides:
- Session vs background token separation (RFC 8693)
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional, Tuple
import anyio
import httpx
import jwt
from cryptography.fernet import Fernet
@@ -43,7 +43,7 @@ class TokenCache:
self._cache: Dict[str, Tuple[str, datetime]] = {}
self._ttl = timedelta(seconds=ttl_seconds)
self._early_refresh = timedelta(seconds=early_refresh_seconds)
self._lock = asyncio.Lock()
self._lock = anyio.Lock()
async def get(self, user_id: str) -> Optional[str]:
"""Get cached token if valid."""
+15 -3
View File
@@ -719,6 +719,11 @@ async def user_info_html(request: Request) -> HTMLResponse:
}},
renderPlot(coordinates, results) {{
// Calculate score range for auto-scaling
const scores = results.map(r => r.score);
const minScore = Math.min(...scores);
const maxScore = Math.max(...scores);
const trace = {{
x: coordinates.map(c => c[0]),
y: coordinates.map(c => c[1]),
@@ -726,11 +731,18 @@ async def user_info_html(request: Request) -> HTMLResponse:
type: 'scatter',
text: results.map(r => `${{r.title}}<br>Score: ${{r.score.toFixed(3)}}`),
marker: {{
size: 8,
color: results.map(r => r.score),
// Multi-channel encoding: size + opacity + color for visual hierarchy
// Power scaling (score^2) amplifies visual differences dramatically
// score=0.0 6px, score=0.5 9.5px, score=1.0 20px
size: results.map(r => 6 + (Math.pow(r.score, 2) * 14)),
// Linear opacity scaling (0.2-1.0 range keeps all points visible)
opacity: results.map(r => 0.2 + (r.score * 0.8)),
// Color gradient shows score
color: scores,
colorscale: 'Viridis',
showscale: true,
colorbar: {{ title: 'Score' }},
colorbar: {{ title: 'Relative Score' }},
// Scores are normalized 0-1 within result set
cmin: 0,
cmax: 1
}}
+50 -48
View File
@@ -11,6 +11,7 @@ All processing happens server-side following ADR-012:
"""
import logging
import time
import numpy as np
from starlette.authentication import requires
@@ -362,56 +363,17 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
)
try:
# Start total request timer
request_start = time.perf_counter()
# Get authenticated HTTP client from session
# In BasicAuth mode: uses username/password from session
# In OAuth mode: uses access token from session
from nextcloud_mcp_server.auth.userinfo_routes import (
_get_authenticated_client_for_userinfo,
)
from nextcloud_mcp_server.client.notes import NotesClient
async with await _get_authenticated_client_for_userinfo(request) as http_client:
# Create NotesClient directly with authenticated HTTP client
notes_client = NotesClient(http_client, username)
# Wrap in a minimal client object for search algorithms
# This conforms to NextcloudClientProtocol but only implements notes
class MinimalNextcloudClient:
def __init__(self, notes_client, username):
self._notes = notes_client
self.username = username
@property
def notes(self):
return self._notes
@property
def webdav(self):
return None
@property
def calendar(self):
return None
@property
def contacts(self):
return None
@property
def deck(self):
return None
@property
def cookbook(self):
return None
@property
def tables(self):
return None
nextcloud_client = MinimalNextcloudClient(notes_client, username)
# Create search algorithm
async with await _get_authenticated_client_for_userinfo(request) as http_client: # noqa: F841
# Create search algorithm (no client needed - verification removed)
if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
elif algorithm == "bm25_hybrid":
@@ -424,6 +386,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Execute search (supports cross-app when doc_types=None)
# Get unverified results with buffer for filtering
search_start = time.perf_counter()
all_results = []
if doc_types is None or len(doc_types) == 0:
# Cross-app search - search all indexed types
@@ -449,13 +412,28 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Sort by score before verification
all_results.sort(key=lambda r: r.score, reverse=True)
# Verify access for all results (deduplicates and filters)
from nextcloud_mcp_server.search.verification import verify_search_results
# No verification needed for visualization - we only need Qdrant metadata
# (title, excerpt, doc_type) which is already in search results.
# Verification is only needed for sampling (LLM needs full content).
search_results = all_results[:limit]
search_duration = time.perf_counter() - search_start
verified_results = await verify_search_results(
all_results, nextcloud_client
# Normalize scores relative to this result set for better visualization
# (best result = 1.0, worst result = 0.0 within THIS result set)
# This makes visual encoding meaningful regardless of RRF normalization
if search_results:
scores = [r.score for r in search_results]
min_score, max_score = min(scores), max(scores)
score_range = max_score - min_score if max_score > min_score else 1.0
logger.info(
f"Normalizing scores for viz: original range [{min_score:.3f}, {max_score:.3f}] "
f"→ [0.0, 1.0]"
)
search_results = verified_results[:limit]
# Rescale each result's score to 0-1 within this result set
for r in search_results:
r.score = (r.score - min_score) / score_range
if not search_results:
return JSONResponse(
@@ -468,6 +446,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
)
# Fetch vectors for matching results from Qdrant
vector_fetch_start = time.perf_counter()
qdrant_client = await get_qdrant_client()
doc_ids = [r.id for r in search_results]
@@ -518,6 +497,7 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
vectors = np.array(
[v for v in (extract_dense_vector(p) for p in points) if v is not None]
)
vector_fetch_duration = time.perf_counter() - vector_fetch_start
if len(vectors) < 2:
# Not enough points for PCA
@@ -540,8 +520,10 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
)
# Apply PCA dimensionality reduction (768-dim → 2D)
pca_start = time.perf_counter()
pca = PCA(n_components=2)
coords_2d = pca.fit_transform(vectors)
pca_duration = time.perf_counter() - pca_start
# After fit, these attributes are guaranteed to be set
assert pca.explained_variance_ratio_ is not None
@@ -574,6 +556,18 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
for r in search_results
]
# Calculate total request duration
total_duration = time.perf_counter() - request_start
# Log comprehensive timing metrics
logger.info(
f"Viz search timing: total={total_duration * 1000:.1f}ms, "
f"search={search_duration * 1000:.1f}ms ({search_duration / total_duration * 100:.1f}%), "
f"vector_fetch={vector_fetch_duration * 1000:.1f}ms ({vector_fetch_duration / total_duration * 100:.1f}%), "
f"pca={pca_duration * 1000:.1f}ms ({pca_duration / total_duration * 100:.1f}%), "
f"results={len(search_results)}, vectors={len(vectors)}"
)
return JSONResponse(
{
"success": True,
@@ -583,6 +577,14 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
"pc1": float(pca.explained_variance_ratio_[0]),
"pc2": float(pca.explained_variance_ratio_[1]),
},
"timing": {
"total_ms": round(total_duration * 1000, 2),
"search_ms": round(search_duration * 1000, 2),
"vector_fetch_ms": round(vector_fetch_duration * 1000, 2),
"pca_ms": round(pca_duration * 1000, 2),
"num_results": len(search_results),
"num_vectors": len(vectors),
},
}
)
+2 -1
View File
@@ -5,6 +5,7 @@ import time
from abc import ABC
from functools import wraps
import anyio
from httpx import AsyncClient, HTTPStatusError, RequestError, codes
from nextcloud_mcp_server.observability.metrics import (
@@ -47,7 +48,7 @@ def retry_on_429(func):
# Record retry metric (extract app name from args if available)
if len(args) > 0 and hasattr(args[0], "app_name"):
record_nextcloud_api_retry(app=args[0].app_name, reason="429")
time.sleep(5)
await anyio.sleep(5)
elif e.response.status_code == 404:
# 404 errors are often expected (e.g., checking if attachments exist)
# Log as debug instead of warning
+1 -1
View File
@@ -40,7 +40,7 @@ class NotesClient(BaseNextcloudClient):
seen_ids: set[int] = set()
while True:
params: Dict[str, Any] = {"chunkSize": 10}
params: Dict[str, Any] = {"chunkSize": 100}
if cursor:
params["chunkCursor"] = cursor
if prune_before is not None:
@@ -39,7 +39,12 @@ class HealthCheckFilter(logging.Filter):
message = record.getMessage()
return not any(
endpoint in message
for endpoint in ["/health/live", "/health/ready", "/metrics"]
for endpoint in [
"/health/live",
"/health/ready",
"/metrics",
"/app/vector-sync/status",
]
)
+37 -14
View File
@@ -404,10 +404,11 @@ def update_vector_sync_queue_size(size: int) -> None:
def instrument_tool(func):
"""
Decorator to automatically instrument MCP tool functions with metrics.
Decorator to automatically instrument MCP tool functions with metrics and tracing.
Wraps async tool functions to record execution time and success/error status.
Compatible with @mcp.tool() and @require_scopes() decorators.
Wraps async tool functions to record execution time, success/error status, and
create OpenTelemetry trace spans. Compatible with @mcp.tool() and @require_scopes()
decorators.
Usage:
@mcp.tool()
@@ -420,24 +421,46 @@ def instrument_tool(func):
func: The async function to instrument
Returns:
Wrapped function with metrics instrumentation
Wrapped function with metrics and tracing instrumentation
"""
import functools
import time
from nextcloud_mcp_server.observability.tracing import trace_operation
@functools.wraps(func)
async def wrapper(*args, **kwargs):
tool_name = func.__name__
start_time = time.time()
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
record_tool_call(tool_name, duration, "success")
return result
except Exception as e:
duration = time.time() - start_time
record_tool_call(tool_name, duration, "error")
record_tool_error(tool_name, type(e).__name__)
raise
# Extract tool arguments for tracing (sanitize sensitive fields)
# kwargs contains the actual arguments passed to the tool
tool_args = {
k: v
for k, v in kwargs.items()
if k not in ("password", "token", "secret", "api_key", "etag", "ctx")
}
# Create trace span with metrics collection
with trace_operation(
f"mcp.tool.{tool_name}",
attributes={
"mcp.tool.name": tool_name,
"mcp.tool.args": str(tool_args)[:500]
if tool_args
else None, # Limit to 500 chars
},
record_exception=True,
):
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
record_tool_call(tool_name, duration, "success")
return result
except Exception as e:
duration = time.time() - start_time
record_tool_call(tool_name, duration, "error")
record_tool_error(tool_name, type(e).__name__)
raise
return wrapper
-122
View File
@@ -1,122 +0,0 @@
"""Access verification for search results.
This module provides centralized verification of Nextcloud access permissions
for search results. Verification happens at the final output stage (MCP tool/viz endpoint)
rather than within individual search algorithms, preventing redundant API calls.
Key benefits:
- Deduplication: Each document verified exactly once (even in hybrid mode)
- Parallel execution: All verifications run concurrently via anyio task groups
- Separation of concerns: Algorithms handle scoring, this module handles security
"""
import logging
from dataclasses import replace
from typing import Protocol
import anyio
from nextcloud_mcp_server.search.algorithms import SearchResult
logger = logging.getLogger(__name__)
class NextcloudClientProtocol(Protocol):
"""Protocol for Nextcloud client with app-specific access."""
@property
def notes(self):
"""Notes client for accessing notes API."""
...
async def verify_search_results(
results: list[SearchResult],
nextcloud_client: NextcloudClientProtocol,
) -> list[SearchResult]:
"""
Verify Nextcloud access for search results.
Deduplicates by (doc_id, doc_type), verifies in parallel using anyio task groups,
and filters out inaccessible documents. Maintains original result ordering.
Args:
results: Unverified search results from Qdrant
nextcloud_client: Nextcloud client for access checks
Returns:
Verified and accessible results (same order as input)
Example:
>>> unverified = await search_algo.search(query="test", limit=10)
>>> verified = await verify_search_results(unverified, client)
>>> # verified contains only documents user can access
"""
# Deduplicate by (doc_id, doc_type) while preserving order
# This is critical for hybrid search where same doc may appear in multiple algorithm results
seen = set()
unique_results = []
for result in results:
key = (result.id, result.doc_type)
if key not in seen:
seen.add(key)
unique_results.append(result)
if not unique_results:
return []
logger.debug(
f"Verifying access for {len(unique_results)} unique documents "
f"(from {len(results)} total results)"
)
# Verify all unique documents in parallel using anyio task group
# Use list to maintain order (index-based storage)
verified_results = [None] * len(unique_results)
async def verify_one(index: int, result: SearchResult):
"""
Verify a single document and store result at index.
Args:
index: Position in verified_results list
result: Search result to verify
"""
try:
if result.doc_type == "note":
# Fetch note to verify access and get fresh metadata
note = await nextcloud_client.notes.get_note(result.id)
# Update metadata with fresh data from Nextcloud
updated_metadata = {**(result.metadata or {}), **note}
verified_results[index] = replace(result, metadata=updated_metadata)
# TODO: Add verification for other doc types (calendar, deck, file, etc.)
else:
# For now, assume other types are accessible
# In production, add proper verification for each type
logger.debug(
f"No verification implemented for doc_type={result.doc_type}, "
"assuming accessible"
)
verified_results[index] = result
except Exception as e:
# Document is inaccessible (403, 404, or other error)
# Log at debug level since this is expected for filtered results
logger.debug(f"Document {result.doc_type}/{result.id} not accessible: {e}")
verified_results[index] = None
# Run all verifications in parallel using anyio task group
# This provides structured concurrency with automatic cancellation on errors
async with anyio.create_task_group() as tg:
for idx, result in enumerate(unique_results):
tg.start_soon(verify_one, idx, result)
# Filter out None (inaccessible) and return verified results
accessible = [r for r in verified_results if r is not None]
logger.debug(
f"Verification complete: {len(accessible)} accessible, "
f"{len(unique_results) - len(accessible)} filtered out"
)
return accessible
+59 -32
View File
@@ -2,6 +2,7 @@
import logging
import anyio
from httpx import RequestError
from mcp.server.fastmcp import Context, FastMCP
from mcp.shared.exceptions import McpError
@@ -121,11 +122,18 @@ def configure_semantic_tools(mcp: FastMCP):
# Sort combined results by score
all_results.sort(key=lambda r: r.score, reverse=True)
# Verify access for all results (deduplicates and filters)
from nextcloud_mcp_server.search.verification import verify_search_results
# Deduplicate results (hybrid search may return same doc from dense + sparse)
# Qdrant already filters by user_id for multi-tenant isolation
# Sampling tool will verify access when fetching full content
seen = set()
unique_results = []
for result in all_results:
key = (result.id, result.doc_type)
if key not in seen:
seen.add(key)
unique_results.append(result)
verified_results = await verify_search_results(all_results, client)
search_results = verified_results[:limit] # Final limit after verification
search_results = unique_results[:limit] # Final limit after deduplication
# Convert SearchResult objects to SemanticSearchResult for response
results = []
@@ -302,35 +310,55 @@ def configure_semantic_tools(mcp: FastMCP):
success=True,
)
# 4. Fetch full content for notes to provide complete context to LLM
# Filter out inaccessible notes (deleted or permissions changed)
# 4. Fetch full content for notes in parallel (also verifies access)
# Use anyio task group for concurrent fetching with semaphore to prevent
# connection pool exhaustion
client = await get_client(ctx)
accessible_results = []
full_contents = [] # Full content for accessible notes
accessible_results = [None] * len(search_response.results)
full_contents = [None] * len(search_response.results)
for result in search_response.results:
if result.doc_type == "note":
try:
note = await client.notes.get_note(result.id)
# Note is accessible, store full content
accessible_results.append(result)
full_contents.append(note.get("content", ""))
logger.debug(
f"Fetched full content for note {result.id} "
f"(length: {len(full_contents[-1])} chars)"
)
except Exception as e:
# Note might have been deleted or permissions changed
# Filter it out to avoid corrupting LLM with inaccessible data
logger.warning(
f"Failed to fetch full content for note {result.id}: {e}. "
f"Excluding from results."
)
else:
# Non-note document types (future: calendar, deck, files)
# For now, keep them with excerpts
accessible_results.append(result)
full_contents.append(None)
# Limit concurrent requests to prevent connection pool exhaustion
max_concurrent = 20
semaphore = anyio.Semaphore(max_concurrent)
async def fetch_content(index: int, result: SemanticSearchResult):
"""Fetch full content for a single document (parallel with semaphore)."""
async with semaphore:
if result.doc_type == "note":
try:
note = await client.notes.get_note(result.id)
# Note is accessible, store result and full content
content = note.get("content", "")
accessible_results[index] = result
full_contents[index] = content
logger.debug(
f"Fetched full content for note {result.id} "
f"(length: {len(content)} chars)"
)
except Exception as e:
# Note might have been deleted or permissions changed
# Leave as None to filter out later
logger.debug(
f"Note {result.id} not accessible: {e}. "
f"Excluding from results."
)
else:
# Non-note document types (future: calendar, deck, files)
# For now, keep them with excerpts
accessible_results[index] = result
# full_contents[index] remains None (will use excerpt)
# Run all fetches in parallel using anyio task group
async with anyio.create_task_group() as tg:
for idx, result in enumerate(search_response.results):
tg.start_soon(fetch_content, idx, result)
# Filter out None (inaccessible notes) while preserving order
final_pairs = [
(r, c) for r, c in zip(accessible_results, full_contents) if r is not None
]
accessible_results = [r for r, c in final_pairs]
full_contents = [c for r, c in final_pairs]
# Check if we filtered out all results
if not accessible_results:
@@ -382,7 +410,6 @@ def configure_semantic_tools(mcp: FastMCP):
)
# 6. Request LLM completion via MCP sampling with timeout
import anyio
try:
with anyio.fail_after(30):
+7
View File
@@ -8,6 +8,7 @@ import time
import uuid
import anyio
from anyio.abc import TaskStatus
from anyio.streams.memory import MemoryObjectReceiveStream
from httpx import HTTPStatusError
from qdrant_client.models import FieldCondition, Filter, MatchValue, PointStruct
@@ -34,6 +35,8 @@ async def processor_task(
shutdown_event: anyio.Event,
nc_client: NextcloudClient,
user_id: str,
*,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
):
"""
Process documents from stream concurrently.
@@ -53,9 +56,13 @@ async def processor_task(
shutdown_event: Event signaling shutdown
nc_client: Authenticated Nextcloud client
user_id: User being processed
task_status: Status object for signaling task readiness
"""
logger.info(f"Processor {worker_id} started")
# Signal that the task has started and is ready
task_status.started()
while not shutdown_event.is_set():
try:
# Get document with timeout (allows checking shutdown)
+68 -59
View File
@@ -8,6 +8,7 @@ import time
from dataclasses import dataclass
import anyio
from anyio.abc import TaskStatus
from anyio.streams.memory import MemoryObjectSendStream
from qdrant_client.models import FieldCondition, Filter, MatchValue
@@ -93,6 +94,8 @@ async def scanner_task(
wake_event: anyio.Event,
nc_client: NextcloudClient,
user_id: str,
*,
task_status: TaskStatus = anyio.TASK_STATUS_IGNORED,
):
"""
Periodic scanner that detects changed documents for enabled user.
@@ -105,10 +108,14 @@ async def scanner_task(
wake_event: Event to trigger immediate scan
nc_client: Authenticated Nextcloud client
user_id: User to scan
task_status: Status object for signaling task readiness
"""
logger.info(f"Scanner task started for user: {user_id}")
settings = get_settings()
# Signal that the task has started and is ready
task_status.started()
async with send_stream:
while not shutdown_event.is_set():
try:
@@ -175,73 +182,43 @@ async def scan_user_documents(
f"[SCAN-{scan_id}] Using pruneBefore={prune_before} to optimize data transfer"
)
# Fetch all notes from Nextcloud
notes = [
note
async for note in nc_client.notes.get_all_notes(prune_before=prune_before)
]
logger.info(f"[SCAN-{scan_id}] Found {len(notes)} notes for {user_id}")
# Get indexed state from Qdrant first (for incremental sync)
indexed_docs = {}
if not initial_sync:
qdrant_client = await get_qdrant_client()
scroll_result = await qdrant_client.scroll(
collection_name=get_settings().get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_type", match=MatchValue(value="note")),
]
),
with_payload=["doc_id", "indexed_at"],
with_vectors=False,
limit=10000,
)
# Record documents scanned
record_vector_sync_scan(len(notes))
indexed_docs = {
point.payload["doc_id"]: point.payload["indexed_at"]
for point in scroll_result[0]
}
if initial_sync:
# Send everything on first sync
for note in notes:
modified_at = note.get("modified", 0)
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=str(note["id"]),
doc_type="note",
operation="index",
modified_at=modified_at,
)
)
logger.info(f"Sent {len(notes)} documents for initial sync: {user_id}")
return
logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")
# Get indexed state from Qdrant
qdrant_client = await get_qdrant_client()
scroll_result = await qdrant_client.scroll(
collection_name=get_settings().get_collection_name(),
scroll_filter=Filter(
must=[
FieldCondition(key="user_id", match=MatchValue(value=user_id)),
FieldCondition(key="doc_type", match=MatchValue(value="note")),
]
),
with_payload=["doc_id", "indexed_at"],
with_vectors=False,
limit=10000,
)
indexed_docs = {
point.payload["doc_id"]: point.payload["indexed_at"]
for point in scroll_result[0]
}
logger.debug(f"Found {len(indexed_docs)} indexed documents in Qdrant")
# Compare and queue changes
# Stream notes from Nextcloud and process immediately
note_count = 0
queued = 0
nextcloud_doc_ids = {str(note["id"]) for note in notes}
nextcloud_doc_ids = set()
for note in notes:
async for note in nc_client.notes.get_all_notes(prune_before=prune_before):
note_count += 1
doc_id = str(note["id"])
indexed_at = indexed_docs.get(doc_id)
nextcloud_doc_ids.add(doc_id)
modified_at = note.get("modified", 0)
# If document reappeared, remove from potentially_deleted
doc_key = (user_id, doc_id)
if doc_key in _potentially_deleted:
logger.debug(
f"Document {doc_id} reappeared, removing from deletion grace period"
)
del _potentially_deleted[doc_key]
# Send if never indexed or modified since last index
if indexed_at is None or modified_at > indexed_at:
if initial_sync:
# Send everything on first sync
await send_stream.send(
DocumentTask(
user_id=user_id,
@@ -252,6 +229,38 @@ async def scan_user_documents(
)
)
queued += 1
else:
# Incremental sync: compare with indexed state
indexed_at = indexed_docs.get(doc_id)
# If document reappeared, remove from potentially_deleted
doc_key = (user_id, doc_id)
if doc_key in _potentially_deleted:
logger.debug(
f"Document {doc_id} reappeared, removing from deletion grace period"
)
del _potentially_deleted[doc_key]
# Send if never indexed or modified since last index
if indexed_at is None or modified_at > indexed_at:
await send_stream.send(
DocumentTask(
user_id=user_id,
doc_id=doc_id,
doc_type="note",
operation="index",
modified_at=modified_at,
)
)
queued += 1
# Log and record metrics after streaming
logger.info(f"[SCAN-{scan_id}] Found {note_count} notes for {user_id}")
record_vector_sync_scan(note_count)
if initial_sync:
logger.info(f"Sent {queued} documents for initial sync: {user_id}")
return
# Check for deleted documents (in Qdrant but not in Nextcloud)
# Use grace period: only delete after 2 consecutive scans confirm absence
+3 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.36.0"
version = "0.38.0"
description = "Model Context Protocol (MCP) server for Nextcloud integration - enables AI assistants to interact with Nextcloud data"
authors = [
{name = "Chris Coutinho", email = "chris@coutinho.io"}
@@ -103,7 +103,9 @@ module-root = ""
[dependency-groups]
dev = [
"anthropic>=0.42.0", # For RAG evaluation with Anthropic LLMs
"commitizen>=4.8.2",
"datasets>=3.3.0", # For BeIR nfcorpus dataset loading
"ipython>=9.2.0",
"playwright>=1.49.1",
"pytest>=8.3.5",
+9 -2
View File
@@ -255,8 +255,15 @@ async def nc_mcp_client(anyio_backend) -> AsyncGenerator[ClientSession, Any]:
Note: SSE transport is being deprecated. This fixture uses SSE for compatibility testing.
"""
async for session in create_mcp_client_session_sse(
url="http://localhost:8000/sse", client_name="Basic MCP (SSE)"
# async for session in create_mcp_client_session_sse(
# url="http://localhost:8000/sse", client_name="Basic MCP (SSE)"
# ):
# yield session
async for session in create_mcp_client_session(
url="http://localhost:8000/mcp",
client_name="Basic MCP (HTTP)",
):
yield session
+278
View File
@@ -0,0 +1,278 @@
# RAG Evaluation Tests
This directory contains tests for evaluating the Retrieval-Augmented Generation (RAG) system in the Nextcloud MCP server, specifically the `nc_semantic_search_answer` tool.
## Architecture
The RAG system has two components that are tested independently:
1. **Retrieval** - Vector sync/embedding pipeline (indexed Nextcloud documents → vector database)
2. **Generation** - MCP client LLM synthesis (retrieved context → natural language answer)
See [ADR-013](../../docs/ADR-013-rag-evaluation.md) for full architectural details.
## Test Structure
```
tests/rag_evaluation/
├── README.md # This file
├── conftest.py # Pytest fixtures
├── llm_providers.py # LLM provider abstraction (Ollama/Anthropic)
├── fixtures/
│ └── ground_truth.json # Pre-generated reference answers
├── test_retrieval_quality.py # Retrieval evaluation (Context Recall)
└── test_generation_quality.py # Generation evaluation (Answer Correctness)
```
## Metrics
### Retrieval Evaluation
- **Metric**: Context Recall
- **Method**: Heuristic - Check if ground-truth document IDs appear in top-k results
- **Target**: ≥80% recall
### Generation Evaluation
- **Metric**: Answer Correctness
- **Method**: LLM-as-judge - Compare RAG answer vs ground truth (binary true/false)
- **Evaluation**: External LLM evaluates semantic equivalence
## Dataset
**BeIR/nfcorpus** - Medical/biomedical corpus with ~3,600 documents
**Test Queries** (5 selected):
1. PLAIN-2630: "Alkylphenol Endocrine Disruptors and Allergies" (21 relevant docs)
2. PLAIN-2660: "How Long to Detox From Fish Before Pregnancy?" (20 relevant docs)
3. PLAIN-2510: "Coffee and Artery Function" (16 relevant docs)
4. PLAIN-2430: "Preventing Brain Loss with B Vitamins?" (15 relevant docs)
5. PLAIN-2690: "Chronic Headaches and Pork Tapeworms" (14 relevant docs)
## Setup
### 1. Install Dependencies
```bash
uv sync --group dev
```
This installs:
- `anthropic>=0.42.0` - For Anthropic LLM evaluation
- `click>=8.1.8` - For CLI interface
- `datasets>=3.3.0` - For BeIR nfcorpus dataset loading
### 2. Configure LLM Provider
Set environment variables for your LLM provider:
**Option A: Ollama (default, local/remote)**
```bash
export RAG_EVAL_PROVIDER=ollama
export OLLAMA_HOST=https://ollama.example.com # or RAG_EVAL_OLLAMA_BASE_URL
export RAG_EVAL_OLLAMA_MODEL=llama3.2:1b
```
**Option B: Anthropic (cloud)**
```bash
export RAG_EVAL_PROVIDER=anthropic
export RAG_EVAL_ANTHROPIC_API_KEY=sk-ant-...
export RAG_EVAL_ANTHROPIC_MODEL=claude-3-5-sonnet-20241022
```
### 3. One-Time Setup: Generate Ground Truth
Generate synthetic reference answers for the 5 test queries:
```bash
uv run python tools/rag_eval_cli.py generate
```
**What this does:**
- Downloads nfcorpus dataset to `tests/rag_evaluation/fixtures/nfcorpus/` (cached locally)
- For each of the 5 selected queries, extracts highly relevant documents
- Uses configured LLM to synthesize a reference answer
- Saves to `tests/rag_evaluation/fixtures/ground_truth.json`
**Optional flags:**
- `--provider ollama|anthropic` - Override LLM provider
- `--model MODEL_NAME` - Override model name
- `--force-download` - Re-download nfcorpus dataset
### 4. One-Time Setup: Upload Corpus to Nextcloud
Upload all 3,633 nfcorpus documents as Nextcloud notes:
```bash
uv run python tools/rag_eval_cli.py upload \
--nextcloud-url http://localhost:8000 \
--username admin \
--password admin
```
**What this does:**
- Downloads nfcorpus dataset (if not already cached)
- Uploads all documents as notes in Nextcloud
- Saves document ID → note ID mapping to `tests/rag_evaluation/fixtures/note_mapping.json`
**Optional flags:**
- `--category CATEGORY` - Custom category for notes (default: `nfcorpus_rag_eval`)
- `--force-download` - Re-download nfcorpus dataset
- `--force` - Delete all existing notes in the target category before uploading (efficient corpus refresh)
**Important:** This step requires:
- A running Nextcloud instance with vector sync enabled
- Notes app installed
- Valid credentials
**Duration:** ~10-15 minutes to upload 3,633 documents
## Running Tests
### Run All RAG Evaluation Tests
```bash
uv run pytest tests/rag_evaluation/ -v
```
### Run Specific Test Suites
**Retrieval Quality Only:**
```bash
uv run pytest tests/rag_evaluation/test_retrieval_quality.py -v
```
**Generation Quality Only:**
```bash
uv run pytest tests/rag_evaluation/test_generation_quality.py -v
```
### Run Individual Tests
```bash
uv run pytest tests/rag_evaluation/test_retrieval_quality.py::test_retrieval_context_recall -v
uv run pytest tests/rag_evaluation/test_generation_quality.py::test_answer_correctness -v
```
## Test Execution Flow
**Prerequisites** (one-time setup):
1. Generated ground truth (`tools/rag_eval_cli.py generate`)
2. Uploaded corpus to Nextcloud (`tools/rag_eval_cli.py upload`)
### Retrieval Quality Tests
1. **Setup** (`nfcorpus_test_data` fixture):
- Loads pre-generated ground truth from `fixtures/ground_truth.json`
- Loads note mapping from `fixtures/note_mapping.json`
- Returns test cases with expected note IDs
2. **Test** (`test_retrieval_context_recall`):
- For each query: Perform semantic search (top-10)
- Extract retrieved note IDs
- Calculate Context Recall = (expected ∩ retrieved) / expected
- Assert recall ≥ 80%
3. **Cleanup**:
- None required (notes persist in Nextcloud for reuse)
### Generation Quality Tests
1. **Setup**:
- Same as retrieval tests (reuses `nfcorpus_test_data` fixture)
- Creates evaluation LLM provider
2. **Test** (`test_answer_correctness`):
- For each query: Call `nc_semantic_search_answer` MCP tool
- Extract generated answer
- Use LLM-as-judge to compare vs ground truth
- Assert semantic equivalence (TRUE/FALSE)
3. **Cleanup**:
- LLM provider closed
## Expected Test Duration
**One-time setup:**
- **Generate ground truth**: ~5-10 minutes (5 queries with LLM generation)
- **Upload corpus**: ~10-15 minutes (3,633 documents)
- **Total setup**: ~15-25 minutes
**Test execution** (after setup):
- **Retrieval tests**: ~1-2 minutes (5 queries, no upload/cleanup)
- **Generation tests**: ~5-10 minutes (RAG generation + LLM evaluation)
- **Total per run**: ~6-12 minutes
**Note**: These are NOT smoke tests and are NOT run in CI.
## Limitations & Future Work
**Current Limitations:**
- Only 5 test queries (limited statistical confidence)
- Medical domain bias (may not represent production use cases)
- Synthetic ground truth (LLM-generated, not human-validated)
- Manual test execution (requires external LLM access)
**Future Enhancements:**
- Expand to 50-100 queries for statistical significance
- Add custom test dataset with production-representative documents
- Implement additional metrics (faithfulness, context relevance, answer relevance)
- Create automated benchmarking dashboard
- Test multi-hop reasoning (synthesis questions)
- Evaluate out-of-scope handling ("I don't know" responses)
## Troubleshooting
### Tests Fail with "Ground truth file not found"
Run the generate command first:
```bash
uv run python tools/rag_eval_cli.py generate
```
### Tests Fail with "Note mapping file not found"
Run the upload command first:
```bash
uv run python tools/rag_eval_cli.py upload --nextcloud-url http://localhost:8000 --username admin --password admin
```
### Tests Fail with "MCP sampling client not yet implemented"
The `mcp_sampling_client` fixture is a placeholder. You need to implement MCP client creation with sampling support. See the TODO in `conftest.py`.
### Upload Command Fails
Common issues:
1. **Nextcloud not running**: Ensure Nextcloud is accessible at the URL
2. **Invalid credentials**: Verify username/password
3. **Notes app not installed**: Install Notes app in Nextcloud
4. **Network timeout**: Increase timeout in CLI (currently 60s)
### LLM Timeout
If ground truth generation times out:
1. Increase timeout in `llm_providers.py` (currently 10 min)
2. Use a faster model: `--model llama3.2:1b`
3. Check Ollama/Anthropic service availability
### Dataset Download Fails
The nfcorpus dataset is downloaded automatically. If download fails:
1. Check internet connection
2. Manually download from: https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/nfcorpus.zip
3. Extract to `tests/rag_evaluation/fixtures/nfcorpus/`
4. Or use HuggingFace datasets cache: `~/.cache/huggingface/datasets/BeIR___nfcorpus/`
### Vector Sync Not Indexing Documents
After uploading, vector sync must index the documents:
1. Check vector sync is enabled in Nextcloud
2. Trigger manual sync if needed
3. Wait for background job to process all documents
4. Verify in Qdrant that vectors exist for uploaded notes
## References
- [ADR-013: RAG Evaluation Testing Framework](../../docs/ADR-013-rag-evaluation.md)
- [ADR-008: MCP Sampling for Semantic Search](../../docs/ADR-008-mcp-sampling-for-semantic-search.md)
- [BeIR Benchmark](https://github.com/beir-cellar/beir)
- [NFCorpus Dataset](https://www.cl.uni-heidelberg.de/statnlpgroup/nfcorpus/)
+1
View File
@@ -0,0 +1 @@
"""RAG evaluation tests for the Nextcloud MCP semantic search system."""
+145
View File
@@ -0,0 +1,145 @@
"""Pytest fixtures for RAG evaluation tests.
IMPORTANT: Before running these tests, you must:
1. Generate ground truth: uv run python tools/rag_eval_cli.py generate
2. Upload corpus: uv run python tools/rag_eval_cli.py upload --nextcloud-url http://localhost:8000 --username admin --password admin
This ensures that the ground truth and note mappings are available.
"""
import json
from pathlib import Path
from typing import Any
import pytest
from tests.rag_evaluation.llm_providers import create_llm_provider
# Paths
FIXTURES_DIR = Path(__file__).parent / "fixtures"
GROUND_TRUTH_FILE = FIXTURES_DIR / "ground_truth.json"
NOTE_MAPPING_FILE = FIXTURES_DIR / "note_mapping.json"
@pytest.fixture(scope="session")
def ground_truth_data() -> list[dict[str, Any]]:
"""Load pre-generated ground truth data.
Returns:
List of test cases with query, ground truth answer, and expected doc IDs
Raises:
FileNotFoundError: If ground_truth.json doesn't exist
"""
if not GROUND_TRUTH_FILE.exists():
raise FileNotFoundError(
f"Ground truth file not found: {GROUND_TRUTH_FILE}\n"
"Run: uv run python tools/rag_eval_cli.py generate"
)
with open(GROUND_TRUTH_FILE) as f:
return json.load(f)
@pytest.fixture(scope="session")
def note_mapping() -> dict[str, int]:
"""Load document ID → note ID mapping.
Returns:
Dict mapping nfcorpus document ID to Nextcloud note ID
Raises:
FileNotFoundError: If note_mapping.json doesn't exist
"""
if not NOTE_MAPPING_FILE.exists():
raise FileNotFoundError(
f"Note mapping file not found: {NOTE_MAPPING_FILE}\n"
"Run: uv run python tools/rag_eval_cli.py upload --nextcloud-url ... --username ... --password ..."
)
with open(NOTE_MAPPING_FILE) as f:
return json.load(f)
@pytest.fixture(scope="session")
def nfcorpus_test_data(
ground_truth_data: list[dict[str, Any]],
note_mapping: dict[str, int],
):
"""Prepare nfcorpus test data for evaluation.
This fixture combines ground truth answers with note mappings to create
test cases ready for retrieval and generation quality tests.
Args:
ground_truth_data: Pre-generated ground truth answers
note_mapping: Document ID note ID mapping
Returns:
List of test cases with query, ground truth, expected doc IDs, and note IDs
"""
test_cases = []
for gt in ground_truth_data:
# Map expected document IDs to note IDs
expected_note_ids = [
note_mapping.get(doc_id)
for doc_id in gt["expected_document_ids"]
if doc_id in note_mapping
]
# Filter out None values (docs that weren't uploaded)
expected_note_ids = [nid for nid in expected_note_ids if nid is not None]
test_cases.append(
{
"query_id": gt["query_id"],
"query_text": gt["query_text"],
"ground_truth_answer": gt["ground_truth_answer"],
"expected_document_ids": gt["expected_document_ids"],
"expected_note_ids": expected_note_ids,
"highly_relevant_count": gt["highly_relevant_count"],
}
)
return test_cases
@pytest.fixture(scope="session")
async def evaluation_llm():
"""Create LLM provider for evaluation (separate from MCP client).
Environment variables:
RAG_EVAL_PROVIDER: Provider type (ollama or anthropic)
RAG_EVAL_OLLAMA_BASE_URL: Ollama base URL (or OLLAMA_HOST)
RAG_EVAL_OLLAMA_MODEL: Ollama model name
RAG_EVAL_ANTHROPIC_API_KEY: Anthropic API key
RAG_EVAL_ANTHROPIC_MODEL: Anthropic model name
Returns:
LLM provider instance (OllamaProvider or AnthropicProvider)
"""
llm = create_llm_provider()
yield llm
await llm.close()
@pytest.fixture(scope="session")
async def mcp_sampling_client():
"""Create MCP client that supports sampling for RAG generation.
This fixture creates an MCP client configured to support sampling,
which is required for testing the nc_semantic_search_answer tool.
TODO: Implement MCP client with sampling support
For now, this is a placeholder.
Returns:
MCP client instance with sampling enabled
"""
# TODO: Implement MCP client creation with sampling support
# This will require:
# 1. Creating an MCP client configured for sampling
# 2. Authenticating with Nextcloud
# 3. Ensuring sampling is enabled
pytest.skip("MCP sampling client not yet implemented")
+149
View File
@@ -0,0 +1,149 @@
"""LLM provider abstraction for RAG evaluation.
Supports Ollama (local) and Anthropic (cloud) providers for both ground truth
generation and evaluation.
"""
import os
from typing import Protocol
import httpx
from anthropic import AsyncAnthropic
class LLMProvider(Protocol):
"""Protocol for LLM providers."""
async def generate(self, prompt: str, max_tokens: int = 500) -> str:
"""Generate text from a prompt.
Args:
prompt: The prompt to generate from
max_tokens: Maximum tokens to generate
Returns:
Generated text
"""
...
async def close(self) -> None:
"""Close the provider and release resources."""
...
class OllamaProvider:
"""Ollama provider for local LLM inference."""
def __init__(self, base_url: str, model: str):
"""Initialize Ollama provider.
Args:
base_url: Ollama API base URL (e.g., http://localhost:11434)
model: Model name (e.g., llama3.1:8b)
"""
self.base_url = base_url.rstrip("/")
self.model = model
self.client = httpx.AsyncClient(timeout=600.0) # 10 min timeout for generation
async def generate(self, prompt: str, max_tokens: int = 500) -> str:
"""Generate text using Ollama API."""
response = await self.client.post(
f"{self.base_url}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"stream": False,
"options": {
"num_predict": max_tokens,
"temperature": 0.7,
},
},
)
response.raise_for_status()
data = response.json()
return data["response"]
async def close(self):
"""Close the HTTP client."""
await self.client.aclose()
class AnthropicProvider:
"""Anthropic provider for cloud LLM inference."""
def __init__(self, api_key: str, model: str):
"""Initialize Anthropic provider.
Args:
api_key: Anthropic API key
model: Model name (e.g., claude-3-5-sonnet-20241022)
"""
self.client = AsyncAnthropic(api_key=api_key)
self.model = model
async def generate(self, prompt: str, max_tokens: int = 500) -> str:
"""Generate text using Anthropic API."""
message = await self.client.messages.create(
model=self.model,
max_tokens=max_tokens,
temperature=0.7,
messages=[{"role": "user", "content": prompt}],
)
return message.content[0].text
async def close(self):
"""Close the client (no-op for Anthropic)."""
pass
def create_llm_provider(
provider: str | None = None,
ollama_base_url: str | None = None,
ollama_model: str | None = None,
anthropic_api_key: str | None = None,
anthropic_model: str | None = None,
) -> LLMProvider:
"""Create an LLM provider from environment variables or arguments.
Args:
provider: Provider type ('ollama' or 'anthropic'). Defaults to RAG_EVAL_PROVIDER env var or 'ollama'
ollama_base_url: Ollama base URL. Defaults to RAG_EVAL_OLLAMA_BASE_URL or 'http://localhost:11434'
ollama_model: Ollama model. Defaults to RAG_EVAL_OLLAMA_MODEL or 'llama3.1:8b'
anthropic_api_key: Anthropic API key. Defaults to RAG_EVAL_ANTHROPIC_API_KEY env var
anthropic_model: Anthropic model. Defaults to RAG_EVAL_ANTHROPIC_MODEL or 'claude-3-5-sonnet-20241022'
Returns:
LLMProvider instance
Raises:
ValueError: If provider is invalid or required credentials are missing
"""
# Get provider from args or env
provider = provider or os.environ.get("RAG_EVAL_PROVIDER", "ollama")
if provider == "ollama":
# Try RAG_EVAL_OLLAMA_BASE_URL, then OLLAMA_HOST, then default
base_url = (
ollama_base_url
or os.environ.get("RAG_EVAL_OLLAMA_BASE_URL")
or os.environ.get("OLLAMA_HOST")
or "http://localhost:11434"
)
model = ollama_model or os.environ.get("RAG_EVAL_OLLAMA_MODEL", "llama3.2:1b")
return OllamaProvider(base_url=base_url, model=model)
elif provider == "anthropic":
api_key = anthropic_api_key or os.environ.get("RAG_EVAL_ANTHROPIC_API_KEY")
if not api_key:
raise ValueError(
"Anthropic API key required. Set RAG_EVAL_ANTHROPIC_API_KEY environment variable."
)
model = anthropic_model or os.environ.get(
"RAG_EVAL_ANTHROPIC_MODEL", "claude-3-5-sonnet-20241022"
)
return AnthropicProvider(api_key=api_key, model=model)
else:
raise ValueError(
f"Invalid provider: {provider}. Must be 'ollama' or 'anthropic'."
)
@@ -0,0 +1,139 @@
"""Tests for RAG generation quality (Answer Correctness metric).
These tests evaluate whether the MCP client LLM generates factually correct
answers from retrieved context using the nc_semantic_search_answer tool.
Metric: Answer Correctness
- Measures: Is the generated answer factually correct?
- Method: LLM-as-judge - Compare RAG answer vs ground truth (binary true/false)
- Evaluation: External LLM evaluates semantic equivalence
"""
import pytest
@pytest.mark.integration
async def test_answer_correctness(
mcp_sampling_client,
evaluation_llm,
nfcorpus_test_data,
):
"""Test that RAG system generates factually correct answers.
For each test query:
1. Execute full RAG pipeline via nc_semantic_search_answer MCP tool
2. Extract generated answer from RAG response
3. Use LLM-as-judge to compare against ground truth (binary true/false)
4. Assert answer is semantically equivalent to ground truth
This tests the quality of the generation component (MCP client LLM).
"""
results_summary = []
for test_case in nfcorpus_test_data:
query = test_case["query_text"]
ground_truth = test_case["ground_truth_answer"]
print(f"\n{'=' * 80}")
print(f"Query: {query}")
# Execute full RAG pipeline
print("Executing RAG pipeline...")
rag_result = await mcp_sampling_client.call_tool(
"nc_semantic_search_answer",
arguments={"query": query, "limit": 5},
)
rag_answer = rag_result["generated_answer"]
print(f"RAG Answer preview: {rag_answer[:200]}...")
print(f"Ground Truth preview: {ground_truth[:200]}...")
# LLM-as-judge evaluation
evaluation_prompt = f"""Compare these two answers and respond with only TRUE or FALSE.
Question: {query}
Generated Answer: {rag_answer}
Ground Truth Answer: {ground_truth}
Are these answers semantically equivalent (do they convey the same factual information)?
Respond with only: TRUE or FALSE"""
print("Evaluating answer correctness...")
evaluation_result = await evaluation_llm.generate(
evaluation_prompt,
max_tokens=10,
)
is_correct = evaluation_result.strip().upper() == "TRUE"
result = {
"query_id": test_case["query_id"],
"query": query,
"rag_answer_length": len(rag_answer),
"ground_truth_length": len(ground_truth),
"is_correct": is_correct,
"evaluation_result": evaluation_result.strip(),
}
results_summary.append(result)
print(f" Evaluation: {evaluation_result.strip()}")
print(f" Status: {'✓ CORRECT' if is_correct else '✗ INCORRECT'}")
# Assert answer correctness
assert is_correct, (
f"Answer mismatch for query: {query}\n\n"
f"Generated Answer:\n{rag_answer}\n\n"
f"Ground Truth:\n{ground_truth}\n\n"
f"Evaluation: {evaluation_result.strip()}"
)
# Print summary
print(f"\n{'=' * 80}")
print("Answer Correctness Summary:")
print(f" Total queries: {len(results_summary)}")
print(f" Correct: {sum(r['is_correct'] for r in results_summary)}")
print(f" Incorrect: {sum(not r['is_correct'] for r in results_summary)}")
accuracy = sum(r["is_correct"] for r in results_summary) / len(results_summary)
print(f" Accuracy: {accuracy:.2%}")
print(f"{'=' * 80}")
@pytest.mark.integration
async def test_answer_contains_sources(mcp_sampling_client, nfcorpus_test_data):
"""Test that RAG answers include source citations.
This is a basic quality check - we verify that the nc_semantic_search_answer
tool returns both a generated answer and source documents.
"""
for test_case in nfcorpus_test_data:
query = test_case["query_text"]
# Execute RAG pipeline
rag_result = await mcp_sampling_client.call_tool(
"nc_semantic_search_answer",
arguments={"query": query, "limit": 5},
)
# Check response structure
assert "generated_answer" in rag_result, "Response missing 'generated_answer'"
assert "sources" in rag_result, "Response missing 'sources'"
# Check sources are provided
sources = rag_result["sources"]
assert len(sources) > 0, f"No sources returned for query: {query}"
# Check each source has required fields
for i, source in enumerate(sources):
assert "document_id" in source or "id" in source, (
f"Source {i} missing document ID"
)
assert "excerpt" in source or "content" in source or "text" in source, (
f"Source {i} missing content"
)
print(f"Query: {query}")
print(f" Sources provided: {len(sources)}")
print(" Status: ✓ PASS")
@@ -0,0 +1,143 @@
"""Tests for RAG retrieval quality (Context Recall metric).
These tests evaluate whether the vector sync/embedding pipeline successfully
retrieves documents containing the answer to a query.
Metric: Context Recall
- Measures: Did we retrieve documents containing the answer?
- Method: Heuristic - Check if ground-truth document IDs appear in top-k results
- Target: 80% recall (at least 80% of expected docs in top-10 results)
"""
import pytest
@pytest.mark.integration
async def test_retrieval_context_recall(nc_client, nfcorpus_test_data):
"""Test that semantic search retrieves documents containing the answer.
For each test query:
1. Perform semantic search (retrieval only, no generation)
2. Extract retrieved document IDs from top-k results
3. Calculate Context Recall: intersection of retrieved and expected docs
4. Assert recall meets threshold (80%)
This tests the quality of the vector sync/embedding pipeline.
"""
# Top-k documents to retrieve
k = 10
# Minimum acceptable recall
min_recall = 0.8
results_summary = []
for test_case in nfcorpus_test_data:
query = test_case["query_text"]
expected_note_ids = set(test_case["expected_note_ids"])
# Perform semantic search (retrieval only)
search_results = await nc_client.notes.semantic_search(
query=query,
limit=k,
)
# Extract retrieved note IDs
retrieved_note_ids = {result["id"] for result in search_results}
# Calculate Context Recall
intersection = expected_note_ids & retrieved_note_ids
recall = len(intersection) / len(expected_note_ids) if expected_note_ids else 0
# Store results
result = {
"query_id": test_case["query_id"],
"query": query,
"expected_count": len(expected_note_ids),
"retrieved_count": len(retrieved_note_ids),
"intersection_count": len(intersection),
"recall": recall,
"passed": recall >= min_recall,
}
results_summary.append(result)
# Print detailed result for this query
print(f"\n{'=' * 80}")
print(f"Query: {query}")
print(f" Expected docs: {len(expected_note_ids)}")
print(f" Retrieved (top-{k}): {len(retrieved_note_ids)}")
print(f" Intersection: {len(intersection)}")
print(f" Context Recall: {recall:.2%}")
print(f" Status: {'✓ PASS' if result['passed'] else '✗ FAIL'}")
# Assert recall meets threshold
assert recall >= min_recall, (
f"Context Recall {recall:.2%} below threshold {min_recall:.2%} "
f"for query: {query}\n"
f"Expected {len(expected_note_ids)} docs, found {len(intersection)} in top-{k}"
)
# Print summary
print(f"\n{'=' * 80}")
print("Context Recall Summary:")
print(f" Total queries: {len(results_summary)}")
print(f" Passed: {sum(r['passed'] for r in results_summary)}")
print(f" Failed: {sum(not r['passed'] for r in results_summary)}")
print(
f" Average recall: {sum(r['recall'] for r in results_summary) / len(results_summary):.2%}"
)
print(f"{'=' * 80}")
@pytest.mark.integration
async def test_retrieval_top1_precision(nc_client, nfcorpus_test_data):
"""Test that the top-1 retrieved document is highly relevant.
This is a stricter test than context recall - we verify that
the single most relevant document (rank 1) is in the expected set.
This tests whether the ranking is good, not just retrieval.
"""
results_summary = []
for test_case in nfcorpus_test_data:
query = test_case["query_text"]
expected_note_ids = set(test_case["expected_note_ids"])
# Perform semantic search
search_results = await nc_client.notes.semantic_search(
query=query,
limit=1, # Only top-1
)
# Check if top result is in expected set
if search_results:
top_result_id = search_results[0]["id"]
is_relevant = top_result_id in expected_note_ids
else:
is_relevant = False
result = {
"query_id": test_case["query_id"],
"query": query,
"top_result_id": search_results[0]["id"] if search_results else None,
"is_relevant": is_relevant,
}
results_summary.append(result)
print(f"\nQuery: {query}")
print(f" Top-1 relevant: {'✓ YES' if is_relevant else '✗ NO'}")
# This is informational - we don't assert here
# Some queries may have multiple valid top results
# Print summary
precision_at_1 = sum(r["is_relevant"] for r in results_summary) / len(
results_summary
)
print(f"\n{'=' * 80}")
print(f"Precision@1: {precision_at_1:.2%}")
print(
f" ({sum(r['is_relevant'] for r in results_summary)}/{len(results_summary)} queries)"
)
print(f"{'=' * 80}")
+217
View File
@@ -0,0 +1,217 @@
"""
Unit tests for @instrument_tool decorator.
Tests that the decorator correctly instruments MCP tools with both
Prometheus metrics and OpenTelemetry tracing.
"""
from unittest.mock import MagicMock, patch
import pytest
from nextcloud_mcp_server.observability.metrics import instrument_tool
pytestmark = pytest.mark.unit
@pytest.fixture
def mock_metrics():
"""Mock Prometheus metrics."""
with (
patch(
"nextcloud_mcp_server.observability.metrics.record_tool_call"
) as mock_record,
patch(
"nextcloud_mcp_server.observability.metrics.record_tool_error"
) as mock_error,
):
yield {"record_tool_call": mock_record, "record_tool_error": mock_error}
@pytest.fixture
def mock_tracer():
"""Mock OpenTelemetry tracer."""
with patch(
"nextcloud_mcp_server.observability.tracing.trace_operation"
) as mock_trace:
# Configure mock to act as a context manager that allows exceptions to propagate
mock_trace.return_value.__enter__ = MagicMock(return_value=None)
mock_trace.return_value.__exit__ = MagicMock(
return_value=False
) # Return False to allow exceptions to propagate
yield mock_trace
class TestInstrumentToolDecorator:
"""Test the @instrument_tool decorator."""
async def test_decorator_creates_trace_span(self, mock_tracer, mock_metrics):
"""Test that decorator creates OpenTelemetry span with correct attributes."""
@instrument_tool
async def example_tool(query: str, limit: int = 10):
return {"results": []}
# Call the tool
await example_tool(query="test query", limit=5)
# Verify trace_operation was called with correct parameters
mock_tracer.assert_called_once()
call_args = mock_tracer.call_args
# Check span name
assert call_args[0][0] == "mcp.tool.example_tool"
# Check span attributes
attributes = call_args[1]["attributes"]
assert attributes["mcp.tool.name"] == "example_tool"
assert "query" in attributes["mcp.tool.args"]
assert "test query" in attributes["mcp.tool.args"]
assert "limit" in attributes["mcp.tool.args"]
# Verify record_exception parameter
assert call_args[1]["record_exception"] is True
async def test_decorator_sanitizes_sensitive_arguments(
self, mock_tracer, mock_metrics
):
"""Test that sensitive arguments are excluded from span attributes."""
@instrument_tool
async def example_tool(
query: str, password: str, token: str, api_key: str, ctx: object
):
return {"success": True}
# Call with sensitive parameters
await example_tool(
query="test",
password="secret123",
token="bearer_token",
api_key="api_key_123",
ctx=MagicMock(),
)
# Verify trace was created
mock_tracer.assert_called_once()
attributes = mock_tracer.call_args[1]["attributes"]
# Check that sensitive fields are NOT in attributes
tool_args = attributes["mcp.tool.args"]
assert "password" not in tool_args
assert "secret123" not in tool_args
assert "token" not in tool_args
assert "bearer_token" not in tool_args
assert "api_key" not in tool_args
assert "api_key_123" not in tool_args
assert "ctx" not in tool_args
# Check that non-sensitive field IS included
assert "query" in tool_args
assert "test" in tool_args
async def test_decorator_limits_argument_string_length(
self, mock_tracer, mock_metrics
):
"""Test that tool arguments are limited to 500 characters."""
@instrument_tool
async def example_tool(query: str):
return {"results": []}
# Create a very long query string (>500 chars)
long_query = "x" * 1000
await example_tool(query=long_query)
# Verify arguments were truncated
mock_tracer.assert_called_once()
attributes = mock_tracer.call_args[1]["attributes"]
tool_args = attributes["mcp.tool.args"]
assert len(tool_args) <= 500
async def test_decorator_records_success_metrics(self, mock_tracer, mock_metrics):
"""Test that successful tool execution records metrics."""
@instrument_tool
async def example_tool():
return {"success": True}
# Call the tool
await example_tool()
# Verify success metrics were recorded
mock_metrics["record_tool_call"].assert_called_once()
call_args = mock_metrics["record_tool_call"].call_args
assert call_args[0][0] == "example_tool" # tool_name
assert isinstance(call_args[0][1], float) # duration
assert call_args[0][2] == "success" # status
async def test_decorator_records_error_metrics(self, mock_tracer, mock_metrics):
"""Test that tool errors are recorded in metrics."""
@instrument_tool
async def failing_tool():
raise ValueError("Test error")
# Call the tool and expect exception
with pytest.raises(ValueError, match="Test error"):
await failing_tool()
# Verify error metrics were recorded
mock_metrics["record_tool_call"].assert_called_once()
call_args = mock_metrics["record_tool_call"].call_args
assert call_args[0][0] == "failing_tool" # tool_name
assert isinstance(call_args[0][1], float) # duration
assert call_args[0][2] == "error" # status
# Verify error type was recorded
mock_metrics["record_tool_error"].assert_called_once()
error_args = mock_metrics["record_tool_error"].call_args
assert error_args[0][0] == "failing_tool" # tool_name
assert error_args[0][1] == "ValueError" # error_type
async def test_decorator_preserves_function_metadata(
self, mock_tracer, mock_metrics
):
"""Test that decorator preserves function name and docstring."""
@instrument_tool
async def example_tool():
"""This is a test tool."""
return {"success": True}
# Verify function metadata is preserved
assert example_tool.__name__ == "example_tool"
assert example_tool.__doc__ == "This is a test tool."
async def test_decorator_preserves_return_value(self, mock_tracer, mock_metrics):
"""Test that decorator returns the original function's return value."""
@instrument_tool
async def example_tool(value: int):
return {"result": value * 2}
# Call the tool
result = await example_tool(value=5)
# Verify return value is unchanged
assert result == {"result": 10}
async def test_decorator_with_no_arguments(self, mock_tracer, mock_metrics):
"""Test decorator with tool that takes no arguments."""
@instrument_tool
async def no_args_tool():
return {"status": "ok"}
# Call the tool
await no_args_tool()
# Verify tracing works with no arguments
mock_tracer.assert_called_once()
attributes = mock_tracer.call_args[1]["attributes"]
# tool_args should be None when there are no kwargs
assert attributes["mcp.tool.args"] is None
+587
View File
@@ -0,0 +1,587 @@
#!/usr/bin/env python3
"""RAG Evaluation Management CLI.
Commands:
generate - Generate ground truth answers from nfcorpus dataset
upload - Upload nfcorpus documents as Nextcloud notes
Usage:
# Generate ground truth
uv run python tools/rag_eval_cli.py generate
# Upload corpus to Nextcloud
uv run python tools/rag_eval_cli.py upload --nextcloud-url http://localhost:8000 --username admin --password admin
"""
import io
import json
import sys
import zipfile
from pathlib import Path
from typing import Any
import anyio
import click
import httpx
from datasets import load_dataset
from httpx import BasicAuth
# Add parent directory to path to import from tests/
sys.path.insert(0, str(Path(__file__).parent.parent))
from nextcloud_mcp_server.client import NextcloudClient
from tests.rag_evaluation.llm_providers import create_llm_provider
# Paths
FIXTURES_DIR = Path(__file__).parent.parent / "tests" / "rag_evaluation" / "fixtures"
CORPUS_DIR = FIXTURES_DIR / "nfcorpus"
GROUND_TRUTH_FILE = FIXTURES_DIR / "ground_truth.json"
NOTE_MAPPING_FILE = FIXTURES_DIR / "note_mapping.json"
# Dataset URL
NFCORPUS_URL = (
"https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/nfcorpus.zip"
)
# Selected test queries (from ADR-013)
SELECTED_QUERIES = [
"PLAIN-2630", # Alkylphenol Endocrine Disruptors and Allergies
"PLAIN-2660", # How Long to Detox From Fish Before Pregnancy?
"PLAIN-2510", # Coffee and Artery Function
"PLAIN-2430", # Preventing Brain Loss with B Vitamins?
"PLAIN-2690", # Chronic Headaches and Pork Tapeworms
]
def ensure_corpus_downloaded(force_download: bool = False) -> Path:
"""Ensure nfcorpus dataset is downloaded to fixtures directory.
Args:
force_download: Force re-download even if corpus exists
Returns:
Path to corpus directory
Raises:
RuntimeError: If download fails
"""
if CORPUS_DIR.exists() and not force_download:
click.echo(f"Corpus already exists at {CORPUS_DIR}")
return CORPUS_DIR
click.echo(f"Downloading nfcorpus dataset to {CORPUS_DIR}...")
# Create fixtures directory
FIXTURES_DIR.mkdir(parents=True, exist_ok=True)
# Download using HuggingFace datasets library (handles caching)
try:
# Download corpus
click.echo(" Downloading corpus...")
corpus_dataset = load_dataset(
"BeIR/nfcorpus",
"corpus",
split="corpus",
)
# Download queries
click.echo(" Downloading queries...")
queries_dataset = load_dataset(
"BeIR/nfcorpus",
"queries",
split="queries",
)
# Save to local fixtures directory as JSONL
CORPUS_DIR.mkdir(parents=True, exist_ok=True)
# Save corpus
with open(CORPUS_DIR / "corpus.jsonl", "w") as f:
for doc in corpus_dataset:
f.write(json.dumps(doc) + "\n")
# Save queries
with open(CORPUS_DIR / "queries.jsonl", "w") as f:
for query in queries_dataset:
f.write(json.dumps(query) + "\n")
# Download qrels from BEIR directly (not available via HuggingFace)
click.echo(" Downloading qrels from BEIR ZIP...")
with httpx.Client(timeout=300.0) as client:
response = client.get(NFCORPUS_URL)
response.raise_for_status()
# Extract qrels from ZIP
with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
# The qrels are in nfcorpus/qrels/test.tsv within the ZIP
qrels_path = "nfcorpus/qrels/test.tsv"
qrels_dir = CORPUS_DIR / "qrels"
qrels_dir.mkdir(parents=True, exist_ok=True)
qrels_content = zf.read(qrels_path).decode("utf-8")
with open(qrels_dir / "test.tsv", "w") as f:
f.write(qrels_content)
click.echo(f"Dataset downloaded to {CORPUS_DIR}")
return CORPUS_DIR
except Exception as e:
raise RuntimeError(f"Failed to download nfcorpus dataset: {e}") from e
def load_corpus(corpus_dir: Path) -> dict[str, dict]:
"""Load corpus documents from local directory.
Args:
corpus_dir: Path to corpus directory
Returns:
Dict mapping document ID to document data
"""
corpus = {}
with open(corpus_dir / "corpus.jsonl") as f:
for line in f:
doc = json.loads(line)
corpus[doc["_id"]] = doc
return corpus
def load_queries(corpus_dir: Path) -> dict[str, dict]:
"""Load queries from local directory.
Args:
corpus_dir: Path to corpus directory
Returns:
Dict mapping query ID to query data
"""
queries = {}
with open(corpus_dir / "queries.jsonl") as f:
for line in f:
query = json.loads(line)
queries[query["_id"]] = query
return queries
def load_qrels(corpus_dir: Path) -> dict[str, list[tuple[str, int]]]:
"""Load query relevance judgments from local directory.
Args:
corpus_dir: Path to corpus directory
Returns:
Dict mapping query ID to list of (doc_id, score) tuples
"""
qrels: dict[str, list[tuple[str, int]]] = {}
with open(corpus_dir / "qrels" / "test.tsv") as f:
next(f) # Skip header
for line in f:
query_id, corpus_id, score = line.strip().split("\t")
if query_id not in qrels:
qrels[query_id] = []
qrels[query_id].append((corpus_id, int(score)))
# Sort by score descending
for query_id in qrels:
qrels[query_id].sort(key=lambda x: x[1], reverse=True)
return qrels
async def generate_ground_truth_answer(
query_text: str, relevant_docs: list[dict[str, Any]], llm
) -> str:
"""Generate ground truth answer from highly relevant documents.
Args:
query_text: The query/question
relevant_docs: List of highly relevant documents (top 5)
llm: LLM provider instance
Returns:
Generated ground truth answer
"""
# Construct context from documents
context_parts = []
for i, doc in enumerate(relevant_docs, 1):
context_parts.append(
f"Document {i}:\nTitle: {doc['title']}\nText: {doc['text']}\n"
)
context = "\n".join(context_parts)
# Generate ground truth
prompt = f"""Based on the following medical/biomedical documents, provide a comprehensive, factual answer to this question.
Question: {query_text}
{context}
Instructions:
- Provide a clear, well-structured answer that synthesizes information from the documents
- Focus on accuracy and completeness
- Use specific facts and findings from the documents
- Keep the answer concise but informative (2-4 paragraphs)
- Do not make up information not present in the documents
Answer:"""
click.echo(f" Generating answer for: {query_text}")
answer = await llm.generate(prompt, max_tokens=500)
click.echo(f" Generated {len(answer)} characters")
return answer.strip()
@click.group()
def cli():
"""RAG Evaluation Management CLI.
Manage ground truth generation and corpus upload for RAG evaluation tests.
"""
pass
@cli.command()
@click.option(
"--provider",
type=click.Choice(["ollama", "anthropic"]),
default="ollama",
help="LLM provider to use for generation",
)
@click.option(
"--model",
help="Model name (default: llama3.2:1b for Ollama, claude-3-5-sonnet-20241022 for Anthropic)",
)
@click.option(
"--force-download",
is_flag=True,
help="Force re-download of nfcorpus dataset",
)
def generate(provider: str, model: str | None, force_download: bool):
"""Generate ground truth answers for RAG evaluation.
This command:
1. Downloads nfcorpus dataset (if not already cached)
2. For each selected query, extracts highly relevant documents
3. Uses an LLM to synthesize a reference answer
4. Saves ground truth to fixtures/ground_truth.json
Environment variables:
RAG_EVAL_PROVIDER: Provider type (ollama or anthropic)
RAG_EVAL_OLLAMA_BASE_URL: Ollama base URL
RAG_EVAL_OLLAMA_MODEL: Ollama model name
RAG_EVAL_ANTHROPIC_API_KEY: Anthropic API key
RAG_EVAL_ANTHROPIC_MODEL: Anthropic model name
"""
async def _generate():
click.echo("=" * 80)
click.echo("RAG Ground Truth Generation")
click.echo("=" * 80)
# Ensure corpus is downloaded
corpus_dir = ensure_corpus_downloaded(force_download)
# Load dataset
click.echo("\nLoading nfcorpus dataset...")
corpus = load_corpus(corpus_dir)
queries = load_queries(corpus_dir)
qrels = load_qrels(corpus_dir)
click.echo(f"Loaded {len(corpus)} documents, {len(queries)} queries")
# Create LLM provider
click.echo("\nInitializing LLM provider...")
try:
llm = create_llm_provider(
provider=provider,
ollama_model=model if provider == "ollama" else None,
anthropic_model=model if provider == "anthropic" else None,
)
provider_type = type(llm).__name__
click.echo(f"Using provider: {provider_type}")
except ValueError as e:
click.echo(f"\nError: {e}", err=True)
return 1
# Generate ground truth for each selected query
ground_truth_data = []
try:
for query_id in SELECTED_QUERIES:
if query_id not in queries:
click.echo(
f"\nWarning: Query {query_id} not found in dataset", err=True
)
continue
query = queries[query_id]
query_text = query["text"]
# Get highly relevant documents (score=2)
if query_id not in qrels:
click.echo(
f"\nWarning: No relevance judgments for {query_id}", err=True
)
continue
highly_relevant_doc_ids = [
doc_id for doc_id, score in qrels[query_id] if score == 2
]
if not highly_relevant_doc_ids:
click.echo(
f"\nWarning: No highly relevant docs for {query_id}", err=True
)
continue
# Get top 5 highly relevant documents
relevant_docs = []
for doc_id in highly_relevant_doc_ids[:5]:
if doc_id in corpus:
relevant_docs.append(corpus[doc_id])
if not relevant_docs:
click.echo(
f"\nWarning: Could not load documents for {query_id}", err=True
)
continue
# Generate ground truth answer
click.echo(f"\n{'-' * 80}")
ground_truth_answer = await generate_ground_truth_answer(
query_text, relevant_docs, llm
)
# Store result
ground_truth_data.append(
{
"query_id": query_id,
"query_text": query_text,
"ground_truth_answer": ground_truth_answer,
"expected_document_ids": highly_relevant_doc_ids,
"highly_relevant_count": len(highly_relevant_doc_ids),
}
)
click.echo(f" Preview: {ground_truth_answer[:200]}...")
finally:
await llm.close()
# Save ground truth
GROUND_TRUTH_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(GROUND_TRUTH_FILE, "w") as f:
json.dump(ground_truth_data, f, indent=2)
click.echo(f"\n{'=' * 80}")
click.echo(f"Generated {len(ground_truth_data)} ground truth answers")
click.echo(f"Saved to: {GROUND_TRUTH_FILE}")
click.echo("=" * 80)
return 0
sys.exit(anyio.run(_generate))
@cli.command()
@click.option(
"--nextcloud-url",
envvar="NEXTCLOUD_HOST",
required=True,
help="Nextcloud base URL (e.g., http://localhost:8000)",
)
@click.option(
"--username",
envvar="NEXTCLOUD_USERNAME",
required=True,
help="Nextcloud username",
)
@click.option(
"--password",
envvar="NEXTCLOUD_PASSWORD",
required=True,
help="Nextcloud password",
)
@click.option(
"--category",
default="nfcorpus_rag_eval",
help="Category/folder for uploaded notes",
)
@click.option(
"--force-download",
is_flag=True,
help="Force re-download of nfcorpus dataset",
)
@click.option(
"--force",
is_flag=True,
help="Delete all existing notes in the target category before uploading",
)
def upload(
nextcloud_url: str,
username: str,
password: str,
category: str,
force_download: bool,
force: bool,
):
"""Upload nfcorpus corpus documents as Nextcloud notes.
This command:
1. Downloads nfcorpus dataset (if not already cached)
2. Optionally deletes existing notes in target category (--force)
3. Uploads all corpus documents as Nextcloud notes
4. Saves document ID note ID mapping to fixtures/note_mapping.json
The note mapping file is used by pytest tests to map expected document IDs
to actual note IDs in Nextcloud.
"""
async def _upload():
click.echo("=" * 80)
click.echo("Upload nfcorpus Corpus to Nextcloud")
click.echo("=" * 80)
# Ensure corpus is downloaded
corpus_dir = ensure_corpus_downloaded(force_download)
# Load corpus
click.echo("\nLoading corpus...")
corpus = load_corpus(corpus_dir)
click.echo(f"Loaded {len(corpus)} documents")
# Create Nextcloud client
click.echo(f"\nConnecting to Nextcloud at {nextcloud_url}...")
nc_client = NextcloudClient(
base_url=nextcloud_url,
username=username,
auth=BasicAuth(username, password),
)
try:
# Delete existing notes in category if force is specified
if force:
click.echo(
f"\n--force specified: Deleting existing notes in category '{category}'..."
)
# Collect notes to delete
notes_to_delete = []
async for note in nc_client.notes.get_all_notes():
if note.get("category") == category:
notes_to_delete.append(note["id"])
if not notes_to_delete:
click.echo(f"No existing notes found in category '{category}'")
else:
click.echo(f"Found {len(notes_to_delete)} notes to delete")
deleted_count = 0
delete_errors = []
delete_semaphore = anyio.Semaphore(20)
async def delete_note(note_id: int):
"""Delete a single note."""
nonlocal deleted_count
async with delete_semaphore:
try:
await nc_client.notes.delete_note(note_id)
deleted_count += 1
if deleted_count % 100 == 0:
click.echo(f" Deleted {deleted_count} notes...")
except Exception as e:
error_msg = f"Error deleting note {note_id}: {e}"
delete_errors.append(error_msg)
click.echo(f" {error_msg}", err=True)
# Delete all notes concurrently
async with anyio.create_task_group() as tg:
for note_id in notes_to_delete:
tg.start_soon(delete_note, note_id)
click.echo(
f"Deleted {deleted_count} existing notes in category '{category}'"
)
if delete_errors:
click.echo(
f"Encountered {len(delete_errors)} errors during deletion",
err=True,
)
# Upload documents concurrently
click.echo(f"\nUploading {len(corpus)} documents as notes (concurrent)...")
click.echo(f"Category: {category}")
note_mapping = {}
uploaded_count = 0
upload_errors = []
# Semaphore to limit concurrent uploads (avoid overwhelming server)
max_concurrent = 20
semaphore = anyio.Semaphore(max_concurrent)
async def upload_document(doc_id: str, doc: dict[str, Any]):
"""Upload a single document as a note."""
nonlocal uploaded_count
async with semaphore:
title = f"[{doc_id}] {doc['title'][:100]}" # Truncate long titles
content = doc["text"]
try:
note_data = await nc_client.notes.create_note(
title=title,
content=content,
category=category,
)
# Store mapping
note_id = note_data["id"]
note_mapping[doc_id] = note_id
uploaded_count += 1
# Progress indicator every 100 docs
if uploaded_count % 100 == 0:
click.echo(
f" Uploaded {uploaded_count}/{len(corpus)} documents..."
)
except Exception as e:
error_msg = f"Error uploading {doc_id}: {e}"
upload_errors.append(error_msg)
click.echo(f" {error_msg}", err=True)
# Upload all documents concurrently using task group
async with anyio.create_task_group() as tg:
for doc_id, doc in corpus.items():
tg.start_soon(upload_document, doc_id, doc)
click.echo(f"\nUploaded {uploaded_count} documents successfully")
if upload_errors:
click.echo(
f"Encountered {len(upload_errors)} errors during upload", err=True
)
# Save note mapping
with open(NOTE_MAPPING_FILE, "w") as f:
json.dump(note_mapping, f, indent=2)
click.echo(f"Saved note mapping to: {NOTE_MAPPING_FILE}")
click.echo(f" Mapped {len(note_mapping)} document IDs to note IDs")
finally:
# Close the Nextcloud client
await nc_client.close()
click.echo("=" * 80)
click.echo("Upload complete!")
click.echo("=" * 80)
return 0
sys.exit(anyio.run(_upload))
if __name__ == "__main__":
cli()
Generated
+957 -1
View File
File diff suppressed because it is too large Load Diff