From 2ab8dad6a5be782f18ec4ed4a9994a7eed1236f7 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sun, 23 Nov 2025 01:56:17 +0100 Subject: [PATCH 1/2] fix: Use WebDAV for tag creation and add LLM-as-a-judge for RAG tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Change create_tag() to use WebDAV POST instead of OCS API which returned 404 in some Nextcloud versions - Add llm_judge() helper that evaluates system output against ground truth with simple TRUE/FALSE prompt - Replace keyword-based assertions in RAG tests with LLM judge for more flexible semantic evaluation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- nextcloud_mcp_server/client/webdav.py | 30 +++++++------ tests/integration/test_rag_openai.py | 61 +++++++++++++++++++++------ 2 files changed, 64 insertions(+), 27 deletions(-) diff --git a/nextcloud_mcp_server/client/webdav.py b/nextcloud_mcp_server/client/webdav.py index 5a5f0cd..e8b3f6f 100644 --- a/nextcloud_mcp_server/client/webdav.py +++ b/nextcloud_mcp_server/client/webdav.py @@ -1398,7 +1398,7 @@ class WebDAVClient(BaseNextcloudClient): user_visible: bool = True, user_assignable: bool = True, ) -> dict[str, Any]: - """Create a system tag via OCS API. + """Create a system tag via WebDAV. Args: name: Name of the tag to create @@ -1411,12 +1411,10 @@ class WebDAVClient(BaseNextcloudClient): Raises: HTTPStatusError: If tag creation fails (409 if already exists) """ + # Use WebDAV POST with JSON body to create tag response = await self._client.post( - "/ocs/v2.php/apps/systemtags/api/v1/tags", - headers={ - "OCS-APIRequest": "true", - "Content-Type": "application/json", - }, + "/remote.php/dav/systemtags/", + headers={"Content-Type": "application/json"}, json={ "name": name, "userVisible": user_visible, @@ -1425,15 +1423,21 @@ class WebDAVClient(BaseNextcloudClient): ) response.raise_for_status() - # Parse OCS response - data = response.json() - ocs_data = data.get("ocs", {}).get("data", {}) + # Extract tag ID from Content-Location header (e.g., /remote.php/dav/systemtags/42) + content_location = response.headers.get("Content-Location", "") + tag_id = None + if content_location: + # Extract the numeric ID from the path + try: + tag_id = int(content_location.rstrip("/").split("/")[-1]) + except (ValueError, IndexError): + pass tag_info = { - "id": ocs_data.get("id"), - "name": ocs_data.get("name", name), - "userVisible": ocs_data.get("userVisible", user_visible), - "userAssignable": ocs_data.get("userAssignable", user_assignable), + "id": tag_id, + "name": name, + "userVisible": user_visible, + "userAssignable": user_assignable, } logger.info(f"Created tag '{name}' with ID {tag_info['id']}") diff --git a/tests/integration/test_rag_openai.py b/tests/integration/test_rag_openai.py index 8f56495..1f750fc 100644 --- a/tests/integration/test_rag_openai.py +++ b/tests/integration/test_rag_openai.py @@ -42,6 +42,34 @@ logger = logging.getLogger(__name__) # Default path to the Nextcloud User Manual PDF DEFAULT_MANUAL_PATH = "Nextcloud Manual.pdf" + +async def llm_judge( + provider: "OpenAIProvider", + ground_truth: str, + system_output: str, +) -> bool: + """Use LLM to judge if system output aligns with ground truth. + + Args: + provider: OpenAI provider with generation capability + ground_truth: The expected/reference answer + system_output: The system's actual output to evaluate + + Returns: + True if output aligns with ground truth, False otherwise + """ + prompt = f"""GROUND TRUTH: {ground_truth} + +SYSTEM OUTPUT: {system_output} + +Does the system output contain the key facts from the ground truth? + +Answer: TRUE or FALSE""" + + response = await provider.generate(prompt, max_tokens=10) + return "TRUE" in response.upper() + + # Skip all tests if OpenAI API key not configured pytestmark = [ pytest.mark.integration, @@ -218,7 +246,7 @@ async def test_openai_embeddings_work(openai_provider: OpenAIProvider): async def test_semantic_search_retrieval( - nc_mcp_client, ground_truth_qa, indexed_manual_pdf + nc_mcp_client, ground_truth_qa, indexed_manual_pdf, openai_generation_provider ): """Test that semantic search retrieves relevant documents from the manual. @@ -228,7 +256,6 @@ async def test_semantic_search_retrieval( # Use first query from ground truth test_case = ground_truth_qa[0] # 2FA question query = test_case["query"] - expected_topics = test_case["expected_topics"] # Perform semantic search via MCP tool result = await nc_mcp_client.call_tool( @@ -248,16 +275,21 @@ async def test_semantic_search_retrieval( assert data["total_found"] > 0, f"No results for query: {query}" assert len(data["results"]) > 0 - # Check that at least one result contains expected topic keywords - all_excerpts = " ".join([r["excerpt"].lower() for r in data["results"]]) - topic_found = any(topic.lower() in all_excerpts for topic in expected_topics) - assert topic_found, ( - f"Expected topics {expected_topics} not found in results for query: {query}" + # Use LLM judge to evaluate if excerpts are relevant to ground truth + all_excerpts = " ".join([r["excerpt"] for r in data["results"]]) + is_relevant = await llm_judge( + openai_generation_provider, + test_case["ground_truth"], + all_excerpts, ) + assert is_relevant, f"LLM judge: excerpts not relevant to query: {query}" async def test_semantic_search_answer_with_sampling( - nc_mcp_client_with_sampling, ground_truth_qa, indexed_manual_pdf + nc_mcp_client_with_sampling, + ground_truth_qa, + indexed_manual_pdf, + openai_generation_provider, ): """Test semantic search with MCP sampling for answer generation. @@ -314,12 +346,13 @@ async def test_semantic_search_answer_with_sampling( assert data["generated_answer"] is not None assert len(data["generated_answer"]) > 50 # Non-trivial answer - # Check answer contains relevant content - answer_lower = data["generated_answer"].lower() - assert any( - keyword in answer_lower - for keyword in ["two-factor", "2fa", "authentication", "password"] - ), f"Answer doesn't seem relevant to query: {data['generated_answer'][:200]}" + # Use LLM judge to evaluate answer relevance + is_relevant = await llm_judge( + openai_generation_provider, + test_case["ground_truth"], + data["generated_answer"], + ) + assert is_relevant, f"LLM judge: answer not relevant to query: {query}" @pytest.mark.parametrize( From fafeaf3d834c9d1f89ce08a4918a85aaf36a15f7 Mon Sep 17 00:00:00 2001 From: Chris Coutinho Date: Sun, 23 Nov 2025 04:02:30 +0100 Subject: [PATCH 2/2] refactor: Move background tasks to server lifespan and deprecate SSE transport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move scanner/processor tasks from FastMCP session lifespan to Starlette server lifespan (correct architecture: background tasks run once at server level, not per-session) - Change default CLI transport from SSE to streamable-http - Remove SSE transport option from CLI (SSE is deprecated) - Remove SSE client session factory from test fixtures - Add tracing instrumentation to BM25 hybrid search operations for better observability 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- nextcloud_mcp_server/app.py | 399 ++++++++------------- nextcloud_mcp_server/auth/viz_routes.py | 111 ++++-- nextcloud_mcp_server/cli.py | 4 +- nextcloud_mcp_server/search/bm25_hybrid.py | 149 ++++---- tests/conftest.py | 56 +-- 5 files changed, 314 insertions(+), 405 deletions(-) diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 5e0a5be..b4ab820 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -555,15 +555,15 @@ async def load_oauth_client_credentials( @asynccontextmanager async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]: """ - Manage application lifecycle for BasicAuth mode. + Manage application lifecycle for BasicAuth mode (FastMCP session lifespan). Creates a single Nextcloud client with basic authentication - that is shared across all requests. + that is shared across all requests within a session. - If vector sync is enabled (VECTOR_SYNC_ENABLED=true), also starts - background tasks for automatic document indexing (ADR-007). + Note: Background tasks (scanner, processor) are started at server level + in starlette_lifespan, not here. This lifespan runs per-session. """ - logger.info("Starting MCP server in BasicAuth mode") + logger.info("Starting MCP session in BasicAuth mode") logger.info("Creating Nextcloud client with BasicAuth") client = NextcloudClient.from_env() @@ -579,91 +579,12 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]: # Initialize document processors initialize_document_processors() - settings = get_settings() - - # Check if vector sync is enabled - if settings.vector_sync_enabled: - logger.info("Vector sync enabled - starting background tasks") - - # Get username from environment for BasicAuth mode - username = os.getenv("NEXTCLOUD_USERNAME") - if not username: - raise ValueError( - "NEXTCLOUD_USERNAME is required for vector sync in BasicAuth mode" - ) - - # Initialize Qdrant collection before starting background tasks - logger.info("Initializing Qdrant collection...") - from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client - - try: - await get_qdrant_client() # Triggers collection creation if needed - logger.info("Qdrant collection ready") - except Exception as e: - logger.error(f"Failed to initialize Qdrant collection: {e}") - raise RuntimeError( - f"Cannot start vector sync - Qdrant initialization failed: {e}" - ) from e - - # Initialize shared state - send_stream, receive_stream = anyio.create_memory_object_stream( - max_buffer_size=settings.vector_sync_queue_max_size - ) - shutdown_event = anyio.Event() - scanner_wake_event = anyio.Event() - - # Start background tasks using anyio TaskGroup - async with anyio.create_task_group() as tg: - # Start scanner task - await tg.start( - scanner_task, - send_stream, - shutdown_event, - scanner_wake_event, - client, - username, - ) - - # Start processor pool (each gets a cloned receive stream) - for i in range(settings.vector_sync_processor_workers): - await tg.start( - processor_task, - i, - receive_stream.clone(), - shutdown_event, - client, - username, - ) - - logger.info( - f"Background sync tasks started: 1 scanner + {settings.vector_sync_processor_workers} processors" - ) - - # Yield with background tasks running - try: - yield AppContext( - client=client, - storage=storage, - document_send_stream=send_stream, - document_receive_stream=receive_stream, - shutdown_event=shutdown_event, - scanner_wake_event=scanner_wake_event, - ) - finally: - # Shutdown signal - logger.info("Shutting down background sync tasks") - shutdown_event.set() - - # TaskGroup automatically cancels all tasks on exit - logger.info("Background sync tasks stopped") - await client.close() - else: - # No vector sync - simple lifecycle - try: - yield AppContext(client=client, storage=storage) - finally: - logger.info("Shutting down BasicAuth mode") - await client.close() + # Yield client context - scanner runs at server level (starlette_lifespan) + try: + yield AppContext(client=client, storage=storage) + finally: + logger.info("Shutting down BasicAuth session") + await client.close() async def setup_oauth_config(): @@ -979,7 +900,7 @@ async def setup_oauth_config(): ) -def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): +def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = None): # Initialize observability (logging will be configured by uvicorn) settings = get_settings() @@ -1197,180 +1118,170 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): "Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)" ) - if transport == "sse": - mcp_app = mcp.sse_app() - starlette_lifespan = None - elif transport in ("http", "streamable-http"): - mcp_app = mcp.streamable_http_app() + mcp_app = mcp.streamable_http_app() - @asynccontextmanager - async def starlette_lifespan(app: Starlette): - # Set OAuth context for OAuth login routes (ADR-004) - if oauth_enabled: - # Prepare OAuth config from setup_oauth_config closure variables - mcp_server_url = os.getenv( - "NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000" - ) - nextcloud_resource_uri = os.getenv( - "NEXTCLOUD_RESOURCE_URI", nextcloud_host - ) - discovery_url = os.getenv( - "OIDC_DISCOVERY_URL", - f"{nextcloud_host}/.well-known/openid-configuration", - ) - scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "") + @asynccontextmanager + async def starlette_lifespan(app: Starlette): + # Set OAuth context for OAuth login routes (ADR-004) + if oauth_enabled: + # Prepare OAuth config from setup_oauth_config closure variables + mcp_server_url = os.getenv( + "NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000" + ) + nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host) + discovery_url = os.getenv( + "OIDC_DISCOVERY_URL", + f"{nextcloud_host}/.well-known/openid-configuration", + ) + scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "") - oauth_context_dict = { - "storage": refresh_token_storage, - "oauth_client": oauth_client, - "token_verifier": token_verifier, # For querying IdP userinfo endpoint - "config": { - "mcp_server_url": mcp_server_url, - "discovery_url": discovery_url, - "client_id": client_id, # From setup_oauth_config (DCR or static) - "client_secret": client_secret, # From setup_oauth_config (DCR or static) - "scopes": scopes, - "nextcloud_host": nextcloud_host, - "nextcloud_resource_uri": nextcloud_resource_uri, - "oauth_provider": oauth_provider, - }, - } - app.state.oauth_context = oauth_context_dict + oauth_context_dict = { + "storage": refresh_token_storage, + "oauth_client": oauth_client, + "token_verifier": token_verifier, # For querying IdP userinfo endpoint + "config": { + "mcp_server_url": mcp_server_url, + "discovery_url": discovery_url, + "client_id": client_id, # From setup_oauth_config (DCR or static) + "client_secret": client_secret, # From setup_oauth_config (DCR or static) + "scopes": scopes, + "nextcloud_host": nextcloud_host, + "nextcloud_resource_uri": nextcloud_resource_uri, + "oauth_provider": oauth_provider, + }, + } + app.state.oauth_context = oauth_context_dict - # Also set oauth_context on browser_app for session authentication - # browser_app is in the same function scope (defined later in create_app) - # We need to find it in the mounted routes - for route in app.routes: - if isinstance(route, Mount) and route.path == "/app": - route.app.state.oauth_context = oauth_context_dict - logger.info( - "OAuth context shared with browser_app for session auth" - ) - break - - logger.info( - f"OAuth context initialized for login routes (client_id={client_id[:16]}...)" - ) - else: - # BasicAuth mode - share storage with browser_app for webhook management - from nextcloud_mcp_server.auth.storage import RefreshTokenStorage - - storage = RefreshTokenStorage.from_env() - await storage.initialize() - - app.state.storage = storage - - # Also share with browser_app for webhook routes - for route in app.routes: - if isinstance(route, Mount) and route.path == "/app": - route.app.state.storage = storage - logger.info( - "Storage shared with browser_app for webhook management" - ) - break - - # Start background vector sync tasks for BasicAuth mode (ADR-007) - # For streamable-http transport, FastMCP lifespan isn't automatically triggered - # so we manually start background tasks here if vector sync is enabled - import anyio as anyio_module - - settings = get_settings() - if not oauth_enabled and settings.vector_sync_enabled: - logger.info("Starting background vector sync tasks for BasicAuth mode") - - # Get username from environment - username = os.getenv("NEXTCLOUD_USERNAME") - if not username: - raise ValueError( - "NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode" + # Also set oauth_context on browser_app for session authentication + # browser_app is in the same function scope (defined later in create_app) + # We need to find it in the mounted routes + for route in app.routes: + if isinstance(route, Mount) and route.path == "/app": + route.app.state.oauth_context = oauth_context_dict + logger.info( + "OAuth context shared with browser_app for session auth" ) + break - # Get Nextcloud client from MCP app context - # Create client since we're outside FastMCP lifespan - client = NextcloudClient.from_env() + logger.info( + f"OAuth context initialized for login routes (client_id={client_id[:16]}...)" + ) + else: + # BasicAuth mode - share storage with browser_app for webhook management + from nextcloud_mcp_server.auth.storage import RefreshTokenStorage - # Initialize Qdrant collection before starting background tasks - logger.info("Initializing Qdrant collection...") - from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client + storage = RefreshTokenStorage.from_env() + await storage.initialize() - try: - await get_qdrant_client() # Triggers collection creation if needed - logger.info("Qdrant collection ready") - except Exception as e: - logger.error(f"Failed to initialize Qdrant collection: {e}") - raise RuntimeError( - f"Cannot start vector sync - Qdrant initialization failed: {e}" - ) from e + app.state.storage = storage - # Initialize shared state - send_stream, receive_stream = anyio_module.create_memory_object_stream( - max_buffer_size=settings.vector_sync_queue_max_size + # Also share with browser_app for webhook routes + for route in app.routes: + if isinstance(route, Mount) and route.path == "/app": + route.app.state.storage = storage + logger.info( + "Storage shared with browser_app for webhook management" + ) + break + + # Start background vector sync tasks for BasicAuth mode (ADR-007) + # Scanner runs at server-level (once), not per-session + import anyio as anyio_module + + settings = get_settings() + if not oauth_enabled and settings.vector_sync_enabled: + logger.info("Starting background vector sync tasks for BasicAuth mode") + + # Get username from environment + username = os.getenv("NEXTCLOUD_USERNAME") + if not username: + raise ValueError( + "NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode" ) - shutdown_event = anyio_module.Event() - scanner_wake_event = anyio_module.Event() - # Store in app state for access from routes (ADR-007) - app.state.document_send_stream = send_stream - app.state.document_receive_stream = receive_stream - app.state.shutdown_event = shutdown_event - app.state.scanner_wake_event = scanner_wake_event + # Create client for vector sync (server-level, not per-session) + client = NextcloudClient.from_env() - # Also share with browser_app for /app route - for route in app.routes: - if isinstance(route, Mount) and route.path == "/app": - route.app.state.document_send_stream = send_stream - route.app.state.document_receive_stream = receive_stream - route.app.state.shutdown_event = shutdown_event - route.app.state.scanner_wake_event = scanner_wake_event - logger.info( - "Vector sync state shared with browser_app for /app" - ) - break + # Initialize Qdrant collection before starting background tasks + logger.info("Initializing Qdrant collection...") + from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client - # Start background tasks using anyio TaskGroup - async with anyio_module.create_task_group() as tg: - # Start scanner task + try: + await get_qdrant_client() # Triggers collection creation if needed + logger.info("Qdrant collection ready") + except Exception as e: + logger.error(f"Failed to initialize Qdrant collection: {e}") + raise RuntimeError( + f"Cannot start vector sync - Qdrant initialization failed: {e}" + ) from e + + # Initialize shared state + send_stream, receive_stream = anyio_module.create_memory_object_stream( + max_buffer_size=settings.vector_sync_queue_max_size + ) + shutdown_event = anyio_module.Event() + scanner_wake_event = anyio_module.Event() + + # Store in app state for access from routes (ADR-007) + app.state.document_send_stream = send_stream + app.state.document_receive_stream = receive_stream + app.state.shutdown_event = shutdown_event + app.state.scanner_wake_event = scanner_wake_event + + # Also share with browser_app for /app route + for route in app.routes: + if isinstance(route, Mount) and route.path == "/app": + route.app.state.document_send_stream = send_stream + route.app.state.document_receive_stream = receive_stream + route.app.state.shutdown_event = shutdown_event + route.app.state.scanner_wake_event = scanner_wake_event + logger.info("Vector sync state shared with browser_app for /app") + break + + # Start background tasks using anyio TaskGroup + async with anyio_module.create_task_group() as tg: + # Start scanner task + await tg.start( + scanner_task, + send_stream, + shutdown_event, + scanner_wake_event, + client, + username, + ) + + # Start processor pool (each gets a cloned receive stream) + for i in range(settings.vector_sync_processor_workers): await tg.start( - scanner_task, - send_stream, + processor_task, + i, + receive_stream.clone(), shutdown_event, - scanner_wake_event, client, username, ) - # Start processor pool (each gets a cloned receive stream) - for i in range(settings.vector_sync_processor_workers): - await tg.start( - processor_task, - i, - receive_stream.clone(), - shutdown_event, - client, - username, - ) + logger.info( + f"Background sync tasks started: 1 scanner + " + f"{settings.vector_sync_processor_workers} processors" + ) - logger.info( - f"Background sync tasks started: 1 scanner + " - f"{settings.vector_sync_processor_workers} processors" - ) - - # Run MCP session manager and yield - async with AsyncExitStack() as stack: - await stack.enter_async_context(mcp.session_manager.run()) - try: - yield - finally: - # Shutdown signal - logger.info("Shutting down background sync tasks") - shutdown_event.set() - await client.close() - # TaskGroup automatically cancels all tasks on exit - else: - # No vector sync - just run MCP session manager + # Run MCP session manager and yield async with AsyncExitStack() as stack: await stack.enter_async_context(mcp.session_manager.run()) - yield + try: + yield + finally: + # Shutdown signal + logger.info("Shutting down background sync tasks") + shutdown_event.set() + await client.close() + # TaskGroup automatically cancels all tasks on exit + else: + # No vector sync - just run MCP session manager + async with AsyncExitStack() as stack: + await stack.enter_async_context(mcp.session_manager.run()) + yield # Health check endpoints for Kubernetes probes def health_live(request): diff --git a/nextcloud_mcp_server/auth/viz_routes.py b/nextcloud_mcp_server/auth/viz_routes.py index 2d6f20f..272f751 100644 --- a/nextcloud_mcp_server/auth/viz_routes.py +++ b/nextcloud_mcp_server/auth/viz_routes.py @@ -22,6 +22,7 @@ from starlette.requests import Request from starlette.responses import HTMLResponse, JSONResponse from nextcloud_mcp_server.config import get_settings +from nextcloud_mcp_server.observability.tracing import trace_operation from nextcloud_mcp_server.search import ( BM25HybridSearchAlgorithm, SemanticSearchAlgorithm, @@ -139,7 +140,10 @@ async def vector_visualization_search(request: Request) -> JSONResponse: _get_authenticated_client_for_userinfo, ) - async with await _get_authenticated_client_for_userinfo(request) as nc_client: # noqa: F841 + with trace_operation("vector_viz.get_auth_client"): + auth_client_ctx = await _get_authenticated_client_for_userinfo(request) + + async with auth_client_ctx as nc_client: # noqa: F841 # Create search algorithm (no client needed - verification removed) if algorithm == "semantic": search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold) @@ -159,24 +163,40 @@ async def vector_visualization_search(request: Request) -> JSONResponse: all_results = [] if doc_types is None or len(doc_types) == 0: # Cross-app search - search all indexed types - unverified_results = await search_algo.search( - query=query, - user_id=username, - limit=limit * 2, # Buffer for verification filtering - doc_type=None, # Search all types - score_threshold=score_threshold, - ) - all_results.extend(unverified_results) - else: - # Search each document type and combine - for doc_type in doc_types: + with trace_operation( + "vector_viz.search_execute", + attributes={ + "search.algorithm": algorithm, + "search.limit": limit * 2, + "search.doc_type": "all", + }, + ): unverified_results = await search_algo.search( query=query, user_id=username, limit=limit * 2, # Buffer for verification filtering - doc_type=doc_type, + doc_type=None, # Search all types score_threshold=score_threshold, ) + all_results.extend(unverified_results) + else: + # Search each document type and combine + for doc_type in doc_types: + with trace_operation( + "vector_viz.search_execute", + attributes={ + "search.algorithm": algorithm, + "search.limit": limit * 2, + "search.doc_type": doc_type, + }, + ): + unverified_results = await search_algo.search( + query=query, + user_id=username, + limit=limit * 2, # Buffer for verification filtering + doc_type=doc_type, + score_threshold=score_threshold, + ) all_results.extend(unverified_results) # Sort by score before verification all_results.sort(key=lambda r: r.score, reverse=True) @@ -190,22 +210,26 @@ async def vector_visualization_search(request: Request) -> JSONResponse: # Store original scores and normalize for visualization # (best result = 1.0, worst result = 0.0 within THIS result set) # This makes visual encoding meaningful regardless of RRF normalization - if search_results: - scores = [r.score for r in search_results] - min_score, max_score = min(scores), max(scores) - score_range = max_score - min_score if max_score > min_score else 1.0 + with trace_operation( + "vector_viz.score_normalize", + attributes={"normalize.num_results": len(search_results)}, + ): + if search_results: + scores = [r.score for r in search_results] + min_score, max_score = min(scores), max(scores) + score_range = max_score - min_score if max_score > min_score else 1.0 - logger.info( - f"Normalizing scores for viz: original range [{min_score:.3f}, {max_score:.3f}] " - f"→ [0.0, 1.0]" - ) + logger.info( + f"Normalizing scores for viz: original range [{min_score:.3f}, {max_score:.3f}] " + f"→ [0.0, 1.0]" + ) - # Store original score and rescale to 0-1 for visualization - for r in search_results: - # Store original score before normalization - r.original_score = r.score - # Rescale for visual encoding - r.score = (r.score - min_score) / score_range + # Store original score and rescale to 0-1 for visualization + for r in search_results: + # Store original score before normalization + r.original_score = r.score + # Rescale for visual encoding + r.score = (r.score - min_score) / score_range if not search_results: return JSONResponse( @@ -220,7 +244,9 @@ async def vector_visualization_search(request: Request) -> JSONResponse: # Fetch vectors for specific matching chunks from Qdrant using batch retrieve vector_fetch_start = time.perf_counter() - qdrant_client = await get_qdrant_client() + + with trace_operation("vector_viz.get_qdrant_client"): + qdrant_client = await get_qdrant_client() chunk_vectors_map = {} # Map (doc_id, chunk_start, chunk_end) -> vector @@ -231,12 +257,16 @@ async def vector_visualization_search(request: Request) -> JSONResponse: if point_ids: # Single batch retrieve call instead of N sequential scroll calls # This is ~50x faster for 50 results (1 HTTP request vs 50) - points_response = await qdrant_client.retrieve( - collection_name=settings.get_collection_name(), - ids=point_ids, - with_vectors=["dense"], - with_payload=["doc_id", "chunk_start_offset", "chunk_end_offset"], - ) + with trace_operation( + "vector_viz.vector_retrieve", + attributes={"retrieve.num_points": len(point_ids)}, + ): + points_response = await qdrant_client.retrieve( + collection_name=settings.get_collection_name(), + ids=point_ids, + with_vectors=["dense"], + with_payload=["doc_id", "chunk_start_offset", "chunk_end_offset"], + ) # Build chunk_vectors_map from batch response for point in points_response: @@ -367,9 +397,16 @@ async def vector_visualization_search(request: Request) -> JSONResponse: import anyio - coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined] - lambda: _compute_pca(all_vectors_normalized) - ) + with trace_operation( + "vector_viz.pca_compute", + attributes={ + "pca.num_vectors": len(all_vectors_normalized), + "pca.embedding_dim": embedding_dim, + }, + ): + coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined] + lambda: _compute_pca(all_vectors_normalized) + ) pca_duration = time.perf_counter() - pca_start # After fit, these attributes are guaranteed to be set diff --git a/nextcloud_mcp_server/cli.py b/nextcloud_mcp_server/cli.py index 3b93cae..a7b9d06 100644 --- a/nextcloud_mcp_server/cli.py +++ b/nextcloud_mcp_server/cli.py @@ -29,9 +29,9 @@ from .app import get_app @click.option( "--transport", "-t", - default="sse", + default="streamable-http", show_default=True, - type=click.Choice(["sse", "streamable-http", "http"]), + type=click.Choice(["streamable-http", "http"]), help="MCP transport protocol", ) @click.option( diff --git a/nextcloud_mcp_server/search/bm25_hybrid.py b/nextcloud_mcp_server/search/bm25_hybrid.py index 717288b..a10e3db 100644 --- a/nextcloud_mcp_server/search/bm25_hybrid.py +++ b/nextcloud_mcp_server/search/bm25_hybrid.py @@ -9,6 +9,7 @@ from qdrant_client.models import FieldCondition, Filter, MatchValue from nextcloud_mcp_server.config import get_settings from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service from nextcloud_mcp_server.observability.metrics import record_qdrant_operation +from nextcloud_mcp_server.observability.tracing import trace_operation from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client @@ -99,15 +100,19 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm): ) # Generate dense embedding for semantic search - embedding_service = get_embedding_service() - dense_embedding = await embedding_service.embed(query) + with trace_operation("search.get_embedding_service"): + embedding_service = get_embedding_service() + with trace_operation("search.dense_embedding"): + dense_embedding = await embedding_service.embed(query) # Store for reuse by callers (e.g., viz_routes PCA visualization) self.query_embedding = dense_embedding logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})") # Generate sparse embedding for BM25 keyword search - bm25_service = get_bm25_service() - sparse_embedding = await bm25_service.encode_async(query) + with trace_operation("search.get_bm25_service"): + bm25_service = get_bm25_service() + with trace_operation("search.sparse_embedding_bm25"): + sparse_embedding = await bm25_service.encode_async(query) logger.debug( f"Generated sparse embedding " f"({len(sparse_embedding['indices'])} non-zero terms)" @@ -134,38 +139,44 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm): query_filter = Filter(must=filter_conditions) # Execute hybrid search with Qdrant native RRF fusion - qdrant_client = await get_qdrant_client() + with trace_operation("search.get_qdrant_client"): + qdrant_client = await get_qdrant_client() + try: # Use prefetch to run both dense and sparse searches # Qdrant will automatically merge results using RRF - search_response = await qdrant_client.query_points( - collection_name=settings.get_collection_name(), - prefetch=[ - # Dense semantic search - models.Prefetch( - query=dense_embedding, - using="dense", - limit=limit * 2, # Get extra for deduplication - filter=query_filter, - ), - # Sparse BM25 search - models.Prefetch( - query=models.SparseVector( - indices=sparse_embedding["indices"], - values=sparse_embedding["values"], + with trace_operation( + "search.qdrant_query", + attributes={"query.limit": limit * 2, "query.fusion": self.fusion_name}, + ): + search_response = await qdrant_client.query_points( + collection_name=settings.get_collection_name(), + prefetch=[ + # Dense semantic search + models.Prefetch( + query=dense_embedding, + using="dense", + limit=limit * 2, # Get extra for deduplication + filter=query_filter, ), - using="sparse", - limit=limit * 2, # Get extra for deduplication - filter=query_filter, - ), - ], - # Fusion query (RRF or DBSF based on initialization) - query=models.FusionQuery(fusion=self.fusion), - limit=limit * 2, # Get extra for deduplication - score_threshold=score_threshold, - with_payload=True, - with_vectors=False, # Don't return vectors to save bandwidth - ) + # Sparse BM25 search + models.Prefetch( + query=models.SparseVector( + indices=sparse_embedding["indices"], + values=sparse_embedding["values"], + ), + using="sparse", + limit=limit * 2, # Get extra for deduplication + filter=query_filter, + ), + ], + # Fusion query (RRF or DBSF based on initialization) + query=models.FusionQuery(fusion=self.fusion), + limit=limit * 2, # Get extra for deduplication + score_threshold=score_threshold, + with_payload=True, + with_vectors=False, # Don't return vectors to save bandwidth + ) record_qdrant_operation("search", "success") except Exception: record_qdrant_operation("search", "error") @@ -185,47 +196,51 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm): # Deduplicate by (doc_id, doc_type, chunk_start, chunk_end) # This allows multiple chunks from same doc, but removes duplicate chunks - seen_chunks = set() - results = [] + with trace_operation( + "search.deduplicate", + attributes={"dedupe.num_points": len(search_response.points)}, + ): + seen_chunks = set() + results = [] - for result in search_response.points: - # doc_id can be int (notes) or str (files - file paths) - doc_id = result.payload["doc_id"] - doc_type = result.payload.get("doc_type", "note") - chunk_start = result.payload.get("chunk_start_offset") - chunk_end = result.payload.get("chunk_end_offset") - chunk_key = (doc_id, doc_type, chunk_start, chunk_end) + for result in search_response.points: + # doc_id can be int (notes) or str (files - file paths) + doc_id = result.payload["doc_id"] + doc_type = result.payload.get("doc_type", "note") + chunk_start = result.payload.get("chunk_start_offset") + chunk_end = result.payload.get("chunk_end_offset") + chunk_key = (doc_id, doc_type, chunk_start, chunk_end) - # Skip if we've already seen this exact chunk - if chunk_key in seen_chunks: - continue + # Skip if we've already seen this exact chunk + if chunk_key in seen_chunks: + continue - seen_chunks.add(chunk_key) + seen_chunks.add(chunk_key) - # Return unverified results (verification happens at output stage) - results.append( - SearchResult( - id=doc_id, - doc_type=doc_type, - title=result.payload.get("title", "Untitled"), - excerpt=result.payload.get("excerpt", ""), - score=result.score, # Fusion score (RRF or DBSF) - metadata={ - "chunk_index": result.payload.get("chunk_index"), - "total_chunks": result.payload.get("total_chunks"), - "search_method": f"bm25_hybrid_{self.fusion_name}", - }, - chunk_start_offset=result.payload.get("chunk_start_offset"), - chunk_end_offset=result.payload.get("chunk_end_offset"), - page_number=result.payload.get("page_number"), - chunk_index=result.payload.get("chunk_index", 0), - total_chunks=result.payload.get("total_chunks", 1), - point_id=str(result.id), # Qdrant point ID for batch retrieval + # Return unverified results (verification happens at output stage) + results.append( + SearchResult( + id=doc_id, + doc_type=doc_type, + title=result.payload.get("title", "Untitled"), + excerpt=result.payload.get("excerpt", ""), + score=result.score, # Fusion score (RRF or DBSF) + metadata={ + "chunk_index": result.payload.get("chunk_index"), + "total_chunks": result.payload.get("total_chunks"), + "search_method": f"bm25_hybrid_{self.fusion_name}", + }, + chunk_start_offset=result.payload.get("chunk_start_offset"), + chunk_end_offset=result.payload.get("chunk_end_offset"), + page_number=result.payload.get("page_number"), + chunk_index=result.payload.get("chunk_index", 0), + total_chunks=result.payload.get("total_chunks", 1), + point_id=str(result.id), # Qdrant point ID for batch retrieval + ) ) - ) - if len(results) >= limit: - break + if len(results) >= limit: + break logger.info(f"Returning {len(results)} unverified results after deduplication") if results: diff --git a/tests/conftest.py b/tests/conftest.py index 953b1ce..e429f9a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,7 +9,6 @@ import pytest from httpx import HTTPStatusError from mcp import ClientSession from mcp.client.session import RequestContext -from mcp.client.sse import sse_client from mcp.client.streamable_http import streamablehttp_client from mcp.types import ElicitRequestParams, ElicitResult, ErrorData @@ -172,51 +171,6 @@ async def create_mcp_client_session( logger.debug(f"{client_name} client session cleaned up successfully") -async def create_mcp_client_session_sse( - url: str, - token: str | None = None, - client_name: str = "MCP", - elicitation_callback: Any = None, -) -> AsyncGenerator[ClientSession, Any]: - """ - Factory function to create an MCP client session using SSE transport. - - Similar to create_mcp_client_session but uses SSE transport instead of streamable-http. - Uses native async context managers to ensure correct LIFO cleanup order. - - Args: - url: MCP server URL (e.g., "http://localhost:8000/sse") - token: Optional OAuth access token for Bearer authentication - client_name: Client name for logging (e.g., "Basic MCP (SSE)") - elicitation_callback: Optional callback for handling elicitation requests - - Yields: - Initialized MCP ClientSession - - Note: - SSE transport is being deprecated in favor of streamable-http. - This function exists for compatibility testing only. - """ - logger.info(f"Creating SSE client for {client_name}") - - # Prepare headers with OAuth token if provided - headers = {"Authorization": f"Bearer {token}"} if token else None - - # Use native async with - Python ensures LIFO cleanup - # Cleanup order will be: ClientSession.__aexit__ -> sse_client.__aexit__ - # Note: sse_client yields only (read_stream, write_stream), not 3 values like streamablehttp_client - async with sse_client(url, headers=headers) as (read_stream, write_stream): - async with ClientSession( - read_stream, write_stream, elicitation_callback=elicitation_callback - ) as session: - await session.initialize() - logger.info(f"{client_name} client session initialized successfully") - yield session - - # Cleanup happens automatically in LIFO order - no exception suppression needed - logger.debug(f"{client_name} client session cleaned up successfully") - - @pytest.fixture(scope="session") async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]: """ @@ -255,18 +209,10 @@ async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]: @pytest.fixture(scope="session") async def nc_mcp_client(anyio_backend) -> AsyncGenerator[ClientSession, Any]: """ - Fixture to create an MCP client session for integration tests using SSE transport. + Fixture to create an MCP client session for integration tests using streamable-http. Uses anyio pytest plugin for proper async fixture handling. - - Note: SSE transport is being deprecated. This fixture uses SSE for compatibility testing. """ - - # async for session in create_mcp_client_session_sse( - # url="http://localhost:8000/sse", client_name="Basic MCP (SSE)" - # ): - # yield session - async for session in create_mcp_client_session( url="http://localhost:8000/mcp", client_name="Basic MCP (HTTP)",