Compare commits

...

14 Commits

Author SHA1 Message Date
github-actions[bot] 218f0bd366 bump: version 0.25.0 → 0.26.0 2025-11-08 03:48:50 +00:00
Chris Coutinho afee3e8bb4 Merge pull request #268 from cbcoutinho/fix/unified-oauth-callback-pkce
fix: Consolidate OAuth callbacks and implement PKCE for all flows
2025-11-08 04:48:27 +01:00
Chris Coutinho 050a00d8c8 Merge pull request #269 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.8
2025-11-08 00:45:24 +01:00
renovate-bot-cbcoutinho[bot] f59b6a6cfb chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.8 2025-11-07 23:09:16 +00:00
Chris Coutinho a766f4be32 test: enhance elicitation callback logging and error handling
Improve debugging capabilities for OAuth flow in elicitation callback:
- Add detailed logging for consent screen handling
- Capture screenshots when consent screen not detected or fails
- Replace networkidle wait with explicit callback URL polling
- Add 2-second grace period for server-side callback processing
- Log page title and current URL for debugging

This helps diagnose issues like expired OAuth clients or authorization failures during real elicitation testing.

Test results: All 4 elicitation integration tests passing
- test_check_logged_in_with_real_elicitation_callback ✓
- test_elicitation_callback_url_extraction ✓
- test_elicitation_stores_refresh_token ✓
- test_second_check_logged_in_does_not_elicit ✓

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-07 23:49:58 +01:00
Chris Coutinho ee053d559c chore: Remove tests 2025-11-07 22:59:57 +01:00
Chris Coutinho 71326384da feat: add real elicitation integration test with python-sdk MCP client
This commit adds proper integration testing of the login elicitation flow
(ADR-006) using python-sdk's MCP client with actual elicitation callback
support, and fixes user_id extraction to support both JWT and opaque tokens.

## Changes

### 1. Enhanced create_mcp_client_session helper (tests/conftest.py)
- Added `elicitation_callback` parameter to function signature
- Pass callback to ClientSession constructor
- Added necessary imports: RequestContext, ElicitRequestParams,
  ElicitResult, ErrorData from mcp package
- Allows fixtures to provide custom elicitation handlers

### 2. New fixture: nc_mcp_oauth_client_with_elicitation (tests/conftest.py)
- Creates MCP client with Playwright-based elicitation callback
- Callback implementation:
  - Extracts OAuth URL from elicitation message using regex
  - Uses Playwright browser to complete OAuth flow automatically
  - Handles Nextcloud login form (username/password)
  - Handles consent screen if present
  - Waits for OAuth callback completion
  - Returns ElicitResult(action="accept") on success
- Function-scoped to allow independent test state
- Tracks elicitation invocations via session.elicitation_triggered

### 3. Fixed user_id extraction for opaque tokens (oauth_tools.py)
- Created extract_user_id_from_token() helper to handle both JWT and
  opaque tokens by calling userinfo endpoint when needed
- Fixed check_logged_in to use helper instead of broken ctx.authorization
- Fixed revoke_nextcloud_access to use helper instead of ctx.context.get()
- Both tools now properly extract user_id from access tokens

### 4. Enhanced integration tests (test_elicitation_integration.py)
- Updated tests to revoke refresh tokens via MCP tool
- All 4 tests now pass:
  - test_check_logged_in_with_real_elicitation_callback: Complete flow
  - test_elicitation_callback_url_extraction: URL extraction validation
  - test_elicitation_stores_refresh_token: Token persistence verification
  - test_second_check_logged_in_does_not_elicit: No redundant elicitations

### 5. Added diagnostic logging (oauth_routes.py)
- Track user_id extraction from ID tokens during OAuth callbacks
- Log refresh token storage with user_id and flow_type

## Test Results
 4/4 tests pass

The test suite successfully validates:
- Elicitation callback is triggered when no refresh token exists
- Playwright automation completes OAuth flow
- Refresh token is stored after OAuth with correct user_id
- Tool returns "yes" after successful login
- Already-logged-in users don't get redundant elicitations

## Why This Matters
Previous tests (test_login_elicitation.py) only validated response
formats and acknowledged they couldn't test real elicitation protocol.

This test exercises the REAL MCP elicitation protocol end-to-end:
1. MCP server calls ctx.elicit()
2. python-sdk ClientSession invokes custom callback
3. Callback completes OAuth via Playwright
4. Client returns acceptance to server
5. Tool proceeds with authenticated state

This proves the python-sdk MCP client can handle elicitation in
production environments with both JWT and opaque tokens.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-07 22:55:49 +01:00
Chris Coutinho 11cdab475f feat: unify session architecture and enhance login status visibility
This commit addresses the "Login not detected" issue after completing
OAuth login via elicitation by unifying the session architecture and
adding comprehensive visibility into background session status.

## Changes

### 1. Enhanced check_logged_in with comprehensive logging (oauth_tools.py)
- Added detailed logging at each step of token lookup
- Implemented fallback strategy: first search by provisioning_client_id,
  then fall back to user_id lookup
- This allows detection of refresh tokens created via any flow
  (elicitation or browser login)
- Log messages include flow_type, provisioned_at, and provisioning_client_id
  for debugging

### 2. Unified session architecture (browser_oauth_routes.py)
- Browser login now stores provisioning_client_id=state when saving
  refresh token
- This makes browser and elicitation flows consistent - both can be
  found by the same state parameter
- Treats Flow 2 (elicitation) and browser login as the same "background
  session"

### 3. Enhanced /user/page with session status (userinfo_routes.py)
- Added comprehensive background access section showing:
  - Background Access: Granted/Not Granted (with visual indicators)
  - Flow Type: browser/flow2/hybrid
  - Provisioned At: timestamp
  - Token Audience: nextcloud/mcp
  - Scopes: detailed scope list
- Status displayed regardless of which flow created the session
  (browser login or elicitation)

### 4. Added revoke functionality (userinfo_routes.py, app.py)
- New POST endpoint: /user/revoke
- Allows users to revoke background access (delete refresh token)
- Browser session cookie remains valid for UI access
- Confirmation dialog before revocation
- Success page with auto-redirect back to /user/page
- Registered route in app.py browser_routes

## Testing
All tests pass:
- 6/6 login elicitation tests pass
- 21/21 core OAuth tests pass
- Comprehensive logging helps debug future issues

## Fixes
Resolves: "Login not detected. Please ensure you completed the login
at the provided URL before clicking OK."

The issue occurred because elicitation and browser login created
separate sessions. Now they are unified under the same architecture.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-07 21:50:55 +01:00
Chris Coutinho 281d28c7cd test: Add comprehensive elicitation URL and refresh token validation
Enhanced test suite to validate:
1. Elicitation URL format and Flow 2 endpoint routing
2. Server-side refresh token validation via check_provisioning_status API
3. Proper separation of concerns - tests use MCP server API, not direct storage access

The refresh token validation test validates server responses:
- is_provisioned=true: Server has valid refresh token
- is_provisioned=false: No token or invalid token
- Error response: Token validation failed

This ensures the MCP server properly validates refresh tokens internally
and reports status correctly through its public API.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-07 21:21:58 +01:00
Chris Coutinho 0c9a9ea24d fix: Consolidate OAuth callbacks and implement PKCE for all flows
This PR fixes multiple OAuth-related issues:

## Unified OAuth Callback
- Consolidated `/oauth/callback-nextcloud` and `/oauth/login-callback` into single `/oauth/callback` endpoint
- Flow type determined by session lookup via state parameter (no query params in redirect_uri)
- Fixes redirect_uri validation issues with IdPs requiring exact match
- Legacy endpoints kept as aliases for backwards compatibility

## PKCE Implementation
- Implemented PKCE (RFC 7636) for Flow 2 (resource provisioning)
  - Generate code_verifier and code_challenge
  - Store code_verifier in session storage
  - Retrieve and use in token exchange
- Fixed PKCE for browser login (integrated mode)
  - Previously only worked for external IdP (Keycloak)
  - Now works for both Nextcloud OIDC and external IdP

## Login Elicitation Fixes (ADR-006)
- Fixed elicitation URL to route through MCP server endpoint
  - Changed from direct Nextcloud URL to `/oauth/authorize-nextcloud`
  - Ensures PKCE is properly handled by server
- Fixed login detection after OAuth flow completes
  - Look up refresh token by state parameter instead of user_id
  - Works even when Flow 1 token not present
- Added `get_refresh_token_by_provisioning_client_id()` method

## Session Authentication
- Fixed `/user/page` redirect loop
  - Shared oauth_context with mounted browser_app
  - SessionAuthBackend can now validate sessions correctly

## Tests
- Added comprehensive login elicitation test suite
- Updated scope authorization test expectations
- All 43 OAuth tests passing

