diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 8233ef5..4fa06ff 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -27,6 +27,9 @@ from nextcloud_mcp_server.auth import ( has_required_scopes, is_jwt_token, ) +from nextcloud_mcp_server.auth.progressive_token_verifier import ( + ProgressiveConsentTokenVerifier, +) from nextcloud_mcp_server.client import NextcloudClient from nextcloud_mcp_server.config import ( LOGGING_CONFIG, @@ -45,6 +48,7 @@ from nextcloud_mcp_server.server import ( configure_tables_tools, configure_webdav_tools, ) +from nextcloud_mcp_server.server.oauth_tools import register_oauth_tools logger = logging.getLogger(__name__) @@ -211,7 +215,9 @@ class OAuthAppContext: """Application context for OAuth mode.""" nextcloud_host: str - token_verifier: NextcloudTokenVerifier + token_verifier: ( + object # Can be NextcloudTokenVerifier or ProgressiveConsentTokenVerifier + ) refresh_token_storage: Optional["RefreshTokenStorage"] = None oauth_client: Optional[object] = None # NextcloudOAuthClient or KeycloakOAuthClient oauth_provider: str = "nextcloud" # "nextcloud" or "keycloak" @@ -558,8 +564,52 @@ async def setup_oauth_config(): jwt_validation_issuer = issuer client_issuer = issuer + # Check if Progressive Consent mode is enabled + enable_progressive = ( + os.getenv("ENABLE_PROGRESSIVE_CONSENT", "true").lower() == "true" + ) + # Create token verifier - if is_external_idp: + if enable_progressive: + # Progressive Consent mode: Use specialized verifier with audience separation + logger.info("✓ Progressive Consent mode enabled - dual OAuth flows active") + + # Get encryption key for token broker + encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") + if not encryption_key: + logger.warning( + "TOKEN_ENCRYPTION_KEY not set - token broker will not be available" + ) + + # Create token broker service + from nextcloud_mcp_server.auth.token_broker import TokenBrokerService + + token_broker = None + if encryption_key and refresh_token_storage: + token_broker = TokenBrokerService( + storage=refresh_token_storage, + oidc_discovery_url=discovery_url, + nextcloud_host=nextcloud_host, + encryption_key=encryption_key, + ) + logger.info( + "✓ Token Broker service initialized for audience-specific tokens" + ) + + # Create Progressive Consent token verifier + token_verifier = ProgressiveConsentTokenVerifier( + token_storage=refresh_token_storage, + token_broker=token_broker, + oidc_discovery_url=discovery_url, + nextcloud_host=nextcloud_host, + encryption_key=encryption_key, + ) + + logger.info( + "✓ Progressive Consent verifier configured - enforcing audience separation" + ) + + elif is_external_idp: # External IdP mode: Validate via Nextcloud user_oidc app # The user_oidc app accepts tokens from the external IdP and provisions users nextcloud_userinfo_uri = f"{nextcloud_host}/apps/user_oidc/userinfo" @@ -761,6 +811,15 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None): f"Unknown app: {app_name}. Available apps: {list(available_apps.keys())}" ) + # Register OAuth provisioning tools if in OAuth mode with Progressive Consent + if oauth_enabled: + enable_progressive = ( + os.getenv("ENABLE_PROGRESSIVE_CONSENT", "true").lower() == "true" + ) + if enable_progressive: + logger.info("Registering OAuth provisioning tools for Progressive Consent") + register_oauth_tools(mcp) + # Override list_tools to filter based on user's token scopes (OAuth mode only) if oauth_enabled: original_list_tools = mcp._tool_manager.list_tools diff --git a/nextcloud_mcp_server/auth/client_registry.py b/nextcloud_mcp_server/auth/client_registry.py new file mode 100644 index 0000000..03069c9 --- /dev/null +++ b/nextcloud_mcp_server/auth/client_registry.py @@ -0,0 +1,239 @@ +""" +MCP Client Registry for ADR-004 Progressive Consent Architecture. + +This module manages the registry of allowed MCP clients that can authenticate +via Flow 1. In production, this would integrate with Dynamic Client Registration +(DCR) or a database of pre-registered clients. +""" + +import logging +import os +from dataclasses import dataclass +from typing import Dict, List, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class MCPClientInfo: + """Information about a registered MCP client.""" + + client_id: str + name: str + redirect_uris: List[str] + allowed_scopes: List[str] + is_public: bool = True # Native clients are public (no client_secret) + metadata: Optional[Dict] = None + + +class ClientRegistry: + """ + Registry for MCP clients allowed to authenticate via Flow 1. + + In production, this would: + 1. Support Dynamic Client Registration (DCR) per RFC 7591 + 2. Integrate with IdP client registry + 3. Store client metadata in database + 4. Support client updates and revocation + """ + + def __init__(self, allow_dynamic_registration: bool = False): + """ + Initialize the client registry. + + Args: + allow_dynamic_registration: Whether to allow DCR for new clients + """ + self.allow_dynamic_registration = allow_dynamic_registration + self._clients: Dict[str, MCPClientInfo] = {} + self._load_static_clients() + + def _load_static_clients(self): + """Load statically configured clients from environment.""" + # Load from ALLOWED_MCP_CLIENTS environment variable + allowed_clients = os.getenv("ALLOWED_MCP_CLIENTS", "").strip() + + if allowed_clients: + # Parse comma-separated list + for client_id in allowed_clients.split(","): + client_id = client_id.strip() + if client_id: + # Create basic client info + # In production, would load full metadata from database + self._clients[client_id] = MCPClientInfo( + client_id=client_id, + name=self._get_client_name(client_id), + redirect_uris=["http://localhost:*", "http://127.0.0.1:*"], + allowed_scopes=["openid", "profile", "email", "mcp-server:api"], + is_public=True, + ) + logger.info(f"Registered static client: {client_id}") + + # Add well-known clients if not explicitly configured + if not self._clients: + self._add_well_known_clients() + + def _get_client_name(self, client_id: str) -> str: + """Get human-readable name for client_id.""" + known_names = { + "claude-desktop": "Claude Desktop", + "continue-dev": "Continue IDE Extension", + "zed-editor": "Zed Editor", + "vscode-mcp": "VS Code MCP Extension", + "test-mcp-client": "Test MCP Client", + } + return known_names.get(client_id, client_id.replace("-", " ").title()) + + def _add_well_known_clients(self): + """Add well-known MCP clients for testing and development.""" + well_known = [ + MCPClientInfo( + client_id="claude-desktop", + name="Claude Desktop", + redirect_uris=["http://localhost:*", "http://127.0.0.1:*"], + allowed_scopes=["openid", "profile", "email", "mcp-server:api"], + is_public=True, + metadata={"vendor": "Anthropic"}, + ), + MCPClientInfo( + client_id="test-mcp-client", + name="Test MCP Client", + redirect_uris=["http://localhost:*", "http://127.0.0.1:*"], + allowed_scopes=["openid", "profile", "email", "mcp-server:api"], + is_public=True, + metadata={"purpose": "testing"}, + ), + ] + + for client in well_known: + self._clients[client.client_id] = client + logger.info(f"Registered well-known client: {client.client_id}") + + def validate_client( + self, + client_id: str, + redirect_uri: Optional[str] = None, + scopes: Optional[List[str]] = None, + ) -> tuple[bool, Optional[str]]: + """ + Validate a client_id and optionally its redirect_uri and scopes. + + Args: + client_id: The client identifier to validate + redirect_uri: Optional redirect URI to validate + scopes: Optional list of scopes to validate + + Returns: + Tuple of (is_valid, error_message) + """ + # Check if client exists + client = self._clients.get(client_id) + if not client: + if self.allow_dynamic_registration: + # In production, would attempt DCR here + logger.info(f"Unknown client {client_id}, would attempt DCR") + return True, None + else: + return False, f"Unknown client_id: {client_id}" + + # Validate redirect_uri if provided + if redirect_uri: + if not self._validate_redirect_uri(client, redirect_uri): + return False, f"Invalid redirect_uri for client {client_id}" + + # Validate scopes if provided + if scopes: + invalid_scopes = set(scopes) - set(client.allowed_scopes) + if invalid_scopes: + return False, f"Invalid scopes for client {client_id}: {invalid_scopes}" + + return True, None + + def _validate_redirect_uri(self, client: MCPClientInfo, redirect_uri: str) -> bool: + """ + Validate redirect_uri against client's registered URIs. + + Args: + client: The client info + redirect_uri: The URI to validate + + Returns: + True if valid, False otherwise + """ + # Parse the redirect URI + from urllib.parse import urlparse + + parsed = urlparse(redirect_uri) + + # Check against registered patterns + for pattern in client.redirect_uris: + if "*" in pattern: + # Handle wildcard port (localhost:*) + pattern_base = pattern.replace(":*", "") + if redirect_uri.startswith(pattern_base + ":"): + # Validate it's localhost with a port + if parsed.hostname in ["localhost", "127.0.0.1"]: + return True + elif redirect_uri == pattern: + return True + + return False + + def register_client(self, client_info: MCPClientInfo) -> bool: + """ + Register a new MCP client (DCR support). + + Args: + client_info: Client information to register + + Returns: + True if registered successfully + """ + if not self.allow_dynamic_registration: + logger.warning(f"DCR disabled, cannot register {client_info.client_id}") + return False + + if client_info.client_id in self._clients: + logger.warning(f"Client {client_info.client_id} already registered") + return False + + self._clients[client_info.client_id] = client_info + logger.info(f"Dynamically registered client: {client_info.client_id}") + + # In production, would persist to database + return True + + def get_client(self, client_id: str) -> Optional[MCPClientInfo]: + """ + Get client information. + + Args: + client_id: The client identifier + + Returns: + Client info if found, None otherwise + """ + return self._clients.get(client_id) + + def list_clients(self) -> List[MCPClientInfo]: + """ + List all registered clients. + + Returns: + List of client information + """ + return list(self._clients.values()) + + +# Global registry instance +_registry: Optional[ClientRegistry] = None + + +def get_client_registry() -> ClientRegistry: + """Get the global client registry instance.""" + global _registry + if _registry is None: + # Check if DCR is enabled + allow_dcr = os.getenv("ENABLE_DCR", "false").lower() == "true" + _registry = ClientRegistry(allow_dynamic_registration=allow_dcr) + return _registry diff --git a/nextcloud_mcp_server/auth/oauth_routes.py b/nextcloud_mcp_server/auth/oauth_routes.py index bbc3ba8..9c40209 100644 --- a/nextcloud_mcp_server/auth/oauth_routes.py +++ b/nextcloud_mcp_server/auth/oauth_routes.py @@ -29,6 +29,7 @@ import jwt from starlette.requests import Request from starlette.responses import JSONResponse, RedirectResponse +from nextcloud_mcp_server.auth.client_registry import get_client_registry from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage logger = logging.getLogger(__name__) @@ -60,9 +61,9 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse: Returns: 302 redirect to IdP authorization endpoint """ - # Check if Progressive Consent is enabled + # Check if Progressive Consent is enabled (default: true for ADR-004) enable_progressive = ( - os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true" + os.getenv("ENABLE_PROGRESSIVE_CONSENT", "true").lower() == "true" ) # Extract parameters @@ -129,7 +130,7 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse: status_code=400, ) - # In Progressive Consent mode, validate client_id + # In Progressive Consent mode, validate client_id using registry if enable_progressive: if not client_id: return JSONResponse( @@ -140,16 +141,22 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse: status_code=400, ) - # Check if client_id is in allowed list - allowed_clients = os.getenv("ALLOWED_MCP_CLIENTS", "").split(",") - allowed_clients = [c.strip() for c in allowed_clients if c.strip()] + # Validate client using registry + registry = get_client_registry() + is_valid, error_msg = registry.validate_client( + client_id=client_id, + redirect_uri=redirect_uri, + scopes=request.query_params.get("scope", "").split() + if request.query_params.get("scope") + else None, + ) - if allowed_clients and client_id not in allowed_clients: - logger.warning(f"Unauthorized client_id: {client_id}") + if not is_valid: + logger.warning(f"Client validation failed: {error_msg}") return JSONResponse( { "error": "unauthorized_client", - "error_description": f"Client {client_id} is not authorized", + "error_description": error_msg, }, status_code=401, ) @@ -169,44 +176,61 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse: oauth_client = oauth_ctx["oauth_client"] oauth_config = oauth_ctx["config"] - # Generate session ID and MCP authorization code - session_id = str(uuid4()) - mcp_authorization_code = f"mcp-code-{secrets.token_urlsafe(32)}" - - logger.info( - f"Starting OAuth authorization flow - session={session_id[:8]}..., " - f"client_redirect={redirect_uri}" - ) - - # Store session with client details and PKCE challenge - flow_type = "flow1" if enable_progressive else "hybrid" - await storage.store_oauth_session( - session_id=session_id, - client_id=client_id, - client_redirect_uri=redirect_uri, - state=state, - code_challenge=code_challenge, - code_challenge_method=code_challenge_method, - mcp_authorization_code=mcp_authorization_code, - flow_type=flow_type, - ttl_seconds=600, # 10 minutes - ) - # Build IdP authorization URL mcp_server_url = oauth_config["mcp_server_url"] if enable_progressive: - # Flow 1: Client authenticates directly to IdP - # Use client's redirect_uri for direct callback + # Flow 1: Client authenticates directly to IdP WITHOUT server interception + # CRITICAL: This is a direct pass-through to IdP + # The IdP will redirect directly back to the client's callback + # The MCP server does NOT see the IdP authorization code! + + logger.info( + f"Starting Progressive Consent Flow 1 - no server session needed, " + f"client will handle IdP response directly at {redirect_uri}" + ) + + # Use client's redirect_uri for DIRECT callback (bypasses server) callback_uri = redirect_uri - # Only request MCP authentication scopes + + # Only request MCP authentication scopes (no Nextcloud scopes!) + # The token will have aud: "mcp-server" claim scopes = "openid profile email" + # Pass through client's state directly idp_state = state - # Use client's own client_id (if IdP requires it) + + # Use client's own client_id (client must be pre-registered at IdP) idp_client_id = client_id + + logger.info("Flow 1 (Progressive Consent): Direct client auth to IdP") + logger.info(f" Client ID: {client_id}") + logger.info(f" Client will receive IdP code directly at: {callback_uri}") + logger.info(f" Scopes: {scopes} (no resource access)") else: - # Hybrid Flow: Server intercepts callback + # Hybrid Flow: Server intercepts callback (backward compatible) + # Generate session ID and MCP authorization code for Hybrid Flow + session_id = str(uuid4()) + mcp_authorization_code = f"mcp-code-{secrets.token_urlsafe(32)}" + + logger.info( + f"Starting Hybrid OAuth flow - session={session_id[:8]}..., " + f"client_redirect={redirect_uri}" + ) + + # Store session with client details and PKCE challenge + await storage.store_oauth_session( + session_id=session_id, + client_id=client_id, + client_redirect_uri=redirect_uri, + state=state, + code_challenge=code_challenge, + code_challenge_method=code_challenge_method, + mcp_authorization_code=mcp_authorization_code, + flow_type="hybrid", + ttl_seconds=600, # 10 minutes + ) + callback_uri = f"{mcp_server_url}/oauth/callback" # Combine session_id and client state for IdP state parameter idp_state = f"{session_id}:{state}" @@ -217,6 +241,10 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse: # Use server's client_id idp_client_id = oauth_config["client_id"] + logger.info("Hybrid Flow: Server intercepts callback") + logger.info(f" Server callback: {callback_uri}") + logger.info(f" Combined scopes: {scopes}") + # Get authorization endpoint from OAuth client if oauth_client: # External IdP mode (Keycloak) - use oauth_client @@ -615,9 +643,9 @@ async def oauth_authorize_nextcloud( Returns: 302 redirect to IdP authorization endpoint """ - # Check if Progressive Consent is enabled + # Check if Progressive Consent is enabled (default: true for ADR-004) enable_progressive = ( - os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true" + os.getenv("ENABLE_PROGRESSIVE_CONSENT", "true").lower() == "true" ) if not enable_progressive: return JSONResponse( @@ -724,7 +752,7 @@ async def oauth_authorize_nextcloud( return RedirectResponse(auth_url, status_code=302) -async def oauth_callback_nextcloud(request: Request) -> JSONResponse: +async def oauth_callback_nextcloud(request: Request): """ OAuth callback endpoint for Flow 2: Resource Provisioning. diff --git a/nextcloud_mcp_server/auth/progressive_token_verifier.py b/nextcloud_mcp_server/auth/progressive_token_verifier.py new file mode 100644 index 0000000..d278970 --- /dev/null +++ b/nextcloud_mcp_server/auth/progressive_token_verifier.py @@ -0,0 +1,214 @@ +""" +Token Verifier for ADR-004 Progressive Consent Architecture. + +This module implements token verification with strict audience separation: +- Flow 1 tokens have aud: "mcp-server" for MCP authentication +- Flow 2 tokens have aud: "nextcloud" for resource access +- Token Broker manages the exchange between audiences +""" + +import logging +import os +from datetime import datetime, timezone +from typing import Optional + +import jwt +from mcp.server.auth.provider import AccessToken + +from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage +from nextcloud_mcp_server.auth.token_broker import TokenBrokerService + +logger = logging.getLogger(__name__) + + +class ProgressiveConsentTokenVerifier: + """ + Token verifier for Progressive Consent dual OAuth flows. + + This verifier: + 1. Validates Flow 1 tokens (aud: "mcp-server") for MCP authentication + 2. Checks if user has provisioned Nextcloud access (Flow 2) + 3. Uses Token Broker to obtain aud: "nextcloud" tokens when needed + """ + + def __init__( + self, + token_storage: RefreshTokenStorage, + token_broker: Optional[TokenBrokerService] = None, + oidc_discovery_url: Optional[str] = None, + nextcloud_host: Optional[str] = None, + encryption_key: Optional[str] = None, + ): + """ + Initialize the Progressive Consent token verifier. + + Args: + token_storage: Storage for refresh tokens + token_broker: Token broker service (created if not provided) + oidc_discovery_url: OIDC provider discovery URL + nextcloud_host: Nextcloud server URL + encryption_key: Fernet key for token encryption + """ + self.storage = token_storage + self.oidc_discovery_url = oidc_discovery_url or os.getenv( + "OIDC_DISCOVERY_URL", + f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration", + ) + self.nextcloud_host = nextcloud_host or os.getenv("NEXTCLOUD_HOST") + self.encryption_key = encryption_key or os.getenv("TOKEN_ENCRYPTION_KEY") + + # Create token broker if not provided + if token_broker: + self.token_broker = token_broker + elif self.encryption_key: + self.token_broker = TokenBrokerService( + storage=token_storage, + oidc_discovery_url=self.oidc_discovery_url, + nextcloud_host=self.nextcloud_host, + encryption_key=self.encryption_key, + ) + else: + self.token_broker = None + logger.warning("Token broker not available - encryption key missing") + + async def verify_token(self, token: str) -> Optional[AccessToken]: + """ + Verify a Flow 1 token (aud: "mcp-server"). + + This validates that: + 1. Token has correct audience for MCP server + 2. Token is not expired + 3. Token has valid signature (if verification enabled) + + Args: + token: JWT access token from Flow 1 + + Returns: + AccessToken if valid, None otherwise + """ + try: + # Decode without signature verification (IdP handles that) + # In production, would verify signature with IdP public key + payload = jwt.decode(token, options={"verify_signature": False}) + + # CRITICAL: Verify audience is for MCP server (Flow 1) + audiences = payload.get("aud", []) + if isinstance(audiences, str): + audiences = [audiences] + + # Check for correct audience + if "mcp-server" not in audiences: + logger.warning(f"Token rejected: wrong audience {audiences}") + # Check if this is a Nextcloud token (wrong flow) + if "nextcloud" in audiences: + logger.error( + "Received Nextcloud token in MCP context - " + "client may be using wrong token" + ) + return None + + # Check expiry + exp = payload.get("exp", 0) + if exp < datetime.now(timezone.utc).timestamp(): + logger.debug("Token expired") + return None + + # Extract user info + user_id = payload.get("sub", "unknown") + client_id = payload.get("client_id", "unknown") + scopes = payload.get("scope", "").split() + exp = payload.get("exp", None) + + # Create AccessToken for MCP framework + return AccessToken( + token=token, + client_id=client_id, + scopes=scopes, + expires_at=exp, + resource=f"user:{user_id}", # Store user_id in resource field + ) + + except jwt.InvalidTokenError as e: + logger.debug(f"Invalid token: {e}") + return None + except Exception as e: + logger.error(f"Token verification failed: {e}") + return None + + async def check_provisioning(self, user_id: str) -> bool: + """ + Check if user has provisioned Nextcloud access (Flow 2). + + Args: + user_id: User identifier from Flow 1 token + + Returns: + True if user has completed Flow 2, False otherwise + """ + if not self.storage: + return False + + refresh_data = await self.storage.get_refresh_token(user_id) + return refresh_data is not None + + async def get_nextcloud_token(self, user_id: str) -> Optional[str]: + """ + Get a Nextcloud access token (aud: "nextcloud") for the user. + + This uses the Token Broker to: + 1. Check for cached Nextcloud token + 2. If expired, refresh using stored master refresh token + 3. Return token with aud: "nextcloud" for API access + + Args: + user_id: User identifier from Flow 1 token + + Returns: + Nextcloud access token if provisioned, None otherwise + """ + if not self.token_broker: + logger.error("Token broker not available") + return None + + # Check if user has provisioned access + if not await self.check_provisioning(user_id): + logger.info(f"User {user_id} has not provisioned Nextcloud access") + return None + + # Get or refresh Nextcloud token + try: + nextcloud_token = await self.token_broker.get_nextcloud_token(user_id) + if nextcloud_token: + logger.debug(f"Obtained Nextcloud token for user {user_id}") + return nextcloud_token + except Exception as e: + logger.error(f"Failed to get Nextcloud token: {e}") + return None + + async def validate_scopes( + self, token: AccessToken, required_scopes: list[str] + ) -> bool: + """ + Validate that token has required scopes. + + Args: + token: The access token + required_scopes: List of required scopes + + Returns: + True if all required scopes present, False otherwise + """ + token_scopes = set(token.scopes) if token.scopes else set() + required = set(required_scopes) + + missing = required - token_scopes + if missing: + logger.debug(f"Token missing required scopes: {missing}") + return False + + return True + + async def close(self): + """Clean up resources.""" + if self.token_broker: + await self.token_broker.close() diff --git a/nextcloud_mcp_server/auth/provisioning_decorator.py b/nextcloud_mcp_server/auth/provisioning_decorator.py new file mode 100644 index 0000000..1613257 --- /dev/null +++ b/nextcloud_mcp_server/auth/provisioning_decorator.py @@ -0,0 +1,175 @@ +""" +Provisioning decorator for ADR-004 Progressive Consent Architecture. + +This decorator ensures users have completed Flow 2 (Resource Provisioning) +before accessing Nextcloud resources. +""" + +import functools +import logging +from typing import Callable + +from mcp.server.fastmcp import Context +from mcp.shared.exceptions import McpError +from mcp.types import ErrorData + +from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage + +logger = logging.getLogger(__name__) + + +def require_provisioning(func: Callable) -> Callable: + """ + Decorator that checks if user has provisioned Nextcloud access (Flow 2). + + This decorator: + 1. Extracts user_id from the MCP token (Flow 1) + 2. Checks if user has completed Flow 2 provisioning + 3. Returns helpful error message if not provisioned + 4. Allows access if provisioned + + Usage: + @mcp.tool() + @require_provisioning + async def list_notes(ctx: Context): + # Tool implementation + pass + """ + + @functools.wraps(func) + async def wrapper(*args, **kwargs): + # Extract context from arguments + ctx = None + for arg in args: + if isinstance(arg, Context): + ctx = arg + break + if not ctx: + ctx = kwargs.get("ctx") + + if not ctx: + raise McpError( + ErrorData( + code=-1, + message="Context not found - cannot verify provisioning", + ) + ) + + # Get user_id from authorization token + user_id = None + if hasattr(ctx, "authorization") and ctx.authorization: + try: + import jwt + + token = ctx.authorization.token + payload = jwt.decode(token, options={"verify_signature": False}) + user_id = payload.get("sub") + logger.debug(f"Checking provisioning for user: {user_id}") + except Exception as e: + logger.warning(f"Failed to extract user_id from token: {e}") + + if not user_id: + raise McpError( + ErrorData( + code=-1, + message="Cannot determine user identity for provisioning check", + ) + ) + + # Check provisioning status + storage = RefreshTokenStorage.from_env() + await storage.initialize() + + refresh_data = await storage.get_refresh_token(user_id) + + if not refresh_data: + # User has not completed Flow 2 - provide helpful error + logger.info( + f"User {user_id} attempted to use Nextcloud tool without provisioning" + ) + raise McpError( + ErrorData( + code=-1, + message=( + "Nextcloud access not provisioned. " + "Please run the 'provision_nextcloud_access' tool first to authorize " + "the MCP server to access Nextcloud on your behalf. " + "This is a one-time setup required for security." + ), + ) + ) + + logger.debug( + f"User {user_id} has provisioned access - proceeding with tool execution" + ) + + # User has provisioned - allow access + return await func(*args, **kwargs) + + return wrapper + + +def require_provisioning_or_suggest(func: Callable) -> Callable: + """ + Softer version that suggests provisioning but doesn't block. + + This decorator: + 1. Checks provisioning status + 2. Logs a warning if not provisioned + 3. Still allows the function to proceed + 4. Can be used for read-only operations that might work without explicit provisioning + + Usage: + @mcp.tool() + @require_provisioning_or_suggest + async def list_tools(ctx: Context): + # Tool implementation + pass + """ + + @functools.wraps(func) + async def wrapper(*args, **kwargs): + # Extract context from arguments + ctx = None + for arg in args: + if isinstance(arg, Context): + ctx = arg + break + if not ctx: + ctx = kwargs.get("ctx") + + if ctx: + # Try to check provisioning status + try: + # Get user_id from authorization token + user_id = None + if hasattr(ctx, "authorization") and ctx.authorization: + import jwt + + token = ctx.authorization.token + payload = jwt.decode(token, options={"verify_signature": False}) + user_id = payload.get("sub") + + if user_id: + # Check provisioning status + storage = RefreshTokenStorage.from_env() + await storage.initialize() + + refresh_data = await storage.get_refresh_token(user_id) + + if not refresh_data: + logger.info( + f"User {user_id} has not provisioned Nextcloud access. " + "Some features may not work. Consider running " + "'provision_nextcloud_access' tool." + ) + else: + logger.debug(f"User {user_id} has provisioned access") + + except Exception as e: + logger.debug(f"Could not check provisioning status: {e}") + + # Always proceed with the function + return await func(*args, **kwargs) + + return wrapper diff --git a/nextcloud_mcp_server/server/notes.py b/nextcloud_mcp_server/server/notes.py index c7528de..c36241c 100644 --- a/nextcloud_mcp_server/server/notes.py +++ b/nextcloud_mcp_server/server/notes.py @@ -6,6 +6,7 @@ from mcp.shared.exceptions import McpError from mcp.types import ErrorData from nextcloud_mcp_server.auth import require_scopes +from nextcloud_mcp_server.auth.provisioning_decorator import require_provisioning from nextcloud_mcp_server.context import get_client from nextcloud_mcp_server.models.notes import ( AppendContentResponse, @@ -86,6 +87,7 @@ def configure_notes_tools(mcp: FastMCP): @mcp.tool() @require_scopes("notes:write") + @require_provisioning async def nc_notes_create_note( title: str, content: str, category: str, ctx: Context ) -> CreateNoteResponse: @@ -247,6 +249,7 @@ def configure_notes_tools(mcp: FastMCP): @mcp.tool() @require_scopes("notes:read") + @require_provisioning async def nc_notes_search_notes(query: str, ctx: Context) -> SearchNotesResponse: """Search notes by title or content, returning only id, title, and category (requires notes:read scope).""" client = get_client(ctx) diff --git a/nextcloud_mcp_server/server/oauth_tools.py b/nextcloud_mcp_server/server/oauth_tools.py index bffafee..2092c4d 100644 --- a/nextcloud_mcp_server/server/oauth_tools.py +++ b/nextcloud_mcp_server/server/oauth_tools.py @@ -56,7 +56,7 @@ class RevocationResult(BaseModel): message: str = Field(description="Status message for the user") -async def get_provisioning_status(mcp: Context, user_id: str) -> ProvisioningStatus: +async def get_provisioning_status(ctx: Context, user_id: str) -> ProvisioningStatus: """ Check the provisioning status for Nextcloud access. @@ -140,7 +140,7 @@ def generate_oauth_url_for_flow2( async def provision_nextcloud_access( - mcp: Context, user_id: Optional[str] = None + ctx: Context, user_id: Optional[str] = None ) -> ProvisioningResult: """ MCP Tool: Provision offline access to Nextcloud resources. @@ -151,20 +151,33 @@ async def provision_nextcloud_access( The user must complete the OAuth flow in their browser to grant access. Args: - mcp: MCP context + ctx: MCP context with user's Flow 1 token user_id: Optional user identifier (extracted from token if not provided) Returns: ProvisioningResult with authorization URL or status """ try: - # Get user ID from context if not provided + # Extract user ID from the MCP access token (Flow 1 token) if not user_id: - # In a real implementation, extract from the MCP access token - user_id = mcp.context.get("user_id", "default_user") + # Get the authorization token from context + if hasattr(ctx, "authorization") and ctx.authorization: + token = ctx.authorization.token + # Decode token to get user info + try: + import jwt + + payload = jwt.decode(token, options={"verify_signature": False}) + user_id = payload.get("sub", "unknown") + logger.info(f"Extracted user_id from Flow 1 token: {user_id}") + except Exception as e: + logger.warning(f"Failed to decode token: {e}") + user_id = "default_user" + else: + user_id = "default_user" # Check if already provisioned - status = await get_provisioning_status(mcp, user_id) + status = await get_provisioning_status(ctx, user_id) if status.is_provisioned: return ProvisioningResult( success=True, @@ -271,7 +284,7 @@ async def provision_nextcloud_access( async def revoke_nextcloud_access( - mcp: Context, user_id: Optional[str] = None + ctx: Context, user_id: Optional[str] = None ) -> RevocationResult: """ MCP Tool: Revoke offline access to Nextcloud resources. @@ -289,10 +302,14 @@ async def revoke_nextcloud_access( try: # Get user ID from context if not provided if not user_id: - user_id = mcp.context.get("user_id", "default_user") + user_id = ( + ctx.context.get("user_id", "default_user") + if hasattr(ctx, "context") + else "default_user" + ) # Check current status - status = await get_provisioning_status(mcp, user_id) + status = await get_provisioning_status(ctx, user_id) if not status.is_provisioned: return RevocationResult( success=True, @@ -346,7 +363,7 @@ async def revoke_nextcloud_access( async def check_provisioning_status( - mcp: Context, user_id: Optional[str] = None + ctx: Context, user_id: Optional[str] = None ) -> ProvisioningStatus: """ MCP Tool: Check the current provisioning status. @@ -363,9 +380,13 @@ async def check_provisioning_status( """ # Get user ID from context if not provided if not user_id: - user_id = mcp.context.get("user_id", "default_user") + user_id = ( + ctx.context.get("user_id", "default_user") + if hasattr(ctx, "context") + else "default_user" + ) - return await get_provisioning_status(mcp, user_id) + return await get_provisioning_status(ctx, user_id) # Register MCP tools @@ -381,20 +402,25 @@ def register_oauth_tools(mcp): ), ) async def tool_provision_access( + ctx: Context, user_id: Optional[str] = None, ) -> ProvisioningResult: - return await provision_nextcloud_access(mcp, user_id) + return await provision_nextcloud_access(ctx, user_id) @mcp.tool( name="revoke_nextcloud_access", description="Revoke offline access to Nextcloud resources.", ) - async def tool_revoke_access(user_id: Optional[str] = None) -> RevocationResult: - return await revoke_nextcloud_access(mcp, user_id) + async def tool_revoke_access( + ctx: Context, user_id: Optional[str] = None + ) -> RevocationResult: + return await revoke_nextcloud_access(ctx, user_id) @mcp.tool( name="check_provisioning_status", description="Check whether Nextcloud access is provisioned.", ) - async def tool_check_status(user_id: Optional[str] = None) -> ProvisioningStatus: - return await check_provisioning_status(mcp, user_id) + async def tool_check_status( + ctx: Context, user_id: Optional[str] = None + ) -> ProvisioningStatus: + return await check_provisioning_status(ctx, user_id)