from __future__ import annotations import base64 import json import logging import os import time import traceback from collections.abc import AsyncIterator from contextlib import AsyncExitStack, asynccontextmanager from contextvars import ContextVar from dataclasses import dataclass from typing import Optional, cast from urllib.parse import parse_qs, urlparse import anyio import click import httpx from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from mcp.server.auth.settings import AuthSettings from mcp.server.fastmcp import Context, FastMCP from mcp.server.transport_security import TransportSecuritySettings from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from pydantic import AnyHttpUrl from starlette.applications import Starlette from starlette.middleware.authentication import AuthenticationMiddleware from starlette.middleware.cors import CORSMiddleware from starlette.responses import JSONResponse, RedirectResponse from starlette.routing import Mount, Route from starlette.staticfiles import StaticFiles from starlette.types import ASGIApp, Receive, Send from starlette.types import Scope as StarletteScope from nextcloud_mcp_server.api import ( create_webhook, delete_app_password, delete_webhook, get_app_password_status, get_chunk_context, get_installed_apps, get_pdf_preview, get_server_status, get_user_access, get_user_session, get_vector_sync_status, list_supported_scopes, list_webhooks, provision_app_password, revoke_user_access, unified_search, update_user_scopes, vector_search, ) from nextcloud_mcp_server.auth import ( InsufficientScopeError, discover_all_scopes, get_access_token_scopes, has_required_scopes, is_jwt_token, ) from nextcloud_mcp_server.auth.browser_oauth_routes import ( oauth_login, oauth_login_callback, oauth_logout, ) from nextcloud_mcp_server.auth.client_registration import ensure_oauth_client from nextcloud_mcp_server.auth.keycloak_oauth import KeycloakOAuthClient from nextcloud_mcp_server.auth.oauth_routes import ( oauth_as_metadata, oauth_authorize, oauth_authorize_nextcloud, oauth_callback, oauth_callback_nextcloud, oauth_register_proxy, oauth_token_endpoint, ) from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend from nextcloud_mcp_server.auth.storage import RefreshTokenStorage, get_shared_storage from nextcloud_mcp_server.auth.token_broker import TokenBrokerService from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier from nextcloud_mcp_server.auth.userinfo_routes import ( revoke_session, user_info_html, vector_sync_status_fragment, ) from nextcloud_mcp_server.auth.viz_routes import ( chunk_context_endpoint, vector_visualization_html, vector_visualization_search, ) from nextcloud_mcp_server.auth.webhook_routes import ( disable_webhook_preset, enable_webhook_preset, webhook_management_pane, ) from nextcloud_mcp_server.client import NextcloudClient from nextcloud_mcp_server.config import ( DeploymentMode, Settings, get_document_processor_config, get_settings, ) from nextcloud_mcp_server.config_validators import ( AuthMode, get_mode_summary, validate_configuration, ) from nextcloud_mcp_server.context import get_client as get_nextcloud_client from nextcloud_mcp_server.document_processors import get_registry from nextcloud_mcp_server.http import nextcloud_httpx_client from nextcloud_mcp_server.observability import ( ObservabilityMiddleware, setup_metrics, setup_tracing, ) from nextcloud_mcp_server.observability.metrics import ( record_dependency_check, set_dependency_health, ) from nextcloud_mcp_server.server import ( configure_calendar_tools, configure_contacts_tools, configure_cookbook_tools, configure_deck_tools, configure_news_tools, configure_notes_tools, configure_semantic_tools, configure_sharing_tools, configure_tables_tools, configure_webdav_tools, ) from nextcloud_mcp_server.server.auth_tools import register_auth_tools from nextcloud_mcp_server.server.oauth_tools import register_oauth_tools from nextcloud_mcp_server.vector import processor_task, scanner_task from nextcloud_mcp_server.vector.oauth_sync import ( oauth_processor_task, user_manager_task, ) from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client logger = logging.getLogger(__name__) HTTPXClientInstrumentor().instrument() def initialize_document_processors(): """Initialize and register document processors based on configuration. This function reads the environment configuration and registers available processors (Unstructured, Tesseract, Custom HTTP) with the global registry. """ config = get_document_processor_config() if not config["enabled"]: logger.info("Document processing disabled") return registry = get_registry() registered_count = 0 # Register Unstructured processor if "unstructured" in config["processors"]: unst_config = config["processors"]["unstructured"] try: from nextcloud_mcp_server.document_processors.unstructured import ( # noqa: PLC0415 UnstructuredProcessor, ) processor = UnstructuredProcessor( api_url=unst_config["api_url"], timeout=unst_config["timeout"], default_strategy=unst_config["strategy"], default_languages=unst_config["languages"], progress_interval=unst_config.get("progress_interval", 10), ) registry.register(processor, priority=10) logger.info(f"Registered Unstructured processor: {unst_config['api_url']}") registered_count += 1 except Exception as e: logger.warning(f"Failed to register Unstructured processor: {e}") # Register Tesseract processor if "tesseract" in config["processors"]: tess_config = config["processors"]["tesseract"] try: from nextcloud_mcp_server.document_processors.tesseract import ( # noqa: PLC0415 TesseractProcessor, ) processor = TesseractProcessor( tesseract_cmd=tess_config.get("tesseract_cmd"), default_lang=tess_config["lang"], ) registry.register(processor, priority=5) logger.info(f"Registered Tesseract processor: lang={tess_config['lang']}") registered_count += 1 except Exception as e: logger.warning(f"Failed to register Tesseract processor: {e}") # Register PyMuPDF processor (high priority, local, no API required) if "pymupdf" in config["processors"]: pymupdf_config = config["processors"]["pymupdf"] try: from nextcloud_mcp_server.document_processors.pymupdf import ( # noqa: PLC0415 PyMuPDFProcessor, ) processor = PyMuPDFProcessor( extract_images=pymupdf_config.get("extract_images", True), image_dir=pymupdf_config.get("image_dir"), ) registry.register(processor, priority=15) # Higher than unstructured logger.info( f"Registered PyMuPDF processor: extract_images={pymupdf_config.get('extract_images', True)}" ) registered_count += 1 except Exception as e: logger.warning(f"Failed to register PyMuPDF processor: {e}") # Register custom processor if "custom" in config["processors"]: custom_config = config["processors"]["custom"] try: from nextcloud_mcp_server.document_processors.custom_http import ( # noqa: PLC0415 CustomHTTPProcessor, ) processor = CustomHTTPProcessor( name=custom_config["name"], api_url=custom_config["api_url"], api_key=custom_config.get("api_key"), timeout=custom_config["timeout"], supported_types=custom_config["supported_types"], ) registry.register(processor, priority=1) logger.info( f"Registered Custom processor '{custom_config['name']}': {custom_config['api_url']}" ) registered_count += 1 except Exception as e: logger.warning(f"Failed to register Custom processor: {e}") if registered_count > 0: logger.info( f"Document processing initialized with {registered_count} processor(s): " f"{', '.join(registry.list_processors())}" ) else: logger.warning("Document processing enabled but no processors registered") def validate_pkce_support(discovery: dict, discovery_url: str) -> None: """ Validate that the OIDC provider properly advertises PKCE support. According to RFC 8414, if code_challenge_methods_supported is absent, it means the authorization server does not support PKCE. MCP clients require PKCE with S256 and will refuse to connect if this field is missing or doesn't include S256. """ code_challenge_methods = discovery.get("code_challenge_methods_supported") if code_challenge_methods is None: click.echo("=" * 80, err=True) click.echo( "ERROR: OIDC CONFIGURATION ERROR - Missing PKCE Support Advertisement", err=True, ) click.echo("=" * 80, err=True) click.echo(f"Discovery URL: {discovery_url}", err=True) click.echo("", err=True) click.echo( "The OIDC discovery document is missing 'code_challenge_methods_supported'.", err=True, ) click.echo( "According to RFC 8414, this means the server does NOT support PKCE.", err=True, ) click.echo("", err=True) click.echo("⚠️ MCP clients (like Claude Code) WILL REJECT this provider!") click.echo("", err=True) click.echo("How to fix:", err=True) click.echo( " 1. Ensure PKCE is enabled in Nextcloud OIDC app settings", err=True ) click.echo( " 2. Update the OIDC app to advertise PKCE support in discovery", err=True ) click.echo(" 3. See: RFC 8414 Section 2 (Authorization Server Metadata)") click.echo("=" * 80, err=True) click.echo("", err=True) return if "S256" not in code_challenge_methods: click.echo("=" * 80, err=True) click.echo( "WARNING: OIDC CONFIGURATION WARNING - S256 Challenge Method Not Advertised", err=True, ) click.echo("=" * 80, err=True) click.echo(f"Discovery URL: {discovery_url}", err=True) click.echo(f"Advertised methods: {code_challenge_methods}", err=True) click.echo("", err=True) click.echo("MCP specification requires S256 code challenge method.", err=True) click.echo("Some clients may reject this provider.", err=True) click.echo("=" * 80, err=True) click.echo("", err=True) return click.echo(f"✓ PKCE support validated: {code_challenge_methods}") @dataclass class VectorSyncState: """ Module-level state for vector sync background tasks. This singleton bridges the Starlette server lifespan (where background tasks run) and FastMCP session lifespans (where MCP tools need access to the streams). """ document_send_stream: Optional[MemoryObjectSendStream] = None document_receive_stream: Optional[MemoryObjectReceiveStream] = None shutdown_event: Optional[anyio.Event] = None scanner_wake_event: Optional[anyio.Event] = None # Module-level singleton for vector sync state _vector_sync_state = VectorSyncState() @dataclass class AppContext: """Application context for BasicAuth mode.""" client: NextcloudClient storage: Optional["RefreshTokenStorage"] = None document_send_stream: Optional[MemoryObjectSendStream] = None document_receive_stream: Optional[MemoryObjectReceiveStream] = None shutdown_event: Optional[anyio.Event] = None scanner_wake_event: Optional[anyio.Event] = None @dataclass class OAuthAppContext: """Application context for OAuth mode.""" nextcloud_host: str token_verifier: object # UnifiedTokenVerifier (ADR-005 compliant) refresh_token_storage: Optional["RefreshTokenStorage"] = None oauth_client: Optional[object] = None # NextcloudOAuthClient or KeycloakOAuthClient oauth_provider: str = "nextcloud" # "nextcloud" or "keycloak" server_client_id: Optional[str] = ( None # MCP server's OAuth client ID (static or DCR) ) @dataclass class SmitheryAppContext: """Application context for Smithery stateless mode. ADR-016: No shared client - clients created per-request from session config. """ pass # No shared state needed - everything comes from session config # ADR-016: Smithery config schema for container runtime # This schema is served at /.well-known/mcp-config for Smithery discovery # See: https://smithery.ai/docs/build/session-config SMITHERY_CONFIG_SCHEMA = { "$schema": "http://json-schema.org/draft-07/schema#", "$id": "https://server.smithery.ai/nextcloud-mcp-server/.well-known/mcp-config", "title": "Nextcloud MCP Server Configuration", "description": "Configuration for connecting to your Nextcloud instance via app password authentication", "x-query-style": "flat", # Our schema has no nested objects, so flat style works "type": "object", "required": ["nextcloud_url", "username", "app_password"], "properties": { "nextcloud_url": { "type": "string", "title": "Nextcloud URL", "description": "Your Nextcloud instance URL (e.g., https://cloud.example.com). Must be publicly accessible.", "pattern": "^https?://.+", }, "username": { "type": "string", "title": "Username", "description": "Your Nextcloud username", "minLength": 1, }, "app_password": { "type": "string", "title": "App Password", "description": "Nextcloud app password. Generate at Settings > Security > App passwords. Do NOT use your main password.", "minLength": 1, }, }, "additionalProperties": False, } # ADR-016: Context variable to hold Smithery session config per-request # This is set by SmitheryConfigMiddleware and accessed in context.py _smithery_session_config: ContextVar[dict[str, str] | None] = ContextVar( "smithery_session_config" ) _smithery_session_config.set(None) # Set initial value def get_smithery_session_config() -> dict | None: """Get the current Smithery session config from context variable. Used by context.py to access config extracted from URL query parameters. """ return _smithery_session_config.get() class BasicAuthMiddleware: """Middleware to extract BasicAuth credentials from Authorization header. For multi-user BasicAuth pass-through mode, this middleware extracts username/password from the Authorization: Basic header and stores them in the request state for use by the context layer. The credentials are NOT stored persistently - they are passed through directly to Nextcloud APIs for each request (stateless). """ def __init__(self, app: ASGIApp): self.app = app async def __call__( self, scope: StarletteScope, receive: Receive, send: Send ) -> None: if scope["type"] == "http": # Extract Authorization header headers = dict(scope.get("headers", [])) auth_header = headers.get(b"authorization", b"") if auth_header.startswith(b"Basic "): try: # Decode base64(username:password) encoded = auth_header[6:] # Skip "Basic " decoded = base64.b64decode(encoded).decode("utf-8") username, password = decoded.split(":", 1) # Store in request state scope.setdefault("state", {}) scope["state"]["basic_auth"] = { "username": username, "password": password, } logger.debug( f"BasicAuth credentials extracted for user: {username}" ) except Exception as e: logger.warning(f"Failed to extract BasicAuth credentials: {e}") await self.app(scope, receive, send) class SmitheryConfigMiddleware: """Middleware to extract Smithery config from URL query parameters. ADR-016: For container runtime, Smithery passes configuration as URL query parameters to the /mcp endpoint. This middleware extracts those parameters and stores them in a context variable for access in tools. Configuration parameters: - nextcloud_url: Nextcloud instance URL - username: Nextcloud username - app_password: Nextcloud app password The extracted config is stored in a ContextVar and can be accessed via get_smithery_session_config() in context.py. """ def __init__(self, app: ASGIApp): self.app = app async def __call__( self, scope: StarletteScope, receive: Receive, send: Send ) -> None: if scope["type"] == "http": # Extract config from query parameters query_string = scope.get("query_string", b"").decode("utf-8") params = parse_qs(query_string) # Build session config from query parameters # Smithery uses dot notation for nested objects, but our schema is flat session_config = {} for key in ["nextcloud_url", "username", "app_password"]: if key in params: # parse_qs returns lists, take first value session_config[key] = params[key][0] # Store in context variable for access by context.py if session_config: _smithery_session_config.set(session_config) logger.debug( f"Smithery config extracted: nextcloud_url={session_config.get('nextcloud_url')}, " f"username={session_config.get('username')}" ) try: await self.app(scope, receive, send) finally: # Clear context variable after request _smithery_session_config.set(None) @asynccontextmanager async def app_lifespan_smithery(server: FastMCP) -> AsyncIterator[SmitheryAppContext]: """ Manage application lifecycle for Smithery stateless mode. ADR-016: Minimal lifespan with no shared state. - No shared Nextcloud client (created per-request from session config) - No vector sync (disabled in Smithery mode) - No persistent storage (stateless deployment) - No document processors (not enabled in Smithery mode) """ logger.info("Starting MCP server in Smithery stateless mode") logger.info("Clients will be created per-request from session config") try: yield SmitheryAppContext() finally: logger.info("Shutting down Smithery stateless mode") async def load_oauth_client_credentials( nextcloud_host: str, registration_endpoint: str | None ) -> tuple[str, str]: """ Load OAuth client credentials from environment, storage file, or dynamic registration. This consolidates the client loading logic that was duplicated across multiple functions. Args: nextcloud_host: Nextcloud instance URL registration_endpoint: Dynamic registration endpoint URL (or None if not available) Returns: Tuple of (client_id, client_secret) Raises: ValueError: If credentials cannot be obtained """ # Try environment variables first client_id = os.getenv("NEXTCLOUD_OIDC_CLIENT_ID") client_secret = os.getenv("NEXTCLOUD_OIDC_CLIENT_SECRET") if client_id and client_secret: logger.info("Using pre-configured OAuth client credentials from environment") return (client_id, client_secret) # Try loading from SQLite storage try: storage = RefreshTokenStorage.from_env() await storage.initialize() client_data = await storage.get_oauth_client() if client_data: logger.info( f"Loaded OAuth client from SQLite: {client_data['client_id'][:16]}..." ) return (client_data["client_id"], client_data["client_secret"]) except ValueError: # TOKEN_ENCRYPTION_KEY not set, skip SQLite storage check logger.debug("SQLite storage not available (TOKEN_ENCRYPTION_KEY not set)") # Try dynamic registration if available if registration_endpoint: logger.info("Dynamic client registration available") mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") redirect_uris = [ f"{mcp_server_url}/oauth/callback", # Unified callback (flow determined by query param) ] # MCP server DCR: Register with ALL supported scopes # When we register as a resource server (with resource_url), the allowed_scopes # represent what scopes are AVAILABLE for this resource, not what the server needs. # External clients will request tokens with resource=http://localhost:8001/mcp # and the authorization server will limit them to these allowed scopes. # # The PRM endpoint advertises the same scopes dynamically via @require_scopes decorators. dcr_scopes = "openid profile email notes:read notes:write calendar:read calendar:write todo:read todo:write contacts:read contacts:write cookbook:read cookbook:write deck:read deck:write tables:read tables:write files:read files:write sharing:read sharing:write news:read news:write" # Add offline_access scope if refresh tokens are enabled # Use settings.enable_offline_access which handles both ENABLE_BACKGROUND_OPERATIONS (new) # and ENABLE_OFFLINE_ACCESS (deprecated) environment variables dcr_settings = get_settings() enable_offline_access = dcr_settings.enable_offline_access if enable_offline_access: dcr_scopes = f"{dcr_scopes} offline_access" logger.info("✓ offline_access scope enabled for refresh tokens") logger.info(f"MCP server DCR scopes (resource server): {dcr_scopes}") # Get token type from environment (Bearer or jwt) # Note: Must be lowercase "jwt" to match OIDC app's check token_type = os.getenv("NEXTCLOUD_OIDC_TOKEN_TYPE", "Bearer").lower() # Special case: "bearer" should remain capitalized for compatibility if token_type != "jwt": token_type = "Bearer" logger.info(f"Requesting token type: {token_type}") # Ensure OAuth client in SQLite storage storage = RefreshTokenStorage.from_env() await storage.initialize() # RFC 9728: resource_url must be a URL for the protected resource # This URL is used by token introspection to match tokens to this client resource_url = f"{mcp_server_url}/mcp" client_info = await ensure_oauth_client( nextcloud_url=nextcloud_host, registration_endpoint=registration_endpoint, storage=storage, client_name=f"Nextcloud MCP Server ({token_type})", redirect_uris=redirect_uris, scopes=dcr_scopes, # Use DCR-specific scopes (basic OIDC only) token_type=token_type, resource_url=resource_url, # RFC 9728 Protected Resource URL ) logger.info(f"OAuth client ready: {client_info.client_id[:16]}...") return (client_info.client_id, client_info.client_secret) # No credentials available raise ValueError( "OAuth mode requires either:\n" "1. NEXTCLOUD_OIDC_CLIENT_ID and NEXTCLOUD_OIDC_CLIENT_SECRET environment variables, OR\n" "2. Pre-existing client credentials in SQLite storage (TOKEN_STORAGE_DB), OR\n" "3. Dynamic client registration enabled on Nextcloud OIDC app\n\n" "Note: TOKEN_ENCRYPTION_KEY is required for SQLite storage" ) @asynccontextmanager async def app_lifespan_basic(server: FastMCP) -> AsyncIterator[AppContext]: """ Manage application lifecycle for BasicAuth mode (FastMCP session lifespan). For single-user mode: Creates a single Nextcloud client with basic authentication that is shared across all requests within a session. For multi-user mode: No shared client - clients created per-request by BasicAuthMiddleware. Note: Background tasks (scanner, processor) are started at server level in starlette_lifespan, not here. This lifespan runs per-session. """ settings = get_settings() is_multi_user = settings.enable_multi_user_basic_auth logger.info( f"Starting MCP session in {'multi-user' if is_multi_user else 'single-user'} BasicAuth mode" ) # Only create shared client for single-user mode client = None if not is_multi_user: logger.info("Creating shared Nextcloud client with BasicAuth") client = NextcloudClient.from_env() logger.info("Client initialization complete") else: logger.info( "Multi-user mode - clients created per-request from BasicAuth headers" ) # Initialize persistent storage (for webhook tracking and future features) storage = RefreshTokenStorage.from_env() await storage.initialize() logger.info("Persistent storage initialized (webhook tracking enabled)") # Initialize document processors initialize_document_processors() # Yield client context - scanner runs at server level (starlette_lifespan) # Include vector sync state from module singleton (set by starlette_lifespan) try: yield AppContext( client=client, # type: ignore[arg-type] # None in multi-user mode storage=storage, document_send_stream=_vector_sync_state.document_send_stream, document_receive_stream=_vector_sync_state.document_receive_stream, shutdown_event=_vector_sync_state.shutdown_event, scanner_wake_event=_vector_sync_state.scanner_wake_event, ) finally: logger.info("Shutting down BasicAuth session") if client is not None: await client.close() async def setup_oauth_config(): """ Setup OAuth configuration by performing OIDC discovery and client registration. Auto-detects OAuth provider mode: - Integrated mode: OIDC_DISCOVERY_URL points to NEXTCLOUD_HOST (or not set) → Nextcloud OIDC app provides both OAuth and API access - External IdP mode: OIDC_DISCOVERY_URL points to external provider → External IdP for OAuth, Nextcloud user_oidc validates tokens and provides API access Uses OIDC environment variables: - OIDC_DISCOVERY_URL: OIDC discovery endpoint (optional, defaults to NEXTCLOUD_HOST) - NEXTCLOUD_OIDC_CLIENT_ID / NEXTCLOUD_OIDC_CLIENT_SECRET: Static credentials (optional, uses DCR if not provided) - NEXTCLOUD_OIDC_SCOPES: Requested OAuth scopes This is done synchronously before FastMCP initialization because FastMCP requires token_verifier at construction time. Returns: Tuple of (nextcloud_host, token_verifier, auth_settings, refresh_token_storage, oauth_client, oauth_provider, client_id, client_secret) """ # Get settings for enable_offline_access check (handles both ENABLE_BACKGROUND_OPERATIONS # and ENABLE_OFFLINE_ACCESS environment variables) settings = get_settings() nextcloud_host = os.getenv("NEXTCLOUD_HOST") if not nextcloud_host: raise ValueError( "NEXTCLOUD_HOST environment variable is required for OAuth mode" ) nextcloud_host = nextcloud_host.rstrip("/") # Get OIDC discovery URL (defaults to Nextcloud integrated mode) discovery_url = os.getenv( "OIDC_DISCOVERY_URL", f"{nextcloud_host}/.well-known/openid-configuration" ) logger.info(f"Performing OIDC discovery: {discovery_url}") # Perform OIDC discovery async with nextcloud_httpx_client(follow_redirects=True) as client: response = await client.get(discovery_url) response.raise_for_status() discovery = response.json() logger.info("✓ OIDC discovery successful") # Validate PKCE support validate_pkce_support(discovery, discovery_url) # Extract OIDC endpoints issuer = discovery["issuer"] userinfo_uri = discovery["userinfo_endpoint"] jwks_uri = discovery.get("jwks_uri") introspection_uri = discovery.get("introspection_endpoint") registration_endpoint = discovery.get("registration_endpoint") logger.info("OIDC endpoints discovered:") logger.info(f" Issuer: {issuer}") logger.info(f" Userinfo: {userinfo_uri}") if jwks_uri: logger.info(f" JWKS: {jwks_uri}") if introspection_uri: logger.info(f" Introspection: {introspection_uri}") # Auto-detect provider mode based on issuer # External IdP mode: issuer doesn't match Nextcloud host # Normalize URLs for comparison (handle port differences like :80 for HTTP) def normalize_url(url: str) -> str: """Normalize URL by removing default ports (80 for HTTP, 443 for HTTPS).""" parsed = urlparse(url) # Remove default ports if (parsed.scheme == "http" and parsed.port == 80) or ( parsed.scheme == "https" and parsed.port == 443 ): # Remove explicit default port hostname = parsed.hostname or parsed.netloc.split(":")[0] return f"{parsed.scheme}://{hostname}" return f"{parsed.scheme}://{parsed.netloc}" issuer_normalized = normalize_url(issuer) nextcloud_normalized = normalize_url(nextcloud_host) # Determine if this is an external IdP by comparing discovered issuer with Nextcloud host is_external_idp = not issuer_normalized.startswith(nextcloud_normalized) if is_external_idp: oauth_provider = "external" # Could be Keycloak, Auth0, Okta, etc. logger.info( f"✓ Detected external IdP mode (issuer: {issuer} != Nextcloud: {nextcloud_host})" ) logger.info(" Tokens will be validated via Nextcloud user_oidc app") else: oauth_provider = "nextcloud" logger.info("✓ Detected integrated mode (Nextcloud OIDC app)") # Check if offline access (refresh tokens) is enabled # Use settings.enable_offline_access which handles both ENABLE_BACKGROUND_OPERATIONS (new) # and ENABLE_OFFLINE_ACCESS (deprecated) environment variables enable_offline_access = settings.enable_offline_access # Initialize refresh token storage if enabled refresh_token_storage = None if enable_offline_access: try: # Validate encryption key before initializing encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") if not encryption_key: logger.warning( "ENABLE_OFFLINE_ACCESS=true but TOKEN_ENCRYPTION_KEY not set. " "Refresh tokens will NOT be stored. Generate a key with:\n" ' python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"' ) else: refresh_token_storage = RefreshTokenStorage.from_env() await refresh_token_storage.initialize() logger.info( "✓ Refresh token storage initialized (offline_access enabled)" ) except Exception as e: logger.error(f"Failed to initialize refresh token storage: {e}") logger.warning( "Continuing without refresh token storage - users will need to re-authenticate after token expiration" ) # Load client credentials (static or dynamic registration) client_id = os.getenv("NEXTCLOUD_OIDC_CLIENT_ID") client_secret = os.getenv("NEXTCLOUD_OIDC_CLIENT_SECRET") if client_id and client_secret: logger.info(f"Using static OIDC client credentials: {client_id}") elif registration_endpoint: logger.info( "NEXTCLOUD_OIDC_CLIENT_ID not set, attempting Dynamic Client Registration" ) client_id, client_secret = await load_oauth_client_credentials( nextcloud_host=nextcloud_host, registration_endpoint=registration_endpoint ) else: raise ValueError( "NEXTCLOUD_OIDC_CLIENT_ID and NEXTCLOUD_OIDC_CLIENT_SECRET environment variables are required " "when the OIDC provider does not support Dynamic Client Registration. " f"Discovery URL: {discovery_url}" ) # ADR-005: Unified Token Verifier with proper audience validation # Use public issuer URL for JWT validation if set (handles Docker internal/external URL mismatch) # Tokens are issued with the public URL, but OIDC discovery returns internal URL public_issuer_url = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") client_issuer = public_issuer_url if public_issuer_url else issuer # Get MCP server URL for audience validation mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host) # Warn if resource URIs are not configured (required for ADR-005 compliance) if not os.getenv("NEXTCLOUD_MCP_SERVER_URL"): logger.warning( f"NEXTCLOUD_MCP_SERVER_URL not set, defaulting to: {mcp_server_url}. " "This should be set explicitly for proper audience validation." ) if not os.getenv("NEXTCLOUD_RESOURCE_URI"): logger.warning( f"NEXTCLOUD_RESOURCE_URI not set, defaulting to: {nextcloud_resource_uri}. " "This should be set explicitly for proper audience validation." ) # Create settings for UnifiedTokenVerifier (use same settings instance from start of function) # settings is already set at the start of setup_oauth_config() # Override with discovered values if not set in environment if not settings.oidc_client_id: settings.oidc_client_id = client_id if not settings.oidc_client_secret: settings.oidc_client_secret = client_secret if not settings.jwks_uri: settings.jwks_uri = jwks_uri if not settings.introspection_uri: settings.introspection_uri = introspection_uri if not settings.userinfo_uri: settings.userinfo_uri = userinfo_uri if not settings.oidc_issuer: # Use client_issuer which handles public URL override settings.oidc_issuer = client_issuer if not settings.nextcloud_mcp_server_url: settings.nextcloud_mcp_server_url = mcp_server_url if not settings.nextcloud_resource_uri: settings.nextcloud_resource_uri = nextcloud_resource_uri # Create Unified Token Verifier (ADR-005 compliant) token_verifier = UnifiedTokenVerifier(settings) # Log the mode enable_token_exchange = ( os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true" ) if enable_token_exchange: logger.info( "✓ Token Exchange mode enabled (ADR-005) - exchanging MCP tokens for Nextcloud tokens via RFC 8693" ) logger.info(f" MCP audience: {client_id} or {mcp_server_url}") logger.info(f" Nextcloud audience: {nextcloud_resource_uri}") else: logger.info( "✓ Multi-audience mode enabled (ADR-005) - tokens must contain both MCP and Nextcloud audiences" ) logger.info(f" Required MCP audience: {client_id} or {mcp_server_url}") logger.info(f" Required Nextcloud audience: {nextcloud_resource_uri}") if introspection_uri: logger.info("✓ Opaque token introspection enabled (RFC 7662)") if jwks_uri: logger.info("✓ JWT signature verification enabled (JWKS)") # Progressive Consent mode (for offline access / background jobs) encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") if enable_offline_access and encryption_key and refresh_token_storage: logger.info("✓ Progressive Consent mode enabled - offline access available") # Note: Token Broker service would be initialized here for background job support # Currently not used in ADR-005 implementation as it's specific to offline access patterns # that are separate from the real-time token exchange flow logger.debug("Token broker available for future offline access features") # Create OAuth client for server-initiated flows (e.g., token exchange, background workers) oauth_client = None if enable_offline_access and refresh_token_storage and is_external_idp: # For external IdP mode, create generic OIDC client for token operations mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") # Note: This redirect_uri is for OAuth client initialization, not used for actual redirects # since this client is used for backend token operations (exchange, refresh) redirect_uri = f"{mcp_server_url}/oauth/callback" # Extract base URL and realm from discovery URL # Format: http://keycloak:8080/realms/nextcloud-mcp/.well-known/openid-configuration # → base_url: http://keycloak:8080, realm: nextcloud-mcp if "/realms/" in discovery_url: base_url = discovery_url.split("/realms/")[0] realm = discovery_url.split("/realms/")[1].split("/")[0] else: # Fallback: use issuer to extract base URL base_url = ( issuer.rsplit("/realms/", 1)[0] if "/realms/" in issuer else issuer ) realm = issuer.split("/realms/")[1] if "/realms/" in issuer else "" oauth_client = KeycloakOAuthClient( keycloak_url=base_url, realm=realm, client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, ) await oauth_client.discover() logger.info( "✓ OIDC client initialized for token operations (token exchange, refresh)" ) elif enable_offline_access and refresh_token_storage: # For integrated mode, OAuth client could be added later # For now, token refresh can use httpx directly with discovered endpoints logger.info( "OAuth client for token refresh not yet implemented for integrated mode" ) # Create auth settings mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") # Note: We don't set required_scopes here anymore. # Scopes are now advertised via PRM endpoint and enforced per-tool. # This allows dynamic tool filtering based on user's actual token scopes. auth_settings = AuthSettings( issuer_url=AnyHttpUrl( client_issuer ), # Use client issuer (may be public override) resource_server_url=AnyHttpUrl(mcp_server_url), ) logger.info("OAuth configuration complete") return ( nextcloud_host, token_verifier, auth_settings, refresh_token_storage, oauth_client, oauth_provider, client_id, client_secret, ) async def setup_oauth_config_for_multi_user_basic( settings: Settings, client_id: str, client_secret: str, ) -> tuple[UnifiedTokenVerifier, RefreshTokenStorage | None, str, str]: """ Setup minimal OAuth configuration for multi-user BasicAuth mode. This is a lightweight version of setup_oauth_config() that: - Performs OIDC discovery to get endpoints - Creates UnifiedTokenVerifier for management API token validation - Creates RefreshTokenStorage for webhook token storage - Skips OAuth client creation (not needed for BasicAuth background sync) - Skips AuthSettings creation (not needed for BasicAuth MCP operations) This enables hybrid authentication mode where: - MCP operations use BasicAuth (stateless, simple) - Management APIs use OAuth bearer tokens (secure, per-user) - Background operations use OAuth refresh tokens (webhook sync) Args: settings: Application settings client_id: OAuth client ID (from DCR or static config) client_secret: OAuth client secret Returns: Tuple of (token_verifier, refresh_token_storage, client_id, client_secret) Raises: ValueError: If NEXTCLOUD_HOST is not set httpx.HTTPError: If OIDC discovery fails """ nextcloud_host = settings.nextcloud_host if not nextcloud_host: raise ValueError("NEXTCLOUD_HOST is required for OAuth infrastructure setup") nextcloud_host = nextcloud_host.rstrip("/") # Get OIDC discovery URL (always Nextcloud integrated mode for multi-user BasicAuth) discovery_url = os.getenv( "OIDC_DISCOVERY_URL", f"{nextcloud_host}/.well-known/openid-configuration", ) logger.info( f"Performing OIDC discovery for multi-user BasicAuth hybrid mode: {discovery_url}" ) # Perform OIDC discovery try: async with nextcloud_httpx_client( timeout=30.0, follow_redirects=True ) as http_client: response = await http_client.get(discovery_url) response.raise_for_status() discovery = response.json() except httpx.HTTPStatusError as e: logger.error( f"OIDC discovery failed: HTTP {e.response.status_code} from {discovery_url}" ) raise ValueError( f"OIDC discovery failed: HTTP {e.response.status_code} from {discovery_url}. " "Ensure Nextcloud OIDC (user_oidc app) is installed and configured." ) from e except httpx.RequestError as e: logger.error(f"OIDC discovery failed: {e}") raise ValueError( f"OIDC discovery failed: Cannot connect to {discovery_url}. Error: {e}" ) from e except (KeyError, ValueError) as e: logger.error( f"OIDC discovery failed: Invalid response from {discovery_url}: {e}" ) raise ValueError( f"OIDC discovery failed: Invalid response from {discovery_url}. " "The endpoint did not return valid OIDC configuration." ) from e logger.info("✓ OIDC discovery successful (multi-user BasicAuth)") # Extract OIDC endpoints from discovery issuer = discovery["issuer"] userinfo_uri = discovery["userinfo_endpoint"] jwks_uri = discovery.get("jwks_uri") introspection_uri = discovery.get("introspection_endpoint") logger.info("OIDC endpoints configured for management API:") logger.info(f" Issuer: {issuer}") logger.info(f" Userinfo: {userinfo_uri}") logger.info(f" JWKS: {jwks_uri}") logger.info(f" Introspection: {introspection_uri}") # Get MCP server URL for audience validation mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host) # Use public issuer URL for JWT validation if set (handles Docker internal/external URL mismatch) # Tokens are issued with the public URL, but OIDC discovery returns internal URL public_issuer_url = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL") client_issuer = public_issuer_url if public_issuer_url else issuer # Update settings with discovered values for UnifiedTokenVerifier if not settings.oidc_client_id: settings.oidc_client_id = client_id if not settings.oidc_client_secret: settings.oidc_client_secret = client_secret if not settings.jwks_uri: settings.jwks_uri = jwks_uri if not settings.introspection_uri: settings.introspection_uri = introspection_uri if not settings.userinfo_uri: settings.userinfo_uri = userinfo_uri if not settings.oidc_issuer: settings.oidc_issuer = client_issuer if not settings.nextcloud_mcp_server_url: settings.nextcloud_mcp_server_url = mcp_server_url if not settings.nextcloud_resource_uri: settings.nextcloud_resource_uri = nextcloud_resource_uri # Create Unified Token Verifier for management API authentication token_verifier = UnifiedTokenVerifier(settings) logger.info("✓ Token verifier created for management API (hybrid mode)") if introspection_uri: logger.info(" Opaque token introspection enabled (RFC 7662)") if jwks_uri: logger.info(" JWT signature verification enabled (JWKS)") # Initialize refresh token storage for background operations refresh_token_storage = None if settings.enable_offline_access: try: encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") if not encryption_key: logger.warning( "ENABLE_OFFLINE_ACCESS=true but TOKEN_ENCRYPTION_KEY not set. " "Refresh tokens will NOT be stored. Generate a key with:\n" ' python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"' ) else: refresh_token_storage = RefreshTokenStorage.from_env() await refresh_token_storage.initialize() logger.info( "✓ Refresh token storage initialized for background operations (hybrid mode)" ) except Exception as e: logger.error(f"Failed to initialize refresh token storage: {e}") logger.debug(f"Full traceback:\n{traceback.format_exc()}") logger.warning( "Continuing without refresh token storage - webhook management may be limited" ) logger.info( "OAuth infrastructure setup complete for multi-user BasicAuth hybrid mode" ) return (token_verifier, refresh_token_storage, client_id, client_secret) def get_app(transport: str = "streamable-http", enabled_apps: list[str] | None = None): # Initialize observability (logging will be configured by uvicorn) settings = get_settings() # Validate configuration and detect deployment mode mode, config_errors = validate_configuration(settings) if config_errors: error_msg = ( f"Configuration validation failed for {mode.value} mode:\n" + "\n".join(f" - {err}" for err in config_errors) + "\n\n" + get_mode_summary(mode) ) logger.error(error_msg) raise ValueError(error_msg) logger.info(f"✅ Configuration validated successfully for {mode.value} mode") logger.debug(f"Mode details:\n{get_mode_summary(mode)}") # Derive helper variables for backward compatibility with existing code oauth_enabled = mode in ( AuthMode.OAUTH_SINGLE_AUDIENCE, AuthMode.OAUTH_TOKEN_EXCHANGE, ) deployment_mode = ( DeploymentMode.SMITHERY_STATELESS if mode == AuthMode.SMITHERY_STATELESS else DeploymentMode.SELF_HOSTED ) # Log hybrid authentication status for multi-user BasicAuth with offline access if mode == AuthMode.MULTI_USER_BASIC and settings.enable_offline_access: logger.info( "🔄 Hybrid authentication mode will be enabled:\n" " - MCP operations: BasicAuth (stateless, credentials per-request)\n" " - Management APIs: OAuth bearer tokens (secure, per-user)\n" " - Background operations: OAuth refresh tokens (webhook sync)" ) # Setup Prometheus metrics (always enabled by default) if settings.metrics_enabled: setup_metrics(port=settings.metrics_port) logger.info( f"Prometheus metrics enabled on dedicated port {settings.metrics_port}" ) # Setup OpenTelemetry tracing (optional) if settings.otel_exporter_otlp_endpoint: setup_tracing( service_name=settings.otel_service_name, otlp_endpoint=settings.otel_exporter_otlp_endpoint, otlp_verify_ssl=settings.otel_exporter_verify_ssl, sampling_rate=settings.otel_traces_sampler_arg, ) logger.info( f"OpenTelemetry tracing enabled (endpoint: {settings.otel_exporter_otlp_endpoint})" ) else: logger.info( "OpenTelemetry tracing disabled (set OTEL_EXPORTER_OTLP_ENDPOINT to enable)" ) # Initialize OAuth credentials for multi-user modes that need background operations # This must happen BEFORE uvicorn starts (same lifecycle point as OAuth modes) # to avoid async context issues multi_user_basic_oauth_creds: tuple[str, str] | None = None multi_user_token_verifier: UnifiedTokenVerifier | None = None multi_user_refresh_storage: RefreshTokenStorage | None = None if ( mode == AuthMode.MULTI_USER_BASIC and settings.vector_sync_enabled and settings.enable_background_operations ): print( f"DEBUG: Multi-user BasicAuth mode detected, vector_sync={settings.vector_sync_enabled}, background_operations={settings.enable_background_operations}" ) logger.info( "Multi-user BasicAuth with vector sync - checking for OAuth/app password credentials" ) # Check for static credentials first static_client_id = os.getenv("NEXTCLOUD_OIDC_CLIENT_ID") static_client_secret = os.getenv("NEXTCLOUD_OIDC_CLIENT_SECRET") if static_client_id and static_client_secret: print("DEBUG: Using static OAuth credentials") logger.info("Using static OAuth credentials for background operations") multi_user_basic_oauth_creds = (static_client_id, static_client_secret) else: # Perform DCR before uvicorn starts (same lifecycle as OAuth modes) print("DEBUG: No static credentials, attempting DCR...") logger.info( "OAuth credentials not configured - attempting Dynamic Client Registration..." ) async def setup_multi_user_basic_dcr(): """Setup DCR for multi-user BasicAuth background operations.""" # Construct registration endpoint directly from nextcloud_host # Standard RFC 7591 endpoint pattern for Nextcloud OIDC # This avoids relying on discovery doc which may use public URLs unreachable from containers registration_endpoint = f"{settings.nextcloud_host}/apps/oidc/register" logger.info( f"Attempting Dynamic Client Registration at: {registration_endpoint}" ) # Perform DCR try: # Assert nextcloud_host is not None (required for multi-user mode) assert settings.nextcloud_host is not None, ( "NEXTCLOUD_HOST is required" ) client_id, client_secret = await load_oauth_client_credentials( nextcloud_host=settings.nextcloud_host, registration_endpoint=registration_endpoint, ) logger.info( f"✓ Dynamic Client Registration successful for background operations " f"(client_id: {client_id[:16]}...)" ) return (client_id, client_secret) except Exception as e: logger.error(f"Dynamic Client Registration failed: {e}") logger.debug(f"Full traceback:\n{traceback.format_exc()}") logger.warning("Background vector sync will be disabled.") return None # Run DCR synchronously before uvicorn starts multi_user_basic_oauth_creds = anyio.run(setup_multi_user_basic_dcr) # Setup OAuth infrastructure for management APIs and background operations # This creates the UnifiedTokenVerifier needed by management.py and # RefreshTokenStorage for webhook token persistence if multi_user_basic_oauth_creds: sync_client_id, sync_client_secret = multi_user_basic_oauth_creds logger.info( "Setting up OAuth infrastructure for management APIs (hybrid mode)..." ) try: ( multi_user_token_verifier, multi_user_refresh_storage, _, _, ) = anyio.run( setup_oauth_config_for_multi_user_basic, settings, sync_client_id, sync_client_secret, ) logger.info( "✓ OAuth infrastructure setup complete for multi-user BasicAuth hybrid mode" ) except (httpx.HTTPError, ValueError, KeyError) as e: # Expected errors during OAuth infrastructure setup: # - httpx.HTTPError: Network issues, OIDC discovery failures # - ValueError: Missing required configuration (NEXTCLOUD_HOST) # - KeyError: Missing required fields in OIDC discovery response logger.error(f"Failed to setup OAuth infrastructure: {e}") logger.debug(f"Full traceback:\n{traceback.format_exc()}") logger.warning( "Management API will be unavailable. " "Webhook management from Astrolabe admin UI will not work." ) # Set to None to indicate failure multi_user_token_verifier = None multi_user_refresh_storage = None except Exception as e: # Unexpected error - this is a programming error, re-raise it logger.error( f"Unexpected error during OAuth infrastructure setup: {e}. " "This is likely a programming error that should be fixed." ) raise # Create MCP server based on detected mode if mode in (AuthMode.OAUTH_SINGLE_AUDIENCE, AuthMode.OAUTH_TOKEN_EXCHANGE): logger.info("Configuring MCP server for OAuth mode") # Asynchronously get the OAuth configuration ( nextcloud_host, token_verifier, auth_settings, refresh_token_storage, oauth_client, oauth_provider, client_id, client_secret, ) = anyio.run(setup_oauth_config) # Create lifespan function with captured OAuth context (closure) @asynccontextmanager async def oauth_lifespan(server: FastMCP) -> AsyncIterator[OAuthAppContext]: """ Lifespan context for OAuth mode - captures OAuth configuration from outer scope. """ logger.info("Starting MCP server in OAuth mode") logger.info(f"Using OAuth provider: {oauth_provider}") if refresh_token_storage: logger.info("Refresh token storage is available") if oauth_client: logger.info("OAuth client is available for token refresh") # Initialize document processors initialize_document_processors() try: yield OAuthAppContext( nextcloud_host=nextcloud_host, token_verifier=token_verifier, refresh_token_storage=refresh_token_storage, oauth_client=oauth_client, oauth_provider=oauth_provider, server_client_id=client_id, ) finally: logger.info("Shutting down MCP server") # RefreshTokenStorage uses context managers, no close() needed # OAuth client cleanup (if it has a close method) if oauth_client and hasattr(oauth_client, "close"): try: await oauth_client.close() except Exception as e: logger.warning(f"Error closing OAuth client: {e}") logger.info("MCP server shutdown complete") mcp = FastMCP( "Nextcloud MCP", lifespan=oauth_lifespan, token_verifier=token_verifier, auth=auth_settings, # Disable DNS rebinding protection for containerized deployments (k8s, Docker) # MCP 1.23+ auto-enables this for localhost, breaking k8s service DNS names transport_security=TransportSecuritySettings( enable_dns_rebinding_protection=False ), ) elif mode == AuthMode.SMITHERY_STATELESS: logger.info("Configuring MCP server for Smithery stateless mode") # json_response=True returns plain JSON-RPC instead of SSE format, # required for Smithery scanner compatibility mcp = FastMCP( "Nextcloud MCP", lifespan=app_lifespan_smithery, json_response=True, # Disable DNS rebinding protection for containerized deployments (k8s, Docker) # MCP 1.23+ auto-enables this for localhost, breaking k8s service DNS names transport_security=TransportSecuritySettings( enable_dns_rebinding_protection=False ), ) else: # BasicAuth modes (single-user or multi-user) logger.info(f"Configuring MCP server for {mode.value} mode") mcp = FastMCP( "Nextcloud MCP", lifespan=app_lifespan_basic, # Disable DNS rebinding protection for containerized deployments (k8s, Docker) # MCP 1.23+ auto-enables this for localhost, breaking k8s service DNS names transport_security=TransportSecuritySettings( enable_dns_rebinding_protection=False ), ) @mcp.resource("nc://capabilities") async def nc_get_capabilities(): """Get the Nextcloud Host capabilities""" ctx: Context = mcp.get_context() client = await get_nextcloud_client(ctx) return await client.capabilities() # Define available apps and their configuration functions available_apps = { "notes": configure_notes_tools, "tables": configure_tables_tools, "webdav": configure_webdav_tools, "sharing": configure_sharing_tools, "calendar": configure_calendar_tools, "contacts": configure_contacts_tools, "cookbook": configure_cookbook_tools, "deck": configure_deck_tools, "news": configure_news_tools, } # If no specific apps are specified, enable all if enabled_apps is None: enabled_apps = list(available_apps.keys()) # Configure only the enabled apps for app_name in enabled_apps: if app_name in available_apps: logger.info(f"Configuring {app_name} tools") available_apps[app_name](mcp) else: logger.warning( f"Unknown app: {app_name}. Available apps: {list(available_apps.keys())}" ) # Register semantic search tools (cross-app feature) # ADR-016: Skip in Smithery stateless mode (no vector database) if deployment_mode == DeploymentMode.SMITHERY_STATELESS: logger.info("Skipping semantic search tools (Smithery stateless mode)") elif settings.vector_sync_enabled: logger.info("Configuring semantic search tools (vector sync enabled)") configure_semantic_tools(mcp) else: logger.info("Skipping semantic search tools (VECTOR_SYNC_ENABLED not set)") # Register OAuth provisioning tools (only when offline access is enabled) # With token exchange enabled (external IdP), provisioning is not needed for MCP operations enable_token_exchange = ( os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true" ) # Use settings.enable_offline_access which handles both ENABLE_BACKGROUND_OPERATIONS (new) # and ENABLE_OFFLINE_ACCESS (deprecated) environment variables enable_offline_access_for_tools = settings.enable_offline_access if oauth_enabled and enable_offline_access_for_tools and not enable_token_exchange: logger.info("Registering OAuth provisioning tools for offline access") register_oauth_tools(mcp) elif oauth_enabled and enable_token_exchange: logger.info("Skipping provisioning tools registration (token exchange enabled)") elif oauth_enabled and not enable_offline_access_for_tools: logger.info( "Skipping provisioning tools registration (offline access not enabled)" ) # Register Login Flow v2 auth tools (ADR-022) if settings.enable_login_flow: logger.info("Registering Login Flow v2 auth tools") register_auth_tools(mcp) # Override list_tools to filter based on user's token scopes (OAuth mode only) if oauth_enabled: original_list_tools = mcp._tool_manager.list_tools def list_tools_filtered(): """List tools filtered by user's token scopes (JWT and Bearer tokens).""" # Get user's scopes from token using MCP SDK's contextvar # This works for all request types including list_tools user_scopes = get_access_token_scopes() is_jwt = is_jwt_token() logger.info( f"🔍 list_tools called - Token type: {'JWT' if is_jwt else 'opaque/none'}, " f"User scopes: {user_scopes}" ) # Get all tools all_tools = original_list_tools() # Filter tools based on user's token scopes (both JWT and opaque tokens) # JWT tokens have scopes embedded in payload # Opaque tokens get scopes via introspection endpoint # Claude Code now properly respects PRM endpoint for scope discovery if user_scopes: allowed_tools = [ tool for tool in all_tools if has_required_scopes(tool.fn, user_scopes) ] token_type = "JWT" if is_jwt else "Bearer" logger.info( f"✂️ {token_type} scope filtering: {len(allowed_tools)}/{len(all_tools)} tools " f"available for scopes: {user_scopes}" ) else: # BasicAuth mode or no token - show all tools allowed_tools = all_tools logger.info( f"📋 Showing all {len(all_tools)} tools (no token/BasicAuth)" ) # Return the Tool objects directly (they're already in the correct format) return allowed_tools # Replace the tool manager's list_tools method mcp._tool_manager.list_tools = list_tools_filtered # type: ignore[method-assign] logger.info( "Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)" ) mcp_app = mcp.streamable_http_app() async def _login_flow_cleanup_loop() -> None: """Periodically clean up expired Login Flow v2 sessions and proxy codes.""" from nextcloud_mcp_server.auth.oauth_routes import ( # noqa: PLC0415 _cleanup_expired_proxy_codes, ) while True: try: storage = await get_shared_storage() count = await storage.delete_expired_login_flow_sessions() if count: logger.info(f"Cleaned up {count} expired login flow sessions") # Also clean up expired AS proxy codes/sessions _cleanup_expired_proxy_codes() except Exception as e: logger.warning(f"Login flow cleanup error: {e}") await anyio.sleep(3600) # Every hour @asynccontextmanager async def _maybe_login_flow_cleanup(): """Start Login Flow cleanup task if enabled.""" if settings.enable_login_flow: async with anyio.create_task_group() as tg: tg.start_soon(_login_flow_cleanup_loop) yield tg.cancel_scope.cancel() else: yield @asynccontextmanager async def _mcp_session_with_login_flow(): """Start MCP session manager with optional Login Flow cleanup.""" async with AsyncExitStack() as stack: await stack.enter_async_context(mcp.session_manager.run()) await stack.enter_async_context(_maybe_login_flow_cleanup()) yield @asynccontextmanager async def starlette_lifespan(app: Starlette): # Set OAuth context for OAuth login routes (ADR-004) if oauth_enabled: # Prepare OAuth config from setup_oauth_config closure variables # Get nextcloud_host from settings (it was validated as required) nextcloud_host_for_context = settings.nextcloud_host if not nextcloud_host_for_context: raise ValueError("NEXTCLOUD_HOST is required for OAuth mode") mcp_server_url = os.getenv( "NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000" ) nextcloud_resource_uri = os.getenv( "NEXTCLOUD_RESOURCE_URI", nextcloud_host_for_context ) discovery_url = os.getenv( "OIDC_DISCOVERY_URL", f"{nextcloud_host_for_context}/.well-known/openid-configuration", ) scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "") oauth_context_dict = { "storage": refresh_token_storage, "oauth_client": oauth_client, "token_verifier": token_verifier, # For querying IdP userinfo endpoint "config": { "mcp_server_url": mcp_server_url, "discovery_url": discovery_url, "client_id": client_id, # From setup_oauth_config (DCR or static) "client_secret": client_secret, # From setup_oauth_config (DCR or static) "scopes": scopes, "nextcloud_host": nextcloud_host_for_context, "nextcloud_resource_uri": nextcloud_resource_uri, "oauth_provider": oauth_provider, }, } app.state.oauth_context = oauth_context_dict # Also set oauth_context on browser_app for session authentication # browser_app is in the same function scope (defined later in create_app) # We need to find it in the mounted routes for route in app.routes: if isinstance(route, Mount) and route.path == "/app": browser_app = cast(Starlette, route.app) browser_app.state.oauth_context = oauth_context_dict logger.info( "OAuth context shared with browser_app for session auth" ) break logger.info( f"OAuth context initialized for login routes (client_id={client_id[:16]}...)" ) else: # BasicAuth mode - initialize storage for webhook management basic_auth_storage = RefreshTokenStorage.from_env() await basic_auth_storage.initialize() logger.info("Initialized refresh token storage for webhook management") app.state.storage = basic_auth_storage # For multi-user BasicAuth with offline access, create oauth_context for management APIs # This allows Astrolabe to use management APIs with OAuth bearer tokens if settings.enable_multi_user_basic_auth and settings.enable_offline_access: # Check if we have OAuth credentials AND infrastructure from setup if ( multi_user_basic_oauth_creds and multi_user_token_verifier is not None ): sync_client_id, sync_client_secret = multi_user_basic_oauth_creds # Create oauth_context for management API authentication nextcloud_host_for_context = settings.nextcloud_host mcp_server_url = os.getenv( "NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000" ) discovery_url = os.getenv( "OIDC_DISCOVERY_URL", f"{nextcloud_host_for_context}/.well-known/openid-configuration", ) oauth_context_dict = { # Use OAuth refresh token storage if available, fallback to basic_auth_storage "storage": multi_user_refresh_storage or basic_auth_storage, "oauth_client": None, # Not needed for management APIs "token_verifier": multi_user_token_verifier, # FIXED: Now has real verifier! "config": { "mcp_server_url": mcp_server_url, "discovery_url": discovery_url, "client_id": sync_client_id, "client_secret": sync_client_secret, "scopes": "", # Background sync only "nextcloud_host": nextcloud_host_for_context, "nextcloud_resource_uri": nextcloud_host_for_context, "oauth_provider": "nextcloud", # Always Nextcloud for multi-user BasicAuth }, } app.state.oauth_context = oauth_context_dict logger.info( f"✓ OAuth context initialized for management APIs (hybrid mode, client_id={sync_client_id[:16]}...)" ) elif multi_user_basic_oauth_creds and multi_user_token_verifier is None: logger.warning( "OAuth infrastructure setup failed - management API will be unavailable. " "This is expected if OIDC discovery failed or token verifier creation failed. " "Webhook management from Astrolabe admin UI will not work." ) else: logger.warning( "OAuth credentials not available - management API will be unavailable. " "This is expected if DCR failed or static credentials were not provided. " "Webhook management from Astrolabe admin UI will not work." ) # Also share with browser_app for webhook routes for route in app.routes: if isinstance(route, Mount) and route.path == "/app": browser_app = cast(Starlette, route.app) browser_app.state.storage = basic_auth_storage if ( settings.enable_multi_user_basic_auth and settings.enable_offline_access and hasattr(app.state, "oauth_context") ): browser_app.state.oauth_context = app.state.oauth_context logger.info( "OAuth context shared with browser_app for management APIs" ) logger.info( "Storage shared with browser_app for webhook management" ) break # Start background vector sync tasks (ADR-007) # Scanner runs at server-level (once), not per-session # Re-use settings from outer scope (already validated) # Note: enable_offline_access_for_sync, encryption_key, and refresh_token_storage # are already defined in outer scope before mode split # Multi-user BasicAuth uses OAuth-style background sync (with app passwords) # So skip single-user BasicAuth vector sync if in multi-user mode if ( settings.vector_sync_enabled and not oauth_enabled and not settings.enable_multi_user_basic_auth ): # BasicAuth mode - single user sync logger.info("Starting background vector sync tasks for BasicAuth mode") # Get username from environment username = os.getenv("NEXTCLOUD_USERNAME") if not username: raise ValueError( "NEXTCLOUD_USERNAME required for vector sync in BasicAuth mode" ) # Create client for vector sync (server-level, not per-session) client = NextcloudClient.from_env() # Initialize Qdrant collection before starting background tasks logger.info("Initializing Qdrant collection...") try: await get_qdrant_client() # Triggers collection creation if needed logger.info("Qdrant collection ready") except Exception as e: logger.error(f"Failed to initialize Qdrant collection: {e}") raise RuntimeError( f"Cannot start vector sync - Qdrant initialization failed: {e}" ) from e # Initialize shared state send_stream, receive_stream = anyio.create_memory_object_stream( max_buffer_size=settings.vector_sync_queue_max_size ) shutdown_event = anyio.Event() scanner_wake_event = anyio.Event() # Store in app state for access from routes (ADR-007) app.state.document_send_stream = send_stream app.state.document_receive_stream = receive_stream app.state.shutdown_event = shutdown_event app.state.scanner_wake_event = scanner_wake_event # Also store in module singleton for FastMCP session lifespans _vector_sync_state.document_send_stream = send_stream _vector_sync_state.document_receive_stream = receive_stream _vector_sync_state.shutdown_event = shutdown_event _vector_sync_state.scanner_wake_event = scanner_wake_event logger.info("Vector sync state stored in module singleton") # Also share with browser_app for /app route for route in app.routes: if isinstance(route, Mount) and route.path == "/app": browser_app = cast(Starlette, route.app) browser_app.state.document_send_stream = send_stream browser_app.state.document_receive_stream = receive_stream browser_app.state.shutdown_event = shutdown_event browser_app.state.scanner_wake_event = scanner_wake_event logger.info("Vector sync state shared with browser_app for /app") break # Start background tasks using anyio TaskGroup async with anyio.create_task_group() as tg: # Start scanner task await tg.start( scanner_task, send_stream, shutdown_event, scanner_wake_event, client, username, ) # Start processor pool (each gets a cloned receive stream) for i in range(settings.vector_sync_processor_workers): await tg.start( processor_task, i, receive_stream.clone(), shutdown_event, client, username, ) logger.info( f"Background sync tasks started: 1 scanner + " f"{settings.vector_sync_processor_workers} processors" ) # Run MCP session manager and yield async with _mcp_session_with_login_flow(): try: yield finally: # Shutdown signal logger.info("Shutting down background sync tasks") shutdown_event.set() await client.close() # TaskGroup automatically cancels all tasks on exit elif ( settings.vector_sync_enabled and (oauth_enabled or settings.enable_multi_user_basic_auth) and settings.enable_background_operations ): # OAuth mode with background operations - multi-user sync # Also used for multi-user BasicAuth mode (client auth is BasicAuth, background sync uses app passwords or OAuth) mode_desc = "OAuth mode" if oauth_enabled else "Multi-user BasicAuth mode" logger.info(f"Starting background vector sync tasks for {mode_desc}") # Get nextcloud_host (from settings - already validated) nextcloud_host_for_sync = settings.nextcloud_host if not nextcloud_host_for_sync: raise ValueError("NEXTCLOUD_HOST required for vector sync") # Get OIDC discovery URL (same as used for OAuth setup) discovery_url = os.getenv( "OIDC_DISCOVERY_URL", f"{nextcloud_host_for_sync}/.well-known/openid-configuration", ) # Get client credentials - these were obtained before uvicorn started # For OAuth modes: from setup_oauth_config() # For multi-user BasicAuth: from setup_multi_user_basic_dcr() oauth_ctx = getattr(app.state, "oauth_context", {}) oauth_config = oauth_ctx.get("config", {}) sync_client_id = oauth_config.get("client_id") sync_client_secret = oauth_config.get("client_secret") # For multi-user BasicAuth mode, use pre-obtained credentials from outer scope if not sync_client_id or not sync_client_secret: if multi_user_basic_oauth_creds: sync_client_id, sync_client_secret = multi_user_basic_oauth_creds logger.info( "Using pre-obtained OAuth credentials for background sync" ) else: # No credentials available - DCR was attempted before uvicorn started but failed sync_client_id = None sync_client_secret = None logger.warning( "OAuth credentials not available for background sync " "(DCR was attempted during startup but failed)" ) # Only start vector sync if credentials are available if sync_client_id and sync_client_secret: # Get storage - different for OAuth vs multi-user BasicAuth modes # OAuth mode: refresh_token_storage (from setup_oauth_config) # Multi-user BasicAuth: app.state.storage (basic_auth_storage) token_storage = ( refresh_token_storage if oauth_enabled else app.state.storage ) # Create token broker for background operations # Note: storage handles encryption internally, no key needed here # Client credentials are needed for token refresh operations token_broker = TokenBrokerService( storage=token_storage, oidc_discovery_url=discovery_url, nextcloud_host=nextcloud_host_for_sync, client_id=sync_client_id, client_secret=sync_client_secret, ) # Store token broker in oauth_context for management API (revoke endpoint) if hasattr(app.state, "oauth_context"): app.state.oauth_context["token_broker"] = token_broker logger.info( "Token broker added to oauth_context for management API" ) # Initialize Qdrant collection before starting background tasks logger.info("Initializing Qdrant collection...") try: await get_qdrant_client() # Triggers collection creation if needed logger.info("Qdrant collection ready") except Exception as e: logger.error(f"Failed to initialize Qdrant collection: {e}") raise RuntimeError( f"Cannot start vector sync - Qdrant initialization failed: {e}" ) from e # Clean up stale app passwords at startup (BasicAuth mode only) if not oauth_enabled: try: removed = await token_storage.cleanup_invalid_app_passwords( nextcloud_host=nextcloud_host_for_sync ) if removed: logger.info( f"Cleaned up {len(removed)} stale app password(s): {removed}" ) except Exception as e: logger.warning(f"App password cleanup failed (non-fatal): {e}") # Initialize shared state send_stream, receive_stream = anyio.create_memory_object_stream( max_buffer_size=settings.vector_sync_queue_max_size ) shutdown_event = anyio.Event() scanner_wake_event = anyio.Event() # User state tracking for user manager user_states: dict = {} # Store in app state for access from routes (ADR-007) app.state.document_send_stream = send_stream app.state.document_receive_stream = receive_stream app.state.shutdown_event = shutdown_event app.state.scanner_wake_event = scanner_wake_event # Also store in module singleton for FastMCP session lifespans _vector_sync_state.document_send_stream = send_stream _vector_sync_state.document_receive_stream = receive_stream _vector_sync_state.shutdown_event = shutdown_event _vector_sync_state.scanner_wake_event = scanner_wake_event logger.info("Vector sync state stored in module singleton") # Also share with browser_app for /app route for route in app.routes: if isinstance(route, Mount) and route.path == "/app": browser_app = cast(Starlette, route.app) browser_app.state.document_send_stream = send_stream browser_app.state.document_receive_stream = receive_stream browser_app.state.shutdown_event = shutdown_event browser_app.state.scanner_wake_event = scanner_wake_event logger.info( "Vector sync state shared with browser_app for /app" ) break # Determine authentication mode for background sync # Multi-user BasicAuth: use app passwords via Astrolabe (NOT OAuth) # OAuth mode: use OAuth refresh tokens (NOT app passwords) use_basic_auth = not oauth_enabled # Start background tasks using anyio TaskGroup async with anyio.create_task_group() as tg: # Start user manager task (supervises per-user scanners) await tg.start( user_manager_task, send_stream, shutdown_event, scanner_wake_event, token_broker if not use_basic_auth else None, token_storage, # Use token_storage (works for both OAuth and multi-user BasicAuth) nextcloud_host_for_sync, user_states, tg, use_basic_auth, # Pass as positional arg (before task_status) ) # Start processor pool (each gets a cloned receive stream) for i in range(settings.vector_sync_processor_workers): await tg.start( oauth_processor_task, i, receive_stream.clone(), shutdown_event, token_broker if not use_basic_auth else None, nextcloud_host_for_sync, use_basic_auth, # Pass as positional arg (before task_status) ) logger.info( f"Background sync tasks started: 1 user manager + " f"{settings.vector_sync_processor_workers} processors" ) # Run MCP session manager and yield async with _mcp_session_with_login_flow(): try: yield finally: # Shutdown signal logger.info("Shutting down background sync tasks") shutdown_event.set() # Close token broker HTTP client if token_broker._http_client: await token_broker._http_client.aclose() # TaskGroup automatically cancels all tasks on exit else: # No OAuth credentials available for background sync logger.warning( "Skipping background vector sync - OAuth credentials not available. " "Multi-user BasicAuth mode will run without semantic search background operations. " "To enable, set NEXTCLOUD_OIDC_CLIENT_ID and NEXTCLOUD_OIDC_CLIENT_SECRET." ) # Just run MCP session manager without vector sync async with _mcp_session_with_login_flow(): yield else: # No vector sync - just run MCP session manager if settings.vector_sync_enabled: # Log why vector sync is not starting if oauth_enabled and not settings.enable_offline_access: logger.warning( "Vector sync enabled but ENABLE_OFFLINE_ACCESS=false - " "vector sync requires offline access in OAuth mode" ) elif oauth_enabled and not refresh_token_storage: logger.warning( "Vector sync enabled but refresh token storage not available" ) elif oauth_enabled and not os.getenv("TOKEN_ENCRYPTION_KEY"): logger.warning( "Vector sync enabled but TOKEN_ENCRYPTION_KEY not set" ) async with _mcp_session_with_login_flow(): yield # Health check endpoints for Kubernetes probes def health_live(request): """Liveness probe endpoint. Returns 200 OK if the application process is running. This is a simple check that doesn't verify external dependencies. """ return JSONResponse( { "status": "alive", "mode": "oauth" if oauth_enabled else "basic", } ) async def health_ready(request): """Readiness probe endpoint. Returns 200 OK if the application is ready to serve traffic. Checks that required configuration is present and Qdrant if vector sync enabled. """ checks = {} is_ready = True # Check Nextcloud host configuration and connectivity nextcloud_host = os.getenv("NEXTCLOUD_HOST") if nextcloud_host: checks["nextcloud_configured"] = "ok" # Try to connect to Nextcloud start_time = time.time() try: async with nextcloud_httpx_client(timeout=2.0) as client: response = await client.get(f"{nextcloud_host}/status.php") duration = time.time() - start_time if response.status_code == 200: checks["nextcloud_reachable"] = "ok" set_dependency_health("nextcloud", True) else: checks["nextcloud_reachable"] = ( f"error: status {response.status_code}" ) set_dependency_health("nextcloud", False) is_ready = False record_dependency_check("nextcloud", duration) except Exception as e: duration = time.time() - start_time checks["nextcloud_reachable"] = f"error: {str(e)}" set_dependency_health("nextcloud", False) record_dependency_check("nextcloud", duration) is_ready = False else: checks["nextcloud_configured"] = "error: NEXTCLOUD_HOST not set" set_dependency_health("nextcloud", False) is_ready = False # Check authentication configuration # Report the deployment mode, not just whether OAuth is enabled # This helps clients (like Astrolabe) determine which auth flow to use if ( mode == AuthMode.OAUTH_SINGLE_AUDIENCE or mode == AuthMode.OAUTH_TOKEN_EXCHANGE ): checks["auth_mode"] = "oauth" checks["auth_configured"] = "ok" elif mode == AuthMode.MULTI_USER_BASIC: checks["auth_mode"] = "multi_user_basic" checks["auth_configured"] = "ok" # Indicate if app passwords are supported (when offline_access enabled) checks["supports_app_passwords"] = get_settings().enable_offline_access elif mode == AuthMode.SINGLE_USER_BASIC: username = os.getenv("NEXTCLOUD_USERNAME") password = os.getenv("NEXTCLOUD_PASSWORD") if username and password: checks["auth_mode"] = "basic" checks["auth_configured"] = "ok" else: checks["auth_mode"] = "basic" checks["auth_configured"] = "error: credentials not set" is_ready = False elif mode == AuthMode.SMITHERY_STATELESS: checks["auth_mode"] = "smithery" checks["auth_configured"] = "ok" # Check Qdrant status if using network mode (external Qdrant service) # In-memory and persistent modes use embedded Qdrant, no external service to check # Note: get_settings() supports both ENABLE_SEMANTIC_SEARCH and VECTOR_SYNC_ENABLED settings = get_settings() vector_sync_enabled = settings.vector_sync_enabled qdrant_url = os.getenv("QDRANT_URL") # Only set in network mode if vector_sync_enabled and qdrant_url: start_time = time.time() try: async with httpx.AsyncClient(timeout=2.0) as client: response = await client.get(f"{qdrant_url}/readyz") duration = time.time() - start_time if response.status_code == 200: checks["qdrant"] = "ok" set_dependency_health("qdrant", True) else: checks["qdrant"] = f"error: status {response.status_code}" set_dependency_health("qdrant", False) is_ready = False record_dependency_check("qdrant", duration) except Exception as e: duration = time.time() - start_time checks["qdrant"] = f"error: {str(e)}" set_dependency_health("qdrant", False) record_dependency_check("qdrant", duration) is_ready = False elif vector_sync_enabled: # Using embedded Qdrant (memory or persistent mode) checks["qdrant"] = "embedded" set_dependency_health("qdrant", True) status_code = 200 if is_ready else 503 return JSONResponse( { "status": "ready" if is_ready else "not_ready", "checks": checks, }, status_code=status_code, ) async def handle_nextcloud_webhook(request): """Test webhook endpoint to capture and log Nextcloud webhook payloads. This is a temporary endpoint for testing webhook schemas and payloads. It logs the full payload and returns 200 OK immediately. """ try: payload = await request.json() logger.info("=" * 80) logger.info("🔔 Webhook received from Nextcloud:") logger.info(json.dumps(payload, indent=2, sort_keys=True)) logger.info("=" * 80) return JSONResponse( {"status": "received", "timestamp": payload.get("time")}, status_code=200, ) except Exception as e: logger.error(f"❌ Failed to parse webhook payload: {e}") return JSONResponse( {"error": "invalid_payload", "message": str(e)}, status_code=400 ) # Add Protected Resource Metadata (PRM) endpoint for OAuth mode routes = [] # Add health check routes (available in both OAuth and BasicAuth modes) routes.append(Route("/health/live", health_live, methods=["GET"])) routes.append(Route("/health/ready", health_ready, methods=["GET"])) logger.info("Health check endpoints enabled: /health/live, /health/ready") # Add test webhook endpoint (for development/testing) routes.append( Route("/webhooks/nextcloud", handle_nextcloud_webhook, methods=["POST"]) ) logger.info("Test webhook endpoint enabled: /webhooks/nextcloud") # Add management API endpoints for Nextcloud PHP app # Available in: OAuth modes OR multi-user BasicAuth with offline access (for Astrolabe integration) enable_management_apis = oauth_enabled or ( settings.enable_multi_user_basic_auth and settings.enable_offline_access ) if enable_management_apis: routes.append(Route("/api/v1/status", get_server_status, methods=["GET"])) routes.append( Route( "/api/v1/vector-sync/status", get_vector_sync_status, methods=["GET"], ) ) routes.append( Route( "/api/v1/users/{user_id}/session", get_user_session, methods=["GET"], ) ) routes.append( Route( "/api/v1/users/{user_id}/revoke", revoke_user_access, methods=["POST"], ) ) # App password endpoints for multi-user BasicAuth mode routes.append( Route( "/api/v1/users/{user_id}/app-password", provision_app_password, methods=["POST"], ) ) routes.append( Route( "/api/v1/users/{user_id}/app-password", get_app_password_status, methods=["GET"], ) ) routes.append( Route( "/api/v1/users/{user_id}/app-password", delete_app_password, methods=["DELETE"], ) ) routes.append( Route("/api/v1/vector-viz/search", vector_search, methods=["POST"]) ) routes.append( Route("/api/v1/chunk-context", get_chunk_context, methods=["GET"]) ) # PDF preview endpoint for Astrolabe (server-side rendering) routes.append(Route("/api/v1/pdf-preview", get_pdf_preview, methods=["GET"])) # ADR-018: Unified search endpoint for Nextcloud PHP app integration routes.append(Route("/api/v1/search", unified_search, methods=["POST"])) routes.append(Route("/api/v1/apps", get_installed_apps, methods=["GET"])) # Webhook management endpoints routes.append(Route("/api/v1/webhooks", list_webhooks, methods=["GET"])) routes.append(Route("/api/v1/webhooks", create_webhook, methods=["POST"])) routes.append( Route("/api/v1/webhooks/{webhook_id}", delete_webhook, methods=["DELETE"]) ) # Access and scope management endpoints (ADR-022) routes.append( Route( "/api/v1/users/{user_id}/access", get_user_access, methods=["GET"], ) ) routes.append( Route( "/api/v1/users/{user_id}/scopes", update_user_scopes, methods=["PATCH"], ) ) routes.append(Route("/api/v1/scopes", list_supported_scopes, methods=["GET"])) logger.info( "Management API endpoints enabled: /api/v1/status, /api/v1/vector-sync/status, " "/api/v1/users/{user_id}/session, /api/v1/users/{user_id}/revoke, " "/api/v1/users/{user_id}/app-password, /api/v1/users/{user_id}/access, " "/api/v1/users/{user_id}/scopes, /api/v1/scopes, " "/api/v1/vector-viz/search, /api/v1/search, /api/v1/apps, " "/api/v1/webhooks, /api/v1/pdf-preview" ) # ADR-016: Add Smithery well-known config endpoint for container runtime discovery if deployment_mode == DeploymentMode.SMITHERY_STATELESS: def smithery_mcp_config(request): """Smithery MCP configuration endpoint. Returns JSON Schema for Smithery's configuration UI. This endpoint is required for Smithery container runtime discovery. """ return JSONResponse(SMITHERY_CONFIG_SCHEMA) routes.append( Route( "/.well-known/mcp-config", smithery_mcp_config, methods=["GET"], ) ) logger.info("Smithery config endpoint enabled: /.well-known/mcp-config") # Note: Metrics endpoint is NOT exposed on main HTTP port for security reasons. # Metrics are served on dedicated port via setup_metrics() (default: 9090) # Determine if OAuth provisioning is available # This is true for: # 1. OAuth modes (primary auth method for MCP operations) # 2. Multi-user BasicAuth with offline access (hybrid mode) oauth_provisioning_available = oauth_enabled or ( mode == AuthMode.MULTI_USER_BASIC and settings.enable_offline_access and multi_user_token_verifier is not None # Ensure OAuth setup succeeded ) if oauth_provisioning_available: logger.info( f"OAuth provisioning routes enabled for mode: {mode.value} " f"(oauth_enabled={oauth_enabled}, hybrid_mode={not oauth_enabled})" ) def oauth_protected_resource_metadata(request): """RFC 9728 Protected Resource Metadata endpoint. Dynamically discovers supported scopes from registered MCP tools. This ensures the advertised scopes always match the actual tool requirements. The 'resource' field is set to the MCP server's public URL (RFC 9728 requires a URL). This is used as the audience in access tokens via the resource parameter (RFC 8707). The introspection controller matches this URL to the MCP server's client via resource_url field. ADR-023: authorization_servers points to the MCP server itself (AS proxy) so that clients authenticate through the proxy and tokens have correct audience. """ # RFC 9728 requires resource to be a URL (not a client ID) # Use the MCP server's public URL mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL") if not mcp_server_url: # Fallback to constructing from host and port mcp_server_url = f"http://localhost:{os.getenv('PORT', '8000')}" # Dynamically discover all scopes from registered tools # This provides a single source of truth based on @require_scopes decorators supported_scopes = discover_all_scopes(mcp) # ADR-023: Point authorization_servers to the MCP server itself. # The MCP server acts as an OAuth AS proxy, forwarding to Nextcloud # with its own client_id so tokens have the correct audience. return JSONResponse( { "resource": f"{mcp_server_url}/mcp", # RFC 9728: must be a URL "scopes_supported": supported_scopes, "authorization_servers": [mcp_server_url], "bearer_methods_supported": ["header"], "resource_signing_alg_values_supported": ["RS256"], } ) # Register PRM endpoint at both path-based and root locations per RFC 9728 # Path-based discovery: /.well-known/oauth-protected-resource{path} routes.append( Route( "/.well-known/oauth-protected-resource/mcp", oauth_protected_resource_metadata, methods=["GET"], ) ) # Root discovery (fallback): /.well-known/oauth-protected-resource routes.append( Route( "/.well-known/oauth-protected-resource", oauth_protected_resource_metadata, methods=["GET"], ) ) logger.info( "Protected Resource Metadata (PRM) endpoints enabled (path-based + root)" ) # Add unified OAuth callback endpoint supporting both flows routes.append(Route("/oauth/callback", oauth_callback, methods=["GET"])) logger.info( "OAuth unified callback enabled: /oauth/callback?flow={browser|provisioning}" ) # Add OAuth resource provisioning routes (ADR-004 Progressive Consent Flow 2) routes.append( Route( "/oauth/authorize-nextcloud", oauth_authorize_nextcloud, methods=["GET"], ) ) # Keep old callback endpoint as backwards-compatible alias routes.append( Route( "/oauth/callback-nextcloud", oauth_callback_nextcloud, methods=["GET"], ) ) logger.info( "OAuth resource provisioning routes enabled: /oauth/authorize-nextcloud, /oauth/callback-nextcloud (Flow 2)" ) # Add OAuth Flow 1 routes (MCP client login) - ONLY for OAuth modes # Multi-user BasicAuth uses hybrid mode with only Flow 2 (resource provisioning) if oauth_enabled: routes.append(Route("/oauth/authorize", oauth_authorize, methods=["GET"])) # ADR-023: AS proxy endpoints — MCP server acts as its own OAuth AS routes.append(Route("/oauth/token", oauth_token_endpoint, methods=["POST"])) routes.append(Route("/oauth/register", oauth_register_proxy, methods=["POST"])) routes.append( Route( "/.well-known/oauth-authorization-server", oauth_as_metadata, methods=["GET"], ) ) logger.info( "OAuth AS proxy routes enabled: /oauth/authorize, /oauth/token, " "/oauth/register, /.well-known/oauth-authorization-server (ADR-023)" ) # Add browser OAuth login routes for Management API access # Available in OAuth modes AND multi-user BasicAuth with offline access # (hybrid mode). Separate from MCP tool auth - Management API uses OAuth if oauth_provisioning_available: routes.append( Route("/oauth/login", oauth_login, methods=["GET"], name="oauth_login") ) # Keep old callback endpoint as backwards-compatible alias routes.append( Route( "/oauth/login-callback", oauth_login_callback, methods=["GET"], name="oauth_login_callback", ) ) routes.append( Route("/oauth/logout", oauth_logout, methods=["GET"], name="oauth_logout") ) logger.info( "Browser OAuth routes enabled: /oauth/login, /oauth/login-callback (legacy), /oauth/logout" ) # Add user info routes (available in both BasicAuth and OAuth modes) # ADR-016: Skip /app admin UI in Smithery stateless mode (no vector sync, webhooks) if deployment_mode != DeploymentMode.SMITHERY_STATELESS: # Create a separate Starlette app for browser routes that need session auth # This prevents SessionAuthBackend from interfering with FastMCP's OAuth browser_routes = [ Route( "/", user_info_html, methods=["GET"] ), # /app → user info with all tabs Route( "/revoke", revoke_session, methods=["POST"], name="revoke_session_endpoint", ), # /app/revoke → revoke_session # Vector sync status fragment (htmx polling) Route( "/vector-sync/status", vector_sync_status_fragment, methods=["GET"], ), # /app/vector-sync/status # Vector visualization routes Route( "/vector-viz", vector_visualization_html, methods=["GET"] ), # /app/vector-viz Route( "/vector-viz/search", vector_visualization_search, methods=["GET"], ), # /app/vector-viz/search Route( "/chunk-context", chunk_context_endpoint, methods=["GET"], ), # /app/chunk-context # Webhook management routes (admin-only) Route( "/webhooks", webhook_management_pane, methods=["GET"] ), # /app/webhooks Route( "/webhooks/enable/{preset_id:str}", enable_webhook_preset, methods=["POST"], ), Route( "/webhooks/disable/{preset_id:str}", disable_webhook_preset, methods=["DELETE"], ), ] # Add static files mount if directory exists static_dir = os.path.join(os.path.dirname(__file__), "auth", "static") if os.path.isdir(static_dir): browser_routes.append( Mount("/static", StaticFiles(directory=static_dir), name="static") ) logger.info(f"Mounted static files from {static_dir}") browser_app = Starlette(routes=browser_routes) browser_app.add_middleware( AuthenticationMiddleware, # type: ignore[invalid-argument-type] backend=SessionAuthBackend(oauth_enabled=oauth_enabled), ) # Add redirect from /app to /app/ (Starlette requires trailing slash for mounted apps) routes.append( Route("/app", lambda request: RedirectResponse("/app/", status_code=307)) ) # Mount browser app at /app (webapp and admin routes) routes.append(Mount("/app", app=browser_app)) logger.info("App routes with session auth: /app, /app/webhooks, /app/revoke") else: logger.info("Admin UI (/app) disabled in Smithery stateless mode") # Mount FastMCP at root last (catch-all, handles OAuth via token_verifier) routes.append(Mount("/", app=mcp_app)) app = Starlette(routes=routes, lifespan=starlette_lifespan) logger.info( "Routes: /user/* with SessionAuth, /mcp with FastMCP OAuth Bearer tokens" ) # Store supported scopes on app.state for AS metadata endpoint (ADR-023) if oauth_enabled: app.state.supported_scopes = discover_all_scopes(mcp) # Add debugging middleware to log Authorization headers and client capabilities @app.middleware("http") async def log_auth_headers(request, call_next): auth_header = request.headers.get("authorization") if request.url.path.startswith("/mcp"): if auth_header: # Log first 50 chars of token for debugging token_preview = ( auth_header[:50] + "..." if len(auth_header) > 50 else auth_header ) logger.info(f"🔑 /mcp request with Authorization: {token_preview}") else: # Only warn about missing Authorization in OAuth mode # In BasicAuth mode, /mcp requests without Authorization are expected if oauth_enabled: logger.warning( f"⚠️ /mcp request WITHOUT Authorization header from {request.client}" ) # Log client capabilities on initialize request if request.method == "POST": # Read body to check for initialize request # Starlette caches the body internally, so it's safe to read here body = await request.body() try: data = json.loads(body) # Check if this is an initialize request if data.get("method") == "initialize": params = data.get("params", {}) capabilities = params.get("capabilities", {}) client_info = params.get("clientInfo", {}) logger.info( f"🔌 MCP client connected: {client_info.get('name', 'unknown')} " f"v{client_info.get('version', 'unknown')}" ) # Log capabilities in a structured way cap_summary = [] # Check for presence using 'in' not truthiness (empty dict {} counts as having capability) if "roots" in capabilities: cap_summary.append("roots") if "sampling" in capabilities: cap_summary.append("sampling") if "experimental" in capabilities: cap_summary.append( f"experimental({len(capabilities['experimental'])} features)" ) logger.info( f"📋 Client capabilities: {', '.join(cap_summary) if cap_summary else 'none'}" ) # Log full capabilities at INFO level to diagnose capability issues logger.info( f"Full capabilities JSON: {json.dumps(capabilities)}" ) except Exception as e: # Don't fail the request if logging fails logger.debug( f"Failed to parse MCP request for capability logging: {e}" ) response = await call_next(request) return response # Add CORS middleware to allow browser-based clients like MCP Inspector app.add_middleware( CORSMiddleware, # type: ignore[invalid-argument-type] allow_origins=["*"], # Allow all origins for development allow_credentials=True, allow_methods=["*"], allow_headers=["*"], expose_headers=["*"], ) # Add observability middleware (metrics + tracing) if settings.metrics_enabled or settings.otel_exporter_otlp_endpoint: app.add_middleware(ObservabilityMiddleware) # type: ignore[invalid-argument-type] logger.info("Observability middleware enabled (metrics and/or tracing)") # Add exception handler for scope challenges (OAuth mode only) if oauth_enabled: @app.exception_handler(InsufficientScopeError) async def handle_insufficient_scope(request, exc: InsufficientScopeError): """Return 403 with WWW-Authenticate header for scope challenges.""" resource_url = os.getenv( "NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000" ) scope_str = " ".join(exc.missing_scopes) return JSONResponse( status_code=403, headers={ "WWW-Authenticate": ( f'Bearer error="insufficient_scope", ' f'scope="{scope_str}", ' f'resource_metadata="{resource_url}/.well-known/oauth-protected-resource/mcp"' ) }, content={ "error": "insufficient_scope", "scopes_required": exc.missing_scopes, }, ) logger.info("WWW-Authenticate scope challenge handler enabled") # ADR-016: Apply SmitheryConfigMiddleware in Smithery stateless mode # This must be the outermost middleware to extract config from URL query parameters # before any other middleware processes the request if deployment_mode == DeploymentMode.SMITHERY_STATELESS: app = SmitheryConfigMiddleware(app) logger.info("SmitheryConfigMiddleware enabled for query parameter config") # Apply BasicAuthMiddleware for multi-user BasicAuth pass-through mode if settings.enable_multi_user_basic_auth: app = BasicAuthMiddleware(app) logger.info( "BasicAuthMiddleware enabled - multi-user BasicAuth pass-through mode active" ) return app