Compare commits

...

17 Commits

Author SHA1 Message Date
github-actions[bot] 77e491beea bump: version 0.24.1 → 0.25.0 2025-11-05 23:02:25 +00:00
Chris Coutinho 7812ac0ee7 Merge pull request #263 from cbcoutinho/adr/005-unified-token-verifier
feat: Implement ADR-005 unified token verifier to eliminate token passthrough vulnerability
2025-11-06 00:02:02 +01:00
Chris Coutinho 659087e4c7 fix: Implement proper OAuth resource parameters and PRM-based discovery
This commit completes the OAuth audience validation implementation per RFC 7519,
RFC 8707 (Resource Indicators), and RFC 9728 (Protected Resource Metadata).

## Key Changes

### OAuth Resource Parameters (RFC 8707)
- Add `resource` parameter to Flow 1 (MCP client auth) with MCP server audience
- Add `resource` parameter to Flow 2 (Nextcloud access) with Nextcloud audience
- Add `nextcloud_resource_uri` to oauth_context configuration
- Fix undefined variable error in starlette_lifespan

### PRM-Based Resource Discovery (RFC 9728)
- Update tests to fetch resource identifier from PRM endpoint
- Add fallback to hardcoded value if PRM fetch fails
- Demonstrate correct OAuth client implementation pattern

