fix: remove Hybrid Flow, make Progressive Consent default (ADR-004)

Eliminates scope escalation security vulnerability by removing Hybrid Flow
and making Progressive Consent the only OAuth mode.

Changes:
- Delete oauth_callback() and oauth_token() (Hybrid Flow only, ~314 lines)
- Fix scope flows: Flow 1 requests resource scopes, Flow 2 requests identity+offline
- Remove ENABLE_PROGRESSIVE_CONSENT flag (always enabled in OAuth mode)
- Update documentation to reflect Progressive Consent as default
- Delete test_adr004_hybrid_flow.py test file
- Remove unused variables (ruff lint fixes)

Security improvements:
- No scope escalation: client gets exactly what it requests
- Clear separation: MCP session tokens vs Nextcloud offline tokens
- OAuth2 compliant: follows best practices for scope handling

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-04 00:26:07 +01:00
parent d14f2f666d
commit 15113dbb03
8 changed files with 125 additions and 940 deletions
+20 -16
View File
@@ -165,32 +165,36 @@ docker compose exec db mariadb -u root -ppassword nextcloud -e \
3. MCP tools use context pattern: `get_client(ctx)` → `NextcloudClient`
4. All operations are async using httpx
### Progressive Consent Mode (ADR-004)
### Progressive Consent Architecture (ADR-004)
**Status**: Opt-in feature (disabled by default)
**Enable**: Set `ENABLE_PROGRESSIVE_CONSENT=true`
**Default**: Hybrid Flow (backward compatible, single OAuth flow)
**Status**: Always enabled in OAuth mode (default)
**What is Progressive Consent?**
- Dual OAuth flow architecture that separates client authentication (Flow 1) from resource provisioning (Flow 2)
- Flow 1: MCP client authenticates directly to IdP (aud: "mcp-server")
- Flow 2: User explicitly provisions Nextcloud access via separate login (not during MCP session)
- Provides clear separation between session tokens and background job tokens
- Flow 1: MCP client authenticates directly to IdP with resource scopes (notes:*, calendar:*, etc.)
- Token audience: "mcp-server"
- Client receives resource-scoped token for MCP session
- Flow 2: Server explicitly provisions Nextcloud access via separate login
- Server requests: openid, profile, email, offline_access
- Token audience: "nextcloud"
- Server receives refresh token for offline access
- Client never sees this token
- Provides clear separation between session tokens and offline access tokens
**When to use:**
**When to use OAuth mode:**
- Multi-user deployments
- Background jobs requiring offline access
- Enhanced security with separate authorization contexts
- Explicit user control over resource access
**When NOT to use:**
- Simple single-user deployments (use BasicAuth)
- Standard OAuth without background jobs (use default Hybrid Flow)
**When to use BasicAuth instead:**
- Simple single-user deployments
- Local development and testing
**Key difference from Hybrid Flow:**
- Hybrid Flow: Server intercepts OAuth callback, stores refresh token automatically
- Progressive Consent: User explicitly authorizes via `provision_nextcloud_access` tool
**Key features:**
- No scope escalation - client gets exactly what it requests
- User explicitly authorizes via `provision_nextcloud_access` tool
- Clear security boundaries between MCP session and Nextcloud access
## MCP Response Patterns (CRITICAL)
-1
View File
@@ -105,7 +105,6 @@ services:
# ADR-004: Use Hybrid Flow (server intercepts OAuth callback)
# Set to false to enable Hybrid Flow tests - server stores refresh token and issues MCP codes
- ENABLE_PROGRESSIVE_CONSENT=false
# NO admin credentials - using OAuth with Dynamic Client Registration (DCR)
# Client credentials registered via RFC 7591 and stored in volume
-1
View File
@@ -25,7 +25,6 @@ NEXTCLOUD_MCP_SERVER_URL=http://localhost:8000
# Enable Progressive Consent mode (dual OAuth flows)
# When enabled: Flow 1 for client auth, Flow 2 for Nextcloud resource access
# When disabled: Uses existing hybrid flow (backward compatible)
#ENABLE_PROGRESSIVE_CONSENT=false
# MCP Server OAuth Client Configuration
# The MCP server's own OAuth client credentials for Flow 2
+29 -80
View File
@@ -22,7 +22,6 @@ from starlette.routing import Mount, Route
from nextcloud_mcp_server.auth import (
InsufficientScopeError,
NextcloudTokenVerifier,
discover_all_scopes,
get_access_token_scopes,
has_required_scopes,
@@ -547,91 +546,45 @@ async def setup_oauth_config():
logger.info(
f"Using public issuer URL override for JWT validation: {public_issuer}"
)
jwt_validation_issuer = public_issuer
client_issuer = public_issuer
else:
jwt_validation_issuer = issuer
client_issuer = issuer
# Check if Progressive Consent mode is enabled (opt-in, defaults to false)
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
)
# Progressive Consent mode (always enabled) - dual OAuth flows with audience separation
logger.info("✓ Progressive Consent mode enabled - dual OAuth flows active")
# Create token verifier
if enable_progressive:
# Progressive Consent mode: Use specialized verifier with audience separation
logger.info("✓ Progressive Consent mode enabled - dual OAuth flows active")
# Get encryption key for token broker
encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
if not encryption_key:
logger.warning(
"TOKEN_ENCRYPTION_KEY not set - token broker will not be available"
)
# Get encryption key for token broker
encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
if not encryption_key:
logger.warning(
"TOKEN_ENCRYPTION_KEY not set - token broker will not be available"
)
# Create token broker service
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
# Create token broker service
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
token_broker = None
if encryption_key and refresh_token_storage:
token_broker = TokenBrokerService(
storage=refresh_token_storage,
oidc_discovery_url=discovery_url,
nextcloud_host=nextcloud_host,
encryption_key=encryption_key,
)
logger.info(
"✓ Token Broker service initialized for audience-specific tokens"
)
# Create Progressive Consent token verifier
token_verifier = ProgressiveConsentTokenVerifier(
token_storage=refresh_token_storage,
token_broker=token_broker,
token_broker = None
if encryption_key and refresh_token_storage:
token_broker = TokenBrokerService(
storage=refresh_token_storage,
oidc_discovery_url=discovery_url,
nextcloud_host=nextcloud_host,
encryption_key=encryption_key,
)
logger.info("✓ Token Broker service initialized for audience-specific tokens")
logger.info(
"✓ Progressive Consent verifier configured - enforcing audience separation"
)
# Create Progressive Consent token verifier
token_verifier = ProgressiveConsentTokenVerifier(
token_storage=refresh_token_storage,
token_broker=token_broker,
oidc_discovery_url=discovery_url,
nextcloud_host=nextcloud_host,
encryption_key=encryption_key,
)
elif is_external_idp:
# External IdP mode: Validate via Nextcloud user_oidc app
# The user_oidc app accepts tokens from the external IdP and provisions users
nextcloud_userinfo_uri = f"{nextcloud_host}/apps/user_oidc/userinfo"
token_verifier = NextcloudTokenVerifier(
nextcloud_host=nextcloud_host,
userinfo_uri=nextcloud_userinfo_uri, # Nextcloud validates external tokens
jwks_uri=jwks_uri, # External IdP's JWKS for JWT validation
issuer=jwt_validation_issuer, # External IdP issuer
introspection_uri=None, # External IdP introspection not used
client_id=client_id,
client_secret=client_secret,
)
logger.info(
"✓ External IdP mode configured - tokens validated via Nextcloud user_oidc app"
)
else:
# Integrated mode: Nextcloud provides both OAuth and validation
token_verifier = NextcloudTokenVerifier(
nextcloud_host=nextcloud_host,
userinfo_uri=userinfo_uri, # Nextcloud userinfo endpoint
jwks_uri=jwks_uri, # Nextcloud JWKS for JWT validation
issuer=jwt_validation_issuer, # Nextcloud issuer (or public override)
introspection_uri=introspection_uri, # Nextcloud introspection for opaque tokens
client_id=client_id,
client_secret=client_secret,
)
logger.info(
"✓ Integrated mode configured - Nextcloud provides OAuth and validation"
)
logger.info(
"✓ Progressive Consent verifier configured - enforcing audience separation"
)
# Create OAuth client for server-initiated flows (e.g., token exchange, background workers)
oauth_client = None
@@ -800,14 +753,10 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
f"Unknown app: {app_name}. Available apps: {list(available_apps.keys())}"
)
# Register OAuth provisioning tools if in OAuth mode with Progressive Consent
# Register OAuth provisioning tools (Progressive Consent always enabled in OAuth mode)
if oauth_enabled:
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
)
if enable_progressive:
logger.info("Registering OAuth provisioning tools for Progressive Consent")
register_oauth_tools(mcp)
logger.info("Registering OAuth provisioning tools for Progressive Consent")
register_oauth_tools(mcp)
# Override list_tools to filter based on user's token scopes (OAuth mode only)
if oauth_enabled:
+74 -463
View File
@@ -1,29 +1,27 @@
"""
OAuth 2.0 Login Routes for ADR-004 Progressive Consent Architecture
Implements OAuth endpoints that support both:
1. Hybrid Flow (default, backward compatible) - Single OAuth flow with server interception
2. Progressive Consent (opt-in via ENABLE_PROGRESSIVE_CONSENT=true) - Dual OAuth flows with explicit provisioning
Implements dual OAuth flows with explicit provisioning:
Progressive Consent Mode (opt-in, requires separate login):
- Enable with ENABLE_PROGRESSIVE_CONSENT=true
- Flow 1: Client Authentication - MCP client authenticates directly to IdP
- Flow 2: Resource Provisioning - MCP server gets delegated Nextcloud access (separate login, not during MCP session)
Flow 1: Client Authentication - MCP client authenticates directly to IdP
- Client requests: Nextcloud MCP resource scopes (notes:*, calendar:*, etc.)
- Token audience (aud): "mcp-server"
- No server interception - IdP redirects directly to client
- Client receives resource-scoped token for MCP session
Flow 2: Resource Provisioning - MCP server gets delegated Nextcloud access
- Triggered by user calling provision_nextcloud_access tool
- Server requests: openid, profile, email scopes, offline_access
- Separate login flow outside MCP session, results in browser login for user
- Token audience (aud): "nextcloud", redirect/callback to mcp server
- Server receives refresh token for offline access
- Client never sees this token
Hybrid Flow Mode (default, backward compatible):
1. MCP client initiates OAuth at /oauth/authorize
2. MCP server redirects to IdP (intercepts callback)
3. IdP redirects back to /oauth/callback (server gets master tokens)
4. Server generates MCP auth code and redirects to client
5. Client exchanges MCP code at /oauth/token using PKCE
"""
import hashlib
import logging
import os
import secrets
from urllib.parse import urlencode
from uuid import uuid4
import httpx
import jwt
@@ -38,23 +36,17 @@ logger = logging.getLogger(__name__)
async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
"""
OAuth authorization endpoint with PKCE support.
OAuth authorization endpoint for Flow 1: Client Authentication.
Supports both Hybrid Flow (default) and Progressive Consent Flow 1 (opt-in).
In Progressive Consent mode (opt-in, ENABLE_PROGRESSIVE_CONSENT=true):
- Flow 1: Client authenticates directly to IdP with its own client_id
- Server validates client_id is in ALLOWED_MCP_CLIENTS list
- Issues tokens with aud: "mcp-server" for MCP authentication only
In Hybrid Flow mode (default):
- Single OAuth flow where server intercepts and stores refresh token
The client authenticates directly to the IdP with its own client_id.
The server validates the client is authorized but does NOT intercept the callback.
IdP redirects directly back to the client's redirect_uri.
Query parameters:
response_type: Must be "code"
client_id: MCP client identifier (required in Progressive mode)
client_id: MCP client identifier (required)
redirect_uri: Client's localhost redirect URI (required)
scope: Requested scopes (optional)
scope: Requested scopes (optional, defaults to "openid profile email")
state: CSRF protection state (required)
code_challenge: PKCE code challenge from client (required)
code_challenge_method: PKCE method, must be "S256" (required)
@@ -62,11 +54,6 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
Returns:
302 redirect to IdP authorization endpoint
"""
# Check if Progressive Consent is enabled (opt-in, defaults to false)
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
)
# Extract parameters
response_type = request.query_params.get("response_type")
client_id = request.query_params.get("client_id")
@@ -131,36 +118,35 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
status_code=400,
)
# In Progressive Consent mode, validate client_id using registry
if enable_progressive:
if not client_id:
return JSONResponse(
{
"error": "invalid_request",
"error_description": "client_id is required in Progressive Consent mode",
},
status_code=400,
)
# Validate client using registry
registry = get_client_registry()
is_valid, error_msg = registry.validate_client(
client_id=client_id,
redirect_uri=redirect_uri,
scopes=request.query_params.get("scope", "").split()
if request.query_params.get("scope")
else None,
# Validate client_id (required for Progressive Consent Flow 1)
if not client_id:
return JSONResponse(
{
"error": "invalid_request",
"error_description": "client_id is required",
},
status_code=400,
)
if not is_valid:
logger.warning(f"Client validation failed: {error_msg}")
return JSONResponse(
{
"error": "unauthorized_client",
"error_description": error_msg,
},
status_code=401,
)
# Validate client using registry
registry = get_client_registry()
is_valid, error_msg = registry.validate_client(
client_id=client_id,
redirect_uri=redirect_uri,
scopes=request.query_params.get("scope", "").split()
if request.query_params.get("scope")
else None,
)
if not is_valid:
logger.warning(f"Client validation failed: {error_msg}")
return JSONResponse(
{
"error": "unauthorized_client",
"error_description": error_msg,
},
status_code=401,
)
# Get OAuth context from app state
oauth_ctx = request.app.state.oauth_context
@@ -173,78 +159,39 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
status_code=500,
)
storage: RefreshTokenStorage = oauth_ctx["storage"]
oauth_client = oauth_ctx["oauth_client"]
oauth_config = oauth_ctx["config"]
# Build IdP authorization URL
mcp_server_url = oauth_config["mcp_server_url"]
# Flow 1: Client authenticates directly to IdP WITHOUT server interception
# CRITICAL: This is a direct pass-through to IdP
# The IdP will redirect directly back to the client's callback
# The MCP server does NOT see the IdP authorization code!
if enable_progressive:
# Flow 1: Client authenticates directly to IdP WITHOUT server interception
# CRITICAL: This is a direct pass-through to IdP
# The IdP will redirect directly back to the client's callback
# The MCP server does NOT see the IdP authorization code!
logger.info(
f"Starting Progressive Consent Flow 1 - no server session needed, "
f"client will handle IdP response directly at {redirect_uri}"
)
logger.info(
f"Starting Progressive Consent Flow 1 - no server session needed, "
f"client will handle IdP response directly at {redirect_uri}"
)
# Use client's redirect_uri for DIRECT callback (bypasses server)
callback_uri = redirect_uri
# Use client's redirect_uri for DIRECT callback (bypasses server)
callback_uri = redirect_uri
# Request resource scopes for MCP tools access
# The token will have aud: "mcp-server" claim
# Build scopes from NEXTCLOUD_OIDC_SCOPES config
default_scopes = "openid profile email"
resource_scopes = oauth_config.get("scopes", "")
scopes = f"{default_scopes} {resource_scopes}".strip()
# Only request MCP authentication scopes (no Nextcloud scopes!)
# The token will have aud: "mcp-server" claim
scopes = "openid profile email"
# Pass through client's state directly
idp_state = state
# Pass through client's state directly
idp_state = state
# Use client's own client_id (client must be pre-registered at IdP)
idp_client_id = client_id
# Use client's own client_id (client must be pre-registered at IdP)
idp_client_id = client_id
logger.info("Flow 1 (Progressive Consent): Direct client auth to IdP")
logger.info(f" Client ID: {client_id}")
logger.info(f" Client will receive IdP code directly at: {callback_uri}")
logger.info(f" Scopes: {scopes} (no resource access)")
else:
# Hybrid Flow: Server intercepts callback (backward compatible)
# Generate session ID and MCP authorization code for Hybrid Flow
session_id = str(uuid4())
mcp_authorization_code = f"mcp-code-{secrets.token_urlsafe(32)}"
logger.info(
f"Starting Hybrid OAuth flow - session={session_id[:8]}..., "
f"client_redirect={redirect_uri}"
)
# Store session with client details and PKCE challenge
await storage.store_oauth_session(
session_id=session_id,
client_id=client_id,
client_redirect_uri=redirect_uri,
state=state,
code_challenge=code_challenge,
code_challenge_method=code_challenge_method,
mcp_authorization_code=mcp_authorization_code,
flow_type="hybrid",
ttl_seconds=600, # 10 minutes
)
callback_uri = f"{mcp_server_url}/oauth/callback"
# Combine session_id and client state for IdP state parameter
idp_state = f"{session_id}:{state}"
# Build scopes - include both identity scopes and Nextcloud scopes
default_scopes = "openid profile email offline_access"
nextcloud_scopes = oauth_config.get("scopes", "")
scopes = f"{default_scopes} {nextcloud_scopes}".strip()
# Use server's client_id
idp_client_id = oauth_config["client_id"]
logger.info("Hybrid Flow: Server intercepts callback")
logger.info(f" Server callback: {callback_uri}")
logger.info(f" Combined scopes: {scopes}")
logger.info("Flow 1 (Progressive Consent): Direct client auth to IdP")
logger.info(f" Client ID: {client_id}")
logger.info(f" Client will receive IdP code directly at: {callback_uri}")
logger.info(f" Scopes: {scopes} (resource access for MCP tools)")
# Get authorization endpoint from OAuth client
if oauth_client:
@@ -313,322 +260,6 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
return RedirectResponse(auth_url, status_code=302)
async def oauth_callback(request: Request) -> RedirectResponse | JSONResponse:
"""
OAuth callback endpoint - IdP redirects here after user authentication.
This is the CRITICAL difference in the Hybrid Flow:
- The server receives the IdP authorization code
- Server exchanges it for master tokens (including refresh token)
- Server stores the refresh token securely
- Server generates MCP authorization code
- Server redirects client with MCP code (not IdP code!)
Query parameters:
code: Authorization code from IdP
state: State parameter (contains session_id:client_state)
error: Error code (if authorization failed)
error_description: Error description
Returns:
302 redirect to client's redirect_uri with MCP authorization code
"""
# Check for errors from IdP
error = request.query_params.get("error")
if error:
error_description = request.query_params.get(
"error_description", "Authorization failed"
)
logger.error(f"IdP authorization error: {error} - {error_description}")
return JSONResponse(
{
"error": error,
"error_description": error_description,
},
status_code=400,
)
# Extract IdP authorization code and state
idp_code = request.query_params.get("code")
idp_state = request.query_params.get("state")
if not idp_code or not idp_state:
return JSONResponse(
{
"error": "invalid_request",
"error_description": "code and state parameters are required",
},
status_code=400,
)
# Parse state to extract session_id and client_state
try:
session_id, client_state = idp_state.split(":", 1)
except ValueError:
return JSONResponse(
{"error": "invalid_state", "error_description": "Invalid state format"},
status_code=400,
)
# Get OAuth context
oauth_ctx = request.app.state.oauth_context
storage: RefreshTokenStorage = oauth_ctx["storage"]
oauth_client = oauth_ctx["oauth_client"]
oauth_config = oauth_ctx["config"]
# Retrieve OAuth session
oauth_session = await storage.get_oauth_session(session_id)
if not oauth_session:
return JSONResponse(
{
"error": "invalid_session",
"error_description": "Session not found or expired",
},
status_code=400,
)
logger.info(
f"Processing OAuth callback - session={session_id[:8]}..., "
f"exchanging IdP code for tokens"
)
# STEP 1: Exchange IdP code for master tokens
# The server gets the master refresh token!
mcp_server_url = oauth_config["mcp_server_url"]
server_callback_uri = f"{mcp_server_url}/oauth/callback"
try:
if oauth_client:
# External IdP mode (Keycloak)
# Note: This requires code_verifier, but server doesn't use PKCE with IdP
# We'll need to modify KeycloakOAuthClient to support this pattern
token_data = await oauth_client.exchange_authorization_code(
code=idp_code,
code_verifier="", # Server doesn't use PKCE with IdP
)
else:
# Integrated mode (Nextcloud OIDC)
discovery_url = oauth_config.get("discovery_url")
async with httpx.AsyncClient() as http_client:
response = await http_client.get(discovery_url)
response.raise_for_status()
discovery = response.json()
token_endpoint = discovery["token_endpoint"]
# Exchange code for tokens
async with httpx.AsyncClient() as http_client:
response = await http_client.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"code": idp_code,
"redirect_uri": server_callback_uri,
"client_id": oauth_config["client_id"],
"client_secret": oauth_config["client_secret"],
},
)
response.raise_for_status()
token_data = response.json()
except Exception as e:
logger.error(f"Token exchange failed: {e}")
return JSONResponse(
{
"error": "server_error",
"error_description": f"Failed to exchange authorization code: {e}",
},
status_code=500,
)
access_token = token_data["access_token"]
refresh_token = token_data.get("refresh_token")
id_token = token_data.get("id_token")
# Decode ID token to get user info (without verification - just for userinfo)
try:
userinfo = jwt.decode(id_token, options={"verify_signature": False})
user_id = userinfo.get("sub")
username = userinfo.get("preferred_username") or userinfo.get("email")
logger.info(f"User authenticated: {username} (sub={user_id})")
except Exception as e:
logger.warning(f"Failed to decode ID token: {e}")
user_id = "unknown"
username = "unknown"
# STEP 2: Store master refresh token (if provided)
if refresh_token:
await storage.store_refresh_token(
user_id=user_id,
refresh_token=refresh_token,
expires_at=None, # Refresh tokens typically don't have expiration
)
logger.info(f"Stored master refresh token for user {user_id}")
# STEP 3: Update session with tokens
await storage.update_oauth_session(
session_id=session_id,
user_id=user_id,
idp_access_token=access_token,
idp_refresh_token=refresh_token,
)
# STEP 4: Redirect to native client with MCP-generated code
mcp_code = oauth_session["mcp_authorization_code"]
client_redirect_uri = oauth_session["client_redirect_uri"]
redirect_params = {
"code": mcp_code, # MCP code, NOT IdP code!
"state": client_state, # Return original client state
}
redirect_url = f"{client_redirect_uri}?{urlencode(redirect_params)}"
logger.info(
f"OAuth callback complete - redirecting to client with MCP code: {mcp_code[:16]}..."
)
return RedirectResponse(redirect_url, status_code=302)
async def oauth_token(request: Request) -> JSONResponse:
"""
OAuth token endpoint - client exchanges MCP code for tokens.
The client sends the MCP-generated code (not IdP code) and proves
ownership via PKCE code_verifier.
Form parameters:
grant_type: Must be "authorization_code" or "refresh_token"
code: MCP authorization code (for authorization_code grant)
code_verifier: PKCE code verifier (for authorization_code grant)
redirect_uri: Must match the redirect_uri from /oauth/authorize
client_id: MCP client identifier (optional)
refresh_token: Refresh token (for refresh_token grant)
Returns:
JSON response with access_token and optional refresh_token
"""
# Parse form data
form = await request.form()
grant_type = form.get("grant_type")
if grant_type == "authorization_code":
# Authorization code grant
code = form.get("code")
code_verifier = form.get("code_verifier")
redirect_uri = form.get("redirect_uri")
if not code or not code_verifier or not redirect_uri:
return JSONResponse(
{
"error": "invalid_request",
"error_description": "code, code_verifier, and redirect_uri are required",
},
status_code=400,
)
# Get OAuth context
oauth_ctx = request.app.state.oauth_context
storage: RefreshTokenStorage = oauth_ctx["storage"]
# Retrieve session by MCP authorization code
oauth_session = await storage.get_oauth_session_by_mcp_code(code)
if not oauth_session:
return JSONResponse(
{
"error": "invalid_grant",
"error_description": "Invalid authorization code",
},
status_code=400,
)
# Verify PKCE
code_challenge = oauth_session.get("code_challenge")
if code_challenge:
# Compute challenge from verifier
computed_challenge = hashlib.sha256(code_verifier.encode()).digest().hex()
# Convert to base64url format
import base64
computed_challenge = (
base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
)
.decode()
.rstrip("=")
)
if computed_challenge != code_challenge:
logger.error("PKCE verification failed")
return JSONResponse(
{
"error": "invalid_grant",
"error_description": "PKCE verification failed",
},
status_code=400,
)
# Verify redirect_uri matches
if redirect_uri != oauth_session["client_redirect_uri"]:
return JSONResponse(
{
"error": "invalid_grant",
"error_description": "redirect_uri mismatch",
},
status_code=400,
)
# Get stored IdP access token
idp_access_token = oauth_session.get("idp_access_token")
if not idp_access_token:
return JSONResponse(
{
"error": "server_error",
"error_description": "Access token not found in session",
},
status_code=500,
)
# Invalidate MCP authorization code (one-time use)
await storage.delete_oauth_session(oauth_session["session_id"])
logger.info(f"Token exchange successful - user={oauth_session.get('user_id')}")
# Return tokens to client
# CRITICAL: Client gets access token but NOT the master refresh token
# (unless we implement MCP session refresh tokens)
return JSONResponse(
{
"access_token": idp_access_token,
"token_type": "Bearer",
"expires_in": 3600, # Typical access token lifetime
# Note: We don't return the master refresh token!
# MCP client would need to re-authenticate when token expires
}
)
elif grant_type == "refresh_token":
# Refresh token grant (not implemented in ADR-004 initial version)
return JSONResponse(
{
"error": "unsupported_grant_type",
"error_description": "refresh_token grant not yet implemented",
},
status_code=400,
)
else:
return JSONResponse(
{
"error": "unsupported_grant_type",
"error_description": f"grant_type '{grant_type}' is not supported",
},
status_code=400,
)
async def oauth_authorize_nextcloud(
request: Request,
) -> RedirectResponse | JSONResponse:
@@ -639,27 +270,12 @@ async def oauth_authorize_nextcloud(
to initiate delegated resource access to Nextcloud. Requires a separate
login flow outside of the MCP session.
Only available when Progressive Consent is enabled (opt-in).
Query parameters:
state: Session state for tracking
Returns:
302 redirect to IdP authorization endpoint
"""
# Check if Progressive Consent is enabled (opt-in, defaults to false)
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
)
if not enable_progressive:
return JSONResponse(
{
"error": "not_enabled",
"error_description": "Progressive Consent mode is not enabled",
},
status_code=400,
)
state = request.query_params.get("state")
if not state:
return JSONResponse(
@@ -699,14 +315,9 @@ async def oauth_authorize_nextcloud(
mcp_server_url = oauth_config["mcp_server_url"]
callback_uri = f"{mcp_server_url}/oauth/callback-nextcloud"
# Define resource access scopes
scopes = (
"openid profile email offline_access "
"notes:read notes:write "
"calendar:read calendar:write "
"contacts:read contacts:write "
"files:read files:write"
)
# Flow 2: Server only needs identity + offline access (no resource scopes)
# Resource scopes are requested by client in Flow 1
scopes = "openid profile email offline_access"
# Get authorization endpoint
discovery_url = oauth_config.get("discovery_url")
@@ -63,19 +63,6 @@ def require_provisioning(func: Callable) -> Callable:
logger.debug("BasicAuth mode detected - skipping provisioning check")
return await func(*args, **kwargs)
# Check if Progressive Consent is enabled (opt-in, defaults to false)
# Provisioning checks only apply when using Progressive Consent Flow 2
import os
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
)
if not enable_progressive:
logger.debug(
"Progressive Consent disabled (ENABLE_PROGRESSIVE_CONSENT=false) - skipping provisioning check"
)
return await func(*args, **kwargs)
# Progressive Consent mode - check if user has completed Flow 2 provisioning
# Get user_id from authorization token
user_id = None
+2 -6
View File
@@ -135,8 +135,7 @@ class Settings:
nextcloud_username: Optional[str] = None
nextcloud_password: Optional[str] = None
# Progressive Consent settings
enable_progressive_consent: bool = False
# Progressive Consent settings (always enabled - no flag needed)
enable_token_exchange: bool = False
enable_offline_access: bool = False
@@ -160,10 +159,7 @@ def get_settings() -> Settings:
nextcloud_host=os.getenv("NEXTCLOUD_HOST"),
nextcloud_username=os.getenv("NEXTCLOUD_USERNAME"),
nextcloud_password=os.getenv("NEXTCLOUD_PASSWORD"),
# Progressive Consent settings
enable_progressive_consent=(
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
),
# Progressive Consent settings (always enabled)
enable_token_exchange=(
os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true"
),
@@ -1,360 +0,0 @@
"""ADR-004 Hybrid Flow Integration Tests.
Tests the complete ADR-004 Hybrid Flow where:
1. Client initiates OAuth at MCP server /oauth/authorize with PKCE
2. MCP server intercepts the flow and redirects to IdP
3. User authenticates and consents at IdP
4. IdP redirects to MCP server /oauth/callback
5. MCP server exchanges IdP code for master refresh token (stored securely)
6. MCP server redirects client with MCP authorization code
7. Client exchanges MCP code for MCP access token using PKCE verifier
8. Client uses MCP access token to establish MCP session and call tools
9. MCP server uses stored refresh token to access Nextcloud APIs on behalf of user
This validates:
- PKCE code challenge/verifier flow
- Master refresh token storage
- Token isolation (client never sees master refresh token)
- End-to-end tool execution with hybrid flow tokens
"""
import hashlib
import json
import logging
import os
import secrets
import time
from base64 import urlsafe_b64encode
from urllib.parse import quote
import anyio
import httpx
import pytest
from tests.conftest import create_mcp_client_session
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
def generate_pkce_challenge():
"""Generate PKCE code verifier and challenge.
Returns:
Tuple of (code_verifier, code_challenge)
"""
code_verifier = secrets.token_urlsafe(32)
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = urlsafe_b64encode(digest).decode().rstrip("=")
return code_verifier, code_challenge
@pytest.fixture(scope="session")
async def adr004_hybrid_flow_mcp_client(
anyio_backend,
browser,
oauth_callback_server,
):
"""
Fixture to create an MCP client session via ADR-004 Hybrid Flow with Playwright automation.
This fixture tests the complete hybrid flow:
1. Client initiates OAuth at MCP server with PKCE
2. MCP server intercepts and redirects to IdP
3. Playwright automates login and consent at IdP
4. IdP redirects to MCP server callback
5. MCP server stores master refresh token and redirects client with MCP code
6. Client exchanges MCP code for access token using PKCE verifier
7. Creates and returns MCP ClientSession with the token
Yields:
Initialized MCP ClientSession for ADR-004 hybrid flow
"""
nextcloud_host = os.getenv("NEXTCLOUD_HOST")
username = os.getenv("NEXTCLOUD_USERNAME", "admin")
password = os.getenv("NEXTCLOUD_PASSWORD", "admin")
mcp_server_url = "http://localhost:8001" # MCP OAuth server
if not all([nextcloud_host, username, password]):
pytest.skip(
"ADR-004 Hybrid Flow requires NEXTCLOUD_HOST, NEXTCLOUD_USERNAME, and NEXTCLOUD_PASSWORD"
)
# Get auth_states dict and callback URL from callback server
auth_states, callback_url = oauth_callback_server
logger.info("=" * 70)
logger.info("Starting ADR-004 Hybrid Flow test with Playwright")
logger.info("=" * 70)
logger.info(f"MCP Server: {mcp_server_url}")
logger.info(f"Nextcloud: {nextcloud_host}")
logger.info(f"User: {username}")
logger.info(f"Client Callback: {callback_url}")
logger.info("=" * 70)
# Step 1: Generate PKCE challenge
code_verifier, code_challenge = generate_pkce_challenge()
logger.info(f"✓ Generated PKCE challenge: {code_challenge[:16]}...")
# Step 2: Generate state for CSRF protection
state = secrets.token_urlsafe(32)
logger.debug(f"✓ Generated state: {state[:16]}...")
# Step 3: Construct authorization URL to MCP server (not IdP!)
# The MCP server will intercept this and redirect to IdP
auth_params = {
"response_type": "code",
"client_id": "test-mcp-client", # Client identifier (not OAuth client_id)
"redirect_uri": callback_url, # Client's callback
"scope": "openid profile email offline_access notes:read notes:write",
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
# Build query string manually to avoid double encoding
query_parts = [f"{k}={quote(str(v), safe='')}" for k, v in auth_params.items()]
auth_url = f"{mcp_server_url}/oauth/authorize?{'&'.join(query_parts)}"
logger.info("Step 1: Client initiates OAuth at MCP server")
logger.debug(f"Authorization URL: {auth_url[:100]}...")
# Step 4: Navigate to authorization URL with Playwright
context = await browser.new_context(ignore_https_errors=True)
page = await context.new_page()
try:
# Navigate to MCP server authorization endpoint
# MCP server will redirect to IdP
logger.debug("Navigating to MCP authorization endpoint...")
await page.goto(auth_url, wait_until="networkidle", timeout=60000)
# Check current URL - should be at IdP login page
current_url = page.url
logger.info(f"Step 2: Redirected to IdP login: {current_url[:80]}...")
# Fill in login form if present
if "/login" in current_url or "/index.php/login" in current_url:
logger.info("Step 3: Filling in credentials at IdP...")
# Wait for login form
await page.wait_for_selector('input[name="user"]', timeout=10000)
# Fill in username and password
await page.fill('input[name="user"]', username)
await page.fill('input[name="password"]', password)
logger.debug("Submitting login form...")
# Submit the form
await page.click('button[type="submit"]')
# Wait for navigation after login
await page.wait_for_load_state("networkidle", timeout=60000)
current_url = page.url
logger.info(f"Step 4: After login: {current_url[:80]}...")
# Handle consent screen if present
logger.info("Step 5: Handling IdP consent screen...")
try:
await _handle_oauth_consent_screen(page, username)
except Exception as e:
logger.debug(f"No consent screen or already authorized: {e}")
# Wait for callback server to receive the MCP authorization code
# Browser will be redirected through: IdP → MCP callback → Client callback
logger.info("Step 6: Waiting for MCP server to redirect with MCP code...")
timeout_seconds = 30
start_time = time.time()
while state not in auth_states:
if time.time() - start_time > timeout_seconds:
# Take a screenshot for debugging
screenshot_path = "/tmp/adr004_oauth_error.png"
await page.screenshot(path=screenshot_path)
logger.error(f"Screenshot saved to {screenshot_path}")
raise TimeoutError(
f"Timeout waiting for MCP authorization code (state={state[:16]}...)"
)
await anyio.sleep(0.5)
mcp_authorization_code = auth_states[state]
logger.info(
f"✓ Received MCP authorization code: {mcp_authorization_code[:20]}..."
)
finally:
await context.close()
# Step 7: Exchange MCP authorization code for MCP access token
logger.info("Step 7: Exchanging MCP code for access token with PKCE verifier...")
async with httpx.AsyncClient(timeout=30.0) as http_client:
token_response = await http_client.post(
f"{mcp_server_url}/oauth/token",
data={
"grant_type": "authorization_code",
"code": mcp_authorization_code,
"code_verifier": code_verifier, # PKCE verifier
"redirect_uri": callback_url,
"client_id": "test-mcp-client",
},
)
if token_response.status_code != 200:
logger.error(f"Token exchange failed: {token_response.status_code}")
logger.error(f"Response: {token_response.text}")
raise RuntimeError(
f"Token exchange failed: {token_response.status_code} - {token_response.text}"
)
token_data = token_response.json()
access_token = token_data.get("access_token")
if not access_token:
raise ValueError(f"No access_token in response: {token_data}")
logger.info("✓ Successfully obtained MCP access token via ADR-004 Hybrid Flow")
logger.info(f" Token: {access_token[:30]}...")
logger.info(f" Type: {token_data.get('token_type', 'Bearer')}")
logger.info(f" Expires in: {token_data.get('expires_in', 'unknown')}s")
# Verify refresh token was stored (check database)
logger.info("Step 8: Verifying master refresh token was stored...")
# Note: In production, we'd verify the refresh token is in the database
# For now, we'll verify by successfully calling a tool
logger.info("=" * 70)
logger.info("ADR-004 Hybrid Flow completed successfully!")
logger.info("=" * 70)
# Step 9: Create MCP client session with the token
logger.info("Step 9: Creating MCP client session with hybrid flow token...")
async for session in create_mcp_client_session(
url=f"{mcp_server_url}/mcp",
token=access_token,
client_name="ADR-004 Hybrid Flow",
):
logger.info("✓ ADR-004 MCP client session established")
yield session
async def _handle_oauth_consent_screen(page, username: str = "admin"):
"""
Handle the OIDC consent screen during ADR-004 flow.
The consent screen:
- Asks user to authorize MCP server to access Nextcloud
- Contains scope information (notes:read, notes:write, etc.)
- Has an "Authorize" button to grant access
Args:
page: Playwright page object
username: Username for logging
"""
try:
# Wait for consent screen elements
logger.debug("Checking for OAuth consent screen...")
# Look for the authorize button
authorize_button = page.locator('button[type="submit"]').filter(
has_text="Authorize"
)
# Check if button exists with short timeout
if await authorize_button.count() > 0:
logger.info(
f"Consent screen detected - authorizing MCP server access for {username}"
)
await authorize_button.click()
logger.debug("Clicked Authorize button")
# Wait for redirect after consent
await page.wait_for_load_state("networkidle", timeout=30000)
logger.info("Consent granted, waiting for redirect...")
else:
logger.debug("No consent screen found (may be pre-authorized)")
except Exception as e:
logger.debug(f"Consent screen handling skipped: {e}")
# Not fatal - might already be authorized
# ============================================================================
# ADR-004 Hybrid Flow Tests
# ============================================================================
async def test_adr004_hybrid_flow_connection(adr004_hybrid_flow_mcp_client):
"""Test that ADR-004 hybrid flow token can establish MCP session."""
# List tools to verify session is established
result = await adr004_hybrid_flow_mcp_client.list_tools()
assert result is not None
assert len(result.tools) > 0
logger.info(
f"✓ ADR-004 session established with {len(result.tools)} tools available"
)
async def test_adr004_hybrid_flow_tool_execution(adr004_hybrid_flow_mcp_client):
"""Test that ADR-004 hybrid flow token can execute MCP tools.
This verifies the complete flow:
1. Client has MCP access token from hybrid flow
2. MCP server has stored master refresh token
3. MCP server can exchange master token for Nextcloud access
4. Tool execution succeeds using on-behalf-of pattern
"""
# Execute a tool that requires Nextcloud API access
result = await adr004_hybrid_flow_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result.isError is False, f"Tool execution failed: {result.content}"
assert result.content is not None
response_data = json.loads(result.content[0].text)
# Verify response structure
assert "results" in response_data
assert isinstance(response_data["results"], list)
logger.info("=" * 70)
logger.info("✓ ADR-004 HYBRID FLOW TEST - SUCCESS")
logger.info("=" * 70)
logger.info("✓ User consented to MCP server access")
logger.info("✓ User consented to offline_access (refresh tokens)")
logger.info("✓ MCP server stored master refresh token")
logger.info("✓ Client received MCP access token via PKCE")
logger.info("✓ MCP session established with hybrid flow token")
logger.info("✓ MCP tool executed successfully")
logger.info("✓ MCP server exchanged master token for Nextcloud access")
logger.info(f"✓ Nextcloud API returned {len(response_data['results'])} notes")
logger.info("=" * 70)
async def test_adr004_hybrid_flow_multiple_operations(adr004_hybrid_flow_mcp_client):
"""Test that ADR-004 token persists across multiple operations.
Verifies that the stored master refresh token enables multiple tool calls
without requiring re-authentication.
"""
# First operation: Search notes
result1 = await adr004_hybrid_flow_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": ""}
)
assert result1.isError is False
# Second operation: List tools
result2 = await adr004_hybrid_flow_mcp_client.list_tools()
assert result2 is not None
assert len(result2.tools) > 0
# Third operation: Search notes again
result3 = await adr004_hybrid_flow_mcp_client.call_tool(
"nc_notes_search_notes", arguments={"query": "test"}
)
assert result3.isError is False
logger.info("✓ ADR-004 token successfully used for 3 consecutive operations")
logger.info("✓ Master refresh token enables persistent access")