feat: Complete ADR-004 Progressive Consent OAuth flows implementation

Implement dual OAuth flows for Progressive Consent architecture:

Flow 1 (Client Authentication):
- Client authenticates directly to IdP with its own client_id
- Server validates client_id against ALLOWED_MCP_CLIENTS whitelist
- Issues tokens with aud: "mcp-server" for MCP authentication only
- Progressive mode detected via ENABLE_PROGRESSIVE_CONSENT env var

Flow 2 (Resource Provisioning):
- New endpoints: /oauth/authorize-nextcloud, /oauth/callback-nextcloud
- MCP server acts as OAuth client for delegated Nextcloud access
- Stores master refresh tokens with flow_type and audience metadata
- Returns success HTML page after provisioning completion

Scope Authorization Updates:
- Added ProvisioningRequiredError for missing Flow 2 provisioning
- Decorator checks if Nextcloud scopes require provisioning in Progressive mode
- Validates token has Nextcloud scopes before allowing access

Storage Schema Enhancements:
- Added flow_type, is_provisioning, requested_scopes to oauth_sessions
- Enhanced store_oauth_session to support Progressive Consent metadata
- Maintains backward compatibility with hybrid flow

This completes the Progressive Consent implementation, enabling:
- Explicit user consent for resource access
- Stateless server by default (no automatic provisioning)
- Clear separation between authentication and resource access
- Defense in depth with audience-specific tokens

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-03 08:14:23 +01:00
parent d16bcdcfbb
commit c896a2de63
3 changed files with 425 additions and 27 deletions
+338 -23
View File
@@ -1,10 +1,15 @@
"""
OAuth 2.0 Login Routes for ADR-004 Hybrid Flow
OAuth 2.0 Login Routes for ADR-004 Progressive Consent Architecture
Implements OAuth endpoints that allow users to login using the same
identity provider configured for Nextcloud (OIDC app or Keycloak).
Implements OAuth endpoints that support both:
1. Hybrid Flow (backward compatible) - Single OAuth flow with server interception
2. Progressive Consent (ADR-004) - Dual OAuth flows with explicit provisioning
This implements the "Hybrid Flow" where:
Progressive Consent Mode (when ENABLE_PROGRESSIVE_CONSENT=true):
- Flow 1: Client Authentication - MCP client authenticates directly to IdP
- Flow 2: Resource Provisioning - MCP server gets delegated Nextcloud access
Hybrid Flow Mode (default, backward compatible):
1. MCP client initiates OAuth at /oauth/authorize
2. MCP server redirects to IdP (intercepts callback)
3. IdP redirects back to /oauth/callback (server gets master tokens)
@@ -14,6 +19,7 @@ This implements the "Hybrid Flow" where:
import hashlib
import logging
import os
import secrets
from urllib.parse import urlencode
from uuid import uuid4
@@ -30,14 +36,21 @@ logger = logging.getLogger(__name__)
async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
"""
OAuth authorization endpoint with PKCE support (ADR-004 Hybrid Flow).
OAuth authorization endpoint with PKCE support.
MCP client calls this endpoint to initiate OAuth flow.
Server redirects to IdP with its own callback URL.
Supports both Hybrid Flow (default) and Progressive Consent Flow 1.
In Progressive Consent mode (ENABLE_PROGRESSIVE_CONSENT=true):
- Flow 1: Client authenticates directly to IdP with its own client_id
- Server validates client_id is in ALLOWED_MCP_CLIENTS list
- Issues tokens with aud: "mcp-server" for MCP authentication only
In Hybrid Flow mode (default):
- Single OAuth flow where server intercepts and stores refresh token
Query parameters:
response_type: Must be "code"
client_id: MCP client identifier (optional for native clients)
client_id: MCP client identifier (required in Progressive mode)
redirect_uri: Client's localhost redirect URI (required)
scope: Requested scopes (optional)
state: CSRF protection state (required)
@@ -47,10 +60,14 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
Returns:
302 redirect to IdP authorization endpoint
"""
# Check if Progressive Consent is enabled
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
)
# Extract parameters
response_type = request.query_params.get("response_type")
# client_id is optional for native clients, but we extract it for logging/tracking
# scope is handled by forwarding all params to IdP
client_id = request.query_params.get("client_id")
redirect_uri = request.query_params.get("redirect_uri")
state = request.query_params.get("state")
code_challenge = request.query_params.get("code_challenge")
@@ -112,6 +129,31 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
status_code=400,
)
# In Progressive Consent mode, validate client_id
if enable_progressive:
if not client_id:
return JSONResponse(
{
"error": "invalid_request",
"error_description": "client_id is required in Progressive Consent mode",
},
status_code=400,
)
# Check if client_id is in allowed list
allowed_clients = os.getenv("ALLOWED_MCP_CLIENTS", "").split(",")
allowed_clients = [c.strip() for c in allowed_clients if c.strip()]
if allowed_clients and client_id not in allowed_clients:
logger.warning(f"Unauthorized client_id: {client_id}")
return JSONResponse(
{
"error": "unauthorized_client",
"error_description": f"Client {client_id} is not authorized",
},
status_code=401,
)
# Get OAuth context from app state
oauth_ctx = request.app.state.oauth_context
if not oauth_ctx:
@@ -137,28 +179,43 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
)
# Store session with client details and PKCE challenge
flow_type = "flow1" if enable_progressive else "hybrid"
await storage.store_oauth_session(
session_id=session_id,
client_id=client_id,
client_redirect_uri=redirect_uri,
state=state,
code_challenge=code_challenge,
code_challenge_method=code_challenge_method,
mcp_authorization_code=mcp_authorization_code,
flow_type=flow_type,
ttl_seconds=600, # 10 minutes
)
# Build IdP authorization URL
# CRITICAL: Use MCP server's callback URL, NOT the client's!
mcp_server_url = oauth_config["mcp_server_url"]
server_callback_uri = f"{mcp_server_url}/oauth/callback"
# Combine session_id and client state for IdP state parameter
idp_state = f"{session_id}:{state}"
# Build scopes - include both identity scopes and Nextcloud scopes
default_scopes = "openid profile email offline_access"
nextcloud_scopes = oauth_config.get("scopes", "")
combined_scopes = f"{default_scopes} {nextcloud_scopes}".strip()
if enable_progressive:
# Flow 1: Client authenticates directly to IdP
# Use client's redirect_uri for direct callback
callback_uri = redirect_uri
# Only request MCP authentication scopes
scopes = "openid profile email"
# Pass through client's state directly
idp_state = state
# Use client's own client_id (if IdP requires it)
idp_client_id = client_id
else:
# Hybrid Flow: Server intercepts callback
callback_uri = f"{mcp_server_url}/oauth/callback"
# Combine session_id and client state for IdP state parameter
idp_state = f"{session_id}:{state}"
# Build scopes - include both identity scopes and Nextcloud scopes
default_scopes = "openid profile email offline_access"
nextcloud_scopes = oauth_config.get("scopes", "")
scopes = f"{default_scopes} {nextcloud_scopes}".strip()
# Use server's client_id
idp_client_id = oauth_config["client_id"]
# Get authorization endpoint from OAuth client
if oauth_client:
@@ -190,7 +247,6 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
# IMPORTANT: Replace internal Docker hostname with public URL for browser access
# The discovery endpoint returns http://app/apps/oidc/authorize (internal)
# But browsers need http://localhost:8080/apps/oidc/authorize (public)
import os
from urllib.parse import urlparse as parse_url
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
@@ -214,10 +270,10 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
)
idp_params = {
"client_id": oauth_config["client_id"],
"redirect_uri": server_callback_uri,
"client_id": idp_client_id,
"redirect_uri": callback_uri,
"response_type": "code",
"scope": combined_scopes,
"scope": scopes,
"state": idp_state,
"prompt": "consent", # Ensure refresh token
}
@@ -542,3 +598,262 @@ async def oauth_token(request: Request) -> JSONResponse:
},
status_code=400,
)
async def oauth_authorize_nextcloud(
request: Request,
) -> RedirectResponse | JSONResponse:
"""
OAuth authorization endpoint for Flow 2: Resource Provisioning.
This endpoint is used by the provision_nextcloud_access MCP tool
to initiate delegated resource access to Nextcloud.
Query parameters:
state: Session state for tracking
Returns:
302 redirect to IdP authorization endpoint
"""
# Check if Progressive Consent is enabled
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
)
if not enable_progressive:
return JSONResponse(
{
"error": "not_enabled",
"error_description": "Progressive Consent mode is not enabled",
},
status_code=400,
)
state = request.query_params.get("state")
if not state:
return JSONResponse(
{
"error": "invalid_request",
"error_description": "state parameter is required",
},
status_code=400,
)
# Get OAuth context
oauth_ctx = request.app.state.oauth_context
if not oauth_ctx:
return JSONResponse(
{
"error": "server_error",
"error_description": "OAuth not configured on server",
},
status_code=500,
)
oauth_config = oauth_ctx["config"]
# Get MCP server's OAuth client credentials
mcp_server_client_id = os.getenv(
"MCP_SERVER_CLIENT_ID", oauth_config.get("client_id")
)
if not mcp_server_client_id:
return JSONResponse(
{
"error": "server_error",
"error_description": "MCP server OAuth client not configured",
},
status_code=500,
)
mcp_server_url = oauth_config["mcp_server_url"]
callback_uri = f"{mcp_server_url}/oauth/callback-nextcloud"
# Define resource access scopes
scopes = (
"openid profile email offline_access "
"notes:read notes:write "
"calendar:read calendar:write "
"contacts:read contacts:write "
"files:read files:write"
)
# Get authorization endpoint
discovery_url = oauth_config.get("discovery_url")
if not discovery_url:
return JSONResponse(
{
"error": "server_error",
"error_description": "OAuth discovery URL not configured",
},
status_code=500,
)
async with httpx.AsyncClient() as http_client:
response = await http_client.get(discovery_url)
response.raise_for_status()
discovery = response.json()
authorization_endpoint = discovery["authorization_endpoint"]
# Fix internal hostname for browser access
public_issuer = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
if public_issuer:
from urllib.parse import urlparse as parse_url
internal_parsed = parse_url(oauth_config["nextcloud_host"])
auth_parsed = parse_url(authorization_endpoint)
if auth_parsed.hostname == internal_parsed.hostname:
public_parsed = parse_url(public_issuer)
authorization_endpoint = (
f"{public_parsed.scheme}://{public_parsed.netloc}{auth_parsed.path}"
)
# Build authorization URL
idp_params = {
"client_id": mcp_server_client_id,
"redirect_uri": callback_uri,
"response_type": "code",
"scope": scopes,
"state": state,
"prompt": "consent", # Force consent to show resource access
"access_type": "offline", # Request refresh token
}
auth_url = f"{authorization_endpoint}?{urlencode(idp_params)}"
logger.info("Flow 2: Redirecting to IdP for resource provisioning")
return RedirectResponse(auth_url, status_code=302)
async def oauth_callback_nextcloud(request: Request) -> JSONResponse:
"""
OAuth callback endpoint for Flow 2: Resource Provisioning.
The IdP redirects here after user grants delegated resource access.
Server stores the master refresh token for offline access.
Query parameters:
code: Authorization code from IdP
state: State parameter (session identifier)
error: Error code (if authorization failed)
Returns:
JSON response or HTML success page
"""
# Check for errors from IdP
error = request.query_params.get("error")
if error:
error_description = request.query_params.get(
"error_description", "Authorization failed"
)
logger.error(f"Flow 2 authorization error: {error} - {error_description}")
return JSONResponse(
{
"error": error,
"error_description": error_description,
},
status_code=400,
)
code = request.query_params.get("code")
state = request.query_params.get("state")
if not code or not state:
return JSONResponse(
{
"error": "invalid_request",
"error_description": "code and state parameters are required",
},
status_code=400,
)
# Get OAuth context
oauth_ctx = request.app.state.oauth_context
storage: RefreshTokenStorage = oauth_ctx["storage"]
oauth_config = oauth_ctx["config"]
# Exchange code for tokens
mcp_server_client_id = os.getenv(
"MCP_SERVER_CLIENT_ID", oauth_config.get("client_id")
)
mcp_server_client_secret = os.getenv(
"MCP_SERVER_CLIENT_SECRET", oauth_config.get("client_secret")
)
mcp_server_url = oauth_config["mcp_server_url"]
callback_uri = f"{mcp_server_url}/oauth/callback-nextcloud"
discovery_url = oauth_config.get("discovery_url")
async with httpx.AsyncClient() as http_client:
response = await http_client.get(discovery_url)
response.raise_for_status()
discovery = response.json()
token_endpoint = discovery["token_endpoint"]
# Exchange code for tokens
async with httpx.AsyncClient() as http_client:
response = await http_client.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": callback_uri,
"client_id": mcp_server_client_id,
"client_secret": mcp_server_client_secret,
},
)
response.raise_for_status()
token_data = response.json()
refresh_token = token_data.get("refresh_token")
id_token = token_data.get("id_token")
# Decode ID token to get user info
try:
userinfo = jwt.decode(id_token, options={"verify_signature": False})
user_id = userinfo.get("sub")
username = userinfo.get("preferred_username") or userinfo.get("email")
logger.info(f"Flow 2: User {username} provisioned resource access")
except Exception as e:
logger.warning(f"Failed to decode ID token: {e}")
user_id = "unknown"
# Store master refresh token for Flow 2
if refresh_token:
# Parse granted scopes from token response
granted_scopes = (
token_data.get("scope", "").split() if token_data.get("scope") else None
)
await storage.store_refresh_token(
user_id=user_id,
refresh_token=refresh_token,
flow_type="flow2",
token_audience="nextcloud",
provisioning_client_id=state, # Store which client initiated provisioning
scopes=granted_scopes,
expires_at=None, # Refresh tokens typically don't expire
)
logger.info(f"Stored Flow 2 master refresh token for user {user_id}")
# Return success HTML page
success_html = """
<!DOCTYPE html>
<html>
<head>
<title>Nextcloud Access Provisioned</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; margin-top: 50px; }
.success { color: green; }
.info { margin-top: 20px; color: #666; }
</style>
</head>
<body>
<h1 class="success">✓ Nextcloud Access Provisioned</h1>
<p>The MCP server now has offline access to your Nextcloud resources.</p>
<p class="info">You can close this window and return to your MCP client.</p>
</body>
</html>
"""
from starlette.responses import HTMLResponse
return HTMLResponse(content=success_html, status_code=200)
@@ -711,10 +711,14 @@ class RefreshTokenStorage:
code_challenge: Optional[str] = None,
code_challenge_method: Optional[str] = None,
mcp_authorization_code: Optional[str] = None,
client_id: Optional[str] = None,
flow_type: str = "hybrid",
is_provisioning: bool = False,
requested_scopes: Optional[str] = None,
ttl_seconds: int = 600, # 10 minutes
) -> None:
"""
Store OAuth session for Hybrid Flow (ADR-004).
Store OAuth session for ADR-004 Progressive Consent.
Args:
session_id: Unique session identifier
@@ -723,6 +727,10 @@ class RefreshTokenStorage:
code_challenge: PKCE code challenge
code_challenge_method: PKCE method (S256)
mcp_authorization_code: Pre-generated MCP authorization code
client_id: Client identifier (for Flow 1)
flow_type: Type of flow ('hybrid', 'flow1', 'flow2')
is_provisioning: Whether this is a Flow 2 provisioning session
requested_scopes: Requested OAuth scopes
ttl_seconds: Session TTL in seconds
"""
if not self._initialized:
@@ -735,17 +743,22 @@ class RefreshTokenStorage:
await db.execute(
"""
INSERT INTO oauth_sessions
(session_id, client_redirect_uri, state, code_challenge,
code_challenge_method, mcp_authorization_code, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
(session_id, client_id, client_redirect_uri, state, code_challenge,
code_challenge_method, mcp_authorization_code, flow_type,
is_provisioning, requested_scopes, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
session_id,
client_id,
client_redirect_uri,
state,
code_challenge,
code_challenge_method,
mcp_authorization_code,
flow_type,
is_provisioning,
requested_scopes,
now,
expires_at,
),
@@ -1,6 +1,7 @@
"""Scope-based authorization for MCP tools."""
import logging
import os
from functools import wraps
from typing import Callable
@@ -33,6 +34,23 @@ class InsufficientScopeError(ScopeAuthorizationError):
)
class ProvisioningRequiredError(ScopeAuthorizationError):
"""Raised when Nextcloud resource access requires provisioning (Flow 2).
In Progressive Consent mode, users must explicitly provision Nextcloud
access using the provision_nextcloud_access MCP tool.
"""
def __init__(self, message: str | None = None):
super().__init__(
message
or (
"Nextcloud resource access not provisioned. "
"Please run the 'provision_nextcloud_access' tool to grant access."
)
)
def require_scopes(*required_scopes: str):
"""
Decorator to require specific OAuth scopes for MCP tool execution.
@@ -109,6 +127,58 @@ def require_scopes(*required_scopes: str):
token_scopes = set(access_token.scopes or [])
required_scopes_set = set(required_scopes)
# Check if Progressive Consent is enabled
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
)
# In Progressive Consent mode, check if Nextcloud scopes require provisioning
if enable_progressive:
# Check if any required scopes are Nextcloud-specific
nextcloud_scopes = [
s
for s in required_scopes
if any(
s.startswith(prefix)
for prefix in [
"notes:",
"calendar:",
"contacts:",
"files:",
"tables:",
"deck:",
]
)
]
if nextcloud_scopes:
# Check if user has completed Flow 2 provisioning
# This would be indicated by having a stored refresh token
# In production, we'd check the token broker or storage
# For now, we check if the token has the required scopes
# (Flow 1 tokens won't have Nextcloud scopes)
has_nextcloud_scopes = any(
s.startswith(prefix)
for s in token_scopes
for prefix in [
"notes:",
"calendar:",
"contacts:",
"files:",
"tables:",
"deck:",
]
)
if not has_nextcloud_scopes:
error_msg = (
f"Access denied to {func.__name__}: "
f"Nextcloud resource access not provisioned. "
f"Please run the 'provision_nextcloud_access' tool first."
)
logger.warning(error_msg)
raise ProvisioningRequiredError(error_msg)
# Check if all required scopes are present
missing_scopes = required_scopes_set - token_scopes
if missing_scopes: