feat: implement RFC 8693 Standard Token Exchange for Keycloak

Configure Keycloak 26.4.2 realm to support Standard Token Exchange V2,
enabling the MCP server to exchange client tokens (aud: nextcloud-mcp-server)
for Nextcloud-scoped tokens (aud: nextcloud) via RFC 8693.

Changes:
- Remove duplicate audience workarounds from realm configuration
- Add token-exchange-nextcloud client scope with audience mapper
- Configure scope as default for nextcloud-mcp-server client
- Enable standard.token.exchange.enabled on both clients
- Add comprehensive integration tests (7 tests, all passing)

Token Exchange Flow:
1. Client obtains token with aud: [nextcloud-mcp-server, nextcloud]
2. Server exchanges to aud: nextcloud, azp: nextcloud-mcp-server
3. Exchanged token used for Nextcloud API calls
4. Each request gets fresh ephemeral token (stateless)

Key Implementation Details:
- Uses Keycloak 26.2+ scope-based authorization (no FGAP required)
- Target audiences must be in client's default/optional scopes
- Protocol mappers alone don't grant exchange permission
- Tokens expire after 300s (5 minutes)

Tests validate:
- Basic token exchange flow
- Nextcloud API integration (Capabilities, Notes)
- CRUD operations with exchanged tokens
- Multiple stateless exchanges from same client token
- Token claims preservation (aud, azp, sub)
- Scope configuration validation

