f43343356e
- Add 429 retry with exponential backoff to register_client() (fixes CI oauth matrix failures from parallel DCR requests) - Make client_id, redirect_uri, and PKCE mandatory at token endpoint - Add null-checks for discovery_url and OAuth credentials in proxy flows - Add OIDC discovery document caching with 5-min TTL - Add per-IP rate limiting on /oauth/register DCR proxy - Discover DCR endpoint from OIDC discovery instead of hardcoding - Extract extract_user_id_from_token to auth/token_utils.py (breaks circular imports between server/ and auth/ layers) - Add TTL scope cache in scope_authorization.py (avoids DB hit per tool) - Add defense-in-depth scope validation in storage layer - Broaden elicitation exception handling with graceful fallback - Add idempotentHint to nc_auth_check_status, return "pending" status after accepted elicitation, add polling interval to description - Change ALL_SUPPORTED_SCOPES from tuple to frozenset for O(1) lookups - Replace Optional[str] with str | None throughout config.py - Use default_factory for ProxyCodeEntry/ASProxySession dataclasses - Add proxy code/session cleanup to background loop - Fix OIDC verification CI step to only run for oauth/login-flow modes - Add unit tests for access.py REST endpoints (10 tests) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
86 lines
3.0 KiB
Python
86 lines
3.0 KiB
Python
"""Token utility functions for extracting user identity from MCP access tokens.
|
|
|
|
Extracted from server/oauth_tools.py to break circular import dependencies
|
|
between server/ and auth/ layers.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
|
|
import jwt
|
|
from mcp.server.auth.middleware.auth_context import get_access_token
|
|
from mcp.server.auth.provider import AccessToken
|
|
from mcp.server.fastmcp import Context
|
|
|
|
from nextcloud_mcp_server.auth.userinfo_routes import _query_idp_userinfo
|
|
|
|
from ..http import nextcloud_httpx_client
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def extract_user_id_from_token(ctx: Context) -> str:
|
|
"""Extract user_id from the MCP access token (Flow 1).
|
|
|
|
Handles both JWT and opaque tokens:
|
|
- JWT: Decode and extract 'sub' claim
|
|
- Opaque: Call userinfo endpoint to get 'sub'
|
|
|
|
Args:
|
|
ctx: MCP context with access token
|
|
|
|
Returns:
|
|
user_id extracted from token, or "default_user" as fallback
|
|
"""
|
|
# Use MCP SDK's get_access_token() which uses contextvars
|
|
access_token: AccessToken | None = get_access_token()
|
|
|
|
if not access_token or not access_token.token:
|
|
logger.warning(" ✗ No access token found via get_access_token()")
|
|
return "default_user"
|
|
|
|
token = access_token.token
|
|
is_jwt = "." in token and token.count(".") >= 2
|
|
logger.info(f" Token type: {'JWT' if is_jwt else 'Opaque'}")
|
|
|
|
# Try JWT decode first
|
|
if is_jwt:
|
|
try:
|
|
payload = jwt.decode(token, options={"verify_signature": False})
|
|
user_id = payload.get("sub", "unknown")
|
|
logger.info(f" ✓ JWT decode successful: user_id={user_id}")
|
|
return user_id
|
|
except Exception as e:
|
|
logger.error(f" ✗ JWT decode failed: {type(e).__name__}: {e}")
|
|
|
|
# Opaque token - call userinfo endpoint
|
|
logger.info(" Opaque token detected, calling userinfo endpoint...")
|
|
try:
|
|
# Get userinfo endpoint from OIDC discovery
|
|
oidc_discovery_uri = os.getenv(
|
|
"OIDC_DISCOVERY_URI",
|
|
"http://localhost:8080/.well-known/openid-configuration",
|
|
)
|
|
async with nextcloud_httpx_client() as http_client:
|
|
discovery_response = await http_client.get(oidc_discovery_uri)
|
|
discovery_response.raise_for_status()
|
|
discovery = discovery_response.json()
|
|
userinfo_endpoint = discovery.get("userinfo_endpoint")
|
|
|
|
if userinfo_endpoint:
|
|
userinfo = await _query_idp_userinfo(token, userinfo_endpoint)
|
|
if userinfo:
|
|
user_id = userinfo.get("sub", "unknown")
|
|
logger.info(f" ✓ Userinfo query successful: user_id={user_id}")
|
|
return user_id
|
|
else:
|
|
logger.error(" ✗ Userinfo query failed")
|
|
else:
|
|
logger.error(" ✗ No userinfo_endpoint available")
|
|
except Exception as e:
|
|
logger.error(f" ✗ Userinfo query failed: {type(e).__name__}: {e}")
|
|
|
|
# Fallback
|
|
logger.warning(" Using fallback user_id: default_user")
|
|
return "default_user"
|