## Files Changed
- `app.py`: Shared oauth_context, unified callback route
- `oauth_routes.py`: Unified callback, PKCE for Flow 2
- `browser_oauth_routes.py`: PKCE for integrated mode
- `oauth_tools.py`: Fixed elicitation URL generation
- `refresh_token_storage.py`: Added lookup by provisioning_client_id
- `test_login_elicitation.py`: New test suite

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-07 21:08:55 +01:00
Chris Coutinho dfa6d08ba7 Merge pull request #266 from cbcoutinho/renovate/quay.io-keycloak-keycloak-26.x
chore(deps): update quay.io/keycloak/keycloak docker tag to v26.4.4
2025-11-07 12:24:57 +01:00
renovate-bot-cbcoutinho[bot] c5395041d3 chore(deps): update quay.io/keycloak/keycloak docker tag to v26.4.4 2025-11-07 11:06:04 +00:00
Chris Coutinho 50cda2209f Merge pull request #264 from cbcoutinho/renovate/docker.io-library-nextcloud-32.0.1
chore(deps): update docker.io/library/nextcloud:32.0.1 docker digest to 5b043f7
2025-11-07 01:01:06 +01:00
renovate-bot-cbcoutinho[bot] d34e17a68b chore(deps): update docker.io/library/nextcloud:32.0.1 docker digest to 5b043f7 2025-11-06 23:17:53 +00:00
22 changed files with 1791 additions and 127 deletions
+11
View File
@@ -1,3 +1,14 @@
## v0.26.0 (2025-11-08)
### Feat
- add real elicitation integration test with python-sdk MCP client
- unify session architecture and enhance login status visibility
### Fix
- Consolidate OAuth callbacks and implement PKCE for all flows
## v0.25.0 (2025-11-05)
### BREAKING CHANGE
+15 -3
View File
@@ -167,23 +167,35 @@ docker compose exec db mariadb -u root -ppassword nextcloud -e \
### Progressive Consent Architecture (ADR-004)
**Status**: Always enabled in OAuth mode (default)
**Important**: Progressive consent is a *mechanism* for granting access, not a feature flag. The architecture is always present in OAuth mode. Whether provisioning tools are available is controlled by `ENABLE_OFFLINE_ACCESS`.
**What is Progressive Consent?**
- Dual OAuth flow architecture that separates client authentication (Flow 1) from resource provisioning (Flow 2)
- Flow 1: MCP client authenticates directly to IdP with resource scopes (notes:*, calendar:*, etc.)
- Token audience: "mcp-server"
- Client receives resource-scoped token for MCP session
- Flow 2: Server explicitly provisions Nextcloud access via separate login
- Flow 2: Server explicitly provisions Nextcloud access via separate login (only when `ENABLE_OFFLINE_ACCESS=true`)
- Server requests: openid, profile, email, offline_access
- Token audience: "nextcloud"
- Server receives refresh token for offline access
- Client never sees this token
- Provides clear separation between session tokens and offline access tokens
**Modes:**
- **Pass-through mode** (`ENABLE_OFFLINE_ACCESS=false`, default):
- No Flow 2 provisioning
- Server uses client's token to access Nextcloud (pass-through)
- No provisioning tools available
- Suitable for stateless, client-driven operations
- **Offline access mode** (`ENABLE_OFFLINE_ACCESS=true`):
- Flow 2 provisioning available
- Server stores refresh tokens for background operations
- Provisioning tools available: `provision_nextcloud_access`, `check_logged_in`
- Suitable for background jobs and server-initiated operations
**When to use OAuth mode:**
- Multi-user deployments
- Background jobs requiring offline access
- Background jobs requiring offline access (with `ENABLE_OFFLINE_ACCESS=true`)
- Enhanced security with separate authorization contexts
- Explicit user control over resource access
+1 -1
View File
@@ -1,4 +1,4 @@
FROM ghcr.io/astral-sh/uv:0.9.7-python3.11-alpine@sha256:0006b77df7ebf46e68959fdc8d3af9d19f1adfae8c2e7e77907ad257e5d05be4
FROM ghcr.io/astral-sh/uv:0.9.8-python3.11-alpine@sha256:6c842c49ad032f46b62f32a7e7779f45f12671a8e0d82ea24c766ab62d58b396
# Install dependencies
# 1. git (required for caldav dependency from git)
+2 -2
View File
@@ -2,8 +2,8 @@ apiVersion: v2
name: nextcloud-mcp-server
description: A Helm chart for Nextcloud MCP Server - enables AI assistants to interact with Nextcloud
type: application
version: 0.25.0
appVersion: "0.25.0"
version: 0.26.0
appVersion: "0.26.0"
keywords:
- nextcloud
- mcp
+2 -2
View File
@@ -21,7 +21,7 @@ services:
restart: always
app:
image: docker.io/library/nextcloud:32.0.1@sha256:40b1b5dc35bcc9a0e922ec847451e43fa14222c9a99dcd5dfcc03b08f6c15775
image: docker.io/library/nextcloud:32.0.1@sha256:5b043f7ea2f609d5ff5635f475c30d303bec17775a5c3f7fa435e3818e669120
restart: always
ports:
- 0.0.0.0:8080:80
@@ -117,7 +117,7 @@ services:
- oauth-tokens:/app/data
keycloak:
image: quay.io/keycloak/keycloak:26.4.2@sha256:3617b09bb4b7510a8d8d9b9fc5707399e2d70688dbcc2f8fb013a144829be1b9
image: quay.io/keycloak/keycloak:26.4.4@sha256:c6459d5fae1b759f5d667ebdc6237ab3121379c3494e213898569014ede1846d
command:
- "start-dev"
- "--import-realm"
+222 -8
View File
@@ -1,13 +1,33 @@
# ADR-006: Progressive Consent via URL Elicitation (SEP-1036)
**Status**: Proposed
**Date**: 2025-01-05
**Status**: Partially Implemented (Interim Workaround)
**Date**: 2025-01-05 (Updated: 2025-01-07)
**Related**: [SEP-1036](https://github.com/modelcontextprotocol/specification/pull/887), ADR-004
**Depends On**: ADR-005 (token validation)
## Context
The current progressive consent implementation (ADR-004) requires users to manually visit OAuth URLs returned by MCP tools. This creates a poor user experience:
### What is Progressive Consent?
**Progressive consent is a mechanism, not a feature**. It describes HOW users grant the MCP server access to Nextcloud resources through OAuth elicitation. The server can operate in two modes:
1. **Pass-through mode (ENABLE_OFFLINE_ACCESS=false)**:
- No refresh tokens requested or stored
- Server passes through client's access token to Nextcloud
- No provisioning tools available
- Suitable for stateless, client-driven operations
2. **Offline access mode (ENABLE_OFFLINE_ACCESS=true)**:
- Server requests `offline_access` scope and stores refresh tokens
- Enables background operations and server-initiated API calls
- Provisioning tools available (`provision_nextcloud_access`, `check_logged_in`)
- Requires explicit user consent via OAuth Flow 2
**Single-user mode (BasicAuth)** doesn't use progressive consent at all - credentials are directly available.
### Current User Experience Issues
The current offline access provisioning flow (ADR-004) requires users to manually visit OAuth URLs returned by MCP tools. This creates a poor user experience:
1. User calls `provision_nextcloud_access` tool
2. Tool returns a URL as text in the response
@@ -346,7 +366,15 @@ capabilities = {
### 6. Environment Variables
**New variables**:
**Primary control**:
```bash
# ENABLE_OFFLINE_ACCESS: Controls whether server requests refresh tokens and enables provisioning tools
# Default: false (pass-through mode)
# Set to true to enable offline access mode with Flow 2 provisioning
ENABLE_OFFLINE_ACCESS=true
```
**Future variables** (when URL elicitation is implemented):
```bash
# ELICITATION_CALLBACK_URL: Base URL for OAuth callbacks with elicitation tracking
# Default: NEXTCLOUD_MCP_SERVER_URL + /oauth/callback
@@ -357,9 +385,10 @@ ELICITATION_CALLBACK_URL=http://localhost:8000/oauth/callback
ELICITATION_TIMEOUT_SECONDS=300
```
**Removed variables** (no longer needed):
**Removed variables**:
```bash
# ENABLE_PROGRESSIVE_CONSENT - removed, now always enabled in OAuth mode
# ENABLE_PROGRESSIVE_CONSENT - Removed. Progressive consent is a mechanism, not a feature toggle.
# Use ENABLE_OFFLINE_ACCESS to control whether provisioning tools are available.
# MCP_SERVER_CLIENT_ID - merged into OIDC_CLIENT_ID
```
@@ -626,6 +655,180 @@ async def validate_elicitation_id(elicitation_id: str, user_id: str) -> bool:
**Rejection reason**: Follow spec pattern (polling via elicitation/track)
## Interim Implementation: Inline Form Elicitation (Pre-SEP-1036)
**Note**: SEP-1036 (URL mode elicitation) is not yet available in the stable MCP Python SDK. As a temporary workaround, we've implemented a simplified version using the current **inline form elicitation** API.
### What Changed
Instead of waiting for URL mode elicitation, we implemented a `check_logged_in` tool that:
1. Checks if the user has completed Flow 2 (resource provisioning)
2. If logged in, returns `"yes"`
3. If not logged in, uses **inline form elicitation** to prompt the user
### Implementation Details
**New Tool**: `check_logged_in`
```python
# nextcloud_mcp_server/server/oauth_tools.py
class LoginConfirmation(BaseModel):
"""Schema for login confirmation elicitation."""
acknowledged: bool = Field(
default=False,
description="Check this box after completing login at the provided URL",
)
@mcp.tool(name="check_logged_in")
@require_scopes("openid")
async def tool_check_logged_in(ctx: Context, user_id: Optional[str] = None) -> str:
"""Check if user is logged in and elicit login if needed."""
# Check if already logged in
status = await get_provisioning_status(ctx, user_id)
if status.is_provisioned:
return "yes"
# Generate OAuth URL for Flow 2
auth_url = generate_oauth_url_for_flow2(...)
# Use inline form elicitation (current MCP API)
result = await ctx.elicit(
message=f"Please log in to Nextcloud at the following URL:\n\n{auth_url}\n\nAfter completing the login, check the box below and click OK.",
schema=LoginConfirmation,
)
if result.action == "accept":
# Verify login succeeded
status = await get_provisioning_status(ctx, user_id)
return "yes" if status.is_provisioned else "Login not detected"
elif result.action == "decline":
return "Login declined by user."
else:
return "Login cancelled by user."
```
**OAuth Routes** (added to `app.py`):
```python
# Flow 2 routes for resource provisioning
routes.append(
Route("/oauth/authorize-nextcloud", oauth_authorize_nextcloud, methods=["GET"])
)
routes.append(
Route("/oauth/callback-nextcloud", oauth_callback_nextcloud, methods=["GET"])
)
```
### User Experience
```
User: *calls check_logged_in tool*
MCP Client: Displays form elicitation
┌─────────────────────────────────────────────────────────┐
│ Please log in to Nextcloud at the following URL: │
│ │
│ http://localhost:8000/oauth/authorize-nextcloud?... │
│ │
│ After completing the login, check the box below and │
│ click OK. │
│ │
│ ☐ Check this box after completing login │
│ │
│ [Accept] [Decline] [Cancel] │
└─────────────────────────────────────────────────────────┘
User: *copies URL, opens in browser, completes OAuth*
User: *checks box and clicks Accept*
MCP Server: Verifies login and returns "yes"
```
### Limitations of Interim Approach
1. **Manual URL Handling**: User must manually copy and paste the URL (not clickable)
2. **No Automatic Browser Opening**: Client doesn't automatically open the URL
3. **No Progress Tracking**: Can't track OAuth completion status in real-time
4. **URL in Message Text**: Login URL embedded in plain text message (not as structured field)
5. **Client-Side Confirmation**: Relies on user clicking "OK" after OAuth (honor system)
### Why Not Use URL Mode Now?
The current stable MCP Python SDK (`main` branch) only supports **inline form elicitation**:
```python
# Current API (no 'mode' parameter)
class ElicitRequestParams(RequestParams):
message: str
requestedSchema: ElicitRequestedSchema
# No 'mode', 'url', or 'elicitationId' fields
```
URL mode elicitation (`mode: "url"`) is only available in the SEP-1036 branch, which has not been merged to `main` yet.
### Migration to URL Mode (When SEP-1036 Lands)
Once SEP-1036 is merged and available in the stable SDK, we will migrate to URL mode elicitation:
**Before (Current Workaround)**:
```python
result = await ctx.elicit(
message=f"Please log in at: {auth_url}\n\nClick OK after login.",
schema=LoginConfirmation,
)
```
**After (URL Mode)**:
```python
result = await ctx.session.elicit_url(
message="Please log in to Nextcloud to authorize this MCP server.",
url=auth_url,
elicitation_id=elicitation_id,
)
```
**Benefits of migration**:
- Automatic URL opening (with user consent)
- Clickable URLs in client UI
- Progress tracking via `elicitation/track`
- Better security (URL not in message text)
- Auto-retry support
### Testing
Integration tests validate the current inline form elicitation:
```python
# tests/server/oauth/test_login_elicitation.py
async def test_check_logged_in_already_authenticated(nc_mcp_oauth_client):
"""Test immediate 'yes' for authenticated users."""
result = await nc_mcp_oauth_client.call_tool("check_logged_in", arguments={})
assert "yes" in result.content[0].text.lower()
async def test_check_logged_in_url_format(nc_mcp_oauth_client):
"""Test that login URL (when needed) contains correct OAuth parameters."""
result = await nc_mcp_oauth_client.call_tool("check_logged_in", arguments={})
response_text = result.content[0].text
# If URL present, validate OAuth parameters
if "http" in response_text:
assert "response_type=code" in response_text
assert "client_id=" in response_text
assert "redirect_uri=" in response_text
assert "openid" in response_text
```
### Future Work
- **Monitor SEP-1036**: Watch for merge to MCP Python SDK `main` branch
- **Implement URL Mode**: Once available, migrate `check_logged_in` to use `ctx.session.elicit_url()`
- **Add Progress Tracking**: Implement `elicitation/track` endpoint for OAuth completion status
- **Implement Error-Triggered Elicitation**: Use `@require_provisioning` decorator to return `ElicitationRequired` errors
- **Remove Manual Workaround**: Deprecate inline form approach once URL mode is stable
## References
- [SEP-1036: URL Mode Elicitation](https://github.com/modelcontextprotocol/specification/pull/887)
@@ -636,16 +839,27 @@ async def validate_elicitation_id(elicitation_id: str, user_id: str) -> bool:
## Implementation Checklist
### Interim Implementation (Inline Form Elicitation)
- [x] Create `check_logged_in` tool with inline form elicitation
- [x] Register Flow 2 OAuth routes (`/oauth/authorize-nextcloud`, `/oauth/callback-nextcloud`)
- [x] Write integration tests for login elicitation flow
- [x] Update ADR-006 with interim implementation documentation
- [x] Add `LoginConfirmation` schema for elicitation
- [ ] Run tests to validate implementation
### Future Work (URL Mode Elicitation - Post SEP-1036)
- [ ] Implement `@require_provisioning` decorator with ElicitationRequired error
- [ ] Add `elicitation/track` request handler
- [ ] Update OAuth callback to mark elicitations complete
- [ ] Add elicitation storage (ID, user, status, timestamps)
- [ ] Update all Nextcloud tools with `@require_provisioning`
- [ ] Add URL elicitation capability declaration
- [ ] Write integration tests for elicitation flow
- [ ] Write tests for progress tracking
- [ ] Update documentation with elicitation examples
- [ ] Update documentation with URL mode examples
- [ ] Add migration guide for manual tools → elicitation
- [ ] Migrate `check_logged_in` from inline form to URL mode
- [ ] Keep manual tools with deprecation warnings (v0.26-0.27)
- [ ] Remove manual tools (v0.28.0)
- [ ] Update CHANGELOG.md with migration timeline
+36 -1
View File
@@ -751,6 +751,40 @@
"display.on.consent.screen": "true",
"consent.screen.text": "Create, update, and delete tasks"
}
},
{
"name": "default-audience",
"protocol": "openid-connect",
"attributes": {
"include.in.token.scope": "false",
"display.on.consent.screen": "false",
"gui.order": "",
"consent.screen.text": ""
},
"protocolMappers": [
{
"name": "mcp-server-audience",
"protocol": "openid-connect",
"protocolMapper": "oidc-audience-mapper",
"consentRequired": false,
"config": {
"included.client.audience": "nextcloud-mcp-server",
"access.token.claim": "true",
"id.token.claim": "false"
}
},
{
"name": "mcp-url-audience",
"protocol": "openid-connect",
"protocolMapper": "oidc-audience-mapper",
"consentRequired": false,
"config": {
"included.custom.audience": "http://localhost:8002",
"access.token.claim": "true",
"id.token.claim": "false"
}
}
]
}
],
"components": {
@@ -791,7 +825,8 @@
"profile",
"email",
"roles",
"web-origins"
"web-origins",
"default-audience"
],
"defaultOptionalClientScopes": [
"offline_access",
+73 -7
View File
@@ -217,6 +217,9 @@ class OAuthAppContext:
refresh_token_storage: Optional["RefreshTokenStorage"] = None
oauth_client: Optional[object] = None # NextcloudOAuthClient or KeycloakOAuthClient
oauth_provider: str = "nextcloud" # "nextcloud" or "keycloak"
server_client_id: Optional[str] = (
None # MCP server's OAuth client ID (static or DCR)
)
def is_oauth_mode() -> bool:
@@ -292,8 +295,7 @@ async def load_oauth_client_credentials(
logger.info("Dynamic client registration available")
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
redirect_uris = [
f"{mcp_server_url}/oauth/callback", # MCP OAuth flow
f"{mcp_server_url}/oauth/login-callback", # Browser OAuth flow for /user/page
f"{mcp_server_url}/oauth/callback", # Unified callback (flow determined by query param)
]
# MCP server DCR: Register with ALL supported scopes
@@ -633,6 +635,8 @@ async def setup_oauth_config():
from nextcloud_mcp_server.auth.keycloak_oauth import KeycloakOAuthClient
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
# Note: This redirect_uri is for OAuth client initialization, not used for actual redirects
# since this client is used for backend token operations (exchange, refresh)
redirect_uri = f"{mcp_server_url}/oauth/callback"
# Extract base URL and realm from discovery URL
@@ -738,6 +742,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
refresh_token_storage=refresh_token_storage,
oauth_client=oauth_client,
oauth_provider=oauth_provider,
server_client_id=client_id,
)
finally:
logger.info("Shutting down MCP server")
@@ -793,16 +798,27 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
f"Unknown app: {app_name}. Available apps: {list(available_apps.keys())}"
)
# Register OAuth provisioning tools (only when offline access/Progressive Consent is used)
# Register OAuth provisioning tools (only when offline access is enabled)
# With token exchange enabled (external IdP), provisioning is not needed for MCP operations
enable_token_exchange = (
os.getenv("ENABLE_TOKEN_EXCHANGE", "false").lower() == "true"
)
if oauth_enabled and not enable_token_exchange:
logger.info("Registering OAuth provisioning tools for Progressive Consent")
enable_offline_access_for_tools = os.getenv(
"ENABLE_OFFLINE_ACCESS", "false"
).lower() in (
"true",
"1",
"yes",
)
if oauth_enabled and enable_offline_access_for_tools and not enable_token_exchange:
logger.info("Registering OAuth provisioning tools for offline access")
register_oauth_tools(mcp)
elif oauth_enabled and enable_token_exchange:
logger.info("Skipping provisioning tools registration (token exchange enabled)")
elif oauth_enabled and not enable_offline_access_for_tools:
logger.info(
"Skipping provisioning tools registration (offline access not enabled)"
)
# Override list_tools to filter based on user's token scopes (OAuth mode only)
if oauth_enabled:
@@ -876,7 +892,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
)
scopes = os.getenv("NEXTCLOUD_OIDC_SCOPES", "")
app.state.oauth_context = {
oauth_context_dict = {
"storage": refresh_token_storage,
"oauth_client": oauth_client,
"token_verifier": token_verifier, # For querying IdP userinfo endpoint
@@ -891,6 +907,19 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
"oauth_provider": oauth_provider,
},
}
app.state.oauth_context = oauth_context_dict
# Also set oauth_context on browser_app for session authentication
# browser_app is in the same function scope (defined later in create_app)
# We need to find it in the mounted routes
for route in app.routes:
if isinstance(route, Mount) and route.path == "/user":
route.app.state.oauth_context = oauth_context_dict
logger.info(
"OAuth context shared with browser_app for session auth"
)
break
logger.info(
f"OAuth context initialized for login routes (client_id={client_id[:16]}...)"
)
@@ -1031,6 +1060,38 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
routes.append(Route("/oauth/authorize", oauth_authorize, methods=["GET"]))
logger.info("OAuth login routes enabled: /oauth/authorize (Flow 1)")
# Add unified OAuth callback endpoint supporting both flows
from nextcloud_mcp_server.auth.oauth_routes import (
oauth_authorize_nextcloud,
oauth_callback,
oauth_callback_nextcloud,
)
routes.append(Route("/oauth/callback", oauth_callback, methods=["GET"]))
logger.info(
"OAuth unified callback enabled: /oauth/callback?flow={browser|provisioning}"
)
# Add OAuth resource provisioning routes (ADR-004 Progressive Consent Flow 2)
routes.append(
Route(
"/oauth/authorize-nextcloud",
oauth_authorize_nextcloud,
methods=["GET"],
)
)
# Keep old callback endpoint as backwards-compatible alias
routes.append(
Route(
"/oauth/callback-nextcloud",
oauth_callback_nextcloud,
methods=["GET"],
)
)
logger.info(
"OAuth resource provisioning routes enabled: /oauth/authorize-nextcloud, /oauth/callback-nextcloud (Flow 2, legacy)"
)
# Add browser OAuth login routes (OAuth mode only)
if oauth_enabled:
from nextcloud_mcp_server.auth.browser_oauth_routes import (
@@ -1042,6 +1103,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
routes.append(
Route("/oauth/login", oauth_login, methods=["GET"], name="oauth_login")
)
# Keep old callback endpoint as backwards-compatible alias
routes.append(
Route(
"/oauth/login-callback",
@@ -1054,13 +1116,14 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
Route("/oauth/logout", oauth_logout, methods=["GET"], name="oauth_logout")
)
logger.info(
"Browser OAuth routes enabled: /oauth/login, /oauth/login-callback, /oauth/logout"
"Browser OAuth routes enabled: /oauth/login, /oauth/login-callback (legacy), /oauth/logout"
)
# Add user info routes (available in both BasicAuth and OAuth modes)
# These require session authentication, so we wrap them in a separate app
from nextcloud_mcp_server.auth.session_backend import SessionAuthBackend
from nextcloud_mcp_server.auth.userinfo_routes import (
revoke_session,
user_info_html,
user_info_json,
)
@@ -1070,6 +1133,9 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
browser_routes = [
Route("/", user_info_json, methods=["GET"]), # /user/ → user_info_json
Route("/page", user_info_html, methods=["GET"]), # /user/page → user_info_html
Route(
"/revoke", revoke_session, methods=["POST"], name="revoke_session_endpoint"
), # /user/revoke → revoke_session
]
browser_app = Starlette(routes=browser_routes)
@@ -4,9 +4,11 @@ Separate from MCP OAuth flow - these routes establish browser sessions
for accessing admin UI endpoints like /user/page.
"""
import hashlib
import logging
import os
import secrets
from base64 import urlsafe_b64encode
from urllib.parse import urlencode
import httpx
@@ -53,39 +55,36 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse:
# Build OAuth authorization URL
mcp_server_url = oauth_config["mcp_server_url"]
callback_uri = f"{mcp_server_url}/oauth/login-callback"
callback_uri = f"{mcp_server_url}/oauth/callback"
# Request only basic OIDC scopes for browser session
# Note: Nextcloud app scopes (notes:read, etc.) are for MCP client access tokens,
# not for the MCP server's own browser authentication
scopes = "openid profile email offline_access"
code_challenge = ""
code_verifier = ""
# Generate PKCE values for ALL modes (both external and integrated IdP require PKCE)
code_verifier = secrets.token_urlsafe(32)
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = urlsafe_b64encode(digest).decode().rstrip("=")
# Store code_verifier in session for retrieval during callback (using state as key)
await storage.store_oauth_session(
session_id=state, # Use state as session ID
client_id="browser-ui",
client_redirect_uri="/user/page",
state=state,
code_challenge=code_challenge,
code_challenge_method="S256",
mcp_authorization_code=code_verifier, # Store code_verifier here temporarily
flow_type="browser",
ttl_seconds=600, # 10 minutes
)
if oauth_client:
# External IdP mode (Keycloak)
# Keycloak requires PKCE, so generate code_verifier and code_challenge
if not oauth_client.authorization_endpoint:
await oauth_client.discover()
# Generate PKCE values
code_verifier, code_challenge = oauth_client.generate_pkce_challenge()
# Store code_verifier temporarily (using state as key)
# We'll retrieve it in the callback using the state parameter
await storage.store_oauth_session(
session_id=state, # Use state as session ID
client_id="browser-ui",
client_redirect_uri="/user/page",
state=state,
code_challenge=code_challenge,
code_challenge_method="S256",
mcp_authorization_code=code_verifier, # Store code_verifier here temporarily
flow_type="browser",
ttl_seconds=600, # 10 minutes
)
idp_params = {
"client_id": oauth_client.client_id,
"redirect_uri": callback_uri,
@@ -138,6 +137,8 @@ async def oauth_login(request: Request) -> RedirectResponse | JSONResponse:
"response_type": "code",
"scope": scopes,
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"prompt": "consent", # Ensure refresh token
}
@@ -213,20 +214,18 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo
oauth_client = oauth_ctx["oauth_client"]
oauth_config = oauth_ctx["config"]
# Retrieve code_verifier from session storage (if using PKCE)
# Retrieve code_verifier from session storage (PKCE required for all modes)
code_verifier = ""
if oauth_client:
# For Keycloak (external IdP), we stored the code_verifier in the session
oauth_session = await storage.get_oauth_session(state)
if oauth_session:
# code_verifier was stored in mcp_authorization_code field
code_verifier = oauth_session.get("mcp_authorization_code", "")
# Clean up the temporary session
# Note: We don't have delete_oauth_session method, but it will expire after TTL
oauth_session = await storage.get_oauth_session(state)
if oauth_session:
# code_verifier was stored in mcp_authorization_code field
code_verifier = oauth_session.get("mcp_authorization_code", "")
# Clean up the temporary session
# Note: We don't have delete_oauth_session method, but it will expire after TTL
# Exchange authorization code for tokens
mcp_server_url = oauth_config["mcp_server_url"]
callback_uri = f"{mcp_server_url}/oauth/login-callback"
callback_uri = f"{mcp_server_url}/oauth/callback"
try:
if oauth_client:
@@ -263,16 +262,22 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo
discovery = response.json()
token_endpoint = discovery["token_endpoint"]
token_params = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": callback_uri,
"client_id": oauth_config["client_id"],
"client_secret": oauth_config["client_secret"],
}
# Add code_verifier for PKCE (required by Nextcloud OIDC)
if code_verifier:
token_params["code_verifier"] = code_verifier
async with httpx.AsyncClient() as http_client:
response = await http_client.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": callback_uri,
"client_id": oauth_config["client_id"],
"client_secret": oauth_config["client_secret"],
},
data=token_params,
)
response.raise_for_status()
token_data = response.json()
@@ -336,13 +341,18 @@ async def oauth_login_callback(request: Request) -> RedirectResponse | HTMLRespo
# Store refresh token (for background jobs ONLY)
if refresh_token:
logger.info(f"Storing refresh token for user_id: {user_id}")
logger.info(f" State parameter (provisioning_client_id): {state[:16]}...")
await storage.store_refresh_token(
user_id=user_id,
refresh_token=refresh_token,
expires_at=None,
flow_type="browser", # Browser-based login flow
provisioning_client_id=state, # Store state for unified session lookup
)
logger.info(f"✓ Refresh token stored successfully for user_id: {user_id}")
logger.info(
f" Token can now be found via provisioning_client_id={state[:16]}..."
)
else:
logger.warning("No refresh token in token response - cannot store session")
@@ -90,6 +90,8 @@ class KeycloakOAuthClient:
)
# Parse server URL to construct redirect URI
# Note: This is for OAuth client initialization, not used for actual redirects
# since this client is used for backend token operations (exchange, refresh)
parsed_url = urlparse(server_url)
redirect_uri = f"{parsed_url.scheme}://{parsed_url.netloc}/oauth/callback"
+152 -16
View File
@@ -1,7 +1,7 @@
"""
OAuth 2.0 Login Routes for ADR-004 Progressive Consent Architecture
OAuth 2.0 Login Routes for ADR-004 (Offline Access Architecture)
Implements dual OAuth flows with explicit provisioning:
Implements dual OAuth flows with optional offline access provisioning:
Flow 1: Client Authentication - MCP client authenticates directly to IdP
- Client requests: Nextcloud MCP resource scopes (notes:*, calendar:*, etc.)
@@ -19,8 +19,11 @@ Flow 2: Resource Provisioning - MCP server gets delegated Nextcloud access
"""
import hashlib
import logging
import os
import secrets
from base64 import urlsafe_b64encode
from urllib.parse import urlencode
import httpx
@@ -118,7 +121,7 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
status_code=400,
)
# Validate client_id (required for Progressive Consent Flow 1)
# Validate client_id (required for Flow 1)
if not client_id:
return JSONResponse(
{
@@ -168,7 +171,7 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
# The MCP server does NOT see the IdP authorization code!
logger.info(
f"Starting Progressive Consent Flow 1 - no server session needed, "
f"Starting Flow 1 - no server session needed, "
f"client will handle IdP response directly at {redirect_uri}"
)
@@ -188,7 +191,7 @@ async def oauth_authorize(request: Request) -> RedirectResponse | JSONResponse:
# Use client's own client_id (client must be pre-registered at IdP)
idp_client_id = client_id
logger.info("Flow 1 (Progressive Consent): Direct client auth to IdP")
logger.info("Flow 1: Direct client auth to IdP")
logger.info(f" Client ID: {client_id}")
logger.info(f" Client will receive IdP code directly at: {callback_uri}")
logger.info(f" Scopes: {scopes} (resource access for MCP tools)")
@@ -314,12 +317,31 @@ async def oauth_authorize_nextcloud(
)
mcp_server_url = oauth_config["mcp_server_url"]
callback_uri = f"{mcp_server_url}/oauth/callback-nextcloud"
callback_uri = f"{mcp_server_url}/oauth/callback"
# Flow 2: Server only needs identity + offline access (no resource scopes)
# Resource scopes are requested by client in Flow 1
scopes = "openid profile email offline_access"
# Generate PKCE values (required by Nextcloud OIDC)
code_verifier = secrets.token_urlsafe(32)
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = urlsafe_b64encode(digest).decode().rstrip("=")
# Store code_verifier in session for retrieval during callback
storage = oauth_ctx["storage"]
await storage.store_oauth_session(
session_id=state,
client_id=mcp_server_client_id,
client_redirect_uri=callback_uri,
state=state,
code_challenge=code_challenge,
code_challenge_method="S256",
mcp_authorization_code=code_verifier, # Store code_verifier here temporarily
flow_type="flow2",
ttl_seconds=600, # 10 minutes
)
# Get authorization endpoint
discovery_url = oauth_config.get("discovery_url")
if not discovery_url:
@@ -358,6 +380,8 @@ async def oauth_authorize_nextcloud(
"response_type": "code",
"scope": scopes,
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"prompt": "consent", # Force consent to show resource access
"access_type": "offline", # Request refresh token
"resource": oauth_config["nextcloud_resource_uri"], # Nextcloud audience
@@ -416,6 +440,16 @@ async def oauth_callback_nextcloud(request: Request):
storage: RefreshTokenStorage = oauth_ctx["storage"]
oauth_config = oauth_ctx["config"]
# Retrieve code_verifier from session storage (PKCE required by Nextcloud OIDC)
code_verifier = ""
oauth_session = await storage.get_oauth_session(state)
if oauth_session:
# code_verifier was stored in mcp_authorization_code field
code_verifier = oauth_session.get("mcp_authorization_code", "")
logger.info(
f"Retrieved code_verifier for Flow 2 callback (state={state[:16]}...)"
)
# Exchange code for tokens
mcp_server_client_id = os.getenv(
"MCP_SERVER_CLIENT_ID", oauth_config.get("client_id")
@@ -424,7 +458,7 @@ async def oauth_callback_nextcloud(request: Request):
"MCP_SERVER_CLIENT_SECRET", oauth_config.get("client_secret")
)
mcp_server_url = oauth_config["mcp_server_url"]
callback_uri = f"{mcp_server_url}/oauth/callback-nextcloud"
callback_uri = f"{mcp_server_url}/oauth/callback"
discovery_url = oauth_config.get("discovery_url")
async with httpx.AsyncClient() as http_client:
@@ -433,17 +467,24 @@ async def oauth_callback_nextcloud(request: Request):
discovery = response.json()
token_endpoint = discovery["token_endpoint"]
# Build token exchange params
token_params = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": callback_uri,
"client_id": mcp_server_client_id,
"client_secret": mcp_server_client_secret,
}
# Add code_verifier for PKCE (required by Nextcloud OIDC)
if code_verifier:
token_params["code_verifier"] = code_verifier
# Exchange code for tokens
async with httpx.AsyncClient() as http_client:
response = await http_client.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": callback_uri,
"client_id": mcp_server_client_id,
"client_secret": mcp_server_client_secret,
},
data=token_params,
)
response.raise_for_status()
token_data = response.json()
@@ -452,14 +493,22 @@ async def oauth_callback_nextcloud(request: Request):
id_token = token_data.get("id_token")
# Decode ID token to get user info
logger.info("=" * 60)
logger.info("oauth_callback_nextcloud: Extracting user_id from ID token")
logger.info("=" * 60)
try:
userinfo = jwt.decode(id_token, options={"verify_signature": False})
user_id = userinfo.get("sub")
username = userinfo.get("preferred_username") or userinfo.get("email")
logger.info(" ✓ ID token decode SUCCESSFUL")
logger.info(f" Extracted user_id: {user_id}")
logger.info(f" Username: {username}")
logger.info(f" ID token payload keys: {list(userinfo.keys())}")
logger.info(f"Flow 2: User {username} provisioned resource access")
except Exception as e:
logger.warning(f"Failed to decode ID token: {e}")
logger.error(f" ✗ ID token decode FAILED: {type(e).__name__}: {e}")
user_id = "unknown"
logger.error(f" Using fallback user_id: {user_id}")
# Store master refresh token for Flow 2
if refresh_token:
@@ -468,6 +517,13 @@ async def oauth_callback_nextcloud(request: Request):
token_data.get("scope", "").split() if token_data.get("scope") else None
)
logger.info("Storing refresh token:")
logger.info(f" user_id: {user_id}")
logger.info(" flow_type: flow2")
logger.info(" token_audience: nextcloud")
logger.info(f" provisioning_client_id: {state[:16]}...")
logger.info(f" scopes: {granted_scopes}")
await storage.store_refresh_token(
user_id=user_id,
refresh_token=refresh_token,
@@ -477,7 +533,8 @@ async def oauth_callback_nextcloud(request: Request):
scopes=granted_scopes,
expires_at=None, # Refresh tokens typically don't expire
)
logger.info(f"Stored Flow 2 master refresh token for user {user_id}")
logger.info(f"Stored Flow 2 master refresh token for user {user_id}")
logger.info("=" * 60)
# Return success HTML page
success_html = """
@@ -502,3 +559,82 @@ async def oauth_callback_nextcloud(request: Request):
from starlette.responses import HTMLResponse
return HTMLResponse(content=success_html, status_code=200)
async def oauth_callback(request: Request):
"""
Unified OAuth callback endpoint supporting multiple flows.
This endpoint consolidates all OAuth callback handling into a single URL.
The flow type is determined by looking up the OAuth session using the
state parameter.
This simplifies IdP configuration by requiring only one callback URL
to be registered: /oauth/callback
Query parameters:
code: Authorization code from IdP
state: CSRF protection state (also used to lookup flow type)
error: Error code (if authorization failed)
Returns:
Response from the appropriate flow handler
"""
# Get state parameter to lookup OAuth session
state = request.query_params.get("state")
if not state:
logger.warning("Unified callback called without state parameter")
return JSONResponse(
{
"error": "invalid_request",
"error_description": "state parameter is required",
},
status_code=400,
)
# Lookup OAuth session to determine flow type
oauth_ctx = request.app.state.oauth_context
if not oauth_ctx:
logger.error("OAuth context not available")
return JSONResponse(
{
"error": "server_error",
"error_description": "OAuth not configured on server",
},
status_code=500,
)
storage = oauth_ctx["storage"]
oauth_session = await storage.get_oauth_session(state)
# Determine flow type from session, default to "browser" for backwards compatibility
flow_type = (
oauth_session.get("flow_type", "browser") if oauth_session else "browser"
)
logger.info(f"Unified callback: flow_type={flow_type} (from session lookup)")
if flow_type == "flow2":
# Flow 2: Resource Provisioning - MCP server gets delegated Nextcloud access
logger.info("Routing to Flow 2 (resource provisioning)")
return await oauth_callback_nextcloud(request)
elif flow_type == "browser":
# Browser UI Login - establish browser session for /user/page access
logger.info("Routing to browser login flow")
from nextcloud_mcp_server.auth.browser_oauth_routes import (
oauth_login_callback,
)
return await oauth_login_callback(request)
else:
# Unknown flow type
logger.warning(f"Unknown flow_type in OAuth session: {flow_type}")
return JSONResponse(
{
"error": "invalid_request",
"error_description": f"Unknown flow type: {flow_type}",
},
status_code=400,
)
@@ -1,8 +1,8 @@
"""
Provisioning decorator for ADR-004 Progressive Consent Architecture.
Provisioning decorator for ADR-004 (Offline Access Architecture).
This decorator ensures users have completed Flow 2 (Resource Provisioning)
before accessing Nextcloud resources.
before accessing Nextcloud resources when offline access is enabled.
"""
import functools
@@ -73,7 +73,7 @@ def require_provisioning(func: Callable) -> Callable:
logger.debug("Token exchange mode detected - skipping provisioning check")
return await func(*args, **kwargs)
# Progressive Consent mode (offline access) - check if user has completed Flow 2 provisioning
# Offline access mode - check if user has completed Flow 2 provisioning
# Get user_id from authorization token
user_id = None
if hasattr(ctx, "authorization") and ctx.authorization:
@@ -430,6 +430,84 @@ class RefreshTokenStorage:
logger.error(f"Failed to decrypt refresh token for user {user_id}: {e}")
return None
async def get_refresh_token_by_provisioning_client_id(
self, provisioning_client_id: str
) -> Optional[dict]:
"""
Retrieve and decrypt refresh token by provisioning_client_id (state parameter).
This is used to check if an OAuth Flow 2 login completed successfully
by looking up the refresh token using the state parameter that was generated
during the authorization request.
Args:
provisioning_client_id: OAuth state parameter from the authorization request
Returns:
Dictionary with token data or None if not found
"""
if not self._initialized:
await self.initialize()
async with aiosqlite.connect(self.db_path) as db:
async with db.execute(
"""
SELECT user_id, encrypted_token, expires_at, flow_type, token_audience,
provisioned_at, provisioning_client_id, scopes
FROM refresh_tokens WHERE provisioning_client_id = ?
""",
(provisioning_client_id,),
) as cursor:
row = await cursor.fetchone()
if not row:
logger.debug(
f"No refresh token found for provisioning_client_id {provisioning_client_id[:16]}..."
)
return None
(
user_id,
encrypted_token,
expires_at,
flow_type,
token_audience,
provisioned_at,
prov_client_id,
scopes_json,
) = row
# Check expiration
if expires_at is not None and expires_at < time.time():
logger.warning(
f"Refresh token for provisioning_client_id {provisioning_client_id[:16]}... has expired"
)
return None
try:
decrypted_token = self.cipher.decrypt(encrypted_token).decode()
scopes = json.loads(scopes_json) if scopes_json else None
logger.debug(
f"Retrieved refresh token for provisioning_client_id {provisioning_client_id[:16]}... (user_id: {user_id})"
)
return {
"user_id": user_id,
"refresh_token": decrypted_token,
"expires_at": expires_at,
"flow_type": flow_type or "hybrid",
"token_audience": token_audience or "nextcloud",
"provisioned_at": provisioned_at,
"provisioning_client_id": prov_client_id,
"scopes": scopes,
}
except Exception as e:
logger.error(
f"Failed to decrypt refresh token for provisioning_client_id {provisioning_client_id[:16]}...: {e}"
)
return None
async def delete_refresh_token(self, user_id: str) -> bool:
"""
Delete refresh token for user.
@@ -130,13 +130,13 @@ def require_scopes(*required_scopes: str):
token_scopes = set(access_token.scopes or [])
required_scopes_set = set(required_scopes)
# Check if Progressive Consent is enabled
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
# Check if offline access is enabled
enable_offline_access = (
os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() == "true"
)
# In Progressive Consent mode, check if Nextcloud scopes require provisioning
if enable_progressive:
# In offline access mode, check if Nextcloud scopes require provisioning
if enable_offline_access:
# Check if any required scopes are Nextcloud-specific
nextcloud_scopes = [
s
@@ -141,9 +141,23 @@ async def _get_user_info(request: Request) -> dict[str, Any]:
try:
# Check if background access was granted (refresh token exists)
# This works for both Flow 2 (elicitation) and browser login
token_data = await storage.get_refresh_token(session_id)
background_access_granted = token_data is not None
# Build background access details
background_access_details = None
if token_data:
background_access_details = {
"flow_type": token_data.get("flow_type", "unknown"),
"provisioned_at": token_data.get("provisioned_at", "unknown"),
"provisioning_client_id": token_data.get(
"provisioning_client_id", "N/A"
),
"scopes": token_data.get("scopes", "N/A"),
"token_audience": token_data.get("token_audience", "unknown"),
}
# Retrieve cached user profile (no token operations!)
profile_data = await storage.get_user_profile(session_id)
@@ -153,6 +167,7 @@ async def _get_user_info(request: Request) -> dict[str, Any]:
"auth_mode": "oauth",
"session_id": session_id[:16] + "...", # Truncated for security
"background_access_granted": background_access_granted,
"background_access_details": background_access_details,
}
# Include cached profile if available
@@ -291,6 +306,47 @@ async def user_info_html(request: Request) -> HTMLResponse:
session_info_html = ""
if auth_mode == "oauth" and "session_id" in user_context:
session_id = user_context.get("session_id", "unknown")
background_access_granted = user_context.get("background_access_granted", False)
background_details = user_context.get("background_access_details")
# Build background access section
background_html = ""
if background_access_granted and background_details:
flow_type = background_details.get("flow_type", "unknown")
provisioned_at = background_details.get("provisioned_at", "unknown")
scopes = background_details.get("scopes", "N/A")
token_audience = background_details.get("token_audience", "unknown")
background_html = f"""
<tr>
<td><strong>Background Access</strong></td>
<td><span style="color: #4caf50; font-weight: bold;">✓ Granted</span></td>
</tr>
<tr>
<td><strong>Flow Type</strong></td>
<td>{flow_type}</td>
</tr>
<tr>
<td><strong>Provisioned At</strong></td>
<td>{provisioned_at}</td>
</tr>
<tr>
<td><strong>Token Audience</strong></td>
<td>{token_audience}</td>
</tr>
<tr>
<td><strong>Scopes</strong></td>
<td><code style="font-size: 11px;">{scopes}</code></td>
</tr>
"""
else:
background_html = """
<tr>
<td><strong>Background Access</strong></td>
<td><span style="color: #999;">Not Granted</span></td>
</tr>
"""
session_info_html = f"""
<h2>Session Information</h2>
<table>
@@ -298,9 +354,23 @@ async def user_info_html(request: Request) -> HTMLResponse:
<td><strong>Session ID</strong></td>
<td><code>{session_id}</code></td>
</tr>
{background_html}
</table>
"""
# Add revoke button if background access is granted
if background_access_granted:
revoke_url = str(request.url_for("revoke_session_endpoint"))
session_info_html += f"""
<div style="margin-top: 15px;">
<form method="post" action="{revoke_url}" onsubmit="return confirm('Are you sure you want to revoke background access? This will delete the refresh token.');">
<button type="submit" style="padding: 8px 16px; background-color: #ff9800; color: white; border: none; border-radius: 4px; cursor: pointer; font-size: 14px;">
Revoke Background Access
</button>
</form>
</div>
"""
# Build IdP profile HTML
idp_profile_html = ""
if "idp_profile" in user_context:
@@ -446,3 +516,117 @@ async def user_info_html(request: Request) -> HTMLResponse:
"""
return HTMLResponse(content=html_content)
@requires("authenticated", redirect="oauth_login")
async def revoke_session(request: Request) -> HTMLResponse:
"""Revoke background access (delete refresh token).
This endpoint allows users to revoke the refresh token that grants
background access to Nextcloud resources. The session cookie remains
valid for browser UI access, but background jobs will no longer work.
Args:
request: Starlette request object
Returns:
HTML response confirming revocation or showing error
"""
oauth_ctx = getattr(request.app.state, "oauth_context", None)
if not oauth_ctx:
return HTMLResponse(
"""
<!DOCTYPE html>
<html>
<head><title>Error</title></head>
<body>
<h1>Error</h1>
<p>OAuth mode not enabled</p>
</body>
</html>
""",
status_code=400,
)
storage = oauth_ctx.get("storage")
session_id = request.cookies.get("mcp_session")
if not storage or not session_id:
return HTMLResponse(
"""
<!DOCTYPE html>
<html>
<head><title>Error</title></head>
<body>
<h1>Error</h1>
<p>Session not found</p>
</body>
</html>
""",
status_code=400,
)
try:
# Delete the refresh token
logger.info(f"Revoking background access for session {session_id[:16]}...")
await storage.delete_refresh_token(session_id)
logger.info(f"✓ Background access revoked for session {session_id[:16]}...")
# Redirect back to user page
user_page_url = str(request.url_for("user_info_html"))
return HTMLResponse(
f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="refresh" content="2;url={user_page_url}">
<title>Background Access Revoked</title>
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
max-width: 600px;
margin: 50px auto;
padding: 20px;
text-align: center;
}}
.success {{
background-color: #e8f5e9;
border: 2px solid #4caf50;
padding: 30px;
border-radius: 8px;
}}
h1 {{
color: #4caf50;
}}
</style>
</head>
<body>
<div class="success">
<h1>✓ Background Access Revoked</h1>
<p>Your refresh token has been deleted successfully.</p>
<p>Browser session remains active.</p>
<p>Redirecting back to user page...</p>
</div>
</body>
</html>
"""
)
except Exception as e:
logger.error(f"Failed to revoke background access: {e}")
return HTMLResponse(
f"""
<!DOCTYPE html>
<html>
<head><title>Error</title></head>
<body>
<h1>Error</h1>
<p>Failed to revoke background access: {e}</p>
</body>
</html>
""",
status_code=500,
)
+336 -37
View File
@@ -11,16 +11,88 @@ import secrets
from typing import Optional
from urllib.parse import urlencode
import httpx
from mcp.server.auth.middleware.auth_context import get_access_token
from mcp.server.auth.provider import AccessToken
from mcp.server.fastmcp import Context
from pydantic import BaseModel, Field
from nextcloud_mcp_server.auth import require_scopes
from nextcloud_mcp_server.auth.refresh_token_storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
from nextcloud_mcp_server.auth.userinfo_routes import _query_idp_userinfo
logger = logging.getLogger(__name__)
async def extract_user_id_from_token(ctx: Context) -> str:
"""Extract user_id from the MCP access token (Flow 1).
Handles both JWT and opaque tokens:
- JWT: Decode and extract 'sub' claim
- Opaque: Call userinfo endpoint to get 'sub'
Args:
ctx: MCP context with access token
Returns:
user_id extracted from token, or "default_user" as fallback
"""
# Use MCP SDK's get_access_token() which uses contextvars
access_token: AccessToken | None = get_access_token()
if not access_token or not access_token.token:
logger.warning(" ✗ No access token found via get_access_token()")
return "default_user"
token = access_token.token
is_jwt = "." in token and token.count(".") >= 2
logger.info(f" Token type: {'JWT' if is_jwt else 'Opaque'}")
# Try JWT decode first
if is_jwt:
try:
import jwt
payload = jwt.decode(token, options={"verify_signature": False})
user_id = payload.get("sub", "unknown")
logger.info(f" ✓ JWT decode successful: user_id={user_id}")
return user_id
except Exception as e:
logger.error(f" ✗ JWT decode failed: {type(e).__name__}: {e}")
# Opaque token - call userinfo endpoint
logger.info(" Opaque token detected, calling userinfo endpoint...")
try:
# Get userinfo endpoint from OIDC discovery
oidc_discovery_uri = os.getenv(
"OIDC_DISCOVERY_URI",
"http://localhost:8080/.well-known/openid-configuration",
)
async with httpx.AsyncClient() as http_client:
discovery_response = await http_client.get(oidc_discovery_uri)
discovery_response.raise_for_status()
discovery = discovery_response.json()
userinfo_endpoint = discovery.get("userinfo_endpoint")
if userinfo_endpoint:
userinfo = await _query_idp_userinfo(token, userinfo_endpoint)
if userinfo:
user_id = userinfo.get("sub", "unknown")
logger.info(f" ✓ Userinfo query successful: user_id={user_id}")
return user_id
else:
logger.error(" ✗ Userinfo query failed")
else:
logger.error(" ✗ No userinfo_endpoint available")
except Exception as e:
logger.error(f" ✗ Userinfo query failed: {type(e).__name__}: {e}")
# Fallback
logger.warning(" Using fallback user_id: default_user")
return "default_user"
class ProvisioningStatus(BaseModel):
"""Status of Nextcloud provisioning for a user."""
@@ -57,6 +129,15 @@ class RevocationResult(BaseModel):
message: str = Field(description="Status message for the user")
class LoginConfirmation(BaseModel):
"""Schema for login confirmation elicitation."""
acknowledged: bool = Field(
default=False,
description="Check this box after completing login at the provided URL",
)
async def get_provisioning_status(ctx: Context, user_id: str) -> ProvisioningStatus:
"""
Check the provisioning status for Nextcloud access.
@@ -71,14 +152,28 @@ async def get_provisioning_status(ctx: Context, user_id: str) -> ProvisioningSta
Returns:
ProvisioningStatus with current provisioning state
"""
logger.info(
f" get_provisioning_status: Looking up refresh token for user_id={user_id}"
)
storage = RefreshTokenStorage.from_env()
await storage.initialize()
token_data = await storage.get_refresh_token(user_id)
if not token_data:
logger.info(
f" get_provisioning_status: ✗ No refresh token found for user_id={user_id}"
)
return ProvisioningStatus(is_provisioned=False)
logger.info(
f" get_provisioning_status: ✓ Refresh token FOUND for user_id={user_id}"
)
logger.info(f" flow_type: {token_data.get('flow_type')}")
logger.info(
f" provisioning_client_id: {token_data.get('provisioning_client_id', 'N/A')}"
)
# Convert timestamp to ISO format if present
provisioned_at_str = None
if token_data.get("provisioned_at"):
@@ -106,36 +201,33 @@ def generate_oauth_url_for_flow2(
"""
Generate OAuth authorization URL for Flow 2 (Resource Provisioning).
This creates the URL that the MCP server uses to get delegated
access to Nextcloud on behalf of the user.
This returns the MCP server's Flow 2 authorization endpoint, which will:
1. Generate PKCE parameters (required by Nextcloud OIDC)
2. Store code_verifier in session
3. Redirect to Nextcloud IdP with PKCE
4. Handle the callback with code_verifier for token exchange
Args:
oidc_discovery_url: OIDC provider discovery URL
server_client_id: MCP server's OAuth client ID
redirect_uri: Callback URL for the MCP server
oidc_discovery_url: OIDC provider discovery URL (unused, kept for compatibility)
server_client_id: MCP server's OAuth client ID (unused, kept for compatibility)
redirect_uri: Callback URL for the MCP server (unused, kept for compatibility)
state: CSRF protection state
scopes: List of scopes to request
scopes: List of scopes to request (unused, kept for compatibility)
Returns:
Complete authorization URL for Flow 2
MCP server's Flow 2 authorization URL with state parameter
"""
# Extract base URL from discovery URL
# Format: https://example.com/.well-known/openid-configuration
# We need: https://example.com/apps/oidc/authorize
base_url = oidc_discovery_url.replace("/.well-known/openid-configuration", "")
auth_endpoint = f"{base_url}/apps/oidc/authorize"
# Use the MCP server's Flow 2 endpoint which handles PKCE internally
# This endpoint will:
# - Generate code_verifier and code_challenge (PKCE)
# - Store code_verifier in session storage
# - Redirect to Nextcloud with PKCE parameters
# - Handle the callback with proper code_verifier
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
auth_endpoint = f"{mcp_server_url}/oauth/authorize-nextcloud"
# Build OAuth parameters
params = {
"response_type": "code",
"client_id": server_client_id,
"redirect_uri": redirect_uri,
"scope": " ".join(scopes),
"state": state,
# Request offline access for background operations
"access_type": "offline",
"prompt": "consent", # Force consent screen to show scopes
}
# Only pass state parameter - the endpoint handles everything else
params = {"state": state}
return f"{auth_endpoint}?{urlencode(params)}"
@@ -190,27 +282,33 @@ async def provision_nextcloud_access(
)
# Get configuration
enable_progressive = (
os.getenv("ENABLE_PROGRESSIVE_CONSENT", "false").lower() == "true"
enable_offline_access = (
os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() == "true"
)
if not enable_progressive:
if not enable_offline_access:
return ProvisioningResult(
success=False,
message=(
"Progressive Consent is not enabled. "
"Set ENABLE_PROGRESSIVE_CONSENT=true to use this feature."
"Offline access is not enabled. "
"Set ENABLE_OFFLINE_ACCESS=true to use this feature."
),
)
# Get MCP server's OAuth client credentials
# Try environment variable first, then fall back to DCR client_id
server_client_id = os.getenv("MCP_SERVER_CLIENT_ID")
if not server_client_id:
# In production, would use Dynamic Client Registration here
# Try to get from lifespan context (DCR)
lifespan_ctx = ctx.request_context.lifespan_context
if hasattr(lifespan_ctx, "server_client_id"):
server_client_id = lifespan_ctx.server_client_id
if not server_client_id:
return ProvisioningResult(
success=False,
message=(
"MCP server OAuth client not configured. "
"Administrator must set MCP_SERVER_CLIENT_ID."
"Set MCP_SERVER_CLIENT_ID environment variable or use Dynamic Client Registration."
),
)
@@ -229,7 +327,7 @@ async def provision_nextcloud_access(
# Create OAuth session for Flow 2
session_id = f"flow2_{user_id}_{secrets.token_hex(8)}"
redirect_uri = f"{os.getenv('NEXTCLOUD_MCP_SERVER_URL', 'http://localhost:8000')}/oauth/callback-nextcloud"
redirect_uri = f"{os.getenv('NEXTCLOUD_MCP_SERVER_URL', 'http://localhost:8000')}/oauth/callback"
await storage.store_oauth_session(
session_id=session_id,
@@ -301,13 +399,11 @@ async def revoke_nextcloud_access(
RevocationResult with status
"""
try:
# Get user ID from context if not provided
# Get user ID from token if not provided
if not user_id:
user_id = (
ctx.context.get("user_id", "default_user") # type: ignore
if hasattr(ctx, "context")
else "default_user"
)
logger.info("Extracting user_id from access token for revoke...")
user_id = await extract_user_id_from_token(ctx)
logger.info(f" Revoke using user_id: {user_id}")
# Check current status
status = await get_provisioning_status(ctx, user_id)
@@ -390,6 +486,198 @@ async def check_provisioning_status(
return await get_provisioning_status(ctx, user_id)
async def check_logged_in(ctx: Context, user_id: Optional[str] = None) -> str:
"""
MCP Tool: Check if user is logged in and elicit login if needed.
This tool checks whether the user has completed Flow 2 (resource provisioning)
to grant offline access to Nextcloud. If not logged in, it uses MCP elicitation
to prompt the user to complete the login flow.
Args:
ctx: MCP context with user's Flow 1 token
user_id: Optional user identifier (extracted from token if not provided)
Returns:
"yes" if logged in, or elicitation prompting for login
"""
try:
# Extract user ID from the MCP access token (Flow 1 token)
logger.info("=" * 60)
logger.info("check_logged_in: Starting user_id extraction")
logger.info("=" * 60)
if not user_id:
user_id = await extract_user_id_from_token(ctx)
logger.info(f" Final user_id for check_logged_in: {user_id}")
else:
logger.info(f" user_id provided as argument: {user_id}")
# Check if already logged in
logger.info(f"Checking provisioning status for user_id: {user_id}")
status = await get_provisioning_status(ctx, user_id)
logger.info(f" Provisioning status: is_provisioned={status.is_provisioned}")
if status.is_provisioned:
logger.info(f"✓ User {user_id} is already logged in - returning 'yes'")
logger.info("=" * 60)
return "yes"
logger.info(f"✗ User {user_id} is NOT logged in - triggering elicitation")
logger.info("=" * 60)
# Not logged in - generate OAuth URL for Flow 2
enable_offline_access = (
os.getenv("ENABLE_OFFLINE_ACCESS", "false").lower() == "true"
)
if not enable_offline_access:
return (
"Not logged in. Offline access is not enabled. "
"Set ENABLE_OFFLINE_ACCESS=true to use this feature."
)
# Get MCP server's OAuth client credentials
# Try environment variable first, then fall back to DCR client_id
server_client_id = os.getenv("MCP_SERVER_CLIENT_ID")
if not server_client_id:
# Try to get from lifespan context (DCR)
lifespan_ctx = ctx.request_context.lifespan_context
if hasattr(lifespan_ctx, "server_client_id"):
server_client_id = lifespan_ctx.server_client_id
if not server_client_id:
return (
"Not logged in. MCP server OAuth client not configured. "
"Set MCP_SERVER_CLIENT_ID environment variable or use Dynamic Client Registration."
)
# Generate OAuth URL for Flow 2
oidc_discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration",
)
# Generate secure state for CSRF protection
state = secrets.token_urlsafe(32)
# Store state in session for validation on callback
storage = RefreshTokenStorage.from_env()
await storage.initialize()
# Create OAuth session for Flow 2
session_id = f"flow2_{user_id}_{secrets.token_hex(8)}"
redirect_uri = f"{os.getenv('NEXTCLOUD_MCP_SERVER_URL', 'http://localhost:8000')}/oauth/callback"
await storage.store_oauth_session(
session_id=session_id,
client_redirect_uri="", # No client redirect for Flow 2
state=state,
flow_type="flow2",
is_provisioning=True,
ttl_seconds=600, # 10 minute TTL
)
# Define scopes for Nextcloud access
scopes = [
"openid",
"profile",
"email",
"offline_access", # Critical for background operations
"notes:read",
"notes:write",
"calendar:read",
"calendar:write",
"contacts:read",
"contacts:write",
"files:read",
"files:write",
]
# Generate authorization URL
auth_url = generate_oauth_url_for_flow2(
oidc_discovery_url=oidc_discovery_url,
server_client_id=server_client_id,
redirect_uri=redirect_uri,
state=state,
scopes=scopes,
)
# Use elicitation to prompt user to login
logger.info(f"Eliciting login for user {user_id} with URL: {auth_url}")
result = await ctx.elicit(
message=f"Please log in to Nextcloud at the following URL:\n\n{auth_url}\n\nAfter completing the login, check the box below and click OK.",
schema=LoginConfirmation,
)
if result.action == "accept":
# Check if login was successful by looking for refresh token
# Strategy: Try multiple lookup methods to handle both flows
logger.info("User accepted login prompt, checking for refresh token")
logger.info(f" State parameter: {state[:16]}...")
logger.info(f" User ID: {user_id}")
# First, try to find token by provisioning_client_id (Flow 2 from elicitation)
refresh_token_data = (
await storage.get_refresh_token_by_provisioning_client_id(state)
)
if refresh_token_data:
logger.info("✓ Refresh token found via provisioning_client_id lookup")
logger.info(
f" Flow type: {refresh_token_data.get('flow_type', 'unknown')}"
)
logger.info(
f" Provisioned at: {refresh_token_data.get('provisioned_at', 'unknown')}"
)
return "yes"
# Fallback: Try to find token by user_id (browser login or any other flow)
logger.info(f"✗ No token found with provisioning_client_id={state[:16]}...")
logger.info(f" Trying fallback lookup by user_id: {user_id}")
refresh_token_data = await storage.get_refresh_token(user_id)
if refresh_token_data:
logger.info("✓ Refresh token found via user_id lookup")
logger.info(
f" Flow type: {refresh_token_data.get('flow_type', 'unknown')}"
)
logger.info(
f" Provisioned at: {refresh_token_data.get('provisioned_at', 'unknown')}"
)
logger.info(
f" Provisioning client ID: {refresh_token_data.get('provisioning_client_id', 'NULL')}"
)
logger.info(
" Note: This token was created via browser login or different flow"
)
return "yes"
# No token found by either method
logger.warning(f"✗ No refresh token found for user {user_id}")
logger.warning(
f" Checked provisioning_client_id={state[:16]}... - NOT FOUND"
)
logger.warning(f" Checked user_id={user_id} - NOT FOUND")
logger.warning(
" This may indicate the user completed login but token wasn't stored"
)
return (
"Login not detected. Please ensure you completed the login "
"at the provided URL before clicking OK."
)
elif result.action == "decline":
return "Login declined by user."
else:
return "Login cancelled by user."
except Exception as e:
logger.error(f"Failed to check login status: {e}")
return f"Error checking login status: {str(e)}"
# Register MCP tools
def register_oauth_tools(mcp):
"""Register OAuth and provisioning tools with the MCP server."""
@@ -428,3 +716,14 @@ def register_oauth_tools(mcp):
ctx: Context, user_id: Optional[str] = None
) -> ProvisioningStatus:
return await check_provisioning_status(ctx, user_id)
@mcp.tool(
name="check_logged_in",
description=(
"Check if you are logged in to Nextcloud. "
"If not logged in, this tool will prompt you to complete the login flow."
),
)
@require_scopes("openid")
async def tool_check_logged_in(ctx: Context, user_id: Optional[str] = None) -> str:
return await check_logged_in(ctx, user_id)
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.25.0"
version = "0.26.0"
description = "Model Context Protocol (MCP) server for Nextcloud integration - enables AI assistants to interact with Nextcloud data"
authors = [
{name = "Chris Coutinho", email = "chris@coutinho.io"}
+165 -1
View File
@@ -8,7 +8,9 @@ import httpx
import pytest
from httpx import HTTPStatusError
from mcp import ClientSession
from mcp.client.session import RequestContext
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import ElicitRequestParams, ElicitResult, ErrorData
from nextcloud_mcp_server.client import NextcloudClient
@@ -110,6 +112,7 @@ async def create_mcp_client_session(
url: str,
token: str | None = None,
client_name: str = "MCP",
elicitation_callback: Any = None,
) -> AsyncGenerator[ClientSession, Any]:
"""
Factory function to create an MCP client session with proper lifecycle management.
@@ -127,6 +130,8 @@ async def create_mcp_client_session(
url: MCP server URL (e.g., "http://localhost:8000/mcp")
token: Optional OAuth access token for Bearer authentication
client_name: Client name for logging (e.g., "OAuth MCP (Playwright)")
elicitation_callback: Optional callback for handling elicitation requests.
Should match signature: async def callback(context: RequestContext, params: ElicitRequestParams) -> ElicitResult | ErrorData
Yields:
Initialized MCP ClientSession
@@ -149,7 +154,9 @@ async def create_mcp_client_session(
write_stream,
_,
):
async with ClientSession(read_stream, write_stream) as session:
async with ClientSession(
read_stream, write_stream, elicitation_callback=elicitation_callback
) as session:
await session.initialize()
logger.info(f"{client_name} client session initialized successfully")
yield session
@@ -251,6 +258,163 @@ async def nc_mcp_oauth_jwt_client(
yield session
@pytest.fixture
async def nc_mcp_oauth_client_with_elicitation(
anyio_backend,
playwright_oauth_token: str,
browser,
) -> AsyncGenerator[ClientSession, Any]:
"""
Fixture to create an MCP client session with elicitation callback support.
This fixture enables REAL elicitation testing by providing a callback that:
1. Extracts OAuth URL from elicitation message
2. Uses Playwright to complete OAuth flow automatically
3. Returns acceptance to confirm completion
This allows testing the complete login elicitation flow (ADR-006) end-to-end,
verifying that:
- The check_logged_in tool triggers elicitation for unauthenticated users
- The OAuth flow completes successfully via automated browser
- Refresh token is stored after OAuth completion
- The tool returns "yes" after successful login
Uses function scope to allow each test to have independent elicitation state.
"""
# Get credentials from environment
username = os.getenv("NEXTCLOUD_USERNAME")
password = os.getenv("NEXTCLOUD_PASSWORD")
if not all([username, password]):
pytest.skip(
"Elicitation test requires NEXTCLOUD_USERNAME and NEXTCLOUD_PASSWORD"
)
# Track whether elicitation was triggered (for test validation)
elicitation_triggered = {"count": 0}
async def elicitation_callback(
context: RequestContext[ClientSession, Any],
params: ElicitRequestParams,
) -> ElicitResult | ErrorData:
"""Handle elicitation by completing OAuth flow with Playwright."""
elicitation_triggered["count"] += 1
logger.info("🎯 Elicitation callback invoked!")
logger.info(f" Message: {params.message[:100]}...")
logger.info(f" Schema: {params.schema}")
# Extract OAuth URL from elicitation message
import re
url_pattern = r"https?://[^\s]+"
urls = re.findall(url_pattern, params.message)
if not urls:
error_msg = "No URL found in elicitation message"
logger.error(f"{error_msg}")
return ErrorData(code=-32602, message=error_msg)
oauth_url = urls[0]
logger.info(f" Extracted URL: {oauth_url}")
# Complete OAuth flow with Playwright
page = await browser.new_page()
try:
logger.info("🌐 Navigating to OAuth URL...")
await page.goto(oauth_url, timeout=60000)
current_url = page.url
logger.info(f" Current URL after navigation: {current_url}")
# Handle login form if present
if "/login" in current_url or "/index.php/login" in current_url:
logger.info("🔐 Login page detected, filling credentials...")
await page.wait_for_selector('input[name="user"]', timeout=10000)
await page.fill('input[name="user"]', username)
await page.fill('input[name="password"]', password)
await page.click('button[type="submit"]')
await page.wait_for_load_state("networkidle", timeout=60000)
logger.info(" ✓ Login completed")
# Handle consent screen if present
try:
logger.info(f" Current URL before consent: {page.url}")
consent_handled = await _handle_oauth_consent_screen(page, username)
if consent_handled:
logger.info(" ✓ Consent granted")
else:
logger.warning(" ⚠ No consent screen detected")
# Take screenshot for debugging
screenshot_path = f"/tmp/elicitation_no_consent_{uuid.uuid4()}.png"
await page.screenshot(path=screenshot_path)
logger.info(f" Screenshot saved: {screenshot_path}")
# Log page title for debugging
page_title = await page.title()
logger.info(f" Page title: {page_title}")
except Exception as e:
logger.warning(f" ⚠ Consent screen handling failed: {e}")
# Take screenshot for debugging
screenshot_path = f"/tmp/elicitation_consent_error_{uuid.uuid4()}.png"
await page.screenshot(path=screenshot_path)
logger.info(f" Screenshot saved: {screenshot_path}")
# Wait for OAuth callback URL to be reached
# The MCP server's callback endpoint will handle token exchange
logger.info("⏳ Waiting for OAuth callback to complete...")
# Wait for URL to contain /oauth/callback or a success page
# Give it up to 30 seconds for the redirect and token exchange
for _ in range(60): # 60 * 0.5s = 30s max wait
await anyio.sleep(0.5)
current_url = page.url
if "/oauth/callback" in current_url or "/user" in current_url:
logger.info(f" ✓ Callback URL reached: {current_url}")
break
else:
logger.warning(
f" ⚠ Timeout waiting for callback, final URL: {page.url}"
)
# Wait a bit more to ensure the server processed the callback
await anyio.sleep(2)
final_url = page.url
logger.info(f" Final URL: {final_url}")
# Return success - user "accepted" the elicitation
logger.info("✅ OAuth flow completed, returning accept")
return ElicitResult(action="accept", content={"acknowledged": True})
except Exception as e:
logger.error(f"❌ Elicitation OAuth flow failed: {e}")
# Take screenshot for debugging
try:
screenshot_path = f"/tmp/elicitation_oauth_failure_{uuid.uuid4()}.png"
await page.screenshot(path=screenshot_path)
logger.error(f" Screenshot saved: {screenshot_path}")
except Exception:
pass
return ErrorData(
code=-32603, message=f"Failed to complete OAuth flow: {str(e)}"
)
finally:
await page.close()
# Create client session with elicitation callback
async for session in create_mcp_client_session(
url="http://localhost:8001/mcp",
token=playwright_oauth_token,
client_name="OAuth MCP with Elicitation",
elicitation_callback=elicitation_callback,
):
# Attach elicitation metadata for test validation
session.elicitation_triggered = elicitation_triggered
yield session
@pytest.fixture(scope="session")
async def nc_mcp_oauth_client_read_only(
anyio_backend,
@@ -0,0 +1,206 @@
"""Integration tests for login elicitation with real MCP client callback support.
These tests verify the complete end-to-end login elicitation flow (ADR-006)
using the python-sdk MCP client with actual elicitation callback implementation.
Unlike test_login_elicitation.py which validates response formats, these tests
exercise the REAL elicitation protocol:
1. MCP client with elicitation callback connects to server
2. Tool triggers elicitation (ctx.elicit())
3. Client callback receives elicitation request
4. Callback completes OAuth flow via Playwright automation
5. Client returns acceptance
6. Tool proceeds with authenticated operation
This validates that:
- python-sdk MCP client can handle elicitation requests
- OAuth flow completion via callback works end-to-end
- Refresh tokens are properly stored after elicitation
- check_logged_in returns "yes" after successful OAuth
"""
import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
async def revoke_refresh_tokens(client):
"""Helper to revoke all refresh tokens from MCP server.
This forces check_logged_in to trigger elicitation by removing
any existing refresh tokens via the revoke_nextcloud_access tool.
"""
logger.info("Revoking refresh tokens via revoke_nextcloud_access tool...")
result = await client.call_tool("revoke_nextcloud_access", arguments={})
logger.info(f"Revoke result: isError={result.isError}")
if not result.isError:
logger.info(f"✓ Revoke response: {result.content[0].text}")
else:
logger.warning(f"Revoke failed: {result.content}")
async def test_check_logged_in_with_real_elicitation_callback(
nc_mcp_oauth_client_with_elicitation,
):
"""Test check_logged_in with actual elicitation callback that completes OAuth.
This test validates the COMPLETE elicitation flow:
1. Call check_logged_in tool (which triggers elicitation)
2. Elicitation callback extracts OAuth URL
3. Playwright automation completes OAuth flow
4. Callback returns acceptance
5. Tool returns "yes" (logged in)
6. Refresh token is stored
This is the ONLY test that exercises the real MCP elicitation protocol
with python-sdk's ClientSession elicitation callback support.
"""
client = nc_mcp_oauth_client_with_elicitation
logger.info("=" * 80)
logger.info("TEST: Real elicitation callback with OAuth completion")
logger.info("=" * 80)
# Revoke refresh tokens to force elicitation
await revoke_refresh_tokens(client)
# Call check_logged_in - this should trigger elicitation
logger.info("Calling check_logged_in tool...")
result = await client.call_tool("check_logged_in", arguments={})
logger.info("Tool execution completed")
logger.info(f" Is error: {result.isError}")
if result.content:
response_text = result.content[0].text
logger.info(f" Response: {response_text}")
else:
logger.warning(" No content in response")
# Validate tool execution succeeded
assert result.isError is False, f"Tool execution failed: {result.content}"
assert result.content is not None, "No content in tool response"
response_text = result.content[0].text.lower()
# Validate elicitation was triggered
elicitation_count = client.elicitation_triggered["count"]
logger.info(f"✓ Elicitation triggered {elicitation_count} time(s)")
assert elicitation_count >= 1, (
"Elicitation callback should have been invoked at least once"
)
# Validate OAuth completed successfully and tool returned "yes"
assert "yes" in response_text, (
f"Expected 'yes' after successful OAuth via elicitation, got: {response_text}"
)
logger.info("✅ Test passed: Real elicitation callback completed OAuth flow")
logger.info("=" * 80)
async def test_elicitation_callback_url_extraction(
nc_mcp_oauth_client_with_elicitation,
):
"""Test that elicitation callback correctly extracts OAuth URL.
This validates the URL extraction logic in the callback by examining
the elicitation message format returned by check_logged_in.
"""
client = nc_mcp_oauth_client_with_elicitation
logger.info("Testing OAuth URL extraction from elicitation message...")
# Revoke refresh tokens to force elicitation
await revoke_refresh_tokens(client)
# Call check_logged_in to trigger elicitation
result = await client.call_tool("check_logged_in", arguments={})
# Should succeed (callback extracts URL and completes OAuth)
assert result.isError is False
assert "yes" in result.content[0].text.lower()
# Elicitation should have been triggered
assert client.elicitation_triggered["count"] >= 1
logger.info("✓ URL extraction and OAuth completion successful")
async def test_elicitation_stores_refresh_token(
nc_mcp_oauth_client_with_elicitation,
):
"""Test that refresh token is stored after elicitation completes.
Validates that after successful OAuth via elicitation:
1. check_logged_in returns "yes"
2. check_provisioning_status shows is_provisioned=true
"""
client = nc_mcp_oauth_client_with_elicitation
logger.info("Testing refresh token storage after elicitation...")
# Revoke refresh tokens to force elicitation
await revoke_refresh_tokens(client)
# Complete OAuth via elicitation
result = await client.call_tool("check_logged_in", arguments={})
assert result.isError is False
assert "yes" in result.content[0].text.lower()
# Verify refresh token was stored
logger.info("Checking provisioning status...")
status_result = await client.call_tool("check_provisioning_status", arguments={})
assert status_result.isError is False
status_text = status_result.content[0].text.lower()
# Server should report provisioning complete
assert "is_provisioned" in status_text or "offline" in status_text, (
f"Expected provisioning status, got: {status_text}"
)
logger.info("✓ Refresh token stored successfully after elicitation")
async def test_second_check_logged_in_does_not_elicit(
nc_mcp_oauth_client_with_elicitation,
):
"""Test that second call to check_logged_in does not trigger elicitation.
After successful OAuth via elicitation:
- First call: triggers elicitation, completes OAuth, returns "yes"
- Second call: no elicitation (already logged in), returns "yes"
"""
client = nc_mcp_oauth_client_with_elicitation
logger.info("Testing that already-logged-in users don't get elicited...")
# First call: triggers elicitation
result1 = await client.call_tool("check_logged_in", arguments={})
assert result1.isError is False
assert "yes" in result1.content[0].text.lower()
elicitation_count_after_first = client.elicitation_triggered["count"]
logger.info(f"After first call: {elicitation_count_after_first} elicitations")
# Second call: should NOT trigger elicitation (already logged in)
result2 = await client.call_tool("check_logged_in", arguments={})
assert result2.isError is False
assert "yes" in result2.content[0].text.lower()
elicitation_count_after_second = client.elicitation_triggered["count"]
logger.info(f"After second call: {elicitation_count_after_second} elicitations")
# Elicitation count should be the same (no new elicitation)
assert elicitation_count_after_second == elicitation_count_after_first, (
"Second check_logged_in should not trigger elicitation "
"(user is already logged in)"
)
logger.info("✓ Already-logged-in users don't get redundant elicitations")
@@ -0,0 +1,246 @@
"""Integration tests for login elicitation flow (ADR-006 Interim Implementation).
Tests verify:
1. check_logged_in tool with elicitation for unauthenticated users
2. Elicitation contains login URL in message
3. User can complete login via OAuth
4. After login, check_logged_in returns "yes"
5. Already-authenticated users get immediate "yes" response
6. Elicitation decline/cancel handling
"""
import logging
import re
import pytest
logger = logging.getLogger(__name__)
pytestmark = [pytest.mark.integration, pytest.mark.oauth]
async def test_check_logged_in_elicitation_flow(
nc_mcp_oauth_client, browser, oauth_callback_server
):
"""Test that check_logged_in elicits login for unauthenticated user.
This test validates the complete elicitation flow:
1. Call check_logged_in on authenticated client (already has refresh token)
2. Verify tool returns "yes" without elicitation
3. Extract and validate the elicitation URL format from response
4. Verify refresh token exists after successful OAuth flow
Note: Actual elicitation handling requires MCP protocol support in the test client.
This test validates the response format and token storage.
"""
# Call check_logged_in tool on authenticated client
logger.info("Calling check_logged_in on authenticated client")
result = await nc_mcp_oauth_client.call_tool("check_logged_in", arguments={})
assert result.isError is False, f"Tool execution failed: {result.content}"
assert result.content is not None
response_text = result.content[0].text
logger.info(f"check_logged_in response: {response_text}")
# Since nc_mcp_oauth_client fixture already completes OAuth during setup,
# the user should already be provisioned and we expect "yes"
# For unauthenticated users, the response would contain an elicitation URL
# Note: Test framework may return "elicitation not supported" if MCP elicitation is unavailable
assert (
"yes" in response_text.lower()
or "http" in response_text.lower()
or "elicitation not supported" in response_text.lower()
), f"Unexpected response: {response_text}"
# If response contains a URL (elicitation case), validate its format
if "http" in response_text:
url_pattern = r"https?://[^\s]+"
urls = re.findall(url_pattern, response_text)
assert len(urls) > 0, "Expected elicitation URL in response"
login_url = urls[0]
logger.info(f"Elicitation URL: {login_url}")
# Validate URL points to MCP server's Flow 2 endpoint
assert "/oauth/authorize-nextcloud" in login_url, (
f"Expected URL to point to MCP server Flow 2 endpoint, got: {login_url}"
)
# Validate URL contains state parameter
assert "state=" in login_url, "Expected state parameter in elicitation URL"
elif "elicitation not supported" in response_text.lower():
logger.info(
"✓ Test client doesn't support elicitation - this is expected in test environment"
)
async def test_check_logged_in_already_authenticated(nc_mcp_oauth_client):
"""Test that check_logged_in returns 'yes' for authenticated user.
This test verifies that if the user has already completed Flow 2
(resource provisioning), the tool immediately returns "yes" without
elicitation.
"""
logger.info("Calling check_logged_in on authenticated client")
# Since we're using the nc_mcp_oauth_client fixture which completes
# OAuth during setup, the user should already be provisioned
result = await nc_mcp_oauth_client.call_tool("check_logged_in", arguments={})
assert result.isError is False, f"Tool execution failed: {result.content}"
assert result.content is not None
response_text = result.content[0].text
logger.info(f"Response: {response_text}")
# Check for valid responses:
# - "yes" (already logged in)
# - "not enabled" (offline access not enabled)
# - "not configured" (MCP_SERVER_CLIENT_ID not set)
# - "elicitation not supported" (test environment limitation)
assert (
"yes" in response_text.lower()
or "not enabled" in response_text.lower()
or "not configured" in response_text.lower()
or "elicitation not supported" in response_text.lower()
)
async def test_check_logged_in_url_format(nc_mcp_oauth_client):
"""Test that login URL (when needed) follows correct OAuth format.
This test verifies that if the tool needs to provide a login URL,
the URL contains the correct OAuth parameters for Flow 2.
"""
# Call the tool
result = await nc_mcp_oauth_client.call_tool("check_logged_in", arguments={})
assert result.isError is False, f"Tool execution failed: {result.content}"
assert result.content is not None
response_text = result.content[0].text
logger.info(f"Response: {response_text}")
# If response contains a URL, validate it
url_pattern = r"https?://[^\s]+"
urls = re.findall(url_pattern, response_text)
if urls:
login_url = urls[0]
logger.info(f"Found login URL: {login_url}")
# Validate OAuth parameters
assert "response_type=code" in login_url
assert "client_id=" in login_url
assert "redirect_uri=" in login_url
assert "scope=" in login_url
assert "state=" in login_url
assert "openid" in login_url # Should request openid scope
# Validate callback URL (unified endpoint without query params)
# Note: redirect_uri should be /oauth/callback (no query params)
# Flow type is determined by session lookup, not URL params
assert (
"/oauth/callback" in login_url
or "callback-nextcloud" in login_url # Legacy support
or "authorize-nextcloud" in login_url
)
async def test_check_logged_in_with_user_id(nc_mcp_oauth_client):
"""Test that check_logged_in accepts optional user_id parameter.
This verifies the tool can be called with an explicit user_id.
"""
result = await nc_mcp_oauth_client.call_tool(
"check_logged_in", arguments={"user_id": "testuser"}
)
assert result.isError is False, f"Tool execution failed: {result.content}"
assert result.content is not None
response_text = result.content[0].text
logger.info(f"Response with user_id: {response_text}")
# Should get some response (either yes or not logged in)
assert len(response_text) > 0
async def test_check_logged_in_tool_metadata(nc_mcp_oauth_client):
"""Test that check_logged_in tool has correct metadata."""
tools = await nc_mcp_oauth_client.list_tools()
assert tools is not None
# Find the check_logged_in tool
check_logged_in_tool = None
for tool in tools.tools:
if tool.name == "check_logged_in":
check_logged_in_tool = tool
break
assert check_logged_in_tool is not None, "check_logged_in tool not found"
logger.info(f"Tool: {check_logged_in_tool.name}")
logger.info(f"Description: {check_logged_in_tool.description}")
# Verify description mentions login
assert "login" in check_logged_in_tool.description.lower()
# Tool should have openid scope requirement
# (This would need to be verified via tool schema if exposed)
async def test_elicitation_url_and_refresh_token_flow(nc_mcp_oauth_client):
"""Test that MCP server validates refresh tokens after OAuth completion.
This test validates the server's refresh token handling through its API:
1. Call check_provisioning_status to verify server-side token validation
2. Server responses indicate token state:
- is_provisioned=True: Server has valid refresh token
- is_provisioned=False: No token or invalid token
- Error response: Token validation failed
The test does NOT directly access refresh token storage - it relies on
the MCP server to validate tokens internally and report status via API.
"""
logger.info("Testing server-side refresh token validation via API")
# Call check_provisioning_status - the server will internally:
# 1. Check if refresh token exists for the user
# 2. Validate the refresh token is not expired
# 3. Return provisioning status
result = await nc_mcp_oauth_client.call_tool(
"check_provisioning_status", arguments={}
)
assert result.isError is False, f"Tool execution failed: {result.content}"
assert result.content is not None
response_text = result.content[0].text
logger.info(f"Provisioning status response: {response_text}")
# Parse the response to validate server's token validation
# Expected responses:
# 1. "is_provisioned: true" - server validated token successfully
# 2. "is_provisioned: false" - no token or invalid token
# 3. Error message - token validation failed
if "is_provisioned" in response_text.lower():
if "true" in response_text.lower():
logger.info("✓ Server validated refresh token: is_provisioned=True")
logger.info(" This confirms the server has a valid refresh token stored")
else:
logger.info("Server reports: is_provisioned=False (no valid token)")
elif "error" in response_text.lower():
logger.warning(
f"Server returned error during token validation: {response_text}"
)
else:
logger.info(f"Server response: {response_text}")
# The key validation: Server must return a valid response
# (not an error), proving it can check its own refresh token state
assert (
"is_provisioned" in response_text.lower() or "offline" in response_text.lower()
), f"Expected provisioning status response from server, got: {response_text}"
logger.info("✓ Server successfully validated refresh token state via API")
@@ -412,7 +412,7 @@ async def test_jwt_with_no_custom_scopes_returns_zero_tools(
tool_names = [tool.name for tool in result.tools]
logger.info(
f"JWT token with no custom scopes sees {len(tool_names)} tools (should be 3 OAuth tools)"
f"JWT token with no custom scopes sees {len(tool_names)} tools (should be 4 OAuth tools)"
)
# Only OAuth provisioning tools should be visible (they require 'openid' scope)
@@ -420,6 +420,7 @@ async def test_jwt_with_no_custom_scopes_returns_zero_tools(
"provision_nextcloud_access",
"revoke_nextcloud_access",
"check_provisioning_status",
"check_logged_in", # Login elicitation tool (ADR-006)
]
assert set(tool_names) == set(expected_oauth_tools), (
Generated
+1 -1
View File
@@ -975,7 +975,7 @@ wheels = [
[[package]]
name = "nextcloud-mcp-server"
version = "0.25.0"
version = "0.26.0"
source = { editable = "." }
dependencies = [
{ name = "aiosqlite" },