refactor: Eliminate duplicate validation logic in UnifiedTokenVerifier

Since both multi-audience and exchange modes now validate the same thing
(MCP audience only per RFC 7519), consolidated the duplicate methods:

- Removed duplicate verification methods (_verify_multi_audience_token
  and _verify_mcp_audience_only)
- Created single _verify_mcp_audience() method for all validation
- Removed duplicate helper (_validate_multi_audience), kept _has_mcp_audience
- Mode only affects logging and what happens AFTER verification

The mode distinction is now purely about post-verification behavior:
- Multi-audience mode: Use token directly (Nextcloud validates its own)
- Exchange mode: Exchange for Nextcloud-audience token via RFC 8693

This makes the code cleaner and clearer about what's actually happening -
both modes do identical validation, they just differ in how the validated
token is used.

All tests pass: unit (65), OAuth integration confirmed working.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-05 21:58:52 +01:00
parent 7d9ab5559c
commit bdb1ba2051
2 changed files with 42 additions and 112 deletions
+24 -94
View File
@@ -90,17 +90,16 @@ class UnifiedTokenVerifier(TokenVerifier):
"""
Verify token according to MCP TokenVerifier protocol.
CRITICAL: This method only validates tokens - it does NOT perform exchange.
Token exchange happens later in context_helper.py when creating NextcloudClient.
Multi-audience mode: Validates token has MCP audience (per RFC 7519)
Exchange mode: Validates token has MCP audience ONLY (exchange happens later)
Per RFC 7519, we validate only MCP audience. The mode determines what
happens AFTER verification in context_helper.py:
- Multi-audience mode: Use token directly (Nextcloud validates its own audience)
- Exchange mode: Exchange for Nextcloud-audience token via RFC 8693
Args:
token: Bearer token to verify
Returns:
AccessToken if valid, None if invalid or expired
AccessToken if valid with MCP audience, None otherwise
"""
# Check cache first
cached = self._get_cached_token(token)
@@ -108,21 +107,16 @@ class UnifiedTokenVerifier(TokenVerifier):
logger.debug("Token found in cache")
return cached
# Verify based on mode
if self.mode == "multi-audience":
return await self._verify_multi_audience_token(token)
else:
# Exchange mode: Only validate MCP audience here
# Actual exchange happens in context_helper.py
return await self._verify_mcp_audience_only(token)
# Both modes do the same validation (MCP audience only)
return await self._verify_mcp_audience(token)
async def _verify_multi_audience_token(self, token: str) -> AccessToken | None:
async def _verify_mcp_audience(self, token: str) -> AccessToken | None:
"""
Validate token has MCP audience (Mode 1).
Token can be used directly without exchange.
Validate token has MCP audience.
Per RFC 7519, we only validate our own (MCP) audience. Nextcloud will
independently validate its own audience when it receives the token.
Per RFC 7519 Section 4.1.3, resource servers validate only their own
presence in the audience claim. We don't validate Nextcloud's audience -
that's Nextcloud's responsibility when it receives the token.
Args:
token: Bearer token to verify
@@ -145,7 +139,7 @@ class UnifiedTokenVerifier(TokenVerifier):
return None
# Validate MCP audience is present
if not self._validate_multi_audience(payload):
if not self._has_mcp_audience(payload):
audiences = payload.get("aud", [])
logger.error(
f"Token rejected: Missing MCP audience. "
@@ -154,60 +148,24 @@ class UnifiedTokenVerifier(TokenVerifier):
)
return None
logger.info(
"MCP audience validation passed - token authorized for MCP server"
)
return self._create_access_token(token, payload)
except Exception as e:
logger.error(f"Multi-audience validation failed: {e}")
return None
async def _verify_mcp_audience_only(self, token: str) -> AccessToken | None:
"""
Validate token has MCP audience only (Mode 2).
Token will be exchanged later in context_helper.py.
Args:
token: Bearer token to verify
Returns:
AccessToken if valid with MCP audience, None otherwise
"""
try:
# Attempt JWT verification first
if self._is_jwt_format(token) and self.jwks_client:
payload = await self._verify_jwt_signature(token)
else:
# Fall back to introspection for opaque tokens
payload = await self._introspect_token(token)
if not payload:
return None
# Check payload is valid
if not payload:
return None
# Only validate MCP audience (exchange will handle Nextcloud)
if not self._has_mcp_audience(payload):
audiences = payload.get("aud", [])
logger.error(
f"Token rejected: Missing MCP audience. "
f"Got {audiences}, need {self.settings.oidc_client_id} "
f"or {self.settings.nextcloud_mcp_server_url}"
# Log based on mode for clarity
if self.mode == "multi-audience":
logger.info(
"MCP audience validated - token can be used directly "
"(Nextcloud will validate its own audience)"
)
else:
logger.info(
"MCP audience validated - token will be exchanged for Nextcloud access"
)
return None
logger.info(
"MCP audience validation passed - token will be exchanged for Nextcloud access"
)
return self._create_access_token(token, payload)
except Exception as e:
logger.error(f"MCP audience validation failed: {e}")
logger.error(f"Token verification failed: {e}")
return None
def _validate_multi_audience(self, payload: dict[str, Any]) -> bool:
def _has_mcp_audience(self, payload: dict[str, Any]) -> bool:
"""
Check if token has MCP audience.
@@ -215,9 +173,6 @@ class UnifiedTokenVerifier(TokenVerifier):
presence in the audience claim. We don't validate Nextcloud's audience - that's
Nextcloud's responsibility when it receives the token.
This is NOT token passthrough (we validate the token). This IS token reuse
which is allowed by RFC 8707 for multi-audience tokens between trusted services.
Args:
payload: Decoded token payload
@@ -231,31 +186,6 @@ class UnifiedTokenVerifier(TokenVerifier):
audiences_set = set(audiences)
# MCP must have at least one: client_id OR server_url OR server_url/mcp
mcp_valid = self.settings.oidc_client_id in audiences_set or (
self.settings.nextcloud_mcp_server_url
and (
self.settings.nextcloud_mcp_server_url in audiences_set
or f"{self.settings.nextcloud_mcp_server_url}/mcp" in audiences_set
)
)
return bool(mcp_valid)
def _has_mcp_audience(self, payload: dict[str, Any]) -> bool:
"""
Check if token has MCP audience (for exchange mode).
Args:
payload: Decoded token payload
Returns:
True if MCP audience present, False otherwise
"""
audiences = payload.get("aud", [])
if isinstance(audiences, str):
audiences = [audiences]
audiences_set = set(audiences)
return bool(
self.settings.oidc_client_id in audiences_set
or (