### ADR-005 Documentation Updates
- Update to reflect simplified RFC 7519 compliant implementation
- Document that MCP validates only its own audience (not Nextcloud's)
- Add section on OAuth resource parameters and PRM discovery
- Update implementation checklist to show completed items
- Mark status as "Implemented" with update date

## Implementation Details

The solution follows RFC 7519 Section 4.1.3: resource servers validate only
their own presence in the audience claim. This simplifies the logic while
maintaining security:

- MCP server validates MCP audience only
- Nextcloud independently validates its own audience
- No dual validation required at MCP layer
- Token reuse is allowed per RFC 8707 for multi-audience tokens

## Test Results
 test_mcp_oauth_server_connection - PASSED
 test_deck_board_view_permissions - PASSED
 test_prm_endpoint - PASSED

All OAuth flows now properly specify target resources, resulting in correct
audience validation throughout the system.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 23:19:03 +01:00
Chris Coutinho bdb1ba2051 refactor: Eliminate duplicate validation logic in UnifiedTokenVerifier
Since both multi-audience and exchange modes now validate the same thing
(MCP audience only per RFC 7519), consolidated the duplicate methods:

- Removed duplicate verification methods (_verify_multi_audience_token
  and _verify_mcp_audience_only)
- Created single _verify_mcp_audience() method for all validation
- Removed duplicate helper (_validate_multi_audience), kept _has_mcp_audience
- Mode only affects logging and what happens AFTER verification

The mode distinction is now purely about post-verification behavior:
- Multi-audience mode: Use token directly (Nextcloud validates its own)
- Exchange mode: Exchange for Nextcloud-audience token via RFC 8693

This makes the code cleaner and clearer about what's actually happening -
both modes do identical validation, they just differ in how the validated
token is used.

All tests pass: unit (65), OAuth integration confirmed working.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 21:58:52 +01:00
Chris Coutinho 7d9ab5559c fix: Simplify token verifier to be RFC 7519 compliant
Per RFC 7519 Section 4.1.3, resource servers should only validate their
own presence in the audience claim, not check for other resource servers.

Changes:
- UnifiedTokenVerifier now validates only MCP audience (not Nextcloud's)
- Nextcloud independently validates its own audience when receiving API calls
- This is NOT token passthrough (we validate tokens before use)
- This IS token reuse which is explicitly allowed by RFC 8707

Updates:
- Simplified _validate_multi_audience() to follow OAuth spec
- Updated docstrings and comments to clarify RFC 7519 compliance
- Fixed unit tests that expected dual-audience validation
- Updated ADR-005 to document the correct OAuth interpretation
- All tests pass: unit (65), smoke (5), OAuth integration

This makes the implementation simpler, more maintainable, and properly
aligned with OAuth 2.0 specifications while maintaining security.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 21:44:04 +01:00
Chris Coutinho 877c4c91e0 fix: Use Keycloak client ID for NEXTCLOUD_RESOURCE_URI in token exchange
Fix external IdP token exchange by using the correct audience identifier
for Keycloak.

Keycloak uses client IDs as audience identifiers, not URLs. The token
exchange was failing with "Audience not found" because it was requesting
audience "http://localhost:8080" but Keycloak only knows about the
"nextcloud" client ID.

Changes:
- Update mcp-keycloak service NEXTCLOUD_RESOURCE_URI from
  "http://localhost:8080" to "nextcloud"
- Matches Keycloak's client ID convention for resource identifiers
- Token exchange now requests audience "nextcloud" which matches the
  Keycloak resource server client configuration

Note: mcp-oauth service keeps URL-based resource URI because Nextcloud's
integrated OIDC app expects URLs, not client IDs. Different IdPs have
different conventions for audience/resource identifiers.

Test result: test_external_idp_token_validation now passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 19:18:10 +01:00
Chris Coutinho 5deb3132c3 fix: Correct OAuth token audience validation for multi-audience mode
Fix two issues preventing OAuth tests from passing:

1. Set oidc_client_id and oidc_client_secret on Settings object
   - These were being read from environment but not propagated to the
     UnifiedTokenVerifier settings instance

2. Use client_issuer instead of issuer for JWT validation
   - client_issuer accounts for NEXTCLOUD_PUBLIC_ISSUER_URL override
   - Fixes "Invalid issuer" errors when public URL differs from internal

3. Accept resource URL with /mcp path in audience validation
   - During DCR, resource_url is registered as "{mcp_server_url}/mcp"
   - Tokens correctly include this full path as audience
   - Verifier now accepts both "http://localhost:8001" and
     "http://localhost:8001/mcp" as valid MCP audiences

These changes restore OAuth functionality while maintaining ADR-005
security requirements for proper audience validation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 19:03:35 +01:00
Chris Coutinho 9fab6cb550 feat: Implement ADR-005 unified token verifier to eliminate token passthrough vulnerability
Replace two non-compliant token verifiers (NextcloudTokenVerifier and
ProgressiveConsentTokenVerifier) with a single UnifiedTokenVerifier that properly
validates token audiences per MCP Security Best Practices specification.

The previous implementation had a critical security vulnerability where tokens
intended for the MCP server were passed directly to Nextcloud APIs without
proper audience validation (token passthrough anti-pattern). This violates
OAuth 2.0 security principles and the MCP specification.

Changes:
- Add UnifiedTokenVerifier supporting two compliant modes:
  * Multi-audience mode (default): Validates tokens contain BOTH MCP and
    Nextcloud audiences, enabling direct use without exchange
  * Token exchange mode (opt-in): Validates MCP audience only, exchanges
    for Nextcloud tokens via RFC 8693 with caching to minimize latency

- Remove token passthrough vulnerability from context.py and context_helper.py
- Implement token exchange caching (5-minute TTL default) to reduce network calls
- Add required environment variables for audience validation:
  * NEXTCLOUD_MCP_SERVER_URL - MCP server URL (used as audience)
  * NEXTCLOUD_RESOURCE_URI - Nextcloud resource identifier
  * TOKEN_EXCHANGE_CACHE_TTL - Cache TTL for exchanged tokens

- Update docker-compose.yml with resource URI configuration for both OAuth modes
- Add comprehensive test suite (29 tests) covering both authentication modes
- Remove legacy NextcloudTokenVerifier and ProgressiveConsentTokenVerifier

Security improvements:
- Eliminates token passthrough anti-pattern
- Enforces proper audience separation between MCP and Nextcloud
- Complies with MCP Security Best Practices and RFC 8707/8693
- Maintains performance with token exchange caching

Test results: 65/65 unit tests passed, 5/5 smoke tests passed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 18:53:14 +01:00
Chris Coutinho 28c2debf3e docs: Add ADR-005 for unified token verifier architecture
This ADR addresses the critical token passthrough vulnerability identified
in Issue #261 by proposing a unified token verifier that eliminates the
security issue while maintaining flexibility.

Key changes:
- Consolidates two non-compliant verifiers into single UnifiedTokenVerifier
- Implements two-layer architecture (verification + exchange)
- Supports multi-audience mode (default) and token exchange mode (opt-in)
- Removes all token passthrough paths to comply with MCP security spec
- Works within python-sdk constraints using proper separation of concerns

The solution provides:
- Single source of truth for token validation
- MCP specification compliance
- Minimal performance impact (1-2% of LLM request time)
- Clear migration path for existing deployments

BREAKING CHANGE: All OAuth deployments must be reconfigured to specify
resource URIs (NEXTCLOUD_MCP_SERVER_URL and NEXTCLOUD_RESOURCE_URI) and
choose between multi-audience or token exchange mode.

Related: #261
Supersedes: Token passthrough mode in ADR-004

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-05 18:34:43 +01:00
Chris Coutinho 461971a1a8 Merge pull request #262 from cbcoutinho/feature/user-settings
Feature/user settings
2025-11-05 15:59:54 +01:00
Chris Coutinho ed9a8677fe Merge pull request #260 from cbcoutinho/renovate/docker-metadata-action-digest
chore(deps): update docker/metadata-action digest to 318604b
2025-11-05 05:53:52 +01:00
Chris Coutinho e8c499938f Merge pull request #259 from cbcoutinho/renovate/docker.io-library-nextcloud-32.0.1
chore(deps): update docker.io/library/nextcloud:32.0.1 docker digest to 40b1b5d
2025-11-05 05:43:17 +01:00
renovate-bot-cbcoutinho[bot] 4d8b6fca49 chore(deps): update docker.io/library/nextcloud:32.0.1 docker digest to 40b1b5d 2025-11-04 23:09:17 +00:00
renovate-bot-cbcoutinho[bot] 67eb4455fd chore(deps): update docker/metadata-action digest to 318604b 2025-11-04 17:08:19 +00:00
github-actions[bot] 7052c19de0 bump: version 0.24.0 → 0.24.1 2025-11-04 12:28:13 +00:00
Chris Coutinho 921854ce87 Merge pull request #253 from cbcoutinho/renovate/mcp-1.x
fix(deps): update dependency mcp to >=1.20,<1.21
2025-11-04 13:27:46 +01:00
renovate-bot-cbcoutinho[bot] 3e988acb97 fix(deps): update dependency mcp to >=1.20,<1.21 2025-11-04 11:08:34 +00:00
22 changed files with 2879 additions and 962 deletions
+1 -1
View File
@@ -16,7 +16,7 @@ jobs:
- name: Docker meta
id: meta
uses: docker/metadata-action@c1e51972afc2121e065aed6d45c65596fe445f3f # v5
uses: docker/metadata-action@318604b99e75e41977312d83839a89be02ca4893 # v5
with:
# list of Docker images to use as base name for tags
images: |
+29
View File
@@ -1,3 +1,32 @@
## v0.25.0 (2025-11-05)
### BREAKING CHANGE
- All OAuth deployments must be reconfigured to specify
resource URIs (NEXTCLOUD_MCP_SERVER_URL and NEXTCLOUD_RESOURCE_URI) and
choose between multi-audience or token exchange mode.
### Feat
- Implement ADR-005 unified token verifier to eliminate token passthrough vulnerability
### Fix
- Implement proper OAuth resource parameters and PRM-based discovery
- Simplify token verifier to be RFC 7519 compliant
- Use Keycloak client ID for NEXTCLOUD_RESOURCE_URI in token exchange
- Correct OAuth token audience validation for multi-audience mode
### Refactor
- Eliminate duplicate validation logic in UnifiedTokenVerifier
## v0.24.1 (2025-11-04)
### Fix
- **deps**: update dependency mcp to >=1.20,<1.21
## v0.24.0 (2025-11-04)
### Feat
@@ -35,5 +35,6 @@ php /var/www/html/occ config:app:set oidc dynamic_client_registration --value='t
php /var/www/html/occ config:app:set oidc proof_key_for_code_exchange --value=true --type=boolean
php /var/www/html/occ config:app:set oidc allow_user_settings --value='enabled'
php /var/www/html/occ config:app:set oidc default_token_type --value='jwt'
php /var/www/html/occ config:app:set oidc default_resource_identifier --value='http://localhost:8080'
echo "OIDC app installed and configured successfully"
+3
View File
@@ -0,0 +1,3 @@
#!/bin/bash
php /var/www/html/occ config:app:set --value false firstrunwizard wizard_enabled
+2 -2
View File
@@ -2,8 +2,8 @@ apiVersion: v2
name: nextcloud-mcp-server
description: A Helm chart for Nextcloud MCP Server - enables AI assistants to interact with Nextcloud
type: application
version: 0.24.0
appVersion: "0.24.0"
version: 0.25.0
appVersion: "0.25.0"
keywords:
- nextcloud
- mcp
+10 -4
View File
@@ -21,7 +21,7 @@ services:
restart: always
app:
image: docker.io/library/nextcloud:32.0.1@sha256:1e4eae55eebe094cae6f9e7b6e0b4bccf4a4fe7b7e6f6f8f57010994b3b2ee42
image: docker.io/library/nextcloud:32.0.1@sha256:40b1b5dc35bcc9a0e922ec847451e43fa14222c9a99dcd5dfcc03b08f6c15775
restart: always
ports:
- 0.0.0.0:8080:80
@@ -96,6 +96,7 @@ services:
# OIDC_CLIENT_ID not set - uses Dynamic Client Registration (DCR)
- NEXTCLOUD_HOST=http://app:80
- NEXTCLOUD_MCP_SERVER_URL=http://localhost:8001
- NEXTCLOUD_RESOURCE_URI=http://localhost:8080 # ADR-005: Nextcloud resource identifier for audience validation
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://localhost:8080
- NEXTCLOUD_OIDC_SCOPES=openid profile email notes:read notes:write calendar:read calendar:write contacts:read contacts:write cookbook:read cookbook:write deck:read deck:write tables:read tables:write files:read files:write sharing:read sharing:write todo:read todo:write
@@ -104,8 +105,9 @@ services:
- TOKEN_ENCRYPTION_KEY=ESF1BvEQdGYsCluwMx9Cxvw3uh5pFowPH7Rg_nIliyo=
- TOKEN_STORAGE_DB=/app/data/tokens.db
# ADR-004: Use Hybrid Flow (server intercepts OAuth callback)
# Set to false to enable Hybrid Flow tests - server stores refresh token and issues MCP codes
# ADR-005: Multi-audience mode (default - ENABLE_TOKEN_EXCHANGE=false)
# Tokens must contain BOTH MCP and Nextcloud audiences
# No token exchange needed - tokens work for both MCP auth and Nextcloud APIs
# NO admin credentials - using OAuth with Dynamic Client Registration (DCR)
# Client credentials registered via RFC 7591 and stored in volume
@@ -159,6 +161,7 @@ services:
# Nextcloud API endpoint (for accessing APIs with validated token)
- NEXTCLOUD_HOST=http://app:80
- NEXTCLOUD_MCP_SERVER_URL=http://localhost:8002
- NEXTCLOUD_RESOURCE_URI=nextcloud # ADR-005: Keycloak uses client IDs as audiences, not URLs
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://localhost:8888/realms/nextcloud-mcp
# Refresh token storage (ADR-002 Tier 1 & 2)
@@ -166,8 +169,11 @@ services:
- TOKEN_ENCRYPTION_KEY=ESF1BvEQdGYsCluwMx9Cxvw3uh5pFowPH7Rg_nIliyo=
- TOKEN_STORAGE_DB=/app/data/tokens.db
# Token exchange (RFC 8693) - convert aud:nextcloud-mcp-server → aud:nextcloud
# ADR-005: Token exchange mode (RFC 8693)
# Exchange MCP tokens (aud: nextcloud-mcp-server) for Nextcloud tokens (aud: http://localhost:8080)
# Provides strict audience separation between MCP session and Nextcloud API access
- ENABLE_TOKEN_EXCHANGE=true
- TOKEN_EXCHANGE_CACHE_TTL=300 # Cache exchanged tokens for 5 minutes (default)
# OAuth scopes (optional - uses defaults if not specified)
- NEXTCLOUD_OIDC_SCOPES=openid profile email offline_access notes:read notes:write calendar:read calendar:write contacts:read contacts:write cookbook:read cookbook:write deck:read deck:write tables:read tables:write files:read files:write sharing:read sharing:write todo:read todo:write
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,651 @@
# ADR-006: Progressive Consent via URL Elicitation (SEP-1036)
**Status**: Proposed
**Date**: 2025-01-05
**Related**: [SEP-1036](https://github.com/modelcontextprotocol/specification/pull/887), ADR-004
**Depends On**: ADR-005 (token validation)
## Context
The current progressive consent implementation (ADR-004) requires users to manually visit OAuth URLs returned by MCP tools. This creates a poor user experience:
1. User calls `provision_nextcloud_access` tool
2. Tool returns a URL as text in the response
3. User must manually copy URL and open in browser
4. No indication when provisioning is complete
5. User must retry the original operation manually
### SEP-1036: URL Mode Elicitation
The MCP specification now supports **URL mode elicitation** ([SEP-1036](https://github.com/modelcontextprotocol/specification/pull/887)), which enables servers to:
- Request out-of-band user interactions via secure URLs
- Handle sensitive operations like OAuth flows without exposing credentials to the client
- Provide progress tracking for async operations
- Return errors that automatically trigger elicitation flows
**Key benefits for progressive consent**:
- **Automatic URL Opening**: Client opens URL in browser automatically (with user consent)
- **Progress Tracking**: Server can notify client when provisioning is complete
- **Error-Triggered Flows**: Server can return `ElicitationRequired` error to trigger provisioning
- **Better UX**: User doesn't manually copy/paste URLs
### Current Implementation Limitations
The current progressive consent flow in `nextcloud_mcp_server/server/oauth_tools.py`:
```python
@mcp.tool(name="provision_nextcloud_access")
async def tool_provision_access(ctx: Context) -> ProvisioningResult:
"""Returns OAuth URL as text - user must manually open it."""
return ProvisioningResult(
success=True,
authorization_url=auth_url, # User must copy this
message="Please visit the authorization URL..."
)
```
**Problems**:
1. Manual URL handling (copy/paste)
2. No progress tracking
3. No automatic retry after provisioning
4. Tool call required just to get URL
5. No client integration (URL just displayed as text)
## Decision
We will **migrate progressive consent from manual tools to URL mode elicitation**, leveraging SEP-1036 for better user experience and OAuth security.
### New Architecture: Elicitation-Driven Consent
Instead of explicit tools, use **automatic elicitation** triggered by authorization errors:
```
User → Calls Nextcloud Tool → Server Checks Provisioning
↓ Not Provisioned
Error: ElicitationRequired
Client Shows Consent UI
↓ User Accepts
Client Opens OAuth URL
User Completes OAuth
Server Sends Progress Update
Original Tool Call Auto-Retries
```
### Mode 1: Elicitation-Required Error (Primary)
When a tool requires provisioning, return an **ElicitationRequired error** (-32000):
```python
# In any Nextcloud tool decorated with @require_provisioning
@mcp.tool()
@require_provisioning # New decorator
async def nc_notes_list_notes(ctx: Context):
"""List notes - auto-triggers provisioning if needed."""
# If not provisioned, decorator returns ElicitationRequired error
# If provisioned, continues normally
client = await get_client(ctx)
return await client.notes.list_notes()
```
**Error response structure**:
```json
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32000,
"message": "Nextcloud access provisioning required",
"data": {
"elicitations": [
{
"mode": "url",
"elicitationId": "550e8400-e29b-41d4-a716-446655440000",
"url": "https://mcp.example.com/oauth/provision?id=550e8400...",
"message": "Grant the MCP server access to your Nextcloud account to continue."
}
]
}
}
}
```
**Client behavior**:
1. Receives error with elicitation
2. Shows consent UI: "App wants to access Nextcloud. Open authorization page?"
3. On user acceptance, opens URL in browser
4. Optionally tracks progress via `elicitation/track`
5. Auto-retries original tool call when complete
### Mode 2: Explicit Elicitation Request (Fallback)
For clients that don't support error-triggered elicitation, provide explicit tool:
```python
@mcp.tool(name="request_nextcloud_access")
async def request_access(ctx: Context) -> ElicitationResponse:
"""Explicitly request provisioning via elicitation."""
# Send elicitation/create request
return await create_elicitation(
mode="url",
url=generate_oauth_url(),
message="Grant access to Nextcloud",
elicitation_id=generate_id()
)
```
**Note**: This is a fallback for compatibility. Primary flow uses error-triggered elicitation.
## Implementation
### 1. New Decorator: `@require_provisioning`
Replace explicit provisioning checks with a decorator that returns `ElicitationRequired`:
```python
# nextcloud_mcp_server/auth/provisioning_decorator.py
def require_provisioning(func):
"""
Decorator that ensures user has provisioned Nextcloud access.
If not provisioned, returns ElicitationRequired error with OAuth URL.
Otherwise, proceeds with normal tool execution.
"""
@functools.wraps(func)
async def wrapper(ctx: Context, *args, **kwargs):
# Extract user ID from token
user_id = get_user_id_from_context(ctx)
# Check if provisioned
storage = RefreshTokenStorage.from_env()
await storage.initialize()
if not await storage.has_refresh_token(user_id):
# Not provisioned - return ElicitationRequired error
elicitation_id = str(uuid.uuid4())
oauth_url = await generate_oauth_url_for_provisioning(
user_id=user_id,
elicitation_id=elicitation_id,
ctx=ctx
)
# Store elicitation for tracking
await storage.store_elicitation(
elicitation_id=elicitation_id,
user_id=user_id,
status="pending",
created_at=datetime.now(timezone.utc)
)
raise McpError(
code=ErrorCode.ELICITATION_REQUIRED, # -32000
message="Nextcloud access provisioning required",
data={
"elicitations": [
{
"mode": "url",
"elicitationId": elicitation_id,
"url": oauth_url,
"message": (
"Grant the MCP server access to your Nextcloud "
"account to continue. This is a one-time setup."
)
}
]
}
)
# Already provisioned - proceed normally
return await func(ctx, *args, **kwargs)
return wrapper
```
### 2. Elicitation Tracking Endpoint
Implement `elicitation/track` to provide progress updates:
```python
# nextcloud_mcp_server/server/elicitation.py
@mcp.request_handler("elicitation/track")
async def track_elicitation(
elicitation_id: str,
_meta: dict = None
) -> dict:
"""
Track progress of an elicitation request.
Returns when elicitation is complete or times out.
"""
progress_token = _meta.get("progressToken") if _meta else None
storage = RefreshTokenStorage.from_env()
await storage.initialize()
# Poll for completion (with timeout)
timeout = 300 # 5 minutes
start_time = datetime.now(timezone.utc)
while (datetime.now(timezone.utc) - start_time).seconds < timeout:
elicitation = await storage.get_elicitation(elicitation_id)
if not elicitation:
raise McpError(
code=-32602, # Invalid params
message=f"Unknown elicitation ID: {elicitation_id}"
)
# Send progress notification if token provided
if progress_token and elicitation["status"] == "pending":
await send_progress_notification(
progress_token=progress_token,
progress=50,
message="Waiting for OAuth authorization..."
)
# Check if complete
if elicitation["status"] == "complete":
return {"status": "complete"}
# Check if failed
if elicitation["status"] == "failed":
return {
"status": "failed",
"error": elicitation.get("error_message")
}
# Wait before polling again
await asyncio.sleep(2)
# Timeout
raise McpError(
code=-32000,
message="Elicitation timed out - user did not complete authorization"
)
```
### 3. OAuth Callback Updates
Update the OAuth callback to mark elicitations as complete:
```python
# nextcloud_mcp_server/auth/oauth_routes.py
async def oauth_callback(request: Request) -> Response:
"""Handle OAuth callback and mark elicitation complete."""
code = request.query_params.get("code")
state = request.query_params.get("state")
# Validate and exchange code for tokens
tokens = await exchange_authorization_code(code)
# Store refresh token
await storage.store_refresh_token(
user_id=user_id,
refresh_token=tokens["refresh_token"]
)
# Mark elicitation as complete
elicitation_id = request.query_params.get("elicitation_id")
if elicitation_id:
await storage.update_elicitation(
elicitation_id=elicitation_id,
status="complete",
completed_at=datetime.now(timezone.utc)
)
return Response(
content="<h1>Authorization Complete!</h1>"
"<p>You can close this window and return to the application.</p>",
media_type="text/html"
)
```
### 4. Update All Nextcloud Tools
Add `@require_provisioning` decorator to all Nextcloud tools:
```python
# nextcloud_mcp_server/server/notes.py
@mcp.tool()
@require_scopes("notes:read")
@require_provisioning # NEW: Auto-triggers provisioning
async def nc_notes_list_notes(
ctx: Context,
category: Optional[str] = None
) -> NotesListResponse:
"""List all notes - automatically handles provisioning."""
client = await get_client(ctx)
# Tool logic proceeds only if provisioned
notes = await client.notes.list_notes(category=category)
return NotesListResponse(results=notes)
```
### 5. Capability Declaration
Declare URL elicitation support during initialization:
```python
# nextcloud_mcp_server/app.py
capabilities = {
"elicitation": {
"url": {} # Declare URL mode support
# Note: We don't support "form" mode (in-band data collection)
},
# ... other capabilities
}
```
### 6. Environment Variables
**New variables**:
```bash
# ELICITATION_CALLBACK_URL: Base URL for OAuth callbacks with elicitation tracking
# Default: NEXTCLOUD_MCP_SERVER_URL + /oauth/callback
ELICITATION_CALLBACK_URL=http://localhost:8000/oauth/callback
# ELICITATION_TIMEOUT_SECONDS: How long to wait for user to complete OAuth
# Default: 300 (5 minutes)
ELICITATION_TIMEOUT_SECONDS=300
```
**Removed variables** (no longer needed):
```bash
# ENABLE_PROGRESSIVE_CONSENT - removed, now always enabled in OAuth mode
# MCP_SERVER_CLIENT_ID - merged into OIDC_CLIENT_ID
```
## User Experience Comparison
### Before (ADR-004 Manual Tools)
```
User: "List my notes"
Assistant: *calls nc_notes_list_notes*
Server: Error - not provisioned
Assistant: "You need to provision access first. Let me do that."
Assistant: *calls provision_nextcloud_access*
Server: {authorization_url: "https://..."}
Assistant: "Please visit this URL: https://..."
User: *copies URL, opens browser, completes OAuth*
User: "OK, I'm done"
Assistant: *calls nc_notes_list_notes again*
Server: Success! [notes...]
```
**Issues**: 4 interactions, manual URL handling, no automation
### After (ADR-006 Elicitation)
```
User: "List my notes"
Assistant: *calls nc_notes_list_notes*
Server: ElicitationRequired error
Client: Shows dialog: "Grant access to Nextcloud? [Yes] [No]"
User: *clicks Yes*
Client: Opens OAuth URL in browser automatically
User: *completes OAuth*
Server: Sends progress notification "Complete!"
Client: Auto-retries nc_notes_list_notes
Server: Success! [notes...]
Assistant: "Here are your notes: ..."
```
**Benefits**: 1 interaction, automatic URL opening, seamless retry
## Migration Path
### Phase 1: Add Elicitation Support (v0.26.0)
- Implement `@require_provisioning` decorator
- Add `elicitation/track` endpoint
- Keep existing tools (`provision_nextcloud_access`) for compatibility
- Update OAuth callback to track elicitations
- Add capability declaration
**Breaking changes**: None (additive)
### Phase 2: Update Documentation (v0.27.0)
- Document elicitation-based flow as primary
- Mark manual tools as deprecated
- Update examples and guides
**Breaking changes**: None (documentation only)
### Phase 3: Remove Manual Tools (v0.28.0)
- Remove `provision_nextcloud_access` tool
- Remove `check_provisioning_status` tool (status in error message)
- Remove `revoke_nextcloud_access` (or keep for explicit revocation?)
**Breaking changes**: Yes (removed tools)
### Phase 4: Optimize (v0.29.0+)
- Add elicitation result caching
- Implement retry strategies
- Add metrics and monitoring
## Testing
### Test Cases
1. **First-Time User Flow**
```python
@pytest.mark.oauth
async def test_elicitation_first_time_user(nc_mcp_oauth_client):
"""Test that first tool call triggers elicitation."""
# User has no provisioning
with pytest.raises(McpError) as exc:
await nc_mcp_oauth_client.call_tool("nc_notes_list_notes")
# Should get ElicitationRequired error
assert exc.value.code == -32000
assert "elicitations" in exc.value.data
assert exc.value.data["elicitations"][0]["mode"] == "url"
# Verify URL is valid OAuth URL
url = exc.value.data["elicitations"][0]["url"]
assert "oauth" in url
assert "elicitationId" in url
```
2. **Progress Tracking**
```python
@pytest.mark.oauth
async def test_elicitation_progress_tracking(nc_mcp_oauth_client):
"""Test progress tracking during OAuth flow."""
# Trigger elicitation
elicitation_id = trigger_elicitation()
# Start tracking
track_task = asyncio.create_task(
nc_mcp_oauth_client.track_elicitation(
elicitation_id=elicitation_id,
progress_token="test-token"
)
)
# Simulate OAuth completion
await asyncio.sleep(1)
await complete_oauth_flow(elicitation_id)
# Track should complete
result = await track_task
assert result["status"] == "complete"
```
3. **Auto-Retry After Provisioning**
```python
@pytest.mark.oauth
async def test_auto_retry_after_provisioning(nc_mcp_oauth_client):
"""Test that client auto-retries after elicitation."""
# Mock client that auto-retries on ElicitationRequired
client = AutoRetryMcpClient(nc_mcp_oauth_client)
# First call triggers elicitation, client handles it, retries
result = await client.call_tool_with_elicitation("nc_notes_list_notes")
# Should succeed after provisioning
assert result.success
assert "notes" in result.data
```
4. **Timeout Handling**
```python
@pytest.mark.oauth
async def test_elicitation_timeout(nc_mcp_oauth_client):
"""Test timeout if user doesn't complete OAuth."""
elicitation_id = trigger_elicitation()
# Track with short timeout
with pytest.raises(McpError, match="timed out"):
await nc_mcp_oauth_client.track_elicitation(
elicitation_id=elicitation_id,
timeout=5 # 5 seconds
)
```
## Security Considerations
### Out-of-Band OAuth Flow
**Benefit**: OAuth credentials never pass through MCP client
- User enters credentials directly on IdP page
- MCP server receives only authorization code
- Client never sees passwords or refresh tokens
**Threat mitigation**:
- **Credential theft**: Client can't intercept credentials (out-of-band)
- **Token exposure**: Client never receives Nextcloud refresh tokens
- **CSRF**: State parameter validates OAuth callback
- **URL tampering**: Elicitation ID ties OAuth flow to user session
### Elicitation ID as Security Token
The `elicitationId` serves as a capability token:
- Cryptographically random (UUID v4)
- Single-use (invalidated after completion)
- Time-limited (expires after timeout)
- User-scoped (tied to user session)
**Validation**:
```python
async def validate_elicitation_id(elicitation_id: str, user_id: str) -> bool:
"""Validate that elicitation belongs to user and is still valid."""
elicitation = await storage.get_elicitation(elicitation_id)
if not elicitation:
return False
# Check ownership
if elicitation["user_id"] != user_id:
logger.warning(f"Elicitation ID mismatch: {elicitation_id}")
return False
# Check expiry
if elicitation["expires_at"] < datetime.now(timezone.utc):
return False
# Check not already used
if elicitation["status"] != "pending":
return False
return True
```
### Progress Tracking Security
**Risk**: Progress token reuse across users
**Mitigation**:
- Progress tokens tied to elicitation ID
- Elicitation ID tied to user session
- Server validates ownership before sending updates
## Consequences
### Positive
1. **Better UX**: Automatic URL opening, no manual copy/paste
2. **Seamless Flow**: Auto-retry after provisioning
3. **Progress Feedback**: User knows when OAuth is complete
4. **Spec Compliance**: Implements SEP-1036 correctly
5. **Secure by Design**: Out-of-band OAuth prevents credential exposure
6. **Simpler API**: No explicit provisioning tools needed
### Negative
1. **Client Dependency**: Requires client support for URL elicitation
2. **Complexity**: More moving parts (elicitation tracking, callbacks)
3. **Polling**: Progress tracking uses polling (not ideal)
4. **Breaking Change**: Removes manual provisioning tools (in v0.28.0)
### Neutral
1. **Storage Requirements**: Need to store elicitation state
2. **Timeout Management**: Must handle long-running OAuth flows
3. **Fallback Support**: Still need compatibility for older clients
## Alternatives Considered
### 1. Keep Manual Tools Only (Rejected)
**Pros**: Simple, no client changes needed
**Cons**: Poor UX, doesn't leverage SEP-1036
**Rejection reason**: SEP-1036 provides better UX and security
### 2. Form Mode Elicitation (Rejected)
**Pros**: No browser redirect needed
**Cons**: Would expose OAuth credentials to client (security violation)
**Rejection reason**: Form mode only for non-sensitive data per SEP-1036
### 3. Hybrid: Both Tools and Elicitation (Considered)
**Pros**: Maximum compatibility, gradual migration
**Cons**: API duplication, maintenance burden, confusing for users
**Decision**: Support during migration (v0.26-0.27), remove in v0.28
### 4. WebSocket for Progress (Rejected)
**Pros**: Real-time updates instead of polling
**Cons**: MCP spec uses polling pattern, adds complexity
**Rejection reason**: Follow spec pattern (polling via elicitation/track)
## References
- [SEP-1036: URL Mode Elicitation](https://github.com/modelcontextprotocol/specification/pull/887)
- [MCP Elicitation Specification](https://modelcontextprotocol.io/specification/draft/client/elicitation)
- [ADR-004: Federated Authentication Architecture](./ADR-004-mcp-application-oauth.md)
- [ADR-005: Token Audience Validation](./ADR-005-token-audience-validation.md)
- [RFC 8252: OAuth 2.0 for Native Apps](https://datatracker.ietf.org/doc/html/rfc8252)
## Implementation Checklist
- [ ] Implement `@require_provisioning` decorator with ElicitationRequired error
- [ ] Add `elicitation/track` request handler
- [ ] Update OAuth callback to mark elicitations complete
- [ ] Add elicitation storage (ID, user, status, timestamps)
- [ ] Update all Nextcloud tools with `@require_provisioning`
- [ ] Add URL elicitation capability declaration
- [ ] Write integration tests for elicitation flow
- [ ] Write tests for progress tracking
- [ ] Update documentation with elicitation examples
- [ ] Add migration guide for manual tools → elicitation
- [ ] Keep manual tools with deprecation warnings (v0.26-0.27)
- [ ] Remove manual tools (v0.28.0)
- [ ] Update CHANGELOG.md with migration timeline
+71 -37
View File
@@ -27,9 +27,7 @@ from nextcloud_mcp_server.auth import (
has_required_scopes,
is_jwt_token,
)
from nextcloud_mcp_server.auth.progressive_token_verifier import (
ProgressiveConsentTokenVerifier,
)
from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import (
LOGGING_CONFIG,
@@ -215,9 +213,7 @@ class OAuthAppContext:
"""Application context for OAuth mode."""
nextcloud_host: str
token_verifier: (
object # Can be NextcloudTokenVerifier or ProgressiveConsentTokenVerifier
)
token_verifier: object # UnifiedTokenVerifier (ADR-005 compliant)
refresh_token_storage: Optional["RefreshTokenStorage"] = None
oauth_client: Optional[object] = None # NextcloudOAuthClient or KeycloakOAuthClient
oauth_provider: str = "nextcloud" # "nextcloud" or "keycloak"
@@ -555,46 +551,80 @@ async def setup_oauth_config():
else:
client_issuer = issuer
# Progressive Consent mode (always enabled) - dual OAuth flows with audience separation
logger.info("✓ Progressive Consent mode enabled - dual OAuth flows active")
# ADR-005: Unified Token Verifier with proper audience validation
# Get MCP server URL for audience validation
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
nextcloud_resource_uri = os.getenv("NEXTCLOUD_RESOURCE_URI", nextcloud_host)
# Get encryption key for token broker
encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
if not encryption_key:
# Warn if resource URIs are not configured (required for ADR-005 compliance)
if not os.getenv("NEXTCLOUD_MCP_SERVER_URL"):
logger.warning(
"TOKEN_ENCRYPTION_KEY not set - token broker will not be available"
f"NEXTCLOUD_MCP_SERVER_URL not set, defaulting to: {mcp_server_url}. "
"This should be set explicitly for proper audience validation."
)
if not os.getenv("NEXTCLOUD_RESOURCE_URI"):
logger.warning(
f"NEXTCLOUD_RESOURCE_URI not set, defaulting to: {nextcloud_resource_uri}. "
"This should be set explicitly for proper audience validation."
)
# Create token broker service
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
# Create settings for UnifiedTokenVerifier
from nextcloud_mcp_server.config import get_settings
token_broker = None
if encryption_key and refresh_token_storage:
token_broker = TokenBrokerService(
storage=refresh_token_storage,
oidc_discovery_url=discovery_url,
nextcloud_host=nextcloud_host,
encryption_key=encryption_key,
settings = get_settings()
# Override with discovered values if not set in environment
if not settings.oidc_client_id:
settings.oidc_client_id = client_id
if not settings.oidc_client_secret:
settings.oidc_client_secret = client_secret
if not settings.jwks_uri:
settings.jwks_uri = jwks_uri
if not settings.introspection_uri:
settings.introspection_uri = introspection_uri
if not settings.userinfo_uri:
settings.userinfo_uri = userinfo_uri
if not settings.oidc_issuer:
# Use client_issuer which handles public URL override
settings.oidc_issuer = client_issuer
if not settings.nextcloud_mcp_server_url:
settings.nextcloud_mcp_server_url = mcp_server_url
if not settings.nextcloud_resource_uri:
settings.nextcloud_resource_uri = nextcloud_resource_uri
# Create Unified Token Verifier (ADR-005 compliant)
token_verifier = UnifiedTokenVerifier(settings)
# Log the mode
enable_token_exchange = (
os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true"
)
if enable_token_exchange:
logger.info(
"✓ Token Exchange mode enabled (ADR-005) - exchanging MCP tokens for Nextcloud tokens via RFC 8693"
)
logger.info("✓ Token Broker service initialized for audience-specific tokens")
logger.info(f" MCP audience: {client_id} or {mcp_server_url}")
logger.info(f" Nextcloud audience: {nextcloud_resource_uri}")
else:
logger.info(
"✓ Multi-audience mode enabled (ADR-005) - tokens must contain both MCP and Nextcloud audiences"
)
logger.info(f" Required MCP audience: {client_id} or {mcp_server_url}")
logger.info(f" Required Nextcloud audience: {nextcloud_resource_uri}")
# Create Progressive Consent token verifier
token_verifier = ProgressiveConsentTokenVerifier(
token_storage=refresh_token_storage,
token_broker=token_broker,
oidc_discovery_url=discovery_url,
nextcloud_host=nextcloud_host,
encryption_key=encryption_key,
mcp_client_id=client_id,
introspection_uri=introspection_uri,
client_secret=client_secret,
)
logger.info(
"✓ Progressive Consent verifier configured - enforcing audience separation"
)
if introspection_uri:
logger.info("✓ Opaque token introspection enabled (RFC 7662)")
if jwks_uri:
logger.info("✓ JWT signature verification enabled (JWKS)")
# Progressive Consent mode (for offline access / background jobs)
encryption_key = os.getenv("TOKEN_ENCRYPTION_KEY")
if enable_offline_access and encryption_key and refresh_token_storage:
logger.info("✓ Progressive Consent mode enabled - offline access available")
# Note: Token Broker service would be initialized here for background job support
# Currently not used in ADR-005 implementation as it's specific to offline access patterns
# that are separate from the real-time token exchange flow
logger.debug("Token broker available for future offline access features")
# Create OAuth client for server-initiated flows (e.g., token exchange, background workers)
oauth_client = None
@@ -837,6 +867,9 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
mcp_server_url = os.getenv(
"NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000"
)
nextcloud_resource_uri = os.getenv(
"NEXTCLOUD_RESOURCE_URI", nextcloud_host
)
discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{nextcloud_host}/.well-known/openid-configuration",
@@ -854,6 +887,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
"client_secret": client_secret, # From setup_oauth_config (DCR or static)
"scopes": scopes,
"nextcloud_host": nextcloud_host,
"nextcloud_resource_uri": nextcloud_resource_uri,
"oauth_provider": oauth_provider,
},
}
+2 -2
View File
@@ -14,11 +14,11 @@ from .scope_authorization import (
is_jwt_token,
require_scopes,
)
from .token_verifier import NextcloudTokenVerifier
from .unified_verifier import UnifiedTokenVerifier
__all__ = [
"BearerAuth",
"NextcloudTokenVerifier",
"UnifiedTokenVerifier",
"register_client",
"ensure_oauth_client",
"get_client_from_context",
+88 -36
View File
@@ -1,6 +1,11 @@
"""Helper functions for extracting OAuth context from MCP requests."""
"""Helper functions for extracting OAuth context from MCP requests.
ADR-005 compliant implementation with token exchange caching.
"""
import hashlib
import logging
import time
from mcp.server.auth.provider import AccessToken
from mcp.server.fastmcp import Context
@@ -11,35 +16,36 @@ from .token_exchange import exchange_token_for_audience
logger = logging.getLogger(__name__)
# Token exchange cache: token_hash -> (exchanged_token, expiry_timestamp)
_exchange_cache: dict[str, tuple[str, float]] = {}
def get_client_from_context(ctx: Context, base_url: str) -> NextcloudClient:
"""
Extract authenticated user context from MCP request and create NextcloudClient.
Create NextcloudClient for multi-audience mode (no exchange needed).
This function retrieves the OAuth access token from the MCP context,
extracts the username from the token's resource field (where we stored it
during token verification), and creates a NextcloudClient with bearer auth.
ADR-005 Mode 1: Use multi-audience tokens directly.
The UnifiedTokenVerifier validated MCP audience per RFC 7519.
Nextcloud will independently validate its own audience.
Args:
ctx: MCP request context containing session info
base_url: Nextcloud base URL
Returns:
NextcloudClient configured with bearer token auth
NextcloudClient configured with multi-audience token
Raises:
AttributeError: If context doesn't contain expected OAuth session data
ValueError: If username cannot be extracted from token
"""
try:
# In Starlette with FastMCP OAuth, the authenticated user info is stored in request.user
# The FastMCP auth middleware sets request.user to an AuthenticatedUser object
# which contains the access_token
# Extract validated access token from MCP context
if hasattr(ctx.request_context.request, "user") and hasattr(
ctx.request_context.request.user, "access_token"
):
access_token: AccessToken = ctx.request_context.request.user.access_token
logger.debug("Retrieved access token from request.user for OAuth request")
logger.debug("Retrieved multi-audience token from request.user")
else:
logger.error(
"OAuth authentication failed: No access token found in request"
@@ -47,16 +53,20 @@ def get_client_from_context(ctx: Context, base_url: str) -> NextcloudClient:
raise AttributeError("No access token found in OAuth request context")
# Extract username from resource field (RFC 8707)
# We stored the username here during token verification
# UnifiedTokenVerifier stored the username here during validation
username = access_token.resource
if not username:
logger.error("No username found in access token resource field")
raise ValueError("Username not available in OAuth token context")
logger.debug(f"Creating OAuth NextcloudClient for user: {username}")
logger.debug(
f"Creating NextcloudClient for user {username} with multi-audience token "
f"(no exchange needed)"
)
# Create client with bearer token
# Token was validated to have MCP audience
# Nextcloud will validate its own audience independently
return NextcloudClient.from_token(
base_url=base_url, token=access_token.token, username=username
)
@@ -71,12 +81,19 @@ async def get_session_client_from_context(
ctx: Context, base_url: str
) -> NextcloudClient:
"""
Create NextcloudClient using RFC 8693 token exchange for session operations.
Create NextcloudClient using RFC 8693 token exchange with caching.
ADR-005 Mode 2: Exchange MCP token for Nextcloud token via RFC 8693.
This implements the token exchange pattern where:
1. Extract Flow 1 token from context (aud: "mcp-server")
2. Exchange it for ephemeral Nextcloud token via RFC 8693
3. Create client with delegated token (NOT stored)
1. Extract MCP token from context (validated by UnifiedTokenVerifier)
2. Check cache for existing exchanged token
3. If not cached or expired, exchange via RFC 8693
4. Cache the exchanged token to minimize exchange frequency
5. Create client with exchanged token
CRITICAL: This is where token exchange happens, NOT in the verifier.
The verifier already validated the MCP audience; now we exchange for Nextcloud.
Note: Nextcloud doesn't support OAuth scopes natively. Scopes are enforced
by the MCP server via @require_scopes decorator, not by the IdP. Therefore,
@@ -88,7 +105,7 @@ async def get_session_client_from_context(
base_url: Nextcloud base URL
Returns:
NextcloudClient configured with ephemeral delegated token
NextcloudClient configured with ephemeral exchanged token
Raises:
AttributeError: If context doesn't contain expected OAuth session data
@@ -96,43 +113,60 @@ async def get_session_client_from_context(
"""
settings = get_settings()
# Check if token exchange is enabled
if not settings.enable_token_exchange:
logger.info("Token exchange disabled, falling back to standard OAuth flow")
return get_client_from_context(ctx, base_url)
try:
# Extract Flow 1 token from context
# Extract MCP token from context
if hasattr(ctx.request_context.request, "user") and hasattr(
ctx.request_context.request.user, "access_token"
):
access_token: AccessToken = ctx.request_context.request.user.access_token
flow1_token = access_token.token
username = access_token.resource # Username stored during verification
logger.debug(f"Retrieved Flow 1 token for user: {username}")
mcp_token = access_token.token
username = access_token.resource # Username from UnifiedTokenVerifier
logger.debug(f"Retrieved MCP token for user: {username}")
else:
logger.error("No Flow 1 token found in request context")
logger.error("No MCP token found in request context")
raise AttributeError("No access token found in OAuth request context")
if not username:
logger.error("No username found in access token resource field")
raise ValueError("Username not available in OAuth token context")
logger.info("Exchanging client token for Nextcloud API token (pure RFC 8693)")
# Check cache for existing exchanged token
cache_key = hashlib.sha256(mcp_token.encode()).hexdigest()
if cache_key in _exchange_cache:
cached_token, expiry = _exchange_cache[cache_key]
if time.time() < expiry:
logger.debug(
f"Using cached exchanged token (expires in {expiry - time.time():.1f}s)"
)
return NextcloudClient.from_token(
base_url=base_url, token=cached_token, username=username
)
else:
logger.debug("Cached token expired, removing from cache")
del _exchange_cache[cache_key]
# Perform pure RFC 8693 token exchange (no refresh tokens)
# Note: We don't pass scopes since Nextcloud doesn't enforce them.
# The MCP server's @require_scopes decorator handles authorization.
# Perform RFC 8693 token exchange
logger.info(f"Exchanging MCP token for Nextcloud API token (user: {username})")
# Exchange for Nextcloud resource URI audience
exchanged_token, expires_in = await exchange_token_for_audience(
subject_token=flow1_token,
requested_audience="nextcloud",
subject_token=mcp_token,
requested_audience=settings.nextcloud_resource_uri or "nextcloud",
requested_scopes=None, # Nextcloud doesn't support scopes
)
logger.info(f"Pure token exchange successful. Token expires in {expires_in}s")
logger.info(f"Token exchange successful. Token expires in {expires_in}s")
# Cache the exchanged token
# Use the minimum of exchange TTL and configured cache TTL
cache_ttl = min(expires_in, settings.token_exchange_cache_ttl)
_exchange_cache[cache_key] = (exchanged_token, time.time() + cache_ttl)
logger.debug(f"Cached exchanged token for {cache_ttl}s")
# Clean up expired cache entries
_cleanup_exchange_cache()
# Create client with exchanged token
# This token is ephemeral (per-request) and NOT stored
return NextcloudClient.from_token(
base_url=base_url, token=exchanged_token, username=username
)
@@ -143,3 +177,21 @@ async def get_session_client_from_context(
except Exception as e:
logger.error(f"Token exchange failed: {e}")
raise RuntimeError(f"Token exchange required but failed: {e}") from e
def _cleanup_exchange_cache():
"""Remove expired entries from the token exchange cache."""
global _exchange_cache
now = time.time()
expired_keys = [k for k, (_, expiry) in _exchange_cache.items() if expiry <= now]
for key in expired_keys:
del _exchange_cache[key]
if expired_keys:
logger.debug(f"Cleaned up {len(expired_keys)} expired cache entries")
def clear_exchange_cache():
"""Clear the entire token exchange cache. Useful for testing."""
global _exchange_cache
_exchange_cache.clear()
logger.debug("Token exchange cache cleared")
@@ -252,6 +252,7 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
"scope": scopes,
"state": idp_state,
"prompt": "consent", # Ensure refresh token
"resource": f"{oauth_config['mcp_server_url']}/mcp", # MCP server audience
}
auth_url = f"{authorization_endpoint}?{urlencode(idp_params)}"
@@ -359,6 +360,7 @@ async def oauth_authorize_nextcloud(
"state": state,
"prompt": "consent", # Force consent to show resource access
"access_type": "offline", # Request refresh token
"resource": oauth_config["nextcloud_resource_uri"], # Nextcloud audience
}
auth_url = f"{authorization_endpoint}?{urlencode(idp_params)}"
@@ -1,366 +0,0 @@
"""
Token Verifier for ADR-004 Progressive Consent Architecture.
This module implements token verification with strict audience separation:
- Flow 1 tokens have aud: <mcp-client-id> for MCP authentication
- Flow 2 tokens have aud: "nextcloud" for resource access
- Token Broker manages the exchange between audiences
"""
import logging
import os
from datetime import datetime, timezone
from typing import Optional
import httpx
import jwt
from mcp.server.auth.provider import AccessToken
from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
logger = logging.getLogger(__name__)
class ProgressiveConsentTokenVerifier:
"""
Token verifier for Progressive Consent dual OAuth flows.
This verifier:
1. Validates Flow 1 tokens (aud: <mcp-client-id>) for MCP authentication
2. Checks if user has provisioned Nextcloud access (Flow 2)
3. Uses Token Broker to obtain aud: "nextcloud" tokens when needed
"""
def __init__(
self,
token_storage: RefreshTokenStorage | None,
token_broker: Optional[TokenBrokerService] = None,
oidc_discovery_url: Optional[str] = None,
nextcloud_host: Optional[str] = None,
encryption_key: Optional[str] = None,
mcp_client_id: Optional[str] = None,
introspection_uri: Optional[str] = None,
client_secret: Optional[str] = None,
):
"""
Initialize the Progressive Consent token verifier.
Args:
token_storage: Storage for refresh tokens
token_broker: Token broker service (created if not provided)
oidc_discovery_url: OIDC provider discovery URL
nextcloud_host: Nextcloud server URL
encryption_key: Fernet key for token encryption
mcp_client_id: MCP server OAuth client ID for audience validation
introspection_uri: OAuth introspection endpoint URL (for opaque tokens)
client_secret: OAuth client secret (required for introspection)
"""
self.storage = token_storage
self.oidc_discovery_url = oidc_discovery_url or os.getenv(
"OIDC_DISCOVERY_URL",
f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration",
)
self.nextcloud_host = nextcloud_host or os.getenv("NEXTCLOUD_HOST")
self.encryption_key = encryption_key or os.getenv("TOKEN_ENCRYPTION_KEY")
self.mcp_client_id = mcp_client_id or os.getenv("OIDC_CLIENT_ID")
self.introspection_uri = introspection_uri
self.client_secret = client_secret or os.getenv("OIDC_CLIENT_SECRET")
# HTTP client for introspection requests
self._http_client: Optional[httpx.AsyncClient] = None
if self.introspection_uri and self.mcp_client_id and self.client_secret:
self._http_client = httpx.AsyncClient(timeout=10.0)
logger.info(f"Introspection support enabled: {introspection_uri}")
elif self.introspection_uri:
logger.warning(
"Introspection URI provided but missing client credentials - introspection disabled"
)
# Create token broker if not provided
if token_broker:
self.token_broker = token_broker
elif self.encryption_key and token_storage and self.nextcloud_host:
self.token_broker = TokenBrokerService(
storage=token_storage,
oidc_discovery_url=self.oidc_discovery_url,
nextcloud_host=self.nextcloud_host,
encryption_key=self.encryption_key,
)
else:
self.token_broker = None
if not self.encryption_key:
logger.warning("Token broker not available - encryption key missing")
elif not token_storage:
logger.warning("Token broker not available - token storage missing")
elif not self.nextcloud_host:
logger.warning("Token broker not available - nextcloud host missing")
async def verify_token(self, token: str) -> Optional[AccessToken]:
"""
Verify a Flow 1 token (aud: <mcp-client-id>).
This validates that:
1. Token has correct audience for MCP server (matches client ID)
2. Token is not expired
3. Token has valid signature (if verification enabled)
Supports both JWT and opaque tokens:
- JWT tokens: Decoded directly from payload
- Opaque tokens: Validated via introspection endpoint (RFC 7662)
Args:
token: Access token from Flow 1 (JWT or opaque)
Returns:
AccessToken if valid, None otherwise
"""
logger.info("🔐 verify_token called - attempting to validate token")
logger.info(f"Token (first 50 chars): {token[:50]}...")
logger.info(f"Expected MCP client ID: {self.mcp_client_id}")
# Check if token is JWT format (has 3 parts separated by dots)
is_jwt = "." in token and token.count(".") == 2
logger.info(f"Token format: {'JWT' if is_jwt else 'opaque'}")
if is_jwt:
# Try JWT verification
return await self._verify_jwt_token(token)
else:
# Fall back to introspection for opaque tokens
return await self._verify_opaque_token(token)
async def _verify_jwt_token(self, token: str) -> Optional[AccessToken]:
"""Verify JWT token by decoding payload."""
try:
# Decode without signature verification (IdP handles that)
# In production, would verify signature with IdP public key
payload = jwt.decode(token, options={"verify_signature": False})
logger.info(f"Token payload decoded: {payload}")
# CRITICAL: Verify audience is for MCP server (Flow 1)
audiences = payload.get("aud", [])
if isinstance(audiences, str):
audiences = [audiences]
# Audience validation:
# - Accept tokens with no audience (will validate via introspection if needed)
# - Accept tokens with MCP client ID in audience (Keycloak multi-audience)
# - Accept tokens with resource URL in audience (Nextcloud JWT redirect URI)
# - Reject tokens with "nextcloud" audience only (wrong flow)
if audiences:
# Check if MCP client ID is in the audience (Keycloak multi-audience)
if self.mcp_client_id in audiences:
logger.debug(
f"Token has audience {audiences} - MCP client ID present"
)
# Check if this is a Nextcloud-only token (wrong flow)
elif audiences == ["nextcloud"]:
logger.warning(
f"Token rejected: Nextcloud-only audience {audiences}"
)
logger.error(
"Received Nextcloud token in MCP context - "
"client may be using wrong token"
)
return None
# Otherwise accept (likely resource URL audience from Nextcloud JWT)
else:
logger.info(
f"Token has audience {audiences} (resource URL or non-standard) - accepting"
)
else:
logger.info(
"Token has no audience claim - accepting for MCP server validation"
)
# Check expiry
exp = payload.get("exp", 0)
if exp < datetime.now(timezone.utc).timestamp():
logger.warning(
f"❌ Token expired: exp={exp}, now={datetime.now(timezone.utc).timestamp()}"
)
return None
# Extract user info
user_id = payload.get("sub", "unknown")
client_id = payload.get("client_id", "unknown")
scopes = payload.get("scope", "").split()
exp = payload.get("exp", None)
logger.info(
f"✅ Token validation successful! user={user_id}, scopes={scopes}"
)
# Create AccessToken for MCP framework
return AccessToken(
token=token,
client_id=client_id,
scopes=scopes,
expires_at=exp,
resource=user_id, # Store user_id in resource field (RFC 8707)
)
except jwt.InvalidTokenError as e:
logger.warning(f"❌ Invalid token (JWT decode failed): {e}")
return None
except Exception as e:
logger.error(f"❌ Token verification failed with exception: {e}")
return None
async def _verify_opaque_token(self, token: str) -> Optional[AccessToken]:
"""
Verify opaque token via introspection endpoint (RFC 7662).
Args:
token: Opaque access token
Returns:
AccessToken if active and valid, None otherwise
"""
if not self._http_client or not self.introspection_uri:
logger.error(
"❌ Cannot verify opaque token - introspection not configured. "
"Set introspection_uri and client credentials."
)
return None
try:
logger.info(f"Introspecting token at {self.introspection_uri}")
# Call introspection endpoint (requires client authentication)
response = await self._http_client.post(
self.introspection_uri,
data={"token": token},
auth=(self.mcp_client_id, self.client_secret),
)
if response.status_code != 200:
logger.warning(
f"❌ Introspection failed: HTTP {response.status_code} - {response.text[:200]}"
)
return None
introspection_data = response.json()
logger.info(f"Introspection response: {introspection_data}")
# Check if token is active
if not introspection_data.get("active", False):
logger.warning("❌ Token introspection returned active=false")
return None
# Extract user info
user_id = introspection_data.get("sub") or introspection_data.get(
"username"
)
if not user_id:
logger.error("❌ No username found in introspection response")
return None
# Extract scopes (space-separated string)
scope_string = introspection_data.get("scope", "")
scopes = scope_string.split() if scope_string else []
# Extract client ID and expiration
client_id = introspection_data.get("client_id", "unknown")
exp = introspection_data.get("exp")
logger.info(f"✅ Opaque token validated! user={user_id}, scopes={scopes}")
return AccessToken(
token=token,
client_id=client_id,
scopes=scopes,
expires_at=int(exp) if exp else None,
resource=user_id,
)
except httpx.TimeoutException:
logger.error("❌ Timeout while introspecting token")
return None
except httpx.RequestError as e:
logger.error(f"❌ Network error during introspection: {e}")
return None
except Exception as e:
logger.error(f"❌ Introspection failed with exception: {e}")
return None
async def check_provisioning(self, user_id: str) -> bool:
"""
Check if user has provisioned Nextcloud access (Flow 2).
Args:
user_id: User identifier from Flow 1 token
Returns:
True if user has completed Flow 2, False otherwise
"""
if not self.storage:
return False
refresh_data = await self.storage.get_refresh_token(user_id)
return refresh_data is not None
async def get_nextcloud_token(self, user_id: str) -> Optional[str]:
"""
Get a Nextcloud access token (aud: "nextcloud") for the user.
This uses the Token Broker to:
1. Check for cached Nextcloud token
2. If expired, refresh using stored master refresh token
3. Return token with aud: "nextcloud" for API access
Args:
user_id: User identifier from Flow 1 token
Returns:
Nextcloud access token if provisioned, None otherwise
"""
if not self.token_broker:
logger.error("Token broker not available")
return None
# Check if user has provisioned access
if not await self.check_provisioning(user_id):
logger.info(f"User {user_id} has not provisioned Nextcloud access")
return None
# Get or refresh Nextcloud token
try:
nextcloud_token = await self.token_broker.get_nextcloud_token(user_id)
if nextcloud_token:
logger.debug(f"Obtained Nextcloud token for user {user_id}")
return nextcloud_token
except Exception as e:
logger.error(f"Failed to get Nextcloud token: {e}")
return None
async def validate_scopes(
self, token: AccessToken, required_scopes: list[str]
) -> bool:
"""
Validate that token has required scopes.
Args:
token: The access token
required_scopes: List of required scopes
Returns:
True if all required scopes present, False otherwise
"""
token_scopes = set(token.scopes) if token.scopes else set()
required = set(required_scopes)
missing = required - token_scopes
if missing:
logger.debug(f"Token missing required scopes: {missing}")
return False
return True
async def close(self):
"""Clean up resources."""
if self.token_broker:
await self.token_broker.close()
if self._http_client:
await self._http_client.aclose()
-492
View File
@@ -1,492 +0,0 @@
"""Token verification using Nextcloud OIDC userinfo endpoint."""
import logging
import time
from typing import Any
import httpx
import jwt
from jwt import PyJWKClient
from mcp.server.auth.provider import AccessToken, TokenVerifier
logger = logging.getLogger(__name__)
class NextcloudTokenVerifier(TokenVerifier):
"""
Validates access tokens using JWT verification with JWKS or userinfo endpoint fallback.
This verifier supports both JWT and opaque tokens:
1. For JWT tokens: Verifies signature with JWKS and extracts scopes from payload
2. For opaque tokens: Falls back to userinfo endpoint validation
3. Caches successful responses to avoid repeated API calls/verifications
JWT validation provides:
- Faster validation (no HTTP call needed)
- Direct scope extraction from token payload
- Signature verification using JWKS
Userinfo fallback provides:
- Support for opaque tokens
- Backward compatibility
- Additional validation layer
"""
def __init__(
self,
nextcloud_host: str,
userinfo_uri: str,
jwks_uri: str | None = None,
issuer: str | None = None,
introspection_uri: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
cache_ttl: int = 3600,
):
"""
Initialize the token verifier.
Args:
nextcloud_host: Base URL of the Nextcloud instance (e.g., https://cloud.example.com)
userinfo_uri: Full URL to the userinfo endpoint
jwks_uri: Full URL to the JWKS endpoint (for JWT verification)
issuer: Expected issuer claim value (for JWT verification)
introspection_uri: Full URL to the introspection endpoint (for opaque tokens)
client_id: OAuth client ID (required for introspection)
client_secret: OAuth client secret (required for introspection)
cache_ttl: Time-to-live for cached tokens in seconds (default: 3600)
"""
self.nextcloud_host = nextcloud_host.rstrip("/")
self.userinfo_uri = userinfo_uri
self.jwks_uri = jwks_uri
self.issuer = issuer
self.introspection_uri = introspection_uri
self.client_id = client_id
self.client_secret = client_secret
self.cache_ttl = cache_ttl
# Cache: token -> (userinfo, expiry_timestamp)
self._token_cache: dict[str, tuple[dict[str, Any], float]] = {}
# HTTP client for userinfo/introspection requests
self._client = httpx.AsyncClient(timeout=10.0)
# PyJWKClient for JWT verification (lazy initialization)
self._jwks_client: PyJWKClient | None = None
if jwks_uri:
logger.info(f"JWT verification enabled with JWKS URI: {jwks_uri}")
self._jwks_client = PyJWKClient(jwks_uri, cache_keys=True)
# Introspection support
if introspection_uri and client_id and client_secret:
logger.info(f"Token introspection enabled: {introspection_uri}")
elif introspection_uri:
logger.warning(
"Introspection URI provided but missing client credentials - introspection disabled"
)
async def verify_token(self, token: str) -> AccessToken | None:
"""
Verify a bearer token using JWT verification, introspection, or userinfo endpoint.
This method:
1. Checks the cache first for recent validations
2. Attempts JWT verification if JWKS is configured and token looks like JWT
3. Falls back to introspection for opaque tokens (if configured)
4. Falls back to userinfo endpoint as last resort
5. Returns AccessToken with username and scopes
Args:
token: The bearer token to verify
Returns:
AccessToken if valid, None if invalid or expired
"""
# Check cache first
cached = self._get_cached_token(token)
if cached:
logger.debug("Token found in cache")
return cached
# Try JWT verification first if enabled and token looks like JWT
is_jwt_format = self._is_jwt_format(token)
logger.debug(
f"Token format check: is_jwt_format={is_jwt_format}, _jwks_client={self._jwks_client is not None}"
)
if self._jwks_client and is_jwt_format:
logger.debug("Attempting JWT verification...")
jwt_result = self._verify_jwt(token)
if jwt_result:
logger.info("Token validated via JWT verification")
return jwt_result
else:
logger.warning("JWT verification failed, will try other methods")
# For opaque tokens, try introspection if available
if self.introspection_uri and self.client_id and self.client_secret:
logger.debug("Attempting token introspection...")
try:
introspection_result = await self._verify_via_introspection(token)
if introspection_result:
logger.info("Token validated via introspection")
return introspection_result
except Exception as e:
logger.warning(f"Introspection failed: {e}")
# Fall back to userinfo endpoint validation (last resort)
logger.debug("Attempting userinfo endpoint validation...")
try:
return await self._verify_via_userinfo(token)
except Exception as e:
logger.warning(f"Token verification failed: {e}")
return None
def _is_jwt_format(self, token: str) -> bool:
"""
Check if token looks like a JWT (has 3 parts separated by dots).
Args:
token: The token to check
Returns:
True if token appears to be JWT format
"""
return "." in token and token.count(".") == 2
def _verify_jwt(self, token: str) -> AccessToken | None:
"""
Verify JWT token with signature validation using JWKS.
Args:
token: The JWT token to verify
Returns:
AccessToken if valid, None if invalid
"""
try:
# Get signing key from JWKS
assert self._jwks_client is not None # Caller should check before calling
signing_key = self._jwks_client.get_signing_key_from_jwt(token)
# Verify and decode JWT
# Accept tokens with audience: "mcp-server" or ["mcp-server", "nextcloud"]
# This allows:
# 1. Tokens from MCP clients (aud: "mcp-server")
# 2. Tokens for Nextcloud APIs (aud: "nextcloud")
# 3. Tokens for both (aud: ["mcp-server", "nextcloud"])
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
issuer=self.issuer,
audience=["mcp-server", "nextcloud"], # Accept either audience
options={
"verify_signature": True,
"verify_exp": True,
"verify_iat": True,
"verify_iss": True if self.issuer else False,
"verify_aud": True, # Enable audience validation
},
)
logger.debug(f"JWT verified successfully for user: {payload.get('sub')}")
logger.debug(f"Full JWT payload: {payload}")
# Extract username (sub claim, with fallback to preferred_username)
# Some OIDC providers (like Keycloak) may not include sub in access tokens
username = payload.get("sub") or payload.get("preferred_username")
if not username:
logger.error(
"No 'sub' or 'preferred_username' claim found in JWT payload"
)
return None
# Extract scopes from scope claim (space-separated string)
scope_string = payload.get("scope", "")
scopes = scope_string.split() if scope_string else []
logger.debug(
f"Extracted scopes from JWT - scope claim: '{scope_string}' -> scopes list: {scopes}"
)
# Extract expiration
exp = payload.get("exp")
if not exp:
logger.warning("No 'exp' claim in JWT, using default TTL")
exp = int(time.time() + self.cache_ttl)
# Cache the result
userinfo = {
"sub": username,
"scope": scope_string,
**{k: v for k, v in payload.items() if k not in ["sub", "scope"]},
}
self._token_cache[token] = (userinfo, exp)
return AccessToken(
token=token,
client_id=payload.get("client_id", ""),
scopes=scopes,
expires_at=exp,
resource=username, # Store username in resource field (RFC 8707)
)
except jwt.ExpiredSignatureError:
logger.info("JWT token has expired")
return None
except jwt.InvalidIssuerError as e:
logger.warning(f"JWT issuer validation failed: {e}")
return None
except jwt.InvalidTokenError as e:
logger.warning(f"JWT validation failed: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error during JWT verification: {e}")
return None
async def _verify_via_introspection(self, token: str) -> AccessToken | None:
"""
Validate token by calling the introspection endpoint (RFC 7662).
This method validates opaque tokens and retrieves their scopes.
Args:
token: The bearer token to introspect
Returns:
AccessToken if active, None if inactive or invalid
"""
try:
# Introspection requires client authentication
response = await self._client.post(
self.introspection_uri, # type: ignore
data={"token": token},
auth=(self.client_id, self.client_secret),
)
if response.status_code == 200:
introspection_data = response.json()
# Check if token is active
if not introspection_data.get("active", False):
logger.info("Token introspection returned inactive=false")
return None
logger.debug(
f"Token introspected successfully for user: {introspection_data.get('sub')}"
)
# Extract username
username = introspection_data.get("sub") or introspection_data.get(
"username"
)
if not username:
logger.error("No username found in introspection response")
return None
# Extract scopes (space-separated string)
scope_string = introspection_data.get("scope", "")
scopes = scope_string.split() if scope_string else []
logger.debug(f"Extracted scopes from introspection: {scopes}")
# Extract expiration
exp = introspection_data.get("exp")
if exp:
expiry = float(exp)
else:
logger.warning(
"No 'exp' in introspection response, using default TTL"
)
expiry = time.time() + self.cache_ttl
# Cache the result
cache_data = {
"sub": username,
"scope": scope_string,
**{
k: v
for k, v in introspection_data.items()
if k not in ["sub", "scope", "active"]
},
}
self._token_cache[token] = (cache_data, expiry)
return AccessToken(
token=token,
client_id=introspection_data.get("client_id", ""),
scopes=scopes,
expires_at=int(expiry),
resource=username,
)
elif response.status_code in (400, 401, 403):
logger.warning(
f"Token introspection failed: HTTP {response.status_code}. "
f"This may indicate: (1) Client credentials mismatch - trying to introspect "
f"token issued to different OAuth client, (2) Expired client credentials, "
f"(3) Invalid token. Will fall back to userinfo endpoint. "
f"Response: {response.text[:200] if response.text else 'empty'}"
)
return None
else:
logger.warning(
f"Unexpected response from introspection: {response.status_code}. "
f"Response: {response.text[:200] if response.text else 'empty'}"
)
return None
except httpx.TimeoutException:
logger.error("Timeout while introspecting token")
return None
except httpx.RequestError as e:
logger.error(f"Network error while introspecting token: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error during token introspection: {e}")
return None
async def _verify_via_userinfo(self, token: str) -> AccessToken | None:
"""
Validate token by calling the userinfo endpoint.
Args:
token: The bearer token to verify
Returns:
AccessToken if valid, None otherwise
"""
try:
response = await self._client.get(
self.userinfo_uri, headers={"Authorization": f"Bearer {token}"}
)
if response.status_code == 200:
userinfo = response.json()
logger.debug(
f"Token validated successfully for user: {userinfo.get('sub')}"
)
# Cache the result
expiry = time.time() + self.cache_ttl
self._token_cache[token] = (userinfo, expiry)
# Create AccessToken with username in resource field (workaround for MCP SDK)
username = userinfo.get("sub") or userinfo.get("preferred_username")
if not username:
logger.error("No username found in userinfo response")
return None
return AccessToken(
token=token,
client_id="", # Not available from userinfo
scopes=self._extract_scopes(userinfo),
expires_at=int(expiry),
resource=username, # Store username in resource field (RFC 8707)
)
elif response.status_code in (400, 401, 403):
logger.info(f"Token validation failed: HTTP {response.status_code}")
return None
else:
logger.warning(
f"Unexpected response from userinfo: {response.status_code}"
)
return None
except httpx.TimeoutException:
logger.error("Timeout while validating token via userinfo endpoint")
return None
except httpx.RequestError as e:
logger.error(f"Network error while validating token: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error during token validation: {e}")
return None
def _get_cached_token(self, token: str) -> AccessToken | None:
"""
Retrieve a token from cache if not expired.
Args:
token: The bearer token to look up
Returns:
AccessToken if cached and valid, None otherwise
"""
if token not in self._token_cache:
return None
userinfo, expiry = self._token_cache[token]
# Check if expired
if time.time() >= expiry:
logger.debug("Cached token expired, removing from cache")
del self._token_cache[token]
return None
# Return cached AccessToken
username = userinfo.get("sub") or userinfo.get("preferred_username")
return AccessToken(
token=token,
client_id="",
scopes=self._extract_scopes(userinfo),
expires_at=int(expiry),
resource=username,
)
def _extract_scopes(self, userinfo: dict[str, Any]) -> list[str]:
"""
Extract scopes from userinfo response.
First attempts to read actual scopes from the 'scope' field (RFC 8693).
If not present, infers scopes from the claims present in the response.
Args:
userinfo: The userinfo response dictionary
Returns:
List of scopes (actual or inferred)
"""
# Try to get actual scopes from userinfo response (if OIDC provider includes it)
scope_string = userinfo.get("scope")
if scope_string:
scopes = scope_string.split() if isinstance(scope_string, str) else []
if scopes:
logger.debug(
f"Using actual scopes from userinfo: {scopes} (scope field present)"
)
return scopes
# Fallback: Infer scopes from claims present in response
# This maintains backward compatibility with OIDC providers that don't
# include the scope field in userinfo responses
logger.debug(
"No scope field in userinfo response, inferring scopes from claims"
)
scopes = ["openid"] # Always present
if "email" in userinfo:
scopes.append("email")
if any(
key in userinfo for key in ["name", "given_name", "family_name", "picture"]
):
scopes.append("profile")
if "roles" in userinfo:
scopes.append("roles")
if "groups" in userinfo:
scopes.append("groups")
logger.debug(f"Inferred scopes from userinfo claims: {scopes}")
return scopes
def clear_cache(self):
"""Clear the token cache."""
self._token_cache.clear()
logger.debug("Token cache cleared")
async def close(self):
"""Cleanup resources."""
await self._client.aclose()
logger.debug("Token verifier closed")
@@ -0,0 +1,417 @@
"""
Unified Token Verifier for ADR-005 Token Audience Validation.
This module replaces both NextcloudTokenVerifier and ProgressiveConsentTokenVerifier
with a single implementation that supports two compliant OAuth modes:
1. Multi-audience mode (default): Validates MCP audience per RFC 7519 (resource servers
validate only their own audience). Nextcloud independently validates its own audience.
2. Token exchange mode (opt-in): Tokens have MCP audience only, exchanged for Nextcloud tokens
Key Design Principles:
- Token verification happens HERE (validates MCP audience per OAuth spec)
- Token exchange happens in context_helper.py (when creating NextcloudClient)
- No token passthrough allowed (complies with MCP Security Specification)
- Token reuse IS allowed for multi-audience tokens (RFC 8707)
"""
import hashlib
import logging
import time
from typing import Any
import httpx
import jwt
from jwt import PyJWKClient
from mcp.server.auth.provider import AccessToken, TokenVerifier
from nextcloud_mcp_server.config import Settings
logger = logging.getLogger(__name__)
class UnifiedTokenVerifier(TokenVerifier):
"""
Unified token verifier supporting both multi-audience and token exchange modes.
Compliant with MCP security specification - no token pass-through.
This verifier:
1. Validates tokens using JWT verification with JWKS or introspection fallback
2. Enforces proper audience validation based on configured mode
3. Caches successful validations to avoid repeated API calls
Mode Selection (via ENABLE_TOKEN_EXCHANGE setting):
- False/omit (default): Multi-audience mode - validates MCP audience only (per RFC 7519).
Nextcloud independently validates its own audience when receiving API calls.
- True: Exchange mode - requires MCP audience only, then exchanges for Nextcloud token
"""
def __init__(self, settings: Settings):
"""
Initialize the unified token verifier.
Args:
settings: Application settings containing OAuth configuration
"""
self.settings = settings
self.mode = "exchange" if settings.enable_token_exchange else "multi-audience"
# Common components for all modes
self.http_client = httpx.AsyncClient(timeout=10.0)
# JWT verification support
self.jwks_client: PyJWKClient | None = None
if hasattr(settings, "jwks_uri") and settings.jwks_uri:
logger.info(f"JWT verification enabled with JWKS URI: {settings.jwks_uri}")
self.jwks_client = PyJWKClient(settings.jwks_uri, cache_keys=True)
# Introspection support (for opaque tokens)
self.introspection_uri: str | None = None
if (
hasattr(settings, "introspection_uri")
and settings.introspection_uri
and settings.oidc_client_id
and settings.oidc_client_secret
):
self.introspection_uri = settings.introspection_uri
logger.info(f"Token introspection enabled: {self.introspection_uri}")
# Token cache: token_hash -> (userinfo, expiry_timestamp)
self._token_cache: dict[str, tuple[dict[str, Any], float]] = {}
self.cache_ttl = 3600 # 1 hour default
logger.info(
f"UnifiedTokenVerifier initialized in {self.mode} mode. "
f"MCP audience: {settings.oidc_client_id} or {settings.nextcloud_mcp_server_url}, "
f"Nextcloud resource URI: {settings.nextcloud_resource_uri}"
)
async def verify_token(self, token: str) -> AccessToken | None:
"""
Verify token according to MCP TokenVerifier protocol.
Per RFC 7519, we validate only MCP audience. The mode determines what
happens AFTER verification in context_helper.py:
- Multi-audience mode: Use token directly (Nextcloud validates its own audience)
- Exchange mode: Exchange for Nextcloud-audience token via RFC 8693
Args:
token: Bearer token to verify
Returns:
AccessToken if valid with MCP audience, None otherwise
"""
# Check cache first
cached = self._get_cached_token(token)
if cached:
logger.debug("Token found in cache")
return cached
# Both modes do the same validation (MCP audience only)
return await self._verify_mcp_audience(token)
async def _verify_mcp_audience(self, token: str) -> AccessToken | None:
"""
Validate token has MCP audience.
Per RFC 7519 Section 4.1.3, resource servers validate only their own
presence in the audience claim. We don't validate Nextcloud's audience -
that's Nextcloud's responsibility when it receives the token.
Args:
token: Bearer token to verify
Returns:
AccessToken if valid with MCP audience, None otherwise
"""
try:
# Attempt JWT verification first
if self._is_jwt_format(token) and self.jwks_client:
payload = await self._verify_jwt_signature(token)
else:
# Fall back to introspection for opaque tokens
payload = await self._introspect_token(token)
if not payload:
return None
# Check payload is valid
if not payload:
return None
# Validate MCP audience is present
if not self._has_mcp_audience(payload):
audiences = payload.get("aud", [])
logger.error(
f"Token rejected: Missing MCP audience. "
f"Got {audiences}, need MCP ({self.settings.oidc_client_id} or "
f"{self.settings.nextcloud_mcp_server_url})"
)
return None
# Log based on mode for clarity
if self.mode == "multi-audience":
logger.info(
"MCP audience validated - token can be used directly "
"(Nextcloud will validate its own audience)"
)
else:
logger.info(
"MCP audience validated - token will be exchanged for Nextcloud access"
)
return self._create_access_token(token, payload)
except Exception as e:
logger.error(f"Token verification failed: {e}")
return None
def _has_mcp_audience(self, payload: dict[str, Any]) -> bool:
"""
Check if token has MCP audience.
Per RFC 7519 Section 4.1.3, resource servers should only validate their own
presence in the audience claim. We don't validate Nextcloud's audience - that's
Nextcloud's responsibility when it receives the token.
Args:
payload: Decoded token payload
Returns:
True if MCP audience present, False otherwise
"""
audiences = payload.get("aud", [])
if isinstance(audiences, str):
audiences = [audiences]
audiences_set = set(audiences)
# MCP must have at least one: client_id OR server_url OR server_url/mcp
return bool(
self.settings.oidc_client_id in audiences_set
or (
self.settings.nextcloud_mcp_server_url
and (
self.settings.nextcloud_mcp_server_url in audiences_set
or f"{self.settings.nextcloud_mcp_server_url}/mcp" in audiences_set
)
)
)
def _is_jwt_format(self, token: str) -> bool:
"""
Check if token looks like a JWT (has 3 parts separated by dots).
Args:
token: The token to check
Returns:
True if token appears to be JWT format
"""
return "." in token and token.count(".") == 2
async def _verify_jwt_signature(self, token: str) -> dict[str, Any] | None:
"""
Verify JWT token with signature validation using JWKS.
Args:
token: JWT token to verify
Returns:
Decoded payload if valid, None if invalid
"""
try:
assert self.jwks_client is not None # Caller should check before calling
# Get signing key from JWKS
signing_key = self.jwks_client.get_signing_key_from_jwt(token)
# Verify and decode JWT
# Note: We don't validate audience here - that's done separately based on mode
payload = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
issuer=self.settings.oidc_issuer
if hasattr(self.settings, "oidc_issuer")
else None,
options={
"verify_signature": True,
"verify_exp": True,
"verify_iat": True,
"verify_iss": True
if hasattr(self.settings, "oidc_issuer")
and self.settings.oidc_issuer
else False,
"verify_aud": False, # We handle audience validation separately
},
)
logger.debug(f"JWT signature verified for user: {payload.get('sub')}")
return payload
except jwt.ExpiredSignatureError:
logger.info("JWT token has expired")
return None
except jwt.InvalidIssuerError as e:
logger.warning(f"JWT issuer validation failed: {e}")
return None
except jwt.InvalidTokenError as e:
logger.warning(f"JWT validation failed: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error during JWT verification: {e}")
return None
async def _introspect_token(self, token: str) -> dict[str, Any] | None:
"""
Validate token by calling the introspection endpoint (RFC 7662).
Args:
token: Bearer token to introspect
Returns:
Token payload if active, None if inactive or invalid
"""
if not self.introspection_uri:
logger.debug("No introspection endpoint configured")
return None
try:
# Introspection requires client authentication
response = await self.http_client.post(
self.introspection_uri,
data={"token": token},
auth=(self.settings.oidc_client_id, self.settings.oidc_client_secret),
)
if response.status_code == 200:
introspection_data = response.json()
# Check if token is active
if not introspection_data.get("active", False):
logger.info("Token introspection returned inactive=false")
return None
logger.debug(
f"Token introspected successfully for user: {introspection_data.get('sub')}"
)
return introspection_data
elif response.status_code in (400, 401, 403):
logger.warning(
f"Token introspection failed: HTTP {response.status_code}. "
f"Response: {response.text[:200] if response.text else 'empty'}"
)
return None
else:
logger.warning(
f"Unexpected response from introspection: {response.status_code}. "
f"Response: {response.text[:200] if response.text else 'empty'}"
)
return None
except httpx.TimeoutException:
logger.error("Timeout while introspecting token")
return None
except httpx.RequestError as e:
logger.error(f"Network error while introspecting token: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error during token introspection: {e}")
return None
def _create_access_token(
self, token: str, payload: dict[str, Any]
) -> AccessToken | None:
"""
Create AccessToken object from validated token payload.
Args:
token: The bearer token
payload: Validated token payload
Returns:
AccessToken object or None if required fields missing
"""
# Extract username (sub claim, with fallback to preferred_username)
username = payload.get("sub") or payload.get("preferred_username")
if not username:
logger.error(
"No 'sub' or 'preferred_username' claim found in token payload"
)
return None
# Extract scopes from scope claim (space-separated string)
scope_string = payload.get("scope", "")
scopes = scope_string.split() if scope_string else []
logger.debug(
f"Extracted scopes from token - scope claim: '{scope_string}' -> scopes list: {scopes}"
)
# Extract expiration
exp = payload.get("exp")
if not exp:
logger.warning("No 'exp' claim in token, using default TTL")
exp = int(time.time() + self.cache_ttl)
# Cache the result
token_hash = hashlib.sha256(token.encode()).hexdigest()
userinfo = {
"sub": username,
"scope": scope_string,
**{k: v for k, v in payload.items() if k not in ["sub", "scope"]},
}
self._token_cache[token_hash] = (userinfo, exp)
return AccessToken(
token=token,
client_id=payload.get("client_id", ""),
scopes=scopes,
expires_at=exp,
resource=username, # Store username in resource field (RFC 8707)
)
def _get_cached_token(self, token: str) -> AccessToken | None:
"""
Retrieve a token from cache if not expired.
Args:
token: The bearer token to look up
Returns:
AccessToken if cached and valid, None otherwise
"""
token_hash = hashlib.sha256(token.encode()).hexdigest()
if token_hash not in self._token_cache:
return None
userinfo, expiry = self._token_cache[token_hash]
# Check if expired
if time.time() >= expiry:
logger.debug("Cached token expired, removing from cache")
del self._token_cache[token_hash]
return None
# Return cached AccessToken
username = userinfo.get("sub") or userinfo.get("preferred_username")
scope_string = userinfo.get("scope", "")
scopes = scope_string.split() if scope_string else []
return AccessToken(
token=token,
client_id=userinfo.get("client_id", ""),
scopes=scopes,
expires_at=int(expiry),
resource=username,
)
def clear_cache(self):
"""Clear the token cache."""
self._token_cache.clear()
logger.debug("Token cache cleared")
async def close(self):
"""Cleanup resources."""
await self.http_client.aclose()
logger.debug("Unified token verifier closed")
+23
View File
@@ -129,16 +129,29 @@ class Settings:
oidc_discovery_url: Optional[str] = None
oidc_client_id: Optional[str] = None
oidc_client_secret: Optional[str] = None
oidc_issuer: Optional[str] = None
# Nextcloud settings
nextcloud_host: Optional[str] = None
nextcloud_username: Optional[str] = None
nextcloud_password: Optional[str] = None
# ADR-005: Token Audience Validation (required for OAuth mode)
nextcloud_mcp_server_url: Optional[str] = None # MCP server URL (used as audience)
nextcloud_resource_uri: Optional[str] = None # Nextcloud resource identifier
# Token verification endpoints
jwks_uri: Optional[str] = None
introspection_uri: Optional[str] = None
userinfo_uri: Optional[str] = None
# Progressive Consent settings (always enabled - no flag needed)
enable_token_exchange: bool = False
enable_offline_access: bool = False
# Token exchange cache settings
token_exchange_cache_ttl: int = 300 # seconds (5 minutes default)
# Token settings
token_encryption_key: Optional[str] = None
token_storage_db: Optional[str] = None
@@ -155,10 +168,18 @@ def get_settings() -> Settings:
oidc_discovery_url=os.getenv("OIDC_DISCOVERY_URL"),
oidc_client_id=os.getenv("OIDC_CLIENT_ID"),
oidc_client_secret=os.getenv("OIDC_CLIENT_SECRET"),
oidc_issuer=os.getenv("OIDC_ISSUER"),
# Nextcloud settings
nextcloud_host=os.getenv("NEXTCLOUD_HOST"),
nextcloud_username=os.getenv("NEXTCLOUD_USERNAME"),
nextcloud_password=os.getenv("NEXTCLOUD_PASSWORD"),
# ADR-005: Token Audience Validation
nextcloud_mcp_server_url=os.getenv("NEXTCLOUD_MCP_SERVER_URL"),
nextcloud_resource_uri=os.getenv("NEXTCLOUD_RESOURCE_URI"),
# Token verification endpoints
jwks_uri=os.getenv("JWKS_URI"),
introspection_uri=os.getenv("INTROSPECTION_URI"),
userinfo_uri=os.getenv("USERINFO_URI"),
# Progressive Consent settings (always enabled)
enable_token_exchange=(
os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true"
@@ -166,6 +187,8 @@ def get_settings() -> Settings:
enable_offline_access=(
os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() == "true"
),
# Token exchange cache settings
token_exchange_cache_ttl=int(os.getenv("TOKEN_EXCHANGE_CACHE_TTL", "300")),
# Token settings
token_encryption_key=os.getenv("TOKEN_ENCRYPTION_KEY"),
token_storage_db=os.getenv("TOKEN_STORAGE_DB", "/tmp/tokens.db"),
+19 -14
View File
@@ -10,12 +10,15 @@ async def get_client(ctx: Context) -> NextcloudClient:
"""
Get the appropriate Nextcloud client based on authentication mode.
This function handles three modes:
ADR-005 compliant implementation supporting two modes:
1. BasicAuth mode: Returns shared client from lifespan context
2. OAuth pass-through mode (ENABLE_TOKEN_EXCHANGE=false, default):
Verifies Flow 1 token and passes it to Nextcloud
3. OAuth token exchange mode (ENABLE_TOKEN_EXCHANGE=true):
Exchanges Flow 1 token for ephemeral Nextcloud token via RFC 8693
2. Multi-audience mode (ENABLE_TOKEN_EXCHANGE=false, default):
Token already contains both MCP and Nextcloud audiences - use directly
3. Token exchange mode (ENABLE_TOKEN_EXCHANGE=true):
Exchange MCP token for Nextcloud token via RFC 8693
SECURITY: Token passthrough has been REMOVED. All OAuth modes validate
proper token audiences per MCP Security Best Practices specification.
Note: Nextcloud doesn't support OAuth scopes natively. Scopes are enforced
by the MCP server via @require_scopes decorator, not by the IdP.
@@ -49,20 +52,22 @@ async def get_client(ctx: Context) -> NextcloudClient:
# OAuth mode (has 'nextcloud_host' attribute)
if hasattr(lifespan_ctx, "nextcloud_host"):
# Check if token exchange is enabled
if settings.enable_token_exchange:
from nextcloud_mcp_server.auth.context_helper import (
get_session_client_from_context,
)
from nextcloud_mcp_server.auth.context_helper import (
get_client_from_context,
get_session_client_from_context,
)
# Token exchange mode: Exchange Flow 1 token for ephemeral Nextcloud token
if settings.enable_token_exchange:
# Mode 2: Exchange MCP token for Nextcloud token
# Token was validated to have MCP audience in UnifiedTokenVerifier
# Now exchange it for Nextcloud audience
return await get_session_client_from_context(
ctx, lifespan_ctx.nextcloud_host
)
else:
# Pass-through mode (default): Verify and pass Flow 1 token to Nextcloud
from nextcloud_mcp_server.auth import get_client_from_context
# Mode 1: Multi-audience token - use directly
# Token was validated to have MCP audience in UnifiedTokenVerifier
# Nextcloud will independently validate its own audience when receiving API calls
return get_client_from_context(ctx, lifespan_ctx.nextcloud_host)
# Unknown context type
+2 -2
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.24.0"
version = "0.25.0"
description = "Model Context Protocol (MCP) server for Nextcloud integration - enables AI assistants to interact with Nextcloud data"
authors = [
{name = "Chris Coutinho", email = "chris@coutinho.io"}
@@ -10,7 +10,7 @@ license = {text = "AGPL-3.0-only"}
requires-python = ">=3.11"
keywords = ["nextcloud", "mcp", "model-context-protocol", "llm", "ai", "claude", "webdav", "caldav", "carddav"]
dependencies = [
"mcp[cli] (>=1.19,<1.20)",
"mcp[cli] (>=1.20,<1.21)",
"httpx (>=0.28.1,<0.29.0)",
"pillow (>=12.0.0,<12.1.0)",
"icalendar (>=6.0.0,<7.0.0)",
+18
View File
@@ -2193,17 +2193,35 @@ async def _get_oauth_token_for_user(
logger.info(f"Getting OAuth token for user: {username}...")
logger.info(f"Using shared OAuth client: {client_id[:16]}...")
# Fetch resource identifier from PRM endpoint (RFC 9728)
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8001")
prm_url = f"{mcp_server_url}/.well-known/oauth-protected-resource"
logger.debug(f"Fetching PRM metadata from: {prm_url}")
async with httpx.AsyncClient() as client:
prm_response = await client.get(prm_url, timeout=10)
if prm_response.status_code != 200:
logger.warning(f"Failed to fetch PRM metadata: {prm_response.status_code}")
# Fallback to default if PRM fetch fails
mcp_server_resource = f"{mcp_server_url}/mcp"
else:
prm_data = prm_response.json()
mcp_server_resource = prm_data.get("resource", f"{mcp_server_url}/mcp")
logger.info(f"Using resource from PRM: {mcp_server_resource}")
# Generate unique state parameter for this OAuth flow
state = secrets.token_urlsafe(32)
logger.debug(f"Generated state for {username}: {state[:16]}...")
# Construct authorization URL with state parameter
# Include resource parameter discovered from PRM endpoint
auth_url = (
f"{authorization_endpoint}?"
f"response_type=code&"
f"client_id={client_id}&"
f"redirect_uri={quote(callback_url, safe='')}&"
f"state={state}&"
f"resource={quote(mcp_server_resource, safe='')}&" # Resource URI from PRM
f"scope=openid%20profile%20email%20notes:read%20notes:write%20calendar:read%20calendar:write%20contacts:read%20contacts:write%20cookbook:read%20cookbook:write%20deck:read%20deck:write%20tables:read%20tables:write%20files:read%20files:write%20sharing:read%20sharing:write"
)
+523
View File
@@ -0,0 +1,523 @@
"""
Unit tests for UnifiedTokenVerifier (ADR-005).
Tests token audience validation for both multi-audience and token exchange modes
without requiring real network calls or IdP connections.
"""
import time
from unittest.mock import AsyncMock, MagicMock, patch
import jwt
import pytest
from nextcloud_mcp_server.auth.unified_verifier import UnifiedTokenVerifier
from nextcloud_mcp_server.config import Settings
pytestmark = pytest.mark.unit
@pytest.fixture
def base_settings():
"""Create base settings for testing."""
return Settings(
oidc_client_id="test-client-id",
oidc_client_secret="test-client-secret",
oidc_issuer="https://idp.example.com",
nextcloud_host="https://nextcloud.example.com",
nextcloud_mcp_server_url="http://localhost:8000",
nextcloud_resource_uri="http://localhost:8080",
jwks_uri="https://idp.example.com/jwks",
introspection_uri="https://idp.example.com/introspect",
enable_token_exchange=False, # Multi-audience mode
token_exchange_cache_ttl=300,
)
@pytest.fixture
def exchange_settings(base_settings):
"""Create settings for token exchange mode."""
base_settings.enable_token_exchange = True
return base_settings
class TestUnifiedTokenVerifierInit:
"""Test UnifiedTokenVerifier initialization."""
def test_init_multi_audience_mode(self, base_settings):
"""Test verifier initialization in multi-audience mode."""
verifier = UnifiedTokenVerifier(base_settings)
assert verifier.mode == "multi-audience"
assert verifier.settings == base_settings
def test_init_exchange_mode(self, exchange_settings):
"""Test verifier initialization in token exchange mode."""
verifier = UnifiedTokenVerifier(exchange_settings)
assert verifier.mode == "exchange"
assert verifier.settings == exchange_settings
class TestAudienceValidation:
"""Test audience validation logic."""
def test_validate_multi_audience_both_present(self, base_settings):
"""Test MCP audience validation with both audiences present."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._has_mcp_audience(payload) is True
def test_validate_multi_audience_server_url_and_resource(self, base_settings):
"""Test MCP audience validation with server URL instead of client ID."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["http://localhost:8000", "http://localhost:8080"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._has_mcp_audience(payload) is True
def test_validate_multi_audience_missing_mcp(self, base_settings):
"""Test MCP audience validation fails without MCP audience."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["http://localhost:8080"], # Only Nextcloud
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._has_mcp_audience(payload) is False
def test_validate_multi_audience_missing_nextcloud(self, base_settings):
"""Test MCP audience validation succeeds with only MCP audience (RFC 7519 compliant)."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["test-client-id"], # Only MCP
"sub": "testuser",
"exp": int(time.time() + 3600),
}
# Per RFC 7519, we only validate MCP audience. Nextcloud validates its own.
assert verifier._has_mcp_audience(payload) is True
def test_validate_multi_audience_string_audience(self, base_settings):
"""Test MCP audience validation with string audience works (RFC 7519 compliant)."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": "test-client-id", # Single audience as string
"sub": "testuser",
"exp": int(time.time() + 3600),
}
# Should pass - we only validate MCP audience per RFC 7519
assert verifier._has_mcp_audience(payload) is True
def test_has_mcp_audience_with_client_id(self, exchange_settings):
"""Test MCP audience validation with client ID."""
verifier = UnifiedTokenVerifier(exchange_settings)
payload = {
"aud": ["test-client-id"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._has_mcp_audience(payload) is True
def test_has_mcp_audience_with_server_url(self, exchange_settings):
"""Test MCP audience validation with server URL."""
verifier = UnifiedTokenVerifier(exchange_settings)
payload = {
"aud": ["http://localhost:8000"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._has_mcp_audience(payload) is True
def test_has_mcp_audience_missing(self, exchange_settings):
"""Test MCP audience validation fails without MCP audience."""
verifier = UnifiedTokenVerifier(exchange_settings)
payload = {
"aud": ["http://localhost:8080"], # Wrong audience
"sub": "testuser",
"exp": int(time.time() + 3600),
}
assert verifier._has_mcp_audience(payload) is False
class TestTokenFormatDetection:
"""Test JWT format detection."""
def test_is_jwt_format_valid(self, base_settings):
"""Test JWT format detection with valid JWT."""
verifier = UnifiedTokenVerifier(base_settings)
jwt_token = "eyJhbGc.eyJzdWI.signature"
assert verifier._is_jwt_format(jwt_token) is True
def test_is_jwt_format_opaque(self, base_settings):
"""Test JWT format detection with opaque token."""
verifier = UnifiedTokenVerifier(base_settings)
opaque_token = "opaque-token-12345"
assert verifier._is_jwt_format(opaque_token) is False
class TestTokenCaching:
"""Test token caching functionality."""
async def test_cache_stores_and_retrieves(self, base_settings):
"""Test token caching stores and retrieves tokens."""
verifier = UnifiedTokenVerifier(base_settings)
# Create a valid access token
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"scope": "openid profile",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
test_token = jwt.encode(payload, "secret", algorithm="HS256")
# Create AccessToken and cache it
access_token = verifier._create_access_token(test_token, payload)
assert access_token is not None
# Should retrieve from cache
cached = verifier._get_cached_token(test_token)
assert cached is not None
assert cached.resource == "testuser"
assert cached.scopes == ["openid", "profile"]
async def test_cache_respects_expiry(self, base_settings):
"""Test that expired tokens are not returned from cache."""
verifier = UnifiedTokenVerifier(base_settings)
# Create expired token payload
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"scope": "openid profile",
"exp": int(time.time() - 100), # Expired 100 seconds ago
"client_id": "test-client-id",
}
test_token = jwt.encode(payload, "secret", algorithm="HS256")
# Create and cache
access_token = verifier._create_access_token(test_token, payload)
assert access_token is not None
# Should not retrieve expired token
cached = verifier._get_cached_token(test_token)
assert cached is None
async def test_cache_clear(self, base_settings):
"""Test cache clearing."""
verifier = UnifiedTokenVerifier(base_settings)
# Create and cache token
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"exp": int(time.time() + 3600),
}
test_token = jwt.encode(payload, "secret", algorithm="HS256")
verifier._create_access_token(test_token, payload)
# Clear cache
verifier.clear_cache()
# Should not retrieve after clear
cached = verifier._get_cached_token(test_token)
assert cached is None
class TestMultiAudienceVerification:
"""Test multi-audience token verification."""
async def test_verify_multi_audience_with_introspection(self, base_settings):
"""Test multi-audience verification using introspection."""
verifier = UnifiedTokenVerifier(base_settings)
# Mock introspection response
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id", "http://localhost:8080"],
"scope": "openid profile",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
opaque_token = "opaque-token-12345"
result = await verifier._verify_mcp_audience(opaque_token)
assert result is not None
assert result.resource == "testuser"
assert result.scopes == ["openid", "profile"]
async def test_verify_multi_audience_fails_without_both_audiences(
self, base_settings
):
"""Test MCP audience verification succeeds with only MCP audience (RFC 7519 compliant)."""
verifier = UnifiedTokenVerifier(base_settings)
# Mock introspection response with only MCP audience
introspection_response = {
"active": True,
"sub": "testuser",
"aud": [
"test-client-id"
], # Only MCP audience (Nextcloud validates its own)
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
opaque_token = "opaque-token-12345"
result = await verifier._verify_mcp_audience(opaque_token)
# Should succeed with only MCP audience per RFC 7519
assert result is not None
assert result.resource == "testuser"
class TestExchangeModeVerification:
"""Test token exchange mode verification."""
async def test_verify_mcp_audience_only_success(self, exchange_settings):
"""Test MCP-only audience verification succeeds with MCP audience."""
verifier = UnifiedTokenVerifier(exchange_settings)
# Mock introspection response with MCP audience only
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id"],
"scope": "openid profile",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
opaque_token = "opaque-token-12345"
result = await verifier._verify_mcp_audience(opaque_token)
assert result is not None
assert result.resource == "testuser"
async def test_verify_mcp_audience_only_fails_without_mcp(self, exchange_settings):
"""Test MCP audience verification fails without MCP audience."""
verifier = UnifiedTokenVerifier(exchange_settings)
# Mock introspection response without MCP audience
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["http://localhost:8080"], # Wrong audience
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
opaque_token = "opaque-token-12345"
result = await verifier._verify_mcp_audience(opaque_token)
assert result is None
class TestIntrospection:
"""Test token introspection."""
async def test_introspect_active_token(self, base_settings):
"""Test introspection of active token."""
verifier = UnifiedTokenVerifier(base_settings)
# Mock HTTP response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id", "http://localhost:8080"],
"scope": "openid profile",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
verifier.http_client.post = AsyncMock(return_value=mock_response)
result = await verifier._introspect_token("test-token")
assert result is not None
assert result["active"] is True
assert result["sub"] == "testuser"
async def test_introspect_inactive_token(self, base_settings):
"""Test introspection of inactive token."""
verifier = UnifiedTokenVerifier(base_settings)
# Mock HTTP response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"active": False}
verifier.http_client.post = AsyncMock(return_value=mock_response)
result = await verifier._introspect_token("test-token")
assert result is None
async def test_introspect_without_endpoint(self, base_settings):
"""Test introspection when endpoint not configured."""
base_settings.introspection_uri = None
verifier = UnifiedTokenVerifier(base_settings)
result = await verifier._introspect_token("test-token")
assert result is None
class TestAccessTokenCreation:
"""Test AccessToken object creation."""
def test_create_access_token_success(self, base_settings):
"""Test successful AccessToken creation."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"sub": "testuser",
"scope": "openid profile email",
"exp": int(time.time() + 3600),
"client_id": "test-client-id",
}
token = "test-token-123"
result = verifier._create_access_token(token, payload)
assert result is not None
assert result.token == token
assert result.resource == "testuser"
assert result.scopes == ["openid", "profile", "email"]
assert result.client_id == "test-client-id"
def test_create_access_token_with_preferred_username(self, base_settings):
"""Test AccessToken creation with preferred_username fallback."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"preferred_username": "testuser", # No 'sub' claim
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
token = "test-token-123"
result = verifier._create_access_token(token, payload)
assert result is not None
assert result.resource == "testuser"
def test_create_access_token_no_username(self, base_settings):
"""Test AccessToken creation fails without username."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
# No sub or preferred_username
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
token = "test-token-123"
result = verifier._create_access_token(token, payload)
assert result is None
def test_create_access_token_no_expiry(self, base_settings):
"""Test AccessToken creation uses default TTL without expiry."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"sub": "testuser",
"scope": "openid profile",
# No exp claim
}
token = "test-token-123"
result = verifier._create_access_token(token, payload)
assert result is not None
# Should have set a default expiry
assert result.expires_at > int(time.time())
class TestVerifyTokenFlow:
"""Test complete verify_token flow."""
async def test_verify_token_from_cache(self, base_settings):
"""Test verify_token returns cached token."""
verifier = UnifiedTokenVerifier(base_settings)
payload = {
"aud": ["test-client-id", "http://localhost:8080"],
"sub": "testuser",
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
token = jwt.encode(payload, "secret", algorithm="HS256")
# First call - should cache
result1 = verifier._create_access_token(token, payload)
assert result1 is not None
# Mock _verify_mcp_audience to ensure it's not called
with patch.object(verifier, "_verify_mcp_audience") as mock_verify:
result2 = await verifier.verify_token(token)
assert result2 is not None
assert result2.resource == "testuser"
# Should not call verification since it's cached
mock_verify.assert_not_called()
async def test_verify_token_multi_audience_mode(self, base_settings):
"""Test verify_token in multi-audience mode."""
verifier = UnifiedTokenVerifier(base_settings)
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id", "http://localhost:8080"],
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
result = await verifier.verify_token("opaque-token")
assert result is not None
assert result.resource == "testuser"
async def test_verify_token_exchange_mode(self, exchange_settings):
"""Test verify_token in exchange mode."""
verifier = UnifiedTokenVerifier(exchange_settings)
introspection_response = {
"active": True,
"sub": "testuser",
"aud": ["test-client-id"], # MCP audience only
"scope": "openid profile",
"exp": int(time.time() + 3600),
}
with patch.object(
verifier, "_introspect_token", return_value=introspection_response
):
result = await verifier.verify_token("opaque-token")
assert result is not None
assert result.resource == "testuser"
Generated
+6 -5
View File
@@ -937,7 +937,7 @@ wheels = [
[[package]]
name = "mcp"
version = "1.19.0"
version = "1.20.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
@@ -946,15 +946,16 @@ dependencies = [
{ name = "jsonschema" },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "pyjwt", extra = ["crypto"] },
{ name = "python-multipart" },
{ name = "pywin32", marker = "sys_platform == 'win32'" },
{ name = "sse-starlette" },
{ name = "starlette" },
{ name = "uvicorn", marker = "sys_platform != 'emscripten'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/69/2b/916852a5668f45d8787378461eaa1244876d77575ffef024483c94c0649c/mcp-1.19.0.tar.gz", hash = "sha256:213de0d3cd63f71bc08ffe9cc8d4409cc87acffd383f6195d2ce0457c021b5c1", size = 444163, upload-time = "2025-10-24T01:11:15.839Z" }
sdist = { url = "https://files.pythonhosted.org/packages/f8/22/fae38092e6c2995c03232635028510d77e7decff31b4ae79dfa0ba99c635/mcp-1.20.0.tar.gz", hash = "sha256:9ccc09eaadbfbcbbdab1c9723cfe2e0d1d9e324d7d3ce7e332ef90b09ed35177", size = 451377, upload-time = "2025-10-30T22:14:53.421Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ce/a3/3e71a875a08b6a830b88c40bc413bff01f1650f1efe8a054b5e90a9d4f56/mcp-1.19.0-py3-none-any.whl", hash = "sha256:f5907fe1c0167255f916718f376d05f09a830a215327a3ccdd5ec8a519f2e572", size = 170105, upload-time = "2025-10-24T01:11:14.151Z" },
{ url = "https://files.pythonhosted.org/packages/df/00/76fc92f4892d47fecb37131d0e95ea69259f077d84c68f6793a0d96cfe80/mcp-1.20.0-py3-none-any.whl", hash = "sha256:d0dc06f93653f7432ff89f694721c87f79876b6f93741bf628ad1e48f7ac5e5d", size = 173136, upload-time = "2025-10-30T22:14:51.078Z" },
]
[package.optional-dependencies]
@@ -974,7 +975,7 @@ wheels = [
[[package]]
name = "nextcloud-mcp-server"
version = "0.24.0"
version = "0.25.0"
source = { editable = "." }
dependencies = [
{ name = "aiosqlite" },
@@ -1013,7 +1014,7 @@ requires-dist = [
{ name = "click", specifier = ">=8.1.8" },
{ name = "httpx", specifier = ">=0.28.1,<0.29.0" },
{ name = "icalendar", specifier = ">=6.0.0,<7.0.0" },
{ name = "mcp", extras = ["cli"], specifier = ">=1.19,<1.20" },
{ name = "mcp", extras = ["cli"], specifier = ">=1.20,<1.21" },
{ name = "pillow", specifier = ">=12.0.0,<12.1.0" },
{ name = "pydantic", specifier = ">=2.11.4" },
{ name = "pyjwt", extras = ["crypto"], specifier = ">=2.8.0" },