"""Management API endpoints for Nextcloud PHP app integration. ADR-018: Provides REST API endpoints for the Nextcloud PHP app to query: - Server status and version - User session information and background access status - Vector sync metrics All endpoints use OAuth bearer token authentication via UnifiedTokenVerifier. The PHP app obtains tokens through PKCE flow and uses them to access these endpoints. Shared helper functions for other API modules are also exported from here: - extract_bearer_token: Extract OAuth token from request - validate_token_and_get_user: Validate token and get user ID - _sanitize_error_for_client: Return safe error messages - _parse_int_param, _parse_float_param, _validate_query_string: Parameter validation """ import logging import time from importlib.metadata import version from typing import Any from qdrant_client.models import Filter from starlette.requests import Request from starlette.responses import JSONResponse from nextcloud_mcp_server.config import get_settings from nextcloud_mcp_server.config_validators import AuthMode, detect_auth_mode from nextcloud_mcp_server.vector.placeholder import get_placeholder_filter from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client logger = logging.getLogger(__name__) # Get package version from metadata __version__ = version("nextcloud-mcp-server") # Track server start time for uptime calculation _server_start_time = time.time() def extract_bearer_token(request: Request) -> str | None: """Extract OAuth bearer token from Authorization header. Args: request: Starlette request Returns: Token string or None if no valid Authorization header """ auth_header = request.headers.get("Authorization") if not auth_header: return None # Parse "Bearer " parts = auth_header.split() if len(parts) != 2 or parts[0].lower() != "bearer": return None return parts[1] async def validate_token_and_get_user( request: Request, ) -> tuple[str, dict[str, Any]]: """Validate OAuth bearer token and extract user ID. Uses verify_token_for_management_api which accepts any valid Nextcloud OIDC token (not just MCP-audience tokens). This is needed because Astrolabe (NC PHP app) uses its own OAuth client, separate from MCP server's client. Security Model: ~~~~~~~~~~~~~~~ - **Authentication** (this function): Verifies token is cryptographically valid and extracts user identity from the `sub` claim. - **Authorization** (calling endpoints): Each endpoint MUST verify that the authenticated user owns the requested resource. For example: - GET /users/{user_id}/session: Checks token_user_id == path_user_id (403 if mismatch) - POST /users/{user_id}/revoke: Checks token_user_id == path_user_id (403 if mismatch) This separation ensures that even without audience validation, users can only access their own resources. Cross-user access is blocked at the authorization layer. Args: request: Starlette request with Authorization header Returns: Tuple of (user_id, validated_token_data) Raises: Exception: If token is invalid or missing """ token = extract_bearer_token(request) if not token: raise ValueError("Missing Authorization header") # Get token verifier from app state # Note: This is set in app.py starlette_lifespan for OAuth mode token_verifier = request.app.state.oauth_context["token_verifier"] # Validate token for management API (handles both JWT and opaque tokens) # Uses verify_token_for_management_api which accepts any valid Nextcloud token # without requiring MCP audience - needed for Astrolabe integration (ADR-018) access_token = await token_verifier.verify_token_for_management_api(token) if not access_token: raise ValueError("Token validation failed") # Extract user ID from AccessToken.resource field (set during verification) user_id = access_token.resource if not user_id: raise ValueError("Token missing user identifier") # Return user_id and a dict with token info for compatibility validated = { "sub": user_id, "client_id": access_token.client_id, "scopes": access_token.scopes, "expires_at": access_token.expires_at, } return user_id, validated def _sanitize_error_for_client(error: Exception, context: str = "") -> str: """ Return a safe, generic error message for clients. Detailed error is logged internally but not exposed to clients to prevent information leakage (database paths, API URLs, tokens, etc.). Args: error: The exception that occurred context: Optional context for logging (e.g., "revoke_user_access") Returns: Generic error message safe for client consumption """ # Log detailed error for debugging logger.error(f"Error in {context}: {error}", exc_info=True) # Return generic message return "An internal error occurred. Please contact your administrator." def _parse_int_param( value: str | None, default: int, min_val: int, max_val: int, param_name: str, ) -> int: """Parse and validate integer parameter.""" if value is None: return default try: parsed = int(value) except ValueError: raise ValueError(f"Invalid {param_name}: must be an integer") if parsed < min_val or parsed > max_val: raise ValueError( f"Invalid {param_name}: must be between {min_val} and {max_val}" ) return parsed def _parse_float_param( value: Any, default: float, min_val: float, max_val: float, param_name: str, ) -> float: """Parse and validate float parameter.""" if value is None: return default try: parsed = float(value) except (ValueError, TypeError): raise ValueError(f"Invalid {param_name}: must be a number") if parsed < min_val or parsed > max_val: raise ValueError( f"Invalid {param_name}: must be between {min_val} and {max_val}" ) return parsed def _validate_query_string(query: str, max_length: int = 10000) -> None: """Validate query string length.""" if len(query) > max_length: raise ValueError(f"Query too long: maximum {max_length} characters") async def get_server_status(request: Request) -> JSONResponse: """GET /api/v1/status - Server status and version. Returns basic server information including version, auth mode, vector sync status, and uptime. Public endpoint - no authentication required. """ # Public endpoint - no authentication required # Get configuration settings = get_settings() # Calculate uptime uptime_seconds = int(time.time() - _server_start_time) # Determine auth mode using proper mode detection mode = detect_auth_mode(settings) # Map deployment mode to auth_mode for API response # This helps clients (like Astrolabe) determine which auth flow to use if mode == AuthMode.OAUTH_SINGLE_AUDIENCE or mode == AuthMode.OAUTH_TOKEN_EXCHANGE: auth_mode = "oauth" elif mode == AuthMode.MULTI_USER_BASIC: auth_mode = "multi_user_basic" elif mode == AuthMode.SINGLE_USER_BASIC: auth_mode = "basic" elif mode == AuthMode.SMITHERY_STATELESS: auth_mode = "smithery" else: auth_mode = "unknown" response_data = { "version": __version__, "auth_mode": auth_mode, "vector_sync_enabled": settings.vector_sync_enabled, "uptime_seconds": uptime_seconds, "management_api_version": "1.0", } # Add app password support indicator for multi-user BasicAuth mode if mode == AuthMode.MULTI_USER_BASIC: response_data["supports_app_passwords"] = settings.enable_offline_access # Include OIDC configuration for client discovery (e.g. Astrolabe PHP app). # Always attempt to provide oidc.discovery_url so clients can discover the # IdP regardless of the current auth mode. This enables smoother transitions # between auth modes and lets Astrolabe pre-discover OIDC endpoints. oidc_config: dict[str, str] = {} if settings.oidc_discovery_url: # Explicit OIDC_DISCOVERY_URL takes precedence oidc_config["discovery_url"] = settings.oidc_discovery_url elif settings.nextcloud_host: # Auto-derive from NEXTCLOUD_HOST — Nextcloud exposes OIDC discovery # at the standard well-known path when user_oidc is enabled host = settings.nextcloud_host.rstrip("/") oidc_config["discovery_url"] = f"{host}/.well-known/openid-configuration" if settings.oidc_issuer: oidc_config["issuer"] = settings.oidc_issuer if oidc_config: response_data["oidc"] = oidc_config return JSONResponse(response_data) async def get_vector_sync_status(request: Request) -> JSONResponse: """GET /api/v1/vector-sync/status - Vector sync metrics. Returns real-time indexing status and metrics. Requires: VECTOR_SYNC_ENABLED=true Public endpoint - no authentication required. """ # Public endpoint - no authentication required settings = get_settings() if not settings.vector_sync_enabled: return JSONResponse( {"error": "Vector sync is disabled on this server"}, status_code=404, ) try: # Get document receive stream from app state (set by starlette_lifespan in app.py) document_receive_stream = getattr( request.app.state, "document_receive_stream", None ) if document_receive_stream is None: logger.debug("document_receive_stream not available in app state") return JSONResponse( { "status": "unknown", "indexed_documents": 0, "pending_documents": 0, "message": "Vector sync stream not initialized", } ) # Get pending count from stream statistics stream_stats = document_receive_stream.statistics() pending_count = stream_stats.current_buffer_used # Get Qdrant client and query indexed count indexed_count = 0 try: qdrant_client = await get_qdrant_client() # Count documents in collection, excluding placeholders count_result = await qdrant_client.count( collection_name=settings.get_collection_name(), count_filter=Filter(must=[get_placeholder_filter()]), ) indexed_count = count_result.count except Exception as e: logger.warning(f"Failed to query Qdrant for indexed count: {e}") # Continue with indexed_count = 0 # Determine status status = "syncing" if pending_count > 0 else "idle" return JSONResponse( { "status": status, "indexed_documents": indexed_count, "pending_documents": pending_count, } ) except Exception as e: error_msg = _sanitize_error_for_client(e, "get_vector_sync_status") return JSONResponse( {"error": error_msg}, status_code=500, ) async def get_user_session(request: Request) -> JSONResponse: """GET /api/v1/users/{user_id}/session - User session details. Returns information about the user's MCP session including: - Background access status (offline_access) - IdP profile information Requires OAuth bearer token. The user_id in the path must match the user_id in the token. """ try: # Validate OAuth token and extract user token_user_id, validated = await validate_token_and_get_user(request) except Exception as e: error_msg = _sanitize_error_for_client(e, "get_user_session_auth") return JSONResponse( {"error": error_msg}, status_code=401, ) # Get user_id from path path_user_id = request.path_params.get("user_id") # Verify token user matches requested user if token_user_id != path_user_id: logger.warning( f"User {token_user_id} attempted to access session for {path_user_id}" ) return JSONResponse( { "error": "Forbidden", "message": "Cannot access another user's session", }, status_code=403, ) # Check if offline access is enabled # Use settings.enable_offline_access which handles both ENABLE_BACKGROUND_OPERATIONS (new) # and ENABLE_OFFLINE_ACCESS (deprecated) environment variables settings = get_settings() enable_offline_access = settings.enable_offline_access if not enable_offline_access: # Offline access disabled - return minimal session info return JSONResponse( { "session_id": token_user_id, "background_access_granted": False, } ) # Get refresh token storage from app state storage = request.app.state.oauth_context.get("storage") if not storage: logger.error("Refresh token storage not available in app state") return JSONResponse( { "session_id": token_user_id, "background_access_granted": False, "error": "Storage not configured", } ) try: # Check if user has refresh token stored refresh_token_data = await storage.get_refresh_token(token_user_id) if not refresh_token_data: # No refresh token - user hasn't provisioned background access return JSONResponse( { "session_id": token_user_id, "background_access_granted": False, } ) # User has background access - get profile info profile = await storage.get_user_profile(token_user_id) response_data = { "session_id": token_user_id, "background_access_granted": True, "background_access_details": { "granted_at": refresh_token_data.get("created_at"), "scopes": refresh_token_data.get("scope", "").split(), }, } if profile: response_data["idp_profile"] = profile return JSONResponse(response_data) except Exception as e: error_msg = _sanitize_error_for_client(e, "get_user_session") return JSONResponse( {"error": error_msg}, status_code=500, ) async def revoke_user_access(request: Request) -> JSONResponse: """POST /api/v1/users/{user_id}/revoke - Revoke user's background access. Deletes the user's stored refresh token, removing their offline access. Requires OAuth bearer token. The user_id in the path must match the user_id in the token. """ try: # Validate OAuth token and extract user token_user_id, validated = await validate_token_and_get_user(request) except Exception as e: logger.warning(f"Unauthorized access to /api/v1/users/{{user_id}}/revoke: {e}") return JSONResponse( { "error": "Unauthorized", "message": _sanitize_error_for_client(e, "revoke_user_access"), }, status_code=401, ) # Get user_id from path path_user_id = request.path_params.get("user_id") # Verify token user matches requested user if token_user_id != path_user_id: logger.warning( f"User {token_user_id} attempted to revoke access for {path_user_id}" ) return JSONResponse( { "error": "Forbidden", "message": "Cannot revoke another user's access", }, status_code=403, ) # Get token broker from app state oauth_context = request.app.state.oauth_context if oauth_context is None: logger.error("OAuth context not initialized") return JSONResponse( {"error": "OAuth not enabled"}, status_code=500, ) token_broker = oauth_context.get("token_broker") if not token_broker: logger.error("Token broker not available in app state") return JSONResponse( {"error": "Token broker not configured"}, status_code=500, ) try: # Delete refresh token from storage await token_broker.storage.delete_refresh_token(token_user_id) # CRITICAL: Invalidate all cached tokens for this user await token_broker.cache.invalidate(token_user_id) logger.info( f"Revoked background access for user {token_user_id} (cache and storage cleared)" ) return JSONResponse( { "success": True, "message": f"Background access revoked for {token_user_id}", } ) except Exception as e: error_msg = _sanitize_error_for_client(e, "revoke_user_access") return JSONResponse( {"success": False, "error": error_msg}, status_code=500, )