See docs/ADR-004-progressive-consent.md for architecture details.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Chris Coutinho
2025-11-04 02:30:37 +01:00
parent b20c9c6203
commit 0ff85dbe4f
2 changed files with 398 additions and 24 deletions
+18 -24
View File
@@ -166,13 +166,13 @@
{
"clientId": "nextcloud",
"name": "Nextcloud Resource Server",
"description": "Resource server for Nextcloud APIs - used by user_oidc app for bearer token validation",
"description": "Resource server for Nextcloud APIs - used by user_oidc app for bearer token validation and as token exchange target",
"enabled": true,
"clientAuthenticatorType": "client-secret",
"secret": "nextcloud-secret-change-in-production",
"redirectUris": [],
"webOrigins": [],
"bearerOnly": true,
"bearerOnly": false,
"consentRequired": false,
"standardFlowEnabled": false,
"implicitFlowEnabled": false,
@@ -181,7 +181,10 @@
"publicClient": false,
"protocol": "openid-connect",
"attributes": {
"display.on.consent.screen": "false"
"display.on.consent.screen": "false",
"token.exchange.grant.enabled": "true",
"client.token.exchange.standard.enabled": "true",
"standard.token.exchange.enabled": "true"
},
"fullScopeAllowed": true,
"nodeReRegistrationTimeout": -1
@@ -220,18 +223,19 @@
"client_credentials.use_refresh_token": "false",
"display.on.consent.screen": "false",
"token.exchange.grant.enabled": "true",
"client.token.exchange.standard.enabled": "true"
"client.token.exchange.standard.enabled": "true",
"standard.token.exchange.enabled": "true"
},
"fullScopeAllowed": true,
"nodeReRegistrationTimeout": -1,
"protocolMappers": [
{
"name": "audience-nextcloud",
"name": "audience-mcp-server",
"protocol": "openid-connect",
"protocolMapper": "oidc-audience-mapper",
"consentRequired": false,
"config": {
"included.custom.audience": "nextcloud",
"included.custom.audience": "nextcloud-mcp-server",
"access.token.claim": "true",
"id.token.claim": "false"
}
@@ -308,13 +312,15 @@
"web-origins",
"profile",
"roles",
"email"
"email",
"token-exchange-nextcloud"
],
"optionalClientScopes": [
"address",
"phone",
"offline_access",
"microprofile-jwt",
"token-exchange-nextcloud",
"notes:read",
"notes:write",
"calendar:read",
@@ -685,27 +691,16 @@
}
},
{
"name": "audience",
"description": "Audience scope for token validation",
"name": "token-exchange-nextcloud",
"description": "Allows token exchange for nextcloud client",
"protocol": "openid-connect",
"attributes": {
"include.in.token.scope": "true",
"include.in.token.scope": "false",
"display.on.consent.screen": "false"
},
"protocolMappers": [
{
"name": "mcp-server-audience",
"protocol": "openid-connect",
"protocolMapper": "oidc-audience-mapper",
"consentRequired": false,
"config": {
"included.client.audience": "nextcloud-mcp-server",
"id.token.claim": "false",
"access.token.claim": "true"
}
},
{
"name": "nextcloud-audience",
"name": "nextcloud-audience-for-exchange",
"protocol": "openid-connect",
"protocolMapper": "oidc-audience-mapper",
"consentRequired": false,
@@ -756,8 +751,7 @@
"profile",
"email",
"roles",
"web-origins",
"audience"
"web-origins"
],
"defaultOptionalClientScopes": [
"offline_access",
@@ -0,0 +1,380 @@
"""Integration tests for RFC 8693 Token Exchange with Keycloak.
These tests validate the complete token exchange flow:
1. Obtain client token from Keycloak
2. Exchange for Nextcloud-audience token via RFC 8693
3. Use exchanged token to access Nextcloud APIs
4. Verify CRUD operations work with exchanged tokens
Requirements:
- Keycloak running with nextcloud-mcp realm configured
- Nextcloud running with user_oidc app configured
- Standard Token Exchange enabled on both clients
- token-exchange-nextcloud scope configured
"""
from typing import Any
import httpx
import jwt
import pytest
@pytest.fixture
async def keycloak_base_url() -> str:
"""Keycloak base URL (external)."""
return "http://localhost:8888"
@pytest.fixture
async def keycloak_token_url(keycloak_base_url: str) -> str:
"""Keycloak token endpoint URL."""
return f"{keycloak_base_url}/realms/nextcloud-mcp/protocol/openid-connect/token"
@pytest.fixture
async def nextcloud_base_url() -> str:
"""Nextcloud base URL."""
return "http://localhost:8080"
@pytest.fixture
async def http_client() -> httpx.AsyncClient:
"""Async HTTP client for API requests."""
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
yield client
@pytest.fixture
async def keycloak_client_token(
http_client: httpx.AsyncClient, keycloak_token_url: str
) -> str:
"""Get client token from Keycloak using password grant.
Returns token with aud: ["nextcloud-mcp-server", "nextcloud"]
"""
response = await http_client.post(
keycloak_token_url,
data={
"grant_type": "password",
"client_id": "nextcloud-mcp-server",
"client_secret": "mcp-secret-change-in-production",
"username": "admin",
"password": "admin",
"scope": "openid profile email offline_access notes:read notes:write",
},
)
response.raise_for_status()
token_data = response.json()
return token_data["access_token"]
async def exchange_token(
http_client: httpx.AsyncClient,
token_url: str,
subject_token: str,
audience: str = "nextcloud",
) -> dict[str, Any]:
"""Exchange token using RFC 8693.
Args:
http_client: HTTP client
token_url: Token endpoint URL
subject_token: Token to exchange
audience: Target audience
Returns:
Token response with access_token and expires_in
"""
response = await http_client.post(
token_url,
data={
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"client_id": "nextcloud-mcp-server",
"client_secret": "mcp-secret-change-in-production",
"subject_token": subject_token,
"subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"audience": audience,
},
)
response.raise_for_status()
return response.json()
def decode_token_claims(token: str) -> dict[str, Any]:
"""Decode JWT token claims without verification.
Args:
token: JWT token
Returns:
Token claims
"""
return jwt.decode(token, options={"verify_signature": False})
@pytest.mark.integration
@pytest.mark.keycloak
class TestKeycloakTokenExchange:
"""Test RFC 8693 Token Exchange with Keycloak."""
async def test_token_exchange_basic(
self,
http_client: httpx.AsyncClient,
keycloak_token_url: str,
keycloak_client_token: str,
):
"""Test basic token exchange flow."""
# Verify initial token has both audiences
initial_claims = decode_token_claims(keycloak_client_token)
assert "nextcloud-mcp-server" in initial_claims["aud"]
assert "nextcloud" in initial_claims["aud"]
assert initial_claims["azp"] == "nextcloud-mcp-server"
# Exchange for Nextcloud-audience token
exchange_response = await exchange_token(
http_client, keycloak_token_url, keycloak_client_token
)
assert "access_token" in exchange_response
assert "expires_in" in exchange_response
assert exchange_response["expires_in"] > 0
# Verify exchanged token has correct audience
exchanged_token = exchange_response["access_token"]
exchanged_claims = decode_token_claims(exchanged_token)
assert exchanged_claims["aud"] == "nextcloud"
assert exchanged_claims["azp"] == "nextcloud-mcp-server"
assert exchanged_claims["sub"] == initial_claims["sub"]
async def test_token_exchange_with_nextcloud_api(
self,
http_client: httpx.AsyncClient,
keycloak_token_url: str,
keycloak_client_token: str,
nextcloud_base_url: str,
):
"""Test exchanged token works with Nextcloud APIs."""
# Exchange token
exchange_response = await exchange_token(
http_client, keycloak_token_url, keycloak_client_token
)
nextcloud_token = exchange_response["access_token"]
# Call Nextcloud Capabilities API
response = await http_client.get(
f"{nextcloud_base_url}/ocs/v1.php/cloud/capabilities",
headers={
"Authorization": f"Bearer {nextcloud_token}",
"OCS-APIRequest": "true",
},
)
response.raise_for_status()
# Verify response contains OCS data
assert "ocs" in response.text.lower()
async def test_token_exchange_multiple_times(
self,
http_client: httpx.AsyncClient,
keycloak_token_url: str,
keycloak_client_token: str,
):
"""Test multiple exchanges from same client token (stateless)."""
# Exchange token three times
tokens = []
for _ in range(3):
exchange_response = await exchange_token(
http_client, keycloak_token_url, keycloak_client_token
)
tokens.append(exchange_response["access_token"])
# All exchanges should succeed
assert len(tokens) == 3
# Tokens should be different (fresh ephemeral tokens)
# Note: Keycloak may cache, so tokens might be identical
# The important thing is that all exchanges succeeded
async def test_token_exchange_crud_operations(
self,
http_client: httpx.AsyncClient,
keycloak_token_url: str,
keycloak_client_token: str,
nextcloud_base_url: str,
):
"""Test CRUD operations with exchanged tokens."""
notes_api = f"{nextcloud_base_url}/index.php/apps/notes/api/v1/notes"
# Step 1: Exchange token for CREATE
exchange_response = await exchange_token(
http_client, keycloak_token_url, keycloak_client_token
)
create_token = exchange_response["access_token"]
# Step 2: Create a test note
create_response = await http_client.post(
notes_api,
headers={"Authorization": f"Bearer {create_token}"},
json={
"title": "Token Exchange Test",
"content": "This note was created using an RFC 8693 exchanged token!",
"category": "Test",
},
)
create_response.raise_for_status()
note_data = create_response.json()
note_id = note_data["id"]
assert note_data["title"] == "Token Exchange Test"
assert note_data["category"] == "Test"
# Step 3: Exchange token again for READ (simulate new request)
exchange_response = await exchange_token(
http_client, keycloak_token_url, keycloak_client_token
)
read_token = exchange_response["access_token"]
# Step 4: Read the note back
read_response = await http_client.get(
f"{notes_api}/{note_id}",
headers={"Authorization": f"Bearer {read_token}"},
)
read_response.raise_for_status()
read_data = read_response.json()
assert read_data["id"] == note_id
assert read_data["title"] == "Token Exchange Test"
assert "RFC 8693 exchanged token" in read_data["content"]
# Step 5: Exchange token again for DELETE
exchange_response = await exchange_token(
http_client, keycloak_token_url, keycloak_client_token
)
delete_token = exchange_response["access_token"]
# Step 6: Delete the note
delete_response = await http_client.delete(
f"{notes_api}/{note_id}",
headers={"Authorization": f"Bearer {delete_token}"},
)
# Notes API returns the deleted note or empty array
assert delete_response.status_code in (200, 204)
async def test_token_claims_preservation(
self,
http_client: httpx.AsyncClient,
keycloak_token_url: str,
keycloak_client_token: str,
):
"""Test that important claims are preserved during exchange."""
initial_claims = decode_token_claims(keycloak_client_token)
# Exchange token
exchange_response = await exchange_token(
http_client, keycloak_token_url, keycloak_client_token
)
exchanged_token = exchange_response["access_token"]
exchanged_claims = decode_token_claims(exchanged_token)
# Subject (user ID) should be preserved
assert exchanged_claims["sub"] == initial_claims["sub"]
# Authorized party should show delegation
assert exchanged_claims["azp"] == "nextcloud-mcp-server"
# Audience should be filtered to target
assert exchanged_claims["aud"] == "nextcloud"
# Token should have expiration
assert "exp" in exchanged_claims
assert exchanged_claims["exp"] > 0
async def test_token_exchange_scope_configuration(
self, http_client: httpx.AsyncClient, keycloak_token_url: str
):
"""Test that token-exchange-nextcloud scope is configured as default.
Since token-exchange-nextcloud is a default scope for nextcloud-mcp-server,
all tokens should have the nextcloud audience available for exchange.
"""
# Get a token - should automatically include default scopes
response = await http_client.post(
keycloak_token_url,
data={
"grant_type": "password",
"client_id": "nextcloud-mcp-server",
"client_secret": "mcp-secret-change-in-production",
"username": "admin",
"password": "admin",
"scope": "openid profile email",
},
)
response.raise_for_status()
token = response.json()["access_token"]
# Verify token has nextcloud in aud (from default token-exchange-nextcloud scope)
claims = decode_token_claims(token)
assert "nextcloud" in claims.get("aud", [])
# Exchange should succeed
exchange_response = await http_client.post(
keycloak_token_url,
data={
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"client_id": "nextcloud-mcp-server",
"client_secret": "mcp-secret-change-in-production",
"subject_token": token,
"subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"audience": "nextcloud",
},
)
# Should succeed because token-exchange-nextcloud is a default scope
assert exchange_response.status_code == 200
exchanged_data = exchange_response.json()
assert "access_token" in exchanged_data
@pytest.mark.integration
@pytest.mark.keycloak
class TestTokenExchangeService:
"""Test the TokenExchangeService implementation."""
async def test_exchange_token_for_audience(
self, keycloak_client_token: str, keycloak_token_url: str
):
"""Test the exchange_token_for_audience function."""
from nextcloud_mcp_server.auth.token_exchange import (
TokenExchangeService,
)
# Create service
service = TokenExchangeService(
oidc_discovery_url="http://localhost:8888/realms/nextcloud-mcp/.well-known/openid-configuration",
client_id="nextcloud-mcp-server",
client_secret="mcp-secret-change-in-production",
)
try:
# Exchange token
exchanged_token, expires_in = await service.exchange_token_for_audience(
subject_token=keycloak_client_token,
requested_audience="nextcloud",
)
# Verify exchange succeeded
assert exchanged_token is not None
assert isinstance(exchanged_token, str)
assert expires_in > 0
# Verify token has correct claims
claims = decode_token_claims(exchanged_token)
assert claims["aud"] == "nextcloud"
assert claims["azp"] == "nextcloud-mcp-server"
finally:
await service.close()