diff --git a/nextcloud_mcp_server/auth/unified_verifier.py b/nextcloud_mcp_server/auth/unified_verifier.py index e3412c6..bd7e99b 100644 --- a/nextcloud_mcp_server/auth/unified_verifier.py +++ b/nextcloud_mcp_server/auth/unified_verifier.py @@ -90,17 +90,16 @@ class UnifiedTokenVerifier(TokenVerifier): """ Verify token according to MCP TokenVerifier protocol. - CRITICAL: This method only validates tokens - it does NOT perform exchange. - Token exchange happens later in context_helper.py when creating NextcloudClient. - - Multi-audience mode: Validates token has MCP audience (per RFC 7519) - Exchange mode: Validates token has MCP audience ONLY (exchange happens later) + Per RFC 7519, we validate only MCP audience. The mode determines what + happens AFTER verification in context_helper.py: + - Multi-audience mode: Use token directly (Nextcloud validates its own audience) + - Exchange mode: Exchange for Nextcloud-audience token via RFC 8693 Args: token: Bearer token to verify Returns: - AccessToken if valid, None if invalid or expired + AccessToken if valid with MCP audience, None otherwise """ # Check cache first cached = self._get_cached_token(token) @@ -108,21 +107,16 @@ class UnifiedTokenVerifier(TokenVerifier): logger.debug("Token found in cache") return cached - # Verify based on mode - if self.mode == "multi-audience": - return await self._verify_multi_audience_token(token) - else: - # Exchange mode: Only validate MCP audience here - # Actual exchange happens in context_helper.py - return await self._verify_mcp_audience_only(token) + # Both modes do the same validation (MCP audience only) + return await self._verify_mcp_audience(token) - async def _verify_multi_audience_token(self, token: str) -> AccessToken | None: + async def _verify_mcp_audience(self, token: str) -> AccessToken | None: """ - Validate token has MCP audience (Mode 1). - Token can be used directly without exchange. + Validate token has MCP audience. - Per RFC 7519, we only validate our own (MCP) audience. Nextcloud will - independently validate its own audience when it receives the token. + Per RFC 7519 Section 4.1.3, resource servers validate only their own + presence in the audience claim. We don't validate Nextcloud's audience - + that's Nextcloud's responsibility when it receives the token. Args: token: Bearer token to verify @@ -145,7 +139,7 @@ class UnifiedTokenVerifier(TokenVerifier): return None # Validate MCP audience is present - if not self._validate_multi_audience(payload): + if not self._has_mcp_audience(payload): audiences = payload.get("aud", []) logger.error( f"Token rejected: Missing MCP audience. " @@ -154,60 +148,24 @@ class UnifiedTokenVerifier(TokenVerifier): ) return None - logger.info( - "MCP audience validation passed - token authorized for MCP server" - ) - return self._create_access_token(token, payload) - - except Exception as e: - logger.error(f"Multi-audience validation failed: {e}") - return None - - async def _verify_mcp_audience_only(self, token: str) -> AccessToken | None: - """ - Validate token has MCP audience only (Mode 2). - Token will be exchanged later in context_helper.py. - - Args: - token: Bearer token to verify - - Returns: - AccessToken if valid with MCP audience, None otherwise - """ - try: - # Attempt JWT verification first - if self._is_jwt_format(token) and self.jwks_client: - payload = await self._verify_jwt_signature(token) - else: - # Fall back to introspection for opaque tokens - payload = await self._introspect_token(token) - if not payload: - return None - - # Check payload is valid - if not payload: - return None - - # Only validate MCP audience (exchange will handle Nextcloud) - if not self._has_mcp_audience(payload): - audiences = payload.get("aud", []) - logger.error( - f"Token rejected: Missing MCP audience. " - f"Got {audiences}, need {self.settings.oidc_client_id} " - f"or {self.settings.nextcloud_mcp_server_url}" + # Log based on mode for clarity + if self.mode == "multi-audience": + logger.info( + "MCP audience validated - token can be used directly " + "(Nextcloud will validate its own audience)" + ) + else: + logger.info( + "MCP audience validated - token will be exchanged for Nextcloud access" ) - return None - logger.info( - "MCP audience validation passed - token will be exchanged for Nextcloud access" - ) return self._create_access_token(token, payload) except Exception as e: - logger.error(f"MCP audience validation failed: {e}") + logger.error(f"Token verification failed: {e}") return None - def _validate_multi_audience(self, payload: dict[str, Any]) -> bool: + def _has_mcp_audience(self, payload: dict[str, Any]) -> bool: """ Check if token has MCP audience. @@ -215,9 +173,6 @@ class UnifiedTokenVerifier(TokenVerifier): presence in the audience claim. We don't validate Nextcloud's audience - that's Nextcloud's responsibility when it receives the token. - This is NOT token passthrough (we validate the token). This IS token reuse - which is allowed by RFC 8707 for multi-audience tokens between trusted services. - Args: payload: Decoded token payload @@ -231,31 +186,6 @@ class UnifiedTokenVerifier(TokenVerifier): audiences_set = set(audiences) # MCP must have at least one: client_id OR server_url OR server_url/mcp - mcp_valid = self.settings.oidc_client_id in audiences_set or ( - self.settings.nextcloud_mcp_server_url - and ( - self.settings.nextcloud_mcp_server_url in audiences_set - or f"{self.settings.nextcloud_mcp_server_url}/mcp" in audiences_set - ) - ) - - return bool(mcp_valid) - - def _has_mcp_audience(self, payload: dict[str, Any]) -> bool: - """ - Check if token has MCP audience (for exchange mode). - - Args: - payload: Decoded token payload - - Returns: - True if MCP audience present, False otherwise - """ - audiences = payload.get("aud", []) - if isinstance(audiences, str): - audiences = [audiences] - - audiences_set = set(audiences) return bool( self.settings.oidc_client_id in audiences_set or ( diff --git a/tests/unit/test_unified_verifier.py b/tests/unit/test_unified_verifier.py index 08d2ec8..491ee0f 100644 --- a/tests/unit/test_unified_verifier.py +++ b/tests/unit/test_unified_verifier.py @@ -61,7 +61,7 @@ class TestAudienceValidation: """Test audience validation logic.""" def test_validate_multi_audience_both_present(self, base_settings): - """Test multi-audience validation with both audiences present.""" + """Test MCP audience validation with both audiences present.""" verifier = UnifiedTokenVerifier(base_settings) payload = { "aud": ["test-client-id", "http://localhost:8080"], @@ -69,10 +69,10 @@ class TestAudienceValidation: "exp": int(time.time() + 3600), } - assert verifier._validate_multi_audience(payload) is True + assert verifier._has_mcp_audience(payload) is True def test_validate_multi_audience_server_url_and_resource(self, base_settings): - """Test multi-audience validation with server URL instead of client ID.""" + """Test MCP audience validation with server URL instead of client ID.""" verifier = UnifiedTokenVerifier(base_settings) payload = { "aud": ["http://localhost:8000", "http://localhost:8080"], @@ -80,10 +80,10 @@ class TestAudienceValidation: "exp": int(time.time() + 3600), } - assert verifier._validate_multi_audience(payload) is True + assert verifier._has_mcp_audience(payload) is True def test_validate_multi_audience_missing_mcp(self, base_settings): - """Test multi-audience validation fails without MCP audience.""" + """Test MCP audience validation fails without MCP audience.""" verifier = UnifiedTokenVerifier(base_settings) payload = { "aud": ["http://localhost:8080"], # Only Nextcloud @@ -91,10 +91,10 @@ class TestAudienceValidation: "exp": int(time.time() + 3600), } - assert verifier._validate_multi_audience(payload) is False + assert verifier._has_mcp_audience(payload) is False def test_validate_multi_audience_missing_nextcloud(self, base_settings): - """Test multi-audience validation succeeds with only MCP audience (RFC 7519 compliant).""" + """Test MCP audience validation succeeds with only MCP audience (RFC 7519 compliant).""" verifier = UnifiedTokenVerifier(base_settings) payload = { "aud": ["test-client-id"], # Only MCP @@ -103,10 +103,10 @@ class TestAudienceValidation: } # Per RFC 7519, we only validate MCP audience. Nextcloud validates its own. - assert verifier._validate_multi_audience(payload) is True + assert verifier._has_mcp_audience(payload) is True def test_validate_multi_audience_string_audience(self, base_settings): - """Test multi-audience validation with string audience works (RFC 7519 compliant).""" + """Test MCP audience validation with string audience works (RFC 7519 compliant).""" verifier = UnifiedTokenVerifier(base_settings) payload = { "aud": "test-client-id", # Single audience as string @@ -115,7 +115,7 @@ class TestAudienceValidation: } # Should pass - we only validate MCP audience per RFC 7519 - assert verifier._validate_multi_audience(payload) is True + assert verifier._has_mcp_audience(payload) is True def test_has_mcp_audience_with_client_id(self, exchange_settings): """Test MCP audience validation with client ID.""" @@ -258,7 +258,7 @@ class TestMultiAudienceVerification: verifier, "_introspect_token", return_value=introspection_response ): opaque_token = "opaque-token-12345" - result = await verifier._verify_multi_audience_token(opaque_token) + result = await verifier._verify_mcp_audience(opaque_token) assert result is not None assert result.resource == "testuser" @@ -267,7 +267,7 @@ class TestMultiAudienceVerification: async def test_verify_multi_audience_fails_without_both_audiences( self, base_settings ): - """Test multi-audience verification succeeds with only MCP audience (RFC 7519 compliant).""" + """Test MCP audience verification succeeds with only MCP audience (RFC 7519 compliant).""" verifier = UnifiedTokenVerifier(base_settings) # Mock introspection response with only MCP audience @@ -285,7 +285,7 @@ class TestMultiAudienceVerification: verifier, "_introspect_token", return_value=introspection_response ): opaque_token = "opaque-token-12345" - result = await verifier._verify_multi_audience_token(opaque_token) + result = await verifier._verify_mcp_audience(opaque_token) # Should succeed with only MCP audience per RFC 7519 assert result is not None @@ -313,13 +313,13 @@ class TestExchangeModeVerification: verifier, "_introspect_token", return_value=introspection_response ): opaque_token = "opaque-token-12345" - result = await verifier._verify_mcp_audience_only(opaque_token) + result = await verifier._verify_mcp_audience(opaque_token) assert result is not None assert result.resource == "testuser" async def test_verify_mcp_audience_only_fails_without_mcp(self, exchange_settings): - """Test MCP-only audience verification fails without MCP audience.""" + """Test MCP audience verification fails without MCP audience.""" verifier = UnifiedTokenVerifier(exchange_settings) # Mock introspection response without MCP audience @@ -335,7 +335,7 @@ class TestExchangeModeVerification: verifier, "_introspect_token", return_value=introspection_response ): opaque_token = "opaque-token-12345" - result = await verifier._verify_mcp_audience_only(opaque_token) + result = await verifier._verify_mcp_audience(opaque_token) assert result is None @@ -476,8 +476,8 @@ class TestVerifyTokenFlow: result1 = verifier._create_access_token(token, payload) assert result1 is not None - # Mock _verify_multi_audience_token to ensure it's not called - with patch.object(verifier, "_verify_multi_audience_token") as mock_verify: + # Mock _verify_mcp_audience to ensure it's not called + with patch.object(verifier, "_verify_mcp_audience") as mock_verify: result2 = await verifier.verify_token(token) assert result2 is not None assert result2.resource == "testuser"