Files
Chris Coutinho f43343356e fix: address review feedback — security, caching, CI 429 retry
- Add 429 retry with exponential backoff to register_client() (fixes CI
  oauth matrix failures from parallel DCR requests)
- Make client_id, redirect_uri, and PKCE mandatory at token endpoint
- Add null-checks for discovery_url and OAuth credentials in proxy flows
- Add OIDC discovery document caching with 5-min TTL
- Add per-IP rate limiting on /oauth/register DCR proxy
- Discover DCR endpoint from OIDC discovery instead of hardcoding
- Extract extract_user_id_from_token to auth/token_utils.py (breaks
  circular imports between server/ and auth/ layers)
- Add TTL scope cache in scope_authorization.py (avoids DB hit per tool)
- Add defense-in-depth scope validation in storage layer
- Broaden elicitation exception handling with graceful fallback
- Add idempotentHint to nc_auth_check_status, return "pending" status
  after accepted elicitation, add polling interval to description
- Change ALL_SUPPORTED_SCOPES from tuple to frozenset for O(1) lookups
- Replace Optional[str] with str | None throughout config.py
- Use default_factory for ProxyCodeEntry/ASProxySession dataclasses
- Add proxy code/session cleanup to background loop
- Fix OIDC verification CI step to only run for oauth/login-flow modes
- Add unit tests for access.py REST endpoints (10 tests)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 17:22:23 +01:00

641 lines
23 KiB
Python

