refactor: Move background tasks to server lifespan and deprecate SSE transport

- Move scanner/processor tasks from FastMCP session lifespan to Starlette
  server lifespan (correct architecture: background tasks run once at
  server level, not per-session)
- Change default CLI transport from SSE to streamable-http
- Remove SSE transport option from CLI (SSE is deprecated)
- Remove SSE client session factory from test fixtures
- Add tracing instrumentation to BM25 hybrid search operations for
  better observability

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-23 04:02:30 +01:00
parent 2ab8dad6a5
commit fafeaf3d83
5 changed files with 314 additions and 405 deletions
+12 -101
View File
@@ -555,15 +555,15 @@ async def load_oauth_client_credentials(
@asynccontextmanager @asynccontextmanager
async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]: async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
""" """
Manage application lifecycle for BasicAuth mode. Manage application lifecycle for BasicAuth mode (FastMCP session lifespan).
Creates a single Nextcloud client with basic authentication Creates a single Nextcloud client with basic authentication
that is shared across all requests. that is shared across all requests within a session.
If vector sync is enabled (VECTOR_SYNC_ENABLED=true), also starts Note: Background tasks (scanner, processor) are started at server level
background tasks for automatic document indexing (ADR-007). in starlette_lifespan, not here. This lifespan runs per-session.
""" """
logger.info("Starting MCP server in BasicAuth mode") logger.info("Starting MCP session in BasicAuth mode")
logger.info("Creating Nextcloud client with BasicAuth") logger.info("Creating Nextcloud client with BasicAuth")
client = NextcloudClient.from_env() client = NextcloudClient.from_env()
@@ -579,90 +579,11 @@ async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]:
# Initialize document processors # Initialize document processors
initialize_document_processors() initialize_document_processors()
settings = get_settings() # Yield client context - scanner runs at server level (starlette_lifespan)
# Check if vector sync is enabled
if settings.vector_sync_enabled:
logger.info("Vector sync enabled - starting background tasks")
# Get username from environment for BasicAuth mode
username = os.getenv("NEXTCLOUD_USERNAME")
if not username:
raise ValueError(
"NEXTCLOUD_USERNAME is required for vector sync in BasicAuth mode"
)
# Initialize Qdrant collection before starting background tasks
logger.info("Initializing Qdrant collection...")
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
try:
await get_qdrant_client() # Triggers collection creation if needed
logger.info("Qdrant collection ready")
except Exception as e:
logger.error(f"Failed to initialize Qdrant collection: {e}")
raise RuntimeError(
f"Cannot start vector sync - Qdrant initialization failed: {e}"
) from e
# Initialize shared state
send_stream, receive_stream = anyio.create_memory_object_stream(
max_buffer_size=settings.vector_sync_queue_max_size
)
shutdown_event = anyio.Event()
scanner_wake_event = anyio.Event()
# Start background tasks using anyio TaskGroup
async with anyio.create_task_group() as tg:
# Start scanner task
await tg.start(
scanner_task,
send_stream,
shutdown_event,
scanner_wake_event,
client,
username,
)
# Start processor pool (each gets a cloned receive stream)
for i in range(settings.vector_sync_processor_workers):
await tg.start(
processor_task,
i,
receive_stream.clone(),
shutdown_event,
client,
username,
)
logger.info(
f"Background sync tasks started: 1 scanner + {settings.vector_sync_processor_workers} processors"
)
# Yield with background tasks running
try:
yield AppContext(
client=client,
storage=storage,
document_send_stream=send_stream,
document_receive_stream=receive_stream,
shutdown_event=shutdown_event,
scanner_wake_event=scanner_wake_event,
)
finally:
# Shutdown signal
logger.info("Shutting down background sync tasks")
shutdown_event.set()
# TaskGroup automatically cancels all tasks on exit
logger.info("Background sync tasks stopped")
await client.close()
else:
# No vector sync - simple lifecycle
try: try:
yield AppContext(client=client, storage=storage) yield AppContext(client=client, storage=storage)
finally: finally:
logger.info("Shutting down BasicAuth mode") logger.info("Shutting down BasicAuth session")
await client.close() await client.close()
@@ -979,7 +900,7 @@ async def setup_oauth_config():
) )
def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = None):
# Initialize observability (logging will be configured by uvicorn) # Initialize observability (logging will be configured by uvicorn)
settings = get_settings() settings = get_settings()
@@ -1197,10 +1118,6 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
"Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)" "Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)"
) )
if transport == "sse":
mcp_app = mcp.sse_app()
starlette_lifespan = None
elif transport in ("http", "streamable-http"):
mcp_app = mcp.streamable_http_app() mcp_app = mcp.streamable_http_app()
@asynccontextmanager @asynccontextmanager
@@ -1211,9 +1128,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
mcp_server_url = os.getenv( mcp_server_url = os.getenv(
"NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000" "NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000"
) )
nextcloud_resource_uri = os.getenv( nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host)
"NEXTCLOUD_RESOURCE_URI", nextcloud_host
)
discovery_url = os.getenv( discovery_url = os.getenv(
"OIDC_DISCOVERY_URL", "OIDC_DISCOVERY_URL",
f"{nextcloud_host}/.well-known/openid-configuration", f"{nextcloud_host}/.well-known/openid-configuration",
@@ -1270,8 +1185,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
break break
# Start background vector sync tasks for BasicAuth mode (ADR-007) # Start background vector sync tasks for BasicAuth mode (ADR-007)
# For streamable-http transport, FastMCP lifespan isn't automatically triggered # Scanner runs at server-level (once), not per-session
# so we manually start background tasks here if vector sync is enabled
import anyio as anyio_module import anyio as anyio_module
settings = get_settings() settings = get_settings()
@@ -1285,8 +1199,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
"NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode" "NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode"
) )
# Get Nextcloud client from MCP app context # Create client for vector sync (server-level, not per-session)
# Create client since we're outside FastMCP lifespan
client = NextcloudClient.from_env() client = NextcloudClient.from_env()
# Initialize Qdrant collection before starting background tasks # Initialize Qdrant collection before starting background tasks
@@ -1322,9 +1235,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
route.app.state.document_receive_stream = receive_stream route.app.state.document_receive_stream = receive_stream
route.app.state.shutdown_event = shutdown_event route.app.state.shutdown_event = shutdown_event
route.app.state.scanner_wake_event = scanner_wake_event route.app.state.scanner_wake_event = scanner_wake_event
logger.info( logger.info("Vector sync state shared with browser_app for /app")
"Vector sync state shared with browser_app for /app"
)
break break
# Start background tasks using anyio TaskGroup # Start background tasks using anyio TaskGroup
+38 -1
View File
@@ -22,6 +22,7 @@ from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse from starlette.responses import HTMLResponse, JSONResponse
from nextcloud_mcp_server.config import get_settings from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.search import ( from nextcloud_mcp_server.search import (
BM25HybridSearchAlgorithm, BM25HybridSearchAlgorithm,
SemanticSearchAlgorithm, SemanticSearchAlgorithm,
@@ -139,7 +140,10 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
_get_authenticated_client_for_userinfo, _get_authenticated_client_for_userinfo,
) )
async with await _get_authenticated_client_for_userinfo(request) as nc_client: # noqa: F841 with trace_operation("vector_viz.get_auth_client"):
auth_client_ctx = await _get_authenticated_client_for_userinfo(request)
async with auth_client_ctx as nc_client: # noqa: F841
# Create search algorithm (no client needed - verification removed) # Create search algorithm (no client needed - verification removed)
if algorithm == "semantic": if algorithm == "semantic":
search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold) search_algo = SemanticSearchAlgorithm(score_threshold=score_threshold)
@@ -159,6 +163,14 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
all_results = [] all_results = []
if doc_types is None or len(doc_types) == 0: if doc_types is None or len(doc_types) == 0:
# Cross-app search - search all indexed types # Cross-app search - search all indexed types
with trace_operation(
"vector_viz.search_execute",
attributes={
"search.algorithm": algorithm,
"search.limit": limit * 2,
"search.doc_type": "all",
},
):
unverified_results = await search_algo.search( unverified_results = await search_algo.search(
query=query, query=query,
user_id=username, user_id=username,
@@ -170,6 +182,14 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
else: else:
# Search each document type and combine # Search each document type and combine
for doc_type in doc_types: for doc_type in doc_types:
with trace_operation(
"vector_viz.search_execute",
attributes={
"search.algorithm": algorithm,
"search.limit": limit * 2,
"search.doc_type": doc_type,
},
):
unverified_results = await search_algo.search( unverified_results = await search_algo.search(
query=query, query=query,
user_id=username, user_id=username,
@@ -190,6 +210,10 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Store original scores and normalize for visualization # Store original scores and normalize for visualization
# (best result = 1.0, worst result = 0.0 within THIS result set) # (best result = 1.0, worst result = 0.0 within THIS result set)
# This makes visual encoding meaningful regardless of RRF normalization # This makes visual encoding meaningful regardless of RRF normalization
with trace_operation(
"vector_viz.score_normalize",
attributes={"normalize.num_results": len(search_results)},
):
if search_results: if search_results:
scores = [r.score for r in search_results] scores = [r.score for r in search_results]
min_score, max_score = min(scores), max(scores) min_score, max_score = min(scores), max(scores)
@@ -220,6 +244,8 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
# Fetch vectors for specific matching chunks from Qdrant using batch retrieve # Fetch vectors for specific matching chunks from Qdrant using batch retrieve
vector_fetch_start = time.perf_counter() vector_fetch_start = time.perf_counter()
with trace_operation("vector_viz.get_qdrant_client"):
qdrant_client = await get_qdrant_client() qdrant_client = await get_qdrant_client()
chunk_vectors_map = {} # Map (doc_id, chunk_start, chunk_end) -> vector chunk_vectors_map = {} # Map (doc_id, chunk_start, chunk_end) -> vector
@@ -231,6 +257,10 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
if point_ids: if point_ids:
# Single batch retrieve call instead of N sequential scroll calls # Single batch retrieve call instead of N sequential scroll calls
# This is ~50x faster for 50 results (1 HTTP request vs 50) # This is ~50x faster for 50 results (1 HTTP request vs 50)
with trace_operation(
"vector_viz.vector_retrieve",
attributes={"retrieve.num_points": len(point_ids)},
):
points_response = await qdrant_client.retrieve( points_response = await qdrant_client.retrieve(
collection_name=settings.get_collection_name(), collection_name=settings.get_collection_name(),
ids=point_ids, ids=point_ids,
@@ -367,6 +397,13 @@ async def vector_visualization_search(request: Request) -> JSONResponse:
import anyio import anyio
with trace_operation(
"vector_viz.pca_compute",
attributes={
"pca.num_vectors": len(all_vectors_normalized),
"pca.embedding_dim": embedding_dim,
},
):
coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined] coords_3d, pca = await anyio.to_thread.run_sync( # type: ignore[attr-defined]
lambda: _compute_pca(all_vectors_normalized) lambda: _compute_pca(all_vectors_normalized)
) )
+2 -2
View File
@@ -29,9 +29,9 @@ from .app import get_app
@click.option( @click.option(
"--transport", "--transport",
"-t", "-t",
default="sse", default="streamable-http",
show_default=True, show_default=True,
type=click.Choice(["sse", "streamable-http", "http"]), type=click.Choice(["streamable-http", "http"]),
help="MCP transport protocol", help="MCP transport protocol",
) )
@click.option( @click.option(
@@ -9,6 +9,7 @@ from qdrant_client.models import FieldCondition, Filter, MatchValue
from nextcloud_mcp_server.config import get_settings from nextcloud_mcp_server.config import get_settings
from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service from nextcloud_mcp_server.embedding import get_bm25_service, get_embedding_service
from nextcloud_mcp_server.observability.metrics import record_qdrant_operation from nextcloud_mcp_server.observability.metrics import record_qdrant_operation
from nextcloud_mcp_server.observability.tracing import trace_operation
from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult from nextcloud_mcp_server.search.algorithms import SearchAlgorithm, SearchResult
from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
@@ -99,14 +100,18 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
) )
# Generate dense embedding for semantic search # Generate dense embedding for semantic search
with trace_operation("search.get_embedding_service"):
embedding_service = get_embedding_service() embedding_service = get_embedding_service()
with trace_operation("search.dense_embedding"):
dense_embedding = await embedding_service.embed(query) dense_embedding = await embedding_service.embed(query)
# Store for reuse by callers (e.g., viz_routes PCA visualization) # Store for reuse by callers (e.g., viz_routes PCA visualization)
self.query_embedding = dense_embedding self.query_embedding = dense_embedding
logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})") logger.debug(f"Generated dense embedding (dimension={len(dense_embedding)})")
# Generate sparse embedding for BM25 keyword search # Generate sparse embedding for BM25 keyword search
with trace_operation("search.get_bm25_service"):
bm25_service = get_bm25_service() bm25_service = get_bm25_service()
with trace_operation("search.sparse_embedding_bm25"):
sparse_embedding = await bm25_service.encode_async(query) sparse_embedding = await bm25_service.encode_async(query)
logger.debug( logger.debug(
f"Generated sparse embedding " f"Generated sparse embedding "
@@ -134,10 +139,16 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
query_filter = Filter(must=filter_conditions) query_filter = Filter(must=filter_conditions)
# Execute hybrid search with Qdrant native RRF fusion # Execute hybrid search with Qdrant native RRF fusion
with trace_operation("search.get_qdrant_client"):
qdrant_client = await get_qdrant_client() qdrant_client = await get_qdrant_client()
try: try:
# Use prefetch to run both dense and sparse searches # Use prefetch to run both dense and sparse searches
# Qdrant will automatically merge results using RRF # Qdrant will automatically merge results using RRF
with trace_operation(
"search.qdrant_query",
attributes={"query.limit": limit * 2, "query.fusion": self.fusion_name},
):
search_response = await qdrant_client.query_points( search_response = await qdrant_client.query_points(
collection_name=settings.get_collection_name(), collection_name=settings.get_collection_name(),
prefetch=[ prefetch=[
@@ -185,6 +196,10 @@ class BM25HybridSearchAlgorithm(SearchAlgorithm):
# Deduplicate by (doc_id, doc_type, chunk_start, chunk_end) # Deduplicate by (doc_id, doc_type, chunk_start, chunk_end)
# This allows multiple chunks from same doc, but removes duplicate chunks # This allows multiple chunks from same doc, but removes duplicate chunks
with trace_operation(
"search.deduplicate",
attributes={"dedupe.num_points": len(search_response.points)},
):
seen_chunks = set() seen_chunks = set()
results = [] results = []
+1 -55
View File
@@ -9,7 +9,6 @@ import pytest
from httpx import HTTPStatusError from httpx import HTTPStatusError
from mcp import ClientSession from mcp import ClientSession
from mcp.client.session import RequestContext from mcp.client.session import RequestContext
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client from mcp.client.streamable_http import streamablehttp_client
from mcp.types import ElicitRequestParams, ElicitResult, ErrorData from mcp.types import ElicitRequestParams, ElicitResult, ErrorData
@@ -172,51 +171,6 @@ async def create_mcp_client_session(
logger.debug(f"{client_name} client session cleaned up successfully") logger.debug(f"{client_name} client session cleaned up successfully")
async def create_mcp_client_session_sse(
url: str,
token: str | None = None,
client_name: str = "MCP",
elicitation_callback: Any = None,
) -> AsyncGenerator[ClientSession, Any]:
"""
Factory function to create an MCP client session using SSE transport.
Similar to create_mcp_client_session but uses SSE transport instead of streamable-http.
Uses native async context managers to ensure correct LIFO cleanup order.
Args:
url: MCP server URL (e.g., "http://localhost:8000/sse")
token: Optional OAuth access token for Bearer authentication
client_name: Client name for logging (e.g., "Basic MCP (SSE)")
elicitation_callback: Optional callback for handling elicitation requests
Yields:
Initialized MCP ClientSession
Note:
SSE transport is being deprecated in favor of streamable-http.
This function exists for compatibility testing only.
"""
logger.info(f"Creating SSE client for {client_name}")
# Prepare headers with OAuth token if provided
headers = {"Authorization": f"Bearer {token}"} if token else None
# Use native async with - Python ensures LIFO cleanup
# Cleanup order will be: ClientSession.__aexit__ -> sse_client.__aexit__
# Note: sse_client yields only (read_stream, write_stream), not 3 values like streamablehttp_client
async with sse_client(url, headers=headers) as (read_stream, write_stream):
async with ClientSession(
read_stream, write_stream, elicitation_callback=elicitation_callback
) as session:
await session.initialize()
logger.info(f"{client_name} client session initialized successfully")
yield session
# Cleanup happens automatically in LIFO order - no exception suppression needed
logger.debug(f"{client_name} client session cleaned up successfully")
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]: async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]:
""" """
@@ -255,18 +209,10 @@ async def nc_client(anyio_backend) -> AsyncGenerator[NextcloudClient, Any]:
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
async def nc_mcp_client(anyio_backend) -> AsyncGenerator[ClientSession, Any]: async def nc_mcp_client(anyio_backend) -> AsyncGenerator[ClientSession, Any]:
""" """
Fixture to create an MCP client session for integration tests using SSE transport. Fixture to create an MCP client session for integration tests using streamable-http.
Uses anyio pytest plugin for proper async fixture handling. Uses anyio pytest plugin for proper async fixture handling.
Note: SSE transport is being deprecated. This fixture uses SSE for compatibility testing.
""" """
# async for session in create_mcp_client_session_sse(
# url="http://localhost:8000/sse", client_name="Basic MCP (SSE)"
# ):
# yield session
async for session in create_mcp_client_session( async for session in create_mcp_client_session(
url="http://localhost:8000/mcp", url="http://localhost:8000/mcp",
client_name="Basic MCP (HTTP)", client_name="Basic MCP (HTTP)",