Compare commits

...

20 Commits

Author SHA1 Message Date
github-actions[bot] bfbaed9a66 bump: version 0.18.0 → 0.19.0 2025-10-23 23:50:51 +00:00
Chris Coutinho ff32149220 Merge pull request #235 from cbcoutinho/feature/opaque-introspection
Feature/opaque introspection
2025-10-24 01:50:17 +02:00
Chris Coutinho d55e5708c7 ci: fix imports 2025-10-24 01:04:30 +02:00
Chris Coutinho d4ee5a74c2 test: Update default tokens to JWT, add to introspection tests 2025-10-24 00:51:50 +02:00
Chris Coutinho 261749fcdc ci: Update oidc app 2025-10-23 22:45:22 +02:00
Chris Coutinho bdb0e17401 chore: Add logging to token introspection 2025-10-23 21:18:14 +02:00
Chris Coutinho 8942f3119c Merge pull request #236 from cbcoutinho/renovate/pin-dependencies
chore(deps): pin shivammathur/setup-php action to bf6b4fb
2025-10-23 18:51:05 +02:00
renovate-bot-cbcoutinho[bot] 3863cca2ed chore(deps): pin shivammathur/setup-php action to bf6b4fb 2025-10-23 16:05:50 +00:00
Chris Coutinho a93e7a1e3b build: Update submodule 2025-10-23 16:56:18 +02:00
Chris Coutinho f2d2dd8068 feat: Enable token introspection for opaque tokens 2025-10-23 15:51:27 +02:00
Chris Coutinho d915efd3f6 docs: Update jwt docs [skip ci] 2025-10-23 15:26:51 +02:00
Chris Coutinho 053cf7798b fix: Add CORS middleware to allow browser-based clients like MCP Inspector 2025-10-23 15:23:41 +02:00
github-actions[bot] 87c6f077f3 bump: version 0.17.1 → 0.18.0 2025-10-23 10:23:48 +00:00
Chris Coutinho 38e12db46a Merge pull request #233 from cbcoutinho/feature/jwt-scopes
feat: Initialize JWT-scoped tools
2025-10-23 12:23:12 +02:00
Chris Coutinho eb7e15cac0 Merge pull request #232 from cbcoutinho/renovate/docker.io-library-nextcloud-32.0.0
chore(deps): update docker.io/library/nextcloud:32.0.0 docker digest to f9bec5c
2025-10-22 06:42:22 +02:00
renovate-bot-cbcoutinho[bot] e3436fecc0 chore(deps): update docker.io/library/nextcloud:32.0.0 docker digest to f9bec5c 2025-10-22 04:06:24 +00:00
Chris Coutinho e3feb3eb2f Merge pull request #231 from cbcoutinho/renovate/ghcr.io-astral-sh-uv-0.x
chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.5
2025-10-22 03:59:07 +02:00
renovate-bot-cbcoutinho[bot] eedaa2e3f1 chore(deps): update ghcr.io/astral-sh/uv docker tag to v0.9.5 2025-10-21 22:09:23 +00:00
Chris Coutinho d517fe09d8 Merge pull request #230 from cbcoutinho/renovate/docker.io-library-nextcloud-32.0.0
chore(deps): update docker.io/library/nextcloud:32.0.0 docker digest to d3d8b9d
2025-10-21 23:24:50 +02:00
renovate-bot-cbcoutinho[bot] 08ebab9f48 chore(deps): update docker.io/library/nextcloud:32.0.0 docker digest to d3d8b9d 2025-10-21 16:06:08 +00:00
16 changed files with 607 additions and 48 deletions
+1 -1
View File
@@ -32,7 +32,7 @@ jobs:
###### Required to build OIDC App ######
- name: Set up php 8.4
uses: shivammathur/setup-php@v2
uses: shivammathur/setup-php@bf6b4fbd49ca58e4608c9c89fba0b8d90bd2a39f # v2
with:
php-version: 8.4
coverage: none
+26
View File
@@ -1,3 +1,29 @@
## v0.19.0 (2025-10-23)
### Feat
- Enable token introspection for opaque tokens
### Fix
- Add CORS middleware to allow browser-based clients like MCP Inspector
## v0.18.0 (2025-10-23)
### Feat
- **server**: Add support for custom OIDC scopes and permissions via JWTs
- Initialize JWT-scoped tools
### Fix
- Use occ-created OAuth clients with allowed_scopes for all tests
- Separate OAuth fixtures for opaque vs JWT tokens
### Refactor
- Update JWT client to use DCR, re-enable tool filtering
## v0.17.1 (2025-10-20)
### Fix
+1 -1
View File
@@ -1,4 +1,4 @@
FROM ghcr.io/astral-sh/uv:0.9.4-python3.11-alpine@sha256:1a51c7710eaf839fa3365329ad993b48d17ddd9ab0f0672efaa9b09f407ebf44
FROM ghcr.io/astral-sh/uv:0.9.5-python3.11-alpine@sha256:64ecec379ff82bea84b8a80c0b374f6392bcd54aa52f8c63c12f510f9c0b214d
# Install git (required for caldav dependency from git)
RUN apk add --no-cache git
@@ -33,5 +33,6 @@ fi
# Configure OIDC Identity Provider with dynamic client registration enabled
php /var/www/html/occ config:app:set oidc dynamic_client_registration --value='true'
php /var/www/html/occ config:app:set oidc proof_key_for_code_exchange --value=true --type=boolean
php /var/www/html/occ config:app:set oidc default_token_type --value='jwt'
echo "OIDC app installed and configured successfully"
+1 -1
View File
@@ -21,7 +21,7 @@ services:
restart: always
app:
image: docker.io/library/nextcloud:32.0.0@sha256:4fbd72f05b5e6b82e078542b6cb2ecf021d2f8b5045454ffa7f4e080e488b375
image: docker.io/library/nextcloud:32.0.0@sha256:f9bec5c77a8d5603354b990550a4d24487deae6e589dd20ce870e43e28460e18
restart: always
ports:
- 0.0.0.0:8080:80
+17 -15
View File
@@ -31,7 +31,7 @@ The Nextcloud MCP Server supports OAuth authentication with both **JWT** (RFC 90
-**Custom Scopes** - `nc:read` and `nc:write` for read/write access control
-**Dynamic Tool Filtering** - Tools filtered based on user's token scopes
-**Scope Challenges** - RFC-compliant `WWW-Authenticate` headers for insufficient scopes
-**Protected Resource Metadata** - RFC 8959 endpoint for scope discovery
-**Protected Resource Metadata** - RFC 9728 endpoint for scope discovery
-**Backward Compatible** - BasicAuth mode bypasses all scope checks
### Supported Scopes
@@ -169,25 +169,27 @@ async def nc_notes_create_note(title: str, content: str, ctx: Context):
### Dynamic Tool Filtering
The MCP server implements **dynamic tool filtering** - users only see tools they have permission to use:
The MCP server implements **dynamic tool filtering** - users only see tools they have permission to use. This applies to **both JWT and Bearer (opaque) tokens** in OAuth mode:
**JWT with `nc:read` only:**
**Token with `nc:read` only:**
- `list_tools()` returns 36 read-only tools
- Write tools are hidden from the tool list
**JWT with `nc:write` only:**
**Token with `nc:write` only:**
- `list_tools()` returns 54 write-only tools
- Read tools are hidden from the tool list
**JWT with both scopes:**
**Token with both scopes:**
- `list_tools()` returns all 90 tools
**JWT with no custom scopes:**
**Token with no custom scopes:**
- `list_tools()` returns 0 tools (all require `nc:read` or `nc:write`)
**BasicAuth mode:**
- `list_tools()` returns all 90 tools (no filtering)
**Note:** JWT tokens include scopes in the token payload, while Bearer tokens retrieve scopes via the introspection endpoint. Both methods provide reliable scope information for filtering.
### Scope Challenges
When a tool is called without required scopes, the server returns a `403 Forbidden` response with a `WWW-Authenticate` header:
@@ -196,21 +198,21 @@ When a tool is called without required scopes, the server returns a `403 Forbidd
HTTP/1.1 403 Forbidden
WWW-Authenticate: Bearer error="insufficient_scope",
scope="nc:write",
resource_metadata="http://server/.well-known/oauth-protected-resource"
resource_metadata="http://server/.well-known/oauth-protected-resource/mcp"
```
This enables **step-up authorization** - clients can detect missing scopes and trigger re-authentication to obtain additional permissions.
### Protected Resource Metadata (PRM)
The server implements RFC 8959's Protected Resource Metadata endpoint:
The server implements RFC 9728's Protected Resource Metadata endpoint:
**Endpoint:** `GET /.well-known/oauth-protected-resource`
**Endpoint:** `GET /.well-known/oauth-protected-resource/mcp`
**Response:**
```json
{
"resource": "http://localhost:8002",
"resource": "http://localhost:8002/mcp",
"scopes_supported": ["nc:read", "nc:write"],
"authorization_servers": ["http://localhost:8080"],
"bearer_methods_supported": ["header"],
@@ -456,16 +458,16 @@ When credentials are provided via environment variables or storage file, **DCR i
- `has_required_scopes()` - Check if user has necessary scopes
- `InsufficientScopeError` exception for WWW-Authenticate challenges
**3. Dynamic Filtering** (`nextcloud_mcp_server/app.py:433-488`)
**3. Dynamic Filtering** (`nextcloud_mcp_server/app.py:473-516`)
- Overrides FastMCP's `list_tools()` method
- Filters based on user's JWT token scopes
- Filters based on user's OAuth token scopes (JWT and Bearer)
- Only active in OAuth mode
- Bypassed in BasicAuth mode
**4. PRM Endpoint** (`nextcloud_mcp_server/app.py:503-532`)
- `GET /.well-known/oauth-protected-resource`
- `GET /.well-known/oauth-protected-resource/mcp`
- Advertises `["nc:read", "nc:write"]`
- RFC 8959 compliant
- RFC 9728 compliant
**5. Exception Handler** (`nextcloud_mcp_server/app.py:540-563`)
- Catches `InsufficientScopeError`
@@ -876,7 +878,7 @@ WARNING Missing required scopes: nc:write
- [RFC 9068: JWT Profile for OAuth 2.0 Access Tokens](https://www.rfc-editor.org/rfc/rfc9068.html)
- [RFC 7519: JSON Web Token (JWT)](https://www.rfc-editor.org/rfc/rfc7519.html)
- [RFC 7517: JSON Web Key (JWK)](https://www.rfc-editor.org/rfc/rfc7517.html)
- [RFC 8959: Protected Resource Metadata](https://www.rfc-editor.org/rfc/rfc8959.html)
- [RFC 9728: OAuth 2.0 Protected Resource Metadata](https://www.rfc-editor.org/rfc/rfc9728.html)
- [RFC 7662: OAuth 2.0 Token Introspection](https://www.rfc-editor.org/rfc/rfc7662.html)
### Related Documentation
+46 -19
View File
@@ -11,6 +11,7 @@ from mcp.server.auth.settings import AuthSettings
from mcp.server.fastmcp import Context, FastMCP
from pydantic import AnyHttpUrl
from starlette.applications import Starlette
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
@@ -213,7 +214,7 @@ async def load_oauth_client_credentials(
nextcloud_url=nextcloud_host,
registration_endpoint=registration_endpoint,
storage_path=storage_path,
client_name="Nextcloud MCP Server",
client_name=f"Nextcloud MCP Server ({token_type})",
redirect_uris=redirect_uris,
scopes=scopes,
token_type=token_type,
@@ -474,7 +475,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
original_list_tools = mcp._tool_manager.list_tools
def list_tools_filtered():
"""List tools filtered by user's token scopes (JWT tokens only)."""
"""List tools filtered by user's token scopes (JWT and Bearer tokens)."""
# Get user's scopes from token using MCP SDK's contextvar
# This works for all request types including list_tools
user_scopes = get_access_token_scopes()
@@ -487,35 +488,36 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
# Get all tools
all_tools = original_list_tools()
# Only filter for JWT tokens (opaque tokens show all tools)
# JWT tokens have scopes embedded, so we can reliably filter
# Opaque tokens may not have accurate scope information from introspection
if is_jwt and user_scopes:
# Filter tools based on user's token scopes (both JWT and opaque tokens)
# JWT tokens have scopes embedded in payload
# Opaque tokens get scopes via introspection endpoint
# Claude Code now properly respects PRM endpoint for scope discovery
if user_scopes:
allowed_tools = [
tool
for tool in all_tools
if has_required_scopes(tool.fn, user_scopes)
]
token_type = "JWT" if is_jwt else "Bearer"
logger.info(
f"✂️ JWT scope filtering: {len(allowed_tools)}/{len(all_tools)} tools "
f"✂️ {token_type} scope filtering: {len(allowed_tools)}/{len(all_tools)} tools "
f"available for scopes: {user_scopes}"
)
else:
# Opaque token, BasicAuth mode, or no token - show all tools
# BasicAuth mode or no token - show all tools
allowed_tools = all_tools
reason = (
"opaque token (no filtering)"
if not is_jwt and user_scopes
else "no token/BasicAuth"
logger.info(
f"📋 Showing all {len(all_tools)} tools (no token/BasicAuth)"
)
logger.info(f"📋 Showing all {len(all_tools)} tools ({reason})")
# Return the Tool objects directly (they're already in the correct format)
return allowed_tools
# Replace the tool manager's list_tools method
mcp._tool_manager.list_tools = list_tools_filtered
logger.info("Dynamic tool filtering enabled for OAuth mode (JWT tokens only)")
logger.info(
"Dynamic tool filtering enabled for OAuth mode (JWT and Bearer tokens)"
)
if transport == "sse":
mcp_app = mcp.sse_app()
@@ -534,10 +536,13 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
if oauth_enabled:
def oauth_protected_resource_metadata(request):
"""RFC 8959 Protected Resource Metadata endpoint."""
"""RFC 9728 Protected Resource Metadata endpoint."""
mcp_server_url = os.getenv(
"NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000"
)
# Append /mcp to match the actual resource path (FastMCP streamable-http endpoint)
resource_url = f"{mcp_server_url}/mcp"
# Use PUBLIC_ISSUER_URL for authorization server since external clients
# (like Claude) need the publicly accessible URL, not internal Docker URLs
public_issuer_url = os.getenv("NEXTCLOUD_PUBLIC_ISSUER_URL")
@@ -547,14 +552,24 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
return JSONResponse(
{
"resource": mcp_server_url,
"scopes_supported": ["nc:read", "nc:write"],
"resource": resource_url,
"scopes_supported": ["openid", "nc:read", "nc:write"],
"authorization_servers": [public_issuer_url],
"bearer_methods_supported": ["header"],
"resource_signing_alg_values_supported": ["RS256"],
}
)
# Register PRM endpoint at both path-based and root locations per RFC 9728
# Path-based discovery: /.well-known/oauth-protected-resource{path}
routes.append(
Route(
"/.well-known/oauth-protected-resource/mcp",
oauth_protected_resource_metadata,
methods=["GET"],
)
)
# Root discovery (fallback): /.well-known/oauth-protected-resource
routes.append(
Route(
"/.well-known/oauth-protected-resource",
@@ -562,11 +577,23 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
methods=["GET"],
)
)
logger.info("Protected Resource Metadata (PRM) endpoint enabled")
logger.info(
"Protected Resource Metadata (PRM) endpoints enabled (path-based + root)"
)
routes.append(Mount("/", app=mcp_app))
app = Starlette(routes=routes, lifespan=lifespan)
# Add CORS middleware to allow browser-based clients like MCP Inspector
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins for development
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"],
)
# Add exception handler for scope challenges (OAuth mode only)
if oauth_enabled:
@@ -584,7 +611,7 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
"WWW-Authenticate": (
f'Bearer error="insufficient_scope", '
f'scope="{scope_str}", '
f'resource_metadata="{resource_url}/.well-known/oauth-protected-resource"'
f'resource_metadata="{resource_url}/.well-known/oauth-protected-resource/mcp"'
)
},
content={
+29 -5
View File
@@ -309,11 +309,18 @@ class NextcloudTokenVerifier(TokenVerifier):
)
elif response.status_code in (400, 401, 403):
logger.info(f"Token introspection failed: HTTP {response.status_code}")
logger.warning(
f"Token introspection failed: HTTP {response.status_code}. "
f"This may indicate: (1) Client credentials mismatch - trying to introspect "
f"token issued to different OAuth client, (2) Expired client credentials, "
f"(3) Invalid token. Will fall back to userinfo endpoint. "
f"Response: {response.text[:200] if response.text else 'empty'}"
)
return None
else:
logger.warning(
f"Unexpected response from introspection: {response.status_code}"
f"Unexpected response from introspection: {response.status_code}. "
f"Response: {response.text[:200] if response.text else 'empty'}"
)
return None
@@ -420,15 +427,31 @@ class NextcloudTokenVerifier(TokenVerifier):
"""
Extract scopes from userinfo response.
Since the userinfo response doesn't include the original scopes,
we infer them from the claims present in the response.
First attempts to read actual scopes from the 'scope' field (RFC 8693).
If not present, infers scopes from the claims present in the response.
Args:
userinfo: The userinfo response dictionary
Returns:
List of inferred scopes
List of scopes (actual or inferred)
"""
# Try to get actual scopes from userinfo response (if OIDC provider includes it)
scope_string = userinfo.get("scope")
if scope_string:
scopes = scope_string.split() if isinstance(scope_string, str) else []
if scopes:
logger.debug(
f"Using actual scopes from userinfo: {scopes} (scope field present)"
)
return scopes
# Fallback: Infer scopes from claims present in response
# This maintains backward compatibility with OIDC providers that don't
# include the scope field in userinfo responses
logger.debug(
"No scope field in userinfo response, inferring scopes from claims"
)
scopes = ["openid"] # Always present
if "email" in userinfo:
@@ -445,6 +468,7 @@ class NextcloudTokenVerifier(TokenVerifier):
if "groups" in userinfo:
scopes.append("groups")
logger.debug(f"Inferred scopes from userinfo claims: {scopes}")
return scopes
def clear_cache(self):
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "nextcloud-mcp-server"
version = "0.17.1"
version = "0.19.0"
description = "Model Context Protocol (MCP) server for Nextcloud integration - enables AI assistants to interact with Nextcloud data"
authors = [
{name = "Chris Coutinho", email = "chris@coutinho.io"}
View File
View File
View File
@@ -0,0 +1,479 @@
"""
Integration tests for token introspection authorization.
These tests verify that the introspection endpoint properly enforces
authorization rules:
1. Client authentication is required (401 if missing)
2. Only the token owner can introspect its own tokens
3. Only the designated resource server can introspect tokens
4. Other clients cannot introspect tokens they don't own or aren't the audience for
"""
import asyncio
import logging
import os
import secrets
# Import helpers from conftest
import time
from typing import AsyncGenerator
from urllib.parse import quote
import httpx
import pytest
# Import from the root tests/ conftest.py using relative import
from ..conftest import _handle_oauth_consent_screen
logger = logging.getLogger(__name__)
@pytest.fixture(scope="module")
def nextcloud_host() -> str:
"""Get Nextcloud host from environment."""
host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
return host
@pytest.fixture(scope="module")
async def oidc_endpoints(nextcloud_host: str) -> dict[str, str]:
"""Discover OIDC endpoints."""
async with httpx.AsyncClient(timeout=30.0) as client:
discovery_url = f"{nextcloud_host}/.well-known/openid-configuration"
response = await client.get(discovery_url)
response.raise_for_status()
config = response.json()
return {
"token_endpoint": config["token_endpoint"],
"authorization_endpoint": config.get("authorization_endpoint"),
"introspection_endpoint": config.get("introspection_endpoint"),
"registration_endpoint": config.get("registration_endpoint"),
}
@pytest.fixture(scope="module")
async def test_oauth_clients(
nextcloud_host: str, oidc_endpoints: dict[str, str], oauth_callback_server
) -> AsyncGenerator[dict[str, tuple[str, str]], None]:
"""
Create multiple OAuth clients for introspection testing.
Returns a dict mapping client names to (client_id, client_secret) tuples.
"""
from nextcloud_mcp_server.auth.client_registration import register_client
clients = {}
registration_endpoint = oidc_endpoints["registration_endpoint"]
# Get the correct callback URL from the oauth_callback_server fixture
auth_states, callback_url = oauth_callback_server
# Create client A (will be the token owner)
logger.info("Creating OAuth client A for introspection testing")
client_a = await register_client(
nextcloud_url=nextcloud_host,
registration_endpoint=registration_endpoint,
client_name="Introspection Test Client A",
redirect_uris=[callback_url],
scopes="openid profile email",
token_type="Bearer", # Use opaque tokens for this test
)
clients["clientA"] = (client_a.client_id, client_a.client_secret)
logger.info(f"Created client A: {client_a.client_id[:16]}...")
# Create client B (will attempt to introspect client A's tokens)
logger.info("Creating OAuth client B for introspection testing")
client_b = await register_client(
nextcloud_url=nextcloud_host,
registration_endpoint=registration_endpoint,
client_name="Introspection Test Client B",
redirect_uris=[callback_url],
scopes="openid profile email",
token_type="Bearer",
)
clients["clientB"] = (client_b.client_id, client_b.client_secret)
logger.info(f"Created client B: {client_b.client_id[:16]}...")
# Create client C (third party, should not be able to introspect)
logger.info("Creating OAuth client C for introspection testing")
client_c = await register_client(
nextcloud_url=nextcloud_host,
registration_endpoint=registration_endpoint,
client_name="Introspection Test Client C",
redirect_uris=[callback_url],
scopes="openid profile email",
token_type="Bearer",
)
clients["clientC"] = (client_c.client_id, client_c.client_secret)
logger.info(f"Created client C: {client_c.client_id[:16]}...")
yield clients
# Cleanup is handled by Nextcloud - clients will be removed when tests are done
logger.info("Test OAuth clients fixture complete")
@pytest.mark.integration
async def test_introspection_requires_client_authentication(
oidc_endpoints: dict[str, str],
):
"""
Test that the introspection endpoint requires client authentication.
Expected: 401 UNAUTHORIZED when credentials are missing or invalid.
"""
introspection_endpoint = oidc_endpoints["introspection_endpoint"]
if not introspection_endpoint:
pytest.skip("Introspection endpoint not available")
async with httpx.AsyncClient(timeout=10.0) as client:
# Test 1: No credentials
response = await client.post(
introspection_endpoint,
data={"token": "some_token"},
)
assert response.status_code == 401, "Should return 401 without credentials"
data = response.json()
assert data.get("error") == "invalid_client"
# Test 2: Invalid credentials
response = await client.post(
introspection_endpoint,
data={"token": "some_token"},
auth=("invalid_client", "invalid_secret"),
)
assert response.status_code == 401, "Should return 401 with invalid credentials"
data = response.json()
logger.info(f"Invalid client response: {data}")
# Response may be either {"error": "invalid_client"} or {"message": "..."}
# Both are acceptable as long as we get 401
assert "error" in data or "message" in data, "Should return error information"
async def _obtain_token_for_client(
browser,
oauth_callback_server,
client_id: str,
client_secret: str,
token_endpoint: str,
authorization_endpoint: str,
scope: str = "openid profile email",
resource: str | None = None,
) -> str:
"""
Helper to obtain an OAuth token using existing callback server and playwright automation.
Reuses the pattern from conftest.py's playwright_oauth_token fixture.
"""
username = os.getenv("NEXTCLOUD_USERNAME", "admin")
password = os.getenv("NEXTCLOUD_PASSWORD", "admin")
# Get callback server from fixture
auth_states, callback_url = oauth_callback_server
# Generate unique state parameter
state = secrets.token_urlsafe(32)
# Construct authorization URL
auth_url_parts = [
f"{authorization_endpoint}?",
"response_type=code&",
f"client_id={client_id}&",
f"redirect_uri={quote(callback_url, safe='')}&",
f"state={state}&",
f"scope={quote(scope, safe='')}",
]
if resource:
auth_url_parts.append(f"&resource={quote(resource, safe='')}")
auth_url = "".join(auth_url_parts)
logger.info(f"Obtaining token for client {client_id[:16]}... with scopes={scope}")
if resource:
logger.info(f" Resource parameter: {resource[:16]}...")
# Browser automation (same pattern as conftest.py)
context = await browser.new_context(ignore_https_errors=True)
page = await context.new_page()
try:
logger.debug(f"Navigating to: {auth_url[:100]}...")
await page.goto(auth_url, wait_until="networkidle", timeout=60000)
current_url = page.url
logger.debug(f"Current URL after navigation: {current_url}")
# Handle login if needed
if "/login" in current_url or "/index.php/login" in current_url:
logger.info("Login page detected, filling credentials...")
await page.wait_for_selector('input[name="user"]', timeout=10000)
await page.fill('input[name="user"]', username)
await page.fill('input[name="password"]', password)
await page.click('button[type="submit"]')
await page.wait_for_load_state("networkidle", timeout=60000)
current_url = page.url
logger.info(f"After login: {current_url}")
# Wait a bit for page to fully render after login
await asyncio.sleep(2)
current_url = page.url
logger.info(f"After waiting, current URL: {current_url}")
# Check page content for debugging
page_content = await page.content()
has_consent_div = "#oidc-consent" in page_content
logger.info(f"Page has #oidc-consent div: {has_consent_div}")
# Handle consent screen using the helper from conftest
try:
consent_handled = await _handle_oauth_consent_screen(page, username)
logger.info(f"Consent screen handled: {consent_handled}")
except Exception as e:
logger.warning(f"Error handling consent screen: {e}")
# Take screenshot for debugging
await page.screenshot(path=f"/tmp/consent_error_{state[:8]}.png")
logger.error("Consent error screenshot saved")
raise
# Wait for callback server to receive auth code
logger.info("Waiting for callback server to receive auth code...")
timeout_seconds = 30
start_time = time.time()
while state not in auth_states:
if time.time() - start_time > timeout_seconds:
screenshot_path = (
f"/tmp/oauth_introspection_test_timeout_{state[:8]}.png"
)
await page.screenshot(path=screenshot_path)
logger.error(f"Timeout! Screenshot saved to {screenshot_path}")
logger.error(f"Current URL: {page.url}")
raise TimeoutError(
f"Timeout waiting for OAuth callback (state={state[:16]}...)"
)
await asyncio.sleep(0.5)
auth_code = auth_states[state]
logger.info(f"Successfully received auth code: {auth_code[:20]}...")
finally:
await context.close()
# Exchange code for token
logger.debug("Exchanging authorization code for access token...")
async with httpx.AsyncClient(timeout=30.0) as http_client:
token_response = await http_client.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": callback_url,
"client_id": client_id,
"client_secret": client_secret,
},
)
token_response.raise_for_status()
token_data = token_response.json()
access_token = token_data.get("access_token")
if not access_token:
raise ValueError(f"No access_token in response: {token_data}")
logger.info("Successfully obtained access token")
return access_token
@pytest.mark.integration
async def test_client_cannot_introspect_other_clients_tokens(
playwright_oauth_token: str,
shared_oauth_client_credentials: tuple,
test_oauth_clients: dict[str, tuple[str, str]],
oidc_endpoints: dict[str, str],
):
"""
Test that one client cannot introspect tokens owned by another client.
This test uses a pre-authorized shared OAuth client (with existing token)
and verifies that a different client cannot introspect that token.
Expected: introspection returns {active: false} to not reveal token existence.
"""
introspection_endpoint = oidc_endpoints["introspection_endpoint"]
if not introspection_endpoint:
pytest.skip("Introspection endpoint not available")
# Use the shared OAuth client's token (pre-authorized, working)
access_token = playwright_oauth_token
shared_client_id, shared_client_secret, _, _, _ = shared_oauth_client_credentials
# Get a different client to try to introspect
different_client_id, different_client_secret = test_oauth_clients["clientB"]
logger.info(
f"Testing introspection with shared client token: {access_token[:16]}..."
)
logger.info(f"Shared client ID: {shared_client_id[:16]}...")
logger.info(f"Different client ID: {different_client_id[:16]}...")
async with httpx.AsyncClient(timeout=10.0) as client:
# Test 1: The owning client (shared client) can introspect its own token
response = await client.post(
introspection_endpoint,
data={"token": access_token},
auth=(shared_client_id, shared_client_secret),
)
assert response.status_code == 200
data = response.json()
logger.info(f"Owner client introspection response: {data}")
assert data.get("active") is True, (
"Owner client should be able to introspect its own token"
)
# Test 2: A different client CANNOT introspect the shared client's token
response = await client.post(
introspection_endpoint,
data={"token": access_token},
auth=(different_client_id, different_client_secret),
)
assert response.status_code == 200
data = response.json()
logger.info(f"Different client introspection response: {data}")
assert data.get("active") is False, (
"Different client should NOT be able to introspect another client's token"
)
@pytest.mark.integration
async def test_introspection_with_resource_parameter(
browser,
oauth_callback_server,
test_oauth_clients: dict[str, tuple[str, str]],
oidc_endpoints: dict[str, str],
nextcloud_host: str,
):
"""
Test that the resource server (specified via 'resource' parameter) can introspect tokens.
This test verifies that when a token is issued with resource=clientB,
clientB can introspect it even though it's owned by clientA.
This requires obtaining a token with the 'resource' parameter set via authorization code grant.
Uses playwright automation to obtain real tokens.
"""
introspection_endpoint = oidc_endpoints["introspection_endpoint"]
if not introspection_endpoint:
pytest.skip("Introspection endpoint not available")
client_a_id, client_a_secret = test_oauth_clients["clientA"]
client_b_id, client_b_secret = test_oauth_clients["clientB"]
client_c_id, client_c_secret = test_oauth_clients["clientC"]
token_endpoint = oidc_endpoints["token_endpoint"]
authorization_endpoint = oidc_endpoints.get("authorization_endpoint")
if not authorization_endpoint:
pytest.skip("Authorization endpoint not available")
# Obtain a token for client A with resource parameter set to client B
try:
access_token = await _obtain_token_for_client(
browser=browser,
oauth_callback_server=oauth_callback_server,
client_id=client_a_id,
client_secret=client_a_secret,
token_endpoint=token_endpoint,
authorization_endpoint=authorization_endpoint,
scope="openid profile email",
resource=client_b_id, # Set client B as the resource server
)
except Exception as e:
logger.error(f"Failed to obtain token with resource parameter: {e}")
pytest.skip(f"Cannot obtain test token with resource parameter: {e}")
logger.info(
f"Obtained access token from client A with resource={client_b_id}: {access_token[:16]}..."
)
# Test introspection
async with httpx.AsyncClient(timeout=10.0) as client:
# Test 1: Client A (owner) can introspect its own token
response = await client.post(
introspection_endpoint,
data={"token": access_token},
auth=(client_a_id, client_a_secret),
)
assert response.status_code == 200
data = response.json()
logger.info(f"Client A (owner) introspection response: {data}")
assert data.get("active") is True, (
"Client A (owner) should be able to introspect its own token"
)
# Test 2: Client B (resource server) can introspect the token
response = await client.post(
introspection_endpoint,
data={"token": access_token},
auth=(client_b_id, client_b_secret),
)
assert response.status_code == 200
data = response.json()
logger.info(f"Client B (resource server) introspection response: {data}")
assert data.get("active") is True, (
"Client B (resource server) should be able to introspect token intended for it"
)
# Verify the resource field in the response matches client B
logger.info(f"Full introspection response from Client B: {data}")
# Test 3: Client C CANNOT introspect the token (not owner, not resource server)
response = await client.post(
introspection_endpoint,
data={"token": access_token},
auth=(client_c_id, client_c_secret),
)
assert response.status_code == 200
data = response.json()
logger.info(f"Client C (third party) introspection response: {data}")
assert data.get("active") is False, (
"Client C should NOT be able to introspect token (not owner or resource server)"
)
@pytest.mark.integration
async def test_introspection_returns_inactive_for_invalid_token(
test_oauth_clients: dict[str, tuple[str, str]],
oidc_endpoints: dict[str, str],
):
"""
Test that introspection returns {active: false} for invalid/unknown tokens.
This is important for security - we shouldn't reveal whether a token exists or not.
"""
introspection_endpoint = oidc_endpoints["introspection_endpoint"]
if not introspection_endpoint:
pytest.skip("Introspection endpoint not available")
client_a_id, client_a_secret = test_oauth_clients["clientA"]
async with httpx.AsyncClient(timeout=10.0) as client:
# Test with a fake token
response = await client.post(
introspection_endpoint,
data={"token": "completely_fake_token_12345"},
auth=(client_a_id, client_a_secret),
)
assert response.status_code == 200
data = response.json()
logger.info(f"Introspection response for fake token: {data}")
assert data.get("active") is False, (
"Should return active=false for invalid token"
)
# Should NOT return any other information
assert len(data) == 1, "Should only return 'active' field for invalid token"
if __name__ == "__main__":
# Run with: uv run pytest tests/server/test_introspection_authorization.py -v -s
pytest.main([__file__, "-v", "-s", "-m", "integration"])
+3 -3
View File
@@ -16,15 +16,15 @@ async def test_prm_endpoint():
"""Test that the Protected Resource Metadata endpoint returns correct data."""
import httpx
# Test the PRM endpoint directly
# Test the PRM endpoint directly (RFC 9728 - path includes /mcp resource)
async with httpx.AsyncClient() as client:
response = await client.get(
"http://localhost:8001/.well-known/oauth-protected-resource"
"http://localhost:8001/.well-known/oauth-protected-resource/mcp"
)
assert response.status_code == 200
prm_data = response.json()
assert prm_data["resource"] == "http://localhost:8001"
assert prm_data["resource"] == "http://localhost:8001/mcp"
assert "nc:read" in prm_data["scopes_supported"]
assert "nc:write" in prm_data["scopes_supported"]
assert "http://localhost:8080" in prm_data["authorization_servers"]
Generated
+1 -1
View File
@@ -931,7 +931,7 @@ wheels = [
[[package]]
name = "nextcloud-mcp-server"
version = "0.17.1"
version = "0.19.0"
source = { editable = "." }
dependencies = [
{ name = "caldav" },