refactor: Update JWT client to use DCR, re-enable tool filtering

This commit is contained in:
Chris Coutinho
2025-10-23 09:33:06 +02:00
parent e48f5f3f30
commit e9a16c43b5
5 changed files with 63 additions and 21 deletions
+8 -10
View File
@@ -92,21 +92,19 @@ services:
ports:
- 127.0.0.1:8002:8002
environment:
#- NEXTCLOUD_HOST=http://app:80
- NEXTCLOUD_HOST=http://host.docker.internal:8080
- NEXTCLOUD_HOST=http://app:80
- NEXTCLOUD_MCP_SERVER_URL=http://127.0.0.1:8002
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://localhost:8080
- NEXTCLOUD_OIDC_CLIENT_STORAGE=/var/www/html/.oauth-jwt/nextcloud_oauth_client.json
- NEXTCLOUD_PUBLIC_ISSUER_URL=http://127.0.0.1:8080
- NEXTCLOUD_OIDC_CLIENT_STORAGE=/app/.oauth-jwt/nextcloud_oauth_client.json
- NEXTCLOUD_OIDC_SCOPES=openid profile email nc:read nc:write
# No USERNAME/PASSWORD - will use OAuth with JWT tokens
# Client credentials auto-generated by app container post-installation hook
- NEXTCLOUD_OIDC_TOKEN_TYPE=jwt
# No USERNAME/PASSWORD - will use OAuth with Dynamic Client Registration (DCR)
# Client will be registered with token_type=JWT on first startup
volumes:
# NOTE: JWT-enabled OIDC client credentials created during nextcloud installation scripts
- nextcloud:/var/www/html:ro
extra_hosts:
- host.docker.internal:host-gateway
- oauth-jwt-client-storage:/app/.oauth-jwt
volumes:
nextcloud:
db:
oauth-client-storage:
oauth-jwt-client-storage:
+28 -10
View File
@@ -19,6 +19,7 @@ from nextcloud_mcp_server.auth import (
NextcloudTokenVerifier,
get_access_token_scopes,
has_required_scopes,
is_jwt_token,
)
from nextcloud_mcp_server.client import NextcloudClient
from nextcloud_mcp_server.config import LOGGING_CONFIG, setup_logging
@@ -195,6 +196,14 @@ async def load_oauth_client_credentials(
)
logger.info(f"Requesting OAuth scopes: {scopes}")
# Get token type from environment (Bearer or jwt)
# Note: Must be lowercase "jwt" to match OIDC app's check
token_type = os.getenv("NEXTCLOUD_OIDC_TOKEN_TYPE", "Bearer").lower()
# Special case: "bearer" should remain capitalized for compatibility
if token_type != "jwt":
token_type = "Bearer"
logger.info(f"Requesting token type: {token_type}")
# Load or register client
from nextcloud_mcp_server.auth.client_registration import (
load_or_register_client,
@@ -207,6 +216,7 @@ async def load_oauth_client_credentials(
client_name="Nextcloud MCP Server",
redirect_uris=redirect_uris,
scopes=scopes,
token_type=token_type,
)
logger.info(f"OAuth client ready: {client_info.client_id[:16]}...")
@@ -464,40 +474,48 @@ def get_app(transport: str = "sse", enabled_apps: list[str] | None = None):
original_list_tools = mcp._tool_manager.list_tools
def list_tools_filtered():
"""List tools filtered by user's token scopes."""
"""List tools filtered by user's token scopes (JWT tokens only)."""
# Get user's scopes from token using MCP SDK's contextvar
# This works for all request types including list_tools
user_scopes = get_access_token_scopes()
logger.info(f"🔍 list_tools called - User scopes: {user_scopes}")
is_jwt = is_jwt_token()
logger.info(
f"🔍 list_tools called - Token type: {'JWT' if is_jwt else 'opaque/none'}, "
f"User scopes: {user_scopes}"
)
# Get all tools
all_tools = original_list_tools()
# If OAuth mode and user has scopes, filter by them
# TODO: Re-enable once OIDC clients respect allowed_scopes from PRM
if 1 == 0: # user_scopes:
# Only filter for JWT tokens (opaque tokens show all tools)
# JWT tokens have scopes embedded, so we can reliably filter
# Opaque tokens may not have accurate scope information from introspection
if is_jwt and user_scopes:
allowed_tools = [
tool
for tool in all_tools
if has_required_scopes(tool.fn, user_scopes)
]
logger.info(
f"✂️ Filtered tools: {len(allowed_tools)}/{len(all_tools)} tools "
f"✂️ JWT scope filtering: {len(allowed_tools)}/{len(all_tools)} tools "
f"available for scopes: {user_scopes}"
)
else:
# BasicAuth mode or no token - show all tools
# Opaque token, BasicAuth mode, or no token - show all tools
allowed_tools = all_tools
logger.info(
f"📋 No scope filtering: showing all {len(all_tools)} tools"
reason = (
"opaque token (no filtering)"
if not is_jwt and user_scopes
else "no token/BasicAuth"
)
logger.info(f"📋 Showing all {len(all_tools)} tools ({reason})")
# Return the Tool objects directly (they're already in the correct format)
return allowed_tools
# Replace the tool manager's list_tools method
mcp._tool_manager.list_tools = list_tools_filtered
logger.info("Dynamic tool filtering enabled for OAuth mode")
logger.info("Dynamic tool filtering enabled for OAuth mode (JWT tokens only)")
if transport == "sse":
mcp_app = mcp.sse_app()
+2
View File
@@ -10,6 +10,7 @@ from .scope_authorization import (
get_access_token_scopes,
get_required_scopes,
has_required_scopes,
is_jwt_token,
require_scopes,
)
from .token_verifier import NextcloudTokenVerifier
@@ -27,4 +28,5 @@ __all__ = [
"get_access_token_scopes",
"get_required_scopes",
"has_required_scopes",
"is_jwt_token",
]
@@ -213,6 +213,30 @@ def get_required_scopes(func: Callable) -> list[str]:
return getattr(func, "_required_scopes", [])
def is_jwt_token() -> bool:
"""
Check if the current access token is in JWT format.
JWT tokens have 3 parts separated by dots (header.payload.signature).
Opaque tokens are random strings without this structure.
Returns:
True if current token is JWT format, False if opaque or no token
"""
access_token: AccessToken | None = get_access_token()
if access_token is None:
logger.debug("No access token found - not JWT")
return False
# JWT tokens have exactly 2 dots (3 parts)
token_string = access_token.token
is_jwt = "." in token_string and token_string.count(".") == 2
logger.debug(f"Token format check: is_jwt={is_jwt}")
return is_jwt
def has_required_scopes(func: Callable, user_scopes: set[str]) -> bool:
"""
Check if a user has all scopes required by a function.