feat: Add userinfo route/page

This commit is contained in:
Chris Coutinho
2025-11-04 00:03:24 +01:00
parent d92945a388
commit d14f2f666d
6 changed files with 220 additions and 129 deletions
+65
View File
@@ -0,0 +1,65 @@
Excellent and incredibly thorough work on ADR-004. It outlines a robust, secure, and modern approach to federated authentication that aligns with industry best practices. The Progressive Consent architecture with dual OAuth flows is the right direction for a system with these requirements.
Here is a review of the current implementation in light of the architecture proposed in the ADR.
### High-Level Assessment
The project is in a good state, with a clear vision for its authentication architecture. The current implementation provides a backward-compatible "Hybrid Flow" while also containing the scaffolding for the target "Progressive Consent" flow. The hybrid flow is well-tested, which is a great foundation.
The following points are intended to help bridge the gap between the current implementation and the final vision outlined in ADR-004.
### Critical Security Review
#### 1. Missing Token Audience (`aud`) Validation
This is the most critical issue. The `require_scopes` decorator currently checks for scopes but does not validate the `audience` (`aud` claim) of the incoming JWT.
* **Risk:** This creates a "confused deputy" vulnerability. An access token issued for a different application could be used to access the MCP server, as long as the scope names happen to match.
* **ADR Reference:** The ADR correctly identifies this and proposes an `MCPTokenVerifier` that validates `aud: "mcp-server"`.
* **Recommendation:** Implement the audience validation as a central part of your token verification middleware. An incoming token should be rejected immediately if its audience is not `mcp-server`. This check should happen before any tool-specific scope checks.
### Architecture and Implementation Review
#### 2. Progressive Consent Flow is Untested
The code for the Progressive Consent flow (behind the `ENABLE_PROGRESSIVE_CONSENT` flag) exists in `oauth_routes.py` and `oauth_tools.py`. However, there are no integration tests to validate it.
* **Risk:** Given the complexity of OAuth flows, it's likely there are bugs in the untested implementation.
* **Recommendation:** Create a new test file, `test_adr004_progressive_flow.py`, that uses Playwright to test the dual-flow architecture end-to-end:
1. **Flow 1:** A test MCP client authenticates directly with the IdP to get an `mcp-server` token.
2. **Provisioning Check:** The test verifies that calling a Nextcloud tool fails with a `ProvisioningRequiredError`.
3. **Flow 2:** The test calls the `provision_nextcloud_access` tool and automates the second OAuth flow to grant the server offline access.
4. **Tool Execution:** The test verifies that Nextcloud tools can now be successfully called.
#### 3. Inconsistent Authorization URL Generation
There is duplicated and inconsistent logic for generating the IdP authorization URL.
* **Location 1:** `oauth_tools.py` in `generate_oauth_url_for_flow2` hardcodes the authorization endpoint path.
* **Location 2:** `oauth_routes.py` in `oauth_authorize_nextcloud` correctly uses the OIDC discovery document to find the `authorization_endpoint`.
* **Risk:** The hardcoded path is brittle and will break with IdPs that use different endpoint paths (like Keycloak).
* **Recommendation:** Consolidate this logic. The `provision_nextcloud_access` tool should not build the URL itself. Instead, it should return a URL pointing to the MCP server's own `/oauth/authorize-nextcloud` endpoint. This endpoint (which you've already created as `oauth_authorize_nextcloud` in `oauth_routes.py`) can then be the single source of truth for generating the IdP redirect.
#### 4. Poor User Experience due to Missing Token Refresh
The `/oauth/token` endpoint does not implement the `refresh_token` grant type. This means that when the client's `mcp-server` access token expires (e.g., after one hour), the user must go through the entire browser-based login flow again.
* **Risk:** This creates a frustrating user experience, especially for long-lived desktop clients.
* **ADR Reference:** A proper Flow 1 should result in the MCP client receiving both an access token and a refresh token from the IdP.
* **Recommendation:**
1. Ensure the IdP is configured to issue refresh tokens to the MCP client for Flow 1.
2. The MCP client should securely store this refresh token.
3. The client should use the refresh token to get new `mcp-server` access tokens directly from the IdP, without involving the MCP server or the user. The MCP server should not be involved in the client's session management with the IdP.
### Summary
The project is on the right track. The ADR is a solid plan, and the initial implementation is a good starting point.
My recommendations in order of priority are:
1. **Implement Audience Validation** to close the security gap.
2. **Add Integration Tests** for the Progressive Consent flow.
3. **Refactor the client-side token refresh** to improve user experience.
4. **Consolidate the URL generation** logic to fix the inconsistency.
Addressing these points will align the implementation with the excellent vision in ADR-004 and result in a secure, robust, and user-friendly system.
@@ -14,6 +14,11 @@ import jwt
from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse, RedirectResponse
from nextcloud_mcp_server.auth.userinfo_routes import (
_get_userinfo_endpoint,
_query_idp_userinfo,
)
logger = logging.getLogger(__name__)
@@ -307,7 +312,7 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo
user_id = f"user-{secrets.token_hex(8)}"
username = "unknown"
# Store refresh token
# Store refresh token (for background jobs ONLY)
if refresh_token:
logger.info(f"Storing refresh token for user_id: {user_id}")
await storage.store_refresh_token(
@@ -320,6 +325,32 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo
else:
logger.warning("No refresh token in token response - cannot store session")
# Query and cache user profile (for browser UI display)
access_token = token_data.get("access_token")
if access_token:
try:
# Get the OAuth context to determine correct userinfo endpoint
oauth_ctx = getattr(request.app.state, "oauth_context", {})
userinfo_endpoint = await _get_userinfo_endpoint(oauth_ctx)
if userinfo_endpoint:
# Query userinfo endpoint with fresh access token
profile_data = await _query_idp_userinfo(
access_token, userinfo_endpoint
)
if profile_data:
# Cache profile for browser UI (no token needed to display)
await storage.store_user_profile(user_id, profile_data)
logger.info(f"✓ User profile cached for {user_id}")
else:
logger.warning(f"Failed to query userinfo endpoint for {user_id}")
else:
logger.warning("Could not determine userinfo endpoint")
except Exception as e:
logger.error(f"Error caching user profile: {e}")
# Continue anyway - profile cache is optional for browser UI
# Create response and set session cookie
response = RedirectResponse("/user/page", status_code=302)
response.set_cookie(
@@ -1,7 +1,22 @@
"""
Refresh Token Storage for ADR-002 Tier 1: Offline Access
Securely stores and manages user refresh tokens for background operations.
Manages two separate concerns for OAuth authentication:
1. **Refresh Tokens** (for background jobs ONLY)
- Securely stores encrypted refresh tokens for offline access
- Used ONLY by background jobs to obtain access tokens
- NEVER used within MCP client sessions or browser sessions
2. **User Profile Cache** (for browser UI display ONLY)
- Caches IdP user profile data for browser-based admin UI
- Queried ONCE at login, displayed from cache thereafter
- NOT used for authorization decisions or background jobs
IMPORTANT: These are separate concerns. Browser sessions read profile cache for
display purposes. Background jobs use refresh tokens for API access. Never mix
the two.
Tokens are encrypted at rest using Fernet symmetric encryption.
"""
@@ -19,7 +34,14 @@ logger = logging.getLogger(__name__)
class RefreshTokenStorage:
"""Securely store and manage user refresh tokens"""
"""Securely store and manage user refresh tokens and profile cache.
This class manages two separate concerns:
- Refresh tokens: Encrypted storage for background job access (write-only by OAuth, read-only by background jobs)
- User profiles: Plain JSON cache for browser UI display (written at login, read by UI)
These concerns are architecturally separate and should never be mixed.
"""
def __init__(self, db_path: str, encryption_key: bytes):
"""
@@ -104,7 +126,10 @@ class RefreshTokenStorage:
token_audience TEXT DEFAULT 'nextcloud', -- 'mcp-server' or 'nextcloud'
provisioned_at INTEGER, -- When Flow 2 was completed
provisioning_client_id TEXT, -- Which MCP client initiated Flow 1
scopes TEXT -- JSON array of granted scopes
scopes TEXT, -- JSON array of granted scopes
-- Browser session profile cache
user_profile TEXT, -- JSON cache of IdP user profile (for browser UI only)
profile_cached_at INTEGER -- When profile was last cached
)
"""
)
@@ -257,6 +282,76 @@ class RefreshTokenStorage:
auth_method="offline_access",
)
async def store_user_profile(
self, user_id: str, profile_data: dict[str, any]
) -> None:
"""
Store user profile data (cached from IdP userinfo endpoint).
This profile is cached ONLY for browser UI display purposes, not for
authorization decisions. Background jobs should NOT rely on this data.
Args:
user_id: User identifier (must match refresh_tokens.user_id)
profile_data: User profile dict from IdP userinfo endpoint
"""
if not self._initialized:
await self.initialize()
profile_json = json.dumps(profile_data)
now = int(time.time())
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
UPDATE refresh_tokens
SET user_profile = ?, profile_cached_at = ?
WHERE user_id = ?
""",
(profile_json, now, user_id),
)
await db.commit()
logger.debug(f"Cached user profile for {user_id}")
async def get_user_profile(self, user_id: str) -> Optional[dict[str, any]]:
"""
Retrieve cached user profile data.
This returns cached profile data from the initial OAuth login,
NOT fresh data from the IdP. Use this for browser UI display only.
Args:
user_id: User identifier
Returns:
User profile dict or None if not cached
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"""
SELECT user_profile, profile_cached_at
FROM refresh_tokens
WHERE user_id = ?
""",
(user_id,),
) as cursor:
row = await cursor.fetchone()
if not row or not row[0]:
return None
profile_json, cached_at = row
profile_data = json.loads(profile_json)
# Optionally add cache metadata
profile_data["_cached_at"] = cached_at
return profile_data
async def get_refresh_token(self, user_id: str) -> Optional[dict]:
"""
Retrieve and decrypt refresh token for user.
+25 -110
View File
@@ -103,11 +103,19 @@ async def _query_idp_userinfo(
async def _get_user_info(request: Request) -> dict[str, Any]:
"""Get user information for the currently authenticated user.
IMPORTANT: This function reads from cached profile data stored at login time.
It does NOT perform token refresh or query the IdP on every request. The
profile was cached once during oauth_login_callback and is displayed from
storage thereafter.
This is for BROWSER UI DISPLAY ONLY. Do not use this for authorization
decisions or background job authentication.
Args:
request: Starlette request object (must be authenticated)
Returns:
Dictionary containing user information
Dictionary containing user information from cache
"""
username = request.user.display_name
oauth_ctx = getattr(request.app.state, "oauth_context", None)
@@ -120,7 +128,7 @@ async def _get_user_info(request: Request) -> dict[str, Any]:
"nextcloud_host": os.getenv("NEXTCLOUD_HOST", "unknown"),
}
# OAuth mode - get user's refresh token and current access token
# OAuth mode - read cached profile from browser session
storage = oauth_ctx.get("storage")
session_id = request.cookies.get("mcp_session")
@@ -132,123 +140,30 @@ async def _get_user_info(request: Request) -> dict[str, Any]:
}
try:
# Get refresh token data
# Check if background access was granted (refresh token exists)
token_data = await storage.get_refresh_token(session_id)
if not token_data:
return {
"error": "No refresh token found",
"username": username,
"auth_mode": "oauth",
}
background_access_granted = token_data is not None
refresh_token = token_data.get("refresh_token")
# Retrieve cached user profile (no token operations!)
profile_data = await storage.get_user_profile(session_id)
# Exchange refresh token for fresh access token
oauth_client = oauth_ctx.get("oauth_client")
oauth_config = oauth_ctx.get("config")
if oauth_client:
# External IdP mode (Keycloak)
# Create fresh HTTP client to avoid event loop issues
if not oauth_client.token_endpoint:
await oauth_client.discover()
async with httpx.AsyncClient(timeout=30.0) as http_client:
response = await http_client.post(
oauth_client.token_endpoint,
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
},
auth=(oauth_client.client_id, oauth_client.client_secret),
)
response.raise_for_status()
token_response = response.json()
access_token = token_response["access_token"]
# Update stored refresh token if a new one was issued (token rotation)
new_refresh_token = token_response.get("refresh_token")
if new_refresh_token and new_refresh_token != refresh_token:
logger.info(
f"Refresh token rotated, updating storage for session: {session_id[:16]}..."
)
await storage.store_refresh_token(
user_id=session_id,
refresh_token=new_refresh_token,
)
else:
# Integrated mode (Nextcloud OIDC)
# Note: This is server-side code, so we use internal Docker hostnames
# (not public URLs) for server-to-server communication
discovery_url = oauth_config.get("discovery_url")
logger.info(f"Querying discovery URL: {discovery_url}")
async with httpx.AsyncClient() as http_client:
response = await http_client.get(discovery_url)
response.raise_for_status()
discovery = response.json()
token_endpoint = discovery["token_endpoint"]
logger.info(
f"Using token endpoint for server-side refresh: {token_endpoint}"
)
async with httpx.AsyncClient() as http_client:
response = await http_client.post(
token_endpoint,
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": oauth_config["client_id"],
"client_secret": oauth_config["client_secret"],
},
)
if response.status_code != 200:
error_body = response.text
logger.error(
f"Token refresh failed: HTTP {response.status_code}\n"
f"Request data: grant_type=refresh_token, "
f"refresh_token={refresh_token[:20] if refresh_token else 'None'}..., "
f"client_id={oauth_config.get('client_id')}\n"
f"Response: {error_body}"
)
response.raise_for_status()
token_response = response.json()
access_token = token_response["access_token"]
# Update stored refresh token if a new one was issued (token rotation)
new_refresh_token = token_response.get("refresh_token")
if new_refresh_token and new_refresh_token != refresh_token:
logger.info(
f"Refresh token rotated, updating storage for session: {session_id[:16]}..."
)
await storage.store_refresh_token(
user_id=session_id,
refresh_token=new_refresh_token,
)
# Build basic user context
# Build user context
user_context = {
"username": username, # From request.user.display_name
"username": username, # From request.user.display_name (session_id)
"auth_mode": "oauth",
"session_id": session_id[:16] + "...", # Truncated for security
"background_access_granted": background_access_granted,
}
# Query IdP userinfo for enhanced profile
# Get the correct userinfo endpoint based on OAuth mode (Keycloak vs Nextcloud)
userinfo_endpoint = await _get_userinfo_endpoint(oauth_ctx)
if userinfo_endpoint:
idp_profile = await _query_idp_userinfo(access_token, userinfo_endpoint)
if idp_profile:
user_context["idp_profile"] = idp_profile
else:
user_context["idp_profile_error"] = (
"Failed to retrieve profile from IdP"
)
# Include cached profile if available
if profile_data:
user_context["idp_profile"] = profile_data
logger.debug(f"Loaded cached profile for {session_id[:16]}...")
else:
logger.warning("Could not determine userinfo endpoint")
user_context["idp_profile_error"] = "Userinfo endpoint not available"
logger.warning(f"No cached profile found for {session_id[:16]}...")
user_context["idp_profile_error"] = (
"Profile not cached. Try logging out and back in."
)
return user_context
@@ -17,7 +17,6 @@ from nextcloud_mcp_server.auth.userinfo_routes import _query_idp_userinfo
pytestmark = pytest.mark.unit
@pytest.mark.asyncio
async def test_query_idp_userinfo_success(mocker):
"""Test successful IdP userinfo query."""
mock_response = Mock()
@@ -52,7 +51,6 @@ async def test_query_idp_userinfo_success(mocker):
)
@pytest.mark.asyncio
async def test_query_idp_userinfo_failure(mocker):
"""Test IdP userinfo query failure handling."""
mock_client = AsyncMock()
-13
View File
@@ -97,7 +97,6 @@ def create_test_jwt(
class TestTokenExchange:
"""Test RFC 8693 token exchange implementation."""
@pytest.mark.asyncio
async def test_validate_flow1_token_success(self, token_exchange_service):
"""Test validation of Flow 1 token with correct audience."""
# Create token with correct audience
@@ -106,7 +105,6 @@ class TestTokenExchange:
# Should not raise an exception
await token_exchange_service._validate_flow1_token(flow1_token)
@pytest.mark.asyncio
async def test_validate_flow1_token_wrong_audience(self, token_exchange_service):
"""Test validation fails with wrong audience."""
# Create token with wrong audience
@@ -115,7 +113,6 @@ class TestTokenExchange:
with pytest.raises(ValueError, match="Invalid token audience"):
await token_exchange_service._validate_flow1_token(flow1_token)
@pytest.mark.asyncio
async def test_validate_flow1_token_expired(self, token_exchange_service):
"""Test validation fails with expired token."""
# Create expired token
@@ -124,7 +121,6 @@ class TestTokenExchange:
with pytest.raises(ValueError, match="Token has expired"):
await token_exchange_service._validate_flow1_token(flow1_token)
@pytest.mark.asyncio
async def test_extract_user_id(self, token_exchange_service):
"""Test extraction of user ID from token."""
flow1_token = create_test_jwt(user_id="alice")
@@ -132,13 +128,11 @@ class TestTokenExchange:
user_id = token_exchange_service._extract_user_id(flow1_token)
assert user_id == "alice"
@pytest.mark.asyncio
async def test_check_provisioning_not_provisioned(self, token_exchange_service):
"""Test provisioning check when user not provisioned."""
result = await token_exchange_service._check_provisioning("unknown_user")
assert result is False
@pytest.mark.asyncio
async def test_check_provisioning_is_provisioned(
self, token_exchange_service, token_storage
):
@@ -151,7 +145,6 @@ class TestTokenExchange:
result = await token_exchange_service._check_provisioning("alice")
assert result is True
@pytest.mark.asyncio
async def test_exchange_token_not_provisioned(self, token_exchange_service):
"""Test token exchange fails when user not provisioned."""
flow1_token = create_test_jwt(user_id="unprovisioneduser")
@@ -163,7 +156,6 @@ class TestTokenExchange:
requested_audience="nextcloud",
)
@pytest.mark.asyncio
async def test_exchange_token_with_fallback(
self, token_exchange_service, token_storage
):
@@ -211,7 +203,6 @@ class TestTokenExchange:
class TestTokenBroker:
"""Test Token Broker session/background separation."""
@pytest.mark.asyncio
async def test_get_session_token(self, token_broker, token_storage):
"""Test getting ephemeral session token via exchange."""
# Store refresh token for user
@@ -239,7 +230,6 @@ class TestTokenBroker:
cached = await token_broker.cache.get("alice")
assert cached is None # Should not be in cache
@pytest.mark.asyncio
async def test_get_background_token(self, token_broker, token_storage):
"""Test getting background token with stored refresh."""
# Store encrypted refresh token for user
@@ -284,7 +274,6 @@ class TestTokenBroker:
cached = await token_broker.cache.get(cache_key)
assert cached == "background_token_abc"
@pytest.mark.asyncio
async def test_session_background_separation(self, token_broker, token_storage):
"""Test that session and background tokens are kept separate."""
# Store refresh token
@@ -350,7 +339,6 @@ class TestTokenBroker:
class TestScopeDownscoping:
"""Test that tokens request only necessary scopes."""
@pytest.mark.asyncio
async def test_session_token_minimal_scopes(
self, token_exchange_service, token_storage
):
@@ -396,7 +384,6 @@ class TestScopeDownscoping:
assert "notes:write" not in requested_scopes
assert "calendar:write" not in requested_scopes
@pytest.mark.asyncio
async def test_background_token_different_scopes(self, token_broker, token_storage):
"""Test background tokens can request different scopes than session."""
from cryptography.fernet import Fernet