"""
MCP Tools for OAuth and Provisioning Management (ADR-004 Progressive Consent).
This module provides MCP tools that enable users to explicitly provision
Nextcloud access using the Flow 2 (Resource Provisioning) OAuth flow.
"""
import logging
import os
import secrets
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlencode
from mcp.server.fastmcp import Context
from mcp.types import ToolAnnotations
from pydantic import BaseModel, Field
from nextcloud_mcp_server.auth import require_scopes
from nextcloud_mcp_server.auth.astrolabe_client import AstrolabeClient
from nextcloud_mcp_server.auth.storage import RefreshTokenStorage
from nextcloud_mcp_server.auth.token_broker import TokenBrokerService
# Re-export for backward compatibility — canonical location is auth.token_utils
from nextcloud_mcp_server.auth.token_utils import (
extract_user_id_from_token as extract_user_id_from_token, # noqa: PLC0414
)
from nextcloud_mcp_server.config import get_settings
logger = logging.getLogger(__name__)
class ProvisioningStatus(BaseModel):
"""Status of Nextcloud provisioning for a user."""
is_provisioned: bool = Field(description="Whether Nextcloud access is provisioned")
provisioned_at: Optional[str] = Field(
None, description="ISO timestamp when provisioned"
)
credential_type: Optional[str] = Field(
None, description="Type of credential ('refresh_token' or 'app_password')"
)
client_id: Optional[str] = Field(
None, description="Client ID that initiated the original Flow 1"
)
scopes: Optional[list[str]] = Field(None, description="Granted scopes")
flow_type: Optional[str] = Field(
None, description="Type of flow used ('hybrid', 'flow1', 'flow2')"
)
class ProvisioningResult(BaseModel):
"""Result of provisioning attempt."""
success: bool = Field(description="Whether provisioning was initiated")
provisioning_url: Optional[str] = Field(
None, description="URL to Astrolabe settings for provisioning background sync"
)
message: str = Field(description="Status message for the user")
already_provisioned: bool = Field(
False, description="Whether access was already provisioned"
)
class RevocationResult(BaseModel):
"""Result of access revocation."""
success: bool = Field(description="Whether revocation succeeded")
message: str = Field(description="Status message for the user")
class LoginConfirmation(BaseModel):
"""Schema for login confirmation elicitation."""
acknowledged: bool = Field(
default=False,
description="Check this box after completing login at the provided URL",
)
async def get_provisioning_status(ctx: Context, user_id: str) -> ProvisioningStatus:
"""
Check the provisioning status for Nextcloud access.
Checks for both credential types:
1. App password from Astrolabe (works today)
2. OAuth refresh token from storage (for future)
Args:
mcp: MCP context
user_id: User identifier
Returns:
ProvisioningStatus with current provisioning state
"""
settings = get_settings()
# Check for app password first (interim solution)
if settings.oidc_client_id and settings.oidc_client_secret:
try:
astrolabe = AstrolabeClient(
nextcloud_host=settings.nextcloud_host or "",
client_id=settings.oidc_client_id,
client_secret=settings.oidc_client_secret,
)
status = await astrolabe.get_background_sync_status(user_id)
if status.get("has_access"):
logger.info(
f" get_provisioning_status: ✓ App password FOUND for user_id={user_id}"
)
provisioned_at_str = status.get("provisioned_at")
return ProvisioningStatus(
is_provisioned=True,
provisioned_at=provisioned_at_str,
credential_type="app_password",
)
except Exception as e:
logger.debug(f" App password check failed for {user_id}: {e}")
# Check for OAuth refresh token (fallback)
logger.info(
f" get_provisioning_status: Looking up refresh token for user_id={user_id}"
)
storage = RefreshTokenStorage.from_env()
await storage.initialize()
token_data = await storage.get_refresh_token(user_id)
if not token_data:
logger.info(
f" get_provisioning_status: ✗ No credentials found for user_id={user_id}"
)
return ProvisioningStatus(is_provisioned=False)
logger.info(
f" get_provisioning_status: ✓ Refresh token FOUND for user_id={user_id}"
)
logger.info(f" flow_type: {token_data.get('flow_type')}")
logger.info(
f" provisioning_client_id: {token_data.get('provisioning_client_id', 'N/A')}"
)
# Convert timestamp to ISO format if present
provisioned_at_str = None
if token_data.get("provisioned_at"):
dt = datetime.fromtimestamp(token_data["provisioned_at"], tz=timezone.utc)
provisioned_at_str = dt.isoformat()
return ProvisioningStatus(
is_provisioned=True,
provisioned_at=provisioned_at_str,
credential_type="refresh_token",
client_id=token_data.get("provisioning_client_id"),
scopes=token_data.get("scopes"),
flow_type=token_data.get("flow_type", "hybrid"),
)
def generate_oauth_url_for_flow2(
oidc_discovery_url: str,
server_client_id: str,
redirect_uri: str,
state: str,
scopes: list[str],
) -> str:
"""
Generate OAuth authorization URL for Flow 2 (Resource Provisioning).
This returns the MCP server's Flow 2 authorization endpoint, which will:
1. Generate PKCE parameters (required by Nextcloud OIDC)
2. Store code_verifier in session
3. Redirect to Nextcloud IdP with PKCE
4. Handle the callback with code_verifier for token exchange
Args:
oidc_discovery_url: OIDC provider discovery URL (unused, kept for compatibility)
server_client_id: MCP server's OAuth client ID (unused, kept for compatibility)
redirect_uri: Callback URL for the MCP server (unused, kept for compatibility)
state: CSRF protection state
scopes: List of scopes to request (unused, kept for compatibility)
Returns:
MCP server's Flow 2 authorization URL with state parameter
"""
# Use the MCP server's Flow 2 endpoint which handles PKCE internally
# This endpoint will:
# - Generate code_verifier and code_challenge (PKCE)
# - Store code_verifier in session storage
# - Redirect to Nextcloud with PKCE parameters
# - Handle the callback with proper code_verifier
mcp_server_url = os.getenv("NEXTCLOUD_MCP_SERVER_URL", "http://localhost:8000")
auth_endpoint = f"{mcp_server_url}/oauth/authorize-nextcloud"
# Only pass state parameter - the endpoint handles everything else
params = {"state": state}
return f"{auth_endpoint}?{urlencode(params)}"
async def provision_nextcloud_access(
ctx: Context, user_id: Optional[str] = None
) -> ProvisioningResult:
"""
MCP Tool: Provision offline access to Nextcloud resources.
Returns URL to Astrolabe settings page where users can provision background
sync access using either:
- App password (works today, interim solution)
- OAuth refresh token (future, when Nextcloud supports OAuth for app APIs)
Args:
ctx: MCP context with user's Flow 1 token
user_id: Optional user identifier (extracted from token if not provided)
Returns:
ProvisioningResult with Astrolabe settings URL or status
"""
try:
# Extract user ID from the MCP access token (Flow 1 token)
if not user_id:
user_id = await extract_user_id_from_token(ctx)
# Check if already provisioned
status = await get_provisioning_status(ctx, user_id)
if status.is_provisioned:
return ProvisioningResult(
success=True,
already_provisioned=True,
message=(
f"Nextcloud access is already provisioned (credential_type={status.credential_type}, "
f"since {status.provisioned_at}). "
"Use 'revoke_nextcloud_access' if you want to re-provision."
),
)
# Get configuration using settings (handles both ENABLE_BACKGROUND_OPERATIONS
# and ENABLE_OFFLINE_ACCESS environment variables)
settings = get_settings()
if not settings.enable_offline_access:
return ProvisioningResult(
success=False,
message=(
"Offline access is not enabled. "
"Set ENABLE_BACKGROUND_OPERATIONS=true to use this feature."
),
)
# Return Astrolabe settings URL for background sync provisioning
nextcloud_host = os.getenv("NEXTCLOUD_HOST", "http://localhost:8080")
astrolabe_url = f"{nextcloud_host}/settings/user/astrolabe#background-sync"
return ProvisioningResult(
success=True,
provisioning_url=astrolabe_url,
message=(
"Visit Astrolabe settings to provision background sync access.\n\n"
"You can choose either:\n"
"- App password (works today, recommended for now)\n"
"- OAuth refresh token (future, when Nextcloud fully supports OAuth)\n\n"
"After provisioning, background sync will enable the MCP server to "
"access Nextcloud resources even when you're not actively connected."
),
)
except Exception as e:
logger.error(f"Failed to initiate provisioning: {e}")
return ProvisioningResult(
success=False,
message=f"Failed to initiate provisioning: {str(e)}",
)
async def revoke_nextcloud_access(
ctx: Context, user_id: Optional[str] = None
) -> RevocationResult:
"""
MCP Tool: Revoke offline access to Nextcloud resources.
This tool removes the stored refresh token and revokes access
that was granted via Flow 2.
Args:
mcp: MCP context
user_id: Optional user identifier
Returns:
RevocationResult with status
"""
try:
# Get user ID from token if not provided
if not user_id:
logger.info("Extracting user_id from access token for revoke...")
user_id = await extract_user_id_from_token(ctx)
logger.info(f" Revoke using user_id: {user_id}")
# Check current status
status = await get_provisioning_status(ctx, user_id)
if not status.is_provisioned:
return RevocationResult(
success=True,
message="No Nextcloud access to revoke.",
)
# Initialize Token Broker to handle revocation
storage = RefreshTokenStorage.from_env()
await storage.initialize()
# Get OAuth client credentials from storage
client_creds = await storage.get_oauth_client()
if not client_creds:
return RevocationResult(
success=False,
message="OAuth client credentials not found in storage.",
)
broker = TokenBrokerService(
storage=storage,
oidc_discovery_url=os.getenv(
"OIDC_DISCOVERY_URL",
f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration",
),
nextcloud_host=os.getenv("NEXTCLOUD_HOST"), # type: ignore
client_id=client_creds["client_id"],
client_secret=client_creds["client_secret"],
)
# Revoke access
success = await broker.revoke_nextcloud_access(user_id)
if success:
return RevocationResult(
success=True,
message=(
"Successfully revoked Nextcloud access. "
"You can run 'provision_nextcloud_access' again if needed."
),
)
else:
return RevocationResult(
success=False,
message="Failed to revoke access. Please try again.",
)
except Exception as e:
logger.error(f"Failed to revoke access: {e}")
return RevocationResult(
success=False,
message=f"Failed to revoke access: {str(e)}",
)
async def check_provisioning_status(
ctx: Context, user_id: Optional[str] = None
) -> ProvisioningStatus:
"""
MCP Tool: Check the current provisioning status.
This tool allows users to check whether they have provisioned
Nextcloud access and see details about their current authorization.
Args:
mcp: MCP context
user_id: Optional user identifier
Returns:
ProvisioningStatus with current state
"""
# Get user ID from context if not provided
if not user_id:
user_id = (
ctx.context.get("user_id", "default_user") # type: ignore
if hasattr(ctx, "context")
else "default_user"
)
return await get_provisioning_status(ctx, user_id)
async def check_logged_in(ctx: Context, user_id: Optional[str] = None) -> str:
"""
MCP Tool: Check if user is logged in and elicit login if needed.
This tool checks whether the user has completed Flow 2 (resource provisioning)
to grant offline access to Nextcloud. If not logged in, it uses MCP elicitation
to prompt the user to complete the login flow.
Args:
ctx: MCP context with user's Flow 1 token
user_id: Optional user identifier (extracted from token if not provided)
Returns:
"yes" if logged in, or elicitation prompting for login
"""
try:
# Extract user ID from the MCP access token (Flow 1 token)
logger.info("=" * 60)
logger.info("check_logged_in: Starting user_id extraction")
logger.info("=" * 60)
if not user_id:
user_id = await extract_user_id_from_token(ctx)
logger.info(f" Final user_id for check_logged_in: {user_id}")
else:
logger.info(f" user_id provided as argument: {user_id}")
# Check if already logged in
logger.info(f"Checking provisioning status for user_id: {user_id}")
status = await get_provisioning_status(ctx, user_id)
logger.info(f" Provisioning status: is_provisioned={status.is_provisioned}")
if status.is_provisioned:
logger.info(f"✓ User {user_id} is already logged in - returning 'yes'")
logger.info("=" * 60)
return "yes"
logger.info(f"✗ User {user_id} is NOT logged in - triggering elicitation")
logger.info("=" * 60)
# Not logged in - generate OAuth URL for Flow 2
# Use settings (handles both ENABLE_BACKGROUND_OPERATIONS and ENABLE_OFFLINE_ACCESS)
settings = get_settings()
if not settings.enable_offline_access:
return (
"Not logged in. Offline access is not enabled. "
"Set ENABLE_BACKGROUND_OPERATIONS=true to use this feature."
)
# Get MCP server's OAuth client credentials
# Try environment variable first, then fall back to DCR client_id
server_client_id = os.getenv("MCP_SERVER_CLIENT_ID")
if not server_client_id:
# Try to get from lifespan context (DCR)
lifespan_ctx = ctx.request_context.lifespan_context
if hasattr(lifespan_ctx, "server_client_id"):
server_client_id = lifespan_ctx.server_client_id
if not server_client_id:
return (
"Not logged in. MCP server OAuth client not configured. "
"Set MCP_SERVER_CLIENT_ID environment variable or use Dynamic Client Registration."
)
# Generate OAuth URL for Flow 2
oidc_discovery_url = os.getenv(
"OIDC_DISCOVERY_URL",
f"{os.getenv('NEXTCLOUD_HOST')}/.well-known/openid-configuration",
)
# Generate secure state for CSRF protection
state = secrets.token_urlsafe(32)
# Store state in session for validation on callback
storage = RefreshTokenStorage.from_env()
await storage.initialize()
# Create OAuth session for Flow 2
session_id = f"flow2_{user_id}_{secrets.token_hex(8)}"
redirect_uri = f"{os.getenv('NEXTCLOUD_MCP_SERVER_URL', 'http://localhost:8000')}/oauth/callback"
await storage.store_oauth_session(
session_id=session_id,
client_redirect_uri="", # No client redirect for Flow 2
state=state,
flow_type="flow2",
is_provisioning=True,
ttl_seconds=600, # 10 minute TTL
)
# Define scopes for Nextcloud access
scopes = [
"openid",
"profile",
"email",
"offline_access", # Critical for background operations
"notes:read",
"notes:write",
"calendar:read",
"calendar:write",
"contacts:read",
"contacts:write",
"files:read",
"files:write",
]
# Generate authorization URL
auth_url = generate_oauth_url_for_flow2(
oidc_discovery_url=oidc_discovery_url,
server_client_id=server_client_id,
redirect_uri=redirect_uri,
state=state,
scopes=scopes,
)
# Use elicitation to prompt user to login
logger.info(f"Eliciting login for user {user_id} with URL: {auth_url}")
result = await ctx.elicit(
message=f"Please log in to Nextcloud at the following URL:\n\n{auth_url}\n\nAfter completing the login, check the box below and click OK.",
schema=LoginConfirmation,
)
if result.action == "accept":
# Check if login was successful by looking for refresh token
# Strategy: Try multiple lookup methods to handle both flows
logger.info("User accepted login prompt, checking for refresh token")
logger.info(f" State parameter: {state[:16]}...")
logger.info(f" User ID: {user_id}")
# First, try to find token by provisioning_client_id (Flow 2 from elicitation)
refresh_token_data = (
await storage.get_refresh_token_by_provisioning_client_id(state)
)
if refresh_token_data:
logger.info("✓ Refresh token found via provisioning_client_id lookup")
logger.info(
f" Flow type: {refresh_token_data.get('flow_type', 'unknown')}"
)
logger.info(
f" Provisioned at: {refresh_token_data.get('provisioned_at', 'unknown')}"
)
return "yes"
# Fallback: Try to find token by user_id (browser login or any other flow)
logger.info(f"✗ No token found with provisioning_client_id={state[:16]}...")
logger.info(f" Trying fallback lookup by user_id: {user_id}")
refresh_token_data = await storage.get_refresh_token(user_id)
if refresh_token_data:
logger.info("✓ Refresh token found via user_id lookup")
logger.info(
f" Flow type: {refresh_token_data.get('flow_type', 'unknown')}"
)
logger.info(
f" Provisioned at: {refresh_token_data.get('provisioned_at', 'unknown')}"
)
logger.info(
f" Provisioning client ID: {refresh_token_data.get('provisioning_client_id', 'NULL')}"
)
logger.info(
" Note: This token was created via browser login or different flow"
)
return "yes"
# No token found by either method
logger.warning(f"✗ No refresh token found for user {user_id}")
logger.warning(
f" Checked provisioning_client_id={state[:16]}... - NOT FOUND"
)
logger.warning(f" Checked user_id={user_id} - NOT FOUND")
logger.warning(
" This may indicate the user completed login but token wasn't stored"
)
return (
"Login not detected. Please ensure you completed the login "
"at the provided URL before clicking OK."
)
elif result.action == "decline":
return "Login declined by user."
else:
return "Login cancelled by user."
except Exception as e:
logger.error(f"Failed to check login status: {e}")
return f"Error checking login status: {str(e)}"
# Register MCP tools
def register_oauth_tools(mcp):
"""Register OAuth and provisioning tools with the MCP server."""
@mcp.tool(
name="provision_nextcloud_access",
title="Grant Server Access to Nextcloud",
description=(
"Provision offline access to Nextcloud resources. "
"This is required before using Nextcloud tools. "
"You'll need to complete an OAuth authorization in your browser."
),
annotations=ToolAnnotations(
idempotentHint=False, # Creates new OAuth session each time
openWorldHint=True,
),
)
@require_scopes("openid")
async def tool_provision_access(
ctx: Context,
user_id: Optional[str] = None,
) -> ProvisioningResult:
return await provision_nextcloud_access(ctx, user_id)
@mcp.tool(
name="revoke_nextcloud_access",
title="Revoke Server Access to Nextcloud",
description="Revoke offline access to Nextcloud resources.",
annotations=ToolAnnotations(
destructiveHint=True, # Removes stored access tokens
idempotentHint=True, # Revoking revoked access = same end state
openWorldHint=True,
),
)
@require_scopes("openid")
async def tool_revoke_access(
ctx: Context, user_id: Optional[str] = None
) -> RevocationResult:
return await revoke_nextcloud_access(ctx, user_id)
@mcp.tool(
name="check_provisioning_status",
title="Check Provisioning Status",
description="Check whether Nextcloud access is provisioned.",
annotations=ToolAnnotations(
readOnlyHint=True, # Only checks status, doesn't modify
openWorldHint=True,
),
)
@require_scopes("openid")
async def tool_check_status(
ctx: Context, user_id: Optional[str] = None
) -> ProvisioningStatus:
return await check_provisioning_status(ctx, user_id)
@mcp.tool(
name="check_logged_in",
title="Check Server Login Status",
description=(
"Check if you are logged in to Nextcloud. "
"If not logged in, this tool will prompt you to complete the login flow."
),
annotations=ToolAnnotations(
readOnlyHint=True, # Checking status doesn't modify state
openWorldHint=True,
),
)
@require_scopes("openid")
async def tool_check_logged_in(ctx: Context, user_id: Optional[str] = None) -> str:
return await check_logged_in(ctx, user_id)