diff --git a/app-hooks/post-installation/10-install-oidc-app.sh b/app-hooks/post-installation/10-install-oidc-app.sh index 5a0094d..b4367bc 100755 --- a/app-hooks/post-installation/10-install-oidc-app.sh +++ b/app-hooks/post-installation/10-install-oidc-app.sh @@ -35,5 +35,6 @@ php /var/www/html/occ config:app:set oidc dynamic_client_registration --value='t php /var/www/html/occ config:app:set oidc proof_key_for_code_exchange --value=true --type=boolean php /var/www/html/occ config:app:set oidc allow_user_settings --value='enabled' php /var/www/html/occ config:app:set oidc default_token_type --value='jwt' +php /var/www/html/occ config:app:set oidc default_resource_identifier --value='http://localhost:8080' echo "OIDC app installed and configured successfully" diff --git a/app-hooks/post-installation/30-disable-welcome-wizard.sh b/app-hooks/post-installation/30-disable-welcome-wizard.sh new file mode 100755 index 0000000..ce352f2 --- /dev/null +++ b/app-hooks/post-installation/30-disable-welcome-wizard.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +php /var/www/html/occ config:app:set --value false firstrunwizard wizard_enabled diff --git a/docker-compose.yml b/docker-compose.yml index 6893252..07ba22a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -96,6 +96,7 @@ services: # OIDC_CLIENT_ID not set - uses Dynamic Client Registration (DCR) - NEXTCLOUD_HOST=http://app:80 - NEXTCLOUD_MCP_SERVER_URL=http://localhost:8001 + - NEXTCLOUD_RESOURCE_URI=http://localhost:8080 # ADR-005: Nextcloud resource identifier for audience validation - NEXTCLOUD_PUBLIC_ISSUER_URL=http://localhost:8080 - NEXTCLOUD_OIDC_SCOPES=openid profile email notes:read notes:write calendar:read calendar:write contacts:read contacts:write cookbook:read cookbook:write deck:read deck:write tables:read tables:write files:read files:write sharing:read sharing:write todo:read todo:write @@ -104,8 +105,9 @@ services: - TOKEN_ENCRYPTION_KEY=ESF1BvEQdGYsCluwMx9Cxvw3uh5pFowPH7Rg_nIliyo= - TOKEN_STORAGE_DB=/app/data/tokens.db - # ADR-004: Use Hybrid Flow (server intercepts OAuth callback) - # Set to false to enable Hybrid Flow tests - server stores refresh token and issues MCP codes + # ADR-005: Multi-audience mode (default - ENABLE_TOKEN_EXCHANGE=false) + # Tokens must contain BOTH MCP and Nextcloud audiences + # No token exchange needed - tokens work for both MCP auth and Nextcloud APIs # NO admin credentials - using OAuth with Dynamic Client Registration (DCR) # Client credentials registered via RFC 7591 and stored in volume @@ -159,6 +161,7 @@ services: # Nextcloud API endpoint (for accessing APIs with validated token) - NEXTCLOUD_HOST=http://app:80 - NEXTCLOUD_MCP_SERVER_URL=http://localhost:8002 + - NEXTCLOUD_RESOURCE_URI=http://localhost:8080 # ADR-005: Nextcloud resource identifier for audience validation - NEXTCLOUD_PUBLIC_ISSUER_URL=http://localhost:8888/realms/nextcloud-mcp # Refresh token storage (ADR-002 Tier 1 & 2) @@ -166,8 +169,11 @@ services: - TOKEN_ENCRYPTION_KEY=ESF1BvEQdGYsCluwMx9Cxvw3uh5pFowPH7Rg_nIliyo= - TOKEN_STORAGE_DB=/app/data/tokens.db - # Token exchange (RFC 8693) - convert aud:nextcloud-mcp-server → aud:nextcloud + # ADR-005: Token exchange mode (RFC 8693) + # Exchange MCP tokens (aud: nextcloud-mcp-server) for Nextcloud tokens (aud: http://localhost:8080) + # Provides strict audience separation between MCP session and Nextcloud API access - ENABLE_TOKEN_EXCHANGE=true + - TOKEN_EXCHANGE_CACHE_TTL=300 # Cache exchanged tokens for 5 minutes (default) # OAuth scopes (optional - uses defaults if not specified) - NEXTCLOUD_OIDC_SCOPES=openid profile email offline_access notes:read notes:write calendar:read calendar:write contacts:read contacts:write cookbook:read cookbook:write deck:read deck:write tables:read tables:write files:read files:write sharing:read sharing:write todo:read todo:write diff --git a/nextcloud_mcp_server/app.py b/nextcloud_mcp_server/app.py index 227deb3..501e4a0 100644 --- a/nextcloud_mcp_server/app.py +++ b/nextcloud_mcp_server/app.py @@ -27,9 +27,7 @@ from nextcloud_mcp_server.auth import ( has_required_scopes, is_jwt_token, ) -from nextcloud_mcp_server.auth.progressive_token_verifier import ( - ProgressiveConsentTokenVerifier, -) +from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier from nextcloud_mcp_server.client import NextcloudClient from nextcloud_mcp_server.config import ( LOGGING_CONFIG, @@ -215,9 +213,7 @@ class OAuthAppContext: """Application context for OAuth mode.""" nextcloud_host: str - token_verifier: ( - object # Can be NextcloudTokenVerifier or ProgressiveConsentTokenVerifier - ) + token_verifier: object # UnifiedTokenVerifier (ADR-005 compliant) refresh_token_storage: Optional["RefreshTokenStorage"] = None oauth_client: Optional[object] = None # NextcloudOAuthClient or KeycloakOAuthClient oauth_provider: str = "nextcloud" # "nextcloud" or "keycloak" @@ -555,46 +551,75 @@ async def setup_oauth_config(): else: client_issuer = issuer - # Progressive Consent mode (always enabled) - dual OAuth flows with audience separation - logger.info("✓ Progressive Consent mode enabled - dual OAuth flows active") + # ADR-005: Unified Token Verifier with proper audience validation + # Get MCP server URL for audience validation + mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000") + nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host) - # Get encryption key for token broker - encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") - if not encryption_key: + # Warn if resource URIs are not configured (required for ADR-005 compliance) + if not os.getenv("NEXTCLOUD_MCP_SERVER_URL"): logger.warning( - "TOKEN_ENCRYPTION_KEY not set - token broker will not be available" + f"NEXTCLOUD_MCP_SERVER_URL not set, defaulting to: {mcp_server_url}. " + "This should be set explicitly for proper audience validation." + ) + if not os.getenv("NEXTCLOUD_RESOURCE_URI"): + logger.warning( + f"NEXTCLOUD_RESOURCE_URI not set, defaulting to: {nextcloud_resource_uri}. " + "This should be set explicitly for proper audience validation." ) - # Create token broker service - from nextcloud_mcp_server.auth.token_broker import TokenBrokerService + # Create settings for UnifiedTokenVerifier + from nextcloud_mcp_server.config import get_settings - token_broker = None - if encryption_key and refresh_token_storage: - token_broker = TokenBrokerService( - storage=refresh_token_storage, - oidc_discovery_url=discovery_url, - nextcloud_host=nextcloud_host, - encryption_key=encryption_key, + settings = get_settings() + # Override with discovered values if not set in environment + if not settings.jwks_uri: + settings.jwks_uri = jwks_uri + if not settings.introspection_uri: + settings.introspection_uri = introspection_uri + if not settings.userinfo_uri: + settings.userinfo_uri = userinfo_uri + if not settings.oidc_issuer: + settings.oidc_issuer = issuer + if not settings.nextcloud_mcp_server_url: + settings.nextcloud_mcp_server_url = mcp_server_url + if not settings.nextcloud_resource_uri: + settings.nextcloud_resource_uri = nextcloud_resource_uri + + # Create Unified Token Verifier (ADR-005 compliant) + token_verifier = UnifiedTokenVerifier(settings) + + # Log the mode + enable_token_exchange = ( + os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true" + ) + if enable_token_exchange: + logger.info( + "✓ Token Exchange mode enabled (ADR-005) - exchanging MCP tokens for Nextcloud tokens via RFC 8693" ) - logger.info("✓ Token Broker service initialized for audience-specific tokens") + logger.info(f" MCP audience: {client_id} or {mcp_server_url}") + logger.info(f" Nextcloud audience: {nextcloud_resource_uri}") + else: + logger.info( + "✓ Multi-audience mode enabled (ADR-005) - tokens must contain both MCP and Nextcloud audiences" + ) + logger.info(f" Required MCP audience: {client_id} or {mcp_server_url}") + logger.info(f" Required Nextcloud audience: {nextcloud_resource_uri}") - # Create Progressive Consent token verifier - token_verifier = ProgressiveConsentTokenVerifier( - token_storage=refresh_token_storage, - token_broker=token_broker, - oidc_discovery_url=discovery_url, - nextcloud_host=nextcloud_host, - encryption_key=encryption_key, - mcp_client_id=client_id, - introspection_uri=introspection_uri, - client_secret=client_secret, - ) - - logger.info( - "✓ Progressive Consent verifier configured - enforcing audience separation" - ) if introspection_uri: logger.info("✓ Opaque token introspection enabled (RFC 7662)") + if jwks_uri: + logger.info("✓ JWT signature verification enabled (JWKS)") + + # Progressive Consent mode (for offline access / background jobs) + encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY") + if enable_offline_access and encryption_key and refresh_token_storage: + logger.info("✓ Progressive Consent mode enabled - offline access available") + + # Note: Token Broker service would be initialized here for background job support + # Currently not used in ADR-005 implementation as it's specific to offline access patterns + # that are separate from the real-time token exchange flow + logger.debug("Token broker available for future offline access features") # Create OAuth client for server-initiated flows (e.g., token exchange, background workers) oauth_client = None diff --git a/nextcloud_mcp_server/auth/__init__.py b/nextcloud_mcp_server/auth/__init__.py index dbb34f5..3795fe5 100644 --- a/nextcloud_mcp_server/auth/__init__.py +++ b/nextcloud_mcp_server/auth/__init__.py @@ -14,11 +14,11 @@ from .scope_authorization import ( is_jwt_token, require_scopes, ) -from .token_verifier import NextcloudTokenVerifier +from .unified_verifier import UnifiedTokenVerifier __all__ = [ "BearerAuth", - "NextcloudTokenVerifier", + "UnifiedTokenVerifier", "register_client", "ensure_oauth_client", "get_client_from_context", diff --git a/nextcloud_mcp_server/auth/context_helper.py b/nextcloud_mcp_server/auth/context_helper.py index a964053..7922237 100644 --- a/nextcloud_mcp_server/auth/context_helper.py +++ b/nextcloud_mcp_server/auth/context_helper.py @@ -1,6 +1,11 @@ -"""Helper functions for extracting OAuth context from MCP requests.""" +"""Helper functions for extracting OAuth context from MCP requests. +ADR-005 compliant implementation with token exchange caching. +""" + +import hashlib import logging +import time from mcp.server.auth.provider import AccessToken from mcp.server.fastmcp import Context @@ -11,35 +16,36 @@ from .token_exchange import exchange_token_for_audience logger = logging.getLogger(__name__) +# Token exchange cache: token_hash -> (exchanged_token, expiry_timestamp) +_exchange_cache: dict[str, tuple[str, float]] = {} + def get_client_from_context(ctx: Context, base_url: str) -> NextcloudClient: """ - Extract authenticated user context from MCP request and create NextcloudClient. + Create NextcloudClient for multi-audience mode (no exchange needed). - This function retrieves the OAuth access token from the MCP context, - extracts the username from the token's resource field (where we stored it - during token verification), and creates a NextcloudClient with bearer auth. + ADR-005 Mode 1: Token already contains both MCP and Nextcloud audiences. + The UnifiedTokenVerifier validated both audiences are present, so we can + use the token directly without exchange. Args: ctx: MCP request context containing session info base_url: Nextcloud base URL Returns: - NextcloudClient configured with bearer token auth + NextcloudClient configured with multi-audience token Raises: AttributeError: If context doesn't contain expected OAuth session data ValueError: If username cannot be extracted from token """ try: - # In Starlette with FastMCP OAuth, the authenticated user info is stored in request.user - # The FastMCP auth middleware sets request.user to an AuthenticatedUser object - # which contains the access_token + # Extract validated access token from MCP context if hasattr(ctx.request_context.request, "user") and hasattr( ctx.request_context.request.user, "access_token" ): access_token: AccessToken = ctx.request_context.request.user.access_token - logger.debug("Retrieved access token from request.user for OAuth request") + logger.debug("Retrieved multi-audience token from request.user") else: logger.error( "OAuth authentication failed: No access token found in request" @@ -47,16 +53,20 @@ def get_client_from_context(ctx: Context, base_url: str) -> NextcloudClient: raise AttributeError("No access token found in OAuth request context") # Extract username from resource field (RFC 8707) - # We stored the username here during token verification + # UnifiedTokenVerifier stored the username here during validation username = access_token.resource if not username: logger.error("No username found in access token resource field") raise ValueError("Username not available in OAuth token context") - logger.debug(f"Creating OAuth NextcloudClient for user: {username}") + logger.debug( + f"Creating NextcloudClient for user {username} with multi-audience token " + f"(no exchange needed)" + ) - # Create client with bearer token + # Token was already validated to have both audiences + # Can use directly without exchange return NextcloudClient.from_token( base_url=base_url, token=access_token.token, username=username ) @@ -71,12 +81,19 @@ async def get_session_client_from_context( ctx: Context, base_url: str ) -> NextcloudClient: """ - Create NextcloudClient using RFC 8693 token exchange for session operations. + Create NextcloudClient using RFC 8693 token exchange with caching. + + ADR-005 Mode 2: Exchange MCP token for Nextcloud token via RFC 8693. This implements the token exchange pattern where: - 1. Extract Flow 1 token from context (aud: "mcp-server") - 2. Exchange it for ephemeral Nextcloud token via RFC 8693 - 3. Create client with delegated token (NOT stored) + 1. Extract MCP token from context (validated by UnifiedTokenVerifier) + 2. Check cache for existing exchanged token + 3. If not cached or expired, exchange via RFC 8693 + 4. Cache the exchanged token to minimize exchange frequency + 5. Create client with exchanged token + + CRITICAL: This is where token exchange happens, NOT in the verifier. + The verifier already validated the MCP audience; now we exchange for Nextcloud. Note: Nextcloud doesn't support OAuth scopes natively. Scopes are enforced by the MCP server via @require_scopes decorator, not by the IdP. Therefore, @@ -88,7 +105,7 @@ async def get_session_client_from_context( base_url: Nextcloud base URL Returns: - NextcloudClient configured with ephemeral delegated token + NextcloudClient configured with ephemeral exchanged token Raises: AttributeError: If context doesn't contain expected OAuth session data @@ -96,43 +113,60 @@ async def get_session_client_from_context( """ settings = get_settings() - # Check if token exchange is enabled - if not settings.enable_token_exchange: - logger.info("Token exchange disabled, falling back to standard OAuth flow") - return get_client_from_context(ctx, base_url) - try: - # Extract Flow 1 token from context + # Extract MCP token from context if hasattr(ctx.request_context.request, "user") and hasattr( ctx.request_context.request.user, "access_token" ): access_token: AccessToken = ctx.request_context.request.user.access_token - flow1_token = access_token.token - username = access_token.resource # Username stored during verification - logger.debug(f"Retrieved Flow 1 token for user: {username}") + mcp_token = access_token.token + username = access_token.resource # Username from UnifiedTokenVerifier + logger.debug(f"Retrieved MCP token for user: {username}") else: - logger.error("No Flow 1 token found in request context") + logger.error("No MCP token found in request context") raise AttributeError("No access token found in OAuth request context") if not username: logger.error("No username found in access token resource field") raise ValueError("Username not available in OAuth token context") - logger.info("Exchanging client token for Nextcloud API token (pure RFC 8693)") + # Check cache for existing exchanged token + cache_key = hashlib.sha256(mcp_token.encode()).hexdigest() + if cache_key in _exchange_cache: + cached_token, expiry = _exchange_cache[cache_key] + if time.time() < expiry: + logger.debug( + f"Using cached exchanged token (expires in {expiry - time.time():.1f}s)" + ) + return NextcloudClient.from_token( + base_url=base_url, token=cached_token, username=username + ) + else: + logger.debug("Cached token expired, removing from cache") + del _exchange_cache[cache_key] - # Perform pure RFC 8693 token exchange (no refresh tokens) - # Note: We don't pass scopes since Nextcloud doesn't enforce them. - # The MCP server's @require_scopes decorator handles authorization. + # Perform RFC 8693 token exchange + logger.info(f"Exchanging MCP token for Nextcloud API token (user: {username})") + + # Exchange for Nextcloud resource URI audience exchanged_token, expires_in = await exchange_token_for_audience( - subject_token=flow1_token, - requested_audience="nextcloud", + subject_token=mcp_token, + requested_audience=settings.nextcloud_resource_uri or "nextcloud", requested_scopes=None, # Nextcloud doesn't support scopes ) - logger.info(f"Pure token exchange successful. Token expires in {expires_in}s") + logger.info(f"Token exchange successful. Token expires in {expires_in}s") + + # Cache the exchanged token + # Use the minimum of exchange TTL and configured cache TTL + cache_ttl = min(expires_in, settings.token_exchange_cache_ttl) + _exchange_cache[cache_key] = (exchanged_token, time.time() + cache_ttl) + logger.debug(f"Cached exchanged token for {cache_ttl}s") + + # Clean up expired cache entries + _cleanup_exchange_cache() # Create client with exchanged token - # This token is ephemeral (per-request) and NOT stored return NextcloudClient.from_token( base_url=base_url, token=exchanged_token, username=username ) @@ -143,3 +177,21 @@ async def get_session_client_from_context( except Exception as e: logger.error(f"Token exchange failed: {e}") raise RuntimeError(f"Token exchange required but failed: {e}") from e + + +def _cleanup_exchange_cache(): + """Remove expired entries from the token exchange cache.""" + global _exchange_cache + now = time.time() + expired_keys = [k for k, (_, expiry) in _exchange_cache.items() if expiry <= now] + for key in expired_keys: + del _exchange_cache[key] + if expired_keys: + logger.debug(f"Cleaned up {len(expired_keys)} expired cache entries") + + +def clear_exchange_cache(): + """Clear the entire token exchange cache. Useful for testing.""" + global _exchange_cache + _exchange_cache.clear() + logger.debug("Token exchange cache cleared") diff --git a/nextcloud_mcp_server/auth/progressive_token_verifier.py b/nextcloud_mcp_server/auth/progressive_token_verifier.py deleted file mode 100644 index a7af61b..0000000 --- a/nextcloud_mcp_server/auth/progressive_token_verifier.py +++ /dev/null @@ -1,366 +0,0 @@ -""" -Token Verifier for ADR-004 Progressive Consent Architecture. - -This module implements token verification with strict audience separation: -- Flow 1 tokens have aud: for MCP authentication -- Flow 2 tokens have aud: "nextcloud" for resource access -- Token Broker manages the exchange between audiences -""" - -import logging -import os -from datetime import datetime, timezone -from typing import Optional - -import httpx -import jwt -from mcp.server.auth.provider import AccessToken - -from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage -from nextcloud_mcp_server.auth.token_broker import TokenBrokerService - -logger = logging.getLogger(__name__) - - -class ProgressiveConsentTokenVerifier: - """ - Token verifier for Progressive Consent dual OAuth flows. - - This verifier: - 1. Validates Flow 1 tokens (aud: ) for MCP authentication - 2. Checks if user has provisioned Nextcloud access (Flow 2) - 3. Uses Token Broker to obtain aud: "nextcloud" tokens when needed - """ - - def __init__( - self, - token_storage: RefreshTokenStorage | None, - token_broker: Optional[TokenBrokerService] = None, - oidc_discovery_url: Optional[str] = None, - nextcloud_host: Optional[str] = None, - encryption_key: Optional[str] = None, - mcp_client_id: Optional[str] = None, - introspection_uri: Optional[str] = None, - client_secret: Optional[str] = None, - ): - """ - Initialize the Progressive Consent token verifier. - - Args: - token_storage: Storage for refresh tokens - token_broker: Token broker service (created if not provided) - oidc_discovery_url: OIDC provider discovery URL - nextcloud_host: Nextcloud server URL - encryption_key: Fernet key for token encryption - mcp_client_id: MCP server OAuth client ID for audience validation - introspection_uri: OAuth introspection endpoint URL (for opaque tokens) - client_secret: OAuth client secret (required for introspection) - """ - self.storage = token_storage - self.oidc_discovery_url = oidc_discovery_url or os.getenv( - "OIDC_DISCOVERY_URL", - f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration", - ) - self.nextcloud_host = nextcloud_host or os.getenv("NEXTCLOUD_HOST") - self.encryption_key = encryption_key or os.getenv("TOKEN_ENCRYPTION_KEY") - self.mcp_client_id = mcp_client_id or os.getenv("OIDC_CLIENT_ID") - self.introspection_uri = introspection_uri - self.client_secret = client_secret or os.getenv("OIDC_CLIENT_SECRET") - - # HTTP client for introspection requests - self._http_client: Optional[httpx.AsyncClient] = None - if self.introspection_uri and self.mcp_client_id and self.client_secret: - self._http_client = httpx.AsyncClient(timeout=10.0) - logger.info(f"Introspection support enabled: {introspection_uri}") - elif self.introspection_uri: - logger.warning( - "Introspection URI provided but missing client credentials - introspection disabled" - ) - - # Create token broker if not provided - if token_broker: - self.token_broker = token_broker - elif self.encryption_key and token_storage and self.nextcloud_host: - self.token_broker = TokenBrokerService( - storage=token_storage, - oidc_discovery_url=self.oidc_discovery_url, - nextcloud_host=self.nextcloud_host, - encryption_key=self.encryption_key, - ) - else: - self.token_broker = None - if not self.encryption_key: - logger.warning("Token broker not available - encryption key missing") - elif not token_storage: - logger.warning("Token broker not available - token storage missing") - elif not self.nextcloud_host: - logger.warning("Token broker not available - nextcloud host missing") - - async def verify_token(self, token: str) -> Optional[AccessToken]: - """ - Verify a Flow 1 token (aud: ). - - This validates that: - 1. Token has correct audience for MCP server (matches client ID) - 2. Token is not expired - 3. Token has valid signature (if verification enabled) - - Supports both JWT and opaque tokens: - - JWT tokens: Decoded directly from payload - - Opaque tokens: Validated via introspection endpoint (RFC 7662) - - Args: - token: Access token from Flow 1 (JWT or opaque) - - Returns: - AccessToken if valid, None otherwise - """ - logger.info("🔐 verify_token called - attempting to validate token") - logger.info(f"Token (first 50 chars): {token[:50]}...") - logger.info(f"Expected MCP client ID: {self.mcp_client_id}") - - # Check if token is JWT format (has 3 parts separated by dots) - is_jwt = "." in token and token.count(".") == 2 - logger.info(f"Token format: {'JWT' if is_jwt else 'opaque'}") - - if is_jwt: - # Try JWT verification - return await self._verify_jwt_token(token) - else: - # Fall back to introspection for opaque tokens - return await self._verify_opaque_token(token) - - async def _verify_jwt_token(self, token: str) -> Optional[AccessToken]: - """Verify JWT token by decoding payload.""" - try: - # Decode without signature verification (IdP handles that) - # In production, would verify signature with IdP public key - payload = jwt.decode(token, options={"verify_signature": False}) - logger.info(f"Token payload decoded: {payload}") - - # CRITICAL: Verify audience is for MCP server (Flow 1) - audiences = payload.get("aud", []) - if isinstance(audiences, str): - audiences = [audiences] - - # Audience validation: - # - Accept tokens with no audience (will validate via introspection if needed) - # - Accept tokens with MCP client ID in audience (Keycloak multi-audience) - # - Accept tokens with resource URL in audience (Nextcloud JWT redirect URI) - # - Reject tokens with "nextcloud" audience only (wrong flow) - if audiences: - # Check if MCP client ID is in the audience (Keycloak multi-audience) - if self.mcp_client_id in audiences: - logger.debug( - f"Token has audience {audiences} - MCP client ID present" - ) - # Check if this is a Nextcloud-only token (wrong flow) - elif audiences == ["nextcloud"]: - logger.warning( - f"Token rejected: Nextcloud-only audience {audiences}" - ) - logger.error( - "Received Nextcloud token in MCP context - " - "client may be using wrong token" - ) - return None - # Otherwise accept (likely resource URL audience from Nextcloud JWT) - else: - logger.info( - f"Token has audience {audiences} (resource URL or non-standard) - accepting" - ) - else: - logger.info( - "Token has no audience claim - accepting for MCP server validation" - ) - - # Check expiry - exp = payload.get("exp", 0) - if exp < datetime.now(timezone.utc).timestamp(): - logger.warning( - f"❌ Token expired: exp={exp}, now={datetime.now(timezone.utc).timestamp()}" - ) - return None - - # Extract user info - user_id = payload.get("sub", "unknown") - client_id = payload.get("client_id", "unknown") - scopes = payload.get("scope", "").split() - exp = payload.get("exp", None) - - logger.info( - f"✅ Token validation successful! user={user_id}, scopes={scopes}" - ) - - # Create AccessToken for MCP framework - return AccessToken( - token=token, - client_id=client_id, - scopes=scopes, - expires_at=exp, - resource=user_id, # Store user_id in resource field (RFC 8707) - ) - - except jwt.InvalidTokenError as e: - logger.warning(f"❌ Invalid token (JWT decode failed): {e}") - return None - except Exception as e: - logger.error(f"❌ Token verification failed with exception: {e}") - return None - - async def _verify_opaque_token(self, token: str) -> Optional[AccessToken]: - """ - Verify opaque token via introspection endpoint (RFC 7662). - - Args: - token: Opaque access token - - Returns: - AccessToken if active and valid, None otherwise - """ - if not self._http_client or not self.introspection_uri: - logger.error( - "❌ Cannot verify opaque token - introspection not configured. " - "Set introspection_uri and client credentials." - ) - return None - - try: - logger.info(f"Introspecting token at {self.introspection_uri}") - - # Call introspection endpoint (requires client authentication) - response = await self._http_client.post( - self.introspection_uri, - data={"token": token}, - auth=(self.mcp_client_id, self.client_secret), - ) - - if response.status_code != 200: - logger.warning( - f"❌ Introspection failed: HTTP {response.status_code} - {response.text[:200]}" - ) - return None - - introspection_data = response.json() - logger.info(f"Introspection response: {introspection_data}") - - # Check if token is active - if not introspection_data.get("active", False): - logger.warning("❌ Token introspection returned active=false") - return None - - # Extract user info - user_id = introspection_data.get("sub") or introspection_data.get( - "username" - ) - if not user_id: - logger.error("❌ No username found in introspection response") - return None - - # Extract scopes (space-separated string) - scope_string = introspection_data.get("scope", "") - scopes = scope_string.split() if scope_string else [] - - # Extract client ID and expiration - client_id = introspection_data.get("client_id", "unknown") - exp = introspection_data.get("exp") - - logger.info(f"✅ Opaque token validated! user={user_id}, scopes={scopes}") - - return AccessToken( - token=token, - client_id=client_id, - scopes=scopes, - expires_at=int(exp) if exp else None, - resource=user_id, - ) - - except httpx.TimeoutException: - logger.error("❌ Timeout while introspecting token") - return None - except httpx.RequestError as e: - logger.error(f"❌ Network error during introspection: {e}") - return None - except Exception as e: - logger.error(f"❌ Introspection failed with exception: {e}") - return None - - async def check_provisioning(self, user_id: str) -> bool: - """ - Check if user has provisioned Nextcloud access (Flow 2). - - Args: - user_id: User identifier from Flow 1 token - - Returns: - True if user has completed Flow 2, False otherwise - """ - if not self.storage: - return False - - refresh_data = await self.storage.get_refresh_token(user_id) - return refresh_data is not None - - async def get_nextcloud_token(self, user_id: str) -> Optional[str]: - """ - Get a Nextcloud access token (aud: "nextcloud") for the user. - - This uses the Token Broker to: - 1. Check for cached Nextcloud token - 2. If expired, refresh using stored master refresh token - 3. Return token with aud: "nextcloud" for API access - - Args: - user_id: User identifier from Flow 1 token - - Returns: - Nextcloud access token if provisioned, None otherwise - """ - if not self.token_broker: - logger.error("Token broker not available") - return None - - # Check if user has provisioned access - if not await self.check_provisioning(user_id): - logger.info(f"User {user_id} has not provisioned Nextcloud access") - return None - - # Get or refresh Nextcloud token - try: - nextcloud_token = await self.token_broker.get_nextcloud_token(user_id) - if nextcloud_token: - logger.debug(f"Obtained Nextcloud token for user {user_id}") - return nextcloud_token - except Exception as e: - logger.error(f"Failed to get Nextcloud token: {e}") - return None - - async def validate_scopes( - self, token: AccessToken, required_scopes: list[str] - ) -> bool: - """ - Validate that token has required scopes. - - Args: - token: The access token - required_scopes: List of required scopes - - Returns: - True if all required scopes present, False otherwise - """ - token_scopes = set(token.scopes) if token.scopes else set() - required = set(required_scopes) - - missing = required - token_scopes - if missing: - logger.debug(f"Token missing required scopes: {missing}") - return False - - return True - - async def close(self): - """Clean up resources.""" - if self.token_broker: - await self.token_broker.close() - if self._http_client: - await self._http_client.aclose() diff --git a/nextcloud_mcp_server/auth/token_verifier.py b/nextcloud_mcp_server/auth/token_verifier.py deleted file mode 100644 index cc94eeb..0000000 --- a/nextcloud_mcp_server/auth/token_verifier.py +++ /dev/null @@ -1,492 +0,0 @@ -"""Token verification using Nextcloud OIDC userinfo endpoint.""" - -import logging -import time -from typing import Any - -import httpx -import jwt -from jwt import PyJWKClient -from mcp.server.auth.provider import AccessToken, TokenVerifier - -logger = logging.getLogger(__name__) - - -class NextcloudTokenVerifier(TokenVerifier): - """ - Validates access tokens using JWT verification with JWKS or userinfo endpoint fallback. - - This verifier supports both JWT and opaque tokens: - 1. For JWT tokens: Verifies signature with JWKS and extracts scopes from payload - 2. For opaque tokens: Falls back to userinfo endpoint validation - 3. Caches successful responses to avoid repeated API calls/verifications - - JWT validation provides: - - Faster validation (no HTTP call needed) - - Direct scope extraction from token payload - - Signature verification using JWKS - - Userinfo fallback provides: - - Support for opaque tokens - - Backward compatibility - - Additional validation layer - """ - - def __init__( - self, - nextcloud_host: str, - userinfo_uri: str, - jwks_uri: str | None = None, - issuer: str | None = None, - introspection_uri: str | None = None, - client_id: str | None = None, - client_secret: str | None = None, - cache_ttl: int = 3600, - ): - """ - Initialize the token verifier. - - Args: - nextcloud_host: Base URL of the Nextcloud instance (e.g., https://cloud.example.com) - userinfo_uri: Full URL to the userinfo endpoint - jwks_uri: Full URL to the JWKS endpoint (for JWT verification) - issuer: Expected issuer claim value (for JWT verification) - introspection_uri: Full URL to the introspection endpoint (for opaque tokens) - client_id: OAuth client ID (required for introspection) - client_secret: OAuth client secret (required for introspection) - cache_ttl: Time-to-live for cached tokens in seconds (default: 3600) - """ - self.nextcloud_host = nextcloud_host.rstrip("/") - self.userinfo_uri = userinfo_uri - self.jwks_uri = jwks_uri - self.issuer = issuer - self.introspection_uri = introspection_uri - self.client_id = client_id - self.client_secret = client_secret - self.cache_ttl = cache_ttl - - # Cache: token -> (userinfo, expiry_timestamp) - self._token_cache: dict[str, tuple[dict[str, Any], float]] = {} - - # HTTP client for userinfo/introspection requests - self._client = httpx.AsyncClient(timeout=10.0) - - # PyJWKClient for JWT verification (lazy initialization) - self._jwks_client: PyJWKClient | None = None - if jwks_uri: - logger.info(f"JWT verification enabled with JWKS URI: {jwks_uri}") - self._jwks_client = PyJWKClient(jwks_uri, cache_keys=True) - - # Introspection support - if introspection_uri and client_id and client_secret: - logger.info(f"Token introspection enabled: {introspection_uri}") - elif introspection_uri: - logger.warning( - "Introspection URI provided but missing client credentials - introspection disabled" - ) - - async def verify_token(self, token: str) -> AccessToken | None: - """ - Verify a bearer token using JWT verification, introspection, or userinfo endpoint. - - This method: - 1. Checks the cache first for recent validations - 2. Attempts JWT verification if JWKS is configured and token looks like JWT - 3. Falls back to introspection for opaque tokens (if configured) - 4. Falls back to userinfo endpoint as last resort - 5. Returns AccessToken with username and scopes - - Args: - token: The bearer token to verify - - Returns: - AccessToken if valid, None if invalid or expired - """ - # Check cache first - cached = self._get_cached_token(token) - if cached: - logger.debug("Token found in cache") - return cached - - # Try JWT verification first if enabled and token looks like JWT - is_jwt_format = self._is_jwt_format(token) - logger.debug( - f"Token format check: is_jwt_format={is_jwt_format}, _jwks_client={self._jwks_client is not None}" - ) - if self._jwks_client and is_jwt_format: - logger.debug("Attempting JWT verification...") - jwt_result = self._verify_jwt(token) - if jwt_result: - logger.info("Token validated via JWT verification") - return jwt_result - else: - logger.warning("JWT verification failed, will try other methods") - - # For opaque tokens, try introspection if available - if self.introspection_uri and self.client_id and self.client_secret: - logger.debug("Attempting token introspection...") - try: - introspection_result = await self._verify_via_introspection(token) - if introspection_result: - logger.info("Token validated via introspection") - return introspection_result - except Exception as e: - logger.warning(f"Introspection failed: {e}") - - # Fall back to userinfo endpoint validation (last resort) - logger.debug("Attempting userinfo endpoint validation...") - try: - return await self._verify_via_userinfo(token) - except Exception as e: - logger.warning(f"Token verification failed: {e}") - return None - - def _is_jwt_format(self, token: str) -> bool: - """ - Check if token looks like a JWT (has 3 parts separated by dots). - - Args: - token: The token to check - - Returns: - True if token appears to be JWT format - """ - return "." in token and token.count(".") == 2 - - def _verify_jwt(self, token: str) -> AccessToken | None: - """ - Verify JWT token with signature validation using JWKS. - - Args: - token: The JWT token to verify - - Returns: - AccessToken if valid, None if invalid - """ - try: - # Get signing key from JWKS - assert self._jwks_client is not None # Caller should check before calling - signing_key = self._jwks_client.get_signing_key_from_jwt(token) - - # Verify and decode JWT - # Accept tokens with audience: "mcp-server" or ["mcp-server", "nextcloud"] - # This allows: - # 1. Tokens from MCP clients (aud: "mcp-server") - # 2. Tokens for Nextcloud APIs (aud: "nextcloud") - # 3. Tokens for both (aud: ["mcp-server", "nextcloud"]) - payload = jwt.decode( - token, - signing_key.key, - algorithms=["RS256"], - issuer=self.issuer, - audience=["mcp-server", "nextcloud"], # Accept either audience - options={ - "verify_signature": True, - "verify_exp": True, - "verify_iat": True, - "verify_iss": True if self.issuer else False, - "verify_aud": True, # Enable audience validation - }, - ) - - logger.debug(f"JWT verified successfully for user: {payload.get('sub')}") - logger.debug(f"Full JWT payload: {payload}") - - # Extract username (sub claim, with fallback to preferred_username) - # Some OIDC providers (like Keycloak) may not include sub in access tokens - username = payload.get("sub") or payload.get("preferred_username") - if not username: - logger.error( - "No 'sub' or 'preferred_username' claim found in JWT payload" - ) - return None - - # Extract scopes from scope claim (space-separated string) - scope_string = payload.get("scope", "") - scopes = scope_string.split() if scope_string else [] - logger.debug( - f"Extracted scopes from JWT - scope claim: '{scope_string}' -> scopes list: {scopes}" - ) - - # Extract expiration - exp = payload.get("exp") - if not exp: - logger.warning("No 'exp' claim in JWT, using default TTL") - exp = int(time.time() + self.cache_ttl) - - # Cache the result - userinfo = { - "sub": username, - "scope": scope_string, - **{k: v for k, v in payload.items() if k not in ["sub", "scope"]}, - } - self._token_cache[token] = (userinfo, exp) - - return AccessToken( - token=token, - client_id=payload.get("client_id", ""), - scopes=scopes, - expires_at=exp, - resource=username, # Store username in resource field (RFC 8707) - ) - - except jwt.ExpiredSignatureError: - logger.info("JWT token has expired") - return None - except jwt.InvalidIssuerError as e: - logger.warning(f"JWT issuer validation failed: {e}") - return None - except jwt.InvalidTokenError as e: - logger.warning(f"JWT validation failed: {e}") - return None - except Exception as e: - logger.error(f"Unexpected error during JWT verification: {e}") - return None - - async def _verify_via_introspection(self, token: str) -> AccessToken | None: - """ - Validate token by calling the introspection endpoint (RFC 7662). - - This method validates opaque tokens and retrieves their scopes. - - Args: - token: The bearer token to introspect - - Returns: - AccessToken if active, None if inactive or invalid - """ - try: - # Introspection requires client authentication - response = await self._client.post( - self.introspection_uri, # type: ignore - data={"token": token}, - auth=(self.client_id, self.client_secret), - ) - - if response.status_code == 200: - introspection_data = response.json() - - # Check if token is active - if not introspection_data.get("active", False): - logger.info("Token introspection returned inactive=false") - return None - - logger.debug( - f"Token introspected successfully for user: {introspection_data.get('sub')}" - ) - - # Extract username - username = introspection_data.get("sub") or introspection_data.get( - "username" - ) - if not username: - logger.error("No username found in introspection response") - return None - - # Extract scopes (space-separated string) - scope_string = introspection_data.get("scope", "") - scopes = scope_string.split() if scope_string else [] - logger.debug(f"Extracted scopes from introspection: {scopes}") - - # Extract expiration - exp = introspection_data.get("exp") - if exp: - expiry = float(exp) - else: - logger.warning( - "No 'exp' in introspection response, using default TTL" - ) - expiry = time.time() + self.cache_ttl - - # Cache the result - cache_data = { - "sub": username, - "scope": scope_string, - **{ - k: v - for k, v in introspection_data.items() - if k not in ["sub", "scope", "active"] - }, - } - self._token_cache[token] = (cache_data, expiry) - - return AccessToken( - token=token, - client_id=introspection_data.get("client_id", ""), - scopes=scopes, - expires_at=int(expiry), - resource=username, - ) - - elif response.status_code in (400, 401, 403): - logger.warning( - f"Token introspection failed: HTTP {response.status_code}. " - f"This may indicate: (1) Client credentials mismatch - trying to introspect " - f"token issued to different OAuth client, (2) Expired client credentials, " - f"(3) Invalid token. Will fall back to userinfo endpoint. " - f"Response: {response.text[:200] if response.text else 'empty'}" - ) - return None - else: - logger.warning( - f"Unexpected response from introspection: {response.status_code}. " - f"Response: {response.text[:200] if response.text else 'empty'}" - ) - return None - - except httpx.TimeoutException: - logger.error("Timeout while introspecting token") - return None - except httpx.RequestError as e: - logger.error(f"Network error while introspecting token: {e}") - return None - except Exception as e: - logger.error(f"Unexpected error during token introspection: {e}") - return None - - async def _verify_via_userinfo(self, token: str) -> AccessToken | None: - """ - Validate token by calling the userinfo endpoint. - - Args: - token: The bearer token to verify - - Returns: - AccessToken if valid, None otherwise - """ - try: - response = await self._client.get( - self.userinfo_uri, headers={"Authorization": f"Bearer {token}"} - ) - - if response.status_code == 200: - userinfo = response.json() - logger.debug( - f"Token validated successfully for user: {userinfo.get('sub')}" - ) - - # Cache the result - expiry = time.time() + self.cache_ttl - self._token_cache[token] = (userinfo, expiry) - - # Create AccessToken with username in resource field (workaround for MCP SDK) - username = userinfo.get("sub") or userinfo.get("preferred_username") - if not username: - logger.error("No username found in userinfo response") - return None - - return AccessToken( - token=token, - client_id="", # Not available from userinfo - scopes=self._extract_scopes(userinfo), - expires_at=int(expiry), - resource=username, # Store username in resource field (RFC 8707) - ) - - elif response.status_code in (400, 401, 403): - logger.info(f"Token validation failed: HTTP {response.status_code}") - return None - else: - logger.warning( - f"Unexpected response from userinfo: {response.status_code}" - ) - return None - - except httpx.TimeoutException: - logger.error("Timeout while validating token via userinfo endpoint") - return None - except httpx.RequestError as e: - logger.error(f"Network error while validating token: {e}") - return None - except Exception as e: - logger.error(f"Unexpected error during token validation: {e}") - return None - - def _get_cached_token(self, token: str) -> AccessToken | None: - """ - Retrieve a token from cache if not expired. - - Args: - token: The bearer token to look up - - Returns: - AccessToken if cached and valid, None otherwise - """ - if token not in self._token_cache: - return None - - userinfo, expiry = self._token_cache[token] - - # Check if expired - if time.time() >= expiry: - logger.debug("Cached token expired, removing from cache") - del self._token_cache[token] - return None - - # Return cached AccessToken - username = userinfo.get("sub") or userinfo.get("preferred_username") - return AccessToken( - token=token, - client_id="", - scopes=self._extract_scopes(userinfo), - expires_at=int(expiry), - resource=username, - ) - - def _extract_scopes(self, userinfo: dict[str, Any]) -> list[str]: - """ - Extract scopes from userinfo response. - - First attempts to read actual scopes from the 'scope' field (RFC 8693). - If not present, infers scopes from the claims present in the response. - - Args: - userinfo: The userinfo response dictionary - - Returns: - List of scopes (actual or inferred) - """ - # Try to get actual scopes from userinfo response (if OIDC provider includes it) - scope_string = userinfo.get("scope") - if scope_string: - scopes = scope_string.split() if isinstance(scope_string, str) else [] - if scopes: - logger.debug( - f"Using actual scopes from userinfo: {scopes} (scope field present)" - ) - return scopes - - # Fallback: Infer scopes from claims present in response - # This maintains backward compatibility with OIDC providers that don't - # include the scope field in userinfo responses - logger.debug( - "No scope field in userinfo response, inferring scopes from claims" - ) - scopes = ["openid"] # Always present - - if "email" in userinfo: - scopes.append("email") - - if any( - key in userinfo for key in ["name", "given_name", "family_name", "picture"] - ): - scopes.append("profile") - - if "roles" in userinfo: - scopes.append("roles") - - if "groups" in userinfo: - scopes.append("groups") - - logger.debug(f"Inferred scopes from userinfo claims: {scopes}") - return scopes - - def clear_cache(self): - """Clear the token cache.""" - self._token_cache.clear() - logger.debug("Token cache cleared") - - async def close(self): - """Cleanup resources.""" - await self._client.aclose() - logger.debug("Token verifier closed") diff --git a/nextcloud_mcp_server/auth/unified_verifier.py b/nextcloud_mcp_server/auth/unified_verifier.py new file mode 100644 index 0000000..32d57dd --- /dev/null +++ b/nextcloud_mcp_server/auth/unified_verifier.py @@ -0,0 +1,475 @@ +""" +Unified Token Verifier for ADR-005 Token Audience Validation. + +This module replaces both NextcloudTokenVerifier and ProgressiveConsentTokenVerifier +with a single implementation that supports two compliant OAuth modes: + +1. Multi-audience mode (default): Tokens must contain BOTH MCP and Nextcloud audiences +2. Token exchange mode (opt-in): Tokens have MCP audience only, exchanged for Nextcloud tokens + +Key Design Principles: +- Token verification happens HERE (validates audiences) +- Token exchange happens in context_helper.py (when creating NextcloudClient) +- No token passthrough allowed (complies with MCP Security Specification) +""" + +import hashlib +import logging +import time +from typing import Any + +import httpx +import jwt +from jwt import PyJWKClient +from mcp.server.auth.provider import AccessToken, TokenVerifier + +from nextcloud_mcp_server.config import Settings + +logger = logging.getLogger(__name__) + + +class UnifiedTokenVerifier(TokenVerifier): + """ + Unified token verifier supporting both multi-audience and token exchange modes. + Compliant with MCP security specification - no token pass-through. + + This verifier: + 1. Validates tokens using JWT verification with JWKS or introspection fallback + 2. Enforces proper audience validation based on configured mode + 3. Caches successful validations to avoid repeated API calls + + Mode Selection (via ENABLE_TOKEN_EXCHANGE setting): + - False/omit (default): Multi-audience mode - requires BOTH MCP and Nextcloud audiences + - True: Exchange mode - requires MCP audience only (exchange happens later) + """ + + def __init__(self, settings: Settings): + """ + Initialize the unified token verifier. + + Args: + settings: Application settings containing OAuth configuration + """ + self.settings = settings + self.mode = "exchange" if settings.enable_token_exchange else "multi-audience" + + # Common components for all modes + self.http_client = httpx.AsyncClient(timeout=10.0) + + # JWT verification support + self.jwks_client: PyJWKClient | None = None + if hasattr(settings, "jwks_uri") and settings.jwks_uri: + logger.info(f"JWT verification enabled with JWKS URI: {settings.jwks_uri}") + self.jwks_client = PyJWKClient(settings.jwks_uri, cache_keys=True) + + # Introspection support (for opaque tokens) + self.introspection_uri: str | None = None + if ( + hasattr(settings, "introspection_uri") + and settings.introspection_uri + and settings.oidc_client_id + and settings.oidc_client_secret + ): + self.introspection_uri = settings.introspection_uri + logger.info(f"Token introspection enabled: {self.introspection_uri}") + + # Token cache: token_hash -> (userinfo, expiry_timestamp) + self._token_cache: dict[str, tuple[dict[str, Any], float]] = {} + self.cache_ttl = 3600 # 1 hour default + + logger.info( + f"UnifiedTokenVerifier initialized in {self.mode} mode. " + f"MCP audience: {settings.oidc_client_id} or {settings.nextcloud_mcp_server_url}, " + f"Nextcloud resource URI: {settings.nextcloud_resource_uri}" + ) + + async def verify_token(self, token: str) -> AccessToken | None: + """ + Verify token according to MCP TokenVerifier protocol. + + CRITICAL: This method only validates tokens - it does NOT perform exchange. + Token exchange happens later in context_helper.py when creating NextcloudClient. + + Multi-audience mode: Validates token has BOTH MCP and Nextcloud audiences + Exchange mode: Validates token has MCP audience ONLY (exchange happens later) + + Args: + token: Bearer token to verify + + Returns: + AccessToken if valid, None if invalid or expired + """ + # Check cache first + cached = self._get_cached_token(token) + if cached: + logger.debug("Token found in cache") + return cached + + # Verify based on mode + if self.mode == "multi-audience": + return await self._verify_multi_audience_token(token) + else: + # Exchange mode: Only validate MCP audience here + # Actual exchange happens in context_helper.py + return await self._verify_mcp_audience_only(token) + + async def _verify_multi_audience_token(self, token: str) -> AccessToken | None: + """ + Validate token has both MCP and Nextcloud audiences (Mode 1). + Token can be used directly without exchange. + + Args: + token: Bearer token to verify + + Returns: + AccessToken if valid with both audiences, None otherwise + """ + try: + # Attempt JWT verification first + if self._is_jwt_format(token) and self.jwks_client: + payload = await self._verify_jwt_signature(token) + else: + # Fall back to introspection for opaque tokens + payload = await self._introspect_token(token) + if not payload: + return None + + # Check payload is valid + if not payload: + return None + + # Validate both audiences are present + if not self._validate_multi_audience(payload): + audiences = payload.get("aud", []) + logger.error( + f"Token rejected: Missing required audiences. " + f"Got {audiences}, need both MCP ({self.settings.oidc_client_id} or " + f"{self.settings.nextcloud_mcp_server_url}) AND Nextcloud " + f"({self.settings.nextcloud_resource_uri})" + ) + return None + + logger.info( + "Multi-audience validation passed - token has both MCP and Nextcloud audiences" + ) + return self._create_access_token(token, payload) + + except Exception as e: + logger.error(f"Multi-audience validation failed: {e}") + return None + + async def _verify_mcp_audience_only(self, token: str) -> AccessToken | None: + """ + Validate token has MCP audience only (Mode 2). + Token will be exchanged later in context_helper.py. + + Args: + token: Bearer token to verify + + Returns: + AccessToken if valid with MCP audience, None otherwise + """ + try: + # Attempt JWT verification first + if self._is_jwt_format(token) and self.jwks_client: + payload = await self._verify_jwt_signature(token) + else: + # Fall back to introspection for opaque tokens + payload = await self._introspect_token(token) + if not payload: + return None + + # Check payload is valid + if not payload: + return None + + # Only validate MCP audience (exchange will handle Nextcloud) + if not self._has_mcp_audience(payload): + audiences = payload.get("aud", []) + logger.error( + f"Token rejected: Missing MCP audience. " + f"Got {audiences}, need {self.settings.oidc_client_id} " + f"or {self.settings.nextcloud_mcp_server_url}" + ) + return None + + logger.info( + "MCP audience validation passed - token will be exchanged for Nextcloud access" + ) + return self._create_access_token(token, payload) + + except Exception as e: + logger.error(f"MCP audience validation failed: {e}") + return None + + def _validate_multi_audience(self, payload: dict[str, Any]) -> bool: + """ + Check if token has both MCP and Nextcloud audiences. + + Args: + payload: Decoded token payload + + Returns: + True if both audiences present, False otherwise + """ + audiences = payload.get("aud", []) + if isinstance(audiences, str): + audiences = [audiences] + + audiences_set = set(audiences) + + # MCP must have at least one: client_id OR server_url + mcp_valid = self.settings.oidc_client_id in audiences_set or ( + self.settings.nextcloud_mcp_server_url + and self.settings.nextcloud_mcp_server_url in audiences_set + ) + + # Nextcloud must have its resource URI + nextcloud_valid = bool( + self.settings.nextcloud_resource_uri + and self.settings.nextcloud_resource_uri in audiences_set + ) + + return bool(mcp_valid and nextcloud_valid) + + def _has_mcp_audience(self, payload: dict[str, Any]) -> bool: + """ + Check if token has MCP audience (for exchange mode). + + Args: + payload: Decoded token payload + + Returns: + True if MCP audience present, False otherwise + """ + audiences = payload.get("aud", []) + if isinstance(audiences, str): + audiences = [audiences] + + audiences_set = set(audiences) + return bool( + self.settings.oidc_client_id in audiences_set + or ( + self.settings.nextcloud_mcp_server_url + and self.settings.nextcloud_mcp_server_url in audiences_set + ) + ) + + def _is_jwt_format(self, token: str) -> bool: + """ + Check if token looks like a JWT (has 3 parts separated by dots). + + Args: + token: The token to check + + Returns: + True if token appears to be JWT format + """ + return "." in token and token.count(".") == 2 + + async def _verify_jwt_signature(self, token: str) -> dict[str, Any] | None: + """ + Verify JWT token with signature validation using JWKS. + + Args: + token: JWT token to verify + + Returns: + Decoded payload if valid, None if invalid + """ + try: + assert self.jwks_client is not None # Caller should check before calling + + # Get signing key from JWKS + signing_key = self.jwks_client.get_signing_key_from_jwt(token) + + # Verify and decode JWT + # Note: We don't validate audience here - that's done separately based on mode + payload = jwt.decode( + token, + signing_key.key, + algorithms=["RS256"], + issuer=self.settings.oidc_issuer + if hasattr(self.settings, "oidc_issuer") + else None, + options={ + "verify_signature": True, + "verify_exp": True, + "verify_iat": True, + "verify_iss": True + if hasattr(self.settings, "oidc_issuer") + and self.settings.oidc_issuer + else False, + "verify_aud": False, # We handle audience validation separately + }, + ) + + logger.debug(f"JWT signature verified for user: {payload.get('sub')}") + return payload + + except jwt.ExpiredSignatureError: + logger.info("JWT token has expired") + return None + except jwt.InvalidIssuerError as e: + logger.warning(f"JWT issuer validation failed: {e}") + return None + except jwt.InvalidTokenError as e: + logger.warning(f"JWT validation failed: {e}") + return None + except Exception as e: + logger.error(f"Unexpected error during JWT verification: {e}") + return None + + async def _introspect_token(self, token: str) -> dict[str, Any] | None: + """ + Validate token by calling the introspection endpoint (RFC 7662). + + Args: + token: Bearer token to introspect + + Returns: + Token payload if active, None if inactive or invalid + """ + if not self.introspection_uri: + logger.debug("No introspection endpoint configured") + return None + + try: + # Introspection requires client authentication + response = await self.http_client.post( + self.introspection_uri, + data={"token": token}, + auth=(self.settings.oidc_client_id, self.settings.oidc_client_secret), + ) + + if response.status_code == 200: + introspection_data = response.json() + + # Check if token is active + if not introspection_data.get("active", False): + logger.info("Token introspection returned inactive=false") + return None + + logger.debug( + f"Token introspected successfully for user: {introspection_data.get('sub')}" + ) + return introspection_data + + elif response.status_code in (400, 401, 403): + logger.warning( + f"Token introspection failed: HTTP {response.status_code}. " + f"Response: {response.text[:200] if response.text else 'empty'}" + ) + return None + else: + logger.warning( + f"Unexpected response from introspection: {response.status_code}. " + f"Response: {response.text[:200] if response.text else 'empty'}" + ) + return None + + except httpx.TimeoutException: + logger.error("Timeout while introspecting token") + return None + except httpx.RequestError as e: + logger.error(f"Network error while introspecting token: {e}") + return None + except Exception as e: + logger.error(f"Unexpected error during token introspection: {e}") + return None + + def _create_access_token( + self, token: str, payload: dict[str, Any] + ) -> AccessToken | None: + """ + Create AccessToken object from validated token payload. + + Args: + token: The bearer token + payload: Validated token payload + + Returns: + AccessToken object or None if required fields missing + """ + # Extract username (sub claim, with fallback to preferred_username) + username = payload.get("sub") or payload.get("preferred_username") + if not username: + logger.error( + "No 'sub' or 'preferred_username' claim found in token payload" + ) + return None + + # Extract scopes from scope claim (space-separated string) + scope_string = payload.get("scope", "") + scopes = scope_string.split() if scope_string else [] + logger.debug( + f"Extracted scopes from token - scope claim: '{scope_string}' -> scopes list: {scopes}" + ) + + # Extract expiration + exp = payload.get("exp") + if not exp: + logger.warning("No 'exp' claim in token, using default TTL") + exp = int(time.time() + self.cache_ttl) + + # Cache the result + token_hash = hashlib.sha256(token.encode()).hexdigest() + userinfo = { + "sub": username, + "scope": scope_string, + **{k: v for k, v in payload.items() if k not in ["sub", "scope"]}, + } + self._token_cache[token_hash] = (userinfo, exp) + + return AccessToken( + token=token, + client_id=payload.get("client_id", ""), + scopes=scopes, + expires_at=exp, + resource=username, # Store username in resource field (RFC 8707) + ) + + def _get_cached_token(self, token: str) -> AccessToken | None: + """ + Retrieve a token from cache if not expired. + + Args: + token: The bearer token to look up + + Returns: + AccessToken if cached and valid, None otherwise + """ + token_hash = hashlib.sha256(token.encode()).hexdigest() + if token_hash not in self._token_cache: + return None + + userinfo, expiry = self._token_cache[token_hash] + + # Check if expired + if time.time() >= expiry: + logger.debug("Cached token expired, removing from cache") + del self._token_cache[token_hash] + return None + + # Return cached AccessToken + username = userinfo.get("sub") or userinfo.get("preferred_username") + scope_string = userinfo.get("scope", "") + scopes = scope_string.split() if scope_string else [] + + return AccessToken( + token=token, + client_id=userinfo.get("client_id", ""), + scopes=scopes, + expires_at=int(expiry), + resource=username, + ) + + def clear_cache(self): + """Clear the token cache.""" + self._token_cache.clear() + logger.debug("Token cache cleared") + + async def close(self): + """Cleanup resources.""" + await self.http_client.aclose() + logger.debug("Unified token verifier closed") diff --git a/nextcloud_mcp_server/config.py b/nextcloud_mcp_server/config.py index e7a36ff..73d86e4 100644 --- a/nextcloud_mcp_server/config.py +++ b/nextcloud_mcp_server/config.py @@ -129,16 +129,29 @@ class Settings: oidc_discovery_url: Optional[str] = None oidc_client_id: Optional[str] = None oidc_client_secret: Optional[str] = None + oidc_issuer: Optional[str] = None # Nextcloud settings nextcloud_host: Optional[str] = None nextcloud_username: Optional[str] = None nextcloud_password: Optional[str] = None + # ADR-005: Token Audience Validation (required for OAuth mode) + nextcloud_mcp_server_url: Optional[str] = None # MCP server URL (used as audience) + nextcloud_resource_uri: Optional[str] = None # Nextcloud resource identifier + + # Token verification endpoints + jwks_uri: Optional[str] = None + introspection_uri: Optional[str] = None + userinfo_uri: Optional[str] = None + # Progressive Consent settings (always enabled - no flag needed) enable_token_exchange: bool = False enable_offline_access: bool = False + # Token exchange cache settings + token_exchange_cache_ttl: int = 300 # seconds (5 minutes default) + # Token settings token_encryption_key: Optional[str] = None token_storage_db: Optional[str] = None @@ -155,10 +168,18 @@ def get_settings() -> Settings: oidc_discovery_url=os.getenv("OIDC_DISCOVERY_URL"), oidc_client_id=os.getenv("OIDC_CLIENT_ID"), oidc_client_secret=os.getenv("OIDC_CLIENT_SECRET"), + oidc_issuer=os.getenv("OIDC_ISSUER"), # Nextcloud settings nextcloud_host=os.getenv("NEXTCLOUD_HOST"), nextcloud_username=os.getenv("NEXTCLOUD_USERNAME"), nextcloud_password=os.getenv("NEXTCLOUD_PASSWORD"), + # ADR-005: Token Audience Validation + nextcloud_mcp_server_url=os.getenv("NEXTCLOUD_MCP_SERVER_URL"), + nextcloud_resource_uri=os.getenv("NEXTCLOUD_RESOURCE_URI"), + # Token verification endpoints + jwks_uri=os.getenv("JWKS_URI"), + introspection_uri=os.getenv("INTROSPECTION_URI"), + userinfo_uri=os.getenv("USERINFO_URI"), # Progressive Consent settings (always enabled) enable_token_exchange=( os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true" @@ -166,6 +187,8 @@ def get_settings() -> Settings: enable_offline_access=( os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() == "true" ), + # Token exchange cache settings + token_exchange_cache_ttl=int(os.getenv("TOKEN_EXCHANGE_CACHE_TTL", "300")), # Token settings token_encryption_key=os.getenv("TOKEN_ENCRYPTION_KEY"), token_storage_db=os.getenv("TOKEN_STORAGE_DB", "/tmp/tokens.db"), diff --git a/nextcloud_mcp_server/context.py b/nextcloud_mcp_server/context.py index c568e29..f3e86d6 100644 --- a/nextcloud_mcp_server/context.py +++ b/nextcloud_mcp_server/context.py @@ -10,12 +10,15 @@ async def get_client(ctx: Context) -> NextcloudClient: """ Get the appropriate Nextcloud client based on authentication mode. - This function handles three modes: + ADR-005 compliant implementation supporting two modes: 1. BasicAuth mode: Returns shared client from lifespan context - 2. OAuth pass-through mode (ENABLE_TOKEN_EXCHANGE=false, default): - Verifies Flow 1 token and passes it to Nextcloud - 3. OAuth token exchange mode (ENABLE_TOKEN_EXCHANGE=true): - Exchanges Flow 1 token for ephemeral Nextcloud token via RFC 8693 + 2. Multi-audience mode (ENABLE_TOKEN_EXCHANGE=false, default): + Token already contains both MCP and Nextcloud audiences - use directly + 3. Token exchange mode (ENABLE_TOKEN_EXCHANGE=true): + Exchange MCP token for Nextcloud token via RFC 8693 + + SECURITY: Token passthrough has been REMOVED. All OAuth modes validate + proper token audiences per MCP Security Best Practices specification. Note: Nextcloud doesn't support OAuth scopes natively. Scopes are enforced by the MCP server via @require_scopes decorator, not by the IdP. @@ -49,20 +52,21 @@ async def get_client(ctx: Context) -> NextcloudClient: # OAuth mode (has 'nextcloud_host' attribute) if hasattr(lifespan_ctx, "nextcloud_host"): - # Check if token exchange is enabled - if settings.enable_token_exchange: - from nextcloud_mcp_server.auth.context_helper import ( - get_session_client_from_context, - ) + from nextcloud_mcp_server.auth.context_helper import ( + get_client_from_context, + get_session_client_from_context, + ) - # Token exchange mode: Exchange Flow 1 token for ephemeral Nextcloud token + if settings.enable_token_exchange: + # Mode 2: Exchange MCP token for Nextcloud token + # Token was validated to have MCP audience in UnifiedTokenVerifier + # Now exchange it for Nextcloud audience return await get_session_client_from_context( ctx, lifespan_ctx.nextcloud_host ) else: - # Pass-through mode (default): Verify and pass Flow 1 token to Nextcloud - from nextcloud_mcp_server.auth import get_client_from_context - + # Mode 1: Multi-audience token - use directly + # Token was validated to have BOTH audiences in UnifiedTokenVerifier return get_client_from_context(ctx, lifespan_ctx.nextcloud_host) # Unknown context type diff --git a/tests/unit/test_unified_verifier.py b/tests/unit/test_unified_verifier.py new file mode 100644 index 0000000..cb1c639 --- /dev/null +++ b/tests/unit/test_unified_verifier.py @@ -0,0 +1,518 @@ +""" +Unit tests for UnifiedTokenVerifier (ADR-005). + +Tests token audience validation for both multi-audience and token exchange modes +without requiring real network calls or IdP connections. +""" + +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import jwt +import pytest + +from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier +from nextcloud_mcp_server.config import Settings + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def base_settings(): + """Create base settings for testing.""" + return Settings( + oidc_client_id="test-client-id", + oidc_client_secret="test-client-secret", + oidc_issuer="https://idp.example.com", + nextcloud_host="https://nextcloud.example.com", + nextcloud_mcp_server_url="http://localhost:8000", + nextcloud_resource_uri="http://localhost:8080", + jwks_uri="https://idp.example.com/jwks", + introspection_uri="https://idp.example.com/introspect", + enable_token_exchange=False, # Multi-audience mode + token_exchange_cache_ttl=300, + ) + + +@pytest.fixture +def exchange_settings(base_settings): + """Create settings for token exchange mode.""" + base_settings.enable_token_exchange = True + return base_settings + + +class TestUnifiedTokenVerifierInit: + """Test UnifiedTokenVerifier initialization.""" + + def test_init_multi_audience_mode(self, base_settings): + """Test verifier initialization in multi-audience mode.""" + verifier = UnifiedTokenVerifier(base_settings) + assert verifier.mode == "multi-audience" + assert verifier.settings == base_settings + + def test_init_exchange_mode(self, exchange_settings): + """Test verifier initialization in token exchange mode.""" + verifier = UnifiedTokenVerifier(exchange_settings) + assert verifier.mode == "exchange" + assert verifier.settings == exchange_settings + + +class TestAudienceValidation: + """Test audience validation logic.""" + + def test_validate_multi_audience_both_present(self, base_settings): + """Test multi-audience validation with both audiences present.""" + verifier = UnifiedTokenVerifier(base_settings) + payload = { + "aud": ["test-client-id", "http://localhost:8080"], + "sub": "testuser", + "exp": int(time.time() + 3600), + } + + assert verifier._validate_multi_audience(payload) is True + + def test_validate_multi_audience_server_url_and_resource(self, base_settings): + """Test multi-audience validation with server URL instead of client ID.""" + verifier = UnifiedTokenVerifier(base_settings) + payload = { + "aud": ["http://localhost:8000", "http://localhost:8080"], + "sub": "testuser", + "exp": int(time.time() + 3600), + } + + assert verifier._validate_multi_audience(payload) is True + + def test_validate_multi_audience_missing_mcp(self, base_settings): + """Test multi-audience validation fails without MCP audience.""" + verifier = UnifiedTokenVerifier(base_settings) + payload = { + "aud": ["http://localhost:8080"], # Only Nextcloud + "sub": "testuser", + "exp": int(time.time() + 3600), + } + + assert verifier._validate_multi_audience(payload) is False + + def test_validate_multi_audience_missing_nextcloud(self, base_settings): + """Test multi-audience validation fails without Nextcloud audience.""" + verifier = UnifiedTokenVerifier(base_settings) + payload = { + "aud": ["test-client-id"], # Only MCP + "sub": "testuser", + "exp": int(time.time() + 3600), + } + + assert verifier._validate_multi_audience(payload) is False + + def test_validate_multi_audience_string_audience(self, base_settings): + """Test multi-audience validation with string audience (should still work).""" + verifier = UnifiedTokenVerifier(base_settings) + payload = { + "aud": "test-client-id", # Single audience as string + "sub": "testuser", + "exp": int(time.time() + 3600), + } + + # Should fail - needs both audiences + assert verifier._validate_multi_audience(payload) is False + + def test_has_mcp_audience_with_client_id(self, exchange_settings): + """Test MCP audience validation with client ID.""" + verifier = UnifiedTokenVerifier(exchange_settings) + payload = { + "aud": ["test-client-id"], + "sub": "testuser", + "exp": int(time.time() + 3600), + } + + assert verifier._has_mcp_audience(payload) is True + + def test_has_mcp_audience_with_server_url(self, exchange_settings): + """Test MCP audience validation with server URL.""" + verifier = UnifiedTokenVerifier(exchange_settings) + payload = { + "aud": ["http://localhost:8000"], + "sub": "testuser", + "exp": int(time.time() + 3600), + } + + assert verifier._has_mcp_audience(payload) is True + + def test_has_mcp_audience_missing(self, exchange_settings): + """Test MCP audience validation fails without MCP audience.""" + verifier = UnifiedTokenVerifier(exchange_settings) + payload = { + "aud": ["http://localhost:8080"], # Wrong audience + "sub": "testuser", + "exp": int(time.time() + 3600), + } + + assert verifier._has_mcp_audience(payload) is False + + +class TestTokenFormatDetection: + """Test JWT format detection.""" + + def test_is_jwt_format_valid(self, base_settings): + """Test JWT format detection with valid JWT.""" + verifier = UnifiedTokenVerifier(base_settings) + jwt_token = "eyJhbGc.eyJzdWI.signature" + assert verifier._is_jwt_format(jwt_token) is True + + def test_is_jwt_format_opaque(self, base_settings): + """Test JWT format detection with opaque token.""" + verifier = UnifiedTokenVerifier(base_settings) + opaque_token = "opaque-token-12345" + assert verifier._is_jwt_format(opaque_token) is False + + +class TestTokenCaching: + """Test token caching functionality.""" + + async def test_cache_stores_and_retrieves(self, base_settings): + """Test token caching stores and retrieves tokens.""" + verifier = UnifiedTokenVerifier(base_settings) + + # Create a valid access token + payload = { + "aud": ["test-client-id", "http://localhost:8080"], + "sub": "testuser", + "scope": "openid profile", + "exp": int(time.time() + 3600), + "client_id": "test-client-id", + } + test_token = jwt.encode(payload, "secret", algorithm="HS256") + + # Create AccessToken and cache it + access_token = verifier._create_access_token(test_token, payload) + assert access_token is not None + + # Should retrieve from cache + cached = verifier._get_cached_token(test_token) + assert cached is not None + assert cached.resource == "testuser" + assert cached.scopes == ["openid", "profile"] + + async def test_cache_respects_expiry(self, base_settings): + """Test that expired tokens are not returned from cache.""" + verifier = UnifiedTokenVerifier(base_settings) + + # Create expired token payload + payload = { + "aud": ["test-client-id", "http://localhost:8080"], + "sub": "testuser", + "scope": "openid profile", + "exp": int(time.time() - 100), # Expired 100 seconds ago + "client_id": "test-client-id", + } + test_token = jwt.encode(payload, "secret", algorithm="HS256") + + # Create and cache + access_token = verifier._create_access_token(test_token, payload) + assert access_token is not None + + # Should not retrieve expired token + cached = verifier._get_cached_token(test_token) + assert cached is None + + async def test_cache_clear(self, base_settings): + """Test cache clearing.""" + verifier = UnifiedTokenVerifier(base_settings) + + # Create and cache token + payload = { + "aud": ["test-client-id", "http://localhost:8080"], + "sub": "testuser", + "exp": int(time.time() + 3600), + } + test_token = jwt.encode(payload, "secret", algorithm="HS256") + verifier._create_access_token(test_token, payload) + + # Clear cache + verifier.clear_cache() + + # Should not retrieve after clear + cached = verifier._get_cached_token(test_token) + assert cached is None + + +class TestMultiAudienceVerification: + """Test multi-audience token verification.""" + + async def test_verify_multi_audience_with_introspection(self, base_settings): + """Test multi-audience verification using introspection.""" + verifier = UnifiedTokenVerifier(base_settings) + + # Mock introspection response + introspection_response = { + "active": True, + "sub": "testuser", + "aud": ["test-client-id", "http://localhost:8080"], + "scope": "openid profile", + "exp": int(time.time() + 3600), + "client_id": "test-client-id", + } + + with patch.object( + verifier, "_introspect_token", return_value=introspection_response + ): + opaque_token = "opaque-token-12345" + result = await verifier._verify_multi_audience_token(opaque_token) + + assert result is not None + assert result.resource == "testuser" + assert result.scopes == ["openid", "profile"] + + async def test_verify_multi_audience_fails_without_both_audiences( + self, base_settings + ): + """Test multi-audience verification fails without both audiences.""" + verifier = UnifiedTokenVerifier(base_settings) + + # Mock introspection response with only one audience + introspection_response = { + "active": True, + "sub": "testuser", + "aud": ["test-client-id"], # Missing Nextcloud audience + "scope": "openid profile", + "exp": int(time.time() + 3600), + } + + with patch.object( + verifier, "_introspect_token", return_value=introspection_response + ): + opaque_token = "opaque-token-12345" + result = await verifier._verify_multi_audience_token(opaque_token) + + assert result is None + + +class TestExchangeModeVerification: + """Test token exchange mode verification.""" + + async def test_verify_mcp_audience_only_success(self, exchange_settings): + """Test MCP-only audience verification succeeds with MCP audience.""" + verifier = UnifiedTokenVerifier(exchange_settings) + + # Mock introspection response with MCP audience only + introspection_response = { + "active": True, + "sub": "testuser", + "aud": ["test-client-id"], + "scope": "openid profile", + "exp": int(time.time() + 3600), + "client_id": "test-client-id", + } + + with patch.object( + verifier, "_introspect_token", return_value=introspection_response + ): + opaque_token = "opaque-token-12345" + result = await verifier._verify_mcp_audience_only(opaque_token) + + assert result is not None + assert result.resource == "testuser" + + async def test_verify_mcp_audience_only_fails_without_mcp(self, exchange_settings): + """Test MCP-only audience verification fails without MCP audience.""" + verifier = UnifiedTokenVerifier(exchange_settings) + + # Mock introspection response without MCP audience + introspection_response = { + "active": True, + "sub": "testuser", + "aud": ["http://localhost:8080"], # Wrong audience + "scope": "openid profile", + "exp": int(time.time() + 3600), + } + + with patch.object( + verifier, "_introspect_token", return_value=introspection_response + ): + opaque_token = "opaque-token-12345" + result = await verifier._verify_mcp_audience_only(opaque_token) + + assert result is None + + +class TestIntrospection: + """Test token introspection.""" + + async def test_introspect_active_token(self, base_settings): + """Test introspection of active token.""" + verifier = UnifiedTokenVerifier(base_settings) + + # Mock HTTP response + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "active": True, + "sub": "testuser", + "aud": ["test-client-id", "http://localhost:8080"], + "scope": "openid profile", + "exp": int(time.time() + 3600), + "client_id": "test-client-id", + } + + verifier.http_client.post = AsyncMock(return_value=mock_response) + + result = await verifier._introspect_token("test-token") + assert result is not None + assert result["active"] is True + assert result["sub"] == "testuser" + + async def test_introspect_inactive_token(self, base_settings): + """Test introspection of inactive token.""" + verifier = UnifiedTokenVerifier(base_settings) + + # Mock HTTP response + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"active": False} + + verifier.http_client.post = AsyncMock(return_value=mock_response) + + result = await verifier._introspect_token("test-token") + assert result is None + + async def test_introspect_without_endpoint(self, base_settings): + """Test introspection when endpoint not configured.""" + base_settings.introspection_uri = None + verifier = UnifiedTokenVerifier(base_settings) + + result = await verifier._introspect_token("test-token") + assert result is None + + +class TestAccessTokenCreation: + """Test AccessToken object creation.""" + + def test_create_access_token_success(self, base_settings): + """Test successful AccessToken creation.""" + verifier = UnifiedTokenVerifier(base_settings) + + payload = { + "sub": "testuser", + "scope": "openid profile email", + "exp": int(time.time() + 3600), + "client_id": "test-client-id", + } + token = "test-token-123" + + result = verifier._create_access_token(token, payload) + assert result is not None + assert result.token == token + assert result.resource == "testuser" + assert result.scopes == ["openid", "profile", "email"] + assert result.client_id == "test-client-id" + + def test_create_access_token_with_preferred_username(self, base_settings): + """Test AccessToken creation with preferred_username fallback.""" + verifier = UnifiedTokenVerifier(base_settings) + + payload = { + "preferred_username": "testuser", # No 'sub' claim + "scope": "openid profile", + "exp": int(time.time() + 3600), + } + token = "test-token-123" + + result = verifier._create_access_token(token, payload) + assert result is not None + assert result.resource == "testuser" + + def test_create_access_token_no_username(self, base_settings): + """Test AccessToken creation fails without username.""" + verifier = UnifiedTokenVerifier(base_settings) + + payload = { + # No sub or preferred_username + "scope": "openid profile", + "exp": int(time.time() + 3600), + } + token = "test-token-123" + + result = verifier._create_access_token(token, payload) + assert result is None + + def test_create_access_token_no_expiry(self, base_settings): + """Test AccessToken creation uses default TTL without expiry.""" + verifier = UnifiedTokenVerifier(base_settings) + + payload = { + "sub": "testuser", + "scope": "openid profile", + # No exp claim + } + token = "test-token-123" + + result = verifier._create_access_token(token, payload) + assert result is not None + # Should have set a default expiry + assert result.expires_at > int(time.time()) + + +class TestVerifyTokenFlow: + """Test complete verify_token flow.""" + + async def test_verify_token_from_cache(self, base_settings): + """Test verify_token returns cached token.""" + verifier = UnifiedTokenVerifier(base_settings) + + payload = { + "aud": ["test-client-id", "http://localhost:8080"], + "sub": "testuser", + "scope": "openid profile", + "exp": int(time.time() + 3600), + } + token = jwt.encode(payload, "secret", algorithm="HS256") + + # First call - should cache + result1 = verifier._create_access_token(token, payload) + assert result1 is not None + + # Mock _verify_multi_audience_token to ensure it's not called + with patch.object(verifier, "_verify_multi_audience_token") as mock_verify: + result2 = await verifier.verify_token(token) + assert result2 is not None + assert result2.resource == "testuser" + # Should not call verification since it's cached + mock_verify.assert_not_called() + + async def test_verify_token_multi_audience_mode(self, base_settings): + """Test verify_token in multi-audience mode.""" + verifier = UnifiedTokenVerifier(base_settings) + + introspection_response = { + "active": True, + "sub": "testuser", + "aud": ["test-client-id", "http://localhost:8080"], + "scope": "openid profile", + "exp": int(time.time() + 3600), + } + + with patch.object( + verifier, "_introspect_token", return_value=introspection_response + ): + result = await verifier.verify_token("opaque-token") + assert result is not None + assert result.resource == "testuser" + + async def test_verify_token_exchange_mode(self, exchange_settings): + """Test verify_token in exchange mode.""" + verifier = UnifiedTokenVerifier(exchange_settings) + + introspection_response = { + "active": True, + "sub": "testuser", + "aud": ["test-client-id"], # MCP audience only + "scope": "openid profile", + "exp": int(time.time() + 3600), + } + + with patch.object( + verifier, "_introspect_token", return_value=introspection_response + ): + result = await verifier.verify_token("opaque-token") + assert result is not None + assert result.resource == "testuser"