db1e0606ad
Consolidate three independent RefreshTokenStorage lazy singletons into a single lock-protected get_shared_storage() function, eliminating race conditions on concurrent first-access. Remove blanket try/except in _get_stored_scopes so storage errors propagate as proper MCP errors instead of silently triggering "please provision" messages. Handle declined/cancelled elicitation results in Login Flow tools by cleaning up sessions and returning clear status. Add update_app_password_scopes() to avoid unnecessary decrypt/re-encrypt when only scopes change. Add unprovisioned-user early exit and no-op detection to nc_auth_update_scopes. Remove four dead config fields and misleading NEXTCLOUD_PASSWORD deprecation warning. Add periodic login flow session cleanup task. Generate separate Fernet keys per service. Add board cleanup in deck integration test. Gate CI unit tests on linting and skip Astrolabe build for single-user profile. Fix test markers from oauth to multi_user_basic for astrolabe integration tests. Update login_flow.py docstrings to document outbound HTTP calls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
303 lines
11 KiB
Python
303 lines
11 KiB
Python
"""Helper functions for accessing context in MCP tools."""
|
|
|
|
import logging
|
|
|
|
from httpx import BasicAuth
|
|
from mcp.server.fastmcp import Context
|
|
|
|
from nextcloud_mcp_server.auth.context_helper import (
|
|
get_client_from_context,
|
|
get_session_client_from_context,
|
|
)
|
|
from nextcloud_mcp_server.auth.scope_authorization import ProvisioningRequiredError
|
|
from nextcloud_mcp_server.auth.storage import get_shared_storage
|
|
from nextcloud_mcp_server.client import NextcloudClient
|
|
from nextcloud_mcp_server.config import (
|
|
DeploymentMode,
|
|
get_deployment_mode,
|
|
get_settings,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def get_client(ctx: Context) -> NextcloudClient:
    """
    Resolve the Nextcloud client that the current tool call should use.

    The checks below run in order and the first one that matches wins:

    1. Smithery stateless deployment (ADR-016): a fresh client is built for
       every request from the Smithery session configuration.
    2. Multi-user BasicAuth pass-through
       (``settings.enable_multi_user_basic_auth``): a client is built from the
       credentials attached to the current request.
    3. Login Flow v2 (lifespan context exposes ``nextcloud_host`` and
       ``settings.enable_login_flow`` is set): the stored app password is used
       for Nextcloud API calls; the OAuth token only identifies the MCP
       session.
    4. Single-user BasicAuth: the lifespan context exposes a shared ``client``
       attribute, which is returned unchanged.
    5. OAuth (lifespan context exposes ``nextcloud_host``):
       a. ``settings.enable_token_exchange`` set: the MCP token is exchanged
          for a Nextcloud token (RFC 8693).
       b. otherwise: the multi-audience token is used directly and Nextcloud
          validates its own audience when it receives the API call.

    Note: Nextcloud doesn't support OAuth scopes natively. Scopes are enforced
    by the MCP server via the @require_scopes decorator, not by the IdP.

    Args:
        ctx: MCP request context

    Returns:
        NextcloudClient configured for the detected authentication mode

    Raises:
        AttributeError: If the lifespan context has neither a ``client`` nor a
            ``nextcloud_host`` attribute
        ValueError: Propagated from the Smithery and multi-user BasicAuth
            helpers when their required configuration or credentials are
            missing
        ProvisioningRequiredError: Propagated from the Login Flow v2 helper
            when no usable identity or stored app password is available

    Example:
        ```python
        @mcp.tool()
        async def my_tool(ctx: Context):
            client = await get_client(ctx)
            return await client.capabilities()
        ```
    """
    # ADR-016: stateless Smithery deployments never touch the lifespan context.
    if get_deployment_mode() == DeploymentMode.SMITHERY_STATELESS:
        return _get_client_from_session_config(ctx)

    settings = get_settings()

    # Pass-through mode: credentials travel with each request.
    if settings.enable_multi_user_basic_auth:
        return _get_client_from_basic_auth(ctx)

    lifespan_ctx = ctx.request_context.lifespan_context
    host_available = hasattr(lifespan_ctx, "nextcloud_host")

    # Login Flow v2 takes precedence over every other lifespan-based mode:
    # the app password, not the OAuth token, is what talks to Nextcloud.
    if host_available and settings.enable_login_flow:
        return await _get_client_from_login_flow(ctx, lifespan_ctx.nextcloud_host)

    # Single-user BasicAuth: one shared client created at startup.
    if hasattr(lifespan_ctx, "client"):
        return lifespan_ctx.client

    # Neither a shared client nor a host to build one for: unknown context.
    if not host_available:
        raise AttributeError(
            "Lifespan context does not have 'client' or 'nextcloud_host' attribute. "
            f"Type: {type(lifespan_ctx)}"
        )

    # OAuth mode. The token was already validated for the MCP audience by
    # UnifiedTokenVerifier before this point.
    if settings.enable_token_exchange:
        # Swap the MCP-audience token for a Nextcloud-audience token.
        return await get_session_client_from_context(
            ctx, lifespan_ctx.nextcloud_host
        )

    # Multi-audience token: hand it to Nextcloud as is.
    return get_client_from_context(ctx, lifespan_ctx.nextcloud_host)
|
|
|
|
|
|
def _get_client_from_session_config(ctx: Context) -> NextcloudClient:
    """
    Build a per-request NextcloudClient from the Smithery session config.

    ADR-016: in Smithery stateless mode every request carries the user's
    Nextcloud credentials, so a new client is created each time and nothing
    is kept between requests. For the container runtime the config is taken
    from URL query parameters by SmitheryConfigMiddleware and published
    through a context variable, which is read here.

    Required session config keys (from the Smithery configSchema):
        - nextcloud_url: Nextcloud instance URL
        - username: Nextcloud username
        - app_password: Nextcloud app password

    Args:
        ctx: MCP request context (not consulted; the config comes from the
            context variable)

    Returns:
        NextcloudClient authenticating with BasicAuth(username, app_password)

    Raises:
        ValueError: If no session config is available, if any required key is
            missing or empty, or if nextcloud_url does not start with
            http:// or https://
    """
    # Imported lazily, as in the rest of this module's helpers.
    from nextcloud_mcp_server.app import get_smithery_session_config  # noqa: PLC0415

    config = get_smithery_session_config()
    if config is None:
        raise ValueError(
            "Session configuration required in Smithery mode. "
            "Ensure nextcloud_url, username, and app_password are provided as URL query parameters."
        )

    # The middleware always hands over a dict, so plain .get() lookups suffice.
    required_keys = ("nextcloud_url", "username", "app_password")
    values = {key: config.get(key) for key in required_keys}

    # Empty strings count as missing, just like absent keys.
    absent = [key for key in required_keys if not values[key]]
    if absent:
        listing = ", ".join(absent)
        raise ValueError(
            f"Missing required session config fields: {listing}. "
            "Configure these in the Smithery connection settings."
        )

    base_url = values["nextcloud_url"]
    user = values["username"]
    secret = values["app_password"]

    # Narrowing for the type checker only; the check above already
    # guarantees these hold.
    assert base_url is not None
    assert user is not None
    assert secret is not None

    has_http_scheme = base_url.startswith(("http://", "https://"))
    if not has_http_scheme:
        raise ValueError(
            f"Invalid nextcloud_url: {base_url}. "
            "Must start with http:// or https://"
        )

    logger.debug(f"Creating Smithery client for {base_url} as {user}")

    credentials = BasicAuth(user, secret)
    return NextcloudClient(base_url=base_url, username=user, auth=credentials)
|
|
|
|
|
|
def _get_client_from_basic_auth(ctx: Context) -> NextcloudClient:
    """
    Build a NextcloudClient from the BasicAuth credentials of this request.

    Used by multi-user BasicAuth pass-through mode. BasicAuthMiddleware parses
    the ``Authorization: Basic`` header and leaves the username/password in
    the request scope's ``state`` under ``basic_auth``; this helper reads them
    back and forwards them to Nextcloud. The credentials are not persisted
    here - they are only used to construct the returned client.

    Args:
        ctx: MCP request context whose request carries ``basic_auth`` in its
            scope state

    Returns:
        NextcloudClient targeting ``settings.nextcloud_host`` with the
        request's BasicAuth credentials

    Raises:
        ValueError: If NEXTCLOUD_HOST is not configured, the request has no
            scope, no ``basic_auth`` entry is present, or the entry lacks a
            username or password
    """
    host = get_settings().nextcloud_host
    if not host:
        raise ValueError(
            "NEXTCLOUD_HOST environment variable must be set for multi-user BasicAuth mode"
        )

    # The ASGI scope is reached through the request object on the context.
    scope = getattr(ctx.request_context.request, "scope", None)
    if scope is None:
        raise ValueError("Request scope not available in context")

    # BasicAuthMiddleware stores the parsed header under state["basic_auth"].
    basic_auth = scope.get("state", {}).get("basic_auth")
    if not basic_auth:
        raise ValueError(
            "BasicAuth credentials not found in request. "
            "Ensure Authorization: Basic header is provided with valid credentials."
        )

    username = basic_auth.get("username")
    password = basic_auth.get("password")
    if not (username and password):
        raise ValueError("Invalid BasicAuth credentials - missing username or password")

    logger.debug(f"Creating multi-user BasicAuth client for {host} as {username}")

    # Credentials are handed straight through to Nextcloud.
    credentials = BasicAuth(username, password)
    return NextcloudClient(base_url=host, username=username, auth=credentials)
|
|
|
|
|
|
async def _get_client_from_login_flow(
    ctx: Context, nextcloud_host: str
) -> NextcloudClient:
    """Build a NextcloudClient from the app password stored by Login Flow v2.

    In Login Flow v2 mode the OAuth token is only used to learn which user the
    MCP session belongs to. Nextcloud API calls are authenticated with the app
    password that was stored for that user.

    Args:
        ctx: MCP context (source of the user identity)
        nextcloud_host: Nextcloud instance URL

    Returns:
        NextcloudClient using the stored app password; the username comes from
        the stored record, falling back to the token's user id when the record
        has none

    Raises:
        ProvisioningRequiredError: If the user identity cannot be determined
            (empty or ``"default_user"``) or no app password is stored for
            the user
    """
    from nextcloud_mcp_server.server.oauth_tools import (  # noqa: PLC0415
        extract_user_id_from_token,
    )

    user_id = await extract_user_id_from_token(ctx)

    # "default_user" is treated as "no real identity" and rejected as well.
    if not (user_id and user_id != "default_user"):
        raise ProvisioningRequiredError(
            "Cannot determine user identity from MCP token."
        )

    storage = await get_shared_storage()
    app_data = await storage.get_app_password_with_scopes(user_id)
    if not app_data:
        raise ProvisioningRequiredError(
            "Nextcloud access not provisioned. "
            "Call nc_auth_provision_access to complete Login Flow."
        )

    # Prefer the username recorded with the app password, if any.
    username = app_data.get("username") or user_id

    logger.debug(f"Creating Login Flow v2 client for {nextcloud_host} as {username}")

    credentials = BasicAuth(username, app_data["app_password"])
    return NextcloudClient(
        base_url=nextcloud_host,
        username=username,
        auth=credentials,
    )
|